diff --git a/PicoAirQuality.py b/PicoAirQuality.py new file mode 100644 index 0000000..8f889df --- /dev/null +++ b/PicoAirQuality.py @@ -0,0 +1,1225 @@ +import math +import framebuf +import array +import os +from machine import Pin, PWM, ADC, time_pulse_us, I2C, RTC +from rp2 import PIO, StateMachine, asm_pio +from time import sleep, sleep_ms, sleep_us, ticks_ms, ticks_us +from micropython import const + +# Initialise the module with all outputs off +# High Power Output Pins +hp_3 = Pin(3, Pin.OUT) +hp_15 = Pin(15, Pin.OUT) +hp_3.value(0) +hp_15.value(0) +# Servo +servo = Pin(2, Pin.OUT) +servo.value(0) +# Buzzer +buzzer = PWM(Pin(4)) +buzzer.duty_u16(0) + +# The KitronikButton class enable the use of the 2 user input buttons on the board +class KitronikButton: + def __init__(self): + self.buttonA = Pin(12, Pin.IN, Pin.PULL_DOWN) + self.buttonB = Pin(13, Pin.IN, Pin.PULL_DOWN) + +# The KitronikOutputControl class enables control of the servo and high-power outputs on the board +class KitronikOutputControl: + # This code drives a pwm on the PIO. It is running at 2Mhz, which gives the PWM a 1uS resolution. + @asm_pio(sideset_init=PIO.OUT_LOW) + def _servo_pwm(): + # First we clear the pin to zero, then load the registers. Y is always 20000 - 20uS, x is the pulse 'on' length. + pull(noblock) .side(0) + mov(x, osr) # Keep most recent pull data stashed in X, for recycling by noblock + mov(y, isr) # ISR must be preloaded with PWM count max + # This is where the looping work is done. the overall loop rate is 1Mhz (clock is 2Mhz - we have 2 instructions to do) + label("loop") + jmp(x_not_y, "skip") #if there is 'excess' Y number leave the pin alone and jump to the 'skip' label until we get to the X value + nop() .side(1) + label("skip") + jmp(y_dec, "loop") #count down y by 1 and jump to pwmloop. When y is 0 we will go back to the 'pull' command + + def __init__(self): + # High Power Output Pins + self.highPwr_3 = Pin(3, Pin.OUT) + self.highPwr_15 = Pin(15, Pin.OUT) + + # Servo Control + self.servo = [] + # Servo 0 degrees -> pulse of 0.5ms, 180 degrees 2.5ms + # Pulse train freq 50hz - 20ms + # 1us is freq of 1000000 + # Servo pulses range from 500 to 2500us and overall pulse train is 20000us repeat. + # Servo is on GP2 + self.maxServoPulse = 2500 + self.minServoPulse = 500 + self.pulseTrain = 20000 + self.degreesToUS = 2000/180 + # Create and start the servo statemachine + self.servo.append (StateMachine(1, self._servo_pwm, freq=2000000, sideset_base=Pin(2))) + self.servo[0].put(self.pulseTrain) + self.servo[0].exec("pull()") + self.servo[0].exec("mov(isr, osr)") + self.registerServo() + + # Doesn't actually register/unregister, just stops and starts the servo PIO + # The servo is registered by default in the __init__() function - these are only required if you want to use Pin 2 for something else, and then register the servo again + def registerServo(self): + if(not self.servo[0].active()): + self.servo[0].active(1) + + def deregisterServo(self): + if(self.servo[0].active()): + self.servo[0].active(0) + + # goToPosition takes a degree position for the servo to go to. + # 0degrees->180 degrees is 0->2000us, plus offset of 500uS + # 1 degree ~ 11uS. + # This function does the sum then calls goToPeriod to actually poke the PIO + def servoToPosition(self, degrees): + pulseLength = int(degrees*self.degreesToUS + 500) + self.servoToPeriod(pulseLength) + + def servoToPeriod(self, period): + if(period < 500): + period = 500 + if(period >2500): + period =2500 + self.servo[0].put(period) + + # Functions to turn on/off the high power outputs + # Enter the pin number, either '3' or '15' + def highPowerOn(self, pin): + if (pin == 3): + self.highPwr_3.value(1) + elif (pin == 15): + self.highPwr_15.value(1) + + def highPowerOff(self, pin): + if (pin == 3): + self.highPwr_3.value(0) + elif (pin == 15): + self.highPwr_15.value(0) + +# The KitronikDataLogger class enables data logging through the Pico file system +# It is possible to create multiple data logger instances to then log to multiple files simulataneously +class KitronikDataLogger: + # Function is called when the class is initialised - sets the maximum permissable filesize, the data separator and creates the log file with the entered filename + # Separator options: ("comma", "semicolon", "tab") + def __init__(self, file = "data_log.txt", separator = "semicolon"): + self.MAX_FILE_SIZE = 500000 # This is approximately 10000 full entries + if (separator == "comma"): + self.SEPARATOR = "," + elif (separator == "semicolon"): + self.SEPARATOR = ";" + elif (separator == "tab"): + self.SEPARATOR = "\t" + self.FILENAME = file + try: + f = open(self.FILENAME, "x") + f.close() + except OSError: + print("File already exists") + self.line1 = "" + self.line2 = "" + self.line3 = "" + self.dataHeadings = "" + self.projectInfo = False + self.headings = False + + # Write a header section to the specified file (there are 3 free text fields, each will write on a separate line) + def writeProjectInfo(self, line1="", line2="", line3=""): + if (line1 != ""): + self.writeFile(self.FILENAME, line1 + "\r\n") + self.line1 = line1 + if (line2 != ""): + self.writeFile(self.FILENAME, line2 + "\r\n") + self.line2 = line2 + if (line3 != ""): + self.writeFile(self.FILENAME, line3 + "\r\n") + self.line3 = line3 + self.projectInfo = True + + # This writes whatever is passed to it to the file + def writeFile(self, file, passed): + f = open(file, "a") #open in append - creates if not existing, will append if it exists + f.write(passed) + f.close() + + # Input and write to the file up to 10 data field headings + def nameColumnHeadings(self, field1="", field2="", field3="", field4="", field5="", field6="", field7="", field8="", field9="", field10=""): + dataHeadings = "" + if (field1 != ""): + dataHeadings = field1 + self.SEPARATOR + if (field2 != ""): + dataHeadings = dataHeadings + field2 + self.SEPARATOR + if (field3 != ""): + dataHeadings = dataHeadings + field3 + self.SEPARATOR + if (field4 != ""): + dataHeadings = dataHeadings + field4 + self.SEPARATOR + if (field5 != ""): + dataHeadings = dataHeadings + field5 + self.SEPARATOR + if (field6 != ""): + dataHeadings = dataHeadings + field6 + self.SEPARATOR + if (field7 != ""): + dataHeadings = dataHeadings + field7 + self.SEPARATOR + if (field8 != ""): + dataHeadings = dataHeadings + field8 + self.SEPARATOR + if (field9 != ""): + dataHeadings = dataHeadings + field9 + self.SEPARATOR + if (field10 != ""): + dataHeadings = dataHeadings + field10 + self.SEPARATOR + + self.dataHeadings = dataHeadings + self.writeFile(self.FILENAME, dataHeadings + "\r\n") + self.headings = True + + # Store up to 10 data entries (match the order with the data headings used) + def storeDataEntry(self, field1="", field2="", field3="", field4="", field5="", field6="", field7="", field8="", field9="", field10=""): + dataEntry = "" + if (field1 != ""): + dataEntry = field1 + self.SEPARATOR + if (field2 != ""): + dataEntry = dataEntry + field2 + self.SEPARATOR + if (field3 != ""): + dataEntry = dataEntry + field3 + self.SEPARATOR + if (field4 != ""): + dataEntry = dataEntry + field4 + self.SEPARATOR + if (field5 != ""): + dataEntry = dataEntry + field5 + self.SEPARATOR + if (field6 != ""): + dataEntry = dataEntry + field6 + self.SEPARATOR + if (field7 != ""): + dataEntry = dataEntry + field7 + self.SEPARATOR + if (field8 != ""): + dataEntry = dataEntry + field8 + self.SEPARATOR + if (field9 != ""): + dataEntry = dataEntry + field9 + self.SEPARATOR + if (field10 != ""): + dataEntry = dataEntry + field10 + self.SEPARATOR + + while (self.checkFileSize() > self.MAX_FILE_SIZE): + self.removeOneLine() + + self.writeFile(self.FILENAME, dataEntry + "\r\n") + + # This returns the size of the file, or 0 if the file does not exist + def checkFileSize(self): + # f is a file-like object. + try: + f = open(self.FILENAME, "r") # Open read - this throws an error if file does not exist - in that case the size is 0 + f.seek(0, 2) + size = f.tell() + f.close() + return size + except: + # if we wanted to know we could print some diagnostics here like: + #print("Exception - File does not exist") + return 0 + + # Remove a line from the data file to make space for more data + def removeOneLine(self): + tempName = self.FILENAME + ".bak" + readFrom = open(self.FILENAME, "r") + writeTo = open(tempName, "w") + if self.projectInfo: # If there is Project Info, skip over these lines and then write them to the temporary file + for l in range(3): + readFrom.readline() + writeTo.write(self.line1 + "\r\n") + writeTo.write(self.line2 + "\r\n") + writeTo.write(self.line3 + "\r\n") + if self.headings: + readFrom.readline() # If there are Headings, skip over this line and then write them to the temporary file + writeTo.write(self.dataHeadings + "\r\n") + readFrom.readline() # Read and throw away the first line of data in the file + + for lines in readFrom: # Write the remaining lines to the temporary file + writeTo.write(lines) + readFrom.close() # Close both files + writeTo.close() + os.remove(self.FILENAME) # Delete original log file + os.rename(tempName, self.FILENAME) # Rename temporary file as new log file (now with the first line of data removed) + + # Deletes all the contents of the file + def eraseAllData(self): + f = open(self.FILENAME, "w") + f.write("") + f.close() + + # Deletes the file from the Pico file system + def deleteDataFile(self): + os.remove(self.FILENAME) + +# The KitronikBuzzer class enables control of the piezo buzzer on the board +class KitronikBuzzer: + # Function is called when the class is initialised and sets the buzzer pin to GP4 + def __init__(self): + self.buzzer = PWM(Pin(4)) + self.dutyCycle = 32767 + + # Play a continous tone at a specified frequency + def playTone(self, freq): + if (freq < 30): + freq = 30 + if (freq > 3000): + freq = 3000 + self.buzzer.freq(freq) + self.buzzer.duty_u16(self.dutyCycle) + + # Play a tone at a speciied frequency for a specified length of time in ms + def playTone_Length(self, freq, length): + self.playTone(freq) + sleep_ms(length) + self.stopTone() + + # Stop the buzzer producing a tone + def stopTone(self): + self.buzzer.duty_u16(0) + +# The KitronikZIPLEDs class enables control of the ZIP LEDs both on the board and any connected externally +class KitronikZIPLEDs: + # We drive the ZIP LEDs using one of the PIO statemachines. + @asm_pio(sideset_init=PIO.OUT_LOW, out_shiftdir=PIO.SHIFT_LEFT, autopull=True, pull_thresh=24) + def _ZIPLEDOutput(): + T1 = 2 + T2 = 5 + T3 = 3 + wrap_target() + label("bitloop") + out(x, 1) .side(0) [T3 - 1] + jmp(not_x, "do_zero") .side(1) [T1 - 1] + jmp("bitloop") .side(1) [T2 - 1] + label("do_zero") + nop() .side(0) [T2 - 1] + wrap() + + def __init__(self, num_zip_leds): + self.num_zip_leds = num_zip_leds + # Create and start the StateMachine for the ZIPLeds + self.ZIPLEDs = StateMachine(4, self._ZIPLEDOutput, freq=8_000_000, sideset_base=Pin(20)) + self.theLEDs = array.array("I", [0 for _ in range(self.num_zip_leds)]) #an array for the LED colours. + self.brightness = 0.5 #20% initially + self.ZIPLEDs.active(1) + + # Define some colour tuples for people to use. + self.BLACK = (0, 0, 0) + self.RED = (255, 0, 0) + self.YELLOW = (255, 150, 0) + self.GREEN = (0, 255, 0) + self.CYAN = (0, 255, 255) + self.BLUE = (0, 0, 255) + self.PURPLE = (180, 0, 255) + self.WHITE = (255, 255, 255) + self.COLOURS = (self.BLACK, self.RED, self.YELLOW, self.GREEN, self.CYAN, self.BLUE, self.PURPLE, self.WHITE) + + # Show pushes the current setup of the LEDS to the physical LEDS - it makes them visible. + def show(self): + brightAdjustedLEDs = array.array("I", [0 for _ in range(self.num_zip_leds)]) + for i,c in enumerate(self.theLEDs): + r = int(((c >> 8) & 0xFF) * self.brightness) + g = int(((c >> 16) & 0xFF) * self.brightness) + b = int((c & 0xFF) * self.brightness) + brightAdjustedLEDs[i] = (g<<16) + (r<<8) + b + self.ZIPLEDs.put(brightAdjustedLEDs, 8) + + # Turn the LED off by setting the colour to black + def clear(self, whichLED): + self.setLED(whichLED, self.BLACK) + + # Sets the colour of an individual LED. Use show to make change visible + def setLED(self, whichLED, whichColour): + if(whichLED<0): + raise Exception("INVALID LED:",whichLED," specified") + elif(whichLED>(self.num_zip_leds - 1)): + raise Exception("INVALID LED:",whichLED," specified") + else: + self.theLEDs[whichLED] = (whichColour[1]<<16) + (whichColour[0]<<8) + whichColour[2] + + # Gets the stored colour of an individual LED, which isnt nessecerily the colour on show if it has been changed, but not 'show'n + def getLED(self, whichLED): + if(whichLED<0): + raise Exception("INVALID LED:",whichLED," specified") + elif(whichLED>(self.num_zip_leds - 1)): + raise Exception("INVALID LED:",whichLED," specified") + else: + return(((self.theLEDs[whichLED]>>8) & 0xff), ((self.theLEDs[whichLED]>>16)& 0xff) ,((self.theLEDs[whichLED])& 0xff)) + + # Takes 0-100 as a brightness value, brighness is applies in the'show' function + def setBrightness(self, value): + #cap to 0-100% + if (value<0): + value = 0 + elif (value>100): + value=100 + self.brightness = value / 100 + +# The KitronikRTC class enables use of the Pico RTC +class KitronikRTC: + # Function is called when the class is initialised and creates an instance of the Pico RTC and defines all the global variables + def __init__(self): + self.rtc = RTC() + self.day = 0 + self.month = 0 + self.year = 0 + self.weekday = 0 # In range 0 - 6, 0 = Monday, 6 = Sunday + self.hour = 0 + self.minute = 0 + self.second = 0 + self.alarmHour = 0 + self.alarmMinute = 0 + self.alarmSet = False + self.alarmTrigger = False + self.alarmRepeat = False + self.hourPeriod = 0 + self.minutePeriod = 0 + + # Function calculates the weekday (0 = Monday, 6 = Sunday) based on the date, taking into account leap years + def calcWeekday(self, day, month, year): + dayOffset = [0, 3, 2, 5, 0, 3, 5, 1, 4, 6, 2, 4] + if (month < 3): + month = month - 1 + + self.weekday = (year + (year // 4) - (year // 100) + (year // 400) + dayOffset[month - 1] + day) % 7 # Returns 0 - 6, 0 = Sunday, 6 = Saturday + if (self.weekday == 0): + self.weekday = 6 + else: + self.weekday = self.weekday - 1 + + # Set the date on the Pico RTC + def setDate(self, day, month, year): + self.getDateTime() + self.calcWeekday(day, month, year) + self.rtc.datetime((year, month, day, self.weekday, self.hour, self.minute, self.second, 0)) + + # Set the time on the Pico RTC + def setTime(self, hours, minutes, seconds): + self.getDateTime() + self.rtc.datetime((self.year, self.month, self.day, self.weekday, hours, minutes, seconds, 0)) + + # Return the current date and time + def getDateTime(self): + newDateTime = self.rtc.datetime() + self.day = newDateTime[2] + self.month = newDateTime[1] + self.year = newDateTime[0] + self.weekday = newDateTime[3] + self.hour = newDateTime[4] + self.minute = newDateTime[5] + self.second = newDateTime[6] + + # Return the current date as a string + def readDateString(self): + self.getDateTime() + if (self.day < 10): + day = "0" + str(self.day) + else: + day = str(self.day) + + if (self.month < 10): + month = "0" + str(self.month) + else: + month = str(self.month) + + date = day + "/" + month + "/" + str(self.year) + return date + + # Return the current time as a string + def readTimeString(self): + self.getDateTime() + if (self.hour < 10): + hour = "0" + str(self.hour) + else: + hour = str(self.hour) + + if (self.minute < 10): + minute = "0" + str(self.minute) + else: + minute = str(self.minute) + + if (self.second < 10): + second = "0" + str(self.second) + else: + second = str(self.second) + + time = hour + ":" + minute + ":" + second + return time + + # Return a specific date/time parameter as a number + # d = day, m = month, y = year, h = hour, min = minute, s = second + def readParameter(self, parameter): + self.getDateTime() + if (parameter == "d"): + return self.day + elif (parameter == "m"): + return self.month + elif (parameter == "y"): + return self.year + elif (parameter == "h"): + return self.hour + elif (parameter == "min"): + return self.minute + elif (parameter == "s"): + return self.second + + # Set an alarm for a specific hour and minute + # Extra options to set a periodically repeating alarm (set alarmRepeat to True and then specifiy the hour and/or minute period between alarms) + def setAlarm(self, hour, minute, alarmRepeat=False, hourPeriod=0, minutePeriod=0): + self.alarmHour = math.ceil(hour) + self.alarmMinute = math.ceil(minute) + self.alarmRepeat = alarmRepeat + + if alarmRepeat: + self.hourPeriod = math.ceil(hourPeriod) + self.minutePeriod = math.ceil(minutePeriod) + self.alarmSet = True + + # Check whether the alarm conditions have been met and then trigger the alarm + def checkAlarm(self): + if self.alarmSet: + if (self.readParameter("h") == self.alarmHour): + if (self.readParameter("min") == self.alarmMinute): + self.alarmTrigger = True + return self.alarmTrigger + + # Sets 'alarmTrigger' back to False, checks whether the alarm should repeat (sets new one if it should) or sets 'alarmSet' back to False + def silenceAlarm(self): + self.alarmTrigger = False + if not self.alarmRepeat: + self.alarmSet = False + else: + newHour = self.alarmHour + self.hourPeriod + newMinute = self.alarmMinute + self.minutePeriod + if (newMinute > 59): + newMinute = newMinute - 60 + newHour = newHour + 1 + if (newHour > 23): + newHour = newHour - 24 + self.setAlarm(newHour, newMinute, True, self.hourPeriod, self.minutePeriod) + +# The KitronikBME688 class enables contro and use of the BME688 sensor on the board +class KitronikBME688: + # The following functions are for reading the registers on the BME688 + # Function for reading register as signed 8 bit integer + def getUInt8(self, reg): + return int.from_bytes(self.i2c.readfrom_mem(self.CHIP_ADDRESS, reg, 1), "big") + + # Function to convert unsigned ints to twos complement signed ints + def twosComp(self, value, bits): + if ((value & (1 << (bits - 1))) != 0): + value = value - (1 << bits) + return value + + # Function for proportionally mapping a value to a different value range + def mapValues(self, value, frMin, frMax, toMin, toMax): + toRange = toMax - toMin + mappedVal = toMin + ((value - frMin) * ((toMax - toMin) / (frMax - frMin))) + return mappedVal + + def __init__(self, i2cAddr=0x77, sda=6, scl=7): + self.CHIP_ADDRESS = i2cAddr # I2C address as determined by hardware configuration + sda = Pin(sda) + scl = Pin(scl) + self.i2c = I2C(1,sda=sda, scl=scl, freq=100000) + + # Useful BME688 Register Addresses + # Control + self.CTRL_MEAS = 0x74 # Bit position <7:5>: Temperature oversampling Bit position <4:2>: Pressure oversampling Bit position <1:0>: Sensor power mode + self.RESET = 0xE0 # Write 0xB6 to initiate soft-reset (same effect as power-on reset) + self.CHIP_ID = 0xD0 # Read this to return the chip ID: 0x61 - good way to check communication is occurring + self.CTRL_HUM = 0x72 # Bit position <2:0>: Humidity oversampling settings + self.CONFIG = 0x75 # Bit position <4:2>: IIR filter settings + self.CTRL_GAS_0 = 0x70 # Bit position <3>: Heater off (set to '1' to turn off current injection) + self.CTRL_GAS_1 = 0x71 # Bit position <5> DATASHEET ERROR: Enable gas conversions to start when set to '1' Bit position <3:0>: Heater step selection (0 to 9) + + # Pressure Data + self.PRESS_MSB_0 = 0x1F # Forced & Parallel: MSB [19:12] + self.PRESS_LSB_0 = 0x20 # Forced & Parallel: LSB [11:4] + self.PRESS_XLSB_0 = 0x21 # Forced & Parallel: XLSB [3:0] + + # Temperature Data + self.TEMP_MSB_0 = 0x22 # Forced & Parallel: MSB [19:12] + self.TEMP_LSB_0 = 0x23 # Forced & Parallel: LSB [11:4] + self.TEMP_XLSB_0 = 0x24 # Forced & Parallel: XLSB [3:0] + + # Humidity Data + self.HUMID_MSB_0 = 0x25 # Forced & Parallel: MSB [15:8] + self.HUMID_LSB_0 = 0x26 # Forced & Parallel: LSB [7:0] + + # Gas Resistance Data + self.GAS_RES_MSB_0 = 0x2C # Forced & Parallel: MSB [9:2] + self.GAS_RES_LSB_0 = 0x2D # Forced & Parallel: Bit <7:6>: LSB [1:0] Bit <5>: Gas valid Bit <4>: Heater stability Bit <3:0>: Gas resistance range + + # Status + self.MEAS_STATUS_0 = 0x1D # Forced & Parallel: Bit <7>: New data Bit <6>: Gas measuring Bit <5>: Measuring Bit <3:0>: Gas measurement index + + # Calibration parameters for compensation calculations + # Temperature + self.PAR_T1 = self.twosComp((self.getUInt8(0xEA) << 8) | self.getUInt8(0xE9), 16) # Signed 16-bit + self.PAR_T2 = self.twosComp((self.getUInt8(0x8B) << 8) | self.getUInt8(0x8A), 16) # Signed 16-bit + self.PAR_T3 = self.twosComp(self.getUInt8(0x8C), 8) # Signed 8-bit + + # Pressure + self.PAR_P1 = (self.getUInt8(0x8F) << 8) | self.getUInt8(0x8E) # Always a positive number, do not do twosComp() conversion! + self.PAR_P2 = self.twosComp((self.getUInt8(0x91) << 8) | self.getUInt8(0x90), 16) # Signed 16-bit + self.PAR_P3 = self.twosComp(self.getUInt8(0x92), 8) # Signed 8-bit + self.PAR_P4 = self.twosComp((self.getUInt8(0x95) << 8) | self.getUInt8(0x94), 16) # Signed 16-bit + self.PAR_P5 = self.twosComp((self.getUInt8(0x97) << 8) | self.getUInt8(0x96), 16) # Signed 16-bit + self.PAR_P6 = self.twosComp(self.getUInt8(0x99), 8) # Signed 8-bit + self.PAR_P7 = self.twosComp(self.getUInt8(0x98), 8) # Signed 8-bit + self.PAR_P8 = self.twosComp((self.getUInt8(0x9D) << 8) | self.getUInt8(0x9C), 16) # Signed 16-bit + self.PAR_P9 = self.twosComp((self.getUInt8(0x9F) << 8) | self.getUInt8(0x9E), 16) # Signed 16-bit + self.PAR_P10 = self.twosComp(self.getUInt8(0xA0), 8) # Signed 8-bit + + # Humidity + parH1_LSB_parH2_LSB = self.getUInt8(0xE2) + self.PAR_H1 = (self.getUInt8(0xE3) << 4) | (parH1_LSB_parH2_LSB & 0x0F) + self.PAR_H2 = (self.getUInt8(0xE1) << 4) | (parH1_LSB_parH2_LSB >> 4) + self.PAR_H3 = self.twosComp(self.getUInt8(0xE4), 8) # Signed 8-bit + self.PAR_H4 = self.twosComp(self.getUInt8(0xE5), 8) # Signed 8-bit + self.PAR_H5 = self.twosComp(self.getUInt8(0xE6), 8) # Signed 8-bit + self.PAR_H6 = self.twosComp(self.getUInt8(0xE7), 8) # Signed 8-bit + self.PAR_H7 = self.twosComp(self.getUInt8(0xE8), 8) # Signed 8-bit + + # Gas resistance + self.PAR_G1 = self.twosComp(self.getUInt8(0xED), 8) # Signed 8-bit + self.PAR_G2 = self.twosComp((self.getUInt8(0xEB) << 8) | self.getUInt8(0xEC), 16) # Signed 16-bit + self.PAR_G3 = self.getUInt8(0xEE) # Unsigned 8-bit + self.RES_HEAT_RANGE = (self.getUInt8(0x02) >> 4) & 0x03 + self.RES_HEAT_VAL = self.twosComp(self.getUInt8(0x00), 8) # Signed 8-bit + + # Oversampling rate constants + self.OSRS_1X = 0x01 + self.OSRS_2X = 0x02 + self.OSRS_4X = 0x03 + self.OSRS_8X = 0x04 + self.OSRS_16X = 0x05 + + # IIR filter coefficient values + self.IIR_0 = 0x00 + self.IIR_1 = 0x01 + self.IIR_3 = 0x02 + self.IIR_7 = 0x03 + self.IIR_15 = 0x04 + self.IIR_31 = 0x05 + self.IIR_63 = 0x06 + self.IIR_127 = 0x07 + + #Global variables used for storing one copy of value, these are used in multiple locations for calculations + self.bme688InitFlag = False + self.gasInit = False + + self.tRead = 0 # calculated readings of sensor parameters from raw adc readings + self.pRead = 0 + self.hRead = 0 + self.gRes = 0 + self.iaqPercent = 0 + self.iaqScore = 0 + self.airQualityRating = "" + self.eCO2Value = 0 + + self.gBase = 0 + self.hBase = 40 # Between 30% & 50% is a widely recognised optimal indoor humidity, 40% is a good middle ground + self.hWeight = 0.25 # Humidity contributes 25% to the IAQ score, gas resistance is 75% + self.hPrev = 0 + self.measTime = 0 + self.measTimePrev = 0 + + self.tRaw = 0 # adc reading of raw temperature + self.pRaw = 0 # adc reading of raw pressure + self.hRaw = 0 # adc reading of raw humidity + self.gResRaw = 0 # adc reading of raw gas resistance + self.gasRange = 0 + + self.t_fine = 0 # Intermediate temperature value used for pressure calculation + self.newAmbTemp = 0 + self.tAmbient = 0 # Intermediate temperature value used for heater calculation + self.ambTempFlag = False + + # Create an instance of the OLED display screen for use during setup and for error messages + self.screen = KitronikOLED() + + # Begin the hardware inititialisation for the BME688 sensor + self.bme688Init() + + # Temperature compensation calculation: rawADC to degrees C (integer) + def calcTemperature(self, tempADC): + var1 = (tempADC >> 3) - (self.PAR_T1 << 1) + var2 = (var1 * self.PAR_T2) >> 11 + var3 = ((((var1 >> 1) * (var1 >> 1)) >> 12) * (self.PAR_T3 << 4)) >> 14 + self.t_fine = var2 + var3 + self.newAmbTemp = ((self.t_fine * 5) + 128) >> 8 + self.tRead = self.newAmbTemp / 100 # Convert to floating point with 2 dp + if (self.ambTempFlag == False): + self.tAmbient = self.newAmbTemp + + # Pressure compensation calculation: rawADC to Pascals (integer) + def intCalcPressure(self, pressureADC): + var1 = (self.t_fine >> 1) - 64000 + var2 = ((((var1 >> 2) * (var1 >> 2)) >> 11) * self.PAR_P6) >> 2 + var2 = var2 + ((var1 * self.PAR_P5) << 1) + var2 = (var2 >> 2) + (self.PAR_P4 << 16) + var1 = (((((var1 >> 2) * (var1 >> 2)) >> 13) * (self.PAR_P3 << 5)) >> 3) + ((self.PAR_P2 * var1) >> 1) + var1 = var1 >> 18 + var1 = ((32768 + var1) * self.PAR_P1) >> 15 + self.pRead = 1048576 - pressureADC + self.pRead = ((self.pRead - (var2 >> 12)) * 3125) + + if (self.pRead >= (1 << 30)): + self.pRead = (self.pRead // var1) << 1 + else: + self.pRead = ((self.pRead << 1) // var1) + + var1 = (self.PAR_P9 * (((self.pRead >> 3) * (self.pRead >> 3)) >> 13)) >> 12 + var2 = ((self.pRead >> 2) * self.PAR_P8) >> 13 + var3 = ((self.pRead >> 8) * (self.pRead >> 8) * (self.pRead >> 8) * self.PAR_P10) >> 17 + self.pRead = self.pRead + ((var1 + var2 + var3 + (self.PAR_P7 << 7)) >> 4) + + # Humidity compensation calculation: rawADC to % (integer) + # 'tempScaled' is the current reading from the Temperature sensor + def intCalcHumidity(self, humidADC, tempScaled): + self.hPrev = self.hRead + tempScaled = math.trunc(tempScaled) + + var1 = humidADC - (self.PAR_H1 << 4) - (((tempScaled * self.PAR_H3) // 100) >> 1) + var2 = (self.PAR_H2 * (((tempScaled * self.PAR_H4) // 100) + (((tempScaled * ((tempScaled * self.PAR_H5) // 100)) >> 6) // 100) + (1 << 14))) >> 10 + var3 = var1 * var2 + var4 = ((self.PAR_H6 << 7) + ((tempScaled * self.PAR_H7) // 100)) >> 4 + var5 = ((var3 >> 14) * (var3 >> 14)) >> 10 + var6 = (var4 * var5) >> 1 + self.hRead = (var3 + var6) >> 12 + self.hRead = (((var3 + var6) >> 10) * (1000)) >> 12 + self.hRead = self.hRead // 1000 + + # Gas sensor heater target temperature to target resistance calculation + # 'ambientTemp' is reading from Temperature sensor in degC (could be averaged over a day when there is enough data?) + # 'targetTemp' is the desired temperature of the hot plate in degC (in range 200 to 400) + # Note: Heating duration also needs to be specified for each heating step in 'gas_wait' registers + def intConvertGasTargetTemp(self, ambientTemp, targetTemp): + var1 = ((ambientTemp * self.PAR_G3) // 1000) << 8 # Divide by 1000 as we have ambientTemp in pre-degC format (i.e. 2500 rather than 25.00 degC) + var2 = (self.PAR_G1 + 784) * (((((self.PAR_G2 + 154009) * targetTemp * 5) // 100) + 3276800) // 10) + var3 = var1 + (var2 >> 1) + var4 = (var3 // (self.RES_HEAT_RANGE + 4)) + var5 = (131 * self.RES_HEAT_VAL) + 65536 # Target heater resistance in Ohms + resHeatX100 = (((var4 // var5) - 250) * 34) + resHeat = ((resHeatX100 + 50) // 100) + + return resHeat + + # Gas resistance compensation calculation: rawADC & range to Ohms (integer) + def intCalcgRes(self, gasADC, gasRange): + var1 = 262144 >> gasRange + var2 = gasADC - 512 + var2 = var2 * 3 + var2 = 4096 + var2 + calcGasRes = ((10000 * var1) // var2) + self.gRes = calcGasRes * 100 + + # Initialise the BME688, establishing communication, entering initial T, P & H oversampling rates, setup filter and do a first data reading (won't return gas) + def bme688Init(self): + # Establish communication with BME688 + chipID = self.i2c.readfrom_mem(self.CHIP_ADDRESS, self.CHIP_ID, 1) + chipID = int.from_bytes(chipID, "big") + while (chipID != 97): + chipID = self.i2c.readfrom_mem(self.CHIP_ADDRESS, self.CHIP_ID, 1) + # Do a soft reset + self.i2c.writeto_mem(self.CHIP_ADDRESS, self.RESET, "\xB6") + sleep_ms(1000) + # Set mode to SLEEP MODE: CTRL_MEAS reg <1:0> + self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, "\x00") + # Set the oversampling rates for Temperature, Pressure and Humidity + # Humidity: CTRL_HUM bits <2:0> + self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_HUM, str(self.OSRS_2X)) + # Temperature: CTRL_MEAS bits <7:5> Pressure: CTRL_MEAS bits <4:2> (Combine and write both in one command) + self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, str(((self.OSRS_2X << 5) | (self.OSRS_16X << 2)))) + + # IIR Filter: CONFIG bits <4:2> + self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CONFIG, str(self.IIR_3 << 2)) + + # Enable gas conversion: CTRL_GAS_1 bit <5> (although datasheet says <4> - not sure what's going on here...) + self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_GAS_1, "\x20") + self.bme688InitFlag = True + + # Do an initial data read (will only return temperature, pressure and humidity as no gas sensor parameters have been set) + self.measureData() + + # Setup the gas sensor (defaults are 300°C and 180ms). + # targetTemp is the target temperature for the gas sensor plate to reach (200 - 400°C), eg: 300 + # heatDuration is the length of time for the heater to be turned on (0 - 4032ms), eg: 180 + # WARNING: The temperature and duration values can be changed but this is not recommended unless the user is familiar with gas sensor setup + # The default values have been chosen as they provide a good all-round sensor response for air quality purposes + def setupGasSensor(self, targetTemp=300, heatDuration=180): + if (self.bme688InitFlag == False): + self.bme688Init() + + # Limit targetTemp between 200°C & 400°C + if (targetTemp < 200): + targetTemp = 200 + elif (targetTemp > 400): + targetTemp = 400 + + # Limit heatDuration between 0ms and 4032ms + if (heatDuration < 0): + heatDuration = 0 + elif (heatDuration > 4032): + heatDuration = 4032 + + # Define the target heater resistance from temperature + self.i2c.writeto_mem(self.CHIP_ADDRESS, 0x5A, self.intConvertGasTargetTemp(self.tAmbient, targetTemp).to_bytes(1, 'big')) # res_wait_0 register - heater step 0 + + # Define the heater on time, converting ms to register code (Heater Step 0) - cannot be greater than 4032ms + # Bits <7:6> are a multiplier (1, 4, 16 or 64 times) Bits <5:0> are 1ms steps (0 to 63ms) + codedDuration = 0 + if (heatDuration < 4032): + factor = 0 + while (heatDuration > 63): + heatDuration = (heatDuration // 4) + factor = factor + 1 + + codedDuration = heatDuration + (factor * 64) + else: + codedDuration = 255 + + self.i2c.writeto_mem(self.CHIP_ADDRESS, 0x64, codedDuration.to_bytes(1, 'big')) # gas_wait_0 register - heater step 0 + + # Select index of heater step (0 to 9): CTRL_GAS_1 reg <3:0> (Make sure to combine with gas enable setting already there) + gasEnable = self.getUInt8(self.CTRL_GAS_1) & 0x20 + self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_GAS_1, (0x00 | gasEnable).to_bytes(1, 'big')) # Select heater step 0 + + self.gasInit = True + + # Run all measurements on the BME688: Temperature, Pressure, Humidity & Gas Resistance. + def measureData(self): + if (self.bme688InitFlag == False): + self.bme688Init() + + self.measTimePrev = self.measTime # Store previous measurement time (ms since micro:bit powered on) + + # Set mode to FORCED MODE to begin single read cycle: CTRL_MEAS reg <1:0> (Make sure to combine with temp/pressure oversampling settings already there) + oSampleTP = self.getUInt8(self.CTRL_MEAS) + self.i2c.writeto_mem(self.CHIP_ADDRESS, self.CTRL_MEAS, str((0x01 | oSampleTP))) + + # Check New Data bit to see if values have been measured: MEAS_STATUS_0 bit <7> + newData = (self.getUInt8(self.MEAS_STATUS_0) & 0x80) >> 7 + while (newData != 1): + newData = (self.getUInt8(self.MEAS_STATUS_0) & 0x80) >> 7 + + # Check Heater Stability Status bit to see if gas values have been measured: <4> (heater stability) + heaterStable = (self.getUInt8(self.GAS_RES_LSB_0) & 0x10) >> 4 + + # If there is new data, read temperature ADC registers(this is required for all other calculations) + self.tRaw = (self.getUInt8(self.TEMP_MSB_0) << 12) | (self.getUInt8(self.TEMP_LSB_0) << 4) | (self.getUInt8(self.TEMP_XLSB_0) >> 4) + + # Read pressure ADC registers + self.pRaw = (self.getUInt8(self.PRESS_MSB_0) << 12) | (self.getUInt8(self.PRESS_LSB_0) << 4) | (self.getUInt8(self.PRESS_XLSB_0) >> 4) + + # Read humidity ADC registers + self.hRaw = (self.getUInt8(self.HUMID_MSB_0) << 8) | (self.getUInt8(self.HUMID_LSB_0) >> 4) + + # Read gas resistance ADC registers + self.gResRaw = (self.getUInt8(self.GAS_RES_MSB_0) << 2) | self.getUInt8(self.GAS_RES_LSB_0) >> 6 # Shift bits <7:6> right to get LSB for gas resistance + + gasRange = self.getUInt8(self.GAS_RES_LSB_0) & 0x0F + + self.measTime = ticks_ms() # Capture latest measurement time (ms since Pico powered on) + + # Calculate the compensated reading values from the the raw ADC data + self.calcTemperature(self.tRaw) + self.intCalcPressure(self.pRaw) + self.intCalcHumidity(self.hRaw, self.tRead) + self.intCalcgRes(self.gResRaw, gasRange) + + # A baseline gas resistance is required for the IAQ calculation - it should be taken in a well ventilated area without obvious air pollutants + # Take 60 readings over a ~5min period and find the mean + # Establish the baseline gas resistance reading and the ambient temperature. + # These values are required for air quality calculations + # When the baseline process is complete, values for gBase and tAmbient are stored in a file + # On subsequent power cycles of the board, this function will look for that file and take the baseline values stored there + # To force the baselines process to be run again, call the function like this: calcBaselines(True) + def calcBaselines(self, forcedRun=False): + if (self.bme688InitFlag == False): + self.bme688Init() + if (self.gasInit == False): + self.setupGasSensor() + + self.screen.clear() + self.screen.displayText("Setting Baseline", 2) + self.screen.show() + + try: # Look for a 'baselines.txt' file existing - if it does, take the baseline values from there (unless 'forcedRun' is set to True) + if not forcedRun: + f = open("baselines.txt", "r") + self.gBase = float(f.readline()) + self.tAmbient = float(f.readline()) + else: + raise Exception("RUNNING BASELINE PROCESS") + except: # If there is no file, an exception is raised, and the baseline process will be carried out (creating a new file at the end) + self.ambTempFlag = False + + burnInReadings = 0 + burnInData = 0 + ambTotal = 0 + progress = 0 + while (burnInReadings < 60): # Measure data and continue summing gas resistance until 60 readings have been taken + progress = math.trunc((burnInReadings / 60) * 100) + self.screen.clear() + self.screen.displayText(str(progress) + "%", 4, 50) + self.screen.displayText("Setting Baseline", 2) + self.screen.show() + self.measureData() + burnInData = burnInData + self.gRes + ambTotal = ambTotal + self.newAmbTemp + sleep_ms(5000) + burnInReadings = burnInReadings + 1 + + self.gBase = (burnInData / 60) # Find the mean gas resistance during the period to form the baseline + self.tAmbient = (ambTotal / 60) # Calculate the ambient temperature as the mean of the 60 initial readings + + # Save baseline values to a file + f = open("baselines.txt", "w") #open in write - creates if not existing, will overwrite if it does + f.write(str(self.gBase) + "\r\n") + f.write(str(self.tAmbient) + "\r\n") + f.close() + + self.ambTempFlag = True + + self.screen.clear() + self.screen.displayText("Setup Complete!", 2) + self.screen.show() + sleep_ms(2000) + self.screen.clear() + self.screen.show() + + # Read Temperature from sensor as a Number. + # Units for temperature are in °C (Celsius) or °F (Fahrenheit) according to selection. + def readTemperature(self, temperature_unit="C"): + temperature = self.tRead + # Change temperature from °C to °F if user selection requires it + if (temperature_unit == "F"): + temperature = ((temperature * 18) + 320) / 10 + + return temperature + + # Read Pressure from sensor as a Number. + # Units for pressure are in Pa (Pascals) or mBar (millibar) according to selection. + def readPressure(self, pressure_unit="Pa"): + pressure = self.pRead + #Change pressure from Pascals to millibar if user selection requires it + if (pressure_unit == "mBar"): + pressure = pressure / 100 + + return pressure + + # Read Humidity from sensor as a Number. + # Humidity is output as a percentage. + def readHumidity(self): + return self.hRead + + # Read Gas Resistance from sensor as a Number. + # Units for gas resistance are in Ohms. + def readGasRes(self): + if (self.gasInit == False): + self.screen.clear() + self.screen.displayText("ERROR", 2) + self.screen.displayText("Setup Gas Sensor", 3) + self.screen.show() + return 0 + return self.gRes + + # Read eCO2 from sensor as a Number (250 - 40000+ppm). + # Units for eCO2 are in ppm (parts per million). + def readeCO2(self): + if (self.gasInit == False): + self.screen.clear() + self.screen.displayText("ERROR", 2) + self.screen.displayText("Setup Gas Sensor", 3) + self.screen.show() + return 0 + self.calcAirQuality() + + return self.eCO2Value + + # Return the Air Quality rating as a percentage (0% = Bad, 100% = Excellent). + def getAirQualityPercent(self): + if (self.gasInit == False): + self.screen.clear() + self.screen.displayText("ERROR", 2) + self.screen.displayText("Setup Gas Sensor", 3) + self.screen.show() + return 0 + self.calcAirQuality() + + return self.iaqPercent + + # Return the Air Quality rating as an IAQ score (500 = Bad, 0 = Excellent). + # These values are based on the BME688 datasheet, Page 11, Table 6. + def getAirQualityScore(self): + if (self.gasInit == False): + self.screen.clear() + self.screen.displayText("ERROR", 2) + self.screen.displayText("Setup Gas Sensor", 3) + self.screen.show() + return 0 + self.calcAirQuality() + + return self.iaqScore + + # Calculate the Index of Air Quality score from the current gas resistance and humidity readings + # iaqPercent: 0 to 100% - higher value = better air quality + # iaqScore: 25 should correspond to 'typically good' air, 250 to 'typically polluted' air + # airQualityRating: Text output based on the iaqScore + # Calculate the estimated CO2 value (eCO2) + def calcAirQuality(self): + humidityScore = 0 + gasScore = 0 + humidityOffset = self.hRead - self.hBase # Calculate the humidity offset from the baseline setting + ambTemp = (self.tAmbient / 100) + temperatureOffset = self.tRead - ambTemp # Calculate the temperature offset from the ambient temperature + humidityRatio = ((humidityOffset / self.hBase) + 1) + temperatureRatio = (temperatureOffset / ambTemp) + + # IAQ Calculations + + if (humidityOffset > 0): # Different paths for calculating the humidity score depending on whether the offset is greater than 0 + humidityScore = (100 - self.hRead) / (100 - self.hBase) + else: + humidityScore = self.hRead / self.hBase + + humidityScore = humidityScore * self.hWeight * 100 + + gasRatio = (self.gRes / self.gBase) + + if ((self.gBase - self.gRes) > 0): # Different paths for calculating the gas score depending on whether the offset is greater than 0 + gasScore = gasRatio * (100 * (1 - self.hWeight)) + else: + # Make sure that when the gas offset and humidityOffset are 0, iaqPercent is 95% - leaves room for cleaner air to be identified + gasScore = math.floor(70 + (5 * (gasRatio - 1))) + if (gasScore > 75): + gasScore = 75 + + self.iaqPercent = math.trunc(humidityScore + gasScore) # Air quality percentage is the sum of the humidity (25% weighting) and gas (75% weighting) scores + self.iaqScore = (100 - self.iaqPercent) * 5 # Final air quality score is in range 0 - 500 (see BME688 datasheet page 11 for details) + + # eCO2 Calculations + self.eCO2Value = 250 * math.pow(math.e, (0.012 * self.iaqScore)) # Exponential curve equation to calculate the eCO2 from an iaqScore input + + # Adjust eCO2Value for humidity and/or temperature greater than the baseline values + if (humidityOffset > 0): + if (temperatureOffset > 0): + self.eCO2Value = self.eCO2Value * (humidityRatio + temperatureRatio) + else: + self.eCO2Value = self.eCO2Value * humidityRatio + elif (temperatureOffset > 0): + self.eCO2Value = self.eCO2Value * (temperatureRatio + 1) + + # If measurements are taking place rapidly, breath detection is possible due to the sudden increase in humidity (~7-10%) + # If this increase happens within a 5s time window, 1200ppm is added to the eCO2 value + # (These values were based on 'breath-testing' with another eCO2 sensor with algorithms built-in) + if ((self.measTime - self.measTimePrev) <= 5000): + if ((self.hRead - self.hPrev) >= 3): + self.eCO2Value = self.eCO2Value + 1500 + + self.eCO2Value = math.trunc(self.eCO2Value) + +# The KitronikOLED class enables control of the OLED display screen on the board +# Subclassing FrameBuffer provides support for graphics primitives +# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html +class KitronikOLED(framebuf.FrameBuffer): + # Write commands to the OLED controller + def write_cmd(self, cmd): + self.temp[0] = 0x80 # Co=1, D/C#=0 + self.temp[1] = cmd + self.i2c.writeto(self.CHIP_ADDRESS, self.temp) + + # Write data to the OLED controller + def write_data(self, buf): + self.write_list[1] = buf + self.i2c.writevto(self.CHIP_ADDRESS, self.write_list) + + # Runs on initialisation of the class + # Sets up all the register definitions and global variables + def __init__(self, i2cAddr=0x3C, sda=6, scl=7): + self.CHIP_ADDRESS = i2cAddr # I2C address as determined by hardware configuration + # register definitions + self.SET_CONTRAST = const(0x81) + self.SET_ENTIRE_ON = const(0xA4) + self.SET_NORM_INV = const(0xA6) + self.SET_DISP = const(0xAE) + self.SET_MEM_ADDR = const(0x20) + self.SET_COL_ADDR = const(0x21) + self.SET_PAGE_ADDR = const(0x22) + self.SET_DISP_START_LINE = const(0x40) + self.SET_SEG_REMAP = const(0xA0) + self.SET_MUX_RATIO = const(0xA8) + self.SET_COM_OUT_DIR = const(0xC0) + self.SET_DISP_OFFSET = const(0xD3) + self.SET_COM_PIN_CFG = const(0xDA) + self.SET_DISP_CLK_DIV = const(0xD5) + self.SET_PRECHARGE = const(0xD9) + self.SET_VCOM_DESEL = const(0xDB) + self.SET_CHARGE_PUMP = const(0x8D) + + sda = Pin(sda) + scl = Pin(scl) + self.i2c = I2C(1,sda=sda, scl=scl, freq=100000) + + self.plotArray = [] + self.plotYMin = 0 + self.plotYMax = 100 + self.yPixelMin = 63 + self.yPixelMax = 12 + + self.temp = bytearray(2) + self.write_list = [b"\x40", None] # Co=0, D/C#=1 + + self.width = 128 + self.height = 64 + self.external_vcc = False + self.pages = 8 + self.buffer = bytearray(self.pages * self.width) + super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB) + self.init_display() + + # Initialise the display settings and start the display clear + def init_display(self): + for cmd in ( + self.SET_DISP | 0x00, # off + # address setting + self.SET_MEM_ADDR, + 0x00, # horizontal + # resolution and layout + self.SET_DISP_START_LINE | 0x00, + self.SET_SEG_REMAP,# | 0x01, # Set to either 0xA0 or A1, flips screen horizontally + self.SET_MUX_RATIO, + self.height - 1, + self.SET_COM_OUT_DIR, #| 0x08, # Set to either 0xC0 or 0xC8, flips screen vertically + self.SET_DISP_OFFSET, + 0x00, + self.SET_COM_PIN_CFG, + 0x02 if self.width > 2 * self.height else 0x12, + # timing and driving scheme + self.SET_DISP_CLK_DIV, + 0x80, + self.SET_PRECHARGE, + 0x22 if self.external_vcc else 0xF1, + self.SET_VCOM_DESEL, + 0x30, # 0.83*Vcc + # display + self.SET_CONTRAST, + 0xFF, # maximum + self.SET_ENTIRE_ON, # output follows RAM contents + self.SET_NORM_INV, # not inverted + # charge pump + self.SET_CHARGE_PUMP, + 0x10 if self.external_vcc else 0x14, + self.SET_DISP | 0x01, + ): # on + self.write_cmd(cmd) + self.fill(0) + self.show() + + # Screen will switch off, but retain the information that was displayed + def poweroff(self): + self.write_cmd(self.SET_DISP | 0x00) + + # Turn the screen back on - do not need to re-display what was showing as the information is retained + def poweron(self): + self.write_cmd(self.SET_DISP | 0x01) + + # 0 = Dim to 150 = Bright + def contrast(self, contrast): + self.write_cmd(self.SET_CONTRAST) + self.write_cmd(contrast) + + # 0 = White on black, 1 = Black on white + def invert(self, invert): + self.write_cmd(self.SET_NORM_INV | (invert & 1)) + + # Set text to display on a particular line (1 - 6) and an x-axis offset can be set (0 - 127, 0 is default) + # If the text is longer than than the screen it will be cut off, it will not be pushed to the next line (16 characters max per line) + # Need to call 'show()' to make the text actually display + def displayText(self, text, line, x_offset=0): + if (line < 1): + line = 1 + if (line > 6): + line = 6 + + y = (line * 11) - 10 + + super().text(text, x_offset, y) + + # Make what has been set to display actually appear on the screen + # Needs to be called after 'displayText()', 'plot()', clear()', 'drawLine()' & 'drawRect()' + def show(self): + x0 = 0 + x1 = self.width - 1 + if self.width == 64: + # displays with width of 64 pixels are shifted by 32 + x0 += 32 + x1 += 32 + self.write_cmd(self.SET_COL_ADDR) + self.write_cmd(x0) + self.write_cmd(x1) + self.write_cmd(self.SET_PAGE_ADDR) + self.write_cmd(0) + self.write_cmd(self.pages - 1) + self.write_data(self.buffer) + + # Plot a live updating graph of a variable + # Plot y range is pixels 12 down to 63, leaving room for a title or similar on the first line + # Need to call 'show()' to make the plot actually display + def plot(self, variable): + variable = math.trunc(variable) + + if (variable > self.plotYMax): + self.plotYMax = variable + if (variable < self.plotYMin): + self.plotYMin = variable + + entries = len(self.plotArray) + if (entries >= 128): + prevX = 0 + prevY = self.plotArray[127] + self.plotArray.pop(0) + self.plotArray.append(variable) + else: + self.plotArray.append(variable) + prevX = len(self.plotArray) - 1 + prevY = self.plotArray[prevX] + + for entry in range(entries): + x = entry + y = self.plotArray[entry] + y = math.trunc(self.yPixelMin - (y * ((self.yPixelMin - self.yPixelMax) / (self.plotYMax - self.plotYMin)))) + if (x == 0): + super().pixel(x, y, 1) + else: + self.drawLine(prevX, prevY, x, y) + prevX = x + prevY = y + + # Wipe all data from the screen + # Need to call 'show()' to make the clear actually happen + def clear(self): + super().fill(0) + + # Clear a specific line on the screen + def clearLine(self, line): + yPixel = (line - 1) + ((line * 10) - 10) + super().fill_rect(0, yPixel, 128, 10, 0) + + # Draw a line on the screen (vertical, horizontal or diagonal), setting start and finish (x, y) coordinates + # Need to call 'show()' to make the line actually display + def drawLine(self, start_x, start_y, end_x, end_y): + super().line(start_x, start_y, end_x, end_y, 1) + + # Draw rectangles with a top left starting (x, y) coordinate and then a width and height + # Can be filled (True) or just an outline (False) + # Need to call 'show()' to make the rectangle actually display + def drawRect(self, start_x, start_y, width, height, fill=False): + if (fill == False): + super().rect(start_x, start_y, width, height, 1) + elif (fill == True): + super().fill_rect(start_x, start_y, width, height, 1) diff --git a/main.py b/main.py new file mode 100644 index 0000000..9033738 --- /dev/null +++ b/main.py @@ -0,0 +1,115 @@ + +# A weather station program display the date, time, temperature and humidity on the OLED screen with Air Quality and eCO2 on ZIP LEDs (Red/Green) +from PicoAirQuality import KitronikBME688, KitronikButton, KitronikDataLogger, \ + KitronikOLED, KitronikRTC, KitronikZIPLEDs,\ + KitronikBuzzer +import time + +#from datetime import datetime + +logFileName = "atmosphere_log.txt" + + +bme688 = KitronikBME688() +oled = KitronikOLED() +rtc = KitronikRTC() +zipleds = KitronikZIPLEDs(3) +# Python u arse hole sec = Integer() +rtc.setDate(16, 7, 2022) +rtc.setTime(12, 10, 0) + +bme688.setupGasSensor() +bme688.calcBaselines() +sec = 0 +#f = open(logFileName, 'a') +#f.write("# Data Logging: Date, Time, Temperature, Pressure, Humidity, AirQuality, CO2\n") + +beeper = KitronikBuzzer() +buttons = KitronikButton() + +def ButtonB_r_IRQHandler(pin): + global buttonB + buttonB = 2*60 + print("button B pressed", buttonB) + +def ButtonB_f_IRQHandler(pin): + #now = time.now() + global buttonB + buttonB = 2*60 + print("button B released", buttonB, pin.value(),time.ticks_ms()) + if pin.value(): + beeper.playTone(600) + else: + beeper.stopTone() + + +# FALLING DOES NOT WORK AND IRQ_LOW_LEVEL not recognised +buttons.buttonB.irq(trigger=machine.Pin.IRQ_FALLING|machine.Pin.IRQ_RISING, handler=ButtonB_f_IRQHandler) +#buttons.buttonB.irq(trigger=machine.Pin.IRQ_RISING, handler=ButtonB_r_IRQHandler) + +while True: + + if sec == 0: + f = open(logFileName, 'a+') + f.write("# Data Logging: Date, Time, Temperature, Pressure, Humidity, AirQuality, CO2\n") + f.write('# Restarted ' + str(rtc.readDateString()) + ' ' + str(rtc.readTimeString()) + '\n' ) + time.sleep_ms(1000) + bme688.measureData() + oled.clear() + + #oled.drawRect(4, 5, 120, 35) + oled.displayText(rtc.readDateString(), 1, 25) + oled.displayText(rtc.readTimeString(), 2, 33) + #oled.drawLine(0, 48, 127, 48) + #oled.drawLine(0, 49, 127, 49) + oled.displayText(str(bme688.readTemperature()) + " oC", 3, 0) + oled.displayText(str(bme688.readHumidity()) + " %H", 3, 80) + eCO2 = bme688.readeCO2() + oled.displayText(str(eCO2) + " CO2 ppm", 4, 0) + IAQ = bme688.getAirQualityScore() + oled.displayText(str(IAQ) + " AQ ", 5, 0) + mbar = bme688.readPressure()/100.0 + oled.displayText(str(mbar) + " mB ", 5, 50) + oled.show() + + if (IAQ < 100): + zipleds.setLED(0, zipleds.GREEN) + else: + zipleds.setLED(0, zipleds.RED) + if (eCO2 < 800): + zipleds.setLED(2, zipleds.GREEN) + else: + zipleds.setLED(2, zipleds.RED) + zipleds.setBrightness(10) + + + # ffs all I want is an action every minute I hate python scope shit + #now = rtc.readTimeString(); + # log every minute, thats 1440 records a day + sec = sec + 1 + secmod = sec % 29 + if (secmod) == 0: + #log.setupDataFields("Date", "Time", "Temperature", "Pressure", "Humidity", "IAQ", "eCO2") + log_string = (rtc.readDateString() + ',' + \ + rtc.readTimeString() + ',' + \ + str(bme688.readTemperature()) + ',' + \ + str(bme688.readPressure()) + ',' + \ + str(bme688.readHumidity())+ ',' + \ + str(bme688.getAirQualityScore()) + ',' + \ + str(bme688.readeCO2())) + '\n' + #print (log_string) + #print ("logging data\n") + f.write(log_string) + f.flush() + zipleds.setLED(1, zipleds.RED) + elif (secmod < 10): + zipleds.setLED(1, zipleds.BLUE) + elif (secmod > 10): + s2 = secmod * 2 + zip_col = (secmod, s2, s2) + zipleds.setLED(1, zip_col) + + if ( (sec+1) % 2 ) == 0: + zipleds.setLED(1, zipleds.BLACK) + + zipleds.show() \ No newline at end of file