def _sandbox_start_request_from_form(
    image: str = Form(""),
    memory: str = Form("8g"),
    cpus: float = Form(2),
    auto_clear_time_minutes: int = Form(env_vars.ROCK_DEFAULT_AUTO_CLEAR_TIME_MINUTES),
    startup_timeout: float = Form(300.0),
    pull: str = Form("missing"),
    sandbox_id: str | None = Form(None),
) -> SandboxStartRequest:
    """Assemble a ``SandboxStartRequest`` from multipart form fields.

    Intended as a FastAPI dependency for endpoints that receive the start
    parameters as form fields next to an uploaded file. A falsy
    ``sandbox_id`` (for example an empty string) is forwarded as ``None``.
    """
    fields = {
        "image": image,
        "memory": memory,
        "cpus": cpus,
        "auto_clear_time_minutes": auto_clear_time_minutes,
        "startup_timeout": startup_timeout,
        "pull": pull,
        "sandbox_id": sandbox_id if sandbox_id else None,
    }
    return SandboxStartRequest(**fields)


@sandbox_router.post("/start_async_with_env")
@handle_exceptions(error_message="async start sandbox with env_dir failed")
async def start_async_with_env(
    request: Annotated[SandboxStartRequest, Depends(_sandbox_start_request_from_form)],
    headers: Annotated[StartHeaders, Depends()],
    env_dir_tar: UploadFile = File(...),
) -> RockResponse[SandboxStartResponse]:
    """Start a sandbox asynchronously from an uploaded build-context archive.

    The uploaded file is read fully into memory, stored with ``ray.put`` and
    the resulting reference is attached to the deployment config as
    ``env_dir_tar_ref`` before the config is handed to the sandbox manager.
    """
    deployment_config = DockerDeploymentConfig.from_request(request)

    # Hand the archive over by reference instead of embedding the raw bytes in the config.
    archive = await env_dir_tar.read()
    deployment_config.env_dir_tar_ref = ray.put(archive)

    await _apply_kata_runtime_switch(deployment_config)
    started = await sandbox_manager.start_async(
        deployment_config,
        user_info=headers.user_info,
        cluster_info=headers.cluster_info,
    )
    return RockResponse(result=started)
def _build_image_from_env_dir_tar(self, tar_bytes: bytes) -> str:
    """Extract an uploaded env_dir archive and run ``docker build`` on it.

    The archive is unpacked into a temporary directory that is removed when
    the build finishes. Progress is recorded on the ``docker_build`` phase of
    ``self._service_status``; every failure path marks that phase FAILED
    before the exception propagates.

    Args:
        tar_bytes: gzip-compressed tar archive of the build context. A
            ``Dockerfile`` must sit at the archive root.

    Returns:
        The tag the image was built under (``self._config.image``).

    Raises:
        FileNotFoundError: the archive has no ``Dockerfile`` at its root.
        RuntimeError: ``docker build`` exited with a non-zero status.
        tarfile.TarError: the archive is unreadable or a member would be
            written outside the extraction directory.
        subprocess.TimeoutExpired: the build ran longer than 1800 seconds.
    """

    def _extract_safely(tar: tarfile.TarFile, dest: Path) -> None:
        # The archive comes from an HTTP upload, so members are untrusted.
        if hasattr(tarfile, "data_filter"):
            # Interpreters with extraction filters: reject absolute paths,
            # "..", links leaving the destination and special files.
            tar.extractall(dest, filter="data")
            return
        # Older interpreters: validate every member path and link target first.
        root = dest.resolve()
        for member in tar.getmembers():
            targets = [root / member.name]
            if member.issym():
                targets.append((root / member.name).parent / member.linkname)
            elif member.islnk():
                targets.append(root / member.linkname)
            for target in targets:
                try:
                    target.resolve().relative_to(root)
                except ValueError:
                    raise tarfile.TarError(
                        f"unsafe member in env_dir archive: {member.name!r}"
                    ) from None
        tar.extractall(dest)

    status = self._service_status
    status.update_status(
        phase_name="docker_build", status=Status.RUNNING, message="building from env_dir context"
    )
    try:
        with tempfile.TemporaryDirectory(prefix="rock_env_") as tmpdir:
            context_dir = Path(tmpdir)
            with tarfile.open(fileobj=BytesIO(tar_bytes), mode="r:gz") as tar:
                _extract_safely(tar, context_dir)
            dockerfile_path = context_dir / "Dockerfile"
            if not dockerfile_path.exists():
                raise FileNotFoundError(f"Dockerfile not found in env_dir context: {dockerfile_path}")
            build_cmd = [
                "docker",
                "build",
                "-f",
                str(dockerfile_path),
                "-t",
                self._config.image,
                str(context_dir),
            ]
            logger.info("Running docker build from env_dir context: %s", shlex.join(build_cmd))
            subprocess.run(build_cmd, check=True, capture_output=True, timeout=1800)
    except subprocess.CalledProcessError as e:
        # Build output is not guaranteed to be valid UTF-8; never let decoding mask the real error.
        msg = f"docker build failed: {e.stderr.decode(errors='replace') if e.stderr else str(e)}"
        status.update_status(phase_name="docker_build", status=Status.FAILED, message=msg)
        raise RuntimeError(msg) from e
    except Exception as e:
        # Timeout, bad archive, missing Dockerfile, ...: do not leave the phase stuck in RUNNING.
        status.update_status(
            phase_name="docker_build", status=Status.FAILED, message=f"docker build failed: {e}"
        )
        raise
    status.update_status(
        phase_name="docker_build", status=Status.SUCCESS, message="docker build success"
    )
    return self._config.image
084a53a7e..46e7583b6 100644 --- a/rock/sandbox/operator/ray.py +++ b/rock/sandbox/operator/ray.py @@ -52,7 +52,12 @@ def _generate_actor_options(self, config: DockerDeploymentConfig) -> dict: async def submit(self, config: DockerDeploymentConfig, user_info: dict = {}) -> SandboxInfo: async with self._ray_service.get_ray_rwlock().read_lock(): sandbox_id = config.container_name - logger.info(f"[{sandbox_id}] start_async params:{json.dumps(config.model_dump(), indent=2)}") + log_params = config.model_dump() + # env_dir_tar_ref is a Ray ObjectRef, not JSON-serializable + if "env_dir_tar_ref" in log_params and log_params["env_dir_tar_ref"] is not None: + log_params = {k: v for k, v in log_params.items() if k != "env_dir_tar_ref"} + log_params["env_dir_tar_ref"] = "" + logger.info(f"[{sandbox_id}] start_async params:{json.dumps(log_params, indent=2)}") sandbox_actor: SandboxActor = await self.create_actor(config) sandbox_actor.start.remote() user_id = user_info.get("user_id", "default") diff --git a/rock/sandbox/sandbox_manager.py b/rock/sandbox/sandbox_manager.py index 4aec1e4fc..0e3649277 100644 --- a/rock/sandbox/sandbox_manager.py +++ b/rock/sandbox/sandbox_manager.py @@ -116,6 +116,8 @@ async def start_async( self.validate_sandbox_spec(self.rock_config.runtime, config) docker_deployment_config: DockerDeploymentConfig = await self.deployment_manager.init_config(config) sandbox_id = docker_deployment_config.container_name + if getattr(docker_deployment_config, "env_dir_tar_ref", None) is not None and not (docker_deployment_config.image or "").strip(): + docker_deployment_config.image = f"rock_env_image:{sandbox_id}" sandbox_info: SandboxInfo = await self._operator.submit(docker_deployment_config, user_info) stop_time = str(int(time.time()) + docker_deployment_config.auto_clear_time * 60) auto_clear_time_dict = { diff --git a/rock/sdk/sandbox/client.py b/rock/sdk/sandbox/client.py index c18912700..d44ba624e 100644 --- a/rock/sdk/sandbox/client.py +++ 
@staticmethod
def _pack_env_dir_to_tar_gz(env_dir: Path) -> bytes:
    """Pack a docker build context directory into gzipped tar bytes.

    The directory is archived with its root stored as ``.``, so every member
    is named ``./<relative path>``. The top-level ``.git`` directory is left
    out of the archive.

    Args:
        env_dir: Path to the build context; must be a directory that
            contains a ``Dockerfile``.

    Returns:
        The ``tar.gz`` archive as bytes.

    Raises:
        ValueError: ``env_dir`` is not a directory or has no ``Dockerfile``.
    """
    env_dir = Path(env_dir).resolve()
    if not env_dir.is_dir():
        raise ValueError(f"env_dir is not a directory: {env_dir}")
    dockerfile = env_dir / "Dockerfile"
    if not dockerfile.exists():
        raise ValueError(f"Dockerfile not found in env_dir: {dockerfile}")

    def _skip_top_level_git(member: tarfile.TarInfo):
        # With arcname="." members are named "./.git", not ".git"; strip the
        # prefix before comparing. Returning None for the directory also
        # stops tarfile from descending into it.
        return None if member.name.removeprefix("./") == ".git" else member

    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        tar.add(env_dir, arcname=".", filter=_skip_top_level_git)
    return buf.getvalue()
Exception as e: + raise Exception(f"Failed to start sandbox with env_dir: {str(e)}, post url {url}") + else: + url = f"{self._url}/start_async" + try: + response = await HttpUtils.post(url, headers, form_data) + except Exception as e: + raise Exception(f"Failed to start sandbox: {str(e)}, post url {url}") logging.debug(f"Start sandbox response: {response}") if "Success" != response.get("status"): diff --git a/rock/sdk/sandbox/config.py b/rock/sdk/sandbox/config.py index ea933f05e..ede0bfa91 100644 --- a/rock/sdk/sandbox/config.py +++ b/rock/sdk/sandbox/config.py @@ -1,4 +1,6 @@ import warnings +from pathlib import Path +from typing import Union from pydantic import BaseModel, Field, field_validator @@ -39,6 +41,8 @@ class SandboxConfig(BaseConfig): experiment_id: str | None = None cluster: str = "zb" namespace: str | None = None + env_dir: Union[str, Path, None] = None + """Optional path to docker build context directory (must contain Dockerfile). When set, SDK packs it as tar.gz and uploads; worker will docker build then run. Image tag is server-derived as rock_env_image:.""" class SandboxGroupConfig(SandboxConfig): diff --git a/tests/integration/sdk/sandbox/test_env_dir_build.py b/tests/integration/sdk/sandbox/test_env_dir_build.py new file mode 100644 index 000000000..8ec709eb4 --- /dev/null +++ b/tests/integration/sdk/sandbox/test_env_dir_build.py @@ -0,0 +1,70 @@ +""" +Integration test for SDK start with env_dir (docker build context). + +- SDK packs env_dir (directory containing Dockerfile) as tar.gz and POSTs to + /start_async_with_env; admin passes the archive to the SandboxActor via Ray; + the actor extracts, runs docker build, then docker run. 
@pytest.fixture
def minimal_env_dir():
    """Yield a throwaway build context: a Dockerfile plus the file it COPYs."""
    dockerfile_text = "FROM python:3.11\n" "COPY app.txt /opt/app.txt\n"
    with tempfile.TemporaryDirectory(prefix="rock_env_dir_") as tmp:
        context = Path(tmp)
        (context / "app.txt").write_text(ENV_DIR_TEST_FILE_CONTENT)
        (context / "Dockerfile").write_text(dockerfile_text)
        yield context


@pytest.mark.need_admin
@SKIP_IF_NO_DOCKER
@pytest.mark.asyncio
async def test_sandbox_start_with_env_dir(admin_remote_server, minimal_env_dir):
    """Start a sandbox from an env_dir build context and check the built image is used.

    No image is configured; the sandbox is expected to run the image built
    from the uploaded context, which is verified through the COPY'd file.
    """
    base_url = f"{admin_remote_server.endpoint}:{admin_remote_server.port}"
    sandbox = Sandbox(
        SandboxConfig(
            memory="2g",
            cpus=1.0,
            startup_timeout=300,
            base_url=base_url,
            env_dir=minimal_env_dir,
        )
    )
    try:
        await sandbox.start()
        assert sandbox.sandbox_id
        await sandbox.create_session(CreateBashSessionRequest(session="default"))

        # Basic liveness: a trivial command runs inside the container.
        echo = await sandbox.arun(cmd="echo ok", session="default")
        assert echo.output is not None
        assert "ok" in echo.output

        # The file copied by the Dockerfile must be present with the expected content.
        copied = await sandbox.arun(cmd="cat /opt/app.txt", session="default")
        assert copied.output is not None
        assert copied.output.strip() == ENV_DIR_TEST_FILE_CONTENT
    finally:
        try:
            await sandbox.stop()
        except Exception as e:
            logger.warning("Failed to stop sandbox: %s", e)