From 68887e3b89bbd7f33d4eb22f2559733e8e7c70a5 Mon Sep 17 00:00:00 2001 From: daifangwen Date: Mon, 9 Mar 2026 08:54:06 +0000 Subject: [PATCH] feat: add lifespan rt metrics --- rock/sandbox/base_actor.py | 33 ++++++---- tests/unit/test_base_actor.py | 111 ++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 12 deletions(-) diff --git a/rock/sandbox/base_actor.py b/rock/sandbox/base_actor.py index 32712b322..3a49ab61e 100644 --- a/rock/sandbox/base_actor.py +++ b/rock/sandbox/base_actor.py @@ -37,6 +37,7 @@ class BaseActor: _namespace = "default" _metrics_endpoint = "" _user_defined_tags: dict = {} + _created_time: float = None def __init__( self, @@ -48,6 +49,7 @@ def __init__( self._gauges: dict[str, _Gauge] = {} if isinstance(config, DockerDeploymentConfig) and config.auto_clear_time: self._auto_clear_time_in_minutes = config.auto_clear_time + self._created_time = time.monotonic() self._stop_time = datetime.datetime.now() + datetime.timedelta(minutes=self._auto_clear_time_in_minutes) # Initialize the user and environment info - can be overridden by subclasses self._role = "test" @@ -103,6 +105,9 @@ def _init_monitor(self): self._gauges["net"] = self.meter.create_gauge( name="xrl_gateway.system.network", description="Network Usage", unit="1" ) + self._gauges["rt"] = self.meter.create_gauge( + name="xrl_gateway.system.lifespan_rt", description="Life Span Rt", unit="1" + ) async def _setup_monitor(self): if not env_vars.ROCK_MONITOR_ENABLE: @@ -152,19 +157,20 @@ async def _collect_sandbox_metrics(self, sandbox_id: str): return logger.debug(f"sandbox [{sandbox_id}] metrics = {metrics}") + attributes = { + "sandbox_id": sandbox_id, + "env": self._env, + "role": self._role, + "host": self.host, + "ip": self._ip, + "user_id": self._user_id, + "experiment_id": self._experiment_id, + "namespace": self._namespace, + } + if self._user_defined_tags is not None: + attributes.update(self._user_defined_tags) + if metrics.get("cpu") is not None: - attributes = { - "sandbox_id": sandbox_id, - "env": self._env, - "role": self._role, - "host": self.host, - "ip": self._ip, - } - if self._user_defined_tags is not None: - attributes.update(self._user_defined_tags) - attributes["user_id"] = self._user_id - attributes["experiment_id"] = self._experiment_id - attributes["namespace"] = self._namespace self._gauges["cpu"].set(metrics["cpu"], attributes=attributes) self._gauges["mem"].set(metrics["mem"], attributes=attributes) self._gauges["disk"].set(metrics["disk"], attributes=attributes) @@ -173,6 +179,9 @@ async def _collect_sandbox_metrics(self, sandbox_id: str): logger.debug(f"Successfully reported metrics for sandbox: {sandbox_id}") else: logger.warning(f"No metrics returned for sandbox: {sandbox_id}") + + life_span_rt = time.monotonic() - self._created_time + self._gauges["rt"].set(life_span_rt, attributes=attributes) single_sandbox_report_rt = time.perf_counter() - start logger.debug(f"Single sandbox report rt:{single_sandbox_report_rt:.4f}s") diff --git a/tests/unit/test_base_actor.py b/tests/unit/test_base_actor.py index daa0aa4d9..765686f1d 100644 --- a/tests/unit/test_base_actor.py +++ b/tests/unit/test_base_actor.py @@ -1,8 +1,12 @@ +import datetime +from unittest.mock import MagicMock + import pytest import ray from rock.deployments.config import LocalDeploymentConfig from rock.logger import init_logger +from rock.sandbox.base_actor import BaseActor from rock.sandbox.sandbox_actor import SandboxActor logger = init_logger(__name__) @@ -131,3 +135,110 @@ async def test_user_defined_tags_with_empty_dict(ray_init_shutdown): logger.info(f"Empty dict set successfully: {result}") finally: ray.kill(sandbox_actor) + + +class ConcreteBaseActor(BaseActor): + """Minimal concrete subclass used only for unit testing BaseActor.""" + + async def get_sandbox_statistics(self): + return {"cpu": 10.0, "mem": 20.0, "disk": 30.0, "net": 40.0} + + +def _make_actor() -> ConcreteBaseActor: + """Create a ConcreteBaseActor with lightweight mocked dependencies.""" + config = MagicMock() + config.container_name = "test-container" + config.auto_clear_time = None # skip DockerDeploymentConfig branch + + deployment = MagicMock() + deployment.__class__ = object # make isinstance(deployment, DockerDeployment) return False + + actor = ConcreteBaseActor(config, deployment) + actor.host = "127.0.0.1" + # Pre-populate all gauges with mocks so tests can override selectively + for key in ("cpu", "mem", "disk", "net", "rt"): + actor._gauges[key] = MagicMock() + return actor + + +@pytest.mark.asyncio +async def test_life_span_rt_gauge_is_set_during_metrics_collection(): + """life_span_rt gauge must be set with the elapsed timedelta after collection.""" + actor = _make_actor() + mock_rt_gauge = MagicMock() + actor._gauges["rt"] = mock_rt_gauge + + await actor._collect_sandbox_metrics("test-container") + + assert mock_rt_gauge.set.called, "life_span_rt gauge.set() was never called" + life_span_rt_value = mock_rt_gauge.set.call_args[0][0] + assert isinstance(life_span_rt_value, float), f"Expected float, got {type(life_span_rt_value)}" + assert life_span_rt_value >= 0, "life_span_rt must be non-negative" + + +@pytest.mark.asyncio +async def test_life_span_rt_increases_over_time(): + """life_span_rt reported on a second call must be >= the first call's value.""" + actor = _make_actor() + mock_rt_gauge = MagicMock() + actor._gauges["rt"] = mock_rt_gauge + + await actor._collect_sandbox_metrics("test-container") + first_rt: datetime.timedelta = mock_rt_gauge.set.call_args[0][0] + + await actor._collect_sandbox_metrics("test-container") + second_rt: datetime.timedelta = mock_rt_gauge.set.call_args[0][0] + + assert second_rt >= first_rt, f"life_span_rt should be non-decreasing: first={first_rt}, second={second_rt}" + + +@pytest.mark.asyncio +async def test_life_span_rt_attributes_contain_expected_keys(): + """Attributes passed to life_span_rt gauge must include all standard dimension keys.""" + actor = _make_actor() + actor._env = "prod" + actor._role = "worker" + actor._user_id = "user-42" + actor._experiment_id = "exp-7" + actor._namespace = "ns-test" + actor.host = "10.0.0.1" + + mock_rt_gauge = MagicMock() + actor._gauges["rt"] = mock_rt_gauge + + await actor._collect_sandbox_metrics("test-container") + + attributes = mock_rt_gauge.set.call_args[1]["attributes"] + expected_keys = {"sandbox_id", "env", "role", "host", "ip", "user_id", "experiment_id", "namespace"} + assert expected_keys.issubset(attributes.keys()), f"Missing attribute keys: {expected_keys - attributes.keys()}" + assert attributes["env"] == "prod" + assert attributes["role"] == "worker" + assert attributes["user_id"] == "user-42" + assert attributes["experiment_id"] == "exp-7" + assert attributes["namespace"] == "ns-test" + + +@pytest.mark.asyncio +async def test_life_span_rt_set_even_when_no_cpu_metrics(): + """life_span_rt must be reported even when get_sandbox_statistics returns no cpu data.""" + + class NoCpuActor(BaseActor): + async def get_sandbox_statistics(self): + return {} # cpu key absent + + config = MagicMock() + config.container_name = "no-cpu-container" + config.auto_clear_time = None + deployment = MagicMock() + deployment.__class__ = object + + actor = NoCpuActor(config, deployment) + actor.host = "127.0.0.1" + for key in ("cpu", "mem", "disk", "net", "rt"): + actor._gauges[key] = MagicMock() + + mock_rt_gauge = actor._gauges["rt"] + + await actor._collect_sandbox_metrics("no-cpu-container") + + assert mock_rt_gauge.set.called, "life_span_rt gauge.set() must be called even when cpu metrics are absent"