From 1ade1a824feb93f4e180efcb65b373ae0131f02b Mon Sep 17 00:00:00 2001 From: M Umar Khan Date: Thu, 10 Jul 2025 13:56:53 +0500 Subject: [PATCH 1/3] Replace texfields with jsonfield --- ..._sub_tasks_taskresult_new_meta_and_more.py | 33 ++++++++ ...0016_make_copy_of_taskresult_textfields.py | 81 +++++++++++++++++++ ...17_make_copy_of_chordcounter_textfields.py | 79 ++++++++++++++++++ ...ove_chordcounter_new_sub_tasks_and_more.py | 50 ++++++++++++ django_celery_results/models.py | 9 ++- 5 files changed, 248 insertions(+), 4 deletions(-) create mode 100644 django_celery_results/migrations/0015_chordcounter_new_sub_tasks_taskresult_new_meta_and_more.py create mode 100644 django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py create mode 100644 django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py create mode 100644 django_celery_results/migrations/0018_remove_chordcounter_new_sub_tasks_and_more.py diff --git a/django_celery_results/migrations/0015_chordcounter_new_sub_tasks_taskresult_new_meta_and_more.py b/django_celery_results/migrations/0015_chordcounter_new_sub_tasks_taskresult_new_meta_and_more.py new file mode 100644 index 00000000..bcc825ca --- /dev/null +++ b/django_celery_results/migrations/0015_chordcounter_new_sub_tasks_taskresult_new_meta_and_more.py @@ -0,0 +1,33 @@ +# Generated by Django 5.2.4 on 2025-07-09 09:20 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_results', '0014_alter_taskresult_status'), + ] + + operations = [ + migrations.AddField( + model_name='chordcounter', + name='new_sub_tasks', + field=models.JSONField(default=None, help_text='JSON serialized list of task result tuples. use .group_result() to decode'), + ), + migrations.AddField( + model_name='taskresult', + name='new_meta', + field=models.JSONField(default=None, editable=False, help_text='JSON meta information about the task, such as information on child tasks', null=True, verbose_name='Task Meta Information'), + ), + migrations.AddField( + model_name='taskresult', + name='new_task_args', + field=models.JSONField(help_text='JSON representation of the positional arguments used with the task', null=True, verbose_name='Task Positional Arguments'), + ), + migrations.AddField( + model_name='taskresult', + name='new_task_kwargs', + field=models.JSONField(help_text='JSON representation of the named arguments used with the task', null=True, verbose_name='Task Named Arguments'), + ), + ] diff --git a/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py b/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py new file mode 100644 index 00000000..153a3a74 --- /dev/null +++ b/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py @@ -0,0 +1,81 @@ +import json +import logging +from django.db import migrations, transaction + + +logger = logging.getLogger(__name__) + + +def safe_json_loads(value, default=None): + """Safely parse JSON string with fallback.""" + if not value: # Handles None, empty string, etc. + return default + return json.loads(value) + + +def make_copy_of_taskresult_textfields(apps, schema_editor): + TaskResult = apps.get_model('django_celery_results', 'TaskResult') + + total_count = TaskResult.objects.count() + logger.info(f"Starting migration for {total_count} TaskResult records") + + batch_size = 500 + processed_count = 0 + error_count = 0 + last_id = 0 + + while True: + with transaction.atomic(): + # Get next batch using cursor pagination + batch = list( + TaskResult.objects.filter(id__gt=last_id) + .order_by('id')[:batch_size] + ) + + if not batch: + break + + updates = [] + + for obj in batch: + try: + # Parse JSON fields with appropriate defaults + obj.new_task_args = safe_json_loads(obj.task_args) + obj.new_task_kwargs = safe_json_loads(obj.task_kwargs) + obj.new_meta = safe_json_loads(obj.meta) + + updates.append(obj) + + except Exception as e: + error_count += 1 + logger.error(f"Error processing TaskResult ID {obj.id}: {e}") + continue + + if updates: + TaskResult.objects.bulk_update( + updates, + ['new_task_args', 'new_task_kwargs', 'new_meta'] + ) + processed_count += len(updates) + + last_id = batch[-1].id + + # Progress logging + progress = (processed_count / total_count) * 100 if total_count > 0 else 0 + logger.info(f"Processed {processed_count}/{total_count} records ({progress:.1f}%)") + + logger.info(f"Migration completed. Successfully processed {processed_count} records, " + f"{error_count} errors encountered") + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_results', '0015_chordcounter_new_sub_tasks_taskresult_new_meta_and_more'), + ] + + operations = [ + migrations.RunPython( + make_copy_of_taskresult_textfields, + migrations.RunPython.noop + ) + ] diff --git a/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py b/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py new file mode 100644 index 00000000..7153dea0 --- /dev/null +++ b/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py @@ -0,0 +1,79 @@ +import json +import logging +from django.db import migrations, transaction + + +logger = logging.getLogger(__name__) + + +def safe_json_loads(value, default=None): + """Safely parse JSON string with fallback.""" + if not value: # Handles None, empty string, etc. + return default + return json.loads(value) + + +def make_copy_of_chordcounter_textfields(apps, schema_editor): + chord_counter = apps.get_model('django_celery_results', 'ChordCounter') + + total_count = chord_counter.objects.count() + logger.info(f"Starting migration for {total_count} ChordCounter records") + + batch_size = 500 + processed_count = 0 + error_count = 0 + last_id = 0 + + while True: + with transaction.atomic(): + # Get next batch using cursor pagination + batch = list( + chord_counter.objects.filter(id__gt=last_id) + .order_by('id')[:batch_size] + ) + + if not batch: + break + + updates = [] + + for obj in batch: + try: + # Parse JSON fields with appropriate defaults + obj.new_sub_tasks = safe_json_loads(obj.sub_tasks) + updates.append(obj) + + except Exception as e: + error_count += 1 + logger.error(f"Error processing ChordCounter ID {obj.id}: {e}") + continue + + if updates: + chord_counter.objects.bulk_update( + updates, + ['new_sub_tasks'] + ) + processed_count += len(updates) + + last_id = batch[-1].id + + # Progress logging + progress = (processed_count / total_count) * 100 if total_count > 0 else 0 + logger.info(f"Processed {processed_count}/{total_count} records ({progress:.1f}%)") + + logger.info(f"Migration completed. Successfully processed {processed_count} records, " + f"{error_count} errors encountered") + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_results', '0016_make_copy_of_taskresult_textfields'), + ] + + operations = [ + migrations.RunPython( + make_copy_of_chordcounter_textfields, + migrations.RunPython.noop + ) + ] diff --git a/django_celery_results/migrations/0018_remove_chordcounter_new_sub_tasks_and_more.py b/django_celery_results/migrations/0018_remove_chordcounter_new_sub_tasks_and_more.py new file mode 100644 index 00000000..9b900f3a --- /dev/null +++ b/django_celery_results/migrations/0018_remove_chordcounter_new_sub_tasks_and_more.py @@ -0,0 +1,50 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_results', '0017_make_copy_of_chordcounter_textfields'), + ] + + operations = [ + # Remove the old fields + migrations.RemoveField( + model_name='chordcounter', + name='sub_tasks', + ), + migrations.RemoveField( + model_name='taskresult', + name='meta', + ), + migrations.RemoveField( + model_name='taskresult', + name='task_args', + ), + migrations.RemoveField( + model_name='taskresult', + name='task_kwargs', + ), + + # Rename the new_ fields to their non-prefixed versions + migrations.RenameField( + model_name='chordcounter', + old_name='new_sub_tasks', + new_name='sub_tasks', + ), + migrations.RenameField( + model_name='taskresult', + old_name='new_meta', + new_name='meta', + ), + migrations.RenameField( + model_name='taskresult', + old_name='new_task_args', + new_name='task_args', + ), + migrations.RenameField( + model_name='taskresult', + old_name='new_task_kwargs', + new_name='task_kwargs', + ), + ] diff --git a/django_celery_results/models.py b/django_celery_results/models.py index b472a5af..5e5affa1 100644 --- a/django_celery_results/models.py +++ b/django_celery_results/models.py @@ -39,12 +39,12 @@ class TaskResult(models.Model): ), verbose_name=_('Task Name'), help_text=_('Name of the Task which was run')) - task_args = models.TextField( + task_args = models.JSONField( null=True, verbose_name=_('Task Positional Arguments'), help_text=_('JSON representation of the positional arguments ' 'used with the task')) - task_kwargs = models.TextField( + task_kwargs = models.JSONField( null=True, verbose_name=_('Task Named Arguments'), help_text=_('JSON representation of the named arguments ' @@ -86,7 +86,7 @@ class TaskResult(models.Model): blank=True, null=True, verbose_name=_('Traceback'), help_text=_('Text of the traceback if the task generated one')) - meta = models.TextField( + meta = models.JSONField( null=True, default=None, editable=False, verbose_name=_('Task Meta Information'), help_text=_('JSON meta information about the task, ' @@ -148,7 +148,8 @@ class ChordCounter(models.Model): verbose_name=_("Group ID"), help_text=_("Celery ID for the Chord header group"), ) - sub_tasks = models.TextField( + sub_tasks = models.JSONField( + default=None, help_text=_( "JSON serialized list of task result tuples. " "use .group_result() to decode" From d0788663ff8cf8b7b88e3622d0a27dcae4d56171 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 10 Jul 2025 09:04:06 +0000 Subject: [PATCH 2/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../migrations/0016_make_copy_of_taskresult_textfields.py | 2 +- .../migrations/0017_make_copy_of_chordcounter_textfields.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py b/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py index 153a3a74..05e888db 100644 --- a/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py +++ b/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py @@ -1,7 +1,7 @@ import json import logging -from django.db import migrations, transaction +from django.db import migrations, transaction logger = logging.getLogger(__name__) diff --git a/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py b/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py index 7153dea0..e944ab8f 100644 --- a/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py +++ b/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py @@ -1,7 +1,7 @@ import json import logging -from django.db import migrations, transaction +from django.db import migrations, transaction logger = logging.getLogger(__name__) From f5df2109f1265fc25f5916c1800c4b8c613067d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Asif=20Saif=20Uddin=20=7B=22Auvi=22=3A=22=E0=A6=85?= =?UTF-8?q?=E0=A6=AD=E0=A6=BF=22=7D?= Date: Thu, 2 Oct 2025 18:02:51 +0600 Subject: [PATCH 3/3] Update django_celery_results/models.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- django_celery_results/models.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/django_celery_results/models.py b/django_celery_results/models.py index 5e5affa1..710688fd 100644 --- a/django_celery_results/models.py +++ b/django_celery_results/models.py @@ -149,7 +149,8 @@ class ChordCounter(models.Model): help_text=_("Celery ID for the Chord header group"), ) sub_tasks = models.JSONField( - default=None, + null=True, + default=list, help_text=_( "JSON serialized list of task result tuples. " "use .group_result() to decode"