diff --git a/.changeset/better-peas-buy.md b/.changeset/better-peas-buy.md new file mode 100644 index 0000000000..2f499761b5 --- /dev/null +++ b/.changeset/better-peas-buy.md @@ -0,0 +1,8 @@ +--- +"@workflow/world": patch +"@workflow/core": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch +--- + +Combine initial run fetch, event fetch, and run_started event creation diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index da7a407bd3..4e2db032d0 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -191,34 +191,40 @@ export function workflowEntrypoint( }); let workflowStartedAt = -1; - let workflowRun = await world.runs.get(runId); + let workflowRun: WorkflowRun | undefined; + // Pre-loaded events from the run_started response. + // When present, we skip the events.list call to reduce TTFB. + let preloadedEvents: Event[] | undefined; // --- Infrastructure: prepare the run state --- + // Always call run_started directly — this both transitions + // the run to 'running' AND returns the run entity, saving + // a separate runs.get round-trip. // Network/server errors propagate to the queue handler for retry. // WorkflowRuntimeError (data integrity issues) are fatal and // produce run_failed since retrying won't fix them. try { - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create( - runId, - { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, - }, - { requestId } + const result = await world.events.create( + runId, + { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }, + { requestId } + ); + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` ); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; + } + workflowRun = result.run; + + // If the world returned events, use them to skip + // the initial events.list call and reduce TTFB. + if (result.events && result.events.length > 0) { + preloadedEvents = result.events; } - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. if (!workflowRun.startedAt) { throw new WorkflowRuntimeError( `Workflow run "${runId}" has no "startedAt" timestamp` @@ -226,7 +232,6 @@ export function workflowEntrypoint( } } catch (err) { // Run was concurrently completed/failed/cancelled - // between the GET and the run_started event creation if (EntityConflictError.is(err) || RunExpiredError.is(err)) { runtimeLogger.info( 'Run already finished during setup, skipping', @@ -294,8 +299,12 @@ export function workflowEntrypoint( return; } - // Load all events into memory before running - const events = await getAllWorkflowRunEvents(workflowRun.runId); + // Load all events into memory before running. + // If we got pre-loaded events from the run_started response, + // skip the events.list round-trip to reduce TTFB. + const events = + preloadedEvents ?? + (await getAllWorkflowRunEvents(workflowRun.runId)); // Check for any elapsed waits and create wait_completed events const now = Date.now(); diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index efa031afcf..b6aa61b56c 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -190,7 +190,15 @@ export function createEventsStorage( }; } - // Run state transitions are not allowed on terminal runs + // For run_started on terminal runs, use RunExpiredError so the + // runtime knows to exit without retrying. + if (data.eventType === 'run_started') { + throw new RunExpiredError( + `Workflow run "${effectiveRunId}" is already in terminal state "${currentRun.status}"` + ); + } + + // Other run state transitions are not allowed on terminal runs if ( runTerminalEvents.includes(data.eventType) || data.eventType === 'run_cancelled' @@ -280,6 +288,10 @@ export function createEventsStorage( createdAt: now, specVersion: effectiveSpecVersion, }; + // Strip eventData from run_started — it belongs on run_created only. + if (data.eventType === 'run_started' && 'eventData' in event) { + delete (event as any).eventData; + } // Track entity created/updated for EventResult let run: WorkflowRun | undefined; @@ -316,6 +328,12 @@ export function createEventsStorage( } else if (data.eventType === 'run_started') { // Reuse currentRun from validation (already read above) if (currentRun) { + // If already running, return the run directly without + // creating a duplicate event. + if (currentRun.status === 'running') { + return { run: currentRun }; + } + run = { runId: currentRun.runId, deploymentId: currentRun.deploymentId, @@ -832,6 +850,21 @@ export function createEventsStorage( const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; const filteredEvent = stripEventDataRefs(event, resolveData); + // For run_started: include all events so the runtime can skip + // the initial events.list call and reduce TTFB. + let events: Event[] | undefined; + if (data.eventType === 'run_started' && run) { + const allEvents = await paginatedFileSystemQuery({ + directory: path.join(basedir, 'events'), + schema: EventSchema, + filePrefix: `${effectiveRunId}-`, + sortOrder: 'asc', + getCreatedAt: getObjectCreatedAt('evnt'), + getId: (e) => e.eventId, + }); + events = allEvents.data; + } + // Return EventResult with event and any created/updated entity return { event: filteredEvent, @@ -839,6 +872,7 @@ export function createEventsStorage( step, hook, wait, + events, }; }, diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index d65dfa5ab9..1d1e8f96b3 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -444,7 +444,15 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { }; } - // Run state transitions are not allowed on terminal runs + // For run_started on terminal runs, use RunExpiredError so the + // runtime knows to exit without retrying. + if (data.eventType === 'run_started') { + throw new RunExpiredError( + `Workflow run "${effectiveRunId}" is already in terminal state "${currentRun.status}"` + ); + } + + // Other run state transitions are not allowed on terminal runs if ( runTerminalEvents.includes(data.eventType) || data.eventType === 'run_cancelled' @@ -563,6 +571,18 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { // Handle run_started event: update run status if (data.eventType === 'run_started') { + // If already running, return the run directly without event + if (currentRun?.status === 'running') { + const [fullRun] = await drizzle + .select() + .from(Schema.runs) + .where(eq(Schema.runs.runId, effectiveRunId)) + .limit(1); + if (fullRun) { + return { run: deserializeRunError(compact(fullRun)) }; + } + } + const [runValue] = await drizzle .update(Schema.runs) .set({ @@ -1135,6 +1155,14 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { } } + // Strip eventData from run_started — it belongs on run_created only. + const storedEventData = + data.eventType === 'run_started' + ? undefined + : 'eventData' in data + ? data.eventData + : undefined; + const [value] = await drizzle .insert(events) .values({ @@ -1142,22 +1170,47 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { eventId, correlationId: data.correlationId, eventType: data.eventType, - eventData: 'eventData' in data ? data.eventData : undefined, + eventData: storedEventData, specVersion: effectiveSpecVersion, }) .returning({ createdAt: events.createdAt }); if (!value) { throw new EntityConflictError(`Event ${eventId} could not be created`); } - const result = { ...data, ...value, runId: effectiveRunId, eventId }; + const result = { + ...data, + ...value, + runId: effectiveRunId, + eventId, + ...(storedEventData !== undefined + ? { eventData: storedEventData } + : {}), + }; + if (data.eventType === 'run_started') { + delete (result as any).eventData; + } const parsed = EventSchema.parse(result); const resolveData = params?.resolveData ?? 'all'; + + // For run_started: include all events so the runtime can skip + // the initial events.list call and reduce TTFB. + let allEvents: Event[] | undefined; + if (data.eventType === 'run_started' && run) { + const eventRows = await drizzle + .select() + .from(Schema.events) + .where(eq(Schema.events.runId, effectiveRunId)) + .orderBy(Schema.events.eventId); + allEvents = eventRows.map((e) => EventSchema.parse(compact(e))); + } + return { event: stripEventDataRefs(parsed, resolveData), run, step, hook, wait, + events: allEvents, }; }, async get( diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index 2965906f7b..74e862215a 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -369,6 +369,12 @@ export interface EventResult { hook?: import('./hooks.js').Hook; /** The wait entity (for wait_created/wait_completed events) */ wait?: import('./waits.js').Wait; + /** + * All events up to this point, with data resolved. When populated + * on a run_started response, the runtime uses these to skip the + * initial events.list call and reduce TTFB. + */ + events?: Event[]; } export interface GetEventParams {