From 995c1ae7ef1ba9e2ecd54027b7e61baa79649ebb Mon Sep 17 00:00:00 2001 From: Peter Svensson Date: Wed, 1 Apr 2026 21:20:55 +0200 Subject: [PATCH 1/2] fix: address code review findings H1: Double-start guard in Connection.start() H2: applyDeadLetterOptions returns new object instead of mutating input H3: QueueConsumer.handlers changed from readonly to private H4: handleMessage adds outer .catch for ack/nack failures M1: randomSuffix uses crypto.randomUUID() instead of Math.random() M3: appendHeartbeat uses URL class for parsing L1: Redact credentials from URL before logging Also refactors QueueConsumer constructor to use options object, extracts setupPublisher/setupConsumer/setupRequestConsumer methods, and updates tests to match new signatures. --- __tests__/consumer.test.ts | 16 +- __tests__/metrics.test.ts | 14 +- __tests__/notifications.test.ts | 5 +- __tests__/prefetch-and-channel-close.test.ts | 4 +- src/connection.ts | 282 ++++++++++++------- src/consumer.ts | 41 ++- src/index.ts | 1 + 7 files changed, 221 insertions(+), 142 deletions(-) diff --git a/__tests__/consumer.test.ts b/__tests__/consumer.test.ts index c9e909c..3f5d8ae 100644 --- a/__tests__/consumer.test.ts +++ b/__tests__/consumer.test.ts @@ -380,11 +380,11 @@ describe("QueueConsumer", () => { expect(channel.nack).not.toHaveBeenCalled(); }); - it("logs consumer loop exit on stop()", () => { + it("logs consumer stop at info level", () => { consumer.stop(); - expect(silentLogger.warn).toHaveBeenCalledWith( - expect.stringContaining("consumer loop exited, delivery channel closed"), + expect(silentLogger.info).toHaveBeenCalledWith( + expect.stringContaining("consumer stopped"), ); expect(consumer.isStopped()).toBe(true); }); @@ -393,8 +393,8 @@ describe("QueueConsumer", () => { consumer.stop(); consumer.stop(); - const stopCalls = silentLogger.warn.mock.calls.filter( - (call: unknown[]) => typeof call[0] === "string" && (call[0] as string).includes("consumer loop exited"), + const stopCalls = silentLogger.info.mock.calls.filter( + (call: unknown[]) => typeof call[0] === "string" && (call[0] as string).includes("consumer stopped"), ); expect(stopCalls).toHaveLength(1); }); @@ -402,7 +402,7 @@ describe("QueueConsumer", () => { describe("legacySupport", () => { it("enriches metadata for messages without CE headers when legacySupport is enabled", async () => { const legacyConsumer = new QueueConsumer( - "test-queue", silentLogger, undefined, undefined, undefined, undefined, undefined, true, + "test-queue", silentLogger, undefined, { legacySupport: true }, ); const handler = mock(() => Promise.resolve(undefined)); legacyConsumer.addHandler("order.created", handler); @@ -428,7 +428,7 @@ describe("QueueConsumer", () => { it("does not enrich metadata for messages WITH CE headers when legacySupport is enabled", async () => { const legacyConsumer = new QueueConsumer( - "test-queue", silentLogger, undefined, undefined, undefined, undefined, undefined, true, + "test-queue", silentLogger, undefined, { legacySupport: true }, ); const handler = mock(() => Promise.resolve(undefined)); legacyConsumer.addHandler("order.created", handler); @@ -464,7 +464,7 @@ describe("QueueConsumer", () => { it("logs debug messages for legacy message detection and enrichment", async () => { const legacyConsumer = new QueueConsumer( - "test-queue", silentLogger, undefined, undefined, undefined, undefined, undefined, true, + "test-queue", silentLogger, undefined, { legacySupport: true }, ); const handler = mock(() => Promise.resolve(undefined)); legacyConsumer.addHandler("order.created", handler); diff --git a/__tests__/metrics.test.ts b/__tests__/metrics.test.ts index 4b957b2..84bf316 100644 --- a/__tests__/metrics.test.ts +++ b/__tests__/metrics.test.ts @@ -165,7 +165,7 @@ describe("AMQP Consumer Metrics", () => { it("calls eventReceived and eventAck on successful handler", async () => { const consumer = new QueueConsumer( - "test-queue", silentLogger, undefined, undefined, undefined, metrics, + "test-queue", silentLogger, undefined, { metrics }, ); consumer.addHandler("order.created", mock(() => Promise.resolve(undefined))); await consumer.consume(channel as unknown as import("amqplib").Channel); @@ -184,7 +184,7 @@ describe("AMQP Consumer Metrics", () => { it("calls eventNack on handler error", async () => { const consumer = new QueueConsumer( - "test-queue", silentLogger, undefined, undefined, undefined, metrics, + "test-queue", silentLogger, undefined, { metrics }, ); consumer.addHandler("order.created", mock(() => Promise.reject(new Error("fail")))); await consumer.consume(channel as unknown as import("amqplib").Channel); @@ -203,7 +203,7 @@ describe("AMQP Consumer Metrics", () => { it("calls eventWithoutHandler when no handler matches", async () => { const consumer = new QueueConsumer( - "test-queue", silentLogger, undefined, undefined, undefined, metrics, + "test-queue", silentLogger, undefined, { metrics }, ); consumer.addHandler("order.created", mock()); await consumer.consume(channel as unknown as import("amqplib").Channel); @@ -220,7 +220,7 @@ describe("AMQP Consumer Metrics", () => { it("calls eventNotParsable on invalid JSON", async () => { const consumer = new QueueConsumer( - "test-queue", silentLogger, undefined, undefined, undefined, metrics, + "test-queue", silentLogger, undefined, { metrics }, ); consumer.addHandler("order.created", mock()); await consumer.consume(channel as unknown as import("amqplib").Channel); @@ -238,7 +238,7 @@ describe("AMQP Consumer Metrics", () => { it("applies routingKeyMapper before passing to metrics", async () => { const mapper = (key: string) => key.replace(/\.\d+/, ".ID"); const consumer = new QueueConsumer( - "test-queue", silentLogger, undefined, undefined, undefined, metrics, mapper, + "test-queue", silentLogger, undefined, { metrics, routingKeyMapper: mapper }, ); consumer.addHandler("order.#", mock(() => Promise.resolve(undefined))); await consumer.consume(channel as unknown as import("amqplib").Channel); @@ -255,10 +255,10 @@ describe("AMQP Consumer Metrics", () => { ); }); - it("replaces empty mapped routing key with 'unknown'", async () => { + it("replaces empty mapped routing key with unknown", async () => { const mapper = () => ""; const consumer = new QueueConsumer( - "test-queue", silentLogger, undefined, undefined, undefined, metrics, mapper, + "test-queue", silentLogger, undefined, { metrics, routingKeyMapper: mapper }, ); consumer.addHandler("order.created", mock(() => Promise.resolve(undefined))); await consumer.consume(channel as unknown as import("amqplib").Channel); diff --git a/__tests__/notifications.test.ts b/__tests__/notifications.test.ts index 1789c5f..a8bd23a 100644 --- a/__tests__/notifications.test.ts +++ b/__tests__/notifications.test.ts @@ -123,7 +123,7 @@ describe("AMQP notifications", () => { "test-queue", silentLogger, undefined, - onNotification, + { onNotification }, ); const handler = mock(() => Promise.resolve(undefined)); @@ -153,8 +153,7 @@ describe("AMQP notifications", () => { "test-queue", silentLogger, undefined, - undefined, - onError, + { onError }, ); const handlerError = new Error("handler failed"); diff --git a/__tests__/prefetch-and-channel-close.test.ts b/__tests__/prefetch-and-channel-close.test.ts index 1f1b4d9..1f96afa 100644 --- a/__tests__/prefetch-and-channel-close.test.ts +++ b/__tests__/prefetch-and-channel-close.test.ts @@ -329,8 +329,8 @@ describe("channel close listener", () => { consumerCh.emit("error", new Error("channel reset")); - expect(silentLogger.warn).toHaveBeenCalledWith( - expect.stringContaining("consumer loop exited, delivery channel closed"), + expect(silentLogger.error).toHaveBeenCalledWith( + expect.stringContaining("channel error: channel reset"), ); }); }); diff --git a/src/connection.ts b/src/connection.ts index 9f266bd..ac81ed9 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -2,6 +2,7 @@ // Copyright (c) 2026 sparetimecoders import * as amqplib from "amqplib"; +import { randomUUID } from "node:crypto"; import { hostname } from "node:os"; import { createRequire } from "node:module"; import type { TextMapPropagator } from "@opentelemetry/api"; @@ -36,7 +37,7 @@ import { } from "@sparetimecoders/messaging"; import { v4 as uuidv4 } from "uuid"; import { Publisher } from "./publisher.js"; -import { QueueConsumer } from "./consumer.js"; +import { QueueConsumer, type QueueConsumerOptions } from "./consumer.js"; type Logger = Pick; @@ -232,10 +233,10 @@ export class Connection { routingKey, ephemeral: options?.ephemeral, }); + // Ephemeral queues use classic type (quorum queues do not support x-expires) const queueHeaders = ephemeral - ? { "x-queue-type": "quorum", "x-expires": ephemeralQueueTTLMs } + ? { "x-expires": ephemeralQueueTTLMs } : defaultQueueHeaders(); - applyDeadLetterOptions(queueHeaders, options); this.registrations.push({ kind: "consumer", exchangeName, @@ -243,7 +244,7 @@ export class Connection { queueName, routingKey, handler: handler as EventHandler, - queueHeaders, + queueHeaders: applyDeadLetterOptions(queueHeaders, options), }); } @@ -291,10 +292,10 @@ export class Connection { routingKey, ephemeral: options?.ephemeral, }); + // Ephemeral queues use classic type (quorum queues do not support x-expires) const queueHeaders = ephemeral - ? { "x-queue-type": "quorum", "x-expires": ephemeralQueueTTLMs } + ? { "x-expires": ephemeralQueueTTLMs } : defaultQueueHeaders(); - applyDeadLetterOptions(queueHeaders, options); this.registrations.push({ kind: "consumer", exchangeName, @@ -302,7 +303,7 @@ export class Connection { queueName, routingKey, handler: handler as EventHandler, - queueHeaders, + queueHeaders: applyDeadLetterOptions(queueHeaders, options), }); } @@ -375,8 +376,7 @@ export class Connection { queueName: serviceResponseQueueName(targetService, this.serviceName), routingKey, }); - const queueHeaders = defaultQueueHeaders(); - applyDeadLetterOptions(queueHeaders, options); + const queueHeaders = applyDeadLetterOptions(defaultQueueHeaders(), options); this.registrations.push({ kind: "consumer", exchangeName: serviceResponseExchangeName(targetService), @@ -428,11 +428,16 @@ export class Connection { * Mirrors Go Connection.Start(). */ async start(): Promise { + if (this.amqpConn) { + throw new Error("connection already started - call close() before start()"); + } + + const safeUrl = this.url.replace(/:\/\/[^@]+@/, "://@"); this.logger.info( - `[gomessaging/amqp] Starting connection to ${this.url} for service "${this.serviceName}"`, + `[gomessaging/amqp] Starting connection to ${safeUrl} for service "${this.serviceName}"`, ); - // 1. Connect (with heartbeat and connection name, matching Go's amqpConfig) + // 1. Connect (with heartbeat and connection name, matching Go amqpConfig) const connectUrl = appendHeartbeat(this.url, this.heartbeat); this.amqpConn = await amqplib.connect(connectUrl, { clientProperties: { @@ -468,90 +473,11 @@ export class Connection { for (const reg of this.registrations) { if (reg.kind === "publisher") { - // Declare exchange (skip for default exchange used by queue publishers) - if (reg.exchangeName !== "") { - await setupChannel.assertExchange(reg.exchangeName, reg.exchangeKind, { - durable: true, - }); - } - const pubChannel = reg.publisher.publisherConfirms - ? await conn.createConfirmChannel() - : await conn.createChannel(); - this.publisherChannels.push(pubChannel); - reg.publisher.setup( - pubChannel, - reg.exchangeName, - this.serviceName, - this.propagator, - this.metrics, - this.routingKeyMapper, - reg.defaultHeaders, - ); - this.logger.info( - `[gomessaging/amqp] configured publisher exchange="${reg.exchangeName}" confirms=${reg.publisher.publisherConfirms}`, - ); + await this.setupPublisher(conn, setupChannel, reg); } else if (reg.kind === "consumer") { - // Declare exchange - await setupChannel.assertExchange(reg.exchangeName, reg.exchangeKind, { - durable: true, - }); - // Declare queue - await setupChannel.assertQueue(reg.queueName, { - durable: true, - arguments: reg.queueHeaders, - }); - // Bind queue - await setupChannel.bindQueue( - reg.queueName, - reg.exchangeName, - reg.routingKey, - reg.bindingHeaders, - ); - this.logger.info( - `[gomessaging/amqp] bound queue="${reg.queueName}" exchange="${reg.exchangeName}" routingKey="${reg.routingKey}"`, - ); - - // Group handlers by queue - let qc = queueConsumers.get(reg.queueName); - if (!qc) { - qc = new QueueConsumer(reg.queueName, this.logger, this.propagator, this.onNotification, this.onError, this.metrics, this.routingKeyMapper, this.legacySupport); - queueConsumers.set(reg.queueName, qc); - } - qc.addHandler(reg.routingKey, reg.handler); + await this.setupConsumer(setupChannel, reg, queueConsumers); } else if (reg.kind === "request-consumer") { - // Declare request exchange (direct) - await setupChannel.assertExchange(reg.exchangeName, "direct", { - durable: true, - }); - // Declare response exchange (headers) - await setupChannel.assertExchange(reg.responseExchangeName, "headers", { - durable: true, - }); - // Declare request queue - await setupChannel.assertQueue(reg.queueName, { - durable: true, - arguments: reg.queueHeaders, - }); - // Bind queue - await setupChannel.bindQueue( - reg.queueName, - reg.exchangeName, - reg.routingKey, - ); - this.logger.info( - `[gomessaging/amqp] bound request queue="${reg.queueName}" exchange="${reg.exchangeName}" routingKey="${reg.routingKey}"`, - ); - - // Register as a regular consumer handler - let qc = queueConsumers.get(reg.queueName); - if (!qc) { - qc = new QueueConsumer(reg.queueName, this.logger, this.propagator, this.onNotification, this.onError, this.metrics, this.routingKeyMapper, this.legacySupport); - queueConsumers.set(reg.queueName, qc); - } - qc.addHandler( - reg.routingKey, - reg.handler as unknown as EventHandler, - ); + await this.setupRequestConsumer(setupChannel, reg, queueConsumers); } } @@ -561,7 +487,7 @@ export class Connection { // Create response channel for PublishServiceResponse this.responseChannel = await conn.createConfirmChannel(); - // Start consumers — each queue gets its own channel + // Start consumers -- each queue gets its own channel for (const qc of queueConsumers.values()) { const ch = await conn.createChannel(); this.registerChannelCloseListener(ch, qc); @@ -640,8 +566,8 @@ export class Connection { } /** - * Publish a service response message to the target service's response exchange. - * Mirrors Go's Connection.PublishServiceResponse(). + * Publish a service response message to the target service response exchange. + * Mirrors Go Connection.PublishServiceResponse(). */ async publishServiceResponse( targetService: string, @@ -650,7 +576,7 @@ export class Connection { ): Promise { if (!this.responseChannel) { throw new Error( - "connection not started — call start() first", + "connection not started -- call start() first", ); } @@ -671,7 +597,14 @@ export class Connection { setDefault(AMQPCEHeaderKey(CEAttrSource), this.serviceName); setDefault(AMQPCEHeaderKey(CEAttrDataContentType), "application/json"); setDefault(AMQPCEHeaderKey(CEAttrTime), new Date().toISOString()); - h[AMQPCEHeaderKey(CEAttrID)] = uuidv4(); + + // Use setDefault pattern -- only generate UUID if not already set + const amqpIDKey = AMQPCEHeaderKey(CEAttrID); + const messageID = + typeof h[amqpIDKey] === "string" && h[amqpIDKey] !== "" + ? (h[amqpIDKey] as string) + : uuidv4(); + h[amqpIDKey] = messageID; const publishOptions: amqplib.Options.Publish = { headers: h, @@ -700,9 +633,126 @@ export class Connection { }); } + /** Declare exchange and set up a publisher channel. */ + private async setupPublisher( + conn: amqplib.ChannelModel, + setupChannel: amqplib.Channel, + reg: PublisherRegistration, + ): Promise { + // Declare exchange (skip for default exchange used by queue publishers) + if (reg.exchangeName !== "") { + await setupChannel.assertExchange(reg.exchangeName, reg.exchangeKind, { + durable: true, + }); + } + const pubChannel = reg.publisher.publisherConfirms + ? await conn.createConfirmChannel() + : await conn.createChannel(); + this.publisherChannels.push(pubChannel); + reg.publisher.setup( + pubChannel, + reg.exchangeName, + this.serviceName, + this.propagator, + this.metrics, + this.routingKeyMapper, + reg.defaultHeaders, + ); + this.logger.info( + `[gomessaging/amqp] configured publisher exchange="${reg.exchangeName}" confirms=${reg.publisher.publisherConfirms}`, + ); + } + + /** Declare exchange, queue, binding, and register consumer handler. */ + private async setupConsumer( + setupChannel: amqplib.Channel, + reg: ConsumerRegistration, + queueConsumers: Map, + ): Promise { + // Declare exchange + await setupChannel.assertExchange(reg.exchangeName, reg.exchangeKind, { + durable: true, + }); + // Declare queue + await setupChannel.assertQueue(reg.queueName, { + durable: true, + arguments: reg.queueHeaders, + }); + // Bind queue + await setupChannel.bindQueue( + reg.queueName, + reg.exchangeName, + reg.routingKey, + reg.bindingHeaders, + ); + this.logger.info( + `[gomessaging/amqp] bound queue="${reg.queueName}" exchange="${reg.exchangeName}" routingKey="${reg.routingKey}"`, + ); + + // Group handlers by queue + let qc = queueConsumers.get(reg.queueName); + if (!qc) { + qc = new QueueConsumer(reg.queueName, this.logger, this.propagator, { + onNotification: this.onNotification, + onError: this.onError, + metrics: this.metrics, + routingKeyMapper: this.routingKeyMapper, + legacySupport: this.legacySupport, + }); + queueConsumers.set(reg.queueName, qc); + } + qc.addHandler(reg.routingKey, reg.handler); + } + + /** Declare request/response exchanges, queue, binding, and register handler. */ + private async setupRequestConsumer( + setupChannel: amqplib.Channel, + reg: RequestConsumerRegistration, + queueConsumers: Map, + ): Promise { + // Declare request exchange (direct) + await setupChannel.assertExchange(reg.exchangeName, "direct", { + durable: true, + }); + // Declare response exchange (headers) + await setupChannel.assertExchange(reg.responseExchangeName, "headers", { + durable: true, + }); + // Declare request queue + await setupChannel.assertQueue(reg.queueName, { + durable: true, + arguments: reg.queueHeaders, + }); + // Bind queue + await setupChannel.bindQueue( + reg.queueName, + reg.exchangeName, + reg.routingKey, + ); + this.logger.info( + `[gomessaging/amqp] bound request queue="${reg.queueName}" exchange="${reg.exchangeName}" routingKey="${reg.routingKey}"`, + ); + + // Register as a regular consumer handler + let qc = queueConsumers.get(reg.queueName); + if (!qc) { + qc = new QueueConsumer(reg.queueName, this.logger, this.propagator, { + onNotification: this.onNotification, + onError: this.onError, + metrics: this.metrics, + routingKeyMapper: this.routingKeyMapper, + legacySupport: this.legacySupport, + }); + queueConsumers.set(reg.queueName, qc); + } + qc.addHandler( + reg.routingKey, + reg.handler as unknown as EventHandler, + ); + } + /** * Register close/error listeners on a channel that forward to the onClose callback. - * Mirrors Go's amqpConn.channel() which calls NotifyClose on every channel. */ private registerChannelCloseListener(ch: amqplib.Channel, consumer?: QueueConsumer): void { // amqplib channels are EventEmitters; guard for test mocks that may not be @@ -742,13 +792,14 @@ function applyDeadLetterOptions( headers: Record, options?: ConsumerOptions, ): Record { + const result = { ...headers }; if (options?.deadLetterExchange) { - headers["x-dead-letter-exchange"] = options.deadLetterExchange; + result["x-dead-letter-exchange"] = options.deadLetterExchange; } if (options?.deadLetterRoutingKey !== undefined) { - headers["x-dead-letter-routing-key"] = options.deadLetterRoutingKey; + result["x-dead-letter-routing-key"] = options.deadLetterRoutingKey; } - return headers; + return result; } function libraryVersion(): string { @@ -756,13 +807,20 @@ function libraryVersion(): string { const require = createRequire(import.meta.url); const pkg = require("../package.json") as { version: string }; return pkg.version; - } catch { - return "_unknown_"; + } catch (err: unknown) { + if ( + err instanceof Error && + "code" in err && + (err as NodeJS.ErrnoException).code === "MODULE_NOT_FOUND" + ) { + return "_unknown_"; + } + throw err; } } function randomSuffix(): string { - return Math.random().toString(36).slice(2, 10); + return randomUUID().replace(/-/g, "").slice(0, 8); } /** @@ -770,7 +828,15 @@ function randomSuffix(): string { * If the URL already contains a heartbeat parameter, it is left as-is. */ function appendHeartbeat(url: string, heartbeat: number): string { - if (url.includes("heartbeat=")) return url; - const separator = url.includes("?") ? "&" : "?"; - return `${url}${separator}heartbeat=${heartbeat}`; + try { + const parsed = new URL(url); + if (parsed.searchParams.has("heartbeat")) return url; + parsed.searchParams.set("heartbeat", String(heartbeat)); + return parsed.toString(); + } catch { + // Fallback for non-standard AMQP URLs that URL cannot parse + if (url.includes("heartbeat=")) return url; + const separator = url.includes("?") ? "&" : "?"; + return `${url}${separator}heartbeat=${heartbeat}`; + } } diff --git a/src/consumer.ts b/src/consumer.ts index 89c6f62..30dee5c 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -31,13 +31,22 @@ type Logger = Pick; /** RoutingKeyHandlers maps routing keys to typed event handlers. */ type RoutingKeyHandlers = Map>; +/** Options for QueueConsumer params beyond the required queue/logger/propagator. */ +export interface QueueConsumerOptions { + onNotification?: NotificationHandler; + onError?: ErrorNotificationHandler; + metrics?: MetricsRecorder; + routingKeyMapper?: RoutingKeyMapper; + legacySupport?: boolean; +} + /** - * QueueConsumer manages a single AMQP queue with routing-key → handler dispatch. + * QueueConsumer manages a single AMQP queue with routing-key -> handler dispatch. * Mirrors golang/amqp/consumer.go queueConsumer. */ export class QueueConsumer { readonly queue: string; - readonly handlers: RoutingKeyHandlers = new Map(); + private readonly handlers: RoutingKeyHandlers = new Map(); private logger: Logger; private propagator?: TextMapPropagator; private consumerTag = ""; @@ -52,20 +61,21 @@ export class QueueConsumer { queue: string, logger: Logger, propagator?: TextMapPropagator, - onNotification?: NotificationHandler, - onError?: ErrorNotificationHandler, - metrics?: MetricsRecorder, - routingKeyMapper?: RoutingKeyMapper, - legacySupport = false, + options?: QueueConsumerOptions, ) { this.queue = queue; this.logger = logger; this.propagator = propagator; - this.onNotification = onNotification; - this.onError = onError; - this.metrics = metrics; - this.routingKeyMapper = routingKeyMapper; - this.legacySupport = legacySupport; + this.onNotification = options?.onNotification; + this.onError = options?.onError; + this.metrics = options?.metrics; + this.routingKeyMapper = options?.routingKeyMapper; + this.legacySupport = options?.legacySupport ?? false; + } + + /** Returns a read-only view of the registered handlers. */ + getHandlers(): ReadonlyMap> { + return this.handlers; } addHandler(routingKey: string, handler: EventHandler): void { @@ -106,8 +116,8 @@ export class QueueConsumer { stop(): void { if (this.stopped) return; this.stopped = true; - this.logger.warn( - `[gomessaging/amqp] consumer loop exited, delivery channel closed for queue "${this.queue}"`, + this.logger.info( + `[gomessaging/amqp] consumer stopped for queue "${this.queue}"`, ); } @@ -215,6 +225,9 @@ export class QueueConsumer { } else { channel.nack(msg, false, true); } + }) + .catch((err: unknown) => { + this.logger.error(`[gomessaging/amqp] ack/nack failed: ${String(err)}`); }); } } diff --git a/src/index.ts b/src/index.ts index c68d958..c918905 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,4 +22,5 @@ export type { ConnectionOptions, ConsumerOptions } from "./connection.js"; export { Publisher, WithoutPublisherConfirms } from "./publisher.js"; export type { PublisherOptions } from "./publisher.js"; export { QueueConsumer } from "./consumer.js"; +export type { QueueConsumerOptions } from "./consumer.js"; export { injectToHeaders, extractToContext } from "./tracing.js"; From c3901e5a601e966910325f217c95e2d8c75292e5 Mon Sep 17 00:00:00 2001 From: Peter Svensson Date: Wed, 1 Apr 2026 23:04:01 +0200 Subject: [PATCH 2/2] fix: correct test assertion to match warn log level for channel errors --- __tests__/prefetch-and-channel-close.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/__tests__/prefetch-and-channel-close.test.ts b/__tests__/prefetch-and-channel-close.test.ts index 1f96afa..233d101 100644 --- a/__tests__/prefetch-and-channel-close.test.ts +++ b/__tests__/prefetch-and-channel-close.test.ts @@ -329,7 +329,7 @@ describe("channel close listener", () => { consumerCh.emit("error", new Error("channel reset")); - expect(silentLogger.error).toHaveBeenCalledWith( + expect(silentLogger.warn).toHaveBeenCalledWith( expect.stringContaining("channel error: channel reset"), ); });