From b888eb37c05f23269e3e3016cca495f6cbaf0040 Mon Sep 17 00:00:00 2001
From: Joe Becher
Date: Fri, 6 Feb 2026 13:04:08 -0500
Subject: [PATCH 1/5] fix(worker): Cap bundle analysis processor at 10 attempts
 and fix retry logic

Cap total attempts at 10 (not 10 retries + 1) for BundleAnalysisProcessorTask
and LockManager so we stop after 10 tries. Add Redis-backed attempt counter in
LockManager for lock contention so broker re-deliveries with unchanged headers
do not retry indefinitely.

BaseCodecovTask._has_exceeded_max_attempts now takes max_attempts and compares
to attempts (retries + 1 or header). On generic exception in bundle processor,
return and set upload to error instead of re-raising to avoid unbounded
retries.

Update tests: mock request for _has_exceeded_max_attempts, set
mock_redis.incr.return_value where LockManager compares attempts, and adjust
cleanup test to expect return instead of raised ValueError.

Refs CCMRG-2042

Co-authored-by: Cursor
---
 apps/worker/services/lock_manager.py           |  45 +++++--
 .../services/tests/test_lock_manager.py        |  35 +++--
 apps/worker/tasks/base.py                      |  21 ++-
 .../worker/tasks/bundle_analysis_processor.py  | 120 +++++++++---------
 apps/worker/tasks/preprocess_upload.py         |   4 +-
 .../integration/test_bundle_analysis_e2e.py    |   1 +
 apps/worker/tasks/tests/unit/test_base.py      |  37 ++++--
 .../test_bundle_analysis_processor_task.py     | 119 +++++++++--------
 apps/worker/tasks/upload_finisher.py           |  16 ++-
 libs/shared/shared/celery_config.py            |   7 +-
 10 files changed, 233 insertions(+), 172 deletions(-)

diff --git a/apps/worker/services/lock_manager.py b/apps/worker/services/lock_manager.py
index ab4ce74a5..3a102c188 100644
--- a/apps/worker/services/lock_manager.py
+++ b/apps/worker/services/lock_manager.py
@@ -6,7 +6,9 @@
 import sentry_sdk
 from redis import Redis  # type: ignore
+from redis.exceptions import ConnectionError as RedisConnectionError  # type: ignore
 from redis.exceptions import LockError  # type: ignore
+from redis.exceptions import TimeoutError as RedisTimeoutError  # type: ignore
 
 from database.enums import ReportType
 from shared.celery_config import (
@@ -22,6 +24,8 @@
 RETRY_BACKOFF_MULTIPLIER = 3
 RETRY_COUNTDOWN_RANGE_DIVISOR = 2
 LOCK_NAME_SEPARATOR = "_lock_"
+LOCK_ATTEMPTS_KEY_PREFIX = "lock_attempts:"
+LOCK_ATTEMPTS_TTL_SECONDS = 86400  # 24h
 
 # Exponential backoff calculation: BASE * MULTIPLIER^retry_num
 # With BASE=200 and MULTIPLIER=3, this yields:
@@ -112,6 +116,7 @@ def locked(
             max_retries: Maximum number of retries allowed
         """
         lock_name = self.lock_name(lock_type)
+        attempt_key = f"{LOCK_ATTEMPTS_KEY_PREFIX}{lock_name}"
         try:
             log.info(
                 "Acquiring lock",
@@ -137,7 +142,22 @@
                         "repoid": self.repoid,
                     },
                 )
-                yield
+                try:
+                    yield
+                finally:
+                    try:
+                        self.redis_connection.delete(attempt_key)
+                    except (RedisConnectionError, RedisTimeoutError, OSError) as e:
+                        log.warning(
+                            "Failed to clear lock attempt counter (Redis unavailable or error)",
+                            extra={
+                                "attempt_key": attempt_key,
+                                "commitid": self.commitid,
+                                "lock_name": lock_name,
+                                "repoid": self.repoid,
+                            },
+                            exc_info=True,
+                        )
                 lock_duration = time.time() - lock_acquired_time
                 log.info(
                     "Releasing lock",
@@ -150,6 +170,12 @@
                 },
             )
         except LockError:
+            # incr/expire can raise RedisConnectionError/RedisTimeoutError when Redis
+            # is unavailable; we let those propagate so the task fails once (no infinite loop).
+ attempts = self.redis_connection.incr(attempt_key) + if attempts == 1: + self.redis_connection.expire(attempt_key, LOCK_ATTEMPTS_TTL_SECONDS) + max_retry_unbounded = BASE_RETRY_COUNTDOWN_SECONDS * ( RETRY_BACKOFF_MULTIPLIER**retry_num ) @@ -162,28 +188,27 @@ def locked( ) countdown = countdown_unbounded - if max_retries is not None and retry_num > max_retries: - max_attempts = max_retries + 1 + if max_retries is not None and attempts >= max_retries: error = LockRetry( countdown=0, max_retries_exceeded=True, - retry_num=retry_num, - max_attempts=max_attempts, + retry_num=attempts, + max_attempts=max_retries, lock_name=lock_name, repoid=self.repoid, commitid=self.commitid, ) log.error( - "Not retrying since we already had too many retries", + "Not retrying since we already had too many attempts", extra={ + "attempts": attempts, "commitid": self.commitid, "lock_name": lock_name, "lock_type": lock_type.value, - "max_attempts": max_attempts, + "max_attempts": max_retries, "max_retries": max_retries, "repoid": self.repoid, "report_type": self.report_type.value, - "retry_num": retry_num, }, exc_info=True, ) @@ -191,15 +216,15 @@ def locked( error, contexts={ "lock_acquisition": { + "attempts": attempts, "blocking_timeout": self.blocking_timeout, "commitid": self.commitid, "lock_name": lock_name, "lock_timeout": self.lock_timeout, "lock_type": lock_type.value, - "max_attempts": max_attempts, + "max_attempts": max_retries, "repoid": self.repoid, "report_type": self.report_type.value, - "retry_num": retry_num, } }, tags={ diff --git a/apps/worker/services/tests/test_lock_manager.py b/apps/worker/services/tests/test_lock_manager.py index e3273b3ac..5079591c3 100644 --- a/apps/worker/services/tests/test_lock_manager.py +++ b/apps/worker/services/tests/test_lock_manager.py @@ -144,6 +144,7 @@ def test_locked_logs_duration(self, mock_redis, caplog): def test_locked_lock_error_raises_lock_retry(self, mock_redis): """Test that LockError raises LockRetry exception""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with pytest.raises(LockRetry) as exc_info: @@ -157,6 +158,7 @@ def test_locked_lock_error_raises_lock_retry(self, mock_redis): def test_locked_exponential_backoff_retry_0(self, mock_redis): """Test exponential backoff calculation for retry_num=0""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with pytest.raises(LockRetry) as exc_info: @@ -169,6 +171,7 @@ def test_locked_exponential_backoff_retry_0(self, mock_redis): def test_locked_exponential_backoff_retry_1(self, mock_redis): """Test exponential backoff calculation for retry_num=1""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with pytest.raises(LockRetry) as exc_info: @@ -181,6 +184,7 @@ def test_locked_exponential_backoff_retry_1(self, mock_redis): def test_locked_exponential_backoff_retry_2(self, mock_redis): """Test exponential backoff calculation for retry_num=2""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with pytest.raises(LockRetry) as exc_info: @@ -193,6 +197,7 @@ def test_locked_exponential_backoff_retry_2(self, mock_redis): def test_locked_exponential_backoff_cap(self, mock_redis): """Test that exponential backoff is capped at 5 hours""" mock_redis.lock.side_effect = LockError() + 
mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") # Use a high retry_num that would exceed the cap @@ -206,6 +211,7 @@ def test_locked_exponential_backoff_cap(self, mock_redis): def test_locked_max_retries_not_provided(self, mock_redis, caplog): """Test that max_retries=None doesn't log error""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.ERROR): @@ -219,6 +225,7 @@ def test_locked_max_retries_not_provided(self, mock_redis, caplog): def test_locked_max_retries_not_exceeded(self, mock_redis, caplog): """Test that max_retries check doesn't log error when not exceeded""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.ERROR): @@ -234,8 +241,11 @@ def test_locked_max_retries_not_exceeded(self, mock_redis, caplog): assert len(error_logs) == 0 def test_locked_max_retries_exceeded(self, mock_redis, caplog): - """Test that max_retries exceeded raises LockRetry with max_retries_exceeded=True""" + """Test that max attempts exceeded raises LockRetry with max_retries_exceeded=True""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = ( + 3 # Redis attempt count >= max_retries (3 = 3 attempts) + ) manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.ERROR): @@ -243,11 +253,10 @@ def test_locked_max_retries_exceeded(self, mock_redis, caplog): with manager.locked(LockType.UPLOAD, retry_num=5, max_retries=3): pass - # Should raise LockRetry with max_retries_exceeded=True assert isinstance(exc_info.value, LockRetry) assert exc_info.value.max_retries_exceeded is True - assert exc_info.value.retry_num == 5 - assert exc_info.value.max_attempts == 4 # max_retries + 1 + assert exc_info.value.retry_num == 3 # from Redis incr + assert exc_info.value.max_attempts == 3 # max_retries = max attempts assert exc_info.value.lock_name == "upload_lock_123_abc123" assert exc_info.value.repoid == 123 assert exc_info.value.commitid == "abc123" @@ -256,41 +265,40 @@ def test_locked_max_retries_exceeded(self, mock_redis, caplog): error_logs = [ r for r in caplog.records - if r.levelname == "ERROR" and "too many retries" in r.message + if r.levelname == "ERROR" and "too many attempts" in r.message ] assert len(error_logs) == 1 assert error_logs[0].__dict__["max_retries"] == 3 - assert error_logs[0].__dict__["retry_num"] == 5 + assert error_logs[0].__dict__["attempts"] == 3 def test_locked_max_retries_exceeded_at_boundary(self, mock_redis, caplog): - """Test that max_retries boundary condition raises LockRetry with max_retries_exceeded=True""" + """Test that max attempts boundary (attempts >= max_retries) raises LockRetry""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 3 # attempts >= max_retries (3) manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.ERROR): with pytest.raises(LockRetry) as exc_info: - # retry_num now represents self.attempts (starts at 1) - # max_retries=3 means max_attempts=4, so retry_num=4 should exceed with manager.locked(LockType.UPLOAD, retry_num=4, max_retries=3): pass - # Should raise LockRetry with max_retries_exceeded=True assert isinstance(exc_info.value, LockRetry) assert exc_info.value.max_retries_exceeded is True - assert exc_info.value.retry_num == 4 - assert exc_info.value.max_attempts == 4 + assert exc_info.value.retry_num == 
3 + assert exc_info.value.max_attempts == 3 assert exc_info.value.countdown == 0 error_logs = [ r for r in caplog.records - if r.levelname == "ERROR" and "too many retries" in r.message + if r.levelname == "ERROR" and "too many attempts" in r.message ] assert len(error_logs) == 1 def test_locked_warning_logged_on_lock_error(self, mock_redis, caplog): """Test that warning is logged when lock cannot be acquired""" mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123") with caplog.at_level(logging.WARNING): @@ -358,6 +366,7 @@ def test_locked_blocking_timeout_enables_retry_logic(self, mock_redis): """ # When blocking_timeout is set, Redis raises LockError after timeout mock_redis.lock.side_effect = LockError() + mock_redis.incr.return_value = 1 manager = LockManager(repoid=123, commitid="abc123", blocking_timeout=5) diff --git a/apps/worker/tasks/base.py b/apps/worker/tasks/base.py index 753458587..b0a47afca 100644 --- a/apps/worker/tasks/base.py +++ b/apps/worker/tasks/base.py @@ -305,12 +305,11 @@ def attempts(self) -> int: self.__dict__[cache_key] = attempts_value return attempts_value - def _has_exceeded_max_attempts(self, max_retries: int | None) -> bool: - """Check if task has exceeded max attempts.""" - if max_retries is None: + def _has_exceeded_max_attempts(self, max_attempts: int | None) -> bool: + """Check if task has exceeded max attempts. max_attempts is the maximum total runs (e.g. 10 = 10 attempts).""" + if max_attempts is None: return False - max_attempts = max_retries + 1 return self.attempts >= max_attempts def safe_retry(self, countdown=None, exc=None, **kwargs): @@ -320,31 +319,27 @@ def safe_retry(self, countdown=None, exc=None, **kwargs): attribute, so it's known at instantiation and doesn't change. 
""" if self._has_exceeded_max_attempts(self.max_retries): - # If we're here, self.max_retries is not None (otherwise _has_exceeded_max_attempts returns False) - max_attempts = self.max_retries + 1 log.error( - f"Task {self.name} exceeded max retries", + f"Task {self.name} exceeded max attempts", extra={ "attempts": self.attempts, - "max_attempts": max_attempts, - "max_retries": self.max_retries, + "max_attempts": self.max_retries, "task_name": self.name, }, ) TASK_MAX_RETRIES_EXCEEDED_COUNTER.labels(task=self.name).inc() sentry_sdk.capture_exception( MaxRetriesExceededError( - f"Task {self.name} exceeded max retries: {self.attempts} >= {max_attempts}" + f"Task {self.name} exceeded max attempts: {self.attempts} >= {self.max_retries}" ), contexts={ "task": { "attempts": self.attempts, - "max_attempts": max_attempts, - "max_retries": self.max_retries, + "max_attempts": self.max_retries, "task_name": self.name, } }, - tags={"error_type": "max_retries_exceeded", "task": self.name}, + tags={"error_type": "max_attempts_exceeded", "task": self.name}, ) return False diff --git a/apps/worker/tasks/bundle_analysis_processor.py b/apps/worker/tasks/bundle_analysis_processor.py index 5336201c2..2c7845efc 100644 --- a/apps/worker/tasks/bundle_analysis_processor.py +++ b/apps/worker/tasks/bundle_analysis_processor.py @@ -26,6 +26,42 @@ log = logging.getLogger(__name__) +def _log_max_attempts_exceeded( + commitid: str, + repoid: int, + attempts: int, + max_attempts: int, + retry_num: int | None = None, +) -> None: + extra = { + "attempts": attempts, + "commitid": commitid, + "max_attempts": max_attempts, + "repoid": repoid, + } + if retry_num is not None: + extra["retry_num"] = retry_num + log.error("Bundle analysis processor exceeded max attempts", extra=extra) + + +def _set_upload_error_and_commit( + db_session, + upload, + commitid: str, + repoid: int, + log_suffix: str = "", +) -> None: + upload.state_id = UploadState.ERROR.db_id + upload.state = "error" + try: + db_session.commit() + except Exception: + log.exception( + f"Failed to commit upload error state{log_suffix}", + extra={"commit": commitid, "repoid": repoid, "upload_id": upload.id_}, + ) + + class BundleAnalysisProcessorTask( BaseCodecovTask, name=bundle_analysis_processor_task_name ): @@ -77,25 +113,19 @@ def run_impl( previous_result, ) except LockRetry as retry: - # Check max retries using self.attempts (includes visibility timeout re-deliveries) - # This prevents infinite retry loops when max retries are exceeded - if self._has_exceeded_max_attempts(self.max_retries): - max_attempts = ( - self.max_retries + 1 if self.max_retries is not None else None - ) - log.error( - "Bundle analysis processor exceeded max retries", - extra={ - "attempts": self.attempts, - "commitid": commitid, - "max_attempts": max_attempts, - "max_retries": self.max_retries, - "repoid": repoid, - "retry_num": self.request.retries, - }, + # Honor LockManager cap (Redis attempt count) so re-delivered messages stop. 
+ if retry.max_retries_exceeded or self._has_exceeded_max_attempts( + self.max_retries + ): + _log_max_attempts_exceeded( + commitid=commitid, + repoid=repoid, + attempts=( + retry.retry_num if retry.max_retries_exceeded else self.attempts + ), + max_attempts=self.max_retries, + retry_num=self.request.retries, ) - # Return previous_result to preserve chain behavior when max retries exceeded - # This allows the chain to continue with partial results rather than failing entirely return previous_result self.retry(max_retries=self.max_retries, countdown=retry.countdown) @@ -201,24 +231,16 @@ def process_impl_within_lock( result = report_service.process_upload(commit, upload, compare_sha) if result.error and result.error.is_retryable: if self._has_exceeded_max_attempts(self.max_retries): - attempts = self.attempts - max_attempts = self.max_retries + 1 - log.error( - "Bundle analysis processor exceeded max retries", - extra={ - "attempts": attempts, - "commitid": commitid, - "max_attempts": max_attempts, - "max_retries": self.max_retries, - "repoid": repoid, - }, + _log_max_attempts_exceeded( + commitid=commitid, + repoid=repoid, + attempts=self.attempts, + max_attempts=self.max_retries, ) - # Update upload state to "error" before returning try: result.update_upload(carriedforward=carriedforward) db_session.commit() except Exception: - # Log commit failure but don't raise - we've already set the error state log.exception( "Failed to update and commit upload error state after max retries exceeded", extra={ @@ -227,21 +249,10 @@ def process_impl_within_lock( "upload_id": upload.id_, }, ) - # Manually set upload state to error as fallback - upload.state_id = UploadState.ERROR.db_id - upload.state = "error" - try: - db_session.commit() - except Exception: - log.exception( - "Failed to commit upload error state fallback", - extra={ - "commit": commitid, - "repoid": repoid, - "upload_id": upload.id_, - }, - ) - return processing_results + _set_upload_error_and_commit( + db_session, upload, commitid, repoid, log_suffix=" fallback" + ) + return previous_result log.warn( "Attempting to retry bundle analysis upload", extra={ @@ -277,21 +288,8 @@ def process_impl_within_lock( "parent_task": self.request.parent_id, }, ) - upload.state_id = UploadState.ERROR.db_id - upload.state = "error" - try: - db_session.commit() - except Exception: - # Log commit failure but preserve original exception - log.exception( - "Failed to commit upload error state", - extra={ - "commit": commitid, - "repoid": repoid, - "upload_id": upload.id_, - }, - ) - raise + _set_upload_error_and_commit(db_session, upload, commitid, repoid) + return processing_results finally: if result is not None and result.bundle_report: result.bundle_report.cleanup() diff --git a/apps/worker/tasks/preprocess_upload.py b/apps/worker/tasks/preprocess_upload.py index 05795fb5b..087480edd 100644 --- a/apps/worker/tasks/preprocess_upload.py +++ b/apps/worker/tasks/preprocess_upload.py @@ -78,7 +78,9 @@ def run_impl(self, db_session, *, repoid: int, commitid: str, **kwargs): milestone=Milestones.READY_FOR_REPORT, error=Errors.INTERNAL_LOCK_ERROR, ) - if self._has_exceeded_max_attempts(PREPROCESS_UPLOAD_MAX_RETRIES): + if retry.max_retries_exceeded or self._has_exceeded_max_attempts( + PREPROCESS_UPLOAD_MAX_RETRIES + ): return { "preprocessed_upload": False, "reason": "unable_to_acquire_lock", diff --git a/apps/worker/tasks/tests/integration/test_bundle_analysis_e2e.py b/apps/worker/tasks/tests/integration/test_bundle_analysis_e2e.py index 
e9eb3c8c7..0f5ed634a 100644 --- a/apps/worker/tasks/tests/integration/test_bundle_analysis_e2e.py +++ b/apps/worker/tasks/tests/integration/test_bundle_analysis_e2e.py @@ -236,6 +236,7 @@ def test_bundle_analysis_chain_max_retries_exceeded( # Mock Redis lock to fail (simulating lock contention) # This will cause LockRetry to be raised mock_redis.lock.return_value.__enter__.side_effect = LockError("Lock failed") + mock_redis.incr.return_value = 1 # LockManager: avoid MagicMock >= int # Create the chain task_signatures = [ diff --git a/apps/worker/tasks/tests/unit/test_base.py b/apps/worker/tasks/tests/unit/test_base.py index 237ba9b51..27b65c6be 100644 --- a/apps/worker/tasks/tests/unit/test_base.py +++ b/apps/worker/tasks/tests/unit/test_base.py @@ -773,26 +773,35 @@ def test_get_attempts_with_re_delivery_scenario(self): @pytest.mark.django_db(databases={"default", "timeseries"}) class TestBaseCodecovTaskHasExceededMaxAttempts: - def test_has_exceeded_max_attempts_none_max_retries(self): - """Test _has_exceeded_max_attempts when max_retries is None.""" + def test_has_exceeded_max_attempts_none_max_retries(self, mocker): + """Test _has_exceeded_max_attempts when max_attempts is None.""" + mock_request = MagicMock() + mock_request.retries = 100 + mock_request.headers = {"attempts": 100} + mock_property = PropertyMock(return_value=mock_request) + mocker.patch.object(SampleTask, "request", mock_property, create=True) task = SampleTask() - task.request.retries = 100 - task.request.headers = {"attempts": 100} - assert task._has_exceeded_max_attempts(max_retries=None) is False + assert task._has_exceeded_max_attempts(max_attempts=None) is False - def test_has_exceeded_max_attempts_by_attempts(self): + def test_has_exceeded_max_attempts_by_attempts(self, mocker): """Test _has_exceeded_max_attempts when attempts >= max_attempts.""" + mock_request = MagicMock() + mock_request.retries = 5 + mock_request.headers = {"attempts": 11} + mock_property = PropertyMock(return_value=mock_request) + mocker.patch.object(SampleTask, "request", mock_property, create=True) task = SampleTask() - task.request.retries = 5 - task.request.headers = {"attempts": 11} - assert task._has_exceeded_max_attempts(max_retries=10) is True + assert task._has_exceeded_max_attempts(max_attempts=10) is True - def test_has_exceeded_max_attempts_not_exceeded(self): + def test_has_exceeded_max_attempts_not_exceeded(self, mocker): """Test _has_exceeded_max_attempts when attempts < max_attempts.""" + mock_request = MagicMock() + mock_request.retries = 3 + mock_request.headers = {"attempts": 4} + mock_property = PropertyMock(return_value=mock_request) + mocker.patch.object(SampleTask, "request", mock_property, create=True) task = SampleTask() - task.request.retries = 3 - task.request.headers = {"attempts": 4} - assert task._has_exceeded_max_attempts(max_retries=10) is False + assert task._has_exceeded_max_attempts(max_attempts=10) is False def test_has_exceeded_max_attempts_without_request(self, mocker): """Test _has_exceeded_max_attempts when request doesn't exist.""" @@ -805,7 +814,7 @@ def mock_hasattr(obj, name): return original_hasattr(obj, name) mocker.patch("builtins.hasattr", side_effect=mock_hasattr) - assert task._has_exceeded_max_attempts(max_retries=10) is False + assert task._has_exceeded_max_attempts(max_attempts=10) is False class TestBaseCodecovRequest: diff --git a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py index 
78037f417..b3b11698e 100644 --- a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py +++ b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py @@ -222,19 +222,20 @@ def test_bundle_analysis_processor_task_general_error( task = BundleAnalysisProcessorTask() retry = mocker.patch.object(task, "retry") - with pytest.raises(Exception): - task.run_impl( - dbsession, - [{"previous": "result"}], - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - params={ - "upload_id": upload.id_, - "commit": commit.commitid, - }, - ) + previous_result = [{"previous": "result"}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={ + "upload_id": upload.id_, + "commit": commit.commitid, + }, + ) + assert result == previous_result assert upload.state == "error" assert not retry.called @@ -328,6 +329,9 @@ def test_bundle_analysis_processor_task_locked( }, ) mock_redis.lock.return_value.__enter__.side_effect = LockError() + mock_redis.incr.return_value = ( + 1 # LockManager: attempts < max_retries so it raises LockRetry(countdown) + ) commit = CommitFactory.create() dbsession.add(commit) @@ -391,6 +395,8 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_raises_error( ) # Mock Redis to simulate lock failure - this will cause LockManager to raise LockRetry mock_redis.lock.return_value.__enter__.side_effect = LockError() + # So LockManager sees attempts >= max_retries (10) and raises LockRetry(max_retries_exceeded=True) + mock_redis.incr.return_value = BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES commit = CommitFactory.create() dbsession.add(commit) @@ -409,7 +415,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_raises_error( dbsession.flush() task = BundleAnalysisProcessorTask() - # Set retries to max_retries to simulate max retries exceeded scenario + # Set retries to max_retries to simulate max attempts exceeded scenario # Our code checks if retries >= max_retries, so retries = max_retries should exceed # This tests the real retry logic without mocking safe_retry or _has_exceeded_max_attempts task.request.retries = BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES @@ -482,6 +488,7 @@ def track_and_raise(*args, **kwargs): raise LockError() mock_redis.lock.side_effect = track_and_raise + mock_redis.incr.return_value = 1 # LockManager: avoid MagicMock >= int task = BundleAnalysisProcessorTask() task.request.retries = 0 @@ -1570,6 +1577,8 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_lock( }, ) mock_redis.lock.return_value.__enter__.side_effect = LockError() + # So LockManager sees attempts >= max_retries and raises LockRetry(max_retries_exceeded=True) + mock_redis.incr.return_value = BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES commit = CommitFactory.create() dbsession.add(commit) @@ -1633,6 +1642,9 @@ def test_bundle_analysis_processor_task_safe_retry_fails( }, ) mock_redis.lock.return_value.__enter__.side_effect = LockError() + mock_redis.incr.return_value = ( + 1 # LockManager: attempts < max so it raises LockRetry(countdown) + ) commit = CommitFactory.create() dbsession.add(commit) @@ -1705,7 +1717,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_processing( task = BundleAnalysisProcessorTask() task.request.retries = task.max_retries - task.request.headers = {"attempts": task.max_retries + 1} + task.request.headers = {"attempts": task.max_retries} # >= max_attempts → exceeded mocker.patch.object(task, "_has_exceeded_max_attempts", 
return_value=True) # Create a ProcessingResult with a retryable error @@ -1760,7 +1772,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( Scenario: - self.request.retries = 5 (below max_retries of 10) - - self.attempts = 11 (exceeds max_retries due to visibility timeout re-deliveries) + - self.attempts = 11 (exceeds max_attempts 10 due to visibility timeout re-deliveries) - Task should stop retrying and return previous_result """ storage_path = ( @@ -1777,6 +1789,9 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( ) # Mock Redis to simulate lock failure - this will cause LockManager to raise LockRetry mock_redis.lock.return_value.__enter__.side_effect = LockError() + # LockManager gets attempt count from Redis; use 1 so it raises LockRetry(max_retries_exceeded=False). + # Task then uses self.attempts (from headers) and returns because 11 >= 10. + mock_redis.incr.return_value = 1 commit = CommitFactory.create() dbsession.add(commit) @@ -1800,7 +1815,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( # - attempts header is high (11) due to visibility timeout re-deliveries # This simulates the bug where tasks kept retrying after max attempts task.request.retries = 5 # Below max_retries (10) - task.request.headers = {"attempts": 11} # Exceeds max_retries + 1 (11 > 10 + 1) + task.request.headers = {"attempts": 11} # Exceeds max_attempts 10 (11 >= 10) previous_result = [{"previous": "result"}] # Task should return previous_result when max attempts exceeded (via attempts header) @@ -1825,7 +1840,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_commit_failure( dbsession, mock_storage, ): - """Test that when max retries are exceeded and commit fails, fallback error handling works.""" + """Test that when max attempts are exceeded and commit fails, fallback error handling works.""" storage_path = ( "v1/repos/testing/ed1bdd67-8fd2-4cdb-ac9e-39b99e4a3892/bundle_report.sqlite" ) @@ -1853,7 +1868,8 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_commit_failure( task = BundleAnalysisProcessorTask() task.request.retries = task.max_retries - task.request.headers = {"attempts": task.max_retries + 1} + # max_retries is max attempts; exceed when attempts >= 10 + task.request.headers = {"attempts": task.max_retries} mocker.patch.object(task, "_has_exceeded_max_attempts", return_value=True) # Create a ProcessingResult with a retryable error @@ -1905,7 +1921,7 @@ def test_bundle_analysis_processor_task_general_error_commit_failure( dbsession, mock_storage, ): - """Test that when general exception occurs and commit fails, error is logged but exception is preserved.""" + """Test that when general exception occurs and commit fails, task returns (does not re-raise) and sets upload to error.""" storage_path = ( "v1/repos/testing/ed1bdd67-8fd2-4cdb-ac9e-39b99e4a3892/bundle_report.sqlite" ) @@ -1943,26 +1959,24 @@ def test_bundle_analysis_processor_task_general_error_commit_failure( task = BundleAnalysisProcessorTask() retry = mocker.patch.object(task, "retry") - # Mock commit to fail - commit_mock = mocker.patch.object(dbsession, "commit") - commit_mock.side_effect = Exception("Commit failed") + # Mock commit to fail so we exercise the "set upload error and commit" path + mocker.patch.object(dbsession, "commit", side_effect=Exception("Commit failed")) - with pytest.raises(ValueError, match="Processing failed"): - task.run_impl( - dbsession, - [{"previous": "result"}], - 
repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - params={ - "upload_id": upload.id_, - "commit": commit.commitid, - }, - ) + previous_result = [{"previous": "result"}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={ + "upload_id": upload.id_, + "commit": commit.commitid, + }, + ) - # Upload state should be set to error in memory (even though commit failed) - # Note: We check in-memory state, not database state, since commit failed - # The code does attempt to set error state, but database won't reflect it if commit fails + # Task returns processing_results (not re-raises) to ack and avoid unbounded retries + assert result == previous_result assert upload.state == "error" assert not retry.called @@ -1972,7 +1986,7 @@ def test_bundle_analysis_processor_task_cleanup_with_none_result( dbsession, mock_storage, ): - """Test that cleanup handles None result gracefully.""" + """Test that cleanup handles None result gracefully when process_upload raises.""" storage_path = ( "v1/repos/testing/ed1bdd67-8fd2-4cdb-ac9e-39b99e4a3892/bundle_report.sqlite" ) @@ -2006,18 +2020,19 @@ def test_bundle_analysis_processor_task_cleanup_with_none_result( task = BundleAnalysisProcessorTask() - with pytest.raises(ValueError, match="Processing failed"): - task.run_impl( - dbsession, - [{"previous": "result"}], - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - params={ - "upload_id": upload.id_, - "commit": commit.commitid, - }, - ) + # Task returns (does not re-raise) to ack and avoid unbounded retries; cleanup runs in finally + previous_result = [{"previous": "result"}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={ + "upload_id": upload.id_, + "commit": commit.commitid, + }, + ) - # Should not crash even though result is None - # The finally block should handle None result gracefully + assert result == previous_result + assert upload.state == "error" diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 4cf742dbf..0bcb94427 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -466,11 +466,15 @@ def _process_reports_with_lock( upload_ids=upload_ids, error=Errors.INTERNAL_LOCK_ERROR, ) - if self._has_exceeded_max_attempts(UPLOAD_PROCESSOR_MAX_RETRIES): + if retry.max_retries_exceeded or self._has_exceeded_max_attempts( + UPLOAD_PROCESSOR_MAX_RETRIES + ): log.error( "Upload finisher exceeded max retries", extra={ - "attempts": self.attempts, + "attempts": retry.retry_num + if retry.max_retries_exceeded + else self.attempts, "commitid": commitid, "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES + 1, "max_retries": UPLOAD_PROCESSOR_MAX_RETRIES, @@ -584,11 +588,15 @@ def _handle_finisher_lock( error=Errors.INTERNAL_LOCK_ERROR, ) UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) - if self._has_exceeded_max_attempts(UPLOAD_PROCESSOR_MAX_RETRIES): + if retry.max_retries_exceeded or self._has_exceeded_max_attempts( + UPLOAD_PROCESSOR_MAX_RETRIES + ): log.error( "Upload finisher exceeded max retries (finisher lock)", extra={ - "attempts": self.attempts, + "attempts": retry.retry_num + if retry.max_retries_exceeded + else self.attempts, "commitid": commitid, "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES + 1, "max_retries": UPLOAD_PROCESSOR_MAX_RETRIES, diff --git a/libs/shared/shared/celery_config.py 
b/libs/shared/shared/celery_config.py
index 021e69475..e42130ede 100644
--- a/libs/shared/shared/celery_config.py
+++ b/libs/shared/shared/celery_config.py
@@ -248,8 +248,8 @@ def get_task_group(task_name: str) -> str | None:
     )
 )
 
-# Maximum retries for tasks hitting transient conditions (e.g., processing locks)
-# Default: 10 retries
+# Maximum total attempts for tasks hitting transient conditions (e.g., processing locks).
+# Value is total attempts, not retries: 10 means 10 attempts total.
 # Celery docs: https://docs.celeryq.dev/en/stable/userguide/tasks.html#max-retries
 TASK_MAX_RETRIES_DEFAULT = int(
     get_config("setup", "tasks", "celery", "max_retries", default=10)
 )
@@ -306,8 +306,7 @@ def get_task_group(task_name: str) -> str | None:
     get_config("setup", "tasks", "upload", "processor_max_retries", default=5)
 )
 
-# Bundle analysis processor max retries
-# How many times to retry when bundle analysis processor lock cannot be acquired
+# Bundle analysis processor max attempts (total runs; 10 = 10 attempts)
 # Default: matches TASK_MAX_RETRIES_DEFAULT
 BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES = int(
     get_config(

From 1bd029a4228863d8a0b2f8c260201ea68ad096c4 Mon Sep 17 00:00:00 2001
From: Joe Becher
Date: Fri, 6 Feb 2026 13:13:03 -0500
Subject: [PATCH 2/5] ref(worker): Extract lock clear helper, fix finisher log,
 clean comments

- LockManager: extract _clear_lock_attempt_counter to remove nested try in locked()
- Upload finisher: log max_attempts as UPLOAD_PROCESSOR_MAX_RETRIES (not +1)
- Lock_manager: comment TTL intent instead of restating 24h
- Tests: remove hard-coded (10) from comments; use max_attempts wording

Co-authored-by: Cursor
---
 apps/worker/services/lock_manager.py          | 34 +++++++++++--------
 .../test_bundle_analysis_processor_task.py    |  4 +--
 apps/worker/tasks/upload_finisher.py          |  4 +--
 3 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/apps/worker/services/lock_manager.py b/apps/worker/services/lock_manager.py
index 3a102c188..788853a05 100644
--- a/apps/worker/services/lock_manager.py
+++ b/apps/worker/services/lock_manager.py
@@ -25,7 +25,9 @@
 RETRY_COUNTDOWN_RANGE_DIVISOR = 2
 LOCK_NAME_SEPARATOR = "_lock_"
 LOCK_ATTEMPTS_KEY_PREFIX = "lock_attempts:"
-LOCK_ATTEMPTS_TTL_SECONDS = 86400  # 24h
+LOCK_ATTEMPTS_TTL_SECONDS = (
+    86400  # TTL for attempt counter key so it expires after one day
+)
 
 # Exponential backoff calculation: BASE * MULTIPLIER^retry_num
 # With BASE=200 and MULTIPLIER=3, this yields:
@@ -100,6 +102,22 @@ def lock_name(self, lock_type: LockType):
         else:
             return f"{lock_type.value}{LOCK_NAME_SEPARATOR}{self.repoid}_{self.commitid}_{self.report_type.value}"
 
+    def _clear_lock_attempt_counter(self, attempt_key: str, lock_name: str) -> None:
+        """Clear the lock attempt counter.
Log and swallow Redis errors so teardown does not mask other failures.""" + try: + self.redis_connection.delete(attempt_key) + except (RedisConnectionError, RedisTimeoutError, OSError) as e: + log.warning( + "Failed to clear lock attempt counter (Redis unavailable or error)", + extra={ + "attempt_key": attempt_key, + "commitid": self.commitid, + "lock_name": lock_name, + "repoid": self.repoid, + }, + exc_info=True, + ) + @contextmanager def locked( self, @@ -145,19 +163,7 @@ def locked( try: yield finally: - try: - self.redis_connection.delete(attempt_key) - except (RedisConnectionError, RedisTimeoutError, OSError) as e: - log.warning( - "Failed to clear lock attempt counter (Redis unavailable or error)", - extra={ - "attempt_key": attempt_key, - "commitid": self.commitid, - "lock_name": lock_name, - "repoid": self.repoid, - }, - exc_info=True, - ) + self._clear_lock_attempt_counter(attempt_key, lock_name) lock_duration = time.time() - lock_acquired_time log.info( "Releasing lock", diff --git a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py index b3b11698e..87d86f226 100644 --- a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py +++ b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py @@ -395,7 +395,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_raises_error( ) # Mock Redis to simulate lock failure - this will cause LockManager to raise LockRetry mock_redis.lock.return_value.__enter__.side_effect = LockError() - # So LockManager sees attempts >= max_retries (10) and raises LockRetry(max_retries_exceeded=True) + # So LockManager sees attempts >= max_retries and raises LockRetry(max_retries_exceeded=True) mock_redis.incr.return_value = BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES commit = CommitFactory.create() @@ -1790,7 +1790,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( # Mock Redis to simulate lock failure - this will cause LockManager to raise LockRetry mock_redis.lock.return_value.__enter__.side_effect = LockError() # LockManager gets attempt count from Redis; use 1 so it raises LockRetry(max_retries_exceeded=False). - # Task then uses self.attempts (from headers) and returns because 11 >= 10. + # Task then uses self.attempts (from headers) and returns because attempts >= max_attempts. 
 mock_redis.incr.return_value = 1
 
 commit = CommitFactory.create()
diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py
index 0bcb94427..f50332fb3 100644
--- a/apps/worker/tasks/upload_finisher.py
+++ b/apps/worker/tasks/upload_finisher.py
@@ -476,7 +476,7 @@ def _process_reports_with_lock(
                     if retry.max_retries_exceeded
                     else self.attempts,
                     "commitid": commitid,
-                    "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES + 1,
+                    "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES,
                     "max_retries": UPLOAD_PROCESSOR_MAX_RETRIES,
                     "repoid": repoid,
                 },
@@ -598,7 +598,7 @@ def _handle_finisher_lock(
                     if retry.max_retries_exceeded
                     else self.attempts,
                     "commitid": commitid,
-                    "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES + 1,
+                    "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES,
                     "max_retries": UPLOAD_PROCESSOR_MAX_RETRIES,
                     "repoid": repoid,
                 },

From 60ed18f0895ac66156f904314278b7c340b95796 Mon Sep 17 00:00:00 2001
From: Joe Becher
Date: Fri, 6 Feb 2026 13:25:49 -0500
Subject: [PATCH 3/5] ref(worker): use max_retries only, remove max_attempts
 alias

- BaseCodecovTask: doc and safe_retry use max_retries; drop max_attempts property
- LockRetry: max_attempts -> max_retries (same semantics: max total attempts)
- LockManager/bundle_analysis_processor/upload_finisher: log and Sentry use max_retries
- Tests: LockRetry(max_retries=...), comments say max_retries
- celery_config: one-line convention (max_retries = max total attempts)
- Fix duplicate dict keys in lock_manager and upload_finisher

Refs CCMRG-2042

Co-authored-by: Cursor
---
 apps/worker/services/lock_manager.py          | 13 ++++----
 .../services/tests/test_lock_manager.py       |  4 +--
 apps/worker/tasks/base.py                     | 30 ++++++++++---------
 apps/worker/tasks/bundle_analysis_notify.py   |  2 +-
 .../worker/tasks/bundle_analysis_processor.py | 16 +++++-----
 apps/worker/tasks/test_analytics_notifier.py  |  4 +--
 apps/worker/tasks/test_results_finisher.py    |  2 +-
 apps/worker/tasks/tests/unit/test_base.py     | 18 +++++------
 .../test_bundle_analysis_processor_task.py    | 10 +++---
 .../unit/test_test_analytics_notifier.py      |  2 +-
 apps/worker/tasks/upload_finisher.py          |  2 --
 libs/shared/shared/celery_config.py           |  5 ++--
 12 files changed, 53 insertions(+), 55 deletions(-)

diff --git a/apps/worker/services/lock_manager.py b/apps/worker/services/lock_manager.py
index 788853a05..51367387a 100644
--- a/apps/worker/services/lock_manager.py
+++ b/apps/worker/services/lock_manager.py
@@ -55,7 +55,7 @@ def __init__(
         countdown: int,
         max_retries_exceeded: bool = False,
         retry_num: int | None = None,
-        max_attempts: int | None = None,
+        max_retries: int | None = None,
         lock_name: str | None = None,
         repoid: int | None = None,
         commitid: str | None = None,
     ):
         self.countdown = countdown
         self.max_retries_exceeded = max_retries_exceeded
         self.retry_num = retry_num
-        self.max_attempts = max_attempts
+        self.max_retries = max_retries
         self.lock_name = lock_name
         self.repoid = repoid
         self.commitid = commitid
         if max_retries_exceeded:
             error_msg = (
-                f"Lock acquisition failed after {retry_num} retries (max: {max_attempts}). "
+                f"Lock acquisition failed after {retry_num} retries (max: {max_retries}). "
                 f"Repo: {repoid}, Commit: {commitid}"
             )
             super().__init__(error_msg)
@@ -131,7 +131,7 @@ def locked(
             lock_type: Type of lock to acquire
             retry_num: Attempt count (should be self.attempts from BaseCodecovTask).
                 Used for both exponential backoff and max retry checking.
-            max_retries: Maximum number of retries allowed
+            max_retries: Maximum total attempts (stop when attempts >= this).
""" lock_name = self.lock_name(lock_type) attempt_key = f"{LOCK_ATTEMPTS_KEY_PREFIX}{lock_name}" @@ -199,7 +199,7 @@ def locked( countdown=0, max_retries_exceeded=True, retry_num=attempts, - max_attempts=max_retries, + max_retries=max_retries, lock_name=lock_name, repoid=self.repoid, commitid=self.commitid, @@ -211,7 +211,6 @@ def locked( "commitid": self.commitid, "lock_name": lock_name, "lock_type": lock_type.value, - "max_attempts": max_retries, "max_retries": max_retries, "repoid": self.repoid, "report_type": self.report_type.value, @@ -228,7 +227,7 @@ def locked( "lock_name": lock_name, "lock_timeout": self.lock_timeout, "lock_type": lock_type.value, - "max_attempts": max_retries, + "max_retries": max_retries, "repoid": self.repoid, "report_type": self.report_type.value, } diff --git a/apps/worker/services/tests/test_lock_manager.py b/apps/worker/services/tests/test_lock_manager.py index 5079591c3..673e30ae6 100644 --- a/apps/worker/services/tests/test_lock_manager.py +++ b/apps/worker/services/tests/test_lock_manager.py @@ -256,7 +256,7 @@ def test_locked_max_retries_exceeded(self, mock_redis, caplog): assert isinstance(exc_info.value, LockRetry) assert exc_info.value.max_retries_exceeded is True assert exc_info.value.retry_num == 3 # from Redis incr - assert exc_info.value.max_attempts == 3 # max_retries = max attempts + assert exc_info.value.max_retries == 3 assert exc_info.value.lock_name == "upload_lock_123_abc123" assert exc_info.value.repoid == 123 assert exc_info.value.commitid == "abc123" @@ -285,7 +285,7 @@ def test_locked_max_retries_exceeded_at_boundary(self, mock_redis, caplog): assert isinstance(exc_info.value, LockRetry) assert exc_info.value.max_retries_exceeded is True assert exc_info.value.retry_num == 3 - assert exc_info.value.max_attempts == 3 + assert exc_info.value.max_retries == 3 assert exc_info.value.countdown == 0 error_logs = [ diff --git a/apps/worker/tasks/base.py b/apps/worker/tasks/base.py index b0a47afca..9d5b7294b 100644 --- a/apps/worker/tasks/base.py +++ b/apps/worker/tasks/base.py @@ -182,6 +182,12 @@ def _get_request_headers(request) -> dict: class BaseCodecovTask(celery_app.Task): + """Base task for Codecov workers. + + In this codebase, max_retries is the maximum total attempts (cap on runs). + We stop when attempts >= max_retries. The name matches Celery/config. + """ + Request = BaseCodecovRequest def __init_subclass__(cls, name=None): @@ -305,41 +311,37 @@ def attempts(self) -> int: self.__dict__[cache_key] = attempts_value return attempts_value - def _has_exceeded_max_attempts(self, max_attempts: int | None) -> bool: - """Check if task has exceeded max attempts. max_attempts is the maximum total runs (e.g. 10 = 10 attempts).""" - if max_attempts is None: + def _has_exceeded_max_attempts(self, max_retries: int | None) -> bool: + """Return True if attempts >= max_retries (max_retries is max total attempts).""" + if max_retries is None: return False - return self.attempts >= max_attempts + return self.attempts >= max_retries def safe_retry(self, countdown=None, exc=None, **kwargs): - """Safely retry with max retry limit. Returns False if max exceeded, otherwise raises Retry. - - Uses self.max_retries to determine the retry limit. Tasks define max_retries as a class - attribute, so it's known at instantiation and doesn't change. - """ + """Safely retry with max retry limit. 
Returns False if max exceeded, otherwise raises Retry.""" if self._has_exceeded_max_attempts(self.max_retries): log.error( - f"Task {self.name} exceeded max attempts", + f"Task {self.name} exceeded max retries", extra={ "attempts": self.attempts, - "max_attempts": self.max_retries, + "max_retries": self.max_retries, "task_name": self.name, }, ) TASK_MAX_RETRIES_EXCEEDED_COUNTER.labels(task=self.name).inc() sentry_sdk.capture_exception( MaxRetriesExceededError( - f"Task {self.name} exceeded max attempts: {self.attempts} >= {self.max_retries}" + f"Task {self.name} exceeded max retries: {self.attempts} >= {self.max_retries}" ), contexts={ "task": { "attempts": self.attempts, - "max_attempts": self.max_retries, + "max_retries": self.max_retries, "task_name": self.name, } }, - tags={"error_type": "max_attempts_exceeded", "task": self.name}, + tags={"error_type": "max_retries_exceeded", "task": self.name}, ) return False diff --git a/apps/worker/tasks/bundle_analysis_notify.py b/apps/worker/tasks/bundle_analysis_notify.py index 73023bf5e..09e3796d5 100644 --- a/apps/worker/tasks/bundle_analysis_notify.py +++ b/apps/worker/tasks/bundle_analysis_notify.py @@ -75,7 +75,7 @@ def run_impl( "commitid": commitid, "repoid": repoid, "retry_num": retry.retry_num, - "max_attempts": retry.max_attempts, + "max_retries": retry.max_retries, }, ) return { diff --git a/apps/worker/tasks/bundle_analysis_processor.py b/apps/worker/tasks/bundle_analysis_processor.py index 2c7845efc..d57acd962 100644 --- a/apps/worker/tasks/bundle_analysis_processor.py +++ b/apps/worker/tasks/bundle_analysis_processor.py @@ -26,22 +26,22 @@ log = logging.getLogger(__name__) -def _log_max_attempts_exceeded( +def _log_max_retries_exceeded( commitid: str, repoid: int, attempts: int, - max_attempts: int, + max_retries: int, retry_num: int | None = None, ) -> None: extra = { "attempts": attempts, "commitid": commitid, - "max_attempts": max_attempts, + "max_retries": max_retries, "repoid": repoid, } if retry_num is not None: extra["retry_num"] = retry_num - log.error("Bundle analysis processor exceeded max attempts", extra=extra) + log.error("Bundle analysis processor exceeded max retries", extra=extra) def _set_upload_error_and_commit( @@ -117,13 +117,13 @@ def run_impl( if retry.max_retries_exceeded or self._has_exceeded_max_attempts( self.max_retries ): - _log_max_attempts_exceeded( + _log_max_retries_exceeded( commitid=commitid, repoid=repoid, attempts=( retry.retry_num if retry.max_retries_exceeded else self.attempts ), - max_attempts=self.max_retries, + max_retries=self.max_retries, retry_num=self.request.retries, ) return previous_result @@ -231,11 +231,11 @@ def process_impl_within_lock( result = report_service.process_upload(commit, upload, compare_sha) if result.error and result.error.is_retryable: if self._has_exceeded_max_attempts(self.max_retries): - _log_max_attempts_exceeded( + _log_max_retries_exceeded( commitid=commitid, repoid=repoid, attempts=self.attempts, - max_attempts=self.max_retries, + max_retries=self.max_retries, ) try: result.update_upload(carriedforward=carriedforward) diff --git a/apps/worker/tasks/test_analytics_notifier.py b/apps/worker/tasks/test_analytics_notifier.py index b58b51f74..7748e7a25 100644 --- a/apps/worker/tasks/test_analytics_notifier.py +++ b/apps/worker/tasks/test_analytics_notifier.py @@ -236,11 +236,11 @@ def _notification_lock(self, lock_manager: LockManager): "Not retrying lock acquisition - max retries exceeded", extra={ "retry_num": e.retry_num, - "max_attempts": e.max_attempts, + 
"max_retries": e.max_retries, }, ) raise MaxRetriesExceededError( - f"Lock acquisition exceeded max retries: {e.retry_num} >= {e.max_attempts}", + f"Lock acquisition exceeded max retries: {e.retry_num} >= {e.max_retries}", ) # Re-raise LockRetry to be handled by the caller's retry logic # The caller will catch this and call self.retry() diff --git a/apps/worker/tasks/test_results_finisher.py b/apps/worker/tasks/test_results_finisher.py index 40cb808ea..47c8a5012 100644 --- a/apps/worker/tasks/test_results_finisher.py +++ b/apps/worker/tasks/test_results_finisher.py @@ -78,7 +78,7 @@ def run_impl( "commitid": commitid, "repoid": repoid, "retry_num": retry.retry_num, - "max_attempts": retry.max_attempts, + "max_retries": retry.max_retries, }, ) return {"queue_notify": False} diff --git a/apps/worker/tasks/tests/unit/test_base.py b/apps/worker/tasks/tests/unit/test_base.py index 27b65c6be..35e9e9109 100644 --- a/apps/worker/tasks/tests/unit/test_base.py +++ b/apps/worker/tasks/tests/unit/test_base.py @@ -654,12 +654,12 @@ def test_safe_retry_handles_max_retries_exceeded_exception(self, mocker): assert actual_increment == expected_increment def test_safe_retry_fails_when_attempts_exceeds_max(self, mocker): - """Test that safe_retry fails when attempts exceeds max_attempts even if retries don't.""" + """Test that safe_retry fails when attempts exceeds max_retries even if retries don't.""" task = SampleTask() task.request.retries = 3 # Below max_retries task.max_retries = 10 # Simulate re-delivery: attempts is ahead of retry_count - task.request.headers = {"attempts": 12} # Exceeds max_attempts (10 + 1 = 11) + task.request.headers = {"attempts": 12} # Exceeds max_retries (10) mock_retry = mocker.patch("celery.app.task.Task.retry") @@ -774,34 +774,34 @@ def test_get_attempts_with_re_delivery_scenario(self): @pytest.mark.django_db(databases={"default", "timeseries"}) class TestBaseCodecovTaskHasExceededMaxAttempts: def test_has_exceeded_max_attempts_none_max_retries(self, mocker): - """Test _has_exceeded_max_attempts when max_attempts is None.""" + """Test _has_exceeded_max_attempts when max_retries is None.""" mock_request = MagicMock() mock_request.retries = 100 mock_request.headers = {"attempts": 100} mock_property = PropertyMock(return_value=mock_request) mocker.patch.object(SampleTask, "request", mock_property, create=True) task = SampleTask() - assert task._has_exceeded_max_attempts(max_attempts=None) is False + assert task._has_exceeded_max_attempts(max_retries=None) is False def test_has_exceeded_max_attempts_by_attempts(self, mocker): - """Test _has_exceeded_max_attempts when attempts >= max_attempts.""" + """Test _has_exceeded_max_attempts when attempts >= max_retries.""" mock_request = MagicMock() mock_request.retries = 5 mock_request.headers = {"attempts": 11} mock_property = PropertyMock(return_value=mock_request) mocker.patch.object(SampleTask, "request", mock_property, create=True) task = SampleTask() - assert task._has_exceeded_max_attempts(max_attempts=10) is True + assert task._has_exceeded_max_attempts(max_retries=10) is True def test_has_exceeded_max_attempts_not_exceeded(self, mocker): - """Test _has_exceeded_max_attempts when attempts < max_attempts.""" + """Test _has_exceeded_max_attempts when attempts < max_retries.""" mock_request = MagicMock() mock_request.retries = 3 mock_request.headers = {"attempts": 4} mock_property = PropertyMock(return_value=mock_request) mocker.patch.object(SampleTask, "request", mock_property, create=True) task = SampleTask() - assert 
task._has_exceeded_max_attempts(max_attempts=10) is False + assert task._has_exceeded_max_attempts(max_retries=10) is False def test_has_exceeded_max_attempts_without_request(self, mocker): """Test _has_exceeded_max_attempts when request doesn't exist.""" @@ -814,7 +814,7 @@ def mock_hasattr(obj, name): return original_hasattr(obj, name) mocker.patch("builtins.hasattr", side_effect=mock_hasattr) - assert task._has_exceeded_max_attempts(max_attempts=10) is False + assert task._has_exceeded_max_attempts(max_retries=10) is False class TestBaseCodecovRequest: diff --git a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py index 87d86f226..26a8b8efa 100644 --- a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py +++ b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py @@ -1717,7 +1717,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_processing( task = BundleAnalysisProcessorTask() task.request.retries = task.max_retries - task.request.headers = {"attempts": task.max_retries} # >= max_attempts → exceeded + task.request.headers = {"attempts": task.max_retries} # >= max_retries → exceeded mocker.patch.object(task, "_has_exceeded_max_attempts", return_value=True) # Create a ProcessingResult with a retryable error @@ -1772,7 +1772,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( Scenario: - self.request.retries = 5 (below max_retries of 10) - - self.attempts = 11 (exceeds max_attempts 10 due to visibility timeout re-deliveries) + - self.attempts = 11 (exceeds max_retries 10 due to visibility timeout re-deliveries) - Task should stop retrying and return previous_result """ storage_path = ( @@ -1790,7 +1790,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( # Mock Redis to simulate lock failure - this will cause LockManager to raise LockRetry mock_redis.lock.return_value.__enter__.side_effect = LockError() # LockManager gets attempt count from Redis; use 1 so it raises LockRetry(max_retries_exceeded=False). - # Task then uses self.attempts (from headers) and returns because attempts >= max_attempts. + # Task then uses self.attempts (from headers) and returns because attempts >= max_retries. 
mock_redis.incr.return_value = 1 commit = CommitFactory.create() @@ -1815,7 +1815,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_visibility_timeout( # - attempts header is high (11) due to visibility timeout re-deliveries # This simulates the bug where tasks kept retrying after max attempts task.request.retries = 5 # Below max_retries (10) - task.request.headers = {"attempts": 11} # Exceeds max_attempts 10 (11 >= 10) + task.request.headers = {"attempts": 11} # Exceeds max_retries 10 (11 >= 10) previous_result = [{"previous": "result"}] # Task should return previous_result when max attempts exceeded (via attempts header) @@ -1868,7 +1868,7 @@ def test_bundle_analysis_processor_task_max_retries_exceeded_commit_failure( task = BundleAnalysisProcessorTask() task.request.retries = task.max_retries - # max_retries is max attempts; exceed when attempts >= 10 + # Exceed when attempts >= max_retries task.request.headers = {"attempts": task.max_retries} mocker.patch.object(task, "_has_exceeded_max_attempts", return_value=True) diff --git a/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py b/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py index 201cfbb16..d8a9b5ba5 100644 --- a/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py +++ b/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py @@ -138,7 +138,7 @@ def test_max_retries_exceeded_from_notification_lock( countdown=0, max_retries_exceeded=True, retry_num=5, - max_attempts=5, + max_retries=5, lock_name="notification_lock", repoid=commit.repoid, commitid=commit.commitid, diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index f50332fb3..41e54cdc9 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -476,7 +476,6 @@ def _process_reports_with_lock( if retry.max_retries_exceeded else self.attempts, "commitid": commitid, - "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES, "max_retries": UPLOAD_PROCESSOR_MAX_RETRIES, "repoid": repoid, }, @@ -598,7 +597,6 @@ def _handle_finisher_lock( if retry.max_retries_exceeded else self.attempts, "commitid": commitid, - "max_attempts": UPLOAD_PROCESSOR_MAX_RETRIES, "max_retries": UPLOAD_PROCESSOR_MAX_RETRIES, "repoid": repoid, }, diff --git a/libs/shared/shared/celery_config.py b/libs/shared/shared/celery_config.py index e42130ede..92b3fcbd5 100644 --- a/libs/shared/shared/celery_config.py +++ b/libs/shared/shared/celery_config.py @@ -248,8 +248,7 @@ def get_task_group(task_name: str) -> str | None: ) ) -# Maximum total attempts for tasks hitting transient conditions (e.g., processing locks). -# Value is total attempts, not retries: 10 means 10 attempts total. +# max_retries = maximum total attempts (stop when attempts >= this). Config key name matches Celery. 
 # Celery docs: https://docs.celeryq.dev/en/stable/userguide/tasks.html#max-retries
 TASK_MAX_RETRIES_DEFAULT = int(
     get_config("setup", "tasks", "celery", "max_retries", default=10)
 )
@@ -306,8 +305,7 @@ def get_task_group(task_name: str) -> str | None:
     get_config("setup", "tasks", "upload", "processor_max_retries", default=5)
 )
 
-# Bundle analysis processor max attempts (total runs; 10 = 10 attempts)
+# Bundle analysis processor max_retries (max total attempts)
 # Default: matches TASK_MAX_RETRIES_DEFAULT
 BUNDLE_ANALYSIS_PROCESSOR_MAX_RETRIES = int(
     get_config(

From 279dbe8d7a161aef75751ce2c1aca8d3b72fa196 Mon Sep 17 00:00:00 2001
From: Joe Becher
Date: Fri, 6 Feb 2026 13:48:25 -0500
Subject: [PATCH 4/5] test(worker): set mock_redis.incr.return_value in BA
 retry countdown test

LockManager uses redis incr for attempt count; mock must return an int so
attempts >= max_retries does not raise TypeError.

Refs CCMRG-2042

Co-authored-by: Cursor
---
 .../tasks/tests/unit/test_bundle_analysis_processor_task.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py
index dbaa34367..80f78a1a6 100644
--- a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py
+++ b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py
@@ -568,6 +568,7 @@ def capture_init(self, *args, **kwargs):
         mocker.patch.object(LockManager, "__init__", capture_init)
 
         mock_redis.lock.return_value.__enter__.side_effect = LockError()
+        mock_redis.incr.return_value = 1
 
         task = BundleAnalysisProcessorTask()
         task.request.retries = 0

From 5a7c73fe082a12b59e147479ee9c37bd352317b9 Mon Sep 17 00:00:00 2001
From: Joe Becher
Date: Fri, 6 Feb 2026 13:51:52 -0500
Subject: [PATCH 5/5] fix(worker): honor LockRetry.max_retries_exceeded in
 ManualTriggerTask

When LockManager's Redis attempt counter hits max_retries before the task's
attempt count (e.g. re-deliveries), it raises
LockRetry(max_retries_exceeded=True, countdown=0). ManualTriggerTask only
checked self._has_exceeded_max_attempts(), so it fell through to
self.retry(countdown=0) and caused rapid zero-delay retries.

Align with preprocess_upload and other callers: check retry.max_retries_exceeded
or self._has_exceeded_max_attempts() and return failure dict when either is
true. Add test for the Redis-counter path.

Co-authored-by: Cursor
---
 apps/worker/tasks/manual_trigger.py           |  4 ++-
 .../tasks/tests/unit/test_manual_trigger.py   | 33 ++++++++++++++++++-
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/apps/worker/tasks/manual_trigger.py b/apps/worker/tasks/manual_trigger.py
index 0b3c95d2c..e8b783f7f 100644
--- a/apps/worker/tasks/manual_trigger.py
+++ b/apps/worker/tasks/manual_trigger.py
@@ -58,7 +58,9 @@ def run_impl(
                     **kwargs,
                 )
         except LockRetry as retry:
-            if self._has_exceeded_max_attempts(TASK_MAX_RETRIES_DEFAULT):
+            if retry.max_retries_exceeded or self._has_exceeded_max_attempts(
+                TASK_MAX_RETRIES_DEFAULT
+            ):
                 return {
                     "notifications_called": False,
                     "message": "Unable to acquire lock",
diff --git a/apps/worker/tasks/tests/unit/test_manual_trigger.py b/apps/worker/tasks/tests/unit/test_manual_trigger.py
index 32ab141ab..61019fe50 100644
--- a/apps/worker/tasks/tests/unit/test_manual_trigger.py
+++ b/apps/worker/tasks/tests/unit/test_manual_trigger.py
@@ -164,7 +164,7 @@ def test_manual_upload_completion_trigger_with_uploaded_state(
     def test_lock_retry_max_retries_exceeded(
         self, mocker, mock_configuration, dbsession, mock_storage, mock_redis
     ):
-        """Test that LockRetry returns failure dict when max retries exceeded"""
+        """Test that LockRetry returns failure dict when max retries exceeded (task attempts)."""
         commit = CommitFactory.create()
         dbsession.add(commit)
         dbsession.flush()
@@ -187,6 +187,37 @@ def test_lock_retry_max_retries_exceeded(
             "message": "Unable to acquire lock",
         }
 
+    def test_lock_retry_max_retries_exceeded_from_lock_manager(
+        self, mocker, mock_configuration, dbsession, mock_storage, mock_redis
+    ):
+        """Test that LockRetry with max_retries_exceeded=True returns failure dict (Redis counter path)."""
+        commit = CommitFactory.create()
+        dbsession.add(commit)
+        dbsession.flush()
+
+        m = mocker.MagicMock()
+        m.return_value.locked.return_value.__enter__.side_effect = LockRetry(
+            countdown=0,
+            max_retries_exceeded=True,
+            retry_num=TASK_MAX_RETRIES_DEFAULT,
+            max_retries=TASK_MAX_RETRIES_DEFAULT,
+        )
+        mocker.patch("tasks.manual_trigger.LockManager", m)
+
+        task = ManualTriggerTask()
+        task.request.retries = 0
+        task.request.headers = {"attempts": 1}
+        ensure_hard_time_limit_task_is_numeric(mocker, task)
+
+        result = task.run_impl(
+            dbsession, repoid=commit.repoid, commitid=commit.commitid, current_yaml={}
+        )
+
+        assert result == {
+            "notifications_called": False,
+            "message": "Unable to acquire lock",
+        }
+
     def test_lock_retry_will_retry(
         self, mocker, mock_configuration, dbsession, mock_storage, mock_redis
     ):
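
The series keeps converging on one pattern: a per-lock attempt counter stored in Redis, incremented on every failed lock acquisition and cleared on success, combined with an attempts >= max_retries check so broker re-deliveries cannot retry forever. The sketch below is a minimal, self-contained illustration of that pattern, not the worker's actual code: the name locked_with_attempt_cap, the MaxAttemptsExceeded exception, and the timeout values are hypothetical, while the redis-py calls (lock, incr, expire, delete) and LockError are real APIs.

```python
# Hypothetical sketch of the attempt-capping pattern described in this series.
# Only the redis-py calls (lock/incr/expire/delete) and LockError are real APIs;
# everything else is illustrative and simplified relative to LockManager.
from contextlib import contextmanager

from redis import Redis
from redis.exceptions import LockError

ATTEMPTS_TTL_SECONDS = 86400  # counter expires after a day, as in the patch


class MaxAttemptsExceeded(Exception):
    """Raised once the total number of failed acquisitions reaches the cap."""


@contextmanager
def locked_with_attempt_cap(redis: Redis, lock_name: str, max_attempts: int):
    """Acquire lock_name, giving up for good after max_attempts failed tries.

    The counter lives in Redis rather than in Celery retry headers, so broker
    re-deliveries that reset or bypass the headers still count toward the cap.
    """
    attempt_key = f"lock_attempts:{lock_name}"
    try:
        with redis.lock(lock_name, timeout=300, blocking_timeout=5):
            try:
                yield
            finally:
                # Success path: forget the counter so future runs start fresh.
                redis.delete(attempt_key)
    except LockError:
        # Contention path: count this attempt and decide whether to give up.
        attempts = redis.incr(attempt_key)
        if attempts == 1:
            redis.expire(attempt_key, ATTEMPTS_TTL_SECONDS)
        if attempts >= max_attempts:
            raise MaxAttemptsExceeded(
                f"{lock_name}: {attempts} attempts >= cap of {max_attempts}"
            )
        raise  # let the caller retry later with backoff, as the tasks do
```

In the worker the same responsibilities are split between LockManager.locked() (counter, backoff, raising LockRetry) and the tasks (_has_exceeded_max_attempts, safe_retry, returning previous_result or a failure dict), but the capping decision is the same attempts >= max_retries comparison in both places.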