18 changes: 18 additions & 0 deletions README.md
@@ -197,6 +197,24 @@ src2.retrieve('subdir1/subdir2a/binary1.bin', 'local_file2.bin')

Details of each class are available in the [module documentation](https://rms-filecache.readthedocs.io/en/latest/module.html).

## Stale Lock Cleanup

When using shared caches with multiprocessor-safe locking (`mp_safe=True`), a process that
crashes while downloading a file may leave behind a lock file. While the OS-level lock is
released on process exit, the lock file remains on disk and can cause other processes to
wait or time out. `FileCache` provides a method to clean up these stale locks:

```python
fc = FileCache('shared-cache')
removed = fc.clean_up_stale_locks()
print(f'Cleaned up {removed} stale lock(s)')
```
Comment on lines +207 to +211
⚠️ Potential issue | 🟡 Minor

Make the stale-lock example self-contained and explicit about `mp_safe`.

The snippet currently omits the import and does not reflect the `mp_safe=True` context described above it.

<details>
<summary>📝 Proposed doc fix</summary>

```diff
-fc = FileCache('shared-cache')
+from filecache import FileCache
+
+fc = FileCache('shared-cache', mp_safe=True)
 removed = fc.clean_up_stale_locks()
 print(f'Cleaned up {removed} stale lock(s)')
```

</details>

<details>
<summary>🤖 Prompt for AI Agents</summary>

Verify each finding against the current code and only fix it if needed.

In @README.md around lines 207 - 211, The example uses
FileCache.clean_up_stale_locks() but omits the import and the mp_safe=True
context; update the snippet to import FileCache and construct it with
mp_safe=True (i.e., use FileCache('shared-cache', mp_safe=True)) before calling
clean_up_stale_locks() so the example is self-contained and matches the
multiprocessing-safe discussion.


</details>

The method walks the cache directory, identifies lock files that are not actively held by
any process, and removes them. Active locks held by running processes are left untouched.
Additionally, the multi-file retrieval path automatically detects and cleans up stale locks
from crashed processes during the download wait loop.
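
Because the multi-file retrieval path performs this cleanup automatically, calling
`clean_up_stale_locks()` by hand is mainly useful as periodic maintenance on long-lived
shared caches. A minimal sketch of the automatic path (the bucket and file names are
placeholders, and it assumes `retrieve()` accepts a list of URLs):

```python
from filecache import FileCache

with FileCache('shared-cache', mp_safe=True) as fc:
    # If a peer process crashed mid-download and left a stale lock on one of
    # these files, the wait loop detects it, removes the lock, and fails fast
    # with FileNotFoundError instead of blocking until the lock timeout.
    paths = fc.retrieve(['gs://my-bucket/file1.dat', 'gs://my-bucket/file2.dat'])
```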

# Contributing

Information on contributing to this package can be found in the
86 changes: 85 additions & 1 deletion filecache/file_cache.py
@@ -263,6 +263,7 @@ def __init__(self,
else:
raise TypeError(f'cache_name argument {cache_name} is of improper type')

self._cache_name = cache_name
is_shared = (cache_name is not None)

self._delete_on_exit = (delete_on_exit if delete_on_exit is not None
@@ -336,6 +337,12 @@ def _validate_nthreads(self,
def registered_scheme_prefixes(self) -> tuple[str, ...]:
return tuple([x + '://' for x in _SCHEME_CLASSES])

@property
def cache_name(self) -> str | None:
"""The name of this cache, or None if unnamed."""

return self._cache_name

@property
def cache_dir(self) -> Path:
"""The top-level directory of the cache as a Path object."""
@@ -438,6 +445,62 @@ def _log_error(self, msg: str) -> None:
if logger:
logger.error(msg) # type: ignore

def __str__(self) -> str:
return str(self._cache_dir)

def __repr__(self) -> str:
parts = [repr(self._cache_name)]
if self._delete_on_exit:
parts.append('delete_on_exit=True')
if self._time_sensitive:
parts.append('time_sensitive=True')
if self._is_mp_safe:
parts.append('mp_safe=True')
if self._anonymous:
parts.append('anonymous=True')
if self._lock_timeout != 60:
parts.append(f'lock_timeout={self._lock_timeout!r}')
if self._nthreads != 8:
parts.append(f'nthreads={self._nthreads!r}')
return f'FileCache({", ".join(parts)})'
Comment on lines +451 to +465
⚠️ Potential issue | 🟠 Major

`__repr__` drops the non-default lifecycle and locking states.

`delete_on_exit` and `mp_safe` are only rendered when they are True. That makes `FileCache(None, delete_on_exit=False)` and `FileCache('shared', mp_safe=False)` look like default instances even though their behavior is the opposite.
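
A quick illustration of the ambiguity (hypothetical cache name; assumes the defaults noted in the fix below, where named caches default to `mp_safe=True`):

```python
from filecache import FileCache

fc_default = FileCache('shared')                # mp_safe defaults to True here
fc_unsafe = FileCache('shared', mp_safe=False)  # locking explicitly disabled
print(repr(fc_default))  # FileCache('shared', mp_safe=True)
print(repr(fc_unsafe))   # FileCache('shared')  <- reads as a default instance
```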

🛠️ Proposed fix

```diff
     def __repr__(self) -> str:
         parts = [repr(self._cache_name)]
-        if self._delete_on_exit:
-            parts.append('delete_on_exit=True')
+        default_delete_on_exit = (self._cache_name is None)
+        if self._delete_on_exit != default_delete_on_exit:
+            parts.append(f'delete_on_exit={self._delete_on_exit!r}')
         if self._time_sensitive:
             parts.append('time_sensitive=True')
-        if self._is_mp_safe:
-            parts.append('mp_safe=True')
+        default_mp_safe = (self._cache_name is not None)
+        if self._is_mp_safe != default_mp_safe:
+            parts.append(f'mp_safe={self._is_mp_safe!r}')
         if self._anonymous:
             parts.append('anonymous=True')
         if self._lock_timeout != 60:
             parts.append(f'lock_timeout={self._lock_timeout!r}')
         if self._nthreads != 8:
             parts.append(f'nthreads={self._nthreads!r}')
         return f'FileCache({", ".join(parts)})'
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
    def __repr__(self) -> str:
        parts = [repr(self._cache_name)]
        default_delete_on_exit = (self._cache_name is None)
        if self._delete_on_exit != default_delete_on_exit:
            parts.append(f'delete_on_exit={self._delete_on_exit!r}')
        if self._time_sensitive:
            parts.append('time_sensitive=True')
        default_mp_safe = (self._cache_name is not None)
        if self._is_mp_safe != default_mp_safe:
            parts.append(f'mp_safe={self._is_mp_safe!r}')
        if self._anonymous:
            parts.append('anonymous=True')
        if self._lock_timeout != 60:
            parts.append(f'lock_timeout={self._lock_timeout!r}')
        if self._nthreads != 8:
            parts.append(f'nthreads={self._nthreads!r}')
        return f'FileCache({", ".join(parts)})'
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@filecache/file_cache.py` around lines 451 - 465, The __repr__ implementation
omits boolean lifecycle/locking flags when they are False, making non-default
states indistinguishable; change FileCache.__repr__ to always include the
boolean fields (self._delete_on_exit, self._time_sensitive, self._is_mp_safe,
self._anonymous) in the parts list by appending their actual values (e.g.,
f'delete_on_exit={self._delete_on_exit!r}') instead of only appending when True,
while keeping the existing conditional logic for lock_timeout and nthreads if
desired.


def clean_up_stale_locks(self) -> int:
"""Remove stale lock files left behind by crashed processes.

Walks the cache directory looking for lock files. For each one found,
attempts to acquire the lock with no timeout. If the lock can be acquired,
no other process holds it and the lock file is stale; it is removed. If the
lock cannot be acquired, another process is actively using it and it is left
alone.

Returns:
The number of stale lock files removed.
"""

removed = 0
if not self._cache_dir.is_dir():
return removed

for root, _dirs, files in os.walk(self._cache_dir):
for name in files:
if not name.startswith(self._LOCK_PREFIX):
continue
lock_path = Path(root) / name
lock = filelock.FileLock(lock_path, timeout=0)
try:
lock.acquire()
except filelock._error.Timeout:
self._log_debug(f'Lock file is active, skipping: {lock_path}')
continue
try:
lock_path.unlink(missing_ok=True)
removed += 1
self._log_info(f'Removed stale lock file: {lock_path}')
finally:
lock.release()
Comment on lines +489 to +500
⚠️ Potential issue | 🔴 Critical

Release the stale lock before deleting its lock file.

This new cleanup path reverses the release/unlink order already used elsewhere in `FileCache`. That sequence is there for a reason: unlinking an acquired lock can fail on Windows and can split the lock on POSIX.
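
For reference, a minimal sketch of the failure mode (illustrative only; the `PermissionError` occurs on Windows, where filelock keeps an open handle on the lock file, while on POSIX the unlink succeeds but the next acquirer locks a fresh inode, splitting the lock):

```python
import filelock
from pathlib import Path

lock_path = Path('demo.lock')  # hypothetical path
lock = filelock.FileLock(lock_path)
lock.acquire()
try:
    # Windows: raises PermissionError because the acquired lock still holds
    # an open handle on the file. Release first, then unlink, to avoid this.
    lock_path.unlink()
finally:
    lock.release()
```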

🔒 Proposed fix

```diff
-                try:
-                    lock_path.unlink(missing_ok=True)
-                    removed += 1
-                    self._log_info(f'Removed stale lock file: {lock_path}')
-                finally:
-                    lock.release()
+                lock.release()
+                lock_path.unlink(missing_ok=True)
+                removed += 1
+                self._log_info(f'Removed stale lock file: {lock_path}')
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
                lock = filelock.FileLock(lock_path, timeout=0)
                try:
                    lock.acquire()
                except filelock._error.Timeout:
                    self._log_debug(f'Lock file is active, skipping: {lock_path}')
                    continue
                lock.release()
                lock_path.unlink(missing_ok=True)
                removed += 1
                self._log_info(f'Removed stale lock file: {lock_path}')
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@filecache/file_cache.py` around lines 489 - 500, The cleanup code acquires a
FileLock then unlinks the lock file before releasing it (lock_path.unlink then
lock.release), which can fail on Windows and split locks on POSIX; change the
order so the lock is released prior to deleting the lock file: after
successfully acquiring the lock (lock.acquire()) call lock.release() first, then
unlink the path (lock_path.unlink(...)), and keep the removed increment and
logging (_log_info/_log_debug) intact; ensure this adjustment is applied in the
stale-lock cleanup block that uses filelock.FileLock, lock.acquire,
lock.release, and lock_path.unlink.


return removed

@staticmethod
def _split_url(url: str) -> tuple[str, str, str]:
url = url.replace('\\', '/')
Expand Down Expand Up @@ -1646,6 +1709,9 @@ def _retrieve_multi_locked(self,
# sit here and wait for all of the missing files to magically show up, or for
# us to time out. If the lock file disappears but the destination file isn't
# present, that means the other process failed in its download.
# We also check for stale locks left by crashed processes: if we can acquire
# a lock that we previously couldn't, the holding process must have exited
# without cleaning up.
timed_out = False
while wait_to_appear:
new_wait_to_appear = []
@@ -1659,7 +1725,25 @@
f'Another process failed to download {url}')
self._log_debug(f' Download elsewhere failed: {url}')
continue
new_wait_to_appear.append((idx, url, local_path, lock_path))
# Check for stale lock: try acquiring it to see if the holding
# process has exited without cleaning up
stale_lock = filelock.FileLock(lock_path, timeout=0)
try:
stale_lock.acquire()
except filelock._error.Timeout:
# Lock is still actively held by another process
new_wait_to_appear.append((idx, url, local_path, lock_path))
continue
# We acquired the lock, meaning the previous holder exited.
# The file wasn't downloaded, so this is a failed download
# from a crashed process.
stale_lock.release()
lock_path.unlink(missing_ok=True)
self._log_info(f' Cleaned up stale lock for: {url}')
func_ret[idx] = FileNotFoundError(
f'Another process failed to download {url} '
f'(stale lock cleaned up)')
files_not_exist.append(url)

if not new_wait_to_appear:
break
9 changes: 8 additions & 1 deletion filecache/file_cache_path.py
@@ -539,7 +539,14 @@ def splitpath(self, search_dir: str) -> tuple[FCPath, ...]:
for i, j in zip(indices[:-1], indices[1:]))

def __repr__(self) -> str:
return f'FCPath({self._path!r})'
parts = [repr(self._path)]
if self._anonymous is not None:
parts.append(f'anonymous={self._anonymous!r}')
if self._lock_timeout is not None:
parts.append(f'lock_timeout={self._lock_timeout!r}')
if self._nthreads is not None:
parts.append(f'nthreads={self._nthreads!r}')
return f'FCPath({", ".join(parts)})'

def __eq__(self, other: object) -> bool:
if not isinstance(other, FCPath):
17 changes: 17 additions & 0 deletions filecache/file_cache_source.py
@@ -72,6 +72,15 @@ def __init__(self,
# The _cache_subdir attribute is only used by the FileCache class
self._cache_subdir = ''

def __str__(self) -> str:
return self._src_prefix

def __repr__(self) -> str:
parts = [f'{self._scheme!r}', f'{self._remote!r}']
if self._anonymous:
parts.append('anonymous=True')
return f'{type(self).__name__}({", ".join(parts)})'

@classmethod
@abstractmethod
def schemes(self) -> tuple[str, ...]:
Expand Down Expand Up @@ -1787,6 +1796,14 @@ def __init__(self,
self._storage_dir.mkdir(parents=True, exist_ok=True)
self._cache_subdir = self._src_prefix.replace('fake://', 'fake_')

def __repr__(self) -> str:
parts = [f'{self._scheme!r}', f'{self._remote!r}']
if self._anonymous:
parts.append('anonymous=True')
if self._storage_base != self._DEFAULT_STORAGE_DIR:
parts.append(f'storage_dir={str(self._storage_base)!r}')
return f'FileCacheSourceFake({", ".join(parts)})'

@classmethod
def schemes(cls) -> tuple[str, ...]:
"""The URL schemes supported by this class."""
76 changes: 76 additions & 0 deletions tests/test_file_cache.py
@@ -1070,6 +1070,82 @@ def test_cleanup_locking_bad():
fp.write('A')


def test_clean_up_stale_locks():
with FileCache(cache_name=None, mp_safe=True) as fc:
local_path = fc.get_local_path('gs://test/test.txt')
lock_path = local_path.parent / f'{fc._LOCK_PREFIX}{local_path.name}'
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock_path.write_text('stale')
assert lock_path.is_file()
removed = fc.clean_up_stale_locks()
assert removed == 1
assert not lock_path.is_file()


def test_clean_up_stale_locks_active():
with FileCache(cache_name=None, mp_safe=True) as fc:
local_path = fc.get_local_path('gs://test/test.txt')
lock_path = local_path.parent / f'{fc._LOCK_PREFIX}{local_path.name}'
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock = filelock.FileLock(lock_path, timeout=0)
lock.acquire()
try:
removed = fc.clean_up_stale_locks()
assert removed == 0
assert lock_path.is_file()
finally:
lock.release()
lock_path.unlink(missing_ok=True)


def test_clean_up_stale_locks_empty():
with FileCache(cache_name=None) as fc:
removed = fc.clean_up_stale_locks()
assert removed == 0


def test_clean_up_stale_locks_multiple():
with FileCache(cache_name=None, mp_safe=True) as fc:
for i in range(3):
local_path = fc.get_local_path(f'gs://test/test{i}.txt')
lock_path = local_path.parent / f'{fc._LOCK_PREFIX}{local_path.name}'
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock_path.write_text('stale')
removed = fc.clean_up_stale_locks()
assert removed == 3

Comment on lines +1073 to +1116
🧹 Nitpick | 🔵 Trivial

Add a regression test for the wait-loop stale-lock recovery path.

These tests only call `clean_up_stale_locks()` directly. The other half of the change lives in `_retrieve_multi_locked()`, where a waiter should detect a lock that becomes stale and fail immediately instead of sleeping until timeout; that behavior is still uncovered.
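
A rough sketch of what such a test could look like (hedged throughout: it assumes `retrieve()` takes a list of URLs to reach `_retrieve_multi_locked()`, that a second `FileLock` instance in the same process contends with the first, and that releasing a lock without unlinking its file adequately simulates a crashed downloader):

```python
import threading
import time

import filelock
import pytest

from filecache import FileCache


def test_retrieve_multi_stale_lock_recovery():
    with FileCache(cache_name=None, mp_safe=True) as fc:
        url = 'gs://test/missing.txt'  # hypothetical URL; download will fail
        local_path = fc.get_local_path(url)
        lock_path = local_path.parent / f'{fc._LOCK_PREFIX}{local_path.name}'
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        lock = filelock.FileLock(lock_path)
        lock.acquire()  # pose as another process mid-download

        def simulated_crash():
            time.sleep(1)
            lock.release()  # exit without downloading or removing the lock file

        t = threading.Thread(target=simulated_crash)
        t.start()
        try:
            with pytest.raises(FileNotFoundError):
                fc.retrieve([url])  # list input to exercise the wait loop
        finally:
            t.join()
        assert not lock_path.is_file()  # the stale lock was cleaned up
```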

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_file_cache.py` around lines 1073 - 1116, Add a regression test
that exercises the wait-loop stale-lock recovery path in FileCache by simulating
a waiter in _retrieve_multi_locked: create a lock file at the _LOCK_PREFIX path
for a target returned by get_local_path, spawn or simulate a concurrent waiter
that calls the code path that would block in _retrieve_multi_locked (e.g., by
invoking the method that uses the wait loop or by mimicking the waiter
behavior), then make the lock file stale (modify its mtime or write invalid
content) while the waiter is waiting and assert the waiter exits/fails
immediately (no sleep until timeout) and that clean_up_stale_locks() removes the
stale lock; reference FileCache, _retrieve_multi_locked, clean_up_stale_locks,
get_local_path and _LOCK_PREFIX to locate the relevant code paths.


def test_filecache_str_repr():
with FileCache(cache_name=None) as fc:
assert str(fc) == str(fc.cache_dir)
r = repr(fc)
assert r.startswith('FileCache(None')
assert 'delete_on_exit=True' in r or r.startswith('FileCache(None)')

with FileCache('test-repr-cache', delete_on_exit=True) as fc:
assert str(fc) == str(fc.cache_dir)
r = repr(fc)
assert r.startswith("FileCache('test-repr-cache'")
assert 'mp_safe=True' in r

with FileCache(cache_name=None, time_sensitive=True,
anonymous=True, lock_timeout=30,
nthreads=4) as fc:
r = repr(fc)
assert 'time_sensitive=True' in r
assert 'anonymous=True' in r
assert 'lock_timeout=30' in r
assert 'nthreads=4' in r


def test_filecache_cache_name():
with FileCache(cache_name=None) as fc:
assert fc.cache_name is None

with FileCache('test-name-cache', delete_on_exit=True) as fc:
assert fc.cache_name == 'test-name-cache'


def test_bad_cache_dir():
with pytest.raises(ValueError):
with FileCache(cache_name=None) as fc:
9 changes: 9 additions & 0 deletions tests/test_file_cache_path.py
@@ -180,6 +180,15 @@ def test__repr():
assert repr(FCPath(Path('a/b'))) == "FCPath('a/b')"
assert repr(FCPath(r'\a\b')) == "FCPath('/a/b')"

r = repr(FCPath('a/b', anonymous=True))
assert r == "FCPath('a/b', anonymous=True)"

r = repr(FCPath('a/b', lock_timeout=30))
assert r == "FCPath('a/b', lock_timeout=30)"

r = repr(FCPath('a/b', anonymous=False, lock_timeout=10, nthreads=4))
assert r == "FCPath('a/b', anonymous=False, lock_timeout=10, nthreads=4)"


def test_comparison():
p1a = FCPath('/a/b/c1.py')
33 changes: 33 additions & 0 deletions tests/test_file_cache_source.py
@@ -18,6 +18,39 @@
from .test_file_cache import EXPECTED_DIR, EXPECTED_FILENAMES


def test_source_str_repr():
sl = FileCacheSourceFile('file', '')
assert str(sl) == 'file://'
assert repr(sl) == "FileCacheSourceFile('file', '')"

sh = FileCacheSourceHTTP('https', 'example.com')
assert str(sh) == 'https://example.com'
assert repr(sh) == "FileCacheSourceHTTP('https', 'example.com')"

sg = FileCacheSourceGS('gs', 'my-bucket', anonymous=True)
assert str(sg) == 'gs://my-bucket'
assert repr(sg) == "FileCacheSourceGS('gs', 'my-bucket', anonymous=True)"

ss = FileCacheSourceS3('s3', 'my-bucket')
assert str(ss) == 's3://my-bucket'
assert repr(ss) == "FileCacheSourceS3('s3', 'my-bucket')"

FileCacheSourceFake.delete_default_storage_dir()
try:
sf = FileCacheSourceFake('fake', 'test-bucket')
assert str(sf) == 'fake://test-bucket'
assert repr(sf) == "FileCacheSourceFake('fake', 'test-bucket')"
finally:
FileCacheSourceFake.delete_default_storage_dir()


def test_source_fake_repr_custom_storage(tmp_path: Path):
sf = FileCacheSourceFake('fake', 'bucket', storage_dir=tmp_path)
r = repr(sf)
assert r.startswith("FileCacheSourceFake('fake', 'bucket'")
assert 'storage_dir=' in r


def test_source_bad():
with pytest.raises(ValueError):
FileCacheSourceFile('fred', 'hi')