88import rclpy
99from rclpy ._rclpy_pybind11 import RMWError
1010from rclpy .node import Node
11+ from rclpy .client import Client
1112# TODO this breaks humble
1213from rclpy .parameter import Parameter , parameter_dict_from_yaml_file
1314from rclpy .publisher import Publisher
@@ -56,6 +57,9 @@ def __str__(self) -> str:
5657PARAMETER_INTERACTION_TYPE = "interaction_type"
5758PARAMETER_USE_ROS_SERIALIZER = "use_ros_serializer"
5859
60+ SERVICE_REQUEST_POSTFIX = "_mqtt_bridge_request"
61+ SERVICE_RESPONSE_POSTFIX = "_mqtt_bridge_response"
62+
5963
6064class BridgeNode (Node ):
6165 """Node to bridge MQTT and ROS."""
@@ -70,7 +74,9 @@ def __init__(self, args: list[str]) -> None:
7074
7175 if (len (args ) != 2 and "--ros-args" not in args ) or not (len (args ) == 3 and
7276 "--ros-args" in args ):
73- raise ValueError ("To many arguments given" )
77+ msg = "To many arguments given"
78+ self .get_logger ().error (msg )
79+ raise ValueError (msg )
7480
7581 self .topics = self .topic_info_from_parameters (args [1 ])
7682
@@ -85,16 +91,27 @@ def __init__(self, args: list[str]) -> None:
8591 self .mqtt_client .loop_start ()
8692
8793 self .ros_publishers : dict [str , Publisher ] = {}
94+ self .ros_clients : dict [str , Client ] = {}
8895
8996 for topic_info in self .topics .values ():
90- if topic_info .interaction_type == InteractionType .ROS_PUBLISHER :
91- publisher = self .create_publisher (topic_info .msg_type , topic_info .topic , 10 )
92- self .ros_publishers [topic_info .topic ] = publisher
93- self .mqtt_client .subscribe (topic_info .topic )
94- else :
95- callback = self .make_ros_callback (topic_info )
96- # TODO proper QOS?
97- self .create_subscription (topic_info .msg_type , topic_info .topic , callback , 10 )
97+ match topic_info .interaction_type :
98+ case InteractionType .ROS_PUBLISHER :
99+ publisher = self .create_publisher (topic_info .msg_type , topic_info .topic , 10 )
100+ self .ros_publishers [topic_info .topic ] = publisher
101+ self .mqtt_client .subscribe (topic_info .topic )
102+ case InteractionType .ROS_SUBSCRIBER :
103+ callback = self .make_ros_callback (topic_info )
104+ # TODO proper QOS?
105+ self .create_subscription (topic_info .msg_type , topic_info .topic , callback , 10 )
106+ case InteractionType .ROS_CLIENT :
107+ self .ros_clients [topic_info .topic ] = \
108+ self .create_client (topic_info .topic , topic_info .topic )
109+ case InteractionType .ROS_SERVER :
110+ pass # TODO
111+ case _:
112+ msg = f'Invalid interaction type { topic_info .interaction_type } '
113+ self .get_logger ().error (msg )
114+ raise ValueError (msg )
98115
99116 self .mqtt_client .on_message = self .mqtt_callback
100117
@@ -122,7 +139,9 @@ def topic_info_from_parameters(self, config: str) -> dict[str, TopicMsgInfo]:
122139
123140 interaction_type_str = params [f"{ name } .{ PARAMETER_INTERACTION_TYPE } " ].value
124141 if interaction_type_str not in INTERACTION_TYPES .keys ():
125- raise ValueError (f'Interaction types must be one of { INTERACTION_TYPES .keys ()} ' )
142+ msg = f'Interaction types must be one of { INTERACTION_TYPES .keys ()} '
143+ self .get_logger ().error (msg )
144+ raise ValueError (msg )
126145
127146 topic_infos [params [f"{ name } .{ PARAMETER_TOPIC } " ].value ] = TopicMsgInfo (
128147 params [f"{ name } .{ PARAMETER_TOPIC } " ].value ,
@@ -152,7 +171,7 @@ def callback(msg: MsgLikeT) -> None:
152171 def mqtt_callback (self , _client : MQTT .Client ,
153172 _userdata : Any , mqtt_msg : MQTT .MQTTMessage ) -> None :
154173 """
155- Re-publish messages from MQTT on the same topic in ROS.
174+ Re-publish messages from MQTT on the same topic in ROS or
156175
157176 Parameters
158177 ----------
@@ -164,7 +183,14 @@ def mqtt_callback(self, _client: MQTT.Client,
164183 the message received over MQTT
165184
166185 """
167- topic_info = self .topics [mqtt_msg .topic ]
186+ if mqtt_msg .topic in self .topics :
187+ topic_info = self .topics [mqtt_msg .topic ]
188+ elif mqtt_msg .topic + SERVICE_REQUEST_POSTFIX in self .topics :
189+ topic_info = self .topics [mqtt_msg .topic + SERVICE_REQUEST_POSTFIX ]
190+ elif mqtt_msg .topic + SERVICE_RESPONSE_POSTFIX in self .topics :
191+ topic_info = self .topics [mqtt_msg .topic + SERVICE_RESPONSE_POSTFIX ]
192+ else :
193+ return # Ignore MQTT messages on topics that weren't registered with us
168194
169195 self .get_logger ().info (
170196 f'MQTT RECEIVED: Topic: "{ topic_info .topic } " Payload: "{ mqtt_msg .payload !r} "' )
@@ -176,7 +202,13 @@ def mqtt_callback(self, _client: MQTT.Client,
176202 f'MQTT on topic "{ mqtt_msg .topic } ": "{ mqtt_msg .payload !r} "' )
177203 return
178204
179- self .ros_publishers [topic_info .topic ].publish (ros_msg )
205+ if topic_info .interaction_type == InteractionType .ROS_PUBLISHER :
206+ self .ros_publishers [topic_info .topic ].publish (ros_msg )
207+ elif topic_info .interaction_type == InteractionType .ROS_CLIENT :
208+ request = topic_info .msg_type .Request ()
209+ future = self .ros_clients [topic_info .topic ].call_async (request )
210+ # TODO: send a message on topic + RESPONSE POSTFIX when this future returns
211+ # TODO: server
180212
181213
182214def main (args = None ):
0 commit comments