
PSv2: Track and display image count progress and state#1121

Merged
mihow merged 29 commits intoRolnickLab:mainfrom
uw-ssec:carlos/trackcounts
Feb 17, 2026

Conversation

@carlosgjs
Collaborator

@carlosgjs carlosgjs commented Feb 6, 2026

Summary

This pull request introduces progress tracking of various counts for async image processing jobs. It brings the async path to parity with the sync path. This includes setting the JobState according to the ratio of failures.

  • Added tracking of failed images and cumulative counts for detections, classifications, and captures in TaskStateManager and TaskProgress, including new Redis keys and logic for updating and cleaning up these metrics.
  • Updated process_nats_pipeline_result in ami/jobs/tasks.py to calculate and pass failed image IDs and detection/classification/capture counts to TaskStateManager, and to reflect these in job progress updates.
  • Modified _update_job_progress to support custom completion states (success/failure) based on the proportion of failed images, and to propagate new progress metrics to the job model (a simplified sketch follows this list).
  • Added logging of error results to the job log.
  • Expanded unit tests in ami/ml/tests.py to validate the new progress fields, including detections, classifications, and failed image counts.
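
For illustration, a simplified sketch of how the async handler forwards these counts (not the literal code in ami/jobs/tasks.py; the keyword names passed through to the job progress update are approximate):

```python
# Sketch only: counts derived per pipeline result inside process_nats_pipeline_result
# and forwarded to the job progress update. Keyword names are illustrative.
detections_count = len(pipeline_result.detections)
classifications_count = sum(len(d.classifications) for d in pipeline_result.detections)
captures_count = len(pipeline_result.source_images)

_update_job_progress(
    job_id,
    "results",                     # stage
    progress_info.percentage,      # percentage from the Redis-backed snapshot
    complete_state,                # JobState.SUCCESS or FAILURE, from the failure ratio
    detections=detections_count,
    classifications=classifications_count,
    captures=captures_count,
)
```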

Screenshots

Success case:

Failure case:
(The test artificially failed some images; the job state is also set to Failure.)


Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • Enhanced job progress tracking: per-stage counts for detections, classifications, captures, plus cumulative failed-image tracking and propagated counts across stages.
  • Bug Fixes

    • Improved error handling and failure-state determination during pipeline processing and finalization; progress reflects failure ratios.
  • Tests

    • Tests updated to verify cumulative failure tracking, retry behavior, cleanup of failed-image tracking, and new progress semantics.
  • Documentation

    • Added NATS infrastructure notes and planning docs for reliability and cleanup improvements.

@netlify

netlify bot commented Feb 6, 2026

Deploy Preview for antenna-ssec canceled.

🔨 Latest commit: a15ebda
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-ssec/deploys/6993f831d508080007a7a49a

@netlify

netlify bot commented Feb 6, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: a15ebda
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/6993f8311cf1840008e2d8a7

@coderabbitai
Contributor

coderabbitai bot commented Feb 6, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Replaces TaskStateManager with AsyncJobStateManager, adds failed-image tracking and cumulative counts (detections, classifications, captures) through NATS pipeline result handling, and updates _update_job_progress to accept a complete_state and propagate computed completion and counts into job progress and status.

Changes

  • State manager implementation (ami/ml/orchestration/async_job_state.py): Renamed TaskStateManager → AsyncJobStateManager; introduced the JobStateProgress dataclass (sketched below), _failed_key tracking, and _commit_update for atomic updates; added failed-count handling; updated get_progress/update_state return types and cleanup.
  • Pipeline & job progress (ami/jobs/tasks.py): Replaced TaskStateManager usage with AsyncJobStateManager; process_nats_pipeline_result now computes/propagates failed_image_ids and detections/classifications/captures counts, derives complete_state from the failure ratio, and calls the updated _update_job_progress(job_id, stage, pct, complete_state, **state_params).
  • Tests (ami/ml/tests.py, ami/jobs/test_tasks.py, ami/ml/orchestration/tests/test_cleanup.py): Updated imports/usages to AsyncJobStateManager; replaced _get_progress usage with _commit_update semantics; added/updated tests for failed-image tracking, retries, cumulative counts, and cleanup of failed keys.
  • Integration / jobs orchestration (ami/ml/orchestration/jobs.py, ami/jobs/models.py): Swapped TaskStateManager → AsyncJobStateManager in cleanup/initialization paths; moved FAILURE_THRESHOLD usage to ami/jobs/tasks.py and imported it where needed.
  • Docs / planning (docs/claude/nats-todo.md, docs/claude/planning/pr-trackcounts-next-session.md): Added NATS reliability/cleanup TODO and planning notes documenting design decisions, next steps, and interactions between Redis-based JobStateProgress and Django JobProgress.
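
For reference, the Redis-backed snapshot returned by the state manager has roughly this shape (fields inferred from the walkthrough and the diagram below; a sketch, not the literal definition in async_job_state.py):

```python
from dataclasses import dataclass

# Approximate shape only; all counts refer to source images (captures).
@dataclass
class JobStateProgress:
    total: int = 0         # images registered for the job
    processed: int = 0     # unique images processed so far (set-based, retry-safe)
    remaining: int = 0     # total - processed
    failed: int = 0        # unique images that failed processing
    percentage: float = 0.0
```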

Sequence Diagram

sequenceDiagram
    participant NATS
    participant Pipeline as process_nats_pipeline_result
    participant StateMgr as AsyncJobStateManager
    participant Redis
    participant Django as Job Model

    NATS->>Pipeline: deliver result (success / error)
    Pipeline->>Pipeline: extract processed_image_ids, error_result, counts
    Pipeline->>StateMgr: _commit_update(processed_ids, stage, failed_image_ids?)
    StateMgr->>Redis: add/remove failed ids, update counters (processed, total, failed)
    StateMgr-->>Pipeline: JobStateProgress (processed, remaining, failed, percentage)
    Pipeline->>Pipeline: compute complete_state from failure ratio and FAILURE_THRESHOLD
    Pipeline->>Pipeline: call _update_job_progress(job_id, stage, pct, complete_state, detections..., classifications..., captures...)
    Pipeline->>Django: persist job.status and progress.summary.status when complete

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

backend, ml

Suggested reviewers

  • mihow

Poem

🐰 I tally hops and image counts,
failed seeds I gently fence,
detections, captures, classifications—
I carry sums through every tense.
Hop on, progress, make it dense! 🥕

🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
  • Title check: ✅ Passed. The title accurately captures the main objective of the PR: tracking and displaying image count progress and state for async jobs (PSv2).
  • Description check: ✅ Passed. The description covers all required sections: summary, list of changes, screenshots demonstrating success/failure cases, and a completed checklist.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 83.33%, which meets the required threshold of 80.00%.
  • Merge Conflict Detection: ✅ Passed. No merge conflicts detected when merging into main.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


carlosgjs and others added 7 commits February 10, 2026 08:50
* fix: update date picker version and tweak layout logic

* feat: set start month based on selected date
* merge

* fix: Properly handle async job state with celery tasks

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Delete implemented plan

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* merge

* feat: PSv2 - Queue/redis clean-up upon job completion

* fix: catch specific exception

* chore: move tests to a subdir

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
…kLab#1118)

Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. 


This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs.

JobDispatchMode enum (ami/jobs/models.py):

internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs.
sync_api — worker calls an external processing service API synchronously and waits for each response.
async_api — worker publishes items to NATS for external processing service workers to pick up independently.
Database and Model Changes:

Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py.
ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled.
ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset).
API and Filtering:

dispatch_mode is exposed (read-only) in job list and detail serializers.
Filterable via query parameter: ?dispatch_mode=async_api
The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch.
Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping.
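
For orientation, a sketch of what the enum and field described above likely look like (illustrative; the actual definitions in ami/jobs/models.py may differ in details such as field length):

```python
from django.db import models

class JobDispatchMode(models.TextChoices):
    INTERNAL = "internal", "Internal"     # handled by a Celery worker, no external calls
    SYNC_API = "sync_api", "Sync API"     # worker calls the processing service and waits
    ASYNC_API = "async_api", "Async API"  # items published to NATS for external workers

class Job(models.Model):
    dispatch_mode = models.CharField(
        max_length=32,                    # length is an assumption
        choices=JobDispatchMode.choices,
        default=JobDispatchMode.INTERNAL,
    )
```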

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
…dler (RolnickLab#1125)

* refactor: use is_complete() and dispatch_mode in job progress handler

Replace hardcoded `stage == "results"` check with `job.progress.is_complete()`
which verifies ALL stages are done, making it work for any job type.

Replace feature flag check in cleanup with `dispatch_mode == ASYNC_API`
which is immutable for the job's lifetime and more correct than re-reading
a mutable flag that could change between job creation and completion.

Co-Authored-By: Claude <noreply@anthropic.com>

* test: update cleanup tests for is_complete() and dispatch_mode checks

Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard.
Complete all stages (collect, process, results) in the completion test
since is_complete() correctly requires all stages to be done.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
@carlosgjs carlosgjs changed the title Carlos/trackcounts PSv2: Track and display image count progress and state Feb 11, 2026
@carlosgjs carlosgjs requested a review from mihow February 11, 2026 00:51
@carlosgjs carlosgjs marked this pull request as ready for review February 11, 2026 00:55
Copilot AI review requested due to automatic review settings February 11, 2026 00:55
Contributor

Copilot AI left a comment


Pull request overview

Adds richer progress tracking for async ML image processing jobs (detections/classifications/captures/failed) and uses failure ratio to determine final job state, bringing async behavior closer to the existing sync path.

Changes:

  • Extend TaskStateManager/TaskProgress to track cumulative detections/classifications/captures and a unique failed-image set in Redis.
  • Update async result handling (process_nats_pipeline_result) to record failed images and per-result counts, and to propagate these into job progress updates (including final success/failure state).
  • Expand TaskStateManager unit tests to validate the new progress fields and cleanup behavior (partial coverage).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

  • ami/ml/orchestration/task_state.py: Adds Redis-backed cumulative counters + failed-image tracking and surfaces them via TaskProgress.
  • ami/jobs/tasks.py: Plumbs new progress metrics through async pipeline result processing and modifies job completion state handling.
  • ami/ml/tests.py: Adds/updates tests for new progress fields and failed-image tracking.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@ami/jobs/tasks.py`:
- Around line 92-95: complete_state currently computed once using
FAILURE_THRESHOLD, JobState and progress_info and then reused later for the
"results" stage, which can be stale; before calling _update_job_progress for the
"results" stage (and before the state_manager.update_state that sets results to
100%), re-read the latest progress counts (e.g., fetch updated
progress_info.failed and progress_info.total from the state manager or from the
second state_manager.update_state response) and recompute complete_state =
JobState.FAILURE if (failed/total) >= FAILURE_THRESHOLD else JobState.SUCCESS so
the final job.status reflects the most recent state.
- Around line 92-95: The code currently divides progress_info.failed by
progress_info.total without guarding total==0, causing ZeroDivisionError; update
the logic around FAILURE_THRESHOLD, complete_state and JobState so that if
progress_info.total == 0 you short-circuit (set complete_state =
JobState.SUCCESS) and only compute (progress_info.failed / progress_info.total)
>= FAILURE_THRESHOLD when total > 0, mirroring the guard used in
_get_progress/task_state.py.
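
Putting the two fixes together, the guarded recomputation would look roughly like this (illustrative sketch; names follow the comments above, using the strict > comparison adopted later in this PR):

```python
from ami.jobs.models import JobState  # JobState lives in ami.jobs.models per this PR

def derive_complete_state(progress_info, failure_threshold):
    # Short-circuit when no images were expected at all (avoids ZeroDivisionError),
    # otherwise compare the failure ratio against the threshold.
    if progress_info.total == 0:
        return JobState.SUCCESS
    if (progress_info.failed / progress_info.total) > failure_threshold:
        return JobState.FAILURE
    return JobState.SUCCESS
```

Recomputing this from a fresh progress snapshot just before the final "results" stage update also addresses the staleness concern in the first comment.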
🧹 Nitpick comments (4)
ami/jobs/tasks.py (2)

133-140: Redundant if pipeline_result checks inside if pipeline_result: block.

Lines 134 and 137 guard with if pipeline_result else 0, but this code is already inside the if pipeline_result: block at line 123. The checks are dead branches.

♻️ Simplification
             # Calculate detection and classification counts from this result
-            detections_count = len(pipeline_result.detections) if pipeline_result else 0
-            classifications_count = (
-                sum(len(detection.classifications) for detection in pipeline_result.detections)
-                if pipeline_result
-                else 0
-            )
+            detections_count = len(pipeline_result.detections)
+            classifications_count = sum(len(detection.classifications) for detection in pipeline_result.detections)
             captures_count = len(pipeline_result.source_images)

195-197: complete_state typed as Any — prefer a narrower type.

Using Any weakens type safety. Since this always receives a JobState value, type it accordingly.

♻️ Tighten the type
+from ami.jobs.models import JobState  # if not already imported at module level
+
 def _update_job_progress(
-    job_id: int, stage: str, progress_percentage: float, complete_state: Any, **state_params
+    job_id: int, stage: str, progress_percentage: float, complete_state: "JobState", **state_params
 ) -> None:

Since JobState is imported locally to avoid circular imports, you can use a string literal ("JobState") or TYPE_CHECKING guard for the annotation.

ami/ml/orchestration/task_state.py (2)

146-169: Read-modify-write on counters is safe only under the lock — consider a comment.

The cumulative counting (lines 148–158) and failed-set union (lines 162–164) use non-atomic cache.get + cache.set. This is safe because the caller update_state holds a per-job lock, but _get_progress is also called directly in tests. A brief inline comment noting the lock invariant would help future maintainers.

📝 Suggested comment
+        # NOTE: The read-modify-write below is safe because callers must hold
+        # the per-job lock (acquired in update_state). Do not call _get_progress
+        # without the lock in production code.
+
         # Update cumulative detection, classification, and capture counts
         current_detections = cache.get(self._detections_key, 0)

139-141: O(n) filtering of pending images on every update.

remaining_images is computed with a list comprehension iterating all pending images (line 139), which grows as job size grows. For large jobs (tens of thousands of images), this linear scan on every progress update could become a bottleneck. A Redis set with SREM would make removals O(1) per image, but this may be acceptable for current workloads.
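
For comparison, a sketch of the set-based alternative suggested here, using redis-py directly (illustrative only; the current implementation goes through the Django cache, and the key format below is made up):

```python
import redis

r = redis.Redis()  # connection details are deployment-specific

def init_pending(job_id: int, image_ids: list[str]) -> None:
    # Register all image IDs once when the job is queued.
    r.sadd(f"job:{job_id}:pending", *image_ids)

def mark_processed(job_id: int, image_ids: list[str]) -> int:
    """Remove processed IDs (O(1) each via SREM) and return how many remain pending."""
    key = f"job:{job_id}:pending"
    if image_ids:
        r.srem(key, *image_ids)
    return r.scard(key)
```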

Contributor

@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ami/jobs/tasks.py (1)

124-183: ⚠️ Potential issue | 🔴 Critical

self.retry() at line 163 will be swallowed by the broad except Exception at line 180.

celery.exceptions.Retry is a subclass of Exception. When state_manager.update_state returns None and self.retry(...) is raised at line 163, it's caught by line 180, logged as an error, and silently discarded — the task will never actually retry.

Either narrow the except clause or re-raise Retry:

Proposed fix (option A — let Retry propagate)
+    from celery.exceptions import Retry
+
     try:
         # Save to database (this is the slow operation)
         ...
-    except Exception as e:
+    except Retry:
+        raise
+    except Exception as e:
         job.logger.error(
             f"Failed to process pipeline result for job {job_id}: {e}. NATS will redeliver the task message."
         )
🧹 Nitpick comments (4)
ami/ml/tests.py (2)

883-886: Consider also asserting captures count in the init helper.

_init_and_verify checks detections, classifications, and failed but omits captures. While captures is tested in test_cumulative_counting, adding it here keeps the initial-state verification comprehensive and consistent.

Proposed fix
         self.assertEqual(progress.detections, 0)
         self.assertEqual(progress.classifications, 0)
         self.assertEqual(progress.failed, 0)
+        self.assertEqual(progress.captures, 0)

1104-1122: cache.get on a Redis set key may return None or an unexpected type depending on the cache backend.

Line 1114 assumes cache.get(self.manager._failed_key) returns a Python set with a len(). This works with Django's locmem cache in tests, and the assertion remains valid as long as _get_progress stores a Python set via cache.get/cache.set. Just be aware this would break if the underlying storage changes (e.g., raw Redis commands instead of the Django cache).

ami/jobs/tasks.py (2)

137-144: Redundant if pipeline_result guards inside an if pipeline_result: block.

Lines 138 and 141 check if pipeline_result again, but they're already inside the if pipeline_result: block starting at line 127.

Simplified
-            detections_count = len(pipeline_result.detections) if pipeline_result else 0
-            classifications_count = (
-                sum(len(detection.classifications) for detection in pipeline_result.detections)
-                if pipeline_result
-                else 0
-            )
+            detections_count = len(pipeline_result.detections)
+            classifications_count = sum(len(d.classifications) for d in pipeline_result.detections)
             captures_count = len(pipeline_result.source_images)

204-206: Consider using JobState instead of Any for complete_state type hint.

Any obscures the expected type. Since JobState is already imported inside the function body, you could use a string literal "JobState" or import it at module level under TYPE_CHECKING to get a proper type hint without circular import issues.
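
For reference, the standard TYPE_CHECKING pattern that gives the narrower hint without a runtime circular import (sketch):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ami.jobs.models import JobState  # only evaluated by type checkers

def _update_job_progress(
    job_id: int,
    stage: str,
    progress_percentage: float,
    complete_state: "JobState",
    **state_params,
) -> None:
    ...
```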

carlosgjs and others added 6 commits February 11, 2026 12:09
# Conflicts:
#	ami/ml/orchestration/task_state.py
Clarify naming to distinguish mutating vs read-only methods:
- _commit_update(): private, writes mutations to Redis, returns progress
- get_progress(): public, read-only snapshot (added in RolnickLab#1129)
- update_state(): public API, acquires lock, calls _commit_update()

Co-Authored-By: Claude <noreply@anthropic.com>
- Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py
- Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match
  the sync path's boundary behavior at exactly 50%
- Convert TaskProgress from namedtuple to dataclass with defaults,
  so new fields don't break existing callers

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
ami/jobs/tasks.py (2)

159-184: ⚠️ Potential issue | 🔴 Critical

raise self.retry() on line 164 will be swallowed by the except Exception on line 181.

Celery's self.retry() raises celery.exceptions.Retry, which is a subclass of Exception. The broad except Exception at line 181 will catch it, log it as a processing error, and silently discard the retry. This means if the lock isn't acquired for the "results" stage update, the progress will never be updated for that image, and the job may never complete.

Note that the identical pattern at line 95 works correctly because it's outside any try/except block.

🐛 Proposed fix — exclude Retry from the catch-all
+    from celery.exceptions import Retry as CeleryRetry
+
     try:
         # Save to database (this is the slow operation)
         detections_count, classifications_count, captures_count = 0, 0, 0
         ...
+    except CeleryRetry:
+        raise
     except Exception as e:
         job.logger.error(
             f"Failed to process pipeline result for job {job_id}: {e}. NATS will redeliver the task message."
         )

129-184: ⚠️ Potential issue | 🟠 Major

NATS ack at line 147 precedes the results-stage progress update — if the latter fails, the message is lost.

After _ack_task_via_nats succeeds (line 147), any exception in the results-stage update_state or _update_job_progress (lines 150–179) is caught by the except at line 181 without re-raising. The error message claims "NATS will redeliver the task message," but the message has already been acknowledged. This could leave the job stuck with a results stage that never reaches 100%.

Consider either moving the ack after the results-stage progress update, or re-raising the exception so Celery retries the task (NATS redelivery would then handle it since ack didn't happen).

🧹 Nitpick comments (1)
ami/ml/orchestration/task_state.py (1)

144-208: Cumulative counters are shared across stages — works but is implicit.

The detection/classification/capture counters use a single set of Redis keys (not per-stage), so the "process" stage always contributes 0 and the "results" stage contributes actual counts. This works correctly given the current call sites in tasks.py, but it's fragile if a future caller passes non-zero counts for the "process" stage — they'd be silently accumulated.

A brief comment noting that counters are only expected to be incremented during the "results" stage would help future maintainers.

mihow and others added 2 commits February 12, 2026 16:57
Clarify that this dataclass tracks job-level progress in Redis,
not individual task/image progress. Aligns with the naming of
JobProgress (the Django/Pydantic model equivalent).

Co-Authored-By: Claude <noreply@anthropic.com>
Mark connection handling as done (PR RolnickLab#1130), add worktree/remote
mapping and docker testing notes for future sessions.

Co-Authored-By: Claude <noreply@anthropic.com>
Collaborator

@mihow mihow left a comment


@carlosgjs what do you think about using the existing Job model to track the progress & failure counts directly? instead of adding counts in Redis then copying to the Job model. Also I am a bit confused about what the "Tasks" are in Redis now, since they have counts for the Job's total captures, failures, etc. Or are those counts for a single batch? The current job model counts the progress & failures per stage type, so there isn't a single count for all failures. Or if there is a reason to use Redis I am open to that! But maybe we can update the naming & docstrings.

Also, I am thinking we should simplify the logic determining if a Job is in the FAILURE state. Let's just show the counts. Really we need a new state like "COMPLETED" instead of Celery's SUCCESS & FAILURE states. "Completed with errors". Then we can remove a number of checks related to the stage status & overall status.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
ami/ml/orchestration/jobs.py (1)

15-21: ⚠️ Potential issue | 🟡 Minor

Stale docstring reference to TaskStateManager.

Line 19 still says "Redis state (via TaskStateManager.cleanup)" but the class has been renamed to AsyncJobStateManager.

📝 Proposed fix
-    1. Redis state (via TaskStateManager.cleanup):
+    1. Redis state (via AsyncJobStateManager.cleanup):
ami/ml/tests.py (1)

860-861: ⚠️ Potential issue | 🟡 Minor

Stale class docstring.

Line 861 says "Test TaskStateManager" but the class under test is now AsyncJobStateManager.

📝 Proposed fix
 class TestTaskStateManager(TestCase):
-    """Test TaskStateManager for job progress tracking."""
+    """Test AsyncJobStateManager for job progress tracking."""
ami/jobs/test_tasks.py (1)

89-93: ⚠️ Potential issue | 🟡 Minor

Stale docstring reference.

Line 92 still says "Assert TaskStateManager state is correct" — should reference AsyncJobStateManager.

📝 Proposed fix
     def _assert_progress_updated(
         self, job_id: int, expected_processed: int, expected_total: int, stage: str = "process"
     ):
-        """Assert TaskStateManager state is correct."""
+        """Assert AsyncJobStateManager state is correct."""
🤖 Fix all issues with AI agents
In `@docs/claude/nats-todo.md`:
- Around line 132-159: Fix three minor typos in the notes: replace the word
"prevous" with "previous", change "hbs" to "has", and correct "SLUM" to "SLURM"
in the nats-todo notes (search for the exact tokens "prevous", "hbs", and "SLUM"
to locate the spots to update).
- Around line 86-93: The doc exposes an internal IP (192.168.123.176) in the
NATS monitoring instructions; update the text to avoid leaking internal network
addresses by replacing the IP with the hostname (e.g., ami-redis-1) or a
placeholder like <internal-host> and/or remove the IP entirely, and ensure the
example line that reads "http://192.168.123.176:8222" is changed to use the
hostname or placeholder so the SSH tunnel instructions and the dashboard server
URL examples remain accurate and non-sensitive.
- Around line 10-16: The doc contains sensitive infrastructure topology
(hostnames like ami-cc/ami-worker-2 and SSH commands) that should be removed;
replace those concrete hostnames/SSH commands with a brief, non-sensitive
operational note and/or move detailed steps to the private ops repo or wiki, and
redact the specific commands from the document while keeping the relevant config
detail (the NATS_URL change from the old default to nats://nats:4222) documented
generically; search for occurrences of NATS_URL and the nats://nats:4222 default
in the docs entry and replace the explicit SSH/host examples with a short,
environment-agnostic remediation note pointing to the private ops location.

In `@docs/claude/planning/pr-trackcounts-next-session.md`:
- Around line 39-51: Update the stale references in the document: replace any
mention of the old class name TaskStateManager with AsyncJobStateManager and
update the file reference ami/ml/orchestration/task_state.py to
ami/ml/orchestration/async_job_state.py (also check nearby mentions of
TaskStateManager/ task_state.py and adjust them to AsyncJobStateManager/
async_job_state.py so all references match the renamed class and file).
🧹 Nitpick comments (1)
ami/ml/orchestration/async_job_state.py (1)

144-208: _commit_update relies on the caller holding the lock — document and enforce this.

The read-modify-write cycle on counters (lines 171–181) and the failed set (lines 184–190) is safe only because update_state serializes access via the Redis lock. However, _commit_update is callable directly (and tests do so), and has no guard of its own.

The docstring on line 154 says "Must be called under lock," which is good. Consider raising or asserting if the lock is not held, or at minimum note in the docstring that direct calls outside tests are unsafe.
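
One lightweight way to make that invariant explicit (a sketch assuming a django-redis style cache.lock(); the real lock key, timeout, and signatures in async_job_state.py may differ):

```python
from django.core.cache import cache

class AsyncJobStateManager:
    def __init__(self, job_id: int):
        self.job_id = job_id
        self._lock_key = f"job:{job_id}:state_lock"   # illustrative key name

    def update_state(self, processed_image_ids, stage, request_id, failed_image_ids=None):
        # Public API: serialize all mutations behind the per-job lock.
        with cache.lock(self._lock_key, timeout=60):  # assumes django-redis's lock helper
            return self._commit_update(
                processed_image_ids, stage, request_id, failed_image_ids, _lock_held=True
            )

    def _commit_update(self, processed_image_ids, stage, request_id,
                       failed_image_ids=None, _lock_held=False):
        if not _lock_held:
            raise RuntimeError("_commit_update must be called while holding the per-job lock")
        # ...the read-modify-write of counters and the failed-image set happens here.
```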

Contributor

@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
ami/ml/orchestration/async_job_state.py (1)

69-86: ⚠️ Potential issue | 🟡 Minor

Stale docstring: references non-existent parameters.

The docstring for update_state documents detections_count, classifications_count, and captures_count parameters (Lines 83–85) that don't exist in the method signature. These appear to be leftovers from an earlier iteration.

✏️ Proposed fix
         Args:
             processed_image_ids: Set of image IDs that have just been processed
             stage: The processing stage ("process" or "results")
             request_id: Unique identifier for this processing request
-            detections_count: Number of detections to add to cumulative count
-            classifications_count: Number of classifications to add to cumulative count
-            captures_count: Number of captures to add to cumulative count
             failed_image_ids: Set of image IDs that failed processing (optional)
ami/jobs/tasks.py (1)

129-181: ⚠️ Potential issue | 🟠 Major

If an error occurs after the NATS ACK (Line 147), the results-stage progress will never be updated, potentially leaving the job stuck.

The except on Line 178 catches all exceptions and logs them, but doesn't re-raise. Since the NATS message was already acknowledged, it won't be redelivered. If state_manager.update_state (Line 150) or _update_job_progress (Line 168) fails, the "results" stage will never reach 100% and the job will remain incomplete.

Also, the error message on Line 180 says "NATS will redeliver the task message," which is incorrect for failures after the ACK.

Consider either:

  • Moving the ACK after the results-stage update, or
  • Re-raising after logging so the Celery task can be retried.
🧹 Nitpick comments (2)
ami/ml/tests.py (2)

860-861: Stale class name in test class docstring.

The docstring still references TaskStateManager but the class was renamed to AsyncJobStateManager.

 class TestTaskStateManager(TestCase):
-    """Test TaskStateManager for job progress tracking."""
+    """Test AsyncJobStateManager for job progress tracking."""

1018-1036: Direct access to _failed_key is fine for whitebox testing, but cache.get on Line 1028 could return None if the test setup changes.

Currently this is safe because the preceding _commit_update call guarantees the key exists. Just be aware this is a fragile coupling—if _commit_update's caching behavior changes, len(failed_set) on Line 1029 would raise TypeError.

@mihow
Collaborator

mihow commented Feb 17, 2026

@carlosgjs what do you think about using the existing Job model to track the progress & failure counts directly? instead of adding counts in Redis then copying to the Job model. Also I am a bit confused about what the "Tasks" are in Redis now, since they have counts for the Job's total captures, failures, etc. Or are those counts for a single batch? The current job model counts the progress & failures per stage type, so there isn't a single count for all failures. Or if there is a reason to use Redis I am open to that! But maybe we can update the naming & docstrings.

Also, I am thinking we should simplify the logic determining if a Job is in the FAILURE state. Let's just show the counts. Really we need a new state like "COMPLETED" instead of Celery's SUCCESS & FAILURE states. "Completed with errors". Then we can remove a number of checks related to the stage status & overall status.

I added a ticket for the COMPLETED status here #1134

Responses from Carlos about keeping the counts in Redis:

Mostly to keep all the progress tracked in a uniform way. I agree we could use the JobProgressStageDetail and key-value pairs in there to store some (e.g. classifications/detections/captures). However, this means that some values would be tracked in the job directly while others are tracked in Redis, making the code more complicated. Some metrics like processed and failed need to be computed as sets due to potential retries, hence the need for Redis.
Carlos [9:04 AM]

I've renamed the state tracking class to AsyncJobStateManager and moved the tracking of classifications/detections/captures to the job. Good call, it did require a bit of special casing but I think it's simpler overall.
[9:05 AM] Tested with a 100 image job in my local env (edited)
[9:09 AM] Oh wait, I need to fix one thing. When the jobs are retried the counts need to be reset to 0, which wasn't needed with Redis.
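
Carlos's point above about needing sets for the processed/failed counts boils down to idempotent updates under retries; a tiny illustration:

```python
# Redelivery of the same image (NATS redeliver / Celery retry) must not inflate
# progress: adding an ID to a set a second time is a no-op.
processed: set[str] = set()
processed.update({"img-1", "img-2"})
processed.update({"img-2"})   # retried image; count unchanged
assert len(processed) == 2
```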

mihow added a commit that referenced this pull request Feb 17, 2026
mihow and others added 2 commits February 16, 2026 21:09
Co-Authored-By: Claude <noreply@anthropic.com>
Explain the relationship between AsyncJobStateManager (Redis),
JobProgress (JSONB), and JobState (enum). Clarify that all counts
in JobStateProgress refer to source images (captures).

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow merged commit 3d654e6 into RolnickLab:main Feb 17, 2026
7 checks passed
carlosgjs added a commit to uw-ssec/antenna that referenced this pull request Feb 18, 2026
* merge

* Update ML job counts in async case

* Update date picker version and tweak layout logic (RolnickLab#1105)

* fix: update date picker version and tweak layout logic

* feat: set start month based on selected date

* fix: Properly handle async job state with celery tasks  (RolnickLab#1114)

* merge

* fix: Properly handle async job state with celery tasks

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Delete implemented plan

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* PSv2: Implement queue clean-up upon job completion (RolnickLab#1113)

* merge

* feat: PSv2 - Queue/redis clean-up upon job completion

* fix: catch specific exception

* chore: move tests to a subdir

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>

* fix: PSv2: Workers should not try to fetch tasks from v1 jobs (RolnickLab#1118)

Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. 


This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs.

JobDispatchMode enum (ami/jobs/models.py):

internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs.
sync_api — worker calls an external processing service API synchronously and waits for each response.
async_api — worker publishes items to NATS for external processing service workers to pick up independently.
Database and Model Changes:

Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py.
ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled.
ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset).
API and Filtering:

dispatch_mode is exposed (read-only) in job list and detail serializers.
Filterable via query parameter: ?dispatch_mode=async_api
The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch.
Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping.

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>

* PSv2 cleanup: use is_complete() and dispatch_mode in job progress handler (RolnickLab#1125)

* refactor: use is_complete() and dispatch_mode in job progress handler

Replace hardcoded `stage == "results"` check with `job.progress.is_complete()`
which verifies ALL stages are done, making it work for any job type.

Replace feature flag check in cleanup with `dispatch_mode == ASYNC_API`
which is immutable for the job's lifetime and more correct than re-reading
a mutable flag that could change between job creation and completion.

Co-Authored-By: Claude <noreply@anthropic.com>

* test: update cleanup tests for is_complete() and dispatch_mode checks

Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard.
Complete all stages (collect, process, results) in the completion test
since is_complete() correctly requires all stages to be done.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

* track captures and failures

* Update tests, CR feedback, log error images

* CR feedback

* fix type checking

* refactor: rename _get_progress to _commit_update in TaskStateManager

Clarify naming to distinguish mutating vs read-only methods:
- _commit_update(): private, writes mutations to Redis, returns progress
- get_progress(): public, read-only snapshot (added in RolnickLab#1129)
- update_state(): public API, acquires lock, calls _commit_update()

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: unify FAILURE_THRESHOLD and convert TaskProgress to dataclass

- Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py
- Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match
  the sync path's boundary behavior at exactly 50%
- Convert TaskProgress from namedtuple to dataclass with defaults,
  so new fields don't break existing callers

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: rename TaskProgress to JobStateProgress

Clarify that this dataclass tracks job-level progress in Redis,
not individual task/image progress. Aligns with the naming of
JobProgress (the Django/Pydantic model equivalent).

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: update NATS todo and planning docs with session learnings

Mark connection handling as done (PR RolnickLab#1130), add worktree/remote
mapping and docker testing notes for future sessions.

Co-Authored-By: Claude <noreply@anthropic.com>

* Rename TaskStateManager to AsyncJobStateManager

* Track results counts in the job itself vs Redis

* small simplification

* Reset counts to 0 on reset

* chore: remove local planning docs from PR branch

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: clarify three-layer job state architecture in docstrings

Explain the relationship between AsyncJobStateManager (Redis),
JobProgress (JSONB), and JobState (enum). Clarify that all counts
in JobStateProgress refer to source images (captures).

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Anna Viklund <annamariaviklund@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>