Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/data-pump/data-pump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface FlowcoreDataPumpOptions {
url: string
intervalMs?: number
pathwayId: string
sourceId?: string
/** Log level for successful pulses. Defaults to 'debug'. */
successLogLevel?: "debug" | "info" | "warn" | "error"
/** Log level for pulse failures. Defaults to 'warn'. */
Expand Down Expand Up @@ -177,6 +178,7 @@ export class FlowcoreDataPump {

if (options.pulse) {
const pathwayId = options.pulse.pathwayId
const sourceId = options.pulse.sourceId
pump.pulseEmitter = new PulseEmitter(
{
url: options.pulse.url,
Expand All @@ -188,7 +190,10 @@ export class FlowcoreDataPump {
},
() => {
const snapshot = pump.getSnapshot()
if (snapshot) snapshot.pathwayId = pathwayId
if (snapshot) {
snapshot.pathwayId = pathwayId
snapshot.sourceId = sourceId
}
return snapshot
},
)
Expand Down
43 changes: 25 additions & 18 deletions src/data-pump/pulse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export interface PulseEmitterOptions {

export interface PulseSnapshot {
pathwayId: string
sourceId?: string
flowType: string
timeBucket: string
eventId: string | undefined
Expand Down Expand Up @@ -93,25 +94,31 @@ export class PulseEmitter {

const client = getFlowcoreClient(this.options.auth, this.options.url)

// sourceId is accepted by the server (data-pathways CP) and added to
// @flowcore/sdk's SendPumpPulseInput on the 3.x train. data-pump is still
// pinned to sdk ^1.78.0, so we widen the literal type locally until the
// sdk-3 migration lands. Remove the cast then.
const input = {
pathwayId: snapshot.pathwayId,
sourceId: snapshot.sourceId,
flowType: snapshot.flowType,
timeBucket: snapshot.timeBucket,
eventId: snapshot.eventId ?? null,
isLive: snapshot.isLive,
buffer: {
depth: snapshot.bufferDepth,
reserved: snapshot.bufferReserved,
sizeBytes: snapshot.bufferSizeBytes,
},
counters: {
acknowledged: snapshot.acknowledgedTotal,
failed: snapshot.failedTotal,
pulled: snapshot.pulledTotal,
},
uptimeMs: snapshot.uptimeMs,
}
await client.execute(
new SendPumpPulseCommand({
pathwayId: snapshot.pathwayId,
flowType: snapshot.flowType,
timeBucket: snapshot.timeBucket,
eventId: snapshot.eventId ?? null,
isLive: snapshot.isLive,
buffer: {
depth: snapshot.bufferDepth,
reserved: snapshot.bufferReserved,
sizeBytes: snapshot.bufferSizeBytes,
},
counters: {
acknowledged: snapshot.acknowledgedTotal,
failed: snapshot.failedTotal,
pulled: snapshot.pulledTotal,
},
uptimeMs: snapshot.uptimeMs,
}),
new SendPumpPulseCommand(input as ConstructorParameters<typeof SendPumpPulseCommand>[0]),
)

this.logger?.[this.successLogLevel]?.("Pulse sent", {
Expand Down
61 changes: 61 additions & 0 deletions test/tests/pulse.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { assertEquals } from "@std/assert/equals"
import { describe, it } from "@std/testing/bdd"
import { PulseEmitter, type PulseSnapshot } from "../../src/data-pump/pulse.ts"

// The PulseEmitter posts via a Flowcore SDK Command, which routes through
// the network. We can't test the wire payload here without a mock server,
// but we can verify the snapshot shape the emitter threads through.

describe("PulseEmitter snapshot shape", () => {
it("PulseSnapshot carries optional sourceId", () => {
const snapshot: PulseSnapshot = {
pathwayId: "pathway-a",
sourceId: "source-1",
flowType: "flow.0",
timeBucket: "2026042317",
eventId: undefined,
isLive: true,
bufferDepth: 0,
bufferReserved: 0,
bufferSizeBytes: 0,
acknowledgedTotal: 0,
failedTotal: 0,
pulledTotal: 0,
uptimeMs: 0,
}
assertEquals(snapshot.sourceId, "source-1")
})

it("PulseSnapshot.sourceId is optional for back-compat", () => {
const snapshot: PulseSnapshot = {
pathwayId: "pathway-a",
flowType: "flow.0",
timeBucket: "2026042317",
eventId: undefined,
isLive: true,
bufferDepth: 0,
bufferReserved: 0,
bufferSizeBytes: 0,
acknowledgedTotal: 0,
failedTotal: 0,
pulledTotal: 0,
uptimeMs: 0,
}
assertEquals(snapshot.sourceId, undefined)
})

it("emitter accepts sourceId in options (compile-time check)", () => {
// The emitter takes PulseEmitterOptions; sourceId lives on the closure-
// captured snapshot, not on options. This test just confirms the class
// constructs without sourceId in the options shape.
const emitter = new PulseEmitter(
{
url: "http://localhost:0",
auth: { apiKey: "fc_noop_noop" },
},
() => null,
)
assertEquals(typeof emitter.start, "function")
assertEquals(typeof emitter.stop, "function")
})
})
Loading