Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions launch_testing/launch_testing_examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ launch_test launch_testing_examples/check_node_launch_test.py
```

There might be situations where nodes, once launched, take some time to actually start and we need to wait for the node to start to perform some action.
We can simulate this using ``launch.actions.TimerAction``. This example shows one way to detect when a node has been launched.
We delay the launch by 5 seconds, and wait for the node to start with a timeout of 8 seconds.
We can simulate this using ``launch.actions.TimerAction``.
This example shows one way to detect when a node has been launched.
We delay the launch by 5 seconds, and wait for the node to start with a timeout of 20 seconds.

### `check_multiple_nodes_launch_test.py`

Expand Down Expand Up @@ -64,7 +65,7 @@ Usage:
launch_test launch_testing_examples/hello_world_launch_test.py
```

This test is a simple example on how to use the ``launch_testing``.
This test is a simple example on how to use the ``launch_testing``.

It launches a process and asserts that it prints "hello_world" to ``stdout`` using ``proc_output.assertWaitFor()``.
Finally, it checks if the process exits normally (zero exit code).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ def generate_test_description():
class TestFixture(unittest.TestCase):

def test_check_if_msgs_published(self):
with WaitForTopics([('chatter', String)], timeout=5.0):
with WaitForTopics([('chatter', String)], timeout=15.0):
print('Topic received messages !')
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,18 @@ def generate_test_description():

class TestFixture(unittest.TestCase):

def test_node_start(self, proc_output):
def setUp(self):
rclpy.init()
node = Node('test_node')
assert wait_for_node(node, 'demo_node_1', 8.0), 'Node not found !'
rclpy.shutdown()

self.node = Node('test_node')

def wait_for_node(dummy_node, node_name, timeout=8.0):
start = time.time()
flag = False
print('Waiting for node...')
while time.time() - start < timeout and not flag:
flag = node_name in dummy_node.get_node_names()
time.sleep(0.1)
def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()

return flag
def test_node_start(self, proc_output):
start = time.time()
found = False
while time.time() - start < 20.0 and not found:
found = 'demo_node_1' in self.node.get_node_names()
time.sleep(0.1)
assert found, 'Node not found !'
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,26 @@ def generate_test_description():
if os.name != 'nt':
class TestFixture(unittest.TestCase):

def test_set_parameter(self, proc_output):
def setUp(self):
rclpy.init()
node = Node('test_node')
response = set_parameter(node, value=True)
assert response.successful, 'Could not set parameter!'
rclpy.shutdown()
self.node = Node('test_node')

def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()

def set_parameter(dummy_node, value=True, timeout=5.0):
parameters = [rclpy.Parameter('demo_parameter_1', value=value).to_parameter_msg()]
def test_set_parameter(self, proc_output):
parameters = [rclpy.Parameter('demo_parameter_1', value=True).to_parameter_msg()]

client = dummy_node.create_client(SetParameters, 'demo_node_1/set_parameters')
ready = client.wait_for_service(timeout_sec=timeout)
if not ready:
raise RuntimeError('Wait for service timed out')
client = self.node.create_client(SetParameters, 'demo_node_1/set_parameters')
ready = client.wait_for_service(timeout_sec=15.0)
if not ready:
raise RuntimeError('Wait for service timed out')

request = SetParameters.Request()
request.parameters = parameters
future = client.call_async(request)
rclpy.spin_until_future_complete(dummy_node, future, timeout_sec=timeout)
request = SetParameters.Request()
request.parameters = parameters
future = client.call_async(request)
rclpy.spin_until_future_complete(self.node, future, timeout_sec=15.0)

response = future.result()
return response.results[0]
response = future.result()
assert response.results[0].successful, 'Could not set parameter!'
1 change: 0 additions & 1 deletion launch_testing/launch_testing_examples/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
<exec_depend>rclpy</exec_depend>
<exec_depend>rcl_interfaces</exec_depend>
<exec_depend>ros2bag</exec_depend>
<exec_depend>rosbag2_transport</exec_depend>
<exec_depend>std_msgs</exec_depend>

<test_depend>ament_copyright</test_depend>
Expand Down