diff --git a/ros_mcp/tools/actions.py b/ros_mcp/tools/actions.py
index be103263..81ab182c 100644
--- a/ros_mcp/tools/actions.py
+++ b/ros_mcp/tools/actions.py
@@ -1,6 +1,5 @@
"""Action tools for ROS MCP."""
-import asyncio
import json
import time
import uuid
@@ -8,9 +7,70 @@
from fastmcp import Context, FastMCP
from mcp.types import ToolAnnotations
-from ros_mcp.utils.response import _safe_get_values
+from ros_mcp.utils.response import _check_response, _safe_get_values
+from ros_mcp.utils.rosapi_types import rosapi_service, rosapi_type
from ros_mcp.utils.websocket import WebSocketManager
+# Action detail parts: result key → (rosapi service name, rosapi type name)
+_ACTION_PARTS = {
+ "goal": ("action_goal_details", "ActionGoalDetails"),
+ "result": ("action_result_details", "ActionResultDetails"),
+ "feedback": ("action_feedback_details", "ActionFeedbackDetails"),
+}
+
+
+def _parse_typedef(typedef: dict) -> dict:
+ """Parse a rosapi typedef into a structured field description."""
+ field_names = typedef.get("fieldnames", [])
+ field_types = typedef.get("fieldtypes", [])
+ field_array_len = typedef.get("fieldarraylen", [])
+ examples = typedef.get("examples", [])
+ const_names = typedef.get("constnames", [])
+ const_values = typedef.get("constvalues", [])
+
+ fields = {}
+ field_details = {}
+ for i, (name, ftype) in enumerate(zip(field_names, field_types)):
+ fields[name] = ftype
+ field_details[name] = {
+ "type": ftype,
+ "array_length": field_array_len[i] if i < len(field_array_len) else -1,
+ "example": examples[i] if i < len(examples) else None,
+ }
+
+ return {
+ "fields": fields,
+ "field_count": len(fields),
+ "field_details": field_details,
+ "message_type": typedef.get("type", ""),
+ "examples": examples,
+ "constants": dict(zip(const_names, const_values)) if const_names else {},
+ }
+
+
+def _fetch_action_part(ws_manager: WebSocketManager, action_type: str, part: str) -> dict:
+ """Fetch one part (goal/result/feedback) of an action definition."""
+ service, type_name = _ACTION_PARTS[part]
+ message = {
+ "op": "call_service",
+ "service": rosapi_service(service),
+ "type": rosapi_type(type_name),
+ "args": {"type": action_type},
+ "id": f"get_action_{part}_{action_type.replace('/', '_')}",
+ }
+ response = ws_manager.request(message)
+
+ error = _check_response(response)
+ if error:
+ return {}
+
+ values = _safe_get_values(response)
+ if values is not None:
+ typedefs = values.get("typedefs", [])
+ if typedefs:
+ return _parse_typedef(typedefs[0])
+ return {}
+
def register_action_tools(
mcp: FastMCP,
@@ -19,9 +79,7 @@ def register_action_tools(
"""Register all action-related tools."""
@mcp.tool(
- description=(
- "Get list of all available ROS actions. Works only with ROS 2.\nExample:\nget_actions()"
- ),
+ description=("Get list of all available ROS actions.\nExample:\nget_actions()"),
annotations=ToolAnnotations(
title="Get Actions",
readOnlyHint=True,
@@ -29,88 +87,36 @@ def register_action_tools(
)
def get_actions() -> dict:
"""
- Get list of all available ROS actions. Works only with ROS 2.
+ Get list of all available ROS actions.
Returns:
dict: Contains list of all active actions,
or a message string if no actions are found.
"""
- # Check if required service is available
- required_services = ["/rosapi/action_servers"]
-
- with ws_manager:
- # Get available services to check compatibility
- services_message = {
- "op": "call_service",
- "service": "/rosapi/services",
- "type": "rosapi/Services",
- "args": {},
- "id": "check_services_for_get_actions",
- }
-
- services_response = ws_manager.request(services_message)
- if not services_response or not isinstance(services_response, dict):
- return {
- "warning": "Cannot check service availability",
- "compatibility": {
- "issue": "Cannot determine available services",
- "required_services": required_services,
- "suggestion": "Ensure rosbridge is running and rosapi is available",
- },
- }
-
- svc_values = _safe_get_values(services_response)
- available_services = svc_values.get("services", []) if svc_values else []
- missing_services = [svc for svc in required_services if svc not in available_services]
-
- if missing_services:
- return {
- "warning": "Action listing not supported by this rosbridge/rosapi version",
- "compatibility": {
- "issue": "Required action services are not available",
- "missing_services": missing_services,
- "required_services": required_services,
- "available_services": [s for s in available_services if "action" in s],
- "suggestion": "This rosbridge version doesn't support action listing services",
- },
- }
-
- # rosbridge service call to get action list
message = {
"op": "call_service",
- "service": "/rosapi/action_servers",
- "type": "rosapi/ActionServers",
+ "service": rosapi_service("action_servers"),
+ "type": rosapi_type("ActionServers"),
"args": {},
"id": "get_actions_request_1",
}
- # Request action list from rosbridge
with ws_manager:
response = ws_manager.request(message)
- # Handle error responses from ws_manager
- if response and "error" in response:
- return {"error": f"WebSocket error: {response['error']}"}
+ error = _check_response(response)
+ if error:
+ return error
- # Check for service response errors first
- if response and "result" in response and not response["result"]:
- # Service call failed - return error with details from values
- if "values" in response and isinstance(response["values"], dict):
- error_msg = response["values"].get("message", "Service call failed")
- else:
- error_msg = "Service call failed"
- return {"error": f"Service call failed: {error_msg}"}
-
- # Return action info if present
values = _safe_get_values(response)
if values is not None:
actions = values.get("action_servers", [])
return {"actions": actions, "action_count": len(actions)}
- return {"warning": "No actions found or /rosapi/action_servers service not available"}
+ return {"warning": "No actions found"}
@mcp.tool(
description=(
- "Get complete action details including type, goal, result, and feedback structures. Works only with ROS 2.\n"
+ "Get complete action details including type, goal, result, and feedback structures.\n"
"Example:\nget_action_details('/turtle1/rotate_absolute')"
),
annotations=ToolAnnotations(
@@ -118,308 +124,63 @@ def get_actions() -> dict:
readOnlyHint=True,
),
)
- def get_action_details(action: str) -> dict:
+ def get_action_details(action: str, action_type: str = "") -> dict:
"""
- Get complete action details including type, goal, result, and feedback structures. Works only with ROS 2.
+ Get complete action details including type, goal, result, and feedback structures.
Args:
action (str): The action name (e.g., '/turtle1/rotate_absolute')
+ action_type (str): The action type (e.g., 'turtlesim/action/RotateAbsolute').
+ Required. Use get_actions() to discover available action types.
Returns:
dict: Contains complete action definition with type, goal, result, and feedback structures.
"""
- # Validate input
if not action or not action.strip():
return {"error": "Action name cannot be empty"}
- # First, get the action type
- action_type = "unknown"
- action_interfaces = [] # Initialize before if/else block
-
- # Check if required service is available
- required_services = ["/rosapi/interfaces"]
-
- with ws_manager:
- # Get available services to check compatibility
- services_message = {
- "op": "call_service",
- "service": "/rosapi/services",
- "type": "rosapi/Services",
- "args": {},
- "id": "check_services_for_get_action_type",
- }
-
- services_response = ws_manager.request(services_message)
- if not services_response or not isinstance(services_response, dict):
- return {
- "warning": "Cannot check service availability",
- "action": action,
- "compatibility": {
- "issue": "Cannot determine available services",
- "required_services": required_services,
- "suggestion": "Ensure rosbridge is running and rosapi is available",
- },
- }
-
- svc_values = _safe_get_values(services_response)
- available_services = svc_values.get("services", []) if svc_values else []
- missing_services = [svc for svc in required_services if svc not in available_services]
-
- if missing_services:
- return {
- "warning": "Action type resolution not supported by this rosbridge/rosapi version",
- "action": action,
- "compatibility": {
- "issue": "Required services are not available",
- "missing_services": missing_services,
- "required_services": required_services,
- "available_services": [s for s in available_services if "interface" in s],
- "suggestion": "This rosbridge version doesn't support interface listing services",
- },
- }
-
- # Known action type mappings
- action_type_map = {
- "/turtle1/rotate_absolute": "turtlesim/action/RotateAbsolute",
- # Add more mappings as needed
- }
-
- # Check if it's a known action
- if action in action_type_map:
- action_type = action_type_map[action]
- else:
- # For unknown actions, try to derive the type from interfaces list
+ if not action_type or not action_type.strip():
+ # List available action types so the caller can provide one
interfaces_message = {
"op": "call_service",
- "service": "/rosapi/interfaces",
- "type": "rosapi/Interfaces",
+ "service": rosapi_service("interfaces"),
+ "type": rosapi_type("Interfaces"),
"args": {},
- "id": f"get_interfaces_for_action_{action.replace('/', '_')}",
+ "id": f"get_interfaces_for_{action.replace('/', '_')}",
}
-
with ws_manager:
interfaces_response = ws_manager.request(interfaces_message)
+ available = []
iface_values = _safe_get_values(interfaces_response)
if iface_values is not None:
- interfaces = iface_values.get("interfaces", [])
- # Look for action interfaces that might match
- action_interfaces = [iface for iface in interfaces if "/action/" in iface]
- # Try to match based on action name patterns
- action_name_part = action.split("/")[-1] # Get last part (e.g., "rotate_absolute")
- for iface in action_interfaces:
- if action_name_part.lower() in iface.lower():
- action_type = iface
- break
-
- if action_type == "unknown":
+ available = [i for i in iface_values.get("interfaces", []) if "/action/" in i]
+
return {
- "error": f"Action type for {action} not found",
+ "error": f"action_type is required for {action}",
"action": action,
- "available_action_types": action_interfaces,
- "suggestion": "This action might not be available or use a different naming pattern",
+ "available_action_types": available,
}
- # Now get action details using the action type
- result = {
- "action": action,
- "action_type": action_type,
- "goal": {},
- "result": {},
- "feedback": {},
- }
-
- # Check if required action detail services are available
- required_detail_services = [
- "/rosapi/action_goal_details",
- "/rosapi/action_result_details",
- "/rosapi/action_feedback_details",
- ]
-
+ # Fetch goal, result, and feedback structures
with ws_manager:
- # Get available services to check compatibility
- services_message = {
- "op": "call_service",
- "service": "/rosapi/services",
- "type": "rosapi/Services",
- "args": {},
- "id": "check_services_for_action_details",
+ parts = {
+ part: _fetch_action_part(ws_manager, action_type, part) for part in _ACTION_PARTS
}
- services_response = ws_manager.request(services_message)
- if not services_response or not isinstance(services_response, dict):
- return {
- "error": "Failed to check service availability",
- "action": action,
- "action_type": action_type,
- "compatibility": {
- "issue": "Cannot determine available services",
- "required_services": required_detail_services,
- "suggestion": "Ensure rosbridge is running and rosapi is available",
- },
- }
-
- svc_values = _safe_get_values(services_response)
- available_services = svc_values.get("services", []) if svc_values else []
- missing_services = [
- svc for svc in required_detail_services if svc not in available_services
- ]
-
- if missing_services:
- # Return what we have (action type) even if details aren't available
- return {
- "action": action,
- "action_type": action_type,
- "goal": {},
- "result": {},
- "feedback": {},
- "compatibility": {
- "issue": "Required action detail services are not available",
- "missing_services": missing_services,
- "required_services": required_detail_services,
- "available_services": [s for s in available_services if "action" in s],
- "note": "Action type found, but detailed structures are not available",
- },
- }
-
- # Get goal, result, and feedback details
- # Get goal details using action-specific service
- goal_message = {
- "op": "call_service",
- "service": "/rosapi/action_goal_details",
- "type": "rosapi_msgs/srv/ActionGoalDetails",
- "args": {"type": action_type},
- "id": f"get_action_goal_details_{action_type.replace('/', '_')}",
- }
-
- goal_response = ws_manager.request(goal_message)
- goal_values = _safe_get_values(goal_response)
- if goal_values is not None and "error" not in goal_response:
- typedefs = goal_values.get("typedefs", [])
- if typedefs:
- for typedef in typedefs:
- field_names = typedef.get("fieldnames", [])
- field_types = typedef.get("fieldtypes", [])
- field_array_len = typedef.get("fieldarraylen", [])
- examples = typedef.get("examples", [])
- const_names = typedef.get("constnames", [])
- const_values = typedef.get("constvalues", [])
-
- fields = {}
- field_details = {}
- for i, (name, ftype) in enumerate(zip(field_names, field_types)):
- fields[name] = ftype
- field_details[name] = {
- "type": ftype,
- "array_length": field_array_len[i] if i < len(field_array_len) else -1,
- "example": examples[i] if i < len(examples) else None,
- }
-
- result["goal"] = {
- "fields": fields,
- "field_count": len(fields),
- "field_details": field_details,
- "message_type": typedef.get("type", ""),
- "examples": examples,
- "constants": dict(zip(const_names, const_values)) if const_names else {},
- }
-
- # Get result details using action-specific service
- result_message = {
- "op": "call_service",
- "service": "/rosapi/action_result_details",
- "type": "rosapi_msgs/srv/ActionResultDetails",
- "args": {"type": action_type},
- "id": f"get_action_result_details_{action_type.replace('/', '_')}",
- }
-
- result_response = ws_manager.request(result_message)
- res_values = _safe_get_values(result_response)
- if res_values is not None and "error" not in result_response:
- typedefs = res_values.get("typedefs", [])
- if typedefs:
- for typedef in typedefs:
- field_names = typedef.get("fieldnames", [])
- field_types = typedef.get("fieldtypes", [])
- field_array_len = typedef.get("fieldarraylen", [])
- examples = typedef.get("examples", [])
- const_names = typedef.get("constnames", [])
- const_values = typedef.get("constvalues", [])
-
- fields = {}
- field_details = {}
- for i, (name, ftype) in enumerate(zip(field_names, field_types)):
- fields[name] = ftype
- field_details[name] = {
- "type": ftype,
- "array_length": field_array_len[i] if i < len(field_array_len) else -1,
- "example": examples[i] if i < len(examples) else None,
- }
-
- result["result"] = {
- "fields": fields,
- "field_count": len(fields),
- "field_details": field_details,
- "message_type": typedef.get("type", ""),
- "examples": examples,
- "constants": dict(zip(const_names, const_values)) if const_names else {},
- }
-
- # Get feedback details using action-specific service
- feedback_message = {
- "op": "call_service",
- "service": "/rosapi/action_feedback_details",
- "type": "rosapi_msgs/srv/ActionFeedbackDetails",
- "args": {"type": action_type},
- "id": f"get_action_feedback_details_{action_type.replace('/', '_')}",
- }
-
- feedback_response = ws_manager.request(feedback_message)
- fb_values = _safe_get_values(feedback_response)
- if fb_values is not None and "error" not in feedback_response:
- typedefs = fb_values.get("typedefs", [])
- if typedefs:
- for typedef in typedefs:
- field_names = typedef.get("fieldnames", [])
- field_types = typedef.get("fieldtypes", [])
- field_array_len = typedef.get("fieldarraylen", [])
- examples = typedef.get("examples", [])
- const_names = typedef.get("constnames", [])
- const_values = typedef.get("constvalues", [])
-
- fields = {}
- field_details = {}
- for i, (name, ftype) in enumerate(zip(field_names, field_types)):
- fields[name] = ftype
- field_details[name] = {
- "type": ftype,
- "array_length": field_array_len[i] if i < len(field_array_len) else -1,
- "example": examples[i] if i < len(examples) else None,
- }
-
- result["feedback"] = {
- "fields": fields,
- "field_count": len(fields),
- "field_details": field_details,
- "message_type": typedef.get("type", ""),
- "examples": examples,
- "constants": dict(zip(const_names, const_values)) if const_names else {},
- }
-
- # Check if we got any data
- if not result["goal"] and not result["result"] and not result["feedback"]:
+ if not any(parts.values()):
return {
+ "error": f"Action type {action_type} found but has no definition",
"action": action,
"action_type": action_type,
- "error": f"Action type {action_type} found but has no definition",
}
- return result
+ return {"action": action, "action_type": action_type, **parts}
@mcp.tool(
description=(
"Get action status for a specific action name. Works only with ROS 2.\n"
- "Example:\nget_action_status('/fibonacci')"
+ "Example:\nget_action_status('/turtle1/rotate_absolute')"
),
annotations=ToolAnnotations(
title="Get Action Status",
@@ -431,112 +192,96 @@ def get_action_status(action_name: str) -> dict:
Get action status for a specific action name. Works only with ROS 2.
Args:
- action_name (str): The action name (e.g., '/fibonacci')
+ action_name (str): The action name (e.g., '/turtle1/rotate_absolute')
Returns:
dict: Contains action status information including active goals and their status.
"""
- # Validate input
if not action_name or not action_name.strip():
return {"error": "Action name cannot be empty"}
- # Ensure action name starts with /
if not action_name.startswith("/"):
action_name = f"/{action_name}"
- # Try to get action status by subscribing to the status topic
status_topic = f"{action_name}/_action/status"
status_msg_type = "action_msgs/msg/GoalStatusArray"
+ status_map = {
+ 0: "STATUS_UNKNOWN",
+ 1: "STATUS_ACCEPTED",
+ 2: "STATUS_EXECUTING",
+ 3: "STATUS_CANCELING",
+ 4: "STATUS_SUCCEEDED",
+ 5: "STATUS_CANCELED",
+ 6: "STATUS_ABORTED",
+ }
+
try:
- # Subscribe to action status topic
with ws_manager:
- message = {
- "op": "subscribe",
- "topic": status_topic,
- "type": status_msg_type,
- "id": f"get_action_status_{action_name.replace('/', '_')}",
- }
-
- send_error = ws_manager.send(message)
+ send_error = ws_manager.send(
+ {
+ "op": "subscribe",
+ "topic": status_topic,
+ "type": status_msg_type,
+ "id": f"get_action_status_{action_name.replace('/', '_')}",
+ }
+ )
if send_error:
return {
- "action_name": action_name,
- "success": False,
"error": f"Failed to subscribe to status topic: {send_error}",
+ "action_name": action_name,
}
- # Wait for status message
response = ws_manager.receive(timeout=3.0)
+
+ # Unsubscribe regardless of result
+ ws_manager.send({"op": "unsubscribe", "topic": status_topic})
+
if not response:
return {
- "action_name": action_name,
- "success": False,
"error": "No response from action status topic",
+ "action_name": action_name,
}
response_data = json.loads(response)
if response_data.get("op") == "status" and response_data.get("level") == "error":
return {
- "error": f"Action status error: {response_data.get('msg', 'Unknown error')}"
+ "error": f"Action status error: {response_data.get('msg', 'Unknown error')}",
+ "action_name": action_name,
}
- if "msg" not in response_data or "status_list" not in response_data["msg"]:
+ if "msg" not in response_data or "status_list" not in response_data.get("msg", {}):
return {
"action_name": action_name,
- "success": True,
"active_goals": [],
"goal_count": 0,
- "note": f"No active goals found for action {action_name}",
}
- status_list = response_data["msg"]["status_list"]
- status_map = {
- 0: "STATUS_UNKNOWN",
- 1: "STATUS_ACCEPTED",
- 2: "STATUS_EXECUTING",
- 3: "STATUS_CANCELING",
- 4: "STATUS_SUCCEEDED",
- 5: "STATUS_CANCELED",
- 6: "STATUS_ABORTED",
- }
-
active_goals = []
- for status_item in status_list:
- goal_info = status_item.get("goal_info", {})
- goal_id = goal_info.get("goal_id", {}).get("uuid", "unknown")
- status = status_item.get("status", -1)
+ for item in response_data["msg"]["status_list"]:
+ goal_info = item.get("goal_info", {})
+ status = item.get("status", -1)
stamp = goal_info.get("stamp", {})
-
active_goals.append(
{
- "goal_id": goal_id,
+ "goal_id": goal_info.get("goal_id", {}).get("uuid", "unknown"),
"status": status,
"status_text": status_map.get(status, "UNKNOWN"),
"timestamp": f"{stamp.get('sec', 0)}.{stamp.get('nanosec', 0)}",
}
)
- # Unsubscribe
- ws_manager.send({"op": "unsubscribe", "topic": status_topic})
-
return {
"action_name": action_name,
- "success": True,
"active_goals": active_goals,
"goal_count": len(active_goals),
- "note": f"Found {len(active_goals)} active goal(s) for action {action_name}",
}
except json.JSONDecodeError as e:
- return {"error": f"Failed to parse status response: {str(e)}"}
+ return {"error": f"Failed to parse status response: {e}"}
except Exception as e:
- return {
- "action_name": action_name,
- "success": False,
- "error": f"Failed to get action status: {str(e)}",
- }
+ return {"error": f"Failed to get action status: {e}", "action_name": action_name}
@mcp.tool(
description=(
@@ -567,129 +312,94 @@ async def send_action_goal(
Returns:
dict: Contains action response including goal_id, status, and result.
"""
- # Validate inputs
if not action_name or not action_name.strip():
return {"error": "Action name cannot be empty"}
-
if not action_type or not action_type.strip():
return {"error": "Action type cannot be empty"}
-
if not goal:
return {"error": "Goal cannot be empty"}
- # Use ws_manager.default_timeout if timeout is None
if timeout is None:
timeout = ws_manager.default_timeout
- # Generate unique goal ID
goal_id = f"goal_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
- # rosbridge action goal message
- # Based on rosbridge source code, it expects "args" instead of "goal"
message = {
"op": "send_action_goal",
"id": goal_id,
"action": action_name,
"action_type": action_type,
- "args": goal, # rosbridge expects "args" not "goal"
- "feedback": True, # Enable feedback messages
+ "args": goal,
+ "feedback": True,
}
- # Send the action goal through rosbridge
with ws_manager:
send_error = ws_manager.send(message)
if send_error:
return {
+ "error": f"Failed to send action goal: {send_error}",
"action": action_name,
- "action_type": action_type,
"success": False,
- "error": f"Failed to send action goal: {send_error}",
}
- # Wait for action completion - handle both action_result and action_feedback
start_time = time.time()
- last_feedback = None # Store the last feedback message
- feedback_count = 0 # Count feedback messages received
+ last_feedback = None
+ feedback_count = 0
while time.time() - start_time < timeout:
- elapsed_time = time.time() - start_time
-
- response = ws_manager.receive(timeout - elapsed_time)
-
- if response:
- try:
- msg_data = json.loads(response)
-
- # Handle action_result messages (final completion)
- if msg_data.get("op") == "action_result":
- # Report completion
- if ctx:
- try:
- completion_msg = f"Action completed successfully (received {feedback_count} feedback messages)"
- await ctx.report_progress(
- progress=feedback_count, total=None, message=completion_msg
- )
- except Exception:
- pass
-
- return {
- "action": action_name,
- "action_type": action_type,
- "success": True,
- "goal_id": goal_id,
- "status": msg_data.get("status", "unknown"),
- "result": msg_data.get("values", {}),
- }
-
- # Store action_feedback messages and report progress
- if msg_data.get("op") == "action_feedback":
- feedback_count += 1
- last_feedback = msg_data
-
- # Report feedback progress
- if ctx:
- try:
- feedback_values = msg_data.get("values", {})
- feedback_msg = f"Action feedback #{feedback_count}: {str(feedback_values)[:100]}..."
- await ctx.report_progress(
- progress=feedback_count, total=None, message=feedback_msg
- )
- except Exception:
- pass
-
- except json.JSONDecodeError:
- continue
- else:
- # No response received, continue waiting
- pass
-
- await asyncio.sleep(0.1)
-
- # Timeout - return last feedback if available
- if ctx and feedback_count > 0:
- try:
- await ctx.report_progress(
- progress=feedback_count,
- total=None,
- message=f"Action timed out after {timeout} seconds (received {feedback_count} feedback messages)",
- )
- except Exception:
- pass
+ remaining = timeout - (time.time() - start_time)
+ response = ws_manager.receive(remaining)
- result = {
- "action": action_name,
- "action_type": action_type,
- "success": False,
- "goal_id": goal_id,
- "error": f"Action timed out after {timeout} seconds",
- }
+ if not response:
+ continue
- if last_feedback:
- result["success"] = True
- result["last_feedback"] = last_feedback.get("values", {})
- result["note"] = "Action timed out, but partial progress was made"
+ try:
+ msg_data = json.loads(response)
+ except json.JSONDecodeError:
+ continue
+
+ if msg_data.get("op") == "action_result":
+ if ctx:
+ try:
+ await ctx.report_progress(
+ progress=feedback_count, total=None, message="Action completed"
+ )
+ except Exception:
+ pass
+ return {
+ "action": action_name,
+ "action_type": action_type,
+ "success": True,
+ "goal_id": goal_id,
+ "status": msg_data.get("status", "unknown"),
+ "result": msg_data.get("values", {}),
+ }
- return result
+ if msg_data.get("op") == "action_feedback":
+ feedback_count += 1
+ last_feedback = msg_data
+ if ctx:
+ try:
+ await ctx.report_progress(
+ progress=feedback_count,
+ total=None,
+ message=f"Feedback #{feedback_count}: {str(msg_data.get('values', {}))[:100]}",
+ )
+ except Exception:
+ pass
+
+ # Timeout
+ result = {
+ "action": action_name,
+ "action_type": action_type,
+ "success": False,
+ "goal_id": goal_id,
+ "error": f"Action timed out after {timeout} seconds",
+ }
+ if last_feedback:
+ result["last_feedback"] = last_feedback.get("values", {})
+ result["feedback_count"] = feedback_count
+ return result
@mcp.tool(
description=(
@@ -712,36 +422,28 @@ def cancel_action_goal(action_name: str, goal_id: str) -> dict:
Returns:
dict: Contains cancellation status and result.
"""
- # Validate inputs
if not action_name or not action_name.strip():
return {"error": "Action name cannot be empty"}
-
if not goal_id or not goal_id.strip():
return {"error": "Goal ID cannot be empty"}
- # Create cancel message for rosbridge (based on rosbridge source code)
- cancel_message = {
- "op": "cancel_action_goal",
- "id": goal_id, # Use the actual goal ID, not a new one
- "action": action_name,
- "feedback": True, # Enable feedback messages
- }
-
- # Send the cancel request through rosbridge
with ws_manager:
- # Send cancel request
- send_error = ws_manager.send(cancel_message)
+ send_error = ws_manager.send(
+ {
+ "op": "cancel_action_goal",
+ "id": goal_id,
+ "action": action_name,
+ }
+ )
if send_error:
return {
+ "error": f"Failed to send cancel request: {send_error}",
"action": action_name,
"goal_id": goal_id,
- "success": False,
- "error": f"Failed to send cancel request: {send_error}",
}
return {
"action": action_name,
"goal_id": goal_id,
"success": True,
- "note": "Cancel request sent successfully. Action may still be executing.",
}
diff --git a/tests/integration/docker/Dockerfile.ros1-melodic b/tests/integration/docker/Dockerfile.ros1-melodic
index 7faff912..0e06ffdf 100644
--- a/tests/integration/docker/Dockerfile.ros1-melodic
+++ b/tests/integration/docker/Dockerfile.ros1-melodic
@@ -8,6 +8,7 @@ SHELL ["/bin/bash", "-c"]
RUN apt-get update && apt-get install -y --no-install-recommends \
ros-melodic-rosbridge-suite \
ros-melodic-turtlesim \
+ ros-melodic-turtle-actionlib \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /ros_ws
diff --git a/tests/integration/docker/Dockerfile.ros1-noetic b/tests/integration/docker/Dockerfile.ros1-noetic
index d13d5858..126af5e1 100644
--- a/tests/integration/docker/Dockerfile.ros1-noetic
+++ b/tests/integration/docker/Dockerfile.ros1-noetic
@@ -8,6 +8,7 @@ SHELL ["/bin/bash", "-c"]
RUN apt-get update && apt-get install -y --no-install-recommends \
ros-noetic-rosbridge-suite \
ros-noetic-turtlesim \
+ ros-noetic-turtle-actionlib \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /ros_ws
diff --git a/tests/integration/docker/ros1_launch.launch b/tests/integration/docker/ros1_launch.launch
index 9d673a4e..a3b3bcdd 100644
--- a/tests/integration/docker/ros1_launch.launch
+++ b/tests/integration/docker/ros1_launch.launch
@@ -10,4 +10,6 @@
+
+
diff --git a/tests/integration/test_actions.py b/tests/integration/test_actions.py
new file mode 100644
index 00000000..286b4b75
--- /dev/null
+++ b/tests/integration/test_actions.py
@@ -0,0 +1,108 @@
+"""Integration tests for action tools.
+
+These tests call the actual MCP tool functions (get_actions, get_action_details,
+get_action_status) against a live rosbridge container.
+
+Action servers per distro:
+- ROS 1 (melodic/noetic): /shape_server (from turtle_actionlib)
+- ROS 2 (humble/jazzy): /turtle1/rotate_absolute (from turtlesim)
+
+get_action_details and get_action_status use ROS 2-only rosbridge features
+(/rosapi/interfaces, send_action_goal op). See issue #320 for ROS 1 support
+via the 5-topic actionlib pattern.
+"""
+
+import pytest
+
+from ros_mcp.utils.rosapi_types import RosVersion, get_ros_version
+
+pytestmark = [pytest.mark.integration]
+
+
+_ACTIONS = {
+ RosVersion.ROS1: {
+ "name": "/turtle_shape",
+ "type": "turtle_actionlib/ShapeAction",
+ },
+ RosVersion.ROS2: {
+ "name": "/turtle1/rotate_absolute",
+ "type": "turtlesim/action/RotateAbsolute",
+ },
+}
+
+
+def _action():
+ """Return the action dict (name, type) for the current distro."""
+ return _ACTIONS[get_ros_version()]
+
+
+class TestGetActions:
+ """Verify get_actions MCP tool returns the action list."""
+
+ def test_returns_actions(self, tools):
+ """get_actions should return actions and action_count."""
+ result = tools["get_actions"]()
+ assert "actions" in result
+ assert "action_count" in result
+ assert result["action_count"] > 0
+ assert result["action_count"] == len(result["actions"])
+
+ def test_includes_expected_action(self, tools):
+ """The expected action server should be present."""
+ result = tools["get_actions"]()
+ actions = result["actions"]
+ expected = _action()["name"]
+ assert any(expected in a for a in actions), f"{expected} not in {actions}"
+
+
+class TestGetActionDetails:
+ """Verify get_action_details MCP tool returns action structure.
+
+ Detail inspection requires /rosapi/interfaces (ROS 2 only).
+ """
+
+ @pytest.mark.skipif(
+ "get_ros_version() != RosVersion.ROS2",
+ reason="Action detail inspection requires /rosapi/interfaces (ROS 2 only, see #320)",
+ )
+ def test_action_details(self, tools):
+ """get_action_details should return goal/result/feedback structure."""
+ action = _action()["name"]
+ result = tools["get_action_details"](action=action, action_type=_action()["type"])
+ assert result["action"] == action
+ assert "goal" in result
+ assert "result" in result
+ assert "feedback" in result
+ assert result["goal"]["field_count"] > 0
+
+ def test_empty_action_returns_error(self, tools):
+ """get_action_details with empty string should return error."""
+ result = tools["get_action_details"](action="")
+ assert "error" in result
+
+ def test_nonexistent_action_returns_error(self, tools):
+ """get_action_details for nonexistent action should return error."""
+ result = tools["get_action_details"](action="/nonexistent_action_xyz")
+ assert "error" in result
+
+
+class TestGetActionStatus:
+ """Verify get_action_status MCP tool returns status info.
+
+ Status subscription works on ROS 2. On ROS 1, the status topic
+ format differs (see #320).
+ """
+
+ @pytest.mark.skipif(
+ "get_ros_version() != RosVersion.ROS2",
+ reason="Action status subscription uses ROS 2 topic format (see #320)",
+ )
+ def test_action_status(self, tools):
+ """get_action_status should return status structure."""
+ result = tools["get_action_status"](action_name=_action()["name"])
+ assert "action_name" in result
+
+ def test_empty_action_returns_error(self, tools):
+ """get_action_status with empty string should return error."""
+ result = tools["get_action_status"](action_name="")
+ assert "error" in result