Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yaml suppport #6

Merged
merged 26 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/industrial_ci_action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,3 @@ jobs:
# Added back so testing without Dockerfile can be done
ROS_DISTRO: ${{ matrix.ROS_DISTRO }}
ROS_REPO: ${{ matrix.ROS_REPO }}

12 changes: 12 additions & 0 deletions config/pub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mqtt_ros_bridge:
ros__parameters:
turtle_pub:
topic: "/turtle1/cmd_vel"
type: "geometry_msgs.msg:Twist"
publish_on_ros: True

yippee:
topic: "foo"
type: "std_msgs.msg:String"
publish_on_ros: True
use_ros_serializer: True
12 changes: 12 additions & 0 deletions config/sub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mqtt_ros_bridge:
ros__parameters:
turtle_pub:
topic: "/turtle1/cmd_vel"
type: "geometry_msgs.msg:Twist"
publish_on_ros: False

yippee:
topic: "foo"
type: "std_msgs.msg:String"
publish_on_ros: True
use_ros_serializer: True
36 changes: 36 additions & 0 deletions launch/demo_pub_bridge_launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os

from ament_index_python.packages import get_package_share_directory
from launch.actions import SetEnvironmentVariable
from launch.launch_description import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description() -> LaunchDescription:
"""
Generate LaunchDescription for MQTT ROS bridge.

Returns
-------
LaunchDescription
Launches bridge_node.

"""
config = os.path.join(
get_package_share_directory('mqtt_ros_bridge'),
'config',
'pub.yaml'
)

run_bridge_node = Node(
package='mqtt_ros_bridge',
executable='bridge_node',
emulate_tty=True,
output='screen',
arguments=[config]
)

return LaunchDescription([
SetEnvironmentVariable("ROS_DOMAIN_ID", "2"),
run_bridge_node
])
45 changes: 45 additions & 0 deletions launch/demo_sub_launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import os

from ament_index_python.packages import get_package_share_directory
from launch.actions import SetEnvironmentVariable
from launch.launch_description import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description() -> LaunchDescription:
"""
Generate LaunchDescription for MQTT ROS bridge.

Returns
-------
LaunchDescription
Launches bridge_node.

"""
config = os.path.join(
get_package_share_directory('mqtt_ros_bridge'),
'config',
'sub.yaml'
)

run_bridge_node = Node(
package='mqtt_ros_bridge',
executable='bridge_node',
emulate_tty=True,
output='screen',
arguments=[config]
)

turtle_sim = Node(
package='rqt_robot_steering',
executable='rqt_robot_steering',
emulate_tty=True,
parameters=[{"topic": "/turtle1/cmd_vel"}],
output='screen'
)

return LaunchDescription([
SetEnvironmentVariable("ROS_DOMAIN_ID", "1"),
run_bridge_node,
turtle_sim
])
26 changes: 26 additions & 0 deletions launch/demo_turtle_sim_launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from launch.actions import SetEnvironmentVariable
from launch.launch_description import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description() -> LaunchDescription:
"""
Generate LaunchDescription for MQTT ROS bridge.

Returns
-------
LaunchDescription
Launches bridge_node.

"""
turtle_sim = Node(
package='turtlesim',
executable='turtlesim_node',
emulate_tty=True,
output='screen'
)

return LaunchDescription([
SetEnvironmentVariable("ROS_DOMAIN_ID", "2"),
turtle_sim
])
117 changes: 83 additions & 34 deletions mqtt_ros_bridge/bridge_node.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,64 @@
from dataclasses import dataclass
from typing import Any, Callable, Generic, Type
import os
import sys
from typing import Any, Callable, Generic, cast

import paho.mqtt.client as MQTT
import rclpy

from rclpy._rclpy_pybind11 import RMWError
from rclpy.node import Node
# TODO this breaks humble
from rclpy.parameter import Parameter, parameter_dict_from_yaml_file
from rclpy.publisher import Publisher
from rclpy.subscription import Subscription
from std_msgs.msg import String

from mqtt_ros_bridge.msg_typing import MsgLikeT
from mqtt_ros_bridge.serializer import ROSDefaultSerializer, Serializer
from mqtt_ros_bridge.serializer import JSONSerializer, ROSDefaultSerializer
from mqtt_ros_bridge.util import lookup_message


@dataclass
class TopicInfo(Generic[MsgLikeT]):
class TopicMsgInfo(Generic[MsgLikeT]):
"""Metadata about a single topic."""

name: str
msg_type: Type[MsgLikeT]
# Should Serializer also be generic across MsgLikeT?
serializer: type[Serializer]
publish_on_ros: bool
def __init__(self, topic: str, msg_object_path: str,
publish_on_ros: bool, use_ros_serializer: bool = True) -> None:

self.topic = topic
self.msg_type: MsgLikeT = cast(MsgLikeT, lookup_message(msg_object_path))
self.publish_on_ros = publish_on_ros
self.serializer = ROSDefaultSerializer if use_ros_serializer else JSONSerializer

def __str__(self) -> str:
return (f"Topic: {self.topic}, Message Type: {self.msg_type}, Publish on ROS:"
f"{self.publish_on_ros}, Serializer: {self.serializer}")

TOPICS: dict[str, TopicInfo] = {
'/turtle1/cmd_vel': TopicInfo('/turtle1/cmd_vel', String, ROSDefaultSerializer, False),
# 'sub_topic': TopicInfo('sub_topic', String, ROSDefaultSerializer, False)
}

MQTT_PORT = 1883
MQTT_KEEPALIVE = 60

PARAMETER_TOPIC = "topic"
PARAMETER_TYPE = "type"
PARAMETER_PUBLISH_ON_ROS = "publish_on_ros"
PARAMETER_USE_ROS_SERIALIZER = "use_ros_serializer"


class BridgeNode(Node):
"""Node to bridge MQTT and ROS."""

def __init__(self) -> None:
def __init__(self, args: list[str]) -> None:
super().__init__('mqtt_bridge_node')

# TODO get from parameters
# DEBUG = True

print(args)

if (len(args) != 2 and "--ros-args" not in args) or not (len(args) == 3 and
InvincibleRMC marked this conversation as resolved.
Show resolved Hide resolved
"--ros-args" in args):
raise ValueError("To many arguments given")

self.topics = self.topic_info_from_parameters(args[1])

self.get_logger().info(str(self.topics))

self.get_logger().info('Creating MQTT ROS bridge node')

self.mqtt_client = MQTT.Client()
Expand All @@ -48,34 +67,64 @@ def __init__(self) -> None:
self.mqtt_client.loop_start()

self.ros_publishers: dict[str, Publisher] = {}
self.ros_subscriptions: list[Subscription] = []

for topic_info in TOPICS.values():
for topic_info in self.topics.values():
if topic_info.publish_on_ros:
publisher = self.create_publisher(topic_info.msg_type, topic_info.name, 10)
self.ros_publishers[topic_info.name] = publisher
self.mqtt_client.subscribe(topic_info.name)
publisher = self.create_publisher(topic_info.msg_type, topic_info.topic, 10)
self.ros_publishers[topic_info.topic] = publisher
self.mqtt_client.subscribe(topic_info.topic)
else:
callback = self.make_ros_callback(topic_info)
subscription = self.create_subscription(
topic_info.msg_type, topic_info.name, callback, 10)
self.ros_subscriptions.append(subscription)
# TODO proper QOS?
self.create_subscription(topic_info.msg_type, topic_info.topic, callback, 10)

self.mqtt_client.on_message = self.mqtt_callback

def make_ros_callback(self, topic_info: TopicInfo[MsgLikeT]) -> Callable[[MsgLikeT], None]:
def topic_info_from_parameters(self, config: str) -> dict[str, TopicMsgInfo]:

InvincibleRMC marked this conversation as resolved.
Show resolved Hide resolved
config = os.path.expanduser(config)
topic_infos: dict[str, TopicMsgInfo] = {}

params: dict[str, Parameter] = {}
dictionary = parameter_dict_from_yaml_file(config)

for key, parameter_msg in dictionary.items():
params[key] = Parameter.from_parameter_msg(parameter_msg)

unique_names: set[str] = set()
for names in params.keys():
# TODO Check that right half matches PARAMETER_*
unique_names.add(names.split(".")[0])

for name in unique_names:

InvincibleRMC marked this conversation as resolved.
Show resolved Hide resolved
if params.get(f"{name}.{PARAMETER_USE_ROS_SERIALIZER}", None):
ros_serialiser = params[f"{name}.{PARAMETER_USE_ROS_SERIALIZER}"].value
else:
ros_serialiser = False

topic_infos[params[f"{name}.{PARAMETER_TOPIC}"].value] = (TopicMsgInfo(
params[f"{name}.{PARAMETER_TOPIC}"].value,
params[f"{name}.{PARAMETER_TYPE}"].value,
params[f"{name}.{PARAMETER_PUBLISH_ON_ROS}"].value,
ros_serialiser
))

return topic_infos

def make_ros_callback(self, topic_info: TopicMsgInfo[MsgLikeT]) -> Callable[[MsgLikeT], None]:
"""
Create a callback function which re-publishes messages on the same topic in MQTT.

Parameters
----------
topic_info : TopicInfo
topic_info : TopicMsgInfo
information about the topic that the callback will publish on

"""
def callback(msg: MsgLikeT) -> None:
self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.name}" Payload: "{msg}"')
self.mqtt_client.publish(topic_info.name, topic_info.serializer.serialize(msg))
self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.topic}" Payload: "{msg}"')
self.mqtt_client.publish(topic_info.topic, topic_info.serializer.serialize(msg))

return callback

Expand All @@ -94,10 +143,10 @@ def mqtt_callback(self, _client: MQTT.Client,
the message received over MQTT

"""
topic_info = TOPICS[mqtt_msg.topic]
topic_info = self.topics[mqtt_msg.topic]

self.get_logger().info(
f'MQTT RECEIVED: Topic: "{topic_info.name}" Payload: "{mqtt_msg.payload!r}"')
f'MQTT RECEIVED: Topic: "{topic_info.topic}" Payload: "{mqtt_msg.payload!r}"')

try:
ros_msg = topic_info.serializer.deserialize(mqtt_msg.payload, topic_info.msg_type)
Expand All @@ -106,14 +155,14 @@ def mqtt_callback(self, _client: MQTT.Client,
f'MQTT on topic "{mqtt_msg.topic}": "{mqtt_msg.payload!r}"')
return

self.ros_publishers[topic_info.name].publish(ros_msg)
self.ros_publishers[topic_info.topic].publish(ros_msg)


def main(args=None):
"""Run bridge node; used in ROS executable."""
rclpy.init(args=args)

bridge_node = BridgeNode()
bridge_node = BridgeNode(sys.argv)

rclpy.spin(bridge_node)

Expand Down
7 changes: 3 additions & 4 deletions mqtt_ros_bridge/serializer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from abc import ABC, abstractmethod
from typing import Type

from rclpy.serialization import deserialize_message, serialize_message

Expand Down Expand Up @@ -30,7 +29,7 @@ def serialize(message: MsgLike) -> bytes:

@staticmethod
@abstractmethod
def deserialize(serialized_message: bytes, message_type: Type[MsgLikeT]) -> MsgLikeT:
def deserialize(serialized_message: bytes, message_type: type[MsgLikeT]) -> MsgLikeT:
"""
Deserialize the provided bytes into a ROS message of the provided type.

Expand All @@ -57,7 +56,7 @@ def serialize(message: MsgLike) -> bytes:
return serialize_message(message)

@staticmethod
def deserialize(serialized_message: bytes, message_type: Type[MsgLikeT]) -> MsgLikeT:
def deserialize(serialized_message: bytes, message_type: type[MsgLikeT]) -> MsgLikeT:
return deserialize_message(serialized_message, message_type)


Expand All @@ -69,5 +68,5 @@ def serialize(message: MsgLike) -> bytes:
return json_serialize(message)

@staticmethod
def deserialize(serialized_message: bytes, message_type: Type[MsgLikeT]) -> MsgLikeT:
def deserialize(serialized_message: bytes, message_type: type[MsgLikeT]) -> MsgLikeT:
return json_deserialize(serialized_message, message_type)
17 changes: 17 additions & 0 deletions mqtt_ros_bridge/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from importlib import import_module
from typing import cast

from mqtt_ros_bridge.msg_typing import MsgLike


def lookup_message(object_path: str, package: str = 'mqtt_ros_bridge') -> MsgLike:
"""Lookup message from a some.module:object_name specification."""
return cast(MsgLike, _lookup_object(object_path, package))


def _lookup_object(object_path: str, package: str = 'mqtt_ros_bridge') -> object:
"""Lookup object from a some.module:object_name specification."""
module_name, obj_name = object_path.split(":")
module = import_module(module_name, package)
obj = getattr(module, obj_name)
return obj
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<test_depend>test_msgs</test_depend>
<test_depend>python3-pytest</test_depend>


InvincibleRMC marked this conversation as resolved.
Show resolved Hide resolved
<export>
<build_type>ament_python</build_type>
</export>
Expand Down
Loading