From 36a64131754a5e66e66730e99cb8814cc554b6b3 Mon Sep 17 00:00:00 2001 From: Loren Arthur Date: Thu, 25 Jul 2019 13:36:05 -0700 Subject: [PATCH 1/6] Add test to assert coroutines run in the context of a task. --- tornado/test/gen_test.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tornado/test/gen_test.py b/tornado/test/gen_test.py index 659e22605e..fb5f1b0d8c 100644 --- a/tornado/test/gen_test.py +++ b/tornado/test/gen_test.py @@ -636,6 +636,22 @@ def f(): self.assertEqual(ret, [1, 1]) self.finished = True + def test_coroutine_context(self): + @gen.coroutine + def f(): + current_task = getattr(asyncio, 'current_task', None) + if current_task is None: + current_task = getattr(asyncio.Task, 'current_task', None) + task = current_task() + assert task + _id = id(task) + yield gen.moment + task = current_task() + assert task + assert _id == id(task) + self.io_loop.run_sync(f, timeout=3) + self.finished = True + class GenCoroutineSequenceHandler(RequestHandler): @gen.coroutine From 5a762229f6c577cfefe59218f3db59acb1f0470e Mon Sep 17 00:00:00 2001 From: Loren Arthur Date: Thu, 25 Jul 2019 11:38:20 -0700 Subject: [PATCH 2/6] Update Runner to run coroutine in the context of a task. --- tornado/gen.py | 163 ++++++++++++++++++------------------------------- 1 file changed, 59 insertions(+), 104 deletions(-) diff --git a/tornado/gen.py b/tornado/gen.py index 7cc7ec7857..e9b630a173 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -200,31 +200,17 @@ def wrapper(*args, **kwargs): future = None # type: ignore else: if isinstance(result, Generator): - # Inline the first iteration of Runner.run. This lets us - # avoid the cost of creating a Runner when the coroutine - # never actually yields, which in turn allows us to - # use "optional" coroutines in critical path code without - # performance penalty for the synchronous case. - try: - yielded = next(result) - except (StopIteration, Return) as e: - future_set_result_unless_cancelled( - future, _value_from_stopiteration(e) - ) - except Exception: - future_set_exc_info(future, sys.exc_info()) - else: - # Provide strong references to Runner objects as long - # as their result future objects also have strong - # references (typically from the parent coroutine's - # Runner). This keeps the coroutine's Runner alive. - # We do this by exploiting the public API - # add_done_callback() instead of putting a private - # attribute on the Future. - # (Github issues #1769, #2229). - runner = Runner(result, future, yielded) - future.add_done_callback(lambda _: runner) - yielded = None + # Provide strong references to Runner objects as long + # as their result future objects also have strong + # references (typically from the parent coroutine's + # Runner). This keeps the coroutine's Runner alive. + # We do this by exploiting the public API + # add_done_callback() instead of putting a private + # attribute on the Future. + # (Github issues #1769, #2229). + runner = Runner(result, future) + future.add_done_callback(lambda _: runner) + try: return future finally: @@ -701,7 +687,6 @@ def __init__( self, gen: "Generator[_Yieldable, Any, _T]", result_future: "Future[_T]", - first_yielded: _Yieldable, ) -> None: self.gen = gen self.result_future = result_future @@ -709,63 +694,59 @@ def __init__( self.running = False self.finished = False self.io_loop = IOLoop.current() - if self.handle_yield(first_yielded): - gen = result_future = first_yielded = None # type: ignore - self.run() + asyncio.ensure_future(self.run()) + + async def run(self) -> None: + "Runs the generator to completion in the context of a task" + while True: + future = self.future + if future is None: + raise Exception("No pending future") + if not self.future.done(): + _step = asyncio.Event() + def step(*args): + _step.set() + self.io_loop.add_future(self.future, step) + await _step.wait() + self.future = None + try: + exc_info = None - def run(self) -> None: - """Starts or resumes the generator, running until it reaches a - yield point that is not ready. - """ - if self.running or self.finished: - return - try: - self.running = True - while True: - future = self.future - if future is None: - raise Exception("No pending future") - if not future.done(): - return - self.future = None try: - exc_info = None + value = future.result() + except Exception: + exc_info = sys.exc_info() + future = None + if exc_info is not None: try: - value = future.result() - except Exception: - exc_info = sys.exc_info() - future = None - - if exc_info is not None: - try: - yielded = self.gen.throw(*exc_info) # type: ignore - finally: - # Break up a reference to itself - # for faster GC on CPython. - exc_info = None - else: - yielded = self.gen.send(value) + yielded = self.gen.throw(*exc_info) # type: ignore + finally: + # Break up a reference to itself + # for faster GC on CPython. + exc_info = None + else: + yielded = self.gen.send(value) - except (StopIteration, Return) as e: - self.finished = True - self.future = _null_future - future_set_result_unless_cancelled( - self.result_future, _value_from_stopiteration(e) - ) - self.result_future = None # type: ignore - return - except Exception: - self.finished = True - self.future = _null_future - future_set_exc_info(self.result_future, sys.exc_info()) - self.result_future = None # type: ignore - return - if not self.handle_yield(yielded): - return - yielded = None - finally: - self.running = False + except (StopIteration, Return) as e: + self.finished = True + self.future = _null_future + future_set_result_unless_cancelled( + self.result_future, _value_from_stopiteration(e) + ) + self.result_future = None # type: ignore + return + except Exception: + self.finished = True + self.future = _null_future + future_set_exc_info(self.result_future, sys.exc_info()) + self.result_future = None # type: ignore + return + + self.handle_yield(yielded) + yielded = None + if self.future is moment: + await sleep(0) def handle_yield(self, yielded: _Yieldable) -> bool: try: @@ -774,32 +755,6 @@ def handle_yield(self, yielded: _Yieldable) -> bool: self.future = Future() future_set_exc_info(self.future, sys.exc_info()) - if self.future is moment: - self.io_loop.add_callback(self.run) - return False - elif self.future is None: - raise Exception("no pending future") - elif not self.future.done(): - - def inner(f: Any) -> None: - # Break a reference cycle to speed GC. - f = None # noqa: F841 - self.run() - - self.io_loop.add_future(self.future, inner) - return False - return True - - def handle_exception( - self, typ: Type[Exception], value: Exception, tb: types.TracebackType - ) -> bool: - if not self.running and not self.finished: - self.future = Future() - future_set_exc_info(self.future, (typ, value, tb)) - self.run() - return True - else: - return False # Convert Awaitables into Futures. From 88da3a2c6acb2a978df727d9bd02a995b68a3403 Mon Sep 17 00:00:00 2001 From: Loren Arthur Date: Wed, 31 Jul 2019 09:14:02 -0700 Subject: [PATCH 3/6] Add ExpectLog to infinite_coro test. --- tornado/test/gen_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tornado/test/gen_test.py b/tornado/test/gen_test.py index fb5f1b0d8c..028c9327fc 100644 --- a/tornado/test/gen_test.py +++ b/tornado/test/gen_test.py @@ -1011,8 +1011,9 @@ def do_something(): yield gen.sleep(0.2) loop.run_sync(do_something) - loop.close() - gc.collect() + with ExpectLog("asyncio", "Task was destroyed but it is pending"): + loop.close() + gc.collect() # Future was collected self.assertIs(wfut[0](), None) # At least one wakeup From d652bb0354d0b573d157a01084b2a174bece1fc5 Mon Sep 17 00:00:00 2001 From: Loren Arthur Date: Thu, 1 Aug 2019 20:34:47 -0700 Subject: [PATCH 4/6] Add callbacks to correctly cancel task. --- tornado/gen.py | 10 ++++++++-- tornado/ioloop.py | 3 ++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tornado/gen.py b/tornado/gen.py index e9b630a173..4235dc9ba8 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -209,7 +209,7 @@ def wrapper(*args, **kwargs): # attribute on the Future. # (Github issues #1769, #2229). runner = Runner(result, future) - future.add_done_callback(lambda _: runner) + future.add_done_callback(runner.finish) try: return future @@ -694,7 +694,7 @@ def __init__( self.running = False self.finished = False self.io_loop = IOLoop.current() - asyncio.ensure_future(self.run()) + self.task = asyncio.ensure_future(self.run()) async def run(self) -> None: "Runs the generator to completion in the context of a task" @@ -755,6 +755,12 @@ def handle_yield(self, yielded: _Yieldable) -> bool: self.future = Future() future_set_exc_info(self.future, sys.exc_info()) + def finish(self, future): + if future.cancelled(): + self.task.cancel() + self.future.cancel() + self.task = None + # Convert Awaitables into Futures. diff --git a/tornado/ioloop.py b/tornado/ioloop.py index a0598727a4..8799519404 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -506,7 +506,8 @@ def run() -> None: future_cell[0] = fut fut.set_result(result) assert future_cell[0] is not None - self.add_future(future_cell[0], lambda future: self.stop()) + self.add_future(future_cell[0], + lambda f: f.add_done_callback(lambda _: self.stop())) self.add_callback(run) if timeout is not None: From 18f1bdaea566826d3cdb2a23dda7ce390f783984 Mon Sep 17 00:00:00 2001 From: Loren Arthur Date: Thu, 1 Aug 2019 20:59:29 -0700 Subject: [PATCH 5/6] Autoformatting --- tornado/gen.py | 8 +++----- tornado/ioloop.py | 5 +++-- tornado/test/gen_test.py | 5 +++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tornado/gen.py b/tornado/gen.py index 4235dc9ba8..8ca7ba045c 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -76,7 +76,6 @@ def get(self): from functools import singledispatch from inspect import isawaitable import sys -import types from tornado.concurrent import ( Future, @@ -684,9 +683,7 @@ class Runner(object): """ def __init__( - self, - gen: "Generator[_Yieldable, Any, _T]", - result_future: "Future[_T]", + self, gen: "Generator[_Yieldable, Any, _T]", result_future: "Future[_T]" ) -> None: self.gen = gen self.result_future = result_future @@ -704,8 +701,10 @@ async def run(self) -> None: raise Exception("No pending future") if not self.future.done(): _step = asyncio.Event() + def step(*args): _step.set() + self.io_loop.add_future(self.future, step) await _step.wait() self.future = None @@ -762,7 +761,6 @@ def finish(self, future): self.task = None - # Convert Awaitables into Futures. try: _wrap_awaitable = asyncio.ensure_future diff --git a/tornado/ioloop.py b/tornado/ioloop.py index 8799519404..216bbff23d 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -506,8 +506,9 @@ def run() -> None: future_cell[0] = fut fut.set_result(result) assert future_cell[0] is not None - self.add_future(future_cell[0], - lambda f: f.add_done_callback(lambda _: self.stop())) + self.add_future( + future_cell[0], lambda f: f.add_done_callback(lambda _: self.stop()) + ) self.add_callback(run) if timeout is not None: diff --git a/tornado/test/gen_test.py b/tornado/test/gen_test.py index 028c9327fc..37bad9fb53 100644 --- a/tornado/test/gen_test.py +++ b/tornado/test/gen_test.py @@ -639,9 +639,9 @@ def f(): def test_coroutine_context(self): @gen.coroutine def f(): - current_task = getattr(asyncio, 'current_task', None) + current_task = getattr(asyncio, "current_task", None) if current_task is None: - current_task = getattr(asyncio.Task, 'current_task', None) + current_task = getattr(asyncio.Task, "current_task", None) task = current_task() assert task _id = id(task) @@ -649,6 +649,7 @@ def f(): task = current_task() assert task assert _id == id(task) + self.io_loop.run_sync(f, timeout=3) self.finished = True From 8345ecd4d105a615079de5a918f0e47b1c73dd76 Mon Sep 17 00:00:00 2001 From: Loren Arthur Date: Thu, 1 Aug 2019 23:09:11 -0700 Subject: [PATCH 6/6] Update type annotations. --- tornado/gen.py | 13 ++++++------- tornado/ioloop.py | 8 +++++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/tornado/gen.py b/tornado/gen.py index 8ca7ba045c..aa32a2fcd4 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -699,13 +699,13 @@ async def run(self) -> None: future = self.future if future is None: raise Exception("No pending future") - if not self.future.done(): + if not self.future.done(): # type: ignore _step = asyncio.Event() - def step(*args): + def step(f: "Future[_T]") -> None: _step.set() - self.io_loop.add_future(self.future, step) + self.io_loop.add_future(self.future, step) # type: ignore await _step.wait() self.future = None try: @@ -747,18 +747,17 @@ def step(*args): if self.future is moment: await sleep(0) - def handle_yield(self, yielded: _Yieldable) -> bool: + def handle_yield(self, yielded: _Yieldable) -> None: try: self.future = convert_yielded(yielded) except BadYieldError: self.future = Future() future_set_exc_info(self.future, sys.exc_info()) - def finish(self, future): + def finish(self, future: "Future[_T]") -> None: if future.cancelled(): self.task.cancel() - self.future.cancel() - self.task = None + self.future.cancel() # type: ignore # Convert Awaitables into Futures. diff --git a/tornado/ioloop.py b/tornado/ioloop.py index 216bbff23d..6cb2367ab2 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -506,9 +506,11 @@ def run() -> None: future_cell[0] = fut fut.set_result(result) assert future_cell[0] is not None - self.add_future( - future_cell[0], lambda f: f.add_done_callback(lambda _: self.stop()) - ) + + def _stop(f: "Future[_T]") -> None: + f.add_done_callback(lambda _: self.stop()) + + self.add_future(future_cell[0], _stop) self.add_callback(run) if timeout is not None: