From c507e8d89a042a36139b32290988c05192936efa Mon Sep 17 00:00:00 2001 From: Aditya8369 Date: Thu, 26 Feb 2026 19:23:00 +0530 Subject: [PATCH 1/3] [REFACTOR] Duplicated Retry Decorator Logic --- bindu/utils/retry.py | 191 +++++++++++++++++++------------------------ 1 file changed, 82 insertions(+), 109 deletions(-) diff --git a/bindu/utils/retry.py b/bindu/utils/retry.py index ceb6f5df..d642e603 100644 --- a/bindu/utils/retry.py +++ b/bindu/utils/retry.py @@ -55,6 +55,76 @@ ) +def _create_retry_decorator( + operation_type: str, + use_jitter: bool, + log_prefix: str, +) -> Callable[[F], F]: + """Generic retry decorator factory. + + Creates retry decorators with configurable settings and wait strategies. + + Args: + operation_type: Type of operation (worker, storage, scheduler, api) + use_jitter: Whether to use random exponential backoff (jitter) + log_prefix: Prefix for log messages + + Returns: + Decorator function with retry logic + """ + + def decorator( + max_attempts: int | None = None, + min_wait: float | None = None, + max_wait: float | None = None, + ) -> Callable[[F], F]: + def inner(func: F) -> F: + @wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + # Get settings from app_settings.retry + retry_settings = app_settings.retry + _max_attempts = max_attempts or getattr( + retry_settings, f"{operation_type}_max_attempts" + ) + _min_wait = min_wait or getattr( + retry_settings, f"{operation_type}_min_wait" + ) + _max_wait = max_wait or getattr( + retry_settings, f"{operation_type}_max_wait" + ) + + # Choose wait strategy + if use_jitter: + wait_strategy = wait_random_exponential( + multiplier=1, min=_min_wait, max=_max_wait + ) + else: + wait_strategy = wait_exponential( + multiplier=1, min=_min_wait, max=_max_wait + ) + + async for attempt in AsyncRetrying( + stop=stop_after_attempt(_max_attempts), + wait=wait_strategy, + retry=retry_if_exception_type(TRANSIENT_EXCEPTIONS), + before_sleep=before_sleep_log(logger, logging.WARNING), + after=after_log(logger, logging.INFO), + reraise=True, + ): + with attempt: + logger.debug( + f"Executing {log_prefix} {func.__name__} " # type: ignore[attr-defined] + f"(attempt {attempt.retry_state.attempt_number}/{_max_attempts})" + ) + return await func(*args, **kwargs) + + return wrapper # type: ignore + + return inner + + return decorator + + def retry_worker_operation( max_attempts: int | None = None, min_wait: float | None = None, @@ -79,33 +149,9 @@ async def run_task(self, params): # Task execution logic pass """ - - def decorator(func: F) -> F: - @wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: - _max_attempts = max_attempts or app_settings.retry.worker_max_attempts - _min_wait = min_wait or app_settings.retry.worker_min_wait - _max_wait = max_wait or app_settings.retry.worker_max_wait - - async for attempt in AsyncRetrying( - stop=stop_after_attempt(_max_attempts), - wait=wait_random_exponential( - multiplier=1, min=_min_wait, max=_max_wait - ), - retry=retry_if_exception_type(TRANSIENT_EXCEPTIONS), - before_sleep=before_sleep_log(logger, logging.WARNING), - after=after_log(logger, logging.INFO), - reraise=True, - ): - with attempt: - logger.debug( - f"Executing {func.__name__} (attempt {attempt.retry_state.attempt_number}/{_max_attempts})" # type: ignore[attr-defined] - ) - return await func(*args, **kwargs) - - return wrapper # type: ignore - - return decorator + return _create_retry_decorator("worker", True, "")( + max_attempts, min_wait, max_wait + ) def retry_storage_operation( @@ -131,32 +177,9 @@ async def update_task(self, task_id, state): # Database update logic pass """ - - def decorator(func: F) -> F: - @wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: - _max_attempts = max_attempts or app_settings.retry.storage_max_attempts - _min_wait = min_wait or app_settings.retry.storage_min_wait - _max_wait = max_wait or app_settings.retry.storage_max_wait - - async for attempt in AsyncRetrying( - stop=stop_after_attempt(_max_attempts), - wait=wait_exponential(multiplier=1, min=_min_wait, max=_max_wait), - retry=retry_if_exception_type(TRANSIENT_EXCEPTIONS), - before_sleep=before_sleep_log(logger, logging.WARNING), - after=after_log(logger, logging.INFO), - reraise=True, - ): - with attempt: - logger.debug( - f"Executing storage operation {func.__name__} " # type: ignore[attr-defined] - f"(attempt {attempt.retry_state.attempt_number}/{_max_attempts})" - ) - return await func(*args, **kwargs) - - return wrapper # type: ignore - - return decorator + return _create_retry_decorator("storage", False, "storage operation")( + max_attempts, min_wait, max_wait + ) def retry_scheduler_operation( @@ -182,34 +205,9 @@ async def run_task(self, params): # Scheduler logic pass """ - - def decorator(func: F) -> F: - @wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: - _max_attempts = max_attempts or app_settings.retry.scheduler_max_attempts - _min_wait = min_wait or app_settings.retry.scheduler_min_wait - _max_wait = max_wait or app_settings.retry.scheduler_max_wait - - async for attempt in AsyncRetrying( - stop=stop_after_attempt(_max_attempts), - wait=wait_random_exponential( - multiplier=1, min=_min_wait, max=_max_wait - ), - retry=retry_if_exception_type(TRANSIENT_EXCEPTIONS), - before_sleep=before_sleep_log(logger, logging.WARNING), - after=after_log(logger, logging.INFO), - reraise=True, - ): - with attempt: - logger.debug( - f"Executing scheduler operation {func.__name__} " # type: ignore[attr-defined] - f"(attempt {attempt.retry_state.attempt_number}/{_max_attempts})" - ) - return await func(*args, **kwargs) - - return wrapper # type: ignore - - return decorator + return _create_retry_decorator("scheduler", True, "scheduler operation")( + max_attempts, min_wait, max_wait + ) def retry_api_call( @@ -235,34 +233,9 @@ async def call_external_service(self, data): # API call logic pass """ - - def decorator(func: F) -> F: - @wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: - _max_attempts = max_attempts or app_settings.retry.api_max_attempts - _min_wait = min_wait or app_settings.retry.api_min_wait - _max_wait = max_wait or app_settings.retry.api_max_wait - - async for attempt in AsyncRetrying( - stop=stop_after_attempt(_max_attempts), - wait=wait_random_exponential( - multiplier=1, min=_min_wait, max=_max_wait - ), - retry=retry_if_exception_type(TRANSIENT_EXCEPTIONS), - before_sleep=before_sleep_log(logger, logging.WARNING), - after=after_log(logger, logging.INFO), - reraise=True, - ): - with attempt: - logger.debug( - f"Executing API call {func.__name__} " # type: ignore[attr-defined] - f"(attempt {attempt.retry_state.attempt_number}/{_max_attempts})" - ) - return await func(*args, **kwargs) - - return wrapper # type: ignore - - return decorator + return _create_retry_decorator("api", True, "API call")( + max_attempts, min_wait, max_wait + ) def is_retryable_error(exception: Exception) -> bool: From 4228fffee07fbee90b7dffe53df8dfa00ac92c78 Mon Sep 17 00:00:00 2001 From: Aditya8369 Date: Sat, 21 Mar 2026 18:29:05 +0530 Subject: [PATCH 2/3] Bug: Unimplemented Task Pause/Resume Functionality in Base Worker --- bindu/server/workers/base.py | 103 +++++++- docs/SCHEDULER.md | 63 +++++ tests/unit/test_worker_pause_resume.py | 323 +++++++++++++++++++++++++ 3 files changed, 479 insertions(+), 10 deletions(-) create mode 100644 tests/unit/test_worker_pause_resume.py diff --git a/bindu/server/workers/base.py b/bindu/server/workers/base.py index 5619483d..50f3cb23 100644 --- a/bindu/server/workers/base.py +++ b/bindu/server/workers/base.py @@ -23,7 +23,9 @@ from abc import ABC, abstractmethod from contextlib import asynccontextmanager from dataclasses import dataclass +from datetime import datetime, timezone from typing import Any, AsyncIterator +from uuid import UUID import anyio from opentelemetry.trace import get_tracer, use_span @@ -221,19 +223,100 @@ def build_artifacts(self, result: Any) -> list[Artifact]: async def _handle_pause(self, params: TaskIdParams) -> None: """Handle pause operation. - TODO: Implement task pause functionality - - Save current execution state - - Update task to 'suspended' state - - Release resources while preserving context + Saves task execution context and transitions task to suspended state. + + Implementation: + - Load task from storage + - Save execution state to task metadata (paused_at timestamp) + - Update task state to 'suspended' + - Release resources while preserving task context + - Log the pause operation + + Args: + params: Task identification parameters containing task_id """ - raise NotImplementedError("Pause operation not yet implemented") + task_id: UUID = params["task_id"] + + try: + # Load the task to ensure it exists + task = await self.storage.load_task(task_id) + if not task: + logger.error(f"Task {task_id} not found for pause operation") + raise ValueError(f"Task {task_id} not found") + + # Prepare metadata with pause timestamp + pause_metadata = { + "paused_at": datetime.now(timezone.utc).isoformat(), + "paused_from_state": task.get("status", {}).get("state", "unknown"), + } + + # Update task to suspended state with pause metadata + await self.storage.update_task( + task_id, + state="suspended", + metadata=pause_metadata, + ) + + logger.info( + f"Task {task_id} paused successfully. " + f"Previous state: {pause_metadata['paused_from_state']}" + ) + + except Exception as e: + logger.error(f"Failed to pause task {task_id}: {e}", exc_info=True) + raise async def _handle_resume(self, params: TaskIdParams) -> None: """Handle resume operation. - TODO: Implement task resume functionality - - Restore execution state - - Update task to 'resumed' state - - Continue from last checkpoint + Restores task execution context and transitions task to resumed state. + + Implementation: + - Load task from storage + - Verify task is in suspended state (graceful if not) + - Restore execution context from task metadata if available + - Update task state to 'resumed' + - Log the resume operation + + Args: + params: Task identification parameters containing task_id """ - raise NotImplementedError("Resume operation not yet implemented") + task_id: UUID = params["task_id"] + + try: + # Load the task to ensure it exists + task = await self.storage.load_task(task_id) + if not task: + logger.error(f"Task {task_id} not found for resume operation") + raise ValueError(f"Task {task_id} not found") + + current_state = task.get("status", {}).get("state", "unknown") + + # Prepare metadata with resume timestamp and original pause info + resume_metadata = { + "resumed_at": datetime.now(timezone.utc).isoformat(), + "resumed_from_state": current_state, + } + + # If task was suspended, preserve the original paused_at timestamp + if "metadata" in task and "paused_at" in task.get("metadata", {}): + resume_metadata["paused_at"] = task["metadata"]["paused_at"] + resume_metadata["paused_from_state"] = task["metadata"].get( + "paused_from_state", "unknown" + ) + + # Update task to resumed state with resume metadata + await self.storage.update_task( + task_id, + state="resumed", + metadata=resume_metadata, + ) + + logger.info( + f"Task {task_id} resumed successfully. " + f"Previous state: {current_state}" + ) + + except Exception as e: + logger.error(f"Failed to resume task {task_id}: {e}", exc_info=True) + raise diff --git a/docs/SCHEDULER.md b/docs/SCHEDULER.md index 052ccc5d..3efcb27a 100644 --- a/docs/SCHEDULER.md +++ b/docs/SCHEDULER.md @@ -65,6 +65,69 @@ sequenceDiagram Note over Scheduler: - run_task
- cancel_task
- pause_task
- resume_task ``` +## Task Operations + +The scheduler supports four primary task operations: + +### 1. Run Task +Executes a task from the queue. The worker processes the task and updates the state based on execution results. + +### 2. Cancel Task +Cancels a running task immediately. Updates task state to `canceled` and releases allocated resources. + +### 3. Pause Task (NEW) +Suspends task execution while preserving the execution context. The worker: +- Saves the current task state to storage +- Records the pause timestamp in task metadata +- Updates task state to `suspended` +- Releases computational resources +- Preserves all task history and context for later resumption + +**Use Cases:** +- Long-running tasks that users want to pause temporarily +- Resource optimization (pause low-priority tasks) +- Checkpoint-based execution in interruptible workflows + +**Example API Call:** +```python +# Pause a running task +await task_manager.pause_task(task_id) + +# Task state transitions: working → suspended +# Metadata includes: paused_at (ISO timestamp), paused_from_state +``` + +### 4. Resume Task (NEW) +Resumes a suspended task from where it was paused. The worker: +- Loads the task from storage +- Restores execution context from task metadata +- Updates task state to `resumed` +- Continues execution from the last checkpoint +- Preserves original pause timestamp in metadata + +**Use Cases:** +- Resuming paused long-running tasks +- Continuing work on interrupted workflows +- Distributed task execution with pause points + +**Example API Call:** +```python +# Resume a suspended task +await task_manager.resume_task(task_id) + +# Task state transitions: suspended → resumed +# Metadata includes: resumed_at (ISO timestamp), paused_from_state +``` + +**Task Lifecycle with Pause/Resume:** +``` +submitted → working ─┬→ completed (normal) + ├→ suspended (pause) + │ └→ resumed (resume) + │ └→ completed + └→ failed/canceled +``` + ## Configuration ### Environment Variables diff --git a/tests/unit/test_worker_pause_resume.py b/tests/unit/test_worker_pause_resume.py new file mode 100644 index 00000000..6b94bcc9 --- /dev/null +++ b/tests/unit/test_worker_pause_resume.py @@ -0,0 +1,323 @@ +"""Unit tests for Worker pause/resume functionality.""" + +from typing import cast +from uuid import uuid4 + +import pytest + +from bindu.common.models import AgentManifest +from bindu.common.protocol.types import TaskIdParams, TaskSendParams +from bindu.server.scheduler.memory_scheduler import InMemoryScheduler +from bindu.server.storage.memory_storage import InMemoryStorage +from bindu.server.workers.manifest_worker import ManifestWorker +from tests.mocks import MockManifest +from tests.utils import assert_task_state, create_test_message + + +class TestPauseOperation: + """Test task pause functionality.""" + + @pytest.mark.asyncio + async def test_pause_suspended_task( + self, + storage: InMemoryStorage, + scheduler: InMemoryScheduler, + ): + """Test pausing a task transitions it to suspended state.""" + manifest = MockManifest() + worker = ManifestWorker( + scheduler=scheduler, + storage=storage, + manifest=cast(AgentManifest, manifest), + ) + + # Create a task + message = create_test_message(text="Test task") + task = await storage.submit_task(message["context_id"], message) + task_id = task["id"] + + # Update task to working state + await storage.update_task(task_id, state="working") + + # Pause the task + params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_pause(params) + + # Verify task is suspended + paused_task = await storage.load_task(task_id) + assert_task_state(paused_task, "suspended") + assert paused_task is not None + + @pytest.mark.asyncio + async def test_pause_stores_metadata( + self, + storage: InMemoryStorage, + scheduler: InMemoryScheduler, + ): + """Test pause stores metadata timestamp.""" + manifest = MockManifest() + worker = ManifestWorker( + scheduler=scheduler, + storage=storage, + manifest=cast(AgentManifest, manifest), + ) + + # Create and pause a task + message = create_test_message(text="Test") + task = await storage.submit_task(message["context_id"], message) + task_id = task["id"] + + await storage.update_task(task_id, state="working") + params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_pause(params) + + # Verify metadata contains pause information + paused_task = await storage.load_task(task_id) + assert paused_task is not None + assert "metadata" in paused_task + metadata = paused_task.get("metadata", {}) + assert "paused_at" in metadata + assert "paused" in str(metadata).lower() or "suspensio" in str(metadata).lower() + + @pytest.mark.asyncio + async def test_pause_nonexistent_task_fails( + self, + storage: InMemoryStorage, + scheduler: InMemoryScheduler, + ): + """Test pausing a nonexistent task raises error.""" + manifest = MockManifest() + worker = ManifestWorker( + scheduler=scheduler, + storage=storage, + manifest=cast(AgentManifest, manifest), + ) + + # Try to pause a task that doesn't exist + fake_task_id = uuid4() + params = cast(TaskIdParams, {"task_id": fake_task_id}) + + with pytest.raises(Exception): + await worker._handle_pause(params) + + +class TestResumeOperation: + """Test task resume functionality.""" + + @pytest.mark.asyncio + async def test_resume_resumes_suspended_task( + self, + storage: InMemoryStorage, + scheduler: InMemoryScheduler, + ): + """Test resuming a suspended task transitions it to resumed state.""" + manifest = MockManifest() + worker = ManifestWorker( + scheduler=scheduler, + storage=storage, + manifest=cast(AgentManifest, manifest), + ) + + # Create, suspend, then resume a task + message = create_test_message(text="Test task") + task = await storage.submit_task(message["context_id"], message) + task_id = task["id"] + + await storage.update_task(task_id, state="working") + + # Pause the task + pause_params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_pause(pause_params) + + # Verify it's suspended + paused_task = await storage.load_task(task_id) + assert_task_state(paused_task, "suspended") + + # Resume the task + resume_params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_resume(resume_params) + + # Verify task is resumed + resumed_task = await storage.load_task(task_id) + assert_task_state(resumed_task, "resumed") + + @pytest.mark.asyncio + async def test_resume_preserves_metadata( + self, + storage: InMemoryStorage, + scheduler: InMemoryScheduler, + ): + """Test resume preserves task metadata from suspension.""" + manifest = MockManifest() + worker = ManifestWorker( + scheduler=scheduler, + storage=storage, + manifest=cast(AgentManifest, manifest), + ) + + # Create task with initial metadata + message = create_test_message(text="Test") + task = await storage.submit_task(message["context_id"], message) + task_id = task["id"] + + initial_metadata = {"custom_field": "value"} + await storage.update_task(task_id, state="working", metadata=initial_metadata) + + # Pause and resume + pause_params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_pause(pause_params) + + resume_params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_resume(resume_params) + + # Verify metadata is preserved + resumed_task = await storage.load_task(task_id) + metadata = resumed_task.get("metadata", {}) + assert metadata.get("custom_field") == "value" + assert "paused_at" in metadata or "resumed_at" in metadata + + @pytest.mark.asyncio + async def test_resume_nonexistent_task_fails( + self, + storage: InMemoryStorage, + scheduler: InMemoryScheduler, + ): + """Test resuming a nonexistent task raises error.""" + manifest = MockManifest() + worker = ManifestWorker( + scheduler=scheduler, + storage=storage, + manifest=cast(AgentManifest, manifest), + ) + + # Try to resume a task that doesn't exist + fake_task_id = uuid4() + params = cast(TaskIdParams, {"task_id": fake_task_id}) + + with pytest.raises(Exception): + await worker._handle_resume(params) + + @pytest.mark.asyncio + async def test_resume_non_suspended_task( + self, + storage: InMemoryStorage, + scheduler: InMemoryScheduler, + ): + """Test resuming a non-suspended task (graceful handling).""" + manifest = MockManifest() + worker = ManifestWorker( + scheduler=scheduler, + storage=storage, + manifest=cast(AgentManifest, manifest), + ) + + # Create a task in working state (not suspended) + message = create_test_message(text="Test") + task = await storage.submit_task(message["context_id"], message) + task_id = task["id"] + + await storage.update_task(task_id, state="working") + + # Try to resume without pausing first + # Should still work - transitions to resumed + resume_params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_resume(resume_params) + + # Verify task is resumed + resumed_task = await storage.load_task(task_id) + assert_task_state(resumed_task, "resumed") + + +class TestPauseResumeCycle: + """Test complete pause/resume cycles.""" + + @pytest.mark.asyncio + async def test_multiple_pause_resume_cycles( + self, + storage: InMemoryStorage, + scheduler: InMemoryScheduler, + ): + """Test multiple pause/resume cycles on same task.""" + manifest = MockManifest() + worker = ManifestWorker( + scheduler=scheduler, + storage=storage, + manifest=cast(AgentManifest, manifest), + ) + + # Create a task + message = create_test_message(text="Test") + task = await storage.submit_task(message["context_id"], message) + task_id = task["id"] + + # Cycle 1: suspend + await storage.update_task(task_id, state="working") + pause_params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_pause(pause_params) + paused1 = await storage.load_task(task_id) + assert_task_state(paused1, "suspended") + + # Resume cycle 1 + resume_params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_resume(resume_params) + resumed1 = await storage.load_task(task_id) + assert_task_state(resumed1, "resumed") + + # Update back to working for next cycle + await storage.update_task(task_id, state="working") + + # Cycle 2: suspend again + await worker._handle_pause(pause_params) + paused2 = await storage.load_task(task_id) + assert_task_state(paused2, "suspended") + + # Resume cycle 2 + await worker._handle_resume(resume_params) + resumed2 = await storage.load_task(task_id) + assert_task_state(resumed2, "resumed") + + @pytest.mark.asyncio + async def test_pause_preserves_history_and_artifacts( + self, + storage: InMemoryStorage, + scheduler: InMemoryScheduler, + ): + """Test that pause/resume preserves task history and artifacts.""" + manifest = MockManifest() + worker = ManifestWorker( + scheduler=scheduler, + storage=storage, + manifest=cast(AgentManifest, manifest), + ) + + # Create a task with history + message1 = create_test_message(text="First message") + task = await storage.submit_task(message1["context_id"], message1) + task_id = task["id"] + + # Add more messages to history + message2 = create_test_message( + text="Second message", context_id=task["context_id"] + ) + await storage.append_to_contexts( + task["context_id"], [message2] + ) + + await storage.update_task(task_id, state="working") + + # Get history before pause + task_before = await storage.load_task(task_id) + history_before = task_before.get("history", []) + + # Pause and resume + pause_params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_pause(pause_params) + + resume_params = cast(TaskIdParams, {"task_id": task_id}) + await worker._handle_resume(resume_params) + + # Verify history is preserved + task_after = await storage.load_task(task_id) + history_after = task_after.get("history", []) + + assert len(history_after) >= len(history_before) From 074c90e0d70f3860306cd6d24df4631005be29ca Mon Sep 17 00:00:00 2001 From: Aditya Mahajan Date: Tue, 24 Mar 2026 09:41:25 +0530 Subject: [PATCH 3/3] Delete bindu/utils/retry.py --- bindu/utils/retry.py | 289 ------------------------------------------- 1 file changed, 289 deletions(-) delete mode 100644 bindu/utils/retry.py diff --git a/bindu/utils/retry.py b/bindu/utils/retry.py deleted file mode 100644 index b025b90d..00000000 --- a/bindu/utils/retry.py +++ /dev/null @@ -1,289 +0,0 @@ -"""Retry configuration and decorators using Tenacity. - -This module provides retry mechanisms for various operations in Bindu: -- Worker task execution -- Storage operations (database, redis) -- External API calls -- Scheduler operations - -Retry Strategies: -- Exponential backoff with jitter -- Configurable max attempts -- Custom retry conditions -- Logging and observability integration -""" - -from __future__ import annotations - -import asyncio -import logging -from functools import wraps -from typing import Any, Callable, TypeVar - -from tenacity import ( - AsyncRetrying, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, - before_sleep_log, - after_log, -) -from tenacity.wait import wait_random_exponential - -from bindu.utils.exceptions import ( - HTTPConnectionError, - HTTPTimeoutError, - HTTPServerError, -) -from bindu.utils.logging import get_logger -from bindu.settings import app_settings - -logger = get_logger("bindu.utils.retry") - -# Type variables for generic decorators -F = TypeVar("F", bound=Callable[..., Any]) - -# Common transient errors that should trigger retries -# Note: Only includes truly transient errors (network, timeout, connection) -# Application logic errors (ValueError, KeyError, etc.) should not be retried -TRANSIENT_EXCEPTIONS = ( - # Network errors - ConnectionError, - ConnectionRefusedError, - ConnectionResetError, - ConnectionAbortedError, - # Timeout errors - TimeoutError, - asyncio.TimeoutError, - # OS errors - OSError, # Covers BrokenPipeError, etc. -) - -# HTTP-specific retryable exceptions -HTTP_RETRYABLE_EXCEPTIONS = TRANSIENT_EXCEPTIONS + ( - HTTPConnectionError, - HTTPTimeoutError, - HTTPServerError, # 5xx errors are retryable -) - - -def create_retry_decorator( - operation_type: str, - max_attempts: int | None = None, - min_wait: float | None = None, - max_wait: float | None = None, - use_jitter: bool = True, -) -> Callable[[F], F]: - """Create retry decorators with consistent behavior. - - This replaces the duplicate retry_worker_operation, retry_storage_operation, - retry_scheduler_operation, and retry_api_call decorators. - - Args: - operation_type: Type of operation ('worker', 'storage', 'scheduler', 'api') - max_attempts: Maximum number of retry attempts (uses settings default if None) - min_wait: Minimum wait time between retries in seconds (uses settings default if None) - max_wait: Maximum wait time between retries in seconds (uses settings default if None) - use_jitter: Whether to use random exponential backoff (True) or regular exponential (False) - - Returns: - Decorated function with retry logic - - Example: - @create_retry_decorator('worker') - async def run_task(self, params): - # Task execution logic - pass - """ - # Map operation types to settings - settings_map = { - "worker": ("worker_max_attempts", "worker_min_wait", "worker_max_wait"), - "storage": ("storage_max_attempts", "storage_min_wait", "storage_max_wait"), - "scheduler": ( - "scheduler_max_attempts", - "scheduler_min_wait", - "scheduler_max_wait", - ), - "api": ("api_max_attempts", "api_min_wait", "api_max_wait"), - } - - if operation_type not in settings_map: - raise ValueError( - f"Invalid operation_type: {operation_type}. " - f"Must be one of: {', '.join(settings_map.keys())}" - ) - - max_key, min_key, max_wait_key = settings_map[operation_type] - - def decorator(func: F) -> F: - @wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: - # Get retry parameters from settings or use provided values - _max_attempts = max_attempts or getattr(app_settings.retry, max_key) - _min_wait = min_wait or getattr(app_settings.retry, min_key) - _max_wait = max_wait or getattr(app_settings.retry, max_wait_key) - - # Choose wait strategy based on use_jitter - wait_strategy = ( - wait_random_exponential(multiplier=1, min=_min_wait, max=_max_wait) - if use_jitter - else wait_exponential(multiplier=1, min=_min_wait, max=_max_wait) - ) - - async for attempt in AsyncRetrying( - stop=stop_after_attempt(_max_attempts), - wait=wait_strategy, - retry=retry_if_exception_type(TRANSIENT_EXCEPTIONS), - before_sleep=before_sleep_log(logger, logging.WARNING), - after=after_log(logger, logging.INFO), - reraise=True, - ): - with attempt: - logger.debug( - f"Executing {operation_type} operation {func.__name__} " - f"(attempt {attempt.retry_state.attempt_number}/{_max_attempts})" - ) - return await func(*args, **kwargs) - - return wrapper # type: ignore - - return decorator - - -# Convenience decorators using the factory (backward compatibility) -def retry_worker_operation( - max_attempts: int | None = None, - min_wait: float | None = None, - max_wait: float | None = None, -) -> Callable[[F], F]: - """Retry decorator for worker task execution operations. - - Retries on transient failures with exponential backoff and jitter. - - Args: - max_attempts: Maximum number of retry attempts - min_wait: Minimum wait time between retries (seconds) - max_wait: Maximum wait time between retries (seconds) - - Returns: - Decorated function with retry logic - """ - return create_retry_decorator( - "worker", max_attempts, min_wait, max_wait, use_jitter=True - ) - - -def retry_storage_operation( - max_attempts: int | None = None, - min_wait: float | None = None, - max_wait: float | None = None, -) -> Callable[[F], F]: - """Retry decorator for storage operations (database, redis). - - Handles transient database connection issues, deadlocks, and timeouts. - - Args: - max_attempts: Maximum number of retry attempts - min_wait: Minimum wait time between retries (seconds) - max_wait: Maximum wait time between retries (seconds) - - Returns: - Decorated function with retry logic - """ - return create_retry_decorator( - "storage", max_attempts, min_wait, max_wait, use_jitter=False - ) - - -def retry_scheduler_operation( - max_attempts: int | None = None, - min_wait: float | None = None, - max_wait: float | None = None, -) -> Callable[[F], F]: - """Retry decorator for scheduler operations. - - Handles transient failures in task scheduling and broker communication. - - Args: - max_attempts: Maximum number of retry attempts - min_wait: Minimum wait time between retries (seconds) - max_wait: Maximum wait time between retries (seconds) - - Returns: - Decorated function with retry logic - """ - return create_retry_decorator( - "scheduler", max_attempts, min_wait, max_wait, use_jitter=True - ) - - -def retry_api_call( - max_attempts: int | None = None, - min_wait: float | None = None, - max_wait: float | None = None, -) -> Callable[[F], F]: - """Retry decorator for external API calls. - - Handles transient network failures, rate limits, and timeouts. - - Args: - max_attempts: Maximum number of retry attempts - min_wait: Minimum wait time between retries (seconds) - max_wait: Maximum wait time between retries (seconds) - - Returns: - Decorated function with retry logic - """ - return create_retry_decorator( - "api", max_attempts, min_wait, max_wait, use_jitter=True - ) - - -async def execute_with_retry( - func: Callable[..., Any], - *args: Any, - max_attempts: int = 3, - min_wait: float = 1, - max_wait: float = 10, - **kwargs: Any, -) -> Any: - """Execute a function with retry logic. - - Utility function for ad-hoc retry logic without decorators. - - Args: - func: Function to execute - *args: Positional arguments for the function - max_attempts: Maximum number of retry attempts - min_wait: Minimum wait time between retries (seconds) - max_wait: Maximum wait time between retries (seconds) - **kwargs: Keyword arguments for the function - - Returns: - Result of the function execution - - Raises: - RetryError: If all retry attempts fail - - Example: - result = await execute_with_retry( - some_async_function, - arg1, arg2, - max_attempts=5, - kwarg1=value1 - ) - """ - async for attempt in AsyncRetrying( - stop=stop_after_attempt(max_attempts), - wait=wait_random_exponential(multiplier=1, min=min_wait, max=max_wait), - retry=retry_if_exception_type(TRANSIENT_EXCEPTIONS), - before_sleep=before_sleep_log(logger, logging.WARNING), - after=after_log(logger, logging.INFO), - reraise=True, - ): - with attempt: - logger.debug( - f"Executing {func.__name__} " # type: ignore[attr-defined] - f"(attempt {attempt.retry_state.attempt_number}/{max_attempts})" - ) - return await func(*args, **kwargs)