|
1 | 1 | import os
|
2 | 2 | import sys
|
3 |
| -from typing import Any, Callable, Generic, cast |
| 3 | +from enum import IntEnum |
| 4 | +from typing import Any, Callable, Generic, cast, Literal |
4 | 5 |
|
5 | 6 | import paho.mqtt.client as MQTT
|
6 | 7 | import rclpy
|
7 | 8 | from rclpy._rclpy_pybind11 import RMWError
|
8 | 9 | from rclpy.node import Node
|
9 | 10 | # TODO this breaks humble
|
| 11 | +from rclpy.qos import QoSPresetProfiles |
10 | 12 | from rclpy.parameter import Parameter, parameter_dict_from_yaml_file
|
11 | 13 | from rclpy.publisher import Publisher
|
12 | 14 |
|
|
15 | 17 | from mqtt_ros_bridge.util import lookup_message
|
16 | 18 |
|
17 | 19 |
|
| 20 | +class QoSPresetProfilesInt(IntEnum): |
| 21 | + UNKNOWN = 0 |
| 22 | + DEFAULT = 1 |
| 23 | + SYSTEM_DEFAULT = 2 |
| 24 | + SENSOR_DATA = 3 |
| 25 | + SERVICES_DEFAULT = 4 |
| 26 | + PARAMETERS = 5 |
| 27 | + PARAMETER_EVENTS = 6 |
| 28 | + ACTION_STATUS_DEFAULT = 7 |
| 29 | + BEST_AVAILABLE = 8 |
| 30 | + |
| 31 | + @staticmethod |
| 32 | + def to_qos_profile(enum: 'int | QoSPresetProfilesInt') -> QoSPresetProfiles: |
| 33 | + match enum: |
| 34 | + case QoSPresetProfilesInt.UNKNOWN: |
| 35 | + return QoSPresetProfiles.UNKNOWN |
| 36 | + case QoSPresetProfilesInt.DEFAULT: |
| 37 | + return QoSPresetProfiles.DEFAULT |
| 38 | + case QoSPresetProfilesInt.SYSTEM_DEFAULT: |
| 39 | + return QoSPresetProfiles.SYSTEM_DEFAULT |
| 40 | + case QoSPresetProfilesInt.SENSOR_DATA: |
| 41 | + return QoSPresetProfiles.SENSOR_DATA |
| 42 | + case QoSPresetProfilesInt.SERVICES_DEFAULT: |
| 43 | + return QoSPresetProfiles.SERVICES_DEFAULT |
| 44 | + case QoSPresetProfilesInt.PARAMETERS: |
| 45 | + return QoSPresetProfiles.PARAMETERS |
| 46 | + case QoSPresetProfilesInt.PARAMETER_EVENTS: |
| 47 | + return QoSPresetProfiles.PARAMETER_EVENTS |
| 48 | + case QoSPresetProfilesInt.ACTION_STATUS_DEFAULT: |
| 49 | + return QoSPresetProfiles.ACTION_STATUS_DEFAULT |
| 50 | + case QoSPresetProfilesInt.BEST_AVAILABLE: |
| 51 | + return QoSPresetProfiles.BEST_AVAILABLE |
| 52 | + case _: |
| 53 | + return QoSPresetProfiles.UNKNOWN |
| 54 | + |
| 55 | + |
18 | 56 | class TopicMsgInfo(Generic[MsgLikeT]):
|
19 | 57 | """Metadata about a single topic."""
|
20 | 58 |
|
21 | 59 | def __init__(self, topic: str, msg_object_path: str,
|
22 |
| - publish_on_ros: bool, use_ros_serializer: bool = True) -> None: |
| 60 | + publish_on_ros: bool, use_ros_serializer: bool = True, |
| 61 | + ros_qos: int | QoSPresetProfilesInt = QoSPresetProfilesInt.DEFAULT, |
| 62 | + mqtt_qos: Literal[0, 1, 2] = 0) -> None: |
23 | 63 |
|
24 | 64 | self.topic = topic
|
25 | 65 | self.msg_type: MsgLikeT = cast(MsgLikeT, lookup_message(msg_object_path))
|
26 | 66 | self.publish_on_ros = publish_on_ros
|
27 | 67 | self.serializer = ROSDefaultSerializer if use_ros_serializer else JSONSerializer
|
| 68 | + self.ros_qos = QoSPresetProfilesInt.to_qos_profile(ros_qos).value |
| 69 | + self.mqtt_qos = mqtt_qos |
28 | 70 |
|
29 | 71 | def __str__(self) -> str:
|
30 | 72 | return (f"Topic: {self.topic}, Message Type: {self.msg_type}, Publish on ROS:"
|
31 | 73 | f"{self.publish_on_ros}, Serializer: {self.serializer}")
|
32 | 74 |
|
33 | 75 |
|
34 |
| -MQTT_PORT = 1883 |
| 76 | +MQTT_PORT = 50001 |
35 | 77 | MQTT_KEEPALIVE = 60
|
36 | 78 |
|
37 | 79 | PARAMETER_TOPIC = "topic"
|
@@ -70,13 +112,14 @@ def __init__(self, args: list[str]) -> None:
|
70 | 112 |
|
71 | 113 | for topic_info in self.topics.values():
|
72 | 114 | if topic_info.publish_on_ros:
|
73 |
| - publisher = self.create_publisher(topic_info.msg_type, topic_info.topic, 10) |
| 115 | + publisher = self.create_publisher(topic_info.msg_type, topic_info.topic, |
| 116 | + topic_info.ros_qos) |
74 | 117 | self.ros_publishers[topic_info.topic] = publisher
|
75 |
| - self.mqtt_client.subscribe(topic_info.topic) |
| 118 | + self.mqtt_client.subscribe(topic_info.topic, qos=topic_info.mqtt_qos) |
76 | 119 | else:
|
77 | 120 | callback = self.make_ros_callback(topic_info)
|
78 |
| - # TODO proper QOS? |
79 |
| - self.create_subscription(topic_info.msg_type, topic_info.topic, callback, 10) |
| 121 | + self.create_subscription(topic_info.msg_type, topic_info.topic, callback, |
| 122 | + topic_info.ros_qos) |
80 | 123 |
|
81 | 124 | self.mqtt_client.on_message = self.mqtt_callback
|
82 | 125 |
|
@@ -123,7 +166,8 @@ def make_ros_callback(self, topic_info: TopicMsgInfo[MsgLikeT]) -> Callable[[Msg
|
123 | 166 | """
|
124 | 167 | def callback(msg: MsgLikeT) -> None:
|
125 | 168 | self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.topic}" Payload: "{msg}"')
|
126 |
| - self.mqtt_client.publish(topic_info.topic, topic_info.serializer.serialize(msg)) |
| 169 | + self.mqtt_client.publish(topic_info.topic, topic_info.serializer.serialize(msg), |
| 170 | + topic_info.mqtt_qos) |
127 | 171 |
|
128 | 172 | return callback
|
129 | 173 |
|
@@ -165,6 +209,9 @@ def main(args=None):
|
165 | 209 |
|
166 | 210 | rclpy.spin(bridge_node)
|
167 | 211 |
|
| 212 | + bridge_node.mqtt_client.socket().shutdown() |
| 213 | + bridge_node.mqtt_client.socket().close() |
| 214 | + |
168 | 215 | bridge_node.destroy_node()
|
169 | 216 | rclpy.shutdown()
|
170 | 217 |
|
|
0 commit comments