diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ac36b9..fc4f513 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,161 +2,176 @@ ## [2.0.1](https://github.com/flowcore-io/flowcore-pathways/compare/v2.0.0...v2.0.1) (2026-04-13) - ### Bug Fixes -* avoid virtual auto-provision startup deadlock ([#73](https://github.com/flowcore-io/flowcore-pathways/issues/73)) ([d6d866c](https://github.com/flowcore-io/flowcore-pathways/commit/d6d866cf814ab963ef1f9f42b0d35d12c8a486d9)) +- avoid virtual auto-provision startup deadlock ([#73](https://github.com/flowcore-io/flowcore-pathways/issues/73)) + ([d6d866c](https://github.com/flowcore-io/flowcore-pathways/commit/d6d866cf814ab963ef1f9f42b0d35d12c8a486d9)) ## [2.0.0](https://github.com/flowcore-io/flowcore-pathways/compare/v1.0.0...v2.0.0) (2026-04-13) - ### ⚠ BREAKING CHANGES -* make auto-provision runtime-aware ([#71](https://github.com/flowcore-io/flowcore-pathways/issues/71)) +- make auto-provision runtime-aware ([#71](https://github.com/flowcore-io/flowcore-pathways/issues/71)) ### Features -* make auto-provision runtime-aware ([#71](https://github.com/flowcore-io/flowcore-pathways/issues/71)) ([1e82da2](https://github.com/flowcore-io/flowcore-pathways/commit/1e82da2979c8c414cdf2f1b82750a0ced6994129)) +- make auto-provision runtime-aware ([#71](https://github.com/flowcore-io/flowcore-pathways/issues/71)) + ([1e82da2](https://github.com/flowcore-io/flowcore-pathways/commit/1e82da2979c8c414cdf2f1b82750a0ced6994129)) ## [1.0.0](https://github.com/flowcore-io/flowcore-pathways/compare/v0.24.0...v1.0.0) (2026-04-10) - ### ⚠ BREAKING CHANGES -* poll-based command queue for virtual pathway resets ([#69](https://github.com/flowcore-io/flowcore-pathways/issues/69)) +- poll-based command queue for virtual pathway resets + ([#69](https://github.com/flowcore-io/flowcore-pathways/issues/69)) ### Features -* poll-based command queue for virtual pathway resets ([#69](https://github.com/flowcore-io/flowcore-pathways/issues/69)) ([133b047](https://github.com/flowcore-io/flowcore-pathways/commit/133b0471c38a28011ad89e2ec7c86ba50be74839)) +- poll-based command queue for virtual pathway resets + ([#69](https://github.com/flowcore-io/flowcore-pathways/issues/69)) + ([133b047](https://github.com/flowcore-io/flowcore-pathways/commit/133b0471c38a28011ad89e2ec7c86ba50be74839)) ## [0.24.0](https://github.com/flowcore-io/flowcore-pathways/compare/v0.23.1...v0.24.0) (2026-04-08) - ### Features -* configurable log levels for pulse and provision events ([#67](https://github.com/flowcore-io/flowcore-pathways/issues/67)) ([214752f](https://github.com/flowcore-io/flowcore-pathways/commit/214752fcddd4d84ab9305208fc043adc91e3bef5)) +- configurable log levels for pulse and provision events + ([#67](https://github.com/flowcore-io/flowcore-pathways/issues/67)) + ([214752f](https://github.com/flowcore-io/flowcore-pathways/commit/214752fcddd4d84ab9305208fc043adc91e3bef5)) ## [0.23.1](https://github.com/flowcore-io/flowcore-pathways/compare/v0.23.0...v0.23.1) (2026-04-07) - ### Bug Fixes -* pulseUrl default should be base URL not full endpoint path ([#65](https://github.com/flowcore-io/flowcore-pathways/issues/65)) ([a003536](https://github.com/flowcore-io/flowcore-pathways/commit/a00353689bdbd25e359195bb786f45e9137772d6)) +- pulseUrl default should be base URL not full endpoint path + ([#65](https://github.com/flowcore-io/flowcore-pathways/issues/65)) + ([a003536](https://github.com/flowcore-io/flowcore-pathways/commit/a00353689bdbd25e359195bb786f45e9137772d6)) ## [0.23.0](https://github.com/flowcore-io/flowcore-pathways/compare/v0.22.1...v0.23.0) (2026-04-07) - ### Features -* add pathwayLabels for virtual pathway metadata ([#63](https://github.com/flowcore-io/flowcore-pathways/issues/63)) ([8f0048a](https://github.com/flowcore-io/flowcore-pathways/commit/8f0048a74f3d89f5ace47fa68e160fd880799d33)) +- add pathwayLabels for virtual pathway metadata ([#63](https://github.com/flowcore-io/flowcore-pathways/issues/63)) + ([8f0048a](https://github.com/flowcore-io/flowcore-pathways/commit/8f0048a74f3d89f5ace47fa68e160fd880799d33)) ## [0.22.1](https://github.com/flowcore-io/flowcore-pathways/compare/v0.22.0...v0.22.1) (2026-04-07) - ### Bug Fixes -* format ApiKey header as keyId:fullKey for CP auth ([#61](https://github.com/flowcore-io/flowcore-pathways/issues/61)) ([e49c17a](https://github.com/flowcore-io/flowcore-pathways/commit/e49c17a8ca6cd8e38150222a8ceaa6e77fca1f69)) +- format ApiKey header as keyId:fullKey for CP auth ([#61](https://github.com/flowcore-io/flowcore-pathways/issues/61)) + ([e49c17a](https://github.com/flowcore-io/flowcore-pathways/commit/e49c17a8ca6cd8e38150222a8ceaa6e77fca1f69)) ## [0.22.0](https://github.com/flowcore-io/flowcore-pathways/compare/v0.21.1...v0.22.0) (2026-04-02) - ### Features -* auto-provisioned virtual pathways with pulse + reset ([#59](https://github.com/flowcore-io/flowcore-pathways/issues/59)) ([82ff956](https://github.com/flowcore-io/flowcore-pathways/commit/82ff9563abd7a82bc4ec28943794c6528fe7a009)) +- auto-provisioned virtual pathways with pulse + reset + ([#59](https://github.com/flowcore-io/flowcore-pathways/issues/59)) + ([82ff956](https://github.com/flowcore-io/flowcore-pathways/commit/82ff9563abd7a82bc4ec28943794c6528fe7a009)) ## [0.21.1](https://github.com/flowcore-io/flowcore-pathways/compare/v0.21.0...v0.21.1) (2026-03-31) - ### Bug Fixes -* restart individual flow type pumps on error with exponential backoff ([#57](https://github.com/flowcore-io/flowcore-pathways/issues/57)) ([2644e71](https://github.com/flowcore-io/flowcore-pathways/commit/2644e711713fe1694522a95708e37de3872008f0)) +- restart individual flow type pumps on error with exponential backoff + ([#57](https://github.com/flowcore-io/flowcore-pathways/issues/57)) + ([2644e71](https://github.com/flowcore-io/flowcore-pathways/commit/2644e711713fe1694522a95708e37de3872008f0)) ## [0.21.0](https://github.com/flowcore-io/flowcore-pathways/compare/v0.20.1...v0.21.0) (2026-03-30) - ### Features -* bump SDK + data-pump for fc_ key websocket support ([#55](https://github.com/flowcore-io/flowcore-pathways/issues/55)) ([7e0676f](https://github.com/flowcore-io/flowcore-pathways/commit/7e0676f8e2685c8049ae4305bb1c6932834b5abc)) +- bump SDK + data-pump for fc_ key websocket support ([#55](https://github.com/flowcore-io/flowcore-pathways/issues/55)) + ([7e0676f](https://github.com/flowcore-io/flowcore-pathways/commit/7e0676f8e2685c8049ae4305bb1c6932834b5abc)) ## [0.20.1](https://github.com/flowcore-io/flowcore-pathways/compare/v0.20.0...v0.20.1) (2026-03-27) - ### Bug Fixes -* use explicit pulse URL instead of baseUrl for pulse reporting ([4605c34](https://github.com/flowcore-io/flowcore-pathways/commit/4605c34426804c5fb6081bbe0463dee50cea13d1)) +- use explicit pulse URL instead of baseUrl for pulse reporting + ([4605c34](https://github.com/flowcore-io/flowcore-pathways/commit/4605c34426804c5fb6081bbe0463dee50cea13d1)) ## [0.20.0](https://github.com/flowcore-io/flowcore-pathways/compare/v0.19.0...v0.20.0) (2026-03-26) - ### Features -* wire pulse config through PathwayPump to FlowcoreDataPump ([c0aa851](https://github.com/flowcore-io/flowcore-pathways/commit/c0aa8512a9c78e562d3cd78fe3bb7be6c5566a0c)) +- wire pulse config through PathwayPump to FlowcoreDataPump + ([c0aa851](https://github.com/flowcore-io/flowcore-pathways/commit/c0aa8512a9c78e562d3cd78fe3bb7be6c5566a0c)) ## [0.19.0](https://github.com/flowcore-io/flowcore-pathways/compare/v0.18.0...v0.19.0) (2026-03-24) - ### Features -* add cluster-aware pump reset functionality ([#51](https://github.com/flowcore-io/flowcore-pathways/issues/51)) ([03f110b](https://github.com/flowcore-io/flowcore-pathways/commit/03f110b2a3e63f1ce12b05993e3da042cb2a7199)) +- add cluster-aware pump reset functionality ([#51](https://github.com/flowcore-io/flowcore-pathways/issues/51)) + ([03f110b](https://github.com/flowcore-io/flowcore-pathways/commit/03f110b2a3e63f1ce12b05993e3da042cb2a7199)) ## [0.18.0](https://github.com/flowcore-io/flowcore-pathways/compare/v0.17.5...v0.18.0) (2026-03-23) - ### Features -* add Node.js/Bun cluster transport with runtime auto-detection ([#49](https://github.com/flowcore-io/flowcore-pathways/issues/49)) ([31ebc99](https://github.com/flowcore-io/flowcore-pathways/commit/31ebc9992ad41f9447258dc6af550881b0da34be)) +- add Node.js/Bun cluster transport with runtime auto-detection + ([#49](https://github.com/flowcore-io/flowcore-pathways/issues/49)) + ([31ebc99](https://github.com/flowcore-io/flowcore-pathways/commit/31ebc9992ad41f9447258dc6af550881b0da34be)) ## [0.17.5](https://github.com/flowcore-io/flowcore-pathways/compare/v0.17.4...v0.17.5) (2026-03-21) - ### Bug Fixes -* auto-start pump when instance becomes cluster leader ([#47](https://github.com/flowcore-io/flowcore-pathways/issues/47)) ([fe74d59](https://github.com/flowcore-io/flowcore-pathways/commit/fe74d592ddd97dcacae79a3e017575a0e980c269)) +- auto-start pump when instance becomes cluster leader + ([#47](https://github.com/flowcore-io/flowcore-pathways/issues/47)) + ([fe74d59](https://github.com/flowcore-io/flowcore-pathways/commit/fe74d592ddd97dcacae79a3e017575a0e980c269)) ## [0.17.4](https://github.com/flowcore-io/flowcore-pathways/compare/v0.17.3...v0.17.4) (2026-03-21) - ### Bug Fixes -* register cluster instances with full ws:// URL including port ([#45](https://github.com/flowcore-io/flowcore-pathways/issues/45)) ([0cb1493](https://github.com/flowcore-io/flowcore-pathways/commit/0cb1493618bb4ed5c049336c23e9bf28c3b2079d)) +- register cluster instances with full ws:// URL including port + ([#45](https://github.com/flowcore-io/flowcore-pathways/issues/45)) + ([0cb1493](https://github.com/flowcore-io/flowcore-pathways/commit/0cb1493618bb4ed5c049336c23e9bf28c3b2079d)) ## [0.17.3](https://github.com/flowcore-io/flowcore-pathways/compare/v0.17.2...v0.17.3) (2026-03-20) - ### Bug Fixes -* suppress PostgreSQL NOTICE messages from CREATE TABLE IF NOT EXISTS ([#43](https://github.com/flowcore-io/flowcore-pathways/issues/43)) ([3def1d9](https://github.com/flowcore-io/flowcore-pathways/commit/3def1d9c6e9711f529010d77eaedd0b17f2799f0)) +- suppress PostgreSQL NOTICE messages from CREATE TABLE IF NOT EXISTS + ([#43](https://github.com/flowcore-io/flowcore-pathways/issues/43)) + ([3def1d9](https://github.com/flowcore-io/flowcore-pathways/commit/3def1d9c6e9711f529010d77eaedd0b17f2799f0)) ## [0.17.2](https://github.com/flowcore-io/flowcore-pathways/compare/v0.17.1...v0.17.2) (2026-03-20) - ### Bug Fixes -* use lightweight TenantTranslateNameToIdCommand instead of TenantFetchCommand ([466b040](https://github.com/flowcore-io/flowcore-pathways/commit/466b0400290644e9fb5c870c2fef04ec177f9719)) +- use lightweight TenantTranslateNameToIdCommand instead of TenantFetchCommand + ([466b040](https://github.com/flowcore-io/flowcore-pathways/commit/466b0400290644e9fb5c870c2fef04ec177f9719)) ## [0.17.1](https://github.com/flowcore-io/flowcore-pathways/compare/v0.17.0...v0.17.1) (2026-03-19) - ### Bug Fixes -* migrate npm publish to blacksmith trusted auth (no classic tokens) ([04764d7](https://github.com/flowcore-io/flowcore-pathways/commit/04764d7a6f6e216324ce65edd2ba0cc6a097f22f)) +- migrate npm publish to blacksmith trusted auth (no classic tokens) + ([04764d7](https://github.com/flowcore-io/flowcore-pathways/commit/04764d7a6f6e216324ce65edd2ba0cc6a097f22f)) ## [0.17.0](https://github.com/flowcore-io/flowcore-pathways/compare/v0.16.4...v0.17.0) (2026-03-19) - ### Features -* add cluster mode and data pump integration ([231384a](https://github.com/flowcore-io/flowcore-pathways/commit/231384a8685993040c1d2b637822499f9f756000)) -* add cluster mode and data pump integration ([9ca0fcb](https://github.com/flowcore-io/flowcore-pathways/commit/9ca0fcb10a9e7c8a6f266dc2c568e30a0b4e9672)) -* add declarative registration and auto-provisioning ([ee7f36a](https://github.com/flowcore-io/flowcore-pathways/commit/ee7f36ae87fb3fd901b9a9f918ffc82b19008939)) - +- add cluster mode and data pump integration + ([231384a](https://github.com/flowcore-io/flowcore-pathways/commit/231384a8685993040c1d2b637822499f9f756000)) +- add cluster mode and data pump integration + ([9ca0fcb](https://github.com/flowcore-io/flowcore-pathways/commit/9ca0fcb10a9e7c8a6f266dc2c568e30a0b4e9672)) +- add declarative registration and auto-provisioning + ([ee7f36a](https://github.com/flowcore-io/flowcore-pathways/commit/ee7f36ae87fb3fd901b9a9f918ffc82b19008939)) ### Bug Fixes -* replace inline npm: specifiers with import map references ([470732f](https://github.com/flowcore-io/flowcore-pathways/commit/470732fac377ed5d3f54d3ef852d792dbb58cef6)) -* resolve lint errors in cluster manager ([a809b6a](https://github.com/flowcore-io/flowcore-pathways/commit/a809b6a1daee16b276ffdb9c88a9f31233592275)) -* resolve pre-existing Buffer/BlobPart type error in file pathway write ([ba08aa3](https://github.com/flowcore-io/flowcore-pathways/commit/ba08aa334eb51c86894c6a39ee2010c22fc16dc5)) -* use runtime transport abstraction for cross-platform npm build ([9e4f7b9](https://github.com/flowcore-io/flowcore-pathways/commit/9e4f7b90ce45f1f8bfae913aeeb970971fad5dea)) +- replace inline npm: specifiers with import map references + ([470732f](https://github.com/flowcore-io/flowcore-pathways/commit/470732fac377ed5d3f54d3ef852d792dbb58cef6)) +- resolve lint errors in cluster manager + ([a809b6a](https://github.com/flowcore-io/flowcore-pathways/commit/a809b6a1daee16b276ffdb9c88a9f31233592275)) +- resolve pre-existing Buffer/BlobPart type error in file pathway write + ([ba08aa3](https://github.com/flowcore-io/flowcore-pathways/commit/ba08aa334eb51c86894c6a39ee2010c22fc16dc5)) +- use runtime transport abstraction for cross-platform npm build + ([9e4f7b9](https://github.com/flowcore-io/flowcore-pathways/commit/9e4f7b90ce45f1f8bfae913aeeb970971fad5dea)) ## [0.16.4](https://github.com/flowcore-io/flowcore-pathways/compare/v0.16.3...v0.16.4) (2025-07-25) diff --git a/README.md b/README.md index 410abd9..e9b8927 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ Pathways helps you build event-driven applications with type-safe pathways for p - [Core Concepts](#core-concepts) - [Usage](#usage) - [Creating a Pathways Builder](#creating-a-pathways-builder) + - [Runtime Defaults and Auto-Provisioning](#runtime-defaults-and-auto-provisioning) + - [Pump Concurrency](#pump-concurrency) - [Registering Pathways](#registering-pathways) - [Handling Events](#handling-events) - [Writing Events](#writing-events) @@ -19,7 +21,6 @@ Pathways helps you build event-driven applications with type-safe pathways for p - [HTTP Server Integration](#http-server-integration) - [Persistence Options](#persistence-options) - [Advanced Usage](#advanced-usage) - - [Runtime Defaults and Auto-Provisioning](#runtime-defaults-and-auto-provisioning) - [Auditing](#auditing) - [Custom Loggers](#custom-loggers) - [Retry Mechanisms](#retry-mechanisms) @@ -131,27 +132,55 @@ const pathways = new PathwaysBuilder({ ### Runtime Defaults and Auto-Provisioning -`PathwaysBuilder` can now drive different startup behavior for development and production: +`PathwaysBuilder` drives different startup behavior based on `runtimeEnv` (auto-detected from `NODE_ENV` when omitted): + +| `runtimeEnv` | `pathwayMode` default | Shared resources | Pathway registration | Local pump | +| ------------- | --------------------- | ---------------- | -------------------------------------- | ------------------------------------ | +| `production` | `managed` | provisioned | opt-in (`autoProvision.pathway: true`) | not started (control plane delivers) | +| `development` | `virtual` | provisioned | opt-in | started (single instance) | +| `test` | `virtual` | skipped | skipped | started | + +> **Why `managed` in production?** Virtual cluster mode requires long-lived processes with stable networking, which +> breaks serverless runtimes such as Next.js on Vercel (port collisions, instrumentation hook behavior, non-leader pod +> timeouts). `managed` routes event delivery through the Flowcore control plane and is safe in every runtime. -- `development`: starts the local in-process pump and only provisions shared Flowcore resources such as the data core, - flow types, and event types -- `production + virtual`: requires cluster mode and auto-provisions a virtual pathway by name -- `production + managed`: auto-provisions a managed pathway by name and does not start a local pump +#### Granular `autoProvision` + +Pass an `AutoProvisionConfig` to turn individual provisioning stages on or off: ```typescript +import { PathwaysBuilder } from "@flowcore/pathways" + const pathways = new PathwaysBuilder({ baseUrl: "https://api.flowcore.io", tenant: "your-tenant", dataCore: "your-data-core", apiKey: process.env.FLOWCORE_API_KEY!, - runtimeEnv: process.env.NODE_ENV === "production" ? "production" : "development", + runtimeEnv: "production", pathwayName: "orders-service", - pathwayMode: "virtual", // default - defaultAutoProvision: true, // default + // pathwayMode defaults to "managed" in production + autoProvision: { + dataCore: true, // create/update the data core (default: true) + flowType: true, // create/update registered flow types (default: true) + eventType: true, // create/update registered event types (default: true) + pathway: true, // upsert the by-name pathway instance (default: false) + }, +}) +``` + +Omitted fields fall back to resources-on / pathway-off, so most deployments only need to set `pathway: true` when they +want the by-name pathway registration. + +To disable everything (for CI or when resources are managed elsewhere): + +```typescript +const pathways = new PathwaysBuilder({ + /* ... */ + autoProvision: false, // or per-stage: { dataCore: false, flowType: false, eventType: false, pathway: false } }) ``` -For managed production delivery, provide a transform endpoint and leave event fetching to the control plane: +#### Managed production example ```typescript const pathways = new PathwaysBuilder({ @@ -161,19 +190,45 @@ const pathways = new PathwaysBuilder({ apiKey: process.env.FLOWCORE_API_KEY!, runtimeEnv: "production", pathwayName: "orders-service", - pathwayMode: "managed", + autoProvision: { pathway: true }, // register the managed pathway instance managedConfig: { endpointUrl: "https://app.example.com/api/flowcore", - authHeaders: { - authorization: `Bearer ${process.env.TRANSFORM_TOKEN!}`, - }, + authHeaders: { authorization: `Bearer ${process.env.TRANSFORM_TOKEN!}` }, sizeClass: "medium", }, }) ``` -To disable all remote provisioning and keep startup fully manual, set `defaultAutoProvision: false` or pass -`autoProvision: false` to `startPump()`. +#### Deprecated: `defaultAutoProvision` + +`defaultAutoProvision: boolean` still works but is deprecated — prefer `autoProvision`. Mapping: + +- `true` → `{ dataCore: true, flowType: true, eventType: true, pathway: false }` +- `false` → `{ dataCore: false, flowType: false, eventType: false, pathway: false }` + +### Pump Concurrency + +Control how many events each pump processes in parallel via `startPump({ concurrency })`. Accepts a number (shared +default) or a `PumpConcurrencyConfig` with per-flow-type overrides: + +```typescript +// Shared default across every flow type +await pathways.startPump({ concurrency: 4 }) + +// Per-flow-type overrides — unlisted flow types fall back to `default` (or 1) +await pathways.startPump({ + concurrency: { + default: 2, + byFlowType: { + orders: 8, + audit: 1, + }, + }, +}) +``` + +Omit `concurrency` to keep the default of 1 per flow type. `startPump()` also accepts a per-call `autoProvision` +override (same shape as the builder-level option) for overriding provisioning behavior at a specific call site. ### Registering Pathways diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index d2336e2..1d588bf 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -21,9 +21,53 @@ import type { } from "./types.ts" import type { PathwayClusterOptions } from "./cluster/types.ts" import { ClusterManager } from "./cluster/cluster-manager.ts" -import type { PathwayPumpOptions, PumpState } from "./pump/types.ts" +import type { AutoProvisionConfig, PathwayPumpOptions, PumpState } from "./pump/types.ts" import { PathwayPump } from "./pump/pathway-pump.ts" import { PathwayProvisioner, type ProvisionerRegistration } from "./provisioner.ts" + +export type { AutoProvisionConfig } from "./pump/types.ts" + +/** + * Defaults for each auto-provisioning stage — resources on, pathway registration off. + * + * These defaults deliberately skip the by-name pathway upsert so most deployments don't + * accidentally hit the control plane at startup; opt in via `autoProvision.pathway: true`. + */ +const DEFAULT_AUTO_PROVISION: Required = { + dataCore: true, + flowType: true, + eventType: true, + pathway: false, +} + +/** + * Resolve a user-supplied `autoProvision` / `defaultAutoProvision` value into a fully + * populated `Required`. + * + * Resolution rules (first match wins): + * 1. `autoProvision` object → merge with `DEFAULT_AUTO_PROVISION` + * 2. `autoProvision` boolean → `true` → defaults; `false` → all-false + * 3. `defaultAutoProvision === false` → all-false + * 4. otherwise → `DEFAULT_AUTO_PROVISION` + */ +function resolveAutoProvision( + autoProvision: boolean | AutoProvisionConfig | undefined, + defaultAutoProvision?: boolean, +): Required { + if (typeof autoProvision === "object" && autoProvision !== null) { + return { ...DEFAULT_AUTO_PROVISION, ...autoProvision } + } + if (autoProvision === true) { + return { ...DEFAULT_AUTO_PROVISION } + } + if (autoProvision === false) { + return { dataCore: false, flowType: false, eventType: false, pathway: false } + } + if (defaultAutoProvision === false) { + return { dataCore: false, flowType: false, eventType: false, pathway: false } + } + return { ...DEFAULT_AUTO_PROVISION } +} import { AUDIT_ENTITY_ID, AUDIT_ENTITY_TYPE, @@ -176,6 +220,16 @@ export interface PathwaysBuilderConfig { commandPollingIntervalMs?: number runtimeEnv?: PathwayRuntimeEnv pathwayMode?: PathwayMode + /** + * Granular auto-provisioning toggles. Omitted fields fall back to resources-on, + * pathway-off defaults — see `AutoProvisionConfig`. + */ + autoProvision?: AutoProvisionConfig + /** + * @deprecated Prefer `autoProvision`. Mapping: + * - `true` → `{ dataCore: true, flowType: true, eventType: true, pathway: false }` + * - `false` → `{ dataCore: false, flowType: false, eventType: false, pathway: false }` + */ defaultAutoProvision?: boolean managedConfig?: ManagedPathwayConfig } @@ -355,7 +409,7 @@ export class PathwaysBuilder< private readonly commandPollingIntervalMs: number private readonly runtimeEnv: PathwayRuntimeEnv private readonly pathwayMode: PathwayMode - private readonly defaultAutoProvision: boolean + private readonly autoProvision: Required private readonly managedConfig?: ManagedPathwayConfig private pathwayId?: string @@ -364,7 +418,7 @@ export class PathwaysBuilder< private pathwayPump: PathwayPump | null = null private commandPoller: CommandPoller | null = null private clusterBypassProcess = false - private currentPumpAutoProvision = false + private currentPumpProvisionsPathway = false private currentPumpUsesExplicitPulse = false private currentPumpUsesAutoPulse = false @@ -405,6 +459,7 @@ export class PathwaysBuilder< commandPollingIntervalMs, runtimeEnv, pathwayMode, + autoProvision, defaultAutoProvision, managedConfig, }: PathwaysBuilderConfig) { @@ -438,8 +493,10 @@ export class PathwaysBuilder< this.pulseIntervalMs = pulseIntervalMs ?? 30_000 this.commandPollingIntervalMs = commandPollingIntervalMs ?? 5_000 this.runtimeEnv = normalizeRuntimeEnv(runtimeEnv ?? process.env.NODE_ENV) - this.pathwayMode = pathwayMode ?? "virtual" - this.defaultAutoProvision = defaultAutoProvision ?? true + // Env-aware default: production → "managed" (control-plane delivery, serverless-safe), + // development/test → "virtual" (single-instance local pump). + this.pathwayMode = pathwayMode ?? (this.runtimeEnv === "production" ? "managed" : "virtual") + this.autoProvision = resolveAutoProvision(autoProvision, defaultAutoProvision) this.managedConfig = managedConfig if (enableSessionUserResolvers) { @@ -453,7 +510,7 @@ export class PathwaysBuilder< pathwayTimeoutMs, runtimeEnv: this.runtimeEnv, pathwayMode: this.pathwayMode, - defaultAutoProvision: this.defaultAutoProvision, + autoProvision: this.autoProvision, }) this.webhookBuilderFactory = new WebhookBuilder({ @@ -1407,7 +1464,7 @@ export class PathwaysBuilder< return } - if (this.currentPumpAutoProvision && !this.pathwayId) { + if (this.currentPumpProvisionsPathway && !this.pathwayId) { try { await this.registerPathwayInstance(this.buildRegistrations()) await this.applyAutoPulseConfig() @@ -1447,7 +1504,9 @@ export class PathwaysBuilder< }) } - private async provisionSharedResources(): Promise { + private async provisionSharedResources( + skipFlags: { skipDataCore?: boolean; skipFlowTypes?: boolean; skipEventTypes?: boolean } = {}, + ): Promise { const registrations = this.buildRegistrations() const provisioner = new PathwayProvisioner({ @@ -1459,6 +1518,9 @@ export class PathwaysBuilder< dataCoreDeleteProtection: this.dataCoreDeleteProtection, registrations, logger: this.logger, + skipDataCore: skipFlags.skipDataCore, + skipFlowTypes: skipFlags.skipFlowTypes, + skipEventTypes: skipFlags.skipEventTypes, }) await provisioner.provision() @@ -1630,30 +1692,46 @@ export class PathwaysBuilder< throw new Error("Pump already started") } - const autoProvision = options.autoProvision ?? this.defaultAutoProvision - this.currentPumpAutoProvision = autoProvision + // Resolve effective auto-provision config: per-call override wins over builder-level setting. + const ap = options.autoProvision != null ? resolveAutoProvision(options.autoProvision) : this.autoProvision + // Track pathway-registration intent separately so bootstrapLeaderPump can pick it up on leadership gain. + this.currentPumpProvisionsPathway = ap.pathway this.currentPumpUsesExplicitPulse = Boolean(options.pulse) this.currentPumpUsesAutoPulse = false - if (autoProvision) { - if (this.runtimeEnv === "production") { - if (this.pathwayMode === "virtual" && !this.isClusterActive) { - throw new Error("Cluster mode must be started before production virtual pump startup") - } + if (this.runtimeEnv === "test") { + this.logger.info("Skipping remote auto-provisioning in test runtime") + } else { + if (this.runtimeEnv === "production" && this.pathwayMode === "virtual" && !this.isClusterActive) { + throw new Error("Cluster mode must be started before production virtual pump startup") + } - this.logger.info("Auto-provisioning Flowcore resources for production startup", { + const shouldProvisionResources = ap.dataCore || ap.flowType || ap.eventType + let registrations: ProvisionerRegistration[] | null = null + if (shouldProvisionResources) { + this.logger.info("Auto-provisioning Flowcore resources", { + runtimeEnv: this.runtimeEnv, pathwayMode: this.pathwayMode, + autoProvision: ap, }) - if (this.pathwayMode === "virtual") { - await this.provisionSharedResources() - } else { - await this.provision() + registrations = await this.provisionSharedResources({ + skipDataCore: !ap.dataCore, + skipFlowTypes: !ap.flowType, + skipEventTypes: !ap.eventType, + }) + } + + if (ap.pathway) { + // In production+virtual, pathway registration must be deferred until the leader is ready + // (bootstrapLeaderPump performs it after pump start). Everywhere else it happens upfront. + const deferToLeaderBootstrap = this.runtimeEnv === "production" && this.pathwayMode === "virtual" + if (!deferToLeaderBootstrap) { + this.logger.info("Registering pathway instance", { + runtimeEnv: this.runtimeEnv, + pathwayMode: this.pathwayMode, + }) + await this.registerPathwayInstance(registrations ?? this.buildRegistrations()) } - } else if (this.runtimeEnv === "development") { - this.logger.info("Auto-provisioning shared Flowcore resources for development startup") - await this.provisionSharedResources() - } else { - this.logger.info("Skipping remote auto-provisioning in test runtime") } } @@ -1727,14 +1805,14 @@ export class PathwaysBuilder< async stopPump(): Promise { this.stopCommandPoller() if (!this.pathwayPump) { - this.currentPumpAutoProvision = false + this.currentPumpProvisionsPathway = false this.currentPumpUsesExplicitPulse = false this.currentPumpUsesAutoPulse = false return } await this.pathwayPump.stop() this.pathwayPump = null - this.currentPumpAutoProvision = false + this.currentPumpProvisionsPathway = false this.currentPumpUsesExplicitPulse = false this.currentPumpUsesAutoPulse = false this.logger.info("Pump stopped") diff --git a/src/pathways/cluster/cluster-manager.ts b/src/pathways/cluster/cluster-manager.ts index 763d909..d87564a 100644 --- a/src/pathways/cluster/cluster-manager.ts +++ b/src/pathways/cluster/cluster-manager.ts @@ -92,7 +92,8 @@ export class ClusterManager { private eventHandler: ((pathway: string, event: FlowcoreEvent) => Promise) | null = null private leadershipChangeHandler: ((isLeader: boolean) => void) | null = null private resetHandler: ((position?: PumpState) => Promise) | null = null - private pendingResets: Map void; reject: (error: Error) => void; sentAt: number }> = new Map() + private pendingResets: Map void; reject: (error: Error) => void; sentAt: number }> = + new Map() constructor(options: PathwayClusterOptions, logger?: Logger) { this.coordinator = options.coordinator diff --git a/src/pathways/provisioner.ts b/src/pathways/provisioner.ts index f360b5d..ee7d135 100644 --- a/src/pathways/provisioner.ts +++ b/src/pathways/provisioner.ts @@ -39,6 +39,20 @@ export interface PathwayProvisionerOptions { logger?: Logger /** Override FlowcoreClient for testing */ clientFactory?: (apiKey: string) => FlowcoreClient + /** + * Skip data core create/update — still resolves the data core id via fetch so + * downstream stages can run. Fails loudly if the data core doesn't exist and + * no description was configured (same error as the non-skip path). + */ + skipDataCore?: boolean + /** + * Skip flow type create/update. When event types are still provisioned, flow + * type ids are resolved via `FlowTypeListCommand` (read-only). When event + * types are skipped too, the flow type list fetch is skipped entirely. + */ + skipFlowTypes?: boolean + /** Skip the event type loop entirely (no list/fetch/create/update). */ + skipEventTypes?: boolean } /** @@ -57,6 +71,9 @@ export class PathwayProvisioner { private readonly registrations: ProvisionerRegistration[] private readonly logger: Logger private readonly clientFactory: (apiKey: string) => FlowcoreClient + private readonly skipDataCore: boolean + private readonly skipFlowTypes: boolean + private readonly skipEventTypes: boolean constructor(options: PathwayProvisionerOptions) { this.tenant = options.tenant @@ -68,34 +85,55 @@ export class PathwayProvisioner { this.registrations = options.registrations this.logger = options.logger ?? new NoopLogger() this.clientFactory = options.clientFactory ?? ((apiKey: string) => new FlowcoreClient({ apiKey })) + this.skipDataCore = options.skipDataCore ?? false + this.skipFlowTypes = options.skipFlowTypes ?? false + this.skipEventTypes = options.skipEventTypes ?? false } /** * Run the provisioning flow: * 1. Fetch tenant → tenantId - * 2. Fetch or create data core - * 3. For each unique flow type: fetch or create - * 4. For each event type: fetch or create - * 5. Update descriptions where they differ + * 2. Fetch or create data core (read-only when `skipDataCore` is set) + * 3. For each unique flow type: fetch or create (read-only when `skipFlowTypes`) + * 4. For each event type: fetch or create (entirely skipped when `skipEventTypes`) + * 5. Update descriptions where they differ (unless skipped) */ async provision(): Promise { const client = this.clientFactory(this.apiKey) - this.logger.info("Starting provisioning", { tenant: this.tenant, dataCore: this.dataCore }) + this.logger.info("Starting provisioning", { + tenant: this.tenant, + dataCore: this.dataCore, + skipDataCore: this.skipDataCore, + skipFlowTypes: this.skipFlowTypes, + skipEventTypes: this.skipEventTypes, + }) // Step 1: Fetch tenant const tenant = await client.execute(new TenantTranslateNameToIdCommand({ tenant: this.tenant })) const tenantId = tenant.id this.logger.info("Tenant resolved", { tenantId }) - // Step 2: Provision data core + // Step 2: Provision (or resolve) data core const dataCoreId = await this.provisionDataCore(client, tenantId) this.logger.info("Data core resolved", { dataCoreId }) - // Step 3: Provision flow types + // Short-circuit when both flow-type and event-type stages are skipped — + // no need to list flow types in that case. + if (this.skipFlowTypes && this.skipEventTypes) { + this.logger.info("Provisioning complete (flow types + event types skipped)") + return + } + + // Step 3: Provision (or resolve) flow types const flowTypeIds = await this.provisionFlowTypes(client, dataCoreId) this.logger.info("Flow types resolved", { count: flowTypeIds.size }) + if (this.skipEventTypes) { + this.logger.info("Provisioning complete (event types skipped)") + return + } + // Step 4: Provision event types await this.provisionEventTypes(client, flowTypeIds) this.logger.info("Provisioning complete") @@ -115,8 +153,12 @@ export class PathwayProvisioner { } if (dataCore) { - // Data core exists — update description if provided and changed - if (this.dataCoreDescription !== undefined && dataCore.description !== this.dataCoreDescription) { + // Data core exists — update description if provided and changed (unless skipping). + if ( + !this.skipDataCore && + this.dataCoreDescription !== undefined && + dataCore.description !== this.dataCoreDescription + ) { this.logger.info("Updating data core description", { dataCoreId: dataCore.id, from: dataCore.description, @@ -137,6 +179,14 @@ export class PathwayProvisioner { ) } + if (this.skipDataCore) { + // Data core missing but create/update was skipped — can't resolve an id. + throw new Error( + `Data core "${this.dataCore}" not found and skipDataCore is set. ` + + `Pre-provision the data core or enable data core provisioning.`, + ) + } + this.logger.info("Creating data core", { name: this.dataCore }) const created = await client.execute( new DataCoreCreateCommand({ @@ -177,8 +227,8 @@ export class PathwayProvisioner { if (existingFt) { flowTypeIds.set(name, existingFt.id) - // Update description if provided and changed - if (description !== undefined && existingFt.description !== description) { + // Update description if provided and changed (unless skipping). + if (!this.skipFlowTypes && description !== undefined && existingFt.description !== description) { this.logger.info("Updating flow type description", { flowType: name, from: existingFt.description, @@ -186,13 +236,19 @@ export class PathwayProvisioner { }) await client.execute(new FlowTypeUpdateCommand({ flowTypeId: existingFt.id, description })) } - } else if (description !== undefined) { + } else if (!this.skipFlowTypes && description !== undefined) { // Create flow type this.logger.info("Creating flow type", { name, description }) const created = await client.execute( new FlowTypeCreateCommand({ dataCoreId, name, description }), ) flowTypeIds.set(name, created.id) + } else if (this.skipFlowTypes) { + // Flow type missing, create/update skipped — downstream event type stage cannot proceed. + throw new Error( + `Flow type "${name}" not found in data core and skipFlowTypes is set. ` + + `Pre-provision the flow type or enable flow type provisioning.`, + ) } else { throw new Error( `Flow type "${name}" not found in data core. ` + diff --git a/src/pathways/pump/pathway-pump.ts b/src/pathways/pump/pathway-pump.ts index 3510782..2d0e2af 100644 --- a/src/pathways/pump/pathway-pump.ts +++ b/src/pathways/pump/pathway-pump.ts @@ -3,6 +3,7 @@ import type { Logger } from "../logger.ts" import { NoopLogger } from "../logger.ts" import type { PathwayPumpOptions, + PumpConcurrencyConfig, PumpNotifierConfig, PumpState, PumpStateManager, @@ -26,6 +27,29 @@ type DataPumpConstructor = any const RESTART_BASE_MS = 1_000 const RESTART_MAX_MS = 30_000 +/** + * Normalize the user-facing `concurrency` option into a `Required`. + * + * Accepts: + * - `undefined` → `{ default: 1, byFlowType: {} }` + * - `number` → `{ default: n, byFlowType: {} }` + * - object → shallow copy, `default` falls back to `1`, `byFlowType` to `{}` + */ +function normalizeConcurrency( + concurrency: PathwayPumpOptions["concurrency"], +): Required { + if (typeof concurrency === "number") { + return { default: concurrency, byFlowType: {} } + } + if (concurrency && typeof concurrency === "object") { + return { + default: concurrency.default ?? 1, + byFlowType: { ...(concurrency.byFlowType ?? {}) }, + } + } + return { default: 1, byFlowType: {} } +} + /** * PathwayPump orchestrates data pump instances for auto-fetching events from Flowcore. * @@ -37,6 +61,7 @@ export class PathwayPump { private readonly notifier: PumpNotifierConfig private readonly bufferSize: number private readonly maxRedeliveryCount: number + private readonly concurrency: Required private readonly logger: Logger private pulseConfig?: { url: string @@ -67,6 +92,7 @@ export class PathwayPump { this.notifier = options.notifier ?? { type: "websocket" } this.bufferSize = options.bufferSize ?? 1000 this.maxRedeliveryCount = options.maxRedeliveryCount ?? 3 + this.concurrency = normalizeConcurrency(options.concurrency) this.logger = logger ?? new NoopLogger() this.pulseConfig = options.pulse } @@ -142,7 +168,7 @@ export class PathwayPump { }, stateManager, processor: { - concurrency: 1, + concurrency: this.concurrency.byFlowType[flowType] ?? this.concurrency.default, handler: async (events: FlowcoreEvent[]) => { for (const event of events) { const pathway = `${event.flowType}/${event.eventType}` diff --git a/src/pathways/pump/types.ts b/src/pathways/pump/types.ts index 8b1f1f2..a08eecf 100644 --- a/src/pathways/pump/types.ts +++ b/src/pathways/pump/types.ts @@ -1,5 +1,39 @@ import type { PostgresConfig } from "../postgres/index.ts" +/** + * Granular toggles for each provisioning stage. + * + * Omitted fields fall back to defaults: resources on, pathway instance off. + * + * @property dataCore Create/update the data core when `dataCoreDescription` is set. Default: true. + * @property flowType Create/update registered flow types. Default: true. + * @property eventType Create/update registered event types. Default: true. + * @property pathway Upsert the by-name pathway instance (virtual or managed). Default: false. + */ +export interface AutoProvisionConfig { + /** Create/update the data core when `dataCoreDescription` is set. Default: true. */ + dataCore?: boolean + /** Create/update registered flow types. Default: true. */ + flowType?: boolean + /** Create/update registered event types. Default: true. */ + eventType?: boolean + /** Upsert the by-name pathway instance (virtual or managed). Default: false. */ + pathway?: boolean +} + +/** + * Concurrency settings for event processing per pump. + * + * @property default Default concurrency applied to every flow type. Default: 1. + * @property byFlowType Per-flow-type overrides keyed by `flowType` name. + */ +export interface PumpConcurrencyConfig { + /** Default concurrency applied to every flow type. Default: 1. */ + default?: number + /** Per-flow-type overrides keyed by `flowType` name. */ + byFlowType?: Record +} + /** * Options for configuring the data pump */ @@ -8,8 +42,19 @@ export interface PathwayPumpOptions { notifier?: PumpNotifierConfig bufferSize?: number maxRedeliveryCount?: number - /** If true, applies the builder's environment-aware provisioning rules before startup */ - autoProvision?: boolean + /** + * Controls whether startup runs the builder's provisioning rules. + * + * Accepts a boolean (legacy) or an `AutoProvisionConfig` object for per-stage control. + * When omitted, the builder's constructor-level `autoProvision` / `defaultAutoProvision` + * settings are used. + */ + autoProvision?: boolean | AutoProvisionConfig + /** + * Concurrency for event processing: pass a number for a shared default, or an object + * for per-flow-type overrides. Missing flow types fall back to `default` (or 1). + */ + concurrency?: number | PumpConcurrencyConfig /** Optional pulse reporting to control plane */ pulse?: { /** Control plane API URL for pulse endpoint */ diff --git a/src/router/index.ts b/src/router/index.ts index 6442d0c..c24d4bf 100644 --- a/src/router/index.ts +++ b/src/router/index.ts @@ -195,7 +195,9 @@ export class PathwayRouter { _body: ResetCallbackBody, _providedSecret: string | null, ): { success: boolean; flowTypesReset: string[] } { - this.logger.warn("processReset is deprecated — virtual pathway commands are now poll-based. This method is a no-op.") + this.logger.warn( + "processReset is deprecated — virtual pathway commands are now poll-based. This method is a no-op.", + ) return { success: false, flowTypesReset: [] } } } diff --git a/tests/pathway-builder-config.test.ts b/tests/pathway-builder-config.test.ts index 7a1363f..d778b00 100644 --- a/tests/pathway-builder-config.test.ts +++ b/tests/pathway-builder-config.test.ts @@ -8,6 +8,16 @@ const baseOpts = { apiKey: "fc_testid_testsecret", } +type InternalBuilderShape = { + pathwayMode: "virtual" | "managed" + autoProvision: { dataCore: boolean; flowType: boolean; eventType: boolean; pathway: boolean } +} + +// deno-lint-ignore no-explicit-any +function inspect(builder: PathwaysBuilder): InternalBuilderShape { + return builder as unknown as InternalBuilderShape +} + Deno.test({ name: "PathwaysBuilder config — virtual pathway fields", sanitizeResources: false, @@ -69,5 +79,153 @@ Deno.test({ }) assertEquals(typeof builder, "object") }) + + await t.step("autoProvision object merges with resources-on/pathway-off defaults", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "development", + autoProvision: { pathway: true }, + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + assertEquals(builder.autoProvision, { + dataCore: true, + flowType: true, + eventType: true, + pathway: true, + }) + }) + + await t.step("autoProvision individual field overrides produce expected merge", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "development", + autoProvision: { dataCore: false, pathway: true }, + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + assertEquals(builder.autoProvision, { + dataCore: false, + flowType: true, + eventType: true, + pathway: true, + }) + }) + + await t.step("defaultAutoProvision=false maps to all-false", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "development", + defaultAutoProvision: false, + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + assertEquals(builder.autoProvision, { + dataCore: false, + flowType: false, + eventType: false, + pathway: false, + }) + }) + + await t.step("defaultAutoProvision=true maps to resources-on, pathway-off", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "development", + defaultAutoProvision: true, + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + assertEquals(builder.autoProvision, { + dataCore: true, + flowType: true, + eventType: true, + pathway: false, + }) + }) + + await t.step("both unset defaults to resources-on, pathway-off", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "development", + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + assertEquals(builder.autoProvision, { + dataCore: true, + flowType: true, + eventType: true, + pathway: false, + }) + }) + + await t.step("autoProvision overrides defaultAutoProvision when both are set", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "development", + defaultAutoProvision: false, + autoProvision: { pathway: true }, + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + // autoProvision object wins — resources back on, pathway explicitly on. + assertEquals(builder.autoProvision, { + dataCore: true, + flowType: true, + eventType: true, + pathway: true, + }) + }) + + await t.step("pathwayMode default is 'managed' in production", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "production", + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + assertEquals(builder.pathwayMode, "managed") + }) + + await t.step("pathwayMode default is 'virtual' in development", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "development", + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + assertEquals(builder.pathwayMode, "virtual") + }) + + await t.step("pathwayMode default is 'virtual' in test", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "test", + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + assertEquals(builder.pathwayMode, "virtual") + }) + + await t.step("explicit pathwayMode still wins over env-aware default", () => { + const builder = inspect( + new PathwaysBuilder({ + ...baseOpts, + runtimeEnv: "production", + pathwayMode: "virtual", + // deno-lint-ignore no-explicit-any + }) as unknown as PathwaysBuilder, + ) + assertEquals(builder.pathwayMode, "virtual") + }) }, }) diff --git a/tests/pathway-pump.test.ts b/tests/pathway-pump.test.ts index bcdf21f..fe61d40 100644 --- a/tests/pathway-pump.test.ts +++ b/tests/pathway-pump.test.ts @@ -131,6 +131,124 @@ Deno.test({ assertEquals(groups.get("payment"), ["received"]) }) + await t.step("concurrency defaults to 1 per flow type when unset", async () => { + const factory = createInMemoryStateFactory() + const pump = new PathwayPump({ + stateManagerFactory: factory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + }) + + pump.configure({ + tenant: "test-tenant", + dataCore: "test-dc", + apiKey: "test-key", + baseUrl: "https://api.flowcore.io", + processEvent: async (_pathway: string, _event: FlowcoreEvent) => {}, + }) + + // Bypass the dynamic `@flowcore/data-pump` import by invoking the per-flowType + // bootstrap directly with a stubbed constructor — same pattern as the setPulseConfig test. + const createdConcurrencies: Record = {} + const internal = pump as unknown as { + dataPumpConstructor: { + create(options: Record): Promise<{ start(cb?: unknown): Promise }> + } + startPumpForFlowType(flowType: string, eventTypes: string[]): Promise + } + internal.dataPumpConstructor = { + create: (options: Record) => { + const dataSource = options.dataSource as { flowType: string } + const processor = options.processor as { concurrency: number } + createdConcurrencies[dataSource.flowType] = processor.concurrency + return Promise.resolve({ start: async () => {} }) + }, + } + + await internal.startPumpForFlowType("user", ["created"]) + await internal.startPumpForFlowType("order", ["placed"]) + + assertEquals(createdConcurrencies.user, 1) + assertEquals(createdConcurrencies.order, 1) + }) + + await t.step("numeric concurrency sets a shared default for every flow type", async () => { + const factory = createInMemoryStateFactory() + const pump = new PathwayPump({ + stateManagerFactory: factory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + concurrency: 4, + }) + + pump.configure({ + tenant: "test-tenant", + dataCore: "test-dc", + apiKey: "test-key", + baseUrl: "https://api.flowcore.io", + processEvent: async () => {}, + }) + + const createdConcurrencies: Record = {} + const internal = pump as unknown as { + dataPumpConstructor: { + create(options: Record): Promise<{ start(cb?: unknown): Promise }> + } + startPumpForFlowType(flowType: string, eventTypes: string[]): Promise + } + internal.dataPumpConstructor = { + create: (options: Record) => { + const dataSource = options.dataSource as { flowType: string } + const processor = options.processor as { concurrency: number } + createdConcurrencies[dataSource.flowType] = processor.concurrency + return Promise.resolve({ start: async () => {} }) + }, + } + + await internal.startPumpForFlowType("user", ["created"]) + await internal.startPumpForFlowType("order", ["placed"]) + + assertEquals(createdConcurrencies.user, 4) + assertEquals(createdConcurrencies.order, 4) + }) + + await t.step("per-flow-type overrides win; missing ones fall back to default", async () => { + const factory = createInMemoryStateFactory() + const pump = new PathwayPump({ + stateManagerFactory: factory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + concurrency: { default: 2, byFlowType: { orders: 5 } }, + }) + + pump.configure({ + tenant: "test-tenant", + dataCore: "test-dc", + apiKey: "test-key", + baseUrl: "https://api.flowcore.io", + processEvent: async () => {}, + }) + + const createdConcurrencies: Record = {} + const internal = pump as unknown as { + dataPumpConstructor: { + create(options: Record): Promise<{ start(cb?: unknown): Promise }> + } + startPumpForFlowType(flowType: string, eventTypes: string[]): Promise + } + internal.dataPumpConstructor = { + create: (options: Record) => { + const dataSource = options.dataSource as { flowType: string } + const processor = options.processor as { concurrency: number } + createdConcurrencies[dataSource.flowType] = processor.concurrency + return Promise.resolve({ start: async () => {} }) + }, + } + + await internal.startPumpForFlowType("orders", ["placed"]) + await internal.startPumpForFlowType("users", ["created"]) + + assertEquals(createdConcurrencies.orders, 5) + assertEquals(createdConcurrencies.users, 2) + }) + await t.step("setPulseConfig recreates running pumps with the new pulse configuration", async () => { const factory = createInMemoryStateFactory() const pump = new PathwayPump({ diff --git a/tests/provisioner.test.ts b/tests/provisioner.test.ts index 820ad67..9a35402 100644 --- a/tests/provisioner.test.ts +++ b/tests/provisioner.test.ts @@ -448,6 +448,254 @@ Deno.test({ assertEquals(commands.includes("DataCoreUpdateCommand"), false) }) + await t.step("skipDataCore: resolves id but does not update description", async () => { + const commands: string[] = [] + + const client = createMockClient({ + TenantTranslateNameToIdCommand: () => baseTenant(), + DataCoreFetchCommand: () => { + commands.push("DataCoreFetchCommand") + return baseDataCore({ description: "Old desc" }) + }, + DataCoreUpdateCommand: () => { + commands.push("DataCoreUpdateCommand") + return baseDataCore({ description: "New desc" }) + }, + FlowTypeListCommand: () => [baseFlowType("user", "ft-001", "User events")], + EventTypeListCommand: () => [baseEventType("created", "et-001", "ft-001", "User created")], + }) + + const provisioner = new PathwayProvisioner({ + tenant: "my-org", + dataCore: "my-core", + apiKey: "fc_test_key", + dataCoreDescription: "New desc", + dataCoreAccessControl: "private", + dataCoreDeleteProtection: false, + skipDataCore: true, + registrations: [ + { + flowType: "user", + eventType: "created", + flowTypeDescription: "User events", + eventTypeDescription: "User created", + }, + ], + clientFactory: () => client, + }) + + await provisioner.provision() + + assertEquals(commands.includes("DataCoreFetchCommand"), true) + assertEquals(commands.includes("DataCoreUpdateCommand"), false) + }) + + await t.step("skipDataCore: fails loudly when data core missing and no description", async () => { + const client = createMockClient({ + TenantTranslateNameToIdCommand: () => baseTenant(), + DataCoreFetchCommand: () => { + throw new NotFoundException("DataCore", {}) + }, + }) + + const provisioner = new PathwayProvisioner({ + tenant: "my-org", + dataCore: "my-core", + apiKey: "fc_test_key", + dataCoreAccessControl: "private", + dataCoreDeleteProtection: false, + skipDataCore: true, + registrations: [], + clientFactory: () => client, + }) + + await assertRejects( + () => provisioner.provision(), + Error, + 'Data core "my-core" not found', + ) + }) + + await t.step("skipDataCore: fails when data core missing even with description", async () => { + const client = createMockClient({ + TenantTranslateNameToIdCommand: () => baseTenant(), + DataCoreFetchCommand: () => { + throw new NotFoundException("DataCore", {}) + }, + }) + + const provisioner = new PathwayProvisioner({ + tenant: "my-org", + dataCore: "my-core", + apiKey: "fc_test_key", + dataCoreDescription: "desc", + dataCoreAccessControl: "private", + dataCoreDeleteProtection: false, + skipDataCore: true, + registrations: [], + clientFactory: () => client, + }) + + await assertRejects( + () => provisioner.provision(), + Error, + "skipDataCore is set", + ) + }) + + await t.step("skipFlowTypes: resolves ids via list but does not create/update", async () => { + const commands: string[] = [] + + const client = createMockClient({ + TenantTranslateNameToIdCommand: () => baseTenant(), + DataCoreFetchCommand: () => baseDataCore(), + FlowTypeListCommand: () => { + commands.push("FlowTypeListCommand") + return [baseFlowType("user", "ft-001", "Old flow desc")] + }, + FlowTypeUpdateCommand: () => { + commands.push("FlowTypeUpdateCommand") + return baseFlowType("user", "ft-001", "New flow desc") + }, + EventTypeListCommand: () => [baseEventType("created", "et-001", "ft-001", "User created")], + }) + + const provisioner = new PathwayProvisioner({ + tenant: "my-org", + dataCore: "my-core", + apiKey: "fc_test_key", + dataCoreAccessControl: "private", + dataCoreDeleteProtection: false, + skipFlowTypes: true, + registrations: [ + { + flowType: "user", + eventType: "created", + flowTypeDescription: "New flow desc", + eventTypeDescription: "User created", + }, + ], + clientFactory: () => client, + }) + + await provisioner.provision() + + assertEquals(commands.includes("FlowTypeListCommand"), true) + assertEquals(commands.includes("FlowTypeUpdateCommand"), false) + }) + + await t.step("skipFlowTypes: fails when flow type missing", async () => { + const client = createMockClient({ + TenantTranslateNameToIdCommand: () => baseTenant(), + DataCoreFetchCommand: () => baseDataCore(), + FlowTypeListCommand: () => [], + }) + + const provisioner = new PathwayProvisioner({ + tenant: "my-org", + dataCore: "my-core", + apiKey: "fc_test_key", + dataCoreAccessControl: "private", + dataCoreDeleteProtection: false, + skipFlowTypes: true, + registrations: [ + { + flowType: "user", + eventType: "created", + flowTypeDescription: "User events", + }, + ], + clientFactory: () => client, + }) + + await assertRejects( + () => provisioner.provision(), + Error, + "skipFlowTypes is set", + ) + }) + + await t.step("skipEventTypes: skips the event type loop entirely", async () => { + const commands: string[] = [] + + const client = createMockClient({ + TenantTranslateNameToIdCommand: () => baseTenant(), + DataCoreFetchCommand: () => baseDataCore(), + FlowTypeListCommand: () => [baseFlowType("user", "ft-001", "User events")], + EventTypeListCommand: () => { + commands.push("EventTypeListCommand") + return [] + }, + EventTypeCreateCommand: () => { + commands.push("EventTypeCreateCommand") + return baseEventType("created", "et-new", "ft-001", "User created") + }, + }) + + const provisioner = new PathwayProvisioner({ + tenant: "my-org", + dataCore: "my-core", + apiKey: "fc_test_key", + dataCoreAccessControl: "private", + dataCoreDeleteProtection: false, + skipEventTypes: true, + registrations: [ + { + flowType: "user", + eventType: "created", + flowTypeDescription: "User events", + eventTypeDescription: "User created", + }, + ], + clientFactory: () => client, + }) + + await provisioner.provision() + + assertEquals(commands.includes("EventTypeListCommand"), false) + assertEquals(commands.includes("EventTypeCreateCommand"), false) + }) + + await t.step("skipFlowTypes + skipEventTypes: short-circuits after data core", async () => { + const commands: string[] = [] + + const client = createMockClient({ + TenantTranslateNameToIdCommand: () => baseTenant(), + DataCoreFetchCommand: () => { + commands.push("DataCoreFetchCommand") + return baseDataCore() + }, + FlowTypeListCommand: () => { + commands.push("FlowTypeListCommand") + return [] + }, + }) + + const provisioner = new PathwayProvisioner({ + tenant: "my-org", + dataCore: "my-core", + apiKey: "fc_test_key", + dataCoreAccessControl: "private", + dataCoreDeleteProtection: false, + skipFlowTypes: true, + skipEventTypes: true, + registrations: [ + { + flowType: "user", + eventType: "created", + flowTypeDescription: "User events", + eventTypeDescription: "User created", + }, + ], + clientFactory: () => client, + }) + + await provisioner.provision() + + assertEquals(commands.includes("DataCoreFetchCommand"), true) + assertEquals(commands.includes("FlowTypeListCommand"), false) + }) + await t.step("creates multiple flow types and event types", async () => { const createdFlowTypes: string[] = [] const createdEventTypes: string[] = [] diff --git a/tests/pump-runtime-behavior.test.ts b/tests/pump-runtime-behavior.test.ts index 7f3a637..cd41ec8 100644 --- a/tests/pump-runtime-behavior.test.ts +++ b/tests/pump-runtime-behavior.test.ts @@ -126,6 +126,8 @@ Deno.test({ try { const builder = createBuilder({ runtimeEnv: "production", + // Explicit virtual — overrides the prod-default "managed" so the cluster check fires. + pathwayMode: "virtual", pathwayName: "virtual-service", }) @@ -182,6 +184,8 @@ Deno.test({ runtimeEnv: "production", pathwayName: "virtual-service", pathwayMode: "virtual", + // Opt in to pathway registration — default is resources-only. + autoProvision: { pathway: true }, }) ;(builder as unknown as { clusterManager: { isRunning: boolean; isLeader: boolean } }).clusterManager = { isRunning: true, @@ -259,6 +263,7 @@ Deno.test({ runtimeEnv: "production", pathwayName: "virtual-service", pathwayMode: "virtual", + autoProvision: { pathway: true }, }) const clusterManager = { isRunning: true, isLeader: false } ;(builder as unknown as { clusterManager: typeof clusterManager }).clusterManager = clusterManager @@ -343,6 +348,7 @@ Deno.test({ runtimeEnv: "production", pathwayName: "virtual-service", pathwayMode: "virtual", + autoProvision: { pathway: true }, }) ;(builder as unknown as { clusterManager: { isRunning: boolean; isLeader: boolean } }).clusterManager = { isRunning: true, @@ -390,6 +396,7 @@ Deno.test({ runtimeEnv: "production", pathwayMode: "managed", pathwayName: "managed-service", + autoProvision: { pathway: true }, managedConfig: { endpointUrl: "https://app.example.com/flowcore", authHeaders: { authorization: "Bearer secret" }, @@ -410,5 +417,199 @@ Deno.test({ fetchStub.restore() } }) + + await t.step( + "production default (no explicit mode) uses managed mode, provisions resources only, skips local pump", + async () => { + let provisionCalls = 0 + let startCalls = 0 + const fetchBodies: Array> = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + // No explicit pathwayMode; prod defaults to "managed" now. + const builder = createBuilder({ + runtimeEnv: "production", + pathwayName: "managed-service", + managedConfig: { + endpointUrl: "https://app.example.com/flowcore", + }, + }) + + await builder.startPump(createPumpOptions()) + + // Resources on, pathway registration off by default. + assertEquals(provisionCalls, 1) + assertEquals(startCalls, 0) + assertEquals(fetchBodies.length, 0) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }, + ) + + await t.step( + "development default provisions resources, does NOT register pathway, starts local pump", + async () => { + let provisionCalls = 0 + let startCalls = 0 + let fetchCalls = 0 + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const fetchStub = stub(globalThis, "fetch", async () => { + fetchCalls++ + return new Response("{}", { status: 200 }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "development", + pathwayName: "dev-service", + }) + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 1) + assertEquals(fetchCalls, 0) + assertEquals(startCalls, 1) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }, + ) + + await t.step("autoProvision.pathway=true triggers registerPathwayInstance", async () => { + let provisionCalls = 0 + let startCalls = 0 + const fetchBodies: Array> = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "development", + pathwayName: "dev-service", + autoProvision: { pathway: true }, + }) + + await builder.startPump(createPumpOptions()) + + assertEquals(provisionCalls, 1) + assertEquals(startCalls, 1) + assertEquals(fetchBodies.length, 1) + assertEquals(fetchBodies[0].type, "virtual") + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }) + + await t.step("per-startPump autoProvision override wins over builder-level config", async () => { + let provisionCalls = 0 + let startCalls = 0 + const fetchBodies: Array> = [] + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => { + startCalls++ + }) + const fetchStub = stub(globalThis, "fetch", async (_input, init) => { + fetchBodies.push(JSON.parse(String(init?.body ?? "{}"))) + return new Response(JSON.stringify({ pathwayId: crypto.randomUUID(), status: "created" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + }) + + try { + // Builder-level: resources on, pathway off (default). Override via startPump. + const builder = createBuilder({ + runtimeEnv: "development", + pathwayName: "dev-service", + }) + + await builder.startPump({ + ...createPumpOptions(), + autoProvision: { pathway: true }, + }) + + assertEquals(provisionCalls, 1) + assertEquals(startCalls, 1) + assertEquals(fetchBodies.length, 1) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }) + + await t.step("defaultAutoProvision=true maps to resources-on, pathway-off (new default semantics)", async () => { + let provisionCalls = 0 + let fetchCalls = 0 + + const provisionStub = stub(PathwayProvisioner.prototype, "provision", async () => { + provisionCalls++ + }) + const startStub = stub(PathwayPump.prototype, "start", async () => {}) + const fetchStub = stub(globalThis, "fetch", async () => { + fetchCalls++ + return new Response("{}", { status: 200 }) + }) + + try { + const builder = createBuilder({ + runtimeEnv: "development", + pathwayName: "dev-service", + defaultAutoProvision: true, + }) + + await builder.startPump(createPumpOptions()) + + // Legacy `true` → resources only, no pathway registration. + assertEquals(provisionCalls, 1) + assertEquals(fetchCalls, 0) + } finally { + provisionStub.restore() + startStub.restore() + fetchStub.restore() + } + }) }, })