Skip to content

Conversation

@tomerqodo
Copy link

Benchmark PR Significant-Gravitas#11326

Type: Corrupted (contains bugs)

Original PR Title: hotfix(backend): fix rate-limited messages blocking queue by republishing to back
Original PR Description: ## Summary
Fix critical queue blocking issue where rate-limited user messages prevent other users' executions from being processed, causing the 135 late executions reported in production.

Root Cause Analysis

When a user exceeds max_concurrent_graph_executions_per_user (25), the executor uses basic_nack(requeue=True) which sends the message to the FRONT of the RabbitMQ queue. This creates an infinite blocking loop where:

  1. Rate-limited message goes to front of queue
  2. Gets processed, hits rate limit again
  3. Goes back to front of queue
  4. Blocks all other users' messages indefinitely

Solution Implementation

🔧 Core Changes

  • New setting: requeue_by_republishing (default: True) in backend/util/settings.py
  • Smart _ack_message: Automatically uses republishing when requeue=True and setting enabled
  • Efficient implementation: Uses existing self.run_client connection instead of creating new ones
  • Integration test: Real RabbitMQ test validates queue ordering behavior

🔄 Technical Implementation

Before (blocking):

basic_nack(delivery_tag, requeue=True)  # Goes to FRONT of queue ❌

After (non-blocking):

if requeue and self.config.requeue_by_republishing:
    # First: Republish to BACK of queue
    self.run_client.publish_message(...)
    # Then: Reject without requeue
    basic_nack(delivery_tag, requeue=False)

📊 Impact

  • Other users' executions no longer blocked by rate-limited users
  • Fair queue processing - FIFO behavior maintained for all users
  • Rate limiting still works - just doesn't block others
  • Configurable - can revert to old behavior with requeue_by_republishing=False
  • Zero performance impact - uses existing connections

Test Plan

  • Integration test: test_requeue_integration.py validates real RabbitMQ queue ordering
  • Scenario testing: Confirms rate-limited messages go to back of queue
  • Cross-user validation: Verifies other users' messages process correctly
  • Setting test: Confirms configuration loads with correct defaults

Deployment Strategy

This is a hotfix that can be deployed immediately:

  • Backward compatible: Old behavior available via config
  • Safe default: New behavior is safer than current state
  • No breaking changes: All existing functionality preserved
  • Immediate relief: Resolves production queue blocking

Files Modified

  • backend/executor/manager.py: Enhanced _ack_message logic and _requeue_message_to_back method
  • backend/util/settings.py: Added requeue_by_republishing configuration field
  • test_requeue_integration.py: Integration test for queue ordering validation

Related Issues

Fixes the 135 late executions issue where messages were stuck in QUEUED state despite available executor capacity (583m/600m utilization).

🤖 Generated with Claude Code

Co-Authored-By: Claude noreply@anthropic.com
Original PR URL: Significant-Gravitas#11326

majdyz and others added 6 commits November 5, 2025 17:40
…hing to back

## Summary
Fix critical queue blocking issue where rate-limited user messages prevent other users' executions from being processed.

## Root Cause
When user exceeds max_concurrent_graph_executions_per_user (25), RabbitMQ's basic_nack(requeue=True) sends the message to the FRONT of the queue, creating an infinite blocking loop that prevents other users' messages from being processed.

## Solution
Add configurable requeue_by_republishing behavior that sends rate-limited messages to the BACK of the queue instead of front:

### Key Changes
- **New setting**: requeue_by_republishing (default: True) in backend/util/settings.py
- **Smart _ack_message**: Automatically uses republishing when requeue=True and setting enabled
- **Efficient implementation**: Uses existing self.run_client connection instead of creating new ones
- **Integration test**: Real RabbitMQ test validates queue ordering behavior

### Implementation Details
- Rate-limited messages: publish_message() then basic_nack(requeue=False)
- Pool-full messages: Same treatment for fair distribution
- Backward compatible: Can disable with requeue_by_republishing=False
- Clean code: Single logic path in _ack_message method

## Impact
- ✅ Other users' executions no longer blocked by rate-limited users
- ✅ Fair queue processing - FIFO behavior maintained
- ✅ Rate limiting still works - just doesn't block others
- ✅ Configurable - can revert to old behavior if needed

## Testing
- Integration test validates real RabbitMQ queue ordering
- Tests confirm rate-limited messages go to back of queue
- Verifies other users' messages process correctly

Fixes the 135 late executions issue reported in production.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
## Summary
Fix the remaining issues with the rate-limited message requeue implementation:

### Type Error Fix
- Change self.config.requeue_by_republishing to settings.config.requeue_by_republishing in manager.py:1493
- ExecutionManager accesses settings through global settings object, not self.config

### Integration Test Improvements
- Use dedicated test queue name (test_requeue_ordering) instead of production queue to avoid conflicts
- Create separate test exchange and routing key for isolation
- Add proper cleanup with queue/exchange deletion in finally block
- Remove pytest import (unused) and unnecessary marker configuration

## Testing Results
- ✅ poetry run pyright: 0 errors, 0 warnings
- ✅ poetry run pytest test_requeue_integration.py: All tests pass
- ✅ poetry run format: Clean formatting

## Validation
Integration test confirms:
- FIFO queue ordering works correctly
- Rate-limited messages go to back of queue (not front)
- Other users' executions are NOT blocked by rate-limited users
- Republishing method behaves exactly like expected

Fixes the 135 late executions issue in production.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
…handling

## Summary
Simplify the requeue implementation by inlining the _requeue_message_to_back method
directly into the _ack_message callback.

## Changes Made
- Remove separate _requeue_message_to_back method
- Inline the publish logic directly in _republish_to_back callback
- Include nack operation inside the try-catch block for better error handling
- Add fallback to traditional requeue if republishing fails

## Benefits
- Cleaner code with fewer method calls
- Better error handling: nack is now protected by try-catch
- Fallback mechanism: if republishing fails, falls back to traditional requeue
- Same functionality with simpler implementation

## Testing
- ✅ poetry run pyright: 0 errors, 0 warnings
- ✅ poetry run pytest test_requeue_integration.py: All tests pass
- ✅ Integration test confirms rate limiting fix still works correctly

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
## Summary
Fix the message consumption logic in the integration test to reliably receive and process messages from RabbitMQ.

## Changes Made
- Improved consume_messages method with better queue checking and synchronous consumption
- Added proper error handling and consumer cancellation
- Removed debug prints for cleaner test output
- Use basic_get to check for messages before setting up consumer
- Fixed bare except clause for proper exception handling

## Testing Results
- ✅ All 3 test scenarios pass consistently:
  1. Normal FIFO queue behavior: A → B → C
  2. Rate limiting fix: user2 executions NOT blocked by user1
  3. Republishing sends messages to back of queue

## Performance
- Test completes in ~2.3 seconds consistently
- No more timeout or message consumption failures
- Proper cleanup of test resources

The integration test now reliably validates that the rate limiting queue blocking fix works correctly with real RabbitMQ.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
…e test file

## Summary
Consolidate all requeue behavior validation into a single test file with two test functions, eliminating the need for separate test files.

## Test Coverage

### test_queue_ordering_behavior()
- ✅ Normal FIFO queue behavior validation
- ✅ Rate limiting fix: user2 executions NOT blocked by user1
- ✅ Republishing sends messages to BACK of queue (our fix)

### test_traditional_requeue_behavior()
- ✅ **HYPOTHESIS CONFIRMED**: Traditional requeue sends messages to FRONT
- ✅ Validates root cause of queue blocking issue
- ✅ Proves why rate-limited messages block other users

## Key Validation Results

**Our Fix (Republishing):**
- Messages go to BACK of queue → No blocking ✅

**Original Problem (Traditional Requeue):**
- Messages go to FRONT of queue → Causes blocking ✅
- Order: A (requeued to front) → B
- Explains the 135 late executions issue

## Commands
`poetry run pytest test_requeue_integration.py -s` - Run both tests
- test_queue_ordering_behavior: Tests our fix
- test_traditional_requeue_behavior: Validates hypothesis

Both tests use real RabbitMQ (no mocking) for authentic behavior validation.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants