diff --git a/audb/core/api.py b/audb/core/api.py index 1e4fc8d2..aa03a871 100644 --- a/audb/core/api.py +++ b/audb/core/api.py @@ -224,8 +224,8 @@ def cached( files = audeer.list_file_names(version_path, basenames=True) if ( define.DEPENDENCY_FILE not in files + and define.PARQUET_DEPENDENCY_FILE not in files and define.LEGACY_DEPENDENCY_FILE not in files - and define.CACHED_DEPENDENCY_FILE not in files ): # Skip all cache entries # that don't contain a dependency file @@ -293,28 +293,19 @@ def dependencies( version, cache_root=cache_root, ) - cached_deps_file = os.path.join(db_root, define.CACHED_DEPENDENCY_FILE) + deps_file = os.path.join(db_root, define.DEPENDENCY_FILE) with FolderLock(db_root): try: deps = Dependencies() - deps.load(cached_deps_file) + deps.load(deps_file) except Exception: # does not catch KeyboardInterupt # If loading cached file fails, load again from backend # - # Loading a cache file can fail - # as we use PyArrow data types, - # which when loading from pickle - # are not compatible between all pandas versions. - # We had originally some tests for it, - # but as the actual failure is not that important, - # we removed them in - # See https://github.com/audeering/audb/pull/507 - # backend_interface = utils.lookup_backend(name, version) deps = download_dependencies(backend_interface, name, version, verbose) - # Store as pickle in cache - deps.save(cached_deps_file) + # Store in cache + deps.save(deps_file) return deps diff --git a/audb/core/define.py b/audb/core/define.py index 4dab8d2f..4d52d4e6 100644 --- a/audb/core/define.py +++ b/audb/core/define.py @@ -11,16 +11,22 @@ HEADER_FILE = f"{DB}.yaml" # Dependencies -DEPENDENCY_FILE = f"{DB}.parquet" -r"""Filename and extension of dependency table file.""" +DEPENDENCY_FILE = f"{DB}.arrow" +r"""Filename and extension of dependency table file. -CACHED_DEPENDENCY_FILE = f"{DB}.pkl" -r"""Filename and extension of cached dependency table file. +Since ``audb`` version 1.12.0, +the dependency table is stored in an Apache Arrow IPC file. -As loading from a pickle file is still faster -than loading from a parquet file, -we are storing the dependency table -as a pickle file in cache. +""" + +PARQUET_DEPENDENCY_FILE = f"{DB}.parquet" +r"""Filename and extension of Parquet dependency table file. + +In ``audb`` versions 1.7.0 to 1.7.x, +the dependency table was stored in a Parquet file. +For backward compatibility, +the loader will try this format +if the Arrow IPC file is not found. """ @@ -28,7 +34,7 @@ r"""Filename and extension of legacy dependency table file. In ``audb`` versions smaller than 1.7.0, -the dependency table was stored in a csv file. +the dependency table was stored in a CSV file. 
""" diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py index 84a94cf3..8c99c116 100644 --- a/audb/core/dependencies.py +++ b/audb/core/dependencies.py @@ -9,7 +9,9 @@ import pandas as pd import pyarrow as pa +import pyarrow.compute as pc import pyarrow.csv as csv +import pyarrow.ipc as ipc import pyarrow.parquet as parquet import audbackend @@ -60,8 +62,6 @@ class Dependencies: """ # noqa: E501 def __init__(self): - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) - self._df = self._set_dtypes(self._df) # pyarrow schema # used for reading and writing files self._schema = pa.schema( @@ -79,6 +79,59 @@ def __init__(self): ("version", pa.string()), ] ) + # Internal Arrow table (immutable) + self._table = pa.table( + { + "file": pa.array([], type=pa.string()), + "archive": pa.array([], type=pa.string()), + "bit_depth": pa.array([], type=pa.int32()), + "channels": pa.array([], type=pa.int32()), + "checksum": pa.array([], type=pa.string()), + "duration": pa.array([], type=pa.float64()), + "format": pa.array([], type=pa.string()), + "removed": pa.array([], type=pa.int32()), + "sampling_rate": pa.array([], type=pa.int32()), + "type": pa.array([], type=pa.int32()), + "version": pa.array([], type=pa.string()), + }, + schema=self._schema, + ) + # File path to row index mapping for O(1) lookups + self._file_index: dict[str, int] = {} + # Cached DataFrame for __call__() + self._df_cache: pd.DataFrame | None = None + # Track if table was modified to invalidate cache + self._table_modified: bool = True + + @property + def _df(self) -> pd.DataFrame: + r"""Backward compatibility property for accessing the DataFrame. + + Returns the cached DataFrame representation of the Arrow table. + This property is provided for backward compatibility with code + that directly accesses ``_df``. + + Returns: + DataFrame with dependencies + + """ + return self.__call__() + + @_df.setter + def _df(self, df: pd.DataFrame): + r"""Backward compatibility setter for the DataFrame. + + Converts the DataFrame to an Arrow table and updates internal state. + This setter is provided for backward compatibility with code + that directly sets ``_df`` (e.g., test fixtures). + + Args: + df: DataFrame to set + + """ + self._table = self._dataframe_to_table(df, file_column=True) + self._rebuild_index() + self._invalidate_df_cache() def __call__(self) -> pd.DataFrame: r"""Return dependencies as a table. @@ -87,7 +140,10 @@ def __call__(self) -> pd.DataFrame: table with dependencies """ - return self._df + if self._df_cache is None or self._table_modified: + self._df_cache = self._table_to_dataframe(self._table) + self._table_modified = False + return self._df_cache def __contains__(self, file: str) -> bool: r"""Check if file is part of dependencies. @@ -99,7 +155,7 @@ def __contains__(self, file: str) -> bool: ``True`` if a dependency to the file exists """ - return file in self._df.index + return file in self._file_index def __eq__(self, other: "Dependencies") -> bool: r"""Check if two dependency tables are equal. @@ -111,7 +167,9 @@ def __eq__(self, other: "Dependencies") -> bool: ``True`` if both dependency tables have the same entries """ - return self._df.equals(other._df) + # Compare using DataFrames for compatibility with code + # that modifies _df directly + return self.__call__().equals(other.__call__()) def __getitem__(self, file: str) -> list: r"""File information. 
@@ -123,14 +181,22 @@ def __getitem__(self, file: str) -> list: list with meta information """ - return self._df.loc[file].tolist() + if file not in self._file_index: + raise KeyError(file) + row_idx = self._file_index[file] + # Direct column access is much faster than take() for single rows, + # especially with chunked arrays from parquet/arrow files + return [ + self._table.column(i)[row_idx].as_py() + for i in range(1, self._table.num_columns) + ] def __len__(self) -> int: r"""Number of all media, table, attachment files.""" - return len(self._df) + return len(self._table) def __str__(self) -> str: # noqa: D105 - return str(self._df) + return str(self.__call__()) @property def archives(self) -> list[str]: @@ -140,7 +206,8 @@ def archives(self) -> list[str]: list of archives """ - return sorted(self._df.archive.unique().tolist()) + unique_archives = pc.unique(self._table.column("archive")) + return sorted(unique_archives.to_pylist()) @property def attachments(self) -> list[str]: @@ -150,9 +217,10 @@ def attachments(self) -> list[str]: list of attachments """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].index.tolist() + type_col = self._table.column("type") + mask = pc.equal(type_col, define.DEPENDENCY_TYPE["attachment"]) + filtered_table = self._table.filter(mask) + return filtered_table.column("file").to_pylist() @property def attachment_ids(self) -> list[str]: @@ -162,9 +230,10 @@ def attachment_ids(self) -> list[str]: list of attachment IDs """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["attachment"] - ].archive.tolist() + type_col = self._table.column("type") + mask = pc.equal(type_col, define.DEPENDENCY_TYPE["attachment"]) + filtered_table = self._table.filter(mask) + return filtered_table.column("archive").to_pylist() @property def files(self) -> list[str]: @@ -174,7 +243,7 @@ def files(self) -> list[str]: list of files """ - return self._df.index.tolist() + return self._table.column("file").to_pylist() @property def media(self) -> list[str]: @@ -184,9 +253,10 @@ def media(self) -> list[str]: list of media """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["media"] - ].index.tolist() + type_col = self._table.column("type") + mask = pc.equal(type_col, define.DEPENDENCY_TYPE["media"]) + filtered_table = self._table.filter(mask) + return filtered_table.column("file").to_pylist() @property def removed_media(self) -> list[str]: @@ -196,10 +266,14 @@ def removed_media(self) -> list[str]: list of media """ - return self._df[ - (self._df["type"] == define.DEPENDENCY_TYPE["media"]) - & (self._df["removed"] == 1) - ].index.tolist() + type_col = self._table.column("type") + removed_col = self._table.column("removed") + mask = pc.and_( + pc.equal(type_col, define.DEPENDENCY_TYPE["media"]), + pc.equal(removed_col, 1), + ) + filtered_table = self._table.filter(mask) + return filtered_table.column("file").to_pylist() @property def table_ids(self) -> list[str]: @@ -223,9 +297,25 @@ def tables(self) -> list[str]: list of tables """ - return self._df[ - self._df["type"] == define.DEPENDENCY_TYPE["meta"] - ].index.tolist() + type_col = self._table.column("type") + mask = pc.equal(type_col, define.DEPENDENCY_TYPE["meta"]) + filtered_table = self._table.filter(mask) + return filtered_table.column("file").to_pylist() + + def get_files_with_sampling_rate(self) -> list[str]: + r"""Get files that have a non-zero sampling rate. + + This is a helper method for filtering files that need + flavor conversion based on sampling rate. 
+ + Returns: + list of file paths with non-zero sampling rate + + """ + sampling_rate_col = self._table.column("sampling_rate") + mask = pc.not_equal(sampling_rate_col, 0) + filtered_table = self._table.filter(mask) + return filtered_table.column("file").to_pylist() def archive(self, file: str) -> str: r"""Name of archive the file belongs to. @@ -237,7 +327,7 @@ def archive(self, file: str) -> str: archive name """ - return self._df.archive[file] + return self._column_loc("archive", file) def bit_depth(self, file: str) -> int: r"""Bit depth of media file. @@ -304,38 +394,62 @@ def load(self, path: str): Clears existing dependencies. + Supports auto-detection of file format. + If no extension is provided or the file doesn't exist, + tries loading in order: Arrow IPC (``.arrow``), + Parquet (``.parquet``), CSV (``.csv``). + Args: path: path to file. - File extension can be ``csv`` - ``pkl``, - or ``parquet`` + File extension can be ``arrow``, ``parquet``, or ``csv``. + If the path doesn't exist, will attempt auto-detection + by trying different extensions in order Raises: ValueError: if file extension is not one of - ``csv``, ``pkl``, ``parquet`` - FileNotFoundError: if ``path`` does not exists + ``arrow``, ``parquet``, ``csv`` + FileNotFoundError: if ``path`` does not exist + and auto-detection fails """ - self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys()) path = audeer.path(path) + + # Check extension validity extension = audeer.file_extension(path) - if extension not in ["csv", "pkl", "parquet"]: + # If extension is provided and invalid, raise error + if extension and extension not in ["arrow", "parquet", "csv"]: raise ValueError( - f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet' " + f"File extension of 'path' has to be " + f"'arrow', 'parquet', or 'csv', " f"not '{extension}'" ) - if not os.path.exists(path): - raise FileNotFoundError( - errno.ENOENT, - os.strerror(errno.ENOENT), - path, - ) - if extension == "pkl": - self._df = pd.read_pickle(path) - # Correct dtypes - # to make backward compatiple - # with old pickle files in cache - self._df = self._set_dtypes(self._df) + + # Auto-detection: try to find the file with different extensions + # if file doesn't exist or extension is empty + if not os.path.exists(path) or not extension: + base_path = os.path.splitext(path)[0] + for ext in [".arrow", ".parquet", ".csv"]: + candidate_path = base_path + ext + if os.path.exists(candidate_path): + path = candidate_path + extension = audeer.file_extension(path) + break + else: + # No file found with any extension + raise FileNotFoundError( + errno.ENOENT, + os.strerror(errno.ENOENT), + path, + ) + + if extension == "arrow": + with ipc.open_file(path) as reader: + table = reader.read_all() + self._table = table + + elif extension == "parquet": + table = parquet.read_table(path) + self._table = table elif extension == "csv": table = csv.read_csv( @@ -346,11 +460,11 @@ def load(self, path: str): ), convert_options=csv.ConvertOptions(column_types=self._schema), ) - self._df = self._table_to_dataframe(table) + self._table = table - elif extension == "parquet": - table = parquet.read_table(path) - self._df = self._table_to_dataframe(table) + # Rebuild index and invalidate cache + self._rebuild_index() + self._invalidate_df_cache() def removed(self, file: str) -> bool: r"""Check if file is marked as removed. @@ -381,25 +495,46 @@ def save(self, path: str): Args: path: path to file. 
- File extension can be ``csv``, ``pkl``, or ``parquet`` + File extension can be ``arrow``, ``parquet``, or ``csv`` + + Raises: + ValueError: if file extension is not one of + ``arrow``, ``parquet``, ``csv`` """ path = audeer.path(path) - if path.endswith("csv"): - table = self._dataframe_to_table(self._df) + extension = audeer.file_extension(path) + + # Convert from DataFrame to capture any in-place modifications + # made via _df property + table = self._dataframe_to_table(self.__call__(), file_column=True) + + if extension == "arrow": + # Write as Arrow IPC format with LZ4 compression + with ipc.RecordBatchFileWriter( + path, + schema=self._schema, + options=ipc.IpcWriteOptions(compression="lz4"), + ) as writer: + writer.write_table(table) + elif extension == "parquet": + parquet.write_table(table, path) + elif extension == "csv": + # For CSV, rename file column to empty string for compatibility + columns = table.column_names + columns = ["" if c == "file" else c for c in columns] + table = table.rename_columns(columns) csv.write_csv( table, path, write_options=csv.WriteOptions(quoting_style="none"), ) - elif path.endswith("pkl"): - self._df.to_pickle( - path, - protocol=4, # supported by Python >= 3.4 + else: + raise ValueError( + f"File extension of 'path' has to be " + f"'arrow', 'parquet', or 'csv', " + f"not '{extension}'" ) - elif path.endswith("parquet"): - table = self._dataframe_to_table(self._df, file_column=True) - parquet.write_table(table, path) def type(self, file: str) -> int: r"""Type of file. @@ -443,18 +578,52 @@ def _add_attachment( """ format = audeer.file_extension(file).lower() - self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["attachment"], # type - version, # version - ] + # If file exists, update it in place to preserve row order + if file in self._file_index: + row_idx = self._file_index[file] + # Update each column value at the specific row index + for col_name, value in [ + ("archive", archive), + ("bit_depth", 0), + ("channels", 0), + ("checksum", checksum), + ("duration", 0.0), + ("format", format), + ("removed", 0), + ("sampling_rate", 0), + ("type", define.DEPENDENCY_TYPE["attachment"]), + ("version", version), + ]: + col = self._table.column(col_name) + col_list = col.to_pylist() + col_list[row_idx] = value + col_type = self._schema.field(col_name).type + new_col = pa.array(col_list, type=col_type) + col_idx = self._schema.get_field_index(col_name) + self._table = self._table.set_column(col_idx, col_name, new_col) + self._invalidate_df_cache() + else: + # Build single-row table for new file + new_row = pa.table( + { + "file": [file], + "archive": [archive], + "bit_depth": [0], + "channels": [0], + "checksum": [checksum], + "duration": [0.0], + "format": [format], + "removed": [0], + "sampling_rate": [0], + "type": [define.DEPENDENCY_TYPE["attachment"]], + "version": [version], + }, + schema=self._schema, + ) + # Append new row + self._table = pa.concat_tables([self._table, new_row]) + self._rebuild_index() + self._invalidate_df_cache() def _add_media( self, @@ -481,12 +650,32 @@ def _add_media( where each tuple holds the values of a new media entry """ - df = pd.DataFrame.from_records( - values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df = pd.concat([self._df, df]) + # Build table directly from tuples (transpose to get column 
arrays) + if not values: + return + + arrays = list(zip(*values)) + new_rows = pa.table( + { + "file": arrays[0], + "archive": arrays[1], + "bit_depth": arrays[2], + "channels": arrays[3], + "checksum": arrays[4], + "duration": arrays[5], + "format": arrays[6], + "removed": arrays[7], + "sampling_rate": arrays[8], + "type": arrays[9], + "version": arrays[10], + }, + schema=self._schema, + ) + + # Concatenate with existing table + self._table = pa.concat_tables([self._table, new_rows]) + self._rebuild_index() + self._invalidate_df_cache() def _add_meta( self, @@ -508,18 +697,52 @@ def _add_meta( else: archive = os.path.splitext(file[3:])[0] - self._df.loc[file] = [ - archive, # archive - 0, # bit_depth - 0, # channels - checksum, # checksum - 0.0, # duration - format, # format - 0, # removed - 0, # sampling_rate - define.DEPENDENCY_TYPE["meta"], # type - version, # version - ] + # If file exists, update it in place to preserve row order + if file in self._file_index: + row_idx = self._file_index[file] + # Update each column value at the specific row index + for col_name, value in [ + ("archive", archive), + ("bit_depth", 0), + ("channels", 0), + ("checksum", checksum), + ("duration", 0.0), + ("format", format), + ("removed", 0), + ("sampling_rate", 0), + ("type", define.DEPENDENCY_TYPE["meta"]), + ("version", version), + ]: + col = self._table.column(col_name) + col_list = col.to_pylist() + col_list[row_idx] = value + col_type = self._schema.field(col_name).type + new_col = pa.array(col_list, type=col_type) + col_idx = self._schema.get_field_index(col_name) + self._table = self._table.set_column(col_idx, col_name, new_col) + self._invalidate_df_cache() + else: + # Build single-row table for new file + new_row = pa.table( + { + "file": [file], + "archive": [archive], + "bit_depth": [0], + "channels": [0], + "checksum": [checksum], + "duration": [0.0], + "format": [format], + "removed": [0], + "sampling_rate": [0], + "type": [define.DEPENDENCY_TYPE["meta"]], + "version": [version], + }, + schema=self._schema, + ) + # Append new row + self._table = pa.concat_tables([self._table, new_row]) + self._rebuild_index() + self._invalidate_df_cache() def _column_loc( self, @@ -538,7 +761,10 @@ def _column_loc( scalar value """ - value = self._df.at[file, column] + if file not in self._file_index: + raise KeyError(file) + row_idx = self._file_index[file] + value = self._table.column(column)[row_idx].as_py() if dtype is not None: value = dtype(value) return value @@ -579,13 +805,12 @@ def _drop(self, files: Sequence[str]): files: relative file paths """ - # self._df.drop is slow, - # see https://stackoverflow.com/a/53394627. - # The solution presented in https://stackoverflow.com/a/53395360 - # self._df = self._df.loc[self._df.index.drop(files)] - # which is claimed to be faster, - # isn't. - self._df = self._df[~self._df.index.isin(files)] + # Use Arrow compute filter with inverted is_in mask + file_col = self._table.column("file") + mask = pc.invert(pc.is_in(file_col, pa.array(files, type=pa.string()))) + self._table = self._table.filter(mask) + self._rebuild_index() + self._invalidate_df_cache() def _remove(self, file: str): r"""Mark file as removed. 
@@ -594,7 +819,20 @@ def _remove(self, file: str): file: relative file path """ - self._df.at[file, "removed"] = 1 + if file not in self._file_index: + raise KeyError(file) + + # Get row index and update removed column + row_idx = self._file_index[file] + removed_col = self._table.column("removed") + removed_list = removed_col.to_pylist() + removed_list[row_idx] = 1 + + # Replace column + new_removed_col = pa.array(removed_list, type=pa.int32()) + col_idx = self._schema.get_field_index("removed") + self._table = self._table.set_column(col_idx, "removed", new_removed_col) + self._invalidate_df_cache() @staticmethod def _set_dtypes(df: pd.DataFrame) -> pd.DataFrame: @@ -664,12 +902,47 @@ def _update_media( where each tuple holds the new values for a media entry """ - df = pd.DataFrame.from_records( - values, - columns=["file"] + list(define.DEPENDENCY_TABLE.keys()), - ).set_index("file") - df = self._set_dtypes(df) - self._df.loc[df.index] = df + if not values: + return + + # Build table from tuples + arrays = list(zip(*values)) + files_to_update = list(arrays[0]) + + # Check that all files exist (to match pandas behavior) + for file in files_to_update: + if file not in self._file_index: + raise KeyError(file) + + # Filter out rows that need updating + file_col = self._table.column("file") + mask = pc.invert( + pc.is_in(file_col, pa.array(files_to_update, type=pa.string())) + ) + filtered_table = self._table.filter(mask) + + # Build new rows table + new_rows = pa.table( + { + "file": arrays[0], + "archive": arrays[1], + "bit_depth": arrays[2], + "channels": arrays[3], + "checksum": arrays[4], + "duration": arrays[5], + "format": arrays[6], + "removed": arrays[7], + "sampling_rate": arrays[8], + "type": arrays[9], + "version": arrays[10], + }, + schema=self._schema, + ) + + # Concatenate filtered table with new rows + self._table = pa.concat_tables([filtered_table, new_rows]) + self._rebuild_index() + self._invalidate_df_cache() def _update_media_version( self, @@ -683,7 +956,45 @@ def _update_media_version( version: version string """ - self._df.loc[files, "version"] = version + # Get row indices for files + row_indices = [self._file_index[f] for f in files if f in self._file_index] + + if not row_indices: + return + + # Update version column + version_col = self._table.column("version") + version_list = version_col.to_pylist() + for idx in row_indices: + version_list[idx] = version + + # Replace column + new_version_col = pa.array(version_list, type=pa.string()) + col_idx = self._schema.get_field_index("version") + self._table = self._table.set_column(col_idx, "version", new_version_col) + self._invalidate_df_cache() + + def _rebuild_index(self): + r"""Rebuild file path to row index mapping. + + Creates a dictionary mapping each file path + to its row index in the Arrow table + for O(1) file lookups. + Called after any operation that changes row order. + + """ + file_col = self._table.column("file") + self._file_index = {file: i for i, file in enumerate(file_col.to_pylist())} + + def _invalidate_df_cache(self): + r"""Invalidate the cached DataFrame. + + Marks the DataFrame cache as stale + so it will be regenerated on next __call__(). + Called after any mutation operation. + + """ + self._table_modified = True def error_message_missing_object( @@ -794,6 +1105,11 @@ def download_dependencies( and return an dependency object loaded from that file. + Tries formats in order: + Arrow IPC (``.arrow``), + Parquet (``.parquet``), + CSV (``.csv`` in ``.zip``). 
+ Args: backend_interface: backend interface name: database name @@ -805,9 +1121,14 @@ def download_dependencies( """ with tempfile.TemporaryDirectory() as tmp_root: - # Load `db.parquet` file, - # or if non-existent `db.zip` - # from backend + # Try loading dependency file in order of preference: + # 1. db.arrow (Arrow IPC format, newest) + # 2. db.parquet (Parquet format, introduced in v1.7.0) + # 3. db.zip containing db.csv (legacy format, pre-v1.7.0) + + local_deps_file = None + + # Try Arrow IPC first remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE) if backend_interface.exists(remote_deps_file, version): local_deps_file = os.path.join(tmp_root, define.DEPENDENCY_FILE) @@ -818,17 +1139,32 @@ def download_dependencies( verbose=verbose, ) else: - remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") - local_deps_file = os.path.join( - tmp_root, - define.LEGACY_DEPENDENCY_FILE, - ) - backend_interface.get_archive( - remote_deps_file, - tmp_root, - version, - verbose=verbose, + # Fall back to Parquet + remote_deps_file = backend_interface.join( + "/", name, define.PARQUET_DEPENDENCY_FILE ) + if backend_interface.exists(remote_deps_file, version): + local_deps_file = os.path.join(tmp_root, define.PARQUET_DEPENDENCY_FILE) + backend_interface.get_file( + remote_deps_file, + local_deps_file, + version, + verbose=verbose, + ) + else: + # Fall back to legacy CSV in ZIP + remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") + local_deps_file = os.path.join( + tmp_root, + define.LEGACY_DEPENDENCY_FILE, + ) + backend_interface.get_archive( + remote_deps_file, + tmp_root, + version, + verbose=verbose, + ) + # Create deps object from downloaded file deps = Dependencies() deps.load(local_deps_file) diff --git a/audb/core/load.py b/audb/core/load.py index bd4aefd5..183cf6aa 100644 --- a/audb/core/load.py +++ b/audb/core/load.py @@ -471,7 +471,7 @@ def job(archive: str, version: str): tmp_root=db_root_tmp, ) # media files that can be changed to a requested flavor - flavor_files = deps._df[deps._df.sampling_rate != 0].index + flavor_files = deps.get_files_with_sampling_rate() for file in files: if os.name == "nt": # pragma: no cover file = file.replace(os.sep, "/") diff --git a/audb/core/publish.py b/audb/core/publish.py index 658f2975..a936b275 100644 --- a/audb/core/publish.py +++ b/audb/core/publish.py @@ -84,8 +84,8 @@ def _find_attachments( r"""Find altered, new or removed attachments and update 'deps'.""" # drop removed attachments from dependency table removed_attachments = [ - deps._df.index[deps._df.archive == attachment_id][0] - for attachment_id in deps.attachment_ids + attachment + for attachment, attachment_id in zip(deps.attachments, deps.attachment_ids) if attachment_id not in db.attachments ] deps._drop(removed_attachments) @@ -776,7 +776,11 @@ def publish( # load database and dependencies deps = Dependencies() - for deps_file in [define.DEPENDENCY_FILE, define.LEGACY_DEPENDENCY_FILE]: + for deps_file in [ + define.DEPENDENCY_FILE, + define.PARQUET_DEPENDENCY_FILE, + define.LEGACY_DEPENDENCY_FILE, + ]: deps_path = os.path.join(db_root, deps_file) if os.path.exists(deps_path): deps.load(deps_path) diff --git a/docs/publish.rst b/docs/publish.rst index 6509c416..7874937a 100644 --- a/docs/publish.rst +++ b/docs/publish.rst @@ -126,7 +126,7 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.arrow db.yaml media/ 1.0.0/ @@ -142,7 +142,7 @@ inside the ``media/`` folder, all tables inside the ``meta/`` folder, the database 
header in the file ``db.yaml``, and the database dependencies -in the file ``db.parquet``. +in the file ``db.arrow``. Note, that the structure of the folders used for versioning @@ -247,10 +247,10 @@ data/ data-local/ age-test/ 1.0.0/ - db.parquet + db.arrow db.yaml 1.1.0/ - db.parquet + db.arrow db.yaml media/ 1.0.0/ diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 618e7757..4245de48 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -250,7 +250,7 @@ def test_removed(deps): deps.removed("non.existing") -@pytest.mark.parametrize("file", ["deps.csv", "deps.pkl", "deps.parquet"]) +@pytest.mark.parametrize("file", ["deps.csv", "deps.parquet", "deps.arrow"]) def test_load_save(tmpdir, deps, file): """Test consistency of dependency table after save/load cycle. @@ -268,83 +268,76 @@ def test_load_save(tmpdir, deps, file): assert list(deps2._df.dtypes) == list(audb.core.define.DEPENDENCY_TABLE.values()) -def test_load_save_backward_compatibility(tmpdir, deps): - """Test backward compatibility with old pickle cache files. +def test_load_save_errors(deps): + """Test possible errors when loading/saving.""" + # Wrong file extension + with pytest.raises(ValueError, match=r".*'txt'.*"): + deps.load("deps.txt") + # File missing + with pytest.raises(FileNotFoundError): + deps.load("deps.csv") - As the dtype of the index has changed, - we need to make sure this is corrected - when loading old cache files. - Old behaviour (audb<1.7): +def test_arrow_compression(tmpdir, deps): + """Test Arrow IPC file uses LZ4 compression.""" + import pyarrow.ipc as ipc - archive string[python] - bit_depth int32 - channels int32 - checksum string[python] - duration float64 - format string[python] - removed int32 - sampling_rate int32 - type int32 - version string[python] + deps_file = audeer.path(tmpdir, "deps.arrow") + deps.save(deps_file) - New behaviour (audb>=1.7): + # Read raw Arrow file to verify compression + with ipc.open_file(deps_file) as reader: + # Verify the file is valid Arrow IPC + assert reader.num_record_batches > 0 - archive string[pyarrow] - bit_depth int32[pyarrow] - channels int32[pyarrow] - checksum string[pyarrow] - duration double[pyarrow] - format string[pyarrow] - removed int32[pyarrow] - sampling_rate int32[pyarrow] - type int32[pyarrow] - version string[pyarrow] + # File should exist and be smaller than uncompressed + assert os.path.exists(deps_file) + assert os.path.getsize(deps_file) > 0 - """ - deps_file = audeer.path(tmpdir, "deps.pkl") - - deps_old = audb.Dependencies() - deps_old._df = deps._df.copy() - - # Change dtype of index from object to string - # to mimic previous behavior - deps_old._df.index = deps_old._df.index.astype("string") - # Change dtype of columns - # to mimic previous behavior - deps_old._df = deps_old._df.astype( - { - "archive": "string", - "bit_depth": "int32", - "channels": "int32", - "checksum": "string", - "duration": "float64", - "format": "string", - "removed": "int32", - "sampling_rate": "int32", - "type": "int32", - "version": "string", - } - ) - deps_old.save(deps_file) - # Check that we get the correct dtypes, - # when loading from cache +def test_backward_compatibility_auto_detection(tmpdir, deps): + """Test auto-detection loads Parquet/CSV when Arrow is missing.""" + # Save as Parquet + parquet_file = audeer.path(tmpdir, "deps.parquet") + deps.save(parquet_file) + + # Load without extension should auto-detect + base_path = audeer.path(tmpdir, "deps") deps2 = audb.Dependencies() - deps2.load(deps_file) - 
assert deps2._df.index.dtype == audb.core.define.DEPENDENCY_INDEX_DTYPE - pd.testing.assert_frame_equal(deps._df, deps2._df) - assert deps == deps2 + deps2.load(base_path) + pd.testing.assert_frame_equal(deps(), deps2()) + # Save as CSV + csv_file = audeer.path(tmpdir, "deps2.csv") + deps.save(csv_file) -def test_load_save_errors(deps): - """Test possible errors when loading/saving.""" - # Wrong file extension - with pytest.raises(ValueError, match=r".*'txt'.*"): - deps.load("deps.txt") - # File missing - with pytest.raises(FileNotFoundError): - deps.load("deps.csv") + # Load without extension should auto-detect + base_path = audeer.path(tmpdir, "deps2") + deps3 = audb.Dependencies() + deps3.load(base_path) + pd.testing.assert_frame_equal(deps(), deps3()) + + +def test_format_precedence(tmpdir, deps): + """Test Arrow takes precedence over Parquet and CSV.""" + base_path = audeer.path(tmpdir, "deps") + + # Create all three formats with different data + deps.save(base_path + ".csv") + deps.save(base_path + ".parquet") + + # Modify deps slightly + original_version = deps.version("db.files.csv") + deps._df.loc["db.files.csv", "version"] = "2.0.0" + deps.save(base_path + ".arrow") + + # Loading should prefer Arrow + deps2 = audb.Dependencies() + deps2.load(base_path) + assert deps2.version("db.files.csv") == "2.0.0" + + # Clean up for next test - restore original + deps._df.loc["db.files.csv", "version"] = original_version def test_sampling_rate(deps): diff --git a/tests/test_publish.py b/tests/test_publish.py index 15306c71..996ceb1b 100644 --- a/tests/test_publish.py +++ b/tests/test_publish.py @@ -1044,7 +1044,7 @@ def test_publish_error_cross_repository(tmpdir): audb.publish(db_path_v2, "2.0.0", repo2, previous_version="1.0.0") # Publishing to repo2 with previous_version=None should work - os.remove(audeer.path(db_path, "db.parquet")) + os.remove(audeer.path(db_path, "db.arrow")) audb.publish(db_path, "2.0.0", repo2, previous_version=None) # Assert that the new version appears in repo2 diff --git a/tests/test_publish_table.py b/tests/test_publish_table.py index 6ad7e3f0..78927e74 100644 --- a/tests/test_publish_table.py +++ b/tests/test_publish_table.py @@ -90,7 +90,7 @@ def assert_db_published_to_repo( """ repo = audeer.path(repository.host, repository.name) - dependency_file = "db.parquet" + dependency_file = "db.arrow" header_file = "db.yaml" files = list(db.files) tables = list(db)
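
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the Arrow IPC write/read pattern that the new Dependencies.save()/load() and download_dependencies() rely on. The three-column toy schema, the db.arrow/db.parquet file names, and the load_table() helper are assumptions made for illustration only; the real schema and the full fallback logic (including the legacy CSV-in-ZIP path) live in audb/core/dependencies.py.

import os
import tempfile

import pyarrow as pa
import pyarrow.ipc as ipc
import pyarrow.parquet as parquet


def load_table(base_path):
    """Prefer ``<base_path>.arrow``, fall back to ``<base_path>.parquet``.

    Mirrors the fallback order introduced in Dependencies.load() and
    download_dependencies(); the legacy CSV-in-ZIP path is omitted here.
    """
    if os.path.exists(base_path + ".arrow"):
        with ipc.open_file(base_path + ".arrow") as reader:
            return reader.read_all()
    if os.path.exists(base_path + ".parquet"):
        return parquet.read_table(base_path + ".parquet")
    raise FileNotFoundError(base_path)


# Toy schema for illustration only; the real one is Dependencies._schema.
schema = pa.schema(
    [
        ("file", pa.string()),
        ("archive", pa.string()),
        ("version", pa.string()),
    ]
)
table = pa.table(
    {
        "file": ["audio/001.wav"],
        "archive": ["archive-0"],
        "version": ["1.0.0"],
    },
    schema=schema,
)

with tempfile.TemporaryDirectory() as tmp_root:
    path = os.path.join(tmp_root, "db.arrow")
    # Write an Arrow IPC file with LZ4 compression,
    # the same writer/options combination Dependencies.save() uses.
    with ipc.new_file(
        path,
        schema,
        options=ipc.IpcWriteOptions(compression="lz4"),
    ) as writer:
        writer.write_table(table)
    print(load_table(os.path.join(tmp_root, "db")).num_rows)  # -> 1

Storing the cache as an Arrow IPC file instead of pickle sidesteps the pandas-version compatibility issues described in the comment that this patch removes from audb/core/api.py.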