morseanalyser/PicoAirQuality.py
Robin Clark 12846d5a8a Button B beeps morse and records the milli second time stamp
Next make a buffered record of ON OFF button presses along
with the time stamps so that a non interrupt
function can determine which morse values have been entered.

This can then be displayed on the screen.

This is based on the PicoAirQ code atmo.

In the fututre there may be a microphone amplified and filtered
to only hear morse frquencies at audio. this pcb could then listen to morse
being played and decode it.
An RS-232 output could be used for logging to
other computers etc.
2022-07-16 20:01:18 +01:00

1226 lines
53 KiB
Python

import math
import framebuf
import array
import os
from machine import Pin, PWM, ADC, time_pulse_us, I2C, RTC
from rp2 import PIO, StateMachine, asm_pio
from time import sleep, sleep_ms, sleep_us, ticks_ms, ticks_us
from micropython import const
# Initialise the module with all outputs off
# High Power Output Pins
hp_3 = Pin(3, Pin.OUT)
hp_15 = Pin(15, Pin.OUT)
hp_3.value(0)
hp_15.value(0)
# Servo
servo = Pin(2, Pin.OUT)
servo.value(0)
# Buzzer
buzzer = PWM(Pin(4))
buzzer.duty_u16(0)
# The KitronikButton class enable the use of the 2 user input buttons on the board
class KitronikButton:
def __init__(self):
self.buttonA = Pin(12, Pin.IN, Pin.PULL_DOWN)
self.buttonB = Pin(13, Pin.IN, Pin.PULL_DOWN)
# The KitronikOutputControl class enables control of the servo and high-power outputs on the board
class KitronikOutputControl:
# This code drives a pwm on the PIO. It is running at 2Mhz, which gives the PWM a 1uS resolution.
@asm_pio(sideset_init=PIO.OUT_LOW)
def _servo_pwm():
# First we clear the pin to zero, then load the registers. Y is always 20000 - 20uS, x is the pulse 'on' length.
pull(noblock) .side(0)
mov(x, osr) # Keep most recent pull data stashed in X, for recycling by noblock
mov(y, isr) # ISR must be preloaded with PWM count max
# This is where the looping work is done. the overall loop rate is 1Mhz (clock is 2Mhz - we have 2 instructions to do)
label("loop")
jmp(x_not_y, "skip") #if there is 'excess' Y number leave the pin alone and jump to the 'skip' label until we get to the X value
nop() .side(1)
label("skip")
jmp(y_dec, "loop") #count down y by 1 and jump to pwmloop. When y is 0 we will go back to the 'pull' command
def __init__(self):
# High Power Output Pins
self.highPwr_3 = Pin(3, Pin.OUT)
self.highPwr_15 = Pin(15, Pin.OUT)
# Servo Control
self.servo = []
# Servo 0 degrees -> pulse of 0.5ms, 180 degrees 2.5ms
# Pulse train freq 50hz - 20ms
# 1us is freq of 1000000
# Servo pulses range from 500 to 2500us and overall pulse train is 20000us repeat.
# Servo is on GP2
self.maxServoPulse = 2500
self.minServoPulse = 500
self.pulseTrain = 20000
self.degreesToUS = 2000/180
# Create and start the servo statemachine
self.servo.append (StateMachine(1, self._servo_pwm, freq=2000000, sideset_base=Pin(2)))
self.servo[0].put(self.pulseTrain)
self.servo[0].exec("pull()")
self.servo[0].exec("mov(isr, osr)")
self.registerServo()
# Doesn't actually register/unregister, just stops and starts the servo PIO
# The servo is registered by default in the __init__() function - these are only required if you want to use Pin 2 for something else, and then register the servo again
def registerServo(self):
if(not self.servo[0].active()):
self.servo[0].active(1)
def deregisterServo(self):
if(self.servo[0].active()):
self.servo[0].active(0)
# goToPosition takes a degree position for the servo to go to.
# 0degrees->180 degrees is 0->2000us, plus offset of 500uS
# 1 degree ~ 11uS.
# This function does the sum then calls goToPeriod to actually poke the PIO
def servoToPosition(self, degrees):
pulseLength = int(degrees*self.degreesToUS + 500)
self.servoToPeriod(pulseLength)
def servoToPeriod(self, period):
if(period < 500):
period = 500
if(period >2500):
period =2500
self.servo[0].put(period)
# Functions to turn on/off the high power outputs
# Enter the pin number, either '3' or '15'
def highPowerOn(self, pin):
if (pin == 3):
self.highPwr_3.value(1)
elif (pin == 15):
self.highPwr_15.value(1)
def highPowerOff(self, pin):
if (pin == 3):
self.highPwr_3.value(0)
elif (pin == 15):
self.highPwr_15.value(0)
# The KitronikDataLogger class enables data logging through the Pico file system
# It is possible to create multiple data logger instances to then log to multiple files simulataneously
class KitronikDataLogger:
# Function is called when the class is initialised - sets the maximum permissable filesize, the data separator and creates the log file with the entered filename
# Separator options: ("comma", "semicolon", "tab")
def __init__(self, file = "data_log.txt", separator = "semicolon"):
self.MAX_FILE_SIZE = 500000 # This is approximately 10000 full entries
if (separator == "comma"):
self.SEPARATOR = ","
elif (separator == "semicolon"):
self.SEPARATOR = ";"
elif (separator == "tab"):
self.SEPARATOR = "\t"
self.FILENAME = file
try:
f = open(self.FILENAME, "x")
f.close()
except OSError:
print("File already exists")
self.line1 = ""
self.line2 = ""
self.line3 = ""
self.dataHeadings = ""
self.projectInfo = False
self.headings = False
# Write a header section to the specified file (there are 3 free text fields, each will write on a separate line)
def writeProjectInfo(self, line1="", line2="", line3=""):
if (line1 != ""):
self.writeFile(self.FILENAME, line1 + "\r\n")
self.line1 = line1
if (line2 != ""):
self.writeFile(self.FILENAME, line2 + "\r\n")
self.line2 = line2
if (line3 != ""):
self.writeFile(self.FILENAME, line3 + "\r\n")
self.line3 = line3
self.projectInfo = True
# This writes whatever is passed to it to the file
def writeFile(self, file, passed):
f = open(file, "a") #open in append - creates if not existing, will append if it exists
f.write(passed)
f.close()
# Input and write to the file up to 10 data field headings
def nameColumnHeadings(self, field1="", field2="", field3="", field4="", field5="", field6="", field7="", field8="", field9="", field10=""):
dataHeadings = ""
if (field1 != ""):
dataHeadings = field1 + self.SEPARATOR
if (field2 != ""):
dataHeadings = dataHeadings + field2 + self.SEPARATOR
if (field3 != ""):
dataHeadings = dataHeadings + field3 + self.SEPARATOR
if (field4 != ""):
dataHeadings = dataHeadings + field4 + self.SEPARATOR
if (field5 != ""):
dataHeadings = dataHeadings + field5 + self.SEPARATOR
if (field6 != ""):
dataHeadings = dataHeadings + field6 + self.SEPARATOR
if (field7 != ""):
dataHeadings = dataHeadings + field7 + self.SEPARATOR
if (field8 != ""):
dataHeadings = dataHeadings + field8 + self.SEPARATOR
if (field9 != ""):
dataHeadings = dataHeadings + field9 + self.SEPARATOR
if (field10 != ""):
dataHeadings = dataHeadings + field10 + self.SEPARATOR
self.dataHeadings = dataHeadings
self.writeFile(self.FILENAME, dataHeadings + "\r\n")
self.headings = True
# Store up to 10 data entries (match the order with the data headings used)
def storeDataEntry(self, field1="", field2="", field3="", field4="", field5="", field6="", field7="", field8="", field9="", field10=""):
dataEntry = ""
if (field1 != ""):
dataEntry = field1 + self.SEPARATOR
if (field2 != ""):
dataEntry = dataEntry + field2 + self.SEPARATOR
if (field3 != ""):
dataEntry = dataEntry + field3 + self.SEPARATOR
if (field4 != ""):
dataEntry = dataEntry + field4 + self.SEPARATOR
if (field5 != ""):
dataEntry = dataEntry + field5 + self.SEPARATOR
if (field6 != ""):
dataEntry = dataEntry + field6 + self.SEPARATOR
if (field7 != ""):
dataEntry = dataEntry + field7 + self.SEPARATOR
if (field8 != ""):
dataEntry = dataEntry + field8 + self.SEPARATOR
if (field9 != ""):
dataEntry = dataEntry + field9 + self.SEPARATOR
if (field10 != ""):
dataEntry = dataEntry + field10 + self.SEPARATOR
while (self.checkFileSize() > self.MAX_FILE_SIZE):
self.removeOneLine()
self.writeFile(self.FILENAME, dataEntry + "\r\n")
# This returns the size of the file, or 0 if the file does not exist
def checkFileSize(self):
# f is a file-like object.
try:
f = open(self.FILENAME, "r") # Open read - this throws an error if file does not exist - in that case the size is 0
f.seek(0, 2)
size = f.tell()
f.close()
return size
except:
# if we wanted to know we could print some diagnostics here like:
#print("Exception - File does not exist")
return 0
# Remove a line from the data file to make space for more data
def removeOneLine(self):
tempName = self.FILENAME + ".bak"
readFrom = open(self.FILENAME, "r")
writeTo = open(tempName, "w")
if self.projectInfo: # If there is Project Info, skip over these lines and then write them to the temporary file
for l in range(3):
readFrom.readline()
writeTo.write(self.line1 + "\r\n")
writeTo.write(self.line2 + "\r\n")
writeTo.write(self.line3 + "\r\n")
if self.headings:
readFrom.readline() # If there are Headings, skip over this line and then write them to the temporary file
writeTo.write(self.dataHeadings + "\r\n")
readFrom.readline() # Read and throw away the first line of data in the file
for lines in readFrom: # Write the remaining lines to the temporary file
writeTo.write(lines)
readFrom.close() # Close both files
writeTo.close()
os.remove(self.FILENAME) # Delete original log file
os.rename(tempName, self.FILENAME) # Rename temporary file as new log file (now with the first line of data removed)
# Deletes all the contents of the file
def eraseAllData(self):
f = open(self.FILENAME, "w")
f.write("")
f.close()
# Deletes the file from the Pico file system
def deleteDataFile(self):
os.remove(self.FILENAME)
# The KitronikBuzzer class enables control of the piezo buzzer on the board
class KitronikBuzzer:
# Function is called when the class is initialised and sets the buzzer pin to GP4
def __init__(self):
self.buzzer = PWM(Pin(4))
self.dutyCycle = 32767
# Play a continous tone at a specified frequency
def playTone(self, freq):
if (freq < 30):
freq = 30
if (freq > 3000):
freq = 3000
self.buzzer.freq(freq)
self.buzzer.duty_u16(self.dutyCycle)
# Play a tone at a speciied frequency for a specified length of time in ms
def playTone_Length(self, freq, length):
self.playTone(freq)
sleep_ms(length)
self.stopTone()
# Stop the buzzer producing a tone
def stopTone(self):
self.buzzer.duty_u16(0)
# The KitronikZIPLEDs class enables control of the ZIP LEDs both on the board and any connected externally
class KitronikZIPLEDs:
# We drive the ZIP LEDs using one of the PIO statemachines.
@asm_pio(sideset_init=PIO.OUT_LOW, out_shiftdir=PIO.SHIFT_LEFT, autopull=True, pull_thresh=24)
def _ZIPLEDOutput():
T1 = 2
T2 = 5
T3 = 3
wrap_target()
label("bitloop")
out(x, 1) .side(0) [T3 - 1]
jmp(not_x, "do_zero") .side(1) [T1 - 1]
jmp("bitloop") .side(1) [T2 - 1]
label("do_zero")
nop() .side(0) [T2 - 1]
wrap()
def __init__(self, num_zip_leds):
self.num_zip_leds = num_zip_leds
# Create and start the StateMachine for the ZIPLeds
self.ZIPLEDs = StateMachine(4, self._ZIPLEDOutput, freq=8_000_000, sideset_base=Pin(20))
self.theLEDs = array.array("I", [0 for _ in range(self.num_zip_leds)]) #an array for the LED colours.
self.brightness = 0.5 #20% initially
self.ZIPLEDs.active(1)
# Define some colour tuples for people to use.
self.BLACK = (0, 0, 0)
self.RED = (255, 0, 0)
self.YELLOW = (255, 150, 0)
self.GREEN = (0, 255, 0)
self.CYAN = (0, 255, 255)
self.BLUE = (0, 0, 255)
self.PURPLE = (180, 0, 255)
self.WHITE = (255, 255, 255)
self.COLOURS = (self.BLACK, self.RED, self.YELLOW, self.GREEN, self.CYAN, self.BLUE, self.PURPLE, self.WHITE)
# Show pushes the current setup of the LEDS to the physical LEDS - it makes them visible.
def show(self):
brightAdjustedLEDs = array.array("I", [0 for _ in range(self.num_zip_leds)])
for i,c in enumerate(self.theLEDs):
r = int(((c >> 8) & 0xFF) * self.brightness)
g = int(((c >> 16) & 0xFF) * self.brightness)
b = int((c & 0xFF) * self.brightness)
brightAdjustedLEDs[i] = (g<<16) + (r<<8) + b
self.ZIPLEDs.put(brightAdjustedLEDs, 8)
# Turn the LED off by setting the colour to black
def clear(self, whichLED):
self.setLED(whichLED, self.BLACK)
# Sets the colour of an individual LED. Use show to make change visible
def setLED(self, whichLED, whichColour):
if(whichLED<0):
raise Exception("INVALID LED:",whichLED," specified")
elif(whichLED>(self.num_zip_leds - 1)):
raise Exception("INVALID LED:",whichLED," specified")
else:
self.theLEDs[whichLED] = (whichColour[1]<<16) + (whichColour[0]<<8) + whichColour[2]
# Gets the stored colour of an individual LED, which isnt nessecerily the colour on show if it has been changed, but not 'show'n
def getLED(self, whichLED):
if(whichLED<0):
raise Exception("INVALID LED:",whichLED," specified")
elif(whichLED>(self.num_zip_leds - 1)):
raise Exception("INVALID LED:",whichLED," specified")
else:
return(((self.theLEDs[whichLED]>>8) & 0xff), ((self.theLEDs[whichLED]>>16)& 0xff) ,((self.theLEDs[whichLED])& 0xff))
# Takes 0-100 as a brightness value, brighness is applies in the'show' function
def setBrightness(self, value):
#cap to 0-100%
if (value<0):
value = 0
elif (value>100):
value=100
self.brightness = value / 100
# The KitronikRTC class enables use of the Pico RTC
class KitronikRTC:
# Function is called when the class is initialised and creates an instance of the Pico RTC and defines all the global variables
def __init__(self):
self.rtc = RTC()
self.day = 0
self.month = 0
self.year = 0
self.weekday = 0 # In range 0 - 6, 0 = Monday, 6 = Sunday
self.hour = 0
self.minute = 0
self.second = 0
self.alarmHour = 0
self.alarmMinute = 0
self.alarmSet = False
self.alarmTrigger = False
self.alarmRepeat = False
self.hourPeriod = 0
self.minutePeriod = 0
# Function calculates the weekday (0 = Monday, 6 = Sunday) based on the date, taking into account leap years
def calcWeekday(self, day, month, year):
dayOffset = [0, 3, 2, 5, 0, 3, 5, 1, 4, 6, 2, 4]
if (month < 3):
month = month - 1
self.weekday = (year + (year // 4) - (year // 100) + (year // 400) + dayOffset[month - 1] + day) % 7 # Returns 0 - 6, 0 = Sunday, 6 = Saturday
if (self.weekday == 0):
self.weekday = 6
else:
self.weekday = self.weekday - 1
# Set the date on the Pico RTC
def setDate(self, day, month, year):
self.getDateTime()
self.calcWeekday(day, month, year)
self.rtc.datetime((year, month, day, self.weekday, self.hour, self.minute, self.second, 0))
# Set the time on the Pico RTC
def setTime(self, hours, minutes, seconds):
self.getDateTime()
self.rtc.datetime((self.year, self.month, self.day, self.weekday, hours, minutes, seconds, 0))
# Return the current date and time
def getDateTime(self):
newDateTime = self.rtc.datetime()
self.day = newDateTime[2]
self.month = newDateTime[1]
self.year = newDateTime[0]
self.weekday = newDateTime[3]
self.hour = newDateTime[4]
self.minute = newDateTime[5]
self.second = newDateTime[6]
# Return the current date as a string
def readDateString(self):
self.getDateTime()
if (self.day < 10):
day = "0" + str(self.day)
else:
day = str(self.day)
if (self.month < 10):
month = "0" + str(self.month)
else:
month = str(self.month)
date = day + "/" + month + "/" + str(self.year)
return date
# Return the current time as a string
def readTimeString(self):
self.getDateTime()
if (self.hour < 10):
hour = "0" + str(self.hour)
else:
hour = str(self.hour)
if (self.minute < 10):
minute = "0" + str(self.minute)
else:
minute = str(self.minute)
if (self.second < 10):
second = "0" + str(self.second)
else:
second = str(self.second)
time = hour + ":" + minute + ":" + second
return time
# Return a specific date/time parameter as a number
# d = day, m = month, y = year, h = hour, min = minute, s = second
def readParameter(self, parameter):
self.getDateTime()
if (parameter == "d"):
return self.day
elif (parameter == "m"):
return self.month
elif (parameter == "y"):
return self.year
elif (parameter == "h"):
return self.hour
elif (parameter == "min"):
return self.minute
elif (parameter == "s"):
return self.second
# Set an alarm for a specific hour and minute
# Extra options to set a periodically repeating alarm (set alarmRepeat to True and then specifiy the hour and/or minute period between alarms)
def setAlarm(self, hour, minute, alarmRepeat=False, hourPeriod=0, minutePeriod=0):
self.alarmHour = math.ceil(hour)
self.alarmMinute = math.ceil(minute)
self.alarmRepeat = alarmRepeat
if alarmRepeat:
self.hourPeriod = math.ceil(hourPeriod)
self.minutePeriod = math.ceil(minutePeriod)
self.alarmSet = True
# Check whether the alarm conditions have been met and then trigger the alarm
def checkAlarm(self):
if self.alarmSet:
if (self.readParameter("h") == self.alarmHour):
if (self.readParameter("min") == self.alarmMinute):
self.alarmTrigger = True
return self.alarmTrigger
# Sets 'alarmTrigger' back to False, checks whether the alarm should repeat (sets new one if it should) or sets 'alarmSet' back to False
def silenceAlarm(self):
self.alarmTrigger = False
if not self.alarmRepeat:
self.alarmSet = False
else:
newHour = self.alarmHour + self.hourPeriod
newMinute = self.alarmMinute + self.minutePeriod
if (newMinute > 59):
newMinute = newMinute - 60
newHour = newHour + 1
if (newHour > 23):
newHour = newHour - 24
self.setAlarm(newHour, newMinute, True, self.hourPeriod, self.minutePeriod)
# The KitronikBME688 class enables contro and use of the BME688 sensor on the board
class KitronikBME688:
# The following functions are for reading the registers on the BME688
# Function for reading register as signed 8 bit integer
def getUInt8(self, reg):
return int.from_bytes(self.i2c.readfrom_mem(self.CHIP_ADDRESS, reg, 1), "big")
# Function to convert unsigned ints to twos complement signed ints
def twosComp(self, value, bits):
if ((value & (1 << (bits - 1))) != 0):
value = value - (1 << bits)
return value
# Function for proportionally mapping a value to a different value range
def mapValues(self, value, frMin, frMax, toMin, toMax):
toRange = toMax - toMin
mappedVal = toMin + ((value - frMin) * ((toMax - toMin) / (frMax - frMin)))
return mappedVal
def __init__(self, i2cAddr=0x77, sda=6, scl=7):
self.CHIP_ADDRESS = i2cAddr # I2C address as determined by hardware configuration
sda = Pin(sda)
scl = Pin(scl)
self.i2c = I2C(1,sda=sda, scl=scl, freq=100000)
# Useful BME688 Register Addresses
# Control
self.CTRL_MEAS = 0x74 # Bit position <7:5>: Temperature oversampling Bit position <4:2>: Pressure oversampling Bit position <1:0>: Sensor power mode
self.RESET = 0xE0 # Write 0xB6 to initiate soft-reset (same effect as power-on reset)
self.CHIP_ID = 0xD0 # Read this to return the chip ID: 0x61 - good way to check communication is occurring
self.CTRL_HUM = 0x72 # Bit position <2:0>: Humidity oversampling settings
self.CONFIG = 0x75 # Bit position <4:2>: IIR filter settings
self.CTRL_GAS_0 = 0x70 # Bit position <3>: Heater off (set to '1' to turn off current injection)
self.CTRL_GAS_1 = 0x71 # Bit position <5> DATASHEET ERROR: Enable gas conversions to start when set to '1' Bit position <3:0>: Heater step selection (0 to 9)
# Pressure Data
self.PRESS_MSB_0 = 0x1F # Forced & Parallel: MSB [19:12]
self.PRESS_LSB_0 = 0x20 # Forced & Parallel: LSB [11:4]
self.PRESS_XLSB_0 = 0x21 # Forced & Parallel: XLSB [3:0]
# Temperature Data
self.TEMP_MSB_0 = 0x22 # Forced & Parallel: MSB [19:12]
self.TEMP_LSB_0 = 0x23 # Forced & Parallel: LSB [11:4]
self.TEMP_XLSB_0 = 0x24 # Forced & Parallel: XLSB [3:0]
# Humidity Data
self.HUMID_MSB_0 = 0x25 # Forced & Parallel: MSB [15:8]
self.HUMID_LSB_0 = 0x26 # Forced & Parallel: LSB [7:0]
# Gas Resistance Data
self.GAS_RES_MSB_0 = 0x2C # Forced & Parallel: MSB [9:2]
self.GAS_RES_LSB_0 = 0x2D # Forced & Parallel: Bit <7:6>: LSB [1:0] Bit <5>: Gas valid Bit <4>: Heater stability Bit <3:0>: Gas resistance range
# Status
self.MEAS_STATUS_0 = 0x1D # Forced & Parallel: Bit <7>: New data Bit <6>: Gas measuring Bit <5>: Measuring Bit <3:0>: Gas measurement index
# Calibration parameters for compensation calculations
# Temperature
self.PAR_T1 = self.twosComp((self.getUInt8(0xEA) << 8) | self.getUInt8(0xE9), 16) # Signed 16-bit
self.PAR_T2 = self.twosComp((self.getUInt8(0x8B) << 8) | self.getUInt8(0x8A), 16) # Signed 16-bit
self.PAR_T3 = self.twosComp(self.getUInt8(0x8C), 8) # Signed 8-bit
# Pressure
self.PAR_P1 = (self.getUInt8(0x8F) << 8) | self.getUInt8(0x8E) # Always a positive number, do not do twosComp() conversion!
self.PAR_P2 = self.twosComp((self.getUInt8(0x91) << 8) | self.getUInt8(0x90), 16) # Signed 16-bit
self.PAR_P3 = self.twosComp(self.getUInt8(0x92), 8) # Signed 8-bit
self.PAR_P4 = self.twosComp((self.getUInt8(0x95) << 8) | self.getUInt8(0x94), 16) # Signed 16-bit
self.PAR_P5 = self.twosComp((self.getUInt8(0x97) << 8) | self.getUInt8(0x96), 16) # Signed 16-bit
self.PAR_P6 = self.twosComp(self.getUInt8(0x99), 8) # Signed 8-bit
self.PAR_P7 = self.twosComp(self.getUInt8(0x98), 8) # Signed 8-bit
self.PAR_P8 = self.twosComp((self.getUInt8(0x9D) << 8) | self.getUInt8(0x9C), 16) # Signed 16-bit
self.PAR_P9 = self.twosComp((self.getUInt8(0x9F) << 8) | self.getUInt8(0x9E), 16) # Signed 16-bit
self.PAR_P10 = self.twosComp(self.getUInt8(0xA0), 8) # Signed 8-bit
# Humidity
parH1_LSB_parH2_LSB = self.getUInt8(0xE2)
self.PAR_H1 = (self.getUInt8(0xE3) << 4) | (parH1_LSB_parH2_LSB & 0x0F)
self.PAR_H2 = (self.getUInt8(0xE1) << 4) | (parH1_LSB_parH2_LSB >> 4)
self.PAR_H3 = self.twosComp(self.getUInt8(0xE4), 8) # Signed 8-bit
self.PAR_H4 = self.twosComp(self.getUInt8(0xE5), 8) # Signed 8-bit
self.PAR_H5 = self.twosComp(self.getUInt8(0xE6), 8) # Signed 8-bit
self.PAR_H6 = self.twosComp(self.getUInt8(0xE7), 8) # Signed 8-bit
self.PAR_H7 = self.twosComp(self.getUInt8(0xE8), 8) # Signed 8-bit
# Gas resistance
self.PAR_G1 = self.twosComp(self.getUInt8(0xED), 8) # Signed 8-bit
self.PAR_G2 = self.twosComp((self.getUInt8(0xEB) << 8) | self.getUInt8(0xEC), 16) # Signed 16-bit
self.PAR_G3 = self.getUInt8(0xEE) # Unsigned 8-bit
self.RES_HEAT_RANGE = (self.getUInt8(0x02) >> 4) & 0x03
self.RES_HEAT_VAL = self.twosComp(self.getUInt8(0x00), 8) # Signed 8-bit
# Oversampling rate constants
self.OSRS_1X = 0x01
self.OSRS_2X = 0x02
self.OSRS_4X = 0x03
self.OSRS_8X = 0x04
self.OSRS_16X = 0x05
# IIR filter coefficient values
self.IIR_0 = 0x00
self.IIR_1 = 0x01
self.IIR_3 = 0x02
self.IIR_7 = 0x03
self.IIR_15 = 0x04
self.IIR_31 = 0x05
self.IIR_63 = 0x06
self.IIR_127 = 0x07
#Global variables used for storing one copy of value, these are used in multiple locations for calculations
self.bme688InitFlag = False
self.gasInit = False
self.tRead = 0 # calculated readings of sensor parameters from raw adc readings
self.pRead = 0
self.hRead = 0
self.gRes = 0
self.iaqPercent = 0
self.iaqScore = 0
self.airQualityRating = ""
self.eCO2Value = 0
self.gBase = 0
self.hBase = 40 # Between 30% & 50% is a widely recognised optimal indoor humidity, 40% is a good middle ground
self.hWeight = 0.25 # Humidity contributes 25% to the IAQ score, gas resistance is 75%
self.hPrev = 0
self.measTime = 0
self.measTimePrev = 0
self.tRaw = 0 # adc reading of raw temperature
self.pRaw = 0 # adc reading of raw pressure
self.hRaw = 0 # adc reading of raw humidity
self.gResRaw = 0 # adc reading of raw gas resistance
self.gasRange = 0
self.t_fine = 0 # Intermediate temperature value used for pressure calculation
self.newAmbTemp = 0
self.tAmbient = 0 # Intermediate temperature value used for heater calculation
self.ambTempFlag = False
# Create an instance of the OLED display screen for use during setup and for error messages
self.screen = KitronikOLED()
# Begin the hardware inititialisation for the BME688 sensor
self.bme688Init()
# Temperature compensation calculation: rawADC to degrees C (integer)
def calcTemperature(self, tempADC):
var1 = (tempADC >> 3) - (self.PAR_T1 << 1)
var2 = (var1 * self.PAR_T2) >> 11
var3 = ((((var1 >> 1) * (var1 >> 1)) >> 12) * (self.PAR_T3 << 4)) >> 14
self.t_fine = var2 + var3
self.newAmbTemp = ((self.t_fine * 5) + 128) >> 8
self.tRead = self.newAmbTemp / 100 # Convert to floating point with 2 dp
if (self.ambTempFlag == False):
self.tAmbient = self.newAmbTemp
# Pressure compensation calculation: rawADC to Pascals (integer)
def intCalcPressure(self, pressureADC):
var1 = (self.t_fine >> 1) - 64000
var2 = ((((var1 >> 2) * (var1 >> 2)) >> 11) * self.PAR_P6) >> 2
var2 = var2 + ((var1 * self.PAR_P5) << 1)
var2 = (var2 >> 2) + (self.PAR_P4 << 16)
var1 = (((((var1 >> 2) * (var1 >> 2)) >> 13) * (self.PAR_P3 << 5)) >> 3) + ((self.PAR_P2 * var1) >> 1)
var1 = var1 >> 18
var1 = ((32768 + var1) * self.PAR_P1) >> 15
self.pRead = 1048576 - pressureADC
self.pRead = ((self.pRead - (var2 >> 12)) * 3125)
if (self.pRead >= (1 << 30)):
self.pRead = (self.pRead // var1) << 1
else:
self.pRead = ((self.pRead << 1) // var1)
var1 = (self.PAR_P9 * (((self.pRead >> 3) * (self.pRead >> 3)) >> 13)) >> 12
var2 = ((self.pRead >> 2) * self.PAR_P8) >> 13
var3 = ((self.pRead >> 8) * (self.pRead >> 8) * (self.pRead >> 8) * self.PAR_P10) >> 17
self.pRead = self.pRead + ((var1 + var2 + var3 + (self.PAR_P7 << 7)) >> 4)
# Humidity compensation calculation: rawADC to % (integer)
# 'tempScaled' is the current reading from the Temperature sensor
def intCalcHumidity(self, humidADC, tempScaled):
self.hPrev = self.hRead
tempScaled = math.trunc(tempScaled)
var1 = humidADC - (self.PAR_H1 << 4) - (((tempScaled * self.PAR_H3) // 100) >> 1)
var2 = (self.PAR_H2 * (((tempScaled * self.PAR_H4) // 100) + (((tempScaled * ((tempScaled * self.PAR_H5) // 100)) >> 6) // 100) + (1 << 14))) >> 10
var3 = var1 * var2
var4 = ((self.PAR_H6 << 7) + ((tempScaled * self.PAR_H7) // 100)) >> 4
var5 = ((var3 >> 14) * (var3 >> 14)) >> 10
var6 = (var4 * var5) >> 1
self.hRead = (var3 + var6) >> 12
self.hRead = (((var3 + var6) >> 10) * (1000)) >> 12
self.hRead = self.hRead // 1000
# Gas sensor heater target temperature to target resistance calculation
# 'ambientTemp' is reading from Temperature sensor in degC (could be averaged over a day when there is enough data?)
# 'targetTemp' is the desired temperature of the hot plate in degC (in range 200 to 400)
# Note: Heating duration also needs to be specified for each heating step in 'gas_wait' registers
def intConvertGasTargetTemp(self, ambientTemp, targetTemp):
var1 = ((ambientTemp * self.PAR_G3) // 1000) << 8 # Divide by 1000 as we have ambientTemp in pre-degC format (i.e. 2500 rather than 25.00 degC)
var2 = (self.PAR_G1 + 784) * (((((self.PAR_G2 + 154009) * targetTemp * 5) // 100) + 3276800) // 10)
var3 = var1 + (var2 >> 1)
var4 = (var3 // (self.RES_HEAT_RANGE + 4))
var5 = (131 * self.RES_HEAT_VAL) + 65536 # Target heater resistance in Ohms
resHeatX100 = (((var4 // var5) - 250) * 34)
resHeat = ((resHeatX100 + 50) // 100)
return resHeat
# Gas resistance compensation calculation: rawADC & range to Ohms (integer)
def intCalcgRes(self, gasADC, gasRange):
var1 = 262144 >> gasRange
var2 = gasADC - 512
var2 = var2 * 3
var2 = 4096 + var2
calcGasRes = ((10000 * var1) // var2)
self.gRes = calcGasRes * 100
# Initialise the BME688, establishing communication, entering initial T, P & H oversampling rates, setup filter and do a first data reading (won't return gas)
def bme688Init(self):
# Establish communication with BME688
chipID = self.i2c.readfrom_mem(self.CHIP_ADDRESS, self.CHIP_ID, 1)
chipID = int.from_bytes(chipID, "big")
while (chipID != 97):
chipID = self.i2c.readfrom_mem(self.CHIP_ADDRESS, self.CHIP_ID, 1)
# Do a soft reset
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.RESET, "\xB6")
sleep_ms(1000)
# Set mode to SLEEP MODE: CTRL_MEAS reg <1:0>
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, "\x00")
# Set the oversampling rates for Temperature, Pressure and Humidity
# Humidity: CTRL_HUM bits <2:0>
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_HUM, str(self.OSRS_2X))
# Temperature: CTRL_MEAS bits <7:5> Pressure: CTRL_MEAS bits <4:2> (Combine and write both in one command)
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, str(((self.OSRS_2X << 5) | (self.OSRS_16X << 2))))
# IIR Filter: CONFIG bits <4:2>
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CONFIG, str(self.IIR_3 << 2))
# Enable gas conversion: CTRL_GAS_1 bit <5> (although datasheet says <4> - not sure what's going on here...)
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_GAS_1, "\x20")
self.bme688InitFlag = True
# Do an initial data read (will only return temperature, pressure and humidity as no gas sensor parameters have been set)
self.measureData()
# Setup the gas sensor (defaults are 300°C and 180ms).
# targetTemp is the target temperature for the gas sensor plate to reach (200 - 400°C), eg: 300
# heatDuration is the length of time for the heater to be turned on (0 - 4032ms), eg: 180
# WARNING: The temperature and duration values can be changed but this is not recommended unless the user is familiar with gas sensor setup
# The default values have been chosen as they provide a good all-round sensor response for air quality purposes
def setupGasSensor(self, targetTemp=300, heatDuration=180):
if (self.bme688InitFlag == False):
self.bme688Init()
# Limit targetTemp between 200°C & 400°C
if (targetTemp < 200):
targetTemp = 200
elif (targetTemp > 400):
targetTemp = 400
# Limit heatDuration between 0ms and 4032ms
if (heatDuration < 0):
heatDuration = 0
elif (heatDuration > 4032):
heatDuration = 4032
# Define the target heater resistance from temperature
self.i2c.writeto_mem(self.CHIP_ADDRESS, 0x5A, self.intConvertGasTargetTemp(self.tAmbient, targetTemp).to_bytes(1, 'big')) # res_wait_0 register - heater step 0
# Define the heater on time, converting ms to register code (Heater Step 0) - cannot be greater than 4032ms
# Bits <7:6> are a multiplier (1, 4, 16 or 64 times) Bits <5:0> are 1ms steps (0 to 63ms)
codedDuration = 0
if (heatDuration < 4032):
factor = 0
while (heatDuration > 63):
heatDuration = (heatDuration // 4)
factor = factor + 1
codedDuration = heatDuration + (factor * 64)
else:
codedDuration = 255
self.i2c.writeto_mem(self.CHIP_ADDRESS, 0x64, codedDuration.to_bytes(1, 'big')) # gas_wait_0 register - heater step 0
# Select index of heater step (0 to 9): CTRL_GAS_1 reg <3:0> (Make sure to combine with gas enable setting already there)
gasEnable = self.getUInt8(self.CTRL_GAS_1) & 0x20
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_GAS_1, (0x00 | gasEnable).to_bytes(1, 'big')) # Select heater step 0
self.gasInit = True
# Run all measurements on the BME688: Temperature, Pressure, Humidity & Gas Resistance.
def measureData(self):
if (self.bme688InitFlag == False):
self.bme688Init()
self.measTimePrev = self.measTime # Store previous measurement time (ms since micro:bit powered on)
# Set mode to FORCED MODE to begin single read cycle: CTRL_MEAS reg <1:0> (Make sure to combine with temp/pressure oversampling settings already there)
oSampleTP = self.getUInt8(self.CTRL_MEAS)
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, str((0x01 | oSampleTP)))
# Check New Data bit to see if values have been measured: MEAS_STATUS_0 bit <7>
newData = (self.getUInt8(self.MEAS_STATUS_0) & 0x80) >> 7
while (newData != 1):
newData = (self.getUInt8(self.MEAS_STATUS_0) & 0x80) >> 7
# Check Heater Stability Status bit to see if gas values have been measured: <4> (heater stability)
heaterStable = (self.getUInt8(self.GAS_RES_LSB_0) & 0x10) >> 4
# If there is new data, read temperature ADC registers(this is required for all other calculations)
self.tRaw = (self.getUInt8(self.TEMP_MSB_0) << 12) | (self.getUInt8(self.TEMP_LSB_0) << 4) | (self.getUInt8(self.TEMP_XLSB_0) >> 4)
# Read pressure ADC registers
self.pRaw = (self.getUInt8(self.PRESS_MSB_0) << 12) | (self.getUInt8(self.PRESS_LSB_0) << 4) | (self.getUInt8(self.PRESS_XLSB_0) >> 4)
# Read humidity ADC registers
self.hRaw = (self.getUInt8(self.HUMID_MSB_0) << 8) | (self.getUInt8(self.HUMID_LSB_0) >> 4)
# Read gas resistance ADC registers
self.gResRaw = (self.getUInt8(self.GAS_RES_MSB_0) << 2) | self.getUInt8(self.GAS_RES_LSB_0) >> 6 # Shift bits <7:6> right to get LSB for gas resistance
gasRange = self.getUInt8(self.GAS_RES_LSB_0) & 0x0F
self.measTime = ticks_ms() # Capture latest measurement time (ms since Pico powered on)
# Calculate the compensated reading values from the the raw ADC data
self.calcTemperature(self.tRaw)
self.intCalcPressure(self.pRaw)
self.intCalcHumidity(self.hRaw, self.tRead)
self.intCalcgRes(self.gResRaw, gasRange)
# A baseline gas resistance is required for the IAQ calculation - it should be taken in a well ventilated area without obvious air pollutants
# Take 60 readings over a ~5min period and find the mean
# Establish the baseline gas resistance reading and the ambient temperature.
# These values are required for air quality calculations
# When the baseline process is complete, values for gBase and tAmbient are stored in a file
# On subsequent power cycles of the board, this function will look for that file and take the baseline values stored there
# To force the baselines process to be run again, call the function like this: calcBaselines(True)
def calcBaselines(self, forcedRun=False):
if (self.bme688InitFlag == False):
self.bme688Init()
if (self.gasInit == False):
self.setupGasSensor()
self.screen.clear()
self.screen.displayText("Setting Baseline", 2)
self.screen.show()
try: # Look for a 'baselines.txt' file existing - if it does, take the baseline values from there (unless 'forcedRun' is set to True)
if not forcedRun:
f = open("baselines.txt", "r")
self.gBase = float(f.readline())
self.tAmbient = float(f.readline())
else:
raise Exception("RUNNING BASELINE PROCESS")
except: # If there is no file, an exception is raised, and the baseline process will be carried out (creating a new file at the end)
self.ambTempFlag = False
burnInReadings = 0
burnInData = 0
ambTotal = 0
progress = 0
while (burnInReadings < 60): # Measure data and continue summing gas resistance until 60 readings have been taken
progress = math.trunc((burnInReadings / 60) * 100)
self.screen.clear()
self.screen.displayText(str(progress) + "%", 4, 50)
self.screen.displayText("Setting Baseline", 2)
self.screen.show()
self.measureData()
burnInData = burnInData + self.gRes
ambTotal = ambTotal + self.newAmbTemp
sleep_ms(5000)
burnInReadings = burnInReadings + 1
self.gBase = (burnInData / 60) # Find the mean gas resistance during the period to form the baseline
self.tAmbient = (ambTotal / 60) # Calculate the ambient temperature as the mean of the 60 initial readings
# Save baseline values to a file
f = open("baselines.txt", "w") #open in write - creates if not existing, will overwrite if it does
f.write(str(self.gBase) + "\r\n")
f.write(str(self.tAmbient) + "\r\n")
f.close()
self.ambTempFlag = True
self.screen.clear()
self.screen.displayText("Setup Complete!", 2)
self.screen.show()
sleep_ms(2000)
self.screen.clear()
self.screen.show()
# Read Temperature from sensor as a Number.
# Units for temperature are in °C (Celsius) or °F (Fahrenheit) according to selection.
def readTemperature(self, temperature_unit="C"):
temperature = self.tRead
# Change temperature from °C to °F if user selection requires it
if (temperature_unit == "F"):
temperature = ((temperature * 18) + 320) / 10
return temperature
# Read Pressure from sensor as a Number.
# Units for pressure are in Pa (Pascals) or mBar (millibar) according to selection.
def readPressure(self, pressure_unit="Pa"):
pressure = self.pRead
#Change pressure from Pascals to millibar if user selection requires it
if (pressure_unit == "mBar"):
pressure = pressure / 100
return pressure
# Read Humidity from sensor as a Number.
# Humidity is output as a percentage.
def readHumidity(self):
return self.hRead
# Read Gas Resistance from sensor as a Number.
# Units for gas resistance are in Ohms.
def readGasRes(self):
if (self.gasInit == False):
self.screen.clear()
self.screen.displayText("ERROR", 2)
self.screen.displayText("Setup Gas Sensor", 3)
self.screen.show()
return 0
return self.gRes
# Read eCO2 from sensor as a Number (250 - 40000+ppm).
# Units for eCO2 are in ppm (parts per million).
def readeCO2(self):
if (self.gasInit == False):
self.screen.clear()
self.screen.displayText("ERROR", 2)
self.screen.displayText("Setup Gas Sensor", 3)
self.screen.show()
return 0
self.calcAirQuality()
return self.eCO2Value
# Return the Air Quality rating as a percentage (0% = Bad, 100% = Excellent).
def getAirQualityPercent(self):
if (self.gasInit == False):
self.screen.clear()
self.screen.displayText("ERROR", 2)
self.screen.displayText("Setup Gas Sensor", 3)
self.screen.show()
return 0
self.calcAirQuality()
return self.iaqPercent
# Return the Air Quality rating as an IAQ score (500 = Bad, 0 = Excellent).
# These values are based on the BME688 datasheet, Page 11, Table 6.
def getAirQualityScore(self):
if (self.gasInit == False):
self.screen.clear()
self.screen.displayText("ERROR", 2)
self.screen.displayText("Setup Gas Sensor", 3)
self.screen.show()
return 0
self.calcAirQuality()
return self.iaqScore
# Calculate the Index of Air Quality score from the current gas resistance and humidity readings
# iaqPercent: 0 to 100% - higher value = better air quality
# iaqScore: 25 should correspond to 'typically good' air, 250 to 'typically polluted' air
# airQualityRating: Text output based on the iaqScore
# Calculate the estimated CO2 value (eCO2)
def calcAirQuality(self):
humidityScore = 0
gasScore = 0
humidityOffset = self.hRead - self.hBase # Calculate the humidity offset from the baseline setting
ambTemp = (self.tAmbient / 100)
temperatureOffset = self.tRead - ambTemp # Calculate the temperature offset from the ambient temperature
humidityRatio = ((humidityOffset / self.hBase) + 1)
temperatureRatio = (temperatureOffset / ambTemp)
# IAQ Calculations
if (humidityOffset > 0): # Different paths for calculating the humidity score depending on whether the offset is greater than 0
humidityScore = (100 - self.hRead) / (100 - self.hBase)
else:
humidityScore = self.hRead / self.hBase
humidityScore = humidityScore * self.hWeight * 100
gasRatio = (self.gRes / self.gBase)
if ((self.gBase - self.gRes) > 0): # Different paths for calculating the gas score depending on whether the offset is greater than 0
gasScore = gasRatio * (100 * (1 - self.hWeight))
else:
# Make sure that when the gas offset and humidityOffset are 0, iaqPercent is 95% - leaves room for cleaner air to be identified
gasScore = math.floor(70 + (5 * (gasRatio - 1)))
if (gasScore > 75):
gasScore = 75
self.iaqPercent = math.trunc(humidityScore + gasScore) # Air quality percentage is the sum of the humidity (25% weighting) and gas (75% weighting) scores
self.iaqScore = (100 - self.iaqPercent) * 5 # Final air quality score is in range 0 - 500 (see BME688 datasheet page 11 for details)
# eCO2 Calculations
self.eCO2Value = 250 * math.pow(math.e, (0.012 * self.iaqScore)) # Exponential curve equation to calculate the eCO2 from an iaqScore input
# Adjust eCO2Value for humidity and/or temperature greater than the baseline values
if (humidityOffset > 0):
if (temperatureOffset > 0):
self.eCO2Value = self.eCO2Value * (humidityRatio + temperatureRatio)
else:
self.eCO2Value = self.eCO2Value * humidityRatio
elif (temperatureOffset > 0):
self.eCO2Value = self.eCO2Value * (temperatureRatio + 1)
# If measurements are taking place rapidly, breath detection is possible due to the sudden increase in humidity (~7-10%)
# If this increase happens within a 5s time window, 1200ppm is added to the eCO2 value
# (These values were based on 'breath-testing' with another eCO2 sensor with algorithms built-in)
if ((self.measTime - self.measTimePrev) <= 5000):
if ((self.hRead - self.hPrev) >= 3):
self.eCO2Value = self.eCO2Value + 1500
self.eCO2Value = math.trunc(self.eCO2Value)
# The KitronikOLED class enables control of the OLED display screen on the board
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class KitronikOLED(framebuf.FrameBuffer):
# Write commands to the OLED controller
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.CHIP_ADDRESS, self.temp)
# Write data to the OLED controller
def write_data(self, buf):
self.write_list[1] = buf
self.i2c.writevto(self.CHIP_ADDRESS, self.write_list)
# Runs on initialisation of the class
# Sets up all the register definitions and global variables
def __init__(self, i2cAddr=0x3C, sda=6, scl=7):
self.CHIP_ADDRESS = i2cAddr # I2C address as determined by hardware configuration
# register definitions
self.SET_CONTRAST = const(0x81)
self.SET_ENTIRE_ON = const(0xA4)
self.SET_NORM_INV = const(0xA6)
self.SET_DISP = const(0xAE)
self.SET_MEM_ADDR = const(0x20)
self.SET_COL_ADDR = const(0x21)
self.SET_PAGE_ADDR = const(0x22)
self.SET_DISP_START_LINE = const(0x40)
self.SET_SEG_REMAP = const(0xA0)
self.SET_MUX_RATIO = const(0xA8)
self.SET_COM_OUT_DIR = const(0xC0)
self.SET_DISP_OFFSET = const(0xD3)
self.SET_COM_PIN_CFG = const(0xDA)
self.SET_DISP_CLK_DIV = const(0xD5)
self.SET_PRECHARGE = const(0xD9)
self.SET_VCOM_DESEL = const(0xDB)
self.SET_CHARGE_PUMP = const(0x8D)
sda = Pin(sda)
scl = Pin(scl)
self.i2c = I2C(1,sda=sda, scl=scl, freq=100000)
self.plotArray = []
self.plotYMin = 0
self.plotYMax = 100
self.yPixelMin = 63
self.yPixelMax = 12
self.temp = bytearray(2)
self.write_list = [b"\x40", None] # Co=0, D/C#=1
self.width = 128
self.height = 64
self.external_vcc = False
self.pages = 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
# Initialise the display settings and start the display clear
def init_display(self):
for cmd in (
self.SET_DISP | 0x00, # off
# address setting
self.SET_MEM_ADDR,
0x00, # horizontal
# resolution and layout
self.SET_DISP_START_LINE | 0x00,
self.SET_SEG_REMAP,# | 0x01, # Set to either 0xA0 or A1, flips screen horizontally
self.SET_MUX_RATIO,
self.height - 1,
self.SET_COM_OUT_DIR, #| 0x08, # Set to either 0xC0 or 0xC8, flips screen vertically
self.SET_DISP_OFFSET,
0x00,
self.SET_COM_PIN_CFG,
0x02 if self.width > 2 * self.height else 0x12,
# timing and driving scheme
self.SET_DISP_CLK_DIV,
0x80,
self.SET_PRECHARGE,
0x22 if self.external_vcc else 0xF1,
self.SET_VCOM_DESEL,
0x30, # 0.83*Vcc
# display
self.SET_CONTRAST,
0xFF, # maximum
self.SET_ENTIRE_ON, # output follows RAM contents
self.SET_NORM_INV, # not inverted
# charge pump
self.SET_CHARGE_PUMP,
0x10 if self.external_vcc else 0x14,
self.SET_DISP | 0x01,
): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
# Screen will switch off, but retain the information that was displayed
def poweroff(self):
self.write_cmd(self.SET_DISP | 0x00)
# Turn the screen back on - do not need to re-display what was showing as the information is retained
def poweron(self):
self.write_cmd(self.SET_DISP | 0x01)
# 0 = Dim to 150 = Bright
def contrast(self, contrast):
self.write_cmd(self.SET_CONTRAST)
self.write_cmd(contrast)
# 0 = White on black, 1 = Black on white
def invert(self, invert):
self.write_cmd(self.SET_NORM_INV | (invert & 1))
# Set text to display on a particular line (1 - 6) and an x-axis offset can be set (0 - 127, 0 is default)
# If the text is longer than than the screen it will be cut off, it will not be pushed to the next line (16 characters max per line)
# Need to call 'show()' to make the text actually display
def displayText(self, text, line, x_offset=0):
if (line < 1):
line = 1
if (line > 6):
line = 6
y = (line * 11) - 10
super().text(text, x_offset, y)
# Make what has been set to display actually appear on the screen
# Needs to be called after 'displayText()', 'plot()', clear()', 'drawLine()' & 'drawRect()'
def show(self):
x0 = 0
x1 = self.width - 1
if self.width == 64:
# displays with width of 64 pixels are shifted by 32
x0 += 32
x1 += 32
self.write_cmd(self.SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(self.SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)
# Plot a live updating graph of a variable
# Plot y range is pixels 12 down to 63, leaving room for a title or similar on the first line
# Need to call 'show()' to make the plot actually display
def plot(self, variable):
variable = math.trunc(variable)
if (variable > self.plotYMax):
self.plotYMax = variable
if (variable < self.plotYMin):
self.plotYMin = variable
entries = len(self.plotArray)
if (entries >= 128):
prevX = 0
prevY = self.plotArray[127]
self.plotArray.pop(0)
self.plotArray.append(variable)
else:
self.plotArray.append(variable)
prevX = len(self.plotArray) - 1
prevY = self.plotArray[prevX]
for entry in range(entries):
x = entry
y = self.plotArray[entry]
y = math.trunc(self.yPixelMin - (y * ((self.yPixelMin - self.yPixelMax) / (self.plotYMax - self.plotYMin))))
if (x == 0):
super().pixel(x, y, 1)
else:
self.drawLine(prevX, prevY, x, y)
prevX = x
prevY = y
# Wipe all data from the screen
# Need to call 'show()' to make the clear actually happen
def clear(self):
super().fill(0)
# Clear a specific line on the screen
def clearLine(self, line):
yPixel = (line - 1) + ((line * 10) - 10)
super().fill_rect(0, yPixel, 128, 10, 0)
# Draw a line on the screen (vertical, horizontal or diagonal), setting start and finish (x, y) coordinates
# Need to call 'show()' to make the line actually display
def drawLine(self, start_x, start_y, end_x, end_y):
super().line(start_x, start_y, end_x, end_y, 1)
# Draw rectangles with a top left starting (x, y) coordinate and then a width and height
# Can be filled (True) or just an outline (False)
# Need to call 'show()' to make the rectangle actually display
def drawRect(self, start_x, start_y, width, height, fill=False):
if (fill == False):
super().rect(start_x, start_y, width, height, 1)
elif (fill == True):
super().fill_rect(start_x, start_y, width, height, 1)