diff --git a/rock/sdk/sandbox/model_service/base.py b/rock/sdk/sandbox/model_service/base.py index 1e96a3ec1..3b97b6ce4 100644 --- a/rock/sdk/sandbox/model_service/base.py +++ b/rock/sdk/sandbox/model_service/base.py @@ -1,15 +1,15 @@ from __future__ import annotations # Postpone annotation evaluation to avoid circular imports. import shlex -import time +from string import Template from typing import TYPE_CHECKING -from pydantic import BaseModel +from pydantic import BaseModel, Field from rock import env_vars -from rock.actions import CreateBashSessionRequest from rock.logger import init_logger -from rock.sdk.sandbox.utils import arun_with_retry +from rock.sdk.sandbox.runtime_env import PythonRuntimeEnv, PythonRuntimeEnvConfig, RuntimeEnv +from rock.sdk.sandbox.utils import with_time_logging if TYPE_CHECKING: from rock.sdk.sandbox.client import Sandbox @@ -20,45 +20,50 @@ class ModelServiceConfig(BaseModel): """Configuration for ModelService. - Attributes: - workdir: Working directory path for model service. - python_install_cmd: Command to install Python. - model_service_install_cmd: Command to install model service package. - python_install_timeout: Timeout for Python installation in seconds. - model_service_install_timeout: Timeout for model service installation in seconds. - model_service_session: Session name for model service. - session_envs: Environment variables for the session. - config_ini_cmd: Command to initialize config file. - model_service_type: Type of model service to start. - start_cmd: Command to start model service with model_service_type placeholder. - stop_cmd: Command to stop model service. - watch_agent_cmd: Command to watch agent with pid placeholder. - anti_call_llm_cmd: Command to anti-call LLM with index and response_payload placeholders. - anti_call_llm_cmd_no_response: Command to anti-call LLM with only index placeholder. - logging_path: Path for logging directory. Must be configured when starting ModelService. - logging_file_name: Name of the log file. + Provides unified commands for installation, startup/shutdown, + agent monitoring, and anti-call LLM operations. """ - workdir: str = "/tmp_model_service" - python_install_cmd: str = env_vars.ROCK_RTENV_PYTHON_V31114_INSTALL_CMD - model_service_install_cmd: str = env_vars.ROCK_MODEL_SERVICE_INSTALL_CMD - python_install_timeout: int = 300 - model_service_install_timeout: int = 300 - model_service_session: str = "model-service-session" - session_envs: dict[str, str] = {} + enable: bool = Field(default=False) + """Whether to enable the ModelService. When False, ModelService will not be initialized.""" - config_ini_cmd: str = "mkdir -p ~/.rock && touch ~/.rock/config.ini" - model_service_type: str = "local" - start_cmd: str = "rock model-service start --type {model_service_type}" - stop_cmd: str = "rock model-service stop" - watch_agent_cmd: str = "rock model-service watch-agent --pid {pid}" + type: str = Field(default="local") + """Type of model service to start.""" - anti_call_llm_cmd: str = "rock model-service anti-call-llm --index {index} --response {response_payload}" - anti_call_llm_cmd_no_response: str = "rock model-service anti-call-llm --index {index}" + install_cmd: str = Field(default=env_vars.ROCK_MODEL_SERVICE_INSTALL_CMD) + """Command to install model service package.""" - # Logging path must be configured when starting ModelService. - logging_path: str = "/data/logs" - logging_file_name: str = "model_service.log" + install_timeout: int = Field(default=300, gt=0) + """Timeout for model service installation in seconds.""" + + runtime_env_config: PythonRuntimeEnvConfig = Field(default_factory=PythonRuntimeEnvConfig) + """Runtime environment configuration for the model service.""" + + start_cmd: str = Field(default="rock model-service start --type ${type}") + """Command to start model service with type placeholder.""" + + stop_cmd: str = Field(default="rock model-service stop") + """Command to stop model service.""" + + config_ini_cmd: str = Field(default="mkdir -p ~/.rock && touch ~/.rock/config.ini") + """Command to create Rock config file.""" + + watch_agent_cmd: str = Field(default="rock model-service watch-agent --pid ${pid}") + """Command to watch agent with pid placeholder.""" + + anti_call_llm_cmd: str = Field( + default="rock model-service anti-call-llm --index ${index} --response ${response_payload}" + ) + """Command to anti-call LLM with index and response_payload placeholders.""" + + anti_call_llm_cmd_no_response: str = Field(default="rock model-service anti-call-llm --index ${index}") + """Command to anti-call LLM with only index placeholder.""" + + logging_path: str = Field(default="/data/logs") + """Path for logging directory. Must be configured when starting ModelService.""" + + logging_file_name: str = Field(default="model_service.log") + """Name of the log file.""" class ModelService: @@ -80,18 +85,19 @@ def __init__(self, sandbox: Sandbox, config: ModelServiceConfig): """ self._sandbox = sandbox self.config = config + + self.runtime_env: PythonRuntimeEnv | None = None + self.is_installed = False self.is_started = False - logger.debug(f"ModelService initialized: workdir={config.workdir}") + @with_time_logging("Installing model service") async def install(self) -> None: """Install model service in the sandbox. Performs the following installation steps: - 1. Create a bash session for model service. - 2. Create working directory and Rock config file. - 3. Install Python. - 4. Install model service package. + 1. Create and initialize Python runtime environment (via RuntimeEnv). + 2. Install model service package. Note: Caller should ensure this is not called concurrently or repeatedly. @@ -99,187 +105,81 @@ async def install(self) -> None: Raises: Exception: If any installation step fails. """ - sandbox_id = self._sandbox.sandbox_id - install_start_time = time.time() + # Initialize runtime env (installs Python) + self.runtime_env = RuntimeEnv.from_config(self._sandbox, self.config.runtime_env_config) + await self.runtime_env.init() - try: - logger.info(f"[{sandbox_id}] Starting model service installation") + await self._create_rock_config() + await self._install_model_service() - # Step 1: Create bash session - step_start_time = time.time() - logger.debug( - f"[{sandbox_id}] Step 1: Creating bash session: {self.config.model_service_session}, " - f"env_enable=True, session_envs={self.config.session_envs}" - ) - await self._sandbox.create_session( - CreateBashSessionRequest( - session=self.config.model_service_session, - env_enable=True, - env=self.config.session_envs, - ) - ) - step_elapsed = time.time() - step_start_time - logger.info(f"[{sandbox_id}] Step 1 completed: Bash session created (elapsed: {step_elapsed:.2f}s)") - - # Step 2: Create working directory and Rock config file - step_start_time = time.time() - mkdir_cmd = f"mkdir -p {self.config.workdir}" - logger.debug(f"[{sandbox_id}] Step 2a: {mkdir_cmd}") - await self._sandbox.arun( - cmd=mkdir_cmd, - session=self.config.model_service_session, - ) - - logger.debug(f"[{sandbox_id}] Step 2b: {self.config.config_ini_cmd}") - await self._sandbox.arun( - cmd=self.config.config_ini_cmd, - session=self.config.model_service_session, - ) - step_elapsed = time.time() - step_start_time - logger.info( - f"[{sandbox_id}] Step 2 completed: Working directory and config initialized (elapsed: {step_elapsed:.2f}s)" - ) - - # Step 3: Install Python - step_start_time = time.time() - python_install_cmd = f"cd {self.config.workdir} && {self.config.python_install_cmd}" - bash_python_cmd = f"bash -c {shlex.quote(python_install_cmd)}" - logger.debug( - f"[{sandbox_id}] Step 3: Installing Python (timeout: {self.config.python_install_timeout}s), " - f"cmd: {bash_python_cmd}" - ) - await arun_with_retry( - sandbox=self._sandbox, - cmd=bash_python_cmd, - session=self.config.model_service_session, - mode="nohup", - wait_timeout=self.config.python_install_timeout, - error_msg="Python installation failed", - ) - step_elapsed = time.time() - step_start_time - logger.info(f"[{sandbox_id}] Step 3 completed: Python installation finished (elapsed: {step_elapsed:.2f}s)") - - # Step 4: Install model service - step_start_time = time.time() - model_service_install_cmd = ( - f"export PATH={self.config.workdir}/python/bin:$PATH && " - f"cd {self.config.workdir} && {self.config.model_service_install_cmd}" - ) - bash_service_cmd = f"bash -c {shlex.quote(model_service_install_cmd)}" - logger.debug( - f"[{sandbox_id}] Step 4: Installing model service package (timeout: {self.config.model_service_install_timeout}s), " - f"cmd: {bash_service_cmd}" - ) - await arun_with_retry( - sandbox=self._sandbox, - cmd=bash_service_cmd, - session=self.config.model_service_session, - mode="nohup", - wait_timeout=self.config.model_service_install_timeout, - error_msg="Model service installation failed", - ) - step_elapsed = time.time() - step_start_time - logger.info( - f"[{sandbox_id}] Step 4 completed: Model service package installation finished (elapsed: {step_elapsed:.2f}s)" - ) + self.is_installed = True - total_elapsed = time.time() - install_start_time - logger.info(f"[{sandbox_id}] Installation finished successfully (total elapsed: {total_elapsed:.2f}s)") + async def _create_rock_config(self) -> None: + """Create Rock config file.""" + await self.runtime_env.run(cmd=self.config.config_ini_cmd) - self.is_installed = True + @with_time_logging("Installing model service package") + async def _install_model_service(self) -> None: + """Install model service package using runtime_env.run().""" + install_cmd = f"cd {self.runtime_env.workdir} && {self.config.install_cmd}" - except Exception as e: - total_elapsed = time.time() - install_start_time - logger.error(f"[{sandbox_id}] Installation failed: {str(e)} (elapsed: {total_elapsed:.2f}s)", exc_info=True) - raise + await self.runtime_env.run( + cmd=install_cmd, + wait_timeout=self.config.install_timeout, + error_msg="Model service installation failed", + ) + @with_time_logging("Starting model service") async def start(self) -> None: """Start the model service in the sandbox. Starts the service with configured logging settings. Note: - Caller should ensure install() has been called first and this is not called concurrently. + Caller should ensure install() has been called first. Raises: + RuntimeError: If service is not installed Exception: If service startup fails. """ - sandbox_id = self._sandbox.sandbox_id - start_time = time.time() - if not self.is_installed: error_msg = ( - f"[{sandbox_id}] Cannot start model service: ModelService has not been installed yet. " + f"[{self._sandbox.sandbox_id}] Cannot start model service: ModelService has not been installed yet. " f"Please call install() first." ) logger.error(error_msg) raise RuntimeError(error_msg) - try: - start_cmd = ( - f"export ROCK_LOGGING_PATH={self.config.logging_path} && " - f"export ROCK_LOGGING_FILE_NAME={self.config.logging_file_name} && " - f"{self.config.workdir}/python/bin/{self.config.stop_cmd} && " - f"{self.config.workdir}/python/bin/{self.config.start_cmd.format(model_service_type=self.config.model_service_type)}" - ) - bash_start_cmd = f"bash -c {shlex.quote(start_cmd)}" - logger.debug(f"[{sandbox_id}] Model service Start command: {bash_start_cmd}") + bash_start_cmd = ( + f"export ROCK_LOGGING_PATH={self.config.logging_path} && " + f"export ROCK_LOGGING_FILE_NAME={self.config.logging_file_name} && " + f"{self.config.stop_cmd} && " + f"{Template(self.config.start_cmd).safe_substitute(type=self.config.type)}" + ) + logger.debug(f"[{self._sandbox.sandbox_id}] Model service Start command: {bash_start_cmd}") - await self._sandbox.arun( - cmd=bash_start_cmd, - session=None, - mode="nohup", - ) - elapsed = time.time() - start_time - logger.info(f"[{sandbox_id}] Model service started successfully (elapsed: {elapsed:.2f}s)") - self.is_started = True - - except Exception as e: - elapsed = time.time() - start_time - logger.error( - f"[{sandbox_id}] Model service startup failed: {str(e)} (elapsed: {elapsed:.2f}s)", exc_info=True - ) - raise + await self.runtime_env.run(cmd=bash_start_cmd) + + self.is_started = True + @with_time_logging("Stopping model service") async def stop(self) -> None: """Stop the model service. Note: Caller should ensure proper sequencing with start(). - - Raises: - Exception: If service stop fails. """ - sandbox_id = self._sandbox.sandbox_id - start_time = time.time() - if not self.is_started: logger.warning( - f"[{sandbox_id}] Model service is not running, skipping stop operation. is_started={self.is_started}" + f"[{self._sandbox.sandbox_id}] Model service is not running, skipping stop operation. is_started={self.is_started}" ) return - try: - logger.info(f"[{sandbox_id}] Stopping model service") - - stop_cmd = f"{self.config.workdir}/python/bin/{self.config.stop_cmd}" - bash_stop_cmd = f"bash -c {shlex.quote(stop_cmd)}" - - await self._sandbox.arun( - cmd=bash_stop_cmd, - session=None, - mode="nohup", - ) - - elapsed = time.time() - start_time - logger.info(f"[{sandbox_id}] Model service stopped (elapsed: {elapsed:.2f}s)") - self.is_started = False + await self.runtime_env.run(cmd=self.config.stop_cmd) - except Exception as e: - elapsed = time.time() - start_time - logger.error(f"[{sandbox_id}] Stop failed: {str(e)} (elapsed: {elapsed:.2f}s)", exc_info=True) - raise + self.is_started = False + @with_time_logging("Watching agent") async def watch_agent(self, pid: str) -> None: """Watch agent process with the specified PID. @@ -290,34 +190,20 @@ async def watch_agent(self, pid: str) -> None: Caller should ensure start() has been called first. Raises: + RuntimeError: If service is not started Exception: If watch fails. """ - sandbox_id = self._sandbox.sandbox_id - start_time = time.time() - if not self.is_started: - error_msg = f"[{sandbox_id}] Cannot watch agent: ModelService is not started. Please call start() first." + error_msg = f"[{self._sandbox.sandbox_id}] Cannot watch agent: ModelService is not started. Please call start() first." logger.error(error_msg) raise RuntimeError(error_msg) - try: - watch_agent_cmd = f"{self.config.workdir}/python/bin/{self.config.watch_agent_cmd.format(pid=pid)}" - bash_watch_cmd = f"bash -c {shlex.quote(watch_agent_cmd)}" - logger.debug(f"[{sandbox_id}] Model service watch agent with pid={pid}, cmd: {bash_watch_cmd}") - - await self._sandbox.arun( - cmd=bash_watch_cmd, - session=None, - mode="nohup", - ) - elapsed = time.time() - start_time - logger.info(f"[{sandbox_id}] Watch agent completed (elapsed: {elapsed:.2f}s)") + bash_watch_cmd = Template(self.config.watch_agent_cmd).safe_substitute(pid=pid) + logger.debug(f"[{self._sandbox.sandbox_id}] Model service watch agent with pid={pid}, cmd: {bash_watch_cmd}") - except Exception as e: - elapsed = time.time() - start_time - logger.error(f"[{sandbox_id}] Watch agent failed: {str(e)} (elapsed: {elapsed:.2f}s)", exc_info=True) - raise + await self.runtime_env.run(cmd=bash_watch_cmd) + @with_time_logging("Executing anti-call LLM") async def anti_call_llm( self, index: int, @@ -343,10 +229,10 @@ async def anti_call_llm( Caller should ensure start() has been called first. Raises: + RuntimeError: If service is not started Exception: If operation fails. """ sandbox_id = self._sandbox.sandbox_id - start_time = time.time() if not self.is_started: error_msg = ( @@ -355,38 +241,35 @@ async def anti_call_llm( logger.error(error_msg) raise RuntimeError(error_msg) - try: - logger.info( - f"[{sandbox_id}] Executing anti-call LLM: index={index}, " - f"has_response={response_payload is not None}, timeout={call_timeout}s" - ) - - if response_payload: - cmd = self.config.anti_call_llm_cmd.format( - index=index, - response_payload=shlex.quote(response_payload), - ) - else: - cmd = self.config.anti_call_llm_cmd_no_response.format(index=index) - - full_cmd = f"{self.config.workdir}/python/bin/{cmd}" - bash_cmd = f"bash -c {shlex.quote(full_cmd)}" - logger.debug(f"[{sandbox_id}] Executing command: {bash_cmd}") - - result = await self._sandbox.arun( - cmd=bash_cmd, - mode="nohup", - session=None, # Start a new session to ensure clean context without session interference. - wait_timeout=call_timeout, - wait_interval=check_interval, - ) - - elapsed = time.time() - start_time - logger.info(f"[{sandbox_id}] Anti-call LLM execution completed (elapsed: {elapsed:.2f}s)") + logger.info( + f"[{sandbox_id}] Executing anti-call LLM: index={index}, " + f"has_response={response_payload is not None}, timeout={call_timeout}s" + ) - return result.output + from rock.sdk.sandbox.client import RunMode - except Exception as e: - elapsed = time.time() - start_time - logger.error(f"[{sandbox_id}] Anti-call LLM failed: {str(e)} (elapsed: {elapsed:.2f}s)", exc_info=True) - raise + if response_payload: + cmd = Template(self.config.anti_call_llm_cmd).safe_substitute( + index=index, + response_payload=shlex.quote(response_payload), + ) + else: + cmd = Template(self.config.anti_call_llm_cmd_no_response).safe_substitute(index=index) + + # We chose to use runtime_env's wrapped_cmd instead of the run method here, + # mainly to avoid unexpected behavior caused by sharing a session with runtime_env + bash_cmd = self.runtime_env.wrapped_cmd(cmd) + logger.debug(f"[{sandbox_id}] Executing command: {bash_cmd}") + + result = await self._sandbox.arun( + cmd=bash_cmd, + mode=RunMode.NOHUP, + session=None, + wait_timeout=call_timeout, + wait_interval=check_interval, + ) + + if result.exit_code != 0: + raise RuntimeError(f"Anti-call LLM command failed: {result.output}") + + return result.output diff --git a/rock/sdk/sandbox/runtime_env/base.py b/rock/sdk/sandbox/runtime_env/base.py index 74da8c325..1a6ff4b47 100644 --- a/rock/sdk/sandbox/runtime_env/base.py +++ b/rock/sdk/sandbox/runtime_env/base.py @@ -100,6 +100,11 @@ def runtime_env_id(self) -> RuntimeEnvId: """Unique ID for this runtime env instance.""" return self._runtime_env_id + @property + def workdir(self) -> str: + """Working directory for this runtime env instance.""" + return self._workdir + async def init(self) -> None: """Initialize the runtime environment.