Skip to content

Commit

Permalink
Default ROS serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwp18 committed Mar 28, 2024
1 parent 5805da8 commit 27e3197
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 21 deletions.
58 changes: 37 additions & 21 deletions mqtt_ros_bridge/bridge_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,32 @@
from rclpy.node import Node
from rclpy.publisher import Publisher
from rclpy.subscription import Subscription
from rclpy._rclpy_pybind11 import RMWError

from std_msgs.msg import String

import paho.mqtt.client as MQTT

from mqtt_ros_bridge.serializer import Serializer, ROSDefaultSerializer


@dataclass
class TopicInfo():
"""Metadata about a single topic."""

name: str
msg_type: Any
serializer: type[Serializer]
publish_on_ros: bool


TOPICS: list[TopicInfo] = [
TopicInfo('pub_topic', True),
TopicInfo('sub_topic', False)
]
TOPICS: dict[str, TopicInfo] = {
'pub_topic': TopicInfo('pub_topic', String, ROSDefaultSerializer, True),
'sub_topic': TopicInfo('sub_topic', String, ROSDefaultSerializer, False)
}

MQTT_PORT = 1883
MQTT_KEEPALIVE = 60


class BridgeNode(Node):
Expand All @@ -31,45 +39,46 @@ class BridgeNode(Node):
def __init__(self) -> None:
super().__init__('mqtt_bridge_node')

print('Creating MQTT ROS bridge node')
self.get_logger().info('Creating MQTT ROS bridge node')

self.mqtt_client = MQTT.Client()
self.mqtt_client.enable_logger()
self.mqtt_client.connect('localhost', port=1883, keepalive=60)
self.mqtt_client.connect('localhost', port=MQTT_PORT, keepalive=MQTT_KEEPALIVE)
self.mqtt_client.loop_start()

self.ros_publishers: dict[str, Publisher] = {}
self.ros_subscriptions: list[Subscription] = []

for topic_info in TOPICS:
for topic_info in TOPICS.values():
if topic_info.publish_on_ros:
publisher = self.create_publisher(String, topic_info.name, 10)
publisher = self.create_publisher(topic_info.msg_type, topic_info.name, 10)
self.ros_publishers[topic_info.name] = publisher
self.mqtt_client.subscribe(topic_info.name)
else:
callback = self.make_ros_callback(topic_info)
subscription = self.create_subscription(
String, topic_info.name, self.make_ros_receiver(topic_info.name), 10)
topic_info.msg_type, topic_info.name, callback, 10)
self.ros_subscriptions.append(subscription)

self.mqtt_client.on_message = self.mqtt_msg_received
self.mqtt_client.on_message = self.mqtt_callback

def make_ros_receiver(self, topic: str):
def make_ros_callback(self, topic_info: TopicInfo):
"""
Create a callback function which re-publishes messages on the same topic in MQTT.
Parameters
----------
topic : str
the topic that the callback will publish on
topic_info : TopicInfo
information about the topic that the callback will publish on
"""
def callback(msg: String):
self.get_logger().info(f"ROS RECEIVED: Topic: '{topic}' Payload: '{msg}'")
self.mqtt_client.publish(topic, msg.data)
def callback(msg: topic_info.msg_type):
self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.name}" Payload: "{msg}"')
self.mqtt_client.publish(topic_info.name, topic_info.serializer.serialize(msg))

return callback

def mqtt_msg_received(self, _client: MQTT.Client, _userdata: Any, mqtt_msg: MQTT.MQTTMessage):
def mqtt_callback(self, _client: MQTT.Client, _userdata: Any, mqtt_msg: MQTT.MQTTMessage):
"""
Re-publish messages from MQTT on the same topic in ROS.
Expand All @@ -83,12 +92,19 @@ def mqtt_msg_received(self, _client: MQTT.Client, _userdata: Any, mqtt_msg: MQTT
the message received over MQTT
"""
topic_info = TOPICS[mqtt_msg.topic]

self.get_logger().info(
f"MQTT RECEIVED: Topic: '{mqtt_msg.topic}' Payload: '{mqtt_msg.payload!r}'")
f'MQTT RECEIVED: Topic: "{topic_info.name}" Payload: "{mqtt_msg.payload!r}"')

try:
ros_msg = topic_info.serializer.deserialize(mqtt_msg.payload, topic_info.msg_type)
except RMWError:
self.get_logger().info('Dropping message with bad serialization received' +
f'from MQTT on topic "{mqtt_msg.topic}": "{mqtt_msg.payload!r}"')
return

ros_msg = String()
ros_msg.data = mqtt_msg.payload.decode('utf-8')
self.ros_publishers[mqtt_msg.topic].publish(ros_msg)
self.ros_publishers[topic_info.name].publish(ros_msg)


def main(args=None):
Expand Down
57 changes: 57 additions & 0 deletions mqtt_ros_bridge/serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from typing import Any
from abc import ABC, abstractmethod

from rclpy.serialization import serialize_message, deserialize_message

class Serializer(ABC):
"""Serializes and deserializes ROS messages for transmission over MQTT."""

@staticmethod
@abstractmethod
def serialize(message: Any) -> bytes:
"""
Serialize the provided ROS message to a bytes for MQTT.
Parameters
----------
message : Any
the ROS message to serialize
Returns
-------
bytes
the bytes formed by serializing the ROS message
"""

@staticmethod
@abstractmethod
def deserialize(serialized_message: bytes, message_type: Any) -> Any:
"""
Deserialize the provided bytes into a ROS message of the provided type.
Parameters
----------
serialized_message : bytes
the bytes generated by serializing a ROS message
message_type : Any
the type of the message to create
Returns
-------
Any
the ROS message formed by deserializing the bytes
"""


class ROSDefaultSerializer(Serializer):
"""Serialize and deserialize messages using the default ROS message serializer."""

@staticmethod
def serialize(message: Any) -> bytes:
return serialize_message(message)

@staticmethod
def deserialize(serialized_message: bytes, message_type: Any) -> Any:
return deserialize_message(serialized_message, message_type)

0 comments on commit 27e3197

Please sign in to comment.