diff --git a/apps/mesh/src/event-bus/worker.ts b/apps/mesh/src/event-bus/worker.ts index 02e928df20..df04806b22 100644 --- a/apps/mesh/src/event-bus/worker.ts +++ b/apps/mesh/src/event-bus/worker.ts @@ -406,6 +406,8 @@ export class EventBusWorker { console.log( `[EventBus] Cron expression for event ${event.id} has no more runs`, ); + // Cron is exhausted — mark the event as delivered (terminal state) + await this.storage.markEventCompleted(event.id); return; } diff --git a/apps/mesh/src/storage/event-bus.ts b/apps/mesh/src/storage/event-bus.ts index 9c2dd324f7..791eeb060a 100644 --- a/apps/mesh/src/storage/event-bus.ts +++ b/apps/mesh/src/storage/event-bus.ts @@ -265,6 +265,14 @@ export interface EventBusStorage { connectionId: string, ): Promise<{ success: boolean }>; + /** + * Mark a cron event as completed (delivered) when there are no more runs. + * Called by the worker when a cron expression produces no next run time. + * + * @param eventId - The event ID to mark as completed + */ + markEventCompleted(eventId: string): Promise; + /** * Sync subscriptions to a desired state. * Creates new subscriptions, deletes removed ones, and updates filters. @@ -784,6 +792,19 @@ class KyselyEventBusStorage implements EventBusStorage { ); if (allDelivered) { + // For cron events, don't mark as "delivered" — the event represents an + // ongoing schedule. The worker's scheduleNextCronDelivery() owns the + // lifecycle and will mark it delivered when the cron has no more runs. + const event = await this.db + .selectFrom("events") + .select(["cron"]) + .where("id", "=", eventId) + .executeTakeFirst(); + + if (event?.cron) { + return; // Cron events stay "pending" between delivery cycles + } + await this.db .updateTable("events") .set({ @@ -1002,6 +1023,17 @@ class KyselyEventBusStorage implements EventBusStorage { return { success: updated }; } + async markEventCompleted(eventId: string): Promise { + await this.db + .updateTable("events") + .set({ + status: "delivered", + updated_at: new Date().toISOString(), + }) + .where("id", "=", eventId) + .execute(); + } + async syncSubscriptions( input: SyncSubscriptionsInput, ): Promise {