Skip to content

Commit 1110e1c

Browse files
authored
add DeterministicEventLoop (#17)
What changed? added DeterministEventLoop which is core of a workflow loop derived from BaseEventLoop without any I/O support. Why? This FIFO is needed to ensure deterministic requirement of workflows How did you test it? Unit Test
1 parent 0039ea2 commit 1110e1c

File tree

2 files changed

+234
-0
lines changed

2 files changed

+234
-0
lines changed
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
from asyncio import AbstractEventLoop, Handle, futures, tasks
2+
from contextvars import Context
3+
import logging
4+
import collections
5+
import asyncio.events as events
6+
import threading
7+
from typing import Callable
8+
from typing_extensions import Unpack, TypeVarTuple
9+
10+
logger = logging.getLogger(__name__)
11+
12+
_Ts = TypeVarTuple("_Ts")
13+
14+
class DeterministicEventLoop(AbstractEventLoop):
15+
"""
16+
This is a basic FIFO implementation of event loop that does not allow I/O or timer operations.
17+
As a result, it's theoretically deterministic. This event loop is not useful directly without async events processing inside the loop.
18+
19+
Code is mostly copied from asyncio.BaseEventLoop without I/O or timer operations.
20+
"""
21+
22+
def __init__(self):
23+
self._thread_id = None # indicate if the event loop is running
24+
self._debug = False
25+
self._ready = collections.deque[events.Handle]()
26+
self._stopping = False
27+
self._closed = False
28+
29+
def call_soon(self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts], context: Context | None = None) -> Handle:
30+
return self._call_soon(callback, args, context)
31+
32+
def _call_soon(self, callback, args, context) -> Handle:
33+
handle = events.Handle(callback, args, self, context)
34+
self._ready.append(handle)
35+
return handle
36+
37+
def get_debug(self):
38+
return self._debug
39+
40+
def set_debug(self, enabled: bool):
41+
self._debug = enabled
42+
43+
def run_forever(self):
44+
"""Run until stop() is called."""
45+
self._run_forever_setup()
46+
try:
47+
while True:
48+
self._run_once()
49+
if self._stopping:
50+
break
51+
finally:
52+
self._run_forever_cleanup()
53+
54+
def run_until_complete(self, future):
55+
"""Run until the Future is done.
56+
57+
If the argument is a coroutine, it is wrapped in a Task.
58+
59+
WARNING: It would be disastrous to call run_until_complete()
60+
with the same coroutine twice -- it would wrap it in two
61+
different Tasks and that can't be good.
62+
63+
Return the Future's result, or raise its exception.
64+
"""
65+
self._check_closed()
66+
self._check_running()
67+
68+
new_task = not futures.isfuture(future)
69+
future = tasks.ensure_future(future, loop=self)
70+
71+
future.add_done_callback(_run_until_complete_cb)
72+
try:
73+
self.run_forever()
74+
except:
75+
if new_task and future.done() and not future.cancelled():
76+
# The coroutine raised a BaseException. Consume the exception
77+
# to not log a warning, the caller doesn't have access to the
78+
# local task.
79+
future.exception()
80+
raise
81+
finally:
82+
future.remove_done_callback(_run_until_complete_cb)
83+
if not future.done():
84+
raise RuntimeError('Event loop stopped before Future completed.')
85+
86+
return future.result()
87+
88+
def create_task(self, coro, **kwargs):
89+
"""Schedule a coroutine object.
90+
91+
Return a task object.
92+
"""
93+
self._check_closed()
94+
95+
# NOTE: eager_start is not supported for deterministic event loop
96+
if kwargs.get("eager_start", False):
97+
raise RuntimeError("eager_start in create_task is not supported for deterministic event loop")
98+
99+
return tasks.Task(coro, loop=self, **kwargs)
100+
101+
def create_future(self):
102+
return futures.Future(loop=self)
103+
104+
def _run_once(self):
105+
ntodo = len(self._ready)
106+
for i in range(ntodo):
107+
handle = self._ready.popleft()
108+
if handle._cancelled:
109+
continue
110+
handle._run()
111+
112+
def _run_forever_setup(self):
113+
self._check_closed()
114+
self._check_running()
115+
self._thread_id = threading.get_ident()
116+
events._set_running_loop(self)
117+
118+
def _run_forever_cleanup(self):
119+
self._stopping = False
120+
self._thread_id = None
121+
events._set_running_loop(None)
122+
123+
def stop(self):
124+
self._stopping = True
125+
126+
def _check_closed(self):
127+
if self._closed:
128+
raise RuntimeError('Event loop is closed')
129+
130+
def _check_running(self):
131+
if self.is_running():
132+
raise RuntimeError('This event loop is already running')
133+
if events._get_running_loop() is not None:
134+
raise RuntimeError(
135+
'Cannot run the event loop while another loop is running')
136+
137+
def is_running(self):
138+
return (self._thread_id is not None)
139+
140+
def close(self):
141+
"""Close the event loop.
142+
The event loop must not be running.
143+
"""
144+
if self.is_running():
145+
raise RuntimeError("Cannot close a running event loop")
146+
if self._closed:
147+
return
148+
if self._debug:
149+
logger.debug("Close %r", self)
150+
self._closed = True
151+
self._ready.clear()
152+
153+
def is_closed(self):
154+
"""Returns True if the event loop was closed."""
155+
return self._closed
156+
157+
def _run_until_complete_cb(fut):
158+
if not fut.cancelled():
159+
exc = fut.exception()
160+
if isinstance(exc, (SystemExit, KeyboardInterrupt)):
161+
return
162+
fut.get_loop().stop()
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import pytest
2+
import asyncio
3+
from cadence.workflow.deterministic_event_loop import DeterministicEventLoop
4+
5+
6+
async def coro_append(results: list, i: int):
7+
results.append(i)
8+
9+
async def coro_await(size: int):
10+
results = []
11+
for i in range(size):
12+
await coro_append(results, i)
13+
return results
14+
15+
async def coro_await_future(future: asyncio.Future):
16+
return await future
17+
18+
async def coro_await_task(size: int):
19+
results = []
20+
for i in range(size):
21+
asyncio.create_task(coro_append(results, i))
22+
return results
23+
24+
class TestDeterministicEventLoop:
25+
"""Test suite for DeterministicEventLoop using table-driven tests."""
26+
27+
def setup_method(self):
28+
"""Setup method called before each test."""
29+
self.loop = DeterministicEventLoop()
30+
31+
def teardown_method(self):
32+
"""Teardown method called after each test."""
33+
if not self.loop.is_closed():
34+
self.loop.close()
35+
assert self.loop.is_closed() is True
36+
37+
def test_call_soon(self):
38+
"""Test _run_once executes single callback."""
39+
results = []
40+
expected = []
41+
for i in range(10000):
42+
expected.append(i)
43+
self.loop.call_soon(lambda x=i: results.append(x))
44+
45+
self.loop._run_once()
46+
47+
assert results == expected
48+
assert self.loop.is_running() is False
49+
50+
def test_run_until_complete(self):
51+
size = 10000
52+
results = self.loop.run_until_complete(coro_await(size))
53+
assert results == list(range(size))
54+
assert self.loop.is_running() is False
55+
assert self.loop.is_closed() is False
56+
57+
@pytest.mark.parametrize("result, exception, expected, expected_exception",
58+
[(10000, None, 10000, None), (None, ValueError("test"), None, ValueError)])
59+
def test_create_future(self, result, exception, expected, expected_exception):
60+
future = self.loop.create_future()
61+
if expected_exception is not None:
62+
with pytest.raises(expected_exception):
63+
future.set_exception(exception)
64+
self.loop.run_until_complete(coro_await_future(future))
65+
else:
66+
future.set_result(result)
67+
assert self.loop.run_until_complete(coro_await_future(future)) == expected
68+
69+
def test_create_task(self):
70+
size = 10000
71+
results = self.loop.run_until_complete(coro_await_task(size))
72+
assert results == list(range(size))

0 commit comments

Comments
 (0)