diff --git a/memphis/consumer.py b/memphis/consumer.py index c1731ab..be7e2f4 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -90,6 +90,38 @@ async def main(): self.dls_callback_func = callback self.t_consume = asyncio.create_task(self.__consume(callback)) + def stop_consume(self): + """ + This method stops the automatic consumption of messages. + + Example: + import asyncio + from memphis import Memphis + + async def message_handler(messages, error, context): + if error: + print(f"Error occurred: {error}") + return + + for message in messages: + print(f"Received message: {message}") + + async def main(): + memphis = Memphis() + await memphis.connect(host='localhost', username='user', password='pass') + consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') + consumer.set_context({'key': 'value'}) + consumer.consume(message_handler) + + await asyncio.sleep(10) # Consume messages for 10 seconds + consumer.stopConsume() # Stop consuming messages + + asyncio.run(main()) + """ + if self.t_consume is not None: + self.t_consume.cancel() + self.t_consume = None + async def __consume(self, callback): subject = get_internal_name(self.station_name) consumer_group = get_internal_name(self.consumer_group)