STREAM_FLUSH_INTERVAL_MS should be configurable per-world #1517

@jcourson-bg

Description

WorkflowServerWritableStream buffers writes for 10ms before flushing (STREAM_FLUSH_INTERVAL_MS in packages/core/src/serialization.ts). This makes sense for world-vercel, where each flush is an HTTP PUT, but for backends like Redis, where XADD is sub-millisecond, the buffer just adds latency for no benefit, and streamed AI output arrives in visible chunks.

The constant is hardcoded and there's no env var or runtime option to change it. Every world pays the same 10ms delay regardless of how cheap its writes are.
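A minimal sketch of how the interval could be resolved if it were configurable. Everything named here is an assumption for illustration: neither a `streamFlushIntervalMs` world option nor a `WORKFLOW_STREAM_FLUSH_INTERVAL_MS` env var exists today. The idea is that a world-level value wins, then an env override, then the current 10ms default.

```typescript
// Hypothetical names throughout: `streamFlushIntervalMs` and
// `WORKFLOW_STREAM_FLUSH_INTERVAL_MS` are illustrative, not existing APIs.
const DEFAULT_STREAM_FLUSH_INTERVAL_MS = 10;

interface StreamFlushOptions {
  // Per-world override; 0 would mean "do not buffer at all".
  streamFlushIntervalMs?: number;
}

// Accept only finite, non-negative numbers; anything else counts as "not set".
function parseInterval(value: unknown): number | undefined {
  if (value === undefined || value === null || value === '') return undefined;
  const n = Number(value);
  return Number.isFinite(n) && n >= 0 ? n : undefined;
}

// Precedence: world option, then env var, then the hardcoded default.
function resolveFlushIntervalMs(
  world: StreamFlushOptions = {},
  env: Record<string, string | undefined> = {},
): number {
  return (
    parseInterval(world.streamFlushIntervalMs) ??
    parseInterval(env.WORKFLOW_STREAM_FLUSH_INTERVAL_MS) ??
    DEFAULT_STREAM_FLUSH_INTERVAL_MS
  );
}
```

With this shape, a Redis-backed world could declare `streamFlushIntervalMs: 0` while world-vercel keeps the default, and an operator could still override either through the environment.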

I want to use a hybrid world (Postgres for storage/queue, Redis for streams):

// Assumption: `Redis` is the ioredis client; the original snippet used it without an import.
import Redis from 'ioredis';
import { createWorld as createPostgresWorld } from '@workflow/world-postgres';
import { createStreamer as createRedisStreamer } from '@workflow-worlds/redis';
import type { World } from '@workflow/world';

// Async because createRedisStreamer is awaited below.
export async function createHybridWorld(config: {
  postgresUrl: string;
  redisUrl: string;
}): Promise<World> {
  const pgWorld = createPostgresWorld({
    connectionString: config.postgresUrl,
  });

  // Redis streamer from the community package
  const { streamer: redisStreamer, close: closeRedis } = await createRedisStreamer({
    redis: new Redis(config.redisUrl),
  });

  return {
    // Storage, queue, everything else from Postgres
    ...pgWorld,
    // Override just the stream methods with Redis
    ...redisStreamer,
    async start() {
      await pgWorld.start();
    },
    async close() {
      await closeRedis();
      await pgWorld.close?.();
    },
  };
}

But the 10ms buffering lives in core's WorkflowServerWritableStream, not in the world, so even with a fast Redis streamer every write still sits in the buffer before writeToStream is ever called. There's no way for a world to opt out of it.
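To illustrate what an opt-out could look like, here is a simplified sketch of a buffering writer that skips the buffer when its interval is 0. This is not the actual WorkflowServerWritableStream implementation; the class name `BufferedStreamWriter`, the `WriteFn` type, and the constructor signature are hypothetical, and error handling for the timer-driven flush is omitted.

```typescript
// Hypothetical sketch; not the real core implementation.
type WriteFn = (chunks: string[]) => Promise<void>;

class BufferedStreamWriter {
  private buffer: string[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;
  private readonly writeToStream: WriteFn;
  private readonly flushIntervalMs: number;

  constructor(writeToStream: WriteFn, flushIntervalMs: number) {
    this.writeToStream = writeToStream;
    this.flushIntervalMs = flushIntervalMs;
  }

  async write(chunk: string): Promise<void> {
    // Interval of 0 (or less): bypass the buffer and write straight through.
    if (this.flushIntervalMs <= 0) {
      await this.writeToStream([chunk]);
      return;
    }
    // Otherwise collect chunks and schedule a single flush for the batch.
    this.buffer.push(chunk);
    if (this.timer === null) {
      this.timer = setTimeout(() => {
        void this.flush();
      }, this.flushIntervalMs);
    }
  }

  async flush(): Promise<void> {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.buffer.length === 0) return;
    // Swap the buffer before awaiting so new writes start a fresh batch.
    const pending = this.buffer;
    this.buffer = [];
    await this.writeToStream(pending);
  }
}
```

With an interval of 0, each `write` reaches `writeToStream` immediately, which is what a sub-millisecond backend like Redis would want. With 10, chunks are batched as they are today.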
