fix(queue): add circuit breaker to prevent API retry storms #772
Open
deepakdevp wants to merge 4 commits into volcengine:main
…ection Adds a thread-safe CircuitBreaker with three states (CLOSED/OPEN/HALF_OPEN) and classify_api_error() that distinguishes permanent (403/401) from transient (429/5xx/timeout) errors. Permanent errors trip the breaker immediately. Part of fix for volcengine#729. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
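The three-state behaviour this commit describes can be illustrated with a minimal sketch. It is not the PR's actual code: the method names (`check`, `record_success`, `record_failure`), the constructor parameters, their defaults, and the injectable `clock` are assumptions made for illustration. Only the `CircuitBreaker` and `CircuitBreakerOpen` names and the CLOSED/OPEN/HALF_OPEN states come from the PR.

```python
import threading
import time


class CircuitBreakerOpen(Exception):
    """Raised when a call is attempted while the breaker is open."""


class CircuitBreaker:
    """Illustrative breaker: CLOSED -> OPEN -> HALF_OPEN -> CLOSED."""

    def __init__(self, failure_threshold=5, reset_timeout=60.0, clock=time.monotonic):
        self._failure_threshold = failure_threshold  # hypothetical default
        self._reset_timeout = reset_timeout          # hypothetical default, in seconds
        self._clock = clock                          # injectable so tests can fake time
        self._lock = threading.Lock()                # guards all mutable state below
        self._state = "CLOSED"
        self._failures = 0
        self._opened_at = 0.0

    def _maybe_half_open(self):
        # Caller must hold the lock. After the cooldown, allow a trial call.
        if self._state == "OPEN" and self._clock() - self._opened_at >= self._reset_timeout:
            self._state = "HALF_OPEN"

    @property
    def state(self):
        with self._lock:
            self._maybe_half_open()
            return self._state

    def check(self):
        """Raise CircuitBreakerOpen if calls should currently be blocked."""
        with self._lock:
            self._maybe_half_open()
            if self._state == "OPEN":
                raise CircuitBreakerOpen("circuit breaker is open")

    def record_success(self):
        with self._lock:
            self._state = "CLOSED"
            self._failures = 0

    def record_failure(self, permanent=False):
        with self._lock:
            self._maybe_half_open()
            self._failures += 1
            # Trip on a permanent error, a failed trial call, or too many failures in a row.
            if permanent or self._state == "HALF_OPEN" or self._failures >= self._failure_threshold:
                self._state = "OPEN"
                self._opened_at = self._clock()


now = [0.0]  # fake clock so the example runs instantly
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()  # second consecutive failure trips the breaker
print(breaker.state)      # OPEN
now[0] = 10.0             # cooldown has elapsed
print(breaker.state)      # HALF_OPEN
breaker.record_success()  # trial call succeeded
print(breaker.state)      # CLOSED
```

Holding a single lock around every state read and write keeps the transitions atomic when several queue workers share one breaker, at the cost of briefly serializing those workers.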
on_dequeue() now checks a circuit breaker before processing. Permanent API errors (403/401) trip the breaker immediately and drop the message. Transient errors (429/5xx/timeout) re-enqueue the message for later retry. When the breaker is open, messages are re-enqueued with a throttled sleep to prevent re-enqueue storms. Part of fix for volcengine#729. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
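The handler flow in this commit, and the reason it stops a retry storm, can be sketched as follows. Everything here is a simplified stand-in rather than the PR's code: `TinyBreaker`, `PermanentError`, `TransientError`, `drain_once`, and the `on_dequeue` signature are hypothetical, the breaker is reduced to a single flag (no failure counting or cooldown), and the throttled sleep is only noted in a comment.

```python
from collections import deque


class PermanentError(Exception):
    """Stand-in for an API error classified as permanent (e.g. 403/401)."""


class TransientError(Exception):
    """Stand-in for an API error classified as transient (e.g. 429/5xx/timeout)."""


class TinyBreaker:
    """Deliberately simplified breaker: just an open/closed flag."""

    def __init__(self):
        self.is_open = False


def on_dequeue(msg, queue, breaker, call_api):
    """Handle one message and report what happened to it."""
    if breaker.is_open:
        queue.append(msg)  # keep the message for after the cooldown
        return "deferred"  # a real handler would also sleep briefly here
    try:
        call_api(msg)
    except PermanentError:
        breaker.is_open = True  # trip immediately; retrying cannot help
        return "dropped"
    except TransientError:
        queue.append(msg)  # retry later
        return "requeued"
    return "done"


def drain_once(messages, call_api):
    """Run a single pass over the queue and tally the outcomes."""
    queue = deque(messages)
    breaker = TinyBreaker()
    outcomes = {}
    for _ in range(len(queue)):
        result = on_dequeue(queue.popleft(), queue, breaker, call_api)
        outcomes[result] = outcomes.get(result, 0) + 1
    return outcomes


calls = []


def overdue_api(msg):
    calls.append(msg)
    raise PermanentError("403 AccountOverdue")


print(drain_once(range(1000), overdue_api))  # {'dropped': 1, 'deferred': 999}
print(len(calls))                            # 1
```

With 1000 queued messages and an API that always returns a permanent error, only the first message reaches the API; the remaining 999 are deferred without a call.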
Replaces 429-only error check with classify_api_error() that handles permanent (403/401), transient (429/5xx/timeout), and unknown errors. Permanent errors trip the breaker and drop the message. Transient errors re-enqueue for retry (extending existing 429 behavior to all transient errors). Circuit breaker check before embedding prevents calling a known-broken API. Part of fix for volcengine#729. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
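One possible shape for the classification described in this commit is sketched below. The PR does not show how `classify_api_error()` inspects an exception, so the return values, the `status_code` attribute lookup, and the message-parsing fallback are all assumptions; only the permanent (403/401), transient (429/5xx/timeout), and unknown categories come from the commit message.

```python
import re

PERMANENT_STATUS = {401, 403}


def classify_api_error(exc: Exception) -> str:
    """Return "permanent", "transient", or "unknown" for an API exception."""
    if isinstance(exc, TimeoutError):
        return "transient"

    # Assumption: the client may expose an integer `status_code`;
    # otherwise fall back to a 4xx/5xx number found in the message text.
    status = getattr(exc, "status_code", None)
    if not isinstance(status, int):
        match = re.search(r"\b([45]\d{2})\b", str(exc))
        status = int(match.group(1)) if match else None

    if status in PERMANENT_STATUS:
        return "permanent"
    if status == 429 or (status is not None and 500 <= status <= 599):
        return "transient"

    text = str(exc).lower()
    if "timeout" in text or "timed out" in text:
        return "transient"
    return "unknown"


print(classify_api_error(Exception("403 AccountOverdue")))          # permanent
print(classify_api_error(Exception("HTTP 429 Too Many Requests")))  # transient
print(classify_api_error(ValueError("bad input")))                  # unknown
```

Parsing a status out of the message text is fragile, so a structured attribute should be preferred wherever the API client provides one.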
…integration
- Restore top-level `import asyncio` in collection_schemas.py (was accidentally removed, breaking all embedding operations)
- Return error instead of falling through when breaker is open and no queue manager is available
- Log warning when queue_manager is None in _reenqueue_semantic_msg
- Only call report_success() when msg was actually re-enqueued, not when msg is None

Part of fix for volcengine#729.
MaojiaSheng approved these changes on Mar 19, 2026
Summary
- `CircuitBreaker` utility that trips after consecutive failures (or immediately on permanent errors like 403/401) and blocks further API calls until a cooldown period elapses
- `SemanticProcessor` and `TextEmbeddingHandler` queue handlers with proper error classification: permanent errors (403/401) drop the message, transient errors (429/5xx/timeout) re-enqueue for later retry

Fixes #729
Root Cause
When the VLM/embedding API returns an unrecoverable error (e.g., 403 AccountOverdue), queue handlers had no error classification or circuit breaking. Many distinct messages in the queue each independently hit the same broken API endpoint, generating thousands of wasted calls before the queue drained.
Changes Made
- `openviking/utils/circuit_breaker.py` (new): `CircuitBreaker` class with CLOSED/OPEN/HALF_OPEN states + `classify_api_error()` that distinguishes permanent vs transient errors + `CircuitBreakerOpen` exception
- `openviking/storage/queuefs/semantic_processor.py`: Added breaker guard before processing, error classification in the except block, and throttled re-enqueue for transient errors
- `openviking/storage/collection_schemas.py`: Replaced `is_429_error` with `classify_api_error()`, added breaker guard before embedding, extended re-enqueue from 429-only to all transient errors
- `tests/utils/test_circuit_breaker.py` (new): 14 unit tests covering state transitions, error classification, thread safety, and edge cases

Type of Change
Testing
🤖 Generated with Claude Code