Skip to content

Commit

Permalink
Merge pull request #4 from benjaminwp18/message-serialization
Browse files Browse the repository at this point in the history
Default ROS serialization
  • Loading branch information
benjaminwp18 authored Mar 28, 2024
2 parents 5805da8 + 8f68230 commit aec97b0
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 23 deletions.
66 changes: 43 additions & 23 deletions mqtt_ros_bridge/bridge_node.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,39 @@
from typing import Any
from typing import Any, TypeVar, Callable, Generic
from dataclasses import dataclass

import rclpy
from rclpy.node import Node
from rclpy.publisher import Publisher
from rclpy.subscription import Subscription
from rclpy._rclpy_pybind11 import RMWError

from std_msgs.msg import String

import paho.mqtt.client as MQTT

from mqtt_ros_bridge.serializer import Serializer, ROSDefaultSerializer


T = TypeVar('T')


@dataclass
class TopicInfo():
class TopicInfo(Generic[T]):
"""Metadata about a single topic."""

name: str
msg_type: T
serializer: type[Serializer]
publish_on_ros: bool


TOPICS: list[TopicInfo] = [
TopicInfo('pub_topic', True),
TopicInfo('sub_topic', False)
]
TOPICS: dict[str, TopicInfo] = {
'pub_topic': TopicInfo('pub_topic', String, ROSDefaultSerializer, True),
'sub_topic': TopicInfo('sub_topic', String, ROSDefaultSerializer, False)
}

MQTT_PORT = 1883
MQTT_KEEPALIVE = 60


class BridgeNode(Node):
Expand All @@ -31,45 +42,47 @@ class BridgeNode(Node):
def __init__(self) -> None:
super().__init__('mqtt_bridge_node')

print('Creating MQTT ROS bridge node')
self.get_logger().info('Creating MQTT ROS bridge node')

self.mqtt_client = MQTT.Client()
self.mqtt_client.enable_logger()
self.mqtt_client.connect('localhost', port=1883, keepalive=60)
self.mqtt_client.connect('localhost', port=MQTT_PORT, keepalive=MQTT_KEEPALIVE)
self.mqtt_client.loop_start()

self.ros_publishers: dict[str, Publisher] = {}
self.ros_subscriptions: list[Subscription] = []

for topic_info in TOPICS:
for topic_info in TOPICS.values():
if topic_info.publish_on_ros:
publisher = self.create_publisher(String, topic_info.name, 10)
publisher = self.create_publisher(topic_info.msg_type, topic_info.name, 10)
self.ros_publishers[topic_info.name] = publisher
self.mqtt_client.subscribe(topic_info.name)
else:
callback = self.make_ros_callback(topic_info)
subscription = self.create_subscription(
String, topic_info.name, self.make_ros_receiver(topic_info.name), 10)
topic_info.msg_type, topic_info.name, callback, 10)
self.ros_subscriptions.append(subscription)

self.mqtt_client.on_message = self.mqtt_msg_received
self.mqtt_client.on_message = self.mqtt_callback

def make_ros_receiver(self, topic: str):
def make_ros_callback(self, topic_info: TopicInfo[T]) -> Callable[[T], None]:
"""
Create a callback function which re-publishes messages on the same topic in MQTT.
Parameters
----------
topic : str
the topic that the callback will publish on
topic_info : TopicInfo
information about the topic that the callback will publish on
"""
def callback(msg: String):
self.get_logger().info(f"ROS RECEIVED: Topic: '{topic}' Payload: '{msg}'")
self.mqtt_client.publish(topic, msg.data)
def callback(msg: T) -> None:
self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.name}" Payload: "{msg}"')
self.mqtt_client.publish(topic_info.name, topic_info.serializer.serialize(msg))

return callback

def mqtt_msg_received(self, _client: MQTT.Client, _userdata: Any, mqtt_msg: MQTT.MQTTMessage):
def mqtt_callback(self, _client: MQTT.Client,
_userdata: Any, mqtt_msg: MQTT.MQTTMessage) -> None:
"""
Re-publish messages from MQTT on the same topic in ROS.
Expand All @@ -83,12 +96,19 @@ def mqtt_msg_received(self, _client: MQTT.Client, _userdata: Any, mqtt_msg: MQTT
the message received over MQTT
"""
topic_info = TOPICS[mqtt_msg.topic]

self.get_logger().info(
f"MQTT RECEIVED: Topic: '{mqtt_msg.topic}' Payload: '{mqtt_msg.payload!r}'")
f'MQTT RECEIVED: Topic: "{topic_info.name}" Payload: "{mqtt_msg.payload!r}"')

try:
ros_msg = topic_info.serializer.deserialize(mqtt_msg.payload, topic_info.msg_type)
except RMWError:
self.get_logger().info('Dropping message with bad serialization received from' +
f'MQTT on topic "{mqtt_msg.topic}": "{mqtt_msg.payload!r}"')
return

ros_msg = String()
ros_msg.data = mqtt_msg.payload.decode('utf-8')
self.ros_publishers[mqtt_msg.topic].publish(ros_msg)
self.ros_publishers[topic_info.name].publish(ros_msg)


def main(args=None):
Expand Down
58 changes: 58 additions & 0 deletions mqtt_ros_bridge/serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from typing import Any
from abc import ABC, abstractmethod

from rclpy.serialization import serialize_message, deserialize_message


class Serializer(ABC):
"""Serializes and deserializes ROS messages for transmission over MQTT."""

@staticmethod
@abstractmethod
def serialize(message: Any) -> bytes:
"""
Serialize the provided ROS message to a bytes for MQTT.
Parameters
----------
message : Any
the ROS message to serialize
Returns
-------
bytes
the bytes formed by serializing the ROS message
"""

@staticmethod
@abstractmethod
def deserialize(serialized_message: bytes, message_type: Any) -> Any:
"""
Deserialize the provided bytes into a ROS message of the provided type.
Parameters
----------
serialized_message : bytes
the bytes generated by serializing a ROS message
message_type : Any
the type of the message to create
Returns
-------
Any
the ROS message formed by deserializing the bytes
"""


class ROSDefaultSerializer(Serializer):
"""Serialize and deserialize messages using the default ROS message serializer."""

@staticmethod
def serialize(message: Any) -> bytes:
return serialize_message(message)

@staticmethod
def deserialize(serialized_message: bytes, message_type: Any) -> Any:
return deserialize_message(serialized_message, message_type)

0 comments on commit aec97b0

Please sign in to comment.