Skip to content

Commit

Permalink
Linting & typing
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwp18 committed Mar 28, 2024
1 parent 27e3197 commit 43b10a9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
18 changes: 11 additions & 7 deletions mqtt_ros_bridge/bridge_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, TypeVar, Callable, Generic
from dataclasses import dataclass

import rclpy
Expand All @@ -14,12 +14,15 @@
from mqtt_ros_bridge.serializer import Serializer, ROSDefaultSerializer


T = TypeVar('T')


@dataclass
class TopicInfo():
class TopicInfo(Generic[T]):
"""Metadata about a single topic."""

name: str
msg_type: Any
msg_type: T
serializer: type[Serializer]
publish_on_ros: bool

Expand Down Expand Up @@ -62,7 +65,7 @@ def __init__(self) -> None:

self.mqtt_client.on_message = self.mqtt_callback

def make_ros_callback(self, topic_info: TopicInfo):
def make_ros_callback(self, topic_info: TopicInfo[T]) -> Callable[[T], None]:
"""
Create a callback function which re-publishes messages on the same topic in MQTT.
Expand All @@ -72,13 +75,14 @@ def make_ros_callback(self, topic_info: TopicInfo):
information about the topic that the callback will publish on
"""
def callback(msg: topic_info.msg_type):
def callback(msg: T) -> None:
self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.name}" Payload: "{msg}"')
self.mqtt_client.publish(topic_info.name, topic_info.serializer.serialize(msg))

return callback

def mqtt_callback(self, _client: MQTT.Client, _userdata: Any, mqtt_msg: MQTT.MQTTMessage):
def mqtt_callback(self, _client: MQTT.Client,
_userdata: Any, mqtt_msg: MQTT.MQTTMessage) -> None:
"""
Re-publish messages from MQTT on the same topic in ROS.
Expand All @@ -101,7 +105,7 @@ def mqtt_callback(self, _client: MQTT.Client, _userdata: Any, mqtt_msg: MQTT.MQT
ros_msg = topic_info.serializer.deserialize(mqtt_msg.payload, topic_info.msg_type)
except RMWError:
self.get_logger().info('Dropping message with bad serialization received' +
f'from MQTT on topic "{mqtt_msg.topic}": "{mqtt_msg.payload!r}"')
f'from MQTT on topic "{mqtt_msg.topic}": "{mqtt_msg.payload!r}"')
return

self.ros_publishers[topic_info.name].publish(ros_msg)
Expand Down
1 change: 1 addition & 0 deletions mqtt_ros_bridge/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from rclpy.serialization import serialize_message, deserialize_message


class Serializer(ABC):
"""Serializes and deserializes ROS messages for transmission over MQTT."""

Expand Down

0 comments on commit 43b10a9

Please sign in to comment.