Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions cadence/workflow/deterministic_event_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from asyncio import AbstractEventLoop, Handle, futures, tasks
from contextvars import Context
import logging
import collections
import asyncio.events as events
import threading
from typing import Callable
from typing_extensions import Unpack, TypeVarTuple

logger = logging.getLogger(__name__)

_Ts = TypeVarTuple("_Ts")

class DeterministicEventLoop(AbstractEventLoop):
"""
This is a basic FIFO implementation of event loop that does not allow I/O or timer operations.
As a result, it's theoretically deterministic. This event loop is not useful directly without async events processing inside the loop.

Code is mostly copied from asyncio.BaseEventLoop without I/O or timer operations.
"""

def __init__(self):
self._thread_id = None # indicate if the event loop is running
self._debug = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it seems like we don't have a place to enable debug mode

Copy link
Member Author

@shijiesheng shijiesheng Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually added set_debug function

self._ready = collections.deque[events.Handle]()
self._stopping = False
self._closed = False

def call_soon(self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts], context: Context | None = None) -> Handle:
return self._call_soon(callback, args, context)

def _call_soon(self, callback, args, context) -> Handle:
handle = events.Handle(callback, args, self, context)
self._ready.append(handle)
return handle

def get_debug(self):
return self._debug

def set_debug(self, enabled: bool):
self._debug = enabled

def run_forever(self):
"""Run until stop() is called."""
self._run_forever_setup()
try:
while True:
self._run_once()
if self._stopping:
break
finally:
self._run_forever_cleanup()

def run_until_complete(self, future):
"""Run until the Future is done.

If the argument is a coroutine, it is wrapped in a Task.

WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.

Return the Future's result, or raise its exception.
"""
self._check_closed()
self._check_running()

new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)

future.add_done_callback(_run_until_complete_cb)
try:
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
# The coroutine raised a BaseException. Consume the exception
# to not log a warning, the caller doesn't have access to the
# local task.
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')

return future.result()

def create_task(self, coro, **kwargs):
"""Schedule a coroutine object.

Return a task object.
"""
self._check_closed()

# NOTE: eager_start is not supported for deterministic event loop
if kwargs.get("eager_start", False):
raise RuntimeError("eager_start in create_task is not supported for deterministic event loop")

return tasks.Task(coro, loop=self, **kwargs)

def create_future(self):
return futures.Future(loop=self)

def _run_once(self):
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
handle._run()

def _run_forever_setup(self):
self._check_closed()
self._check_running()
self._thread_id = threading.get_ident()
events._set_running_loop(self)

def _run_forever_cleanup(self):
self._stopping = False
self._thread_id = None
events._set_running_loop(None)

def stop(self):
self._stopping = True

def _check_closed(self):
if self._closed:
raise RuntimeError('Event loop is closed')

def _check_running(self):
if self.is_running():
raise RuntimeError('This event loop is already running')
if events._get_running_loop() is not None:
raise RuntimeError(
'Cannot run the event loop while another loop is running')

def is_running(self):
return (self._thread_id is not None)

def close(self):
"""Close the event loop.
The event loop must not be running.
"""
if self.is_running():
raise RuntimeError("Cannot close a running event loop")
if self._closed:
return
if self._debug:
logger.debug("Close %r", self)
self._closed = True
self._ready.clear()

def is_closed(self):
"""Returns True if the event loop was closed."""
return self._closed

def _run_until_complete_cb(fut):
if not fut.cancelled():
exc = fut.exception()
if isinstance(exc, (SystemExit, KeyboardInterrupt)):
return
fut.get_loop().stop()
72 changes: 72 additions & 0 deletions tests/cadence/workflow/test_deterministic_event_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import pytest
import asyncio
from cadence.workflow.deterministic_event_loop import DeterministicEventLoop


async def coro_append(results: list, i: int):
results.append(i)

async def coro_await(size: int):
results = []
for i in range(size):
await coro_append(results, i)
return results

async def coro_await_future(future: asyncio.Future):
return await future

async def coro_await_task(size: int):
results = []
for i in range(size):
asyncio.create_task(coro_append(results, i))
return results

class TestDeterministicEventLoop:
"""Test suite for DeterministicEventLoop using table-driven tests."""

def setup_method(self):
"""Setup method called before each test."""
self.loop = DeterministicEventLoop()

def teardown_method(self):
"""Teardown method called after each test."""
if not self.loop.is_closed():
self.loop.close()
assert self.loop.is_closed() is True

def test_call_soon(self):
"""Test _run_once executes single callback."""
results = []
expected = []
for i in range(10000):
expected.append(i)
self.loop.call_soon(lambda x=i: results.append(x))

self.loop._run_once()

assert results == expected
assert self.loop.is_running() is False

def test_run_until_complete(self):
size = 10000
results = self.loop.run_until_complete(coro_await(size))
assert results == list(range(size))
assert self.loop.is_running() is False
assert self.loop.is_closed() is False

@pytest.mark.parametrize("result, exception, expected, expected_exception",
[(10000, None, 10000, None), (None, ValueError("test"), None, ValueError)])
def test_create_future(self, result, exception, expected, expected_exception):
future = self.loop.create_future()
if expected_exception is not None:
with pytest.raises(expected_exception):
future.set_exception(exception)
self.loop.run_until_complete(coro_await_future(future))
else:
future.set_result(result)
assert self.loop.run_until_complete(coro_await_future(future)) == expected

def test_create_task(self):
size = 10000
results = self.loop.run_until_complete(coro_await_task(size))
assert results == list(range(size))