Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions __tests__/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,11 @@ describe("QueueConsumer", () => {
expect(channel.nack).not.toHaveBeenCalled();
});

it("logs consumer loop exit on stop()", () => {
it("logs consumer stop at info level", () => {
consumer.stop();

expect(silentLogger.warn).toHaveBeenCalledWith(
expect.stringContaining("consumer loop exited, delivery channel closed"),
expect(silentLogger.info).toHaveBeenCalledWith(
expect.stringContaining("consumer stopped"),
);
expect(consumer.isStopped()).toBe(true);
});
Expand All @@ -393,16 +393,16 @@ describe("QueueConsumer", () => {
consumer.stop();
consumer.stop();

const stopCalls = silentLogger.warn.mock.calls.filter(
(call: unknown[]) => typeof call[0] === "string" && (call[0] as string).includes("consumer loop exited"),
const stopCalls = silentLogger.info.mock.calls.filter(
(call: unknown[]) => typeof call[0] === "string" && (call[0] as string).includes("consumer stopped"),
);
expect(stopCalls).toHaveLength(1);
});

describe("legacySupport", () => {
it("enriches metadata for messages without CE headers when legacySupport is enabled", async () => {
const legacyConsumer = new QueueConsumer(
"test-queue", silentLogger, undefined, undefined, undefined, undefined, undefined, true,
"test-queue", silentLogger, undefined, { legacySupport: true },
);
const handler = mock(() => Promise.resolve(undefined));
legacyConsumer.addHandler("order.created", handler);
Expand All @@ -428,7 +428,7 @@ describe("QueueConsumer", () => {

it("does not enrich metadata for messages WITH CE headers when legacySupport is enabled", async () => {
const legacyConsumer = new QueueConsumer(
"test-queue", silentLogger, undefined, undefined, undefined, undefined, undefined, true,
"test-queue", silentLogger, undefined, { legacySupport: true },
);
const handler = mock(() => Promise.resolve(undefined));
legacyConsumer.addHandler("order.created", handler);
Expand Down Expand Up @@ -464,7 +464,7 @@ describe("QueueConsumer", () => {

it("logs debug messages for legacy message detection and enrichment", async () => {
const legacyConsumer = new QueueConsumer(
"test-queue", silentLogger, undefined, undefined, undefined, undefined, undefined, true,
"test-queue", silentLogger, undefined, { legacySupport: true },
);
const handler = mock(() => Promise.resolve(undefined));
legacyConsumer.addHandler("order.created", handler);
Expand Down
14 changes: 7 additions & 7 deletions __tests__/metrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ describe("AMQP Consumer Metrics", () => {

it("calls eventReceived and eventAck on successful handler", async () => {
const consumer = new QueueConsumer(
"test-queue", silentLogger, undefined, undefined, undefined, metrics,
"test-queue", silentLogger, undefined, { metrics },
);
consumer.addHandler("order.created", mock(() => Promise.resolve(undefined)));
await consumer.consume(channel as unknown as import("amqplib").Channel);
Expand All @@ -184,7 +184,7 @@ describe("AMQP Consumer Metrics", () => {

it("calls eventNack on handler error", async () => {
const consumer = new QueueConsumer(
"test-queue", silentLogger, undefined, undefined, undefined, metrics,
"test-queue", silentLogger, undefined, { metrics },
);
consumer.addHandler("order.created", mock(() => Promise.reject(new Error("fail"))));
await consumer.consume(channel as unknown as import("amqplib").Channel);
Expand All @@ -203,7 +203,7 @@ describe("AMQP Consumer Metrics", () => {

it("calls eventWithoutHandler when no handler matches", async () => {
const consumer = new QueueConsumer(
"test-queue", silentLogger, undefined, undefined, undefined, metrics,
"test-queue", silentLogger, undefined, { metrics },
);
consumer.addHandler("order.created", mock());
await consumer.consume(channel as unknown as import("amqplib").Channel);
Expand All @@ -220,7 +220,7 @@ describe("AMQP Consumer Metrics", () => {

it("calls eventNotParsable on invalid JSON", async () => {
const consumer = new QueueConsumer(
"test-queue", silentLogger, undefined, undefined, undefined, metrics,
"test-queue", silentLogger, undefined, { metrics },
);
consumer.addHandler("order.created", mock());
await consumer.consume(channel as unknown as import("amqplib").Channel);
Expand All @@ -238,7 +238,7 @@ describe("AMQP Consumer Metrics", () => {
it("applies routingKeyMapper before passing to metrics", async () => {
const mapper = (key: string) => key.replace(/\.\d+/, ".ID");
const consumer = new QueueConsumer(
"test-queue", silentLogger, undefined, undefined, undefined, metrics, mapper,
"test-queue", silentLogger, undefined, { metrics, routingKeyMapper: mapper },
);
consumer.addHandler("order.#", mock(() => Promise.resolve(undefined)));
await consumer.consume(channel as unknown as import("amqplib").Channel);
Expand All @@ -255,10 +255,10 @@ describe("AMQP Consumer Metrics", () => {
);
});

it("replaces empty mapped routing key with 'unknown'", async () => {
it("replaces empty mapped routing key with unknown", async () => {
const mapper = () => "";
const consumer = new QueueConsumer(
"test-queue", silentLogger, undefined, undefined, undefined, metrics, mapper,
"test-queue", silentLogger, undefined, { metrics, routingKeyMapper: mapper },
);
consumer.addHandler("order.created", mock(() => Promise.resolve(undefined)));
await consumer.consume(channel as unknown as import("amqplib").Channel);
Expand Down
5 changes: 2 additions & 3 deletions __tests__/notifications.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ describe("AMQP notifications", () => {
"test-queue",
silentLogger,
undefined,
onNotification,
{ onNotification },
);

const handler = mock(() => Promise.resolve(undefined));
Expand Down Expand Up @@ -153,8 +153,7 @@ describe("AMQP notifications", () => {
"test-queue",
silentLogger,
undefined,
undefined,
onError,
{ onError },
);

const handlerError = new Error("handler failed");
Expand Down
2 changes: 1 addition & 1 deletion __tests__/prefetch-and-channel-close.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ describe("channel close listener", () => {
consumerCh.emit("error", new Error("channel reset"));

expect(silentLogger.warn).toHaveBeenCalledWith(
expect.stringContaining("consumer loop exited, delivery channel closed"),
expect.stringContaining("channel error: channel reset"),
);
});
});
Loading
Loading