diff --git a/config/lampi.yaml b/config/lampi.yaml index 43f36d4..82b7110 100644 --- a/config/lampi.yaml +++ b/config/lampi.yaml @@ -4,3 +4,5 @@ mqtt_ros_bridge: topic: "devices/b827eb3d9134/lamp/set_config" type: "lampi_msgs.msg:Lampi" publish_on_ros: False + ros_qos: 1 + mqtt_qos: 1 diff --git a/mqtt_ros_bridge/bridge_node.py b/mqtt_ros_bridge/bridge_node.py index c58fc56..c1d1589 100644 --- a/mqtt_ros_bridge/bridge_node.py +++ b/mqtt_ros_bridge/bridge_node.py @@ -1,12 +1,14 @@ import os import sys -from typing import Any, Callable, Generic, cast +from enum import IntEnum +from typing import Any, Callable, Generic, cast, Literal import paho.mqtt.client as MQTT import rclpy from rclpy._rclpy_pybind11 import RMWError from rclpy.node import Node # TODO this breaks humble +from rclpy.qos import QoSPresetProfiles from rclpy.parameter import Parameter, parameter_dict_from_yaml_file from rclpy.publisher import Publisher @@ -15,23 +17,63 @@ from mqtt_ros_bridge.util import lookup_message +class QoSPresetProfilesInt(IntEnum): + UNKNOWN = 0 + DEFAULT = 1 + SYSTEM_DEFAULT = 2 + SENSOR_DATA = 3 + SERVICES_DEFAULT = 4 + PARAMETERS = 5 + PARAMETER_EVENTS = 6 + ACTION_STATUS_DEFAULT = 7 + BEST_AVAILABLE = 8 + + @staticmethod + def to_qos_profile(enum: 'int | QoSPresetProfilesInt') -> QoSPresetProfiles: + match enum: + case QoSPresetProfilesInt.UNKNOWN: + return QoSPresetProfiles.UNKNOWN + case QoSPresetProfilesInt.DEFAULT: + return QoSPresetProfiles.DEFAULT + case QoSPresetProfilesInt.SYSTEM_DEFAULT: + return QoSPresetProfiles.SYSTEM_DEFAULT + case QoSPresetProfilesInt.SENSOR_DATA: + return QoSPresetProfiles.SENSOR_DATA + case QoSPresetProfilesInt.SERVICES_DEFAULT: + return QoSPresetProfiles.SERVICES_DEFAULT + case QoSPresetProfilesInt.PARAMETERS: + return QoSPresetProfiles.PARAMETERS + case QoSPresetProfilesInt.PARAMETER_EVENTS: + return QoSPresetProfiles.PARAMETER_EVENTS + case QoSPresetProfilesInt.ACTION_STATUS_DEFAULT: + return QoSPresetProfiles.ACTION_STATUS_DEFAULT + case QoSPresetProfilesInt.BEST_AVAILABLE: + return QoSPresetProfiles.BEST_AVAILABLE + case _: + return QoSPresetProfiles.UNKNOWN + + class TopicMsgInfo(Generic[MsgLikeT]): """Metadata about a single topic.""" def __init__(self, topic: str, msg_object_path: str, - publish_on_ros: bool, use_ros_serializer: bool = True) -> None: + publish_on_ros: bool, use_ros_serializer: bool = True, + ros_qos: int | QoSPresetProfilesInt = QoSPresetProfilesInt.DEFAULT, + mqtt_qos: Literal[0, 1, 2] = 0) -> None: self.topic = topic self.msg_type: MsgLikeT = cast(MsgLikeT, lookup_message(msg_object_path)) self.publish_on_ros = publish_on_ros self.serializer = ROSDefaultSerializer if use_ros_serializer else JSONSerializer + self.ros_qos = QoSPresetProfilesInt.to_qos_profile(ros_qos).value + self.mqtt_qos = mqtt_qos def __str__(self) -> str: return (f"Topic: {self.topic}, Message Type: {self.msg_type}, Publish on ROS:" f"{self.publish_on_ros}, Serializer: {self.serializer}") -MQTT_PORT = 1883 +MQTT_PORT = 50001 MQTT_KEEPALIVE = 60 PARAMETER_TOPIC = "topic" @@ -70,13 +112,14 @@ def __init__(self, args: list[str]) -> None: for topic_info in self.topics.values(): if topic_info.publish_on_ros: - publisher = self.create_publisher(topic_info.msg_type, topic_info.topic, 10) + publisher = self.create_publisher(topic_info.msg_type, topic_info.topic, + topic_info.ros_qos) self.ros_publishers[topic_info.topic] = publisher - self.mqtt_client.subscribe(topic_info.topic) + self.mqtt_client.subscribe(topic_info.topic, qos=topic_info.mqtt_qos) else: callback = self.make_ros_callback(topic_info) - # TODO proper QOS? - self.create_subscription(topic_info.msg_type, topic_info.topic, callback, 10) + self.create_subscription(topic_info.msg_type, topic_info.topic, callback, + topic_info.ros_qos) self.mqtt_client.on_message = self.mqtt_callback @@ -123,7 +166,8 @@ def make_ros_callback(self, topic_info: TopicMsgInfo[MsgLikeT]) -> Callable[[Msg """ def callback(msg: MsgLikeT) -> None: self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.topic}" Payload: "{msg}"') - self.mqtt_client.publish(topic_info.topic, topic_info.serializer.serialize(msg)) + self.mqtt_client.publish(topic_info.topic, topic_info.serializer.serialize(msg), + topic_info.mqtt_qos) return callback @@ -165,6 +209,9 @@ def main(args=None): rclpy.spin(bridge_node) + bridge_node.mqtt_client.socket().shutdown() + bridge_node.mqtt_client.socket().close() + bridge_node.destroy_node() rclpy.shutdown()