58 changes: 44 additions & 14 deletions apps/worker/services/lock_manager.py
@@ -6,7 +6,9 @@

import sentry_sdk
from redis import Redis # type: ignore
from redis.exceptions import ConnectionError as RedisConnectionError # type: ignore
from redis.exceptions import LockError # type: ignore
from redis.exceptions import TimeoutError as RedisTimeoutError # type: ignore

from database.enums import ReportType
from shared.celery_config import (
@@ -23,6 +25,10 @@
RETRY_BACKOFF_MULTIPLIER = 3
RETRY_COUNTDOWN_RANGE_DIVISOR = 2
LOCK_NAME_SEPARATOR = "_lock_"
LOCK_ATTEMPTS_KEY_PREFIX = "lock_attempts:"
LOCK_ATTEMPTS_TTL_SECONDS = (
86400 # TTL for attempt counter key so it expires after one day
)


class LockType(Enum):
@@ -44,21 +50,21 @@ def __init__(
countdown: int,
max_retries_exceeded: bool = False,
retry_num: int | None = None,
max_attempts: int | None = None,
max_retries: int | None = None,
lock_name: str | None = None,
repoid: int | None = None,
commitid: str | None = None,
):
self.countdown = countdown
self.max_retries_exceeded = max_retries_exceeded
self.retry_num = retry_num
self.max_attempts = max_attempts
self.max_retries = max_retries
self.lock_name = lock_name
self.repoid = repoid
self.commitid = commitid
if max_retries_exceeded:
error_msg = (
f"Lock acquisition failed after {retry_num} retries (max: {max_attempts}). "
f"Lock acquisition failed after {retry_num} retries (max: {max_retries}). "
f"Repo: {repoid}, Commit: {commitid}"
)
super().__init__(error_msg)
@@ -93,6 +99,22 @@ def lock_name(self, lock_type: LockType):
else:
return f"{lock_type.value}{LOCK_NAME_SEPARATOR}{self.repoid}_{self.commitid}_{self.report_type.value}"

def _clear_lock_attempt_counter(self, attempt_key: str, lock_name: str) -> None:
"""Clear the lock attempt counter. Log and swallow Redis errors so teardown does not mask other failures."""
try:
self.redis_connection.delete(attempt_key)
except (RedisConnectionError, RedisTimeoutError, OSError) as e:
log.warning(
"Failed to clear lock attempt counter (Redis unavailable or error)",
extra={
"attempt_key": attempt_key,
"commitid": self.commitid,
"lock_name": lock_name,
"repoid": self.repoid,
},
exc_info=True,
)

@contextmanager
def locked(
self,
@@ -106,9 +128,10 @@ def locked(
lock_type: Type of lock to acquire
retry_num: Attempt count (should be self.attempts from BaseCodecovTask).
Used for both exponential backoff and max retry checking.
max_retries: Maximum number of retries allowed
max_retries: Maximum total attempts (stop when attempts >= this).
"""
lock_name = self.lock_name(lock_type)
attempt_key = f"{LOCK_ATTEMPTS_KEY_PREFIX}{lock_name}"
try:
log.info(
"Acquiring lock",
@@ -134,7 +157,10 @@
"repoid": self.repoid,
},
)
yield
try:
yield
finally:
self._clear_lock_attempt_counter(attempt_key, lock_name)
lock_duration = time.time() - lock_acquired_time
log.info(
"Releasing lock",
Expand All @@ -147,6 +173,12 @@ def locked(
},
)
except LockError:
# incr/expire can raise RedisConnectionError/RedisTimeoutError when Redis
# is unavailable; we let those propagate so the task fails once (no infinite loop).
attempts = self.redis_connection.incr(attempt_key)
if attempts == 1:
self.redis_connection.expire(attempt_key, LOCK_ATTEMPTS_TTL_SECONDS)

max_retry_unbounded = self.base_retry_countdown * (
RETRY_BACKOFF_MULTIPLIER**retry_num
)
@@ -159,44 +191,42 @@
)
countdown = countdown_unbounded

if max_retries is not None and retry_num > max_retries:
max_attempts = max_retries + 1
if max_retries is not None and attempts >= max_retries:
error = LockRetry(
countdown=0,
max_retries_exceeded=True,
retry_num=retry_num,
max_attempts=max_attempts,
retry_num=attempts,
max_retries=max_retries,
lock_name=lock_name,
repoid=self.repoid,
commitid=self.commitid,
)
log.error(
"Not retrying since we already had too many retries",
"Not retrying since we already had too many attempts",
extra={
"attempts": attempts,
"commitid": self.commitid,
"lock_name": lock_name,
"lock_type": lock_type.value,
"max_attempts": max_attempts,
"max_retries": max_retries,
"repoid": self.repoid,
"report_type": self.report_type.value,
"retry_num": retry_num,
},
exc_info=True,
)
sentry_sdk.capture_exception(
error,
contexts={
"lock_acquisition": {
"attempts": attempts,
"blocking_timeout": self.blocking_timeout,
"commitid": self.commitid,
"lock_name": lock_name,
"lock_timeout": self.lock_timeout,
"lock_type": lock_type.value,
"max_attempts": max_attempts,
"max_retries": max_retries,
"repoid": self.repoid,
"report_type": self.report_type.value,
"retry_num": retry_num,
}
},
tags={
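For readers skimming the truncated hunk above, here is a minimal, illustrative sketch of the retry arithmetic, assuming a hypothetical `base_retry_countdown` of 60 seconds and a simple jitter scheme. The exact jitter formula and the cap constant are not visible in this diff; the five-hour cap is taken from `test_locked_exponential_backoff_cap` below.

```python
import random

RETRY_BACKOFF_MULTIPLIER = 3
RETRY_COUNTDOWN_RANGE_DIVISOR = 2
MAX_COUNTDOWN_SECONDS = 5 * 60 * 60  # five-hour cap, per the backoff-cap test


def retry_countdown(base_retry_countdown: int, retry_num: int) -> int:
    # Unbounded exponential backoff: base * 3**retry_num, as in the hunk above.
    max_retry_unbounded = base_retry_countdown * (RETRY_BACKOFF_MULTIPLIER**retry_num)
    # Assumed jitter: pick a countdown between half the backoff and the full backoff.
    countdown = random.randint(
        max_retry_unbounded // RETRY_COUNTDOWN_RANGE_DIVISOR, max_retry_unbounded
    )
    # Cap so a very high retry_num cannot schedule a retry days in the future.
    return min(countdown, MAX_COUNTDOWN_SECONDS)


if __name__ == "__main__":
    for n in range(5):
        print(n, retry_countdown(base_retry_countdown=60, retry_num=n))
```

With these assumptions, retry 0 lands somewhere between 30 and 60 seconds, retry 1 between 90 and 180 seconds, and so on until the cap kicks in.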
35 changes: 22 additions & 13 deletions apps/worker/services/tests/test_lock_manager.py
@@ -144,6 +144,7 @@ def test_locked_logs_duration(self, mock_redis, caplog):
def test_locked_lock_error_raises_lock_retry(self, mock_redis):
"""Test that LockError raises LockRetry exception"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

manager = LockManager(repoid=123, commitid="abc123")
with pytest.raises(LockRetry) as exc_info:
@@ -157,6 +158,7 @@ def test_locked_lock_error_raises_lock_retry(self, mock_redis):
def test_locked_exponential_backoff_retry_0(self, mock_redis):
"""Test exponential backoff calculation for retry_num=0"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

manager = LockManager(repoid=123, commitid="abc123")
with pytest.raises(LockRetry) as exc_info:
@@ -169,6 +171,7 @@ def test_locked_exponential_backoff_retry_0(self, mock_redis):
def test_locked_exponential_backoff_retry_1(self, mock_redis):
"""Test exponential backoff calculation for retry_num=1"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

manager = LockManager(repoid=123, commitid="abc123")
with pytest.raises(LockRetry) as exc_info:
@@ -181,6 +184,7 @@ def test_locked_exponential_backoff_retry_1(self, mock_redis):
def test_locked_exponential_backoff_retry_2(self, mock_redis):
"""Test exponential backoff calculation for retry_num=2"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

manager = LockManager(repoid=123, commitid="abc123")
with pytest.raises(LockRetry) as exc_info:
@@ -193,6 +197,7 @@ def test_locked_exponential_backoff_retry_2(self, mock_redis):
def test_locked_exponential_backoff_cap(self, mock_redis):
"""Test that exponential backoff is capped at 5 hours"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

manager = LockManager(repoid=123, commitid="abc123")
# Use a high retry_num that would exceed the cap
@@ -206,6 +211,7 @@ def test_locked_exponential_backoff_cap(self, mock_redis):
def test_locked_max_retries_not_provided(self, mock_redis, caplog):
"""Test that max_retries=None doesn't log error"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

manager = LockManager(repoid=123, commitid="abc123")
with caplog.at_level(logging.ERROR):
@@ -219,6 +225,7 @@ def test_locked_max_retries_not_provided(self, mock_redis, caplog):
def test_locked_max_retries_not_exceeded(self, mock_redis, caplog):
"""Test that max_retries check doesn't log error when not exceeded"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

manager = LockManager(repoid=123, commitid="abc123")
with caplog.at_level(logging.ERROR):
@@ -234,20 +241,22 @@ def test_locked_max_retries_not_exceeded(self, mock_redis, caplog):
assert len(error_logs) == 0

def test_locked_max_retries_exceeded(self, mock_redis, caplog):
"""Test that max_retries exceeded raises LockRetry with max_retries_exceeded=True"""
"""Test that max attempts exceeded raises LockRetry with max_retries_exceeded=True"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = (
3  # Redis attempt count equals max_retries (3), so the limit is reached
)

manager = LockManager(repoid=123, commitid="abc123")
with caplog.at_level(logging.ERROR):
with pytest.raises(LockRetry) as exc_info:
with manager.locked(LockType.UPLOAD, retry_num=5, max_retries=3):
pass

# Should raise LockRetry with max_retries_exceeded=True
assert isinstance(exc_info.value, LockRetry)
assert exc_info.value.max_retries_exceeded is True
assert exc_info.value.retry_num == 5
assert exc_info.value.max_attempts == 4 # max_retries + 1
assert exc_info.value.retry_num == 3 # from Redis incr
assert exc_info.value.max_retries == 3
assert exc_info.value.lock_name == "upload_lock_123_abc123"
assert exc_info.value.repoid == 123
assert exc_info.value.commitid == "abc123"
@@ -256,41 +265,40 @@ def test_locked_max_retries_exceeded(self, mock_redis, caplog):
error_logs = [
r
for r in caplog.records
if r.levelname == "ERROR" and "too many retries" in r.message
if r.levelname == "ERROR" and "too many attempts" in r.message
]
assert len(error_logs) == 1
assert error_logs[0].__dict__["max_retries"] == 3
assert error_logs[0].__dict__["retry_num"] == 5
assert error_logs[0].__dict__["attempts"] == 3

def test_locked_max_retries_exceeded_at_boundary(self, mock_redis, caplog):
"""Test that max_retries boundary condition raises LockRetry with max_retries_exceeded=True"""
"""Test that max attempts boundary (attempts >= max_retries) raises LockRetry"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 3 # attempts >= max_retries (3)

manager = LockManager(repoid=123, commitid="abc123")
with caplog.at_level(logging.ERROR):
with pytest.raises(LockRetry) as exc_info:
# retry_num now represents self.attempts (starts at 1)
# max_retries=3 means max_attempts=4, so retry_num=4 should exceed
with manager.locked(LockType.UPLOAD, retry_num=4, max_retries=3):
pass

# Should raise LockRetry with max_retries_exceeded=True
assert isinstance(exc_info.value, LockRetry)
assert exc_info.value.max_retries_exceeded is True
assert exc_info.value.retry_num == 4
assert exc_info.value.max_attempts == 4
assert exc_info.value.retry_num == 3
assert exc_info.value.max_retries == 3
assert exc_info.value.countdown == 0

error_logs = [
r
for r in caplog.records
if r.levelname == "ERROR" and "too many retries" in r.message
if r.levelname == "ERROR" and "too many attempts" in r.message
]
assert len(error_logs) == 1

def test_locked_warning_logged_on_lock_error(self, mock_redis, caplog):
"""Test that warning is logged when lock cannot be acquired"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

manager = LockManager(repoid=123, commitid="abc123")
with caplog.at_level(logging.WARNING):
@@ -358,6 +366,7 @@ def test_locked_blocking_timeout_enables_retry_logic(self, mock_redis):
"""
# When blocking_timeout is set, Redis raises LockError after timeout
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

manager = LockManager(repoid=123, commitid="abc123", blocking_timeout=5)

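For context on what the `mock_redis.incr` stubs above stand in for, here is a minimal sketch of the attempt-counter lifecycle against a real redis-py client. The key prefix, TTL, and example lock name come from the diffs above; the connection itself is assumed for illustration.

```python
from redis import Redis

LOCK_ATTEMPTS_KEY_PREFIX = "lock_attempts:"
LOCK_ATTEMPTS_TTL_SECONDS = 86400  # one day, per lock_manager.py

redis_connection = Redis()  # assumed local instance, for illustration only
attempt_key = f"{LOCK_ATTEMPTS_KEY_PREFIX}upload_lock_123_abc123"

# Every failed lock acquisition bumps the shared counter (this is what
# mock_redis.incr.return_value simulates in the tests above).
attempts = redis_connection.incr(attempt_key)
if attempts == 1:
    # First failure: attach a TTL so an orphaned counter expires on its own.
    redis_connection.expire(attempt_key, LOCK_ATTEMPTS_TTL_SECONDS)

# A successful acquisition clears the counter in the context manager's finally
# block; the real helper additionally logs and swallows Redis errors here.
redis_connection.delete(attempt_key)
```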
23 changes: 10 additions & 13 deletions apps/worker/tasks/base.py
@@ -182,6 +182,12 @@ def _get_request_headers(request) -> dict:


class BaseCodecovTask(celery_app.Task):
"""Base task for Codecov workers.

In this codebase, max_retries is the maximum total attempts (cap on runs).
We stop when attempts >= max_retries. The name matches Celery/config.
"""

Request = BaseCodecovRequest

def __init_subclass__(cls, name=None):
@@ -306,40 +312,31 @@ def attempts(self) -> int:
return attempts_value

def _has_exceeded_max_attempts(self, max_retries: int | None) -> bool:
"""Check if task has exceeded max attempts."""
"""Return True if attempts >= max_retries (max_retries is max total attempts)."""
if max_retries is None:
return False

max_attempts = max_retries + 1
return self.attempts >= max_attempts
return self.attempts >= max_retries

def safe_retry(self, countdown=None, exc=None, **kwargs):
"""Safely retry with max retry limit. Returns False if max exceeded, otherwise raises Retry.

Uses self.max_retries to determine the retry limit. Tasks define max_retries as a class
attribute, so it's known at instantiation and doesn't change.
"""
"""Safely retry with max retry limit. Returns False if max exceeded, otherwise raises Retry."""
if self._has_exceeded_max_attempts(self.max_retries):
# If we're here, self.max_retries is not None (otherwise _has_exceeded_max_attempts returns False)
max_attempts = self.max_retries + 1
log.error(
f"Task {self.name} exceeded max retries",
extra={
"attempts": self.attempts,
"max_attempts": max_attempts,
"max_retries": self.max_retries,
"task_name": self.name,
},
)
TASK_MAX_RETRIES_EXCEEDED_COUNTER.labels(task=self.name).inc()
sentry_sdk.capture_exception(
MaxRetriesExceededError(
f"Task {self.name} exceeded max retries: {self.attempts} >= {max_attempts}"
f"Task {self.name} exceeded max retries: {self.attempts} >= {self.max_retries}"
),
contexts={
"task": {
"attempts": self.attempts,
"max_attempts": max_attempts,
"max_retries": self.max_retries,
"task_name": self.name,
}
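To make the semantics shift above concrete, here is a standalone sketch of the new check (the function name is hypothetical; only the comparison mirrors the diff). Previously the code computed `max_attempts = max_retries + 1` and stopped at `attempts >= max_attempts`; now it stops at `attempts >= max_retries`.

```python
def has_exceeded_max_attempts(attempts: int, max_retries: int | None) -> bool:
    """New semantics: max_retries caps total attempts, so no `+ 1` adjustment."""
    if max_retries is None:
        return False
    return attempts >= max_retries


# With max_retries=3: attempts 1 and 2 may still retry; attempt 3 (and beyond) stops.
assert [has_exceeded_max_attempts(a, 3) for a in (1, 2, 3, 4)] == [
    False,
    False,
    True,
    True,
]
```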
2 changes: 1 addition & 1 deletion apps/worker/tasks/bundle_analysis_notify.py
@@ -77,7 +77,7 @@ def run_impl(
"commitid": commitid,
"repoid": repoid,
"retry_num": retry.retry_num,
"max_attempts": retry.max_attempts,
"max_retries": retry.max_retries,
},
)
return {