diff --git a/CHANGES.rst b/CHANGES.rst index a3be1749..cd2fedee 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,14 +10,29 @@ Contributors to this version: Ludwig Lierhammer (:user:`ludwiglierhammer`) New features and enhancements ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * ``mdf_reader.read_data`` now supports chunking (:pull:`360`) +* read and write both `parquet` and `feather` files including new parameter `data_format` (:issue:`353`, :pull:`363`): + + * `mdf_reader.read_data`, + * `mdf_reader.write_data` + * `cdm_mapper.read_tables` + * `cdm_mapper.write_tables` Breaking changes ^^^^^^^^^^^^^^^^ * ``DataBundle.stack_v`` and ``DataBundle.stack_h`` only support `pd.DataFrames` as input, otherwise raises an `ValueError` (:pull:`360`) +* set default for `extension` from `psv` to specified `data_format` (:pull:`363`): + + * `cdm_mapper.read_tables` + * `cdm_mapper.write_tables` + +* set default for `extension` from `csv` to specified `data_format` in `mdf_reader.write_data` (:pull:`363`) +* `mdf_reader.read_data`: save `dtypes` in return DataBundle as `pd.Series` not `dict` (:pull:`363`) Internal changes ^^^^^^^^^^^^^^^^ * re-work internal structure for more readability and better performance (:pull:`360`) +* use pre-defined `Literal` constants in `cdm_reader_mapper.properties` (:pull:`363`) +* `mdf_reader.utils.utilities.read_csv`: parameter `columns` to `column_names` (:pull:`363`) 2.2.1 (2026-01-23) ------------------ diff --git a/cdm_reader_mapper/cdm_mapper/mapper.py b/cdm_reader_mapper/cdm_mapper/mapper.py index f084adec..ec5b09d8 100755 --- a/cdm_reader_mapper/cdm_mapper/mapper.py +++ b/cdm_reader_mapper/cdm_mapper/mapper.py @@ -15,7 +15,7 @@ from copy import deepcopy from io import StringIO -from typing import Any +from typing import Any, get_args import numpy as np import pandas as pd @@ -533,7 +533,7 @@ def map_model( """ logger = logging_hdlr.init_logger(__name__, level=log_level) imodel = imodel.split("_") - if imodel[0] not in properties.supported_data_models: + 
if imodel[0] not in get_args(properties.SupportedDataModels): logger.error("Input data model " f"{imodel[0]}" " not supported") return diff --git a/cdm_reader_mapper/cdm_mapper/properties.py b/cdm_reader_mapper/cdm_mapper/properties.py index c00bf325..f920fd12 100755 --- a/cdm_reader_mapper/cdm_mapper/properties.py +++ b/cdm_reader_mapper/cdm_mapper/properties.py @@ -2,7 +2,7 @@ from __future__ import annotations -from ..properties import numeric_types, object_types, supported_data_models # noqa +from ..properties import NumericTypes, ObjectTypes, SupportedDataModels # noqa _base = "cdm_reader_mapper.cdm_mapper" diff --git a/cdm_reader_mapper/cdm_mapper/reader.py b/cdm_reader_mapper/cdm_mapper/reader.py index 5cc363de..5c4204ea 100755 --- a/cdm_reader_mapper/cdm_mapper/reader.py +++ b/cdm_reader_mapper/cdm_mapper/reader.py @@ -47,32 +47,64 @@ import glob import os +from typing import get_args + import pandas as pd from cdm_reader_mapper.common import get_filename, logging_hdlr from cdm_reader_mapper.core.databundle import DataBundle -from . 
import properties +from ..properties import SupportedFileTypes +from .properties import cdm_tables from .utils.utilities import get_cdm_subset, get_usecols -def _read_file(ifile, table, col_subset, **kwargs): +READERS = { + "csv": pd.read_csv, + "parquet": pd.read_parquet, + "feather": pd.read_feather, +} + +READER_KWARGS = { + "csv": "usecols", + "parquet": "columns", + "feather": "columns", +} + + +def _read_file( + ifile: str, + table: str, + col_subset: str | list | None, + data_format: SupportedFileTypes, + **kwargs, +) -> pd.DataFrame: usecols = get_usecols(table, col_subset) - return pd.read_csv(ifile, usecols=usecols, **kwargs) + reader = READERS[data_format] + reader_kwargs = {READER_KWARGS[data_format]: usecols, **kwargs} + return reader(ifile, **reader_kwargs) def _read_single_file( - ifile, - cdm_subset=None, - col_subset=None, - null_label="null", + ifile: str, + data_format: SupportedFileTypes, + cdm_subset: str | list | None = None, + col_subset: str | list | None = None, + null_label: str = "null", **kwargs, ) -> pd.DataFrame: if not isinstance(cdm_subset, list): cdm_subset = [cdm_subset] - dfi_ = _read_file(ifile, table=cdm_subset[0], col_subset=col_subset, **kwargs) + dfi_ = _read_file( + ifile, + table=cdm_subset[0], + data_format=data_format, + col_subset=col_subset, + **kwargs, + ) if dfi_.empty: return pd.DataFrame() + dfi_ = dfi_.set_index("report_id", drop=False) if null_label in dfi_.index: return dfi_.drop(index=null_label) @@ -80,13 +112,14 @@ def _read_single_file( def _read_multiple_files( - inp_dir, - prefix=None, - suffix=None, - extension="psv", - cdm_subset=None, - col_subset=None, - null_label="null", + inp_dir: str, + data_format: SupportedFileTypes, + prefix: str | None = None, + suffix: str | None = None, + extension: str | None = None, + cdm_subset: str | list | None = None, + col_subset: str | list | None = None, + null_label: str = "null", logger=None, **kwargs, ) -> list[pd.DataFrame]: @@ -98,20 +131,24 @@ def 
_read_multiple_files( files = glob.glob(pattern) if len(files) == 0: - logger.error(f"No files found matching pattern {pattern}") - return [pd.DataFrame()] + raise FileNotFoundError(f"No files found matching pattern {pattern}") df_list = [] if not isinstance(cdm_subset, list): cdm_subset = [cdm_subset] + for table in cdm_subset: - if table not in properties.cdm_tables: + if table not in cdm_tables: logger.warning(f"Requested table {table} not defined in CDM") continue + logger.info(f"Getting file path for pattern {table}") - pattern_ = get_filename( - [prefix, table, f"*{suffix}"], path=inp_dir, extension=extension - ) + _pattern = [table] + if prefix: + _pattern = [prefix] + _pattern + if suffix: + _pattern = _pattern + [f"*{suffix}"] + pattern_ = get_filename(_pattern, path=inp_dir, extension=extension) paths_ = glob.glob(pattern_) if len(paths_) != 1: logger.warning( @@ -122,6 +159,7 @@ def _read_multiple_files( dfi = _read_single_file( paths_[0], + data_format=data_format, cdm_subset=[table], col_subset=col_subset, null_label=null_label, @@ -141,15 +179,16 @@ def _read_multiple_files( def read_tables( - source, - prefix=None, - suffix=None, - extension="psv", - cdm_subset=None, - col_subset=None, - delimiter="|", - na_values=None, - null_label="null", + source: str, + data_format: SupportedFileTypes = "csv", + prefix: str | None = None, + suffix: str | None = None, + extension: str | None = None, + cdm_subset: str | list | None = None, + col_subset: str | list | dict | None = None, + delimiter: str = "|", + na_values: str | None = None, + null_label: str = "null", **kwargs, ) -> DataBundle: """ @@ -157,15 +196,17 @@ def read_tables( Parameters ---------- - source: str, optional + source: str The file (including path) or the path to the file(s) to be read. + data_format: {"csv", "parquet", "feather"}, default: "csv" + Format of input data file(s). prefix: str, optional Prefix of file name structure: ``--*.``. Could de used if `source` is a valid directory path. 
suffix: str, optional Suffix of file name structure: ``-
-*.``. Could de used if `source` is a valid directory path. - extension: str + extension: str, optional Extension of file name structure: ``-
-*.``. Could de used if `source` is a valid directory path. Default: psv @@ -213,21 +254,32 @@ def read_tables( write_data : Write MDF data and validation mask to disk. """ logger = logging_hdlr.init_logger(__name__, level="INFO") + supported_file_types = get_args(SupportedFileTypes) + if data_format not in supported_file_types: + raise ValueError( + f"data_format must be one of {supported_file_types}, not {data_format}." + ) + # Because how the printers are written, they modify the original data frame!, # also removing rows with empty observation_value in observation_tables - kwargs = { - "delimiter": delimiter, - "dtype": "object", - "na_values": na_values, - "keep_default_na": False, - } + if data_format == "csv": + kwargs = { + "delimiter": delimiter, + "dtype": "object", + "na_values": na_values, + "keep_default_na": False, + **kwargs, + } # See if subset, if any of the tables is not as specs cdm_subset = get_cdm_subset(cdm_subset) + extension = extension or data_format + if os.path.isfile(source): df_list = [ _read_single_file( source, + data_format=data_format, cdm_subset=cdm_subset, col_subset=col_subset, null_label=null_label, @@ -237,6 +289,7 @@ def read_tables( elif os.path.isdir(source): df_list = _read_multiple_files( source, + data_format=data_format, prefix=prefix, suffix=suffix, extension=extension, @@ -247,14 +300,12 @@ def read_tables( **kwargs, ) else: - logger.error( - f"Source is neither a valid file name nor a valid directory path: {source}" + raise FileNotFoundError( + f"Source is neither a valid file name nor a valid directory path: {source}." 
) - return DataBundle(data=pd.DataFrame()) if len(df_list) == 0: - logger.error("All tables empty in file system") - return DataBundle(data=pd.DataFrame(), mode="tables") + raise ValueError("All tables empty in file system.") merged = pd.concat(df_list, axis=1, join="outer") merged = merged.reset_index(drop=True) diff --git a/cdm_reader_mapper/cdm_mapper/writer.py b/cdm_reader_mapper/cdm_mapper/writer.py index 9527b912..8de4f3ed 100755 --- a/cdm_reader_mapper/cdm_mapper/writer.py +++ b/cdm_reader_mapper/cdm_mapper/writer.py @@ -27,44 +27,59 @@ import pandas as pd from pathlib import Path +from typing import get_args from cdm_reader_mapper.common import get_filename, logging_hdlr from .tables.tables import get_cdm_atts from .utils.utilities import adjust_filename, dict_to_tuple_list, get_cdm_subset +from ..properties import SupportedFileTypes -def _table_to_ascii( - data, - delimiter="|", - encoding="utf-8", - col_subset=None, + +def _table_to_file( + data: pd.DataFrame, filename=None, + data_format: SupportedFileTypes = "csv", + delimiter: str = "|", + encoding: str = "utf-8", + **kwargs, ) -> None: data = data.dropna(how="all") - header = True - wmode = "w" - data.to_csv( - filename, - index=False, - sep=delimiter, - header=header, - mode=wmode, - encoding=encoding, - ) + if data_format == "csv": + header = True + wmode = "w" + data.to_csv( + filename, + index=False, + header=header, + mode=wmode, + sep=delimiter, + encoding=encoding, + **kwargs, + ) + elif data_format == "parquet": + data.to_parquet(filename, **kwargs) + elif data_format == "feather": + data.to_feather(filename, **kwargs) + else: + raise ValueError( + f"data_format must be one of {get_args(SupportedFileTypes)} not {data_format}." 
+ ) def write_tables( - data, - out_dir=None, - prefix=None, - suffix=None, - extension="psv", - filename=None, - cdm_subset=None, - col_subset=None, - delimiter="|", - encoding="utf-8", + data: pd.DataFrame, + data_format: SupportedFileTypes = "csv", + out_dir: str | None = None, + prefix: str | None = None, + suffix: str | None = None, + extension: str | None = None, + filename: str | dict | None = None, + cdm_subset: str | list | None = None, + col_subset: str | list | dict | None = None, + delimiter: str = "|", + encoding: str = "utf-8", **kwargs, ) -> None: """Write pandas.DataFrame to CDM-table file on file system. @@ -73,6 +88,8 @@ ---------- data: pandas.DataFrame pandas.DataFrame to export. + data_format: {"csv", "parquet", "feather"}, default: "csv" + Format of output data file(s). out_dir: str, optional Path to the output directory. Default: current directory @@ -80,9 +97,8 @@ Prefix of file name structure: ``-
-*.``. suffix: str, optional Suffix of file name structure: ``-
-*.``. - extension: str + extension: str, optional Extension of file name structure: ``-
-*.``. - Default: psv filename: str or dict, optional Name of the output file name(s). List one filename for each table name in ``data`` ({
:}). @@ -125,6 +141,11 @@ def write_tables( Use this function after reading CDM tables. """ logger = logging_hdlr.init_logger(__name__, level="INFO") + supported_file_types = get_args(SupportedFileTypes) + if data_format not in supported_file_types: + raise ValueError( + f"data_format must be one of {supported_file_types}, not {data_format}." + ) cdm_subset = get_cdm_subset(cdm_subset) @@ -144,8 +165,9 @@ def write_tables( elif filename is None: filename = {} - if out_dir is None: - out_dir = "." + out_dir = out_dir or "." + + extension = extension or data_format for table in cdm_subset: if table not in data: @@ -164,9 +186,10 @@ def write_tables( filename_ = os.path.join(out_dir, filename_) logger.info(f"Writing table {table}: {filename_}") - _table_to_ascii( + _table_to_file( cdm_table, delimiter=delimiter, encoding=encoding, filename=filename_, + data_format=data_format, ) diff --git a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py index b3a17b8c..8a61d825 100755 --- a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py +++ b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py @@ -1,6 +1,7 @@ """Utilities for handling pandas TextParser objects safely.""" from __future__ import annotations + import pandas as pd from pandas.io.parsers import TextFileReader from io import StringIO @@ -8,7 +9,7 @@ logger = logging.getLogger(__name__) -_READ_CSV_KWARGS = [ +READ_CSV_KWARGS = [ "chunksize", "names", "dtype", @@ -46,7 +47,7 @@ def _new_reader_from_buffer(parser: TextFileReader) -> TextFileReader | None: read_dict = read_dict = { k: parser.orig_options.get(k) - for k in _READ_CSV_KWARGS + for k in READ_CSV_KWARGS if k in parser.orig_options } return pd.read_csv(StringIO(raw), **read_dict) diff --git a/cdm_reader_mapper/common/replace.py b/cdm_reader_mapper/common/replace.py index 15bf4351..058bf121 100755 --- a/cdm_reader_mapper/common/replace.py +++ b/cdm_reader_mapper/common/replace.py @@ -19,7 +19,6 @@ from 
__future__ import annotations - import pandas as pd from . import logging_hdlr diff --git a/cdm_reader_mapper/core/databundle.py b/cdm_reader_mapper/core/databundle.py index 78c2a4d2..42e26e6e 100755 --- a/cdm_reader_mapper/core/databundle.py +++ b/cdm_reader_mapper/core/databundle.py @@ -35,11 +35,11 @@ class DataBundle(_DataBundle): ---------- data: pandas.DataFrame, optional MDF DataFrame. - columns: list, optional + columns: pd.Index, pd.MultiIndex or list, optional Column labels of ``data`` - dtypes: dict, optional + dtypes: pd.Series or dict, optional Data types of ``data``. - parse_dates: list, optional + parse_dates: list or bool, optional Information how to parse dates on ``data`` mask: pandas.DataFrame, optional MDF validation mask diff --git a/cdm_reader_mapper/core/reader.py b/cdm_reader_mapper/core/reader.py index 10eb315a..5beaff32 100755 --- a/cdm_reader_mapper/core/reader.py +++ b/cdm_reader_mapper/core/reader.py @@ -2,15 +2,27 @@ from __future__ import annotations +from typing import get_args + from cdm_reader_mapper.cdm_mapper.reader import read_tables from cdm_reader_mapper.mdf_reader.reader import read_mdf, read_data from .databundle import DataBundle +from ..properties import SupportedReadModes + +supported_read_modes = get_args(SupportedReadModes) + +READERS = { + "mdf": read_mdf, + "data": read_data, + "tables": read_tables, +} + def read( - source, - mode="mdf", + source: str, + mode: SupportedReadModes = "mdf", **kwargs, ) -> DataBundle: """Read either original marine-meteorological data or MDF data or CDM tables from disk. @@ -19,15 +31,13 @@ def read( ---------- source: str Source of the input data. - mode: str, {mdf, data, tables} + mode: str, {mdf, data, tables}, default: mdf Read data mode: * "mdf" to read original marine-meteorological data from disk and convert them to MDF data * "data" to read MDF data from disk * "tables" to read CDM tables from disk. Map MDF data to CDM tables with :py:func:`DataBundle.map_model`. 
- Default: mdf - Returns ------- DataBundle @@ -46,14 +56,9 @@ def read( `kwargs` are the keyword arguments for the specific `mode` reader. """ - match mode.lower(): - case "mdf": - return read_mdf(source, **kwargs) - case "data": - return read_data(source, **kwargs) - case "tables": - return read_tables(source, **kwargs) - case _: - raise ValueError( - f"No valid mode: {mode}. Choose one of ['mdf', 'data', 'tables']" - ) + if mode not in supported_read_modes: + raise ValueError( + f"No valid mode: {mode}. Choose one of {supported_read_modes}." + ) + + return READERS[mode](source, **kwargs) diff --git a/cdm_reader_mapper/core/writer.py b/cdm_reader_mapper/core/writer.py index ce09c8d5..ae3899ae 100755 --- a/cdm_reader_mapper/core/writer.py +++ b/cdm_reader_mapper/core/writer.py @@ -2,21 +2,35 @@ from __future__ import annotations +from typing import get_args + +import pandas as pd +from pandas.io.parsers import TextFileReader + from cdm_reader_mapper.cdm_mapper.writer import write_tables from cdm_reader_mapper.mdf_reader.writer import write_data +from ..properties import SupportedWriteModes + +supported_write_modes = get_args(SupportedWriteModes) + +WRITERS = { + "data": write_data, + "tables": write_tables, +} + def write( - data, - mode="data", + data: pd.DataFrame | TextFileReader, + mode: SupportedWriteModes = "data", **kwargs, ) -> None: """Write either MDF data or CDM tables on disk. Parameters ---------- - data: pandas.DataFrame - pandas.DataFrame to export. + data: pandas.DataFrame or TextFileReader + Data to export. mode: str, {data, tables} Write data mode: @@ -40,10 +54,9 @@ def write( ---- `kwargs` are the keyword arguments for the specific `mode` reader. """ - match mode.lower(): - case "data": - write_data(data, **kwargs) - case "tables": - write_tables(data, **kwargs) - case _: - raise ValueError(f"No valid mode: {mode}. Choose one of ['data', 'tables']") + if mode not in supported_write_modes: + raise ValueError( + f"No valid mode: {mode}. 
Choose one of {supported_write_modes}." + ) + + return WRITERS[mode](data, **kwargs) diff --git a/cdm_reader_mapper/mdf_reader/properties.py b/cdm_reader_mapper/mdf_reader/properties.py index 8fa41d14..ab089843 100755 --- a/cdm_reader_mapper/mdf_reader/properties.py +++ b/cdm_reader_mapper/mdf_reader/properties.py @@ -2,7 +2,9 @@ from __future__ import annotations -from ..properties import numeric_types, object_types, supported_data_models # noqa +from typing import get_args + +from ..properties import NumericTypes, ObjectTypes, SupportedDataModels # noqa _base = "cdm_reader_mapper.mdf_reader" @@ -17,16 +19,16 @@ } pandas_dtypes = {} -for dtype in object_types: +for dtype in get_args(ObjectTypes): pandas_dtypes[dtype] = "object" -pandas_dtypes.update({x: x for x in numeric_types}) +pandas_dtypes.update({x: x for x in get_args(NumericTypes)}) pandas_dtypes["datetime"] = "datetime" pandas_int = "Int64" # ....and how they are managed data_type_conversion_args = {} -for dtype in numeric_types: +for dtype in get_args(NumericTypes): data_type_conversion_args[dtype] = ["scale", "offset"] data_type_conversion_args["str"] = ["disable_white_strip"] data_type_conversion_args["object"] = ["disable_white_strip"] diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index ef10639e..ced76b50 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -2,8 +2,10 @@ from __future__ import annotations -from io import StringIO as StringIO from pathlib import Path +from typing import Callable, Any, get_args + +import pandas as pd from cdm_reader_mapper import DataBundle @@ -12,7 +14,15 @@ from .utils.filereader import FileReader from .utils.utilities import validate_arg -from .utils.utilities import as_list, as_path, read_csv +from .utils.utilities import as_list, as_path, read_csv, read_parquet, read_feather + +from ..properties import SupportedFileTypes + +READERS = { + "csv": read_csv, + "parquet": 
read_parquet, + "feather": read_feather, +} def validate_read_mdf_args( @@ -153,8 +163,8 @@ def read_mdf( write_data : Write MDF data and validation mask to disk. write_tables : Write CDM tables to disk. """ - if skiprows is None: - skiprows = 0 + skiprows = skiprows or 0 + validate_read_mdf_args( source=source, imodel=imodel, @@ -219,12 +229,41 @@ def read_mdf( ) +def _read_data( + data_file: str, + mask_file: str | None, + reader: Callable[..., Any], + col_subset: str | list | tuple | None, + data_kwargs: dict, + mask_kwargs: dict, +): + """Helper function for reading data files from disk.""" + data, info = reader( + data_file, + col_subset=col_subset, + **data_kwargs, + ) + + if mask_file is None: + mask = pd.DataFrame() + else: + mask, _ = reader( + mask_file, + col_subset=col_subset, + column_names=info["columns"], + **mask_kwargs, + ) + + return data, mask, info + + def read_data( - source, - mask=None, - info=None, - imodel=None, - col_subset=None, + data_file: str, + mask_file: str | None = None, + info_file: str | None = None, + data_format: SupportedFileTypes = "csv", + imodel: str | None = None, + col_subset: str | list | tuple | None = None, encoding: str | None = None, **kwargs, ) -> DataBundle: @@ -232,12 +271,14 @@ def read_data( Parameters ---------- - source: str + data_file: str The data file (including path) to be read. - mask: str, optional + mask_file: str, optional The validation file (including path) to be read. - info: str, optional + info_file: str, optional The information file (including path) to be read. + data_format: {"csv", "parquet", "feather"}, default: "csv" + Format of input data file(s). imodel: str, optional Name of internally available input data model. e.g. icoads_r300_d704 @@ -267,33 +308,40 @@ def read_data( write_data : Write MDF data and validation mask to disk. write_tables : Write CDM tables to disk. 
""" - info_dict = open_json_file(info) if info else {} - dtype = info_dict.get("dtypes", "object") - parse_dates = info_dict.get("parse_dates", False) - encoding = encoding or info_dict.get("encoding", None) - - pd_kwargs = kwargs.copy() - pd_kwargs.setdefault("dtype", dtype) - pd_kwargs.setdefault("parse_dates", parse_dates) - pd_kwargs.setdefault("encoding", encoding) + supported_file_types = get_args(SupportedFileTypes) + if data_format not in supported_file_types: + raise ValueError( + f"data_format must be one of {supported_file_types}, not {data_format}." + ) - data, infos = read_csv( - source, + data_kwargs = kwargs.copy() + mask_kwargs = kwargs.copy() + parse_dates = False + if data_format == "csv": + info_dict = open_json_file(info_file) if info_file else {} + dtype = info_dict.get("dtypes", "object") + parse_dates = info_dict.get("parse_dates", False) + encoding = encoding or info_dict.get("encoding", None) + + data_kwargs.setdefault("dtype", dtype) + data_kwargs.setdefault("parse_dates", parse_dates) + data_kwargs.setdefault("encoding", encoding) + + mask_kwargs.setdefault("dtype", "boolean") + + data, mask, info = _read_data( + data_file=data_file, + mask_file=mask_file, + reader=READERS[data_format], col_subset=col_subset, - **pd_kwargs, - ) - - pd_kwargs = kwargs.copy() - pd_kwargs.setdefault("dtype", "boolean") - - mask, _ = read_csv( - mask, col_subset=col_subset, columns=infos["columns"], **pd_kwargs + data_kwargs=data_kwargs, + mask_kwargs=mask_kwargs, ) return DataBundle( data=data, - columns=infos["columns"], - dtypes=infos["dtypes"].to_dict(), + columns=info["columns"], + dtypes=info["dtypes"], parse_dates=parse_dates, mask=mask, imodel=imodel, diff --git a/cdm_reader_mapper/mdf_reader/schemas/schemas.py b/cdm_reader_mapper/mdf_reader/schemas/schemas.py index 96ff7718..f1e8fa54 100755 --- a/cdm_reader_mapper/mdf_reader/schemas/schemas.py +++ b/cdm_reader_mapper/mdf_reader/schemas/schemas.py @@ -10,7 +10,7 @@ from __future__ import annotations 
from pathlib import Path -from typing import TypedDict +from typing import TypedDict, get_args from cdm_reader_mapper.common.json_dict import collect_json_files, combine_dicts @@ -109,7 +109,7 @@ def _resolve_schema_files( if imodel: parts = imodel.split("_") model = parts[0] - if model not in properties.supported_data_models: + if model not in get_args(properties.SupportedDataModels): raise ValueError(f"Input data model {model} not supported") return collect_json_files(*parts, base=f"{properties._base}.schemas") diff --git a/cdm_reader_mapper/mdf_reader/utils/convert_and_decode.py b/cdm_reader_mapper/mdf_reader/utils/convert_and_decode.py index 121eced5..4f65d1b5 100755 --- a/cdm_reader_mapper/mdf_reader/utils/convert_and_decode.py +++ b/cdm_reader_mapper/mdf_reader/utils/convert_and_decode.py @@ -3,13 +3,15 @@ from __future__ import annotations from decimal import Decimal, InvalidOperation -from typing import Callable, Any +from typing import Callable, Any, get_args import pandas as pd from .. 
import properties from .utilities import convert_str_boolean +numeric_types = get_args(properties.NumericTypes) + def max_decimal_places(*decimals: Decimal) -> int: """ @@ -102,7 +104,7 @@ def __init__(self, dtype: str, encoding: str = "base36") -> None: self._registry = {"key": self.base36} - for numeric_type in properties.numeric_types: + for numeric_type in numeric_types: self._registry[numeric_type] = self.base36 def decoder(self) -> Callable[[pd.Series], pd.Series] | None: @@ -189,7 +191,7 @@ def __init__(self, dtype: str) -> None: "key": self.object_to_object, } - for numeric_type in properties.numeric_types: + for numeric_type in numeric_types: self._registry[numeric_type] = self.object_to_numeric def converter(self) -> Callable[..., pd.Series]: diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py index f7a782bd..bd211027 100755 --- a/cdm_reader_mapper/mdf_reader/utils/utilities.py +++ b/cdm_reader_mapper/mdf_reader/utils/utilities.py @@ -4,7 +4,6 @@ import ast import csv -import logging import os from io import StringIO @@ -183,7 +182,7 @@ def update_column_labels(columns: Iterable[str | tuple]) -> pd.Index | pd.MultiI def update_and_select( df: pd.DataFrame, subset: str | list | None = None, - columns: pd.Index | pd.MultiIndex | None = None, + column_names: pd.Index | pd.MultiIndex | None = None, ) -> tuple[pd.DataFrame, dict[str, Any]]: """ Update string column labels and select subset from DataFrame. 
@@ -206,15 +205,54 @@ def update_and_select( df.columns = update_column_labels(df.columns) if subset is not None: df = df[subset] - if columns is not None and not df.empty: - df = df.reindex(columns=columns) + if column_names is not None and not df.empty: + df = df.reindex(columns=column_names) return df, {"columns": df.columns, "dtypes": df.dtypes} +def _read_data_from_file( + filepath: Path, + reader: Callable[..., Any], + col_subset: str | list | None = None, + column_names: pd.Index | pd.MultiIndex | None = None, + reader_kwargs: dict | None = None, + iterator: bool = False, +) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]: + """Helper file reader.""" + if filepath is None or not Path(filepath).is_file(): + raise FileNotFoundError(f"File not found: {filepath}") + + reader_kwargs = reader_kwargs or {} + + data = reader(filepath, **reader_kwargs) + + if isinstance(data, pd.DataFrame): + return update_and_select(data, subset=col_subset, column_names=column_names) + + if iterator is True: + writer_kwargs = {} + if "encoding" in reader_kwargs: + writer_kwargs["encoding"] = reader_kwargs["encoding"] + + return process_textfilereader( + data, + func=update_and_select, + func_kwargs={ + "subset": col_subset, + "column_names": column_names, + }, + read_kwargs=reader_kwargs, + write_kwargs=writer_kwargs, + makecopy=False, + ) + + raise ValueError(f"Unsupported reader return type: {type(data)}") + + def read_csv( filepath: Path, col_subset: str | list | None = None, - columns: pd.Index | pd.MultiIndex | None = None, + column_names: pd.Index | pd.MultiIndex | None = None, **kwargs, ) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]: """ @@ -226,7 +264,7 @@ def read_csv( Path to the CSV file. col_subset : list of str, optional Subset of columns to read from the CSV. - columns: + column_names: Column labels for re-indexing. kwargs : any Additional keyword arguments passed to pandas.read_csv. 
@@ -237,29 +275,84 @@ def read_csv(
         - The CSV as a DataFrame. Empty if file does not exist.
         - dictionary containing data column labels and data types
     """
-    if filepath is None or not Path(filepath).is_file():
-        logging.warning(f"File not found: {filepath}")
-        return pd.DataFrame(), {}
+    return _read_data_from_file(
+        filepath,
+        reader=pd.read_csv,
+        col_subset=col_subset,
+        column_names=column_names,
+        reader_kwargs=kwargs,
+        iterator=True,
+    )
 
-    data = pd.read_csv(filepath, delimiter=",", **kwargs)
-    if isinstance(data, pd.DataFrame):
-        data, info = update_and_select(data, subset=col_subset, columns=columns)
-        return data, info
-
-    write_kwargs = {}
-    if "encoding" in kwargs:
-        write_kwargs["encoding"] = kwargs["encoding"]
-
-    data, info = process_textfilereader(
-        data,
-        func=update_and_select,
-        func_kwargs={"subset": col_subset, "columns": columns},
-        read_kwargs=kwargs,
-        write_kwargs=write_kwargs,
-        makecopy=False,
+
+def read_parquet(
+    filepath: Path,
+    col_subset: str | list | None = None,
+    column_names: pd.Index | pd.MultiIndex | None = None,
+    **kwargs,
+) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]:
+    """
+    Read a parquet file with optional column subsetting and re-indexing.
+
+    Parameters
+    ----------
+    filepath : str or Path or None
+        Path to the parquet file.
+    col_subset : list of str, optional
+        Subset of columns to read from the parquet file.
+    column_names:
+        Column labels for re-indexing.
+    kwargs : any
+        Additional keyword arguments passed to pandas.read_parquet.
+
+    Returns
+    -------
+    tuple[pd.DataFrame, dict]
+        - The parquet data as a DataFrame. Raises FileNotFoundError if the file does not exist.
+        - dictionary containing data column labels and data types
+    """
+    return _read_data_from_file(
+        filepath,
+        reader=pd.read_parquet,
+        col_subset=col_subset,
+        column_names=column_names,
+        reader_kwargs=kwargs,
+    )
+
+
+def read_feather(
+    filepath: Path,
+    col_subset: str | list | None = None,
+    column_names: pd.Index | pd.MultiIndex | None = None,
+    **kwargs,
+) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]:
+    """
+    Read a feather file with optional column subsetting and re-indexing.
+
+    Parameters
+    ----------
+    filepath : str or Path or None
+        Path to the feather file.
+    col_subset : list of str, optional
+        Subset of columns to read from the feather file.
+    column_names:
+        Column labels for re-indexing.
+    kwargs : any
+        Additional keyword arguments passed to pandas.read_feather.
+
+    Returns
+    -------
+    tuple[pd.DataFrame, dict]
+        - The feather data as a DataFrame. Raises FileNotFoundError if the file does not exist.
+        - dictionary containing data column labels and data types
+    """
+    return _read_data_from_file(
+        filepath,
+        reader=pd.read_feather,
+        col_subset=col_subset,
+        column_names=column_names,
+        reader_kwargs=kwargs,
     )
-    return data, info
 
 
 def convert_dtypes(dtypes) -> tuple[str]:
@@ -413,12 +506,9 @@ def process_textfilereader(
         - One or more processed DataFrames (in the same order as returned by `func`)
         - Any additional outputs from `func` that are not DataFrames
     """
-    if func_kwargs is None:
-        func_kwargs = {}
-    if read_kwargs is None:
-        read_kwargs = {}
-    if write_kwargs is None:
-        write_kwargs = {}
+    func_kwargs = func_kwargs or {}
+    read_kwargs = read_kwargs or {}
+    write_kwargs = write_kwargs or {}
 
     buffers = []
     columns = []
@@ -465,6 +555,7 @@ def process_textfilereader(
     result_dfs = []
     for buffer, cols, rk in zip(buffers, columns, read_kwargs):
         buffer.seek(0)
+        rk = {k: v for k, v in rk.items() if k != "delimiter"}
         result_dfs.append(
             pd.read_csv(
                 buffer,
diff --git a/cdm_reader_mapper/mdf_reader/utils/validators.py b/cdm_reader_mapper/mdf_reader/utils/validators.py
index
d4d84057..b3f62024 100755 --- a/cdm_reader_mapper/mdf_reader/utils/validators.py +++ b/cdm_reader_mapper/mdf_reader/utils/validators.py @@ -6,13 +6,16 @@ import numpy as np import pandas as pd -from typing import Any, Iterable +from typing import Any, Iterable, get_args from .. import properties from ..codes import codes from .utilities import convert_str_boolean +numeric_types = get_args(properties.NumericTypes) + + def _is_false(x: Any) -> bool: """Check if a value is exactly False.""" return x is False @@ -173,7 +176,7 @@ def validate( } validated_columns = [] - validated_dtypes = set(properties.numeric_types) | {"datetime", "key"} + validated_dtypes = set(numeric_types) | {"datetime", "key"} basic_functions = { "datetime": validate_datetime, @@ -188,7 +191,7 @@ def validate( column_atts = element_atts.get(column, {}) column_type = column_atts.get("column_type") - if column_type in properties.numeric_types: + if column_type in numeric_types: valid_min = column_atts.get("valid_min", -np.inf) valid_max = column_atts.get("valid_max", np.inf) column_mask = validate_numeric(series, valid_min, valid_max) diff --git a/cdm_reader_mapper/mdf_reader/writer.py b/cdm_reader_mapper/mdf_reader/writer.py index a2d45fcf..6722ecae 100755 --- a/cdm_reader_mapper/mdf_reader/writer.py +++ b/cdm_reader_mapper/mdf_reader/writer.py @@ -6,28 +6,66 @@ import logging from io import StringIO as StringIO from pathlib import Path +from typing import Any, get_args import pandas as pd +from pandas.io.parsers import TextFileReader from .utils.utilities import join, update_column_names, update_dtypes from ..common import get_filename from ..common.pandas_TextParser_hdlr import make_copy +from ..properties import SupportedFileTypes + +WRITERS = { + "csv": "to_csv", + "parquet": "to_parquet", + "feather": "to_feather", +} + + +def _normalize_data_chunks( + data: pd.DataFrame | TextFileReader | None, +) -> list | TextFileReader: + """Helper function to normalize data chunks.""" + if data is None: 
+ data = pd.DataFrame() + if isinstance(data, pd.DataFrame): + return [data] + if isinstance(data, TextFileReader): + return make_copy(data) + raise TypeError(f"Unsupported data type found: {type(data)}.") + + +def _write_data( + data_df: pd.DataFrame, + mask_df: pd.DataFrame, + data_fn: str, + mask_fn: str, + writer: str, + write_kwargs: dict[str, Any], +) -> None: + """Helper function to write data on disk.""" + getattr(data_df, writer)(data_fn, **write_kwargs) + if not mask_df.empty: + getattr(mask_df, writer)(mask_fn, **write_kwargs) + def write_data( - data, - mask=None, - dtypes: dict | None = None, + data: pd.DataFrame | TextFileReader, + mask: pd.DataFrame | TextFileReader | None = None, + data_format: SupportedFileTypes = "csv", + dtypes: pd.Series | dict | None = None, parse_dates: list | bool = False, - encoding="utf-8", - out_dir=".", - prefix=None, - suffix=None, - extension="csv", - filename=None, - col_subset=None, - delimiter=",", + encoding: str = "utf-8", + out_dir: str = ".", + prefix: str | None = None, + suffix: str | None = None, + extension: str = None, + filename: str | dict | None = None, + col_subset: str | list | tuple | None = None, + delimiter: str = ",", **kwargs, ) -> None: """Write pandas.DataFrame to MDF file on file system. @@ -36,31 +74,32 @@ def write_data( ---------- data: pandas.DataFrame pandas.DataFrame to export. - mask: pandas.DataFrame + mask: pandas.DataFrame, optional validation mask to export. - dtypes: dict + data_format: {"csv", "parquet", "feather"}, default: "csv" + Format of output data file(s). + dtypes: dict, optional Dictionary of data types on ``data``. Dump ``dtypes`` and ``parse_dates`` to json information file. - parse_dates: + parse_dates: list | bool, default: False Information of how to parse dates in :py:attr:`data`. Dump ``dtypes`` and ``parse_dates`` to json information file. For more information see :py:func:`pandas.read_csv`. 
- encoding: str + encoding: str, default: "utf-8" A string representing the encoding to use in the output file, defaults to utf-8. - out_dir: str + out_dir: str, default: "." Path to the output directory. - Default: current directory prefix: str, optional Prefix of file name structure: ``-data-*.``. suffix: str, optional Suffix of file name structure: ``-data-*.``. - extension: str + extension: str, optional Extension of file name structure: ``-data-*.``. - Default: psv + By default, extension depends on `data_format`. filename: str or dict, optional Name of the output file name(s). List one filename for both ``data`` and ``mask`` ({"data":, "mask":}). - Default: Automatically create file name from table name, ``prefix`` and ``suffix``. + By default, automatically create file name from table name, ``prefix`` and ``suffix``. col_subset: str, tuple or list, optional Specify the section or sections of the file to write. @@ -71,9 +110,8 @@ def write_data( e.g. list type object col_subset = [columns] Column labels could be both string or tuple. - delimiter: str + delimiter: str, default: "," Character or regex pattern to treat as the delimiter while reading with df.to_csv. - Default: "," See Also -------- @@ -88,24 +126,27 @@ def write_data( ---- Use this function after reading MDF data. """ - dtypes = dtypes or {} - if isinstance(parse_dates, bool): - parse_dates = [] + supported_file_types = get_args(SupportedFileTypes) + if data_format not in supported_file_types: + raise ValueError( + f"data_format must be one of {supported_file_types}, not {data_format}." 
+ ) - if not isinstance(data, pd.io.parsers.TextFileReader): - data_list = [data] - else: - data_list = make_copy(data) + extension = extension or data_format - if mask is None: - mask = pd.DataFrame() + if not isinstance(dtypes, (dict, pd.Series)): + dtypes = {} - if not isinstance(mask, pd.io.parsers.TextFileReader): - mask_list = [mask] - else: - mask_list = make_copy(mask) + if isinstance(parse_dates, bool): + parse_dates = [] - info = {"dtypes": dtypes.copy(), "parse_dates": [join(p) for p in parse_dates]} + data_list = _normalize_data_chunks(data) + mask_list = _normalize_data_chunks(mask) + + info = { + "dtypes": {k: str(v) for k, v in dtypes.items()}, + "parse_dates": [join(p) for p in parse_dates], + } logging.info(f"WRITING DATA TO FILES IN: {out_dir}") out_dir = Path(out_dir) @@ -142,18 +183,25 @@ def write_data( info["parse_dates"] = [p for p in info["parse_dates"] if p in header] info["encoding"] = encoding - csv_kwargs = dict( - header=header, - mode=mode, - index=False, - sep=delimiter, - encoding=encoding, - **kwargs, + write_kwargs = {} + if data_format == "csv": + write_kwargs = dict( + header=header, + mode=mode, + index=False, + sep=delimiter, + encoding=encoding, + **kwargs, + ) + + _write_data( + data_df=data_df, + mask_df=mask_df, + data_fn=filename_data, + mask_fn=filename_mask, + writer=WRITERS[data_format], + write_kwargs=write_kwargs, ) - data_df.to_csv(filename_data, **csv_kwargs) - if not mask_df.empty: - mask_df.to_csv(filename_mask, **csv_kwargs) - with open(filename_info, "w") as fileObj: json.dump(info, fileObj, indent=4) diff --git a/cdm_reader_mapper/properties.py b/cdm_reader_mapper/properties.py index b80513b7..9c82fade 100755 --- a/cdm_reader_mapper/properties.py +++ b/cdm_reader_mapper/properties.py @@ -2,8 +2,16 @@ from __future__ import annotations -numeric_types = ["Int64", "int", "float"] +from typing import Literal -object_types = ["str", "object", "key", "datetime"] +NumericTypes = Literal["Int64", "int", "float"] 
-supported_data_models = ["craid", "gdac", "icoads", "pub47"] +ObjectTypes = Literal["str", "object", "key", "datetime"] + +SupportedDataModels = Literal["craid", "gdac", "icoads", "pub47"] + +SupportedFileTypes = Literal["csv", "parquet", "feather"] + +SupportedReadModes = Literal["mdf", "data", "tables"] + +SupportedWriteModes = Literal["data", "tables"] diff --git a/tests/_duplicates.py b/tests/_duplicates.py index 0e5ab654..0e574ec3 100755 --- a/tests/_duplicates.py +++ b/tests/_duplicates.py @@ -132,6 +132,7 @@ def _get_test_data(imodel): suffix=f"{imodel}*", cdm_subset="header", mode="tables", + extension="psv", ) diff --git a/tests/test_cdm_io.py b/tests/test_cdm_io.py new file mode 100755 index 00000000..81bd4483 --- /dev/null +++ b/tests/test_cdm_io.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import pandas as pd +import pytest + +from cdm_reader_mapper.cdm_mapper.reader import read_tables +from cdm_reader_mapper.cdm_mapper.writer import write_tables + + +@pytest.fixture +def example_data(): + arrays = [ + ["header", "header", "observations-sst", "observations-sst"], + ["report_id", "A", "report_id", "B"], + ] + multi_cols = pd.MultiIndex.from_arrays(arrays) + return pd.DataFrame( + [ + [1, 0.5, 1, "a"], + [2, 5.0, 2, "b"], + [3, 1.3, 3, "c"], + ], + columns=multi_cols, + ) + + +@pytest.fixture +def csv_path(tmp_path, example_data): + header_file = tmp_path / "header.csv" + obssst_file = tmp_path / "observations-sst.csv" + + example_data["header"].to_csv(header_file, index=False) + example_data["observations-sst"].to_csv(obssst_file, index=False) + + return tmp_path + + +@pytest.fixture +def parquet_path(tmp_path, example_data): + header_file = tmp_path / "header.parquet" + obssst_file = tmp_path / "observations-sst.parquet" + + example_data["header"].to_parquet(header_file, index=False) + example_data["observations-sst"].to_parquet(obssst_file, index=False) + + return tmp_path + + +@pytest.fixture +def feather_path(tmp_path, example_data): + 
header_file = tmp_path / "header.feather" + obssst_file = tmp_path / "observations-sst.feather" + + example_data["header"].to_feather( + header_file, + ) + example_data["observations-sst"].to_feather(obssst_file) + + return tmp_path + + +def test_read_data_csv(csv_path, example_data): + bundle = read_tables(csv_path, delimiter=",") + pd.testing.assert_frame_equal(bundle.data, example_data.astype(str)) + + +def test_read_data_parquet(parquet_path, example_data): + bundle = read_tables(parquet_path, data_format="parquet") + pd.testing.assert_frame_equal(bundle.data, example_data) + + +def test_read_data_feather(feather_path, example_data): + bundle = read_tables(feather_path, data_format="feather") + pd.testing.assert_frame_equal(bundle.data, example_data) + + +def test_write_data_csv(tmp_path, example_data): + write_tables(example_data, out_dir=tmp_path) + data_header = pd.read_csv(tmp_path / "header.csv", delimiter="|") + data_obssst = pd.read_csv(tmp_path / "observations-sst.csv", delimiter="|") + pd.testing.assert_frame_equal(example_data["header"], data_header) + pd.testing.assert_frame_equal(example_data["observations-sst"], data_obssst) + + +def test_write_data_parquet(tmp_path, example_data): + write_tables(example_data, out_dir=tmp_path, data_format="parquet") + data_header = pd.read_parquet(tmp_path / "header.parquet") + data_obssst = pd.read_parquet(tmp_path / "observations-sst.parquet") + pd.testing.assert_frame_equal(example_data["header"], data_header) + pd.testing.assert_frame_equal(example_data["observations-sst"], data_obssst) + + +def test_write_data_feather(tmp_path, example_data): + write_tables(example_data, out_dir=tmp_path, data_format="feather") + data_header = pd.read_feather(tmp_path / "header.feather") + data_obssst = pd.read_feather(tmp_path / "observations-sst.feather") + pd.testing.assert_frame_equal(example_data["header"], data_header) + pd.testing.assert_frame_equal(example_data["observations-sst"], data_obssst) diff --git 
a/tests/test_databundle.py b/tests/test_databundle.py index 161a65ce..57f4adf7 100755 --- a/tests/test_databundle.py +++ b/tests/test_databundle.py @@ -70,7 +70,7 @@ def sample_db_df_testdata(): mask = test_data[f"test_{data_model}"]["mdf_mask"] info = test_data[f"test_{data_model}"]["mdf_info"] - return read_data(data, mask=mask, info=info) + return read_data(data_file=data, mask_file=mask, info_file=info) @pytest.fixture @@ -80,7 +80,7 @@ def sample_db_reader_testdata(): mask = test_data[f"test_{data_model}"]["mdf_mask"] info = test_data[f"test_{data_model}"]["mdf_info"] - return read_data(data, mask=mask, info=info, chunksize=2) + return read_data(data_file=data, mask_file=mask, info_file=info, chunksize=2) def test_len_df(sample_db_df): diff --git a/tests/test_mdf_reader.py b/tests/test_mdf_reader.py index e1467c0a..2cee982c 100755 --- a/tests/test_mdf_reader.py +++ b/tests/test_mdf_reader.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import os import numpy as np @@ -8,11 +9,17 @@ from cdm_reader_mapper import test_data, DataBundle from cdm_reader_mapper.mdf_reader.reader import ( + _read_data, read_mdf, read_data, validate_read_mdf_args, ) from cdm_reader_mapper.mdf_reader.utils.filereader import _apply_multiindex +from cdm_reader_mapper.mdf_reader.utils.utilities import ( + read_csv, + read_parquet, + read_feather, +) def _get_columns(columns, select): @@ -38,7 +45,7 @@ def _read_mdf_test_data(data_model, select=None, drop=None, drop_idx=None, **kwa mask = test_data[f"test_{data_model}"]["mdf_mask"] info = test_data[f"test_{data_model}"]["mdf_info"] - expected = read_data(data, mask=mask, info=info) + expected = read_data(data_file=data, mask_file=mask, info_file=info) if not isinstance(result.data, pd.DataFrame): result.data = result.data.read() @@ -226,7 +233,7 @@ def test_read_data_basic(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.MultiIndex) - assert 
isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert isinstance(db.parse_dates, list) assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -242,7 +249,7 @@ def test_read_data_no_mask(): data_model = "icoads_r300_d721" data = test_data[f"test_{data_model}"]["mdf_data"] info = test_data[f"test_{data_model}"]["mdf_info"] - db = read_data(data, info=info) + db = read_data(data_file=data, info_file=info) assert isinstance(db, DataBundle) @@ -261,7 +268,7 @@ def test_read_data_no_mask(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.MultiIndex) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert isinstance(db.parse_dates, list) assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -277,7 +284,7 @@ def test_read_data_no_info(): data_model = "icoads_r300_d721" data = test_data[f"test_{data_model}"]["mdf_data"] - db = read_data(data) + db = read_data(data_file=data) assert isinstance(db, DataBundle) @@ -296,7 +303,7 @@ def test_read_data_no_info(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.MultiIndex) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert db.parse_dates is False assert db.encoding is None assert db.imodel is None @@ -311,7 +318,7 @@ def test_read_data_col_subset(): data_model = "icoads_r300_d721" data = test_data[f"test_{data_model}"]["mdf_data"] info = test_data[f"test_{data_model}"]["mdf_info"] - db = read_data(data, info=info, col_subset="core") + db = read_data(data_file=data, info_file=info, col_subset="core") assert isinstance(db, DataBundle) @@ -330,7 +337,7 @@ def test_read_data_col_subset(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.Index) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) 
assert isinstance(db.parse_dates, list) assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -345,7 +352,7 @@ def test_read_data_col_subset(): def test_read_data_encoding(): data_model = "icoads_r300_d721" data = test_data[f"test_{data_model}"]["mdf_data"] - db = read_data(data, encoding="cp1252") + db = read_data(data_file=data, encoding="cp1252") assert isinstance(db, DataBundle) @@ -364,7 +371,7 @@ def test_read_data_encoding(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.Index) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert db.parse_dates is False assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -381,7 +388,7 @@ def test_read_data_textfilereader(): data = test_data[f"test_{data_model}"]["mdf_data"] mask = test_data[f"test_{data_model}"]["mdf_mask"] info = test_data[f"test_{data_model}"]["mdf_info"] - db = read_data(data, mask=mask, info=info, chunksize=3) + db = read_data(data_file=data, mask_file=mask, info_file=info, chunksize=3) assert isinstance(db, DataBundle) @@ -400,7 +407,7 @@ def test_read_data_textfilereader(): assert isinstance(db.data, pd.io.parsers.TextFileReader) assert isinstance(db.mask, pd.io.parsers.TextFileReader) assert isinstance(db.columns, pd.MultiIndex) - assert isinstance(db.dtypes, dict) + assert isinstance(db.dtypes, pd.Series) assert db.parse_dates == [] assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -511,3 +518,208 @@ def test_validate_read_mdf_args_invalid_years(tmp_path): chunksize=None, skiprows=0, ) + + +@pytest.fixture +def example_data(): + return pd.DataFrame( + { + "A": [1, 2, 3], + "B": [4.0, 5.0, 6.0], + "C": ["x", "y", "z"], + } + ) + + +@pytest.fixture +def example_mask(): + return pd.DataFrame( + { + "A": [True, False, True], + "B": [True, True, False], + "C": [True, True, False], + }, + dtype="boolean", + ) + + +@pytest.fixture +def 
example_info(example_data): + return { + "columns": example_data.columns, + "dtypes": example_data.dtypes, + "parse_dates": False, + "encoding": None, + } + + +@pytest.fixture +def csv_files(tmp_path, example_data, example_mask, example_info): + data_file = tmp_path / "data.csv" + mask_file = tmp_path / "mask.csv" + info_file = tmp_path / "info.json" + + example_data.to_csv(data_file, index=False) + example_mask.to_csv(mask_file, index=False) + info = { + "columns": list(example_info["columns"]), + "dtypes": example_info["dtypes"].astype(str).to_dict(), + "parse_dates": example_info["parse_dates"], + "encoding": example_info["encoding"], + } + info_file.write_text(json.dumps(info)) + + return data_file, mask_file, info_file + + +@pytest.fixture +def parquet_files(tmp_path, example_data, example_mask): + data_file = tmp_path / "data.parquet" + mask_file = tmp_path / "mask.parquet" + + example_data.to_parquet(data_file) + example_mask.to_parquet(mask_file) + + return data_file, mask_file + + +@pytest.fixture +def feather_files(tmp_path, example_data, example_mask): + data_file = tmp_path / "data.feather" + mask_file = tmp_path / "mask.feather" + + example_data.to_feather(data_file) + example_mask.to_feather(mask_file) + + return data_file, mask_file + + +def test_read_data_with_mask_csv(csv_files, example_data, example_mask, example_info): + data_file, mask_file, _ = csv_files + data, mask, info = _read_data( + data_file=data_file, + mask_file=mask_file, + reader=read_csv, + col_subset=None, + data_kwargs={}, + mask_kwargs={"dtype": "boolean"}, + ) + pd.testing.assert_frame_equal(data, example_data) + pd.testing.assert_frame_equal(mask, example_mask) + pd.testing.assert_index_equal(info["columns"], example_info["columns"]) + pd.testing.assert_series_equal(info["dtypes"], example_info["dtypes"]) + + +def test_read_data_with_mask_parquet( + parquet_files, example_data, example_mask, example_info +): + data_file, mask_file = parquet_files + data, mask, info = 
_read_data( + data_file=data_file, + mask_file=mask_file, + reader=read_parquet, + col_subset=None, + data_kwargs={}, + mask_kwargs={}, + ) + pd.testing.assert_frame_equal(data, example_data) + pd.testing.assert_frame_equal(mask, example_mask) + pd.testing.assert_index_equal(info["columns"], example_info["columns"]) + pd.testing.assert_series_equal(info["dtypes"], example_info["dtypes"]) + + +def test_read_data_with_mask_feather( + feather_files, example_data, example_mask, example_info +): + data_file, mask_file = feather_files + data, mask, info = _read_data( + data_file=data_file, + mask_file=mask_file, + reader=read_feather, + col_subset=None, + data_kwargs={}, + mask_kwargs={}, + ) + pd.testing.assert_frame_equal(data, example_data) + pd.testing.assert_frame_equal(mask, example_mask) + pd.testing.assert_index_equal(info["columns"], example_info["columns"]) + pd.testing.assert_series_equal(info["dtypes"], example_info["dtypes"]) + + +def test_read_data_without_mask_csv(csv_files, example_data, example_info): + data_file, _, _ = csv_files + data, mask, info = _read_data( + data_file=data_file, + mask_file=None, + reader=read_csv, + col_subset=None, + data_kwargs={}, + mask_kwargs={}, + ) + pd.testing.assert_frame_equal(data, example_data) + assert mask.empty + pd.testing.assert_index_equal(info["columns"], example_info["columns"]) + pd.testing.assert_series_equal(info["dtypes"], example_info["dtypes"]) + + +def test_read_data_csv(csv_files, example_data, example_mask): + data_file, mask_file, info_file = csv_files + + bundle = read_data( + data_file=data_file, + mask_file=mask_file, + info_file=info_file, + data_format="csv", + ) + + assert isinstance(bundle, DataBundle) + pd.testing.assert_frame_equal(bundle.data, example_data) + pd.testing.assert_frame_equal(bundle.mask, example_mask) + pd.testing.assert_index_equal(bundle.columns, example_data.columns) + pd.testing.assert_series_equal(bundle.dtypes, example_data.dtypes) + assert bundle.parse_dates is False + 
assert bundle.encoding is None + assert bundle.imodel is None + + +def test_read_data_parquet(parquet_files, example_data, example_mask): + data_file, mask_file = parquet_files + + bundle = read_data( + data_file=data_file, + mask_file=mask_file, + data_format="parquet", + ) + + assert isinstance(bundle, DataBundle) + pd.testing.assert_frame_equal(bundle.data, example_data) + pd.testing.assert_frame_equal(bundle.mask, example_mask) + pd.testing.assert_index_equal(bundle.columns, example_data.columns) + pd.testing.assert_series_equal(bundle.dtypes, example_data.dtypes) + assert bundle.parse_dates is False + assert bundle.encoding is None + assert bundle.imodel is None + + +def test_read_data_feather(feather_files, example_data, example_mask): + data_file, mask_file = feather_files + + bundle = read_data( + data_file=data_file, + mask_file=mask_file, + data_format="feather", + ) + + assert isinstance(bundle, DataBundle) + pd.testing.assert_frame_equal(bundle.data, example_data) + pd.testing.assert_frame_equal(bundle.mask, example_mask) + pd.testing.assert_index_equal(bundle.columns, example_data.columns) + pd.testing.assert_series_equal(bundle.dtypes, example_data.dtypes) + assert bundle.parse_dates is False + assert bundle.encoding is None + assert bundle.imodel is None + + +def test_read_data_invalid(): + with pytest.raises(ValueError): + read_data("data.invalid", data_format="invalid") diff --git a/tests/test_mdf_writer.py b/tests/test_mdf_writer.py index fc0a4ca1..1df1c3fa 100755 --- a/tests/test_mdf_writer.py +++ b/tests/test_mdf_writer.py @@ -12,19 +12,27 @@ ) -def test_write_data_basic(tmp_path): - data = pd.DataFrame( +@pytest.fixture +def example_data(): + return pd.DataFrame( { "A": [1, 2, 3], "B": ["1", "2", "3"], } ) - mask = pd.DataFrame( + + +@pytest.fixture +def example_mask(): + return pd.DataFrame( { "A": [True, True, False], "B": [False, True, True], } ) + + +def test_write_data_csv(tmp_path, example_data, example_mask): info = { "dtypes": {"A": 
"int", "B": "str"}, "parse_dates": [], @@ -32,8 +40,8 @@ def test_write_data_basic(tmp_path): } write_data( - data, - mask=mask, + example_data, + mask=example_mask, out_dir=tmp_path, prefix="test_write", suffix="basic", @@ -54,25 +62,13 @@ def test_write_data_basic(tmp_path): assert info_res == info data_res = pd.read_csv(data_file, dtype=info["dtypes"]) - assert_frame_equal(data, data_res) + assert_frame_equal(example_data, data_res) mask_res = pd.read_csv(mask_file, dtype="bool") - assert_frame_equal(mask, mask_res) + assert_frame_equal(example_mask, mask_res) -def test_write_data_col_subset(tmp_path): - data = pd.DataFrame( - { - "A": [1, 2, 3], - "B": ["1", "2", "3"], - } - ) - mask = pd.DataFrame( - { - "A": [True, True, False], - "B": [False, True, True], - } - ) +def test_write_data_col_subset(tmp_path, example_data, example_mask): info = { "dtypes": {"A": "int"}, "parse_dates": [], @@ -81,8 +77,8 @@ def test_write_data_col_subset(tmp_path): subset = "A" write_data( - data, - mask=mask, + example_data, + mask=example_mask, out_dir=tmp_path, prefix="test_write", suffix="subset", @@ -104,7 +100,86 @@ def test_write_data_col_subset(tmp_path): assert info_res == info data_res = pd.read_csv(data_file, dtype=info["dtypes"]) - assert_frame_equal(data[[subset]], data_res) + assert_frame_equal(example_data[[subset]], data_res) mask_res = pd.read_csv(mask_file, dtype="bool") - assert_frame_equal(mask[[subset]], mask_res) + assert_frame_equal(example_mask[[subset]], mask_res) + + +def test_write_data_parquet(tmp_path, example_data, example_mask): + info = { + "dtypes": {"A": "int", "B": "str"}, + "parse_dates": [], + "encoding": "utf-8", + } + + write_data( + example_data, + mask=example_mask, + out_dir=tmp_path, + prefix="test_write", + suffix="basic", + data_format="parquet", + **info, + ) + + data_file = tmp_path / "test_write-data-basic.parquet" + mask_file = tmp_path / "test_write-mask-basic.parquet" + info_file = tmp_path / "test_write-info-basic.json" + + 
assert data_file.is_file() + assert mask_file.is_file() + assert info_file.is_file() + + with open(info_file) as read_file: + info_res = json.load(read_file) + + assert info_res == info + + data_res = pd.read_parquet(data_file) + assert_frame_equal(example_data, data_res) + + mask_res = pd.read_parquet(mask_file) + assert_frame_equal(example_mask, mask_res) + + +def test_write_data_feather(tmp_path, example_data, example_mask): + info = { + "dtypes": {"A": "int", "B": "str"}, + "parse_dates": [], + "encoding": "utf-8", + } + + write_data( + example_data, + mask=example_mask, + out_dir=tmp_path, + prefix="test_write", + suffix="basic", + data_format="feather", + **info, + ) + + data_file = tmp_path / "test_write-data-basic.feather" + mask_file = tmp_path / "test_write-mask-basic.feather" + info_file = tmp_path / "test_write-info-basic.json" + + assert data_file.is_file() + assert mask_file.is_file() + assert info_file.is_file() + + with open(info_file) as read_file: + info_res = json.load(read_file) + + assert info_res == info + + data_res = pd.read_feather(data_file) + assert_frame_equal(example_data, data_res) + + mask_res = pd.read_feather(mask_file) + assert_frame_equal(example_mask, mask_res) + + +def test_write_data_invalid(example_data): + with pytest.raises(ValueError): + write_data(example_data, data_format="invalid") diff --git a/tests/test_reader_convert_and_decode.py b/tests/test_reader_convert_and_decode.py index 56c3c170..c2d02ac9 100755 --- a/tests/test_reader_convert_and_decode.py +++ b/tests/test_reader_convert_and_decode.py @@ -1,5 +1,7 @@ from __future__ import annotations +from typing import get_args + import pandas as pd import pytest @@ -15,6 +17,9 @@ from cdm_reader_mapper.mdf_reader import properties +numeric_types = get_args(properties.NumericTypes) + + @pytest.fixture def sample_series(): return pd.Series(["A", "Z", "10", "1Z"]) @@ -89,7 +94,7 @@ def test_base36_preserves_boolean(): def test_converter_numeric(numeric_series): - conv = 
Converters(dtype=next(iter(properties.numeric_types))) + conv = Converters(dtype=next(iter(numeric_types))) func = conv.converter() result = func(numeric_series) @@ -111,7 +116,7 @@ def test_numeric_with_scale_offset(): def test_preprocessing_function_pppp(): - conv = Converters(dtype=next(iter(properties.numeric_types))) + conv = Converters(dtype=next(iter(numeric_types))) series = pd.Series(["0123"], name="PPPP") result = conv.object_to_numeric(series) diff --git a/tests/test_reader_utilities.py b/tests/test_reader_utilities.py index 2ae892be..6685d145 100755 --- a/tests/test_reader_utilities.py +++ b/tests/test_reader_utilities.py @@ -178,9 +178,8 @@ def test_read_csv_file_exists(tmp_csv_file): def test_read_csv_file_missing(tmp_path): missing_file = tmp_path / "missing.csv" - df, info = read_csv(missing_file) - assert df.empty - assert info == {} + with pytest.raises(FileNotFoundError): + read_csv(missing_file) def test_read_csv_with_col_subset(tmp_csv_file): diff --git a/tests/test_writers.py b/tests/test_writers.py index c880f145..8abddca4 100755 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -1,5 +1,7 @@ from __future__ import annotations +import os + import pandas as pd import pytest # noqa @@ -8,101 +10,179 @@ @pytest.fixture(scope="session") -def db_exp(): +def db_tables(): imodel = "icoads_r300_d714" pattern = f"test_{imodel}" for table in cdm_tables: cdm_path = test_data[pattern][f"cdm_{table}"].parent - db = read(cdm_path, suffix=f"{imodel}*", mode="tables") + db = read(cdm_path, suffix=f"{imodel}*", extension="psv", mode="tables") + db.imodel = imodel + return db + + +@pytest.fixture(scope="session") +def db_data(): + imodel = "icoads_r300_d714" + pattern = f"test_{imodel}" + + data_file = test_data[pattern]["mdf_data"] + info_file = test_data[pattern]["mdf_info"] + + db = read(data_file, info_file=info_file, mode="data") db.imodel = imodel return db -def test_write_data(tmp_path, db_exp): - db_exp.write(out_dir=tmp_path, 
suffix=f"{db_exp.imodel}_all") - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_all", mode="tables") - pd.testing.assert_frame_equal(db_exp.data, db_res.data) +def test_write_data_csv(tmp_path, db_data): + db_data.write(out_dir=tmp_path, data_format="csv") + db_res = read( + os.path.join(tmp_path, "data.csv"), + info_file=os.path.join(tmp_path, "info.json"), + data_format="csv", + mode="data", + ) + pd.testing.assert_frame_equal(db_data.data, db_res.data) + + +def test_write_tables_csv(tmp_path, db_tables): + db_tables.write(out_dir=tmp_path, suffix=f"{db_tables.imodel}_all") + db_res = read(tmp_path, suffix=f"{db_tables.imodel}_all", mode="tables") + pd.testing.assert_frame_equal(db_tables.data, db_res.data) -def test_write_header(tmp_path, db_exp): +def test_write_header(tmp_path, db_tables): table = "header" - db_exp.write( - out_dir=tmp_path, suffix=f"{db_exp.imodel}_{table}_all", cdm_subset=table + db_tables.write( + out_dir=tmp_path, + suffix=f"{db_tables.imodel}_{table}_all", + extension="psv", + cdm_subset=table, ) db_res = read( - tmp_path, suffix=f"{db_exp.imodel}_{table}_all", cdm_subset=table, mode="tables" + tmp_path, + suffix=f"{db_tables.imodel}_{table}_all", + cdm_subset=table, + extension="psv", + mode="tables", ) - table_exp = db_exp[table].dropna(how="all").reset_index(drop=True) + + table_exp = db_tables[table].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res[table]) -def test_write_observations(tmp_path, db_exp): +def test_write_observations(tmp_path, db_tables): table = "observations-sst" - db_exp.write( - out_dir=tmp_path, suffix=f"{db_exp.imodel}_{table}_all", cdm_subset=table + db_tables.write( + out_dir=tmp_path, suffix=f"{db_tables.imodel}_{table}_all", cdm_subset=table ) db_res = read( - tmp_path, suffix=f"{db_exp.imodel}_{table}_all", cdm_subset=table, mode="tables" + tmp_path, + suffix=f"{db_tables.imodel}_{table}_all", + cdm_subset=table, + mode="tables", ) - table_exp = 
db_exp[table].dropna(how="all").reset_index(drop=True) + table_exp = db_tables[table].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res[table]) -def test_write_fns(tmp_path, db_exp): - db_exp.write( +def test_write_fns(tmp_path, db_tables): + db_tables.write( out_dir=tmp_path, prefix="prefix", - suffix=f"{db_exp.imodel}_all", + suffix=f"{db_tables.imodel}_all", extension="csv", delimiter=",", ) db_res = read( tmp_path, prefix="prefix", - suffix=f"{db_exp.imodel}_all", + suffix=f"{db_tables.imodel}_all", extension="csv", delimiter=",", mode="tables", ) - pd.testing.assert_frame_equal(db_exp.data, db_res.data) + pd.testing.assert_frame_equal(db_tables.data, db_res.data) -def test_write_filename(tmp_path, db_exp): - db_exp.write(out_dir=tmp_path, filename=f"{db_exp.imodel}_filename_all") - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_filename_all", mode="tables") - pd.testing.assert_frame_equal(db_exp.data, db_res.data) +def test_write_filename(tmp_path, db_tables): + db_tables.write(out_dir=tmp_path, filename=f"{db_tables.imodel}_filename_all") + db_res = read(tmp_path, suffix=f"{db_tables.imodel}_filename_all", mode="tables") + pd.testing.assert_frame_equal(db_tables.data, db_res.data) -def test_write_filename_dict_header(tmp_path, db_exp): +def test_write_filename_dict_header(tmp_path, db_tables): filename_dict = { - "header": f"{db_exp.imodel}_filename_dict_all", + "header": f"{db_tables.imodel}_filename_dict_all", } - db_exp.write(out_dir=tmp_path, filename=filename_dict) - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_filename_dict_all", mode="tables") - table_exp = db_exp["header"].dropna(how="all").reset_index(drop=True) + db_tables.write(out_dir=tmp_path, filename=filename_dict) + db_res = read( + tmp_path, suffix=f"{db_tables.imodel}_filename_dict_all", mode="tables" + ) + table_exp = db_tables["header"].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res.data["header"]) -def 
test_write_filename_dict_observations(tmp_path, db_exp): +def test_write_filename_dict_observations(tmp_path, db_tables): filename_dict = { - "observations-sst": f"observations-sst-{db_exp.imodel}_filename_dict_all.psv", + "observations-sst": f"observations-sst-{db_tables.imodel}_filename_dict_all.psv", } - db_exp.write(out_dir=tmp_path, filename=filename_dict) - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_filename_dict_all", mode="tables") - table_exp = db_exp["observations-sst"].dropna(how="all").reset_index(drop=True) + db_tables.write(out_dir=tmp_path, filename=filename_dict, extension="psv") + db_res = read( + tmp_path, + suffix=f"{db_tables.imodel}_filename_dict_all", + mode="tables", + extension="psv", + ) + table_exp = db_tables["observations-sst"].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res.data["observations-sst"]) -def test_write_col_subset(tmp_path, db_exp): +def test_write_col_subset(tmp_path, db_tables): table = "header" columns = ["report_id", "latitude", "longitude"] - db_exp.write( + db_tables.write( out_dir=tmp_path, - suffix=f"{db_exp.imodel}_{table}_all", + suffix=f"{db_tables.imodel}_{table}_all", cdm_subset=table, col_subset={table: columns}, ) - db_res = read(tmp_path, suffix=f"{db_exp.imodel}_{table}_all", mode="tables") - table_exp = db_exp[table][columns].dropna(how="all").reset_index(drop=True) + db_res = read(tmp_path, suffix=f"{db_tables.imodel}_{table}_all", mode="tables") + table_exp = db_tables[table][columns].dropna(how="all").reset_index(drop=True) pd.testing.assert_frame_equal(table_exp, db_res[table]) + + +def test_write_data_parquet(tmp_path, db_data): + db_data.write(out_dir=tmp_path, data_format="parquet") + db_res = read( + os.path.join(tmp_path, "data.parquet"), data_format="parquet", mode="data" + ) + pd.testing.assert_frame_equal(db_data.data, db_res.data) + + +def test_write_data_feather(tmp_path, db_data): + db_data.write(out_dir=tmp_path, data_format="feather") + db_res 
= read( + os.path.join(tmp_path, "data.feather"), data_format="feather", mode="data" + ) + pd.testing.assert_frame_equal(db_data.data, db_res.data) + + +def test_write_tables_parquet(tmp_path, db_tables): + db_tables.write( + out_dir=tmp_path, suffix=f"{db_tables.imodel}_all", data_format="parquet" + ) + db_res = read( + tmp_path, suffix=f"{db_tables.imodel}_all", mode="tables", data_format="parquet" + ) + pd.testing.assert_frame_equal(db_tables.data, db_res.data) + + +def test_write_tables_feather(tmp_path, db_tables): + db_tables.write( + out_dir=tmp_path, suffix=f"{db_tables.imodel}_all", data_format="feather" + ) + db_res = read( + tmp_path, suffix=f"{db_tables.imodel}_all", mode="tables", data_format="feather" + ) + pd.testing.assert_frame_equal(db_tables.data, db_res.data)