Skip to content
272 changes: 187 additions & 85 deletions src/pathways/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ export class PathwaysBuilder<
private pathwayPump: PathwayPump | null = null
private commandPoller: CommandPoller | null = null
private clusterBypassProcess = false
private currentPumpAutoProvision = false
private currentPumpUsesExplicitPulse = false
private currentPumpUsesAutoPulse = false

/**
* Creates a new PathwaysBuilder instance
Expand Down Expand Up @@ -1244,27 +1247,14 @@ export class PathwaysBuilder<

// Listen for leadership changes to auto-start/stop the pump
this.clusterManager.onLeadershipChange((isLeader: boolean) => {
if (isLeader && this.pathwayPump && !this.pathwayPump.isRunning) {
this.logger.info("Became leader, starting pump")
const registrations = Object.keys(this.pathways).map((key) => {
const [flowType, eventType] = key.split("/")
return { flowType, eventType }
})
this.pathwayPump.start(registrations).catch((err) => {
this.logger.error(
"Failed to start pump after becoming leader",
err instanceof Error ? err : new Error(String(err)),
)
})
} else if (!isLeader && this.pathwayPump?.isRunning) {
this.logger.info("Lost leadership, stopping pump")
this.pathwayPump.stop().catch((err) => {
this.logger.error(
"Failed to stop pump after losing leadership",
err instanceof Error ? err : new Error(String(err)),
)
})
}
this.handleLeadershipChange(isLeader).catch((err) => {
this.logger.error(
isLeader
? "Failed to bootstrap leader runtime after becoming leader"
: "Failed to stop leader runtime after losing leadership",
err instanceof Error ? err : new Error(String(err)),
)
})
})

// Wire reset handler: leader receives reset requests and delegates to pump
Expand Down Expand Up @@ -1302,6 +1292,149 @@ export class PathwaysBuilder<
return this.clusterManager !== null && this.clusterManager.isRunning
}

private buildAutoPulseConfig(): NonNullable<PathwayPumpOptions["pulse"]> | null {
if (this.pathwayMode !== "virtual" || !this.pathwayId) {
return null
}

return {
url: this.pulseUrl,
intervalMs: this.pulseIntervalMs,
pathwayId: this.pathwayId,
successLogLevel: this.logLevel.pulseSuccess,
failureLogLevel: this.logLevel.pulseFailure,
}
}

private async applyAutoPulseConfig(): Promise<void> {
if (!this.pathwayPump || this.currentPumpUsesExplicitPulse || this.currentPumpUsesAutoPulse) {
return
}

const pulse = this.buildAutoPulseConfig()
if (!pulse) {
return
}

await this.pathwayPump.setPulseConfig(pulse)
this.currentPumpUsesAutoPulse = true
this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl })
}

private startCommandPollerIfNeeded(): void {
if (
this.pathwayMode !== "virtual" || !this.pathwayId || !this.pathwayPump?.isRunning ||
this.commandPoller
) {
return
}

this.commandPoller = new CommandPoller({
cpBaseUrl: this.pulseUrl,
pathwayId: this.pathwayId,
apiKey: this.apiKey,
intervalMs: this.commandPollingIntervalMs,
logger: this.logger,
onCommand: async (cmd) => {
if (cmd.type === "datapumpRestart") {
const position = cmd.position
? {
timeBucket: (cmd.position as Record<string, string>).timeBucket ?? "",
eventId: (cmd.position as Record<string, string>).eventId,
}
: undefined
const flowTypes = cmd.sourceFlowTypes ?? undefined
await this.pathwayPump!.reset(position, flowTypes)
} else {
this.logger.warn("Unknown command type received", { type: cmd.type, commandId: cmd.id })
}
},
logLevel: {
pollSuccess: this.logLevel.pulseSuccess,
pollFailure: this.logLevel.pulseFailure,
},
})
this.commandPoller.start()
this.logger.info("Command poller started", {
pathwayId: this.pathwayId,
intervalMs: this.commandPollingIntervalMs,
})
}

private stopCommandPoller(): void {
if (!this.commandPoller) {
return
}

this.commandPoller.stop()
this.commandPoller = null
}

private async startCurrentPump(): Promise<void> {
if (!this.pathwayPump || this.pathwayPump.isRunning) {
return
}

const registrations = this.buildRegistrations()
await this.pathwayPump.start(registrations)

this.logger.info("Pump started", {
pathways: registrations.length,
})
}

private async stopLeaderRuntime(): Promise<void> {
this.stopCommandPoller()

if (!this.pathwayPump?.isRunning) {
return
}

await this.pathwayPump.stop()
this.logger.info("Pump stopped")
}

private async bootstrapLeaderPump(): Promise<void> {
if (!this.pathwayPump) {
return
}

await this.applyAutoPulseConfig()
await this.startCurrentPump()

if (this.runtimeEnv !== "production" || this.pathwayMode !== "virtual") {
this.startCommandPollerIfNeeded()
return
}

if (this.currentPumpAutoProvision && !this.pathwayId) {
try {
await this.registerPathwayInstance(this.buildRegistrations())
await this.applyAutoPulseConfig()
} catch (err) {
await this.stopLeaderRuntime()
throw err
}
}

this.startCommandPollerIfNeeded()
}

private async handleLeadershipChange(isLeader: boolean): Promise<void> {
if (isLeader) {
if (!this.pathwayPump) {
return
}

this.logger.info("Became leader, bootstrapping pump")
await this.bootstrapLeaderPump()
return
}

this.logger.info("Lost leadership, stopping leader runtime")
await this.stopLeaderRuntime()
}

private buildRegistrations(): ProvisionerRegistration[] {
return Object.keys(this.pathways).map((key) => {
const [flowType, eventType] = key.split("/")
Expand Down Expand Up @@ -1498,6 +1631,9 @@ export class PathwaysBuilder<
}

const autoProvision = options.autoProvision ?? this.defaultAutoProvision
this.currentPumpAutoProvision = autoProvision
this.currentPumpUsesExplicitPulse = Boolean(options.pulse)
this.currentPumpUsesAutoPulse = false

if (autoProvision) {
if (this.runtimeEnv === "production") {
Expand All @@ -1508,7 +1644,11 @@ export class PathwaysBuilder<
this.logger.info("Auto-provisioning Flowcore resources for production startup", {
pathwayMode: this.pathwayMode,
})
await this.provision()
if (this.pathwayMode === "virtual") {
await this.provisionSharedResources()
} else {
await this.provision()
}
} else if (this.runtimeEnv === "development") {
this.logger.info("Auto-provisioning shared Flowcore resources for development startup")
await this.provisionSharedResources()
Expand All @@ -1517,21 +1657,7 @@ export class PathwaysBuilder<
}
}

// Auto-configure pulse when pathwayName was provisioned and no explicit pulse config
if (this.pathwayMode === "virtual" && this.pathwayId && !options.pulse) {
options = {
...options,
pulse: {
url: this.pulseUrl,
intervalMs: this.pulseIntervalMs,
pathwayId: this.pathwayId,
successLogLevel: this.logLevel.pulseSuccess,
failureLogLevel: this.logLevel.pulseFailure,
},
}
this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl })
} else if (options.pulse) {
// Respect explicit pulse config but inject log levels from builder config if not set
if (options.pulse) {
options = {
...options,
pulse: {
Expand All @@ -1540,6 +1666,16 @@ export class PathwaysBuilder<
failureLogLevel: options.pulse.failureLogLevel ?? this.logLevel.pulseFailure,
},
}
} else {
const autoPulse = this.buildAutoPulseConfig()
if (autoPulse) {
this.currentPumpUsesAutoPulse = true
this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl })
options = {
...options,
pulse: autoPulse,
}
}
}

// If cluster active and not leader, don't start pump
Expand Down Expand Up @@ -1575,50 +1711,11 @@ export class PathwaysBuilder<
return this.pathwayPump
}

// Collect registered pathways
const registrations = Object.keys(this.pathways).map((key) => {
const [flowType, eventType] = key.split("/")
return { flowType, eventType }
})

await this.pathwayPump.start(registrations)

this.logger.info("Pump started", {
pathways: registrations.length,
})

// Auto-start command poller for virtual pathways (poll CP for restart commands)
if (this.pathwayMode === "virtual" && this.pathwayId && !this.commandPoller) {
this.commandPoller = new CommandPoller({
cpBaseUrl: this.pulseUrl,
pathwayId: this.pathwayId,
apiKey: this.apiKey,
intervalMs: this.commandPollingIntervalMs,
logger: this.logger,
onCommand: async (cmd) => {
if (cmd.type === "datapumpRestart") {
const position = cmd.position
? {
timeBucket: (cmd.position as Record<string, string>).timeBucket ?? "",
eventId: (cmd.position as Record<string, string>).eventId,
}
: undefined
const flowTypes = cmd.sourceFlowTypes ?? undefined
await this.pathwayPump!.reset(position, flowTypes)
} else {
this.logger.warn("Unknown command type received", { type: cmd.type, commandId: cmd.id })
}
},
logLevel: {
pollSuccess: this.logLevel.pulseSuccess,
pollFailure: this.logLevel.pulseFailure,
},
})
this.commandPoller.start()
this.logger.info("Command poller started", {
pathwayId: this.pathwayId,
intervalMs: this.commandPollingIntervalMs,
})
try {
await this.bootstrapLeaderPump()
} catch (err) {
await this.stopPump()
throw err
}

return this.pathwayPump
Expand All @@ -1628,13 +1725,18 @@ export class PathwaysBuilder<
* Stop the data pump
*/
async stopPump(): Promise<void> {
if (this.commandPoller) {
this.commandPoller.stop()
this.commandPoller = null
this.stopCommandPoller()
if (!this.pathwayPump) {
this.currentPumpAutoProvision = false
this.currentPumpUsesExplicitPulse = false
this.currentPumpUsesAutoPulse = false
return
}
if (!this.pathwayPump) return
await this.pathwayPump.stop()
this.pathwayPump = null
this.currentPumpAutoProvision = false
this.currentPumpUsesExplicitPulse = false
this.currentPumpUsesAutoPulse = false
this.logger.info("Pump stopped")
}

Expand Down
34 changes: 33 additions & 1 deletion src/pathways/pump/pathway-pump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class PathwayPump {
private readonly bufferSize: number
private readonly maxRedeliveryCount: number
private readonly logger: Logger
private readonly pulseConfig?: {
private pulseConfig?: {
url: string
intervalMs?: number
pathwayId?: string
Expand Down Expand Up @@ -278,6 +278,38 @@ export class PathwayPump {
return resetFlowTypes
}

async setPulseConfig(pulseConfig: NonNullable<PathwayPumpOptions["pulse"]>): Promise<void> {
this.pulseConfig = pulseConfig

if (!this.running) {
return
}

const flowTypeGroups = [...this.flowTypeEventTypes.entries()]
const existingPumps = [...this.pumps.entries()]

for (const [flowType, pump] of existingPumps) {
try {
await pump.stop()
this.logger.info("Data pump stopped for pulse reconfiguration", { flowType })
} catch (err) {
this.logger.error(
`Error stopping pump for ${flowType} during pulse reconfiguration`,
err instanceof Error ? err : new Error(String(err)),
)
throw err
}
}

this.pumps.clear()
this.stateManagers.clear()
this.restartAttempts.clear()

for (const [flowType, eventTypes] of flowTypeGroups) {
await this.startPumpForFlowType(flowType, eventTypes)
}
}

get isRunning(): boolean {
return this.running
}
Expand Down
Loading
Loading