8 changes: 8 additions & 0 deletions .changeset/better-peas-buy.md
@@ -0,0 +1,8 @@
---
"@workflow/world": patch
"@workflow/core": patch
"@workflow/world-local": patch
"@workflow/world-postgres": patch
---

Combine initial run fetch, event fetch, and run_started event creation
9 changes: 9 additions & 0 deletions .changeset/four-donuts-glow.md
@@ -0,0 +1,9 @@
---
"@workflow/world-postgres": patch
"@workflow/world-vercel": patch
"@workflow/world-local": patch
"@workflow/world": patch
"@workflow/core": minor
Review comment (Member):

Blocking: AGENTS.md states: "All changes should be marked as 'patch'. Never use 'major' or 'minor' modes."

This should be:

"@workflow/core": patch

---

Allow workflow invocation to create run if initial storage call in `start` did not succeed. Send run input through queue to enable this. Allow creating run_created and run_started events together in World, and skip first event list call by returning events directly.
2 changes: 1 addition & 1 deletion docs/content/docs/changelog/meta.json
@@ -1,5 +1,5 @@
{
"title": "Changelog",
"pages": ["index", "eager-processing"],
"pages": ["index", "eager-processing", "resilient-start"],
"defaultOpen": false
}
144 changes: 144 additions & 0 deletions docs/content/docs/changelog/resilient-start.mdx
@@ -0,0 +1,144 @@
---
title: Resilient run start
description: Overhauls run start logic to tolerate world storage unavailability as long as the queue is healthy, and significantly speeds up run start.
---

# Resilient `start()`

## Motivation

When `world` storage is unavailable but the queue is up,
`start()` previously failed entirely because `world.events.create(run_created)`
was called before `world.queue()`. This change decouples run creation from queue
dispatch so that runs can still be accepted when storage is degraded.

## Design

### `start()` changes (packages/core)

- `world.events.create` (run_created) and `world.queue` are now called **in parallel**.
- If `events.create` errors with **429 or 5xx**, we log a warning saying that run
creation failed but the run was accepted. Creation will be retried asynchronously by the
runtime when it processes the queue message.
- If `world.queue` fails, we still throw — the run truly failed and was not enqueued.
- The queue invocation now receives all the run inputs (`input`, `deploymentId`,
`workflowName`, `specVersion`, `executionContext`) so the runtime can create the
run later if needed.
- When the runtime re-enqueues itself, it does **not** pass these inputs — only the
first queue cycle carries them.
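
The parallel dispatch described above can be sketched roughly as follows. This is a minimal illustration, not the code in `packages/core`: `MiniWorld`, `createRunCreated`, `enqueue`, `statusOf`, and `resilientStart` are hypothetical names, and rethrowing non-transient storage errors is an assumption this document does not confirm.

```typescript
// Hypothetical minimal shapes; the real World interface is richer.
interface RunInput {
  input: unknown;
  deploymentId: string;
  workflowName: string;
  specVersion: number;
}

interface MiniWorld {
  createRunCreated(runId: string, runInput: RunInput): Promise<void>;
  enqueue(runId: string, runInput: RunInput): Promise<void>;
}

// Reads a numeric `status` from an unknown error value, if present.
function statusOf(err: unknown): number | undefined {
  if (typeof err === 'object' && err !== null && 'status' in err) {
    const status = (err as { status: unknown }).status;
    return typeof status === 'number' ? status : undefined;
  }
  return undefined;
}

async function resilientStart(
  world: MiniWorld,
  runId: string,
  runInput: RunInput,
  warn: (msg: string) => void = console.warn
): Promise<{ runId: string; runCreated: boolean }> {
  // Fire both calls at once. allSettled never rejects, so each
  // outcome can be inspected independently.
  const [created, queued] = await Promise.allSettled([
    world.createRunCreated(runId, runInput),
    world.enqueue(runId, runInput),
  ]);

  // A queue failure is fatal: nothing would ever process the run.
  if (queued.status === 'rejected') throw queued.reason;

  if (created.status === 'rejected') {
    const status = statusOf(created.reason);
    const transient =
      status !== undefined && (status === 429 || (status >= 500 && status < 600));
    // Assumption: anything other than 429/5xx still surfaces to the caller.
    if (!transient) throw created.reason;
    warn(`run_created failed for ${runId}; the runtime will retry it from the queue message`);
    return { runId, runCreated: false };
  }
  return { runId, runCreated: true };
}
```

Because the queue message carries the full run input, a caller that sees `runCreated: false` can still treat the run as accepted.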

### `workflowEntrypoint` changes (packages/core)

- We no longer call `world.runs.get` or check the run status before starting.
- We **always** call `world.events.create` with `run_started`, now also passing the
run input that was sent through the queue. The response will be:
- **200 with event (now running)**: use the returned `Run` entity as the run. The
response also includes an `events` array of all events up to that point (typically
`run_created` and `run_started`), with data resolved. These are used to skip the
very first `world.events.list` call, reducing TTFB for the first invocation.
- **200 without event (already running)**: the run entity is returned directly
without creating a duplicate event. The runtime proceeds normally.
- **410 (already finished)**: log and exit as usual.
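
As a rough sketch of how the runtime could branch on these three outcomes (illustrative only: `prepareRun`, `postRunStarted`, and `listEvents` are hypothetical stand-ins for the `world.events.create` and `world.events.list` calls, and the 410 is modeled as an error carrying a numeric `status`):

```typescript
interface Run {
  runId: string;
  startedAt: Date;
}

interface WorkflowEvent {
  eventType: string;
}

// Shape of the run_started response as described above.
interface RunStartedResult {
  run: Run;
  event?: WorkflowEvent;
  events?: WorkflowEvent[];
}

type Setup =
  | { kind: 'exit' }
  | { kind: 'run'; run: Run; events: WorkflowEvent[] };

async function prepareRun(
  postRunStarted: () => Promise<RunStartedResult>,
  listEvents: () => Promise<WorkflowEvent[]>
): Promise<Setup> {
  let result: RunStartedResult;
  try {
    result = await postRunStarted();
  } catch (err) {
    const status = (err as { status?: unknown } | null)?.status;
    // 410: the run already finished, so there is nothing left to do.
    if (status === 410) return { kind: 'exit' };
    // Anything else propagates so the queue can redeliver the message.
    throw err;
  }

  // 200 with event: the response already carries the event history.
  // 200 without event (already running): fall back to listing events.
  const preloaded =
    result.event !== undefined && result.events && result.events.length > 0
      ? result.events
      : undefined;
  const events = preloaded ?? (await listEvents());
  return { kind: 'run', run: result.run, events };
}
```

Only the first caller skips the list call; later deliveries for the same run take the fallback branch.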

### World / workflow-server changes

- Posting `run_started` to a **non-existent** run is now allowed when the run input is
sent along with the payload. The server:
1. Creates a `run_created` event first (so the event log is consistent).
2. Strips the input from the `run_started` event data (it lives on `run_created`).
3. Then creates the `run_started` event normally.
4. Emits a log and a Datadog metric (`workflow_server.resilient_start.run_created_via_run_started`)
to track when this fallback path is hit.
- When `run_started` encounters an **already-running** run, all worlds return `{ run }`
with `event: undefined` instead of throwing. No duplicate event is created.
- When posting `run_started` and getting **200**, the response includes an `events`
property with all events up to that point (data always resolved).
- ULID timestamp validation now uses **asymmetric thresholds**: 24 hours in the past
(to support queue retry delays) and 5 minutes in the future (to prevent abuse while
tolerating clock skew).

## Decisions

1. **Parallel, not sequential**: We chose `Promise.allSettled` over sequential calls to
minimize latency in the happy path. The trade-off is slightly more complex error
handling.

2. **Already-running returns run without event**: When `run_started` encounters an
already-running run, all worlds return `{ run }` with `event: undefined` (no
`events` array) instead of throwing. The runtime detects this by checking for
`result.event === undefined`. This avoids the extra `world.runs.get` round-trip.

3. **Events in 200 response**: We only return events on the 200-with-event path (first
caller). On the already-running path, which also returns 200 but without an event, we
fall back to the normal `events.list` call. This is correct because only on the
200-with-event path can we be certain we know the full event history.

4. **Asymmetric ULID thresholds**: VQS supports delayed messages up to 24 hours. We
allow 24h in the past so a run_created retry can succeed at maximum queue delay, but
keep the future threshold at 5 minutes to prevent abuse from manipulated timestamps.

## Implementation notes

### Error type mapping for terminal runs

Previously, calling `run_started` on a terminal run threw `InvalidOperationStateError`
(HTTP 409) on workflow-server, or `EntityConflictError` on world-local/world-postgres.
This was changed to `EntityGoneError` (HTTP 410) / `RunExpiredError` so the runtime
correctly distinguishes "already running" from "already finished" (exit immediately).

### run_started on already-running runs

All worlds (workflow-server, world-local, world-postgres) now return the existing run
entity directly — with `event: undefined` — when `run_started` is called on an
already-running run. This avoids both a duplicate event and the extra `world.runs.get`
call that the previous 409-based approach required. The `EventResultResolveWireSchema`
in world-vercel was updated to make `event` optional.

### world-local and world-postgres support

Both world-local (filesystem) and world-postgres (Drizzle/SQL) now implement the full
resilient start behavior:

- Creating runs from `run_started` when the run doesn't exist and eventData is provided
- Returning `{ run }` without event on already-running
- Throwing `RunExpiredError` on terminal runs
- Stripping eventData from stored `run_started` events
- Returning the `events` array on successful start
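
The behaviors in this list can be illustrated with a small in-memory model. This is a sketch under simplifying assumptions, not the world-local or world-postgres implementation: `MemoryWorld`, `runStarted`, and the three-value status are hypothetical, and the terminal-run error is modeled as a plain `Error` with `status: 410` rather than `RunExpiredError`.

```typescript
type RunStatus = 'pending' | 'running' | 'completed';

interface StoredEvent {
  eventType: 'run_created' | 'run_started';
  eventData?: unknown;
}

interface StoredRun {
  runId: string;
  status: RunStatus;
  input: unknown;
}

interface StartResult {
  run: StoredRun;
  event?: StoredEvent;
  events?: StoredEvent[];
}

class MemoryWorld {
  runs = new Map<string, StoredRun>();
  log = new Map<string, StoredEvent[]>();

  runStarted(runId: string, eventData?: { input: unknown }): StartResult {
    let run = this.runs.get(runId);
    if (!run) {
      if (!eventData) throw new Error(`Run "${runId}" not found`);
      // Fallback path: record run_created first so the log stays consistent.
      run = { runId, status: 'pending', input: eventData.input };
      this.runs.set(runId, run);
      this.log.set(runId, [{ eventType: 'run_created', eventData }]);
    }
    if (run.status === 'completed') {
      // Terminal run: modeled here as an error carrying status 410.
      throw Object.assign(new Error(`Run "${runId}" already finished`), { status: 410 });
    }
    // Already running: return the run, with no event and no events array.
    if (run.status === 'running') return { run };

    run.status = 'running';
    // The input is stripped: it lives on run_created only.
    const event: StoredEvent = { eventType: 'run_started' };
    const events = this.log.get(runId) ?? [];
    events.push(event);
    this.log.set(runId, events);
    return { run, event, events: events.slice() };
  }
}
```

Calling `runStarted` twice for the same run shows the difference between the first-caller response and the already-running response.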

### Asymmetric ULID timestamp validation

Both `@workflow/world` (`validateUlidTimestamp`) and `workflow-server`
(`Ulid.isTimestampWithinThreshold`) now accept separate past and future thresholds:

- **Past**: 24 hours (`DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS`)
- **Future**: 5 minutes (`DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS`)

The old `DEFAULT_TIMESTAMP_THRESHOLD_MS` constant is deprecated but aliased to the
past threshold for backwards compatibility.
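
A minimal sketch of the asymmetric check, assuming the standard ULID layout (the first 10 characters encode a millisecond timestamp in Crockford Base32). The constant names and values come from the text above; `ulidTimestamp`, `encodeTime`, and `isUlidTimestampWithinThreshold` are hypothetical helpers, and the real signatures of `validateUlidTimestamp` and `Ulid.isTimestampWithinThreshold` may differ.

```typescript
const CROCKFORD = '0123456789ABCDEFGHJKMNPQRSTVWXYZ';
const DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS = 24 * 60 * 60 * 1000; // 24 hours
const DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS = 5 * 60 * 1000; // 5 minutes

// Decodes the 10-character timestamp prefix of a ULID into epoch milliseconds.
function ulidTimestamp(ulid: string): number {
  if (ulid.length !== 26) throw new Error('ULID must be 26 characters');
  const prefix = ulid.slice(0, 10).toUpperCase();
  let ms = 0;
  for (let i = 0; i < prefix.length; i++) {
    const digit = CROCKFORD.indexOf(prefix.charAt(i));
    if (digit === -1) throw new Error(`Invalid ULID character: ${prefix.charAt(i)}`);
    ms = ms * 32 + digit;
  }
  return ms;
}

// Encodes epoch milliseconds as a 10-character prefix (used for the example only).
function encodeTime(ms: number): string {
  let out = '';
  let rest = ms;
  for (let i = 0; i < 10; i++) {
    out = CROCKFORD.charAt(rest % 32) + out;
    rest = Math.floor(rest / 32);
  }
  return out;
}

function isUlidTimestampWithinThreshold(
  ulid: string,
  now: number = Date.now(),
  pastMs: number = DEFAULT_TIMESTAMP_THRESHOLD_PAST_MS,
  futureMs: number = DEFAULT_TIMESTAMP_THRESHOLD_FUTURE_MS
): boolean {
  const ts = ulidTimestamp(ulid);
  // Generous on the past side, strict on the future side.
  return ts >= now - pastMs && ts <= now + futureMs;
}

// Example: an ID minted 23 hours ago passes, one minted 6 minutes ahead does not.
const exampleNow = 1_700_000_000_000;
const randomPart = '0'.repeat(16);
const oldId = encodeTime(exampleNow - 23 * 60 * 60 * 1000) + randomPart;
const futureId = encodeTime(exampleNow + 6 * 60 * 1000) + randomPart;
console.log(isUlidTimestampWithinThreshold(oldId, exampleNow)); // true
console.log(isUlidTimestampWithinThreshold(futureId, exampleNow)); // false
```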

### Datadog metric

The resilient start fallback path emits a Datadog distribution metric:
`workflow_server.resilient_start.run_created_via_run_started`, tagged with
`workflow_name`. Query with `sum:workflow_server.resilient_start.run_created_via_run_started{*}`.

### Base64 encoding for queue transport

`Uint8Array` values (the serialized workflow input) don't survive JSON serialization
through the queue — they get corrupted to `{0: 72, 1: 101, ...}` objects. The `runInput`
Comment on lines +124 to +127 (Member Author):

This is a hack, trying to remove this right now by devaluing binary, but might not be the right approach. WDYT?

payload in the queue message now base64-encodes binary input in `start()` and the
runtime decodes it back to `Uint8Array` before passing it to `world.events.create`.
This was caught by the `spawnWorkflowFromStepWorkflow` e2e test where the child
workflow's input was being corrupted.
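
The corruption and the base64 round trip can be reproduced in isolation. This is a small sketch: `toBase64` and `fromBase64` are hypothetical helper names, and it assumes a runtime that provides the `btoa` and `atob` globals.

```typescript
const bytes = new Uint8Array([72, 101, 108, 108, 111]); // "Hello"

// Plain JSON turns a typed array into an index-keyed object.
const naive = JSON.parse(JSON.stringify({ input: bytes }));
console.log(naive.input instanceof Uint8Array); // false
console.log(JSON.stringify(naive.input)); // {"0":72,"1":101,"2":108,"3":108,"4":111}

// Encode binary as base64 text before it enters the queue message.
function toBase64(data: Uint8Array): string {
  let binary = '';
  data.forEach((byte) => {
    binary += String.fromCharCode(byte);
  });
  return btoa(binary);
}

// Decode the base64 text back into bytes on the consuming side.
function fromBase64(b64: string): Uint8Array {
  return Uint8Array.from(atob(b64), (c) => c.charCodeAt(0));
}

const wire = JSON.parse(JSON.stringify({ input: toBase64(bytes) }));
console.log(wire.input); // SGVsbG8=
console.log(fromBase64(wire.input)); // the original five bytes as a Uint8Array
```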

### RunStartedEventSchema eventData stripping

The run input is passed through to `run_started`'s `eventData` but stripped before
the event is persisted — the data belongs on the `run_created` event only. All worlds
strip eventData from stored `run_started` events.

## Follow-up work

- [ ] Add e2e tests covering the degraded-storage start path against a live deployment.
- [ ] Monitor the Datadog metric in production to understand how often the fallback is hit.
- [ ] Consider whether the `events` optimization in the 200 response should also apply
to re-enqueue cycles (currently only first invocation).
54 changes: 54 additions & 0 deletions packages/core/e2e/e2e.test.ts
@@ -4,7 +4,9 @@ import { setTimeout as sleep } from 'node:timers/promises';
import {
WorkflowRunCancelledError,
WorkflowRunFailedError,
WorkflowWorldError,
} from '@workflow/errors';
import type { World } from '@workflow/world';
import {
afterAll,
assert,
@@ -2156,4 +2158,56 @@ describe('e2e', () => {
});
}
);

// ============================================================
// Resilient start: run completes even when run_created fails
// ============================================================
// TODO: Switch this to a stream-based workflow (e.g. readableStreamWorkflow)
// to also verify that serialization, flushing, and binary data work correctly
// over the queue boundary. Currently using addTenWorkflow to avoid the
// skipIf(isLocalDeployment()) barrier that stream tests require.
test(
'resilient start: addTenWorkflow completes when run_created returns 500',
{ timeout: 60_000 },
async () => {
// Get the real world and wrap it so the first events.create call
// (run_created) throws a 500 server error. The queue should still
// be dispatched with runInput, and the runtime should bootstrap
// the run via the run_started fallback path.
const realWorld = getWorld();
let createCallCount = 0;
const stubbedWorld: World = {
...realWorld,
events: {
...realWorld.events,
create: (async (...args: Parameters<World['events']['create']>) => {
createCallCount++;
if (createCallCount === 1) {
// Fail the very first call (run_created from start())
throw new WorkflowWorldError('Simulated storage outage', {
status: 500,
});
}
return realWorld.events.create(...args);
}) as World['events']['create'],
},
};

const run = await start(await e2e('addTenWorkflow'), [123], {
world: stubbedWorld,
});

// Verify the stub intercepted the run_created call (only call
// through the stubbed world — the server-side runtime uses its
// own world instance for run_started and subsequent events).
expect(createCallCount).toBe(1);

// The run should still complete despite run_created failing.
// The runtime's resilient start path creates the run from
// run_started, so returnValue polling may initially get
// WorkflowRunNotFoundError before the queue delivers.
const returnValue = await run.returnValue;
expect(returnValue).toBe(133);
}
);
});
94 changes: 65 additions & 29 deletions packages/core/src/runtime.ts
@@ -4,18 +4,18 @@ import {
RunExpiredError,
WorkflowRuntimeError,
} from '@workflow/errors';
import { classifyRunError } from './classify-error.js';
import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js';
import { parseWorkflowName } from '@workflow/utils/parse-name';
import {
type Event,
SPEC_VERSION_CURRENT,
WorkflowInvokePayloadSchema,
type WorkflowRun,
} from '@workflow/world';
import { classifyRunError } from './classify-error.js';
import { importKey } from './encryption.js';
import { WorkflowSuspension } from './global.js';
import { runtimeLogger } from './logger.js';
import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js';
import {
getAllWorkflowRunEvents,
getQueueOverhead,
@@ -105,6 +105,7 @@ export function workflowEntrypoint(
runId,
traceCarrier: traceContext,
requestedAt,
runInput,
} = WorkflowInvokePayloadSchema.parse(message_);
const { requestId } = metadata;
// Extract the workflow name from the topic name
@@ -191,50 +192,74 @@ export function workflowEntrypoint(
});

let workflowStartedAt = -1;
let workflowRun = await world.runs.get(runId);
let workflowRun: WorkflowRun | undefined;
// Pre-loaded events from run_started response (first caller optimization)
let preloadedEvents: Event[] | undefined;

// --- Infrastructure: prepare the run state ---
// Always call run_started directly — this both transitions
// the run to 'running' AND returns the run entity, saving
// a separate runs.get round-trip. When runInput is present
// (resilient start), pass it so the server can create the
// run if run_created was missed.
// Network/server errors propagate to the queue handler for retry.
// WorkflowRuntimeError (data integrity issues) are fatal and
// produce run_failed since retrying won't fix them.
try {
if (workflowRun.status === 'pending') {
// Transition run to 'running' via event (event-sourced architecture)
const result = await world.events.create(
runId,
{
eventType: 'run_started',
specVersion: SPEC_VERSION_CURRENT,
},
{ requestId }
const result = await world.events.create(
runId,
{
eventType: 'run_started',
specVersion: SPEC_VERSION_CURRENT,
// Pass run input from queue so server can create
// the run if run_created was missed.
// Input is base64-encoded for queue transport since
// Uint8Array doesn't survive JSON serialization.
...(runInput
? {
eventData: {
input:
typeof runInput.input === 'string'
Review comment (Member):

Blocking: This decodes ANY string as base64 binary. If runInput.input is a plain string value (not base64-encoded binary), atob() will either throw or produce garbage.

The encode side in start.ts only base64-encodes when workflowArguments instanceof Uint8Array, but this decode side has no way to distinguish "base64-encoded Uint8Array" from "a string that was always a string." The typeof === 'string' check is not a reliable discriminant.

See my comment on start.ts for the recommended fix (binary transport or discriminant field).
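
One possible shape for the discriminant-field option is sketched below. It is illustrative only and is not the fix proposed in the start.ts comment, which is not shown here: `WireInput`, `encodeInput`, and `decodeInput` are hypothetical names. The explicit `encoding` tag lets the decode side tell base64-encoded binary from a value that was always a string.

```typescript
// Tagged envelope for the input carried in the queue message.
type WireInput =
  | { encoding: 'base64'; data: string }
  | { encoding: 'json'; data: unknown };

function encodeInput(input: unknown): WireInput {
  if (input instanceof Uint8Array) {
    let binary = '';
    input.forEach((byte) => {
      binary += String.fromCharCode(byte);
    });
    return { encoding: 'base64', data: btoa(binary) };
  }
  // Everything else travels as ordinary JSON, including plain strings.
  return { encoding: 'json', data: input };
}

function decodeInput(wire: WireInput): unknown {
  if (wire.encoding === 'base64') {
    return Uint8Array.from(atob(wire.data), (c) => c.charCodeAt(0));
  }
  return wire.data;
}

// Simulates the JSON hop through the queue.
function throughQueue(value: unknown): unknown {
  return decodeInput(JSON.parse(JSON.stringify(encodeInput(value))));
}

console.log(throughQueue('hello')); // hello (never passed to atob)
console.log(throughQueue(new Uint8Array([1, 2, 3])) instanceof Uint8Array); // true
```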

? Uint8Array.from(atob(runInput.input), (c) =>
c.charCodeAt(0)
)
: runInput.input,
deploymentId: runInput.deploymentId,
workflowName: runInput.workflowName,
executionContext: runInput.executionContext,
},
}
: {}),
},
{ requestId }
);
if (!result.run) {
throw new WorkflowRuntimeError(
`Event creation for 'run_started' did not return the run entity for run "${runId}"`
);
// Use the run entity from the event response (no extra get call needed)
if (!result.run) {
throw new WorkflowRuntimeError(
`Event creation for 'run_started' did not return the run entity for run "${runId}"`
);
}
workflowRun = result.run;
}
workflowRun = result.run;

// If the response includes events, use them to skip
// the initial events.list call and reduce TTFB.
if (result.events && result.events.length > 0) {
preloadedEvents = result.events;
}

// At this point, the workflow is "running" and `startedAt` should
// definitely be set.
if (!workflowRun.startedAt) {
throw new WorkflowRuntimeError(
`Workflow run "${runId}" has no "startedAt" timestamp`
);
}
} catch (err) {
Review comment (Member):

Blocking (behavior change): The old code caught both EntityConflictError and RunExpiredError here. The new code only catches RunExpiredError. This means if events.create('run_started') throws EntityConflictError (e.g., duplicate eventId from a concurrent request), it will now propagate to the queue handler and cause a retry — previously it was silently consumed.

Is this intentional? The design doc says already-running returns { run } without throwing, but EntityConflictError can come from other sources (e.g., DB unique constraint on the event ID). If intentional, add a comment explaining why EntityConflictError is no longer expected here. If not, it should be re-added.

// Run was concurrently completed/failed/cancelled
// between the GET and the run_started event creation
if (EntityConflictError.is(err) || RunExpiredError.is(err)) {
if (RunExpiredError.is(err)) {
// 410: already finished — log and exit
runtimeLogger.info(
'Run already finished during setup, skipping',
{ workflowRunId: runId, message: err.message }
);
return;
}
if (err instanceof WorkflowRuntimeError) {
} else if (err instanceof WorkflowRuntimeError) {
runtimeLogger.error(
'Fatal runtime error during workflow setup',
{ workflowRunId: runId, error: err.message }
@@ -265,8 +290,15 @@ export function workflowEntrypoint(
throw failErr;
}
return;
} else {
throw err;
}
throw err;
}

if (!workflowRun.startedAt) {
Review comment (Member):

Non-blocking: This workflowRun.startedAt check is duplicated — it already exists at line 230 inside the try block. Since workflowRun doesn't change between the two checks, this second one is unreachable (the first would have already thrown WorkflowRuntimeError, caught by the else if (err instanceof WorkflowRuntimeError) branch). You can remove this one.

throw new WorkflowRuntimeError(
`Workflow run "${runId}" has no "startedAt" timestamp`
);
}
workflowStartedAt = +workflowRun.startedAt;

@@ -294,8 +326,12 @@ export function workflowEntrypoint(
return;
}

// Load all events into memory before running
const events = await getAllWorkflowRunEvents(workflowRun.runId);
// Load all events into memory before running.
// If we got events from the run_started response,
// skip the events.list round-trip to reduce TTFB.
const events =
preloadedEvents ??
(await getAllWorkflowRunEvents(workflowRun.runId));

// Check for any elapsed waits and create wait_completed events
const now = Date.now();