feat(BA-5578): refactor ValkeyClient to async with acquire() pattern#10766
feat(BA-5578): refactor ValkeyClient to async with acquire() pattern#10766jopemachine wants to merge 4 commits intomainfrom
Conversation
…ttern Add an acquire() async context manager to AbstractValkeyClient that wraps operations and tracks consecutive connection failures. When the failure threshold is exceeded, the connection is torn down and reconnected rather than continuing to retry on a broken connection. - Add _VALKEY_CONNECTION_ERRORS tuple for connection error detection - Add acquire() base implementation to AbstractValkeyClient - Override acquire() in MonitoringValkeyClient with failure tracking and threshold-based reconnection - Migrate all 13 domain-specific Valkey clients to use the new pattern - Add tests for retry-based disconnection logic Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Refactors the Valkey client abstraction to standardize operations behind an async with ... acquire() context manager, enabling operation-level failure tracking and reconnect-on-threshold behavior, and migrates all domain Valkey clients to use the new pattern.
Changes:
- Added
AbstractValkeyClient.acquire()async context manager and aMonitoringValkeyClient.acquire()override that counts consecutive operation connection failures and triggers_reconnect()at a threshold. - Migrated domain-specific Valkey clients to use
async with self._client.acquire() as conn:for all Valkey operations. - Added unit tests validating failure counting, threshold behavior, and reconnection outcomes for
MonitoringValkeyClient.acquire().
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
src/ai/backend/common/clients/valkey_client/client.py |
Introduces acquire() and implements operation failure tracking + threshold-based reconnection. |
tests/unit/common/clients/valkey_client/test_monitoring_valkey_client.py |
Adds test coverage for the new acquire() behavior and reconnection triggering. |
src/ai/backend/common/clients/valkey_client/valkey_volume_stats/client.py |
Migrates volume stats operations to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_stream/client.py |
Migrates stream operations (xgroup/xreadgroup/etc.) to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_stat/client.py |
Migrates stat/cache operations and batched execs to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_session/client.py |
Migrates session CRUD and time/ping to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_schedule/client.py |
Migrates scheduling state operations and batch execs to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_rate_limit/client.py |
Migrates rate-limit scripts and sorted-set operations to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_live/client.py |
Migrates live data operations, scans, and pipelines to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_leader/client.py |
Migrates leadership Lua script invocations to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_image/client.py |
Migrates image membership operations and scans to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_container_log/client.py |
Migrates list/log operations and batch execs to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_bgtask/client.py |
Migrates task metadata operations, scripts, and execs to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_artifact/client.py |
Migrates artifact progress tracking (get/exec/hincrby/hset/scan) to the acquire() pattern. |
src/ai/backend/common/clients/valkey_client/valkey_artifact_registries/client.py |
Migrates registry CRUD operations to the acquire() pattern. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if self._operation_failure_count >= self._consecutive_failure_threshold: | ||
| log.warning( | ||
| "Operation failure threshold reached ({}), triggering reconnection...", | ||
| self._consecutive_failure_threshold, | ||
| ) | ||
| self._operation_failure_count = 0 | ||
| await self._reconnect() | ||
| raise |
There was a problem hiding this comment.
MonitoringValkeyClient.acquire() can call await self._reconnect() while the background monitor task may also call _reconnect() (and multiple concurrent operations can hit the threshold together). Because _reconnect() disconnects/connects both clients, concurrent invocations can race and lead to flapping reconnects or disconnecting a client while another reconnect is in progress. Consider serializing reconnects with an asyncio.Lock (e.g., self._reconnect_lock) and using it in both acquire() and the monitor loop (or inside _reconnect() itself).
| from ai.backend.common.clients.valkey_client.client import ( | ||
| _VALKEY_CONNECTION_ERRORS, | ||
| MonitoringValkeyClient, | ||
| create_valkey_client, | ||
| ) |
There was a problem hiding this comment.
The test imports _VALKEY_CONNECTION_ERRORS (a private module constant). This makes the test suite tightly coupled to an internal name that may change during refactors. Prefer either (a) exposing a public constant (no leading underscore) intended for reuse, or (b) asserting on the public behavior (e.g., raising ClientNotConnectedError / ClosingError etc.) without importing private internals.
The ValkeyLeaderClient was refactored to use `async with self._client.acquire()`, but the test mock didn't support the async context manager protocol. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add MonitoringValkeyClientSpec dataclass to group configuration fields (reconnectable_exceptions, consecutive_failure_threshold, monitor_interval) - Make AbstractValkeyClient.acquire() a proper abstract method - Add acquire() implementations to ValkeyStandaloneClient and ValkeySentinelClient Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Resolves BA-5578.
Summary
acquire()async context manager toAbstractValkeyClientandMonitoringValkeyClientthat wraps operations and tracks consecutive connection failuresasync with self._client.acquire() as conn:patternChanges
client.py): Define_VALKEY_CONNECTION_ERRORStuple, addacquire()base implementation toAbstractValkeyClient, override inMonitoringValkeyClientwith_operation_failure_counttracking and threshold-based_reconnect()triggerself._client.client.xxx()calls withasync with self._client.acquire() as conn:patternTestMonitoringValkeyClientAcquireclass with 8 test cases covering success/failure counting, threshold detection, reconnection, and error type trackingTest plan
pants fmt/fix/lint --changed-since=HEADpassespants check --changed-since=HEAD(mypy) passespants test --changed-since=HEADpasses🤖 Generated with Claude Code