# Functions to parse the RIOT acoustics data and write to CSV

import logging
import os
from collections import OrderedDict

import numpy as np
import xarray as xr

_log = logging.getLogger(__name__)


def write_riot_csv(ds: xr.Dataset, add_positions: bool, output_path: str) -> None:
    """Write an xarray Dataset to a RIOT-formatted CSV file.

    The output is a wide, record-oriented CSV (one row per ping) whose
    columns correspond to the fixed RIOT variables expected in a RIOT
    Data User manual file format.

    At a minimum, the following RIOT ping fields must be present in
    ``ds`` and are written as individual columns:

    - ``sr_ping_epoch_days``
    - ``sr_ping_secs``
    - ``sr_ping_msecs``
    - ``sr_ping_rt_msecs``
    - ``sr_ping_freq``
    - ``sr_ping_detection_level``
    - ``sr_ping_sequence_number``
    - ``sr_ping_platform_id``
    - ``sr_ping_slot``
    - ``sr_ping_group``
    - ``sr_platform_state``

    If ``add_positions`` is True and the dataset contains them, the
    position variables ``depth``, ``lat``, and ``lon`` are interpolated
    onto the ping timestamps and included as additional columns.

    Parameters
    ----------
    ds : xr.Dataset
        L2 dataset holding the RIOT ping variables along a ``time`` dim.
    add_positions : bool
        When True, interpolate and append depth/lat/lon columns.
    output_path : str
        Destination CSV path. If the file already exists, records are
        appended and the header row is not repeated.
    """
    riot_vars = [
        "sr_ping_epoch_days",
        "sr_ping_secs",
        "sr_ping_msecs",
        "sr_ping_rt_msecs",
        "sr_ping_freq",
        "sr_ping_detection_level",
        "sr_ping_sequence_number",
        "sr_ping_platform_id",
        "sr_ping_slot",
        "sr_ping_group",
        "sr_platform_state",
    ]
    # Dataset-variable -> CSV-column names, in the exact column order
    # mandated by the RIOT User data manual.
    csv_columns_map = OrderedDict(
        {
            "sr_ping_rt_msecs": "rtMsecs",
            "sr_ping_freq": "freq",
            "sr_ping_detection_level": "detectionLevel",
            "sr_ping_sequence_number": "sequenceNumber",
            "sr_ping_group": "group",
            "sr_ping_slot": "slot",
            "sr_ping_platform_id": "platformId",
            "sr_platform_state": "platformState",
        }
    )

    # Check that all required RIOT variables are present in the dataset.
    # Compute the missing set once instead of issubset + difference.
    missing_vars = set(riot_vars).difference(ds.data_vars)
    if missing_vars:
        _log.error("Dataset is missing required RIOT variables: %s", missing_vars)
        return

    # Drop any variables that are not needed for RIOT output
    _log.debug("Gathering RIOT variables %s", output_path)
    vars_to_drop = set(ds.variables).difference(riot_vars)
    riot_ds = ds.drop_vars(vars_to_drop)
    if riot_ds.sizes.get("time", 0) == 0:
        _log.error("Time dimension of RIOT dataset is empty, no data to write to CSV")
        return

    # ToDo: this drop zeros section should be moved to processing L2
    # for issue#32, but finish the riot_csv branch first
    # Drop any records with all zeros or NaNs
    temp_riot_array = riot_ds.to_array()
    rows_to_keep = np.logical_not(
        np.all(np.logical_or(np.isnan(temp_riot_array), temp_riot_array == 0), axis=0)
    )
    riot_ds = riot_ds.where(rows_to_keep, drop=True)
    if riot_ds.sizes["time"] == 0:
        _log.error("All RIOT records are zeros or NaNs, no data to write to CSV")
        return

    # typecasting according to RIOT User data manual
    # NOTE(review): a NaN surviving in any of these three fields on a kept
    # row would produce an undefined int64 under astype — assumes the epoch
    # fields are never partially NaN after the all-zeros/NaNs drop; confirm.
    epoch_days = riot_ds["sr_ping_epoch_days"].values.astype(np.int64)
    secs = riot_ds["sr_ping_secs"].values.astype(np.int64)
    msecs = riot_ds["sr_ping_msecs"].values.astype(np.int64)
    # calculate the epoch time in milliseconds
    epoch_msecs = np.empty_like(epoch_days, dtype=np.int64)
    epoch_msecs[:] = epoch_days * 86400 * 1000 + secs * 1000 + msecs

    # converting everything to Int64 type (pandas nullable integer) makes it
    # all integers but with 'NA' as a missing value, which will fill in as
    # blank in the CSV.
    riot_df = riot_ds.to_dataframe().astype("Int64")

    # drop the columns that were only needed to create epoch_msecs
    riot_df = riot_df.drop(
        ["sr_ping_epoch_days", "sr_ping_secs", "sr_ping_msecs"], axis=1
    )

    # rename columns and reorder to match RIOT User data manual format
    riot_df = riot_df.rename(columns=csv_columns_map)[csv_columns_map.values()]

    # Add the additional columns (prefix ends up first, then epochMsecs)
    riot_df.insert(loc=0, column="epochMsecs", value=epoch_msecs)
    riot_df.insert(loc=0, column="riotDataPrefix", value="$riotData")
    riot_df["recNumInFile"] = 65535  # unused record number in file.

    if add_positions:
        riot_df = _add_positions(ds, riot_df, rows_to_keep)

    # Write to CSV
    _log.debug("Writing to RIOT CSV: %s", output_path)
    # If the file exists already, mode="a" will append, so only write the
    # header for a brand-new file.
    headerwrite = not os.path.exists(output_path)

    riot_df.to_csv(
        output_path, index=False, header=headerwrite, lineterminator="\n", mode="a"
    )


def _add_positions(ds, riot_df, rows_to_keep):
    """Add position variables (depth, lat, lon) to the RIOT DataFrame.

    Positions are interpolated from the glider position data to the RIOT
    timestamps. Only RIOT timestamps that fall within the time boundaries
    of the available glider position data will be interpolated; others
    will be left as NaN (and thus blank in the CSV).

    Parameters
    ----------
    ds : xr.Dataset
        The original (un-subset) dataset, expected to carry ``depth``,
        ``lat``, ``lon`` and ``time``.
    riot_df : pandas.DataFrame
        The RIOT record frame built by ``write_riot_csv`` (must contain
        ``epochMsecs``).
    rows_to_keep : xr.DataArray
        Boolean mask over ``time`` of the records retained upstream.
    """
    _log.debug("Adding position variables to RIOT CSV")
    # including depth in positions assumes that the thermodynamic
    # calculations were added.
    position_vars = ["depth", "lat", "lon", "time"]

    missing_vars = set(position_vars).difference(ds.variables)
    if missing_vars:
        _log.warning(
            "Position variables %s are missing from "
            "dataset, positions cannot be added to RIOT CSV filling "
            "with blanks",
            missing_vars,
        )
        riot_df["depth"] = np.nan
        riot_df["lat"] = np.nan
        riot_df["lon"] = np.nan
        return riot_df

    vars_to_drop = set(ds.variables).difference(position_vars)
    position_ds = ds.drop_vars(vars_to_drop)
    position_ds = position_ds.where(rows_to_keep, drop=True)

    # Gather the timestamps for checking if interpolation is possible.
    # NOTE(review): assumes position_ds["time"] is numeric epoch seconds
    # comparable with epochMsecs / 1000 — confirm L2 time units.
    riot_ts = riot_df["epochMsecs"] / 1000
    glider_ts = position_ds["time"].values

    # pre-allocate arrays with NaNs
    depth = np.full(riot_ts.shape, np.nan)
    lat = np.full(riot_ts.shape, np.nan)
    lon = np.full(riot_ts.shape, np.nan)

    # masks of usable source samples: finite (and non-zero for depth)
    q_depth = np.logical_and(
        np.isfinite(position_ds["depth"]), position_ds["depth"] != 0
    )
    q_pos = np.logical_and(
        np.isfinite(position_ds["lat"]), np.isfinite(position_ds["lon"])
    )

    # Only interpolate to timestamps that fall within the time boundaries
    # of the available glider position data. Any that fall outside
    # will be NaNs and ultimately blanks in the CSV file.
    if not q_depth.values.any():
        _log.warning("No valid depths found. Adding blank depths")
    else:
        _log.debug(
            "Interpolating depth variable to RIOT timestamps that fall "
            "within the glider depth time boundaries"
        )
        qdepth_in_tbnds = np.logical_and(
            riot_ts >= glider_ts[q_depth][0], riot_ts <= glider_ts[q_depth][-1]
        )
        depth[qdepth_in_tbnds] = np.interp(
            riot_ts[qdepth_in_tbnds], glider_ts[q_depth], position_ds["depth"][q_depth]
        )

    if not q_pos.values.any():
        _log.warning("No valid positions found. Adding blank positions")
    else:
        qpos_in_tbnds = np.logical_and(
            riot_ts >= glider_ts[q_pos][0], riot_ts <= glider_ts[q_pos][-1]
        )
        lat[qpos_in_tbnds] = np.interp(
            riot_ts[qpos_in_tbnds], glider_ts[q_pos], position_ds["lat"][q_pos]
        )
        lon[qpos_in_tbnds] = np.interp(
            riot_ts[qpos_in_tbnds], glider_ts[q_pos], position_ds["lon"][q_pos]
        )

    riot_df["depth"] = depth
    riot_df["lat"] = lat
    riot_df["lon"] = lon

    return riot_df
+ """ + time = np.arange(n, dtype=np.float64) + 1.0 # non-zero epoch seconds + + ds = xr.Dataset( + { + "sr_ping_epoch_days": ("time", np.full(n, 19500, dtype=np.float64)), + "sr_ping_secs": ("time", np.arange(n, dtype=np.float64) * 10), + "sr_ping_msecs": ("time", np.arange(n, dtype=np.float64) * 100), + "sr_ping_rt_msecs": ("time", np.arange(n, dtype=np.float64) * 1000), + "sr_ping_freq": ("time", np.full(n, 69000, dtype=np.float64)), + "sr_ping_detection_level": ( + "time", + np.random.default_rng(0).integers(0, 100, n).astype(np.float64), + ), + "sr_ping_sequence_number": ("time", np.arange(n, dtype=np.float64)), + "sr_ping_platform_id": ("time", np.full(n, 42, dtype=np.float64)), + "sr_ping_slot": ("time", np.ones(n, dtype=np.float64)), + "sr_ping_group": ("time", np.ones(n, dtype=np.float64)), + "sr_platform_state": ("time", np.full(n, 3, dtype=np.float64)), + }, + coords={"time": time}, + ) + + if include_positions: + ds["depth"] = ("time", np.linspace(10, 50, n)) + ds["lat"] = ("time", np.linspace(44.0, 44.1, n)) + ds["lon"] = ("time", np.linspace(-124.0, -123.9, n)) + + return ds + + +class TestWriteRiotCsv: + """Tests for write_riot_csv.""" + + def test_creates_csv(self, tmp_path: object) -> None: + """A valid dataset produces a CSV file.""" + out = str(tmp_path / "riot.csv") # type: ignore[operator] + ds = _make_riot_dataset() + write_riot_csv(ds, add_positions=False, output_path=out) + + assert os.path.exists(out) + + def test_csv_columns_without_positions(self, tmp_path: object) -> None: + """CSV should have the standard RIOT columns when positions are off.""" + out = str(tmp_path / "riot.csv") # type: ignore[operator] + write_riot_csv(_make_riot_dataset(), add_positions=False, output_path=out) + + df = pd.read_csv(out) + expected = [ # This is the RIOT order specified in the docs + "riotDataPrefix", + "epochMsecs", + "rtMsecs", + "freq", + "detectionLevel", + "sequenceNumber", + "group", + "slot", + "platformId", + "platformState", + "recNumInFile", + 
] + assert list(df.columns) == expected + + def test_csv_columns_with_positions(self, tmp_path: object) -> None: + """When positions are enabled and available, depth/lat/lon columns appear.""" + out = str(tmp_path / "riot.csv") # type: ignore[operator] + ds = _make_riot_dataset(include_positions=True) + write_riot_csv(ds, add_positions=True, output_path=out) + + df = pd.read_csv(out) + for col in ("depth", "lat", "lon"): + assert col in df.columns, f"Missing column: {col}" + + def test_csv_columns_with_positions_missing_vars(self, tmp_path: object) -> None: + """When positions are requested but vars are missing, blank columns appear.""" + out = str(tmp_path / "riot.csv") # type: ignore[operator] + ds = _make_riot_dataset(include_positions=False) + write_riot_csv(ds, add_positions=True, output_path=out) + + df = pd.read_csv(out) + for col in ("depth", "lat", "lon"): + assert col in df.columns + assert df[col].isna().all(), f"{col} should be all NaN" + + def test_row_count(self, tmp_path: object) -> None: + """Output should have as many rows as valid (non-zero) time steps.""" + n = 8 + out = str(tmp_path / "riot.csv") # type: ignore[operator] + write_riot_csv(_make_riot_dataset(n=n), add_positions=False, output_path=out) + + df = pd.read_csv(out) + assert len(df) == n + + def test_epoch_msecs_calculation(self, tmp_path: object) -> None: + """epochMsecs should equal days*86400000 + secs*1000 + msecs.""" + out = str(tmp_path / "riot.csv") # type: ignore[operator] + ds = _make_riot_dataset(n=3) + write_riot_csv(ds, add_positions=False, output_path=out) + + df = pd.read_csv(out) + days = ds["sr_ping_epoch_days"].values.astype(np.int64) + secs = ds["sr_ping_secs"].values.astype(np.int64) + msecs = ds["sr_ping_msecs"].values.astype(np.int64) + expected = days * 86400 * 1000 + secs * 1000 + msecs + np.testing.assert_array_equal(df["epochMsecs"].values, expected) + + def test_append_mode(self, tmp_path: object) -> None: + """Calling write_riot_csv twice should append without 
repeating headers.""" + out = str(tmp_path / "riot.csv") # type: ignore[operator] + ds = _make_riot_dataset(n=3) + write_riot_csv(ds, add_positions=False, output_path=out) + write_riot_csv(ds, add_positions=False, output_path=out) + + with open(out) as f: + lines = f.readlines() + + # header once + 3 rows + 3 appended rows = 7 lines + assert len(lines) == 7 + + def test_riot_data_prefix(self, tmp_path: object) -> None: + """Every row should have '$riotData' in the riotDataPrefix column.""" + out = str(tmp_path / "riot.csv") # type: ignore[operator] + write_riot_csv(_make_riot_dataset(), add_positions=False, output_path=out) + + df = pd.read_csv(out) + assert (df["riotDataPrefix"] == "$riotData").all() + + def test_missing_variables_returns_early(self, tmp_path: object) -> None: + """If required RIOT vars are missing, no file is created.""" + out = str(tmp_path / "riot.csv") # type: ignore[operator] + ds = xr.Dataset({"dummy": ("time", [1, 2, 3])}) + write_riot_csv(ds, add_positions=False, output_path=out) + + assert not os.path.exists(out) + + def test_all_zeros_dropped(self, tmp_path: object) -> None: + """Rows where all RIOT variables are zero should be dropped.""" + out = str(tmp_path / "riot.csv") # type: ignore[operator] + ds = _make_riot_dataset(n=5) + # Set all RIOT vars in the first row to 0 + for var in ds.data_vars: + ds[var].values[0] = 0 + write_riot_csv(ds, add_positions=False, output_path=out) + + df = pd.read_csv(out) + assert len(df) == 4 # first row should have been dropped