Skip to content

Commit

Permalink
implement an mro to make tests succeed
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalbert committed Dec 26, 2024
1 parent 1156b23 commit 7b4cb0b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
5 changes: 1 addition & 4 deletions circuitmatter/interaction_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ class ChunkedMessage(InteractionModelMessage):
"""Chunked messages take multiple encodes or decodes before they are complete."""

def encode_into(self, buffer: memoryview, offset: int = 0) -> int:
print("ChunkedMessage encode_into: ", end="") #########
# Leave room for MoreChunkedMessages, SupressResponse, and InteractionModelRevision.
# Leave room for MoreChunkedMessages, SuppressResponse, and InteractionModelRevision.
buffer[0] = tlv.ElementType.STRUCTURE
offset += 1
subbuffer = memoryview(buffer)[: -2 * 2 - 3 - 1]
Expand All @@ -160,15 +159,13 @@ def encode_into(self, buffer: memoryview, offset: int = 0) -> int:
try:
offset = descriptor_class.encode_into(self, subbuffer, offset)
except tlv.ArrayEncodingError as e:
print("splitting", name, f"[{e.index}:] offset {offset}")
offset = e.offset
tag = descriptor_class.tag
self.values[tag] = self.values[tag][e.index :]
self.MoreChunkedMessages = True
else:
offset = descriptor_class.encode_into(self, buffer, offset)
buffer[offset] = tlv.ElementType.END_OF_CONTAINER
print()
return offset + 1


Expand Down
32 changes: 18 additions & 14 deletions circuitmatter/tlv.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,22 @@ class IntEnum:
INT_SIZE = "BHIQ"


def class_hierarchy(cls):
classes = set()
classes.add(cls)
for base in cls.__bases__:
classes.update(class_hierarchy(base))
return classes
# mro implementation from https://stackoverflow.com/a/54261655/142996
def _mro(cls):
"""Implement __mro__ in Python, since CircuitPython and MicroPython don't have it."""
if cls is object:
return [object]
return [cls] + _mro_merge([_mro(base) for base in cls.__bases__])

def _mro_merge(mros):
if not any(mros): # all lists are empty
return [] # base case
for candidate, *_ in mros:
if all(candidate not in tail for _, *tail in mros):
return [candidate] + _mro_merge([tail if head is candidate else [head, *tail]
for head, *tail in mros])
else:
raise TypeError("No legal mro")


class ElementType(enum.IntEnum):
Expand Down Expand Up @@ -129,8 +139,8 @@ def max_length(cls):

@classmethod
def _members(cls) -> Iterable[tuple[str, Member]]:
for superclass in class_hierarchy(cls):
for field_name, descriptor in vars(superclass).items():
for superclass in _mro(cls):
for field_name, descriptor in superclass.__dict__.items():
if not field_name.startswith("_") and isinstance(descriptor, Member):
yield field_name, descriptor

Expand All @@ -140,7 +150,6 @@ def _members_by_tag(cls) -> dict[int, tuple[str, Member]]:
return cls._members_by_tag_cache
members = {}
for field_name, descriptor in cls.__dict__.items():
# for field_name, descriptor in vars(cls).items():
if not field_name.startswith("_") and isinstance(descriptor, Member):
members[descriptor.tag] = (field_name, descriptor)
cls._members_by_tag_cache = members
Expand Down Expand Up @@ -177,10 +186,8 @@ def encode(self) -> memoryview:
return memoryview(buffer)[:end]

def encode_into(self, buffer: bytearray, offset: int = 0) -> int:
print("Structure encode_into:", end="") #########
for _, descriptor_class in self._members():
offset = descriptor_class.encode_into(self, buffer, offset)
print()
buffer[offset] = ElementType.END_OF_CONTAINER
return offset + 1

Expand Down Expand Up @@ -289,7 +296,6 @@ def encode_into(
anonymous_ok=False,
) -> int:
value = self.__get__(obj) # type: ignore # self inference issues
print("Member encode_into:", "tag:", f"0x{self.tag:x}" if type(self.tag) is int else self.tag, "value:", f"0x{value:x}" if type(value) is int else value, type(value)) ########
return self._encode_value_into(value, buffer, offset, anonymous_ok)

def _encode_value_into( # noqa: PLR0912 Too many branches
Expand Down Expand Up @@ -838,7 +844,6 @@ def encode(self) -> memoryview:
def encode_into(self, buffer: bytearray, offset: int = 0) -> int:
member_by_tag = self._members_by_tag()
for item in self.items:
print("List encode_into: ", end="") #########
if isinstance(item, tuple):
tag, _ = item
if tag in member_by_tag:
Expand All @@ -848,7 +853,6 @@ def encode_into(self, buffer: bytearray, offset: int = 0) -> int:
offset = member.encode_into(self, buffer, offset, anonymous_ok=True)
else:
raise NotImplementedError("Anonymous list member")
print() #############
buffer[offset] = ElementType.END_OF_CONTAINER
return offset + 1

Expand Down

0 comments on commit 7b4cb0b

Please sign in to comment.