From ff8167aec8132de4e475759aff77fee1a0680bc8 Mon Sep 17 00:00:00 2001 From: Alexander Starikov Date: Thu, 6 Nov 2025 19:32:57 +0300 Subject: [PATCH 1/9] feat: use one listener queue per notifier --- src/taskiq_cancellation/__init__.py | 1 - src/taskiq_cancellation/abc/backend.py | 35 +++++++++- src/taskiq_cancellation/abc/notifier.py | 6 ++ .../integrations/aiopika/notifier.py | 27 ++++---- .../integrations/queue_notifier.py | 64 +++++++++++++++++++ .../integrations/redis/backend.py | 10 +-- .../integrations/redis/notifier.py | 27 ++++---- .../integrations/redis/state_holder.py | 6 +- src/taskiq_cancellation/modular.py | 8 ++- 9 files changed, 142 insertions(+), 42 deletions(-) create mode 100644 src/taskiq_cancellation/integrations/queue_notifier.py diff --git a/src/taskiq_cancellation/__init__.py b/src/taskiq_cancellation/__init__.py index 4ad9e80..896ff45 100644 --- a/src/taskiq_cancellation/__init__.py +++ b/src/taskiq_cancellation/__init__.py @@ -1,4 +1,3 @@ -from .abc import * from .modular import ModularCancellationBackend diff --git a/src/taskiq_cancellation/abc/backend.py b/src/taskiq_cancellation/abc/backend.py index 4f5ff15..9afc419 100644 --- a/src/taskiq_cancellation/abc/backend.py +++ b/src/taskiq_cancellation/abc/backend.py @@ -1,11 +1,10 @@ import abc import asyncio -import traceback from typing import Callable, Annotated, ParamSpec, TypeVar, Awaitable import anyio from anyio.abc import TaskStatus -from taskiq import Context, TaskiqDepends +from taskiq import Context, TaskiqDepends, AsyncBroker, TaskiqEvents, TaskiqState from ..utils import combines from ..exceptions import TaskCancellationException @@ -16,6 +15,11 @@ class CancellationBackend(abc.ABC): + def __init__(self) -> None: + super().__init__() + + self.broker: AsyncBroker | None = None + @abc.abstractmethod async def is_cancelled(self, task_id: str) -> bool: pass @@ -29,6 +33,27 @@ async def listen_for_cancellation( self, task_id: str, started_listening_task_status: TaskStatus ) -> None: pass + + async def startup(self) -> None: + pass + + async def shutdown(self) -> None: + pass + + def with_broker(self, broker: AsyncBroker) -> "CancellationBackend": + if self.broker is not None: + self.broker.event_handlers[TaskiqEvents.CLIENT_STARTUP].remove(self._broker_startup_handler) + self.broker.event_handlers[TaskiqEvents.WORKER_STARTUP].remove(self._broker_startup_handler) + self.broker.event_handlers[TaskiqEvents.CLIENT_SHUTDOWN].remove(self._broker_shutdown_handler) + self.broker.event_handlers[TaskiqEvents.WORKER_SHUTDOWN].remove(self._broker_shutdown_handler) + + self.broker = broker + self.broker.add_event_handler(TaskiqEvents.CLIENT_STARTUP, self._broker_startup_handler) + self.broker.add_event_handler(TaskiqEvents.WORKER_STARTUP, self._broker_startup_handler) + self.broker.add_event_handler(TaskiqEvents.CLIENT_SHUTDOWN, self._broker_shutdown_handler) + self.broker.add_event_handler(TaskiqEvents.WORKER_SHUTDOWN, self._broker_shutdown_handler) + + return self def cancellable(self, task: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]: # Executor type depends on receiver configuration which we @@ -96,3 +121,9 @@ async def call_task(): else: return result return wrapper + + async def _broker_startup_handler(self, state: TaskiqState) -> None: + await self.startup() + + async def _broker_shutdown_handler(self, state: TaskiqState) -> None: + await self.shutdown() \ No newline at end of file diff --git a/src/taskiq_cancellation/abc/notifier.py b/src/taskiq_cancellation/abc/notifier.py index 0faaa91..2512fb5 100644 --- a/src/taskiq_cancellation/abc/notifier.py +++ b/src/taskiq_cancellation/abc/notifier.py @@ -8,6 +8,12 @@ class CancellationNotifier(abc.ABC): def __init__(self, serializer: TaskiqSerializer = JSONSerializer()): self.serializer = serializer + + async def startup(self) -> None: + pass + + async def shutdown(self) -> None: + pass @abc.abstractmethod async def cancel(self, task_id: str) -> None: diff --git a/src/taskiq_cancellation/integrations/aiopika/notifier.py b/src/taskiq_cancellation/integrations/aiopika/notifier.py index b79ad78..d085bb9 100644 --- a/src/taskiq_cancellation/integrations/aiopika/notifier.py +++ b/src/taskiq_cancellation/integrations/aiopika/notifier.py @@ -1,21 +1,19 @@ import time +import asyncio import aio_pika -from anyio.abc import TaskStatus -from taskiq.abc.serializer import TaskiqSerializer -from taskiq.serializers import JSONSerializer from taskiq.compat import model_dump, model_validate -from taskiq_cancellation.abc import CancellationNotifier -from taskiq_cancellation.exceptions import TaskCancellationException from taskiq_cancellation.message import CancellationMessage +from ..queue_notifier import QueueCancellationNotifier -class AioPikaNotifier(CancellationNotifier): + +class AioPikaNotifier(QueueCancellationNotifier): EXCHANGE_NAME = "__taskiq_cancellation" - def __init__(self, url: str, serializer: TaskiqSerializer = JSONSerializer()): - super().__init__(serializer) + def __init__(self, url: str, **kwargs): + super().__init__(**kwargs) self.url: str = url @@ -37,9 +35,7 @@ async def cancel(self, task_id: str) -> None: routing_key="" ) - async def listen_for_cancellation( - self, task_id: str, started_listening_task_status: TaskStatus - ) -> None: + async def _listen(self, started_listening: asyncio.Event): connection = await aio_pika.connect_robust( self.url ) @@ -48,10 +44,11 @@ async def listen_for_cancellation( exchange = await channel.declare_exchange( self.EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT, durable=True ) - queue = await channel.declare_queue(exclusive=True) + queue = await channel.declare_queue(exclusive=True, auto_delete=True) await queue.bind(exchange) - started_listening_task_status.started() + loop = asyncio.get_running_loop() + loop.call_soon_threadsafe(started_listening.set) async with queue.iterator() as queue_iter: async for message in queue_iter: @@ -60,6 +57,6 @@ async def listen_for_cancellation( self.serializer.loadb(message.body) ) - if cancellation_message.task_id == task_id: - raise TaskCancellationException() + for queue in self.queues: + await queue.put(cancellation_message) await message.ack() diff --git a/src/taskiq_cancellation/integrations/queue_notifier.py b/src/taskiq_cancellation/integrations/queue_notifier.py new file mode 100644 index 0000000..b24ea17 --- /dev/null +++ b/src/taskiq_cancellation/integrations/queue_notifier.py @@ -0,0 +1,64 @@ +import abc +import weakref +import asyncio + +from anyio.abc import TaskStatus + +from taskiq_cancellation.abc import CancellationNotifier +from taskiq_cancellation.exceptions import TaskCancellationException +from taskiq_cancellation.message import CancellationMessage + + +class QueueCancellationNotifier(CancellationNotifier): + CHANNEL_NAME = "__taskiq_cancellation_notifications" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + + self.listener_task: asyncio.Task | None = None + self.queues: weakref.WeakSet[asyncio.Queue[CancellationMessage]] = weakref.WeakSet() + + async def shutdown(self) -> None: + if self.listener_task is not None: + self.listener_task.cancel() + + async def listen_for_cancellation( + self, task_id: str, started_listening_task_status: TaskStatus + ) -> None: + cancellations: asyncio.Queue[CancellationMessage] = asyncio.Queue() + + if self.listener_task is None: + started_listening = asyncio.Event() + self.listener_task = asyncio.create_task( + self._listen(started_listening) + ) + await started_listening.wait() + + await self._subscribe(cancellations) + started_listening_task_status.started() + + while True: + cancellation_message = await cancellations.get() + + if cancellation_message.task_id == task_id: + raise TaskCancellationException() + + @abc.abstractmethod + async def _listen(self, started_listening: asyncio.Event): + pass + + async def _create_listener_task(self): + if self.listener_task is not None: + self.listener_task.cancel() + + started_listening = asyncio.Event() + self.listener_task = asyncio.create_task( + self._listen(started_listening) + ) + await started_listening.wait() + + async def _subscribe(self, queue: asyncio.Queue[CancellationMessage]): + self.queues.add(queue) + + async def _unsubsribe(self, queue: asyncio.Queue[CancellationMessage]): + self.queues.remove(queue) \ No newline at end of file diff --git a/src/taskiq_cancellation/integrations/redis/backend.py b/src/taskiq_cancellation/integrations/redis/backend.py index ad038d1..90b6447 100644 --- a/src/taskiq_cancellation/integrations/redis/backend.py +++ b/src/taskiq_cancellation/integrations/redis/backend.py @@ -1,5 +1,3 @@ -from typing import Type - from taskiq_cancellation.modular import ModularCancellationBackend from .notifier import PubSubCancellationNotifier @@ -10,11 +8,9 @@ class RedisCancellationBackend(ModularCancellationBackend): def __init__( self, url: str, - state_holder: Type = RedisCancellationStateHolder, - notifier: Type = PubSubCancellationNotifier, - **connection_kwargs + **kwargs ) -> None: super().__init__( - state_holder(url, **connection_kwargs), - PubSubCancellationNotifier(url, **connection_kwargs) + RedisCancellationStateHolder(url, **kwargs), + PubSubCancellationNotifier(url, **kwargs) ) diff --git a/src/taskiq_cancellation/integrations/redis/notifier.py b/src/taskiq_cancellation/integrations/redis/notifier.py index 5ed24fb..bc70047 100644 --- a/src/taskiq_cancellation/integrations/redis/notifier.py +++ b/src/taskiq_cancellation/integrations/redis/notifier.py @@ -1,20 +1,21 @@ import time +import asyncio -from anyio.abc import TaskStatus import redis.asyncio as redis from taskiq.compat import model_dump, model_validate -from taskiq_cancellation.abc import CancellationNotifier -from taskiq_cancellation.exceptions import TaskCancellationException from taskiq_cancellation.message import CancellationMessage +from ..queue_notifier import QueueCancellationNotifier -class PubSubCancellationNotifier(CancellationNotifier): + +class PubSubCancellationNotifier(QueueCancellationNotifier): CHANNEL_NAME = "__taskiq_cancellation_notifications" - def __init__(self, url: str, **connection_kwargs) -> None: - super().__init__() - self.connection_pool = redis.BlockingConnectionPool.from_url(url, **connection_kwargs) + def __init__(self, url: str, **kwargs) -> None: + super().__init__(**kwargs) + + self.connection_pool = redis.BlockingConnectionPool.from_url(url, **kwargs) async def cancel(self, task_id: str) -> None: timestamp = time.time() @@ -27,14 +28,14 @@ async def cancel(self, task_id: str) -> None: ) ) - async def listen_for_cancellation( - self, task_id: str, started_listening_task_status: TaskStatus - ) -> None: + async def _listen(self, started_listening: asyncio.Event): async with redis.Redis(connection_pool=self.connection_pool) as conn: pubsub = conn.pubsub() await pubsub.subscribe(self.CHANNEL_NAME) - started_listening_task_status.started() + # started_listening.set() + loop = asyncio.get_running_loop() + loop.call_soon_threadsafe(started_listening.set) while True: message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=None) @@ -48,5 +49,5 @@ async def listen_for_cancellation( CancellationMessage, self.serializer.loadb(message['data']) ) - if cancellation_message.task_id == task_id: - raise TaskCancellationException() + for queue in self.queues: + await queue.put(cancellation_message) diff --git a/src/taskiq_cancellation/integrations/redis/state_holder.py b/src/taskiq_cancellation/integrations/redis/state_holder.py index 4ca397c..262f121 100644 --- a/src/taskiq_cancellation/integrations/redis/state_holder.py +++ b/src/taskiq_cancellation/integrations/redis/state_holder.py @@ -4,9 +4,9 @@ class RedisCancellationStateHolder(CancellationStateHolder): - def __init__(self, url: str, **connection_kwargs) -> None: - super().__init__() - self.connection_pool = redis.BlockingConnectionPool.from_url(url, **connection_kwargs) + def __init__(self, url: str, **kwargs) -> None: + super().__init__(**kwargs) + self.connection_pool = redis.BlockingConnectionPool.from_url(url, **kwargs) async def cancel(self, task_id: str) -> None: async with redis.Redis(connection_pool=self.connection_pool) as conn: diff --git a/src/taskiq_cancellation/modular.py b/src/taskiq_cancellation/modular.py index bead650..54af77a 100644 --- a/src/taskiq_cancellation/modular.py +++ b/src/taskiq_cancellation/modular.py @@ -1,4 +1,8 @@ -from .abc import * +from .abc import ( + CancellationBackend, + CancellationNotifier, + CancellationStateHolder +) import anyio from anyio.abc import TaskStatus @@ -10,6 +14,8 @@ def __init__( state_holder: CancellationStateHolder, notifier: CancellationNotifier ): + super().__init__() + self.notifier: CancellationNotifier = notifier self.state_holder: CancellationStateHolder = state_holder From bb09c6927a1ae3797ca981a9e5c9c5d435080b86 Mon Sep 17 00:00:00 2001 From: Alexander Starikov Date: Thu, 6 Nov 2025 22:24:06 +0300 Subject: [PATCH 2/9] docs,chore: basic docs and ruff format --- examples/counter/main.py | 5 +- src/taskiq_cancellation/__init__.py | 3 +- src/taskiq_cancellation/abc/__init__.py | 6 +- src/taskiq_cancellation/abc/backend.py | 136 ++++++++++++++---- src/taskiq_cancellation/abc/notifier.py | 25 +++- src/taskiq_cancellation/abc/state_holder.py | 16 +++ .../integrations/aiopika/__init__.py | 4 +- .../integrations/aiopika/notifier.py | 27 ++-- .../integrations/queue_notifier.py | 37 +++-- .../integrations/redis/__init__.py | 2 +- .../integrations/redis/backend.py | 8 +- .../integrations/redis/notifier.py | 15 +- src/taskiq_cancellation/modular.py | 23 +-- src/taskiq_cancellation/utils.py | 32 +++-- 14 files changed, 232 insertions(+), 107 deletions(-) diff --git a/examples/counter/main.py b/examples/counter/main.py index 5facc63..a148aae 100644 --- a/examples/counter/main.py +++ b/examples/counter/main.py @@ -6,7 +6,7 @@ url = "redis://localhost" broker = PubSubBroker(url).with_result_backend(RedisAsyncResultBackend(url)) -cancellation_backend = RedisCancellationBackend(url) +cancellation_backend = RedisCancellationBackend(url).with_broker(broker) @broker.task @@ -20,11 +20,14 @@ async def count(up_to: int): async def main(): await broker.startup() + print("Sending task and waiting 5 seconds...") task = await count.kiq(5) await asyncio.sleep(5) + print("Sending task and waiting 2.5 seconds...") task = await count.kiq(5) await asyncio.sleep(2.5) + print("Canceling task...") await cancellation_backend.cancel(task.task_id) await broker.shutdown() diff --git a/src/taskiq_cancellation/__init__.py b/src/taskiq_cancellation/__init__.py index 896ff45..f61572f 100644 --- a/src/taskiq_cancellation/__init__.py +++ b/src/taskiq_cancellation/__init__.py @@ -1,7 +1,8 @@ +from .abc import CancellationBackend from .modular import ModularCancellationBackend __all__ = [ + "CancellationBackend", "ModularCancellationBackend" ] - diff --git a/src/taskiq_cancellation/abc/__init__.py b/src/taskiq_cancellation/abc/__init__.py index cc8b52c..1decfa6 100644 --- a/src/taskiq_cancellation/abc/__init__.py +++ b/src/taskiq_cancellation/abc/__init__.py @@ -3,8 +3,4 @@ from .state_holder import CancellationStateHolder -__all__ = [ - "CancellationBackend", - "CancellationNotifier", - "CancellationStateHolder" -] +__all__ = ["CancellationBackend", "CancellationNotifier", "CancellationStateHolder"] diff --git a/src/taskiq_cancellation/abc/backend.py b/src/taskiq_cancellation/abc/backend.py index 9afc419..6323b07 100644 --- a/src/taskiq_cancellation/abc/backend.py +++ b/src/taskiq_cancellation/abc/backend.py @@ -1,6 +1,6 @@ import abc import asyncio -from typing import Callable, Annotated, ParamSpec, TypeVar, Awaitable +from typing import Callable, Annotated, ParamSpec, TypeVar, Awaitable, Self import anyio from anyio.abc import TaskStatus @@ -15,68 +15,145 @@ class CancellationBackend(abc.ABC): + """ + Base class for cancellation backend + """ def __init__(self) -> None: super().__init__() self.broker: AsyncBroker | None = None - @abc.abstractmethod + @abc.abstractmethod async def is_cancelled(self, task_id: str) -> bool: + """ + Checks if a task with task id of *task_id* is set to be cancelled + + :param task_id: task id to check + :type task_id: str + :returns: task cancellation state + :rtype: bool + """ pass @abc.abstractmethod async def cancel(self, task_id: str) -> None: + """ + Cancels a task with task id of *task_id* + + :param task_id: id of the task to cancel + :type task_id: str + """ pass @abc.abstractmethod async def listen_for_cancellation( self, task_id: str, started_listening_task_status: TaskStatus ) -> None: + """ + Listens for cancellation messages and raises :ref:`TaskCancellationException` when + receives :ref:`CancellationMessage` with same id as *task_id*. + + This function is used in :func:`cancellable` decorator. + Call `started_listening_task_status.started()` when the listener is ready + to receive messages. + + :param task_id: id of task that will be listened for + :type task_id: str + :param started_listening_task_status: + :type started_listening_task_status: anyio.abc.TaskStatus + """ pass async def startup(self) -> None: + """ + Starts up cancellation backend + + Triggered only if backend has a broker set. To set a broker use :func:`with_broker`. + """ pass async def shutdown(self) -> None: + """Shuts down cancellation backend + + Triggered only if backend has a broker set. To set a broker use :ref:`with_broker`. + """ pass - def with_broker(self, broker: AsyncBroker) -> "CancellationBackend": + def with_broker(self, broker: AsyncBroker) -> Self: + """ + Set a broker and return updated cancellation backend + + Sets up startup and shutdown event handlers for backend's startup + and shutdown methods respectfully + + :param broker: new broker + :type broker: AsyncBroker + :returns: self + """ if self.broker is not None: - self.broker.event_handlers[TaskiqEvents.CLIENT_STARTUP].remove(self._broker_startup_handler) - self.broker.event_handlers[TaskiqEvents.WORKER_STARTUP].remove(self._broker_startup_handler) - self.broker.event_handlers[TaskiqEvents.CLIENT_SHUTDOWN].remove(self._broker_shutdown_handler) - self.broker.event_handlers[TaskiqEvents.WORKER_SHUTDOWN].remove(self._broker_shutdown_handler) - + self.broker.event_handlers[TaskiqEvents.CLIENT_STARTUP].remove( + self._broker_startup_handler + ) + self.broker.event_handlers[TaskiqEvents.WORKER_STARTUP].remove( + self._broker_startup_handler + ) + self.broker.event_handlers[TaskiqEvents.CLIENT_SHUTDOWN].remove( + self._broker_shutdown_handler + ) + self.broker.event_handlers[TaskiqEvents.WORKER_SHUTDOWN].remove( + self._broker_shutdown_handler + ) + self.broker = broker - self.broker.add_event_handler(TaskiqEvents.CLIENT_STARTUP, self._broker_startup_handler) - self.broker.add_event_handler(TaskiqEvents.WORKER_STARTUP, self._broker_startup_handler) - self.broker.add_event_handler(TaskiqEvents.CLIENT_SHUTDOWN, self._broker_shutdown_handler) - self.broker.add_event_handler(TaskiqEvents.WORKER_SHUTDOWN, self._broker_shutdown_handler) + self.broker.add_event_handler( + TaskiqEvents.CLIENT_STARTUP, self._broker_startup_handler + ) + self.broker.add_event_handler( + TaskiqEvents.WORKER_STARTUP, self._broker_startup_handler + ) + self.broker.add_event_handler( + TaskiqEvents.CLIENT_SHUTDOWN, self._broker_shutdown_handler + ) + self.broker.add_event_handler( + TaskiqEvents.WORKER_SHUTDOWN, self._broker_shutdown_handler + ) return self - + def cancellable(self, task: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]: - # Executor type depends on receiver configuration which we - # can't access in any way + """ + Decorator that makes funcion cancellable + + This decorator makes a new function that creates two tasks in :ref:`anyio.TaskGroup`: + 1. Cancellation message listener (uses :ref:`listen_for_cancellation`) + 2. Wrapped function + + - Returns function's result/exception if it finishes successfully/unsuccessfully + - Raises :ref:`TaskCancellationException` if listener task receives cancellation message + - If listener task raises an exception, task is cancelled and exception is propogated upwards + + :param task: Task function to wrap + :returns: Cancellable task function + """ + # Executor type depends on receiver configuration which we can't accessed in any way if not asyncio.iscoroutinefunction(task): raise ValueError("Can't cancel synchronous function") - + @combines(task) async def wrapper( - *args, - __taskiq_context: Annotated[Context, TaskiqDepends()], - **kwargs + *args, __taskiq_context: Annotated[Context, TaskiqDepends()], **kwargs ): task_id = __taskiq_context.message.task_id result = None listener_exception: Exception | None = None task_exception: Exception | None = None - cancelled_by_request: bool = False + cancelled_by_request: bool = False async with anyio.create_task_group() as group: + async def listen_for_cancellation( - task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED + task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED, ): nonlocal listener_exception, cancelled_by_request @@ -90,7 +167,7 @@ async def listen_for_cancellation( listener_exception = e finally: group.cancel_scope.cancel() - + async def call_task(): nonlocal result, task_exception @@ -103,15 +180,15 @@ async def call_task(): finally: group.cancel_scope.cancel() - # Listen before checking for cancellation in database so - # the message won't get lost in non-persistent queues + # Listen before checking for cancellation in state holder + # so the message won't get lost in non-persistent queues await group.start(listen_for_cancellation) if await self.is_cancelled(task_id): cancelled_by_request = True group.cancel_scope.cancel() else: group.start_soon(call_task) - + if task_exception is not None: raise task_exception elif cancelled_by_request: @@ -120,10 +197,11 @@ async def call_task(): raise listener_exception else: return result + return wrapper - async def _broker_startup_handler(self, state: TaskiqState) -> None: + async def _broker_startup_handler(self, _: TaskiqState) -> None: await self.startup() - - async def _broker_shutdown_handler(self, state: TaskiqState) -> None: - await self.shutdown() \ No newline at end of file + + async def _broker_shutdown_handler(self, _: TaskiqState) -> None: + await self.shutdown() diff --git a/src/taskiq_cancellation/abc/notifier.py b/src/taskiq_cancellation/abc/notifier.py index 2512fb5..3ba408f 100644 --- a/src/taskiq_cancellation/abc/notifier.py +++ b/src/taskiq_cancellation/abc/notifier.py @@ -6,21 +6,44 @@ class CancellationNotifier(abc.ABC): + """Receives cancellation messages and notifies listeners of these messages""" + def __init__(self, serializer: TaskiqSerializer = JSONSerializer()): self.serializer = serializer async def startup(self) -> None: + """Starts up cancellation notifier""" pass async def shutdown(self) -> None: + """Shuts down cancellation notifier""" pass - + @abc.abstractmethod async def cancel(self, task_id: str) -> None: + """ + Sends a cancellation message of a task with task id of *task_id* + + :param task_id: id of the task to cancel + :type task_id: str + """ pass @abc.abstractmethod async def listen_for_cancellation( self, task_id: str, started_listening_task_status: TaskStatus ) -> None: + """ + Listens for cancellation messages and raises :ref:`TaskCancellationException` when + receives :ref:`CancellationMessage` with same id as *task_id*. + + This function is used in :func:`cancellable` decorator of :ref:`ModularCancellationBackend`. + Call `started_listening_task_status.started()` when the listener is ready + to receive messages. + + :param task_id: id of task that will be listened for + :type task_id: str + :param started_listening_task_status: + :type started_listening_task_status: anyio.abc.TaskStatus + """ pass diff --git a/src/taskiq_cancellation/abc/state_holder.py b/src/taskiq_cancellation/abc/state_holder.py index 45df346..3b2615c 100644 --- a/src/taskiq_cancellation/abc/state_holder.py +++ b/src/taskiq_cancellation/abc/state_holder.py @@ -2,10 +2,26 @@ class CancellationStateHolder(abc.ABC): + """Holds cancellation state of Taskiq tasks""" + @abc.abstractmethod async def cancel(self, task_id: str) -> None: + """ + Sets a state of task with task id of *task_id* to be cancelled + + :param task_id: id of the task to cancel + :type task_id: str + """ pass @abc.abstractmethod async def is_cancelled(self, task_id: str) -> bool: + """ + Checks if a task with task id of *task_id* is set to be cancelled + + :param task_id: task id to check + :type task_id: str + :returns: task cancellation state + :rtype: bool + """ pass diff --git a/src/taskiq_cancellation/integrations/aiopika/__init__.py b/src/taskiq_cancellation/integrations/aiopika/__init__.py index 9d0488c..0af486e 100644 --- a/src/taskiq_cancellation/integrations/aiopika/__init__.py +++ b/src/taskiq_cancellation/integrations/aiopika/__init__.py @@ -1,6 +1,4 @@ from .notifier import AioPikaNotifier -__all__ = [ - "AioPikaNotifier" -] +__all__ = ["AioPikaNotifier"] diff --git a/src/taskiq_cancellation/integrations/aiopika/notifier.py b/src/taskiq_cancellation/integrations/aiopika/notifier.py index d085bb9..7965134 100644 --- a/src/taskiq_cancellation/integrations/aiopika/notifier.py +++ b/src/taskiq_cancellation/integrations/aiopika/notifier.py @@ -19,26 +19,26 @@ def __init__(self, url: str, **kwargs): async def cancel(self, task_id: str) -> None: timestamp = time.time() - connection = await aio_pika.connect_robust( - self.url - ) + connection = await aio_pika.connect_robust(self.url) channel = await connection.channel() exchange = await channel.declare_exchange( self.EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT, durable=True ) - + await exchange.publish( - aio_pika.Message(body=self.serializer.dumpb( - model_dump(CancellationMessage(task_id=task_id, timestamp=timestamp)) - )), - routing_key="" + aio_pika.Message( + body=self.serializer.dumpb( + model_dump( + CancellationMessage(task_id=task_id, timestamp=timestamp) + ) + ) + ), + routing_key="", ) async def _listen(self, started_listening: asyncio.Event): - connection = await aio_pika.connect_robust( - self.url - ) + connection = await aio_pika.connect_robust(self.url) channel = await connection.channel() exchange = await channel.declare_exchange( @@ -53,9 +53,8 @@ async def _listen(self, started_listening: asyncio.Event): async with queue.iterator() as queue_iter: async for message in queue_iter: cancellation_message = model_validate( - CancellationMessage, - self.serializer.loadb(message.body) - ) + CancellationMessage, self.serializer.loadb(message.body) + ) for queue in self.queues: await queue.put(cancellation_message) diff --git a/src/taskiq_cancellation/integrations/queue_notifier.py b/src/taskiq_cancellation/integrations/queue_notifier.py index b24ea17..3190501 100644 --- a/src/taskiq_cancellation/integrations/queue_notifier.py +++ b/src/taskiq_cancellation/integrations/queue_notifier.py @@ -10,13 +10,20 @@ class QueueCancellationNotifier(CancellationNotifier): - CHANNEL_NAME = "__taskiq_cancellation_notifications" + """ + A helper cancellation notifier that uses one listener to receive cancellation messages and + notifies listeners from `listen_for_cancellation` via `asyncio.Queue` + Requires :func:`_listen` to be implemeted + """ def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self.listener_task: asyncio.Task | None = None - self.queues: weakref.WeakSet[asyncio.Queue[CancellationMessage]] = weakref.WeakSet() + self.queues: weakref.WeakSet[asyncio.Queue[CancellationMessage]] = ( + weakref.WeakSet() + ) + """Set of subscribers' `asyncio.Queue`s to populate when message's received""" async def shutdown(self) -> None: if self.listener_task is not None: @@ -28,12 +35,8 @@ async def listen_for_cancellation( cancellations: asyncio.Queue[CancellationMessage] = asyncio.Queue() if self.listener_task is None: - started_listening = asyncio.Event() - self.listener_task = asyncio.create_task( - self._listen(started_listening) - ) - await started_listening.wait() - + await self._create_listener_task() + await self._subscribe(cancellations) started_listening_task_status.started() @@ -44,21 +47,25 @@ async def listen_for_cancellation( raise TaskCancellationException() @abc.abstractmethod - async def _listen(self, started_listening: asyncio.Event): + async def _listen(self, started_listening: asyncio.Event) -> None: + """ + Listens for cancellation messages and put them into subscribers' `asyncio.Queue`s + + :param started_listening: event to be set when listener is ready to receive messages + :type started_listening: asyncio.Event + """ pass async def _create_listener_task(self): if self.listener_task is not None: self.listener_task.cancel() - + started_listening = asyncio.Event() - self.listener_task = asyncio.create_task( - self._listen(started_listening) - ) + self.listener_task = asyncio.create_task(self._listen(started_listening)) await started_listening.wait() async def _subscribe(self, queue: asyncio.Queue[CancellationMessage]): self.queues.add(queue) - + async def _unsubsribe(self, queue: asyncio.Queue[CancellationMessage]): - self.queues.remove(queue) \ No newline at end of file + self.queues.remove(queue) diff --git a/src/taskiq_cancellation/integrations/redis/__init__.py b/src/taskiq_cancellation/integrations/redis/__init__.py index fe38b68..4aedc97 100644 --- a/src/taskiq_cancellation/integrations/redis/__init__.py +++ b/src/taskiq_cancellation/integrations/redis/__init__.py @@ -6,5 +6,5 @@ __all__ = [ "PubSubCancellationNotifier", "RedisCancellationStateHolder", - "RedisCancellationBackend" + "RedisCancellationBackend", ] diff --git a/src/taskiq_cancellation/integrations/redis/backend.py b/src/taskiq_cancellation/integrations/redis/backend.py index 90b6447..e11b177 100644 --- a/src/taskiq_cancellation/integrations/redis/backend.py +++ b/src/taskiq_cancellation/integrations/redis/backend.py @@ -5,12 +5,8 @@ class RedisCancellationBackend(ModularCancellationBackend): - def __init__( - self, - url: str, - **kwargs - ) -> None: + def __init__(self, url: str, **kwargs) -> None: super().__init__( RedisCancellationStateHolder(url, **kwargs), - PubSubCancellationNotifier(url, **kwargs) + PubSubCancellationNotifier(url, **kwargs), ) diff --git a/src/taskiq_cancellation/integrations/redis/notifier.py b/src/taskiq_cancellation/integrations/redis/notifier.py index bc70047..b76e5b9 100644 --- a/src/taskiq_cancellation/integrations/redis/notifier.py +++ b/src/taskiq_cancellation/integrations/redis/notifier.py @@ -22,10 +22,12 @@ async def cancel(self, task_id: str) -> None: async with redis.Redis(connection_pool=self.connection_pool) as conn: await conn.publish( - self.CHANNEL_NAME, + self.CHANNEL_NAME, self.serializer.dumpb( - model_dump(CancellationMessage(task_id=task_id, timestamp=timestamp)) - ) + model_dump( + CancellationMessage(task_id=task_id, timestamp=timestamp) + ) + ), ) async def _listen(self, started_listening: asyncio.Event): @@ -38,7 +40,9 @@ async def _listen(self, started_listening: asyncio.Event): loop.call_soon_threadsafe(started_listening.set) while True: - message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=None) + message = await pubsub.get_message( + ignore_subscribe_messages=True, timeout=None + ) if message is None: continue @@ -46,8 +50,7 @@ async def _listen(self, started_listening: asyncio.Event): continue cancellation_message = model_validate( - CancellationMessage, - self.serializer.loadb(message['data']) + CancellationMessage, self.serializer.loadb(message["data"]) ) for queue in self.queues: await queue.put(cancellation_message) diff --git a/src/taskiq_cancellation/modular.py b/src/taskiq_cancellation/modular.py index 54af77a..087b9a0 100644 --- a/src/taskiq_cancellation/modular.py +++ b/src/taskiq_cancellation/modular.py @@ -1,21 +1,22 @@ -from .abc import ( - CancellationBackend, - CancellationNotifier, - CancellationStateHolder -) +from .abc import CancellationBackend, CancellationNotifier, CancellationStateHolder import anyio from anyio.abc import TaskStatus class ModularCancellationBackend(CancellationBackend): + """ + Modular cancellation backend made up of :class:`CancellationStateHolder` + and :class:`CancellationNotifier` + + - `CancellationStateHolder` stores cancellation state and blocks the task from being run. + - `CancellationNotifier` receives cancellation messages and cancels already running tasks. + """ def __init__( - self, - state_holder: CancellationStateHolder, - notifier: CancellationNotifier + self, state_holder: CancellationStateHolder, notifier: CancellationNotifier ): super().__init__() - + self.notifier: CancellationNotifier = notifier self.state_holder: CancellationStateHolder = state_holder @@ -30,4 +31,6 @@ async def cancel(self, task_id: str): async def listen_for_cancellation( self, task_id: str, started_listening_task_status: TaskStatus[None] ): - await self.notifier.listen_for_cancellation(task_id, started_listening_task_status) + await self.notifier.listen_for_cancellation( + task_id, started_listening_task_status + ) diff --git a/src/taskiq_cancellation/utils.py b/src/taskiq_cancellation/utils.py index b1cda03..4d659f0 100644 --- a/src/taskiq_cancellation/utils.py +++ b/src/taskiq_cancellation/utils.py @@ -12,7 +12,7 @@ def combines(wrapped): In cases of parameter collision wrapper parameters will be used Example: - ''' + ''' import inspect def decorator(func): @@ -20,41 +20,44 @@ def decorator(func): def wrapper(c: int, *args, **kwargs): return foo(*args, **kwargs) * c return wrapper - + @decorator def foo(a: int, b = "lol"): return b * a - + print(inspect.signature(foo)) # (a: int, c: int, b='lol', *args, **kwargs) ''' """ wrapped_signature: inspect.Signature = inspect.signature(wrapped) wrapped_type_hints: typing.Dict[str, str] = typing.get_type_hints(wrapped) - + def decorator(wrapper): wrapper_signature = inspect.signature(wrapper) wrapper_type_hints = typing.get_type_hints(wrapper) for param_name in wrapped_signature.parameters.keys(): if param_name in wrapper_signature.parameters.keys(): - logging.warning(f"Parameter {param_name} will be overwritten by wrapper function") - - parameters = OrderedDict(wrapped_signature.parameters, **wrapper_signature.parameters) + logging.warning( + f"Parameter {param_name} will be overwritten by wrapper function" + ) + + parameters = OrderedDict( + wrapped_signature.parameters, **wrapper_signature.parameters + ) parameters = sorted( parameters.values(), - key=lambda p: p.kind + (0.5 if p.default != inspect.Parameter.empty else 0) + key=lambda p: p.kind + (0.5 if p.default != inspect.Parameter.empty else 0), ) new_return_annotation: inspect.Signature if wrapper_signature.return_annotation is not None: - new_return_annotation = wrapper_signature.return_annotation + new_return_annotation = wrapper_signature.return_annotation else: - new_return_annotation = wrapped_signature.return_annotation + new_return_annotation = wrapped_signature.return_annotation new_signature = inspect.Signature( - parameters=parameters, - return_annotation=new_return_annotation + parameters=parameters, return_annotation=new_return_annotation ) new_annotations = dict(wrapped_type_hints, **wrapper_type_hints) @@ -63,9 +66,8 @@ def decorator(wrapper): wrapper.__signature__ = new_signature return wrapper + return decorator -__all__ = [ - "combines" -] +__all__ = ["combines"] From 414ef205d09890ed6450e61d0c88fa3c11ef45c7 Mon Sep 17 00:00:00 2001 From: Alexander Starikov Date: Fri, 7 Nov 2025 13:24:52 +0300 Subject: [PATCH 3/9] chore: restructure folders by interfaces --- src/taskiq_cancellation/__init__.py | 2 +- src/taskiq_cancellation/abc/backend.py | 4 +-- .../{integrations => backends}/__init__.py | 0 .../{ => backends}/modular.py | 2 +- .../redis/backend.py => backends/redis.py} | 6 ++-- .../integrations/aiopika/__init__.py | 4 --- .../integrations/redis/__init__.py | 10 ------ src/taskiq_cancellation/notifiers/__init__.py | 0 .../notifier.py => notifiers/aiopika.py} | 2 +- .../notifiers/in_memory.py | 35 +++++++++++++++++++ .../queue_notifier.py => notifiers/queue.py} | 0 .../redis/notifier.py => notifiers/redis.py} | 2 +- .../state_holders/__init__.py | 0 .../redis.py} | 0 14 files changed, 44 insertions(+), 23 deletions(-) rename src/taskiq_cancellation/{integrations => backends}/__init__.py (100%) rename src/taskiq_cancellation/{ => backends}/modular.py (92%) rename src/taskiq_cancellation/{integrations/redis/backend.py => backends/redis.py} (53%) delete mode 100644 src/taskiq_cancellation/integrations/aiopika/__init__.py delete mode 100644 src/taskiq_cancellation/integrations/redis/__init__.py create mode 100644 src/taskiq_cancellation/notifiers/__init__.py rename src/taskiq_cancellation/{integrations/aiopika/notifier.py => notifiers/aiopika.py} (97%) create mode 100644 src/taskiq_cancellation/notifiers/in_memory.py rename src/taskiq_cancellation/{integrations/queue_notifier.py => notifiers/queue.py} (100%) rename src/taskiq_cancellation/{integrations/redis/notifier.py => notifiers/redis.py} (97%) create mode 100644 src/taskiq_cancellation/state_holders/__init__.py rename src/taskiq_cancellation/{integrations/redis/state_holder.py => state_holders/redis.py} (100%) diff --git a/src/taskiq_cancellation/__init__.py b/src/taskiq_cancellation/__init__.py index f61572f..57864e4 100644 --- a/src/taskiq_cancellation/__init__.py +++ b/src/taskiq_cancellation/__init__.py @@ -1,5 +1,5 @@ from .abc import CancellationBackend -from .modular import ModularCancellationBackend +from .backends.modular import ModularCancellationBackend __all__ = [ diff --git a/src/taskiq_cancellation/abc/backend.py b/src/taskiq_cancellation/abc/backend.py index 6323b07..ab88f47 100644 --- a/src/taskiq_cancellation/abc/backend.py +++ b/src/taskiq_cancellation/abc/backend.py @@ -6,8 +6,8 @@ from anyio.abc import TaskStatus from taskiq import Context, TaskiqDepends, AsyncBroker, TaskiqEvents, TaskiqState -from ..utils import combines -from ..exceptions import TaskCancellationException +from taskiq_cancellation.utils import combines +from taskiq_cancellation.exceptions import TaskCancellationException P = ParamSpec("P") diff --git a/src/taskiq_cancellation/integrations/__init__.py b/src/taskiq_cancellation/backends/__init__.py similarity index 100% rename from src/taskiq_cancellation/integrations/__init__.py rename to src/taskiq_cancellation/backends/__init__.py diff --git a/src/taskiq_cancellation/modular.py b/src/taskiq_cancellation/backends/modular.py similarity index 92% rename from src/taskiq_cancellation/modular.py rename to src/taskiq_cancellation/backends/modular.py index 087b9a0..9343602 100644 --- a/src/taskiq_cancellation/modular.py +++ b/src/taskiq_cancellation/backends/modular.py @@ -1,4 +1,4 @@ -from .abc import CancellationBackend, CancellationNotifier, CancellationStateHolder +from taskiq_cancellation.abc import CancellationBackend, CancellationNotifier, CancellationStateHolder import anyio from anyio.abc import TaskStatus diff --git a/src/taskiq_cancellation/integrations/redis/backend.py b/src/taskiq_cancellation/backends/redis.py similarity index 53% rename from src/taskiq_cancellation/integrations/redis/backend.py rename to src/taskiq_cancellation/backends/redis.py index e11b177..3870e32 100644 --- a/src/taskiq_cancellation/integrations/redis/backend.py +++ b/src/taskiq_cancellation/backends/redis.py @@ -1,7 +1,7 @@ -from taskiq_cancellation.modular import ModularCancellationBackend +from taskiq_cancellation.backends.modular import ModularCancellationBackend -from .notifier import PubSubCancellationNotifier -from .state_holder import RedisCancellationStateHolder +from taskiq_cancellation.notifiers.redis import PubSubCancellationNotifier +from taskiq_cancellation.state_holders.redis import RedisCancellationStateHolder class RedisCancellationBackend(ModularCancellationBackend): diff --git a/src/taskiq_cancellation/integrations/aiopika/__init__.py b/src/taskiq_cancellation/integrations/aiopika/__init__.py deleted file mode 100644 index 0af486e..0000000 --- a/src/taskiq_cancellation/integrations/aiopika/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .notifier import AioPikaNotifier - - -__all__ = ["AioPikaNotifier"] diff --git a/src/taskiq_cancellation/integrations/redis/__init__.py b/src/taskiq_cancellation/integrations/redis/__init__.py deleted file mode 100644 index 4aedc97..0000000 --- a/src/taskiq_cancellation/integrations/redis/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from .notifier import PubSubCancellationNotifier -from .state_holder import RedisCancellationStateHolder -from .backend import RedisCancellationBackend - - -__all__ = [ - "PubSubCancellationNotifier", - "RedisCancellationStateHolder", - "RedisCancellationBackend", -] diff --git a/src/taskiq_cancellation/notifiers/__init__.py b/src/taskiq_cancellation/notifiers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/taskiq_cancellation/integrations/aiopika/notifier.py b/src/taskiq_cancellation/notifiers/aiopika.py similarity index 97% rename from src/taskiq_cancellation/integrations/aiopika/notifier.py rename to src/taskiq_cancellation/notifiers/aiopika.py index 7965134..8e55ab8 100644 --- a/src/taskiq_cancellation/integrations/aiopika/notifier.py +++ b/src/taskiq_cancellation/notifiers/aiopika.py @@ -6,7 +6,7 @@ from taskiq_cancellation.message import CancellationMessage -from ..queue_notifier import QueueCancellationNotifier +from .queue import QueueCancellationNotifier class AioPikaNotifier(QueueCancellationNotifier): diff --git a/src/taskiq_cancellation/notifiers/in_memory.py b/src/taskiq_cancellation/notifiers/in_memory.py new file mode 100644 index 0000000..748a129 --- /dev/null +++ b/src/taskiq_cancellation/notifiers/in_memory.py @@ -0,0 +1,35 @@ +import time +import asyncio + +from taskiq_cancellation.message import CancellationMessage + +from .queue import QueueCancellationNotifier + + +class InMemoryCancellationNotifier(QueueCancellationNotifier): + """In memory cancellation notifier used for testing""" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + + self.messages: asyncio.Queue[CancellationMessage] = asyncio.Queue() + + async def cancel(self, task_id: str) -> None: + timestamp = time.time() + + await self.messages.put( + CancellationMessage( + task_id=task_id, + timestamp=timestamp + ) + ) + + async def _listen(self, started_listening: asyncio.Event) -> None: + loop = asyncio.get_running_loop() + loop.call_soon_threadsafe(started_listening.set) + + while True: + message = await self.messages.get() + + for queue in self.queues: + await queue.put(message) diff --git a/src/taskiq_cancellation/integrations/queue_notifier.py b/src/taskiq_cancellation/notifiers/queue.py similarity index 100% rename from src/taskiq_cancellation/integrations/queue_notifier.py rename to src/taskiq_cancellation/notifiers/queue.py diff --git a/src/taskiq_cancellation/integrations/redis/notifier.py b/src/taskiq_cancellation/notifiers/redis.py similarity index 97% rename from src/taskiq_cancellation/integrations/redis/notifier.py rename to src/taskiq_cancellation/notifiers/redis.py index b76e5b9..2efa7c2 100644 --- a/src/taskiq_cancellation/integrations/redis/notifier.py +++ b/src/taskiq_cancellation/notifiers/redis.py @@ -6,7 +6,7 @@ from taskiq_cancellation.message import CancellationMessage -from ..queue_notifier import QueueCancellationNotifier +from .queue import QueueCancellationNotifier class PubSubCancellationNotifier(QueueCancellationNotifier): diff --git a/src/taskiq_cancellation/state_holders/__init__.py b/src/taskiq_cancellation/state_holders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/taskiq_cancellation/integrations/redis/state_holder.py b/src/taskiq_cancellation/state_holders/redis.py similarity index 100% rename from src/taskiq_cancellation/integrations/redis/state_holder.py rename to src/taskiq_cancellation/state_holders/redis.py From 7d2a95b4f632d178cae0f87980669ffcf21d9951 Mon Sep 17 00:00:00 2001 From: Alexander Starikov Date: Fri, 7 Nov 2025 13:25:33 +0300 Subject: [PATCH 4/9] feat: null and in memory state holders and notifiers --- src/taskiq_cancellation/backends/in_memory.py | 12 ++++++++++++ src/taskiq_cancellation/notifiers/null.py | 19 +++++++++++++++++++ .../state_holders/in_memory.py | 16 ++++++++++++++++ src/taskiq_cancellation/state_holders/null.py | 15 +++++++++++++++ 4 files changed, 62 insertions(+) create mode 100644 src/taskiq_cancellation/backends/in_memory.py create mode 100644 src/taskiq_cancellation/notifiers/null.py create mode 100644 src/taskiq_cancellation/state_holders/in_memory.py create mode 100644 src/taskiq_cancellation/state_holders/null.py diff --git a/src/taskiq_cancellation/backends/in_memory.py b/src/taskiq_cancellation/backends/in_memory.py new file mode 100644 index 0000000..ee5e385 --- /dev/null +++ b/src/taskiq_cancellation/backends/in_memory.py @@ -0,0 +1,12 @@ +from taskiq_cancellation.notifiers.in_memory import InMemoryCancellationNotifier +from taskiq_cancellation.state_holders.in_memory import InMemoryCancellationStateHolder + +from .modular import ModularCancellationBackend + + +class InMemoryCancellationBackend(ModularCancellationBackend): + def __init__(self, **kwargs): + super().__init__( + state_holder=InMemoryCancellationStateHolder(**kwargs), + notifier=InMemoryCancellationNotifier(**kwargs) + ) diff --git a/src/taskiq_cancellation/notifiers/null.py b/src/taskiq_cancellation/notifiers/null.py new file mode 100644 index 0000000..9740012 --- /dev/null +++ b/src/taskiq_cancellation/notifiers/null.py @@ -0,0 +1,19 @@ +import asyncio + +from anyio.abc import TaskStatus +from taskiq_cancellation.abc.notifier import CancellationNotifier + + +class NullCancellationNotifier(CancellationNotifier): + """ + \"Do nothing\" cancellation notifier + + May be useful if there's no need or ability to use an actual notifier + """ + + async def cancel(self, task_id: str) -> None: + pass + + async def listen_for_cancellation(self, task_id: str, started_listening_task_status: TaskStatus) -> None: + started_listening_task_status.started() + await asyncio.sleep(float("+inf")) diff --git a/src/taskiq_cancellation/state_holders/in_memory.py b/src/taskiq_cancellation/state_holders/in_memory.py new file mode 100644 index 0000000..23a7089 --- /dev/null +++ b/src/taskiq_cancellation/state_holders/in_memory.py @@ -0,0 +1,16 @@ +from taskiq_cancellation.abc import CancellationStateHolder + + +class InMemoryCancellationStateHolder(CancellationStateHolder): + """In memory cancellation state holder used for testing""" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + + self.state_holder: dict[str, bool] = {} + + async def cancel(self, task_id: str) -> None: + self.state_holder[task_id] = True + + async def is_cancelled(self, task_id: str) -> bool: + return self.state_holder.get(task_id, False) diff --git a/src/taskiq_cancellation/state_holders/null.py b/src/taskiq_cancellation/state_holders/null.py new file mode 100644 index 0000000..4383071 --- /dev/null +++ b/src/taskiq_cancellation/state_holders/null.py @@ -0,0 +1,15 @@ +from taskiq_cancellation.abc import CancellationStateHolder + + +class NullCancellationStateHolder(CancellationStateHolder): + """ + \"Do nothing\" cancellation state holder + + May be useful if there's no need or ability to use an actual state holder + """ + + async def cancel(self, task_id: str) -> None: + pass + + async def is_cancelled(self, task_id: str) -> bool: + return False From 7918d737431ee965c0f6f7cbf6fa73257cf9ac28 Mon Sep 17 00:00:00 2001 From: Alexander Starikov Date: Fri, 7 Nov 2025 15:31:49 +0300 Subject: [PATCH 5/9] feat: startup and shutdown in state holders --- src/taskiq_cancellation/abc/state_holder.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/taskiq_cancellation/abc/state_holder.py b/src/taskiq_cancellation/abc/state_holder.py index 3b2615c..8416440 100644 --- a/src/taskiq_cancellation/abc/state_holder.py +++ b/src/taskiq_cancellation/abc/state_holder.py @@ -25,3 +25,11 @@ async def is_cancelled(self, task_id: str) -> bool: :rtype: bool """ pass + + async def startup(self) -> None: + """Starts up cancellation state holder""" + pass + + async def shutdown(self) -> None: + """Shuts down cancellation state holder""" + pass From d4dbe976b6c0658062a4ea04fae7f8ac97c11d11 Mon Sep 17 00:00:00 2001 From: Alexander Starikov Date: Fri, 7 Nov 2025 15:33:26 +0300 Subject: [PATCH 6/9] fix: call notifier and state holder startup/shutdown in modular backend --- src/taskiq_cancellation/backends/modular.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/taskiq_cancellation/backends/modular.py b/src/taskiq_cancellation/backends/modular.py index 9343602..2d87dfe 100644 --- a/src/taskiq_cancellation/backends/modular.py +++ b/src/taskiq_cancellation/backends/modular.py @@ -34,3 +34,13 @@ async def listen_for_cancellation( await self.notifier.listen_for_cancellation( task_id, started_listening_task_status ) + + async def startup(self) -> None: + await super().startup() + await self.state_holder.startup() + await self.notifier.startup() + + async def shutdown(self) -> None: + await super().shutdown() + await self.state_holder.shutdown() + await self.notifier.shutdown() From 61ae28ab91dce7ffef194057c62bc6f037ff71fd Mon Sep 17 00:00:00 2001 From: Alexander Starikov Date: Fri, 7 Nov 2025 15:51:14 +0300 Subject: [PATCH 7/9] test: cancellable task tests aka baby's first tests --- tests/test_cancellation.py | 58 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 tests/test_cancellation.py diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py new file mode 100644 index 0000000..a790d1e --- /dev/null +++ b/tests/test_cancellation.py @@ -0,0 +1,58 @@ +import pytest +import asyncio + +from taskiq import AsyncBroker, InMemoryBroker + +from taskiq_cancellation.abc import CancellationBackend +from taskiq_cancellation.backends.in_memory import InMemoryCancellationBackend +from taskiq_cancellation.exceptions import TaskCancellationException + + +@pytest.fixture +def broker(): + return InMemoryBroker() + + +@pytest.fixture +def backend(broker): + return InMemoryCancellationBackend().with_broker(broker) + + +@pytest.mark.asyncio +async def test_task_success(broker: AsyncBroker, backend: CancellationBackend): + """Test that cancellable task can run successfully""" + + @broker.task + @backend.cancellable + async def task(): + await asyncio.sleep(0.1) + + await broker.startup() + + t = await task.kiq() + + result = await t.wait_result() + assert result.is_err is False + + await broker.shutdown() + + +@pytest.mark.asyncio +async def test_task_cancellation(broker: AsyncBroker, backend: CancellationBackend): + """Test that cancellable task can be cancelled""" + + @broker.task + @backend.cancellable + async def task(): + await asyncio.sleep(0.3) + + await broker.startup() + + t = await task.kiq() + await backend.cancel(t.task_id) + + with pytest.raises(TaskCancellationException): + result = await t.wait_result() + result.raise_for_error() + + await broker.shutdown() From 1bebc5a65162aa79fca0868a5ddfc89fec9d24ef Mon Sep 17 00:00:00 2001 From: Alexander Starikov Date: Fri, 7 Nov 2025 23:42:40 +0300 Subject: [PATCH 8/9] ci: linting and testing CI aka baby's first CI --- .github/workflows/lint.yaml | 33 ++++++++++++++++++++++++++++++ .github/workflows/run_tests.yaml | 35 ++++++++++++++++++++++++++++++++ pyproject.toml | 25 ++++++++++++++++++----- 3 files changed, 88 insertions(+), 5 deletions(-) create mode 100644 .github/workflows/lint.yaml create mode 100644 .github/workflows/run_tests.yaml diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 0000000..0e31ea4 --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,33 @@ +name: Lint + +on: + pull_request: + branches: [develop] + +jobs: + lint: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v5 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: 3.14 + + - name: Setup uv + uses: astral-sh/setup-uv@v7 + + - name: Create virtual environment + run: uv venv .venv && source .venv/bin/activate + + - name: Install modules + run: uv sync + + - name: Check code style + run: uv run ruff check + + - name: Check static typing + run: uv run mypy . diff --git a/.github/workflows/run_tests.yaml b/.github/workflows/run_tests.yaml new file mode 100644 index 0000000..b540751 --- /dev/null +++ b/.github/workflows/run_tests.yaml @@ -0,0 +1,35 @@ +name: Testing + +on: + pull_request: + branches: [develop] + +jobs: + run-tests: + runs-on: ubuntu-latest + + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + fail-fast: false + + steps: + - name: Checkout + uses: actions/checkout@v5 + + - name: Setup Python ${{ matrix.python-version }} + uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + + - name: Setup uv + uses: astral-sh/setup-uv@v7 + + - name: Create virtual environment + run: uv venv .venv && source .venv/bin/activate + + - name: Install modules + run: uv sync + + - name: Run tests for Python ${{ matrix.python-version }} + run: uv run pytest diff --git a/pyproject.toml b/pyproject.toml index 5c1c6cf..9ec9714 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,7 @@ -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - [project] name = "taskiq-cancellation" dynamic = ["version"] -description = 'Task cancellation mechanism for taskiq' +description = 'Task cancellation for taskiq' readme = "README.md" requires-python = ">=3.8" license = "MIT" @@ -19,6 +15,8 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] @@ -33,6 +31,10 @@ Documentation = "https://github.com/ACherryJam/taskiq-cancellation#readme" Issues = "https://github.com/ACherryJam/taskiq-cancellation/issues" Source = "https://github.com/ACherryJam/taskiq-cancellation" +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + [tool.hatch.version] path = "src/taskiq_cancellation/__about__.py" @@ -41,6 +43,11 @@ extra-dependencies = ["mypy>=1.0.0"] [tool.hatch.envs.types.scripts] check = "mypy --install-types --non-interactive {args:src/taskiq_cancellation tests}" +[tool.mypy] +ignore_missing_imports = true +exclude = ["examples"] + + [tool.coverage.run] source_pkgs = ["taskiq_cancellation", "tests"] branch = true @@ -56,3 +63,11 @@ tests = ["tests", "*/taskiq-cancellation/tests"] [tool.coverage.report] exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"] + +[dependency-groups] +dev = [ + "mypy>=1.14.1", + "pytest>=8.3.5", + "pytest-asyncio>=0.24.0", + "ruff>=0.14.4", +] From b8a46c08a5c6dc04a65a6de3d6334d1078d524ae Mon Sep 17 00:00:00 2001 From: Alexander Starikov Date: Sat, 8 Nov 2025 00:12:33 +0300 Subject: [PATCH 9/9] fix!: add typing_extensions to support Python 3.9+ --- .github/workflows/run_tests.yaml | 2 +- pyproject.toml | 13 ++++++------- src/taskiq_cancellation/abc/backend.py | 3 ++- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/run_tests.yaml b/.github/workflows/run_tests.yaml index b540751..3a45a80 100644 --- a/.github/workflows/run_tests.yaml +++ b/.github/workflows/run_tests.yaml @@ -10,7 +10,7 @@ jobs: strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] fail-fast: false steps: diff --git a/pyproject.toml b/pyproject.toml index 9ec9714..869d263 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,14 +3,13 @@ name = "taskiq-cancellation" dynamic = ["version"] description = 'Task cancellation for taskiq' readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.9" license = "MIT" keywords = ["taskiq", "cancellation"] authors = [{ name = "Alexander Starikov", email = "acherryjam@gmail.com" }] classifiers = [ "Development Status :: 4 - Beta", "Programming Language :: Python", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -20,7 +19,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] -dependencies = ["taskiq"] +dependencies = ["taskiq", "typing-extensions>=4.13.2"] [project.optional-dependencies] redis = ["redis~=3.0"] @@ -66,8 +65,8 @@ exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"] [dependency-groups] dev = [ - "mypy>=1.14.1", - "pytest>=8.3.5", - "pytest-asyncio>=0.24.0", - "ruff>=0.14.4", + "mypy>=1.14.1", + "pytest>=8.3.5", + "pytest-asyncio>=0.24.0", + "ruff>=0.14.4", ] diff --git a/src/taskiq_cancellation/abc/backend.py b/src/taskiq_cancellation/abc/backend.py index ab88f47..c3befb9 100644 --- a/src/taskiq_cancellation/abc/backend.py +++ b/src/taskiq_cancellation/abc/backend.py @@ -1,6 +1,7 @@ import abc import asyncio -from typing import Callable, Annotated, ParamSpec, TypeVar, Awaitable, Self +from typing import Callable, Annotated, TypeVar, Awaitable +from typing_extensions import ParamSpec, Self import anyio from anyio.abc import TaskStatus