1
1
from importlib import import_module
2
- from typing import Optional , cast
3
-
4
- import yaml
5
- from rcl_interfaces .msg import Parameter as ParameterMsg
6
- from rcl_interfaces .msg import ParameterType , ParameterValue
7
- from rclpy .parameter import PARAMETER_SEPARATOR_STRING , Parameter
2
+ from typing import cast
8
3
9
4
from mqtt_ros_bridge .msg_typing import MsgLike
10
5
@@ -20,168 +15,3 @@ def _lookup_object(object_path: str, package: str = 'mqtt_ros_bridge') -> object
20
15
module = import_module (module_name , package )
21
16
obj = getattr (module , obj_name )
22
17
return obj
23
-
24
-
25
- # Kind of exist in ros-iron??? very confusing
26
- def get_parameter_value (string_value : str ) -> ParameterValue :
27
- """
28
- Guess the desired type of the parameter based on the string value.
29
-
30
- :param string_value: The string value to be converted to a ParameterValue.
31
- :return: The ParameterValue.
32
- """
33
- value = ParameterValue ()
34
- try :
35
- yaml_value = yaml .safe_load (string_value )
36
- except yaml .parser .ParserError :
37
- yaml_value = string_value
38
-
39
- if isinstance (yaml_value , bool ):
40
- value .type = ParameterType .PARAMETER_BOOL
41
- value .bool_value = yaml_value
42
- elif isinstance (yaml_value , int ):
43
- value .type = ParameterType .PARAMETER_INTEGER
44
- value .integer_value = yaml_value
45
- elif isinstance (yaml_value , float ):
46
- value .type = ParameterType .PARAMETER_DOUBLE
47
- value .double_value = yaml_value
48
- elif isinstance (yaml_value , list ):
49
- if all ((isinstance (v , bool ) for v in yaml_value )):
50
- value .type = ParameterType .PARAMETER_BOOL_ARRAY
51
- value .bool_array_value = yaml_value
52
- elif all ((isinstance (v , int ) for v in yaml_value )):
53
- value .type = ParameterType .PARAMETER_INTEGER_ARRAY
54
- value .integer_array_value = yaml_value
55
- elif all ((isinstance (v , float ) for v in yaml_value )):
56
- value .type = ParameterType .PARAMETER_DOUBLE_ARRAY
57
- value .double_array_value = yaml_value
58
- elif all ((isinstance (v , str ) for v in yaml_value )):
59
- value .type = ParameterType .PARAMETER_STRING_ARRAY
60
- value .string_array_value = yaml_value
61
- else :
62
- value .type = ParameterType .PARAMETER_STRING
63
- value .string_value = string_value
64
- else :
65
- value .type = ParameterType .PARAMETER_STRING
66
- value .string_value = yaml_value if yaml_value is not None else string_value
67
- return value
68
-
69
-
70
- def parameter_value_to_python (parameter_value : ParameterValue ):
71
- """
72
- Get the value for the Python builtin type from a rcl_interfaces/msg/ParameterValue object.
73
-
74
- Returns the value member of the message based on the ``type`` member.
75
- Returns ``None`` if the parameter is "NOT_SET".
76
-
77
- :param parameter_value: The message to get the value from.
78
- :raises RuntimeError: if the member ``type`` has an unexpected value.
79
- """
80
- if parameter_value .type == ParameterType .PARAMETER_BOOL :
81
- value = parameter_value .bool_value
82
- elif parameter_value .type == ParameterType .PARAMETER_INTEGER :
83
- value = parameter_value .integer_value
84
- elif parameter_value .type == ParameterType .PARAMETER_DOUBLE :
85
- value = parameter_value .double_value
86
- elif parameter_value .type == ParameterType .PARAMETER_STRING :
87
- value = parameter_value .string_value
88
- elif parameter_value .type == ParameterType .PARAMETER_BYTE_ARRAY :
89
- value = list (parameter_value .byte_array_value )
90
- elif parameter_value .type == ParameterType .PARAMETER_BOOL_ARRAY :
91
- value = list (parameter_value .bool_array_value )
92
- elif parameter_value .type == ParameterType .PARAMETER_INTEGER_ARRAY :
93
- value = list (parameter_value .integer_array_value )
94
- elif parameter_value .type == ParameterType .PARAMETER_DOUBLE_ARRAY :
95
- value = list (parameter_value .double_array_value )
96
- elif parameter_value .type == ParameterType .PARAMETER_STRING_ARRAY :
97
- value = list (parameter_value .string_array_value )
98
- elif parameter_value .type == ParameterType .PARAMETER_NOT_SET :
99
- value = None
100
- else :
101
- raise RuntimeError (f'unexpected parameter type { parameter_value .type } ' )
102
-
103
- return value
104
-
105
-
106
- def parameter_dict_from_yaml_file (
107
- parameter_file : str ,
108
- use_wildcard : bool = False ,
109
- target_nodes : Optional [list [str ]] = None ,
110
- namespace : str = ''
111
- ) -> dict [str , Parameter ]:
112
- """
113
- Build a dict of parameters from a YAML file.
114
-
115
- Will load all parameters if ``target_nodes`` is None or empty.
116
-
117
- :raises RuntimeError: if a target node is not in the file
118
- :raises RuntimeError: if the is not a valid ROS parameter file
119
-
120
- :param parameter_file: Path to the YAML file to load parameters from.
121
- :param use_wildcard: Use wildcard matching for the target nodes.
122
- :param target_nodes: list of nodes in the YAML file to load parameters from.
123
- :param namespace: Namespace to prepend to all parameters.
124
- :return: A dict of Parameter messages keyed by the parameter names
125
- """
126
- with open (parameter_file , 'r' ) as f :
127
- param_file = yaml .safe_load (f )
128
- param_keys = []
129
- param_dict = {}
130
-
131
- if use_wildcard and '/**' in param_file :
132
- param_keys .append ('/**' )
133
-
134
- if target_nodes :
135
- for n in target_nodes :
136
- if n not in param_file .keys ():
137
- raise RuntimeError (f'Param file does not contain parameters for { n } ,'
138
- f'only for nodes: { list (param_file .keys ())} ' )
139
- param_keys .append (n )
140
- else :
141
- # wildcard key must go to the front of param_keys so that
142
- # node-namespaced parameters will override the wildcard parameters
143
- keys = set (param_file .keys ())
144
- keys .discard ('/**' )
145
- param_keys .extend (keys )
146
-
147
- if len (param_keys ) == 0 :
148
- raise RuntimeError ('Param file does not contain selected parameters' )
149
-
150
- for n in param_keys :
151
- value = param_file [n ]
152
- if not isinstance (value , dict ) or 'ros__parameters' not in value :
153
- raise RuntimeError (f'YAML file is not a valid ROS parameter file for node { n } ' )
154
- param_dict .update (value ['ros__parameters' ])
155
- # Modification done here
156
- new_dictionary : dict [str , Parameter ] = {}
157
- dictionary = _unpack_parameter_dict (namespace , param_dict )
158
-
159
- for key , parameter_msg in dictionary .items ():
160
- new_dictionary [key ] = Parameter .from_parameter_msg (parameter_msg )
161
- return new_dictionary
162
-
163
-
164
- def _unpack_parameter_dict (namespace : str ,
165
- parameter_dict : dict [str , ParameterMsg ]
166
- ) -> dict [str , ParameterMsg ]:
167
- """
168
- Flatten a parameter dictionary recursively.
169
-
170
- :param namespace: The namespace to prepend to the parameter names.
171
- :param parameter_dict: A dictionary of parameters keyed by the parameter names
172
- :return: A dict of Parameter objects keyed by the parameter names
173
- """
174
- parameters : dict [str , ParameterMsg ] = {}
175
- for param_name , param_value in parameter_dict .items ():
176
- full_param_name = namespace + param_name
177
- # Unroll nested parameters
178
- if isinstance (param_value , dict ):
179
- parameters .update (_unpack_parameter_dict (
180
- namespace = full_param_name + PARAMETER_SEPARATOR_STRING ,
181
- parameter_dict = param_value ))
182
- else :
183
- parameter = ParameterMsg ()
184
- parameter .name = full_param_name
185
- parameter .value = get_parameter_value (str (param_value ))
186
- parameters [full_param_name ] = parameter
187
- return parameters
0 commit comments