From 9cb75eab65c0af3a8c15a6cf32a74a2801b4a17e Mon Sep 17 00:00:00 2001 From: jbiskur Date: Fri, 10 Apr 2026 14:23:04 +0100 Subject: [PATCH 1/5] feat!: poll-based command queue for virtual pathway resets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace push-based HTTP callback reset with pull-based command polling. Virtual pathway instances now poll the CP every 5s for pending commands instead of exposing a public reset endpoint. New: - CommandPoller class polls GET /api/v1/pathways/:pathwayId/commands/pending - Reports status via POST /api/v1/pathways/:pathwayId/commands/:commandId/status - Phase flow: acknowledged → execute resetPump() → running (or failed) - Auto-starts when pathwayName is configured and pump starts - Configurable via commandPollingIntervalMs (default: 5000ms) Breaking changes: - advertisedUrl, resetSecret, resetPath constructor params deprecated (accepted but ignored) - pathwayName no longer requires advertisedUrl or resetSecret - processReset() on PathwayRouter deprecated (returns no-op with warning) - virtualConfig in provision() no longer sends resetUrl or authHeaders The library reuses its existing apiKey for polling authentication (tenant-mode FRN auth on the CP side). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pathways/builder.ts | 65 ++++++++---- src/pathways/command-poller.ts | 180 +++++++++++++++++++++++++++++++++ src/router/index.ts | 39 ++----- 3 files changed, 233 insertions(+), 51 deletions(-) create mode 100644 src/pathways/command-poller.ts diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index 642a656..ff5e24d 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -7,6 +7,7 @@ import type { FlowcoreEvent } from "../contracts/event.ts" import { InternalPathwayState } from "./internal-pathway.state.ts" import type { Logger } from "./logger.ts" import { NoopLogger } from "./logger.ts" +import { CommandPoller } from "./command-poller.ts" import type { EventMetadata, PathwayContract, @@ -298,16 +299,15 @@ export class PathwaysBuilder< // Virtual pathway auto-provisioning private readonly pathwayName?: string private readonly pathwayLabels: Record - private readonly advertisedUrl?: string - private readonly resetSecret?: string - private readonly resetPath: string private readonly pulseUrl: string private readonly pulseIntervalMs: number + private readonly commandPollingIntervalMs: number private pathwayId?: string - // Cluster + pump + // Cluster + pump + command poller private clusterManager: ClusterManager | null = null private pathwayPump: PathwayPump | null = null + private commandPoller: CommandPoller | null = null private clusterBypassProcess = false /** @@ -339,11 +339,12 @@ export class PathwaysBuilder< dataCoreDeleteProtection, pathwayName, pathwayLabels, - advertisedUrl, - resetSecret, - resetPath, + advertisedUrl: _advertisedUrl, + resetSecret: _resetSecret, + resetPath: _resetPath, pulseUrl, pulseIntervalMs, + commandPollingIntervalMs, }: { baseUrl: string tenant: string @@ -359,11 +360,15 @@ export class PathwaysBuilder< dataCoreDeleteProtection?: boolean pathwayName?: string pathwayLabels?: Record + /** @deprecated No longer used — virtual pathway commands are now poll-based */ advertisedUrl?: string + /** @deprecated No longer used — virtual pathway commands are now poll-based */ resetSecret?: string + /** @deprecated No longer used — virtual pathway commands are now poll-based */ resetPath?: string pulseUrl?: string pulseIntervalMs?: number + commandPollingIntervalMs?: number }) { // Initialize logger (use NoopLogger if none provided) this.logger = logger ?? new NoopLogger() @@ -389,21 +394,11 @@ export class PathwaysBuilder< this.dataCoreDeleteProtection = dataCoreDeleteProtection ?? false // Store virtual pathway auto-provisioning config - if (pathwayName) { - if (!advertisedUrl) { - throw new Error("advertisedUrl is required when pathwayName is set") - } - if (!resetSecret) { - throw new Error("resetSecret is required when pathwayName is set") - } - } this.pathwayName = pathwayName this.pathwayLabels = pathwayLabels ?? {} - this.advertisedUrl = advertisedUrl - this.resetSecret = resetSecret - this.resetPath = resetPath ?? "/reset" this.pulseUrl = pulseUrl ?? "https://data-pathways.api.flowcore.io" this.pulseIntervalMs = pulseIntervalMs ?? 30_000 + this.commandPollingIntervalMs = commandPollingIntervalMs ?? 5_000 if (enableSessionUserResolvers) { this.sessionUserResolvers = overrideSessionUserResolvers ?? new SessionUser() @@ -1316,8 +1311,6 @@ export class PathwaysBuilder< dataCore: this.dataCore, labels: this.pathwayLabels, virtualConfig: { - resetUrl: `${this.advertisedUrl}${this.resetPath}`, - authHeaders: { "x-pump-reset-secret": this.resetSecret }, flowTypes, }, }), @@ -1438,6 +1431,34 @@ export class PathwaysBuilder< pathways: registrations.length, }) + // Auto-start command poller for virtual pathways (poll CP for restart commands) + if (this.pathwayId && !this.commandPoller) { + this.commandPoller = new CommandPoller({ + cpBaseUrl: this.pulseUrl, + pathwayId: this.pathwayId, + apiKey: this.apiKey, + intervalMs: this.commandPollingIntervalMs, + logger: this.logger, + onCommand: async (cmd) => { + if (cmd.type === "datapumpRestart") { + const position = cmd.position + ? { timeBucket: (cmd.position as Record).timeBucket ?? "", eventId: (cmd.position as Record).eventId } + : undefined + const flowTypes = cmd.sourceFlowTypes ?? undefined + await this.pathwayPump!.reset(position, flowTypes) + } else { + this.logger.warn("Unknown command type received", { type: cmd.type, commandId: cmd.id }) + } + }, + logLevel: { + pollSuccess: this.logLevel.pulseSuccess, + pollFailure: this.logLevel.pulseFailure, + }, + }) + this.commandPoller.start() + this.logger.info("Command poller started", { pathwayId: this.pathwayId, intervalMs: this.commandPollingIntervalMs }) + } + return this.pathwayPump } @@ -1445,6 +1466,10 @@ export class PathwaysBuilder< * Stop the data pump */ async stopPump(): Promise { + if (this.commandPoller) { + this.commandPoller.stop() + this.commandPoller = null + } if (!this.pathwayPump) return await this.pathwayPump.stop() this.pathwayPump = null diff --git a/src/pathways/command-poller.ts b/src/pathways/command-poller.ts new file mode 100644 index 0000000..d718fce --- /dev/null +++ b/src/pathways/command-poller.ts @@ -0,0 +1,180 @@ +import type { Logger } from "./logger.ts" +import type { LogLevel } from "./builder.ts" + +export interface PendingCommand { + id: string + type: string + position: Record | null + sourceFlowTypes: string[] | null + reason: string | null + stopAt: string | null +} + +interface PendingCommandsResponse { + commands: PendingCommand[] +} + +export interface CommandPollerOptions { + cpBaseUrl: string + pathwayId: string + apiKey: string + intervalMs: number + logger: Logger + onCommand: (command: PendingCommand) => Promise + logLevel: { + pollSuccess: LogLevel + pollFailure: LogLevel + } +} + +function formatAuthHeader(apiKey: string): string { + return `ApiKey ${apiKey.startsWith("fc_") ? `${apiKey.split("_")[1]}:${apiKey}` : apiKey}` +} + +export class CommandPoller { + private readonly cpBaseUrl: string + private readonly pathwayId: string + private readonly apiKey: string + private readonly intervalMs: number + private readonly logger: Logger + private readonly onCommand: (command: PendingCommand) => Promise + private readonly logLevel: { pollSuccess: LogLevel; pollFailure: LogLevel } + private timer: ReturnType | null = null + private polling = false + + constructor(options: CommandPollerOptions) { + this.cpBaseUrl = options.cpBaseUrl + this.pathwayId = options.pathwayId + this.apiKey = options.apiKey + this.intervalMs = options.intervalMs + this.logger = options.logger + this.onCommand = options.onCommand + this.logLevel = options.logLevel + } + + start(): void { + if (this.timer) return + + this.logger.debug("Command poller starting", { + pathwayId: this.pathwayId, + intervalMs: this.intervalMs, + }) + + // Poll immediately on start, then on interval + this.poll() + this.timer = setInterval(() => this.poll(), this.intervalMs) + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer) + this.timer = null + } + this.logger.debug("Command poller stopped", { pathwayId: this.pathwayId }) + } + + private async poll(): Promise { + // Guard against overlapping polls + if (this.polling) return + this.polling = true + + try { + const url = `${this.cpBaseUrl}/api/v1/pathways/${this.pathwayId}/commands/pending` + const response = await fetch(url, { + method: "GET", + headers: { + Authorization: formatAuthHeader(this.apiKey), + }, + }) + + if (!response.ok) { + this.logger[this.logLevel.pollFailure]("Command poll failed", { + pathwayId: this.pathwayId, + status: response.status, + }) + return + } + + const body = (await response.json()) as PendingCommandsResponse + + if (body.commands.length === 0) { + this.logger[this.logLevel.pollSuccess]("Command poll: no pending commands", { + pathwayId: this.pathwayId, + }) + return + } + + this.logger.info("Command poll: received commands", { + pathwayId: this.pathwayId, + count: body.commands.length, + }) + + for (const command of body.commands) { + await this.handleCommand(command) + } + } catch (err) { + this.logger[this.logLevel.pollFailure]("Command poll error", { + pathwayId: this.pathwayId, + error: err instanceof Error ? err.message : String(err), + }) + } finally { + this.polling = false + } + } + + private async handleCommand(command: PendingCommand): Promise { + const statusUrl = `${this.cpBaseUrl}/api/v1/pathways/${this.pathwayId}/commands/${command.id}/status` + const headers = { + "Content-Type": "application/json", + Authorization: formatAuthHeader(this.apiKey), + } + + // Acknowledge receipt + try { + await fetch(statusUrl, { + method: "POST", + headers, + body: JSON.stringify({ phase: "acknowledged" }), + }) + } catch { + // Best-effort ack — continue with execution + } + + // Execute command + try { + await this.onCommand(command) + + // Report success + await fetch(statusUrl, { + method: "POST", + headers, + body: JSON.stringify({ phase: "running" }), + }) + + this.logger.info("Command executed successfully", { + commandId: command.id, + type: command.type, + pathwayId: this.pathwayId, + }) + } catch (err) { + const details = err instanceof Error ? err.message : String(err) + + // Report failure + try { + await fetch(statusUrl, { + method: "POST", + headers, + body: JSON.stringify({ phase: "failed", details }), + }) + } catch { + // Best-effort status report + } + + this.logger.error("Command execution failed", err instanceof Error ? err : new Error(details), { + commandId: command.id, + type: command.type, + pathwayId: this.pathwayId, + }) + } + } +} diff --git a/src/router/index.ts b/src/router/index.ts index 5c424d0..b8b2a33 100644 --- a/src/router/index.ts +++ b/src/router/index.ts @@ -187,44 +187,21 @@ export class PathwayRouter { } /** - * Process an incoming virtual reset callback from the Data Pathways CP. - * Validates the secret, resolves which pumps to reset, and calls resetPump(). - * - * @param body The reset callback body from the CP - * @param providedSecret The secret from the x-pump-reset-secret header - * @returns Result with success status and list of flow types that were reset + * @deprecated Use the poll-based command queue instead. Virtual pathway commands + * are now polled from the CP via GET /api/v1/pathways/:id/commands/pending. + * The PathwaysBuilder auto-starts a CommandPoller when pathwayName is configured. */ async processReset( - body: ResetCallbackBody, - providedSecret: string | null, + _body: ResetCallbackBody, + _providedSecret: string | null, ): Promise<{ success: boolean; flowTypesReset: string[] }> { - // Validate secret key (same pattern as processEvent) - if (!providedSecret || providedSecret !== this.secretKey) { - const errorMsg = "Invalid secret key" - this.logger.error(errorMsg, new Error(errorMsg)) - throw new Error(errorMsg) - } - - this.logger.debug("Processing reset request", { - pathwayId: body.pathwayId, - mode: body.mode, - flowTypes: body.flowTypes, - }) - - const position = body.position - ? { timeBucket: body.position.timeBucket ?? "", eventId: body.position.eventId } - : undefined - - const flowTypesReset = await this.pathways.resetPump(position, body.flowTypes) - - this.logger.info("Reset completed", { flowTypesReset }) - - return { success: true, flowTypesReset } + this.logger.warn("processReset is deprecated — virtual pathway commands are now poll-based. This method is a no-op.") + return { success: false, flowTypesReset: [] } } } /** - * Body shape for virtual reset callbacks from the Data Pathways CP. + * @deprecated No longer used — virtual pathway commands are now poll-based. */ export interface ResetCallbackBody { pathwayId: string From 8b60ba6e6bdef9ff8ede93f44f1c6cf29f0ec63c Mon Sep 17 00:00:00 2001 From: jbiskur Date: Fri, 10 Apr 2026 15:41:38 +0100 Subject: [PATCH 2/5] fix: remove async from deprecated processReset (deno require-await lint) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/router/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/router/index.ts b/src/router/index.ts index b8b2a33..6442d0c 100644 --- a/src/router/index.ts +++ b/src/router/index.ts @@ -191,10 +191,10 @@ export class PathwayRouter { * are now polled from the CP via GET /api/v1/pathways/:id/commands/pending. * The PathwaysBuilder auto-starts a CommandPoller when pathwayName is configured. */ - async processReset( + processReset( _body: ResetCallbackBody, _providedSecret: string | null, - ): Promise<{ success: boolean; flowTypesReset: string[] }> { + ): { success: boolean; flowTypesReset: string[] } { this.logger.warn("processReset is deprecated — virtual pathway commands are now poll-based. This method is a no-op.") return { success: false, flowTypesReset: [] } } From e30b1f7a49fbea0784860fae92a7344c634ff38a Mon Sep 17 00:00:00 2001 From: jbiskur Date: Fri, 10 Apr 2026 15:44:18 +0100 Subject: [PATCH 3/5] =?UTF-8?q?fix:=20update=20config=20tests=20=E2=80=94?= =?UTF-8?q?=20pathwayName=20no=20longer=20requires=20advertisedUrl/resetSe?= =?UTF-8?q?cret?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove assertThrows tests for the old validation that required advertisedUrl + resetSecret when pathwayName is set. These fields are now deprecated and ignored. Add test for the new commandPollingIntervalMs option and for pathwayName without any deprecated fields. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/pathway-builder-config.test.ts | 45 ++++++++++++---------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/tests/pathway-builder-config.test.ts b/tests/pathway-builder-config.test.ts index 21b2c12..8589ce3 100644 --- a/tests/pathway-builder-config.test.ts +++ b/tests/pathway-builder-config.test.ts @@ -1,4 +1,4 @@ -import { assertEquals, assertThrows } from "https://deno.land/std@0.224.0/assert/mod.ts" +import { assertEquals } from "https://deno.land/std@0.224.0/assert/mod.ts" import { PathwaysBuilder } from "../src/pathways/builder.ts" const baseOpts = { @@ -14,43 +14,36 @@ Deno.test({ sanitizeOps: false, fn: async (t) => { await t.step("should accept all new fields without error", () => { + const builder = new PathwaysBuilder({ + ...baseOpts, + pathwayName: "my-service", + pulseUrl: "https://custom-cp.example.com", + pulseIntervalMs: 15000, + commandPollingIntervalMs: 3000, + }) + + assertEquals(typeof builder, "object") + }) + + await t.step("should accept deprecated fields without error (backward compat)", () => { const builder = new PathwaysBuilder({ ...baseOpts, pathwayName: "my-service", advertisedUrl: "https://my-service.example.com", resetSecret: "my-reset-secret", resetPath: "/admin/reset", - pulseUrl: "https://custom-cp.example.com/api/v1/pump-pulse", - pulseIntervalMs: 15000, }) assertEquals(typeof builder, "object") }) - await t.step("should throw when pathwayName set but advertisedUrl missing", () => { - assertThrows( - () => - new PathwaysBuilder({ - ...baseOpts, - pathwayName: "my-service", - resetSecret: "my-reset-secret", - }), - Error, - "advertisedUrl is required when pathwayName is set", - ) - }) + await t.step("should accept pathwayName without advertisedUrl or resetSecret", () => { + const builder = new PathwaysBuilder({ + ...baseOpts, + pathwayName: "my-service", + }) - await t.step("should throw when pathwayName set but resetSecret missing", () => { - assertThrows( - () => - new PathwaysBuilder({ - ...baseOpts, - pathwayName: "my-service", - advertisedUrl: "https://my-service.example.com", - }), - Error, - "resetSecret is required when pathwayName is set", - ) + assertEquals(typeof builder, "object") }) await t.step("should work without pathwayName (backward compat)", () => { From 302ce2c7795dc7ca1487dc5a045ca9327ce31156 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Mon, 13 Apr 2026 15:08:08 +0100 Subject: [PATCH 4/5] feat!: make auto-provision runtime-aware --- README.md | 47 ++++ deno.lock | 3 +- src/pathways/builder.ts | 354 +++++++++++++++++++-------- src/pathways/pump/types.ts | 2 +- tests/pathway-builder-config.test.ts | 10 + tests/pump-runtime-behavior.test.ts | 241 ++++++++++++++++++ 6 files changed, 559 insertions(+), 98 deletions(-) create mode 100644 tests/pump-runtime-behavior.test.ts diff --git a/README.md b/README.md index 15b1f41..410abd9 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Pathways helps you build event-driven applications with type-safe pathways for p - [HTTP Server Integration](#http-server-integration) - [Persistence Options](#persistence-options) - [Advanced Usage](#advanced-usage) + - [Runtime Defaults and Auto-Provisioning](#runtime-defaults-and-auto-provisioning) - [Auditing](#auditing) - [Custom Loggers](#custom-loggers) - [Retry Mechanisms](#retry-mechanisms) @@ -128,6 +129,52 @@ const pathways = new PathwaysBuilder({ }) ``` +### Runtime Defaults and Auto-Provisioning + +`PathwaysBuilder` can now drive different startup behavior for development and production: + +- `development`: starts the local in-process pump and only provisions shared Flowcore resources such as the data core, + flow types, and event types +- `production + virtual`: requires cluster mode and auto-provisions a virtual pathway by name +- `production + managed`: auto-provisions a managed pathway by name and does not start a local pump + +```typescript +const pathways = new PathwaysBuilder({ + baseUrl: "https://api.flowcore.io", + tenant: "your-tenant", + dataCore: "your-data-core", + apiKey: process.env.FLOWCORE_API_KEY!, + runtimeEnv: process.env.NODE_ENV === "production" ? "production" : "development", + pathwayName: "orders-service", + pathwayMode: "virtual", // default + defaultAutoProvision: true, // default +}) +``` + +For managed production delivery, provide a transform endpoint and leave event fetching to the control plane: + +```typescript +const pathways = new PathwaysBuilder({ + baseUrl: "https://api.flowcore.io", + tenant: "your-tenant", + dataCore: "your-data-core", + apiKey: process.env.FLOWCORE_API_KEY!, + runtimeEnv: "production", + pathwayName: "orders-service", + pathwayMode: "managed", + managedConfig: { + endpointUrl: "https://app.example.com/api/flowcore", + authHeaders: { + authorization: `Bearer ${process.env.TRANSFORM_TOKEN!}`, + }, + sizeClass: "medium", + }, +}) +``` + +To disable all remote provisioning and keep startup fully manual, set `defaultAutoProvision: false` or pass +`autoProvision: false` to `startPump()`. + ### Registering Pathways Register pathways with their schemas for type-safe event handling: diff --git a/deno.lock b/deno.lock index 08624db..f835212 100644 --- a/deno.lock +++ b/deno.lock @@ -399,7 +399,8 @@ "https://deno.land/std@0.224.0/internal/format.ts": "0a98ee226fd3d43450245b1844b47003419d34d210fa989900861c79820d21c2", "https://deno.land/std@0.224.0/internal/mod.ts": "534125398c8e7426183e12dc255bb635d94e06d0f93c60a297723abe69d3b22e", "https://deno.land/std@0.224.0/testing/_test_suite.ts": "f10a8a6338b60c403f07a76f3f46bdc9f1e1a820c0a1decddeb2949f7a8a0546", - "https://deno.land/std@0.224.0/testing/bdd.ts": "3e4de4ff6d8f348b5574661cef9501b442046a59079e201b849d0e74120d476b" + "https://deno.land/std@0.224.0/testing/bdd.ts": "3e4de4ff6d8f348b5574661cef9501b442046a59079e201b849d0e74120d476b", + "https://deno.land/std@0.224.0/testing/mock.ts": "a963181c2860b6ba3eb60e08b62c164d33cf5da7cd445893499b2efda20074db" }, "workspace": { "dependencies": [ diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index ff5e24d..c2e6485 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -23,7 +23,7 @@ import type { PathwayClusterOptions } from "./cluster/types.ts" import { ClusterManager } from "./cluster/cluster-manager.ts" import type { PathwayPumpOptions, PumpState } from "./pump/types.ts" import { PathwayPump } from "./pump/pathway-pump.ts" -import { PathwayProvisioner } from "./provisioner.ts" +import { PathwayProvisioner, type ProvisionerRegistration } from "./provisioner.ts" import { AUDIT_ENTITY_ID, AUDIT_ENTITY_TYPE, @@ -58,6 +58,17 @@ const DEFAULT_RETRY_DELAY_MS = 500 */ const DEFAULT_SESSION_USER_RESOLVER_TTL_MS = 10 * 1000 +function normalizeRuntimeEnv(runtimeEnv?: string): PathwayRuntimeEnv { + switch (runtimeEnv) { + case "development": + case "production": + case "test": + return runtimeEnv + default: + return "development" + } +} + /** * Defines the mode for auditing pathway operations * - "user": Normal user-initiated operations @@ -129,6 +140,46 @@ export type LogLevelConfig = { */ type InternalLogLevelConfig = Required +export type PathwayRuntimeEnv = "development" | "production" | "test" + +export type PathwayMode = "virtual" | "managed" + +export interface ManagedPathwayConfig { + endpointUrl: string + authHeaders?: Record + sizeClass?: "small" | "medium" | "high" +} + +export interface PathwaysBuilderConfig { + baseUrl: string + tenant: string + dataCore: string + apiKey: string + pathwayTimeoutMs?: number + logger?: Logger + enableSessionUserResolvers?: boolean + overrideSessionUserResolvers?: SessionUserResolver + logLevel?: LogLevelConfig + dataCoreDescription?: string + dataCoreAccessControl?: string + dataCoreDeleteProtection?: boolean + pathwayName?: string + pathwayLabels?: Record + /** @deprecated No longer used — virtual pathway commands are now poll-based */ + advertisedUrl?: string + /** @deprecated No longer used — virtual pathway commands are now poll-based */ + resetSecret?: string + /** @deprecated No longer used — virtual pathway commands are now poll-based */ + resetPath?: string + pulseUrl?: string + pulseIntervalMs?: number + commandPollingIntervalMs?: number + runtimeEnv?: PathwayRuntimeEnv + pathwayMode?: PathwayMode + defaultAutoProvision?: boolean + managedConfig?: ManagedPathwayConfig +} + /** * SessionUserResolver is a key-value store for storing and retrieving UserIdResolver functions * with a TTL (time to live). @@ -302,6 +353,10 @@ export class PathwaysBuilder< private readonly pulseUrl: string private readonly pulseIntervalMs: number private readonly commandPollingIntervalMs: number + private readonly runtimeEnv: PathwayRuntimeEnv + private readonly pathwayMode: PathwayMode + private readonly defaultAutoProvision: boolean + private readonly managedConfig?: ManagedPathwayConfig private pathwayId?: string // Cluster + pump + command poller @@ -345,31 +400,11 @@ export class PathwaysBuilder< pulseUrl, pulseIntervalMs, commandPollingIntervalMs, - }: { - baseUrl: string - tenant: string - dataCore: string - apiKey: string - pathwayTimeoutMs?: number - logger?: Logger - enableSessionUserResolvers?: boolean - overrideSessionUserResolvers?: SessionUserResolver - logLevel?: LogLevelConfig - dataCoreDescription?: string - dataCoreAccessControl?: string - dataCoreDeleteProtection?: boolean - pathwayName?: string - pathwayLabels?: Record - /** @deprecated No longer used — virtual pathway commands are now poll-based */ - advertisedUrl?: string - /** @deprecated No longer used — virtual pathway commands are now poll-based */ - resetSecret?: string - /** @deprecated No longer used — virtual pathway commands are now poll-based */ - resetPath?: string - pulseUrl?: string - pulseIntervalMs?: number - commandPollingIntervalMs?: number - }) { + runtimeEnv, + pathwayMode, + defaultAutoProvision, + managedConfig, + }: PathwaysBuilderConfig) { // Initialize logger (use NoopLogger if none provided) this.logger = logger ?? new NoopLogger() @@ -399,6 +434,10 @@ export class PathwaysBuilder< this.pulseUrl = pulseUrl ?? "https://data-pathways.api.flowcore.io" this.pulseIntervalMs = pulseIntervalMs ?? 30_000 this.commandPollingIntervalMs = commandPollingIntervalMs ?? 5_000 + this.runtimeEnv = normalizeRuntimeEnv(runtimeEnv ?? process.env.NODE_ENV) + this.pathwayMode = pathwayMode ?? "virtual" + this.defaultAutoProvision = defaultAutoProvision ?? true + this.managedConfig = managedConfig if (enableSessionUserResolvers) { this.sessionUserResolvers = overrideSessionUserResolvers ?? new SessionUser() @@ -409,6 +448,9 @@ export class PathwaysBuilder< tenant, dataCore, pathwayTimeoutMs, + runtimeEnv: this.runtimeEnv, + pathwayMode: this.pathwayMode, + defaultAutoProvision: this.defaultAutoProvision, }) this.webhookBuilderFactory = new WebhookBuilder({ @@ -1260,14 +1302,8 @@ export class PathwaysBuilder< return this.clusterManager !== null && this.clusterManager.isRunning } - /** - * Provision Flowcore infrastructure (data core, flow types, event types). - * Creates missing resources when descriptions are provided, updates descriptions - * when they differ. Fails if a resource is missing and no description is provided. - * Additive only — never deletes. - */ - async provision(): Promise { - const registrations = Object.keys(this.pathways).map((key) => { + private buildRegistrations(): ProvisionerRegistration[] { + return Object.keys(this.pathways).map((key) => { const [flowType, eventType] = key.split("/") return { flowType, @@ -1276,6 +1312,10 @@ export class PathwaysBuilder< eventTypeDescription: this.eventTypeDescriptions.get(key), } }) + } + + private async provisionSharedResources(): Promise { + const registrations = this.buildRegistrations() const provisioner = new PathwayProvisioner({ tenant: this.tenant, @@ -1289,66 +1329,163 @@ export class PathwaysBuilder< }) await provisioner.provision() + return registrations + } - // Step 4: Register virtual pathway with CP (when pathwayName is set) - if (this.pathwayName) { - const flowTypes = [...new Set(registrations.map((r) => r.flowType))] - const cpBaseUrl = this.pulseUrl - const url = `${cpBaseUrl}/api/v1/pathways/by-name/${encodeURIComponent(this.pathwayName)}` + private getPathwayProvisionAuthHeader(): string { + const apiKey = this.apiKey.startsWith("fc_") ? `${this.apiKey.split("_")[1]}:${this.apiKey}` : this.apiKey - let response: Response - try { - response = await fetch(url, { - method: "PUT", - headers: { - "Content-Type": "application/json", - Authorization: `ApiKey ${ - this.apiKey.startsWith("fc_") ? `${this.apiKey.split("_")[1]}:${this.apiKey}` : this.apiKey - }`, - }, - body: JSON.stringify({ - tenant: this.tenant, - dataCore: this.dataCore, - labels: this.pathwayLabels, - virtualConfig: { - flowTypes, - }, - }), - }) - } catch (err) { - const msg = err instanceof Error ? err.message : String(err) - this.logger[this.logLevel.provisionFailure]("Virtual pathway registration failed", { - pathwayName: this.pathwayName, - url, - error: msg, - phase: "network", - }) - throw new Error(`Failed to register virtual pathway "${this.pathwayName}": ${msg}`) - } + return `ApiKey ${apiKey}` + } - if (!response.ok) { - const text = await response.text().catch(() => "") - this.logger[this.logLevel.provisionFailure]("Virtual pathway registration failed", { - pathwayName: this.pathwayName, - url, - status: response.status, - body: text, - phase: "response", - }) - throw new Error( - `Failed to register virtual pathway "${this.pathwayName}": ${response.status} ${text}`, - ) - } + private ensurePathwayName(reason: string): string { + if (!this.pathwayName) { + throw new Error(reason) + } + return this.pathwayName + } - const result = await response.json() as { pathwayId: string; status: string } - this.pathwayId = result.pathwayId - this.logger[this.logLevel.provisionSuccess]("Virtual pathway registered", { - pathwayName: this.pathwayName, - pathwayId: this.pathwayId, - status: result.status, - flowTypes, + private buildManagedPathwayConfig(registrations: ProvisionerRegistration[]): { + sizeClass: "small" | "medium" | "high" + config: { + sources: Array<{ + flowType: string + eventTypes: string[] + endpoints: Array<{ + url: string + authHeaders: Record + }> + }> + } + } { + if (!this.managedConfig?.endpointUrl) { + throw new Error( + "managedConfig.endpointUrl is required when provisioning a managed pathway", + ) + } + + const grouped = new Map>() + for (const registration of registrations) { + const eventTypes = grouped.get(registration.flowType) ?? new Set() + eventTypes.add(registration.eventType) + grouped.set(registration.flowType, eventTypes) + } + + return { + sizeClass: this.managedConfig.sizeClass ?? "small", + config: { + sources: [...grouped.entries()].map(([flowType, eventTypes]) => ({ + flowType, + eventTypes: [...eventTypes], + endpoints: [{ + url: this.managedConfig!.endpointUrl, + authHeaders: this.managedConfig!.authHeaders ?? {}, + }], + })), + }, + } + } + + private async upsertPathwayByName( + type: PathwayMode, + body: Record, + logMeta: Record, + ): Promise { + const pathwayName = this.ensurePathwayName( + `pathwayName is required when provisioning a ${type} pathway`, + ) + const url = `${this.pulseUrl}/api/v1/pathways/by-name/${encodeURIComponent(pathwayName)}` + + let response: Response + try { + response = await fetch(url, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Authorization: this.getPathwayProvisionAuthHeader(), + }, + body: JSON.stringify(body), }) + } catch (err) { + const msg = err instanceof Error ? err.message : String(err) + this.logger[this.logLevel.provisionFailure](`${type} pathway registration failed`, { + pathwayName, + url, + error: msg, + phase: "network", + }) + throw new Error(`Failed to register ${type} pathway "${pathwayName}": ${msg}`) + } + + if (!response.ok) { + const text = await response.text().catch(() => "") + this.logger[this.logLevel.provisionFailure](`${type} pathway registration failed`, { + pathwayName, + url, + status: response.status, + body: text, + phase: "response", + }) + throw new Error( + `Failed to register ${type} pathway "${pathwayName}": ${response.status} ${text}`, + ) + } + + const result = await response.json() as { pathwayId: string; status: string } + this.pathwayId = result.pathwayId + this.logger[this.logLevel.provisionSuccess](`${type} pathway registered`, { + pathwayName, + pathwayId: this.pathwayId, + status: result.status, + ...logMeta, + }) + } + + private async registerPathwayInstance(registrations: ProvisionerRegistration[]): Promise { + if (this.pathwayMode === "managed") { + const { sizeClass, config } = this.buildManagedPathwayConfig(registrations) + await this.upsertPathwayByName("managed", { + tenant: this.tenant, + dataCore: this.dataCore, + labels: this.pathwayLabels, + sizeClass, + enabled: true, + type: "managed", + config, + }, { + sizeClass, + sourceCount: config.sources.length, + }) + return + } + + if (!this.pathwayName) { + return } + + const flowTypes = [...new Set(registrations.map((registration) => registration.flowType))] + await this.upsertPathwayByName("virtual", { + tenant: this.tenant, + dataCore: this.dataCore, + labels: this.pathwayLabels, + type: "virtual", + virtualConfig: { + flowTypes, + }, + }, { + flowTypes, + }) + } + + /** + * Provision Flowcore infrastructure (data core, flow types, event types). + * Creates missing resources when descriptions are provided, updates descriptions + * when they differ. Fails if a resource is missing and no description is provided. + * Additive only — never deletes. + */ + async provision(): Promise { + const registrations = await this.provisionSharedResources() + await this.registerPathwayInstance(registrations) } /** @@ -1360,14 +1497,28 @@ export class PathwaysBuilder< throw new Error("Pump already started") } - // Auto-provision infrastructure if requested - if (options.autoProvision) { - this.logger.info("Auto-provisioning Flowcore infrastructure before starting pump") - await this.provision() + const autoProvision = options.autoProvision ?? this.defaultAutoProvision + + if (autoProvision) { + if (this.runtimeEnv === "production") { + if (this.pathwayMode === "virtual" && !this.isClusterActive) { + throw new Error("Cluster mode must be started before production virtual pump startup") + } + + this.logger.info("Auto-provisioning Flowcore resources for production startup", { + pathwayMode: this.pathwayMode, + }) + await this.provision() + } else if (this.runtimeEnv === "development") { + this.logger.info("Auto-provisioning shared Flowcore resources for development startup") + await this.provisionSharedResources() + } else { + this.logger.info("Skipping remote auto-provisioning in test runtime") + } } // Auto-configure pulse when pathwayName was provisioned and no explicit pulse config - if (this.pathwayId && !options.pulse) { + if (this.pathwayMode === "virtual" && this.pathwayId && !options.pulse) { options = { ...options, pulse: { @@ -1419,6 +1570,11 @@ export class PathwaysBuilder< }, }) + if (this.runtimeEnv === "production" && this.pathwayMode === "managed") { + this.logger.info("Not starting local pump — production managed pathways rely on control-plane delivery") + return this.pathwayPump + } + // Collect registered pathways const registrations = Object.keys(this.pathways).map((key) => { const [flowType, eventType] = key.split("/") @@ -1432,7 +1588,7 @@ export class PathwaysBuilder< }) // Auto-start command poller for virtual pathways (poll CP for restart commands) - if (this.pathwayId && !this.commandPoller) { + if (this.pathwayMode === "virtual" && this.pathwayId && !this.commandPoller) { this.commandPoller = new CommandPoller({ cpBaseUrl: this.pulseUrl, pathwayId: this.pathwayId, @@ -1442,7 +1598,10 @@ export class PathwaysBuilder< onCommand: async (cmd) => { if (cmd.type === "datapumpRestart") { const position = cmd.position - ? { timeBucket: (cmd.position as Record).timeBucket ?? "", eventId: (cmd.position as Record).eventId } + ? { + timeBucket: (cmd.position as Record).timeBucket ?? "", + eventId: (cmd.position as Record).eventId, + } : undefined const flowTypes = cmd.sourceFlowTypes ?? undefined await this.pathwayPump!.reset(position, flowTypes) @@ -1456,7 +1615,10 @@ export class PathwaysBuilder< }, }) this.commandPoller.start() - this.logger.info("Command poller started", { pathwayId: this.pathwayId, intervalMs: this.commandPollingIntervalMs }) + this.logger.info("Command poller started", { + pathwayId: this.pathwayId, + intervalMs: this.commandPollingIntervalMs, + }) } return this.pathwayPump diff --git a/src/pathways/pump/types.ts b/src/pathways/pump/types.ts index 928a0d7..8b1f1f2 100644 --- a/src/pathways/pump/types.ts +++ b/src/pathways/pump/types.ts @@ -8,7 +8,7 @@ export interface PathwayPumpOptions { notifier?: PumpNotifierConfig bufferSize?: number maxRedeliveryCount?: number - /** If true, calls provision() before starting the pump */ + /** If true, applies the builder's environment-aware provisioning rules before startup */ autoProvision?: boolean /** Optional pulse reporting to control plane */ pulse?: { diff --git a/tests/pathway-builder-config.test.ts b/tests/pathway-builder-config.test.ts index 8589ce3..7a1363f 100644 --- a/tests/pathway-builder-config.test.ts +++ b/tests/pathway-builder-config.test.ts @@ -20,6 +20,16 @@ Deno.test({ pulseUrl: "https://custom-cp.example.com", pulseIntervalMs: 15000, commandPollingIntervalMs: 3000, + runtimeEnv: "production", + pathwayMode: "managed", + defaultAutoProvision: false, + managedConfig: { + endpointUrl: "https://app.example.com/flowcore", + authHeaders: { + "x-service-token": "secret", + }, + sizeClass: "medium", + }, }) assertEquals(typeof builder, "object") diff --git a/tests/pump-runtime-behavior.test.ts b/tests/pump-runtime-behavior.test.ts new file mode 100644 index 0000000..65cebbb --- /dev/null +++ b/tests/pump-runtime-behavior.test.ts @@ -0,0 +1,241 @@ +import { assertEquals, assertRejects } from "https://deno.land/std@0.224.0/assert/mod.ts" +import { stub } from "https://deno.land/std@0.224.0/testing/mock.ts" +import { z } from "zod" +import { CommandPoller } from "../src/pathways/command-poller.ts" +import { type PathwaysBuilderConfig, PathwaysBuilder } from "../src/pathways/builder.ts" +import { PathwayPump } from "../src/pathways/pump/pathway-pump.ts" +import { PathwayProvisioner } from "../src/pathways/provisioner.ts" + +const baseOpts = { + baseUrl: "https://api.flowcore.io", + tenant: "test-tenant", + dataCore: "test-dc", + apiKey: "fc_testid_testsecret", + dataCoreDescription: "Test data core", +} + +function createBuilder(overrides: Partial = {}) { + return new PathwaysBuilder({ + ...baseOpts, + ...overrides, + }).register({ + flowType: "user.0", + eventType: "created.0", + schema: z.object({ + id: z.string(), + }), + flowTypeDescription: "User events", + description: "User created", + }) +} + +function createPumpOptions() { + return { + stateManagerFactory: () => ({ + getState: () => null, + setState: () => {}, + }), + notifier: { type: "poller" as const, pollerIntervalMs: 1000 }, + } +} + +Deno.test({ + name: "PathwaysBuilder startPump runtime behavior", + sanitizeResources: false, + sanitizeOps: false, + fn: async (t) => { + await t.step("development auto-provision provisions shared resources only", async () => { + const provisionCalls: string[] = [] + const startCalls: number[] = [] + const fetchCalls: RequestInit[] = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls.push("shared") + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls.push(1) + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + fetchCalls.push(init ?? {}) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "development", + pathwayName: "dev-service", + }) + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls.length, 1) + assertEquals(startCalls.length, 1) + assertEquals(fetchCalls.length, 0) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }) + + await t.step("defaultAutoProvision=false skips all remote provisioning", async () => { + let provisionCalls = 0 + let startCalls = 0 + let fetchCalls = 0 + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const fetchStub = stub(globalThis, "fetch", async () => { + fetchCalls++ + return new Response("{}", { status: 200 }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "development", + pathwayName: "dev-service", + defaultAutoProvision: false, + }) + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 0) + assertEquals(fetchCalls, 0) + assertEquals(startCalls, 1) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }) + + await t.step("production virtual mode requires an active cluster before startup", async () => { + let provisionCalls = 0 + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayName: "virtual-service", + }) + + await assertRejects( + () => builder.startPump(createPumpOptions()), + Error, + "Cluster mode must be started before production virtual pump startup", + ) + + assertEquals(provisionCalls, 0) + } finally { + provisionStub.restore() + } + }) + + await t.step( + "production virtual mode provisions pathway resources and starts the local pump on the leader", + async () => { + let provisionCalls = 0 + let startCalls = 0 + let commandPollerStarts = 0 + const fetchBodies: Array> = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const pollerStub = stub(CommandPoller.prototype, "start", () => { + commandPollerStarts++ + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayName: "virtual-service", + pathwayMode: "virtual", + }) + ;(builder as unknown as { clusterManager: { isRunning: boolean; isLeader: boolean } }).clusterManager = { + isRunning: true, + isLeader: true, + } + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 1) + assertEquals(startCalls, 1) + assertEquals(commandPollerStarts, 1) + assertEquals(fetchBodies.length, 1) + assertEquals(fetchBodies[0].type, "virtual") + } finally { + provisionStub.restore() + startStub.restore() + pollerStub.restore() + fetchStub.restore() + } + }, + ) + + await t.step("production managed mode provisions a managed pathway and does not start a local pump", async () => { + let provisionCalls = 0 + let startCalls = 0 + const fetchBodies: Array> = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "production", + pathwayMode: "managed", + pathwayName: "managed-service", + managedConfig: { + endpointUrl: "https://app.example.com/flowcore", + authHeaders: { authorization: "Bearer secret" }, + sizeClass: "medium", + }, + }) + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 1) + assertEquals(startCalls, 0) + assertEquals(fetchBodies.length, 1) + assertEquals(fetchBodies[0].type, "managed") + assertEquals((fetchBodies[0].config as { sources: unknown[] }).sources.length, 1) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }) + }, +}) From 3b19db86c99d37eae87d4b4fb71699f6ab3b6f08 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Mon, 13 Apr 2026 15:14:35 +0100 Subject: [PATCH 5/5] fix: replace broken jq github action --- .github/workflows/build.yml | 5 ++--- .github/workflows/release-please.yml | 5 ++--- .github/workflows/validate.yml | 5 ++--- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c2229ac..92354f7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -18,10 +18,9 @@ jobs: token: ${{ secrets.FLOWCORE_MACHINE_GITHUB_TOKEN }} submodules: true - name: Extract version from deno.json - uses: sergeysova/jq-action@v2 id: version - with: - cmd: "jq .version deno.json -r" + run: | + echo "value=$(python3 -c 'import json; print(json.load(open("deno.json"))["version"])')" >> "$GITHUB_OUTPUT" - name: Show my version run: 'echo "version ${{ steps.version.outputs.value }}"' diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index a08dff5..8558249 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -13,10 +13,9 @@ jobs: steps: - uses: actions/checkout@v3 - name: Extract package name from deno.json - uses: sergeysova/jq-action@v2 id: package - with: - cmd: "jq .name deno.json -r" + run: | + echo "value=$(python3 -c 'import json; print(json.load(open("deno.json"))["name"])')" >> "$GITHUB_OUTPUT" - name: Setup Deno2 environment uses: denoland/setup-deno@v2 with: diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 9adf630..02ec291 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -41,10 +41,9 @@ jobs: token: ${{ secrets.FLOWCORE_MACHINE_GITHUB_TOKEN }} submodules: true - name: Extract version from deno.json - uses: sergeysova/jq-action@v2 id: version - with: - cmd: "jq .version deno.json -r" + run: | + echo "value=$(python3 -c 'import json; print(json.load(open("deno.json"))["version"])')" >> "$GITHUB_OUTPUT" - name: Show my version run: 'echo "version ${{ steps.version.outputs.value }}"'