diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7dc2c11..4fb3785 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,4 +34,4 @@ jobs: run: uv run mypy src - name: 运行测试 - run: uv run pytest tests -v --cov=src/uart_mcp --cov-report=term-missing + run: uv run pytest tests -v --cov=src/uart_mcp --cov-report=term-missing --ignore=tests/test_integration_real_port.py diff --git a/pyproject.toml b/pyproject.toml index 7544acf..1a27f75 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,11 +30,12 @@ build-backend = "hatchling.build" packages = ["src/uart_mcp"] [tool.ruff] -line-length = 88 +line-length = 120 target-version = "py313" [tool.ruff.lint] select = ["E", "F", "I", "N", "W"] +ignore = ["E501"] # 忽略行长度检查,使用 line-length 控制 [tool.mypy] python_version = "3.13" @@ -43,6 +44,7 @@ strict = true [tool.pytest.ini_options] pythonpath = ["src"] testpaths = ["tests"] +addopts = "--ignore=tests/test_integration_real_port.py" [dependency-groups] dev = [ diff --git a/src/uart_mcp/config.py b/src/uart_mcp/config.py index 0b4d730..f5a8ec3 100644 --- a/src/uart_mcp/config.py +++ b/src/uart_mcp/config.py @@ -8,15 +8,11 @@ import platform import re import threading +import tomllib from dataclasses import dataclass from pathlib import Path from typing import Any -try: - import tomllib -except ImportError: - import tomli as tomllib # Python < 3.11 兼容 - logger = logging.getLogger(__name__) # 支持的波特率和数据位(从 types.py 导入常量以保持一致性) diff --git a/tests/conftest.py b/tests/conftest.py index 887b1e1..553485c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,12 +8,12 @@ class MockSerialLoopback: """模拟串口回环的 Serial 类 - + 写入的数据会自动进入读取缓冲区,模拟TX/RX短接的回环效果 """ - + def __init__(self, port=None, baudrate=9600, bytesize=8, parity='N', - stopbits=1, timeout=None, write_timeout=None, + stopbits=1, timeout=None, write_timeout=None, xonxoff=False, rtscts=False, **kwargs): self.port = port self.baudrate = baudrate @@ -24,26 +24,26 @@ def __init__(self, port=None, baudrate=9600, bytesize=8, parity='N', self.write_timeout = write_timeout self.xonxoff = xonxoff self.rtscts = rtscts - + self._is_open = True self._buffer = bytearray() # 回环缓冲区 self._lock = threading.Lock() - + @property def is_open(self): return self._is_open - + @property def in_waiting(self): with self._lock: return len(self._buffer) - + def open(self): self._is_open = True - + def close(self): self._is_open = False - + def write(self, data): """写入数据,同时放入读取缓冲区(回环)""" if not self._is_open: @@ -53,7 +53,7 @@ def write(self, data): with self._lock: self._buffer.extend(data) return len(data) - + def read(self, size=1): """从缓冲区读取数据""" if not self._is_open: @@ -62,27 +62,27 @@ def read(self, size=1): data = bytes(self._buffer[:size]) self._buffer = self._buffer[size:] return data - + def read_all(self): """读取所有可用数据""" with self._lock: data = bytes(self._buffer) self._buffer.clear() return data - + def reset_input_buffer(self): """清空输入缓冲区""" with self._lock: self._buffer.clear() - + def reset_output_buffer(self): """清空输出缓冲区(模拟)""" pass - + def flush(self): """刷新输出""" pass - + def apply_settings(self, settings): """应用配置设置(热更新)""" if "baudrate" in settings: @@ -121,17 +121,17 @@ def mock_serial(): @pytest.fixture def mock_serial_loopback(): """模拟串口回环 - 写入数据自动进入读取缓冲区 - + 用于集成测试,模拟真实硬件的回环效果 """ mock_instances = {} - + def create_mock_serial(*args, **kwargs): port = kwargs.get('port') or (args[0] if args else '/dev/mock') if port not in mock_instances: mock_instances[port] = MockSerialLoopback(*args, **kwargs) return mock_instances[port] - + with patch("serial.Serial", side_effect=create_mock_serial): yield mock_instances @@ -148,7 +148,7 @@ def mock_list_ports_with_devices(): """模拟返回设备列表的 list_ports""" mock_ports = [ MockPortInfo("/dev/ttyMOCK0", "模拟USB串口", "USB VID:PID=1234:5678"), - MockPortInfo("/dev/ttyMOCK1", "模拟蓝牙串口", "BLUETOOTH ADDR=00:11:22:33:44:55"), + MockPortInfo("/dev/ttyMOCK1", "模拟蓝牙串口", "BT ADDR=00:11:22:33:44:55"), ] with patch("serial.tools.list_ports.comports", return_value=mock_ports): yield mock_ports @@ -167,16 +167,16 @@ def reset_managers(): """重置全局管理器状态,用于隔离测试""" # 保存原状态 from uart_mcp import serial_manager, terminal_manager - + old_serial = serial_manager._serial_manager old_terminal = terminal_manager._terminal_manager - + # 重置为 None serial_manager._serial_manager = None terminal_manager._terminal_manager = None - + yield - + # 清理新创建的管理器 try: if serial_manager._serial_manager is not None: @@ -188,7 +188,7 @@ def reset_managers(): terminal_manager._terminal_manager.shutdown() except Exception: pass - + # 恢复原状态 serial_manager._serial_manager = old_serial terminal_manager._terminal_manager = old_terminal diff --git a/tests/test_config_management.py b/tests/test_config_management.py index dbf503c..f745d49 100644 --- a/tests/test_config_management.py +++ b/tests/test_config_management.py @@ -4,7 +4,6 @@ """ import os -import stat import tempfile from pathlib import Path from unittest.mock import patch @@ -12,14 +11,14 @@ import pytest from uart_mcp.config import ( - ConfigManager, BlacklistManager, + ConfigManager, UartConfig, - get_config_dir, - get_config_path, + get_blacklist_manager, get_blacklist_path, + get_config_dir, get_config_manager, - get_blacklist_manager, + get_config_path, ) diff --git a/tests/test_integration_mock.py b/tests/test_integration_mock.py index fbf35d4..c8f2fda 100644 --- a/tests/test_integration_mock.py +++ b/tests/test_integration_mock.py @@ -19,24 +19,24 @@ class TestMockIntegration: """基于Mock的集成测试类""" - + # ========== 阶段1:串口基础操作测试 ========== - + def test_list_ports(self, mock_list_ports_with_devices, reset_managers): """测试:列出所有可用串口""" from uart_mcp.tools.list_ports import list_ports - + ports = list_ports() - + assert len(ports) == 2 port_names = [p["port"] for p in ports] assert MOCK_PORT in port_names assert "/dev/ttyMOCK1" in port_names - + def test_open_port(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:打开串口""" from uart_mcp.tools.port_ops import open_port - + result = open_port( port=MOCK_PORT, baudrate=MOCK_BAUDRATE, @@ -44,143 +44,143 @@ def test_open_port(self, mock_serial_loopback, mock_list_ports_with_devices, res parity="N", stopbits=1.0, ) - + assert result.get("is_open") is True assert result.get("port") == MOCK_PORT - + def test_get_status(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:获取串口状态""" from uart_mcp.tools.port_ops import get_status, open_port - + # 先打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) - + result = get_status(port=MOCK_PORT) - + assert result.get("is_open") is True config = result.get("config", {}) assert config.get("baudrate") == MOCK_BAUDRATE - + def test_set_config(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:修改串口配置(热更新)""" from uart_mcp.tools.port_ops import get_status, open_port, set_config - + # 先打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) - + # 修改波特率 new_baudrate = 9600 result = set_config(port=MOCK_PORT, baudrate=new_baudrate) - + config = result.get("config", {}) assert config.get("baudrate") == new_baudrate - + # 验证配置已更新 status = get_status(port=MOCK_PORT) assert status.get("config", {}).get("baudrate") == new_baudrate - + def test_close_port(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:关闭串口""" from uart_mcp.tools.port_ops import close_port, open_port - + # 先打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) - + result = close_port(port=MOCK_PORT) - + assert result.get("success") is True - + # ========== 阶段2:数据通信测试(回环) ========== - + def test_send_receive_text(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:发送和接收文本数据(回环测试)""" from uart_mcp.tools.data_ops import read_data, send_data from uart_mcp.tools.port_ops import open_port - + # 打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) - + test_message = "Hello UART 你好串口!" - + # 发送数据 send_result = send_data(port=MOCK_PORT, data=test_message, is_binary=False) assert send_result.get("success") is True - + # 读取数据(回环) read_result = read_data(port=MOCK_PORT, is_binary=False) received = read_result.get("data", "") - + assert test_message in received - + def test_send_receive_binary(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:发送和接收二进制数据(回环测试)""" from uart_mcp.tools.data_ops import read_data, send_data from uart_mcp.tools.port_ops import open_port - + # 打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) - + # 准备二进制数据 raw_data = bytes([0x01, 0x02, 0x03, 0xFE, 0xFF]) b64_data = base64.b64encode(raw_data).decode("ascii") - + # 发送二进制数据 send_result = send_data(port=MOCK_PORT, data=b64_data, is_binary=True) assert send_result.get("success") is True - + # 读取二进制数据(回环) read_result = read_data(port=MOCK_PORT, is_binary=True) received_b64 = read_result.get("data", "") received_raw = base64.b64decode(received_b64) if received_b64 else b"" - + assert raw_data == received_raw - + # ========== 阶段3:终端会话测试 ========== - + def test_create_session(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:创建终端会话""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import create_session - + # 先打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) - + result = create_session( port=MOCK_PORT, line_ending="CRLF", local_echo=False, ) - + assert result.get("session_id") == MOCK_PORT - + def test_list_sessions(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:列出所有会话""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import create_session, list_sessions - + # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) - + result = list_sessions() sessions = result.get("sessions", []) - + session_ids = [s.get("session_id") if isinstance(s, dict) else s for s in sessions] assert MOCK_PORT in session_ids - + def test_get_session_info(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:获取会话信息""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import create_session, get_session_info - + # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) - + result = get_session_info(session_id=MOCK_PORT) - + assert result.get("session_id") == MOCK_PORT - + def test_send_command_read_output(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:发送命令并读取输出(回环测试)""" from uart_mcp.tools.port_ops import open_port @@ -190,16 +190,16 @@ def test_send_command_read_output(self, mock_serial_loopback, mock_list_ports_wi read_output, send_command, ) - + # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) - + # 清空缓冲区 clear_buffer(session_id=MOCK_PORT) - + test_cmd = "AT" - + # 发送命令 send_result = send_command( session_id=MOCK_PORT, @@ -207,79 +207,84 @@ def test_send_command_read_output(self, mock_serial_loopback, mock_list_ports_wi add_line_ending=True, ) assert send_result.get("success") is True - + # 等待后台线程读取数据 time.sleep(0.2) - + # 读取输出(回环模式下应该收到发送的命令) read_result = read_output(session_id=MOCK_PORT, clear=False) output = read_result.get("data", "") - + assert test_cmd in output - + def test_clear_buffer(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:清空缓冲区""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import clear_buffer, create_session, read_output - + # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) - + result = clear_buffer(session_id=MOCK_PORT) assert result.get("success") is True - + # 验证缓冲区已清空 read_result = read_output(session_id=MOCK_PORT, clear=False) assert len(read_result.get("data", "")) == 0 - + def test_close_session(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:关闭终端会话""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import close_session, create_session - + # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) - + result = close_session(session_id=MOCK_PORT) - + assert result.get("success") is True - + # ========== 阶段4:错误处理测试 ========== - + def test_open_nonexistent_port(self, mock_list_ports_with_devices, reset_managers): """测试:打开不存在的端口(端口不在列表中)""" from uart_mcp.errors import PortNotFoundError from uart_mcp.tools.port_ops import open_port - + # 不使用mock_serial_loopback,让它尝试打开真实(不存在)端口 # 由于端口不存在于系统中,应该抛出异常 with pytest.raises((PortNotFoundError, Exception)): open_port(port="/dev/ttyNONEXIST_CICD_TEST", baudrate=9600) - + def test_get_status_unopened_port(self, reset_managers): """测试:获取未打开端口的状态""" from uart_mcp.tools.port_ops import get_status - + with pytest.raises(Exception): get_status(port=MOCK_PORT) - + def test_send_data_unopened_port(self, reset_managers): """测试:向未打开的端口发送数据""" from uart_mcp.tools.data_ops import send_data - + with pytest.raises(Exception): send_data(port=MOCK_PORT, data="test", is_binary=False) class TestMockIntegrationWorkflow: """完整工作流程测试""" - + def test_full_workflow(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:完整的串口通信工作流程""" from uart_mcp.tools.data_ops import read_data, send_data - from uart_mcp.tools.port_ops import close_port, get_status, open_port, set_config + from uart_mcp.tools.port_ops import ( + close_port, + get_status, + open_port, + set_config, + ) from uart_mcp.tools.terminal import ( clear_buffer, close_session, @@ -289,60 +294,60 @@ def test_full_workflow(self, mock_serial_loopback, mock_list_ports_with_devices, read_output, send_command, ) - + # 1. 打开串口 result = open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) assert result.get("is_open") is True - + # 2. 获取状态 status = get_status(port=MOCK_PORT) assert status.get("is_open") is True - + # 3. 修改配置 new_config = set_config(port=MOCK_PORT, baudrate=9600) assert new_config.get("config", {}).get("baudrate") == 9600 - + # 恢复配置 set_config(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) - + # 4. 发送/接收文本 send_data(port=MOCK_PORT, data="Hello", is_binary=False) read_result = read_data(port=MOCK_PORT, is_binary=False) assert "Hello" in read_result.get("data", "") - + # 5. 发送/接收二进制 raw = bytes([0xAA, 0xBB]) send_data(port=MOCK_PORT, data=base64.b64encode(raw).decode(), is_binary=True) read_result = read_data(port=MOCK_PORT, is_binary=True) assert base64.b64decode(read_result.get("data", "")) == raw - + # 6. 创建终端会话 session = create_session(port=MOCK_PORT) assert session.get("session_id") == MOCK_PORT - + # 7. 列出会话 sessions = list_sessions() assert len(sessions.get("sessions", [])) > 0 - + # 8. 获取会话信息 info = get_session_info(session_id=MOCK_PORT) assert info.get("session_id") == MOCK_PORT - + # 9. 发送命令 clear_buffer(session_id=MOCK_PORT) send_command(session_id=MOCK_PORT, command="TEST") time.sleep(0.2) output = read_output(session_id=MOCK_PORT) assert "TEST" in output.get("data", "") - + # 10. 清空缓冲区 clear_buffer(session_id=MOCK_PORT) output = read_output(session_id=MOCK_PORT) assert output.get("data", "") == "" - + # 11. 关闭会话 close_session(session_id=MOCK_PORT) - + # 12. 关闭串口 result = close_port(port=MOCK_PORT) assert result.get("success") is True diff --git a/tests/test_integration_real_port.py b/tests/test_integration_real_port.py index fb89621..3203b74 100644 --- a/tests/test_integration_real_port.py +++ b/tests/test_integration_real_port.py @@ -119,7 +119,11 @@ def test_send_receive_text() -> None: received = read_result.get("data", "") success = test_message in received - print_result("send/read_data - 文本回环", success, f"发送: {test_message}, 接收: {received}") + print_result( + "send/read_data - 文本回环", + success, + f"发送: {test_message}, 接收: {received}" + ) assert success, "文本回环测试失败" except Exception as e: print_result("send/read_data - 文本回环", False, str(e)) @@ -150,7 +154,11 @@ def test_send_receive_binary() -> None: received_raw = base64.b64decode(received_b64) if received_b64 else b"" success = raw_data == received_raw - print_result("send/read_data - 二进制回环", success, f"发送: {raw_data.hex()}, 接收: {received_raw.hex()}") + print_result( + "send/read_data - 二进制回环", + success, + f"发送: {raw_data.hex()}, 接收: {received_raw.hex()}" + ) assert success, "二进制回环测试失败" except Exception as e: print_result("send/read_data - 二进制回环", False, str(e)) @@ -183,7 +191,9 @@ def test_list_sessions() -> None: result = list_sessions() sessions = result.get("sessions", []) # sessions 是对象列表,需要检查 session_id 字段 - session_ids = [s.get("session_id") if isinstance(s, dict) else s for s in sessions] + session_ids = [ + s.get("session_id") if isinstance(s, dict) else s for s in sessions + ] success = TEST_PORT in session_ids print_result("list_sessions - 列出会话", success, result) assert success, "列出会话失败" @@ -243,7 +253,11 @@ def test_send_command_read_output() -> None: # 回环模式下应该收到发送的命令 success = test_cmd in output - print_result("send/read_output - 命令回环", success, f"发送: {test_cmd}, 输出: {repr(output)}") + print_result( + "send/read_output - 命令回环", + success, + f"发送: {test_cmd}, 输出: {repr(output)}" + ) assert success, "命令回环测试失败" except Exception as e: print_result("send/read_output - 命令回环", False, str(e)) diff --git a/tests/test_scenario_blacklist_permission.py b/tests/test_scenario_blacklist_permission.py index e8c7793..f8bf2e9 100644 --- a/tests/test_scenario_blacklist_permission.py +++ b/tests/test_scenario_blacklist_permission.py @@ -117,7 +117,7 @@ def test_scenario_windows_skip_permission(): # Windows 跳过权限校验 with patch("uart_mcp.config.get_config_path", return_value=test_path), \ patch("platform.system", return_value="Windows"): - cm = ConfigManager() + ConfigManager() # noqa: F841 # 不应抛出异常,但配置加载失败(文件格式正确,但权限不应通过) # 实际上由于文件路径正常,会尝试加载 print("✓ Windows 跳过权限校验") diff --git a/tests/test_scenario_hot_reload.py b/tests/test_scenario_hot_reload.py index 39a2da5..e6f8f98 100644 --- a/tests/test_scenario_hot_reload.py +++ b/tests/test_scenario_hot_reload.py @@ -13,7 +13,7 @@ import pytest -from uart_mcp.config import ConfigManager, BlacklistManager, UartConfig +from uart_mcp.config import BlacklistManager, ConfigManager, UartConfig def test_scenario_config_reload_success(): diff --git a/tests/test_serial_manager.py b/tests/test_serial_manager.py index 89efa1b..0c36166 100644 --- a/tests/test_serial_manager.py +++ b/tests/test_serial_manager.py @@ -335,7 +335,7 @@ def test_read_data_with_timeout(self, mock_serial, mock_list_ports): mock_create.return_value = mock_serial_obj manager.open_port("/dev/ttyUSB0") - data = manager.read_data("/dev/ttyUSB0", timeout_ms=500) + manager.read_data("/dev/ttyUSB0", timeout_ms=500) # 忽略返回值 # 验证超时被临时修改 assert mock_serial_obj.timeout == 1.0 # 应该恢复原值 diff --git a/tests/test_server_handlers.py b/tests/test_server_handlers.py index 4ca6a38..d56bbfb 100644 --- a/tests/test_server_handlers.py +++ b/tests/test_server_handlers.py @@ -11,32 +11,32 @@ class TestHandleListTools: """测试 handle_list_tools 函数""" - + @pytest.mark.asyncio async def test_handle_list_tools_returns_all_tools(self): """测试:handle_list_tools 返回所有工具""" from uart_mcp.server import handle_list_tools - + tools = await handle_list_tools() - + # 验证返回工具列表 assert isinstance(tools, list) assert len(tools) == 14 # 总共14个工具 - + # 验证每个工具都有必要的属性 tool_names = [tool.name for tool in tools] - + # 串口基础工具 assert "list_ports" in tool_names assert "open_port" in tool_names assert "close_port" in tool_names assert "set_config" in tool_names assert "get_status" in tool_names - + # 数据操作工具 assert "send_data" in tool_names assert "read_data" in tool_names - + # 终端会话工具 assert "create_session" in tool_names assert "close_session" in tool_names @@ -49,261 +49,261 @@ async def test_handle_list_tools_returns_all_tools(self): class TestHandleCallTool: """测试 handle_call_tool 函数""" - + @pytest.mark.asyncio async def test_call_list_ports(self, mock_list_ports_with_devices, reset_managers): """测试:调用 list_ports 工具""" from uart_mcp.server import handle_call_tool - + result = await handle_call_tool("list_ports", {}) - + assert len(result) == 1 assert result[0].type == "text" - + data = json.loads(result[0].text) assert isinstance(data, list) assert len(data) == 2 - + @pytest.mark.asyncio async def test_call_open_port(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 open_port 工具""" from uart_mcp.server import handle_call_tool - + result = await handle_call_tool("open_port", { "port": "/dev/ttyMOCK0", "baudrate": 115200 }) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("is_open") is True - + @pytest.mark.asyncio async def test_call_get_status(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 get_status 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) - + result = await handle_call_tool("get_status", {"port": "/dev/ttyMOCK0"}) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("is_open") is True - + @pytest.mark.asyncio async def test_call_set_config(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 set_config 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) - + result = await handle_call_tool("set_config", { "port": "/dev/ttyMOCK0", "baudrate": 9600 }) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("config", {}).get("baudrate") == 9600 - + @pytest.mark.asyncio async def test_call_send_data(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 send_data 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) - + result = await handle_call_tool("send_data", { "port": "/dev/ttyMOCK0", "data": "Hello", "is_binary": False }) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("success") is True - + @pytest.mark.asyncio async def test_call_read_data(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 read_data 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口并发送数据 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) await handle_call_tool("send_data", {"port": "/dev/ttyMOCK0", "data": "Test", "is_binary": False}) - + result = await handle_call_tool("read_data", { "port": "/dev/ttyMOCK0", "is_binary": False }) - + assert len(result) == 1 data = json.loads(result[0].text) assert "Test" in data.get("data", "") - + @pytest.mark.asyncio async def test_call_close_port(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 close_port 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) - + result = await handle_call_tool("close_port", {"port": "/dev/ttyMOCK0"}) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("success") is True - + # ========== 终端会话工具测试 ========== - + @pytest.mark.asyncio async def test_call_create_session(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 create_session 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) - + result = await handle_call_tool("create_session", {"port": "/dev/ttyMOCK0"}) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("session_id") == "/dev/ttyMOCK0" - + @pytest.mark.asyncio async def test_call_list_sessions(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 list_sessions 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口并创建会话 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) await handle_call_tool("create_session", {"port": "/dev/ttyMOCK0"}) - + result = await handle_call_tool("list_sessions", {}) - + assert len(result) == 1 data = json.loads(result[0].text) assert "sessions" in data - + @pytest.mark.asyncio async def test_call_get_session_info(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 get_session_info 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口并创建会话 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) await handle_call_tool("create_session", {"port": "/dev/ttyMOCK0"}) - + result = await handle_call_tool("get_session_info", {"session_id": "/dev/ttyMOCK0"}) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("session_id") == "/dev/ttyMOCK0" - + @pytest.mark.asyncio async def test_call_send_command(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 send_command 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口并创建会话 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) await handle_call_tool("create_session", {"port": "/dev/ttyMOCK0"}) - + result = await handle_call_tool("send_command", { "session_id": "/dev/ttyMOCK0", "command": "AT" }) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("success") is True - + @pytest.mark.asyncio async def test_call_read_output(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 read_output 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口并创建会话 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) await handle_call_tool("create_session", {"port": "/dev/ttyMOCK0"}) - + result = await handle_call_tool("read_output", {"session_id": "/dev/ttyMOCK0"}) - + assert len(result) == 1 data = json.loads(result[0].text) assert "data" in data - + @pytest.mark.asyncio async def test_call_clear_buffer(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 clear_buffer 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口并创建会话 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) await handle_call_tool("create_session", {"port": "/dev/ttyMOCK0"}) - + result = await handle_call_tool("clear_buffer", {"session_id": "/dev/ttyMOCK0"}) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("success") is True - + @pytest.mark.asyncio async def test_call_close_session(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:调用 close_session 工具""" from uart_mcp.server import handle_call_tool - + # 先打开串口并创建会话 await handle_call_tool("open_port", {"port": "/dev/ttyMOCK0", "baudrate": 115200}) await handle_call_tool("create_session", {"port": "/dev/ttyMOCK0"}) - + result = await handle_call_tool("close_session", {"session_id": "/dev/ttyMOCK0"}) - + assert len(result) == 1 data = json.loads(result[0].text) assert data.get("success") is True - + # ========== 错误处理测试 ========== - + @pytest.mark.asyncio async def test_call_unknown_tool(self, reset_managers): """测试:调用未知工具返回错误""" from uart_mcp.server import handle_call_tool - + result = await handle_call_tool("unknown_tool", {}) - + assert len(result) == 1 data = json.loads(result[0].text) assert "error" in data assert "未知工具" in data["error"]["message"] - + @pytest.mark.asyncio async def test_serial_error_handling(self, reset_managers): """测试:SerialError 异常处理""" from uart_mcp.server import handle_call_tool - + # 尝试获取未打开端口的状态,应该触发 SerialError result = await handle_call_tool("get_status", {"port": "/dev/ttyNONEXIST"}) - + assert len(result) == 1 data = json.loads(result[0].text) assert "error" in data assert "code" in data["error"] - + @pytest.mark.asyncio async def test_general_exception_handling(self, reset_managers): """测试:普通异常处理""" from uart_mcp.server import handle_call_tool - + # 使用 patch 正确的模块路径来触发普通异常 with patch("uart_mcp.server.list_ports", side_effect=RuntimeError("测试异常")): result = await handle_call_tool("list_ports", {}) - + assert len(result) == 1 data = json.loads(result[0].text) assert "error" in data @@ -312,62 +312,71 @@ async def test_general_exception_handling(self, reset_managers): class TestMainAndRunServer: """测试 main() 和 run_server() 函数""" - + @pytest.mark.asyncio async def test_run_server_starts_correctly(self): """测试:run_server 启动并立即退出""" from uart_mcp.server import run_server - + # Mock stdio_server 上下文管理器,让它立即返回 mock_read_stream = AsyncMock() mock_write_stream = AsyncMock() - + async def mock_aenter(self): return (mock_read_stream, mock_write_stream) - + async def mock_aexit(self, *args): pass - + mock_context = MagicMock() mock_context.__aenter__ = mock_aenter mock_context.__aexit__ = mock_aexit - - with patch("uart_mcp.server.mcp.server.stdio.stdio_server", return_value=mock_context): - with patch("uart_mcp.server.server.run", new_callable=AsyncMock) as mock_run: + + with patch( + "uart_mcp.server.mcp.server.stdio.stdio_server", + return_value=mock_context + ): + with patch( + "uart_mcp.server.server.run", + new_callable=AsyncMock + ) as mock_run: await run_server() mock_run.assert_called_once() - + def test_main_normal_exit(self): """测试:main 正常启动和退出""" from uart_mcp.server import main - + def mock_asyncio_run_impl(coro): # 关闭协程以避免警告 coro.close() - - with patch("uart_mcp.server.asyncio.run", side_effect=mock_asyncio_run_impl) as mock_asyncio_run: + + with patch( + "uart_mcp.server.asyncio.run", + side_effect=mock_asyncio_run_impl + ) as mock_asyncio_run: with patch("uart_mcp.server.get_terminal_manager") as mock_term_mgr: with patch("uart_mcp.server.get_serial_manager") as mock_serial_mgr: mock_term_instance = MagicMock() mock_serial_instance = MagicMock() mock_term_mgr.return_value = mock_term_instance mock_serial_mgr.return_value = mock_serial_instance - + main() - + mock_asyncio_run.assert_called_once() mock_term_instance.shutdown.assert_called_once() mock_serial_instance.shutdown.assert_called_once() - + def test_main_keyboard_interrupt(self): """测试:main 处理 KeyboardInterrupt""" from uart_mcp.server import main - + def mock_asyncio_run_impl(coro): # 关闭协程以避免警告 coro.close() raise KeyboardInterrupt - + with patch("uart_mcp.server.asyncio.run", side_effect=mock_asyncio_run_impl): with patch("uart_mcp.server.get_terminal_manager") as mock_term_mgr: with patch("uart_mcp.server.get_serial_manager") as mock_serial_mgr: @@ -375,10 +384,10 @@ def mock_asyncio_run_impl(coro): mock_serial_instance = MagicMock() mock_term_mgr.return_value = mock_term_instance mock_serial_mgr.return_value = mock_serial_instance - + # 不应该抛出异常 main() - + # 确保清理仍然执行 mock_term_instance.shutdown.assert_called_once() mock_serial_instance.shutdown.assert_called_once()