From 532c82aff4a2f5ac26e563f4561d9b247d92a8e5 Mon Sep 17 00:00:00 2001 From: sudoKiarie Date: Wed, 1 Apr 2026 18:41:03 +0300 Subject: [PATCH] spatial predictions --- model/make_spatial_prediction.py | 118 +++++++++++++++++++++++++++- model/test_spatial_prediction.py | 131 +++++++++++++++++++++++++++++++ 2 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 model/test_spatial_prediction.py diff --git a/model/make_spatial_prediction.py b/model/make_spatial_prediction.py index a2cf70c..40e71f3 100644 --- a/model/make_spatial_prediction.py +++ b/model/make_spatial_prediction.py @@ -353,6 +353,113 @@ def align_mask(mask_array, src_transform, src_crs, metas, inundation_file): return aligned +def _resolve_latlon_columns(df): + lat_cols = ['latitude', 'lat', 'Latitude', 'LAT', 'Y', 'y'] + lon_cols = ['longitude', 'lon', 'Longitude', 'LON', 'X', 'x'] + lat_col = next((c for c in lat_cols if c in df.columns), None) + lon_col = next((c for c in lon_cols if c in df.columns), None) + if lat_col is None or lon_col is None: + raise ValueError("Could not identify latitude/longitude columns in exposure file") + return lat_col, lon_col + + +def load_exposure_gdf(filepath, facility_type): + if filepath.lower().endswith('.csv'): + df = pd.read_csv(filepath) + elif filepath.lower().endswith('.xlsx') or filepath.lower().endswith('.xls'): + df = pd.read_excel(filepath) + else: + raise ValueError(f"Unsupported exposure file format for {filepath}") + + lat_col, lon_col = _resolve_latlon_columns(df) + name_col = next((c for c in ['name', 'Name', 'facility_name', 'Facility Name'] if c in df.columns), None) + if name_col is None: + df['name'] = f"unknown_{facility_type}" + name_col = 'name' + + gdf = gpd.GeoDataFrame( + df.assign(geometry=gpd.points_from_xy(df[lon_col], df[lat_col])), + geometry='geometry', + crs='EPSG:4326' + ) + gdf = gdf.rename(columns={name_col: 'name'}) + gdf = gdf[['name', lat_col, lon_col, 'geometry']].copy() + gdf['facility_type'] = facility_type + gdf = gdf.rename(columns={lat_col: 'latitude', lon_col: 'longitude'}) + return gdf + + +def get_exposure_points(hospital_path, school_path): + nodes = [] + for path, facility_type in [(hospital_path, 'hospital'), (school_path, 'school')]: + if not os.path.exists(path): + logger.warning(f"Exposure file not found; skipping: {path}") + continue + try: + gdf = load_exposure_gdf(path, facility_type) + nodes.append(gdf) + logger.info(f"Loaded {len(gdf)} {facility_type} records from {path}") + except Exception as e: + logger.warning(f"Failed to load exposure data from {path}: {e}") + + if not nodes: + logger.warning("No exposure points loaded; no impact CSV will be created.") + return gpd.GeoDataFrame(columns=['name', 'latitude', 'longitude', 'facility_type', 'geometry']) + + all_points = pd.concat(nodes, ignore_index=True) + return gpd.GeoDataFrame(all_points, geometry='geometry', crs='EPSG:4326') + + +def get_impacted_exposure_points(points_gdf, aligned_mask, transform): + if points_gdf.empty: + return pd.DataFrame(columns=['name', 'latitude', 'longitude', 'facility_type', 'impacted']) + + # We assume points_gdf is already in the same CRS as the mask transform. + points = points_gdf + + xs = points.geometry.x.to_list() + ys = points.geometry.y.to_list() + + # rasterio.transform.rowcol returns (row, col) based on coordinate-to-pixel mapping. + rows, cols = rasterio.transform.rowcol(transform, xs, ys, op=np.floor) + + impacted = [] + for row, col in zip(rows, cols): + if 0 <= row < aligned_mask.shape[0] and 0 <= col < aligned_mask.shape[1]: + impacted.append(bool(aligned_mask[row, col] == 1)) + else: + impacted.append(False) + + out = points.copy() + out['impacted'] = impacted + return out + + +def export_impacted_facilities(masks, metas, inundation_file, points_gdf, output_dir): + if points_gdf.empty: + logger.warning("No exposure points found, skipping impact CSV generation.") + return None + + target_crs = metas[inundation_file]['crs'] + target_transform = metas[inundation_file]['transform'] + + points_projected = points_gdf.to_crs(target_crs) + result = points_projected[['name', 'latitude', 'longitude', 'facility_type']].copy() + + for scenario in ['Worst Case', 'Average Case', 'Best Case']: + aligned = align_mask((masks[scenario] > 0).astype(np.uint8), + metas[inundation_file]['transform'], + metas[inundation_file]['crs'], + metas, inundation_file) + impacted = get_impacted_exposure_points(points_projected, aligned, metas[inundation_file]['transform']) + result[f'{scenario.lower().replace(" ", "_")}'] = impacted['impacted'].values + + csv_path = output_dir / 'impacted_schools_hospitals.csv' + result.to_csv(csv_path, index=False) + logger.info(f"Saved impacted facility list: {csv_path}") + return csv_path + + def plot_flood_change_map( masks, current_extent, transform, crs, regions_gdf, inundation_clipped, metas, inundation_file='20250211.tif', @@ -834,4 +941,13 @@ def run_full_spatial_analysis(): print(f"Error plotting {region}: {e}") # Export for QGIS - export_qgis_files(masks, current_extent, transform, crs, regions_gdf, folder_title, metas, inundation_file='20250211.tif') \ No newline at end of file + export_qgis_files(masks, current_extent, transform, crs, regions_gdf, folder_title, metas, inundation_file='20250211.tif') + + # --- Exposure impact reporting for schools and hospitals --- + hospital_csv = 'data/downloads/hospitals.csv' + school_csv = 'data/downloads/schools.csv' + + exposure_points = get_exposure_points(hospital_csv, school_csv) + export_impacted_facilities(masks, metas, '20250211.tif', exposure_points, output_dir) + + logger.info('run_full_spatial_analysis completed: flood maps and exposure impact report generated.') \ No newline at end of file diff --git a/model/test_spatial_prediction.py b/model/test_spatial_prediction.py new file mode 100644 index 0000000..5d7db30 --- /dev/null +++ b/model/test_spatial_prediction.py @@ -0,0 +1,131 @@ +import os +import tempfile +import numpy as np +import pandas as pd +import geopandas as gpd +import pytest +from pathlib import Path +from rasterio.transform import from_origin + +from model.make_spatial_prediction import ( + get_impacted_exposure_points, + get_exposure_points, + export_impacted_facilities, +) + + +def test_get_impacted_exposure_points_basic(): + mask = np.zeros((10, 10), dtype=np.uint8) + mask[0, 0] = 1 + + # This transform maps raster row 0 to y=9..10 and col 0 to x=0..1 + transform = from_origin(0.0, 10.0, 1.0, 1.0) + + points = gpd.GeoDataFrame( + { + "name": ["inside", "outside", "near_edge"], + "latitude": [9.5, 20.0, 8.5], + "longitude": [0.5, 0.5, 9.5], + "facility_type": ["school", "hospital", "school"], + }, + geometry=gpd.points_from_xy([0.5, 0.5, 9.5], [9.5, 20.0, 8.5]), + crs="EPSG:4326", + ) + + impacted = get_impacted_exposure_points(points, mask, transform) + + assert impacted.loc[impacted["name"] == "inside", "impacted"].iloc[0] is True + assert impacted.loc[impacted["name"] == "outside", "impacted"].iloc[0] is False + + +def test_export_impacted_facilities_creates_csv(): + mask = np.zeros((10, 10), dtype=np.uint8) + mask[0, 0] = 1 + + metas = { + "20250211.tif": { + "transform": from_origin(0.0, 10.0, 1.0, 1.0), + "crs": "EPSG:4326", + "height": 10, + "width": 10, + } + } + + points = gpd.GeoDataFrame( + { + "name": ["inside", "outside"], + "latitude": [9.5, 20.0], + "longitude": [0.5, 0.5], + "facility_type": ["hospital", "school"], + }, + geometry=gpd.points_from_xy([0.5, 0.5], [9.5, 20.0]), + crs="EPSG:4326", + ) + + masks = { + "Worst Case": mask, + "Average Case": mask, + "Best Case": mask, + } + + with tempfile.TemporaryDirectory() as tmp_dir: + output_dir = Path(tmp_dir) + csv_path = export_impacted_facilities(masks, metas, "20250211.tif", points, output_dir) + + assert csv_path is not None + assert csv_path.exists() + + df = pd.read_csv(csv_path) + assert "worst_case" in df.columns + assert "average_case" in df.columns + assert "best_case" in df.columns + + inside_row = df[df["name"] == "inside"].iloc[0] + assert inside_row["worst_case"] == True + assert inside_row["average_case"] == True + assert inside_row["best_case"] == True + + outside_row = df[df["name"] == "outside"].iloc[0] + assert outside_row["worst_case"] == False + + +def test_get_exposure_points_loads_csv_crs(): + with tempfile.TemporaryDirectory() as tmp_dir: + hospital_csv = Path(tmp_dir) / "hospitals.csv" + school_csv = Path(tmp_dir) / "schools.csv" + + pd.DataFrame( + { + "name": ["test_hospital"], + "latitude": [9.5], + "longitude": [30.0], + } + ).to_csv(hospital_csv, index=False) + + pd.DataFrame( + { + "name": ["test_school"], + "latitude": [8.5], + "longitude": [30.1], + } + ).to_csv(school_csv, index=False) + + points = get_exposure_points(str(hospital_csv), str(school_csv)) + + assert not points.empty + assert points.crs.to_string() == "EPSG:4326" + assert set(points["facility_type"]) == {"hospital", "school"} + + +@pytest.mark.skip("Full run_full_spatial_analysis depends on JASMIN model data and may not be available in CI") +def test_run_full_spatial_analysis_smoke(): + from model.make_spatial_prediction import run_full_spatial_analysis + + # If this runs, it should not raise and should generate the target CSV + run_full_spatial_analysis() + + # find latest folder + from model.make_spatial_prediction import load_latest_prediction_csv + _, folder_title = load_latest_prediction_csv() + output_csv = Path(f"predictions/{folder_title}/spatial_predictions/impacted_schools_hospitals.csv") + assert output_csv.exists()