diff --git a/audb/core/api.py b/audb/core/api.py
index 1e4fc8d2..6aa581c3 100644
--- a/audb/core/api.py
+++ b/audb/core/api.py
@@ -225,7 +225,6 @@ def cached(
         if (
             define.DEPENDENCY_FILE not in files
             and define.LEGACY_DEPENDENCY_FILE not in files
-            and define.CACHED_DEPENDENCY_FILE not in files
         ):
             # Skip all cache entries
             # that don't contain a dependency file
@@ -293,28 +292,19 @@ def dependencies(
         version,
         cache_root=cache_root,
     )
-    cached_deps_file = os.path.join(db_root, define.CACHED_DEPENDENCY_FILE)
+    deps_file = os.path.join(db_root, define.DEPENDENCY_FILE)
 
     with FolderLock(db_root):
         try:
             deps = Dependencies()
-            deps.load(cached_deps_file)
+            deps.load(deps_file)
         except Exception:  # does not catch KeyboardInterupt
             # If loading cached file fails, load again from backend
             #
-            # Loading a cache file can fail
-            # as we use PyArrow data types,
-            # which when loading from pickle
-            # are not compatible between all pandas versions.
-            # We had originally some tests for it,
-            # but as the actual failure is not that important,
-            # we removed them in
-            # See https://github.com/audeering/audb/pull/507
-            #
             backend_interface = utils.lookup_backend(name, version)
             deps = download_dependencies(backend_interface, name, version, verbose)
-            # Store as pickle in cache
-            deps.save(cached_deps_file)
+            # Store in cache
+            deps.save(deps_file)
 
     return deps
diff --git a/audb/core/define.py b/audb/core/define.py
index 4dab8d2f..cf99dfd0 100644
--- a/audb/core/define.py
+++ b/audb/core/define.py
@@ -11,16 +11,14 @@
 HEADER_FILE = f"{DB}.yaml"
 
 
 # Dependencies
-DEPENDENCY_FILE = f"{DB}.parquet"
+DEPENDENCY_FILE = f"{DB}.sqlite"
 r"""Filename and extension of dependency table file."""
-CACHED_DEPENDENCY_FILE = f"{DB}.pkl"
-r"""Filename and extension of cached dependency table file.
+PARQUET_DEPENDENCY_FILE = f"{DB}.parquet"
+r"""Filename and extension of parquet dependency table file.
 
-As loading from a pickle file is still faster
-than loading from a parquet file,
-we are storing the dependency table
-as a pickle file in cache.
+From ``audb`` 1.7.0 until the switch to SQLite,
+the dependency table was stored in a parquet file.
 """
diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index 84a94cf3..74e3e2ad 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -5,6 +5,7 @@
 import errno
 import os
 import re
+import sqlite3
 import tempfile
 
 import pandas as pd
@@ -18,6 +19,14 @@
 from audb.core import define
 
 
+# SQLite column list and value placeholders shared by all INSERT statements
+DEPENDENCIES = (
+    "(file, archive, bit_depth, channels, checksum, duration, format, "
+    "removed, sampling_rate, type, version)"
+)
+VALUES = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
+
+
 class Dependencies:
     r"""Dependencies of a database.
@@ -43,7 +52,7 @@ class Dependencies: >>> deps = audb.dependencies("emodb", version="1.4.1") >>> # List all files or archives >>> deps.files[:3] - ['db.emotion.csv', 'db.files.csv', 'wav/03a01Fa.wav'] + ['db.emotion.categories.test.gold_standard.csv', 'db.emotion.categories.train.gold_standard.csv', 'db.emotion.csv'] >>> deps.archives[:2] ['005d2b91-5317-0c80-d602-6d55f0323f8c', '014f82d8-3491-fd00-7397-c3b2ac3b2875'] >>> # Access properties for a given file @@ -60,8 +69,38 @@ class Dependencies: """ # noqa: E501 def __init__(self): - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) - self._df = self._set_dtypes(self._df) + # Use in-memory SQLite database instead of pandas DataFrame + # Set check_same_thread=False to allow usage across threads + self._conn = sqlite3.connect(":memory:", check_same_thread=False) + self._db_path = None # Track if connected to a file or in-memory + + # Create the dependencies table + self._conn.execute(""" + CREATE TABLE dependencies ( + file TEXT PRIMARY KEY, + archive TEXT, + bit_depth INTEGER, + channels INTEGER, + checksum TEXT, + duration REAL, + format TEXT, + removed INTEGER, + sampling_rate INTEGER, + type INTEGER, + version TEXT + ) + """) + + # Create indexes for frequently queried columns + self._conn.execute("CREATE INDEX idx_type ON dependencies(type)") + self._conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + self._conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + self._conn.execute( + "CREATE INDEX idx_type_removed ON dependencies(type, removed)" + ) + + self._conn.commit() + # pyarrow schema # used for reading and writing files self._schema = pa.schema( @@ -87,7 +126,16 @@ def __call__(self) -> pd.DataFrame: table with dependencies """ - return self._df + df = pd.read_sql_query( + "SELECT * FROM dependencies", + self._conn, + index_col="file", + ) + # Remove index name to match expected format + df.index.name = None + # Set correct dtypes + df = self._set_dtypes(df) + return df def __contains__(self, file: str) -> bool: r"""Check if file is part of dependencies. @@ -99,7 +147,10 @@ def __contains__(self, file: str) -> bool: ``True`` if a dependency to the file exists """ - return file in self._df.index + cursor = self._conn.execute( + "SELECT 1 FROM dependencies WHERE file = ? LIMIT 1", (file,) + ) + return cursor.fetchone() is not None def __eq__(self, other: "Dependencies") -> bool: r"""Check if two dependency tables are equal. @@ -111,7 +162,8 @@ def __eq__(self, other: "Dependencies") -> bool: ``True`` if both dependency tables have the same entries """ - return self._df.equals(other._df) + # Compare by converting to DataFrames + return self().equals(other()) def __getitem__(self, file: str) -> list: r"""File information. 
@@ -123,14 +175,113 @@ def __getitem__(self, file: str) -> list: list with meta information """ - return self._df.loc[file].tolist() + cursor = self._conn.execute( + "SELECT archive, bit_depth, channels, checksum, duration, " + "format, removed, sampling_rate, type, version " + "FROM dependencies WHERE file = ?", + (file,), + ) + row = cursor.fetchone() + if row is None: + raise KeyError(file) + return list(row) def __len__(self) -> int: r"""Number of all media, table, attachment files.""" - return len(self._df) + cursor = self._conn.execute("SELECT COUNT(*) FROM dependencies") + return cursor.fetchone()[0] def __str__(self) -> str: # noqa: D105 - return str(self._df) + return str(self()) + + def __del__(self): + """Clean up SQLite connection when object is deleted.""" + if hasattr(self, "_conn") and self._conn: + self._conn.close() + + def __getstate__(self): + """Make object serializable.""" + # Get all data as a DataFrame + df = self() + # Return the DataFrame and schema for reconstruction + return { + "data": df.to_dict("records"), + "index": df.index.tolist(), + } + + def __setstate__(self, state): + """Restore object from serialized state.""" + # Recreate the SQLite connection + self._conn = sqlite3.connect(":memory:", check_same_thread=False) + self._db_path = None + + # Recreate the table structure + self._conn.execute(""" + CREATE TABLE dependencies ( + file TEXT PRIMARY KEY, + archive TEXT, + bit_depth INTEGER, + channels INTEGER, + checksum TEXT, + duration REAL, + format TEXT, + removed INTEGER, + sampling_rate INTEGER, + type INTEGER, + version TEXT + ) + """) + + # Create indexes + self._conn.execute("CREATE INDEX idx_type ON dependencies(type)") + self._conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + self._conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + self._conn.execute( + "CREATE INDEX idx_type_removed ON dependencies(type, removed)" + ) + + self._conn.commit() + + # Recreate the schema + self._schema = pa.schema( + [ + ("file", pa.string()), + ("archive", pa.string()), + ("bit_depth", pa.int32()), + ("channels", pa.int32()), + ("checksum", pa.string()), + ("duration", pa.float64()), + ("format", pa.string()), + ("removed", pa.int32()), + ("sampling_rate", pa.int32()), + ("type", pa.int32()), + ("version", pa.string()), + ] + ) + + # Restore the data + if state["data"]: + data = state["data"] + index = state["index"] + for i, row in enumerate(data): + file = index[i] + self._conn.execute( + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", + ( + file, + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ), + ) + self._conn.commit() @property def archives(self) -> list[str]: @@ -140,7 +291,10 @@ def archives(self) -> list[str]: list of archives """ - return sorted(self._df.archive.unique().tolist()) + cursor = self._conn.execute( + "SELECT DISTINCT archive FROM dependencies ORDER BY archive" + ) + return [row[0] for row in cursor.fetchall()] @property def attachments(self) -> list[str]: @@ -150,9 +304,11 @@ def attachments(self) -> list[str]: list of attachments """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["attachment"],), + ) + return [row[0] for row in cursor.fetchall()] @property def attachment_ids(self) -> list[str]: @@ -162,9 
+318,11 @@ def attachment_ids(self) -> list[str]: list of attachment IDs """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].archive.tolist() + cursor = self._conn.execute( + "SELECT archive FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["attachment"],), + ) + return [row[0] for row in cursor.fetchall()] @property def files(self) -> list[str]: @@ -174,7 +332,8 @@ def files(self) -> list[str]: list of files """ - return self._df.index.tolist() + cursor = self._conn.execute("SELECT file FROM dependencies") + return [row[0] for row in cursor.fetchall()] @property def media(self) -> list[str]: @@ -184,9 +343,11 @@ def media(self) -> list[str]: list of media """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["media"] - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["media"],), + ) + return [row[0] for row in cursor.fetchall()] @property def removed_media(self) -> list[str]: @@ -196,10 +357,11 @@ def removed_media(self) -> list[str]: list of media """ - return self._df[ - (self._df["type"] == define.DEPENDENCY_TYPE["media"]) - & (self._df["removed"] == 1) - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ? AND removed = 1", + (define.DEPENDENCY_TYPE["media"],), + ) + return [row[0] for row in cursor.fetchall()] @property def table_ids(self) -> list[str]: @@ -223,9 +385,11 @@ def tables(self) -> list[str]: list of tables """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["meta"] - ].index.tolist() + cursor = self._conn.execute( + "SELECT file FROM dependencies WHERE type = ?", + (define.DEPENDENCY_TYPE["meta"],), + ) + return [row[0] for row in cursor.fetchall()] def archive(self, file: str) -> str: r"""Name of archive the file belongs to. @@ -237,7 +401,7 @@ def archive(self, file: str) -> str: archive name """ - return self._df.archive[file] + return self._column_loc("archive", file) def bit_depth(self, file: str) -> int: r"""Bit depth of media file. @@ -306,22 +470,20 @@ def load(self, path: str): Args: path: path to file. 
- File extension can be ``csv`` - ``pkl``, - or ``parquet`` + File extension can be ``csv``, + ``parquet``, or ``sqlite`` Raises: ValueError: if file extension is not one of - ``csv``, ``pkl``, ``parquet`` + ``csv``, ``parquet``, ``sqlite`` FileNotFoundError: if ``path`` does not exists """ - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) path = audeer.path(path) extension = audeer.file_extension(path) - if extension not in ["csv", "pkl", "parquet"]: + if extension not in ["csv", "parquet", "sqlite"]: raise ValueError( - f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet' " + f"File extension of 'path' has to be 'csv', 'parquet', or 'sqlite' " f"not '{extension}'" ) if not os.path.exists(path): @@ -330,27 +492,48 @@ def load(self, path: str): os.strerror(errno.ENOENT), path, ) - if extension == "pkl": - self._df = pd.read_pickle(path) - # Correct dtypes - # to make backward compatiple - # with old pickle files in cache - self._df = self._set_dtypes(self._df) - - elif extension == "csv": - table = csv.read_csv( - path, - read_options=csv.ReadOptions( - column_names=self._schema.names, - skip_rows=1, - ), - convert_options=csv.ConvertOptions(column_types=self._schema), - ) - self._df = self._table_to_dataframe(table) - elif extension == "parquet": - table = parquet.read_table(path) - self._df = self._table_to_dataframe(table) + # Clear existing data + self._conn.execute("DELETE FROM dependencies") + self._conn.commit() + + if extension == "sqlite": + # For SQLite files, we can attach and copy the data + self._conn.execute(f"ATTACH DATABASE '{path}' AS source_db") + try: + self._conn.execute( + "INSERT INTO dependencies SELECT * FROM source_db.dependencies" + ) + self._conn.commit() + finally: + self._conn.execute("DETACH DATABASE source_db") + + else: + # For CSV and parquet, load via pandas and insert into SQLite + if extension == "csv": + table = csv.read_csv( + path, + read_options=csv.ReadOptions( + column_names=self._schema.names, + skip_rows=1, + ), + convert_options=csv.ConvertOptions(column_types=self._schema), + ) + df = self._table_to_dataframe(table) + + elif extension == "parquet": + table = parquet.read_table(path) + df = self._table_to_dataframe(table) + + # Insert the dataframe into SQLite + df_to_insert = df.reset_index() + df_to_insert.columns = ["file"] + list(df.columns) + df_to_insert.to_sql( + "dependencies", + self._conn, + if_exists="append", + index=False, + ) def removed(self, file: str) -> bool: r"""Check if file is marked as removed. @@ -381,25 +564,64 @@ def save(self, path: str): Args: path: path to file. 
- File extension can be ``csv``, ``pkl``, or ``parquet`` + File extension can be ``csv``, ``parquet``, or ``sqlite`` """ path = audeer.path(path) if path.endswith("csv"): - table = self._dataframe_to_table(self._df) + df = self() + table = self._dataframe_to_table(df) csv.write_csv( table, path, write_options=csv.WriteOptions(quoting_style="none"), ) - elif path.endswith("pkl"): - self._df.to_pickle( - path, - protocol=4, # supported by Python >= 3.4 - ) elif path.endswith("parquet"): - table = self._dataframe_to_table(self._df, file_column=True) + df = self() + table = self._dataframe_to_table(df, file_column=True) parquet.write_table(table, path) + elif path.endswith("sqlite"): + # Remove existing database file if it exists + if os.path.exists(path): + os.remove(path) + + # Create a new connection to the file database + file_conn = sqlite3.connect(path) + try: + # Create table with proper schema + file_conn.execute(""" + CREATE TABLE dependencies ( + file TEXT PRIMARY KEY, + archive TEXT, + bit_depth INTEGER, + channels INTEGER, + checksum TEXT, + duration REAL, + format TEXT, + removed INTEGER, + sampling_rate INTEGER, + type INTEGER, + version TEXT + ) + """) + + # Create indexes for frequently queried columns + file_conn.execute("CREATE INDEX idx_type ON dependencies(type)") + file_conn.execute("CREATE INDEX idx_removed ON dependencies(removed)") + file_conn.execute("CREATE INDEX idx_archive ON dependencies(archive)") + file_conn.execute( + "CREATE INDEX idx_type_removed ON dependencies(type, removed)" + ) + + # Copy data from in-memory database to file + # Use iterdump to copy all data + for line in self._conn.iterdump(): + if line.startswith("INSERT INTO"): + file_conn.execute(line) + + file_conn.commit() + finally: + file_conn.close() def type(self, file: str) -> int: r"""Type of file. 
@@ -443,18 +665,23 @@ def _add_attachment( """ format = audeer.file_extension(file).lower() - self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["attachment"], # type - version, # version - ] + self._conn.execute( + f"INSERT OR REPLACE INTO dependencies {DEPENDENCIES} VALUES {VALUES}", + ( + file, + archive, + 0, + 0, + checksum, + 0.0, + format, + 0, + 0, + define.DEPENDENCY_TYPE["attachment"], + version, + ), + ) + self._conn.commit() def _add_media( self, @@ -481,12 +708,11 @@ def _add_media( where each tuple holds the values of a new media entry """ - df = pd.DataFrame.from_records( + self._conn.executemany( + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df = pd.concat([self._df, df]) + ) + self._conn.commit() def _add_meta( self, @@ -508,18 +734,23 @@ def _add_meta( else: archive = os.path.splitext(file[3:])[0] - self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["meta"], # type - version, # version - ] + self._conn.execute( + f"INSERT OR REPLACE INTO dependencies {DEPENDENCIES} VALUES {VALUES}", + ( + file, + archive, + 0, + 0, + checksum, + 0.0, + format, + 0, + 0, + define.DEPENDENCY_TYPE["meta"], + version, + ), + ) + self._conn.commit() def _column_loc( self, @@ -538,7 +769,13 @@ def _column_loc( scalar value """ - value = self._df.at[file, column] + cursor = self._conn.execute( + f"SELECT {column} FROM dependencies WHERE file = ?", (file,) + ) + row = cursor.fetchone() + if row is None: + raise KeyError(file) + value = row[0] if dtype is not None: value = dtype(value) return value @@ -579,13 +816,15 @@ def _drop(self, files: Sequence[str]): files: relative file paths """ - # self._df.drop is slow, - # see https://stackoverflow.com/a/53394627. - # The solution presented in https://stackoverflow.com/a/53395360 - # self._df = self._df.loc[self._df.index.drop(files)] - # which is claimed to be faster, - # isn't. - self._df = self._df[~self._df.index.isin(files)] + if not files: + return + # Convert to tuple if needed (e.g., if files is a set) + files = tuple(files) if not isinstance(files, (list, tuple)) else files + placeholders = ",".join("?" * len(files)) + self._conn.execute( + f"DELETE FROM dependencies WHERE file IN ({placeholders})", files + ) + self._conn.commit() def _remove(self, file: str): r"""Mark file as removed. @@ -594,7 +833,10 @@ def _remove(self, file: str): file: relative file path """ - self._df.at[file, "removed"] = 1 + self._conn.execute( + "UPDATE dependencies SET removed = 1 WHERE file = ?", (file,) + ) + self._conn.commit() @staticmethod def _set_dtypes(df: pd.DataFrame) -> pd.DataFrame: @@ -663,13 +905,34 @@ def _update_media( values: list of tuples, where each tuple holds the new values for a media entry + Raises: + KeyError: if a file in values does not exist in dependencies + """ - df = pd.DataFrame.from_records( - values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df.loc[df.index] = df + # Check if all files exist before updating + for value in values: + file = value[0] + cursor = self._conn.execute( + "SELECT 1 FROM dependencies WHERE file = ? 
LIMIT 1", (file,) + ) + if cursor.fetchone() is None: + raise KeyError(file) + + # Update existing entries + self._conn.executemany( + """ + UPDATE dependencies + SET archive = ?, bit_depth = ?, channels = ?, checksum = ?, duration = ?, + format = ?, removed = ?, sampling_rate = ?, type = ?, version = ? + WHERE file = ? + """, + # Reorder tuple to put file at the end + [ + (v[1], v[2], v[3], v[4], v[5], v[6], v[7], v[8], v[9], v[10], v[0]) + for v in values + ], + ) + self._conn.commit() def _update_media_version( self, @@ -683,7 +946,16 @@ def _update_media_version( version: version string """ - self._df.loc[files, "version"] = version + if not files: + return + # Convert to tuple if needed (e.g., if files is a set) + files = tuple(files) if not isinstance(files, (list, tuple)) else files + placeholders = ",".join("?" * len(files)) + self._conn.execute( + f"UPDATE dependencies SET version = ? WHERE file IN ({placeholders})", + [version] + list(files), + ) + self._conn.commit() def error_message_missing_object( @@ -805,9 +1077,8 @@ def download_dependencies( """ with tempfile.TemporaryDirectory() as tmp_root: - # Load `db.parquet` file, - # or if non-existent `db.zip` - # from backend + # Try to load in order: db.sqlite, db.parquet, db.zip (legacy CSV) + # First, try SQLite (current format) remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE) if backend_interface.exists(remote_deps_file, version): local_deps_file = os.path.join(tmp_root, define.DEPENDENCY_FILE) @@ -818,17 +1089,31 @@ def download_dependencies( verbose=verbose, ) else: - remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") - local_deps_file = os.path.join( - tmp_root, - define.LEGACY_DEPENDENCY_FILE, - ) - backend_interface.get_archive( - remote_deps_file, - tmp_root, - version, - verbose=verbose, + # Try parquet (previous format) + remote_deps_file = backend_interface.join( + "/", name, define.PARQUET_DEPENDENCY_FILE ) + if backend_interface.exists(remote_deps_file, version): + local_deps_file = os.path.join(tmp_root, define.PARQUET_DEPENDENCY_FILE) + backend_interface.get_file( + remote_deps_file, + local_deps_file, + version, + verbose=verbose, + ) + else: + # Fall back to legacy CSV format + remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") + local_deps_file = os.path.join( + tmp_root, + define.LEGACY_DEPENDENCY_FILE, + ) + backend_interface.get_archive( + remote_deps_file, + tmp_root, + version, + verbose=verbose, + ) # Create deps object from downloaded file deps = Dependencies() deps.load(local_deps_file) diff --git a/audb/core/load.py b/audb/core/load.py index bd4aefd5..18fdc4ac 100644 --- a/audb/core/load.py +++ b/audb/core/load.py @@ -471,7 +471,9 @@ def job(archive: str, version: str): tmp_root=db_root_tmp, ) # media files that can be changed to a requested flavor - flavor_files = deps._df[deps._df.sampling_rate != 0].index + # Get files with sampling_rate != 0 (audio files) + df = deps() + flavor_files = df[df.sampling_rate != 0].index for file in files: if os.name == "nt": # pragma: no cover file = file.replace(os.sep, "/") diff --git a/audb/core/publish.py b/audb/core/publish.py index 658f2975..849d3c29 100644 --- a/audb/core/publish.py +++ b/audb/core/publish.py @@ -84,8 +84,8 @@ def _find_attachments( r"""Find altered, new or removed attachments and update 'deps'.""" # drop removed attachments from dependency table removed_attachments = [ - deps._df.index[deps._df.archive == attachment_id][0] - for attachment_id in deps.attachment_ids + 
attachment + for attachment, attachment_id in zip(deps.attachments, deps.attachment_ids) if attachment_id not in db.attachments ] deps._drop(removed_attachments) diff --git a/docs/publish.rst b/docs/publish.rst index 6509c416..89aae62d 100644 --- a/docs/publish.rst +++ b/docs/publish.rst @@ -126,7 +126,7 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.sqlite db.yaml media/ 1.0.0/ @@ -142,7 +142,7 @@ inside the ``media/`` folder, all tables inside the ``meta/`` folder, the database header in the file ``db.yaml``, and the database dependencies -in the file ``db.parquet``. +in the file ``db.sqlite``. Note, that the structure of the folders used for versioning @@ -228,10 +228,10 @@ and will only publish those. >>> deps() archive bit_depth ... type version -db.age.parquet 0 ... 0 1.1.0 audio/001.wav 436c65ec-1e42-f9de-2708-ecafe07e827e 16 ... 1 1.0.0 audio/002.wav fda7e4d6-f2b2-4cff-cab5-906ef5d57607 16 ... 1 1.0.0 audio/003.wav e26ef45d-bdc1-6153-bdc4-852d83806e4a 16 ... 1 1.0.0 +db.age.parquet 0 ... 0 1.1.0 audio/004.wav ef4d1e81-6488-95cf-a165-604d1e47d575 16 ... 1 1.1.0 [5 rows x 10 columns] @@ -247,10 +247,10 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.sqlite db.yaml 1.1.0/ - db.parquet + db.sqlite db.yaml media/ 1.0.0/ diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 618e7757..81cac116 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -51,6 +51,11 @@ "version": "1.0.0", }, ] +DEPENDENCIES = ( + "(file, archive, bit_depth, channels, checksum, duration, format, " + "removed, sampling_rate, type, version)" +) +VALUES = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" def get_entries(column): @@ -66,13 +71,25 @@ def test_get_entries(): ) def deps(): deps = audb.Dependencies() - df = pd.DataFrame.from_records(ROWS) - df.set_index("file", inplace=True) - # Ensure correct dtype - df.index = df.index.astype(audb.core.define.DEPENDENCY_INDEX_DTYPE) - df.index.name = None - df = df.astype(audb.core.define.DEPENDENCY_TABLE) - deps._df = df + # Insert test data directly into SQLite + for row in ROWS: + deps._conn.execute( + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ), + ) + deps._conn.commit() return deps @@ -80,10 +97,10 @@ def test_instantiation(): r"""Test instantiation of audb.Dependencies. During instantiation of ``audb.Dependencies`` - an empty dataframe is created under ``self._df``, + an empty SQLite database is created under ``self._conn``, that stores the dependency table. This test ensures, - that the dataframe + that the database contains the correct column names and data types, and the correct name and data type of its index. @@ -106,9 +123,8 @@ def test_instantiation(): audb.core.define.DEPENDENCY_INDEX_DTYPE ) expected_df = expected_df.astype(audb.core.define.DEPENDENCY_TABLE) - pd.testing.assert_frame_equal(deps._df, expected_df) - assert list(deps._df.columns) == expected_columns df = deps() + pd.testing.assert_frame_equal(df, expected_df) assert list(df.columns) == expected_columns @@ -139,10 +155,32 @@ def test_equals(deps): assert deps != audb.Dependencies() # example table vs. 
example table assert deps == deps - _deps._df = deps._df.copy() + # Copy data to new Dependencies object + _deps = audb.Dependencies() + for row in ROWS: + _deps._conn.execute( + f"INSERT INTO dependencies {DEPENDENCIES} VALUES {VALUES}", + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ), + ) + _deps._conn.commit() assert deps == _deps # example table vs. different table - _deps._df.loc["db.files.csv", "channels"] = 4 + _deps._conn.execute( + "UPDATE dependencies SET channels = 4 WHERE file = 'db.files.csv'" + ) + _deps._conn.commit() assert deps != _deps @@ -250,7 +288,7 @@ def test_removed(deps): deps.removed("non.existing") -@pytest.mark.parametrize("file", ["deps.csv", "deps.pkl", "deps.parquet"]) +@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet", "deps.sqlite"]) def test_load_save(tmpdir, deps, file): """Test consistency of dependency table after save/load cycle. @@ -265,76 +303,7 @@ def test_load_save(tmpdir, deps, file): deps2 = audb.Dependencies() deps2.load(deps_file) pd.testing.assert_frame_equal(deps(), deps2()) - assert list(deps2._df.dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) - - -def test_load_save_backward_compatibility(tmpdir, deps): - """Test backward compatibility with old pickle cache files. - - As the dtype of the index has changed, - we need to make sure this is corrected - when loading old cache files. - - Old behaviour (audb<1.7): - - archive string[python] - bit_depth int32 - channels int32 - checksum string[python] - duration float64 - format string[python] - removed int32 - sampling_rate int32 - type int32 - version string[python] - - New behaviour (audb>=1.7): - - archive string[pyarrow] - bit_depth int32[pyarrow] - channels int32[pyarrow] - checksum string[pyarrow] - duration double[pyarrow] - format string[pyarrow] - removed int32[pyarrow] - sampling_rate int32[pyarrow] - type int32[pyarrow] - version string[pyarrow] - - """ - deps_file = audeer.path(tmpdir, "deps.pkl") - - deps_old = audb.Dependencies() - deps_old._df = deps._df.copy() - - # Change dtype of index from object to string - # to mimic previous behavior - deps_old._df.index = deps_old._df.index.astype("string") - # Change dtype of columns - # to mimic previous behavior - deps_old._df = deps_old._df.astype( - { - "archive": "string", - "bit_depth": "int32", - "channels": "int32", - "checksum": "string", - "duration": "float64", - "format": "string", - "removed": "int32", - "sampling_rate": "int32", - "type": "int32", - "version": "string", - } - ) - deps_old.save(deps_file) - - # Check that we get the correct dtypes, - # when loading from cache - deps2 = audb.Dependencies() - deps2.load(deps_file) - assert deps2._df.index.dtype == audb.core.define.DEPENDENCY_INDEX_DTYPE - pd.testing.assert_frame_equal(deps._df, deps2._df) - assert deps == deps2 + assert list(deps2().dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) def test_load_save_errors(deps): @@ -394,7 +363,8 @@ def test_str(deps): ) print(str(deps)) assert expected_str.match(str(deps)) - assert expected_str.match(deps._df.to_string()) + # str(deps) now calls __str__ which calls __call__ which returns a DataFrame + assert expected_str.match(deps().to_string()) # === Test hidden methods === diff --git a/tests/test_publish.py b/tests/test_publish.py index 15306c71..e3cea2f2 100644 --- a/tests/test_publish.py +++ b/tests/test_publish.py @@ -1044,7 
+1044,7 @@ def test_publish_error_cross_repository(tmpdir): audb.publish(db_path_v2, "2.0.0", repo2, previous_version="1.0.0") # Publishing to repo2 with previous_version=None should work - os.remove(audeer.path(db_path, "db.parquet")) + os.remove(audeer.path(db_path, "db.sqlite")) audb.publish(db_path, "2.0.0", repo2, previous_version=None) # Assert that the new version appears in repo2 diff --git a/tests/test_publish_table.py b/tests/test_publish_table.py index 6ad7e3f0..3cece588 100644 --- a/tests/test_publish_table.py +++ b/tests/test_publish_table.py @@ -90,7 +90,7 @@ def assert_db_published_to_repo( """ repo = audeer.path(repository.host, repository.name) - dependency_file = "db.parquet" + dependency_file = "db.sqlite" header_file = "db.yaml" files = list(db.files) tables = list(db)
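
For reference, the new dependency file is a regular SQLite database, so it can also be inspected without ``audb``. The following sketch is illustrative only and not part of the patch; the path is a placeholder based on the ``data-local/age-test`` layout from ``docs/publish.rst`` above, and it relies on the table schema created by ``Dependencies.save()`` and on the ``type`` codes visible in the documented ``deps()`` output (media files carry type ``1`` there).

import contextlib
import sqlite3

# Placeholder path, following the local repository layout shown in docs/publish.rst
path = "data-local/age-test/1.1.0/db.sqlite"

with contextlib.closing(sqlite3.connect(path)) as conn:
    # Count media files that are not marked as removed;
    # this query can be served by the idx_type_removed index
    # that Dependencies.save() creates.
    n_media = conn.execute(
        "SELECT COUNT(*) FROM dependencies WHERE type = ? AND removed = 0",
        (1,),  # 1 = media, as shown in the deps() output in docs/publish.rst
    ).fetchone()[0]
    print("active media files:", n_media)

    # Look up the version of a single file,
    # comparable to Dependencies.version("audio/001.wav").
    row = conn.execute(
        "SELECT version FROM dependencies WHERE file = ?",
        ("audio/001.wav",),  # file name taken from the docs example
    ).fetchone()
    print("version:", row[0] if row is not None else "not in dependencies")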