From 01887b9f49ef9284f03f02ef63f8cf1cd4734831 Mon Sep 17 00:00:00 2001 From: You-Cheng Lin Date: Sat, 29 Nov 2025 13:17:16 +0800 Subject: [PATCH 1/2] update Signed-off-by: You-Cheng Lin --- .../data/_internal/execution/streaming_executor.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 5062d732b8dc..d6095afc00cd 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -117,6 +117,15 @@ def __init__( # by comparing it with the current timestamp. self._metrics_last_updated: float = 0.0 + # Initialize metrics gauges + self._initialize_metrics_gauges() + + Executor.__init__(self, self._data_context.execution_options) + thread_name = f"StreamingExecutor-{self._dataset_id}" + threading.Thread.__init__(self, daemon=True, name=thread_name) + + def _initialize_metrics_gauges(self) -> None: + """Initialize all Prometheus-style metrics gauges for monitoring execution.""" self._sched_loop_duration_s = Gauge( "data_sched_loop_duration_s", description="Duration of the scheduling loop in seconds", @@ -149,10 +158,6 @@ def __init__( tag_keys=("dataset", "operator"), ) - Executor.__init__(self, self._data_context.execution_options) - thread_name = f"StreamingExecutor-{self._dataset_id}" - threading.Thread.__init__(self, daemon=True, name=thread_name) - def execute( self, dag: PhysicalOperator, initial_stats: Optional[DatasetStats] = None ) -> OutputIterator: From 2e4d98493b344ca5aa9bcea8874deb6db9cb6ffa Mon Sep 17 00:00:00 2001 From: You-Cheng Lin Date: Sat, 29 Nov 2025 13:20:30 +0800 Subject: [PATCH 2/2] remove unnecessary comment Signed-off-by: You-Cheng Lin --- python/ray/data/_internal/execution/streaming_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index d6095afc00cd..0b1dc38525f8 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -117,7 +117,6 @@ def __init__( # by comparing it with the current timestamp. self._metrics_last_updated: float = 0.0 - # Initialize metrics gauges self._initialize_metrics_gauges() Executor.__init__(self, self._data_context.execution_options)