Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/fastcs/control_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@ def __init__(
):
self._controller = controller
self._transports = transports
self._loop = loop or asyncio.get_event_loop()

if loop is not None:
self._loop = loop
self._close_loop_on_stop = False
else:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._close_loop_on_stop = True
Comment on lines +39 to +45
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Inspecting loop/task creation sites:"
rg -n -C2 'ensure_future|create_task|run_until_complete|new_event_loop|set_event_loop' src/fastcs/control_system.py

echo
echo "Reproducing asyncio loop binding behavior:"
python - <<'PY'
import asyncio

provided = asyncio.new_event_loop()
current = asyncio.new_event_loop()
asyncio.set_event_loop(current)

async def noop():
    return None

future = asyncio.ensure_future(noop())
print("future loop is current:", future.get_loop() is current)
print("future loop is provided:", future.get_loop() is provided)

try:
    provided.run_until_complete(future)
except Exception as exc:
    print(type(exc).__name__, exc)
finally:
    future.cancel()
    current.close()
    provided.close()
    asyncio.set_event_loop(None)
PY

Repository: DiamondLightSource/fastcs

Length of output: 1612


Bind serve() to self._loop explicitly.

When an external loop is passed, it is not set as the thread-local current event loop. However, run() still creates serve via asyncio.ensure_future(), which implicitly binds the task to the current thread-local loop. If they differ, self._loop.run_until_complete(serve) raises ValueError: The future belongs to a different loop than the one specified as the loop argument. Use self._loop.create_task() instead.

Proposed fix
     def run(self, interactive: bool = True):
-        serve = asyncio.ensure_future(self.serve(interactive=interactive))
+        serve = self._loop.create_task(self.serve(interactive=interactive))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/fastcs/control_system.py` around lines 39 - 45, The scheduled serve
coroutine must be bound to self._loop instead of the thread-local loop: in the
run() implementation where you currently call asyncio.ensure_future(serve) (or
otherwise schedule the serve coro), replace that with
self._loop.create_task(serve) so the Task is created on self._loop (avoiding the
"future belongs to a different loop" ValueError) and do not rely on setting the
thread-local loop; update any related code that expects asyncio.ensure_future to
use self._loop.create_task for scheduling serve and similar tasks.

Comment on lines +43 to +45
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Inspecting lifecycle sites:"
rg -n -C2 'new_event_loop|set_event_loop|get_event_loop|close\(' src/fastcs/control_system.py src/fastcs/launch.py

echo
echo "Checking what happens after closing a loop that is still current:"
python - <<'PY'
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.close()

try:
    current = asyncio.get_event_loop()
    print("get_event_loop returned closed loop:", current.is_closed())
except Exception as exc:
    print(type(exc).__name__, exc)
finally:
    try:
        asyncio.set_event_loop(None)
    except Exception:
        pass
PY

Repository: DiamondLightSource/fastcs

Length of output: 1808


🌐 Web query:

asyncio.get_event_loop() behavior after loop.close() thread-local reference

💡 Result:

asyncio keeps the “current event loop” in policy-managed, per-context storage (per-thread by default). Closing a loop does not clear/unset that stored reference—loop.close() is “irreversible” and says no other loop methods should be called after it’s closed, but it doesn’t say it unsets the loop from the thread context. [1]

So, if you do this in a thread:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)   # bind loop to this thread
loop.close()

loop2 = asyncio.get_event_loop()   # <- often returns the same (now closed) loop

…then get_event_loop() will typically hand you back the same loop object that was previously set for that thread, even though it’s now closed (and using it will then raise RuntimeError: Event loop is closed from loop methods). The docs describe get_event_loop() as returning the running loop if any, otherwise delegating to the event-loop policy’s get_event_loop(), and set_event_loop() as setting the loop for the current OS thread. [2]

What to do instead

  • If you’re done with a loop and want a fresh one in the same thread, explicitly rebind:

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    (or, clear the thread’s current loop and then set a new one).

  • Be aware of version changes: in modern Python, asyncio.get_event_loop() has been tightened/deprecated in steps; e.g. Python 3.14 changes it to raise RuntimeError when there is no current event loop. [2][3]

Sources: Python event-loop docs (loop close semantics; get/set loop behavior) [1][2], Python policy docs (per-thread-by-default context; default policy behavior changes) [3], CPython issue discussing the deprecation/behavior changes around get_event_loop() [4].


🌐 Web query:

Python asyncio thread-local event loop cleanup best practices restore previous loop

💡 Result:

Best practice (recommended): don’t touch thread-local loops—use asyncio.run() / asyncio.Runner

If you’re starting asyncio work in a non-async context (including inside a worker thread), prefer asyncio.run(coro) or (Py 3.11+) asyncio.Runner, which creates the loop, runs the coroutine, finalizes async generators, shuts down the default threadpool/executor, and closes the loop for you. (docs.python.org)

# in any thread
def worker():
    return asyncio.run(main())

Or (Py 3.11+), if you need to run multiple coroutines on the same managed loop:

def worker():
    with asyncio.Runner() as r:
        r.run(coro1())
        r.run(coro2())

(docs.python.org)


When you must temporarily set_event_loop() in a thread: save/restore + hard cleanup

Use a try/finally, restore the previous loop object (if any), and explicitly shut down/close what you created.

Key points:

  • In threads, don’t rely on implicit loop creation; create your own via asyncio.new_event_loop() and (only if needed by code that calls get_event_loop()) set it as current for that thread. (docs.python.org)
  • asyncio.set_event_loop(None) is not “reset to default behavior”; it disables default loop creation for that thread, so prefer restoring the previous loop object when you have one. (bugs.python.org)
  • Closing a loop releases resources (including the default executor). (peps.python.org)
import asyncio

def run_in_thread_with_temp_loop(coro):
    try:
        prev = asyncio.get_event_loop()
    except RuntimeError:
        prev = None  # no loop was set for this thread

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        result = loop.run_until_complete(coro)

        # best-effort cleanup for anything the coro started
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.run_until_complete(loop.shutdown_default_executor())  # 3.9+
        return result
    finally:
        asyncio.set_event_loop(prev)   # restore previous loop (preferred)
        loop.close()                   # release loop resources

If there was no previous loop and you want to “clear” the thread-local current loop, you can asyncio.set_event_loop(None)—just be aware of the semantic note above. (bugs.python.org)

Citations:


Save and restore the previous event loop before closing.

The code calls asyncio.set_event_loop(self._loop) but __del__ only closes it without restoring the thread-local reference. This leaves the thread pointing at a closed loop, causing asyncio.get_event_loop() calls in the same thread to return a closed loop and raise RuntimeError, as seen in src/fastcs/launch.py:161-165.

Proposed fix
     def __init__(
         self,
         controller: Controller,
         transports: Sequence[Transport],
         loop: asyncio.AbstractEventLoop | None = None,
     ):
         self._controller = controller
         self._transports = transports
+        self._previous_loop: asyncio.AbstractEventLoop | None = None

         if loop is not None:
             self._loop = loop
             self._close_loop_on_stop = False
         else:
+            try:
+                self._previous_loop = asyncio.get_event_loop_policy().get_event_loop()
+            except RuntimeError:
+                self._previous_loop = None
             self._loop = asyncio.new_event_loop()
             asyncio.set_event_loop(self._loop)
             self._close_loop_on_stop = True
@@
     def __del__(self):
         self._stop_scan_tasks()
         if self._close_loop_on_stop and not self._loop.is_closed():
+            try:
+                if asyncio.get_event_loop_policy().get_event_loop() is self._loop:
+                    asyncio.set_event_loop(self._previous_loop)
+            except RuntimeError:
+                pass
             self._loop.close()

Also applies to: lines 166-169

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/fastcs/control_system.py` around lines 43 - 45, When you create and set a
new event loop in ControlSystem (the block that does asyncio.new_event_loop()
and asyncio.set_event_loop(self._loop)), save the previous thread-local loop
(e.g., prev_loop = asyncio.get_event_loop() or
get_event_loop_policy().get_event_loop()) on the instance, and then when tearing
down (in __del__ or the stop/close method that currently closes self._loop)
restore the saved loop via asyncio.set_event_loop(self._previous_loop) before
closing; ensure you clear the saved reference and only close the loop if
self._close_loop_on_stop is True. Apply the same save/restore pattern to the
other creation site referenced (lines that mirror this behavior around the
second new_event_loop() usage) so the thread-local event loop is never left
pointing at a closed loop.


self._scan_coros: list[ScanCallback] = []
self._initial_coros: list[ScanCallback] = []
Expand Down Expand Up @@ -158,3 +165,5 @@ async def interactive_shell(

def __del__(self):
self._stop_scan_tasks()
if self._close_loop_on_stop and not self._loop.is_closed():
self._loop.close()
Loading