From ec55528e36c7c651ad6dc53df2a8688a0dee8329 Mon Sep 17 00:00:00 2001 From: Oded Valtzer Date: Sat, 3 Jun 2023 19:41:15 +0300 Subject: [PATCH 1/3] add stop logic --- sqs_listener/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sqs_listener/__init__.py b/sqs_listener/__init__.py index 3fce7d8..02d29c9 100644 --- a/sqs_listener/__init__.py +++ b/sqs_listener/__init__.py @@ -66,6 +66,7 @@ def __init__(self, queue, **kwargs): self._wait_time = kwargs.get('wait_time', 0) self._max_number_of_messages = kwargs.get('max_number_of_messages', 1) self._deserializer = kwargs.get("deserializer", json.loads) + self.should_run = False # must come last if boto3_session: @@ -138,8 +139,9 @@ def _initialize_client(self): return sqs def _start_listening(self): + self.should_run = True # TODO consider incorporating output processing from here: https://github.com/debrouwere/sqs-antenna/blob/master/antenna/__init__.py - while True: + while self.should_run: # calling with WaitTimeSecconds of zero show the same behavior as # not specifiying a wait time, ie: short polling messages = self._client.receive_message( @@ -205,7 +207,8 @@ def listen(self): sqs_logger.info("Using error queue " + self._error_queue_name) self._start_listening() - + def stop(self): + self.should_run = False def _prepare_logger(self): logger = logging.getLogger('eg_daemon') logger.setLevel(logging.INFO) From f5413cac614f750b25f59fb969e899e2f2a2029b Mon Sep 17 00:00:00 2001 From: Oded Valtzer Date: Sun, 4 Jun 2023 00:31:28 +0300 Subject: [PATCH 2/3] add stop logic --- sqs_listener/__init__.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sqs_listener/__init__.py b/sqs_listener/__init__.py index 02d29c9..eb56aa8 100644 --- a/sqs_listener/__init__.py +++ b/sqs_listener/__init__.py @@ -66,7 +66,7 @@ def __init__(self, queue, **kwargs): self._wait_time = kwargs.get('wait_time', 0) self._max_number_of_messages = kwargs.get('max_number_of_messages', 1) self._deserializer = kwargs.get("deserializer", json.loads) - self.should_run = False + self._should_listen = False # must come last if boto3_session: @@ -139,9 +139,8 @@ def _initialize_client(self): return sqs def _start_listening(self): - self.should_run = True # TODO consider incorporating output processing from here: https://github.com/debrouwere/sqs-antenna/blob/master/antenna/__init__.py - while self.should_run: + while self._should_listen: # calling with WaitTimeSecconds of zero show the same behavior as # not specifiying a wait time, ie: short polling messages = self._client.receive_message( @@ -156,6 +155,8 @@ def _start_listening(self): sqs_logger.debug(messages) sqs_logger.info("{} messages received".format(len(messages['Messages']))) for m in messages['Messages']: + if not self._should_listen: + break receipt_handle = m['ReceiptHandle'] m_body = m['Body'] message_attribs = None @@ -200,15 +201,16 @@ def _start_listening(self): else: time.sleep(self._poll_interval) - + sqs_logger.info("client is not in listening state, stopping sqs listener") def listen(self): + self._should_listen = True sqs_logger.info("Listening to queue " + self._queue_name) if self._error_queue_name: sqs_logger.info("Using error queue " + self._error_queue_name) self._start_listening() def stop(self): - self.should_run = False + self._should_listen = False def _prepare_logger(self): logger = logging.getLogger('eg_daemon') logger.setLevel(logging.INFO) From 78232a812d9a8cd7832bfe30c239ce3efe380ac8 Mon Sep 17 00:00:00 2001 From: Oded Valtzer Date: Sun, 4 Jun 2023 16:33:40 +0300 Subject: [PATCH 3/3] add stop logic --- sqs_listener/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sqs_listener/__init__.py b/sqs_listener/__init__.py index eb56aa8..fedae7c 100644 --- a/sqs_listener/__init__.py +++ b/sqs_listener/__init__.py @@ -200,8 +200,11 @@ def _start_listening(self): ) else: + if not self._should_listen: + break time.sleep(self._poll_interval) sqs_logger.info("client is not in listening state, stopping sqs listener") + def listen(self): self._should_listen = True sqs_logger.info("Listening to queue " + self._queue_name) @@ -209,8 +212,10 @@ def listen(self): sqs_logger.info("Using error queue " + self._error_queue_name) self._start_listening() + def stop(self): self._should_listen = False + def _prepare_logger(self): logger = logging.getLogger('eg_daemon') logger.setLevel(logging.INFO)