From 71eaad823731adfc993f101091bad5203ad39fc3 Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Fri, 10 May 2024 15:54:30 -0700 Subject: [PATCH 1/2] add SQS consumer example Signed-off-by: Achille Roussel --- aws-sqs-consumer/main.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 aws-sqs-consumer/main.py diff --git a/aws-sqs-consumer/main.py b/aws-sqs-consumer/main.py new file mode 100644 index 0000000..cbb17ed --- /dev/null +++ b/aws-sqs-consumer/main.py @@ -0,0 +1,34 @@ +import boto3 +import dispatch + +sqs = boto3.resource('sqs') + +queue_name = 'dispatch-example-queue' +queue = sqs.get_queue_by_name(QueueName=queue_name) + + +# This worker consumes messages from the SQS queue and transfers them to +# Dispatch for processing. +@dispatch.worker +def consume_messages(): + while True: + messages = queue.receive_messages() + if not messages: + continue + batch = dispatch.batch() + for m in messages: + batch.add(handle_message, m.message_id, m.body) + batch.dispatch() + queue.delete_messages(Entries=[ + {'Id': m.message_id, 'ReceiptHandle': m.receipt_handle} for m in messages + ]) + + +# This function processes messages consumed from SQS, applying the reliability +# features of Dispatch such as retries, adaptive concurrency control, etc... +@dispatch.function +def handle_message(id: str, body: str): + print('processing message:', id, body) + + +dispatch.run() From de34816158cb837d29198ba484366872b03e7a83 Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Fri, 10 May 2024 15:56:32 -0700 Subject: [PATCH 2/2] only run if in main module Signed-off-by: Achille Roussel --- aws-sqs-consumer/main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aws-sqs-consumer/main.py b/aws-sqs-consumer/main.py index cbb17ed..f47c8cc 100644 --- a/aws-sqs-consumer/main.py +++ b/aws-sqs-consumer/main.py @@ -31,4 +31,5 @@ def handle_message(id: str, body: str): print('processing message:', id, body) -dispatch.run() +if __name__ == '__main__': + dispatch.run()