From b21b35f366b16651358ab362ec3c6f2e73410f9a Mon Sep 17 00:00:00 2001 From: Scott Shawcroft Date: Thu, 17 Oct 2024 12:25:42 -0700 Subject: [PATCH] Add examples instead of __main__.py --- circuitmatter/__init__.py | 24 ++- circuitmatter/__main__.py | 264 ------------------------- circuitmatter/utility/mdns/__init__.py | 11 ++ circuitmatter/utility/mdns/avahi.py | 41 ++++ circuitmatter/utility/random.py | 6 + circuitmatter/utility/recording.py | 78 ++++++++ circuitmatter/utility/replay.py | 75 +++++++ examples/circuitmatter_simpletest.py | 26 +++ examples/replay.py | 64 ++++++ 9 files changed, 320 insertions(+), 269 deletions(-) delete mode 100644 circuitmatter/__main__.py create mode 100644 circuitmatter/utility/mdns/__init__.py create mode 100644 circuitmatter/utility/mdns/avahi.py create mode 100644 circuitmatter/utility/random.py create mode 100644 circuitmatter/utility/recording.py create mode 100644 circuitmatter/utility/replay.py create mode 100644 examples/circuitmatter_simpletest.py create mode 100644 examples/replay.py diff --git a/circuitmatter/__init__.py b/circuitmatter/__init__.py index 01b5331..f90c241 100644 --- a/circuitmatter/__init__.py +++ b/circuitmatter/__init__.py @@ -12,21 +12,35 @@ from . import session from .device_types.utility.root_node import RootNode -__version__ = "0.0.0" +__version__ = "0.1.0" class CircuitMatter: def __init__( self, - socketpool, - mdns_server, - random_source, - state_filename, + socketpool=None, + mdns_server=None, + random_source=None, + state_filename="matter-device-state.json", vendor_id=0xFFF1, product_id=0x8000, ): + if socketpool is None: + import socket + + socketpool = socket self.socketpool = socketpool + + if mdns_server is None: + from circuitmatter.utility.mdns.avahi import Avahi + + mdns_server = Avahi() self.mdns_server = mdns_server + + if random_source is None: + from circuitmatter.utility import random + + random_source = random self.random = random_source self.nonvolatile = nonvolatile.PersistentDictionary(state_filename) diff --git a/circuitmatter/__main__.py b/circuitmatter/__main__.py deleted file mode 100644 index 5ae0674..0000000 --- a/circuitmatter/__main__.py +++ /dev/null @@ -1,264 +0,0 @@ -"""Pure Python implementation of the Matter IOT protocol.""" - -import binascii -import json -import os -import pathlib -import secrets -import socket -import subprocess -import time - -import circuitmatter as cm - -from circuitmatter.device_types.lighting import on_off - - -class ReplaySocket: - def __init__(self, replay_data): - self.replay_data = replay_data - - def bind(self, address): - print("bind to", address) - - def setblocking(self, value): - print("setblocking", value) - - def recvfrom_into(self, buffer, nbytes=None): - if nbytes is None: - nbytes = len(buffer) - direction = "send" - while direction == "send": - direction, _, address, data_b64 = self.replay_data.pop(0) - decoded = binascii.a2b_base64(data_b64) - if len(decoded) > nbytes: - raise RuntimeError("Next replay packet is larger than buffer to read into") - buffer[: len(decoded)] = decoded - return len(decoded), address - - def sendto(self, data, address): - if address is None: - raise ValueError("Address must be set") - # direction, _, address, data_b64 = self.replay_data.pop(0) - # if direction == "send": - # decoded = binascii.a2b_base64(data_b64) - # for i, b in enumerate(data): - # if b != decoded[i]: - # # print("sent", data.hex(" ")) - # # print("old ", decoded.hex(" ")) - # # print(i, hex(b), hex(decoded[i])) - # print("Next replay packet does not match sent data") - return len(data) - - -class ReplayRandom: - def __init__(self, replay_data): - self.replay_data = replay_data - - def urandom(self, nbytes): - direction = None - while direction != "urandom": - direction, _, recorded_nbytes, data_b64 = self.replay_data.pop(0) - if recorded_nbytes != nbytes: - raise RuntimeError("Next replay random data is not the expected length") - decoded = binascii.a2b_base64(data_b64) - return decoded - - def randbelow(self, n): - direction = None - while direction != "randbelow": - direction, _, recorded_n, value = self.replay_data.pop(0) - if recorded_n != n: - raise RuntimeError("Next replay randbelow is not the expected length") - return value - - -class ReplaySocketPool: - AF_INET6 = 0 - SOCK_DGRAM = 1 - - def __init__(self, replay_lines): - self.replay_data = replay_lines - self._socket_created = False - - def socket(self, *args, **kwargs): - if self._socket_created: - raise RuntimeError("Only one socket can be created") - self._socket_created = True - return ReplaySocket(self.replay_data) - - -class DummyMDNS: - def advertise_service( - self, - service_type, - protocol, - port, - txt_records=[], - subtypes=[], - instance_name="", - ): - print(f"Advertise service {service_type} {protocol} {port} {txt_records}") - - -class MDNSServer(DummyMDNS): - def __init__(self): - self.active_services = {} - self.publish_address = None - - def advertise_service( - self, - service_type, - protocol, - port, - txt_records={}, - subtypes=[], - instance_name="", - ): - subtypes = [f"--subtype={subtype}" for subtype in subtypes] - txt_records = [f"{key}={value}" for key, value in txt_records.items()] - command = [ - "avahi-publish-service", - *subtypes, - instance_name, - f"{service_type}.{protocol}", - str(port), - *txt_records, - ] - print("running avahi", command) - self.active_services[service_type + instance_name] = subprocess.Popen(command) - if self.publish_address is None: - command = [ - "avahi-publish-address", - "dalinar.local", - "fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", # "fe80::642:1aff:fe0c:9f2a", - ] - print("run", command) - self.publish_address = subprocess.Popen(command) - - def __del__(self): - for active_service in self.active_services.values(): - active_service.kill() - if self.publish_address is not None: - self.publish_address.kill() - - -class RecordingRandom: - def __init__(self, record_file): - self.record_file = record_file - - def urandom(self, nbytes): - data = os.urandom(nbytes) - entry = ( - "urandom", - time.monotonic_ns(), - nbytes, - binascii.b2a_base64(data, newline=False).decode("utf-8"), - ) - json.dump(entry, self.record_file) - self.record_file.write("\n") - return data - - def randbelow(self, n): - value = secrets.randbelow(n) - entry = ("randbelow", time.monotonic_ns(), n, value) - json.dump(entry, self.record_file) - self.record_file.write("\n") - return value - - -class RecordingSocket: - def __init__(self, record_file, socket): - self.record_file = record_file - self.socket = socket - - def bind(self, address): - self.socket.bind(address) - - def setblocking(self, value): - self.socket.setblocking(value) - - def recvfrom_into(self, buffer, nbytes=None): - nbytes, addr = self.socket.recvfrom_into(buffer, nbytes) - entry = ( - "receive", - time.monotonic_ns(), - addr, - binascii.b2a_base64(buffer[:nbytes], newline=False).decode("utf-8"), - ) - json.dump(entry, self.record_file) - self.record_file.write("\n") - return nbytes, addr - - def sendto(self, data, address): - entry = ( - "send", - time.monotonic_ns(), - address, - binascii.b2a_base64(data, newline=False).decode("utf-8"), - ) - json.dump(entry, self.record_file) - self.record_file.write("\n") - return self.socket.sendto(data, address) - - -class RecordingSocketPool: - AF_INET6 = socket.AF_INET6 - SOCK_DGRAM = socket.SOCK_DGRAM - - def __init__(self, record_file): - self.record_file = record_file - self._socket_created = False - - def socket(self, *args, **kwargs): - if self._socket_created: - raise RuntimeError("Only one socket can be created") - self._socket_created = True - return RecordingSocket(self.record_file, socket.socket(*args, **kwargs)) - - -class NeoPixel(on_off.OnOffLight): - pass - - -def run(replay_file=None): - device_state = pathlib.Path("test_data/device_state.json") - replay_device_state = pathlib.Path("test_data/replay_device_state.json") - if replay_file: - replay_lines = [] - with open(replay_file, "r") as f: - device_state_fn = f.readline().strip() - for line in f: - replay_lines.append(json.loads(line)) - socketpool = ReplaySocketPool(replay_lines) - mdns_server = DummyMDNS() - random_source = ReplayRandom(replay_lines) - # Reset device state to before the captured run - device_state.write_text(pathlib.Path(device_state_fn).read_text()) - else: - timestamp = time.strftime("%Y%m%d-%H%M%S") - record_file = open(f"test_data/recorded_packets-{timestamp}.jsonl", "w") - device_state_fn = f"test_data/device_state-{timestamp}.json" - record_file.write(f"{device_state_fn}\n") - socketpool = RecordingSocketPool(record_file) - mdns_server = MDNSServer() - random_source = RecordingRandom(record_file) - # Save device state before we run so replays can use it. - replay_device_state = pathlib.Path(device_state_fn) - replay_device_state.write_text(device_state.read_text()) - - matter = cm.CircuitMatter(socketpool, mdns_server, random_source, device_state) - led = NeoPixel("neopixel1") - matter.add_device(led) - while True: - matter.process_packets() - - -if __name__ == "__main__": - import sys - - print(sys.argv) - replay_file = None - if len(sys.argv) > 1: - replay_file = sys.argv[1] - run(replay_file=replay_file) diff --git a/circuitmatter/utility/mdns/__init__.py b/circuitmatter/utility/mdns/__init__.py new file mode 100644 index 0000000..382a591 --- /dev/null +++ b/circuitmatter/utility/mdns/__init__.py @@ -0,0 +1,11 @@ +class DummyMDNS: + def advertise_service( + self, + service_type, + protocol, + port, + txt_records=[], + subtypes=[], + instance_name="", + ): + print(f"Advertise service {service_type} {protocol} {port} {txt_records}") diff --git a/circuitmatter/utility/mdns/avahi.py b/circuitmatter/utility/mdns/avahi.py new file mode 100644 index 0000000..8655199 --- /dev/null +++ b/circuitmatter/utility/mdns/avahi.py @@ -0,0 +1,41 @@ +import subprocess + + +class Avahi: + def __init__(self): + self.active_services = {} + self.publish_address = None + + def advertise_service( + self, + service_type, + protocol, + port, + txt_records={}, + subtypes=[], + instance_name="", + ): + subtypes = [f"--subtype={subtype}" for subtype in subtypes] + txt_records = [f"{key}={value}" for key, value in txt_records.items()] + command = [ + "avahi-publish-service", + *subtypes, + instance_name, + f"{service_type}.{protocol}", + str(port), + *txt_records, + ] + self.active_services[service_type + instance_name] = subprocess.Popen(command) + if self.publish_address is None: + command = [ + "avahi-publish-address", + "dalinar.local", + "fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", # "fe80::642:1aff:fe0c:9f2a", + ] + self.publish_address = subprocess.Popen(command) + + def __del__(self): + for active_service in self.active_services.values(): + active_service.kill() + if self.publish_address is not None: + self.publish_address.kill() diff --git a/circuitmatter/utility/random.py b/circuitmatter/utility/random.py new file mode 100644 index 0000000..b5513e1 --- /dev/null +++ b/circuitmatter/utility/random.py @@ -0,0 +1,6 @@ +import os +import secrets + +urandom = os.urandom + +randbelow = secrets.randbelow diff --git a/circuitmatter/utility/recording.py b/circuitmatter/utility/recording.py new file mode 100644 index 0000000..e28fb20 --- /dev/null +++ b/circuitmatter/utility/recording.py @@ -0,0 +1,78 @@ +import binascii +import json +import time + + +class RecordingRandom: + def __init__(self, record_file, random): + self.record_file = record_file + self._random = random + + def urandom(self, nbytes): + data = self._random.urandom(nbytes) + entry = ( + "urandom", + time.monotonic_ns(), + nbytes, + binascii.b2a_base64(data, newline=False).decode("utf-8"), + ) + json.dump(entry, self.record_file) + self.record_file.write("\n") + return data + + def randbelow(self, n): + value = self._random.randbelow(n) + entry = ("randbelow", time.monotonic_ns(), n, value) + json.dump(entry, self.record_file) + self.record_file.write("\n") + return value + + +class RecordingSocket: + def __init__(self, record_file, socket): + self.record_file = record_file + self.socket = socket + + def bind(self, address): + self.socket.bind(address) + + def setblocking(self, value): + self.socket.setblocking(value) + + def recvfrom_into(self, buffer, nbytes=None): + nbytes, addr = self.socket.recvfrom_into(buffer, nbytes) + entry = ( + "receive", + time.monotonic_ns(), + addr, + binascii.b2a_base64(buffer[:nbytes], newline=False).decode("utf-8"), + ) + json.dump(entry, self.record_file) + self.record_file.write("\n") + return nbytes, addr + + def sendto(self, data, address): + entry = ( + "send", + time.monotonic_ns(), + address, + binascii.b2a_base64(data, newline=False).decode("utf-8"), + ) + json.dump(entry, self.record_file) + self.record_file.write("\n") + return self.socket.sendto(data, address) + + +class RecordingSocketPool: + def __init__(self, record_file, socket): + self.AF_INET6 = socket.AF_INET6 + self.SOCK_DGRAM = socket.SOCK_DGRAM + self.record_file = record_file + self._socket_created = False + self._socket = socket + + def socket(self, *args, **kwargs): + if self._socket_created: + raise RuntimeError("Only one socket can be created") + self._socket_created = True + return RecordingSocket(self.record_file, self._socket.socket(*args, **kwargs)) diff --git a/circuitmatter/utility/replay.py b/circuitmatter/utility/replay.py new file mode 100644 index 0000000..e3c35f7 --- /dev/null +++ b/circuitmatter/utility/replay.py @@ -0,0 +1,75 @@ +import binascii + + +class ReplaySocket: + def __init__(self, replay_data): + self.replay_data = replay_data + + def bind(self, address): + print("bind to", address) + + def setblocking(self, value): + print("setblocking", value) + + def recvfrom_into(self, buffer, nbytes=None): + if nbytes is None: + nbytes = len(buffer) + direction = "send" + while direction == "send": + direction, _, address, data_b64 = self.replay_data.pop(0) + decoded = binascii.a2b_base64(data_b64) + if len(decoded) > nbytes: + raise RuntimeError("Next replay packet is larger than buffer to read into") + buffer[: len(decoded)] = decoded + return len(decoded), address + + def sendto(self, data, address): + if address is None: + raise ValueError("Address must be set") + # direction, _, address, data_b64 = self.replay_data.pop(0) + # if direction == "send": + # decoded = binascii.a2b_base64(data_b64) + # for i, b in enumerate(data): + # if b != decoded[i]: + # # print("sent", data.hex(" ")) + # # print("old ", decoded.hex(" ")) + # # print(i, hex(b), hex(decoded[i])) + # print("Next replay packet does not match sent data") + return len(data) + + +class ReplayRandom: + def __init__(self, replay_data): + self.replay_data = replay_data + + def urandom(self, nbytes): + direction = None + while direction != "urandom": + direction, _, recorded_nbytes, data_b64 = self.replay_data.pop(0) + if recorded_nbytes != nbytes: + raise RuntimeError("Next replay random data is not the expected length") + decoded = binascii.a2b_base64(data_b64) + return decoded + + def randbelow(self, n): + direction = None + while direction != "randbelow": + direction, _, recorded_n, value = self.replay_data.pop(0) + if recorded_n != n: + raise RuntimeError("Next replay randbelow is not the expected length") + return value + + +class ReplaySocketPool: + AF_INET6 = 0 + SOCK_DGRAM = 1 + + def __init__(self, replay_lines): + self.replay_data = replay_lines + self._socket_created = False + + def socket(self, *args, **kwargs): + if self._socket_created: + raise RuntimeError("Only one socket can be created") + self._socket_created = True + return ReplaySocket(self.replay_data) diff --git a/examples/circuitmatter_simpletest.py b/examples/circuitmatter_simpletest.py new file mode 100644 index 0000000..f1cb1f7 --- /dev/null +++ b/examples/circuitmatter_simpletest.py @@ -0,0 +1,26 @@ +"""Simple LED on and off as a light.""" + +import circuitmatter as cm +from circuitmatter.device_types.lighting import on_off + +import digitalio +import board + + +class LED(on_off.OnOffLight): + def __init__(self, name, led): + super().__init__(name) + self._led = led + + def on(self, session): + self._led.value = True + + def off(self, session): + self._led.value = False + + +matter = cm.CircuitMatter(state_filename="test_data/device_state.json") +led = LED("led1", digitalio.DigitalInOut(board.D13)) +matter.add_device(led) +while True: + matter.process_packets() diff --git a/examples/replay.py b/examples/replay.py new file mode 100644 index 0000000..52274bd --- /dev/null +++ b/examples/replay.py @@ -0,0 +1,64 @@ +"""Pure Python implementation of the Matter IOT protocol.""" + +import json +import pathlib +import socket +import time + +import circuitmatter as cm + +from circuitmatter.device_types.lighting import on_off + +from circuitmatter.utility import random +from circuitmatter.utility.recording import RecordingSocketPool, RecordingRandom +from circuitmatter.utility.replay import ReplaySocketPool, ReplayRandom + +from circuitmatter.utility.mdns import DummyMDNS +from circuitmatter.utility.mdns.avahi import Avahi + + +class NeoPixel(on_off.OnOffLight): + pass + + +def run(replay_file=None): + device_state = pathlib.Path("test_data/device_state.json") + replay_device_state = pathlib.Path("test_data/replay_device_state.json") + if replay_file: + replay_lines = [] + with open(replay_file, "r") as f: + device_state_fn = f.readline().strip() + for line in f: + replay_lines.append(json.loads(line)) + socketpool = ReplaySocketPool(replay_lines) + mdns_server = DummyMDNS() + random_source = ReplayRandom(replay_lines) + # Reset device state to before the captured run + device_state.write_text(pathlib.Path(device_state_fn).read_text()) + else: + timestamp = time.strftime("%Y%m%d-%H%M%S") + record_file = open(f"test_data/recorded_packets-{timestamp}.jsonl", "w") + device_state_fn = f"test_data/device_state-{timestamp}.json" + record_file.write(f"{device_state_fn}\n") + socketpool = RecordingSocketPool(record_file, socket) + mdns_server = Avahi() + random_source = RecordingRandom(record_file, random) + # Save device state before we run so replays can use it. + replay_device_state = pathlib.Path(device_state_fn) + replay_device_state.write_text(device_state.read_text()) + + matter = cm.CircuitMatter(socketpool, mdns_server, random_source, device_state) + led = NeoPixel("neopixel1") + matter.add_device(led) + while True: + matter.process_packets() + + +if __name__ == "__main__": + import sys + + print(sys.argv) + replay_file = None + if len(sys.argv) > 1: + replay_file = sys.argv[1] + run(replay_file=replay_file)