diff --git a/test_ros2trace/package.xml b/test_ros2trace/package.xml index 2c761eb8..e49abfdc 100644 --- a/test_ros2trace/package.xml +++ b/test_ros2trace/package.xml @@ -20,6 +20,7 @@ launch_ros lttngpy python3-pytest + rclpy ros2run ros2trace test_tracetools diff --git a/test_ros2trace/test/test_ros2trace/test_trace.py b/test_ros2trace/test/test_ros2trace/test_trace.py index 514a174d..784267a6 100644 --- a/test_ros2trace/test/test_ros2trace/test_trace.py +++ b/test_ros2trace/test/test_ros2trace/test_trace.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio from collections.abc import Mapping import os import shutil import subprocess import tempfile +import threading import time +from typing import Callable from typing import Dict from typing import List from typing import Optional @@ -28,6 +31,14 @@ from launch import LaunchService from launch_ros.actions import Node from lttngpy import impl as lttngpy +import osrf_pycommon.process_utils +import rclpy +from rclpy.clock import Clock, ClockType +from rclpy.duration import Duration +from rclpy.executors import SingleThreadedExecutor +from rclpy.qos import QoSProfile +from std_msgs.msg import String +from tracetools_launch.action import Trace from tracetools_read import get_event_name from tracetools_test.mark_process import get_corresponding_trace_test_events from tracetools_test.mark_process import get_trace_test_id @@ -36,6 +47,7 @@ from tracetools_trace.tools import tracepoints from tracetools_trace.tools.lttng import is_lttng_installed from tracetools_trace.tools.names import DEFAULT_EVENTS_ROS +from tracetools_trace.tools.names import DEFAULT_INIT_EVENTS_ROS def are_tracepoints_included() -> bool: @@ -268,6 +280,19 @@ def run_trace_subcommand( process = self.run_command(['ros2', 'trace', *args], env=env) return self.wait_and_print_command_output(process) + def wait_for( + condition: Callable[[], bool], + timeout: Duration, + sleep_time: float = 0.1, + ): + clock = Clock(clock_type=ClockType.STEADY_TIME) + start = clock.now() + while not condition(): + if clock.now() - start > timeout: + return False + time.sleep(sleep_time) + return True + def run_nodes(self, env: Optional[Mapping[str, str]] = None) -> None: # Set trace test ID env var for spawned processes additional_env: Dict[str, str] = {} @@ -295,6 +320,49 @@ def run_nodes(self, env: Optional[Mapping[str, str]] = None) -> None: exit_code = ls.run() self.assertEqual(0, exit_code) + def pre_configure_dual_session_and_run_nodes( + self, + base_path: str, + session_name: str, + env: Optional[Mapping[str, str]] = None, + ) -> Tuple[LaunchService, asyncio.Task]: + # Set trace test ID env var for spawned processes + additional_env: Dict[str, str] = {} + if env is not None: + additional_env.update(env) + assert self.trace_test_id + additional_env[TRACE_TEST_ID_ENV_VAR] = self.trace_test_id + actions = [ + Trace( + session_name=session_name, + dual_session=True, + base_path=base_path, + events_ust=DEFAULT_INIT_EVENTS_ROS + [TRACE_TEST_ID_TP_NAME], + ), + Node( + package='test_tracetools', + executable='test_ping', + output='screen', + arguments=['do_more'], # run indefinitely + additional_env=additional_env, + ), + Node( + package='test_tracetools', + executable='test_pong', + output='screen', + arguments=['do_more'], # run indefinitely + additional_env=additional_env, + ), + ] + ld = LaunchDescription(actions) + ls = LaunchService() + ls.include_launch_description(ld) + + loop = osrf_pycommon.process_utils.get_loop() + launch_task = loop.create_task(ls.run_async()) + + return ls, launch_task + def test_default(self) -> None: tmpdir = self.create_test_tmpdir('test_default') @@ -671,7 +739,7 @@ def test_snapshot_start_pause_resume_stop(self) -> None: ret = self.run_trace_subcommand( [ 'start', session_name, - '--ust', tracepoints.rcl_subscription_init, TRACE_TEST_ID_TP_NAME, + '--ust', 'ros2:*', TRACE_TEST_ID_TP_NAME, '--path', tmpdir, '--snapshot-mode', ] @@ -736,6 +804,126 @@ def test_snapshot_start_pause_resume_stop(self) -> None: shutil.rmtree(tmpdir) + @unittest.skipIf(not are_tracepoints_included(), 'tracepoints are required') + def test_dual_session_start_pause_resume_stop(self) -> None: + tmpdir = self.create_test_tmpdir('test_dual_session_start_pause_resume_stop') + session_name = 'test_dual_session_start_pause_resume_stop' + snapshot_session_name = session_name + '-snapshot' + runtime_session_name = session_name + '-runtime' + + # Pre-configure dual session and start nodes + ls, launch_task = self.pre_configure_dual_session_and_run_nodes(tmpdir, session_name) + + # Wait until the nodes are running + ctx = rclpy.Context() + ctx.init() + node = rclpy.create_node('test_dual_session_sub', context=ctx) + ping_sub = node.create_subscription( + String, '/ping', lambda msg: None, QoSProfile(depth=15)) + pong_sub = node.create_subscription( + String, '/pong', lambda msg: None, QoSProfile(depth=15)) + executor = SingleThreadedExecutor(context=ctx) + executor.add_node(node) + executor_thread = threading.Thread(target=executor.spin, daemon=True) + executor_thread.start() + + assert self.wait_for( + lambda: ping_sub.get_publisher_count() >= 1 and pong_sub.get_publisher_count() >= 1, + Duration(seconds=10), + ), 'timed out waiting for nodes to be running' + + # Check that the snapshot session exists + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + + # Start tracing + ret = self.run_trace_subcommand( + [ + 'start', session_name, + '--ust', 'ros2:*', + '--path', tmpdir, + '--dual-session', + ] + ) + self.assertEqual(0, ret) + trace_dir = os.path.join(tmpdir, session_name) + snapshot_trace_dir = os.path.join(tmpdir, session_name, 'snapshot') + runtime_trace_dir = os.path.join(tmpdir, session_name, 'runtime') + self.assertTraceExist(snapshot_trace_dir) + self.assertTraceExist(runtime_trace_dir) + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) + + # Pause tracing and check trace + ret = self.run_trace_subcommand(['pause', session_name, '--dual-session']) + self.assertEqual(0, ret) + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) + expected_trace_data = [ + ('topic_name', '/ping'), + ('topic_name', '/pong'), + ] + num_events = self.assertTraceContains(trace_dir, expected_field_value=expected_trace_data) + + # Pausing again should give an error but not affect anything + ret = self.run_trace_subcommand(['pause', session_name, '--dual-session']) + self.assertEqual(1, ret) + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) + new_num_events = self.assertTraceContains( + trace_dir=tmpdir, + expected_field_value=expected_trace_data, + ) + self.assertEqual(num_events, new_num_events, 'unexpected new events in trace') + + # Resume tracing + ret = self.run_trace_subcommand(['resume', session_name, '--dual-session']) + self.assertEqual(0, ret) + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) + + # Resuming tracing again should give an error but not affect anything + ret = self.run_trace_subcommand(['resume', session_name, '--dual-session']) + self.assertEqual(1, ret) + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionExist(runtime_session_name) + + # Stop tracing and check that trace changed + ret = self.run_trace_subcommand(['stop', session_name, '--dual-session']) + self.assertEqual(0, ret) + # Snapshot session should still exist + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionNotExist(runtime_session_name) + new_num_events = self.assertTraceContains( + trace_dir, + expected_field_value=expected_trace_data, + ) + self.assertGreater(new_num_events, num_events, 'no new events in trace') + + # Stopping tracing again should give an error but not affect anything + ret = self.run_trace_subcommand(['stop', session_name, '--dual-session']) + self.assertEqual(1, ret) + self.assertTracingSessionExist(snapshot_session_name, snapshot_mode=True) + self.assertTracingSessionNotExist(runtime_session_name) + + # Shutdown launch service + loop = osrf_pycommon.process_utils.get_loop() + if not launch_task.done(): + loop.create_task(ls.shutdown()) + loop.run_until_complete(launch_task) + assert 0 == launch_task.result() + + # Shutting down the launch service should destroy the snapshot session + self.assertTracingSessionNotExist(snapshot_session_name, snapshot_mode=True) + + executor.shutdown() + executor_thread.join(3) + assert not executor_thread.is_alive() + ping_sub.destroy() + pong_sub.destroy() + node.destroy_node() + ctx.shutdown() + shutil.rmtree(tmpdir) + @unittest.skipIf(not are_tracepoints_included(), 'tracepoints are required') def test_runtime_disable(self) -> None: tmpdir = self.create_test_tmpdir('test_runtime_disable') diff --git a/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py b/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py index c95589b4..cf0eeaff 100644 --- a/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py +++ b/test_tracetools_launch/test/test_tracetools_launch/test_trace_action.py @@ -95,6 +95,7 @@ def _check_trace_action( *, session_name: Optional[str] = 'my-session-name', snapshot_mode: bool = False, + dual_session: bool = False, append_trace: bool = False, events_ust: List[str] = ['ros2:*', '*'], subbuffer_size_ust: int = 524288, @@ -111,6 +112,10 @@ def _check_trace_action( snapshot_mode, perform_typed_substitution(context, action.snapshot_mode, bool) ) + self.assertEqual( + dual_session, + perform_typed_substitution(context, action.dual_session, bool) + ) self.assertEqual( append_trace, perform_typed_substitution(context, action.append_trace, bool) @@ -169,6 +174,27 @@ def test_action_snapshot_mode(self) -> None: shutil.rmtree(tmpdir) + def test_action_dual_session(self) -> None: + tmpdir = tempfile.mkdtemp(prefix='TestTraceAction__test_action_dual_session') + + action = Trace( + session_name='my-session-name', + dual_session=True, + base_path=tmpdir, + events_kernel=[], + syscalls=[], + events_ust=[ + 'ros2:*', + '*', + ], + subbuffer_size_ust=524288, + subbuffer_size_kernel=1048576, + ) + context = self._assert_launch_no_errors([action]) + self._check_trace_action(action, context, tmpdir, dual_session=True) + + shutil.rmtree(tmpdir) + def test_action_frontend_xml(self) -> None: tmpdir = tempfile.mkdtemp(prefix='TestTraceAction__test_frontend_xml') @@ -178,6 +204,7 @@ def test_action_frontend_xml(self) -> None: None: - trace: session-name: my-session-name snapshot-mode: false + dual-session: false append-timestamp: false base-path: {} append-trace: true @@ -300,6 +328,11 @@ def test_action_substitutions(self) -> None: default_value='False', description='whether to take a snapshot of the session', ) + dual_session_arg = DeclareLaunchArgument( + 'dual-session', + default_value='False', + description='whether to pre-configure a dual session', + ) append_timestamp_arg = DeclareLaunchArgument( 'append-timestamp', default_value='False', @@ -323,6 +356,7 @@ def test_action_substitutions(self) -> None: action = Trace( session_name=LaunchConfiguration(session_name_arg.name), snapshot_mode=LaunchConfiguration(snapshot_mode_arg.name), + dual_session=LaunchConfiguration(dual_session_arg.name), append_timestamp=LaunchConfiguration(append_timestamp_arg.name), base_path=TextSubstitution(text=tmpdir), append_trace=LaunchConfiguration(append_trace_arg.name), @@ -345,6 +379,7 @@ def test_action_substitutions(self) -> None: context = self._assert_launch_no_errors([ session_name_arg, snapshot_mode_arg, + dual_session_arg, append_timestamp_arg, append_trace_arg, subbuffer_size_ust_arg, @@ -378,6 +413,7 @@ def test_action_substitutions_frontend_xml(self) -> None: + @@ -387,6 +423,8 @@ def test_action_substitutions_frontend_xml(self) -> None: None: - arg: name: snapshot-mode default: "false" + - arg: + name: dual-session + default: "false" - arg: name: append-timestamp default: "false" @@ -444,6 +485,8 @@ def test_action_substitutions_frontend_yaml(self) -> None: default: "1048576" - trace: session-name: "$(var session-name)" + snapshot-mode: "$(var snapshot-mode)" + dual-session: "$(var dual-session)" append-timestamp: "$(var append-timestamp)" base-path: "$(var base-path)" append-trace: "$(var append-trace)"