Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 243 additions & 17 deletions apps/server/src/routes/a2a/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ function buildAgentCard() {
url: 'https://github.com/protoLabsAI',
},
capabilities: {
streaming: false,
streaming: true,
pushNotifications: false,
stateTransitionHistory: false,
},
Expand Down Expand Up @@ -553,20 +553,252 @@ async function callChatEndpointWithRetry(
throw lastError ?? new Error('callChatEndpointWithRetry: unreachable');
}

// ─── Body type shared between send and stream handlers ───────────────────────

/** One part of an inbound A2A message; accepts either `kind` or `type` as the discriminator field. */
type A2AMessagePart = { kind?: string; type?: string; text?: string };

/** The message payload carried under `params.message`. */
type A2AInboundMessage = { role?: string; parts?: A2AMessagePart[] };

/** Parameters accepted by both message/send and message/stream. */
type A2ARequestParams = {
  message?: A2AInboundMessage;
  metadata?: Record<string, unknown>;
  contextId?: string;
};

/** JSON-RPC envelope for inbound A2A requests (all fields optional — validated by handlers). */
type A2ARequestBody = {
  jsonrpc?: string;
  id?: string | number;
  method?: string;
  params?: A2ARequestParams;
};

/**
 * Optional service dependencies injected into the A2A route handlers.
 * Handlers must tolerate either field being absent and fall back to defaults.
 */
export interface A2AHandlerDeps {
  // Planning-pipeline service for plan-related skills — presumably consumed by
  // the message/send flow; not referenced in this chunk. TODO confirm usage.
  planningService?: PlanningService;
  // Per-project workflow settings source; passed to getWorkflowSettings to
  // resolve e.g. the a2aSkillExecution timeout.
  settingsService?: SettingsService;
}

// ─── message/stream handler ───────────────────────────────────────────────────

/**
* Handle message/stream — SSE streaming variant of message/send.
*
* Calls /api/chat internally (which streams text-delta SSE chunks) and proxies
* each chunk as an A2A TaskArtifactUpdateEvent. A final TaskStatusUpdateEvent
* with state="completed" closes the stream.
*
* Error handling: any non-OK response from /api/chat is caught and emitted as a
* JSON-RPC error event — the raw HTML error body is NEVER surfaced as task text.
* This prevents 404 HTML (e.g. "Cannot POST /") from leaking into the A2A result.
*/
async function handleA2AMessageStream(
req: Request,
res: Response,
apiKey: string,
projectPath: string,
body: A2ARequestBody,
rpcId: string | number | null,
deps?: A2AHandlerDeps
): Promise<void> {
const parts = body.params?.message?.parts ?? [];
const userText = extractText(parts);
const skillOverride = body.params?.metadata?.skillHint as string | undefined;
const contextId =
body.params?.contextId ?? (req.headers['x-correlation-id'] as string | undefined);

// Validate params before setting SSE headers (can still use res.json() here)
if (!userText && skillOverride !== 'plan_resume') {
res.status(200).json({
jsonrpc: '2.0',
id: rpcId,
error: {
code: -32602,
message: 'Invalid params: message must contain at least one text part',
},
});
return;
}

if (skillOverride && !DECLARED_SKILL_IDS.has(skillOverride)) {
logger.warn(
`A2A message/stream rejected skill "${skillOverride}" — not in agent card. Declared: [${[...DECLARED_SKILL_IDS].join(', ')}]`
);
res.status(200).json({
jsonrpc: '2.0',
id: rpcId,
error: {
code: -32601,
message:
`Skill "${skillOverride}" is not declared in Ava's agent card. ` +
`Declared skills: ${[...DECLARED_SKILL_IDS].join(', ')}.`,
},
});
return;
}

if (!projectPath) {
res.status(200).json({
jsonrpc: '2.0',
id: rpcId,
error: { code: -32603, message: 'Internal error: projectPath is missing' },
});
return;
}

// Set SSE headers — must happen before any res.write() calls
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// Disable nginx/proxy buffering so chunks reach the client immediately
res.setHeader('X-Accel-Buffering', 'no');
res.status(200);

const taskId = randomUUID();
const responseContextId = contextId ?? randomUUID();
const artifactId = randomUUID();

const writeSSE = (data: object): void => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
};

// Emit initial "working" status so the client knows the task has started
writeSSE({
jsonrpc: '2.0',
id: rpcId,
result: {
id: taskId,
contextId: responseContextId,
status: { state: 'working' },
final: false,
},
});

logger.info(
`A2A message/stream started: "${(userText || '').slice(0, 80)}${(userText || '').length > 80 ? '…' : ''}" (skill=${skillOverride ?? 'none'}, task=${taskId})`
);

try {
const workflowSettings = await getWorkflowSettings(
projectPath,
deps?.settingsService,
'[A2AStream]'
);
const timeoutMs = workflowSettings.a2aSkillExecution?.timeoutMs ?? A2A_DEFAULT_TIMEOUT_MS;
const baseUrl = `http://localhost:${process.env['PORT'] ?? 3008}`;

const chatRes = await fetch(`${baseUrl}/api/chat`, {
method: 'POST',
signal: AbortSignal.timeout(timeoutMs),
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey,
...(contextId ? { 'X-Correlation-Id': contextId } : {}),
},
body: JSON.stringify({
messages: [{ id: randomUUID(), role: 'user', parts: [{ type: 'text', text: userText }] }],
projectPath,
...(skillOverride ? { skillOverride } : {}),
...(contextId ? { correlationId: contextId } : {}),
}),
Comment on lines +591 to +699
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

message/stream is bypassing the established skill dispatcher.

This path always posts straight to /api/chat, so it never applies the message/send flow's planning branches, native-tool routing, per-request metadata.projectPath override, or retry/backoff settings. The same skillHint can therefore behave differently depending on whether the caller uses message/send or message/stream.

As per coding guidelines, "Before committing a multi-step plan implementation, verify wiring is complete — CI catches broken code but NOT unwired code."

Also applies to: 892-1155

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/routes/a2a/index.ts` around lines 591 - 699, The
message/stream handler (handleA2AMessageStream) bypasses the centralized skill
dispatcher by POSTing directly to /api/chat, causing message/stream to skip
planning, native-tool routing, metadata.projectPath overrides, and retry/backoff
used by message/send; update handleA2AMessageStream to route through the same
dispatcher used by message/send (invoke the message/send flow or call the shared
dispatcher/handler function instead of directly calling /api/chat), passing
through skillHint/skillOverride, contextId/correlationId, projectPath from
body.params.metadata, and respecting workflow settings (timeout/backoff)
obtained via getWorkflowSettings so both endpoints share identical routing and
behavior.

});

if (!chatRes.ok) {
// Catch HTTP errors from /api/chat and emit as a JSON-RPC error event.
// This prevents raw HTML (e.g. "Cannot POST /") from leaking into task text.
const status = chatRes.status;
logger.error(`A2A message/stream: chat endpoint returned ${status} for task ${taskId}`);
await chatRes.text().catch(() => {}); // consume body to close connection cleanly
writeSSE({
jsonrpc: '2.0',
id: rpcId,
error: { code: -32603, message: `chat endpoint returned ${status}` },
});
res.end();
return;
}

const reader = chatRes.body?.getReader();
if (!reader) {
writeSSE({
jsonrpc: '2.0',
id: rpcId,
error: { code: -32603, message: 'No response body from chat endpoint' },
});
res.end();
return;
}

// Proxy text-delta chunks from /api/chat as A2A TaskArtifactUpdateEvents
const decoder = new TextDecoder();
let chunkCount = 0;

while (true) {
const { done, value } = await reader.read();
if (done) break;

const text = decoder.decode(value, { stream: true });
for (const line of text.split('\n')) {
const trimmed = line.trim();
if (!trimmed.startsWith('data: ')) continue;
try {
const payload = JSON.parse(trimmed.slice(6)) as Record<string, unknown>;
if (payload['type'] === 'text-delta' && typeof payload['delta'] === 'string') {
writeSSE({
jsonrpc: '2.0',
id: rpcId,
result: {
id: taskId,
contextId: responseContextId,
artifact: {
artifactId,
index: 0,
parts: [{ kind: 'text', text: payload['delta'] }],
// append: true on subsequent chunks so clients concatenate them
append: chunkCount > 0,
lastChunk: false,
},
final: false,
},
});
chunkCount++;
}
} catch {
// non-JSON data line — skip
}
}
}
Comment on lines +728 to +766
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

This SSE parser will drop split data: frames.

reader.read() boundaries are arbitrary. Parsing each chunk independently means any JSON line split across reads is discarded by the catch, which will intermittently truncate streamed replies. Keep a carry-over buffer and only parse complete lines.

Proposed change
-    const decoder = new TextDecoder();
-    let chunkCount = 0;
+    const decoder = new TextDecoder();
+    let pending = '';
+    let chunkCount = 0;

     while (true) {
       const { done, value } = await reader.read();
       if (done) break;

-      const text = decoder.decode(value, { stream: true });
-      for (const line of text.split('\n')) {
+      pending += decoder.decode(value, { stream: true });
+      const lines = pending.split('\n');
+      pending = lines.pop() ?? '';
+
+      for (const line of lines) {
         const trimmed = line.trim();
         if (!trimmed.startsWith('data: ')) continue;
         try {
           const payload = JSON.parse(trimmed.slice(6)) as Record<string, unknown>;
           if (payload['type'] === 'text-delta' && typeof payload['delta'] === 'string') {
@@
         }
       }
     }
+
+    const trailing = pending.trim();
+    if (trailing.startsWith('data: ')) {
+      try {
+        const payload = JSON.parse(trailing.slice(6)) as Record<string, unknown>;
+        if (payload['type'] === 'text-delta' && typeof payload['delta'] === 'string') {
+          writeSSE({
+            jsonrpc: '2.0',
+            id: rpcId,
+            result: {
+              id: taskId,
+              contextId: responseContextId,
+              artifact: {
+                artifactId,
+                index: 0,
+                parts: [{ kind: 'text', text: payload['delta'] }],
+                append: chunkCount > 0,
+                lastChunk: false,
+              },
+              final: false,
+            },
+          });
+          chunkCount++;
+        }
+      } catch {
+        // trailing non-JSON data line — skip
+      }
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/routes/a2a/index.ts` around lines 728 - 766, The SSE parser
currently decodes and parses each reader.read() chunk independently which drops
JSON lines split across reads; fix the loop in the section around decoder,
chunkCount, reader.read by introducing a carry-over buffer string (e.g., let
buffer = ''), append decoder.decode(value, {stream:true}) to buffer on each
read, split buffer by '\n', process all complete lines except keep the last
partial line back into buffer for the next iteration, and only JSON.parse lines
that start with 'data: ' (preserving existing trimmed check and try/catch around
parsing) before calling writeSSE and incrementing chunkCount so split frames are
not lost.


logger.info(`A2A message/stream task ${taskId} completed (${chunkCount} chunks)`);

// Final TaskStatusUpdateEvent signals stream completion to the client
writeSSE({
jsonrpc: '2.0',
id: rpcId,
result: {
id: taskId,
contextId: responseContextId,
status: { state: 'completed' },
final: true,
},
});
} catch (err) {
logger.error(`A2A message/stream error for task ${taskId}:`, err);
writeSSE({
jsonrpc: '2.0',
id: rpcId,
error: { code: -32603, message: 'Internal error' },
});
}

res.end();
}

export function createA2AHandlerRoutes(projectPath: string, deps?: A2AHandlerDeps): Router {
const router = Router();

/**
* POST /a2a
*
* Accepts A2A JSON-RPC messages. Only message/send is implemented — enough
* for gateway delegation. Unknown methods return a JSON-RPC error.
* Accepts A2A JSON-RPC messages: message/send (JSON response) and
* message/stream (SSE streaming response). Unknown methods return a JSON-RPC
* error.
*
* Auth: X-API-Key header (same credential as /api/*).
*/
Expand All @@ -583,30 +815,24 @@ export function createA2AHandlerRoutes(projectPath: string, deps?: A2AHandlerDep
return;
}

const body = req.body as {
jsonrpc?: string;
id?: string | number;
method?: string;
params?: {
message?: {
role?: string;
parts?: Array<{ kind?: string; type?: string; text?: string }>;
};
metadata?: Record<string, unknown>;
contextId?: string;
};
};
const body = req.body as A2ARequestBody;

const rpcId = body.id ?? null;

// message/stream — SSE streaming response (delegates to dedicated handler)
if (body.method === 'message/stream') {
await handleA2AMessageStream(req, res, key, projectPath, body, rpcId, deps);
return;
}

// Only handle message/send — return proper JSON-RPC error for anything else
if (body.method !== 'message/send') {
res.status(200).json({
jsonrpc: '2.0',
id: rpcId,
error: {
code: -32601,
message: `Method not found: ${body.method ?? '(none)'}. Supported: message/send`,
message: `Method not found: ${body.method ?? '(none)'}. Supported: message/send, message/stream`,
},
});
return;
Expand Down
Loading