Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
ff8167a
feat: use one listener queue per notifier
ACherryJam Nov 6, 2025
bb09c69
docs,chore: basic docs and ruff format
ACherryJam Nov 6, 2025
414ef20
chore: restructure folders by interfaces
ACherryJam Nov 7, 2025
7d2a95b
feat: null and in memory state holders and notifiers
ACherryJam Nov 7, 2025
7918d73
feat: startup and shutdown in state holders
ACherryJam Nov 7, 2025
d4dbe97
fix: call notifier and state holder startup/shutdown in modular backend
ACherryJam Nov 7, 2025
61ae28a
test: cancellable task tests aka baby's first tests
ACherryJam Nov 7, 2025
1bebc5a
ci: linting and testing CI aka baby's first CI
ACherryJam Nov 7, 2025
b8a46c0
fix!: add typing_extensions to support Python 3.9+
ACherryJam Nov 7, 2025
9a4ed4f
Merge pull request #1 from ACherryJam/notifier_rewrite
ACherryJam Nov 7, 2025
2a65c42
feat: support level task cancellation (asyncio)
ACherryJam Nov 8, 2025
93ab009
test: level and edge cancellation behaviour tests
ACherryJam Nov 8, 2025
c9eb438
feat: allow cancellable decorator to omit parentesis
ACherryJam Nov 8, 2025
69746a9
fix: allow direct function calls
ACherryJam Nov 8, 2025
9e46f4a
test: test cancellable w/o parentesis
ACherryJam Nov 8, 2025
3162c18
fix,test: remove deadlock from waiting for task to start
ACherryJam Nov 8, 2025
acf3724
fix: await for task when direct calling
ACherryJam Nov 8, 2025
6d990f4
ci: run tests on windows and macos (just like taskiq)
ACherryJam Nov 8, 2025
9b6df4f
fix: add async_timeout for Python <3.11
ACherryJam Nov 8, 2025
253cf73
chore: ruff check fixes and ruff format
ACherryJam Nov 8, 2025
fda260c
fix: adapt counter example to new module structure
ACherryJam Nov 8, 2025
4581f46
fix: a lot of things
ACherryJam Nov 9, 2025
c7d06ec
feat: actually working type hinting
ACherryJam Nov 10, 2025
bc84268
test: add sync function cancellable test and docstrings
ACherryJam Nov 10, 2025
5352d11
fix: close connections in redis and aiopika integrations (how did I m…
ACherryJam Nov 10, 2025
9b1d9f7
docs: more docstrings
ACherryJam Nov 10, 2025
d6adb15
test: intergration tests for redis and aiopika
ACherryJam Nov 11, 2025
3ba287f
feat: better serializer api and redis/aiopika notifier inits
ACherryJam Nov 11, 2025
b2cc9a7
Merge pull request #2 from ACherryJam/feature/level_cancellation
ACherryJam Nov 11, 2025
185d6c0
feat: more imports in inits
ACherryJam Nov 11, 2025
f3a6d4e
docs: project's readme
ACherryJam Nov 11, 2025
e66cc49
docs: add readme for counter example
ACherryJam Nov 11, 2025
02447f0
docs: add note about retry middlewares and task cancellation
ACherryJam Nov 11, 2025
4305e39
ci: release on pypi on github release
ACherryJam Nov 11, 2025
3b62529
chore: decrease version, actually
ACherryJam Nov 11, 2025
42a907a
fix: include only src in sdist
ACherryJam Nov 11, 2025
d0a1d3f
fix: tell mypy we're typed
ACherryJam Nov 11, 2025
19d1534
Merge pull request #3 from ACherryJam/final_stretch
ACherryJam Nov 11, 2025
64e2b77
ci: lint on pull request to main
ACherryJam Nov 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: Lint

on:
pull_request:
branches: [develop, main]

jobs:
lint:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v5

- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: 3.14

- name: Setup uv
uses: astral-sh/setup-uv@v7

- name: Create virtual environment
run: uv venv .venv && source .venv/bin/activate

- name: Install modules
run: uv sync

- name: Check code style
run: uv run ruff check

- name: Check static typing
run: uv run mypy .
28 changes: 28 additions & 0 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Release on PyPI

on:
release:
types:
- released

jobs:
publish:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v5

- name: Setup Python 3.14
uses: actions/setup-python@v6
with:
python-version: 3.14

- name: Setup uv
uses: astral-sh/setup-uv@v6

- name: Build package
run: uv build

- name: Publish package
run: uv publish
65 changes: 65 additions & 0 deletions .github/workflows/run_tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
name: Testing

on:
pull_request:
branches: [develop, main]

jobs:
run-unit-tests:
strategy:
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
os: [ubuntu-latest, windows-latest, macos-latest]
fail-fast: false

runs-on: ${{ matrix.os }}

steps:
- name: Checkout
uses: actions/checkout@v5

- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}

- name: Setup uv
uses: astral-sh/setup-uv@v7

# - name: Create virtual environment
# run: uv venv .venv && source .venv/bin/activate

- name: Install modules
run: uv sync

- name: Run unit tests for Python ${{ matrix.python-version }}
run: uv run pytest tests/unit

run-integration-tests:
runs-on: ubuntu-latest

strategy:
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
fail-fast: false

steps:
- name: Checkout
uses: actions/checkout@v5

- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}

- name: Setup uv
uses: astral-sh/setup-uv@v7

- name: Install modules
run: uv sync --extra redis --extra aiopika

- name: Setup Docker containers
run: docker compose -f ./docker-compose-tests.yml up --wait

- name: Run integration tests for Python ${{ matrix.python-version }}
run: uv run pytest tests/integration
168 changes: 164 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,172 @@
[![PyPI - Version](https://img.shields.io/pypi/v/taskiq-cancellation.svg)](https://pypi.org/project/taskiq-cancellation)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/taskiq-cancellation.svg)](https://pypi.org/project/taskiq-cancellation)
<div align="center">
<img src="imgs/header.png" width="70%" alt="taskiq-cancellation logo">
</div>

# Task cancellation for taskiq
[![PyPI - Version](https://img.shields.io/pypi/v/taskiq-cancellation.svg?style=for-the-badge)](https://pypi.org/project/taskiq-cancellation)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/taskiq-cancellation.svg?style=for-the-badge)](https://pypi.org/project/taskiq-cancellation)

**[taskiq-cancellation](https://pypi.org/project/taskiq-cancellation)** aims to be a drop-in task cancellation solution for taskiq as the original package doesn't provide a cancellation API.

## Contents:

- [Installation](#installation)
- [Usage](#usage)
- [What is a cancellation backend?](#what-is-a-cancellation-backend)
- [Modular cancellation backend](#modular-cancellation-backend)
- [Available integrations](#available-integrations)
- [Level and edge cancellation](#level-and-edge-cancellation)
- [Retry middlewares with task cancellation](#retry-middlewares-with-task-cancellation)
- [Development](#development)
- [Contributing](#contributing)

## Installation

```console
This package can be install from PyPI with your package manager of choice.

```bash
pip install taskiq-cancellation
pipx install taskiq-cancellation
poetry add taskiq-cancellation
uv add taskiq-cancellation
```

taskiq-cancellation currently provides integrations with Redis and RabbitMQ that are installable with `redis` and `aiopika` extras respectfully.

```bash
pip install taskiq-cancellation[redis,aiopika]
```

## Usage

To do task cancellation, you need to:

1. Create a cancellation backend
2. Wrap a function with `cancellable` decorator
3. Cancel the task with `cancel(task_id)`

```python
broker = PubSubBroker(url).with_result_backend(RedisAsyncResultBackend(url))
cancellation_backend = RedisCancellationBackend(url).with_broker(broker)

@broker.task
@cancellation_backend.cancellable
async def sleep(seconds: int):
await asyncio.sleep(seconds)
print("Slept!") # Won't be printed on worker side because of the cancellation

async def main():
await broker.startup()

task = await sleep.kiq(5)
await cancellation_backend.cancel(task.task_id)

await broker.shutdown()

asyncio.run(main())
```

### What is a cancellation backend?

**Cancellation backend** can be seen as combination of a broker and result backend for cancellation messages that works underneath taskiq's broker. Cancellation backend won't run tasks marked as cancelled and will listen for cancellation messages for already running tasks.

<div align="center">
<img src="imgs/backend_scheme.png" width="70%" alt="Cancellation backend example scheme">
</div>

### Modular cancellation backend

To easily create cancellation backends taskiq-cancellation provides `ModularCancellationBackend`. Modular cancellation backend consists of two parts: state holder and notifier.

- State holder is used to check for task cancellation status before running the task.
- Notifier is used to listen for cancellation messages while running the task

This allows to use any techonology for task cancellation. For example, if one uses SQL database and RabbitMQ message broker, they can make a custom state holder with SQL library of their choice and use provided RabbitMQ notifier.

```python
from taskiq_cancellation import ModularCancellationBackend
from taskiq_cancellation.state_holders.redis import RedisCancellationStateHolder
from taskiq_cancellation.notifiers.aiopika import AioPikaCancellationNotifier

backend = ModularCancellationBackend(
RedisCancellationStateHolder("redis://localhost:6379"),
AioPikaCancellationNotifier("amqp://guest:guest@localhost:5672")
)
```

### Available integrations

taskiq-cancellation provides:

- state holder for Redis (`RedisCancellationStateHolder`)
- notifiers for Redis pub/sub (`PubSubCancellationNotifier`) and RabbitMQ (`AioPikaCancellationNotifier`)

Also there are `NullCancellationStateHolder` and `NullCancellationNotifier` that do absolutely nothing, if there's no need to not check for task cancellation before starting the task or no need to listen for cancellation of already running tasks.

### Level and edge cancellation

By default, taskiq-cancellation uses [`anyio`](https://anyio.readthedocs.io/en/stable/) and its [level cancellation](anyio.readthedocs.io/en/stable/cancellation.html#differences-between-asyncio-and-anyio-cancellation-semantics). Level cancellation raises a cancellation exception on **every** asynchronous wait in a function.

As external libraries might not support level cancellation, task-cancellation also provides [edge cancellation]() via `asyncio`. Edge cancellation raises an exception only _once_. To enable it, add `cancellation_type=CancellationType.EDGE` parameter to `cancellable` decorator.

> [!WARNING]
> Currently edge cancellation is supported only for Python 3.11+ because it uses [`asyncio.TaskGroup`](https://docs.python.org/3/library/asyncio-task.html#asyncio.TaskGroup)

Example:

```python
from sqlalchemy.ext.asyncio import AsyncSession
from taskiq_cancellation import CancellationType

@broker.task
@cancellation_backend.cancellable(cancellation_type=CancellationType.EDGE)
async def sleep(seconds: int):
session = AsyncSession(engine)

try:
async with session.begin():
await asyncio.sleep(seconds)
session.add(SleptFor(seconds))
except asyncio.CancelledError:
# Won't raise cancelled exception
await session.close()
raise
```

### Retry middlewares with task cancellation

If you use `SimpleRetryMiddleware` or `SmartRetryMiddleware`, make sure to add `TaskCancellationException` to `types_of_exceptions` parameter to not trigger additional retries.

```python
from taskiq_cancellation.exceptions import TaskCancellationException

broker = PubSubBroker(url)
.with_result_backend(RedisAsyncResultBackend(url))
.with_middlewares(
SimpleRetryMiddleware(
types_of_exceptions=[TaskCancellationException, ]
)
)
```

## Development

For linting, ruff is used

```bash
ruff check
ruff format
```

For testing, pytest is used

```bash
pytest tests/unit # Unit tests

# Integration tests
docker compose -f docker-compose-tests.yml up --wait
pytest tests/integration
```

## Contributing

If you have any issues with this package or have an idea for improvement, please don't hesitate to open an issue! This is my first open-source project so I would like to ask to be a little patient with me though 🙏
20 changes: 20 additions & 0 deletions docker-compose-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
services:
redis:
image: redis:latest
ports:
- "6379:6379"

rabbitmq:
image: rabbitmq:latest
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASSWORD=guest
hostname: localhost
ports:
- "5672:5672"
healthcheck:
test: "rabbitmq-diagnostics check_running -q"
interval: 5s
timeout: 5s
retries: 10
start_period: 5s
63 changes: 63 additions & 0 deletions examples/counter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Example: Counter

This example demonstrates use of Redis cancellation backend with Redis broker.

`sleep(seconds)` task sleeps for `seconds` seconds. First task will finish fully after 5 seconds and second task will cancel after 2.5 seconds.

## How to run

1. Install dependencies

```bash
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```

2. Launch redis server locally

3. Launch worker

```bash
taskiq worker main:broker --workers=1
```

4. Launch client

```bash
python main.py
```

## Expected result

Client side:

```console
Sending task and waiting 5 seconds...
Sending task and waiting 2.5 seconds...
Canceling task...
```

Worker side:

```console
[2025-11-11 22:45:12,072][taskiq.receiver.receiver][INFO ][worker-0] Executing task main:count with ID: e2acdc76da94435d963b561b80098c47
1 Mississippi
2 Mississippi
3 Mississippi
4 Mississippi
5 Mississippi
[2025-11-11 22:45:17,073][taskiq.receiver.receiver][INFO ][worker-0] Executing task main:count with ID: e094778df6de450d811c4701cfff608f
1 Mississippi
2 Mississippi
3 Mississippi
[2025-11-11 22:45:19,589][taskiq.receiver.receiver][ERROR ][worker-0] Exception found while executing function:
Traceback (most recent call last):
File "P:\Scratches\taskiq-cancellation\.venv\lib\site-packages\taskiq\receiver\receiver.py", line 254, in run_task
returned = await target_future
File "P:\Scratches\taskiq-cancellation\src\taskiq_cancellation\abc\backend.py", line 190, in wrapper
return await level_handler(*args, **kwargs)
File "P:\Scratches\taskiq-cancellation\src\taskiq_cancellation\cancellation_handlers\level.py", line 104, in __call__
raise TaskCancellationException()
taskiq_cancellation.exceptions.TaskCancellationException
```
Loading