Skip to content

fix(runner): reassemble AG-UI streaming deltas in acp_get_session_status#1485

Open
quay-devel wants to merge 4 commits intoambient-code:mainfrom
quay-devel:fix/session-status-message-extraction
Open

fix(runner): reassemble AG-UI streaming deltas in acp_get_session_status#1485
quay-devel wants to merge 4 commits intoambient-code:mainfrom
quay-devel:fix/session-status-message-extraction

Conversation

@quay-devel
Copy link
Copy Markdown

@quay-devel quay-devel commented Apr 30, 2026

Summary

  • Fix acp_get_session_status returning empty recentMessages for all sessions by correctly reassembling AG-UI streaming delta events (delta field, not text) and grouping by messageId with role from TEXT_MESSAGE_START
  • Add MESSAGES_SNAPSHOT fallback for completed/stopped sessions that use snapshot-style event storage instead of streaming deltas
  • Upgrade error logging from debug to warning so export failures are observable

Fixes #1484

Root Cause

The message extraction code checked event.get("text") on TEXT_MESSAGE_CONTENT events, but AG-UI streaming protocol uses event["delta"] for text chunks. The role field is also on the TEXT_MESSAGE_START event, not on content events. Result: every event was silently skipped, yielding zero messages.

Changes

backend_tools.py: Replace single-event extraction with streaming delta reassembly:

  1. Track role from TEXT_MESSAGE_START by messageId
  2. Accumulate delta chunks from TEXT_MESSAGE_CONTENT by messageId
  3. Join and emit on TEXT_MESSAGE_END
  4. Fall back to MESSAGES_SNAPSHOT (last one) for completed sessions with no streaming events

test_backend_tools.py: 7 new tests covering:

  • Streaming delta reassembly (happy path)
  • max_messages limiting (returns last N)
  • Whitespace-only message filtering
  • Export endpoint failure graceful degradation
  • MESSAGES_SNAPSHOT fallback for stopped sessions
  • Streaming deltas preferred over snapshot when both exist
  • Long message truncation at 500 chars

Test Plan

  • All 7 new unit tests pass
  • Validated against live active session: 5 messages extracted from 3,796 events
  • Validated against live stopped session: 8 messages extracted via MESSAGES_SNAPSHOT fallback
  • Pre-existing tests unaffected (11/11 pass)

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Bug Fixes

    • Improved session message handling with enhanced fallback support and dual-format compatibility
    • Upgraded error logging for session operation failures
    • Added message filtering and truncation for cleaner output
  • Tests

    • New comprehensive test suite for session status operations with multiple coverage scenarios

acp_get_session_status always returned empty messages because the
message extraction code expected a "text" field on TEXT_MESSAGE_CONTENT
events, but AG-UI uses streaming deltas where the field is "delta" and
the role is on the TEXT_MESSAGE_START event. This also adds a fallback
to MESSAGES_SNAPSHOT events for completed sessions that only have
snapshot-style event storage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@netlify
Copy link
Copy Markdown

netlify Bot commented Apr 30, 2026

Deploy Preview for cheerful-kitten-f556a0 canceled.

Name Link
🔨 Latest commit f8f02a7
🔍 Latest deploy log https://app.netlify.com/projects/cheerful-kitten-f556a0/deploys/69f3ae2da656270008f9083d

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 30, 2026

📝 Walkthrough

Walkthrough

Modified acp_get_session_status to extract messages from two AG-UI event formats: streaming deltas (TEXT_MESSAGE_START/CONTENT/END) for active sessions and MESSAGES_SNAPSHOT arrays for completed sessions. Adds comprehensive test coverage and upgrades failure logging from debug to warning level.

Changes

Cohort / File(s) Summary
Message Extraction Fix
components/runners/ambient-runner/ambient_runner/bridges/claude/backend_tools.py
Updated message collection logic to assemble text from streaming delta events keyed by messageId, with role tracking at TEXT_MESSAGE_START and finalization at TEXT_MESSAGE_END. Implements fallback to MESSAGES_SNAPSHOT messages array (filtered to user/assistant roles with non-empty content). Both paths enforce 500-character truncation. Upgraded session event fetch failures from debug to warning-level logging.
Test Coverage
components/runners/ambient-runner/tests/test_backend_tools.py
Added async unit test suite for acp_get_session_status validating: streaming delta message reconstruction, max_messages parameter enforcement, whitespace-only message skipping, text truncation to 500 chars, export failure graceful handling, snapshot fallback behavior, and preference for streaming deltas when both formats present.
🚥 Pre-merge checks | ✅ 7 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 44.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (7 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed Title follows Conventional Commits format (type: fix, scope: runner) and clearly describes the main change: reassembling AG-UI streaming deltas in acp_get_session_status.
Linked Issues check ✅ Passed PR fully addresses all coding requirements from #1484: streaming delta reassembly by messageId with role tracking, MESSAGES_SNAPSHOT fallback for completed sessions, error logging upgrade, and comprehensive test coverage.
Out of Scope Changes check ✅ Passed All changes are strictly scoped to fixing acp_get_session_status message extraction and adding corresponding unit tests; no unrelated modifications present.
Performance And Algorithmic Complexity ✅ Passed Implementation uses O(n) event loop with list accumulation and single str.join() call, snapshot fallback is linear pass, all outputs bounded.
Security And Secret Handling ✅ Passed Code safely processes AG-UI event deltas without injection risks, no hardcoded secrets/tokens, no sensitive data leakage or auth bypass vulnerabilities detected.
Kubernetes Resource Safety ✅ Passed PR modifies only Python backend code for AG-UI streaming delta reassembly. No Kubernetes manifests, resource definitions, or cluster configurations are introduced or modified.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
Review rate limit: 0/1 reviews remaining, refill in 60 minutes.

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/runners/ambient-runner/ambient_runner/bridges/claude/backend_tools.py (1)

400-459: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don’t swallow parser/runtime errors as “export failure.”

Lines 400-459 currently catch all exceptions from both fetch and parse, then return an empty-success response. A malformed event/delta can therefore hide a real bug and report totalMessages: 0 instead of surfacing the failure path.

Proposed fix
-            # Try to get recent events for message content
-            try:
-                events = api_client.get_session_events(
-                    session_name=session_name,
-                )
+            # Try to get recent events for message content
+            try:
+                events = api_client.get_session_events(session_name=session_name)
+            except Exception as events_err:
+                logger.warning(f"Could not fetch events for {session_name}: {events_err}")
+                result["recentMessages"] = []
+                result["totalMessages"] = 0
+                return _tool_response(result)
+
+            try:
                 # Two event patterns exist depending on session state:
                 #
                 # 1. Streaming deltas (active sessions):
@@
-                    elif etype == "TEXT_MESSAGE_CONTENT" and mid:
-                        if "delta" in event:
-                            msg_deltas.setdefault(mid, []).append(event["delta"])
+                    elif etype == "TEXT_MESSAGE_CONTENT" and mid:
+                        delta = event.get("delta")
+                        if isinstance(delta, str):
+                            msg_deltas.setdefault(mid, []).append(delta)
@@
                 # Return only the last N messages
                 result["recentMessages"] = messages[-max_messages:]
                 result["totalMessages"] = len(messages)
-            except Exception as events_err:
-                logger.warning(f"Could not fetch events for {session_name}: {events_err}")
-                result["recentMessages"] = []
-                result["totalMessages"] = 0
+            except Exception as parse_err:
+                logger.error(f"Failed to parse events for {session_name}: {parse_err}", exc_info=True)
+                return _tool_error(parse_err)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@components/runners/ambient-runner/ambient_runner/bridges/claude/backend_tools.py`
around lines 400 - 459, The current broad except around the entire events
processing (catching Exception as events_err) hides parsing/runtime errors;
restrict error handling to only the fetch step by wrapping
api_client.get_session_events(session_name=...) in its own try/except and
handling fetch failures there (log warning and set result["recentMessages"]=[]
and result["totalMessages"]=0), but do not swallow exceptions raised while
iterating/processing events (the loop that uses msg_roles, msg_deltas,
MESSAGES_SNAPSHOT, etc.); allow those parsing/runtime exceptions to propagate
(or re-raise after logging) so real bugs surface. Use the existing symbols
api_client.get_session_events, session_name, events, msg_roles, msg_deltas,
last_snapshot, and the current except block (events_err) to locate and refactor
the code accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In
`@components/runners/ambient-runner/ambient_runner/bridges/claude/backend_tools.py`:
- Around line 400-459: The current broad except around the entire events
processing (catching Exception as events_err) hides parsing/runtime errors;
restrict error handling to only the fetch step by wrapping
api_client.get_session_events(session_name=...) in its own try/except and
handling fetch failures there (log warning and set result["recentMessages"]=[]
and result["totalMessages"]=0), but do not swallow exceptions raised while
iterating/processing events (the loop that uses msg_roles, msg_deltas,
MESSAGES_SNAPSHOT, etc.); allow those parsing/runtime exceptions to propagate
(or re-raise after logging) so real bugs surface. Use the existing symbols
api_client.get_session_events, session_name, events, msg_roles, msg_deltas,
last_snapshot, and the current except block (events_err) to locate and refactor
the code accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: deb7cfa6-29e7-46da-a46e-de5a3b7a485a

📥 Commits

Reviewing files that changed from the base of the PR and between d1645a9 and e940dd0.

📒 Files selected for processing (2)
  • components/runners/ambient-runner/ambient_runner/bridges/claude/backend_tools.py
  • components/runners/ambient-runner/tests/test_backend_tools.py

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

fix(runner): acp_get_session_status returns empty messages for all sessions

1 participant