From 582126188177a70ce6f0708458552c7f29c1f979 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 26 Mar 2026 16:03:03 -0700 Subject: [PATCH] Fix false-positive unconsumed event in for-await hook loops with steps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During replay of a `for await (const payload of hook) { await step() }` pattern, the EventsConsumer could advance to a step_created event before the workflow code registered the step consumer. The existing deferred unconsumed check chained onto the promise queue once and waited 100ms, but this missed a second round of async work (hook payload deserialization) triggered by the first drain's resolve(). Fix: after the initial queue drain, yield to the event loop (setTimeout(0)) so microtask chains propagate (e.g., step resolve → for-await resumes → createHookPromise → new deserialization), then re-chain onto the latest queue before starting the 100ms timeout. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../fix-unconsumed-event-hook-step-loop.md | 5 + packages/core/src/events-consumer.ts | 44 +++-- .../core/src/hook-sleep-interaction.test.ts | 150 ++++++++++++++++++ 3 files changed, 182 insertions(+), 17 deletions(-) create mode 100644 .changeset/fix-unconsumed-event-hook-step-loop.md diff --git a/.changeset/fix-unconsumed-event-hook-step-loop.md b/.changeset/fix-unconsumed-event-hook-step-loop.md new file mode 100644 index 0000000000..7015e4ac24 --- /dev/null +++ b/.changeset/fix-unconsumed-event-hook-step-loop.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Fix false-positive unconsumed event error in `for await` hook loops with step calls diff --git a/packages/core/src/events-consumer.ts b/packages/core/src/events-consumer.ts index 1786cf21df..78401cfe75 100644 --- a/packages/core/src/events-consumer.ts +++ b/packages/core/src/events-consumer.ts @@ -114,23 +114,33 @@ export class EventsConsumer { // is still unconsumed after the queue drains, it's truly orphaned. if (currentEvent !== null) { const checkVersion = ++this.unconsumedCheckVersion; - this.pendingUnconsumedCheck = this.getPromiseQueue().then(() => { - // Use a delayed setTimeout after the queue drains. The delay must be - // long enough for promise chains to propagate across the VM boundary - // (from resolve() in the host context through to the workflow code - // calling subscribe() in the VM context). Node.js does not guarantee - // that setTimeout(0) fires after all cross-context microtasks settle, - // so we use a small but non-zero delay. Any subscribe() call that - // arrives during this window will cancel the check via version - // invalidation + clearTimeout. - this.pendingUnconsumedTimeout = setTimeout(() => { - this.pendingUnconsumedTimeout = null; - if (this.unconsumedCheckVersion === checkVersion) { - this.pendingUnconsumedCheck = null; - this.onUnconsumedEvent(currentEvent); - } - }, 100); - }); + this.pendingUnconsumedCheck = this.getPromiseQueue() + .then( + // Yield to the event loop after the first queue drain. This allows + // microtask chains triggered by the preceding resolve() (e.g., a + // step result delivery that resumes a for-await loop, which then + // calls createHookPromise and appends a second round of async work + // to the promise queue) to propagate before we re-check the queue. + () => new Promise((resolve) => setTimeout(resolve, 0)) + ) + .then(() => this.getPromiseQueue()) + .then(() => { + // Use a delayed setTimeout after the queue drains. The delay must be + // long enough for promise chains to propagate across the VM boundary + // (from resolve() in the host context through to the workflow code + // calling subscribe() in the VM context). Node.js does not guarantee + // that setTimeout(0) fires after all cross-context microtasks settle, + // so we use a small but non-zero delay. Any subscribe() call that + // arrives during this window will cancel the check via version + // invalidation + clearTimeout. + this.pendingUnconsumedTimeout = setTimeout(() => { + this.pendingUnconsumedTimeout = null; + if (this.unconsumedCheckVersion === checkVersion) { + this.pendingUnconsumedCheck = null; + this.onUnconsumedEvent(currentEvent); + } + }, 100); + }); } }; } diff --git a/packages/core/src/hook-sleep-interaction.test.ts b/packages/core/src/hook-sleep-interaction.test.ts index a706628b81..21ed2f2489 100644 --- a/packages/core/src/hook-sleep-interaction.test.ts +++ b/packages/core/src/hook-sleep-interaction.test.ts @@ -599,6 +599,156 @@ function defineTests(mode: 'sync' | 'async') { }); }); + describe(`hook + sleep with step per payload ${label}`, () => { + it('should not trigger unconsumed event error when for-await loop calls a step per hook payload', async () => { + // Reproduces CI failure: hookWithSleepWorkflow event log had alternating + // hook_received + step lifecycle events. During replay, the EventsConsumer + // advances past the second step_created before the for-await loop has + // called processPayload (and registered the step consumer). The deferred + // unconsumed check must wait for the new async work (hook payload + // deserialization) before declaring the event orphaned. + await setupHydrateMock(); + const ops: Promise[] = []; + const [payload1, payload2, stepResult1, stepResult2] = await Promise.all([ + dehydrateStepReturnValue( + { type: 'subscribe', id: 1 }, + 'wrun_test', + undefined, + ops + ), + dehydrateStepReturnValue( + { type: 'done', done: true }, + 'wrun_test', + undefined, + ops + ), + dehydrateStepReturnValue( + { processed: true, type: 'subscribe', id: 1 }, + 'wrun_test', + undefined, + ops + ), + dehydrateStepReturnValue( + { processed: true, type: 'done' }, + 'wrun_test', + undefined, + ops + ), + ]); + + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'hook_created', + correlationId: `hook_${CORR_IDS[0]}`, + eventData: { token: 'test-token', isWebhook: false }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'wait_created', + correlationId: `wait_${CORR_IDS[1]}`, + eventData: { resumeAt: new Date('2099-01-01') }, + createdAt: new Date(), + }, + // First hook payload → step lifecycle + { + eventId: 'evnt_2', + runId: 'wrun_test', + eventType: 'hook_received', + correlationId: `hook_${CORR_IDS[0]}`, + eventData: { payload: payload1 }, + createdAt: new Date(), + }, + { + eventId: 'evnt_3', + runId: 'wrun_test', + eventType: 'step_created', + correlationId: `step_${CORR_IDS[2]}`, + eventData: { stepName: 'processPayload', input: payload1 }, + createdAt: new Date(), + }, + { + eventId: 'evnt_4', + runId: 'wrun_test', + eventType: 'step_started', + correlationId: `step_${CORR_IDS[2]}`, + eventData: {}, + createdAt: new Date(), + }, + { + eventId: 'evnt_5', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: `step_${CORR_IDS[2]}`, + eventData: { result: stepResult1 }, + createdAt: new Date(), + }, + // Second hook payload → step lifecycle + { + eventId: 'evnt_6', + runId: 'wrun_test', + eventType: 'hook_received', + correlationId: `hook_${CORR_IDS[0]}`, + eventData: { payload: payload2 }, + createdAt: new Date(), + }, + { + eventId: 'evnt_7', + runId: 'wrun_test', + eventType: 'step_created', + correlationId: `step_${CORR_IDS[3]}`, + eventData: { stepName: 'processPayload', input: payload2 }, + createdAt: new Date(), + }, + { + eventId: 'evnt_8', + runId: 'wrun_test', + eventType: 'step_started', + correlationId: `step_${CORR_IDS[3]}`, + eventData: {}, + createdAt: new Date(), + }, + { + eventId: 'evnt_9', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: `step_${CORR_IDS[3]}`, + eventData: { result: stepResult2 }, + createdAt: new Date(), + }, + ]); + + const createHook = createCreateHook(ctx); + const sleep = createSleep(ctx); + const useStep = createUseStep(ctx); + + const { result, error } = await runWithDiscontinuation(ctx, async () => { + const hook = createHook(); + void sleep('1d'); + + const processPayload = useStep<[any], any>('processPayload'); + const results: any[] = []; + + for await (const payload of hook) { + const processed = await processPayload(payload); + results.push(processed); + if ((payload as any).done) break; + } + + return results; + }); + + expect(error).toBeUndefined(); + expect(result).toEqual([ + { processed: true, type: 'subscribe', id: 1 }, + { processed: true, type: 'done' }, + ]); + }); + }); + describe(`hook only (no concurrent pending entity) ${label}`, () => { it('should deliver all hook payloads and reach step when no sleep or incomplete step exists', async () => { await setupHydrateMock();