Next make a buffered record of ON OFF button presses along with the time stamps so that a non interrupt function can determine which morse values have been entered. This can then be displayed on the screen. This is based on the PicoAirQ code atmo. In the fututre there may be a microphone amplified and filtered to only hear morse frquencies at audio. this pcb could then listen to morse being played and decode it. An RS-232 output could be used for logging to other computers etc.
1226 lines
53 KiB
Python
1226 lines
53 KiB
Python
import math
|
|
import framebuf
|
|
import array
|
|
import os
|
|
from machine import Pin, PWM, ADC, time_pulse_us, I2C, RTC
|
|
from rp2 import PIO, StateMachine, asm_pio
|
|
from time import sleep, sleep_ms, sleep_us, ticks_ms, ticks_us
|
|
from micropython import const
|
|
|
|
# Initialise the module with all outputs off
|
|
# High Power Output Pins
|
|
hp_3 = Pin(3, Pin.OUT)
|
|
hp_15 = Pin(15, Pin.OUT)
|
|
hp_3.value(0)
|
|
hp_15.value(0)
|
|
# Servo
|
|
servo = Pin(2, Pin.OUT)
|
|
servo.value(0)
|
|
# Buzzer
|
|
buzzer = PWM(Pin(4))
|
|
buzzer.duty_u16(0)
|
|
|
|
# The KitronikButton class enable the use of the 2 user input buttons on the board
|
|
class KitronikButton:
|
|
def __init__(self):
|
|
self.buttonA = Pin(12, Pin.IN, Pin.PULL_DOWN)
|
|
self.buttonB = Pin(13, Pin.IN, Pin.PULL_DOWN)
|
|
|
|
# The KitronikOutputControl class enables control of the servo and high-power outputs on the board
|
|
class KitronikOutputControl:
|
|
# This code drives a pwm on the PIO. It is running at 2Mhz, which gives the PWM a 1uS resolution.
|
|
@asm_pio(sideset_init=PIO.OUT_LOW)
|
|
def _servo_pwm():
|
|
# First we clear the pin to zero, then load the registers. Y is always 20000 - 20uS, x is the pulse 'on' length.
|
|
pull(noblock) .side(0)
|
|
mov(x, osr) # Keep most recent pull data stashed in X, for recycling by noblock
|
|
mov(y, isr) # ISR must be preloaded with PWM count max
|
|
# This is where the looping work is done. the overall loop rate is 1Mhz (clock is 2Mhz - we have 2 instructions to do)
|
|
label("loop")
|
|
jmp(x_not_y, "skip") #if there is 'excess' Y number leave the pin alone and jump to the 'skip' label until we get to the X value
|
|
nop() .side(1)
|
|
label("skip")
|
|
jmp(y_dec, "loop") #count down y by 1 and jump to pwmloop. When y is 0 we will go back to the 'pull' command
|
|
|
|
def __init__(self):
|
|
# High Power Output Pins
|
|
self.highPwr_3 = Pin(3, Pin.OUT)
|
|
self.highPwr_15 = Pin(15, Pin.OUT)
|
|
|
|
# Servo Control
|
|
self.servo = []
|
|
# Servo 0 degrees -> pulse of 0.5ms, 180 degrees 2.5ms
|
|
# Pulse train freq 50hz - 20ms
|
|
# 1us is freq of 1000000
|
|
# Servo pulses range from 500 to 2500us and overall pulse train is 20000us repeat.
|
|
# Servo is on GP2
|
|
self.maxServoPulse = 2500
|
|
self.minServoPulse = 500
|
|
self.pulseTrain = 20000
|
|
self.degreesToUS = 2000/180
|
|
# Create and start the servo statemachine
|
|
self.servo.append (StateMachine(1, self._servo_pwm, freq=2000000, sideset_base=Pin(2)))
|
|
self.servo[0].put(self.pulseTrain)
|
|
self.servo[0].exec("pull()")
|
|
self.servo[0].exec("mov(isr, osr)")
|
|
self.registerServo()
|
|
|
|
# Doesn't actually register/unregister, just stops and starts the servo PIO
|
|
# The servo is registered by default in the __init__() function - these are only required if you want to use Pin 2 for something else, and then register the servo again
|
|
def registerServo(self):
|
|
if(not self.servo[0].active()):
|
|
self.servo[0].active(1)
|
|
|
|
def deregisterServo(self):
|
|
if(self.servo[0].active()):
|
|
self.servo[0].active(0)
|
|
|
|
# goToPosition takes a degree position for the servo to go to.
|
|
# 0degrees->180 degrees is 0->2000us, plus offset of 500uS
|
|
# 1 degree ~ 11uS.
|
|
# This function does the sum then calls goToPeriod to actually poke the PIO
|
|
def servoToPosition(self, degrees):
|
|
pulseLength = int(degrees*self.degreesToUS + 500)
|
|
self.servoToPeriod(pulseLength)
|
|
|
|
def servoToPeriod(self, period):
|
|
if(period < 500):
|
|
period = 500
|
|
if(period >2500):
|
|
period =2500
|
|
self.servo[0].put(period)
|
|
|
|
# Functions to turn on/off the high power outputs
|
|
# Enter the pin number, either '3' or '15'
|
|
def highPowerOn(self, pin):
|
|
if (pin == 3):
|
|
self.highPwr_3.value(1)
|
|
elif (pin == 15):
|
|
self.highPwr_15.value(1)
|
|
|
|
def highPowerOff(self, pin):
|
|
if (pin == 3):
|
|
self.highPwr_3.value(0)
|
|
elif (pin == 15):
|
|
self.highPwr_15.value(0)
|
|
|
|
# The KitronikDataLogger class enables data logging through the Pico file system
|
|
# It is possible to create multiple data logger instances to then log to multiple files simulataneously
|
|
class KitronikDataLogger:
|
|
# Function is called when the class is initialised - sets the maximum permissable filesize, the data separator and creates the log file with the entered filename
|
|
# Separator options: ("comma", "semicolon", "tab")
|
|
def __init__(self, file = "data_log.txt", separator = "semicolon"):
|
|
self.MAX_FILE_SIZE = 500000 # This is approximately 10000 full entries
|
|
if (separator == "comma"):
|
|
self.SEPARATOR = ","
|
|
elif (separator == "semicolon"):
|
|
self.SEPARATOR = ";"
|
|
elif (separator == "tab"):
|
|
self.SEPARATOR = "\t"
|
|
self.FILENAME = file
|
|
try:
|
|
f = open(self.FILENAME, "x")
|
|
f.close()
|
|
except OSError:
|
|
print("File already exists")
|
|
self.line1 = ""
|
|
self.line2 = ""
|
|
self.line3 = ""
|
|
self.dataHeadings = ""
|
|
self.projectInfo = False
|
|
self.headings = False
|
|
|
|
# Write a header section to the specified file (there are 3 free text fields, each will write on a separate line)
|
|
def writeProjectInfo(self, line1="", line2="", line3=""):
|
|
if (line1 != ""):
|
|
self.writeFile(self.FILENAME, line1 + "\r\n")
|
|
self.line1 = line1
|
|
if (line2 != ""):
|
|
self.writeFile(self.FILENAME, line2 + "\r\n")
|
|
self.line2 = line2
|
|
if (line3 != ""):
|
|
self.writeFile(self.FILENAME, line3 + "\r\n")
|
|
self.line3 = line3
|
|
self.projectInfo = True
|
|
|
|
# This writes whatever is passed to it to the file
|
|
def writeFile(self, file, passed):
|
|
f = open(file, "a") #open in append - creates if not existing, will append if it exists
|
|
f.write(passed)
|
|
f.close()
|
|
|
|
# Input and write to the file up to 10 data field headings
|
|
def nameColumnHeadings(self, field1="", field2="", field3="", field4="", field5="", field6="", field7="", field8="", field9="", field10=""):
|
|
dataHeadings = ""
|
|
if (field1 != ""):
|
|
dataHeadings = field1 + self.SEPARATOR
|
|
if (field2 != ""):
|
|
dataHeadings = dataHeadings + field2 + self.SEPARATOR
|
|
if (field3 != ""):
|
|
dataHeadings = dataHeadings + field3 + self.SEPARATOR
|
|
if (field4 != ""):
|
|
dataHeadings = dataHeadings + field4 + self.SEPARATOR
|
|
if (field5 != ""):
|
|
dataHeadings = dataHeadings + field5 + self.SEPARATOR
|
|
if (field6 != ""):
|
|
dataHeadings = dataHeadings + field6 + self.SEPARATOR
|
|
if (field7 != ""):
|
|
dataHeadings = dataHeadings + field7 + self.SEPARATOR
|
|
if (field8 != ""):
|
|
dataHeadings = dataHeadings + field8 + self.SEPARATOR
|
|
if (field9 != ""):
|
|
dataHeadings = dataHeadings + field9 + self.SEPARATOR
|
|
if (field10 != ""):
|
|
dataHeadings = dataHeadings + field10 + self.SEPARATOR
|
|
|
|
self.dataHeadings = dataHeadings
|
|
self.writeFile(self.FILENAME, dataHeadings + "\r\n")
|
|
self.headings = True
|
|
|
|
# Store up to 10 data entries (match the order with the data headings used)
|
|
def storeDataEntry(self, field1="", field2="", field3="", field4="", field5="", field6="", field7="", field8="", field9="", field10=""):
|
|
dataEntry = ""
|
|
if (field1 != ""):
|
|
dataEntry = field1 + self.SEPARATOR
|
|
if (field2 != ""):
|
|
dataEntry = dataEntry + field2 + self.SEPARATOR
|
|
if (field3 != ""):
|
|
dataEntry = dataEntry + field3 + self.SEPARATOR
|
|
if (field4 != ""):
|
|
dataEntry = dataEntry + field4 + self.SEPARATOR
|
|
if (field5 != ""):
|
|
dataEntry = dataEntry + field5 + self.SEPARATOR
|
|
if (field6 != ""):
|
|
dataEntry = dataEntry + field6 + self.SEPARATOR
|
|
if (field7 != ""):
|
|
dataEntry = dataEntry + field7 + self.SEPARATOR
|
|
if (field8 != ""):
|
|
dataEntry = dataEntry + field8 + self.SEPARATOR
|
|
if (field9 != ""):
|
|
dataEntry = dataEntry + field9 + self.SEPARATOR
|
|
if (field10 != ""):
|
|
dataEntry = dataEntry + field10 + self.SEPARATOR
|
|
|
|
while (self.checkFileSize() > self.MAX_FILE_SIZE):
|
|
self.removeOneLine()
|
|
|
|
self.writeFile(self.FILENAME, dataEntry + "\r\n")
|
|
|
|
# This returns the size of the file, or 0 if the file does not exist
|
|
def checkFileSize(self):
|
|
# f is a file-like object.
|
|
try:
|
|
f = open(self.FILENAME, "r") # Open read - this throws an error if file does not exist - in that case the size is 0
|
|
f.seek(0, 2)
|
|
size = f.tell()
|
|
f.close()
|
|
return size
|
|
except:
|
|
# if we wanted to know we could print some diagnostics here like:
|
|
#print("Exception - File does not exist")
|
|
return 0
|
|
|
|
# Remove a line from the data file to make space for more data
|
|
def removeOneLine(self):
|
|
tempName = self.FILENAME + ".bak"
|
|
readFrom = open(self.FILENAME, "r")
|
|
writeTo = open(tempName, "w")
|
|
if self.projectInfo: # If there is Project Info, skip over these lines and then write them to the temporary file
|
|
for l in range(3):
|
|
readFrom.readline()
|
|
writeTo.write(self.line1 + "\r\n")
|
|
writeTo.write(self.line2 + "\r\n")
|
|
writeTo.write(self.line3 + "\r\n")
|
|
if self.headings:
|
|
readFrom.readline() # If there are Headings, skip over this line and then write them to the temporary file
|
|
writeTo.write(self.dataHeadings + "\r\n")
|
|
readFrom.readline() # Read and throw away the first line of data in the file
|
|
|
|
for lines in readFrom: # Write the remaining lines to the temporary file
|
|
writeTo.write(lines)
|
|
readFrom.close() # Close both files
|
|
writeTo.close()
|
|
os.remove(self.FILENAME) # Delete original log file
|
|
os.rename(tempName, self.FILENAME) # Rename temporary file as new log file (now with the first line of data removed)
|
|
|
|
# Deletes all the contents of the file
|
|
def eraseAllData(self):
|
|
f = open(self.FILENAME, "w")
|
|
f.write("")
|
|
f.close()
|
|
|
|
# Deletes the file from the Pico file system
|
|
def deleteDataFile(self):
|
|
os.remove(self.FILENAME)
|
|
|
|
# The KitronikBuzzer class enables control of the piezo buzzer on the board
|
|
class KitronikBuzzer:
|
|
# Function is called when the class is initialised and sets the buzzer pin to GP4
|
|
def __init__(self):
|
|
self.buzzer = PWM(Pin(4))
|
|
self.dutyCycle = 32767
|
|
|
|
# Play a continous tone at a specified frequency
|
|
def playTone(self, freq):
|
|
if (freq < 30):
|
|
freq = 30
|
|
if (freq > 3000):
|
|
freq = 3000
|
|
self.buzzer.freq(freq)
|
|
self.buzzer.duty_u16(self.dutyCycle)
|
|
|
|
# Play a tone at a speciied frequency for a specified length of time in ms
|
|
def playTone_Length(self, freq, length):
|
|
self.playTone(freq)
|
|
sleep_ms(length)
|
|
self.stopTone()
|
|
|
|
# Stop the buzzer producing a tone
|
|
def stopTone(self):
|
|
self.buzzer.duty_u16(0)
|
|
|
|
# The KitronikZIPLEDs class enables control of the ZIP LEDs both on the board and any connected externally
|
|
class KitronikZIPLEDs:
|
|
# We drive the ZIP LEDs using one of the PIO statemachines.
|
|
@asm_pio(sideset_init=PIO.OUT_LOW, out_shiftdir=PIO.SHIFT_LEFT, autopull=True, pull_thresh=24)
|
|
def _ZIPLEDOutput():
|
|
T1 = 2
|
|
T2 = 5
|
|
T3 = 3
|
|
wrap_target()
|
|
label("bitloop")
|
|
out(x, 1) .side(0) [T3 - 1]
|
|
jmp(not_x, "do_zero") .side(1) [T1 - 1]
|
|
jmp("bitloop") .side(1) [T2 - 1]
|
|
label("do_zero")
|
|
nop() .side(0) [T2 - 1]
|
|
wrap()
|
|
|
|
def __init__(self, num_zip_leds):
|
|
self.num_zip_leds = num_zip_leds
|
|
# Create and start the StateMachine for the ZIPLeds
|
|
self.ZIPLEDs = StateMachine(4, self._ZIPLEDOutput, freq=8_000_000, sideset_base=Pin(20))
|
|
self.theLEDs = array.array("I", [0 for _ in range(self.num_zip_leds)]) #an array for the LED colours.
|
|
self.brightness = 0.5 #20% initially
|
|
self.ZIPLEDs.active(1)
|
|
|
|
# Define some colour tuples for people to use.
|
|
self.BLACK = (0, 0, 0)
|
|
self.RED = (255, 0, 0)
|
|
self.YELLOW = (255, 150, 0)
|
|
self.GREEN = (0, 255, 0)
|
|
self.CYAN = (0, 255, 255)
|
|
self.BLUE = (0, 0, 255)
|
|
self.PURPLE = (180, 0, 255)
|
|
self.WHITE = (255, 255, 255)
|
|
self.COLOURS = (self.BLACK, self.RED, self.YELLOW, self.GREEN, self.CYAN, self.BLUE, self.PURPLE, self.WHITE)
|
|
|
|
# Show pushes the current setup of the LEDS to the physical LEDS - it makes them visible.
|
|
def show(self):
|
|
brightAdjustedLEDs = array.array("I", [0 for _ in range(self.num_zip_leds)])
|
|
for i,c in enumerate(self.theLEDs):
|
|
r = int(((c >> 8) & 0xFF) * self.brightness)
|
|
g = int(((c >> 16) & 0xFF) * self.brightness)
|
|
b = int((c & 0xFF) * self.brightness)
|
|
brightAdjustedLEDs[i] = (g<<16) + (r<<8) + b
|
|
self.ZIPLEDs.put(brightAdjustedLEDs, 8)
|
|
|
|
# Turn the LED off by setting the colour to black
|
|
def clear(self, whichLED):
|
|
self.setLED(whichLED, self.BLACK)
|
|
|
|
# Sets the colour of an individual LED. Use show to make change visible
|
|
def setLED(self, whichLED, whichColour):
|
|
if(whichLED<0):
|
|
raise Exception("INVALID LED:",whichLED," specified")
|
|
elif(whichLED>(self.num_zip_leds - 1)):
|
|
raise Exception("INVALID LED:",whichLED," specified")
|
|
else:
|
|
self.theLEDs[whichLED] = (whichColour[1]<<16) + (whichColour[0]<<8) + whichColour[2]
|
|
|
|
# Gets the stored colour of an individual LED, which isnt nessecerily the colour on show if it has been changed, but not 'show'n
|
|
def getLED(self, whichLED):
|
|
if(whichLED<0):
|
|
raise Exception("INVALID LED:",whichLED," specified")
|
|
elif(whichLED>(self.num_zip_leds - 1)):
|
|
raise Exception("INVALID LED:",whichLED," specified")
|
|
else:
|
|
return(((self.theLEDs[whichLED]>>8) & 0xff), ((self.theLEDs[whichLED]>>16)& 0xff) ,((self.theLEDs[whichLED])& 0xff))
|
|
|
|
# Takes 0-100 as a brightness value, brighness is applies in the'show' function
|
|
def setBrightness(self, value):
|
|
#cap to 0-100%
|
|
if (value<0):
|
|
value = 0
|
|
elif (value>100):
|
|
value=100
|
|
self.brightness = value / 100
|
|
|
|
# The KitronikRTC class enables use of the Pico RTC
|
|
class KitronikRTC:
|
|
# Function is called when the class is initialised and creates an instance of the Pico RTC and defines all the global variables
|
|
def __init__(self):
|
|
self.rtc = RTC()
|
|
self.day = 0
|
|
self.month = 0
|
|
self.year = 0
|
|
self.weekday = 0 # In range 0 - 6, 0 = Monday, 6 = Sunday
|
|
self.hour = 0
|
|
self.minute = 0
|
|
self.second = 0
|
|
self.alarmHour = 0
|
|
self.alarmMinute = 0
|
|
self.alarmSet = False
|
|
self.alarmTrigger = False
|
|
self.alarmRepeat = False
|
|
self.hourPeriod = 0
|
|
self.minutePeriod = 0
|
|
|
|
# Function calculates the weekday (0 = Monday, 6 = Sunday) based on the date, taking into account leap years
|
|
def calcWeekday(self, day, month, year):
|
|
dayOffset = [0, 3, 2, 5, 0, 3, 5, 1, 4, 6, 2, 4]
|
|
if (month < 3):
|
|
month = month - 1
|
|
|
|
self.weekday = (year + (year // 4) - (year // 100) + (year // 400) + dayOffset[month - 1] + day) % 7 # Returns 0 - 6, 0 = Sunday, 6 = Saturday
|
|
if (self.weekday == 0):
|
|
self.weekday = 6
|
|
else:
|
|
self.weekday = self.weekday - 1
|
|
|
|
# Set the date on the Pico RTC
|
|
def setDate(self, day, month, year):
|
|
self.getDateTime()
|
|
self.calcWeekday(day, month, year)
|
|
self.rtc.datetime((year, month, day, self.weekday, self.hour, self.minute, self.second, 0))
|
|
|
|
# Set the time on the Pico RTC
|
|
def setTime(self, hours, minutes, seconds):
|
|
self.getDateTime()
|
|
self.rtc.datetime((self.year, self.month, self.day, self.weekday, hours, minutes, seconds, 0))
|
|
|
|
# Return the current date and time
|
|
def getDateTime(self):
|
|
newDateTime = self.rtc.datetime()
|
|
self.day = newDateTime[2]
|
|
self.month = newDateTime[1]
|
|
self.year = newDateTime[0]
|
|
self.weekday = newDateTime[3]
|
|
self.hour = newDateTime[4]
|
|
self.minute = newDateTime[5]
|
|
self.second = newDateTime[6]
|
|
|
|
# Return the current date as a string
|
|
def readDateString(self):
|
|
self.getDateTime()
|
|
if (self.day < 10):
|
|
day = "0" + str(self.day)
|
|
else:
|
|
day = str(self.day)
|
|
|
|
if (self.month < 10):
|
|
month = "0" + str(self.month)
|
|
else:
|
|
month = str(self.month)
|
|
|
|
date = day + "/" + month + "/" + str(self.year)
|
|
return date
|
|
|
|
# Return the current time as a string
|
|
def readTimeString(self):
|
|
self.getDateTime()
|
|
if (self.hour < 10):
|
|
hour = "0" + str(self.hour)
|
|
else:
|
|
hour = str(self.hour)
|
|
|
|
if (self.minute < 10):
|
|
minute = "0" + str(self.minute)
|
|
else:
|
|
minute = str(self.minute)
|
|
|
|
if (self.second < 10):
|
|
second = "0" + str(self.second)
|
|
else:
|
|
second = str(self.second)
|
|
|
|
time = hour + ":" + minute + ":" + second
|
|
return time
|
|
|
|
# Return a specific date/time parameter as a number
|
|
# d = day, m = month, y = year, h = hour, min = minute, s = second
|
|
def readParameter(self, parameter):
|
|
self.getDateTime()
|
|
if (parameter == "d"):
|
|
return self.day
|
|
elif (parameter == "m"):
|
|
return self.month
|
|
elif (parameter == "y"):
|
|
return self.year
|
|
elif (parameter == "h"):
|
|
return self.hour
|
|
elif (parameter == "min"):
|
|
return self.minute
|
|
elif (parameter == "s"):
|
|
return self.second
|
|
|
|
# Set an alarm for a specific hour and minute
|
|
# Extra options to set a periodically repeating alarm (set alarmRepeat to True and then specifiy the hour and/or minute period between alarms)
|
|
def setAlarm(self, hour, minute, alarmRepeat=False, hourPeriod=0, minutePeriod=0):
|
|
self.alarmHour = math.ceil(hour)
|
|
self.alarmMinute = math.ceil(minute)
|
|
self.alarmRepeat = alarmRepeat
|
|
|
|
if alarmRepeat:
|
|
self.hourPeriod = math.ceil(hourPeriod)
|
|
self.minutePeriod = math.ceil(minutePeriod)
|
|
self.alarmSet = True
|
|
|
|
# Check whether the alarm conditions have been met and then trigger the alarm
|
|
def checkAlarm(self):
|
|
if self.alarmSet:
|
|
if (self.readParameter("h") == self.alarmHour):
|
|
if (self.readParameter("min") == self.alarmMinute):
|
|
self.alarmTrigger = True
|
|
return self.alarmTrigger
|
|
|
|
# Sets 'alarmTrigger' back to False, checks whether the alarm should repeat (sets new one if it should) or sets 'alarmSet' back to False
|
|
def silenceAlarm(self):
|
|
self.alarmTrigger = False
|
|
if not self.alarmRepeat:
|
|
self.alarmSet = False
|
|
else:
|
|
newHour = self.alarmHour + self.hourPeriod
|
|
newMinute = self.alarmMinute + self.minutePeriod
|
|
if (newMinute > 59):
|
|
newMinute = newMinute - 60
|
|
newHour = newHour + 1
|
|
if (newHour > 23):
|
|
newHour = newHour - 24
|
|
self.setAlarm(newHour, newMinute, True, self.hourPeriod, self.minutePeriod)
|
|
|
|
# The KitronikBME688 class enables contro and use of the BME688 sensor on the board
|
|
class KitronikBME688:
|
|
# The following functions are for reading the registers on the BME688
|
|
# Function for reading register as signed 8 bit integer
|
|
def getUInt8(self, reg):
|
|
return int.from_bytes(self.i2c.readfrom_mem(self.CHIP_ADDRESS, reg, 1), "big")
|
|
|
|
# Function to convert unsigned ints to twos complement signed ints
|
|
def twosComp(self, value, bits):
|
|
if ((value & (1 << (bits - 1))) != 0):
|
|
value = value - (1 << bits)
|
|
return value
|
|
|
|
# Function for proportionally mapping a value to a different value range
|
|
def mapValues(self, value, frMin, frMax, toMin, toMax):
|
|
toRange = toMax - toMin
|
|
mappedVal = toMin + ((value - frMin) * ((toMax - toMin) / (frMax - frMin)))
|
|
return mappedVal
|
|
|
|
def __init__(self, i2cAddr=0x77, sda=6, scl=7):
|
|
self.CHIP_ADDRESS = i2cAddr # I2C address as determined by hardware configuration
|
|
sda = Pin(sda)
|
|
scl = Pin(scl)
|
|
self.i2c = I2C(1,sda=sda, scl=scl, freq=100000)
|
|
|
|
# Useful BME688 Register Addresses
|
|
# Control
|
|
self.CTRL_MEAS = 0x74 # Bit position <7:5>: Temperature oversampling Bit position <4:2>: Pressure oversampling Bit position <1:0>: Sensor power mode
|
|
self.RESET = 0xE0 # Write 0xB6 to initiate soft-reset (same effect as power-on reset)
|
|
self.CHIP_ID = 0xD0 # Read this to return the chip ID: 0x61 - good way to check communication is occurring
|
|
self.CTRL_HUM = 0x72 # Bit position <2:0>: Humidity oversampling settings
|
|
self.CONFIG = 0x75 # Bit position <4:2>: IIR filter settings
|
|
self.CTRL_GAS_0 = 0x70 # Bit position <3>: Heater off (set to '1' to turn off current injection)
|
|
self.CTRL_GAS_1 = 0x71 # Bit position <5> DATASHEET ERROR: Enable gas conversions to start when set to '1' Bit position <3:0>: Heater step selection (0 to 9)
|
|
|
|
# Pressure Data
|
|
self.PRESS_MSB_0 = 0x1F # Forced & Parallel: MSB [19:12]
|
|
self.PRESS_LSB_0 = 0x20 # Forced & Parallel: LSB [11:4]
|
|
self.PRESS_XLSB_0 = 0x21 # Forced & Parallel: XLSB [3:0]
|
|
|
|
# Temperature Data
|
|
self.TEMP_MSB_0 = 0x22 # Forced & Parallel: MSB [19:12]
|
|
self.TEMP_LSB_0 = 0x23 # Forced & Parallel: LSB [11:4]
|
|
self.TEMP_XLSB_0 = 0x24 # Forced & Parallel: XLSB [3:0]
|
|
|
|
# Humidity Data
|
|
self.HUMID_MSB_0 = 0x25 # Forced & Parallel: MSB [15:8]
|
|
self.HUMID_LSB_0 = 0x26 # Forced & Parallel: LSB [7:0]
|
|
|
|
# Gas Resistance Data
|
|
self.GAS_RES_MSB_0 = 0x2C # Forced & Parallel: MSB [9:2]
|
|
self.GAS_RES_LSB_0 = 0x2D # Forced & Parallel: Bit <7:6>: LSB [1:0] Bit <5>: Gas valid Bit <4>: Heater stability Bit <3:0>: Gas resistance range
|
|
|
|
# Status
|
|
self.MEAS_STATUS_0 = 0x1D # Forced & Parallel: Bit <7>: New data Bit <6>: Gas measuring Bit <5>: Measuring Bit <3:0>: Gas measurement index
|
|
|
|
# Calibration parameters for compensation calculations
|
|
# Temperature
|
|
self.PAR_T1 = self.twosComp((self.getUInt8(0xEA) << 8) | self.getUInt8(0xE9), 16) # Signed 16-bit
|
|
self.PAR_T2 = self.twosComp((self.getUInt8(0x8B) << 8) | self.getUInt8(0x8A), 16) # Signed 16-bit
|
|
self.PAR_T3 = self.twosComp(self.getUInt8(0x8C), 8) # Signed 8-bit
|
|
|
|
# Pressure
|
|
self.PAR_P1 = (self.getUInt8(0x8F) << 8) | self.getUInt8(0x8E) # Always a positive number, do not do twosComp() conversion!
|
|
self.PAR_P2 = self.twosComp((self.getUInt8(0x91) << 8) | self.getUInt8(0x90), 16) # Signed 16-bit
|
|
self.PAR_P3 = self.twosComp(self.getUInt8(0x92), 8) # Signed 8-bit
|
|
self.PAR_P4 = self.twosComp((self.getUInt8(0x95) << 8) | self.getUInt8(0x94), 16) # Signed 16-bit
|
|
self.PAR_P5 = self.twosComp((self.getUInt8(0x97) << 8) | self.getUInt8(0x96), 16) # Signed 16-bit
|
|
self.PAR_P6 = self.twosComp(self.getUInt8(0x99), 8) # Signed 8-bit
|
|
self.PAR_P7 = self.twosComp(self.getUInt8(0x98), 8) # Signed 8-bit
|
|
self.PAR_P8 = self.twosComp((self.getUInt8(0x9D) << 8) | self.getUInt8(0x9C), 16) # Signed 16-bit
|
|
self.PAR_P9 = self.twosComp((self.getUInt8(0x9F) << 8) | self.getUInt8(0x9E), 16) # Signed 16-bit
|
|
self.PAR_P10 = self.twosComp(self.getUInt8(0xA0), 8) # Signed 8-bit
|
|
|
|
# Humidity
|
|
parH1_LSB_parH2_LSB = self.getUInt8(0xE2)
|
|
self.PAR_H1 = (self.getUInt8(0xE3) << 4) | (parH1_LSB_parH2_LSB & 0x0F)
|
|
self.PAR_H2 = (self.getUInt8(0xE1) << 4) | (parH1_LSB_parH2_LSB >> 4)
|
|
self.PAR_H3 = self.twosComp(self.getUInt8(0xE4), 8) # Signed 8-bit
|
|
self.PAR_H4 = self.twosComp(self.getUInt8(0xE5), 8) # Signed 8-bit
|
|
self.PAR_H5 = self.twosComp(self.getUInt8(0xE6), 8) # Signed 8-bit
|
|
self.PAR_H6 = self.twosComp(self.getUInt8(0xE7), 8) # Signed 8-bit
|
|
self.PAR_H7 = self.twosComp(self.getUInt8(0xE8), 8) # Signed 8-bit
|
|
|
|
# Gas resistance
|
|
self.PAR_G1 = self.twosComp(self.getUInt8(0xED), 8) # Signed 8-bit
|
|
self.PAR_G2 = self.twosComp((self.getUInt8(0xEB) << 8) | self.getUInt8(0xEC), 16) # Signed 16-bit
|
|
self.PAR_G3 = self.getUInt8(0xEE) # Unsigned 8-bit
|
|
self.RES_HEAT_RANGE = (self.getUInt8(0x02) >> 4) & 0x03
|
|
self.RES_HEAT_VAL = self.twosComp(self.getUInt8(0x00), 8) # Signed 8-bit
|
|
|
|
# Oversampling rate constants
|
|
self.OSRS_1X = 0x01
|
|
self.OSRS_2X = 0x02
|
|
self.OSRS_4X = 0x03
|
|
self.OSRS_8X = 0x04
|
|
self.OSRS_16X = 0x05
|
|
|
|
# IIR filter coefficient values
|
|
self.IIR_0 = 0x00
|
|
self.IIR_1 = 0x01
|
|
self.IIR_3 = 0x02
|
|
self.IIR_7 = 0x03
|
|
self.IIR_15 = 0x04
|
|
self.IIR_31 = 0x05
|
|
self.IIR_63 = 0x06
|
|
self.IIR_127 = 0x07
|
|
|
|
#Global variables used for storing one copy of value, these are used in multiple locations for calculations
|
|
self.bme688InitFlag = False
|
|
self.gasInit = False
|
|
|
|
self.tRead = 0 # calculated readings of sensor parameters from raw adc readings
|
|
self.pRead = 0
|
|
self.hRead = 0
|
|
self.gRes = 0
|
|
self.iaqPercent = 0
|
|
self.iaqScore = 0
|
|
self.airQualityRating = ""
|
|
self.eCO2Value = 0
|
|
|
|
self.gBase = 0
|
|
self.hBase = 40 # Between 30% & 50% is a widely recognised optimal indoor humidity, 40% is a good middle ground
|
|
self.hWeight = 0.25 # Humidity contributes 25% to the IAQ score, gas resistance is 75%
|
|
self.hPrev = 0
|
|
self.measTime = 0
|
|
self.measTimePrev = 0
|
|
|
|
self.tRaw = 0 # adc reading of raw temperature
|
|
self.pRaw = 0 # adc reading of raw pressure
|
|
self.hRaw = 0 # adc reading of raw humidity
|
|
self.gResRaw = 0 # adc reading of raw gas resistance
|
|
self.gasRange = 0
|
|
|
|
self.t_fine = 0 # Intermediate temperature value used for pressure calculation
|
|
self.newAmbTemp = 0
|
|
self.tAmbient = 0 # Intermediate temperature value used for heater calculation
|
|
self.ambTempFlag = False
|
|
|
|
# Create an instance of the OLED display screen for use during setup and for error messages
|
|
self.screen = KitronikOLED()
|
|
|
|
# Begin the hardware inititialisation for the BME688 sensor
|
|
self.bme688Init()
|
|
|
|
# Temperature compensation calculation: rawADC to degrees C (integer)
|
|
def calcTemperature(self, tempADC):
|
|
var1 = (tempADC >> 3) - (self.PAR_T1 << 1)
|
|
var2 = (var1 * self.PAR_T2) >> 11
|
|
var3 = ((((var1 >> 1) * (var1 >> 1)) >> 12) * (self.PAR_T3 << 4)) >> 14
|
|
self.t_fine = var2 + var3
|
|
self.newAmbTemp = ((self.t_fine * 5) + 128) >> 8
|
|
self.tRead = self.newAmbTemp / 100 # Convert to floating point with 2 dp
|
|
if (self.ambTempFlag == False):
|
|
self.tAmbient = self.newAmbTemp
|
|
|
|
# Pressure compensation calculation: rawADC to Pascals (integer)
|
|
def intCalcPressure(self, pressureADC):
|
|
var1 = (self.t_fine >> 1) - 64000
|
|
var2 = ((((var1 >> 2) * (var1 >> 2)) >> 11) * self.PAR_P6) >> 2
|
|
var2 = var2 + ((var1 * self.PAR_P5) << 1)
|
|
var2 = (var2 >> 2) + (self.PAR_P4 << 16)
|
|
var1 = (((((var1 >> 2) * (var1 >> 2)) >> 13) * (self.PAR_P3 << 5)) >> 3) + ((self.PAR_P2 * var1) >> 1)
|
|
var1 = var1 >> 18
|
|
var1 = ((32768 + var1) * self.PAR_P1) >> 15
|
|
self.pRead = 1048576 - pressureADC
|
|
self.pRead = ((self.pRead - (var2 >> 12)) * 3125)
|
|
|
|
if (self.pRead >= (1 << 30)):
|
|
self.pRead = (self.pRead // var1) << 1
|
|
else:
|
|
self.pRead = ((self.pRead << 1) // var1)
|
|
|
|
var1 = (self.PAR_P9 * (((self.pRead >> 3) * (self.pRead >> 3)) >> 13)) >> 12
|
|
var2 = ((self.pRead >> 2) * self.PAR_P8) >> 13
|
|
var3 = ((self.pRead >> 8) * (self.pRead >> 8) * (self.pRead >> 8) * self.PAR_P10) >> 17
|
|
self.pRead = self.pRead + ((var1 + var2 + var3 + (self.PAR_P7 << 7)) >> 4)
|
|
|
|
# Humidity compensation calculation: rawADC to % (integer)
|
|
# 'tempScaled' is the current reading from the Temperature sensor
|
|
def intCalcHumidity(self, humidADC, tempScaled):
|
|
self.hPrev = self.hRead
|
|
tempScaled = math.trunc(tempScaled)
|
|
|
|
var1 = humidADC - (self.PAR_H1 << 4) - (((tempScaled * self.PAR_H3) // 100) >> 1)
|
|
var2 = (self.PAR_H2 * (((tempScaled * self.PAR_H4) // 100) + (((tempScaled * ((tempScaled * self.PAR_H5) // 100)) >> 6) // 100) + (1 << 14))) >> 10
|
|
var3 = var1 * var2
|
|
var4 = ((self.PAR_H6 << 7) + ((tempScaled * self.PAR_H7) // 100)) >> 4
|
|
var5 = ((var3 >> 14) * (var3 >> 14)) >> 10
|
|
var6 = (var4 * var5) >> 1
|
|
self.hRead = (var3 + var6) >> 12
|
|
self.hRead = (((var3 + var6) >> 10) * (1000)) >> 12
|
|
self.hRead = self.hRead // 1000
|
|
|
|
# Gas sensor heater target temperature to target resistance calculation
|
|
# 'ambientTemp' is reading from Temperature sensor in degC (could be averaged over a day when there is enough data?)
|
|
# 'targetTemp' is the desired temperature of the hot plate in degC (in range 200 to 400)
|
|
# Note: Heating duration also needs to be specified for each heating step in 'gas_wait' registers
|
|
def intConvertGasTargetTemp(self, ambientTemp, targetTemp):
|
|
var1 = ((ambientTemp * self.PAR_G3) // 1000) << 8 # Divide by 1000 as we have ambientTemp in pre-degC format (i.e. 2500 rather than 25.00 degC)
|
|
var2 = (self.PAR_G1 + 784) * (((((self.PAR_G2 + 154009) * targetTemp * 5) // 100) + 3276800) // 10)
|
|
var3 = var1 + (var2 >> 1)
|
|
var4 = (var3 // (self.RES_HEAT_RANGE + 4))
|
|
var5 = (131 * self.RES_HEAT_VAL) + 65536 # Target heater resistance in Ohms
|
|
resHeatX100 = (((var4 // var5) - 250) * 34)
|
|
resHeat = ((resHeatX100 + 50) // 100)
|
|
|
|
return resHeat
|
|
|
|
# Gas resistance compensation calculation: rawADC & range to Ohms (integer)
|
|
def intCalcgRes(self, gasADC, gasRange):
|
|
var1 = 262144 >> gasRange
|
|
var2 = gasADC - 512
|
|
var2 = var2 * 3
|
|
var2 = 4096 + var2
|
|
calcGasRes = ((10000 * var1) // var2)
|
|
self.gRes = calcGasRes * 100
|
|
|
|
# Initialise the BME688, establishing communication, entering initial T, P & H oversampling rates, setup filter and do a first data reading (won't return gas)
|
|
def bme688Init(self):
|
|
# Establish communication with BME688
|
|
chipID = self.i2c.readfrom_mem(self.CHIP_ADDRESS, self.CHIP_ID, 1)
|
|
chipID = int.from_bytes(chipID, "big")
|
|
while (chipID != 97):
|
|
chipID = self.i2c.readfrom_mem(self.CHIP_ADDRESS, self.CHIP_ID, 1)
|
|
# Do a soft reset
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.RESET, "\xB6")
|
|
sleep_ms(1000)
|
|
# Set mode to SLEEP MODE: CTRL_MEAS reg <1:0>
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, "\x00")
|
|
# Set the oversampling rates for Temperature, Pressure and Humidity
|
|
# Humidity: CTRL_HUM bits <2:0>
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_HUM, str(self.OSRS_2X))
|
|
# Temperature: CTRL_MEAS bits <7:5> Pressure: CTRL_MEAS bits <4:2> (Combine and write both in one command)
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, str(((self.OSRS_2X << 5) | (self.OSRS_16X << 2))))
|
|
|
|
# IIR Filter: CONFIG bits <4:2>
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CONFIG, str(self.IIR_3 << 2))
|
|
|
|
# Enable gas conversion: CTRL_GAS_1 bit <5> (although datasheet says <4> - not sure what's going on here...)
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_GAS_1, "\x20")
|
|
self.bme688InitFlag = True
|
|
|
|
# Do an initial data read (will only return temperature, pressure and humidity as no gas sensor parameters have been set)
|
|
self.measureData()
|
|
|
|
# Setup the gas sensor (defaults are 300°C and 180ms).
|
|
# targetTemp is the target temperature for the gas sensor plate to reach (200 - 400°C), eg: 300
|
|
# heatDuration is the length of time for the heater to be turned on (0 - 4032ms), eg: 180
|
|
# WARNING: The temperature and duration values can be changed but this is not recommended unless the user is familiar with gas sensor setup
|
|
# The default values have been chosen as they provide a good all-round sensor response for air quality purposes
|
|
def setupGasSensor(self, targetTemp=300, heatDuration=180):
|
|
if (self.bme688InitFlag == False):
|
|
self.bme688Init()
|
|
|
|
# Limit targetTemp between 200°C & 400°C
|
|
if (targetTemp < 200):
|
|
targetTemp = 200
|
|
elif (targetTemp > 400):
|
|
targetTemp = 400
|
|
|
|
# Limit heatDuration between 0ms and 4032ms
|
|
if (heatDuration < 0):
|
|
heatDuration = 0
|
|
elif (heatDuration > 4032):
|
|
heatDuration = 4032
|
|
|
|
# Define the target heater resistance from temperature
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, 0x5A, self.intConvertGasTargetTemp(self.tAmbient, targetTemp).to_bytes(1, 'big')) # res_wait_0 register - heater step 0
|
|
|
|
# Define the heater on time, converting ms to register code (Heater Step 0) - cannot be greater than 4032ms
|
|
# Bits <7:6> are a multiplier (1, 4, 16 or 64 times) Bits <5:0> are 1ms steps (0 to 63ms)
|
|
codedDuration = 0
|
|
if (heatDuration < 4032):
|
|
factor = 0
|
|
while (heatDuration > 63):
|
|
heatDuration = (heatDuration // 4)
|
|
factor = factor + 1
|
|
|
|
codedDuration = heatDuration + (factor * 64)
|
|
else:
|
|
codedDuration = 255
|
|
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, 0x64, codedDuration.to_bytes(1, 'big')) # gas_wait_0 register - heater step 0
|
|
|
|
# Select index of heater step (0 to 9): CTRL_GAS_1 reg <3:0> (Make sure to combine with gas enable setting already there)
|
|
gasEnable = self.getUInt8(self.CTRL_GAS_1) & 0x20
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_GAS_1, (0x00 | gasEnable).to_bytes(1, 'big')) # Select heater step 0
|
|
|
|
self.gasInit = True
|
|
|
|
# Run all measurements on the BME688: Temperature, Pressure, Humidity & Gas Resistance.
|
|
def measureData(self):
|
|
if (self.bme688InitFlag == False):
|
|
self.bme688Init()
|
|
|
|
self.measTimePrev = self.measTime # Store previous measurement time (ms since micro:bit powered on)
|
|
|
|
# Set mode to FORCED MODE to begin single read cycle: CTRL_MEAS reg <1:0> (Make sure to combine with temp/pressure oversampling settings already there)
|
|
oSampleTP = self.getUInt8(self.CTRL_MEAS)
|
|
self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, str((0x01 | oSampleTP)))
|
|
|
|
# Check New Data bit to see if values have been measured: MEAS_STATUS_0 bit <7>
|
|
newData = (self.getUInt8(self.MEAS_STATUS_0) & 0x80) >> 7
|
|
while (newData != 1):
|
|
newData = (self.getUInt8(self.MEAS_STATUS_0) & 0x80) >> 7
|
|
|
|
# Check Heater Stability Status bit to see if gas values have been measured: <4> (heater stability)
|
|
heaterStable = (self.getUInt8(self.GAS_RES_LSB_0) & 0x10) >> 4
|
|
|
|
# If there is new data, read temperature ADC registers(this is required for all other calculations)
|
|
self.tRaw = (self.getUInt8(self.TEMP_MSB_0) << 12) | (self.getUInt8(self.TEMP_LSB_0) << 4) | (self.getUInt8(self.TEMP_XLSB_0) >> 4)
|
|
|
|
# Read pressure ADC registers
|
|
self.pRaw = (self.getUInt8(self.PRESS_MSB_0) << 12) | (self.getUInt8(self.PRESS_LSB_0) << 4) | (self.getUInt8(self.PRESS_XLSB_0) >> 4)
|
|
|
|
# Read humidity ADC registers
|
|
self.hRaw = (self.getUInt8(self.HUMID_MSB_0) << 8) | (self.getUInt8(self.HUMID_LSB_0) >> 4)
|
|
|
|
# Read gas resistance ADC registers
|
|
self.gResRaw = (self.getUInt8(self.GAS_RES_MSB_0) << 2) | self.getUInt8(self.GAS_RES_LSB_0) >> 6 # Shift bits <7:6> right to get LSB for gas resistance
|
|
|
|
gasRange = self.getUInt8(self.GAS_RES_LSB_0) & 0x0F
|
|
|
|
self.measTime = ticks_ms() # Capture latest measurement time (ms since Pico powered on)
|
|
|
|
# Calculate the compensated reading values from the the raw ADC data
|
|
self.calcTemperature(self.tRaw)
|
|
self.intCalcPressure(self.pRaw)
|
|
self.intCalcHumidity(self.hRaw, self.tRead)
|
|
self.intCalcgRes(self.gResRaw, gasRange)
|
|
|
|
# A baseline gas resistance is required for the IAQ calculation - it should be taken in a well ventilated area without obvious air pollutants
|
|
# Take 60 readings over a ~5min period and find the mean
|
|
# Establish the baseline gas resistance reading and the ambient temperature.
|
|
# These values are required for air quality calculations
|
|
# When the baseline process is complete, values for gBase and tAmbient are stored in a file
|
|
# On subsequent power cycles of the board, this function will look for that file and take the baseline values stored there
|
|
# To force the baselines process to be run again, call the function like this: calcBaselines(True)
|
|
def calcBaselines(self, forcedRun=False):
|
|
if (self.bme688InitFlag == False):
|
|
self.bme688Init()
|
|
if (self.gasInit == False):
|
|
self.setupGasSensor()
|
|
|
|
self.screen.clear()
|
|
self.screen.displayText("Setting Baseline", 2)
|
|
self.screen.show()
|
|
|
|
try: # Look for a 'baselines.txt' file existing - if it does, take the baseline values from there (unless 'forcedRun' is set to True)
|
|
if not forcedRun:
|
|
f = open("baselines.txt", "r")
|
|
self.gBase = float(f.readline())
|
|
self.tAmbient = float(f.readline())
|
|
else:
|
|
raise Exception("RUNNING BASELINE PROCESS")
|
|
except: # If there is no file, an exception is raised, and the baseline process will be carried out (creating a new file at the end)
|
|
self.ambTempFlag = False
|
|
|
|
burnInReadings = 0
|
|
burnInData = 0
|
|
ambTotal = 0
|
|
progress = 0
|
|
while (burnInReadings < 60): # Measure data and continue summing gas resistance until 60 readings have been taken
|
|
progress = math.trunc((burnInReadings / 60) * 100)
|
|
self.screen.clear()
|
|
self.screen.displayText(str(progress) + "%", 4, 50)
|
|
self.screen.displayText("Setting Baseline", 2)
|
|
self.screen.show()
|
|
self.measureData()
|
|
burnInData = burnInData + self.gRes
|
|
ambTotal = ambTotal + self.newAmbTemp
|
|
sleep_ms(5000)
|
|
burnInReadings = burnInReadings + 1
|
|
|
|
self.gBase = (burnInData / 60) # Find the mean gas resistance during the period to form the baseline
|
|
self.tAmbient = (ambTotal / 60) # Calculate the ambient temperature as the mean of the 60 initial readings
|
|
|
|
# Save baseline values to a file
|
|
f = open("baselines.txt", "w") #open in write - creates if not existing, will overwrite if it does
|
|
f.write(str(self.gBase) + "\r\n")
|
|
f.write(str(self.tAmbient) + "\r\n")
|
|
f.close()
|
|
|
|
self.ambTempFlag = True
|
|
|
|
self.screen.clear()
|
|
self.screen.displayText("Setup Complete!", 2)
|
|
self.screen.show()
|
|
sleep_ms(2000)
|
|
self.screen.clear()
|
|
self.screen.show()
|
|
|
|
# Read Temperature from sensor as a Number.
|
|
# Units for temperature are in °C (Celsius) or °F (Fahrenheit) according to selection.
|
|
def readTemperature(self, temperature_unit="C"):
|
|
temperature = self.tRead
|
|
# Change temperature from °C to °F if user selection requires it
|
|
if (temperature_unit == "F"):
|
|
temperature = ((temperature * 18) + 320) / 10
|
|
|
|
return temperature
|
|
|
|
# Read Pressure from sensor as a Number.
|
|
# Units for pressure are in Pa (Pascals) or mBar (millibar) according to selection.
|
|
def readPressure(self, pressure_unit="Pa"):
|
|
pressure = self.pRead
|
|
#Change pressure from Pascals to millibar if user selection requires it
|
|
if (pressure_unit == "mBar"):
|
|
pressure = pressure / 100
|
|
|
|
return pressure
|
|
|
|
# Read Humidity from sensor as a Number.
|
|
# Humidity is output as a percentage.
|
|
def readHumidity(self):
|
|
return self.hRead
|
|
|
|
# Read Gas Resistance from sensor as a Number.
|
|
# Units for gas resistance are in Ohms.
|
|
def readGasRes(self):
|
|
if (self.gasInit == False):
|
|
self.screen.clear()
|
|
self.screen.displayText("ERROR", 2)
|
|
self.screen.displayText("Setup Gas Sensor", 3)
|
|
self.screen.show()
|
|
return 0
|
|
return self.gRes
|
|
|
|
# Read eCO2 from sensor as a Number (250 - 40000+ppm).
|
|
# Units for eCO2 are in ppm (parts per million).
|
|
def readeCO2(self):
|
|
if (self.gasInit == False):
|
|
self.screen.clear()
|
|
self.screen.displayText("ERROR", 2)
|
|
self.screen.displayText("Setup Gas Sensor", 3)
|
|
self.screen.show()
|
|
return 0
|
|
self.calcAirQuality()
|
|
|
|
return self.eCO2Value
|
|
|
|
# Return the Air Quality rating as a percentage (0% = Bad, 100% = Excellent).
|
|
def getAirQualityPercent(self):
|
|
if (self.gasInit == False):
|
|
self.screen.clear()
|
|
self.screen.displayText("ERROR", 2)
|
|
self.screen.displayText("Setup Gas Sensor", 3)
|
|
self.screen.show()
|
|
return 0
|
|
self.calcAirQuality()
|
|
|
|
return self.iaqPercent
|
|
|
|
# Return the Air Quality rating as an IAQ score (500 = Bad, 0 = Excellent).
|
|
# These values are based on the BME688 datasheet, Page 11, Table 6.
|
|
def getAirQualityScore(self):
|
|
if (self.gasInit == False):
|
|
self.screen.clear()
|
|
self.screen.displayText("ERROR", 2)
|
|
self.screen.displayText("Setup Gas Sensor", 3)
|
|
self.screen.show()
|
|
return 0
|
|
self.calcAirQuality()
|
|
|
|
return self.iaqScore
|
|
|
|
# Calculate the Index of Air Quality score from the current gas resistance and humidity readings
|
|
# iaqPercent: 0 to 100% - higher value = better air quality
|
|
# iaqScore: 25 should correspond to 'typically good' air, 250 to 'typically polluted' air
|
|
# airQualityRating: Text output based on the iaqScore
|
|
# Calculate the estimated CO2 value (eCO2)
|
|
def calcAirQuality(self):
|
|
humidityScore = 0
|
|
gasScore = 0
|
|
humidityOffset = self.hRead - self.hBase # Calculate the humidity offset from the baseline setting
|
|
ambTemp = (self.tAmbient / 100)
|
|
temperatureOffset = self.tRead - ambTemp # Calculate the temperature offset from the ambient temperature
|
|
humidityRatio = ((humidityOffset / self.hBase) + 1)
|
|
temperatureRatio = (temperatureOffset / ambTemp)
|
|
|
|
# IAQ Calculations
|
|
|
|
if (humidityOffset > 0): # Different paths for calculating the humidity score depending on whether the offset is greater than 0
|
|
humidityScore = (100 - self.hRead) / (100 - self.hBase)
|
|
else:
|
|
humidityScore = self.hRead / self.hBase
|
|
|
|
humidityScore = humidityScore * self.hWeight * 100
|
|
|
|
gasRatio = (self.gRes / self.gBase)
|
|
|
|
if ((self.gBase - self.gRes) > 0): # Different paths for calculating the gas score depending on whether the offset is greater than 0
|
|
gasScore = gasRatio * (100 * (1 - self.hWeight))
|
|
else:
|
|
# Make sure that when the gas offset and humidityOffset are 0, iaqPercent is 95% - leaves room for cleaner air to be identified
|
|
gasScore = math.floor(70 + (5 * (gasRatio - 1)))
|
|
if (gasScore > 75):
|
|
gasScore = 75
|
|
|
|
self.iaqPercent = math.trunc(humidityScore + gasScore) # Air quality percentage is the sum of the humidity (25% weighting) and gas (75% weighting) scores
|
|
self.iaqScore = (100 - self.iaqPercent) * 5 # Final air quality score is in range 0 - 500 (see BME688 datasheet page 11 for details)
|
|
|
|
# eCO2 Calculations
|
|
self.eCO2Value = 250 * math.pow(math.e, (0.012 * self.iaqScore)) # Exponential curve equation to calculate the eCO2 from an iaqScore input
|
|
|
|
# Adjust eCO2Value for humidity and/or temperature greater than the baseline values
|
|
if (humidityOffset > 0):
|
|
if (temperatureOffset > 0):
|
|
self.eCO2Value = self.eCO2Value * (humidityRatio + temperatureRatio)
|
|
else:
|
|
self.eCO2Value = self.eCO2Value * humidityRatio
|
|
elif (temperatureOffset > 0):
|
|
self.eCO2Value = self.eCO2Value * (temperatureRatio + 1)
|
|
|
|
# If measurements are taking place rapidly, breath detection is possible due to the sudden increase in humidity (~7-10%)
|
|
# If this increase happens within a 5s time window, 1200ppm is added to the eCO2 value
|
|
# (These values were based on 'breath-testing' with another eCO2 sensor with algorithms built-in)
|
|
if ((self.measTime - self.measTimePrev) <= 5000):
|
|
if ((self.hRead - self.hPrev) >= 3):
|
|
self.eCO2Value = self.eCO2Value + 1500
|
|
|
|
self.eCO2Value = math.trunc(self.eCO2Value)
|
|
|
|
# The KitronikOLED class enables control of the OLED display screen on the board
|
|
# Subclassing FrameBuffer provides support for graphics primitives
|
|
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
|
|
class KitronikOLED(framebuf.FrameBuffer):
|
|
# Write commands to the OLED controller
|
|
def write_cmd(self, cmd):
|
|
self.temp[0] = 0x80 # Co=1, D/C#=0
|
|
self.temp[1] = cmd
|
|
self.i2c.writeto(self.CHIP_ADDRESS, self.temp)
|
|
|
|
# Write data to the OLED controller
|
|
def write_data(self, buf):
|
|
self.write_list[1] = buf
|
|
self.i2c.writevto(self.CHIP_ADDRESS, self.write_list)
|
|
|
|
# Runs on initialisation of the class
|
|
# Sets up all the register definitions and global variables
|
|
def __init__(self, i2cAddr=0x3C, sda=6, scl=7):
|
|
self.CHIP_ADDRESS = i2cAddr # I2C address as determined by hardware configuration
|
|
# register definitions
|
|
self.SET_CONTRAST = const(0x81)
|
|
self.SET_ENTIRE_ON = const(0xA4)
|
|
self.SET_NORM_INV = const(0xA6)
|
|
self.SET_DISP = const(0xAE)
|
|
self.SET_MEM_ADDR = const(0x20)
|
|
self.SET_COL_ADDR = const(0x21)
|
|
self.SET_PAGE_ADDR = const(0x22)
|
|
self.SET_DISP_START_LINE = const(0x40)
|
|
self.SET_SEG_REMAP = const(0xA0)
|
|
self.SET_MUX_RATIO = const(0xA8)
|
|
self.SET_COM_OUT_DIR = const(0xC0)
|
|
self.SET_DISP_OFFSET = const(0xD3)
|
|
self.SET_COM_PIN_CFG = const(0xDA)
|
|
self.SET_DISP_CLK_DIV = const(0xD5)
|
|
self.SET_PRECHARGE = const(0xD9)
|
|
self.SET_VCOM_DESEL = const(0xDB)
|
|
self.SET_CHARGE_PUMP = const(0x8D)
|
|
|
|
sda = Pin(sda)
|
|
scl = Pin(scl)
|
|
self.i2c = I2C(1,sda=sda, scl=scl, freq=100000)
|
|
|
|
self.plotArray = []
|
|
self.plotYMin = 0
|
|
self.plotYMax = 100
|
|
self.yPixelMin = 63
|
|
self.yPixelMax = 12
|
|
|
|
self.temp = bytearray(2)
|
|
self.write_list = [b"\x40", None] # Co=0, D/C#=1
|
|
|
|
self.width = 128
|
|
self.height = 64
|
|
self.external_vcc = False
|
|
self.pages = 8
|
|
self.buffer = bytearray(self.pages * self.width)
|
|
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
|
|
self.init_display()
|
|
|
|
# Initialise the display settings and start the display clear
|
|
def init_display(self):
|
|
for cmd in (
|
|
self.SET_DISP | 0x00, # off
|
|
# address setting
|
|
self.SET_MEM_ADDR,
|
|
0x00, # horizontal
|
|
# resolution and layout
|
|
self.SET_DISP_START_LINE | 0x00,
|
|
self.SET_SEG_REMAP,# | 0x01, # Set to either 0xA0 or A1, flips screen horizontally
|
|
self.SET_MUX_RATIO,
|
|
self.height - 1,
|
|
self.SET_COM_OUT_DIR, #| 0x08, # Set to either 0xC0 or 0xC8, flips screen vertically
|
|
self.SET_DISP_OFFSET,
|
|
0x00,
|
|
self.SET_COM_PIN_CFG,
|
|
0x02 if self.width > 2 * self.height else 0x12,
|
|
# timing and driving scheme
|
|
self.SET_DISP_CLK_DIV,
|
|
0x80,
|
|
self.SET_PRECHARGE,
|
|
0x22 if self.external_vcc else 0xF1,
|
|
self.SET_VCOM_DESEL,
|
|
0x30, # 0.83*Vcc
|
|
# display
|
|
self.SET_CONTRAST,
|
|
0xFF, # maximum
|
|
self.SET_ENTIRE_ON, # output follows RAM contents
|
|
self.SET_NORM_INV, # not inverted
|
|
# charge pump
|
|
self.SET_CHARGE_PUMP,
|
|
0x10 if self.external_vcc else 0x14,
|
|
self.SET_DISP | 0x01,
|
|
): # on
|
|
self.write_cmd(cmd)
|
|
self.fill(0)
|
|
self.show()
|
|
|
|
# Screen will switch off, but retain the information that was displayed
|
|
def poweroff(self):
|
|
self.write_cmd(self.SET_DISP | 0x00)
|
|
|
|
# Turn the screen back on - do not need to re-display what was showing as the information is retained
|
|
def poweron(self):
|
|
self.write_cmd(self.SET_DISP | 0x01)
|
|
|
|
# 0 = Dim to 150 = Bright
|
|
def contrast(self, contrast):
|
|
self.write_cmd(self.SET_CONTRAST)
|
|
self.write_cmd(contrast)
|
|
|
|
# 0 = White on black, 1 = Black on white
|
|
def invert(self, invert):
|
|
self.write_cmd(self.SET_NORM_INV | (invert & 1))
|
|
|
|
# Set text to display on a particular line (1 - 6) and an x-axis offset can be set (0 - 127, 0 is default)
|
|
# If the text is longer than than the screen it will be cut off, it will not be pushed to the next line (16 characters max per line)
|
|
# Need to call 'show()' to make the text actually display
|
|
def displayText(self, text, line, x_offset=0):
|
|
if (line < 1):
|
|
line = 1
|
|
if (line > 6):
|
|
line = 6
|
|
|
|
y = (line * 11) - 10
|
|
|
|
super().text(text, x_offset, y)
|
|
|
|
# Make what has been set to display actually appear on the screen
|
|
# Needs to be called after 'displayText()', 'plot()', clear()', 'drawLine()' & 'drawRect()'
|
|
def show(self):
|
|
x0 = 0
|
|
x1 = self.width - 1
|
|
if self.width == 64:
|
|
# displays with width of 64 pixels are shifted by 32
|
|
x0 += 32
|
|
x1 += 32
|
|
self.write_cmd(self.SET_COL_ADDR)
|
|
self.write_cmd(x0)
|
|
self.write_cmd(x1)
|
|
self.write_cmd(self.SET_PAGE_ADDR)
|
|
self.write_cmd(0)
|
|
self.write_cmd(self.pages - 1)
|
|
self.write_data(self.buffer)
|
|
|
|
# Plot a live updating graph of a variable
|
|
# Plot y range is pixels 12 down to 63, leaving room for a title or similar on the first line
|
|
# Need to call 'show()' to make the plot actually display
|
|
def plot(self, variable):
|
|
variable = math.trunc(variable)
|
|
|
|
if (variable > self.plotYMax):
|
|
self.plotYMax = variable
|
|
if (variable < self.plotYMin):
|
|
self.plotYMin = variable
|
|
|
|
entries = len(self.plotArray)
|
|
if (entries >= 128):
|
|
prevX = 0
|
|
prevY = self.plotArray[127]
|
|
self.plotArray.pop(0)
|
|
self.plotArray.append(variable)
|
|
else:
|
|
self.plotArray.append(variable)
|
|
prevX = len(self.plotArray) - 1
|
|
prevY = self.plotArray[prevX]
|
|
|
|
for entry in range(entries):
|
|
x = entry
|
|
y = self.plotArray[entry]
|
|
y = math.trunc(self.yPixelMin - (y * ((self.yPixelMin - self.yPixelMax) / (self.plotYMax - self.plotYMin))))
|
|
if (x == 0):
|
|
super().pixel(x, y, 1)
|
|
else:
|
|
self.drawLine(prevX, prevY, x, y)
|
|
prevX = x
|
|
prevY = y
|
|
|
|
# Wipe all data from the screen
|
|
# Need to call 'show()' to make the clear actually happen
|
|
def clear(self):
|
|
super().fill(0)
|
|
|
|
# Clear a specific line on the screen
|
|
def clearLine(self, line):
|
|
yPixel = (line - 1) + ((line * 10) - 10)
|
|
super().fill_rect(0, yPixel, 128, 10, 0)
|
|
|
|
# Draw a line on the screen (vertical, horizontal or diagonal), setting start and finish (x, y) coordinates
|
|
# Need to call 'show()' to make the line actually display
|
|
def drawLine(self, start_x, start_y, end_x, end_y):
|
|
super().line(start_x, start_y, end_x, end_y, 1)
|
|
|
|
# Draw rectangles with a top left starting (x, y) coordinate and then a width and height
|
|
# Can be filled (True) or just an outline (False)
|
|
# Need to call 'show()' to make the rectangle actually display
|
|
def drawRect(self, start_x, start_y, width, height, fill=False):
|
|
if (fill == False):
|
|
super().rect(start_x, start_y, width, height, 1)
|
|
elif (fill == True):
|
|
super().fill_rect(start_x, start_y, width, height, 1)
|