From 5b24167727f5caf5049423a81ebe68296335515b Mon Sep 17 00:00:00 2001 From: Jez Barnsley Date: Mon, 27 Apr 2026 08:42:35 +0100 Subject: [PATCH 1/7] Rework with retries Co-authored-by: Copilot --- src/messaging/event.js | 83 +++++++++++++++++++++++++------------ src/messaging/event.test.js | 14 +++---- 2 files changed, 64 insertions(+), 33 deletions(-) diff --git a/src/messaging/event.js b/src/messaging/event.js index 7806efb..19e702c 100644 --- a/src/messaging/event.js +++ b/src/messaging/event.js @@ -81,7 +81,7 @@ export async function resubmitDlqMessage(messageId, messageJson) { `[DLQ] Submitting new message in place of message id ${messageId}. New message id is ${sendResult.MessageId}. About to delete old message from DLQ` ) - await deleteDlqMessage(messageId) + await deleteDlqMessage(messageId, 5, 1) logger.info( `[DLQ] Deleted message id ${messageId} from DLQ after resubmitting new message id ${sendResult.MessageId}` ) @@ -99,37 +99,68 @@ export async function resubmitDlqMessage(messageId, messageJson) { * This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero) * otherwise the receipt handles become stale and the delete operation doesn't work. * @param {string} messageId + * @param {number} maxAttempts - Maximum number of receive attempts (default 50) + * @param {number} waitTimeSeconds - Wait time between attempts (default 1) */ -export async function deleteDlqMessage(messageId) { - const receiveCommand = new ReceiveMessageCommand({ - QueueUrl: deadLetterQueueUrl, - MaxNumberOfMessages: 10, - VisibilityTimeout: 2, - WaitTimeSeconds: 0 - }) - const messageResponse = await sqsClient.send(receiveCommand) +export async function deleteDlqMessage( + messageId, + maxAttempts = 5, + waitTimeSeconds = 1 +) { + let attempts = 0 + let foundMessage = null + + while (attempts < maxAttempts) { + attempts++ + + const receiveCommand = new ReceiveMessageCommand({ + QueueUrl: deadLetterQueueUrl, + MaxNumberOfMessages: 10, + VisibilityTimeout: 1, + WaitTimeSeconds: 1 + }) + const messageResponse = await sqsClient.send(receiveCommand) + + const messages = messageResponse.Messages ?? [] + for (const mess of messages) { + logger.info( + `[DLQ] [Delete] Received message with id ${mess.MessageId} on attempt ${attempts}` + ) + } + + const messagesFound = messages.filter((m) => m.MessageId === messageId) + + if (messagesFound.length > 0) { + foundMessage = messagesFound[0] + logger.info( + `[DLQ] Found message with id ${messageId} on attempt ${attempts}` + ) + break + } + + if (attempts < maxAttempts) { + logger.info( + `[DLQ] Message ${messageId} not found in batch ${attempts}, retrying...` + ) + await new Promise((resolve) => + setTimeout(resolve, waitTimeSeconds * 1000) + ) + } + } - const messages = messageResponse.Messages - ? messageResponse.Messages.filter((m) => m.MessageId === messageId) - : undefined - if (!messages?.length) { - const errorText = `Message with id ${messageId} not found in notify-listener DLQ` + if (!foundMessage) { + const errorText = `Message with id ${messageId} not found in notify-listener DLQ after ${maxAttempts} attempts` logger.info(errorText) throw new Error(errorText) } - logger.info( - `[DLQ] Number of messages found with id ${messageId}: ${messages.length}` - ) - for (const message of messages) { - const deleteCommand = new DeleteMessageCommand({ - QueueUrl: deadLetterQueueUrl, - ReceiptHandle: message.ReceiptHandle - }) - logger.info(`[DLQ] Deleting message with id ${messageId}`) - await sqsClient.send(deleteCommand) - logger.info(`[DLQ] Deleted message with id ${messageId}`) - } + const deleteCommand = new DeleteMessageCommand({ + QueueUrl: deadLetterQueueUrl, + ReceiptHandle: foundMessage.ReceiptHandle + }) + logger.info(`[DLQ] Deleting message with id ${messageId}`) + await sqsClient.send(deleteCommand) + logger.info(`[DLQ] Deleted message with id ${messageId}`) } /** diff --git a/src/messaging/event.test.js b/src/messaging/event.test.js index 99e5bd1..f8403f4 100644 --- a/src/messaging/event.test.js +++ b/src/messaging/event.test.js @@ -101,12 +101,12 @@ describe('event', () => { } snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) - await deleteDlqMessage(messageStub.MessageId) + await deleteDlqMessage(messageStub.MessageId, 5, 0) expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, { QueueUrl: expect.any(String), MaxNumberOfMessages: 10, - VisibilityTimeout: 2, - WaitTimeSeconds: 0 + VisibilityTimeout: 1, + WaitTimeSeconds: 1 }) expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, { QueueUrl: expect.any(String), @@ -114,16 +114,16 @@ describe('event', () => { }) }) - it('should throw if message not found', async () => { + it('should throw if message not found after max attempts', async () => { const receivedMessage = { Messages: [] } snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) await expect(() => - deleteDlqMessage(messageStub.MessageId) + deleteDlqMessage(messageStub.MessageId, 5, 1) ).rejects.toThrow( - 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ' + 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ after 5 attempts' ) }) }) @@ -167,7 +167,7 @@ describe('event', () => { ).rejects.toThrow( 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ' ) - }) + }, 10000) }) }) From 3a3ae8aab0190fa4c216439a6254b0922e8270b3 Mon Sep 17 00:00:00 2001 From: Jez Barnsley Date: Mon, 27 Apr 2026 08:51:45 +0100 Subject: [PATCH 2/7] Sonar linting --- src/messaging/event.js | 11 +++++++---- src/messaging/event.test.js | 21 ++++++++++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/messaging/event.js b/src/messaging/event.js index 19e702c..100a3a6 100644 --- a/src/messaging/event.js +++ b/src/messaging/event.js @@ -16,6 +16,9 @@ const deadLetterQueueArn = config.get('sqsEventsDlqArn') const maxNumberOfMessages = config.get('maxNumberOfMessages') const visibilityTimeout = config.get('visibilityTimeout') +const DEFAULT_MAX_RETRIES = 5 +const DEFAULT_RETRY_WAIT_TIME_IN_SECS = 1 + const logger = createLogger() /** @@ -81,7 +84,7 @@ export async function resubmitDlqMessage(messageId, messageJson) { `[DLQ] Submitting new message in place of message id ${messageId}. New message id is ${sendResult.MessageId}. About to delete old message from DLQ` ) - await deleteDlqMessage(messageId, 5, 1) + await deleteDlqMessage(messageId, DEFAULT_MAX_RETRIES) logger.info( `[DLQ] Deleted message id ${messageId} from DLQ after resubmitting new message id ${sendResult.MessageId}` ) @@ -99,13 +102,13 @@ export async function resubmitDlqMessage(messageId, messageJson) { * This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero) * otherwise the receipt handles become stale and the delete operation doesn't work. * @param {string} messageId - * @param {number} maxAttempts - Maximum number of receive attempts (default 50) + * @param {number} maxAttempts - Maximum number of receive attempts (default 5) * @param {number} waitTimeSeconds - Wait time between attempts (default 1) */ export async function deleteDlqMessage( messageId, - maxAttempts = 5, - waitTimeSeconds = 1 + maxAttempts = DEFAULT_MAX_RETRIES, + waitTimeSeconds = DEFAULT_RETRY_WAIT_TIME_IN_SECS ) { let attempts = 0 let foundMessage = null diff --git a/src/messaging/event.test.js b/src/messaging/event.test.js index f8403f4..da37b20 100644 --- a/src/messaging/event.test.js +++ b/src/messaging/event.test.js @@ -114,6 +114,25 @@ describe('event', () => { }) }) + it('should delete event message with default values', async () => { + const receivedMessage = { + Messages: [messageStub, messageStub, messageStub] + } + + snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) + await deleteDlqMessage(messageStub.MessageId, 3, 1) + expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, { + QueueUrl: expect.any(String), + MaxNumberOfMessages: 10, + VisibilityTimeout: 1, + WaitTimeSeconds: 1 + }) + expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, { + QueueUrl: expect.any(String), + ReceiptHandle: receiptHandle + }) + }) + it('should throw if message not found after max attempts', async () => { const receivedMessage = { Messages: [] @@ -121,7 +140,7 @@ describe('event', () => { snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) await expect(() => - deleteDlqMessage(messageStub.MessageId, 5, 1) + deleteDlqMessage(messageStub.MessageId, 5, 0) ).rejects.toThrow( 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ after 5 attempts' ) From 06826164cc82565c548f568e02419fcc66b7a655 Mon Sep 17 00:00:00 2001 From: Jez Barnsley Date: Mon, 27 Apr 2026 13:58:44 +0100 Subject: [PATCH 3/7] Accepts params for visTimeout and waitTime --- src/messaging/event.js | 52 +++++++++++++++++-------------------- src/messaging/event.test.js | 49 ++++++++-------------------------- src/routes/admin.js | 27 +++++++++++++++---- src/routes/admin.test.js | 6 ++++- 4 files changed, 62 insertions(+), 72 deletions(-) diff --git a/src/messaging/event.js b/src/messaging/event.js index 100a3a6..67141cd 100644 --- a/src/messaging/event.js +++ b/src/messaging/event.js @@ -16,8 +16,10 @@ const deadLetterQueueArn = config.get('sqsEventsDlqArn') const maxNumberOfMessages = config.get('maxNumberOfMessages') const visibilityTimeout = config.get('visibilityTimeout') -const DEFAULT_MAX_RETRIES = 5 -const DEFAULT_RETRY_WAIT_TIME_IN_SECS = 1 +const MAX_RETRIES = 7 +const RETRY_WAIT_BETWEEN_TRIES_IN_SECS = 1 +const DEFAULT_VISIBILITY_TIMEOUT = 3 +const DEFAULT_WAIT_TIME_IN_SECS = 3 const logger = createLogger() @@ -43,12 +45,15 @@ export function receiveEventMessages() { * Receive dead-letter queue messages * @returns {Promise} */ -export function receiveDlqMessages() { +export function receiveDlqMessages( + visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT, + waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS +) { const command = new ReceiveMessageCommand({ QueueUrl: deadLetterQueueUrl, MaxNumberOfMessages: 10, - VisibilityTimeout: 0, - WaitTimeSeconds: 0 + VisibilityTimeout: visibilityTimeout, + WaitTimeSeconds: waitTimeSeconds }) return sqsClient.send(command) } @@ -81,17 +86,12 @@ export async function resubmitDlqMessage(messageId, messageJson) { }) const sendResult = await sqsClient.send(command) logger.info( - `[DLQ] Submitting new message in place of message id ${messageId}. New message id is ${sendResult.MessageId}. About to delete old message from DLQ` - ) - - await deleteDlqMessage(messageId, DEFAULT_MAX_RETRIES) - logger.info( - `[DLQ] Deleted message id ${messageId} from DLQ after resubmitting new message id ${sendResult.MessageId}` + `[DLQ] Submitting new message in place of message id ${messageId}. New message id is ${sendResult.MessageId}` ) } catch (err) { logger.error( err, - `[DLQ] Failed to submit new message to main queue or failed to delete old message of id ${messageId} from DLQ` + `[DLQ] Failed to submit new message to main queue based on old message of id ${messageId} from DLQ` ) throw err } @@ -102,28 +102,24 @@ export async function resubmitDlqMessage(messageId, messageJson) { * This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero) * otherwise the receipt handles become stale and the delete operation doesn't work. * @param {string} messageId - * @param {number} maxAttempts - Maximum number of receive attempts (default 5) - * @param {number} waitTimeSeconds - Wait time between attempts (default 1) + * @param {number} [visibilityTimeout] - Queue visibilityTimeout + * @param {number} [waitTimeSeconds] - Queue waitTimeSeconds */ export async function deleteDlqMessage( messageId, - maxAttempts = DEFAULT_MAX_RETRIES, - waitTimeSeconds = DEFAULT_RETRY_WAIT_TIME_IN_SECS + visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT, + waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS ) { let attempts = 0 let foundMessage = null - while (attempts < maxAttempts) { + while (attempts < MAX_RETRIES) { attempts++ - const receiveCommand = new ReceiveMessageCommand({ - QueueUrl: deadLetterQueueUrl, - MaxNumberOfMessages: 10, - VisibilityTimeout: 1, - WaitTimeSeconds: 1 - }) - const messageResponse = await sqsClient.send(receiveCommand) - + const messageResponse = await receiveDlqMessages( + visibilityTimeout, + waitTimeSeconds + ) const messages = messageResponse.Messages ?? [] for (const mess of messages) { logger.info( @@ -141,18 +137,18 @@ export async function deleteDlqMessage( break } - if (attempts < maxAttempts) { + if (attempts < MAX_RETRIES) { logger.info( `[DLQ] Message ${messageId} not found in batch ${attempts}, retrying...` ) await new Promise((resolve) => - setTimeout(resolve, waitTimeSeconds * 1000) + setTimeout(resolve, RETRY_WAIT_BETWEEN_TRIES_IN_SECS * 1000) ) } } if (!foundMessage) { - const errorText = `Message with id ${messageId} not found in notify-listener DLQ after ${maxAttempts} attempts` + const errorText = `Message with id ${messageId} not found in notify-listener DLQ after ${MAX_RETRIES} attempts` logger.info(errorText) throw new Error(errorText) } diff --git a/src/messaging/event.test.js b/src/messaging/event.test.js index da37b20..4eb62c7 100644 --- a/src/messaging/event.test.js +++ b/src/messaging/event.test.js @@ -70,8 +70,8 @@ describe('event', () => { await receiveDlqMessages() expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, { QueueUrl: expect.any(String), - VisibilityTimeout: 0, - WaitTimeSeconds: 0 + VisibilityTimeout: 3, + WaitTimeSeconds: 3 }) }) }) @@ -101,12 +101,12 @@ describe('event', () => { } snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) - await deleteDlqMessage(messageStub.MessageId, 5, 0) + await deleteDlqMessage(messageStub.MessageId, 5, 2) expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, { QueueUrl: expect.any(String), MaxNumberOfMessages: 10, - VisibilityTimeout: 1, - WaitTimeSeconds: 1 + VisibilityTimeout: 5, + WaitTimeSeconds: 2 }) expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, { QueueUrl: expect.any(String), @@ -120,12 +120,12 @@ describe('event', () => { } snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) - await deleteDlqMessage(messageStub.MessageId, 3, 1) + await deleteDlqMessage(messageStub.MessageId) expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, { QueueUrl: expect.any(String), MaxNumberOfMessages: 10, - VisibilityTimeout: 1, - WaitTimeSeconds: 1 + VisibilityTimeout: 3, + WaitTimeSeconds: 3 }) expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, { QueueUrl: expect.any(String), @@ -140,20 +140,15 @@ describe('event', () => { snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) await expect(() => - deleteDlqMessage(messageStub.MessageId, 5, 0) + deleteDlqMessage(messageStub.MessageId, 0, 0) ).rejects.toThrow( - 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ after 5 attempts' + 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ after 7 attempts' ) - }) + }, 10000) }) describe('resubmitDlqMessage', () => { it('should resubmit message and delete old one from DLQ', async () => { - const receivedMessage = { - Messages: [messageStub] - } - snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) - const sendMessage = { MessageId: '12345' } @@ -164,29 +159,7 @@ describe('event', () => { QueueUrl: expect.any(String), MessageBody: messageStub.Body }) - expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, { - QueueUrl: expect.any(String), - ReceiptHandle: receiptHandle - }) }) - - it('should throw if message not found', async () => { - const receivedMessage = { - Messages: [] - } - snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) - - const sendMessage = { - MessageId: '12345' - } - snsMock.on(SendMessageCommand).resolves(sendMessage) - - await expect(() => - resubmitDlqMessage(messageStub.MessageId, messageStub.Body) - ).rejects.toThrow( - 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ' - ) - }, 10000) }) }) diff --git a/src/routes/admin.js b/src/routes/admin.js index 03acbe1..22c477b 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -17,6 +17,11 @@ const messageIdSchema = Joi.object({ messageId: Joi.string().required() }) +const timeoutQuerySchema = Joi.object({ + visibilityTimeout: Joi.number().optional(), + waitTimeSeconds: Joi.number().optional() +}) + export default [ /** * @satisfies {ServerRoute} @@ -24,13 +29,20 @@ export default [ ({ method: 'GET', path: '/admin/deadletter/view', - async handler(_request, h) { - const messages = await receiveDlqMessages() + async handler(request, h) { + const { visibilityTimeout, waitTimeSeconds } = request.query + const messages = await receiveDlqMessages( + visibilityTimeout, + waitTimeSeconds + ) return h.response({ messages: messages.Messages ?? [] }).code(OK_RESPONSE) }, options: { auth: { scope: [`+${Scopes.DeadLetterQueues}`] + }, + validate: { + query: timeoutQuerySchema } } }), @@ -83,10 +95,14 @@ export default [ method: 'DELETE', path: '/admin/deadletter/{messageId}', async handler(request, h) { - const { params } = request + const { params, query } = request const { messageId } = params logger.info(`Deleting DLQ message ${messageId}`) - await deleteDlqMessage(messageId) + await deleteDlqMessage( + messageId, + query.visibilityTimeout, + query.waitTimeSeconds + ) logger.info(`Deleted DLQ message ${messageId}`) return h.response({ message: 'success' }).code(OK_RESPONSE) }, @@ -95,7 +111,8 @@ export default [ scope: [`+${Scopes.DeadLetterQueues}`] }, validate: { - params: messageIdSchema + params: messageIdSchema, + query: timeoutQuerySchema } } }) diff --git a/src/routes/admin.test.js b/src/routes/admin.test.js index 06270cf..c32ab15 100644 --- a/src/routes/admin.test.js +++ b/src/routes/admin.test.js @@ -85,7 +85,11 @@ describe('Admin routes', () => { expect(response.statusCode).toEqual(okStatusCode) expect(response.headers['content-type']).toContain(jsonContentType) expect(response.result).toEqual({ message: 'success' }) - expect(deleteDlqMessage).toHaveBeenCalledWith('message-id') + expect(deleteDlqMessage).toHaveBeenCalledWith( + 'message-id', + undefined, + undefined + ) }) }) }) From e382cf3ea716a012e830b25356485ec2708a0f3f Mon Sep 17 00:00:00 2001 From: Jez Barnsley Date: Mon, 27 Apr 2026 15:25:46 +0100 Subject: [PATCH 4/7] Sonar fix --- src/messaging/event.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/messaging/event.js b/src/messaging/event.js index 67141cd..ef159dd 100644 --- a/src/messaging/event.js +++ b/src/messaging/event.js @@ -14,7 +14,7 @@ const queueUrl = config.get('sqsEventsQueueUrl') const deadLetterQueueUrl = `${queueUrl}-deadletter` const deadLetterQueueArn = config.get('sqsEventsDlqArn') const maxNumberOfMessages = config.get('maxNumberOfMessages') -const visibilityTimeout = config.get('visibilityTimeout') +const pollingVisibilityTimeout = config.get('visibilityTimeout') const MAX_RETRIES = 7 const RETRY_WAIT_BETWEEN_TRIES_IN_SECS = 1 @@ -29,7 +29,7 @@ const logger = createLogger() const input = { QueueUrl: queueUrl, MaxNumberOfMessages: maxNumberOfMessages, - VisibilityTimeout: visibilityTimeout + VisibilityTimeout: pollingVisibilityTimeout } /** From a6b8de586a234ed7c98f52570cfb3f22ae4cecb9 Mon Sep 17 00:00:00 2001 From: Jez Barnsley Date: Wed, 29 Apr 2026 12:10:45 +0100 Subject: [PATCH 5/7] Exposed getMessage endpoint --- src/messaging/event.js | 93 ++++++++++++++++++++++++---------------- src/routes/admin.js | 27 ++++++++++++ src/routes/admin.test.js | 15 +++++++ 3 files changed, 98 insertions(+), 37 deletions(-) diff --git a/src/messaging/event.js b/src/messaging/event.js index ef159dd..05b87d8 100644 --- a/src/messaging/event.js +++ b/src/messaging/event.js @@ -58,6 +58,57 @@ export function receiveDlqMessages( return sqsClient.send(command) } +/** + * Get a specific message from the dead-letter queue. Handles retries if not found. + * @param {string} messageId + * @param {number} [visibilityTimeout] - Queue visibilityTimeout + * @param {number} [waitTimeSeconds] - Queue waitTimeSeconds + * @returns {Promise< Message | null >} + */ +export async function getDlqMessage( + messageId, + visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT, + waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS +) { + let attempts = 0 + let foundMessage = null + + while (attempts < MAX_RETRIES) { + attempts++ + + const messageResponse = await receiveDlqMessages( + visibilityTimeout, + waitTimeSeconds + ) + const messages = messageResponse.Messages ?? [] + for (const mess of messages) { + logger.info( + `[DLQ] [Delete] Received message with id ${mess.MessageId} on attempt ${attempts}` + ) + } + + const messagesFound = messages.filter((m) => m.MessageId === messageId) + + if (messagesFound.length > 0) { + foundMessage = messagesFound[0] + logger.info( + `[DLQ] Found message with id ${messageId} on attempt ${attempts}` + ) + break + } + + if (attempts < MAX_RETRIES) { + logger.info( + `[DLQ] Message ${messageId} not found in batch ${attempts}, retrying...` + ) + await new Promise((resolve) => + setTimeout(resolve, RETRY_WAIT_BETWEEN_TRIES_IN_SECS * 1000) + ) + } + } + return foundMessage +} + /** * Redrive the specified message from the dead-letter queue to the main queue * @returns {Promise} @@ -110,43 +161,11 @@ export async function deleteDlqMessage( visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT, waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS ) { - let attempts = 0 - let foundMessage = null - - while (attempts < MAX_RETRIES) { - attempts++ - - const messageResponse = await receiveDlqMessages( - visibilityTimeout, - waitTimeSeconds - ) - const messages = messageResponse.Messages ?? [] - for (const mess of messages) { - logger.info( - `[DLQ] [Delete] Received message with id ${mess.MessageId} on attempt ${attempts}` - ) - } - - const messagesFound = messages.filter((m) => m.MessageId === messageId) - - if (messagesFound.length > 0) { - foundMessage = messagesFound[0] - logger.info( - `[DLQ] Found message with id ${messageId} on attempt ${attempts}` - ) - break - } - - if (attempts < MAX_RETRIES) { - logger.info( - `[DLQ] Message ${messageId} not found in batch ${attempts}, retrying...` - ) - await new Promise((resolve) => - setTimeout(resolve, RETRY_WAIT_BETWEEN_TRIES_IN_SECS * 1000) - ) - } - } - + const foundMessage = await getDlqMessage( + messageId, + visibilityTimeout, + waitTimeSeconds + ) if (!foundMessage) { const errorText = `Message with id ${messageId} not found in notify-listener DLQ after ${MAX_RETRIES} attempts` logger.info(errorText) diff --git a/src/routes/admin.js b/src/routes/admin.js index 22c477b..3d3ca81 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -4,6 +4,7 @@ import Joi from 'joi' import { createLogger } from '~/src/helpers/logging/logger.js' import { deleteDlqMessage, + getDlqMessage, receiveDlqMessages, redriveDlqMessages, resubmitDlqMessage @@ -47,6 +48,32 @@ export default [ } }), + /** + * @satisfies {ServerRoute} + */ + ({ + method: 'GET', + path: '/admin/deadletter/view/{messageId}', + async handler(request, h) { + const { visibilityTimeout, waitTimeSeconds } = request.query + const message = await getDlqMessage( + request.params.messageId, + visibilityTimeout, + waitTimeSeconds + ) + return h.response({ message }).code(OK_RESPONSE) + }, + options: { + auth: { + scope: [`+${Scopes.DeadLetterQueues}`] + }, + validate: { + params: messageIdSchema, + query: timeoutQuerySchema + } + } + }), + /** * @satisfies {ServerRoute} */ diff --git a/src/routes/admin.test.js b/src/routes/admin.test.js index c32ab15..578b8a6 100644 --- a/src/routes/admin.test.js +++ b/src/routes/admin.test.js @@ -1,6 +1,7 @@ import { createServer } from '~/src/api/server.js' import { deleteDlqMessage, + getDlqMessage, receiveDlqMessages, redriveDlqMessages, resubmitDlqMessage @@ -41,6 +42,20 @@ describe('Admin routes', () => { expect(response.headers['content-type']).toContain(jsonContentType) expect(response.result).toEqual({ messages: [{ MessageId: 'message1' }] }) }) + + test('/admin/dead-letter/view/message-id route returns 200', async () => { + jest.mocked(getDlqMessage).mockResolvedValue({ MessageId: 'message1' }) + + const response = await server.inject({ + method: 'GET', + url: '/admin/deadletter/view/message1', + auth + }) + + expect(response.statusCode).toEqual(okStatusCode) + expect(response.headers['content-type']).toContain(jsonContentType) + expect(response.result).toEqual({ message: { MessageId: 'message1' } }) + }) }) describe('POST', () => { From f67624f5c545b8e7174372344fa6d2466f6349bc Mon Sep 17 00:00:00 2001 From: Jez Barnsley Date: Thu, 30 Apr 2026 13:56:47 +0100 Subject: [PATCH 6/7] After review --- src/messaging/event.js | 43 +++++++++++++++++-------------------- src/messaging/event.test.js | 2 +- src/routes/admin.js | 3 ++- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/src/messaging/event.js b/src/messaging/event.js index 05b87d8..4f92bcc 100644 --- a/src/messaging/event.js +++ b/src/messaging/event.js @@ -70,43 +70,39 @@ export async function getDlqMessage( visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT, waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS ) { - let attempts = 0 - let foundMessage = null - - while (attempts < MAX_RETRIES) { - attempts++ + let attempts = 1 + while (attempts <= MAX_RETRIES) { const messageResponse = await receiveDlqMessages( visibilityTimeout, waitTimeSeconds ) const messages = messageResponse.Messages ?? [] - for (const mess of messages) { + for (const m of messages) { logger.info( - `[DLQ] [Delete] Received message with id ${mess.MessageId} on attempt ${attempts}` + `[DLQ] Received message with id ${m.MessageId} on attempt ${attempts}` ) } - const messagesFound = messages.filter((m) => m.MessageId === messageId) - - if (messagesFound.length > 0) { - foundMessage = messagesFound[0] + const messageFound = messages.find((m) => m.MessageId === messageId) + if (messageFound) { logger.info( `[DLQ] Found message with id ${messageId} on attempt ${attempts}` ) - break + return messageFound } - if (attempts < MAX_RETRIES) { - logger.info( - `[DLQ] Message ${messageId} not found in batch ${attempts}, retrying...` - ) - await new Promise((resolve) => - setTimeout(resolve, RETRY_WAIT_BETWEEN_TRIES_IN_SECS * 1000) - ) - } + logger.info( + `[DLQ] Message ${messageId} not found in batch ${attempts}, retrying...` + ) + + await new Promise((resolve) => + setTimeout(resolve, RETRY_WAIT_BETWEEN_TRIES_IN_SECS * 1000) + ) + + attempts++ } - return foundMessage + return null } /** @@ -121,7 +117,7 @@ export function redriveDlqMessages() { } /** - * Submit the specified message to the main queue, and delete the messageId from the dead-letter queue + * Submit the specified message to the main queue * @param {string} messageId * @param {string} messageJson */ @@ -151,7 +147,8 @@ export async function resubmitDlqMessage(messageId, messageJson) { /** * Delete DLQ message by messageId * This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero) - * otherwise the receipt handles become stale and the delete operation doesn't work. + * otherwise the receipt handle becomes stale and the delete operation doesn't work. + * getDlqMessage uses retries in case the message is not always visibile when querying the DLQ. * @param {string} messageId * @param {number} [visibilityTimeout] - Queue visibilityTimeout * @param {number} [waitTimeSeconds] - Queue waitTimeSeconds diff --git a/src/messaging/event.test.js b/src/messaging/event.test.js index 4eb62c7..0d29ff9 100644 --- a/src/messaging/event.test.js +++ b/src/messaging/event.test.js @@ -148,7 +148,7 @@ describe('event', () => { }) describe('resubmitDlqMessage', () => { - it('should resubmit message and delete old one from DLQ', async () => { + it('should resubmit message to main queue', async () => { const sendMessage = { MessageId: '12345' } diff --git a/src/routes/admin.js b/src/routes/admin.js index 3d3ca81..02db215 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -13,6 +13,7 @@ import { const logger = createLogger() const OK_RESPONSE = 200 +const NOT_FOUND = 404 const messageIdSchema = Joi.object({ messageId: Joi.string().required() @@ -61,7 +62,7 @@ export default [ visibilityTimeout, waitTimeSeconds ) - return h.response({ message }).code(OK_RESPONSE) + return h.response({ message }).code(message ? OK_RESPONSE : NOT_FOUND) }, options: { auth: { From 75390a1eb3d518d6e607ba6ddb6f9a2e1e903e6e Mon Sep 17 00:00:00 2001 From: Jez Barnsley Date: Thu, 30 Apr 2026 14:15:57 +0100 Subject: [PATCH 7/7] Extra coverage --- src/messaging/event.test.js | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/messaging/event.test.js b/src/messaging/event.test.js index 0d29ff9..e288a1c 100644 --- a/src/messaging/event.test.js +++ b/src/messaging/event.test.js @@ -160,6 +160,13 @@ describe('event', () => { MessageBody: messageStub.Body }) }) + + it('should throw if resubmit fails', async () => { + snsMock.on(SendMessageCommand).rejects('bad SQS command') + await expect(() => + resubmitDlqMessage(messageStub.MessageId, messageStub.Body) + ).rejects.toThrow('bad SQS command') + }) }) })