Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions src/messaging/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from '@aws-sdk/client-sqs'

import { config } from '~/src/config/index.js'
import { createLogger } from '~/src/helpers/logging/logger.js'
import { sqsClient } from '~/src/messaging/sqs.js'

export const receiveMessageTimeout = config.get('receiveMessageTimeout')
Expand All @@ -14,6 +15,8 @@ const deadLetterQueueArn = config.get('sqsEventsDlqArn')
const maxNumberOfMessages = config.get('maxNumberOfMessages')
const visibilityTimeout = config.get('visibilityTimeout')

const logger = createLogger()

/**
* @type {ReceiveMessageCommandInput}
*/
Expand Down Expand Up @@ -58,17 +61,41 @@ export function redriveDlqMessages() {
}

/**
* Delete DLQ message
* @param {string} receiptHandle
* @returns {Promise<DeleteMessageCommandOutput>}
* Delete DLQ message by messageId
* This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero)
* otherwise the receipt handles become stale and the delete operation doesn't work.
* @param {string} messageId
*/
export function deleteDlqMessage(receiptHandle) {
const command = new DeleteMessageCommand({
export async function deleteDlqMessage(messageId) {
const receiveCommand = new ReceiveMessageCommand({
QueueUrl: deadLetterQueueUrl,
ReceiptHandle: receiptHandle
MaxNumberOfMessages: 10,
VisibilityTimeout: 2,
WaitTimeSeconds: 0
})
const messageResponse = await sqsClient.send(receiveCommand)

return sqsClient.send(command)
const messages = messageResponse.Messages
? messageResponse.Messages.filter((m) => m.MessageId === messageId)
: undefined
if (!messages?.length) {
const errorText = `Message with id ${messageId} not found in sharepoint-listener DLQ`
logger.info(errorText)
throw new Error(errorText)
}

logger.info(
`[DLQ] Number of messages found with id ${messageId}: ${messages.length}`
)
for (const message of messages) {
const deleteCommand = new DeleteMessageCommand({
QueueUrl: deadLetterQueueUrl,
ReceiptHandle: message.ReceiptHandle
})
logger.info(`[DLQ] Deleting message with id ${messageId}`)
await sqsClient.send(deleteCommand)
logger.info(`[DLQ] Deleted message with id ${messageId}`)
}
}

/**
Expand Down
30 changes: 23 additions & 7 deletions src/messaging/event.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,36 @@ describe('event', () => {

describe('deleteDlqMessage', () => {
it('should delete event message', async () => {
/**
* @type {DeleteMessageCommandOutput}
*/
const deleteResult = {
$metadata: {}
const receivedMessage = {
Messages: [messageStub, messageStub, messageStub]
}

snsMock.on(DeleteMessageCommand).resolves(deleteResult)
await deleteDlqMessage(messageStub.ReceiptHandle)
snsMock.on(ReceiveMessageCommand).resolves(receivedMessage)
await deleteDlqMessage(messageStub.MessageId)
expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, {
QueueUrl: expect.any(String),
MaxNumberOfMessages: 10,
VisibilityTimeout: 2,
WaitTimeSeconds: 0
})
expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, {
QueueUrl: expect.any(String),
ReceiptHandle: receiptHandle
})
})

it('should throw if message not found', async () => {
const receivedMessage = {
Messages: []
}

snsMock.on(ReceiveMessageCommand).resolves(receivedMessage)
await expect(() =>
deleteDlqMessage(messageStub.MessageId)
).rejects.toThrow(
'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in sharepoint-listener DLQ'
)
})
})
})

Expand Down
18 changes: 7 additions & 11 deletions src/routes/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ const messageIdSchema = Joi.object({
messageId: Joi.string().required()
})

const receiptHandleSchema = Joi.object({
receiptHandle: Joi.string().required()
})

export default [
/**
* @satisfies {ServerRoute}
Expand Down Expand Up @@ -58,25 +54,25 @@ export default [
}),

/**
* @satisfies {ServerRoute<{ Params: { messageId: string }, Payload: { receiptHandle: string } }>}
* @satisfies {ServerRoute<{ Params: { messageId: string } }>}
*/
({
method: 'DELETE',
path: '/admin/deadletter/{messageId}',
async handler(request, h) {
const { params, payload } = request
logger.info(`Deleting DLQ message ${params.messageId}`)
await deleteDlqMessage(payload.receiptHandle)
logger.info(`Deleted DLQ message ${params.messageId}`)
const { params } = request
const { messageId } = params
logger.info(`Deleting DLQ message ${messageId}`)
await deleteDlqMessage(messageId)
logger.info(`Deleted DLQ message ${messageId}`)
return h.response({ message: 'success' }).code(OK_RESPONSE)
},
options: {
auth: {
scope: [`+${Scopes.DeadLetterQueues}`]
},
validate: {
params: messageIdSchema,
payload: receiptHandleSchema
params: messageIdSchema
}
}
})
Expand Down
7 changes: 2 additions & 5 deletions src/routes/admin.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,13 @@ describe('Admin routes', () => {
const response = await server.inject({
method: 'DELETE',
url: '/admin/deadletter/message-id',
auth,
payload: {
receiptHandle: 'receipt-handle'
}
auth
})

expect(response.statusCode).toEqual(okStatusCode)
expect(response.headers['content-type']).toContain(jsonContentType)
expect(response.result).toEqual({ message: 'success' })
expect(deleteDlqMessage).toHaveBeenCalledWith('receipt-handle')
expect(deleteDlqMessage).toHaveBeenCalledWith('message-id')
})
})
})
Expand Down
Loading