From 9eae72d68f755adc7c421ca40335a0e7eb136a16 Mon Sep 17 00:00:00 2001 From: adarsh-jaiss Date: Thu, 3 Aug 2023 12:22:47 +0530 Subject: [PATCH 1/2] stop consume functionality --- memphis/consumer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/memphis/consumer.py b/memphis/consumer.py index c1731ab..83d051c 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -89,6 +89,11 @@ async def main(): """ self.dls_callback_func = callback self.t_consume = asyncio.create_task(self.__consume(callback)) + + def stopConsume(self): + if self.t_consume is not None: + self.t_consume.cancel() + self.t_consume = None async def __consume(self, callback): subject = get_internal_name(self.station_name) From 5786a1fd3758fb14eb76e3313d53935596df368e Mon Sep 17 00:00:00 2001 From: adarsh-jaiss Date: Fri, 4 Aug 2023 13:38:16 +0530 Subject: [PATCH 2/2] Fixed the lint issues and used snake_case in the function --- memphis/consumer.py | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 83d051c..be7e2f4 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -89,8 +89,35 @@ async def main(): """ self.dls_callback_func = callback self.t_consume = asyncio.create_task(self.__consume(callback)) - - def stopConsume(self): + + def stop_consume(self): + """ + This method stops the automatic consumption of messages. + + Example: + import asyncio + from memphis import Memphis + + async def message_handler(messages, error, context): + if error: + print(f"Error occurred: {error}") + return + + for message in messages: + print(f"Received message: {message}") + + async def main(): + memphis = Memphis() + await memphis.connect(host='localhost', username='user', password='pass') + consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') + consumer.set_context({'key': 'value'}) + consumer.consume(message_handler) + + await asyncio.sleep(10) # Consume messages for 10 seconds + consumer.stopConsume() # Stop consuming messages + + asyncio.run(main()) + """ if self.t_consume is not None: self.t_consume.cancel() self.t_consume = None