
Commit 6108ee2

simplify again
remove unused code typing one more
1 parent 7f62adb commit 6108ee2

File tree

2 files changed (+118, -91 lines)


src/integrations/prefect-redis/prefect_redis/locking.py

+75, -69
@@ -19,7 +19,8 @@ class RedisLockManager(LockManager):
         username: The username to use when connecting to the Redis server
         password: The password to use when connecting to the Redis server
         ssl: Whether to use SSL when connecting to the Redis server
-        # client and async_client are initialized lazily
+        client: The Redis client used to communicate with the Redis server
+        async_client: The asynchronous Redis client used to communicate with the Redis server

     Example:
         Use with a cache policy:
@@ -64,11 +65,13 @@ def __init__(
         self.username = username
         self.password = password
         self.ssl = ssl
-        self.client: Optional[Redis] = None
-        self.async_client: Optional[AsyncRedis] = None
+        # Clients are initialized by _init_clients
+        self.client: Redis
+        self.async_client: AsyncRedis
+        self._init_clients()  # Initialize clients here
         self._locks: dict[str, Lock | AsyncLock] = {}

-    # ---------- pickle helpers ----------
+    # ---------- pickling ----------
     def __getstate__(self) -> dict[str, Any]:
         return {
             k: getattr(self, k)
@@ -77,31 +80,28 @@ def __getstate__(self) -> dict[str, Any]:

     def __setstate__(self, state: dict[str, Any]) -> None:
         self.__dict__.update(state)
-        self.client = None
-        self.async_client = None
+        self._init_clients()  # Re-initialize clients here
         self._locks = {}

     # ------------------------------------

-    def _ensure_clients(self) -> None:
-        if self.client is None:
-            self.client = Redis(
-                host=self.host,
-                port=self.port,
-                db=self.db,
-                username=self.username,
-                password=self.password,
-                ssl=self.ssl,
-            )
-        if self.async_client is None:
-            self.async_client = AsyncRedis(
-                host=self.host,
-                port=self.port,
-                db=self.db,
-                username=self.username,
-                password=self.password,
-                ssl=self.ssl,
-            )
+    def _init_clients(self) -> None:
+        self.client = Redis(
+            host=self.host,
+            port=self.port,
+            db=self.db,
+            username=self.username,
+            password=self.password,
+            ssl=self.ssl,
+        )
+        self.async_client = AsyncRedis(
+            host=self.host,
+            port=self.port,
+            db=self.db,
+            username=self.username,
+            password=self.password,
+            ssl=self.ssl,
+        )

     @staticmethod
     def _lock_name_for_key(key: str) -> str:
@@ -114,16 +114,24 @@ def acquire_lock(
         acquire_timeout: Optional[float] = None,
         hold_timeout: Optional[float] = None,
     ) -> bool:
-        self._ensure_clients()
+        """
+        Acquires a lock synchronously.
+
+        Args:
+            key: Unique identifier for the transaction record.
+            holder: Unique identifier for the holder of the lock.
+            acquire_timeout: Maximum time to wait for the lock to be acquired.
+            hold_timeout: Maximum time to hold the lock.
+
+        Returns:
+            True if the lock was acquired, False otherwise.
+        """
         lock_name = self._lock_name_for_key(key)
         lock = self._locks.get(lock_name)

-        if lock is not None and self.is_lock_holder(
-            key, holder
-        ):  # is_lock_holder will also call _ensure_clients
+        if lock is not None and self.is_lock_holder(key, holder):
             return True
         else:
-            # If lock is None, or not held by current holder, create/acquire new one.
             lock = Lock(
                 self.client, lock_name, timeout=hold_timeout, thread_local=False
             )
@@ -139,7 +147,19 @@ async def aacquire_lock(
         acquire_timeout: Optional[float] = None,
         hold_timeout: Optional[float] = None,
     ) -> bool:
-        self._ensure_clients()
+        """
+        Acquires a lock asynchronously.
+
+        Args:
+            key: Unique identifier for the transaction record.
+            holder: Unique identifier for the holder of the lock. Must match the
+                holder provided when acquiring the lock.
+            acquire_timeout: Maximum time to wait for the lock to be acquired.
+            hold_timeout: Maximum time to hold the lock.
+
+        Returns:
+            True if the lock was acquired, False otherwise.
+        """
         lock_name = self._lock_name_for_key(key)
         lock = self._locks.get(lock_name)

@@ -149,8 +169,10 @@ async def aacquire_lock(
         else:
             lock = None

+        # Handles the case where a lock might have been released during a task retry
+        # If the lock doesn't exist in Redis at all, this method will succeed even if
+        # the holder ID doesn't match the original holder.
         if lock is None:
-            assert self.async_client is not None, "Async client should be initialized"
             new_lock = AsyncLock(
                 self.async_client, lock_name, timeout=hold_timeout, thread_local=False
             )
@@ -164,7 +186,21 @@ async def aacquire_lock(
         return False

     def release_lock(self, key: str, holder: str) -> None:
-        self._ensure_clients()
+        """
+        Releases the lock on the corresponding transaction record.
+
+        Handles the case where a lock might have been released during a task retry
+        If the lock doesn't exist in Redis at all, this method will succeed even if
+        the holder ID doesn't match the original holder.
+
+        Args:
+            key: Unique identifier for the transaction record.
+            holder: Unique identifier for the holder of the lock. Must match the
+                holder provided when acquiring the lock.
+
+        Raises:
+            ValueError: If the lock is held by a different holder.
+        """
         lock_name = self._lock_name_for_key(key)
         lock = self._locks.get(lock_name)

@@ -173,67 +209,37 @@ def release_lock(self, key: str, holder: str) -> None:
             del self._locks[lock_name]
             return

-        if not self.is_locked(key):  # is_locked calls _ensure_clients
+        # If the lock doesn't exist in Redis at all, it's already been released
+        if not self.is_locked(key):
             if lock_name in self._locks:
                 del self._locks[lock_name]
             return

+        # We have a real conflict - lock exists in Redis but with a different holder
         raise ValueError(f"No lock held by {holder} for transaction with key {key}")

-    async def arelease_lock(self, key: str, holder: str) -> None:  # Added async version
-        self._ensure_clients()
-        lock_name = self._lock_name_for_key(key)
-        lock = self._locks.get(lock_name)
-
-        if lock is not None and isinstance(
-            lock, AsyncLock
-        ):  # Still need to check if it *is* an AsyncLock to call await .owned()
-            if await lock.owned() and lock.local.token == holder.encode():
-                await lock.release()
-                del self._locks[lock_name]
-                return
-
-        # Check if the lock key exists on the server at all.
-        if not AsyncLock(self.async_client, lock_name).locked():
-            # If the lock doesn't exist on the server, it's already effectively released.
-            # Clean up from self._locks if it was there but holder didn't match.
-            if lock_name in self._locks:
-                del self._locks[lock_name]
-            return
-
-        raise ValueError(
-            f"No lock held by {holder} for transaction with key {key} (async)"
-        )
-
     def wait_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
-        self._ensure_clients()
         lock_name = self._lock_name_for_key(key)
-        lock = Lock(self.client, lock_name)  # Create a temporary lock for waiting
+        lock = Lock(self.client, lock_name)
         lock_freed = lock.acquire(blocking_timeout=timeout)
         if lock_freed:
             lock.release()
         return lock_freed

     async def await_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
-        self._ensure_clients()
         lock_name = self._lock_name_for_key(key)
-        assert self.async_client is not None, "Async client should be initialized"
-        lock = AsyncLock(
-            self.async_client, lock_name
-        )  # Create a temporary lock for waiting
+        lock = AsyncLock(self.async_client, lock_name)
        lock_freed = await lock.acquire(blocking_timeout=timeout)
         if lock_freed:
             await lock.release()
         return lock_freed

     def is_locked(self, key: str) -> bool:
-        self._ensure_clients()
         lock_name = self._lock_name_for_key(key)
-        lock = Lock(self.client, lock_name)  # Create a temporary lock for checking
+        lock = Lock(self.client, lock_name)
         return lock.locked()

     def is_lock_holder(self, key: str, holder: str) -> bool:
-        self._ensure_clients()  # Ensures self.client is available if needed by _locks access logic
         lock_name = self._lock_name_for_key(key)
         lock = self._locks.get(lock_name)
         if lock is None:

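For context, here is a minimal usage sketch of the simplified manager after this change. It assumes a Redis server reachable at localhost:6379 and the prefect-redis package installed; the key and holder names are illustrative, not taken from the commit.

import pickle

from prefect_redis.locking import RedisLockManager

# Assumes a Redis server listening on localhost:6379; adjust connection
# settings as needed for your environment.
manager = RedisLockManager(host="localhost", port=6379, db=0)

# After this commit, both clients exist as soon as __init__ runs; there is
# no lazy _ensure_clients() step to trigger first.
assert manager.client is not None and manager.async_client is not None

# Pickling keeps only the connection settings (__getstate__); unpickling
# rebuilds fresh clients because __setstate__ calls _init_clients().
restored: RedisLockManager = pickle.loads(pickle.dumps(manager))

# The restored manager is usable immediately (illustrative key/holder names).
if restored.acquire_lock("example-record", holder="worker-1", acquire_timeout=5):
    restored.release_lock("example-record", holder="worker-1")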
src/integrations/prefect-redis/tests/test_locking.py

+43, -22
@@ -1,3 +1,4 @@
+import asyncio
 import pickle
 import queue
 import threading
@@ -261,33 +262,42 @@ async def test_pickle_unpickle_and_use_lock_manager(
 ):
     """
     Tests that RedisLockManager can be pickled, unpickled, and then used successfully,
-    ensuring lazy client initialization works.
+    ensuring clients are re-initialized correctly on unpickle.
     """
-    # Initial state checks for the provided fixture instance
-    # (clients should be None due to our __init__ and __setstate__ if it were unpickled,
-    # or because _ensure_clients hasn't been called on a fresh instance)
-    assert lock_manager.client is None, "Initial client should be None"
-    assert lock_manager.async_client is None, "Initial async_client should be None"
+    # With the new __init__, clients are initialized immediately.
+    # So, no initial check for them being None.
+
+    # Store original client IDs for comparison after unpickling
+    original_client_id = id(lock_manager.client)
+    original_async_client_id = id(lock_manager.async_client)

     # Pickle and unpickle
     pickled_manager = pickle.dumps(lock_manager)
     unpickled_manager: RedisLockManager = pickle.loads(pickled_manager)

-    # Verify state after unpickling (clients should be None due to __setstate__)
-    assert unpickled_manager.client is None, (
-        "Client should be None after unpickling"
+    # Verify state after unpickling: clients should be NEW instances due to __setstate__ calling _init_clients()
+    assert unpickled_manager.client is not None, (
+        "Client should be re-initialized after unpickling, not None"
     )
-    assert unpickled_manager.async_client is None, (
-        "Async client should be None after unpickling"
+    assert unpickled_manager.async_client is not None, (
+        "Async client should be re-initialized after unpickling, not None"
     )
-    # Accessing _locks directly is for testing internals.
+
+    assert id(unpickled_manager.client) != original_client_id, (
+        "Client should be a NEW instance after unpickling"
+    )
+    assert id(unpickled_manager.async_client) != original_async_client_id, (
+        "Async client should be a NEW instance after unpickling"
+    )
+
+    # _locks should be an empty dict after unpickling due to __setstate__
     assert (
         hasattr(unpickled_manager, "_locks")
         and isinstance(getattr(unpickled_manager, "_locks"), dict)
         and not getattr(unpickled_manager, "_locks")
     ), "_locks should be an empty dict after unpickling"

-    # Test synchronous operations (should trigger _ensure_clients)
+    # Test synchronous operations (clients are already initialized)
     sync_key = "test_sync_pickle_key"
     sync_holder = "sync_pickle_holder"

@@ -307,29 +317,40 @@ async def test_pickle_unpickle_and_use_lock_manager(
     # Test asynchronous operations (should trigger _ensure_clients for async_client)
     async_key = "test_async_pickle_key"
     async_holder = "async_pickle_holder"
+    hold_timeout_seconds = 0.2  # Use a short hold timeout for testing expiration

     acquired_async = await unpickled_manager.aacquire_lock(
-        async_key, holder=async_holder, acquire_timeout=1, hold_timeout=5
+        async_key,
+        holder=async_holder,
+        acquire_timeout=1,
+        hold_timeout=hold_timeout_seconds,
     )
     assert acquired_async, "Should acquire async lock after unpickling"
     assert unpickled_manager.async_client is not None, (
         "Async client should be initialized after async use"
     )

-    # Verify holder by re-acquiring (should succeed) and then releasing.
+    # Verify holder by re-acquiring (should succeed as it's the same holder and lock is fresh)
     assert await unpickled_manager.aacquire_lock(
-        async_key, holder=async_holder, acquire_timeout=1, hold_timeout=5
+        async_key,
+        holder=async_holder,
+        acquire_timeout=1,
+        hold_timeout=hold_timeout_seconds,
     ), "Re-acquiring same async lock should succeed"

-    await unpickled_manager.arelease_lock(async_key, async_holder)
+    # Wait for the lock to expire based on hold_timeout
+    await asyncio.sleep(
+        hold_timeout_seconds + 0.1
+    )  # Wait a bit longer than the timeout

-    # Verify it's released by trying to acquire with a different holder.
+    # Verify it's released (expired) by trying to acquire with a different holder.
     new_async_holder = "new_async_pickle_holder"
     acquired_by_new = await unpickled_manager.aacquire_lock(
-        async_key, holder=new_async_holder, acquire_timeout=1, hold_timeout=5
+        async_key,
+        holder=new_async_holder,
+        acquire_timeout=1,
+        hold_timeout=hold_timeout_seconds,
     )
     assert acquired_by_new, (
-        "Should acquire async lock with new holder after release"
+        "Should acquire async lock with new holder after original lock expires"
     )
-    if acquired_by_new:  # Cleanup if acquired
-        await unpickled_manager.arelease_lock(async_key, new_async_holder)

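Since the async release helper (arelease_lock) is removed in this commit, the updated test relies on Redis expiring the lock key once hold_timeout elapses. Below is a standalone sketch of that behavior outside pytest, again assuming a local Redis server; the key and holder names are illustrative.

import asyncio

from prefect_redis.locking import RedisLockManager


async def main() -> None:
    # Assumes a Redis server on localhost:6379; names below are illustrative.
    manager = RedisLockManager(host="localhost", port=6379)

    # Hold the lock for only 0.2 seconds, mirroring the updated test.
    assert await manager.aacquire_lock(
        "expiring-record", holder="worker-1", acquire_timeout=1, hold_timeout=0.2
    )

    # Once the hold timeout elapses, Redis drops the key on its own, so a
    # different holder can acquire the lock without an explicit async release.
    await asyncio.sleep(0.3)
    assert await manager.aacquire_lock(
        "expiring-record", holder="worker-2", acquire_timeout=1, hold_timeout=0.2
    )


asyncio.run(main())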