From d5b5e46a33eee1d6b2d10e697245e16e72705499 Mon Sep 17 00:00:00 2001 From: rnawfal Date: Mon, 5 Feb 2024 16:19:49 +0000 Subject: [PATCH] move PANG message with topics after await result --- packages/python-runner/runner.py | 33 ++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/packages/python-runner/runner.py b/packages/python-runner/runner.py index 2d0e1e56f..b1046df56 100644 --- a/packages/python-runner/runner.py +++ b/packages/python-runner/runner.py @@ -207,26 +207,11 @@ async def run_instance(self, config, args): asyncio.create_task(self.connect_input_stream(input_stream)) self.logger.info('Running instance...') - result = self.sequence.run(context, input_stream, *args) + result = self.sequence.run(context, input_stream, *args) self.logger.info(f'Sending PANG') monitoring = self.streams[CC.MONITORING] - # Sending Topics producer information to STH - produces_runtime = getattr(result, 'provides', None) - - if produces_runtime == None: - produces = getattr(self.sequence, 'provides', None) - else: - produces = {} - produces['provides'] = produces_runtime - produces['contentType'] = getattr(result, 'content_type', None) - produces_json = json.dumps(produces) - - if produces: - self.logger.info(f'Sending PANG with {produces}') - send_encoded_msg(monitoring, msg_codes.PANG, produces) - # Sending Topics consumer information to STH consumes_runtime = getattr(result, 'requires', None) @@ -246,6 +231,22 @@ async def run_instance(self, config, args): result = Stream.read_from(result) elif asyncio.iscoroutine(result): result = await result + + # Sending Topics producer information to STH + produces_runtime = getattr(result, 'provides', None) + + if produces_runtime == None: + produces = getattr(self.sequence, 'provides', None) + else: + produces = {} + produces['provides'] = produces_runtime + produces['contentType'] = getattr(result, 'content_type', None) + + if produces: + self.logger.info(f'Sending PANG with {produces}') + send_encoded_msg(monitoring, msg_codes.PANG, produces) + + if result: await self.forward_output_stream(result) else: