PSv2: Implement queue clean-up upon job completion#1113
mihow merged 7 commits into RolnickLab:main
Conversation
✅ Deploy Preview for antenna-ssec canceled.
✅ Deploy Preview for antenna-preview ready!
📝 Walkthrough

This pull request implements cleanup of async job resources (NATS streams/consumers and Redis keys) when ML jobs complete, fail, or are revoked. The core cleanup logic is refactored into a unified function, integrated into the task status handlers, and validated with tests covering all completion scenarios.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Job as Job Completion Event
    participant Tasks as tasks.py<br/>(Orchestration)
    participant Cleanup as jobs.py<br/>(cleanup_async_job_resources)
    participant Redis as TaskStateManager<br/>(Redis)
    participant NATS as TaskQueueManager<br/>(NATS)

    Note over Job,NATS: Completion triggered by: progress==100% OR failure OR revocation
    Job->>Tasks: _update_job_progress() / update_job_status() / update_job_failure()
    Tasks->>Tasks: Check if job_type=="ml" & async_pipeline_workers enabled
    alt Cleanup needed
        Tasks->>Cleanup: _cleanup_job_if_needed(job)
        Cleanup->>Redis: cleanup() - remove task state keys
        activate Redis
        Redis-->>Cleanup: redis_success (bool)
        deactivate Redis
        Cleanup->>NATS: TaskQueueManager context - delete streams/consumers
        activate NATS
        NATS-->>Cleanup: nats_success (bool)
        deactivate NATS
        Cleanup-->>Tasks: return redis_success AND nats_success
    else No cleanup needed
        Tasks->>Tasks: Skip cleanup
    end
```
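For orientation, here is a minimal, self-contained sketch of the unified cleanup flow shown above, written directly against redis-py and nats-py rather than the repository's managers; the Redis key pattern, the per-job stream/consumer names, and the server URLs are illustrative assumptions, not the actual code.

```python
import asyncio

import nats
import redis
from nats.js.errors import NotFoundError


async def cleanup_async_job_resources(job_id: int) -> bool:
    """Best-effort removal of a job's Redis state keys and NATS stream/consumer."""
    # Redis: drop the job's task-state keys (key pattern is an assumption).
    redis_success = True
    try:
        r = redis.Redis()
        keys = list(r.scan_iter(match=f"job:{job_id}:*"))
        if keys:
            r.delete(*keys)
    except Exception:
        redis_success = False  # the real code logs this via job.logger.exception

    # NATS: delete the per-job consumer and stream (names are assumptions).
    nats_success = True
    try:
        nc = await nats.connect("nats://localhost:4222")
        try:
            js = nc.jetstream()
            try:
                await js.delete_consumer(f"job-{job_id}", f"job-{job_id}-consumer")
            except NotFoundError:
                pass  # consumer already gone counts as cleaned
            try:
                await js.delete_stream(f"job-{job_id}")
            except NotFoundError:
                pass  # stream already gone counts as cleaned
        finally:
            await nc.close()
    except Exception:
        nats_success = False

    # As in the diagram, report success only if both backends were cleaned.
    return redis_success and nats_success


if __name__ == "__main__":
    print(asyncio.run(cleanup_async_job_resources(42)))
```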
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ❌ 1 failed (1 warning) | ✅ 4 passed
Pull request overview
This PR implements automatic cleanup of NATS JetStream and Redis resources when async ML jobs complete, fail, or are cancelled. This addresses issue #1083 by ensuring that temporary resources used for job orchestration are properly removed after jobs finish.
Changes:
- Renamed `cleanup_nats_resources` to `cleanup_async_job_resources` to handle both NATS and Redis cleanup
- Integrated cleanup into the job lifecycle at three points: completion, failure, and revocation (a guard sketch follows this list)
- Added comprehensive integration tests covering all three cleanup scenarios
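Concretely, that three-point integration can be pictured as a small guard in `ami/jobs/tasks.py`; only `cleanup_async_job_resources`, the `job_type == "ml"` condition, and the `async_pipeline_workers` flag come from this PR, while the accessor names are assumptions.

```python
from ami.ml.orchestration.jobs import cleanup_async_job_resources


def _cleanup_job_if_needed(job) -> None:
    """Called from the completion, failure, and revocation handlers alike."""
    if job.job_type != "ml":
        return  # only async ML jobs allocate NATS/Redis resources
    # Hypothetical accessor: the flag lives on the job's project in some form.
    if not job.project.feature_flags.get("async_pipeline_workers", False):
        return
    cleanup_async_job_resources(job)
```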
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| ami/ml/orchestration/jobs.py | Enhanced cleanup function to handle both Redis and NATS resources, with proper error handling and logging |
| ami/jobs/tasks.py | Integrated cleanup calls in job completion, failure, and revocation handlers with feature flag checks |
| ami/ml/orchestration/test_cleanup.py | Added comprehensive integration tests verifying cleanup works correctly in all three scenarios |
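A condensed sketch of how one of the three test scenarios (completion) could be shaped; the two fixture helpers are hypothetical stand-ins for the real setup in `ami/ml/orchestration/test_cleanup.py`.

```python
from unittest import mock

from django.test import TestCase


class CleanupOnCompletionTest(TestCase):
    def test_cleanup_called_when_job_completes(self):
        job = make_async_ml_job()  # hypothetical helper: ML job, flag enabled
        with mock.patch("ami.jobs.tasks.cleanup_async_job_resources") as cleanup:
            finish_all_stages(job)  # hypothetical helper: drives progress to 100%
        cleanup.assert_called_once_with(job)
```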
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@ami/ml/orchestration/test_cleanup.py`:
- Around line 118-159: In _verify_resources_cleaned, change the broad exception
handling inside the async check_nats_resources (which calls
manager.js.stream_info and manager.js.consumer_info via TaskQueueManager) to
only treat nats.js.errors.NotFoundError as "not found" (set
stream_exists/consumer_exists = False) and re-raise any other exceptions so
connection/infra errors fail the test; import or reference NotFoundError from
nats.js.errors and use it in the except clauses for the respective stream and
consumer checks.
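Applied to the helper, the narrowed handling looks roughly like this; `stream_info`, `consumer_info`, and `NotFoundError` are the real nats-py APIs named in the comment, while the function signature is an assumption.

```python
from nats.js.errors import NotFoundError


async def check_nats_resources(manager, stream: str, consumer: str):
    try:
        await manager.js.stream_info(stream)
        stream_exists = True
    except NotFoundError:
        stream_exists = False  # stream genuinely deleted

    try:
        await manager.js.consumer_info(stream, consumer)
        consumer_exists = True
    except NotFoundError:
        consumer_exists = False  # consumer genuinely deleted

    # Any other exception (connection or infra failure) now propagates,
    # so the test fails loudly instead of passing as "cleaned".
    return stream_exists, consumer_exists
```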
🧹 Nitpick comments (1)
ami/ml/orchestration/jobs.py (1)
33-53: Capture stack traces on cleanup failures for easier diagnosis.
`job.logger.error` drops the traceback; `job.logger.exception` preserves context without changing behavior.

🔧 Suggested update

```diff
-        except Exception as e:
-            job.logger.error(f"Error cleaning up Redis state for job {job.pk}: {e}")
+        except Exception:
+            job.logger.exception(f"Error cleaning up Redis state for job {job.pk}")
 ...
-        except Exception as e:
-            job.logger.error(f"Error cleaning up NATS resources for job {job.pk}: {e}")
+        except Exception:
+            job.logger.exception(f"Error cleaning up NATS resources for job {job.pk}")
```
* merge
* feat: PSv2 - Queue/redis clean-up upon job completion
* fix: catch specific exception
* chore: move tests to a subdir

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
* merge
* Update ML job counts in async case
* Update date picker version and tweak layout logic (#1105)
  * fix: update date picker version and tweak layout logic
  * feat: set start month based on selected date
* fix: Properly handle async job state with celery tasks (#1114)
  * merge
  * fix: Properly handle async job state with celery tasks
  * Apply suggestion from @Copilot
  * Delete implemented plan
* PSv2: Implement queue clean-up upon job completion (#1113)
  * merge
  * feat: PSv2 - Queue/redis clean-up upon job completion
  * fix: catch specific exception
  * chore: move tests to a subdir
* fix: PSv2: Workers should not try to fetch tasks from v1 jobs (#1118) (a model sketch follows this commit log)

  Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs.

  JobDispatchMode enum (ami/jobs/models.py):
  * internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs.
  * sync_api — worker calls an external processing service API synchronously and waits for each response.
  * async_api — worker publishes items to NATS for external processing service workers to pick up independently.

  Database and model changes:
  * Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py.
  * ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled.
  * ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset).

  API and filtering:
  * dispatch_mode is exposed (read-only) in job list and detail serializers.
  * Filterable via query parameter: ?dispatch_mode=async_api
  * The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch.

  Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping.
* PSv2 cleanup: use is_complete() and dispatch_mode in job progress handler (#1125)
  * refactor: use is_complete() and dispatch_mode in job progress handler

    Replace the hardcoded `stage == "results"` check with `job.progress.is_complete()`, which verifies ALL stages are done, making it work for any job type. Replace the feature flag check in cleanup with `dispatch_mode == ASYNC_API`, which is immutable for the job's lifetime and more correct than re-reading a mutable flag that could change between job creation and completion.
  * test: update cleanup tests for is_complete() and dispatch_mode checks

    Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard. Complete all stages (collect, process, results) in the completion test, since is_complete() correctly requires all stages to be done.
* track captures and failures
* Update tests, CR feedback, log error images
* CR feedback
* fix type checking
* refactor: rename _get_progress to _commit_update in TaskStateManager

  Clarify naming to distinguish mutating vs read-only methods:
  * _commit_update(): private, writes mutations to Redis, returns progress
  * get_progress(): public, read-only snapshot (added in #1129)
  * update_state(): public API, acquires lock, calls _commit_update()
* fix: unify FAILURE_THRESHOLD and convert TaskProgress to dataclass
  * Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py
  * Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match the sync path's boundary behavior at exactly 50%
  * Convert TaskProgress from namedtuple to dataclass with defaults, so new fields don't break existing callers
* refactor: rename TaskProgress to JobStateProgress

  Clarify that this dataclass tracks job-level progress in Redis, not individual task/image progress. Aligns with the naming of JobProgress (the Django/Pydantic model equivalent).
* docs: update NATS todo and planning docs with session learnings

  Mark connection handling as done (PR #1130), add worktree/remote mapping and docker testing notes for future sessions.
* Rename TaskStateManager to AsyncJobStateManager
* Track results counts in the job itself vs Redis
* small simplification
* Reset counts to 0 on reset
* chore: remove local planning docs from PR branch
* docs: clarify three-layer job state architecture in docstrings

  Explain the relationship between AsyncJobStateManager (Redis), JobProgress (JSONB), and JobState (enum). Clarify that all counts in JobStateProgress refer to source images (captures).

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Anna Viklund <annamariaviklund@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
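For reference, the dispatch_mode addition described in the #1118 entry above would look roughly like the following; the display labels, max_length, and the stripped-down Job model are illustrative assumptions.

```python
from django.db import models


class JobDispatchMode(models.TextChoices):
    # Work handled entirely within the platform (Celery worker, no external calls).
    INTERNAL = "internal", "Internal"
    # Worker calls an external processing service API synchronously.
    SYNC_API = "sync_api", "Sync API"
    # Worker publishes items to NATS for external service workers to pick up.
    ASYNC_API = "async_api", "Async API"


class Job(models.Model):
    # Only the new field is shown; the real model has many more fields.
    dispatch_mode = models.CharField(
        max_length=20,  # assumed
        choices=JobDispatchMode.choices,
        default=JobDispatchMode.INTERNAL,
    )
```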

Summary
Performs clean-up of NATS and Redis resources used by async jobs
Related Issues
Closes #1083
Testing
Screenshots (omitted here):
* NATS dashboard with job running
* NATS dashboard after job finished
* Redis with job running
* Redis with job complete
* Job logs
Checklist
Summary by CodeRabbit
Bug Fixes
Tests