Simplify event loop handling and remove deprecated get_event_loop#340
Simplify event loop handling and remove deprecated get_event_loop#340
Conversation
📝 Walkthrough🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/fastcs/control_system.py`:
- Around line 54-55: The code currently installs SIGINT/SIGTERM handlers inside
serve(), which hijacks signal handling when serve() is called from an existing
async runtime; move the signal-handler installation out of serve() and into
run() so handlers are only installed when run() is explicitly invoked, and guard
the installation with a main-thread check (threading.current_thread() is
threading.main_thread()) and ensure the event loop supports add_signal_handler
before calling it (e.g., get the loop from asyncio.get_event_loop() and call
add_signal_handler only when permitted). Modify serve() to no longer call
add_signal_handler, and update run() to install the handlers conditionally for
SIGINT/SIGTERM using the same handler functions currently referenced in serve().
- Around line 164-167: The interactive_shell task created inside run()/wrapper
must be tracked and its exceptions handled: when creating the coroutine
(interactive_shell / asyncio.to_thread(partial(shell.mainloop, ...))) retain the
Task returned by asyncio.create_task and attach a done callback that
catches/logs exceptions and always calls stop_event.set() on error; also ensure
tasks spawned by run() are appended to a local registry so failures are visible
(e.g., add the created Task to a list in run() and on completion remove it), and
update wrapper to create the task via that helper so any raised exceptions won't
silently block await stop_event.wait().
In `@tests/transports/tango/test_dsr.py`:
- Around line 58-71: The current finally block's cleanup iterates
gc.get_objects() and closes any asyncio.AbstractEventLoop it finds, which is
unsafe; change the cleanup to only close loops created by this test/context by
recording references to event loops you create (e.g., the 'loop' variable and
any loops returned or owned by DeviceTestContext) or by snapshotting existing
loops before entering DeviceTestContext and closing only loops that appear
afterwards, and remove the gc.get_objects() iteration; ensure you still call
loop.run_until_complete(asyncio.sleep(0)), loop.close(),
asyncio.set_event_loop(None) and gc.collect() but limit any additional .close()
calls to the specific loop objects you tracked rather than all AbstractEventLoop
instances.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cc07bde6-a8ed-470a-a36d-83ba46c30250
📒 Files selected for processing (16)
src/fastcs/control_system.pysrc/fastcs/launch.pysrc/fastcs/transports/epics/ca/ioc.pysrc/fastcs/transports/epics/ca/transport.pysrc/fastcs/transports/epics/pva/transport.pysrc/fastcs/transports/graphql/transport.pysrc/fastcs/transports/rest/transport.pysrc/fastcs/transports/tango/transport.pysrc/fastcs/transports/transport.pytests/benchmarking/controller.pytests/test_control_system.pytests/transports/epics/ca/test_initial_value.pytests/transports/epics/pva/test_p4p.pytests/transports/graphQL/test_graphql.pytests/transports/rest/test_rest.pytests/transports/tango/test_dsr.py
💤 Files with no reviewable changes (1)
- tests/transports/epics/ca/test_initial_value.py
676fac5 to
e8b2558
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #340 +/- ##
==========================================
- Coverage 91.21% 91.02% -0.19%
==========================================
Files 70 70
Lines 2594 2596 +2
==========================================
- Hits 2366 2363 -3
- Misses 228 233 +5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
941e46b to
d9d0620
Compare
d9d0620 to
6c77e2e
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (2)
src/fastcs/control_system.py (2)
150-181:⚠️ Potential issue | 🟠 MajorThe interactive-shell background tasks still need real supervision.
run()still drops the created task reference after the callback is attached, so shell-launched coroutines can disappear mid-flight, andawait stop_event.wait()still blocks forever ifinteractive_shell()fails before setting the event.🔧 Suggested fix
async def _interactive_shell(self, context: dict[str, Any]): """Spawn interactive shell in another thread and wait for it to complete.""" loop = asyncio.get_running_loop() + pending_tasks: set[asyncio.Task[Any]] = set() + + def _track(task: asyncio.Task[Any]) -> asyncio.Task[Any]: + pending_tasks.add(task) + task.add_done_callback(pending_tasks.discard) + return task def run(coro: Coroutine[None, None, None]): """Run coroutine on FastCS event loop from IPython thread.""" def wrapper(): - task = asyncio.create_task(coro) + task = _track(asyncio.create_task(coro)) def _log_exception(t: asyncio.Task): if not t.cancelled() and (exc := t.exception()): logger.exception("`run` task raised exception", exc_info=exc) task.add_done_callback(_log_exception) loop.call_soon_threadsafe(wrapper) @@ async def interactive_shell( context: dict[str, object], stop_event: asyncio.Event ): """Run interactive shell in a new thread.""" - shell = InteractiveShellEmbed() - await asyncio.to_thread(partial(shell.mainloop, local_ns=context)) - - stop_event.set() + try: + shell = InteractiveShellEmbed() + await asyncio.to_thread(partial(shell.mainloop, local_ns=context)) + finally: + stop_event.set() @@ context["run"] = run stop_event = asyncio.Event() - shell_task = asyncio.create_task(interactive_shell(context, stop_event)) - - await stop_event.wait() - - if not shell_task.cancelled() and (exc := shell_task.exception()): - logger.exception("Interactive shell raised exception", exc_info=exc) + shell_task = _track(asyncio.create_task(interactive_shell(context, stop_event))) + wait_task = asyncio.create_task(stop_event.wait()) + done, pending = await asyncio.wait( + {shell_task, wait_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() + if shell_task in done: + await shell_task🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fastcs/control_system.py` around lines 150 - 181, run() currently creates a task inside wrapper and drops its reference, and interactive_shell may fail without setting stop_event so await stop_event.wait() can hang; fix by keeping a supervised task set (e.g., a module/local set named supervised_tasks) and have wrapper create the task, add it to supervised_tasks, add a done callback that logs exceptions and removes the task from supervised_tasks, and ensure interactive_shell always sets stop_event in a finally block (or signals failure via setting stop_event and logging the exception) so await stop_event.wait() cannot block forever; also use that supervised_tasks set to cancel/wait for remaining tasks when the shell exits (referencing run, wrapper, interactive_shell, shell_task, stop_event).
51-58:⚠️ Potential issue | 🟠 MajorGuard
add_signal_handler()for worker-thread callers.
FastCS.run()is still callable from any synchronous thread. On Unix,loop.add_signal_handler()raises outside the main thread, so this path now fails beforeserve()starts.🔧 Minimal fix
import asyncio import os import signal +import threading @@ - if os.name != "nt" and task is not None: + if ( + os.name != "nt" + and task is not None + and threading.current_thread() is threading.main_thread() + ): loop.add_signal_handler(signal.SIGINT, task.cancel) loop.add_signal_handler(signal.SIGTERM, task.cancel)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fastcs/control_system.py` around lines 51 - 58, The code in async function _serve calls loop.add_signal_handler on Unix unconditionally which raises if FastCS.run is invoked from a non-main thread; fix by additionally checking the current thread is the main thread before registering signals (e.g., import threading and wrap the add_signal_handler calls with if threading.current_thread() is threading.main_thread(): ...), keeping the existing os.name != "nt" check and leaving serve() untouched so signal handlers are only installed when running on the main thread.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/explanations/control_system.md`:
- Line 80: Fix the wording typo in the shutdown section sentence that currently
reads "When then run is stopped"; replace it with a correct phrase such as "When
the run is stopped" (or "When the run is stopped, FastCS performs an orderly
teardown:") so the sentence reads smoothly and clearly in the user-facing docs;
update the sentence in the shutdown section where the phrase "When then run is
stopped" appears.
In `@src/fastcs/control_system.py`:
- Around line 143-145: The _start_scan_tasks method currently creates Tasks from
_scan_coros but doesn't supervise them; modify _start_scan_tasks to attach a
done callback to each Task that checks task.exception(), logs the exception
using the instance logger (e.g., self._logger.error with context like "scan task
failed"), and triggers a graceful shutdown routine (call an existing shutdown
method such as self._shutdown() or equivalent) so failures are surfaced and the
system stops cleanly; mirror the supervision pattern used by _interactive_shell
for callback structure and behavior.
---
Duplicate comments:
In `@src/fastcs/control_system.py`:
- Around line 150-181: run() currently creates a task inside wrapper and drops
its reference, and interactive_shell may fail without setting stop_event so
await stop_event.wait() can hang; fix by keeping a supervised task set (e.g., a
module/local set named supervised_tasks) and have wrapper create the task, add
it to supervised_tasks, add a done callback that logs exceptions and removes the
task from supervised_tasks, and ensure interactive_shell always sets stop_event
in a finally block (or signals failure via setting stop_event and logging the
exception) so await stop_event.wait() cannot block forever; also use that
supervised_tasks set to cancel/wait for remaining tasks when the shell exits
(referencing run, wrapper, interactive_shell, shell_task, stop_event).
- Around line 51-58: The code in async function _serve calls
loop.add_signal_handler on Unix unconditionally which raises if FastCS.run is
invoked from a non-main thread; fix by additionally checking the current thread
is the main thread before registering signals (e.g., import threading and wrap
the add_signal_handler calls with if threading.current_thread() is
threading.main_thread(): ...), keeping the existing os.name != "nt" check and
leaving serve() untouched so signal handlers are only installed when running on
the main thread.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3ab79b37-e4a5-49fa-9ae8-81a7cb2cebce
📒 Files selected for processing (18)
docs/explanations/control_system.mdpyproject.tomlsrc/fastcs/control_system.pysrc/fastcs/launch.pysrc/fastcs/transports/epics/ca/ioc.pysrc/fastcs/transports/epics/ca/transport.pysrc/fastcs/transports/epics/pva/transport.pysrc/fastcs/transports/graphql/transport.pysrc/fastcs/transports/rest/transport.pysrc/fastcs/transports/tango/transport.pysrc/fastcs/transports/transport.pytests/benchmarking/controller.pytests/test_control_system.pytests/transports/epics/ca/test_initial_value.pytests/transports/epics/pva/test_p4p.pytests/transports/graphQL/test_graphql.pytests/transports/rest/test_rest.pytests/transports/tango/test_dsr.py
💤 Files with no reviewable changes (2)
- tests/transports/epics/ca/test_initial_value.py
- pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (3)
- src/fastcs/transports/graphql/transport.py
- src/fastcs/transports/epics/pva/transport.py
- src/fastcs/transports/tango/transport.py
6c77e2e to
42f07e5
Compare
There was a problem hiding this comment.
♻️ Duplicate comments (1)
docs/explanations/control_system.md (1)
80-80:⚠️ Potential issue | 🟡 MinorFix typo in shutdown sentence (Line 80).
The phrase “When then application is stopped” is grammatically incorrect in user-facing docs.
Suggested fix
-When then application is stopped, FastCS performs an orderly teardown: +When the application is stopped, FastCS performs an orderly teardown:🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/explanations/control_system.md` at line 80, Replace the typo "When then application is stopped" in docs/explanations/control_system.md (the shutdown sentence) with the correct phrasing "When the application is stopped" so the sentence reads "When the application is stopped, FastCS performs an orderly teardown:"; update only that phrase to fix the grammar in the user-facing docs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@docs/explanations/control_system.md`:
- Line 80: Replace the typo "When then application is stopped" in
docs/explanations/control_system.md (the shutdown sentence) with the correct
phrasing "When the application is stopped" so the sentence reads "When the
application is stopped, FastCS performs an orderly teardown:"; update only that
phrase to fix the grammar in the user-facing docs.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a07623b0-f9a9-4f71-a38d-570ed7f6cf90
📒 Files selected for processing (1)
docs/explanations/control_system.md
|
This is now simpler than it was initially and we only need per-test ignores on resource warnings. |
In the process of writing documentation it became clear that
Summary:
loopparameter toFastCSand instead require clients to callservefrom an async context with the loop they want it to run inloopto transports and instead retrieve it viaget_running_looponly where required (EpicaCA and Tango)asyncio.create_taskfrom an async context consistently in tests, rather thanrun_until_completeorensure_futureSummary by CodeRabbit