Skip to content
3 changes: 2 additions & 1 deletion .pyiceberg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ catalog:
glue:
type: glue
s3.endpoint: s3.us-east-1.amazonaws.com
warehouse: s3://52fcde3e7-5582-477d-7686ou4ij1ptxj8equ83a5xc51fsuse1b--table-s3
warehouse: s3://52fcde3e7-5582-477d-7686ou4ij1ptxj8equ83a5xc51fsuse1b--table-s3 # Test
# warehouse: 154735606025 # OE
region: us-east-1
glue_region: us-east-1
sql:
Expand Down
3 changes: 3 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@
help="List of namespaces to include in local cache. Optionally specified as <namespace>:<snapshot>",
default=[
"conus_nhf",
"ak_nhf",
"hi_nhf",
"prvi_nhf",
"conus_hf",
"prvi_hf",
"hi_hf",
Expand Down
36 changes: 16 additions & 20 deletions app/routers/hydrofabric/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,33 +181,29 @@ async def get_hydrofabric_subset_gpkg(

# Separate spatial vs non-spatial
for table_name, layer_data in output_layers.items():
if len(layer_data) > 0:
if isinstance(layer_data, gpd.GeoDataFrame):
spatial_layers[table_name] = layer_data
else:
nonspatial_layers[table_name] = layer_data
else:
logger.warning(f"Warning: {table_name} layer is empty")
if isinstance(layer_data, gpd.GeoDataFrame) and len(layer_data) > 0:
spatial_layers[table_name] = layer_data
elif not isinstance(layer_data, gpd.GeoDataFrame):
nonspatial_layers[table_name] = layer_data

# Write spatial layers first with pyogrio
for table_name, layer_data in spatial_layers.items():
pyogrio.write_dataframe(layer_data, tmp_path, layer=table_name)
layers_written += 1
logger.info(f"Written spatial layer '{table_name}' with {len(layer_data)} records")

# Then write non-spatial layers with sqlite3
if nonspatial_layers:
conn = sqlite3.connect(tmp_path)
for table_name, layer_data in nonspatial_layers.items():
layer_data.to_sql(table_name, conn, if_exists="replace", index=False)
layers_written += 1
logger.info(f"Written non-spatial layer '{table_name}' with {len(layer_data)} records")
conn.close()

if layers_written == 0:
raise HTTPException(
status_code=404, detail=f"No non-empty layers found for identifier '{identifier}'"
)
# Then write non-spatial layers with sqlite3 (includes empty layers)
conn = sqlite3.connect(tmp_path)
for table_name, layer_data in nonspatial_layers.items():
layer_data.to_sql(table_name, conn, if_exists="replace", index=False)
layers_written += 1
logger.info(f"Written non-spatial layer '{table_name}' with {len(layer_data)} records")
conn.close()

if layers_written == 0:
raise HTTPException(
status_code=404, detail=f"No non-empty layers found for identifier '{identifier}'"
)

# Verify the file was created successfully
if not tmp_path.exists():
Expand Down
79 changes: 55 additions & 24 deletions src/icefabric/hydrofabric/subset_nhf.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,48 @@ def __init__(
if parquet_dir is None and catalog is None:
raise ValueError("Must provide either parquet_dir or catalog")

def _get_lazy_frame(self, layer: str) -> pl.LazyFrame:
"""Get a LazyFrame for a given layer."""
def _get_lazy_frame(self, layer: str) -> pl.LazyFrame | None:
"""Get a LazyFrame for a given layer, or None if the layer doesn't exist."""
if self.catalog is not None:
return self.catalog.load_table(f"{self.namespace.value}.{layer}").to_polars()
table_id = f"{self.namespace.value}.{layer}"
if not self.catalog.table_exists(table_id):
return None
return self.catalog.load_table(table_id).to_polars()
else:
path = self.parquet_dir / f"{layer}.parquet"
if not path.exists():
raise FileNotFoundError(f"Parquet file not found: {path}")
return None
return pl.scan_parquet(path)

@staticmethod
def _empty_for_layer(layer: str) -> pl.DataFrame:
    """Return an empty DataFrame with the correct schema for a missing layer.

    Falls back to a schemaless empty frame when *layer* is not a known
    NHF table.
    """
    from icefabric.schemas.iceberg_tables import nhf_layers

    table_cls = nhf_layers.get(layer)
    if table_cls is None:
        # Unknown layer: no declared schema to reproduce.
        return pl.DataFrame()
    return pl.from_arrow(table_cls.arrow_schema().empty_table())

def load_filtered(self, layer: str, col: str, ids: set[int]) -> pl.DataFrame:
    """Load *layer*, keeping only rows whose *col* value appears in *ids*.

    Missing layers yield an empty DataFrame carrying the layer's schema
    instead of raising, so downstream joins/selects still see the columns.
    """
    lf = self._get_lazy_frame(layer)
    if lf is None:
        return self._empty_for_layer(layer)
    return lf.filter(pl.col(col).is_in(ids)).collect()

def load_filtered_eq(self, layer: str, col: str, value: str) -> pl.DataFrame:
    """Load *layer*, keeping only rows where *col* equals *value*.

    Missing layers yield an empty DataFrame carrying the layer's schema
    instead of raising.
    """
    lf = self._get_lazy_frame(layer)
    if lf is None:
        return self._empty_for_layer(layer)
    return lf.filter(pl.col(col) == value).collect()

def load_columns(self, layer: str, columns: list[str]) -> pl.DataFrame:
    """Load only *columns* from *layer*.

    Missing layers yield an empty DataFrame carrying the layer's full
    schema instead of raising.
    """
    lf = self._get_lazy_frame(layer)
    if lf is None:
        return self._empty_for_layer(layer)
    return lf.select(columns).collect()


# =============================================================================
Expand Down Expand Up @@ -180,27 +201,31 @@ def generate_subset_from_ids(
logger.debug(f"Subsetting {len(flowpath_ids)} flowpaths")

# Wave 1: fp_id/div_id filtered (parallel)
with ThreadPoolExecutor(max_workers=5) as ex:
with ThreadPoolExecutor(max_workers=6) as ex:
f = {
"fp": ex.submit(source.load_filtered, "flowpaths", "fp_id", flowpath_ids),
"div": ex.submit(source.load_filtered, "divides", "div_id", flowpath_ids),
"wb": ex.submit(source.load_filtered, "waterbodies", "fp_id", flowpath_ids),
"gages": ex.submit(source.load_filtered, "gages", "fp_id", flowpath_ids),
"ref_fp": ex.submit(source.load_filtered, "reference_flowpaths", "div_id", flowpath_ids),
"lakes": ex.submit(source.load_filtered, "lakes", "fp_id", flowpath_ids),
}
subset_fp = f["fp"].result()
subset_div = f["div"].result()
subset_wb = f["wb"].result()
subset_gages = f["gages"].result()
subset_ref_fp = f["ref_fp"].result()
subset_lakes = f["lakes"].result()

# Derive dependent IDs
all_nex_ids = set(
subset_fp.filter(pl.col("up_nex_id").is_not_null())["up_nex_id"].cast(pl.Int64).to_list()
+ subset_fp.filter(pl.col("dn_nex_id").is_not_null())["dn_nex_id"].cast(pl.Int64).to_list()
)
all_v_fp_ids = set(subset_ref_fp["virtual_fp_id"].to_list())
all_hy_ids = set(subset_wb["hy_id"].to_list() + subset_gages["hy_id"].to_list())
wb_hy_ids = subset_wb["hy_id"].to_list() if "hy_id" in subset_wb.columns else []
gage_hy_ids = subset_gages["hy_id"].to_list() if "hy_id" in subset_gages.columns else []
all_hy_ids = set(wb_hy_ids + gage_hy_ids)

# Wave 2: nex_id/virtual_fp_id filtered (parallel)
with ThreadPoolExecutor(max_workers=2) as ex:
Expand Down Expand Up @@ -251,8 +276,9 @@ def generate_subset_from_ids(
"divides": pl_to_gdf(subset_div),
"virtual_nexus": pl_to_gdf(subset_v_nex),
"virtual_flowpaths": pl_to_gdf(subset_v_fp),
"waterbodies": pl_to_gdf(subset_wb),
"gages": pl_to_gdf(subset_gages),
"waterbodies": pl_to_gdf(subset_wb) if len(subset_wb) > 0 else subset_wb.to_pandas(),
"gages": pl_to_gdf(subset_gages) if len(subset_gages) > 0 else subset_gages.to_pandas(),
"lakes": pl_to_gdf(subset_lakes) if len(subset_lakes) > 0 else subset_lakes.to_pandas(),
"reference_flowpaths": subset_ref_fp.to_pandas(),
"hydrolocations": subset_hydrolocations.to_pandas(),
}
Expand All @@ -261,22 +287,27 @@ def generate_subset_from_ids(
logger.debug(f"Writing to {subset_file}...")
subset_file.parent.mkdir(parents=True, exist_ok=True)

for name, df in [
("flowpaths", output["flowpaths"]),
("nexus", output["nexus"]),
("divides", output["divides"]),
("virtual_nexus", output["virtual_nexus"]),
("virtual_flowpaths", output["virtual_flowpaths"]),
("waterbodies", output["waterbodies"]),
("gages", output["gages"]),
]:
spatial_layers = [
"flowpaths",
"nexus",
"divides",
"virtual_nexus",
"virtual_flowpaths",
"waterbodies",
"gages",
"lakes",
]
for name in spatial_layers:
df = output[name]
logger.debug(f" {name}: {len(df)} rows")
pyogrio.write_dataframe(df, subset_file, layer=name)
if isinstance(df, gpd.GeoDataFrame) and len(df) > 0:
pyogrio.write_dataframe(df, subset_file, layer=name)

logger.debug(f" reference_flowpaths: {len(subset_ref_fp)} rows")
nonspatial_layers = ["reference_flowpaths", "hydrolocations"]
conn = sqlite3.connect(subset_file)
output["reference_flowpaths"].to_sql("reference_flowpaths", conn, if_exists="replace", index=False)
output["hydrolocations"].to_sql("hydrolocations", conn, if_exists="replace", index=False)
for name in nonspatial_layers:
logger.debug(f" {name}: {len(output[name])} rows")
output[name].to_sql(name, conn, if_exists="replace", index=False)
conn.close()

return output
Expand Down
23 changes: 17 additions & 6 deletions src/icefabric/schemas/hydrofabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ class HydrofabricNamespace(str, Enum):

NHF = "nhf"
CONUS_NHF = "conus_nhf"
ALASKA_NHF = "ak_nhf"
HAWAII_NHF = "hi_nhf"
PUERTO_RICO_NHF = "prvi_nhf"
CONUS_HF = "conus_hf"
ALASKA_HF = "ak_hf"
HAWAII_HF = "hi_hf"
Expand All @@ -164,7 +167,13 @@ def __str__(self) -> str:
@property
def is_nhf(self) -> bool:
    """Return True if this namespace is NHF (National Hydrofabric) data.

    Covers the generic ``nhf`` alias plus every per-domain NHF namespace
    (CONUS, Alaska, Hawaii, Puerto Rico / Virgin Islands).
    """
    return self in (
        HydrofabricNamespace.NHF,
        HydrofabricNamespace.CONUS_NHF,
        HydrofabricNamespace.ALASKA_NHF,
        HydrofabricNamespace.HAWAII_NHF,
        HydrofabricNamespace.PUERTO_RICO_NHF,
    )

@property
def is_oconus_hf(self) -> bool:
Expand All @@ -183,6 +192,9 @@ def _missing_(cls, value: object) -> HydrofabricNamespace | None:
aliases = {
"nhf": cls.NHF,
"conus_nhf": cls.CONUS_NHF,
"ak_nhf": cls.ALASKA_NHF,
"hi_nhf": cls.HAWAII_NHF,
"prvi_nhf": cls.PUERTO_RICO_NHF,
"conus_hf": cls.CONUS_HF,
"ak_hf": cls.ALASKA_HF,
"hi_hf": cls.HAWAII_HF,
Expand Down Expand Up @@ -286,11 +298,11 @@ def resolve(
] = {
(GeographicDomain.CONUS, HydrofabricSource.NHF): cls.CONUS_NHF,
(GeographicDomain.CONUS, HydrofabricSource.HF): cls.CONUS_HF,
(GeographicDomain.ALASKA, HydrofabricSource.NHF): None,
(GeographicDomain.ALASKA, HydrofabricSource.NHF): cls.ALASKA_NHF,
(GeographicDomain.ALASKA, HydrofabricSource.HF): cls.ALASKA_HF,
(GeographicDomain.HAWAII, HydrofabricSource.NHF): None,
(GeographicDomain.HAWAII, HydrofabricSource.NHF): cls.HAWAII_NHF,
(GeographicDomain.HAWAII, HydrofabricSource.HF): cls.HAWAII_HF,
(GeographicDomain.PUERTO_RICO, HydrofabricSource.NHF): None,
(GeographicDomain.PUERTO_RICO, HydrofabricSource.NHF): cls.PUERTO_RICO_NHF,
(GeographicDomain.PUERTO_RICO, HydrofabricSource.HF): cls.PUERTO_RICO_HF,
(GeographicDomain.GREAT_LAKES, HydrofabricSource.NHF): None,
(GeographicDomain.GREAT_LAKES, HydrofabricSource.HF): cls.GREAT_LAKES_HF,
Expand All @@ -300,8 +312,7 @@ def resolve(

if namespace is None:
raise NotImplementedError(
f"Domain '{domain.value}' is not currently available for source '{source.value}'. " # type: ignore[union-attr]
f"Only 'CONUS' is available for the National Hydrofabric (nhf) at this time."
f"Domain '{domain.value}' is not currently available for source '{source.value}'." # type: ignore[union-attr]
)

return namespace
2 changes: 2 additions & 0 deletions src/icefabric/schemas/iceberg_tables/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Gages,
Hydrolocations,
Nexus,
NHFLakes,
ReferenceFlowpaths,
VirtualFlowpaths,
VirtualNexus,
Expand All @@ -20,4 +21,5 @@
"virtual_flowpaths": VirtualFlowpaths,
"virtual_nexus": VirtualNexus,
"hydrolocations": Hydrolocations,
"lakes": NHFLakes,
}
Loading
Loading