diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 5062d732b8dc..0b1dc38525f8 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -117,6 +117,14 @@ def __init__( # by comparing it with the current timestamp. self._metrics_last_updated: float = 0.0 + self._initialize_metrics_gauges() + + Executor.__init__(self, self._data_context.execution_options) + thread_name = f"StreamingExecutor-{self._dataset_id}" + threading.Thread.__init__(self, daemon=True, name=thread_name) + + def _initialize_metrics_gauges(self) -> None: + """Initialize all Prometheus-style metrics gauges for monitoring execution.""" self._sched_loop_duration_s = Gauge( "data_sched_loop_duration_s", description="Duration of the scheduling loop in seconds", @@ -149,10 +157,6 @@ def __init__( tag_keys=("dataset", "operator"), ) - Executor.__init__(self, self._data_context.execution_options) - thread_name = f"StreamingExecutor-{self._dataset_id}" - threading.Thread.__init__(self, daemon=True, name=thread_name) - def execute( self, dag: PhysicalOperator, initial_stats: Optional[DatasetStats] = None ) -> OutputIterator: