…leaking as task text

- Add handleA2AMessageStream() that proxies /api/chat SSE chunks as A2A TaskArtifactUpdateEvents and closes with a final TaskStatusUpdateEvent (state=completed)
- Validate params and skill allowlist BEFORE setting SSE headers so error responses are still plain JSON-RPC when the request is malformed
- Catch non-OK responses from /api/chat and emit a JSON-RPC error event — the raw HTML body (e.g. "Cannot POST /") is never surfaced as task text
- Update agent card capabilities.streaming: true so callers know to use message/stream directly instead of simulating it via message/send
- Update Method Not Found error message to list both supported methods
- Extract shared A2ARequestBody type to avoid duplicating the inline cast

Closes #471

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
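The two streamed event shapes this PR describes (incremental artifact updates, then a terminal status update) can be sketched as plain builders. This is an illustrative sketch only: the helper names are hypothetical, and the field layout follows the events shown elsewhere in this review rather than being a verbatim copy of the repo's code.

```typescript
type JsonRpcId = string | number | null;

// Builds one TaskArtifactUpdateEvent envelope. `append` is false only for
// the first chunk so clients know to concatenate subsequent deltas.
function artifactEvent(
  rpcId: JsonRpcId,
  taskId: string,
  contextId: string,
  artifactId: string,
  delta: string,
  chunkIndex: number
) {
  return {
    jsonrpc: '2.0',
    id: rpcId,
    result: {
      id: taskId,
      contextId,
      artifact: {
        artifactId,
        index: 0,
        parts: [{ kind: 'text', text: delta }],
        append: chunkIndex > 0,
        lastChunk: false,
      },
      final: false,
    },
  };
}

// Builds the terminal TaskStatusUpdateEvent: state=completed, final=true
// signals the client that the stream is about to close.
function completedEvent(rpcId: JsonRpcId, taskId: string, contextId: string) {
  return {
    jsonrpc: '2.0',
    id: rpcId,
    result: { id: taskId, contextId, status: { state: 'completed' }, final: true },
  };
}
```

Each envelope is what gets serialized into a single `data:` SSE frame.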
📝 Walkthrough

A new SSE streaming handler was added to the A2A API to support the message/stream method.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    actor Client
    participant A2AHandler as A2A Handler
    participant ChatAPI as Internal /api/chat
    Client->>A2AHandler: POST /a2a with method: 'message/stream'
    activate A2AHandler
    A2AHandler->>A2AHandler: Validate params (text parts, skillHint, projectPath)
    A2AHandler->>Client: Set SSE headers (text/event-stream, no-cache)
    A2AHandler->>Client: Emit "working" status event
    A2AHandler->>ChatAPI: POST /api/chat (proxy request)
    activate ChatAPI
    ChatAPI-->>A2AHandler: Streaming response (text-delta events)
    deactivate ChatAPI
    loop For each SSE data line
        A2AHandler->>A2AHandler: Parse text-delta
        A2AHandler->>Client: Emit TaskArtifactUpdateEvent (create/append)
    end
    alt Success path
        A2AHandler->>Client: Emit "completed" status event
    else /api/chat non-OK response
        A2AHandler->>A2AHandler: Consume response body
        A2AHandler->>Client: Emit JSON-RPC error (code: -32603)
    else Exception during processing
        A2AHandler->>Client: Emit "Internal error" event
    end
    A2AHandler->>A2AHandler: Close SSE stream
    deactivate A2AHandler
```
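Per the diagram, a client sees a "working" status frame, zero or more artifact frames, then a final "completed" frame. A minimal parser over a captured transcript might look like the sketch below; the frame shapes are assumed from this PR, and `collectReply` is an illustrative name, not repo code.

```typescript
// Shape of the fields this sketch reads from each JSON-RPC result frame.
interface A2AFrame {
  result?: {
    status?: { state?: string };
    artifact?: { parts?: { kind?: string; text?: string }[] };
    final?: boolean;
  };
}

// Concatenate text artifact parts and record the state of the final frame.
function collectReply(transcript: string): { text: string; finalState?: string } {
  let text = '';
  let finalState: string | undefined;
  for (const line of transcript.split('\n')) {
    const trimmed = line.trim();
    if (!trimmed.startsWith('data: ')) continue;
    const frame = JSON.parse(trimmed.slice(6)) as A2AFrame;
    for (const part of frame.result?.artifact?.parts ?? []) {
      if (part.kind === 'text' && typeof part.text === 'string') text += part.text;
    }
    if (frame.result?.final) finalState = frame.result.status?.state;
  }
  return { text, finalState };
}

// Synthetic transcript mirroring the diagram's success path.
const transcript =
  'data: {"result":{"status":{"state":"working"},"final":false}}\n\n' +
  'data: {"result":{"artifact":{"parts":[{"kind":"text","text":"Hel"}]},"final":false}}\n\n' +
  'data: {"result":{"artifact":{"parts":[{"kind":"text","text":"lo"}]},"final":false}}\n\n' +
  'data: {"result":{"status":{"state":"completed"},"final":true}}\n\n';
// collectReply(transcript) → { text: 'Hello', finalState: 'completed' }
```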
Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues
🚥 Pre-merge checks: ✅ 4 passed, ❌ 1 failed (1 warning)
Actionable comments posted: 2
🧹 Nitpick comments (1)
apps/server/src/routes/a2a/index.ts (1)
556-570: Prefer `interface` for the shared request body.

`A2ARequestBody` is a reusable object shape shared by both handlers, so this should follow the repo rule and be declared as an `interface` instead of a `type` alias.

Proposed change
```diff
-type A2ARequestBody = {
+interface A2ARequestBody {
   jsonrpc?: string;
   id?: string | number;
   method?: string;
   params?: {
     message?: {
@@
     metadata?: Record<string, unknown>;
     contextId?: string;
   };
-};
+}
```

As per coding guidelines, "Use TypeScript `interface` for defining object shapes rather than `type` aliases."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/server/src/routes/a2a/index.ts` around lines 556 - 570, The A2ARequestBody is declared as a type alias but per repo rules should be an interface; replace the "type A2ARequestBody = { ... }" declaration with "interface A2ARequestBody { ... }" preserving the exact property structure (jsonrpc, id, method, params with message/metadata/contextId and nested parts), and ensure any usages in the send and stream handlers still reference A2ARequestBody without other changes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ef9c862f-e599-43e9-9bc4-1574cdc1dc8e
📒 Files selected for processing (1)
apps/server/src/routes/a2a/index.ts
```typescript
async function handleA2AMessageStream(
  req: Request,
  res: Response,
  apiKey: string,
  projectPath: string,
  body: A2ARequestBody,
  rpcId: string | number | null,
  deps?: A2AHandlerDeps
): Promise<void> {
  const parts = body.params?.message?.parts ?? [];
  const userText = extractText(parts);
  const skillOverride = body.params?.metadata?.skillHint as string | undefined;
  const contextId =
    body.params?.contextId ?? (req.headers['x-correlation-id'] as string | undefined);

  // Validate params before setting SSE headers (can still use res.json() here)
  if (!userText && skillOverride !== 'plan_resume') {
    res.status(200).json({
      jsonrpc: '2.0',
      id: rpcId,
      error: {
        code: -32602,
        message: 'Invalid params: message must contain at least one text part',
      },
    });
    return;
  }

  if (skillOverride && !DECLARED_SKILL_IDS.has(skillOverride)) {
    logger.warn(
      `A2A message/stream rejected skill "${skillOverride}" — not in agent card. Declared: [${[...DECLARED_SKILL_IDS].join(', ')}]`
    );
    res.status(200).json({
      jsonrpc: '2.0',
      id: rpcId,
      error: {
        code: -32601,
        message:
          `Skill "${skillOverride}" is not declared in Ava's agent card. ` +
          `Declared skills: ${[...DECLARED_SKILL_IDS].join(', ')}.`,
      },
    });
    return;
  }

  if (!projectPath) {
    res.status(200).json({
      jsonrpc: '2.0',
      id: rpcId,
      error: { code: -32603, message: 'Internal error: projectPath is missing' },
    });
    return;
  }

  // Set SSE headers — must happen before any res.write() calls
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  // Disable nginx/proxy buffering so chunks reach the client immediately
  res.setHeader('X-Accel-Buffering', 'no');
  res.status(200);

  const taskId = randomUUID();
  const responseContextId = contextId ?? randomUUID();
  const artifactId = randomUUID();

  const writeSSE = (data: object): void => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  // Emit initial "working" status so the client knows the task has started
  writeSSE({
    jsonrpc: '2.0',
    id: rpcId,
    result: {
      id: taskId,
      contextId: responseContextId,
      status: { state: 'working' },
      final: false,
    },
  });

  logger.info(
    `A2A message/stream started: "${(userText || '').slice(0, 80)}${(userText || '').length > 80 ? '…' : ''}" (skill=${skillOverride ?? 'none'}, task=${taskId})`
  );

  try {
    const workflowSettings = await getWorkflowSettings(
      projectPath,
      deps?.settingsService,
      '[A2AStream]'
    );
    const timeoutMs = workflowSettings.a2aSkillExecution?.timeoutMs ?? A2A_DEFAULT_TIMEOUT_MS;
    const baseUrl = `http://localhost:${process.env['PORT'] ?? 3008}`;

    const chatRes = await fetch(`${baseUrl}/api/chat`, {
      method: 'POST',
      signal: AbortSignal.timeout(timeoutMs),
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': apiKey,
        ...(contextId ? { 'X-Correlation-Id': contextId } : {}),
      },
      body: JSON.stringify({
        messages: [{ id: randomUUID(), role: 'user', parts: [{ type: 'text', text: userText }] }],
        projectPath,
        ...(skillOverride ? { skillOverride } : {}),
        ...(contextId ? { correlationId: contextId } : {}),
      }),
```
message/stream is bypassing the established skill dispatcher.
This path always posts straight to /api/chat, so it never applies the message/send flow's planning branches, native-tool routing, per-request metadata.projectPath override, or retry/backoff settings. The same skillHint can therefore behave differently depending on whether the caller uses message/send or message/stream.
As per coding guidelines, "Before committing a multi-step plan implementation, verify wiring is complete — CI catches broken code but NOT unwired code."
Also applies to: 892-1155
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/server/src/routes/a2a/index.ts` around lines 591 - 699, The
message/stream handler (handleA2AMessageStream) bypasses the centralized skill
dispatcher by POSTing directly to /api/chat, causing message/stream to skip
planning, native-tool routing, metadata.projectPath overrides, and retry/backoff
used by message/send; update handleA2AMessageStream to route through the same
dispatcher used by message/send (invoke the message/send flow or call the shared
dispatcher/handler function instead of directly calling /api/chat), passing
through skillHint/skillOverride, contextId/correlationId, projectPath from
body.params.metadata, and respecting workflow settings (timeout/backoff)
obtained via getWorkflowSettings so both endpoints share identical routing and
behavior.
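One way to converge the two paths, sketched under the assumption that the message/send logic can be factored into a reusable function; `dispatchSkill` and `DispatchOptions` are hypothetical names, not the repo's actual API:

```typescript
// Hypothetical shared dispatcher that both message/send and message/stream
// could call instead of POSTing to /api/chat directly.
interface DispatchOptions {
  userText: string;
  projectPath: string;
  skillOverride?: string;
  contextId?: string;
  // The streaming handler supplies onDelta to forward chunks as
  // TaskArtifactUpdateEvents; message/send omits it and awaits the result.
  onDelta?: (delta: string) => void;
}

async function dispatchSkill(opts: DispatchOptions): Promise<string> {
  // Planning branches, native-tool routing, and retry/backoff from the
  // message/send flow would live here, so both endpoints share them.
  const reply = `echo: ${opts.userText}`; // stand-in for the real skill call
  opts.onDelta?.(reply);
  return reply;
}
```

With this shape, handleA2AMessageStream would translate each onDelta callback into a TaskArtifactUpdateEvent instead of re-parsing /api/chat's SSE output.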
```typescript
// Proxy text-delta chunks from /api/chat as A2A TaskArtifactUpdateEvents
const decoder = new TextDecoder();
let chunkCount = 0;

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  const text = decoder.decode(value, { stream: true });
  for (const line of text.split('\n')) {
    const trimmed = line.trim();
    if (!trimmed.startsWith('data: ')) continue;
    try {
      const payload = JSON.parse(trimmed.slice(6)) as Record<string, unknown>;
      if (payload['type'] === 'text-delta' && typeof payload['delta'] === 'string') {
        writeSSE({
          jsonrpc: '2.0',
          id: rpcId,
          result: {
            id: taskId,
            contextId: responseContextId,
            artifact: {
              artifactId,
              index: 0,
              parts: [{ kind: 'text', text: payload['delta'] }],
              // append: true on subsequent chunks so clients concatenate them
              append: chunkCount > 0,
              lastChunk: false,
            },
            final: false,
          },
        });
        chunkCount++;
      }
    } catch {
      // non-JSON data line — skip
    }
  }
}
```
This SSE parser will drop split data: frames.
reader.read() boundaries are arbitrary. Parsing each chunk independently means any JSON line split across reads is discarded by the catch, which will intermittently truncate streamed replies. Keep a carry-over buffer and only parse complete lines.
Proposed change

```diff
-      const decoder = new TextDecoder();
-      let chunkCount = 0;
+      const decoder = new TextDecoder();
+      let pending = '';
+      let chunkCount = 0;

       while (true) {
         const { done, value } = await reader.read();
         if (done) break;

-        const text = decoder.decode(value, { stream: true });
-        for (const line of text.split('\n')) {
+        pending += decoder.decode(value, { stream: true });
+        const lines = pending.split('\n');
+        pending = lines.pop() ?? '';
+
+        for (const line of lines) {
           const trimmed = line.trim();
           if (!trimmed.startsWith('data: ')) continue;
           try {
             const payload = JSON.parse(trimmed.slice(6)) as Record<string, unknown>;
             if (payload['type'] === 'text-delta' && typeof payload['delta'] === 'string') {
@@
           }
         }
       }
+
+      const trailing = pending.trim();
+      if (trailing.startsWith('data: ')) {
+        try {
+          const payload = JSON.parse(trailing.slice(6)) as Record<string, unknown>;
+          if (payload['type'] === 'text-delta' && typeof payload['delta'] === 'string') {
+            writeSSE({
+              jsonrpc: '2.0',
+              id: rpcId,
+              result: {
+                id: taskId,
+                contextId: responseContextId,
+                artifact: {
+                  artifactId,
+                  index: 0,
+                  parts: [{ kind: 'text', text: payload['delta'] }],
+                  append: chunkCount > 0,
+                  lastChunk: false,
+                },
+                final: false,
+              },
+            });
+            chunkCount++;
+          }
+        } catch {
+          // trailing non-JSON data line — skip
+        }
+      }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/server/src/routes/a2a/index.ts` around lines 728 - 766, The SSE parser
currently decodes and parses each reader.read() chunk independently which drops
JSON lines split across reads; fix the loop in the section around decoder,
chunkCount, reader.read by introducing a carry-over buffer string (e.g., let
buffer = ''), append decoder.decode(value, {stream:true}) to buffer on each
read, split buffer by '\n', process all complete lines except keep the last
partial line back into buffer for the next iteration, and only JSON.parse lines
that start with 'data: ' (preserving existing trimmed check and try/catch around
parsing) before calling writeSSE and incrementing chunkCount so split frames are
not lost.
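The carry-over approach the prompt describes can be demonstrated in isolation. In the sketch below, `drainLines` is an illustrative helper name, and the chunks deliberately split one JSON frame across two reads:

```typescript
// Returns the complete lines in pending+chunk; the trailing partial line
// (or '') is carried over to the next read instead of being parsed early.
function drainLines(pending: string, chunk: string): { lines: string[]; pending: string } {
  const parts = (pending + chunk).split('\n');
  const rest = parts.pop() ?? '';
  return { lines: parts, pending: rest };
}

let pending = '';
const events: string[] = [];
// One SSE frame split across two simulated reader.read() results.
for (const chunk of ['data: {"type":"text-del', 'ta","delta":"hi"}\n']) {
  const out = drainLines(pending, chunk);
  pending = out.pending;
  for (const line of out.lines) {
    const trimmed = line.trim();
    if (!trimmed.startsWith('data: ')) continue;
    // Real code would keep a try/catch around JSON.parse here.
    const payload = JSON.parse(trimmed.slice(6)) as { type?: string; delta?: string };
    if (payload.type === 'text-delta' && typeof payload.delta === 'string') {
      events.push(payload.delta);
    }
  }
}
// events: ['hi'], so the split frame survives; pending: ''
```

Without the buffer, the first chunk's partial JSON would hit the catch and be silently dropped.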
Summary
The streaming path (message/stream) on https://ava.proto-labs.ai/a2a intermittently returns a completed Task whose status.message.parts[0].text contains raw upstream HTML: "HTTP error for message/send! Status: 404 Not Found. Response: …" instead of an assistant reply.

Pattern: the first call in a loop succeeds; subsequent calls may return the 404 HTML. This indicates an internal fan-out to a sub-service that is hitting a wrong URL — likely a missing /a2a path su…
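The fix the PR describes for this symptom (catching non-OK /api/chat responses rather than surfacing their body) can be sketched as a small mapper. The helper name and message wording are illustrative, not the repo's code:

```typescript
// Map a non-OK upstream status to a JSON-RPC error envelope so the raw
// response body (e.g. an HTML 404 page) is never forwarded as task text.
function upstreamError(
  rpcId: string | number | null,
  status: number,
  statusText: string
) {
  return {
    jsonrpc: '2.0',
    id: rpcId,
    error: {
      code: -32603, // Internal error, matching the code used elsewhere in this PR
      message: `Internal error: /api/chat returned ${status} ${statusText}`,
    },
  };
}

const err = upstreamError('req-1', 404, 'Not Found');
// err.error.message: 'Internal error: /api/chat returned 404 Not Found'
```

The upstream body is still consumed (to free the connection) but only the status line is echoed to the caller.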
Created automatically by Automaker
Summary by CodeRabbit
Release Notes