Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-unconsumed-event-hook-step-loop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Fix false-positive unconsumed event error in `for await` hook loops with step calls
44 changes: 27 additions & 17 deletions packages/core/src/events-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,33 @@ export class EventsConsumer {
// is still unconsumed after the queue drains, it's truly orphaned.
if (currentEvent !== null) {
const checkVersion = ++this.unconsumedCheckVersion;
this.pendingUnconsumedCheck = this.getPromiseQueue().then(() => {
// Use a delayed setTimeout after the queue drains. The delay must be
// long enough for promise chains to propagate across the VM boundary
// (from resolve() in the host context through to the workflow code
// calling subscribe() in the VM context). Node.js does not guarantee
// that setTimeout(0) fires after all cross-context microtasks settle,
// so we use a small but non-zero delay. Any subscribe() call that
// arrives during this window will cancel the check via version
// invalidation + clearTimeout.
this.pendingUnconsumedTimeout = setTimeout(() => {
this.pendingUnconsumedTimeout = null;
if (this.unconsumedCheckVersion === checkVersion) {
this.pendingUnconsumedCheck = null;
this.onUnconsumedEvent(currentEvent);
}
}, 100);
});
this.pendingUnconsumedCheck = this.getPromiseQueue()
.then(
// Yield to the event loop after the first queue drain. This allows
// microtask chains triggered by the preceding resolve() (e.g., a
// step result delivery that resumes a for-await loop, which then
// calls createHookPromise and appends a second round of async work
// to the promise queue) to propagate before we re-check the queue.
() => new Promise<void>((resolve) => setTimeout(resolve, 0))
)
.then(() => this.getPromiseQueue())
.then(() => {
Comment on lines +118 to +127
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deferred check promise chain always schedules the extra timer(s) even if a later subscribe() has already invalidated checkVersion. This can leave behind unnecessary timers (including the new setTimeout(0) yield) under high churn, keeping the event loop alive and doing extra work even though the check can never fire. Consider short‑circuiting before scheduling the setTimeout (and/or before the second getPromiseQueue() call) when this.unconsumedCheckVersion !== checkVersion so cancellation is cheap and no timers are created for stale checks.

Suggested change
.then(
// Yield to the event loop after the first queue drain. This allows
// microtask chains triggered by the preceding resolve() (e.g., a
// step result delivery that resumes a for-await loop, which then
// calls createHookPromise and appends a second round of async work
// to the promise queue) to propagate before we re-check the queue.
() => new Promise<void>((resolve) => setTimeout(resolve, 0))
)
.then(() => this.getPromiseQueue())
.then(() => {
.then(() => {
// If a newer subscribe() has already invalidated this check, bail out
// before scheduling the extra yield timer.
if (this.unconsumedCheckVersion !== checkVersion) {
return;
}
// Yield to the event loop after the first queue drain. This allows
// microtask chains triggered by the preceding resolve() (e.g., a
// step result delivery that resumes a for-await loop, which then
// calls createHookPromise and appends a second round of async work
// to the promise queue) to propagate before we re-check the queue.
return new Promise<void>((resolve) => setTimeout(resolve, 0));
})
.then(() => {
// Short-circuit stale checks before performing a second queue drain.
if (this.unconsumedCheckVersion !== checkVersion) {
return;
}
return this.getPromiseQueue();
})
.then(() => {
// If a later subscribe() has invalidated this check by the time both
// queue drains (and the yield) have completed, do not schedule the
// final timeout at all.
if (this.unconsumedCheckVersion !== checkVersion) {
return;
}

Copilot uses AI. Check for mistakes.
// Use a delayed setTimeout after the queue drains. The delay must be
// long enough for promise chains to propagate across the VM boundary
// (from resolve() in the host context through to the workflow code
// calling subscribe() in the VM context). Node.js does not guarantee
// that setTimeout(0) fires after all cross-context microtasks settle,
// so we use a small but non-zero delay. Any subscribe() call that
// arrives during this window will cancel the check via version
// invalidation + clearTimeout.
this.pendingUnconsumedTimeout = setTimeout(() => {
this.pendingUnconsumedTimeout = null;
if (this.unconsumedCheckVersion === checkVersion) {
this.pendingUnconsumedCheck = null;
this.onUnconsumedEvent(currentEvent);
}
}, 100);
});
}
};
}
150 changes: 150 additions & 0 deletions packages/core/src/hook-sleep-interaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,156 @@ function defineTests(mode: 'sync' | 'async') {
});
});

describe(`hook + sleep with step per payload ${label}`, () => {
it('should not trigger unconsumed event error when for-await loop calls a step per hook payload', async () => {
// Reproduces CI failure: hookWithSleepWorkflow event log had alternating
// hook_received + step lifecycle events. During replay, the EventsConsumer
// advances past the second step_created before the for-await loop has
// called processPayload (and registered the step consumer). The deferred
// unconsumed check must wait for the new async work (hook payload
// deserialization) before declaring the event orphaned.
await setupHydrateMock();
const ops: Promise<any>[] = [];
const [payload1, payload2, stepResult1, stepResult2] = await Promise.all([
dehydrateStepReturnValue(
{ type: 'subscribe', id: 1 },
'wrun_test',
undefined,
ops
),
dehydrateStepReturnValue(
{ type: 'done', done: true },
'wrun_test',
undefined,
ops
),
dehydrateStepReturnValue(
{ processed: true, type: 'subscribe', id: 1 },
'wrun_test',
undefined,
ops
),
dehydrateStepReturnValue(
{ processed: true, type: 'done' },
'wrun_test',
undefined,
ops
),
]);

const ctx = setupWorkflowContext([
{
eventId: 'evnt_0',
runId: 'wrun_test',
eventType: 'hook_created',
correlationId: `hook_${CORR_IDS[0]}`,
eventData: { token: 'test-token', isWebhook: false },
createdAt: new Date(),
Comment on lines +639 to +646
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is intended to ensure an "unconsumed event" error does not occur, but setupWorkflowContext() configures EventsConsumer with onUnconsumedEvent: () => {}. That means the regression would manifest as a hang until the test times out (and it won’t surface the same error the real runtime throws). To make the regression signal deterministic and match production, consider letting the test pass an onUnconsumedEvent handler that rejects via ctx.onWorkflowError (or throws) so a failure is immediate and clearly attributed to the unconsumed-event path.

Copilot uses AI. Check for mistakes.
},
{
eventId: 'evnt_1',
runId: 'wrun_test',
eventType: 'wait_created',
correlationId: `wait_${CORR_IDS[1]}`,
eventData: { resumeAt: new Date('2099-01-01') },
createdAt: new Date(),
},
// First hook payload → step lifecycle
{
eventId: 'evnt_2',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: `hook_${CORR_IDS[0]}`,
eventData: { payload: payload1 },
createdAt: new Date(),
},
{
eventId: 'evnt_3',
runId: 'wrun_test',
eventType: 'step_created',
correlationId: `step_${CORR_IDS[2]}`,
eventData: { stepName: 'processPayload', input: payload1 },
createdAt: new Date(),
},
{
eventId: 'evnt_4',
runId: 'wrun_test',
eventType: 'step_started',
correlationId: `step_${CORR_IDS[2]}`,
eventData: {},
createdAt: new Date(),
},
{
eventId: 'evnt_5',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: `step_${CORR_IDS[2]}`,
eventData: { result: stepResult1 },
createdAt: new Date(),
},
// Second hook payload → step lifecycle
{
eventId: 'evnt_6',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: `hook_${CORR_IDS[0]}`,
eventData: { payload: payload2 },
createdAt: new Date(),
},
{
eventId: 'evnt_7',
runId: 'wrun_test',
eventType: 'step_created',
correlationId: `step_${CORR_IDS[3]}`,
eventData: { stepName: 'processPayload', input: payload2 },
createdAt: new Date(),
},
{
eventId: 'evnt_8',
runId: 'wrun_test',
eventType: 'step_started',
correlationId: `step_${CORR_IDS[3]}`,
eventData: {},
createdAt: new Date(),
},
{
eventId: 'evnt_9',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: `step_${CORR_IDS[3]}`,
eventData: { result: stepResult2 },
createdAt: new Date(),
},
]);

const createHook = createCreateHook(ctx);
const sleep = createSleep(ctx);
const useStep = createUseStep(ctx);

const { result, error } = await runWithDiscontinuation(ctx, async () => {
const hook = createHook();
void sleep('1d');

const processPayload = useStep<[any], any>('processPayload');
const results: any[] = [];

for await (const payload of hook) {
const processed = await processPayload(payload);
results.push(processed);
if ((payload as any).done) break;
}

return results;
});

expect(error).toBeUndefined();
expect(result).toEqual([
{ processed: true, type: 'subscribe', id: 1 },
{ processed: true, type: 'done' },
]);
});
});

describe(`hook only (no concurrent pending entity) ${label}`, () => {
it('should deliver all hook payloads and reach step when no sleep or incomplete step exists', async () => {
await setupHydrateMock();
Expand Down
Loading