diff --git a/filecache/__init__.py b/filecache/__init__.py index 5205899..041fd83 100644 --- a/filecache/__init__.py +++ b/filecache/__init__.py @@ -174,7 +174,6 @@ FileCacheSourceGS, FileCacheSourceS3, FileCacheSourceFake) - __all__ = ['get_global_logger', 'register_filecachesource', 'set_easy_logger', diff --git a/filecache/file_cache.py b/filecache/file_cache.py index a98ebc9..b33f5f1 100644 --- a/filecache/file_cache.py +++ b/filecache/file_cache.py @@ -264,6 +264,7 @@ def __init__(self, raise TypeError(f'cache_name argument {cache_name} is of improper type') is_shared = (cache_name is not None) + self._cache_name = cache_name self._delete_on_exit = (delete_on_exit if delete_on_exit is not None else not is_shared) @@ -418,6 +419,15 @@ def logger(self) -> Logger | None: return _GLOBAL_LOGGER return cast(Logger, self._logger) + def __repr__(self) -> str: + return (f'FileCache({self._cache_name!r}, ' + f'anonymous={self._anonymous!r}, ' + f'lock_timeout={self._lock_timeout!r}, ' + f'nthreads={self._nthreads!r})') + + def __str__(self) -> str: + return str(self._cache_dir) + def _log_debug(self, msg: str) -> None: logger = self.logger if logger: @@ -1600,7 +1610,7 @@ def _retrieve_multi_locked(self, except filelock._error.Timeout: self._log_debug(f' Failed to lock: {sub_path}') wait_to_appear.append((idx, f'{source_pfx}{sub_path}', local_path, - lock_path)) + lock_path, source, sub_path)) continue lock_list.append((lock_path, lock)) source_idxes.append(idx) @@ -1644,12 +1654,29 @@ def _retrieve_multi_locked(self, # If wait_to_appear is not empty, then we failed to acquire at least one lock, # which means that another process was downloading the file. So now we just # sit here and wait for all of the missing files to magically show up, or for - # us to time out. If the lock file disappears but the destination file isn't - # present, that means the other process failed in its download. + # us to time out. 
+ # + # In each iteration we also check for stale locks: a stale lock exists when the + # lock file is present but the process that created it has died (the OS released + # the advisory flock, but did not delete the file). We detect a stale lock by + # attempting a non-blocking acquire -- if it succeeds, no live process holds the + # lock, so we steal it and initiate the download ourselves. + # + # Race-condition guarantee: multiple waiting processes may all notice the same + # stale lock and all attempt `lock.acquire(timeout=0)` simultaneously. Because + # the underlying flock(2) call is atomic, exactly one process wins the race; the + # others receive a Timeout and remain in the wait list. The winner downloads the + # file atomically (write to temp path + rename), so the losers will find the + # completed file on their next poll iteration. + # + # If the lock file disappears without the destination file appearing, that means + # the other process failed (or cleaned up after itself on error). timed_out = False while wait_to_appear: new_wait_to_appear = [] - for idx, url, local_path, lock_path in wait_to_appear: + stale_lock_items = [] # Items whose locks we have stolen + + for idx, url, local_path, lock_path, source, sub_path in wait_to_appear: if local_path.is_file(): func_ret[idx] = local_path self._log_debug(f' Downloaded elsewhere: {url}') @@ -1659,18 +1686,67 @@ def _retrieve_multi_locked(self, f'Another process failed to download {url}') self._log_debug(f' Download elsewhere failed: {url}') continue - new_wait_to_appear.append((idx, url, local_path, lock_path)) + # Lock file exists and destination is absent. Check whether the lock is + # stale (flock released by a crashed process but file not cleaned up). + stale_lock = filelock.FileLock(lock_path, timeout=0) + try: + stale_lock.acquire() + # Acquired with timeout=0 → no live process holds this lock. 
+ stale_lock_items.append( + (idx, url, local_path, lock_path, stale_lock, source, sub_path)) + self._log_debug(f' Stale lock detected for {url}, will re-download') + except filelock._error.Timeout: + # Lock is actively held -- another process is still downloading. + new_wait_to_appear.append( + (idx, url, local_path, lock_path, source, sub_path)) + + # Download all files whose stale locks we have just acquired, grouped by + # source so that retrieve_multi can fetch them in parallel. + stale_by_source: dict[str, list[tuple[ + int, str, Path, Path, filelock.FileLock, FileCacheSource, str]]] = {} + for item in stale_lock_items: + pfx = item[5]._src_prefix_ + if pfx not in stale_by_source: + stale_by_source[pfx] = [] + stale_by_source[pfx].append(item) + + for items in stale_by_source.values(): + s_idxes = [it[0] for it in items] + s_urls = [it[1] for it in items] + s_local_paths = [it[2] for it in items] + s_lock_paths = [it[3] for it in items] + s_locks = [it[4] for it in items] + s_source = items[0][5] + s_sub_paths = [it[6] for it in items] + + rets = s_source.retrieve_multi(s_sub_paths, s_local_paths, + preserve_mtime=False, nthreads=nthreads) + for idx, url, ret in zip(s_idxes, s_urls, rets): + func_ret[idx] = ret + if isinstance(ret, Exception): + files_not_exist.append(url) + self._log_debug( + f' Stale lock recovery download failed: ' + f'{url}: {ret!r}') + else: + self._download_counter += 1 + self._log_debug( + f' Re-downloaded after stale lock recovery: {url}') + + for s_lock, s_lock_path in zip(s_locks, s_lock_paths): + s_lock.release() + s_lock_path.unlink(missing_ok=True) if not new_wait_to_appear: break wait_to_appear = new_wait_to_appear - if time.time() - start_time > lock_timeout: + if lock_timeout >= 0 and time.time() - start_time > lock_timeout: exc = TimeoutError( 'Timeout while waiting for another process to finish downloading') self._log_debug( ' Timeout while waiting for another process to finish downloading:') - for idx, url, local_path, 
lock_path in wait_to_appear: + for idx, url, local_path, lock_path, source, sub_path in wait_to_appear: func_ret[idx] = exc self._log_debug(f' {url}') if exception_on_fail: @@ -1923,14 +1999,14 @@ def _upload_multi(self, pass if exception_on_fail: - exc_str = '' - if files_not_exist: - exc_str += f"File(s) do not exist: {', '.join(files_not_exist)}" - if files_failed: - if exc_str: - exc_str += ' AND ' - exc_str += f"Failed to upload file(s): {', '.join(files_failed)}" - if exc_str: + if files_not_exist and not files_failed: + raise FileNotFoundError( + f"File(s) do not exist: {', '.join(files_not_exist)}") + elif files_failed: + exc_str = f"Failed to upload file(s): {', '.join(files_failed)}" + if files_not_exist: + exc_str = (f"File(s) do not exist: {', '.join(files_not_exist)}" + f' AND {exc_str}') raise FileNotFoundError(exc_str) return cast(list[Union[Path, Exception]], func_ret) diff --git a/filecache/file_cache_path.py b/filecache/file_cache_path.py index 3ce7a1f..3c979c2 100644 --- a/filecache/file_cache_path.py +++ b/filecache/file_cache_path.py @@ -539,7 +539,20 @@ def splitpath(self, search_dir: str) -> tuple[FCPath, ...]: for i, j in zip(indices[:-1], indices[1:])) def __repr__(self) -> str: - return f'FCPath({self._path!r})' + parts = [repr(self._path)] + if self._filecache is not None: + parts.append(f'filecache={self._filecache!r}') + if self._anonymous is not None: + parts.append(f'anonymous={self._anonymous!r}') + if self._lock_timeout is not None: + parts.append(f'lock_timeout={self._lock_timeout!r}') + if self._nthreads is not None: + parts.append(f'nthreads={self._nthreads!r}') + if self._url_to_url is not None: + parts.append(f'url_to_url={self._url_to_url!r}') + if self._url_to_path is not None: + parts.append(f'url_to_path={self._url_to_path!r}') + return f'FCPath({", ".join(parts)})' def __eq__(self, other: object) -> bool: if not isinstance(other, FCPath): diff --git a/filecache/file_cache_source.py b/filecache/file_cache_source.py index 
6185e4d..2ad85e3 100644 --- a/filecache/file_cache_source.py +++ b/filecache/file_cache_source.py @@ -72,6 +72,13 @@ def __init__(self, # The _cache_subdir attribute is only used by the FileCache class self._cache_subdir = '' + def __repr__(self) -> str: + return (f'{type(self).__name__}({self._scheme!r}, {self._remote!r}, ' + f'anonymous={self._anonymous!r})') + + def __str__(self) -> str: + return self._src_prefix + @classmethod @abstractmethod def schemes(self) -> tuple[str, ...]: diff --git a/tests/test_file_cache.py b/tests/test_file_cache.py index b1ea126..9cbb73e 100644 --- a/tests/test_file_cache.py +++ b/tests/test_file_cache.py @@ -10,6 +10,7 @@ from pathlib import Path import platform import tempfile +import threading import time import uuid @@ -1055,6 +1056,197 @@ def test_locking_multi_pfx_2(): assert not fc.cache_dir.exists() +@pytest.mark.skipif(platform.system() == 'Windows', + reason='Uses POSIX fcntl for stale-lock simulation') +def test_stale_lock_single(): + """A stale lock (file exists, no flock held) must not block a single-file retrieve. + + We simulate a stale lock by acquiring the flock with raw fcntl, then releasing + only the flock (leaving the lock file on disk) before the retrieve. On the + next call, filelock should acquire the lock immediately because no process + holds it. + """ + import fcntl + + with FileCache(delete_on_exit=True) as fc: + filename = EXPECTED_FILENAMES[0] + local_path = (fc.cache_dir / + (HTTP_TEST_ROOT.replace('https://', 'http_') + '/' + filename)) + lock_path = fc._lock_path(local_path) + lock_path.parent.mkdir(parents=True, exist_ok=True) + + # Create and hold the lock via raw fcntl so we can release the flock without + # deleting the file (simulating a process that crashed). + fd = os.open(str(lock_path), os.O_RDWR | os.O_CREAT, 0o644) + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + + # Release the flock but keep the file -- this is the "stale lock" state. 
+ fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + + # The lock file exists, but nobody holds the flock. The retrieve must + # succeed rather than raising a TimeoutError. + path = fc.retrieve(f'{HTTP_TEST_ROOT}/{filename}', lock_timeout=0) + assert isinstance(path, Path) + assert path.is_file() + _compare_to_expected_path(path, filename) + assert fc.download_counter == 1 + assert fc.upload_counter == 0 + + +@pytest.mark.skipif(platform.system() == 'Windows', + reason='Uses POSIX fcntl for stale-lock simulation') +def test_stale_lock_multi(): + """A stale lock must not permanently block multi-file retrieval. + + We hold the flock for the first file from a background thread, trigger the + multi-file download (which puts file[0] into wait_to_appear), then simulate + a crash by releasing only the flock (but not deleting the lock file) while + the wait loop is running. The stale-lock detection code must steal the lock + and complete the download without timing out. + + Race-condition guarantee: the background thread uses a threading.Event to + synchronise so that the flock is released only after the main thread's + retrieve call has entered the wait_to_appear loop. Because only one process + can win the flock race, the main thread is guaranteed to be the sole + downloader of file[0] after stealing the stale lock. 
+ """ + import fcntl + + with FileCache(delete_on_exit=True) as fc: + filename0 = EXPECTED_FILENAMES[0] + local_path0 = (fc.cache_dir / + (HTTP_TEST_ROOT.replace('https://', 'http_') + '/' + filename0)) + lock_path0 = fc._lock_path(local_path0) + lock_path0.parent.mkdir(parents=True, exist_ok=True) + + # Phase-control events + flock_held = threading.Event() # signalled once the raw flock is held + flock_release = threading.Event() # signalled when the "crash" should happen + + def holder_thread() -> None: + """Hold the flock until told to "crash", then release without cleanup.""" + fd = os.open(str(lock_path0), os.O_RDWR | os.O_CREAT, 0o644) + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + flock_held.set() + flock_release.wait() + # Simulate crash: release flock, leave file on disk. + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + + t = threading.Thread(target=holder_thread, daemon=True) + t.start() + assert flock_held.wait(timeout=5), 'holder thread never acquired flock' + + results: list[list[Path]] = [] + exc_holder: list[Exception] = [] + + def do_retrieve() -> None: + try: + filenames = [f'{HTTP_TEST_ROOT}/{f}' for f in EXPECTED_FILENAMES] + ret = fc.retrieve(filenames, lock_timeout=15) + results.append(ret) # type: ignore[arg-type] + except Exception as e: + exc_holder.append(e) + + retrieve_thread = threading.Thread(target=do_retrieve, daemon=True) + retrieve_thread.start() + + # Wait until all non-blocked files have downloaded and retrieve_thread is + # still alive — at that point it must be blocked in the wait_to_appear loop. + deadline = time.monotonic() + 15 + while time.monotonic() < deadline: + if (fc.download_counter == len(EXPECTED_FILENAMES) - 1 and + retrieve_thread.is_alive()): + break + time.sleep(0.01) + else: + pytest.fail('retrieve never reached the blocked/stale-lock state') + + # Signal the holder to "crash" -- releases flock, keeps the lock file. 
+ flock_release.set() + t.join(timeout=5) + assert not t.is_alive(), 'holder thread did not finish' + + retrieve_thread.join(timeout=20) + assert not retrieve_thread.is_alive(), 'retrieve thread timed out (stale lock not recovered)' + + assert not exc_holder, f'retrieve raised: {exc_holder[0]}' + assert results, 'retrieve returned no result' + for r in results[0]: + assert isinstance(r, Path), f'Expected Path, got {r!r}' + assert fc.download_counter == len(EXPECTED_FILENAMES) + assert fc.upload_counter == 0 + + +@pytest.mark.skipif(platform.system() == 'Windows', + reason='Uses POSIX fcntl for stale-lock simulation') +def test_stale_lock_multi_pfx(): + """Same as test_stale_lock_multi but exercised through the FCPath interface.""" + import fcntl + + with FileCache(delete_on_exit=True) as fc: + pfx = fc.new_path(HTTP_TEST_ROOT, lock_timeout=15) + filename0 = EXPECTED_FILENAMES[0] + local_path0 = (fc.cache_dir / + (HTTP_TEST_ROOT.replace('https://', 'http_') + '/' + filename0)) + lock_path0 = fc._lock_path(local_path0) + lock_path0.parent.mkdir(parents=True, exist_ok=True) + + flock_held = threading.Event() + flock_release = threading.Event() + + def holder_thread() -> None: + fd = os.open(str(lock_path0), os.O_RDWR | os.O_CREAT, 0o644) + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + flock_held.set() + flock_release.wait() + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + + t = threading.Thread(target=holder_thread, daemon=True) + t.start() + assert flock_held.wait(timeout=5), 'holder thread never acquired flock' + + results: list[list[Path]] = [] + exc_holder: list[Exception] = [] + + def do_retrieve() -> None: + try: + ret = pfx.retrieve(EXPECTED_FILENAMES) + results.append(ret) # type: ignore[arg-type] + except Exception as e: + exc_holder.append(e) + + retrieve_thread = threading.Thread(target=do_retrieve, daemon=True) + retrieve_thread.start() + + # Wait until all non-blocked files have downloaded and retrieve_thread is + # still alive — at that point it must 
be blocked in the wait_to_appear loop. + deadline = time.monotonic() + 15 + while time.monotonic() < deadline: + if (fc.download_counter == len(EXPECTED_FILENAMES) - 1 and + retrieve_thread.is_alive()): + break + time.sleep(0.01) + else: + pytest.fail('retrieve never reached the blocked/stale-lock state') + + flock_release.set() + t.join(timeout=5) + assert not t.is_alive() + + retrieve_thread.join(timeout=20) + assert not retrieve_thread.is_alive(), 'retrieve thread timed out (stale lock not recovered)' + + assert not exc_holder, f'retrieve raised: {exc_holder[0]}' + assert results + for r in results[0]: + assert isinstance(r, Path) + assert fc.download_counter == len(EXPECTED_FILENAMES) + assert fc.upload_counter == 0 + + def test_cleanup_locking_bad(): logger = MyLogger() with FileCache(cache_name=None, logger=logger) as fc: @@ -2948,3 +3140,22 @@ def test_modification_time_caching_multi(time_sensitive, cache_metadata, mp_safe assert lp[0].stat().st_mtime == mtime_lp_orig[0] assert lp[1].stat().st_mtime == mtime_lp_orig[1] assert lp[2].stat().st_mtime == mtime_lp_orig[2] + + +def test_filecache_repr_str(): + with FileCache(cache_name=None) as fc: + r = repr(fc) + assert r.startswith('FileCache(None,') + assert 'anonymous=False' in r + assert 'lock_timeout=60' in r + assert 'nthreads=8' in r + assert str(fc) == str(fc.cache_dir) + + # Named cache with non-default options + with FileCache('repr-test', anonymous=True, lock_timeout=30, nthreads=4, + delete_on_exit=True) as fc: + r = repr(fc) + assert r.startswith("FileCache('repr-test',") + assert 'anonymous=True' in r + assert 'lock_timeout=30' in r + assert 'nthreads=4' in r diff --git a/tests/test_file_cache_path.py b/tests/test_file_cache_path.py index 893e767..9018780 100644 --- a/tests/test_file_cache_path.py +++ b/tests/test_file_cache_path.py @@ -1308,3 +1308,37 @@ def test_splitpath(): FCPath('d')) assert FCPath('gs://bucket/a/c/b/b1/c/d/d1').splitpath('c') == \ (FCPath('gs://bucket/a'), FCPath('b/b1'), 
FCPath('d/d1')) + + +def test_fcpath_repr_str(): + # Default FCPath — only the path + p = FCPath('gs://bucket/path/to/file') + assert repr(p) == "FCPath('gs://bucket/path/to/file')" + assert str(p) == 'gs://bucket/path/to/file' + + # FCPath with extra options — they appear in repr + p2 = FCPath('s3://bucket/path', anonymous=True, lock_timeout=10, nthreads=2) + r = repr(p2) + assert r.startswith('FCPath(') + assert 'anonymous=True' in r + assert 'lock_timeout=10' in r + assert 'nthreads=2' in r + + # filecache kwarg appears in repr when set + with FileCache(cache_name=None, delete_on_exit=True) as fc: + p3 = FCPath('gs://bucket/x', filecache=fc) + r3 = repr(p3) + assert 'filecache=' in r3 + + # url_to_url and url_to_path appear in repr when set + def _identity_url(scheme, remote, path): + return None + + def _identity_path(scheme, remote, path, cache_dir, cache_subdir): + return None + + p4 = FCPath('gs://bucket/y', + url_to_url=_identity_url, url_to_path=_identity_path) + r4 = repr(p4) + assert 'url_to_url=' in r4 + assert 'url_to_path=' in r4 diff --git a/tests/test_file_cache_source.py b/tests/test_file_cache_source.py index 767ddde..301be3e 100644 --- a/tests/test_file_cache_source.py +++ b/tests/test_file_cache_source.py @@ -404,3 +404,32 @@ def mock_rename2(src, dst): pathlib.Path.rename = original_rename finally: FileCacheSourceFake.delete_default_storage_dir() + + +def test_filecachesource_repr_str(): + src_gs = FileCacheSourceGS('gs', 'rms-filecache-tests', anonymous=True) + r = repr(src_gs) + assert r == "FileCacheSourceGS('gs', 'rms-filecache-tests', anonymous=True)" + assert str(src_gs) == 'gs://rms-filecache-tests' + + src_s3 = FileCacheSourceS3('s3', 'rms-filecache-tests', anonymous=True) + r = repr(src_s3) + assert r == "FileCacheSourceS3('s3', 'rms-filecache-tests', anonymous=True)" + assert str(src_s3) == 's3://rms-filecache-tests' + + src_http = FileCacheSourceHTTP('https', 'storage.googleapis.com', + anonymous=False) + r = repr(src_http) + assert 
r == ("FileCacheSourceHTTP('https', 'storage.googleapis.com', " + "anonymous=False)") + assert str(src_http) == 'https://storage.googleapis.com' + + src_file = FileCacheSourceFile('file', '', anonymous=False) + r = repr(src_file) + assert r == "FileCacheSourceFile('file', '', anonymous=False)" + assert str(src_file) == 'file://' + + src_fake = FileCacheSourceFake('fake', 'fake-bucket', anonymous=False) + r = repr(src_fake) + assert r == "FileCacheSourceFake('fake', 'fake-bucket', anonymous=False)" + assert str(src_fake) == 'fake://fake-bucket'