Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions src/api/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
linkInstancesEvent,
} from '../runtime/module.js';
import { isNodeEnv } from '../utils/runtime.js';
import { parseAndEvaluateStatement, Result } from '../runtime/interpreter.js';
import { Environment, parseAndEvaluateStatement, Result } from '../runtime/interpreter.js';
import { ApplicationSpec } from '../runtime/loader.js';
import { logger } from '../runtime/logger.js';
import { requireAuth, verifySession } from '../runtime/modules/auth.js';
Expand Down Expand Up @@ -53,6 +53,7 @@ import {
} from '../runtime/defs.js';
import { evaluate } from '../runtime/interpreter.js';
import { Config } from '../runtime/state.js';
import { SseEmitter, isStreamingRequest } from './sse.js';
import {
findFileByFilename,
createFileRecord,
Expand All @@ -72,7 +73,7 @@ export async function startServer(
app.use((req, res, next) => {
res.header('Access-Control-Allow-Origin', '*');
res.header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization');
res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization, Accept');

// Handle preflight requests
if (req.method === 'OPTIONS') {
Expand Down Expand Up @@ -346,6 +347,20 @@ function internalError(res: Response) {
};
}

function okSse(emitter: SseEmitter) {
return (value: Result) => {
const result: Result = normalizedResult(value);
emitter.sendResult(result);
};
}

function internalErrorSse(emitter: SseEmitter) {
return (reason: any) => {
logger.error(reason);
emitter.sendError(reason.message, statusFromErrorType(reason));
};
}

function patternFromAttributes(
moduleName: string,
recName: string,
Expand Down Expand Up @@ -423,7 +438,15 @@ async function handleEventPost(
eventName,
objectAsInstanceAttributes(req.body)
).setAuthContext(sessionInfo);
evaluate(inst, ok(res)).catch(internalError(res));
if (isStreamingRequest(req.headers.accept)) {
const emitter = new SseEmitter(res);
emitter.initialize();
const env = new Environment(eventName + '.sse');
env.setSseEmitter(emitter);
evaluate(inst, okSse(emitter), env).catch(internalErrorSse(emitter));
} else {
evaluate(inst, ok(res)).catch(internalError(res));
}
} catch (err: any) {
logger.error(err);
res.status(500).send(err.toString());
Expand Down
64 changes: 64 additions & 0 deletions src/api/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { Response } from 'express';

export class SseEmitter {
private res: Response;
private closed: boolean = false;
private keepAliveTimer: ReturnType<typeof setInterval> | undefined;

constructor(res: Response) {
this.res = res;
}

initialize(): void {
this.res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
});
this.res.write(': connected\n\n');

this.keepAliveTimer = setInterval(() => {
if (!this.closed) {
this.res.write(': keepalive\n\n');
}
}, 15000);

this.res.on('close', () => {
this.close();
});
}

send(eventType: string, data: object): void {
if (this.closed) return;
this.res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`);
}

sendResult(result: any): void {
this.send('result', result);
this.close();
}

sendError(error: string, statusCode?: number): void {
this.send('error', { error, statusCode });
this.close();
}

close(): void {
if (this.closed) return;
this.closed = true;
if (this.keepAliveTimer) {
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = undefined;
}
this.res.end();
}

isClosed(): boolean {
return this.closed;
}
}

export function isStreamingRequest(acceptHeader: string | undefined): boolean {
return acceptHeader !== undefined && acceptHeader.includes('text/event-stream');
}
7 changes: 7 additions & 0 deletions src/runtime/exec-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ export async function executeGraph(execGraph: ExecGraph, env: Environment): Prom
env.appendEntryToMonitor(node.codeStr);
}
}
if (monitorIncr) {
env.emitSseEvent('agent_start', { step: node.codeStr });
}
try {
if (node.subGraphIndex == -1) {
await evaluateStatement(node.code as Statement, env);
Expand Down Expand Up @@ -301,12 +304,16 @@ export async function executeGraph(execGraph: ExecGraph, env: Environment): Prom
}
} catch (reason: any) {
if (monitoringEnabled) env.setMonitorEntryError(reason);
env.emitSseEvent('error', { step: node.codeStr, error: `${reason}` });
throw reason;
} finally {
if (monitoringEnabled) {
if (monitorIncr) env.decrementMonitor();
env.setMonitorEntryResult(env.getLastResult());
}
if (monitorIncr) {
env.emitSseEvent('agent_complete', { step: node.codeStr });
}
}
}
} finally {
Expand Down
26 changes: 26 additions & 0 deletions src/runtime/interpreter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ import { fetchDoc } from './docs.js';
import { FlowSpec, FlowStep, getAgentFlow } from './agents/flows.js';
import { isMonitoringEnabled } from './state.js';
import { Monitor, MonitorEntry } from './monitor.js';
import { SseEmitter } from '../api/sse.js';
import { detailedDiff } from 'deep-object-diff';
import { callMcpTool, mcpClientNameFromToolEvent } from './mcpclient.js';
import { isNodeEnv } from '../utils/runtime.js';
Expand Down Expand Up @@ -174,6 +175,7 @@ export class Environment extends Instance {
private agentMode: 'chat' | 'planner' | undefined = undefined;
private agentChatId: string | undefined = undefined;
private monitor: Monitor | undefined = undefined;
private sseEmitter: SseEmitter | undefined = undefined;
private escalatedRole: string | undefined;
private activeChatId: string | undefined;

Expand Down Expand Up @@ -202,6 +204,7 @@ export class Environment extends Instance {
this.eventExecutor = parent.eventExecutor;
this.agentChatId = parent.agentChatId;
this.monitor = parent.monitor;
this.sseEmitter = parent.sseEmitter;
this.escalatedRole = parent.escalatedRole;
this.activeChatId = parent.activeChatId;
} else {
Expand Down Expand Up @@ -935,6 +938,21 @@ export class Environment extends Instance {
}
return this;
}

setSseEmitter(emitter: SseEmitter): Environment {
this.sseEmitter = emitter;
return this;
}

getSseEmitter(): SseEmitter | undefined {
return this.sseEmitter;
}

emitSseEvent(eventType: string, data: object): void {
if (this.sseEmitter && !this.sseEmitter.isClosed()) {
this.sseEmitter.send(eventType, data);
}
}
}

export const GlobalEnvironment = new Environment();
Expand Down Expand Up @@ -2041,11 +2059,14 @@ async function agentInvoke(agent: AgentInstance, msg: string, env: Environment):
console.debug(invokeDebugMsg);
//

env.emitSseEvent('agent_start', { agent: agent.name });

const monitoringEnabled = isMonitoringEnabled();

await agent.invoke(msg, env);
let result: string | undefined = env.getLastResult();
logger.debug(`Agent ${agent.name} result: ${result}`);
env.emitSseEvent('agent_complete', { agent: agent.name });

const isPlanner = !env.isInAgentChatMode() && agent.isPlanner();
const stmtsExec = env.getStatementsExecutor();
Expand Down Expand Up @@ -2282,12 +2303,17 @@ async function iterateOnFlow(
if (monitoringEnabled) {
env.appendEntryToMonitor(step);
}
env.emitSseEvent('flow_step_start', { step, agent: agent.name });
const inst = agent.swapInstruction('');
await agentInvoke(agent, inst, env);
} else {
rootAgent.maybeAddScratchData(env);
}
if (monitoringEnabled) env.setMonitorEntryResult(env.getLastResult());
env.emitSseEvent('flow_step_complete', {
step,
result: maybeInstanceAsString(env.getLastResult()),
});
if (env.isSuspended()) {
console.debug(`${iterId} suspending iteration on step ${step}`);
await saveFlowSuspension(rootAgent, context, step, env);
Expand Down
12 changes: 12 additions & 0 deletions src/runtime/modules/ai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -942,13 +942,25 @@ Only return a pure JSON object with no extra text, annotations etc.`;
const u = response.sysMsg.usage_metadata;
env.setMonitorEntryLlmTokenUsage(u.input_tokens, u.output_tokens, u.total_tokens);
}
env.emitSseEvent('llm_response', {
agent: this.name,
content: response.content,
tokens: response.sysMsg.usage_metadata
? {
input: response.sysMsg.usage_metadata.input_tokens,
output: response.sysMsg.usage_metadata.output_tokens,
total: response.sysMsg.usage_metadata.total_tokens,
}
: undefined,
});
if (this.saveResponseAs) {
await saveAgentResponse(this.saveResponseAs, response.content, env);
}
env.setLastResult(response.content);
} catch (err: any) {
logger.error(`Error while invoking ${agentName} - ${err}`);
if (monitoringEnabled) env.setMonitorEntryError(`${err}`);
env.emitSseEvent('error', { agent: this.name, error: `${err}` });
env.setLastResult(undefined);
}
} else {
Expand Down
Loading