Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add human readable encodings with json. #3

Merged
merged 14 commits into from
Mar 29, 2024
24 changes: 10 additions & 14 deletions mqtt_ros_bridge/bridge_node.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
from typing import Any, TypeVar, Callable, Generic
from dataclasses import dataclass
from typing import Any, Callable, Generic, Type

import paho.mqtt.client as MQTT
import rclpy
from mqtt_ros_bridge.encodings import MsgLikeT
from mqtt_ros_bridge.serializer import ROSDefaultSerializer, Serializer
from rclpy._rclpy_pybind11 import RMWError
from rclpy.node import Node
from rclpy.publisher import Publisher
from rclpy.subscription import Subscription
from rclpy._rclpy_pybind11 import RMWError

from std_msgs.msg import String

import paho.mqtt.client as MQTT

from mqtt_ros_bridge.serializer import Serializer, ROSDefaultSerializer


T = TypeVar('T')


@dataclass
class TopicInfo(Generic[T]):
class TopicInfo(Generic[MsgLikeT]):
"""Metadata about a single topic."""

name: str
msg_type: T
msg_type: Type[MsgLikeT]
# Should Serializer also be generic across MsgLikeT?
serializer: type[Serializer]
publish_on_ros: bool

Expand Down Expand Up @@ -65,7 +61,7 @@ def __init__(self) -> None:

self.mqtt_client.on_message = self.mqtt_callback

def make_ros_callback(self, topic_info: TopicInfo[T]) -> Callable[[T], None]:
def make_ros_callback(self, topic_info: TopicInfo[MsgLikeT]) -> Callable[[MsgLikeT], None]:
"""
Create a callback function which re-publishes messages on the same topic in MQTT.

Expand All @@ -75,7 +71,7 @@ def make_ros_callback(self, topic_info: TopicInfo[T]) -> Callable[[T], None]:
information about the topic that the callback will publish on

"""
def callback(msg: T) -> None:
def callback(msg: MsgLikeT) -> None:
self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.name}" Payload: "{msg}"')
self.mqtt_client.publish(topic_info.name, topic_info.serializer.serialize(msg))

Expand Down
115 changes: 115 additions & 0 deletions mqtt_ros_bridge/encodings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import json
from array import array
from typing import (Iterable, MutableSequence, Protocol, Type, TypeAlias,
TypeVar, cast)

from numpy import floating, integer, ndarray
from numpy.typing import NDArray
from rclpy.type_support import check_is_valid_msg_type

NestedDictionary: TypeAlias = dict[str, object]


class MsgLike(Protocol):
"""Generic Message Type Alias."""

@classmethod
def get_fields_and_field_types(cls) -> dict[str, str]:
...


MsgLikeT = TypeVar("MsgLikeT", bound=MsgLike)
ArrayElementT = TypeVar('ArrayElementT', int, float, str)

RESERVED_FIELD_TYPE = '/'
ENCODING = 'latin-1'


def numpy_encoding(array_arg: NDArray[integer] | NDArray[floating]) -> list[int] | list[float]:
if isinstance(array_arg[0], integer):
return [int(x) for x in array_arg]
else:
return [float(x) for x in array_arg]


def array_encoding(array_arg: MutableSequence[ArrayElementT]) -> list[ArrayElementT]:
if len(array_arg) == 0:
return []
element_type = type(array_arg[0])
return [element_type(x) for x in array_arg]


def human_readable_encoding(msg: MsgLike) -> bytes:
check_is_valid_msg_type(type(msg))

msg_dict = human_readable_encoding_recursive(msg)
return json.dumps(msg_dict).encode()


def human_readable_encoding_recursive(msg: MsgLike) -> NestedDictionary:
msg_dict = {}

msg_fields_and_field_types = type(msg).get_fields_and_field_types()
for field, field_types in msg_fields_and_field_types.items():
value = getattr(msg, field)

if isinstance(value, bytes):
value = value.decode(ENCODING)
elif isinstance(value, list) and len(value) > 0:
if isinstance(value[0], bytes):
value = cast(list[bytes], value)
value = [byte.decode(ENCODING) for byte in value]
elif RESERVED_FIELD_TYPE in field_types:
value = [human_readable_encoding_recursive(msg_in_list) for msg_in_list in value]
elif isinstance(value, list) and len(value) == 0:
value = []
elif isinstance(value, ndarray):
value = numpy_encoding(value)
elif isinstance(value, array):
value = array_encoding(value)
elif RESERVED_FIELD_TYPE in field_types:
value = human_readable_encoding_recursive(value)
msg_dict[field] = value

return msg_dict


def human_readable_decoding(byte_msg: bytes, msg_type: Type[MsgLikeT]) -> MsgLikeT:
check_is_valid_msg_type(msg_type)

str_msg = byte_msg.decode()
msg_dict = json.loads(str_msg)
return human_readable_decoding_recursive(msg_dict, msg_type)


def human_readable_decoding_recursive(msg_dict: NestedDictionary,
msg_type: Type[MsgLikeT]) -> MsgLikeT:
msg = msg_type()
set_value: object
for field, value in msg_dict.items():
field_default = getattr(msg, field)
if isinstance(field_default, bytes):
if isinstance(value, str):
set_value = value.encode(ENCODING)
elif isinstance(value, dict):
set_value = human_readable_decoding_recursive(value, type(getattr(msg, field)))
elif isinstance(field_default, list):
if len(field_default) == 0:
set_value = []
else:
field_default_element = field_default[0]
if isinstance(field_default_element, bytes):
value = cast(list[str], value)
set_value = [byte.encode(ENCODING) for byte in value]
elif RESERVED_FIELD_TYPE in msg_type.get_fields_and_field_types()[field]:
value = cast(Iterable[NestedDictionary], value)
set_value = [human_readable_decoding_recursive(msg_in_list,
type(getattr(msg, field)[0]))
for msg_in_list in value]
else:
set_value = value
else:
set_value = value

setattr(msg, field, set_value)
return msg
27 changes: 21 additions & 6 deletions mqtt_ros_bridge/serializer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import Any
from abc import ABC, abstractmethod
from typing import Type

from rclpy.serialization import serialize_message, deserialize_message
from mqtt_ros_bridge.encodings import (MsgLike, MsgLikeT,
human_readable_decoding,
human_readable_encoding)
from rclpy.serialization import deserialize_message, serialize_message


class Serializer(ABC):
"""Serializes and deserializes ROS messages for transmission over MQTT."""

@staticmethod
@abstractmethod
def serialize(message: Any) -> bytes:
def serialize(message: MsgLike) -> bytes:
"""
Serialize the provided ROS message to a bytes for MQTT.

Expand All @@ -27,7 +30,7 @@ def serialize(message: Any) -> bytes:

@staticmethod
@abstractmethod
def deserialize(serialized_message: bytes, message_type: Any) -> Any:
def deserialize(serialized_message: bytes, message_type: Type[MsgLikeT]) -> MsgLikeT:
"""
Deserialize the provided bytes into a ROS message of the provided type.

Expand All @@ -50,9 +53,21 @@ class ROSDefaultSerializer(Serializer):
"""Serialize and deserialize messages using the default ROS message serializer."""

@staticmethod
def serialize(message: Any) -> bytes:
def serialize(message: MsgLike) -> bytes:
return serialize_message(message)

@staticmethod
def deserialize(serialized_message: bytes, message_type: Any) -> Any:
def deserialize(serialized_message: bytes, message_type: Type[MsgLikeT]) -> MsgLikeT:
return deserialize_message(serialized_message, message_type)


class HumanReadableSerializer(Serializer):
"""Serialize and deserialize messages using the default ROS message serializer."""

@staticmethod
def serialize(message: MsgLike) -> bytes:
return human_readable_encoding(message)

@staticmethod
def deserialize(serialized_message: bytes, message_type: Type[MsgLikeT]) -> MsgLikeT:
return human_readable_decoding(serialized_message, message_type)
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>ament_mypy</test_depend>
<test_depend>test_msgs</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
Expand Down
25 changes: 25 additions & 0 deletions test/test_encodings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from test_msgs.msg import (Arrays, BasicTypes, BoundedPlainSequences, BoundedSequences, Builtins,
Constants, Defaults, Empty, MultiNested, Strings, UnboundedSequences,
WStrings)
from mqtt_ros_bridge.encodings import human_readable_encoding, human_readable_decoding, MsgLikeT


def encodings_helper(msg: MsgLikeT) -> MsgLikeT:
encoded_and_decoded_msg = human_readable_decoding(human_readable_encoding(msg), type(msg))
assert encoded_and_decoded_msg == msg
return encoded_and_decoded_msg


def test_encodings() -> None:
encodings_helper(Arrays())
encodings_helper(BasicTypes())
encodings_helper(BoundedPlainSequences())
encodings_helper(BoundedSequences())
encodings_helper(Builtins())
encodings_helper(Constants())
encodings_helper(Defaults())
encodings_helper(Empty())
encodings_helper(MultiNested())
encodings_helper(Strings())
encodings_helper(UnboundedSequences())
encodings_helper(WStrings())