diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index c2e6485..d2336e2 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -364,6 +364,9 @@ export class PathwaysBuilder< private pathwayPump: PathwayPump | null = null private commandPoller: CommandPoller | null = null private clusterBypassProcess = false + private currentPumpAutoProvision = false + private currentPumpUsesExplicitPulse = false + private currentPumpUsesAutoPulse = false /** * Creates a new PathwaysBuilder instance @@ -1244,27 +1247,14 @@ export class PathwaysBuilder< // Listen for leadership changes to auto-start/stop the pump this.clusterManager.onLeadershipChange((isLeader: boolean) => { - if (isLeader && this.pathwayPump && !this.pathwayPump.isRunning) { - this.logger.info("Became leader, starting pump") - const registrations = Object.keys(this.pathways).map((key) => { - const [flowType, eventType] = key.split("/") - return { flowType, eventType } - }) - this.pathwayPump.start(registrations).catch((err) => { - this.logger.error( - "Failed to start pump after becoming leader", - err instanceof Error ? err : new Error(String(err)), - ) - }) - } else if (!isLeader && this.pathwayPump?.isRunning) { - this.logger.info("Lost leadership, stopping pump") - this.pathwayPump.stop().catch((err) => { - this.logger.error( - "Failed to stop pump after losing leadership", - err instanceof Error ? err : new Error(String(err)), - ) - }) - } + this.handleLeadershipChange(isLeader).catch((err) => { + this.logger.error( + isLeader + ? "Failed to bootstrap leader runtime after becoming leader" + : "Failed to stop leader runtime after losing leadership", + err instanceof Error ? err : new Error(String(err)), + ) + }) }) // Wire reset handler: leader receives reset requests and delegates to pump @@ -1302,6 +1292,149 @@ export class PathwaysBuilder< return this.clusterManager !== null && this.clusterManager.isRunning } + private buildAutoPulseConfig(): NonNullable | null { + if (this.pathwayMode !== "virtual" || !this.pathwayId) { + return null + } + + return { + url: this.pulseUrl, + intervalMs: this.pulseIntervalMs, + pathwayId: this.pathwayId, + successLogLevel: this.logLevel.pulseSuccess, + failureLogLevel: this.logLevel.pulseFailure, + } + } + + private async applyAutoPulseConfig(): Promise { + if (!this.pathwayPump || this.currentPumpUsesExplicitPulse || this.currentPumpUsesAutoPulse) { + return + } + + const pulse = this.buildAutoPulseConfig() + if (!pulse) { + return + } + + await this.pathwayPump.setPulseConfig(pulse) + this.currentPumpUsesAutoPulse = true + this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl }) + } + + private startCommandPollerIfNeeded(): void { + if ( + this.pathwayMode !== "virtual" || !this.pathwayId || !this.pathwayPump?.isRunning || + this.commandPoller + ) { + return + } + + this.commandPoller = new CommandPoller({ + cpBaseUrl: this.pulseUrl, + pathwayId: this.pathwayId, + apiKey: this.apiKey, + intervalMs: this.commandPollingIntervalMs, + logger: this.logger, + onCommand: async (cmd) => { + if (cmd.type === "datapumpRestart") { + const position = cmd.position + ? { + timeBucket: (cmd.position as Record).timeBucket ?? "", + eventId: (cmd.position as Record).eventId, + } + : undefined + const flowTypes = cmd.sourceFlowTypes ?? undefined + await this.pathwayPump!.reset(position, flowTypes) + } else { + this.logger.warn("Unknown command type received", { type: cmd.type, commandId: cmd.id }) + } + }, + logLevel: { + pollSuccess: this.logLevel.pulseSuccess, + pollFailure: this.logLevel.pulseFailure, + }, + }) + this.commandPoller.start() + this.logger.info("Command poller started", { + pathwayId: this.pathwayId, + intervalMs: this.commandPollingIntervalMs, + }) + } + + private stopCommandPoller(): void { + if (!this.commandPoller) { + return + } + + this.commandPoller.stop() + this.commandPoller = null + } + + private async startCurrentPump(): Promise { + if (!this.pathwayPump || this.pathwayPump.isRunning) { + return + } + + const registrations = this.buildRegistrations() + await this.pathwayPump.start(registrations) + + this.logger.info("Pump started", { + pathways: registrations.length, + }) + } + + private async stopLeaderRuntime(): Promise { + this.stopCommandPoller() + + if (!this.pathwayPump?.isRunning) { + return + } + + await this.pathwayPump.stop() + this.logger.info("Pump stopped") + } + + private async bootstrapLeaderPump(): Promise { + if (!this.pathwayPump) { + return + } + + await this.applyAutoPulseConfig() + await this.startCurrentPump() + + if (this.runtimeEnv !== "production" || this.pathwayMode !== "virtual") { + this.startCommandPollerIfNeeded() + return + } + + if (this.currentPumpAutoProvision && !this.pathwayId) { + try { + await this.registerPathwayInstance(this.buildRegistrations()) + await this.applyAutoPulseConfig() + } catch (err) { + await this.stopLeaderRuntime() + throw err + } + } + + this.startCommandPollerIfNeeded() + } + + private async handleLeadershipChange(isLeader: boolean): Promise { + if (isLeader) { + if (!this.pathwayPump) { + return + } + + this.logger.info("Became leader, bootstrapping pump") + await this.bootstrapLeaderPump() + return + } + + this.logger.info("Lost leadership, stopping leader runtime") + await this.stopLeaderRuntime() + } + private buildRegistrations(): ProvisionerRegistration[] { return Object.keys(this.pathways).map((key) => { const [flowType, eventType] = key.split("/") @@ -1498,6 +1631,9 @@ export class PathwaysBuilder< } const autoProvision = options.autoProvision ?? this.defaultAutoProvision + this.currentPumpAutoProvision = autoProvision + this.currentPumpUsesExplicitPulse = Boolean(options.pulse) + this.currentPumpUsesAutoPulse = false if (autoProvision) { if (this.runtimeEnv === "production") { @@ -1508,7 +1644,11 @@ export class PathwaysBuilder< this.logger.info("Auto-provisioning Flowcore resources for production startup", { pathwayMode: this.pathwayMode, }) - await this.provision() + if (this.pathwayMode === "virtual") { + await this.provisionSharedResources() + } else { + await this.provision() + } } else if (this.runtimeEnv === "development") { this.logger.info("Auto-provisioning shared Flowcore resources for development startup") await this.provisionSharedResources() @@ -1517,21 +1657,7 @@ export class PathwaysBuilder< } } - // Auto-configure pulse when pathwayName was provisioned and no explicit pulse config - if (this.pathwayMode === "virtual" && this.pathwayId && !options.pulse) { - options = { - ...options, - pulse: { - url: this.pulseUrl, - intervalMs: this.pulseIntervalMs, - pathwayId: this.pathwayId, - successLogLevel: this.logLevel.pulseSuccess, - failureLogLevel: this.logLevel.pulseFailure, - }, - } - this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl }) - } else if (options.pulse) { - // Respect explicit pulse config but inject log levels from builder config if not set + if (options.pulse) { options = { ...options, pulse: { @@ -1540,6 +1666,16 @@ export class PathwaysBuilder< failureLogLevel: options.pulse.failureLogLevel ?? this.logLevel.pulseFailure, }, } + } else { + const autoPulse = this.buildAutoPulseConfig() + if (autoPulse) { + this.currentPumpUsesAutoPulse = true + this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl }) + options = { + ...options, + pulse: autoPulse, + } + } } // If cluster active and not leader, don't start pump @@ -1575,50 +1711,11 @@ export class PathwaysBuilder< return this.pathwayPump } - // Collect registered pathways - const registrations = Object.keys(this.pathways).map((key) => { - const [flowType, eventType] = key.split("/") - return { flowType, eventType } - }) - - await this.pathwayPump.start(registrations) - - this.logger.info("Pump started", { - pathways: registrations.length, - }) - - // Auto-start command poller for virtual pathways (poll CP for restart commands) - if (this.pathwayMode === "virtual" && this.pathwayId && !this.commandPoller) { - this.commandPoller = new CommandPoller({ - cpBaseUrl: this.pulseUrl, - pathwayId: this.pathwayId, - apiKey: this.apiKey, - intervalMs: this.commandPollingIntervalMs, - logger: this.logger, - onCommand: async (cmd) => { - if (cmd.type === "datapumpRestart") { - const position = cmd.position - ? { - timeBucket: (cmd.position as Record).timeBucket ?? "", - eventId: (cmd.position as Record).eventId, - } - : undefined - const flowTypes = cmd.sourceFlowTypes ?? undefined - await this.pathwayPump!.reset(position, flowTypes) - } else { - this.logger.warn("Unknown command type received", { type: cmd.type, commandId: cmd.id }) - } - }, - logLevel: { - pollSuccess: this.logLevel.pulseSuccess, - pollFailure: this.logLevel.pulseFailure, - }, - }) - this.commandPoller.start() - this.logger.info("Command poller started", { - pathwayId: this.pathwayId, - intervalMs: this.commandPollingIntervalMs, - }) + try { + await this.bootstrapLeaderPump() + } catch (err) { + await this.stopPump() + throw err } return this.pathwayPump @@ -1628,13 +1725,18 @@ export class PathwaysBuilder< * Stop the data pump */ async stopPump(): Promise { - if (this.commandPoller) { - this.commandPoller.stop() - this.commandPoller = null + this.stopCommandPoller() + if (!this.pathwayPump) { + this.currentPumpAutoProvision = false + this.currentPumpUsesExplicitPulse = false + this.currentPumpUsesAutoPulse = false + return } - if (!this.pathwayPump) return await this.pathwayPump.stop() this.pathwayPump = null + this.currentPumpAutoProvision = false + this.currentPumpUsesExplicitPulse = false + this.currentPumpUsesAutoPulse = false this.logger.info("Pump stopped") } diff --git a/src/pathways/pump/pathway-pump.ts b/src/pathways/pump/pathway-pump.ts index 9b20911..3510782 100644 --- a/src/pathways/pump/pathway-pump.ts +++ b/src/pathways/pump/pathway-pump.ts @@ -38,7 +38,7 @@ export class PathwayPump { private readonly bufferSize: number private readonly maxRedeliveryCount: number private readonly logger: Logger - private readonly pulseConfig?: { + private pulseConfig?: { url: string intervalMs?: number pathwayId?: string @@ -278,6 +278,38 @@ export class PathwayPump { return resetFlowTypes } + async setPulseConfig(pulseConfig: NonNullable): Promise { + this.pulseConfig = pulseConfig + + if (!this.running) { + return + } + + const flowTypeGroups = [...this.flowTypeEventTypes.entries()] + const existingPumps = [...this.pumps.entries()] + + for (const [flowType, pump] of existingPumps) { + try { + await pump.stop() + this.logger.info("Data pump stopped for pulse reconfiguration", { flowType }) + } catch (err) { + this.logger.error( + `Error stopping pump for ${flowType} during pulse reconfiguration`, + err instanceof Error ? err : new Error(String(err)), + ) + throw err + } + } + + this.pumps.clear() + this.stateManagers.clear() + this.restartAttempts.clear() + + for (const [flowType, eventTypes] of flowTypeGroups) { + await this.startPumpForFlowType(flowType, eventTypes) + } + } + get isRunning(): boolean { return this.running } diff --git a/tests/pathway-pump.test.ts b/tests/pathway-pump.test.ts index bdbf28f..bcdf21f 100644 --- a/tests/pathway-pump.test.ts +++ b/tests/pathway-pump.test.ts @@ -130,5 +130,76 @@ Deno.test({ assertEquals(groups.get("order"), ["placed", "shipped"]) assertEquals(groups.get("payment"), ["received"]) }) + + await t.step("setPulseConfig recreates running pumps with the new pulse configuration", async () => { + const factory = createInMemoryStateFactory() + const pump = new PathwayPump({ + stateManagerFactory: factory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + }) + + pump.configure({ + tenant: "test-tenant", + dataCore: "test-dc", + apiKey: "test-key", + baseUrl: "https://api.flowcore.io", + processEvent: async (_pathway: string, _event: FlowcoreEvent) => {}, + }) + + const stoppedFlowTypes: string[] = [] + const createdFlowTypes: string[] = [] + const createdPulsePathwayIds: string[] = [] + ;(pump as unknown as { + running: boolean + pumps: Map }> + flowTypeEventTypes: Map + dataPumpConstructor: { + create(options: Record): Promise<{ start(cb?: unknown): Promise }> + } + }).running = true + ;(pump as unknown as { pumps: Map }> }).pumps = new Map([ + ["user", { + stop: async () => { + stoppedFlowTypes.push("user") + }, + }], + ["order", { + stop: async () => { + stoppedFlowTypes.push("order") + }, + }], + ]) + ;(pump as unknown as { flowTypeEventTypes: Map }).flowTypeEventTypes = new Map([ + ["user", ["created", "updated"]], + ["order", ["placed"]], + ]) + ;(pump as unknown as { + dataPumpConstructor: { + create(options: Record): Promise<{ start(cb?: unknown): Promise }> + } + }).dataPumpConstructor = { + create: async (options: Record) => { + const dataSource = options.dataSource as { flowType: string } + const pulse = options.pulse as { pathwayId: string } + createdFlowTypes.push(dataSource.flowType) + createdPulsePathwayIds.push(pulse.pathwayId) + + return { + start: async () => {}, + } + }, + } + + await pump.setPulseConfig({ + url: "http://localhost:3000", + pathwayId: "pathway-123", + }) + + assertEquals(stoppedFlowTypes.sort(), ["order", "user"]) + assertEquals(createdFlowTypes.sort(), ["order", "user"]) + assertEquals(createdPulsePathwayIds, ["pathway-123", "pathway-123"]) + assertEquals(pump.isRunning, true) + assertEquals(pump.registeredFlowTypes.sort(), ["order", "user"]) + }) }, }) diff --git a/tests/pump-runtime-behavior.test.ts b/tests/pump-runtime-behavior.test.ts index 65cebbb..7f3a637 100644 --- a/tests/pump-runtime-behavior.test.ts +++ b/tests/pump-runtime-behavior.test.ts @@ -2,7 +2,7 @@ import { assertEquals, assertRejects } from "https://deno.land/std@0.224.0/asser import { stub } from "https://deno.land/std@0.224.0/testing/mock.ts" import { z } from "zod" import { CommandPoller } from "../src/pathways/command-poller.ts" -import { type PathwaysBuilderConfig, PathwaysBuilder } from "../src/pathways/builder.ts" +import { PathwaysBuilder, type PathwaysBuilderConfig } from "../src/pathways/builder.ts" import { PathwayPump } from "../src/pathways/pump/pathway-pump.ts" import { PathwayProvisioner } from "../src/pathways/provisioner.ts" @@ -142,23 +142,34 @@ Deno.test({ }) await t.step( - "production virtual mode provisions pathway resources and starts the local pump on the leader", + "production virtual mode starts the local pump before registering the virtual pathway on the leader", async () => { let provisionCalls = 0 let startCalls = 0 + let setPulseCalls = 0 let commandPollerStarts = 0 + const lifecycle: string[] = [] const fetchBodies: Array> = [] const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { provisionCalls++ + lifecycle.push("provision") }) - const startStub = stub(PathwayPump.prototype, "start", async () => { + const startStub = stub(PathwayPump.prototype, "start", async function (this: PathwayPump) { startCalls++ + ;(this as unknown as { running: boolean }).running = true + lifecycle.push("start") + }) + const setPulseStub = stub(PathwayPump.prototype, "setPulseConfig", async () => { + setPulseCalls++ + lifecycle.push("setPulse") }) const pollerStub = stub(CommandPoller.prototype, "start", () => { commandPollerStarts++ + lifecycle.push("pollerStart") }) const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + lifecycle.push("fetch") fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { status: 200, @@ -181,18 +192,180 @@ Deno.test({ assertEquals(provisionCalls, 1) assertEquals(startCalls, 1) + assertEquals(setPulseCalls, 1) assertEquals(commandPollerStarts, 1) assertEquals(fetchBodies.length, 1) assertEquals(fetchBodies[0].type, "virtual") + assertEquals(lifecycle, ["provision", "start", "fetch", "setPulse", "pollerStart"]) } finally { provisionStub.restore() startStub.restore() + setPulseStub.restore() pollerStub.restore() fetchStub.restore() } }, ) + await t.step( + "production virtual mode skips by-name registration on non-leaders and defers it until leadership gain", + async () => { + let provisionCalls = 0 + let startCalls = 0 + let stopCalls = 0 + let setPulseCalls = 0 + let commandPollerStarts = 0 + let commandPollerStops = 0 + const lifecycle: string[] = [] + const fetchBodies: Array> = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + lifecycle.push("provision") + }) + const startStub = stub(PathwayPump.prototype, "start", async function (this: PathwayPump) { + startCalls++ + ;(this as unknown as { running: boolean }).running = true + lifecycle.push("start") + }) + const stopStub = stub(PathwayPump.prototype, "stop", async function (this: PathwayPump) { + stopCalls++ + ;(this as unknown as { running: boolean }).running = false + lifecycle.push("stop") + }) + const setPulseStub = stub(PathwayPump.prototype, "setPulseConfig", async () => { + setPulseCalls++ + lifecycle.push("setPulse") + }) + const pollerStartStub = stub(CommandPoller.prototype, "start", () => { + commandPollerStarts++ + lifecycle.push("pollerStart") + }) + const pollerStopStub = stub(CommandPoller.prototype, "stop", () => { + commandPollerStops++ + lifecycle.push("pollerStop") + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + lifecycle.push("fetch") + fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayName: "virtual-service", + pathwayMode: "virtual", + }) + const clusterManager = { isRunning: true, isLeader: false } + ;(builder as unknown as { clusterManager: typeof clusterManager }).clusterManager = clusterManager + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 1) + assertEquals(startCalls, 0) + assertEquals(fetchBodies.length, 0) + assertEquals(setPulseCalls, 0) + assertEquals(commandPollerStarts, 0) + + clusterManager.isLeader = true + await (builder as unknown as { handleLeadershipChange(isLeader: boolean): Promise }) + .handleLeadershipChange(true) + + assertEquals(startCalls, 1) + assertEquals(fetchBodies.length, 1) + assertEquals(setPulseCalls, 1) + assertEquals(commandPollerStarts, 1) + + await (builder as unknown as { handleLeadershipChange(isLeader: boolean): Promise }) + .handleLeadershipChange(false) + + assertEquals(commandPollerStops, 1) + assertEquals(stopCalls, 1) + assertEquals(lifecycle, [ + "provision", + "start", + "fetch", + "setPulse", + "pollerStart", + "pollerStop", + "stop", + ]) + } finally { + provisionStub.restore() + startStub.restore() + stopStub.restore() + setPulseStub.restore() + pollerStartStub.restore() + pollerStopStub.restore() + fetchStub.restore() + } + }, + ) + + await t.step( + "production virtual mode stops the local pump when registration fails after startup", + async () => { + let startCalls = 0 + let stopCalls = 0 + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => {}) + const startStub = stub(PathwayPump.prototype, "start", async function (this: PathwayPump) { + startCalls++ + ;(this as unknown as { running: boolean }).running = true + }) + const stopStub = stub(PathwayPump.prototype, "stop", async function (this: PathwayPump) { + if (!(this as unknown as { running: boolean }).running) { + return + } + stopCalls++ + ;(this as unknown as { running: boolean }).running = false + }) + const fetchStub = stub(globalThis, "fetch", async () => { + return new Response( + JSON.stringify({ + status: 500, + code: "INTERNAL_SERVER_ERROR", + message: "Internal server error", + }), + { + status: 500, + headers: { "Content-Type": "application/json" }, + }, + ) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayName: "virtual-service", + pathwayMode: "virtual", + }) + ;(builder as unknown as { clusterManager: { isRunning: boolean; isLeader: boolean } }).clusterManager = { + isRunning: true, + isLeader: true, + } + + await assertRejects( + () => builder.startPump(createPumpOptions()), + Error, + 'Failed to register virtual pathway "virtual-service"', + ) + + assertEquals(startCalls, 1) + assertEquals(stopCalls, 1) + } finally { + provisionStub.restore() + startStub.restore() + stopStub.restore() + fetchStub.restore() + } + }, + ) + await t.step("production managed mode provisions a managed pathway and does not start a local pump", async () => { let provisionCalls = 0 let startCalls = 0