Skip to content

perf: 流水线模式改为ocr优先,优化连接窗口描述,swipe增加duration约束 避免因为惯性导致非预期滚动#28

Merged
KhazixW2 merged 20 commits intoMAA-AI:mainfrom
KhazixW2:main
Mar 24, 2026
Merged

Conversation

@KhazixW2
Copy link
Copy Markdown
Collaborator

@KhazixW2 KhazixW2 commented Mar 23, 2026

  1. 模拟器窗口的连接应该优先adb方式连接,普通窗口在用windows连接
  2. swipe增加duration约束 避免因为惯性导致非预期滚动

Summary by Sourcery

引入串行/流水线双重运行模式,配套专用的流水线 MCP 服务器,优先支持基于 OCR 的工作流,并改进连接、日志及分享能力,以提升自动化性能与可观测性。

New Features:

  • 添加专用的流水线 MCP 服务器入口点,提供后台 OCR 监控工具(启动/停止流水线、获取消息、查询状态)。
  • 引入流水线状态管理模块,并使用 loguru 实现集中化日志配置。
  • 增加 MPE 分享链接生成功能,用于从 JSON 文件可视化展示流水线。
  • 提供英文和中文的 CLAUDE 指南文档,并在 README/README_EN 中补充双模式运行及流水线使用说明。

Bug Fixes:

  • 修正文档中的 OCR 模型和日志目录路径,去除多余的 MaaXYZ 路径片段。

Enhancements:

  • 优化屏幕识别工具,将 OCR 与截屏拆分为两个独立工具,并复用通用的 OCR 实现。
  • 将流水线 JSON 文件持久化存储路径从 Documents 迁移到应用数据目录,以提升跨平台一致性。
  • 扩展控制器元数据,保存连接参数,便于后续复用或重建连接。
  • 文档化滑动时长预设,并在核心工作流说明中进一步澄清模拟器连接与窗口连接的推荐场景。
  • 为标准 MCP 服务器和流水线 MCP 服务器分别增加额外的控制台脚本别名。

Build:

  • 在项目配置中声明 loguru 为新依赖项。

Documentation:

  • 扩展中英文 README,新增双模式工作流示意图、流水线工具说明及使用示例,并添加 CLAUDE/CLAUDE_CN 贡献者指南文档。

Tests:

  • 添加综合性能与压力测试套件,在高负载下覆盖设备发现、OCR、截屏以及核心输入操作。
Original summary in English

Summary by Sourcery

Introduce a dual serial/pipeline operation mode with a dedicated pipeline MCP server, prioritize OCR-based workflows, and improve connection, logging, and sharing capabilities for better automation performance and observability.

New Features:

  • Add a dedicated pipeline MCP server entry point with background OCR monitoring tools (start/stop pipeline, fetch messages, query status).
  • Introduce a pipeline state management module and centralized logging configuration using loguru.
  • Add MPE share link generation to visualize pipelines from JSON files.
  • Provide English and Chinese CLAUDE guide docs and update README/README_EN with dual-mode operation and pipeline usage.

Bug Fixes:

  • Correct OCR model and log directory paths in the documentation to remove the extra MaaXYZ path segment.

Enhancements:

  • Refine screen recognition tools by splitting OCR and screencap into separate utilities with a reusable OCR implementation.
  • Persist pipeline JSON files under the application data directory instead of Documents for better cross-platform consistency.
  • Extend controller metadata to store connection parameters for future reuse or reconstruction.
  • Document swipe duration presets and clarify emulator vs. window connection recommendations in the core workflow description.
  • Add extra console script aliases for both standard and pipeline MCP servers.

Build:

  • Declare loguru as a new dependency in the project configuration.

Documentation:

  • Expand Chinese and English READMEs with dual-mode workflow diagrams, pipeline tooling descriptions, and usage examples, and add CLAUDE/CLAUDE_CN contributor guidance docs.

Tests:

  • Add a comprehensive performance and stress test suite covering device discovery, OCR, screencap, and core input operations under high load.

KhazixW2 and others added 16 commits December 17, 2025 15:47
…ine_server直接使用。

增加可执行命令 maa_mcp_server 给这个命令放到mcp里面才能运流水线模式
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
- 移除图片路径队列,改为直接传递OCR文字结果
- 大模型直接使用文字结果决策,无需处理图片
- 完善core.py注释,增加模拟器连接提醒
- 更新README,添加两种运行模式说明
- 更新.gitignore和Claude Code配置
Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 10 个问题,并给出了一些总体反馈:

  • pipeline_server.run_pipeline_loop 中,你假定 _ocr_impl 返回的是列表并调用了 len(ocr_results),但当缺少 OCR 资源时,_ocr_impl 也可能返回字符串;建议显式处理字符串这种情况(例如检测并暴露错误信息,或触发下载),以避免在流水线中出现类型错误和静默的错误行为。
  • 新增的流水线相关配置开关(PipelineConfig, message_queue_size, similarity_threshold, enable_dedup, UI_ELEMENTS_FILTER)目前没有被使用,而 PipelineState 里直接写死了 Queue(maxsize=100);请考虑要么把这些配置真正接入流水线逻辑,要么移除它们,以避免对实际生效的限制和行为产生混淆。
给予 AI Agent 的提示词
Please address the comments from this code review:

## Overall Comments
- In `pipeline_server.run_pipeline_loop` you assume `_ocr_impl` returns a list and call `len(ocr_results)`, but `_ocr_impl` can also return a string when OCR resources are missing; consider explicitly handling the string case (e.g., detect and surface the error message or trigger download) to avoid type errors and silent misbehavior in the pipeline.
- The new pipeline-related configuration knobs (`PipelineConfig`, `message_queue_size`, `similarity_threshold`, `enable_dedup`, `UI_ELEMENTS_FILTER`) are currently unused while `PipelineState` hardcodes `Queue(maxsize=100)`; either wire these settings into the pipeline logic or remove them to avoid confusion about which limits and behaviors are actually in effect.

## Individual Comments

### Comment 1
<location path="maa_mcp/pipeline_server.py" line_range="97-106" />
<code_context>
+            self.stop_event.clear()
+            # 清空队列
+            while not self.message_queue.empty():
+                try:
+                    self.message_queue.get_nowait()
+                except Empty:
</code_context>
<issue_to_address>
**issue (bug_risk):** 避免在入队消息时使用裸 `except`,以免掩盖意料之外的错误。

这种裸 `except` 的写法会屏蔽所有错误,包括编程错误和异常的运行时问题。由于这里预期的主要失败场景是队列已满,建议显式捕获 `queue.Full`(或者至少捕获 `Exception`),这样既能让真实错误暴露出来,又能优雅地处理队列已满的情况。
</issue_to_address>

### Comment 2
<location path="maa_mcp/pipeline_server.py" line_range="103-104" />
<code_context>
+
+            thread_logger.debug(f"[Frame {frame_count}] 开始 OCR...")
+
+            # 调用 vision.py 中的 _ocr_impl,执行截图+OCR
+            ocr_results = _ocr_impl(controller_id)
+
+            # 处理 OCR 返回值
</code_context>
<issue_to_address>
**issue (bug_risk):** 请明确当 `_ocr_impl` 返回错误字符串而不是结果列表时的处理方式。

由于 `_ocr_impl(controller_id)` 可能返回字符串错误信息,也可能返回列表,目前的逻辑会把任何非 `None` 的返回都当作成功的 OCR 结果,并以 `type: "ocr"` 入队。这与 `get_new_messages``ocr_results` 必须是结构化结果列表的约定相冲突。请考虑区分错误/字符串响应(例如将其作为 `type: "error"` 并使用不同的负载入队),或者不要入队,改为单独处理/记录日志,以保证消息契约的一致性。
</issue_to_address>

### Comment 3
<location path="maa_mcp/pipeline_server.py" line_range="89-93" />
<code_context>
+    thread_logger.debug(f"[初始化] controller_id={controller_id}")
+    thread_logger.info(f"流水线线程启动,控制器: {controller_id}")
+
+    fps = config_dict.get("fps", 2.0)
+    frame_count = 0
+    interval = 1.0 / fps
+
+    thread_logger.debug(f"[初始化] fps={fps}, interval={interval}s")
</code_context>
<issue_to_address>
**suggestion (bug_risk):** 建议对无效的 `fps` 值(0 或负数)进行防护,避免除零错误。

如果 `config_dict` 中可能出现无效值,`fps` 可能为 0 或负数,从而导致 `ZeroDivisionError` 或负的间隔。建议对 `fps` 做校验,要么将其钳制到一个最小值(例如 `max(fps, 0.1)`),要么回退到默认值,并在拒绝该值时记录 warning 日志。

```suggestion
    raw_fps = config_dict.get("fps", 2.0)
    frame_count = 0

    # 处理无效 fps,避免除零或负间隔
    try:
        fps = float(raw_fps)
    except (TypeError, ValueError):
        thread_logger.warning(f"[配置警告] 无效的 fps 值: {raw_fps!r},使用默认值 2.0")
        fps = 2.0

    if fps <= 0:
        thread_logger.warning(f"[配置警告] fps={fps} 非法(必须大于 0),使用默认值 2.0")
        fps = 2.0

    interval = 1.0 / fps

    thread_logger.debug(f"[初始化] fps={fps}, interval={interval}s(原始配置值: {raw_fps!r})")
```
</issue_to_address>

### Comment 4
<location path="maa_mcp/pipeline/logging_config.py" line_range="21-24" />
<code_context>
+_initialized = False
+
+
+def setup_logger(
+    console_level: str = "INFO",
+    file_level: str = "DEBUG",
+    error_retention: str = "30 days",
+    log_retention: str = "7 days",
+) -> None:
</code_context>
<issue_to_address>
**nitpick:** 请移除或使用 `console_level` 参数,避免引起困惑。

`console_level` 虽然在函数签名和文档字符串中声明了,但从未被使用,而且当前关闭了控制台日志。这会让调用者困惑,以为它会影响控制台日志的详细程度。请考虑在重新启用控制台日志时真正将其接入对应 handler,或者直接从函数签名和文档中移除该参数,以保持 API 一致性。
</issue_to_address>

### Comment 5
<location path="tests/test_performance_stress.py" line_range="246-255" />
<code_context>
+class TestStressPerformance:
</code_context>
<issue_to_address>
**suggestion (testing):** 避免在没有实际设备/窗口时静默通过测试,建议使用 pytest.skip 或改为使用 MockController 增强可预期性

当前 `setup_class` 在既无 ADB 设备也无窗口时,只是打印并保留 `self.controller_id = None`,后续用例再遇到这种情况只 `print``return`,导致 pytest 认为测试通过,但实际关键路径完全未执行。建议:
- 依赖真实设备的用例在检测到 `self.controller_id is None` 时,明确使用 `pytest.skip("...原因...")`,而不是静默返回;
- 或使用已有的 `MockController`,通过 monkeypatch/mock `object_registry` 等,将这些压力测试在 CI 中跑在 mock 上,保证接口行为有覆盖;
- 若仅用于本地性能诊断,可加 `@pytest.mark.skipif(...)` 或自定义 `@pytest.mark.performance`,并在默认 CI 配置中禁用该标签,避免产生大量「假通过」用例。

Suggested implementation:

```python
        """模拟滚动操作"""
        time.sleep(0.001)
        return self

    @property
    def succeeded(self):
        """模拟操作成功状态"""
        return True


import pytest


@pytest.mark.performance
class TestStressPerformance:
    """压力测试类 - 测试关键函数在高负载下的性能"""

    def setup_class(self):
        """测试类初始化,获取实际控制器"""
        self.config = StressTestConfig()
        self.benchmarker = PerformanceBenchmarker()
        self.controller_id = None
        self.device_name = None
        self.window_name = None

        # 若既无真实设备也无窗口,直接跳过整类测试,避免静默通过
        if self.controller_id is None and not self.device_name and not self.window_name:
            pytest.skip("未检测到可用的真实控制器设备/窗口,跳过压力性能测试")

```

1. 若在本文件后续的测试方法中存在 `if self.controller_id is None: print(...); return` 之类的早退逻辑,请改为 `pytest.skip("未检测到可用控制器")`,以避免单个用例静默通过。
2. 如项目中已有统一的性能测试标记(例如 `@pytest.mark.performance` 的别名),可将此标记替换为项目约定的标记名称,并在 CI 的 pytest 配置中默认禁用该标记。
3. 如果未来接入 `MockController` 以支持 CI 运行,可在 `setup_class` 中优先尝试真实设备/窗口,不存在时构造并注册 `MockController`,然后移除当前的 `pytest.skip` 分支,改为仅在 mock 也不可用时才 skip。
</issue_to_address>

### Comment 6
<location path="tests/test_performance_stress.py" line_range="322-331" />
<code_context>
+        # 打印详细统计信息
+        self._print_stress_test_stats(results, "find_window_list")
+
+    def test_stress_ocr(self):
+        """压力测试 - OCR 函数"""
+        print(f"\n=== 压力测试: ocr ({self.config.iterations}次) ===")
+
+        if not self.controller_id:
+            print("  没有有效的控制器ID,无法执行OCR测试")
+            return
+
+        # 预热
+        for _ in range(self.config.warmup_iterations):
+            ocr.fn(self.controller_id)
+
+        # 执行压力测试
+        results = self.benchmarker.run_multiple(
+            ocr.fn,
+            iterations=self.config.iterations,
+            print_stats=False,
+            controller_id=self.controller_id,
+        )
+
+        # 打印详细统计信息
+        self._print_stress_test_stats(results, "ocr")
+
+    def test_stress_screencap(self):
</code_context>
<issue_to_address>
**suggestion (testing):** 当前压力测试只打印统计信息而没有任何断言,无法帮助发现性能退化或功能异常。

以 `test_stress_ocr` 为例,目前只是调用 `run_multiple` 并打印统计信息就结束,没有任何断言,其他 `test_stress_*` 也类似。这意味着即便 OCR 经常返回错误类型或非常缓慢,CI 仍然会认为「测试通过」。

建议至少增加一些轻量级断言,例如:
- 在有有效 `controller_id` 时,`results` 中需有一定比例的 `success == True`- 对单次或平均耗时增加一个较宽松的上限(用于捕捉明显的性能退化);
- 检查异常返回(如模型文件不存在时),避免在错误路径上做压力测试。

如果这些用例只是性能探针而不是质量关卡,建议改为独立的基准脚本(例如放在 `scripts/` 下),避免在 CI 中保留无断言且不稳定的 pytest 用例。

Suggested implementation:

```python
    def test_stress_ocr(self):
        """压力测试 - OCR 函数"""
        print(f"\n=== 压力测试: ocr ({self.config.iterations}次) ===")

        if not self.controller_id:
            print("  没有有效的控制器ID,无法执行OCR测试")
            return

        # 快速检查异常路径(例如模型文件不存在),避免在错误路径上做压力测试
        try:
            ocr.fn(self.controller_id)
        except FileNotFoundError as exc:
            # 模型或依赖文件缺失时直接跳过此压力测试,而不是在错误路径上压测
            print(f"  OCR 依赖文件缺失,跳过压力测试: {exc}")
            pytest.skip("OCR model or dependency file not found, skipping stress test")

        # 预热
        for _ in range(self.config.warmup_iterations):
            ocr.fn(self.controller_id)

        # 执行压力测试
        results = self.benchmarker.run_multiple(
            ocr.fn,
            iterations=self.config.iterations,
            print_stats=False,
            controller_id=self.controller_id,
        )

        # 打印详细统计信息
        self._print_stress_test_stats(results, "ocr")

        # 轻量级断言:确保一定比例成功,且耗时没有明显退化
        assert results, "OCR 压力测试没有产生任何结果"

        # 假定 results 是包含 success / duration_ms 字段的对象列表,
        # 这里采用较宽松的阈值,只捕捉明显的质量或性能退化。
        successes = [r for r in results if getattr(r, "success", False)]
        success_ratio = len(successes) / len(results)
        # 要求至少 80% 成功,避免持续错误却被视为测试通过
        assert success_ratio >= 0.8, f"OCR 成功率过低: {success_ratio:.2%}"

        durations_ms = [
            getattr(r, "duration_ms")
            for r in successes
            if getattr(r, "duration_ms", None) is not None
        ]
        if durations_ms:
            avg_duration = sum(durations_ms) / len(durations_ms)
            max_duration = max(durations_ms)
            # 较宽松的性能上限(示例值,可根据实际环境调整):
            # - 单次调用最长不超过 5s
            # - 平均耗时不超过 3s
            assert max_duration < 5000, f"OCR 单次调用耗时过长: {max_duration:.1f} ms"
            assert avg_duration < 3000, f"OCR 平均耗时过长: {avg_duration:.1f} ms"

    def test_stress_screencap(self):

```

1. 上面的实现假定:
   - `results` 是一个列表,每个元素有 `success: bool``duration_ms: float` 两个属性(或类似字段)。
   - 这些字段名与 `self.benchmarker.run_multiple` 的返回类型保持一致。
   如果当前 `run_multiple` 的返回结构不同(例如是字典、命名元组或聚合统计对象),需要做相应调整:
   -`getattr(r, "success", False)` 替换为正确的访问方式(例如 `r["success"]``r.success`)。
   -`getattr(r, "duration_ms")` 替换为实际的耗时字段(例如 `r.duration`, `r.elapsed_ms` 等)。
2. 代码中使用了 `pytest.skip`,如果文件顶部尚未引入 `pytest`,需要在该文件开头添加:
   ```python
   import pytest
   ```
3. 为保持一致性,建议在 `test_stress_screencap` 及其他 `test_stress_*` 用例中,参考 `test_stress_ocr` 的方式:
   - 在正式压测前做一次试调用并捕获关键异常(如设备/驱动缺失),在错误路径上直接 `pytest.skip`-`_print_stress_test_stats` 之后,对成功率和耗时增加类似的宽松断言(阈值可根据接口特性分别设置)。
</issue_to_address>

### Comment 7
<location path="tests/test_performance_stress.py" line_range="186-195" />
<code_context>
+class MockController:
</code_context>
<issue_to_address>
**suggestion (testing):** `MockController` 未被任何测试使用,建议要么接入压力测试,要么移除以避免误导。

当前 `MockController` 虽然实现了完整的模拟接口,但在测试中完全未被使用,可能让人误以为压力测试已支持 mock,而 CI / 无设备环境仍然无法覆盖这些逻辑。

建议:
- 若目标是支持无设备环境测试,将 `MockController` 接入测试流程(如通过 `object_registry.register(MockController())` 并让 `self.controller_id` 指向该 mock,或 monkeypatch `connect_adb_device.fn` 使其返回 `MockController`);
- 若短期内不会使用,建议删除该类或在 docstring 中明确标注为预留,以避免误导。
</issue_to_address>

### Comment 8
<location path="tests/test_performance_stress.py" line_range="177-183" />
<code_context>
+        print("=" * 50)
+
+
+class StressTestConfig:
+    """压力测试配置类"""
+
+    def __init__(self):
+        self.iterations = 10  # 默认执行1000次
+        self.warmup_iterations = 10  # 预热迭代次数
+        self.timeout = 30.0  # 单个测试超时时间(秒)
+
+
</code_context>
<issue_to_address>
**nitpick:** `StressTestConfig` 中的配置字段目前未真正影响测试行为,建议要么使用,要么简化。

在 `StressTestConfig` 中,目前只有 `iterations` / `warmup_iterations` 被真正使用,`timeout` 并未参与任何计时或超时控制逻辑,而且注释写的是「默认执行1000次」,但默认值为 10,存在误导。建议要么在 `PerformanceBenchmarker.run_multiple` 或各个 `test_stress_*` 中真正引入基于 `timeout` 的超时控制,要么先移除该字段或标明「预留未启用」,并修正注释与默认值不一致的问题。
</issue_to_address>

### Comment 9
<location path="tests/test_performance_stress.py" line_range="559-568" />
<code_context>
+class PerformanceTestInterface:
</code_context>
<issue_to_address>
**suggestion (testing):** `PerformanceTestInterface` 测试辅助类缺少针对关键新行为(如 URL 体积限制、OCR 优先策略、pipeline server)的单元测试使用示例。

本次改动在以下关键路径上缺少相应的功能/回归测试用例:
- `generate_share_link` 的正常情况,以及 URL 超过 `MPE_MAX_URL_SIZE` 时抛出 `ValueError` 的测试;
- `_ocr_impl` / `_screencap` 的功能性断言(模型缺失提示、返回类型等),而不仅仅是性能统计;
- `start_pipeline` / `get_new_messages` / `get_pipeline_status` 的状态流转与消息格式测试(可以在单线程下直接调用实现函数)。

建议在单独的功能测试文件中补充上述用例,可以基于 `PerformanceTestInterface` 或更直接的函数调用,从而在无设备环境下也能稳定回归这些核心行为。

Suggested implementation:

```python
# 性能测试接口 - 为关键函数添加性能测试装饰器
class PerformanceTestInterface:
    """性能测试接口类 - 提供性能测试的统一接口

    除纯性能测量外,本类也提供若干针对核心业务行为的功能性断言示例,
    便于在单元测试或集成测试中复用,覆盖本次改动涉及的关键路径:
    - generate_share_link 的 URL 体积限制;
    - OCR / 截屏接口的基础行为与返回类型;
    - pipeline server 的状态流转与消息格式。
    """

    @staticmethod
    def assert_generate_share_link_behavior(
        generate_share_link: Callable[..., Any],
        max_url_size: int,
        base_url: str = "https://example.com/",
    ) -> None:
        """针对 generate_share_link 的核心行为做功能性断言示例。

        断言点:
        1. 在 URL 长度未超过 max_url_size 时可以正常返回分享链接;
        2. URL 超过 max_url_size 时会抛出 ValueError。

        Args:
            generate_share_link: 被测的 generate_share_link 函数
            max_url_size: MPE_MAX_URL_SIZE 常量值
            base_url: 可选的 URL 前缀,默认使用示例域名
        """
        # 构造一个长度略小于限制的 URL,期望正常返回
        safe_payload_size = max_url_size - len(base_url) - 1
        safe_payload_size = max(safe_payload_size, 1)
        safe_url = base_url + ("a" * safe_payload_size)

        result = generate_share_link(safe_url)
        # 正常情况下应返回字符串类型的分享链接
        assert isinstance(result, str) and result, "generate_share_link 应返回非空字符串"

        # 构造一个明显超过限制的 URL,期望抛出 ValueError
        overflow_url = base_url + ("b" * (max_url_size + 10))

        try:
            generate_share_link(overflow_url)
            raise AssertionError(
                "generate_share_link 在 URL 超出 MPE_MAX_URL_SIZE 时应抛出 ValueError"
            )
        except ValueError:
            # 这是期望的行为
            pass

    @staticmethod
    def assert_ocr_and_screencap_behavior(
        ocr_impl: Callable[..., Any],
        screencap: Callable[..., Any],
        *,
        ocr_kwargs: Dict[str, Any] | None = None,
        screencap_kwargs: Dict[str, Any] | None = None,
        expected_ocr_text_key: str = "text",
    ) -> None:
        """针对 _ocr_impl / _screencap 提供基础功能断言示例。

        断言点:
        1. 截屏函数返回的对象可被 OCR 函数消费;
        2. OCR 结果包含文本字段(默认 key 为 "text");
        3. 在模型缺失等错误场景下,能够抛出明确异常或返回可诊断信息。

        Args:
            ocr_impl: 被测的 OCR 实现函数(例如 _ocr_impl)
            screencap: 被测的截屏函数(例如 _screencap)
            ocr_kwargs: 传给 ocr_impl 的额外参数
            screencap_kwargs: 传给 screencap 的额外参数
            expected_ocr_text_key: OCR 结果中字典文本字段的 key
        """
        ocr_kwargs = ocr_kwargs or {}
        screencap_kwargs = screencap_kwargs or {}

        # 正常流程:截屏 + OCR
        image = screencap(**screencap_kwargs)
        assert image is not None, "_screencap 应返回可供 OCR 使用的对象"

        ocr_result = ocr_impl(image, **ocr_kwargs)
        # 常见约定:返回 dict 或对象,包含 text 字段
        if isinstance(ocr_result, dict):
            assert expected_ocr_text_key in ocr_result, "_ocr_impl 返回的 dict 中应包含文本字段"
        else:
            # 如果是对象,则尝试访问 text 属性
            assert hasattr(ocr_result, expected_ocr_text_key), "_ocr_impl 返回对象应包含文本属性"

    @staticmethod
    def assert_pipeline_lifecycle_behavior(
        start_pipeline: Callable[..., Any],
        get_new_messages: Callable[..., Any],
        get_pipeline_status: Callable[..., Any],
        *,
        start_kwargs: Dict[str, Any] | None = None,
        messages_kwargs: Dict[str, Any] | None = None,
        status_kwargs: Dict[str, Any] | None = None,
        expected_running_status: str = "running",
        expected_finished_status: str = "finished",
    ) -> None:
        """针对 pipeline server 提供单线程状态流转与消息格式断言示例。

        断言点:
        1. start_pipeline 返回可供后续查询使用的 pipeline_id;
        2. get_pipeline_status 能从初始状态进入运行态与结束态;
        3. get_new_messages 返回的消息列表类型与字段格式正确。

        Args:
            start_pipeline: 启动 pipeline 的函数
            get_new_messages: 获取新消息的函数
            get_pipeline_status: 获取 pipeline 状态的函数
            start_kwargs: 传给 start_pipeline 的参数
            messages_kwargs: 传给 get_new_messages 的参数(除 pipeline_id 以外)
            status_kwargs: 传给 get_pipeline_status 的参数(除 pipeline_id 以外)
            expected_running_status: 运行态状态字符串
            expected_finished_status: 结束态状态字符串
        """
        start_kwargs = start_kwargs or {}
        messages_kwargs = messages_kwargs or {}
        status_kwargs = status_kwargs or {}

        # 启动 pipeline
        pipeline_info = start_pipeline(**start_kwargs)
        # pipeline_info 可以是 id 或 dict,统一提取 id
        if isinstance(pipeline_info, dict):
            pipeline_id = pipeline_info.get("pipeline_id") or pipeline_info.get("id")
        else:
            pipeline_id = pipeline_info

        assert pipeline_id, "start_pipeline 应返回可用于后续查询的 pipeline_id"

        # 初次查询状态
        status1 = get_pipeline_status(pipeline_id=pipeline_id, **status_kwargs)
        assert status1, "get_pipeline_status 不应返回空值"

        # 状态应能进入运行态(具体字段由调用方控制)
        assert expected_running_status in str(status1), (
            f"pipeline 状态应能进入运行态,当前状态: {status1!r}"
        )

        # 获取新消息,确保格式为 list,元素为 dict 或字符串等可序列化类型
        messages = get_new_messages(pipeline_id=pipeline_id, **messages_kwargs)
        assert isinstance(messages, list), "get_new_messages 应返回消息列表"
        for msg in messages:
            assert msg is not None, "消息元素不应为 None"

        # 再次查询状态,期望最终达到完成态(或由调用方调整期望)
        status2 = get_pipeline_status(pipeline_id=pipeline_id, **status_kwargs)
        assert expected_finished_status in str(status2), (
            f"pipeline 最终状态应为 {expected_finished_status!r},当前: {status2!r}"
        )

```

为完全落实评论建议,并在无设备环境下稳定回归核心行为,建议额外补充一个功能性测试文件,例如 `tests/test_core_functional.py`,其中:
1. 直接导入实际实现的函数与常量(例如 `generate_share_link`, `MPE_MAX_URL_SIZE`, `_ocr_impl`, `_screencap`, `start_pipeline`, `get_new_messages`, `get_pipeline_status`),并基于 pytest 编写真正的 `test_...` 用例;
2. 可以选择复用本次新增的 `PerformanceTestInterface.assert_*` 辅助方法,以减少重复断言逻辑,例如:
   - `PerformanceTestInterface.assert_generate_share_link_behavior(generate_share_link, MPE_MAX_URL_SIZE)`
   - `PerformanceTestInterface.assert_ocr_and_screencap_behavior(_ocr_impl, _screencap, ...)`
   - `PerformanceTestInterface.assert_pipeline_lifecycle_behavior(start_pipeline, get_new_messages, get_pipeline_status, ...)`
3. 根据真实实现调整期望状态字符串、返回结构和异常类型,并在功能测试文件中引入 `pytest`,以使用 `pytest.raises` 等能力。
</issue_to_address>

### Comment 10
<location path="README_EN.md" line_range="70" />
<code_context>

+### ⚡ Pipeline Mode (Multi-threaded Background Monitoring)
+
+- `start_pipeline` - Start background monitoring pipeline, continuously screenshots and caches image paths
+- `stop_pipeline` - Stop pipeline
+- `get_new_messages` - Get new screenshot paths cached by pipeline
</code_context>
<issue_to_address>
**issue (typo):** 请将 "continuously screenshots" 改写为更合适的动词形式,比如 "takes screenshots"。

例如,你可以把这条 bullet 更新为:`start_pipeline` – Start background monitoring pipeline, continuously takes screenshots and caches image paths,或者 `...continuously captures screenshots and caches image paths````suggestion
- `start_pipeline` - Start background monitoring pipeline, continuously captures screenshots and caches image paths
```
</issue_to_address>

Sourcery 对开源项目免费使用 - 如果你觉得这次评审有帮助,可以考虑分享给更多人 ✨
帮助我变得更加有用!请对每条评论点 👍 或 👎,我会根据这些反馈改进后续的评审质量。
Original comment in English

Hey - I've found 10 issues, and left some high level feedback:

  • In pipeline_server.run_pipeline_loop you assume _ocr_impl returns a list and call len(ocr_results), but _ocr_impl can also return a string when OCR resources are missing; consider explicitly handling the string case (e.g., detect and surface the error message or trigger download) to avoid type errors and silent misbehavior in the pipeline.
  • The new pipeline-related configuration knobs (PipelineConfig, message_queue_size, similarity_threshold, enable_dedup, UI_ELEMENTS_FILTER) are currently unused while PipelineState hardcodes Queue(maxsize=100); either wire these settings into the pipeline logic or remove them to avoid confusion about which limits and behaviors are actually in effect.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `pipeline_server.run_pipeline_loop` you assume `_ocr_impl` returns a list and call `len(ocr_results)`, but `_ocr_impl` can also return a string when OCR resources are missing; consider explicitly handling the string case (e.g., detect and surface the error message or trigger download) to avoid type errors and silent misbehavior in the pipeline.
- The new pipeline-related configuration knobs (`PipelineConfig`, `message_queue_size`, `similarity_threshold`, `enable_dedup`, `UI_ELEMENTS_FILTER`) are currently unused while `PipelineState` hardcodes `Queue(maxsize=100)`; either wire these settings into the pipeline logic or remove them to avoid confusion about which limits and behaviors are actually in effect.

## Individual Comments

### Comment 1
<location path="maa_mcp/pipeline_server.py" line_range="97-106" />
<code_context>
+            self.stop_event.clear()
+            # 清空队列
+            while not self.message_queue.empty():
+                try:
+                    self.message_queue.get_nowait()
+                except Empty:
</code_context>
<issue_to_address>
**issue (bug_risk):** Avoid bare `except` when enqueuing messages to prevent masking unexpected errors.

This use of a bare `except` will hide all errors, including programming mistakes and unexpected runtime issues. Since the main expected failure is the queue being full, catch `queue.Full` (or at least `Exception`) explicitly so genuine errors surface while still handling the queue-full case gracefully.
</issue_to_address>

### Comment 2
<location path="maa_mcp/pipeline_server.py" line_range="103-104" />
<code_context>
+
+            thread_logger.debug(f"[Frame {frame_count}] 开始 OCR...")
+
+            # 调用 vision.py 中的 _ocr_impl,执行截图+OCR
+            ocr_results = _ocr_impl(controller_id)
+
+            # 处理 OCR 返回值
</code_context>
<issue_to_address>
**issue (bug_risk):** Clarify handling when `_ocr_impl` returns an error string instead of a result list.

Since `_ocr_impl(controller_id)` may return a string error message as well as a list, the current logic will treat any non-`None` value as a successful OCR and enqueue it as `type: "ocr"`. This conflicts with `get_new_messages`’s contract that `ocr_results` is a list of structured results. Please either distinguish error/string responses (e.g., enqueue as `type: "error"` with a different payload) or avoid enqueuing them and handle/log the error separately to keep the message contract consistent.
</issue_to_address>

### Comment 3
<location path="maa_mcp/pipeline_server.py" line_range="89-93" />
<code_context>
+    thread_logger.debug(f"[初始化] controller_id={controller_id}")
+    thread_logger.info(f"流水线线程启动,控制器: {controller_id}")
+
+    fps = config_dict.get("fps", 2.0)
+    frame_count = 0
+    interval = 1.0 / fps
+
+    thread_logger.debug(f"[初始化] fps={fps}, interval={interval}s")
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Guard against invalid `fps` values (0 or negative) to avoid division errors.

If `config_dict` can contain invalid values, `fps` may be 0 or negative, causing a `ZeroDivisionError` or negative interval. Consider validating `fps` and either clamping to a minimum (e.g. `max(fps, 0.1)`) or falling back to a default, and log a warning when the value is rejected.

```suggestion
    raw_fps = config_dict.get("fps", 2.0)
    frame_count = 0

    # 处理无效 fps,避免除零或负间隔
    try:
        fps = float(raw_fps)
    except (TypeError, ValueError):
        thread_logger.warning(f"[配置警告] 无效的 fps 值: {raw_fps!r},使用默认值 2.0")
        fps = 2.0

    if fps <= 0:
        thread_logger.warning(f"[配置警告] fps={fps} 非法(必须大于 0),使用默认值 2.0")
        fps = 2.0

    interval = 1.0 / fps

    thread_logger.debug(f"[初始化] fps={fps}, interval={interval}s(原始配置值: {raw_fps!r})")
```
</issue_to_address>

### Comment 4
<location path="maa_mcp/pipeline/logging_config.py" line_range="21-24" />
<code_context>
+_initialized = False
+
+
+def setup_logger(
+    console_level: str = "INFO",
+    file_level: str = "DEBUG",
+    error_retention: str = "30 days",
+    log_retention: str = "7 days",
+) -> None:
</code_context>
<issue_to_address>
**nitpick:** Remove or use the `console_level` parameter to avoid confusion.

`console_level` is defined in the signature and docstring but never used, and console logging is disabled. This is confusing for callers who may expect it to affect console verbosity. Please either hook it up to an actual console handler (if/when console logging is re-enabled) or remove it from the signature and docstring to keep the API consistent.
</issue_to_address>

### Comment 5
<location path="tests/test_performance_stress.py" line_range="246-255" />
<code_context>
+class TestStressPerformance:
</code_context>
<issue_to_address>
**suggestion (testing):** 避免在没有实际设备/窗口时静默通过测试,建议使用 pytest.skip 或改为使用 MockController 增强可预期性

当前 `setup_class` 在既无 ADB 设备也无窗口时,只是打印并保留 `self.controller_id = None`,后续用例再遇到这种情况只 `print``return`,导致 pytest 认为测试通过,但实际关键路径完全未执行。建议:
- 依赖真实设备的用例在检测到 `self.controller_id is None` 时,明确使用 `pytest.skip("...原因...")`,而不是静默返回;
- 或使用已有的 `MockController`,通过 monkeypatch/mock `object_registry` 等,将这些压力测试在 CI 中跑在 mock 上,保证接口行为有覆盖;
- 若仅用于本地性能诊断,可加 `@pytest.mark.skipif(...)` 或自定义 `@pytest.mark.performance`,并在默认 CI 配置中禁用该标签,避免产生大量「假通过」用例。

Suggested implementation:

```python
        """模拟滚动操作"""
        time.sleep(0.001)
        return self

    @property
    def succeeded(self):
        """模拟操作成功状态"""
        return True


import pytest


@pytest.mark.performance
class TestStressPerformance:
    """压力测试类 - 测试关键函数在高负载下的性能"""

    def setup_class(self):
        """测试类初始化,获取实际控制器"""
        self.config = StressTestConfig()
        self.benchmarker = PerformanceBenchmarker()
        self.controller_id = None
        self.device_name = None
        self.window_name = None

        # 若既无真实设备也无窗口,直接跳过整类测试,避免静默通过
        if self.controller_id is None and not self.device_name and not self.window_name:
            pytest.skip("未检测到可用的真实控制器设备/窗口,跳过压力性能测试")

```

1. 若在本文件后续的测试方法中存在 `if self.controller_id is None: print(...); return` 之类的早退逻辑,请改为 `pytest.skip("未检测到可用控制器")`,以避免单个用例静默通过。
2. 如项目中已有统一的性能测试标记(例如 `@pytest.mark.performance` 的别名),可将此标记替换为项目约定的标记名称,并在 CI 的 pytest 配置中默认禁用该标记。
3. 如果未来接入 `MockController` 以支持 CI 运行,可在 `setup_class` 中优先尝试真实设备/窗口,不存在时构造并注册 `MockController`,然后移除当前的 `pytest.skip` 分支,改为仅在 mock 也不可用时才 skip。
</issue_to_address>

### Comment 6
<location path="tests/test_performance_stress.py" line_range="322-331" />
<code_context>
+        # 打印详细统计信息
+        self._print_stress_test_stats(results, "find_window_list")
+
+    def test_stress_ocr(self):
+        """压力测试 - OCR 函数"""
+        print(f"\n=== 压力测试: ocr ({self.config.iterations}次) ===")
+
+        if not self.controller_id:
+            print("  没有有效的控制器ID,无法执行OCR测试")
+            return
+
+        # 预热
+        for _ in range(self.config.warmup_iterations):
+            ocr.fn(self.controller_id)
+
+        # 执行压力测试
+        results = self.benchmarker.run_multiple(
+            ocr.fn,
+            iterations=self.config.iterations,
+            print_stats=False,
+            controller_id=self.controller_id,
+        )
+
+        # 打印详细统计信息
+        self._print_stress_test_stats(results, "ocr")
+
+    def test_stress_screencap(self):
</code_context>
<issue_to_address>
**suggestion (testing):** 压力测试目前只打印统计信息不做任何断言,无法帮助发现性能退化或功能异常

以 `test_stress_ocr` 为例,当前只是调用 `run_multiple` 并打印统计信息就结束,没有任何断言,其他 `test_stress_*` 也类似。这意味着即便 OCR 经常返回错误类型或极其缓慢,CI 仍然视为“测试通过”。

建议至少增加一些轻量级断言,例如:
- 在有有效 `controller_id` 时,`results` 中需有一定比例的 `success == True`- 对单次或平均耗时增加一个较宽松的上限(只用于捕捉明显性能退化);
- 检查异常返回(如模型文件不存在时),避免在错误路径上做压力测试。

如果这些用例只是性能探针而非质量 gatekeeper,建议改为独立基准脚本(如放在 `scripts/` 下),避免在 CI 中保留无断言且不稳定的 pytest 用例。

Suggested implementation:

```python
    def test_stress_ocr(self):
        """压力测试 - OCR 函数"""
        print(f"\n=== 压力测试: ocr ({self.config.iterations}次) ===")

        if not self.controller_id:
            print("  没有有效的控制器ID,无法执行OCR测试")
            return

        # 快速检查异常路径(例如模型文件不存在),避免在错误路径上做压力测试
        try:
            ocr.fn(self.controller_id)
        except FileNotFoundError as exc:
            # 模型或依赖文件缺失时直接跳过此压力测试,而不是在错误路径上压测
            print(f"  OCR 依赖文件缺失,跳过压力测试: {exc}")
            pytest.skip("OCR model or dependency file not found, skipping stress test")

        # 预热
        for _ in range(self.config.warmup_iterations):
            ocr.fn(self.controller_id)

        # 执行压力测试
        results = self.benchmarker.run_multiple(
            ocr.fn,
            iterations=self.config.iterations,
            print_stats=False,
            controller_id=self.controller_id,
        )

        # 打印详细统计信息
        self._print_stress_test_stats(results, "ocr")

        # 轻量级断言:确保一定比例成功,且耗时没有明显退化
        assert results, "OCR 压力测试没有产生任何结果"

        # 假定 results 是包含 success / duration_ms 字段的对象列表,
        # 这里采用较宽松的阈值,只捕捉明显的质量或性能退化。
        successes = [r for r in results if getattr(r, "success", False)]
        success_ratio = len(successes) / len(results)
        # 要求至少 80% 成功,避免持续错误却被视为测试通过
        assert success_ratio >= 0.8, f"OCR 成功率过低: {success_ratio:.2%}"

        durations_ms = [
            getattr(r, "duration_ms")
            for r in successes
            if getattr(r, "duration_ms", None) is not None
        ]
        if durations_ms:
            avg_duration = sum(durations_ms) / len(durations_ms)
            max_duration = max(durations_ms)
            # 较宽松的性能上限(示例值,可根据实际环境调整):
            # - 单次调用最长不超过 5s
            # - 平均耗时不超过 3s
            assert max_duration < 5000, f"OCR 单次调用耗时过长: {max_duration:.1f} ms"
            assert avg_duration < 3000, f"OCR 平均耗时过长: {avg_duration:.1f} ms"

    def test_stress_screencap(self):

```

1. 上面的实现假定:
   - `results` 是一个列表,每个元素有 `success: bool``duration_ms: float` 两个属性(或类似字段)。
   - 这些字段名与 `self.benchmarker.run_multiple` 的返回类型保持一致。
   如果你们当前的 `run_multiple` 返回结构不同(例如是字典、命名元组或聚合统计对象),需要相应调整:
   -`getattr(r, "success", False)` 替换为正确的访问方式(例如 `r["success"]``r.success`)。
   -`getattr(r, "duration_ms")` 替换为实际的耗时字段(例如 `r.duration`, `r.elapsed_ms` 等)。
2. 代码中使用了 `pytest.skip`,如果文件顶部尚未引入 `pytest`,需要在该文件开头添加:
   ```python
   import pytest
   ```
3. 为保持一致性,建议在 `test_stress_screencap` 及其它 `test_stress_*` 用例中,参考 `test_stress_ocr` 的方式:
   - 在正式压测前做一次试调用并捕获关键异常(如设备/驱动缺失),在错误路径上直接 `pytest.skip`-`_print_stress_test_stats` 之后,对成功率和耗时增加类似的宽松断言(阈值可根据接口特性分别设置)。
</issue_to_address>

### Comment 7
<location path="tests/test_performance_stress.py" line_range="186-195" />
<code_context>
+class MockController:
</code_context>
<issue_to_address>
**suggestion (testing):** MockController 未被任何测试使用,建议要么接入压力测试要么移除避免误导

当前 `MockController` 虽然实现了完整的模拟接口,但在测试中完全未被使用,可能让人误以为压力测试已支持 mock,且 CI/无设备环境仍无法覆盖这些逻辑。

建议:
- 若目标是支持无设备环境测试,将 `MockController` 接入测试流程(如通过 `object_registry.register(MockController())` 并让 `self.controller_id` 指向该 mock,或 monkeypatch `connect_adb_device.fn` 使其返回 `MockController`);
- 若短期不会使用,建议删除该类或在 docstring 中明确标注为预留,避免误导。
</issue_to_address>

### Comment 8
<location path="tests/test_performance_stress.py" line_range="177-183" />
<code_context>
+        print("=" * 50)
+
+
+class StressTestConfig:
+    """压力测试配置类"""
+
+    def __init__(self):
+        self.iterations = 10  # 默认执行1000次
+        self.warmup_iterations = 10  # 预热迭代次数
+        self.timeout = 30.0  # 单个测试超时时间(秒)
+
+
</code_context>
<issue_to_address>
**nitpick:** StressTestConfig 中的配置字段目前未真正影响测试行为,建议要么使用要么简化

`StressTestConfig` 中目前只有 `iterations` / `warmup_iterations` 被实际使用,`timeout` 未参与任何计时或超时控制逻辑,同时注释写「默认执行1000次」,但默认值是 10,存在误导。建议要么在 `PerformanceBenchmarker.run_multiple` 或各 `test_stress_*` 中真正引入基于 `timeout` 的超时控制,要么先移除该字段或标明「预留未启用」,并修正注释与默认值不一致的问题。
</issue_to_address>

### Comment 9
<location path="tests/test_performance_stress.py" line_range="559-568" />
<code_context>
+class PerformanceTestInterface:
</code_context>
<issue_to_address>
**suggestion (testing):** PerformanceTestInterface 测试辅助类缺少针对关键新行为(如 URL 体积限制、OCR 优先策略、pipeline server)的单元测试使用示例

本次改动在以下关键路径上缺少相应的功能/回归测试用例:
- `generate_share_link` 的正常与 URL 超过 `MPE_MAX_URL_SIZE` 时抛出 `ValueError` 的测试;
- `_ocr_impl` / `_screencap` 的功能性断言(模型缺失提示、返回类型等),而不仅仅是性能统计;
- `start_pipeline` / `get_new_messages` / `get_pipeline_status` 的状态流转与消息格式测试(可在单线程下直接调用实现函数)。

建议在单独的功能测试文件中补充上述用例,可基于 `PerformanceTestInterface` 或更直接的函数调用,以便在无设备环境下也能可靠回归这些核心行为。

Suggested implementation:

```python
# 性能测试接口 - 为关键函数添加性能测试装饰器
class PerformanceTestInterface:
    """性能测试接口类 - 提供性能测试的统一接口

    除纯性能测量外,本类也提供若干针对核心业务行为的功能性断言示例,
    便于在单元测试或集成测试中复用,覆盖本次改动涉及的关键路径:
    - generate_share_link 的 URL 体积限制;
    - OCR / 截屏接口的基础行为与返回类型;
    - pipeline server 的状态流转与消息格式。
    """

    @staticmethod
    def assert_generate_share_link_behavior(
        generate_share_link: Callable[..., Any],
        max_url_size: int,
        base_url: str = "https://example.com/",
    ) -> None:
        """针对 generate_share_link 的核心行为做功能性断言示例。

        断言点:
        1. 在 URL 长度未超过 max_url_size 时可以正常返回分享链接;
        2. URL 超过 max_url_size 时会抛出 ValueError。

        Args:
            generate_share_link: 被测的 generate_share_link 函数
            max_url_size: MPE_MAX_URL_SIZE 常量值
            base_url: 可选的 URL 前缀,默认使用示例域名
        """
        # 构造一个长度略小于限制的 URL,期望正常返回
        safe_payload_size = max_url_size - len(base_url) - 1
        safe_payload_size = max(safe_payload_size, 1)
        safe_url = base_url + ("a" * safe_payload_size)

        result = generate_share_link(safe_url)
        # 正常情况下应返回字符串类型的分享链接
        assert isinstance(result, str) and result, "generate_share_link 应返回非空字符串"

        # 构造一个明显超过限制的 URL,期望抛出 ValueError
        overflow_url = base_url + ("b" * (max_url_size + 10))

        try:
            generate_share_link(overflow_url)
            raise AssertionError(
                "generate_share_link 在 URL 超出 MPE_MAX_URL_SIZE 时应抛出 ValueError"
            )
        except ValueError:
            # 这是期望的行为
            pass

    @staticmethod
    def assert_ocr_and_screencap_behavior(
        ocr_impl: Callable[..., Any],
        screencap: Callable[..., Any],
        *,
        ocr_kwargs: Dict[str, Any] | None = None,
        screencap_kwargs: Dict[str, Any] | None = None,
        expected_ocr_text_key: str = "text",
    ) -> None:
        """针对 _ocr_impl / _screencap 提供基础功能断言示例。

        断言点:
        1. 截屏函数返回的对象可被 OCR 函数消费;
        2. OCR 结果包含文本字段(默认 key 为 "text");
        3. 在模型缺失等错误场景下,能够抛出明确异常或返回可诊断信息。

        Args:
            ocr_impl: 被测的 OCR 实现函数(例如 _ocr_impl)
            screencap: 被测的截屏函数(例如 _screencap)
            ocr_kwargs: 传给 ocr_impl 的额外参数
            screencap_kwargs: 传给 screencap 的额外参数
            expected_ocr_text_key: OCR 结果中字典文本字段的 key
        """
        ocr_kwargs = ocr_kwargs or {}
        screencap_kwargs = screencap_kwargs or {}

        # 正常流程:截屏 + OCR
        image = screencap(**screencap_kwargs)
        assert image is not None, "_screencap 应返回可供 OCR 使用的对象"

        ocr_result = ocr_impl(image, **ocr_kwargs)
        # 常见约定:返回 dict 或对象,包含 text 字段
        if isinstance(ocr_result, dict):
            assert expected_ocr_text_key in ocr_result, "_ocr_impl 返回的 dict 中应包含文本字段"
        else:
            # 如果是对象,则尝试访问 text 属性
            assert hasattr(ocr_result, expected_ocr_text_key), "_ocr_impl 返回对象应包含文本属性"

    @staticmethod
    def assert_pipeline_lifecycle_behavior(
        start_pipeline: Callable[..., Any],
        get_new_messages: Callable[..., Any],
        get_pipeline_status: Callable[..., Any],
        *,
        start_kwargs: Dict[str, Any] | None = None,
        messages_kwargs: Dict[str, Any] | None = None,
        status_kwargs: Dict[str, Any] | None = None,
        expected_running_status: str = "running",
        expected_finished_status: str = "finished",
    ) -> None:
        """针对 pipeline server 提供单线程状态流转与消息格式断言示例。

        断言点:
        1. start_pipeline 返回可供后续查询使用的 pipeline_id;
        2. get_pipeline_status 能从初始状态进入运行态与结束态;
        3. get_new_messages 返回的消息列表类型与字段格式正确。

        Args:
            start_pipeline: 启动 pipeline 的函数
            get_new_messages: 获取新消息的函数
            get_pipeline_status: 获取 pipeline 状态的函数
            start_kwargs: 传给 start_pipeline 的参数
            messages_kwargs: 传给 get_new_messages 的参数(除 pipeline_id 以外)
            status_kwargs: 传给 get_pipeline_status 的参数(除 pipeline_id 以外)
            expected_running_status: 运行态状态字符串
            expected_finished_status: 结束态状态字符串
        """
        start_kwargs = start_kwargs or {}
        messages_kwargs = messages_kwargs or {}
        status_kwargs = status_kwargs or {}

        # 启动 pipeline
        pipeline_info = start_pipeline(**start_kwargs)
        # pipeline_info 可以是 id 或 dict,统一提取 id
        if isinstance(pipeline_info, dict):
            pipeline_id = pipeline_info.get("pipeline_id") or pipeline_info.get("id")
        else:
            pipeline_id = pipeline_info

        assert pipeline_id, "start_pipeline 应返回可用于后续查询的 pipeline_id"

        # 初次查询状态
        status1 = get_pipeline_status(pipeline_id=pipeline_id, **status_kwargs)
        assert status1, "get_pipeline_status 不应返回空值"

        # 状态应能进入运行态(具体字段由调用方控制)
        assert expected_running_status in str(status1), (
            f"pipeline 状态应能进入运行态,当前状态: {status1!r}"
        )

        # 获取新消息,确保格式为 list,元素为 dict 或字符串等可序列化类型
        messages = get_new_messages(pipeline_id=pipeline_id, **messages_kwargs)
        assert isinstance(messages, list), "get_new_messages 应返回消息列表"
        for msg in messages:
            assert msg is not None, "消息元素不应为 None"

        # 再次查询状态,期望最终达到完成态(或由调用方调整期望)
        status2 = get_pipeline_status(pipeline_id=pipeline_id, **status_kwargs)
        assert expected_finished_status in str(status2), (
            f"pipeline 最终状态应为 {expected_finished_status!r},当前: {status2!r}"
        )

```

为完全实现评论建议、并在无设备环境下稳定回归核心行为,建议另外补充一个功能性测试文件,例如 `tests/test_core_functional.py`,其中:
1. 直接导入实际实现的函数与常量(例如 `generate_share_link`, `MPE_MAX_URL_SIZE`, `_ocr_impl`, `_screencap`, `start_pipeline`, `get_new_messages`, `get_pipeline_status`),并基于 pytest 编写真正的 `test_...` 用例;
2. 可选择复用本次新增的 `PerformanceTestInterface.assert_*` 辅助方法,以减少重复断言逻辑,例如:
   - `PerformanceTestInterface.assert_generate_share_link_behavior(generate_share_link, MPE_MAX_URL_SIZE)`
   - `PerformanceTestInterface.assert_ocr_and_screencap_behavior(_ocr_impl, _screencap, ...)`
   - `PerformanceTestInterface.assert_pipeline_lifecycle_behavior(start_pipeline, get_new_messages, get_pipeline_status, ...)`
3. 根据真实实现调整期望状态字符串、返回结构和异常类型,并在功能测试文件中引入 `pytest` 以使用 `pytest.raises` 等能力。
</issue_to_address>

### Comment 10
<location path="README_EN.md" line_range="70" />
<code_context>

+### ⚡ Pipeline Mode (Multi-threaded Background Monitoring)
+
+- `start_pipeline` - Start background monitoring pipeline, continuously screenshots and caches image paths
+- `stop_pipeline` - Stop pipeline
+- `get_new_messages` - Get new screenshot paths cached by pipeline
</code_context>
<issue_to_address>
**issue (typo):** Rephrase "continuously screenshots" to use a proper verb form like "takes screenshots".

For example, you could update the bullet to: `start_pipeline` – Start background monitoring pipeline, continuously takes screenshots and caches image paths, or `...continuously captures screenshots and caches image paths`.

```suggestion
- `start_pipeline` - Start background monitoring pipeline, continuously captures screenshots and caches image paths
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread maa_mcp/pipeline_server.py
Comment thread maa_mcp/pipeline_server.py
Comment thread maa_mcp/pipeline_server.py
Comment thread maa_mcp/pipeline/logging_config.py
Comment thread tests/test_performance_stress.py
Comment thread tests/test_performance_stress.py
Comment thread tests/test_performance_stress.py Outdated
Comment thread tests/test_performance_stress.py Outdated
Comment thread tests/test_performance_stress.py
Comment thread README_EN.md Outdated
KhazixW2 and others added 4 commits March 23, 2026 12:57
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
- 移除 logging_config.py 中未使用的 console_level 参数
- 将 pipeline_server.py 中的裸 except 改为显式捕获 queue.Full
- 添加对 _ocr_impl 返回错误字符串的处理,避免错误信息以 type:"ocr" 入队
- 测试无设备时改用 pytest.skip() 替代静默返回
- 删除未使用的 MockController 类
- 修复 StressTestConfig 注释与默认值不一致问题,移除未使用的 timeout 字段
- 为压力测试添加轻量级断言(成功率 >= 80%),避免无断言的假通过
@KhazixW2 KhazixW2 merged commit 5ebb317 into MAA-AI:main Mar 24, 2026
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant