From 4f0747c6f7e8cc80ac5d6829450e1672f22195a8 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Fri, 18 Jul 2025 16:57:30 +0200 Subject: [PATCH] Add .complete file to db root --- audb/core/define.py | 1 + audb/core/load.py | 285 ++++++++++++++++------------ tests/test_complete_file.py | 366 ++++++++++++++++++++++++++++++++++++ 3 files changed, 533 insertions(+), 119 deletions(-) create mode 100644 tests/test_complete_file.py diff --git a/audb/core/define.py b/audb/core/define.py index 4dab8d2f..5352cdcb 100644 --- a/audb/core/define.py +++ b/audb/core/define.py @@ -76,6 +76,7 @@ TIMEOUT = 86400 # 24 h CACHED_VERSIONS_TIMEOUT = 10 # Timeout to acquire access to cached versions LOCK_FILE = ".lock" +COMPLETE_FILE = ".complete" TIMEOUT_MSG = "Lock could not be acquired. Timeout exceeded." diff --git a/audb/core/load.py b/audb/core/load.py index b4af0567..b9b8bd6e 100644 --- a/audb/core/load.py +++ b/audb/core/load.py @@ -193,10 +193,21 @@ def check() -> bool: ) audeer.rmdir(db_root_tmp) + # Create .complete file to signal completion + complete_file = os.path.join(db_root, define.COMPLETE_FILE) + audeer.touch(complete_file) + def _database_is_complete( db: audformat.Database, ) -> bool: + # First check for .complete file + if "audb" in db.meta and "root" in db.meta["audb"]: + complete_file = os.path.join(db.meta["audb"]["root"], define.COMPLETE_FILE) + if os.path.exists(complete_file): + return True + + # Fallback to checking metadata complete = False if "audb" in db.meta: if "complete" in db.meta["audb"]: @@ -1152,8 +1163,10 @@ def load( ) try: - with FolderLock(db_root, timeout=timeout): - # Start with database header without tables + # Check if database is already complete by looking for .complete file + complete_file = os.path.join(db_root, define.COMPLETE_FILE) + if os.path.exists(complete_file): + # Database is complete, no need to lock db, backend_interface = load_header_to( db_root, name, @@ -1161,55 +1174,37 @@ def load( flavor=flavor, add_audb_meta=True, ) - - db_is_complete = _database_is_complete(db) - - # load attachments - if not db_is_complete and not only_metadata: - # filter attachments - requested_attachments = filter_deps( - attachments, - db.attachments, - "attachment", - ) - - cached_versions = _load_attachments( - requested_attachments, - backend_interface, + db_is_complete = True + else: + # Database is not complete, need to lock + with FolderLock(db_root, timeout=timeout): + # Start with database header without tables + db, backend_interface = load_header_to( db_root, - db, + name, version, - cached_versions, - deps, - flavor, - cache_root, - num_workers, - verbose, + flavor=flavor, + add_audb_meta=True, ) - # filter tables (convert regexp pattern to list of tables) - requested_tables = filter_deps(tables, list(db), "table") - - # add/split into misc tables used in a scheme - # and all other (misc) tables - requested_misc_tables = _misc_tables_used_in_scheme(db) - requested_tables = [ - table - for table in requested_tables - if table not in requested_misc_tables - ] - - # load missing tables - if not db_is_complete: - for _tables in [ - requested_misc_tables, - requested_tables, - ]: - # need to load misc tables used in a scheme first - # as loading is done in parallel - cached_versions = _load_files( - _tables, - "table", + # Double-check completion status after acquiring lock + complete_file = os.path.join(db_root, define.COMPLETE_FILE) + if os.path.exists(complete_file): + db_is_complete = True + else: + db_is_complete = _database_is_complete(db) + + # load attachments + if not db_is_complete and not only_metadata: + # filter attachments + requested_attachments = filter_deps( + attachments, + db.attachments, + "attachment", + ) + + cached_versions = _load_attachments( + requested_attachments, backend_interface, db_root, db, @@ -1218,34 +1213,31 @@ def load( deps, flavor, cache_root, - pickle_tables, num_workers, verbose, ) - requested_tables = requested_misc_tables + requested_tables - - # filter tables - if tables is not None: - db.pick_tables(requested_tables) - - # load tables - for table in requested_tables: - db[table].load(os.path.join(db_root, f"db.{table}")) - - # filter media - requested_media = filter_deps( - media, - db.files, - "media", - name, - version, - ) - # load missing media - if not db_is_complete and not only_metadata: + # filter tables (convert regexp pattern to list of tables) + requested_tables = filter_deps(tables, list(db), "table") + + # add/split into misc tables used in a scheme + # and all other (misc) tables + requested_misc_tables = _misc_tables_used_in_scheme(db) + requested_tables = [ + table for table in requested_tables if table not in requested_misc_tables + ] + + # load missing tables + if not db_is_complete: + for _tables in [ + requested_misc_tables, + requested_tables, + ]: + # need to load misc tables used in a scheme first + # as loading is done in parallel cached_versions = _load_files( - requested_media, - "media", + _tables, + "table", backend_interface, db_root, db, @@ -1254,45 +1246,81 @@ def load( deps, flavor, cache_root, - False, + pickle_tables, num_workers, verbose, ) + requested_tables = requested_misc_tables + requested_tables - # filter media - if media is not None or tables is not None: - db.pick_files(requested_media) + # filter tables + if tables is not None: + db.pick_tables(requested_tables) - if not removed_media: - _remove_media(db, deps, num_workers, verbose) + # load tables + for table in requested_tables: + db[table].load(os.path.join(db_root, f"db.{table}")) - # Adjust full paths and file extensions in tables - _update_path( - db, + # filter media + requested_media = filter_deps( + media, + db.files, + "media", + name, + version, + ) + + # load missing media + if not db_is_complete and not only_metadata: + cached_versions = _load_files( + requested_media, + "media", + backend_interface, db_root, - full_path, - flavor.format, + db, + version, + cached_versions, + deps, + flavor, + cache_root, + False, num_workers, verbose, ) - # set file durations - _files_duration( + # filter media + if media is not None or tables is not None: + db.pick_files(requested_media) + + if not removed_media: + _remove_media(db, deps, num_workers, verbose) + + # Adjust full paths and file extensions in tables + _update_path( + db, + db_root, + full_path, + flavor.format, + num_workers, + verbose, + ) + + # set file durations + _files_duration( + db, + deps, + requested_media, + flavor.format, + ) + + # check if database is now complete + if not db_is_complete: + _database_check_complete( db, + db_root, + flavor, deps, - requested_media, - flavor.format, ) - # check if database is now complete - if not db_is_complete: - _database_check_complete( - db, - db_root, - flavor, - deps, - ) - except filelock.Timeout: utils.timeout_warning() @@ -1592,8 +1620,10 @@ def load_media( raise ValueError(msg) try: - with FolderLock(db_root, timeout=timeout): - # Start with database header without tables + # Check if database is already complete by looking for .complete file + complete_file = os.path.join(db_root, define.COMPLETE_FILE) + if os.path.exists(complete_file): + # Database is complete, no need to lock db, backend_interface = load_header_to( db_root, name, @@ -1601,33 +1631,50 @@ def load_media( flavor=flavor, add_audb_meta=True, ) - - db_is_complete = _database_is_complete(db) - - # load missing media - if not db_is_complete: - _load_files( - media, - "media", - backend_interface, + db_is_complete = True + else: + # Database is not complete, need to lock + with FolderLock(db_root, timeout=timeout): + # Start with database header without tables + db, backend_interface = load_header_to( db_root, - db, + name, version, - None, - deps, - flavor, - cache_root, - False, - num_workers, - verbose, + flavor=flavor, + add_audb_meta=True, ) - if format is not None: - media = [audeer.replace_file_extension(m, format) for m in media] - files = [ - os.path.join(db_root, os.path.normpath(file)) # convert "/" to os.sep - for file in media - ] + # Double-check completion status after acquiring lock + complete_file = os.path.join(db_root, define.COMPLETE_FILE) + if os.path.exists(complete_file): + db_is_complete = True + else: + db_is_complete = _database_is_complete(db) + + # load missing media + if not db_is_complete: + _load_files( + media, + "media", + backend_interface, + db_root, + db, + version, + None, + deps, + flavor, + cache_root, + False, + num_workers, + verbose, + ) + + if format is not None: + media = [audeer.replace_file_extension(m, format) for m in media] + files = [ + os.path.join(db_root, os.path.normpath(file)) # convert "/" to os.sep + for file in media + ] except filelock.Timeout: utils.timeout_warning() diff --git a/tests/test_complete_file.py b/tests/test_complete_file.py new file mode 100644 index 00000000..9cbf2ef8 --- /dev/null +++ b/tests/test_complete_file.py @@ -0,0 +1,366 @@ +import os +import shutil +import tempfile +import threading +import time +from unittest import mock + +import pytest + +import audeer +import audformat + +from audb.core import define +from audb.core.dependencies import Dependencies +from audb.core.flavor import Flavor +from audb.core.load import _database_check_complete +from audb.core.load import _database_is_complete + + +@pytest.fixture +def temp_db_root(): + """Create a temporary directory for database root.""" + db_root = tempfile.mkdtemp() + yield db_root + shutil.rmtree(db_root) + + +@pytest.fixture +def mock_database(): + """Create a mock database for testing.""" + db = audformat.Database(name="test_db") + db.meta["audb"] = { + "root": "/tmp/test_db", + "version": "1.0.0", + "flavor": {}, + "complete": False, + } + return db + + +def test_complete_file_creation(temp_db_root, mock_database): + """Test that .complete file is created when database is complete.""" + # Create a mock dependencies object + deps = mock.Mock(spec=Dependencies) + deps.attachments = [] + deps.tables = [] + deps.media = [] + deps.removed = mock.Mock(return_value=False) + + # Create a mock flavor + flavor = Flavor() + + # Update the database root in metadata + mock_database.meta["audb"]["root"] = temp_db_root + + # Create header file + header_file = os.path.join(temp_db_root, define.HEADER_FILE) + audeer.mkdir(os.path.dirname(header_file)) + mock_database.save(temp_db_root, header_only=True) + + # Call _database_check_complete + _database_check_complete(mock_database, temp_db_root, flavor, deps) + + # Check that .complete file was created + complete_file = os.path.join(temp_db_root, define.COMPLETE_FILE) + assert os.path.exists(complete_file), "Complete file should be created" + + # Check that database is marked complete in metadata + assert mock_database.meta["audb"]["complete"] is True + + +def test_complete_file_detection(): + """Test that _database_is_complete detects .complete file.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a database with .complete file + db = audformat.Database(name="test_db") + db.meta["audb"] = { + "root": temp_dir, + "version": "1.0.0", + "flavor": {}, + "complete": False, # metadata says not complete + } + + # Create .complete file + complete_file = os.path.join(temp_dir, define.COMPLETE_FILE) + audeer.touch(complete_file) + + # _database_is_complete should return True due to .complete file + assert _database_is_complete(db) is True + + +def test_complete_file_fallback_to_metadata(): + """Test that _database_is_complete falls back to metadata when no .complete file.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a database without .complete file + db = audformat.Database(name="test_db") + db.meta["audb"] = { + "root": temp_dir, + "version": "1.0.0", + "flavor": {}, + "complete": True, # metadata says complete + } + + # No .complete file exists, should fall back to metadata + assert _database_is_complete(db) is True + + # Test with incomplete metadata + db.meta["audb"]["complete"] = False + assert _database_is_complete(db) is False + + +def test_complete_file_no_metadata(): + """Test _database_is_complete when no audb metadata exists.""" + db = audformat.Database(name="test_db") + # No audb metadata at all + assert _database_is_complete(db) is False + + +def test_load_function_skips_lock_when_complete(temp_db_root): + """Test that load function skips locking when .complete file exists.""" + # Create .complete file + complete_file = os.path.join(temp_db_root, define.COMPLETE_FILE) + audeer.touch(complete_file) + + # Create a minimal database header + header_file = os.path.join(temp_db_root, define.HEADER_FILE) + audeer.mkdir(os.path.dirname(header_file)) + + db = audformat.Database(name="test_db") + db.meta["audb"] = { + "root": temp_db_root, + "version": "1.0.0", + "flavor": {}, + "complete": True, + } + db.save(temp_db_root, header_only=True) + + # Mock the FolderLock to ensure it's not called + with mock.patch("audb.core.load.FolderLock") as mock_lock: + mock_lock.return_value.__enter__ = mock.Mock(return_value=mock_lock) + mock_lock.return_value.__exit__ = mock.Mock() + + # Mock other dependencies + with mock.patch("audb.core.load.dependencies") as mock_deps: + # Create a proper mock dependencies object + mock_dep_instance = mock.Mock() + mock_dep_instance.return_value = mock.Mock() + mock_dep_instance.return_value.loc = mock.Mock() + mock_dep_instance.return_value.loc.__getitem__ = mock.Mock( + return_value=mock.Mock() + ) + mock_dep_instance.attachments = [] + mock_dep_instance.tables = [] + mock_dep_instance.media = [] + mock_dep_instance.removed = mock.Mock(return_value=False) + mock_deps.return_value = mock_dep_instance + + with mock.patch("audb.core.load.latest_version") as mock_version: + mock_version.return_value = "1.0.0" + + with mock.patch( + "audb.core.load.database_cache_root" + ) as mock_cache_root: + mock_cache_root.return_value = temp_db_root + + with mock.patch("audb.core.load.filter_deps") as mock_filter: + mock_filter.return_value = [] + + with mock.patch( + "audb.core.load._misc_tables_used_in_scheme" + ) as mock_misc: + mock_misc.return_value = [] + + with mock.patch( + "audb.core.load._files_duration" + ) as mock_duration: + mock_duration.return_value = None + + # Import and call load function + from audb.core.load import load + + # This should not call FolderLock + result = load("test_db", version="1.0.0", verbose=False) + + # Verify FolderLock was not called + mock_lock.assert_not_called() + + # Verify result is not None (successful load) + assert result is not None + + +def test_load_function_uses_lock_when_not_complete(temp_db_root): + """Test that load function uses locking when .complete file doesn't exist.""" + # Don't create .complete file + + # Create a minimal database header + header_file = os.path.join(temp_db_root, define.HEADER_FILE) + audeer.mkdir(os.path.dirname(header_file)) + + db = audformat.Database(name="test_db") + db.meta["audb"] = { + "root": temp_db_root, + "version": "1.0.0", + "flavor": {}, + "complete": False, + } + db.save(temp_db_root, header_only=True) + + # Mock the FolderLock to track if it's called + with mock.patch("audb.core.load.FolderLock") as mock_lock: + mock_lock.return_value.__enter__ = mock.Mock(return_value=mock_lock) + mock_lock.return_value.__exit__ = mock.Mock() + + # Mock other dependencies + with mock.patch("audb.core.load.dependencies") as mock_deps: + # Create a proper mock dependencies object + mock_dep_instance = mock.Mock() + mock_dep_instance.return_value = mock.Mock() + mock_dep_instance.return_value.loc = mock.Mock() + mock_dep_instance.return_value.loc.__getitem__ = mock.Mock( + return_value=mock.Mock() + ) + mock_dep_instance.attachments = [] + mock_dep_instance.tables = [] + mock_dep_instance.media = [] + mock_dep_instance.removed = mock.Mock(return_value=False) + mock_deps.return_value = mock_dep_instance + + with mock.patch("audb.core.load.latest_version") as mock_version: + mock_version.return_value = "1.0.0" + + with mock.patch( + "audb.core.load.database_cache_root" + ) as mock_cache_root: + mock_cache_root.return_value = temp_db_root + + with mock.patch("audb.core.load.filter_deps") as mock_filter: + mock_filter.return_value = [] + + with mock.patch( + "audb.core.load._misc_tables_used_in_scheme" + ) as mock_misc: + mock_misc.return_value = [] + + with mock.patch( + "audb.core.load._files_duration" + ) as mock_duration: + mock_duration.return_value = None + + # Import and call load function + from audb.core.load import load + + # This should use FolderLock + load("test_db", version="1.0.0", verbose=False) + + # Verify FolderLock was called + mock_lock.assert_called_once_with( + temp_db_root, timeout=define.TIMEOUT + ) + + +def test_concurrent_completion_race_condition(): + """Test race condition where multiple processes try to complete database.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create database files + db = audformat.Database(name="test_db") + db.meta["audb"] = { + "root": temp_dir, + "version": "1.0.0", + "flavor": {}, + "complete": False, + } + + db.save(temp_dir, header_only=True) + + # Mock dependencies + deps = mock.Mock(spec=Dependencies) + deps.attachments = [] + deps.tables = [] + deps.media = [] + deps.removed = mock.Mock(return_value=False) + + flavor = Flavor() + + # Simulate two processes trying to complete the database + complete_file = os.path.join(temp_dir, define.COMPLETE_FILE) + results = [] + + def complete_database(): + """Simulate database completion.""" + try: + # Create a fresh database instance for each thread + db_instance = audformat.Database(name="test_db") + db_instance.meta["audb"] = { + "root": temp_dir, + "version": "1.0.0", + "flavor": {}, + "complete": False, + } + # Small delay to increase chance of race condition + time.sleep(0.01) + _database_check_complete(db_instance, temp_dir, flavor, deps) + results.append(True) + except Exception as e: + results.append(f"Error: {e}") + + # Start two threads to simulate concurrent completion + thread1 = threading.Thread(target=complete_database) + thread2 = threading.Thread(target=complete_database) + + thread1.start() + thread2.start() + + thread1.join() + thread2.join() + + # Both should succeed or at least not fail catastrophically + assert len(results) == 2 + # At least one should succeed + assert any(result is True for result in results) + + # .complete file should exist + assert os.path.exists(complete_file) + + +def test_complete_file_constant(): + """Test that COMPLETE_FILE constant is properly defined.""" + from audb.core import define + + assert hasattr(define, "COMPLETE_FILE") + assert define.COMPLETE_FILE == ".complete" + + +def test_database_check_complete_no_complete_files(): + """Test _database_check_complete when files are missing.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create database + db = audformat.Database(name="test_db") + db.meta["audb"] = { + "root": temp_dir, + "version": "1.0.0", + "flavor": {}, + "complete": False, + } + + # Create header file + db.save(temp_dir, header_only=True) + + # Mock dependencies with missing files + deps = mock.Mock(spec=Dependencies) + deps.attachments = ["missing_attachment.txt"] + deps.tables = [] + deps.media = [] + deps.removed = mock.Mock(return_value=False) + + flavor = Flavor() + + # Call _database_check_complete - should not mark as complete + _database_check_complete(db, temp_dir, flavor, deps) + + # .complete file should not be created + complete_file = os.path.join(temp_dir, define.COMPLETE_FILE) + assert not os.path.exists(complete_file) + + # Database should not be marked complete + assert db.meta["audb"]["complete"] is False