Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/mesh/src/event-bus/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ export class EventBusWorker {
console.log(
`[EventBus] Cron expression for event ${event.id} has no more runs`,
);
// Cron is exhausted — mark the event as delivered (terminal state)
await this.storage.markEventCompleted(event.id);
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: markEventCompleted unconditionally sets the event to delivered, which can overwrite a failed status when the final cron run fails. This hides delivery failures for the last cron cycle.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/mesh/src/event-bus/worker.ts, line 410:

<comment>`markEventCompleted` unconditionally sets the event to `delivered`, which can overwrite a `failed` status when the final cron run fails. This hides delivery failures for the last cron cycle.</comment>

<file context>
@@ -406,6 +406,8 @@ export class EventBusWorker {
           `[EventBus] Cron expression for event ${event.id} has no more runs`,
         );
+        // Cron is exhausted — mark the event as delivered (terminal state)
+        await this.storage.markEventCompleted(event.id);
         return;
       }
</file context>
Fix with Cubic

return;
}

Expand Down
32 changes: 32 additions & 0 deletions apps/mesh/src/storage/event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,14 @@ export interface EventBusStorage {
connectionId: string,
): Promise<{ success: boolean }>;

/**
* Mark a cron event as completed (delivered) when there are no more runs.
* Called by the worker when a cron expression produces no next run time.
*
* @param eventId - The event ID to mark as completed
*/
markEventCompleted(eventId: string): Promise<void>;

/**
* Sync subscriptions to a desired state.
* Creates new subscriptions, deletes removed ones, and updates filters.
Expand Down Expand Up @@ -784,6 +792,19 @@ class KyselyEventBusStorage implements EventBusStorage {
);

if (allDelivered) {
// For cron events, don't mark as "delivered" — the event represents an
// ongoing schedule. The worker's scheduleNextCronDelivery() owns the
// lifecycle and will mark it delivered when the cron has no more runs.
const event = await this.db
.selectFrom("events")
.select(["cron"])
.where("id", "=", eventId)
.executeTakeFirst();

if (event?.cron) {
return; // Cron events stay "pending" between delivery cycles
}

await this.db
.updateTable("events")
.set({
Expand Down Expand Up @@ -1002,6 +1023,17 @@ class KyselyEventBusStorage implements EventBusStorage {
return { success: updated };
}

async markEventCompleted(eventId: string): Promise<void> {
await this.db
.updateTable("events")
.set({
status: "delivered",
updated_at: new Date().toISOString(),
})
.where("id", "=", eventId)
.execute();
}

async syncSubscriptions(
input: SyncSubscriptionsInput,
): Promise<SyncSubscriptionsResult> {
Expand Down