From 67ebb7a767201542230672370eed607be34dc3ff Mon Sep 17 00:00:00 2001 From: Automaker Date: Tue, 21 Apr 2026 16:59:15 +0000 Subject: [PATCH] fix(a2a): implement message/stream SSE endpoint and prevent 404 HTML leaking as task text MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add handleA2AMessageStream() that proxies /api/chat SSE chunks as A2A TaskArtifactUpdateEvents and closes with a final TaskStatusUpdateEvent (state=completed) - Validate params and skill allowlist BEFORE setting SSE headers so error responses are still plain JSON-RPC when the request is malformed - Catch non-OK responses from /api/chat and emit a JSON-RPC error event — the raw HTML body (e.g. "Cannot POST /") is never surfaced as task text - Update agent card capabilities.streaming: true so callers know to use message/stream directly instead of simulating it via message/send - Update Method Not Found error message to list both supported methods - Extract shared A2ARequestBody type to avoid duplicating the inline cast Closes #471 Co-Authored-By: Claude Sonnet 4.6 --- apps/server/src/routes/a2a/index.ts | 260 ++++++++++++++++++++++++++-- 1 file changed, 243 insertions(+), 17 deletions(-) diff --git a/apps/server/src/routes/a2a/index.ts b/apps/server/src/routes/a2a/index.ts index 4a0723b95..73db157de 100644 --- a/apps/server/src/routes/a2a/index.ts +++ b/apps/server/src/routes/a2a/index.ts @@ -242,7 +242,7 @@ function buildAgentCard() { url: 'https://github.com/protoLabsAI', }, capabilities: { - streaming: false, + streaming: true, pushNotifications: false, stateTransitionHistory: false, }, @@ -553,20 +553,252 @@ async function callChatEndpointWithRetry( throw lastError ?? 
new Error('callChatEndpointWithRetry: unreachable');
 }
 
+// ─── Body type shared between send and stream handlers ───────────────────────
+
+/** JSON-RPC request body accepted by POST /a2a (message/send and message/stream). */
+type A2ARequestBody = {
+  jsonrpc?: string;
+  id?: string | number;
+  method?: string;
+  params?: {
+    message?: {
+      role?: string;
+      parts?: Array<{ kind?: string; type?: string; text?: string }>;
+    };
+    metadata?: Record<string, unknown>;
+    contextId?: string;
+  };
+};
+
 /** Optional services for planning pipeline skills */
 export interface A2AHandlerDeps {
   planningService?: PlanningService;
   settingsService?: SettingsService;
 }
 
+// ─── message/stream handler ───────────────────────────────────────────────────
+
+/**
+ * Handle message/stream — SSE streaming variant of message/send.
+ *
+ * Validates the request first (plain JSON-RPC error responses are still possible
+ * at that point), then switches the response to `text/event-stream`, POSTs the
+ * user text to the local /api/chat endpoint and re-emits every `text-delta`
+ * frame it streams back as an artifact update event. A final status event with
+ * state="completed" closes the stream.
+ *
+ * Upstream bytes are buffered per line before parsing: a network chunk can end
+ * in the middle of a `data:` line, and parsing each chunk in isolation would
+ * silently drop the delta that straddles the boundary.
+ *
+ * Error handling: any non-OK response from /api/chat is caught and emitted as a
+ * JSON-RPC error event — the raw HTML error body is NEVER surfaced as task text.
+ * This prevents 404 HTML (e.g. "Cannot POST /") from leaking into the A2A result.
+ *
+ * @param req - Incoming request; only the `x-correlation-id` header is read.
+ * @param res - Response that receives either one JSON-RPC error or the SSE stream.
+ * @param apiKey - Forwarded to /api/chat as the `X-API-Key` header.
+ * @param projectPath - Project the chat request is scoped to; must be non-empty.
+ * @param body - Parsed JSON-RPC request body.
+ * @param rpcId - JSON-RPC id echoed on every emitted event.
+ * @param deps - Optional services; `settingsService` is passed to the timeout lookup.
+ * @returns Resolves once the response has been ended (or the JSON error was sent).
+ */
+async function handleA2AMessageStream(
+  req: Request,
+  res: Response,
+  apiKey: string,
+  projectPath: string,
+  body: A2ARequestBody,
+  rpcId: string | number | null,
+  deps?: A2AHandlerDeps
+): Promise<void> {
+  const parts = body.params?.message?.parts ?? [];
+  const userText = extractText(parts);
+  const skillOverride = body.params?.metadata?.skillHint as string | undefined;
+  const contextId =
+    body.params?.contextId ?? (req.headers['x-correlation-id'] as string | undefined);
+
+  // Validate params before setting SSE headers (can still use res.json() here)
+  if (!userText && skillOverride !== 'plan_resume') {
+    res.status(200).json({
+      jsonrpc: '2.0',
+      id: rpcId,
+      error: {
+        code: -32602,
+        message: 'Invalid params: message must contain at least one text part',
+      },
+    });
+    return;
+  }
+
+  if (skillOverride && !DECLARED_SKILL_IDS.has(skillOverride)) {
+    logger.warn(
+      `A2A message/stream rejected skill "${skillOverride}" — not in agent card. Declared: [${[...DECLARED_SKILL_IDS].join(', ')}]`
+    );
+    res.status(200).json({
+      jsonrpc: '2.0',
+      id: rpcId,
+      error: {
+        code: -32601,
+        message:
+          `Skill "${skillOverride}" is not declared in Ava's agent card. ` +
+          `Declared skills: ${[...DECLARED_SKILL_IDS].join(', ')}.`,
+      },
+    });
+    return;
+  }
+
+  if (!projectPath) {
+    res.status(200).json({
+      jsonrpc: '2.0',
+      id: rpcId,
+      error: { code: -32603, message: 'Internal error: projectPath is missing' },
+    });
+    return;
+  }
+
+  // Set SSE headers — must happen before any res.write() calls
+  res.setHeader('Content-Type', 'text/event-stream');
+  res.setHeader('Cache-Control', 'no-cache');
+  res.setHeader('Connection', 'keep-alive');
+  // Disable nginx/proxy buffering so chunks reach the client immediately
+  res.setHeader('X-Accel-Buffering', 'no');
+  res.status(200);
+
+  const taskId = randomUUID();
+  const responseContextId = contextId ?? randomUUID();
+  const artifactId = randomUUID();
+
+  const writeSSE = (data: object): void => {
+    res.write(`data: ${JSON.stringify(data)}\n\n`);
+  };
+
+  // Number of text-delta frames forwarded so far; also drives `append` below.
+  let chunkCount = 0;
+
+  // Re-emit one complete upstream SSE line as an artifact update event.
+  // Lines that are not `data:` fields, not JSON, or not text-delta are ignored.
+  const forwardLine = (line: string): void => {
+    const trimmed = line.trim();
+    // The SSE format makes the space after "data:" optional; JSON.parse
+    // tolerates the leading space when it is present.
+    if (!trimmed.startsWith('data:')) return;
+
+    let payload: unknown;
+    try {
+      payload = JSON.parse(trimmed.slice(5));
+    } catch {
+      // non-JSON data line — skip
+      return;
+    }
+    if (typeof payload !== 'object' || payload === null) return;
+
+    const { type, delta } = payload as { type?: unknown; delta?: unknown };
+    if (type !== 'text-delta' || typeof delta !== 'string') return;
+
+    writeSSE({
+      jsonrpc: '2.0',
+      id: rpcId,
+      result: {
+        id: taskId,
+        contextId: responseContextId,
+        artifact: {
+          artifactId,
+          index: 0,
+          parts: [{ kind: 'text', text: delta }],
+          // append: true on subsequent chunks so clients concatenate them
+          append: chunkCount > 0,
+          lastChunk: false,
+        },
+        final: false,
+      },
+    });
+    chunkCount++;
+  };
+
+  // Emit initial "working" status so the client knows the task has started
+  writeSSE({
+    jsonrpc: '2.0',
+    id: rpcId,
+    result: {
+      id: taskId,
+      contextId: responseContextId,
+      status: { state: 'working' },
+      final: false,
+    },
+  });
+
+  const preview = userText || '';
+  logger.info(
+    `A2A message/stream started: "${preview.slice(0, 80)}${preview.length > 80 ? '…' : ''}" (skill=${skillOverride ?? 'none'}, task=${taskId})`
+  );
+
+  try {
+    const workflowSettings = await getWorkflowSettings(
+      projectPath,
+      deps?.settingsService,
+      '[A2AStream]'
+    );
+    const timeoutMs = workflowSettings.a2aSkillExecution?.timeoutMs ?? A2A_DEFAULT_TIMEOUT_MS;
+    const baseUrl = `http://localhost:${process.env['PORT'] ?? 3008}`;
+
+    const chatRes = await fetch(`${baseUrl}/api/chat`, {
+      method: 'POST',
+      signal: AbortSignal.timeout(timeoutMs),
+      headers: {
+        'Content-Type': 'application/json',
+        'X-API-Key': apiKey,
+        ...(contextId ? { 'X-Correlation-Id': contextId } : {}),
+      },
+      body: JSON.stringify({
+        messages: [{ id: randomUUID(), role: 'user', parts: [{ type: 'text', text: userText }] }],
+        projectPath,
+        ...(skillOverride ? { skillOverride } : {}),
+        ...(contextId ? { correlationId: contextId } : {}),
+      }),
+    });
+
+    if (!chatRes.ok) {
+      // Catch HTTP errors from /api/chat and emit as a JSON-RPC error event.
+      // This prevents raw HTML (e.g. "Cannot POST /") from leaking into task text.
+      const status = chatRes.status;
+      logger.error(`A2A message/stream: chat endpoint returned ${status} for task ${taskId}`);
+      await chatRes.text().catch(() => {}); // consume body to close connection cleanly
+      writeSSE({
+        jsonrpc: '2.0',
+        id: rpcId,
+        error: { code: -32603, message: `chat endpoint returned ${status}` },
+      });
+      res.end();
+      return;
+    }
+
+    const reader = chatRes.body?.getReader();
+    if (!reader) {
+      writeSSE({
+        jsonrpc: '2.0',
+        id: rpcId,
+        error: { code: -32603, message: 'No response body from chat endpoint' },
+      });
+      res.end();
+      return;
+    }
+
+    // Proxy text-delta chunks from /api/chat as A2A TaskArtifactUpdateEvents
+    const decoder = new TextDecoder();
+    // Text received so far that does not yet end in a newline.
+    let pending = '';
+
+    while (true) {
+      const { done, value } = await reader.read();
+      if (done) break;
+
+      pending += decoder.decode(value, { stream: true });
+      const lines = pending.split('\n');
+      // The last element is '' when the chunk ended on a newline, otherwise it
+      // is an incomplete line; hold it back until the rest of it arrives.
+      pending = lines.pop() ?? '';
+      for (const line of lines) {
+        forwardLine(line);
+      }
+    }
+
+    // Flush bytes still held by the decoder plus a final line that was not
+    // newline-terminated, so the last delta is not lost.
+    pending += decoder.decode();
+    if (pending) {
+      forwardLine(pending);
+    }
+
+    logger.info(`A2A message/stream task ${taskId} completed (${chunkCount} chunks)`);
+
+    // Final TaskStatusUpdateEvent signals stream completion to the client
+    writeSSE({
+      jsonrpc: '2.0',
+      id: rpcId,
+      result: {
+        id: taskId,
+        contextId: responseContextId,
+        status: { state: 'completed' },
+        final: true,
+      },
+    });
+  } catch (err) {
+    logger.error(`A2A message/stream error for task ${taskId}:`, err);
+    writeSSE({
+      jsonrpc: '2.0',
+      id: rpcId,
+      error: { code: -32603, message: 'Internal error' },
+    });
+  }
+
+  res.end();
+}
+
 export function createA2AHandlerRoutes(projectPath: string, deps?: A2AHandlerDeps): Router {
   const router = Router();
 
   /**
    * POST /a2a
    *
-   * Accepts A2A JSON-RPC messages. Only message/send is implemented — enough
-   * for gateway delegation. Unknown methods return a JSON-RPC error.
+   * Accepts A2A JSON-RPC messages: message/send (JSON response) and
+   * message/stream (SSE streaming response). Unknown methods return a JSON-RPC
+   * error.
    *
    * Auth: X-API-Key header (same credential as /api/*).
*/ @@ -583,22 +815,16 @@ export function createA2AHandlerRoutes(projectPath: string, deps?: A2AHandlerDep return; } - const body = req.body as { - jsonrpc?: string; - id?: string | number; - method?: string; - params?: { - message?: { - role?: string; - parts?: Array<{ kind?: string; type?: string; text?: string }>; - }; - metadata?: Record; - contextId?: string; - }; - }; + const body = req.body as A2ARequestBody; const rpcId = body.id ?? null; + // message/stream — SSE streaming response (delegates to dedicated handler) + if (body.method === 'message/stream') { + await handleA2AMessageStream(req, res, key, projectPath, body, rpcId, deps); + return; + } + // Only handle message/send — return proper JSON-RPC error for anything else if (body.method !== 'message/send') { res.status(200).json({ @@ -606,7 +832,7 @@ export function createA2AHandlerRoutes(projectPath: string, deps?: A2AHandlerDep id: rpcId, error: { code: -32601, - message: `Method not found: ${body.method ?? '(none)'}. Supported: message/send`, + message: `Method not found: ${body.method ?? '(none)'}. Supported: message/send, message/stream`, }, }); return;