import math
import framebuf
import array
import os
from machine import Pin, PWM, ADC, time_pulse_us, I2C, RTC
from rp2 import PIO, StateMachine, asm_pio
from time import sleep, sleep_ms, sleep_us, ticks_ms, ticks_us
from micropython import const
# Initialise the module with all outputs off
# High Power Output Pins
hp_3 = Pin(3, Pin.OUT)
hp_15 = Pin(15, Pin.OUT)
# Configuring a pin as an output does not set its level, so drive both outputs low explicitly
hp_3.value(0)
hp_15.value(0)
# Servo
servo = Pin(2, Pin.OUT)
servo.value(0)
# Buzzer
buzzer = PWM(Pin(4))
buzzer.duty_u16(0) # 0% duty = silent
# The KitronikButton class enables the use of the 2 user input buttons on the board
class KitronikButton:
    # Configure both button pins as inputs with the internal pull-down enabled.
    # Button A is on GP12 and button B is on GP13; read them with buttonA.value() / buttonB.value()
    def __init__(self):
        self.buttonA = Pin(12, Pin.IN, Pin.PULL_DOWN)
        self.buttonB = Pin(13, Pin.IN, Pin.PULL_DOWN)
# The KitronikOutputControl class enables control of the servo and high-power outputs on the board
class KitronikOutputControl:
    # This code drives a pwm on the PIO. It is running at 2Mhz, which gives the PWM a 1uS resolution.
    # The program uses side-set to drive the pin, so it must be assembled with a side-set pin that starts low.
    @asm_pio(sideset_init=PIO.OUT_LOW)
    def _servo_pwm():
        # First we clear the pin to zero, then load the registers. Y is the full period count, X is the pulse 'on' length.
        pull(noblock)           .side(0)
        mov(x, osr)             # Keep most recent pull data stashed in X, for recycling by noblock
        mov(y, isr)             # ISR must be preloaded with PWM count max
        # This is where the looping work is done. The overall loop rate is 1Mhz (clock is 2Mhz - we have 2 instructions to do)
        label("loop")
        jmp(x_not_y, "skip")    # While Y has not yet counted down to X leave the pin alone and jump to the 'skip' label
        nop()                   .side(1)
        label("skip")
        jmp(y_dec, "loop")      # Count down Y by 1 and jump to 'loop'. When Y is 0 we fall through and wrap back to the 'pull' command

    # Set up the high power output pins and create and start the servo state machine
    def __init__(self):
        # High Power Output Pins
        self.highPwr_3 = Pin(3, Pin.OUT)
        self.highPwr_15 = Pin(15, Pin.OUT)
        # Servo Control
        self.servo = []
        # Servo 0 degrees -> pulse of 0.5ms, 180 degrees 2.5ms
        # Pulse train freq 50hz - 20ms
        # Servo pulses range from 500 to 2500us and overall pulse train is 20000us repeat.
        # Servo is on GP2
        self.maxServoPulse = 2500
        self.minServoPulse = 500
        self.pulseTrain = 20000
        self.degreesToUS = 2000/180
        # Create and start the servo statemachine
        self.servo.append(StateMachine(1, self._servo_pwm, freq=2000000, sideset_base=Pin(2)))
        # Preload the ISR with the period count: push it to the FIFO, pull it into the OSR, then copy OSR to ISR
        self.servo[0].put(self.pulseTrain)
        self.servo[0].exec("pull()")
        self.servo[0].exec("mov(isr, osr)")
        self.registerServo()

    # Doesn't actually register/unregister, just stops and starts the servo PIO
    # The servo is registered by default in the __init__() function - these are only required if you want to use Pin 2 for something else, and then register the servo again
    def registerServo(self):
        if not self.servo[0].active():
            self.servo[0].active(1)

    def deregisterServo(self):
        if self.servo[0].active():
            self.servo[0].active(0)

    # servoToPosition takes a degree position for the servo to go to.
    # 0degrees->180 degrees is 0->2000us, plus offset of 500uS
    # 1 degree ~ 11uS.
    # This function does the sum then calls servoToPeriod to actually poke the PIO
    def servoToPosition(self, degrees):
        pulseLength = int(degrees*self.degreesToUS + self.minServoPulse)
        self.servoToPeriod(pulseLength)

    # servoToPeriod takes a pulse length in us, clamps it to the valid servo range and sends it to the PIO
    def servoToPeriod(self, period):
        if period < self.minServoPulse:
            period = self.minServoPulse
        if period > self.maxServoPulse:
            period = self.maxServoPulse
        self.servo[0].put(period)

    # Functions to turn on/off the high power outputs
    # Enter the pin number, either '3' or '15' (any other value is ignored)
    def highPowerOn(self, pin):
        if (pin == 3):
            self.highPwr_3.value(1)
        elif (pin == 15):
            self.highPwr_15.value(1)

    def highPowerOff(self, pin):
        if (pin == 3):
            self.highPwr_3.value(0)
        elif (pin == 15):
            self.highPwr_15.value(0)
# The KitronikDataLogger class enables data logging through the Pico file system
# It is possible to create multiple data logger instances to then log to multiple files simultaneously
class KitronikDataLogger:
    # Function is called when the class is initialised - sets the maximum permissible filesize, the data separator and creates the log file with the entered filename
    # Separator options: ("comma", "semicolon", "tab")
    def __init__(self, file = "data_log.txt", separator = "semicolon"):
        self.MAX_FILE_SIZE = 500000 # This is approximately 10000 full entries
        if (separator == "comma"):
            self.SEPARATOR = ","
        elif (separator == "semicolon"):
            self.SEPARATOR = ";"
        elif (separator == "tab"):
            self.SEPARATOR = "\t"
        self.FILENAME = file
        # "x" mode creates the file and fails with OSError if it is already there, so existing logs are kept
        try:
            f = open(self.FILENAME, "x")
            f.close()
        except OSError:
            print("File already exists")
        self.line1 = ""
        self.line2 = ""
        self.line3 = ""
        self.dataHeadings = ""
        self.projectInfo = False
        self.headings = False

    # Write a header section to the specified file (there are 3 free text fields, each will write on a separate line)
    # Empty fields are skipped; the text is remembered so removeOneLine() can put it back
    def writeProjectInfo(self, line1="", line2="", line3=""):
        if (line1 != ""):
            self.writeFile(self.FILENAME, line1 + "\r\n")
            self.line1 = line1
        if (line2 != ""):
            self.writeFile(self.FILENAME, line2 + "\r\n")
            self.line2 = line2
        if (line3 != ""):
            self.writeFile(self.FILENAME, line3 + "\r\n")
            self.line3 = line3
        self.projectInfo = True

    # This writes whatever is passed to it to the file
    def writeFile(self, file, passed):
        # Open in append - creates if not existing, will append if it exists
        with open(file, "a") as f:
            f.write(passed)

    # Build one line of text from the non-empty fields, each one followed by the separator
    # Fields are converted with str() so numbers can be logged directly
    def _joinFields(self, fields):
        return "".join([str(field) + self.SEPARATOR for field in fields if field != ""])

    # Input and write to the file up to 10 data field headings
    def nameColumnHeadings(self, field1="", field2="", field3="", field4="", field5="", field6="", field7="", field8="", field9="", field10=""):
        dataHeadings = self._joinFields((field1, field2, field3, field4, field5, field6, field7, field8, field9, field10))
        self.dataHeadings = dataHeadings
        self.writeFile(self.FILENAME, dataHeadings + "\r\n")
        self.headings = True

    # Store up to 10 data entries (match the order with the data headings used)
    def storeDataEntry(self, field1="", field2="", field3="", field4="", field5="", field6="", field7="", field8="", field9="", field10=""):
        dataEntry = self._joinFields((field1, field2, field3, field4, field5, field6, field7, field8, field9, field10))
        # Drop the oldest data lines until the file is back under the size limit
        while (self.checkFileSize() > self.MAX_FILE_SIZE):
            self.removeOneLine()
        self.writeFile(self.FILENAME, dataEntry + "\r\n")

    # This returns the size of the file, or 0 if the file does not exist
    def checkFileSize(self):
        try:
            # Open read - this throws an error if file does not exist - in that case the size is 0
            with open(self.FILENAME, "r") as f:
                f.seek(0, 2) # Jump to the end of the file; the position there is the size
                return f.tell()
        except OSError:
            return 0

    # Remove a line from the data file to make space for more data
    # The project info and heading lines are kept; the first line after them is discarded
    def removeOneLine(self):
        tempName = self.FILENAME + ".bak"
        # Only the non-empty info lines were ever written to the file, so only those are skipped and rewritten
        infoLines = [line for line in (self.line1, self.line2, self.line3) if line != ""]
        with open(self.FILENAME, "r") as readFrom:
            with open(tempName, "w") as writeTo:
                if self.projectInfo: # If there is Project Info, skip over these lines and then write them to the temporary file
                    for _ in infoLines:
                        readFrom.readline()
                    for line in infoLines:
                        writeTo.write(line + "\r\n")
                if self.headings:
                    readFrom.readline() # If there are Headings, skip over this line and then write them to the temporary file
                    writeTo.write(self.dataHeadings + "\r\n")
                readFrom.readline() # Read and throw away the first line of data in the file
                for line in readFrom: # Write the remaining lines to the temporary file
                    writeTo.write(line)
        os.remove(self.FILENAME) # Delete original log file
        os.rename(tempName, self.FILENAME) # Rename temporary file as new log file (now with the first line of data removed)

    # Deletes all the contents of the file
    def eraseAllData(self):
        # Opening in write mode truncates the file
        with open(self.FILENAME, "w") as f:
            f.write("")

    # Deletes the file from the Pico file system
    def deleteDataFile(self):
        os.remove(self.FILENAME)
# The KitronikBuzzer class enables control of the piezo buzzer on the board
class KitronikBuzzer:
    # Function is called when the class is initialised and sets the buzzer pin to GP4
    def __init__(self):
        self.buzzer = PWM(Pin(4))
        self.dutyCycle = 32767 # Roughly 50% of the 16-bit duty range

    # Play a continuous tone at a specified frequency (limited to the range 30Hz - 3000Hz)
    def playTone(self, freq):
        if (freq < 30):
            freq = 30
        if (freq > 3000):
            freq = 3000
        self.buzzer.freq(freq)
        self.buzzer.duty_u16(self.dutyCycle)

    # Play a tone at a specified frequency for a specified length of time in ms (blocks until the tone has finished)
    def playTone_Length(self, freq, length):
        self.playTone(freq)
        sleep_ms(length)
        self.stopTone()

    # Stop the buzzer producing a tone
    def stopTone(self):
        self.buzzer.duty_u16(0)
# The KitronikZIPLEDs class enables control of the ZIP LEDs both on the board and any connected externally
class KitronikZIPLEDs:
    # We drive the ZIP LEDs using one of the PIO statemachines.
    # Each bit is shifted out as a high pulse whose length depends on the bit value (T1, T2, T3 are cycle counts)
    @asm_pio(sideset_init=PIO.OUT_LOW, out_shiftdir=PIO.SHIFT_LEFT, autopull=True, pull_thresh=24)
    def _ZIPLEDOutput():
        T1 = 2
        T2 = 5
        T3 = 3
        wrap_target()
        label("bitloop")
        out(x, 1)               .side(0)    [T3 - 1]
        jmp(not_x, "do_zero")   .side(1)    [T1 - 1]
        jmp("bitloop")          .side(1)    [T2 - 1]
        label("do_zero")
        nop()                   .side(0)    [T2 - 1]
        wrap()

    # num_zip_leds is the total number of ZIP LEDs to be controlled
    def __init__(self, num_zip_leds):
        self.num_zip_leds = num_zip_leds
        # Create and start the StateMachine for the ZIPLeds
        self.ZIPLEDs = StateMachine(4, self._ZIPLEDOutput, freq=8_000_000, sideset_base=Pin(20))
        self.theLEDs = array.array("I", [0 for _ in range(self.num_zip_leds)]) # An array for the LED colours.
        self.brightness = 0.5 # 50% initially
        self.ZIPLEDs.active(1)
        # Define some colour tuples for people to use.
        self.BLACK = (0, 0, 0)
        self.RED = (255, 0, 0)
        self.YELLOW = (255, 150, 0)
        self.GREEN = (0, 255, 0)
        self.CYAN = (0, 255, 255)
        self.BLUE = (0, 0, 255)
        self.PURPLE = (180, 0, 255)
        self.WHITE = (255, 255, 255)
        self.COLOURS = (self.BLACK, self.RED, self.YELLOW, self.GREEN, self.CYAN, self.BLUE, self.PURPLE, self.WHITE)

    # Show pushes the current setup of the LEDS to the physical LEDS - it makes them visible.
    def show(self):
        brightAdjustedLEDs = array.array("I", [0 for _ in range(self.num_zip_leds)])
        for i, c in enumerate(self.theLEDs):
            # Colours are stored packed as green, red, blue (high to low byte); scale each channel by the brightness
            r = int(((c >> 8) & 0xFF) * self.brightness)
            g = int(((c >> 16) & 0xFF) * self.brightness)
            b = int((c & 0xFF) * self.brightness)
            brightAdjustedLEDs[i] = (g<<16) + (r<<8) + b
        # Shift left by 8 so the 24 colour bits sit at the top of each 32-bit word
        self.ZIPLEDs.put(brightAdjustedLEDs, 8)

    # Turn the LED off by setting the colour to black
    def clear(self, whichLED):
        self.setLED(whichLED, self.BLACK)

    # Sets the colour of an individual LED. Use show to make change visible
    # whichColour is a (red, green, blue) tuple; raises Exception if whichLED is outside 0 to num_zip_leds - 1
    def setLED(self, whichLED, whichColour):
        if (whichLED < 0) or (whichLED > (self.num_zip_leds - 1)):
            raise Exception("INVALID LED:",whichLED," specified")
        self.theLEDs[whichLED] = (whichColour[1]<<16) + (whichColour[0]<<8) + whichColour[2]

    # Gets the stored colour of an individual LED as a (red, green, blue) tuple.
    # This isn't necessarily the colour on show if it has been changed, but not 'show'n
    def getLED(self, whichLED):
        if (whichLED < 0) or (whichLED > (self.num_zip_leds - 1)):
            raise Exception("INVALID LED:",whichLED," specified")
        stored = self.theLEDs[whichLED]
        return ((stored >> 8) & 0xff, (stored >> 16) & 0xff, stored & 0xff)

    # Takes 0-100 as a brightness value, brightness is applied in the 'show' function
    def setBrightness(self, value):
        # Cap to 0-100%
        if (value < 0):
            value = 0
        elif (value > 100):
            value = 100
        self.brightness = value / 100
# The KitronikRTC class enables use of the Pico RTC
class KitronikRTC:
    # Function is called when the class is initialised and creates an instance of the Pico RTC and defines all the global variables
    def __init__(self):
        self.rtc = RTC()
        self.day = 0
        self.month = 0
        self.year = 0
        self.weekday = 0 # In range 0 - 6, 0 = Monday, 6 = Sunday
        self.hour = 0
        self.minute = 0
        self.second = 0
        self.alarmHour = 0
        self.alarmMinute = 0
        self.alarmSet = False
        self.alarmTrigger = False
        self.alarmRepeat = False
        self.hourPeriod = 0
        self.minutePeriod = 0

    # Function calculates the weekday (0 = Monday, 6 = Sunday) based on the date, taking into account leap years
    # The result is stored in self.weekday
    def calcWeekday(self, day, month, year):
        dayOffset = [0, 3, 2, 5, 0, 3, 5, 1, 4, 6, 2, 4]
        # January and February count as part of the previous year so a leap day only affects dates after it
        if (month < 3):
            year = year - 1
        sundayBased = (year + (year // 4) - (year // 100) + (year // 400) + dayOffset[month - 1] + day) % 7 # 0 - 6, 0 = Sunday, 6 = Saturday
        # Convert from Sunday = 0 to Monday = 0
        if (sundayBased == 0):
            self.weekday = 6
        else:
            self.weekday = sundayBased - 1

    # Set the date on the Pico RTC (the current time is kept)
    def setDate(self, day, month, year):
        self.getDateTime() # Refresh the stored time so it is not overwritten with stale values
        self.calcWeekday(day, month, year)
        self.rtc.datetime((year, month, day, self.weekday, self.hour, self.minute, self.second, 0))

    # Set the time on the Pico RTC (the current date is kept)
    def setTime(self, hours, minutes, seconds):
        self.getDateTime() # Refresh the stored date so it is not overwritten with stale values
        self.rtc.datetime((self.year, self.month, self.day, self.weekday, hours, minutes, seconds, 0))

    # Read the current date and time from the RTC into the class variables
    def getDateTime(self):
        newDateTime = self.rtc.datetime()
        self.day = newDateTime[2]
        self.month = newDateTime[1]
        self.year = newDateTime[0]
        self.weekday = newDateTime[3]
        self.hour = newDateTime[4]
        self.minute = newDateTime[5]
        self.second = newDateTime[6]

    # Return the current date as a string in the form DD/MM/YYYY
    def readDateString(self):
        self.getDateTime()
        return "{:02d}/{:02d}/{}".format(self.day, self.month, self.year)

    # Return the current time as a string in the form HH:MM:SS
    def readTimeString(self):
        self.getDateTime()
        return "{:02d}:{:02d}:{:02d}".format(self.hour, self.minute, self.second)

    # Return a specific date/time parameter as a number
    # d = day, m = month, y = year, h = hour, min = minute, s = second
    # Any other parameter returns None
    def readParameter(self, parameter):
        self.getDateTime()
        if (parameter == "d"):
            return self.day
        elif (parameter == "m"):
            return self.month
        elif (parameter == "y"):
            return self.year
        elif (parameter == "h"):
            return self.hour
        elif (parameter == "min"):
            return self.minute
        elif (parameter == "s"):
            return self.second

    # Set an alarm for a specific hour and minute
    # Extra options to set a periodically repeating alarm (set alarmRepeat to True and then specify the hour and/or minute period between alarms)
    def setAlarm(self, hour, minute, alarmRepeat=False, hourPeriod=0, minutePeriod=0):
        self.alarmHour = math.ceil(hour)
        self.alarmMinute = math.ceil(minute)
        self.alarmRepeat = alarmRepeat
        if alarmRepeat:
            self.hourPeriod = math.ceil(hourPeriod)
            self.minutePeriod = math.ceil(minutePeriod)
        self.alarmSet = True

    # Check whether the alarm conditions have been met and then trigger the alarm
    # Returns True once the alarm has triggered, and stays True until silenceAlarm() is called
    def checkAlarm(self):
        if self.alarmSet:
            if (self.readParameter("h") == self.alarmHour):
                if (self.readParameter("min") == self.alarmMinute):
                    self.alarmTrigger = True
        return self.alarmTrigger

    # Sets 'alarmTrigger' back to False, checks whether the alarm should repeat (sets new one if it should) or sets 'alarmSet' back to False
    def silenceAlarm(self):
        self.alarmTrigger = False
        if not self.alarmRepeat:
            self.alarmSet = False
        else:
            # Add the repeat period to the alarm time, carrying minutes into hours and wrapping past midnight
            totalMinutes = self.alarmMinute + self.minutePeriod
            newHour = (self.alarmHour + self.hourPeriod + (totalMinutes // 60)) % 24
            newMinute = totalMinutes % 60
            self.setAlarm(newHour, newMinute, True, self.hourPeriod, self.minutePeriod)
# The KitronikBME688 class enables control and use of the BME688 sensor on the board
class KitronikBME688:
# The following functions are for reading the registers on the BME688
# Function for reading register as unsigned 8 bit integer
def getUInt8(self, reg):
    """Read one byte from register `reg` of the sensor and return it as an unsigned integer (0 - 255)."""
    return int.from_bytes(self.i2c.readfrom_mem(self.CHIP_ADDRESS, reg, 1), "big")
# Function to convert unsigned ints to twos complement signed ints
def twosComp(self, value, bits):
    """Interpret the unsigned integer `value` as a two's complement number that is `bits` wide.

    Returns `value` unchanged when its sign bit is clear, otherwise the
    corresponding negative number.
    """
    signBit = 1 << (bits - 1)
    if (value & signBit) != 0:
        value = value - (1 << bits)
    return value
# Function for proportionally mapping a value to a different value range
def mapValues(self, value, frMin, frMax, toMin, toMax):
    """Map `value` proportionally from the range frMin..frMax onto the range toMin..toMax.

    The result is not clamped, so a `value` outside the source range maps
    outside the target range. frMin and frMax must differ.
    """
    scale = (toMax - toMin) / (frMax - frMin)
    return toMin + ((value - frMin) * scale)
def __init__(self, i2cAddr=0x77, sda=6, scl=7):
    """Set up I2C, read the sensor's calibration parameters and initialise the BME688.

    i2cAddr is the sensor's I2C address; sda and scl are the GPIO numbers used for I2C bus 1.
    """
    self.CHIP_ADDRESS = i2cAddr    # I2C address as determined by hardware configuration
    sda = Pin(sda)
    scl = Pin(scl)
    self.i2c = I2C(1,sda=sda, scl=scl, freq=100000)

    # Useful BME688 Register Addresses
    # Control
    self.CTRL_MEAS = 0x74       # Bit position <7:5>: Temperature oversampling   Bit position <4:2>: Pressure oversampling   Bit position <1:0>: Sensor power mode
    self.RESET = 0xE0           # Write 0xB6 to initiate soft-reset (same effect as power-on reset)
    self.CHIP_ID = 0xD0         # Read this to return the chip ID: 0x61 - good way to check communication is occurring
    self.CTRL_HUM = 0x72        # Bit position <2:0>: Humidity oversampling settings
    self.CONFIG = 0x75          # Bit position <4:2>: IIR filter settings
    self.CTRL_GAS_0 = 0x70      # Bit position <3>: Heater off (set to '1' to turn off current injection)
    self.CTRL_GAS_1 = 0x71      # Bit position <5> DATASHEET ERROR: Enable gas conversions to start when set to '1'   Bit position <3:0>: Heater step selection (0 to 9)
    # Pressure Data
    self.PRESS_MSB_0 = 0x1F     # Forced & Parallel: MSB [19:12]
    self.PRESS_LSB_0 = 0x20     # Forced & Parallel: LSB [11:4]
    self.PRESS_XLSB_0 = 0x21    # Forced & Parallel: XLSB [3:0]
    # Temperature Data
    self.TEMP_MSB_0 = 0x22      # Forced & Parallel: MSB [19:12]
    self.TEMP_LSB_0 = 0x23      # Forced & Parallel: LSB [11:4]
    self.TEMP_XLSB_0 = 0x24     # Forced & Parallel: XLSB [3:0]
    # Humidity Data
    self.HUMID_MSB_0 = 0x25     # Forced & Parallel: MSB [15:8]
    self.HUMID_LSB_0 = 0x26     # Forced & Parallel: LSB [7:0]
    # Gas Resistance Data
    self.GAS_RES_MSB_0 = 0x2C   # Forced & Parallel: MSB [9:2]
    self.GAS_RES_LSB_0 = 0x2D   # Forced & Parallel: Bit <7:6>: LSB [1:0]    Bit <5>: Gas valid    Bit <4>: Heater stability    Bit <3:0>: Gas resistance range
    # Status
    self.MEAS_STATUS_0 = 0x1D   # Forced & Parallel: Bit <7>: New data    Bit <6>: Gas measuring    Bit <5>: Measuring    Bit <3:0>: Gas measurement index

    # Small local helpers for reading calibration values (MSB register is read first, then LSB)
    def signed8(reg):
        return self.twosComp(self.getUInt8(reg), 8)

    def signed16(msbReg, lsbReg):
        return self.twosComp((self.getUInt8(msbReg) << 8) | self.getUInt8(lsbReg), 16)

    # Calibration parameters for compensation calculations
    # Temperature
    # NOTE(review): PAR_T1 is decoded as signed here; check the signedness against the Bosch datasheet/driver
    self.PAR_T1 = signed16(0xEA, 0xE9)      # Signed 16-bit
    self.PAR_T2 = signed16(0x8B, 0x8A)      # Signed 16-bit
    self.PAR_T3 = signed8(0x8C)             # Signed 8-bit
    # Pressure
    self.PAR_P1 = (self.getUInt8(0x8F) << 8) | self.getUInt8(0x8E)   # Always a positive number, do not do twosComp() conversion!
    self.PAR_P2 = signed16(0x91, 0x90)      # Signed 16-bit
    self.PAR_P3 = signed8(0x92)             # Signed 8-bit
    self.PAR_P4 = signed16(0x95, 0x94)      # Signed 16-bit
    self.PAR_P5 = signed16(0x97, 0x96)      # Signed 16-bit
    self.PAR_P6 = signed8(0x99)             # Signed 8-bit
    self.PAR_P7 = signed8(0x98)             # Signed 8-bit
    self.PAR_P8 = signed16(0x9D, 0x9C)      # Signed 16-bit
    self.PAR_P9 = signed16(0x9F, 0x9E)      # Signed 16-bit
    self.PAR_P10 = signed8(0xA0)            # Signed 8-bit
    # Humidity (PAR_H1 and PAR_H2 are 12-bit values that share register 0xE2 for their low 4 bits)
    parH1_LSB_parH2_LSB = self.getUInt8(0xE2)
    self.PAR_H1 = (self.getUInt8(0xE3) << 4) | (parH1_LSB_parH2_LSB & 0x0F)
    self.PAR_H2 = (self.getUInt8(0xE1) << 4) | (parH1_LSB_parH2_LSB >> 4)
    self.PAR_H3 = signed8(0xE4)             # Signed 8-bit
    self.PAR_H4 = signed8(0xE5)             # Signed 8-bit
    self.PAR_H5 = signed8(0xE6)             # Signed 8-bit
    self.PAR_H6 = signed8(0xE7)             # Signed 8-bit
    self.PAR_H7 = signed8(0xE8)             # Signed 8-bit
    # Gas resistance
    self.PAR_G1 = signed8(0xED)             # Signed 8-bit
    # NOTE(review): 0xEB is used as the MSB here, unlike the other 16-bit values where the higher address is the MSB - confirm byte order
    self.PAR_G2 = signed16(0xEB, 0xEC)      # Signed 16-bit
    self.PAR_G3 = self.getUInt8(0xEE)       # Unsigned 8-bit
    self.RES_HEAT_RANGE = (self.getUInt8(0x02) >> 4) & 0x03
    self.RES_HEAT_VAL = signed8(0x00)       # Signed 8-bit

    # Oversampling rate constants
    self.OSRS_1X = 0x01
    self.OSRS_2X = 0x02
    self.OSRS_4X = 0x03
    self.OSRS_8X = 0x04
    self.OSRS_16X = 0x05

    # IIR filter coefficient values
    self.IIR_0 = 0x00
    self.IIR_1 = 0x01
    self.IIR_3 = 0x02
    self.IIR_7 = 0x03
    self.IIR_15 = 0x04
    self.IIR_31 = 0x05
    self.IIR_63 = 0x06
    self.IIR_127 = 0x07

    # Global variables used for storing one copy of value, these are used in multiple locations for calculations
    self.bme688InitFlag = False
    self.gasInit = False

    self.tRead = 0       # Calculated readings of sensor parameters from raw adc readings
    self.pRead = 0
    self.hRead = 0
    self.gRes = 0
    self.iaqPercent = 0
    self.iaqScore = 0
    self.airQualityRating = ""
    self.eCO2Value = 0

    self.gBase = 0
    self.hBase = 40        # Between 30% & 50% is a widely recognised optimal indoor humidity, 40% is a good middle ground
    self.hWeight = 0.25     # Humidity contributes 25% to the IAQ score, gas resistance is 75%
    self.hPrev = 0
    self.measTime = 0
    self.measTimePrev = 0

    self.tRaw = 0    # adc reading of raw temperature
    self.pRaw = 0       # adc reading of raw pressure
    self.hRaw = 0       # adc reading of raw humidity
    self.gResRaw = 0  # adc reading of raw gas resistance
    self.gasRange = 0

    self.t_fine = 0                          # Intermediate temperature value used for pressure calculation
    self.newAmbTemp = 0
    self.tAmbient = 0       # Intermediate temperature value used for heater calculation
    self.ambTempFlag = False

    # Create an instance of the OLED display screen for use during setup and for error messages
    self.screen = KitronikOLED()

    # Begin the hardware initialisation for the BME688 sensor
    self.bme688Init()
# Temperature compensation calculation: rawADC to degrees C (integer)
def calcTemperature(self, tempADC):
    """Convert the raw temperature ADC value into degrees C using the PAR_T calibration values.

    Stores the fine temperature in self.t_fine, the temperature in hundredths
    of a degree in self.newAmbTemp and the temperature in degrees C in self.tRead.
    While self.ambTempFlag is False the reading is also stored as the ambient temperature.
    """
    var1 = (tempADC >> 3) - (self.PAR_T1 << 1)
    var2 = (var1 * self.PAR_T2) >> 11
    halfVar1 = var1 >> 1
    var3 = (((halfVar1 * halfVar1) >> 12) * (self.PAR_T3 << 4)) >> 14
    self.t_fine = var2 + var3
    self.newAmbTemp = ((self.t_fine * 5) + 128) >> 8
    self.tRead = self.newAmbTemp / 100     # Convert to floating point with 2 dp
    if not self.ambTempFlag:
        self.tAmbient = self.newAmbTemp
# Pressure compensation calculation: rawADC to Pascals (integer)
def intCalcPressure(self, pressureADC):
    """Convert the raw pressure ADC value into Pascals (integer) and store it in self.pRead.

    Uses the PAR_P calibration values and self.t_fine, so calcTemperature()
    must have been run first.
    """
    var1 = (self.t_fine >> 1) - 64000
    quarterSq = (var1 >> 2) * (var1 >> 2)
    var2 = ((quarterSq >> 11) * self.PAR_P6) >> 2
    var2 = var2 + ((var1 * self.PAR_P5) << 1)
    var2 = (var2 >> 2) + (self.PAR_P4 << 16)
    var1 = (((quarterSq >> 13) * (self.PAR_P3 << 5)) >> 3) + ((self.PAR_P2 * var1) >> 1)
    var1 = var1 >> 18
    var1 = ((32768 + var1) * self.PAR_P1) >> 15
    pressure = 1048576 - pressureADC
    pressure = (pressure - (var2 >> 12)) * 3125
    # Exactly one of the two divisions is applied: for large values divide first, then double
    if pressure >= (1 << 30):
        pressure = (pressure // var1) << 1
    else:
        pressure = (pressure << 1) // var1
    var1 = (self.PAR_P9 * (((pressure >> 3) * (pressure >> 3)) >> 13)) >> 12
    var2 = ((pressure >> 2) * self.PAR_P8) >> 13
    var3 = ((pressure >> 8) * (pressure >> 8) * (pressure >> 8) * self.PAR_P10) >> 17
    self.pRead = pressure + ((var1 + var2 + var3 + (self.PAR_P7 << 7)) >> 4)
# Humidity compensation calculation: rawADC to % (integer)
# 'tempScaled' is the current reading from the Temperature sensor
def intCalcHumidity(self, humidADC, tempScaled):
    """Convert the raw humidity ADC value into relative humidity in % (integer).

    tempScaled is the current temperature reading; it is truncated to an integer.
    The previous humidity is kept in self.hPrev and the new one stored in self.hRead.
    """
    self.hPrev = self.hRead
    tempScaled = math.trunc(tempScaled)

    var1 = humidADC - (self.PAR_H1 << 4) - (((tempScaled * self.PAR_H3) // 100) >> 1)
    var2 = (self.PAR_H2 * (((tempScaled * self.PAR_H4) // 100) + (((tempScaled * ((tempScaled * self.PAR_H5) // 100)) >> 6) // 100) + (1 << 14))) >> 10
    var3 = var1 * var2
    var4 = ((self.PAR_H6 << 7) + ((tempScaled * self.PAR_H7) // 100)) >> 4
    var5 = ((var3 >> 14) * (var3 >> 14)) >> 10
    var6 = (var4 * var5) >> 1
    # Scale to thousandths of a percent, then reduce to whole percent
    milliPercent = (((var3 + var6) >> 10) * 1000) >> 12
    self.hRead = milliPercent // 1000
# Gas sensor heater target temperature to target resistance calculation
# 'ambientTemp' is reading from Temperature sensor in degC (could be averaged over a day when there is enough data?)
# 'targetTemp' is the desired temperature of the hot plate in degC (in range 200 to 400)
# Note: Heating duration also needs to be specified for each heating step in 'gas_wait' registers
def intConvertGasTargetTemp(self, ambientTemp, targetTemp):
    """Convert a target heater plate temperature into the heater resistance register value.

    ambientTemp is the ambient temperature in hundredths of a degree C (e.g. 2500 for 25.00 degC).
    targetTemp is the desired hot plate temperature in degC.
    Returns the integer value to write to the heater resistance register.
    """
    # The stored ambient temperature can be a float (it is a mean of readings); the integer maths below needs an int
    ambientTemp = int(ambientTemp)
    var1 = ((ambientTemp * self.PAR_G3) // 1000) << 8    # Divide by 1000 as we have ambientTemp in pre-degC format (i.e. 2500 rather than 25.00 degC)
    var2 = (self.PAR_G1 + 784) * (((((self.PAR_G2 + 154009) * targetTemp * 5) // 100) + 3276800) // 10)
    var3 = var1 + (var2 >> 1)
    var4 = (var3 // (self.RES_HEAT_RANGE + 4))
    var5 = (131 * self.RES_HEAT_VAL) + 65536
    resHeatX100 = (((var4 // var5) - 250) * 34)
    resHeat = ((resHeatX100 + 50) // 100)    # Round to the nearest whole value
    return resHeat
# Gas resistance compensation calculation: rawADC & range to Ohms (integer)
def intCalcgRes(self, gasADC, gasRange):
    """Convert the raw gas ADC value and its range setting into a gas resistance, stored in self.gRes."""
    var1 = 262144 >> gasRange
    var2 = 4096 + ((gasADC - 512) * 3)
    calcGasRes = (10000 * var1) // var2
    self.gRes = calcGasRes * 100
# Initialise the BME688, establishing communication, entering initial T, P & H oversampling rates, setup filter and do a first data reading (won't return gas)
def bme688Init(self):
    """Initialise the BME688: check communication, reset, set oversampling and filter, then take a first reading.

    Blocks until the chip ID register returns 0x61. Register values are written
    as single raw bytes; passing str(n) would send the ASCII digits of n instead.
    """
    # Establish communication with BME688 (keep retrying until the expected chip ID is read)
    chipID = self.getUInt8(self.CHIP_ID)
    while (chipID != 0x61):
        chipID = self.getUInt8(self.CHIP_ID)
    # Do a soft reset
    self.i2c.writeto_mem(self.CHIP_ADDRESS, self.RESET, b"\xB6")
    sleep_ms(10)    # Give the sensor time to come out of reset before writing its configuration
    # Set mode to SLEEP MODE: CTRL_MEAS reg <1:0>
    self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, b"\x00")
    # Set the oversampling rates for Temperature, Pressure and Humidity
    # Humidity: CTRL_HUM bits <2:0>
    self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_HUM, self.OSRS_2X.to_bytes(1, 'big'))
    # Temperature: CTRL_MEAS bits <7:5>     Pressure: CTRL_MEAS bits <4:2>    (Combine and write both in one command)
    self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, ((self.OSRS_2X << 5) | (self.OSRS_16X << 2)).to_bytes(1, 'big'))
    # IIR Filter: CONFIG bits <4:2>
    self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CONFIG, (self.IIR_3 << 2).to_bytes(1, 'big'))
    # Enable gas conversion: CTRL_GAS_1 bit <5>    (although datasheet says <4> - not sure what's going on here...)
    self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_GAS_1, b"\x20")
    self.bme688InitFlag = True
    # Do an initial data read (will only return temperature, pressure and humidity as no gas sensor parameters have been set)
    self.measureData()
# Setup the gas sensor (defaults are 300°C and 180ms).
# targetTemp is the target temperature for the gas sensor plate to reach (200 - 400°C), eg: 300
# heatDuration is the length of time for the heater to be turned on (0 - 4032ms), eg: 180
# WARNING: The temperature and duration values can be changed but this is not recommended unless the user is familiar with gas sensor setup
# The default values have been chosen as they provide a good all-round sensor response for air quality purposes
def setupGasSensor(self, targetTemp=300, heatDuration=180):
    """Configure heater step 0 of the gas sensor and mark the gas sensor as set up.

    targetTemp is the heater plate target in degC (limited to 200 - 400).
    heatDuration is the heater on time in ms (limited to 0 - 4032).
    Initialises the BME688 first if that has not been done yet.
    """
    if not self.bme688InitFlag:
        self.bme688Init()

    # Limit targetTemp between 200°C & 400°C
    targetTemp = min(max(targetTemp, 200), 400)
    # Limit heatDuration between 0ms and 4032ms
    heatDuration = int(min(max(heatDuration, 0), 4032))

    # Define the target heater resistance from temperature
    resHeat = self.intConvertGasTargetTemp(self.tAmbient, targetTemp)
    self.i2c.writeto_mem(self.CHIP_ADDRESS, 0x5A, resHeat.to_bytes(1, 'big'))     # res_heat_0 register - heater step 0

    # Define the heater on time, converting ms to register code (Heater Step 0) - cannot be greater than 4032ms
    # Bits <7:6> are a multiplier (1, 4, 16 or 64 times)    Bits <5:0> are 1ms steps (0 to 63ms)
    if (heatDuration < 4032):
        factor = 0
        while (heatDuration > 63):
            heatDuration = (heatDuration // 4)
            factor = factor + 1
        codedDuration = heatDuration + (factor * 64)
    else:
        codedDuration = 255

    self.i2c.writeto_mem(self.CHIP_ADDRESS, 0x64, codedDuration.to_bytes(1, 'big'))     # gas_wait_0 register - heater step 0

    # Select index of heater step (0 to 9): CTRL_GAS_1 reg <3:0>    (Make sure to combine with gas enable setting already there)
    gasEnable = self.getUInt8(self.CTRL_GAS_1) & 0x20
    self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_GAS_1, (0x00 | gasEnable).to_bytes(1, 'big'))   # Select heater step 0

    self.gasInit = True
# Run all measurements on the BME688: Temperature, Pressure, Humidity & Gas Resistance.
def measureData(self):
    """Trigger one forced-mode measurement and update the temperature, pressure, humidity and gas resistance readings.

    Blocks until the sensor reports new data. Initialises the BME688 first if
    that has not been done yet.
    """
    if not self.bme688InitFlag:
        self.bme688Init()

    self.measTimePrev = self.measTime       # Store previous measurement time (ms since Pico powered on)

    # Set mode to FORCED MODE to begin single read cycle: CTRL_MEAS reg <1:0>    (Make sure to combine with temp/pressure oversampling settings already there)
    # The value is written as one raw byte; str(n) would send the ASCII digits of n instead
    oSampleTP = self.getUInt8(self.CTRL_MEAS)
    self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, (0x01 | oSampleTP).to_bytes(1, 'big'))

    # Check New Data bit to see if values have been measured: MEAS_STATUS_0 bit <7>
    while ((self.getUInt8(self.MEAS_STATUS_0) & 0x80) >> 7) != 1:
        pass

    # Read temperature ADC registers (this is required for all other calculations)
    self.tRaw = (self.getUInt8(self.TEMP_MSB_0) << 12) | (self.getUInt8(self.TEMP_LSB_0) << 4) | (self.getUInt8(self.TEMP_XLSB_0) >> 4)

    # Read pressure ADC registers
    self.pRaw = (self.getUInt8(self.PRESS_MSB_0) << 12) | (self.getUInt8(self.PRESS_LSB_0) << 4) | (self.getUInt8(self.PRESS_XLSB_0) >> 4)

    # Read humidity ADC registers (16-bit value: MSB [15:8], LSB [7:0], so the LSB is used unshifted)
    self.hRaw = (self.getUInt8(self.HUMID_MSB_0) << 8) | self.getUInt8(self.HUMID_LSB_0)

    # Read gas resistance ADC registers
    gasLSB = self.getUInt8(self.GAS_RES_LSB_0)
    self.gResRaw = (self.getUInt8(self.GAS_RES_MSB_0) << 2) | (gasLSB >> 6)    # Shift bits <7:6> right to get LSB for gas resistance
    gasRange = gasLSB & 0x0F

    self.measTime = ticks_ms()      # Capture latest measurement time (ms since Pico powered on)

    # Calculate the compensated reading values from the raw ADC data
    # Temperature goes first: pressure needs t_fine and humidity needs the temperature reading
    self.calcTemperature(self.tRaw)
    self.intCalcPressure(self.pRaw)
    self.intCalcHumidity(self.hRaw, self.tRead)
    self.intCalcgRes(self.gResRaw, gasRange)
# A baseline gas resistance is required for the IAQ calculation - it should be taken in a well ventilated area without obvious air pollutants
# Take 60 readings over a ~5min period and find the mean
# Establish the baseline gas resistance reading and the ambient temperature.
# These values are required for air quality calculations
# When the baseline process is complete, values for gBase and tAmbient are stored in a file
# On subsequent power cycles of the board, this function will look for that file and take the baseline values stored there
# To force the baselines process to be run again, call the function like this: calcBaselines(True)
def calcBaselines(self, forcedRun=False):
    """Establish the baseline gas resistance (self.gBase) and ambient temperature (self.tAmbient).

    Unless forcedRun is True, the values are read from 'baselines.txt' when that
    file exists and holds two numbers. Otherwise 60 readings are taken about 5
    seconds apart (~5 minutes), their means are used, and the result is saved
    to 'baselines.txt'.
    """
    if not self.bme688InitFlag:
        self.bme688Init()
    if not self.gasInit:
        self.setupGasSensor()

    self.screen.displayText("Setting Baseline", 2)

    runBaseline = forcedRun
    if not forcedRun:
        # Look for stored baselines; a missing or unreadable file means the baseline process has to run
        try:
            with open("baselines.txt", "r") as f:
                storedGBase = float(f.readline())
                storedTAmbient = float(f.readline())
        except (OSError, ValueError):
            runBaseline = True
        else:
            self.gBase = storedGBase
            self.tAmbient = storedTAmbient

    if runBaseline:
        self.ambTempFlag = False
        burnInData = 0
        ambTotal = 0
        for burnInReadings in range(60):    # Measure data and continue summing gas resistance until 60 readings have been taken
            progress = math.trunc((burnInReadings / 60) * 100)
            self.screen.displayText(str(progress) + "%", 4, 50)
            self.screen.displayText("Setting Baseline", 2)
            self.measureData()
            burnInData = burnInData + self.gRes
            ambTotal = ambTotal + self.newAmbTemp
            sleep_ms(5000)
        self.gBase = (burnInData / 60)      # Find the mean gas resistance during the period to form the baseline
        self.tAmbient = (ambTotal / 60)    # Calculate the ambient temperature as the mean of the 60 initial readings

        # Save baseline values to a file (write mode creates the file, or overwrites it if it exists)
        with open("baselines.txt", "w") as f:
            f.write(str(self.gBase) + "\r\n")
            f.write(str(self.tAmbient) + "\r\n")

    self.ambTempFlag = True
    self.screen.displayText("Setup Complete!", 2)
# Read Temperature from sensor as a Number.
# Units for temperature are in °C (Celsius) or °F (Fahrenheit) according to selection.
def readTemperature(self, temperature_unit="C"):
    # Return the stored temperature reading (self.tRead).
    #   temperature_unit: "F" converts the reading to Fahrenheit; any other value returns it unchanged
    celsius = self.tRead
    if (temperature_unit != "F"):
        return celsius
    # Fahrenheit = C * 1.8 + 32, written as (C * 18 + 320) / 10
    return ((celsius * 18) + 320) / 10
# Read Pressure from sensor as a Number.
# Units for pressure are in Pa (Pascals) or mBar (millibar) according to selection.
def readPressure(self, pressure_unit="Pa"):
    # Return the stored pressure reading (self.pRead).
    #   pressure_unit: "mBar" divides the reading by 100; any other value returns it unchanged
    pascals = self.pRead
    if (pressure_unit != "mBar"):
        return pascals
    return pascals / 100
# Read Humidity from sensor as a Number.
# Humidity is output as a percentage.
def readHumidity(self):
    # Return the stored humidity reading (self.hRead) unchanged.
    humidity = self.hRead
    return humidity
# Read Gas Resistance from sensor as a Number.
# Units for gas resistance are in Ohms.
def readGasRes(self):
    # Return the stored gas resistance (self.gRes).
    # If the gas sensor has not been set up, queue an error message on the screen and return 0.
    if (self.gasInit != False):
        return self.gRes
    display = self.screen
    display.displayText("ERROR", 2)
    display.displayText("Setup Gas Sensor", 3)
    return 0
# Read eCO2 from sensor as a Number (250 - 40000+ppm).
# Units for eCO2 are in ppm (parts per million).
def readeCO2(self):
    # Return the stored eCO2 estimate (self.eCO2Value).
    # If the gas sensor has not been set up, queue an error message on the screen and return 0.
    if (self.gasInit != False):
        return self.eCO2Value
    display = self.screen
    display.displayText("ERROR", 2)
    display.displayText("Setup Gas Sensor", 3)
    return 0
# Return the Air Quality rating as a percentage (0% = Bad, 100% = Excellent).
def getAirQualityPercent(self):
    # Return the stored air quality percentage (self.iaqPercent).
    # If the gas sensor has not been set up, queue an error message on the screen and return 0.
    if (self.gasInit != False):
        return self.iaqPercent
    display = self.screen
    display.displayText("ERROR", 2)
    display.displayText("Setup Gas Sensor", 3)
    return 0
# Return the Air Quality rating as an IAQ score (500 = Bad, 0 = Excellent).
# These values are based on the BME688 datasheet, Page 11, Table 6.
def getAirQualityScore(self):
    # Return the stored IAQ score (self.iaqScore).
    # If the gas sensor has not been set up, queue an error message on the screen and return 0.
    if (self.gasInit != False):
        return self.iaqScore
    display = self.screen
    display.displayText("ERROR", 2)
    display.displayText("Setup Gas Sensor", 3)
    return 0
# Calculate the Index of Air Quality score from the current gas resistance and humidity readings
# iaqPercent: 0 to 100% - higher value = better air quality
# iaqScore: 25 should correspond to 'typically good' air, 250 to 'typically polluted' air
# airQualityRating: Text output based on the iaqScore
# Calculate the estimated CO2 value (eCO2)
def calcAirQuality(self):
    # Update self.iaqPercent, self.iaqScore and self.eCO2Value from the current readings.
    #   Reads:  hRead, hBase, hPrev, hWeight, tRead, tAmbient, gRes, gBase, measTime, measTimePrev
    #   Writes: iaqPercent (truncated int), iaqScore ((100 - iaqPercent) * 5), eCO2Value (truncated int)
    humidityOffset = self.hRead - self.hBase        # Humidity offset from the baseline setting
    ambTemp = (self.tAmbient / 100)                 # tAmbient is scaled down by 100 before comparison with tRead
    temperatureOffset = self.tRead - ambTemp        # Temperature offset from the ambient temperature
    humidityRatio = ((humidityOffset / self.hBase) + 1)
    temperatureRatio = (temperatureOffset / ambTemp)

    # IAQ Calculations
    # Humidity score: above the baseline it falls as humidity approaches 100%,
    # at or below the baseline it is the fraction of the baseline reached.
    if (humidityOffset > 0):
        humidityScore = (100 - self.hRead) / (100 - self.hBase)
    else:
        humidityScore = self.hRead / self.hBase
    humidityScore = humidityScore * self.hWeight * 100

    gasRatio = (self.gRes / self.gBase)
    if ((self.gBase - self.gRes) > 0):
        # Gas resistance below the baseline: scale the gas share of the score by the ratio
        gasScore = gasRatio * (100 * (1 - self.hWeight))
    else:
        # Make sure that when the gas offset and humidityOffset are 0, iaqPercent is 95% - leaves room for cleaner air to be identified
        gasScore = math.floor(70 + (5 * (gasRatio - 1)))
        if (gasScore > 75):
            gasScore = 75

    self.iaqPercent = math.trunc(humidityScore + gasScore)  # Sum of the humidity (25% weighting) and gas (75% weighting) scores
    self.iaqScore = (100 - self.iaqPercent) * 5             # Final air quality score is in range 0 - 500 (see BME688 datasheet page 11 for details)

    # eCO2 Calculations
    eCO2 = 250 * math.pow(math.e, (0.012 * self.iaqScore))  # Exponential curve equation to calculate the eCO2 from an iaqScore input
    # Adjust eCO2 for humidity and/or temperature greater than the baseline values
    if (humidityOffset > 0):
        if (temperatureOffset > 0):
            eCO2 = eCO2 * (humidityRatio + temperatureRatio)
        else:
            eCO2 = eCO2 * humidityRatio
    elif (temperatureOffset > 0):
        eCO2 = eCO2 * (temperatureRatio + 1)

    # Breath detection: if two measurements are at most 5000 apart (measTime - measTimePrev)
    # and the humidity reading has risen by 3 or more since the previous one, 1500 is added to the eCO2 value
    # (These values were based on 'breath-testing' with another eCO2 sensor with algorithms built-in)
    if ((self.measTime - self.measTimePrev) <= 5000):
        if ((self.hRead - self.hPrev) >= 3):
            eCO2 = eCO2 + 1500

    self.eCO2Value = math.trunc(eCO2)
# The KitronikOLED class enables control of the OLED display screen on the board
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class KitronikOLED(framebuf.FrameBuffer):
# Write commands to the OLED controller
def write_cmd(self, cmd):
    # Send one command byte to the OLED controller over I2C.
    # The reusable 2-byte buffer self.temp is filled with a control byte followed by the command.
    packet = self.temp
    packet[0] = 0x80  # Co=1, D/C#=0
    packet[1] = cmd
    self.i2c.writeto(self.CHIP_ADDRESS, packet)
# Write data to the OLED controller
def write_data(self, buf):
    # Send a data buffer to the OLED controller over I2C.
    # self.write_list keeps the data control byte in slot 0; slot 1 is pointed at 'buf'
    # so both are sent in one vectored write.
    frames = self.write_list
    frames[1] = buf
    self.i2c.writevto(self.CHIP_ADDRESS, frames)
# Runs on initialisation of the class
# Sets up all the register definitions and global variables
def __init__(self, i2cAddr=0x3C, sda=6, scl=7):
    # Store the controller address and register definitions, open the I2C bus,
    # set the plot defaults and allocate the frame buffer.
    #   i2cAddr: I2C address of the display (as determined by hardware configuration)
    #   sda, scl: pin numbers used for the I2C data and clock lines
    self.CHIP_ADDRESS = i2cAddr

    # register definitions
    for registerName, opcode in (
        ("SET_CONTRAST", 0x81),
        ("SET_ENTIRE_ON", 0xA4),
        ("SET_NORM_INV", 0xA6),
        ("SET_DISP", 0xAE),
        ("SET_MEM_ADDR", 0x20),
        ("SET_COL_ADDR", 0x21),
        ("SET_PAGE_ADDR", 0x22),
        ("SET_DISP_START_LINE", 0x40),
        ("SET_SEG_REMAP", 0xA0),
        ("SET_MUX_RATIO", 0xA8),
        ("SET_COM_OUT_DIR", 0xC0),
        ("SET_DISP_OFFSET", 0xD3),
        ("SET_COM_PIN_CFG", 0xDA),
        ("SET_DISP_CLK_DIV", 0xD5),
        ("SET_PRECHARGE", 0xD9),
        ("SET_VCOM_DESEL", 0xDB),
        ("SET_CHARGE_PUMP", 0x8D),
    ):
        setattr(self, registerName, const(opcode))

    # I2C bus 1 at 100 kHz on the requested pins
    self.i2c = I2C(1, sda=Pin(sda), scl=Pin(scl), freq=100000)

    # Plot state: stored samples, value range and the pixel rows the plot may use
    self.plotArray = []
    self.plotYMin = 0
    self.plotYMax = 100
    self.yPixelMin = 63
    self.yPixelMax = 12

    # Reusable transfer buffers for write_cmd() and write_data()
    self.temp = bytearray(2)
    self.write_list = [b"\x40", None] # Co=0, D/C#=1

    # Display geometry and the frame buffer backing it (one byte per column per page)
    self.width = 128
    self.height = 64
    self.external_vcc = False
    self.pages = 8
    self.buffer = bytearray(self.pages * self.width)
    super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
# Initialise the display settings and start the display clear
def init_display(self):
    # Send the controller start-up sequence, then blank the frame buffer and push it to the panel.
    # Each parameter byte is preceded by the opcode it belongs to; without the opcode the
    # controller would interpret the parameter as a separate command.
    for cmd in (
        self.SET_DISP | 0x00,  # off
        # address setting
        self.SET_MEM_ADDR,
        0x00,  # horizontal
        # resolution and layout
        self.SET_DISP_START_LINE | 0x00,
        self.SET_SEG_REMAP,  # | 0x01, # Set to either 0xA0 or A1, flips screen horizontally
        self.SET_MUX_RATIO,
        self.height - 1,
        self.SET_COM_OUT_DIR,  # | 0x08, # Set to either 0xC0 or 0xC8, flips screen vertically
        self.SET_DISP_OFFSET,
        0x00,
        self.SET_COM_PIN_CFG,
        0x02 if self.width > 2 * self.height else 0x12,
        # timing and driving scheme
        self.SET_DISP_CLK_DIV,
        0x80,
        self.SET_PRECHARGE,
        0x22 if self.external_vcc else 0xF1,
        self.SET_VCOM_DESEL,
        0x30,  # 0.83*Vcc
        # display
        self.SET_CONTRAST,
        0xFF,  # maximum
        self.SET_ENTIRE_ON,  # output follows RAM contents
        self.SET_NORM_INV,  # not inverted
        # charge pump
        self.SET_CHARGE_PUMP,
        0x10 if self.external_vcc else 0x14,
        self.SET_DISP | 0x01,  # on
    ):
        self.write_cmd(cmd)
    # Start from a blank screen
    self.fill(0)
    self.show()
# Screen will switch off, but retain the information that was displayed
def poweroff(self):
    # Send the display on/off command with the low bit cleared (display off).
    displayOff = self.SET_DISP | 0x00
    self.write_cmd(displayOff)
# Turn the screen back on - do not need to re-display what was showing as the information is retained
def poweron(self):
    # Send the display on/off command with the low bit set (display on).
    displayOn = self.SET_DISP | 0x01
    self.write_cmd(displayOn)
# 0 = Dim to 150 = Bright
def contrast(self, contrast):
    # Set the display contrast.
    #   contrast: level sent to the controller as the parameter byte of the contrast command
    # The contrast opcode must be followed by the level as a second command byte.
    self.write_cmd(self.SET_CONTRAST)
    self.write_cmd(contrast)
# 0 = White on black, 1 = Black on white
def invert(self, invert):
    # Select normal or inverted display mode.
    #   invert: only the lowest bit is used; it is OR-ed into the normal/inverse command
    polarityBit = invert & 1
    self.write_cmd(self.SET_NORM_INV | polarityBit)
# Set text to display on a particular line (1 - 6) and an x-axis offset can be set (0 - 127, 0 is default)
# If the text is longer than the screen it will be cut off; it will not be pushed to the next line (16 characters max per line)
# Need to call 'show()' to make the text actually display
def displayText(self, text, line, x_offset=0):
    # Draw 'text' into the frame buffer on the given text line.
    #   line: values below 1 are treated as 1 and values above 6 as 6
    #   x_offset: x pixel position at which the text starts
    # Lines are 11 pixels apart, with line 1 starting at pixel row 1.
    line = min(max(line, 1), 6)
    top = (line * 11) - 10
    super().text(text, x_offset, top)
# Make what has been set to display actually appear on the screen
# Needs to be called after 'displayText()', 'plot()', 'clear()', 'drawLine()' & 'drawRect()'
def show(self):
    # Transfer the whole frame buffer to the panel.
    # The column and page address windows are set to cover the full display first,
    # then self.buffer is sent as data.
    x0 = 0
    x1 = self.width - 1
    if self.width == 64:
        # displays with width of 64 pixels are shifted by 32
        x0 += 32
        x1 += 32
    for cmd in (
        self.SET_COL_ADDR,   # column window: first and last column
        x0,
        x1,
        self.SET_PAGE_ADDR,  # page window: first and last page
        0,
        self.pages - 1,
    ):
        self.write_cmd(cmd)
    self.write_data(self.buffer)
# Plot a live updating graph of a variable
# Plot y range is pixels 12 down to 63, leaving room for a title or similar on the first line
# Need to call 'show()' to make the plot actually display
def plot(self, variable):
    # Add a sample to the live plot and draw the stored samples into the frame buffer.
    #   variable: value to plot; it is truncated to an integer
    # The value range (plotYMin..plotYMax) grows to include every sample seen.
    # One sample is kept per pixel column; once the display width is exceeded the oldest is dropped.
    # Values are scaled so plotYMin lands on pixel row yPixelMin and plotYMax on yPixelMax.
    variable = math.trunc(variable)
    if (variable > self.plotYMax):
        self.plotYMax = variable
    if (variable < self.plotYMin):
        self.plotYMin = variable

    # Store the new sample, scrolling the history once every column is in use
    self.plotArray.append(variable)
    if (len(self.plotArray) > self.width):
        self.plotArray.pop(0)

    pixelSpan = self.yPixelMin - self.yPixelMax
    valueSpan = self.plotYMax - self.plotYMin
    prevX = 0
    prevY = 0
    for x, value in enumerate(self.plotArray):
        if (valueSpan == 0):
            # Degenerate range: avoid dividing by zero, draw on the bottom row
            y = self.yPixelMin
        else:
            # Offset by plotYMin so negative values stay on screen; multiply before dividing to limit rounding error
            y = math.trunc(self.yPixelMin - (((value - self.plotYMin) * pixelSpan) / valueSpan))
        if (x == 0):
            self.pixel(x, y, 1)  # First sample has no predecessor to join to
        else:
            self.drawLine(prevX, prevY, x, y)
        prevX = x
        prevY = y
# Wipe all data from the screen
# Need to call 'show()' to make the clear actually happen
def clear(self):
    # Blank every pixel in the frame buffer; nothing changes on the panel until 'show()' is called.
    # NOTE(review): stored plot samples (self.plotArray) are left untouched here - confirm whether
    # clearing the screen is also expected to restart the plot.
    self.fill(0)
# Clear a specific line on the screen
def clearLine(self, line):
    # Blank a 128 x 10 pixel band in the frame buffer for the given text line.
    # The band starts at pixel row 11 * (line - 1).
    top = (line - 1) + ((line * 10) - 10)
    bandWidth = 128
    bandHeight = 10
    super().fill_rect(0, top, bandWidth, bandHeight, 0)
# Draw a line on the screen (vertical, horizontal or diagonal), setting start and finish (x, y) coordinates
# Need to call 'show()' to make the line actually display
def drawLine(self, start_x, start_y, end_x, end_y):
    # Draw a lit (colour 1) line between the two points in the frame buffer.
    litPixel = 1
    super().line(start_x, start_y, end_x, end_y, litPixel)
# Draw rectangles with a top left starting (x, y) coordinate and then a width and height
# Can be filled (True) or just an outline (False)
# Need to call 'show()' to make the rectangle actually display
def drawRect(self, start_x, start_y, width, height, fill=False):
    # Draw a lit (colour 1) rectangle in the frame buffer.
    #   fill: False draws only the outline, True draws a solid rectangle;
    #         a value equal to neither draws nothing
    if (fill == False):
        painter = super().rect
    elif (fill == True):
        painter = super().fill_rect
    else:
        return
    painter(start_x, start_y, width, height, 1)