From 9cb75eab65c0af3a8c15a6cf32a74a2801b4a17e Mon Sep 17 00:00:00 2001 From: jbiskur Date: Fri, 10 Apr 2026 14:23:04 +0100 Subject: [PATCH 1/6] feat!: poll-based command queue for virtual pathway resets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace push-based HTTP callback reset with pull-based command polling. Virtual pathway instances now poll the CP every 5s for pending commands instead of exposing a public reset endpoint. New: - CommandPoller class polls GET /api/v1/pathways/:pathwayId/commands/pending - Reports status via POST /api/v1/pathways/:pathwayId/commands/:commandId/status - Phase flow: acknowledged → execute resetPump() → running (or failed) - Auto-starts when pathwayName is configured and pump starts - Configurable via commandPollingIntervalMs (default: 5000ms) Breaking changes: - advertisedUrl, resetSecret, resetPath constructor params deprecated (accepted but ignored) - pathwayName no longer requires advertisedUrl or resetSecret - processReset() on PathwayRouter deprecated (returns no-op with warning) - virtualConfig in provision() no longer sends resetUrl or authHeaders The library reuses its existing apiKey for polling authentication (tenant-mode FRN auth on the CP side). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pathways/builder.ts | 65 ++++++++---- src/pathways/command-poller.ts | 180 +++++++++++++++++++++++++++++++++ src/router/index.ts | 39 ++----- 3 files changed, 233 insertions(+), 51 deletions(-) create mode 100644 src/pathways/command-poller.ts diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index 642a656..ff5e24d 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -7,6 +7,7 @@ import type { FlowcoreEvent } from "../contracts/event.ts" import { InternalPathwayState } from "./internal-pathway.state.ts" import type { Logger } from "./logger.ts" import { NoopLogger } from "./logger.ts" +import { CommandPoller } from "./command-poller.ts" import type { EventMetadata, PathwayContract, @@ -298,16 +299,15 @@ export class PathwaysBuilder< // Virtual pathway auto-provisioning private readonly pathwayName?: string private readonly pathwayLabels: Record - private readonly advertisedUrl?: string - private readonly resetSecret?: string - private readonly resetPath: string private readonly pulseUrl: string private readonly pulseIntervalMs: number + private readonly commandPollingIntervalMs: number private pathwayId?: string - // Cluster + pump + // Cluster + pump + command poller private clusterManager: ClusterManager | null = null private pathwayPump: PathwayPump | null = null + private commandPoller: CommandPoller | null = null private clusterBypassProcess = false /** @@ -339,11 +339,12 @@ export class PathwaysBuilder< dataCoreDeleteProtection, pathwayName, pathwayLabels, - advertisedUrl, - resetSecret, - resetPath, + advertisedUrl: _advertisedUrl, + resetSecret: _resetSecret, + resetPath: _resetPath, pulseUrl, pulseIntervalMs, + commandPollingIntervalMs, }: { baseUrl: string tenant: string @@ -359,11 +360,15 @@ export class PathwaysBuilder< dataCoreDeleteProtection?: boolean pathwayName?: string pathwayLabels?: Record + /** @deprecated No longer used — virtual pathway commands are now poll-based */ advertisedUrl?: string + /** @deprecated No longer used — virtual pathway commands are now poll-based */ resetSecret?: string + /** @deprecated No longer used — virtual pathway commands are now poll-based */ resetPath?: string pulseUrl?: string pulseIntervalMs?: number + commandPollingIntervalMs?: number }) { // Initialize logger (use NoopLogger if none provided) this.logger = logger ?? new NoopLogger() @@ -389,21 +394,11 @@ export class PathwaysBuilder< this.dataCoreDeleteProtection = dataCoreDeleteProtection ?? false // Store virtual pathway auto-provisioning config - if (pathwayName) { - if (!advertisedUrl) { - throw new Error("advertisedUrl is required when pathwayName is set") - } - if (!resetSecret) { - throw new Error("resetSecret is required when pathwayName is set") - } - } this.pathwayName = pathwayName this.pathwayLabels = pathwayLabels ?? {} - this.advertisedUrl = advertisedUrl - this.resetSecret = resetSecret - this.resetPath = resetPath ?? "/reset" this.pulseUrl = pulseUrl ?? "https://data-pathways.api.flowcore.io" this.pulseIntervalMs = pulseIntervalMs ?? 30_000 + this.commandPollingIntervalMs = commandPollingIntervalMs ?? 5_000 if (enableSessionUserResolvers) { this.sessionUserResolvers = overrideSessionUserResolvers ?? new SessionUser() @@ -1316,8 +1311,6 @@ export class PathwaysBuilder< dataCore: this.dataCore, labels: this.pathwayLabels, virtualConfig: { - resetUrl: `${this.advertisedUrl}${this.resetPath}`, - authHeaders: { "x-pump-reset-secret": this.resetSecret }, flowTypes, }, }), @@ -1438,6 +1431,34 @@ export class PathwaysBuilder< pathways: registrations.length, }) + // Auto-start command poller for virtual pathways (poll CP for restart commands) + if (this.pathwayId && !this.commandPoller) { + this.commandPoller = new CommandPoller({ + cpBaseUrl: this.pulseUrl, + pathwayId: this.pathwayId, + apiKey: this.apiKey, + intervalMs: this.commandPollingIntervalMs, + logger: this.logger, + onCommand: async (cmd) => { + if (cmd.type === "datapumpRestart") { + const position = cmd.position + ? { timeBucket: (cmd.position as Record).timeBucket ?? "", eventId: (cmd.position as Record).eventId } + : undefined + const flowTypes = cmd.sourceFlowTypes ?? undefined + await this.pathwayPump!.reset(position, flowTypes) + } else { + this.logger.warn("Unknown command type received", { type: cmd.type, commandId: cmd.id }) + } + }, + logLevel: { + pollSuccess: this.logLevel.pulseSuccess, + pollFailure: this.logLevel.pulseFailure, + }, + }) + this.commandPoller.start() + this.logger.info("Command poller started", { pathwayId: this.pathwayId, intervalMs: this.commandPollingIntervalMs }) + } + return this.pathwayPump } @@ -1445,6 +1466,10 @@ export class PathwaysBuilder< * Stop the data pump */ async stopPump(): Promise { + if (this.commandPoller) { + this.commandPoller.stop() + this.commandPoller = null + } if (!this.pathwayPump) return await this.pathwayPump.stop() this.pathwayPump = null diff --git a/src/pathways/command-poller.ts b/src/pathways/command-poller.ts new file mode 100644 index 0000000..d718fce --- /dev/null +++ b/src/pathways/command-poller.ts @@ -0,0 +1,180 @@ +import type { Logger } from "./logger.ts" +import type { LogLevel } from "./builder.ts" + +export interface PendingCommand { + id: string + type: string + position: Record | null + sourceFlowTypes: string[] | null + reason: string | null + stopAt: string | null +} + +interface PendingCommandsResponse { + commands: PendingCommand[] +} + +export interface CommandPollerOptions { + cpBaseUrl: string + pathwayId: string + apiKey: string + intervalMs: number + logger: Logger + onCommand: (command: PendingCommand) => Promise + logLevel: { + pollSuccess: LogLevel + pollFailure: LogLevel + } +} + +function formatAuthHeader(apiKey: string): string { + return `ApiKey ${apiKey.startsWith("fc_") ? `${apiKey.split("_")[1]}:${apiKey}` : apiKey}` +} + +export class CommandPoller { + private readonly cpBaseUrl: string + private readonly pathwayId: string + private readonly apiKey: string + private readonly intervalMs: number + private readonly logger: Logger + private readonly onCommand: (command: PendingCommand) => Promise + private readonly logLevel: { pollSuccess: LogLevel; pollFailure: LogLevel } + private timer: ReturnType | null = null + private polling = false + + constructor(options: CommandPollerOptions) { + this.cpBaseUrl = options.cpBaseUrl + this.pathwayId = options.pathwayId + this.apiKey = options.apiKey + this.intervalMs = options.intervalMs + this.logger = options.logger + this.onCommand = options.onCommand + this.logLevel = options.logLevel + } + + start(): void { + if (this.timer) return + + this.logger.debug("Command poller starting", { + pathwayId: this.pathwayId, + intervalMs: this.intervalMs, + }) + + // Poll immediately on start, then on interval + this.poll() + this.timer = setInterval(() => this.poll(), this.intervalMs) + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer) + this.timer = null + } + this.logger.debug("Command poller stopped", { pathwayId: this.pathwayId }) + } + + private async poll(): Promise { + // Guard against overlapping polls + if (this.polling) return + this.polling = true + + try { + const url = `${this.cpBaseUrl}/api/v1/pathways/${this.pathwayId}/commands/pending` + const response = await fetch(url, { + method: "GET", + headers: { + Authorization: formatAuthHeader(this.apiKey), + }, + }) + + if (!response.ok) { + this.logger[this.logLevel.pollFailure]("Command poll failed", { + pathwayId: this.pathwayId, + status: response.status, + }) + return + } + + const body = (await response.json()) as PendingCommandsResponse + + if (body.commands.length === 0) { + this.logger[this.logLevel.pollSuccess]("Command poll: no pending commands", { + pathwayId: this.pathwayId, + }) + return + } + + this.logger.info("Command poll: received commands", { + pathwayId: this.pathwayId, + count: body.commands.length, + }) + + for (const command of body.commands) { + await this.handleCommand(command) + } + } catch (err) { + this.logger[this.logLevel.pollFailure]("Command poll error", { + pathwayId: this.pathwayId, + error: err instanceof Error ? err.message : String(err), + }) + } finally { + this.polling = false + } + } + + private async handleCommand(command: PendingCommand): Promise { + const statusUrl = `${this.cpBaseUrl}/api/v1/pathways/${this.pathwayId}/commands/${command.id}/status` + const headers = { + "Content-Type": "application/json", + Authorization: formatAuthHeader(this.apiKey), + } + + // Acknowledge receipt + try { + await fetch(statusUrl, { + method: "POST", + headers, + body: JSON.stringify({ phase: "acknowledged" }), + }) + } catch { + // Best-effort ack — continue with execution + } + + // Execute command + try { + await this.onCommand(command) + + // Report success + await fetch(statusUrl, { + method: "POST", + headers, + body: JSON.stringify({ phase: "running" }), + }) + + this.logger.info("Command executed successfully", { + commandId: command.id, + type: command.type, + pathwayId: this.pathwayId, + }) + } catch (err) { + const details = err instanceof Error ? err.message : String(err) + + // Report failure + try { + await fetch(statusUrl, { + method: "POST", + headers, + body: JSON.stringify({ phase: "failed", details }), + }) + } catch { + // Best-effort status report + } + + this.logger.error("Command execution failed", err instanceof Error ? err : new Error(details), { + commandId: command.id, + type: command.type, + pathwayId: this.pathwayId, + }) + } + } +} diff --git a/src/router/index.ts b/src/router/index.ts index 5c424d0..b8b2a33 100644 --- a/src/router/index.ts +++ b/src/router/index.ts @@ -187,44 +187,21 @@ export class PathwayRouter { } /** - * Process an incoming virtual reset callback from the Data Pathways CP. - * Validates the secret, resolves which pumps to reset, and calls resetPump(). - * - * @param body The reset callback body from the CP - * @param providedSecret The secret from the x-pump-reset-secret header - * @returns Result with success status and list of flow types that were reset + * @deprecated Use the poll-based command queue instead. Virtual pathway commands + * are now polled from the CP via GET /api/v1/pathways/:id/commands/pending. + * The PathwaysBuilder auto-starts a CommandPoller when pathwayName is configured. */ async processReset( - body: ResetCallbackBody, - providedSecret: string | null, + _body: ResetCallbackBody, + _providedSecret: string | null, ): Promise<{ success: boolean; flowTypesReset: string[] }> { - // Validate secret key (same pattern as processEvent) - if (!providedSecret || providedSecret !== this.secretKey) { - const errorMsg = "Invalid secret key" - this.logger.error(errorMsg, new Error(errorMsg)) - throw new Error(errorMsg) - } - - this.logger.debug("Processing reset request", { - pathwayId: body.pathwayId, - mode: body.mode, - flowTypes: body.flowTypes, - }) - - const position = body.position - ? { timeBucket: body.position.timeBucket ?? "", eventId: body.position.eventId } - : undefined - - const flowTypesReset = await this.pathways.resetPump(position, body.flowTypes) - - this.logger.info("Reset completed", { flowTypesReset }) - - return { success: true, flowTypesReset } + this.logger.warn("processReset is deprecated — virtual pathway commands are now poll-based. This method is a no-op.") + return { success: false, flowTypesReset: [] } } } /** - * Body shape for virtual reset callbacks from the Data Pathways CP. + * @deprecated No longer used — virtual pathway commands are now poll-based. */ export interface ResetCallbackBody { pathwayId: string From 8b60ba6e6bdef9ff8ede93f44f1c6cf29f0ec63c Mon Sep 17 00:00:00 2001 From: jbiskur Date: Fri, 10 Apr 2026 15:41:38 +0100 Subject: [PATCH 2/6] fix: remove async from deprecated processReset (deno require-await lint) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/router/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/router/index.ts b/src/router/index.ts index b8b2a33..6442d0c 100644 --- a/src/router/index.ts +++ b/src/router/index.ts @@ -191,10 +191,10 @@ export class PathwayRouter { * are now polled from the CP via GET /api/v1/pathways/:id/commands/pending. * The PathwaysBuilder auto-starts a CommandPoller when pathwayName is configured. */ - async processReset( + processReset( _body: ResetCallbackBody, _providedSecret: string | null, - ): Promise<{ success: boolean; flowTypesReset: string[] }> { + ): { success: boolean; flowTypesReset: string[] } { this.logger.warn("processReset is deprecated — virtual pathway commands are now poll-based. This method is a no-op.") return { success: false, flowTypesReset: [] } } From e30b1f7a49fbea0784860fae92a7344c634ff38a Mon Sep 17 00:00:00 2001 From: jbiskur Date: Fri, 10 Apr 2026 15:44:18 +0100 Subject: [PATCH 3/6] =?UTF-8?q?fix:=20update=20config=20tests=20=E2=80=94?= =?UTF-8?q?=20pathwayName=20no=20longer=20requires=20advertisedUrl/resetSe?= =?UTF-8?q?cret?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove assertThrows tests for the old validation that required advertisedUrl + resetSecret when pathwayName is set. These fields are now deprecated and ignored. Add test for the new commandPollingIntervalMs option and for pathwayName without any deprecated fields. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/pathway-builder-config.test.ts | 45 ++++++++++++---------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/tests/pathway-builder-config.test.ts b/tests/pathway-builder-config.test.ts index 21b2c12..8589ce3 100644 --- a/tests/pathway-builder-config.test.ts +++ b/tests/pathway-builder-config.test.ts @@ -1,4 +1,4 @@ -import { assertEquals, assertThrows } from "https://deno.land/std@0.224.0/assert/mod.ts" +import { assertEquals } from "https://deno.land/std@0.224.0/assert/mod.ts" import { PathwaysBuilder } from "../src/pathways/builder.ts" const baseOpts = { @@ -14,43 +14,36 @@ Deno.test({ sanitizeOps: false, fn: async (t) => { await t.step("should accept all new fields without error", () => { + const builder = new PathwaysBuilder({ + ...baseOpts, + pathwayName: "my-service", + pulseUrl: "https://custom-cp.example.com", + pulseIntervalMs: 15000, + commandPollingIntervalMs: 3000, + }) + + assertEquals(typeof builder, "object") + }) + + await t.step("should accept deprecated fields without error (backward compat)", () => { const builder = new PathwaysBuilder({ ...baseOpts, pathwayName: "my-service", advertisedUrl: "https://my-service.example.com", resetSecret: "my-reset-secret", resetPath: "/admin/reset", - pulseUrl: "https://custom-cp.example.com/api/v1/pump-pulse", - pulseIntervalMs: 15000, }) assertEquals(typeof builder, "object") }) - await t.step("should throw when pathwayName set but advertisedUrl missing", () => { - assertThrows( - () => - new PathwaysBuilder({ - ...baseOpts, - pathwayName: "my-service", - resetSecret: "my-reset-secret", - }), - Error, - "advertisedUrl is required when pathwayName is set", - ) - }) + await t.step("should accept pathwayName without advertisedUrl or resetSecret", () => { + const builder = new PathwaysBuilder({ + ...baseOpts, + pathwayName: "my-service", + }) - await t.step("should throw when pathwayName set but resetSecret missing", () => { - assertThrows( - () => - new PathwaysBuilder({ - ...baseOpts, - pathwayName: "my-service", - advertisedUrl: "https://my-service.example.com", - }), - Error, - "resetSecret is required when pathwayName is set", - ) + assertEquals(typeof builder, "object") }) await t.step("should work without pathwayName (backward compat)", () => { From 302ce2c7795dc7ca1487dc5a045ca9327ce31156 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Mon, 13 Apr 2026 15:08:08 +0100 Subject: [PATCH 4/6] feat!: make auto-provision runtime-aware --- README.md | 47 ++++ deno.lock | 3 +- src/pathways/builder.ts | 354 +++++++++++++++++++-------- src/pathways/pump/types.ts | 2 +- tests/pathway-builder-config.test.ts | 10 + tests/pump-runtime-behavior.test.ts | 241 ++++++++++++++++++ 6 files changed, 559 insertions(+), 98 deletions(-) create mode 100644 tests/pump-runtime-behavior.test.ts diff --git a/README.md b/README.md index 15b1f41..410abd9 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Pathways helps you build event-driven applications with type-safe pathways for p - [HTTP Server Integration](#http-server-integration) - [Persistence Options](#persistence-options) - [Advanced Usage](#advanced-usage) + - [Runtime Defaults and Auto-Provisioning](#runtime-defaults-and-auto-provisioning) - [Auditing](#auditing) - [Custom Loggers](#custom-loggers) - [Retry Mechanisms](#retry-mechanisms) @@ -128,6 +129,52 @@ const pathways = new PathwaysBuilder({ }) ``` +### Runtime Defaults and Auto-Provisioning + +`PathwaysBuilder` can now drive different startup behavior for development and production: + +- `development`: starts the local in-process pump and only provisions shared Flowcore resources such as the data core, + flow types, and event types +- `production + virtual`: requires cluster mode and auto-provisions a virtual pathway by name +- `production + managed`: auto-provisions a managed pathway by name and does not start a local pump + +```typescript +const pathways = new PathwaysBuilder({ + baseUrl: "https://api.flowcore.io", + tenant: "your-tenant", + dataCore: "your-data-core", + apiKey: process.env.FLOWCORE_API_KEY!, + runtimeEnv: process.env.NODE_ENV === "production" ? "production" : "development", + pathwayName: "orders-service", + pathwayMode: "virtual", // default + defaultAutoProvision: true, // default +}) +``` + +For managed production delivery, provide a transform endpoint and leave event fetching to the control plane: + +```typescript +const pathways = new PathwaysBuilder({ + baseUrl: "https://api.flowcore.io", + tenant: "your-tenant", + dataCore: "your-data-core", + apiKey: process.env.FLOWCORE_API_KEY!, + runtimeEnv: "production", + pathwayName: "orders-service", + pathwayMode: "managed", + managedConfig: { + endpointUrl: "https://app.example.com/api/flowcore", + authHeaders: { + authorization: `Bearer ${process.env.TRANSFORM_TOKEN!}`, + }, + sizeClass: "medium", + }, +}) +``` + +To disable all remote provisioning and keep startup fully manual, set `defaultAutoProvision: false` or pass +`autoProvision: false` to `startPump()`. + ### Registering Pathways Register pathways with their schemas for type-safe event handling: diff --git a/deno.lock b/deno.lock index 08624db..f835212 100644 --- a/deno.lock +++ b/deno.lock @@ -399,7 +399,8 @@ "https://deno.land/std@0.224.0/internal/format.ts": "0a98ee226fd3d43450245b1844b47003419d34d210fa989900861c79820d21c2", "https://deno.land/std@0.224.0/internal/mod.ts": "534125398c8e7426183e12dc255bb635d94e06d0f93c60a297723abe69d3b22e", "https://deno.land/std@0.224.0/testing/_test_suite.ts": "f10a8a6338b60c403f07a76f3f46bdc9f1e1a820c0a1decddeb2949f7a8a0546", - "https://deno.land/std@0.224.0/testing/bdd.ts": "3e4de4ff6d8f348b5574661cef9501b442046a59079e201b849d0e74120d476b" + "https://deno.land/std@0.224.0/testing/bdd.ts": "3e4de4ff6d8f348b5574661cef9501b442046a59079e201b849d0e74120d476b", + "https://deno.land/std@0.224.0/testing/mock.ts": "a963181c2860b6ba3eb60e08b62c164d33cf5da7cd445893499b2efda20074db" }, "workspace": { "dependencies": [ diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index ff5e24d..c2e6485 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -23,7 +23,7 @@ import type { PathwayClusterOptions } from "./cluster/types.ts" import { ClusterManager } from "./cluster/cluster-manager.ts" import type { PathwayPumpOptions, PumpState } from "./pump/types.ts" import { PathwayPump } from "./pump/pathway-pump.ts" -import { PathwayProvisioner } from "./provisioner.ts" +import { PathwayProvisioner, type ProvisionerRegistration } from "./provisioner.ts" import { AUDIT_ENTITY_ID, AUDIT_ENTITY_TYPE, @@ -58,6 +58,17 @@ const DEFAULT_RETRY_DELAY_MS = 500 */ const DEFAULT_SESSION_USER_RESOLVER_TTL_MS = 10 * 1000 +function normalizeRuntimeEnv(runtimeEnv?: string): PathwayRuntimeEnv { + switch (runtimeEnv) { + case "development": + case "production": + case "test": + return runtimeEnv + default: + return "development" + } +} + /** * Defines the mode for auditing pathway operations * - "user": Normal user-initiated operations @@ -129,6 +140,46 @@ export type LogLevelConfig = { */ type InternalLogLevelConfig = Required +export type PathwayRuntimeEnv = "development" | "production" | "test" + +export type PathwayMode = "virtual" | "managed" + +export interface ManagedPathwayConfig { + endpointUrl: string + authHeaders?: Record + sizeClass?: "small" | "medium" | "high" +} + +export interface PathwaysBuilderConfig { + baseUrl: string + tenant: string + dataCore: string + apiKey: string + pathwayTimeoutMs?: number + logger?: Logger + enableSessionUserResolvers?: boolean + overrideSessionUserResolvers?: SessionUserResolver + logLevel?: LogLevelConfig + dataCoreDescription?: string + dataCoreAccessControl?: string + dataCoreDeleteProtection?: boolean + pathwayName?: string + pathwayLabels?: Record + /** @deprecated No longer used — virtual pathway commands are now poll-based */ + advertisedUrl?: string + /** @deprecated No longer used — virtual pathway commands are now poll-based */ + resetSecret?: string + /** @deprecated No longer used — virtual pathway commands are now poll-based */ + resetPath?: string + pulseUrl?: string + pulseIntervalMs?: number + commandPollingIntervalMs?: number + runtimeEnv?: PathwayRuntimeEnv + pathwayMode?: PathwayMode + defaultAutoProvision?: boolean + managedConfig?: ManagedPathwayConfig +} + /** * SessionUserResolver is a key-value store for storing and retrieving UserIdResolver functions * with a TTL (time to live). @@ -302,6 +353,10 @@ export class PathwaysBuilder< private readonly pulseUrl: string private readonly pulseIntervalMs: number private readonly commandPollingIntervalMs: number + private readonly runtimeEnv: PathwayRuntimeEnv + private readonly pathwayMode: PathwayMode + private readonly defaultAutoProvision: boolean + private readonly managedConfig?: ManagedPathwayConfig private pathwayId?: string // Cluster + pump + command poller @@ -345,31 +400,11 @@ export class PathwaysBuilder< pulseUrl, pulseIntervalMs, commandPollingIntervalMs, - }: { - baseUrl: string - tenant: string - dataCore: string - apiKey: string - pathwayTimeoutMs?: number - logger?: Logger - enableSessionUserResolvers?: boolean - overrideSessionUserResolvers?: SessionUserResolver - logLevel?: LogLevelConfig - dataCoreDescription?: string - dataCoreAccessControl?: string - dataCoreDeleteProtection?: boolean - pathwayName?: string - pathwayLabels?: Record - /** @deprecated No longer used — virtual pathway commands are now poll-based */ - advertisedUrl?: string - /** @deprecated No longer used — virtual pathway commands are now poll-based */ - resetSecret?: string - /** @deprecated No longer used — virtual pathway commands are now poll-based */ - resetPath?: string - pulseUrl?: string - pulseIntervalMs?: number - commandPollingIntervalMs?: number - }) { + runtimeEnv, + pathwayMode, + defaultAutoProvision, + managedConfig, + }: PathwaysBuilderConfig) { // Initialize logger (use NoopLogger if none provided) this.logger = logger ?? new NoopLogger() @@ -399,6 +434,10 @@ export class PathwaysBuilder< this.pulseUrl = pulseUrl ?? "https://data-pathways.api.flowcore.io" this.pulseIntervalMs = pulseIntervalMs ?? 30_000 this.commandPollingIntervalMs = commandPollingIntervalMs ?? 5_000 + this.runtimeEnv = normalizeRuntimeEnv(runtimeEnv ?? process.env.NODE_ENV) + this.pathwayMode = pathwayMode ?? "virtual" + this.defaultAutoProvision = defaultAutoProvision ?? true + this.managedConfig = managedConfig if (enableSessionUserResolvers) { this.sessionUserResolvers = overrideSessionUserResolvers ?? new SessionUser() @@ -409,6 +448,9 @@ export class PathwaysBuilder< tenant, dataCore, pathwayTimeoutMs, + runtimeEnv: this.runtimeEnv, + pathwayMode: this.pathwayMode, + defaultAutoProvision: this.defaultAutoProvision, }) this.webhookBuilderFactory = new WebhookBuilder({ @@ -1260,14 +1302,8 @@ export class PathwaysBuilder< return this.clusterManager !== null && this.clusterManager.isRunning } - /** - * Provision Flowcore infrastructure (data core, flow types, event types). - * Creates missing resources when descriptions are provided, updates descriptions - * when they differ. Fails if a resource is missing and no description is provided. - * Additive only — never deletes. - */ - async provision(): Promise { - const registrations = Object.keys(this.pathways).map((key) => { + private buildRegistrations(): ProvisionerRegistration[] { + return Object.keys(this.pathways).map((key) => { const [flowType, eventType] = key.split("/") return { flowType, @@ -1276,6 +1312,10 @@ export class PathwaysBuilder< eventTypeDescription: this.eventTypeDescriptions.get(key), } }) + } + + private async provisionSharedResources(): Promise { + const registrations = this.buildRegistrations() const provisioner = new PathwayProvisioner({ tenant: this.tenant, @@ -1289,66 +1329,163 @@ export class PathwaysBuilder< }) await provisioner.provision() + return registrations + } - // Step 4: Register virtual pathway with CP (when pathwayName is set) - if (this.pathwayName) { - const flowTypes = [...new Set(registrations.map((r) => r.flowType))] - const cpBaseUrl = this.pulseUrl - const url = `${cpBaseUrl}/api/v1/pathways/by-name/${encodeURIComponent(this.pathwayName)}` + private getPathwayProvisionAuthHeader(): string { + const apiKey = this.apiKey.startsWith("fc_") ? `${this.apiKey.split("_")[1]}:${this.apiKey}` : this.apiKey - let response: Response - try { - response = await fetch(url, { - method: "PUT", - headers: { - "Content-Type": "application/json", - Authorization: `ApiKey ${ - this.apiKey.startsWith("fc_") ? `${this.apiKey.split("_")[1]}:${this.apiKey}` : this.apiKey - }`, - }, - body: JSON.stringify({ - tenant: this.tenant, - dataCore: this.dataCore, - labels: this.pathwayLabels, - virtualConfig: { - flowTypes, - }, - }), - }) - } catch (err) { - const msg = err instanceof Error ? err.message : String(err) - this.logger[this.logLevel.provisionFailure]("Virtual pathway registration failed", { - pathwayName: this.pathwayName, - url, - error: msg, - phase: "network", - }) - throw new Error(`Failed to register virtual pathway "${this.pathwayName}": ${msg}`) - } + return `ApiKey ${apiKey}` + } - if (!response.ok) { - const text = await response.text().catch(() => "") - this.logger[this.logLevel.provisionFailure]("Virtual pathway registration failed", { - pathwayName: this.pathwayName, - url, - status: response.status, - body: text, - phase: "response", - }) - throw new Error( - `Failed to register virtual pathway "${this.pathwayName}": ${response.status} ${text}`, - ) - } + private ensurePathwayName(reason: string): string { + if (!this.pathwayName) { + throw new Error(reason) + } + return this.pathwayName + } - const result = await response.json() as { pathwayId: string; status: string } - this.pathwayId = result.pathwayId - this.logger[this.logLevel.provisionSuccess]("Virtual pathway registered", { - pathwayName: this.pathwayName, - pathwayId: this.pathwayId, - status: result.status, - flowTypes, + private buildManagedPathwayConfig(registrations: ProvisionerRegistration[]): { + sizeClass: "small" | "medium" | "high" + config: { + sources: Array<{ + flowType: string + eventTypes: string[] + endpoints: Array<{ + url: string + authHeaders: Record + }> + }> + } + } { + if (!this.managedConfig?.endpointUrl) { + throw new Error( + "managedConfig.endpointUrl is required when provisioning a managed pathway", + ) + } + + const grouped = new Map>() + for (const registration of registrations) { + const eventTypes = grouped.get(registration.flowType) ?? new Set() + eventTypes.add(registration.eventType) + grouped.set(registration.flowType, eventTypes) + } + + return { + sizeClass: this.managedConfig.sizeClass ?? "small", + config: { + sources: [...grouped.entries()].map(([flowType, eventTypes]) => ({ + flowType, + eventTypes: [...eventTypes], + endpoints: [{ + url: this.managedConfig!.endpointUrl, + authHeaders: this.managedConfig!.authHeaders ?? {}, + }], + })), + }, + } + } + + private async upsertPathwayByName( + type: PathwayMode, + body: Record, + logMeta: Record, + ): Promise { + const pathwayName = this.ensurePathwayName( + `pathwayName is required when provisioning a ${type} pathway`, + ) + const url = `${this.pulseUrl}/api/v1/pathways/by-name/${encodeURIComponent(pathwayName)}` + + let response: Response + try { + response = await fetch(url, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Authorization: this.getPathwayProvisionAuthHeader(), + }, + body: JSON.stringify(body), }) + } catch (err) { + const msg = err instanceof Error ? err.message : String(err) + this.logger[this.logLevel.provisionFailure](`${type} pathway registration failed`, { + pathwayName, + url, + error: msg, + phase: "network", + }) + throw new Error(`Failed to register ${type} pathway "${pathwayName}": ${msg}`) + } + + if (!response.ok) { + const text = await response.text().catch(() => "") + this.logger[this.logLevel.provisionFailure](`${type} pathway registration failed`, { + pathwayName, + url, + status: response.status, + body: text, + phase: "response", + }) + throw new Error( + `Failed to register ${type} pathway "${pathwayName}": ${response.status} ${text}`, + ) + } + + const result = await response.json() as { pathwayId: string; status: string } + this.pathwayId = result.pathwayId + this.logger[this.logLevel.provisionSuccess](`${type} pathway registered`, { + pathwayName, + pathwayId: this.pathwayId, + status: result.status, + ...logMeta, + }) + } + + private async registerPathwayInstance(registrations: ProvisionerRegistration[]): Promise { + if (this.pathwayMode === "managed") { + const { sizeClass, config } = this.buildManagedPathwayConfig(registrations) + await this.upsertPathwayByName("managed", { + tenant: this.tenant, + dataCore: this.dataCore, + labels: this.pathwayLabels, + sizeClass, + enabled: true, + type: "managed", + config, + }, { + sizeClass, + sourceCount: config.sources.length, + }) + return + } + + if (!this.pathwayName) { + return } + + const flowTypes = [...new Set(registrations.map((registration) => registration.flowType))] + await this.upsertPathwayByName("virtual", { + tenant: this.tenant, + dataCore: this.dataCore, + labels: this.pathwayLabels, + type: "virtual", + virtualConfig: { + flowTypes, + }, + }, { + flowTypes, + }) + } + + /** + * Provision Flowcore infrastructure (data core, flow types, event types). + * Creates missing resources when descriptions are provided, updates descriptions + * when they differ. Fails if a resource is missing and no description is provided. + * Additive only — never deletes. + */ + async provision(): Promise { + const registrations = await this.provisionSharedResources() + await this.registerPathwayInstance(registrations) } /** @@ -1360,14 +1497,28 @@ export class PathwaysBuilder< throw new Error("Pump already started") } - // Auto-provision infrastructure if requested - if (options.autoProvision) { - this.logger.info("Auto-provisioning Flowcore infrastructure before starting pump") - await this.provision() + const autoProvision = options.autoProvision ?? this.defaultAutoProvision + + if (autoProvision) { + if (this.runtimeEnv === "production") { + if (this.pathwayMode === "virtual" && !this.isClusterActive) { + throw new Error("Cluster mode must be started before production virtual pump startup") + } + + this.logger.info("Auto-provisioning Flowcore resources for production startup", { + pathwayMode: this.pathwayMode, + }) + await this.provision() + } else if (this.runtimeEnv === "development") { + this.logger.info("Auto-provisioning shared Flowcore resources for development startup") + await this.provisionSharedResources() + } else { + this.logger.info("Skipping remote auto-provisioning in test runtime") + } } // Auto-configure pulse when pathwayName was provisioned and no explicit pulse config - if (this.pathwayId && !options.pulse) { + if (this.pathwayMode === "virtual" && this.pathwayId && !options.pulse) { options = { ...options, pulse: { @@ -1419,6 +1570,11 @@ export class PathwaysBuilder< }, }) + if (this.runtimeEnv === "production" && this.pathwayMode === "managed") { + this.logger.info("Not starting local pump — production managed pathways rely on control-plane delivery") + return this.pathwayPump + } + // Collect registered pathways const registrations = Object.keys(this.pathways).map((key) => { const [flowType, eventType] = key.split("/") @@ -1432,7 +1588,7 @@ export class PathwaysBuilder< }) // Auto-start command poller for virtual pathways (poll CP for restart commands) - if (this.pathwayId && !this.commandPoller) { + if (this.pathwayMode === "virtual" && this.pathwayId && !this.commandPoller) { this.commandPoller = new CommandPoller({ cpBaseUrl: this.pulseUrl, pathwayId: this.pathwayId, @@ -1442,7 +1598,10 @@ export class PathwaysBuilder< onCommand: async (cmd) => { if (cmd.type === "datapumpRestart") { const position = cmd.position - ? { timeBucket: (cmd.position as Record).timeBucket ?? "", eventId: (cmd.position as Record).eventId } + ? { + timeBucket: (cmd.position as Record).timeBucket ?? "", + eventId: (cmd.position as Record).eventId, + } : undefined const flowTypes = cmd.sourceFlowTypes ?? undefined await this.pathwayPump!.reset(position, flowTypes) @@ -1456,7 +1615,10 @@ export class PathwaysBuilder< }, }) this.commandPoller.start() - this.logger.info("Command poller started", { pathwayId: this.pathwayId, intervalMs: this.commandPollingIntervalMs }) + this.logger.info("Command poller started", { + pathwayId: this.pathwayId, + intervalMs: this.commandPollingIntervalMs, + }) } return this.pathwayPump diff --git a/src/pathways/pump/types.ts b/src/pathways/pump/types.ts index 928a0d7..8b1f1f2 100644 --- a/src/pathways/pump/types.ts +++ b/src/pathways/pump/types.ts @@ -8,7 +8,7 @@ export interface PathwayPumpOptions { notifier?: PumpNotifierConfig bufferSize?: number maxRedeliveryCount?: number - /** If true, calls provision() before starting the pump */ + /** If true, applies the builder's environment-aware provisioning rules before startup */ autoProvision?: boolean /** Optional pulse reporting to control plane */ pulse?: { diff --git a/tests/pathway-builder-config.test.ts b/tests/pathway-builder-config.test.ts index 8589ce3..7a1363f 100644 --- a/tests/pathway-builder-config.test.ts +++ b/tests/pathway-builder-config.test.ts @@ -20,6 +20,16 @@ Deno.test({ pulseUrl: "https://custom-cp.example.com", pulseIntervalMs: 15000, commandPollingIntervalMs: 3000, + runtimeEnv: "production", + pathwayMode: "managed", + defaultAutoProvision: false, + managedConfig: { + endpointUrl: "https://app.example.com/flowcore", + authHeaders: { + "x-service-token": "secret", + }, + sizeClass: "medium", + }, }) assertEquals(typeof builder, "object") diff --git a/tests/pump-runtime-behavior.test.ts b/tests/pump-runtime-behavior.test.ts new file mode 100644 index 0000000..65cebbb --- /dev/null +++ b/tests/pump-runtime-behavior.test.ts @@ -0,0 +1,241 @@ +import { assertEquals, assertRejects } from "https://deno.land/std@0.224.0/assert/mod.ts" +import { stub } from "https://deno.land/std@0.224.0/testing/mock.ts" +import { z } from "zod" +import { CommandPoller } from "../src/pathways/command-poller.ts" +import { type PathwaysBuilderConfig, PathwaysBuilder } from "../src/pathways/builder.ts" +import { PathwayPump } from "../src/pathways/pump/pathway-pump.ts" +import { PathwayProvisioner } from "../src/pathways/provisioner.ts" + +const baseOpts = { + baseUrl: "https://api.flowcore.io", + tenant: "test-tenant", + dataCore: "test-dc", + apiKey: "fc_testid_testsecret", + dataCoreDescription: "Test data core", +} + +function createBuilder(overrides: Partial = {}) { + return new PathwaysBuilder({ + ...baseOpts, + ...overrides, + }).register({ + flowType: "user.0", + eventType: "created.0", + schema: z.object({ + id: z.string(), + }), + flowTypeDescription: "User events", + description: "User created", + }) +} + +function createPumpOptions() { + return { + stateManagerFactory: () => ({ + getState: () => null, + setState: () => {}, + }), + notifier: { type: "poller" as const, pollerIntervalMs: 1000 }, + } +} + +Deno.test({ + name: "PathwaysBuilder startPump runtime behavior", + sanitizeResources: false, + sanitizeOps: false, + fn: async (t) => { + await t.step("development auto-provision provisions shared resources only", async () => { + const provisionCalls: string[] = [] + const startCalls: number[] = [] + const fetchCalls: RequestInit[] = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls.push("shared") + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls.push(1) + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + fetchCalls.push(init ?? {}) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "development", + pathwayName: "dev-service", + }) + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls.length, 1) + assertEquals(startCalls.length, 1) + assertEquals(fetchCalls.length, 0) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }) + + await t.step("defaultAutoProvision=false skips all remote provisioning", async () => { + let provisionCalls = 0 + let startCalls = 0 + let fetchCalls = 0 + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const fetchStub = stub(globalThis, "fetch", async () => { + fetchCalls++ + return new Response("{}", { status: 200 }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "development", + pathwayName: "dev-service", + defaultAutoProvision: false, + }) + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 0) + assertEquals(fetchCalls, 0) + assertEquals(startCalls, 1) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }) + + await t.step("production virtual mode requires an active cluster before startup", async () => { + let provisionCalls = 0 + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayName: "virtual-service", + }) + + await assertRejects( + () => builder.startPump(createPumpOptions()), + Error, + "Cluster mode must be started before production virtual pump startup", + ) + + assertEquals(provisionCalls, 0) + } finally { + provisionStub.restore() + } + }) + + await t.step( + "production virtual mode provisions pathway resources and starts the local pump on the leader", + async () => { + let provisionCalls = 0 + let startCalls = 0 + let commandPollerStarts = 0 + const fetchBodies: Array> = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const pollerStub = stub(CommandPoller.prototype, "start", () => { + commandPollerStarts++ + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayName: "virtual-service", + pathwayMode: "virtual", + }) + ;(builder as unknown as { clusterManager: { isRunning: boolean; isLeader: boolean } }).clusterManager = { + isRunning: true, + isLeader: true, + } + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 1) + assertEquals(startCalls, 1) + assertEquals(commandPollerStarts, 1) + assertEquals(fetchBodies.length, 1) + assertEquals(fetchBodies[0].type, "virtual") + } finally { + provisionStub.restore() + startStub.restore() + pollerStub.restore() + fetchStub.restore() + } + }, + ) + + await t.step("production managed mode provisions a managed pathway and does not start a local pump", async () => { + let provisionCalls = 0 + let startCalls = 0 + const fetchBodies: Array> = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayMode: "managed", + pathwayName: "managed-service", + managedConfig: { + endpointUrl: "https://app.example.com/flowcore", + authHeaders: { authorization: "Bearer secret" }, + sizeClass: "medium", + }, + }) + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 1) + assertEquals(startCalls, 0) + assertEquals(fetchBodies.length, 1) + assertEquals(fetchBodies[0].type, "managed") + assertEquals((fetchBodies[0].config as { sources: unknown[] }).sources.length, 1) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }) + }, +}) From 3b19db86c99d37eae87d4b4fb71699f6ab3b6f08 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Mon, 13 Apr 2026 15:14:35 +0100 Subject: [PATCH 5/6] fix: replace broken jq github action --- .github/workflows/build.yml | 5 ++--- .github/workflows/release-please.yml | 5 ++--- .github/workflows/validate.yml | 5 ++--- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c2229ac..92354f7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -18,10 +18,9 @@ jobs: token: ${{ secrets.FLOWCORE_MACHINE_GITHUB_TOKEN }} submodules: true - name: Extract version from deno.json - uses: sergeysova/jq-action@v2 id: version - with: - cmd: "jq .version deno.json -r" + run: | + echo "value=$(python3 -c 'import json; print(json.load(open("deno.json"))["version"])')" >> "$GITHUB_OUTPUT" - name: Show my version run: 'echo "version ${{ steps.version.outputs.value }}"' diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index a08dff5..8558249 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -13,10 +13,9 @@ jobs: steps: - uses: actions/checkout@v3 - name: Extract package name from deno.json - uses: sergeysova/jq-action@v2 id: package - with: - cmd: "jq .name deno.json -r" + run: | + echo "value=$(python3 -c 'import json; print(json.load(open("deno.json"))["name"])')" >> "$GITHUB_OUTPUT" - name: Setup Deno2 environment uses: denoland/setup-deno@v2 with: diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 9adf630..02ec291 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -41,10 +41,9 @@ jobs: token: ${{ secrets.FLOWCORE_MACHINE_GITHUB_TOKEN }} submodules: true - name: Extract version from deno.json - uses: sergeysova/jq-action@v2 id: version - with: - cmd: "jq .version deno.json -r" + run: | + echo "value=$(python3 -c 'import json; print(json.load(open("deno.json"))["version"])')" >> "$GITHUB_OUTPUT" - name: Show my version run: 'echo "version ${{ steps.version.outputs.value }}"' From ab0fd224e5aea75ea8f89e3b1a8f75f8aa218cd2 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Mon, 13 Apr 2026 22:33:54 +0100 Subject: [PATCH 6/6] fix: avoid virtual auto-provision startup deadlock --- src/pathways/builder.ts | 272 +++++++++++++++++++--------- src/pathways/pump/pathway-pump.ts | 34 +++- tests/pathway-pump.test.ts | 71 ++++++++ tests/pump-runtime-behavior.test.ts | 179 +++++++++++++++++- 4 files changed, 467 insertions(+), 89 deletions(-) diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index c2e6485..d2336e2 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -364,6 +364,9 @@ export class PathwaysBuilder< private pathwayPump: PathwayPump | null = null private commandPoller: CommandPoller | null = null private clusterBypassProcess = false + private currentPumpAutoProvision = false + private currentPumpUsesExplicitPulse = false + private currentPumpUsesAutoPulse = false /** * Creates a new PathwaysBuilder instance @@ -1244,27 +1247,14 @@ export class PathwaysBuilder< // Listen for leadership changes to auto-start/stop the pump this.clusterManager.onLeadershipChange((isLeader: boolean) => { - if (isLeader && this.pathwayPump && !this.pathwayPump.isRunning) { - this.logger.info("Became leader, starting pump") - const registrations = Object.keys(this.pathways).map((key) => { - const [flowType, eventType] = key.split("/") - return { flowType, eventType } - }) - this.pathwayPump.start(registrations).catch((err) => { - this.logger.error( - "Failed to start pump after becoming leader", - err instanceof Error ? err : new Error(String(err)), - ) - }) - } else if (!isLeader && this.pathwayPump?.isRunning) { - this.logger.info("Lost leadership, stopping pump") - this.pathwayPump.stop().catch((err) => { - this.logger.error( - "Failed to stop pump after losing leadership", - err instanceof Error ? err : new Error(String(err)), - ) - }) - } + this.handleLeadershipChange(isLeader).catch((err) => { + this.logger.error( + isLeader + ? "Failed to bootstrap leader runtime after becoming leader" + : "Failed to stop leader runtime after losing leadership", + err instanceof Error ? err : new Error(String(err)), + ) + }) }) // Wire reset handler: leader receives reset requests and delegates to pump @@ -1302,6 +1292,149 @@ export class PathwaysBuilder< return this.clusterManager !== null && this.clusterManager.isRunning } + private buildAutoPulseConfig(): NonNullable | null { + if (this.pathwayMode !== "virtual" || !this.pathwayId) { + return null + } + + return { + url: this.pulseUrl, + intervalMs: this.pulseIntervalMs, + pathwayId: this.pathwayId, + successLogLevel: this.logLevel.pulseSuccess, + failureLogLevel: this.logLevel.pulseFailure, + } + } + + private async applyAutoPulseConfig(): Promise { + if (!this.pathwayPump || this.currentPumpUsesExplicitPulse || this.currentPumpUsesAutoPulse) { + return + } + + const pulse = this.buildAutoPulseConfig() + if (!pulse) { + return + } + + await this.pathwayPump.setPulseConfig(pulse) + this.currentPumpUsesAutoPulse = true + this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl }) + } + + private startCommandPollerIfNeeded(): void { + if ( + this.pathwayMode !== "virtual" || !this.pathwayId || !this.pathwayPump?.isRunning || + this.commandPoller + ) { + return + } + + this.commandPoller = new CommandPoller({ + cpBaseUrl: this.pulseUrl, + pathwayId: this.pathwayId, + apiKey: this.apiKey, + intervalMs: this.commandPollingIntervalMs, + logger: this.logger, + onCommand: async (cmd) => { + if (cmd.type === "datapumpRestart") { + const position = cmd.position + ? { + timeBucket: (cmd.position as Record).timeBucket ?? "", + eventId: (cmd.position as Record).eventId, + } + : undefined + const flowTypes = cmd.sourceFlowTypes ?? undefined + await this.pathwayPump!.reset(position, flowTypes) + } else { + this.logger.warn("Unknown command type received", { type: cmd.type, commandId: cmd.id }) + } + }, + logLevel: { + pollSuccess: this.logLevel.pulseSuccess, + pollFailure: this.logLevel.pulseFailure, + }, + }) + this.commandPoller.start() + this.logger.info("Command poller started", { + pathwayId: this.pathwayId, + intervalMs: this.commandPollingIntervalMs, + }) + } + + private stopCommandPoller(): void { + if (!this.commandPoller) { + return + } + + this.commandPoller.stop() + this.commandPoller = null + } + + private async startCurrentPump(): Promise { + if (!this.pathwayPump || this.pathwayPump.isRunning) { + return + } + + const registrations = this.buildRegistrations() + await this.pathwayPump.start(registrations) + + this.logger.info("Pump started", { + pathways: registrations.length, + }) + } + + private async stopLeaderRuntime(): Promise { + this.stopCommandPoller() + + if (!this.pathwayPump?.isRunning) { + return + } + + await this.pathwayPump.stop() + this.logger.info("Pump stopped") + } + + private async bootstrapLeaderPump(): Promise { + if (!this.pathwayPump) { + return + } + + await this.applyAutoPulseConfig() + await this.startCurrentPump() + + if (this.runtimeEnv !== "production" || this.pathwayMode !== "virtual") { + this.startCommandPollerIfNeeded() + return + } + + if (this.currentPumpAutoProvision && !this.pathwayId) { + try { + await this.registerPathwayInstance(this.buildRegistrations()) + await this.applyAutoPulseConfig() + } catch (err) { + await this.stopLeaderRuntime() + throw err + } + } + + this.startCommandPollerIfNeeded() + } + + private async handleLeadershipChange(isLeader: boolean): Promise { + if (isLeader) { + if (!this.pathwayPump) { + return + } + + this.logger.info("Became leader, bootstrapping pump") + await this.bootstrapLeaderPump() + return + } + + this.logger.info("Lost leadership, stopping leader runtime") + await this.stopLeaderRuntime() + } + private buildRegistrations(): ProvisionerRegistration[] { return Object.keys(this.pathways).map((key) => { const [flowType, eventType] = key.split("/") @@ -1498,6 +1631,9 @@ export class PathwaysBuilder< } const autoProvision = options.autoProvision ?? this.defaultAutoProvision + this.currentPumpAutoProvision = autoProvision + this.currentPumpUsesExplicitPulse = Boolean(options.pulse) + this.currentPumpUsesAutoPulse = false if (autoProvision) { if (this.runtimeEnv === "production") { @@ -1508,7 +1644,11 @@ export class PathwaysBuilder< this.logger.info("Auto-provisioning Flowcore resources for production startup", { pathwayMode: this.pathwayMode, }) - await this.provision() + if (this.pathwayMode === "virtual") { + await this.provisionSharedResources() + } else { + await this.provision() + } } else if (this.runtimeEnv === "development") { this.logger.info("Auto-provisioning shared Flowcore resources for development startup") await this.provisionSharedResources() @@ -1517,21 +1657,7 @@ export class PathwaysBuilder< } } - // Auto-configure pulse when pathwayName was provisioned and no explicit pulse config - if (this.pathwayMode === "virtual" && this.pathwayId && !options.pulse) { - options = { - ...options, - pulse: { - url: this.pulseUrl, - intervalMs: this.pulseIntervalMs, - pathwayId: this.pathwayId, - successLogLevel: this.logLevel.pulseSuccess, - failureLogLevel: this.logLevel.pulseFailure, - }, - } - this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl }) - } else if (options.pulse) { - // Respect explicit pulse config but inject log levels from builder config if not set + if (options.pulse) { options = { ...options, pulse: { @@ -1540,6 +1666,16 @@ export class PathwaysBuilder< failureLogLevel: options.pulse.failureLogLevel ?? this.logLevel.pulseFailure, }, } + } else { + const autoPulse = this.buildAutoPulseConfig() + if (autoPulse) { + this.currentPumpUsesAutoPulse = true + this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl }) + options = { + ...options, + pulse: autoPulse, + } + } } // If cluster active and not leader, don't start pump @@ -1575,50 +1711,11 @@ export class PathwaysBuilder< return this.pathwayPump } - // Collect registered pathways - const registrations = Object.keys(this.pathways).map((key) => { - const [flowType, eventType] = key.split("/") - return { flowType, eventType } - }) - - await this.pathwayPump.start(registrations) - - this.logger.info("Pump started", { - pathways: registrations.length, - }) - - // Auto-start command poller for virtual pathways (poll CP for restart commands) - if (this.pathwayMode === "virtual" && this.pathwayId && !this.commandPoller) { - this.commandPoller = new CommandPoller({ - cpBaseUrl: this.pulseUrl, - pathwayId: this.pathwayId, - apiKey: this.apiKey, - intervalMs: this.commandPollingIntervalMs, - logger: this.logger, - onCommand: async (cmd) => { - if (cmd.type === "datapumpRestart") { - const position = cmd.position - ? { - timeBucket: (cmd.position as Record).timeBucket ?? "", - eventId: (cmd.position as Record).eventId, - } - : undefined - const flowTypes = cmd.sourceFlowTypes ?? undefined - await this.pathwayPump!.reset(position, flowTypes) - } else { - this.logger.warn("Unknown command type received", { type: cmd.type, commandId: cmd.id }) - } - }, - logLevel: { - pollSuccess: this.logLevel.pulseSuccess, - pollFailure: this.logLevel.pulseFailure, - }, - }) - this.commandPoller.start() - this.logger.info("Command poller started", { - pathwayId: this.pathwayId, - intervalMs: this.commandPollingIntervalMs, - }) + try { + await this.bootstrapLeaderPump() + } catch (err) { + await this.stopPump() + throw err } return this.pathwayPump @@ -1628,13 +1725,18 @@ export class PathwaysBuilder< * Stop the data pump */ async stopPump(): Promise { - if (this.commandPoller) { - this.commandPoller.stop() - this.commandPoller = null + this.stopCommandPoller() + if (!this.pathwayPump) { + this.currentPumpAutoProvision = false + this.currentPumpUsesExplicitPulse = false + this.currentPumpUsesAutoPulse = false + return } - if (!this.pathwayPump) return await this.pathwayPump.stop() this.pathwayPump = null + this.currentPumpAutoProvision = false + this.currentPumpUsesExplicitPulse = false + this.currentPumpUsesAutoPulse = false this.logger.info("Pump stopped") } diff --git a/src/pathways/pump/pathway-pump.ts b/src/pathways/pump/pathway-pump.ts index 9b20911..3510782 100644 --- a/src/pathways/pump/pathway-pump.ts +++ b/src/pathways/pump/pathway-pump.ts @@ -38,7 +38,7 @@ export class PathwayPump { private readonly bufferSize: number private readonly maxRedeliveryCount: number private readonly logger: Logger - private readonly pulseConfig?: { + private pulseConfig?: { url: string intervalMs?: number pathwayId?: string @@ -278,6 +278,38 @@ export class PathwayPump { return resetFlowTypes } + async setPulseConfig(pulseConfig: NonNullable): Promise { + this.pulseConfig = pulseConfig + + if (!this.running) { + return + } + + const flowTypeGroups = [...this.flowTypeEventTypes.entries()] + const existingPumps = [...this.pumps.entries()] + + for (const [flowType, pump] of existingPumps) { + try { + await pump.stop() + this.logger.info("Data pump stopped for pulse reconfiguration", { flowType }) + } catch (err) { + this.logger.error( + `Error stopping pump for ${flowType} during pulse reconfiguration`, + err instanceof Error ? err : new Error(String(err)), + ) + throw err + } + } + + this.pumps.clear() + this.stateManagers.clear() + this.restartAttempts.clear() + + for (const [flowType, eventTypes] of flowTypeGroups) { + await this.startPumpForFlowType(flowType, eventTypes) + } + } + get isRunning(): boolean { return this.running } diff --git a/tests/pathway-pump.test.ts b/tests/pathway-pump.test.ts index bdbf28f..bcdf21f 100644 --- a/tests/pathway-pump.test.ts +++ b/tests/pathway-pump.test.ts @@ -130,5 +130,76 @@ Deno.test({ assertEquals(groups.get("order"), ["placed", "shipped"]) assertEquals(groups.get("payment"), ["received"]) }) + + await t.step("setPulseConfig recreates running pumps with the new pulse configuration", async () => { + const factory = createInMemoryStateFactory() + const pump = new PathwayPump({ + stateManagerFactory: factory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + }) + + pump.configure({ + tenant: "test-tenant", + dataCore: "test-dc", + apiKey: "test-key", + baseUrl: "https://api.flowcore.io", + processEvent: async (_pathway: string, _event: FlowcoreEvent) => {}, + }) + + const stoppedFlowTypes: string[] = [] + const createdFlowTypes: string[] = [] + const createdPulsePathwayIds: string[] = [] + ;(pump as unknown as { + running: boolean + pumps: Map }> + flowTypeEventTypes: Map + dataPumpConstructor: { + create(options: Record): Promise<{ start(cb?: unknown): Promise }> + } + }).running = true + ;(pump as unknown as { pumps: Map }> }).pumps = new Map([ + ["user", { + stop: async () => { + stoppedFlowTypes.push("user") + }, + }], + ["order", { + stop: async () => { + stoppedFlowTypes.push("order") + }, + }], + ]) + ;(pump as unknown as { flowTypeEventTypes: Map }).flowTypeEventTypes = new Map([ + ["user", ["created", "updated"]], + ["order", ["placed"]], + ]) + ;(pump as unknown as { + dataPumpConstructor: { + create(options: Record): Promise<{ start(cb?: unknown): Promise }> + } + }).dataPumpConstructor = { + create: async (options: Record) => { + const dataSource = options.dataSource as { flowType: string } + const pulse = options.pulse as { pathwayId: string } + createdFlowTypes.push(dataSource.flowType) + createdPulsePathwayIds.push(pulse.pathwayId) + + return { + start: async () => {}, + } + }, + } + + await pump.setPulseConfig({ + url: "http://localhost:3000", + pathwayId: "pathway-123", + }) + + assertEquals(stoppedFlowTypes.sort(), ["order", "user"]) + assertEquals(createdFlowTypes.sort(), ["order", "user"]) + assertEquals(createdPulsePathwayIds, ["pathway-123", "pathway-123"]) + assertEquals(pump.isRunning, true) + assertEquals(pump.registeredFlowTypes.sort(), ["order", "user"]) + }) }, }) diff --git a/tests/pump-runtime-behavior.test.ts b/tests/pump-runtime-behavior.test.ts index 65cebbb..7f3a637 100644 --- a/tests/pump-runtime-behavior.test.ts +++ b/tests/pump-runtime-behavior.test.ts @@ -2,7 +2,7 @@ import { assertEquals, assertRejects } from "https://deno.land/std@0.224.0/asser import { stub } from "https://deno.land/std@0.224.0/testing/mock.ts" import { z } from "zod" import { CommandPoller } from "../src/pathways/command-poller.ts" -import { type PathwaysBuilderConfig, PathwaysBuilder } from "../src/pathways/builder.ts" +import { PathwaysBuilder, type PathwaysBuilderConfig } from "../src/pathways/builder.ts" import { PathwayPump } from "../src/pathways/pump/pathway-pump.ts" import { PathwayProvisioner } from "../src/pathways/provisioner.ts" @@ -142,23 +142,34 @@ Deno.test({ }) await t.step( - "production virtual mode provisions pathway resources and starts the local pump on the leader", + "production virtual mode starts the local pump before registering the virtual pathway on the leader", async () => { let provisionCalls = 0 let startCalls = 0 + let setPulseCalls = 0 let commandPollerStarts = 0 + const lifecycle: string[] = [] const fetchBodies: Array> = [] const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { provisionCalls++ + lifecycle.push("provision") }) - const startStub = stub(PathwayPump.prototype, "start", async () => { + const startStub = stub(PathwayPump.prototype, "start", async function (this: PathwayPump) { startCalls++ + ;(this as unknown as { running: boolean }).running = true + lifecycle.push("start") + }) + const setPulseStub = stub(PathwayPump.prototype, "setPulseConfig", async () => { + setPulseCalls++ + lifecycle.push("setPulse") }) const pollerStub = stub(CommandPoller.prototype, "start", () => { commandPollerStarts++ + lifecycle.push("pollerStart") }) const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + lifecycle.push("fetch") fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { status: 200, @@ -181,18 +192,180 @@ Deno.test({ assertEquals(provisionCalls, 1) assertEquals(startCalls, 1) + assertEquals(setPulseCalls, 1) assertEquals(commandPollerStarts, 1) assertEquals(fetchBodies.length, 1) assertEquals(fetchBodies[0].type, "virtual") + assertEquals(lifecycle, ["provision", "start", "fetch", "setPulse", "pollerStart"]) } finally { provisionStub.restore() startStub.restore() + setPulseStub.restore() pollerStub.restore() fetchStub.restore() } }, ) + await t.step( + "production virtual mode skips by-name registration on non-leaders and defers it until leadership gain", + async () => { + let provisionCalls = 0 + let startCalls = 0 + let stopCalls = 0 + let setPulseCalls = 0 + let commandPollerStarts = 0 + let commandPollerStops = 0 + const lifecycle: string[] = [] + const fetchBodies: Array> = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + lifecycle.push("provision") + }) + const startStub = stub(PathwayPump.prototype, "start", async function (this: PathwayPump) { + startCalls++ + ;(this as unknown as { running: boolean }).running = true + lifecycle.push("start") + }) + const stopStub = stub(PathwayPump.prototype, "stop", async function (this: PathwayPump) { + stopCalls++ + ;(this as unknown as { running: boolean }).running = false + lifecycle.push("stop") + }) + const setPulseStub = stub(PathwayPump.prototype, "setPulseConfig", async () => { + setPulseCalls++ + lifecycle.push("setPulse") + }) + const pollerStartStub = stub(CommandPoller.prototype, "start", () => { + commandPollerStarts++ + lifecycle.push("pollerStart") + }) + const pollerStopStub = stub(CommandPoller.prototype, "stop", () => { + commandPollerStops++ + lifecycle.push("pollerStop") + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + lifecycle.push("fetch") + fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayName: "virtual-service", + pathwayMode: "virtual", + }) + const clusterManager = { isRunning: true, isLeader: false } + ;(builder as unknown as { clusterManager: typeof clusterManager }).clusterManager = clusterManager + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 1) + assertEquals(startCalls, 0) + assertEquals(fetchBodies.length, 0) + assertEquals(setPulseCalls, 0) + assertEquals(commandPollerStarts, 0) + + clusterManager.isLeader = true + await (builder as unknown as { handleLeadershipChange(isLeader: boolean): Promise }) + .handleLeadershipChange(true) + + assertEquals(startCalls, 1) + assertEquals(fetchBodies.length, 1) + assertEquals(setPulseCalls, 1) + assertEquals(commandPollerStarts, 1) + + await (builder as unknown as { handleLeadershipChange(isLeader: boolean): Promise }) + .handleLeadershipChange(false) + + assertEquals(commandPollerStops, 1) + assertEquals(stopCalls, 1) + assertEquals(lifecycle, [ + "provision", + "start", + "fetch", + "setPulse", + "pollerStart", + "pollerStop", + "stop", + ]) + } finally { + provisionStub.restore() + startStub.restore() + stopStub.restore() + setPulseStub.restore() + pollerStartStub.restore() + pollerStopStub.restore() + fetchStub.restore() + } + }, + ) + + await t.step( + "production virtual mode stops the local pump when registration fails after startup", + async () => { + let startCalls = 0 + let stopCalls = 0 + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => {}) + const startStub = stub(PathwayPump.prototype, "start", async function (this: PathwayPump) { + startCalls++ + ;(this as unknown as { running: boolean }).running = true + }) + const stopStub = stub(PathwayPump.prototype, "stop", async function (this: PathwayPump) { + if (!(this as unknown as { running: boolean }).running) { + return + } + stopCalls++ + ;(this as unknown as { running: boolean }).running = false + }) + const fetchStub = stub(globalThis, "fetch", async () => { + return new Response( + JSON.stringify({ + status: 500, + code: "INTERNAL_SERVER_ERROR", + message: "Internal server error", + }), + { + status: 500, + headers: { "Content-Type": "application/json" }, + }, + ) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayName: "virtual-service", + pathwayMode: "virtual", + }) + ;(builder as unknown as { clusterManager: { isRunning: boolean; isLeader: boolean } }).clusterManager = { + isRunning: true, + isLeader: true, + } + + await assertRejects( + () => builder.startPump(createPumpOptions()), + Error, + 'Failed to register virtual pathway "virtual-service"', + ) + + assertEquals(startCalls, 1) + assertEquals(stopCalls, 1) + } finally { + provisionStub.restore() + startStub.restore() + stopStub.restore() + fetchStub.restore() + } + }, + ) + await t.step("production managed mode provisions a managed pathway and does not start a local pump", async () => { let provisionCalls = 0 let startCalls = 0