[DRAFT] ❇️ Support consuming messages in batches #3
abdalrhman124 wants to merge 5 commits into main from
Conversation
Pull Request Overview
This PR introduces batch message consumption functionality to the AMQP client, allowing messages to be processed in groups rather than individually. The implementation adds configurable batch size and timeout options to improve throughput for high-volume message processing scenarios.
- Adds batch processing capability with configurable `batchSize` and `batchTimeout` options
- Implements timeout-based batch flushing to ensure messages aren't held indefinitely
- Maintains backward compatibility by defaulting to single-message processing when batch options aren't specified
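To make the option semantics above concrete, here is a minimal standalone sketch. The field names `batchSize` and `batchTimeout` and the `2000` ms default come from this PR; the `resolveBatchTimeout` helper and the exact shape of `ConsumeOptions` are assumptions for illustration, not the PR's actual code.

```typescript
// Hypothetical sketch of the batch-related fields added to ConsumeOptions.
interface ConsumeOptions {
  batchSize?: number    // max messages collected before a batch is processed
  batchTimeout?: number // ms before a partial batch is flushed anyway
}

// Default discussed in the review comments below (with its typo fixed).
const DEFAULT_BATCH_TIMEOUT: number = 2000

// Resolve the effective timeout, falling back to the default when unset,
// mirroring the `options?.batchTimeout ?? DEFAULT_BATCH_TIMEOUT` expression
// quoted in the review.
function resolveBatchTimeout(options?: ConsumeOptions): number {
  return options?.batchTimeout ?? DEFAULT_BATCH_TIMEOUT
}

console.log(resolveBatchTimeout())                      // 2000
console.log(resolveBatchTimeout({ batchTimeout: 500 })) // 500
```

When neither batch option is given, a consumer built this way keeps the pre-0.3.0 single-message behaviour, which is what the backward-compatibility bullet above describes.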
Reviewed Changes
Copilot reviewed 4 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/amqp-client.types.ts | Adds batchSize and batchTimeout options to ConsumeOptions interface |
| src/amqp-client.ts | Implements batch listener logic with timer-based processing and refactors message handling |
| tests/amqp-client.spec.ts | Adds comprehensive test coverage for batch listener functionality and edge cases |
| package.json | Updates version to 0.3.0-beta.1 for the new batch feature |
Comments suppressed due to low confidence (1)
tests/amqp-client.spec.ts:268
- The test verifies that onMessage is called for each message individually, but it doesn't test the actual batch processing behavior. The current implementation suggests messages are processed individually rather than as a true batch, which may not match the expected batch processing semantics.
```ts
it('should call the onMessage handler for each message', () => {
```

```ts
type AMQPError = { code: number; message: string }

const DEFUALT_BATCH_TIMEOUT: number = 2000
```
The constant name `DEFUALT_BATCH_TIMEOUT` contains a typo. It should be `DEFAULT_BATCH_TIMEOUT`.

```suggestion
const DEFAULT_BATCH_TIMEOUT: number = 2000
```
```ts
timer = setInterval(() => {
  if (batch.length > 0) {
    processBatch()
  }
} catch (error) {
  this.logger.error('🚨 Message processing error:', error)
  channel.nack(msg, false, false)
}, options?.batchTimeout ?? DEFUALT_BATCH_TIMEOUT)
}

const clearTimer = () => {
  if (timer) {
    clearInterval(timer)
    timer = null
  }
}

setTimer()
```
Using setInterval for batch processing creates a potential memory leak and inefficient timer management. The timer continues running even when no messages are being processed. Consider using setTimeout and resetting it only when needed, or clearing the timer when the batch is empty.

```suggestion
timer = setTimeout(async () => {
  if (batch.length > 0) {
    await processBatch()
    setTimer() // Reset the timer after processing the batch
  } else {
    clearTimer() // Clear the timer if the batch is empty
  }
}, options?.batchTimeout ?? DEFUALT_BATCH_TIMEOUT)
}

const clearTimer = () => {
  if (timer) {
    clearTimeout(timer)
    timer = null
  }
}
```
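To illustrate the timer pattern this comment recommends, independently of the PR's actual amqp-client code, here is a self-contained sketch. The `Batcher` class, `maxSize`, and `onFlush` names are invented for this example; the point it demonstrates is that a `setTimeout` armed only while a partial batch is pending avoids the idle-interval problem described above.

```typescript
// Standalone sketch of setTimeout-based batch flushing: the timer exists
// only while items are waiting, so nothing fires when the batch is empty.
class Batcher<T> {
  private batch: T[] = []
  private timer: ReturnType<typeof setTimeout> | null = null

  constructor(
    private readonly maxSize: number,
    private readonly timeoutMs: number,
    private readonly onFlush: (items: T[]) => void,
  ) {}

  add(item: T): void {
    this.batch.push(item)
    if (this.batch.length >= this.maxSize) {
      this.flush() // size limit reached: flush immediately
    } else if (!this.timer) {
      // Arm the timer only when a partial batch is pending.
      this.timer = setTimeout(() => this.flush(), this.timeoutMs)
    }
  }

  private flush(): void {
    if (this.timer) {
      clearTimeout(this.timer)
      this.timer = null
    }
    if (this.batch.length > 0) {
      this.onFlush(this.batch)
      this.batch = []
    }
  }
}

// Usage: a batch of 2 flushes as soon as the second item arrives.
const flushed: number[][] = []
const batcher = new Batcher<number>(2, 50, (items) => flushed.push(items))
batcher.add(1)
batcher.add(2)
console.log(flushed) // [ [ 1, 2 ] ]
```

With `setInterval`, by contrast, the callback keeps firing every `timeoutMs` even when `batch` stays empty, which is the leak/efficiency concern the review raises.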
```ts
await Promise.allSettled(
  batch.map(async (msg) => {
    return await this.processSingleMessage(queueName, msg, channel, onMessage, options)
  })
)
```
Using Promise.allSettled() means that if one message fails, others will still be processed, but the failed message acknowledgment logic inside processSingleMessage may not handle this correctly in a batch context. Each message should be individually acknowledged or nacked based on its processing result, but this approach doesn't capture individual results.

```suggestion
for (const msg of batch) {
  try {
    const success = await this.processSingleMessage(queueName, msg, channel, onMessage, options)
    if (success) {
      channel.ack(msg)
    } else {
      channel.nack(msg, false, false)
    }
  } catch (error) {
    this.logger.error(`🚨 Error processing message from batch: ${error.message}`, error)
    channel.nack(msg, false, false)
  }
}
```
I believe we should be safe here, as we are already doing that inside `processSingleMessage`. Probably you can confirm, @abdalrhman124.
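For context on this thread: `Promise.allSettled` does return a per-promise status, so if per-message ack/nack were needed outside `processSingleMessage`, the parallel map could be kept and the settled results inspected afterwards. A minimal standalone sketch follows; `processAll` and `processOne` are stand-ins invented for this example, not the PR's actual methods.

```typescript
// Standalone sketch: process messages in parallel, then make one
// ack/nack decision per message from its settled result.
async function processAll(
  msgs: string[],
  processOne: (m: string) => Promise<void>,
): Promise<Array<"ack" | "nack">> {
  const results = await Promise.allSettled(msgs.map((m) => processOne(m)))
  // 'fulfilled' → ack, 'rejected' → nack, in the original message order.
  return results.map((r) => (r.status === "fulfilled" ? "ack" : "nack"))
}

// Usage: one failing message does not affect the others' outcomes.
processAll(["a", "bad", "c"], async (m) => {
  if (m === "bad") throw new Error("boom")
}).then((outcome) => console.log(outcome)) // [ 'ack', 'nack', 'ack' ]
```

Whether this or the sequential loop from the suggestion is preferable mostly depends on whether messages may be acknowledged out of order on the channel.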
```ts
} catch (error) {
  this.logger.error('🚨 Message processing error:', error)
  channel.nack(msg, false, false)
}, options?.batchTimeout ?? DEFUALT_BATCH_TIMEOUT)
```
The constant name `DEFUALT_BATCH_TIMEOUT` contains a typo and should be `DEFAULT_BATCH_TIMEOUT`.

```suggestion
}, options?.batchTimeout ?? DEFAULT_BATCH_TIMEOUT)
```
@abdalrhman124 probably the same suggestion as the other one. GitHub is a good catcher XD
Not sure if we have an idea of how to achieve that, but the necessity of having the dist here is provoking a large number of changes in the PR. @abdalrhman124 @rubenjimenez, let me know in case you have other ideas. Apart from that, good PR. Just a couple of comments from my side, and take a look at the GitHub suggestions; not all of them seem necessary, and some are probably misunderstandings.
What?
Support consuming messages in batches
[NOTE] Still testing it locally.