diff --git a/docs/i18n/zh-Hans/docusaurus-plugin-content-docs/version-1.3.x/References/Python SDK References/python_sdk.md b/docs/i18n/zh-Hans/docusaurus-plugin-content-docs/version-1.3.x/References/Python SDK References/python_sdk.md index c1083f29b..4d6870727 100644 --- a/docs/i18n/zh-Hans/docusaurus-plugin-content-docs/version-1.3.x/References/Python SDK References/python_sdk.md +++ b/docs/i18n/zh-Hans/docusaurus-plugin-content-docs/version-1.3.x/References/Python SDK References/python_sdk.md @@ -258,6 +258,101 @@ if __name__ == "__main__": main() ``` +## 4. Model Service SDK + +### 4.1 概述 + +Model Service SDK 提供与 Model Service 交互的接口,支持 Agent 工作流中的 LLM 请求/响应处理。`ModelClient` 类是读取请求和写入响应到模型服务日志文件的主要接口。 + +### 4.2 基本用法 + +```python +import asyncio +from rock.sdk.model.client import ModelClient + +async def main(): + # 创建 ModelClient 实例 + client = ModelClient() + + # 获取第一个请求 (index=0) + first_request = await client.anti_call_llm(index=0) + print(f"第一个请求: {first_request}") + + # 发送响应并获取下一个请求 + llm_response = '{"content": "你好,有什么可以帮你的?"}' + next_request = await client.anti_call_llm(index=1, last_response=llm_response) + print(f"下一个请求: {next_request}") + +asyncio.run(main()) +``` + +### 4.3 超时与取消支持 + +`pop_request` 和 `wait_for_first_request` 方法支持超时和取消,防止无限阻塞: + +#### 超时配置 + +```python +import asyncio +from rock.sdk.model.client import ModelClient + +async def main(): + client = ModelClient() + + try: + # 等待第一个请求,超时时间为 30 秒 + await client.wait_for_first_request(timeout=30.0) + + # 弹出请求,超时时间为 60 秒(默认值) + request = await client.pop_request(index=1) + except TimeoutError as e: + print(f"操作超时: {e}") + +asyncio.run(main()) +``` + +#### 取消处理 + +```python +import asyncio +from rock.sdk.model.client import ModelClient + +async def main(): + client = ModelClient() + + async def get_request(): + try: + request = await client.pop_request(index=1) + return request + except asyncio.CancelledError: + print("请求被取消") + raise + + # 创建可取消的任务 + task = asyncio.create_task(get_request()) + + # 5 秒后取消 + await asyncio.sleep(5) + task.cancel() + +asyncio.run(main()) +``` + +### 4.4 默认超时时间 + +轮询操作的默认超时时间为 **60 秒**。您可以自定义此值: + +```python +# 使用默认超时(60 秒) +await client.pop_request(index=1) + +# 自定义超时(30 秒) +await client.pop_request(index=1, timeout=30.0) + +# 无超时(无限等待 - 不推荐) +await client.pop_request(index=1, timeout=None) +``` + ## 相关文档 - [快速开始指南](../../Getting%20Started/quickstart.md) - 了解如何快速开始使用 ROCK SDK - [API 文档](../api.md) - 查看 SDK 封装的底层 API 接口 diff --git a/docs/versioned_docs/version-1.3.x/References/Python SDK References/python_sdk.md b/docs/versioned_docs/version-1.3.x/References/Python SDK References/python_sdk.md index 5272d7edb..13fd78ae5 100644 --- a/docs/versioned_docs/version-1.3.x/References/Python SDK References/python_sdk.md +++ b/docs/versioned_docs/version-1.3.x/References/Python SDK References/python_sdk.md @@ -258,6 +258,101 @@ if __name__ == "__main__": main() ``` +## 4. Model Service SDK + +### 4.1 Overview + +The Model Service SDK provides interfaces for interacting with the Model Service, enabling LLM request/response handling for agent workflows. The `ModelClient` class is the primary interface for reading requests and writing responses to the model service log file. + +### 4.2 Basic Usage + +```python +import asyncio +from rock.sdk.model.client import ModelClient + +async def main(): + # Create a ModelClient instance + client = ModelClient() + + # Get the first request (index=0) + first_request = await client.anti_call_llm(index=0) + print(f"First request: {first_request}") + + # Send a response and get the next request + llm_response = '{"content": "Hello, how can I help?"}' + next_request = await client.anti_call_llm(index=1, last_response=llm_response) + print(f"Next request: {next_request}") + +asyncio.run(main()) +``` + +### 4.3 Timeout and Cancellation Support + +The `pop_request` and `wait_for_first_request` methods support timeout and cancellation to prevent indefinite blocking: + +#### Timeout Configuration + +```python +import asyncio +from rock.sdk.model.client import ModelClient + +async def main(): + client = ModelClient() + + try: + # Wait for first request with a 30-second timeout + await client.wait_for_first_request(timeout=30.0) + + # Pop request with a 60-second timeout (default) + request = await client.pop_request(index=1) + except TimeoutError as e: + print(f"Operation timed out: {e}") + +asyncio.run(main()) +``` + +#### Cancellation Handling + +```python +import asyncio +from rock.sdk.model.client import ModelClient + +async def main(): + client = ModelClient() + + async def get_request(): + try: + request = await client.pop_request(index=1) + return request + except asyncio.CancelledError: + print("Request was cancelled") + raise + + # Create a task that can be cancelled + task = asyncio.create_task(get_request()) + + # Cancel after 5 seconds + await asyncio.sleep(5) + task.cancel() + +asyncio.run(main()) +``` + +### 4.4 Default Timeout + +The default timeout for polling operations is **60 seconds**. You can customize this: + +```python +# Use default timeout (60 seconds) +await client.pop_request(index=1) + +# Custom timeout (30 seconds) +await client.pop_request(index=1, timeout=30.0) + +# No timeout (wait indefinitely - not recommended) +await client.pop_request(index=1, timeout=None) +``` + ## Related Documents - [Quick Start Guide](../../Getting%20Started/quickstart.md) - Learn how to quickly get started with the ROCK SDK - [API Documentation](../api.md) - View the underlying API interfaces encapsulated by the SDK diff --git a/examples/model_client_demo.py b/examples/model_client_demo.py new file mode 100644 index 000000000..efaad9d66 --- /dev/null +++ b/examples/model_client_demo.py @@ -0,0 +1,168 @@ +""" +ModelClient Demo - Demonstrates timeout and cancellation support. + +This example shows how to use ModelClient with: +1. Timeout configuration +2. Cancellation handling +3. Basic request/response flow + +NOTE: This demo requires the model service to be running. +Start the model service with: rock model-service start --type local +""" + +import asyncio +import logging +import tempfile +from pathlib import Path + +from rock.sdk.model.client import ModelClient, DEFAULT_POLL_TIMEOUT +from rock.sdk.model.server.config import REQUEST_END_MARKER, REQUEST_START_MARKER + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s -- %(message)s", +) +logger = logging.getLogger(__name__) + + +async def demo_timeout(): + """Demonstrate timeout behavior when request is not found.""" + logger.info("=" * 60) + logger.info("Demo: Timeout behavior") + logger.info("=" * 60) + + # Create a temporary log file with a request at index 1 + with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f: + f.write(f'{REQUEST_START_MARKER}{{"model": "gpt-4", "messages": []}}{REQUEST_END_MARKER}{{"index": 1}}\n') + log_file = f.name + + try: + client = ModelClient(log_file_name=log_file) + + # Try to get request at index 2, which doesn't exist + # This should timeout after 2 seconds + logger.info("Attempting to pop request at index 2 (doesn't exist)...") + try: + await client.pop_request(index=2, timeout=2.0) + except TimeoutError as e: + logger.info(f"TimeoutError caught as expected: {e}") + finally: + Path(log_file).unlink(missing_ok=True) + + logger.info("Demo: Timeout behavior - PASSED\n") + + +async def demo_cancellation(): + """Demonstrate cancellation handling.""" + logger.info("=" * 60) + logger.info("Demo: Cancellation handling") + logger.info("=" * 60) + + # Create a temporary log file with a request at index 1 + with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f: + f.write(f'{REQUEST_START_MARKER}{{"model": "gpt-4", "messages": []}}{REQUEST_END_MARKER}{{"index": 1}}\n') + log_file = f.name + + try: + client = ModelClient(log_file_name=log_file) + + async def long_running_pop(): + try: + # This will wait indefinitely for index 2 + return await client.pop_request(index=2, timeout=100.0) + except asyncio.CancelledError: + logger.info("Task was cancelled, cleaning up...") + raise + + # Create a task that can be cancelled + task = asyncio.create_task(long_running_pop()) + + # Wait a bit and then cancel + await asyncio.sleep(1.0) + logger.info("Cancelling the task...") + task.cancel() + + try: + await task + except asyncio.CancelledError: + logger.info("CancelledError propagated correctly") + finally: + Path(log_file).unlink(missing_ok=True) + + logger.info("Demo: Cancellation handling - PASSED\n") + + +async def demo_happy_path(): + """Demonstrate normal request retrieval.""" + logger.info("=" * 60) + logger.info("Demo: Normal request retrieval") + logger.info("=" * 60) + + # Create a temporary log file with requests + with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f: + f.write(f'{REQUEST_START_MARKER}{{"model": "gpt-4", "messages": [{{"role": "user", "content": "Hello"}}]}}{REQUEST_END_MARKER}{{"index": 1}}\n') + log_file = f.name + + try: + client = ModelClient(log_file_name=log_file) + + logger.info("Popping request at index 1...") + request = await client.pop_request(index=1, timeout=5.0) + logger.info(f"Got request: {request[:100]}...") + + if "gpt-4" in request: + logger.info("Request content verified successfully") + finally: + Path(log_file).unlink(missing_ok=True) + + logger.info("Demo: Normal request retrieval - PASSED\n") + + +async def demo_wait_for_first_request(): + """Demonstrate wait_for_first_request with timeout.""" + logger.info("=" * 60) + logger.info("Demo: wait_for_first_request with timeout") + logger.info("=" * 60) + + # Use a non-existent file to trigger timeout + client = ModelClient(log_file_name="/non/existent/path/file.log") + + logger.info("Waiting for first request (will timeout)...") + try: + await client.wait_for_first_request(timeout=1.0) + except TimeoutError as e: + logger.info(f"TimeoutError caught as expected: {e}") + + logger.info("Demo: wait_for_first_request with timeout - PASSED\n") + + +async def demo_default_timeout(): + """Show default timeout value.""" + logger.info("=" * 60) + logger.info("Demo: Default timeout configuration") + logger.info("=" * 60) + + logger.info(f"Default poll timeout: {DEFAULT_POLL_TIMEOUT} seconds") + logger.info("This default is applied to pop_request and wait_for_first_request") + logger.info("Demo: Default timeout configuration - PASSED\n") + + +async def main(): + """Run all demos.""" + logger.info("\n" + "=" * 60) + logger.info("ModelClient Demo - Timeout & Cancellation Support") + logger.info("=" * 60 + "\n") + + await demo_default_timeout() + await demo_happy_path() + await demo_timeout() + await demo_wait_for_first_request() + await demo_cancellation() + + logger.info("=" * 60) + logger.info("All demos completed successfully!") + logger.info("=" * 60) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/rock/env_vars.py b/rock/env_vars.py index be1f98899..4871d0954 100644 --- a/rock/env_vars.py +++ b/rock/env_vars.py @@ -48,6 +48,7 @@ # Model Service Config ROCK_MODEL_SERVICE_DATA_DIR: str ROCK_MODEL_SERVICE_TRAJ_APPEND_MODE: bool | None = None + ROCK_MODEL_CLIENT_POLL_TIMEOUT: float | None = None # RuntimeEnv ROCK_RTENV_PYTHON_V31114_INSTALL_CMD: str @@ -101,6 +102,7 @@ "ROCK_MODEL_SERVICE_DATA_DIR": lambda: os.getenv("ROCK_MODEL_SERVICE_DATA_DIR", "/data/logs"), "ROCK_MODEL_SERVICE_TRAJ_APPEND_MODE": lambda: os.getenv("ROCK_MODEL_SERVICE_TRAJ_APPEND_MODE", "false").lower() == "true", + "ROCK_MODEL_CLIENT_POLL_TIMEOUT": lambda: float(val) if (val := os.getenv("ROCK_MODEL_CLIENT_POLL_TIMEOUT")) else None, "ROCK_RTENV_PYTHON_V31114_INSTALL_CMD": lambda: os.getenv( "ROCK_RTENV_PYTHON_V31114_INSTALL_CMD", "[ -f cpython31114.tar.gz ] && rm cpython31114.tar.gz; [ -d python ] && rm -rf python; wget -q -O cpython31114.tar.gz https://github.com/astral-sh/python-build-standalone/releases/download/20251120/cpython-3.11.14+20251120-x86_64-unknown-linux-gnu-install_only.tar.gz && tar -xzf cpython31114.tar.gz && mv python runtime-env", diff --git a/rock/sdk/model/client.py b/rock/sdk/model/client.py index 32b4947c2..bd93accee 100644 --- a/rock/sdk/model/client.py +++ b/rock/sdk/model/client.py @@ -4,6 +4,7 @@ import time from pathlib import Path +from rock import env_vars from rock.sdk.model.server.config import ( LOG_FILE, REQUEST_END_MARKER, @@ -58,16 +59,37 @@ async def _append_response(self, content: str): with open(self.log_file, "a") as f: f.write(content) - async def pop_request(self, index: int) -> str: + async def pop_request(self, index: int, timeout: float | None = env_vars.ROCK_MODEL_CLIENT_POLL_TIMEOUT) -> str: + """Pop the request with the given index from the log file. + + Args: + index: The index of the request to pop. + timeout: Maximum time to wait in seconds. Uses ROCK_MODEL_CLIENT_POLL_TIMEOUT + environment variable by default, or no timeout if not set. + + Returns: + The request JSON string. + + Raises: + TimeoutError: If timeout expires before the request is found. + asyncio.CancelledError: If the operation is cancelled. + """ + start_time = time.monotonic() while True: - last_request_line = await self.read_last_request_line() - request_json, meta = await self.parse_request_line(last_request_line) - if SESSION_END_MARKER == request_json: - return SESSION_END_MARKER - if meta.get("index") == index: - return request_json - logger.debug(f"Last request {last_request_line} is not the index {index} we want, waiting...") - await asyncio.sleep(1) + if timeout is not None and time.monotonic() - start_time > timeout: + raise TimeoutError(f"pop_request timed out after {timeout} seconds") + try: + last_request_line = await self.read_last_request_line() + request_json, meta = await self.parse_request_line(last_request_line) + if SESSION_END_MARKER == request_json: + return SESSION_END_MARKER + if meta.get("index") == index: + return request_json + logger.debug(f"Last request {last_request_line} is not the index {index} we want, waiting...") + await asyncio.sleep(1) + except asyncio.CancelledError: + logger.info(f"pop_request(index={index}) cancelled") + raise async def parse_request_line(self, line_content: str) -> tuple[str, dict]: if SESSION_END_MARKER in line_content: @@ -106,20 +128,37 @@ async def read_last_response_line(self) -> str | None: line_index -= 1 return None - async def wait_for_first_request(self): + async def wait_for_first_request(self, timeout: float | None = env_vars.ROCK_MODEL_CLIENT_POLL_TIMEOUT): + """Wait for the first request to be written to the log file. + + Args: + timeout: Maximum time to wait in seconds. Uses ROCK_MODEL_CLIENT_POLL_TIMEOUT + environment variable by default, or no timeout if not set. + + Raises: + TimeoutError: If timeout expires before the first request appears. + asyncio.CancelledError: If the operation is cancelled. + """ + start_time = time.monotonic() while True: - if not Path(self.log_file).exists(): - logger.debug(f"Log file {self.log_file} not found, waiting...") - await asyncio.sleep(1) - continue - with open(self.log_file) as f: - lines = f.readlines() - if len(lines) == 0: - logger.debug(f"Log file {self.log_file} is empty, waiting for the first request...") + if timeout is not None and time.monotonic() - start_time > timeout: + raise TimeoutError(f"wait_for_first_request timed out after {timeout} seconds") + try: + if not Path(self.log_file).exists(): + logger.debug(f"Log file {self.log_file} not found, waiting...") await asyncio.sleep(1) continue - else: - return + with open(self.log_file) as f: + lines = f.readlines() + if len(lines) == 0: + logger.debug(f"Log file {self.log_file} is empty, waiting for the first request...") + await asyncio.sleep(1) + continue + else: + return + except asyncio.CancelledError: + logger.info("wait_for_first_request cancelled") + raise async def _construct_response(self, last_response: str, index: int) -> str: meta = { diff --git a/tests/unit/sdk/model/test_model_client.py b/tests/unit/sdk/model/test_model_client.py index 917d382e1..06d011930 100644 --- a/tests/unit/sdk/model/test_model_client.py +++ b/tests/unit/sdk/model/test_model_client.py @@ -1,6 +1,13 @@ +import asyncio +import inspect +import tempfile +from pathlib import Path + import pytest +from rock import env_vars from rock.sdk.model.client import ModelClient +from rock.sdk.model.server.config import REQUEST_END_MARKER, REQUEST_START_MARKER @pytest.mark.asyncio @@ -23,3 +30,94 @@ async def test_parse_response_line(): response_json, meta = await client.parse_response_line(content) assert 1 == meta.get("index") assert "mock content" in response_json + + +# ==================== Timeout Tests ==================== + + +@pytest.mark.asyncio +async def test_pop_request_raises_timeout_error_when_timeout_expires(): + """Test that pop_request raises TimeoutError when timeout expires.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f: + # Write a request with index 1, but we'll ask for index 2 + f.write(f'{REQUEST_START_MARKER}{{"model": "gpt-4"}}{REQUEST_END_MARKER}{{"index": 1}}\n') + log_file = f.name + + try: + client = ModelClient(log_file_name=log_file) + # Should timeout because index 2 doesn't exist + with pytest.raises(TimeoutError, match="pop_request timed out"): + await client.pop_request(index=2, timeout=0.5) + finally: + Path(log_file).unlink(missing_ok=True) + + +@pytest.mark.asyncio +async def test_wait_for_first_request_raises_timeout_error_when_timeout_expires(): + """Test that wait_for_first_request raises TimeoutError when timeout expires.""" + # Use a non-existent file + client = ModelClient(log_file_name="/non/existent/path/file.log") + with pytest.raises(TimeoutError, match="wait_for_first_request timed out"): + await client.wait_for_first_request(timeout=0.5) + + +# ==================== Function Signature Tests ==================== + + +def test_pop_request_timeout_default_is_from_env_vars(): + """Test that pop_request timeout parameter default is env_vars.ROCK_MODEL_CLIENT_POLL_TIMEOUT.""" + sig = inspect.signature(ModelClient.pop_request) + timeout_param = sig.parameters["timeout"] + # The default value should equal env_vars.ROCK_MODEL_CLIENT_POLL_TIMEOUT (evaluated at import time) + assert timeout_param.default == env_vars.ROCK_MODEL_CLIENT_POLL_TIMEOUT + + +def test_wait_for_first_request_timeout_default_is_from_env_vars(): + """Test that wait_for_first_request timeout parameter default is env_vars.ROCK_MODEL_CLIENT_POLL_TIMEOUT.""" + sig = inspect.signature(ModelClient.wait_for_first_request) + timeout_param = sig.parameters["timeout"] + # The default value should equal env_vars.ROCK_MODEL_CLIENT_POLL_TIMEOUT (evaluated at import time) + assert timeout_param.default == env_vars.ROCK_MODEL_CLIENT_POLL_TIMEOUT + + +# ==================== Cancellation Tests ==================== + + +@pytest.mark.asyncio +async def test_pop_request_propagates_cancelled_error(): + """Test that pop_request properly propagates asyncio.CancelledError.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f: + # Write a request with index 1, but we'll ask for index 2 + f.write(f'{REQUEST_START_MARKER}{{"model": "gpt-4"}}{REQUEST_END_MARKER}{{"index": 1}}\n') + log_file = f.name + + try: + client = ModelClient(log_file_name=log_file) + + async def cancel_after_delay(): + await asyncio.sleep(0.3) + raise asyncio.CancelledError() + + task = asyncio.create_task(client.pop_request(index=2, timeout=10.0)) + # Cancel the task after a short delay + await asyncio.sleep(0.3) + task.cancel() + + with pytest.raises(asyncio.CancelledError): + await task + finally: + Path(log_file).unlink(missing_ok=True) + + +@pytest.mark.asyncio +async def test_wait_for_first_request_propagates_cancelled_error(): + """Test that wait_for_first_request properly propagates asyncio.CancelledError.""" + client = ModelClient(log_file_name="/non/existent/path/file.log") + + task = asyncio.create_task(client.wait_for_first_request(timeout=10.0)) + # Cancel the task after a short delay + await asyncio.sleep(0.3) + task.cancel() + + with pytest.raises(asyncio.CancelledError): + await task