Skip to content

Commit

Permalink
BOOKMARK: writing to I2C bus on registration
Browse files Browse the repository at this point in the history
  • Loading branch information
Keegan-Evans committed Oct 30, 2023
0 parents commit e988913
Show file tree
Hide file tree
Showing 14 changed files with 572 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
__pycache__/*
.vscode/*
IGNORE/*
.micropico
*.pyc
*OLD*
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## Setup

- Install `pyserial` library
- `pip install pyserial` in development environment

- Install `umqtt.simple` library on pico
- `pip install mpremote` in development environment
- `mpremote connect <path to COM port> mip install umqtt.simple`
Empty file added __init__.py
Empty file.
131 changes: 131 additions & 0 deletions air_quality.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# air_quality.py
# ignore this file for now

from machine import Pin
from util import create_message_packet, try_until_runs, set_timeout, Networker
import utime
from umqtt.simple import MQTTClient

import json

INIT_AIR_QUALITY = bytearray([0x20, 0x03])
MEASURE_AIR_QUALITY = bytearray([0x20, 0x08])
MEASURE_RAW_SIGNALS = bytearray([0x20, 0x50])

class SGP30:
def __init__(self, bus, mqtt_handler, i2c_address=0x58, indicator_pin="LED", topic="sensor/air_quality", sensor_id="air_quality"):
utime.sleep_ms(10)

self.i2c_address = i2c_address
self.bus = bus
self.mqtt_handler = mqtt_handler
self.co2 = 0x00
self.tvoc = 0x00
self.featureSetVersion = None
self.h2 = 0x00
self.ethanol = 0x00
self.serialID = None
self.start_time = utime.mktime(utime.localtime())
self.elapsed_time = 0
self.indicator_pin = Pin(indicator_pin, Pin.OUT)
self.topic = topic
self.sensor_id = sensor_id


def set_elapsed_time(self):
self.elapsed_time = utime.mktime(utime.localtime()) - self.start_time

@set_timeout(10)
@try_until_runs
def write(self, msg):
self.bus.writeto(self.i2c_address, create_message_packet(msg))

@set_timeout(10)
@try_until_runs
def read(self, num_bytes):
readings = bytearray(num_bytes)
self.bus.readfrom_into(self.i2c_address, readings)
return readings


def initAirQuality(self):
self.write(INIT_AIR_QUALITY)
self.indicator_pin.on()


def measureAirQuality(self):
self.write(MEASURE_AIR_QUALITY)
# gives sensor time to record measurements, min 20-25ms
utime.sleep_ms(50)
read_vals = self.read(6)
self.set_elapsed_time()

# No real values are returned for the co2 or tvoc values until
# 15 seconds after the initialization of the sensor so that
# proper calibration can occur
if self.elapsed_time < 15:
self.co2 = bytearray([255 << 8 | 255])
self.tvoc = bytearray([255 << 8 | 255])
self.co2 = bytearray([read_vals[0] << 8 | read_vals[1]])
self.tvoc= bytearray([read_vals[3] << 8 | read_vals[4]])
return None


def measureRawSignals(self):
self.write(MEASURE_RAW_SIGNALS)
# gives sensor time to record measurements, min 20-25ms
utime.sleep_ms(50)
read_vals = self.read(6)
self.h2 = bytearray([read_vals[0] << 8 | read_vals[1]])
self.ethanol = bytearray([read_vals[3] << 8 | read_vals[4]])
return None


def measurements(self):
self.indicator_pin.off()
self.measureRawSignals()
self.measureAirQuality()
self.indicator_pin.on()

measurement_values = json.dumps(
{'sensor': self.sensor_id,
'data':
{'co2': int.from_bytes(self.co2, 'big'),
'tvoc': int.from_bytes(self.tvoc, 'big'),
'h2': int.from_bytes(self.h2, 'big'),
'ethanol': int.from_bytes(self.ethanol, 'big'),
'elapsed_time': self.elapsed_time
}})

print(measurement_values)
return measurement_values

def publish(self):
self.mqtt_handler.connect()
print(bytes(self.topic, 'utf-8'), "\n", self.measurements)
self.mqtt_handler.publish(topic=bytes(self.topic, 'utf-8'), msg=self.measurements(), qos=0)
self.mqtt_handler.disconnect()


if __name__ == '__main__':
from umqtt.simple import MQTTClient
from util import Networker
from util import setup_I2C_bus

wlan = Networker().establish_connection()
print("wlan")

client = MQTTClient('aq_board', '10.42.0.1', port=1883, keepalive=60)
print("mqtt")

i2c = setup_I2C_bus()
print("i2c")

test_aq = SGP30(bus=i2c, mqtt_handler=client)
test_aq.initAirQuality()

while True:

print(test_aq.measurements())
test_aq.publish()
utime.sleep_ms(180)
79 changes: 79 additions & 0 deletions demo_i2c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
INIT_AIR_QUALITY = bytearray([0x20, 0x03])
MEASURE_AIR_QUALITY = bytearray([0x20, 0x08])
MEASURE_RAW_SIGNALS = bytearray([0x20, 0x50])
#
# Included as a demo of how to create a custom CRC hash function
# for i2c sensors
def generate_crc_table(poly):
# Generate CRC lookup table
table = [str(0).encode()] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 0x80:
crc = (crc << 1) ^ poly
else:
crc <<= 1
crc &= 0xFF

table.insert(i, str(crc).encode("utf-8"))

return table


SGP30_LOOKUP = generate_crc_table(0x31)

def crc8_bad(data, poly=0x31, init_value=0xFF, final_xor=0x00):

if len(data) != 0:
raise ValueError("Data must be contain 2 and only 2 bytes")
crc = data[0] << 16 ^ data[1] << 8 ^ init_value

crc ^= poly
return crc
# return bytes([crc ^ final_xor])

def crc8(data, table, poly=0x31, init_value=0xFF, final_xor=0x00):
crc = init_value

for byte in data:
crc ^= byte
crc = table[crc]

# crc = int.from_bytes(table[crc], "big")
print(crc)
# return bytes([crc ^ final_xor])
return crc

# for byte in INIT_AIR_QUALITY:
# print(byte)
# print(0xFF ^ byte)
# print(SGP30_LOOKUP[0xFF ^ byte])
#


def create_message_packet(data):
byte_data = b""

if data is str:
byte_data += data.encode("utf-8")

elif data is int:
byte_data += str(data).encode("utf-8")

elif data is bytes or bytearray:
byte_data += data

else:
raise ValueError(
"Type of the provided data({}) does not match str, bytes, "
"bytearray".format(data.type)
)

byte_data += crc8(byte_data, SGP30_LOOKUP)

return byte_data

if __name__ == '__main__':
create_message_packet(INIT_AIR_QUALITY)
# create_message_packet(INIT_AIR_QUALITY, SGP30_LOOKUP)
7 changes: 7 additions & 0 deletions environment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name: sensor_firmware
channels:
- conda-forge
dependencies:
- python=3.11
- pydocstyle
- pyserial
29 changes: 29 additions & 0 deletions measurement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import inspect


class Measurement:
def __init__(self):
self.measurement_functions = dict()
self.pins = set()

# def __repr__(self):
# return "Sensor(measurement_functions={}, pins={})".format(
# self.measurement_functions, self.pins)
#
def register_function(self, name, func, pins):
# check to make sure pins unused
for pin in pins:
if pin in self.pins:
raise ValueError(f"Pin {pin} already in use")
else:
self.pins.add(pin)

if not callable(func):
raise ValueError(f"Function {func} is not callable")

if str(inspect.signature(func)) != '()':
raise ValueError(f"Function {func.__name__} takes arguments "
f"{str(inspect.signature(func))} but should not.")

self.measurement_functions[name] = func
self.pins = pins
Loading

0 comments on commit e988913

Please sign in to comment.