1818import shutil
1919import subprocess
2020import tempfile
21+ import threading
2122import time
23+ from typing import Callable
2224from typing import Dict
2325from typing import List
2426from typing import Optional
3032from launch_ros .actions import Node
3133from lttngpy import impl as lttngpy
3234import osrf_pycommon .process_utils
35+ import rclpy
36+ from rclpy .clock import Clock , ClockType
37+ from rclpy .duration import Duration
38+ from rclpy .executors import SingleThreadedExecutor
39+ from rclpy .qos import QoSProfile
40+ from std_msgs .msg import String
3341from tracetools_launch .action import Trace
3442from tracetools_read import get_event_name
3543from tracetools_test .mark_process import get_corresponding_trace_test_events
@@ -272,6 +280,19 @@ def run_trace_subcommand(
272280 process = self .run_command (['ros2' , 'trace' , * args ], env = env )
273281 return self .wait_and_print_command_output (process )
274282
283+ def wait_for (
284+ condition : Callable [[], bool ],
285+ timeout : Duration ,
286+ sleep_time : float = 0.1 ,
287+ ):
288+ clock = Clock (clock_type = ClockType .STEADY_TIME )
289+ start = clock .now ()
290+ while not condition ():
291+ if clock .now () - start > timeout :
292+ return False
293+ time .sleep (sleep_time )
294+ return True
295+
275296 def run_nodes (self , env : Optional [Mapping [str , str ]] = None ) -> None :
276297 # Set trace test ID env var for spawned processes
277298 additional_env : Dict [str , str ] = {}
@@ -340,9 +361,6 @@ def pre_configure_dual_session_and_run_nodes(
340361 loop = osrf_pycommon .process_utils .get_loop ()
341362 launch_task = loop .create_task (ls .run_async ())
342363
343- # Give the launch service time to start actions
344- loop .run_until_complete (asyncio .sleep (1.0 ))
345-
346364 return ls , launch_task
347365
348366 def test_default (self ) -> None :
@@ -790,10 +808,32 @@ def test_snapshot_start_pause_resume_stop(self) -> None:
790808 def test_dual_session_start_pause_resume_stop (self ) -> None :
791809 tmpdir = self .create_test_tmpdir ('test_dual_session_start_pause_resume_stop' )
792810 session_name = 'test_dual_session_start_pause_resume_stop'
811+ snapshot_session_name = session_name + '-snapshot'
812+ runtime_session_name = session_name + '-runtime'
793813
794- # Pre-configure dual session, start nodes and check that the snapshot session exists
814+ # Pre-configure dual session and start nodes
795815 ls , launch_task = self .pre_configure_dual_session_and_run_nodes (tmpdir , session_name )
796- self .assertTracingSessionExist (session_name + '-snapshot' , snapshot_mode = True )
816+
817+ # Wait until the nodes are running
818+ ctx = rclpy .Context ()
819+ ctx .init ()
820+ node = rclpy .create_node ('test_dual_session_sub' , context = ctx )
821+ ping_sub = node .create_subscription (
822+ String , '/ping' , lambda msg : None , QoSProfile (depth = 15 ))
823+ pong_sub = node .create_subscription (
824+ String , '/pong' , lambda msg : None , QoSProfile (depth = 15 ))
825+ executor = SingleThreadedExecutor (context = ctx )
826+ executor .add_node (node )
827+ executor_thread = threading .Thread (target = executor .spin , daemon = True )
828+ executor_thread .start ()
829+
830+ assert self .wait_for (
831+ lambda : ping_sub .get_publisher_count () >= 1 and pong_sub .get_publisher_count () >= 1 ,
832+ Duration (seconds = 10 ),
833+ ), 'timed out waiting for nodes to be running'
834+
835+ # Check that the snapshot session exists
836+ self .assertTracingSessionExist (snapshot_session_name , snapshot_mode = True )
797837
798838 # Start tracing
799839 ret = self .run_trace_subcommand (
@@ -810,14 +850,14 @@ def test_dual_session_start_pause_resume_stop(self) -> None:
810850 runtime_trace_dir = os .path .join (tmpdir , session_name , 'runtime' )
811851 self .assertTraceExist (snapshot_trace_dir )
812852 self .assertTraceExist (runtime_trace_dir )
813- self .assertTracingSessionExist (session_name + '-snapshot' , snapshot_mode = True )
814- self .assertTracingSessionExist (session_name + '-runtime' )
853+ self .assertTracingSessionExist (snapshot_session_name , snapshot_mode = True )
854+ self .assertTracingSessionExist (runtime_session_name )
815855
816856 # Pause tracing and check trace
817857 ret = self .run_trace_subcommand (['pause' , session_name , '--dual-session' ])
818858 self .assertEqual (0 , ret )
819- self .assertTracingSessionExist (session_name + '-snapshot' , snapshot_mode = True )
820- self .assertTracingSessionExist (session_name + '-runtime' )
859+ self .assertTracingSessionExist (snapshot_session_name , snapshot_mode = True )
860+ self .assertTracingSessionExist (runtime_session_name )
821861 expected_trace_data = [
822862 ('topic_name' , '/ping' ),
823863 ('topic_name' , '/pong' ),
@@ -827,8 +867,8 @@ def test_dual_session_start_pause_resume_stop(self) -> None:
827867 # Pausing again should give an error but not affect anything
828868 ret = self .run_trace_subcommand (['pause' , session_name , '--dual-session' ])
829869 self .assertEqual (1 , ret )
830- self .assertTracingSessionExist (session_name + '-snapshot' , snapshot_mode = True )
831- self .assertTracingSessionExist (session_name + '-runtime' )
870+ self .assertTracingSessionExist (snapshot_session_name , snapshot_mode = True )
871+ self .assertTracingSessionExist (runtime_session_name )
832872 new_num_events = self .assertTraceContains (
833873 trace_dir = tmpdir ,
834874 expected_field_value = expected_trace_data ,
@@ -838,21 +878,21 @@ def test_dual_session_start_pause_resume_stop(self) -> None:
838878 # Resume tracing
839879 ret = self .run_trace_subcommand (['resume' , session_name , '--dual-session' ])
840880 self .assertEqual (0 , ret )
841- self .assertTracingSessionExist (session_name + '-snapshot' , snapshot_mode = True )
842- self .assertTracingSessionExist (session_name + '-runtime' )
881+ self .assertTracingSessionExist (snapshot_session_name , snapshot_mode = True )
882+ self .assertTracingSessionExist (runtime_session_name )
843883
844884 # Resuming tracing again should give an error but not affect anything
845885 ret = self .run_trace_subcommand (['resume' , session_name , '--dual-session' ])
846886 self .assertEqual (1 , ret )
847- self .assertTracingSessionExist (session_name + '-snapshot' , snapshot_mode = True )
848- self .assertTracingSessionExist (session_name + '-runtime' )
887+ self .assertTracingSessionExist (snapshot_session_name , snapshot_mode = True )
888+ self .assertTracingSessionExist (runtime_session_name )
849889
850890 # Stop tracing and check that trace changed
851891 ret = self .run_trace_subcommand (['stop' , session_name , '--dual-session' ])
852892 self .assertEqual (0 , ret )
853893 # Snapshot session should still exist
854- self .assertTracingSessionExist (session_name + '-snapshot' , snapshot_mode = True )
855- self .assertTracingSessionNotExist (session_name + '-runtime' )
894+ self .assertTracingSessionExist (snapshot_session_name , snapshot_mode = True )
895+ self .assertTracingSessionNotExist (runtime_session_name )
856896 new_num_events = self .assertTraceContains (
857897 trace_dir ,
858898 expected_field_value = expected_trace_data ,
@@ -862,8 +902,8 @@ def test_dual_session_start_pause_resume_stop(self) -> None:
862902 # Stopping tracing again should give an error but not affect anything
863903 ret = self .run_trace_subcommand (['stop' , session_name , '--dual-session' ])
864904 self .assertEqual (1 , ret )
865- self .assertTracingSessionExist (session_name + '-snapshot' , snapshot_mode = True )
866- self .assertTracingSessionNotExist (session_name + '-runtime' )
905+ self .assertTracingSessionExist (snapshot_session_name , snapshot_mode = True )
906+ self .assertTracingSessionNotExist (runtime_session_name )
867907
868908 # Shutdown launch service
869909 loop = osrf_pycommon .process_utils .get_loop ()
@@ -873,8 +913,15 @@ def test_dual_session_start_pause_resume_stop(self) -> None:
873913 assert 0 == launch_task .result ()
874914
875915 # Shutting down the launch service should destroy the snapshot session
876- self .assertTracingSessionNotExist (session_name + '-snapshot' , snapshot_mode = True )
877-
916+ self .assertTracingSessionNotExist (snapshot_session_name , snapshot_mode = True )
917+
918+ executor .shutdown ()
919+ executor_thread .join (3 )
920+ assert not executor_thread .is_alive ()
921+ ping_sub .destroy ()
922+ pong_sub .destroy ()
923+ node .destroy_node ()
924+ ctx .shutdown ()
878925 shutil .rmtree (tmpdir )
879926
880927 @unittest .skipIf (not are_tracepoints_included (), 'tracepoints are required' )
0 commit comments