From ba2d4dabbdad658ec4c044910b32935bb94d9d63 Mon Sep 17 00:00:00 2001 From: Robert French Date: Tue, 24 Mar 2026 14:46:13 -0700 Subject: [PATCH] Add __str__/__repr__ to all public classes and fix stale lock cleanup Fix #21: Add __str__ and __repr__ methods to FileCache, FileCacheSource (and all subclasses), and enhance FCPath's __repr__ to include non-default parameters (anonymous, lock_timeout, nthreads). Also add cache_name property to FileCache. Fix #56: Add clean_up_stale_locks() method to FileCache that walks the cache directory and removes orphaned lock files left by crashed processes. Also improve the _retrieve_multi_locked wait loop to automatically detect and clean up stale locks by attempting to acquire them, rather than waiting until timeout. Made-with: Cursor --- README.md | 18 +++++++ filecache/file_cache.py | 86 ++++++++++++++++++++++++++++++++- filecache/file_cache_path.py | 9 +++- filecache/file_cache_source.py | 17 +++++++ tests/test_file_cache.py | 76 +++++++++++++++++++++++++++++ tests/test_file_cache_path.py | 9 ++++ tests/test_file_cache_source.py | 33 +++++++++++++ 7 files changed, 246 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 631dd29..5a9a297 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,24 @@ src2.retrieve('subdir1/subdir2a/binary1.bin', 'local_file2.bin') Details of each class are available in the [module documentation](https://rms-filecache.readthedocs.io/en/latest/module.html). +## Stale Lock Cleanup + +When using shared caches with multiprocessor-safe locking (`mp_safe=True`), a process that +crashes while downloading a file may leave behind a lock file. While the OS-level lock is +released on process exit, the lock file remains on disk and can cause other processes to +wait or time out. 
`FileCache` provides a method to clean up these stale locks: + +```python +fc = FileCache('shared-cache') +removed = fc.clean_up_stale_locks() +print(f'Cleaned up {removed} stale lock(s)') +``` + +The method walks the cache directory, identifies lock files that are not actively held by +any process, and removes them. Active locks held by running processes are left untouched. +Additionally, the multi-file retrieval path automatically detects and cleans up stale locks +from crashed processes during the download wait loop. + # Contributing Information on contributing to this package can be found in the diff --git a/filecache/file_cache.py b/filecache/file_cache.py index a98ebc9..0fb0bf2 100644 --- a/filecache/file_cache.py +++ b/filecache/file_cache.py @@ -263,6 +263,7 @@ def __init__(self, else: raise TypeError(f'cache_name argument {cache_name} is of improper type') + self._cache_name = cache_name is_shared = (cache_name is not None) self._delete_on_exit = (delete_on_exit if delete_on_exit is not None @@ -336,6 +337,12 @@ def _validate_nthreads(self, def registered_scheme_prefixes(self) -> tuple[str, ...]: return tuple([x + '://' for x in _SCHEME_CLASSES]) + @property + def cache_name(self) -> str | None: + """The name of this cache, or None if unnamed.""" + + return self._cache_name + @property def cache_dir(self) -> Path: """The top-level directory of the cache as a Path object.""" @@ -438,6 +445,62 @@ def _log_error(self, msg: str) -> None: if logger: logger.error(msg) # type: ignore + def __str__(self) -> str: + return str(self._cache_dir) + + def __repr__(self) -> str: + parts = [repr(self._cache_name)] + if self._delete_on_exit: + parts.append('delete_on_exit=True') + if self._time_sensitive: + parts.append('time_sensitive=True') + if self._is_mp_safe: + parts.append('mp_safe=True') + if self._anonymous: + parts.append('anonymous=True') + if self._lock_timeout != 60: + parts.append(f'lock_timeout={self._lock_timeout!r}') + if self._nthreads != 8: + 
parts.append(f'nthreads={self._nthreads!r}') + return f'FileCache({", ".join(parts)})' + + def clean_up_stale_locks(self) -> int: + """Remove stale lock files left behind by crashed processes. + + Walks the cache directory looking for lock files. For each one found, + attempts to acquire the lock with no timeout. If the lock can be acquired, + no other process holds it and the lock file is stale; it is removed. If the + lock cannot be acquired, another process is actively using it and it is left + alone. + + Returns: + The number of stale lock files removed. + """ + + removed = 0 + if not self._cache_dir.is_dir(): + return removed + + for root, _dirs, files in os.walk(self._cache_dir): + for name in files: + if not name.startswith(self._LOCK_PREFIX): + continue + lock_path = Path(root) / name + lock = filelock.FileLock(lock_path, timeout=0) + try: + lock.acquire() + except filelock.Timeout: + self._log_debug(f'Lock file is active, skipping: {lock_path}') + continue + try: + lock_path.unlink(missing_ok=True) + removed += 1 + self._log_info(f'Removed stale lock file: {lock_path}') + finally: + lock.release() + + return removed + @staticmethod def _split_url(url: str) -> tuple[str, str, str]: url = url.replace('\\', '/') @@ -1646,6 +1709,9 @@ def _retrieve_multi_locked(self, # sit here and wait for all of the missing files to magically show up, or for # us to time out. If the lock file disappears but the destination file isn't # present, that means the other process failed in its download. + # We also check for stale locks left by crashed processes: if we can acquire + # a lock that we previously couldn't, the holding process must have exited + # without cleaning up. 
timed_out = False while wait_to_appear: new_wait_to_appear = [] @@ -1659,7 +1725,25 @@ def _retrieve_multi_locked(self, f'Another process failed to download {url}') self._log_debug(f' Download elsewhere failed: {url}') continue - new_wait_to_appear.append((idx, url, local_path, lock_path)) + # Check for stale lock: try acquiring it to see if the holding + # process has exited without cleaning up + stale_lock = filelock.FileLock(lock_path, timeout=0) + try: + stale_lock.acquire() + except filelock.Timeout: + # Lock is still actively held by another process + new_wait_to_appear.append((idx, url, local_path, lock_path)) + continue + # We acquired the lock, meaning the previous holder exited. + # The file wasn't downloaded, so this is a failed download + # from a crashed process. + stale_lock.release() + lock_path.unlink(missing_ok=True) + self._log_info(f' Cleaned up stale lock for: {url}') + func_ret[idx] = FileNotFoundError( + f'Another process failed to download {url} ' + f'(stale lock cleaned up)') + files_not_exist.append(url) if not new_wait_to_appear: break diff --git a/filecache/file_cache_path.py b/filecache/file_cache_path.py index 3ce7a1f..19f44c5 100644 --- a/filecache/file_cache_path.py +++ b/filecache/file_cache_path.py @@ -539,7 +539,14 @@ def splitpath(self, search_dir: str) -> tuple[FCPath, ...]: for i, j in zip(indices[:-1], indices[1:])) def __repr__(self) -> str: - return f'FCPath({self._path!r})' + parts = [repr(self._path)] + if self._anonymous is not None: + parts.append(f'anonymous={self._anonymous!r}') + if self._lock_timeout is not None: + parts.append(f'lock_timeout={self._lock_timeout!r}') + if self._nthreads is not None: + parts.append(f'nthreads={self._nthreads!r}') + return f'FCPath({", ".join(parts)})' def __eq__(self, other: object) -> bool: if not isinstance(other, FCPath): diff --git a/filecache/file_cache_source.py b/filecache/file_cache_source.py index 6185e4d..be07042 100644 --- a/filecache/file_cache_source.py +++ 
b/filecache/file_cache_source.py @@ -72,6 +72,15 @@ def __init__(self, # The _cache_subdir attribute is only used by the FileCache class self._cache_subdir = '' + def __str__(self) -> str: + return self._src_prefix + + def __repr__(self) -> str: + parts = [f'{self._scheme!r}', f'{self._remote!r}'] + if self._anonymous: + parts.append('anonymous=True') + return f'{type(self).__name__}({", ".join(parts)})' + @classmethod @abstractmethod def schemes(self) -> tuple[str, ...]: @@ -1787,6 +1796,14 @@ def __init__(self, self._storage_dir.mkdir(parents=True, exist_ok=True) self._cache_subdir = self._src_prefix.replace('fake://', 'fake_') + def __repr__(self) -> str: + parts = [f'{self._scheme!r}', f'{self._remote!r}'] + if self._anonymous: + parts.append('anonymous=True') + if self._storage_base != self._DEFAULT_STORAGE_DIR: + parts.append(f'storage_dir={str(self._storage_base)!r}') + return f'FileCacheSourceFake({", ".join(parts)})' + @classmethod def schemes(cls) -> tuple[str, ...]: """The URL schemes supported by this class.""" diff --git a/tests/test_file_cache.py b/tests/test_file_cache.py index b1ea126..d54bf2b 100644 --- a/tests/test_file_cache.py +++ b/tests/test_file_cache.py @@ -1070,6 +1070,82 @@ def test_cleanup_locking_bad(): fp.write('A') +def test_clean_up_stale_locks(): + with FileCache(cache_name=None, mp_safe=True) as fc: + local_path = fc.get_local_path('gs://test/test.txt') + lock_path = local_path.parent / f'{fc._LOCK_PREFIX}{local_path.name}' + lock_path.parent.mkdir(parents=True, exist_ok=True) + lock_path.write_text('stale') + assert lock_path.is_file() + removed = fc.clean_up_stale_locks() + assert removed == 1 + assert not lock_path.is_file() + + +def test_clean_up_stale_locks_active(): + with FileCache(cache_name=None, mp_safe=True) as fc: + local_path = fc.get_local_path('gs://test/test.txt') + lock_path = local_path.parent / f'{fc._LOCK_PREFIX}{local_path.name}' + lock_path.parent.mkdir(parents=True, exist_ok=True) + lock = 
filelock.FileLock(lock_path, timeout=0) + lock.acquire() + try: + removed = fc.clean_up_stale_locks() + assert removed == 0 + assert lock_path.is_file() + finally: + lock.release() + lock_path.unlink(missing_ok=True) + + +def test_clean_up_stale_locks_empty(): + with FileCache(cache_name=None) as fc: + removed = fc.clean_up_stale_locks() + assert removed == 0 + + +def test_clean_up_stale_locks_multiple(): + with FileCache(cache_name=None, mp_safe=True) as fc: + for i in range(3): + local_path = fc.get_local_path(f'gs://test/test{i}.txt') + lock_path = local_path.parent / f'{fc._LOCK_PREFIX}{local_path.name}' + lock_path.parent.mkdir(parents=True, exist_ok=True) + lock_path.write_text('stale') + removed = fc.clean_up_stale_locks() + assert removed == 3 + + +def test_filecache_str_repr(): + with FileCache(cache_name=None) as fc: + assert str(fc) == str(fc.cache_dir) + r = repr(fc) + assert r.startswith('FileCache(None') + assert 'delete_on_exit=True' in r or r.startswith('FileCache(None)') + + with FileCache('test-repr-cache', delete_on_exit=True) as fc: + assert str(fc) == str(fc.cache_dir) + r = repr(fc) + assert r.startswith("FileCache('test-repr-cache'") + assert 'mp_safe=True' in r + + with FileCache(cache_name=None, time_sensitive=True, + anonymous=True, lock_timeout=30, + nthreads=4) as fc: + r = repr(fc) + assert 'time_sensitive=True' in r + assert 'anonymous=True' in r + assert 'lock_timeout=30' in r + assert 'nthreads=4' in r + + +def test_filecache_cache_name(): + with FileCache(cache_name=None) as fc: + assert fc.cache_name is None + + with FileCache('test-name-cache', delete_on_exit=True) as fc: + assert fc.cache_name == 'test-name-cache' + + def test_bad_cache_dir(): with pytest.raises(ValueError): with FileCache(cache_name=None) as fc: diff --git a/tests/test_file_cache_path.py b/tests/test_file_cache_path.py index 893e767..1562def 100644 --- a/tests/test_file_cache_path.py +++ b/tests/test_file_cache_path.py @@ -180,6 +180,15 @@ def test__repr(): 
assert repr(FCPath(Path('a/b'))) == "FCPath('a/b')" assert repr(FCPath(r'\a\b')) == "FCPath('/a/b')" + r = repr(FCPath('a/b', anonymous=True)) + assert r == "FCPath('a/b', anonymous=True)" + + r = repr(FCPath('a/b', lock_timeout=30)) + assert r == "FCPath('a/b', lock_timeout=30)" + + r = repr(FCPath('a/b', anonymous=False, lock_timeout=10, nthreads=4)) + assert r == "FCPath('a/b', anonymous=False, lock_timeout=10, nthreads=4)" + def test_comparison(): p1a = FCPath('/a/b/c1.py') diff --git a/tests/test_file_cache_source.py b/tests/test_file_cache_source.py index 767ddde..d8a96a2 100644 --- a/tests/test_file_cache_source.py +++ b/tests/test_file_cache_source.py @@ -18,6 +18,39 @@ from .test_file_cache import EXPECTED_DIR, EXPECTED_FILENAMES +def test_source_str_repr(): + sl = FileCacheSourceFile('file', '') + assert str(sl) == 'file://' + assert repr(sl) == "FileCacheSourceFile('file', '')" + + sh = FileCacheSourceHTTP('https', 'example.com') + assert str(sh) == 'https://example.com' + assert repr(sh) == "FileCacheSourceHTTP('https', 'example.com')" + + sg = FileCacheSourceGS('gs', 'my-bucket', anonymous=True) + assert str(sg) == 'gs://my-bucket' + assert repr(sg) == "FileCacheSourceGS('gs', 'my-bucket', anonymous=True)" + + ss = FileCacheSourceS3('s3', 'my-bucket') + assert str(ss) == 's3://my-bucket' + assert repr(ss) == "FileCacheSourceS3('s3', 'my-bucket')" + + FileCacheSourceFake.delete_default_storage_dir() + try: + sf = FileCacheSourceFake('fake', 'test-bucket') + assert str(sf) == 'fake://test-bucket' + assert repr(sf) == "FileCacheSourceFake('fake', 'test-bucket')" + finally: + FileCacheSourceFake.delete_default_storage_dir() + + +def test_source_fake_repr_custom_storage(tmp_path: Path): + sf = FileCacheSourceFake('fake', 'bucket', storage_dir=tmp_path) + r = repr(sf) + assert r.startswith("FileCacheSourceFake('fake', 'bucket'") + assert 'storage_dir=' in r + + def test_source_bad(): with pytest.raises(ValueError): FileCacheSourceFile('fred', 'hi')