From 0e68629c9558c54217d9d36588382520f45bcb1a Mon Sep 17 00:00:00 2001 From: Lode Rosseel Date: Wed, 10 Dec 2025 13:03:12 +0100 Subject: [PATCH 1/5] Check and drop if stale connection when saving task result in db --- django_celery_results/backends/database.py | 14 +++++++++++-- t/proj/settings.py | 4 ++++ t/unit/backends/test_database.py | 23 +++++++++++++++++++++- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/django_celery_results/backends/database.py b/django_celery_results/backends/database.py index a4e364a6..94c0db2a 100644 --- a/django_celery_results/backends/database.py +++ b/django_celery_results/backends/database.py @@ -7,7 +7,7 @@ from celery.result import GroupResult, allow_join_result, result_from_tuple from celery.utils.log import get_logger from celery.utils.serialization import b64decode, b64encode -from django.db import connection, router, transaction +from django.db import connection, connections, router, transaction from django.db.models.functions import Now from django.db.utils import InterfaceError from kombu.exceptions import DecodeError @@ -120,6 +120,17 @@ def _store_result( using=None ): """Store return value and status of an executed task.""" + + # If a task has been running long, it may have exceeded + # the max db age and/or the database connection + # may have been ended due to being idle for too long. + # As a safety, before we submit the result, + # we ensure it still has a valid connection, just like + # Django does after a request to ensure a + # clean connection for the next request. + (connections[self.TaskModel._default_manager.db] + .close_if_unusable_or_obsolete()) + content_type, content_encoding, result = self.encode_content(result) meta = { @@ -147,7 +158,6 @@ def _store_result( if status == states.STARTED: task_props['date_started'] = Now() - self.TaskModel._default_manager.store_result(**task_props) return result diff --git a/t/proj/settings.py b/t/proj/settings.py index 6e974b51..acbcf0ba 100644 --- a/t/proj/settings.py +++ b/t/proj/settings.py @@ -35,7 +35,9 @@ 'PASSWORD': os.getenv('DB_POSTGRES_PASSWORD', 'postgres'), 'OPTIONS': { 'connect_timeout': 1000, + }, + 'CONN_MAX_AGE': None, }, 'secondary': { 'ENGINE': 'django.db.backends.postgresql', @@ -50,6 +52,7 @@ 'TEST': { 'MIRROR': 'default', }, + 'CONN_MAX_AGE': None, }, 'read-only': { 'ENGINE': 'django.db.backends.postgresql', @@ -65,6 +68,7 @@ 'TEST': { 'MIRROR': 'default', }, + 'CONN_MAX_AGE': None, }, } except ImportError: diff --git a/t/unit/backends/test_database.py b/t/unit/backends/test_database.py index 8baa6cc0..8b773780 100644 --- a/t/unit/backends/test_database.py +++ b/t/unit/backends/test_database.py @@ -1,4 +1,5 @@ import datetime +import time import json import pickle import re @@ -16,6 +17,7 @@ from django_celery_results.backends.database import DatabaseBackend from django_celery_results.models import ChordCounter, TaskResult +from django.db import connections class SomeClass: @@ -24,7 +26,7 @@ def __init__(self, data): self.data = data -@pytest.mark.django_db() +@pytest.mark.django_db(transaction=True) @pytest.mark.usefixtures('depends_on_current_app') class test_DatabaseBackend: @@ -550,6 +552,25 @@ def test_backend__task_result_meta_injection(self): tr = TaskResult.objects.get(task_id=tid2) assert json.loads(tr.meta) == {'key': 'value', 'children': []} + def test_backend__task_result_closes_stale_connection(self): + tid = uuid() + request = self._create_request( + task_id=tid, + name='my_task', + args=[], + kwargs={}, + task_protocol=1, + ) + # simulate a stale connection by setting the close time + # to the current time + db_conn_wrapper = connections[self.b.TaskModel.objects.db] + db_conn_wrapper.close_at = time.monotonic() + current_db_connection = db_conn_wrapper.connection + self.b.mark_as_done(tid, None, request=request) + # Validate the connection was replaced in the process + # of saving the task + assert current_db_connection is not db_conn_wrapper.connection + def test_backend__task_result_date(self): tid2 = uuid() From 174d6defbe71c9a84cd978666ee309a54e99daae Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:54:20 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- t/unit/backends/test_database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/unit/backends/test_database.py b/t/unit/backends/test_database.py index 8b773780..9cf29fd1 100644 --- a/t/unit/backends/test_database.py +++ b/t/unit/backends/test_database.py @@ -1,8 +1,8 @@ import datetime -import time import json import pickle import re +import time from unittest import mock import celery @@ -13,11 +13,11 @@ from celery.utils.serialization import b64decode from celery.worker.request import Request from celery.worker.strategy import hybrid_to_proto2 +from django.db import connections from django.test import TransactionTestCase from django_celery_results.backends.database import DatabaseBackend from django_celery_results.models import ChordCounter, TaskResult -from django.db import connections class SomeClass: From a5159b688fc66f88fe0747b0ad5ed02e77eca275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Asif=20Saif=20Uddin=20=7B=22Auvi=22=3A=22=E0=A6=85?= =?UTF-8?q?=E0=A6=AD=E0=A6=BF=22=7D?= Date: Wed, 10 Dec 2025 19:42:39 +0600 Subject: [PATCH 3/5] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- t/proj/settings.py | 1 - 1 file changed, 1 deletion(-) diff --git a/t/proj/settings.py b/t/proj/settings.py index acbcf0ba..d347f14d 100644 --- a/t/proj/settings.py +++ b/t/proj/settings.py @@ -35,7 +35,6 @@ 'PASSWORD': os.getenv('DB_POSTGRES_PASSWORD', 'postgres'), 'OPTIONS': { 'connect_timeout': 1000, - }, 'CONN_MAX_AGE': None, }, From 16d420d63b7f2fdf0bb65d005162cd11c8d3a38f Mon Sep 17 00:00:00 2001 From: Lode Rosseel Date: Wed, 10 Dec 2025 17:36:31 +0100 Subject: [PATCH 4/5] Replace qs.db with db for write --- django_celery_results/backends/database.py | 2 +- t/unit/backends/test_database.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/django_celery_results/backends/database.py b/django_celery_results/backends/database.py index 94c0db2a..664c7120 100644 --- a/django_celery_results/backends/database.py +++ b/django_celery_results/backends/database.py @@ -128,7 +128,7 @@ def _store_result( # we ensure it still has a valid connection, just like # Django does after a request to ensure a # clean connection for the next request. - (connections[self.TaskModel._default_manager.db] + (connections[router.db_for_write(self.TaskModel)] .close_if_unusable_or_obsolete()) content_type, content_encoding, result = self.encode_content(result) diff --git a/t/unit/backends/test_database.py b/t/unit/backends/test_database.py index 9cf29fd1..d6520893 100644 --- a/t/unit/backends/test_database.py +++ b/t/unit/backends/test_database.py @@ -563,7 +563,7 @@ def test_backend__task_result_closes_stale_connection(self): ) # simulate a stale connection by setting the close time # to the current time - db_conn_wrapper = connections[self.b.TaskModel.objects.db] + db_conn_wrapper = connections[router.db_for_write(self.b.TaskModel)] db_conn_wrapper.close_at = time.monotonic() current_db_connection = db_conn_wrapper.connection self.b.mark_as_done(tid, None, request=request) From e23643dd71d7c54e30cc67a648e744a8646e86d5 Mon Sep 17 00:00:00 2001 From: Lode Rosseel Date: Wed, 10 Dec 2025 17:37:52 +0100 Subject: [PATCH 5/5] Add router import --- t/unit/backends/test_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/unit/backends/test_database.py b/t/unit/backends/test_database.py index d6520893..2c8b36e0 100644 --- a/t/unit/backends/test_database.py +++ b/t/unit/backends/test_database.py @@ -13,7 +13,7 @@ from celery.utils.serialization import b64decode from celery.worker.request import Request from celery.worker.strategy import hybrid_to_proto2 -from django.db import connections +from django.db import connections, router from django.test import TransactionTestCase from django_celery_results.backends.database import DatabaseBackend