From 9e605ef6326f9786eca511e441e9e26ae75c6e01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Janek=20Nouvertn=C3=A9?= <25355197+provinzkraut@users.noreply.github.com> Date: Sun, 17 Aug 2025 20:16:08 +0200 Subject: [PATCH 1/2] first draft --- pytest_asyncio/plugin.py | 57 ++++++++++++++++++++- tests/async_fixtures/test_async_fixtures.py | 11 ++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/pytest_asyncio/plugin.py b/pytest_asyncio/plugin.py index ec52ee4c..430aa35c 100644 --- a/pytest_asyncio/plugin.py +++ b/pytest_asyncio/plugin.py @@ -16,6 +16,7 @@ from collections.abc import ( AsyncIterator, Awaitable, + Coroutine as CoroutineT, Generator, Iterable, Iterator, @@ -276,6 +277,39 @@ def _fixture_synchronizer( AsyncGenFixtureYieldType = TypeVar("AsyncGenFixtureYieldType") +async def _fixture_runner( + coro_queue: asyncio.Queue[tuple[Awaitable[Any], asyncio.Future[Any]] | None], +) -> None: + while True: + item = await coro_queue.get() + if item is None: + break + coro, future = item + try: + retval = await coro + future.set_result(retval) + except Exception as exc: + future.set_exception(exc) + + +def _create_task_in_context( + coro: CoroutineT[Any, Any, Any], + loop: AbstractEventLoop, + context: contextvars.Context, +) -> asyncio.Task[Any]: + if sys.version_info >= (3, 11): + return loop.create_task(coro, context=context) + + from backports.asyncio.runner._patch import _patch_object + from backports.asyncio.runner.tasks import Task + + with ( + _patch_object(asyncio.tasks, asyncio.tasks.Task.__name__, Task), + _patch_object(contextvars, contextvars.copy_context.__name__, lambda: context), + ): + return loop.create_task(coro) + + def _wrap_asyncgen_fixture( fixture_function: Callable[ AsyncGenFixtureParams, AsyncGeneratorType[AsyncGenFixtureYieldType, Any] @@ -294,11 +328,29 @@ async def setup(): res = await gen_obj.__anext__() # type: ignore[union-attr] return res + async def call_in_runner_task(func): + coro = func() + future = runner.get_loop().create_future() + coro_queue.put_nowait((coro, future)) + return await future + context = contextvars.copy_context() - result = runner.run(setup(), context=context) + coro_queue: asyncio.Queue[tuple[Awaitable[Any], asyncio.Future[Any]] | None] = ( + asyncio.Queue() + ) + + runner_task = _create_task_in_context( + _fixture_runner(coro_queue), loop=runner.get_loop(), context=context + ) + + result = runner.get_loop().run_until_complete(call_in_runner_task(setup)) reset_contextvars = _apply_contextvar_changes(context) + async def stop_runner_task(): + coro_queue.put_nowait(None) + await runner_task + def finalizer() -> None: """Yield again, to finalize.""" @@ -312,7 +364,8 @@ async def async_finalizer() -> None: msg += "Yield only once." raise ValueError(msg) - runner.run(async_finalizer(), context=context) + runner.get_loop().run_until_complete(call_in_runner_task(async_finalizer)) + runner.get_loop().run_until_complete(stop_runner_task()) if reset_contextvars is not None: reset_contextvars() diff --git a/tests/async_fixtures/test_async_fixtures.py b/tests/async_fixtures/test_async_fixtures.py index 16478539..b8ccf190 100644 --- a/tests/async_fixtures/test_async_fixtures.py +++ b/tests/async_fixtures/test_async_fixtures.py @@ -37,3 +37,14 @@ async def async_fixture_method(self): @pytest.mark.asyncio async def test_async_fixture_method(self): assert self.is_same_instance + + +@pytest.fixture() +async def setup_and_teardown_tasks(): + task = asyncio.current_task() + yield + assert task is asyncio.current_task() + + +async def test_setup_and_teardown_tasks(setup_and_teardown_tasks): + pass From 7581198be14b02508810e7cb122f00e71e7e39e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Janek=20Nouvertn=C3=A9?= <25355197+provinzkraut@users.noreply.github.com> Date: Sun, 17 Aug 2025 20:28:35 +0200 Subject: [PATCH 2/2] some refactorings --- pytest_asyncio/plugin.py | 85 +++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 36 deletions(-) diff --git a/pytest_asyncio/plugin.py b/pytest_asyncio/plugin.py index 430aa35c..819ceca9 100644 --- a/pytest_asyncio/plugin.py +++ b/pytest_asyncio/plugin.py @@ -277,21 +277,6 @@ def _fixture_synchronizer( AsyncGenFixtureYieldType = TypeVar("AsyncGenFixtureYieldType") -async def _fixture_runner( - coro_queue: asyncio.Queue[tuple[Awaitable[Any], asyncio.Future[Any]] | None], -) -> None: - while True: - item = await coro_queue.get() - if item is None: - break - coro, future = item - try: - retval = await coro - future.set_result(retval) - except Exception as exc: - future.set_exception(exc) - - def _create_task_in_context( coro: CoroutineT[Any, Any, Any], loop: AbstractEventLoop, @@ -310,6 +295,51 @@ def _create_task_in_context( return loop.create_task(coro) +class _FixtureRunner: + def __init__(self, loop: AbstractEventLoop, context: contextvars.Context) -> None: + self.loop = loop + self.queue: asyncio.Queue[tuple[Awaitable[Any], asyncio.Future[Any]] | None] = ( + asyncio.Queue() + ) + self._context = context + self._task = None + + async def _worker(self) -> None: + while True: + item = await self.queue.get() + if item is None: + break + coro, future = item + try: + retval = await coro + future.set_result(retval) + except Exception as exc: + future.set_exception(exc) + + def run(self, func): + return self.loop.run_until_complete(self._run(func)) + + async def _run(self, func): + if self._task is None: + self._task = _create_task_in_context( + self._worker(), loop=self.loop, context=self._context + ) + + coro = func() + future = self.loop.create_future() + self.queue.put_nowait((coro, future)) + return await future + + async def _stop(self): + self.queue.put_nowait(None) + if self._task is not None: + await self._task + self._task = None + + def stop(self) -> None: + self.loop.run_until_complete(self._stop()) + + def _wrap_asyncgen_fixture( fixture_function: Callable[ AsyncGenFixtureParams, AsyncGeneratorType[AsyncGenFixtureYieldType, Any] @@ -328,29 +358,12 @@ async def setup(): res = await gen_obj.__anext__() # type: ignore[union-attr] return res - async def call_in_runner_task(func): - coro = func() - future = runner.get_loop().create_future() - coro_queue.put_nowait((coro, future)) - return await future - context = contextvars.copy_context() - coro_queue: asyncio.Queue[tuple[Awaitable[Any], asyncio.Future[Any]] | None] = ( - asyncio.Queue() - ) - - runner_task = _create_task_in_context( - _fixture_runner(coro_queue), loop=runner.get_loop(), context=context - ) - - result = runner.get_loop().run_until_complete(call_in_runner_task(setup)) + fixture_runner = _FixtureRunner(loop=runner.get_loop(), context=context) + result = fixture_runner.run(setup) reset_contextvars = _apply_contextvar_changes(context) - async def stop_runner_task(): - coro_queue.put_nowait(None) - await runner_task - def finalizer() -> None: """Yield again, to finalize.""" @@ -364,8 +377,8 @@ async def async_finalizer() -> None: msg += "Yield only once." raise ValueError(msg) - runner.get_loop().run_until_complete(call_in_runner_task(async_finalizer)) - runner.get_loop().run_until_complete(stop_runner_task()) + fixture_runner.run(async_finalizer) + fixture_runner.stop() if reset_contextvars is not None: reset_contextvars()