Skip to content

Commit 5547f87

Browse files
committed
add test_verb_set
Signed-off-by: Brian Chen <[email protected]>
1 parent aeb03ed commit 5547f87

File tree

1 file changed

+226
-0
lines changed

1 file changed

+226
-0
lines changed

ros2param/test/test_verb_set.py

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
# Copyright 2022 Open Source Robotics Foundation, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import contextlib
16+
import os
17+
import sys
18+
import time
19+
import unittest
20+
import xmlrpc
21+
22+
from launch import LaunchDescription
23+
from launch.actions import ExecuteProcess
24+
from launch_ros.actions import Node
25+
import launch_testing
26+
import launch_testing.actions
27+
import launch_testing.asserts
28+
import launch_testing.markers
29+
import launch_testing.tools
30+
import launch_testing_ros.tools
31+
32+
import pytest
33+
34+
import rclpy
35+
from rclpy.utilities import get_available_rmw_implementations
36+
37+
from ros2cli.node.strategy import NodeStrategy
38+
39+
40+
TEST_NODE = 'test_node'
41+
TEST_NAMESPACE = '/foo'
42+
TEST_TIMEOUT = 20.0
43+
44+
# Skip cli tests on Windows while they exhibit pathological behavior
45+
# https://github.com/ros2/build_farmer/issues/248
46+
if sys.platform.startswith('win'):
47+
pytest.skip(
48+
'CLI tests can block for a pathological amount of time on Windows.',
49+
allow_module_level=True)
50+
51+
52+
@pytest.mark.rostest
53+
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
54+
def generate_test_description(rmw_implementation):
55+
path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures')
56+
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
57+
58+
# Parameter node test fixture
59+
path_to_parameter_node_script = os.path.join(path_to_fixtures, 'parameter_node.py')
60+
parameter_node = Node(
61+
executable=sys.executable,
62+
name=TEST_NODE,
63+
namespace=TEST_NAMESPACE,
64+
arguments=[path_to_parameter_node_script],
65+
additional_env=additional_env
66+
)
67+
68+
return LaunchDescription([
69+
# TODO(jacobperron): Provide a common RestartCliDaemon launch action in ros2cli
70+
ExecuteProcess(
71+
cmd=['ros2', 'daemon', 'stop'],
72+
name='daemon-stop',
73+
on_exit=[
74+
ExecuteProcess(
75+
cmd=['ros2', 'daemon', 'start'],
76+
name='daemon-start',
77+
on_exit=[
78+
parameter_node,
79+
launch_testing.actions.ReadyToTest(),
80+
],
81+
additional_env=additional_env
82+
)
83+
]
84+
),
85+
])
86+
87+
88+
class TestVerbSet(unittest.TestCase):
89+
90+
@classmethod
91+
def setUpClass(
92+
cls,
93+
launch_service,
94+
proc_info,
95+
proc_output,
96+
rmw_implementation
97+
):
98+
rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter(
99+
filtered_rmw_implementation=rmw_implementation
100+
)
101+
102+
@contextlib.contextmanager
103+
def launch_param_set_command(self, arguments):
104+
param_set_command_action = ExecuteProcess(
105+
cmd=['ros2', 'param', 'set', *arguments],
106+
additional_env={
107+
'RMW_IMPLEMENTATION': rmw_implementation,
108+
},
109+
name='ros2param-set-cli',
110+
output='screen'
111+
)
112+
with launch_testing.tools.launch_process(
113+
launch_service, param_set_command_action, proc_info, proc_output,
114+
output_filter=rmw_implementation_filter
115+
) as param_set_command:
116+
yield param_set_command
117+
cls.launch_param_set_command = launch_param_set_command
118+
119+
@contextlib.contextmanager
120+
def launch_param_dump_command(self, arguments):
121+
param_dump_command_action = ExecuteProcess(
122+
cmd=['ros2', 'param', 'dump', *arguments],
123+
additional_env={
124+
'RMW_IMPLEMENTATION': rmw_implementation,
125+
},
126+
name='ros2param-dump-cli',
127+
output='screen'
128+
)
129+
with launch_testing.tools.launch_process(
130+
launch_service, param_dump_command_action, proc_info, proc_output,
131+
output_filter=rmw_implementation_filter
132+
) as param_dump_command:
133+
yield param_dump_command
134+
cls.launch_param_dump_command = launch_param_dump_command
135+
136+
def setUp(self):
137+
# Ensure the daemon node is running and discovers the test node
138+
start_time = time.time()
139+
timed_out = True
140+
with NodeStrategy(None) as node:
141+
while (time.time() - start_time) < TEST_TIMEOUT:
142+
# TODO(jacobperron): Create a generic 'CliNodeError' so we can treat errors
143+
# from DirectNode and DaemonNode the same
144+
try:
145+
services = node.get_service_names_and_types_by_node(TEST_NODE, TEST_NAMESPACE)
146+
except rclpy.node.NodeNameNonExistentError:
147+
continue
148+
except xmlrpc.client.Fault as e:
149+
if 'NodeNameNonExistentError' in e.faultString:
150+
continue
151+
raise
152+
153+
service_names = [name_type_tuple[0] for name_type_tuple in services]
154+
if (
155+
len(service_names) > 0
156+
and f'{TEST_NAMESPACE}/{TEST_NODE}/get_parameters' in service_names
157+
):
158+
timed_out = False
159+
break
160+
if timed_out:
161+
self.fail(f'CLI daemon failed to find test node after {TEST_TIMEOUT} seconds')
162+
163+
def test_verb_set_single(self):
164+
with self.launch_param_set_command(
165+
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', 'int_param', '1']
166+
) as param_load_command:
167+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
168+
assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK
169+
assert launch_testing.tools.expect_output(
170+
expected_lines=['Set parameter int_param successful'],
171+
text=param_load_command.output,
172+
strict=True)
173+
174+
def test_verb_set_multiple(self):
175+
with self.launch_param_set_command(
176+
arguments=[
177+
f'{TEST_NAMESPACE}/{TEST_NODE}',
178+
'int_param', '1',
179+
'str_param', 'hello world',
180+
'bool_param', 'True',
181+
]
182+
) as param_load_command:
183+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
184+
assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK
185+
assert launch_testing.tools.expect_output(
186+
expected_lines=[
187+
'Set parameter int_param successful',
188+
'Set parameter str_param successful',
189+
'Set parameter bool_param successful'],
190+
text=param_load_command.output,
191+
strict=True)
192+
193+
def test_verb_set_odd_params(self):
194+
with self.launch_param_set_command(
195+
arguments=[
196+
f'{TEST_NAMESPACE}/{TEST_NODE}',
197+
'int_param', '1',
198+
'str_param', 'hello world',
199+
'bool_param',
200+
]
201+
) as param_load_command:
202+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
203+
assert param_load_command.exit_code == 2
204+
assert launch_testing.tools.expect_output(
205+
expected_lines=['ros2 param set: error: Must provide parameter name and value pairs'],
206+
text=param_load_command.output,
207+
strict=False
208+
)
209+
210+
def test_verb_set_invalid_node(self):
211+
with self.launch_param_set_command(
212+
arguments=[
213+
f'{TEST_NAMESPACE}/{TEST_NODE}_invalid',
214+
'int_param', '123',
215+
]
216+
) as param_load_command:
217+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
218+
assert param_load_command.exit_code == 1
219+
assert launch_testing.tools.expect_output(
220+
expected_lines=['Node not found'],
221+
text=param_load_command.output,
222+
strict=True)
223+
224+
225+
if __name__ == '__main__':
226+
unittest.main()

0 commit comments

Comments
 (0)