From 0b0247587bd1b4c2565c62584d913a7839ab8018 Mon Sep 17 00:00:00 2001 From: Jesse Cusack Date: Sat, 3 Jan 2026 23:00:00 -0800 Subject: [PATCH] refactor the merging functionality --- src/glide/ancillery.py | 29 ++++++++++++++ src/glide/cli.py | 82 +++++++++++++++++++++++++++------------- src/glide/process_l1.py | 36 +++++++++--------- src/glide/process_l2.py | 6 +-- src/glide/process_l3.py | 26 ++----------- tests/test_process_l1.py | 5 +-- 6 files changed, 110 insertions(+), 74 deletions(-) create mode 100644 src/glide/ancillery.py diff --git a/src/glide/ancillery.py b/src/glide/ancillery.py new file mode 100644 index 0000000..0e85b48 --- /dev/null +++ b/src/glide/ancillery.py @@ -0,0 +1,29 @@ +# Functions for reading non-glide files +import logging + +import xarray as xr + +_log = logging.getLogger(__name__) + + +# Public functions + + +def concat(file_list: list[str], concat_dim: str = "time") -> xr.Dataset: + _log.debug("Loading files") + return xr.open_mfdataset( + file_list, + concat_dim=concat_dim, + combine="nested", + compat="override", + coords="minimal", + decode_timedelta=False, + data_vars="minimal", + ).load() + + +def parse_q(q_file: str) -> xr.Dataset: + _log.debug("Loading Q files") + return xr.open_mfdataset(q_file, decode_timedelta=False)[ + ["e_1", "e_2", "pressure"] + ].load() diff --git a/src/glide/cli.py b/src/glide/cli.py index 7dabeee..464fe30 100644 --- a/src/glide/cli.py +++ b/src/glide/cli.py @@ -6,10 +6,11 @@ from importlib.metadata import version from pathlib import Path +import netCDF4 as nc import typer from typing_extensions import Annotated -from . import config, hotel, process_l1, process_l2, process_l3 +from . 
import ancillery, config, hotel, process_l1, process_l2, process_l3 _log = logging.getLogger(__name__) @@ -86,7 +87,9 @@ def l1b( """ conf = config.load_config(config_file) - ds = process_l1.parse_l1(file, conf) + ds = process_l1.parse_l1(file) + + ds = process_l1.format_l1(ds, conf) ds = process_l1.apply_qc(ds, conf) @@ -115,8 +118,11 @@ def l2( """ conf = config.load_config(config_file) - flt = process_l1.parse_l1(flt_file, conf) - sci = process_l1.parse_l1(sci_file, conf) + flt = process_l1.parse_l1(flt_file) + sci = process_l1.parse_l1(sci_file) + + flt = process_l1.format_l1(flt, conf) + sci = process_l1.format_l1(sci, conf) flt = process_l1.apply_qc(flt, conf) sci = process_l1.apply_qc(sci, conf) @@ -163,7 +169,7 @@ def l3( if q_netcdf is not None: conf = config.load_config(config_file) - q = process_l3.parse_q(q_netcdf) + q = ancillery.parse_q(q_netcdf) out = process_l3.bin_q(out, q, bin_size, conf) @@ -172,45 +178,69 @@ @app.command() @log_args -def ml3( - l3_file: Annotated[str, typer.Argument(help="The L3 dataset.")], - out_file: _out_file_annotation = "slocum.l3.nc", - q_netcdf: Annotated[ - str | None, - typer.Option("--q-in", "-q", help="netCDF file(s) processed by q2netcdf."), - ] = None, +def merge( + glide_file: Annotated[ + str, typer.Argument(help="A L2 or L3 dataset produced by glide.") + ], + input_file: Annotated[str, typer.Argument(help="Input file(s) of a given type.")], + file_type: Annotated[ + str, + typer.Argument( + help="Choose 'q' for q2netcdf output file, 'p' for p2netcdf output file." + ), + ], + out_file: _out_file_annotation = "slocum.merged.nc", config_file: _config_annotation = None, overwrite: Annotated[ bool, typer.Option( "--overwrite", "-w", - help="Overwrite the existing L3 dataset if it exists.", + help="Overwrite the existing dataset if it exists.", ), ] = False, ) -> None: """ - Merge ancillary data into L3 data. + Merge ancillary data into L2 or L3 data. 
""" - # I could remove the defaul argument to enforce this rule but I am anticipating that - # in the future we may want to merge other kinds of data into the L3 dataset. - if q_netcdf is None: - raise typer.BadParameter("The --q-in option is required for ml3 command.") - if not overwrite and Path(out_file).exists(): + if file_type not in ["q", "p"]: + raise typer.BadParameter(f"The file type {file_type} must be q or p.") + + if Path(out_file).exists() and not overwrite: raise typer.BadParameter( f"The output file {out_file} already exists. Use --overwrite to overwrite it." ) - l3, bin_size = process_l3.parse_l3(l3_file) + # Figure out the processig level of the input + input_file_level = -1 + ds = nc.Dataset(glide_file) + dataset_dims = set(ds.dimensions) + ds.close() + + if dataset_dims == {"time"}: + input_file_level = 2 + elif dataset_dims == {"profile_id", "z"}: + input_file_level = 3 + else: + raise ValueError( + f"Could not determine processing level of input file {glide_file} with dimensions {dataset_dims}" + ) conf = config.load_config(config_file) - q = process_l3.parse_q(q_netcdf) - - out = process_l3.bin_q(l3, q, bin_size, conf) - - out.to_netcdf(out_file) + if file_type == "q": + if input_file_level == 3: + l3, bin_size = process_l3.parse_l3(glide_file) + q = ancillery.parse_q(input_file) + out = process_l3.bin_q(l3, q, bin_size, conf) + out.to_netcdf(out_file) + else: + raise NotImplementedError( + "Merging q files only supported for level 3 data." + ) + if file_type == "p": + raise NotImplementedError("Merging of p files is not yet supported.") @app.command() @@ -260,7 +290,7 @@ def concat( """ Concatenate multiple netCDF files along a dimension. 
""" - ds = process_l3.concat(files, concat_dim=concat_dim) + ds = ancillery.concat(files, concat_dim=concat_dim) ds.to_netcdf(out_file) diff --git a/src/glide/process_l1.py b/src/glide/process_l1.py index 1bc62f0..cba027f 100644 --- a/src/glide/process_l1.py +++ b/src/glide/process_l1.py @@ -17,22 +17,6 @@ # Helper functions -def _load_l1_file(file: str | xr.Dataset) -> xr.Dataset: - if isinstance(file, str): - _log.debug("Parsing L1 %s", file) - try: - ds = xr.open_dataset(file, decode_timedelta=True).drop_dims("j").load() - _log.debug("xarray.open_dataset opened %s", file) - except ValueError: - ds = pd.read_csv(file).to_xarray() - _log.debug("pandas.read_csv opened %s", file) - elif isinstance(file, xr.Dataset): # Primarily for testing - ds = file - else: - raise ValueError(f"Expected type str or xarray.Dataset but got {type(file)}") - return ds - - def _fix_time_varaiable_conflict(ds: xr.Dataset) -> xr.Dataset: """This fixes conflicting time variable names when parsing a combined flight/science data. 
Generally, they should be parsed separately.""" @@ -87,10 +71,24 @@ def _format_variables( # Public API functions -def parse_l1(file: str | xr.Dataset, config: dict) -> xr.Dataset: - """Parses flight (sbd) or science (tbd) data processed by dbd2netcdf or dbd2csv.""" +def parse_l1(file: str | xr.Dataset) -> xr.Dataset: + if isinstance(file, str): + _log.debug("Parsing L1 %s", file) + try: + ds = xr.open_dataset(file, decode_timedelta=True).drop_dims("j").load() + _log.debug("xarray.open_dataset opened %s", file) + except ValueError: + ds = pd.read_csv(file).to_xarray() + _log.debug("pandas.read_csv opened %s", file) + elif isinstance(file, xr.Dataset): # Primarily for testing + ds = file + else: + raise ValueError(f"Expected type str or xarray.Dataset but got {type(file)}") + return ds + - ds = _load_l1_file(file) +def format_l1(ds: xr.Dataset, config: dict) -> xr.Dataset: + """Parses flight (sbd) or science (tbd) data processed by dbd2netcdf or dbd2csv.""" ds = _fix_time_varaiable_conflict(ds) diff --git a/src/glide/process_l2.py b/src/glide/process_l2.py index 2372182..ddd1acb 100644 --- a/src/glide/process_l2.py +++ b/src/glide/process_l2.py @@ -1,4 +1,4 @@ -# Level 3 processing of the level 2 processed data +# Level 3 processing of the level 2 data # Data are binned in depth import logging @@ -54,8 +54,8 @@ def _get_profile_indexes(ds: xr.Dataset) -> NDArray: # Public functions -def parse_l2(l2_file: str) -> xr.Dataset: - return xr.open_dataset(l2_file, decode_timedelta=True).load() +def parse_l2(file: str) -> xr.Dataset: + return xr.open_dataset(file, decode_timedelta=True).load() def bin_l2( diff --git a/src/glide/process_l3.py b/src/glide/process_l3.py index 516e92c..b138344 100644 --- a/src/glide/process_l3.py +++ b/src/glide/process_l3.py @@ -1,4 +1,4 @@ -# Additional processing of the l3 data, including the assimiation +# Additional processing of the l3 data, including the assimilation # of other variables such as epsilon. 
import logging @@ -18,33 +18,13 @@ def _infer_bin_size(ds: xr.Dataset) -> float: # Public functions -def concat(file_list: list[str], concat_dim: str = "time") -> xr.Dataset: - _log.debug("Loading files") - return xr.open_mfdataset( - file_list, - concat_dim=concat_dim, - combine="nested", - compat="override", - coords="minimal", - decode_timedelta=False, - data_vars="minimal", - ).load() - - -def parse_l3(l3_file: str) -> tuple[xr.Dataset, float]: - ds = xr.open_dataset(l3_file, decode_timedelta=True).load() +def parse_l3(file: str) -> tuple[xr.Dataset, float]: + ds = xr.open_dataset(file, decode_timedelta=True).load() bin_size = _infer_bin_size(ds) ds.close() # Will enable overwrite of existing l3 file. return ds, bin_size -def parse_q(q_file: str) -> xr.Dataset: - _log.debug("Loading Q files") - return xr.open_mfdataset(q_file, decode_timedelta=False)[ - ["e_1", "e_2", "pressure"] - ].load() - - def bin_q( ds: xr.Dataset, ds_q: xr.Dataset, bin_size: float, config: dict ) -> xr.Dataset: diff --git a/tests/test_process_l1.py b/tests/test_process_l1.py index a4d3ad9..a52b7dd 100644 --- a/tests/test_process_l1.py +++ b/tests/test_process_l1.py @@ -26,6 +26,5 @@ def test_format_variables() -> None: def test_parse_l1() -> None: - config = load_config() - pl1.parse_l1(get_test_data("684", "sbd"), config) - pl1.parse_l1(get_test_data("684", "tbd"), config) + pl1.parse_l1(get_test_data("684", "sbd")) + pl1.parse_l1(get_test_data("684", "tbd"))