From e9d3e854c5d9502a2948d5749ff75aa2e01d36fc Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 9 Feb 2026 16:00:42 -0800 Subject: [PATCH 1/2] refactor: use is_complete() and dispatch_mode in job progress handler Replace hardcoded `stage == "results"` check with `job.progress.is_complete()` which verifies ALL stages are done, making it work for any job type. Replace feature flag check in cleanup with `dispatch_mode == ASYNC_API` which is immutable for the job's lifetime and more correct than re-reading a mutable flag that could change between job creation and completion. Co-Authored-By: Claude --- ami/jobs/tasks.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 17083fb84..3548d0ea5 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -159,7 +159,7 @@ def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED, progress=progress_percentage, ) - if stage == "results" and progress_percentage >= 1.0: + if job.progress.is_complete(): job.status = JobState.SUCCESS job.progress.summary.status = JobState.SUCCESS job.finished_at = datetime.datetime.now() # Use naive datetime in local time @@ -167,23 +167,24 @@ def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> job.save() # Clean up async resources for completed jobs that use NATS/Redis - # Only ML jobs with async_pipeline_workers enabled use these resources - if stage == "results" and progress_percentage >= 1.0: + if job.progress.is_complete(): job = Job.objects.get(pk=job_id) # Re-fetch outside transaction _cleanup_job_if_needed(job) def _cleanup_job_if_needed(job) -> None: """ - Clean up async resources (NATS/Redis) if this job type uses them. + Clean up async resources (NATS/Redis) if this job uses them. - Only ML jobs with async_pipeline_workers enabled use NATS/Redis resources. + Only jobs with ASYNC_API dispatch mode use NATS/Redis resources. This function is safe to call for any job - it checks if cleanup is needed. Args: job: The Job instance """ - if job.job_type_key == "ml" and job.project and job.project.feature_flags.async_pipeline_workers: + from ami.jobs.models import JobDispatchMode + + if job.dispatch_mode == JobDispatchMode.ASYNC_API: # import here to avoid circular imports from ami.ml.orchestration.jobs import cleanup_async_job_resources From 6fde58edfea76de6def2f977fa122c296d104c44 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 9 Feb 2026 16:00:48 -0800 Subject: [PATCH 2/2] test: update cleanup tests for is_complete() and dispatch_mode checks Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard. Complete all stages (collect, process, results) in the completion test since is_complete() correctly requires all stages to be done. Co-Authored-By: Claude --- ami/ml/orchestration/tests/test_cleanup.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ami/ml/orchestration/tests/test_cleanup.py b/ami/ml/orchestration/tests/test_cleanup.py index f7b686d44..ef8382d3d 100644 --- a/ami/ml/orchestration/tests/test_cleanup.py +++ b/ami/ml/orchestration/tests/test_cleanup.py @@ -5,7 +5,7 @@ from django.test import TestCase from nats.js.errors import NotFoundError -from ami.jobs.models import Job, JobState, MLJob +from ami.jobs.models import Job, JobDispatchMode, JobState, MLJob from ami.jobs.tasks import _update_job_progress, update_job_failure, update_job_status from ami.main.models import Project, ProjectFeatureFlags, SourceImage, SourceImageCollection from ami.ml.models import Pipeline @@ -106,6 +106,7 @@ def _create_job_with_queued_images(self) -> Job: name="Test Cleanup Job", pipeline=self.pipeline, source_image_collection=self.collection, + dispatch_mode=JobDispatchMode.ASYNC_API, ) # Queue images to NATS (also initializes Redis state) @@ -162,7 +163,9 @@ def test_cleanup_on_job_completion(self): """Test that resources are cleaned up when job completes successfully.""" job = self._create_job_with_queued_images() - # Simulate job completion by updating progress to 100% in results stage + # Simulate job completion: complete all stages (collect, process, then results) + _update_job_progress(job.pk, stage="collect", progress_percentage=1.0) + _update_job_progress(job.pk, stage="process", progress_percentage=1.0) _update_job_progress(job.pk, stage="results", progress_percentage=1.0) # Verify cleanup happened