Skip to content

Commit

Permalink
Getting ready for module installation
Browse files Browse the repository at this point in the history
  • Loading branch information
Aiky30 committed Dec 17, 2024
1 parent c13fb08 commit eb8070c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ Decisions to be made:
7. Device startup and shutdown needs a unit test
8. Temp probe should be a fixture
9. CPU temp probe should be a fixture
10. Pass args from systemd through to run_protocol. This may need a cli, how would that work on a Pico?

### Wish list:

- Poetry (Not started)
- ASGI backend server (Daphne)
- Native webcomponent FE with Bootstrap
- pip install raspberry pi
- modular components that are easy to install from a software and hardware perspective
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,62 @@

import adafruit_am2320
import board
from shed_pi_module_utils.base_protocol import BaseProtocol
from shed_pi_module_utils.data_submission import (
ReadingSubmissionService,
)
from shed_pi_module_utils.utils import logger

# create the I2C shared bus
i2c = board.I2C() # uses board.SCL and board.SDA
# i2c = board.STEMMA_I2C() # For using the built-in STEMMA QT connector on a microcontroller
am = adafruit_am2320.AM2320(i2c)

while True:
print("Temperature: ", am.temperature)
print("Humidity: ", am.relative_humidity)
time.sleep(2)


class DeviceProtocol(BaseProtocol):
def __init__(self, submission_service: ReadingSubmissionService):
...
super().__init__(submission_service=submission_service)
# create the I2C shared bus
i2c = board.I2C() # uses board.SCL and board.SDA
# i2c = board.STEMMA_I2C() # For using the built-in STEMMA QT connector on a microcontroller
self.component = adafruit_am2320.AM2320(i2c)

self.should_stop = False

def stop(self):
logger.info("Stopping device protocol")

def start(self):
self.should_stop = True

def start(self, run_for: float = 60):
"""
:param run_for: Time to wait between reads
"""
logger.info("Starting device protocol")

while not self.should_stop:
self.read_data()
time.sleep(run_for)

def shutdown(self) -> None:
self.stop()

def run(self):
...

def read_data(self):
logger.debug("Reading component ")
print("Temperature: ", self.component.temperature)
print("Humidity: ", self.component.relative_humidity)

self.submission_service.submit()


def main():
def run_protocol():
# The Submission service is used to record any module data
submission_service = ReadingSubmissionService()
device = DeviceProtocol(submission_service=submission_service)

try:
device.startup()
device.start()
finally:
device.shutdown()


if __name__ == "__main__":
main()
run_protocol()

0 comments on commit eb8070c

Please sign in to comment.