diff --git a/apps/worker/services/lock_manager.py b/apps/worker/services/lock_manager.py
index 887aca43b..b99e83762 100644
--- a/apps/worker/services/lock_manager.py
+++ b/apps/worker/services/lock_manager.py
@@ -6,7 +6,9 @@
 
 import sentry_sdk
 from redis import Redis  # type: ignore
+from redis.exceptions import ConnectionError as RedisConnectionError  # type: ignore
 from redis.exceptions import LockError  # type: ignore
+from redis.exceptions import TimeoutError as RedisTimeoutError  # type: ignore
 
 from database.enums import ReportType
 from shared.celery_config import (
@@ -23,6 +25,10 @@
 RETRY_BACKOFF_MULTIPLIER = 3
 RETRY_COUNTDOWN_RANGE_DIVISOR = 2
 LOCK_NAME_SEPARATOR = "_lock_"
+LOCK_ATTEMPTS_KEY_PREFIX = "lock_attempts:"
+LOCK_ATTEMPTS_TTL_SECONDS = (
+    86400  # TTL for attempt counter key so it expires after one day
+)
 
 
 class LockType(Enum):
@@ -44,7 +50,7 @@ def __init__(
         countdown: int,
         max_retries_exceeded: bool = False,
         retry_num: int | None = None,
-        max_attempts: int | None = None,
+        max_retries: int | None = None,
         lock_name: str | None = None,
         repoid: int | None = None,
         commitid: str | None = None,
@@ -52,13 +58,13 @@
         self.countdown = countdown
         self.max_retries_exceeded = max_retries_exceeded
         self.retry_num = retry_num
-        self.max_attempts = max_attempts
+        self.max_retries = max_retries
         self.lock_name = lock_name
         self.repoid = repoid
         self.commitid = commitid
         if max_retries_exceeded:
             error_msg = (
-                f"Lock acquisition failed after {retry_num} retries (max: {max_attempts}). "
+                f"Lock acquisition failed after {retry_num} retries (max: {max_retries}). "
                 f"Repo: {repoid}, Commit: {commitid}"
             )
             super().__init__(error_msg)
@@ -93,6 +99,22 @@ def lock_name(self, lock_type: LockType):
         else:
             return f"{lock_type.value}{LOCK_NAME_SEPARATOR}{self.repoid}_{self.commitid}_{self.report_type.value}"
 
+    def _clear_lock_attempt_counter(self, attempt_key: str, lock_name: str) -> None:
+        """Clear the lock attempt counter. Log and swallow Redis errors so teardown does not mask other failures."""
+        try:
+            self.redis_connection.delete(attempt_key)
+        except (RedisConnectionError, RedisTimeoutError, OSError) as e:
+            log.warning(
+                "Failed to clear lock attempt counter (Redis unavailable or error)",
+                extra={
+                    "attempt_key": attempt_key,
+                    "commitid": self.commitid,
+                    "lock_name": lock_name,
+                    "repoid": self.repoid,
+                },
+                exc_info=True,
+            )
+
     @contextmanager
     def locked(
         self,
@@ -106,9 +128,10 @@
             lock_type: Type of lock to acquire
             retry_num: Attempt count (should be self.attempts from BaseCodecovTask).
                 Used for both exponential backoff and max retry checking.
-            max_retries: Maximum number of retries allowed
+            max_retries: Maximum total attempts (stop when attempts >= this).
         """
         lock_name = self.lock_name(lock_type)
+        attempt_key = f"{LOCK_ATTEMPTS_KEY_PREFIX}{lock_name}"
         try:
             log.info(
                 "Acquiring lock",
@@ -134,7 +157,10 @@
                         "repoid": self.repoid,
                     },
                 )
-                yield
+                try:
+                    yield
+                finally:
+                    self._clear_lock_attempt_counter(attempt_key, lock_name)
                 lock_duration = time.time() - lock_acquired_time
                 log.info(
                     "Releasing lock",
@@ -147,6 +173,12 @@
                         "repoid": self.repoid,
                     },
                 )
         except LockError:
+            # incr/expire can raise RedisConnectionError/RedisTimeoutError when Redis
+            # is unavailable; we let those propagate so the task fails once (no infinite loop).
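# A minimal sketch of the attempt-counter lifecycle this handler relies on (INCR the
# per-lock key, set its TTL only on the first attempt, DELETE it once the guarded work
# succeeds). The key prefix and TTL mirror LOCK_ATTEMPTS_KEY_PREFIX and
# LOCK_ATTEMPTS_TTL_SECONDS above; record_attempt/clear_attempts are illustrative names,
# not helpers that exist in this repo, and a reachable Redis is assumed.
from redis import Redis

LOCK_ATTEMPTS_KEY_PREFIX = "lock_attempts:"
LOCK_ATTEMPTS_TTL_SECONDS = 86400  # counter expires on its own after a day if never cleared


def record_attempt(redis_connection: Redis, lock_name: str) -> int:
    """Count one more failed acquisition attempt for this lock and return the total."""
    key = f"{LOCK_ATTEMPTS_KEY_PREFIX}{lock_name}"
    attempts = redis_connection.incr(key)
    if attempts == 1:
        # Set the TTL only on the first increment so later retries cannot keep pushing expiry out.
        redis_connection.expire(key, LOCK_ATTEMPTS_TTL_SECONDS)
    return attempts


def clear_attempts(redis_connection: Redis, lock_name: str) -> None:
    """Reset the counter once the lock was acquired and the work inside it finished."""
    redis_connection.delete(f"{LOCK_ATTEMPTS_KEY_PREFIX}{lock_name}")


# In the patch, clearing runs in the finally around the yield once the lock was held; the
# TTL covers the remaining case where a worker dies or keeps failing before acquisition,
# so a stale counter expires after a day instead of leaking forever.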
+ attempts = self.redis_connection.incr(attempt_key) + if attempts == 1: + self.redis_connection.expire(attempt_key, LOCK_ATTEMPTS_TTL_SECONDS) + max_retry_unbounded = self.base_retry_countdown * ( RETRY_BACKOFF_MULTIPLIER**retry_num ) @@ -159,28 +191,26 @@ def locked( ) countdown = countdown_unbounded - if max_retries is not None and retry_num > max_retries: - max_attempts = max_retries + 1 + if max_retries is not None and attempts >= max_retries: error = LockRetry( countdown=0, max_retries_exceeded=True, - retry_num=retry_num, - max_attempts=max_attempts, + retry_num=attempts, + max_retries=max_retries, lock_name=lock_name, repoid=self.repoid, commitid=self.commitid, ) log.error( - "Not retrying since we already had too many retries", + "Not retrying since we already had too many attempts", extra={ + "attempts": attempts, "commitid": self.commitid, "lock_name": lock_name, "lock_type": lock_type.value, - "max_attempts": max_attempts, "max_retries": max_retries, "repoid": self.repoid, "report_type": self.report_type.value, - "retry_num": retry_num, }, exc_info=True, ) @@ -188,15 +218,15 @@ def locked( error, contexts={ "lock_acquisition": { + "attempts": attempts, "blocking_timeout": self.blocking_timeout, "commitid": self.commitid, "lock_name": lock_name, "lock_timeout": self.lock_timeout, "lock_type": lock_type.value, - "max_attempts": max_attempts, + "max_retries": max_retries, "repoid": self.repoid, "report_type": self.report_type.value, - "retry_num": retry_num, } }, tags={ diff --git a/apps/worker/services/tests/test_lock_manager.py b/apps/worker/services/tests/test_lock_manager.py index e3273b3ac..673e30ae6 100644 --- a/apps/worker/services/tests/test_lock_manager.py +++ b/apps/worker/services/tests/test_lock_manager.py @@ -144,6 +144,7 @@ def test_locked_logs_duration(self, mock_redis, caplog): def test_locked_lock_error_raises_lock_retry(self, mock_redis): """Test that LockError raises LockRetry exception""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with pytest.raises(LockRetry) as exc_info: @@ -157,6 +158,7 @@ def test_locked_lock_error_raises_lock_retry(self, mock_redis): def test_locked_exponential_backoff_retry_0(self, mock_redis): """Test exponential backoff calculation for retry_num=0""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with pytest.raises(LockRetry) as exc_info: @@ -169,6 +171,7 @@ def test_locked_exponential_backoff_retry_0(self, mock_redis): def test_locked_exponential_backoff_retry_1(self, mock_redis): """Test exponential backoff calculation for retry_num=1""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with pytest.raises(LockRetry) as exc_info: @@ -181,6 +184,7 @@ def test_locked_exponential_backoff_retry_1(self, mock_redis): def test_locked_exponential_backoff_retry_2(self, mock_redis): """Test exponential backoff calculation for retry_num=2""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with pytest.raises(LockRetry) as exc_info: @@ -193,6 +197,7 @@ def test_locked_exponential_backoff_retry_2(self, mock_redis): def test_locked_exponential_backoff_cap(self, mock_redis): """Test that exponential backoff is capped at 5 hours""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = 
LockManager(repoid=123, commitid="abc123") # Use a high retry_num that would exceed the cap @@ -206,6 +211,7 @@ def test_locked_exponential_backoff_cap(self, mock_redis): def test_locked_max_retries_not_provided(self, mock_redis, caplog): """Test that max_retries=None doesn't log error""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.ERROR): @@ -219,6 +225,7 @@ def test_locked_max_retries_not_provided(self, mock_redis, caplog): def test_locked_max_retries_not_exceeded(self, mock_redis, caplog): """Test that max_retries check doesn't log error when not exceeded""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.ERROR): @@ -234,8 +241,11 @@ def test_locked_max_retries_not_exceeded(self, mock_redis, caplog): assert len(error_logs) == 0 def test_locked_max_retries_exceeded(self, mock_redis, caplog): - """Test that max_retries exceeded raises LockRetry with max_retries_exceeded=True""" + """Test that max attempts exceeded raises LockRetry with max_retries_exceeded=True""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = ( + 3 # Redis attempt count >= max_retries (3 = 3 attempts) + ) manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.ERROR): @@ -243,11 +253,10 @@ def test_locked_max_retries_exceeded(self, mock_redis, caplog): with manager.locked(LockType.UPLOAD, retry_num=5, max_retries=3): pass - # Should raise LockRetry with max_retries_exceeded=True assert isinstance(exc_info.value, LockRetry) assert exc_info.value.max_retries_exceeded is True - assert exc_info.value.retry_num == 5 - assert exc_info.value.max_attempts == 4 # max_retries + 1 + assert exc_info.value.retry_num == 3 # from Redis incr + assert exc_info.value.max_retries == 3 assert exc_info.value.lock_name == "upload_lock_123_abc123" assert exc_info.value.repoid == 123 assert exc_info.value.commitid == "abc123" @@ -256,41 +265,40 @@ def test_locked_max_retries_exceeded(self, mock_redis, caplog): error_logs = [ r for r in caplog.records - if r.levelname == "ERROR" and "too many retries" in r.message + if r.levelname == "ERROR" and "too many attempts" in r.message ] assert len(error_logs) == 1 assert error_logs[0].__dict__["max_retries"] == 3 - assert error_logs[0].__dict__["retry_num"] == 5 + assert error_logs[0].__dict__["attempts"] == 3 def test_locked_max_retries_exceeded_at_boundary(self, mock_redis, caplog): - """Test that max_retries boundary condition raises LockRetry with max_retries_exceeded=True""" + """Test that max attempts boundary (attempts >= max_retries) raises LockRetry""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 3 # attempts >= max_retries (3) manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.ERROR): with pytest.raises(LockRetry) as exc_info: - # retry_num now represents self.attempts (starts at 1) - # max_retries=3 means max_attempts=4, so retry_num=4 should exceed with manager.locked(LockType.UPLOAD, retry_num=4, max_retries=3): pass - # Should raise LockRetry with max_retries_exceeded=True assert isinstance(exc_info.value, LockRetry) assert exc_info.value.max_retries_exceeded is True - assert exc_info.value.retry_num == 4 - assert exc_info.value.max_attempts == 4 + assert exc_info.value.retry_num == 3 + assert exc_info.value.max_retries == 3 assert 
exc_info.value.countdown == 0 error_logs = [ r for r in caplog.records - if r.levelname == "ERROR" and "too many retries" in r.message + if r.levelname == "ERROR" and "too many attempts" in r.message ] assert len(error_logs) == 1 def test_locked_warning_logged_on_lock_error(self, mock_redis, caplog): """Test that warning is logged when lock cannot be acquired""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.WARNING): @@ -358,6 +366,7 @@ def test_locked_blocking_timeout_enables_retry_logic(self, mock_redis): """ # When blocking_timeout is set, Redis raises LockError after timeout mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123", blocking_timeout=5) diff --git a/apps/worker/tasks/base.py b/apps/worker/tasks/base.py index 753458587..9d5b7294b 100644 --- a/apps/worker/tasks/base.py +++ b/apps/worker/tasks/base.py @@ -182,6 +182,12 @@ def _get_request_headers(request) -> dict: class BaseCodecovTask(celery_app.Task): + """Base task for Codecov workers. + + In this codebase, max_retries is the maximum total attempts (cap on runs). + We stop when attempts >= max_retries. The name matches Celery/config. + """ + Request = BaseCodecovRequest def __init_subclass__(cls, name=None): @@ -306,27 +312,19 @@ def attempts(self) -> int: return attempts_value def _has_exceeded_max_attempts(self, max_retries: int | None) -> bool: - """Check if task has exceeded max attempts.""" + """Return True if attempts >= max_retries (max_retries is max total attempts).""" if max_retries is None: return False - max_attempts = max_retries + 1 - return self.attempts >= max_attempts + return self.attempts >= max_retries def safe_retry(self, countdown=None, exc=None, **kwargs): - """Safely retry with max retry limit. Returns False if max exceeded, otherwise raises Retry. - - Uses self.max_retries to determine the retry limit. Tasks define max_retries as a class - attribute, so it's known at instantiation and doesn't change. - """ + """Safely retry with max retry limit. 
Returns False if max exceeded, otherwise raises Retry.""" if self._has_exceeded_max_attempts(self.max_retries): - # If we're here, self.max_retries is not None (otherwise _has_exceeded_max_attempts returns False) - max_attempts = self.max_retries + 1 log.error( f"Task {self.name} exceeded max retries", extra={ "attempts": self.attempts, - "max_attempts": max_attempts, "max_retries": self.max_retries, "task_name": self.name, }, @@ -334,12 +332,11 @@ def safe_retry(self, countdown=None, exc=None, **kwargs): TASK_MAX_RETRIES_EXCEEDED_COUNTER.labels(task=self.name).inc() sentry_sdk.capture_exception( MaxRetriesExceededError( - f"Task {self.name} exceeded max retries: {self.attempts} >= {max_attempts}" + f"Task {self.name} exceeded max retries: {self.attempts} >= {self.max_retries}" ), contexts={ "task": { "attempts": self.attempts, - "max_attempts": max_attempts, "max_retries": self.max_retries, "task_name": self.name, } diff --git a/apps/worker/tasks/bundle_analysis_notify.py b/apps/worker/tasks/bundle_analysis_notify.py index 118018d36..cc08558d4 100644 --- a/apps/worker/tasks/bundle_analysis_notify.py +++ b/apps/worker/tasks/bundle_analysis_notify.py @@ -77,7 +77,7 @@ def run_impl( "commitid": commitid, "repoid": repoid, "retry_num": retry.retry_num, - "max_attempts": retry.max_attempts, + "max_retries": retry.max_retries, }, ) return { diff --git a/apps/worker/tasks/bundle_analysis_processor.py b/apps/worker/tasks/bundle_analysis_processor.py index be7f242d9..a64425a06 100644 --- a/apps/worker/tasks/bundle_analysis_processor.py +++ b/apps/worker/tasks/bundle_analysis_processor.py @@ -30,6 +30,42 @@ log = logging.getLogger(__name__) +def _log_max_retries_exceeded( + commitid: str, + repoid: int, + attempts: int, + max_retries: int, + retry_num: int | None = None, +) -> None: + extra = { + "attempts": attempts, + "commitid": commitid, + "max_retries": max_retries, + "repoid": repoid, + } + if retry_num is not None: + extra["retry_num"] = retry_num + log.error("Bundle analysis processor exceeded max retries", extra=extra) + + +def _set_upload_error_and_commit( + db_session, + upload, + commitid: str, + repoid: int, + log_suffix: str = "", +) -> None: + upload.state_id = UploadState.ERROR.db_id + upload.state = "error" + try: + db_session.commit() + except Exception: + log.exception( + f"Failed to commit upload error state{log_suffix}", + extra={"commit": commitid, "repoid": repoid, "upload_id": upload.id_}, + ) + + class BundleAnalysisProcessorTask( BaseCodecovTask, name=bundle_analysis_processor_task_name ): @@ -80,25 +116,19 @@ def run_impl( previous_result, ) except LockRetry as retry: - # Check max retries using self.attempts (includes visibility timeout re-deliveries) - # This prevents infinite retry loops when max retries are exceeded - if self._has_exceeded_max_attempts(self.max_retries): - max_attempts = ( - self.max_retries + 1 if self.max_retries is not None else None - ) - log.error( - "Bundle analysis processor exceeded max retries", - extra={ - "attempts": self.attempts, - "commitid": commitid, - "max_attempts": max_attempts, - "max_retries": self.max_retries, - "repoid": repoid, - "retry_num": self.request.retries, - }, + # Honor LockManager cap (Redis attempt count) so re-delivered messages stop. 
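# A condensed sketch of the stop condition the guard after this comment applies (and that
# manual_trigger, preprocess_upload and upload_finisher apply below), assuming max_retries
# means "maximum total attempts"; should_stop is an illustrative helper, not code in this repo.
def should_stop(lock_cap_hit: bool, attempts: int, max_retries: int | None) -> bool:
    # lock_cap_hit is LockRetry.max_retries_exceeded, i.e. LockManager's Redis counter
    # reached the cap; attempts is self.attempts, which also counts visibility-timeout
    # re-deliveries that never went through self.retry().
    if lock_cap_hit:
        return True
    return max_retries is not None and attempts >= max_retries


assert should_stop(True, attempts=1, max_retries=10) is True    # Redis counter hit the cap
assert should_stop(False, attempts=11, max_retries=10) is True  # re-delivered past the cap
assert should_stop(False, attempts=4, max_retries=10) is False  # keep retrying with backoff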
+ if retry.max_retries_exceeded or self._has_exceeded_max_attempts( + self.max_retries + ): + _log_max_retries_exceeded( + commitid=commitid, + repoid=repoid, + attempts=( + retry.retry_num if retry.max_retries_exceeded else self.attempts + ), + max_retries=self.max_retries, + retry_num=self.request.retries, ) - # Return previous_result to preserve chain behavior when max retries exceeded - # This allows the chain to continue with partial results rather than failing entirely return previous_result self.retry(max_retries=self.max_retries, countdown=retry.countdown) @@ -204,24 +234,16 @@ def process_impl_within_lock( result = report_service.process_upload(commit, upload, compare_sha) if result.error and result.error.is_retryable: if self._has_exceeded_max_attempts(self.max_retries): - attempts = self.attempts - max_attempts = self.max_retries + 1 - log.error( - "Bundle analysis processor exceeded max retries", - extra={ - "attempts": attempts, - "commitid": commitid, - "max_attempts": max_attempts, - "max_retries": self.max_retries, - "repoid": repoid, - }, + _log_max_retries_exceeded( + commitid=commitid, + repoid=repoid, + attempts=self.attempts, + max_retries=self.max_retries, ) - # Update upload state to "error" before returning try: result.update_upload(carriedforward=carriedforward) db_session.commit() except Exception: - # Log commit failure but don't raise - we've already set the error state log.exception( "Failed to update and commit upload error state after max retries exceeded", extra={ @@ -230,21 +252,10 @@ def process_impl_within_lock( "upload_id": upload.id_, }, ) - # Manually set upload state to error as fallback - upload.state_id = UploadState.ERROR.db_id - upload.state = "error" - try: - db_session.commit() - except Exception: - log.exception( - "Failed to commit upload error state fallback", - extra={ - "commit": commitid, - "repoid": repoid, - "upload_id": upload.id_, - }, - ) - return processing_results + _set_upload_error_and_commit( + db_session, upload, commitid, repoid, log_suffix=" fallback" + ) + return previous_result log.warn( "Attempting to retry bundle analysis upload", extra={ @@ -280,21 +291,8 @@ def process_impl_within_lock( "parent_task": self.request.parent_id, }, ) - upload.state_id = UploadState.ERROR.db_id - upload.state = "error" - try: - db_session.commit() - except Exception: - # Log commit failure but preserve original exception - log.exception( - "Failed to commit upload error state", - extra={ - "commit": commitid, - "repoid": repoid, - "upload_id": upload.id_, - }, - ) - raise + _set_upload_error_and_commit(db_session, upload, commitid, repoid) + return processing_results finally: if result is not None and result.bundle_report: result.bundle_report.cleanup() diff --git a/apps/worker/tasks/manual_trigger.py b/apps/worker/tasks/manual_trigger.py index 0b3c95d2c..e8b783f7f 100644 --- a/apps/worker/tasks/manual_trigger.py +++ b/apps/worker/tasks/manual_trigger.py @@ -58,7 +58,9 @@ def run_impl( **kwargs, ) except LockRetry as retry: - if self._has_exceeded_max_attempts(TASK_MAX_RETRIES_DEFAULT): + if retry.max_retries_exceeded or self._has_exceeded_max_attempts( + TASK_MAX_RETRIES_DEFAULT + ): return { "notifications_called": False, "message": "Unable to acquire lock", diff --git a/apps/worker/tasks/preprocess_upload.py b/apps/worker/tasks/preprocess_upload.py index 05795fb5b..087480edd 100644 --- a/apps/worker/tasks/preprocess_upload.py +++ b/apps/worker/tasks/preprocess_upload.py @@ -78,7 +78,9 @@ def run_impl(self, db_session, *, repoid: int, 
commitid: str, **kwargs): milestone=Milestones.READY_FOR_REPORT, error=Errors.INTERNAL_LOCK_ERROR, ) - if self._has_exceeded_max_attempts(PREPROCESS_UPLOAD_MAX_RETRIES): + if retry.max_retries_exceeded or self._has_exceeded_max_attempts( + PREPROCESS_UPLOAD_MAX_RETRIES + ): return { "preprocessed_upload": False, "reason": "unable_to_acquire_lock", diff --git a/apps/worker/tasks/test_analytics_notifier.py b/apps/worker/tasks/test_analytics_notifier.py index b58b51f74..7748e7a25 100644 --- a/apps/worker/tasks/test_analytics_notifier.py +++ b/apps/worker/tasks/test_analytics_notifier.py @@ -236,11 +236,11 @@ def _notification_lock(self, lock_manager: LockManager): "Not retrying lock acquisition - max retries exceeded", extra={ "retry_num": e.retry_num, - "max_attempts": e.max_attempts, + "max_retries": e.max_retries, }, ) raise MaxRetriesExceededError( - f"Lock acquisition exceeded max retries: {e.retry_num} >= {e.max_attempts}", + f"Lock acquisition exceeded max retries: {e.retry_num} >= {e.max_retries}", ) # Re-raise LockRetry to be handled by the caller's retry logic # The caller will catch this and call self.retry() diff --git a/apps/worker/tasks/test_results_finisher.py b/apps/worker/tasks/test_results_finisher.py index 40cb808ea..47c8a5012 100644 --- a/apps/worker/tasks/test_results_finisher.py +++ b/apps/worker/tasks/test_results_finisher.py @@ -78,7 +78,7 @@ def run_impl( "commitid": commitid, "repoid": repoid, "retry_num": retry.retry_num, - "max_attempts": retry.max_attempts, + "max_retries": retry.max_retries, }, ) return {"queue_notify": False} diff --git a/apps/worker/tasks/tests/integration/test_bundle_analysis_e2e.py b/apps/worker/tasks/tests/integration/test_bundle_analysis_e2e.py index e9eb3c8c7..0f5ed634a 100644 --- a/apps/worker/tasks/tests/integration/test_bundle_analysis_e2e.py +++ b/apps/worker/tasks/tests/integration/test_bundle_analysis_e2e.py @@ -236,6 +236,7 @@ def test_bundle_analysis_chain_max_retries_exceeded( # Mock Redis lock to fail (simulating lock contention) # This will cause LockRetry to be raised mock_redis.lock.return_value.__enter__.side_effect = LockError("Lock failed") + mock_redis.incr.return_value = 1 # LockManager: avoid MagicMock >= int # Create the chain task_signatures = [ diff --git a/apps/worker/tasks/tests/unit/test_base.py b/apps/worker/tasks/tests/unit/test_base.py index 237ba9b51..35e9e9109 100644 --- a/apps/worker/tasks/tests/unit/test_base.py +++ b/apps/worker/tasks/tests/unit/test_base.py @@ -654,12 +654,12 @@ def test_safe_retry_handles_max_retries_exceeded_exception(self, mocker): assert actual_increment == expected_increment def test_safe_retry_fails_when_attempts_exceeds_max(self, mocker): - """Test that safe_retry fails when attempts exceeds max_attempts even if retries don't.""" + """Test that safe_retry fails when attempts exceeds max_retries even if retries don't.""" task = SampleTask() task.request.retries = 3 # Below max_retries task.max_retries = 10 # Simulate re-delivery: attempts is ahead of retry_count - task.request.headers = {"attempts": 12} # Exceeds max_attempts (10 + 1 = 11) + task.request.headers = {"attempts": 12} # Exceeds max_retries (10) mock_retry = mocker.patch("celery.app.task.Task.retry") @@ -773,25 +773,34 @@ def test_get_attempts_with_re_delivery_scenario(self): @pytest.mark.django_db(databases={"default", "timeseries"}) class TestBaseCodecovTaskHasExceededMaxAttempts: - def test_has_exceeded_max_attempts_none_max_retries(self): + def test_has_exceeded_max_attempts_none_max_retries(self, mocker): """Test 
_has_exceeded_max_attempts when max_retries is None.""" + mock_request = MagicMock() + mock_request.retries = 100 + mock_request.headers = {"attempts": 100} + mock_property = PropertyMock(return_value=mock_request) + mocker.patch.object(SampleTask, "request", mock_property, create=True) task = SampleTask() - task.request.retries = 100 - task.request.headers = {"attempts": 100} assert task._has_exceeded_max_attempts(max_retries=None) is False - def test_has_exceeded_max_attempts_by_attempts(self): - """Test _has_exceeded_max_attempts when attempts >= max_attempts.""" + def test_has_exceeded_max_attempts_by_attempts(self, mocker): + """Test _has_exceeded_max_attempts when attempts >= max_retries.""" + mock_request = MagicMock() + mock_request.retries = 5 + mock_request.headers = {"attempts": 11} + mock_property = PropertyMock(return_value=mock_request) + mocker.patch.object(SampleTask, "request", mock_property, create=True) task = SampleTask() - task.request.retries = 5 - task.request.headers = {"attempts": 11} assert task._has_exceeded_max_attempts(max_retries=10) is True - def test_has_exceeded_max_attempts_not_exceeded(self): - """Test _has_exceeded_max_attempts when attempts < max_attempts.""" + def test_has_exceeded_max_attempts_not_exceeded(self, mocker): + """Test _has_exceeded_max_attempts when attempts < max_retries.""" + mock_request = MagicMock() + mock_request.retries = 3 + mock_request.headers = {"attempts": 4} + mock_property = PropertyMock(return_value=mock_request) + mocker.patch.object(SampleTask, "request", mock_property, create=True) task = SampleTask() - task.request.retries = 3 - task.request.headers = {"attempts": 4} assert task._has_exceeded_max_attempts(max_retries=10) is False def test_has_exceeded_max_attempts_without_request(self, mocker): diff --git a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py index d77d9f636..80f78a1a6 100644 --- a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py +++ b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py @@ -223,19 +223,20 @@ def test_bundle_analysis_processor_task_general_error( task = BundleAnalysisProcessorTask() retry = mocker.patch.object(task, "retry") - with pytest.raises(Exception): - task.run_impl( - dbsession, - [{"previous": "result"}], - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - params={ - "upload_id": upload.id_, - "commit": commit.commitid, - }, - ) + previous_result = [{"previous": "result"}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={ + "upload_id": upload.id_, + "commit": commit.commitid, + }, + ) + assert result == previous_result assert upload.state == "error" assert not retry.called @@ -329,6 +330,9 @@ def test_bundle_analysis_processor_task_locked( }, ) mock_redis.lock.return_value.__enter__.side_effect = LockError() + mock_redis.incr.return_value = ( + 1 # LockManager: attempts < max_retries so it raises LockRetry(countdown) + ) commit = CommitFactory.create() dbsession.add(commit) @@ -392,6 +396,8 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_raises_error( ) # Mock Redis to simulate lock failure - this will cause LockManager to raise LockRetry mock_redis.lock.return_value.__enter__.side_effect = LockError() + # So LockManager sees attempts >= max_retries and raises LockRetry(max_retries_exceeded=True) + mock_redis.incr.return_value = 
BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES commit = CommitFactory.create() dbsession.add(commit) @@ -410,7 +416,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_raises_error( dbsession.flush() task = BundleAnalysisProcessorTask() - # Set retries to max_retries to simulate max retries exceeded scenario + # Set retries to max_retries to simulate max attempts exceeded scenario # Our code checks if retries >= max_retries, so retries = max_retries should exceed # This tests the real retry logic without mocking safe_retry or _has_exceeded_max_attempts task.request.retries = BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES @@ -483,6 +489,7 @@ def track_and_raise(*args, **kwargs): raise LockError() mock_redis.lock.side_effect = track_and_raise + mock_redis.incr.return_value = 1 # LockManager: avoid MagicMock >= int task = BundleAnalysisProcessorTask() task.request.retries = 0 @@ -561,6 +568,7 @@ def capture_init(self, *args, **kwargs): mocker.patch.object(LockManager, "__init__", capture_init) mock_redis.lock.return_value.__enter__.side_effect = LockError() + mock_redis.incr.return_value = 1 task = BundleAnalysisProcessorTask() task.request.retries = 0 @@ -1650,6 +1658,8 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_lock( }, ) mock_redis.lock.return_value.__enter__.side_effect = LockError() + # So LockManager sees attempts >= max_retries and raises LockRetry(max_retries_exceeded=True) + mock_redis.incr.return_value = BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES commit = CommitFactory.create() dbsession.add(commit) @@ -1713,6 +1723,9 @@ def test_bundle_analysis_processor_task_safe_retry_fails( }, ) mock_redis.lock.return_value.__enter__.side_effect = LockError() + mock_redis.incr.return_value = ( + 1 # LockManager: attempts < max so it raises LockRetry(countdown) + ) commit = CommitFactory.create() dbsession.add(commit) @@ -1785,7 +1798,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_processing( task = BundleAnalysisProcessorTask() task.request.retries = task.max_retries - task.request.headers = {"attempts": task.max_retries + 1} + task.request.headers = {"attempts": task.max_retries} # >= max_retries → exceeded mocker.patch.object(task, "_has_exceeded_max_attempts", return_value=True) # Create a ProcessingResult with a retryable error @@ -1840,7 +1853,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( Scenario: - self.request.retries = 5 (below max_retries of 10) - - self.attempts = 11 (exceeds max_retries due to visibility timeout re-deliveries) + - self.attempts = 11 (exceeds max_retries 10 due to visibility timeout re-deliveries) - Task should stop retrying and return previous_result """ storage_path = ( @@ -1857,6 +1870,9 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( ) # Mock Redis to simulate lock failure - this will cause LockManager to raise LockRetry mock_redis.lock.return_value.__enter__.side_effect = LockError() + # LockManager gets attempt count from Redis; use 1 so it raises LockRetry(max_retries_exceeded=False). + # Task then uses self.attempts (from headers) and returns because attempts >= max_retries. 
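# The incr stubs in this file pin LockManager's Redis attempt counter to a real integer so
# the LockError handler takes a deterministic branch (the inline comments call out the
# MagicMock-vs-int comparison). Roughly, for a task whose max_retries is 3 (values are
# illustrative only):
from unittest.mock import MagicMock

mock_redis = MagicMock()
mock_redis.incr.return_value = 1  # below the cap -> LockRetry carries a backoff countdown
# ...or, to drive the stop branch instead:
mock_redis.incr.return_value = 3  # at the cap -> LockRetry(countdown=0, max_retries_exceeded=True)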
+ mock_redis.incr.return_value = 1 commit = CommitFactory.create() dbsession.add(commit) @@ -1880,7 +1896,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( # - attempts header is high (11) due to visibility timeout re-deliveries # This simulates the bug where tasks kept retrying after max attempts task.request.retries = 5 # Below max_retries (10) - task.request.headers = {"attempts": 11} # Exceeds max_retries + 1 (11 > 10 + 1) + task.request.headers = {"attempts": 11} # Exceeds max_retries 10 (11 >= 10) previous_result = [{"previous": "result"}] # Task should return previous_result when max attempts exceeded (via attempts header) @@ -1905,7 +1921,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_commit_failure( dbsession, mock_storage, ): - """Test that when max retries are exceeded and commit fails, fallback error handling works.""" + """Test that when max attempts are exceeded and commit fails, fallback error handling works.""" storage_path = ( "v1/repos/testing/ed1bdd67-8fd2-4cdb-ac9e-39b99e4a3892/bundle_report.sqlite" ) @@ -1933,7 +1949,8 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_commit_failure( task = BundleAnalysisProcessorTask() task.request.retries = task.max_retries - task.request.headers = {"attempts": task.max_retries + 1} + # Exceed when attempts >= max_retries + task.request.headers = {"attempts": task.max_retries} mocker.patch.object(task, "_has_exceeded_max_attempts", return_value=True) # Create a ProcessingResult with a retryable error @@ -1985,7 +2002,7 @@ def test_bundle_analysis_processor_task_general_error_commit_failure( dbsession, mock_storage, ): - """Test that when general exception occurs and commit fails, error is logged but exception is preserved.""" + """Test that when general exception occurs and commit fails, task returns (does not re-raise) and sets upload to error.""" storage_path = ( "v1/repos/testing/ed1bdd67-8fd2-4cdb-ac9e-39b99e4a3892/bundle_report.sqlite" ) @@ -2023,26 +2040,24 @@ def test_bundle_analysis_processor_task_general_error_commit_failure( task = BundleAnalysisProcessorTask() retry = mocker.patch.object(task, "retry") - # Mock commit to fail - commit_mock = mocker.patch.object(dbsession, "commit") - commit_mock.side_effect = Exception("Commit failed") + # Mock commit to fail so we exercise the "set upload error and commit" path + mocker.patch.object(dbsession, "commit", side_effect=Exception("Commit failed")) - with pytest.raises(ValueError, match="Processing failed"): - task.run_impl( - dbsession, - [{"previous": "result"}], - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - params={ - "upload_id": upload.id_, - "commit": commit.commitid, - }, - ) + previous_result = [{"previous": "result"}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={ + "upload_id": upload.id_, + "commit": commit.commitid, + }, + ) - # Upload state should be set to error in memory (even though commit failed) - # Note: We check in-memory state, not database state, since commit failed - # The code does attempt to set error state, but database won't reflect it if commit fails + # Task returns processing_results (not re-raises) to ack and avoid unbounded retries + assert result == previous_result assert upload.state == "error" assert not retry.called @@ -2052,7 +2067,7 @@ def test_bundle_analysis_processor_task_cleanup_with_none_result( dbsession, mock_storage, ): - """Test that cleanup 
handles None result gracefully.""" + """Test that cleanup handles None result gracefully when process_upload raises.""" storage_path = ( "v1/repos/testing/ed1bdd67-8fd2-4cdb-ac9e-39b99e4a3892/bundle_report.sqlite" ) @@ -2086,18 +2101,19 @@ def test_bundle_analysis_processor_task_cleanup_with_none_result( task = BundleAnalysisProcessorTask() - with pytest.raises(ValueError, match="Processing failed"): - task.run_impl( - dbsession, - [{"previous": "result"}], - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - params={ - "upload_id": upload.id_, - "commit": commit.commitid, - }, - ) + # Task returns (does not re-raise) to ack and avoid unbounded retries; cleanup runs in finally + previous_result = [{"previous": "result"}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={ + "upload_id": upload.id_, + "commit": commit.commitid, + }, + ) - # Should not crash even though result is None - # The finally block should handle None result gracefully + assert result == previous_result + assert upload.state == "error" diff --git a/apps/worker/tasks/tests/unit/test_manual_trigger.py b/apps/worker/tasks/tests/unit/test_manual_trigger.py index 32ab141ab..61019fe50 100644 --- a/apps/worker/tasks/tests/unit/test_manual_trigger.py +++ b/apps/worker/tasks/tests/unit/test_manual_trigger.py @@ -164,7 +164,7 @@ def test_manual_upload_completion_trigger_with_uploaded_state( def test_lock_retry_max_retries_exceeded( self, mocker, mock_configuration, dbsession, mock_storage, mock_redis ): - """Test that LockRetry returns failure dict when max retries exceeded""" + """Test that LockRetry returns failure dict when max retries exceeded (task attempts).""" commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() @@ -187,6 +187,37 @@ def test_lock_retry_max_retries_exceeded( "message": "Unable to acquire lock", } + def test_lock_retry_max_retries_exceeded_from_lock_manager( + self, mocker, mock_configuration, dbsession, mock_storage, mock_redis + ): + """Test that LockRetry with max_retries_exceeded=True returns failure dict (Redis counter path).""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + m = mocker.MagicMock() + m.return_value.locked.return_value.__enter__.side_effect = LockRetry( + countdown=0, + max_retries_exceeded=True, + retry_num=TASK_MAX_RETRIES_DEFAULT, + max_retries=TASK_MAX_RETRIES_DEFAULT, + ) + mocker.patch("tasks.manual_trigger.LockManager", m) + + task = ManualTriggerTask() + task.request.retries = 0 + task.request.headers = {"attempts": 1} + ensure_hard_time_limit_task_is_numeric(mocker, task) + + result = task.run_impl( + dbsession, repoid=commit.repoid, commitid=commit.commitid, current_yaml={} + ) + + assert result == { + "notifications_called": False, + "message": "Unable to acquire lock", + } + def test_lock_retry_will_retry( self, mocker, mock_configuration, dbsession, mock_storage, mock_redis ): diff --git a/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py b/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py index 201cfbb16..d8a9b5ba5 100644 --- a/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py +++ b/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py @@ -138,7 +138,7 @@ def test_max_retries_exceeded_from_notification_lock( countdown=0, max_retries_exceeded=True, retry_num=5, - max_attempts=5, + max_retries=5, lock_name="notification_lock", repoid=commit.repoid, commitid=commit.commitid, diff 
--git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 4cf742dbf..41e54cdc9 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -466,13 +466,16 @@ def _process_reports_with_lock( upload_ids=upload_ids, error=Errors.INTERNAL_LOCK_ERROR, ) - if self._has_exceeded_max_attempts(UPLOAD_PROCESSOR_MAX_RETRIES): + if retry.max_retries_exceeded or self._has_exceeded_max_attempts( + UPLOAD_PROCESSOR_MAX_RETRIES + ): log.error( "Upload finisher exceeded max retries", extra={ - "attempts": self.attempts, + "attempts": retry.retry_num + if retry.max_retries_exceeded + else self.attempts, "commitid": commitid, - "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES + 1, "max_retries": UPLOAD_PROCESSOR_MAX_RETRIES, "repoid": repoid, }, @@ -584,13 +587,16 @@ def _handle_finisher_lock( error=Errors.INTERNAL_LOCK_ERROR, ) UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) - if self._has_exceeded_max_attempts(UPLOAD_PROCESSOR_MAX_RETRIES): + if retry.max_retries_exceeded or self._has_exceeded_max_attempts( + UPLOAD_PROCESSOR_MAX_RETRIES + ): log.error( "Upload finisher exceeded max retries (finisher lock)", extra={ - "attempts": self.attempts, + "attempts": retry.retry_num + if retry.max_retries_exceeded + else self.attempts, "commitid": commitid, - "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES + 1, "max_retries": UPLOAD_PROCESSOR_MAX_RETRIES, "repoid": repoid, }, diff --git a/libs/shared/shared/celery_config.py b/libs/shared/shared/celery_config.py index 021e69475..92b3fcbd5 100644 --- a/libs/shared/shared/celery_config.py +++ b/libs/shared/shared/celery_config.py @@ -248,8 +248,7 @@ def get_task_group(task_name: str) -> str | None: ) ) -# Maximum retries for tasks hitting transient conditions (e.g., processing locks) -# Default: 10 retries +# max_retries = maximum total attempts (stop when attempts >= this). Config key name matches Celery. # Celery docs: https://docs.celeryq.dev/en/stable/userguide/tasks.html#max-retries TASK_MAX_RETRIES_DEFAULT = int( get_config("setup", "tasks", "celery", "max_retries", default=10) @@ -306,8 +305,7 @@ def get_task_group(task_name: str) -> str | None: get_config("setup", "tasks", "upload", "processor_max_retries", default=5) ) -# Bundle analysis processor max retries -# How many times to retry when bundle analysis processor lock cannot be acquired +# Bundle analysis processor max_retries (max total attempts) # Default: matches TASK_MAX_RETRIES_DEFAULT BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES = int( get_config(
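# Worked example of the boundary change the config comments above describe: max_retries now
# counts total attempts, whereas the old convention allowed max_retries + 1 runs via the
# derived max_attempts. Helper names are illustrative only.
def old_exceeded(attempts: int, max_retries: int) -> bool:
    return attempts >= max_retries + 1  # pre-patch: attempts compared against max_attempts


def new_exceeded(attempts: int, max_retries: int) -> bool:
    return attempts >= max_retries  # post-patch: max_retries is the cap on total attempts


assert old_exceeded(3, 3) is False and new_exceeded(3, 3) is True  # one fewer run allowed
assert old_exceeded(4, 3) is True and new_exceeded(4, 3) is True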