|
13 | 13 | from dataclasses import dataclass, field |
14 | 14 | from typing import TYPE_CHECKING, Any, Generic, Literal, TypeGuard, cast, get_args, get_origin, overload |
15 | 15 |
|
16 | | -from anyio import CancelScope, create_memory_object_stream, create_task_group |
| 16 | +from anyio import BrokenResourceError, CancelScope, create_memory_object_stream, create_task_group |
17 | 17 | from anyio.abc import TaskGroup |
18 | 18 | from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream |
19 | 19 | from typing_extensions import TypeVar, assert_never |
@@ -748,12 +748,15 @@ async def _run_tracked_task(self, t_: GraphTask): |
748 | 748 | with CancelScope() as scope: |
749 | 749 | self.cancel_scopes[t_.task_id] = scope |
750 | 750 | result = await self._run_task(t_) |
751 | | - if isinstance(result, _GraphTaskAsyncIterable): |
752 | | - async for new_tasks in result.iterable: |
753 | | - await self.iter_stream_sender.send(_GraphTaskResult(t_, new_tasks, False)) |
754 | | - await self.iter_stream_sender.send(_GraphTaskResult(t_, [])) |
755 | | - else: |
756 | | - await self.iter_stream_sender.send(_GraphTaskResult(t_, result)) |
| 751 | + try: |
| 752 | + if isinstance(result, _GraphTaskAsyncIterable): |
| 753 | + async for new_tasks in result.iterable: |
| 754 | + await self.iter_stream_sender.send(_GraphTaskResult(t_, new_tasks, False)) |
| 755 | + await self.iter_stream_sender.send(_GraphTaskResult(t_, [])) |
| 756 | + else: |
| 757 | + await self.iter_stream_sender.send(_GraphTaskResult(t_, result)) |
| 758 | + except BrokenResourceError: |
| 759 | + pass # pragma: no cover # This can happen in difficult-to-reproduce circumstances when cancelling an asyncio task |
757 | 760 |
|
758 | 761 | async def _run_task( |
759 | 762 | self, |
|
0 commit comments