PSv2: Async backend docs and diagram #1137

carlosgjs wants to merge 35 commits into RolnickLab:main from
Conversation
* fix: update date picker version and tweak layout logic
* feat: set start month based on selected date
* merge
* fix: Properly handle async job state with celery tasks
* Apply suggestion from @Copilot

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Delete implemented plan

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* merge
* feat: PSv2 - Queue/redis clean-up upon job completion
* fix: catch specific exception
* chore: move tests to a subdir

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
… (RolnickLab#1118)

Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs.

JobDispatchMode enum (ami/jobs/models.py):
- internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs.
- sync_api — worker calls an external processing service API synchronously and waits for each response.
- async_api — worker publishes items to NATS for external processing service workers to pick up independently.

Database and model changes:
- Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py.
- ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled.
- ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset).

API and filtering:
- dispatch_mode is exposed (read-only) in job list and detail serializers.
- Filterable via query parameter: ?dispatch_mode=async_api
- The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch.

Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping.

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
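For illustration, here is a minimal sketch of what the enum and field described above might look like. The class, value, and file names follow the commit message; everything else (max_length, labels, the surrounding model body) is an assumption, not the actual implementation in ami/jobs/models.py.

```python
# Hypothetical sketch only; field options and labels are assumptions.
from django.db import models


class JobDispatchMode(models.TextChoices):
    INTERNAL = "internal", "Internal (Celery only, no external calls)"
    SYNC_API = "sync_api", "Synchronous external processing API"
    ASYNC_API = "async_api", "Asynchronous via NATS workers"


class Job(models.Model):
    dispatch_mode = models.CharField(
        max_length=32,
        choices=JobDispatchMode.choices,
        default=JobDispatchMode.INTERNAL,
    )
```

A worker would then filter the jobs list with the documented query parameter, e.g. `?dispatch_mode=async_api`, to fetch only jobs that have NATS tasks.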
refactor: use is_complete() and dispatch_mode in job progress handler (RolnickLab#1125)

* refactor: use is_complete() and dispatch_mode in job progress handler

  Replace the hardcoded `stage == "results"` check with `job.progress.is_complete()`, which verifies ALL stages are done, making it work for any job type. Replace the feature flag check in cleanup with `dispatch_mode == ASYNC_API`, which is immutable for the job's lifetime and more correct than re-reading a mutable flag that could change between job creation and completion.

* test: update cleanup tests for is_complete() and dispatch_mode checks

  Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard. Complete all stages (collect, process, results) in the completion test, since is_complete() correctly requires all stages to be done.

---------

Co-authored-by: Claude <noreply@anthropic.com>
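A minimal sketch of the cleanup guard this describes, assuming a handler that runs on every progress update. The method and attribute names come from the commit message; the handler and helper names are hypothetical.

```python
# Sketch only; cleanup_async_resources() is a hypothetical helper.
def on_progress_update(job):
    if not job.progress.is_complete():
        return  # some stage (collect, process, results) is still pending

    # dispatch_mode is fixed for the job's lifetime, unlike the feature flag,
    # so it is the safer condition for deciding whether async cleanup is needed.
    if job.dispatch_mode == JobDispatchMode.ASYNC_API:
        cleanup_async_resources(job)  # e.g. delete the NATS stream, clear Redis keys
```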
# Conflicts:
# ami/ml/orchestration/task_state.py
Clarify naming to distinguish mutating vs read-only methods:
- _commit_update(): private, writes mutations to Redis, returns progress
- get_progress(): public, read-only snapshot (added in RolnickLab#1129)
- update_state(): public API, acquires lock, calls _commit_update()

Co-Authored-By: Claude <noreply@anthropic.com>
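The naming convention above could be sketched roughly as follows, assuming a Redis-backed state manager. The method names come from the commit message; the signatures and lock handling are illustrative only.

```python
# Sketch of the naming convention; signatures and lock details are assumptions.
class AsyncJobStateManager:
    def _commit_update(self, mutation) -> "TaskProgress":
        """Private: apply the mutation to Redis and return the new progress."""
        ...

    def get_progress(self) -> "TaskProgress":
        """Public, read-only: snapshot of current progress, no lock required."""
        ...

    def update_state(self, mutation) -> "TaskProgress":
        """Public API: acquire the distributed lock, then delegate to _commit_update()."""
        with self._lock():  # hypothetical lock helper
            return self._commit_update(mutation)
```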
- Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py
- Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match the sync path's boundary behavior at exactly 50%
- Convert TaskProgress from namedtuple to dataclass with defaults, so new fields don't break existing callers

Co-Authored-By: Claude <noreply@anthropic.com>
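A sketch of what these changes might look like. The constant and dataclass names come from the commit message; the concrete fields, the 0.5 value (implied by "exactly 50%"), and the helper function are assumptions.

```python
# Sketch only; field names and the helper are illustrative.
from dataclasses import dataclass

FAILURE_THRESHOLD = 0.5  # defined once in tasks.py, imported by models.py


@dataclass
class TaskProgress:
    # defaults mean callers written before a field was added keep working
    processed: int = 0
    failed: int = 0
    total: int = 0


def should_fail(progress: TaskProgress) -> bool:
    if progress.total == 0:
        return False
    # strictly greater-than: a job at exactly 50% failures does not fail on the
    # async path, matching the sync path's boundary behavior
    return progress.failed / progress.total > FAILURE_THRESHOLD
```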
Clarify that this dataclass tracks job-level progress in Redis, not individual task/image progress. Aligns with the naming of JobProgress (the Django/Pydantic model equivalent). Co-Authored-By: Claude <noreply@anthropic.com>
Mark connection handling as done (PR RolnickLab#1130), add worktree/remote mapping and docker testing notes for future sessions. Co-Authored-By: Claude <noreply@anthropic.com>
…carlos/trackcounts
Co-Authored-By: Claude <noreply@anthropic.com>
Explain the relationship between AsyncJobStateManager (Redis), JobProgress (JSONB), and JobState (enum). Clarify that all counts in JobStateProgress refer to source images (captures). Co-Authored-By: Claude <noreply@anthropic.com>
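Purely as an illustration of how those pieces might relate: the class names below come from the commit message, while the field names and enum values are hypothetical.

```python
# Illustrative only; field names and enum values are assumptions.
from dataclasses import dataclass


@dataclass
class JobStateProgress:
    # every count refers to source images (captures), not detections or tasks
    total: int = 0
    remaining: int = 0
    failed: int = 0

# AsyncJobStateManager -> live counters in Redis (fast, mutable, lock-protected)
# JobProgress          -> periodic snapshot persisted on the Job row (JSONB)
# JobState             -> coarse status enum derived from that progress
```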
📝 Walkthrough

Adds a new documentation page describing the async ML backend architecture, components (Django, Postgres, Celery, Redis, NATS JetStream, ML Worker), the end-to-end async job/task flow, API shapes for GET /jobs/{id}/tasks and POST /jobs/{id}/result, design decisions, and error/cancellation semantics.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Client
participant Django
participant Postgres
participant Redis
participant NATS
participant MLWorker
participant Celery
Client->>Django: POST /jobs/{id}/result (or GET /jobs/{id}/tasks)
Django->>Postgres: read/write job/task metadata
Django->>Redis: update task reservation / visibility timeout
Django->>NATS: publish to per-job stream (reply_subject for responses)
NATS->>MLWorker: deliver task message (subscribe)
MLWorker->>MLWorker: process task
MLWorker->>Django: POST /jobs/{id}/result (uses reply_subject / API)
Django->>Redis: acknowledge / update task state
Django->>Celery: enqueue downstream processing (if needed)
Celery->>Postgres: finalize results / persist outputs
    Django->>Client: respond with task/result acknowledgment
```
🧹 Nitpick comments (2)
docs/diagrams/async_ml_backend.md (2)
85-98: Optional: Consider explaining the TTR vs. lock duration relationship.

The visibility timeout is 300 seconds (line 88) while the lock duration is 360 seconds (line 96). The 60-second buffer is likely intentional to ensure the lock covers the entire visibility window, but making this relationship explicit would help readers understand the coordination between NATS and Redis timing parameters.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/diagrams/async_ml_backend.md` around lines 85 - 98, The docs list a Visibility Timeout (TTR) of 300s and a Redis Lock Duration of 360s but don't explain their relationship; update the "Asynchronous Task Queue (NATS JetStream)" and "Redis-Based State Management" sections to explicitly state that the lock duration (360s) intentionally exceeds the TTR (300s) by a 60s buffer to ensure the Redis distributed lock outlives the NATS visibility window, preventing races where a task becomes visible again while the lock still holds or vice versa; mention that this buffer covers clock skew and worker reprocessing delays and note that the values (TTR and Lock Duration) must be adjusted together if either is changed.
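To make the relationship concrete, a sketch of how the two values might be expressed together. The 300s/360s values come from the reviewed document; the constant names and where they would live are assumptions.

```python
# Sketch only; constant names are illustrative.
NATS_VISIBILITY_TIMEOUT_S = 300  # TTR: how long NATS waits before redelivering a task
LOCK_BUFFER_S = 60               # covers clock skew and slow result handling
REDIS_LOCK_DURATION_S = NATS_VISIBILITY_TIMEOUT_S + LOCK_BUFFER_S  # 360s

# Keeping the lock strictly longer than the TTR means the Redis lock cannot
# expire while the task is still invisible in NATS; adjust both values together.
```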
66-78: Consider clarifying the dual update pattern for pending images and progress.

The diagram shows two separate updates to both pending images (lines 67, 76) and job.progress (lines 71, 77), labeled as "process stage" and "results stage." The distinction between these stages and the rationale for updating the same state twice isn't explained elsewhere in the document.
If this accurately reflects the implementation, consider adding a brief note explaining why both stages are needed. If it's unintentional duplication, the flow could potentially be simplified.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/diagrams/async_ml_backend.md` around lines 66 - 78, The diagram shows two updates to "pending images" (Redis) and "job.progress" (DB) labeled "process stage" and "results stage" which is ambiguous; verify the worker implementation (Celery task) to confirm whether both Redis updates and both DB job.progress writes are required. If intentional, add a brief Note in the diagram (e.g., after "Celery->>Redis: Update pending images (remove processed)") explaining the two-stage lifecycle: first update during per-image processing to reflect work-in-progress and progress percentage, and second update in results stage to record final state and release the lock; if unintentional, consolidate the two Redis updates into a single "Update pending images" step and merge the two DB writes into one "Update job.progress" step and adjust the surrounding sequence (acknowledge_task, save detections) accordingly so the diagram matches the actual Celery worker logic.
Pull request overview
Adds architecture documentation for Antenna’s async ML job execution flow, illustrating how Django, Celery, Redis, and NATS JetStream interact with external ML workers.
Changes:
- Added a new Mermaid sequence diagram describing the async ML job lifecycle (queueing → reserving → result posting → ACK/progress updates).
- Documented key design decisions (per-job streams, TTR/max-deliver retries, Redis locking/state, reply-subject ACK pattern).
- Documented worker-facing task/result endpoints and example payloads.
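As a rough illustration of the worker-facing flow listed above: the endpoint paths, the batch parameter, and the reply_subject field appear in the documentation and diagram, but the base URL, auth header, and result payload shape below are assumptions, not the actual schema.

```python
# Illustrative client calls only; payload fields and auth are hypothetical.
import requests

BASE = "https://antenna.example.org"  # hypothetical base URL
HEADERS = {"Authorization": "Token <worker-token>"}
JOB_ID = 123

# Reserve up to 10 tasks for this worker
tasks = requests.get(
    f"{BASE}/jobs/{JOB_ID}/tasks", params={"batch": 10}, headers=HEADERS
).json()

for task in tasks:
    # Post the processing result back, echoing the reply_subject so Django can ACK in NATS
    result = {"reply_subject": task.get("reply_subject"), "detections": []}  # fields assumed
    requests.post(f"{BASE}/jobs/{JOB_ID}/result", json=result, headers=HEADERS)
```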
docs/diagrams/async_ml_backend.md
Outdated

**Query Parameters:**

- `batch`: Number of tasks to fetch (1-100)
The batch query parameter is documented as “(1-100)”, but the API currently only enforces min_value=1 and does not cap the maximum. Either remove the upper bound from the docs or add a corresponding max validation in the endpoint so documentation and behavior stay aligned.
-- `batch`: Number of tasks to fetch (1-100)
+- `batch`: Number of tasks to fetch (minimum 1)
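If the other option mentioned in the comment (adding a max validation) were chosen instead, a minimal sketch might look like this. Only `min_value=1` is confirmed by the comment; the serializer name and the `max_value` cap are illustrative.

```python
# Sketch only; serializer name and upper bound are assumptions.
from rest_framework import serializers


class TaskBatchParamsSerializer(serializers.Serializer):
    batch = serializers.IntegerField(default=1, min_value=1, max_value=100)
```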
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
🧹 Nitpick comments (2)
docs/diagrams/async_ml_backend.md (2)
119-119: Add comma for clarity.

The sentence should have a comma before the main clause.
📝 Suggested grammar fix
-- If worker crashes and never reports a result or error NATS will redeliver after visibility timeout
+- If worker crashes and never reports a result or error, NATS will redeliver after visibility timeout

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/diagrams/async_ml_backend.md` at line 119, Update the sentence "If worker crashes and never reports a result or error NATS will redeliver after visibility timeout" by inserting a comma before the main clause so it reads like: "If worker crashes and never reports a result or error, NATS will redeliver after visibility timeout" (adjust surrounding articles for clarity if desired); target the exact sentence in the document.
160-160: Hyphenate compound adjective.

The phrase "post processing results" should be hyphenated as "post-processing results" when used as a compound adjective modifying "results." As per static analysis hints, use a hyphen to join these words.
📝 Suggested grammar fix
-Worker endpoint to post processing results.
+Worker endpoint to post-processing results.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/diagrams/async_ml_backend.md` at line 160, Update the documented phrase "Worker endpoint to post processing results." by hyphenating the compound adjective so it reads "Worker endpoint to post-processing results." Locate the string in docs/diagrams/async_ml_backend.md (the line containing "post processing results") and replace it with the hyphenated form to satisfy the static analysis/grammar check.
Add async ML backend diagram