Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions rock/sandbox/base_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class BaseActor:
_namespace = "default"
_metrics_endpoint = ""
_user_defined_tags: dict = {}
_created_time: float = None

def __init__(
self,
Expand All @@ -48,6 +49,7 @@ def __init__(
self._gauges: dict[str, _Gauge] = {}
if isinstance(config, DockerDeploymentConfig) and config.auto_clear_time:
self._auto_clear_time_in_minutes = config.auto_clear_time
self._created_time = time.monotonic()
self._stop_time = datetime.datetime.now() + datetime.timedelta(minutes=self._auto_clear_time_in_minutes)
# Initialize the user and environment info - can be overridden by subclasses
self._role = "test"
Expand Down Expand Up @@ -103,6 +105,9 @@ def _init_monitor(self):
self._gauges["net"] = self.meter.create_gauge(
name="xrl_gateway.system.network", description="Network Usage", unit="1"
)
self._gauges["rt"] = self.meter.create_gauge(
name="xrl_gateway.system.lifespan_rt", description="Life Span Rt", unit="1"
)

async def _setup_monitor(self):
if not env_vars.ROCK_MONITOR_ENABLE:
Expand Down Expand Up @@ -152,19 +157,20 @@ async def _collect_sandbox_metrics(self, sandbox_id: str):
return
logger.debug(f"sandbox [{sandbox_id}] metrics = {metrics}")

attributes = {
"sandbox_id": sandbox_id,
"env": self._env,
"role": self._role,
"host": self.host,
"ip": self._ip,
"user_id": self._user_id,
"experiment_id": self._experiment_id,
"namespace": self._namespace,
}
if self._user_defined_tags is not None:
attributes.update(self._user_defined_tags)

if metrics.get("cpu") is not None:
attributes = {
"sandbox_id": sandbox_id,
"env": self._env,
"role": self._role,
"host": self.host,
"ip": self._ip,
}
if self._user_defined_tags is not None:
attributes.update(self._user_defined_tags)
attributes["user_id"] = self._user_id
attributes["experiment_id"] = self._experiment_id
attributes["namespace"] = self._namespace
self._gauges["cpu"].set(metrics["cpu"], attributes=attributes)
self._gauges["mem"].set(metrics["mem"], attributes=attributes)
self._gauges["disk"].set(metrics["disk"], attributes=attributes)
Expand All @@ -173,6 +179,9 @@ async def _collect_sandbox_metrics(self, sandbox_id: str):
logger.debug(f"Successfully reported metrics for sandbox: {sandbox_id}")
else:
logger.warning(f"No metrics returned for sandbox: {sandbox_id}")

life_span_rt = time.monotonic() - self._created_time
self._gauges["rt"].set(life_span_rt, attributes=attributes)
single_sandbox_report_rt = time.perf_counter() - start
logger.debug(f"Single sandbox report rt:{single_sandbox_report_rt:.4f}s")

Expand Down
111 changes: 111 additions & 0 deletions tests/unit/test_base_actor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import datetime
from unittest.mock import MagicMock

import pytest
import ray

from rock.deployments.config import LocalDeploymentConfig
from rock.logger import init_logger
from rock.sandbox.base_actor import BaseActor
from rock.sandbox.sandbox_actor import SandboxActor

logger = init_logger(__name__)
Expand Down Expand Up @@ -131,3 +135,110 @@ async def test_user_defined_tags_with_empty_dict(ray_init_shutdown):
logger.info(f"Empty dict set successfully: {result}")
finally:
ray.kill(sandbox_actor)


class ConcreteBaseActor(BaseActor):
    """Bare-bones BaseActor subclass that exists only so the unit tests can instantiate one."""

    async def get_sandbox_statistics(self):
        """Return a fixed statistics dict with cpu, mem, disk and net readings."""
        fixed_stats = dict(cpu=10.0, mem=20.0, disk=30.0, net=40.0)
        return fixed_stats


def _make_actor() -> ConcreteBaseActor:
    """Build a ConcreteBaseActor whose config, deployment and gauges are all mocks.

    Returns:
        An actor with ``host`` set to ``"127.0.0.1"`` and a separate ``MagicMock``
        registered under each of the ``cpu``, ``mem``, ``disk``, ``net`` and ``rt``
        gauge names.
    """
    mock_config = MagicMock()
    mock_config.container_name = "test-container"
    # Intended to keep the constructor out of its DockerDeploymentConfig branch.
    mock_config.auto_clear_time = None

    mock_deployment = MagicMock()
    # Spoof the reported class so isinstance(deployment, DockerDeployment) is False.
    mock_deployment.__class__ = object

    test_actor = ConcreteBaseActor(mock_config, mock_deployment)
    test_actor.host = "127.0.0.1"

    # Every gauge starts as its own mock; individual tests may swap entries out.
    gauge_names = ("cpu", "mem", "disk", "net", "rt")
    test_actor._gauges.update({name: MagicMock() for name in gauge_names})
    return test_actor


@pytest.mark.asyncio
async def test_life_span_rt_gauge_is_set_during_metrics_collection():
    """Collecting metrics must report a non-negative float to the "rt" gauge."""
    rt_gauge = MagicMock()
    actor = _make_actor()
    actor._gauges["rt"] = rt_gauge

    await actor._collect_sandbox_metrics("test-container")

    was_reported = rt_gauge.set.called
    assert was_reported, "life_span_rt gauge.set() was never called"

    # The reported value is the first positional argument of the last set() call.
    reported_value = rt_gauge.set.call_args.args[0]
    assert isinstance(reported_value, float), f"Expected float, got {type(reported_value)}"
    assert not reported_value < 0, "life_span_rt must be non-negative"


@pytest.mark.asyncio
async def test_life_span_rt_increases_over_time():
    """life_span_rt reported on a second call must be >= the first call's value.

    The value handed to the gauge is a difference of two ``time.monotonic()``
    readings, so it is a ``float``, not a ``datetime.timedelta``.
    """
    actor = _make_actor()
    mock_rt_gauge = MagicMock()
    actor._gauges["rt"] = mock_rt_gauge

    await actor._collect_sandbox_metrics("test-container")
    # call_args always reflects the most recent call, so capture it before collecting again.
    first_rt: float = mock_rt_gauge.set.call_args[0][0]

    await actor._collect_sandbox_metrics("test-container")
    second_rt: float = mock_rt_gauge.set.call_args[0][0]

    assert second_rt >= first_rt, f"life_span_rt should be non-decreasing: first={first_rt}, second={second_rt}"


@pytest.mark.asyncio
async def test_life_span_rt_attributes_contain_expected_keys():
    """Attributes passed to life_span_rt gauge must include all standard dimension keys.

    Besides checking that every key is present, the values overridden on the
    actor (and the sandbox id passed to the collector) are checked as well.
    """
    sandbox_id = "test-container"

    actor = _make_actor()
    actor._env = "prod"
    actor._role = "worker"
    actor._user_id = "user-42"
    actor._experiment_id = "exp-7"
    actor._namespace = "ns-test"
    actor.host = "10.0.0.1"

    mock_rt_gauge = MagicMock()
    actor._gauges["rt"] = mock_rt_gauge

    await actor._collect_sandbox_metrics(sandbox_id)

    # Attributes are passed to set() as a keyword argument.
    attributes = mock_rt_gauge.set.call_args[1]["attributes"]
    expected_keys = {"sandbox_id", "env", "role", "host", "ip", "user_id", "experiment_id", "namespace"}
    assert expected_keys.issubset(attributes.keys()), f"Missing attribute keys: {expected_keys - attributes.keys()}"
    assert attributes["sandbox_id"] == sandbox_id
    assert attributes["env"] == "prod"
    assert attributes["role"] == "worker"
    # host is overridden above, so verify the override actually reaches the gauge.
    assert attributes["host"] == "10.0.0.1"
    assert attributes["user_id"] == "user-42"
    assert attributes["experiment_id"] == "exp-7"
    assert attributes["namespace"] == "ns-test"


@pytest.mark.asyncio
async def test_life_span_rt_set_even_when_no_cpu_metrics():
    """The "rt" gauge must still be set when the statistics dict carries no cpu entry."""

    class NoCpuActor(BaseActor):
        """BaseActor subclass whose statistics are always empty."""

        async def get_sandbox_statistics(self):
            # No "cpu" key at all.
            return dict()

    mock_config = MagicMock()
    mock_config.container_name = "no-cpu-container"
    mock_config.auto_clear_time = None

    mock_deployment = MagicMock()
    mock_deployment.__class__ = object

    actor = NoCpuActor(mock_config, mock_deployment)
    actor.host = "127.0.0.1"
    actor._gauges.update({name: MagicMock() for name in ("cpu", "mem", "disk", "net", "rt")})

    rt_gauge = actor._gauges["rt"]

    await actor._collect_sandbox_metrics("no-cpu-container")

    rt_reported = rt_gauge.set.called
    assert rt_reported, "life_span_rt gauge.set() must be called even when cpu metrics are absent"
Loading