diff --git a/pyproject.toml b/pyproject.toml index 41e8b3c..aebc65d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,3 +43,8 @@ dev = [ [tool.mypy] disable_error_code = ["import-untyped"] + +[tool.pytest.ini_options] +filterwarnings = [ + "ignore:Mean of empty slice:RuntimeWarning", +] diff --git a/src/glide/process_l1.py b/src/glide/process_l1.py index c8bcd4f..1a5ac3f 100644 --- a/src/glide/process_l1.py +++ b/src/glide/process_l1.py @@ -406,84 +406,6 @@ def assign_surface_state( return ds -def _find_dive_cycles(state: np.ndarray) -> list[tuple[int, int, int, int]]: - """Find dive cycles from state array. - - A dive cycle is a contiguous sequence of dive (1), climb (2), and unknown (-1) - states bounded by surface (0) states. Unknown states are included because - there may be gaps at inflection points between dive and climb. - - Parameters - ---------- - state : np.ndarray - State array with values -1 (unknown), 0 (surface), 1 (dive), 2 (climb). - - Returns - ------- - list of tuples - Each tuple is (cycle_start, cycle_end, surf_start, surf_end) where: - - cycle_start/end: indices of the dive/climb portion - - surf_start/end: indices of the following surface period (for velocity lookup) - """ - cycles = [] - n = len(state) - i = 0 - - while i < n: - # Skip initial unknown states at the very start - while i < n and state[i] == -1: - i += 1 - - if i >= n: - break - - # Skip surface period (before dive) - while i < n and state[i] == 0: - i += 1 - - if i >= n: - break - - # Now we should be in dive/climb or unknown (gap) - # Look for any dive or climb state to start a cycle - if state[i] not in (-1, 1, 2): - i += 1 - continue - - # Find the actual start of dive/climb activity - cycle_start = i - - # Continue through dive (1), climb (2), and unknown (-1) until we hit surface (0) - # This captures the full dive cycle including any gaps at inflection points - while i < n and state[i] != 0: - i += 1 - - cycle_end = i - - # Check if this segment actually contains any dive or climb - segment_states = state[cycle_start:cycle_end] - has_dive_climb = np.any((segment_states == 1) | (segment_states == 2)) - - if not has_dive_climb: - # Pure unknown segment, skip it - continue - - # Find following surface period - surf_start = i - while i < n and state[i] == 0: - i += 1 - surf_end = i - - # Only record if we found a surface period after (for velocity) - if surf_end > surf_start: - cycles.append((cycle_start, cycle_end, surf_start, surf_end)) - else: - # No surface after - this is incomplete, skip velocity - cycles.append((cycle_start, cycle_end, -1, -1)) - - return cycles - - def add_velocity( ds: xr.Dataset, config: dict, @@ -491,14 +413,16 @@ def add_velocity( ) -> xr.Dataset: """Add depth-averaged velocity variables to L2 dataset. - Uses state transitions to identify dive cycles. A dive cycle is all - contiguous dive (1) and climb (2) states between surface (0) periods. - Velocity is taken from the following surface period. + Groups profiles by velocity reports from the raw flight data. Each velocity + report (m_water_vx/vy) marks a surfacing. All profiles between consecutive + velocity reports belong to one group and share that velocity estimate. + An additional group is created for any trailing profiles with no velocity + report yet (for later backfill). Parameters ---------- ds : xr.Dataset - L2 dataset with state variable (must have surface states assigned). + L2 dataset with dive_id and climb_id from get_profiles. config : dict Configuration with variable specifications. flt : xr.Dataset, optional @@ -514,56 +438,64 @@ def _empty_velocity(): ds["v"] = (("time_uv",), [np.nan], specs["v"]["CF"]) return ds - if "state" not in ds: - _log.warning("No state variable in dataset - run get_profiles first") + if "dive_id" not in ds or "climb_id" not in ds: + _log.warning("No dive/climb IDs in dataset - run get_profiles first") return _empty_velocity() - # Find dive cycles from state transitions - cycles = _find_dive_cycles(ds.state.values) - n_cycles = len(cycles) + time_l2 = ds.time.values + lat_l2 = ds.lat.values + lon_l2 = ds.lon.values + is_profile = (ds.dive_id.values >= 0) | (ds.climb_id.values >= 0) - if n_cycles < 1: - _log.warning("No dive cycles detected from state transitions") + if not is_profile.any(): + _log.warning("No profiles found in dataset") return _empty_velocity() - _log.debug("Found %d dive cycles from state transitions", n_cycles) - - # Extract velocity data from flight data vel_times, vel_u, vel_v = _extract_velocity_data(flt) - time_l2 = ds.time.values - lat_l2 = ds.lat.values - lon_l2 = ds.lon.values - - time_uv = np.full(n_cycles, np.nan) - lat_uv = np.full(n_cycles, np.nan) - lon_uv = np.full(n_cycles, np.nan) - u = np.full(n_cycles, np.nan) - v = np.full(n_cycles, np.nan) - - for i, (cycle_start, cycle_end, surf_start, surf_end) in enumerate(cycles): - # Mean time and position from the dive/climb portion - cycle_times = time_l2[cycle_start:cycle_end] - cycle_lats = lat_l2[cycle_start:cycle_end] - cycle_lons = lon_l2[cycle_start:cycle_end] - - if len(cycle_times) > 0: - time_uv[i] = np.nanmean(cycle_times) - lat_uv[i] = np.nanmean(cycle_lats) - lon_uv[i] = np.nanmean(cycle_lons) - - # Find velocity in the following surface period - if surf_start < 0 or vel_times is None: - continue + # Build groups from velocity reports, each marking a surfacing. + # Profiles between consecutive velocity reports form one group. + groups = [] # list of (profile_mask, u_val, v_val) + + if ( + vel_times is not None + and vel_u is not None + and vel_v is not None + and len(vel_times) > 0 + ): + for i, t_vel in enumerate(vel_times): + t_start = vel_times[i - 1] if i > 0 else -np.inf + mask = (time_l2 > t_start) & (time_l2 <= t_vel) & is_profile + if mask.any(): + groups.append((mask, vel_u[i], vel_v[i])) + + # Trailing profiles after the last velocity report (no velocity yet) + mask = (time_l2 > vel_times[-1]) & is_profile + if mask.any(): + groups.append((mask, np.nan, np.nan)) + else: + # No velocity data at all — one group with all profiles, NaN velocity + if is_profile.any(): + groups.append((is_profile, np.nan, np.nan)) - t_start = time_l2[surf_start] - t_end = time_l2[surf_end - 1] + 60 if surf_end > surf_start else t_start + 60 + if not groups: + return _empty_velocity() - u_vel, v_vel = _find_velocity_in_window(vel_times, vel_u, vel_v, t_start, t_end) - if u_vel is not None: - u[i] = u_vel - v[i] = v_vel - _log.debug("Cycle %d: assigned u=%.4f, v=%.4f", i, u[i], v[i]) + n_groups = len(groups) + time_uv = np.full(n_groups, np.nan) + lat_uv = np.full(n_groups, np.nan) + lon_uv = np.full(n_groups, np.nan) + u = np.full(n_groups, np.nan) + v = np.full(n_groups, np.nan) + + for i, (mask, u_val, v_val) in enumerate(groups): + time_uv[i] = np.nanmean(time_l2[mask]) + lat_uv[i] = np.nanmean(lat_l2[mask]) + lon_uv[i] = np.nanmean(lon_l2[mask]) + u[i] = u_val + v[i] = v_val + if np.isfinite(u_val): + _log.debug("Group %d: assigned u=%.4f, v=%.4f", i, u_val, v_val) ds["time_uv"] = (("time_uv",), time_uv, specs["time_uv"]["CF"]) ds["lat_uv"] = (("time_uv",), lat_uv, specs["lat_uv"]["CF"]) @@ -572,8 +504,8 @@ def _empty_velocity(): ds["v"] = (("time_uv",), v, specs["v"]["CF"]) _log.info( - "Added velocity for %d dive cycles, %d with valid data", - n_cycles, + "Added velocity for %d groups, %d with valid data", + n_groups, np.isfinite(u).sum(), ) return ds @@ -606,7 +538,34 @@ def _extract_velocity_data( _log.warning("No valid velocity data in flight data") return None, None, None - return time_flt[vel_valid], u_flt[vel_valid], v_flt[vel_valid] + times = time_flt[vel_valid] + u_vals = u_flt[vel_valid] + v_vals = v_flt[vel_valid] + + # Sort by time (combined L1 files may not be time-ordered) + order = np.argsort(times) + times = times[order] + u_vals = u_vals[order] + v_vals = v_vals[order] + + # Cluster velocity reports by surfacing events. At each surfacing the + # flight computer may refine velocity as GPS improves, producing multiple + # reports within minutes. A gap longer than `gap_threshold` indicates the + # glider dove and surfaced again. We take the last report from each + # surfacing as the best estimate. + gap_threshold = 600 # seconds (10 minutes) + dt = np.diff(times) + new_surfacing = np.concatenate(([True], dt > gap_threshold)) + starts = np.where(new_surfacing)[0] + ends = np.concatenate((starts[1:], [len(times)])) - 1 + + _log.debug( + "Clustered %d velocity reports into %d surfacing events", + len(times), + len(ends), + ) + + return times[ends], u_vals[ends], v_vals[ends] def _find_velocity_in_window( @@ -649,10 +608,9 @@ def backfill_velocity( ) -> bool: """Backfill velocity data in an L2 file using raw flight data. - For each velocity estimate, searches raw flight data for velocity in a - time window starting at the end of the climb (last point with matching - climb_id after time_uv). Updates the value if it is missing (NaN) or if - the new estimate differs significantly from the existing one. + For each time_uv entry with missing velocity, searches raw flight data + for the first velocity report after that entry's time. Updates velocity + if missing or if the new estimate differs significantly. Parameters ---------- @@ -686,20 +644,13 @@ def backfill_velocity( _log.debug("Found %d velocity estimates in raw data", len(vel_times)) - # Update the L2 file file_updated = False with nc.Dataset(l2_file, "r+") as ds: if "u" not in ds.variables or "v" not in ds.variables: _log.warning("No velocity variables in %s", l2_file) return False - if "climb_id" not in ds.variables: - _log.warning("No climb_id variable in %s", l2_file) - return False - - time_l2 = ds.variables["time"][:] time_uv = np.ma.filled(ds.variables["time_uv"][:], np.nan) - climb_id = np.ma.filled(ds.variables["climb_id"][:], -1) u_vals = np.ma.filled(ds.variables["u"][:], np.nan) v_vals = np.ma.filled(ds.variables["v"][:], np.nan) @@ -711,30 +662,18 @@ def backfill_velocity( for i in range(n_uv): t_uv = time_uv[i] if np.isnan(t_uv): - continue # No valid time_uv for this cycle - - # Find the last point with this climb_id after time_uv - climb_mask = (climb_id == i) & (time_l2 >= t_uv) - if not climb_mask.any(): - _log.debug("No climb points after time_uv for cycle %d", i) continue - # Search window starts at end of climb - t_start = time_l2[climb_mask][-1] - - # Search window ends at next valid time_uv (or +10 min) - if i + 1 < n_uv and np.isfinite(time_uv[i + 1]): - t_end = time_uv[i + 1] - else: - t_end = t_start + 600 # 10 minutes max + # Search window: from time_uv to next time_uv (or +10 min) + t_end = ( + time_uv[i + 1] + if i + 1 < n_uv and np.isfinite(time_uv[i + 1]) + else t_uv + 600 + ) u_vel, v_vel = _find_velocity_in_window( - vel_times, vel_u, vel_v, t_start, t_end + vel_times, vel_u, vel_v, t_uv, t_end ) - if u_vel is None: - continue - - # Check if update is needed (missing or significantly different) if u_vel is None or v_vel is None: continue @@ -749,10 +688,10 @@ def backfill_velocity( ds.variables["v"][i] = v_vel file_updated = True if is_missing: - _log.info("Backfilled cycle %d: u=%.4f, v=%.4f", i, u_vel, v_vel) + _log.info("Backfilled group %d: u=%.4f, v=%.4f", i, u_vel, v_vel) else: _log.info( - "Updated cycle %d: u=%.4f->%.4f, v=%.4f->%.4f", + "Updated group %d: u=%.4f->%.4f, v=%.4f->%.4f", i, u_old, u_vel, diff --git a/src/glide/process_l2.py b/src/glide/process_l2.py index 0b9127d..490a0de 100644 --- a/src/glide/process_l2.py +++ b/src/glide/process_l2.py @@ -51,6 +51,89 @@ def _get_profile_indexes(ds: xr.Dataset) -> NDArray: return idxs[np.argsort(idxs[:, 0]), :] +def _interp_velocity_to_profiles( + ds_binned: xr.Dataset, + profile_time: list, + has_velocity: bool, + vel_time: NDArray | None, + vel_u: NDArray | None, + vel_v: NDArray | None, + u_attrs: dict, + v_attrs: dict, +) -> xr.Dataset: + """Interpolate depth-averaged velocity onto profile mid-point times. + + NaN velocity values propagate naturally: any profile whose bracketing + velocity entries include a NaN will receive NaN. Profiles outside the + time range of velocity reports also receive NaN (no extrapolation). + + Parameters + ---------- + ds_binned : xr.Dataset + Binned L3 dataset with profile_id dimension. + profile_time : list + Profile mid-point times in posix seconds. + has_velocity : bool + Whether velocity data exists in the L2 source. + vel_time, vel_u, vel_v : array or None + Velocity data; may contain NaN values which propagate to output. + u_attrs, v_attrs : dict + CF attributes for u and v variables. + """ + n_profiles = ds_binned.profile_id.size + profile_t = np.asarray(profile_time, dtype="f8") + + if ( + not has_velocity + or vel_time is None + or vel_u is None + or vel_v is None + or len(vel_time) < 1 + ): + _log.info("No valid velocity data for L3 interpolation") + ds_binned["u"] = (("profile_id",), np.full(n_profiles, np.nan), u_attrs) + ds_binned["v"] = (("profile_id",), np.full(n_profiles, np.nan), v_attrs) + return ds_binned + + # Interpolate using only the valid (finite) velocity points. + valid = np.isfinite(vel_u) & np.isfinite(vel_v) + if not valid.any(): + _log.info("All velocity values are NaN") + ds_binned["u"] = (("profile_id",), np.full(n_profiles, np.nan), u_attrs) + ds_binned["v"] = (("profile_id",), np.full(n_profiles, np.nan), v_attrs) + return ds_binned + + u_interp = np.interp( + profile_t, vel_time[valid], vel_u[valid], left=np.nan, right=np.nan + ) + v_interp = np.interp( + profile_t, vel_time[valid], vel_v[valid], left=np.nan, right=np.nan + ) + + # Propagate NaN: mask profiles whose bracketing velocity entries have NaN. + # For each profile time, find the surrounding entries in the full + # (including NaN) velocity array. If either neighbour is NaN, the + # interpolated value is set to NaN. + for i, t in enumerate(profile_t): + idx = np.searchsorted(vel_time, t) + left_nan = idx > 0 and not np.isfinite(vel_u[idx - 1]) + right_nan = idx < len(vel_time) and not np.isfinite(vel_u[idx]) + if left_nan or right_nan: + u_interp[i] = np.nan + v_interp[i] = np.nan + + n_valid = np.isfinite(u_interp).sum() + _log.info( + "Interpolated velocity onto %d profiles (%d valid)", + n_profiles, + n_valid, + ) + + ds_binned["u"] = (("profile_id",), u_interp, u_attrs) + ds_binned["v"] = (("profile_id",), v_interp, v_attrs) + return ds_binned + + # Public functions @@ -82,7 +165,29 @@ def bin_l2( # Dropping QC variables for now because it isn't clear how we should # treat them after binning. We may want to define new QC variables in the future. qc_variables = [v for v in ds.variables if "_qc" in str(v)] - drop_variables = qc_variables + ["dive_id", "climb_id", "state", "z"] + velocity_variables = [ + v + for v in ["u", "v", "lat_uv", "lon_uv", "time_uv"] + if v in ds.variables or v in ds.dims + ] + drop_variables = ( + qc_variables + ["dive_id", "climb_id", "state", "z"] + velocity_variables + ) + + # Extract velocity data before dropping, for later interpolation onto profiles. + # NaN velocity values are preserved so they propagate through interpolation. + has_velocity = "time_uv" in ds.dims and "u" in ds + if has_velocity: + vel_time = ds.time_uv.values.astype("f8") / 1e9 # posix seconds + vel_u = ds.u.values.astype("f8") + vel_v = ds.v.values.astype("f8") + # Only drop entries with invalid times; keep NaN u/v so they propagate. + valid_time = np.isfinite(vel_time) + vel_time = vel_time[valid_time] + vel_u = vel_u[valid_time] + vel_v = vel_v[valid_time] + u_attrs = ds.u.attrs.copy() + v_attrs = ds.v.attrs.copy() # Drop variables flagged with drop_from_l3 in the config if config is not None: @@ -171,4 +276,16 @@ def bin_l2( ds_binned = ds_binned.swap_dims({"profile": "profile_id"}).set_coords("profile_id") ds_binned = ds_binned.set_coords(["profile_time", "profile_lat", "profile_lon"]) + # Interpolate velocity onto profile mid-point times. + ds_binned = _interp_velocity_to_profiles( + ds_binned, + profile_time, + has_velocity, + vel_time if has_velocity else None, + vel_u if has_velocity else None, + vel_v if has_velocity else None, + u_attrs if has_velocity else {}, + v_attrs if has_velocity else {}, + ) + return ds_binned.transpose() diff --git a/tests/test_process_l1.py b/tests/test_process_l1.py index 14c0d44..20295ac 100644 --- a/tests/test_process_l1.py +++ b/tests/test_process_l1.py @@ -123,54 +123,60 @@ def test_assign_surface_state_no_flight_data() -> None: assert np.array_equal(result.state.values, state) -def test_find_dive_cycles() -> None: - """Test that dive cycles are correctly identified from state transitions.""" - # State sequence: surface -> dive -> climb -> surface -> dive -> climb -> surface - state = np.array( - [0, 0, 0, 1, 1, 1, 2, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 0], - dtype="b", - ) +def test_add_velocity_groups_by_velocity_reports() -> None: + """Test that add_velocity groups profiles by velocity report times.""" + from glide.config import load_config + + config = load_config() - cycles = pl1._find_dive_cycles(state) - - # Should find 2 dive cycles - assert len(cycles) == 2, f"Expected 2 cycles, got {len(cycles)}" - - # First cycle: indices 3-9 (dive+climb), surface 9-11 - cycle_start, cycle_end, surf_start, surf_end = cycles[0] - assert cycle_start == 3 - assert cycle_end == 9 - assert surf_start == 9 - assert surf_end == 11 - - # Second cycle: indices 11-15, surface 15-18 - cycle_start, cycle_end, surf_start, surf_end = cycles[1] - assert cycle_start == 11 - assert cycle_end == 15 - assert surf_start == 15 - assert surf_end == 18 - - -def test_find_dive_cycles_with_unknown_gaps() -> None: - """Test that unknown states at inflection points don't break cycles.""" - # State with unknown gap between dive and climb (at inflection) - # surface -> dive -> unknown (inflection) -> climb -> surface - state = np.array( - [0, 0, 1, 1, 1, -1, -1, 2, 2, 2, 0, 0], - dtype="b", + # Create L2-like dataset with 2 dive/climb cycles + # Times are in seconds, spaced to represent realistic dive durations + n = 100 + time = np.arange(n, dtype=float) * 100 # 100s spacing + pressure = np.zeros(n) + # Cycle 1: dive 10-30, climb 30-50 + pressure[10:30] = np.linspace(0, 100, 20) + pressure[30:50] = np.linspace(100, 0, 20) + # Cycle 2: dive 60-80, climb 80-95 + pressure[60:80] = np.linspace(0, 100, 20) + pressure[80:95] = np.linspace(100, 0, 15) + + dive_id = np.full(n, -1, dtype="i4") + climb_id = np.full(n, -1, dtype="i4") + dive_id[10:30] = 1 + climb_id[30:50] = 1 + dive_id[60:80] = 2 + climb_id[80:95] = 2 + + ds = xr.Dataset( + { + "pressure": ("time", pressure), + "dive_id": ("time", dive_id), + "climb_id": ("time", climb_id), + "lat": ("time", np.full(n, 45.0)), + "lon": ("time", np.full(n, -125.0)), + }, + coords={"time": time}, ) - cycles = pl1._find_dive_cycles(state) + # Flight data with velocity reports at surfacing times + # Times must be > 600s apart to be recognized as separate surfacings + flt_time = np.array([500.0, 5500.0, 9800.0]) + flt = xr.Dataset( + { + "m_present_time": ("i", flt_time), + "m_water_vx": ("i", [0.1, 0.2, 0.3]), + "m_water_vy": ("i", [-0.05, -0.1, -0.15]), + }, + ) - # Should find 1 cycle (not 2!) - assert len(cycles) == 1, f"Expected 1 cycle, got {len(cycles)}" + result = pl1.add_velocity(ds, config, flt=flt) - cycle_start, cycle_end, surf_start, surf_end = cycles[0] - # Cycle should span the entire dive+unknown+climb section - assert cycle_start == 2 - assert cycle_end == 10 - assert surf_start == 10 - assert surf_end == 12 + # Velocity at t=55 should capture cycle 1 (profiles 10-50) + # Velocity at t=98 should capture cycle 2 (profiles 60-95) + assert "u" in result + assert result.sizes["time_uv"] >= 2 + assert np.isfinite(result.u.values).sum() >= 2 def test_realtime_velocity_processing() -> None: