diff --git a/src/messaging/event.js b/src/messaging/event.js index 80f20c2..eb2ceb9 100644 --- a/src/messaging/event.js +++ b/src/messaging/event.js @@ -5,6 +5,7 @@ import { } from '@aws-sdk/client-sqs' import { config } from '~/src/config/index.js' +import { createLogger } from '~/src/helpers/logging/logger.js' import { sqsClient } from '~/src/messaging/sqs.js' export const receiveMessageTimeout = config.get('receiveMessageTimeout') @@ -14,6 +15,8 @@ const deadLetterQueueArn = config.get('sqsEventsDlqArn') const maxNumberOfMessages = config.get('maxNumberOfMessages') const visibilityTimeout = config.get('visibilityTimeout') +const logger = createLogger() + /** * @type {ReceiveMessageCommandInput} */ @@ -58,17 +61,41 @@ export function redriveDlqMessages() { } /** - * Delete DLQ message - * @param {string} receiptHandle - * @returns {Promise} + * Delete DLQ message by messageId + * This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero) + * otherwise the receipt handles become stale and the delete operation doesn't work. + * @param {string} messageId */ -export function deleteDlqMessage(receiptHandle) { - const command = new DeleteMessageCommand({ +export async function deleteDlqMessage(messageId) { + const receiveCommand = new ReceiveMessageCommand({ QueueUrl: deadLetterQueueUrl, - ReceiptHandle: receiptHandle + MaxNumberOfMessages: 10, + VisibilityTimeout: 2, + WaitTimeSeconds: 0 }) + const messageResponse = await sqsClient.send(receiveCommand) - return sqsClient.send(command) + const messages = messageResponse.Messages + ? messageResponse.Messages.filter((m) => m.MessageId === messageId) + : undefined + if (!messages?.length) { + const errorText = `Message with id ${messageId} not found in sharepoint-listener DLQ` + logger.info(errorText) + throw new Error(errorText) + } + + logger.info( + `[DLQ] Number of messages found with id ${messageId}: ${messages.length}` + ) + for (const message of messages) { + const deleteCommand = new DeleteMessageCommand({ + QueueUrl: deadLetterQueueUrl, + ReceiptHandle: message.ReceiptHandle + }) + logger.info(`[DLQ] Deleting message with id ${messageId}`) + await sqsClient.send(deleteCommand) + logger.info(`[DLQ] Deleted message with id ${messageId}`) + } } /** diff --git a/src/messaging/event.test.js b/src/messaging/event.test.js index 0efd2b8..d84580c 100644 --- a/src/messaging/event.test.js +++ b/src/messaging/event.test.js @@ -94,20 +94,36 @@ describe('event', () => { describe('deleteDlqMessage', () => { it('should delete event message', async () => { - /** - * @type {DeleteMessageCommandOutput} - */ - const deleteResult = { - $metadata: {} + const receivedMessage = { + Messages: [messageStub, messageStub, messageStub] } - snsMock.on(DeleteMessageCommand).resolves(deleteResult) - await deleteDlqMessage(messageStub.ReceiptHandle) + snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) + await deleteDlqMessage(messageStub.MessageId) + expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, { + QueueUrl: expect.any(String), + MaxNumberOfMessages: 10, + VisibilityTimeout: 2, + WaitTimeSeconds: 0 + }) expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, { QueueUrl: expect.any(String), ReceiptHandle: receiptHandle }) }) + + it('should throw if message not found', async () => { + const receivedMessage = { + Messages: [] + } + + snsMock.on(ReceiveMessageCommand).resolves(receivedMessage) + await expect(() => + deleteDlqMessage(messageStub.MessageId) + ).rejects.toThrow( + 'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in sharepoint-listener DLQ' + ) + }) }) }) diff --git a/src/routes/admin.js b/src/routes/admin.js index ecb070b..f465d9d 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -16,10 +16,6 @@ const messageIdSchema = Joi.object({ messageId: Joi.string().required() }) -const receiptHandleSchema = Joi.object({ - receiptHandle: Joi.string().required() -}) - export default [ /** * @satisfies {ServerRoute} @@ -58,16 +54,17 @@ export default [ }), /** - * @satisfies {ServerRoute<{ Params: { messageId: string }, Payload: { receiptHandle: string } }>} + * @satisfies {ServerRoute<{ Params: { messageId: string } }>} */ ({ method: 'DELETE', path: '/admin/deadletter/{messageId}', async handler(request, h) { - const { params, payload } = request - logger.info(`Deleting DLQ message ${params.messageId}`) - await deleteDlqMessage(payload.receiptHandle) - logger.info(`Deleted DLQ message ${params.messageId}`) + const { params } = request + const { messageId } = params + logger.info(`Deleting DLQ message ${messageId}`) + await deleteDlqMessage(messageId) + logger.info(`Deleted DLQ message ${messageId}`) return h.response({ message: 'success' }).code(OK_RESPONSE) }, options: { @@ -75,8 +72,7 @@ export default [ scope: [`+${Scopes.DeadLetterQueues}`] }, validate: { - params: messageIdSchema, - payload: receiptHandleSchema + params: messageIdSchema } } }) diff --git a/src/routes/admin.test.js b/src/routes/admin.test.js index 3ec6ad3..9484a98 100644 --- a/src/routes/admin.test.js +++ b/src/routes/admin.test.js @@ -62,16 +62,13 @@ describe('Admin routes', () => { const response = await server.inject({ method: 'DELETE', url: '/admin/deadletter/message-id', - auth, - payload: { - receiptHandle: 'receipt-handle' - } + auth }) expect(response.statusCode).toEqual(okStatusCode) expect(response.headers['content-type']).toContain(jsonContentType) expect(response.result).toEqual({ message: 'success' }) - expect(deleteDlqMessage).toHaveBeenCalledWith('receipt-handle') + expect(deleteDlqMessage).toHaveBeenCalledWith('message-id') }) }) })