Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions src/icefabric/hydrofabric/subset_nhf.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,12 @@ def load_columns(self, layer: str, columns: list[str]) -> pl.DataFrame:


def pl_to_gdf(pl_df: pl.DataFrame, crs: str = "EPSG:5070") -> gpd.GeoDataFrame:
"""Convert Polars DataFrame with WKB geometry to GeoDataFrame."""
"""Convert Polars DataFrame with WKB geometry to GeoDataFrame.

NHF geometries are ingested into Iceberg as WKB without reprojection, so
the caller is responsible for supplying the native CRS of the source
domain (see ``HydrofabricNamespace.crs``).
"""
df = pl_df.to_pandas()
df["geometry"] = gpd.GeoSeries.from_wkb(df["geometry"])
return gpd.GeoDataFrame(df, crs=crs)
Expand Down Expand Up @@ -205,6 +210,7 @@ def generate_subset_from_ids(
) -> dict[str, gpd.GeoDataFrame]:
"""Subset hydrofabric to a given set of flowpath IDs."""
logger.debug(f"Subsetting {len(flowpath_ids)} flowpaths")
crs = source.namespace.crs

# Wave 1: fp_id/div_id filtered (parallel)
with ThreadPoolExecutor(max_workers=6) as ex:
Expand Down Expand Up @@ -277,14 +283,14 @@ def generate_subset_from_ids(
# Write output
# ======================================================================
output = {
"flowpaths": pl_to_gdf(subset_fp),
"nexus": pl_to_gdf(subset_nex),
"divides": pl_to_gdf(subset_div),
"virtual_nexus": pl_to_gdf(subset_v_nex),
"virtual_flowpaths": pl_to_gdf(subset_v_fp),
"waterbodies": pl_to_gdf(subset_wb) if len(subset_wb) > 0 else subset_wb.to_pandas(),
"gages": pl_to_gdf(subset_gages) if len(subset_gages) > 0 else subset_gages.to_pandas(),
"lakes": pl_to_gdf(subset_lakes) if len(subset_lakes) > 0 else subset_lakes.to_pandas(),
"flowpaths": pl_to_gdf(subset_fp, crs=crs),
"nexus": pl_to_gdf(subset_nex, crs=crs),
"divides": pl_to_gdf(subset_div, crs=crs),
"virtual_nexus": pl_to_gdf(subset_v_nex, crs=crs),
"virtual_flowpaths": pl_to_gdf(subset_v_fp, crs=crs),
"waterbodies": pl_to_gdf(subset_wb, crs=crs) if len(subset_wb) > 0 else subset_wb.to_pandas(),
"gages": pl_to_gdf(subset_gages, crs=crs) if len(subset_gages) > 0 else subset_gages.to_pandas(),
"lakes": pl_to_gdf(subset_lakes, crs=crs) if len(subset_lakes) > 0 else subset_lakes.to_pandas(),
"reference_flowpaths": subset_ref_fp.to_pandas(),
"hydrolocations": subset_hydrolocations.to_pandas(),
}
Expand Down
14 changes: 14 additions & 0 deletions src/icefabric/schemas/hydrofabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,20 @@ def is_oconus_hf(self) -> bool:
HydrofabricNamespace.GREAT_LAKES_HF,
)

@property
def crs(self) -> str:
"""Return the EPSG CRS string for this namespace's native geometries.

NHF geometries are ingested without reprojection, so the CRS differs by
domain and must be re-applied when reading WKB back out of Iceberg.
"""
oconus_nhf_crs = {
HydrofabricNamespace.ALASKA_NHF: "EPSG:3338",
HydrofabricNamespace.HAWAII_NHF: "EPSG:32604",
HydrofabricNamespace.PUERTO_RICO_NHF: "EPSG:6566",
}
return oconus_nhf_crs.get(self, "EPSG:5070")

@classmethod
def _missing_(cls, value: object) -> HydrofabricNamespace | None:
"""Handle legacy string values."""
Expand Down
17 changes: 17 additions & 0 deletions tests/app/test_source_parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,23 @@ def test_is_oconus_hf_for_hawaii(self):
"""Test is_oconus_hf returns False for HAWAII_HF namespace (it's OCONUS but not in the set)."""
assert HydrofabricNamespace.HAWAII_HF.is_oconus_hf is False

def test_crs_for_conus_nhf(self):
"""CONUS NHF is stored in EPSG:5070 (NAD83 / Conus Albers)."""
assert HydrofabricNamespace.NHF.crs == "EPSG:5070"
assert HydrofabricNamespace.CONUS_NHF.crs == "EPSG:5070"

def test_crs_for_alaska_nhf(self):
"""Alaska NHF is stored in EPSG:3338 (NAD83 / Alaska Albers)."""
assert HydrofabricNamespace.ALASKA_NHF.crs == "EPSG:3338"

def test_crs_for_hawaii_nhf(self):
"""Hawaii NHF is stored in EPSG:32604 (WGS 84 / UTM 4N)."""
assert HydrofabricNamespace.HAWAII_NHF.crs == "EPSG:32604"

def test_crs_for_puerto_rico_nhf(self):
"""Puerto Rico / USVI NHF is stored in EPSG:6566."""
assert HydrofabricNamespace.PUERTO_RICO_NHF.crs == "EPSG:6566"


class TestHydrofabricRouterSourceParameter:
"""Integration tests for the hydrofabric router source parameter."""
Expand Down
Loading