From 981e0e2f6472d7b754779c64fdcdef0726ed418a Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 11:50:53 +0100 Subject: [PATCH 1/6] Avoid using _df in publish --- audb/core/publish.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/audb/core/publish.py b/audb/core/publish.py index 658f2975..849d3c29 100644 --- a/audb/core/publish.py +++ b/audb/core/publish.py @@ -84,8 +84,8 @@ def _find_attachments( r"""Find altered, new or removed attachments and update 'deps'.""" # drop removed attachments from dependency table removed_attachments = [ - deps._df.index[deps._df.archive == attachment_id][0] - for attachment_id in deps.attachment_ids + attachment + for attachment, attachment_id in zip(deps.attachments, deps.attachment_ids) if attachment_id not in db.attachments ] deps._drop(removed_attachments) From 9094f78c9f30f6456ed2508aff83a3aea9a655a3 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 11:58:09 +0100 Subject: [PATCH 2/6] Remove pkl cache for deps file --- audb/core/api.py | 18 +++------- audb/core/define.py | 10 ------ audb/core/dependencies.py | 23 +++--------- tests/test_dependencies.py | 71 +------------------------------------- 4 files changed, 10 insertions(+), 112 deletions(-) diff --git a/audb/core/api.py b/audb/core/api.py index 1e4fc8d2..6aa581c3 100644 --- a/audb/core/api.py +++ b/audb/core/api.py @@ -225,7 +225,6 @@ def cached( if ( define.DEPENDENCY_FILE not in files and define.LEGACY_DEPENDENCY_FILE not in files - and define.CACHED_DEPENDENCY_FILE not in files ): # Skip all cache entries # that don't contain a dependency file @@ -293,28 +292,19 @@ def dependencies( version, cache_root=cache_root, ) - cached_deps_file = os.path.join(db_root, define.CACHED_DEPENDENCY_FILE) + deps_file = os.path.join(db_root, define.DEPENDENCY_FILE) with FolderLock(db_root): try: deps = Dependencies() - deps.load(cached_deps_file) + deps.load(deps_file) except Exception: # does not catch KeyboardInterupt # If loading cached file fails, load again from backend # - # Loading a cache file can fail - # as we use PyArrow data types, - # which when loading from pickle - # are not compatible between all pandas versions. - # We had originally some tests for it, - # but as the actual failure is not that important, - # we removed them in - # See https://github.com/audeering/audb/pull/507 - # backend_interface = utils.lookup_backend(name, version) deps = download_dependencies(backend_interface, name, version, verbose) - # Store as pickle in cache - deps.save(cached_deps_file) + # Store in cache + deps.save(deps_file) return deps diff --git a/audb/core/define.py b/audb/core/define.py index 4dab8d2f..35603c22 100644 --- a/audb/core/define.py +++ b/audb/core/define.py @@ -14,16 +14,6 @@ DEPENDENCY_FILE = f"{DB}.parquet" r"""Filename and extension of dependency table file.""" -CACHED_DEPENDENCY_FILE = f"{DB}.pkl" -r"""Filename and extension of cached dependency table file. - -As loading from a pickle file is still faster -than loading from a parquet file, -we are storing the dependency table -as a pickle file in cache. - -""" - LEGACY_DEPENDENCY_FILE = f"{DB}.csv" r"""Filename and extension of legacy dependency table file. diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 84a94cf3..13615f99 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -307,21 +307,20 @@ def load(self, path: str): Args: path: path to file. File extension can be ``csv`` - ``pkl``, or ``parquet`` Raises: ValueError: if file extension is not one of - ``csv``, ``pkl``, ``parquet`` + ``csv``, ``parquet`` FileNotFoundError: if ``path`` does not exists """ self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) path = audeer.path(path) extension = audeer.file_extension(path) - if extension not in ["csv", "pkl", "parquet"]: + if extension not in ["csv", "parquet"]: raise ValueError( - f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet' " + f"File extension of 'path' has to be 'csv' or 'parquet' " f"not '{extension}'" ) if not os.path.exists(path): @@ -330,14 +329,7 @@ def load(self, path: str): os.strerror(errno.ENOENT), path, ) - if extension == "pkl": - self._df = pd.read_pickle(path) - # Correct dtypes - # to make backward compatiple - # with old pickle files in cache - self._df = self._set_dtypes(self._df) - - elif extension == "csv": + if extension == "csv": table = csv.read_csv( path, read_options=csv.ReadOptions( @@ -381,7 +373,7 @@ def save(self, path: str): Args: path: path to file. - File extension can be ``csv``, ``pkl``, or ``parquet`` + File extension can be ``csv`` or ``parquet`` """ path = audeer.path(path) @@ -392,11 +384,6 @@ def save(self, path: str): path, write_options=csv.WriteOptions(quoting_style="none"), ) - elif path.endswith("pkl"): - self._df.to_pickle( - path, - protocol=4, # supported by Python >= 3.4 - ) elif path.endswith("parquet"): table = self._dataframe_to_table(self._df, file_column=True) parquet.write_table(table, path) diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 618e7757..d32c0d56 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -250,7 +250,7 @@ def test_removed(deps): deps.removed("non.existing") -@pytest.mark.parametrize("file", ["deps.csv", "deps.pkl", "deps.parquet"]) +@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet"]) def test_load_save(tmpdir, deps, file): """Test consistency of dependency table after save/load cycle. @@ -268,75 +268,6 @@ def test_load_save(tmpdir, deps, file): assert list(deps2._df.dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) -def test_load_save_backward_compatibility(tmpdir, deps): - """Test backward compatibility with old pickle cache files. - - As the dtype of the index has changed, - we need to make sure this is corrected - when loading old cache files. - - Old behaviour (audb<1.7): - - archive string[python] - bit_depth int32 - channels int32 - checksum string[python] - duration float64 - format string[python] - removed int32 - sampling_rate int32 - type int32 - version string[python] - - New behaviour (audb>=1.7): - - archive string[pyarrow] - bit_depth int32[pyarrow] - channels int32[pyarrow] - checksum string[pyarrow] - duration double[pyarrow] - format string[pyarrow] - removed int32[pyarrow] - sampling_rate int32[pyarrow] - type int32[pyarrow] - version string[pyarrow] - - """ - deps_file = audeer.path(tmpdir, "deps.pkl") - - deps_old = audb.Dependencies() - deps_old._df = deps._df.copy() - - # Change dtype of index from object to string - # to mimic previous behavior - deps_old._df.index = deps_old._df.index.astype("string") - # Change dtype of columns - # to mimic previous behavior - deps_old._df = deps_old._df.astype( - { - "archive": "string", - "bit_depth": "int32", - "channels": "int32", - "checksum": "string", - "duration": "float64", - "format": "string", - "removed": "int32", - "sampling_rate": "int32", - "type": "int32", - "version": "string", - } - ) - deps_old.save(deps_file) - - # Check that we get the correct dtypes, - # when loading from cache - deps2 = audb.Dependencies() - deps2.load(deps_file) - assert deps2._df.index.dtype == audb.core.define.DEPENDENCY_INDEX_DTYPE - pd.testing.assert_frame_equal(deps._df, deps2._df) - assert deps == deps2 - - def test_load_save_errors(deps): """Test possible errors when loading/saving.""" # Wrong file extension From ab75770d47821c44e6c11cd571cc76f10700b5c1 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 13:40:38 +0100 Subject: [PATCH 3/6] Store dependencies as sqlite --- audb/core/define.py | 10 +++- audb/core/dependencies.py | 111 ++++++++++++++++++++++++++++++------ docs/publish.rst | 8 +-- tests/test_dependencies.py | 2 +- tests/test_publish.py | 2 +- tests/test_publish_table.py | 2 +- 6 files changed, 108 insertions(+), 27 deletions(-) diff --git a/audb/core/define.py b/audb/core/define.py index 35603c22..cf99dfd0 100644 --- a/audb/core/define.py +++ b/audb/core/define.py @@ -11,9 +11,17 @@ HEADER_FILE = f"{DB}.yaml" # Dependencies -DEPENDENCY_FILE = f"{DB}.parquet" +DEPENDENCY_FILE = f"{DB}.sqlite" r"""Filename and extension of dependency table file.""" +PARQUET_DEPENDENCY_FILE = f"{DB}.parquet" +r"""Filename and extension of parquet dependency table file. + +In ``audb`` versions between 1.7.0 and the SQLite migration, +the dependency table was stored in a parquet file. + +""" + LEGACY_DEPENDENCY_FILE = f"{DB}.csv" r"""Filename and extension of legacy dependency table file. diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 13615f99..68aa532e 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -5,6 +5,7 @@ import errno import os import re +import sqlite3 import tempfile import pandas as pd @@ -306,21 +307,21 @@ def load(self, path: str): Args: path: path to file. - File extension can be ``csv`` - or ``parquet`` + File extension can be ``csv``, + ``parquet``, or ``sqlite`` Raises: ValueError: if file extension is not one of - ``csv``, ``parquet`` + ``csv``, ``parquet``, ``sqlite`` FileNotFoundError: if ``path`` does not exists """ self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) path = audeer.path(path) extension = audeer.file_extension(path) - if extension not in ["csv", "parquet"]: + if extension not in ["csv", "parquet", "sqlite"]: raise ValueError( - f"File extension of 'path' has to be 'csv' or 'parquet' " + f"File extension of 'path' has to be 'csv', 'parquet', or 'sqlite' " f"not '{extension}'" ) if not os.path.exists(path): @@ -344,6 +345,22 @@ def load(self, path: str): table = parquet.read_table(path) self._df = self._table_to_dataframe(table) + elif extension == "sqlite": + conn = sqlite3.connect(path) + try: + # Read directly into pandas with correct index + self._df = pd.read_sql_query( + "SELECT * FROM dependencies", + conn, + index_col="file", + ) + # Remove index name to match expected format + self._df.index.name = None + # Set correct dtypes + self._df = self._set_dtypes(self._df) + finally: + conn.close() + def removed(self, file: str) -> bool: r"""Check if file is marked as removed. @@ -373,7 +390,7 @@ def save(self, path: str): Args: path: path to file. - File extension can be ``csv`` or ``parquet`` + File extension can be ``csv``, ``parquet``, or ``sqlite`` """ path = audeer.path(path) @@ -387,6 +404,49 @@ def save(self, path: str): elif path.endswith("parquet"): table = self._dataframe_to_table(self._df, file_column=True) parquet.write_table(table, path) + elif path.endswith("sqlite"): + # Remove existing database file if it exists + if os.path.exists(path): + os.remove(path) + + conn = sqlite3.connect(path) + try: + # Create table with proper schema + conn.execute(""" + CREATE TABLE dependencies ( + file TEXT PRIMARY KEY, + archive TEXT, + bit_depth INTEGER, + channels INTEGER, + checksum TEXT, + duration REAL, + format TEXT, + removed INTEGER, + sampling_rate INTEGER, + type INTEGER, + version TEXT + ) + """) + + # Create indexes for frequently queried columns + conn.execute("CREATE INDEX idx_type ON dependencies(type)") + conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + + # Write dataframe to SQLite + # Reset index to include 'file' as a column + df_to_save = self._df.reset_index() + df_to_save.columns = ["file"] + list(self._df.columns) + df_to_save.to_sql( + "dependencies", + conn, + if_exists="append", + index=False, + ) + + conn.commit() + finally: + conn.close() def type(self, file: str) -> int: r"""Type of file. @@ -792,9 +852,8 @@ def download_dependencies( """ with tempfile.TemporaryDirectory() as tmp_root: - # Load `db.parquet` file, - # or if non-existent `db.zip` - # from backend + # Try to load in order: db.sqlite, db.parquet, db.zip (legacy CSV) + # First, try SQLite (current format) remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE) if backend_interface.exists(remote_deps_file, version): local_deps_file = os.path.join(tmp_root, define.DEPENDENCY_FILE) @@ -805,17 +864,31 @@ def download_dependencies( verbose=verbose, ) else: - remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") - local_deps_file = os.path.join( - tmp_root, - define.LEGACY_DEPENDENCY_FILE, - ) - backend_interface.get_archive( - remote_deps_file, - tmp_root, - version, - verbose=verbose, + # Try parquet (previous format) + remote_deps_file = backend_interface.join( + "/", name, define.PARQUET_DEPENDENCY_FILE ) + if backend_interface.exists(remote_deps_file, version): + local_deps_file = os.path.join(tmp_root, define.PARQUET_DEPENDENCY_FILE) + backend_interface.get_file( + remote_deps_file, + local_deps_file, + version, + verbose=verbose, + ) + else: + # Fall back to legacy CSV format + remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") + local_deps_file = os.path.join( + tmp_root, + define.LEGACY_DEPENDENCY_FILE, + ) + backend_interface.get_archive( + remote_deps_file, + tmp_root, + version, + verbose=verbose, + ) # Create deps object from downloaded file deps = Dependencies() deps.load(local_deps_file) diff --git a/docs/publish.rst b/docs/publish.rst index 6509c416..465e7ace 100644 --- a/docs/publish.rst +++ b/docs/publish.rst @@ -126,7 +126,7 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.sqlite db.yaml media/ 1.0.0/ @@ -142,7 +142,7 @@ inside the ``media/`` folder, all tables inside the ``meta/`` folder, the database header in the file ``db.yaml``, and the database dependencies -in the file ``db.parquet``. +in the file ``db.sqlite``. Note, that the structure of the folders used for versioning @@ -247,10 +247,10 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.sqlite db.yaml 1.1.0/ - db.parquet + db.sqlite db.yaml media/ 1.0.0/ diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index d32c0d56..36d3c3af 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -250,7 +250,7 @@ def test_removed(deps): deps.removed("non.existing") -@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet"]) +@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet", "deps.sqlite"]) def test_load_save(tmpdir, deps, file): """Test consistency of dependency table after save/load cycle. diff --git a/tests/test_publish.py b/tests/test_publish.py index 15306c71..e3cea2f2 100644 --- a/tests/test_publish.py +++ b/tests/test_publish.py @@ -1044,7 +1044,7 @@ def test_publish_error_cross_repository(tmpdir): audb.publish(db_path_v2, "2.0.0", repo2, previous_version="1.0.0") # Publishing to repo2 with previous_version=None should work - os.remove(audeer.path(db_path, "db.parquet")) + os.remove(audeer.path(db_path, "db.sqlite")) audb.publish(db_path, "2.0.0", repo2, previous_version=None) # Assert that the new version appears in repo2 diff --git a/tests/test_publish_table.py b/tests/test_publish_table.py index 6ad7e3f0..3cece588 100644 --- a/tests/test_publish_table.py +++ b/tests/test_publish_table.py @@ -90,7 +90,7 @@ def assert_db_published_to_repo( """ repo = audeer.path(repository.host, repository.name) - dependency_file = "db.parquet" + dependency_file = "db.sqlite" header_file = "db.yaml" files = list(db.files) tables = list(db) From b3a33d644186ed8002893d9fd7815c3670738d9c Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 14:36:51 +0100 Subject: [PATCH 4/6] Use sqlite for deps --- audb/core/dependencies.py | 469 +++++++++++++++++++++++++++---------- audb/core/load.py | 4 +- tests/test_dependencies.py | 72 ++++-- 3 files changed, 411 insertions(+), 134 deletions(-) diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 68aa532e..5775b6b1 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -44,7 +44,7 @@ class Dependencies: >>> deps = audb.dependencies("emodb", version="1.4.1") >>> # List all files or archives >>> deps.files[:3] - ['db.emotion.csv', 'db.files.csv', 'wav/03a01Fa.wav'] + ['db.emotion.categories.test.gold_standard.csv', 'db.emotion.categories.train.gold_standard.csv', 'db.emotion.csv'] >>> deps.archives[:2] ['005d2b91-5317-0c80-d602-6d55f0323f8c', '014f82d8-3491-fd00-7397-c3b2ac3b2875'] >>> # Access properties for a given file @@ -61,8 +61,38 @@ class Dependencies: """ # noqa: E501 def __init__(self): - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) - self._df = self._set_dtypes(self._df) + # Use in-memory SQLite database instead of pandas DataFrame + # Set check_same_thread=False to allow usage across threads + self._conn = sqlite3.connect(":memory:", check_same_thread=False) + self._db_path = None # Track if connected to a file or in-memory + + # Create the dependencies table + self._conn.execute(""" + CREATE TABLE dependencies ( + file TEXT PRIMARY KEY, + archive TEXT, + bit_depth INTEGER, + channels INTEGER, + checksum TEXT, + duration REAL, + format TEXT, + removed INTEGER, + sampling_rate INTEGER, + type INTEGER, + version TEXT + ) + """) + + # Create indexes for frequently queried columns + self._conn.execute("CREATE INDEX idx_type ON dependencies(type)") + self._conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + self._conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + self._conn.execute( + "CREATE INDEX idx_type_removed ON dependencies(type, removed)" + ) + + self._conn.commit() + # pyarrow schema # used for reading and writing files self._schema = pa.schema( @@ -88,7 +118,16 @@ def __call__(self) -> pd.DataFrame: table with dependencies """ - return self._df + df = pd.read_sql_query( + "SELECT * FROM dependencies", + self._conn, + index_col="file", + ) + # Remove index name to match expected format + df.index.name = None + # Set correct dtypes + df = self._set_dtypes(df) + return df def __contains__(self, file: str) -> bool: r"""Check if file is part of dependencies. @@ -100,7 +139,10 @@ def __contains__(self, file: str) -> bool: ``True`` if a dependency to the file exists """ - return file in self._df.index + cursor = self._conn.execute( + "SELECT 1 FROM dependencies WHERE file = ? LIMIT 1", (file,) + ) + return cursor.fetchone() is not None def __eq__(self, other: "Dependencies") -> bool: r"""Check if two dependency tables are equal. @@ -112,7 +154,8 @@ def __eq__(self, other: "Dependencies") -> bool: ``True`` if both dependency tables have the same entries """ - return self._df.equals(other._df) + # Compare by converting to DataFrames + return self().equals(other()) def __getitem__(self, file: str) -> list: r"""File information. @@ -124,14 +167,117 @@ def __getitem__(self, file: str) -> list: list with meta information """ - return self._df.loc[file].tolist() + cursor = self._conn.execute( + "SELECT archive, bit_depth, channels, checksum, duration, " + "format, removed, sampling_rate, type, version " + "FROM dependencies WHERE file = ?", + (file,), + ) + row = cursor.fetchone() + if row is None: + raise KeyError(file) + return list(row) def __len__(self) -> int: r"""Number of all media, table, attachment files.""" - return len(self._df) + cursor = self._conn.execute("SELECT COUNT(*) FROM dependencies") + return cursor.fetchone()[0] def __str__(self) -> str: # noqa: D105 - return str(self._df) + return str(self()) + + def __del__(self): + """Clean up SQLite connection when object is deleted.""" + if hasattr(self, "_conn") and self._conn: + self._conn.close() + + def __getstate__(self): + """Prepare object for pickling by converting SQLite data to serializable format.""" + # Get all data as a DataFrame + df = self() + # Return the DataFrame and schema for reconstruction + return { + "data": df.to_dict("records"), + "index": df.index.tolist(), + } + + def __setstate__(self, state): + """Restore object from pickled state.""" + # Recreate the SQLite connection + self._conn = sqlite3.connect(":memory:", check_same_thread=False) + self._db_path = None + + # Recreate the table structure + self._conn.execute(""" + CREATE TABLE dependencies ( + file TEXT PRIMARY KEY, + archive TEXT, + bit_depth INTEGER, + channels INTEGER, + checksum TEXT, + duration REAL, + format TEXT, + removed INTEGER, + sampling_rate INTEGER, + type INTEGER, + version TEXT + ) + """) + + # Create indexes + self._conn.execute("CREATE INDEX idx_type ON dependencies(type)") + self._conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + self._conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + self._conn.execute( + "CREATE INDEX idx_type_removed ON dependencies(type, removed)" + ) + + self._conn.commit() + + # Recreate the schema + self._schema = pa.schema( + [ + ("file", pa.string()), + ("archive", pa.string()), + ("bit_depth", pa.int32()), + ("channels", pa.int32()), + ("checksum", pa.string()), + ("duration", pa.float64()), + ("format", pa.string()), + ("removed", pa.int32()), + ("sampling_rate", pa.int32()), + ("type", pa.int32()), + ("version", pa.string()), + ] + ) + + # Restore the data + if state["data"]: + data = state["data"] + index = state["index"] + for i, row in enumerate(data): + file = index[i] + self._conn.execute( + """ + INSERT INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + file, + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ), + ) + self._conn.commit() @property def archives(self) -> list[str]: @@ -141,7 +287,10 @@ def archives(self) -> list[str]: list of archives """ - return sorted(self._df.archive.unique().tolist()) + cursor = self._conn.execute( + "SELECT DISTINCT archive FROM dependencies ORDER BY archive" + ) + return [row[0] for row in cursor.fetchall()] @property def attachments(self) -> list[str]: @@ -151,9 +300,11 @@ def attachments(self) -> list[str]: list of attachments """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["attachment"],), + ) + return [row[0] for row in cursor.fetchall()] @property def attachment_ids(self) -> list[str]: @@ -163,9 +314,11 @@ def attachment_ids(self) -> list[str]: list of attachment IDs """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].archive.tolist() + cursor = self._conn.execute( + "SELECT archive FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["attachment"],), + ) + return [row[0] for row in cursor.fetchall()] @property def files(self) -> list[str]: @@ -175,7 +328,8 @@ def files(self) -> list[str]: list of files """ - return self._df.index.tolist() + cursor = self._conn.execute("SELECT file FROM dependencies") + return [row[0] for row in cursor.fetchall()] @property def media(self) -> list[str]: @@ -185,9 +339,11 @@ def media(self) -> list[str]: list of media """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["media"] - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["media"],), + ) + return [row[0] for row in cursor.fetchall()] @property def removed_media(self) -> list[str]: @@ -197,10 +353,11 @@ def removed_media(self) -> list[str]: list of media """ - return self._df[ - (self._df["type"] == define.DEPENDENCY_TYPE["media"]) - & (self._df["removed"] == 1) - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ? AND removed = 1", + (define.DEPENDENCY_TYPE["media"],), + ) + return [row[0] for row in cursor.fetchall()] @property def table_ids(self) -> list[str]: @@ -224,9 +381,11 @@ def tables(self) -> list[str]: list of tables """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["meta"] - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["meta"],), + ) + return [row[0] for row in cursor.fetchall()] def archive(self, file: str) -> str: r"""Name of archive the file belongs to. @@ -238,7 +397,7 @@ def archive(self, file: str) -> str: archive name """ - return self._df.archive[file] + return self._column_loc("archive", file) def bit_depth(self, file: str) -> int: r"""Bit depth of media file. @@ -316,7 +475,6 @@ def load(self, path: str): FileNotFoundError: if ``path`` does not exists """ - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) path = audeer.path(path) extension = audeer.file_extension(path) if extension not in ["csv", "parquet", "sqlite"]: @@ -330,36 +488,48 @@ def load(self, path: str): os.strerror(errno.ENOENT), path, ) - if extension == "csv": - table = csv.read_csv( - path, - read_options=csv.ReadOptions( - column_names=self._schema.names, - skip_rows=1, - ), - convert_options=csv.ConvertOptions(column_types=self._schema), - ) - self._df = self._table_to_dataframe(table) - elif extension == "parquet": - table = parquet.read_table(path) - self._df = self._table_to_dataframe(table) + # Clear existing data + self._conn.execute("DELETE FROM dependencies") + self._conn.commit() - elif extension == "sqlite": - conn = sqlite3.connect(path) + if extension == "sqlite": + # For SQLite files, we can attach and copy the data + self._conn.execute(f"ATTACH DATABASE '{path}' AS source_db") try: - # Read directly into pandas with correct index - self._df = pd.read_sql_query( - "SELECT * FROM dependencies", - conn, - index_col="file", + self._conn.execute( + "INSERT INTO dependencies SELECT * FROM source_db.dependencies" ) - # Remove index name to match expected format - self._df.index.name = None - # Set correct dtypes - self._df = self._set_dtypes(self._df) + self._conn.commit() finally: - conn.close() + self._conn.execute("DETACH DATABASE source_db") + + else: + # For CSV and parquet, load via pandas and insert into SQLite + if extension == "csv": + table = csv.read_csv( + path, + read_options=csv.ReadOptions( + column_names=self._schema.names, + skip_rows=1, + ), + convert_options=csv.ConvertOptions(column_types=self._schema), + ) + df = self._table_to_dataframe(table) + + elif extension == "parquet": + table = parquet.read_table(path) + df = self._table_to_dataframe(table) + + # Insert the dataframe into SQLite + df_to_insert = df.reset_index() + df_to_insert.columns = ["file"] + list(df.columns) + df_to_insert.to_sql( + "dependencies", + self._conn, + if_exists="append", + index=False, + ) def removed(self, file: str) -> bool: r"""Check if file is marked as removed. @@ -395,24 +565,27 @@ def save(self, path: str): """ path = audeer.path(path) if path.endswith("csv"): - table = self._dataframe_to_table(self._df) + df = self() + table = self._dataframe_to_table(df) csv.write_csv( table, path, write_options=csv.WriteOptions(quoting_style="none"), ) elif path.endswith("parquet"): - table = self._dataframe_to_table(self._df, file_column=True) + df = self() + table = self._dataframe_to_table(df, file_column=True) parquet.write_table(table, path) elif path.endswith("sqlite"): # Remove existing database file if it exists if os.path.exists(path): os.remove(path) - conn = sqlite3.connect(path) + # Create a new connection to the file database + file_conn = sqlite3.connect(path) try: # Create table with proper schema - conn.execute(""" + file_conn.execute(""" CREATE TABLE dependencies ( file TEXT PRIMARY KEY, archive TEXT, @@ -429,24 +602,22 @@ def save(self, path: str): """) # Create indexes for frequently queried columns - conn.execute("CREATE INDEX idx_type ON dependencies(type)") - conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") - conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") - - # Write dataframe to SQLite - # Reset index to include 'file' as a column - df_to_save = self._df.reset_index() - df_to_save.columns = ["file"] + list(self._df.columns) - df_to_save.to_sql( - "dependencies", - conn, - if_exists="append", - index=False, + file_conn.execute("CREATE INDEX idx_type ON dependencies(type)") + file_conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + file_conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + file_conn.execute( + "CREATE INDEX idx_type_removed ON dependencies(type, removed)" ) - conn.commit() + # Copy data from in-memory database to file + # Use iterdump to copy all data + for line in self._conn.iterdump(): + if line.startswith("INSERT INTO"): + file_conn.execute(line) + + file_conn.commit() finally: - conn.close() + file_conn.close() def type(self, file: str) -> int: r"""Type of file. @@ -490,18 +661,27 @@ def _add_attachment( """ format = audeer.file_extension(file).lower() - self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["attachment"], # type - version, # version - ] + self._conn.execute( + """ + INSERT OR REPLACE INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + file, + archive, + 0, + 0, + checksum, + 0.0, + format, + 0, + 0, + define.DEPENDENCY_TYPE["attachment"], + version, + ), + ) + self._conn.commit() def _add_media( self, @@ -528,12 +708,15 @@ def _add_media( where each tuple holds the values of a new media entry """ - df = pd.DataFrame.from_records( + self._conn.executemany( + """ + INSERT INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df = pd.concat([self._df, df]) + ) + self._conn.commit() def _add_meta( self, @@ -555,18 +738,27 @@ def _add_meta( else: archive = os.path.splitext(file[3:])[0] - self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["meta"], # type - version, # version - ] + self._conn.execute( + """ + INSERT OR REPLACE INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + file, + archive, + 0, + 0, + checksum, + 0.0, + format, + 0, + 0, + define.DEPENDENCY_TYPE["meta"], + version, + ), + ) + self._conn.commit() def _column_loc( self, @@ -585,7 +777,13 @@ def _column_loc( scalar value """ - value = self._df.at[file, column] + cursor = self._conn.execute( + f"SELECT {column} FROM dependencies WHERE file = ?", (file,) + ) + row = cursor.fetchone() + if row is None: + raise KeyError(file) + value = row[0] if dtype is not None: value = dtype(value) return value @@ -626,13 +824,15 @@ def _drop(self, files: Sequence[str]): files: relative file paths """ - # self._df.drop is slow, - # see https://stackoverflow.com/a/53394627. - # The solution presented in https://stackoverflow.com/a/53395360 - # self._df = self._df.loc[self._df.index.drop(files)] - # which is claimed to be faster, - # isn't. - self._df = self._df[~self._df.index.isin(files)] + if not files: + return + # Convert to tuple if needed (e.g., if files is a set) + files = tuple(files) if not isinstance(files, (list, tuple)) else files + placeholders = ",".join("?" * len(files)) + self._conn.execute( + f"DELETE FROM dependencies WHERE file IN ({placeholders})", files + ) + self._conn.commit() def _remove(self, file: str): r"""Mark file as removed. @@ -641,7 +841,10 @@ def _remove(self, file: str): file: relative file path """ - self._df.at[file, "removed"] = 1 + self._conn.execute( + "UPDATE dependencies SET removed = 1 WHERE file = ?", (file,) + ) + self._conn.commit() @staticmethod def _set_dtypes(df: pd.DataFrame) -> pd.DataFrame: @@ -710,13 +913,34 @@ def _update_media( values: list of tuples, where each tuple holds the new values for a media entry + Raises: + KeyError: if a file in values does not exist in dependencies + """ - df = pd.DataFrame.from_records( - values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df.loc[df.index] = df + # Check if all files exist before updating + for value in values: + file = value[0] + cursor = self._conn.execute( + "SELECT 1 FROM dependencies WHERE file = ? LIMIT 1", (file,) + ) + if cursor.fetchone() is None: + raise KeyError(file) + + # Update existing entries + self._conn.executemany( + """ + UPDATE dependencies + SET archive = ?, bit_depth = ?, channels = ?, checksum = ?, duration = ?, + format = ?, removed = ?, sampling_rate = ?, type = ?, version = ? + WHERE file = ? + """, + # Reorder tuple to put file at the end + [ + (v[1], v[2], v[3], v[4], v[5], v[6], v[7], v[8], v[9], v[10], v[0]) + for v in values + ], + ) + self._conn.commit() def _update_media_version( self, @@ -730,7 +954,16 @@ def _update_media_version( version: version string """ - self._df.loc[files, "version"] = version + if not files: + return + # Convert to tuple if needed (e.g., if files is a set) + files = tuple(files) if not isinstance(files, (list, tuple)) else files + placeholders = ",".join("?" * len(files)) + self._conn.execute( + f"UPDATE dependencies SET version = ? WHERE file IN ({placeholders})", + [version] + list(files), + ) + self._conn.commit() def error_message_missing_object( diff --git a/audb/core/load.py b/audb/core/load.py index bd4aefd5..18fdc4ac 100644 --- a/audb/core/load.py +++ b/audb/core/load.py @@ -471,7 +471,9 @@ def job(archive: str, version: str): tmp_root=db_root_tmp, ) # media files that can be changed to a requested flavor - flavor_files = deps._df[deps._df.sampling_rate != 0].index + # Get files with sampling_rate != 0 (audio files) + df = deps() + flavor_files = df[df.sampling_rate != 0].index for file in files: if os.name == "nt": # pragma: no cover file = file.replace(os.sep, "/") diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 36d3c3af..a8b56f3b 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -66,13 +66,29 @@ def test_get_entries(): ) def deps(): deps = audb.Dependencies() - df = pd.DataFrame.from_records(ROWS) - df.set_index("file", inplace=True) - # Ensure correct dtype - df.index = df.index.astype(audb.core.define.DEPENDENCY_INDEX_DTYPE) - df.index.name = None - df = df.astype(audb.core.define.DEPENDENCY_TABLE) - deps._df = df + # Insert test data directly into SQLite + for row in ROWS: + deps._conn.execute( + """ + INSERT INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ), + ) + deps._conn.commit() return deps @@ -80,10 +96,10 @@ def test_instantiation(): r"""Test instantiation of audb.Dependencies. During instantiation of ``audb.Dependencies`` - an empty dataframe is created under ``self._df``, + an empty SQLite database is created under ``self._conn``, that stores the dependency table. This test ensures, - that the dataframe + that the database contains the correct column names and data types, and the correct name and data type of its index. @@ -106,9 +122,8 @@ def test_instantiation(): audb.core.define.DEPENDENCY_INDEX_DTYPE ) expected_df = expected_df.astype(audb.core.define.DEPENDENCY_TABLE) - pd.testing.assert_frame_equal(deps._df, expected_df) - assert list(deps._df.columns) == expected_columns df = deps() + pd.testing.assert_frame_equal(df, expected_df) assert list(df.columns) == expected_columns @@ -139,10 +154,36 @@ def test_equals(deps): assert deps != audb.Dependencies() # example table vs. example table assert deps == deps - _deps._df = deps._df.copy() + # Copy data to new Dependencies object + _deps = audb.Dependencies() + for row in ROWS: + _deps._conn.execute( + """ + INSERT INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ), + ) + _deps._conn.commit() assert deps == _deps # example table vs. different table - _deps._df.loc["db.files.csv", "channels"] = 4 + _deps._conn.execute( + "UPDATE dependencies SET channels = 4 WHERE file = 'db.files.csv'" + ) + _deps._conn.commit() assert deps != _deps @@ -265,7 +306,7 @@ def test_load_save(tmpdir, deps, file): deps2 = audb.Dependencies() deps2.load(deps_file) pd.testing.assert_frame_equal(deps(), deps2()) - assert list(deps2._df.dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) + assert list(deps2().dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) def test_load_save_errors(deps): @@ -325,7 +366,8 @@ def test_str(deps): ) print(str(deps)) assert expected_str.match(str(deps)) - assert expected_str.match(deps._df.to_string()) + # str(deps) now calls __str__ which calls __call__ which returns a DataFrame + assert expected_str.match(deps().to_string()) # === Test hidden methods === From 0e957db3d820122faa8544c59f391927e7c775e8 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 14:50:19 +0100 Subject: [PATCH 5/6] Fix ruff errors --- audb/core/dependencies.py | 36 ++++++++++++++---------------------- tests/test_dependencies.py | 17 +++++++---------- 2 files changed, 21 insertions(+), 32 deletions(-) diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 5775b6b1..74e3e2ad 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -19,6 +19,14 @@ from audb.core import define +# SQLITE query variables +DEPENDENCIES = ( + "(file, archive, bit_depth, channels, checksum, duration, format, " + "removed, sampling_rate, type, version)" +) +VALUES = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + + class Dependencies: r"""Dependencies of a database. @@ -192,7 +200,7 @@ def __del__(self): self._conn.close() def __getstate__(self): - """Prepare object for pickling by converting SQLite data to serializable format.""" + """Make object serializable.""" # Get all data as a DataFrame df = self() # Return the DataFrame and schema for reconstruction @@ -202,7 +210,7 @@ def __getstate__(self): } def __setstate__(self, state): - """Restore object from pickled state.""" + """Restore object from serialized state.""" # Recreate the SQLite connection self._conn = sqlite3.connect(":memory:", check_same_thread=False) self._db_path = None @@ -258,11 +266,7 @@ def __setstate__(self, state): for i, row in enumerate(data): file = index[i] self._conn.execute( - """ - INSERT INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( file, row["archive"], @@ -662,11 +666,7 @@ def _add_attachment( format = audeer.file_extension(file).lower() self._conn.execute( - """ - INSERT OR REPLACE INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT OR REPLACE INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( file, archive, @@ -709,11 +709,7 @@ def _add_media( """ self._conn.executemany( - """ - INSERT INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", values, ) self._conn.commit() @@ -739,11 +735,7 @@ def _add_meta( archive = os.path.splitext(file[3:])[0] self._conn.execute( - """ - INSERT OR REPLACE INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT OR REPLACE INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( file, archive, diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index a8b56f3b..81cac116 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -51,6 +51,11 @@ "version": "1.0.0", }, ] +DEPENDENCIES = ( + "(file, archive, bit_depth, channels, checksum, duration, format, " + "removed, sampling_rate, type, version)" +) +VALUES = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" def get_entries(column): @@ -69,11 +74,7 @@ def deps(): # Insert test data directly into SQLite for row in ROWS: deps._conn.execute( - """ - INSERT INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( row["file"], row["archive"], @@ -158,11 +159,7 @@ def test_equals(deps): _deps = audb.Dependencies() for row in ROWS: _deps._conn.execute( - """ - INSERT INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( row["file"], row["archive"], From 2218914b3d83c2847658d4df055406f0c62df624 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 15:53:44 +0100 Subject: [PATCH 6/6] Fix doctest --- docs/publish.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/publish.rst b/docs/publish.rst index 465e7ace..89aae62d 100644 --- a/docs/publish.rst +++ b/docs/publish.rst @@ -228,10 +228,10 @@ and will only publish those. >>> deps() archive bit_depth ... type version -db.age.parquet 0 ... 0 1.1.0 audio/001.wav 436c65ec-1e42-f9de-2708-ecafe07e827e 16 ... 1 1.0.0 audio/002.wav fda7e4d6-f2b2-4cff-cab5-906ef5d57607 16 ... 1 1.0.0 audio/003.wav e26ef45d-bdc1-6153-bdc4-852d83806e4a 16 ... 1 1.0.0 +db.age.parquet 0 ... 0 1.1.0 audio/004.wav ef4d1e81-6488-95cf-a165-604d1e47d575 16 ... 1 1.1.0 [5 rows x 10 columns]