Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 50 additions & 7 deletions src/pathways/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,20 @@ export interface AuditWebhookSendOptions extends WebhookSendOptions {
export type LogLevel = keyof Pick<Logger, "debug" | "info" | "warn" | "error">

/**
* Configuration for log levels
* Configuration for log levels used by PathwaysBuilder for various operations.
*
* @property writeSuccess Log level used when a write operation is successful. Defaults to 'info'.
* @property pulseSuccess Log level used when a pulse is successfully sent. Defaults to 'debug'.
* @property pulseFailure Log level used when a pulse emission fails. Defaults to 'warn'.
* @property provisionSuccess Log level used when virtual pathway provisioning succeeds. Defaults to 'info'.
* @property provisionFailure Log level used when virtual pathway provisioning fails. Defaults to 'error'.
*/
export type LogLevelConfig = {
writeSuccess?: LogLevel
pulseSuccess?: LogLevel
pulseFailure?: LogLevel
provisionSuccess?: LogLevel
provisionFailure?: LogLevel
}

/**
Expand Down Expand Up @@ -362,6 +371,10 @@ export class PathwaysBuilder<
// Initialize log levels with defaults
this.logLevel = {
writeSuccess: logLevel?.writeSuccess ?? "info",
pulseSuccess: logLevel?.pulseSuccess ?? "debug",
pulseFailure: logLevel?.pulseFailure ?? "warn",
provisionSuccess: logLevel?.provisionSuccess ?? "info",
provisionFailure: logLevel?.provisionFailure ?? "error",
}

// Store configuration values for cloning
Expand Down Expand Up @@ -1286,10 +1299,11 @@ export class PathwaysBuilder<
if (this.pathwayName) {
const flowTypes = [...new Set(registrations.map((r) => r.flowType))]
const cpBaseUrl = this.pulseUrl
const url = `${cpBaseUrl}/api/v1/pathways/by-name/${encodeURIComponent(this.pathwayName)}`

const response = await fetch(
`${cpBaseUrl}/api/v1/pathways/by-name/${encodeURIComponent(this.pathwayName)}`,
{
let response: Response
try {
response = await fetch(url, {
method: "PUT",
headers: {
"Content-Type": "application/json",
Expand All @@ -1307,22 +1321,39 @@ export class PathwaysBuilder<
flowTypes,
},
}),
},
)
})
} catch (err) {
const msg = err instanceof Error ? err.message : String(err)
this.logger[this.logLevel.provisionFailure]("Virtual pathway registration failed", {
pathwayName: this.pathwayName,
url,
error: msg,
phase: "network",
})
throw new Error(`Failed to register virtual pathway "${this.pathwayName}": ${msg}`)
}

if (!response.ok) {
const text = await response.text().catch(() => "")
this.logger[this.logLevel.provisionFailure]("Virtual pathway registration failed", {
pathwayName: this.pathwayName,
url,
status: response.status,
body: text,
phase: "response",
})
throw new Error(
`Failed to register virtual pathway "${this.pathwayName}": ${response.status} ${text}`,
)
}

const result = await response.json() as { pathwayId: string; status: string }
this.pathwayId = result.pathwayId
this.logger.info("Virtual pathway registered", {
this.logger[this.logLevel.provisionSuccess]("Virtual pathway registered", {
pathwayName: this.pathwayName,
pathwayId: this.pathwayId,
status: result.status,
flowTypes,
})
}
}
Expand Down Expand Up @@ -1350,9 +1381,21 @@ export class PathwaysBuilder<
url: this.pulseUrl,
intervalMs: this.pulseIntervalMs,
pathwayId: this.pathwayId,
successLogLevel: this.logLevel.pulseSuccess,
failureLogLevel: this.logLevel.pulseFailure,
},
}
this.logger.info("Auto-configured pulse", { pathwayId: this.pathwayId, url: this.pulseUrl })
} else if (options.pulse) {
// Respect explicit pulse config but inject log levels from builder config if not set
options = {
...options,
pulse: {
...options.pulse,
successLogLevel: options.pulse.successLogLevel ?? this.logLevel.pulseSuccess,
failureLogLevel: options.pulse.failureLogLevel ?? this.logLevel.pulseFailure,
},
}
}

// If cluster active and not leader, don't start pump
Expand Down
10 changes: 9 additions & 1 deletion src/pathways/pump/pathway-pump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ export class PathwayPump {
private readonly bufferSize: number
private readonly maxRedeliveryCount: number
private readonly logger: Logger
private readonly pulseConfig?: { url: string; intervalMs?: number; pathwayId?: string }
private readonly pulseConfig?: {
url: string
intervalMs?: number
pathwayId?: string
successLogLevel?: "debug" | "info" | "warn" | "error"
failureLogLevel?: "debug" | "info" | "warn" | "error"
}

private pumps: Map<string, DataPumpInstance> = new Map()
private stateManagers: Map<string, PumpStateManager> = new Map()
Expand Down Expand Up @@ -164,6 +170,8 @@ export class PathwayPump {
url: this.pulseConfig.url,
intervalMs: this.pulseConfig.intervalMs,
pathwayId: this.pulseConfig.pathwayId ?? "unknown",
successLogLevel: this.pulseConfig.successLogLevel,
failureLogLevel: this.pulseConfig.failureLogLevel,
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/pathways/pump/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ export interface PathwayPumpOptions {
intervalMs?: number
/** Pathway ID for this pump */
pathwayId?: string
/** Log level for successful pulses. Defaults to 'debug'. */
successLogLevel?: "debug" | "info" | "warn" | "error"
/** Log level for pulse failures. Defaults to 'warn'. */
failureLogLevel?: "debug" | "info" | "warn" | "error"
}
}

Expand Down
Loading