From 2faa5478b16ea1afde1fcc721fcf8f89c50b4c94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20K=C3=B6nig?= Date: Mon, 17 Nov 2025 16:35:33 +0100 Subject: [PATCH 1/2] chore: add library --- .ci/runChecks.sh | 2 +- package-lock.json | 24 ++++++++++++++++++++++++ package.json | 1 + 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/.ci/runChecks.sh b/.ci/runChecks.sh index 9186ab1d7..e5cd57a9a 100755 --- a/.ci/runChecks.sh +++ b/.ci/runChecks.sh @@ -11,4 +11,4 @@ npm run lint:prettier # auditing npx license-check --ignoreRegex "@nmshd/*" -npx better-npm-audit audit --exclude 1109754 +npx better-npm-audit audit diff --git a/package-lock.json b/package-lock.json index 56f8fa48e..88103937f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "@nmshd/typescript-ioc": "^3.2.5", "@nmshd/typescript-rest": "^3.2.1", "agentkeepalive": "4.6.0", + "amqp-connection-manager": "^5.0.0", "amqplib": "^0.10.9", "axios": "^1.13.2", "compression": "1.8.1", @@ -4324,11 +4325,28 @@ } } }, + "node_modules/amqp-connection-manager": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/amqp-connection-manager/-/amqp-connection-manager-5.0.0.tgz", + "integrity": "sha512-88yQzqa5RSBgnLl504XjvCQJ7d+osskdwvg35Lwm1LRbfLjNU9p7SQUMSP82BB7mseiq9tIUPJ3HE3eXQbpjEw==", + "license": "MIT", + "dependencies": { + "promise-breaker": "^6.0.0" + }, + "engines": { + "node": ">=10.0.0", + "npm": ">5.0.0" + }, + "peerDependencies": { + "amqplib": "*" + } + }, "node_modules/amqplib": { "version": "0.10.9", "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.9.tgz", "integrity": "sha512-jwSftI4QjS3mizvnSnOrPGYiUnm1vI2OP1iXeOUz5pb74Ua0nbf6nPyyTzuiCLEE3fMpaJORXh2K/TQ08H5xGA==", "license": "MIT", + "peer": true, "dependencies": { "buffer-more-ints": "~1.0.0", "url-parse": "~1.5.10" @@ -10993,6 +11011,12 @@ "integrity": "sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==", "license": "MIT" }, + "node_modules/promise-breaker": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/promise-breaker/-/promise-breaker-6.0.0.tgz", + "integrity": "sha512-BthzO9yTPswGf7etOBiHCVuugs2N01/Q/94dIPls48z2zCmrnDptUUZzfIb+41xq0MnYZ/BzmOd6ikDR4ibNZA==", + "license": "MIT" + }, "node_modules/proto3-json-serializer": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/proto3-json-serializer/-/proto3-json-serializer-3.0.2.tgz", diff --git a/package.json b/package.json index d65890a48..c8baa5f92 100644 --- a/package.json +++ b/package.json @@ -88,6 +88,7 @@ "@nmshd/typescript-ioc": "^3.2.5", "@nmshd/typescript-rest": "^3.2.1", "agentkeepalive": "4.6.0", + "amqp-connection-manager": "^5.0.0", "amqplib": "^0.10.9", "axios": "^1.13.2", "compression": "1.8.1", From 6a6994ed91392be8e18562563000e30a3912526b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20K=C3=B6nig?= Date: Mon, 17 Nov 2025 16:41:38 +0100 Subject: [PATCH 2/2] fix: use the new lib --- .../connectors/AMQPConnector.ts | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/modules/messageBrokerPublisher/connectors/AMQPConnector.ts b/src/modules/messageBrokerPublisher/connectors/AMQPConnector.ts index dcf3f4197..14ec83798 100644 --- a/src/modules/messageBrokerPublisher/connectors/AMQPConnector.ts +++ b/src/modules/messageBrokerPublisher/connectors/AMQPConnector.ts @@ -1,5 +1,5 @@ import { ILogger } from "@js-soft/logging-abstractions"; -import amqp from "amqplib"; +import { AmqpConnectionManager, ChannelWrapper, connect as amqpConnect } from "amqp-connection-manager"; import { MessageBrokerConnector } from "./MessageBrokerConnector"; export interface AMQPConnectorConfiguration { @@ -9,8 +9,8 @@ export interface AMQPConnectorConfiguration { } export class AMQPConnector extends MessageBrokerConnector { - private connection?: amqp.ChannelModel; - private channel?: amqp.Channel; + private connection?: AmqpConnectionManager; + private channel?: ChannelWrapper; public constructor(configuration: AMQPConnectorConfiguration, logger: ILogger) { super(configuration, logger); @@ -21,24 +21,24 @@ export class AMQPConnector extends MessageBrokerConnector { const url = this.configuration.url; - this.connection = await amqp.connect(url, { timeout: this.configuration.timeout ?? 2000 }).catch((e) => { - throw new Error(`Could not connect to RabbitMQ at '${url}' (${e.message})`); - }); + this.connection = amqpConnect(url, { connectionOptions: { timeout: this.configuration.timeout ?? 2000 } }); - this.channel = await this.connection.createChannel().catch((e) => { - throw new Error(`Could not create a channel for RabbitMQ (${e.message})`); + await this.connection.connect().catch((e) => { + throw new Error(`Could not connect to RabbitMQ at '${url}' (${e.message})`); }); const exchange = this.configuration.exchange ?? ""; + this.channel = this.connection.createChannel({ json: true }); + await this.channel.checkExchange(exchange).catch(() => { throw new Error(`The configured exchange '${exchange}' does not exist.`); }); } - public publish(namespace: string, data: Buffer): void { + public async publish(namespace: string, data: Buffer): Promise { const exchangeName = this.configuration.exchange ?? ""; - const sent = this.channel!.publish(exchangeName, namespace, data); + const sent = await this.channel!.publish(exchangeName, namespace, data); if (!sent) { this.logger.error(`Publishing event '${namespace}' to exchange '${exchangeName}' failed.`); }