diff --git a/.changeset/configurable-stream-flush-interval.md b/.changeset/configurable-stream-flush-interval.md new file mode 100644 index 0000000000..2435323ac6 --- /dev/null +++ b/.changeset/configurable-stream-flush-interval.md @@ -0,0 +1,8 @@ +--- +"@workflow/world": patch +"@workflow/core": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch +--- + +Add `streamFlushIntervalMs` option to `Streamer` interface, allowing non-Vercel world implementations to configure the stream write buffer flush interval. Defaults to 10ms (unchanged behavior). Set to 0 for immediate flushing on backends with sub-millisecond writes. diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 3ea6939c76..1c44df63f6 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -522,7 +522,7 @@ export class WorkflowServerWritableStream extends WritableStream { for (const w of currentWaiters) w.reject(err); } ); - }, STREAM_FLUSH_INTERVAL_MS); + }, world.streamFlushIntervalMs ?? STREAM_FLUSH_INTERVAL_MS); }; super({ @@ -570,8 +570,7 @@ export class WorkflowServerWritableStream extends WritableStream { // unsettled promise because the cleared timer will never fire. const waiters = flushWaiters; flushWaiters = []; - const abortError = - reason ?? new Error("Stream aborted"); + const abortError = reason ?? new Error('Stream aborted'); for (const w of waiters) w.reject(abortError); }, }); diff --git a/packages/core/src/writable-stream.test.ts b/packages/core/src/writable-stream.test.ts index d07500d589..7514374a56 100644 --- a/packages/core/src/writable-stream.test.ts +++ b/packages/core/src/writable-stream.test.ts @@ -11,6 +11,7 @@ describe('WorkflowServerWritableStream', () => { writeToStream: ReturnType; writeToStreamMulti: ReturnType; closeStream: ReturnType; + streamFlushIntervalMs?: number; }; beforeEach(async () => { @@ -248,4 +249,52 @@ describe('WorkflowServerWritableStream', () => { ); }); }); + + describe('streamFlushIntervalMs', () => { + it('should use world.streamFlushIntervalMs when set to 0 (immediate flush)', async () => { + mockWorld.streamFlushIntervalMs = 0; + + const stream = new WorkflowServerWritableStream('s', 'run-1'); + const writer = stream.getWriter(); + + // With interval=0, the flush fires on the next microtask tick via setTimeout(fn, 0) + await writer.write(new Uint8Array([1])); + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + + await writer.close(); + }); + + it('should fall back to default interval when streamFlushIntervalMs is undefined', async () => { + // mockWorld has no streamFlushIntervalMs set — uses default 10ms + delete mockWorld.streamFlushIntervalMs; + + const stream = new WorkflowServerWritableStream('s', 'run-1'); + const writer = stream.getWriter(); + + await writer.write(new Uint8Array([1])); + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + + await writer.close(); + }); + + it('should respect a custom non-zero flush interval', async () => { + mockWorld.streamFlushIntervalMs = 50; + + const stream = new WorkflowServerWritableStream('s', 'run-1'); + const writer = stream.getWriter(); + + // Start a write — the flush is scheduled 50ms from now + const writePromise = writer.write(new Uint8Array([1])); + + // After 10ms (the old default), data should NOT have flushed yet + await new Promise((r) => setTimeout(r, 10)); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + + // Wait for the write to complete (will resolve after the 50ms timer fires) + await writePromise; + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + + await writer.close(); + }); + }); }); diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index 7f686d879f..99431be3e1 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -22,6 +22,11 @@ export type Config = { * `.workflow-data` directory. */ tag?: string; + /** + * Override the flush interval (in ms) for buffered stream writes. + * Default is 10ms. Set to 0 for immediate flushing. + */ + streamFlushIntervalMs?: number; }; export const config = once(() => { diff --git a/packages/world-local/src/index.ts b/packages/world-local/src/index.ts index 654ec53b76..c8e9c695d5 100644 --- a/packages/world-local/src/index.ts +++ b/packages/world-local/src/index.ts @@ -64,10 +64,12 @@ export function createLocalWorld(args?: Partial): LocalWorld { return { ...queue, ...createStorage(mergedConfig.dataDir, tag), - ...instrumentObject( - 'world.streams', - createStreamer(mergedConfig.dataDir, tag) - ), + ...instrumentObject('world.streams', { + ...createStreamer(mergedConfig.dataDir, tag), + ...(mergedConfig.streamFlushIntervalMs !== undefined && { + streamFlushIntervalMs: mergedConfig.streamFlushIntervalMs, + }), + }), async start() { await initDataDir(mergedConfig.dataDir); }, diff --git a/packages/world-postgres/src/config.ts b/packages/world-postgres/src/config.ts index 02473e89ac..ca778914ea 100644 --- a/packages/world-postgres/src/config.ts +++ b/packages/world-postgres/src/config.ts @@ -7,4 +7,9 @@ type PgConnectionConfig = export type PostgresWorldConfig = PgConnectionConfig & { jobPrefix?: string; queueConcurrency?: number; + /** + * Override the flush interval (in ms) for buffered stream writes. + * Default is 10ms. Set to 0 for immediate flushing. + */ + streamFlushIntervalMs?: number; }; diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index e9d61cd60a..33d0d74e58 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -59,6 +59,9 @@ export function createWorld( ...storage, ...streamer, ...queue, + ...(config.streamFlushIntervalMs !== undefined && { + streamFlushIntervalMs: config.streamFlushIntervalMs, + }), async start() { await queue.start(); }, diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index 77b8cd04e6..f2bcc13081 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -30,6 +30,19 @@ import type { } from './steps.js'; export interface Streamer { + /** + * Override the default flush interval (in milliseconds) for buffered stream writes. + * Chunks are accumulated in a buffer and flushed together on this interval. + * + * The default is 10ms, which is appropriate for HTTP-based backends like world-vercel + * where each flush is a network round-trip. For backends with sub-millisecond writes + * (e.g., Redis, local filesystem), a lower value (or 0 for immediate flushing) reduces + * end-to-end stream latency. + * + * Not supported by world-vercel. + */ + streamFlushIntervalMs?: number; + writeToStream( name: string, runId: string,