diff --git a/__tests__/consumer-options.test.ts b/__tests__/consumer-options.test.ts index d87bc87..dbf56dd 100644 --- a/__tests__/consumer-options.test.ts +++ b/__tests__/consumer-options.test.ts @@ -2,7 +2,8 @@ import { describe, expect, it, mock, beforeEach } from "bun:test"; import { startJSConsumers } from "../src/consumer.js"; import type { JSConsumerRegistration } from "../src/consumer.js"; import type { ConsumerDefaults } from "../src/connection.js"; -import { AckPolicy } from "nats"; +import type { JSConsumerOptions } from "../src/consumer.js"; +import { AckPolicy, NatsError } from "nats"; const silentLogger = { info: mock(() => {}), @@ -65,9 +66,7 @@ describe("Consumer defaults (connection-level)", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - defaults, + { consumerDefaults: defaults }, ); expect(jsm.consumers.add).toHaveBeenCalledWith("events", { @@ -101,9 +100,7 @@ describe("Consumer defaults (connection-level)", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - defaults, + { consumerDefaults: defaults }, ); expect(jsm.consumers.add).toHaveBeenCalledWith("events", { @@ -138,9 +135,7 @@ describe("Consumer defaults (connection-level)", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - defaults, + { consumerDefaults: defaults }, ); expect(jsm.consumers.add).toHaveBeenCalledWith("events", { @@ -201,9 +196,7 @@ describe("Consumer defaults (connection-level)", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - defaults, + { consumerDefaults: defaults }, ); expect(jsm.consumers.add).toHaveBeenCalledWith("events", { @@ -234,9 +227,7 @@ describe("Consumer defaults (connection-level)", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - defaults, + { consumerDefaults: defaults }, ); expect(jsm.consumers.add).toHaveBeenCalledWith("events", { @@ -270,9 +261,7 @@ describe("Per-consumer options override defaults", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - defaults, + { consumerDefaults: defaults }, ); expect(jsm.consumers.add).toHaveBeenCalledWith("events", { @@ -305,9 +294,7 @@ describe("Per-consumer options override defaults", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - defaults, + { consumerDefaults: defaults }, ); expect(jsm.consumers.add).toHaveBeenCalledWith("events", { @@ -380,9 +367,7 @@ describe("Per-consumer options override defaults", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - defaults, + { consumerDefaults: defaults }, ); expect(jsm.consumers.add).toHaveBeenCalledWith("events", { @@ -393,3 +378,119 @@ describe("Per-consumer options override defaults", () => { }); }); }); + +describe("Stream error handling", () => { + it("rethrows non-NATS errors from stream update", async () => { + const { js } = createMockJsAndJsm(); + + // streams.add fails, then streams.update also fails with a non-NATS error + const jsm = { + streams: { + add: mock(() => Promise.reject(new Error("add failed"))), + update: mock(() => Promise.reject(new Error("network timeout"))), + }, + consumers: { + add: mock(() => Promise.resolve({})), + delete: mock(() => Promise.resolve(true)), + }, + }; + + const registrations: JSConsumerRegistration[] = [ + { + kind: "jetstream", + stream: "events", + routingKey: "Order.Created", + handler: async () => {}, + durable: "test-svc", + }, + ]; + + await expect( + startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger), + ).rejects.toThrow("network timeout"); + }); + + it("suppresses NATS 10058 error (stream already exists)", async () => { + const { js } = createMockJsAndJsm(); + + const natsErr = new NatsError("stream exists", "10058", new Error("inner")); + (natsErr as any).api_error = { err_code: 10058 }; + const jsm = { + streams: { + add: mock(() => Promise.reject(new Error("add failed"))), + update: mock(() => Promise.reject(natsErr)), + }, + consumers: { + add: mock(() => Promise.resolve({ name: "test-svc" })), + delete: mock(() => Promise.resolve(true)), + }, + }; + + const registrations: JSConsumerRegistration[] = [ + { + kind: "jetstream", + stream: "events", + routingKey: "Order.Created", + handler: async () => {}, + durable: "test-svc", + }, + ]; + + // Should not throw + await startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger); + expect(jsm.consumers.add).toHaveBeenCalled(); + }); +}); + +describe("backOff/maxDeliver validation", () => { + it("throws when backOff length exceeds maxDeliver", async () => { + const { js, jsm } = createMockJsAndJsm(); + + const registrations: JSConsumerRegistration[] = [ + { + kind: "jetstream", + stream: "events", + routingKey: "Order.Created", + handler: async () => {}, + durable: "test-svc", + maxDeliver: 2, + backOff: [1000, 5000, 30000], + }, + ]; + + await expect( + startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger), + ).rejects.toThrow("backOff length (3) exceeds maxDeliver (2)"); + }); +}); + +describe("Ephemeral consumer error handling", () => { + it("rethrows error when ephemeral consumer creation fails", async () => { + const { js } = createMockJsAndJsm(); + + const jsm = { + streams: { + add: mock(() => Promise.resolve({})), + update: mock(() => Promise.resolve({})), + }, + consumers: { + add: mock(() => Promise.reject(new Error("consumer creation failed"))), + delete: mock(() => Promise.resolve(true)), + }, + }; + + const registrations: JSConsumerRegistration[] = [ + { + kind: "jetstream", + stream: "events", + routingKey: "Order.Created", + handler: async () => {}, + // no durable = ephemeral + }, + ]; + + await expect( + startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger), + ).rejects.toThrow("consumer creation failed"); + }); +}); diff --git a/__tests__/metrics.test.ts b/__tests__/metrics.test.ts index 6f98f7b..52b7446 100644 --- a/__tests__/metrics.test.ts +++ b/__tests__/metrics.test.ts @@ -151,8 +151,7 @@ describe("NATS JetStream Consumer Metrics", () => { await startJSConsumers( js as never, jsm as never, "test-svc", registrations, silentLogger, - undefined, undefined, undefined, undefined, undefined, - metrics, + { metrics }, ); await waitFor(() => { @@ -221,8 +220,7 @@ describe("NATS JetStream Consumer Metrics", () => { await startJSConsumers( js as never, jsm as never, "test-svc", registrations, silentLogger, - undefined, undefined, undefined, undefined, undefined, - metrics, + { metrics }, ); await waitFor(() => { @@ -285,8 +283,7 @@ describe("NATS JetStream Consumer Metrics", () => { await startJSConsumers( js as never, jsm as never, "test-svc", registrations, silentLogger, - undefined, undefined, undefined, undefined, undefined, - metrics, + { metrics }, ); await waitFor(() => { @@ -347,8 +344,7 @@ describe("NATS JetStream Consumer Metrics", () => { await startJSConsumers( js as never, jsm as never, "test-svc", registrations, silentLogger, - undefined, undefined, undefined, undefined, undefined, - metrics, + { metrics }, ); await waitFor(() => { @@ -416,8 +412,7 @@ describe("NATS JetStream Consumer Metrics", () => { await startJSConsumers( js as never, jsm as never, "test-svc", registrations, silentLogger, - undefined, undefined, undefined, undefined, undefined, - metrics, mapper, + { metrics, routingKeyMapper: mapper }, ); await waitFor(() => { @@ -453,7 +448,7 @@ describe("NATS Core Consumer Metrics", () => { startCoreConsumers( nc as never, "test-service", registrations, silentLogger, - undefined, undefined, undefined, metrics, + { metrics }, ); const msg = createMockMsg({ orderId: "123" }, ceHeaders); @@ -480,7 +475,7 @@ describe("NATS Core Consumer Metrics", () => { startCoreConsumers( nc as never, "test-service", registrations, silentLogger, - undefined, undefined, undefined, metrics, + { metrics }, ); const msg = createMockMsg({ orderId: "123" }, ceHeaders); @@ -507,7 +502,7 @@ describe("NATS Core Consumer Metrics", () => { startCoreConsumers( nc as never, "test-service", registrations, silentLogger, - undefined, undefined, undefined, metrics, + { metrics }, ); const msg = { @@ -545,7 +540,7 @@ describe("NATS Core Consumer Metrics", () => { startCoreConsumers( nc as never, "test-service", registrations, silentLogger, - undefined, undefined, undefined, metrics, mapper, + { metrics, routingKeyMapper: mapper }, ); const msg = createMockMsg({ orderId: "123" }, ceHeaders); diff --git a/__tests__/notifications.test.ts b/__tests__/notifications.test.ts index 4b13c11..dcf0210 100644 --- a/__tests__/notifications.test.ts +++ b/__tests__/notifications.test.ts @@ -83,10 +83,7 @@ describe("JetStream notifications", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - undefined, - onNotification, + { onNotification }, ); await waitFor(() => { @@ -149,11 +146,7 @@ describe("JetStream notifications", () => { "test-svc", registrations, silentLogger, - undefined, - undefined, - undefined, - undefined, - onError, + { onError }, ); await waitFor(() => { @@ -229,8 +222,7 @@ describe("Core NATS notifications", () => { "test-service", registrations, silentLogger, - undefined, - onNotification, + { onNotification }, ); const msg = createMockMsg({ orderId: "123" }); @@ -264,9 +256,7 @@ describe("Core NATS notifications", () => { "test-service", registrations, silentLogger, - undefined, - undefined, - onError, + { onError }, ); const msg = createMockMsg({ orderId: "123" }); diff --git a/__tests__/stream-config.test.ts b/__tests__/stream-config.test.ts index 3503f12..188afdc 100644 --- a/__tests__/stream-config.test.ts +++ b/__tests__/stream-config.test.ts @@ -84,8 +84,7 @@ describe("Stream config in startJSConsumers", () => { "test-svc", registrations, silentLogger, - undefined, - resolver, + { resolveStreamConfig: resolver }, ); expect(jsm.streams.add).toHaveBeenCalledWith({ @@ -149,8 +148,7 @@ describe("Stream config in startJSConsumers", () => { "test-svc", registrations, silentLogger, - undefined, - resolver, + { resolveStreamConfig: resolver }, ); expect(jsm.streams.update).toHaveBeenCalledWith("events", { @@ -195,8 +193,7 @@ describe("Stream config in startJSConsumers", () => { "svc", registrations, silentLogger, - undefined, - resolver, + { resolveStreamConfig: resolver }, ); expect(resolvedStreams).toEqual(["events", "custom"]); @@ -235,8 +232,7 @@ describe("Stream config in startJSConsumers", () => { "test-svc", registrations, silentLogger, - undefined, - resolver, + { resolveStreamConfig: resolver }, ); expect(jsm.streams.add).toHaveBeenCalledWith({ diff --git a/src/connection.ts b/src/connection.ts index 10893c6..2a51a91 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -19,6 +19,7 @@ import { import { connect, Events, + NatsError, type ConnectionOptions as NatsConnectionOptions, type NatsConnection, type JetStreamClient, @@ -445,8 +446,14 @@ export class Connection { subjects: [`${stream}.>`], ...streamCfg, }); - } catch { - // Already exists with correct config + } catch (updateErr) { + // Suppress 10058 (stream name already in use) / 10059 (consumer name exists) + const apiCode = (updateErr instanceof NatsError) + ? updateErr.api_error?.err_code + : undefined; + if (apiCode !== 10058 && apiCode !== 10059) { + throw updateErr; + } } } publisher.wireJetStream(this.js); @@ -465,13 +472,15 @@ export class Connection { this.serviceName, this.jsRegistrations, this.logger, - this.propagator, - (stream) => this.resolveStreamConfig(stream), - this.consumerDefaults, - this.onNotification, - this.onError, - this.metrics, - this.routingKeyMapper, + { + propagator: this.propagator, + resolveStreamConfig: (stream) => this.resolveStreamConfig(stream), + consumerDefaults: this.consumerDefaults, + onNotification: this.onNotification, + onError: this.onError, + metrics: this.metrics, + routingKeyMapper: this.routingKeyMapper, + }, ); this.consumerHandles.push(...handles); } @@ -483,11 +492,13 @@ export class Connection { this.serviceName, this.coreRegistrations, this.logger, - this.propagator, - this.onNotification, - this.onError, - this.metrics, - this.routingKeyMapper, + { + propagator: this.propagator, + onNotification: this.onNotification, + onError: this.onError, + metrics: this.metrics, + routingKeyMapper: this.routingKeyMapper, + }, ); this.consumerHandles.push(...handles); } diff --git a/src/consumer.ts b/src/consumer.ts index fbf6b33..4f6e8e2 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -2,7 +2,7 @@ // Copyright (c) 2026 sparetimecoders import type { JetStreamClient, NatsConnection, JetStreamManager, ConsumerMessages } from "nats"; -import { AckPolicy } from "nats"; +import { AckPolicy, NatsError } from "nats"; import type { ConsumableEvent, EventHandler, @@ -26,6 +26,9 @@ import { extractToContext } from "./tracing.js"; import type { TextMapPropagator } from "@opentelemetry/api"; import type { StreamConfigResolver, ConsumerDefaults } from "./connection.js"; +const textDecoder = new TextDecoder(); +const textEncoder = new TextEncoder(); + type Logger = Pick; /** Internal registration for a JetStream consumer. */ @@ -74,6 +77,26 @@ function consumerGroupKey(reg: JSConsumerRegistration): string { return `${reg.stream}:${reg.durable ?? ""}`; } +/** Options for starting JetStream consumers. */ +export interface JSConsumerOptions { + propagator?: TextMapPropagator; + resolveStreamConfig?: StreamConfigResolver; + consumerDefaults?: ConsumerDefaults; + onNotification?: NotificationHandler; + onError?: ErrorNotificationHandler; + metrics?: MetricsRecorder; + routingKeyMapper?: RoutingKeyMapper; +} + +/** Options for starting Core NATS consumers. */ +export interface CoreConsumerOptions { + propagator?: TextMapPropagator; + onNotification?: NotificationHandler; + onError?: ErrorNotificationHandler; + metrics?: MetricsRecorder; + routingKeyMapper?: RoutingKeyMapper; +} + /** * Start all registered JetStream consumers. * @@ -90,14 +113,18 @@ export async function startJSConsumers( serviceName: string, registrations: JSConsumerRegistration[], logger: Logger, - propagator?: TextMapPropagator, - resolveStreamConfig?: StreamConfigResolver, - consumerDefaults?: ConsumerDefaults, - onNotification?: NotificationHandler, - onError?: ErrorNotificationHandler, - metrics?: MetricsRecorder, - routingKeyMapper?: RoutingKeyMapper, + options: JSConsumerOptions = {}, ): Promise { + const { + propagator, + resolveStreamConfig, + consumerDefaults, + onNotification, + onError, + metrics, + routingKeyMapper, + } = options; + const handles: ConsumerHandle[] = []; // Group registrations by stream+durable so we create one NATS consumer per group. @@ -132,8 +159,13 @@ export async function startJSConsumers( subjects: [streamSubjects], ...streamCfg, }); - } catch { - // Already exists with correct config + } catch (updateErr) { + const apiCode = (updateErr instanceof NatsError) + ? updateErr.api_error?.err_code + : undefined; + if (apiCode !== 10058 && apiCode !== 10059) { + throw updateErr; + } } } @@ -159,6 +191,21 @@ export async function startJSConsumers( consumerCfg.filter_subjects = filterSubjects; } + // Validate: backOff length must not exceed maxDeliver + const effectiveMaxDeliver = first.maxDeliver ?? consumerDefaults?.maxDeliver; + const effectiveBackOff = first.backOff ?? consumerDefaults?.backOff; + if ( + effectiveBackOff !== undefined && + effectiveBackOff.length > 0 && + effectiveMaxDeliver !== undefined && + effectiveMaxDeliver > 0 && + effectiveBackOff.length > effectiveMaxDeliver + ) { + throw new Error( + `backOff length (${effectiveBackOff.length}) exceeds maxDeliver (${effectiveMaxDeliver}) for consumer on stream "${stream}"`, + ); + } + // Apply MaxDeliver: per-consumer override > connection default. const maxDeliver = first.maxDeliver ?? consumerDefaults?.maxDeliver; if (maxDeliver !== undefined && maxDeliver > 0) { @@ -176,12 +223,14 @@ export async function startJSConsumers( try { const ci = await jsm.consumers.add(stream, consumerCfg); consumerName = ci.name; - } catch { + } catch (err) { // Consumer exists with incompatible config — delete and recreate if (durable) { await jsm.consumers.delete(stream, durable); const ci = await jsm.consumers.add(stream, consumerCfg); consumerName = ci.name; + } else { + throw err; } } @@ -242,7 +291,7 @@ export async function startJSConsumers( let payload: unknown; try { - payload = JSON.parse(new TextDecoder().decode(msg.data)); + payload = JSON.parse(textDecoder.decode(msg.data)); } catch { logger.error(`[gomessaging/nats] Failed to parse message on ${subject}`); metrics?.eventNotParsable(consumerName, mappedKey); @@ -307,12 +356,16 @@ export function startCoreConsumers( serviceName: string, registrations: CoreConsumerRegistration[], logger: Logger, - propagator?: TextMapPropagator, - onNotification?: NotificationHandler, - onError?: ErrorNotificationHandler, - metrics?: MetricsRecorder, - routingKeyMapper?: RoutingKeyMapper, + options: CoreConsumerOptions = {}, ): ConsumerHandle[] { + const { + propagator, + onNotification, + onError, + metrics, + routingKeyMapper, + } = options; + const handles: ConsumerHandle[] = []; for (const reg of registrations) { @@ -341,7 +394,7 @@ export function startCoreConsumers( let payload: unknown; try { - payload = JSON.parse(new TextDecoder().decode(msg.data)); + payload = JSON.parse(textDecoder.decode(msg.data)); } catch { logger.error(`[gomessaging/nats] Failed to parse message on ${reg.subject}`); metrics?.eventNotParsable(serviceName, mappedKey); @@ -360,7 +413,7 @@ export function startCoreConsumers( if (reg.requestReply) { const respHandler = reg.handler as RequestResponseEventHandler; const result = await respHandler(event); - const respData = new TextEncoder().encode(JSON.stringify(result)); + const respData = textEncoder.encode(JSON.stringify(result)); msg.respond(respData); } else { await (reg.handler as EventHandler)(event); @@ -384,7 +437,7 @@ export function startCoreConsumers( }); logger.error(`[gomessaging/nats] Handler failed on ${reg.subject}: ${errObj.message}`); if (reg.requestReply && msg.reply) { - const errResp = new TextEncoder().encode(JSON.stringify({ error: errObj.message })); + const errResp = textEncoder.encode(JSON.stringify({ error: errObj.message })); msg.respond(errResp); } } diff --git a/src/index.ts b/src/index.ts index b3e1b71..1b20cdc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -21,4 +21,5 @@ export { Connection, DefaultStreamConfig } from "./connection.js"; export type { ConnectionOptions, StreamConfig, StreamConfigResolver, ConsumerDefaults, ConsumerOptions } from "./connection.js"; export { Publisher } from "./publisher.js"; export type { PublisherOptions } from "./publisher.js"; +export type { JSConsumerOptions, CoreConsumerOptions } from "./consumer.js"; export { injectToHeaders, extractToContext } from "./tracing.js"; diff --git a/src/publisher.ts b/src/publisher.ts index 9895f57..e0c5a03 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -21,6 +21,7 @@ import { injectToHeaders } from "./tracing.js"; import type { TextMapPropagator } from "@opentelemetry/api"; const contentType = "application/json"; +const textEncoder = new TextEncoder(); type PublishFn = (subject: string, data: Uint8Array, headers: MsgHdrs) => Promise; @@ -100,7 +101,7 @@ export class Publisher { } const subject = this.subjectFn(this.stream, routingKey); - const data = new TextEncoder().encode(JSON.stringify(msg)); + const data = textEncoder.encode(JSON.stringify(msg)); const hdrs = natsHeaders();