https://gist.github.com/magameda1337/2f9ddf2863162c853081d015a1367f1a
import asyncpg
from typing import Optional
from taskiq_cancellation.abc import CancellationStateHolder
class PostgresCancellationStateHolder(CancellationStateHolder):
def __init__(self, url: str, **kwargs) -> None:
super().__init__(**kwargs)
self.url = url
self._pool: Optional[asyncpg.Pool] = None
async def _get_pool(self) -> asyncpg.Pool:
if self._pool is None:
self._pool = await asyncpg.create_pool(dsn=self.url)
return self._pool
async def cancel(self, task_id: str) -> None:
pool = await self._get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO tasks_cancellation (task_id, cancelled)
VALUES ($1, $2)
""",
task_id, True
)
async def is_cancelled(self, task_id: str) -> bool:
pool = await self._get_pool()
async with pool.acquire() as conn:
val = await conn.fetchval(
"SELECT cancelled FROM tasks_cancellation WHERE task_id = $1",
task_id
)
return bool(val)
async def shutdown(self) -> None:
if self._pool:
await self._pool.close()
self._pool = None
await super().shutdown()
I think many people will find this useful!
https://gist.github.com/magameda1337/2f9ddf2863162c853081d015a1367f1a
I think many people will find this useful!