Skip to content

ACherryJam/taskiq-cancellation

Repository files navigation

taskiq-cancellation logo

PyPI - Version PyPI - Python Version

taskiq-cancellation aims to be a drop-in task cancellation solution for taskiq as the original package doesn't provide a cancellation API.

Contents:

Installation

This package can be install from PyPI with your package manager of choice.

pip install taskiq-cancellation
pipx install taskiq-cancellation
poetry add taskiq-cancellation
uv add taskiq-cancellation

taskiq-cancellation currently provides integrations with Redis and RabbitMQ that are installable with redis and aiopika extras respectfully.

pip install taskiq-cancellation[redis,aiopika]

Usage

To do task cancellation, you need to:

  1. Create a cancellation backend
  2. Wrap a function with cancellable decorator
  3. Cancel the task with cancel(task_id)
broker = PubSubBroker(url).with_result_backend(RedisAsyncResultBackend(url))
cancellation_backend = RedisCancellationBackend(url).with_broker(broker)

@broker.task
@cancellation_backend.cancellable
async def sleep(seconds: int):
    await asyncio.sleep(seconds)
    print("Slept!")  # Won't be printed on worker side because of the cancellation

async def main():
    await broker.startup()

    task = await sleep.kiq(5)
    await cancellation_backend.cancel(task.task_id)

    await broker.shutdown()

asyncio.run(main())

What is a cancellation backend?

Cancellation backend can be seen as combination of a broker and result backend for cancellation messages that works underneath taskiq's broker. Cancellation backend won't run tasks marked as cancelled and will listen for cancellation messages for already running tasks.

Cancellation backend example scheme

Modular cancellation backend

To easily create cancellation backends taskiq-cancellation provides ModularCancellationBackend. Modular cancellation backend consists of two parts: state holder and notifier.

  • State holder is used to check for task cancellation status before running the task.
  • Notifier is used to listen for cancellation messages while running the task

This allows to use any techonology for task cancellation. For example, if one uses SQL database and RabbitMQ message broker, they can make a custom state holder with SQL library of their choice and use provided RabbitMQ notifier.

from taskiq_cancellation import ModularCancellationBackend
from taskiq_cancellation.state_holders.redis import RedisCancellationStateHolder
from taskiq_cancellation.notifiers.aiopika import AioPikaCancellationNotifier

backend = ModularCancellationBackend(
    RedisCancellationStateHolder("redis://localhost:6379"),
    AioPikaCancellationNotifier("amqp://guest:guest@localhost:5672")
)

Available integrations

taskiq-cancellation provides:

  • state holder for Redis (RedisCancellationStateHolder)
  • notifiers for Redis pub/sub (PubSubCancellationNotifier) and RabbitMQ (AioPikaCancellationNotifier)

Also there are NullCancellationStateHolder and NullCancellationNotifier that do absolutely nothing, if there's no need to not check for task cancellation before starting the task or no need to listen for cancellation of already running tasks.

Level and edge cancellation

By default, taskiq-cancellation uses anyio and its level cancellation. Level cancellation raises a cancellation exception on every asynchronous wait in a function.

As external libraries might not support level cancellation, task-cancellation also provides edge cancellation via asyncio. Edge cancellation raises an exception only once. To enable it, add cancellation_type=CancellationType.EDGE parameter to cancellable decorator.

Warning

Currently edge cancellation is supported only for Python 3.11+ because it uses asyncio.TaskGroup

Example:

from sqlalchemy.ext.asyncio import AsyncSession
from taskiq_cancellation import CancellationType

@broker.task
@cancellation_backend.cancellable(cancellation_type=CancellationType.EDGE)
async def sleep(seconds: int):
    session = AsyncSession(engine)

    try:
        async with session.begin():
            await asyncio.sleep(seconds)
            session.add(SleptFor(seconds))
    except asyncio.CancelledError:
        # Won't raise cancelled exception
        await session.close()
        raise

Development

For linting, ruff is used

ruff check
ruff format

For testing, pytest is used

pytest tests/unit # Unit tests

# Integration tests
docker compose -f docker-compose-tests.yml up --wait
pytest tests/integration

Contributing

If you have any issues with this package or have an idea for improvement, please don't hesitate to open an issue! This is my first open-source project so I would like to ask to be a little patient with me though 🙏

About

Task cancellation for taskiq

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Contributors

Languages