Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions autogpt_platform/backend/backend/executor/cluster_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Redis-based distributed locking for cluster coordination."""

import logging
import time
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from redis import Redis

logger = logging.getLogger(__name__)


class ClusterLock:
"""Simple Redis-based distributed lock for preventing duplicate execution."""

def __init__(self, redis: "Redis", key: str, owner_id: str, timeout: int = 300):
self.redis = redis
self.key = key
self.owner_id = owner_id
self.timeout = timeout
self._last_refresh = 0.0

def try_acquire(self) -> str | None:
"""Try to acquire the lock.

Returns:
- owner_id (self.owner_id) if successfully acquired
- different owner_id if someone else holds the lock
- None if Redis is unavailable or other error
"""
try:
success = self.redis.set(self.key, self.owner_id, nx=True, ex=self.timeout)
if success:
self._last_refresh = time.time()
return self.owner_id # Successfully acquired

# Failed to acquire, get current owner
current_value = self.redis.get(self.key)
if current_value:
current_owner = (
current_value.decode("utf-8")
if isinstance(current_value, bytes)
else str(current_value)
)
return current_owner

# Key doesn't exist but we failed to set it - race condition or Redis issue
return None

except Exception as e:
logger.error(f"ClusterLock.try_acquire failed for key {self.key}: {e}")
return None

def refresh(self) -> bool:
"""Refresh lock TTL if we still own it.

Rate limited to at most once every timeout/10 seconds (minimum 1 second).
During rate limiting, still verifies lock existence but skips TTL extension.
Setting _last_refresh to 0 bypasses rate limiting for testing.
"""
# Calculate refresh interval: max(timeout // 10, 1)
refresh_interval = max(self.timeout // 10, 1)
current_time = time.time()

# Check if we're within the rate limit period
# _last_refresh == 0 forces a refresh (bypasses rate limiting for testing)
is_rate_limited = (
self._last_refresh > 0
and (current_time - self._last_refresh) < refresh_interval
)

try:
# Always verify lock existence, even during rate limiting
current_value = self.redis.get(self.key)
if not current_value:
self._last_refresh = 0
return False

stored_owner = (
current_value.decode("utf-8")
if isinstance(current_value, bytes)
else str(current_value)
)
if stored_owner != self.owner_id:
self._last_refresh = 0
return False

# If rate limited, return True but don't update TTL or timestamp
if is_rate_limited:
return True

# Perform actual refresh
if self.redis.expire(self.key, self.timeout):
self._last_refresh = current_time
return True

self._last_refresh = 0
return False

except Exception as e:
logger.error(f"ClusterLock.refresh failed for key {self.key}: {e}")
self._last_refresh = 0
return False

def release(self):
"""Release the lock."""
if self._last_refresh == 0:
return

try:
self.redis.delete(self.key)
except Exception:
pass

self._last_refresh = 0.0
Loading