From c472694690b6a86e981ce0eb620c41b3eaabd2e1 Mon Sep 17 00:00:00 2001 From: Paul Dubs Date: Wed, 12 Mar 2025 11:54:41 +0100 Subject: [PATCH 1/2] Add support for running in environments that have a running asyncio loop Jupyter's ipykernel starts an asyncio loop in the same thread as it is executing the xircuits workflow. When using AsyncComponent in that environment directly, we would encounter an exception thrown by AsyncToSync telling us to just await the thing. A simple workaround to this is to not use ipykernel based execution by selecting the "Terminal Run" option. However, that requires the user to know about the issue and understand the workaround. This change instead allows the workflow to adapt to its environment. --- xircuits/compiler/generator.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/xircuits/compiler/generator.py b/xircuits/compiler/generator.py index 90715c82..f1b5041b 100644 --- a/xircuits/compiler/generator.py +++ b/xircuits/compiler/generator.py @@ -306,6 +306,9 @@ def _generate_main(self, flow_name): main = ast.parse(""" def main(args): import pprint + import asyncio + from asgiref.sync import sync_to_async + ctx = {} ctx['args'] = args flow = %s() @@ -325,9 +328,16 @@ def main(args): tpl = "flow.%s.value = args.%s" % (arg_name, arg_name) body.extend(ast.parse(tpl).body) - body.extend(ast.parse(""" -flow.do(ctx) -""").body) + execute = ast.parse(""" +def execute(): + try: + flow.do(ctx) + except: + import traceback + traceback.print_exc() + raise +""").body[0] + execute_body = execute.body # Print out the output values for i, port in enumerate(p for p in finish_node.ports if p.dataType == 'dynalist'): @@ -335,11 +345,21 @@ def main(args): if i > 0: port_name = "%s_%s" % (port_name, i) - body.extend(ast.parse(""" + execute_body.extend(ast.parse(""" print("%s:") pprint.pprint(flow.%s.value) """ % (port_name, port_name)).body) + body.append(execute) + body.extend(ast.parse(""" +try: + event_loop = asyncio.get_running_loop() + event_loop.create_task(sync_to_async(execute)()) +except RuntimeError: + execute() + +""").body) + return [main] def _build_node_set(self): From c5f4c875791ed183df46b291873ebdac9ce7bb47 Mon Sep 17 00:00:00 2001 From: Paul Dubs Date: Wed, 12 Mar 2025 14:05:28 +0100 Subject: [PATCH 2/2] Always run in an async context --- xircuits/compiler/generator.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/xircuits/compiler/generator.py b/xircuits/compiler/generator.py index f1b5041b..f383b375 100644 --- a/xircuits/compiler/generator.py +++ b/xircuits/compiler/generator.py @@ -329,6 +329,7 @@ def main(args): body.extend(ast.parse(tpl).body) execute = ast.parse(""" +@sync_to_async def execute(): try: flow.do(ctx) @@ -352,11 +353,16 @@ def execute(): body.append(execute) body.extend(ast.parse(""" +event_loop = None try: event_loop = asyncio.get_running_loop() - event_loop.create_task(sync_to_async(execute)()) except RuntimeError: - execute() + pass + +if event_loop: + event_loop.create_task(execute()) +else: + asyncio.run(execute()) """).body)