Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions autogpt_platform/backend/backend/executor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
from backend.executor.utils import (
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS,
GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
GRAPH_EXECUTION_EXCHANGE,
GRAPH_EXECUTION_QUEUE_NAME,
GRAPH_EXECUTION_ROUTING_KEY,
CancelExecutionEvent,
ExecutionOutputEntry,
LogMetadata,
Expand Down Expand Up @@ -1459,14 +1461,43 @@ def _handle_run_message(

@func_retry
def _ack_message(reject: bool, requeue: bool):
"""Acknowledge or reject the message based on execution status."""
"""
Acknowledge or reject the message based on execution status.

Args:
reject: Whether to reject the message
requeue: Whether to requeue the message
"""

# Connection can be lost, so always get a fresh channel
channel = self.run_client.get_channel()
if reject:
channel.connection.add_callback_threadsafe(
lambda: channel.basic_nack(delivery_tag, requeue=requeue)
)
if requeue and settings.config.requeue_by_republishing:
# Send rejected message to back of queue using republishing
def _republish_to_back():
try:
# First republish to back of queue
self.run_client.publish_message(
routing_key=GRAPH_EXECUTION_ROUTING_KEY,
message=body.decode(), # publish_message expects string, not bytes
exchange=GRAPH_EXECUTION_EXCHANGE,
)
# Then reject without requeue (message already republished)
channel.basic_nack(delivery_tag, requeue=False)
logger.info("Message requeued to back of queue")
except Exception as e:
logger.error(
f"[{self.service_name}] Failed to requeue message to back: {e}"
)
# Fall back to traditional requeue on failure
channel.basic_nack(delivery_tag, requeue=True)

channel.connection.add_callback_threadsafe(_republish_to_back)
else:
# Traditional requeue (goes to front) or no requeue
channel.connection.add_callback_threadsafe(
lambda: channel.basic_nack(delivery_tag, requeue=requeue)
)
else:
channel.connection.add_callback_threadsafe(
lambda: channel.basic_ack(delivery_tag)
Expand Down
5 changes: 5 additions & 0 deletions autogpt_platform/backend/backend/util/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="Maximum number of workers to use for graph execution.",
)

# Feature flag read in executor/manager.py's _ack_message (see the diff above):
# when True, a message rejected with requeue is republished to the BACK of the
# graph-execution queue and then nacked without requeue, instead of being
# nacked with requeue=True, which would return it to the front of the queue.
requeue_by_republishing: bool = Field(
default=True,
description="Send rate-limited messages to back of queue by republishing instead of front requeue to prevent blocking other users.",
)

# FastAPI Thread Pool Configuration
# IMPORTANT: FastAPI automatically offloads ALL sync functions to a thread pool:
# - Sync endpoint functions (def instead of async def)
Expand Down
Loading