diff --git a/src/data-pump/data-pump-cluster.ts b/src/data-pump/data-pump-cluster.ts index 58ae6fe..448b7c2 100644 --- a/src/data-pump/data-pump-cluster.ts +++ b/src/data-pump/data-pump-cluster.ts @@ -65,6 +65,8 @@ export class FlowcoreDataPumpCluster { private workerHandler?: (events: FlowcoreEvent[]) => Promise private workerFailedHandler?: (events: FlowcoreEvent[]) => void | Promise + private pumpRestartAttempts = 0 + // NATS distribution state private natsConnectionManager?: NatsConnectionManager private natsDistLeader?: NatsDistributionLeader @@ -361,6 +363,8 @@ export class FlowcoreDataPumpCluster { this.pump.stop() this.pump = undefined } + + this.pumpRestartAttempts = 0 } private startPumpAsLeader(): void { @@ -370,6 +374,7 @@ export class FlowcoreDataPumpCluster { processor: { concurrency: this.workerConcurrency, handler: async (events: FlowcoreEvent[]) => { + this.pumpRestartAttempts = 0 await this.distributeEvents(events) }, failedHandler: this.workerFailedHandler, @@ -379,6 +384,14 @@ export class FlowcoreDataPumpCluster { this.pump = FlowcoreDataPump.create(pumpOptions, this.options.dataSourceOverride) this.pump.start().catch((error) => { this.logger?.error("Pump error in leader mode", { error }) + if (!this.running || !this.isLeader) return + this.pumpRestartAttempts++ + const delay = Math.min(RECONNECT_BASE_MS * Math.pow(2, this.pumpRestartAttempts - 1), RECONNECT_MAX_MS) + this.logger?.warn(`Restarting leader pump in ${delay}ms (attempt ${this.pumpRestartAttempts})`) + setTimeout(() => { + if (!this.running || !this.isLeader) return + this.startPumpAsLeader() + }, delay) }) } diff --git a/src/data-pump/data-pump.ts b/src/data-pump/data-pump.ts index 38acc2c..f56a6c0 100644 --- a/src/data-pump/data-pump.ts +++ b/src/data-pump/data-pump.ts @@ -91,6 +91,7 @@ export class FlowcoreDataPump { private acknowledgedCount = 0 private failedCount = 0 private pulledCount = 0 + private processLoopRestartAttempts = 0 private constructor( public readonly dataSource: FlowcoreDataSource, @@ -220,10 +221,7 @@ export class FlowcoreDataPump { } if (this.options.processor) { - this.processLoop().catch((error) => { - this.logger?.error("Error in processor", { error }) - this.stop() - }) + this.startProcessLoop() } if (!callback) { @@ -245,6 +243,7 @@ export class FlowcoreDataPump { public stop(isRestart = false): void { this.running = false + this.processLoopRestartAttempts = 0 this.buffer = [] this.updateMetricsGauges() this.pulseEmitter?.stop() @@ -499,12 +498,27 @@ export class FlowcoreDataPump { // #region Pusher + private startProcessLoop(): void { + this.processLoop().catch((error) => { + this.logger?.error("Error in processor", { error }) + if (!this.running) return + this.processLoopRestartAttempts++ + const delay = Math.min(1_000 * Math.pow(2, this.processLoopRestartAttempts - 1), 30_000) + this.logger?.warn(`Restarting process loop in ${delay}ms (attempt ${this.processLoopRestartAttempts})`) + setTimeout(() => { + if (!this.running) return + this.startProcessLoop() + }, delay) + }) + } + private async processLoop() { while (this.running) { try { const events = await this.reserve(this.options.processor?.concurrency ?? 1) await this.options.processor?.handler(events) await this.acknowledge(events.map((event) => event.eventId)) + this.processLoopRestartAttempts = 0 } catch (error) { const errorMessage = error instanceof Error ? error.message : "Unknown error" this.logger?.error(`Failed to process events: ${errorMessage}`) diff --git a/test/tests/data-pump-restart.test.ts b/test/tests/data-pump-restart.test.ts new file mode 100644 index 0000000..f82b74b --- /dev/null +++ b/test/tests/data-pump-restart.test.ts @@ -0,0 +1,107 @@ +import { assertEquals } from "@std/assert" +import { afterEach, beforeEach, describe, it } from "@std/testing/bdd" +import { FakeTime } from "@std/testing/time" +import type { FlowcoreEvent } from "@flowcore/sdk" +import { FlowcoreDataPump } from "../../src/data-pump/data-pump.ts" +import type { FlowcoreDataPumpStateManager } from "../../src/data-pump/types.ts" + +// #region Test Helpers + +const FAKE_API_KEY = "fc_testid_testsecret" + +function createMockStateManager(): FlowcoreDataPumpStateManager { + return { + getState: () => Promise.resolve({ timeBucket: "20260331120000" }), + setState: () => {}, + } +} + +function createMockLogger() { + const logs: { level: string; message: string }[] = [] + return { + logger: { + debug: (msg: string) => logs.push({ level: "debug", message: msg }), + info: (msg: string) => logs.push({ level: "info", message: msg }), + warn: (msg: string) => logs.push({ level: "warn", message: msg }), + error: (msg: string | Error) => logs.push({ level: "error", message: msg instanceof Error ? msg.message : msg }), + }, + logs, + } +} + +function createPump( + handler: (events: FlowcoreEvent[]) => Promise, + logger: ReturnType["logger"], +): FlowcoreDataPump { + return FlowcoreDataPump.create({ + auth: { apiKey: FAKE_API_KEY }, + dataSource: { + tenant: "test", + dataCore: "test-dc", + flowType: "test.0", + eventTypes: ["test.created.0"], + }, + stateManager: createMockStateManager(), + processor: { + concurrency: 1, + handler, + }, + notifier: { type: "poller", intervalMs: 60_000 }, + logger, + baseUrlOverride: "http://localhost:99999", + noTranslation: true, + }) +} + +// #endregion + +describe("processLoop restart", { sanitizeOps: false, sanitizeResources: false }, () => { + let fakeTime: FakeTime + + beforeEach(() => { + fakeTime = new FakeTime() + }) + + afterEach(() => { + fakeTime.restore() + }) + + it("should not emit restart warnings after stop", async () => { + const { logger, logs } = createMockLogger() + + const pump = createPump(() => { + return Promise.reject(new Error("Always fails")) + }, logger) + + void pump.start(() => {}) + // Stop immediately — should prevent any restart scheduling + pump.stop() + + // Advance past any potential restart delays + await fakeTime.tickAsync(60_000) + + const restartLogs = logs.filter((l) => l.level === "warn" && l.message.includes("Restarting process loop")) + assertEquals(restartLogs.length, 0) + }) +}) + +describe("backoff formula", () => { + it("should produce correct exponential sequence capped at 30s", () => { + const delays = [] + for (let attempt = 1; attempt <= 8; attempt++) { + delays.push(Math.min(1_000 * Math.pow(2, attempt - 1), 30_000)) + } + assertEquals(delays, [1_000, 2_000, 4_000, 8_000, 16_000, 30_000, 30_000, 30_000]) + }) + + it("should use RECONNECT constants for leader pump (same formula)", () => { + const RECONNECT_BASE_MS = 1_000 + const RECONNECT_MAX_MS = 30_000 + + const delays = [] + for (let attempt = 1; attempt <= 6; attempt++) { + delays.push(Math.min(RECONNECT_BASE_MS * Math.pow(2, attempt - 1), RECONNECT_MAX_MS)) + } + assertEquals(delays, [1_000, 2_000, 4_000, 8_000, 16_000, 30_000]) + }) +})