Skip to content

Commit d631bbf

Browse files
committed
Add task enqueued and finished signals
1 parent c7d1e16 commit d631bbf

File tree

13 files changed

+147
-60
lines changed

13 files changed

+147
-60
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,15 @@ assert default_task_backend.supports_get_result
231231

232232
This is particularly useful in combination with Django's [system check framework](https://docs.djangoproject.com/en/stable/topics/checks/).
233233

234+
### Signals
235+
236+
A few [Signals](https://docs.djangoproject.com/en/stable/topics/signals/) are provided to more easily respond to certain task events.
237+
238+
Whilst signals are available, they may not be the most maintainable approach.
239+
240+
- `django_tasks.signals.task_enqueued`: Called when a task is enqueued. The sender is the backend class. Also called with the enqueued `task_result`.
241+
- `django_tasks.signals.task_finished`: Called when a task finishes (`COMPLETE` or `FAILED`). The sender is the backend class. Also called with the finished `task_result`.
242+
234243
## Contributing
235244

236245
See [CONTRIBUTING.md](./CONTRIBUTING.md) for information on how to contribute.

django_tasks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
DEFAULT_TASK_BACKEND_ALIAS,
1717
ResultStatus,
1818
Task,
19+
TaskResult,
1920
task,
2021
)
2122

@@ -28,6 +29,7 @@
2829
"task",
2930
"ResultStatus",
3031
"Task",
32+
"TaskResult",
3133
]
3234

3335

django_tasks/backends/database/backend.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from django_tasks.backends.base import BaseTaskBackend
1212
from django_tasks.exceptions import ResultDoesNotExist
13+
from django_tasks.signals import task_enqueued
1314
from django_tasks.task import Task
1415
from django_tasks.task import TaskResult as BaseTaskResult
1516
from django_tasks.utils import json_normalize
@@ -52,10 +53,14 @@ def enqueue(
5253

5354
db_result = self._task_to_db_task(task, args, kwargs)
5455

56+
def save_result() -> None:
57+
db_result.save()
58+
task_enqueued.send(type(self), task_result=db_result.task_result)
59+
5560
if self._get_enqueue_on_commit_for_task(task):
56-
transaction.on_commit(db_result.save)
61+
transaction.on_commit(save_result)
5762
else:
58-
db_result.save()
63+
save_result()
5964

6065
return db_result.task_result
6166

django_tasks/backends/database/management/commands/db_worker.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from types import FrameType
99
from typing import List, Optional
1010

11+
from django.core.exceptions import SuspiciousOperation
1112
from django.core.management.base import BaseCommand
1213
from django.db import connections
1314
from django.db.utils import OperationalError
@@ -17,8 +18,10 @@
1718
from django_tasks.backends.database.models import DBTaskResult
1819
from django_tasks.backends.database.utils import exclusive_transaction
1920
from django_tasks.exceptions import InvalidTaskBackendError
20-
from django_tasks.task import DEFAULT_QUEUE_NAME, ResultStatus
21+
from django_tasks.signals import task_finished
22+
from django_tasks.task import DEFAULT_QUEUE_NAME
2123

24+
package_logger = logging.getLogger("django_tasks")
2225
logger = logging.getLogger("django_tasks.backends.database.db_worker")
2326

2427

@@ -124,28 +127,28 @@ def run_task(self, db_task_result: DBTaskResult) -> None:
124127
"Task id=%s path=%s state=%s",
125128
db_task_result.id,
126129
db_task_result.task_path,
127-
ResultStatus.RUNNING,
130+
task_result.status,
128131
)
129132
return_value = task.call(*task_result.args, **task_result.kwargs)
130133

131134
# Setting the return and success value inside the error handling,
132135
# So errors setting it (eg JSON encode) can still be recorded
133136
db_task_result.set_complete(return_value)
134-
logger.info(
135-
"Task id=%s path=%s state=%s",
136-
db_task_result.id,
137-
db_task_result.task_path,
138-
ResultStatus.COMPLETE,
137+
task_finished.send(
138+
sender=type(task.get_backend()), task_result=db_task_result.task_result
139139
)
140140
except BaseException as e:
141-
# Use `.exception` to integrate with error monitoring tools (eg Sentry)
142-
logger.exception(
143-
"Task id=%s path=%s state=%s",
144-
db_task_result.id,
145-
db_task_result.task_path,
146-
ResultStatus.FAILED,
147-
)
148141
db_task_result.set_failed(e)
142+
try:
143+
sender = type(db_task_result.task.get_backend())
144+
task_result = db_task_result.task_result
145+
except (ModuleNotFoundError, SuspiciousOperation):
146+
logger.exception("Task id=%s failed unexpectedly", db_task_result.id)
147+
else:
148+
task_finished.send(
149+
sender=sender,
150+
task_result=task_result,
151+
)
149152

150153
# If the user tried to terminate, let them
151154
if isinstance(e, KeyboardInterrupt):
@@ -205,18 +208,18 @@ def add_arguments(self, parser: ArgumentParser) -> None:
205208

206209
def configure_logging(self, verbosity: int) -> None:
207210
if verbosity == 0:
208-
logger.setLevel(logging.CRITICAL)
211+
package_logger.setLevel(logging.CRITICAL)
209212
elif verbosity == 1:
210-
logger.setLevel(logging.WARNING)
213+
package_logger.setLevel(logging.WARNING)
211214
elif verbosity == 2:
212-
logger.setLevel(logging.INFO)
215+
package_logger.setLevel(logging.INFO)
213216
else:
214-
logger.setLevel(logging.DEBUG)
217+
package_logger.setLevel(logging.DEBUG)
215218

216219
# If no handler is configured, the logs won't show,
217220
# regardless of the set level.
218-
if not logger.hasHandlers():
219-
logger.addHandler(logging.StreamHandler(self.stdout))
221+
if not package_logger.hasHandlers():
222+
package_logger.addHandler(logging.StreamHandler(self.stdout))
220223

221224
def handle(
222225
self,

django_tasks/backends/dummy.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing_extensions import ParamSpec
99

1010
from django_tasks.exceptions import ResultDoesNotExist
11+
from django_tasks.signals import task_enqueued
1112
from django_tasks.task import ResultStatus, Task, TaskResult
1213
from django_tasks.utils import json_normalize
1314

@@ -27,6 +28,11 @@ def __init__(self, options: dict) -> None:
2728

2829
self.results = []
2930

31+
def _store_result(self, result: TaskResult) -> None:
32+
object.__setattr__(result, "enqueued_at", timezone.now())
33+
self.results.append(result)
34+
task_enqueued.send(type(self), task_result=result)
35+
3036
def enqueue(
3137
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
3238
) -> TaskResult[T]:
@@ -36,7 +42,7 @@ def enqueue(
3642
task=task,
3743
id=str(uuid4()),
3844
status=ResultStatus.NEW,
39-
enqueued_at=timezone.now(),
45+
enqueued_at=None,
4046
started_at=None,
4147
finished_at=None,
4248
args=json_normalize(args),
@@ -45,12 +51,12 @@ def enqueue(
4551
)
4652

4753
if self._get_enqueue_on_commit_for_task(task) is not False:
48-
# Copy the task to prevent mutation issues
49-
transaction.on_commit(partial(self.results.append, deepcopy(result)))
54+
transaction.on_commit(partial(self._store_result, result))
5055
else:
51-
self.results.append(deepcopy(result))
56+
self._store_result(result)
5257

53-
return result
58+
# Copy the task to prevent mutation issues
59+
return deepcopy(result)
5460

5561
# We don't set `supports_get_result` as the results are scoped to the current thread
5662
def get_result(self, result_id: str) -> TaskResult:

django_tasks/backends/immediate.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from django.utils import timezone
1010
from typing_extensions import ParamSpec
1111

12+
from django_tasks.signals import task_enqueued, task_finished
1213
from django_tasks.task import ResultStatus, Task, TaskResult
1314
from django_tasks.utils import exception_to_dict, json_normalize
1415

@@ -28,6 +29,9 @@ def _execute_task(self, task_result: TaskResult) -> None:
2829
"""
2930
Execute the task for the given `TaskResult`, mutating it with the outcome
3031
"""
32+
object.__setattr__(task_result, "enqueued_at", timezone.now())
33+
task_enqueued.send(type(self), task_result=task_result)
34+
3135
task = task_result.task
3236

3337
calling_task_func = (
@@ -44,28 +48,25 @@ def _execute_task(self, task_result: TaskResult) -> None:
4448
),
4549
)
4650
except BaseException as e:
51+
# If the user tried to terminate, let them
52+
if isinstance(e, KeyboardInterrupt):
53+
raise
54+
4755
object.__setattr__(task_result, "finished_at", timezone.now())
4856
try:
4957
object.__setattr__(task_result, "_exception_data", exception_to_dict(e))
5058
except Exception:
5159
logger.exception("Task id=%s unable to save exception", task_result.id)
5260

53-
# Use `.exception` to integrate with error monitoring tools (eg Sentry)
54-
logger.exception(
55-
"Task id=%s path=%s state=%s",
56-
task_result.id,
57-
task.module_path,
58-
ResultStatus.FAILED,
59-
)
6061
object.__setattr__(task_result, "status", ResultStatus.FAILED)
6162

62-
# If the user tried to terminate, let them
63-
if isinstance(e, KeyboardInterrupt):
64-
raise
63+
task_finished.send(type(self), task_result=task_result)
6564
else:
6665
object.__setattr__(task_result, "finished_at", timezone.now())
6766
object.__setattr__(task_result, "status", ResultStatus.COMPLETE)
6867

68+
task_finished.send(type(self), task_result=task_result)
69+
6970
def enqueue(
7071
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
7172
) -> TaskResult[T]:
@@ -75,7 +76,7 @@ def enqueue(
7576
task=task,
7677
id=str(uuid4()),
7778
status=ResultStatus.NEW,
78-
enqueued_at=timezone.now(),
79+
enqueued_at=None,
7980
started_at=None,
8081
finished_at=None,
8182
args=json_normalize(args),

django_tasks/signal_handlers.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
1+
import logging
2+
from typing import Type
3+
14
from asgiref.local import Local
25
from django.core.signals import setting_changed
36
from django.dispatch import receiver
47

8+
from django_tasks import BaseTaskBackend, ResultStatus, TaskResult
9+
10+
from .signals import task_enqueued, task_finished
11+
12+
logger = logging.getLogger("django_tasks")
13+
514

615
@receiver(setting_changed)
716
def clear_tasks_handlers(*, setting: str, **kwargs: dict) -> None:
@@ -13,3 +22,33 @@ def clear_tasks_handlers(*, setting: str, **kwargs: dict) -> None:
1322

1423
tasks._settings = tasks.settings = tasks.configure_settings(None) # type:ignore[attr-defined]
1524
tasks._connections = Local() # type:ignore[attr-defined]
25+
26+
27+
@receiver(task_enqueued)
28+
def log_task_enqueued(
29+
sender: Type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict
30+
) -> None:
31+
logger.debug(
32+
"Task id=%s path=%s enqueued backend=%s",
33+
task_result.id,
34+
task_result.task.module_path,
35+
task_result.backend,
36+
)
37+
38+
39+
@receiver(task_finished)
40+
def log_task_finished(
41+
sender: Type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict
42+
) -> None:
43+
if task_result.status == ResultStatus.FAILED:
44+
# Use `.exception` to integrate with error monitoring tools (eg Sentry)
45+
log_method = logger.exception
46+
else:
47+
log_method = logger.info
48+
49+
log_method(
50+
"Task id=%s path=%s state=%s",
51+
task_result.id,
52+
task_result.task.module_path,
53+
task_result.status,
54+
)

django_tasks/signals.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from django.dispatch import Signal
2+
3+
task_enqueued = Signal()
4+
task_finished = Signal()

django_tasks/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ class TaskResult(Generic[T]):
229229
status: ResultStatus
230230
"""The status of the running task"""
231231

232-
enqueued_at: datetime
232+
enqueued_at: Optional[datetime]
233233
"""The time this task was enqueued"""
234234

235235
started_at: Optional[datetime]

tests/tests/test_database_backend.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import logging
23
import uuid
34
from contextlib import redirect_stderr
45
from datetime import timedelta
@@ -19,9 +20,6 @@
1920

2021
from django_tasks import ResultStatus, Task, default_task_backend, tasks
2122
from django_tasks.backends.database import DatabaseBackend
22-
from django_tasks.backends.database.management.commands.db_worker import (
23-
logger as db_worker_logger,
24-
)
2523
from django_tasks.backends.database.management.commands.prune_db_task_results import (
2624
logger as prune_db_tasks_logger,
2725
)
@@ -330,6 +328,14 @@ def test_task_specific_enqueue_on_commit(self) -> None:
330328
)
331329
)
332330

331+
def test_enqueue_logs(self) -> None:
332+
with self.assertLogs("django_tasks", level="DEBUG") as captured_logs:
333+
result = test_tasks.noop_task.enqueue()
334+
335+
self.assertEqual(len(captured_logs.output), 1)
336+
self.assertIn("enqueued", captured_logs.output[0])
337+
self.assertIn(result.id, captured_logs.output[0])
338+
333339

334340
@override_settings(
335341
TASKS={
@@ -344,9 +350,11 @@ class DatabaseBackendWorkerTestCase(TransactionTestCase):
344350
run_worker = partial(call_command, "db_worker", verbosity=0, batch=True, interval=0)
345351

346352
def tearDown(self) -> None:
353+
logger = logging.getLogger("django_tasks")
354+
347355
# Reset the logger after every run, to ensure the correct `stdout` is used
348-
for handler in db_worker_logger.handlers:
349-
db_worker_logger.removeHandler(handler)
356+
for handler in logger.handlers:
357+
logger.removeHandler(handler)
350358

351359
def test_run_enqueued_task(self) -> None:
352360
for task in [
@@ -366,7 +374,7 @@ def test_run_enqueued_task(self) -> None:
366374
result.refresh()
367375
self.assertIsNotNone(result.started_at)
368376
self.assertIsNotNone(result.finished_at)
369-
self.assertGreaterEqual(result.started_at, result.enqueued_at) # type:ignore[arg-type]
377+
self.assertGreaterEqual(result.started_at, result.enqueued_at) # type:ignore[arg-type,misc]
370378
self.assertGreaterEqual(result.finished_at, result.started_at) # type:ignore[arg-type,misc]
371379
self.assertEqual(result.status, ResultStatus.COMPLETE)
372380

0 commit comments

Comments
 (0)