diff --git a/audb/core/api.py b/audb/core/api.py index 1e4fc8d2..bd84c43b 100644 --- a/audb/core/api.py +++ b/audb/core/api.py @@ -222,13 +222,13 @@ def cached( # Skip old audb cache (e.g. 1 as flavor) files = audeer.list_file_names(version_path, basenames=True) + folders = audeer.list_dir_names(version_path, basenames=True) if ( - define.DEPENDENCY_FILE not in files + define.DEPENDENCY_FILE not in folders and define.LEGACY_DEPENDENCY_FILE not in files - and define.CACHED_DEPENDENCY_FILE not in files ): # Skip all cache entries - # that don't contain a dependency file + # that don't contain a dependency file/folder # as those stem from audb<1.0.0. continue # pragma: no cover @@ -293,28 +293,19 @@ def dependencies( version, cache_root=cache_root, ) - cached_deps_file = os.path.join(db_root, define.CACHED_DEPENDENCY_FILE) + deps_file = os.path.join(db_root, define.DEPENDENCY_FILE) with FolderLock(db_root): try: deps = Dependencies() - deps.load(cached_deps_file) + deps.load(deps_file) except Exception: # does not catch KeyboardInterupt # If loading cached file fails, load again from backend # - # Loading a cache file can fail - # as we use PyArrow data types, - # which when loading from pickle - # are not compatible between all pandas versions. - # We had originally some tests for it, - # but as the actual failure is not that important, - # we removed them in - # See https://github.com/audeering/audb/pull/507 - # backend_interface = utils.lookup_backend(name, version) deps = download_dependencies(backend_interface, name, version, verbose) - # Store as pickle in cache - deps.save(cached_deps_file) + # Store in cache + deps.save(deps_file) return deps diff --git a/audb/core/define.py b/audb/core/define.py index 4dab8d2f..58c00e00 100644 --- a/audb/core/define.py +++ b/audb/core/define.py @@ -11,16 +11,16 @@ HEADER_FILE = f"{DB}.yaml" # Dependencies -DEPENDENCY_FILE = f"{DB}.parquet" -r"""Filename and extension of dependency table file.""" +DEPENDENCY_FILE = f"{DB}.lancedb" +r"""Folder name of the LanceDB dependency table.""" -CACHED_DEPENDENCY_FILE = f"{DB}.pkl" -r"""Filename and extension of cached dependency table file. +DEPENDENCY_TABLE_NAME = "dependencies" +r"""Name of the table inside the LanceDB dependency folder.""" -As loading from a pickle file is still faster -than loading from a parquet file, -we are storing the dependency table -as a pickle file in cache. +PARQUET_DEPENDENCY_FILE = f"{DB}.parquet" +r"""Filename and extension of parquet dependency table file. + +Used as a backward-compatible format when loading databases published with older audb versions.
""" diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 84a94cf3..1b6158d5 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -5,10 +5,13 @@ import errno import os import re +import shutil import tempfile +import lancedb import pandas as pd import pyarrow as pa +import pyarrow.compute as pc import pyarrow.csv as csv import pyarrow.parquet as parquet @@ -60,10 +63,8 @@ class Dependencies: """ # noqa: E501 def __init__(self): - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) - self._df = self._set_dtypes(self._df) - # pyarrow schema - # used for reading and writing files + # Store dependencies as an in-memory PyArrow table + # Initialize with empty table self._schema = pa.schema( [ ("file", pa.string()), @@ -80,6 +81,16 @@ def __init__(self): ] ) + # Create empty table with the schema + self._table = pa.table( + {field.name: pa.array([], type=field.type) for field in self._schema}, + schema=self._schema, + ) + + # Create indices for fast lookups + # We'll use dictionaries to cache file->row_index mappings + self._file_index = {} + def __call__(self) -> pd.DataFrame: r"""Return dependencies as a table. @@ -87,7 +98,8 @@ def __call__(self) -> pd.DataFrame: table with dependencies """ - return self._df + df = self._table_to_dataframe(self._table) + return df def __contains__(self, file: str) -> bool: r"""Check if file is part of dependencies. @@ -99,7 +111,7 @@ def __contains__(self, file: str) -> bool: ``True`` if a dependency to the file exists """ - return file in self._df.index + return file in self._file_index def __eq__(self, other: "Dependencies") -> bool: r"""Check if two dependency tables are equal. @@ -111,7 +123,8 @@ def __eq__(self, other: "Dependencies") -> bool: ``True`` if both dependency tables have the same entries """ - return self._df.equals(other._df) + # Compare by converting to DataFrames + return self().equals(other()) def __getitem__(self, file: str) -> list: r"""File information. 
@@ -123,14 +136,107 @@ def __getitem__(self, file: str) -> list: list with meta information """ - return self._df.loc[file].tolist() + if file not in self._file_index: + raise KeyError(file) + + row_idx = self._file_index[file] + row = self._table.slice(row_idx, 1) + + # Return all columns except 'file' as a list + return [ + row["archive"][0].as_py(), + row["bit_depth"][0].as_py(), + row["channels"][0].as_py(), + row["checksum"][0].as_py(), + row["duration"][0].as_py(), + row["format"][0].as_py(), + row["removed"][0].as_py(), + row["sampling_rate"][0].as_py(), + row["type"][0].as_py(), + row["version"][0].as_py(), + ] def __len__(self) -> int: r"""Number of all media, table, attachment files.""" - return len(self._df) + return len(self._table) def __str__(self) -> str: # noqa: D105 - return str(self._df) + return str(self()) + + def __getstate__(self): + """Make object serializable.""" + # Get all data as a DataFrame for serialization + df = self() + # Return the DataFrame data for reconstruction + return { + "data": df.to_dict("records"), + "index": df.index.tolist(), + } + + def __setstate__(self, state): + """Restore object from serialized state.""" + # Recreate the schema + self._schema = pa.schema( + [ + ("file", pa.string()), + ("archive", pa.string()), + ("bit_depth", pa.int32()), + ("channels", pa.int32()), + ("checksum", pa.string()), + ("duration", pa.float64()), + ("format", pa.string()), + ("removed", pa.int32()), + ("sampling_rate", pa.int32()), + ("type", pa.int32()), + ("version", pa.string()), + ] + ) + + # Restore the data from serialized state + if state["data"]: + data = state["data"] + index = state["index"] + + # Build lists for each column + files = index + archives = [row["archive"] for row in data] + bit_depths = [row["bit_depth"] for row in data] + channels = [row["channels"] for row in data] + checksums = [row["checksum"] for row in data] + durations = [row["duration"] for row in data] + formats = [row["format"] for row in data] + removed = [row["removed"] for row in data] + sampling_rates = [row["sampling_rate"] for row in data] + types = [row["type"] for row in data] + versions = [row["version"] for row in data] + + # Create PyArrow table + self._table = pa.table( + { + "file": pa.array(files, type=pa.string()), + "archive": pa.array(archives, type=pa.string()), + "bit_depth": pa.array(bit_depths, type=pa.int32()), + "channels": pa.array(channels, type=pa.int32()), + "checksum": pa.array(checksums, type=pa.string()), + "duration": pa.array(durations, type=pa.float64()), + "format": pa.array(formats, type=pa.string()), + "removed": pa.array(removed, type=pa.int32()), + "sampling_rate": pa.array(sampling_rates, type=pa.int32()), + "type": pa.array(types, type=pa.int32()), + "version": pa.array(versions, type=pa.string()), + }, + schema=self._schema, + ) + + # Rebuild file index + self._file_index = {file: idx for idx, file in enumerate(files)} + else: + # Create empty table + self._table = pa.table( + {field.name: pa.array([], type=field.type) for field in self._schema}, + schema=self._schema, + ) + self._file_index = {} @property def archives(self) -> list[str]: @@ -140,7 +246,10 @@ def archives(self) -> list[str]: list of archives """ - return sorted(self._df.archive.unique().tolist()) + if len(self._table) == 0: + return [] + archives = pc.unique(self._table["archive"]) + return sorted([a.as_py() for a in archives]) @property def attachments(self) -> list[str]: @@ -150,9 +259,9 @@ def attachments(self) -> list[str]: list of attachments """ - return self._df[ - 
self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].index.tolist() + mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["attachment"]) + filtered = self._table.filter(mask) + return [f.as_py() for f in filtered["file"]] @property def attachment_ids(self) -> list[str]: @@ -162,9 +271,9 @@ def attachment_ids(self) -> list[str]: list of attachment IDs """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].archive.tolist() + mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["attachment"]) + filtered = self._table.filter(mask) + return [a.as_py() for a in filtered["archive"]] @property def files(self) -> list[str]: @@ -174,7 +283,7 @@ def files(self) -> list[str]: list of files """ - return self._df.index.tolist() + return [f.as_py() for f in self._table["file"]] @property def media(self) -> list[str]: @@ -184,9 +293,9 @@ def media(self) -> list[str]: list of media """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["media"] - ].index.tolist() + mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["media"]) + filtered = self._table.filter(mask) + return [f.as_py() for f in filtered["file"]] @property def removed_media(self) -> list[str]: @@ -196,10 +305,11 @@ def removed_media(self) -> list[str]: list of media """ - return self._df[ - (self._df["type"] == define.DEPENDENCY_TYPE["media"]) - & (self._df["removed"] == 1) - ].index.tolist() + type_mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["media"]) + removed_mask = pc.equal(self._table["removed"], 1) + combined_mask = pc.and_(type_mask, removed_mask) + filtered = self._table.filter(combined_mask) + return [f.as_py() for f in filtered["file"]] @property def table_ids(self) -> list[str]: @@ -223,9 +333,9 @@ def tables(self) -> list[str]: list of tables """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["meta"] - ].index.tolist() + mask = pc.equal(self._table["type"], define.DEPENDENCY_TYPE["meta"]) + filtered = self._table.filter(mask) + return [f.as_py() for f in filtered["file"]] def archive(self, file: str) -> str: r"""Name of archive the file belongs to. @@ -237,7 +347,7 @@ def archive(self, file: str) -> str: archive name """ - return self._df.archive[file] + return self._column_loc("archive", file) def bit_depth(self, file: str) -> int: r"""Bit depth of media file. @@ -300,57 +410,71 @@ def format(self, file: str) -> str: return self._column_loc("format", file) def load(self, path: str): - r"""Read dependencies from file. + r"""Read dependencies from file or folder. Clears existing dependencies. Args: - path: path to file. - File extension can be ``csv`` - ``pkl``, - or ``parquet`` + path: path to file or folder. + For files, extension can be ``csv``, + ``parquet``, or ``lance``. + For folders, name should end with ``lancedb``. 
Raises: ValueError: if file extension is not one of - ``csv``, ``pkl``, ``parquet`` - FileNotFoundError: if ``path`` does not exists + ``csv`` or ``parquet``, + and folder name does not end with ``lancedb`` + FileNotFoundError: if ``path`` does not exist """ - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) path = audeer.path(path) - extension = audeer.file_extension(path) - if extension not in ["csv", "pkl", "parquet"]: - raise ValueError( - f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet' " - f"not '{extension}'" - ) - if not os.path.exists(path): - raise FileNotFoundError( - errno.ENOENT, - os.strerror(errno.ENOENT), - path, - ) - if extension == "pkl": - self._df = pd.read_pickle(path) - # Correct dtypes - # to make backward compatiple - # with old pickle files in cache - self._df = self._set_dtypes(self._df) - - elif extension == "csv": - table = csv.read_csv( - path, - read_options=csv.ReadOptions( - column_names=self._schema.names, - skip_rows=1, - ), - convert_options=csv.ConvertOptions(column_types=self._schema), - ) - self._df = self._table_to_dataframe(table) - elif extension == "parquet": - table = parquet.read_table(path) - self._df = self._table_to_dataframe(table) + # Check if path is a lancedb folder (based on name ending with 'lancedb') + if path.endswith("lancedb"): + if not os.path.exists(path): + raise FileNotFoundError( + errno.ENOENT, + os.strerror(errno.ENOENT), + path, + ) + # Read from LanceDB folder + db = lancedb.connect(path) + lance_table = db.open_table(define.DEPENDENCY_TABLE_NAME) + table = lance_table.to_arrow() + else: + extension = audeer.file_extension(path) + if extension not in ["csv", "parquet"]: + raise ValueError( + f"File extension of 'path' has to be 'csv' or 'parquet', " + f"not '{extension}'" + ) + if not os.path.exists(path): + raise FileNotFoundError( + errno.ENOENT, + os.strerror(errno.ENOENT), + path, + ) + + # Load data based on format + if extension == "csv": + # Read from CSV file + # Provide explicit column names and skip the header row + table = csv.read_csv( + path, + read_options=csv.ReadOptions( + column_names=self._schema.names, + skip_rows=1, + ), + convert_options=csv.ConvertOptions(column_types=self._schema), + ) + + elif extension == "parquet": + # Read from Parquet file + table = parquet.read_table(path) + + # Set the table and rebuild index + self._table = table + self._rebuild_index() def removed(self, file: str) -> bool: r"""Check if file is marked as removed. @@ -377,29 +501,38 @@ def sampling_rate(self, file: str) -> int: return self._column_loc("sampling_rate", file, int) def save(self, path: str): - r"""Write dependencies to file. + r"""Write dependencies to file or folder. Args: - path: path to file. - File extension can be ``csv``, ``pkl``, or ``parquet`` + path: path to file or folder. + For files, extension can be ``csv`` or ``parquet``. + For folders, name should end with ``lancedb``. 
""" path = audeer.path(path) if path.endswith("csv"): - table = self._dataframe_to_table(self._df) + # Write to CSV + df = self() + table = self._dataframe_to_table(df) csv.write_csv( table, path, write_options=csv.WriteOptions(quoting_style="none"), ) - elif path.endswith("pkl"): - self._df.to_pickle( - path, - protocol=4, # supported by Python >= 3.4 - ) elif path.endswith("parquet"): - table = self._dataframe_to_table(self._df, file_column=True) + # Write to Parquet + df = self() + table = self._dataframe_to_table(df, file_column=True) parquet.write_table(table, path) + elif path.endswith("lancedb"): + # Write to LanceDB folder + # Remove existing folder if it exists + if os.path.exists(path): + shutil.rmtree(path) + + # Create lancedb database and table + db = lancedb.connect(path) + db.create_table(define.DEPENDENCY_TABLE_NAME, self._table) def type(self, file: str) -> int: r"""Type of file. @@ -443,18 +576,31 @@ def _add_attachment( """ format = audeer.file_extension(file).lower() - self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["attachment"], # type - version, # version - ] + # Create a new row + new_row = pa.table( + { + "file": [file], + "archive": [archive], + "bit_depth": [0], + "channels": [0], + "checksum": [checksum], + "duration": [0.0], + "format": [format], + "removed": [0], + "sampling_rate": [0], + "type": [define.DEPENDENCY_TYPE["attachment"]], + "version": [version], + }, + schema=self._schema, + ) + + # Remove existing entry if present + if file in self._file_index: + self._drop([file]) + + # Append new row + self._table = pa.concat_tables([self._table, new_row]) + self._file_index[file] = len(self._table) - 1 def _add_media( self, @@ -481,12 +627,47 @@ def _add_media( where each tuple holds the values of a new media entry """ - df = pd.DataFrame.from_records( - values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df = pd.concat([self._df, df]) + if not values: + return + + # Unpack values into column lists + files = [v[0] for v in values] + archives = [v[1] for v in values] + bit_depths = [v[2] for v in values] + channels = [v[3] for v in values] + checksums = [v[4] for v in values] + durations = [v[5] for v in values] + formats = [v[6] for v in values] + removed = [v[7] for v in values] + sampling_rates = [v[8] for v in values] + types = [v[9] for v in values] + versions = [v[10] for v in values] + + # Create new table + new_rows = pa.table( + { + "file": pa.array(files, type=pa.string()), + "archive": pa.array(archives, type=pa.string()), + "bit_depth": pa.array(bit_depths, type=pa.int32()), + "channels": pa.array(channels, type=pa.int32()), + "checksum": pa.array(checksums, type=pa.string()), + "duration": pa.array(durations, type=pa.float64()), + "format": pa.array(formats, type=pa.string()), + "removed": pa.array(removed, type=pa.int32()), + "sampling_rate": pa.array(sampling_rates, type=pa.int32()), + "type": pa.array(types, type=pa.int32()), + "version": pa.array(versions, type=pa.string()), + }, + schema=self._schema, + ) + + # Append new rows + self._table = pa.concat_tables([self._table, new_rows]) + + # Update index + start_idx = len(self._table) - len(values) + for i, file in enumerate(files): + self._file_index[file] = start_idx + i def _add_meta( self, @@ -508,18 +689,31 @@ def _add_meta( else: archive = os.path.splitext(file[3:])[0] 
- self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["meta"], # type - version, # version - ] + # Create a new row + new_row = pa.table( + { + "file": [file], + "archive": [archive], + "bit_depth": [0], + "channels": [0], + "checksum": [checksum], + "duration": [0.0], + "format": [format], + "removed": [0], + "sampling_rate": [0], + "type": [define.DEPENDENCY_TYPE["meta"]], + "version": [version], + }, + schema=self._schema, + ) + + # Remove existing entry if present + if file in self._file_index: + self._drop([file]) + + # Append new row + self._table = pa.concat_tables([self._table, new_row]) + self._file_index[file] = len(self._table) - 1 def _column_loc( self, @@ -538,11 +732,23 @@ def _column_loc( scalar value """ - value = self._df.at[file, column] + if file not in self._file_index: + raise KeyError(file) + + row_idx = self._file_index[file] + value = self._table[column][row_idx].as_py() + if dtype is not None: value = dtype(value) return value + def _rebuild_index(self): + r"""Rebuild the file index from the current table.""" + self._file_index = {} + files = self._table["file"].to_pylist() + for idx, file in enumerate(files): + self._file_index[file] = idx + def _dataframe_to_table( self, df: pd.DataFrame, @@ -566,6 +772,8 @@ def _dataframe_to_table( preserve_index=False, schema=self._schema, ) + # Combine chunks to avoid multi-chunk columns that cause CSV writing issues + table = table.combine_chunks() if not file_column: columns = table.column_names columns = ["" if c == "file" else c for c in columns] @@ -579,13 +787,18 @@ def _drop(self, files: Sequence[str]): files: relative file paths """ - # self._df.drop is slow, - # see https://stackoverflow.com/a/53394627. - # The solution presented in https://stackoverflow.com/a/53395360 - # self._df = self._df.loc[self._df.index.drop(files)] - # which is claimed to be faster, - # isn't. - self._df = self._df[~self._df.index.isin(files)] + if not files: + return + + # Create a mask for rows to keep + files_to_drop = set(files) + mask = pc.invert(pc.is_in(self._table["file"], pa.array(list(files_to_drop)))) + + # Filter the table + self._table = self._table.filter(mask) + + # Rebuild index + self._rebuild_index() def _remove(self, file: str): r"""Mark file as removed. 
@@ -594,7 +807,21 @@ def _remove(self, file: str): file: relative file path """ - self._df.at[file, "removed"] = 1 + if file not in self._file_index: + return + + row_idx = self._file_index[file] + + # Create a new removed column with the updated value + removed_array = self._table["removed"].to_pylist() + removed_array[row_idx] = 1 + + # Replace the removed column + self._table = self._table.set_column( + self._table.schema.get_field_index("removed"), + "removed", + pa.array(removed_array, type=pa.int32()), + ) @staticmethod def _set_dtypes(df: pd.DataFrame) -> pd.DataFrame: @@ -663,13 +890,64 @@ def _update_media( values: list of tuples, where each tuple holds the new values for a media entry + Raises: + KeyError: if a file in values does not exist in dependencies + """ - df = pd.DataFrame.from_records( - values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df.loc[df.index] = df + if not values: + return + + # Check if all files exist before updating + for value in values: + file = value[0] + if file not in self._file_index: + raise KeyError(file) + + # Convert table to mutable lists + files_list = self._table["file"].to_pylist() + archives_list = self._table["archive"].to_pylist() + bit_depths_list = self._table["bit_depth"].to_pylist() + channels_list = self._table["channels"].to_pylist() + checksums_list = self._table["checksum"].to_pylist() + durations_list = self._table["duration"].to_pylist() + formats_list = self._table["format"].to_pylist() + removed_list = self._table["removed"].to_pylist() + sampling_rates_list = self._table["sampling_rate"].to_pylist() + types_list = self._table["type"].to_pylist() + versions_list = self._table["version"].to_pylist() + + # Update values + for value in values: + file = value[0] + row_idx = self._file_index[file] + archives_list[row_idx] = value[1] + bit_depths_list[row_idx] = value[2] + channels_list[row_idx] = value[3] + checksums_list[row_idx] = value[4] + durations_list[row_idx] = value[5] + formats_list[row_idx] = value[6] + removed_list[row_idx] = value[7] + sampling_rates_list[row_idx] = value[8] + types_list[row_idx] = value[9] + versions_list[row_idx] = value[10] + + # Rebuild table + self._table = pa.table( + { + "file": pa.array(files_list, type=pa.string()), + "archive": pa.array(archives_list, type=pa.string()), + "bit_depth": pa.array(bit_depths_list, type=pa.int32()), + "channels": pa.array(channels_list, type=pa.int32()), + "checksum": pa.array(checksums_list, type=pa.string()), + "duration": pa.array(durations_list, type=pa.float64()), + "format": pa.array(formats_list, type=pa.string()), + "removed": pa.array(removed_list, type=pa.int32()), + "sampling_rate": pa.array(sampling_rates_list, type=pa.int32()), + "type": pa.array(types_list, type=pa.int32()), + "version": pa.array(versions_list, type=pa.string()), + }, + schema=self._schema, + ) def _update_media_version( self, @@ -683,7 +961,24 @@ def _update_media_version( version: version string """ - self._df.loc[files, "version"] = version + if not files: + return + + # Convert version column to mutable list + versions_list = self._table["version"].to_pylist() + + # Update versions for specified files + for file in files: + if file in self._file_index: + row_idx = self._file_index[file] + versions_list[row_idx] = version + + # Replace the version column + self._table = self._table.set_column( + self._table.schema.get_field_index("version"), + "version", + pa.array(versions_list, type=pa.string()), + ) def 
error_message_missing_object( @@ -805,33 +1100,54 @@ def download_dependencies( """ with tempfile.TemporaryDirectory() as tmp_root: - # Load `db.parquet` file, - # or if non-existent `db.zip` - # from backend - remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE) + # Try to load in order: + # db.lancedb.zip (current), db.parquet, db.zip (legacy CSV) + + # First, try LanceDB (current format, stored as zip archive) + remote_deps_file = backend_interface.join( + "/", name, define.DEPENDENCY_FILE + ".zip" + ) if backend_interface.exists(remote_deps_file, version): - local_deps_file = os.path.join(tmp_root, define.DEPENDENCY_FILE) - backend_interface.get_file( - remote_deps_file, - local_deps_file, - version, - verbose=verbose, - ) - else: - remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") - local_deps_file = os.path.join( - tmp_root, - define.LEGACY_DEPENDENCY_FILE, - ) + local_deps_path = os.path.join(tmp_root, define.DEPENDENCY_FILE) backend_interface.get_archive( remote_deps_file, tmp_root, version, verbose=verbose, ) - # Create deps object from downloaded file + else: + # Try parquet (previous format) + remote_deps_file = backend_interface.join( + "/", name, define.PARQUET_DEPENDENCY_FILE + ) + if backend_interface.exists(remote_deps_file, version): + local_deps_path = os.path.join( + tmp_root, define.PARQUET_DEPENDENCY_FILE + ) + backend_interface.get_file( + remote_deps_file, + local_deps_path, + version, + verbose=verbose, + ) + else: + # Fall back to legacy CSV format + remote_deps_file = backend_interface.join( + "/", name, define.DB + ".zip" + ) + local_deps_path = os.path.join( + tmp_root, + define.LEGACY_DEPENDENCY_FILE, + ) + backend_interface.get_archive( + remote_deps_file, + tmp_root, + version, + verbose=verbose, + ) + # Create deps object from downloaded file/folder deps = Dependencies() - deps.load(local_deps_file) + deps.load(local_deps_path) return deps @@ -844,9 +1160,9 @@ def upload_dependencies( ): r"""Upload dependency file to backend. - Store a dependency file + Store a dependency folder in the local database root folder, - and upload it to the backend. + and upload it as a zip archive to the backend. 
Args: backend_interface: backend interface @@ -856,7 +1172,24 @@ def upload_dependencies( version: database version """ - local_deps_file = os.path.join(db_root, define.DEPENDENCY_FILE) - remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE) - deps.save(local_deps_file) - backend_interface.put_file(local_deps_file, remote_deps_file, version) + local_deps_folder = os.path.join(db_root, define.DEPENDENCY_FILE) + remote_deps_file = backend_interface.join( + "/", name, define.DEPENDENCY_FILE + ".zip" + ) + deps.save(local_deps_folder) + # Get all files inside the lancedb folder + # audeer.list_file_names returns files with basenames=False by default + lancedb_files = audeer.list_file_names( + local_deps_folder, + basenames=False, + recursive=True, + ) + # Convert to relative paths from db_root for put_archive + relative_files = [os.path.relpath(f, db_root) for f in lancedb_files] + # Upload as archive (zip the folder contents) + backend_interface.put_archive( + db_root, + remote_deps_file, + version, + files=relative_files, + ) diff --git a/audb/core/load.py b/audb/core/load.py index bd4aefd5..18fdc4ac 100644 --- a/audb/core/load.py +++ b/audb/core/load.py @@ -471,7 +471,9 @@ def job(archive: str, version: str): tmp_root=db_root_tmp, ) # media files that can be changed to a requested flavor - flavor_files = deps._df[deps._df.sampling_rate != 0].index + # Get files with sampling_rate != 0 (audio files) + df = deps() + flavor_files = df[df.sampling_rate != 0].index for file in files: if os.name == "nt": # pragma: no cover file = file.replace(os.sep, "/") diff --git a/audb/core/load_to.py b/audb/core/load_to.py index d2dc35c8..7c45d9d4 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -2,6 +2,7 @@ from collections.abc import Sequence import os +import shutil import audbackend import audeer @@ -447,11 +448,12 @@ def load_to( # save dependencies dep_path_tmp = os.path.join(db_root_tmp, define.DEPENDENCY_FILE) + dep_path_final = os.path.join(db_root, define.DEPENDENCY_FILE) deps.save(dep_path_tmp) - audeer.move_file( - dep_path_tmp, - os.path.join(db_root, define.DEPENDENCY_FILE), - ) + # lancedb is a folder, so we use shutil.move + if os.path.exists(dep_path_final): + shutil.rmtree(dep_path_final) + shutil.move(dep_path_tmp, dep_path_final) # save database and PKL tables diff --git a/audb/core/publish.py b/audb/core/publish.py index 658f2975..161084ff 100644 --- a/audb/core/publish.py +++ b/audb/core/publish.py @@ -84,8 +84,8 @@ def _find_attachments( r"""Find altered, new or removed attachments and update 'deps'.""" # drop removed attachments from dependency table removed_attachments = [ - deps._df.index[deps._df.archive == attachment_id][0] - for attachment_id in deps.attachment_ids + attachment + for attachment, attachment_id in zip(deps.attachments, deps.attachment_ids) if attachment_id not in db.attachments ] deps._drop(removed_attachments) @@ -776,7 +776,11 @@ def publish( # load database and dependencies deps = Dependencies() - for deps_file in [define.DEPENDENCY_FILE, define.LEGACY_DEPENDENCY_FILE]: + deps_files = [ + define.DEPENDENCY_FILE, + define.LEGACY_DEPENDENCY_FILE, + ] + for deps_file in deps_files: deps_path = os.path.join(db_root, deps_file) if os.path.exists(deps_path): deps.load(deps_path) diff --git a/docs/publish.rst b/docs/publish.rst index 6509c416..c956655a 100644 --- a/docs/publish.rst +++ b/docs/publish.rst @@ -126,7 +126,7 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.lancedb.zip db.yaml media/ 1.0.0/ @@ -142,7 +142,7 @@ 
inside the ``media/`` folder, all tables inside the ``meta/`` folder, the database header in the file ``db.yaml``, and the database dependencies -in the file ``db.parquet``. +in the file ``db.lancedb.zip``. Note, that the structure of the folders used for versioning @@ -228,10 +228,10 @@ and will only publish those. >>> deps() archive bit_depth ... type version -db.age.parquet 0 ... 0 1.1.0 audio/001.wav 436c65ec-1e42-f9de-2708-ecafe07e827e 16 ... 1 1.0.0 audio/002.wav fda7e4d6-f2b2-4cff-cab5-906ef5d57607 16 ... 1 1.0.0 audio/003.wav e26ef45d-bdc1-6153-bdc4-852d83806e4a 16 ... 1 1.0.0 +db.age.parquet 0 ... 0 1.1.0 audio/004.wav ef4d1e81-6488-95cf-a165-604d1e47d575 16 ... 1 1.1.0 [5 rows x 10 columns] @@ -247,10 +247,10 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.lancedb.zip db.yaml 1.1.0/ - db.parquet + db.lancedb.zip db.yaml media/ 1.0.0/ diff --git a/pyproject.toml b/pyproject.toml index 2f6b857b..adfb492b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ dependencies = [ 'audobject >=0.5.0', 'audresample >=0.1.6', 'filelock', + 'lancedb', 'oyaml', 'pandas >=2.1.0', 'pyarrow', diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 618e7757..4c7de1b9 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -51,8 +51,6 @@ "version": "1.0.0", }, ] - - def get_entries(column): return [row[column] for row in ROWS] @@ -66,13 +64,25 @@ def test_get_entries(): ) def deps(): deps = audb.Dependencies() - df = pd.DataFrame.from_records(ROWS) - df.set_index("file", inplace=True) - # Ensure correct dtype - df.index = df.index.astype(audb.core.define.DEPENDENCY_INDEX_DTYPE) - df.index.name = None - df = df.astype(audb.core.define.DEPENDENCY_TABLE) - deps._df = df + # Insert test data directly using _add_media for all types + # to preserve the exact test data including archive values + media_and_meta_rows = [ + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ) + for row in ROWS + ] + deps._add_media(media_and_meta_rows) return deps @@ -80,10 +90,10 @@ def test_instantiation(): r"""Test instantiation of audb.Dependencies. During instantiation of ``audb.Dependencies`` - an empty dataframe is created under ``self._df``, + an empty PyArrow table is created under ``self._table``, that stores the dependency table. This test ensures, - that the dataframe + that the table contains the correct column names and data types, and the correct name and data type of its index. @@ -106,9 +116,8 @@ def test_instantiation(): audb.core.define.DEPENDENCY_INDEX_DTYPE ) expected_df = expected_df.astype(audb.core.define.DEPENDENCY_TABLE) - pd.testing.assert_frame_equal(deps._df, expected_df) - assert list(deps._df.columns) == expected_columns df = deps() + pd.testing.assert_frame_equal(df, expected_df) assert list(df.columns) == expected_columns @@ -139,10 +148,45 @@ def test_equals(deps): assert deps != audb.Dependencies() # example table vs. 
example table assert deps == deps - _deps._df = deps._df.copy() + # Copy data to new Dependencies object + _deps = audb.Dependencies() + media_and_meta_rows = [ + ( + row["file"], + row["archive"], + row["bit_depth"], + row["channels"], + row["checksum"], + row["duration"], + row["format"], + row["removed"], + row["sampling_rate"], + row["type"], + row["version"], + ) + for row in ROWS + ] + _deps._add_media(media_and_meta_rows) assert deps == _deps # example table vs. different table - _deps._df.loc["db.files.csv", "channels"] = 4 + # Modify one row to make it different + _deps._update_media( + [ + ( + "file.wav", + "archive2", + 16, + 4, # changed from 2 to 4 + "917338b854ad9c72f76bc9a68818dcd8", + 1.23, + "wav", + 0, + 16000, + 1, + "1.0.0", + ) + ] + ) assert deps != _deps @@ -250,7 +294,7 @@ def test_removed(deps): deps.removed("non.existing") -@pytest.mark.parametrize("file", ["deps.csv", "deps.pkl", "deps.parquet"]) +@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet", "deps.lancedb"]) def test_load_save(tmpdir, deps, file): """Test consistency of dependency table after save/load cycle. @@ -265,76 +309,7 @@ def test_load_save(tmpdir, deps, file): deps2 = audb.Dependencies() deps2.load(deps_file) pd.testing.assert_frame_equal(deps(), deps2()) - assert list(deps2._df.dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) - - -def test_load_save_backward_compatibility(tmpdir, deps): - """Test backward compatibility with old pickle cache files. - - As the dtype of the index has changed, - we need to make sure this is corrected - when loading old cache files. - - Old behaviour (audb<1.7): - - archive string[python] - bit_depth int32 - channels int32 - checksum string[python] - duration float64 - format string[python] - removed int32 - sampling_rate int32 - type int32 - version string[python] - - New behaviour (audb>=1.7): - - archive string[pyarrow] - bit_depth int32[pyarrow] - channels int32[pyarrow] - checksum string[pyarrow] - duration double[pyarrow] - format string[pyarrow] - removed int32[pyarrow] - sampling_rate int32[pyarrow] - type int32[pyarrow] - version string[pyarrow] - - """ - deps_file = audeer.path(tmpdir, "deps.pkl") - - deps_old = audb.Dependencies() - deps_old._df = deps._df.copy() - - # Change dtype of index from object to string - # to mimic previous behavior - deps_old._df.index = deps_old._df.index.astype("string") - # Change dtype of columns - # to mimic previous behavior - deps_old._df = deps_old._df.astype( - { - "archive": "string", - "bit_depth": "int32", - "channels": "int32", - "checksum": "string", - "duration": "float64", - "format": "string", - "removed": "int32", - "sampling_rate": "int32", - "type": "int32", - "version": "string", - } - ) - deps_old.save(deps_file) - - # Check that we get the correct dtypes, - # when loading from cache - deps2 = audb.Dependencies() - deps2.load(deps_file) - assert deps2._df.index.dtype == audb.core.define.DEPENDENCY_INDEX_DTYPE - pd.testing.assert_frame_equal(deps._df, deps2._df) - assert deps == deps2 + assert list(deps2().dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) def test_load_save_errors(deps): @@ -394,7 +369,8 @@ def test_str(deps): ) print(str(deps)) assert expected_str.match(str(deps)) - assert expected_str.match(deps._df.to_string()) + # str(deps) now calls __str__ which calls __call__ which returns a DataFrame + assert expected_str.match(deps().to_string()) # === Test hidden methods === diff --git a/tests/test_load.py b/tests/test_load.py index 3a3f6d61..2bdd12bf 100644 --- 
a/tests/test_load.py +++ b/tests/test_load.py @@ -133,7 +133,7 @@ def dbs(tmpdir_factory, persistent_repository, storage_format): db.save(db_root, storage_format=storage_format) audformat.testing.create_audio_files(db) - shutil.copy( + shutil.copytree( audeer.path(previous_db_root, audb.core.define.DEPENDENCY_FILE), audeer.path(db_root, audb.core.define.DEPENDENCY_FILE), ) @@ -160,7 +160,7 @@ def dbs(tmpdir_factory, persistent_repository, storage_format): db.save(db_root, storage_format=storage_format) audformat.testing.create_audio_files(db) - shutil.copy( + shutil.copytree( audeer.path(previous_db_root, audb.core.define.DEPENDENCY_FILE), audeer.path(db_root, audb.core.define.DEPENDENCY_FILE), ) @@ -196,7 +196,7 @@ def dbs(tmpdir_factory, persistent_repository, storage_format): os.remove(audeer.path(db_root, file)) db.save(db_root, storage_format=storage_format) - shutil.copy( + shutil.copytree( os.path.join(previous_db_root, audb.core.define.DEPENDENCY_FILE), os.path.join(db_root, audb.core.define.DEPENDENCY_FILE), ) @@ -224,7 +224,7 @@ def dbs(tmpdir_factory, persistent_repository, storage_format): db.drop_tables("train") db.save(db_root, storage_format=storage_format) audformat.testing.create_audio_files(db) - shutil.copy( + shutil.copytree( os.path.join(previous_db_root, audb.core.define.DEPENDENCY_FILE), os.path.join(db_root, audb.core.define.DEPENDENCY_FILE), ) diff --git a/tests/test_publish.py b/tests/test_publish.py index 15306c71..ebbff0c5 100644 --- a/tests/test_publish.py +++ b/tests/test_publish.py @@ -1044,7 +1044,7 @@ def test_publish_error_cross_repository(tmpdir): audb.publish(db_path_v2, "2.0.0", repo2, previous_version="1.0.0") # Publishing to repo2 with previous_version=None should work - os.remove(audeer.path(db_path, "db.parquet")) + shutil.rmtree(audeer.path(db_path, audb.core.define.DEPENDENCY_FILE)) audb.publish(db_path, "2.0.0", repo2, previous_version=None) # Assert that the new version appears in repo2 @@ -1227,7 +1227,7 @@ def test_update_database(dbs, persistent_repository): dbs[version], audb.core.define.DEPENDENCY_FILE, ) - os.remove(dep_file) + shutil.rmtree(dep_file) error_msg = ( f"You want to depend on '{previous_version}' " f"of {DB_NAME}, " diff --git a/tests/test_publish_table.py b/tests/test_publish_table.py index 6ad7e3f0..3cece588 100644 --- a/tests/test_publish_table.py +++ b/tests/test_publish_table.py @@ -90,7 +90,7 @@ def assert_db_published_to_repo( """ repo = audeer.path(repository.host, repository.name) - dependency_file = "db.parquet" + dependency_file = "db.lancedb.zip" header_file = "db.yaml" files = list(db.files) tables = list(db)
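
Illustrative note (not part of the diff): the reworked ``Dependencies.save()``/``load()`` store the dependency table in a LanceDB folder instead of a pickle or parquet-only file. Below is a minimal sketch of the round trip those methods perform, restricted to the calls that appear in the diff (``lancedb.connect``, ``create_table``, ``open_table``, ``to_arrow``); the folder path and the single example row are invented for illustration.

import os
import shutil
import tempfile

import lancedb
import pyarrow as pa

# Same column layout as ``Dependencies._schema`` in the diff
schema = pa.schema(
    [
        ("file", pa.string()),
        ("archive", pa.string()),
        ("bit_depth", pa.int32()),
        ("channels", pa.int32()),
        ("checksum", pa.string()),
        ("duration", pa.float64()),
        ("format", pa.string()),
        ("removed", pa.int32()),
        ("sampling_rate", pa.int32()),
        ("type", pa.int32()),
        ("version", pa.string()),
    ]
)
# One invented media entry
table = pa.table(
    {
        "file": ["audio/001.wav"],
        "archive": ["archive1"],
        "bit_depth": [16],
        "channels": [1],
        "checksum": ["d41d8cd98f00b204e9800998ecf8427e"],
        "duration": [1.5],
        "format": ["wav"],
        "removed": [0],
        "sampling_rate": [16000],
        "type": [1],
        "version": ["1.0.0"],
    },
    schema=schema,
)

folder = os.path.join(tempfile.mkdtemp(), "db.lancedb")

# What ``save()`` does for a path ending in "lancedb":
# recreate the folder and write the table under the name "dependencies"
if os.path.exists(folder):
    shutil.rmtree(folder)
db = lancedb.connect(folder)
db.create_table("dependencies", table)

# What ``load()`` does: open the folder and read it back as a PyArrow table
restored = lancedb.connect(folder).open_table("dependencies").to_arrow()
assert restored.to_pylist() == table.to_pylist()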
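
A second illustrative sketch (also not from the diff) of the ``pyarrow.compute`` pattern shared by the reworked properties (``attachments``, ``media``, ``removed_media``, ``tables``): build a boolean mask with ``pc.equal``/``pc.and_`` and filter the in-memory table. The three-row table is invented; the type codes 0 (meta) and 1 (media) follow the ``deps()`` output shown in docs/publish.rst above.

import pyarrow as pa
import pyarrow.compute as pc

# Invented miniature dependency table: two media files and one meta table
table = pa.table(
    {
        "file": ["audio/001.wav", "audio/002.wav", "db.files.parquet"],
        "type": pa.array([1, 1, 0], type=pa.int32()),  # 1 = media, 0 = meta
        "removed": pa.array([0, 1, 0], type=pa.int32()),
    }
)

# ``media``: all rows whose type is "media"
media_mask = pc.equal(table["type"], 1)
media = [f.as_py() for f in table.filter(media_mask)["file"]]

# ``removed_media``: media rows that are also flagged as removed
removed_mask = pc.and_(media_mask, pc.equal(table["removed"], 1))
removed_media = [f.as_py() for f in table.filter(removed_mask)["file"]]

assert media == ["audio/001.wav", "audio/002.wav"]
assert removed_media == ["audio/002.wav"]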