Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 126 additions & 25 deletions __tests__/consumer-options.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { describe, expect, it, mock, beforeEach } from "bun:test";
import { startJSConsumers } from "../src/consumer.js";
import type { JSConsumerRegistration } from "../src/consumer.js";
import type { ConsumerDefaults } from "../src/connection.js";
import { AckPolicy } from "nats";
import type { JSConsumerOptions } from "../src/consumer.js";
import { AckPolicy, NatsError } from "nats";

const silentLogger = {
info: mock(() => {}),
Expand Down Expand Up @@ -65,9 +66,7 @@ describe("Consumer defaults (connection-level)", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
defaults,
{ consumerDefaults: defaults },
);

expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
Expand Down Expand Up @@ -101,9 +100,7 @@ describe("Consumer defaults (connection-level)", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
defaults,
{ consumerDefaults: defaults },
);

expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
Expand Down Expand Up @@ -138,9 +135,7 @@ describe("Consumer defaults (connection-level)", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
defaults,
{ consumerDefaults: defaults },
);

expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
Expand Down Expand Up @@ -201,9 +196,7 @@ describe("Consumer defaults (connection-level)", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
defaults,
{ consumerDefaults: defaults },
);

expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
Expand Down Expand Up @@ -234,9 +227,7 @@ describe("Consumer defaults (connection-level)", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
defaults,
{ consumerDefaults: defaults },
);

expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
Expand Down Expand Up @@ -270,9 +261,7 @@ describe("Per-consumer options override defaults", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
defaults,
{ consumerDefaults: defaults },
);

expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
Expand Down Expand Up @@ -305,9 +294,7 @@ describe("Per-consumer options override defaults", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
defaults,
{ consumerDefaults: defaults },
);

expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
Expand Down Expand Up @@ -380,9 +367,7 @@ describe("Per-consumer options override defaults", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
defaults,
{ consumerDefaults: defaults },
);

expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
Expand All @@ -393,3 +378,119 @@ describe("Per-consumer options override defaults", () => {
});
});
});

describe("Stream error handling", () => {
it("rethrows non-NATS errors from stream update", async () => {
const { js } = createMockJsAndJsm();

// streams.add fails, then streams.update also fails with a non-NATS error
const jsm = {
streams: {
add: mock(() => Promise.reject(new Error("add failed"))),
update: mock(() => Promise.reject(new Error("network timeout"))),
},
consumers: {
add: mock(() => Promise.resolve({})),
delete: mock(() => Promise.resolve(true)),
},
};

const registrations: JSConsumerRegistration<unknown>[] = [
{
kind: "jetstream",
stream: "events",
routingKey: "Order.Created",
handler: async () => {},
durable: "test-svc",
},
];

await expect(
startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger),
).rejects.toThrow("network timeout");
});

it("suppresses NATS 10058 error (stream already exists)", async () => {
const { js } = createMockJsAndJsm();

const natsErr = new NatsError("stream exists", "10058", new Error("inner"));
(natsErr as any).api_error = { err_code: 10058 };
const jsm = {
streams: {
add: mock(() => Promise.reject(new Error("add failed"))),
update: mock(() => Promise.reject(natsErr)),
},
consumers: {
add: mock(() => Promise.resolve({ name: "test-svc" })),
delete: mock(() => Promise.resolve(true)),
},
};

const registrations: JSConsumerRegistration<unknown>[] = [
{
kind: "jetstream",
stream: "events",
routingKey: "Order.Created",
handler: async () => {},
durable: "test-svc",
},
];

// Should not throw
await startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger);
expect(jsm.consumers.add).toHaveBeenCalled();
});
});

describe("backOff/maxDeliver validation", () => {
it("throws when backOff length exceeds maxDeliver", async () => {
const { js, jsm } = createMockJsAndJsm();

const registrations: JSConsumerRegistration<unknown>[] = [
{
kind: "jetstream",
stream: "events",
routingKey: "Order.Created",
handler: async () => {},
durable: "test-svc",
maxDeliver: 2,
backOff: [1000, 5000, 30000],
},
];

await expect(
startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger),
).rejects.toThrow("backOff length (3) exceeds maxDeliver (2)");
});
});

describe("Ephemeral consumer error handling", () => {
it("rethrows error when ephemeral consumer creation fails", async () => {
const { js } = createMockJsAndJsm();

const jsm = {
streams: {
add: mock(() => Promise.resolve({})),
update: mock(() => Promise.resolve({})),
},
consumers: {
add: mock(() => Promise.reject(new Error("consumer creation failed"))),
delete: mock(() => Promise.resolve(true)),
},
};

const registrations: JSConsumerRegistration<unknown>[] = [
{
kind: "jetstream",
stream: "events",
routingKey: "Order.Created",
handler: async () => {},
// no durable = ephemeral
},
];

await expect(
startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger),
).rejects.toThrow("consumer creation failed");
});
});
23 changes: 9 additions & 14 deletions __tests__/metrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ describe("NATS JetStream Consumer Metrics", () => {

await startJSConsumers(
js as never, jsm as never, "test-svc", registrations, silentLogger,
undefined, undefined, undefined, undefined, undefined,
metrics,
{ metrics },
);

await waitFor(() => {
Expand Down Expand Up @@ -221,8 +220,7 @@ describe("NATS JetStream Consumer Metrics", () => {

await startJSConsumers(
js as never, jsm as never, "test-svc", registrations, silentLogger,
undefined, undefined, undefined, undefined, undefined,
metrics,
{ metrics },
);

await waitFor(() => {
Expand Down Expand Up @@ -285,8 +283,7 @@ describe("NATS JetStream Consumer Metrics", () => {

await startJSConsumers(
js as never, jsm as never, "test-svc", registrations, silentLogger,
undefined, undefined, undefined, undefined, undefined,
metrics,
{ metrics },
);

await waitFor(() => {
Expand Down Expand Up @@ -347,8 +344,7 @@ describe("NATS JetStream Consumer Metrics", () => {

await startJSConsumers(
js as never, jsm as never, "test-svc", registrations, silentLogger,
undefined, undefined, undefined, undefined, undefined,
metrics,
{ metrics },
);

await waitFor(() => {
Expand Down Expand Up @@ -416,8 +412,7 @@ describe("NATS JetStream Consumer Metrics", () => {

await startJSConsumers(
js as never, jsm as never, "test-svc", registrations, silentLogger,
undefined, undefined, undefined, undefined, undefined,
metrics, mapper,
{ metrics, routingKeyMapper: mapper },
);

await waitFor(() => {
Expand Down Expand Up @@ -453,7 +448,7 @@ describe("NATS Core Consumer Metrics", () => {

startCoreConsumers(
nc as never, "test-service", registrations, silentLogger,
undefined, undefined, undefined, metrics,
{ metrics },
);

const msg = createMockMsg({ orderId: "123" }, ceHeaders);
Expand All @@ -480,7 +475,7 @@ describe("NATS Core Consumer Metrics", () => {

startCoreConsumers(
nc as never, "test-service", registrations, silentLogger,
undefined, undefined, undefined, metrics,
{ metrics },
);

const msg = createMockMsg({ orderId: "123" }, ceHeaders);
Expand All @@ -507,7 +502,7 @@ describe("NATS Core Consumer Metrics", () => {

startCoreConsumers(
nc as never, "test-service", registrations, silentLogger,
undefined, undefined, undefined, metrics,
{ metrics },
);

const msg = {
Expand Down Expand Up @@ -545,7 +540,7 @@ describe("NATS Core Consumer Metrics", () => {

startCoreConsumers(
nc as never, "test-service", registrations, silentLogger,
undefined, undefined, undefined, metrics, mapper,
{ metrics, routingKeyMapper: mapper },
);

const msg = createMockMsg({ orderId: "123" }, ceHeaders);
Expand Down
18 changes: 4 additions & 14 deletions __tests__/notifications.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ describe("JetStream notifications", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
undefined,
onNotification,
{ onNotification },
);

await waitFor(() => {
Expand Down Expand Up @@ -149,11 +146,7 @@ describe("JetStream notifications", () => {
"test-svc",
registrations,
silentLogger,
undefined,
undefined,
undefined,
undefined,
onError,
{ onError },
);

await waitFor(() => {
Expand Down Expand Up @@ -229,8 +222,7 @@ describe("Core NATS notifications", () => {
"test-service",
registrations,
silentLogger,
undefined,
onNotification,
{ onNotification },
);

const msg = createMockMsg({ orderId: "123" });
Expand Down Expand Up @@ -264,9 +256,7 @@ describe("Core NATS notifications", () => {
"test-service",
registrations,
silentLogger,
undefined,
undefined,
onError,
{ onError },
);

const msg = createMockMsg({ orderId: "123" });
Expand Down
12 changes: 4 additions & 8 deletions __tests__/stream-config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ describe("Stream config in startJSConsumers", () => {
"test-svc",
registrations,
silentLogger,
undefined,
resolver,
{ resolveStreamConfig: resolver },
);

expect(jsm.streams.add).toHaveBeenCalledWith({
Expand Down Expand Up @@ -149,8 +148,7 @@ describe("Stream config in startJSConsumers", () => {
"test-svc",
registrations,
silentLogger,
undefined,
resolver,
{ resolveStreamConfig: resolver },
);

expect(jsm.streams.update).toHaveBeenCalledWith("events", {
Expand Down Expand Up @@ -195,8 +193,7 @@ describe("Stream config in startJSConsumers", () => {
"svc",
registrations,
silentLogger,
undefined,
resolver,
{ resolveStreamConfig: resolver },
);

expect(resolvedStreams).toEqual(["events", "custom"]);
Expand Down Expand Up @@ -235,8 +232,7 @@ describe("Stream config in startJSConsumers", () => {
"test-svc",
registrations,
silentLogger,
undefined,
resolver,
{ resolveStreamConfig: resolver },
);

expect(jsm.streams.add).toHaveBeenCalledWith({
Expand Down
Loading
Loading