From 6e8eba617c0cdfde4d4940908ceb319a05f1314b Mon Sep 17 00:00:00 2001 From: quay-devel Date: Thu, 30 Apr 2026 08:26:17 +0000 Subject: [PATCH] fix(runner): reassemble AG-UI streaming deltas in acp_get_session_status acp_get_session_status always returned empty messages because the message extraction code expected a "text" field on TEXT_MESSAGE_CONTENT events, but AG-UI uses streaming deltas where the field is "delta" and the role is on the TEXT_MESSAGE_START event. This also adds a fallback to MESSAGES_SNAPSHOT events for completed sessions that only have snapshot-style event storage. Co-Authored-By: Claude Opus 4.6 --- .../bridges/claude/backend_tools.py | 59 +- .../tests/test_backend_tools.py | 543 ++++++++++++++++++ 2 files changed, 592 insertions(+), 10 deletions(-) diff --git a/components/runners/ambient-runner/ambient_runner/bridges/claude/backend_tools.py b/components/runners/ambient-runner/ambient_runner/bridges/claude/backend_tools.py index 0cc174ed0..df19fc2df 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/claude/backend_tools.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/claude/backend_tools.py @@ -401,22 +401,61 @@ async def acp_get_session_status(args: dict) -> dict: events = api_client.get_session_events( session_name=session_name, ) - # Extract text messages from AG-UI events + # Two event patterns exist depending on session state: + # + # 1. Streaming deltas (active sessions): + # TEXT_MESSAGE_START -> role, messageId + # TEXT_MESSAGE_CONTENT -> delta (text chunk), messageId + # TEXT_MESSAGE_END -> messageId + # + # 2. MESSAGES_SNAPSHOT (completed sessions): + # Contains a full "messages" array with role + content + # + # Try streaming deltas first; fall back to snapshots. + msg_roles: dict[str, str] = {} + msg_deltas: dict[str, list[str]] = {} messages = [] + last_snapshot = None for event in events: - event_type = event.get("type", "") - if event_type == "TEXT_MESSAGE_CONTENT" and event.get("text"): - messages.append( - { - "role": event.get("role", "assistant"), - "text": event["text"][:500], - } - ) + etype = event.get("type", "") + mid = event.get("messageId", "") + if etype == "TEXT_MESSAGE_START" and mid: + msg_roles[mid] = event.get("role", "assistant") + msg_deltas[mid] = [] + elif etype == "TEXT_MESSAGE_CONTENT" and mid: + if "delta" in event: + msg_deltas.setdefault(mid, []).append(event["delta"]) + elif etype == "TEXT_MESSAGE_END" and mid: + if mid in msg_deltas: + full_text = "".join(msg_deltas.pop(mid)) + if full_text.strip(): + messages.append( + { + "role": msg_roles.pop(mid, "assistant"), + "text": full_text[:500], + } + ) + elif etype == "MESSAGES_SNAPSHOT": + last_snapshot = event + + # Fall back to MESSAGES_SNAPSHOT if no streaming deltas found + if not messages and last_snapshot: + for msg in last_snapshot.get("messages", []): + role = msg.get("role", "") + content = msg.get("content", "") + if role in ("user", "assistant") and isinstance(content, str) and content.strip(): + messages.append( + { + "role": role, + "text": content[:500], + } + ) + # Return only the last N messages result["recentMessages"] = messages[-max_messages:] result["totalMessages"] = len(messages) except Exception as events_err: - logger.debug(f"Could not fetch events for {session_name}: {events_err}") + logger.warning(f"Could not fetch events for {session_name}: {events_err}") result["recentMessages"] = [] result["totalMessages"] = 0 diff --git a/components/runners/ambient-runner/tests/test_backend_tools.py b/components/runners/ambient-runner/tests/test_backend_tools.py index 32a6931af..873daa5d3 100644 --- a/components/runners/ambient-runner/tests/test_backend_tools.py +++ b/components/runners/ambient-runner/tests/test_backend_tools.py @@ -478,6 +478,549 @@ def decorator(func): assert tools == [] + @patch("urllib.request.urlopen") + @pytest.mark.asyncio + async def test_get_session_status_reassembles_streaming_deltas( + self, mock_urlopen, monkeypatch + ): + """Test that acp_get_session_status reassembles AG-UI streaming deltas into messages.""" + from ambient_runner.bridges.claude.backend_tools import ( + create_backend_mcp_tools, + ) + + monkeypatch.setenv("BACKEND_API_URL", "http://backend:8080/api") + monkeypatch.setenv("PROJECT_NAME", "test-project") + monkeypatch.setenv("BOT_TOKEN", "test-token") + + # First call: get_session (session details) + session_response = MagicMock() + session_response.read.return_value = json.dumps( + { + "status": { + "phase": "Running", + "agentStatus": "idle", + "startTime": "2026-04-30T08:00:00Z", + "lastActivityTime": "2026-04-30T09:00:00Z", + }, + "spec": {"displayName": "Test Session"}, + } + ).encode("utf-8") + session_response.__enter__ = MagicMock(return_value=session_response) + session_response.__exit__ = MagicMock(return_value=False) + + # Second call: export (AG-UI events with streaming deltas) + export_response = MagicMock() + export_response.read.return_value = json.dumps( + { + "sessionId": "test-session", + "projectName": "test-project", + "aguiEvents": [ + { + "type": "RUN_STARTED", + "runId": "run-1", + "threadId": "thread-1", + }, + { + "type": "TEXT_MESSAGE_START", + "messageId": "msg-1", + "role": "assistant", + "runId": "run-1", + "threadId": "thread-1", + }, + { + "type": "TEXT_MESSAGE_CONTENT", + "messageId": "msg-1", + "delta": "Hello, ", + }, + { + "type": "TEXT_MESSAGE_CONTENT", + "messageId": "msg-1", + "delta": "world!", + }, + { + "type": "TEXT_MESSAGE_END", + "messageId": "msg-1", + }, + { + "type": "TEXT_MESSAGE_START", + "messageId": "msg-2", + "role": "user", + "runId": "run-1", + "threadId": "thread-1", + }, + { + "type": "TEXT_MESSAGE_CONTENT", + "messageId": "msg-2", + "delta": "Thanks!", + }, + { + "type": "TEXT_MESSAGE_END", + "messageId": "msg-2", + }, + ], + } + ).encode("utf-8") + export_response.__enter__ = MagicMock(return_value=export_response) + export_response.__exit__ = MagicMock(return_value=False) + + mock_urlopen.side_effect = [session_response, export_response] + + def mock_tool(name, description, schema): + def decorator(func): + func._tool_name = name + return func + + return decorator + + client = BackendAPIClient() + tools = create_backend_mcp_tools(sdk_tool_decorator=mock_tool, client=client) + + status_tool = next( + t for t in tools if getattr(t, "_tool_name", "") == "acp_get_session_status" + ) + result = await status_tool({"session_name": "test-session", "max_messages": 50}) + + result_data = json.loads(result["content"][0]["text"]) + assert result_data["success"] is True + assert result_data["totalMessages"] == 2 + assert len(result_data["recentMessages"]) == 2 + assert result_data["recentMessages"][0]["role"] == "assistant" + assert result_data["recentMessages"][0]["text"] == "Hello, world!" + assert result_data["recentMessages"][1]["role"] == "user" + assert result_data["recentMessages"][1]["text"] == "Thanks!" + + @patch("urllib.request.urlopen") + @pytest.mark.asyncio + async def test_get_session_status_respects_max_messages( + self, mock_urlopen, monkeypatch + ): + """Test that max_messages limits the returned messages.""" + from ambient_runner.bridges.claude.backend_tools import ( + create_backend_mcp_tools, + ) + + monkeypatch.setenv("BACKEND_API_URL", "http://backend:8080/api") + monkeypatch.setenv("PROJECT_NAME", "test-project") + monkeypatch.setenv("BOT_TOKEN", "test-token") + + session_response = MagicMock() + session_response.read.return_value = json.dumps( + { + "status": {"phase": "Running", "agentStatus": "idle"}, + "spec": {"displayName": "Test"}, + } + ).encode("utf-8") + session_response.__enter__ = MagicMock(return_value=session_response) + session_response.__exit__ = MagicMock(return_value=False) + + # Build 5 messages worth of events + agui_events = [] + for i in range(5): + agui_events.extend( + [ + { + "type": "TEXT_MESSAGE_START", + "messageId": f"msg-{i}", + "role": "assistant", + }, + { + "type": "TEXT_MESSAGE_CONTENT", + "messageId": f"msg-{i}", + "delta": f"Message {i}", + }, + {"type": "TEXT_MESSAGE_END", "messageId": f"msg-{i}"}, + ] + ) + + export_response = MagicMock() + export_response.read.return_value = json.dumps( + {"aguiEvents": agui_events} + ).encode("utf-8") + export_response.__enter__ = MagicMock(return_value=export_response) + export_response.__exit__ = MagicMock(return_value=False) + + mock_urlopen.side_effect = [session_response, export_response] + + def mock_tool(name, description, schema): + def decorator(func): + func._tool_name = name + return func + + return decorator + + client = BackendAPIClient() + tools = create_backend_mcp_tools(sdk_tool_decorator=mock_tool, client=client) + status_tool = next( + t for t in tools if getattr(t, "_tool_name", "") == "acp_get_session_status" + ) + + result = await status_tool({"session_name": "test-session", "max_messages": 2}) + result_data = json.loads(result["content"][0]["text"]) + + assert result_data["totalMessages"] == 5 + assert len(result_data["recentMessages"]) == 2 + # Should return the LAST 2 messages + assert result_data["recentMessages"][0]["text"] == "Message 3" + assert result_data["recentMessages"][1]["text"] == "Message 4" + + @patch("urllib.request.urlopen") + @pytest.mark.asyncio + async def test_get_session_status_skips_empty_messages( + self, mock_urlopen, monkeypatch + ): + """Test that whitespace-only messages are skipped.""" + from ambient_runner.bridges.claude.backend_tools import ( + create_backend_mcp_tools, + ) + + monkeypatch.setenv("BACKEND_API_URL", "http://backend:8080/api") + monkeypatch.setenv("PROJECT_NAME", "test-project") + monkeypatch.setenv("BOT_TOKEN", "test-token") + + session_response = MagicMock() + session_response.read.return_value = json.dumps( + { + "status": {"phase": "Running", "agentStatus": "idle"}, + "spec": {"displayName": "Test"}, + } + ).encode("utf-8") + session_response.__enter__ = MagicMock(return_value=session_response) + session_response.__exit__ = MagicMock(return_value=False) + + export_response = MagicMock() + export_response.read.return_value = json.dumps( + { + "aguiEvents": [ + { + "type": "TEXT_MESSAGE_START", + "messageId": "msg-empty", + "role": "assistant", + }, + { + "type": "TEXT_MESSAGE_CONTENT", + "messageId": "msg-empty", + "delta": " ", + }, + {"type": "TEXT_MESSAGE_END", "messageId": "msg-empty"}, + { + "type": "TEXT_MESSAGE_START", + "messageId": "msg-real", + "role": "assistant", + }, + { + "type": "TEXT_MESSAGE_CONTENT", + "messageId": "msg-real", + "delta": "Real content", + }, + {"type": "TEXT_MESSAGE_END", "messageId": "msg-real"}, + ], + } + ).encode("utf-8") + export_response.__enter__ = MagicMock(return_value=export_response) + export_response.__exit__ = MagicMock(return_value=False) + + mock_urlopen.side_effect = [session_response, export_response] + + def mock_tool(name, description, schema): + def decorator(func): + func._tool_name = name + return func + + return decorator + + client = BackendAPIClient() + tools = create_backend_mcp_tools(sdk_tool_decorator=mock_tool, client=client) + status_tool = next( + t for t in tools if getattr(t, "_tool_name", "") == "acp_get_session_status" + ) + + result = await status_tool({"session_name": "test-session"}) + result_data = json.loads(result["content"][0]["text"]) + + assert result_data["totalMessages"] == 1 + assert result_data["recentMessages"][0]["text"] == "Real content" + + @patch("urllib.request.urlopen") + @pytest.mark.asyncio + async def test_get_session_status_export_failure_returns_empty( + self, mock_urlopen, monkeypatch + ): + """Test graceful degradation when export endpoint fails.""" + from ambient_runner.bridges.claude.backend_tools import ( + create_backend_mcp_tools, + ) + + monkeypatch.setenv("BACKEND_API_URL", "http://backend:8080/api") + monkeypatch.setenv("PROJECT_NAME", "test-project") + monkeypatch.setenv("BOT_TOKEN", "test-token") + + session_response = MagicMock() + session_response.read.return_value = json.dumps( + { + "status": {"phase": "Stopped", "agentStatus": None}, + "spec": {"displayName": "Stopped Session"}, + } + ).encode("utf-8") + session_response.__enter__ = MagicMock(return_value=session_response) + session_response.__exit__ = MagicMock(return_value=False) + + # Export call fails + export_error = HTTPError( + "http://backend:8080/api/export", + 404, + "Not Found", + {}, + MagicMock(read=lambda: b'{"error": "Not found"}'), + ) + mock_urlopen.side_effect = [session_response, export_error] + + def mock_tool(name, description, schema): + def decorator(func): + func._tool_name = name + return func + + return decorator + + client = BackendAPIClient() + tools = create_backend_mcp_tools(sdk_tool_decorator=mock_tool, client=client) + status_tool = next( + t for t in tools if getattr(t, "_tool_name", "") == "acp_get_session_status" + ) + + result = await status_tool({"session_name": "test-session"}) + result_data = json.loads(result["content"][0]["text"]) + + # Should still return session metadata, just no messages + assert result_data["success"] is True + assert result_data["phase"] == "Stopped" + assert result_data["recentMessages"] == [] + assert result_data["totalMessages"] == 0 + + @patch("urllib.request.urlopen") + @pytest.mark.asyncio + async def test_get_session_status_falls_back_to_messages_snapshot( + self, mock_urlopen, monkeypatch + ): + """Test fallback to MESSAGES_SNAPSHOT when no streaming deltas exist.""" + from ambient_runner.bridges.claude.backend_tools import ( + create_backend_mcp_tools, + ) + + monkeypatch.setenv("BACKEND_API_URL", "http://backend:8080/api") + monkeypatch.setenv("PROJECT_NAME", "test-project") + monkeypatch.setenv("BOT_TOKEN", "test-token") + + session_response = MagicMock() + session_response.read.return_value = json.dumps( + { + "status": {"phase": "Stopped", "agentStatus": None}, + "spec": {"displayName": "Completed Session"}, + } + ).encode("utf-8") + session_response.__enter__ = MagicMock(return_value=session_response) + session_response.__exit__ = MagicMock(return_value=False) + + # Completed sessions have MESSAGES_SNAPSHOT instead of streaming deltas + export_response = MagicMock() + export_response.read.return_value = json.dumps( + { + "aguiEvents": [ + {"type": "RAW", "runId": "run-1"}, + { + "type": "MESSAGES_SNAPSHOT", + "messages": [ + {"role": "user", "content": "Fix the bug in auth.py"}, + {"role": "assistant", "content": "I found the issue in the login handler."}, + {"role": "tool", "content": "[tool result]"}, + {"role": "user", "content": "Thanks, ship it!"}, + {"role": "assistant", "content": "PR created: #1234"}, + ], + }, + {"type": "RUN_STARTED", "runId": "run-1"}, + { + "type": "MESSAGES_SNAPSHOT", + "messages": [ + {"role": "user", "content": "Fix the bug in auth.py"}, + {"role": "assistant", "content": "I found the issue in the login handler."}, + {"role": "user", "content": "Thanks, ship it!"}, + {"role": "assistant", "content": "PR created and merged."}, + ], + }, + {"type": "RUN_FINISHED", "runId": "run-1"}, + ], + } + ).encode("utf-8") + export_response.__enter__ = MagicMock(return_value=export_response) + export_response.__exit__ = MagicMock(return_value=False) + + mock_urlopen.side_effect = [session_response, export_response] + + def mock_tool(name, description, schema): + def decorator(func): + func._tool_name = name + return func + + return decorator + + client = BackendAPIClient() + tools = create_backend_mcp_tools(sdk_tool_decorator=mock_tool, client=client) + status_tool = next( + t for t in tools if getattr(t, "_tool_name", "") == "acp_get_session_status" + ) + + result = await status_tool({"session_name": "test-session", "max_messages": 50}) + result_data = json.loads(result["content"][0]["text"]) + + # Should use the LAST MESSAGES_SNAPSHOT (most complete) + assert result_data["totalMessages"] == 4 + assert len(result_data["recentMessages"]) == 4 + # Tool messages should be filtered out + assert all( + m["role"] in ("user", "assistant") for m in result_data["recentMessages"] + ) + assert result_data["recentMessages"][0]["text"] == "Fix the bug in auth.py" + assert result_data["recentMessages"][-1]["text"] == "PR created and merged." + + @patch("urllib.request.urlopen") + @pytest.mark.asyncio + async def test_get_session_status_prefers_streaming_over_snapshot( + self, mock_urlopen, monkeypatch + ): + """Test that streaming deltas are preferred over MESSAGES_SNAPSHOT.""" + from ambient_runner.bridges.claude.backend_tools import ( + create_backend_mcp_tools, + ) + + monkeypatch.setenv("BACKEND_API_URL", "http://backend:8080/api") + monkeypatch.setenv("PROJECT_NAME", "test-project") + monkeypatch.setenv("BOT_TOKEN", "test-token") + + session_response = MagicMock() + session_response.read.return_value = json.dumps( + { + "status": {"phase": "Running", "agentStatus": "working"}, + "spec": {"displayName": "Active Session"}, + } + ).encode("utf-8") + session_response.__enter__ = MagicMock(return_value=session_response) + session_response.__exit__ = MagicMock(return_value=False) + + # Session has BOTH streaming deltas AND a snapshot (active session mid-run) + export_response = MagicMock() + export_response.read.return_value = json.dumps( + { + "aguiEvents": [ + { + "type": "MESSAGES_SNAPSHOT", + "messages": [ + {"role": "user", "content": "Old snapshot content"}, + ], + }, + { + "type": "TEXT_MESSAGE_START", + "messageId": "msg-live", + "role": "assistant", + }, + { + "type": "TEXT_MESSAGE_CONTENT", + "messageId": "msg-live", + "delta": "Fresh streaming content", + }, + {"type": "TEXT_MESSAGE_END", "messageId": "msg-live"}, + ], + } + ).encode("utf-8") + export_response.__enter__ = MagicMock(return_value=export_response) + export_response.__exit__ = MagicMock(return_value=False) + + mock_urlopen.side_effect = [session_response, export_response] + + def mock_tool(name, description, schema): + def decorator(func): + func._tool_name = name + return func + + return decorator + + client = BackendAPIClient() + tools = create_backend_mcp_tools(sdk_tool_decorator=mock_tool, client=client) + status_tool = next( + t for t in tools if getattr(t, "_tool_name", "") == "acp_get_session_status" + ) + + result = await status_tool({"session_name": "test-session"}) + result_data = json.loads(result["content"][0]["text"]) + + # Should use streaming deltas, NOT the snapshot + assert result_data["totalMessages"] == 1 + assert result_data["recentMessages"][0]["text"] == "Fresh streaming content" + + @patch("urllib.request.urlopen") + @pytest.mark.asyncio + async def test_get_session_status_truncates_long_messages( + self, mock_urlopen, monkeypatch + ): + """Test that messages longer than 500 chars are truncated.""" + from ambient_runner.bridges.claude.backend_tools import ( + create_backend_mcp_tools, + ) + + monkeypatch.setenv("BACKEND_API_URL", "http://backend:8080/api") + monkeypatch.setenv("PROJECT_NAME", "test-project") + monkeypatch.setenv("BOT_TOKEN", "test-token") + + session_response = MagicMock() + session_response.read.return_value = json.dumps( + { + "status": {"phase": "Running", "agentStatus": "idle"}, + "spec": {"displayName": "Test"}, + } + ).encode("utf-8") + session_response.__enter__ = MagicMock(return_value=session_response) + session_response.__exit__ = MagicMock(return_value=False) + + long_text = "A" * 1000 + export_response = MagicMock() + export_response.read.return_value = json.dumps( + { + "aguiEvents": [ + { + "type": "TEXT_MESSAGE_START", + "messageId": "msg-long", + "role": "assistant", + }, + { + "type": "TEXT_MESSAGE_CONTENT", + "messageId": "msg-long", + "delta": long_text, + }, + {"type": "TEXT_MESSAGE_END", "messageId": "msg-long"}, + ], + } + ).encode("utf-8") + export_response.__enter__ = MagicMock(return_value=export_response) + export_response.__exit__ = MagicMock(return_value=False) + + mock_urlopen.side_effect = [session_response, export_response] + + def mock_tool(name, description, schema): + def decorator(func): + func._tool_name = name + return func + + return decorator + + client = BackendAPIClient() + tools = create_backend_mcp_tools(sdk_tool_decorator=mock_tool, client=client) + status_tool = next( + t for t in tools if getattr(t, "_tool_name", "") == "acp_get_session_status" + ) + + result = await status_tool({"session_name": "test-session"}) + result_data = json.loads(result["content"][0]["text"]) + + assert len(result_data["recentMessages"][0]["text"]) == 500 + @patch("urllib.request.urlopen") @pytest.mark.asyncio async def test_tool_execution_list_sessions(self, mock_urlopen, monkeypatch):