From e7aa06e8bfa7b502dd3500174bc033ae7b2b260d Mon Sep 17 00:00:00 2001 From: Robert Kwan Date: Sun, 26 Oct 2025 22:04:37 -0700 Subject: [PATCH 1/5] Merge back from jazzy branch - add tests for launch_utils of controller manager (#2147) --- .../test/test_ros2_control_node_launch.py | 64 ++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/controller_manager/test/test_ros2_control_node_launch.py b/controller_manager/test/test_ros2_control_node_launch.py index 9c9f8892ed..cca3c7ad8d 100644 --- a/controller_manager/test/test_ros2_control_node_launch.py +++ b/controller_manager/test/test_ros2_control_node_launch.py @@ -43,8 +43,11 @@ check_controllers_running, check_node_running, ) -from controller_manager.launch_utils import generate_controllers_spawner_launch_description - +from controller_manager.launch_utils import ( + generate_controllers_spawner_launch_description, + generate_controllers_spawner_launch_description_from_dict, + generate_load_controller_launch_description, +) # Executes the given launch file and checks if all nodes can be started @pytest.mark.launch_test @@ -102,6 +105,63 @@ def setUp(self): def tearDown(self): self.node.destroy_node() + # ------------------------------------------------------------------ + # Helper methods + # ------------------------------------------------------------------ + def _extract_actions(self, result): + """Return a list of launch actions, regardless of type.""" + if isinstance(result, list): + return result + elif hasattr(result, "entities"): # LaunchDescription in newer ROS2 + return list(result.entities) + else: + return [result] + + def _assert_launch_result(self, result, min_len=0): + """Verify that result is list- or LaunchDescription-like.""" + self.assertTrue( + isinstance(result, (list, LaunchDescription)), + f"Unexpected result type: {type(result)}", + ) + actions = self._extract_actions(result) + self.assertGreaterEqual(len(actions), min_len) + for act in actions: + self.assertTrue( + hasattr(act, "execute") or hasattr(act, "visit"), + f"Invalid action type: {type(act)}", + ) + return actions + + # ------------------------------------------------------------------ + # Launch utility tests + # ------------------------------------------------------------------ + def test_generate_controllers_spawner_from_list(self): + controllers = ["test_controller_1", "test_controller_2"] + result = generate_controllers_spawner_launch_description(controllers) + actions = self._assert_launch_result(result, min_len=1) + self.assertTrue(actions is not None and len(actions) > 0) + + def test_generate_controllers_spawner_from_dict(self): + """Ensure dict-based API works with string param files (old-style).""" + controllers = { + "ctrl_A": ["/tmp/dummy.yaml"], + "ctrl_B": ["/tmp/dummy.yaml"], + } + result = generate_controllers_spawner_launch_description_from_dict(controllers) + actions = self._extract_actions(result) + self.assertIsInstance(result, LaunchDescription) + self.assertEqual(len(actions), 3) + self.assertIsInstance(actions[-1], Node) + + def test_generate_load_controller_launch_description(self): + """Test load controller description with valid single string params.""" + controllers = ["test_controller_load"] + result = generate_load_controller_launch_description(controllers) + actions = self._extract_actions(result) + self.assertIsInstance(result, LaunchDescription) + self.assertEqual(len(actions), 3) + self.assertIsInstance(actions[-1], Node) + def test_node_start(self): check_node_running(self.node, "controller_manager") From 8e7d6bc828095c3b8e24076400c3dec4ed535276 Mon Sep 17 00:00:00 2001 From: Robert Kwan Date: Mon, 27 Oct 2025 22:27:48 -0700 Subject: [PATCH 2/5] Fix missing line to Add tests for launch_utils of controller manager (#2147) --- controller_manager/test/test_ros2_control_node_launch.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/controller_manager/test/test_ros2_control_node_launch.py b/controller_manager/test/test_ros2_control_node_launch.py index cca3c7ad8d..46a4f813c0 100644 --- a/controller_manager/test/test_ros2_control_node_launch.py +++ b/controller_manager/test/test_ros2_control_node_launch.py @@ -37,6 +37,7 @@ from launch_testing.actions import ReadyToTest import launch_testing.markers import launch_ros.actions +from launch_ros.actions import Node import rclpy from controller_manager.test_utils import ( @@ -49,6 +50,7 @@ generate_load_controller_launch_description, ) + # Executes the given launch file and checks if all nodes can be started @pytest.mark.launch_test def generate_test_description(): @@ -133,7 +135,7 @@ def _assert_launch_result(self, result, min_len=0): return actions # ------------------------------------------------------------------ - # Launch utility tests + # Launch utility tests # ------------------------------------------------------------------ def test_generate_controllers_spawner_from_list(self): controllers = ["test_controller_1", "test_controller_2"] From 05e3d32fe24ddc87e9cfa1e6d6f74e48bd6a3cb0 Mon Sep 17 00:00:00 2001 From: Robert Kwan Date: Mon, 27 Oct 2025 22:27:48 -0700 Subject: [PATCH 3/5] Fix missing line to Add tests for launch_utils of controller manager (#2147) --- controller_manager/test/test_ros2_control_node_launch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/controller_manager/test/test_ros2_control_node_launch.py b/controller_manager/test/test_ros2_control_node_launch.py index 46a4f813c0..bcc1c04a33 100644 --- a/controller_manager/test/test_ros2_control_node_launch.py +++ b/controller_manager/test/test_ros2_control_node_launch.py @@ -51,6 +51,7 @@ ) + # Executes the given launch file and checks if all nodes can be started @pytest.mark.launch_test def generate_test_description(): From 52bcf35abd30989518296c03fb6c22b403ac4783 Mon Sep 17 00:00:00 2001 From: Robert Kwan Date: Sun, 9 Nov 2025 01:41:10 -0800 Subject: [PATCH 4/5] Add more tests for launch_utils of controller manager (#2147) --- controller_manager/CMakeLists.txt | 24 ++ controller_manager/test/test_controller3.yaml | 8 + .../test/test_controller_load.yaml | 8 + .../test/test_joint_state_broadcast.yaml | 7 + .../test_launch_utils_integration_dict.py | 240 ++++++++++++++++++ .../test_launch_utils_integration_list.py | 232 +++++++++++++++++ .../test_launch_utils_integration_load.py | 167 ++++++++++++ .../test/test_launch_utils_unit.py | 97 +++++++ .../test/test_ros2_control_node_combined.yaml | 22 ++ 9 files changed, 805 insertions(+) create mode 100644 controller_manager/test/test_controller3.yaml create mode 100644 controller_manager/test/test_controller_load.yaml create mode 100644 controller_manager/test/test_joint_state_broadcast.yaml create mode 100644 controller_manager/test/test_launch_utils_integration_dict.py create mode 100644 controller_manager/test/test_launch_utils_integration_list.py create mode 100644 controller_manager/test/test_launch_utils_integration_load.py create mode 100644 controller_manager/test/test_launch_utils_unit.py create mode 100644 controller_manager/test/test_ros2_control_node_combined.yaml diff --git a/controller_manager/CMakeLists.txt b/controller_manager/CMakeLists.txt index 1bd47cd923..71039885c6 100644 --- a/controller_manager/CMakeLists.txt +++ b/controller_manager/CMakeLists.txt @@ -244,10 +244,34 @@ if(BUILD_TESTING) ) find_package(ament_cmake_pytest REQUIRED) + find_package(launch_testing_ament_cmake REQUIRED) + find_package(rclpy REQUIRED) install(FILES test/test_ros2_control_node.yaml + test/test_controller_load.yaml + test/test_ros2_control_node_combined.yaml DESTINATION test) ament_add_pytest_test(test_ros2_control_node test/test_ros2_control_node_launch.py) ament_add_pytest_test(test_test_utils test/test_test_utils.py) + ament_add_pytest_test(test_launch_utils_unit test/test_launch_utils_unit.py + APPEND_ENV AMENT_PREFIX_PATH=${ament_index_build_path} + PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_CURRENT_SOURCE_DIR}:${PYTHONPATH} + ) + # ---------------------------------------------------------------------------- + # Integration tests for individual launch_utils entry points + # ---------------------------------------------------------------------------- + ament_add_pytest_test(test_launch_utils_integration_list + test/test_launch_utils_integration_list.py + TIMEOUT 60 + ) + ament_add_pytest_test(test_launch_utils_integration_dict + test/test_launch_utils_integration_dict.py + TIMEOUT 60 + ) + ament_add_pytest_test(test_launch_utils_integration_load + test/test_launch_utils_integration_load.py + TIMEOUT 60 + ) + endif() install( diff --git a/controller_manager/test/test_controller3.yaml b/controller_manager/test/test_controller3.yaml new file mode 100644 index 0000000000..4cb904882f --- /dev/null +++ b/controller_manager/test/test_controller3.yaml @@ -0,0 +1,8 @@ +controller_manager: + ros__parameters: + update_rate: 100 # Hz + +controller3: + ros__parameters: + type: "controller_manager/test_controller" + joint_names: ["joint3"] diff --git a/controller_manager/test/test_controller_load.yaml b/controller_manager/test/test_controller_load.yaml new file mode 100644 index 0000000000..ae5745e888 --- /dev/null +++ b/controller_manager/test/test_controller_load.yaml @@ -0,0 +1,8 @@ +controller_manager: + ros__parameters: + update_rate: 100 # Hz + +test_controller_load: + ros__parameters: + type: "controller_manager/test_controller" + joint_names: ["joint1"] diff --git a/controller_manager/test/test_joint_state_broadcast.yaml b/controller_manager/test/test_joint_state_broadcast.yaml new file mode 100644 index 0000000000..9001187574 --- /dev/null +++ b/controller_manager/test/test_joint_state_broadcast.yaml @@ -0,0 +1,7 @@ +controller_manager: + ros__parameters: + update_rate: 100 # Hz + +joint_state_broadcaster: + ros__parameters: + type: "joint_state_broadcaster/JointStateBroadcaster" diff --git a/controller_manager/test/test_launch_utils_integration_dict.py b/controller_manager/test/test_launch_utils_integration_dict.py new file mode 100644 index 0000000000..d16391d675 --- /dev/null +++ b/controller_manager/test/test_launch_utils_integration_dict.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +# Copyright 2025 Robert Kwan +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import unittest +import os +import tempfile +import time + +from ament_index_python.packages import get_package_share_directory, get_package_prefix +from launch import LaunchDescription +import launch_testing +from launch_testing.actions import ReadyToTest +import launch_testing.markers +import launch_ros.actions + +import rclpy + +from controller_manager.launch_utils import ( + generate_controllers_spawner_launch_description_from_dict, +) + + +@pytest.mark.launch_test +def generate_test_description(): + """ + Generate launch description for testing. + + THIS VERSION CREATES ALL NEEDED FILES DYNAMICALLY AND USES THE COMBINED CONFIG. + """ + + # Create temporary directory for all test files + temp_dir = tempfile.mkdtemp() + print(f"Creating test files in: {temp_dir}") + + # Get URDF, without involving xacro + urdf = os.path.join( + get_package_share_directory("ros2_control_test_assets"), + "urdf", + "test_hardware_components.urdf", + ) + with open(urdf) as infp: + robot_description_content = infp.read() + robot_description = {"robot_description": robot_description_content} + + robot_controllers = os.path.join( + get_package_prefix("controller_manager"), "test", "test_ros2_control_node_combined.yaml" + ) + + # Verify both files exist + assert os.path.isfile(robot_controllers), f"Controller config not created: {robot_controllers}" + assert os.path.isfile(urdf), f"URDF not created: {urdf}" + + robot_state_pub_node = launch_ros.actions.Node( + package="robot_state_publisher", + executable="robot_state_publisher", + output="both", + parameters=[robot_description], + ) + + # ===== START CONTROLLER MANAGER (ros2_control_node) ===== + control_node = launch_ros.actions.Node( + package="controller_manager", + executable="ros2_control_node", + parameters=[robot_controllers], # Use the combined config file + output="both", + ) + + # The dictionary keys are the controller names to be spawned/started. + # Values can be empty lists since config is provided via the main YAML. + ctrl_dict = { + "joint_state_broadcaster": [robot_controllers], + "ctrl_with_parameters_and_type": [robot_controllers], + "controller3": [robot_controllers], + } + controller_list = list(ctrl_dict.keys()) + + # ===== GENERATE SPAWNER LAUNCH DESCRIPTION ===== + print(f"Spawning controllers: {controller_list}") + + # Correct function name and call + spawner_ld = generate_controllers_spawner_launch_description_from_dict( + controller_info_dict=ctrl_dict, + ) + + # ===== CREATE LAUNCH DESCRIPTION ===== + ld = LaunchDescription( + [robot_state_pub_node, control_node, ReadyToTest()] + spawner_ld.entities + ) + + # Return tuple with launch description and test context + return ld, { + "controller_list": controller_list, # Key name updated to match the test function + "robot_controllers": str(robot_controllers), + "urdf_file": urdf, + "temp_dir": temp_dir, + } + + +# Active tests +class TestControllerSpawnerList(unittest.TestCase): + """Active tests that run while the launch is active.""" + + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def test_spawner_nodes_launched(self, proc_info): + """Ensure processes are running.""" + process_names = proc_info.process_names() + self.assertGreater(len(process_names), 0) + print("\n[TEST] Active processes: {process_names}") + + def test_controllers_loaded(self, proc_info, controller_list): + """Test that controllers were loaded (poll until they appear).""" + node = rclpy.create_node("test_controller_query_node") + + try: + from controller_manager_msgs.srv import ListControllers + + client = node.create_client(ListControllers, "/controller_manager/list_controllers") + + print("\n[TEST] Waiting for controller_manager service...") + wait_for_svc_timeout = 30.0 + if not client.wait_for_service(timeout_sec=wait_for_svc_timeout): + process_names = proc_info.process_names() + self.fail( + f"Controller manager service not available after {wait_for_svc_timeout}s.\n" + f"Active processes: {process_names}" + ) + + # Poll for controllers to be registered + print("[TEST] Service available, polling for controllers (timeout 30s)...") + deadline = node.get_clock().now() + rclpy.duration.Duration(seconds=30.0) + seen = [] + while node.get_clock().now() < deadline: + req = ListControllers.Request() + fut = client.call_async(req) + rclpy.spin_until_future_complete(node, fut, timeout_sec=2.0) + if fut.done() and fut.result() is not None: + response = fut.result() + seen = [c.name for c in response.controller] + if all(ctrl in seen for ctrl in controller_list): + print(f"[TEST] Loaded controllers: {seen}") + break + # small sleep to avoid tight-loop + time.sleep(0.2) + else: + # timeout expired + self.fail( + f"Timeout waiting for controllers to be loaded. " + f"Expected: {controller_list}, saw: {seen}" + ) + + # Final assert (defensive) + for controller in controller_list: + self.assertIn( + controller, + seen, + f"Controller '{controller}' was not loaded. Available: {seen}", + ) + + print(f"[TEST] ? All {len(controller_list)} controllers loaded successfully") + + finally: + node.destroy_node() + + def test_spawner_exit_code(self, proc_info): + """Test that spawner process ran (may have completed already).""" + process_names = proc_info.process_names() + print(f"\n[TEST] Checking for spawner in: {process_names}") + + # The spawner may have already completed successfully and exited + # So we just verify that we have processes running + self.assertGreater(len(process_names), 0) + print(f"[TEST] ? Launch has {len(process_names)} active processes") + + +@launch_testing.post_shutdown_test() +class TestProcessOutput(unittest.TestCase): + """Post-shutdown tests.""" + + def test_exit_codes(self, proc_info): + """Verify all processes exited successfully.""" + print("\n[POST-SHUTDOWN] Process exit codes:") + for process_name in proc_info.process_names(): + info = proc_info[process_name] + print(f" {process_name}: {info.returncode}") + + for process_name in proc_info.process_names(): + info = proc_info[process_name] + + if "ros2_control_node" in process_name: + self.assertEqual( + info.returncode, 0, f"{process_name} exited with {info.returncode}" + ) + elif "spawner" in process_name: + # Spawner should complete successfully (0) or be terminated + self.assertIn( + info.returncode, + [0, -2, -15], + f"Spawner {process_name} exited with {info.returncode}", + ) + else: + self.assertIn( + info.returncode, [0, -2, -15], f"{process_name} exited with {info.returncode}" + ) + + print("[POST-SHUTDOWN] ? All processes exited as expected") + + def test_cleanup_temp_files(self, temp_dir, robot_controllers, urdf_file): + """Clean up temporary test files.""" + import shutil + + print(f"\n[CLEANUP] Removing temporary directory: {temp_dir}") + + # The original clean-up logic was commented out, enabling it for safety + try: + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + + print("[CLEANUP] ? Temporary files removed") + except Exception as e: + print(f"[CLEANUP] Warning: Cleanup failed: {e}") diff --git a/controller_manager/test/test_launch_utils_integration_list.py b/controller_manager/test/test_launch_utils_integration_list.py new file mode 100644 index 0000000000..ab49a17ef1 --- /dev/null +++ b/controller_manager/test/test_launch_utils_integration_list.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python3 +# Copyright 2025 Robert Kwan +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import unittest +import os +import time +import tempfile + +from ament_index_python.packages import get_package_share_directory, get_package_prefix +from launch import LaunchDescription +import launch_testing +from launch_testing.actions import ReadyToTest +import launch_testing.markers +import launch_ros.actions + +import rclpy + +from controller_manager.launch_utils import ( + generate_controllers_spawner_launch_description, +) + + +@pytest.mark.launch_test +def generate_test_description(): + """ + Generate launch description for testing. + """ + + # Create temporary directory for all test files + temp_dir = tempfile.mkdtemp() + print(f"Creating test files in: {temp_dir}") + + # Get URDF, without involving xacro + urdf = os.path.join( + get_package_share_directory("ros2_control_test_assets"), + "urdf", + "test_hardware_components.urdf", + ) + with open(urdf) as infp: + robot_description_content = infp.read() + robot_description = {"robot_description": robot_description_content} + + robot_controllers = os.path.join( + get_package_prefix("controller_manager"), "test", "test_ros2_control_node_combined.yaml" + ) + + # Verify both files exist + assert os.path.isfile(robot_controllers), f"Controller config not created: {robot_controllers}" + assert os.path.isfile(urdf), f"URDF not created: {urdf}" + + robot_state_pub_node = launch_ros.actions.Node( + package="robot_state_publisher", + executable="robot_state_publisher", + output="both", + parameters=[robot_description], + ) + + # ===== START CONTROLLER MANAGER ===== + control_node = launch_ros.actions.Node( + package="controller_manager", + executable="ros2_control_node", + parameters=[robot_controllers], + output="both", + ) + + # ===== DEFINE CONTROLLERS TO SPAWN ===== + controller_list = ["joint_state_broadcaster", "ctrl_with_parameters_and_type", "controller3"] + + # ===== GENERATE SPAWNER ===== + print(f"Spawning controllers: {controller_list}") + print(f"Using config file: {robot_controllers}") + + spawner_ld = generate_controllers_spawner_launch_description( + controller_names=controller_list.copy(), + controller_params_files=[robot_controllers], + ) + + # ===== CREATE LAUNCH DESCRIPTION ===== + ld = LaunchDescription( + [robot_state_pub_node, control_node, ReadyToTest()] + spawner_ld.entities + ) + + # Return tuple with test context + return ld, { + "controller_list": controller_list, + "robot_controllers": robot_controllers, + "urdf_file": urdf, + "temp_dir": temp_dir, + } + + +# Active tests +class TestControllerSpawnerList(unittest.TestCase): + """Active tests that run while the launch is active.""" + + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def test_spawner_nodes_launched(self, proc_info): + """Ensure processes are running.""" + process_names = proc_info.process_names() + self.assertGreater(len(process_names), 0) + print(f"\n[TEST] Active processes: {process_names}") + + def test_controllers_loaded(self, proc_info, controller_list): + """Test that controllers were loaded (poll until they appear).""" + node = rclpy.create_node("test_controller_query_node") + + try: + from controller_manager_msgs.srv import ListControllers + + client = node.create_client(ListControllers, "/controller_manager/list_controllers") + + print("\n[TEST] Waiting for controller_manager service...") + wait_for_svc_timeout = 30.0 + if not client.wait_for_service(timeout_sec=wait_for_svc_timeout): + process_names = proc_info.process_names() + self.fail( + f"Controller manager service not available after {wait_for_svc_timeout}s.\n" + f"Active processes: {process_names}" + ) + + # Poll for controllers to be registered + print("[TEST] Service available, polling for controllers (timeout 30s)...") + deadline = node.get_clock().now() + rclpy.duration.Duration(seconds=30.0) + seen = [] + while node.get_clock().now() < deadline: + req = ListControllers.Request() + fut = client.call_async(req) + rclpy.spin_until_future_complete(node, fut, timeout_sec=2.0) + if fut.done() and fut.result() is not None: + response = fut.result() + seen = [c.name for c in response.controller] + if all(ctrl in seen for ctrl in controller_list): + print(f"[TEST] Loaded controllers: {seen}") + break + # small sleep to avoid tight-loop + time.sleep(0.2) + else: + # timeout expired + self.fail( + f"Timeout waiting for controllers to be loaded. " + f"Expected: {controller_list}, saw: {seen}" + ) + + # Final assert (defensive) + for controller in controller_list: + self.assertIn( + controller, + seen, + f"Controller '{controller}' was not loaded. Available: {seen}", + ) + + print(f"[TEST] ? All {len(controller_list)} controllers loaded successfully") + + finally: + node.destroy_node() + + def test_spawner_exit_code(self, proc_info): + """Test that spawner process ran (may have completed already).""" + process_names = proc_info.process_names() + print(f"\n[TEST] Checking for spawner in: {process_names}") + + # The spawner may have already completed successfully and exited + # So we just verify that we have processes running + self.assertGreater(len(process_names), 0) + print(f"[TEST] ? Launch has {len(process_names)} active processes") + + +@launch_testing.post_shutdown_test() +class TestProcessOutput(unittest.TestCase): + """Post-shutdown tests.""" + + def test_exit_codes(self, proc_info): + """Verify all processes exited successfully.""" + print("\n[POST-SHUTDOWN] Process exit codes:") + for process_name in proc_info.process_names(): + info = proc_info[process_name] + print(f" {process_name}: {info.returncode}") + + for process_name in proc_info.process_names(): + info = proc_info[process_name] + + if "ros2_control_node" in process_name: + self.assertEqual( + info.returncode, 0, f"{process_name} exited with {info.returncode}" + ) + elif "spawner" in process_name: + # Spawner should complete successfully (0) or be terminated + self.assertIn( + info.returncode, + [0, -2, -15], + f"Spawner {process_name} exited with {info.returncode}", + ) + else: + self.assertIn( + info.returncode, [0, -2, -15], f"{process_name} exited with {info.returncode}" + ) + + print("[POST-SHUTDOWN] ? All processes exited as expected") + + def test_cleanup_temp_files(self, temp_dir, robot_controllers, urdf_file): + """Clean up temporary test files.""" + import shutil + + print(f"\n[CLEANUP] Removing temporary directory: {temp_dir}") + + try: + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + + print("[CLEANUP] ? Temporary files removed") + except Exception as e: + print(f"[CLEANUP] Warning: Cleanup failed: {e}") diff --git a/controller_manager/test/test_launch_utils_integration_load.py b/controller_manager/test/test_launch_utils_integration_load.py new file mode 100644 index 0000000000..418342e1e6 --- /dev/null +++ b/controller_manager/test/test_launch_utils_integration_load.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +# Copyright 2025 Robert Kwan +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.import os + +import pytest +import unittest +import os +import tempfile + +from ament_index_python.packages import get_package_share_directory, get_package_prefix +from launch import LaunchDescription +import launch_testing +from launch_testing.actions import ReadyToTest +import launch_testing.asserts +import launch_testing.markers +import launch_ros.actions + +import rclpy + +from controller_manager.launch_utils import generate_load_controller_launch_description + + +def get_loaded_controllers(node, timeout=30.0): + """Query /controller_manager/list_controllers until controllers are available.""" + from controller_manager_msgs.srv import ListControllers + import time + + client = node.create_client(ListControllers, "/controller_manager/list_controllers") + + if not client.wait_for_service(timeout_sec=timeout): + raise RuntimeError("Controller manager service not available") + + seen = [] + start_time = time.time() + while time.time() - start_time < timeout: + req = ListControllers.Request() + fut = client.call_async(req) + rclpy.spin_until_future_complete(node, fut, timeout_sec=2.0) + + if fut.done() and fut.result() is not None: + response = fut.result() + seen = [c.name for c in response.controller] + if seen: + break + + time.sleep(0.2) + + return seen + + +@pytest.mark.launch_test +def generate_test_description(): + + # Create temporary directory for all test files + temp_dir = tempfile.mkdtemp() + print(f"Creating test files in: {temp_dir}") + + # Get URDF, without involving xacro + urdf = os.path.join( + get_package_share_directory("ros2_control_test_assets"), + "urdf", + "test_hardware_components.urdf", + ) + with open(urdf) as infp: + robot_description_content = infp.read() + robot_description = {"robot_description": robot_description_content} + + robot_controllers = os.path.join( + get_package_prefix("controller_manager"), "test", "test_controller_load.yaml" + ) + + # Verify both files exist + assert os.path.isfile(robot_controllers), f"Controller config not created: {robot_controllers}" + + robot_state_pub_node = launch_ros.actions.Node( + package="robot_state_publisher", + executable="robot_state_publisher", + output="both", + parameters=[robot_description], + ) + + # ===== START CONTROLLER MANAGER ===== + control_node = launch_ros.actions.Node( + package="controller_manager", + executable="ros2_control_node", + parameters=[robot_controllers], + output="both", + ) + + print(f"Using config file: {robot_controllers}") + + spawner_action = generate_load_controller_launch_description( + controller_name="test_controller_load", + controller_params_file=[robot_controllers], + ) + + ld = LaunchDescription( + [ + robot_state_pub_node, + control_node, + *spawner_action.entities, # Unpack the entities from the returned LaunchDescription + ReadyToTest(), + ] + ) + + return ld, { + "temp_dir": temp_dir, + "controller_name": "test_controller_load", + "urdf_file": urdf, + } + + +class TestControllerLoad(unittest.TestCase): + def test_controller_loaded(self, launch_service, proc_output, controller_name): + # Create a temporary ROS 2 node for calling the service + rclpy.init() + test_node = rclpy.create_node("test_controller_client") + + # Poll the ListControllers service to ensure the target controller is present + try: + loaded_controllers = get_loaded_controllers(test_node, timeout=30.0) + + # CRITICAL ASSERTION: The test passes only if the controller name is in the list + assert ( + controller_name in loaded_controllers + ), f"Controller '{controller_name}' not found. Loaded: {loaded_controllers}" + + finally: + test_node.destroy_node() + rclpy.shutdown() + + def test_spawner_exit_code(self, proc_info): + """Test that spawner process ran (may have completed already).""" + process_names = proc_info.process_names() + print(f"\n[TEST] Checking for spawner in: {process_names}") + + # The spawner may have already completed successfully and exited + # So we just verify that we have processes running + self.assertGreater(len(process_names), 0) + print(f"[TEST] ? Launch has {len(process_names)} active processes") + + +@launch_testing.post_shutdown_test() +class TestCleanup: + def test_ros_nodes_exit_cleanly(self, proc_info): + # The control_node should exit cleanly after the whole launch file finishes + launch_testing.asserts.assertExitCodes(proc_info) + + # Ensures the temporary directory is removed after the test is done + def test_cleanup_temp_files(self, temp_dir): + import shutil + + try: + shutil.rmtree(temp_dir) + except Exception as e: + print(f"Cleanup failed for directory {temp_dir}: {e}") diff --git a/controller_manager/test/test_launch_utils_unit.py b/controller_manager/test/test_launch_utils_unit.py new file mode 100644 index 0000000000..9661a21a4d --- /dev/null +++ b/controller_manager/test/test_launch_utils_unit.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +# Copyright 2025 Robert Kwan +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for launch_utils.py functions.""" + +import unittest +from launch import LaunchDescription +from launch_ros.actions import Node + +from controller_manager.launch_utils import ( + generate_controllers_spawner_launch_description, + generate_controllers_spawner_launch_description_from_dict, + generate_load_controller_launch_description, +) + + +class TestLaunchUtils(unittest.TestCase): + """ + Test suite for launch_utils.py functions. + + Tests three entry points: + 1. generate_controllers_spawner_launch_description (from list) + 2. generate_controllers_spawner_launch_description_from_dict (from dict) + 3. generate_load_controller_launch_description (load controllers) + """ + + # ------------------------------------------------------------------ + # Helper methods + # ------------------------------------------------------------------ + def _extract_actions(self, result): + """Return a list of launch actions, regardless of type.""" + if isinstance(result, list): + return result + elif hasattr(result, "entities"): # LaunchDescription in newer ROS2 + return list(result.entities) + else: + return [result] + + def _assert_launch_result(self, result, min_len=0): + """Verify that result is list- or LaunchDescription-like.""" + self.assertTrue( + isinstance(result, (list, LaunchDescription)), + f"Unexpected result type: {type(result)}", + ) + actions = self._extract_actions(result) + self.assertGreaterEqual(len(actions), min_len) + for act in actions: + self.assertTrue( + hasattr(act, "execute") or hasattr(act, "visit"), + f"Invalid action type: {type(act)}", + ) + return actions + + # ------------------------------------------------------------------ + # Launch utility tests + # ------------------------------------------------------------------ + def test_generate_controllers_spawner_from_list(self): + controllers = ["test_controller_1", "test_controller_2"] + result = generate_controllers_spawner_launch_description(controllers) + actions = self._assert_launch_result(result, min_len=1) + self.assertTrue(actions is not None and len(actions) > 0) + + def test_generate_controllers_spawner_from_dict(self): + """Ensure dict-based API works with string param files (old-style).""" + controllers = { + "ctrl_A": ["/tmp/dummy.yaml"], + "ctrl_B": ["/tmp/dummy.yaml"], + } + result = generate_controllers_spawner_launch_description_from_dict(controllers) + actions = self._extract_actions(result) + self.assertIsInstance(result, LaunchDescription) + self.assertEqual(len(actions), 3) + self.assertIsInstance(actions[-1], Node) + + def test_generate_load_controller_launch_description(self): + """Test load controller description with valid single string params.""" + controllers = ["test_controller_load"] + result = generate_load_controller_launch_description(controllers) + actions = self._extract_actions(result) + self.assertIsInstance(result, LaunchDescription) + self.assertEqual(len(actions), 3) + self.assertIsInstance(actions[-1], Node) + + +if __name__ == "__main__": + unittest.main() diff --git a/controller_manager/test/test_ros2_control_node_combined.yaml b/controller_manager/test/test_ros2_control_node_combined.yaml new file mode 100644 index 0000000000..a3b7545999 --- /dev/null +++ b/controller_manager/test/test_ros2_control_node_combined.yaml @@ -0,0 +1,22 @@ +controller_manager: + ros__parameters: + update_rate: 100 # Hz + +joint_state_broadcaster: + ros__parameters: + type: "joint_state_broadcaster/JointStateBroadcaster" + +forward_position_controller: + ros__parameters: + type: position_controllers/JointGroupPositionController + joints: ["joint1"] + +ctrl_with_parameters_and_type: + ros__parameters: + type: "controller_manager/test_controller" + joint_names: ["joint1"] + +controller3: + ros__parameters: + type: "controller_manager/test_controller" + joint_names: ["joint3"] From d161825c9a17e5338eb61269ec2b4390f5dd13bb Mon Sep 17 00:00:00 2001 From: Robert Kwan Date: Sun, 9 Nov 2025 11:47:19 -0800 Subject: [PATCH 5/5] Fixed pre-commit error in master branch for #2147 --- controller_manager/test/test_ros2_control_node_launch.py | 1 - 1 file changed, 1 deletion(-) diff --git a/controller_manager/test/test_ros2_control_node_launch.py b/controller_manager/test/test_ros2_control_node_launch.py index bcc1c04a33..46a4f813c0 100644 --- a/controller_manager/test/test_ros2_control_node_launch.py +++ b/controller_manager/test/test_ros2_control_node_launch.py @@ -51,7 +51,6 @@ ) - # Executes the given launch file and checks if all nodes can be started @pytest.mark.launch_test def generate_test_description():