Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,29 @@ Contributors to this version: Ludwig Lierhammer (:user:`ludwiglierhammer`)
New features and enhancements
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* ``mdf_reader.read_data`` now supports chunking (:pull:`360`)
* read and write both `parquet` and `feather` files including new parameter `data_format` (:issue:`353`, :pull:`363`):

* `mdf_reader.read_data`,
* `mdf_reader.write_data`
* `cdm_mapper.read_tables`
* `cdm_mapper.write_tables`

Breaking changes
^^^^^^^^^^^^^^^^
* ``DataBundle.stack_v`` and ``DataBundle.stack_h`` only support `pd.DataFrames` as input, otherwise they raise a `ValueError` (:pull:`360`)
* set default for `extension` from `psv` to specified `data_format` (:pull:`363`):

* `cdm_mapper.read_tables`
* `cdm_mapper.write_tables`

* set default for `extension` from `csv` to specified `data_format` in `mdf_reader.write_data` (:pull:`363`)
* `mdf_reader.read_data`: save `dtypes` in return DataBundle as `pd.Series` not `dict` (:pull:`363`)

Internal changes
^^^^^^^^^^^^^^^^
* re-work internal structure for more readability and better performance (:pull:`360`)
* use pre-defined `Literal` constants in `cdm_reader_mapper.properties` (:pull:`363`)
* `mdf_reader.utils.utilities.read_csv`: parameter `columns` to `column_names` (:pull:`363`)

2.2.1 (2026-01-23)
------------------
Expand Down
4 changes: 2 additions & 2 deletions cdm_reader_mapper/cdm_mapper/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from copy import deepcopy
from io import StringIO
from typing import Any
from typing import Any, get_args

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -533,7 +533,7 @@ def map_model(
"""
logger = logging_hdlr.init_logger(__name__, level=log_level)
imodel = imodel.split("_")
if imodel[0] not in properties.supported_data_models:
if imodel[0] not in get_args(properties.SupportedDataModels):
logger.error("Input data model " f"{imodel[0]}" " not supported")
return

Expand Down
2 changes: 1 addition & 1 deletion cdm_reader_mapper/cdm_mapper/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from ..properties import numeric_types, object_types, supported_data_models # noqa
from ..properties import NumericTypes, ObjectTypes, SupportedDataModels # noqa

_base = "cdm_reader_mapper.cdm_mapper"

Expand Down
137 changes: 94 additions & 43 deletions cdm_reader_mapper/cdm_mapper/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,46 +47,79 @@
import glob
import os

from typing import get_args

import pandas as pd

from cdm_reader_mapper.common import get_filename, logging_hdlr
from cdm_reader_mapper.core.databundle import DataBundle

from . import properties
from ..properties import SupportedFileTypes
from .properties import cdm_tables
from .utils.utilities import get_cdm_subset, get_usecols


def _read_file(ifile, table, col_subset, **kwargs):
READERS = {
"csv": pd.read_csv,
"parquet": pd.read_parquet,
"feather": pd.read_feather,
}

READER_KWARGS = {
"csv": "usecols",
"parquet": "columns",
"feather": "columns",
}


def _read_file(
ifile: str,
table: str,
col_subset: str | list | None,
data_format: SupportedFileTypes,
**kwargs,
) -> pd.DataFrame:
usecols = get_usecols(table, col_subset)
return pd.read_csv(ifile, usecols=usecols, **kwargs)
reader = READERS[data_format]
reader_kwargs = {READER_KWARGS[data_format]: usecols, **kwargs}
return reader(ifile, **reader_kwargs)


def _read_single_file(
ifile,
cdm_subset=None,
col_subset=None,
null_label="null",
ifile: str,
data_format: SupportedFileTypes,
cdm_subset: str | list | None = None,
col_subset: str | list | None = None,
null_label: str = "null",
**kwargs,
) -> pd.DataFrame:
if not isinstance(cdm_subset, list):
cdm_subset = [cdm_subset]
dfi_ = _read_file(ifile, table=cdm_subset[0], col_subset=col_subset, **kwargs)
dfi_ = _read_file(
ifile,
table=cdm_subset[0],
data_format=data_format,
col_subset=col_subset,
**kwargs,
)
if dfi_.empty:
return pd.DataFrame()

dfi_ = dfi_.set_index("report_id", drop=False)
if null_label in dfi_.index:
return dfi_.drop(index=null_label)
return dfi_


def _read_multiple_files(
inp_dir,
prefix=None,
suffix=None,
extension="psv",
cdm_subset=None,
col_subset=None,
null_label="null",
inp_dir: str,
data_format: SupportedFileTypes,
prefix: str | None = None,
suffix: str | None = None,
extension: str | None = None,
cdm_subset: str | list | None = None,
col_subset: str | list | None = None,
null_label: str = "null",
logger=None,
**kwargs,
) -> list[pd.DataFrame]:
Expand All @@ -98,20 +131,24 @@ def _read_multiple_files(
files = glob.glob(pattern)

if len(files) == 0:
logger.error(f"No files found matching pattern {pattern}")
return [pd.DataFrame()]
raise FileNotFoundError(f"No files found matching pattern {pattern}")

df_list = []
if not isinstance(cdm_subset, list):
cdm_subset = [cdm_subset]

for table in cdm_subset:
if table not in properties.cdm_tables:
if table not in cdm_tables:
logger.warning(f"Requested table {table} not defined in CDM")
continue

logger.info(f"Getting file path for pattern {table}")
pattern_ = get_filename(
[prefix, table, f"*{suffix}"], path=inp_dir, extension=extension
)
_pattern = [table]
if prefix:
_pattern = [prefix] + _pattern
if suffix:
_pattern = _pattern + [f"*{suffix}"]
pattern_ = get_filename(_pattern, path=inp_dir, extension=extension)
paths_ = glob.glob(pattern_)
if len(paths_) != 1:
logger.warning(
Expand All @@ -122,6 +159,7 @@ def _read_multiple_files(

dfi = _read_single_file(
paths_[0],
data_format=data_format,
cdm_subset=[table],
col_subset=col_subset,
null_label=null_label,
Expand All @@ -141,31 +179,34 @@ def _read_multiple_files(


def read_tables(
source,
prefix=None,
suffix=None,
extension="psv",
cdm_subset=None,
col_subset=None,
delimiter="|",
na_values=None,
null_label="null",
source: str,
data_format: SupportedFileTypes = "csv",
prefix: str | None = None,
suffix: str | None = None,
extension: str | None = None,
cdm_subset: str | list | None = None,
col_subset: str | list | dict | None = None,
delimiter: str = "|",
na_values: str | None = None,
null_label: str = "null",
**kwargs,
) -> DataBundle:
"""
Read CDM-table-like files from file system to a pandas.DataFrame.

Parameters
----------
source: str, optional
source: str
The file (including path) or the path to the file(s) to be read.
data_format: {"csv", "parquet", "feather"}, default: "csv"
Format of input data file(s).
prefix: str, optional
Prefix of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
Could be used if `source` is a valid directory path.
suffix: str, optional
Suffix of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
Could be used if `source` is a valid directory path.
extension: str
extension: str, optional
Extension of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
Could be used if `source` is a valid directory path.
If not specified, defaults to `data_format`.
Expand Down Expand Up @@ -213,21 +254,32 @@ def read_tables(
write_data : Write MDF data and validation mask to disk.
"""
logger = logging_hdlr.init_logger(__name__, level="INFO")
supported_file_types = get_args(SupportedFileTypes)
if data_format not in supported_file_types:
raise ValueError(
f"data_format must be one of {supported_file_types}, not {data_format}."
)

# Because how the printers are written, they modify the original data frame!,
# also removing rows with empty observation_value in observation_tables
kwargs = {
"delimiter": delimiter,
"dtype": "object",
"na_values": na_values,
"keep_default_na": False,
}
if data_format == "csv":
kwargs = {
"delimiter": delimiter,
"dtype": "object",
"na_values": na_values,
"keep_default_na": False,
**kwargs,
}
# See if subset, if any of the tables is not as specs
cdm_subset = get_cdm_subset(cdm_subset)

extension = extension or data_format

if os.path.isfile(source):
df_list = [
_read_single_file(
source,
data_format=data_format,
cdm_subset=cdm_subset,
col_subset=col_subset,
null_label=null_label,
Expand All @@ -237,6 +289,7 @@ def read_tables(
elif os.path.isdir(source):
df_list = _read_multiple_files(
source,
data_format=data_format,
prefix=prefix,
suffix=suffix,
extension=extension,
Expand All @@ -247,14 +300,12 @@ def read_tables(
**kwargs,
)
else:
logger.error(
f"Source is neither a valid file name nor a valid directory path: {source}"
raise FileNotFoundError(
f"Source is neither a valid file name nor a valid directory path: {source}."
)
return DataBundle(data=pd.DataFrame())

if len(df_list) == 0:
logger.error("All tables empty in file system")
return DataBundle(data=pd.DataFrame(), mode="tables")
raise ValueError("All tables empty in file system.")

merged = pd.concat(df_list, axis=1, join="outer")
merged = merged.reset_index(drop=True)
Expand Down
Loading
Loading