diff --git a/src/api/http.ts b/src/api/http.ts index 8e680a13..a7842a0e 100644 --- a/src/api/http.ts +++ b/src/api/http.ts @@ -21,7 +21,7 @@ import { linkInstancesEvent, } from '../runtime/module.js'; import { isNodeEnv } from '../utils/runtime.js'; -import { parseAndEvaluateStatement, Result } from '../runtime/interpreter.js'; +import { Environment, parseAndEvaluateStatement, Result } from '../runtime/interpreter.js'; import { ApplicationSpec } from '../runtime/loader.js'; import { logger } from '../runtime/logger.js'; import { requireAuth, verifySession } from '../runtime/modules/auth.js'; @@ -53,6 +53,7 @@ import { } from '../runtime/defs.js'; import { evaluate } from '../runtime/interpreter.js'; import { Config } from '../runtime/state.js'; +import { SseEmitter, isStreamingRequest } from './sse.js'; import { findFileByFilename, createFileRecord, @@ -72,7 +73,7 @@ export async function startServer( app.use((req, res, next) => { res.header('Access-Control-Allow-Origin', '*'); res.header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS'); - res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization'); + res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization, Accept'); // Handle preflight requests if (req.method === 'OPTIONS') { @@ -346,6 +347,20 @@ function internalError(res: Response) { }; } +function okSse(emitter: SseEmitter) { + return (value: Result) => { + const result: Result = normalizedResult(value); + emitter.sendResult(result); + }; +} + +function internalErrorSse(emitter: SseEmitter) { + return (reason: any) => { + logger.error(reason); + emitter.sendError(reason.message, statusFromErrorType(reason)); + }; +} + function patternFromAttributes( moduleName: string, recName: string, @@ -423,7 +438,15 @@ async function handleEventPost( eventName, objectAsInstanceAttributes(req.body) ).setAuthContext(sessionInfo); - evaluate(inst, ok(res)).catch(internalError(res)); + if (isStreamingRequest(req.headers.accept)) { + const emitter = new SseEmitter(res); + emitter.initialize(); + const env = new Environment(eventName + '.sse'); + env.setSseEmitter(emitter); + evaluate(inst, okSse(emitter), env).catch(internalErrorSse(emitter)); + } else { + evaluate(inst, ok(res)).catch(internalError(res)); + } } catch (err: any) { logger.error(err); res.status(500).send(err.toString()); diff --git a/src/api/sse.ts b/src/api/sse.ts new file mode 100644 index 00000000..c12a89c9 --- /dev/null +++ b/src/api/sse.ts @@ -0,0 +1,64 @@ +import { Response } from 'express'; + +export class SseEmitter { + private res: Response; + private closed: boolean = false; + private keepAliveTimer: ReturnType | undefined; + + constructor(res: Response) { + this.res = res; + } + + initialize(): void { + this.res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + this.res.write(': connected\n\n'); + + this.keepAliveTimer = setInterval(() => { + if (!this.closed) { + this.res.write(': keepalive\n\n'); + } + }, 15000); + + this.res.on('close', () => { + this.close(); + }); + } + + send(eventType: string, data: object): void { + if (this.closed) return; + this.res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`); + } + + sendResult(result: any): void { + this.send('result', result); + this.close(); + } + + sendError(error: string, statusCode?: number): void { + this.send('error', { error, statusCode }); + this.close(); + } + + close(): void { + if (this.closed) return; + this.closed = true; + if (this.keepAliveTimer) { + clearInterval(this.keepAliveTimer); + this.keepAliveTimer = undefined; + } + this.res.end(); + } + + isClosed(): boolean { + return this.closed; + } +} + +export function isStreamingRequest(acceptHeader: string | undefined): boolean { + return acceptHeader !== undefined && acceptHeader.includes('text/event-stream'); +} diff --git a/src/runtime/exec-graph.ts b/src/runtime/exec-graph.ts index fff3d7d2..5570ff17 100644 --- a/src/runtime/exec-graph.ts +++ b/src/runtime/exec-graph.ts @@ -248,6 +248,9 @@ export async function executeGraph(execGraph: ExecGraph, env: Environment): Prom env.appendEntryToMonitor(node.codeStr); } } + if (monitorIncr) { + env.emitSseEvent('agent_start', { step: node.codeStr }); + } try { if (node.subGraphIndex == -1) { await evaluateStatement(node.code as Statement, env); @@ -301,12 +304,16 @@ export async function executeGraph(execGraph: ExecGraph, env: Environment): Prom } } catch (reason: any) { if (monitoringEnabled) env.setMonitorEntryError(reason); + env.emitSseEvent('error', { step: node.codeStr, error: `${reason}` }); throw reason; } finally { if (monitoringEnabled) { if (monitorIncr) env.decrementMonitor(); env.setMonitorEntryResult(env.getLastResult()); } + if (monitorIncr) { + env.emitSseEvent('agent_complete', { step: node.codeStr }); + } } } } finally { diff --git a/src/runtime/interpreter.ts b/src/runtime/interpreter.ts index c338fdb4..33ae48a2 100644 --- a/src/runtime/interpreter.ts +++ b/src/runtime/interpreter.ts @@ -116,6 +116,7 @@ import { fetchDoc } from './docs.js'; import { FlowSpec, FlowStep, getAgentFlow } from './agents/flows.js'; import { isMonitoringEnabled } from './state.js'; import { Monitor, MonitorEntry } from './monitor.js'; +import { SseEmitter } from '../api/sse.js'; import { detailedDiff } from 'deep-object-diff'; import { callMcpTool, mcpClientNameFromToolEvent } from './mcpclient.js'; import { isNodeEnv } from '../utils/runtime.js'; @@ -174,6 +175,7 @@ export class Environment extends Instance { private agentMode: 'chat' | 'planner' | undefined = undefined; private agentChatId: string | undefined = undefined; private monitor: Monitor | undefined = undefined; + private sseEmitter: SseEmitter | undefined = undefined; private escalatedRole: string | undefined; private activeChatId: string | undefined; @@ -202,6 +204,7 @@ export class Environment extends Instance { this.eventExecutor = parent.eventExecutor; this.agentChatId = parent.agentChatId; this.monitor = parent.monitor; + this.sseEmitter = parent.sseEmitter; this.escalatedRole = parent.escalatedRole; this.activeChatId = parent.activeChatId; } else { @@ -935,6 +938,21 @@ export class Environment extends Instance { } return this; } + + setSseEmitter(emitter: SseEmitter): Environment { + this.sseEmitter = emitter; + return this; + } + + getSseEmitter(): SseEmitter | undefined { + return this.sseEmitter; + } + + emitSseEvent(eventType: string, data: object): void { + if (this.sseEmitter && !this.sseEmitter.isClosed()) { + this.sseEmitter.send(eventType, data); + } + } } export const GlobalEnvironment = new Environment(); @@ -2041,11 +2059,14 @@ async function agentInvoke(agent: AgentInstance, msg: string, env: Environment): console.debug(invokeDebugMsg); // + env.emitSseEvent('agent_start', { agent: agent.name }); + const monitoringEnabled = isMonitoringEnabled(); await agent.invoke(msg, env); let result: string | undefined = env.getLastResult(); logger.debug(`Agent ${agent.name} result: ${result}`); + env.emitSseEvent('agent_complete', { agent: agent.name }); const isPlanner = !env.isInAgentChatMode() && agent.isPlanner(); const stmtsExec = env.getStatementsExecutor(); @@ -2282,12 +2303,17 @@ async function iterateOnFlow( if (monitoringEnabled) { env.appendEntryToMonitor(step); } + env.emitSseEvent('flow_step_start', { step, agent: agent.name }); const inst = agent.swapInstruction(''); await agentInvoke(agent, inst, env); } else { rootAgent.maybeAddScratchData(env); } if (monitoringEnabled) env.setMonitorEntryResult(env.getLastResult()); + env.emitSseEvent('flow_step_complete', { + step, + result: maybeInstanceAsString(env.getLastResult()), + }); if (env.isSuspended()) { console.debug(`${iterId} suspending iteration on step ${step}`); await saveFlowSuspension(rootAgent, context, step, env); diff --git a/src/runtime/modules/ai.ts b/src/runtime/modules/ai.ts index 2c24bd67..ee2c23c0 100644 --- a/src/runtime/modules/ai.ts +++ b/src/runtime/modules/ai.ts @@ -942,6 +942,17 @@ Only return a pure JSON object with no extra text, annotations etc.`; const u = response.sysMsg.usage_metadata; env.setMonitorEntryLlmTokenUsage(u.input_tokens, u.output_tokens, u.total_tokens); } + env.emitSseEvent('llm_response', { + agent: this.name, + content: response.content, + tokens: response.sysMsg.usage_metadata + ? { + input: response.sysMsg.usage_metadata.input_tokens, + output: response.sysMsg.usage_metadata.output_tokens, + total: response.sysMsg.usage_metadata.total_tokens, + } + : undefined, + }); if (this.saveResponseAs) { await saveAgentResponse(this.saveResponseAs, response.content, env); } @@ -949,6 +960,7 @@ Only return a pure JSON object with no extra text, annotations etc.`; } catch (err: any) { logger.error(`Error while invoking ${agentName} - ${err}`); if (monitoringEnabled) env.setMonitorEntryError(`${err}`); + env.emitSseEvent('error', { agent: this.name, error: `${err}` }); env.setLastResult(undefined); } } else { diff --git a/test/api/sse.test.ts b/test/api/sse.test.ts new file mode 100644 index 00000000..5633311f --- /dev/null +++ b/test/api/sse.test.ts @@ -0,0 +1,170 @@ +import { assert, describe, test } from 'vitest'; +import { SseEmitter, isStreamingRequest } from '../../src/api/sse.js'; + +function mockResponse(): { res: any; written: string[]; state: { ended: boolean; headers: any } } { + const state = { + written: [] as string[], + ended: false, + headers: {} as any, + closeHandler: null as Function | null, + }; + const res = { + writeHead(_status: number, headers: any) { + state.headers = headers; + }, + write(chunk: string) { + state.written.push(chunk); + }, + end() { + state.ended = true; + }, + on(event: string, handler: Function) { + if (event === 'close') state.closeHandler = handler; + }, + }; + return { res, written: state.written, state }; +} + +describe('SseEmitter', () => { + test('initialize writes SSE headers and connected comment', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + assert( + mock.state.headers['Content-Type'] === 'text/event-stream', + 'Content-Type should be text/event-stream' + ); + assert(mock.state.headers['Cache-Control'] === 'no-cache', 'Cache-Control should be no-cache'); + assert(mock.state.headers['Connection'] === 'keep-alive', 'Connection should be keep-alive'); + assert(mock.state.headers['X-Accel-Buffering'] === 'no', 'X-Accel-Buffering should be no'); + assert(mock.written.length >= 1, 'Should have written connected comment'); + assert(mock.written[0] === ': connected\n\n', 'First write should be connected comment'); + + emitter.close(); + }); + + test('send writes event and data in SSE format', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + emitter.send('my_event', { foo: 'bar' }); + + const eventMsg = mock.written.find(w => w.startsWith('event:')); + assert(eventMsg !== undefined, 'Should have written an event message'); + assert( + eventMsg === 'event: my_event\ndata: {"foo":"bar"}\n\n', + 'Event format should be correct' + ); + + emitter.close(); + }); + + test('send accepts arbitrary event type strings', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + emitter.send('custom_progress_update', { step: 1, total: 5 }); + emitter.send('another-type', { x: true }); + + const events = mock.written.filter(w => w.startsWith('event:')); + assert(events.length === 2, 'Should have written two events'); + assert( + events[0].startsWith('event: custom_progress_update\n'), + 'First event type should match' + ); + assert(events[1].startsWith('event: another-type\n'), 'Second event type should match'); + + emitter.close(); + }); + + test('send no-ops after close', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + emitter.close(); + const countBefore = mock.written.length; + + emitter.send('should_not_appear', { data: 'ignored' }); + assert(mock.written.length === countBefore, 'No new writes after close'); + }); + + test('close is idempotent', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + assert(!emitter.isClosed(), 'Should not be closed initially'); + emitter.close(); + assert(emitter.isClosed(), 'Should be closed after first close'); + emitter.close(); + assert(emitter.isClosed(), 'Should still be closed after second close'); + }); + + test('sendResult sends result event then closes', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + emitter.sendResult({ answer: 42 }); + + const resultEvent = mock.written.find(w => w.startsWith('event: result\n')); + assert(resultEvent !== undefined, 'Should have written a result event'); + assert(resultEvent!.includes('"answer":42'), 'Result should contain the data'); + assert(emitter.isClosed(), 'Should be closed after sendResult'); + }); + + test('sendError sends error event with statusCode then closes', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + emitter.sendError('something broke', 500); + + const errorEvent = mock.written.find(w => w.startsWith('event: error\n')); + assert(errorEvent !== undefined, 'Should have written an error event'); + assert(errorEvent!.includes('"error":"something broke"'), 'Error message should be present'); + assert(errorEvent!.includes('"statusCode":500'), 'Status code should be present'); + assert(emitter.isClosed(), 'Should be closed after sendError'); + }); + + test('client disconnect sets closed via res close handler', () => { + let closeHandler: Function | null = null; + const res = { + writeHead() {}, + write() {}, + end() {}, + on(event: string, handler: Function) { + if (event === 'close') closeHandler = handler; + }, + }; + const emitter = new SseEmitter(res as any); + emitter.initialize(); + + assert(!emitter.isClosed(), 'Should not be closed before client disconnect'); + assert(closeHandler !== null, 'Should have registered a close handler'); + closeHandler!(); + assert(emitter.isClosed(), 'Should be closed after client disconnect'); + }); +}); + +describe('isStreamingRequest', () => { + test('returns true for text/event-stream', () => { + assert(isStreamingRequest('text/event-stream') === true); + }); + + test('returns true when text/event-stream is part of a list', () => { + assert(isStreamingRequest('text/event-stream, application/json') === true); + }); + + test('returns false for application/json', () => { + assert(isStreamingRequest('application/json') === false); + }); + + test('returns false for undefined', () => { + assert(isStreamingRequest(undefined) === false); + }); +}); diff --git a/test/runtime/sse.test.ts b/test/runtime/sse.test.ts new file mode 100644 index 00000000..2576e38a --- /dev/null +++ b/test/runtime/sse.test.ts @@ -0,0 +1,143 @@ +import { assert, describe, test } from 'vitest'; +import { Environment, evaluate } from '../../src/runtime/interpreter.js'; +import { SseEmitter } from '../../src/api/sse.js'; +import { doInternModule } from '../util.js'; +import { makeInstance, newInstanceAttributes } from '../../src/runtime/module.js'; + +function mockResponse(): { res: any; written: string[] } { + const state = { written: [] as string[] }; + const res = { + writeHead() {}, + write(chunk: string) { + state.written.push(chunk); + }, + end() {}, + on() {}, + }; + return { res, written: state.written }; +} + +function collectSseEvents(written: string[]): Array<{ type: string; data: any }> { + return written + .filter(w => w.startsWith('event:')) + .map(w => { + const lines = w.trim().split('\n'); + const type = lines[0].replace('event: ', ''); + const data = JSON.parse(lines[1].replace('data: ', '')); + return { type, data }; + }); +} + +describe('Environment.emitSseEvent', () => { + test('no-ops when no emitter is set', () => { + const env = new Environment('test-env'); + // Should not throw + env.emitSseEvent('some_event', { key: 'value' }); + }); + + test('emits event when emitter is set', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + const env = new Environment('test-env'); + env.setSseEmitter(emitter); + env.emitSseEvent('test_event', { hello: 'world' }); + + const events = collectSseEvents(mock.written); + assert(events.length === 1, 'Should have one event'); + assert(events[0].type === 'test_event', 'Event type should match'); + assert(events[0].data.hello === 'world', 'Event data should match'); + + emitter.close(); + }); + + test('no-ops when emitter is closed', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + const env = new Environment('test-env'); + env.setSseEmitter(emitter); + emitter.close(); + + const countBefore = mock.written.length; + env.emitSseEvent('should_not_appear', { ignored: true }); + assert(mock.written.length === countBefore, 'No new writes after emitter is closed'); + }); + + test('child environment inherits sseEmitter from parent', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + const parent = new Environment('parent'); + parent.setSseEmitter(emitter); + + const child = new Environment('child', parent); + child.emitSseEvent('child_event', { from: 'child' }); + + const events = collectSseEvents(mock.written); + assert(events.length === 1, 'Child should emit via inherited emitter'); + assert(events[0].type === 'child_event', 'Event type from child should match'); + + emitter.close(); + }); + + test('getSseEmitter returns the set emitter', () => { + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + + const env = new Environment('test-env'); + assert(env.getSseEmitter() === undefined, 'Should be undefined initially'); + + env.setSseEmitter(emitter); + assert(env.getSseEmitter() === emitter, 'Should return the set emitter'); + }); +}); + +describe('SSE integration with evaluate', () => { + test('evaluate with SSE emitter produces result event', async () => { + await doInternModule( + 'SseTest01', + `entity Item { + id Int @id, + name String + } + workflow CreateItem { + {Item {id CreateItem.id, name CreateItem.name}} + } + ` + ); + + const mock = mockResponse(); + const emitter = new SseEmitter(mock.res); + emitter.initialize(); + + const env = new Environment('sse-test'); + env.setSseEmitter(emitter); + + const inst = makeInstance( + 'SseTest01', + 'CreateItem', + newInstanceAttributes().set('id', 1).set('name', 'TestItem') + ); + + let resultReceived: any = undefined; + await evaluate( + inst, + (value: any) => { + resultReceived = value; + }, + env + ); + + assert(resultReceived !== undefined, 'Should have received a result via continuation'); + + // The emitter should still be functional (not closed by evaluate itself — + // that's the HTTP handler's job via okSse/internalErrorSse) + assert(!emitter.isClosed(), 'Emitter should still be open after evaluate'); + + emitter.close(); + }); +});