Skip to content

Commit f9c6621

Browse files
Test and improve concurrency (#69)
Fixes #33 Also fixes and a potential deadlocking issue.
1 parent 41385f7 commit f9c6621

File tree

9 files changed

+336
-34
lines changed

9 files changed

+336
-34
lines changed

README.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,6 @@ TASKS = {
114114

115115
Finally, you can run `manage.py db_worker` to run tasks as they're created. Check the `--help` for more options.
116116

117-
> [!CAUTION]
118-
> The database backend does not work with SQLite when you are running multiple worker processes - tasks may be executed more than once. See [#33](https://github.com/RealOrangeOne/django-tasks/issues/33).
119-
120117
### Retrieving task result
121118

122119
When enqueueing a task, you get a `TaskResult`, however it may be useful to retrieve said result from somewhere else (another request, another task etc). This can be done with `get_result` (or `aget_result`):

django_tasks/backends/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from abc import ABCMeta, abstractmethod
22
from inspect import iscoroutinefunction
3-
from typing import Any, List, TypeVar
3+
from typing import Any, Iterable, TypeVar
44

55
from asgiref.sync import sync_to_async
66
from django.core.checks.messages import CheckMessage
@@ -101,7 +101,7 @@ async def aget_result(self, result_id: str) -> TaskResult:
101101
result_id=result_id
102102
)
103103

104-
def check(self, **kwargs: Any) -> List[CheckMessage]:
104+
def check(self, **kwargs: Any) -> Iterable[CheckMessage]:
105105
raise NotImplementedError(
106106
"subclasses may provide a check() method to verify that task "
107107
"backend is configured correctly."

django_tasks/backends/database/backend.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from dataclasses import dataclass
2-
from typing import TYPE_CHECKING, Any, List, TypeVar
2+
from typing import TYPE_CHECKING, Any, Iterable, TypeVar
33

44
from django.apps import apps
55
from django.core.checks import ERROR, CheckMessage
66
from django.core.exceptions import ValidationError
7+
from django.db import connections, router
78
from typing_extensions import ParamSpec
89

910
from django_tasks.backends.base import BaseTaskBackend
@@ -81,16 +82,26 @@ async def aget_result(self, result_id: str) -> TaskResult:
8182
except (DBTaskResult.DoesNotExist, ValidationError) as e:
8283
raise ResultDoesNotExist(result_id) from e
8384

84-
def check(self, **kwargs: Any) -> List[CheckMessage]:
85-
if not apps.is_installed("django_tasks.backends.database"):
86-
backend_name = self.__class__.__name__
85+
def check(self, **kwargs: Any) -> Iterable[CheckMessage]:
86+
from .models import DBTaskResult
8787

88-
return [
89-
CheckMessage(
90-
ERROR,
91-
f"{backend_name} configured as django_tasks backend, but database app not installed",
92-
"Insert 'django_tasks.backends.database' in INSTALLED_APPS",
93-
)
94-
]
88+
backend_name = self.__class__.__name__
9589

96-
return []
90+
if not apps.is_installed("django_tasks.backends.database"):
91+
yield CheckMessage(
92+
ERROR,
93+
f"{backend_name} configured as django_tasks backend, but database app not installed",
94+
"Insert 'django_tasks.backends.database' in INSTALLED_APPS",
95+
)
96+
97+
db_connection = connections[router.db_for_read(DBTaskResult)]
98+
if (
99+
db_connection.vendor == "sqlite"
100+
and hasattr(db_connection, "transaction_mode")
101+
and db_connection.transaction_mode != "EXCLUSIVE"
102+
):
103+
yield CheckMessage(
104+
ERROR,
105+
f"{backend_name} is using SQLite non-exclusive transactions",
106+
f"Set settings.DATABASES[{db_connection.alias!r}]['OPTIONS']['transaction_mode'] to 'EXCLUSIVE'",
107+
)

django_tasks/backends/database/management/commands/db_worker.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
from typing import List, Optional
88

99
from django.core.management.base import BaseCommand
10-
from django.db import transaction
10+
from django.db.utils import OperationalError
1111

1212
from django_tasks import DEFAULT_TASK_BACKEND_ALIAS, tasks
1313
from django_tasks.backends.database.backend import DatabaseBackend
1414
from django_tasks.backends.database.models import DBTaskResult
15+
from django_tasks.backends.database.utils import exclusive_transaction
1516
from django_tasks.exceptions import InvalidTaskBackendError
1617
from django_tasks.task import DEFAULT_QUEUE_NAME, ResultStatus
1718

@@ -68,8 +69,16 @@ def start(self) -> None:
6869

6970
# During this transaction, all "ready" tasks are locked. Therefore, it's important
7071
# it be as efficient as possible.
71-
with transaction.atomic():
72-
task_result = tasks.get_locked()
72+
with exclusive_transaction(tasks.db):
73+
try:
74+
task_result = tasks.get_locked()
75+
except OperationalError as e:
76+
# Ignore locked databases and keep trying.
77+
# It should unlock eventually.
78+
if "database is locked" in e.args[0]:
79+
task_result = None
80+
else:
81+
raise
7382

7483
if task_result is not None:
7584
# "claim" the task, so it isn't run by another worker process

django_tasks/backends/database/models.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
)
2121
from django_tasks.utils import exception_to_dict, retry
2222

23+
from .utils import normalize_uuid
24+
2325
logger = logging.getLogger("django_tasks.backends.database")
2426

2527
T = TypeVar("T")
@@ -62,7 +64,7 @@ def get_locked(self) -> Optional["DBTaskResult"]:
6264
"""
6365
Get a job, locking the row and accounting for deadlocks.
6466
"""
65-
return self.select_for_update().first()
67+
return self.select_for_update(skip_locked=True).first()
6668

6769

6870
class DBTaskResult(GenericBase[P, T], models.Model):
@@ -127,7 +129,7 @@ def task_result(self) -> "TaskResult[T]":
127129
result = TaskResult[T](
128130
db_result=self,
129131
task=self.task,
130-
id=str(self.id),
132+
id=normalize_uuid(self.id),
131133
status=ResultStatus[self.status],
132134
enqueued_at=self.enqueued_at,
133135
started_at=self.started_at,
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from contextlib import contextmanager
2+
from typing import Any, Generator, Optional, Union
3+
from uuid import UUID
4+
5+
from django.db import transaction
6+
7+
8+
@contextmanager
9+
def exclusive_transaction(using: Optional[str] = None) -> Generator[Any, Any, Any]:
10+
"""
11+
Wrapper around `transaction.atomic` which ensures transactions on SQLite are exclusive.
12+
13+
This functionality is built-in to Django 5.1+.
14+
"""
15+
connection = transaction.get_connection(using)
16+
17+
if (
18+
connection.vendor == "sqlite"
19+
and getattr(connection, "transaction_mode", None) != "EXCLUSIVE"
20+
):
21+
with connection.cursor() as c:
22+
c.execute("BEGIN EXCLUSIVE")
23+
try:
24+
yield
25+
finally:
26+
c.execute("COMMIT")
27+
else:
28+
with transaction.atomic(using=using):
29+
yield
30+
31+
32+
def normalize_uuid(val: Union[str, UUID]) -> str:
33+
"""
34+
Normalize a UUID into its dashed representation.
35+
36+
This works around engines like MySQL which don't store values in a uuid field,
37+
and thus drops the dashes.
38+
"""
39+
if isinstance(val, str):
40+
val = UUID(val)
41+
42+
return str(val)

django_tasks/checks.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, List, Sequence
1+
from typing import Any, Iterable, Sequence
22

33
from django.apps.config import AppConfig
44
from django.core.checks.messages import CheckMessage
@@ -8,16 +8,11 @@
88

99
def check_tasks(
1010
app_configs: Sequence[AppConfig] = None, **kwargs: Any
11-
) -> List[CheckMessage]:
11+
) -> Iterable[CheckMessage]:
1212
"""Checks all registered task backends."""
1313

14-
errors = []
1514
for backend in tasks.all():
1615
try:
17-
backend_errors = backend.check()
16+
yield from backend.check()
1817
except NotImplementedError:
1918
pass
20-
else:
21-
errors.extend(backend_errors)
22-
23-
return errors

tests/settings.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
77

8-
IN_TEST = sys.argv[0] == "test"
8+
IN_TEST = sys.argv[1] == "test"
99

1010
ALLOWED_HOSTS = ["*"]
1111

0 commit comments

Comments
 (0)