From 1c0a2e872ebe7aa09fe620c4c5757c7754e3c05d Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Mon, 27 Jan 2025 15:26:36 +0000 Subject: [PATCH 01/46] deps: add polars --- pyproject.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 28b90461..8372af3a 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dynamic = ["version"] dependencies = [ "matplotlib", "pandas>=2.1.0", + "polars", "dask", "distributed", "datashader", @@ -216,6 +217,7 @@ check-typed-exception = true "matplotlib.pyplot" = "plt" numpy = "np" pandas = "pd" +polars = "pl" scipy = "sp" xarray = "xr" From 90aedfb2b349f6406bd4552c5f499b8a45a382b0 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Mon, 27 Jan 2025 15:26:46 +0000 Subject: [PATCH 02/46] refactor!: add functions for reading sections using polars --- cdm_reader_mapper/mdf_reader/read.py | 3 +- .../mdf_reader/utils/configurator.py | 138 ++++++++++++++++++ .../mdf_reader/utils/filereader.py | 53 ++++++- 3 files changed, 187 insertions(+), 7 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/read.py b/cdm_reader_mapper/mdf_reader/read.py index fb654b5b..531b8522 100755 --- a/cdm_reader_mapper/mdf_reader/read.py +++ b/cdm_reader_mapper/mdf_reader/read.py @@ -227,7 +227,8 @@ def read( read_sections_list, sections, # INFO: Set default as "pandas" to account for custom schema - open_with=properties.open_file.get(self.imodel, "pandas"), + # open_with=properties.open_file.get(self.imodel, "pandas"), + open_with=properties.open_file.get(self.imodel, "polars"), chunksize=chunksize, ) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index b2f89683..836cf379 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -8,10 +8,12 @@ import numpy as np import pandas as pd +import polars as pl from .. import properties from . import converters, decoders from .utilities import convert_dtypes +from ..properties import internal_delimiter class Configurator: @@ -50,6 +52,12 @@ def _get_index(self, section, order): else: return (order, section) + def _get_polars_index(self, section, order): + if len(self.orders) == 1: + return section + else: + return internal_delimiter.join([order, section]) + def _get_ignore(self, section_dict): ignore = section_dict.get("ignore") if isinstance(ignore, str): @@ -147,6 +155,136 @@ def get_configuration(self): }, } + def open_polars(self): + """Open TextParser to a pl.DataFrame""" + self.df = self.df.with_columns(pl.lit([]).alias("missing_values")) + for order in self.orders: + header = self.schema["sections"][order]["header"] + + disable_read = header.get("disable_read", False) + if disable_read is True: + break + + sentinal = header.get("sentinal") + + section_length = header.get("length", properties.MAX_FULL_REPORT_WIDTH) + sections = self.schema["sections"][order]["elements"] + + # OPTIM: Is it more efficient to create the two columns or use the + # regex matching? 
+ if sentinal is not None: + self.df = self.df.with_columns( + [ + ( + pl.when(pl.col("full_str").str.starts_with(sentinal)) + .then(pl.col("full_str").str.slice(0, section_length)) + .otherwise(pl.lit(None)) + .alias(order) + ), + ( + pl.when(pl.col("full_str").str.starts_with(sentinal)) + .then(pl.col("full_str").str.slice(section_length)) + .otherwise(pl.col("full_str")) + .alias("full_str") + ), + ] + ) + else: + self.df = self.df.with_columns( + [ + pl.col("full_str").str.slice(0, section_length).alias(order), + pl.col("full_str").str.slice(section_length).alias("full_str"), + ] + ) + + field_layout = header.get("field_layout") + delimiter = header.get("delimiter") + + # Handle delimited section + if delimiter is not None: + delimiter_format = header.get("format") + if delimiter_format == "delimited": + # Read as CSV + field_names = sections.keys() + field_names = [ + self._get_polars_index(order, x) for x in field_names + ] + n_fields = len(field_names) + self.df = self.df.with_columns( + pl.col(order) + .str.splitn(delimiter, n_fields) + .struct.rename_fields(field_names) + .struct.unnest() + ) + for field in field_names: + self.df = self.df.with_columns( + pl.col(field).str.strip_chars(" ").name.keep() + ).with_columns( + pl.when(pl.col(field).eq("") | pl.col(field).is_null()) + .then(pl.col("missing_values").list.concat(pl.lit([field]))) + .otherwise(pl.col("missing_values")) + .alias("missing_values") + ) + + continue + elif field_layout != "fixed_width": + logging.error( + f"Delimiter for {order} is set to {delimiter}. " + + f"Please specify either format or field_layout in your header schema {header}." + ) + raise ValueError( + f"Delimiter for {order} is set to {delimiter}. " + + f"Please specify either format or field_layout in your header schema {header}." + ) + + for section, section_dict in sections.items(): + index = self._get_polars_index(section, order) + ignore = (order not in self.valid) or self._get_ignore(section_dict) + field_length = section_dict.get( + "field_length", properties.MAX_FULL_REPORT_WIDTH + ) + na_value = section_dict.get("missing_value") + + if ignore: + self.df = self.df.with_columns( + [ + pl.col(order).str.slice(field_length).name.keep(), + pl.col("missing_values") + .list.concat(pl.lit([index])) + .name.keep(), + ] + ) + continue + + missing_expr = pl.col(index).eq("") | pl.col(index).is_null() + if na_value is not None: + missing_expr = missing_expr | pl.col("index").eq(na_value) + + self.df = self.df.with_columns( + [ + pl.col(order) + .str.slice(0, field_length) + .str.strip_chars(" ") + .alias(index), + pl.col(order).str.slice(field_length).name.keep(), + ] + ).with_columns( + pl.when(missing_expr) + .then(pl.col("missing_values").list.concat(pl.lit([index]))) + .otherwise(pl.col("missing_values")) + .alias("missing_values") + ) + + if delimiter is not None: + self.df = self.df.with_columns( + pl.col(order).str.strip_prefix(delimiter).name.keep() + ) + + self.df = self.df.drop([order]) + + continue + return self.df.drop("full_str") + def open_pandas(self): """Open TextParser to pd.DataSeries.""" return self.df.apply(lambda x: self._read_line(x[0]), axis=1) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 54c78882..2501e0a6 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -10,6 +10,7 @@ import numpy as np import pandas as pd +import polars as pl import xarray as xr from .. 
import properties @@ -125,6 +126,17 @@ def _read_pandas(self, **kwargs): **kwargs, ) + def _read_polars(self, **kwargs): + return pl.read_csv( + self.source, + has_header=False, + separator="\0", + new_columns=["full_str"], + quote_char="\0", + infer_schema=False, + **kwargs, + ) + def _read_netcdf(self, **kwargs): ds = xr.open_mfdataset(self.source, **kwargs) self._adjust_schema(ds, ds.dtypes) @@ -137,7 +149,15 @@ def _read_sections( valid, open_with, ): - if open_with == "pandas": + if open_with == "polars": + print(TextParser.head) + df = Configurator( + df=pl.from_pandas(TextParser).rename({"0": "full_str"}), + schema=self.schema, + order=order, + valid=valid, + ).open_polars() + elif open_with == "pandas": df = Configurator( df=TextParser, schema=self.schema, order=order, valid=valid ).open_pandas() @@ -146,10 +166,22 @@ def _read_sections( df=TextParser, schema=self.schema, order=order, valid=valid ).open_netcdf() else: - raise ValueError("open_with has to be one of ['pandas', 'netcdf']") + raise ValueError( + "open_with has to be one of ['pandas', 'polars', 'netcdf']" + ) - missing_values_ = df["missing_values"] - del df["missing_values"] + if isinstance(df, pl.DataFrame): + missing_values_ = df.get_column("missing_values").to_pandas() + df = df.drop("missing_values").to_pandas() + df.columns = pd.MultiIndex.from_tuples( + [ + tuple(x.split(properties.internal_delimiter, maxsplit=1)) + for x in df.columns + ] + ) + else: + missing_values_ = df["missing_values"] + del df["missing_values"] df = self._select_years(df) missing_values = set_missing_values(pd.DataFrame(missing_values_), df) self.columns = df.columns @@ -216,17 +248,26 @@ def open_data( """DOCUMENTATION.""" if open_with == "netcdf": TextParser = self._read_netcdf() - elif open_with == "pandas": + elif open_with == "pandas" or open_with == "polars": TextParser = self._read_pandas( encoding=self.schema["header"].get("encoding"), widths=[properties.MAX_FULL_REPORT_WIDTH], skiprows=self.skiprows, chunksize=chunksize, ) + # elif open_with == "polars": + # TextParser = self._read_polars( + # # encoding=self.schema["header"].get("encoding"), + # # widths=[properties.MAX_FULL_REPORT_WIDTH], + # # skiprows=self.skiprows, + # # chunksize=chunksize, + # ) else: raise ValueError("open_with has to be one of ['pandas', 'netcdf']") - if isinstance(TextParser, pd.DataFrame) or isinstance(TextParser, xr.Dataset): + if isinstance(TextParser, (pd.DataFrame, pl.DataFrame)) or isinstance( + TextParser, xr.Dataset + ): df, self.missing_values = self._read_sections( TextParser, order, valid, open_with=open_with ) From 9cca76530516799a151057353654d0cf70f180ae Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Tue, 28 Jan 2025 07:11:44 +0000 Subject: [PATCH 03/46] chore: remove print --- cdm_reader_mapper/mdf_reader/utils/filereader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 2501e0a6..0128a36a 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -150,7 +150,6 @@ def _read_sections( open_with, ): if open_with == "polars": - print(TextParser.head) df = Configurator( df=pl.from_pandas(TextParser).rename({"0": "full_str"}), schema=self.schema, From 8bc58b1257c691d7bf5286795cbcb9cf5a497923 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 28 Jan 2025 07:35:05 +0000 Subject: [PATCH 04/46] [pre-commit.ci] auto 
fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 836cf379..21226554 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -11,9 +11,9 @@ import polars as pl from .. import properties +from ..properties import internal_delimiter from . import converters, decoders from .utilities import convert_dtypes -from ..properties import internal_delimiter class Configurator: From 6ee5757565cef9dee7e709e4fef7d54977bca890 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Tue, 28 Jan 2025 07:38:49 +0000 Subject: [PATCH 05/46] deps: add polars to ci requirements/environment files --- ci/requirements/environment.yml | 1 + ci/requirements/requirements.txt | 1 + ci/requirements/requirements_ci.txt | 1 + 3 files changed, 3 insertions(+) diff --git a/ci/requirements/environment.yml b/ci/requirements/environment.yml index 3886f07f..2b56a19b 100755 --- a/ci/requirements/environment.yml +++ b/ci/requirements/environment.yml @@ -6,6 +6,7 @@ channels: dependencies: - pandas + - polars - dask - matplotlib - xarray diff --git a/ci/requirements/requirements.txt b/ci/requirements/requirements.txt index f94e9659..bd924a6c 100755 --- a/ci/requirements/requirements.txt +++ b/ci/requirements/requirements.txt @@ -1,5 +1,6 @@ matplotlib pandas>=2.1.0 +polars dask distributed datashader diff --git a/ci/requirements/requirements_ci.txt b/ci/requirements/requirements_ci.txt index 600fae43..781b94bb 100755 --- a/ci/requirements/requirements_ci.txt +++ b/ci/requirements/requirements_ci.txt @@ -7,6 +7,7 @@ tox==4.23.0 tox-gh==1.4.4 matplotlib pandas>=2.1.0 +polars dask distributed datashader From 9c98891bfe1949a176db45e20a70f5f6a3f2a8de Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Tue, 28 Jan 2025 09:23:38 +0000 Subject: [PATCH 06/46] fix: typo in log message --- cdm_reader_mapper/mdf_reader/read.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/read.py b/cdm_reader_mapper/mdf_reader/read.py index 531b8522..b83dca35 100755 --- a/cdm_reader_mapper/mdf_reader/read.py +++ b/cdm_reader_mapper/mdf_reader/read.py @@ -243,7 +243,7 @@ def read( self.validate_entries(validate) # 3. 
Create output DataBundle object - logging.info("Creata output DataBundle object") + logging.info("Create output DataBundle object") return DataBundle( data=self.data, From f286232ede7d0c9b7f42e0be37759edcd8635782 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Tue, 28 Jan 2025 14:52:44 +0000 Subject: [PATCH 07/46] refactor: missing values are only assigned for missing sections --- .../mdf_reader/utils/configurator.py | 94 +++++++++---------- .../mdf_reader/utils/filereader.py | 40 ++++++-- 2 files changed, 76 insertions(+), 58 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 21226554..7994030e 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -155,23 +155,18 @@ def get_configuration(self): }, } - def open_polars(self): + def open_polars(self) -> pl.DataFrame: """Open TextParser to a pl.DataFrame""" self.df = self.df.with_columns(pl.lit([]).alias("missing_values")) - for order in self.orders: - header = self.schema["sections"][order]["header"] - - disable_read = header.get("disable_read", False) - if disable_read is True: - break + for section in self.orders: + header = self.schema["sections"][section]["header"] sentinal = header.get("sentinal") section_length = header.get("length", properties.MAX_FULL_REPORT_WIDTH) - sections = self.schema["sections"][order]["elements"] + fields = self.schema["sections"][section]["elements"] - # OPTIM: Is it more efficient to create the two columns or use the - # regex matching? + # Get data associated with current section if sentinal is not None: self.df = self.df.with_columns( [ @@ -179,7 +174,7 @@ def open_polars(self): pl.when(pl.col("full_str").str.starts_with(sentinal)) .then(pl.col("full_str").str.slice(0, section_length)) .otherwise(pl.lit(None)) - .alias(order) + .alias(section) ), ( pl.when(pl.col("full_str").str.starts_with(sentinal)) @@ -192,11 +187,16 @@ def open_polars(self): else: self.df = self.df.with_columns( [ - pl.col("full_str").str.slice(0, section_length).alias(order), + pl.col("full_str").str.slice(0, section_length).alias(section), pl.col("full_str").str.slice(section_length).alias("full_str"), ] ) + # Don't read fields + disable_read = header.get("disable_read", False) + if disable_read is True: + continue + field_layout = header.get("field_layout") delimiter = header.get("delimiter") @@ -205,13 +205,13 @@ def open_polars(self): delimiter_format = header.get("format") if delimiter_format == "delimited": # Read as CSV - field_names = sections.keys() + field_names = fields.keys() field_names = [ - self._get_polars_index(order, x) for x in field_names + self._get_polars_index(section, x) for x in field_names ] n_fields = len(field_names) self.df = self.df.with_columns( - pl.col(order) + pl.col(section) .str.splitn(delimiter, n_fields) .struct.rename_fields(field_names) .struct.unnest() @@ -219,70 +219,66 @@ def open_polars(self): for field in field_names: self.df = self.df.with_columns( pl.col(field).str.strip_chars(" ").name.keep() - ).with_columns( - pl.when(pl.col(field).eq("") | pl.col(field).is_null()) - .then(pl.col("missing_values").list.concat(pl.lit([field]))) - .otherwise(pl.col("missing_values")) - .alias("missing_values") ) continue elif field_layout != "fixed_width": - logging.error( - f"Delimiter for {order} is set to {delimiter}. " - + f"Please specify either format or field_layout in your header schema {header}." 
- ) + # logging.error( + # f"Delimiter for {order} is set to {delimiter}. " + # + f"Please specify either format or field_layout in your header schema {header}." + # ) raise ValueError( - f"Delimiter for {order} is set to {delimiter}. " + f"Delimiter for {section} is set to {delimiter}. " + f"Please specify either format or field_layout in your header schema {header}." ) - for section, section_dict in sections.items(): - index = self._get_polars_index(section, order) - ignore = (order not in self.valid) or self._get_ignore(section_dict) - field_length = section_dict.get( + # Loop through fixed-width fields + for field, field_dict in fields.items(): + index = self._get_polars_index(field, section) + ignore = (section not in self.valid) or self._get_ignore(field_dict) + field_length = field_dict.get( "field_length", properties.MAX_FULL_REPORT_WIDTH ) - na_value = section_dict.get("missing_value") + na_value = field_dict.get("missing_value") if ignore: self.df = self.df.with_columns( - [ - pl.col(order).str.slice(field_length).name.keep(), - pl.col("missing_values") - .list.concat(pl.lit([index])) - .name.keep(), - ] + pl.col(section) + .str.slice(field_length) + .str.strip_prefix(delimiter) + .name.keep(), ) continue - missing_expr = pl.col(index).eq("") | pl.col(index).is_null() + missing_map = {"": None} if na_value is not None: - missing_expr = missing_expr | pl.col("index").eq(na_value) + missing_map[na_value] = None self.df = self.df.with_columns( [ - pl.col(order) + # If section not present in a row, then both these are null + pl.col(section) .str.slice(0, field_length) .str.strip_chars(" ") + .replace(missing_map) .alias(index), - pl.col(order).str.slice(field_length).name.keep(), + pl.col(section).str.slice(field_length).name.keep(), + ( + # Handle missing sections + pl.when(pl.col(section).is_null()) + .then(pl.col("missing_values").list.concat(pl.lit([index]))) + .otherwise(pl.col("missing_values")) + .alias("missing_values") + ), ] - ).with_columns( - pl.when(missing_expr) - .then(pl.col("missing_values").list.concat(pl.lit([index]))) - .otherwise(pl.col("missing_values")) - .alias("missing_values") ) - if delimiter is not None: self.df = self.df.with_columns( - pl.col(order).str.strip_prefix(delimiter).name.keep() + pl.col(section).str.strip_prefix(delimiter).name.keep() ) - self.df = self.df.drop([order]) + self.df = self.df.drop([section]) - continue return self.df.drop("full_str") def open_pandas(self): diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 0128a36a..15ff55f1 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -115,6 +115,27 @@ def get_years_from_datetime(date): index = mask[mask].index return df.iloc[index].reset_index(drop=True) + # def _select_years(self, df: pl.DataFrame): + # if self.year_init is None and self.year_end is None: + # return df + # + # data_model = self.imodel.split("_")[0] + # dates = df.get_column(properties.year_column[data_model]) + # if dates.dtype == pl.Datetime: + # years = dates.dt.year() + # else: + # years = dates.cast(pl.Int64, strict=False) + # + # mask = pl.repeat(True, df.height, eager=True) + # if self.year_init and self.year_end: + # mask = years.is_between(self.year_init, self.year_end, closed="both") + # elif self.year_init: + # mask = years.ge(self.year_init) + # elif self.year_end: + # mask = years.le(self.year_end) + # + # return df.filter(mask) + def _read_pandas(self, **kwargs): return 
pd.read_fwf( self.source, @@ -151,7 +172,7 @@ def _read_sections( ): if open_with == "polars": df = Configurator( - df=pl.from_pandas(TextParser).rename({"0": "full_str"}), + df=TextParser, schema=self.schema, order=order, valid=valid, @@ -181,6 +202,7 @@ def _read_sections( else: missing_values_ = df["missing_values"] del df["missing_values"] + df = self._select_years(df) missing_values = set_missing_values(pd.DataFrame(missing_values_), df) self.columns = df.columns @@ -247,20 +269,20 @@ def open_data( """DOCUMENTATION.""" if open_with == "netcdf": TextParser = self._read_netcdf() - elif open_with == "pandas" or open_with == "polars": + elif open_with == "pandas": TextParser = self._read_pandas( encoding=self.schema["header"].get("encoding"), widths=[properties.MAX_FULL_REPORT_WIDTH], skiprows=self.skiprows, chunksize=chunksize, ) - # elif open_with == "polars": - # TextParser = self._read_polars( - # # encoding=self.schema["header"].get("encoding"), - # # widths=[properties.MAX_FULL_REPORT_WIDTH], - # # skiprows=self.skiprows, - # # chunksize=chunksize, - # ) + elif open_with == "polars": + TextParser = self._read_polars( + # encoding=self.schema["header"].get("encoding"), + # widths=[properties.MAX_FULL_REPORT_WIDTH], + # skiprows=self.skiprows, + # chunksize=chunksize, + ) else: raise ValueError("open_with has to be one of ['pandas', 'netcdf']") From c129c9b003b6cf93d9849167a7901f71478bdf4d Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Tue, 28 Jan 2025 14:53:44 +0000 Subject: [PATCH 08/46] ignore: add uv.lock --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 5471529b..f49e3a69 100755 --- a/.gitignore +++ b/.gitignore @@ -105,3 +105,6 @@ ENV/ # IDE settings .vscode/ .idea/ + +# UV +uv.lock From 58bb58da3fc08793faf2bc5c5bf05e008e55fdad Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Wed, 29 Jan 2025 12:09:20 +0000 Subject: [PATCH 09/46] refactor: return polars Frame after reading netCDF --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 7994030e..aaafa187 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -405,4 +405,4 @@ def replace_empty_strings(series): df[column] = np.nan df = df.apply(lambda x: replace_empty_strings(x)) df["missing_values"] = [missing_values] * len(df) - return df + return pl.from_pandas(df) From 3f7f5a5065486e5def297df0bd24c3f915829daa Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Wed, 29 Jan 2025 12:09:54 +0000 Subject: [PATCH 10/46] fix: use ":" as delimiter in column names --- .../mdf_reader/utils/configurator.py | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index aaafa187..6adadc3f 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -11,7 +11,6 @@ import polars as pl from .. import properties -from ..properties import internal_delimiter from . 
import converters, decoders from .utilities import convert_dtypes @@ -50,13 +49,7 @@ def _get_index(self, section, order): if len(self.orders) == 1: return section else: - return (order, section) - - def _get_polars_index(self, section, order): - if len(self.orders) == 1: - return section - else: - return internal_delimiter.join([order, section]) + return ":".join([order, section]) def _get_ignore(self, section_dict): ignore = section_dict.get("ignore") @@ -157,7 +150,9 @@ def get_configuration(self): def open_polars(self) -> pl.DataFrame: """Open TextParser to a pl.DataFrame""" - self.df = self.df.with_columns(pl.lit([]).alias("missing_values")) + self.df = self.df.with_columns( + pl.lit([]).alias("missing_values") + ).with_row_index("index") for section in self.orders: header = self.schema["sections"][section]["header"] @@ -206,9 +201,7 @@ def open_polars(self) -> pl.DataFrame: if delimiter_format == "delimited": # Read as CSV field_names = fields.keys() - field_names = [ - self._get_polars_index(section, x) for x in field_names - ] + field_names = [self._get_index(section, x) for x in field_names] n_fields = len(field_names) self.df = self.df.with_columns( pl.col(section) @@ -223,10 +216,6 @@ def open_polars(self) -> pl.DataFrame: continue elif field_layout != "fixed_width": - # logging.error( - # f"Delimiter for {order} is set to {delimiter}. " - # + f"Please specify either format or field_layout in your header schema {header}." - # ) raise ValueError( f"Delimiter for {section} is set to {delimiter}. " + f"Please specify either format or field_layout in your header schema {header}." @@ -234,7 +223,7 @@ def open_polars(self) -> pl.DataFrame: # Loop through fixed-width fields for field, field_dict in fields.items(): - index = self._get_polars_index(field, section) + index = self._get_index(field, section) ignore = (section not in self.valid) or self._get_ignore(field_dict) field_length = field_dict.get( "field_length", properties.MAX_FULL_REPORT_WIDTH @@ -242,6 +231,7 @@ def open_polars(self) -> pl.DataFrame: na_value = field_dict.get("missing_value") if ignore: + # Move to next field self.df = self.df.with_columns( pl.col(section) .str.slice(field_length) From 9263d4e379b94b81f8c72fd9cd9a517f3cc623d9 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Wed, 29 Jan 2025 12:11:36 +0000 Subject: [PATCH 11/46] refactor: use polars operations. Remove chunksize option for polars. 
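
Polars reads the source eagerly, so the chunked TextParser path and its
CSV round-trips through StringIO buffers are commented out rather than
ported. pl.read_csv_batched is likewise left disabled for now because
batch_size is not respected upstream (pola-rs/polars#19978); a lazy scan
is a possible alternative if chunked reading is needed again.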
--- .../mdf_reader/utils/filereader.py | 275 ++++++++---------- 1 file changed, 115 insertions(+), 160 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 15ff55f1..c09e1ac4 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -91,63 +91,57 @@ def _adjust_schema(self, ds, dtypes): attr ] - def _select_years(self, df): - def get_years_from_datetime(date): - try: - return date.year - except AttributeError: - return date - + def _select_years(self, df: pl.DataFrame) -> pl.DataFrame: if self.year_init is None and self.year_end is None: return df + if self.imodel is None: + logging.error("Selection of years is not supported for custom schema") + return df + data_model = self.imodel.split("_")[0] - dates = df[properties.year_column[data_model]] - years = dates.apply(lambda x: get_years_from_datetime(x)) - years = years.astype(int) + dates = df.get_column(properties.year_column[data_model]) + if dates.dtype == pl.Datetime: + years = dates.dt.year() + else: + years = dates.cast(pl.Int64, strict=False) - mask = pd.Series([True] * len(years)) - if self.year_init: - mask[years < self.year_init] = False - if self.year_end: - mask[years > self.year_end] = False + mask = pl.repeat(True, df.height, eager=True) + if self.year_init and self.year_end: + mask = years.is_between(self.year_init, self.year_end, closed="both") + elif self.year_init: + mask = years.ge(self.year_init) + elif self.year_end: + mask = years.le(self.year_end) - index = mask[mask].index - return df.iloc[index].reset_index(drop=True) + return df.filter(mask) - # def _select_years(self, df: pl.DataFrame): - # if self.year_init is None and self.year_end is None: - # return df + # def _read_pandas(self, **kwargs): + # return pd.read_fwf( + # self.source, + # header=None, + # quotechar="\0", + # escapechar="\0", + # dtype=object, + # skip_blank_lines=False, + # **kwargs, + # ) # - # data_model = self.imodel.split("_")[0] - # dates = df.get_column(properties.year_column[data_model]) - # if dates.dtype == pl.Datetime: - # years = dates.dt.year() - # else: - # years = dates.cast(pl.Int64, strict=False) - # - # mask = pl.repeat(True, df.height, eager=True) - # if self.year_init and self.year_end: - # mask = years.is_between(self.year_init, self.year_end, closed="both") - # elif self.year_init: - # mask = years.ge(self.year_init) - # elif self.year_end: - # mask = years.le(self.year_end) - # - # return df.filter(mask) - - def _read_pandas(self, **kwargs): - return pd.read_fwf( - self.source, - header=None, - quotechar="\0", - escapechar="\0", - dtype=object, - skip_blank_lines=False, - **kwargs, - ) - - def _read_polars(self, **kwargs): + def _read_fwf_polars(self, **kwargs): + if "chunksize" in kwargs: + logging.warn("Chunking not supported by polars reader") + batch_size = kwargs["chunksize"] + del kwargs["chunksize"] + # return pl.read_csv_batched( + # self.source, + # has_header=False, + # separator="\0", + # new_columns=["full_str"], + # quote_char="\0", + # infer_schema_length=0, + # batch_size=batch_size, + # **kwargs, + # ) return pl.read_csv( self.source, has_header=False, @@ -171,42 +165,24 @@ def _read_sections( open_with, ): if open_with == "polars": - df = Configurator( - df=TextParser, - schema=self.schema, - order=order, - valid=valid, - ).open_polars() - elif open_with == "pandas": df = Configurator( df=TextParser, schema=self.schema, order=order, valid=valid - ).open_pandas() + 
).open_polars() + # elif open_with == "pandas": + # df = Configurator(df=TextParser, schema=self.schema, order=order, valid=valid).open_pandas() elif open_with == "netcdf": df = Configurator( df=TextParser, schema=self.schema, order=order, valid=valid ).open_netcdf() else: - raise ValueError( - "open_with has to be one of ['pandas', 'polars', 'netcdf']" - ) + raise ValueError("open_with has to be one of ['polars', 'netcdf']") - if isinstance(df, pl.DataFrame): - missing_values_ = df.get_column("missing_values").to_pandas() - df = df.drop("missing_values").to_pandas() - df.columns = pd.MultiIndex.from_tuples( - [ - tuple(x.split(properties.internal_delimiter, maxsplit=1)) - for x in df.columns - ] - ) - else: - missing_values_ = df["missing_values"] - del df["missing_values"] + missing_values = df.select(["index", "missing_values"]).pipe(set_missing_values) + df = df.drop("missing_values").pipe(self._select_years) - df = self._select_years(df) - missing_values = set_missing_values(pd.DataFrame(missing_values_), df) self.columns = df.columns - df = df.where(df.notnull(), np.nan) + # Replace None with NaN - is this necessary for polars? + # df = df.where(df.notnull(), np.nan) return df, missing_values def get_configurations(self, order, valid): @@ -263,97 +239,76 @@ def open_data( self, order, valid, - chunksize, - open_with="pandas", + # chunksize, + open_with="polars", ): """DOCUMENTATION.""" if open_with == "netcdf": TextParser = self._read_netcdf() - elif open_with == "pandas": - TextParser = self._read_pandas( - encoding=self.schema["header"].get("encoding"), - widths=[properties.MAX_FULL_REPORT_WIDTH], - skiprows=self.skiprows, - chunksize=chunksize, - ) + # TODO: Chunk? polars does have pl.read_csv_batched, but batch_size is + # not respected: https://github.com/pola-rs/polars/issues/19978 + # alternative: lazy? 
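+        # A possible lazy sketch (untested; assumes pl.scan_csv accepts the
+        # same arguments as the pl.read_csv call in _read_fwf_polars):
+        #   lf = pl.scan_csv(self.source, has_header=False, separator="\0",
+        #                    new_columns=["full_str"], quote_char="\0",
+        #                    infer_schema=False)
+        #   TextParser = lf.head(n_rows).collect()  # pseudo-batch of n_rows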
elif open_with == "polars": - TextParser = self._read_polars( - # encoding=self.schema["header"].get("encoding"), - # widths=[properties.MAX_FULL_REPORT_WIDTH], - # skiprows=self.skiprows, + TextParser = self._read_fwf_polars( + encoding=self.schema["header"].get("encoding"), + skip_rows=self.skiprows, # chunksize=chunksize, ) else: - raise ValueError("open_with has to be one of ['pandas', 'netcdf']") + raise ValueError("open_with has to be one of ['polars', 'netcdf']") - if isinstance(TextParser, (pd.DataFrame, pl.DataFrame)) or isinstance( - TextParser, xr.Dataset - ): - df, self.missing_values = self._read_sections( - TextParser, order, valid, open_with=open_with - ) - return df, df.isna() - else: - data_buffer = StringIO() - missings_buffer = StringIO() - isna_buffer = StringIO() - for i, df_ in enumerate(TextParser): - df, missing_values = self._read_sections( - df_, order, valid, open_with=open_with - ) - df_isna = df.isna() - missing_values.to_csv( - missings_buffer, - header=False, - mode="a", - encoding="utf-8", - index=False, - ) - df_isna.to_csv( - isna_buffer, - header=False, - mode="a", - index=False, - quoting=csv.QUOTE_NONE, - sep=properties.internal_delimiter, - quotechar="\0", - escapechar="\0", - ) - df.to_csv( - data_buffer, - header=False, - mode="a", - encoding="utf-8", - index=False, - quoting=csv.QUOTE_NONE, - sep=properties.internal_delimiter, - quotechar="\0", - escapechar="\0", - ) - missings_buffer.seek(0) - self.missing_values = pd.read_csv( - missings_buffer, - names=missing_values.columns, - chunksize=None, - ) - data_buffer.seek(0) - data = pd.read_csv( - data_buffer, - names=df.columns, - chunksize=self.chunksize, - dtype=object, - parse_dates=self.parse_dates, - delimiter=properties.internal_delimiter, - quotechar="\0", - escapechar="\0", - ) - isna_buffer.seek(0) - isna = pd.read_csv( - isna_buffer, - names=df.columns, - chunksize=self.chunksize, - delimiter=properties.internal_delimiter, - quotechar="\0", - escapechar="\0", - ) - return data, isna + # if isinstance(TextParser, (pl.DataFrame, xr.Dataset)): + df, self.missing_values = self._read_sections( + TextParser, order, valid, open_with=open_with + ) + return df, df.select(pl.all().is_null()) + # else: + # data_buffer = StringIO() + # missings_buffer = StringIO() + # isna_buffer = StringIO() + # df = pl.DataFrame() + # missing_values = pl.DataFrame() + # for _, df_ in enumerate(TextParser): + # df, missing_values = self._read_sections(df_, order, valid, open_with=open_with) + # df_isna = df.select(pl.all().is_null()) + # # utf-8 is default for polars + # missing_values.drop("index").write_csv( + # missings_buffer, + # include_header=False, + # ) + # df_isna.drop("index").write_csv( + # isna_buffer, + # include_header=False, + # separator=properties.internal_delimiter, + # quote_char="\0", + # ) + # df.drop("index").write_csv( + # data_buffer, + # include_header=False, + # separator=properties.internal_delimiter, + # quote_char="\0", + # ) + # missings_buffer.seek(0) + # self.missing_values = pl.read_csv( + # missings_buffer, + # columns=missing_values.columns, + # ) + # data_buffer.seek(0) + # data = pl.read_csv( + # data_buffer, + # names=df.columns, + # chunksize=self.chunksize, + # dtype=object, + # parse_dates=self.parse_dates, + # delimiter=properties.internal_delimiter, + # quotechar="\0", + # escapechar="\0", + # ) + # isna_buffer.seek(0) + # isna = pl.read_csv( + # isna_buffer, + # columns=df.columns, + # separator=properties.internal_delimiter, + # quote_char="\0", + # ) + # return data, isna 
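
The set_missing_values rewrite in the next patch replaces the pandas
explode/pivot_table pair with a single polars expression chain. A minimal,
self-contained sketch of the transform (the section:field names are made
up for illustration):

    import polars as pl

    df = pl.DataFrame(
        {
            "index": [0, 1, 2],
            "missing_values": [["c1:AQZ"], ["c1:AQZ", "c5:OS"], ["c5:OS"]],
        }
    )
    out = (
        df.explode(columns="missing_values")
        .with_columns(pl.lit(True).alias("values"))
        .pivot("missing_values", index="index", values="values")
        .fill_null(False)
    )
    # index | c1:AQZ | c5:OS
    #   0   | true   | false
    #   1   | true   | true
    #   2   | false  | true
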
From e8b1219ef5f0abf23735563cf2afe056d97d5663 Mon Sep 17 00:00:00 2001
From: jtsiddons
Date: Wed, 29 Jan 2025 12:12:31 +0000
Subject: [PATCH 12/46] refactor: set_missing_values to polars

---
 .../mdf_reader/utils/utilities.py             | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py
index 60e57d8b..07ce71c4 100755
--- a/cdm_reader_mapper/mdf_reader/utils/utilities.py
+++ b/cdm_reader_mapper/mdf_reader/utils/utilities.py
@@ -5,7 +5,7 @@
 import logging
 import os

-import pandas as pd
+import polars as pl


 def convert_dtypes(dtypes):
@@ -74,18 +74,15 @@ def decode_entries(series, decoder_func):
     return decoder_func(series)


-def set_missing_values(df, ref):
+def set_missing_values(df: pl.DataFrame) -> pl.DataFrame:
     """DOCUMENTATION."""
-    explode_ = df.explode("missing_values")
-    explode_["index"] = explode_.index
-    explode_["values"] = True
-    pivots_ = explode_.pivot_table(
-        columns="missing_values",
-        index="index",
-        values="values",
+    # QUESTION: Do I need to re-order the columns here?
+    return (
+        df.explode(columns="missing_values")
+        .with_columns(pl.lit(True).alias("values"))
+        .pivot("missing_values", index="index", values="values")
+        .fill_null(False)
     )
-    missing_values = pd.DataFrame(data=pivots_, columns=ref.columns, index=ref.index)
-    return missing_values.notna()


 def create_mask(df, isna, missing_values=[]):

From 61a52168b19f89b5399522adaadca9c1bcf97d52 Mon Sep 17 00:00:00 2001
From: jtsiddons
Date: Wed, 29 Jan 2025 12:13:11 +0000
Subject: [PATCH 13/46] feat: add polars dtypes

---
 cdm_reader_mapper/mdf_reader/schemas/schemas.py | 54 ++++++++++++++-----
 1 file changed, 41 insertions(+), 13 deletions(-)

diff --git a/cdm_reader_mapper/mdf_reader/schemas/schemas.py b/cdm_reader_mapper/mdf_reader/schemas/schemas.py
index 1a0d115c..6dac7964 100755
--- a/cdm_reader_mapper/mdf_reader/schemas/schemas.py
+++ b/cdm_reader_mapper/mdf_reader/schemas/schemas.py
@@ -11,6 +11,7 @@
 import logging
 import os

+import polars as pl
 from pathlib import Path

 from cdm_reader_mapper.common.json_dict import collect_json_files, combine_dicts
@@ -21,22 +22,49 @@ def convert_dtype_to_default(dtype, section, element):
     """Convert data type to defaults (int, float)."""
     if dtype is None:
-        return
-    elif dtype == "float":
-        return dtype
-    elif dtype == "int":
-        return properties.pandas_int
-    elif "float" in dtype.lower():
+        return  # pl.String
+    # TODO: replace with match-case statement?
+    dtype = dtype.lower()
+    if dtype == "float" or dtype == "float64":
+        return pl.Float64
+    elif dtype == "float32":
+        return pl.Float32
+    elif "float" in dtype:
+        logging.warning(
+            f"Set column type of ({section}, {element}) from deprecated {dtype} to float 32."
+        )
+        return pl.Float32
+    elif dtype == "int" or dtype == "int64":
+        return pl.Int64
+    elif dtype == "int32":
+        return pl.Int32
+    elif dtype == "int16":
+        return pl.Int16
+    elif dtype == "int8":
+        return pl.Int8
+    elif dtype == "uint" or dtype == "uint64":
+        return pl.UInt64
+    elif dtype == "uint32":
+        return pl.UInt32
+    elif dtype == "uint16":
+        return pl.UInt16
+    elif dtype == "uint8":
+        return pl.UInt8
+    elif "uint" in dtype:
        logging.warning(
-            f"Set column type of ({section}, {element}) from deprecated {dtype} to float."
+            f"Set column type of ({section}, {element}) from deprecated {dtype} to uint 32."
) - return "float" - elif "int" in dtype.lower(): + return pl.UInt32 + elif "int" in dtype: logging.warning( - f"Set column type of ({section}, {element}) from deprecated {dtype} to int." + f"Set column type of ({section}, {element}) from deprecated {dtype} to int 32." ) - return properties.pandas_int - return dtype + return pl.Int32 + elif dtype in ["datetime", "time", "date"]: + return pl.Datetime + elif dtype == "key": + return pl.Categorical + return pl.String def _read_schema(schema): @@ -154,7 +182,7 @@ def read_schema(imodel=None, ext_schema_path=None, ext_schema_file=None): else: imodel = imodel.split("_") if imodel[0] not in properties.supported_data_models: - logging.error("Input data model " f"{imodel[0]}" " not supported") + logging.error(f"Input data model {imodel[0]} not supported") return schema_files = collect_json_files(*imodel, base=f"{properties._base}.schemas") From 476e6c4a9fdb1bb94416cd8e23c89e347e559478 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Wed, 29 Jan 2025 12:13:21 +0000 Subject: [PATCH 14/46] refactor: decoders into polars --- .../mdf_reader/utils/decoders.py | 123 ++++++++++-------- 1 file changed, 66 insertions(+), 57 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/decoders.py b/cdm_reader_mapper/mdf_reader/utils/decoders.py index 12f5a2b1..1b736ea3 100755 --- a/cdm_reader_mapper/mdf_reader/utils/decoders.py +++ b/cdm_reader_mapper/mdf_reader/utils/decoders.py @@ -2,10 +2,11 @@ from __future__ import annotations +import logging import string import numpy as np -import pandas as pd +import polars as pl from .. import properties @@ -36,72 +37,80 @@ def _get_overpunch_factor(): return overpunch_factor -def _get_n(x, overpunch_number): - return ( - "".join(list(map(lambda x: overpunch_number.get(x, np.nan), list(x)))) - if x == x - else np.nan - ) - - -def _get_f(x, overpunch_factor): - return ( - np.prod(list(map(lambda x: overpunch_factor.get(x, np.nan), list(x)))) - if x == x - else np.nan - ) - - -def _get_converted(n, f): - return f * int(n) if f and n and n == n and f == f else np.nan - - -def signed_overpunch_i(x): - """DOCUMENTATION.""" - # Blanks and np.nan as missing data - # In TDF-11, mix of overpunch and no overpunch: include integers in dictionary - # Define decoding dictionary: should do this smart-like: None where non-existing keys!!!! - overpunch_number = _get_overpunch_number() - overpunch_factor = _get_overpunch_factor() - try: - n = _get_n(x, overpunch_number) - f = _get_f(x, overpunch_factor) - return _get_converted(n, f) - except Exception as e: - print(f"ERROR decoding element: {x}") - print(e) - print("Conversion sequence:") - try: - print(f"number base conversion: {n}") - except Exception: - print("number base conversion not defined") - try: - print(f"factor conversion: {f}") - except Exception: - print("factor conversion not defined") - return np.nan - - class df_decoders: """DOCUMENTATION.""" def __init__(self, dtype): # Return as object, conversion to actual type in converters only! 
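+        # Worked example for the signed_overpunch decoder below, assuming
+        # the conventional overpunch mapping ("{" and "A"-"I" encode digits
+        # 0-9 with factor +1; "}" and "J"-"R" encode digits 0-9 with factor
+        # -1): "12K" yields the digit string "122" with a factor product of
+        # -1, i.e. the value -122.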
- self.dtype = "object" - - def signed_overpunch(self, data): + self.dtype = pl.String + + def _check_decode( + self, data: pl.Series, decoded: pl.Series, threshold: int, method: str + ): + if ( + bad_decode := data.filter(decoded.is_null() & data.is_not_null()) + ).len() > 0: + msg = f"Have {bad_decode.len()} values that failed to be {method} decoded" + if bad_decode.len() <= threshold: + msg += f": values = {', '.join(bad_decode)}" + logging.warning(msg) + return None + + def signed_overpunch(self, data: pl.Series): """DOCUMENTATION.""" - decoded_numeric = np.vectorize(signed_overpunch_i, otypes=[float])(data) - return pd.Series(decoded_numeric, dtype=self.dtype) + # Blanks and np.nan as missing data + # In TDF-11, mix of overpunch and no overpunch: include integers in dictionary + # Define decoding dictionary: should do this smart-like: None where non-existing keys!!!! + overpunch_number = _get_overpunch_number() + overpunch_factor = _get_overpunch_factor() + + name = data.name + decoded = ( + data.str.to_uppercase() # QUESTION: Do I want to uppercase?? + .str.split("") + .to_frame() + .with_row_index("i") + .explode(name) + .with_columns( + [ + pl.col(name).replace(overpunch_number).alias("opn"), + ( + pl.col(name) + .replace_strict(overpunch_factor, default=np.nan) + .alias("opf") + ), + ] + ) + .group_by("i") + .agg(name, pl.col("opn"), pl.col("opf").product()) + .with_columns( + [ + pl.col(name).list.join(""), + pl.col("opn").list.join("").str.to_integer(strict=False), + ] + ) + .with_columns( + (pl.col("opn") * pl.col("opf")).cast(self.dtype).alias("conv") + ) + .fill_nan(None) + .get_column("conv") + ) + self._check_decode(data, decoded, 20, "signed_overpunch") + return decoded - def base36(self, data): + def base36(self, data: pl.Series): """DOCUMENTATION.""" # Caution: int(str(np.nan),36) ==> 30191 - data = data.apply( - lambda x: np.nan if isinstance(x, str) and (x.isspace() or not x) else x + decoded = ( + data.fill_nan(None) + .str.strip_chars(" ") + .replace({"": None}) + .str.to_integer(base=36, strict=False) + .cast(self.dtype) ) - data = [str(int(str(i), 36)) if i == i and i else np.nan for i in data] - return pd.Series(data, dtype=self.dtype) + + self._check_decode(data, decoded, 20, "base36") + return decoded decoders = dict() From f8a78b7656b46be626f80895b5b7bc86aa0defab Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Wed, 29 Jan 2025 12:13:57 +0000 Subject: [PATCH 15/46] refactor: converters to polars --- .../mdf_reader/utils/converters.py | 70 ++++++++++++------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/converters.py b/cdm_reader_mapper/mdf_reader/utils/converters.py index 68ff29c8..611ca974 100755 --- a/cdm_reader_mapper/mdf_reader/utils/converters.py +++ b/cdm_reader_mapper/mdf_reader/utils/converters.py @@ -2,8 +2,11 @@ from __future__ import annotations +import logging + import numpy as np import pandas as pd +import polars as pl from .. 
import properties @@ -16,6 +19,30 @@ def __init__(self, dtype): self.numeric_scale = 1.0 if self.dtype == "float" else 1 self.numeric_offset = 0.0 if self.dtype == "float" else 0 + def _check_conversion(self, data: pl.Series, converted: pl.Series, threshold: int): + if ( + bad_converts := data.filter(converted.is_null() & data.is_not_null()) + ).len() > 0: + msg = f"Have {bad_converts.len()} values that failed to be converted to {self.dtype}" + if bad_converts.len() <= threshold: + msg += f": values = {', '.join(bad_converts)}" + logging.warning(msg) + return None + + def _drop_whitespace_vals(self, data: pl.Series): + data_name = data.name + return ( + data.to_frame() + .select( + pl.when(data.str.contains(r"^\s*$")) + .then(pl.lit(None)) + .otherwise(data) + .alias(data_name) + ) + .get_column(data_name) + ) + + # May not be needed? def decode(self, data): """Decode object type elements of a pandas series to UTF-8.""" decoded = data.str.decode("utf-8") @@ -23,25 +50,20 @@ def decode(self, data): return data return decoded - def to_numeric(self, data): + def to_numeric(self, data: pl.Series): """Convert object type elements of a pandas series to numeric type.""" - data = data.apply( - lambda x: np.nan if isinstance(x, str) and (x.isspace() or not x) else x - ) - + data = self._drop_whitespace_vals(data) # str method fails if all nan, pd.Series.replace method is not the same # as pd.Series.str.replace! - if data.count() > 0: - data = self.decode(data) - data = data.str.strip() - data = data.str.replace(" ", "0") + data = data.str.strip_chars().str.replace_all(" ", "0") + + converted = data.cast(self.dtype, strict=False) + self._check_conversion(data, converted, 20) # Convert to numeric, then scale (?!) and give it's actual int type - return pd.to_numeric( - data, errors="coerce" - ) # astype fails on strings, to_numeric manages errors....! + return converted - def object_to_numeric(self, data, scale=None, offset=None): + def object_to_numeric(self, data: pl.Series, scale=None, offset=None): """ Convert the object type elements of a pandas series to numeric type. @@ -75,30 +97,30 @@ def object_to_numeric(self, data, scale=None, offset=None): data = self.to_numeric(data) data = offset + data * scale - return pd.Series(data, dtype=self.dtype) + return data.cast(self.dtype) - def object_to_object(self, data, disable_white_strip=False): + def object_to_object(self, data: pl.Series, disable_white_strip=None): """DOCUMENTATION.""" # With strip() an empty element after stripping, is just an empty element, no NaN... 
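+        # Note: disable_white_strip="l" keeps leading whitespace (only the
+        # right-hand side is stripped), while "r" keeps trailing whitespace.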
if data.dtype != "object": return data data = self.decode(data) - if not disable_white_strip: - data = data.str.strip() + if disable_white_strip is None: + data = data.str.strip_chars(" ") else: if disable_white_strip == "l": - data = data.str.rstrip() + data = data.str.strip_chars_end(" ") elif disable_white_strip == "r": - data = data.str.lstrip() - return data.apply( - lambda x: np.nan if isinstance(x, str) and (x.isspace() or not x) else x - ) + data = data.str.strip_chars_start(" ") + return self._drop_whitespace_vals(data) - def object_to_datetime(self, data, datetime_format="%Y%m%d"): + def object_to_datetime(self, data: pl.Series, datetime_format="%Y%m%d"): """DOCUMENTATION.""" if data.dtype != "object": return data - return pd.to_datetime(data, format=datetime_format, errors="coerce") + converted = data.str.to_datetime(format=datetime_format, strict=False) + self._check_conversion(data, converted, 20) + return converted converters = dict() From 0ee0297419f0b1bb55513382eb5a1bd4575802f6 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Wed, 29 Jan 2025 12:16:05 +0000 Subject: [PATCH 16/46] refactor: remove chunk looping from convert_and_decode_entries --- cdm_reader_mapper/mdf_reader/read.py | 57 +++++----------------------- 1 file changed, 9 insertions(+), 48 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/read.py b/cdm_reader_mapper/mdf_reader/read.py index b83dca35..d7213515 100755 --- a/cdm_reader_mapper/mdf_reader/read.py +++ b/cdm_reader_mapper/mdf_reader/read.py @@ -3,11 +3,11 @@ from __future__ import annotations import ast -import csv import logging from io import StringIO as StringIO import pandas as pd +import polars as pl from cdm_reader_mapper.common.json_dict import open_json_file from cdm_reader_mapper.common.pandas_TextParser_hdlr import make_copy @@ -85,52 +85,14 @@ def convert_and_decode_entries( if decode is not True: decoder_dict = {} - if isinstance(self.data, pd.DataFrame): - dtype = adjust_dtype(dtype, self.data) - data = self.convert_and_decode_df( - self.data, - converter_dict, - converter_kwargs, - decoder_dict, - ) - self.data = data.astype(dtype) - else: - data_buffer = StringIO() - TextParser = make_copy(self.data) - for i, df_ in enumerate(TextParser): - df = self.convert_and_decode_df( - df_, - converter_dict, - converter_kwargs, - decoder_dict, - ) - df.to_csv( - data_buffer, - header=False, - mode="a", - encoding="utf-8", - index=False, - quoting=csv.QUOTE_NONE, - sep=properties.internal_delimiter, - quotechar="\0", - escapechar="\0", - ) - date_columns = [] - for i, element in enumerate(list(dtype)): - if dtype.get(element) == "datetime": - date_columns.append(i) - dtype = adjust_dtype(dtype, df) - data_buffer.seek(0) - self.data = pd.read_csv( - data_buffer, - names=df.columns, - chunksize=self.chunksize, - dtype=dtype, - parse_dates=date_columns, - delimiter=properties.internal_delimiter, - quotechar="\0", - escapechar="\0", - ) + dtype = adjust_dtype(dtype, self.data) + data = self.convert_and_decode_df( + self.data, + converter_dict, + converter_kwargs, + decoder_dict, + ) + self.data = data.cast(dtype) return self def validate_entries(self, validate): @@ -229,7 +191,6 @@ def read( # INFO: Set default as "pandas" to account for custom schema # open_with=properties.open_file.get(self.imodel, "pandas"), open_with=properties.open_file.get(self.imodel, "polars"), - chunksize=chunksize, ) # 2.3. 
Extract, read and validate data in same loop From 2eca5d002471f2919a7c329a50b290e9c2d784d4 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 30 Jan 2025 08:46:10 +0000 Subject: [PATCH 17/46] refactor: update properties to reflect polars, minor corrections --- cdm_reader_mapper/mdf_reader/properties.py | 20 ++++++++++--------- .../mdf_reader/schemas/schemas.py | 4 ++-- .../mdf_reader/utils/configurator.py | 11 +++++----- .../mdf_reader/utils/converters.py | 11 +++++----- .../mdf_reader/utils/decoders.py | 8 +++++--- .../mdf_reader/utils/filereader.py | 8 ++++---- cdm_reader_mapper/mdf_reader/validate.py | 2 +- cdm_reader_mapper/properties.py | 18 +++++++++++++++-- 8 files changed, 51 insertions(+), 31 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/properties.py b/cdm_reader_mapper/mdf_reader/properties.py index 5289d9ae..c66548ff 100755 --- a/cdm_reader_mapper/mdf_reader/properties.py +++ b/cdm_reader_mapper/mdf_reader/properties.py @@ -3,6 +3,7 @@ from __future__ import annotations from ..properties import numeric_types, object_types, supported_data_models # noqa +import polars as pl _base = "cdm_reader_mapper.mdf_reader" @@ -16,22 +17,23 @@ "craid": ("drifter_measurements", "JULD"), } -pandas_dtypes = {} +polars_dtypes = {} for dtype in object_types: - pandas_dtypes[dtype] = "object" -pandas_dtypes.update({x: x for x in numeric_types}) -pandas_dtypes["datetime"] = "datetime" + polars_dtypes[dtype] = pl.String +polars_dtypes.update({x: x for x in numeric_types}) +polars_dtypes[pl.Datetime] = pl.Datetime -pandas_int = "Int64" +polars_int = pl.Int64 # ....and how they are managed data_type_conversion_args = {} for dtype in numeric_types: data_type_conversion_args[dtype] = ["scale", "offset"] -data_type_conversion_args["str"] = ["disable_white_strip"] -data_type_conversion_args["object"] = ["disable_white_strip"] -data_type_conversion_args["key"] = ["disable_white_strip"] -data_type_conversion_args["datetime"] = ["datetime_format"] +data_type_conversion_args[pl.Utf8] = ["disable_white_strip"] +data_type_conversion_args[pl.String] = ["disable_white_strip"] +data_type_conversion_args[pl.Categorical] = ["disable_white_strip"] +data_type_conversion_args[pl.Object] = ["disable_white_strip"] +data_type_conversion_args[pl.Datetime] = ["datetime_format"] # Misc ------------------------------------------------------------------------ dummy_level = "_SECTION_" diff --git a/cdm_reader_mapper/mdf_reader/schemas/schemas.py b/cdm_reader_mapper/mdf_reader/schemas/schemas.py index 6dac7964..38e50ad5 100755 --- a/cdm_reader_mapper/mdf_reader/schemas/schemas.py +++ b/cdm_reader_mapper/mdf_reader/schemas/schemas.py @@ -62,8 +62,8 @@ def convert_dtype_to_default(dtype, section, element): return pl.Int32 elif dtype in ["datetime", "time", "date"]: return pl.Datetime - elif dtype == "key": - return pl.Categorical + elif dtype == "key" or dtype == "str" or dtype == "object": + return pl.String return pl.String diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 6adadc3f..d8849276 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -58,7 +58,7 @@ def _get_ignore(self, section_dict): return ignore def _get_dtype(self): - return properties.pandas_dtypes.get(self.sections_dict.get("column_type")) + return properties.polars_dtypes.get(self.sections_dict.get("column_type")) def _get_converter(self): return converters.get(self.sections_dict.get("column_type")) @@ -233,11 
+233,12 @@ def open_polars(self) -> pl.DataFrame: if ignore: # Move to next field self.df = self.df.with_columns( - pl.col(section) - .str.slice(field_length) - .str.strip_prefix(delimiter) - .name.keep(), + pl.col(section).str.slice(field_length).name.keep(), ) + if delimiter is not None: + self.df = self.df.with_columns( + pl.col(section).str.strip_prefix(delimiter).name.keep() + ) continue missing_map = {"": None} diff --git a/cdm_reader_mapper/mdf_reader/utils/converters.py b/cdm_reader_mapper/mdf_reader/utils/converters.py index 611ca974..ee3ebc80 100755 --- a/cdm_reader_mapper/mdf_reader/utils/converters.py +++ b/cdm_reader_mapper/mdf_reader/utils/converters.py @@ -93,7 +93,7 @@ def object_to_numeric(self, data: pl.Series, scale=None, offset=None): """ scale = scale if scale else self.numeric_scale offset = offset if offset else self.numeric_offset - if data.dtype == "object": + if not data.dtype.is_numeric(): data = self.to_numeric(data) data = offset + data * scale @@ -126,7 +126,8 @@ def object_to_datetime(self, data: pl.Series, datetime_format="%Y%m%d"): converters = dict() for dtype in properties.numeric_types: converters[dtype] = df_converters(dtype).object_to_numeric -converters["datetime"] = df_converters("datetime").object_to_datetime -converters["str"] = df_converters("str").object_to_object -converters["object"] = df_converters("object").object_to_object -converters["key"] = df_converters("key").object_to_object +converters[pl.Datetime] = df_converters(pl.Datetime).object_to_datetime +converters[pl.String] = df_converters(pl.String).object_to_object +converters[pl.Utf8] = df_converters(pl.Utf8).object_to_object +converters[pl.Object] = df_converters(pl.Object).object_to_object +converters[pl.Categorical] = df_converters(pl.Categorical).object_to_object diff --git a/cdm_reader_mapper/mdf_reader/utils/decoders.py b/cdm_reader_mapper/mdf_reader/utils/decoders.py index 1b736ea3..ef41ca0c 100755 --- a/cdm_reader_mapper/mdf_reader/utils/decoders.py +++ b/cdm_reader_mapper/mdf_reader/utils/decoders.py @@ -102,7 +102,7 @@ def base36(self, data: pl.Series): """DOCUMENTATION.""" # Caution: int(str(np.nan),36) ==> 30191 decoded = ( - data.fill_nan(None) + data.replace({"NaN": None}) .str.strip_chars(" ") .replace({"": None}) .str.to_integer(base=36, strict=False) @@ -118,9 +118,11 @@ def base36(self, data: pl.Series): decoders["signed_overpunch"] = dict() for dtype in properties.numeric_types: decoders["signed_overpunch"][dtype] = df_decoders(dtype).signed_overpunch -decoders["signed_overpunch"]["key"] = df_decoders("key").signed_overpunch +decoders["signed_overpunch"][pl.Categorical] = df_decoders( + pl.Categorical +).signed_overpunch decoders["base36"] = dict() for dtype in properties.numeric_types: decoders["base36"][dtype] = df_decoders(dtype).base36 -decoders["base36"]["key"] = df_decoders("key").base36 +decoders["base36"][pl.Categorical] = df_decoders(pl.Categorical).base36 diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index c09e1ac4..aea697a8 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -211,16 +211,16 @@ def convert_and_decode_df( df[section], decoder_dict[section], ) - decoded.index = df[section].index - df[section] = decoded + # decoded.index = df[section].index + df = df.with_columns(decoded.alias(section)) converted = convert_entries( df[section], converter_dict[section], **converter_kwargs[section], ) - converted.index = 
df[section].index - df[section] = converted + # converted.index = df[section].index + df = df.with_columns(converted.alias(section)) return df def validate_df(self, df, isna=None): diff --git a/cdm_reader_mapper/mdf_reader/validate.py b/cdm_reader_mapper/mdf_reader/validate.py index 3d45dc40..5583a673 100755 --- a/cdm_reader_mapper/mdf_reader/validate.py +++ b/cdm_reader_mapper/mdf_reader/validate.py @@ -90,7 +90,7 @@ def validate_codes(elements, data, schema, imodel, ext_table_path, supp=False): else list(table["_keys"].get(element)) ) dtypes = { - x: properties.pandas_dtypes.get(schema.get(x).get("column_type")) + x: properties.polars_dtypes.get(schema.get(x).get("column_type")) for x in key_elements } diff --git a/cdm_reader_mapper/properties.py b/cdm_reader_mapper/properties.py index 59668b9f..713143b6 100755 --- a/cdm_reader_mapper/properties.py +++ b/cdm_reader_mapper/properties.py @@ -1,9 +1,23 @@ """Common Data Model (CDM) reader and mapper common properties.""" from __future__ import annotations +import polars as pl -numeric_types = ["Int64", "int", "float"] +# numeric_types = ["Int64", "int", "float"] +numeric_types = [ + pl.Float64, + pl.Float32, + pl.Int64, + pl.Int32, + pl.Int16, + pl.Int8, + pl.UInt64, + pl.UInt32, + pl.UInt16, + pl.UInt8, +] -object_types = ["str", "object", "key", "datetime"] +# object_types = ["str", "object", "key", "datetime"] +object_types = [pl.String, pl.Utf8, pl.Datetime, pl.Object, pl.Categorical] supported_data_models = ["craid", "gcc", "icoads", "pub47"] From 3d12a1237df44efafc65e850e37b892d39107115 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 30 Jan 2025 08:47:53 +0000 Subject: [PATCH 18/46] docs: update and run example notebook --- docs/example_notebooks/read_overview.ipynb | 291 ++++++--------------- 1 file changed, 85 insertions(+), 206 deletions(-) diff --git a/docs/example_notebooks/read_overview.ipynb b/docs/example_notebooks/read_overview.ipynb index f1c74960..d2d941f3 100755 --- a/docs/example_notebooks/read_overview.ipynb +++ b/docs/example_notebooks/read_overview.ipynb @@ -16,7 +16,9 @@ "name": "stderr", "output_type": "stream", "text": [ - "2025-01-07 11:44:05,441 - root - INFO - init basic configure of logging success\n" + "2025-01-30 08:43:10,811 - root - INFO - init basic configure of logging success\n", + "/Users/josidd/git/github/cdm_fork/.venv/lib/python3.12/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" ] } ], @@ -27,6 +29,7 @@ "import sys\n", "\n", "import pandas as pd\n", + "import polars as pl\n", "\n", "from cdm_reader_mapper import properties, read_mdf, test_data" ] @@ -54,7 +57,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "2025-01-07 11:44:09,196 - root - INFO - Attempting to fetch remote file: icoads/r300/d704/input/icoads_r300_d704_1878-10-01_subset.imma.md5\n" + "2025-01-30 08:43:16,181 - root - INFO - Attempting to fetch remote file: icoads/r300/d704/input/icoads_r300_d704_1878-10-01_subset.imma.md5\n" ] }, { @@ -133,7 +136,7 @@ "A **schema** file gathers a collection of descriptors that enable the `mdf_reader` tool to access the content\n", "of a `data model/ schema` and extract the sections of the raw data file that contains meaningful information. 
These **schema files** are the `bones` of the data model, basically `.json` files outlining the structure of the incoming raw data.\n", "\n", - "The `mdf_reader` takes this information and translate the characteristics of the data to a python pandas dataframe.\n", + "The `mdf_reader` takes this information and translate the characteristics of the data to a python polars dataframe.\n", "\n", "The tool has several **schema** templates build in." ] @@ -174,19 +177,18 @@ "name": "stderr", "output_type": "stream", "text": [ - "2025-01-07 11:44:10,310 - root - INFO - READING DATA MODEL SCHEMA FILE...\n", - "2025-01-07 11:44:10,327 - root - INFO - EXTRACTING DATA FROM MODEL: icoads_r300_d704\n", - "2025-01-07 11:44:10,327 - root - INFO - Getting data string from source...\n", - "2025-01-07 11:44:11,139 - root - WARNING - Data numeric elements with missing upper or lower threshold: ('c1', 'BSI'),('c1', 'AQZ'),('c1', 'AQA'),('c1', 'UQZ'),('c1', 'UQA'),('c1', 'VQZ'),('c1', 'VQA'),('c1', 'PQZ'),('c1', 'PQA'),('c1', 'DQZ'),('c1', 'DQA'),('c5', 'OS'),('c5', 'OP'),('c5', 'FM'),('c5', 'IMMV'),('c5', 'IX'),('c5', 'W2'),('c5', 'WMI'),('c5', 'SD2'),('c5', 'SP2'),('c5', 'IS'),('c5', 'RS'),('c5', 'IC1'),('c5', 'IC2'),('c5', 'IC3'),('c5', 'IC4'),('c5', 'IC5'),('c5', 'IR'),('c5', 'RRR'),('c5', 'TR'),('c5', 'NU'),('c5', 'QCI'),('c5', 'QI1'),('c5', 'QI2'),('c5', 'QI3'),('c5', 'QI4'),('c5', 'QI5'),('c5', 'QI6'),('c5', 'QI7'),('c5', 'QI8'),('c5', 'QI9'),('c5', 'QI10'),('c5', 'QI11'),('c5', 'QI12'),('c5', 'QI13'),('c5', 'QI14'),('c5', 'QI15'),('c5', 'QI16'),('c5', 'QI17'),('c5', 'QI18'),('c5', 'QI19'),('c5', 'QI20'),('c5', 'QI21'),('c5', 'QI22'),('c5', 'QI23'),('c5', 'QI24'),('c5', 'QI25'),('c5', 'QI26'),('c5', 'QI27'),('c5', 'QI28'),('c5', 'QI29'),('c5', 'RHI'),('c5', 'AWSI'),('c6', 'FBSRC'),('c6', 'MST'),('c7', 'OPM'),('c7', 'LOT'),('c9', 'CCe'),('c9', 'WWe'),('c9', 'Ne'),('c9', 'NHe'),('c9', 'He'),('c9', 'CLe'),('c9', 'CMe'),('c9', 'CHe'),('c9', 'SBI'),('c95', 'DPRO'),('c95', 'DPRP'),('c95', 'UFR'),('c95', 'ASIR'),('c96', 'ASII'),('c97', 'ASIE'),('c99_journal', 'vessel_length'),('c99_journal', 'vessel_beam'),('c99_journal', 'hold_depth'),('c99_journal', 'tonnage'),('c99_journal', 'baro_height'),('c99_daily', 'year'),('c99_daily', 'month'),('c99_daily', 'day'),('c99_daily', 'distance'),('c99_daily', 'lat_deg_an'),('c99_daily', 'lat_min_an'),('c99_daily', 'lon_deg_an'),('c99_daily', 'lon_min_an'),('c99_daily', 'lat_deg_on'),('c99_daily', 'lat_min_on'),('c99_daily', 'lon_deg_of'),('c99_daily', 'lon_min_of'),('c99_daily', 'current_speed'),('c99_data4', 'year'),('c99_data4', 'month'),('c99_data4', 'day'),('c99_data4', 'hour'),('c99_data4', 'ship_speed'),('c99_data4', 'compass_correction'),('c99_data4', 'attached_thermometer'),('c99_data4', 'air_temperature'),('c99_data4', 'wet_bulb_temperature'),('c99_data4', 'sea_temperature'),('c99_data4', 'sky_clear'),('c99_data5', 'year'),('c99_data5', 'month'),('c99_data5', 'day'),('c99_data5', 'hour'),('c99_data5', 'ship_speed'),('c99_data5', 'attached_thermometer'),('c99_data5', 'air_temperature'),('c99_data5', 'wet_bulb_temperature'),('c99_data5', 'sea_temperature'),('c99_data5', 'sky_clear'),('c99_data5', 'compass_correction')\n", - "2025-01-07 11:44:11,139 - root - WARNING - Corresponding upper and/or lower bounds set to +/-inf for validation\n", - "2025-01-07 11:44:12,409 - root - INFO - CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL\n" + "2025-01-30 08:43:16,516 - root - INFO - READING DATA MODEL SCHEMA FILE...\n", + "2025-01-30 08:43:16,527 - root - INFO - EXTRACTING 
DATA FROM MODEL: icoads_r300_d704\n", + "2025-01-30 08:43:16,528 - root - INFO - Getting data string from source...\n", + "2025-01-30 08:43:16,618 - root - INFO - Extracting and reading sections\n", + "2025-01-30 08:43:16,706 - root - INFO - Create output DataBundle object\n" ] } ], "source": [ "schema = \"icoads_r300_d704\"\n", "\n", - "data = read_mdf(data_path, imodel=schema)" + "data = read_mdf(data_path, imodel=schema, validate=False)" ] }, { @@ -218,7 +220,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Now metadata information can be extracted as a component of the padas dataframe." + "The full output `polars.DataFrame` can be accessed:" ] }, { @@ -229,196 +231,30 @@ { "data": { "text/html": [ - "
[pandas HTML table: preview of the c99_journal section, 5 rows × 24 columns; the text/plain output below carries the same values]
" + "shape: (5, 401)
[polars HTML table: preview of the full DataFrame, columns index and core:* through c99_data5:*; the text/plain output below carries the same values]
" ], "text/plain": [ - " sentinal reel_no journal_no frame_no ship_name journal_ed rig ship_material \\\n", - "0 1 002 0018 0003 Panay 78 01 1 \n", - "1 1 002 0018 0003 Panay 78 01 1 \n", - "2 1 002 0018 0003 Panay 78 01 1 \n", - "3 1 002 0018 0003 Panay 78 01 1 \n", - "4 1 002 0018 0003 Panay 78 01 1 \n", - "\n", - " vessel_type vessel_length ... hold_depth tonnage baro_type baro_height \\\n", - "0 1 187 ... 23 1190 2 14 \n", - "1 1 187 ... 23 1190 2 14 \n", - "2 1 187 ... 23 1190 2 14 \n", - "3 1 187 ... 23 1190 2 14 \n", - "4 1 187 ... 23 1190 2 14 \n", - "\n", - " baro_cdate baro_loc baro_units baro_cor thermo_mount SST_I \n", - "0 NaN Bulkhead of cabin 1 - .102 2 NaN \n", - "1 NaN Bulkhead of cabin 1 - .102 2 NaN \n", - "2 NaN Bulkhead of cabin 1 - .102 2 NaN \n", - "3 NaN Bulkhead of cabin 1 - .102 2 NaN \n", - "4 NaN Bulkhead of cabin 1 - .102 2 NaN \n", - "\n", - "[5 rows x 24 columns]" + "shape: (5, 401)\n", + "┌───────┬─────────┬─────────┬─────────┬───┬──────────────┬─────────────┬─────────────┬─────────────┐\n", + "│ index ┆ core:YR ┆ core:MO ┆ core:DY ┆ … ┆ c99_data5:se ┆ c99_data5:c ┆ c99_data5:c ┆ c99_data5:c │\n", + "│ --- ┆ --- ┆ --- ┆ --- ┆ ┆ a_state ┆ ompass_corr ┆ ompass_corr ┆ ompass_corr │\n", + "│ u32 ┆ i64 ┆ i64 ┆ i64 ┆ ┆ --- ┆ ection_i… ┆ ection ┆ ection_d… │\n", + "│ ┆ ┆ ┆ ┆ ┆ str ┆ --- ┆ --- ┆ --- │\n", + "│ ┆ ┆ ┆ ┆ ┆ ┆ str ┆ f64 ┆ str │\n", + "╞═══════╪═════════╪═════════╪═════════╪═══╪══════════════╪═════════════╪═════════════╪═════════════╡\n", + "│ 0 ┆ 1878 ┆ 10 ┆ 20 ┆ … ┆ null ┆ null ┆ null ┆ null │\n", + "│ 1 ┆ 1878 ┆ 10 ┆ 20 ┆ … ┆ null ┆ null ┆ null ┆ null │\n", + "│ 2 ┆ 1878 ┆ 10 ┆ 20 ┆ … ┆ null ┆ null ┆ null ┆ null │\n", + "│ 3 ┆ 1878 ┆ 10 ┆ 20 ┆ … ┆ null ┆ null ┆ null ┆ null │\n", + "│ 4 ┆ 1878 ┆ 10 ┆ 20 ┆ … ┆ null ┆ null ┆ null ┆ null │\n", + "└───────┴─────────┴─────────┴─────────┴───┴──────────────┴─────────────┴─────────────┴─────────────┘" ] }, "execution_count": 5, @@ -427,29 +263,72 @@ } ], "source": [ - "data.data.c99_journal" + "data.data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "To learn how to construct a schema or data model for a particular deck/source, visit this other [tutorial notebook](https://github.com/glamod/cdm_reader_mapper/blob/main/docs/example_notebooks/CLIWOC_datamodel.ipynb)" + "Now metadata information can be extracted as a component of the polars dataframe." ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": {}, - "outputs": [], - "source": [] + "outputs": [ + { + "data": { + "text/html": [ + "
[polars HTML table: preview of the c99_journal:* columns, shape (5, 24); the text/plain output below carries the same values]
" + ], + "text/plain": [ + "shape: (5, 24)\n", + "┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐\n", + "│ c99_journ ┆ c99_journ ┆ c99_journ ┆ c99_journ ┆ … ┆ c99_journ ┆ c99_journ ┆ c99_journ ┆ c99_jour │\n", + "│ al:sentin ┆ al:reel_n ┆ al:journa ┆ al:frame_ ┆ ┆ al:baro_u ┆ al:baro_c ┆ al:thermo ┆ nal:SST_ │\n", + "│ al ┆ o ┆ l_no ┆ no ┆ ┆ nits ┆ or ┆ _mount ┆ I │\n", + "│ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │\n", + "│ str ┆ str ┆ str ┆ str ┆ ┆ str ┆ str ┆ str ┆ str │\n", + "╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡\n", + "│ 1 ┆ 002 ┆ 0018 ┆ 0003 ┆ … ┆ 1 ┆ - .102 ┆ 2 ┆ null │\n", + "│ 1 ┆ 002 ┆ 0018 ┆ 0003 ┆ … ┆ 1 ┆ - .102 ┆ 2 ┆ null │\n", + "│ 1 ┆ 002 ┆ 0018 ┆ 0003 ┆ … ┆ 1 ┆ - .102 ┆ 2 ┆ null │\n", + "│ 1 ┆ 002 ┆ 0018 ┆ 0003 ┆ … ┆ 1 ┆ - .102 ┆ 2 ┆ null │\n", + "│ 1 ┆ 002 ┆ 0018 ┆ 0003 ┆ … ┆ 1 ┆ - .102 ┆ 2 ┆ null │\n", + "└───────────┴───────────┴───────────┴───────────┴───┴───────────┴───────────┴───────────┴──────────┘" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "data.data.select(pl.col(\"^c99_journal:.*$\"))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To learn how to construct a schema or data model for a particular deck/source, visit this other [tutorial notebook](https://github.com/glamod/cdm_reader_mapper/blob/main/docs/example_notebooks/CLIWOC_datamodel.ipynb)" + ] } ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "cdm", "language": "python", - "name": "python3" + "name": "cdm" }, "language_info": { "codemirror_mode": { @@ -461,7 +340,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.12.8" } }, "nbformat": 4, From bfa2bbdb9d082f9e0abaca2432748157eaf8a2e6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 30 Jan 2025 08:48:35 +0000 Subject: [PATCH 19/46] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- cdm_reader_mapper/mdf_reader/properties.py | 3 ++- cdm_reader_mapper/mdf_reader/schemas/schemas.py | 3 ++- cdm_reader_mapper/properties.py | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/properties.py b/cdm_reader_mapper/mdf_reader/properties.py index c66548ff..f6f491be 100755 --- a/cdm_reader_mapper/mdf_reader/properties.py +++ b/cdm_reader_mapper/mdf_reader/properties.py @@ -2,9 +2,10 @@ from __future__ import annotations -from ..properties import numeric_types, object_types, supported_data_models # noqa import polars as pl +from ..properties import numeric_types, object_types, supported_data_models # noqa + _base = "cdm_reader_mapper.mdf_reader" open_file = { diff --git a/cdm_reader_mapper/mdf_reader/schemas/schemas.py b/cdm_reader_mapper/mdf_reader/schemas/schemas.py index 38e50ad5..5e87c047 100755 --- a/cdm_reader_mapper/mdf_reader/schemas/schemas.py +++ b/cdm_reader_mapper/mdf_reader/schemas/schemas.py @@ -11,9 +11,10 @@ import logging import os -import polars as pl from pathlib import Path +import polars as pl + from cdm_reader_mapper.common.json_dict import collect_json_files, combine_dicts from .. 
import properties diff --git a/cdm_reader_mapper/properties.py b/cdm_reader_mapper/properties.py index 713143b6..2ecaef89 100755 --- a/cdm_reader_mapper/properties.py +++ b/cdm_reader_mapper/properties.py @@ -1,6 +1,7 @@ """Common Data Model (CDM) reader and mapper common properties.""" from __future__ import annotations + import polars as pl # numeric_types = ["Int64", "int", "float"] From 25cd538631d7885af51a79771e66be86a6ccca56 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 30 Jan 2025 08:57:33 +0000 Subject: [PATCH 20/46] fix: simplify convert_dtype_to_default --- .../mdf_reader/schemas/schemas.py | 57 ++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/schemas/schemas.py b/cdm_reader_mapper/mdf_reader/schemas/schemas.py index 5e87c047..d4cea053 100755 --- a/cdm_reader_mapper/mdf_reader/schemas/schemas.py +++ b/cdm_reader_mapper/mdf_reader/schemas/schemas.py @@ -20,37 +20,46 @@ from .. import properties +def _get_type_map(): + return { + "float": pl.Float64, + "float64": pl.Float64, + "float32": pl.Float32, + "int": pl.Int64, + "int64": pl.Int64, + "int32": pl.Int32, + "int16": pl.Int16, + "int8": pl.Int8, + "uint": pl.UInt64, + "uint64": pl.UInt64, + "uint32": pl.UInt32, + "uint16": pl.UInt16, + "uint8": pl.UInt8, + "str": pl.String, + "object": pl.String, + "key": pl.String, + "date": pl.Datetime, + "datetime": pl.Datetime, + "time": pl.Datetime, + } + + def convert_dtype_to_default(dtype, section, element): """Convert data type to defaults (int, float).""" if dtype is None: return # pl.String - # TODO: replace with match-case statement? + dtype_map = _get_type_map() dtype = dtype.lower() - if dtype == "float" or dtype == "float64": - return pl.Float64 - elif dtype == "float32": - return pl.Float32 - elif "float" in dtype: + + polars_dtype = dtype_map.get(dtype) + if polars_dtype is not None: + return polars_dtype + + if "float" in dtype: logging.warning( f"Set column type of ({section}, {element}) from deprecated {dtype} to float 32." ) return pl.Float32 - elif dtype == "int" or dtype == "int64": - return pl.Int64 - elif dtype == "int32": - return pl.Int32 - elif dtype == "int16": - return pl.Int16 - elif dtype == "int8": - return pl.Int8 - elif dtype == "uint" or dtype == "uint64": - return pl.UInt64 - elif dtype == "uint32": - return pl.Int32 - elif dtype == "uint16": - return pl.Int16 - elif dtype == "uint8": - return pl.Int8 elif "uint" in dtype: logging.warning( f"Set column type of ({section}, {element}) from deprecated {dtype} to uint 32." @@ -61,10 +70,6 @@ def convert_dtype_to_default(dtype, section, element): f"Set column type of ({section}, {element}) from deprecated {dtype} to int 32." 
) return pl.Int32 - elif dtype in ["datetime", "time", "date"]: - return pl.Datetime - elif dtype == "key" or dtype == "str" or dtype == "object": - return pl.String return pl.String From 8df537001cd8ad33e80b5a7e2e13c2cb43614203 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 30 Jan 2025 09:01:04 +0000 Subject: [PATCH 21/46] fix: replace todo comment with note --- cdm_reader_mapper/mdf_reader/utils/filereader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index aea697a8..a353862b 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -245,9 +245,9 @@ def open_data( """DOCUMENTATION.""" if open_with == "netcdf": TextParser = self._read_netcdf() - # TODO: Chunk? polars does have pl.read_csv_batched, but batch_size is - # not respected: https://github.com/pola-rs/polars/issues/19978 - # alternative: lazy? + # NOTE: Chunking - polars does have pl.read_csv_batched, but batch_size + # is not respected: https://github.com/pola-rs/polars/issues/19978 + # alternative: lazy? elif open_with == "polars": TextParser = self._read_fwf_polars( encoding=self.schema["header"].get("encoding"), From a96ef2bf077a49bea68f0528ae2e5344e75bcc5c Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 30 Jan 2025 09:08:09 +0000 Subject: [PATCH 22/46] fix: ensure output polars frame has index when reading sections from netCDF --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index d8849276..0850ba9b 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -396,4 +396,4 @@ def replace_empty_strings(series): df[column] = np.nan df = df.apply(lambda x: replace_empty_strings(x)) df["missing_values"] = [missing_values] * len(df) - return pl.from_pandas(df) + return pl.from_pandas(df).with_row_index("index") From b1e7f5caacf7875065d94eb4a28da76cb813fdd2 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 30 Jan 2025 09:12:12 +0000 Subject: [PATCH 23/46] chore: ruff linter fixes --- cdm_reader_mapper/mdf_reader/read.py | 1 - cdm_reader_mapper/mdf_reader/utils/converters.py | 2 -- cdm_reader_mapper/mdf_reader/utils/filereader.py | 8 ++------ 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/read.py b/cdm_reader_mapper/mdf_reader/read.py index d7213515..f2b348eb 100755 --- a/cdm_reader_mapper/mdf_reader/read.py +++ b/cdm_reader_mapper/mdf_reader/read.py @@ -7,7 +7,6 @@ from io import StringIO as StringIO import pandas as pd -import polars as pl from cdm_reader_mapper.common.json_dict import open_json_file from cdm_reader_mapper.common.pandas_TextParser_hdlr import make_copy diff --git a/cdm_reader_mapper/mdf_reader/utils/converters.py b/cdm_reader_mapper/mdf_reader/utils/converters.py index ee3ebc80..86cf554c 100755 --- a/cdm_reader_mapper/mdf_reader/utils/converters.py +++ b/cdm_reader_mapper/mdf_reader/utils/converters.py @@ -4,8 +4,6 @@ import logging -import numpy as np -import pandas as pd import polars as pl from .. 
import properties diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index a353862b..f3f842db 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -2,14 +2,10 @@ from __future__ import annotations -import csv import logging import os from copy import deepcopy -from io import StringIO -import numpy as np -import pandas as pd import polars as pl import xarray as xr @@ -129,8 +125,8 @@ def _select_years(self, df: pl.DataFrame) -> pl.DataFrame: # def _read_fwf_polars(self, **kwargs): if "chunksize" in kwargs: - logging.warn("Chunking not supported by polars reader") - batch_size = kwargs["chunksize"] + logging.warning("Chunking not supported by polars reader") + # batch_size = kwargs["chunksize"] del kwargs["chunksize"] # return pl.read_csv_batched( # self.source, From 1f65f8c45ceb1aab6569f8858334e60a96323e61 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 30 Jan 2025 09:36:49 +0000 Subject: [PATCH 24/46] fix: get fields after checking if disable_read is True --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 0850ba9b..0ad911e0 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -159,7 +159,6 @@ def open_polars(self) -> pl.DataFrame: sentinal = header.get("sentinal") section_length = header.get("length", properties.MAX_FULL_REPORT_WIDTH) - fields = self.schema["sections"][section]["elements"] # Get data associated with current section if sentinal is not None: @@ -192,6 +191,7 @@ def open_polars(self) -> pl.DataFrame: if disable_read is True: continue + fields = self.schema["sections"][section]["elements"] field_layout = header.get("field_layout") delimiter = header.get("delimiter") From 1854b58fdf450917710b80fdbb0361ead4a51375 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 31 Jan 2025 07:45:16 +0000 Subject: [PATCH 25/46] tool: add ruff formatting settings to match black --- pyproject.toml | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8372af3a..013a8fe9 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -175,14 +175,10 @@ python_files = "test_*.py" testpaths = ["./tests"] [tool.ruff] -src = ["xclim"] -line-length = 150 -target-version = "py39" -exclude = [ - ".git", - "build", - ".eggs" -] +line-length = 88 +indent-width = 4 +target-version = "py311" +exclude = [".eggs", ".git", ".venv", "build", "venv"] extend-include = [ "*.ipynb" # Include notebooks ] From 7464d03f4e0b957b93dd86049cfd96f142523a1d Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 31 Jan 2025 09:03:06 +0000 Subject: [PATCH 26/46] opt: use str.head/str.tail rather than str.slice --- .../mdf_reader/utils/configurator.py | 28 ++++++++++++------- .../mdf_reader/utils/filereader.py | 7 ++++- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 0ad911e0..ebadece1 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -166,26 +166,32 @@ def open_polars(self) -> pl.DataFrame: [ ( pl.when(pl.col("full_str").str.starts_with(sentinal)) - .then(pl.col("full_str").str.slice(0, section_length)) + 
.then(pl.col("full_str").str.head(section_length)) .otherwise(pl.lit(None)) .alias(section) ), ( pl.when(pl.col("full_str").str.starts_with(sentinal)) - .then(pl.col("full_str").str.slice(section_length)) + .then(pl.col("full_str").str.tail(-section_length)) .otherwise(pl.col("full_str")) .alias("full_str") ), ] ) else: + # Sentinal is None, the section is always present self.df = self.df.with_columns( [ - pl.col("full_str").str.slice(0, section_length).alias(section), - pl.col("full_str").str.slice(section_length).alias("full_str"), + pl.col("full_str").str.head(section_length).alias(section), + pl.col("full_str").str.tail(-section_length).alias("full_str"), ] ) + # Used for validation + self.df = self.df.with_columns( + pl.col(section).is_null().alias(f"_{section}_missing") + ) + # Don't read fields disable_read = header.get("disable_read", False) if disable_read is True: @@ -248,12 +254,14 @@ def open_polars(self) -> pl.DataFrame: self.df = self.df.with_columns( [ # If section not present in a row, then both these are null - pl.col(section) - .str.slice(0, field_length) - .str.strip_chars(" ") - .replace(missing_map) - .alias(index), - pl.col(section).str.slice(field_length).name.keep(), + ( + pl.col(section) + .str.head(field_length) + .str.strip_chars(" ") + .replace(missing_map) + .alias(index) + ), + pl.col(section).str.tail(-field_length).name.keep(), ( # Handle missing sections pl.when(pl.col(section).is_null()) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index f3f842db..508cedf7 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -120,6 +120,7 @@ def _select_years(self, df: pl.DataFrame) -> pl.DataFrame: # escapechar="\0", # dtype=object, # skip_blank_lines=False, + # widths=[properties.MAX_FULL_REPORT_WIDTH], # **kwargs, # ) # @@ -146,6 +147,8 @@ def _read_fwf_polars(self, **kwargs): quote_char="\0", infer_schema=False, **kwargs, + ).select( + pl.col("full_str").str.head(properties.MAX_FULL_REPORT_WIDTH).name.keep() ) def _read_netcdf(self, **kwargs): @@ -165,7 +168,9 @@ def _read_sections( df=TextParser, schema=self.schema, order=order, valid=valid ).open_polars() # elif open_with == "pandas": - # df = Configurator(df=TextParser, schema=self.schema, order=order, valid=valid).open_pandas() + # df = Configurator( + # df=TextParser, schema=self.schema, order=order, valid=valid + # ).open_pandas() elif open_with == "netcdf": df = Configurator( df=TextParser, schema=self.schema, order=order, valid=valid From 9af7950d0528ec0938d7a8126359451a420e21f6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 31 Jan 2025 09:35:21 +0000 Subject: [PATCH 27/46] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- cdm_reader_mapper/mdf_reader/utils/filereader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 0a15f256..01bb21e7 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -15,8 +15,8 @@ from .utilities import ( convert_entries, decode_entries, - validate_path, set_missing_values, + validate_path, ) from .validators import validate From d662b84b62c56c421d880b010cd1f89f228a7205 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 31 Jan 2025 09:43:25 
+0000 Subject: [PATCH 28/46] chore: remove unused function --- cdm_reader_mapper/mdf_reader/utils/filereader.py | 1 - cdm_reader_mapper/mdf_reader/utils/utilities.py | 11 ----------- 2 files changed, 12 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 01bb21e7..49533d42 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -15,7 +15,6 @@ from .utilities import ( convert_entries, decode_entries, - set_missing_values, validate_path, ) from .validators import validate diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py index 8c64bdd8..1c0fbd1d 100755 --- a/cdm_reader_mapper/mdf_reader/utils/utilities.py +++ b/cdm_reader_mapper/mdf_reader/utils/utilities.py @@ -74,17 +74,6 @@ def decode_entries(series, decoder_func): return decoder_func(series) -def set_missing_values(df: pl.DataFrame) -> pl.DataFrame: - """DOCUMENTATION.""" - # QUESTION: Do I need to re-order the columns here? - return ( - df.explode(columns="missing_values") - .with_columns(pl.lit(True).alias("values")) - .pivot("missing_values", index="index", values="values") - .fill_null(False) - ) - - def adjust_dtype(dtype, df): """DOCUMENTATION.""" if not isinstance(dtype, dict): From a7b4a8780cd71115ff5c2a79f2c23bb1820a98ef Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 31 Jan 2025 11:55:09 +0000 Subject: [PATCH 29/46] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- cdm_reader_mapper/mdf_reader/utils/filereader.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 49533d42..1fe397a6 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -12,11 +12,7 @@ from .. import properties from ..schemas import schemas from .configurator import Configurator -from .utilities import ( - convert_entries, - decode_entries, - validate_path, -) +from .utilities import convert_entries, decode_entries, validate_path from .validators import validate From 3b18679db548f2676f613dd8bf89b1126f51fe4b Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 27 Mar 2025 14:35:09 +0000 Subject: [PATCH 30/46] refactor: use pandas to read, open_with -> format with 'text', 'netcdf' options --- cdm_reader_mapper/mdf_reader/reader.py | 9 +- .../mdf_reader/utils/filereader.py | 87 +++++++------------ 2 files changed, 33 insertions(+), 63 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index 655c697d..fc2f34dd 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -244,6 +244,7 @@ def read( self.chunksize = chunksize self.skiprows = skiprows + self.format = properties.open_file.get(self.imodel, "text") # 2. 
READ AND VALIDATE DATA logging.info(f"EXTRACTING DATA FROM MODEL: {self.imodel}") @@ -258,12 +259,10 @@ def read( # a list with a single dataframe or a pd.io.parsers.TextFileReader logging.info("Getting data string from source...") self.configurations = self.get_configurations(read_sections_list, sections) + data = self.open_data( - read_sections_list, - sections, - # INFO: Set default as "pandas" to account for custom schema - # open_with=properties.open_file.get(self.imodel, "pandas"), - open_with=properties.open_file.get(self.imodel, "polars"), + chunksize=chunksize, + format=self.format, ) # 2.3. Extract, read and validate data in same loop diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index b84f6f2b..622ca435 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -7,6 +7,7 @@ from copy import deepcopy import polars as pl +import pandas as pd import xarray as xr from .. import properties @@ -104,43 +105,16 @@ def _select_years(self, df: pl.DataFrame) -> pl.DataFrame: return df.filter(mask) - # def _read_pandas(self, **kwargs): - # return pd.read_fwf( - # self.source, - # header=None, - # quotechar="\0", - # escapechar="\0", - # dtype=object, - # skip_blank_lines=False, - # widths=[properties.MAX_FULL_REPORT_WIDTH], - # **kwargs, - # ) - # - def _read_fwf_polars(self, **kwargs): - if "chunksize" in kwargs: - logging.warning("Chunking not supported by polars reader") - # batch_size = kwargs["chunksize"] - del kwargs["chunksize"] - # return pl.read_csv_batched( - # self.source, - # has_header=False, - # separator="\0", - # new_columns=["full_str"], - # quote_char="\0", - # infer_schema_length=0, - # batch_size=batch_size, - # **kwargs, - # ) - return pl.read_csv( + def _read_text(self, **kwargs): + return pd.read_fwf( self.source, - has_header=False, - separator="\0", - new_columns=["full_str"], - quote_char="\0", - infer_schema=False, + header=None, + quotechar="\0", + escapechar="\0", + dtype=object, + skip_blank_lines=False, + widths=[properties.MAX_FULL_REPORT_WIDTH], **kwargs, - ).select( - pl.col("full_str").str.head(properties.MAX_FULL_REPORT_WIDTH).name.keep() ) def _read_netcdf(self, **kwargs): @@ -153,30 +127,26 @@ def _read_sections( TextParser, order, valid, - open_with, + format, ): - if open_with == "polars": + if format == "text": df = Configurator( df=TextParser, schema=self.schema, order=order, valid=valid - ).open_polars() - # elif open_with == "pandas": - # df = Configurator( - # df=TextParser, schema=self.schema, order=order, valid=valid - # ).open_pandas() - elif open_with == "netcdf": + ).open_text() + elif format == "netcdf": df = Configurator( df=TextParser, schema=self.schema, order=order, valid=valid ).open_netcdf() else: - raise ValueError("open_with has to be one of ['polars', 'netcdf']") + raise ValueError("format has to be one of ['text', 'netcdf']") # missing_values = df.select(["index", "missing_values"]).pipe(set_missing_values) - df = df.drop("missing_values").pipe(self._select_years) + df = df.pipe(self._select_years) self.columns = df.columns # Replace None with NaN - is this necessary for polars? 
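
On the question in the comment above: polars tracks missing data as typed
nulls rather than float NaN, so the pandas-style fill kept commented out
below should be unnecessary. A minimal standalone check, independent of the
reader code:

    import polars as pl

    df = pl.DataFrame({"field": ["1", None, "  3"]})

    # Nulls propagate through string expressions; no NaN sentinel appears.
    out = df.with_columns(pl.col("field").str.strip_chars(" "))
    print(out.get_column("field").is_null().to_list())  # [False, True, False]

    # Null and NaN stay distinct, even in float columns.
    s = pl.Series([1.0, float("nan"), None])
    print(s.is_null().to_list())  # [False, False, True]
    print(s.is_nan().to_list())   # [False, True, None]
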
# df = df.where(df.notnull(), np.nan) - return self._select_years(df) + return df def get_configurations(self, order, valid): """DOCUMENTATION.""" @@ -190,29 +160,30 @@ def get_configurations(self, order, valid): def open_data( self, - order, - valid, - # chunksize, - open_with="polars", + chunksize, + format="text", ): """DOCUMENTATION.""" encoding = self.schema["header"].get("encoding") - if open_with == "netcdf": + if format == "netcdf": TextParser = self._read_netcdf() # NOTE: Chunking - polars does have pl.read_csv_batched, but batch_size # is not respected: https://github.com/pola-rs/polars/issues/19978 # alternative: lazy? - elif open_with == "polars": - TextParser = self._read_fwf_polars( + elif format == "text": + TextParser = self._read_text( encoding=encoding, - skip_rows=self.skiprows, - # chunksize=chunksize, + widths=[properties.MAX_FULL_REPORT_WIDTH], + skiprows=self.skiprows, + chunksize=chunksize, ) else: - raise ValueError("open_with has to be one of ['polars', 'netcdf']") + raise ValueError("format has to be one of ['text', 'netcdf']") + + return TextParser # if isinstance(TextParser, (pl.DataFrame, xr.Dataset)): - df = self._read_sections(TextParser, order, valid, open_with=open_with) - return df + # df, mask_df = self._read_sections(TextParser, order, valid, open_with=open_with) + # return df, mask_df # else: # data_buffer = StringIO() # for i, df_ in enumerate(TextParser): From fb757799c8bba66f04d96d3c78472e6dde8e9faf Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 27 Mar 2025 14:37:21 +0000 Subject: [PATCH 31/46] refactor: Configurator open_ methods now return two polars Frames. Rename open_polars -> open_text. --- .../mdf_reader/utils/configurator.py | 161 +++++------------- .../mdf_reader/utils/filereader.py | 6 +- 2 files changed, 50 insertions(+), 117 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 83815b53..d2063948 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -3,12 +3,11 @@ from __future__ import annotations import ast -import csv -import logging -import numpy as np import pandas as pd import polars as pl +import polars.selectors as cs +import xarray as xr from .. import properties from . 
import converters, decoders @@ -20,7 +19,7 @@ class Configurator: def __init__( self, - df=pd.DataFrame(), + df: pd.DataFrame | pl.DataFrame | xr.Dataset = pl.DataFrame(), schema=None, order=None, valid=None, @@ -138,11 +137,14 @@ def get_configuration(self): }, } - def open_polars(self) -> pl.DataFrame: - """Open TextParser to a pl.DataFrame""" - self.df = self.df.with_columns( - pl.lit([]).alias("missing_values") - ).with_row_index("index") + def open_text(self): + """Open TextParser to a polars.DataFrame""" + if isinstance(self.df, pd.DataFrame): + self.df = pl.from_pandas(self.df) + if not isinstance(self.df, pl.DataFrame): + raise TypeError(f"Cannot open with polars for {type(self.df) = }") + self.df = self.df.with_row_index("index") + mask_df = pl.DataFrame() for section in self.orders: header = self.schema["sections"][section]["header"] @@ -178,9 +180,7 @@ def open_polars(self) -> pl.DataFrame: ) # Used for validation - self.df = self.df.with_columns( - pl.col(section).is_null().alias(f"_{section}_missing") - ) + section_missing = self.df.get_column(section).is_null() # Don't read fields disable_read = header.get("disable_read", False) @@ -209,6 +209,12 @@ def open_polars(self) -> pl.DataFrame: self.df = self.df.with_columns( pl.col(field).str.strip_chars(" ").name.keep() ) + mask_df = mask_df.with_columns( + ( + section_missing + | self.df.get_column(field).is_not_null() + ).alias(field) + ) continue elif field_layout != "fixed_width": @@ -252,15 +258,13 @@ def open_polars(self) -> pl.DataFrame: .alias(index) ), pl.col(section).str.tail(-field_length).name.keep(), - ( - # Handle missing sections - pl.when(pl.col(section).is_null()) - .then(pl.col("missing_values").list.concat(pl.lit([index]))) - .otherwise(pl.col("missing_values")) - .alias("missing_values") - ), ] ) + mask_df = mask_df.with_columns( + (section_missing | self.df.get_column(field).is_not_null()).alias( + field + ) + ) if delimiter is not None: self.df = self.df.with_columns( pl.col(section).str.strip_prefix(delimiter).name.keep() @@ -268,94 +272,12 @@ def open_polars(self) -> pl.DataFrame: self.df = self.df.drop([section]) - return self.df.drop("full_str") - - def open_pandas(self): - """Open TextParser to pd.DataSeries.""" - return self.df.apply(lambda x: self._read_line(x[0]), axis=1) - - def _read_line(self, line: str): - i = j = 0 - data_dict = {} - for order in self.orders: - header = self.schema["sections"][order]["header"] - - disable_read = header.get("disable_read") - if disable_read is True: - data_dict[order] = line[i : properties.MAX_FULL_REPORT_WIDTH] - continue - - sentinal = header.get("sentinal") - bad_sentinal = sentinal is not None and not self._validate_sentinal( - i, line, sentinal - ) - - section_length = header.get("length", properties.MAX_FULL_REPORT_WIDTH) - sections = self.schema["sections"][order]["elements"] - - field_layout = header.get("field_layout") - delimiter = header.get("delimiter") - if delimiter is not None: - delimiter_format = header.get("format") - if delimiter_format == "delimited": - # Read as CSV - field_names = sections.keys() - fields = list(csv.reader([line[i:]], delimiter=delimiter))[0] - for field_name, field in zip(field_names, fields): - index = self._get_index(field_name, order) - data_dict[index] = field.strip() - i += len(field) - j = i - continue - elif field_layout != "fixed_width": - logging.error( - f"Delimiter for {order} is set to {delimiter}. Please specify either format or field_layout in your header schema {header}." 
- ) - return - - k = i + section_length - for section, section_dict in sections.items(): - missing = True - index = self._get_index(section, order) - ignore = (order not in self.valid) or self._get_ignore(section_dict) - na_value = section_dict.get("missing_value") - field_length = section_dict.get( - "field_length", properties.MAX_FULL_REPORT_WIDTH - ) - - j = (i + field_length) if not bad_sentinal else i - if j > k: - missing = False - j = k - - if ignore is not True: - value = line[i:j] - - if not value.strip(): - value = True - if value == na_value: - value = True - - if i == j and missing is True: - value = False - - data_dict[index] = value - - if delimiter is not None and line[j : j + len(delimiter)] == delimiter: - j += len(delimiter) - i = j - - return pd.Series(data_dict) + return self.df.drop("full_str"), mask_df.with_row_index("index") def open_netcdf(self): - """Open netCDF to pd.Series.""" - - def replace_empty_strings(series): - if series.dtype == "object": - series = series.str.decode("utf-8") - series = series.str.strip() - series = series.map(lambda x: True if x == "" else x) - return series + """Open netCDF to polars.DataFrame.""" + if not isinstance(self.df, xr.Dataset): + raise TypeError(f"Cannot open with netCDF for {type(self.df) = }") missing_values = [] attrs = {} @@ -382,15 +304,26 @@ def replace_empty_strings(series): elif section in self.df.dims: renames[section] = index elif section in self.df.attrs: - attrs[index] = self.df.attrs[index] + # Initialise a constant column + attrs[index] = pl.lit(self.df.attrs[section].replace("\n", "; ")) else: missing_values.append(index) - df = self.df[renames.keys()].to_dataframe().reset_index() - attrs = {k: v.replace("\n", "; ") for k, v in attrs.items()} - df = df.rename(columns=renames) - df = df.assign(**attrs) - df[disables] = np.nan - df = df.apply(lambda x: replace_empty_strings(x)) - df[missing_values] = False - return pl.from_pandas(df).with_row_index("index") + df: pl.DataFrame = pl.from_pandas( + self.df[renames.keys()].to_dataframe().reset_index() + ).with_row_index("index") + df = df.rename(mapping=renames) + df = df.with_columns(**attrs) + df = df.with_columns( + [pl.lit(None).alias(missing) for missing in missing_values] + ) + df = df.with_columns([pl.lit(None).alias(disable) for disable in disables]) + # Replace empty or whitespace string with None + df = df.with_columns(cs.string().str.strip_chars().replace("", None)) + + # Create missing mask + mask_df = df.select(pl.all().is_not_null()) + mask_df = mask_df.with_columns( + [pl.lit(True).alias(c) for c in missing_values + disables] + ) + return df, mask_df.with_row_index("index") diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 622ca435..00f0b9a3 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -130,11 +130,11 @@ def _read_sections( format, ): if format == "text": - df = Configurator( + df, mask_df = Configurator( df=TextParser, schema=self.schema, order=order, valid=valid ).open_text() elif format == "netcdf": - df = Configurator( + df, mask_df = Configurator( df=TextParser, schema=self.schema, order=order, valid=valid ).open_netcdf() else: @@ -146,7 +146,7 @@ def _read_sections( self.columns = df.columns # Replace None with NaN - is this necessary for polars? 
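
A toy illustration of the (df, mask_df) contract returned at the end of this
hunk; the field names and widths are invented for the sketch and are not the
package schema:

    import polars as pl

    raw = pl.DataFrame({"full_str": ["AB12", "CD  "]})

    # Cut two fixed-width fields off the front of each report string,
    # blanking whitespace-only values to null, much as open_text does.
    data = raw.with_columns(
        pl.col("full_str").str.head(2).alias("f1"),
        pl.col("full_str")
        .str.slice(2, 2)
        .str.strip_chars(" ")
        .replace("", None)
        .alias("f2"),
    ).drop("full_str")

    # The mask is a same-shaped boolean frame: True where a field was read.
    mask = data.select(pl.all().is_not_null())
    # data: f1 = ["AB", "CD"], f2 = ["12", null]
    # mask: f1 = [true, true], f2 = [true, false]
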
# df = df.where(df.notnull(), np.nan) - return df + return df, mask_df def get_configurations(self, order, valid): """DOCUMENTATION.""" From 60e45e019d5b4dcc687c6fd21c8cd834dac56f92 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 27 Mar 2025 14:39:19 +0000 Subject: [PATCH 32/46] refactor: perform all read steps in one loop rather than repeatedly save/write out to StringIO --- cdm_reader_mapper/mdf_reader/reader.py | 112 ++++++++++++++++++------- 1 file changed, 82 insertions(+), 30 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index fc2f34dd..d885d32b 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -10,6 +10,7 @@ import pandas as pd import polars as pl +import xarray as xr from cdm_reader_mapper.common.json_dict import open_json_file from cdm_reader_mapper.common.pandas_TextParser_hdlr import make_copy @@ -61,7 +62,7 @@ def _convert_and_decode( df = df.with_columns(converted.alias(section)) return df - def _validate(self, df): + def _validate(self, df) -> pl.DataFrame: """DOCUMENTATION.""" return validate( data=df, @@ -125,33 +126,17 @@ def convert_and_decode_entries( ) return data - def validate_entries(self, data, validate): + def validate_entries(self, data, validate) -> pl.DataFrame: """Validate data entries by using a pre-defined data model. Fill attribute `valid` with boolean mask. """ if validate is not True: - mask = pd.DataFrame() - elif isinstance(data, pd.DataFrame): + mask = pl.DataFrame() + elif isinstance(data, pl.DataFrame): mask = self._validate(data) else: - data_buffer = StringIO() - TextParser_ = make_copy(data) - for i, df_ in enumerate(TextParser_): - mask_ = self._validate(df_) - mask_.to_csv( - data_buffer, - header=False, - mode="a", - encoding=self.encoding, - index=False, - ) - data_buffer.seek(0) - mask = pd.read_csv( - data_buffer, - names=df_.columns, - chunksize=self.chunksize, - ) + raise TypeError("Unknown data type") return mask def remove_boolean_values(self, data): @@ -260,23 +245,70 @@ def read( logging.info("Getting data string from source...") self.configurations = self.get_configurations(read_sections_list, sections) - data = self.open_data( + TextParser = self.open_data( chunksize=chunksize, format=self.format, ) # 2.3. 
Extract, read and validate data in same loop - logging.info("Extracting and reading sections") - data = self.convert_and_decode_entries( - data, - convert=convert, - decode=decode, - ) - mask = self.validate_entries(data, validate) + if isinstance(TextParser, (pd.DataFrame, xr.Dataset)): + data, mask = self._read_loop( + TextParser, read_sections_list, sections, decode, convert, validate + ) + else: + data_buffer = StringIO() + mask_buffer = StringIO() + for df_ in TextParser: + df, mask_ = self._read_loop( + df_, read_sections_list, sections, decode, convert, validate + ) + df.to_csv( + data_buffer, + header=False, + mode="a", + encoding="utf-8", + index=False, + quoting=csv.QUOTE_NONE, + sep=properties.internal_delimiter, + quotechar="\0", + escapechar="\0", + ) + mask_.to_csv( + mask_buffer, + header=False, + mode="a", + encoding="utf-8", + index=False, + quoting=csv.QUOTE_NONE, + sep=properties.internal_delimiter, + quotechar="\0", + escapechar="\0", + ) + data_buffer.seek(0) + data = pd.read_csv( + data_buffer, + names=self.columns, + chunksize=self.chunksize, + dtype=object, + parse_dates=self.parse_dates, + delimiter=properties.internal_delimiter, + quotechar="\0", + escapechar="\0", + ) + mask_buffer.seek(0) + mask = pd.read_csv( + mask_buffer, + names=self.columns, + chunksize=self.chunksize, + dtype=object, + parse_dates=self.parse_dates, + delimiter=properties.internal_delimiter, + quotechar="\0", + escapechar="\0", + ) # 3. Create output DataBundle object logging.info("Create output DataBundle object") - data = self.remove_boolean_values(data) return DataBundle( data=data, columns=self.columns, @@ -287,6 +319,26 @@ def read( imodel=self.imodel, ) + def _read_loop( + self, TextParser, order, valid, decode, convert, validate + ) -> tuple[pd.DataFrame, pd.DataFrame]: + logging.info("Extracting and reading sections") + data, mask = self._read_sections( + pl.from_pandas(TextParser), order, valid, format=self.format + ) + + logging.info("Decoding and converting entries") + data = self.convert_and_decode_entries( + data, + convert=convert, + decode=decode, + ) + + logging.info("Extracting and reading sections") + mask = self.validate_entries(data, validate) + + return data.to_pandas(), mask.to_pandas() + def read_mdf( source, From d84e119b6dc27ed8d999beff263cb6ac265f69ec Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 27 Mar 2025 14:46:00 +0000 Subject: [PATCH 33/46] fix: remove duplicate "widths" argument being passed to _read_text method --- cdm_reader_mapper/mdf_reader/utils/filereader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 00f0b9a3..91312125 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -173,7 +173,6 @@ def open_data( elif format == "text": TextParser = self._read_text( encoding=encoding, - widths=[properties.MAX_FULL_REPORT_WIDTH], skiprows=self.skiprows, chunksize=chunksize, ) From b738f2c25008a96ea510c29a4d8c3997a6641c39 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Thu, 27 Mar 2025 14:50:12 +0000 Subject: [PATCH 34/46] fix: set column name for full-string read by read_text --- cdm_reader_mapper/mdf_reader/utils/filereader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py index 91312125..afd09275 100755 --- a/cdm_reader_mapper/mdf_reader/utils/filereader.py +++ 
b/cdm_reader_mapper/mdf_reader/utils/filereader.py @@ -114,6 +114,7 @@ def _read_text(self, **kwargs): dtype=object, skip_blank_lines=False, widths=[properties.MAX_FULL_REPORT_WIDTH], + names=["full_str"], **kwargs, ) From 97e60fde9b23f988aa9af1e933fea12d109f36ef Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 09:33:21 +0000 Subject: [PATCH 35/46] fix: correct call to get field name from _get_index --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index d2063948..7eeeb45d 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -197,7 +197,7 @@ def open_text(self): if delimiter_format == "delimited": # Read as CSV field_names = fields.keys() - field_names = [self._get_index(section, x) for x in field_names] + field_names = [self._get_index(x, section) for x in field_names] n_fields = len(field_names) self.df = self.df.with_columns( pl.col(section) From b59ccd53567ea729653ada68b70834c72fc3b27c Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 09:34:16 +0000 Subject: [PATCH 36/46] fix: cast binary to string --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 7eeeb45d..35ba9d49 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -319,7 +319,9 @@ def open_netcdf(self): ) df = df.with_columns([pl.lit(None).alias(disable) for disable in disables]) # Replace empty or whitespace string with None - df = df.with_columns(cs.string().str.strip_chars().replace("", None)) + df = df.with_columns( + cs.binary().cast(pl.String).str.strip_chars().replace("", None) + ) # Create missing mask mask_df = df.select(pl.all().is_not_null()) From 219f75a3f8fdcdc886e2bc5a68a4b572f617122c Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 09:34:43 +0000 Subject: [PATCH 37/46] fix: correct column name indexing and naming for mask --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 35ba9d49..2772ec8b 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -261,8 +261,8 @@ def open_text(self): ] ) mask_df = mask_df.with_columns( - (section_missing | self.df.get_column(field).is_not_null()).alias( - field + (section_missing | self.df.get_column(index).is_not_null()).alias( + index ) ) if delimiter is not None: From e3d450cf6e93ca6b07af49a37c512a98906c9320 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 09:35:09 +0000 Subject: [PATCH 38/46] fix: drop section from data if delimited --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 2772ec8b..1e0ba124 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -215,6 +215,7 @@ def open_text(self): | self.df.get_column(field).is_not_null() ).alias(field) ) + 
self.df = self.df.drop([section]) continue elif field_layout != "fixed_width": From e38b9a1413b5a45a474a0231d1e0cbe38e117b80 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 09:35:32 +0000 Subject: [PATCH 39/46] fix: add row index at return --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 1e0ba124..35535a7a 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -312,7 +312,7 @@ def open_netcdf(self): df: pl.DataFrame = pl.from_pandas( self.df[renames.keys()].to_dataframe().reset_index() - ).with_row_index("index") + ) df = df.rename(mapping=renames) df = df.with_columns(**attrs) df = df.with_columns( @@ -329,4 +329,4 @@ def open_netcdf(self): mask_df = mask_df.with_columns( [pl.lit(True).alias(c) for c in missing_values + disables] ) - return df, mask_df.with_row_index("index") + return df.with_row_index("index"), mask_df.with_row_index("index") From 536406cab0fdf7adc5081a26a69c29e647486d34 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 09:36:16 +0000 Subject: [PATCH 40/46] opt: use tail rather than slice --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index 35535a7a..d7212c14 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -236,7 +236,7 @@ def open_text(self): if ignore: # Move to next field self.df = self.df.with_columns( - pl.col(section).str.slice(field_length).name.keep(), + pl.col(section).str.tail(-field_length).name.keep(), ) if delimiter is not None: self.df = self.df.with_columns( From d905c236d40a1931d9040bc546ab8716f2aac09d Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 09:36:45 +0000 Subject: [PATCH 41/46] fix: don't convert to polars in read_loop --- cdm_reader_mapper/mdf_reader/reader.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index d885d32b..0b6f1cf2 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -6,7 +6,7 @@ import csv import logging import os -from io import StringIO as StringIO +from io import StringIO import pandas as pd import polars as pl @@ -323,9 +323,7 @@ def _read_loop( self, TextParser, order, valid, decode, convert, validate ) -> tuple[pd.DataFrame, pd.DataFrame]: logging.info("Extracting and reading sections") - data, mask = self._read_sections( - pl.from_pandas(TextParser), order, valid, format=self.format - ) + data, mask = self._read_sections(TextParser, order, valid, format=self.format) logging.info("Decoding and converting entries") data = self.convert_and_decode_entries( From 2b077c5e72f0164d3480dd50835b10eda759dad1 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 13:46:31 +0000 Subject: [PATCH 42/46] fix: following polars method column name --- cdm_reader_mapper/mdf_reader/schemas/schemas.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/schemas/schemas.py b/cdm_reader_mapper/mdf_reader/schemas/schemas.py index d4cea053..2afe668a 100755 --- 
a/cdm_reader_mapper/mdf_reader/schemas/schemas.py +++ b/cdm_reader_mapper/mdf_reader/schemas/schemas.py @@ -243,7 +243,7 @@ def clean_schema(columns, schema): def get_index(idx, lst, section): if len(lst) == 1: return idx - return (section, idx) + return ":".join([section, idx]) flat_schema = dict() for section in schema.get("sections"): From e849ff34e6fe2a6482d9d8f66fd59cefd4778a78 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 13:47:04 +0000 Subject: [PATCH 43/46] fix: don't add index to data and mask polars frames --- cdm_reader_mapper/mdf_reader/utils/configurator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/configurator.py b/cdm_reader_mapper/mdf_reader/utils/configurator.py index d7212c14..effe3fb3 100755 --- a/cdm_reader_mapper/mdf_reader/utils/configurator.py +++ b/cdm_reader_mapper/mdf_reader/utils/configurator.py @@ -143,7 +143,7 @@ def open_text(self): self.df = pl.from_pandas(self.df) if not isinstance(self.df, pl.DataFrame): raise TypeError(f"Cannot open with polars for {type(self.df) = }") - self.df = self.df.with_row_index("index") + self.df = self.df mask_df = pl.DataFrame() for section in self.orders: header = self.schema["sections"][section]["header"] @@ -273,7 +273,7 @@ def open_text(self): self.df = self.df.drop([section]) - return self.df.drop("full_str"), mask_df.with_row_index("index") + return self.df.drop("full_str"), mask_df def open_netcdf(self): """Open netCDF to polars.DataFrame.""" @@ -329,4 +329,4 @@ def open_netcdf(self): mask_df = mask_df.with_columns( [pl.lit(True).alias(c) for c in missing_values + disables] ) - return df.with_row_index("index"), mask_df.with_row_index("index") + return df, mask_df From 90fd5d93ba805dd96667bdfaf3941a6c51785a8e Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 13:47:31 +0000 Subject: [PATCH 44/46] refactor(validators)!: polarise validators, pass mask as first argument --- cdm_reader_mapper/mdf_reader/reader.py | 16 ++- .../mdf_reader/utils/validators.py | 134 ++++++++---------- 2 files changed, 74 insertions(+), 76 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index 0b6f1cf2..0a514b74 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -62,10 +62,11 @@ def _convert_and_decode( df = df.with_columns(converted.alias(section)) return df - def _validate(self, df) -> pl.DataFrame: + def _validate(self, df, mask) -> pl.DataFrame: """DOCUMENTATION.""" return validate( data=df, + mask=mask, imodel=self.imodel, ext_table_path=self.ext_table_path, schema=self.schema, @@ -126,7 +127,7 @@ def convert_and_decode_entries( ) return data - def validate_entries(self, data, validate) -> pl.DataFrame: + def validate_entries(self, data, mask, validate) -> pl.DataFrame: """Validate data entries by using a pre-defined data model. Fill attribute `valid` with boolean mask. 
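The hunk above threads the parser's missing-value mask into `validate`, so the validators extend an existing mask rather than building a fresh one from scratch. A minimal standalone sketch of that OR-composition in polars, with a hypothetical column name and bounds (not the project's API):

    import polars as pl

    def or_range_check(mask: pl.DataFrame, data: pl.DataFrame,
                       col: str, lo: float, hi: float) -> pl.DataFrame:
        """Fold a numeric range check into an existing mask column, mirroring
        the composition used in validate_numeric: rows the upstream mask has
        already accepted (for instance, fields from a section that is absent
        in a given report) pass unconditionally; everything else must be
        present and within bounds."""
        in_range = (
            data.get_column(col).is_not_null()
            & data.get_column(col).is_between(lo, hi, closed="both").fill_null(False)
        )
        return mask.with_columns((mask.get_column(col) | in_range).alias(col))

    data = pl.DataFrame({"AT": [12.3, None, 99.9]})
    mask = pl.DataFrame({"AT": [False, True, False]})  # hypothetical upstream mask
    print(or_range_check(mask, data, "AT", -90.0, 60.0))
    # AT: [true, true, false] -- only the out-of-range 99.9 ends up flagged

Because the mask is an argument instead of a local, each validation stage composes with the ones before it, which is what the signature change in `_validate` and `validate_entries` below is setting up.
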
@@ -134,7 +135,7 @@ def validate_entries(self, data, validate) -> pl.DataFrame: if validate is not True: mask = pl.DataFrame() elif isinstance(data, pl.DataFrame): - mask = self._validate(data) + mask = self._validate(data, mask) else: raise TypeError("Unknown data type") return mask @@ -333,9 +334,14 @@ def _read_loop( ) logging.info("Extracting and reading sections") - mask = self.validate_entries(data, validate) + mask = self.validate_entries(data, mask, validate) + + renames = {c: tuple(c.split(":")) for c in data.columns} - return data.to_pandas(), mask.to_pandas() + return ( + data.to_pandas().rename(columns=renames), + mask.to_pandas().rename(columns=renames), + ) def read_mdf( diff --git a/cdm_reader_mapper/mdf_reader/utils/validators.py b/cdm_reader_mapper/mdf_reader/utils/validators.py index a368cc18..ee773285 100755 --- a/cdm_reader_mapper/mdf_reader/utils/validators.py +++ b/cdm_reader_mapper/mdf_reader/utils/validators.py @@ -5,7 +5,7 @@ import logging import numpy as np -import pandas as pd +import polars as pl from .. import properties from ..codes import codes @@ -13,37 +13,28 @@ from .utilities import convert_str_boolean -def validate_datetime(elements, data): +def validate_datetime(mask, elements, data: pl.DataFrame): """DOCUMENTATION.""" + for element in elements: + col = data.get_column(element) + if not col.dtype.is_temporal(): + mask = mask.with_columns(pl.lit(False).alias(element)) + continue - def is_date_object(object): - if hasattr(object, "year"): - return True + mask = mask.with_columns((pl.col(element) & col.is_not_null()).alias(element)) - mask = pd.DataFrame(index=data.index, data=False, columns=elements) - mask[elements] = ( - data[elements].apply(np.vectorize(is_date_object)) | data[elements].isna() - ) return mask -def validate_numeric(elements, data, schema): +def validate_numeric(mask: pl.DataFrame, elements, data: pl.DataFrame, schema): """DOCUMENTATION.""" - - # Find thresholds in schema. 
Flag if not available -> warn - def _to_numeric(x): - if x is None: - return np.nan - x = convert_str_boolean(x) - if isinstance(x, bool): - return x - return float(x) - - data[elements] = data[elements].map(_to_numeric) - mask = pd.DataFrame(index=data.index, data=False, columns=elements) lower = {x: schema.get(x).get("valid_min", -np.inf) for x in elements} upper = {x: schema.get(x).get("valid_max", np.inf) for x in elements} + # Handle cases where value is explicitly None in the dictionary + lower.update({k: -np.inf for k, v in lower.items() if v is None}) + upper.update({k: np.inf for k, v in upper.items() if v is None}) + set_elements = [ x for x in lower.keys() if lower.get(x) != -np.inf and upper.get(x) != np.inf ] @@ -57,26 +48,45 @@ def _to_numeric(x): logging.warning( "Corresponding upper and/or lower bounds set to +/-inf for validation" ) - mask[elements] = ( - (data[elements] >= [lower.get(x) for x in elements]) - & (data[elements] <= [upper.get(x) for x in elements]) - ) | data[elements].isna() + + mask = mask.with_columns( + [ + ( + pl.col(element) + | ( + data[element].is_not_null() + & data[element].is_not_nan() + & data[element].is_between( + lower.get(element), upper.get(element), closed="both" + ) + ) + ).alias(element) + for element in elements + ] + ) return mask -def validate_str(elements, data): +def validate_str(mask, elements, data): """DOCUMENTATION.""" - return pd.DataFrame(index=data.index, data=True, columns=elements) + return mask -def validate_codes(elements, data, schema, imodel, ext_table_path): +def validate_codes( + mask: pl.DataFrame, + elements: list[str], + data: pl.DataFrame, + schema, + imodel, + ext_table_path, +): """DOCUMENTATION.""" - mask = pd.DataFrame(index=data.index, data=False, columns=elements) for element in elements: code_table_name = schema.get(element).get("codetable") if not code_table_name: logging.error(f"Code table not defined for element {element}") logging.warning("Element mask set to False") + mask = mask.with_columns(pl.lit(False).alias(element)) continue table = codes.read_table( @@ -90,16 +100,15 @@ def validate_codes(elements, data, schema, imodel, ext_table_path): dtype = properties.polars_dtypes.get(schema.get(element).get("column_type")) table_keys = list(table.keys()) - validation_df = data[element] - value = validation_df.astype(dtype).astype("str") - valid = validation_df.notna() - mask_ = value.isin(table_keys) - mask[element] = mask_.where(valid, True) + col = data.get_column(element) + col = col.cast(dtype).cast(pl.String) + valid = col.is_not_null() & col.is_not_nan() & col.is_in(table_keys) + mask = mask.with_columns((pl.col(element) | valid).alias(element)) return mask -def _get_elements(elements, element_atts, key): +def _get_elements(elements, element_atts, key) -> list[str]: def _condition(x): column_types = element_atts.get(x).get("column_type") if key == "numeric_types": @@ -125,7 +134,8 @@ def _mask_boolean(x, boolean): def validate( - data, + data: pl.DataFrame, + mask: pl.DataFrame, imodel, ext_table_path, schema, @@ -159,37 +169,26 @@ def validate( filename=None, ) # Check input - if not isinstance(data, pd.DataFrame): # or not isinstance(mask0, pd.DataFrame): + if not isinstance(data, pl.DataFrame) or not isinstance(mask, pl.DataFrame): # logging.error("Input data and mask must be a pandas data frame object") logging.error("input data must be a pandas DataFrame.") return - mask = pd.DataFrame(index=data.index, columns=data.columns, dtype=object) - if data.empty: + if data.is_empty(): return mask + 
disables = disables or [] + # Get the data elements from the input data: might be just a subset of # data model and flatten the schema to get a simple and sequential list # of elements included in the input data - elements = [x for x in data if x not in disables] + elements = [x for x in data.columns if x not in disables] element_atts = schemas.df_schema(elements, schema) - # See what elements we need to validate + # 1. Numeric elements numeric_elements = _get_elements(elements, element_atts, "numeric_types") - datetime_elements = _get_elements(elements, element_atts, "datetime") - coded_elements = _get_elements(elements, element_atts, "key") - str_elements = _get_elements(elements, element_atts, "str") - - if _element_tuples(numeric_elements, datetime_elements, coded_elements): - validated_columns = pd.MultiIndex.from_tuples( - list(set(numeric_elements + coded_elements + datetime_elements)) - ) - else: - validated_columns = list( - set(numeric_elements + coded_elements + datetime_elements) - ) - - mask[numeric_elements] = validate_numeric(numeric_elements, data, element_atts) + print(numeric_elements) + mask = validate_numeric(mask, numeric_elements, data, element_atts) # 2. Table coded elements # See following: in multiple keys code tables, the non parameter element, @@ -201,8 +200,10 @@ def validate( # Get the full list of keys combinations (tuples, triplets...) and check the column combination against that: if it fails, mark the element! # Need to see how to grab the YEAR part of a datetime when YEAR comes from a datetime element # pd.DatetimeIndex(df['_datetime']).year + coded_elements = _get_elements(elements, element_atts, "key") if len(coded_elements) > 0: - mask[coded_elements] = validate_codes( + mask = validate_codes( + mask, coded_elements, data, element_atts, @@ -211,21 +212,12 @@ def validate( ) # 3. Datetime elements - mask[datetime_elements] = validate_datetime(datetime_elements, data) + datetime_elements = _get_elements(elements, element_atts, "datetime") + mask = validate_datetime(mask, datetime_elements, data) # 4. str elements - mask[str_elements] = validate_str(str_elements, data) - - # 5. Set False values - mask[validated_columns] = mask[validated_columns].mask( - data[validated_columns].map(_mask_boolean, boolean=False), - False, - ) - - mask[validated_columns] = mask[validated_columns].mask( - data[validated_columns].map(_mask_boolean, boolean=True), - True, - ) + str_elements = _get_elements(elements, element_atts, "str") + mask = validate_str(mask, str_elements, data) - mask[disables] = np.nan + mask = mask.with_columns([pl.lit(None).alias(disable) for disable in disables]) return mask From 4973677daccaea6fc25661131307bf73f99c3b76 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 13:54:30 +0000 Subject: [PATCH 45/46] chore: remove debug print statement --- cdm_reader_mapper/mdf_reader/utils/validators.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/validators.py b/cdm_reader_mapper/mdf_reader/utils/validators.py index ee773285..d03f88ab 100755 --- a/cdm_reader_mapper/mdf_reader/utils/validators.py +++ b/cdm_reader_mapper/mdf_reader/utils/validators.py @@ -187,7 +187,6 @@ def validate( # 1. Numeric elements numeric_elements = _get_elements(elements, element_atts, "numeric_types") - print(numeric_elements) mask = validate_numeric(mask, numeric_elements, data, element_atts) # 2. 
Table coded elements From 56ab3fe415b5241ef41afe493ab2424486534433 Mon Sep 17 00:00:00 2001 From: jtsiddons Date: Fri, 28 Mar 2025 15:24:39 +0000 Subject: [PATCH 46/46] fix: reduce complexity, handle explicit None in schema for numeric bounds --- cdm_reader_mapper/mdf_reader/utils/validators.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cdm_reader_mapper/mdf_reader/utils/validators.py b/cdm_reader_mapper/mdf_reader/utils/validators.py index d03f88ab..7f2fd0d1 100755 --- a/cdm_reader_mapper/mdf_reader/utils/validators.py +++ b/cdm_reader_mapper/mdf_reader/utils/validators.py @@ -28,12 +28,9 @@ def validate_datetime(mask, elements, data: pl.DataFrame): def validate_numeric(mask: pl.DataFrame, elements, data: pl.DataFrame, schema): """DOCUMENTATION.""" - lower = {x: schema.get(x).get("valid_min", -np.inf) for x in elements} - upper = {x: schema.get(x).get("valid_max", np.inf) for x in elements} - # Handle cases where value is explicitly None in the dictionary - lower.update({k: -np.inf for k, v in lower.items() if v is None}) - upper.update({k: np.inf for k, v in upper.items() if v is None}) + lower = {x: schema.get(x).get("valid_min") or -np.inf for x in elements} + upper = {x: schema.get(x).get("valid_max") or np.inf for x in elements} set_elements = [ x for x in lower.keys() if lower.get(x) != -np.inf and upper.get(x) != np.inf
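
A closing note on the `or -np.inf` coalescing in this last patch: `or` folds a missing key and an explicit `None` into the same default, which is the stated intent, but it also fires on any falsy value, so a legitimate bound of `0` would silently become `-inf`. If a schema can ever carry zero-valued bounds, a None-only coalesce is the safer spelling; a short sketch under that assumption (the schema entry here is hypothetical, not from the repository):

    import numpy as np

    def coalesce_bound(value, default):
        """Fall back to the default only for None, unlike `or`, which also
        triggers on falsy numbers such as 0."""
        return default if value is None else value

    schema = {"AT": {"valid_min": 0, "valid_max": None}}
    lower = {x: coalesce_bound(schema[x].get("valid_min"), -np.inf) for x in schema}
    upper = {x: coalesce_bound(schema[x].get("valid_max"), np.inf) for x in schema}
    print(lower, upper)  # {'AT': 0} {'AT': inf} -- the zero bound survives

    # The `or` spelling gives {'AT': -inf} for the same schema, because 0 is falsy.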