From b5bed9cada6dd4f91a46c3b6561851c953ef3749 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 2 Apr 2026 14:45:36 -0400 Subject: [PATCH 1/9] fix: update hydrofabric schemas for nhf v1.1.3 --- .../iceberg_tables/hydrofabric_update.py | 98 +++++++++++++++++-- 1 file changed, 88 insertions(+), 10 deletions(-) diff --git a/src/icefabric/schemas/iceberg_tables/hydrofabric_update.py b/src/icefabric/schemas/iceberg_tables/hydrofabric_update.py index ef67728..b41b929 100644 --- a/src/icefabric/schemas/iceberg_tables/hydrofabric_update.py +++ b/src/icefabric/schemas/iceberg_tables/hydrofabric_update.py @@ -6,7 +6,7 @@ import pyarrow as pa from pyiceberg.schema import Schema -from pyiceberg.types import BinaryType, BooleanType, DoubleType, FloatType, LongType, NestedField, StringType +from pyiceberg.types import BinaryType, DoubleType, FloatType, LongType, NestedField, StringType class Divides: @@ -548,6 +548,8 @@ class Flowpaths: Estimated depth associated with top width at maximum levee r_ml : float Hydraulic radius at maximum levee + fp_to_id : float + Downstream flowpath identifier geometry : binary Spatial Geometry (MULTILINESTRING format) - stored in WKB binary format @@ -592,6 +594,7 @@ def columns(cls) -> list[str]: "topwdth_ml", "y_ml", "r_ml", + "fp_to_id", "geometry", ] @@ -635,6 +638,7 @@ def schema(cls) -> Schema: "Top width at maximum levee", "Estimated depth associated with top width at maximum levee", "Hydraulic radius at maximum levee", + "Downstream flowpath identifier", "Spatial Geometry (MULTILINESTRING format) - stored in WKB binary format", ] return Schema( @@ -667,7 +671,8 @@ def schema(cls) -> Schema: NestedField(27, "topwdth_ml", DoubleType(), required=False, doc=desc[26]), NestedField(28, "y_ml", FloatType(), required=False, doc=desc[27]), NestedField(29, "r_ml", FloatType(), required=False, doc=desc[28]), - NestedField(30, "geometry", BinaryType(), required=False, doc=desc[29]), + NestedField(30, "fp_to_id", DoubleType(), required=False, doc=desc[29]), + NestedField(31, "geometry", BinaryType(), required=False, doc=desc[30]), identifier_field_ids=[1], ) @@ -712,6 +717,7 @@ def arrow_schema(cls) -> pa.Schema: pa.field("topwdth_ml", pa.float64(), nullable=True), pa.field("y_ml", pa.float32(), nullable=True), pa.field("r_ml", pa.float32(), nullable=True), + pa.field("fp_to_id", pa.float64(), nullable=True), pa.field("geometry", pa.binary(), nullable=True), ] ) @@ -808,6 +814,10 @@ class ReferenceFlowpaths: Virtual flowpath identifier div_id : int Associated divide identifier + mainstem_virtual_fp_id : int + Mainstem virtual flowpath identifier + segment_order : int + Segment order """ @classmethod @@ -825,6 +835,8 @@ def columns(cls) -> list[str]: "fp_id", "virtual_fp_id", "div_id", + "mainstem_virtual_fp_id", + "segment_order", ] @classmethod @@ -842,12 +854,16 @@ def schema(cls) -> Schema: "A flowpath ID from the flowpath table that was derived from the reference flowpath ID", "Virtual flowpath identifier", "Associated divide identifier", + "Mainstem virtual flowpath identifier", + "Segment order", ] return Schema( NestedField(1, "ref_fp_id", LongType(), required=True, doc=desc[0]), NestedField(2, "fp_id", LongType(), required=False, doc=desc[1]), NestedField(3, "virtual_fp_id", LongType(), required=False, doc=desc[2]), NestedField(4, "div_id", LongType(), required=False, doc=desc[3]), + NestedField(5, "mainstem_virtual_fp_id", LongType(), required=False, doc=desc[4]), + NestedField(6, "segment_order", LongType(), required=False, doc=desc[5]), identifier_field_ids=[1], ) @@ -867,6 +883,8 @@ def arrow_schema(cls) -> pa.Schema: pa.field("fp_id", pa.int64(), nullable=True), pa.field("virtual_fp_id", pa.int64(), nullable=True), pa.field("div_id", pa.int64(), nullable=True), + pa.field("mainstem_virtual_fp_id", pa.int64(), nullable=True), + pa.field("segment_order", pa.int64(), nullable=True), ] ) @@ -911,6 +929,14 @@ class Waterbodies: Dam length ifd : float Initial flood depth + div_id : float + Associated divide identifier + dn_nex_id : float + Downstream nexus identifier + dn_virtual_nex_id : float + Downstream virtual nexus identifier + virtual_fp_id : float + Virtual flowpath identifier geometry : binary Spatial Geometry (POINT format) - stored in WKB binary format """ @@ -943,6 +969,10 @@ def columns(cls) -> list[str]: "OrficeE", "Dam_Length", "ifd", + "div_id", + "dn_nex_id", + "dn_virtual_nex_id", + "virtual_fp_id", "geometry", ] @@ -974,6 +1004,10 @@ def schema(cls) -> Schema: "Orifice elevation", "Dam length", "Initial flood depth", + "Associated divide identifier", + "Downstream nexus identifier", + "Downstream virtual nexus identifier", + "Virtual flowpath identifier", "Spatial Geometry (POLYGON format) - stored in WKB binary format", ] return Schema( @@ -994,7 +1028,11 @@ def schema(cls) -> Schema: NestedField(15, "OrficeE", FloatType(), required=False, doc=desc[14]), NestedField(16, "Dam_Length", FloatType(), required=False, doc=desc[15]), NestedField(17, "ifd", FloatType(), required=False, doc=desc[16]), - NestedField(18, "geometry", BinaryType(), required=False, doc=desc[17]), + NestedField(18, "div_id", DoubleType(), required=False, doc=desc[17]), + NestedField(19, "dn_nex_id", DoubleType(), required=False, doc=desc[18]), + NestedField(20, "dn_virtual_nex_id", DoubleType(), required=False, doc=desc[19]), + NestedField(21, "virtual_fp_id", DoubleType(), required=False, doc=desc[20]), + NestedField(22, "geometry", BinaryType(), required=False, doc=desc[21]), identifier_field_ids=[1], ) @@ -1027,6 +1065,10 @@ def arrow_schema(cls) -> pa.Schema: pa.field("OrficeE", pa.float32(), nullable=True), pa.field("Dam_Length", pa.float32(), nullable=True), pa.field("ifd", pa.float32(), nullable=True), + pa.field("div_id", pa.float64(), nullable=True), + pa.field("dn_nex_id", pa.float64(), nullable=True), + pa.field("dn_virtual_nex_id", pa.float64(), nullable=True), + pa.field("virtual_fp_id", pa.float64(), nullable=True), pa.field("geometry", pa.binary(), nullable=True), ] ) @@ -1054,6 +1096,16 @@ class Gages: Flowpath Identifier virtual_fp_id : float Virtual Flowpath Identifier + div_id : float + Associated divide identifier + dn_nex_id : float + Downstream nexus identifier + dn_virtual_nex_id : float + Downstream virtual nexus identifier + mainstem_virtual_fp_id : float + Mainstem virtual flowpath identifier + segment_order : float + Segment order geometry : binary Spatial Geometry (POINT format) - stored in WKB binary format """ @@ -1077,6 +1129,11 @@ def columns(cls) -> list[str]: "method_fp_to_gage", "fp_id", "virtual_fp_id", + "div_id", + "dn_nex_id", + "dn_virtual_nex_id", + "mainstem_virtual_fp_id", + "segment_order", "geometry", ] @@ -1099,6 +1156,11 @@ def schema(cls) -> Schema: "Method used to associate flowpath to gage", "Flowpath Identifier", "Virtual Flowpath Identifier", + "Associated divide identifier", + "Downstream nexus identifier", + "Downstream virtual nexus identifier", + "Mainstem virtual flowpath identifier", + "Segment order", "Spatial Geometry (POINT format) - stored in WKB binary format", ] return Schema( @@ -1110,7 +1172,12 @@ def schema(cls) -> Schema: NestedField(6, "method_fp_to_gage", StringType(), required=False, doc=desc[5]), NestedField(7, "fp_id", DoubleType(), required=False, doc=desc[6]), NestedField(8, "virtual_fp_id", DoubleType(), required=False, doc=desc[7]), - NestedField(9, "geometry", BinaryType(), required=False, doc=desc[8]), + NestedField(9, "div_id", DoubleType(), required=False, doc=desc[8]), + NestedField(10, "dn_nex_id", DoubleType(), required=False, doc=desc[9]), + NestedField(11, "dn_virtual_nex_id", DoubleType(), required=False, doc=desc[10]), + NestedField(12, "mainstem_virtual_fp_id", DoubleType(), required=False, doc=desc[11]), + NestedField(13, "segment_order", DoubleType(), required=False, doc=desc[12]), + NestedField(14, "geometry", BinaryType(), required=False, doc=desc[13]), identifier_field_ids=[1], ) @@ -1134,6 +1201,11 @@ def arrow_schema(cls) -> pa.Schema: pa.field("method_fp_to_gage", pa.string(), nullable=True), pa.field("fp_id", pa.float64(), nullable=True), pa.field("virtual_fp_id", pa.float64(), nullable=True), + pa.field("div_id", pa.float64(), nullable=True), + pa.field("dn_nex_id", pa.float64(), nullable=True), + pa.field("dn_virtual_nex_id", pa.float64(), nullable=True), + pa.field("mainstem_virtual_fp_id", pa.float64(), nullable=True), + pa.field("segment_order", pa.float64(), nullable=True), pa.field("geometry", pa.binary(), nullable=True), ] ) @@ -1151,8 +1223,8 @@ class VirtualFlowpaths: Downstream virtual nexus identifier up_virtual_nex_id : float Upstream virtual nexus identifier - routing_segment : bool - Routing segment indicator + segment_order : int + Segment order length_km : float Flowpath length [in kilometers] area_sqkm : float @@ -1179,7 +1251,7 @@ def columns(cls) -> list[str]: "virtual_fp_id", "dn_virtual_nex_id", "up_virtual_nex_id", - "routing_segment", + "segment_order", "length_km", "area_sqkm", "percentage_area_contribution", @@ -1201,7 +1273,7 @@ def schema(cls) -> Schema: "Virtual flowpath identifier", "Downstream virtual nexus identifier", "Upstream virtual nexus identifier", - "Routing segment indicator", + "Segment order", "Flowpath length [in kilometers]", "Incremental areas of divide [in square kilometers]", "Percentage area contribution", @@ -1212,7 +1284,7 @@ def schema(cls) -> Schema: NestedField(1, "virtual_fp_id", LongType(), required=True, doc=desc[0]), NestedField(2, "dn_virtual_nex_id", LongType(), required=False, doc=desc[1]), NestedField(3, "up_virtual_nex_id", DoubleType(), required=False, doc=desc[2]), - NestedField(4, "routing_segment", BooleanType(), required=False, doc=desc[3]), + NestedField(4, "segment_order", LongType(), required=False, doc=desc[3]), NestedField(5, "length_km", DoubleType(), required=False, doc=desc[4]), NestedField(6, "area_sqkm", DoubleType(), required=False, doc=desc[5]), NestedField(7, "percentage_area_contribution", DoubleType(), required=False, doc=desc[6]), @@ -1236,7 +1308,7 @@ def arrow_schema(cls) -> pa.Schema: pa.field("virtual_fp_id", pa.int64(), nullable=False), pa.field("dn_virtual_nex_id", pa.int64(), nullable=True), pa.field("up_virtual_nex_id", pa.float64(), nullable=True), - pa.field("routing_segment", pa.bool_(), nullable=True), + pa.field("segment_order", pa.int64(), nullable=True), pa.field("length_km", pa.float64(), nullable=True), pa.field("area_sqkm", pa.float64(), nullable=True), pa.field("percentage_area_contribution", pa.float64(), nullable=True), @@ -1333,6 +1405,8 @@ class Hydrolocations: Hydrolocations identifier dn_nex_id : int Downstream nexus identifier + dn_virtual_nex_id : float + Downstream virtual nexus identifier """ @classmethod @@ -1348,6 +1422,7 @@ def columns(cls) -> list[str]: return [ "hy_id", "dn_nex_id", + "dn_virtual_nex_id", ] @classmethod @@ -1363,10 +1438,12 @@ def schema(cls) -> Schema: desc = [ "Hydrolocations identifier", "Downstream nexus identifier", + "Downstream virtual nexus identifier", ] return Schema( NestedField(1, "hy_id", LongType(), required=True, doc=desc[0]), NestedField(2, "dn_nex_id", LongType(), required=False, doc=desc[1]), + NestedField(3, "dn_virtual_nex_id", DoubleType(), required=False, doc=desc[2]), identifier_field_ids=[1], ) @@ -1384,5 +1461,6 @@ def arrow_schema(cls) -> pa.Schema: [ pa.field("hy_id", pa.int64(), nullable=False), pa.field("dn_nex_id", pa.int64(), nullable=True), + pa.field("dn_virtual_nex_id", pa.float64(), nullable=True), ] ) From 43a2b328c88f13ade3d7756dc758f8f2f7e13803 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 2 Apr 2026 18:02:48 -0400 Subject: [PATCH 2/9] feat: add overwrite option to build_nhf.py This commit modifies build_nhf.py so that it overwrites instead of purges existing tables. This allows rollbacks/keeps previous versions in the catalog --- tools/iceberg/build_nhf.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/tools/iceberg/build_nhf.py b/tools/iceberg/build_nhf.py index 194a1af..06cb547 100644 --- a/tools/iceberg/build_nhf.py +++ b/tools/iceberg/build_nhf.py @@ -62,7 +62,7 @@ def tear_down_nhf(catalog_type: str): print(f"Snapshot table {snapshot_table_identifier} does not exist. Skipping purge.") -def build_nhf(catalog_type: str, file_dir: str): +def build_nhf(catalog_type: str, file_dir: str, overwrite_existing: bool = False): """ Builds the hydrofabric Iceberg tables @@ -72,6 +72,8 @@ def build_nhf(catalog_type: str, file_dir: str): the type of catalog. sql is local, glue is production file_dir : str where the files are located + overwrite_existing : bool + if True, overwrite existing populated tables (preserves old snapshots for rollback) """ catalog = load_catalog(catalog_type) namespace = "nhf" @@ -90,9 +92,19 @@ def build_nhf(catalog_type: str, file_dir: str): if catalog.table_exists(f"{namespace}.{layer}"): current_snapshot = catalog.load_table(f"{namespace}.{layer}").current_snapshot() if current_snapshot is not None: - print(f"Table {layer} already exists, and is populated. Skipping build") - snapshots[layer] = current_snapshot.snapshot_id - build_table = False + if overwrite_existing: + print(f"Table {layer} already exists. Overwriting (old snapshots preserved)...") + iceberg_table = catalog.load_table(f"{namespace}.{layer}") + with iceberg_table.update_schema() as update: + update.union_by_name(schema.arrow_schema()) + iceberg_table.overwrite(table) + snapshots[layer] = iceberg_table.current_snapshot().snapshot_id + update_snapshots = True + build_table = False + else: + print(f"Table {layer} already exists, and is populated. Skipping build") + snapshots[layer] = current_snapshot.snapshot_id + build_table = False else: print(f"Table {layer} has no current snapshot (must be empty).") @@ -170,9 +182,15 @@ def build_nhf(catalog_type: str, file_dir: str): default=False, help="Purges old catalog tables before building new ones - use with caution! Only use if the schemas have changed.", ) + parser.add_argument( + "--overwrite", + action="store_true", + default=False, + help="Overwrite existing populated tables. Preserves old snapshots for rollback via Iceberg time-travel.", + ) args = parser.parse_args() if args.delete_old: tear_down_nhf(catalog_type=args.catalog) - build_nhf(catalog_type=args.catalog, file_dir=args.files) + build_nhf(catalog_type=args.catalog, file_dir=args.files, overwrite_existing=args.overwrite) From 7105ab144f343f7eb1698d18d415c4a2e52fe781 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 2 Apr 2026 20:34:30 -0400 Subject: [PATCH 3/9] feat: modify api and tools to handle nhf oconus domains --- app/main.py | 3 + src/icefabric/hydrofabric/subset_nhf.py | 34 +++++++---- src/icefabric/schemas/hydrofabric.py | 23 ++++++-- tests/app/test_source_parameter.py | 74 +++++++++++++----------- tools/hydrofabric/nhf_gpkg_to_parquet.py | 5 ++ tools/iceberg/build_nhf.py | 41 +++++++++---- 6 files changed, 120 insertions(+), 60 deletions(-) diff --git a/app/main.py b/app/main.py index c421504..1767f9c 100644 --- a/app/main.py +++ b/app/main.py @@ -90,6 +90,9 @@ help="List of namespaces to include in local cache. Optionally specified as :", default=[ "conus_nhf", + "ak_nhf", + "hi_nhf", + "prvi_nhf", "conus_hf", "prvi_hf", "hi_hf", diff --git a/src/icefabric/hydrofabric/subset_nhf.py b/src/icefabric/hydrofabric/subset_nhf.py index de6fdc8..ee77a1c 100644 --- a/src/icefabric/hydrofabric/subset_nhf.py +++ b/src/icefabric/hydrofabric/subset_nhf.py @@ -102,27 +102,39 @@ def __init__( if parquet_dir is None and catalog is None: raise ValueError("Must provide either parquet_dir or catalog") - def _get_lazy_frame(self, layer: str) -> pl.LazyFrame: - """Get a LazyFrame for a given layer.""" + def _get_lazy_frame(self, layer: str) -> pl.LazyFrame | None: + """Get a LazyFrame for a given layer, or None if the layer doesn't exist.""" if self.catalog is not None: - return self.catalog.load_table(f"{self.namespace.value}.{layer}").to_polars() + table_id = f"{self.namespace.value}.{layer}" + if not self.catalog.table_exists(table_id): + return None + return self.catalog.load_table(table_id).to_polars() else: path = self.parquet_dir / f"{layer}.parquet" if not path.exists(): - raise FileNotFoundError(f"Parquet file not found: {path}") + return None return pl.scan_parquet(path) def load_filtered(self, layer: str, col: str, ids: set[int]) -> pl.DataFrame: """Load a layer filtered by a set of IDs.""" - return self._get_lazy_frame(layer).filter(pl.col(col).is_in(ids)).collect() + lf = self._get_lazy_frame(layer) + if lf is None: + return pl.DataFrame() + return lf.filter(pl.col(col).is_in(ids)).collect() def load_filtered_eq(self, layer: str, col: str, value: str) -> pl.DataFrame: """Load a layer filtered by string equality.""" - return self._get_lazy_frame(layer).filter(pl.col(col) == value).collect() + lf = self._get_lazy_frame(layer) + if lf is None: + return pl.DataFrame() + return lf.filter(pl.col(col) == value).collect() def load_columns(self, layer: str, columns: list[str]) -> pl.DataFrame: """Load specific columns from a layer.""" - return self._get_lazy_frame(layer).select(columns).collect() + lf = self._get_lazy_frame(layer) + if lf is None: + return pl.DataFrame() + return lf.select(columns).collect() # ============================================================================= @@ -200,7 +212,9 @@ def generate_subset_from_ids( + subset_fp.filter(pl.col("dn_nex_id").is_not_null())["dn_nex_id"].cast(pl.Int64).to_list() ) all_v_fp_ids = set(subset_ref_fp["virtual_fp_id"].to_list()) - all_hy_ids = set(subset_wb["hy_id"].to_list() + subset_gages["hy_id"].to_list()) + wb_hy_ids = subset_wb["hy_id"].to_list() if "hy_id" in subset_wb.columns else [] + gage_hy_ids = subset_gages["hy_id"].to_list() if "hy_id" in subset_gages.columns else [] + all_hy_ids = set(wb_hy_ids + gage_hy_ids) # Wave 2: nex_id/virtual_fp_id filtered (parallel) with ThreadPoolExecutor(max_workers=2) as ex: @@ -251,8 +265,8 @@ def generate_subset_from_ids( "divides": pl_to_gdf(subset_div), "virtual_nexus": pl_to_gdf(subset_v_nex), "virtual_flowpaths": pl_to_gdf(subset_v_fp), - "waterbodies": pl_to_gdf(subset_wb), - "gages": pl_to_gdf(subset_gages), + "waterbodies": pl_to_gdf(subset_wb) if len(subset_wb) > 0 else subset_wb.to_pandas(), + "gages": pl_to_gdf(subset_gages) if len(subset_gages) > 0 else subset_gages.to_pandas(), "reference_flowpaths": subset_ref_fp.to_pandas(), "hydrolocations": subset_hydrolocations.to_pandas(), } diff --git a/src/icefabric/schemas/hydrofabric.py b/src/icefabric/schemas/hydrofabric.py index 3f16009..981c556 100644 --- a/src/icefabric/schemas/hydrofabric.py +++ b/src/icefabric/schemas/hydrofabric.py @@ -144,6 +144,9 @@ class HydrofabricNamespace(str, Enum): NHF = "nhf" CONUS_NHF = "conus_nhf" + ALASKA_NHF = "ak_nhf" + HAWAII_NHF = "hi_nhf" + PUERTO_RICO_NHF = "prvi_nhf" CONUS_HF = "conus_hf" ALASKA_HF = "ak_hf" HAWAII_HF = "hi_hf" @@ -164,7 +167,13 @@ def __str__(self) -> str: @property def is_nhf(self) -> bool: """Return True if this namespace is NHF data.""" - return self in (HydrofabricNamespace.NHF, HydrofabricNamespace.CONUS_NHF) + return self in ( + HydrofabricNamespace.NHF, + HydrofabricNamespace.CONUS_NHF, + HydrofabricNamespace.ALASKA_NHF, + HydrofabricNamespace.HAWAII_NHF, + HydrofabricNamespace.PUERTO_RICO_NHF, + ) @property def is_oconus_hf(self) -> bool: @@ -183,6 +192,9 @@ def _missing_(cls, value: object) -> HydrofabricNamespace | None: aliases = { "nhf": cls.NHF, "conus_nhf": cls.CONUS_NHF, + "ak_nhf": cls.ALASKA_NHF, + "hi_nhf": cls.HAWAII_NHF, + "prvi_nhf": cls.PUERTO_RICO_NHF, "conus_hf": cls.CONUS_HF, "ak_hf": cls.ALASKA_HF, "hi_hf": cls.HAWAII_HF, @@ -286,11 +298,11 @@ def resolve( ] = { (GeographicDomain.CONUS, HydrofabricSource.NHF): cls.CONUS_NHF, (GeographicDomain.CONUS, HydrofabricSource.HF): cls.CONUS_HF, - (GeographicDomain.ALASKA, HydrofabricSource.NHF): None, + (GeographicDomain.ALASKA, HydrofabricSource.NHF): cls.ALASKA_NHF, (GeographicDomain.ALASKA, HydrofabricSource.HF): cls.ALASKA_HF, - (GeographicDomain.HAWAII, HydrofabricSource.NHF): None, + (GeographicDomain.HAWAII, HydrofabricSource.NHF): cls.HAWAII_NHF, (GeographicDomain.HAWAII, HydrofabricSource.HF): cls.HAWAII_HF, - (GeographicDomain.PUERTO_RICO, HydrofabricSource.NHF): None, + (GeographicDomain.PUERTO_RICO, HydrofabricSource.NHF): cls.PUERTO_RICO_NHF, (GeographicDomain.PUERTO_RICO, HydrofabricSource.HF): cls.PUERTO_RICO_HF, (GeographicDomain.GREAT_LAKES, HydrofabricSource.NHF): None, (GeographicDomain.GREAT_LAKES, HydrofabricSource.HF): cls.GREAT_LAKES_HF, @@ -300,8 +312,7 @@ def resolve( if namespace is None: raise NotImplementedError( - f"Domain '{domain.value}' is not currently available for source '{source.value}'. " # type: ignore[union-attr] - f"Only 'CONUS' is available for the National Hydrofabric (nhf) at this time." + f"Domain '{domain.value}' is not currently available for source '{source.value}'." # type: ignore[union-attr] ) return namespace diff --git a/tests/app/test_source_parameter.py b/tests/app/test_source_parameter.py index a98ce83..0a6380c 100644 --- a/tests/app/test_source_parameter.py +++ b/tests/app/test_source_parameter.py @@ -140,34 +140,29 @@ def test_great_lakes_hf(self): assert namespace == "gl_hf" assert namespace.is_nhf is False - # 501 Not Implemented tests - non-CONUS with NHF - def test_alaska_nhf_not_implemented(self): - """Test Alaska + NHF raises NotImplementedError.""" - with pytest.raises(NotImplementedError) as exc_info: - HydrofabricNamespace.resolve(GeographicDomain.ALASKA, HydrofabricSource.NHF) - assert "Alaska" in str(exc_info.value) - assert "CONUS" in str(exc_info.value) - - def test_hawaii_nhf_not_implemented(self): - """Test Hawaii + NHF raises NotImplementedError.""" - with pytest.raises(NotImplementedError) as exc_info: - HydrofabricNamespace.resolve(GeographicDomain.HAWAII, HydrofabricSource.NHF) - assert "Hawaii" in str(exc_info.value) - assert "CONUS" in str(exc_info.value) - - def test_puerto_rico_nhf_not_implemented(self): - """Test Puerto_Rico + NHF raises NotImplementedError.""" - with pytest.raises(NotImplementedError) as exc_info: - HydrofabricNamespace.resolve(GeographicDomain.PUERTO_RICO, HydrofabricSource.NHF) - assert "Puerto_Rico" in str(exc_info.value) - assert "CONUS" in str(exc_info.value) + # Non-CONUS NHF domain tests + def test_alaska_nhf(self): + """Test Alaska + NHF returns ak_nhf namespace.""" + namespace = HydrofabricNamespace.resolve(GeographicDomain.ALASKA, HydrofabricSource.NHF) + assert namespace == "ak_nhf" + assert namespace.is_nhf is True + + def test_hawaii_nhf(self): + """Test Hawaii + NHF returns hi_nhf namespace.""" + namespace = HydrofabricNamespace.resolve(GeographicDomain.HAWAII, HydrofabricSource.NHF) + assert namespace == "hi_nhf" + assert namespace.is_nhf is True + + def test_puerto_rico_nhf(self): + """Test Puerto_Rico + NHF returns prvi_nhf namespace.""" + namespace = HydrofabricNamespace.resolve(GeographicDomain.PUERTO_RICO, HydrofabricSource.NHF) + assert namespace == "prvi_nhf" + assert namespace.is_nhf is True def test_great_lakes_nhf_not_implemented(self): """Test Great_Lakes + NHF raises NotImplementedError.""" - with pytest.raises(NotImplementedError) as exc_info: + with pytest.raises(NotImplementedError): HydrofabricNamespace.resolve(GeographicDomain.GREAT_LAKES, HydrofabricSource.NHF) - assert "Great_Lakes" in str(exc_info.value) - assert "CONUS" in str(exc_info.value) # String input tests def test_string_domain_conus(self): @@ -206,6 +201,18 @@ def test_is_nhf_for_conus_nhf(self): """Test is_nhf returns True for CONUS_NHF namespace.""" assert HydrofabricNamespace.CONUS_NHF.is_nhf is True + def test_is_nhf_for_alaska_nhf(self): + """Test is_nhf returns True for ALASKA_NHF namespace.""" + assert HydrofabricNamespace.ALASKA_NHF.is_nhf is True + + def test_is_nhf_for_hawaii_nhf(self): + """Test is_nhf returns True for HAWAII_NHF namespace.""" + assert HydrofabricNamespace.HAWAII_NHF.is_nhf is True + + def test_is_nhf_for_puerto_rico_nhf(self): + """Test is_nhf returns True for PUERTO_RICO_NHF namespace.""" + assert HydrofabricNamespace.PUERTO_RICO_NHF.is_nhf is True + def test_is_nhf_for_conus_hf(self): """Test is_nhf returns False for CONUS_HF namespace.""" assert HydrofabricNamespace.CONUS_HF.is_nhf is False @@ -247,15 +254,14 @@ def test_hydrofabric_new_api_hawaii_hf(self, client, watershed_bound_id_good: st ) assert response.status_code == 200 - def test_hydrofabric_non_conus_nhf_returns_501(self, client): - """Test hydrofabric endpoint returns 501 for non-CONUS domains with NHF.""" + def test_hydrofabric_great_lakes_nhf_returns_501(self, client): + """Test hydrofabric endpoint returns 501 for Great Lakes with NHF.""" response = client.get( - "/v1/hydrofabric/test-id/gpkg?id_type=flowpath_id&source=nhf&domain=Hawaii&layers=divides" + "/v1/hydrofabric/test-id/gpkg?id_type=flowpath_id&source=nhf&domain=Great_Lakes&layers=divides" ) assert response.status_code == 501 data = response.json() assert data["detail"]["error"] == "domain_not_available" - assert "CONUS" in data["detail"]["available_domains"] def test_hydrofabric_source_without_domain_returns_400(self, client): """Test hydrofabric endpoint returns 400 when source provided without domain.""" @@ -280,9 +286,9 @@ def test_hydrofabric_geographic_domain_without_source_succeeds( class TestModuleRouterSourceParameter: """Integration tests for the NWM module routers source parameter.""" - def test_module_sft_non_conus_nhf_returns_501(self, client): - """Test SFT endpoint returns 501 for non-CONUS domains with NHF.""" - response = client.get("/v1/modules/sft/?identifier=01010000&source=nhf&domain=Alaska") + def test_module_sft_great_lakes_nhf_returns_501(self, client): + """Test SFT endpoint returns 501 for Great Lakes with NHF.""" + response = client.get("/v1/modules/sft/?identifier=01010000&source=nhf&domain=Great_Lakes") assert response.status_code == 501 data = response.json() assert data["detail"]["error"] == "domain_not_available" @@ -292,10 +298,10 @@ def test_module_sft_source_without_domain_returns_400(self, client): response = client.get("/v1/modules/sft/?identifier=01010000&source=nhf") assert response.status_code == 400 - def test_module_parameter_metadata_non_conus_nhf_returns_501(self, client): - """Test parameter_metadata endpoint returns 501 for non-CONUS domains with NHF.""" + def test_module_parameter_metadata_great_lakes_nhf_returns_501(self, client): + """Test parameter_metadata endpoint returns 501 for Great Lakes with NHF.""" response = client.get( - "/v1/modules/parameter_metadata/?modules=SFT&source=nhf&domain=Alaska&gage_id=01010000" + "/v1/modules/parameter_metadata/?modules=SFT&source=nhf&domain=Great_Lakes&gage_id=01010000" ) assert response.status_code == 501 data = response.json() diff --git a/tools/hydrofabric/nhf_gpkg_to_parquet.py b/tools/hydrofabric/nhf_gpkg_to_parquet.py index a1eb851..fcfbf7e 100644 --- a/tools/hydrofabric/nhf_gpkg_to_parquet.py +++ b/tools/hydrofabric/nhf_gpkg_to_parquet.py @@ -48,6 +48,11 @@ def nhf_gpkg_to_parquet(input_file: Path, output_folder: Path) -> None: # Drop the GeoDataFrame's geometry reference gdf = pd.DataFrame(gdf) + # Add missing nullable columns as null (handles domain differences) + for col in schema.columns(): + if col not in gdf.columns: + gdf[col] = None + # Create PyArrow table with schema validation table = pa.Table.from_pandas(gdf[schema.columns()], schema=schema.arrow_schema()) diff --git a/tools/iceberg/build_nhf.py b/tools/iceberg/build_nhf.py index 06cb547..24c580b 100644 --- a/tools/iceberg/build_nhf.py +++ b/tools/iceberg/build_nhf.py @@ -31,7 +31,15 @@ warnings.filterwarnings("ignore", category=ResourceWarning) -def tear_down_nhf(catalog_type: str): +DOMAIN_TO_NAMESPACE = { + "conus": "nhf", + "ak": "ak_nhf", + "hi": "hi_nhf", + "prvi": "prvi_nhf", +} + + +def tear_down_nhf(catalog_type: str, domain: str = "conus"): """ Tears down the hydrofabric Iceberg tables @@ -39,11 +47,13 @@ def tear_down_nhf(catalog_type: str): ---------- catalog_type : str the type of catalog. sql is local, glue is production + domain : str + the NHF domain (conus, ak, hi, prvi) """ catalog = load_catalog(catalog_type) - namespace = "nhf" + namespace = DOMAIN_TO_NAMESPACE[domain] - print("Tearing down existing NHF tables...") + print(f"Tearing down existing NHF tables in {namespace}...") for layer in nhf_layers.keys(): table_identifier = f"{namespace}.{layer}" if catalog.table_exists(table_identifier): @@ -53,7 +63,7 @@ def tear_down_nhf(catalog_type: str): print(f"NHF layer table {table_identifier} does not exist. Skipping purge.") print("Tearing down existing NHF snapshot table...") - snapshot_namespace = "nhf_snapshots" + snapshot_namespace = f"{namespace}_snapshots" snapshot_table_identifier = f"{snapshot_namespace}.id" if catalog.table_exists(snapshot_table_identifier): catalog.purge_table(snapshot_table_identifier) @@ -62,7 +72,7 @@ def tear_down_nhf(catalog_type: str): print(f"Snapshot table {snapshot_table_identifier} does not exist. Skipping purge.") -def build_nhf(catalog_type: str, file_dir: str, overwrite_existing: bool = False): +def build_nhf(catalog_type: str, file_dir: str, domain: str = "conus", overwrite_existing: bool = False): """ Builds the hydrofabric Iceberg tables @@ -72,11 +82,13 @@ def build_nhf(catalog_type: str, file_dir: str, overwrite_existing: bool = False the type of catalog. sql is local, glue is production file_dir : str where the files are located + domain : str + the NHF domain (conus, ak, hi, prvi) overwrite_existing : bool if True, overwrite existing populated tables (preserves old snapshots for rollback) """ catalog = load_catalog(catalog_type) - namespace = "nhf" + namespace = DOMAIN_TO_NAMESPACE[domain] catalog.create_namespace_if_not_exists(namespace) snapshots = {} @@ -139,7 +151,7 @@ def build_nhf(catalog_type: str, file_dir: str, overwrite_existing: bool = False snapshots[layer] = current_snapshot.snapshot_id if update_snapshots: - snapshot_namespace = "nhf_snapshots" + snapshot_namespace = f"{namespace}_snapshots" snapshot_table = f"{snapshot_namespace}.id" catalog.create_namespace_if_not_exists(snapshot_namespace) if catalog.table_exists(snapshot_table): @@ -180,7 +192,7 @@ def build_nhf(catalog_type: str, file_dir: str, overwrite_existing: bool = False "--delete-old", action="store_true", default=False, - help="Purges old catalog tables before building new ones - use with caution! Only use if the schemas have changed.", + help="Purges old catalog tables before building new ones - use with caution! This permanently deletes data with no rollback. Prefer --overwrite instead.", ) parser.add_argument( "--overwrite", @@ -188,9 +200,18 @@ def build_nhf(catalog_type: str, file_dir: str, overwrite_existing: bool = False default=False, help="Overwrite existing populated tables. Preserves old snapshots for rollback via Iceberg time-travel.", ) + parser.add_argument( + "--domain", + type=str, + default="conus", + choices=["conus", "ak", "hi", "prvi"], + help="The NHF domain (default: conus). Determines the namespace (e.g., ak -> ak_nhf).", + ) args = parser.parse_args() if args.delete_old: - tear_down_nhf(catalog_type=args.catalog) - build_nhf(catalog_type=args.catalog, file_dir=args.files, overwrite_existing=args.overwrite) + tear_down_nhf(catalog_type=args.catalog, domain=args.domain) + build_nhf( + catalog_type=args.catalog, file_dir=args.files, domain=args.domain, overwrite_existing=args.overwrite + ) From 8f1c7fec1a8d001386ea77de036d32c2b29cfeb4 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 2 Apr 2026 21:03:38 -0400 Subject: [PATCH 4/9] fix: add deploy env arg to build_nhf.py Previously build_nhf.py always assumed it would add NHF tables to the test environment. This makes it possible to use build_nhf.py to update OE nhf tables --- tools/iceberg/build_nhf.py | 68 ++++++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 18 deletions(-) diff --git a/tools/iceberg/build_nhf.py b/tools/iceberg/build_nhf.py index 24c580b..de1fd1a 100644 --- a/tools/iceberg/build_nhf.py +++ b/tools/iceberg/build_nhf.py @@ -15,21 +15,28 @@ from icefabric.schemas.iceberg_tables import nhf_layers from icefabric.schemas.iceberg_tables.nhf_snapshots import NHFSnapshot -# Loading credentials, setting path to save outputs -load_creds() -with open(os.environ["PYICEBERG_HOME"]) as f: - CONFIG = yaml.safe_load(f) -WAREHOUSE = Path(CONFIG["catalog"]["sql"]["warehouse"].replace("file://", "")) -WAREHOUSE.mkdir(parents=True, exist_ok=True) - -LOCATION = { - "glue": "s3://edfs-data/icefabric_catalog", - "sql": CONFIG["catalog"]["sql"]["warehouse"], -} - # Suppress threading cleanup warnings warnings.filterwarnings("ignore", category=ResourceWarning) +S3_BUCKETS = { + "test": "edfs-data", + "prod": "iceberg-data-oe", +} + + +def _init(deploy_env: str = "test"): + """Load credentials and resolve catalog locations.""" + load_creds(deploy_env) + with open(os.environ["PYICEBERG_HOME"]) as f: + config = yaml.safe_load(f) + warehouse = Path(config["catalog"]["sql"]["warehouse"].replace("file://", "")) + warehouse.mkdir(parents=True, exist_ok=True) + s3_bucket = S3_BUCKETS.get(deploy_env, S3_BUCKETS["test"]) + return { + "glue": f"s3://{s3_bucket}/icefabric_catalog", + "sql": config["catalog"]["sql"]["warehouse"], + } + DOMAIN_TO_NAMESPACE = { "conus": "nhf", @@ -39,7 +46,7 @@ } -def tear_down_nhf(catalog_type: str, domain: str = "conus"): +def tear_down_nhf(catalog_type: str, domain: str = "conus", deploy_env: str = "test"): """ Tears down the hydrofabric Iceberg tables @@ -49,7 +56,10 @@ def tear_down_nhf(catalog_type: str, domain: str = "conus"): the type of catalog. sql is local, glue is production domain : str the NHF domain (conus, ak, hi, prvi) + deploy_env : str + the deploy environment (test or prod) """ + _init(deploy_env) catalog = load_catalog(catalog_type) namespace = DOMAIN_TO_NAMESPACE[domain] @@ -72,7 +82,13 @@ def tear_down_nhf(catalog_type: str, domain: str = "conus"): print(f"Snapshot table {snapshot_table_identifier} does not exist. Skipping purge.") -def build_nhf(catalog_type: str, file_dir: str, domain: str = "conus", overwrite_existing: bool = False): +def build_nhf( + catalog_type: str, + file_dir: str, + domain: str = "conus", + overwrite_existing: bool = False, + deploy_env: str = "test", +): """ Builds the hydrofabric Iceberg tables @@ -86,7 +102,10 @@ def build_nhf(catalog_type: str, file_dir: str, domain: str = "conus", overwrite the NHF domain (conus, ak, hi, prvi) overwrite_existing : bool if True, overwrite existing populated tables (preserves old snapshots for rollback) + deploy_env : str + the deploy environment (test or prod) """ + location = _init(deploy_env) catalog = load_catalog(catalog_type) namespace = DOMAIN_TO_NAMESPACE[domain] catalog.create_namespace_if_not_exists(namespace) @@ -125,7 +144,7 @@ def build_nhf(catalog_type: str, file_dir: str, domain: str = "conus", overwrite iceberg_table = catalog.create_table_if_not_exists( f"{namespace}.{layer}", schema=schema.schema(), - location=f"{LOCATION[catalog_type]}/{namespace.lower()}/{layer}", + location=f"{location[catalog_type]}/{namespace.lower()}/{layer}", ) # Bucket partitioning on the schema ID field @@ -156,11 +175,13 @@ def build_nhf(catalog_type: str, file_dir: str, domain: str = "conus", overwrite catalog.create_namespace_if_not_exists(snapshot_namespace) if catalog.table_exists(snapshot_table): tbl = catalog.load_table(snapshot_table) + with tbl.update_schema() as update: + update.union_by_name(NHFSnapshot.arrow_schema()) else: tbl = catalog.create_table( snapshot_table, schema=NHFSnapshot.schema(), - location=f"{LOCATION[catalog_type]}/{snapshot_namespace}", + location=f"{location[catalog_type]}/{snapshot_namespace}", ) df = pa.Table.from_pylist([snapshots], schema=NHFSnapshot.arrow_schema()) tbl.append(df) @@ -207,11 +228,22 @@ def build_nhf(catalog_type: str, file_dir: str, domain: str = "conus", overwrite choices=["conus", "ak", "hi", "prvi"], help="The NHF domain (default: conus). Determines the namespace (e.g., ak -> ak_nhf).", ) + parser.add_argument( + "--deploy-env", + type=str, + default="test", + choices=["test", "prod"], + help="Deploy environment (default: test). Controls AWS credentials and S3 bucket location.", + ) args = parser.parse_args() if args.delete_old: - tear_down_nhf(catalog_type=args.catalog, domain=args.domain) + tear_down_nhf(catalog_type=args.catalog, domain=args.domain, deploy_env=args.deploy_env) build_nhf( - catalog_type=args.catalog, file_dir=args.files, domain=args.domain, overwrite_existing=args.overwrite + catalog_type=args.catalog, + file_dir=args.files, + domain=args.domain, + overwrite_existing=args.overwrite, + deploy_env=args.deploy_env, ) From e9352d7ac7eb7f8a11eb3c3372645aa5b1ca4203 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 2 Apr 2026 21:09:59 -0400 Subject: [PATCH 5/9] feat: add NHF lakes layer schema and update nhf_layers/snapshots --- .../schemas/iceberg_tables/__init__.py | 2 + .../iceberg_tables/hydrofabric_update.py | 188 ++++++++++++++++++ .../schemas/iceberg_tables/nhf_snapshots.py | 3 + 3 files changed, 193 insertions(+) diff --git a/src/icefabric/schemas/iceberg_tables/__init__.py b/src/icefabric/schemas/iceberg_tables/__init__.py index 56cca4f..4c0e233 100644 --- a/src/icefabric/schemas/iceberg_tables/__init__.py +++ b/src/icefabric/schemas/iceberg_tables/__init__.py @@ -4,6 +4,7 @@ Gages, Hydrolocations, Nexus, + NHFLakes, ReferenceFlowpaths, VirtualFlowpaths, VirtualNexus, @@ -20,4 +21,5 @@ "virtual_flowpaths": VirtualFlowpaths, "virtual_nexus": VirtualNexus, "hydrolocations": Hydrolocations, + "lakes": NHFLakes, } diff --git a/src/icefabric/schemas/iceberg_tables/hydrofabric_update.py b/src/icefabric/schemas/iceberg_tables/hydrofabric_update.py index b41b929..178cc05 100644 --- a/src/icefabric/schemas/iceberg_tables/hydrofabric_update.py +++ b/src/icefabric/schemas/iceberg_tables/hydrofabric_update.py @@ -1395,6 +1395,194 @@ def arrow_schema(cls) -> pa.Schema: ) +class NHFLakes: + """ + The schema for the NHF lakes table + + Attributes + ---------- + nhf_lake_id : int + Unique NHF lake identifier + ref_fp_id : int + Reference flowpath identifier + hy_id : int + Hydrolocation identifier + fp_id : float + Flowpath identifier + virtual_fp_id : int + Virtual flowpath identifier + dn_nex_id : float + Downstream nexus identifier + dn_virtual_nex_id : int + Downstream virtual nexus identifier + div_id : int + Associated divide identifier + lake_id : int + Lake identifier + res_id : str + Reservoir identifier + LkArea : float + Lake area + LkMxE : float + Lake maximum elevation + WeirC : float + Weir coefficient + WeirL : float + Weir length + WeirE : float + Weir elevation + OrificeC : float + Orifice coefficient + OrificeA : float + Orifice area + OrificeE : float + Orifice elevation + Dam_Length : float + Dam length + ifd : float + Initial flood depth + reservoir_index_AnA : float + Reservoir index for AnA configuration + reservoir_index_Extended_AnA : float + Reservoir index for Extended AnA configuration + reservoir_index_GDL_AK : float + Reservoir index for GDL AK configuration + reservoir_index_Medium_Range : float + Reservoir index for Medium Range configuration + reservoir_index_Short_Range : float + Reservoir index for Short Range configuration + geometry : binary + Spatial Geometry (POINT format) - stored in WKB binary format + """ + + @classmethod + def columns(cls) -> list[str]: + """Returns the columns associated with this schema.""" + return [ + "nhf_lake_id", + "ref_fp_id", + "hy_id", + "fp_id", + "virtual_fp_id", + "dn_nex_id", + "dn_virtual_nex_id", + "div_id", + "lake_id", + "res_id", + "LkArea", + "LkMxE", + "WeirC", + "WeirL", + "WeirE", + "OrificeC", + "OrificeA", + "OrificeE", + "Dam_Length", + "ifd", + "reservoir_index_AnA", + "reservoir_index_Extended_AnA", + "reservoir_index_GDL_AK", + "reservoir_index_Medium_Range", + "reservoir_index_Short_Range", + "geometry", + ] + + @classmethod + def schema(cls) -> Schema: + """Returns the PyIceberg Schema object.""" + desc = [ + "Unique NHF lake identifier", + "Reference flowpath identifier", + "Hydrolocation identifier", + "Flowpath identifier", + "Virtual flowpath identifier", + "Downstream nexus identifier", + "Downstream virtual nexus identifier", + "Associated divide identifier", + "Lake identifier", + "Reservoir identifier", + "Lake area", + "Lake maximum elevation", + "Weir coefficient", + "Weir length", + "Weir elevation", + "Orifice coefficient", + "Orifice area", + "Orifice elevation", + "Dam length", + "Initial flood depth", + "Reservoir index for AnA configuration", + "Reservoir index for Extended AnA configuration", + "Reservoir index for GDL AK configuration", + "Reservoir index for Medium Range configuration", + "Reservoir index for Short Range configuration", + "Spatial Geometry (POINT format) - stored in WKB binary format", + ] + return Schema( + NestedField(1, "nhf_lake_id", LongType(), required=True, doc=desc[0]), + NestedField(2, "ref_fp_id", LongType(), required=False, doc=desc[1]), + NestedField(3, "hy_id", LongType(), required=False, doc=desc[2]), + NestedField(4, "fp_id", DoubleType(), required=False, doc=desc[3]), + NestedField(5, "virtual_fp_id", LongType(), required=False, doc=desc[4]), + NestedField(6, "dn_nex_id", DoubleType(), required=False, doc=desc[5]), + NestedField(7, "dn_virtual_nex_id", LongType(), required=False, doc=desc[6]), + NestedField(8, "div_id", LongType(), required=False, doc=desc[7]), + NestedField(9, "lake_id", LongType(), required=False, doc=desc[8]), + NestedField(10, "res_id", StringType(), required=False, doc=desc[9]), + NestedField(11, "LkArea", DoubleType(), required=False, doc=desc[10]), + NestedField(12, "LkMxE", DoubleType(), required=False, doc=desc[11]), + NestedField(13, "WeirC", DoubleType(), required=False, doc=desc[12]), + NestedField(14, "WeirL", DoubleType(), required=False, doc=desc[13]), + NestedField(15, "WeirE", DoubleType(), required=False, doc=desc[14]), + NestedField(16, "OrificeC", DoubleType(), required=False, doc=desc[15]), + NestedField(17, "OrificeA", DoubleType(), required=False, doc=desc[16]), + NestedField(18, "OrificeE", DoubleType(), required=False, doc=desc[17]), + NestedField(19, "Dam_Length", DoubleType(), required=False, doc=desc[18]), + NestedField(20, "ifd", DoubleType(), required=False, doc=desc[19]), + NestedField(21, "reservoir_index_AnA", DoubleType(), required=False, doc=desc[20]), + NestedField(22, "reservoir_index_Extended_AnA", DoubleType(), required=False, doc=desc[21]), + NestedField(23, "reservoir_index_GDL_AK", DoubleType(), required=False, doc=desc[22]), + NestedField(24, "reservoir_index_Medium_Range", DoubleType(), required=False, doc=desc[23]), + NestedField(25, "reservoir_index_Short_Range", DoubleType(), required=False, doc=desc[24]), + NestedField(26, "geometry", BinaryType(), required=False, doc=desc[25]), + identifier_field_ids=[1], + ) + + @classmethod + def arrow_schema(cls) -> pa.Schema: + """Returns the PyArrow Schema object.""" + return pa.schema( + [ + pa.field("nhf_lake_id", pa.int64(), nullable=False), + pa.field("ref_fp_id", pa.int64(), nullable=True), + pa.field("hy_id", pa.int64(), nullable=True), + pa.field("fp_id", pa.float64(), nullable=True), + pa.field("virtual_fp_id", pa.int64(), nullable=True), + pa.field("dn_nex_id", pa.float64(), nullable=True), + pa.field("dn_virtual_nex_id", pa.int64(), nullable=True), + pa.field("div_id", pa.int64(), nullable=True), + pa.field("lake_id", pa.int64(), nullable=True), + pa.field("res_id", pa.string(), nullable=True), + pa.field("LkArea", pa.float64(), nullable=True), + pa.field("LkMxE", pa.float64(), nullable=True), + pa.field("WeirC", pa.float64(), nullable=True), + pa.field("WeirL", pa.float64(), nullable=True), + pa.field("WeirE", pa.float64(), nullable=True), + pa.field("OrificeC", pa.float64(), nullable=True), + pa.field("OrificeA", pa.float64(), nullable=True), + pa.field("OrificeE", pa.float64(), nullable=True), + pa.field("Dam_Length", pa.float64(), nullable=True), + pa.field("ifd", pa.float64(), nullable=True), + pa.field("reservoir_index_AnA", pa.float64(), nullable=True), + pa.field("reservoir_index_Extended_AnA", pa.float64(), nullable=True), + pa.field("reservoir_index_GDL_AK", pa.float64(), nullable=True), + pa.field("reservoir_index_Medium_Range", pa.float64(), nullable=True), + pa.field("reservoir_index_Short_Range", pa.float64(), nullable=True), + pa.field("geometry", pa.binary(), nullable=True), + ] + ) + + class Hydrolocations: """ The schema for the virtual_nexus table diff --git a/src/icefabric/schemas/iceberg_tables/nhf_snapshots.py b/src/icefabric/schemas/iceberg_tables/nhf_snapshots.py index f34ff2a..791d68f 100644 --- a/src/icefabric/schemas/iceberg_tables/nhf_snapshots.py +++ b/src/icefabric/schemas/iceberg_tables/nhf_snapshots.py @@ -40,6 +40,7 @@ def columns(cls) -> list[str]: "virtual_flowpaths", "virtual_nexus", "hydrolocations", + "lakes", ] @classmethod @@ -61,6 +62,7 @@ def schema(cls) -> Schema: NestedField(7, "virtual_flowpaths", LongType(), required=False), NestedField(8, "virtual_nexus", LongType(), required=False), NestedField(9, "hydrolocations", LongType(), required=False), + NestedField(10, "lakes", LongType(), required=False), ) @classmethod @@ -83,5 +85,6 @@ def arrow_schema(cls) -> pa.Schema: pa.field("virtual_flowpaths", pa.int64(), nullable=True), pa.field("virtual_nexus", pa.int64(), nullable=True), pa.field("hydrolocations", pa.int64(), nullable=True), + pa.field("lakes", pa.int64(), nullable=True), ] ) From 781c1ddf51a415aee26facd8ab7925ae56774e98 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 2 Apr 2026 21:23:51 -0400 Subject: [PATCH 6/9] fix: coerce numeric strings to numeric type during gpkg to parquet conversion --- tools/hydrofabric/nhf_gpkg_to_parquet.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tools/hydrofabric/nhf_gpkg_to_parquet.py b/tools/hydrofabric/nhf_gpkg_to_parquet.py index fcfbf7e..fa467e9 100644 --- a/tools/hydrofabric/nhf_gpkg_to_parquet.py +++ b/tools/hydrofabric/nhf_gpkg_to_parquet.py @@ -53,8 +53,18 @@ def nhf_gpkg_to_parquet(input_file: Path, output_folder: Path) -> None: if col not in gdf.columns: gdf[col] = None + # Coerce string-encoded numeric columns to match the arrow schema + arrow_schema = schema.arrow_schema() + for field in arrow_schema: + if ( + field.name in gdf.columns + and gdf[field.name].dtype == object + and pa.types.is_integer(field.type) + ): + gdf[field.name] = pd.to_numeric(gdf[field.name], errors="coerce") + # Create PyArrow table with schema validation - table = pa.Table.from_pandas(gdf[schema.columns()], schema=schema.arrow_schema()) + table = pa.Table.from_pandas(gdf[schema.columns()], schema=arrow_schema) # Write parquet file output_path = output_folder / f"{layer}.parquet" From ae32776099892ea26b095bc9fd160fadda726252 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 2 Apr 2026 21:58:23 -0400 Subject: [PATCH 7/9] feat: return all layers in nhf subsetting even if empty --- app/routers/hydrofabric/router.py | 36 ++++++++--------- src/icefabric/hydrofabric/subset_nhf.py | 51 ++++++++++++++++--------- 2 files changed, 50 insertions(+), 37 deletions(-) diff --git a/app/routers/hydrofabric/router.py b/app/routers/hydrofabric/router.py index 8ce8c42..b5cfb7d 100644 --- a/app/routers/hydrofabric/router.py +++ b/app/routers/hydrofabric/router.py @@ -181,13 +181,10 @@ async def get_hydrofabric_subset_gpkg( # Separate spatial vs non-spatial for table_name, layer_data in output_layers.items(): - if len(layer_data) > 0: - if isinstance(layer_data, gpd.GeoDataFrame): - spatial_layers[table_name] = layer_data - else: - nonspatial_layers[table_name] = layer_data - else: - logger.warning(f"Warning: {table_name} layer is empty") + if isinstance(layer_data, gpd.GeoDataFrame) and len(layer_data) > 0: + spatial_layers[table_name] = layer_data + elif not isinstance(layer_data, gpd.GeoDataFrame): + nonspatial_layers[table_name] = layer_data # Write spatial layers first with pyogrio for table_name, layer_data in spatial_layers.items(): @@ -195,19 +192,18 @@ async def get_hydrofabric_subset_gpkg( layers_written += 1 logger.info(f"Written spatial layer '{table_name}' with {len(layer_data)} records") - # Then write non-spatial layers with sqlite3 - if nonspatial_layers: - conn = sqlite3.connect(tmp_path) - for table_name, layer_data in nonspatial_layers.items(): - layer_data.to_sql(table_name, conn, if_exists="replace", index=False) - layers_written += 1 - logger.info(f"Written non-spatial layer '{table_name}' with {len(layer_data)} records") - conn.close() - - if layers_written == 0: - raise HTTPException( - status_code=404, detail=f"No non-empty layers found for identifier '{identifier}'" - ) + # Then write non-spatial layers with sqlite3 (includes empty layers) + conn = sqlite3.connect(tmp_path) + for table_name, layer_data in nonspatial_layers.items(): + layer_data.to_sql(table_name, conn, if_exists="replace", index=False) + layers_written += 1 + logger.info(f"Written non-spatial layer '{table_name}' with {len(layer_data)} records") + conn.close() + + if layers_written == 0: + raise HTTPException( + status_code=404, detail=f"No non-empty layers found for identifier '{identifier}'" + ) # Verify the file was created successfully if not tmp_path.exists(): diff --git a/src/icefabric/hydrofabric/subset_nhf.py b/src/icefabric/hydrofabric/subset_nhf.py index ee77a1c..77298f9 100644 --- a/src/icefabric/hydrofabric/subset_nhf.py +++ b/src/icefabric/hydrofabric/subset_nhf.py @@ -115,25 +115,34 @@ def _get_lazy_frame(self, layer: str) -> pl.LazyFrame | None: return None return pl.scan_parquet(path) + @staticmethod + def _empty_for_layer(layer: str) -> pl.DataFrame: + """Return an empty DataFrame with the correct schema for a missing layer.""" + from icefabric.schemas.iceberg_tables import nhf_layers + + if layer in nhf_layers: + return pl.from_arrow(nhf_layers[layer].arrow_schema().empty_table()) + return pl.DataFrame() + def load_filtered(self, layer: str, col: str, ids: set[int]) -> pl.DataFrame: """Load a layer filtered by a set of IDs.""" lf = self._get_lazy_frame(layer) if lf is None: - return pl.DataFrame() + return self._empty_for_layer(layer) return lf.filter(pl.col(col).is_in(ids)).collect() def load_filtered_eq(self, layer: str, col: str, value: str) -> pl.DataFrame: """Load a layer filtered by string equality.""" lf = self._get_lazy_frame(layer) if lf is None: - return pl.DataFrame() + return self._empty_for_layer(layer) return lf.filter(pl.col(col) == value).collect() def load_columns(self, layer: str, columns: list[str]) -> pl.DataFrame: """Load specific columns from a layer.""" lf = self._get_lazy_frame(layer) if lf is None: - return pl.DataFrame() + return self._empty_for_layer(layer) return lf.select(columns).collect() @@ -192,19 +201,21 @@ def generate_subset_from_ids( logger.debug(f"Subsetting {len(flowpath_ids)} flowpaths") # Wave 1: fp_id/div_id filtered (parallel) - with ThreadPoolExecutor(max_workers=5) as ex: + with ThreadPoolExecutor(max_workers=6) as ex: f = { "fp": ex.submit(source.load_filtered, "flowpaths", "fp_id", flowpath_ids), "div": ex.submit(source.load_filtered, "divides", "div_id", flowpath_ids), "wb": ex.submit(source.load_filtered, "waterbodies", "fp_id", flowpath_ids), "gages": ex.submit(source.load_filtered, "gages", "fp_id", flowpath_ids), "ref_fp": ex.submit(source.load_filtered, "reference_flowpaths", "div_id", flowpath_ids), + "lakes": ex.submit(source.load_filtered, "lakes", "fp_id", flowpath_ids), } subset_fp = f["fp"].result() subset_div = f["div"].result() subset_wb = f["wb"].result() subset_gages = f["gages"].result() subset_ref_fp = f["ref_fp"].result() + subset_lakes = f["lakes"].result() # Derive dependent IDs all_nex_ids = set( @@ -267,6 +278,7 @@ def generate_subset_from_ids( "virtual_flowpaths": pl_to_gdf(subset_v_fp), "waterbodies": pl_to_gdf(subset_wb) if len(subset_wb) > 0 else subset_wb.to_pandas(), "gages": pl_to_gdf(subset_gages) if len(subset_gages) > 0 else subset_gages.to_pandas(), + "lakes": pl_to_gdf(subset_lakes) if len(subset_lakes) > 0 else subset_lakes.to_pandas(), "reference_flowpaths": subset_ref_fp.to_pandas(), "hydrolocations": subset_hydrolocations.to_pandas(), } @@ -275,22 +287,27 @@ def generate_subset_from_ids( logger.debug(f"Writing to {subset_file}...") subset_file.parent.mkdir(parents=True, exist_ok=True) - for name, df in [ - ("flowpaths", output["flowpaths"]), - ("nexus", output["nexus"]), - ("divides", output["divides"]), - ("virtual_nexus", output["virtual_nexus"]), - ("virtual_flowpaths", output["virtual_flowpaths"]), - ("waterbodies", output["waterbodies"]), - ("gages", output["gages"]), - ]: + spatial_layers = [ + "flowpaths", + "nexus", + "divides", + "virtual_nexus", + "virtual_flowpaths", + "waterbodies", + "gages", + "lakes", + ] + for name in spatial_layers: + df = output[name] logger.debug(f" {name}: {len(df)} rows") - pyogrio.write_dataframe(df, subset_file, layer=name) + if isinstance(df, gpd.GeoDataFrame) and len(df) > 0: + pyogrio.write_dataframe(df, subset_file, layer=name) - logger.debug(f" reference_flowpaths: {len(subset_ref_fp)} rows") + nonspatial_layers = ["reference_flowpaths", "hydrolocations"] conn = sqlite3.connect(subset_file) - output["reference_flowpaths"].to_sql("reference_flowpaths", conn, if_exists="replace", index=False) - output["hydrolocations"].to_sql("hydrolocations", conn, if_exists="replace", index=False) + for name in nonspatial_layers: + logger.debug(f" {name}: {len(output[name])} rows") + output[name].to_sql(name, conn, if_exists="replace", index=False) conn.close() return output From 48294c9224b6141a84193b97743015ac666c8f60 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 2 Apr 2026 22:02:30 -0400 Subject: [PATCH 8/9] nit: add references to both test and oe warehouses in .pyiceberg.yaml --- .pyiceberg.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.pyiceberg.yaml b/.pyiceberg.yaml index 4c42331..78b26c0 100644 --- a/.pyiceberg.yaml +++ b/.pyiceberg.yaml @@ -2,7 +2,8 @@ catalog: glue: type: glue s3.endpoint: s3.us-east-1.amazonaws.com - warehouse: s3://52fcde3e7-5582-477d-7686ou4ij1ptxj8equ83a5xc51fsuse1b--table-s3 + # warehouse: s3://52fcde3e7-5582-477d-7686ou4ij1ptxj8equ83a5xc51fsuse1b--table-s3 # Test + warehouse: 154735606025 # OE region: us-east-1 glue_region: us-east-1 sql: From 03c0deacadc34d9091d7f5277928d19415d9ae18 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Fri, 3 Apr 2026 10:33:58 -0400 Subject: [PATCH 9/9] nit: comment out OE warehouse --- .pyiceberg.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pyiceberg.yaml b/.pyiceberg.yaml index 78b26c0..d313159 100644 --- a/.pyiceberg.yaml +++ b/.pyiceberg.yaml @@ -2,8 +2,8 @@ catalog: glue: type: glue s3.endpoint: s3.us-east-1.amazonaws.com - # warehouse: s3://52fcde3e7-5582-477d-7686ou4ij1ptxj8equ83a5xc51fsuse1b--table-s3 # Test - warehouse: 154735606025 # OE + warehouse: s3://52fcde3e7-5582-477d-7686ou4ij1ptxj8equ83a5xc51fsuse1b--table-s3 # Test + # warehouse: 154735606025 # OE region: us-east-1 glue_region: us-east-1 sql: