Skip to content

Commit

Permalink
type safe
Browse files Browse the repository at this point in the history
  • Loading branch information
InvincibleRMC committed Apr 16, 2024
1 parent f510607 commit 332fe07
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
23 changes: 9 additions & 14 deletions mqtt_ros_bridge/bridge_node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import sys
from typing import Any, Callable, Generic
from typing import Any, Callable, Generic, cast

import paho.mqtt.client as MQTT
import rclpy
Expand All @@ -11,17 +11,17 @@
from mqtt_ros_bridge.msg_typing import MsgLikeT
from mqtt_ros_bridge.serializer import (JSONSerializer, ROSDefaultSerializer,
Serializer)
from mqtt_ros_bridge.util import lookup_object, parameter_dict_from_yaml_file
from mqtt_ros_bridge.util import lookup_message, parameter_dict_from_yaml_file


class TopicInfo(Generic[MsgLikeT]):
class TopicInfoMsg(Generic[MsgLikeT]):
"""Metadata about a single topic."""

def __init__(self, topic: str, msg_object_path: str,
publish_on_ros: bool, use_ros_serializer: bool = True) -> None:

self.topic = topic
self.msg_type: MsgLikeT = lookup_object(msg_object_path)
self.msg_type: MsgLikeT = cast(MsgLikeT, lookup_message(msg_object_path))
self.publish_on_ros = publish_on_ros

if use_ros_serializer:
Expand All @@ -34,11 +34,6 @@ def __str__(self) -> str:
f"{self.publish_on_ros}, Serializer: {self.serializer}")


# TOPICS: dict[str, TopicInfo] = {
# '/turtle1/cmd_vel': TopicInfo('/turtle1/cmd_vel', "std_msgs.msg:String", False),
# # 'sub_topic': TopicInfo('sub_topic', String, ROSDefaultSerializer, False)
# }

MQTT_PORT = 1883
MQTT_KEEPALIVE = 60

Expand Down Expand Up @@ -88,10 +83,10 @@ def __init__(self, args: list[str]) -> None:

self.mqtt_client.on_message = self.mqtt_callback

def topic_info_from_parameters(self, config: str) -> dict[str, TopicInfo]:
def topic_info_from_parameters(self, config: str) -> dict[str, TopicInfoMsg]:

config = os.path.expanduser(config)
topic_infos: dict[str, TopicInfo] = {}
topic_infos: dict[str, TopicInfoMsg] = {}

params = parameter_dict_from_yaml_file(config)

Expand All @@ -107,7 +102,7 @@ def topic_info_from_parameters(self, config: str) -> dict[str, TopicInfo]:
else:
ros_serialiser = False

topic_infos[params[f"{name}.{PARAMETER_TOPIC}"].value] = (TopicInfo(
topic_infos[params[f"{name}.{PARAMETER_TOPIC}"].value] = (TopicInfoMsg(
params[f"{name}.{PARAMETER_TOPIC}"].value,
params[f"{name}.{PARAMETER_TYPE}"].value,
params[f"{name}.{PARAMETER_PUBLISH_ON_ROS}"].value,
Expand All @@ -116,13 +111,13 @@ def topic_info_from_parameters(self, config: str) -> dict[str, TopicInfo]:

return topic_infos

def make_ros_callback(self, topic_info: TopicInfo[MsgLikeT]) -> Callable[[MsgLikeT], None]:
def make_ros_callback(self, topic_info: TopicInfoMsg[MsgLikeT]) -> Callable[[MsgLikeT], None]:
"""
Create a callback function which re-publishes messages on the same topic in MQTT.
Parameters
----------
topic_info : TopicInfo
topic_info : TopicInfoMsg
information about the topic that the callback will publish on
"""
Expand Down
16 changes: 13 additions & 3 deletions mqtt_ros_bridge/util.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
from importlib import import_module
from typing import Optional
from typing import Optional, cast

import yaml
from rcl_interfaces.msg import Parameter as ParameterMsg
from rcl_interfaces.msg import ParameterType, ParameterValue
from rclpy.parameter import PARAMETER_SEPARATOR_STRING, Parameter

from mqtt_ros_bridge.msg_typing import MsgLike

def lookup_object(object_path: str, package: str = 'mqtt_ros_bridge') -> object:

def lookup_message(object_path: str, package: str = 'mqtt_ros_bridge') -> MsgLike:
""" lookup message from a some.module:object_name specification. """

return cast(MsgLike, _lookup_object(object_path, package))


def _lookup_object(object_path: str, package: str = 'mqtt_ros_bridge') -> object:
""" lookup object from a some.module:object_name specification. """
module_name, obj_name = object_path.split(":")
module = import_module(module_name, package)
Expand Down Expand Up @@ -154,7 +162,9 @@ def parameter_dict_from_yaml_file(
return new_dictionary


def _unpack_parameter_dict(namespace, parameter_dict):
def _unpack_parameter_dict(namespace: str,
parameter_dict: dict[str, ParameterMsg]
) -> dict[str, ParameterMsg]:
"""
Flatten a parameter dictionary recursively.
Expand Down

0 comments on commit 332fe07

Please sign in to comment.