From d7bdc9b000e1ef99d5f7314cc427c41ccef8cdc6 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Fri, 30 Jan 2026 11:51:35 +0100 Subject: [PATCH 01/16] add reading and writing of parquet and feather data --- cdm_reader_mapper/cdm_mapper/reader.py | 86 +++++++++++------ cdm_reader_mapper/cdm_mapper/writer.py | 72 ++++++++------ cdm_reader_mapper/mdf_reader/reader.py | 90 ++++++++++++------ .../mdf_reader/utils/utilities.py | 89 ++++++++++++++++-- cdm_reader_mapper/mdf_reader/writer.py | 93 ++++++++++++------- 5 files changed, 303 insertions(+), 127 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index 5cc363de..230b69e5 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ b/cdm_reader_mapper/cdm_mapper/reader.py @@ -47,6 +47,8 @@ import glob import os +from typing import Literal + import pandas as pd from cdm_reader_mapper.common import get_filename, logging_hdlr @@ -56,16 +58,30 @@ from .utils.utilities import get_cdm_subset, get_usecols -def _read_file(ifile, table, col_subset, **kwargs): +def _read_file( + ifile: str, + table: str, + col_subset: str | list | None, + data_format: Literal["csv", "parquet", "feather"], + **kwargs, +) -> pd.DataFrame: usecols = get_usecols(table, col_subset) - return pd.read_csv(ifile, usecols=usecols, **kwargs) + if data_format == "csv": + return pd.read_csv(ifile, usecols=usecols, **kwargs) + if data_format == "parquet": + return pd.read_parquet(ifile, columns=usecols, **kwargs) + if data_format == "feather": + return pd.read_feather(ifile, columns=usecols, **kwargs) + raise ValueError( + f"data_format must be one of [csv, parquet, feather] not {data_format}." 
+ ) def _read_single_file( - ifile, - cdm_subset=None, - col_subset=None, - null_label="null", + ifile: str, + cdm_subset: str | list | None = None, + col_subset: str | list | None = None, + null_label: str = "null", **kwargs, ) -> pd.DataFrame: if not isinstance(cdm_subset, list): @@ -80,13 +96,13 @@ def _read_single_file( def _read_multiple_files( - inp_dir, - prefix=None, - suffix=None, - extension="psv", - cdm_subset=None, - col_subset=None, - null_label="null", + inp_dir: str, + prefix: str | None = None, + suffix: str | None = None, + extension: str = "psv", + cdm_subset: str | list | None = None, + col_subset: str | list | None = None, + null_label: str = "null", logger=None, **kwargs, ) -> list[pd.DataFrame]: @@ -108,6 +124,7 @@ def _read_multiple_files( if table not in properties.cdm_tables: logger.warning(f"Requested table {table} not defined in CDM") continue + logger.info(f"Getting file path for pattern {table}") pattern_ = get_filename( [prefix, table, f"*{suffix}"], path=inp_dir, extension=extension @@ -141,15 +158,16 @@ def _read_multiple_files( def read_tables( - source, - prefix=None, - suffix=None, - extension="psv", - cdm_subset=None, - col_subset=None, - delimiter="|", - na_values=None, - null_label="null", + source: str, + data_format: Literal["csv", "parquet", "feather"] = "csv", + prefix: str | None = None, + suffix: str | None = None, + extension: str | None = None, + cdm_subset: str | list | None = None, + col_subset: str | list | dict | None = None, + delimiter: str = "|", + na_values: str | None = None, + null_label: str = "null", **kwargs, ) -> DataBundle: """ @@ -157,15 +175,17 @@ def read_tables( Parameters ---------- - source: str, optional + source: str The file (including path) or the path to the file(s) to be read. + data_format: {"csv", "parquet", "feather"}, default: "csv" + Format of input data file(s). prefix: str, optional Prefix of file name structure: ``--*.``. Could de used if `source` is a valid directory path. 
suffix: str, optional Suffix of file name structure: ``-
-*.``. Could de used if `source` is a valid directory path. - extension: str + extension: str, optional Extension of file name structure: ``-
-*.``. Could de used if `source` is a valid directory path. Default: psv @@ -213,6 +233,11 @@ def read_tables( write_data : Write MDF data and validation mask to disk. """ logger = logging_hdlr.init_logger(__name__, level="INFO") + if data_format not in ["csv", "parquet", "feather"]: + raise ValueError( + f"data_format must be one of [csv, parquet, feather] not {data_format}." + ) + # Because how the printers are written, they modify the original data frame!, # also removing rows with empty observation_value in observation_tables kwargs = { @@ -224,6 +249,9 @@ def read_tables( # See if subset, if any of the tables is not as specs cdm_subset = get_cdm_subset(cdm_subset) + if extension is None: + extension = data_format + if os.path.isfile(source): df_list = [ _read_single_file( @@ -231,6 +259,7 @@ def read_tables( cdm_subset=cdm_subset, col_subset=col_subset, null_label=null_label, + data_format=data_format, **kwargs, ) ] @@ -243,18 +272,17 @@ def read_tables( cdm_subset=cdm_subset, col_subset=col_subset, null_label=null_label, + data_format=data_format, logger=logger, **kwargs, ) else: - logger.error( - f"Source is neither a valid file name nor a valid directory path: {source}" + raise FileNotFoundError( + f"Source is neither a valid file name nor a valid directory path: {source}." 
) - return DataBundle(data=pd.DataFrame()) if len(df_list) == 0: - logger.error("All tables empty in file system") - return DataBundle(data=pd.DataFrame(), mode="tables") + raise ValueError("All tables empty in file system.") merged = pd.concat(df_list, axis=1, join="outer") merged = merged.reset_index(drop=True) diff --git a/cdm_reader_mapper/cdm_mapper/writer.py b/cdm_reader_mapper/cdm_mapper/writer.py index 9527b912..4d97c1c1 100755 --- a/cdm_reader_mapper/cdm_mapper/writer.py +++ b/cdm_reader_mapper/cdm_mapper/writer.py @@ -27,6 +27,7 @@ import pandas as pd from pathlib import Path +from typing import Literal from cdm_reader_mapper.common import get_filename, logging_hdlr @@ -34,37 +35,48 @@ from .utils.utilities import adjust_filename, dict_to_tuple_list, get_cdm_subset -def _table_to_ascii( - data, - delimiter="|", - encoding="utf-8", - col_subset=None, +def _table_to_file( + data: pd.DataFrame, filename=None, + data_format: Literal["csv", "parquet", "feather"] = "csv", + delimiter: str = "|", + encoding: str = "utf-8", + **kwargs, ) -> None: data = data.dropna(how="all") - header = True - wmode = "w" - data.to_csv( - filename, - index=False, - sep=delimiter, - header=header, - mode=wmode, - encoding=encoding, + if data_format == "csv": + header = True + wmode = "w" + data.to_csv( + filename, + index=False, + header=header, + mode=wmode, + sep=delimiter, + encoding=encoding, + **kwargs, + ) + elif data_format == "parquet": + data.to_parquet(filename, **kwargs) + elif data_format == "feather": + data.to_feather(filename, **kwargs) + raise ValueError( + f"data_format must be one of [csv, parquet, feather] not {data_format}." 
) def write_tables( - data, - out_dir=None, - prefix=None, - suffix=None, - extension="psv", - filename=None, - cdm_subset=None, - col_subset=None, - delimiter="|", - encoding="utf-8", + data: pd.DataFrame, + data_format: Literal["csv", "parquet", "feather"] = "csv", + out_dir: str | None = None, + prefix: str | None = None, + suffix: str | None = None, + extension: str | None = None, + filename: str | dict | None = None, + cdm_subset: str | list | None = None, + col_subset: str | list | dict | None = None, + delimiter: str = "|", + encoding: str = "utf-8", **kwargs, ) -> None: """Write pandas.DataFrame to CDM-table file on file system. @@ -73,6 +85,8 @@ def write_tables( ---------- data: pandas.DataFrame pandas.DataFrame to export. + data_format: {"csv", "parquet", "feather"}, default: "csv" + Format of input data file(s). out_dir: str, optional Path to the output directory. Default: current directory @@ -80,9 +94,8 @@ def write_tables( Prefix of file name structure: ``-
-*.``. suffix: str, optional Suffix of file name structure: ``-
-*.``. - extension: str + extension: str, optional Extension of file name structure: ``-
-*.``. - Default: psv filename: str or dict, optional Name of the output file name(s). List one filename for each table name in ``data`` ({
:}). @@ -125,6 +138,10 @@ def write_tables( Use this function after reading CDM tables. """ logger = logging_hdlr.init_logger(__name__, level="INFO") + if data_format not in ["csv", "parquet", "feather"]: + raise ValueError( + f"data_format must be one of [csv, parquet, feather] not {data_format}." + ) cdm_subset = get_cdm_subset(cdm_subset) @@ -164,9 +181,10 @@ def write_tables( filename_ = os.path.join(out_dir, filename_) logger.info(f"Writing table {table}: {filename_}") - _table_to_ascii( + _table_to_file( cdm_table, delimiter=delimiter, encoding=encoding, filename=filename_, + data_format=data_format, ) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index ef10639e..dcfa90db 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -2,8 +2,8 @@ from __future__ import annotations -from io import StringIO as StringIO from pathlib import Path +from typing import Literal from cdm_reader_mapper import DataBundle @@ -12,7 +12,7 @@ from .utils.filereader import FileReader from .utils.utilities import validate_arg -from .utils.utilities import as_list, as_path, read_csv +from .utils.utilities import as_list, as_path, read_csv, read_parquet, read_feather def validate_read_mdf_args( @@ -220,11 +220,12 @@ def read_mdf( def read_data( - source, - mask=None, - info=None, - imodel=None, - col_subset=None, + data_file: str, + mask_file: str | None = None, + info_file: str | None = None, + data_format: Literal["csv", "parquet", "feather"] = "csv", + imodel: str | None = None, + col_subset: str | list | tuple | None = None, encoding: str | None = None, **kwargs, ) -> DataBundle: @@ -232,12 +233,14 @@ def read_data( Parameters ---------- - source: str + data_file: str The data file (including path) to be read. - mask: str, optional + mask_file: str, optional The validation file (including path) to be read. 
- info: str, optional + info_file: str, optional The information file (including path) to be read. + data_format: {"csv", "parquet", "feather"}, default: "csv" + Format of input data file(s). imodel: str, optional Name of internally available input data model. e.g. icoads_r300_d704 @@ -267,28 +270,57 @@ def read_data( write_data : Write MDF data and validation mask to disk. write_tables : Write CDM tables to disk. """ - info_dict = open_json_file(info) if info else {} - dtype = info_dict.get("dtypes", "object") - parse_dates = info_dict.get("parse_dates", False) - encoding = encoding or info_dict.get("encoding", None) - - pd_kwargs = kwargs.copy() - pd_kwargs.setdefault("dtype", dtype) - pd_kwargs.setdefault("parse_dates", parse_dates) - pd_kwargs.setdefault("encoding", encoding) + if data_format == "csv": + info_dict = open_json_file(info_file) if info_file else {} + dtype = info_dict.get("dtypes", "object") + parse_dates = info_dict.get("parse_dates", False) + encoding = encoding or info_dict.get("encoding", None) + + pd_kwargs = kwargs.copy() + pd_kwargs.setdefault("dtype", dtype) + pd_kwargs.setdefault("parse_dates", parse_dates) + pd_kwargs.setdefault("encoding", encoding) + + data, infos = read_csv( + data_file, + col_subset=col_subset, + **pd_kwargs, + ) - data, infos = read_csv( - source, - col_subset=col_subset, - **pd_kwargs, - ) + pd_kwargs = kwargs.copy() + pd_kwargs.setdefault("dtype", "boolean") - pd_kwargs = kwargs.copy() - pd_kwargs.setdefault("dtype", "boolean") + mask, _ = read_csv( + mask_file, col_subset=col_subset, columns=infos["columns"], **pd_kwargs + ) + elif data_format == "parquet": + data, infos = read_parquet( + data_file, + col_subset=col_subset, + **kwargs, + ) - mask, _ = read_csv( - mask, col_subset=col_subset, columns=infos["columns"], **pd_kwargs - ) + mask, _ = read_parquet( + mask_file, + col_subset=col_subset, + columns=infos["columns"] ** kwargs, + ) + elif data_format == "feather": + data, infos = read_feather( + data_file, + 
col_subset=col_subset, + **kwargs, + ) + + mask, _ = read_feather( + mask_file, + col_subset=col_subset, + columns=infos["columns"] ** kwargs, + ) + else: + raise ValueError( + f"data_format must be one of [csv, parquet, feather] not {data_format}." + ) return DataBundle( data=data, diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py index f7a782bd..2778796b 100755 --- a/cdm_reader_mapper/mdf_reader/utils/utilities.py +++ b/cdm_reader_mapper/mdf_reader/utils/utilities.py @@ -183,7 +183,7 @@ def update_column_labels(columns: Iterable[str | tuple]) -> pd.Index | pd.MultiI def update_and_select( df: pd.DataFrame, subset: str | list | None = None, - columns: pd.Index | pd.MultiIndex | None = None, + column_names: pd.Index | pd.MultiIndex | None = None, ) -> tuple[pd.DataFrame, dict[str, Any]]: """ Update string column labels and select subset from DataFrame. @@ -206,15 +206,15 @@ def update_and_select( df.columns = update_column_labels(df.columns) if subset is not None: df = df[subset] - if columns is not None and not df.empty: - df = df.reindex(columns=columns) + if column_names is not None and not df.empty: + df = df.reindex(columns=column_names) return df, {"columns": df.columns, "dtypes": df.dtypes} def read_csv( filepath: Path, col_subset: str | list | None = None, - columns: pd.Index | pd.MultiIndex | None = None, + column_names: pd.Index | pd.MultiIndex | None = None, **kwargs, ) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]: """ @@ -226,7 +226,7 @@ def read_csv( Path to the CSV file. col_subset : list of str, optional Subset of columns to read from the CSV. - columns: + column_names: Column labels for re-indexing. kwargs : any Additional keyword arguments passed to pandas.read_csv. 
@@ -244,7 +244,9 @@ data = pd.read_csv(filepath, delimiter=",", **kwargs) if isinstance(data, pd.DataFrame): - data, info = update_and_select(data, subset=col_subset, columns=columns) + data, info = update_and_select( + data, subset=col_subset, column_names=column_names + ) return data, info write_kwargs = {} @@ -254,7 +256,7 @@ data, info = process_textfilereader( data, func=update_and_select, - func_kwargs={"subset": col_subset, "columns": columns}, + func_kwargs={"subset": col_subset, "column_names": column_names}, read_kwargs=kwargs, write_kwargs=write_kwargs, makecopy=False, @@ -262,6 +264,79 @@ return data, info +def read_parquet( + filepath: Path, + col_subset: str | list | None = None, + column_names: pd.Index | pd.MultiIndex | None = None, + **kwargs, +) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]: + """ + Safe parquet reader that handles missing files and column subsets. + + Parameters + ---------- + filepath : str or Path or None + Path to the parquet file. + col_subset : list of str, optional + Subset of columns to read from the parquet file. + column_names: + Column labels for re-indexing. + kwargs : any + Additional keyword arguments passed to pandas.read_parquet. + + Returns + ------- + tuple[pd.DataFrame, dict] + - The CSV as a DataFrame. Empty if file does not exist. + - dictionary containing data column labels and data types + """ + if filepath is None or not Path(filepath).is_file(): + logging.warning(f"File not found: {filepath}") + return pd.DataFrame(), {} + + data = pd.read_parquet(filepath, **kwargs) + + data, info = update_and_select(data, subset=col_subset, column_names=column_names) + + return data, info + + +def read_feather( + filepath: Path, + col_subset: str | list | None = None, + column_names: pd.Index | pd.MultiIndex | None = None, + **kwargs, +) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]: + """ + Safe feather reader that handles missing files and column subsets.
+ + Parameters + ---------- + filepath : str or Path or None + Path to the CSV file. + col_subset : list of str, optional + Subset of columns to read from the CSV. + column_names: + Column labels for re-indexing. + kwargs : any + Additional keyword arguments passed to pandas.read_csv. + + Returns + ------- + tuple[pd.DataFrame, dict] + - The CSV as a DataFrame. Empty if file does not exist. + - dictionary containing data column labels and data types + """ + if filepath is None or not Path(filepath).is_file(): + logging.warning(f"File not found: {filepath}") + return pd.DataFrame(), {} + + data = pd.read_feather(filepath, **kwargs) + + data, info = update_and_select(data, subset=col_subset, column_names=column_names) + return data, info + + def convert_dtypes(dtypes) -> tuple[str]: """ Convert datetime columns to object dtype and return columns to parse as dates. diff --git a/cdm_reader_mapper/mdf_reader/writer.py b/cdm_reader_mapper/mdf_reader/writer.py index a2d45fcf..d5440aee 100755 --- a/cdm_reader_mapper/mdf_reader/writer.py +++ b/cdm_reader_mapper/mdf_reader/writer.py @@ -6,8 +6,10 @@ import logging from io import StringIO as StringIO from pathlib import Path +from typing import Literal import pandas as pd +from pandas.io.parsers import TextFileReader from .utils.utilities import join, update_column_names, update_dtypes @@ -16,18 +18,19 @@ def write_data( - data, - mask=None, + data: pd.DataFrame | TextFileReader, + mask: pd.DataFrame | TextFileReader | None = None, + data_format: Literal["csv", "parquet", "feather"] = "csv", dtypes: dict | None = None, parse_dates: list | bool = False, - encoding="utf-8", - out_dir=".", - prefix=None, - suffix=None, - extension="csv", - filename=None, - col_subset=None, - delimiter=",", + encoding: str = "utf-8", + out_dir: str = ".", + prefix: str | None = None, + suffix: str | None = None, + extension: str = None, + filename: str | dict | None = None, + col_subset: str | list | tuple | None = None, + delimiter: str = ",", 
**kwargs, ) -> None: """Write pandas.DataFrame to MDF file on file system. @@ -36,31 +39,32 @@ def write_data( ---------- data: pandas.DataFrame pandas.DataFrame to export. - mask: pandas.DataFrame + mask: pandas.DataFrame, optional validation mask to export. - dtypes: dict + data_format: {"csv", "parquet", "feather"}, default: "csv" + Format of output data file(s). + dtypes: dict, optional Dictionary of data types on ``data``. Dump ``dtypes`` and ``parse_dates`` to json information file. - parse_dates: + parse_dates: list | bool, default: False Information of how to parse dates in :py:attr:`data`. Dump ``dtypes`` and ``parse_dates`` to json information file. For more information see :py:func:`pandas.read_csv`. - encoding: str + encoding: str, default: "utf-8" A string representing the encoding to use in the output file, defaults to utf-8. - out_dir: str + out_dir: str, default: "." Path to the output directory. - Default: current directory prefix: str, optional Prefix of file name structure: ``-data-*.``. suffix: str, optional Suffix of file name structure: ``-data-*.``. - extension: str + extension: str, optional Extension of file name structure: ``-data-*.``. - Default: psv + By default, extension depends on `data_format`. filename: str or dict, optional Name of the output file name(s). List one filename for both ``data`` and ``mask`` ({"data":, "mask":}). - Default: Automatically create file name from table name, ``prefix`` and ``suffix``. + By default, automatically create file name from table name, ``prefix`` and ``suffix``. col_subset: str, tuple or list, optional Specify the section or sections of the file to write. @@ -71,9 +75,8 @@ def write_data( e.g. list type object col_subset = [columns] Column labels could be both string or tuple. - delimiter: str + delimiter: str, default: "," Character or regex pattern to treat as the delimiter while reading with df.to_csv. 
- Default: "," See Also -------- @@ -88,11 +91,19 @@ def write_data( ---- Use this function after reading MDF data. """ + if data_format not in ["csv", "parquet", "feather"]: + raise ValueError( + f"data_format must be one of [csv, parquet, feather] not {data_format}." + ) + + if extension is None: + extension = data_format + dtypes = dtypes or {} if isinstance(parse_dates, bool): parse_dates = [] - if not isinstance(data, pd.io.parsers.TextFileReader): + if not isinstance(data, TextFileReader): data_list = [data] else: data_list = make_copy(data) @@ -100,7 +111,7 @@ def write_data( if mask is None: mask = pd.DataFrame() - if not isinstance(mask, pd.io.parsers.TextFileReader): + if not isinstance(mask, TextFileReader): mask_list = [mask] else: mask_list = make_copy(mask) @@ -142,18 +153,30 @@ def write_data( info["parse_dates"] = [p for p in info["parse_dates"] if p in header] info["encoding"] = encoding - csv_kwargs = dict( - header=header, - mode=mode, - index=False, - sep=delimiter, - encoding=encoding, - **kwargs, - ) - - data_df.to_csv(filename_data, **csv_kwargs) - if not mask_df.empty: - mask_df.to_csv(filename_mask, **csv_kwargs) + if data_format == "csv": + + csv_kwargs = dict( + header=header, + mode=mode, + index=False, + sep=delimiter, + encoding=encoding, + **kwargs, + ) + + data_df.to_csv(filename_data, **csv_kwargs) + if not mask_df.empty: + mask_df.to_csv(filename_mask, **csv_kwargs) + + elif data_format == "parquet": + data_df.to_parquet(filename_data, **kwargs) + if not mask_df.empty: + mask_df.to_parquet(filename_mask, **kwargs) + + elif data_format == "feather": + data_df.to_feather(filename_data, **kwargs) + if not mask_df.empty: + mask_df.to_feather(filename_mask, **kwargs) with open(filename_info, "w") as fileObj: json.dump(info, fileObj, indent=4) From fea4db01d09ad5e89652e0f8987be469e1c862ec Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 2 Feb 2026 13:33:07 +0100 Subject: [PATCH 02/16] breaking changes: data_file, mask_file, 
info_file, extension --- cdm_reader_mapper/cdm_mapper/reader.py | 6 +++--- cdm_reader_mapper/cdm_mapper/writer.py | 10 +++++++--- cdm_reader_mapper/mdf_reader/reader.py | 6 +++--- tests/_duplicates.py | 1 + tests/test_databundle.py | 4 ++-- tests/test_mdf_reader.py | 12 ++++++------ tests/test_writers.py | 24 +++++++++++++++++++----- 7 files changed, 41 insertions(+), 22 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index 230b69e5..bec88b0b 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ b/cdm_reader_mapper/cdm_mapper/reader.py @@ -99,7 +99,7 @@ def _read_multiple_files( inp_dir: str, prefix: str | None = None, suffix: str | None = None, - extension: str = "psv", + extension: str | None = None, cdm_subset: str | list | None = None, col_subset: str | list | None = None, null_label: str = "null", @@ -114,12 +114,12 @@ def _read_multiple_files( files = glob.glob(pattern) if len(files) == 0: - logger.error(f"No files found matching pattern {pattern}") - return [pd.DataFrame()] + raise FileNotFoundError(f"No files found matching pattern {pattern}") df_list = [] if not isinstance(cdm_subset, list): cdm_subset = [cdm_subset] + for table in cdm_subset: if table not in properties.cdm_tables: logger.warning(f"Requested table {table} not defined in CDM") diff --git a/cdm_reader_mapper/cdm_mapper/writer.py b/cdm_reader_mapper/cdm_mapper/writer.py index 4d97c1c1..3011bb68 100755 --- a/cdm_reader_mapper/cdm_mapper/writer.py +++ b/cdm_reader_mapper/cdm_mapper/writer.py @@ -60,9 +60,10 @@ def _table_to_file( data.to_parquet(filename, **kwargs) elif data_format == "feather": data.to_feather(filename, **kwargs) - raise ValueError( - f"data_format must be one of [csv, parquet, feather] not {data_format}." - ) + else: + raise ValueError( + f"data_format must be one of [csv, parquet, feather] not {data_format}." + ) def write_tables( @@ -164,6 +165,9 @@ def write_tables( if out_dir is None: out_dir = "." 
+ if extension is None: + extension = data_format + for table in cdm_subset: if table not in data: cdm_atts = get_cdm_atts(table) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index dcfa90db..6421a751 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -291,7 +291,7 @@ def read_data( pd_kwargs.setdefault("dtype", "boolean") mask, _ = read_csv( - mask_file, col_subset=col_subset, columns=infos["columns"], **pd_kwargs + mask_file, col_subset=col_subset, column_names=infos["columns"], **pd_kwargs ) elif data_format == "parquet": data, infos = read_parquet( @@ -303,7 +303,7 @@ mask, _ = read_parquet( mask_file, col_subset=col_subset, - columns=infos["columns"] ** kwargs, + column_names=infos["columns"], **kwargs, ) elif data_format == "feather": data, infos = read_feather( @@ -315,7 +315,7 @@ mask, _ = read_feather( mask_file, col_subset=col_subset, - columns=infos["columns"] ** kwargs, + column_names=infos["columns"], **kwargs, ) else: raise ValueError( diff --git a/tests/_duplicates.py b/tests/_duplicates.py index 0e5ab654..0e574ec3 100755 --- a/tests/_duplicates.py +++ b/tests/_duplicates.py @@ -132,6 +132,7 @@ def _get_test_data(imodel): suffix=f"{imodel}*", cdm_subset="header", mode="tables", + extension="psv", ) diff --git a/tests/test_databundle.py b/tests/test_databundle.py index 161a65ce..57f4adf7 100755 --- a/tests/test_databundle.py +++ b/tests/test_databundle.py @@ -70,7 +70,7 @@ def sample_db_df_testdata(): mask = test_data[f"test_{data_model}"]["mdf_mask"] info = test_data[f"test_{data_model}"]["mdf_info"] - return read_data(data, mask=mask, info=info) + return read_data(data_file=data, mask_file=mask, info_file=info) @pytest.fixture @@ -80,7 +80,7 @@ def sample_db_reader_testdata(): mask = test_data[f"test_{data_model}"]["mdf_mask"] info = test_data[f"test_{data_model}"]["mdf_info"] - return read_data(data, mask=mask, info=info,
chunksize=2) + return read_data(data_file=data, mask_file=mask, info_file=info, chunksize=2) def test_len_df(sample_db_df): diff --git a/tests/test_mdf_reader.py b/tests/test_mdf_reader.py index e1467c0a..bdc2c179 100755 --- a/tests/test_mdf_reader.py +++ b/tests/test_mdf_reader.py @@ -38,7 +38,7 @@ def _read_mdf_test_data(data_model, select=None, drop=None, drop_idx=None, **kwa mask = test_data[f"test_{data_model}"]["mdf_mask"] info = test_data[f"test_{data_model}"]["mdf_info"] - expected = read_data(data, mask=mask, info=info) + expected = read_data(data_file=data, mask_file=mask, info_file=info) if not isinstance(result.data, pd.DataFrame): result.data = result.data.read() @@ -242,7 +242,7 @@ def test_read_data_no_mask(): data_model = "icoads_r300_d721" data = test_data[f"test_{data_model}"]["mdf_data"] info = test_data[f"test_{data_model}"]["mdf_info"] - db = read_data(data, info=info) + db = read_data(data_file=data, info_file=info) assert isinstance(db, DataBundle) @@ -277,7 +277,7 @@ def test_read_data_no_info(): data_model = "icoads_r300_d721" data = test_data[f"test_{data_model}"]["mdf_data"] - db = read_data(data) + db = read_data(data_file=data) assert isinstance(db, DataBundle) @@ -311,7 +311,7 @@ def test_read_data_col_subset(): data_model = "icoads_r300_d721" data = test_data[f"test_{data_model}"]["mdf_data"] info = test_data[f"test_{data_model}"]["mdf_info"] - db = read_data(data, info=info, col_subset="core") + db = read_data(data_file=data, info_file=info, col_subset="core") assert isinstance(db, DataBundle) @@ -345,7 +345,7 @@ def test_read_data_col_subset(): def test_read_data_encoding(): data_model = "icoads_r300_d721" data = test_data[f"test_{data_model}"]["mdf_data"] - db = read_data(data, encoding="cp1252") + db = read_data(data_file=data, encoding="cp1252") assert isinstance(db, DataBundle) @@ -381,7 +381,7 @@ def test_read_data_textfilereader(): data = test_data[f"test_{data_model}"]["mdf_data"] mask = 
test_data[f"test_{data_model}"]["mdf_mask"] info = test_data[f"test_{data_model}"]["mdf_info"] - db = read_data(data, mask=mask, info=info, chunksize=3) + db = read_data(data_file=data, mask_file=mask, info_file=info, chunksize=3) assert isinstance(db, DataBundle) diff --git a/tests/test_writers.py b/tests/test_writers.py index c880f145..a6a56cf9 100755 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -14,8 +14,9 @@ def db_exp(): for table in cdm_tables: cdm_path = test_data[pattern][f"cdm_{table}"].parent - db = read(cdm_path, suffix=f"{imodel}*", mode="tables") + db = read(cdm_path, suffix=f"{imodel}*", extension="psv", mode="tables") db.imodel = imodel + print(db) return db @@ -28,11 +29,19 @@ def test_write_data(tmp_path, db_exp): def test_write_header(tmp_path, db_exp): table = "header" db_exp.write( - out_dir=tmp_path, suffix=f"{db_exp.imodel}_{table}_all", cdm_subset=table + out_dir=tmp_path, + suffix=f"{db_exp.imodel}_{table}_all", + extension="psv", + cdm_subset=table, ) db_res = read( - tmp_path, suffix=f"{db_exp.imodel}_{table}_all", cdm_subset=table, mode="tables" + tmp_path, + suffix=f"{db_exp.imodel}_{table}_all", + cdm_subset=table, + extension="psv", + mode="tables", ) + table_exp = db_exp[table].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res[table]) @@ -88,8 +97,13 @@ def test_write_filename_dict_observations(tmp_path, db_exp): filename_dict = { "observations-sst": f"observations-sst-{db_exp.imodel}_filename_dict_all.psv", } - db_exp.write(out_dir=tmp_path, filename=filename_dict) - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_filename_dict_all", mode="tables") + db_exp.write(out_dir=tmp_path, filename=filename_dict, extension="psv") + db_res = read( + tmp_path, + suffix=f"{db_exp.imodel}_filename_dict_all", + mode="tables", + extension="psv", + ) table_exp = db_exp["observations-sst"].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, 
db_res.data["observations-sst"]) From 862ed84309dd251f304621b63ddb02e398a247bb Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 2 Feb 2026 15:21:35 +0100 Subject: [PATCH 03/16] more readability --- cdm_reader_mapper/cdm_mapper/reader.py | 3 +- cdm_reader_mapper/cdm_mapper/writer.py | 6 +- cdm_reader_mapper/mdf_reader/reader.py | 4 +- .../mdf_reader/utils/utilities.py | 102 ++++++++++-------- cdm_reader_mapper/mdf_reader/writer.py | 76 +++++++------ 5 files changed, 111 insertions(+), 80 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index bec88b0b..c3c7484d 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ b/cdm_reader_mapper/cdm_mapper/reader.py @@ -249,8 +249,7 @@ def read_tables( # See if subset, if any of the tables is not as specs cdm_subset = get_cdm_subset(cdm_subset) - if extension is None: - extension = data_format + extension = extension or data_format if os.path.isfile(source): df_list = [ diff --git a/cdm_reader_mapper/cdm_mapper/writer.py b/cdm_reader_mapper/cdm_mapper/writer.py index 3011bb68..82b3ccde 100755 --- a/cdm_reader_mapper/cdm_mapper/writer.py +++ b/cdm_reader_mapper/cdm_mapper/writer.py @@ -162,11 +162,9 @@ def write_tables( elif filename is None: filename = {} - if out_dir is None: - out_dir = "." + out_dir = out_dir or "." - if extension is None: - extension = data_format + extension = extension or data_format for table in cdm_subset: if table not in data: diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index 6421a751..cc28b7cf 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -153,8 +153,8 @@ def read_mdf( write_data : Write MDF data and validation mask to disk. write_tables : Write CDM tables to disk. 
""" - if skiprows is None: - skiprows = 0 + skiprows = skiprows or 0 + validate_read_mdf_args( source=source, imodel=imodel, diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py index 2778796b..9fde85eb 100755 --- a/cdm_reader_mapper/mdf_reader/utils/utilities.py +++ b/cdm_reader_mapper/mdf_reader/utils/utilities.py @@ -4,7 +4,6 @@ import ast import csv -import logging import os from io import StringIO @@ -211,6 +210,45 @@ def update_and_select( return df, {"columns": df.columns, "dtypes": df.dtypes} +def _read_data_from_file( + filepath: Path, + reader: Callable[..., Any], + col_subset: str | list | None = None, + column_names: pd.Index | pd.MultiIndex | None = None, + reader_kwargs: dict | None = None, + iterator: bool = False, +) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]: + """Helper file reader.""" + if filepath is None or not Path(filepath).is_file(): + raise FileNotFoundError(f"File not found: {filepath}") + + reader_kwargs = reader_kwargs or {} + + data = reader(filepath, **reader_kwargs) + + if isinstance(data, pd.DataFrame): + return update_and_select(data, subset=col_subset, column_names=column_names) + + if iterator is True: + write_kwargs = {} + if "encoding" in reader_kwargs: + write_kwargs["encoding"] = reader_kwargs["encoding"] + + return process_textfilereader( + data, + func=update_and_select, + func_kwargs={ + "subset": col_subset, + "column_names": column_names, + }, + read_kwargs=reader_kwargs, + write_kwargs=write_kwargs, + makecopy=False, + ) + + raise ValueError(f"Unsupported reader return type: {type(data)}") + + def read_csv( filepath: Path, col_subset: str | list | None = None, @@ -237,31 +275,14 @@ def read_csv( - The CSV as a DataFrame. Empty if file does not exist. 
- dictionary containing data column labels and data types """ - if filepath is None or not Path(filepath).is_file(): - logging.warning(f"File not found: {filepath}") - return pd.DataFrame(), {} - - data = pd.read_csv(filepath, delimiter=",", **kwargs) - - if isinstance(data, pd.DataFrame): - data, info = update_and_select( - data, subset=col_subset, column_names=column_names - ) - return data, info - - write_kwargs = {} - if "encoding" in kwargs: - write_kwargs["encoding"] = kwargs["encoding"] - - data, info = process_textfilereader( - data, - func=update_and_select, - func_kwargs={"subset": col_subset, "column_names": column_names}, - read_kwargs=kwargs, - write_kwargs=write_kwargs, - makecopy=False, + return _read_data_from_file( + filepath, + reader=pd.read_csv, + col_subset=col_subset, + column_names=column_names, + reader_kwargs={"delimiter": ",", **kwargs}, + iterator=True, ) - return data, info def read_parquet( @@ -290,15 +311,13 @@ def read_parquet( - The CSV as a DataFrame. Empty if file does not exist. - dictionary containing data column labels and data types """ - if filepath is None or not Path(filepath).is_file(): - logging.warning(f"File not found: {filepath}") - return pd.DataFrame(), {} - - data = pd.read_parquet(filepath, **kwargs) - - data, info = update_and_select(data, subset=col_subset, column_names=column_names) - - return data, info + return _read_data_from_file( + filepath, + reader=pd.read_parquet, + col_subset=col_subset, + column_names=column_names, + reader_kwargs=kwargs, + ) def read_feather( @@ -327,14 +346,13 @@ def read_feather( - The CSV as a DataFrame. Empty if file does not exist. 
- dictionary containing data column labels and data types """ - if filepath is None or not Path(filepath).is_file(): - logging.warning(f"File not found: {filepath}") - return pd.DataFrame(), {} - - data = pd.read_feather(filepath, **kwargs) - - data, info = update_and_select(data, subset=col_subset, column_names=column_names) - return data, info + return _read_data_from_file( + filepath, + reader=pd.read_feather, + col_subset=col_subset, + column_names=column_names, + reader_kwargs=kwargs, + ) def convert_dtypes(dtypes) -> tuple[str]: diff --git a/cdm_reader_mapper/mdf_reader/writer.py b/cdm_reader_mapper/mdf_reader/writer.py index d5440aee..c31021cc 100755 --- a/cdm_reader_mapper/mdf_reader/writer.py +++ b/cdm_reader_mapper/mdf_reader/writer.py @@ -6,7 +6,7 @@ import logging from io import StringIO as StringIO from pathlib import Path -from typing import Literal +from typing import Literal, Any import pandas as pd from pandas.io.parsers import TextFileReader @@ -16,6 +16,38 @@ from ..common import get_filename from ..common.pandas_TextParser_hdlr import make_copy +WRITERS = { + "csv": "to_csv", + "parquet": "to_parquet", + "feather": "to_feather", +} + + +def _normalize_data_chunks( + data: pd.DataFrame | TextFileReader | None, +) -> list | TextFileReader: + """Helper function to normalize data chunks.""" + data = data or pd.DataFrame() + if isinstance(data, pd.DataFrame): + return [data] + if isinstance(data, TextFileReader): + return make_copy(data) + raise TypeError(f"Unsupported data type found: {type(data)}.") + + +def _write_data( + data_df: pd.DataFrame, + mask_df: pd.DataFrame, + data_fn: str, + mask_fn: str, + writer: str, + write_kwargs: dict[str, Any], +) -> None: + """Helper function to write data on disk.""" + getattr(data_df, writer)(data_fn, **write_kwargs) + if not mask_df.empty: + getattr(mask_df, writer)(mask_fn, **write_kwargs) + def write_data( data: pd.DataFrame | TextFileReader, @@ -96,25 +128,14 @@ def write_data( f"data_format must be one 
of [csv, parquet, feather] not {data_format}." ) - if extension is None: - extension = data_format + extension = extension or data_format dtypes = dtypes or {} if isinstance(parse_dates, bool): parse_dates = [] - if not isinstance(data, TextFileReader): - data_list = [data] - else: - data_list = make_copy(data) - - if mask is None: - mask = pd.DataFrame() - - if not isinstance(mask, TextFileReader): - mask_list = [mask] - else: - mask_list = make_copy(mask) + data_list = _normalize_data_chunks(data) + mask_list = _normalize_data_chunks(mask) info = {"dtypes": dtypes.copy(), "parse_dates": [join(p) for p in parse_dates]} @@ -153,9 +174,9 @@ def write_data( info["parse_dates"] = [p for p in info["parse_dates"] if p in header] info["encoding"] = encoding + write_kwargs = {} if data_format == "csv": - - csv_kwargs = dict( + write_kwargs = dict( header=header, mode=mode, index=False, @@ -164,19 +185,14 @@ def write_data( **kwargs, ) - data_df.to_csv(filename_data, **csv_kwargs) - if not mask_df.empty: - mask_df.to_csv(filename_mask, **csv_kwargs) - - elif data_format == "parquet": - data_df.to_parquet(filename_data, **kwargs) - if not mask_df.empty: - mask_df.to_parquet(filename_mask, **kwargs) - - elif data_format == "feather": - data_df.to_feather(filename_data, **kwargs) - if not mask_df.empty: - mask_df.to_feather(filename_mask, **kwargs) + _write_data( + data_df=data_df, + mask_df=mask_df, + data_fn=filename_data, + mask_fn=filename_mask, + writer=WRITERS[data_format], + write_kwargs=write_kwargs, + ) with open(filename_info, "w") as fileObj: json.dump(info, fileObj, indent=4) From a9dfc12090576f2042f71914fc061ce9a64a1c6f Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 2 Feb 2026 17:01:25 +0100 Subject: [PATCH 04/16] more helper functions --- cdm_reader_mapper/mdf_reader/reader.py | 105 ++++++++++-------- .../mdf_reader/utils/utilities.py | 4 +- cdm_reader_mapper/mdf_reader/writer.py | 3 +- tests/test_reader_utilities.py | 5 +- 4 files changed, 66 
insertions(+), 51 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index cc28b7cf..42abb26d 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -3,7 +3,9 @@ from __future__ import annotations from pathlib import Path -from typing import Literal +from typing import Literal, Callable, Any + +import pandas as pd from cdm_reader_mapper import DataBundle @@ -15,6 +17,13 @@ from .utils.utilities import as_list, as_path, read_csv, read_parquet, read_feather +READERS = { + "csv": read_csv, + "parquet": read_parquet, + "feather": read_feather, +} + + def validate_read_mdf_args( *, source: str | Path, @@ -219,6 +228,34 @@ def read_mdf( ) +def _read_data( + data_file: str, + mask_file: str, + reader: Callable[..., Any], + col_subset: str | list | tuple | None, + data_kwargs: dict, + mask_kwargs: dict, +): + """Helper function for reading data files from disk.""" + data, info = reader( + data_file, + col_subset=col_subset, + **data_kwargs, + ) + + if mask_file is None: + mask = pd.DataFrame() + else: + mask, _ = reader( + mask_file, + col_subset=col_subset, + column_names=info["columns"], + **mask_kwargs, + ) + + return data, mask, info + + def read_data( data_file: str, mask_file: str | None = None, @@ -270,62 +307,38 @@ def read_data( write_data : Write MDF data and validation mask to disk. write_tables : Write CDM tables to disk. """ + if data_format not in ["csv", "parquet", "feather"]: + raise ValueError( + f"data_format must be one of [csv, parquet, feather] not {data_format}." 
+ ) + + data_kwargs = kwargs.copy() + mask_kwargs = kwargs.copy() if data_format == "csv": info_dict = open_json_file(info_file) if info_file else {} dtype = info_dict.get("dtypes", "object") parse_dates = info_dict.get("parse_dates", False) encoding = encoding or info_dict.get("encoding", None) - pd_kwargs = kwargs.copy() - pd_kwargs.setdefault("dtype", dtype) - pd_kwargs.setdefault("parse_dates", parse_dates) - pd_kwargs.setdefault("encoding", encoding) + data_kwargs.setdefault("dtype", dtype) + data_kwargs.setdefault("parse_dates", parse_dates) + data_kwargs.setdefault("encoding", encoding) - data, infos = read_csv( - data_file, - col_subset=col_subset, - **pd_kwargs, - ) + mask_kwargs.setdefault("dtype", "boolean") - pd_kwargs = kwargs.copy() - pd_kwargs.setdefault("dtype", "boolean") - - mask, _ = read_csv( - mask_file, col_subset=col_subset, column_names=infos["columns"], **pd_kwargs - ) - elif data_format == "parquet": - data, infos = read_parquet( - data_file, - col_subset=col_subset, - **kwargs, - ) - - mask, _ = read_parquet( - mask_file, - col_subset=col_subset, - column_names=infos["columns"] ** kwargs, - ) - elif data_format == "feather": - data, infos = read_feather( - data_file, - col_subset=col_subset, - **kwargs, - ) - - mask, _ = read_feather( - mask_file, - col_subset=col_subset, - column_names=infos["columns"] ** kwargs, - ) - else: - raise ValueError( - f"data_format must be one of [csv, parquet, feather] not {data_format}." 
- ) + data, mask, info = _read_data( + data_file=data_file, + mask_file=mask_file, + reader=READERS[data_format], + col_subset=col_subset, + data_kwargs=data_kwargs, + mask_kwargs=mask_kwargs, + ) return DataBundle( data=data, - columns=infos["columns"], - dtypes=infos["dtypes"].to_dict(), + columns=info["columns"], + dtypes=info["dtypes"].to_dict(), parse_dates=parse_dates, mask=mask, imodel=imodel, diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py index 9fde85eb..f4e776f8 100755 --- a/cdm_reader_mapper/mdf_reader/utils/utilities.py +++ b/cdm_reader_mapper/mdf_reader/utils/utilities.py @@ -280,7 +280,7 @@ def read_csv( reader=pd.read_csv, col_subset=col_subset, column_names=column_names, - reader_kwargs={"delimiter": ",", **kwargs}, + reader_kwargs=kwargs, iterator=True, ) @@ -513,6 +513,8 @@ def process_textfilereader( if write_kwargs is None: write_kwargs = {} + read_kwargs = {k: v for k, v in read_kwargs.items() if k != "delimiter"} + buffers = [] columns = [] diff --git a/cdm_reader_mapper/mdf_reader/writer.py b/cdm_reader_mapper/mdf_reader/writer.py index c31021cc..00e86118 100755 --- a/cdm_reader_mapper/mdf_reader/writer.py +++ b/cdm_reader_mapper/mdf_reader/writer.py @@ -27,7 +27,8 @@ def _normalize_data_chunks( data: pd.DataFrame | TextFileReader | None, ) -> list | TextFileReader: """Helper function to normalize data chunks.""" - data = data or pd.DataFrame() + if data is None: + data = pd.DataFrame() if isinstance(data, pd.DataFrame): return [data] if isinstance(data, TextFileReader): diff --git a/tests/test_reader_utilities.py b/tests/test_reader_utilities.py index 2ae892be..6685d145 100755 --- a/tests/test_reader_utilities.py +++ b/tests/test_reader_utilities.py @@ -178,9 +178,8 @@ def test_read_csv_file_exists(tmp_csv_file): def test_read_csv_file_missing(tmp_path): missing_file = tmp_path / "missing.csv" - df, info = read_csv(missing_file) - assert df.empty - assert info == {} + with 
pytest.raises(FileNotFoundError): + read_csv(missing_file) def test_read_csv_with_col_subset(tmp_csv_file): From eae832836ce05d6818b90250597ea46965d25f88 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 2 Feb 2026 17:14:49 +0100 Subject: [PATCH 05/16] remove user-specific delimiters in buffer reading --- .../mdf_reader/utils/utilities.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py index f4e776f8..bd211027 100755 --- a/cdm_reader_mapper/mdf_reader/utils/utilities.py +++ b/cdm_reader_mapper/mdf_reader/utils/utilities.py @@ -230,9 +230,9 @@ def _read_data_from_file( return update_and_select(data, subset=col_subset, column_names=column_names) if iterator is True: - write_kwargs = {} + writer_kwargs = {} if "encoding" in reader_kwargs: - write_kwargs["encoding"] = reader_kwargs["encoding"] + writer_kwargs["encoding"] = reader_kwargs["encoding"] return process_textfilereader( data, @@ -242,7 +242,7 @@ def _read_data_from_file( "column_names": column_names, }, read_kwargs=reader_kwargs, - write_kwargs=write_kwargs, + write_kwargs=writer_kwargs, makecopy=False, ) @@ -506,14 +506,9 @@ def process_textfilereader( - One or more processed DataFrames (in the same order as returned by `func`) - Any additional outputs from `func` that are not DataFrames """ - if func_kwargs is None: - func_kwargs = {} - if read_kwargs is None: - read_kwargs = {} - if write_kwargs is None: - write_kwargs = {} - - read_kwargs = {k: v for k, v in read_kwargs.items() if k != "delimiter"} + func_kwargs = func_kwargs or {} + read_kwargs = read_kwargs or {} + write_kwargs = write_kwargs or {} buffers = [] columns = [] @@ -560,6 +555,7 @@ def process_textfilereader( result_dfs = [] for buffer, cols, rk in zip(buffers, columns, read_kwargs): buffer.seek(0) + rk = {k: v for k, v in rk.items() if k != "delimiter"} result_dfs.append( pd.read_csv( buffer, From 
919ff4d0c5a01956ed5abc81637264b357bab772 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Tue, 3 Feb 2026 10:39:08 +0100 Subject: [PATCH 06/16] some simple read/write tests --- cdm_reader_mapper/mdf_reader/reader.py | 1 + cdm_reader_mapper/mdf_reader/writer.py | 5 +- tests/test_writers.py | 115 ++++++++++++++++--------- 3 files changed, 80 insertions(+), 41 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index 42abb26d..c342602f 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -314,6 +314,7 @@ def read_data( data_kwargs = kwargs.copy() mask_kwargs = kwargs.copy() + parse_dates = False if data_format == "csv": info_dict = open_json_file(info_file) if info_file else {} dtype = info_dict.get("dtypes", "object") diff --git a/cdm_reader_mapper/mdf_reader/writer.py b/cdm_reader_mapper/mdf_reader/writer.py index 00e86118..9e7dcbe5 100755 --- a/cdm_reader_mapper/mdf_reader/writer.py +++ b/cdm_reader_mapper/mdf_reader/writer.py @@ -138,7 +138,10 @@ def write_data( data_list = _normalize_data_chunks(data) mask_list = _normalize_data_chunks(mask) - info = {"dtypes": dtypes.copy(), "parse_dates": [join(p) for p in parse_dates]} + info = { + "dtypes": {k: str(v) for k, v in dtypes.items()}, + "parse_dates": [join(p) for p in parse_dates], + } logging.info(f"WRITING DATA TO FILES IN: {out_dir}") out_dir = Path(out_dir) diff --git a/tests/test_writers.py b/tests/test_writers.py index a6a56cf9..b05f4b70 100755 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -1,5 +1,7 @@ from __future__ import annotations +import os + import pandas as pd import pytest # noqa @@ -8,7 +10,7 @@ @pytest.fixture(scope="session") -def db_exp(): +def db_tables(): imodel = "icoads_r300_d714" pattern = f"test_{imodel}" for table in cdm_tables: @@ -16,107 +18,140 @@ db = read(cdm_path, suffix=f"{imodel}*", extension="psv", mode="tables") db.imodel = imodel - print(db) 
return db -def test_write_data(tmp_path, db_exp): - db_exp.write(out_dir=tmp_path, suffix=f"{db_exp.imodel}_all") - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_all", mode="tables") - pd.testing.assert_frame_equal(db_exp.data, db_res.data) +@pytest.fixture(scope="session") +def db_data(): + imodel = "icoads_r300_d714" + pattern = f"test_{imodel}" + + data_file = test_data[pattern]["mdf_data"] + info_file = test_data[pattern]["mdf_info"] + + db = read(data_file, info_file=info_file, mode="data") + db.imodel = imodel + return db + + +def test_write_data(tmp_path, db_tables): + db_tables.write(out_dir=tmp_path, suffix=f"{db_tables.imodel}_all") + db_res = read(tmp_path, suffix=f"{db_tables.imodel}_all", mode="tables") + pd.testing.assert_frame_equal(db_tables.data, db_res.data) -def test_write_header(tmp_path, db_exp): +def test_write_header(tmp_path, db_tables): table = "header" - db_exp.write( + db_tables.write( out_dir=tmp_path, - suffix=f"{db_exp.imodel}_{table}_all", + suffix=f"{db_tables.imodel}_{table}_all", extension="psv", cdm_subset=table, ) db_res = read( tmp_path, - suffix=f"{db_exp.imodel}_{table}_all", + suffix=f"{db_tables.imodel}_{table}_all", cdm_subset=table, extension="psv", mode="tables", ) - table_exp = db_exp[table].dropna(how="all").reset_index(drop=True) + table_exp = db_tables[table].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res[table]) -def test_write_observations(tmp_path, db_exp): +def test_write_observations(tmp_path, db_tables): table = "observations-sst" - db_exp.write( - out_dir=tmp_path, suffix=f"{db_exp.imodel}_{table}_all", cdm_subset=table + db_tables.write( + out_dir=tmp_path, suffix=f"{db_tables.imodel}_{table}_all", cdm_subset=table ) db_res = read( - tmp_path, suffix=f"{db_exp.imodel}_{table}_all", cdm_subset=table, mode="tables" + tmp_path, + suffix=f"{db_tables.imodel}_{table}_all", + cdm_subset=table, + mode="tables", ) - table_exp = 
db_exp[table].dropna(how="all").reset_index(drop=True) + table_exp = db_tables[table].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res[table]) -def test_write_fns(tmp_path, db_exp): - db_exp.write( +def test_write_fns(tmp_path, db_tables): + db_tables.write( out_dir=tmp_path, prefix="prefix", - suffix=f"{db_exp.imodel}_all", + suffix=f"{db_tables.imodel}_all", extension="csv", delimiter=",", ) db_res = read( tmp_path, prefix="prefix", - suffix=f"{db_exp.imodel}_all", + suffix=f"{db_tables.imodel}_all", extension="csv", delimiter=",", mode="tables", ) - pd.testing.assert_frame_equal(db_exp.data, db_res.data) + pd.testing.assert_frame_equal(db_tables.data, db_res.data) -def test_write_filename(tmp_path, db_exp): - db_exp.write(out_dir=tmp_path, filename=f"{db_exp.imodel}_filename_all") - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_filename_all", mode="tables") - pd.testing.assert_frame_equal(db_exp.data, db_res.data) +def test_write_filename(tmp_path, db_tables): + db_tables.write(out_dir=tmp_path, filename=f"{db_tables.imodel}_filename_all") + db_res = read(tmp_path, suffix=f"{db_tables.imodel}_filename_all", mode="tables") + pd.testing.assert_frame_equal(db_tables.data, db_res.data) -def test_write_filename_dict_header(tmp_path, db_exp): +def test_write_filename_dict_header(tmp_path, db_tables): filename_dict = { - "header": f"{db_exp.imodel}_filename_dict_all", + "header": f"{db_tables.imodel}_filename_dict_all", } - db_exp.write(out_dir=tmp_path, filename=filename_dict) - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_filename_dict_all", mode="tables") - table_exp = db_exp["header"].dropna(how="all").reset_index(drop=True) + db_tables.write(out_dir=tmp_path, filename=filename_dict) + db_res = read( + tmp_path, suffix=f"{db_tables.imodel}_filename_dict_all", mode="tables" + ) + table_exp = db_tables["header"].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res.data["header"]) -def 
test_write_filename_dict_observations(tmp_path, db_exp): +def test_write_filename_dict_observations(tmp_path, db_tables): filename_dict = { - "observations-sst": f"observations-sst-{db_exp.imodel}_filename_dict_all.psv", + "observations-sst": f"observations-sst-{db_tables.imodel}_filename_dict_all.psv", } - db_exp.write(out_dir=tmp_path, filename=filename_dict, extension="psv") + db_tables.write(out_dir=tmp_path, filename=filename_dict, extension="psv") db_res = read( tmp_path, - suffix=f"{db_exp.imodel}_filename_dict_all", + suffix=f"{db_tables.imodel}_filename_dict_all", mode="tables", extension="psv", ) - table_exp = db_exp["observations-sst"].dropna(how="all").reset_index(drop=True) + table_exp = db_tables["observations-sst"].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res.data["observations-sst"]) -def test_write_col_subset(tmp_path, db_exp): +def test_write_col_subset(tmp_path, db_tables): table = "header" columns = ["report_id", "latitude", "longitude"] - db_exp.write( + db_tables.write( out_dir=tmp_path, - suffix=f"{db_exp.imodel}_{table}_all", + suffix=f"{db_tables.imodel}_{table}_all", cdm_subset=table, col_subset={table: columns}, ) - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_{table}_all", mode="tables") - table_exp = db_exp[table][columns].dropna(how="all").reset_index(drop=True) + db_res = read(tmp_path, suffix=f"{db_tables.imodel}_{table}_all", mode="tables") + table_exp = db_tables[table][columns].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res[table]) + + +def test_write_parquet(tmp_path, db_data): + db_data.write(out_dir=tmp_path, data_format="parquet") + db_res = read( + os.path.join(tmp_path, "data.parquet"), data_format="parquet", mode="data" + ) + pd.testing.assert_frame_equal(db_data.data, db_res.data) + + +def test_write_feather(tmp_path, db_data): + db_data.write(out_dir=tmp_path, data_format="feather") + db_res = read( + os.path.join(tmp_path, 
"data.feather"), data_format="feather", mode="data" + ) + pd.testing.assert_frame_equal(db_data.data, db_res.data) From 2a6515cf186222437b81a1b39eb455720a0d0376 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Tue, 3 Feb 2026 11:17:35 +0100 Subject: [PATCH 07/16] main properties as pre-defined Literal constants --- cdm_reader_mapper/cdm_mapper/mapper.py | 4 ++-- cdm_reader_mapper/cdm_mapper/properties.py | 2 +- cdm_reader_mapper/common/pandas_TextParser_hdlr.py | 1 + cdm_reader_mapper/common/replace.py | 1 - cdm_reader_mapper/mdf_reader/properties.py | 10 ++++++---- cdm_reader_mapper/mdf_reader/schemas/schemas.py | 4 ++-- .../mdf_reader/utils/convert_and_decode.py | 8 +++++--- cdm_reader_mapper/mdf_reader/utils/validators.py | 9 ++++++--- cdm_reader_mapper/properties.py | 10 +++++++--- tests/test_reader_convert_and_decode.py | 9 +++++++-- 10 files changed, 37 insertions(+), 21 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/mapper.py b/cdm_reader_mapper/cdm_mapper/mapper.py index f084adec..ec5b09d8 100755 --- a/cdm_reader_mapper/cdm_mapper/mapper.py +++ b/cdm_reader_mapper/cdm_mapper/mapper.py @@ -15,7 +15,7 @@ from copy import deepcopy from io import StringIO -from typing import Any +from typing import Any, get_args import numpy as np import pandas as pd @@ -533,7 +533,7 @@ def map_model( """ logger = logging_hdlr.init_logger(__name__, level=log_level) imodel = imodel.split("_") - if imodel[0] not in properties.supported_data_models: + if imodel[0] not in get_args(properties.SupportedDataModels): logger.error("Input data model " f"{imodel[0]}" " not supported") return diff --git a/cdm_reader_mapper/cdm_mapper/properties.py b/cdm_reader_mapper/cdm_mapper/properties.py index c00bf325..f920fd12 100755 --- a/cdm_reader_mapper/cdm_mapper/properties.py +++ b/cdm_reader_mapper/cdm_mapper/properties.py @@ -2,7 +2,7 @@ from __future__ import annotations -from ..properties import numeric_types, object_types, supported_data_models # noqa +from ..properties 
import NumericTypes, ObjectTypes, SupportedDataModels # noqa _base = "cdm_reader_mapper.cdm_mapper" diff --git a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py index b3a17b8c..15fbe3ce 100755 --- a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py +++ b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py @@ -1,6 +1,7 @@ """Utilities for handling pandas TextParser objects safely.""" from __future__ import annotations + import pandas as pd from pandas.io.parsers import TextFileReader from io import StringIO diff --git a/cdm_reader_mapper/common/replace.py b/cdm_reader_mapper/common/replace.py index 15bf4351..058bf121 100755 --- a/cdm_reader_mapper/common/replace.py +++ b/cdm_reader_mapper/common/replace.py @@ -19,7 +19,6 @@ from __future__ import annotations - import pandas as pd from . import logging_hdlr diff --git a/cdm_reader_mapper/mdf_reader/properties.py b/cdm_reader_mapper/mdf_reader/properties.py index 8fa41d14..ab089843 100755 --- a/cdm_reader_mapper/mdf_reader/properties.py +++ b/cdm_reader_mapper/mdf_reader/properties.py @@ -2,7 +2,9 @@ from __future__ import annotations -from ..properties import numeric_types, object_types, supported_data_models # noqa +from typing import get_args + +from ..properties import NumericTypes, ObjectTypes, SupportedDataModels # noqa _base = "cdm_reader_mapper.mdf_reader" @@ -17,16 +19,16 @@ } pandas_dtypes = {} -for dtype in object_types: +for dtype in get_args(ObjectTypes): pandas_dtypes[dtype] = "object" -pandas_dtypes.update({x: x for x in numeric_types}) +pandas_dtypes.update({x: x for x in get_args(NumericTypes)}) pandas_dtypes["datetime"] = "datetime" pandas_int = "Int64" # ....and how they are managed data_type_conversion_args = {} -for dtype in numeric_types: +for dtype in get_args(NumericTypes): data_type_conversion_args[dtype] = ["scale", "offset"] data_type_conversion_args["str"] = ["disable_white_strip"] data_type_conversion_args["object"] = 
["disable_white_strip"] diff --git a/cdm_reader_mapper/mdf_reader/schemas/schemas.py b/cdm_reader_mapper/mdf_reader/schemas/schemas.py index 96ff7718..f1e8fa54 100755 --- a/cdm_reader_mapper/mdf_reader/schemas/schemas.py +++ b/cdm_reader_mapper/mdf_reader/schemas/schemas.py @@ -10,7 +10,7 @@ from __future__ import annotations from pathlib import Path -from typing import TypedDict +from typing import TypedDict, get_args from cdm_reader_mapper.common.json_dict import collect_json_files, combine_dicts @@ -109,7 +109,7 @@ def _resolve_schema_files( if imodel: parts = imodel.split("_") model = parts[0] - if model not in properties.supported_data_models: + if model not in get_args(properties.SupportedDataModels): raise ValueError(f"Input data model {model} not supported") return collect_json_files(*parts, base=f"{properties._base}.schemas") diff --git a/cdm_reader_mapper/mdf_reader/utils/convert_and_decode.py b/cdm_reader_mapper/mdf_reader/utils/convert_and_decode.py index 121eced5..4f65d1b5 100755 --- a/cdm_reader_mapper/mdf_reader/utils/convert_and_decode.py +++ b/cdm_reader_mapper/mdf_reader/utils/convert_and_decode.py @@ -3,13 +3,15 @@ from __future__ import annotations from decimal import Decimal, InvalidOperation -from typing import Callable, Any +from typing import Callable, Any, get_args import pandas as pd from .. 
import properties from .utilities import convert_str_boolean +numeric_types = get_args(properties.NumericTypes) + def max_decimal_places(*decimals: Decimal) -> int: """ @@ -102,7 +104,7 @@ def __init__(self, dtype: str, encoding: str = "base36") -> None: self._registry = {"key": self.base36} - for numeric_type in properties.numeric_types: + for numeric_type in numeric_types: self._registry[numeric_type] = self.base36 def decoder(self) -> Callable[[pd.Series], pd.Series] | None: @@ -189,7 +191,7 @@ def __init__(self, dtype: str) -> None: "key": self.object_to_object, } - for numeric_type in properties.numeric_types: + for numeric_type in numeric_types: self._registry[numeric_type] = self.object_to_numeric def converter(self) -> Callable[..., pd.Series]: diff --git a/cdm_reader_mapper/mdf_reader/utils/validators.py b/cdm_reader_mapper/mdf_reader/utils/validators.py index d4d84057..b3f62024 100755 --- a/cdm_reader_mapper/mdf_reader/utils/validators.py +++ b/cdm_reader_mapper/mdf_reader/utils/validators.py @@ -6,13 +6,16 @@ import numpy as np import pandas as pd -from typing import Any, Iterable +from typing import Any, Iterable, get_args from .. 
import properties from ..codes import codes from .utilities import convert_str_boolean +numeric_types = get_args(properties.NumericTypes) + + def _is_false(x: Any) -> bool: """Check if a value is exactly False.""" return x is False @@ -173,7 +176,7 @@ def validate( } validated_columns = [] - validated_dtypes = set(properties.numeric_types) | {"datetime", "key"} + validated_dtypes = set(numeric_types) | {"datetime", "key"} basic_functions = { "datetime": validate_datetime, @@ -188,7 +191,7 @@ def validate( column_atts = element_atts.get(column, {}) column_type = column_atts.get("column_type") - if column_type in properties.numeric_types: + if column_type in numeric_types: valid_min = column_atts.get("valid_min", -np.inf) valid_max = column_atts.get("valid_max", np.inf) column_mask = validate_numeric(series, valid_min, valid_max) diff --git a/cdm_reader_mapper/properties.py b/cdm_reader_mapper/properties.py index b80513b7..cfc37041 100755 --- a/cdm_reader_mapper/properties.py +++ b/cdm_reader_mapper/properties.py @@ -2,8 +2,12 @@ from __future__ import annotations -numeric_types = ["Int64", "int", "float"] +from typing import Literal -object_types = ["str", "object", "key", "datetime"] +NumericTypes = Literal["Int64", "int", "float"] -supported_data_models = ["craid", "gdac", "icoads", "pub47"] +ObjectTypes = Literal["str", "object", "key", "datetime"] + +SupportedDataModels = Literal["craid", "gdac", "icoads", "pub47"] + +SupportedFileTypes = Literal["csv", "parquet", "feather"] diff --git a/tests/test_reader_convert_and_decode.py b/tests/test_reader_convert_and_decode.py index 56c3c170..c2d02ac9 100755 --- a/tests/test_reader_convert_and_decode.py +++ b/tests/test_reader_convert_and_decode.py @@ -1,5 +1,7 @@ from __future__ import annotations +from typing import get_args + import pandas as pd import pytest @@ -15,6 +17,9 @@ from cdm_reader_mapper.mdf_reader import properties +numeric_types = get_args(properties.NumericTypes) + + @pytest.fixture def sample_series(): 
return pd.Series(["A", "Z", "10", "1Z"]) @@ -89,7 +94,7 @@ def test_base36_preserves_boolean(): def test_converter_numeric(numeric_series): - conv = Converters(dtype=next(iter(properties.numeric_types))) + conv = Converters(dtype=next(iter(numeric_types))) func = conv.converter() result = func(numeric_series) @@ -111,7 +116,7 @@ def test_numeric_with_scale_offset(): def test_preprocessing_function_pppp(): - conv = Converters(dtype=next(iter(properties.numeric_types))) + conv = Converters(dtype=next(iter(numeric_types))) series = pd.Series(["0123"], name="PPPP") result = conv.object_to_numeric(series) From a8c02531c3e1e9a71c27efe43be0d0914e6767d2 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Tue, 3 Feb 2026 11:36:35 +0100 Subject: [PATCH 08/16] make use of pre-defined Literal constant SupportedDataTypes --- cdm_reader_mapper/cdm_mapper/reader.py | 18 ++++++++++-------- cdm_reader_mapper/cdm_mapper/writer.py | 15 +++++++++------ cdm_reader_mapper/mdf_reader/reader.py | 10 ++++++---- cdm_reader_mapper/mdf_reader/writer.py | 11 +++++++---- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index c3c7484d..edda597b 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ b/cdm_reader_mapper/cdm_mapper/reader.py @@ -47,14 +47,15 @@ import glob import os -from typing import Literal +from typing import get_args import pandas as pd from cdm_reader_mapper.common import get_filename, logging_hdlr from cdm_reader_mapper.core.databundle import DataBundle -from . 
import properties +from ..properties import SupportedFileTypes +from .properties import cdm_tables from .utils.utilities import get_cdm_subset, get_usecols @@ -62,7 +63,7 @@ def _read_file( ifile: str, table: str, col_subset: str | list | None, - data_format: Literal["csv", "parquet", "feather"], + data_format: SupportedFileTypes, **kwargs, ) -> pd.DataFrame: usecols = get_usecols(table, col_subset) @@ -73,7 +74,7 @@ def _read_file( if data_format == "feather": return pd.read_feather(ifile, columns=usecols, **kwargs) raise ValueError( - f"data_format must be one of [csv, parquet, feather] not {data_format}." + f"data_format must be one of {get_args(SupportedFileTypes)} not {data_format}." ) @@ -121,7 +122,7 @@ def _read_multiple_files( cdm_subset = [cdm_subset] for table in cdm_subset: - if table not in properties.cdm_tables: + if table not in cdm_tables: logger.warning(f"Requested table {table} not defined in CDM") continue @@ -159,7 +160,7 @@ def _read_multiple_files( def read_tables( source: str, - data_format: Literal["csv", "parquet", "feather"] = "csv", + data_format: SupportedFileTypes = "csv", prefix: str | None = None, suffix: str | None = None, extension: str | None = None, @@ -233,9 +234,10 @@ def read_tables( write_data : Write MDF data and validation mask to disk. """ logger = logging_hdlr.init_logger(__name__, level="INFO") - if data_format not in ["csv", "parquet", "feather"]: + supported_file_types = get_args(SupportedFileTypes) + if data_format not in supported_file_types: raise ValueError( - f"data_format must be one of [csv, parquet, feather] not {data_format}." + f"data_format must be one of {supported_file_types}, not {data_format}." 
) # Because how the printers are written, they modify the original data frame!, diff --git a/cdm_reader_mapper/cdm_mapper/writer.py b/cdm_reader_mapper/cdm_mapper/writer.py index 82b3ccde..8de4f3ed 100755 --- a/cdm_reader_mapper/cdm_mapper/writer.py +++ b/cdm_reader_mapper/cdm_mapper/writer.py @@ -27,18 +27,20 @@ import pandas as pd from pathlib import Path -from typing import Literal +from typing import get_args from cdm_reader_mapper.common import get_filename, logging_hdlr from .tables.tables import get_cdm_atts from .utils.utilities import adjust_filename, dict_to_tuple_list, get_cdm_subset +from ..properties import SupportedFileTypes + def _table_to_file( data: pd.DataFrame, filename=None, - data_format: Literal["csv", "parquet", "feather"] = "csv", + data_format: SupportedFileTypes = "csv", delimiter: str = "|", encoding: str = "utf-8", **kwargs, @@ -62,13 +64,13 @@ def _table_to_file( data.to_feather(filename, **kwargs) else: raise ValueError( - f"data_format must be one of [csv, parquet, feather] not {data_format}." + f"data_format must be one of {get_args(SupportedFileTypes)} not {data_format}." ) def write_tables( data: pd.DataFrame, - data_format: Literal["csv", "parquet", "feather"] = "csv", + data_format: SupportedFileTypes = "csv", out_dir: str | None = None, prefix: str | None = None, suffix: str | None = None, @@ -139,9 +141,10 @@ def write_tables( Use this function after reading CDM tables. """ logger = logging_hdlr.init_logger(__name__, level="INFO") - if data_format not in ["csv", "parquet", "feather"]: + supported_file_types = get_args(SupportedFileTypes) + if data_format not in supported_file_types: raise ValueError( - f"data_format must be one of [csv, parquet, feather] not {data_format}." + f"data_format must be one of {supported_file_types}, not {data_format}." 
) cdm_subset = get_cdm_subset(cdm_subset) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index c342602f..f29ea5b1 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -3,7 +3,7 @@ from __future__ import annotations from pathlib import Path -from typing import Literal, Callable, Any +from typing import Callable, Any, get_args import pandas as pd @@ -16,6 +16,7 @@ from .utils.utilities import as_list, as_path, read_csv, read_parquet, read_feather +from ..properties import SupportedFileTypes READERS = { "csv": read_csv, @@ -260,7 +261,7 @@ def read_data( data_file: str, mask_file: str | None = None, info_file: str | None = None, - data_format: Literal["csv", "parquet", "feather"] = "csv", + data_format: SupportedFileTypes = "csv", imodel: str | None = None, col_subset: str | list | tuple | None = None, encoding: str | None = None, @@ -307,9 +308,10 @@ def read_data( write_data : Write MDF data and validation mask to disk. write_tables : Write CDM tables to disk. """ - if data_format not in ["csv", "parquet", "feather"]: + supported_file_types = get_args(SupportedFileTypes) + if data_format not in supported_file_types: raise ValueError( - f"data_format must be one of [csv, parquet, feather] not {data_format}." + f"data_format must be one of {supported_file_types}, not {data_format}." 
) data_kwargs = kwargs.copy() diff --git a/cdm_reader_mapper/mdf_reader/writer.py b/cdm_reader_mapper/mdf_reader/writer.py index 9e7dcbe5..1ffd9e39 100755 --- a/cdm_reader_mapper/mdf_reader/writer.py +++ b/cdm_reader_mapper/mdf_reader/writer.py @@ -6,7 +6,7 @@ import logging from io import StringIO as StringIO from pathlib import Path -from typing import Literal, Any +from typing import Any, get_args import pandas as pd from pandas.io.parsers import TextFileReader @@ -16,6 +16,8 @@ from ..common import get_filename from ..common.pandas_TextParser_hdlr import make_copy +from ..properties import SupportedFileTypes + WRITERS = { "csv": "to_csv", "parquet": "to_parquet", @@ -53,7 +55,7 @@ def _write_data( def write_data( data: pd.DataFrame | TextFileReader, mask: pd.DataFrame | TextFileReader | None = None, - data_format: Literal["csv", "parquet", "feather"] = "csv", + data_format: SupportedFileTypes = "csv", dtypes: dict | None = None, parse_dates: list | bool = False, encoding: str = "utf-8", @@ -124,9 +126,10 @@ def write_data( ---- Use this function after reading MDF data. """ - if data_format not in ["csv", "parquet", "feather"]: + supported_file_types = get_args(SupportedFileTypes) + if data_format not in supported_file_types: raise ValueError( - f"data_format must be one of [csv, parquet, feather] not {data_format}." + f"data_format must be one of {supported_file_types}, not {data_format}." 
) extension = extension or data_format From 03b24440e64054c780bd0c660dd210828adb4b19 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Tue, 3 Feb 2026 11:49:48 +0100 Subject: [PATCH 09/16] readers dictionary approach in cdm_mapper.readers --- cdm_reader_mapper/cdm_mapper/reader.py | 34 +++++++++++-------- .../common/pandas_TextParser_hdlr.py | 4 +-- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index edda597b..2703761b 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ b/cdm_reader_mapper/cdm_mapper/reader.py @@ -47,7 +47,7 @@ import glob import os -from typing import get_args +from typing import Any, Callable, get_args import pandas as pd @@ -59,27 +59,27 @@ from .utils.utilities import get_cdm_subset, get_usecols +READERS = { + "csv": pd.read_csv, + "parquet": pd.read_parquet, + "feather": pd.read_feather, +} + + def _read_file( ifile: str, table: str, + reader: Callable[..., Any], col_subset: str | list | None, - data_format: SupportedFileTypes, **kwargs, ) -> pd.DataFrame: usecols = get_usecols(table, col_subset) - if data_format == "csv": - return pd.read_csv(ifile, usecols=usecols, **kwargs) - if data_format == "parquet": - return pd.read_parquet(ifile, columns=usecols, **kwargs) - if data_format == "feather": - return pd.read_feather(ifile, columns=usecols, **kwargs) - raise ValueError( - f"data_format must be one of {get_args(SupportedFileTypes)} not {data_format}." 
- ) + return reader(ifile, usecols=usecols, **kwargs) def _read_single_file( ifile: str, + reader: Callable[..., Any], cdm_subset: str | list | None = None, col_subset: str | list | None = None, null_label: str = "null", @@ -87,7 +87,9 @@ def _read_single_file( ) -> pd.DataFrame: if not isinstance(cdm_subset, list): cdm_subset = [cdm_subset] - dfi_ = _read_file(ifile, table=cdm_subset[0], col_subset=col_subset, **kwargs) + dfi_ = _read_file( + ifile, table=cdm_subset[0], reader=reader, col_subset=col_subset, **kwargs + ) if dfi_.empty: return pd.DataFrame() dfi_ = dfi_.set_index("report_id", drop=False) @@ -98,6 +100,7 @@ def _read_single_file( def _read_multiple_files( inp_dir: str, + reader: Callable[..., Any], prefix: str | None = None, suffix: str | None = None, extension: str | None = None, @@ -140,6 +143,7 @@ def _read_multiple_files( dfi = _read_single_file( paths_[0], + reader=reader, cdm_subset=[table], col_subset=col_subset, null_label=null_label, @@ -240,6 +244,8 @@ def read_tables( f"data_format must be one of {supported_file_types}, not {data_format}." 
) + reader = READERS[data_format] + # Because how the printers are written, they modify the original data frame!, # also removing rows with empty observation_value in observation_tables kwargs = { @@ -257,23 +263,23 @@ def read_tables( df_list = [ _read_single_file( source, + reader=reader, cdm_subset=cdm_subset, col_subset=col_subset, null_label=null_label, - data_format=data_format, **kwargs, ) ] elif os.path.isdir(source): df_list = _read_multiple_files( source, + reader=reader, prefix=prefix, suffix=suffix, extension=extension, cdm_subset=cdm_subset, col_subset=col_subset, null_label=null_label, - data_format=data_format, logger=logger, **kwargs, ) diff --git a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py index 15fbe3ce..8a61d825 100755 --- a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py +++ b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py @@ -9,7 +9,7 @@ logger = logging.getLogger(__name__) -_READ_CSV_KWARGS = [ +READ_CSV_KWARGS = [ "chunksize", "names", "dtype", @@ -47,7 +47,7 @@ def _new_reader_from_buffer(parser: TextFileReader) -> TextFileReader | None: read_dict = read_dict = { k: parser.orig_options.get(k) - for k in _READ_CSV_KWARGS + for k in READ_CSV_KWARGS if k in parser.orig_options } return pd.read_csv(StringIO(raw), **read_dict) From a97a70a6f819b456a4737f9c560b3fc4725081e7 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Tue, 3 Feb 2026 12:22:07 +0100 Subject: [PATCH 10/16] read parquet and feather CDM table files --- cdm_reader_mapper/cdm_mapper/reader.py | 46 ++++++++++++++++---------- tests/test_writers.py | 37 +++++++++++++++++++-- 2 files changed, 63 insertions(+), 20 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index 2703761b..0c8baacb 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ b/cdm_reader_mapper/cdm_mapper/reader.py @@ -47,7 +47,7 @@ import glob import os -from typing import Any, Callable, 
get_args +from typing import get_args import pandas as pd @@ -65,21 +65,29 @@ "feather": pd.read_feather, } +READER_KWARGS = { + "csv": "usecols", + "parquet": "columns", + "feather": "columns", +} + def _read_file( ifile: str, table: str, - reader: Callable[..., Any], col_subset: str | list | None, + data_format: SupportedFileTypes, **kwargs, ) -> pd.DataFrame: usecols = get_usecols(table, col_subset) - return reader(ifile, usecols=usecols, **kwargs) + reader = READERS[data_format] + reader_kwargs = {READER_KWARGS[data_format]: usecols, **kwargs} + return reader(ifile, **reader_kwargs) def _read_single_file( ifile: str, - reader: Callable[..., Any], + data_format: SupportedFileTypes, cdm_subset: str | list | None = None, col_subset: str | list | None = None, null_label: str = "null", @@ -88,7 +96,11 @@ def _read_single_file( if not isinstance(cdm_subset, list): cdm_subset = [cdm_subset] dfi_ = _read_file( - ifile, table=cdm_subset[0], reader=reader, col_subset=col_subset, **kwargs + ifile, + table=cdm_subset[0], + data_format=data_format, + col_subset=col_subset, + **kwargs, ) if dfi_.empty: return pd.DataFrame() @@ -100,7 +112,7 @@ def _read_single_file( def _read_multiple_files( inp_dir: str, - reader: Callable[..., Any], + data_format: SupportedFileTypes, prefix: str | None = None, suffix: str | None = None, extension: str | None = None, @@ -143,7 +155,7 @@ def _read_multiple_files( dfi = _read_single_file( paths_[0], - reader=reader, + data_format=data_format, cdm_subset=[table], col_subset=col_subset, null_label=null_label, @@ -244,16 +256,16 @@ def read_tables( f"data_format must be one of {supported_file_types}, not {data_format}." 
) - reader = READERS[data_format] - # Because how the printers are written, they modify the original data frame!, # also removing rows with empty observation_value in observation_tables - kwargs = { - "delimiter": delimiter, - "dtype": "object", - "na_values": na_values, - "keep_default_na": False, - } + if data_format == "csv": + kwargs = { + "delimiter": delimiter, + "dtype": "object", + "na_values": na_values, + "keep_default_na": False, + **kwargs, + } # See if subset, if any of the tables is not as specs cdm_subset = get_cdm_subset(cdm_subset) @@ -263,7 +275,7 @@ def read_tables( df_list = [ _read_single_file( source, - reader=reader, + data_format=data_format, cdm_subset=cdm_subset, col_subset=col_subset, null_label=null_label, @@ -273,7 +285,7 @@ def read_tables( elif os.path.isdir(source): df_list = _read_multiple_files( source, - reader=reader, + data_format=data_format, prefix=prefix, suffix=suffix, extension=extension, diff --git a/tests/test_writers.py b/tests/test_writers.py index b05f4b70..8abddca4 100755 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -34,7 +34,18 @@ def db_data(): return db -def test_write_data(tmp_path, db_tables): +def test_write_data_csv(tmp_path, db_data): + db_data.write(out_dir=tmp_path, data_format="csv") + db_res = read( + os.path.join(tmp_path, "data.csv"), + info_file=os.path.join(tmp_path, "info.json"), + data_format="csv", + mode="data", + ) + pd.testing.assert_frame_equal(db_data.data, db_res.data) + + +def test_write_tables_csv(tmp_path, db_tables): db_tables.write(out_dir=tmp_path, suffix=f"{db_tables.imodel}_all") db_res = read(tmp_path, suffix=f"{db_tables.imodel}_all", mode="tables") pd.testing.assert_frame_equal(db_tables.data, db_res.data) @@ -141,7 +152,7 @@ def test_write_col_subset(tmp_path, db_tables): pd.testing.assert_frame_equal(table_exp, db_res[table]) -def test_write_parquet(tmp_path, db_data): +def test_write_data_parquet(tmp_path, db_data): db_data.write(out_dir=tmp_path, 
data_format="parquet") db_res = read( os.path.join(tmp_path, "data.parquet"), data_format="parquet", mode="data" @@ -149,9 +160,29 @@ def test_write_parquet(tmp_path, db_data): pd.testing.assert_frame_equal(db_data.data, db_res.data) -def test_write_feather(tmp_path, db_data): +def test_write_data_feather(tmp_path, db_data): db_data.write(out_dir=tmp_path, data_format="feather") db_res = read( os.path.join(tmp_path, "data.feather"), data_format="feather", mode="data" ) pd.testing.assert_frame_equal(db_data.data, db_res.data) + + +def test_write_tables_parquet(tmp_path, db_tables): + db_tables.write( + out_dir=tmp_path, suffix=f"{db_tables.imodel}_all", data_format="parquet" + ) + db_res = read( + tmp_path, suffix=f"{db_tables.imodel}_all", mode="tables", data_format="parquet" + ) + pd.testing.assert_frame_equal(db_tables.data, db_res.data) + + +def test_write_tables_feather(tmp_path, db_tables): + db_tables.write( + out_dir=tmp_path, suffix=f"{db_tables.imodel}_all", data_format="feather" + ) + db_res = read( + tmp_path, suffix=f"{db_tables.imodel}_all", mode="tables", data_format="feather" + ) + pd.testing.assert_frame_equal(db_tables.data, db_res.data) From 2fb9b19b75914f4d7ac01d6f3ffb491ab27c1422 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Tue, 3 Feb 2026 12:39:46 +0100 Subject: [PATCH 11/16] pre-defined Literla constants for supported read and write modes --- cdm_reader_mapper/core/reader.py | 37 ++++++++++++++++++-------------- cdm_reader_mapper/core/writer.py | 35 ++++++++++++++++++++---------- cdm_reader_mapper/properties.py | 4 ++++ 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/cdm_reader_mapper/core/reader.py b/cdm_reader_mapper/core/reader.py index 10eb315a..5beaff32 100755 --- a/cdm_reader_mapper/core/reader.py +++ b/cdm_reader_mapper/core/reader.py @@ -2,15 +2,27 @@ from __future__ import annotations +from typing import get_args + from cdm_reader_mapper.cdm_mapper.reader import read_tables from 
cdm_reader_mapper.mdf_reader.reader import read_mdf, read_data from .databundle import DataBundle +from ..properties import SupportedReadModes + +supported_read_modes = get_args(SupportedReadModes) + +READERS = { + "mdf": read_mdf, + "data": read_data, + "tables": read_tables, +} + def read( - source, - mode="mdf", + source: str, + mode: SupportedReadModes = "mdf", **kwargs, ) -> DataBundle: """Read either original marine-meteorological data or MDF data or CDM tables from disk. @@ -19,15 +31,13 @@ def read( ---------- source: str Source of the input data. - mode: str, {mdf, data, tables} + mode: str, {mdf, data, tables}, default: mdf Read data mode: * "mdf" to read original marine-meteorological data from disk and convert them to MDF data * "data" to read MDF data from disk * "tables" to read CDM tables from disk. Map MDF data to CDM tables with :py:func:`DataBundle.map_model`. - Default: mdf - Returns ------- DataBundle @@ -46,14 +56,9 @@ def read( `kwargs` are the keyword arguments for the specific `mode` reader. """ - match mode.lower(): - case "mdf": - return read_mdf(source, **kwargs) - case "data": - return read_data(source, **kwargs) - case "tables": - return read_tables(source, **kwargs) - case _: - raise ValueError( - f"No valid mode: {mode}. Choose one of ['mdf', 'data', 'tables']" - ) + if mode not in supported_read_modes: + raise ValueError( + f"No valid mode: {mode}. Choose one of {supported_read_modes}." 
+ ) + + return READERS[mode](source, **kwargs) diff --git a/cdm_reader_mapper/core/writer.py b/cdm_reader_mapper/core/writer.py index ce09c8d5..ae3899ae 100755 --- a/cdm_reader_mapper/core/writer.py +++ b/cdm_reader_mapper/core/writer.py @@ -2,21 +2,35 @@ from __future__ import annotations +from typing import get_args + +import pandas as pd +from pandas.io.parsers import TextFileReader + from cdm_reader_mapper.cdm_mapper.writer import write_tables from cdm_reader_mapper.mdf_reader.writer import write_data +from ..properties import SupportedWriteModes + +supported_write_modes = get_args(SupportedWriteModes) + +WRITERS = { + "data": write_data, + "tables": write_tables, +} + def write( - data, - mode="data", + data: pd.DataFrame | TextFileReader, + mode: SupportedWriteModes = "data", **kwargs, ) -> None: """Write either MDF data or CDM tables on disk. Parameters ---------- - data: pandas.DataFrame - pandas.DataFrame to export. + data: pandas.DataFrame or TextFileReader + Data to export. mode: str, {data, tables} Write data mode: @@ -40,10 +54,9 @@ def write( ---- `kwargs` are the keyword arguments for the specific `mode` reader. """ - match mode.lower(): - case "data": - write_data(data, **kwargs) - case "tables": - write_tables(data, **kwargs) - case _: - raise ValueError(f"No valid mode: {mode}. Choose one of ['data', 'tables']") + if mode not in supported_write_modes: + raise ValueError( + f"No valid mode: {mode}. Choose one of {supported_write_modes}." 
+ ) + + return WRITERS[mode](data, **kwargs) diff --git a/cdm_reader_mapper/properties.py b/cdm_reader_mapper/properties.py index cfc37041..9c82fade 100755 --- a/cdm_reader_mapper/properties.py +++ b/cdm_reader_mapper/properties.py @@ -11,3 +11,7 @@ SupportedDataModels = Literal["craid", "gdac", "icoads", "pub47"] SupportedFileTypes = Literal["csv", "parquet", "feather"] + +SupportedReadModes = Literal["mdf", "data", "tables"] + +SupportedWriteModes = Literal["data", "tables"] From 72d843b2b147fabf1f81c437cb8242074e91a460 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Tue, 3 Feb 2026 16:55:26 +0100 Subject: [PATCH 12/16] more mdf reader and writer tests --- cdm_reader_mapper/core/databundle.py | 6 +- cdm_reader_mapper/mdf_reader/reader.py | 4 +- tests/test_mdf_reader.py | 224 ++++++++++++++++++++++++- tests/test_mdf_writer.py | 123 +++++++++++--- 4 files changed, 322 insertions(+), 35 deletions(-) diff --git a/cdm_reader_mapper/core/databundle.py b/cdm_reader_mapper/core/databundle.py index 78c2a4d2..42e26e6e 100755 --- a/cdm_reader_mapper/core/databundle.py +++ b/cdm_reader_mapper/core/databundle.py @@ -35,11 +35,11 @@ class DataBundle(_DataBundle): ---------- data: pandas.DataFrame, optional MDF DataFrame. - columns: list, optional + columns: pd.Index, pd.MultiIndex or list, optional Column labels of ``data`` - dtypes: dict, optional + dtypes: pd.Series or dict, optional Data types of ``data``. 
- parse_dates: list, optional + parse_dates: list or bool, optional Information how to parse dates on ``data`` mask: pandas.DataFrame, optional MDF validation mask diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index f29ea5b1..ced76b50 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -231,7 +231,7 @@ def read_mdf( def _read_data( data_file: str, - mask_file: str, + mask_file: str | None, reader: Callable[..., Any], col_subset: str | list | tuple | None, data_kwargs: dict, @@ -341,7 +341,7 @@ def read_data( return DataBundle( data=data, columns=info["columns"], - dtypes=info["dtypes"].to_dict(), + dtypes=info["dtypes"], parse_dates=parse_dates, mask=mask, imodel=imodel, diff --git a/tests/test_mdf_reader.py b/tests/test_mdf_reader.py index bdc2c179..2cee982c 100755 --- a/tests/test_mdf_reader.py +++ b/tests/test_mdf_reader.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import os import numpy as np @@ -8,11 +9,17 @@ from cdm_reader_mapper import test_data, DataBundle from cdm_reader_mapper.mdf_reader.reader import ( + _read_data, read_mdf, read_data, validate_read_mdf_args, ) from cdm_reader_mapper.mdf_reader.utils.filereader import _apply_multiindex +from cdm_reader_mapper.mdf_reader.utils.utilities import ( + read_csv, + read_parquet, + read_feather, +) def _get_columns(columns, select): @@ -226,7 +233,7 @@ def test_read_data_basic(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.MultiIndex) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert isinstance(db.parse_dates, list) assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -261,7 +268,7 @@ def test_read_data_no_mask(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.MultiIndex) - assert isinstance(db.dtypes, 
dict) + assert isinstance(db.dtypes, pd.Series) assert isinstance(db.parse_dates, list) assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -296,7 +303,7 @@ def test_read_data_no_info(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.MultiIndex) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert db.parse_dates is False assert db.encoding is None assert db.imodel is None @@ -330,7 +337,7 @@ def test_read_data_col_subset(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.Index) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert isinstance(db.parse_dates, list) assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -364,7 +371,7 @@ def test_read_data_encoding(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.Index) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert db.parse_dates is False assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -400,7 +407,7 @@ def test_read_data_textfilereader(): assert isinstance(db.data, pd.io.parsers.TextFileReader) assert isinstance(db.mask, pd.io.parsers.TextFileReader) assert isinstance(db.columns, pd.MultiIndex) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert db.parse_dates == [] assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -511,3 +518,208 @@ def test_validate_read_mdf_args_invalid_years(tmp_path): chunksize=None, skiprows=0, ) + + +@pytest.fixture +def example_data(): + return pd.DataFrame( + { + "A": [1, 2, 3], + "B": [4.0, 5.0, 6.0], + "C": ["x", "y", "z"], + } + ) + + +@pytest.fixture +def example_mask(): + return pd.DataFrame( + { + "A": [True, False, True], + "B": [True, True, False], + "C": [True, True, False], 
+ }, + dtype="boolean", + ) + + +@pytest.fixture +def example_info(example_data): + return { + "columns": example_data.columns, + "dtypes": example_data.dtypes, + "parse_dates": False, + "encoding": None, + } + + +@pytest.fixture +def csv_files(tmp_path, example_data, example_mask, example_info): + data_file = tmp_path / "data.csv" + mask_file = tmp_path / "mask.csv" + info_file = tmp_path / "info.json" + + example_data.to_csv(data_file, index=False) + example_mask.to_csv(mask_file, index=False) + info = { + "columns": list(example_info["columns"]), + "dtypes": example_info["dtypes"].astype(str).to_dict(), + "parse_dates": example_info["parse_dates"], + "encoding": example_info["encoding"], + } + info_file.write_text(json.dumps(info)) + + return data_file, mask_file, info_file + + +@pytest.fixture +def parquet_files(tmp_path, example_data, example_mask): + data_file = tmp_path / "data.parquet" + mask_file = tmp_path / "mask.parquet" + + example_data.to_parquet(data_file) + example_mask.to_parquet(mask_file) + + return data_file, mask_file + + +@pytest.fixture +def feather_files(tmp_path, example_data, example_mask): + data_file = tmp_path / "data.feather" + mask_file = tmp_path / "mask.feather" + + example_data.to_feather(data_file) + example_mask.to_feather(mask_file) + + return data_file, mask_file + + +def test_read_data_with_mask_csv(csv_files, example_data, example_mask, example_info): + data_file, mask_file, _ = csv_files + data, mask, info = _read_data( + data_file=data_file, + mask_file=mask_file, + reader=read_csv, + col_subset=None, + data_kwargs={}, + mask_kwargs={"dtype": "boolean"}, + ) + pd.testing.assert_frame_equal(data, example_data) + pd.testing.assert_frame_equal(mask, example_mask) + pd.testing.assert_index_equal(info["columns"], example_info["columns"]) + pd.testing.assert_series_equal(info["dtypes"], example_info["dtypes"]) + + +def test_read_data_with_mask_parquet( + parquet_files, example_data, example_mask, example_info +): + data_file, 
mask_file = parquet_files + data, mask, info = _read_data( + data_file=data_file, + mask_file=mask_file, + reader=read_parquet, + col_subset=None, + data_kwargs={}, + mask_kwargs={}, + ) + pd.testing.assert_frame_equal(data, example_data) + pd.testing.assert_frame_equal(mask, example_mask) + pd.testing.assert_index_equal(info["columns"], example_info["columns"]) + pd.testing.assert_series_equal(info["dtypes"], example_info["dtypes"]) + + +def test_read_data_with_mask_feather( + feather_files, example_data, example_mask, example_info +): + data_file, mask_file = feather_files + data, mask, info = _read_data( + data_file=data_file, + mask_file=mask_file, + reader=read_feather, + col_subset=None, + data_kwargs={}, + mask_kwargs={}, + ) + pd.testing.assert_frame_equal(data, example_data) + pd.testing.assert_frame_equal(mask, example_mask) + pd.testing.assert_index_equal(info["columns"], example_info["columns"]) + pd.testing.assert_series_equal(info["dtypes"], example_info["dtypes"]) + + +def test_read_data_without_mask_csv(csv_files, example_data, example_info): + data_file, _, _ = csv_files + data, mask, info = _read_data( + data_file=data_file, + mask_file=None, + reader=read_csv, + col_subset=None, + data_kwargs={}, + mask_kwargs={}, + ) + pd.testing.assert_frame_equal(data, example_data) + assert mask.empty + pd.testing.assert_index_equal(info["columns"], example_info["columns"]) + pd.testing.assert_series_equal(info["dtypes"], example_info["dtypes"]) + + +def test_read_data_csv(csv_files, example_data, example_mask): + data_file, mask_file, info_file = csv_files + + bundle = read_data( + data_file=data_file, + mask_file=mask_file, + info_file=info_file, + data_format="csv", + ) + + assert isinstance(bundle, DataBundle) + pd.testing.assert_frame_equal(bundle.data, example_data) + pd.testing.assert_frame_equal(bundle.mask, example_mask) + pd.testing.assert_index_equal(bundle.columns, example_data.columns) + pd.testing.assert_series_equal(bundle.dtypes, 
example_data.dtypes) + assert bundle.parse_dates is False + assert bundle.encoding is None + assert bundle.imodel is None + + +def test_read_data_parquet(parquet_files, example_data, example_mask): + data_file, mask_file = parquet_files + + bundle = read_data( + data_file=data_file, + mask_file=mask_file, + data_format="parquet", + ) + + assert isinstance(bundle, DataBundle) + pd.testing.assert_frame_equal(bundle.data, example_data) + pd.testing.assert_frame_equal(bundle.mask, example_mask) + pd.testing.assert_index_equal(bundle.columns, example_data.columns) + pd.testing.assert_series_equal(bundle.dtypes, example_data.dtypes) + assert bundle.parse_dates is False + assert bundle.encoding is None + assert bundle.imodel is None + + +def test_read_data_feather(feather_files, example_data, example_mask): + data_file, mask_file = feather_files + + bundle = read_data( + data_file=data_file, + mask_file=mask_file, + data_format="feather", + ) + + assert isinstance(bundle, DataBundle) + pd.testing.assert_frame_equal(bundle.data, example_data) + pd.testing.assert_frame_equal(bundle.mask, example_mask) + pd.testing.assert_index_equal(bundle.columns, example_data.columns) + pd.testing.assert_series_equal(bundle.dtypes, example_data.dtypes) + assert bundle.parse_dates is False + assert bundle.encoding is None + assert bundle.imodel is None + + +def test_read_data_invalid(): + with pytest.raises(ValueError): + read_data("data.invalid", data_format="invalid") diff --git a/tests/test_mdf_writer.py b/tests/test_mdf_writer.py index fc0a4ca1..1df1c3fa 100755 --- a/tests/test_mdf_writer.py +++ b/tests/test_mdf_writer.py @@ -12,19 +12,27 @@ ) -def test_write_data_basic(tmp_path): - data = pd.DataFrame( +@pytest.fixture +def example_data(): + return pd.DataFrame( { "A": [1, 2, 3], "B": ["1", "2", "3"], } ) - mask = pd.DataFrame( + + +@pytest.fixture +def example_mask(): + return pd.DataFrame( { "A": [True, True, False], "B": [False, True, True], } ) + + +def 
test_write_data_csv(tmp_path, example_data, example_mask): info = { "dtypes": {"A": "int", "B": "str"}, "parse_dates": [], @@ -32,8 +40,8 @@ def test_write_data_basic(tmp_path): } write_data( - data, - mask=mask, + example_data, + mask=example_mask, out_dir=tmp_path, prefix="test_write", suffix="basic", @@ -54,25 +62,13 @@ def test_write_data_basic(tmp_path): assert info_res == info data_res = pd.read_csv(data_file, dtype=info["dtypes"]) - assert_frame_equal(data, data_res) + assert_frame_equal(example_data, data_res) mask_res = pd.read_csv(mask_file, dtype="bool") - assert_frame_equal(mask, mask_res) + assert_frame_equal(example_mask, mask_res) -def test_write_data_col_subset(tmp_path): - data = pd.DataFrame( - { - "A": [1, 2, 3], - "B": ["1", "2", "3"], - } - ) - mask = pd.DataFrame( - { - "A": [True, True, False], - "B": [False, True, True], - } - ) +def test_write_data_col_subset(tmp_path, example_data, example_mask): info = { "dtypes": {"A": "int"}, "parse_dates": [], @@ -81,8 +77,8 @@ def test_write_data_col_subset(tmp_path): subset = "A" write_data( - data, - mask=mask, + example_data, + mask=example_mask, out_dir=tmp_path, prefix="test_write", suffix="subset", @@ -104,7 +100,86 @@ def test_write_data_col_subset(tmp_path): assert info_res == info data_res = pd.read_csv(data_file, dtype=info["dtypes"]) - assert_frame_equal(data[[subset]], data_res) + assert_frame_equal(example_data[[subset]], data_res) mask_res = pd.read_csv(mask_file, dtype="bool") - assert_frame_equal(mask[[subset]], mask_res) + assert_frame_equal(example_mask[[subset]], mask_res) + + +def test_write_data_parquet(tmp_path, example_data, example_mask): + info = { + "dtypes": {"A": "int", "B": "str"}, + "parse_dates": [], + "encoding": "utf-8", + } + + write_data( + example_data, + mask=example_mask, + out_dir=tmp_path, + prefix="test_write", + suffix="basic", + data_format="parquet", + **info, + ) + + data_file = tmp_path / "test_write-data-basic.parquet" + mask_file = tmp_path / 
"test_write-mask-basic.parquet" + info_file = tmp_path / "test_write-info-basic.json" + + assert data_file.is_file() + assert mask_file.is_file() + assert info_file.is_file() + + with open(info_file) as read_file: + info_res = json.load(read_file) + + assert info_res == info + + data_res = pd.read_parquet(data_file) + assert_frame_equal(example_data, data_res) + + mask_res = pd.read_parquet(mask_file) + assert_frame_equal(example_mask, mask_res) + + +def test_write_data_feather(tmp_path, example_data, example_mask): + info = { + "dtypes": {"A": "int", "B": "str"}, + "parse_dates": [], + "encoding": "utf-8", + } + + write_data( + example_data, + mask=example_mask, + out_dir=tmp_path, + prefix="test_write", + suffix="basic", + data_format="feather", + **info, + ) + + data_file = tmp_path / "test_write-data-basic.feather" + mask_file = tmp_path / "test_write-mask-basic.feather" + info_file = tmp_path / "test_write-info-basic.json" + + assert data_file.is_file() + assert mask_file.is_file() + assert info_file.is_file() + + with open(info_file) as read_file: + info_res = json.load(read_file) + + assert info_res == info + + data_res = pd.read_feather(data_file) + assert_frame_equal(example_data, data_res) + + mask_res = pd.read_feather(mask_file) + assert_frame_equal(example_mask, mask_res) + + +def test_write_data_invalid(example_data): + with pytest.raises(ValueError): + write_data(example_data, data_format="invalid") From a1c27e923ba8ac39d7f6dad3e527854b0adc1a6e Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Wed, 4 Feb 2026 08:19:25 +0100 Subject: [PATCH 13/16] cdm_reader I/O tests --- cdm_reader_mapper/cdm_mapper/reader.py | 1 + tests/test_cdm_io.py | 98 ++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100755 tests/test_cdm_io.py diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index 0c8baacb..e7f1a72a 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ 
b/cdm_reader_mapper/cdm_mapper/reader.py @@ -104,6 +104,7 @@ def _read_single_file( ) if dfi_.empty: return pd.DataFrame() + dfi_ = dfi_.set_index("report_id", drop=False) if null_label in dfi_.index: return dfi_.drop(index=null_label) diff --git a/tests/test_cdm_io.py b/tests/test_cdm_io.py new file mode 100755 index 00000000..3a216589 --- /dev/null +++ b/tests/test_cdm_io.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import pandas as pd +import pytest + +from cdm_reader_mapper.cdm_mapper.reader import read_tables +from cdm_reader_mapper.cdm_mapper.writer import write_tables + + +@pytest.fixture +def example_data(): + arrays = [ + ["header", "header", "observations-sst", "observations-sst"], + ["report_id", "A", "report_id", "B"], + ] + multi_cols = pd.MultiIndex.from_arrays(arrays) + return pd.DataFrame( + [ + [1, 0.5, 1, "a"], + [2, 5.0, 2, "b"], + [3, 1.3, 3, "c"], + ], + columns=multi_cols, + ) + + +@pytest.fixture +def csv_path(tmp_path, example_data): + header_file = tmp_path / "header-test.csv" + obssst_file = tmp_path / "observations-sst-test.csv" + + example_data["header"].to_csv(header_file, index=False) + example_data["observations-sst"].to_csv(obssst_file, index=False) + + return tmp_path + + +@pytest.fixture +def parquet_path(tmp_path, example_data): + header_file = tmp_path / "header-test.parquet" + obssst_file = tmp_path / "observations-sst-test.parquet" + + example_data["header"].to_parquet(header_file, index=False) + example_data["observations-sst"].to_parquet(obssst_file, index=False) + + return tmp_path + + +@pytest.fixture +def feather_path(tmp_path, example_data): + header_file = tmp_path / "header-test.feather" + obssst_file = tmp_path / "observations-sst-test.feather" + + example_data["header"].to_feather( + header_file, + ) + example_data["observations-sst"].to_feather(obssst_file) + + return tmp_path + + +def test_read_data_csv(csv_path, example_data): + bundle = read_tables(csv_path, delimiter=",") + 
pd.testing.assert_frame_equal(bundle.data, example_data.astype(str)) + + +def test_read_data_parquet(parquet_path, example_data): + bundle = read_tables(parquet_path, data_format="parquet") + pd.testing.assert_frame_equal(bundle.data, example_data) + + +def test_read_data_feather(feather_path, example_data): + bundle = read_tables(feather_path, data_format="feather") + pd.testing.assert_frame_equal(bundle.data, example_data) + + +def test_write_data_csv(tmp_path, example_data): + write_tables(example_data, out_dir=tmp_path, suffix="test") + data_header = pd.read_csv(tmp_path / "header-test.csv", delimiter="|") + data_obssst = pd.read_csv(tmp_path / "observations-sst-test.csv", delimiter="|") + pd.testing.assert_frame_equal(example_data["header"], data_header) + pd.testing.assert_frame_equal(example_data["observations-sst"], data_obssst) + + +def test_write_data_parquet(tmp_path, example_data): + write_tables(example_data, out_dir=tmp_path, suffix="test", data_format="parquet") + data_header = pd.read_parquet(tmp_path / "header-test.parquet") + data_obssst = pd.read_parquet(tmp_path / "observations-sst-test.parquet") + pd.testing.assert_frame_equal(example_data["header"], data_header) + pd.testing.assert_frame_equal(example_data["observations-sst"], data_obssst) + + +def test_write_data_feather(tmp_path, example_data): + write_tables(example_data, out_dir=tmp_path, suffix="test", data_format="feather") + data_header = pd.read_feather(tmp_path / "header-test.feather") + data_obssst = pd.read_feather(tmp_path / "observations-sst-test.feather") + pd.testing.assert_frame_equal(example_data["header"], data_header) + pd.testing.assert_frame_equal(example_data["observations-sst"], data_obssst) From 2f49ccc13222922e4db0054d7abfcfdb1bb75bed Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Wed, 4 Feb 2026 08:58:48 +0100 Subject: [PATCH 14/16] no need to specify suffix in read and write cdm data --- cdm_reader_mapper/cdm_mapper/reader.py | 9 +++++--- tests/test_cdm_io.py 
| 30 +++++++++++++------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index e7f1a72a..5c4204ea 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ b/cdm_reader_mapper/cdm_mapper/reader.py @@ -143,9 +143,12 @@ def _read_multiple_files( continue logger.info(f"Getting file path for pattern {table}") - pattern_ = get_filename( - [prefix, table, f"*{suffix}"], path=inp_dir, extension=extension - ) + _pattern = [table] + if prefix: + _pattern = [prefix] + _pattern + if suffix: + _pattern = _pattern + [f"*{suffix}"] + pattern_ = get_filename(_pattern, path=inp_dir, extension=extension) paths_ = glob.glob(pattern_) if len(paths_) != 1: logger.warning( diff --git a/tests/test_cdm_io.py b/tests/test_cdm_io.py index 3a216589..81bd4483 100755 --- a/tests/test_cdm_io.py +++ b/tests/test_cdm_io.py @@ -26,8 +26,8 @@ def example_data(): @pytest.fixture def csv_path(tmp_path, example_data): - header_file = tmp_path / "header-test.csv" - obssst_file = tmp_path / "observations-sst-test.csv" + header_file = tmp_path / "header.csv" + obssst_file = tmp_path / "observations-sst.csv" example_data["header"].to_csv(header_file, index=False) example_data["observations-sst"].to_csv(obssst_file, index=False) @@ -37,8 +37,8 @@ def csv_path(tmp_path, example_data): @pytest.fixture def parquet_path(tmp_path, example_data): - header_file = tmp_path / "header-test.parquet" - obssst_file = tmp_path / "observations-sst-test.parquet" + header_file = tmp_path / "header.parquet" + obssst_file = tmp_path / "observations-sst.parquet" example_data["header"].to_parquet(header_file, index=False) example_data["observations-sst"].to_parquet(obssst_file, index=False) @@ -48,8 +48,8 @@ def parquet_path(tmp_path, example_data): @pytest.fixture def feather_path(tmp_path, example_data): - header_file = tmp_path / "header-test.feather" - obssst_file = tmp_path / "observations-sst-test.feather" + header_file 
= tmp_path / "header.feather" + obssst_file = tmp_path / "observations-sst.feather" example_data["header"].to_feather( header_file, @@ -75,24 +75,24 @@ def test_read_data_feather(feather_path, example_data): def test_write_data_csv(tmp_path, example_data): - write_tables(example_data, out_dir=tmp_path, suffix="test") - data_header = pd.read_csv(tmp_path / "header-test.csv", delimiter="|") - data_obssst = pd.read_csv(tmp_path / "observations-sst-test.csv", delimiter="|") + write_tables(example_data, out_dir=tmp_path) + data_header = pd.read_csv(tmp_path / "header.csv", delimiter="|") + data_obssst = pd.read_csv(tmp_path / "observations-sst.csv", delimiter="|") pd.testing.assert_frame_equal(example_data["header"], data_header) pd.testing.assert_frame_equal(example_data["observations-sst"], data_obssst) def test_write_data_parquet(tmp_path, example_data): - write_tables(example_data, out_dir=tmp_path, suffix="test", data_format="parquet") - data_header = pd.read_parquet(tmp_path / "header-test.parquet") - data_obssst = pd.read_parquet(tmp_path / "observations-sst-test.parquet") + write_tables(example_data, out_dir=tmp_path, data_format="parquet") + data_header = pd.read_parquet(tmp_path / "header.parquet") + data_obssst = pd.read_parquet(tmp_path / "observations-sst.parquet") pd.testing.assert_frame_equal(example_data["header"], data_header) pd.testing.assert_frame_equal(example_data["observations-sst"], data_obssst) def test_write_data_feather(tmp_path, example_data): - write_tables(example_data, out_dir=tmp_path, suffix="test", data_format="feather") - data_header = pd.read_feather(tmp_path / "header-test.feather") - data_obssst = pd.read_feather(tmp_path / "observations-sst-test.feather") + write_tables(example_data, out_dir=tmp_path, data_format="feather") + data_header = pd.read_feather(tmp_path / "header.feather") + data_obssst = pd.read_feather(tmp_path / "observations-sst.feather") pd.testing.assert_frame_equal(example_data["header"], data_header) 
pd.testing.assert_frame_equal(example_data["observations-sst"], data_obssst) From df744a9b8bbcb1f5a37bbada721aa3785cc432fe Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Wed, 4 Feb 2026 09:22:53 +0100 Subject: [PATCH 15/16] mdf write_data: dtypes could be one of dict, pd.Series or None --- cdm_reader_mapper/mdf_reader/writer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/writer.py b/cdm_reader_mapper/mdf_reader/writer.py index 1ffd9e39..6722ecae 100755 --- a/cdm_reader_mapper/mdf_reader/writer.py +++ b/cdm_reader_mapper/mdf_reader/writer.py @@ -56,7 +56,7 @@ def write_data( data: pd.DataFrame | TextFileReader, mask: pd.DataFrame | TextFileReader | None = None, data_format: SupportedFileTypes = "csv", - dtypes: dict | None = None, + dtypes: pd.Series | dict | None = None, parse_dates: list | bool = False, encoding: str = "utf-8", out_dir: str = ".", @@ -134,7 +134,9 @@ def write_data( extension = extension or data_format - dtypes = dtypes or {} + if not isinstance(dtypes, (dict, pd.Series)): + dtypes = {} + if isinstance(parse_dates, bool): parse_dates = [] From 1e3d59557041f181f4773687e338781a4cb5dc59 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Thu, 5 Feb 2026 15:17:40 +0100 Subject: [PATCH 16/16] update CHANGELOG --- CHANGES.rst | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index a3be1749..cd2fedee 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,14 +10,29 @@ Contributors to this version: Ludwig Lierhammer (:user:`ludwiglierhammer`) New features and enhancements ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * ``mdf_reader.read_data`` now supports chunking (:pull:`360`) +* read and write both `parquet` and `feather` files including new parameter `data_format` (:issue:`353`, :pull:`363`): + + * `mdf_reader.read_data`, + * `mdf_reader.write_data` + * `cdm_mapper.read_tables` + * `cdm_mapper.write_tables` Breaking changes ^^^^^^^^^^^^^^^^ * 
``DataBundle.stack_v`` and ``DataBundle.stack_h`` only support `pd.DataFrames` as input, otherwise raises an `ValueError` (:pull:`360`) +* set default for `extension` from `psv` to specified `data_format` (:pull:`363`): + + * `cdm_mapper.read_tables` + * `cdm_mapper.write_tables` + +* set default for `extension` from ``csv`` to specified `data_format` in `mdf_reader.write_data` (:pull:`363`) +* `mdf_reader.read_data`: save `dtypes` in returned DataBundle as `pd.Series` not `dict` (:pull:`363`) Internal changes ^^^^^^^^^^^^^^^^ * re-work internal structure for more readability and better performance (:pull:`360`) +* use pre-defined `Literal` constants in `cdm_reader_mapper.properties` (:pull:`363`) +* `mdf_reader.utils.utilities.read_csv`: rename parameter `columns` to `column_names` (:pull:`363`) 2.2.1 (2026-01-23) ------------------