diff --git a/src/messaging/event.js b/src/messaging/event.js index befe3c1..4f92bcc 100644 --- a/src/messaging/event.js +++ b/src/messaging/event.js @@ -14,7 +14,12 @@ const queueUrl = config.get('sqsEventsQueueUrl') const deadLetterQueueUrl = `${queueUrl}-deadletter` const deadLetterQueueArn = config.get('sqsEventsDlqArn') const maxNumberOfMessages = config.get('maxNumberOfMessages') -const visibilityTimeout = config.get('visibilityTimeout') +const pollingVisibilityTimeout = config.get('visibilityTimeout') + +const MAX_RETRIES = 7 +const RETRY_WAIT_BETWEEN_TRIES_IN_SECS = 1 +const DEFAULT_VISIBILITY_TIMEOUT = 3 +const DEFAULT_WAIT_TIME_IN_SECS = 3 const logger = createLogger() @@ -24,7 +29,7 @@ const logger = createLogger() const input = { QueueUrl: queueUrl, MaxNumberOfMessages: maxNumberOfMessages, - VisibilityTimeout: visibilityTimeout + VisibilityTimeout: pollingVisibilityTimeout } /** @@ -40,16 +45,66 @@ export function receiveEventMessages() { * Receive dead-letter queue messages * @returns {Promise} */ -export function receiveDlqMessages() { +export function receiveDlqMessages( + visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT, + waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS +) { const command = new ReceiveMessageCommand({ QueueUrl: deadLetterQueueUrl, MaxNumberOfMessages: 10, - VisibilityTimeout: 3, - WaitTimeSeconds: 3 + VisibilityTimeout: visibilityTimeout, + WaitTimeSeconds: waitTimeSeconds }) return sqsClient.send(command) } +/** + * Get a specific message from the dead-letter queue. Handles retries if not found. + * @param {string} messageId + * @param {number} [visibilityTimeout] - Queue visibilityTimeout + * @param {number} [waitTimeSeconds] - Queue waitTimeSeconds + * @returns {Promise< Message | null >} + */ +export async function getDlqMessage( + messageId, + visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT, + waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS +) { + let attempts = 1 + + while (attempts <= MAX_RETRIES) { + const messageResponse = await receiveDlqMessages( + visibilityTimeout, + waitTimeSeconds + ) + const messages = messageResponse.Messages ?? [] + for (const m of messages) { + logger.info( + `[DLQ] Received message with id ${m.MessageId} on attempt ${attempts}` + ) + } + + const messageFound = messages.find((m) => m.MessageId === messageId) + if (messageFound) { + logger.info( + `[DLQ] Found message with id ${messageId} on attempt ${attempts}` + ) + return messageFound + } + + logger.info( + `[DLQ] Message ${messageId} not found in batch ${attempts}, retrying...` + ) + + await new Promise((resolve) => + setTimeout(resolve, RETRY_WAIT_BETWEEN_TRIES_IN_SECS * 1000) + ) + + attempts++ + } + return null +} + /** * Redrive the specified message from the dead-letter queue to the main queue * @returns {Promise} @@ -62,7 +117,7 @@ export function redriveDlqMessages() { } /** - * Submit the specified message to the main queue, and delete the messageId from the dead-letter queue + * Submit the specified message to the main queue * @param {string} messageId * @param {string} messageJson */ @@ -78,17 +133,12 @@ export async function resubmitDlqMessage(messageId, messageJson) { }) const sendResult = await sqsClient.send(command) logger.info( - `[DLQ] Submitting new message in place of message id ${messageId}. New message id is ${sendResult.MessageId}. About to delete old message from DLQ` - ) - - await deleteDlqMessage(messageId) - logger.info( - `[DLQ] Deleted message id ${messageId} from DLQ after resubmitting new message id ${sendResult.MessageId}` + `[DLQ] Submitting new message in place of message id ${messageId}. New message id is ${sendResult.MessageId}` ) } catch (err) { logger.error( err, - `[DLQ] Failed to submit new message to main queue or failed to delete old message of id ${messageId} from DLQ` + `[DLQ] Failed to submit new message to main queue based on old message of id ${messageId} from DLQ` ) throw err } @@ -97,39 +147,35 @@ export async function resubmitDlqMessage(messageId, messageJson) { /** * Delete DLQ message by messageId * This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero) - * otherwise the receipt handles become stale and the delete operation doesn't work. + * otherwise the receipt handle becomes stale and the delete operation doesn't work. + * getDlqMessage uses retries in case the message is not always visibile when querying the DLQ. * @param {string} messageId + * @param {number} [visibilityTimeout] - Queue visibilityTimeout + * @param {number} [waitTimeSeconds] - Queue waitTimeSeconds */ -export async function deleteDlqMessage(messageId) { - const receiveCommand = new ReceiveMessageCommand({ - QueueUrl: deadLetterQueueUrl, - MaxNumberOfMessages: 10, - VisibilityTimeout: 3, - WaitTimeSeconds: 3 - }) - const messageResponse = await sqsClient.send(receiveCommand) - - const messages = messageResponse.Messages - ? messageResponse.Messages.filter((m) => m.MessageId === messageId) - : undefined - if (!messages?.length) { - const errorText = `Message with id ${messageId} not found in notify-listener DLQ` +export async function deleteDlqMessage( + messageId, + visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT, + waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS +) { + const foundMessage = await getDlqMessage( + messageId, + visibilityTimeout, + waitTimeSeconds + ) + if (!foundMessage) { + const errorText = `Message with id ${messageId} not found in notify-listener DLQ after ${MAX_RETRIES} attempts` logger.info(errorText) throw new Error(errorText) } - logger.info( - `[DLQ] Number of messages found with id ${messageId}: ${messages.length}` - ) - for (const message of messages) { - const deleteCommand = new DeleteMessageCommand({ - QueueUrl: deadLetterQueueUrl, - ReceiptHandle: message.ReceiptHandle - }) - logger.info(`[DLQ] Deleting message with id ${messageId}`) - await sqsClient.send(deleteCommand) - logger.info(`[DLQ] Deleted message with id ${messageId}`) - } + const deleteCommand = new DeleteMessageCommand({ + QueueUrl: deadLetterQueueUrl, + ReceiptHandle: foundMessage.ReceiptHandle + }) + logger.info(`[DLQ] Deleting message with id ${messageId}`) + await sqsClient.send(deleteCommand) + logger.info(`[DLQ] Deleted message with id ${messageId}`) } /** diff --git a/src/messaging/event.test.js b/src/messaging/event.test.js index ebc7342..e288a1c 100644 --- a/src/messaging/event.test.js +++ b/src/messaging/event.test.js @@ -100,6 +100,25 @@ describe('event', () => { Messages: [messageStub, messageStub, messageStub] } + snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) + await deleteDlqMessage(messageStub.MessageId, 5, 2) + expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, { + QueueUrl: expect.any(String), + MaxNumberOfMessages: 10, + VisibilityTimeout: 5, + WaitTimeSeconds: 2 + }) + expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, { + QueueUrl: expect.any(String), + ReceiptHandle: receiptHandle + }) + }) + + it('should delete event message with default values', async () => { + const receivedMessage = { + Messages: [messageStub, messageStub, messageStub] + } + snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) await deleteDlqMessage(messageStub.MessageId) expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, { @@ -114,27 +133,22 @@ describe('event', () => { }) }) - it('should throw if message not found', async () => { + it('should throw if message not found after max attempts', async () => { const receivedMessage = { Messages: [] } snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) await expect(() => - deleteDlqMessage(messageStub.MessageId) + deleteDlqMessage(messageStub.MessageId, 0, 0) ).rejects.toThrow( - 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ' + 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ after 7 attempts' ) - }) + }, 10000) }) describe('resubmitDlqMessage', () => { - it('should resubmit message and delete old one from DLQ', async () => { - const receivedMessage = { - Messages: [messageStub] - } - snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) - + it('should resubmit message to main queue', async () => { const sendMessage = { MessageId: '12345' } @@ -145,28 +159,13 @@ describe('event', () => { QueueUrl: expect.any(String), MessageBody: messageStub.Body }) - expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, { - QueueUrl: expect.any(String), - ReceiptHandle: receiptHandle - }) }) - it('should throw if message not found', async () => { - const receivedMessage = { - Messages: [] - } - snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) - - const sendMessage = { - MessageId: '12345' - } - snsMock.on(SendMessageCommand).resolves(sendMessage) - + it('should throw if resubmit fails', async () => { + snsMock.on(SendMessageCommand).rejects('bad SQS command') await expect(() => resubmitDlqMessage(messageStub.MessageId, messageStub.Body) - ).rejects.toThrow( - 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ' - ) + ).rejects.toThrow('bad SQS command') }) }) }) diff --git a/src/routes/admin.js b/src/routes/admin.js index 03acbe1..02db215 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -4,6 +4,7 @@ import Joi from 'joi' import { createLogger } from '~/src/helpers/logging/logger.js' import { deleteDlqMessage, + getDlqMessage, receiveDlqMessages, redriveDlqMessages, resubmitDlqMessage @@ -12,11 +13,17 @@ import { const logger = createLogger() const OK_RESPONSE = 200 +const NOT_FOUND = 404 const messageIdSchema = Joi.object({ messageId: Joi.string().required() }) +const timeoutQuerySchema = Joi.object({ + visibilityTimeout: Joi.number().optional(), + waitTimeSeconds: Joi.number().optional() +}) + export default [ /** * @satisfies {ServerRoute} @@ -24,13 +31,46 @@ export default [ ({ method: 'GET', path: '/admin/deadletter/view', - async handler(_request, h) { - const messages = await receiveDlqMessages() + async handler(request, h) { + const { visibilityTimeout, waitTimeSeconds } = request.query + const messages = await receiveDlqMessages( + visibilityTimeout, + waitTimeSeconds + ) return h.response({ messages: messages.Messages ?? [] }).code(OK_RESPONSE) }, options: { auth: { scope: [`+${Scopes.DeadLetterQueues}`] + }, + validate: { + query: timeoutQuerySchema + } + } + }), + + /** + * @satisfies {ServerRoute} + */ + ({ + method: 'GET', + path: '/admin/deadletter/view/{messageId}', + async handler(request, h) { + const { visibilityTimeout, waitTimeSeconds } = request.query + const message = await getDlqMessage( + request.params.messageId, + visibilityTimeout, + waitTimeSeconds + ) + return h.response({ message }).code(message ? OK_RESPONSE : NOT_FOUND) + }, + options: { + auth: { + scope: [`+${Scopes.DeadLetterQueues}`] + }, + validate: { + params: messageIdSchema, + query: timeoutQuerySchema } } }), @@ -83,10 +123,14 @@ export default [ method: 'DELETE', path: '/admin/deadletter/{messageId}', async handler(request, h) { - const { params } = request + const { params, query } = request const { messageId } = params logger.info(`Deleting DLQ message ${messageId}`) - await deleteDlqMessage(messageId) + await deleteDlqMessage( + messageId, + query.visibilityTimeout, + query.waitTimeSeconds + ) logger.info(`Deleted DLQ message ${messageId}`) return h.response({ message: 'success' }).code(OK_RESPONSE) }, @@ -95,7 +139,8 @@ export default [ scope: [`+${Scopes.DeadLetterQueues}`] }, validate: { - params: messageIdSchema + params: messageIdSchema, + query: timeoutQuerySchema } } }) diff --git a/src/routes/admin.test.js b/src/routes/admin.test.js index 06270cf..578b8a6 100644 --- a/src/routes/admin.test.js +++ b/src/routes/admin.test.js @@ -1,6 +1,7 @@ import { createServer } from '~/src/api/server.js' import { deleteDlqMessage, + getDlqMessage, receiveDlqMessages, redriveDlqMessages, resubmitDlqMessage @@ -41,6 +42,20 @@ describe('Admin routes', () => { expect(response.headers['content-type']).toContain(jsonContentType) expect(response.result).toEqual({ messages: [{ MessageId: 'message1' }] }) }) + + test('/admin/dead-letter/view/message-id route returns 200', async () => { + jest.mocked(getDlqMessage).mockResolvedValue({ MessageId: 'message1' }) + + const response = await server.inject({ + method: 'GET', + url: '/admin/deadletter/view/message1', + auth + }) + + expect(response.statusCode).toEqual(okStatusCode) + expect(response.headers['content-type']).toContain(jsonContentType) + expect(response.result).toEqual({ message: { MessageId: 'message1' } }) + }) }) describe('POST', () => { @@ -85,7 +100,11 @@ describe('Admin routes', () => { expect(response.statusCode).toEqual(okStatusCode) expect(response.headers['content-type']).toContain(jsonContentType) expect(response.result).toEqual({ message: 'success' }) - expect(deleteDlqMessage).toHaveBeenCalledWith('message-id') + expect(deleteDlqMessage).toHaveBeenCalledWith( + 'message-id', + undefined, + undefined + ) }) }) })