Skip to content

Commit 5fd2509

Browse files
committed
Add initial support for a celery backend
1 parent ae77a27 commit 5fd2509

File tree

4 files changed

+68
-0
lines changed

4 files changed

+68
-0
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .backend import CeleryBackend
2+
3+
__all__ = ["CeleryBackend"]
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import os
2+
3+
from celery import Celery
4+
5+
from django_tasks.task import DEFAULT_QUEUE_NAME
6+
7+
8+
# Set the default Django settings module for the 'celery' program.
9+
django_settings = os.environ.get('DJANGO_SETTINGS_MODULE')
10+
if django_settings is None:
11+
raise ValueError('DJANGO_SETTINGS_MODULE environment variable is not set')
12+
13+
app = Celery('django_tasks')
14+
15+
# Using a string here means the worker doesn't have to serialize
16+
# the configuration object to child processes.
17+
# - namespace='CELERY' means all celery-related configuration keys
18+
# should have a `CELERY_` prefix.
19+
app.config_from_object('django.conf:settings', namespace='CELERY')
20+
21+
app.conf.task_default_queue = DEFAULT_QUEUE_NAME
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from typing import TypeVar
2+
3+
from typing_extensions import ParamSpec
4+
5+
from celery import shared_task
6+
from celery.local import Proxy as CeleryTaskProxy
7+
from django_tasks.backends.base import BaseTaskBackend
8+
from django_tasks.task import Task, TaskResult
9+
10+
11+
T = TypeVar("T")
12+
P = ParamSpec("P")
13+
14+
15+
class CeleryTask(Task):
16+
17+
celery_task: CeleryTaskProxy
18+
"""Celery proxy to the task in the current celery app task registry."""
19+
20+
def __post_init__(self) -> None:
21+
# TODO: allow passing extra celery specific parameters?
22+
celery_task = shared_task()(self.func)
23+
self.celery_task = celery_task
24+
return super().__post_init__()
25+
26+
27+
class CeleryBackend(BaseTaskBackend):
28+
task_class = CeleryTask
29+
supports_defer = True
30+
31+
def enqueue(
32+
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
33+
) -> TaskResult[T]:
34+
self.validate_task(task)
35+
36+
apply_async_kwargs = {
37+
"eta": task.run_after,
38+
}
39+
if task.queue_name:
40+
apply_async_kwargs["queue"] = task.queue_name
41+
if task.priority:
42+
apply_async_kwargs["priority"] = task.priority
43+
task.celery_task.apply_async(args, kwargs=kwargs, **apply_async_kwargs)

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ dev = [
5454
"coverage",
5555
"django-stubs[compatible-mypy]",
5656
"dj-database-url",
57+
"celery",
5758
]
5859
mysql = [
5960
"mysqlclient"

0 commit comments

Comments
 (0)