Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/data-pump/data-pump-cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ export class FlowcoreDataPumpCluster {
private workerHandler?: (events: FlowcoreEvent[]) => Promise<void>
private workerFailedHandler?: (events: FlowcoreEvent[]) => void | Promise<void>

private pumpRestartAttempts = 0

// NATS distribution state
private natsConnectionManager?: NatsConnectionManager
private natsDistLeader?: NatsDistributionLeader
Expand Down Expand Up @@ -361,6 +363,8 @@ export class FlowcoreDataPumpCluster {
this.pump.stop()
this.pump = undefined
}

this.pumpRestartAttempts = 0
}

private startPumpAsLeader(): void {
Expand All @@ -370,6 +374,7 @@ export class FlowcoreDataPumpCluster {
processor: {
concurrency: this.workerConcurrency,
handler: async (events: FlowcoreEvent[]) => {
this.pumpRestartAttempts = 0
await this.distributeEvents(events)
},
failedHandler: this.workerFailedHandler,
Expand All @@ -379,6 +384,14 @@ export class FlowcoreDataPumpCluster {
this.pump = FlowcoreDataPump.create(pumpOptions, this.options.dataSourceOverride)
this.pump.start().catch((error) => {
this.logger?.error("Pump error in leader mode", { error })
if (!this.running || !this.isLeader) return
this.pumpRestartAttempts++
const delay = Math.min(RECONNECT_BASE_MS * Math.pow(2, this.pumpRestartAttempts - 1), RECONNECT_MAX_MS)
this.logger?.warn(`Restarting leader pump in ${delay}ms (attempt ${this.pumpRestartAttempts})`)
setTimeout(() => {
if (!this.running || !this.isLeader) return
this.startPumpAsLeader()
}, delay)
})
}

Expand Down
22 changes: 18 additions & 4 deletions src/data-pump/data-pump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export class FlowcoreDataPump {
private acknowledgedCount = 0
private failedCount = 0
private pulledCount = 0
private processLoopRestartAttempts = 0

private constructor(
public readonly dataSource: FlowcoreDataSource,
Expand Down Expand Up @@ -220,10 +221,7 @@ export class FlowcoreDataPump {
}

if (this.options.processor) {
this.processLoop().catch((error) => {
this.logger?.error("Error in processor", { error })
this.stop()
})
this.startProcessLoop()
}

if (!callback) {
Expand All @@ -245,6 +243,7 @@ export class FlowcoreDataPump {

public stop(isRestart = false): void {
this.running = false
this.processLoopRestartAttempts = 0
this.buffer = []
this.updateMetricsGauges()
this.pulseEmitter?.stop()
Expand Down Expand Up @@ -499,12 +498,27 @@ export class FlowcoreDataPump {

// #region Pusher

/**
 * Kicks off `processLoop()` without awaiting it and supervises its outcome.
 *
 * When the loop's promise rejects, the error is logged. If the pump is still
 * marked as running, the restart-attempt counter is bumped and a new loop is
 * scheduled after a delay that doubles with every attempt
 * (1s, 2s, 4s, ... capped at 30s). The running flag is re-checked when the
 * timer fires, so a pump stopped during the wait is not restarted.
 *
 * NOTE(review): the timer handle is not kept, so it cannot be cancelled. If the
 * pump is stopped and started again before the delay elapses, the pending timer
 * would presumably launch an additional loop — confirm against `start()`/`stop()`.
 */
private startProcessLoop(): void {
  const onLoopFailure = (error: unknown): void => {
    this.logger?.error("Error in processor", { error })
    if (!this.running) {
      return
    }

    // Count this failure first: attempt N waits 1000 * 2^(N-1) ms, at most 30s.
    this.processLoopRestartAttempts += 1
    const uncappedMs = 1_000 * 2 ** (this.processLoopRestartAttempts - 1)
    const backoffMs = Math.min(uncappedMs, 30_000)
    this.logger?.warn(`Restarting process loop in ${backoffMs}ms (attempt ${this.processLoopRestartAttempts})`)

    setTimeout(() => {
      // The pump may have been stopped while the timer was pending.
      if (this.running) {
        this.startProcessLoop()
      }
    }, backoffMs)
  }

  this.processLoop().catch(onLoopFailure)
}

private async processLoop() {
while (this.running) {
try {
const events = await this.reserve(this.options.processor?.concurrency ?? 1)
await this.options.processor?.handler(events)
await this.acknowledge(events.map((event) => event.eventId))
this.processLoopRestartAttempts = 0
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "Unknown error"
this.logger?.error(`Failed to process events: ${errorMessage}`)
Expand Down
107 changes: 107 additions & 0 deletions test/tests/data-pump-restart.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { assertEquals } from "@std/assert"
import { afterEach, beforeEach, describe, it } from "@std/testing/bdd"
import { FakeTime } from "@std/testing/time"
import type { FlowcoreEvent } from "@flowcore/sdk"
import { FlowcoreDataPump } from "../../src/data-pump/data-pump.ts"
import type { FlowcoreDataPumpStateManager } from "../../src/data-pump/types.ts"

// #region Test Helpers

const FAKE_API_KEY = "fc_testid_testsecret"

/**
 * Builds an in-memory state manager stub for the pump under test.
 *
 * `getState` always resolves to a freshly created state object carrying the same
 * fixed time bucket; `setState` accepts the call and does nothing.
 */
function createMockStateManager(): FlowcoreDataPumpStateManager {
  const fixedTimeBucket = "20260331120000"

  const stateManager: FlowcoreDataPumpStateManager = {
    getState: () => {
      // A new object per call, so callers never share a mutable state instance.
      return Promise.resolve({ timeBucket: fixedTimeBucket })
    },
    setState: () => {},
  }

  return stateManager
}

/**
 * Creates a logger stub that records every call in memory.
 *
 * @returns `logger` with `debug`/`info`/`warn`/`error` methods, and `logs`, the
 * shared array each call appends a `{ level, message }` entry to. `error` also
 * accepts an `Error`, in which case its `message` is what gets recorded.
 */
function createMockLogger() {
  type LogEntry = { level: string; message: string }

  const logs: LogEntry[] = []

  // Returns the result of `push` so each logger method yields the same value as
  // a direct `logs.push(...)` expression would.
  const record = (level: string, message: string): number => logs.push({ level, message })

  const logger = {
    debug: (msg: string) => record("debug", msg),
    info: (msg: string) => record("info", msg),
    warn: (msg: string) => record("warn", msg),
    error: (msg: string | Error) => {
      const text = msg instanceof Error ? msg.message : msg
      return record("error", text)
    },
  }

  return { logger, logs }
}

/**
 * Creates a `FlowcoreDataPump` wired with test doubles.
 *
 * @param handler Processor callback the pump is configured with (concurrency 1).
 * @param logger Recording logger, as produced by `createMockLogger().logger`.
 * @returns The pump returned by `FlowcoreDataPump.create`; it is not started here.
 */
function createPump(
  handler: (events: FlowcoreEvent[]) => Promise<void>,
  logger: ReturnType<typeof createMockLogger>["logger"],
): FlowcoreDataPump {
  const dataSource = {
    tenant: "test",
    dataCore: "test-dc",
    flowType: "test.0",
    eventTypes: ["test.created.0"],
  }
  const processor = { concurrency: 1, handler }

  return FlowcoreDataPump.create({
    auth: { apiKey: FAKE_API_KEY },
    dataSource,
    stateManager: createMockStateManager(),
    processor,
    notifier: { type: "poller", intervalMs: 60_000 },
    logger,
    // NOTE(review): 99999 is above the highest valid TCP port (65535); presumably
    // chosen so no real endpoint can be reached — confirm the client fails the
    // way these tests expect.
    baseUrlOverride: "http://localhost:99999",
    noTranslation: true,
  })
}

// #endregion

// Suite: stopping the pump must not leave process-loop restart warnings behind.
// Time is faked so the test can move past any backoff delay without waiting.
describe("processLoop restart", { sanitizeOps: false, sanitizeResources: false }, () => {
  let clock: FakeTime

  beforeEach(() => {
    clock = new FakeTime()
  })

  afterEach(() => {
    clock.restore()
  })

  it("should not emit restart warnings after stop", async () => {
    const recorder = createMockLogger()
    const alwaysFailingHandler = () => Promise.reject(new Error("Always fails"))
    const pump = createPump(alwaysFailingHandler, recorder.logger)

    // Start without awaiting, then stop right away so no restart may be scheduled.
    void pump.start(() => {})
    pump.stop()

    // Move the fake clock well beyond the longest possible restart delay.
    await clock.tickAsync(60_000)

    const isRestartWarning = (entry: { level: string; message: string }) =>
      entry.level === "warn" && entry.message.includes("Restarting process loop")
    const restartWarningCount = recorder.logs.filter(isRestartWarning).length

    assertEquals(restartWarningCount, 0)
  })
})

// Suite: pins the expected exponential backoff sequence (doubling, capped at 30s).
// NOTE(review): both tests re-state the formula locally rather than calling the
// pump or cluster code, so they would not notice the production formula drifting —
// confirm whether that is intended.
describe("backoff formula", () => {
  it("should produce correct exponential sequence capped at 30s", () => {
    // Index 0..7 corresponds to attempts 1..8 (exponent = attempt - 1).
    const delays = Array.from({ length: 8 }, (_, index) => Math.min(1_000 * 2 ** index, 30_000))

    assertEquals(delays, [1_000, 2_000, 4_000, 8_000, 16_000, 30_000, 30_000, 30_000])
  })

  it("should use RECONNECT constants for leader pump (same formula)", () => {
    const RECONNECT_BASE_MS = 1_000
    const RECONNECT_MAX_MS = 30_000

    // Index 0..5 corresponds to attempts 1..6 (exponent = attempt - 1).
    const delays = Array.from(
      { length: 6 },
      (_, index) => Math.min(RECONNECT_BASE_MS * 2 ** index, RECONNECT_MAX_MS),
    )

    assertEquals(delays, [1_000, 2_000, 4_000, 8_000, 16_000, 30_000])
  })
})
Loading