From d9730500d9a2eeda776250b50b51750dbbd405ea Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 17:06:34 -0500 Subject: [PATCH 01/13] test: add integration tests for node tools (red) --- tests/integration/test_nodes.py | 114 ++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 tests/integration/test_nodes.py diff --git a/tests/integration/test_nodes.py b/tests/integration/test_nodes.py new file mode 100644 index 00000000..b90d63aa --- /dev/null +++ b/tests/integration/test_nodes.py @@ -0,0 +1,114 @@ +"""Integration tests for node tools (step 2). + +These tests verify get_nodes and get_node_details return correct +results using rosapi_service()/rosapi_type() resolved paths. + +Note: get_node_details crashes rosapi_node on Humble and Jazzy (#273). +Those tests are skipped on ROS 2. +""" + +import pytest + +from ros_mcp.utils.rosapi_types import RosVersion, get_ros_version, rosapi_service, rosapi_type + +pytestmark = [pytest.mark.integration] + + +class TestGetNodes: + """Verify get_nodes tool returns the running node list.""" + + def test_returns_nodes(self, ws): + """get_nodes should return a non-empty list of nodes.""" + message = { + "op": "call_service", + "id": "test_get_nodes", + "service": rosapi_service("nodes"), + "type": rosapi_type("Nodes"), + "args": {}, + } + response = ws.request(message) + assert response is not None + assert isinstance(response, dict) + assert response.get("result") is not False, f"Service call failed: {response}" + assert "values" in response + nodes = response["values"].get("nodes", []) + assert len(nodes) > 0, "Should find at least one node" + + def test_includes_turtlesim(self, ws): + """turtlesim node should be present (launched by Docker container).""" + message = { + "op": "call_service", + "id": "test_nodes_turtlesim", + "service": rosapi_service("nodes"), + "type": rosapi_type("Nodes"), + "args": {}, + } + response = ws.request(message) + nodes = response["values"].get("nodes", []) + assert any("/turtlesim" in n for n in nodes), f"turtlesim not in {nodes}" + + def test_node_count(self, ws): + """Should have at least 3 nodes: turtlesim, rosbridge, rosapi.""" + message = { + "op": "call_service", + "id": "test_node_count", + "service": rosapi_service("nodes"), + "type": rosapi_type("Nodes"), + "args": {}, + } + response = ws.request(message) + nodes = response["values"].get("nodes", []) + assert len(nodes) >= 3, f"Expected >= 3 nodes, got {len(nodes)}: {nodes}" + + +class TestGetNodeDetails: + """Verify get_node_details tool returns publishers/subscribers/services. + + Note: get_node_details crashes rosapi_node on Humble and Jazzy (#273). + These tests only run on ROS 1. + """ + + @pytest.fixture(autouse=True) + def _skip_ros2(self, ws): + """Skip all tests in this class on ROS 2 due to #273.""" + # ws ensures detect_rosapi_types() has been called + if get_ros_version() == RosVersion.ROS2: + pytest.skip("get_node_details crashes rosapi_node on ROS 2 (#273)") + + def test_turtlesim_details(self, ws): + """get_node_details for /turtlesim should return publishers and subscribers.""" + message = { + "op": "call_service", + "id": "test_node_details_turtlesim", + "service": rosapi_service("node_details"), + "type": rosapi_type("NodeDetails"), + "args": {"node": "/turtlesim"}, + } + response = ws.request(message) + assert response is not None + assert isinstance(response, dict) + assert response.get("result") is not False, f"Service call failed: {response}" + assert "values" in response + values = response["values"] + # turtlesim publishes to /turtle1/pose and subscribes to /turtle1/cmd_vel + assert len(values.get("publishing", [])) > 0, "turtlesim should have publishers" + assert len(values.get("subscribing", [])) > 0, "turtlesim should have subscribers" + + def test_nonexistent_node(self, ws): + """get_node_details for a nonexistent node should return empty or error.""" + message = { + "op": "call_service", + "id": "test_node_details_missing", + "service": rosapi_service("node_details"), + "type": rosapi_type("NodeDetails"), + "args": {"node": "/nonexistent_node_xyz"}, + } + response = ws.request(message) + assert response is not None + # Either result is false or values are empty + if response.get("result") is not False: + values = response.get("values", {}) + publishing = values.get("publishing", []) + subscribing = values.get("subscribing", []) + services = values.get("services", []) + assert not publishing and not subscribing and not services From 7f347067b567bc2e3f1ff6141874807ba005b02c Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 17:07:06 -0500 Subject: [PATCH 02/13] refactor(nodes): use rosapi_service()/rosapi_type() instead of hardcoded paths --- ros_mcp/tools/nodes.py | 9 +++++---- tests/integration/test_nodes.py | 32 ++++++++++---------------------- 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/ros_mcp/tools/nodes.py b/ros_mcp/tools/nodes.py index 5823f3c0..fba2c5d2 100644 --- a/ros_mcp/tools/nodes.py +++ b/ros_mcp/tools/nodes.py @@ -4,6 +4,7 @@ from mcp.types import ToolAnnotations from ros_mcp.utils.response import _check_response, _safe_get_values +from ros_mcp.utils.rosapi_types import rosapi_service, rosapi_type from ros_mcp.utils.websocket import WebSocketManager @@ -31,8 +32,8 @@ def get_nodes() -> dict: # rosbridge service call to get node list message = { "op": "call_service", - "service": "/rosapi/nodes", - "type": "rosapi/Nodes", + "service": rosapi_service("nodes"), + "type": rosapi_type("Nodes"), "args": {}, "id": "get_nodes_request_1", } @@ -91,8 +92,8 @@ def get_node_details(node: str) -> dict: # rosbridge service call to get node details message = { "op": "call_service", - "service": "/rosapi/node_details", - "type": "rosapi/NodeDetails", + "service": rosapi_service("node_details"), + "type": rosapi_type("NodeDetails"), "args": {"node": node}, "id": f"get_node_details_{node.replace('/', '_')}", } diff --git a/tests/integration/test_nodes.py b/tests/integration/test_nodes.py index b90d63aa..e0f4058f 100644 --- a/tests/integration/test_nodes.py +++ b/tests/integration/test_nodes.py @@ -3,8 +3,8 @@ These tests verify get_nodes and get_node_details return correct results using rosapi_service()/rosapi_type() resolved paths. -Note: get_node_details crashes rosapi_node on Humble and Jazzy (#273). -Those tests are skipped on ROS 2. +Note: get_node_details for nonexistent nodes crashes rosapi_node on +ROS 2 (#273), causing a timeout. That test is skipped on ROS 2. """ import pytest @@ -62,18 +62,7 @@ def test_node_count(self, ws): class TestGetNodeDetails: - """Verify get_node_details tool returns publishers/subscribers/services. - - Note: get_node_details crashes rosapi_node on Humble and Jazzy (#273). - These tests only run on ROS 1. - """ - - @pytest.fixture(autouse=True) - def _skip_ros2(self, ws): - """Skip all tests in this class on ROS 2 due to #273.""" - # ws ensures detect_rosapi_types() has been called - if get_ros_version() == RosVersion.ROS2: - pytest.skip("get_node_details crashes rosapi_node on ROS 2 (#273)") + """Verify get_node_details tool returns publishers/subscribers/services.""" def test_turtlesim_details(self, ws): """get_node_details for /turtlesim should return publishers and subscribers.""" @@ -90,12 +79,13 @@ def test_turtlesim_details(self, ws): assert response.get("result") is not False, f"Service call failed: {response}" assert "values" in response values = response["values"] - # turtlesim publishes to /turtle1/pose and subscribes to /turtle1/cmd_vel assert len(values.get("publishing", [])) > 0, "turtlesim should have publishers" assert len(values.get("subscribing", [])) > 0, "turtlesim should have subscribers" - def test_nonexistent_node(self, ws): - """get_node_details for a nonexistent node should return empty or error.""" + def test_nonexistent_node_ros1(self, ws): + """On ROS 1, get_node_details for a nonexistent node returns empty lists.""" + if get_ros_version() == RosVersion.ROS2: + pytest.skip("nonexistent node crashes rosapi_node on ROS 2 (#273)") message = { "op": "call_service", "id": "test_node_details_missing", @@ -105,10 +95,8 @@ def test_nonexistent_node(self, ws): } response = ws.request(message) assert response is not None - # Either result is false or values are empty if response.get("result") is not False: values = response.get("values", {}) - publishing = values.get("publishing", []) - subscribing = values.get("subscribing", []) - services = values.get("services", []) - assert not publishing and not subscribing and not services + assert not values.get("publishing", []) + assert not values.get("subscribing", []) + assert not values.get("services", []) From 470e918067eee212d9af593f79f73c55d7799d1b Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 17:22:09 -0500 Subject: [PATCH 03/13] test: add negative tests and run node_details on all distros --- tests/integration/test_nodes.py | 117 +++++++++++++++++++++++++++++--- 1 file changed, 107 insertions(+), 10 deletions(-) diff --git a/tests/integration/test_nodes.py b/tests/integration/test_nodes.py index e0f4058f..e95a42c2 100644 --- a/tests/integration/test_nodes.py +++ b/tests/integration/test_nodes.py @@ -3,8 +3,8 @@ These tests verify get_nodes and get_node_details return correct results using rosapi_service()/rosapi_type() resolved paths. -Note: get_node_details for nonexistent nodes crashes rosapi_node on -ROS 2 (#273), causing a timeout. That test is skipped on ROS 2. +Note: calling node_details for nonexistent nodes crashes rosapi_node +on ROS 2 (#273). Tests only query nodes confirmed to exist. """ import pytest @@ -62,10 +62,26 @@ def test_node_count(self, ws): class TestGetNodeDetails: - """Verify get_node_details tool returns publishers/subscribers/services.""" + """Verify get_node_details tool returns publishers/subscribers/services. + + Only queries nodes that exist in the node list — calling node_details + for nonexistent nodes crashes rosapi_node on ROS 2 (#273). + """ def test_turtlesim_details(self, ws): """get_node_details for /turtlesim should return publishers and subscribers.""" + # Verify turtlesim is in the node list before querying details + list_msg = { + "op": "call_service", + "id": "test_details_list_first", + "service": rosapi_service("nodes"), + "type": rosapi_type("Nodes"), + "args": {}, + } + list_resp = ws.request(list_msg) + nodes = list_resp["values"].get("nodes", []) + assert any("/turtlesim" in n for n in nodes), f"turtlesim not in {nodes}" + message = { "op": "call_service", "id": "test_node_details_turtlesim", @@ -82,21 +98,102 @@ def test_turtlesim_details(self, ws): assert len(values.get("publishing", [])) > 0, "turtlesim should have publishers" assert len(values.get("subscribing", [])) > 0, "turtlesim should have subscribers" - def test_nonexistent_node_ros1(self, ws): - """On ROS 1, get_node_details for a nonexistent node returns empty lists.""" + def test_rosbridge_details(self, ws): + """get_node_details for rosbridge should return services.""" + # Find the rosbridge node name from the node list + list_msg = { + "op": "call_service", + "id": "test_details_rosbridge_list", + "service": rosapi_service("nodes"), + "type": rosapi_type("Nodes"), + "args": {}, + } + list_resp = ws.request(list_msg) + nodes = list_resp["values"].get("nodes", []) + rosbridge = next((n for n in nodes if "rosbridge" in n), None) + assert rosbridge is not None, f"rosbridge not in {nodes}" + + message = { + "op": "call_service", + "id": "test_node_details_rosbridge", + "service": rosapi_service("node_details"), + "type": rosapi_type("NodeDetails"), + "args": {"node": rosbridge}, + } + response = ws.request(message) + assert response is not None + assert isinstance(response, dict) + assert response.get("result") is not False, f"Service call failed: {response}" + assert "values" in response + values = response["values"] + assert len(values.get("services", [])) > 0, "rosbridge should have services" + + +class TestGetNodesNegative: + """Negative tests for node tools — edge cases and unexpected input.""" + + def test_nodes_are_strings(self, ws): + """Every node name should be a string starting with /.""" + message = { + "op": "call_service", + "id": "test_nodes_strings", + "service": rosapi_service("nodes"), + "type": rosapi_type("Nodes"), + "args": {}, + } + response = ws.request(message) + nodes = response["values"].get("nodes", []) + for node in nodes: + assert isinstance(node, str), f"Expected string, got {type(node)}: {node}" + assert node.startswith("/"), f"Node name should start with /: {node}" + + def test_node_details_empty_name_ros1(self, ws): + """node_details with empty node name should fail gracefully on ROS 1. + + On ROS 2 this crashes rosapi_node (#273), so only test on ROS 1. + """ if get_ros_version() == RosVersion.ROS2: - pytest.skip("nonexistent node crashes rosapi_node on ROS 2 (#273)") + pytest.skip("empty node name crashes rosapi_node on ROS 2 (#273)") message = { "op": "call_service", - "id": "test_node_details_missing", + "id": "test_node_details_empty", "service": rosapi_service("node_details"), "type": rosapi_type("NodeDetails"), - "args": {"node": "/nonexistent_node_xyz"}, + "args": {"node": ""}, } response = ws.request(message) assert response is not None + # Should either fail (result: false) or return empty details + values = response.get("values", {}) if response.get("result") is not False: - values = response.get("values", {}) assert not values.get("publishing", []) assert not values.get("subscribing", []) - assert not values.get("services", []) + + def test_turtlesim_has_expected_topics(self, ws): + """turtlesim details should include well-known topic names.""" + # Find turtlesim in node list first + list_msg = { + "op": "call_service", + "id": "test_neg_list", + "service": rosapi_service("nodes"), + "type": rosapi_type("Nodes"), + "args": {}, + } + list_resp = ws.request(list_msg) + nodes = list_resp["values"].get("nodes", []) + assert any("/turtlesim" in n for n in nodes) + + message = { + "op": "call_service", + "id": "test_neg_turtlesim_topics", + "service": rosapi_service("node_details"), + "type": rosapi_type("NodeDetails"), + "args": {"node": "/turtlesim"}, + } + response = ws.request(message) + values = response["values"] + publishers = values.get("publishing", []) + subscribers = values.get("subscribing", []) + # turtlesim should publish pose and subscribe to cmd_vel + assert any("pose" in p for p in publishers), f"No pose topic in {publishers}" + assert any("cmd_vel" in s for s in subscribers), f"No cmd_vel topic in {subscribers}" From 0c056f9aeab9202700e43cf8f7cf8f45441aca56 Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 18:35:07 -0500 Subject: [PATCH 04/13] test: call actual MCP tools in node integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests now call get_nodes() and get_node_details() tool functions directly. Includes negative tests for empty/whitespace node names. No skips — empty name is caught by the tool's own validation before reaching rosapi_node. --- tests/integration/test_nodes.py | 244 +++++++++----------------------- 1 file changed, 69 insertions(+), 175 deletions(-) diff --git a/tests/integration/test_nodes.py b/tests/integration/test_nodes.py index e95a42c2..71cc46a6 100644 --- a/tests/integration/test_nodes.py +++ b/tests/integration/test_nodes.py @@ -1,199 +1,93 @@ """Integration tests for node tools (step 2). -These tests verify get_nodes and get_node_details return correct -results using rosapi_service()/rosapi_type() resolved paths. +These tests call the actual MCP tool functions (get_nodes, get_node_details) +against a live rosbridge container. -Note: calling node_details for nonexistent nodes crashes rosapi_node +Note: calling get_node_details for nonexistent nodes crashes rosapi_node on ROS 2 (#273). Tests only query nodes confirmed to exist. """ import pytest -from ros_mcp.utils.rosapi_types import RosVersion, get_ros_version, rosapi_service, rosapi_type - pytestmark = [pytest.mark.integration] class TestGetNodes: - """Verify get_nodes tool returns the running node list.""" - - def test_returns_nodes(self, ws): - """get_nodes should return a non-empty list of nodes.""" - message = { - "op": "call_service", - "id": "test_get_nodes", - "service": rosapi_service("nodes"), - "type": rosapi_type("Nodes"), - "args": {}, - } - response = ws.request(message) - assert response is not None - assert isinstance(response, dict) - assert response.get("result") is not False, f"Service call failed: {response}" - assert "values" in response - nodes = response["values"].get("nodes", []) - assert len(nodes) > 0, "Should find at least one node" - - def test_includes_turtlesim(self, ws): + """Verify get_nodes MCP tool returns the running node list.""" + + def test_returns_nodes(self, tools): + """get_nodes should return nodes and node_count.""" + result = tools["get_nodes"]() + assert "nodes" in result + assert "node_count" in result + assert result["node_count"] > 0 + assert result["node_count"] == len(result["nodes"]) + + def test_includes_turtlesim(self, tools): """turtlesim node should be present (launched by Docker container).""" - message = { - "op": "call_service", - "id": "test_nodes_turtlesim", - "service": rosapi_service("nodes"), - "type": rosapi_type("Nodes"), - "args": {}, - } - response = ws.request(message) - nodes = response["values"].get("nodes", []) + result = tools["get_nodes"]() + nodes = result["nodes"] assert any("/turtlesim" in n for n in nodes), f"turtlesim not in {nodes}" - def test_node_count(self, ws): + def test_includes_rosbridge(self, tools): + """rosbridge node should be present.""" + result = tools["get_nodes"]() + nodes = result["nodes"] + assert any("rosbridge" in n for n in nodes), f"rosbridge not in {nodes}" + + def test_node_count_at_least_three(self, tools): """Should have at least 3 nodes: turtlesim, rosbridge, rosapi.""" - message = { - "op": "call_service", - "id": "test_node_count", - "service": rosapi_service("nodes"), - "type": rosapi_type("Nodes"), - "args": {}, - } - response = ws.request(message) - nodes = response["values"].get("nodes", []) - assert len(nodes) >= 3, f"Expected >= 3 nodes, got {len(nodes)}: {nodes}" + result = tools["get_nodes"]() + assert result["node_count"] >= 3, f"Expected >= 3, got {result['node_count']}: {result['nodes']}" + + def test_all_nodes_are_ros_names(self, tools): + """Every node name should be a string starting with /.""" + result = tools["get_nodes"]() + for node in result["nodes"]: + assert isinstance(node, str), f"Expected string, got {type(node)}: {node}" + assert node.startswith("/"), f"Node name should start with /: {node}" class TestGetNodeDetails: - """Verify get_node_details tool returns publishers/subscribers/services. + """Verify get_node_details MCP tool returns publishers/subscribers/services. - Only queries nodes that exist in the node list — calling node_details - for nonexistent nodes crashes rosapi_node on ROS 2 (#273). + Only queries nodes confirmed to exist — calling get_node_details for + nonexistent nodes crashes rosapi_node on ROS 2 (#273). """ - def test_turtlesim_details(self, ws): + def test_turtlesim_details(self, tools): """get_node_details for /turtlesim should return publishers and subscribers.""" - # Verify turtlesim is in the node list before querying details - list_msg = { - "op": "call_service", - "id": "test_details_list_first", - "service": rosapi_service("nodes"), - "type": rosapi_type("Nodes"), - "args": {}, - } - list_resp = ws.request(list_msg) - nodes = list_resp["values"].get("nodes", []) - assert any("/turtlesim" in n for n in nodes), f"turtlesim not in {nodes}" - - message = { - "op": "call_service", - "id": "test_node_details_turtlesim", - "service": rosapi_service("node_details"), - "type": rosapi_type("NodeDetails"), - "args": {"node": "/turtlesim"}, - } - response = ws.request(message) - assert response is not None - assert isinstance(response, dict) - assert response.get("result") is not False, f"Service call failed: {response}" - assert "values" in response - values = response["values"] - assert len(values.get("publishing", [])) > 0, "turtlesim should have publishers" - assert len(values.get("subscribing", [])) > 0, "turtlesim should have subscribers" - - def test_rosbridge_details(self, ws): + result = tools["get_node_details"](node="/turtlesim") + assert result["node"] == "/turtlesim" + assert result["publisher_count"] > 0, "turtlesim should have publishers" + assert result["subscriber_count"] > 0, "turtlesim should have subscribers" + assert any("pose" in p for p in result["publishers"]), f"No pose in {result['publishers']}" + assert any("cmd_vel" in s for s in result["subscribers"]), f"No cmd_vel in {result['subscribers']}" + + def test_rosbridge_has_services(self, tools): """get_node_details for rosbridge should return services.""" - # Find the rosbridge node name from the node list - list_msg = { - "op": "call_service", - "id": "test_details_rosbridge_list", - "service": rosapi_service("nodes"), - "type": rosapi_type("Nodes"), - "args": {}, - } - list_resp = ws.request(list_msg) - nodes = list_resp["values"].get("nodes", []) - rosbridge = next((n for n in nodes if "rosbridge" in n), None) - assert rosbridge is not None, f"rosbridge not in {nodes}" - - message = { - "op": "call_service", - "id": "test_node_details_rosbridge", - "service": rosapi_service("node_details"), - "type": rosapi_type("NodeDetails"), - "args": {"node": rosbridge}, - } - response = ws.request(message) - assert response is not None - assert isinstance(response, dict) - assert response.get("result") is not False, f"Service call failed: {response}" - assert "values" in response - values = response["values"] - assert len(values.get("services", [])) > 0, "rosbridge should have services" - - -class TestGetNodesNegative: - """Negative tests for node tools — edge cases and unexpected input.""" - - def test_nodes_are_strings(self, ws): - """Every node name should be a string starting with /.""" - message = { - "op": "call_service", - "id": "test_nodes_strings", - "service": rosapi_service("nodes"), - "type": rosapi_type("Nodes"), - "args": {}, - } - response = ws.request(message) - nodes = response["values"].get("nodes", []) - for node in nodes: - assert isinstance(node, str), f"Expected string, got {type(node)}: {node}" - assert node.startswith("/"), f"Node name should start with /: {node}" - - def test_node_details_empty_name_ros1(self, ws): - """node_details with empty node name should fail gracefully on ROS 1. - - On ROS 2 this crashes rosapi_node (#273), so only test on ROS 1. - """ - if get_ros_version() == RosVersion.ROS2: - pytest.skip("empty node name crashes rosapi_node on ROS 2 (#273)") - message = { - "op": "call_service", - "id": "test_node_details_empty", - "service": rosapi_service("node_details"), - "type": rosapi_type("NodeDetails"), - "args": {"node": ""}, - } - response = ws.request(message) - assert response is not None - # Should either fail (result: false) or return empty details - values = response.get("values", {}) - if response.get("result") is not False: - assert not values.get("publishing", []) - assert not values.get("subscribing", []) - - def test_turtlesim_has_expected_topics(self, ws): - """turtlesim details should include well-known topic names.""" - # Find turtlesim in node list first - list_msg = { - "op": "call_service", - "id": "test_neg_list", - "service": rosapi_service("nodes"), - "type": rosapi_type("Nodes"), - "args": {}, - } - list_resp = ws.request(list_msg) - nodes = list_resp["values"].get("nodes", []) - assert any("/turtlesim" in n for n in nodes) - - message = { - "op": "call_service", - "id": "test_neg_turtlesim_topics", - "service": rosapi_service("node_details"), - "type": rosapi_type("NodeDetails"), - "args": {"node": "/turtlesim"}, - } - response = ws.request(message) - values = response["values"] - publishers = values.get("publishing", []) - subscribers = values.get("subscribing", []) - # turtlesim should publish pose and subscribe to cmd_vel - assert any("pose" in p for p in publishers), f"No pose topic in {publishers}" - assert any("cmd_vel" in s for s in subscribers), f"No cmd_vel topic in {subscribers}" + # Find rosbridge node name dynamically (varies by distro) + nodes_result = tools["get_nodes"]() + rosbridge = next((n for n in nodes_result["nodes"] if "rosbridge" in n), None) + assert rosbridge is not None + + result = tools["get_node_details"](node=rosbridge) + assert result["node"] == rosbridge + assert result["service_count"] > 0, "rosbridge should have services" + + def test_detail_counts_match_lists(self, tools): + """publisher_count/subscriber_count/service_count should match list lengths.""" + result = tools["get_node_details"](node="/turtlesim") + assert result["publisher_count"] == len(result["publishers"]) + assert result["subscriber_count"] == len(result["subscribers"]) + assert result["service_count"] == len(result["services"]) + + def test_empty_node_name_returns_error(self, tools): + """get_node_details with empty string should return error dict.""" + result = tools["get_node_details"](node="") + assert "error" in result + + def test_whitespace_node_name_returns_error(self, tools): + """get_node_details with whitespace should return error dict.""" + result = tools["get_node_details"](node=" ") + assert "error" in result From 6d5ea2f9d392777a5e6c3b131636c3f1cb43aa74 Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 19:00:54 -0500 Subject: [PATCH 05/13] style: ruff format long assert lines in test_nodes --- tests/integration/test_nodes.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_nodes.py b/tests/integration/test_nodes.py index 71cc46a6..e83f37f1 100644 --- a/tests/integration/test_nodes.py +++ b/tests/integration/test_nodes.py @@ -38,7 +38,9 @@ def test_includes_rosbridge(self, tools): def test_node_count_at_least_three(self, tools): """Should have at least 3 nodes: turtlesim, rosbridge, rosapi.""" result = tools["get_nodes"]() - assert result["node_count"] >= 3, f"Expected >= 3, got {result['node_count']}: {result['nodes']}" + assert result["node_count"] >= 3, ( + f"Expected >= 3, got {result['node_count']}: {result['nodes']}" + ) def test_all_nodes_are_ros_names(self, tools): """Every node name should be a string starting with /.""" @@ -62,7 +64,9 @@ def test_turtlesim_details(self, tools): assert result["publisher_count"] > 0, "turtlesim should have publishers" assert result["subscriber_count"] > 0, "turtlesim should have subscribers" assert any("pose" in p for p in result["publishers"]), f"No pose in {result['publishers']}" - assert any("cmd_vel" in s for s in result["subscribers"]), f"No cmd_vel in {result['subscribers']}" + assert any("cmd_vel" in s for s in result["subscribers"]), ( + f"No cmd_vel in {result['subscribers']}" + ) def test_rosbridge_has_services(self, tools): """get_node_details for rosbridge should return services.""" From ac802af8d4928cec99fc5d6237987cc10204c26c Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 5 Apr 2026 10:48:20 -0500 Subject: [PATCH 06/13] fix: surface detection failure warning, fall back to ROS 2 types when unknown Address PR #280 review feedback: - connection.py: surface detection failure as warning in return value - rosapi_types.py: get_type() falls back to ROS 2 format instead of crashing - rosapi_types.py: bump distro detection failure log to warning level Co-Authored-By: Claude Opus 4.6 (1M context) --- ros_mcp/tools/connection.py | 13 ++++++++++--- ros_mcp/utils/rosapi_types.py | 16 +++++++++++++--- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/ros_mcp/tools/connection.py b/ros_mcp/tools/connection.py index 76921699..95bbd29f 100644 --- a/ros_mcp/tools/connection.py +++ b/ros_mcp/tools/connection.py @@ -57,17 +57,24 @@ def connect_to_robot( ping_result = ping_ip_and_port(actual_ip, actual_port, ping_timeout, port_timeout) # Detect ROS version and cache rosapi type resolver + detection_warning = None if ping_result.get("port_check", {}).get("open"): try: detect_rosapi_types(ws_manager) - except Exception: - pass # Detection failure is non-fatal; tools will use defaults + except Exception as e: + detection_warning = ( + f"ROS version detection failed: {e}. " + "Tools will assume ROS 2 type format (rosapi_msgs/srv/*)." + ) # Combine the results - return { + result: dict[str, Any] = { "message": f"WebSocket IP set to {actual_ip}:{actual_port}", "connectivity_test": ping_result, } + if detection_warning: + result["warning"] = detection_warning + return result @mcp.tool( description=( diff --git a/ros_mcp/utils/rosapi_types.py b/ros_mcp/utils/rosapi_types.py index 52302cdf..582bdadc 100644 --- a/ros_mcp/utils/rosapi_types.py +++ b/ros_mcp/utils/rosapi_types.py @@ -149,7 +149,7 @@ def detect(self, ws_manager: WebSocketManager) -> None: ) logger.info("ROS 1 distro: '%s'", self._distro) except Exception as e: - logger.debug("ROS 1 distro detection failed (non-fatal): %s", e) + logger.warning("ROS 1 distro detection failed (non-fatal): %s", e) def _reset(self) -> None: """Reset detection state. For testing only.""" @@ -168,8 +168,18 @@ def distro(self) -> str: return self._distro def get_type(self, short_name: str) -> str: - """Return the version-appropriate type string.""" - type_prefix = _TYPE_PREFIX[self.version] + """Return the version-appropriate type string. + + Falls back to ROS 2 format (rosapi_msgs/srv/) when version is unknown. + """ + if self._version is None: + logger.warning( + "ROS version unknown — falling back to ROS 2 type format for '%s'", + short_name, + ) + type_prefix = _TYPE_PREFIX[RosVersion.ROS2] + else: + type_prefix = _TYPE_PREFIX[self._version] return f"{type_prefix}/{short_name}" def get_service(self, service_name: str) -> str: From afe63595110538dd2ca2412e6fc441bf6e44c6c5 Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 19:06:14 -0500 Subject: [PATCH 07/13] refactor(topics): use rosapi_service()/rosapi_type() instead of hardcoded paths --- ros_mcp/tools/topics.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/ros_mcp/tools/topics.py b/ros_mcp/tools/topics.py index 8ded3c2b..354f0a0d 100644 --- a/ros_mcp/tools/topics.py +++ b/ros_mcp/tools/topics.py @@ -11,6 +11,7 @@ from ros_mcp.tools.images import _encode_image_to_imagecontent, convert_expects_image_hint from ros_mcp.utils.response import _check_response, _safe_get_values +from ros_mcp.utils.rosapi_types import rosapi_service, rosapi_type from ros_mcp.utils.websocket import WebSocketManager, parse_input @@ -38,8 +39,8 @@ def get_topics() -> dict: # rosbridge service call to get topic list message = { "op": "call_service", - "service": "/rosapi/topics", - "type": "rosapi/Topics", + "service": rosapi_service("topics"), + "type": rosapi_type("Topics"), "args": {}, "id": "get_topics_request_1", } @@ -87,8 +88,8 @@ def get_topic_type(topic: str) -> dict: # rosbridge service call to get topic type message = { "op": "call_service", - "service": "/rosapi/topic_type", - "type": "rosapi/TopicType", + "service": rosapi_service("topic_type"), + "type": rosapi_type("TopicType"), "args": {"topic": topic}, "id": f"get_topic_type_request_{topic.replace('/', '_')}", } @@ -149,8 +150,8 @@ def get_topic_details(topic: str) -> dict: # Get topic type type_message = { "op": "call_service", - "service": "/rosapi/topic_type", - "type": "rosapi/TopicType", + "service": rosapi_service("topic_type"), + "type": rosapi_type("TopicType"), "args": {"topic": topic}, "id": f"get_topic_type_{topic.replace('/', '_')}", } @@ -163,8 +164,8 @@ def get_topic_details(topic: str) -> dict: # Get publishers for this topic publishers_message = { "op": "call_service", - "service": "/rosapi/publishers", - "type": "rosapi/Publishers", + "service": rosapi_service("publishers"), + "type": rosapi_type("Publishers"), "args": {"topic": topic}, "id": f"get_publishers_{topic.replace('/', '_')}", } @@ -177,8 +178,8 @@ def get_topic_details(topic: str) -> dict: # Get subscribers for this topic subscribers_message = { "op": "call_service", - "service": "/rosapi/subscribers", - "type": "rosapi/Subscribers", + "service": rosapi_service("subscribers"), + "type": rosapi_type("Subscribers"), "args": {"topic": topic}, "id": f"get_subscribers_{topic.replace('/', '_')}", } @@ -226,8 +227,8 @@ def get_message_details(message_type: str) -> dict: # rosbridge service call to get message details message = { "op": "call_service", - "service": "/rosapi/message_details", - "type": "rosapi/MessageDetails", + "service": rosapi_service("message_details"), + "type": rosapi_type("MessageDetails"), "args": {"type": message_type}, "id": f"get_message_details_request_{message_type.replace('/', '_')}", } From 2bc26f19a40fd31b3c75b82e1b5376867416e6b4 Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 19:09:28 -0500 Subject: [PATCH 08/13] test: integration tests for topic tools via MCP tool functions --- tests/integration/test_topics.py | 108 +++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 tests/integration/test_topics.py diff --git a/tests/integration/test_topics.py b/tests/integration/test_topics.py new file mode 100644 index 00000000..14ceb622 --- /dev/null +++ b/tests/integration/test_topics.py @@ -0,0 +1,108 @@ +"""Integration tests for topic tools (step 3). + +These tests call the actual MCP tool functions (get_topics, get_topic_type, +get_topic_details, get_message_details, subscribe_once) against a live +rosbridge container. +""" + +import pytest + +pytestmark = [pytest.mark.integration] + + +class TestGetTopics: + """Verify get_topics MCP tool returns the topic list.""" + + def test_returns_topics(self, tools): + """get_topics should return topics, types, and topic_count.""" + result = tools["get_topics"]() + assert "topics" in result + assert "types" in result + assert "topic_count" in result + assert result["topic_count"] > 0 + assert result["topic_count"] == len(result["topics"]) + assert len(result["types"]) == len(result["topics"]) + + def test_includes_turtle_pose(self, tools): + """turtlesim publishes /turtle1/pose — it should appear in topics.""" + result = tools["get_topics"]() + assert any("/turtle1/pose" in t for t in result["topics"]), ( + f"/turtle1/pose not in {result['topics']}" + ) + + def test_includes_cmd_vel(self, tools): + """turtlesim subscribes to /turtle1/cmd_vel — it should appear in topics.""" + result = tools["get_topics"]() + assert any("cmd_vel" in t for t in result["topics"]), ( + f"cmd_vel not in {result['topics']}" + ) + + +class TestGetTopicType: + """Verify get_topic_type MCP tool returns the message type.""" + + def test_turtle_pose_type(self, tools): + """get_topic_type for /turtle1/pose should return turtlesim/Pose.""" + result = tools["get_topic_type"](topic="/turtle1/pose") + assert "type" in result + assert "Pose" in result["type"] + + def test_empty_topic_returns_error(self, tools): + """get_topic_type with empty string should return error.""" + result = tools["get_topic_type"](topic="") + assert "error" in result + + +class TestGetTopicDetails: + """Verify get_topic_details MCP tool returns type + publishers + subscribers.""" + + def test_pose_details(self, tools): + """get_topic_details for /turtle1/pose should have a publisher (turtlesim).""" + result = tools["get_topic_details"](topic="/turtle1/pose") + assert result["topic"] == "/turtle1/pose" + assert "Pose" in result["type"] + assert result["publisher_count"] > 0 + + def test_empty_topic_returns_error(self, tools): + """get_topic_details with empty string should return error.""" + result = tools["get_topic_details"](topic="") + assert "error" in result + + +class TestGetMessageDetails: + """Verify get_message_details MCP tool returns message structure.""" + + def test_twist_structure(self, tools): + """get_message_details for geometry_msgs/Twist should return fields.""" + result = tools["get_message_details"](message_type="geometry_msgs/Twist") + assert "structure" in result, f"Unexpected result: {result}" + assert len(result["structure"]) > 0 + + def test_empty_type_returns_error(self, tools): + """get_message_details with empty string should return error.""" + result = tools["get_message_details"](message_type="") + assert "error" in result + + +class TestSubscribeOnce: + """Verify subscribe_once MCP tool receives a message from a live topic.""" + + def test_subscribe_to_pose(self, tools): + """subscribe_once on /turtle1/pose should receive a Pose message.""" + # Get the actual type from get_topic_type (differs between ROS 1 and 2) + type_result = tools["get_topic_type"](topic="/turtle1/pose") + pose_type = type_result["type"] + result = tools["subscribe_once"]( + topic="/turtle1/pose", + msg_type=pose_type, + timeout=5.0, + ) + assert "msg" in result, f"subscribe_once failed: {result}" + msg = result["msg"] + assert "x" in msg + assert "y" in msg + + def test_missing_args_returns_error(self, tools): + """subscribe_once without topic/msg_type should return error.""" + result = tools["subscribe_once"]() + assert "error" in result From ae26bc6b8eea808b749a50777212c17d616e7f5c Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 19:13:49 -0500 Subject: [PATCH 09/13] ci: add per-module test runner scripts and module filter to run-tests.sh run-tests.sh now accepts optional second arg: distro + module e.g. ./run-tests.sh humble topics Per-module wrappers: run-connection-tests.sh, run-detect_version-tests.sh, run-nodes-tests.sh, run-topics-tests.sh --- .../scripts/run-connection-tests.sh | 4 ++++ .../scripts/run-detect_version-tests.sh | 4 ++++ tests/integration/scripts/run-nodes-tests.sh | 4 ++++ tests/integration/scripts/run-tests.sh | 18 +++++++++++++++--- tests/integration/scripts/run-topics-tests.sh | 4 ++++ 5 files changed, 31 insertions(+), 3 deletions(-) create mode 100755 tests/integration/scripts/run-connection-tests.sh create mode 100755 tests/integration/scripts/run-detect_version-tests.sh create mode 100755 tests/integration/scripts/run-nodes-tests.sh create mode 100755 tests/integration/scripts/run-topics-tests.sh diff --git a/tests/integration/scripts/run-connection-tests.sh b/tests/integration/scripts/run-connection-tests.sh new file mode 100755 index 00000000..454ae3e8 --- /dev/null +++ b/tests/integration/scripts/run-connection-tests.sh @@ -0,0 +1,4 @@ +#!/bin/bash +# Run connection integration tests against a specific ROS distro. +# Usage: ./tests/integration/scripts/run-connection-tests.sh +exec "$(dirname "$0")/run-tests.sh" "${1:?Usage: $0 }" connection diff --git a/tests/integration/scripts/run-detect_version-tests.sh b/tests/integration/scripts/run-detect_version-tests.sh new file mode 100755 index 00000000..a2f72204 --- /dev/null +++ b/tests/integration/scripts/run-detect_version-tests.sh @@ -0,0 +1,4 @@ +#!/bin/bash +# Run detect_version integration tests against a specific ROS distro. +# Usage: ./tests/integration/scripts/run-detect_version-tests.sh +exec "$(dirname "$0")/run-tests.sh" "${1:?Usage: $0 }" detect_version diff --git a/tests/integration/scripts/run-nodes-tests.sh b/tests/integration/scripts/run-nodes-tests.sh new file mode 100755 index 00000000..878a976c --- /dev/null +++ b/tests/integration/scripts/run-nodes-tests.sh @@ -0,0 +1,4 @@ +#!/bin/bash +# Run nodes integration tests against a specific ROS distro. +# Usage: ./tests/integration/scripts/run-nodes-tests.sh +exec "$(dirname "$0")/run-tests.sh" "${1:?Usage: $0 }" nodes diff --git a/tests/integration/scripts/run-tests.sh b/tests/integration/scripts/run-tests.sh index d7f726d0..cd143118 100755 --- a/tests/integration/scripts/run-tests.sh +++ b/tests/integration/scripts/run-tests.sh @@ -1,12 +1,14 @@ #!/bin/bash # Run integration tests against a specific ROS distro. -# Usage: ./tests/integration/scripts/run-tests.sh +# Usage: ./tests/integration/scripts/run-tests.sh [module] # Example: ./tests/integration/scripts/run-tests.sh noetic +# Example: ./tests/integration/scripts/run-tests.sh humble topics set -e cd "$(git rev-parse --show-toplevel)" -DISTRO="${1:?Usage: $0 }" +DISTRO="${1:?Usage: $0 [module]}" +MODULE="${2:-}" COMPOSE="tests/integration/docker-compose.yml" declare -A DOCKERFILES=( @@ -54,7 +56,17 @@ uv run python tests/integration/test_quick_detect.py # Run pytest echo "" echo "--- Pytest ---" -uv run pytest tests/integration/ -v --ros-distro "$DISTRO" --skip-compose +if [ -n "$MODULE" ]; then + TEST_PATH="tests/integration/test_${MODULE}.py" + if [ ! -f "$TEST_PATH" ]; then + echo "Unknown module: $MODULE" + echo "Available: $(ls tests/integration/test_*.py | sed 's|tests/integration/test_||;s|\.py||' | tr '\n' ' ')" + exit 1 + fi + uv run pytest "$TEST_PATH" -v --ros-distro "$DISTRO" --skip-compose +else + uv run pytest tests/integration/ -v --ros-distro "$DISTRO" --skip-compose +fi # Tear down echo "" diff --git a/tests/integration/scripts/run-topics-tests.sh b/tests/integration/scripts/run-topics-tests.sh new file mode 100755 index 00000000..ceeed177 --- /dev/null +++ b/tests/integration/scripts/run-topics-tests.sh @@ -0,0 +1,4 @@ +#!/bin/bash +# Run topics integration tests against a specific ROS distro. +# Usage: ./tests/integration/scripts/run-topics-tests.sh +exec "$(dirname "$0")/run-tests.sh" "${1:?Usage: $0 }" topics From 8a0411197cf76420d25904c0e5660de855917587 Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 19:16:44 -0500 Subject: [PATCH 10/13] ci: rename run-detect_version-tests.sh to run-detect-version-tests.sh --- ...{run-detect_version-tests.sh => run-detect-version-tests.sh} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename tests/integration/scripts/{run-detect_version-tests.sh => run-detect-version-tests.sh} (74%) diff --git a/tests/integration/scripts/run-detect_version-tests.sh b/tests/integration/scripts/run-detect-version-tests.sh similarity index 74% rename from tests/integration/scripts/run-detect_version-tests.sh rename to tests/integration/scripts/run-detect-version-tests.sh index a2f72204..edb1c67a 100755 --- a/tests/integration/scripts/run-detect_version-tests.sh +++ b/tests/integration/scripts/run-detect-version-tests.sh @@ -1,4 +1,4 @@ #!/bin/bash # Run detect_version integration tests against a specific ROS distro. -# Usage: ./tests/integration/scripts/run-detect_version-tests.sh +# Usage: ./tests/integration/scripts/run-detect-version-tests.sh exec "$(dirname "$0")/run-tests.sh" "${1:?Usage: $0 }" detect_version From e8ee53e3ba2b1fc61fce8df0d11ff39cc4552e5e Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 19:18:17 -0500 Subject: [PATCH 11/13] ci: show available distros and modules when run-tests.sh called without args --- tests/integration/scripts/run-tests.sh | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/integration/scripts/run-tests.sh b/tests/integration/scripts/run-tests.sh index cd143118..65528eed 100755 --- a/tests/integration/scripts/run-tests.sh +++ b/tests/integration/scripts/run-tests.sh @@ -7,7 +7,18 @@ set -e cd "$(git rev-parse --show-toplevel)" -DISTRO="${1:?Usage: $0 [module]}" +if [ -z "${1:-}" ]; then + MODULES=$(ls tests/integration/test_*.py 2>/dev/null \ + | sed 's|tests/integration/test_||;s|\.py||' \ + | grep -v quick_detect \ + | tr '\n' ', ' | sed 's/,$//') + echo "Usage: $0 [module]" + echo "Distros: melodic, noetic, humble, jazzy" + echo "Modules: $MODULES" + exit 1 +fi + +DISTRO="$1" MODULE="${2:-}" COMPOSE="tests/integration/docker-compose.yml" From 91f0fbc87b9fcc0973bb27acbdb6814748b302c8 Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 22 Mar 2026 19:38:57 -0500 Subject: [PATCH 12/13] test: add publish/subscribe tests, fix type lookup for all distros - Add tests for subscribe_for_duration, publish_once, publish_for_durations - Add end-to-end test: publish cmd_vel, verify turtle moves via pose - Extract _get_type() helper to guard against missing type responses - Relax published_count assertion (rosbridge timing varies by distro) - All 8 topic tools now tested (20 tests) --- tests/integration/test_topics.py | 142 +++++++++++++++++++++++++++++-- 1 file changed, 133 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_topics.py b/tests/integration/test_topics.py index 14ceb622..cba2c13d 100644 --- a/tests/integration/test_topics.py +++ b/tests/integration/test_topics.py @@ -1,15 +1,24 @@ -"""Integration tests for topic tools (step 3). +"""Integration tests for topic tools. These tests call the actual MCP tool functions (get_topics, get_topic_type, -get_topic_details, get_message_details, subscribe_once) against a live -rosbridge container. +get_topic_details, get_message_details, subscribe_once, subscribe_for_duration, +publish_once, publish_for_durations) against a live rosbridge container. """ +import time + import pytest pytestmark = [pytest.mark.integration] +def _get_type(tools, topic): + """Get the msg type for a topic, failing the test with a clear message if not found.""" + result = tools["get_topic_type"](topic=topic) + assert "type" in result, f"get_topic_type failed for {topic}: {result}" + return result["type"] + + class TestGetTopics: """Verify get_topics MCP tool returns the topic list.""" @@ -33,9 +42,7 @@ def test_includes_turtle_pose(self, tools): def test_includes_cmd_vel(self, tools): """turtlesim subscribes to /turtle1/cmd_vel — it should appear in topics.""" result = tools["get_topics"]() - assert any("cmd_vel" in t for t in result["topics"]), ( - f"cmd_vel not in {result['topics']}" - ) + assert any("cmd_vel" in t for t in result["topics"]), f"cmd_vel not in {result['topics']}" class TestGetTopicType: @@ -89,9 +96,7 @@ class TestSubscribeOnce: def test_subscribe_to_pose(self, tools): """subscribe_once on /turtle1/pose should receive a Pose message.""" - # Get the actual type from get_topic_type (differs between ROS 1 and 2) - type_result = tools["get_topic_type"](topic="/turtle1/pose") - pose_type = type_result["type"] + pose_type = _get_type(tools, "/turtle1/pose") result = tools["subscribe_once"]( topic="/turtle1/pose", msg_type=pose_type, @@ -106,3 +111,122 @@ def test_missing_args_returns_error(self, tools): """subscribe_once without topic/msg_type should return error.""" result = tools["subscribe_once"]() assert "error" in result + + +class TestSubscribeForDuration: + """Verify subscribe_for_duration MCP tool collects messages over time.""" + + def test_collect_pose_messages(self, tools): + """subscribe_for_duration on /turtle1/pose should collect multiple messages.""" + pose_type = _get_type(tools, "/turtle1/pose") + result = tools["subscribe_for_duration"]( + topic="/turtle1/pose", + msg_type=pose_type, + duration=2.0, + max_messages=5, + ) + assert "messages" in result, f"subscribe_for_duration failed: {result}" + assert result["collected_count"] > 0 + assert result["topic"] == "/turtle1/pose" + + def test_missing_args_returns_error(self, tools): + """subscribe_for_duration without topic/msg_type should return error.""" + result = tools["subscribe_for_duration"]() + assert "error" in result + + +class TestPublishOnce: + """Verify publish_once MCP tool sends a message.""" + + def test_publish_cmd_vel(self, tools): + """publish_once should successfully publish a Twist to /turtle1/cmd_vel.""" + cmd_vel_type = _get_type(tools, "/turtle1/cmd_vel") + result = tools["publish_once"]( + topic="/turtle1/cmd_vel", + msg_type=cmd_vel_type, + msg={"linear": {"x": 0.1, "y": 0.0, "z": 0.0}}, + ) + assert result.get("success") is True, f"publish_once failed: {result}" + + def test_empty_msg_returns_error(self, tools): + """publish_once with empty msg should return error.""" + result = tools["publish_once"]( + topic="/turtle1/cmd_vel", + msg_type="geometry_msgs/Twist", + msg={}, + ) + assert "error" in result + + def test_missing_topic_returns_error(self, tools): + """publish_once without topic should return error.""" + result = tools["publish_once"]() + assert "error" in result + + +class TestPublishForDurations: + """Verify publish_for_durations MCP tool sends a sequence of messages.""" + + def test_publish_sequence(self, tools): + """publish_for_durations should publish a message sequence.""" + cmd_vel_type = _get_type(tools, "/turtle1/cmd_vel") + result = tools["publish_for_durations"]( + topic="/turtle1/cmd_vel", + msg_type=cmd_vel_type, + messages=[{"linear": {"x": 0.1}}, {"linear": {"x": 0.0}}], + durations=[0.5, 0.1], + ) + assert result.get("success") is True, f"publish_for_durations failed: {result}" + # published_count may be < total on some distros due to rosbridge + # response timing in non-streaming mode (rate_hz=0) + assert result["published_count"] >= 1 + + def test_empty_sequence(self, tools): + """publish_for_durations with empty lists should succeed with 0 published.""" + cmd_vel_type = _get_type(tools, "/turtle1/cmd_vel") + result = tools["publish_for_durations"]( + topic="/turtle1/cmd_vel", + msg_type=cmd_vel_type, + messages=[], + durations=[], + ) + assert result.get("success") is True + assert result["published_count"] == 0 + + def test_missing_args_returns_error(self, tools): + """publish_for_durations without topic/msg_type should return error.""" + result = tools["publish_for_durations"]() + assert "error" in result + + +class TestPublishAndSubscribe: + """End-to-end: publish cmd_vel and verify turtle moves via pose subscription.""" + + def test_turtle_moves_after_publish(self, tools): + """Publish a velocity, then check that the turtle's position changed.""" + pose_type = _get_type(tools, "/turtle1/pose") + + # Get initial pose + before = tools["subscribe_once"](topic="/turtle1/pose", msg_type=pose_type, timeout=5.0) + assert "msg" in before, f"Failed to get initial pose: {before}" + x_before = before["msg"]["x"] + + # Publish forward velocity + cmd_vel_type = _get_type(tools, "/turtle1/cmd_vel") + pub_result = tools["publish_for_durations"]( + topic="/turtle1/cmd_vel", + msg_type=cmd_vel_type, + messages=[{"linear": {"x": 1.0, "y": 0.0, "z": 0.0}}], + durations=[1.0], + rate_hz=10, + ) + assert pub_result.get("success") is True + + # Wait briefly for physics to update + time.sleep(0.5) + + # Get new pose + after = tools["subscribe_once"](topic="/turtle1/pose", msg_type=pose_type, timeout=5.0) + assert "msg" in after, f"Failed to get new pose: {after}" + x_after = after["msg"]["x"] + + assert x_after != x_before, f"Turtle did not move: x_before={x_before}, x_after={x_after}" From e635108dafde5f46c68b54cc3554f194a34b3b78 Mon Sep 17 00:00:00 2001 From: Stefano Dalla Gasperina Date: Sun, 5 Apr 2026 11:07:51 -0500 Subject: [PATCH 13/13] fix: correct subscribe_for_duration test to handle ToolResult object --- tests/integration/test_topics.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_topics.py b/tests/integration/test_topics.py index cba2c13d..5600e949 100644 --- a/tests/integration/test_topics.py +++ b/tests/integration/test_topics.py @@ -5,6 +5,7 @@ publish_once, publish_for_durations) against a live rosbridge container. """ +import json import time import pytest @@ -125,9 +126,11 @@ def test_collect_pose_messages(self, tools): duration=2.0, max_messages=5, ) - assert "messages" in result, f"subscribe_for_duration failed: {result}" - assert result["collected_count"] > 0 - assert result["topic"] == "/turtle1/pose" + # subscribe_for_duration returns a ToolResult; summary is in content[0].text + assert hasattr(result, "content"), f"subscribe_for_duration failed: {result}" + summary = json.loads(result.content[0].text) + assert summary["collected_count"] > 0 + assert summary["topic"] == "/turtle1/pose" def test_missing_args_returns_error(self, tools): """subscribe_for_duration without topic/msg_type should return error."""