diff --git a/.changeset/better-peas-buy.md b/.changeset/better-peas-buy.md new file mode 100644 index 0000000000..2f499761b5 --- /dev/null +++ b/.changeset/better-peas-buy.md @@ -0,0 +1,8 @@ +--- +"@workflow/world": patch +"@workflow/core": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch +--- + +Combine initial run fetch, event fetch, and run_started event creation diff --git a/.changeset/four-donuts-glow.md b/.changeset/four-donuts-glow.md new file mode 100644 index 0000000000..4d85de2356 --- /dev/null +++ b/.changeset/four-donuts-glow.md @@ -0,0 +1,9 @@ +--- +"@workflow/world-postgres": patch +"@workflow/world-vercel": patch +"@workflow/world-local": patch +"@workflow/world": patch +"@workflow/core": minor +--- + +Allow workflow invocation to create run if initial storage call in `start` did not succeed. Send run input through queue to enable this. Allow creating run_created and run_started events together in World, and skip first event list call by returning events directly. diff --git a/docs/content/docs/changelog/meta.json b/docs/content/docs/changelog/meta.json index 042ff8fc8b..0c01dff133 100644 --- a/docs/content/docs/changelog/meta.json +++ b/docs/content/docs/changelog/meta.json @@ -1,5 +1,5 @@ { "title": "Changelog", - "pages": ["index", "eager-processing"], + "pages": ["index", "eager-processing", "resilient-start"], "defaultOpen": false } diff --git a/docs/content/docs/changelog/resilient-start.mdx b/docs/content/docs/changelog/resilient-start.mdx new file mode 100644 index 0000000000..e18cde67eb --- /dev/null +++ b/docs/content/docs/changelog/resilient-start.mdx @@ -0,0 +1,144 @@ +--- +title: Resilient run start +description: Overhaul run start logic to tolerate world storage unavailability, as long as the queue is healthy, and significantly speed up run start. 
+--- + +# Resilient `start()` + +## Motivation + +When `world` storage is unavailable but the queue is up, +`start()` previously failed entirely because `world.events.create(run_created)` +is called before `world.queue()`. This change decouples run creation from queue +dispatch so that runs can still be accepted when storage is degraded. + +## Design + +### `start()` changes (packages/core) + +- `world.events.create` (run_created) and `world.queue` are now called **in parallel**. +- If `events.create` errors with **429 or 5xx**, we log a warning saying that run + creation failed but the run was accepted — creation will be re-tried async by the + runtime when it processes the queue message. +- If `world.queue` fails, we still throw — the run truly failed and was not enqueued. +- The queue invocation now receives all the run inputs (`input`, `deploymentId`, + `workflowName`, `specVersion`, `executionContext`) so the runtime can create the + run later if needed. +- When the runtime re-enqueues itself, it does **not** pass these inputs — only the + first queue cycle carries them. + +### `workflowEntrypoint` changes (packages/core) + +- We no longer call `world.runs.get` or check the run status before starting. +- We **always** call `world.events.create` with `run_started`, now also passing the + run input that was sent through the queue. The response will be: + - **200 with event (now running)**: use the returned `Run` entity as the run. The + response also includes an `events` array of all events up to that point (typically + `run_created` and `run_started`), with data resolved. These are used to skip the + very first `world.events.list` call, reducing TTFB for the first invocation. + - **200 without event (already running)**: the run entity is returned directly + without creating a duplicate event. The runtime proceeds normally. + - **410 (already finished)**: log and exit as usual. 
+ +### World / workflow-server changes + +- Posting `run_started` to a **non-existent** run is now allowed when the run input is + sent along with the payload. The server: + 1. Creates a `run_created` event first (so the event log is consistent). + 2. Strips the input from the `run_started` event data (it lives on `run_created`). + 3. Then creates the `run_started` event normally. + 4. Emits a log and a Datadog metric (`workflow_server.resilient_start.run_created_via_run_started`) + to track when this fallback path is hit. +- When `run_started` encounters an **already-running** run, all worlds return `{ run }` + with `event: undefined` instead of throwing. No duplicate event is created. +- When posting `run_started` creates the event (**200 with event**), the response includes an `events` + property with all events up to that point (data always resolved). +- ULID timestamp validation now uses **asymmetric thresholds**: 24 hours in the past + (to support queue retry delays) and 5 minutes in the future (to prevent abuse while + tolerating clock skew). + +## Decisions + +1. **Parallel not sequential**: We chose `Promise.allSettled` over sequential calls to + minimize latency in the happy path. The trade-off is slightly more complex error + handling. + +2. **Already-running returns run without event**: When `run_started` encounters an + already-running run, all worlds return `{ run }` with `event: undefined` (no + `events` array) instead of throwing. The runtime sees no `result.events` in this case and + falls back to `events.list`. This avoids the extra `world.runs.get` round-trip. + +3. **Events in 200 response**: We only return events on the 200-with-event path (first caller). + On the already-running path, we fall back to the normal `events.list` call. This is + correct because only when this call created `run_started` can we be certain we know the full event history. + +4. **Asymmetric ULID thresholds**: VQS supports delayed messages up to 24 hours. 
We + allow 24h in the past so a run_created retry can succeed at maximum queue delay, but + keep the future threshold at 5 minutes to prevent abuse from manipulated timestamps. + +## Implementation notes + +### Error type mapping for terminal runs + +Previously, calling `run_started` on a terminal run threw `InvalidOperationStateError` +(HTTP 409) on workflow-server, or `EntityConflictError` on world-local/world-postgres. +This was changed to `EntityGoneError` (HTTP 410) / `RunExpiredError` so the runtime +correctly distinguishes "already running" from "already finished" (exit immediately). + +### run_started on already-running runs + +All worlds (workflow-server, world-local, world-postgres) now return the existing run +entity directly — with `event: undefined` — when `run_started` is called on an +already-running run. This avoids both a duplicate event and the extra `world.runs.get` +call that the previous 409-based approach required. The `EventResultResolveWireSchema` +in world-vercel was updated to make `event` optional. + +### world-local and world-postgres support + +Both world-local (filesystem) and world-postgres (Drizzle/SQL) now implement the full +resilient start behavior: + +- Creating runs from `run_started` when the run doesn't exist and eventData is provided +- Returning `{ run }` without event on already-running +- Throwing `RunExpiredError` on terminal runs +- Stripping eventData from stored `run_started` events +- Returning the `events` array on successful start + +### Asymmetric ULID timestamp validation + +Both `@workflow/world` (`validateUlidTimestamp`) and `workflow-server` +(`Ulid.isTimestampWithinThreshold`) now accept separate past and future thresholds: + +- **Past**: 24 hours (`DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS`) +- **Future**: 5 minutes (`DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS`) + +The old `DEFAULT_TIMESTAMP_THRESHOLD_MS` constant is deprecated but aliased to the +past threshold for backwards compatibility. 
+ +### Datadog metric + +The resilient start fallback path emits a Datadog distribution metric: +`workflow_server.resilient_start.run_created_via_run_started`, tagged with +`workflow_name`. Query with `sum:workflow_server.resilient_start.run_created_via_run_started{*}`. + +### Base64 encoding for queue transport + +`Uint8Array` values (the serialized workflow input) don't survive JSON serialization +through the queue — they get corrupted to `{0: 72, 1: 101, ...}` objects. The `runInput` +payload in the queue message now base64-encodes binary input in `start()` and the +runtime decodes it back to `Uint8Array` before passing it to `world.events.create`. +This was caught by the `spawnWorkflowFromStepWorkflow` e2e test where the child +workflow's input was being corrupted. + +### RunStartedEventSchema eventData stripping + +The run input is passed through to `run_started`'s `eventData` but stripped before +the event is persisted — the data belongs on the `run_created` event only. All worlds +strip eventData from stored `run_started` events. + +## Follow-up work + +- [ ] Add e2e tests covering the degraded-storage start path against a live deployment. +- [ ] Monitor the Datadog metric in production to understand how often the fallback is hit. +- [ ] Consider whether the `events` optimization in the 200 response should also apply + to re-enqueue cycles (currently only first invocation). 
diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 618ded95b1..dd978e4347 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -4,7 +4,9 @@ import { setTimeout as sleep } from 'node:timers/promises'; import { WorkflowRunCancelledError, WorkflowRunFailedError, + WorkflowWorldError, } from '@workflow/errors'; +import type { World } from '@workflow/world'; import { afterAll, assert, @@ -2156,4 +2158,56 @@ describe('e2e', () => { }); } ); + + // ============================================================ + // Resilient start: run completes even when run_created fails + // ============================================================ + // TODO: Switch this to a stream-based workflow (e.g. readableStreamWorkflow) + // to also verify that serialization, flushing, and binary data work correctly + // over the queue boundary. Currently using addTenWorkflow to avoid the + // skipIf(isLocalDeployment()) barrier that stream tests require. + test( + 'resilient start: addTenWorkflow completes when run_created returns 500', + { timeout: 60_000 }, + async () => { + // Get the real world and wrap it so the first events.create call + // (run_created) throws a 500 server error. The queue should still + // be dispatched with runInput, and the runtime should bootstrap + // the run via the run_started fallback path. 
+ const realWorld = getWorld(); + let createCallCount = 0; + const stubbedWorld: World = { + ...realWorld, + events: { + ...realWorld.events, + create: (async (...args: Parameters) => { + createCallCount++; + if (createCallCount === 1) { + // Fail the very first call (run_created from start()) + throw new WorkflowWorldError('Simulated storage outage', { + status: 500, + }); + } + return realWorld.events.create(...args); + }) as World['events']['create'], + }, + }; + + const run = await start(await e2e('addTenWorkflow'), [123], { + world: stubbedWorld, + }); + + // Verify the stub intercepted the run_created call (only call + // through the stubbed world — the server-side runtime uses its + // own world instance for run_started and subsequent events). + expect(createCallCount).toBe(1); + + // The run should still complete despite run_created failing. + // The runtime's resilient start path creates the run from + // run_started, so returnValue polling may initially get + // WorkflowRunNotFoundError before the queue delivers. 
+ const returnValue = await run.returnValue; + expect(returnValue).toBe(133); + } + ); }); diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index da7a407bd3..6a07815124 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -4,8 +4,6 @@ import { RunExpiredError, WorkflowRuntimeError, } from '@workflow/errors'; -import { classifyRunError } from './classify-error.js'; -import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js'; import { parseWorkflowName } from '@workflow/utils/parse-name'; import { type Event, @@ -13,9 +11,11 @@ import { WorkflowInvokePayloadSchema, type WorkflowRun, } from '@workflow/world'; +import { classifyRunError } from './classify-error.js'; import { importKey } from './encryption.js'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; +import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js'; import { getAllWorkflowRunEvents, getQueueOverhead, @@ -105,6 +105,7 @@ export function workflowEntrypoint( runId, traceCarrier: traceContext, requestedAt, + runInput, } = WorkflowInvokePayloadSchema.parse(message_); const { requestId } = metadata; // Extract the workflow name from the topic name @@ -191,50 +192,74 @@ export function workflowEntrypoint( }); let workflowStartedAt = -1; - let workflowRun = await world.runs.get(runId); + let workflowRun: WorkflowRun | undefined; + // Pre-loaded events from run_started response (first caller optimization) + let preloadedEvents: Event[] | undefined; // --- Infrastructure: prepare the run state --- + // Always call run_started directly — this both transitions + // the run to 'running' AND returns the run entity, saving + // a separate runs.get round-trip. When runInput is present + // (resilient start), pass it so the server can create the + // run if run_created was missed. // Network/server errors propagate to the queue handler for retry. 
// WorkflowRuntimeError (data integrity issues) are fatal and // produce run_failed since retrying won't fix them. try { - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create( - runId, - { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, - }, - { requestId } + const result = await world.events.create( + runId, + { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + // Pass run input from queue so server can create + // the run if run_created was missed. + // Input is base64-encoded for queue transport since + // Uint8Array doesn't survive JSON serialization. + ...(runInput + ? { + eventData: { + input: + typeof runInput.input === 'string' + ? Uint8Array.from(atob(runInput.input), (c) => + c.charCodeAt(0) + ) + : runInput.input, + deploymentId: runInput.deploymentId, + workflowName: runInput.workflowName, + executionContext: runInput.executionContext, + }, + } + : {}), + }, + { requestId } + ); + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` ); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; + } + workflowRun = result.run; + + // If the response includes events, use them to skip + // the initial events.list call and reduce TTFB. + if (result.events && result.events.length > 0) { + preloadedEvents = result.events; } - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. 
if (!workflowRun.startedAt) { throw new WorkflowRuntimeError( `Workflow run "${runId}" has no "startedAt" timestamp` ); } } catch (err) { - // Run was concurrently completed/failed/cancelled - // between the GET and the run_started event creation - if (EntityConflictError.is(err) || RunExpiredError.is(err)) { + if (RunExpiredError.is(err)) { + // 410: already finished — log and exit runtimeLogger.info( 'Run already finished during setup, skipping', { workflowRunId: runId, message: err.message } ); return; - } - if (err instanceof WorkflowRuntimeError) { + } else if (err instanceof WorkflowRuntimeError) { runtimeLogger.error( 'Fatal runtime error during workflow setup', { workflowRunId: runId, error: err.message } @@ -265,8 +290,15 @@ export function workflowEntrypoint( throw failErr; } return; + } else { + throw err; } - throw err; + } + + if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` + ); } workflowStartedAt = +workflowRun.startedAt; @@ -294,8 +326,12 @@ export function workflowEntrypoint( return; } - // Load all events into memory before running - const events = await getAllWorkflowRunEvents(workflowRun.runId); + // Load all events into memory before running. + // If we got events from the run_started response, + // skip the events.list round-trip to reduce TTFB. + const events = + preloadedEvents ?? + (await getAllWorkflowRunEvents(workflowRun.runId)); // Check for any elapsed waits and create wait_completed events const now = Date.now(); diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 2b6bda92c2..ee9540b852 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -243,10 +243,21 @@ export class Run { * @returns The workflow return value. 
*/ private async pollReturnValue(): Promise { + // Track not-found retries separately: when run_created fails and the + // resilient start path hasn't created the run yet, runs.get throws + // WorkflowRunNotFoundError. We retry up to 3 times with back-off + // (1s, 3s, 6s = 10s total) to give the queue time to deliver. + let notFoundRetries = 0; + const NOT_FOUND_MAX_RETRIES = 3; + const NOT_FOUND_DELAYS = [1_000, 3_000, 6_000]; + while (true) { try { const run = await this.world.runs.get(this.runId); + // Run exists — reset not-found counter + notFoundRetries = 0; + if (run.status === 'completed') { const encryptionKey = await this.getEncryptionKey(); return await hydrateWorkflowReturnValue( @@ -270,6 +281,15 @@ export class Run { await new Promise((resolve) => setTimeout(resolve, 1_000)); continue; } + if ( + WorkflowRunNotFoundError.is(error) && + notFoundRetries < NOT_FOUND_MAX_RETRIES + ) { + const delay = NOT_FOUND_DELAYS[notFoundRetries]!; + notFoundRetries++; + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } throw error; } } diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index 049cd5ffb3..fab28aea4c 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -1,4 +1,4 @@ -import { WorkflowRuntimeError } from '@workflow/errors'; +import { WorkflowRuntimeError, WorkflowWorldError } from '@workflow/errors'; import { SPEC_VERSION_CURRENT, SPEC_VERSION_LEGACY } from '@workflow/world'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { start } from './start.js'; @@ -380,4 +380,75 @@ describe('start', () => { ); }); }); + + describe('resilient start (run_created failure)', () => { + const validWorkflow = Object.assign(() => Promise.resolve('result'), { + workflowId: 'test-workflow', + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('should succeed when events.create throws a 500 error (queue still 
dispatched)', async () => { + const mockQueue = vi.fn().mockResolvedValue({ messageId: null }); + const serverError = new WorkflowWorldError('Internal Server Error', { + status: 500, + }); + const mockEventsCreate = vi.fn().mockRejectedValue(serverError); + + vi.mocked(getWorld).mockReturnValue({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + // start() should NOT throw — the queue was still dispatched + const run = await start(validWorkflow, [42]); + expect(run.runId).toMatch(/^wrun_/); + + // Queue should have been called with runInput + expect(mockQueue).toHaveBeenCalledTimes(1); + const [, queuePayload] = mockQueue.mock.calls[0]; + expect(queuePayload.runInput).toBeDefined(); + expect(queuePayload.runInput.deploymentId).toBe('deploy_123'); + expect(queuePayload.runInput.workflowName).toBe('test-workflow'); + expect(queuePayload.runInput.specVersion).toBe(SPEC_VERSION_CURRENT); + }); + + it('should throw when queue fails even if events.create succeeds', async () => { + const mockEventsCreate = vi.fn().mockResolvedValue({ + run: { runId: 'wrun_test', status: 'pending' }, + }); + const mockQueue = vi + .fn() + .mockRejectedValue(new Error('Queue unavailable')); + + vi.mocked(getWorld).mockReturnValue({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + await expect(start(validWorkflow, [])).rejects.toThrow( + 'Queue unavailable' + ); + }); + + it('should throw when events.create fails with a non-retryable error (e.g. 
400)', async () => { + const badRequest = new WorkflowWorldError('Bad Request', { + status: 400, + }); + const mockEventsCreate = vi.fn().mockRejectedValue(badRequest); + const mockQueue = vi.fn().mockResolvedValue({ messageId: null }); + + vi.mocked(getWorld).mockReturnValue({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + await expect(start(validWorkflow, [])).rejects.toThrow('Bad Request'); + }); + }); }); diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 15339b954d..7f3e14aaf5 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,9 +1,15 @@ import { waitUntil } from '@vercel/functions'; -import { WorkflowRuntimeError } from '@workflow/errors'; +import { + EntityConflictError, + ThrottleError, + WorkflowRuntimeError, + WorkflowWorldError, +} from '@workflow/errors'; import type { WorkflowInvokePayload, World } from '@workflow/world'; import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { importKey } from '../encryption.js'; +import { runtimeLogger } from '../logger.js'; import type { Serializable } from '../schemas.js'; import { dehydrateWorkflowArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; @@ -157,33 +163,93 @@ export async function start( globalThis, v1Compat ); - const result = await world.events.create( - runId, - { - eventType: 'run_created', - specVersion, - eventData: { - deploymentId: deploymentId, - workflowName: workflowName, - input: workflowArguments, - executionContext: { traceCarrier, workflowCoreVersion }, + + const executionContext = { traceCarrier, workflowCoreVersion }; + + // Encode input for queue transport — Uint8Array doesn't survive JSON + // serialization, so we base64-encode it for the queue payload. 
+ const encodedInput = + workflowArguments instanceof Uint8Array + ? btoa(String.fromCharCode(...workflowArguments)) + : workflowArguments; + + // Call events.create (run_created) and queue in parallel. + // If events.create fails with 429/5xx, the run was still accepted + // via the queue and creation will be re-tried async by the runtime. + const [runCreatedResult, queueResult] = await Promise.allSettled([ + world.events.create( + runId, + { + eventType: 'run_created', + specVersion, + eventData: { + deploymentId: deploymentId, + workflowName: workflowName, + input: workflowArguments, + executionContext, + }, }, - }, - { v1Compat } - ); + { v1Compat } + ), + world.queue( + getWorkflowQueueName(workflowName), + { + runId, + traceCarrier, + runInput: { + input: encodedInput, + deploymentId, + workflowName, + specVersion, + executionContext, + }, + } satisfies WorkflowInvokePayload, + { + deploymentId, + } + ), + ]); - // Assert that the run was created - if (!result.run) { - throw new WorkflowRuntimeError( - "Missing 'run' in server response for 'run_created' event" - ); + // Queue failure is always fatal — the run was not enqueued + if (queueResult.status === 'rejected') { + throw queueResult.reason; } - // Verify server accepted our runId - if (!v1Compat && result.run.runId !== runId) { - throw new WorkflowRuntimeError( - `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}` - ); + // Handle events.create result + if (runCreatedResult.status === 'rejected') { + const err = runCreatedResult.reason; + if (EntityConflictError.is(err)) { + // 409: The run already exists. This can happen in extreme cases where + // the run creation call gets a cold start or other slowdown, and the queue + // + run_started call completes faster. We expect this to be <=1% of cases. + // In this case, we can safely return. 
+ } else if (isRetryableStartError(err)) { + // 429 (ThrottleError) and 5xx (WorkflowWorldError with status >= 500) + // are retryable — the run was accepted via the queue and creation + // will be re-tried by the runtime when it calls run_started. + runtimeLogger.warn( + 'Run creation event failed, but the run was accepted via the queue. ' + + 'The run_created event will be re-tried async by the runtime.', + { workflowRunId: runId, error: err.message } + ); + } else { + throw err; + } + } else { + const result = runCreatedResult.value; + // Assert that the run was created + if (!result.run) { + throw new WorkflowRuntimeError( + "Missing 'run' in server response for 'run_created' event" + ); + } + + // Verify server accepted our runId + if (!v1Compat && result.run.runId !== runId) { + throw new WorkflowRuntimeError( + `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}` + ); + } } waitUntil( @@ -197,22 +263,23 @@ export async function start( span?.setAttributes({ ...Attribute.WorkflowRunId(runId), - ...Attribute.WorkflowRunStatus(result.run.status), ...Attribute.DeploymentId(deploymentId), }); - await world.queue( - getWorkflowQueueName(workflowName), - { - runId, - traceCarrier, - } satisfies WorkflowInvokePayload, - { - deploymentId, - } - ); - return new Run(runId); }); }); } + +/** + * Checks if an error from events.create (run_created) is retryable, + * meaning the queue can re-try creation later via the run_started path. 
+ * - ThrottleError (429): rate limited, will succeed later + * - WorkflowWorldError with status >= 500: server error, will succeed later + */ +function isRetryableStartError(err: unknown): boolean { + if (ThrottleError.is(err)) return true; + if (WorkflowWorldError.is(err) && err.status && err.status >= 500) + return true; + return false; +} diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index 878e545fda..fde756c57b 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -2848,7 +2848,7 @@ describe('Storage', () => { }); it('should accept a runId within the threshold', async () => { - // 4 minutes ago — within the 5-minute default + // 4 minutes ago — within the 24-hour past threshold const runId = makeRunId(Date.now() - 4 * 60 * 1000); const result = await storage.events.create(runId, runCreatedEvent); @@ -2857,8 +2857,8 @@ describe('Storage', () => { }); it('should reject a runId with a timestamp too far in the past', async () => { - // 10 minutes ago — exceeds the 5-minute threshold - const runId = makeRunId(Date.now() - 10 * 60 * 1000); + // 25 hours ago — exceeds the 24-hour past threshold + const runId = makeRunId(Date.now() - 25 * 60 * 60 * 1000); await expect( storage.events.create(runId, runCreatedEvent) @@ -2869,8 +2869,16 @@ describe('Storage', () => { ).rejects.toThrow(/Invalid runId timestamp/); }); + it('should accept a runId with a timestamp 10 minutes in the past', async () => { + // 10 minutes ago — within the 24-hour past threshold + const runId = makeRunId(Date.now() - 10 * 60 * 1000); + const result = await storage.events.create(runId, runCreatedEvent); + expect(result.run).toBeDefined(); + expect(result.run!.runId).toBe(runId); + }); + it('should reject a runId with a timestamp too far in the future', async () => { - // 10 minutes from now + // 10 minutes from now — exceeds the 5-minute future threshold const runId = makeRunId(Date.now() + 10 * 60 
* 1000); await expect( diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index efa031afcf..03f2dbb571 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -124,6 +124,72 @@ export function createEventsStorage( WorkflowRunSchema, tag ); + + // Resilient start: run_started on non-existent run with eventData + // creates the run first, so the queue can bootstrap a run that + // failed to create during start(). + if ( + data.eventType === 'run_started' && + !currentRun && + 'eventData' in data && + data.eventData + ) { + const runInputData = data.eventData as { + deploymentId?: string; + workflowName?: string; + input?: any; + executionContext?: Record; + }; + if ( + runInputData.deploymentId && + runInputData.workflowName && + runInputData.input !== undefined + ) { + // Create the run entity + const createdRun: WorkflowRun = { + runId: effectiveRunId, + deploymentId: runInputData.deploymentId, + status: 'pending', + workflowName: runInputData.workflowName, + specVersion: effectiveSpecVersion, + executionContext: runInputData.executionContext, + input: runInputData.input, + output: undefined, + error: undefined, + startedAt: undefined, + completedAt: undefined, + createdAt: now, + updatedAt: now, + }; + await writeJSON( + taggedPath(basedir, 'runs', effectiveRunId, tag), + createdRun + ); + + // Create run_created event + const runCreatedEventId = `evnt_${monotonicUlid()}`; + const runCreatedEvent: Event = { + eventType: 'run_created', + runId: effectiveRunId, + eventId: runCreatedEventId, + createdAt: now, + specVersion: effectiveSpecVersion, + eventData: { + deploymentId: runInputData.deploymentId, + workflowName: runInputData.workflowName, + input: runInputData.input, + executionContext: runInputData.executionContext, + }, + }; + const createdCompositeKey = `${effectiveRunId}-${runCreatedEventId}`; + await writeJSON( + 
taggedPath(basedir, 'events', createdCompositeKey, tag), + runCreatedEvent + ); + + currentRun = createdRun; + } + } } // ============================================================ @@ -190,7 +256,15 @@ export function createEventsStorage( }; } - // Run state transitions are not allowed on terminal runs + // For run_started on terminal runs, use RunExpiredError so the + // runtime knows to exit without retrying. + if (data.eventType === 'run_started') { + throw new RunExpiredError( + `Workflow run "${effectiveRunId}" is already in terminal state "${currentRun.status}"` + ); + } + + // Other run state transitions are not allowed on terminal runs if ( runTerminalEvents.includes(data.eventType) || data.eventType === 'run_cancelled' @@ -280,6 +354,10 @@ export function createEventsStorage( createdAt: now, specVersion: effectiveSpecVersion, }; + // Strip eventData from run_started — it belongs on run_created only. + if (data.eventType === 'run_started' && 'eventData' in event) { + delete (event as any).eventData; + } // Track entity created/updated for EventResult let run: WorkflowRun | undefined; @@ -316,6 +394,12 @@ export function createEventsStorage( } else if (data.eventType === 'run_started') { // Reuse currentRun from validation (already read above) if (currentRun) { + // If already running, return the run directly without + // creating a duplicate event. + if (currentRun.status === 'running') { + return { run: currentRun }; + } + run = { runId: currentRun.runId, deploymentId: currentRun.deploymentId, @@ -832,6 +916,21 @@ export function createEventsStorage( const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; const filteredEvent = stripEventDataRefs(event, resolveData); + // For run_started: include all events so the runtime can skip + // the initial events.list call and reduce TTFB. 
+ let events: Event[] | undefined; + if (data.eventType === 'run_started' && run) { + const allEvents = await paginatedFileSystemQuery({ + directory: path.join(basedir, 'events'), + schema: EventSchema, + filePrefix: `${effectiveRunId}-`, + sortOrder: 'asc', + getCreatedAt: getObjectCreatedAt('evnt'), + getId: (e) => e.eventId, + }); + events = allEvents.data; + } + // Return EventResult with event and any created/updated entity return { event: filteredEvent, @@ -839,6 +938,7 @@ export function createEventsStorage( step, hook, wait, + events, }; }, diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index d65dfa5ab9..c2fbf207ad 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -373,6 +373,67 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { runId: effectiveRunId, }); currentRun = runValue ?? null; + + // Resilient start: run_started on non-existent run with eventData + // creates the run first, so the queue can bootstrap a run that + // failed to create during start(). 
+ if ( + data.eventType === 'run_started' && + !currentRun && + 'eventData' in data && + data.eventData + ) { + const runInputData = (data as any).eventData as { + deploymentId?: string; + workflowName?: string; + input?: any; + executionContext?: Record; + }; + if ( + runInputData.deploymentId && + runInputData.workflowName && + runInputData.input !== undefined + ) { + // Create the run entity + const [createdRun] = await drizzle + .insert(Schema.runs) + .values({ + runId: effectiveRunId, + deploymentId: runInputData.deploymentId, + workflowName: runInputData.workflowName, + specVersion: effectiveSpecVersion, + input: runInputData.input as SerializedContent, + executionContext: runInputData.executionContext as + | SerializedContent + | undefined, + status: 'pending', + }) + .onConflictDoNothing() + .returning(); + + if (createdRun) { + // Create run_created event + const runCreatedEventId = `wevt_${ulid()}`; + await drizzle.insert(events).values({ + runId: effectiveRunId, + eventId: runCreatedEventId, + eventType: 'run_created', + eventData: { + deploymentId: runInputData.deploymentId, + workflowName: runInputData.workflowName, + input: runInputData.input, + executionContext: runInputData.executionContext, + }, + specVersion: effectiveSpecVersion, + }); + + currentRun = { + status: 'pending', + specVersion: effectiveSpecVersion, + }; + } + } + } } // ============================================================ @@ -444,7 +505,15 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { }; } - // Run state transitions are not allowed on terminal runs + // For run_started on terminal runs, use RunExpiredError so the + // runtime knows to exit without retrying. 
+ if (data.eventType === 'run_started') { + throw new RunExpiredError( + `Workflow run "${effectiveRunId}" is already in terminal state "${currentRun.status}"` + ); + } + + // Other run state transitions are not allowed on terminal runs if ( runTerminalEvents.includes(data.eventType) || data.eventType === 'run_cancelled' @@ -563,6 +632,18 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { // Handle run_started event: update run status if (data.eventType === 'run_started') { + // If already running, return the run directly without event + if (currentRun?.status === 'running') { + const [fullRun] = await drizzle + .select() + .from(Schema.runs) + .where(eq(Schema.runs.runId, effectiveRunId)) + .limit(1); + if (fullRun) { + return { run: deserializeRunError(compact(fullRun)) }; + } + } + const [runValue] = await drizzle .update(Schema.runs) .set({ @@ -1135,6 +1216,14 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { } } + // Strip eventData from run_started — it belongs on run_created only. + const storedEventData = + data.eventType === 'run_started' + ? undefined + : 'eventData' in data + ? data.eventData + : undefined; + const [value] = await drizzle .insert(events) .values({ @@ -1142,22 +1231,47 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { eventId, correlationId: data.correlationId, eventType: data.eventType, - eventData: 'eventData' in data ? data.eventData : undefined, + eventData: storedEventData, specVersion: effectiveSpecVersion, }) .returning({ createdAt: events.createdAt }); if (!value) { throw new EntityConflictError(`Event ${eventId} could not be created`); } - const result = { ...data, ...value, runId: effectiveRunId, eventId }; + const result = { + ...data, + ...value, + runId: effectiveRunId, + eventId, + ...(storedEventData !== undefined + ? 
{ eventData: storedEventData } + : {}), + }; + if (data.eventType === 'run_started') { + delete (result as any).eventData; + } const parsed = EventSchema.parse(result); const resolveData = params?.resolveData ?? 'all'; + + // For run_started: include all events so the runtime can skip + // the initial events.list call and reduce TTFB. + let allEvents: Event[] | undefined; + if (data.eventType === 'run_started' && run) { + const eventRows = await drizzle + .select() + .from(Schema.events) + .where(eq(Schema.events.runId, effectiveRunId)) + .orderBy(Schema.events.eventId); + allEvents = eventRows.map((e) => EventSchema.parse(compact(e))); + } + return { event: stripEventDataRefs(parsed, resolveData), run, step, hook, wait, + events: allEvents, }; }, async get( diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 3f5acef00d..6aeac0e9e8 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -60,17 +60,19 @@ function stripEventAndLegacyRefs( // undefined), so we use the looser WorkflowRunWireBaseSchema and normalize // the error via deserializeError() afterward. const EventResultResolveWireSchema = z.object({ - event: EventSchema, + event: EventSchema.optional(), run: WorkflowRunSchema.optional(), step: StepWireSchema.optional(), hook: HookSchema.optional(), + events: z.array(EventSchema).optional(), }); const EventResultLazyWireSchema = z.object({ - event: EventSchema, + event: EventSchema.optional(), run: WorkflowRunWireBaseSchema.optional(), step: StepWireSchema.optional(), hook: HookSchema.optional(), + events: z.array(EventSchema).optional(), }); // Schema for events returned with `remoteRefBehavior=lazy`. @@ -457,10 +459,13 @@ async function createWorkflowRunEventInner( }); return { - event: stripEventAndLegacyRefs(wireResult.event, resolveData), + event: wireResult.event + ? 
stripEventAndLegacyRefs(wireResult.event, resolveData) + : undefined, run: wireResult.run, step: wireResult.step ? deserializeStep(wireResult.step) : undefined, hook: wireResult.hook, + events: wireResult.events, }; } @@ -481,11 +486,14 @@ async function createWorkflowRunEventInner( // undefined (lazy ref mode), so deserializeError normalizes it into the // StructuredError shape expected by WorkflowRun consumers. return { - event: stripEventAndLegacyRefs(wireResult.event, resolveData), + event: wireResult.event + ? stripEventAndLegacyRefs(wireResult.event, resolveData) + : undefined, run: wireResult.run ? deserializeError(wireResult.run) : undefined, step: wireResult.step ? deserializeStep(wireResult.step) : undefined, hook: wireResult.hook, + events: wireResult.events, }; } diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index 2965906f7b..6784a0a819 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -223,9 +223,22 @@ const RunCreatedEventSchema = BaseEventSchema.extend({ /** * Event created when a workflow run starts executing. * Updates the run entity to status 'running'. + * + * The optional eventData carries run creation data for the resilient start path: + * when the run_created event failed (e.g., storage outage during start()), the + * runtime passes the run input through the queue so the server can create the run + * on the run_started call if it doesn't exist yet. 
*/ const RunStartedEventSchema = BaseEventSchema.extend({ eventType: z.literal('run_started'), + eventData: z + .object({ + input: SerializedDataSchema.optional(), + deploymentId: z.string().optional(), + workflowName: z.string().optional(), + executionContext: z.record(z.string(), z.any()).optional(), + }) + .optional(), }); /** @@ -369,6 +382,12 @@ export interface EventResult { hook?: import('./hooks.js').Hook; /** The wait entity (for wait_created/wait_completed events) */ wait?: import('./waits.js').Wait; + /** + * All events up to this point, with data resolved. When populated + * on a run_started response, the runtime uses these to skip the + * initial events.list call and reduce TTFB. + */ + events?: Event[]; } export interface GetEventParams { diff --git a/packages/world/src/index.ts b/packages/world/src/index.ts index baa62a1480..e3a574e3bf 100644 --- a/packages/world/src/index.ts +++ b/packages/world/src/index.ts @@ -16,6 +16,7 @@ export { MessageId, QueuePayloadSchema, QueuePrefix, + RunInputSchema, StepInvokePayloadSchema, ValidQueueName, WorkflowInvokePayloadSchema, @@ -53,7 +54,9 @@ export { export type * from './steps.js'; export { StepSchema, StepStatusSchema } from './steps.js'; export { + DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS, DEFAULT_TIMESTAMP_THRESHOLD_MS, + DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS, ulidToDate, validateUlidTimestamp, } from './ulid.js'; diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 5093b62dd3..59f467b463 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -21,12 +21,29 @@ export type MessageId = z.infer; export const TraceCarrierSchema = z.record(z.string(), z.string()); export type TraceCarrier = z.infer; +/** + * Run creation data carried through the queue for resilient start. + * Only present on the first queue delivery — re-enqueues omit this. 
+ * When the runtime processes the message, it passes this data to the + * run_started event so the server can create the run if it doesn't exist yet. + */ +export const RunInputSchema = z.object({ + input: z.unknown(), + deploymentId: z.string(), + workflowName: z.string(), + specVersion: z.number(), + executionContext: z.record(z.string(), z.any()).optional(), +}); +export type RunInput = z.infer; + export const WorkflowInvokePayloadSchema = z.object({ runId: z.string(), traceCarrier: TraceCarrierSchema.optional(), requestedAt: z.coerce.date().optional(), /** Number of times this message has been re-enqueued due to server errors (5xx) */ serverErrorRetryCount: z.number().int().optional(), + /** Run creation data, only present on the first queue delivery from start() */ + runInput: RunInputSchema.optional(), }); export const StepInvokePayloadSchema = z.object({ diff --git a/packages/world/src/ulid.ts b/packages/world/src/ulid.ts index 9f5bb1e53d..276b5b0c80 100644 --- a/packages/world/src/ulid.ts +++ b/packages/world/src/ulid.ts @@ -4,9 +4,25 @@ import { z } from 'zod'; const UlidSchema = z.string().ulid(); /** - * Default threshold for ULID timestamp validation (5 minutes in milliseconds). + * Default threshold for ULID timestamps in the past (24 hours). + * + * Set to 24 hours to support the resilient start path: when start() fails to + * create run_created, the queue carries the run input and the runtime creates + * the run on run_started. VQS supports delayed messages up to 24 hours. + */ +export const DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS = 24 * 60 * 60 * 1000; + +/** + * Default threshold for ULID timestamps in the future (5 minutes). + * + * Kept tight to prevent abuse from client-generated ULIDs with manipulated + * future timestamps while still tolerating minor clock skew. 
*/ -export const DEFAULT_TIMESTAMP_THRESHOLD_MS = 5 * 60 * 1000; +export const DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS = 5 * 60 * 1000; + +/** @deprecated Use DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS instead */ +export const DEFAULT_TIMESTAMP_THRESHOLD_MS = + DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS; /** * Extracts a Date from a ULID string, or null if the string is not a valid ULID. @@ -21,18 +37,22 @@ export function ulidToDate(maybeUlid: string): Date | null { } /** - * Validates that a prefixed ULID's embedded timestamp is within an acceptable threshold - * of the current server time. This prevents client-generated ULIDs with manipulated timestamps. + * Validates that a prefixed ULID's embedded timestamp is within acceptable thresholds + * of the current server time. Uses asymmetric thresholds: 24h in the past (to support + * resilient start with queue delays) and 5min in the future (to prevent abuse while + * tolerating clock skew). * * @param prefixedUlid - The prefixed ULID to validate (e.g., "wrun_01ARYZ...") * @param prefix - The prefix to strip (e.g., "wrun_") - * @param thresholdMs - Maximum allowed drift in milliseconds (default: 5 minutes) + * @param pastThresholdMs - Maximum allowed age in the past (default: 24 hours) + * @param futureThresholdMs - Maximum allowed distance in the future (default: 5 minutes) * @returns null if valid, or an error message string if invalid */ export function validateUlidTimestamp( prefixedUlid: string, prefix: string, - thresholdMs: number = DEFAULT_TIMESTAMP_THRESHOLD_MS + pastThresholdMs: number = DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS, + futureThresholdMs: number = DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS ): string | null { const raw = prefixedUlid.startsWith(prefix) ? 
prefixedUlid.slice(prefix.length) @@ -44,13 +64,20 @@ export function validateUlidTimestamp( } const serverTimestamp = new Date(); - const driftMs = Math.abs(serverTimestamp.getTime() - ulidTimestamp.getTime()); + const diffMs = serverTimestamp.getTime() - ulidTimestamp.getTime(); - if (driftMs <= thresholdMs) { - return null; + // diffMs > 0 means the ULID is in the past; diffMs < 0 means it's in the future + if (diffMs > 0 && diffMs <= pastThresholdMs) { + return null; // Within past threshold + } + if (diffMs <= 0 && -diffMs <= futureThresholdMs) { + return null; // Within future threshold } + const driftMs = Math.abs(diffMs); const driftSeconds = Math.round(driftMs / 1000); + const direction = diffMs > 0 ? 'past' : 'future'; + const thresholdMs = diffMs > 0 ? pastThresholdMs : futureThresholdMs; const thresholdSeconds = Math.round(thresholdMs / 1000); - return `Invalid runId timestamp: embedded timestamp differs from server time by ${driftSeconds}s (threshold: ${thresholdSeconds}s)`; + return `Invalid runId timestamp: embedded timestamp is ${driftSeconds}s in the ${direction} (threshold: ${thresholdSeconds}s)`; }