From 981e0e2f6472d7b754779c64fdcdef0726ed418a Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 11:50:53 +0100 Subject: [PATCH 01/10] Avoid using _df in publish --- audb/core/publish.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/audb/core/publish.py b/audb/core/publish.py index 658f2975..849d3c29 100644 --- a/audb/core/publish.py +++ b/audb/core/publish.py @@ -84,8 +84,8 @@ def _find_attachments( r"""Find altered, new or removed attachments and update 'deps'.""" # drop removed attachments from dependency table removed_attachments = [ - deps._df.index[deps._df.archive == attachment_id][0] - for attachment_id in deps.attachment_ids + attachment + for attachment, attachment_id in zip(deps.attachments, deps.attachment_ids) if attachment_id not in db.attachments ] deps._drop(removed_attachments) From 9094f78c9f30f6456ed2508aff83a3aea9a655a3 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 11:58:09 +0100 Subject: [PATCH 02/10] Remove pkl cache for deps file --- audb/core/api.py | 18 +++------- audb/core/define.py | 10 ------ audb/core/dependencies.py | 23 +++--------- tests/test_dependencies.py | 71 +------------------------------------- 4 files changed, 10 insertions(+), 112 deletions(-) diff --git a/audb/core/api.py b/audb/core/api.py index 1e4fc8d2..6aa581c3 100644 --- a/audb/core/api.py +++ b/audb/core/api.py @@ -225,7 +225,6 @@ def cached( if ( define.DEPENDENCY_FILE not in files and define.LEGACY_DEPENDENCY_FILE not in files - and define.CACHED_DEPENDENCY_FILE not in files ): # Skip all cache entries # that don't contain a dependency file @@ -293,28 +292,19 @@ def dependencies( version, cache_root=cache_root, ) - cached_deps_file = os.path.join(db_root, define.CACHED_DEPENDENCY_FILE) + deps_file = os.path.join(db_root, define.DEPENDENCY_FILE) with FolderLock(db_root): try: deps = Dependencies() - deps.load(cached_deps_file) + deps.load(deps_file) except Exception: # does not catch KeyboardInterupt # If loading cached file fails, load again from backend # - # Loading a cache file can fail - # as we use PyArrow data types, - # which when loading from pickle - # are not compatible between all pandas versions. - # We had originally some tests for it, - # but as the actual failure is not that important, - # we removed them in - # See https://github.com/audeering/audb/pull/507 - # backend_interface = utils.lookup_backend(name, version) deps = download_dependencies(backend_interface, name, version, verbose) - # Store as pickle in cache - deps.save(cached_deps_file) + # Store in cache + deps.save(deps_file) return deps diff --git a/audb/core/define.py b/audb/core/define.py index 4dab8d2f..35603c22 100644 --- a/audb/core/define.py +++ b/audb/core/define.py @@ -14,16 +14,6 @@ DEPENDENCY_FILE = f"{DB}.parquet" r"""Filename and extension of dependency table file.""" -CACHED_DEPENDENCY_FILE = f"{DB}.pkl" -r"""Filename and extension of cached dependency table file. - -As loading from a pickle file is still faster -than loading from a parquet file, -we are storing the dependency table -as a pickle file in cache. - -""" - LEGACY_DEPENDENCY_FILE = f"{DB}.csv" r"""Filename and extension of legacy dependency table file. diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 84a94cf3..13615f99 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -307,21 +307,20 @@ def load(self, path: str): Args: path: path to file. 
File extension can be ``csv`` - ``pkl``, or ``parquet`` Raises: ValueError: if file extension is not one of - ``csv``, ``pkl``, ``parquet`` + ``csv``, ``parquet`` FileNotFoundError: if ``path`` does not exists """ self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) path = audeer.path(path) extension = audeer.file_extension(path) - if extension not in ["csv", "pkl", "parquet"]: + if extension not in ["csv", "parquet"]: raise ValueError( - f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet' " + f"File extension of 'path' has to be 'csv' or 'parquet' " f"not '{extension}'" ) if not os.path.exists(path): @@ -330,14 +329,7 @@ def load(self, path: str): os.strerror(errno.ENOENT), path, ) - if extension == "pkl": - self._df = pd.read_pickle(path) - # Correct dtypes - # to make backward compatiple - # with old pickle files in cache - self._df = self._set_dtypes(self._df) - - elif extension == "csv": + if extension == "csv": table = csv.read_csv( path, read_options=csv.ReadOptions( @@ -381,7 +373,7 @@ def save(self, path: str): Args: path: path to file. - File extension can be ``csv``, ``pkl``, or ``parquet`` + File extension can be ``csv`` or ``parquet`` """ path = audeer.path(path) @@ -392,11 +384,6 @@ def save(self, path: str): path, write_options=csv.WriteOptions(quoting_style="none"), ) - elif path.endswith("pkl"): - self._df.to_pickle( - path, - protocol=4, # supported by Python >= 3.4 - ) elif path.endswith("parquet"): table = self._dataframe_to_table(self._df, file_column=True) parquet.write_table(table, path) diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 618e7757..d32c0d56 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -250,7 +250,7 @@ def test_removed(deps): deps.removed("non.existing") -@pytest.mark.parametrize("file", ["deps.csv", "deps.pkl", "deps.parquet"]) +@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet"]) def test_load_save(tmpdir, deps, file): """Test consistency of dependency table after save/load cycle. @@ -268,75 +268,6 @@ def test_load_save(tmpdir, deps, file): assert list(deps2._df.dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) -def test_load_save_backward_compatibility(tmpdir, deps): - """Test backward compatibility with old pickle cache files. - - As the dtype of the index has changed, - we need to make sure this is corrected - when loading old cache files. 
- - Old behaviour (audb<1.7): - - archive string[python] - bit_depth int32 - channels int32 - checksum string[python] - duration float64 - format string[python] - removed int32 - sampling_rate int32 - type int32 - version string[python] - - New behaviour (audb>=1.7): - - archive string[pyarrow] - bit_depth int32[pyarrow] - channels int32[pyarrow] - checksum string[pyarrow] - duration double[pyarrow] - format string[pyarrow] - removed int32[pyarrow] - sampling_rate int32[pyarrow] - type int32[pyarrow] - version string[pyarrow] - - """ - deps_file = audeer.path(tmpdir, "deps.pkl") - - deps_old = audb.Dependencies() - deps_old._df = deps._df.copy() - - # Change dtype of index from object to string - # to mimic previous behavior - deps_old._df.index = deps_old._df.index.astype("string") - # Change dtype of columns - # to mimic previous behavior - deps_old._df = deps_old._df.astype( - { - "archive": "string", - "bit_depth": "int32", - "channels": "int32", - "checksum": "string", - "duration": "float64", - "format": "string", - "removed": "int32", - "sampling_rate": "int32", - "type": "int32", - "version": "string", - } - ) - deps_old.save(deps_file) - - # Check that we get the correct dtypes, - # when loading from cache - deps2 = audb.Dependencies() - deps2.load(deps_file) - assert deps2._df.index.dtype == audb.core.define.DEPENDENCY_INDEX_DTYPE - pd.testing.assert_frame_equal(deps._df, deps2._df) - assert deps == deps2 - - def test_load_save_errors(deps): """Test possible errors when loading/saving.""" # Wrong file extension From ab75770d47821c44e6c11cd571cc76f10700b5c1 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 13:40:38 +0100 Subject: [PATCH 03/10] Store dependencies as sqlite --- audb/core/define.py | 10 +++- audb/core/dependencies.py | 111 ++++++++++++++++++++++++++++++------ docs/publish.rst | 8 +-- tests/test_dependencies.py | 2 +- tests/test_publish.py | 2 +- tests/test_publish_table.py | 2 +- 6 files changed, 108 insertions(+), 27 deletions(-) diff --git a/audb/core/define.py b/audb/core/define.py index 35603c22..cf99dfd0 100644 --- a/audb/core/define.py +++ b/audb/core/define.py @@ -11,9 +11,17 @@ HEADER_FILE = f"{DB}.yaml" # Dependencies -DEPENDENCY_FILE = f"{DB}.parquet" +DEPENDENCY_FILE = f"{DB}.sqlite" r"""Filename and extension of dependency table file.""" +PARQUET_DEPENDENCY_FILE = f"{DB}.parquet" +r"""Filename and extension of parquet dependency table file. + +In ``audb`` versions between 1.7.0 and the SQLite migration, +the dependency table was stored in a parquet file. + +""" + LEGACY_DEPENDENCY_FILE = f"{DB}.csv" r"""Filename and extension of legacy dependency table file. diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 13615f99..68aa532e 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -5,6 +5,7 @@ import errno import os import re +import sqlite3 import tempfile import pandas as pd @@ -306,21 +307,21 @@ def load(self, path: str): Args: path: path to file. 
- File extension can be ``csv`` - or ``parquet`` + File extension can be ``csv``, + ``parquet``, or ``sqlite`` Raises: ValueError: if file extension is not one of - ``csv``, ``parquet`` + ``csv``, ``parquet``, ``sqlite`` FileNotFoundError: if ``path`` does not exists """ self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) path = audeer.path(path) extension = audeer.file_extension(path) - if extension not in ["csv", "parquet"]: + if extension not in ["csv", "parquet", "sqlite"]: raise ValueError( - f"File extension of 'path' has to be 'csv' or 'parquet' " + f"File extension of 'path' has to be 'csv', 'parquet', or 'sqlite' " f"not '{extension}'" ) if not os.path.exists(path): @@ -344,6 +345,22 @@ def load(self, path: str): table = parquet.read_table(path) self._df = self._table_to_dataframe(table) + elif extension == "sqlite": + conn = sqlite3.connect(path) + try: + # Read directly into pandas with correct index + self._df = pd.read_sql_query( + "SELECT * FROM dependencies", + conn, + index_col="file", + ) + # Remove index name to match expected format + self._df.index.name = None + # Set correct dtypes + self._df = self._set_dtypes(self._df) + finally: + conn.close() + def removed(self, file: str) -> bool: r"""Check if file is marked as removed. @@ -373,7 +390,7 @@ def save(self, path: str): Args: path: path to file. - File extension can be ``csv`` or ``parquet`` + File extension can be ``csv``, ``parquet``, or ``sqlite`` """ path = audeer.path(path) @@ -387,6 +404,49 @@ def save(self, path: str): elif path.endswith("parquet"): table = self._dataframe_to_table(self._df, file_column=True) parquet.write_table(table, path) + elif path.endswith("sqlite"): + # Remove existing database file if it exists + if os.path.exists(path): + os.remove(path) + + conn = sqlite3.connect(path) + try: + # Create table with proper schema + conn.execute(""" + CREATE TABLE dependencies ( + file TEXT PRIMARY KEY, + archive TEXT, + bit_depth INTEGER, + channels INTEGER, + checksum TEXT, + duration REAL, + format TEXT, + removed INTEGER, + sampling_rate INTEGER, + type INTEGER, + version TEXT + ) + """) + + # Create indexes for frequently queried columns + conn.execute("CREATE INDEX idx_type ON dependencies(type)") + conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + + # Write dataframe to SQLite + # Reset index to include 'file' as a column + df_to_save = self._df.reset_index() + df_to_save.columns = ["file"] + list(self._df.columns) + df_to_save.to_sql( + "dependencies", + conn, + if_exists="append", + index=False, + ) + + conn.commit() + finally: + conn.close() def type(self, file: str) -> int: r"""Type of file. 
@@ -792,9 +852,8 @@ def download_dependencies( """ with tempfile.TemporaryDirectory() as tmp_root: - # Load `db.parquet` file, - # or if non-existent `db.zip` - # from backend + # Try to load in order: db.sqlite, db.parquet, db.zip (legacy CSV) + # First, try SQLite (current format) remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE) if backend_interface.exists(remote_deps_file, version): local_deps_file = os.path.join(tmp_root, define.DEPENDENCY_FILE) @@ -805,17 +864,31 @@ def download_dependencies( verbose=verbose, ) else: - remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") - local_deps_file = os.path.join( - tmp_root, - define.LEGACY_DEPENDENCY_FILE, - ) - backend_interface.get_archive( - remote_deps_file, - tmp_root, - version, - verbose=verbose, + # Try parquet (previous format) + remote_deps_file = backend_interface.join( + "/", name, define.PARQUET_DEPENDENCY_FILE ) + if backend_interface.exists(remote_deps_file, version): + local_deps_file = os.path.join(tmp_root, define.PARQUET_DEPENDENCY_FILE) + backend_interface.get_file( + remote_deps_file, + local_deps_file, + version, + verbose=verbose, + ) + else: + # Fall back to legacy CSV format + remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") + local_deps_file = os.path.join( + tmp_root, + define.LEGACY_DEPENDENCY_FILE, + ) + backend_interface.get_archive( + remote_deps_file, + tmp_root, + version, + verbose=verbose, + ) # Create deps object from downloaded file deps = Dependencies() deps.load(local_deps_file) diff --git a/docs/publish.rst b/docs/publish.rst index 6509c416..465e7ace 100644 --- a/docs/publish.rst +++ b/docs/publish.rst @@ -126,7 +126,7 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.sqlite db.yaml media/ 1.0.0/ @@ -142,7 +142,7 @@ inside the ``media/`` folder, all tables inside the ``meta/`` folder, the database header in the file ``db.yaml``, and the database dependencies -in the file ``db.parquet``. +in the file ``db.sqlite``. Note, that the structure of the folders used for versioning @@ -247,10 +247,10 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.sqlite db.yaml 1.1.0/ - db.parquet + db.sqlite db.yaml media/ 1.0.0/ diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index d32c0d56..36d3c3af 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -250,7 +250,7 @@ def test_removed(deps): deps.removed("non.existing") -@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet"]) +@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet", "deps.sqlite"]) def test_load_save(tmpdir, deps, file): """Test consistency of dependency table after save/load cycle. 
diff --git a/tests/test_publish.py b/tests/test_publish.py index 15306c71..e3cea2f2 100644 --- a/tests/test_publish.py +++ b/tests/test_publish.py @@ -1044,7 +1044,7 @@ def test_publish_error_cross_repository(tmpdir): audb.publish(db_path_v2, "2.0.0", repo2, previous_version="1.0.0") # Publishing to repo2 with previous_version=None should work - os.remove(audeer.path(db_path, "db.parquet")) + os.remove(audeer.path(db_path, "db.sqlite")) audb.publish(db_path, "2.0.0", repo2, previous_version=None) # Assert that the new version appears in repo2 diff --git a/tests/test_publish_table.py b/tests/test_publish_table.py index 6ad7e3f0..3cece588 100644 --- a/tests/test_publish_table.py +++ b/tests/test_publish_table.py @@ -90,7 +90,7 @@ def assert_db_published_to_repo( """ repo = audeer.path(repository.host, repository.name) - dependency_file = "db.parquet" + dependency_file = "db.sqlite" header_file = "db.yaml" files = list(db.files) tables = list(db) From b3a33d644186ed8002893d9fd7815c3670738d9c Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 14:36:51 +0100 Subject: [PATCH 04/10] Use sqlite for deps --- audb/core/dependencies.py | 469 +++++++++++++++++++++++++++---------- audb/core/load.py | 4 +- tests/test_dependencies.py | 72 ++++-- 3 files changed, 411 insertions(+), 134 deletions(-) diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 68aa532e..5775b6b1 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -44,7 +44,7 @@ class Dependencies: >>> deps = audb.dependencies("emodb", version="1.4.1") >>> # List all files or archives >>> deps.files[:3] - ['db.emotion.csv', 'db.files.csv', 'wav/03a01Fa.wav'] + ['db.emotion.categories.test.gold_standard.csv', 'db.emotion.categories.train.gold_standard.csv', 'db.emotion.csv'] >>> deps.archives[:2] ['005d2b91-5317-0c80-d602-6d55f0323f8c', '014f82d8-3491-fd00-7397-c3b2ac3b2875'] >>> # Access properties for a given file @@ -61,8 +61,38 @@ class Dependencies: """ # noqa: E501 def __init__(self): - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) - self._df = self._set_dtypes(self._df) + # Use in-memory SQLite database instead of pandas DataFrame + # Set check_same_thread=False to allow usage across threads + self._conn = sqlite3.connect(":memory:", check_same_thread=False) + self._db_path = None # Track if connected to a file or in-memory + + # Create the dependencies table + self._conn.execute(""" + CREATE TABLE dependencies ( + file TEXT PRIMARY KEY, + archive TEXT, + bit_depth INTEGER, + channels INTEGER, + checksum TEXT, + duration REAL, + format TEXT, + removed INTEGER, + sampling_rate INTEGER, + type INTEGER, + version TEXT + ) + """) + + # Create indexes for frequently queried columns + self._conn.execute("CREATE INDEX idx_type ON dependencies(type)") + self._conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + self._conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + self._conn.execute( + "CREATE INDEX idx_type_removed ON dependencies(type, removed)" + ) + + self._conn.commit() + # pyarrow schema # used for reading and writing files self._schema = pa.schema( @@ -88,7 +118,16 @@ def __call__(self) -> pd.DataFrame: table with dependencies """ - return self._df + df = pd.read_sql_query( + "SELECT * FROM dependencies", + self._conn, + index_col="file", + ) + # Remove index name to match expected format + df.index.name = None + # Set correct dtypes + df = self._set_dtypes(df) + return df def __contains__(self, file: str) -> bool: r"""Check if file 
is part of dependencies. @@ -100,7 +139,10 @@ def __contains__(self, file: str) -> bool: ``True`` if a dependency to the file exists """ - return file in self._df.index + cursor = self._conn.execute( + "SELECT 1 FROM dependencies WHERE file = ? LIMIT 1", (file,) + ) + return cursor.fetchone() is not None def __eq__(self, other: "Dependencies") -> bool: r"""Check if two dependency tables are equal. @@ -112,7 +154,8 @@ def __eq__(self, other: "Dependencies") -> bool: ``True`` if both dependency tables have the same entries """ - return self._df.equals(other._df) + # Compare by converting to DataFrames + return self().equals(other()) def __getitem__(self, file: str) -> list: r"""File information. @@ -124,14 +167,117 @@ def __getitem__(self, file: str) -> list: list with meta information """ - return self._df.loc[file].tolist() + cursor = self._conn.execute( + "SELECT archive, bit_depth, channels, checksum, duration, " + "format, removed, sampling_rate, type, version " + "FROM dependencies WHERE file = ?", + (file,), + ) + row = cursor.fetchone() + if row is None: + raise KeyError(file) + return list(row) def __len__(self) -> int: r"""Number of all media, table, attachment files.""" - return len(self._df) + cursor = self._conn.execute("SELECT COUNT(*) FROM dependencies") + return cursor.fetchone()[0] def __str__(self) -> str: # noqa: D105 - return str(self._df) + return str(self()) + + def __del__(self): + """Clean up SQLite connection when object is deleted.""" + if hasattr(self, "_conn") and self._conn: + self._conn.close() + + def __getstate__(self): + """Prepare object for pickling by converting SQLite data to serializable format.""" + # Get all data as a DataFrame + df = self() + # Return the DataFrame and schema for reconstruction + return { + "data": df.to_dict("records"), + "index": df.index.tolist(), + } + + def __setstate__(self, state): + """Restore object from pickled state.""" + # Recreate the SQLite connection + self._conn = sqlite3.connect(":memory:", check_same_thread=False) + self._db_path = None + + # Recreate the table structure + self._conn.execute(""" + CREATE TABLE dependencies ( + file TEXT PRIMARY KEY, + archive TEXT, + bit_depth INTEGER, + channels INTEGER, + checksum TEXT, + duration REAL, + format TEXT, + removed INTEGER, + sampling_rate INTEGER, + type INTEGER, + version TEXT + ) + """) + + # Create indexes + self._conn.execute("CREATE INDEX idx_type ON dependencies(type)") + self._conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + self._conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + self._conn.execute( + "CREATE INDEX idx_type_removed ON dependencies(type, removed)" + ) + + self._conn.commit() + + # Recreate the schema + self._schema = pa.schema( + [ + ("file", pa.string()), + ("archive", pa.string()), + ("bit_depth", pa.int32()), + ("channels", pa.int32()), + ("checksum", pa.string()), + ("duration", pa.float64()), + ("format", pa.string()), + ("removed", pa.int32()), + ("sampling_rate", pa.int32()), + ("type", pa.int32()), + ("version", pa.string()), + ] + ) + + # Restore the data + if state["data"]: + data = state["data"] + index = state["index"] + for i, row in enumerate(data): + file = index[i] + self._conn.execute( + """ + INSERT INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + file, + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ), + ) + self._conn.commit() @property def archives(self) -> list[str]: @@ -141,7 +287,10 @@ def archives(self) -> list[str]: list of archives """ - return sorted(self._df.archive.unique().tolist()) + cursor = self._conn.execute( + "SELECT DISTINCT archive FROM dependencies ORDER BY archive" + ) + return [row[0] for row in cursor.fetchall()] @property def attachments(self) -> list[str]: @@ -151,9 +300,11 @@ def attachments(self) -> list[str]: list of attachments """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["attachment"],), + ) + return [row[0] for row in cursor.fetchall()] @property def attachment_ids(self) -> list[str]: @@ -163,9 +314,11 @@ def attachment_ids(self) -> list[str]: list of attachment IDs """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].archive.tolist() + cursor = self._conn.execute( + "SELECT archive FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["attachment"],), + ) + return [row[0] for row in cursor.fetchall()] @property def files(self) -> list[str]: @@ -175,7 +328,8 @@ def files(self) -> list[str]: list of files """ - return self._df.index.tolist() + cursor = self._conn.execute("SELECT file FROM dependencies") + return [row[0] for row in cursor.fetchall()] @property def media(self) -> list[str]: @@ -185,9 +339,11 @@ def media(self) -> list[str]: list of media """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["media"] - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["media"],), + ) + return [row[0] for row in cursor.fetchall()] @property def removed_media(self) -> list[str]: @@ -197,10 +353,11 @@ def removed_media(self) -> list[str]: list of media """ - return self._df[ - (self._df["type"] == define.DEPENDENCY_TYPE["media"]) - & (self._df["removed"] == 1) - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ? AND removed = 1", + (define.DEPENDENCY_TYPE["media"],), + ) + return [row[0] for row in cursor.fetchall()] @property def table_ids(self) -> list[str]: @@ -224,9 +381,11 @@ def tables(self) -> list[str]: list of tables """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["meta"] - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["meta"],), + ) + return [row[0] for row in cursor.fetchall()] def archive(self, file: str) -> str: r"""Name of archive the file belongs to. @@ -238,7 +397,7 @@ def archive(self, file: str) -> str: archive name """ - return self._df.archive[file] + return self._column_loc("archive", file) def bit_depth(self, file: str) -> int: r"""Bit depth of media file. 
@@ -316,7 +475,6 @@ def load(self, path: str): FileNotFoundError: if ``path`` does not exists """ - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) path = audeer.path(path) extension = audeer.file_extension(path) if extension not in ["csv", "parquet", "sqlite"]: @@ -330,36 +488,48 @@ def load(self, path: str): os.strerror(errno.ENOENT), path, ) - if extension == "csv": - table = csv.read_csv( - path, - read_options=csv.ReadOptions( - column_names=self._schema.names, - skip_rows=1, - ), - convert_options=csv.ConvertOptions(column_types=self._schema), - ) - self._df = self._table_to_dataframe(table) - elif extension == "parquet": - table = parquet.read_table(path) - self._df = self._table_to_dataframe(table) + # Clear existing data + self._conn.execute("DELETE FROM dependencies") + self._conn.commit() - elif extension == "sqlite": - conn = sqlite3.connect(path) + if extension == "sqlite": + # For SQLite files, we can attach and copy the data + self._conn.execute(f"ATTACH DATABASE '{path}' AS source_db") try: - # Read directly into pandas with correct index - self._df = pd.read_sql_query( - "SELECT * FROM dependencies", - conn, - index_col="file", + self._conn.execute( + "INSERT INTO dependencies SELECT * FROM source_db.dependencies" ) - # Remove index name to match expected format - self._df.index.name = None - # Set correct dtypes - self._df = self._set_dtypes(self._df) + self._conn.commit() finally: - conn.close() + self._conn.execute("DETACH DATABASE source_db") + + else: + # For CSV and parquet, load via pandas and insert into SQLite + if extension == "csv": + table = csv.read_csv( + path, + read_options=csv.ReadOptions( + column_names=self._schema.names, + skip_rows=1, + ), + convert_options=csv.ConvertOptions(column_types=self._schema), + ) + df = self._table_to_dataframe(table) + + elif extension == "parquet": + table = parquet.read_table(path) + df = self._table_to_dataframe(table) + + # Insert the dataframe into SQLite + df_to_insert = df.reset_index() + df_to_insert.columns = ["file"] + list(df.columns) + df_to_insert.to_sql( + "dependencies", + self._conn, + if_exists="append", + index=False, + ) def removed(self, file: str) -> bool: r"""Check if file is marked as removed. 
@@ -395,24 +565,27 @@ def save(self, path: str): """ path = audeer.path(path) if path.endswith("csv"): - table = self._dataframe_to_table(self._df) + df = self() + table = self._dataframe_to_table(df) csv.write_csv( table, path, write_options=csv.WriteOptions(quoting_style="none"), ) elif path.endswith("parquet"): - table = self._dataframe_to_table(self._df, file_column=True) + df = self() + table = self._dataframe_to_table(df, file_column=True) parquet.write_table(table, path) elif path.endswith("sqlite"): # Remove existing database file if it exists if os.path.exists(path): os.remove(path) - conn = sqlite3.connect(path) + # Create a new connection to the file database + file_conn = sqlite3.connect(path) try: # Create table with proper schema - conn.execute(""" + file_conn.execute(""" CREATE TABLE dependencies ( file TEXT PRIMARY KEY, archive TEXT, @@ -429,24 +602,22 @@ def save(self, path: str): """) # Create indexes for frequently queried columns - conn.execute("CREATE INDEX idx_type ON dependencies(type)") - conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") - conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") - - # Write dataframe to SQLite - # Reset index to include 'file' as a column - df_to_save = self._df.reset_index() - df_to_save.columns = ["file"] + list(self._df.columns) - df_to_save.to_sql( - "dependencies", - conn, - if_exists="append", - index=False, + file_conn.execute("CREATE INDEX idx_type ON dependencies(type)") + file_conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + file_conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + file_conn.execute( + "CREATE INDEX idx_type_removed ON dependencies(type, removed)" ) - conn.commit() + # Copy data from in-memory database to file + # Use iterdump to copy all data + for line in self._conn.iterdump(): + if line.startswith("INSERT INTO"): + file_conn.execute(line) + + file_conn.commit() finally: - conn.close() + file_conn.close() def type(self, file: str) -> int: r"""Type of file. @@ -490,18 +661,27 @@ def _add_attachment( """ format = audeer.file_extension(file).lower() - self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["attachment"], # type - version, # version - ] + self._conn.execute( + """ + INSERT OR REPLACE INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + file, + archive, + 0, + 0, + checksum, + 0.0, + format, + 0, + 0, + define.DEPENDENCY_TYPE["attachment"], + version, + ), + ) + self._conn.commit() def _add_media( self, @@ -528,12 +708,15 @@ def _add_media( where each tuple holds the values of a new media entry """ - df = pd.DataFrame.from_records( + self._conn.executemany( + """ + INSERT INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df = pd.concat([self._df, df]) + ) + self._conn.commit() def _add_meta( self, @@ -555,18 +738,27 @@ def _add_meta( else: archive = os.path.splitext(file[3:])[0] - self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["meta"], # type - version, # version - ] + self._conn.execute( + """ + INSERT OR REPLACE INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + file, + archive, + 0, + 0, + checksum, + 0.0, + format, + 0, + 0, + define.DEPENDENCY_TYPE["meta"], + version, + ), + ) + self._conn.commit() def _column_loc( self, @@ -585,7 +777,13 @@ def _column_loc( scalar value """ - value = self._df.at[file, column] + cursor = self._conn.execute( + f"SELECT {column} FROM dependencies WHERE file = ?", (file,) + ) + row = cursor.fetchone() + if row is None: + raise KeyError(file) + value = row[0] if dtype is not None: value = dtype(value) return value @@ -626,13 +824,15 @@ def _drop(self, files: Sequence[str]): files: relative file paths """ - # self._df.drop is slow, - # see https://stackoverflow.com/a/53394627. - # The solution presented in https://stackoverflow.com/a/53395360 - # self._df = self._df.loc[self._df.index.drop(files)] - # which is claimed to be faster, - # isn't. - self._df = self._df[~self._df.index.isin(files)] + if not files: + return + # Convert to tuple if needed (e.g., if files is a set) + files = tuple(files) if not isinstance(files, (list, tuple)) else files + placeholders = ",".join("?" * len(files)) + self._conn.execute( + f"DELETE FROM dependencies WHERE file IN ({placeholders})", files + ) + self._conn.commit() def _remove(self, file: str): r"""Mark file as removed. @@ -641,7 +841,10 @@ def _remove(self, file: str): file: relative file path """ - self._df.at[file, "removed"] = 1 + self._conn.execute( + "UPDATE dependencies SET removed = 1 WHERE file = ?", (file,) + ) + self._conn.commit() @staticmethod def _set_dtypes(df: pd.DataFrame) -> pd.DataFrame: @@ -710,13 +913,34 @@ def _update_media( values: list of tuples, where each tuple holds the new values for a media entry + Raises: + KeyError: if a file in values does not exist in dependencies + """ - df = pd.DataFrame.from_records( - values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df.loc[df.index] = df + # Check if all files exist before updating + for value in values: + file = value[0] + cursor = self._conn.execute( + "SELECT 1 FROM dependencies WHERE file = ? LIMIT 1", (file,) + ) + if cursor.fetchone() is None: + raise KeyError(file) + + # Update existing entries + self._conn.executemany( + """ + UPDATE dependencies + SET archive = ?, bit_depth = ?, channels = ?, checksum = ?, duration = ?, + format = ?, removed = ?, sampling_rate = ?, type = ?, version = ? + WHERE file = ? 
+ """, + # Reorder tuple to put file at the end + [ + (v[1], v[2], v[3], v[4], v[5], v[6], v[7], v[8], v[9], v[10], v[0]) + for v in values + ], + ) + self._conn.commit() def _update_media_version( self, @@ -730,7 +954,16 @@ def _update_media_version( version: version string """ - self._df.loc[files, "version"] = version + if not files: + return + # Convert to tuple if needed (e.g., if files is a set) + files = tuple(files) if not isinstance(files, (list, tuple)) else files + placeholders = ",".join("?" * len(files)) + self._conn.execute( + f"UPDATE dependencies SET version = ? WHERE file IN ({placeholders})", + [version] + list(files), + ) + self._conn.commit() def error_message_missing_object( diff --git a/audb/core/load.py b/audb/core/load.py index bd4aefd5..18fdc4ac 100644 --- a/audb/core/load.py +++ b/audb/core/load.py @@ -471,7 +471,9 @@ def job(archive: str, version: str): tmp_root=db_root_tmp, ) # media files that can be changed to a requested flavor - flavor_files = deps._df[deps._df.sampling_rate != 0].index + # Get files with sampling_rate != 0 (audio files) + df = deps() + flavor_files = df[df.sampling_rate != 0].index for file in files: if os.name == "nt": # pragma: no cover file = file.replace(os.sep, "/") diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 36d3c3af..a8b56f3b 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -66,13 +66,29 @@ def test_get_entries(): ) def deps(): deps = audb.Dependencies() - df = pd.DataFrame.from_records(ROWS) - df.set_index("file", inplace=True) - # Ensure correct dtype - df.index = df.index.astype(audb.core.define.DEPENDENCY_INDEX_DTYPE) - df.index.name = None - df = df.astype(audb.core.define.DEPENDENCY_TABLE) - deps._df = df + # Insert test data directly into SQLite + for row in ROWS: + deps._conn.execute( + """ + INSERT INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ), + ) + deps._conn.commit() return deps @@ -80,10 +96,10 @@ def test_instantiation(): r"""Test instantiation of audb.Dependencies. During instantiation of ``audb.Dependencies`` - an empty dataframe is created under ``self._df``, + an empty SQLite database is created under ``self._conn``, that stores the dependency table. This test ensures, - that the dataframe + that the database contains the correct column names and data types, and the correct name and data type of its index. @@ -106,9 +122,8 @@ def test_instantiation(): audb.core.define.DEPENDENCY_INDEX_DTYPE ) expected_df = expected_df.astype(audb.core.define.DEPENDENCY_TABLE) - pd.testing.assert_frame_equal(deps._df, expected_df) - assert list(deps._df.columns) == expected_columns df = deps() + pd.testing.assert_frame_equal(df, expected_df) assert list(df.columns) == expected_columns @@ -139,10 +154,36 @@ def test_equals(deps): assert deps != audb.Dependencies() # example table vs. example table assert deps == deps - _deps._df = deps._df.copy() + # Copy data to new Dependencies object + _deps = audb.Dependencies() + for row in ROWS: + _deps._conn.execute( + """ + INSERT INTO dependencies + (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ), + ) + _deps._conn.commit() assert deps == _deps # example table vs. different table - _deps._df.loc["db.files.csv", "channels"] = 4 + _deps._conn.execute( + "UPDATE dependencies SET channels = 4 WHERE file = 'db.files.csv'" + ) + _deps._conn.commit() assert deps != _deps @@ -265,7 +306,7 @@ def test_load_save(tmpdir, deps, file): deps2 = audb.Dependencies() deps2.load(deps_file) pd.testing.assert_frame_equal(deps(), deps2()) - assert list(deps2._df.dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) + assert list(deps2().dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) def test_load_save_errors(deps): @@ -325,7 +366,8 @@ def test_str(deps): ) print(str(deps)) assert expected_str.match(str(deps)) - assert expected_str.match(deps._df.to_string()) + # str(deps) now calls __str__ which calls __call__ which returns a DataFrame + assert expected_str.match(deps().to_string()) # === Test hidden methods === From 0e957db3d820122faa8544c59f391927e7c775e8 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 14:50:19 +0100 Subject: [PATCH 05/10] Fix ruff errors --- audb/core/dependencies.py | 36 ++++++++++++++---------------------- tests/test_dependencies.py | 17 +++++++---------- 2 files changed, 21 insertions(+), 32 deletions(-) diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 5775b6b1..74e3e2ad 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -19,6 +19,14 @@ from audb.core import define +# SQLITE query variables +DEPENDENCIES = ( + "(file, archive, bit_depth, channels, checksum, duration, format, " + "removed, sampling_rate, type, version)" +) +VALUES = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + + class Dependencies: r"""Dependencies of a database. @@ -192,7 +200,7 @@ def __del__(self): self._conn.close() def __getstate__(self): - """Prepare object for pickling by converting SQLite data to serializable format.""" + """Make object serializable.""" # Get all data as a DataFrame df = self() # Return the DataFrame and schema for reconstruction @@ -202,7 +210,7 @@ def __getstate__(self): } def __setstate__(self, state): - """Restore object from pickled state.""" + """Restore object from serialized state.""" # Recreate the SQLite connection self._conn = sqlite3.connect(":memory:", check_same_thread=False) self._db_path = None @@ -258,11 +266,7 @@ def __setstate__(self, state): for i, row in enumerate(data): file = index[i] self._conn.execute( - """ - INSERT INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( file, row["archive"], @@ -662,11 +666,7 @@ def _add_attachment( format = audeer.file_extension(file).lower() self._conn.execute( - """ - INSERT OR REPLACE INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, + f"INSERT OR REPLACE INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( file, archive, @@ -709,11 +709,7 @@ def _add_media( """ self._conn.executemany( - """ - INSERT INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", values, ) self._conn.commit() @@ -739,11 +735,7 @@ def _add_meta( archive = os.path.splitext(file[3:])[0] self._conn.execute( - """ - INSERT OR REPLACE INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT OR REPLACE INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( file, archive, diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index a8b56f3b..81cac116 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -51,6 +51,11 @@ "version": "1.0.0", }, ] +DEPENDENCIES = ( + "(file, archive, bit_depth, channels, checksum, duration, format, " + "removed, sampling_rate, type, version)" +) +VALUES = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" def get_entries(column): @@ -69,11 +74,7 @@ def deps(): # Insert test data directly into SQLite for row in ROWS: deps._conn.execute( - """ - INSERT INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( row["file"], row["archive"], @@ -158,11 +159,7 @@ def test_equals(deps): _deps = audb.Dependencies() for row in ROWS: _deps._conn.execute( - """ - INSERT INTO dependencies - (file, archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", ( row["file"], row["archive"], From 2218914b3d83c2847658d4df055406f0c62df624 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 30 Dec 2025 15:53:44 +0100 Subject: [PATCH 06/10] Fix doctest --- docs/publish.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/publish.rst b/docs/publish.rst index 465e7ace..89aae62d 100644 --- a/docs/publish.rst +++ b/docs/publish.rst @@ -228,10 +228,10 @@ and will only publish those. >>> deps() archive bit_depth ... type version -db.age.parquet 0 ... 0 1.1.0 audio/001.wav 436c65ec-1e42-f9de-2708-ecafe07e827e 16 ... 1 1.0.0 audio/002.wav fda7e4d6-f2b2-4cff-cab5-906ef5d57607 16 ... 1 1.0.0 audio/003.wav e26ef45d-bdc1-6153-bdc4-852d83806e4a 16 ... 1 1.0.0 +db.age.parquet 0 ... 0 1.1.0 audio/004.wav ef4d1e81-6488-95cf-a165-604d1e47d575 16 ... 
1 1.1.0 [5 rows x 10 columns] From 02cb716c1095e2129976929eefd6ff9af87e848a Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Fri, 2 Jan 2026 17:09:49 +0100 Subject: [PATCH 07/10] Use pylance for dependency tables --- audb/core/define.py | 5 +- audb/core/dependencies.py | 654 +++++++++++++++++++------------------ pyproject.toml | 1 + tests/test_dependencies.py | 102 +++--- 4 files changed, 393 insertions(+), 369 deletions(-) diff --git a/audb/core/define.py b/audb/core/define.py index cf99dfd0..d75032f8 100644 --- a/audb/core/define.py +++ b/audb/core/define.py @@ -11,14 +11,13 @@ HEADER_FILE = f"{DB}.yaml" # Dependencies -DEPENDENCY_FILE = f"{DB}.sqlite" +DEPENDENCY_FILE = f"{DB}.lance" r"""Filename and extension of dependency table file.""" PARQUET_DEPENDENCY_FILE = f"{DB}.parquet" r"""Filename and extension of parquet dependency table file. -In ``audb`` versions between 1.7.0 and the SQLite migration, -the dependency table was stored in a parquet file. +Used as a backward compatible format for loading older databases. """ diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 74e3e2ad..4e7192de 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -5,11 +5,13 @@ import errno import os import re -import sqlite3 import tempfile +from lance.file import LanceFileReader +from lance.file import LanceFileWriter import pandas as pd import pyarrow as pa +import pyarrow.compute as pc import pyarrow.csv as csv import pyarrow.parquet as parquet @@ -19,14 +21,6 @@ from audb.core import define -# SQLITE query variables -DEPENDENCIES = ( - "(file, archive, bit_depth, channels, checksum, duration, format, " - "removed, sampling_rate, type, version)" -) -VALUES = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" - - class Dependencies: r"""Dependencies of a database. @@ -69,40 +63,8 @@ class Dependencies: """ # noqa: E501 def __init__(self): - # Use in-memory SQLite database instead of pandas DataFrame - # Set check_same_thread=False to allow usage across threads - self._conn = sqlite3.connect(":memory:", check_same_thread=False) - self._db_path = None # Track if connected to a file or in-memory - - # Create the dependencies table - self._conn.execute(""" - CREATE TABLE dependencies ( - file TEXT PRIMARY KEY, - archive TEXT, - bit_depth INTEGER, - channels INTEGER, - checksum TEXT, - duration REAL, - format TEXT, - removed INTEGER, - sampling_rate INTEGER, - type INTEGER, - version TEXT - ) - """) - - # Create indexes for frequently queried columns - self._conn.execute("CREATE INDEX idx_type ON dependencies(type)") - self._conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") - self._conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") - self._conn.execute( - "CREATE INDEX idx_type_removed ON dependencies(type, removed)" - ) - - self._conn.commit() - - # pyarrow schema - # used for reading and writing files + # Store dependencies as an in-memory PyArrow table + # Initialize with empty table self._schema = pa.schema( [ ("file", pa.string()), @@ -119,6 +81,16 @@ def __init__(self): ] ) + # Create empty table with the schema + self._table = pa.table( + {field.name: pa.array([], type=field.type) for field in self._schema}, + schema=self._schema, + ) + + # Create indices for fast lookups + # We'll use dictionaries to cache file->row_index mappings + self._file_index = {} + def __call__(self) -> pd.DataFrame: r"""Return dependencies as a table. 
@@ -126,15 +98,7 @@ def __call__(self) -> pd.DataFrame: table with dependencies """ - df = pd.read_sql_query( - "SELECT * FROM dependencies", - self._conn, - index_col="file", - ) - # Remove index name to match expected format - df.index.name = None - # Set correct dtypes - df = self._set_dtypes(df) + df = self._table_to_dataframe(self._table) return df def __contains__(self, file: str) -> bool: @@ -147,10 +111,7 @@ def __contains__(self, file: str) -> bool: ``True`` if a dependency to the file exists """ - cursor = self._conn.execute( - "SELECT 1 FROM dependencies WHERE file = ? LIMIT 1", (file,) - ) - return cursor.fetchone() is not None + return file in self._file_index def __eq__(self, other: "Dependencies") -> bool: r"""Check if two dependency tables are equal. @@ -175,35 +136,38 @@ def __getitem__(self, file: str) -> list: list with meta information """ - cursor = self._conn.execute( - "SELECT archive, bit_depth, channels, checksum, duration, " - "format, removed, sampling_rate, type, version " - "FROM dependencies WHERE file = ?", - (file,), - ) - row = cursor.fetchone() - if row is None: + if file not in self._file_index: raise KeyError(file) - return list(row) + + row_idx = self._file_index[file] + row = self._table.slice(row_idx, 1) + + # Return all columns except 'file' as a list + return [ + row["archive"][0].as_py(), + row["bit_depth"][0].as_py(), + row["channels"][0].as_py(), + row["checksum"][0].as_py(), + row["duration"][0].as_py(), + row["format"][0].as_py(), + row["removed"][0].as_py(), + row["sampling_rate"][0].as_py(), + row["type"][0].as_py(), + row["version"][0].as_py(), + ] def __len__(self) -> int: r"""Number of all media, table, attachment files.""" - cursor = self._conn.execute("SELECT COUNT(*) FROM dependencies") - return cursor.fetchone()[0] + return len(self._table) def __str__(self) -> str: # noqa: D105 return str(self()) - def __del__(self): - """Clean up SQLite connection when object is deleted.""" - if hasattr(self, "_conn") and self._conn: - self._conn.close() - def __getstate__(self): """Make object serializable.""" - # Get all data as a DataFrame + # Get all data as a DataFrame for serialization df = self() - # Return the DataFrame and schema for reconstruction + # Return the DataFrame data for reconstruction return { "data": df.to_dict("records"), "index": df.index.tolist(), @@ -211,37 +175,6 @@ def __getstate__(self): def __setstate__(self, state): """Restore object from serialized state.""" - # Recreate the SQLite connection - self._conn = sqlite3.connect(":memory:", check_same_thread=False) - self._db_path = None - - # Recreate the table structure - self._conn.execute(""" - CREATE TABLE dependencies ( - file TEXT PRIMARY KEY, - archive TEXT, - bit_depth INTEGER, - channels INTEGER, - checksum TEXT, - duration REAL, - format TEXT, - removed INTEGER, - sampling_rate INTEGER, - type INTEGER, - version TEXT - ) - """) - - # Create indexes - self._conn.execute("CREATE INDEX idx_type ON dependencies(type)") - self._conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") - self._conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") - self._conn.execute( - "CREATE INDEX idx_type_removed ON dependencies(type, removed)" - ) - - self._conn.commit() - # Recreate the schema self._schema = pa.schema( [ @@ -259,29 +192,51 @@ def __setstate__(self, state): ] ) - # Restore the data + # Restore the data from serialized state if state["data"]: data = state["data"] index = state["index"] - for i, row in enumerate(data): - file = index[i] - 
self._conn.execute( - f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", - ( - file, - row["archive"], - row["bit_depth"], - row["channels"], - row["checksum"], - row["duration"], - row["format"], - row["removed"], - row["sampling_rate"], - row["type"], - row["version"], - ), - ) - self._conn.commit() + + # Build lists for each column + files = index + archives = [row["archive"] for row in data] + bit_depths = [row["bit_depth"] for row in data] + channels = [row["channels"] for row in data] + checksums = [row["checksum"] for row in data] + durations = [row["duration"] for row in data] + formats = [row["format"] for row in data] + removed = [row["removed"] for row in data] + sampling_rates = [row["sampling_rate"] for row in data] + types = [row["type"] for row in data] + versions = [row["version"] for row in data] + + # Create PyArrow table + self._table = pa.table( + { + "file": pa.array(files, type=pa.string()), + "archive": pa.array(archives, type=pa.string()), + "bit_depth": pa.array(bit_depths, type=pa.int32()), + "channels": pa.array(channels, type=pa.int32()), + "checksum": pa.array(checksums, type=pa.string()), + "duration": pa.array(durations, type=pa.float64()), + "format": pa.array(formats, type=pa.string()), + "removed": pa.array(removed, type=pa.int32()), + "sampling_rate": pa.array(sampling_rates, type=pa.int32()), + "type": pa.array(types, type=pa.int32()), + "version": pa.array(versions, type=pa.string()), + }, + schema=self._schema, + ) + + # Rebuild file index + self._file_index = {file: idx for idx, file in enumerate(files)} + else: + # Create empty table + self._table = pa.table( + {field.name: pa.array([], type=field.type) for field in self._schema}, + schema=self._schema, + ) + self._file_index = {} @property def archives(self) -> list[str]: @@ -291,10 +246,10 @@ def archives(self) -> list[str]: list of archives """ - cursor = self._conn.execute( - "SELECT DISTINCT archive FROM dependencies ORDER BY archive" - ) - return [row[0] for row in cursor.fetchall()] + if len(self._table) == 0: + return [] + archives = pc.unique(self._table["archive"]) + return sorted([a.as_py() for a in archives]) @property def attachments(self) -> list[str]: @@ -304,11 +259,9 @@ def attachments(self) -> list[str]: list of attachments """ - cursor = self._conn.execute( - "SELECT file FROM dependencies WHERE type = ?", - (define.DEPENDENCY_TYPE["attachment"],), - ) - return [row[0] for row in cursor.fetchall()] + mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["attachment"]) + filtered = self._table.filter(mask) + return [f.as_py() for f in filtered["file"]] @property def attachment_ids(self) -> list[str]: @@ -318,11 +271,9 @@ def attachment_ids(self) -> list[str]: list of attachment IDs """ - cursor = self._conn.execute( - "SELECT archive FROM dependencies WHERE type = ?", - (define.DEPENDENCY_TYPE["attachment"],), - ) - return [row[0] for row in cursor.fetchall()] + mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["attachment"]) + filtered = self._table.filter(mask) + return [a.as_py() for a in filtered["archive"]] @property def files(self) -> list[str]: @@ -332,8 +283,7 @@ def files(self) -> list[str]: list of files """ - cursor = self._conn.execute("SELECT file FROM dependencies") - return [row[0] for row in cursor.fetchall()] + return [f.as_py() for f in self._table["file"]] @property def media(self) -> list[str]: @@ -343,11 +293,9 @@ def media(self) -> list[str]: list of media """ - cursor = self._conn.execute( - "SELECT file FROM dependencies WHERE type = 
?", - (define.DEPENDENCY_TYPE["media"],), - ) - return [row[0] for row in cursor.fetchall()] + mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["media"]) + filtered = self._table.filter(mask) + return [f.as_py() for f in filtered["file"]] @property def removed_media(self) -> list[str]: @@ -357,11 +305,11 @@ def removed_media(self) -> list[str]: list of media """ - cursor = self._conn.execute( - "SELECT file FROM dependencies WHERE type = ? AND removed = 1", - (define.DEPENDENCY_TYPE["media"],), - ) - return [row[0] for row in cursor.fetchall()] + type_mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["media"]) + removed_mask = pc.equal(self._table["removed"], 1) + combined_mask = pc.and_(type_mask, removed_mask) + filtered = self._table.filter(combined_mask) + return [f.as_py() for f in filtered["file"]] @property def table_ids(self) -> list[str]: @@ -385,11 +333,9 @@ def tables(self) -> list[str]: list of tables """ - cursor = self._conn.execute( - "SELECT file FROM dependencies WHERE type = ?", - (define.DEPENDENCY_TYPE["meta"],), - ) - return [row[0] for row in cursor.fetchall()] + mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["meta"]) + filtered = self._table.filter(mask) + return [f.as_py() for f in filtered["file"]] def archive(self, file: str) -> str: r"""Name of archive the file belongs to. @@ -471,19 +417,19 @@ def load(self, path: str): Args: path: path to file. File extension can be ``csv``, - ``parquet``, or ``sqlite`` + ``parquet``, or ``lance`` Raises: ValueError: if file extension is not one of - ``csv``, ``parquet``, ``sqlite`` + ``csv``, ``parquet``, ``lance`` FileNotFoundError: if ``path`` does not exists """ path = audeer.path(path) extension = audeer.file_extension(path) - if extension not in ["csv", "parquet", "sqlite"]: + if extension not in ["csv", "parquet", "lance"]: raise ValueError( - f"File extension of 'path' has to be 'csv', 'parquet', or 'sqlite' " + f"File extension of 'path' has to be 'csv', 'parquet', or 'lance' " f"not '{extension}'" ) if not os.path.exists(path): @@ -493,47 +439,40 @@ def load(self, path: str): path, ) - # Clear existing data - self._conn.execute("DELETE FROM dependencies") - self._conn.commit() + # Load data based on format + if extension == "lance": + # Read from Lance file + reader = LanceFileReader(path) + results = reader.read_all() + # Convert ReaderResults to PyArrow table + table = results.to_table() + # Note: LanceFileReader doesn't need explicit close + + elif extension == "csv": + # Read from CSV file + # The CSV writer creates a duplicate header (known issue), so skip the first data row + # and use the column names from the header + table = csv.read_csv( + path, + read_options=csv.ReadOptions( + skip_rows=1, # Skip the duplicate header row + autogenerate_column_names=False, + ), + ) + # Rename the empty column to "file" + columns = table.column_names + columns = ["file" if c == "" else c for c in columns] + table = table.rename_columns(columns) + # Ensure correct schema types + table = table.cast(self._schema) - if extension == "sqlite": - # For SQLite files, we can attach and copy the data - self._conn.execute(f"ATTACH DATABASE '{path}' AS source_db") - try: - self._conn.execute( - "INSERT INTO dependencies SELECT * FROM source_db.dependencies" - ) - self._conn.commit() - finally: - self._conn.execute("DETACH DATABASE source_db") + elif extension == "parquet": + # Read from Parquet file + table = parquet.read_table(path) - else: - # For CSV and parquet, load via pandas and insert into 
SQLite - if extension == "csv": - table = csv.read_csv( - path, - read_options=csv.ReadOptions( - column_names=self._schema.names, - skip_rows=1, - ), - convert_options=csv.ConvertOptions(column_types=self._schema), - ) - df = self._table_to_dataframe(table) - - elif extension == "parquet": - table = parquet.read_table(path) - df = self._table_to_dataframe(table) - - # Insert the dataframe into SQLite - df_to_insert = df.reset_index() - df_to_insert.columns = ["file"] + list(df.columns) - df_to_insert.to_sql( - "dependencies", - self._conn, - if_exists="append", - index=False, - ) + # Set the table and rebuild index + self._table = table + self._rebuild_index() def removed(self, file: str) -> bool: r"""Check if file is marked as removed. @@ -564,11 +503,12 @@ def save(self, path: str): Args: path: path to file. - File extension can be ``csv``, ``parquet``, or ``sqlite`` + File extension can be ``csv``, ``parquet``, or ``lance`` """ path = audeer.path(path) if path.endswith("csv"): + # Write to CSV df = self() table = self._dataframe_to_table(df) csv.write_csv( @@ -577,51 +517,19 @@ def save(self, path: str): write_options=csv.WriteOptions(quoting_style="none"), ) elif path.endswith("parquet"): + # Write to Parquet df = self() table = self._dataframe_to_table(df, file_column=True) parquet.write_table(table, path) - elif path.endswith("sqlite"): - # Remove existing database file if it exists + elif path.endswith("lance"): + # Write to Lance file + # Remove existing file if it exists if os.path.exists(path): os.remove(path) - # Create a new connection to the file database - file_conn = sqlite3.connect(path) - try: - # Create table with proper schema - file_conn.execute(""" - CREATE TABLE dependencies ( - file TEXT PRIMARY KEY, - archive TEXT, - bit_depth INTEGER, - channels INTEGER, - checksum TEXT, - duration REAL, - format TEXT, - removed INTEGER, - sampling_rate INTEGER, - type INTEGER, - version TEXT - ) - """) - - # Create indexes for frequently queried columns - file_conn.execute("CREATE INDEX idx_type ON dependencies(type)") - file_conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") - file_conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") - file_conn.execute( - "CREATE INDEX idx_type_removed ON dependencies(type, removed)" - ) - - # Copy data from in-memory database to file - # Use iterdump to copy all data - for line in self._conn.iterdump(): - if line.startswith("INSERT INTO"): - file_conn.execute(line) - - file_conn.commit() - finally: - file_conn.close() + # Create a new Lance file + with LanceFileWriter(path) as writer: + writer.write_batch(self._table) def type(self, file: str) -> int: r"""Type of file. 
@@ -665,23 +573,31 @@ def _add_attachment( """ format = audeer.file_extension(file).lower() - self._conn.execute( - f"INSERT OR REPLACE INTO dependencies {DEPENDENCIES} VALUES {VALUES}", - ( - file, - archive, - 0, - 0, - checksum, - 0.0, - format, - 0, - 0, - define.DEPENDENCY_TYPE["attachment"], - version, - ), + # Create a new row + new_row = pa.table( + { + "file": [file], + "archive": [archive], + "bit_depth": [0], + "channels": [0], + "checksum": [checksum], + "duration": [0.0], + "format": [format], + "removed": [0], + "sampling_rate": [0], + "type": [define.DEPENDENCY_TYPE["attachment"]], + "version": [version], + }, + schema=self._schema, ) - self._conn.commit() + + # Remove existing entry if present + if file in self._file_index: + self._drop([file]) + + # Append new row + self._table = pa.concat_tables([self._table, new_row]) + self._file_index[file] = len(self._table) - 1 def _add_media( self, @@ -708,11 +624,47 @@ def _add_media( where each tuple holds the values of a new media entry """ - self._conn.executemany( - f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", - values, + if not values: + return + + # Unpack values into column lists + files = [v[0] for v in values] + archives = [v[1] for v in values] + bit_depths = [v[2] for v in values] + channels = [v[3] for v in values] + checksums = [v[4] for v in values] + durations = [v[5] for v in values] + formats = [v[6] for v in values] + removed = [v[7] for v in values] + sampling_rates = [v[8] for v in values] + types = [v[9] for v in values] + versions = [v[10] for v in values] + + # Create new table + new_rows = pa.table( + { + "file": pa.array(files, type=pa.string()), + "archive": pa.array(archives, type=pa.string()), + "bit_depth": pa.array(bit_depths, type=pa.int32()), + "channels": pa.array(channels, type=pa.int32()), + "checksum": pa.array(checksums, type=pa.string()), + "duration": pa.array(durations, type=pa.float64()), + "format": pa.array(formats, type=pa.string()), + "removed": pa.array(removed, type=pa.int32()), + "sampling_rate": pa.array(sampling_rates, type=pa.int32()), + "type": pa.array(types, type=pa.int32()), + "version": pa.array(versions, type=pa.string()), + }, + schema=self._schema, ) - self._conn.commit() + + # Append new rows + self._table = pa.concat_tables([self._table, new_rows]) + + # Update index + start_idx = len(self._table) - len(values) + for i, file in enumerate(files): + self._file_index[file] = start_idx + i def _add_meta( self, @@ -734,23 +686,31 @@ def _add_meta( else: archive = os.path.splitext(file[3:])[0] - self._conn.execute( - f"INSERT OR REPLACE INTO dependencies {DEPENDENCIES} VALUES {VALUES}", - ( - file, - archive, - 0, - 0, - checksum, - 0.0, - format, - 0, - 0, - define.DEPENDENCY_TYPE["meta"], - version, - ), + # Create a new row + new_row = pa.table( + { + "file": [file], + "archive": [archive], + "bit_depth": [0], + "channels": [0], + "checksum": [checksum], + "duration": [0.0], + "format": [format], + "removed": [0], + "sampling_rate": [0], + "type": [define.DEPENDENCY_TYPE["meta"]], + "version": [version], + }, + schema=self._schema, ) - self._conn.commit() + + # Remove existing entry if present + if file in self._file_index: + self._drop([file]) + + # Append new row + self._table = pa.concat_tables([self._table, new_row]) + self._file_index[file] = len(self._table) - 1 def _column_loc( self, @@ -769,17 +729,23 @@ def _column_loc( scalar value """ - cursor = self._conn.execute( - f"SELECT {column} FROM dependencies WHERE file = ?", (file,) - ) - row = 
cursor.fetchone() - if row is None: + if file not in self._file_index: raise KeyError(file) - value = row[0] + + row_idx = self._file_index[file] + value = self._table[column][row_idx].as_py() + if dtype is not None: value = dtype(value) return value + def _rebuild_index(self): + r"""Rebuild the file index from the current table.""" + self._file_index = {} + files = self._table["file"].to_pylist() + for idx, file in enumerate(files): + self._file_index[file] = idx + def _dataframe_to_table( self, df: pd.DataFrame, @@ -818,13 +784,16 @@ def _drop(self, files: Sequence[str]): """ if not files: return - # Convert to tuple if needed (e.g., if files is a set) - files = tuple(files) if not isinstance(files, (list, tuple)) else files - placeholders = ",".join("?" * len(files)) - self._conn.execute( - f"DELETE FROM dependencies WHERE file IN ({placeholders})", files - ) - self._conn.commit() + + # Create a mask for rows to keep + files_to_drop = set(files) + mask = pc.invert(pc.is_in(self._table["file"], pa.array(list(files_to_drop)))) + + # Filter the table + self._table = self._table.filter(mask) + + # Rebuild index + self._rebuild_index() def _remove(self, file: str): r"""Mark file as removed. @@ -833,10 +802,21 @@ def _remove(self, file: str): file: relative file path """ - self._conn.execute( - "UPDATE dependencies SET removed = 1 WHERE file = ?", (file,) + if file not in self._file_index: + return + + row_idx = self._file_index[file] + + # Create a new removed column with the updated value + removed_array = self._table["removed"].to_pylist() + removed_array[row_idx] = 1 + + # Replace the removed column + self._table = self._table.set_column( + self._table.schema.get_field_index("removed"), + "removed", + pa.array(removed_array, type=pa.int32()), ) - self._conn.commit() @staticmethod def _set_dtypes(df: pd.DataFrame) -> pd.DataFrame: @@ -909,30 +889,60 @@ def _update_media( KeyError: if a file in values does not exist in dependencies """ + if not values: + return + # Check if all files exist before updating for value in values: file = value[0] - cursor = self._conn.execute( - "SELECT 1 FROM dependencies WHERE file = ? LIMIT 1", (file,) - ) - if cursor.fetchone() is None: + if file not in self._file_index: raise KeyError(file) - # Update existing entries - self._conn.executemany( - """ - UPDATE dependencies - SET archive = ?, bit_depth = ?, channels = ?, checksum = ?, duration = ?, - format = ?, removed = ?, sampling_rate = ?, type = ?, version = ? - WHERE file = ? 
- """, - # Reorder tuple to put file at the end - [ - (v[1], v[2], v[3], v[4], v[5], v[6], v[7], v[8], v[9], v[10], v[0]) - for v in values - ], + # Convert table to mutable lists + files_list = self._table["file"].to_pylist() + archives_list = self._table["archive"].to_pylist() + bit_depths_list = self._table["bit_depth"].to_pylist() + channels_list = self._table["channels"].to_pylist() + checksums_list = self._table["checksum"].to_pylist() + durations_list = self._table["duration"].to_pylist() + formats_list = self._table["format"].to_pylist() + removed_list = self._table["removed"].to_pylist() + sampling_rates_list = self._table["sampling_rate"].to_pylist() + types_list = self._table["type"].to_pylist() + versions_list = self._table["version"].to_pylist() + + # Update values + for value in values: + file = value[0] + row_idx = self._file_index[file] + archives_list[row_idx] = value[1] + bit_depths_list[row_idx] = value[2] + channels_list[row_idx] = value[3] + checksums_list[row_idx] = value[4] + durations_list[row_idx] = value[5] + formats_list[row_idx] = value[6] + removed_list[row_idx] = value[7] + sampling_rates_list[row_idx] = value[8] + types_list[row_idx] = value[9] + versions_list[row_idx] = value[10] + + # Rebuild table + self._table = pa.table( + { + "file": pa.array(files_list, type=pa.string()), + "archive": pa.array(archives_list, type=pa.string()), + "bit_depth": pa.array(bit_depths_list, type=pa.int32()), + "channels": pa.array(channels_list, type=pa.int32()), + "checksum": pa.array(checksums_list, type=pa.string()), + "duration": pa.array(durations_list, type=pa.float64()), + "format": pa.array(formats_list, type=pa.string()), + "removed": pa.array(removed_list, type=pa.int32()), + "sampling_rate": pa.array(sampling_rates_list, type=pa.int32()), + "type": pa.array(types_list, type=pa.int32()), + "version": pa.array(versions_list, type=pa.string()), + }, + schema=self._schema, ) - self._conn.commit() def _update_media_version( self, @@ -948,14 +958,22 @@ def _update_media_version( """ if not files: return - # Convert to tuple if needed (e.g., if files is a set) - files = tuple(files) if not isinstance(files, (list, tuple)) else files - placeholders = ",".join("?" * len(files)) - self._conn.execute( - f"UPDATE dependencies SET version = ? 
WHERE file IN ({placeholders})", - [version] + list(files), + + # Convert version column to mutable list + versions_list = self._table["version"].to_pylist() + + # Update versions for specified files + for file in files: + if file in self._file_index: + row_idx = self._file_index[file] + versions_list[row_idx] = version + + # Replace the version column + self._table = self._table.set_column( + self._table.schema.get_field_index("version"), + "version", + pa.array(versions_list, type=pa.string()), ) - self._conn.commit() def error_message_missing_object( @@ -1077,8 +1095,8 @@ def download_dependencies( """ with tempfile.TemporaryDirectory() as tmp_root: - # Try to load in order: db.sqlite, db.parquet, db.zip (legacy CSV) - # First, try SQLite (current format) + # Try to load in order: db.lance, db.parquet, db.zip (legacy CSV) + # First, try Lance (current format) remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE) if backend_interface.exists(remote_deps_file, version): local_deps_file = os.path.join(tmp_root, define.DEPENDENCY_FILE) diff --git a/pyproject.toml b/pyproject.toml index 2f6b857b..5ec8b3ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ dependencies = [ 'oyaml', 'pandas >=2.1.0', 'pyarrow', + 'pylance', ] # Get version dynamically from git # (needs setuptools_scm tools config below) diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 81cac116..37406dad 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -51,13 +51,6 @@ "version": "1.0.0", }, ] -DEPENDENCIES = ( - "(file, archive, bit_depth, channels, checksum, duration, format, " - "removed, sampling_rate, type, version)" -) -VALUES = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" - - def get_entries(column): return [row[column] for row in ROWS] @@ -71,25 +64,25 @@ def test_get_entries(): ) def deps(): deps = audb.Dependencies() - # Insert test data directly into SQLite - for row in ROWS: - deps._conn.execute( - f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", - ( - row["file"], - row["archive"], - row["bit_depth"], - row["channels"], - row["checksum"], - row["duration"], - row["format"], - row["removed"], - row["sampling_rate"], - row["type"], - row["version"], - ), + # Insert test data directly using _add_media for all types + # to preserve the exact test data including archive values + media_and_meta_rows = [ + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], ) - deps._conn.commit() + for row in ROWS + ] + deps._add_media(media_and_meta_rows) return deps @@ -97,10 +90,10 @@ def test_instantiation(): r"""Test instantiation of audb.Dependencies. During instantiation of ``audb.Dependencies`` - an empty SQLite database is created under ``self._conn``, + an empty PyArrow table is created under ``self._table``, that stores the dependency table. This test ensures, - that the database + that the table contains the correct column names and data types, and the correct name and data type of its index. 
@@ -157,30 +150,43 @@ def test_equals(deps): assert deps == deps # Copy data to new Dependencies object _deps = audb.Dependencies() - for row in ROWS: - _deps._conn.execute( - f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", - ( - row["file"], - row["archive"], - row["bit_depth"], - row["channels"], - row["checksum"], - row["duration"], - row["format"], - row["removed"], - row["sampling_rate"], - row["type"], - row["version"], - ), + media_and_meta_rows = [ + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], ) - _deps._conn.commit() + for row in ROWS + ] + _deps._add_media(media_and_meta_rows) assert deps == _deps # example table vs. different table - _deps._conn.execute( - "UPDATE dependencies SET channels = 4 WHERE file = 'db.files.csv'" + # Modify one row to make it different + _deps._update_media( + [ + ( + "file.wav", + "archive2", + 16, + 4, # changed from 2 to 4 + "917338b854ad9c72f76bc9a68818dcd8", + 1.23, + "wav", + 0, + 16000, + 1, + "1.0.0", + ) + ] ) - _deps._conn.commit() assert deps != _deps @@ -288,7 +294,7 @@ def test_removed(deps): deps.removed("non.existing") -@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet", "deps.sqlite"]) +@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet", "deps.lance"]) def test_load_save(tmpdir, deps, file): """Test consistency of dependency table after save/load cycle. From 744a029fe27d7982ad9c9719847a0e859dd0b29b Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Fri, 2 Jan 2026 18:12:30 +0100 Subject: [PATCH 08/10] Use lance --- audb/core/dependencies.py | 19 ++++++++----------- tests/test_publish.py | 2 +- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 4e7192de..1213a7dc 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -450,21 +450,15 @@ def load(self, path: str): elif extension == "csv": # Read from CSV file - # The CSV writer creates a duplicate header (known issue), so skip the first data row - # and use the column names from the header + # Provide explicit column names and skip the header row table = csv.read_csv( path, read_options=csv.ReadOptions( - skip_rows=1, # Skip the duplicate header row - autogenerate_column_names=False, + column_names=self._schema.names, + skip_rows=1, ), + convert_options=csv.ConvertOptions(column_types=self._schema), ) - # Rename the empty column to "file" - columns = table.column_names - columns = ["file" if c == "" else c for c in columns] - table = table.rename_columns(columns) - # Ensure correct schema types - table = table.cast(self._schema) elif extension == "parquet": # Read from Parquet file @@ -528,7 +522,8 @@ def save(self, path: str): os.remove(path) # Create a new Lance file - with LanceFileWriter(path) as writer: + # Provide schema to handle empty tables + with LanceFileWriter(path, schema=self._schema) as writer: writer.write_batch(self._table) def type(self, file: str) -> int: @@ -769,6 +764,8 @@ def _dataframe_to_table( preserve_index=False, schema=self._schema, ) + # Combine chunks to avoid multi-chunk columns that cause CSV writing issues + table = table.combine_chunks() if not file_column: columns = table.column_names columns = ["" if c == "file" else c for c in columns] diff --git a/tests/test_publish.py b/tests/test_publish.py index e3cea2f2..1da8f679 100644 --- a/tests/test_publish.py +++ 
b/tests/test_publish.py @@ -1044,7 +1044,7 @@ def test_publish_error_cross_repository(tmpdir): audb.publish(db_path_v2, "2.0.0", repo2, previous_version="1.0.0") # Publishing to repo2 with previous_version=None should work - os.remove(audeer.path(db_path, "db.sqlite")) + os.remove(audeer.path(db_path, "db.lance")) audb.publish(db_path, "2.0.0", repo2, previous_version=None) # Assert that the new version appears in repo2 From 0c31d44fb88a34e3bc15b58e5fa286f6768fdbcb Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Mon, 5 Jan 2026 08:17:22 +0100 Subject: [PATCH 09/10] Use lancedb --- audb/core/api.py | 5 +- audb/core/define.py | 7 +- audb/core/dependencies.py | 177 ++++++++++++++++++++++--------------- audb/core/load_to.py | 10 ++- audb/core/publish.py | 6 +- docs/publish.rst | 8 +- tests/test_dependencies.py | 2 +- tests/test_load.py | 8 +- tests/test_publish.py | 4 +- 9 files changed, 135 insertions(+), 92 deletions(-) diff --git a/audb/core/api.py b/audb/core/api.py index 6aa581c3..bd84c43b 100644 --- a/audb/core/api.py +++ b/audb/core/api.py @@ -222,12 +222,13 @@ def cached( # Skip old audb cache (e.g. 1 as flavor) files = audeer.list_file_names(version_path, basenames=True) + folders = audeer.list_dir_names(version_path, basenames=True) if ( - define.DEPENDENCY_FILE not in files + define.DEPENDENCY_FILE not in folders and define.LEGACY_DEPENDENCY_FILE not in files ): # Skip all cache entries - # that don't contain a dependency file + # that don't contain a dependency file/folder # as those stem from audb<1.0.0. continue # pragma: no cover diff --git a/audb/core/define.py b/audb/core/define.py index d75032f8..58c00e00 100644 --- a/audb/core/define.py +++ b/audb/core/define.py @@ -11,8 +11,11 @@ HEADER_FILE = f"{DB}.yaml" # Dependencies -DEPENDENCY_FILE = f"{DB}.lance" -r"""Filename and extension of dependency table file.""" +DEPENDENCY_FILE = f"{DB}.lancedb" +r"""Folder name of lancedb dependency table.""" + +DEPENDENCY_TABLE_NAME = "dependencies" +r"""Name of the table inside the lancedb dependency folder.""" PARQUET_DEPENDENCY_FILE = f"{DB}.parquet" r"""Filename and extension of parquet dependency table file. diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 1213a7dc..1b6158d5 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -5,10 +5,10 @@ import errno import os import re +import shutil import tempfile -from lance.file import LanceFileReader -from lance.file import LanceFileWriter +import lancedb import pandas as pd import pyarrow as pa import pyarrow.compute as pc @@ -46,7 +46,7 @@ class Dependencies: >>> deps = audb.dependencies("emodb", version="1.4.1") >>> # List all files or archives >>> deps.files[:3] - ['db.emotion.categories.test.gold_standard.csv', 'db.emotion.categories.train.gold_standard.csv', 'db.emotion.csv'] + ['db.emotion.csv', 'db.files.csv', 'wav/03a01Fa.wav'] >>> deps.archives[:2] ['005d2b91-5317-0c80-d602-6d55f0323f8c', '014f82d8-3491-fd00-7397-c3b2ac3b2875'] >>> # Access properties for a given file @@ -410,59 +410,67 @@ def format(self, file: str) -> str: return self._column_loc("format", file) def load(self, path: str): - r"""Read dependencies from file. + r"""Read dependencies from file or folder. Clears existing dependencies. Args: - path: path to file. - File extension can be ``csv``, - ``parquet``, or ``lance`` + path: path to file or folder. + For files, extension can be ``csv``, + ``parquet``, or ``lance``. + For folders, name should end with ``lancedb``. 
Raises: ValueError: if file extension is not one of ``csv``, ``parquet``, ``lance`` - FileNotFoundError: if ``path`` does not exists + or folder name does not end with ``lancedb`` + FileNotFoundError: if ``path`` does not exist """ path = audeer.path(path) - extension = audeer.file_extension(path) - if extension not in ["csv", "parquet", "lance"]: - raise ValueError( - f"File extension of 'path' has to be 'csv', 'parquet', or 'lance' " - f"not '{extension}'" - ) - if not os.path.exists(path): - raise FileNotFoundError( - errno.ENOENT, - os.strerror(errno.ENOENT), - path, - ) - # Load data based on format - if extension == "lance": - # Read from Lance file - reader = LanceFileReader(path) - results = reader.read_all() - # Convert ReaderResults to PyArrow table - table = results.to_table() - # Note: LanceFileReader doesn't need explicit close - - elif extension == "csv": - # Read from CSV file - # Provide explicit column names and skip the header row - table = csv.read_csv( - path, - read_options=csv.ReadOptions( - column_names=self._schema.names, - skip_rows=1, - ), - convert_options=csv.ConvertOptions(column_types=self._schema), - ) + # Check if path is a lancedb folder (based on name ending with 'lancedb') + if path.endswith("lancedb"): + if not os.path.exists(path): + raise FileNotFoundError( + errno.ENOENT, + os.strerror(errno.ENOENT), + path, + ) + # Read from LanceDB folder + db = lancedb.connect(path) + lance_table = db.open_table(define.DEPENDENCY_TABLE_NAME) + table = lance_table.to_arrow() + else: + extension = audeer.file_extension(path) + if extension not in ["csv", "parquet"]: + raise ValueError( + f"File extension of 'path' has to be 'csv' or 'parquet', " + f"not '{extension}'" + ) + if not os.path.exists(path): + raise FileNotFoundError( + errno.ENOENT, + os.strerror(errno.ENOENT), + path, + ) + + # Load data based on format + if extension == "csv": + # Read from CSV file + # Provide explicit column names and skip the header row + table = csv.read_csv( + path, + read_options=csv.ReadOptions( + column_names=self._schema.names, + skip_rows=1, + ), + convert_options=csv.ConvertOptions(column_types=self._schema), + ) - elif extension == "parquet": - # Read from Parquet file - table = parquet.read_table(path) + elif extension == "parquet": + # Read from Parquet file + table = parquet.read_table(path) # Set the table and rebuild index self._table = table @@ -493,11 +501,12 @@ def sampling_rate(self, file: str) -> int: return self._column_loc("sampling_rate", file, int) def save(self, path: str): - r"""Write dependencies to file. + r"""Write dependencies to file or folder. Args: - path: path to file. - File extension can be ``csv``, ``parquet``, or ``lance`` + path: path to file or folder. + For files, extension can be ``csv`` or ``parquet``. + For folders, name should end with ``lancedb``. 
""" path = audeer.path(path) @@ -515,16 +524,15 @@ def save(self, path: str): df = self() table = self._dataframe_to_table(df, file_column=True) parquet.write_table(table, path) - elif path.endswith("lance"): - # Write to Lance file - # Remove existing file if it exists + elif path.endswith("lancedb"): + # Write to LanceDB folder + # Remove existing folder if it exists if os.path.exists(path): - os.remove(path) + shutil.rmtree(path) - # Create a new Lance file - # Provide schema to handle empty tables - with LanceFileWriter(path, schema=self._schema) as writer: - writer.write_batch(self._table) + # Create lancedb database and table + db = lancedb.connect(path) + db.create_table(define.DEPENDENCY_TABLE_NAME, self._table) def type(self, file: str) -> int: r"""Type of file. @@ -1092,14 +1100,18 @@ def download_dependencies( """ with tempfile.TemporaryDirectory() as tmp_root: - # Try to load in order: db.lance, db.parquet, db.zip (legacy CSV) - # First, try Lance (current format) - remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE) + # Try to load in order: + # db.lancedb.zip (current), db.parquet, db.zip (legacy CSV) + + # First, try LanceDB (current format, stored as zip archive) + remote_deps_file = backend_interface.join( + "/", name, define.DEPENDENCY_FILE + ".zip" + ) if backend_interface.exists(remote_deps_file, version): - local_deps_file = os.path.join(tmp_root, define.DEPENDENCY_FILE) - backend_interface.get_file( + local_deps_path = os.path.join(tmp_root, define.DEPENDENCY_FILE) + backend_interface.get_archive( remote_deps_file, - local_deps_file, + tmp_root, version, verbose=verbose, ) @@ -1109,17 +1121,21 @@ def download_dependencies( "/", name, define.PARQUET_DEPENDENCY_FILE ) if backend_interface.exists(remote_deps_file, version): - local_deps_file = os.path.join(tmp_root, define.PARQUET_DEPENDENCY_FILE) + local_deps_path = os.path.join( + tmp_root, define.PARQUET_DEPENDENCY_FILE + ) backend_interface.get_file( remote_deps_file, - local_deps_file, + local_deps_path, version, verbose=verbose, ) else: # Fall back to legacy CSV format - remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") - local_deps_file = os.path.join( + remote_deps_file = backend_interface.join( + "/", name, define.DB + ".zip" + ) + local_deps_path = os.path.join( tmp_root, define.LEGACY_DEPENDENCY_FILE, ) @@ -1129,9 +1145,9 @@ def download_dependencies( version, verbose=verbose, ) - # Create deps object from downloaded file + # Create deps object from downloaded file/folder deps = Dependencies() - deps.load(local_deps_file) + deps.load(local_deps_path) return deps @@ -1144,9 +1160,9 @@ def upload_dependencies( ): r"""Upload dependency file to backend. - Store a dependency file + Store a dependency folder in the local database root folder, - and upload it to the backend. + and upload it as a zip archive to the backend. 
Args: backend_interface: backend interface @@ -1156,7 +1172,24 @@ def upload_dependencies( version: database version """ - local_deps_file = os.path.join(db_root, define.DEPENDENCY_FILE) - remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE) - deps.save(local_deps_file) - backend_interface.put_file(local_deps_file, remote_deps_file, version) + local_deps_folder = os.path.join(db_root, define.DEPENDENCY_FILE) + remote_deps_file = backend_interface.join( + "/", name, define.DEPENDENCY_FILE + ".zip" + ) + deps.save(local_deps_folder) + # Get all files inside the lancedb folder + # audeer.list_file_names returns files with basenames=False by default + lancedb_files = audeer.list_file_names( + local_deps_folder, + basenames=False, + recursive=True, + ) + # Convert to relative paths from db_root for put_archive + relative_files = [os.path.relpath(f, db_root) for f in lancedb_files] + # Upload as archive (zip the folder contents) + backend_interface.put_archive( + db_root, + remote_deps_file, + version, + files=relative_files, + ) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index d2dc35c8..7c45d9d4 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -2,6 +2,7 @@ from collections.abc import Sequence import os +import shutil import audbackend import audeer @@ -447,11 +448,12 @@ def load_to( # save dependencies dep_path_tmp = os.path.join(db_root_tmp, define.DEPENDENCY_FILE) + dep_path_final = os.path.join(db_root, define.DEPENDENCY_FILE) deps.save(dep_path_tmp) - audeer.move_file( - dep_path_tmp, - os.path.join(db_root, define.DEPENDENCY_FILE), - ) + # lancedb is a folder, so we use shutil.move + if os.path.exists(dep_path_final): + shutil.rmtree(dep_path_final) + shutil.move(dep_path_tmp, dep_path_final) # save database and PKL tables diff --git a/audb/core/publish.py b/audb/core/publish.py index 849d3c29..161084ff 100644 --- a/audb/core/publish.py +++ b/audb/core/publish.py @@ -776,7 +776,11 @@ def publish( # load database and dependencies deps = Dependencies() - for deps_file in [define.DEPENDENCY_FILE, define.LEGACY_DEPENDENCY_FILE]: + deps_files = [ + define.DEPENDENCY_FILE, + define.LEGACY_DEPENDENCY_FILE, + ] + for deps_file in deps_files: deps_path = os.path.join(db_root, deps_file) if os.path.exists(deps_path): deps.load(deps_path) diff --git a/docs/publish.rst b/docs/publish.rst index 89aae62d..c956655a 100644 --- a/docs/publish.rst +++ b/docs/publish.rst @@ -126,7 +126,7 @@ data/ data-local/ age-test/ 1.0.0/ - db.sqlite + db.lancedb.zip db.yaml media/ 1.0.0/ @@ -142,7 +142,7 @@ inside the ``media/`` folder, all tables inside the ``meta/`` folder, the database header in the file ``db.yaml``, and the database dependencies -in the file ``db.sqlite``. +in the file ``db.lancedb.zip``. Note, that the structure of the folders used for versioning @@ -247,10 +247,10 @@ data/ data-local/ age-test/ 1.0.0/ - db.sqlite + db.lancedb.zip db.yaml 1.1.0/ - db.sqlite + db.lancedb.zip db.yaml media/ 1.0.0/ diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 37406dad..4c7de1b9 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -294,7 +294,7 @@ def test_removed(deps): deps.removed("non.existing") -@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet", "deps.lance"]) +@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet", "deps.lancedb"]) def test_load_save(tmpdir, deps, file): """Test consistency of dependency table after save/load cycle. 
diff --git a/tests/test_load.py b/tests/test_load.py index 3a3f6d61..2bdd12bf 100644 --- a/tests/test_load.py +++ b/tests/test_load.py @@ -133,7 +133,7 @@ def dbs(tmpdir_factory, persistent_repository, storage_format): db.save(db_root, storage_format=storage_format) audformat.testing.create_audio_files(db) - shutil.copy( + shutil.copytree( audeer.path(previous_db_root, audb.core.define.DEPENDENCY_FILE), audeer.path(db_root, audb.core.define.DEPENDENCY_FILE), ) @@ -160,7 +160,7 @@ def dbs(tmpdir_factory, persistent_repository, storage_format): db.save(db_root, storage_format=storage_format) audformat.testing.create_audio_files(db) - shutil.copy( + shutil.copytree( audeer.path(previous_db_root, audb.core.define.DEPENDENCY_FILE), audeer.path(db_root, audb.core.define.DEPENDENCY_FILE), ) @@ -196,7 +196,7 @@ def dbs(tmpdir_factory, persistent_repository, storage_format): os.remove(audeer.path(db_root, file)) db.save(db_root, storage_format=storage_format) - shutil.copy( + shutil.copytree( os.path.join(previous_db_root, audb.core.define.DEPENDENCY_FILE), os.path.join(db_root, audb.core.define.DEPENDENCY_FILE), ) @@ -224,7 +224,7 @@ def dbs(tmpdir_factory, persistent_repository, storage_format): db.drop_tables("train") db.save(db_root, storage_format=storage_format) audformat.testing.create_audio_files(db) - shutil.copy( + shutil.copytree( os.path.join(previous_db_root, audb.core.define.DEPENDENCY_FILE), os.path.join(db_root, audb.core.define.DEPENDENCY_FILE), ) diff --git a/tests/test_publish.py b/tests/test_publish.py index 1da8f679..ebbff0c5 100644 --- a/tests/test_publish.py +++ b/tests/test_publish.py @@ -1044,7 +1044,7 @@ def test_publish_error_cross_repository(tmpdir): audb.publish(db_path_v2, "2.0.0", repo2, previous_version="1.0.0") # Publishing to repo2 with previous_version=None should work - os.remove(audeer.path(db_path, "db.lance")) + shutil.rmtree(audeer.path(db_path, audb.core.define.DEPENDENCY_FILE)) audb.publish(db_path, "2.0.0", repo2, previous_version=None) # Assert that the new version appears in repo2 @@ -1227,7 +1227,7 @@ def test_update_database(dbs, persistent_repository): dbs[version], audb.core.define.DEPENDENCY_FILE, ) - os.remove(dep_file) + shutil.rmtree(dep_file) error_msg = ( f"You want to depend on '{previous_version}' " f"of {DB_NAME}, " From ca1c0a30da02daa978c02889331f65ed6fa84a52 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Mon, 5 Jan 2026 08:18:40 +0100 Subject: [PATCH 10/10] Add missing dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5ec8b3ec..adfb492b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,10 +41,10 @@ dependencies = [ 'audobject >=0.5.0', 'audresample >=0.1.6', 'filelock', + 'lancedb', 'oyaml', 'pandas >=2.1.0', 'pyarrow', - 'pylance', ] # Get version dynamically from git # (needs setuptools_scm tools config below)
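For reference, the round-trip that the new `db.lancedb` dependency format supports can be sketched as follows. This is a minimal illustration only, assuming the lancedb calls already used in the patches above (`lancedb.connect`, `create_table`, `open_table`, `to_arrow`); the folder name, table name, and example row values are placeholders, not part of the patches.

import shutil

import lancedb
import pyarrow as pa

deps_folder = "db.lancedb"   # placeholder; mirrors define.DEPENDENCY_FILE
table_name = "dependencies"  # placeholder; mirrors define.DEPENDENCY_TABLE_NAME

# Example dependency table with a single made-up media entry,
# using the same column names and dtypes as the patched schema;
# checksum, duration, type, etc. are illustrative values only
table = pa.table(
    {
        "file": ["wav/03a01Fa.wav"],
        "archive": ["005d2b91-5317-0c80-d602-6d55f0323f8c"],
        "bit_depth": pa.array([16], type=pa.int32()),
        "channels": pa.array([1], type=pa.int32()),
        "checksum": ["d3b07384d113edec49eaa6238ad5ff00"],
        "duration": pa.array([1.9], type=pa.float64()),
        "format": ["wav"],
        "removed": pa.array([0], type=pa.int32()),
        "sampling_rate": pa.array([16000], type=pa.int32()),
        "type": pa.array([1], type=pa.int32()),
        "version": ["1.0.0"],
    }
)

# Save: write the PyArrow table into a lancedb folder
shutil.rmtree(deps_folder, ignore_errors=True)  # the lancedb path is a folder
db = lancedb.connect(deps_folder)
db.create_table(table_name, table)

# Load: read the table back as PyArrow
restored = lancedb.connect(deps_folder).open_table(table_name).to_arrow()
assert restored.num_rows == 1
assert "file" in restored.column_names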