Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ repos:

# Python hooks
- repo: https://github.com/asottile/pyupgrade
rev: v3.20.0
rev: v3.21.0
hooks:
- id: pyupgrade
args: [--py36-plus]
Expand Down Expand Up @@ -133,7 +133,7 @@ repos:
exclude: CHANGELOG\.rst|\.(svg|pyc|drawio)$

- repo: https://github.com/python-jsonschema/check-jsonschema
rev: 0.34.0
rev: 0.34.1
hooks:
- id: check-github-workflows
args: ["--verbose"]
Expand Down
64 changes: 63 additions & 1 deletion controller_manager/test/test_ros2_control_node_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,18 @@
from launch_testing.actions import ReadyToTest
import launch_testing.markers
import launch_ros.actions
from launch_ros.actions import Node

import rclpy
from controller_manager.test_utils import (
check_controllers_running,
check_node_running,
)
from controller_manager.launch_utils import generate_controllers_spawner_launch_description
from controller_manager.launch_utils import (
generate_controllers_spawner_launch_description,
generate_controllers_spawner_launch_description_from_dict,
generate_load_controller_launch_description,
)


# Executes the given launch file and checks if all nodes can be started
Expand Down Expand Up @@ -102,6 +107,63 @@ def setUp(self):
def tearDown(self):
self.node.destroy_node()

# ------------------------------------------------------------------
# Helper methods
# ------------------------------------------------------------------
def _extract_actions(self, result):
"""Return a list of launch actions, regardless of type."""
if isinstance(result, list):
return result
elif hasattr(result, "entities"): # LaunchDescription in newer ROS2
return list(result.entities)
else:
return [result]

def _assert_launch_result(self, result, min_len=0):
"""Verify that result is list- or LaunchDescription-like."""
self.assertTrue(
isinstance(result, (list, LaunchDescription)),
f"Unexpected result type: {type(result)}",
)
actions = self._extract_actions(result)
self.assertGreaterEqual(len(actions), min_len)
for act in actions:
self.assertTrue(
hasattr(act, "execute") or hasattr(act, "visit"),
f"Invalid action type: {type(act)}",
)
return actions

# ------------------------------------------------------------------
# Launch utility tests
# ------------------------------------------------------------------
def test_generate_controllers_spawner_from_list(self):
controllers = ["test_controller_1", "test_controller_2"]
result = generate_controllers_spawner_launch_description(controllers)
actions = self._assert_launch_result(result, min_len=1)
self.assertTrue(actions is not None and len(actions) > 0)

def test_generate_controllers_spawner_from_dict(self):
"""Ensure dict-based API works with string param files (old-style)."""
controllers = {
"ctrl_A": ["/tmp/dummy.yaml"],
"ctrl_B": ["/tmp/dummy.yaml"],
}
result = generate_controllers_spawner_launch_description_from_dict(controllers)
actions = self._extract_actions(result)
self.assertIsInstance(result, LaunchDescription)
self.assertEqual(len(actions), 3)
self.assertIsInstance(actions[-1], Node)

def test_generate_load_controller_launch_description(self):
"""Test load controller description with valid single string params."""
controllers = ["test_controller_load"]
result = generate_load_controller_launch_description(controllers)
actions = self._extract_actions(result)
self.assertIsInstance(result, LaunchDescription)
self.assertEqual(len(actions), 3)
self.assertIsInstance(actions[-1], Node)

def test_node_start(self):
check_node_running(self.node, "controller_manager")

Expand Down