From d4d98ae8768b116bf4b24afc72e2a2914b89a7fd Mon Sep 17 00:00:00 2001 From: jbiskur Date: Tue, 31 Mar 2026 12:28:00 +0100 Subject: [PATCH] fix: restart processLoop and leader pump on error with exponential backoff Individual flow type pumps that encounter errors die silently and are never restarted, causing event processing to stall permanently. This adds automatic restart with exponential backoff (1s-30s) to: - processLoop: restarts on unhandled rejection, resets backoff on successful ack - leader pump in cluster mode: restarts on start() failure, resets backoff as soon as the pump's handler receives a batch (before distribution is awaited) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/data-pump/data-pump-cluster.ts | 13 ++++ src/data-pump/data-pump.ts | 22 +++++- test/tests/data-pump-restart.test.ts | 107 +++++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 4 deletions(-) create mode 100644 test/tests/data-pump-restart.test.ts diff --git a/src/data-pump/data-pump-cluster.ts b/src/data-pump/data-pump-cluster.ts index 58ae6fe..448b7c2 100644 --- a/src/data-pump/data-pump-cluster.ts +++ b/src/data-pump/data-pump-cluster.ts @@ -65,6 +65,8 @@ export class FlowcoreDataPumpCluster { private workerHandler?: (events: FlowcoreEvent[]) => Promise private workerFailedHandler?: (events: FlowcoreEvent[]) => void | Promise + private pumpRestartAttempts = 0 + // NATS distribution state private natsConnectionManager?: NatsConnectionManager private natsDistLeader?: NatsDistributionLeader @@ -361,6 +363,8 @@ export class FlowcoreDataPumpCluster { this.pump.stop() this.pump = undefined } + + this.pumpRestartAttempts = 0 } private startPumpAsLeader(): void { @@ -370,6 +374,7 @@ export class FlowcoreDataPumpCluster { processor: { concurrency: this.workerConcurrency, handler: async (events: FlowcoreEvent[]) => { + this.pumpRestartAttempts = 0 await this.distributeEvents(events) }, failedHandler: this.workerFailedHandler, @@ -379,6 +384,14 @@ export class FlowcoreDataPumpCluster { this.pump = 
FlowcoreDataPump.create(pumpOptions, this.options.dataSourceOverride) this.pump.start().catch((error) => { this.logger?.error("Pump error in leader mode", { error }) + if (!this.running || !this.isLeader) return + this.pumpRestartAttempts++ + const delay = Math.min(RECONNECT_BASE_MS * Math.pow(2, this.pumpRestartAttempts - 1), RECONNECT_MAX_MS) + this.logger?.warn(`Restarting leader pump in ${delay}ms (attempt ${this.pumpRestartAttempts})`) + setTimeout(() => { + if (!this.running || !this.isLeader) return + this.startPumpAsLeader() + }, delay) }) } diff --git a/src/data-pump/data-pump.ts b/src/data-pump/data-pump.ts index 38acc2c..f56a6c0 100644 --- a/src/data-pump/data-pump.ts +++ b/src/data-pump/data-pump.ts @@ -91,6 +91,7 @@ export class FlowcoreDataPump { private acknowledgedCount = 0 private failedCount = 0 private pulledCount = 0 + private processLoopRestartAttempts = 0 private constructor( public readonly dataSource: FlowcoreDataSource, @@ -220,10 +221,7 @@ export class FlowcoreDataPump { } if (this.options.processor) { - this.processLoop().catch((error) => { - this.logger?.error("Error in processor", { error }) - this.stop() - }) + this.startProcessLoop() } if (!callback) { @@ -245,6 +243,7 @@ export class FlowcoreDataPump { public stop(isRestart = false): void { this.running = false + this.processLoopRestartAttempts = 0 this.buffer = [] this.updateMetricsGauges() this.pulseEmitter?.stop() @@ -499,12 +498,27 @@ export class FlowcoreDataPump { // #region Pusher + private startProcessLoop(): void { + this.processLoop().catch((error) => { + this.logger?.error("Error in processor", { error }) + if (!this.running) return + this.processLoopRestartAttempts++ + const delay = Math.min(1_000 * Math.pow(2, this.processLoopRestartAttempts - 1), 30_000) + this.logger?.warn(`Restarting process loop in ${delay}ms (attempt ${this.processLoopRestartAttempts})`) + setTimeout(() => { + if (!this.running) return + this.startProcessLoop() + }, delay) + }) + } + private async 
processLoop() { while (this.running) { try { const events = await this.reserve(this.options.processor?.concurrency ?? 1) await this.options.processor?.handler(events) await this.acknowledge(events.map((event) => event.eventId)) + this.processLoopRestartAttempts = 0 } catch (error) { const errorMessage = error instanceof Error ? error.message : "Unknown error" this.logger?.error(`Failed to process events: ${errorMessage}`) diff --git a/test/tests/data-pump-restart.test.ts b/test/tests/data-pump-restart.test.ts new file mode 100644 index 0000000..f82b74b --- /dev/null +++ b/test/tests/data-pump-restart.test.ts @@ -0,0 +1,107 @@ +import { assertEquals } from "@std/assert" +import { afterEach, beforeEach, describe, it } from "@std/testing/bdd" +import { FakeTime } from "@std/testing/time" +import type { FlowcoreEvent } from "@flowcore/sdk" +import { FlowcoreDataPump } from "../../src/data-pump/data-pump.ts" +import type { FlowcoreDataPumpStateManager } from "../../src/data-pump/types.ts" + +// #region Test Helpers + +const FAKE_API_KEY = "fc_testid_testsecret" + +function createMockStateManager(): FlowcoreDataPumpStateManager { + return { + getState: () => Promise.resolve({ timeBucket: "20260331120000" }), + setState: () => {}, + } +} + +function createMockLogger() { + const logs: { level: string; message: string }[] = [] + return { + logger: { + debug: (msg: string) => logs.push({ level: "debug", message: msg }), + info: (msg: string) => logs.push({ level: "info", message: msg }), + warn: (msg: string) => logs.push({ level: "warn", message: msg }), + error: (msg: string | Error) => logs.push({ level: "error", message: msg instanceof Error ? 
msg.message : msg }), + }, + logs, + } +} + +function createPump( + handler: (events: FlowcoreEvent[]) => Promise, + logger: ReturnType["logger"], +): FlowcoreDataPump { + return FlowcoreDataPump.create({ + auth: { apiKey: FAKE_API_KEY }, + dataSource: { + tenant: "test", + dataCore: "test-dc", + flowType: "test.0", + eventTypes: ["test.created.0"], + }, + stateManager: createMockStateManager(), + processor: { + concurrency: 1, + handler, + }, + notifier: { type: "poller", intervalMs: 60_000 }, + logger, + baseUrlOverride: "http://localhost:99999", + noTranslation: true, + }) +} + +// #endregion + +describe("processLoop restart", { sanitizeOps: false, sanitizeResources: false }, () => { + let fakeTime: FakeTime + + beforeEach(() => { + fakeTime = new FakeTime() + }) + + afterEach(() => { + fakeTime.restore() + }) + + it("should not emit restart warnings after stop", async () => { + const { logger, logs } = createMockLogger() + + const pump = createPump(() => { + return Promise.reject(new Error("Always fails")) + }, logger) + + void pump.start(() => {}) + // Stop immediately — should prevent any restart scheduling + pump.stop() + + // Advance past any potential restart delays + await fakeTime.tickAsync(60_000) + + const restartLogs = logs.filter((l) => l.level === "warn" && l.message.includes("Restarting process loop")) + assertEquals(restartLogs.length, 0) + }) +}) + +describe("backoff formula", () => { + it("should produce correct exponential sequence capped at 30s", () => { + const delays = [] + for (let attempt = 1; attempt <= 8; attempt++) { + delays.push(Math.min(1_000 * Math.pow(2, attempt - 1), 30_000)) + } + assertEquals(delays, [1_000, 2_000, 4_000, 8_000, 16_000, 30_000, 30_000, 30_000]) + }) + + it("should use RECONNECT constants for leader pump (same formula)", () => { + const RECONNECT_BASE_MS = 1_000 + const RECONNECT_MAX_MS = 30_000 + + const delays = [] + for (let attempt = 1; attempt <= 6; attempt++) { + delays.push(Math.min(RECONNECT_BASE_MS * 
Math.pow(2, attempt - 1), RECONNECT_MAX_MS)) + } + assertEquals(delays, [1_000, 2_000, 4_000, 8_000, 16_000, 30_000]) + }) +})