From 38b66744e0e440e1778de94266acb45b3a2eafc8 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 31 Mar 2026 12:43:43 -0700 Subject: [PATCH 01/17] [core] Combine initial run fetch, event fetch, and run_started event creation Signed-off-by: Peter Wielander --- .changeset/better-peas-buy.md | 8 +++ packages/core/src/runtime.ts | 53 ++++++++++------- .../world-local/src/storage/events-storage.ts | 36 ++++++++++- packages/world-postgres/src/storage.ts | 59 ++++++++++++++++++- packages/world/src/events.ts | 6 ++ 5 files changed, 136 insertions(+), 26 deletions(-) create mode 100644 .changeset/better-peas-buy.md diff --git a/.changeset/better-peas-buy.md b/.changeset/better-peas-buy.md new file mode 100644 index 0000000000..2f499761b5 --- /dev/null +++ b/.changeset/better-peas-buy.md @@ -0,0 +1,8 @@ +--- +"@workflow/world": patch +"@workflow/core": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch +--- + +Combine initial run fetch, event fetch, and run_started event creation diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index da7a407bd3..4e2db032d0 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -191,34 +191,40 @@ export function workflowEntrypoint( }); let workflowStartedAt = -1; - let workflowRun = await world.runs.get(runId); + let workflowRun: WorkflowRun | undefined; + // Pre-loaded events from the run_started response. + // When present, we skip the events.list call to reduce TTFB. + let preloadedEvents: Event[] | undefined; // --- Infrastructure: prepare the run state --- + // Always call run_started directly — this both transitions + // the run to 'running' AND returns the run entity, saving + // a separate runs.get round-trip. // Network/server errors propagate to the queue handler for retry. // WorkflowRuntimeError (data integrity issues) are fatal and // produce run_failed since retrying won't fix them. try { - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create( - runId, - { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, - }, - { requestId } + const result = await world.events.create( + runId, + { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }, + { requestId } + ); + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` ); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; + } + workflowRun = result.run; + + // If the world returned events, use them to skip + // the initial events.list call and reduce TTFB. + if (result.events && result.events.length > 0) { + preloadedEvents = result.events; } - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. if (!workflowRun.startedAt) { throw new WorkflowRuntimeError( `Workflow run "${runId}" has no "startedAt" timestamp` @@ -226,7 +232,6 @@ export function workflowEntrypoint( } } catch (err) { // Run was concurrently completed/failed/cancelled - // between the GET and the run_started event creation if (EntityConflictError.is(err) || RunExpiredError.is(err)) { runtimeLogger.info( 'Run already finished during setup, skipping', @@ -294,8 +299,12 @@ export function workflowEntrypoint( return; } - // Load all events into memory before running - const events = await getAllWorkflowRunEvents(workflowRun.runId); + // Load all events into memory before running. + // If we got pre-loaded events from the run_started response, + // skip the events.list round-trip to reduce TTFB. + const events = + preloadedEvents ?? + (await getAllWorkflowRunEvents(workflowRun.runId)); // Check for any elapsed waits and create wait_completed events const now = Date.now(); diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index efa031afcf..b6aa61b56c 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -190,7 +190,15 @@ export function createEventsStorage( }; } - // Run state transitions are not allowed on terminal runs + // For run_started on terminal runs, use RunExpiredError so the + // runtime knows to exit without retrying. + if (data.eventType === 'run_started') { + throw new RunExpiredError( + `Workflow run "${effectiveRunId}" is already in terminal state "${currentRun.status}"` + ); + } + + // Other run state transitions are not allowed on terminal runs if ( runTerminalEvents.includes(data.eventType) || data.eventType === 'run_cancelled' @@ -280,6 +288,10 @@ export function createEventsStorage( createdAt: now, specVersion: effectiveSpecVersion, }; + // Strip eventData from run_started — it belongs on run_created only. + if (data.eventType === 'run_started' && 'eventData' in event) { + delete (event as any).eventData; + } // Track entity created/updated for EventResult let run: WorkflowRun | undefined; @@ -316,6 +328,12 @@ export function createEventsStorage( } else if (data.eventType === 'run_started') { // Reuse currentRun from validation (already read above) if (currentRun) { + // If already running, return the run directly without + // creating a duplicate event. + if (currentRun.status === 'running') { + return { run: currentRun }; + } + run = { runId: currentRun.runId, deploymentId: currentRun.deploymentId, @@ -832,6 +850,21 @@ export function createEventsStorage( const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; const filteredEvent = stripEventDataRefs(event, resolveData); + // For run_started: include all events so the runtime can skip + // the initial events.list call and reduce TTFB. + let events: Event[] | undefined; + if (data.eventType === 'run_started' && run) { + const allEvents = await paginatedFileSystemQuery({ + directory: path.join(basedir, 'events'), + schema: EventSchema, + filePrefix: `${effectiveRunId}-`, + sortOrder: 'asc', + getCreatedAt: getObjectCreatedAt('evnt'), + getId: (e) => e.eventId, + }); + events = allEvents.data; + } + // Return EventResult with event and any created/updated entity return { event: filteredEvent, @@ -839,6 +872,7 @@ export function createEventsStorage( step, hook, wait, + events, }; }, diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index d65dfa5ab9..1d1e8f96b3 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -444,7 +444,15 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { }; } - // Run state transitions are not allowed on terminal runs + // For run_started on terminal runs, use RunExpiredError so the + // runtime knows to exit without retrying. + if (data.eventType === 'run_started') { + throw new RunExpiredError( + `Workflow run "${effectiveRunId}" is already in terminal state "${currentRun.status}"` + ); + } + + // Other run state transitions are not allowed on terminal runs if ( runTerminalEvents.includes(data.eventType) || data.eventType === 'run_cancelled' @@ -563,6 +571,18 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { // Handle run_started event: update run status if (data.eventType === 'run_started') { + // If already running, return the run directly without event + if (currentRun?.status === 'running') { + const [fullRun] = await drizzle + .select() + .from(Schema.runs) + .where(eq(Schema.runs.runId, effectiveRunId)) + .limit(1); + if (fullRun) { + return { run: deserializeRunError(compact(fullRun)) }; + } + } + const [runValue] = await drizzle .update(Schema.runs) .set({ @@ -1135,6 +1155,14 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { } } + // Strip eventData from run_started — it belongs on run_created only. + const storedEventData = + data.eventType === 'run_started' + ? undefined + : 'eventData' in data + ? data.eventData + : undefined; + const [value] = await drizzle .insert(events) .values({ @@ -1142,22 +1170,47 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { eventId, correlationId: data.correlationId, eventType: data.eventType, - eventData: 'eventData' in data ? data.eventData : undefined, + eventData: storedEventData, specVersion: effectiveSpecVersion, }) .returning({ createdAt: events.createdAt }); if (!value) { throw new EntityConflictError(`Event ${eventId} could not be created`); } - const result = { ...data, ...value, runId: effectiveRunId, eventId }; + const result = { + ...data, + ...value, + runId: effectiveRunId, + eventId, + ...(storedEventData !== undefined + ? { eventData: storedEventData } + : {}), + }; + if (data.eventType === 'run_started') { + delete (result as any).eventData; + } const parsed = EventSchema.parse(result); const resolveData = params?.resolveData ?? 'all'; + + // For run_started: include all events so the runtime can skip + // the initial events.list call and reduce TTFB. + let allEvents: Event[] | undefined; + if (data.eventType === 'run_started' && run) { + const eventRows = await drizzle + .select() + .from(Schema.events) + .where(eq(Schema.events.runId, effectiveRunId)) + .orderBy(Schema.events.eventId); + allEvents = eventRows.map((e) => EventSchema.parse(compact(e))); + } + return { event: stripEventDataRefs(parsed, resolveData), run, step, hook, wait, + events: allEvents, }; }, async get( diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index 2965906f7b..74e862215a 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -369,6 +369,12 @@ export interface EventResult { hook?: import('./hooks.js').Hook; /** The wait entity (for wait_created/wait_completed events) */ wait?: import('./waits.js').Wait; + /** + * All events up to this point, with data resolved. When populated + * on a run_started response, the runtime uses these to skip the + * initial events.list call and reduce TTFB. + */ + events?: Event[]; } export interface GetEventParams { From 2f511392c317d501d293bfe1825f47dabf88a535 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 15:45:30 -0700 Subject: [PATCH 02/17] [core] [world] Lazy run creation on start Signed-off-by: Peter Wielander --- docs/content/docs/changelog/meta.json | 2 +- .../docs/changelog/resilient-start.mdx | 108 +++++++++++++++ packages/core/src/runtime.ts | 44 ++++-- packages/core/src/runtime/start.ts | 126 +++++++++++++----- packages/world-vercel/src/events.ts | 4 + packages/world-vercel/src/utils.ts | 3 +- packages/world/src/events.ts | 13 ++ packages/world/src/index.ts | 1 + packages/world/src/queue.ts | 17 +++ packages/world/src/ulid.ts | 8 +- 10 files changed, 274 insertions(+), 52 deletions(-) create mode 100644 docs/content/docs/changelog/resilient-start.mdx diff --git a/docs/content/docs/changelog/meta.json b/docs/content/docs/changelog/meta.json index 042ff8fc8b..0c01dff133 100644 --- a/docs/content/docs/changelog/meta.json +++ b/docs/content/docs/changelog/meta.json @@ -1,5 +1,5 @@ { "title": "Changelog", - "pages": ["index", "eager-processing"], + "pages": ["index", "eager-processing", "resilient-start"], "defaultOpen": false } diff --git a/docs/content/docs/changelog/resilient-start.mdx b/docs/content/docs/changelog/resilient-start.mdx new file mode 100644 index 0000000000..e2ef937391 --- /dev/null +++ b/docs/content/docs/changelog/resilient-start.mdx @@ -0,0 +1,108 @@ +--- +title: Resilient start() +description: Overhaul of start() to tolerate world-vercel storage unavailability when the queue is healthy. +--- + +# Resilient `start()` + +## Motivation + +When `world-vercel` storage (DynamoDB) is unavailable but the queue (VQS) is up, +`start()` previously failed entirely because `world.events.create(run_created)` +is called before `world.queue()`. This change decouples run creation from queue +dispatch so that runs can still be accepted when storage is degraded. + +## Design + +### `start()` changes (packages/core) + +- `world.events.create` (run_created) and `world.queue` are now called **in parallel**. +- If `events.create` errors with **429 or 5xx**, we log a warning saying that run + creation failed but the run was accepted — creation will be re-tried async by the + runtime when it processes the queue message. +- If `world.queue` fails, we still throw — the run truly failed and was not enqueued. +- The queue invocation now receives all the run inputs (`input`, `deploymentId`, + `workflowName`, `specVersion`, `executionContext`) so the runtime can create the + run later if needed. +- When the runtime re-enqueues itself, it does **not** pass these inputs — only the + first queue cycle carries them. + +### `workflowEntrypoint` changes (packages/core) + +- We no longer call `world.runs.get` or check the run status before starting. +- We **always** call `world.events.create` with `run_started`, now also passing the + run input that was sent through the queue. The response will be: + - **200 (now running)**: use the returned `Run` entity as the run. The response also + includes an `events` array of all events up to that point (typically `run_created` + and `run_started`), with data resolved. These are used to skip the very first + `world.events.list` call, reducing TTFB for the first invocation. + - **409 (already running)**: fetch the run via `world.runs.get` and proceed. + - **410 (already finished)**: log and exit as usual. + +### World / workflow-server changes + +- Posting `run_started` to a **non-existent** run is now allowed when the run input is + sent along with the payload. The server: + 1. Creates a `run_created` event first (so the event log is consistent). + 2. Strips the input from the `run_started` event data (it lives on `run_created`). + 3. Then creates the `run_started` event normally. + 4. Emits a log and Datadog metric to track when this fallback path is hit. +- When posting `run_started` and getting **200**, the response includes an `events` + property with all events up to that point (data always resolved). +- The ULID timestamp validation threshold for `run_created` was relaxed from + **5 minutes** to **24 hours** so the queue can correctly re-try creation later. + +## Decisions + +1. **Parallel not sequential**: We chose `Promise.allSettled` over sequential calls to + minimize latency in the happy path. The trade-off is slightly more complex error + handling. + +2. **409 → extra GET**: When `run_started` returns 409 (already running), we do an + extra `world.runs.get` call rather than changing the server to include the run + entity in 409 error responses. This keeps the HTTP error contract simple. The 409 + path is rare (concurrent workers racing) so the extra latency is acceptable. + +3. **Events in 200 response**: We only return events on the 200 path (first caller). + On 409 (second caller), we fall back to the normal `events.list` call. This is + correct because only on 200 can we be certain we know the full event history. + +4. **24-hour ULID threshold**: VQS supports delayed messages up to 24 hours. We match + this so a run_created retry can succeed even at the maximum queue delay. The + threshold still prevents abuse from manipulated timestamps. + +## Implementation notes + +### Error type mapping for terminal runs +Previously, calling `run_started` on a terminal run threw `InvalidOperationStateError` +(HTTP 409). This was changed to `EntityGoneError` (HTTP 410) so the runtime correctly +distinguishes "already running" (409 → fetch and proceed) from "already finished" +(410 → exit immediately). This required updating integration tests in +`events-materialization.integration.ts`. + +### run_started on already-running runs +Added an explicit check in `handleRunStateTransition`: if the run is already `running`, +throw `EntityConflictError` (409) instead of idempotently patching and creating a +duplicate `run_started` event. This prevents event log pollution from concurrent workers. + +### RunStartedEventSchema eventData stripping +The run input is passed through to `run_started`'s `eventData` but stripped before +the event is persisted — the data belongs on the `run_created` event only. The server +sets `effectiveEventData = undefined` for `run_started` events. + +### ULID threshold test updates +Both `workflow-server/test/helpers/ulid.test.ts` and +`test/api/events.integration.ts` had tests expecting 10-minute-old ULIDs to be +rejected. These were updated to use 25-hour offsets to match the new 24-hour threshold. + +## Follow-up work + +- [ ] Consider including run entity in 409 responses server-side to avoid the extra + GET call on the concurrent-start path. +- [ ] Add e2e tests covering the degraded-storage start path. +- [ ] Monitor the "run_started created run" Datadog metric in production to understand + how often the fallback path is hit. +- [ ] Consider whether the `events` optimization in the 200 response should also apply + to re-enqueue cycles (currently only first invocation). +- [ ] Consider adding a Datadog metric for the run_started → already running (409) + path to track concurrent start frequency. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 4e2db032d0..ce60426c69 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -4,8 +4,6 @@ import { RunExpiredError, WorkflowRuntimeError, } from '@workflow/errors'; -import { classifyRunError } from './classify-error.js'; -import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js'; import { parseWorkflowName } from '@workflow/utils/parse-name'; import { type Event, @@ -13,9 +11,11 @@ import { WorkflowInvokePayloadSchema, type WorkflowRun, } from '@workflow/world'; +import { classifyRunError } from './classify-error.js'; import { importKey } from './encryption.js'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; +import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js'; import { getAllWorkflowRunEvents, getQueueOverhead, @@ -105,6 +105,7 @@ export function workflowEntrypoint( runId, traceCarrier: traceContext, requestedAt, + runInput, } = WorkflowInvokePayloadSchema.parse(message_); const { requestId } = metadata; // Extract the workflow name from the topic name @@ -192,14 +193,15 @@ export function workflowEntrypoint( let workflowStartedAt = -1; let workflowRun: WorkflowRun | undefined; - // Pre-loaded events from the run_started response. - // When present, we skip the events.list call to reduce TTFB. + // Pre-loaded events from run_started response (first caller optimization) let preloadedEvents: Event[] | undefined; // --- Infrastructure: prepare the run state --- // Always call run_started directly — this both transitions // the run to 'running' AND returns the run entity, saving - // a separate runs.get round-trip. + // a separate runs.get round-trip. When runInput is present + // (resilient start), pass it so the server can create the + // run if run_created was missed. // Network/server errors propagate to the queue handler for retry. // WorkflowRuntimeError (data integrity issues) are fatal and // produce run_failed since retrying won't fix them. @@ -209,6 +211,18 @@ export function workflowEntrypoint( { eventType: 'run_started', specVersion: SPEC_VERSION_CURRENT, + // Pass run input from queue so server can create + // the run if run_created was missed + ...(runInput + ? { + eventData: { + input: runInput.input, + deploymentId: runInput.deploymentId, + workflowName: runInput.workflowName, + executionContext: runInput.executionContext, + }, + } + : {}), }, { requestId } ); @@ -219,7 +233,7 @@ export function workflowEntrypoint( } workflowRun = result.run; - // If the world returned events, use them to skip + // If the response includes events, use them to skip // the initial events.list call and reduce TTFB. if (result.events && result.events.length > 0) { preloadedEvents = result.events; @@ -231,15 +245,14 @@ export function workflowEntrypoint( ); } } catch (err) { - // Run was concurrently completed/failed/cancelled - if (EntityConflictError.is(err) || RunExpiredError.is(err)) { + if (RunExpiredError.is(err)) { + // 410: already finished — log and exit runtimeLogger.info( 'Run already finished during setup, skipping', { workflowRunId: runId, message: err.message } ); return; - } - if (err instanceof WorkflowRuntimeError) { + } else if (err instanceof WorkflowRuntimeError) { runtimeLogger.error( 'Fatal runtime error during workflow setup', { workflowRunId: runId, error: err.message } @@ -270,8 +283,15 @@ export function workflowEntrypoint( throw failErr; } return; + } else { + throw err; } - throw err; + } + + if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` + ); } workflowStartedAt = +workflowRun.startedAt; @@ -300,7 +320,7 @@ export function workflowEntrypoint( } // Load all events into memory before running. - // If we got pre-loaded events from the run_started response, + // If we got events from the run_started response, // skip the events.list round-trip to reduce TTFB. const events = preloadedEvents ?? diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 15339b954d..7565de885e 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,9 +1,14 @@ import { waitUntil } from '@vercel/functions'; -import { WorkflowRuntimeError } from '@workflow/errors'; +import { + ThrottleError, + WorkflowRuntimeError, + WorkflowWorldError, +} from '@workflow/errors'; import type { WorkflowInvokePayload, World } from '@workflow/world'; import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { importKey } from '../encryption.js'; +import { runtimeLogger } from '../logger.js'; import type { Serializable } from '../schemas.js'; import { dehydrateWorkflowArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; @@ -157,33 +162,81 @@ export async function start( globalThis, v1Compat ); - const result = await world.events.create( - runId, - { - eventType: 'run_created', - specVersion, - eventData: { - deploymentId: deploymentId, - workflowName: workflowName, - input: workflowArguments, - executionContext: { traceCarrier, workflowCoreVersion }, + + const executionContext = { traceCarrier, workflowCoreVersion }; + + // Call events.create (run_created) and queue in parallel. + // If events.create fails with 429/5xx, the run was still accepted + // via the queue and creation will be re-tried async by the runtime. + const [runCreatedResult, queueResult] = await Promise.allSettled([ + world.events.create( + runId, + { + eventType: 'run_created', + specVersion, + eventData: { + deploymentId: deploymentId, + workflowName: workflowName, + input: workflowArguments, + executionContext, + }, }, - }, - { v1Compat } - ); + { v1Compat } + ), + world.queue( + getWorkflowQueueName(workflowName), + { + runId, + traceCarrier, + runInput: { + input: workflowArguments, + deploymentId, + workflowName, + specVersion, + executionContext, + }, + } satisfies WorkflowInvokePayload, + { + deploymentId, + } + ), + ]); - // Assert that the run was created - if (!result.run) { - throw new WorkflowRuntimeError( - "Missing 'run' in server response for 'run_created' event" - ); + // Queue failure is always fatal — the run was not enqueued + if (queueResult.status === 'rejected') { + throw queueResult.reason; } - // Verify server accepted our runId - if (!v1Compat && result.run.runId !== runId) { - throw new WorkflowRuntimeError( - `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}` - ); + // Handle events.create result + if (runCreatedResult.status === 'rejected') { + const err = runCreatedResult.reason; + // 429 (ThrottleError) and 5xx (WorkflowWorldError with status >= 500) + // are retryable — the run was accepted via the queue and creation + // will be re-tried by the runtime when it calls run_started. + if (isRetryableStartError(err)) { + runtimeLogger.warn( + 'Run creation event failed, but the run was accepted via the queue. ' + + 'The run_created event will be re-tried async by the runtime.', + { workflowRunId: runId, error: err.message } + ); + } else { + throw err; + } + } else { + const result = runCreatedResult.value; + // Assert that the run was created + if (!result.run) { + throw new WorkflowRuntimeError( + "Missing 'run' in server response for 'run_created' event" + ); + } + + // Verify server accepted our runId + if (!v1Compat && result.run.runId !== runId) { + throw new WorkflowRuntimeError( + `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}` + ); + } } waitUntil( @@ -197,22 +250,23 @@ export async function start( span?.setAttributes({ ...Attribute.WorkflowRunId(runId), - ...Attribute.WorkflowRunStatus(result.run.status), ...Attribute.DeploymentId(deploymentId), }); - await world.queue( - getWorkflowQueueName(workflowName), - { - runId, - traceCarrier, - } satisfies WorkflowInvokePayload, - { - deploymentId, - } - ); - return new Run(runId); }); }); } + +/** + * Checks if an error from events.create (run_created) is retryable, + * meaning the queue can re-try creation later via the run_started path. + * - ThrottleError (429): rate limited, will succeed later + * - WorkflowWorldError with status >= 500: server error, will succeed later + */ +function isRetryableStartError(err: unknown): boolean { + if (ThrottleError.is(err)) return true; + if (WorkflowWorldError.is(err) && err.status && err.status >= 500) + return true; + return false; +} diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 3f5acef00d..22c1890fa2 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -64,6 +64,7 @@ const EventResultResolveWireSchema = z.object({ run: WorkflowRunSchema.optional(), step: StepWireSchema.optional(), hook: HookSchema.optional(), + events: z.array(EventSchema).optional(), }); const EventResultLazyWireSchema = z.object({ @@ -71,6 +72,7 @@ const EventResultLazyWireSchema = z.object({ run: WorkflowRunWireBaseSchema.optional(), step: StepWireSchema.optional(), hook: HookSchema.optional(), + events: z.array(EventSchema).optional(), }); // Schema for events returned with `remoteRefBehavior=lazy`. @@ -461,6 +463,7 @@ async function createWorkflowRunEventInner( run: wireResult.run, step: wireResult.step ? deserializeStep(wireResult.step) : undefined, hook: wireResult.hook, + events: wireResult.events, }; } @@ -487,5 +490,6 @@ async function createWorkflowRunEventInner( : undefined, step: wireResult.step ? deserializeStep(wireResult.step) : undefined, hook: wireResult.hook, + events: wireResult.events, }; } diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 5db88afadd..1086bd790a 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -61,7 +61,8 @@ function httpLog( * * Example: 'https://workflow-server-git-branch-name.vercel.sh' */ -const WORKFLOW_SERVER_URL_OVERRIDE = ''; +const WORKFLOW_SERVER_URL_OVERRIDE = + 'https://workflow-server-eumrxk3ud.vercel.sh'; export interface APIConfig { token?: string; diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index 74e862215a..6784a0a819 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -223,9 +223,22 @@ const RunCreatedEventSchema = BaseEventSchema.extend({ /** * Event created when a workflow run starts executing. * Updates the run entity to status 'running'. + * + * The optional eventData carries run creation data for the resilient start path: + * when the run_created event failed (e.g., storage outage during start()), the + * runtime passes the run input through the queue so the server can create the run + * on the run_started call if it doesn't exist yet. */ const RunStartedEventSchema = BaseEventSchema.extend({ eventType: z.literal('run_started'), + eventData: z + .object({ + input: SerializedDataSchema.optional(), + deploymentId: z.string().optional(), + workflowName: z.string().optional(), + executionContext: z.record(z.string(), z.any()).optional(), + }) + .optional(), }); /** diff --git a/packages/world/src/index.ts b/packages/world/src/index.ts index baa62a1480..ae30c9fda0 100644 --- a/packages/world/src/index.ts +++ b/packages/world/src/index.ts @@ -16,6 +16,7 @@ export { MessageId, QueuePayloadSchema, QueuePrefix, + RunInputSchema, StepInvokePayloadSchema, ValidQueueName, WorkflowInvokePayloadSchema, diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 5093b62dd3..59f467b463 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -21,12 +21,29 @@ export type MessageId = z.infer; export const TraceCarrierSchema = z.record(z.string(), z.string()); export type TraceCarrier = z.infer; +/** + * Run creation data carried through the queue for resilient start. + * Only present on the first queue delivery — re-enqueues omit this. + * When the runtime processes the message, it passes this data to the + * run_started event so the server can create the run if it doesn't exist yet. + */ +export const RunInputSchema = z.object({ + input: z.unknown(), + deploymentId: z.string(), + workflowName: z.string(), + specVersion: z.number(), + executionContext: z.record(z.string(), z.any()).optional(), +}); +export type RunInput = z.infer; + export const WorkflowInvokePayloadSchema = z.object({ runId: z.string(), traceCarrier: TraceCarrierSchema.optional(), requestedAt: z.coerce.date().optional(), /** Number of times this message has been re-enqueued due to server errors (5xx) */ serverErrorRetryCount: z.number().int().optional(), + /** Run creation data, only present on the first queue delivery from start() */ + runInput: RunInputSchema.optional(), }); export const StepInvokePayloadSchema = z.object({ diff --git a/packages/world/src/ulid.ts b/packages/world/src/ulid.ts index 9f5bb1e53d..ad59f1445c 100644 --- a/packages/world/src/ulid.ts +++ b/packages/world/src/ulid.ts @@ -4,9 +4,13 @@ import { z } from 'zod'; const UlidSchema = z.string().ulid(); /** - * Default threshold for ULID timestamp validation (5 minutes in milliseconds). + * Default threshold for ULID timestamp validation (24 hours in milliseconds). + * + * Set to 24 hours to support the resilient start path: when start() fails to + * create run_created, the queue carries the run input and the runtime creates + * the run on run_started. VQS supports delayed messages up to 24 hours. */ -export const DEFAULT_TIMESTAMP_THRESHOLD_MS = 5 * 60 * 1000; +export const DEFAULT_TIMESTAMP_THRESHOLD_MS = 24 * 60 * 60 * 1000; /** * Extracts a Date from a ULID string, or null if the string is not a valid ULID. From 0dfd424ee0d1123386da51dee01987eb70b54d7b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 16:13:12 -0700 Subject: [PATCH 03/17] add local and postgres world changes Signed-off-by: Peter Wielander --- .../world-local/src/storage/events-storage.ts | 78 +++++++++++++++++++ packages/world-postgres/src/storage.ts | 61 +++++++++++++++ packages/world-vercel/src/events.ts | 4 +- 3 files changed, 141 insertions(+), 2 deletions(-) diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index b6aa61b56c..20e600b18c 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -112,6 +112,8 @@ export function createEventsStorage( // on running steps, and running steps are always allowed to modify regardless // of run state. This optimization saves filesystem reads per step event. let currentRun: WorkflowRun | null = null; + // Track whether run was created via resilient start fallback + let runCreatedViaRunStarted = false; const skipRunValidationEvents = ['step_completed', 'step_retrying']; if ( data.eventType !== 'run_created' && @@ -124,6 +126,73 @@ export function createEventsStorage( WorkflowRunSchema, tag ); + + // Resilient start: run_started on non-existent run with eventData + // creates the run first, so the queue can bootstrap a run that + // failed to create during start(). + if ( + data.eventType === 'run_started' && + !currentRun && + 'eventData' in data && + data.eventData + ) { + const runInputData = data.eventData as { + deploymentId?: string; + workflowName?: string; + input?: any; + executionContext?: Record; + }; + if ( + runInputData.deploymentId && + runInputData.workflowName && + runInputData.input !== undefined + ) { + // Create the run entity + const createdRun: WorkflowRun = { + runId: effectiveRunId, + deploymentId: runInputData.deploymentId, + status: 'pending', + workflowName: runInputData.workflowName, + specVersion: effectiveSpecVersion, + executionContext: runInputData.executionContext, + input: runInputData.input, + output: undefined, + error: undefined, + startedAt: undefined, + completedAt: undefined, + createdAt: now, + updatedAt: now, + }; + await writeJSON( + taggedPath(basedir, 'runs', effectiveRunId, tag), + createdRun + ); + + // Create run_created event + const runCreatedEventId = `evnt_${monotonicUlid()}`; + const runCreatedEvent: Event = { + eventType: 'run_created', + runId: effectiveRunId, + eventId: runCreatedEventId, + createdAt: now, + specVersion: effectiveSpecVersion, + eventData: { + deploymentId: runInputData.deploymentId, + workflowName: runInputData.workflowName, + input: runInputData.input, + executionContext: runInputData.executionContext, + }, + }; + const createdCompositeKey = `${effectiveRunId}-${runCreatedEventId}`; + await writeJSON( + taggedPath(basedir, 'events', createdCompositeKey, tag), + runCreatedEvent + ); + + currentRun = createdRun; + runCreatedViaRunStarted = true; + } + } } // ============================================================ @@ -281,8 +350,17 @@ export function createEventsStorage( throw new HookNotFoundError(data.correlationId); } } + // Strip eventData from run_started events — the run input belongs + // on the run_created event only, not on run_started. + const eventData = + data.eventType === 'run_started' + ? undefined + : 'eventData' in data + ? data.eventData + : undefined; const event: Event = { ...data, + ...(eventData !== undefined ? { eventData } : {}), runId: effectiveRunId, eventId, createdAt: now, diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index 1d1e8f96b3..c2fbf207ad 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -373,6 +373,67 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { runId: effectiveRunId, }); currentRun = runValue ?? null; + + // Resilient start: run_started on non-existent run with eventData + // creates the run first, so the queue can bootstrap a run that + // failed to create during start(). + if ( + data.eventType === 'run_started' && + !currentRun && + 'eventData' in data && + data.eventData + ) { + const runInputData = (data as any).eventData as { + deploymentId?: string; + workflowName?: string; + input?: any; + executionContext?: Record; + }; + if ( + runInputData.deploymentId && + runInputData.workflowName && + runInputData.input !== undefined + ) { + // Create the run entity + const [createdRun] = await drizzle + .insert(Schema.runs) + .values({ + runId: effectiveRunId, + deploymentId: runInputData.deploymentId, + workflowName: runInputData.workflowName, + specVersion: effectiveSpecVersion, + input: runInputData.input as SerializedContent, + executionContext: runInputData.executionContext as + | SerializedContent + | undefined, + status: 'pending', + }) + .onConflictDoNothing() + .returning(); + + if (createdRun) { + // Create run_created event + const runCreatedEventId = `wevt_${ulid()}`; + await drizzle.insert(events).values({ + runId: effectiveRunId, + eventId: runCreatedEventId, + eventType: 'run_created', + eventData: { + deploymentId: runInputData.deploymentId, + workflowName: runInputData.workflowName, + input: runInputData.input, + executionContext: runInputData.executionContext, + }, + specVersion: effectiveSpecVersion, + }); + + currentRun = { + status: 'pending', + specVersion: effectiveSpecVersion, + }; + } + } + } } // ============================================================ diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 22c1890fa2..c37edac2af 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -60,7 +60,7 @@ function stripEventAndLegacyRefs( // undefined), so we use the looser WorkflowRunWireBaseSchema and normalize // the error via deserializeError() afterward. const EventResultResolveWireSchema = z.object({ - event: EventSchema, + event: EventSchema.optional(), run: WorkflowRunSchema.optional(), step: StepWireSchema.optional(), hook: HookSchema.optional(), @@ -68,7 +68,7 @@ const EventResultResolveWireSchema = z.object({ }); const EventResultLazyWireSchema = z.object({ - event: EventSchema, + event: EventSchema.optional(), run: WorkflowRunWireBaseSchema.optional(), step: StepWireSchema.optional(), hook: HookSchema.optional(), From b1c2bf36e0dfcf8c4371cf908d857790ab32abe5 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 16:13:25 -0700 Subject: [PATCH 04/17] fix get Signed-off-by: Peter Wielander --- docs/content/docs/changelog/resilient-start.mdx | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/content/docs/changelog/resilient-start.mdx b/docs/content/docs/changelog/resilient-start.mdx index e2ef937391..92c4601681 100644 --- a/docs/content/docs/changelog/resilient-start.mdx +++ b/docs/content/docs/changelog/resilient-start.mdx @@ -74,6 +74,7 @@ dispatch so that runs can still be accepted when storage is degraded. ## Implementation notes ### Error type mapping for terminal runs + Previously, calling `run_started` on a terminal run threw `InvalidOperationStateError` (HTTP 409). This was changed to `EntityGoneError` (HTTP 410) so the runtime correctly distinguishes "already running" (409 → fetch and proceed) from "already finished" @@ -81,16 +82,19 @@ distinguishes "already running" (409 → fetch and proceed) from "already finish `events-materialization.integration.ts`. ### run_started on already-running runs + Added an explicit check in `handleRunStateTransition`: if the run is already `running`, throw `EntityConflictError` (409) instead of idempotently patching and creating a duplicate `run_started` event. This prevents event log pollution from concurrent workers. ### RunStartedEventSchema eventData stripping + The run input is passed through to `run_started`'s `eventData` but stripped before the event is persisted — the data belongs on the `run_created` event only. The server sets `effectiveEventData = undefined` for `run_started` events. ### ULID threshold test updates + Both `workflow-server/test/helpers/ulid.test.ts` and `test/api/events.integration.ts` had tests expecting 10-minute-old ULIDs to be rejected. These were updated to use 25-hour offsets to match the new 24-hour threshold. @@ -106,3 +110,4 @@ rejected. These were updated to use 25-hour offsets to match the new 24-hour thr to re-enqueue cycles (currently only first invocation). - [ ] Consider adding a Datadog metric for the run_started → already running (409) path to track concurrent start frequency. +- [ ] Edit run ID validation so that we still allow up to 24 hours in the past, but only up to 5 minutes in the future From 920272365c96c66ff8b6569e441ac8b7b759e4cf Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 16:15:39 -0700 Subject: [PATCH 05/17] changelog Signed-off-by: Peter Wielander --- .../docs/changelog/resilient-start.mdx | 36 +++++---- packages/core/src/runtime/start.test.ts | 73 ++++++++++++++++++- 2 files changed, 95 insertions(+), 14 deletions(-) diff --git a/docs/content/docs/changelog/resilient-start.mdx b/docs/content/docs/changelog/resilient-start.mdx index 92c4601681..b353ed3c6e 100644 --- a/docs/content/docs/changelog/resilient-start.mdx +++ b/docs/content/docs/changelog/resilient-start.mdx @@ -58,13 +58,13 @@ dispatch so that runs can still be accepted when storage is degraded. minimize latency in the happy path. The trade-off is slightly more complex error handling. -2. **409 → extra GET**: When `run_started` returns 409 (already running), we do an - extra `world.runs.get` call rather than changing the server to include the run - entity in 409 error responses. This keeps the HTTP error contract simple. The 409 - path is rare (concurrent workers racing) so the extra latency is acceptable. +2. **Already-running returns run without event**: When `run_started` encounters an + already-running run, all worlds return `{ run }` with `event: undefined` (no + `events` array) instead of throwing. The runtime detects this by checking for + `result.event === undefined`. This avoids the extra `world.runs.get` round-trip. 3. **Events in 200 response**: We only return events on the 200 path (first caller). - On 409 (second caller), we fall back to the normal `events.list` call. This is + On the already-running path, we fall back to the normal `events.list` call. This is correct because only on 200 can we be certain we know the full event history. 4. **24-hour ULID threshold**: VQS supports delayed messages up to 24 hours. We match @@ -83,9 +83,22 @@ distinguishes "already running" (409 → fetch and proceed) from "already finish ### run_started on already-running runs -Added an explicit check in `handleRunStateTransition`: if the run is already `running`, -throw `EntityConflictError` (409) instead of idempotently patching and creating a -duplicate `run_started` event. This prevents event log pollution from concurrent workers. +All worlds (workflow-server, world-local, world-postgres) now return the existing run +entity directly — with `event: undefined` — when `run_started` is called on an +already-running run. This avoids both a duplicate event and the extra `world.runs.get` +call that the previous 409-based approach required. The `EventResultResolveWireSchema` +in world-vercel was updated to make `event` optional. + +### world-local and world-postgres support + +Both world-local (filesystem) and world-postgres (Drizzle/SQL) now implement the full +resilient start behavior: + +- Creating runs from `run_started` when the run doesn't exist and eventData is provided +- Returning `{ run }` without event on already-running +- Throwing `RunExpiredError` on terminal runs +- Stripping eventData from stored `run_started` events +- Returning the `events` array on successful start ### RunStartedEventSchema eventData stripping @@ -101,13 +114,10 @@ rejected. These were updated to use 25-hour offsets to match the new 24-hour thr ## Follow-up work -- [ ] Consider including run entity in 409 responses server-side to avoid the extra - GET call on the concurrent-start path. -- [ ] Add e2e tests covering the degraded-storage start path. +- [ ] Add e2e tests covering the degraded-storage start path against a live deployment. - [ ] Monitor the "run_started created run" Datadog metric in production to understand how often the fallback path is hit. - [ ] Consider whether the `events` optimization in the 200 response should also apply to re-enqueue cycles (currently only first invocation). -- [ ] Consider adding a Datadog metric for the run_started → already running (409) - path to track concurrent start frequency. - [ ] Edit run ID validation so that we still allow up to 24 hours in the past, but only up to 5 minutes in the future +- [ ] Add a Datadog metric for the "run_started → already running (409)" path to track concurrent start frequency, so we can add alerting on it later diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index 049cd5ffb3..fab28aea4c 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -1,4 +1,4 @@ -import { WorkflowRuntimeError } from '@workflow/errors'; +import { WorkflowRuntimeError, WorkflowWorldError } from '@workflow/errors'; import { SPEC_VERSION_CURRENT, SPEC_VERSION_LEGACY } from '@workflow/world'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { start } from './start.js'; @@ -380,4 +380,75 @@ describe('start', () => { ); }); }); + + describe('resilient start (run_created failure)', () => { + const validWorkflow = Object.assign(() => Promise.resolve('result'), { + workflowId: 'test-workflow', + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('should succeed when events.create throws a 500 error (queue still dispatched)', async () => { + const mockQueue = vi.fn().mockResolvedValue({ messageId: null }); + const serverError = new WorkflowWorldError('Internal Server Error', { + status: 500, + }); + const mockEventsCreate = vi.fn().mockRejectedValue(serverError); + + vi.mocked(getWorld).mockReturnValue({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + // start() should NOT throw — the queue was still dispatched + const run = await start(validWorkflow, [42]); + expect(run.runId).toMatch(/^wrun_/); + + // Queue should have been called with runInput + expect(mockQueue).toHaveBeenCalledTimes(1); + const [, queuePayload] = mockQueue.mock.calls[0]; + expect(queuePayload.runInput).toBeDefined(); + expect(queuePayload.runInput.deploymentId).toBe('deploy_123'); + expect(queuePayload.runInput.workflowName).toBe('test-workflow'); + expect(queuePayload.runInput.specVersion).toBe(SPEC_VERSION_CURRENT); + }); + + it('should throw when queue fails even if events.create succeeds', async () => { + const mockEventsCreate = vi.fn().mockResolvedValue({ + run: { runId: 'wrun_test', status: 'pending' }, + }); + const mockQueue = vi + .fn() + .mockRejectedValue(new Error('Queue unavailable')); + + vi.mocked(getWorld).mockReturnValue({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + await expect(start(validWorkflow, [])).rejects.toThrow( + 'Queue unavailable' + ); + }); + + it('should throw when events.create fails with a non-retryable error (e.g. 400)', async () => { + const badRequest = new WorkflowWorldError('Bad Request', { + status: 400, + }); + const mockEventsCreate = vi.fn().mockRejectedValue(badRequest); + const mockQueue = vi.fn().mockResolvedValue({ messageId: null }); + + vi.mocked(getWorld).mockReturnValue({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + await expect(start(validWorkflow, [])).rejects.toThrow('Bad Request'); + }); + }); }); From c129183357d6d86f57e4d3c81fe6dbf0bf525cc4 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 16:18:18 -0700 Subject: [PATCH 06/17] fix local world Signed-off-by: Peter Wielander --- packages/world-local/src/storage/events-storage.ts | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 20e600b18c..03f2dbb571 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -112,8 +112,6 @@ export function createEventsStorage( // on running steps, and running steps are always allowed to modify regardless // of run state. This optimization saves filesystem reads per step event. let currentRun: WorkflowRun | null = null; - // Track whether run was created via resilient start fallback - let runCreatedViaRunStarted = false; const skipRunValidationEvents = ['step_completed', 'step_retrying']; if ( data.eventType !== 'run_created' && @@ -190,7 +188,6 @@ export function createEventsStorage( ); currentRun = createdRun; - runCreatedViaRunStarted = true; } } } @@ -350,17 +347,8 @@ export function createEventsStorage( throw new HookNotFoundError(data.correlationId); } } - // Strip eventData from run_started events — the run input belongs - // on the run_created event only, not on run_started. - const eventData = - data.eventType === 'run_started' - ? undefined - : 'eventData' in data - ? data.eventData - : undefined; const event: Event = { ...data, - ...(eventData !== undefined ? { eventData } : {}), runId: effectiveRunId, eventId, createdAt: now, From 5223192146d4287fd60b4db4bf7f199d742d8580 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 16:38:33 -0700 Subject: [PATCH 07/17] fix backend Signed-off-by: Peter Wielander --- packages/world-vercel/src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 1086bd790a..f982d1c093 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -62,7 +62,7 @@ function httpLog( * Example: 'https://workflow-server-git-branch-name.vercel.sh' */ const WORKFLOW_SERVER_URL_OVERRIDE = - 'https://workflow-server-eumrxk3ud.vercel.sh'; + 'https://workflow-server-git-peter-allow-start-new-run-directly.vercel.sh'; export interface APIConfig { token?: string; From 50336409aead1b7dca888a647fbeb48078f9f663 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 16:43:41 -0700 Subject: [PATCH 08/17] e2e test Signed-off-by: Peter Wielander --- packages/core/e2e/e2e.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 618ded95b1..76d91f0229 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -4,7 +4,9 @@ import { setTimeout as sleep } from 'node:timers/promises'; import { WorkflowRunCancelledError, WorkflowRunFailedError, + WorkflowWorldError, } from '@workflow/errors'; +import type { World } from '@workflow/world'; import { afterAll, assert, From d381cd3c91d2bc18ae8f02ed84fdae72c0d26ab1 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 16:59:39 -0700 Subject: [PATCH 09/17] docs Signed-off-by: Peter Wielander --- docs/content/docs/changelog/resilient-start.mdx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/changelog/resilient-start.mdx b/docs/content/docs/changelog/resilient-start.mdx index b353ed3c6e..543bf4587c 100644 --- a/docs/content/docs/changelog/resilient-start.mdx +++ b/docs/content/docs/changelog/resilient-start.mdx @@ -1,13 +1,13 @@ --- -title: Resilient start() -description: Overhaul of start() to tolerate world-vercel storage unavailability when the queue is healthy. +title: Resilient run start +description: Overhaul run start logic to tolerate world storage unavailability, as long as the queue is healthy, and significantly speeds up run start. --- # Resilient `start()` ## Motivation -When `world-vercel` storage (DynamoDB) is unavailable but the queue (VQS) is up, +When `world` storage is unavailable but the queue is up, `start()` previously failed entirely because `world.events.create(run_created)` is called before `world.queue()`. This change decouples run creation from queue dispatch so that runs can still be accepted when storage is degraded. From 10838f3cb88d55fb26cd8ad274a4304af943a531 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 16:59:43 -0700 Subject: [PATCH 10/17] test fixes Signed-off-by: Peter Wielander --- packages/world-local/src/storage.test.ts | 14 ++++++-- packages/world/src/index.ts | 2 ++ packages/world/src/ulid.ts | 43 ++++++++++++++++++------ 3 files changed, 46 insertions(+), 13 deletions(-) diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index 878e545fda..98a0392952 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -2857,8 +2857,8 @@ describe('Storage', () => { }); it('should reject a runId with a timestamp too far in the past', async () => { - // 10 minutes ago — exceeds the 5-minute threshold - const runId = makeRunId(Date.now() - 10 * 60 * 1000); + // 25 hours ago — exceeds the 24-hour past threshold + const runId = makeRunId(Date.now() - 25 * 60 * 60 * 1000); await expect( storage.events.create(runId, runCreatedEvent) @@ -2869,8 +2869,16 @@ describe('Storage', () => { ).rejects.toThrow(/Invalid runId timestamp/); }); + it('should accept a runId with a timestamp 10 minutes in the past', async () => { + // 10 minutes ago — within the 24-hour past threshold + const runId = makeRunId(Date.now() - 10 * 60 * 1000); + const result = await storage.events.create(runId, runCreatedEvent); + expect(result.run).toBeDefined(); + expect(result.run!.runId).toBe(runId); + }); + it('should reject a runId with a timestamp too far in the future', async () => { - // 10 minutes from now + // 10 minutes from now — exceeds the 5-minute future threshold const runId = makeRunId(Date.now() + 10 * 60 * 1000); await expect( diff --git a/packages/world/src/index.ts b/packages/world/src/index.ts index ae30c9fda0..e3a574e3bf 100644 --- a/packages/world/src/index.ts +++ b/packages/world/src/index.ts @@ -54,7 +54,9 @@ export { export type * from './steps.js'; export { StepSchema, StepStatusSchema } from './steps.js'; export { + DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS, DEFAULT_TIMESTAMP_THRESHOLD_MS, + DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS, ulidToDate, validateUlidTimestamp, } from './ulid.js'; diff --git a/packages/world/src/ulid.ts b/packages/world/src/ulid.ts index ad59f1445c..276b5b0c80 100644 --- a/packages/world/src/ulid.ts +++ b/packages/world/src/ulid.ts @@ -4,13 +4,25 @@ import { z } from 'zod'; const UlidSchema = z.string().ulid(); /** - * Default threshold for ULID timestamp validation (24 hours in milliseconds). + * Default threshold for ULID timestamps in the past (24 hours). * * Set to 24 hours to support the resilient start path: when start() fails to * create run_created, the queue carries the run input and the runtime creates * the run on run_started. VQS supports delayed messages up to 24 hours. */ -export const DEFAULT_TIMESTAMP_THRESHOLD_MS = 24 * 60 * 60 * 1000; +export const DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS = 24 * 60 * 60 * 1000; + +/** + * Default threshold for ULID timestamps in the future (5 minutes). + * + * Kept tight to prevent abuse from client-generated ULIDs with manipulated + * future timestamps while still tolerating minor clock skew. + */ +export const DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS = 5 * 60 * 1000; + +/** @deprecated Use DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS instead */ +export const DEFAULT_TIMESTAMP_THRESHOLD_MS = + DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS; /** * Extracts a Date from a ULID string, or null if the string is not a valid ULID. @@ -25,18 +37,22 @@ export function ulidToDate(maybeUlid: string): Date | null { } /** - * Validates that a prefixed ULID's embedded timestamp is within an acceptable threshold - * of the current server time. This prevents client-generated ULIDs with manipulated timestamps. + * Validates that a prefixed ULID's embedded timestamp is within acceptable thresholds + * of the current server time. Uses asymmetric thresholds: 24h in the past (to support + * resilient start with queue delays) and 5min in the future (to prevent abuse while + * tolerating clock skew). * * @param prefixedUlid - The prefixed ULID to validate (e.g., "wrun_01ARYZ...") * @param prefix - The prefix to strip (e.g., "wrun_") - * @param thresholdMs - Maximum allowed drift in milliseconds (default: 5 minutes) + * @param pastThresholdMs - Maximum allowed age in the past (default: 24 hours) + * @param futureThresholdMs - Maximum allowed distance in the future (default: 5 minutes) * @returns null if valid, or an error message string if invalid */ export function validateUlidTimestamp( prefixedUlid: string, prefix: string, - thresholdMs: number = DEFAULT_TIMESTAMP_THRESHOLD_MS + pastThresholdMs: number = DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS, + futureThresholdMs: number = DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS ): string | null { const raw = prefixedUlid.startsWith(prefix) ? prefixedUlid.slice(prefix.length) @@ -48,13 +64,20 @@ export function validateUlidTimestamp( } const serverTimestamp = new Date(); - const driftMs = Math.abs(serverTimestamp.getTime() - ulidTimestamp.getTime()); + const diffMs = serverTimestamp.getTime() - ulidTimestamp.getTime(); - if (driftMs <= thresholdMs) { - return null; + // diffMs > 0 means the ULID is in the past; diffMs < 0 means it's in the future + if (diffMs > 0 && diffMs <= pastThresholdMs) { + return null; // Within past threshold + } + if (diffMs <= 0 && -diffMs <= futureThresholdMs) { + return null; // Within future threshold } + const driftMs = Math.abs(diffMs); const driftSeconds = Math.round(driftMs / 1000); + const direction = diffMs > 0 ? 'past' : 'future'; + const thresholdMs = diffMs > 0 ? pastThresholdMs : futureThresholdMs; const thresholdSeconds = Math.round(thresholdMs / 1000); - return `Invalid runId timestamp: embedded timestamp differs from server time by ${driftSeconds}s (threshold: ${thresholdSeconds}s)`; + return `Invalid runId timestamp: embedded timestamp is ${driftSeconds}s in the ${direction} (threshold: ${thresholdSeconds}s)`; } From ccba6331ca4cde7f5af94a7fd2fe01f45b6789d3 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 17:04:06 -0700 Subject: [PATCH 11/17] docs Signed-off-by: Peter Wielander --- .../docs/changelog/resilient-start.mdx | 66 +++++++++++-------- packages/world-local/src/storage.test.ts | 2 +- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/docs/content/docs/changelog/resilient-start.mdx b/docs/content/docs/changelog/resilient-start.mdx index 543bf4587c..f1bfcc5688 100644 --- a/docs/content/docs/changelog/resilient-start.mdx +++ b/docs/content/docs/changelog/resilient-start.mdx @@ -32,11 +32,12 @@ dispatch so that runs can still be accepted when storage is degraded. - We no longer call `world.runs.get` or check the run status before starting. - We **always** call `world.events.create` with `run_started`, now also passing the run input that was sent through the queue. The response will be: - - **200 (now running)**: use the returned `Run` entity as the run. The response also - includes an `events` array of all events up to that point (typically `run_created` - and `run_started`), with data resolved. These are used to skip the very first - `world.events.list` call, reducing TTFB for the first invocation. - - **409 (already running)**: fetch the run via `world.runs.get` and proceed. + - **200 with event (now running)**: use the returned `Run` entity as the run. The + response also includes an `events` array of all events up to that point (typically + `run_created` and `run_started`), with data resolved. These are used to skip the + very first `world.events.list` call, reducing TTFB for the first invocation. + - **200 without event (already running)**: the run entity is returned directly + without creating a duplicate event. The runtime proceeds normally. - **410 (already finished)**: log and exit as usual. ### World / workflow-server changes @@ -46,11 +47,15 @@ dispatch so that runs can still be accepted when storage is degraded. 1. Creates a `run_created` event first (so the event log is consistent). 2. Strips the input from the `run_started` event data (it lives on `run_created`). 3. Then creates the `run_started` event normally. - 4. Emits a log and Datadog metric to track when this fallback path is hit. + 4. Emits a log and a Datadog metric (`workflow_server.resilient_start.run_created_via_run_started`) + to track when this fallback path is hit. +- When `run_started` encounters an **already-running** run, all worlds return `{ run }` + with `event: undefined` instead of throwing. No duplicate event is created. - When posting `run_started` and getting **200**, the response includes an `events` property with all events up to that point (data always resolved). -- The ULID timestamp validation threshold for `run_created` was relaxed from - **5 minutes** to **24 hours** so the queue can correctly re-try creation later. +- ULID timestamp validation now uses **asymmetric thresholds**: 24 hours in the past + (to support queue retry delays) and 5 minutes in the future (to prevent abuse while + tolerating clock skew). ## Decisions @@ -67,19 +72,18 @@ dispatch so that runs can still be accepted when storage is degraded. On the already-running path, we fall back to the normal `events.list` call. This is correct because only on 200 can we be certain we know the full event history. -4. **24-hour ULID threshold**: VQS supports delayed messages up to 24 hours. We match - this so a run_created retry can succeed even at the maximum queue delay. The - threshold still prevents abuse from manipulated timestamps. +4. **Asymmetric ULID thresholds**: VQS supports delayed messages up to 24 hours. We + allow 24h in the past so a run_created retry can succeed at maximum queue delay, but + keep the future threshold at 5 minutes to prevent abuse from manipulated timestamps. ## Implementation notes ### Error type mapping for terminal runs Previously, calling `run_started` on a terminal run threw `InvalidOperationStateError` -(HTTP 409). This was changed to `EntityGoneError` (HTTP 410) so the runtime correctly -distinguishes "already running" (409 → fetch and proceed) from "already finished" -(410 → exit immediately). This required updating integration tests in -`events-materialization.integration.ts`. +(HTTP 409) on workflow-server, or `EntityConflictError` on world-local/world-postgres. +This was changed to `EntityGoneError` (HTTP 410) / `RunExpiredError` so the runtime +correctly distinguishes "already running" from "already finished" (exit immediately). ### run_started on already-running runs @@ -100,24 +104,32 @@ resilient start behavior: - Stripping eventData from stored `run_started` events - Returning the `events` array on successful start -### RunStartedEventSchema eventData stripping +### Asymmetric ULID timestamp validation -The run input is passed through to `run_started`'s `eventData` but stripped before -the event is persisted — the data belongs on the `run_created` event only. The server -sets `effectiveEventData = undefined` for `run_started` events. +Both `@workflow/world` (`validateUlidTimestamp`) and `workflow-server` +(`Ulid.isTimestampWithinThreshold`) now accept separate past and future thresholds: + +- **Past**: 24 hours (`DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS`) +- **Future**: 5 minutes (`DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS`) + +The old `DEFAULT_TIMESTAMP_THRESHOLD_MS` constant is deprecated but aliased to the +past threshold for backwards compatibility. -### ULID threshold test updates +### Datadog metric -Both `workflow-server/test/helpers/ulid.test.ts` and -`test/api/events.integration.ts` had tests expecting 10-minute-old ULIDs to be -rejected. These were updated to use 25-hour offsets to match the new 24-hour threshold. +The resilient start fallback path emits a Datadog distribution metric: +`workflow_server.resilient_start.run_created_via_run_started`, tagged with +`workflow_name`. Query with `sum:workflow_server.resilient_start.run_created_via_run_started{*}`. + +### RunStartedEventSchema eventData stripping + +The run input is passed through to `run_started`'s `eventData` but stripped before +the event is persisted — the data belongs on the `run_created` event only. All worlds +strip eventData from stored `run_started` events. ## Follow-up work - [ ] Add e2e tests covering the degraded-storage start path against a live deployment. -- [ ] Monitor the "run_started created run" Datadog metric in production to understand - how often the fallback path is hit. +- [ ] Monitor the Datadog metric in production to understand how often the fallback is hit. - [ ] Consider whether the `events` optimization in the 200 response should also apply to re-enqueue cycles (currently only first invocation). -- [ ] Edit run ID validation so that we still allow up to 24 hours in the past, but only up to 5 minutes in the future -- [ ] Add a Datadog metric for the "run_started → already running (409)" path to track concurrent start frequency, so we can add alerting on it later diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index 98a0392952..fde756c57b 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -2848,7 +2848,7 @@ describe('Storage', () => { }); it('should accept a runId within the threshold', async () => { - // 4 minutes ago — within the 5-minute default + // 4 minutes ago — within the 24-hour past threshold const runId = makeRunId(Date.now() - 4 * 60 * 1000); const result = await storage.events.create(runId, runCreatedEvent); From 770c4197021a91b155dcbb9db9abd05242b629aa Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 17:14:02 -0700 Subject: [PATCH 12/17] fix Signed-off-by: Peter Wielander --- packages/world-vercel/src/events.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index c37edac2af..6aeac0e9e8 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -459,7 +459,9 @@ async function createWorkflowRunEventInner( }); return { - event: stripEventAndLegacyRefs(wireResult.event, resolveData), + event: wireResult.event + ? stripEventAndLegacyRefs(wireResult.event, resolveData) + : undefined, run: wireResult.run, step: wireResult.step ? deserializeStep(wireResult.step) : undefined, hook: wireResult.hook, @@ -484,7 +486,9 @@ async function createWorkflowRunEventInner( // undefined (lazy ref mode), so deserializeError normalizes it into the // StructuredError shape expected by WorkflowRun consumers. return { - event: stripEventAndLegacyRefs(wireResult.event, resolveData), + event: wireResult.event + ? stripEventAndLegacyRefs(wireResult.event, resolveData) + : undefined, run: wireResult.run ? deserializeError(wireResult.run) : undefined, From ee29ee9c466ed19c285f6b31b84daa7f5e128cb6 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 27 Mar 2026 20:48:23 -0700 Subject: [PATCH 13/17] base64? Signed-off-by: Peter Wielander --- docs/content/docs/changelog/resilient-start.mdx | 9 +++++++++ packages/core/src/runtime.ts | 11 +++++++++-- packages/core/src/runtime/start.ts | 9 ++++++++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/changelog/resilient-start.mdx b/docs/content/docs/changelog/resilient-start.mdx index f1bfcc5688..e18cde67eb 100644 --- a/docs/content/docs/changelog/resilient-start.mdx +++ b/docs/content/docs/changelog/resilient-start.mdx @@ -121,6 +121,15 @@ The resilient start fallback path emits a Datadog distribution metric: `workflow_server.resilient_start.run_created_via_run_started`, tagged with `workflow_name`. Query with `sum:workflow_server.resilient_start.run_created_via_run_started{*}`. +### Base64 encoding for queue transport + +`Uint8Array` values (the serialized workflow input) don't survive JSON serialization +through the queue — they get corrupted to `{0: 72, 1: 101, ...}` objects. The `runInput` +payload in the queue message now base64-encodes binary input in `start()` and the +runtime decodes it back to `Uint8Array` before passing it to `world.events.create`. +This was caught by the `spawnWorkflowFromStepWorkflow` e2e test where the child +workflow's input was being corrupted. + ### RunStartedEventSchema eventData stripping The run input is passed through to `run_started`'s `eventData` but stripped before diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index ce60426c69..6a07815124 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -212,11 +212,18 @@ export function workflowEntrypoint( eventType: 'run_started', specVersion: SPEC_VERSION_CURRENT, // Pass run input from queue so server can create - // the run if run_created was missed + // the run if run_created was missed. + // Input is base64-encoded for queue transport since + // Uint8Array doesn't survive JSON serialization. ...(runInput ? { eventData: { - input: runInput.input, + input: + typeof runInput.input === 'string' + ? Uint8Array.from(atob(runInput.input), (c) => + c.charCodeAt(0) + ) + : runInput.input, deploymentId: runInput.deploymentId, workflowName: runInput.workflowName, executionContext: runInput.executionContext, diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 7565de885e..b68dd41066 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -165,6 +165,13 @@ export async function start( const executionContext = { traceCarrier, workflowCoreVersion }; + // Encode input for queue transport — Uint8Array doesn't survive JSON + // serialization, so we base64-encode it for the queue payload. + const encodedInput = + workflowArguments instanceof Uint8Array + ? btoa(String.fromCharCode(...workflowArguments)) + : workflowArguments; + // Call events.create (run_created) and queue in parallel. // If events.create fails with 429/5xx, the run was still accepted // via the queue and creation will be re-tried async by the runtime. @@ -189,7 +196,7 @@ export async function start( runId, traceCarrier, runInput: { - input: workflowArguments, + input: encodedInput, deploymentId, workflowName, specVersion, From 11631dcffa0c58fd9ea4b85b972741b2c4de0df5 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Sat, 28 Mar 2026 16:00:12 -0700 Subject: [PATCH 14/17] change e2e test Signed-off-by: Peter Wielander --- packages/core/e2e/e2e.test.ts | 47 +++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 76d91f0229..8ae221bf35 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -2158,4 +2158,51 @@ describe('e2e', () => { }); } ); + + // ============================================================ + // Resilient start: run completes even when run_created fails + // ============================================================ + // TODO: Switch this to a stream-based workflow (e.g. readableStreamWorkflow) + // to also verify that serialization, flushing, and binary data work correctly + // over the queue boundary. Currently using addTenWorkflow to avoid the + // skipIf(isLocalDeployment()) barrier that stream tests require. + test( + 'resilient start: addTenWorkflow completes when run_created returns 500', + { timeout: 60_000 }, + async () => { + // Get the real world and wrap it so the first events.create call + // (run_created) throws a 500 server error. The queue should still + // be dispatched with runInput, and the runtime should bootstrap + // the run via the run_started fallback path. + const realWorld = getWorld(); + let createCallCount = 0; + const stubbedWorld: World = { + ...realWorld, + events: { + ...realWorld.events, + create: (async (...args: Parameters) => { + createCallCount++; + if (createCallCount === 1) { + // Fail the very first call (run_created from start()) + throw new WorkflowWorldError('Simulated storage outage', { + status: 500, + }); + } + return realWorld.events.create(...args); + }) as World['events']['create'], + }, + }; + + const run = await start(await e2e('addTenWorkflow'), [123], { + world: stubbedWorld, + }); + + // The run should still complete despite run_created failing + const returnValue = await run.returnValue; + expect(returnValue).toBe(133); + + // Verify the first call was indeed intercepted + expect(createCallCount).toBeGreaterThanOrEqual(2); + } + ); }); From ddd0bfd417f16d9e909273d86fc887c608195547 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Sun, 29 Mar 2026 09:48:45 -0700 Subject: [PATCH 15/17] changeset Signed-off-by: Peter Wielander --- .changeset/four-donuts-glow.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/four-donuts-glow.md diff --git a/.changeset/four-donuts-glow.md b/.changeset/four-donuts-glow.md new file mode 100644 index 0000000000..4d85de2356 --- /dev/null +++ b/.changeset/four-donuts-glow.md @@ -0,0 +1,9 @@ +--- +"@workflow/world-postgres": patch +"@workflow/world-vercel": patch +"@workflow/world-local": patch +"@workflow/world": patch +"@workflow/core": minor +--- + +Allow workflow invocation to create run if initial storage call in `start` did not succeed. Send run input through queue to enable this. Allow creating run_created and run_started events together in World, and skip first event list call by returning events directly. From 3ed82c1d91ceb6b1ffa931d0672be017b69cafe4 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Sun, 29 Mar 2026 10:00:42 -0700 Subject: [PATCH 16/17] 409 in case of cold start + 404 backoff when polling run values Signed-off-by: Peter Wielander --- packages/core/e2e/e2e.test.ts | 13 +++++++++---- packages/core/src/runtime/run.ts | 20 ++++++++++++++++++++ packages/core/src/runtime/start.ts | 14 ++++++++++---- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 8ae221bf35..dd978e4347 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -2197,12 +2197,17 @@ describe('e2e', () => { world: stubbedWorld, }); - // The run should still complete despite run_created failing + // Verify the stub intercepted the run_created call (only call + // through the stubbed world — the server-side runtime uses its + // own world instance for run_started and subsequent events). + expect(createCallCount).toBe(1); + + // The run should still complete despite run_created failing. + // The runtime's resilient start path creates the run from + // run_started, so returnValue polling may initially get + // WorkflowRunNotFoundError before the queue delivers. const returnValue = await run.returnValue; expect(returnValue).toBe(133); - - // Verify the first call was indeed intercepted - expect(createCallCount).toBeGreaterThanOrEqual(2); } ); }); diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 2b6bda92c2..ee9540b852 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -243,10 +243,21 @@ export class Run { * @returns The workflow return value. */ private async pollReturnValue(): Promise { + // Track not-found retries separately: when run_created fails and the + // resilient start path hasn't created the run yet, runs.get throws + // WorkflowRunNotFoundError. We retry up to 3 times with back-off + // (1s, 3s, 6s = 10s total) to give the queue time to deliver. + let notFoundRetries = 0; + const NOT_FOUND_MAX_RETRIES = 3; + const NOT_FOUND_DELAYS = [1_000, 3_000, 6_000]; + while (true) { try { const run = await this.world.runs.get(this.runId); + // Run exists — reset not-found counter + notFoundRetries = 0; + if (run.status === 'completed') { const encryptionKey = await this.getEncryptionKey(); return await hydrateWorkflowReturnValue( @@ -270,6 +281,15 @@ export class Run { await new Promise((resolve) => setTimeout(resolve, 1_000)); continue; } + if ( + WorkflowRunNotFoundError.is(error) && + notFoundRetries < NOT_FOUND_MAX_RETRIES + ) { + const delay = NOT_FOUND_DELAYS[notFoundRetries]!; + notFoundRetries++; + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } throw error; } } diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index b68dd41066..7f3e14aaf5 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,5 +1,6 @@ import { waitUntil } from '@vercel/functions'; import { + EntityConflictError, ThrottleError, WorkflowRuntimeError, WorkflowWorldError, @@ -217,10 +218,15 @@ export async function start( // Handle events.create result if (runCreatedResult.status === 'rejected') { const err = runCreatedResult.reason; - // 429 (ThrottleError) and 5xx (WorkflowWorldError with status >= 500) - // are retryable — the run was accepted via the queue and creation - // will be re-tried by the runtime when it calls run_started. - if (isRetryableStartError(err)) { + if (EntityConflictError.is(err)) { + // 409: The run already exists. This can happen in extreme cases where + // the run creation call gets a cold start or other slowdown, and the queue + // + run_started call completes faster. We expect this to be <=1% of cases. + // In this case, we can safely return. + } else if (isRetryableStartError(err)) { + // 429 (ThrottleError) and 5xx (WorkflowWorldError with status >= 500) + // are retryable — the run was accepted via the queue and creation + // will be re-tried by the runtime when it calls run_started. runtimeLogger.warn( 'Run creation event failed, but the run was accepted via the queue. ' + 'The run_created event will be re-tried async by the runtime.', From 7b51efd8305a09044ce4bc7402801ec060b7763a Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 30 Mar 2026 14:16:16 -0700 Subject: [PATCH 17/17] remove override Signed-off-by: Peter Wielander --- packages/world-vercel/src/utils.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index f982d1c093..5db88afadd 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -61,8 +61,7 @@ function httpLog( * * Example: 'https://workflow-server-git-branch-name.vercel.sh' */ -const WORKFLOW_SERVER_URL_OVERRIDE = - 'https://workflow-server-git-peter-allow-start-new-run-directly.vercel.sh'; +const WORKFLOW_SERVER_URL_OVERRIDE = ''; export interface APIConfig { token?: string;