Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,101 @@ if __name__ == "__main__":
main()
```

## 4. Model Service SDK

### 4.1 概述

Model Service SDK 提供与 Model Service 交互的接口,支持 Agent 工作流中的 LLM 请求/响应处理。`ModelClient` 类是读取请求和写入响应到模型服务日志文件的主要接口。

### 4.2 基本用法

```python
import asyncio
from rock.sdk.model.client import ModelClient

async def main():
# 创建 ModelClient 实例
client = ModelClient()

# 获取第一个请求 (index=0)
first_request = await client.anti_call_llm(index=0)
print(f"第一个请求: {first_request}")

# 发送响应并获取下一个请求
llm_response = '{"content": "你好,有什么可以帮你的?"}'
next_request = await client.anti_call_llm(index=1, last_response=llm_response)
print(f"下一个请求: {next_request}")

asyncio.run(main())
```

### 4.3 超时与取消支持

`pop_request` 和 `wait_for_first_request` 方法支持超时和取消,防止无限阻塞:

#### 超时配置

```python
import asyncio
from rock.sdk.model.client import ModelClient

async def main():
client = ModelClient()

try:
# 等待第一个请求,超时时间为 30 秒
await client.wait_for_first_request(timeout=30.0)

# 弹出请求,超时时间为 60 秒(默认值)
request = await client.pop_request(index=1)
except TimeoutError as e:
print(f"操作超时: {e}")

asyncio.run(main())
```

#### 取消处理

```python
import asyncio
from rock.sdk.model.client import ModelClient

async def main():
client = ModelClient()

async def get_request():
try:
request = await client.pop_request(index=1)
return request
except asyncio.CancelledError:
print("请求被取消")
raise

# 创建可取消的任务
task = asyncio.create_task(get_request())

# 5 秒后取消
await asyncio.sleep(5)
task.cancel()

asyncio.run(main())
```

### 4.4 默认超时时间

轮询操作的默认超时时间为 **60 秒**。您可以自定义此值:

```python
# 使用默认超时(60 秒)
await client.pop_request(index=1)

# 自定义超时(30 秒)
await client.pop_request(index=1, timeout=30.0)

# 无超时(无限等待 - 不推荐)
await client.pop_request(index=1, timeout=None)
```

## 相关文档
- [快速开始指南](../../Getting%20Started/quickstart.md) - 了解如何快速开始使用 ROCK SDK
- [API 文档](../api.md) - 查看 SDK 封装的底层 API 接口
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,101 @@ if __name__ == "__main__":
main()
```

## 4. Model Service SDK

### 4.1 Overview

The Model Service SDK provides interfaces for interacting with the Model Service, enabling LLM request/response handling for agent workflows. The `ModelClient` class is the primary interface for reading requests and writing responses to the model service log file.

### 4.2 Basic Usage

```python
import asyncio
from rock.sdk.model.client import ModelClient

async def main():
# Create a ModelClient instance
client = ModelClient()

# Get the first request (index=0)
first_request = await client.anti_call_llm(index=0)
print(f"First request: {first_request}")

# Send a response and get the next request
llm_response = '{"content": "Hello, how can I help?"}'
next_request = await client.anti_call_llm(index=1, last_response=llm_response)
print(f"Next request: {next_request}")

asyncio.run(main())
```

### 4.3 Timeout and Cancellation Support

The `pop_request` and `wait_for_first_request` methods support timeout and cancellation to prevent indefinite blocking:

#### Timeout Configuration

```python
import asyncio
from rock.sdk.model.client import ModelClient

async def main():
client = ModelClient()

try:
# Wait for first request with a 30-second timeout
await client.wait_for_first_request(timeout=30.0)

# Pop request with a 60-second timeout (default)
request = await client.pop_request(index=1)
except TimeoutError as e:
print(f"Operation timed out: {e}")

asyncio.run(main())
```

#### Cancellation Handling

```python
import asyncio
from rock.sdk.model.client import ModelClient

async def main():
client = ModelClient()

async def get_request():
try:
request = await client.pop_request(index=1)
return request
except asyncio.CancelledError:
print("Request was cancelled")
raise

# Create a task that can be cancelled
task = asyncio.create_task(get_request())

# Cancel after 5 seconds
await asyncio.sleep(5)
task.cancel()

asyncio.run(main())
```

### 4.4 Default Timeout

The default timeout for polling operations is **60 seconds**. You can customize this:

```python
# Use default timeout (60 seconds)
await client.pop_request(index=1)

# Custom timeout (30 seconds)
await client.pop_request(index=1, timeout=30.0)

# No timeout (wait indefinitely - not recommended)
await client.pop_request(index=1, timeout=None)
```

## Related Documents
- [Quick Start Guide](../../Getting%20Started/quickstart.md) - Learn how to quickly get started with the ROCK SDK
- [API Documentation](../api.md) - View the underlying API interfaces encapsulated by the SDK
Expand Down
168 changes: 168 additions & 0 deletions examples/model_client_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""
ModelClient Demo - Demonstrates timeout and cancellation support.

This example shows how to use ModelClient with:
1. Timeout configuration
2. Cancellation handling
3. Basic request/response flow

NOTE: This demo requires the model service to be running.
Start the model service with: rock model-service start --type local
"""

import asyncio
import logging
import tempfile
from pathlib import Path

from rock.sdk.model.client import ModelClient, DEFAULT_POLL_TIMEOUT
from rock.sdk.model.server.config import REQUEST_END_MARKER, REQUEST_START_MARKER

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s -- %(message)s",
)
logger = logging.getLogger(__name__)


async def demo_timeout():
"""Demonstrate timeout behavior when request is not found."""
logger.info("=" * 60)
logger.info("Demo: Timeout behavior")
logger.info("=" * 60)

# Create a temporary log file with a request at index 1
with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
f.write(f'{REQUEST_START_MARKER}{{"model": "gpt-4", "messages": []}}{REQUEST_END_MARKER}{{"index": 1}}\n')
log_file = f.name

try:
client = ModelClient(log_file_name=log_file)

# Try to get request at index 2, which doesn't exist
# This should timeout after 2 seconds
logger.info("Attempting to pop request at index 2 (doesn't exist)...")
try:
await client.pop_request(index=2, timeout=2.0)
except TimeoutError as e:
logger.info(f"TimeoutError caught as expected: {e}")
finally:
Path(log_file).unlink(missing_ok=True)

logger.info("Demo: Timeout behavior - PASSED\n")


async def demo_cancellation():
"""Demonstrate cancellation handling."""
logger.info("=" * 60)
logger.info("Demo: Cancellation handling")
logger.info("=" * 60)

# Create a temporary log file with a request at index 1
with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
f.write(f'{REQUEST_START_MARKER}{{"model": "gpt-4", "messages": []}}{REQUEST_END_MARKER}{{"index": 1}}\n')
log_file = f.name

try:
client = ModelClient(log_file_name=log_file)

async def long_running_pop():
try:
# This will wait indefinitely for index 2
return await client.pop_request(index=2, timeout=100.0)
except asyncio.CancelledError:
logger.info("Task was cancelled, cleaning up...")
raise

# Create a task that can be cancelled
task = asyncio.create_task(long_running_pop())

# Wait a bit and then cancel
await asyncio.sleep(1.0)
logger.info("Cancelling the task...")
task.cancel()

try:
await task
except asyncio.CancelledError:
logger.info("CancelledError propagated correctly")
finally:
Path(log_file).unlink(missing_ok=True)

logger.info("Demo: Cancellation handling - PASSED\n")


async def demo_happy_path():
"""Demonstrate normal request retrieval."""
logger.info("=" * 60)
logger.info("Demo: Normal request retrieval")
logger.info("=" * 60)

# Create a temporary log file with requests
with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
f.write(f'{REQUEST_START_MARKER}{{"model": "gpt-4", "messages": [{{"role": "user", "content": "Hello"}}]}}{REQUEST_END_MARKER}{{"index": 1}}\n')
log_file = f.name

try:
client = ModelClient(log_file_name=log_file)

logger.info("Popping request at index 1...")
request = await client.pop_request(index=1, timeout=5.0)
logger.info(f"Got request: {request[:100]}...")

if "gpt-4" in request:
logger.info("Request content verified successfully")
finally:
Path(log_file).unlink(missing_ok=True)

logger.info("Demo: Normal request retrieval - PASSED\n")


async def demo_wait_for_first_request():
"""Demonstrate wait_for_first_request with timeout."""
logger.info("=" * 60)
logger.info("Demo: wait_for_first_request with timeout")
logger.info("=" * 60)

# Use a non-existent file to trigger timeout
client = ModelClient(log_file_name="/non/existent/path/file.log")

logger.info("Waiting for first request (will timeout)...")
try:
await client.wait_for_first_request(timeout=1.0)
except TimeoutError as e:
logger.info(f"TimeoutError caught as expected: {e}")

logger.info("Demo: wait_for_first_request with timeout - PASSED\n")


async def demo_default_timeout():
"""Show default timeout value."""
logger.info("=" * 60)
logger.info("Demo: Default timeout configuration")
logger.info("=" * 60)

logger.info(f"Default poll timeout: {DEFAULT_POLL_TIMEOUT} seconds")
logger.info("This default is applied to pop_request and wait_for_first_request")
logger.info("Demo: Default timeout configuration - PASSED\n")


async def main():
"""Run all demos."""
logger.info("\n" + "=" * 60)
logger.info("ModelClient Demo - Timeout & Cancellation Support")
logger.info("=" * 60 + "\n")

await demo_default_timeout()
await demo_happy_path()
await demo_timeout()
await demo_wait_for_first_request()
await demo_cancellation()

logger.info("=" * 60)
logger.info("All demos completed successfully!")
logger.info("=" * 60)


if __name__ == "__main__":
asyncio.run(main())
2 changes: 2 additions & 0 deletions rock/env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
# Model Service Config
ROCK_MODEL_SERVICE_DATA_DIR: str
ROCK_MODEL_SERVICE_TRAJ_APPEND_MODE: bool | None = None
ROCK_MODEL_CLIENT_POLL_TIMEOUT: float | None = None

# RuntimeEnv
ROCK_RTENV_PYTHON_V31114_INSTALL_CMD: str
Expand Down Expand Up @@ -101,6 +102,7 @@
"ROCK_MODEL_SERVICE_DATA_DIR": lambda: os.getenv("ROCK_MODEL_SERVICE_DATA_DIR", "/data/logs"),
"ROCK_MODEL_SERVICE_TRAJ_APPEND_MODE": lambda: os.getenv("ROCK_MODEL_SERVICE_TRAJ_APPEND_MODE", "false").lower()
== "true",
"ROCK_MODEL_CLIENT_POLL_TIMEOUT": lambda: float(val) if (val := os.getenv("ROCK_MODEL_CLIENT_POLL_TIMEOUT")) else None,
"ROCK_RTENV_PYTHON_V31114_INSTALL_CMD": lambda: os.getenv(
"ROCK_RTENV_PYTHON_V31114_INSTALL_CMD",
"[ -f cpython31114.tar.gz ] && rm cpython31114.tar.gz; [ -d python ] && rm -rf python; wget -q -O cpython31114.tar.gz https://github.com/astral-sh/python-build-standalone/releases/download/20251120/cpython-3.11.14+20251120-x86_64-unknown-linux-gnu-install_only.tar.gz && tar -xzf cpython31114.tar.gz && mv python runtime-env",
Expand Down
Loading
Loading