Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions rock/admin/entrypoints/sandbox_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Annotated, Any

import ray
from fastapi import APIRouter, Body, Depends, File, Form, UploadFile

from rock.actions import (
Expand Down Expand Up @@ -29,6 +30,27 @@
from rock.deployments.config import DockerDeploymentConfig
from rock.sandbox.sandbox_manager import SandboxManager
from rock.utils import handle_exceptions
from rock import env_vars


def _sandbox_start_request_from_form(
    image: str = Form(""),
    memory: str = Form("8g"),
    cpus: float = Form(2),
    auto_clear_time_minutes: int = Form(env_vars.ROCK_DEFAULT_AUTO_CLEAR_TIME_MINUTES),
    startup_timeout: float = Form(300.0),
    pull: str = Form("missing"),
    sandbox_id: str | None = Form(None),
) -> SandboxStartRequest:
    """Assemble a ``SandboxStartRequest`` from multipart form fields.

    Used as a FastAPI dependency so that an endpoint accepting a file upload
    can still receive the start parameters as a single request object.
    A falsy ``sandbox_id`` (for example an empty string) is passed on as
    ``None``.
    """
    normalized_sandbox_id = sandbox_id if sandbox_id else None
    request_fields = {
        "image": image,
        "memory": memory,
        "cpus": cpus,
        "auto_clear_time_minutes": auto_clear_time_minutes,
        "startup_timeout": startup_timeout,
        "pull": pull,
        "sandbox_id": normalized_sandbox_id,
    }
    return SandboxStartRequest(**request_fields)

sandbox_router = APIRouter()
sandbox_manager: SandboxManager
Expand Down Expand Up @@ -73,6 +95,25 @@ async def start_async(
return RockResponse(result=sandbox_start_response)


@sandbox_router.post("/start_async_with_env")
@handle_exceptions(error_message="async start sandbox with env_dir failed")
async def start_async_with_env(
    request: Annotated[SandboxStartRequest, Depends(_sandbox_start_request_from_form)],
    headers: Annotated[StartHeaders, Depends()],
    env_dir_tar: UploadFile = File(...),
) -> RockResponse[SandboxStartResponse]:
    """Start a sandbox whose image is built from an uploaded build context.

    The uploaded archive is read fully into memory, handed to ``ray.put``,
    and the returned reference is stored on the deployment config as
    ``env_dir_tar_ref`` before the sandbox manager is asked to start.
    """
    deployment_config = DockerDeploymentConfig.from_request(request)
    archive_bytes = await env_dir_tar.read()
    deployment_config.env_dir_tar_ref = ray.put(archive_bytes)
    await _apply_kata_runtime_switch(deployment_config)
    caller_context = {
        "user_info": headers.user_info,
        "cluster_info": headers.cluster_info,
    }
    started = await sandbox_manager.start_async(deployment_config, **caller_context)
    return RockResponse(result=started)


@sandbox_router.get("/is_alive")
@handle_exceptions(error_message="get sandbox is alive failed")
async def is_alive(sandbox_id: str):
Expand Down
5 changes: 4 additions & 1 deletion rock/deployments/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

from abc import abstractmethod
from typing import Literal
from typing import Any, Literal

from pydantic import BaseModel, ConfigDict, Field, model_validator

Expand Down Expand Up @@ -115,6 +115,9 @@ class DockerDeploymentConfig(DeploymentConfig):
extended_params: dict[str, str] = Field(default_factory=dict)
"""Generic extension field for storing custom string key-value pairs."""

env_dir_tar_ref: Any = None
"""Optional Ray ObjectRef of tar.gz bytes (build context). When set, worker extracts and runs docker build before run."""

@model_validator(mode="before")
def validate_platform_args(cls, data: dict) -> dict:
"""Validate and extract platform arguments from docker_args.
Expand Down
58 changes: 53 additions & 5 deletions rock/deployments/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
import random
import shlex
import subprocess
import tarfile
import tempfile
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from pathlib import Path
from typing import Any

import ray

from typing_extensions import Self

from rock import env_vars
Expand All @@ -20,7 +25,7 @@
from rock.deployments.hooks.abstract import CombinedDeploymentHook, DeploymentHook
from rock.deployments.runtime_env import DockerRuntimeEnv, LocalRuntimeEnv, PipRuntimeEnv, UvRuntimeEnv
from rock.deployments.sandbox_validator import DockerSandboxValidator
from rock.deployments.status import PersistedServiceStatus, ServiceStatus
from rock.deployments.status import PhaseStatus, PersistedServiceStatus, ServiceStatus
from rock.logger import init_logger
from rock.rocklet import PACKAGE_NAME, REMOTE_EXECUTABLE_NAME
from rock.rocklet.exceptions import DeploymentNotStartedError, DockerPullError
Expand Down Expand Up @@ -63,6 +68,8 @@ def __init__(
self._check_stop_task = None
self._container_name = None
self._service_status = PersistedServiceStatus()
if getattr(self._config, "env_dir_tar_ref", None) is not None:
self._service_status.add_phase("docker_build", PhaseStatus())
if self._config.container_name:
self.set_container_name(self._config.container_name)
if env_vars.ROCK_WORKER_ENV_TYPE == "docker":
Expand Down Expand Up @@ -285,6 +292,40 @@ def _memory(self):
def _cpus(self):
return [f"--cpus={self.config.cpus}"]

def _build_image_from_env_dir_tar(self, tar_bytes: bytes) -> str:
    """Build the sandbox image from an uploaded env_dir build context.

    Unpacks the gzipped tar into a temporary directory, runs ``docker build``
    against it tagged as ``self._config.image``, and records progress under
    the ``docker_build`` phase of the service status. Any failure marks that
    phase as FAILED before the exception propagates.

    Args:
        tar_bytes: Raw bytes of a ``tar.gz`` archive holding the build
            context; a ``Dockerfile`` must sit at the archive root.

    Returns:
        The image tag that was built (``self._config.image``).

    Raises:
        ValueError: If an archive member or link target would land outside
            the extraction directory.
        FileNotFoundError: If the context has no top-level ``Dockerfile``.
        RuntimeError: If ``docker build`` exits non-zero or times out.
    """
    phase = "docker_build"
    self._service_status.update_status(
        phase_name=phase, status=Status.RUNNING, message="building from env_dir context"
    )
    try:
        with tempfile.TemporaryDirectory(prefix="rock_env_") as tmpdir:
            context_dir = Path(tmpdir)
            root = context_dir.resolve()
            with tarfile.open(fileobj=BytesIO(tar_bytes), mode="r:gz") as tar:
                # The archive is an HTTP upload: refuse entries that would
                # escape the temp directory ("../x", absolute names, or links
                # pointing outside) before anything is written to disk.
                for member in tar.getmembers():
                    destinations = [root / member.name]
                    if member.issym():
                        # Symlink targets are relative to the link's directory.
                        destinations.append((root / member.name).parent / member.linkname)
                    elif member.islnk():
                        # Hard-link targets are relative to the archive root.
                        destinations.append(root / member.linkname)
                    for destination in destinations:
                        resolved = destination.resolve()
                        if resolved != root and root not in resolved.parents:
                            raise ValueError(
                                f"env_dir archive entry escapes build context: {member.name}"
                            )
                if hasattr(tarfile, "data_filter"):
                    # Interpreters with extraction filters also reject
                    # device files and sanitize permission bits.
                    tar.extractall(context_dir, filter="data")
                else:
                    tar.extractall(context_dir)
            dockerfile_path = context_dir / "Dockerfile"
            if not dockerfile_path.exists():
                raise FileNotFoundError(f"Dockerfile not found in env_dir context: {dockerfile_path}")
            build_cmd = [
                "docker",
                "build",
                "-f",
                str(dockerfile_path),
                "-t",
                self._config.image,
                str(context_dir),
            ]
            logger.info("Running docker build from env_dir context: %s", shlex.join(build_cmd))
            try:
                subprocess.run(build_cmd, check=True, capture_output=True, timeout=1800)
            except subprocess.CalledProcessError as e:
                detail = e.stderr.decode(errors="replace") if e.stderr else str(e)
                raise RuntimeError(f"docker build failed: {detail}") from e
            except subprocess.TimeoutExpired as e:
                # Without this the phase would stay RUNNING after a hung build.
                raise RuntimeError(f"docker build timed out after {e.timeout} seconds") from e
    except Exception as e:
        # Single place that records the failure, whatever step raised it.
        self._service_status.update_status(phase_name=phase, status=Status.FAILED, message=str(e))
        raise
    self._service_status.update_status(
        phase_name=phase, status=Status.SUCCESS, message="docker build success"
    )
    return self._config.image

async def start(self):
"""Starts the runtime."""
if not self.sandbox_validator.check_availability():
Expand All @@ -295,11 +336,18 @@ async def start(self):
self._service_status.set_sandbox_id(self._container_name)
executor = get_executor()
loop = asyncio.get_running_loop()
await loop.run_in_executor(executor, self._pull_image)
if self._config.python_standalone_dir is not None:
image_id = self._build_image()

if self._config.env_dir_tar_ref is not None:
tar_bytes = ray.get(self._config.env_dir_tar_ref)
image_id = await loop.run_in_executor(
executor, self._build_image_from_env_dir_tar, tar_bytes
)
else:
image_id = self._config.image
await loop.run_in_executor(executor, self._pull_image)
if self._config.python_standalone_dir is not None:
image_id = self._build_image()
else:
image_id = self._config.image

if not self.sandbox_validator.check_resource(image_id):
raise Exception(f"Image {image_id} is not valid")
Expand Down
7 changes: 6 additions & 1 deletion rock/sandbox/operator/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ def _generate_actor_options(self, config: DockerDeploymentConfig) -> dict:
async def submit(self, config: DockerDeploymentConfig, user_info: dict = {}) -> SandboxInfo:
async with self._ray_service.get_ray_rwlock().read_lock():
sandbox_id = config.container_name
logger.info(f"[{sandbox_id}] start_async params:{json.dumps(config.model_dump(), indent=2)}")
log_params = config.model_dump()
# env_dir_tar_ref is a Ray ObjectRef, not JSON-serializable
if "env_dir_tar_ref" in log_params and log_params["env_dir_tar_ref"] is not None:
log_params = {k: v for k, v in log_params.items() if k != "env_dir_tar_ref"}
log_params["env_dir_tar_ref"] = "<ObjectRef>"
logger.info(f"[{sandbox_id}] start_async params:{json.dumps(log_params, indent=2)}")
sandbox_actor: SandboxActor = await self.create_actor(config)
sandbox_actor.start.remote()
user_id = user_info.get("user_id", "default")
Expand Down
2 changes: 2 additions & 0 deletions rock/sandbox/sandbox_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ async def start_async(
self.validate_sandbox_spec(self.rock_config.runtime, config)
docker_deployment_config: DockerDeploymentConfig = await self.deployment_manager.init_config(config)
sandbox_id = docker_deployment_config.container_name
if getattr(docker_deployment_config, "env_dir_tar_ref", None) is not None and not (docker_deployment_config.image or "").strip():
docker_deployment_config.image = f"rock_env_image:{sandbox_id}"
sandbox_info: SandboxInfo = await self._operator.submit(docker_deployment_config, user_info)
stop_time = str(int(time.time()) + docker_deployment_config.auto_clear_time * 60)
auto_clear_time_dict = {
Expand Down
48 changes: 40 additions & 8 deletions rock/sdk/sandbox/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import io
import logging
import mimetypes
import os
import tarfile
import time
import uuid
import warnings
Expand Down Expand Up @@ -160,22 +162,52 @@ async def _parse_error_message_from_status(self, status: dict):
# If no failed stage is found, return None
return None

@staticmethod
def _pack_env_dir_to_tar_gz(env_dir: Path) -> bytes:
    """Pack a docker build context directory into gzipped tar bytes.

    The directory's contents are stored relative to the archive root, and
    the top-level ``.git`` directory is left out of the archive.

    Args:
        env_dir: Path to the build context; it must be a directory that
            contains a ``Dockerfile``.

    Returns:
        The ``tar.gz`` archive as bytes.

    Raises:
        ValueError: If ``env_dir`` is not a directory or has no ``Dockerfile``.
    """
    env_dir = Path(env_dir).resolve()
    if not env_dir.is_dir():
        raise ValueError(f"env_dir is not a directory: {env_dir}")
    dockerfile = env_dir / "Dockerfile"
    if not dockerfile.exists():
        raise ValueError(f"Dockerfile not found in env_dir: {dockerfile}")

    def _drop_git_dir(member):
        # With arcname=".", tarfile names children "./<entry>", so the
        # repository metadata directory shows up as "./.git", not ".git".
        # Returning None for a directory also skips everything beneath it.
        return None if member.name in (".git", "./.git") else member

    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        tar.add(env_dir, arcname=".", filter=_drop_git_dir)
    return buf.getvalue()

async def start(self):
url = f"{self._url}/start_async"
headers = self._build_headers()
data = {
"image": self.config.image,
"image_os": self.config.image_os,
form_data = {
"auto_clear_time": self.config.auto_clear_seconds / 60,
"auto_clear_time_minutes": self.config.auto_clear_seconds / 60,
"startup_timeout": self.config.startup_timeout,
"memory": self.config.memory,
"cpus": self.config.cpus,
}
try:
response = await HttpUtils.post(url, headers, data)
except Exception as e:
raise Exception(f"Failed to start standbox: {str(e)}, post url {url}")
if not self.config.env_dir:
form_data["image"] = self.config.image
form_data["image_os"] = self.config.image_os
if self.config.env_dir:
# env_dir build: server uses rock_env_image:<sandbox_id> as image tag
env_dir = Path(self.config.env_dir).resolve()
tar_bytes = self._pack_env_dir_to_tar_gz(env_dir)
url = f"{self._url}/start_async_with_env"
try:
response = await HttpUtils.post_multipart(
url,
headers,
data=form_data,
files={"env_dir_tar": ("env_dir.tar.gz", tar_bytes, "application/gzip")},
)
except Exception as e:
raise Exception(f"Failed to start sandbox with env_dir: {str(e)}, post url {url}")
else:
url = f"{self._url}/start_async"
try:
response = await HttpUtils.post(url, headers, form_data)
except Exception as e:
raise Exception(f"Failed to start sandbox: {str(e)}, post url {url}")

logging.debug(f"Start sandbox response: {response}")
if "Success" != response.get("status"):
Expand Down
4 changes: 4 additions & 0 deletions rock/sdk/sandbox/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import warnings
from pathlib import Path
from typing import Union

from pydantic import BaseModel, Field, field_validator

Expand Down Expand Up @@ -39,6 +41,8 @@ class SandboxConfig(BaseConfig):
experiment_id: str | None = None
cluster: str = "zb"
namespace: str | None = None
env_dir: Union[str, Path, None] = None
"""Optional path to docker build context directory (must contain Dockerfile). When set, SDK packs it as tar.gz and uploads; worker will docker build then run. Image tag is server-derived as rock_env_image:<sandbox_id>."""


class SandboxGroupConfig(SandboxConfig):
Expand Down
70 changes: 70 additions & 0 deletions tests/integration/sdk/sandbox/test_env_dir_build.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Integration test for SDK start with env_dir (docker build context).

- SDK packs env_dir (directory containing Dockerfile) as tar.gz and POSTs to
/start_async_with_env; admin passes the archive to the SandboxActor via Ray;
the actor extracts, runs docker build, then docker run.

Run: pytest tests/integration/sdk/sandbox/test_env_dir_build.py -v
(with admin available and docker on the worker node)
"""
import tempfile
from pathlib import Path

import pytest

from rock.actions.sandbox.request import CreateBashSessionRequest
from rock.logger import init_logger
from rock.sdk.sandbox.client import Sandbox
from rock.sdk.sandbox.config import SandboxConfig

from tests.integration.conftest import SKIP_IF_NO_DOCKER

logger = init_logger(__name__)

# Content of the file we COPY in the Dockerfile; verified inside the container.
ENV_DIR_TEST_FILE_CONTENT = "rock-env-dir-build-ok"

@pytest.fixture
def minimal_env_dir():
    """Yield a temporary build context holding a Dockerfile and one file it copies."""
    dockerfile_text = "FROM python:3.11\n" "COPY app.txt /opt/app.txt\n"
    context_files = (
        ("app.txt", ENV_DIR_TEST_FILE_CONTENT),
        ("Dockerfile", dockerfile_text),
    )
    with tempfile.TemporaryDirectory(prefix="rock_env_dir_") as tmp_name:
        context_dir = Path(tmp_name)
        for filename, text in context_files:
            (context_dir / filename).write_text(text)
        yield context_dir


@pytest.mark.need_admin
@SKIP_IF_NO_DOCKER
@pytest.mark.asyncio
async def test_sandbox_start_with_env_dir(admin_remote_server, minimal_env_dir):
    """Start a sandbox from an env_dir build context and check the built image.

    No image is configured; the sandbox is started from ``env_dir`` alone.
    The test then runs a trivial command and reads back the file that the
    Dockerfile copies from the build context.
    """
    base_url = f"{admin_remote_server.endpoint}:{admin_remote_server.port}"
    sandbox = Sandbox(
        SandboxConfig(
            memory="2g",
            cpus=1.0,
            startup_timeout=300,
            base_url=base_url,
            env_dir=minimal_env_dir,
        )
    )
    try:
        await sandbox.start()
        assert sandbox.sandbox_id
        await sandbox.create_session(CreateBashSessionRequest(session="default"))

        echo_result = await sandbox.arun(cmd="echo ok", session="default")
        assert echo_result.output is not None
        assert "ok" in echo_result.output

        # The COPY step in the Dockerfile should have placed this file in the image.
        copied_file = await sandbox.arun(cmd="cat /opt/app.txt", session="default")
        assert copied_file.output is not None
        assert copied_file.output.strip() == ENV_DIR_TEST_FILE_CONTENT
    finally:
        try:
            await sandbox.stop()
        except Exception as stop_error:
            logger.warning("Failed to stop sandbox: %s", stop_error)
Loading