diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 72699c1..b1ea21b 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -1,12 +1,31 @@ { - "mcpServers": { - "MaaMCP": { - "command": "python", - "args": [ - "-m", - "maa_mcp" - ], - "cwd": "${workspaceFolder}" - } + "permissions": { + "allow": [ + "Bash(python -c \"import maa_mcp.pipeline_server\")", + "mcp__maa-*", + "mcp__maa-mcp__ocr", + "mcp__maa-mcp__click", + "mcp__maa-mcp__wait", + "mcp__maa-mcp__click_key", + "mcp__maa-mcp__connect_adb_device", + "mcp__maa-mcp__swipe", + "mcp__maa-mcp__find_adb_device_list", + "mcp__maa-mcp__test_sweep" + ], + "defaultMode": "bypassPermissions" + }, + "enableAllProjectMcpServers": true, + "enabledMcpjsonServers": [ + "maa-mcp" + ], + "mcpServers": { + "MaaMCP": { + "command": "python", + "args": [ + "-m", + "maa_mcp" + ], + "cwd": "${workspaceFolder}" } + } } \ No newline at end of file diff --git a/.gitignore b/.gitignore index 746979e..3b103f8 100644 --- a/.gitignore +++ b/.gitignore @@ -465,4 +465,7 @@ install tools/ImageCropper/**/*.png config -debug \ No newline at end of file +debug +output +.claude/skills +comment.txt \ No newline at end of file diff --git a/.mcp.json b/.mcp.json new file mode 100644 index 0000000..9f1ffae --- /dev/null +++ b/.mcp.json @@ -0,0 +1,11 @@ +{ + "mcpServers": { + "maa-mcp": { + "command": "python", + "args": [ + "-m", + "maa_mcp.pipeline_server" + ] + } + } +} \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..7c2987e --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,88 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +MaaMCP is an MCP (Model Context Protocol) server that exposes MaaFramework's automation capabilities to AI assistants. It provides Android device control via ADB and Windows desktop automation via window handles. 
+ +## Development Commands + +```bash +# Install dependencies in development mode +pip install -e . + +# Run MCP server (standard serial mode) +maa-mcp +# or +python -m maa_mcp + +# Run MCP server (pipeline mode with background screenshot thread) +maa-mcp-server +# or +python -m maa_mcp.pipeline_server + +# Run tests +pytest tests/ -v +pytest tests/test_basic.py -v # run specific file +``` + +## Architecture + +### Entry Points + +The package has multiple entry points defined in `pyproject.toml`: +- `maa-mcp` / `maa_mcp`: Standard MCP server ([__main__.py](maa_mcp/__main__.py)) +- `maa-mcp-server` / `maa_mcp_server`: Pipeline server with multi-threaded background monitoring ([pipeline_server.py](maa_mcp/pipeline_server.py)) + +### Core Components + +- **[core.py](maa_mcp/core.py)**: Creates the FastMCP server instance, global registries (`object_registry`, `controller_info_registry`), and `ControllerInfo` dataclass +- **[registry.py](maa_mcp/registry.py)**: `ObjectRegistry` class for managing controller instances by ID +- **[paths.py](maa_mcp/paths.py)**: Cross-platform data directory management using `platformdirs` + +### Module Responsibilities + +| Module | Purpose | +|--------|---------| +| `adb.py` | ADB device discovery (`find_adb_device_list`) and connection (`connect_adb_device`) | +| `win32.py` | Windows window discovery (`find_window_list`) and connection (`connect_window`) | +| `vision.py` | Screen capture (`screencap`) and OCR recognition (`ocr`) | +| `control.py` | Input operations: `click`, `double_click`, `swipe`, `input_text`, `click_key`, `keyboard_shortcut`, `scroll` | +| `resource.py` | OCR resource download and tasker management | +| `download.py` | OCR model file download utilities | +| `pipeline/` | Pipeline mode state management and logging | + +### Two Operation Modes + +1. **Serial Mode**: Synchronous execution where each operation waits for the previous to complete +2. 
**Pipeline Mode**: Multi-threaded mode where a background thread continuously captures screenshots and caches them in a queue for the main thread to process decisions + +### Controller Pattern + +All device/window control flows through: +1. Discovery functions return device/window identifiers +2. Connection functions create `AdbController` or `Win32Controller` instances (from `maafw`) and register them in `object_registry` +3. Operations use `controller_id` to look up the controller in `object_registry` +4. `controller_info_registry` stores metadata (controller type, connection params) for each `controller_id` + +### Key Dependencies + +- `maafw>=5.2.6`: Core automation framework (MaaFramework) +- `fastmcp>=2.0.0`: MCP server framework +- `opencv-python>=4.0.0`: Image processing for screenshots +- `loguru>=0.7.0`: Logging +- `platformdirs>=4.0.0`: Cross-platform paths + +## Data Storage + +OCR models and screenshots are stored in platform-specific directories: +- Windows: `C:\Users\<username>\AppData\Local\MaaMCP\` +- macOS: `~/Library/Application Support/MaaMCP/` +- Linux: `~/.local/share/MaaMCP/` + +## Localization + +- [CLAUDE_CN.md](CLAUDE_CN.md): Chinese version of this document + +__Rule__: When updating this file, always sync changes to [CLAUDE_CN.md](CLAUDE_CN.md) diff --git a/CLAUDE_CN.md b/CLAUDE_CN.md new file mode 100644 index 0000000..3e0674c --- /dev/null +++ b/CLAUDE_CN.md @@ -0,0 +1,82 @@ +# CLAUDE_CN.md + +本文件为 Claude Code (claude.ai/code) 在本仓库中工作时提供指导。 + +## 项目概述 + +MaaMCP 是一个 MCP(Model Context Protocol)服务器,将 MaaFramework 的自动化能力暴露给 AI 助手。它通过 ADB 提供 Android 设备控制,通过窗口句柄提供 Windows 桌面自动化。 + +## 开发命令 + +```bash +# 以开发模式安装依赖 +pip install -e . 
+ +# 运行 MCP 服务器(标准串行模式) +maa-mcp +# 或 +python -m maa_mcp + +# 运行 MCP 服务器(流水线模式,带后台截图线程) +maa-mcp-server +# 或 +python -m maa_mcp.pipeline_server + +# 运行测试 +pytest tests/ -v +pytest tests/test_basic.py -v # 运行特定文件 +``` + +## 架构 + +### 入口点 + +包在 `pyproject.toml` 中定义了多个入口点: +- `maa-mcp` / `maa_mcp`:标准 MCP 服务器([__main__.py](maa_mcp/__main__.py)) +- `maa-mcp-server` / `maa_mcp_server`:带多线程后台监控的流水线服务器([pipeline_server.py](maa_mcp/pipeline_server.py)) + +### 核心组件 + +- **[core.py](maa_mcp/core.py)**:创建 FastMCP 服务器实例、全局注册表(`object_registry`、`controller_info_registry`)和 `ControllerInfo` 数据类 +- **[registry.py](maa_mcp/registry.py)**:`ObjectRegistry` 类,用于通过 ID 管理控制器实例 +- **[paths.py](maa_mcp/paths.py)**:使用 `platformdirs` 的跨平台数据目录管理 + +### 模块职责 + +| 模块 | 用途 | +|------|------| +| `adb.py` | ADB 设备发现(`find_adb_device_list`)和连接(`connect_adb_device`) | +| `win32.py` | Windows 窗口发现(`find_window_list`)和连接(`connect_window`) | +| `vision.py` | 屏幕截图(`screencap`)和 OCR 识别(`ocr`) | +| `control.py` | 输入操作:`click`、`double_click`、`swipe`、`input_text`、`click_key`、`keyboard_shortcut`、`scroll` | +| `resource.py` | OCR 资源下载和任务管理 | +| `download.py` | OCR 模型文件下载工具 | +| `pipeline/` | 流水线模式状态管理和日志 | + +### 两种操作模式 + +1. **串行模式**:同步执行,每个操作等待前一个完成 +2. **流水线模式**:多线程模式,后台线程持续截图并缓存在队列中,供主线程处理决策 + +### 控制器模式 + +所有设备/窗口控制都通过以下流程: +1. 发现函数返回设备/窗口标识符 +2. 连接函数创建 `AdbController` 或 `Win32Controller` 实例(来自 `maafw`)并注册到 `object_registry` +3. 操作使用 `controller_id` 在 `object_registry` 中查找控制器 +4. 
`controller_info_registry` 存储每个 `controller_id` 的元数据(控制器类型、连接参数) + +### 关键依赖 + +- `maafw>=5.2.6`:核心自动化框架(MaaFramework) +- `fastmcp>=2.0.0`:MCP 服务器框架 +- `opencv-python>=4.0.0`:截图图像处理 +- `loguru>=0.7.0`:日志 +- `platformdirs>=4.0.0`:跨平台路径 + +## 数据存储 + +OCR 模型和截图存储在平台特定的目录中: +- Windows:`C:\Users\<用户名>\AppData\Local\MaaMCP\` +- macOS:`~/Library/Application Support/MaaMCP/` +- Linux:`~/.local/share/MaaMCP/` \ No newline at end of file diff --git a/README.md b/README.md index 3b040bc..cf4da0e 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ MaaMCP 是一个 MCP 服务器,将 MaaFramework 的强大自动化能力通过 - 🖥️ **Windows 自动化** - 控制 Windows 桌面应用程序 - 🎯 **后台操作** - Windows 上的截图与控制均在后台运行,不占用鼠标键盘,您可以继续使用电脑做其他事情 - 🔗 **多设备协同** - 同时控制多个设备/窗口,实现跨设备自动化 +- ⚡ **双模式运行** - 串行模式(同步执行)和流水线模式(后台持续截图),适应不同场景需求 - 👁️ **智能识别** - 使用 OCR 识别屏幕文字内容 - 🎯 **精准操作** - 执行点击、滑动、文本输入、按键等操作 - 📸 **屏幕截图** - 获取实时屏幕截图进行视觉分析 @@ -47,8 +48,8 @@ Talk is cheap, 请看: **[🎞️ Bilibili 视频演示](https://www.bilibili.co ### 👀 屏幕识别 -- `screencap_and_ocr` - 光学字符识别(高效,推荐优先使用) -- `screencap_only` - 屏幕截图,然后由大模型视觉处理(按需使用,token 开销大) +- `ocr` - 光学字符识别(高效,推荐优先使用) +- `screencap` - 屏幕截图(按需使用,token 开销大) ### 🎮 设备控制 @@ -64,6 +65,13 @@ Talk is cheap, 请看: **[🎞️ Bilibili 视频演示](https://www.bilibili.co - 支持组合键:Ctrl+C、Ctrl+V、Alt+Tab 等 - `scroll` - 鼠标滚轮(仅 Windows) +### ⚡ 流水线模式(多线程后台监控) + +- `start_pipeline` - 启动后台监控流水线,持续截图并缓存图片路径 +- `stop_pipeline` - 停止流水线 +- `get_new_messages` - 获取流水线缓存的新截图路径 +- `get_pipeline_status` - 获取流水线运行状态 + ### 📝 Pipeline 生成与运行 - `get_pipeline_protocol` - 获取 Pipeline 协议文档 @@ -105,6 +113,36 @@ pip install maa-mcp pip install -e . 
``` +### 运行方式 + +MaaMCP 提供两种运行模式: + +#### 标准服务器(串行模式) + +传统的同步执行方式,适合简单任务: + +```bash +# 已安装包的情况 +maa-mcp + +# 从源码开发运行 +python -m maa_mcp +``` + +#### 流水线服务器(多线程后台监控) + +多线程异步执行方式,适合需要高频屏幕监控的实时自动化任务: + +```bash +# 已安装包的情况 +maa-mcp-server + +# 从源码开发运行 +python -m maa_mcp.pipeline_server +``` + +两种服务器的功能完全一致,均支持串行模式和流水线模式的自动化流程。区别在于流水线服务器内部使用独立的后台线程持续采集屏幕截图,可根据任务需求灵活选择。 + ### 配置客户端 在 Cursor 等软件中,添加 MCP 服务器: @@ -157,41 +195,76 @@ MaaMCP 会自动: 3. 自动下载并加载 OCR 资源 4. 执行识别和操作任务 -## 大模型提示词 +## 工作流程 -如果你希望 AI 能够快速、高效地完成自动化任务,而不希望看到运行过程中的详细解释,可以将以下内容添加到你的提示词(Prompt)中: +MaaMCP 遵循简洁的操作流程,支持多设备/多窗口协同工作,并提供两种运行模式: +```mermaid +graph LR + A[扫描设备] --> B[建立连接] + B --> C1[串行模式] + B --> C2[流水线模式] + C1 --> D[执行自动化操作] + C2 --> D ``` -# Role: UI Automation Agent -## Workflow Optimization Rules -1. **Minimize Round-Trips**: 你的目标是以最少的交互次数完成任务。 -2. **Critical Pattern**: 当涉及到表单/聊天输入时,必须遵循 **[Click Focus -> Input Text -> Send Key]** 的原子化操作序列。 - - 🚫 错误做法:先 Click,等待结果;再 Input,等待结果;再 Press Enter。 - - ✅ 正确做法:在 `click` 之后,无需等待返回,直接在同一个 `tool_calls` 列表中根据逻辑推断追加 `input_text` 和 `click_key`。 +1. **扫描** - 使用 `find_adb_device_list` 或 `find_window_list` +2. **连接** - 使用 `connect_adb_device` 或 `connect_window`(可连接多个设备/窗口,获得多个控制器 ID) +3. **操作** - 通过指定不同的控制器 ID,对多个设备/窗口执行 OCR、点击、滑动等自动化操作 -## Communication Style -- **NO YAPPING**: 不要复述用户的指令,不要解释你的步骤。 -- **Direct Execution**: 接收指令 -> (内部思考) -> 直接输出 JSON 工具调用。 -``` +### 双模式运行 -### 性能建议 +MaaMCP 支持两种运行模式,可根据任务需求灵活选择: -为了获得最快的运行速度,建议使用 **Flash 版本**的大语言模型(如 Claude 3 Flash),这些模型在保持较高智能水平的同时,能够显著提升响应速度。 +#### 串行模式(默认) -## 工作流程 +传统的同步执行方式,一个指令完成后再执行下一个: -MaaMCP 遵循简洁的操作流程,支持多设备/多窗口协同工作: +``` +OCR识别 → 分析结果 → 执行操作 → OCR识别 → ... +``` + +**适用场景**:简单任务、对实时性要求不高的场景 + +#### 流水线模式(多线程后台监控) + +多线程异步执行方式,后台持续采集屏幕信息,主线程专注于决策和操作: ```mermaid graph LR - A[扫描设备] --> B[建立连接] - B --> C[执行自动化操作] + subgraph 后台线程 + S1[持续截图] --> S2[缓存图片路径] + S2 --> S3[推送到消息队列] + S3 --> S1 + end + subgraph 主线程 + M1[获取截图路径] --> M2[视觉分析] + M2 --> M3[决定是否OCR] + M3 --> M4[执行操作] + M4 --> M1 + end ``` -1. 
**扫描** - 使用 `find_adb_device_list` 或 `find_window_list` -2. **连接** - 使用 `connect_adb_device` 或 `connect_window`(可连接多个设备/窗口,获得多个控制器 ID) -3. **操作** - 通过指定不同的控制器 ID,对多个设备/窗口执行 OCR、点击、滑动等自动化操作 +**工作流程**: + +1. **启动流水线** - 调用 `start_pipeline(controller_id)` 启动后台监控 +2. **获取截图** - 调用 `get_pipeline_status()` 检查状态,`get_new_messages()` 获取截图路径 +3. **分析执行** - 读取图片进行视觉分析,根据需要调用 OCR,执行点击等操作 +4. **停止流水线** - 任务完成后调用 `stop_pipeline()` 释放资源 + +**优势**: + +- 后台持续截图,AI 可直接查看完整画面进行决策 +- AI 可根据图片内容自行决定是否需要 OCR、具体 OCR 哪个区域 +- 支持高频屏幕监控,不错过任何界面变化 +- 适合需要快速响应的实时自动化任务 +- 消息队列机制,支持异步处理 + +**使用示例**: + +```text +请用 MaaMCP 工具连接我的设备,使用流水线模式监控屏幕,当出现特定弹窗时自动点击确认。 +``` ## Pipeline 生成功能 @@ -292,9 +365,9 @@ Pipeline 生成后,AI 会自动进行验证和优化: 首次使用时,会自动下载 OCR 模型文件。但可能出现下载失败等情况,请检查数据目录: -- Windows: `C:\Users\<用户名>\AppData\Local\MaaXYZ\MaaMCP\resource\model\ocr\` -- macOS: `~/Library/Application Support/MaaXYZ/MaaMCP/resource/model/ocr/` -- Linux: `~/.local/share/MaaXYZ/MaaMCP/resource/model/ocr/` +- Windows: `C:\Users\<用户名>\AppData\Local\MaaMCP\resource\model\ocr\` +- macOS: `~/Library/Application Support/MaaMCP/resource/model/ocr/` +- Linux: `~/.local/share/MaaMCP/resource/model/ocr/` 1. 检查上述目录中是否有模型文件(`det.onnx`, `rec.onnx`, `keys.txt`) 2. 
检查 `model/download.log` 中是否出现资源下载异常 @@ -304,9 +377,9 @@ Pipeline 生成后,AI 会自动进行验证和优化: 提交问题时,请提供日志文件,日志文件路径如下: -- Windows: `C:\Users\<用户名>\AppData\Local\MaaXYZ\MaaMCP\debug\maa.log` -- macOS: `~/Library/Application Support/MaaXYZ/MaaMCP/debug/maa.log` -- Linux: `~/.local/share/MaaXYZ/MaaMCP/debug/maa.log` +- Windows: `C:\Users\<用户名>\AppData\Local\MaaMCP\debug\maa.log` +- macOS: `~/Library/Application Support/MaaMCP/debug/maa.log` +- Linux: `~/.local/share/MaaMCP/debug/maa.log` ## 许可证 diff --git a/README_EN.md b/README_EN.md index 54bef2f..d6afbb5 100644 --- a/README_EN.md +++ b/README_EN.md @@ -30,6 +30,7 @@ MaaMCP is a Model Context Protocol server that exposes MaaFramework's powerful a - 🖥️ **Windows Automation** - Control Windows desktop applications - 🎯 **Background Operation** - Screenshots and controls on Windows run in the background without occupying your mouse or keyboard, allowing you to continue using your computer for other tasks - 🔗 **Multi-Device Coordination** - Control multiple devices/windows simultaneously for cross-device automation +- ⚡ **Dual-Mode Operation** - Serial mode (synchronous execution) and Pipeline mode (background continuous screenshots), adapting to different scenarios - 👁️ **Smart Recognition** - Use OCR to recognize on-screen text - 🎯 **Precise Operations** - Execute clicks, swipes, text input, key presses, and more - 📸 **Screenshots** - Capture real-time screenshots for visual analysis @@ -47,8 +48,8 @@ Talk is cheap, see: **[🎞️ Bilibili Video Demo](https://www.bilibili.com/vid ### 👀 Screen Recognition -- `screencap_and_ocr` - Optical Character Recognition (efficient, recommended for priority use, OCR model auto-downloads on first use) -- `screencap_only` - Screenshot capture, then processed by large model vision (use as needed, high token cost) +- `ocr` - Optical Character Recognition (efficient, recommended for priority use, OCR model auto-downloads on first use) +- `screencap` - Screenshot capture (use as needed, high token 
cost) ### 🎮 Device Control @@ -64,6 +65,13 @@ Talk is cheap, see: **[🎞️ Bilibili Video Demo](https://www.bilibili.com/vid - Supports key combinations: Ctrl+C, Ctrl+V, Alt+Tab, etc. - `scroll` - Mouse wheel (Windows only) +### ⚡ Pipeline Mode (Multi-threaded Background Monitoring) + +- `start_pipeline` - Start background monitoring pipeline, continuously captures screenshots and caches image paths +- `stop_pipeline` - Stop pipeline +- `get_new_messages` - Get new screenshot paths cached by pipeline +- `get_pipeline_status` - Get pipeline running status + ### 📝 Pipeline Generation & Execution - `get_pipeline_protocol` - Get Pipeline protocol documentation @@ -149,42 +157,75 @@ MaaMCP will automatically: 3. Auto-download and load OCR resources (on first use) 4. Execute recognition and operation tasks -## Prompt words +## Workflow -If you want AI to complete automation tasks quickly and efficiently without seeing detailed explanations during the running process, you can add the following content to your prompt: +MaaMCP follows a streamlined operational workflow with multi-device/window coordination support and two operation modes: -``` -# Role: UI Automation Agent - -## Workflow Optimization Rules -1. **Minimize Round-Trips**: Your goal is to complete tasks with the fewest interactions. -2. **Critical Pattern**: When it comes to form/chat input, you must follow the **[Click Focus -> Input Text -> Send Key]** atomic operation sequence. - - 🚫 Wrong way: Click first, wait for results; then Input, wait for results; then Press Enter. - - ✅ Correct way: After `click`, without waiting for a return, directly append `input_text` and `click_key` in the same `tool_calls` list based on logical inference. - -## Communication Style -- **NO YAPPING**: Don't repeat user instructions, don't explain your steps. -- **Direct Execution**: Receive instructions -> (internal thinking) -> directly output JSON tool calls. 
-- **Direct Execution**: Receive instruction -> (internal thinking) -> directly output JSON tool calls. +```mermaid +graph LR + A[Scan Devices] --> B[Establish Connection] + B --> C1[Serial Mode] + B --> C2[Pipeline Mode] + C1 --> D[Execute Automation] + C2 --> D ``` -### Performance Recommendations +1. **Scan** - Use `find_adb_device_list` or `find_window_list` +2. **Connect** - Use `connect_adb_device` or `connect_window` (can connect multiple devices/windows, each gets a unique controller ID) +3. **Operate** - Execute OCR, click, swipe, etc. on multiple devices/windows by specifying different controller IDs (OCR resources auto-download on first use) -For the fastest running speed, it is recommended to use the **Flash version** of a large language model (such as Claude 3 Flash), which can significantly improve response speed while maintaining high intelligence levels. +### Dual-Mode Operation -## Workflow +MaaMCP supports two operation modes, allowing flexible selection based on task requirements: + +#### Serial Mode (Default) + +Traditional synchronous execution, completing one instruction before executing the next: + +``` +OCR Recognition → Analyze Results → Execute Operation → OCR Recognition → ... 
+``` + +**Suitable for**: Simple tasks, scenarios with low real-time requirements + +#### Pipeline Mode (Multi-threaded Background Monitoring) -MaaMCP follows a streamlined operational workflow with multi-device/window coordination support: +Multi-threaded asynchronous execution, background thread continuously captures screen information while main thread focuses on decision-making and operations: ```mermaid graph LR - A[Scan Devices] --> B[Establish Connection] - B --> C[Execute Automation] + subgraph Background Thread + S1[Continuous Screenshots] --> S2[Cache Image Paths] + S2 --> S3[Push to Message Queue] + S3 --> S1 + end + subgraph Main Thread + M1[Get Screenshot Path] --> M2[Visual Analysis] + M2 --> M3[Decide if OCR Needed] + M3 --> M4[Execute Operation] + M4 --> M1 + end ``` -1. **Scan** - Use `find_adb_device_list` or `find_window_list` -2. **Connect** - Use `connect_adb_device` or `connect_window` (can connect multiple devices/windows, each gets a unique controller ID) -3. **Operate** - Execute OCR, click, swipe, etc. on multiple devices/windows by specifying different controller IDs (OCR resources auto-download on first use) +**Workflow**: + +1. **Start Pipeline** - Call `start_pipeline(controller_id)` to start background monitoring +2. **Get Screenshots** - Call `get_pipeline_status()` to check status, `get_new_messages()` to get screenshot paths +3. **Analyze & Execute** - Read images for visual analysis, call OCR as needed, execute clicks and other operations +4. 
**Stop Pipeline** - Call `stop_pipeline()` to release resources when task is complete + +**Advantages**: +- Continuous background screenshots, AI can directly view complete screen for decision-making +- AI can decide whether OCR is needed and which specific regions to OCR based on image content +- Supports high-frequency screen monitoring, never misses interface changes +- Suitable for real-time automation tasks requiring fast response +- Message queue mechanism supports asynchronous processing + +**Usage Example**: + +```text +Please use MaaMCP tools to connect to my device, use pipeline mode to monitor the screen, and automatically click confirm when a specific popup appears. +``` ## Pipeline Generation diff --git a/maa_mcp/adb.py b/maa_mcp/adb.py index 171b011..7fbe35d 100644 --- a/maa_mcp/adb.py +++ b/maa_mcp/adb.py @@ -68,8 +68,16 @@ def connect_adb_device(device_name: str) -> Optional[str]: if not adb_controller.post_connection().wait().succeeded: return None controller_id = object_registry.register(adb_controller) + + connection_params = { + "adb_path": device.adb_path, + "address": device.address, + "screencap_methods": device.screencap_methods, + "input_methods": device.input_methods, + "config": device.config, + } + controller_info_registry[controller_id] = ControllerInfo( - controller_type=ControllerType.ADB + controller_type=ControllerType.ADB, connection_params=connection_params ) return controller_id - diff --git a/maa_mcp/control.py b/maa_mcp/control.py index e4425f2..a393f04 100644 --- a/maa_mcp/control.py +++ b/maa_mcp/control.py @@ -109,7 +109,14 @@ def double_click( - 失败:返回 False 说明: - 坐标系统以屏幕左上角为原点 (0, 0)。duration 参数控制滑动速度,数值越大滑动越慢。 + - 坐标系统以屏幕左上角为原点 (0, 0) + - duration 参数控制滑动速度,数值越大滑动越慢 + - 起点、终点坐标由 AI 根据当前 OCR 识别结果和场景自行计算决定 + + 预设 duration 值(建议直接使用),正常情况下默认用slow速度滑动: + + - slow(慢速): duration=3000,适合长距离拖拽、滚动翻页 + - fast(快速): duration=1500,适合短距离精确滑动、点击式滑动 """, ) def swipe( @@ -199,7 +206,9 @@ def click_key(controller_id: str, key: int, duration: 
int = 50) -> bool: return controller.post_key_up(key).wait().succeeded -@mcp.tool(name="keyboard_shortcut", description=""" +@mcp.tool( + name="keyboard_shortcut", + description=""" 在设备屏幕上执行键盘快捷键操作。 参数: @@ -220,7 +229,8 @@ def click_key(controller_id: str, key: int, duration: int = 50) -> bool: - Left Windows: 91 (0x5B) 注意:该方法仅对 Windows 窗口控制器,且在 Seize 控制方式下有效,其他控制方式不支持。 -""") +""", +) def keyboard_shortcut( controller_id: str, modifiers: list[int], primary_key: int, duration: int = 50 ) -> Union[bool, str]: @@ -233,7 +243,10 @@ def keyboard_shortcut( if info: if info.controller_type == ControllerType.ADB: return "keyboard_shortcut 不支持 ADB 控制器,该方法仅适用于 Windows 窗口控制器。请使用 click_key 进行单个按键操作。" - if info.controller_type == ControllerType.WIN32 and info.keyboard_method != "Seize": + if ( + info.controller_type == ControllerType.WIN32 + and info.keyboard_method != "Seize" + ): return f"keyboard_shortcut 仅支持 Seize 键盘模式,当前为 {info.keyboard_method}。可对同一窗口调用 connect_window(keyboard_method='Seize') 获取新 controller_id,原 controller_id 仍可用于其他操作。" for modifier in modifiers: diff --git a/maa_mcp/core.py b/maa_mcp/core.py index 25173b3..4a8b1f2 100644 --- a/maa_mcp/core.py +++ b/maa_mcp/core.py @@ -2,7 +2,7 @@ from dataclasses import dataclass from enum import Enum, auto from pathlib import Path -from typing import Optional +from typing import Any, Dict, Optional from maa.toolkit import Toolkit @@ -30,6 +30,8 @@ class ControllerInfo: """控制器信息,用于记录控制器类型和配置""" controller_type: ControllerType + # 连接参数,用于在子进程中重建控制器 + connection_params: Dict[str, Any] # Win32 专用:键盘输入方式 keyboard_method: Optional[str] = None @@ -55,23 +57,75 @@ class ControllerInfo: - 每个设备/窗口拥有独立的控制器 ID(controller_id) - 通过在操作时指定不同的 controller_id 实现多设备协同自动化 - 标准工作流程: - 1. 设备/窗口发现与连接 + ⭐ 双模式运行支持: + - 串行模式(流程 1 + 2):传统的同步执行方式,一个指令完成后再执行下一个 + - 流水线模式(流程 1 + 3):多线程异步执行方式,后台持续采集屏幕信息,主线程专注于决策和操作 + + ======================== + 标准工作流程 + ======================== + + 1. 
设备/窗口发现与连接(必选,两种模式通用) - 调用 find_adb_device_list() 扫描可用的 ADB 设备 - 调用 find_window_list() 扫描可用的 Windows 窗口 - 若发现多个设备/窗口,需向用户展示列表并等待用户选择需要操作的目标 - 使用 connect_adb_device(device_name) 或 connect_window(window_name) 建立连接 - 可连接多个设备/窗口,每个连接返回独立的控制器 ID - 2. 自动化执行循环 + 2. 串行自动化执行循环(流程 1 之后选择此流程进入串行模式) + ⭐ 标准工作循环:截图(screencap) → OCR识别(ocr) → 分析内容 → 执行操作(click/swipe等) → 重复直到完成 - 调用 ocr(controller_id) 对指定设备进行屏幕截图和 OCR 识别 - - 首次使用时,如果 OCR 模型文件不存在,ocr() 会返回提示信息,需要调用 check_and_download_ocr() 下载资源 + - 首次使用时,如果 OCR 模型文件不存在, ocr() 会返回提示信息,需要调用 check_and_download_ocr() 下载资源 - 下载完成后即可正常使用 OCR 功能,后续调用无需再次下载 - - 根据识别结果调用 click()、double_click()、scroll()、swipe() 等执行相应操作 + - 根据 OCR 识别结果中的文字和坐标,执行 click()、double_click()、scroll()、swipe() 等操作 + - 操作后等待界面刷新(约 1 秒),然后再次调用 ocr() 获取新界面状态 + - 重复以上循环,直到完成用户指定的任务 - 所有操作通过 controller_id 指定目标设备/窗口 - 可在多个设备间切换操作,实现协同自动化 + - 特点:每次操作需等待 OCR 完成,适合简单任务或对实时性要求不高的场景 + + ⚠️ 重要提醒: + - 模拟器(如 MuMu)必须使用 ADB 方式连接(find_adb_device_list → connect_adb_device), + 使用 Win32 窗口方式连接(find_window_list → connect_window)会导致截图黑屏 + - 普通 Windows 窗口应使用 find_window_list → connect_window 连接 + + 3. 
流水线自动化执行(流程 1 之后选择此流程进入多线程流水线模式) + ⭐ 适用场景:需要高频屏幕监控、实时响应的自动化任务 + + 3.1 启动流水线 + - 调用 start_pipeline(controller_id) 启动指定控制器的流水线 + - 流水线会在后台启动独立线程,按固定频率自动截图并缓存图片路径 + - 截图路径会自动推送到消息队列中 + - 启动流水线后,等待约 1 秒让流水线进行初始缓存 + + 3.2 获取流水线状态和截图 + - 调用 get_pipeline_status() 检测流水线运行状态和待处理消息数量 + - 如果有新消息,调用 get_new_messages() 获取最新的截图路径 + - 消息包含 type(固定为 "screenshot")、image_path(截图文件路径)、timestamp、frame_id + + 3.3 分析截图并执行操作 + - 读取 image_path 中的图片内容,进行视觉分析 + - 根据图片内容判断是否需要执行 OCR(调用 ocr 工具获取文字信息) + - 根据分析结果调用 click()、double_click()、scroll()、swipe() 等执行相应操作 + - 所有操作通过 controller_id 指定目标设备/窗口 + - 可在多个设备间切换操作,实现协同自动化 + - 操作完成后继续循环 3.2 和 3.3,直到任务完成 + + 3.4 停止流水线 + - 任务完成后,调用 stop_pipeline() 停止流水线 + - 释放后台线程资源 + + 流水线模式优势: + - 后台持续截图,大模型可直接查看完整画面进行决策 + - 大模型可根据图片内容自行决定是否需要 OCR、具体 OCR 哪个区域 + - 支持高频屏幕监控,不错过任何界面变化 + - 适合需要快速响应的实时自动化任务 + - 消息队列机制,支持异步处理和历史数据查询 + + ======================== + 屏幕识别策略(重要) + ======================== - 屏幕识别策略: - 优先使用 OCR:始终优先调用 ocr() 进行文字识别,OCR 返回结构化文本数据,token 消耗极低 - 按需使用截图:仅当以下情况时,才调用 screencap() 获取截图,再通过 read_file 读取图片进行视觉识别: 1. OCR 结果不足以做出决策(如需要识别图标、图像、颜色、布局等非文字信息) @@ -93,7 +147,10 @@ class ControllerInfo: 截图异常(画面为空、纯黑、花屏等): - 多尝试几次(2~3次)确认是否为偶发问题,不要一次失败就切换 - - 若持续异常,按优先级切换截图方式重新连接: + - ⚠️ 重要:如果目标是模拟器(如 MuMu),画面黑屏是正常的! 
+ 模拟器必须使用 ADB 方式连接:调用 find_adb_device_list() → connect_adb_device() + 使用 Win32 窗口方式连接模拟器会持续黑屏,切换截图方式无法解决 + - 若持续异常且目标是普通 Windows 窗口(非模拟器),按优先级切换截图方式重新连接: FramePool → PrintWindow → GDI → DXGI_DesktopDup_Window → ScreenDC - 最后手段:DXGI_DesktopDup(截取整个桌面,触控坐标会不正确,仅用于排查问题) diff --git a/maa_mcp/paths.py b/maa_mcp/paths.py index 275ca83..96c0d8a 100644 --- a/maa_mcp/paths.py +++ b/maa_mcp/paths.py @@ -67,6 +67,16 @@ def get_screenshots_dir() -> Path: return get_data_dir() / "screenshots" +def get_logs_dir() -> Path: + """ + 获取日志目录路径 + + Returns: + 日志目录路径 (data_dir/logs) + """ + return get_data_dir() / "logs" + + def ensure_dirs() -> None: """ 确保所有必要的目录存在 @@ -76,6 +86,7 @@ def ensure_dirs() -> None: get_model_dir(), get_ocr_dir(), get_screenshots_dir(), + get_logs_dir(), ] for d in dirs: d.mkdir(parents=True, exist_ok=True) diff --git a/maa_mcp/pipeline.py b/maa_mcp/pipeline.py index e9df552..55f6b02 100644 --- a/maa_mcp/pipeline.py +++ b/maa_mcp/pipeline.py @@ -12,7 +12,6 @@ from datetime import datetime from pathlib import Path from typing import Optional -from urllib.parse import urlencode from lzstring import LZString from maa.tasker import TaskDetail @@ -355,17 +354,16 @@ def save_pipeline( else: filepath = filepath / f"pipeline_{timestamp}.json" else: - # 默认保存到用户的 Documents/MaaMCP 目录 - maamcp_dir = Path.home() / "Documents" / "MaaMCP" - maamcp_dir.mkdir(parents=True, exist_ok=True) + pipelines_dir = get_data_dir() / "pipelines" + pipelines_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if name: # 清理名称中的非法字符 safe_name = "".join(c for c in name if c.isalnum() or c in "._- ") safe_name = safe_name.strip()[:50] or "pipeline" - filepath = maamcp_dir / f"{safe_name}_{timestamp}.json" + filepath = pipelines_dir / f"{safe_name}_{timestamp}.json" else: - filepath = maamcp_dir / f"pipeline_{timestamp}.json" + filepath = pipelines_dir / f"pipeline_{timestamp}.json" # 检查文件是否已存在 if filepath.exists() and not overwrite: @@ -459,11 +457,27 
def generate_share_link(pipeline_obj: dict) -> str:
    """
    Build the MPE share URL that carries *pipeline_obj* in its query string.

    The pipeline is wrapped as ``{"v": MPE_SHARE_VERSION, "d": pipeline_obj}``,
    serialized to compact JSON (non-ASCII characters kept as-is), compressed
    with ``LZString.compressToEncodedURIComponent`` and appended to
    ``MPE_BASE_URL`` under the ``MPE_SHARE_PARAM`` query parameter.

    Args:
        pipeline_obj: The parsed pipeline JSON object.

    Returns:
        The complete share URL. Its size is not checked here.
    """
    envelope = dict(v=MPE_SHARE_VERSION, d=pipeline_obj)
    # Compact separators keep the payload (and therefore the URL) short.
    serialized = json.dumps(envelope, ensure_ascii=False, separators=(",", ":"))
    encoded = LZString().compressToEncodedURIComponent(serialized)
    return f"{MPE_BASE_URL}?{MPE_SHARE_PARAM}={encoded}"
lower_path: - start_dir = "music" - file_name = file_path.name - elif "pictures" in lower_path or "图片" in lower_path: - start_dir = "pictures" - file_name = file_path.name - elif "videos" in lower_path or "视频" in lower_path: - start_dir = "videos" - file_name = file_path.name - else: - # 无法推断起始目录 + with open(file_path, "r", encoding="utf-8") as f: + pipeline_obj = json.load(f) + + # 生成分享链接 + share_url = generate_share_link(pipeline_obj) + + # 检查 URL 大小 + url_size = len(share_url.encode("utf-8")) + if url_size > MPE_MAX_URL_SIZE: + size_kb = url_size / 1024 raise ValueError( - f"无法从路径推断起始目录: {pipeline_file_path}\n" - f"请将文件放置在以下目录之一: Downloads、Documents、Desktop、Music、Pictures、Videos" + f"生成的分享链接过大({size_kb:.2f} KB),请自行通过复制或文件的方式导入 Pipeline 至 MPE。" ) - # 生成 URL - params = { - MPE_IMPORT_PARAM: start_dir, - MPE_IMPORT_FILE_PARAM: file_name, - } - query_str = urlencode(params) - open_url = f"{MPE_BASE_URL}?{query_str}" - - webbrowser.open(open_url) + webbrowser.open(share_url) diff --git a/maa_mcp/pipeline/__init__.py b/maa_mcp/pipeline/__init__.py new file mode 100644 index 0000000..5f2aaa4 --- /dev/null +++ b/maa_mcp/pipeline/__init__.py @@ -0,0 +1,22 @@ +# maa_mcp/pipeline/__init__.py +""" +Pipeline 模块 +============= +流水线服务器的核心组件。 + +包含: +- logging_config: 日志配置 +- state: 流水线状态管理 +""" + +from .logging_config import setup_logger, get_logger +from .state import PipelineState, get_pipeline_state + +__all__ = [ + # 日志 + "setup_logger", + "get_logger", + # 状态管理 + "PipelineState", + "get_pipeline_state", +] diff --git a/maa_mcp/pipeline/logging_config.py b/maa_mcp/pipeline/logging_config.py new file mode 100644 index 0000000..5bce7bb --- /dev/null +++ b/maa_mcp/pipeline/logging_config.py @@ -0,0 +1,93 @@ +# maa_mcp/pipeline/logging_config.py +""" +日志配置模块 +============ +使用 loguru 配置日志输出到控制台和文件。 +""" + +import sys +from pathlib import Path +from typing import Optional + +from loguru import logger + +from maa_mcp.paths import get_logs_dir + + +# 标记是否已初始化 +_initialized = False 
class PipelineState:
    """
    Process-wide pipeline state (singleton).

    Holds the running flag, the stop event, the worker thread handle, the
    message queue and the statistics of the background pipeline.

    Every method that mutates or copies state takes the class-level lock.
    Direct attribute access from outside (for example reading ``is_running``)
    is not guarded by that lock.
    """

    _instance = None
    # One lock for the whole class: it guards singleton creation as well as
    # every state mutation below. It is not re-entrant, so locked methods must
    # never call each other.
    _lock = Lock()

    def __new__(cls):
        # Double-checked creation: the unlocked test keeps the common path
        # cheap, the locked test guarantees a single instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every PipelineState() call; only the first call
        # may initialize the shared instance.
        if self._initialized:
            return
        self._initialized = True
        self.is_running = False
        self.stop_event = Event()
        self.pipeline_thread: Optional[threading.Thread] = None
        self.message_queue = Queue(maxsize=100)
        self.stats_dict: Dict[str, Any] = {}
        self.last_screen_state: Dict[str, Any] = {}
        self.controller_id: Optional[str] = None
        self.reset()

    def reset(self):
        """
        Reset the pipeline state to its idle defaults.

        Clears the running flag and the stop event, drops every queued
        message, zeroes the statistics and forgets the screen state and the
        bound controller ID. The worker thread handle is left untouched.
        """
        with PipelineState._lock:
            self.is_running = False
            self.stop_event.clear()
            # Drain until the queue itself reports Empty instead of trusting
            # an empty() pre-check.
            while True:
                try:
                    self.message_queue.get_nowait()
                except Empty:
                    break
            self.stats_dict = {
                "frame_count": 0,
                "ocr_count": 0,
                "new_message_count": 0,
                "start_time": 0,
                "last_update": 0,
            }
            self.last_screen_state = {}
            self.controller_id = None

    def start(self, controller_id: str):
        """
        Mark the pipeline as running for *controller_id*.

        Also clears the stop event so that a start following ``stop()`` does
        not leave a stale stop request behind (previously the event stayed set
        while ``is_running`` was True).
        """
        with PipelineState._lock:
            self.stop_event.clear()
            self.is_running = True
            self.controller_id = controller_id

    def stop(self):
        """Mark the pipeline as stopped and signal the stop event."""
        with PipelineState._lock:
            self.is_running = False
            self.stop_event.set()

    def update_stats(self, **kwargs):
        """Set each given statistic (keyword name -> value)."""
        with PipelineState._lock:
            self.stats_dict.update(kwargs)

    def increment_stat(self, key: str, amount: int = 1):
        """Add *amount* to the statistic *key*; missing keys start at 0."""
        with PipelineState._lock:
            self.stats_dict[key] = self.stats_dict.get(key, 0) + amount

    def get_stats(self) -> Dict[str, Any]:
        """Return a shallow copy of the statistics."""
        with PipelineState._lock:
            return dict(self.stats_dict)

    def update_screen_state(self, texts: list, timestamp: float):
        """Store the latest screen texts together with their timestamp."""
        with PipelineState._lock:
            self.last_screen_state["texts"] = texts
            self.last_screen_state["timestamp"] = timestamp

    def get_screen_state(self) -> Dict[str, Any]:
        """Return a shallow copy of the last stored screen state."""
        with PipelineState._lock:
            return dict(self.last_screen_state)
文字结果传递给大模型。 + 大模型直接使用文字结果进行决策,无需处理图片。 + + Args: + controller_id: 控制器 ID + config_dict: 配置字典 + stop_event: 停止事件 + message_queue: 消息队列(存放 OCR 结果) + """ + thread_logger = get_logger("PipelineLoop") + + thread_logger.debug(f"[初始化] 流水线线程启动") + thread_logger.debug(f"[初始化] controller_id={controller_id}") + thread_logger.info(f"流水线线程启动,控制器: {controller_id}") + + fps = config_dict.get("fps", 2.0) + frame_count = 0 + interval = 1.0 / fps + + thread_logger.debug(f"[初始化] fps={fps}, interval={interval}s") + thread_logger.info("流水线初始化完成,开始主循环(OCR 模式)") + + while not stop_event.is_set(): + try: + loop_start = time.time() + frame_count += 1 + + thread_logger.debug(f"[Frame {frame_count}] 开始 OCR...") + + # 调用 vision.py 中的 _ocr_impl,执行截图+OCR + ocr_results = _ocr_impl(controller_id) + + # 处理 OCR 返回值 + if ocr_results is None: + thread_logger.debug(f"[Frame {frame_count}] OCR 失败: None") + time.sleep(interval) + continue + + # 检查是否为错误信息(字符串) + if isinstance(ocr_results, str): + thread_logger.warning(f"[Frame {frame_count}] OCR 错误: {ocr_results}") + time.sleep(interval) + continue + + thread_logger.debug(f"[Frame {frame_count}] OCR 成功,结果条数: {len(ocr_results)}") + + # 将 OCR 结果放入消息队列 + message_data = { + "type": "ocr", + "ocr_results": ocr_results, + "timestamp": time.time(), + "frame_id": frame_count, + } + try: + message_queue.put_nowait(message_data) + thread_logger.info(f"📷 OCR 结果: {len(ocr_results)} 条") + except Full: + thread_logger.warning(f"[Frame {frame_count}] 消息队列已满,丢弃 OCR 结果") + + elapsed = time.time() - loop_start + sleep_time = max(0, interval - elapsed) + if sleep_time > 0: + time.sleep(sleep_time) + + except Exception as e: + thread_logger.error(f"流水线异常: {e}") + import traceback + + thread_logger.debug(f"堆栈: {traceback.format_exc()}") + time.sleep(1) + + thread_logger.info("流水线线程已停止") + + +# ==================== MCP 工具实现 ==================== + + +def _start_pipeline_impl(controller_id: str, fps: float = 2.0) -> str: + """启动流水线实现""" + try: + logger.debug( + f"[启动] 收到启动流水线请求: 
controller_id={controller_id}, fps={fps}" + ) + + pipeline_state = get_pipeline_state() + + if pipeline_state.is_running: + return "⚠️ 流水线已经在运行中" + + # 获取控制器信息,验证 controller_id 是否有效 + info = controller_info_registry.get(controller_id) + if not info: + return f"❌ 未找到控制器: {controller_id},请先连接设备" + + # 验证 controller 对象存在 + if object_registry.get(controller_id) is None: + return f"❌ 未找到控制器对象: {controller_id}" + + pipeline_state.reset() + pipeline_state.controller_id = controller_id + pipeline_state.stats_dict["start_time"] = time.time() + + logger.info(f"正在启动流水线线程, controller_id={controller_id}") + + # 启动流水线线程,只需要传递 controller_id + pipeline_state.pipeline_thread = Thread( + target=run_pipeline_loop, + args=( + controller_id, + {"fps": fps, "enable_dedup": True}, + pipeline_state.stop_event, + pipeline_state.message_queue, + ), + daemon=True, + name=f"PipelineThread-{controller_id}", + ) + + pipeline_state.pipeline_thread.start() + pipeline_state.is_running = True + + logger.info(f"流水线已启动, Thread={pipeline_state.pipeline_thread.name}") + + return f"✅ 流水线已启动 (Thread: {pipeline_state.pipeline_thread.name})" + except Exception as e: + logger.exception("启动流水线失败") + return f"❌ 启动流水线失败: {str(e)}" + + +def _stop_pipeline_impl() -> str: + """停止流水线实现""" + pipeline_state = get_pipeline_state() + if not pipeline_state.is_running: + return "⚠️ 流水线未在运行" + + pipeline_state.stop_event.set() + if pipeline_state.pipeline_thread: + pipeline_state.pipeline_thread.join(timeout=5) + if pipeline_state.pipeline_thread.is_alive(): + logger.warning("流水线线程未能在5秒内停止") + + pipeline_state.is_running = False + return "✅ 流水线已停止" + + +def _get_new_messages_impl(max_count: int = 10) -> List[Dict[str, Any]]: + """获取消息实现""" + pipeline_state = get_pipeline_state() + messages = [] + for _ in range(max_count): + try: + messages.append(pipeline_state.message_queue.get_nowait()) + except Empty: + break + return messages + + +def _get_pipeline_status_impl() -> Dict[str, Any]: + """获取状态实现""" + pipeline_state = 
get_pipeline_state() + stats = pipeline_state.get_stats() + start_time = stats.get("start_time", 0) + uptime = time.time() - start_time if start_time > 0 else 0 + return { + "is_running": pipeline_state.is_running, + "controller_id": pipeline_state.controller_id, + "uptime": round(uptime, 1), + "pending": pipeline_state.message_queue.qsize(), + } + + +# ==================== MCP 工具注册 ==================== + + +@mcp.tool( + name="start_pipeline", + description=""" + 启动后台监控流水线,持续对设备屏幕进行截图+OCR 并缓存 OCR 结果。 + + 参数: + - controller_id: 控制器 ID,由 connect_adb_device() 或 connect_window() 返回 + - fps: 截图帧率(默认 2.0),控制每秒 OCR 次数 + + 返回值: + - 成功:返回包含 "✅" 的成功信息 + - 失败:返回包含 "❌" 的错误信息 + + 说明: + 流水线启动后会在后台线程持续运行,定期截图并执行 OCR,将 OCR 结果放入消息队列。 + 可通过 get_new_messages() 获取 OCR 结果,大模型直接使用文字结果进行决策。 + 同一时间只能运行一个流水线实例。 + """, +) +def start_pipeline(controller_id: str, fps: float = 2.0) -> str: + return _start_pipeline_impl(controller_id, fps) + + +@mcp.tool( + name="stop_pipeline", + description=""" + 停止当前运行的后台监控流水线。 + + 参数: + 无 + + 返回值: + - 成功:返回包含 "✅" 的成功信息 + - 未运行:返回包含 "⚠️" 的提示信息 + + 说明: + 停止流水线后,后台线程将结束运行,消息队列中的未读消息仍可通过 get_new_messages() 获取。 + """, +) +def stop_pipeline() -> str: + return _stop_pipeline_impl() + + +@mcp.tool( + name="get_new_messages", + description=""" + 获取流水线缓存的最新 OCR 结果(非阻塞)。 + + 参数: + - max_count: 最大获取数量(默认 10),控制单次调用返回的消息数量上限 + + 返回值: + - 成功:返回消息列表,每条消息包含以下字段: + - type: 消息类型,固定为 "ocr" + - ocr_results: OCR 识别结果列表,包含文字、坐标、置信度等信息 + - timestamp: OCR 时间戳 + - frame_id: 帧序号 + - 无新消息:返回空列表 [] + + 说明: + 此方法为非阻塞调用,立即返回当前队列中的 OCR 结果。 + 获取后的消息会从队列中移除,不会重复返回。 + + 建议用法: + 1. 获取 ocr_results 后,直接使用文字结果进行分析决策 + 2. 根据 OCR 结果中的坐标信息执行点击、滑动等操作 + 3. 
def _screencap(controller_id: str) -> Optional[str]:
    """
    Capture the controller's screen and save it as a PNG file.

    Args:
        controller_id: ID under which the controller is registered in
            ``object_registry``.

    Returns:
        The absolute path of the saved screenshot, or ``None`` when the
        controller is unknown, the capture yields no image, or the image
        cannot be written.
    """
    ctrl: Controller | None = object_registry.get(controller_id)
    if not ctrl:
        return None

    frame = ctrl.post_screencap().wait().get()
    if frame is None:
        return None

    # Screenshots go to the screenshots directory; only the path is returned.
    out_dir = get_screenshots_dir()
    out_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    target = out_dir / f"screenshot_{stamp}.png"
    if not cv2.imwrite(str(target), frame):
        return None

    # Remember the file (per the original comment: for cleanup on exit).
    _saved_screenshots.append(target)
    return str(target.absolute())
识别结果可用于后续的坐标定位和自动化决策,通常包含文本内容、坐标等信息。 - 需根据坐标信息理解屏幕上文字的位置和布局,以便进行进一步的交互操作。 -""", -) -def screencap_and_ocr(controller_id: str) -> Optional[Union[list, str]]: + - 成功:返回识别结果列表 + - OCR 资源不存在:返回字符串提示信息 + - 失败:返回 None + """ # 先检查 OCR 资源是否存在,不存在则返回提示信息让 AI 主动调用下载 if not check_ocr_files_exist(): return "OCR 模型文件不存在,请先调用 check_and_download_ocr() 下载 OCR 资源后重试" @@ -52,7 +65,30 @@ def screencap_and_ocr(controller_id: str) -> Optional[Union[list, str]]: @mcp.tool( - name="screencap_only", + name="ocr", + description=""" + 对当前设备屏幕进行截图,并执行光学字符识别(OCR)处理。 + + 参数: + - controller_id: 控制器 ID,由 connect_adb_device() 或 connect_window() 返回 + + 返回值: + - 成功:返回识别结果列表,包含识别到的文字、坐标信息、置信度等结构化数据 + - OCR 资源不存在(首次使用):返回字符串提示信息,需要调用 check_and_download_ocr() 下载资源后重试 + - 失败:返回 None(截图失败或 OCR 识别失败) + + 说明: + 识别结果可用于后续的坐标定位和自动化决策,通常包含文本内容、边界框坐标、置信度评分等信息。 + 首次使用时,如果 OCR 模型文件不存在,会返回提示信息,此时需要调用 check_and_download_ocr() 下载资源后再重试。 + 下载完成后即可正常使用,后续调用无需再次下载。 +""", +) +def ocr(controller_id: str) -> Optional[Union[list, str]]: + return _ocr_impl(controller_id) + + +@mcp.tool( + name="screencap", description=""" 对当前设备屏幕进行截图。 参数: @@ -62,21 +98,5 @@ def screencap_and_ocr(controller_id: str) -> Optional[Union[list, str]]: - 失败:返回 None """, ) -def screencap_only(controller_id: str) -> Optional[str]: - controller = object_registry.get(controller_id) - if not controller: - return None - image = controller.post_screencap().wait().get() - if image is None: - return None - # 保存截图到跨平台用户数据目录,返回路径供大模型按需读取 - screenshots_dir = get_screenshots_dir() - screenshots_dir.mkdir(parents=True, exist_ok=True) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") - filepath = screenshots_dir / f"screenshot_{timestamp}.png" - success = cv2.imwrite(str(filepath), image) - if not success: - return None - # 记录当前会话保存的截图文件路径,用于退出时清理 - _saved_screenshots.append(filepath) - return str(filepath.absolute()) +def screencap(controller_id: str) -> Optional[str]: + return _screencap(controller_id) diff --git a/maa_mcp/win32.py b/maa_mcp/win32.py 
index d1fbbed..ed600df 100644 --- a/maa_mcp/win32.py +++ b/maa_mcp/win32.py @@ -125,9 +125,17 @@ def connect_window( if not window_controller.post_connection().wait().succeeded: return None controller_id = object_registry.register(window_controller) + + connection_params = { + "hwnd": window.hwnd, + "screencap_method": screencap_method, + "mouse_method": mouse_method, + "keyboard_method": keyboard_method, + } + controller_info_registry[controller_id] = ControllerInfo( controller_type=ControllerType.WIN32, + connection_params=connection_params, keyboard_method=keyboard_method, ) return controller_id - diff --git a/pyproject.toml b/pyproject.toml index 7d53de2..c852f66 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dependencies = [ "fastmcp>=2.0.0", "opencv-python>=4.0.0", "platformdirs>=4.0.0", + "loguru>=0.7.0", "lzstring>=1.0.4", ] @@ -56,6 +57,9 @@ Issues = "https://github.com/MistEO/MaaMCP/issues" [project.scripts] maa-mcp = "maa_mcp.__main__:main" +maa_mcp = "maa_mcp.__main__:main" +maa-mcp-server = "maa_mcp.pipeline_server:main" +maa_mcp_server = "maa_mcp.pipeline_server:main" [tool.hatch.version] source = "vcs" diff --git a/tests/test_performance_stress.py b/tests/test_performance_stress.py new file mode 100644 index 0000000..0679074 --- /dev/null +++ b/tests/test_performance_stress.py @@ -0,0 +1,672 @@ +"""压力测试 - 测量关键函数在高负载下的性能表现""" + +import time +import pytest +from typing import Callable, Any, Optional, List, Dict + +from maa_mcp.vision import ocr, screencap +from maa_mcp.control import click, swipe, input_text, click_key, scroll, double_click +from maa_mcp.core import controller_info_registry, ControllerType +from maa_mcp.adb import find_adb_device_list, connect_adb_device +from maa_mcp.win32 import find_window_list, connect_window + + +def _call_tool(func, *args, **kwargs): + """兼容模式:调用工具函数,自动处理 FunctionTool 和普通函数""" + # 如果 func 有 .fn 属性,说明是 FunctionTool,使用 .fn() 调用 + if hasattr(func, 'fn'): + return func.fn(*args, **kwargs) + # 
class PerformanceBenchmarker:
    """
    Time repeated calls of a function or tool and collect the results.

    Every call made through :meth:`benchmark` is appended to ``self.results``.
    """

    def __init__(self):
        # All results recorded by benchmark(), in call order.
        self.results: List["PerformanceTestResult"] = []

    @staticmethod
    def _display_name(func) -> str:
        """
        Return a printable name for *func*.

        Prefers ``__name__``, then a ``name`` attribute (tool objects), and
        finally ``str(func)``, so objects without ``__name__`` never raise.
        """
        return (
            getattr(func, "__name__", None)
            or getattr(func, "name", None)
            or str(func)
        )

    def benchmark(self, func: Callable, *args, **kwargs) -> "PerformanceTestResult":
        """
        Run *func* once, time it and record the outcome.

        Exceptions raised by *func* are caught, printed and recorded as a
        failed result; they are not re-raised.

        Returns:
            The recorded result (also appended to ``self.results``).
        """
        timer = PerformanceTimer()
        success = False
        result = None
        func_name = self._display_name(func)

        try:
            timer.start()
            # _call_tool handles both tool objects (.fn) and plain callables.
            result = _call_tool(func, *args, **kwargs)
            timer.stop()
            success = True
        except Exception as e:
            timer.stop()
            print(f"[Error] {func_name} 执行失败: {e}")

        test_result = PerformanceTestResult(
            function_name=func_name,
            execution_time=timer.elapsed_time or 0,
            success=success,
            result=result,
        )

        self.results.append(test_result)
        return test_result

    def run_multiple(
        self,
        func: Callable,
        iterations: int = 5,
        print_stats: bool = True,
        *args,
        **kwargs,
    ) -> List["PerformanceTestResult"]:
        """
        Call :meth:`benchmark` *iterations* times and optionally print stats.

        Args:
            func: Function or tool object to call.
            iterations: Number of timed calls.
            print_stats: When True, print progress and summary statistics.
            *args, **kwargs: Forwarded to *func* on every call.

        Returns:
            The results of this run, in call order.
        """
        test_results = []

        total_start_time = time.perf_counter()
        last_print_time = 0

        for i in range(iterations):
            # Progress is rewritten on one line with "\r"; refreshes are
            # throttled so printing does not distort the measurement.
            current_time = time.perf_counter()
            if iterations > 1 and print_stats:
                if current_time - last_print_time > 0.1 or i == iterations - 1:
                    print(
                        f"\r[Iteration {i+1}/{iterations}] - 进行中...",
                        end="",
                        flush=True,
                    )
                    last_print_time = current_time

            test_results.append(self.benchmark(func, *args, **kwargs))

        total_wall_time = time.perf_counter() - total_start_time

        # Terminate the progress line.
        if iterations > 1 and print_stats:
            print()

        if test_results and print_stats:
            success_times = [r.execution_time for r in test_results if r.success]
            if success_times:
                avg_time = sum(success_times) / len(success_times)
                max_time = max(success_times)
                min_time = min(success_times)
                total_execution_time = sum(success_times)

                # Use the shared resolver: tool objects may lack __name__.
                print(f"\n[Statistics] {self._display_name(func)}")
                print(f" 平均时间: {avg_time:.4f} 秒")
                print(f" 最大时间: {max_time:.4f} 秒")
                print(f" 最小时间: {min_time:.4f} 秒")
                print(f" 成功率: {len(success_times)}/{iterations}")
                print(f" 总执行耗时 (Sum): {total_execution_time:.4f} 秒")
                print(f" 总墙钟耗时 (Wall): {total_wall_time:.4f} 秒")
                if total_wall_time > total_execution_time * 1.1:
                    print(f" 注意: 墙钟时间显著大于执行时间,可能存在系统开销或IO等待")

        return test_results

    def print_summary(self):
        """Print every recorded result followed by an overall summary."""
        print("\n" + "=" * 50)
        print("性能测试结果摘要")
        print("=" * 50)

        for result in self.results:
            print(result)

        total_tests = len(self.results)
        successful_tests = sum(1 for r in self.results if r.success)
        # The average covers successful calls only.
        avg_time_all = (
            sum(r.execution_time for r in self.results if r.success) / successful_tests
            if successful_tests
            else 0
        )

        print(f"\n总览: {successful_tests}/{total_tests} 个测试成功")
        if successful_tests:
            print(f"平均执行时间: {avg_time_all:.4f} 秒")
        print("=" * 50)
in range(self.config.warmup_iterations): + _call_tool(find_window_list) + + # 执行压力测试 + results = self.benchmarker.run_multiple( + find_window_list, + iterations=self.config.iterations, + print_stats=False, + ) + + # 打印详细统计信息 + self._print_stress_test_stats(results, "find_window_list") + + def test_stress_ocr(self): + """压力测试 - OCR 函数""" + print(f"\n=== 压力测试: ocr ({self.config.iterations}次) ===") + + if not self.controller_id: + pytest.skip("未检测到可用控制器") + + # 预热 + for _ in range(self.config.warmup_iterations): + _call_tool(ocr, self.controller_id) + + # 执行压力测试 + results = self.benchmarker.run_multiple( + ocr, + iterations=self.config.iterations, + print_stats=False, + controller_id=self.controller_id, + ) + + # 打印详细统计信息 + self._print_stress_test_stats(results, "ocr") + + # 轻量级断言 + assert results, "OCR 压力测试没有产生任何结果" + successes = [r for r in results if r.success] + success_ratio = len(successes) / len(results) + assert success_ratio >= 0.8, f"OCR 成功率过低: {success_ratio:.2%}" + + durations_ms = [r.execution_time * 1000 for r in successes] + if durations_ms: + avg_duration = sum(durations_ms) / len(durations_ms) + max_duration = max(durations_ms) + assert max_duration < 5000, f"OCR 单次调用耗时过长: {max_duration:.1f} ms" + assert avg_duration < 3000, f"OCR 平均耗时过长: {avg_duration:.1f} ms" + + def test_stress_screencap(self): + """压力测试 - 截图函数""" + print(f"\n=== 压力测试: screencap ({self.config.iterations}次) ===") + + if not self.controller_id: + pytest.skip("未检测到可用控制器") + + # 预热 + for _ in range(self.config.warmup_iterations): + _call_tool(screencap, self.controller_id) + + # 执行压力测试 + results = self.benchmarker.run_multiple( + screencap, + iterations=self.config.iterations, + print_stats=False, + controller_id=self.controller_id, + ) + + # 打印详细统计信息 + self._print_stress_test_stats(results, "screencap") + + # 轻量级断言 + assert results, "截图压力测试没有产生任何结果" + successes = [r for r in results if r.success] + success_ratio = len(successes) / len(results) + assert success_ratio >= 0.8, f"截图成功率过低: 
{success_ratio:.2%}" + + def test_stress_click(self): + """压力测试 - 点击函数""" + print(f"\n=== 压力测试: click ({self.config.iterations}次) ===") + + if not self.controller_id: + pytest.skip("未检测到可用控制器") + + # 预热 + for _ in range(self.config.warmup_iterations): + _call_tool(click, self.controller_id, 100, 100) + + # 执行压力测试 + results = self.benchmarker.run_multiple( + click, + iterations=self.config.iterations, + print_stats=False, + controller_id=self.controller_id, + x=100, + y=100, + ) + + # 打印详细统计信息 + self._print_stress_test_stats(results, "click") + + # 轻量级断言 + assert results, "点击压力测试没有产生任何结果" + successes = [r for r in results if r.success] + success_ratio = len(successes) / len(results) + assert success_ratio >= 0.8, f"点击成功率过低: {success_ratio:.2%}" + + def test_stress_swipe(self): + """压力测试 - 滑动函数""" + print(f"\n=== 压力测试: swipe ({self.config.iterations}次) ===") + + if not self.controller_id: + pytest.skip("未检测到可用控制器") + + # 预热 + for _ in range(self.config.warmup_iterations): + _call_tool(swipe, self.controller_id, 100, 100, 200, 200, 500) + + # 执行压力测试 + results = self.benchmarker.run_multiple( + swipe, + iterations=self.config.iterations, + print_stats=False, + controller_id=self.controller_id, + start_x=100, + start_y=100, + end_x=200, + end_y=200, + duration=500, + ) + + # 打印详细统计信息 + self._print_stress_test_stats(results, "swipe") + + # 轻量级断言 + assert results, "滑动压力测试没有产生任何结果" + successes = [r for r in results if r.success] + success_ratio = len(successes) / len(results) + assert success_ratio >= 0.8, f"滑动成功率过低: {success_ratio:.2%}" + + def test_stress_input_text(self): + """压力测试 - 输入文本函数""" + print(f"\n=== 压力测试: input_text ({self.config.iterations}次) ===") + + if not self.controller_id: + pytest.skip("未检测到可用控制器") + + # 预热 + for _ in range(self.config.warmup_iterations): + _call_tool(input_text, self.controller_id, "test") + + # 执行压力测试 + results = self.benchmarker.run_multiple( + input_text, + iterations=self.config.iterations, + print_stats=False, + 
controller_id=self.controller_id, + text="test", + ) + + # 打印详细统计信息 + self._print_stress_test_stats(results, "input_text") + + # 轻量级断言 + assert results, "输入文本压力测试没有产生任何结果" + successes = [r for r in results if r.success] + success_ratio = len(successes) / len(results) + assert success_ratio >= 0.8, f"输入文本成功率过低: {success_ratio:.2%}" + + def test_stress_click_key(self): + """压力测试 - 按键点击函数""" + print(f"\n=== 压力测试: click_key ({self.config.iterations}次) ===") + + if not self.controller_id: + pytest.skip("未检测到可用控制器") + + # 预热 + for _ in range(self.config.warmup_iterations): + _call_tool(click_key, self.controller_id, 13) # 13 是回车键的虚拟键码 + + # 执行压力测试 + results = self.benchmarker.run_multiple( + click_key, + iterations=self.config.iterations, + print_stats=False, + controller_id=self.controller_id, + key=13, # 13 是回车键的虚拟键码 + ) + + # 打印详细统计信息 + self._print_stress_test_stats(results, "click_key") + + # 轻量级断言 + assert results, "按键点击压力测试没有产生任何结果" + successes = [r for r in results if r.success] + success_ratio = len(successes) / len(results) + assert success_ratio >= 0.8, f"按键点击成功率过低: {success_ratio:.2%}" + + def test_stress_scroll(self): + """压力测试 - 滚动函数""" + print(f"\n=== 压力测试: scroll ({self.config.iterations}次) ===") + + if not self.controller_id: + pytest.skip("未检测到可用控制器") + + # 检查是否为 ADB 控制器 + info = controller_info_registry.get(self.controller_id) + if info and info.controller_type == ControllerType.ADB: + pytest.skip("当前控制器为 ADB,跳过 scroll 压力测试 (仅支持 Windows)") + + # 预热 + for _ in range(self.config.warmup_iterations): + _call_tool(scroll, self.controller_id, 0, -120) + + # 执行压力测试 + results = self.benchmarker.run_multiple( + scroll, + iterations=self.config.iterations, + print_stats=False, + controller_id=self.controller_id, + x=0, + y=-120, + ) + + # 打印详细统计信息 + self._print_stress_test_stats(results, "scroll") + + # 轻量级断言 + assert results, "滚动压力测试没有产生任何结果" + successes = [r for r in results if r.success] + success_ratio = len(successes) / len(results) + assert success_ratio >= 
0.8, f"滚动成功率过低: {success_ratio:.2%}" + + def test_stress_double_click(self): + """压力测试 - 双击函数""" + print(f"\n=== 压力测试: double_click ({self.config.iterations}次) ===") + + if not self.controller_id: + pytest.skip("未检测到可用控制器") + + # 预热 + for _ in range(self.config.warmup_iterations): + _call_tool(double_click, self.controller_id, 100, 100) + + # 执行压力测试 + results = self.benchmarker.run_multiple( + double_click, + iterations=self.config.iterations, + print_stats=False, + controller_id=self.controller_id, + x=100, + y=100, + ) + + # 打印详细统计信息 + self._print_stress_test_stats(results, "double_click") + + # 轻量级断言 + assert results, "双击压力测试没有产生任何结果" + successes = [r for r in results if r.success] + success_ratio = len(successes) / len(results) + assert success_ratio >= 0.8, f"双击成功率过低: {success_ratio:.2%}" + + def _print_stress_test_stats(self, results: List, function_name: str): + """打印压力测试的详细统计信息""" + if not results: + print(f" 未获取到测试结果") + return + + # 筛选成功的测试结果 + success_results = [r for r in results if r.success] + if not success_results: + print(f" 所有测试都失败了") + return + + # 计算统计数据 + execution_times = [r.execution_time for r in success_results] + avg_time = sum(execution_times) / len(execution_times) + min_time = min(execution_times) + max_time = max(execution_times) + median_time = sorted(execution_times)[len(execution_times) // 2] + + # 计算每秒处理次数(TPS) + tps = len(success_results) / sum(execution_times) + + print(f"\n[压力测试统计] {function_name}") + print(f" 总执行次数: {len(results)}") + print(f" 成功次数: {len(success_results)}") + print(f" 平均时间: {avg_time * 1000:.3f} 毫秒") + print(f" 最小时间: {min_time * 1000:.3f} 毫秒") + print(f" 最大时间: {max_time * 1000:.3f} 毫秒") + print(f" 中位数时间: {median_time * 1000:.3f} 毫秒") + print(f" 每秒处理次数 (TPS): {tps:.2f}") + print(f" 总耗时: {sum(execution_times) * 1000:.2f} 毫秒") + + +# 性能测试接口 - 为关键函数添加性能测试装饰器 +class PerformanceTestInterface: + """性能测试接口类 - 提供性能测试的统一接口""" + + @staticmethod + def measure_function_performance( + func: Callable, iterations: int = 1000, 
*args, **kwargs + ) -> Dict[str, Any]: + """测量函数在指定次数迭代下的性能 + + Args: + func: 要测试的函数 + iterations: 迭代次数 + *args: 函数参数 + **kwargs: 函数关键字参数 + + Returns: + 包含性能统计数据的字典 + """ + benchmarker = PerformanceBenchmarker() + + # 预热 + for _ in range(10): + func(*args, **kwargs) + + # 执行测试 + results = benchmarker.run_multiple(func, iterations, *args, **kwargs) + + # 计算统计数据 + success_results = [r for r in results if r.success] + if not success_results: + return { + "function_name": func.__name__, + "iterations": iterations, + "success": False, + "message": "所有测试都失败了", + } + + success_times = [r.execution_time for r in success_results] + avg_time = sum(success_times) / len(success_times) + min_time = min(success_times) + max_time = max(success_times) + median_time = sorted(success_times)[len(success_times) // 2] + tps = len(success_results) / sum(success_times) + + return { + "function_name": func.__name__, + "iterations": iterations, + "success": True, + "total_executions": len(results), + "successful_executions": len(success_results), + "average_time": avg_time, + "minimum_time": min_time, + "maximum_time": max_time, + "median_time": median_time, + "tps": tps, + "total_time": sum(success_times), + } + + @staticmethod + def compare_function_performances( + functions: List[Callable], iterations: int = 1000 + ): + """比较多个函数的性能""" + results = [] + + for func in functions: + result = PerformanceTestInterface.measure_function_performance( + func, iterations + ) + if result: + results.append(result) + + # 按平均时间排序 + results.sort(key=lambda x: x["average_time"]) + + return results + + +# 压力测试示例脚本(可直接运行) +if __name__ == "__main__": + """压力测试示例 - 展示如何使用压力测试模块""" + + print("MaaMCP 压力测试示例") + print("=" * 60) + + # 创建压力测试配置 + config = StressTestConfig() + config.iterations = 1000 # 使用1000次迭代进行完整的压力测试 + + # 创建测试实例 + test = TestStressPerformance() + test.setup_class() + + # 运行部分压力测试 + print("\n1. 运行 find_adb_device_list 压力测试:") + test.test_stress_find_adb_device_list() + + print("\n2. 
运行 OCR 压力测试:") + test.test_stress_ocr() + + print("\n3. 运行 click 压力测试:") + test.test_stress_click() + + print("\n4. 运行 input_text 压力测试:") + test.test_stress_input_text() + + print("\n5. 运行 scroll 压力测试:") + test.test_stress_scroll() + + print("\n6. 运行 double_click 压力测试:") + test.test_stress_double_click() + + print("\n" + "=" * 60) + print("压力测试示例执行完成!") + print("要运行完整的1000次迭代测试,请使用 pytest 执行:") + print("pytest tests/test_performance_stress.py -v")