
PSv2: Use connection pooling and retries for NATS #1130

Open
carlosgjs wants to merge 21 commits into RolnickLab:main from uw-ssec:carlosg/natsconn

Conversation

@carlosgjs
Collaborator

@carlosgjs carlosgjs commented Feb 11, 2026

Summary

This pull request improves NATS JetStream task queue management for better connection reliability, efficiency, and error handling. The main change is a process-local NATS connection pool, which replaces the previous pattern of creating and closing a connection for every operation. All NATS operations now use retry logic with exponential backoff. The context manager pattern for TaskQueueManager is removed, all methods use the shared connection pool, and several methods are decorated to automatically retry on connection errors.
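
As a rough sketch of the retry shape (illustrative only; the actual decorator in ami/ml/orchestration/nats_queue.py may differ in its exception set, defaults, and connection-reset handling):

import asyncio
import functools
import logging

from nats import errors as nats_errors

logger = logging.getLogger(__name__)


def retry_on_connection_error(max_retries: int = 2, backoff_seconds: float = 0.5):
    """Retry an async NATS operation on connection errors with exponential backoff (sketch)."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except (nats_errors.ConnectionClosedError, nats_errors.NoServersError, OSError) as exc:
                    if attempt == max_retries:
                        raise
                    delay = backoff_seconds * (2**attempt)
                    logger.warning("NATS operation failed (%s); retrying in %.1fs", exc, delay)
                    # The version in this PR also resets the pooled connection here
                    # so the next attempt reconnects with a fresh client.
                    await asyncio.sleep(delay)

        return wrapper

    return decorator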

Testing

Tested locally with multiple runs of 100 images, verifying that tasks are acknowledged and NATS resources are cleaned up.

Checklist

  • [x] I have tested these changes appropriately.
  • [x] I have added and/or modified relevant tests.
  • [x] I updated relevant documentation or comments.
  • [x] I have verified that this PR follows the project's coding standards.
  • [x] Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • Added automatic retry with backoff for transient NATS connection errors.
  • Refactor

    • Moved to a process-local, event-loop-bound pooled NATS connection model and lifecycle-managed access for queue operations.
    • Switched callers from context-manager usage to direct manager instantiation.
  • Bug Fixes

    • Improved handling of missing stream/consumer and timeout/no-message scenarios; best-effort cleanup with per-operation retries.
  • Tests

    • Expanded tests for connection pool lifecycle, retry behavior, ack/cleanup flows, and error handling.
  • Documentation

    • Updated test/run command docs to use the CI docker-compose file.

@netlify

netlify bot commented Feb 11, 2026

Deploy Preview for antenna-ssec canceled.

🔨 Latest commit: c7b2014
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-ssec/deploys/6993bde5a082900008382202

@netlify

netlify bot commented Feb 11, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: c7b2014
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/6993bde5eb6c4f00089f084a

@coderabbitai
Contributor

coderabbitai bot commented Feb 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Introduces a per-event-loop NATS ConnectionPool and retry decorator, refactors TaskQueueManager to use pool-provided (nc, js) per operation, and updates publish/reserve/ack/delete/cleanup flows and tests to use lifecycle-managed connections with retry/backoff.
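
A rough sketch of the per-event-loop pooling idea (illustrative only; the module names, locking, and helper shape are assumptions, not the PR's implementation):

import asyncio
import weakref

import nats

# One (nc, js) pair per running event loop; entries vanish when the loop is garbage collected.
_pools = weakref.WeakKeyDictionary()
_locks = weakref.WeakKeyDictionary()


async def get_connection(nats_url: str):
    """Return the (nc, js) pair bound to the current event loop, connecting lazily."""
    loop = asyncio.get_running_loop()
    lock = _locks.setdefault(loop, asyncio.Lock())
    async with lock:
        cached = _pools.get(loop)
        if cached is not None and not cached[0].is_closed and cached[0].is_connected:
            return cached
        nc = await nats.connect(nats_url)  # lazy connect on first use per loop
        js = nc.jetstream()
        _pools[loop] = (nc, js)
        return nc, js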

Changes

  • Connection pool & API (ami/ml/orchestration/nats_connection.py): Adds event-loop-scoped ConnectionPool and ContextManagerConnection, plus module-level get_connection() and reset_connection() to provide/reset (nc, js) with lazy locking and reconnection logic.
  • Task queue, retries & pool integration (ami/ml/orchestration/nats_queue.py): Adds retry_on_connection_error decorator; refactors TaskQueueManager to use _get_connection() (pool-backed) for publish/reserve/ack/delete/cleanup; handles NotFoundError/TimeoutError, per-operation retries and backoff.
  • NATS tests (ami/ml/orchestration/tests/test_nats_queue.py, ami/ml/orchestration/tests/test_nats_connection.py): Reworks NATS mocks to a context-managed pool setup, adds tests for retry/reset behavior and comprehensive ConnectionPool lifecycle (create, reuse, reconnect, close, reset) and module-level functions.
  • Cleanup tests update (ami/ml/orchestration/tests/test_cleanup.py): Adjusts checks to obtain the JetStream context via TaskQueueManager._get_connection() and use the returned js for stream/consumer verifications.
  • Call-site lifecycle changes (ami/jobs/tasks.py, ami/jobs/views.py, ami/ml/orchestration/jobs.py): Replaces several async with TaskQueueManager() usages with plain TaskQueueManager() instantiation and direct method calls, shifting resource-management semantics away from context-manager teardown.
  • Docs / CI command tweak (.agents/AGENTS.md): Updates CI/test docker-compose invocation examples to use docker-compose.ci.yml.

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant TQM as TaskQueueManager
    participant Retry as RetryDecorator
    participant Pool as ConnectionPool
    participant NATS as NATS Client
    participant JS as JetStream

    App->>TQM: publish_task(job_id, task)
    activate TQM
    TQM->>Retry: wrapped call
    activate Retry
    Retry->>TQM: attempt
    TQM->>Pool: _get_connection()
    activate Pool
    Pool->>NATS: ensure/connect (nc)
    Pool->>JS: provide JetStreamContext (js)
    Pool-->>TQM: (nc, js)
    deactivate Pool
    TQM->>JS: publish(stream, data)
    JS-->>TQM: ack / error
    alt Connection error
        TQM-->>Retry: raise connection error
        Retry->>Pool: reset_connection()
        Retry->>Retry: backoff wait
        Retry->>TQM: retry attempt (up to max)
    end
    Retry-->>App: result
    deactivate Retry
    deactivate TQM

    App->>TQM: reserve_task(job_id, timeout)
    activate TQM
    TQM->>Retry: wrapped call
    activate Retry
    Retry->>TQM: attempt
    TQM->>Pool: _get_connection()
    Pool-->>TQM: (nc, js)
    TQM->>JS: pull_subscribe + fetch(timeout)
    alt Message received
        JS-->>TQM: message -> PipelineProcessingTask
        Retry-->>App: task
    else Timeout
        JS-->>TQM: TimeoutError
        Retry-->>App: None
    end
    deactivate Retry
    deactivate TQM

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

backend, ml

Poem

🐇 I hopped into the pool today,

Loops kept snug where connections stay,
Retries nudge with gentle beat,
Backoff twitches, then we meet,
Streams and ACKs — a tidy play.

🚥 Pre-merge checks: ✅ 4 passed

  • Title check: ✅ Passed. The title clearly and specifically describes the main change: introducing connection pooling and retries for NATS operations, which is the primary focus of this PR.
  • Description check: ✅ Passed. The description covers the summary, main changes, testing performed, and checklist; however, it lacks a 'Related Issues' section, a 'Detailed Description' section with potential side effects/risks, explicit 'How to Test' instructions, and a 'Deployment Notes' section.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 82.81%, which is sufficient; the required threshold is 80.00%.
  • Merge Conflict Detection: ✅ Passed. No merge conflicts detected when merging into main.


@carlosgjs carlosgjs requested a review from mihow February 11, 2026 18:13
@carlosgjs carlosgjs marked this pull request as ready for review February 11, 2026 18:21
Copilot AI review requested due to automatic review settings February 11, 2026 18:21
Contributor

Copilot AI left a comment


Pull request overview

This PR refactors NATS JetStream task-queue interactions to reuse a shared (process-local) connection and to add retry/backoff behavior for improved reliability and reduced connection churn.

Changes:

  • Introduces a new ConnectionPool module to lazily create and reuse a NATS + JetStream connection.
  • Updates TaskQueueManager to always obtain connections from the pool, removes async context-manager lifecycle, and adds a retry decorator for connection-related failures.
  • Updates call sites and unit tests to match the new non-context-manager usage.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.

Summary per file

  • ami/ml/orchestration/tests/test_nats_queue.py: Refactors tests to mock the new connection pool and remove context-manager assumptions.
  • ami/ml/orchestration/nats_queue.py: Adds retry decorator, removes per-operation connection creation, and routes all operations through the shared pool.
  • ami/ml/orchestration/nats_connection_pool.py: New module implementing a process-local cached NATS connection + JetStream context.
  • ami/ml/orchestration/jobs.py: Removes async with TaskQueueManager() usage in orchestration job helpers.
  • ami/jobs/views.py: Removes async with TaskQueueManager() usage in the tasks endpoint.
  • ami/jobs/tasks.py: Removes async with TaskQueueManager() usage when ACKing tasks from Celery.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@ami/ml/orchestration/nats_connection_pool.py`:
- Around line 104-112: get_pool() can race when multiple threads call it
concurrently; make initialization of the module-global _connection_pool
thread-safe by introducing a module-level threading.Lock (e.g.,
_connection_pool_lock) and use a double-checked locking pattern inside
get_pool(): check _connection_pool, acquire _connection_pool_lock, check again,
and only then instantiate ConnectionPool and assign to _connection_pool; ensure
you import threading and keep the global _connection_pool and lock declarations
at module scope so get_pool() uses them.
- Around line 88-97: The reset() method currently nulls out self._nc and
self._js and leaks an active NATS connection; modify reset (or add an async
reset_async) to attempt to close/drain the existing connection before clearing
references: if self._nc exists, call its close/drain routine (await if reset
becomes async, or schedule with asyncio.create_task(self._nc.close()) if keeping
reset synchronous) wrapped in try/except to swallow errors, then set self._nc =
None and self._js = None; update any callers (including the retry decorator) to
call the async version or rely on the scheduled background close so the old TCP
socket is not leaked.

In `@ami/ml/orchestration/nats_queue.py`:
- Line 318: The log message contains a missing space between "job" and the job
id; update the logger.info call that logs stream deletion (the line using
logger.info with f"Deleted stream {stream_name} for job'{job_id}'") to insert a
space so it reads f"Deleted stream {stream_name} for job '{job_id}'", leaving
the surrounding code unchanged.
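
For the first item above, a double-checked locking sketch (illustrative; assumes the module-level ConnectionPool class and _connection_pool global described in the comment):

import threading

_connection_pool = None
_connection_pool_lock = threading.Lock()


def get_pool():
    """Lazily create the module-global pool in a thread-safe way (double-checked locking)."""
    global _connection_pool
    if _connection_pool is None:  # fast path without the lock
        with _connection_pool_lock:
            if _connection_pool is None:  # re-check under the lock
                _connection_pool = ConnectionPool()  # ConnectionPool is assumed to be defined in this module
    return _connection_pool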
🧹 Nitpick comments (6)
ami/ml/orchestration/jobs.py (1)

107-109: Use logger.exception to preserve the stack trace.

When a publish fails, the traceback is valuable for diagnosing whether it's a connection issue that exhausted retries, a serialization bug, etc.

Proposed fix
             except Exception as e:
-                logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
+                logger.exception(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
                 success = False
ami/ml/orchestration/tests/test_nats_queue.py (2)

88-100: Consider adding a test for the TimeoutError path in reserve_task.

This test mocks fetch to return [], which exercises the if msgs: falsy branch. However, in practice, NATS's fetch() raises nats_errors.TimeoutError when no messages are available (handled at Line 243 of nats_queue.py). Consider adding a test case where fetch raises TimeoutError to cover that code path as well.

Additional test case
async def test_reserve_task_timeout(self):
    """Test reserve_task when fetch raises TimeoutError (no messages)."""
    from nats import errors as nats_errors

    with self._mock_nats_setup() as (_, js, _):
        mock_psub = MagicMock()
        mock_psub.fetch = AsyncMock(side_effect=nats_errors.TimeoutError)
        mock_psub.unsubscribe = AsyncMock()
        js.pull_subscribe = AsyncMock(return_value=mock_psub)

        manager = TaskQueueManager()
        task = await manager.reserve_task(123)

        self.assertIsNone(task)
        mock_psub.unsubscribe.assert_called_once()

49-131: No tests for the retry/backoff behavior.

The retry_on_connection_error decorator is a core part of this PR. Consider adding at least one test that verifies a retried operation succeeds on a subsequent attempt after a connection error, and one that verifies the error is raised after exhausting retries. This would validate the most important new behavior introduced.
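
One possible shape for such a test (illustrative; assumes the _mock_nats_setup helper used elsewhere in this file and a hypothetical _make_task() factory):

async def test_publish_task_retries_after_connection_error(self):
    """Sketch: the first publish attempt fails with a connection error, the retry succeeds."""
    from nats import errors as nats_errors

    with self._mock_nats_setup() as (_, js, _):
        js.publish = AsyncMock(side_effect=[nats_errors.ConnectionClosedError, MagicMock()])

        manager = TaskQueueManager()
        result = await manager.publish_task(123, self._make_task())  # _make_task() is a hypothetical helper

        self.assertTrue(result)
        self.assertEqual(js.publish.call_count, 2)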

ami/ml/orchestration/nats_connection_pool.py (1)

50-58: Fast-path check outside the lock is fine for asyncio but worth a comment.

The health check at line 51 and stale-connection cleanup at lines 55-58 happen outside the lock, relying on asyncio's cooperative scheduling (no preemption between awaits). This is correct but non-obvious. A brief inline comment would help future readers.

Suggested comment
         # Fast path: connection exists, is open, and is connected
+        # Safe without lock: no await/yield between check and return (cooperative scheduling)
         if self._nc is not None and not self._nc.is_closed and self._nc.is_connected:
             return self._nc, self._js  # type: ignore
ami/ml/orchestration/nats_queue.py (2)

169-197: Redundant _get_connection() call — js fetched at line 182 is also re-fetched inside _ensure_stream and _ensure_consumer.

publish_task fetches js at line 182, but _ensure_stream (line 124) and _ensure_consumer (line 144) each fetch their own js from the pool internally. The js from line 182 is only used at line 194. This works because the pool returns the same object, but the pattern is slightly misleading — it looks like the connection from line 182 is used throughout when it's not.

Consider either (a) passing js into _ensure_stream/_ensure_consumer, or (b) moving the _get_connection() call to after the ensure calls since that's where it's first needed. Same applies to reserve_task.


199-249: reserve_task: implicit None return when msgs is empty.

If psub.fetch() returns an empty list (rather than raising TimeoutError), the function falls through the if msgs: block and except clause, runs finally, and returns None implicitly. This is technically correct per the return type, but an explicit return None after the if msgs: block would make the intent clearer.

Suggested improvement
             if msgs:
                 msg = msgs[0]
                 task_data = json.loads(msg.data.decode())
                 metadata = msg.metadata
 
                 # Parse the task data into PipelineProcessingTask
                 task = PipelineProcessingTask(**task_data)
                 # Set the reply_subject for acknowledgment
                 task.reply_subject = msg.reply
 
                 logger.debug(f"Reserved task from stream for job '{job_id}', sequence {metadata.sequence.stream}")
                 return task
+            return None
 
         except nats_errors.TimeoutError:

carlosgjs and others added 2 commits February 11, 2026 11:58
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@mihow
Copy link
Collaborator

mihow commented Feb 12, 2026

Hi Carlos, here is a Claude-written comment after I spent some time with it reviewing these changes. Ha.


The retry decorator is a great addition — the nats.py client doesn't buffer operations during reconnection (unlike the Go client), so application-level retry is genuinely needed.

I think we can simplify the connection pooling; it may be unnecessary at our scale after all.

We run ~3 Celery workers on separate VMs. Since async_to_sync() is blocking, each worker only has one NATS connection alive at a time even with the old open/close pattern. The 2000 TCP connections concern from my original feedback was about connect/disconnect cycles, not concurrent connections. The overhead is real but modest — TCP handshake + TLS (if applicable) per operation.

More importantly, the event-loop-keyed WeakKeyDictionary pool adds complexity that's hard to reason about:

  • async_to_sync() (from asgiref/Django) creates ephemeral event loops per call, so the pool entries may not actually be reused between calls — defeating the purpose
  • reset() clears self._lock = None which can race with concurrent _ensure_lock() calls on the same pool instance
  • The pool fights NATS's built-in reconnection: get_connection() discards a client in RECONNECTING state and creates a new one, rather than letting the client finish reconnecting on its own

Suggested simplification

Keep the retry decorator (real value) and drop the pool. The connection scoping is already natural:

  1. queue_images_to_nats — already wraps the entire publish loop in one async_to_sync() call, so one connection covers all publishes. No change needed.
  2. _ack_task_via_nats — one connection per ACK is fine at our scale. The retry decorator handles transient failures.
  3. reserve_task (views.py) — one connection per batch fetch, already scoped correctly.

Concretely, this would mean:

  • Delete nats_connection_pool.py
  • Keep retry_on_connection_error decorator in nats_queue.py
  • Restore the async with TaskQueueManager() context manager (or just create+close within each method), but keep the retry wrapping
  • Optionally: pass reconnected_cb / disconnected_cb to nats.connect() for logging, since the defaults (allow_reconnect=True, 60 attempts, 2s apart) already handle socket-level reconnection

This keeps the retry logic (the real fix) while avoiding the event-loop-keyed pool complexity. What do you think?
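
For the last bullet, a sketch of what the callback wiring could look like (nats-py expects async callbacks; the function and parameter names here are illustrative):

import logging

import nats

logger = logging.getLogger(__name__)


async def _on_disconnected():
    logger.warning("NATS connection lost; client will attempt to reconnect")


async def _on_reconnected():
    logger.info("NATS connection re-established")


async def connect_with_logging(nats_url: str):
    # The defaults (allow_reconnect=True, 60 attempts, 2s apart) already handle
    # socket-level reconnection; the callbacks only add visibility in the logs.
    return await nats.connect(
        nats_url,
        disconnected_cb=_on_disconnected,
        reconnected_cb=_on_reconnected,
    )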

carlosgjs and others added 3 commits February 12, 2026 16:16
…rop pool

Replace the event-loop-keyed WeakKeyDictionary connection pool with a
straightforward async context manager on TaskQueueManager. Each
async_to_sync() call now scopes one connection for its block of
operations (e.g. queue_all_images reuses one connection for all
publishes, _ack_task_via_nats gets one for the single ACK).

The retry decorator is preserved — on connection error it closes the
stale connection so the next _get_connection() creates a fresh one.
Also adds reconnected_cb/disconnected_cb logging callbacks to
nats.connect() and narrows bare except clauses to NotFoundError.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow added a commit to uw-ssec/antenna that referenced this pull request Feb 13, 2026
Mark connection handling as done (PR RolnickLab#1130), add worktree/remote
mapping and docker testing notes for future sessions.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow and others added 3 commits February 13, 2026 12:02
… churn

Reverts c384199 which replaced the event-loop-keyed connection pool with
a plain async context manager. The context manager approach opened and
closed a TCP connection per async block, causing ~1500 connections per
1000-image job (250-500 for task fetches, 1000 for ACKs).

The connection pool keeps one persistent connection per event loop and
reuses it across all TaskQueueManager operations.

Co-Authored-By: Claude <noreply@anthropic.com>
Extract connection pool to a pluggable design with two strategies:
- "pool" (default): persistent connection reuse per event loop
- "per_operation": fresh TCP connection each time, for debugging

Controlled by NATS_CONNECTION_STRATEGY Django setting. Both strategies
implement the same interface (get_connection, reset, close) so
TaskQueueManager is agnostic to which one is active.

Changes:
- Rename nats_connection_pool.py to nats_connection.py
- Rename get_pool() to get_provider()
- Use settings.NATS_URL directly instead of getattr with divergent defaults
- Narrow except clauses in _ensure_stream/_ensure_consumer to NotFoundError
- Add _js guard to fast path, add strategy logging
- Enhanced module and class docstrings

Co-Authored-By: Claude <noreply@anthropic.com>
Remove the switchable strategy pattern (Protocol + factory + Django setting)
and expose the connection pool directly via module-level get_connection() and
reset_connection() functions. The PerOperationConnection is archived as
ContextManagerConnection for reference. Remove NATS_CONNECTION_STRATEGY setting.

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@ami/ml/orchestration/nats_connection.py`:
- Around line 186-192: The except block that catches RuntimeError from
asyncio.get_running_loop() should preserve the original exception chain: capture
the caught exception (e.g., "except RuntimeError as err:") and re-raise the
RuntimeError with your existing message using "raise ... from err" so the
original traceback is linked; update the block inside get_connection()/where
get_running_loop() is called to use "except RuntimeError as err" and "raise
RuntimeError(... ) from err".
- Around line 1-27: The pool keyed by event loop (ConnectionPool /
WeakKeyDictionary) doesn't guarantee reuse when callers use async_to_sync(), so
either confirm your runtime actually reuses the same event loop across
async_to_sync() or switch to a per-call connection strategy: remove/stop using
the loop-keyed pool lookup in ConnectionPool (and associated WeakKeyDictionary),
instead create and close a fresh nats.Client per operation (or default to
ContextManagerConnection) and rely on the existing retry_on_connection_error /
reset_connection logic for resilience; update any helper functions that call
ConnectionPool.create/get to use the per-call connect/close flow and keep
ContextManagerConnection as the simple fallback.

In `@ami/ml/orchestration/nats_queue.py`:
- Around line 253-276: The finally block currently awaits psub.unsubscribe()
which can raise and mask earlier exceptions and also makes the empty-msgs path
implicit; change the flow so that after calling psub.fetch(1, timeout=timeout)
you explicitly return None if msgs is falsy, and move the unsubscribe call into
its own try/except that catches and logs/unwarns errors without suppressing the
original exception (i.e., if an exception is active, call psub.unsubscribe()
inside a nested try/except and re-raise the original exception; if no exception,
safely await unsubscribe and log any unsubscribe errors). Update the block that
constructs PipelineProcessingTask and the handling around
psub.fetch/psub.unsubscribe accordingly so psub.unsubscribe failures do not hide
earlier exceptions.
- Line 217: Replace the deprecated Pydantic v1 call: in the line that builds
task_data (task_data = json.dumps(data.dict())), call data.model_dump() instead
so it uses Pydantic v2; update the expression that creates task_data in
nats_queue.py (the variable task_data and the object data) to use model_dump()
to avoid deprecation warnings and preserve the same serialized structure.
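
For the unsubscribe item above, one possible shape for the cleanup helper (illustrative; the real reserve_task would call it from its finally block):

import logging

logger = logging.getLogger(__name__)


async def _safe_unsubscribe(psub) -> None:
    """Best-effort unsubscribe that never raises, so it cannot mask an in-flight exception."""
    try:
        await psub.unsubscribe()
    except Exception:
        logger.warning("Failed to unsubscribe pull subscription", exc_info=True)

With this helper, the finally block reduces to await _safe_unsubscribe(psub), and the empty-msgs path can return None explicitly right after the fetch.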
🧹 Nitpick comments (4)
ami/ml/orchestration/nats_connection.py (3)

59-68: Race between reset() and _ensure_lock() when self._lock is set to None.

reset() (line 137) sets self._lock = None. If a concurrent coroutine has already passed the self._lock is None check at line 66 but hasn't yet assigned the new lock, reset() could clear the newly-created lock. In practice, within a single-threaded asyncio event loop, cooperative scheduling means there's no true interleaving between lines 66–67. However, _ensure_lock should still be tightened to avoid fragility if reset() ever runs concurrently (e.g., from a callback):

Suggested improvement
     async def reset(self):
         ...
         self._nc = None
         self._js = None
-        self._lock = None  # Clear lock so new one is created for fresh connection
+        self._lock = None  # Will be lazily recreated on next get_connection()

Consider documenting that reset() must only be called from the same event loop that owns this pool (which is already implied by the design).


80-107: Fast-path state mutation outside the lock may confuse future maintainers.

Lines 86–89 set self._nc = None and self._js = None before acquiring the lock. While safe in a cooperative single-threaded asyncio context (the double-check at line 95 handles the reconnection), this pattern is unusual for a lock-protected resource and could lead to bugs if the code is ever adapted for truly concurrent access. Consider moving the clearing inside the lock:

Suggested refactor
-        # Connection is stale or doesn't exist — clear references before reconnecting
-        if self._nc is not None:
-            logger.warning("NATS connection is closed or disconnected, will reconnect")
-            self._nc = None
-            self._js = None
-
         # Slow path: acquire lock to prevent concurrent reconnection attempts
         lock = self._ensure_lock()
         async with lock:
             # Double-check after acquiring lock (another coroutine may have reconnected)
             if self._nc is not None and self._js is not None and not self._nc.is_closed and self._nc.is_connected:
                 return self._nc, self._js
 
+            # Connection is stale or doesn't exist — clear references before reconnecting
+            if self._nc is not None:
+                logger.warning("NATS connection is closed or disconnected, will reconnect")
+            self._nc = None
+            self._js = None
+
             nats_url = settings.NATS_URL

140-175: ContextManagerConnection is dead code — consider removing it.

This class is described as "archived" and "kept as a drop-in fallback," but it is never referenced anywhere in the codebase. Keeping dead code increases the maintenance surface. If it's valuable as documentation, a comment or ADR would serve better.

#!/bin/bash
# Verify ContextManagerConnection is not used anywhere
rg -n "ContextManagerConnection" --type=py
ami/ml/orchestration/nats_queue.py (1)

196-224: _ensure_stream and _ensure_consumer are called on every publish_task invocation.

For a 1000-image job, this results in ~2000 extra stream_info/consumer_info round trips to NATS, even though the stream and consumer only need to be created once. Consider caching the "ensured" status per job_id for the lifetime of the event loop (e.g., a set on the manager or pool), or moving the ensure calls to a separate setup method invoked once before the publish loop.

Sketch
+    _ensured_jobs: set[int] = set()  # class-level or instance-level cache
+
+    async def _ensure_resources(self, job_id: int):
+        if job_id in self._ensured_jobs:
+            return
+        await self._ensure_stream(job_id)
+        await self._ensure_consumer(job_id)
+        self._ensured_jobs.add(job_id)
+
     @retry_on_connection_error(max_retries=2, backoff_seconds=0.5)
     async def publish_task(self, job_id: int, data: PipelineProcessingTask) -> bool:
         _, js = await self._get_connection()
-        await self._ensure_stream(job_id)
-        await self._ensure_consumer(job_id)
+        await self._ensure_resources(job_id)
         ...

mihow and others added 3 commits February 13, 2026 15:13
The docstring previously implied the pool reused connections across all
async_to_sync() calls. In practice, each async_to_sync() creates a new
event loop, so reuse only happens within a single boundary. Updated to
be explicit about where the pool helps (bulk publishes, batch reserves)
and where it doesn't (single-operation calls like ACKs).

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@ami/ml/orchestration/nats_connection.py`:
- Around line 122-142: The reset() method currently sets self._lock = None which
allows another coroutine to create a new lock while reset is awaiting
self._nc.close(), enabling two coroutines to enter the critical "slow path" in
get_connection(); fix this by removing the line that clears the lock and only
reset connection state (keep self._lock intact), i.e., in reset() clear/close
self._nc and self._js but do not assign self._lock = None so _ensure_lock()
continues to use the same event-loop-bound lock instance.
🧹 Nitpick comments (1)
ami/ml/orchestration/nats_connection.py (1)

85-112: Add is_reconnecting check to prevent discarding clients during automatic reconnection.

The fast-path check on line 87 rejects clients in the RECONNECTING state (is_connected=False), causing them to be cleared and forcibly reconnected instead of allowing nats.py's automatic reconnection logic to complete. This creates redundant TCP connections during temporary outages.

The suggested fix is correct and prevents this regression:

Suggested change
-        if self._nc is not None and self._js is not None and not self._nc.is_closed and self._nc.is_connected:
+        if self._nc is not None and self._js is not None and not self._nc.is_closed and (self._nc.is_connected or self._nc.is_reconnecting):
             return self._nc, self._js

Note: The same pattern exists in the double-check at line 100 and should also be updated for consistency.

The _setup_mock_nats helper was configuring TaskQueueManager as an async
context manager (__aenter__/__aexit__), but _ack_task_via_nats uses plain
instantiation. The await on a non-awaitable MagicMock failed silently in
the except clause, causing acknowledge_task assertions to always fail.

Co-Authored-By: Claude <noreply@anthropic.com>
- Use logger.exception instead of logger.error in jobs.py for stack traces
- Add explicit return None in reserve_task for empty message list
- Wrap psub.unsubscribe() in try/except to prevent exception masking
- Add test for TimeoutError path in reserve_task
- Expand module docstring with call paths, concurrency model, and
  design rationale for is_reconnecting and lock clearing decisions

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow
Copy link
Collaborator

mihow commented Feb 17, 2026

from Claude:

Integration testing finding: event loop blocking when NATS is unreachable

While running end-to-end integration tests on the integration/psv2 branch (all 4 PSv2 PRs merged), I hit a case where Django becomes completely unresponsive when NATS is temporarily unreachable (e.g. after a container restart or network blip).

Root cause

nats.connect() in nats_connection.py:106 uses nats-py's default settings:

  • max_reconnect_attempts=60
  • connect_timeout=2s
  • reconnect_time_wait=2s

That's ~240 seconds of retries per connect attempt. Since DRF views use async_to_sync() which routes coroutines onto the uvicorn event loop, this blocks all HTTP request handling for the duration. The retry_on_connection_error decorator then retries the whole operation up to 2 more times, so worst case is ~12 minutes of total blocking.

Fix that unblocked testing

# nats_connection.py line 106
self._nc = await nats.connect(nats_url, allow_reconnect=False, connect_timeout=5)

This makes the initial connect fail fast (~5s) instead of retrying internally. The higher-level retry_on_connection_error decorator still provides retry-with-backoff, so resilience is preserved — just without the nested retry loop.

The same change should be applied to the ContextManagerConnection class at line 167.

The rest of the PR is fine

The connection pool design (per-event-loop keying, lock for concurrent reconnection, WeakKeyDictionary cleanup) is sound. It's just the nats-py client defaults that are too aggressive for use on a shared ASGI event loop.

The other three PSv2 PRs (#1076, #1117, #1121) all passed integration testing — job completed successfully with 20 images, 137 detections, 0 failures, dispatch_mode=async_api.

mihow added a commit that referenced this pull request Feb 17, 2026
mihow added a commit that referenced this pull request Feb 17, 2026
* merge

* Update ML job counts in async case

* Update date picker version and tweak layout logic (#1105)

* fix: update date picker version and tweak layout logic

* feat: set start month based on selected date

* fix: Properly handle async job state with celery tasks  (#1114)

* merge

* fix: Properly handle async job state with celery tasks

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Delete implemented plan

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* PSv2: Implement queue clean-up upon job completion (#1113)

* merge

* feat: PSv2 - Queue/redis clean-up upon job completion

* fix: catch specific exception

* chore: move tests to a subdir

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>

* fix: PSv2: Workers should not try to fetch tasks from v1 jobs (#1118)

Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. 


This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs.

JobDispatchMode enum (ami/jobs/models.py):

internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs.
sync_api — worker calls an external processing service API synchronously and waits for each response.
async_api — worker publishes items to NATS for external processing service workers to pick up independently.
Database and Model Changes:

Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py.
ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled.
ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset).
API and Filtering:

dispatch_mode is exposed (read-only) in job list and detail serializers.
Filterable via query parameter: ?dispatch_mode=async_api
The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch.
Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping.

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>

* PSv2 cleanup: use is_complete() and dispatch_mode in job progress handler (#1125)

* refactor: use is_complete() and dispatch_mode in job progress handler

Replace hardcoded `stage == "results"` check with `job.progress.is_complete()`
which verifies ALL stages are done, making it work for any job type.

Replace feature flag check in cleanup with `dispatch_mode == ASYNC_API`
which is immutable for the job's lifetime and more correct than re-reading
a mutable flag that could change between job creation and completion.

Co-Authored-By: Claude <noreply@anthropic.com>

* test: update cleanup tests for is_complete() and dispatch_mode checks

Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard.
Complete all stages (collect, process, results) in the completion test
since is_complete() correctly requires all stages to be done.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

* track captures and failures

* Update tests, CR feedback, log error images

* CR feedback

* fix type checking

* refactor: rename _get_progress to _commit_update in TaskStateManager

Clarify naming to distinguish mutating vs read-only methods:
- _commit_update(): private, writes mutations to Redis, returns progress
- get_progress(): public, read-only snapshot (added in #1129)
- update_state(): public API, acquires lock, calls _commit_update()

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: unify FAILURE_THRESHOLD and convert TaskProgress to dataclass

- Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py
- Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match
  the sync path's boundary behavior at exactly 50%
- Convert TaskProgress from namedtuple to dataclass with defaults,
  so new fields don't break existing callers

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: rename TaskProgress to JobStateProgress

Clarify that this dataclass tracks job-level progress in Redis,
not individual task/image progress. Aligns with the naming of
JobProgress (the Django/Pydantic model equivalent).

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: update NATS todo and planning docs with session learnings

Mark connection handling as done (PR #1130), add worktree/remote
mapping and docker testing notes for future sessions.

Co-Authored-By: Claude <noreply@anthropic.com>

* Rename TaskStateManager to AsyncJobStateManager

* Track results counts in the job itself vs Redis

* small simplification

* Reset counts to 0 on reset

* chore: remove local planning docs from PR branch

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: clarify three-layer job state architecture in docstrings

Explain the relationship between AsyncJobStateManager (Redis),
JobProgress (JSONB), and JobState (enum). Clarify that all counts
in JobStateProgress refer to source images (captures).

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Anna Viklund <annamariaviklund@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
carlosgjs added a commit to uw-ssec/antenna that referenced this pull request Feb 18, 2026