From 208f42ef0b40c5127e66209d63090c7c39db7ffd Mon Sep 17 00:00:00 2001 From: jbiskur Date: Thu, 23 Apr 2026 22:05:25 +0100 Subject: [PATCH] feat: thread sourceId through PulseEmitter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The data-pathways control-plane uses (pathway_id, source_id) as the unique key for pump_pulses rows. Without sourceId in the pulse payload, every pulse lands on a NULL-sourceId row per flowType — shadowing any correctly keyed siblings and making multi-source identity impossible. Changes: - Add optional sourceId to the `pulse` option on FlowcoreDataPumpOptions - Capture sourceId in the PulseEmitter snapshot closure - Forward it to SendPumpPulseCommand input The upstream @flowcore/sdk is pinned at ^1.78.0 in this repo but SendPumpPulseInput gained sourceId only on the 3.x train (see flowcore-sdk#219). Until data-pump migrates to sdk 3.x we cast the command input through its constructor-parameter type, which lets the field pass through getBody() → POST body unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/data-pump/data-pump.ts | 7 ++++- src/data-pump/pulse.ts | 43 ++++++++++++++++----------- test/tests/pulse.test.ts | 61 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 19 deletions(-) create mode 100644 test/tests/pulse.test.ts diff --git a/src/data-pump/data-pump.ts b/src/data-pump/data-pump.ts index dbc1bb7..6ecdece 100644 --- a/src/data-pump/data-pump.ts +++ b/src/data-pump/data-pump.ts @@ -56,6 +56,7 @@ export interface FlowcoreDataPumpOptions { url: string intervalMs?: number pathwayId: string + sourceId?: string /** Log level for successful pulses. Defaults to 'debug'. */ successLogLevel?: "debug" | "info" | "warn" | "error" /** Log level for pulse failures. Defaults to 'warn'. */ @@ -177,6 +178,7 @@ export class FlowcoreDataPump { if (options.pulse) { const pathwayId = options.pulse.pathwayId + const sourceId = options.pulse.sourceId pump.pulseEmitter = new PulseEmitter( { url: options.pulse.url, @@ -188,7 +190,10 @@ export class FlowcoreDataPump { }, () => { const snapshot = pump.getSnapshot() - if (snapshot) snapshot.pathwayId = pathwayId + if (snapshot) { + snapshot.pathwayId = pathwayId + snapshot.sourceId = sourceId + } return snapshot }, ) diff --git a/src/data-pump/pulse.ts b/src/data-pump/pulse.ts index a252d1a..53dd3db 100644 --- a/src/data-pump/pulse.ts +++ b/src/data-pump/pulse.ts @@ -20,6 +20,7 @@ export interface PulseEmitterOptions { export interface PulseSnapshot { pathwayId: string + sourceId?: string flowType: string timeBucket: string eventId: string | undefined @@ -93,25 +94,31 @@ export class PulseEmitter { const client = getFlowcoreClient(this.options.auth, this.options.url) + // sourceId is accepted by the server (data-pathways CP) and added to + // @flowcore/sdk's SendPumpPulseInput on the 3.x train. data-pump is still + // pinned to sdk ^1.78.0, so we widen the literal type locally until the + // sdk-3 migration lands. Remove the cast then. + const input = { + pathwayId: snapshot.pathwayId, + sourceId: snapshot.sourceId, + flowType: snapshot.flowType, + timeBucket: snapshot.timeBucket, + eventId: snapshot.eventId ?? null, + isLive: snapshot.isLive, + buffer: { + depth: snapshot.bufferDepth, + reserved: snapshot.bufferReserved, + sizeBytes: snapshot.bufferSizeBytes, + }, + counters: { + acknowledged: snapshot.acknowledgedTotal, + failed: snapshot.failedTotal, + pulled: snapshot.pulledTotal, + }, + uptimeMs: snapshot.uptimeMs, + } await client.execute( - new SendPumpPulseCommand({ - pathwayId: snapshot.pathwayId, - flowType: snapshot.flowType, - timeBucket: snapshot.timeBucket, - eventId: snapshot.eventId ?? null, - isLive: snapshot.isLive, - buffer: { - depth: snapshot.bufferDepth, - reserved: snapshot.bufferReserved, - sizeBytes: snapshot.bufferSizeBytes, - }, - counters: { - acknowledged: snapshot.acknowledgedTotal, - failed: snapshot.failedTotal, - pulled: snapshot.pulledTotal, - }, - uptimeMs: snapshot.uptimeMs, - }), + new SendPumpPulseCommand(input as ConstructorParameters[0]), ) this.logger?.[this.successLogLevel]?.("Pulse sent", { diff --git a/test/tests/pulse.test.ts b/test/tests/pulse.test.ts new file mode 100644 index 0000000..3c2c833 --- /dev/null +++ b/test/tests/pulse.test.ts @@ -0,0 +1,61 @@ +import { assertEquals } from "@std/assert/equals" +import { describe, it } from "@std/testing/bdd" +import { PulseEmitter, type PulseSnapshot } from "../../src/data-pump/pulse.ts" + +// The PulseEmitter posts via a Flowcore SDK Command, which routes through +// the network. We can't test the wire payload here without a mock server, +// but we can verify the snapshot shape the emitter threads through. + +describe("PulseEmitter snapshot shape", () => { + it("PulseSnapshot carries optional sourceId", () => { + const snapshot: PulseSnapshot = { + pathwayId: "pathway-a", + sourceId: "source-1", + flowType: "flow.0", + timeBucket: "2026042317", + eventId: undefined, + isLive: true, + bufferDepth: 0, + bufferReserved: 0, + bufferSizeBytes: 0, + acknowledgedTotal: 0, + failedTotal: 0, + pulledTotal: 0, + uptimeMs: 0, + } + assertEquals(snapshot.sourceId, "source-1") + }) + + it("PulseSnapshot.sourceId is optional for back-compat", () => { + const snapshot: PulseSnapshot = { + pathwayId: "pathway-a", + flowType: "flow.0", + timeBucket: "2026042317", + eventId: undefined, + isLive: true, + bufferDepth: 0, + bufferReserved: 0, + bufferSizeBytes: 0, + acknowledgedTotal: 0, + failedTotal: 0, + pulledTotal: 0, + uptimeMs: 0, + } + assertEquals(snapshot.sourceId, undefined) + }) + + it("emitter accepts sourceId in options (compile-time check)", () => { + // The emitter takes PulseEmitterOptions; sourceId lives on the closure- + // captured snapshot, not on options. This test just confirms the class + // constructs without sourceId in the options shape. + const emitter = new PulseEmitter( + { + url: "http://localhost:0", + auth: { apiKey: "fc_noop_noop" }, + }, + () => null, + ) + assertEquals(typeof emitter.start, "function") + assertEquals(typeof emitter.stop, "function") + }) +})