From 047e26164c04056efac63854e73722b251f68bee Mon Sep 17 00:00:00 2001 From: Shravan Deva Date: Sat, 6 Sep 2025 15:40:30 +0530 Subject: [PATCH 1/3] Add tests for trace action with dual_sessions enabled Signed-off-by: Shravan Deva --- .../test_trace_action.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py b/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py index c95589b4..84b3482c 100644 --- a/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py +++ b/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py @@ -95,6 +95,7 @@ def _check_trace_action( *, session_name: Optional[str] = 'my-session-name', snapshot_mode: bool = False, + dual_session: bool = False, append_trace: bool = False, events_ust: List[str] = ['ros2:*', '*'], subbuffer_size_ust: int = 524288, @@ -111,6 +112,10 @@ def _check_trace_action( snapshot_mode, perform_typed_substitution(context, action.snapshot_mode, bool) ) + self.assertEqual( + dual_session, + perform_typed_substitution(context, action.dual_session, bool) + ) self.assertEqual( append_trace, perform_typed_substitution(context, action.append_trace, bool) @@ -169,6 +174,27 @@ def test_action_snapshot_mode(self) -> None: shutil.rmtree(tmpdir) + def test_action_dual_session(self) -> None: + tmpdir = tempfile.mkdtemp(prefix='TestTraceAction__test_action_dual_session') + + action = Trace( + session_name='my-session-name', + dual_session=True, + base_path=tmpdir, + events_kernel=[], + syscalls=[], + events_ust=[ + 'ros2:*', + '*', + ], + subbuffer_size_ust=524288, + subbuffer_size_kernel=1048576, + ) + context = self._assert_launch_no_errors([action]) + self._check_trace_action(action, context, tmpdir, dual_session=True) + + shutil.rmtree(tmpdir) + def test_action_frontend_xml(self) -> None: tmpdir = tempfile.mkdtemp(prefix='TestTraceAction__test_frontend_xml') @@ -178,6 +204,7 @@ def test_action_frontend_xml(self) -> None: None: - trace: session-name: my-session-name snapshot-mode: false + dual-session: false append-timestamp: false base-path: {} append-trace: true @@ -300,6 +328,11 @@ def test_action_substitutions(self) -> None: default_value='False', description='whether to take a snapshot of the session', ) + dual_session_arg = DeclareLaunchArgument( + 'dual-session', + default_value='False', + description='whether to pre-configure a dual session', + ) append_timestamp_arg = DeclareLaunchArgument( 'append-timestamp', default_value='False', @@ -323,6 +356,7 @@ def test_action_substitutions(self) -> None: action = Trace( session_name=LaunchConfiguration(session_name_arg.name), snapshot_mode=LaunchConfiguration(snapshot_mode_arg.name), + dual_session=LaunchConfiguration(dual_session_arg.name), append_timestamp=LaunchConfiguration(append_timestamp_arg.name), base_path=TextSubstitution(text=tmpdir), append_trace=LaunchConfiguration(append_trace_arg.name), @@ -345,6 +379,7 @@ def test_action_substitutions(self) -> None: context = self._assert_launch_no_errors([ session_name_arg, snapshot_mode_arg, + dual_session_arg, append_timestamp_arg, append_trace_arg, subbuffer_size_ust_arg, @@ -378,6 +413,7 @@ def test_action_substitutions_frontend_xml(self) -> None: + @@ -421,6 +457,9 @@ def test_action_substitutions_frontend_yaml(self) -> None: - arg: name: snapshot-mode default: "false" + - arg: + name: dual-session + default: "false" - arg: name: append-timestamp default: "false" From a5dd4e48c3d79f43cd14fcbb5d88803ee59c0b01 Mon Sep 17 00:00:00 2001 From: Shravan Deva Date: Sat, 6 Sep 2025 20:46:26 +0530 Subject: [PATCH 2/3] Add tests for ros2trace with --dual-session option Signed-off-by: Shravan Deva --- .../test/test_ros2trace/test_trace.py | 143 +++++++++++++++++- 1 file changed, 142 insertions(+), 1 deletion(-) diff --git a/test_ros2trace/test/test_ros2trace/test_trace.py b/test_ros2trace/test/test_ros2trace/test_trace.py index 514a174d..74984100 100644 --- a/test_ros2trace/test/test_ros2trace/test_trace.py +++ b/test_ros2trace/test/test_ros2trace/test_trace.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio from collections.abc import Mapping import os import shutil @@ -28,6 +29,8 @@ from launch import LaunchService from launch_ros.actions import Node from lttngpy import impl as lttngpy +import osrf_pycommon.process_utils +from tracetools_launch.action import Trace from tracetools_read import get_event_name from tracetools_test.mark_process import get_corresponding_trace_test_events from tracetools_test.mark_process import get_trace_test_id @@ -36,6 +39,7 @@ from tracetools_trace.tools import tracepoints from tracetools_trace.tools.lttng import is_lttng_installed from tracetools_trace.tools.names import DEFAULT_EVENTS_ROS +from tracetools_trace.tools.names import DEFAULT_INIT_EVENTS_ROS def are_tracepoints_included() -> bool: @@ -295,6 +299,52 @@ def run_nodes(self, env: Optional[Mapping[str, str]] = None) -> None: exit_code = ls.run() self.assertEqual(0, exit_code) + def pre_configure_dual_session_and_run_nodes( + self, + base_path: str, + session_name: str, + env: Optional[Mapping[str, str]] = None, + ) -> Tuple[LaunchService, asyncio.Task]: + # Set trace test ID env var for spawned processes + additional_env: Dict[str, str] = {} + if env is not None: + additional_env.update(env) + assert self.trace_test_id + additional_env[TRACE_TEST_ID_ENV_VAR] = self.trace_test_id + actions = [ + Trace( + session_name=session_name, + dual_session=True, + base_path=base_path, + events_ust=DEFAULT_INIT_EVENTS_ROS + [TRACE_TEST_ID_TP_NAME], + ), + Node( + package='test_tracetools', + executable='test_ping', + output='screen', + arguments=['do_more'], # run indefinitely + additional_env=additional_env, + ), + Node( + package='test_tracetools', + executable='test_pong', + output='screen', + arguments=['do_more'], # run indefinitely + additional_env=additional_env, + ), + ] + ld = LaunchDescription(actions) + ls = LaunchService() + ls.include_launch_description(ld) + + loop = osrf_pycommon.process_utils.get_loop() + launch_task = loop.create_task(ls.run_async()) + + # Give the launch service time to start actions + loop.run_until_complete(asyncio.sleep(1.0)) + + return ls, launch_task + def test_default(self) -> None: tmpdir = self.create_test_tmpdir('test_default') @@ -671,7 +721,7 @@ def test_snapshot_start_pause_resume_stop(self) -> None: ret = self.run_trace_subcommand( [ 'start', session_name, - '--ust', tracepoints.rcl_subscription_init, TRACE_TEST_ID_TP_NAME, + '--ust', 'ros2:*', TRACE_TEST_ID_TP_NAME, '--path', tmpdir, '--snapshot-mode', ] @@ -736,6 +786,97 @@ def test_snapshot_start_pause_resume_stop(self) -> None: shutil.rmtree(tmpdir) + @unittest.skipIf(not are_tracepoints_included(), 'tracepoints are required') + def test_dual_session_start_pause_resume_stop(self) -> None: + tmpdir = self.create_test_tmpdir('test_dual_session_start_pause_resume_stop') + session_name = 'test_dual_session_start_pause_resume_stop' + + # Pre-configure dual session, start nodes and check that the snapshot session exists + ls, launch_task = self.pre_configure_dual_session_and_run_nodes(tmpdir, session_name) + self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) + + # Start tracing + ret = self.run_trace_subcommand( + [ + 'start', session_name, + '--ust', 'ros2:*', + '--path', tmpdir, + '--dual-session', + ] + ) + self.assertEqual(0, ret) + trace_dir = os.path.join(tmpdir, session_name) + snapshot_trace_dir = os.path.join(tmpdir, session_name, 'snapshot') + runtime_trace_dir = os.path.join(tmpdir, session_name, 'runtime') + self.assertTraceExist(snapshot_trace_dir) + self.assertTraceExist(runtime_trace_dir) + self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) + self.assertTracingSessionExist(session_name+'-runtime') + + # Pause tracing and check trace + ret = self.run_trace_subcommand(['pause', session_name, '--dual-session']) + self.assertEqual(0, ret) + self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) + self.assertTracingSessionExist(session_name+'-runtime') + expected_trace_data = [ + ('topic_name', '/ping'), + ('topic_name', '/pong'), + ] + num_events = self.assertTraceContains(trace_dir, expected_field_value=expected_trace_data) + + # Pausing again should give an error but not affect anything + ret = self.run_trace_subcommand(['pause', session_name, '--dual-session']) + self.assertEqual(1, ret) + self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) + self.assertTracingSessionExist(session_name+'-runtime') + new_num_events = self.assertTraceContains( + trace_dir=tmpdir, + expected_field_value=expected_trace_data, + ) + self.assertEqual(num_events, new_num_events, 'unexpected new events in trace') + + # Resume tracing + ret = self.run_trace_subcommand(['resume', session_name, '--dual-session']) + self.assertEqual(0, ret) + self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) + self.assertTracingSessionExist(session_name+'-runtime') + + # Resuming tracing again should give an error but not affect anything + ret = self.run_trace_subcommand(['resume', session_name, '--dual-session']) + self.assertEqual(1, ret) + self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) + self.assertTracingSessionExist(session_name+'-runtime') + + # Stop tracing and check that trace changed + ret = self.run_trace_subcommand(['stop', session_name, '--dual-session']) + self.assertEqual(0, ret) + # Snapshot session should still exist + self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) + self.assertTracingSessionNotExist(session_name+'-runtime') + new_num_events = self.assertTraceContains( + trace_dir, + expected_field_value=expected_trace_data, + ) + self.assertGreater(new_num_events, num_events, 'no new events in trace') + + # Stopping tracing again should give an error but not affect anything + ret = self.run_trace_subcommand(['stop', session_name, '--dual-session']) + self.assertEqual(1, ret) + self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) + self.assertTracingSessionNotExist(session_name+'-runtime') + + # Shutdown launch service + loop = osrf_pycommon.process_utils.get_loop() + if not launch_task.done(): + loop.create_task(ls.shutdown()) + loop.run_until_complete(launch_task) + assert 0 == launch_task.result() + + # Shutting down the launch service should destroy the snapshot session + self.assertTracingSessionNotExist(session_name+'-snapshot', snapshot_mode=True) + + shutil.rmtree(tmpdir) + @unittest.skipIf(not are_tracepoints_included(), 'tracepoints are required') def test_runtime_disable(self) -> None: tmpdir = self.create_test_tmpdir('test_runtime_disable') From ef374ac65e6f15b5c9586734106c03e08f9b6f4e Mon Sep 17 00:00:00 2001 From: Shravan Deva Date: Thu, 18 Sep 2025 21:05:54 +0530 Subject: [PATCH 3/3] Remove harcoded sleep Signed-off-by: Shravan Deva --- test_ros2trace/package.xml | 1 + .../test/test_ros2trace/test_trace.py | 89 ++++++++++++++----- .../test_trace_action.py | 4 + 3 files changed, 73 insertions(+), 21 deletions(-) diff --git a/test_ros2trace/package.xml b/test_ros2trace/package.xml index 2c761eb8..e49abfdc 100644 --- a/test_ros2trace/package.xml +++ b/test_ros2trace/package.xml @@ -20,6 +20,7 @@ launch_ros lttngpy python3-pytest + rclpy ros2run ros2trace test_tracetools diff --git a/test_ros2trace/test/test_ros2trace/test_trace.py b/test_ros2trace/test/test_ros2trace/test_trace.py index 74984100..784267a6 100644 --- a/test_ros2trace/test/test_ros2trace/test_trace.py +++ b/test_ros2trace/test/test_ros2trace/test_trace.py @@ -18,7 +18,9 @@ import shutil import subprocess import tempfile +import threading import time +from typing import Callable from typing import Dict from typing import List from typing import Optional @@ -30,6 +32,12 @@ from launch_ros.actions import Node from lttngpy import impl as lttngpy import osrf_pycommon.process_utils +import rclpy +from rclpy.clock import Clock, ClockType +from rclpy.duration import Duration +from rclpy.executors import SingleThreadedExecutor +from rclpy.qos import QoSProfile +from std_msgs.msg import String from tracetools_launch.action import Trace from tracetools_read import get_event_name from tracetools_test.mark_process import get_corresponding_trace_test_events @@ -272,6 +280,19 @@ def run_trace_subcommand( process = self.run_command(['ros2', 'trace', *args], env=env) return self.wait_and_print_command_output(process) + def wait_for( + condition: Callable[[], bool], + timeout: Duration, + sleep_time: float = 0.1, + ): + clock = Clock(clock_type=ClockType.STEADY_TIME) + start = clock.now() + while not condition(): + if clock.now() - start > timeout: + return False + time.sleep(sleep_time) + return True + def run_nodes(self, env: Optional[Mapping[str, str]] = None) -> None: # Set trace test ID env var for spawned processes additional_env: Dict[str, str] = {} @@ -340,9 +361,6 @@ def pre_configure_dual_session_and_run_nodes( loop = osrf_pycommon.process_utils.get_loop() launch_task = loop.create_task(ls.run_async()) - # Give the launch service time to start actions - loop.run_until_complete(asyncio.sleep(1.0)) - return ls, launch_task def test_default(self) -> None: @@ -790,10 +808,32 @@ def test_snapshot_start_pause_resume_stop(self) -> None: def test_dual_session_start_pause_resume_stop(self) -> None: tmpdir = self.create_test_tmpdir('test_dual_session_start_pause_resume_stop') session_name = 'test_dual_session_start_pause_resume_stop' + snapshot_session_name = session_name + '-snapshot' + runtime_session_name = session_name + '-runtime' - # Pre-configure dual session, start nodes and check that the snapshot session exists + # Pre-configure dual session and start nodes ls, launch_task = self.pre_configure_dual_session_and_run_nodes(tmpdir, session_name) - self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) + + # Wait until the nodes are running + ctx = rclpy.Context() + ctx.init() + node = rclpy.create_node('test_dual_session_sub', context=ctx) + ping_sub = node.create_subscription( + String, '/ping', lambda msg: None, QoSProfile(depth=15)) + pong_sub = node.create_subscription( + String, '/pong', lambda msg: None, QoSProfile(depth=15)) + executor = SingleThreadedExecutor(context=ctx) + executor.add_node(node) + executor_thread = threading.Thread(target=executor.spin, daemon=True) + executor_thread.start() + + assert self.wait_for( + lambda: ping_sub.get_publisher_count() >= 1 and pong_sub.get_publisher_count() >= 1, + Duration(seconds=10), + ), 'timed out waiting for nodes to be running' + + # Check that the snapshot session exists + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) # Start tracing ret = self.run_trace_subcommand( @@ -810,14 +850,14 @@ def test_dual_session_start_pause_resume_stop(self) -> None: runtime_trace_dir = os.path.join(tmpdir, session_name, 'runtime') self.assertTraceExist(snapshot_trace_dir) self.assertTraceExist(runtime_trace_dir) - self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) - self.assertTracingSessionExist(session_name+'-runtime') + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) # Pause tracing and check trace ret = self.run_trace_subcommand(['pause', session_name, '--dual-session']) self.assertEqual(0, ret) - self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) - self.assertTracingSessionExist(session_name+'-runtime') + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) expected_trace_data = [ ('topic_name', '/ping'), ('topic_name', '/pong'), @@ -827,8 +867,8 @@ def test_dual_session_start_pause_resume_stop(self) -> None: # Pausing again should give an error but not affect anything ret = self.run_trace_subcommand(['pause', session_name, '--dual-session']) self.assertEqual(1, ret) - self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) - self.assertTracingSessionExist(session_name+'-runtime') + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) new_num_events = self.assertTraceContains( trace_dir=tmpdir, expected_field_value=expected_trace_data, @@ -838,21 +878,21 @@ def test_dual_session_start_pause_resume_stop(self) -> None: # Resume tracing ret = self.run_trace_subcommand(['resume', session_name, '--dual-session']) self.assertEqual(0, ret) - self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) - self.assertTracingSessionExist(session_name+'-runtime') + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) # Resuming tracing again should give an error but not affect anything ret = self.run_trace_subcommand(['resume', session_name, '--dual-session']) self.assertEqual(1, ret) - self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) - self.assertTracingSessionExist(session_name+'-runtime') + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) # Stop tracing and check that trace changed ret = self.run_trace_subcommand(['stop', session_name, '--dual-session']) self.assertEqual(0, ret) # Snapshot session should still exist - self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) - self.assertTracingSessionNotExist(session_name+'-runtime') + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionNotExist(runtime_session_name) new_num_events = self.assertTraceContains( trace_dir, expected_field_value=expected_trace_data, @@ -862,8 +902,8 @@ def test_dual_session_start_pause_resume_stop(self) -> None: # Stopping tracing again should give an error but not affect anything ret = self.run_trace_subcommand(['stop', session_name, '--dual-session']) self.assertEqual(1, ret) - self.assertTracingSessionExist(session_name+'-snapshot', snapshot_mode=True) - self.assertTracingSessionNotExist(session_name+'-runtime') + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionNotExist(runtime_session_name) # Shutdown launch service loop = osrf_pycommon.process_utils.get_loop() @@ -873,8 +913,15 @@ def test_dual_session_start_pause_resume_stop(self) -> None: assert 0 == launch_task.result() # Shutting down the launch service should destroy the snapshot session - self.assertTracingSessionNotExist(session_name+'-snapshot', snapshot_mode=True) - + self.assertTracingSessionNotExist(snapshot_session_name, snapshot_mode=True) + + executor.shutdown() + executor_thread.join(3) + assert not executor_thread.is_alive() + ping_sub.destroy() + pong_sub.destroy() + node.destroy_node() + ctx.shutdown() shutil.rmtree(tmpdir) @unittest.skipIf(not are_tracepoints_included(), 'tracepoints are required') diff --git a/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py b/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py index 84b3482c..cf0eeaff 100644 --- a/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py +++ b/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py @@ -423,6 +423,8 @@ def test_action_substitutions_frontend_xml(self) -> None: None: default: "1048576" - trace: session-name: "$(var session-name)" + snapshot-mode: "$(var snapshot-mode)" + dual-session: "$(var dual-session)" append-timestamp: "$(var append-timestamp)" base-path: "$(var base-path)" append-trace: "$(var append-trace)"