Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 86 additions & 40 deletions src/messaging/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ const queueUrl = config.get('sqsEventsQueueUrl')
const deadLetterQueueUrl = `${queueUrl}-deadletter`
const deadLetterQueueArn = config.get('sqsEventsDlqArn')
const maxNumberOfMessages = config.get('maxNumberOfMessages')
const visibilityTimeout = config.get('visibilityTimeout')
const pollingVisibilityTimeout = config.get('visibilityTimeout')

const MAX_RETRIES = 7
const RETRY_WAIT_BETWEEN_TRIES_IN_SECS = 1
const DEFAULT_VISIBILITY_TIMEOUT = 3
const DEFAULT_WAIT_TIME_IN_SECS = 3

const logger = createLogger()

Expand All @@ -24,7 +29,7 @@ const logger = createLogger()
const input = {
QueueUrl: queueUrl,
MaxNumberOfMessages: maxNumberOfMessages,
VisibilityTimeout: visibilityTimeout
VisibilityTimeout: pollingVisibilityTimeout
}

/**
Expand All @@ -40,16 +45,66 @@ export function receiveEventMessages() {
* Receive dead-letter queue messages
* @returns {Promise<ReceiveMessageResult>}
*/
export function receiveDlqMessages() {
export function receiveDlqMessages(
visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT,
waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS
) {
const command = new ReceiveMessageCommand({
QueueUrl: deadLetterQueueUrl,
MaxNumberOfMessages: 10,
VisibilityTimeout: 3,
WaitTimeSeconds: 3
VisibilityTimeout: visibilityTimeout,
WaitTimeSeconds: waitTimeSeconds
})
return sqsClient.send(command)
}

/**
* Get a specific message from the dead-letter queue. Handles retries if not found.
* @param {string} messageId
* @param {number} [visibilityTimeout] - Queue visibilityTimeout
* @param {number} [waitTimeSeconds] - Queue waitTimeSeconds
* @returns {Promise< Message | null >}
*/
export async function getDlqMessage(
messageId,
visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT,
waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS
) {
let attempts = 1

while (attempts <= MAX_RETRIES) {
const messageResponse = await receiveDlqMessages(
visibilityTimeout,
waitTimeSeconds
)
const messages = messageResponse.Messages ?? []
for (const m of messages) {
logger.info(
`[DLQ] Received message with id ${m.MessageId} on attempt ${attempts}`
)
}

const messageFound = messages.find((m) => m.MessageId === messageId)
if (messageFound) {
logger.info(
`[DLQ] Found message with id ${messageId} on attempt ${attempts}`
)
return messageFound
}

logger.info(
`[DLQ] Message ${messageId} not found in batch ${attempts}, retrying...`
)

await new Promise((resolve) =>
setTimeout(resolve, RETRY_WAIT_BETWEEN_TRIES_IN_SECS * 1000)
)

attempts++
}
return null
}

/**
* Redrive the specified message from the dead-letter queue to the main queue
* @returns {Promise<StartMessageMoveTaskResult>}
Expand All @@ -62,7 +117,7 @@ export function redriveDlqMessages() {
}

/**
* Submit the specified message to the main queue, and delete the messageId from the dead-letter queue
* Submit the specified message to the main queue
* @param {string} messageId
* @param {string} messageJson
*/
Expand All @@ -78,17 +133,12 @@ export async function resubmitDlqMessage(messageId, messageJson) {
})
const sendResult = await sqsClient.send(command)
logger.info(
`[DLQ] Submitting new message in place of message id ${messageId}. New message id is ${sendResult.MessageId}. About to delete old message from DLQ`
)

await deleteDlqMessage(messageId)
logger.info(
`[DLQ] Deleted message id ${messageId} from DLQ after resubmitting new message id ${sendResult.MessageId}`
`[DLQ] Submitting new message in place of message id ${messageId}. New message id is ${sendResult.MessageId}`
)
} catch (err) {
logger.error(
err,
`[DLQ] Failed to submit new message to main queue or failed to delete old message of id ${messageId} from DLQ`
`[DLQ] Failed to submit new message to main queue based on old message of id ${messageId} from DLQ`
)
throw err
}
Expand All @@ -97,39 +147,35 @@ export async function resubmitDlqMessage(messageId, messageJson) {
/**
* Delete DLQ message by messageId
* This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero)
* otherwise the receipt handles become stale and the delete operation doesn't work.
* otherwise the receipt handle becomes stale and the delete operation doesn't work.
* getDlqMessage uses retries in case the message is not always visibile when querying the DLQ.
* @param {string} messageId
* @param {number} [visibilityTimeout] - Queue visibilityTimeout
* @param {number} [waitTimeSeconds] - Queue waitTimeSeconds
*/
export async function deleteDlqMessage(messageId) {
const receiveCommand = new ReceiveMessageCommand({
QueueUrl: deadLetterQueueUrl,
MaxNumberOfMessages: 10,
VisibilityTimeout: 3,
WaitTimeSeconds: 3
})
const messageResponse = await sqsClient.send(receiveCommand)

const messages = messageResponse.Messages
? messageResponse.Messages.filter((m) => m.MessageId === messageId)
: undefined
if (!messages?.length) {
const errorText = `Message with id ${messageId} not found in notify-listener DLQ`
export async function deleteDlqMessage(
messageId,
visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT,
waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS
) {
const foundMessage = await getDlqMessage(
messageId,
visibilityTimeout,
waitTimeSeconds
)
if (!foundMessage) {
const errorText = `Message with id ${messageId} not found in notify-listener DLQ after ${MAX_RETRIES} attempts`
logger.info(errorText)
throw new Error(errorText)
}

logger.info(
`[DLQ] Number of messages found with id ${messageId}: ${messages.length}`
)
for (const message of messages) {
const deleteCommand = new DeleteMessageCommand({
QueueUrl: deadLetterQueueUrl,
ReceiptHandle: message.ReceiptHandle
})
logger.info(`[DLQ] Deleting message with id ${messageId}`)
await sqsClient.send(deleteCommand)
logger.info(`[DLQ] Deleted message with id ${messageId}`)
}
const deleteCommand = new DeleteMessageCommand({
QueueUrl: deadLetterQueueUrl,
ReceiptHandle: foundMessage.ReceiptHandle
})
logger.info(`[DLQ] Deleting message with id ${messageId}`)
await sqsClient.send(deleteCommand)
logger.info(`[DLQ] Deleted message with id ${messageId}`)
}

/**
Expand Down
55 changes: 27 additions & 28 deletions src/messaging/event.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ describe('event', () => {
Messages: [messageStub, messageStub, messageStub]
}

snsMock.on(ReceiveMessageCommand).resolves(receivedMessage)
await deleteDlqMessage(messageStub.MessageId, 5, 2)
expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, {
QueueUrl: expect.any(String),
MaxNumberOfMessages: 10,
VisibilityTimeout: 5,
WaitTimeSeconds: 2
})
expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, {
QueueUrl: expect.any(String),
ReceiptHandle: receiptHandle
})
})

it('should delete event message with default values', async () => {
const receivedMessage = {
Messages: [messageStub, messageStub, messageStub]
}

snsMock.on(ReceiveMessageCommand).resolves(receivedMessage)
await deleteDlqMessage(messageStub.MessageId)
expect(snsMock).toHaveReceivedCommandWith(ReceiveMessageCommand, {
Expand All @@ -114,27 +133,22 @@ describe('event', () => {
})
})

it('should throw if message not found', async () => {
it('should throw if message not found after max attempts', async () => {
const receivedMessage = {
Messages: []
}

snsMock.on(ReceiveMessageCommand).resolves(receivedMessage)
await expect(() =>
deleteDlqMessage(messageStub.MessageId)
deleteDlqMessage(messageStub.MessageId, 0, 0)
).rejects.toThrow(
'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ'
'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ after 7 attempts'
)
})
}, 10000)
})

describe('resubmitDlqMessage', () => {
it('should resubmit message and delete old one from DLQ', async () => {
const receivedMessage = {
Messages: [messageStub]
}
snsMock.on(ReceiveMessageCommand).resolves(receivedMessage)

it('should resubmit message to main queue', async () => {
const sendMessage = {
MessageId: '12345'
}
Expand All @@ -145,28 +159,13 @@ describe('event', () => {
QueueUrl: expect.any(String),
MessageBody: messageStub.Body
})
expect(snsMock).toHaveReceivedCommandWith(DeleteMessageCommand, {
QueueUrl: expect.any(String),
ReceiptHandle: receiptHandle
})
})

it('should throw if message not found', async () => {
const receivedMessage = {
Messages: []
}
snsMock.on(ReceiveMessageCommand).resolves(receivedMessage)

const sendMessage = {
MessageId: '12345'
}
snsMock.on(SendMessageCommand).resolves(sendMessage)

it('should throw if resubmit fails', async () => {
snsMock.on(SendMessageCommand).rejects('bad SQS command')
await expect(() =>
resubmitDlqMessage(messageStub.MessageId, messageStub.Body)
).rejects.toThrow(
'Message with id 31cb6fff-8317-412e-8488-308d099034c4 not found in notify-listener DLQ'
)
).rejects.toThrow('bad SQS command')
})
})
})
Expand Down
55 changes: 50 additions & 5 deletions src/routes/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Joi from 'joi'
import { createLogger } from '~/src/helpers/logging/logger.js'
import {
deleteDlqMessage,
getDlqMessage,
receiveDlqMessages,
redriveDlqMessages,
resubmitDlqMessage
Expand All @@ -12,25 +13,64 @@ import {
const logger = createLogger()

const OK_RESPONSE = 200
const NOT_FOUND = 404

const messageIdSchema = Joi.object({
messageId: Joi.string().required()
})

const timeoutQuerySchema = Joi.object({
visibilityTimeout: Joi.number().optional(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: non-negative numbers?

waitTimeSeconds: Joi.number().optional()
})

export default [
/**
* @satisfies {ServerRoute}
*/
({
method: 'GET',
path: '/admin/deadletter/view',
async handler(_request, h) {
const messages = await receiveDlqMessages()
async handler(request, h) {
const { visibilityTimeout, waitTimeSeconds } = request.query
const messages = await receiveDlqMessages(
visibilityTimeout,
waitTimeSeconds
)
return h.response({ messages: messages.Messages ?? [] }).code(OK_RESPONSE)
},
options: {
auth: {
scope: [`+${Scopes.DeadLetterQueues}`]
},
validate: {
query: timeoutQuerySchema
}
}
}),

/**
* @satisfies {ServerRoute}
*/
({
method: 'GET',
path: '/admin/deadletter/view/{messageId}',
async handler(request, h) {
const { visibilityTimeout, waitTimeSeconds } = request.query
const message = await getDlqMessage(
request.params.messageId,
visibilityTimeout,
waitTimeSeconds
)
return h.response({ message }).code(message ? OK_RESPONSE : NOT_FOUND)
},
options: {
auth: {
scope: [`+${Scopes.DeadLetterQueues}`]
},
validate: {
params: messageIdSchema,
query: timeoutQuerySchema
}
}
}),
Expand Down Expand Up @@ -83,10 +123,14 @@ export default [
method: 'DELETE',
path: '/admin/deadletter/{messageId}',
async handler(request, h) {
const { params } = request
const { params, query } = request
const { messageId } = params
logger.info(`Deleting DLQ message ${messageId}`)
await deleteDlqMessage(messageId)
await deleteDlqMessage(
messageId,
query.visibilityTimeout,
query.waitTimeSeconds
)
logger.info(`Deleted DLQ message ${messageId}`)
return h.response({ message: 'success' }).code(OK_RESPONSE)
},
Expand All @@ -95,7 +139,8 @@ export default [
scope: [`+${Scopes.DeadLetterQueues}`]
},
validate: {
params: messageIdSchema
params: messageIdSchema,
query: timeoutQuerySchema
}
}
})
Expand Down
Loading
Loading