Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 117 additions & 1 deletion model/make_spatial_prediction.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,113 @@ def align_mask(mask_array, src_transform, src_crs, metas, inundation_file):
return aligned


def _resolve_latlon_columns(df):
lat_cols = ['latitude', 'lat', 'Latitude', 'LAT', 'Y', 'y']
lon_cols = ['longitude', 'lon', 'Longitude', 'LON', 'X', 'x']
lat_col = next((c for c in lat_cols if c in df.columns), None)
lon_col = next((c for c in lon_cols if c in df.columns), None)
if lat_col is None or lon_col is None:
raise ValueError("Could not identify latitude/longitude columns in exposure file")
return lat_col, lon_col


def load_exposure_gdf(filepath, facility_type):
    """Load a facility exposure table (CSV or Excel) as a WGS84 GeoDataFrame.

    Parameters
    ----------
    filepath : str or os.PathLike
        Path to a .csv, .xlsx, or .xls file containing latitude/longitude
        columns (any alias recognised by ``_resolve_latlon_columns``).
    facility_type : str
        Label stored in the output 'facility_type' column (e.g. 'hospital').

    Returns
    -------
    geopandas.GeoDataFrame
        Columns: name, latitude, longitude, geometry, facility_type,
        with CRS EPSG:4326.

    Raises
    ------
    ValueError
        If the file extension is unsupported or coordinate columns are missing.
    """
    # str() also accepts pathlib.Path inputs; the original .lower() call
    # would raise AttributeError on a Path object.
    path_str = str(filepath).lower()
    if path_str.endswith('.csv'):
        df = pd.read_csv(filepath)
    elif path_str.endswith(('.xlsx', '.xls')):
        df = pd.read_excel(filepath)
    else:
        raise ValueError(f"Unsupported exposure file format for {filepath}")

    lat_col, lon_col = _resolve_latlon_columns(df)
    name_col = next((c for c in ['name', 'Name', 'facility_name', 'Facility Name'] if c in df.columns), None)
    if name_col is None:
        # No recognisable name column: every row gets a single placeholder label.
        df['name'] = f"unknown_{facility_type}"
        name_col = 'name'

    gdf = gpd.GeoDataFrame(
        df.assign(geometry=gpd.points_from_xy(df[lon_col], df[lat_col])),
        geometry='geometry',
        crs='EPSG:4326'
    )
    gdf = gdf.rename(columns={name_col: 'name'})
    gdf = gdf[['name', lat_col, lon_col, 'geometry']].copy()
    gdf['facility_type'] = facility_type
    gdf = gdf.rename(columns={lat_col: 'latitude', lon_col: 'longitude'})
    return gdf


def get_exposure_points(hospital_path, school_path):
    """Combine hospital and school exposure points into one WGS84 GeoDataFrame.

    Missing or unreadable files are skipped with a warning (best-effort);
    if nothing loads, an empty GeoDataFrame with the expected columns is
    returned so downstream code can short-circuit on ``.empty``.
    """
    sources = [(hospital_path, 'hospital'), (school_path, 'school')]
    frames = []
    for path, facility_type in sources:
        if not os.path.exists(path):
            logger.warning(f"Exposure file not found; skipping: {path}")
            continue
        try:
            frame = load_exposure_gdf(path, facility_type)
        except Exception as e:
            # Deliberately broad: one malformed file must not abort the run.
            logger.warning(f"Failed to load exposure data from {path}: {e}")
            continue
        frames.append(frame)
        logger.info(f"Loaded {len(frame)} {facility_type} records from {path}")

    if not frames:
        logger.warning("No exposure points loaded; no impact CSV will be created.")
        return gpd.GeoDataFrame(columns=['name', 'latitude', 'longitude', 'facility_type', 'geometry'])

    combined = pd.concat(frames, ignore_index=True)
    return gpd.GeoDataFrame(combined, geometry='geometry', crs='EPSG:4326')


def get_impacted_exposure_points(points_gdf, aligned_mask, transform):
    """Flag each exposure point as impacted if it falls on a flooded cell.

    Parameters
    ----------
    points_gdf : geopandas.GeoDataFrame
        Exposure points; assumed to already be in the same CRS as *transform*.
    aligned_mask : numpy.ndarray
        2-D mask where a value of 1 marks an inundated cell.
    transform : affine.Affine
        Geotransform mapping world coordinates to raster (row, col) indices.

    Returns
    -------
    Copy of *points_gdf* with a boolean 'impacted' column; an empty
    DataFrame with the expected columns if *points_gdf* is empty.
    """
    if points_gdf.empty:
        return pd.DataFrame(columns=['name', 'latitude', 'longitude', 'facility_type', 'impacted'])

    # We assume points_gdf is already in the same CRS as the mask transform.
    xs = points_gdf.geometry.x.to_list()
    ys = points_gdf.geometry.y.to_list()

    # rasterio.transform.rowcol returns (row, col) based on coordinate-to-pixel mapping.
    rows, cols = rasterio.transform.rowcol(transform, xs, ys, op=np.floor)

    n_rows, n_cols = aligned_mask.shape
    impacted = []
    for row, col in zip(rows, cols):
        # np.floor yields floats (unlike the default math.floor); numpy
        # rejects float indices, so cast before bounds-checking/indexing.
        row, col = int(row), int(col)
        if 0 <= row < n_rows and 0 <= col < n_cols:
            impacted.append(bool(aligned_mask[row, col] == 1))
        else:
            # Points outside the raster footprint are treated as not impacted.
            impacted.append(False)

    out = points_gdf.copy()
    out['impacted'] = impacted
    return out


def export_impacted_facilities(masks, metas, inundation_file, points_gdf, output_dir):
    """Write a CSV flagging, per scenario, which facilities sit in flooded cells.

    Returns the CSV path, or None when there are no exposure points.
    """
    if points_gdf.empty:
        logger.warning("No exposure points found, skipping impact CSV generation.")
        return None

    meta = metas[inundation_file]
    target_crs = meta['crs']
    target_transform = meta['transform']

    points_projected = points_gdf.to_crs(target_crs)
    result = points_projected[['name', 'latitude', 'longitude', 'facility_type']].copy()

    for scenario in ['Worst Case', 'Average Case', 'Best Case']:
        # Binarise the scenario mask, then align it to the inundation grid
        # before sampling facility locations against it.
        binary_mask = (masks[scenario] > 0).astype(np.uint8)
        aligned = align_mask(binary_mask, target_transform, target_crs, metas, inundation_file)
        impacted = get_impacted_exposure_points(points_projected, aligned, target_transform)
        column_name = scenario.lower().replace(" ", "_")
        result[column_name] = impacted['impacted'].values

    csv_path = output_dir / 'impacted_schools_hospitals.csv'
    result.to_csv(csv_path, index=False)
    logger.info(f"Saved impacted facility list: {csv_path}")
    return csv_path


def plot_flood_change_map(
masks, current_extent, transform, crs, regions_gdf,
inundation_clipped, metas, inundation_file='20250211.tif',
Expand Down Expand Up @@ -834,4 +941,13 @@ def run_full_spatial_analysis():
print(f"Error plotting {region}: {e}")

# Export for QGIS
export_qgis_files(masks, current_extent, transform, crs, regions_gdf, folder_title, metas, inundation_file='20250211.tif')
export_qgis_files(masks, current_extent, transform, crs, regions_gdf, folder_title, metas, inundation_file='20250211.tif')

# --- Exposure impact reporting for schools and hospitals ---
hospital_csv = 'data/downloads/hospitals.csv'
school_csv = 'data/downloads/schools.csv'

exposure_points = get_exposure_points(hospital_csv, school_csv)
export_impacted_facilities(masks, metas, '20250211.tif', exposure_points, output_dir)

logger.info('run_full_spatial_analysis completed: flood maps and exposure impact report generated.')
131 changes: 131 additions & 0 deletions model/test_spatial_prediction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import os
import tempfile
import numpy as np
import pandas as pd
import geopandas as gpd
import pytest
from pathlib import Path
from rasterio.transform import from_origin

from model.make_spatial_prediction import (
get_impacted_exposure_points,
get_exposure_points,
export_impacted_facilities,
)


def test_get_impacted_exposure_points_basic():
    """Points on a flooded cell are flagged True; points off-grid are False."""
    mask = np.zeros((10, 10), dtype=np.uint8)
    mask[0, 0] = 1

    # This transform maps raster row 0 to y=9..10 and col 0 to x=0..1
    transform = from_origin(0.0, 10.0, 1.0, 1.0)

    points = gpd.GeoDataFrame(
        {
            "name": ["inside", "outside", "near_edge"],
            "latitude": [9.5, 20.0, 8.5],
            "longitude": [0.5, 0.5, 9.5],
            "facility_type": ["school", "hospital", "school"],
        },
        geometry=gpd.points_from_xy([0.5, 0.5, 9.5], [9.5, 20.0, 8.5]),
        crs="EPSG:4326",
    )

    impacted = get_impacted_exposure_points(points, mask, transform)

    # Coerce to Python bool before the identity check: the column holds
    # numpy bools and `np.True_ is True` evaluates to False, so the original
    # `is True` / `is False` assertions would always fail.
    assert bool(impacted.loc[impacted["name"] == "inside", "impacted"].iloc[0]) is True
    assert bool(impacted.loc[impacted["name"] == "outside", "impacted"].iloc[0]) is False


def test_export_impacted_facilities_creates_csv():
    """export_impacted_facilities writes a CSV with one boolean column per scenario."""
    mask = np.zeros((10, 10), dtype=np.uint8)
    mask[0, 0] = 1

    metas = {
        "20250211.tif": {
            "transform": from_origin(0.0, 10.0, 1.0, 1.0),
            "crs": "EPSG:4326",
            "height": 10,
            "width": 10,
        }
    }

    points = gpd.GeoDataFrame(
        {
            "name": ["inside", "outside"],
            "latitude": [9.5, 20.0],
            "longitude": [0.5, 0.5],
            "facility_type": ["hospital", "school"],
        },
        geometry=gpd.points_from_xy([0.5, 0.5], [9.5, 20.0]),
        crs="EPSG:4326",
    )

    # All three scenarios share the same mask, so the flags must agree.
    masks = {scenario: mask for scenario in ("Worst Case", "Average Case", "Best Case")}

    with tempfile.TemporaryDirectory() as tmp_dir:
        csv_path = export_impacted_facilities(masks, metas, "20250211.tif", points, Path(tmp_dir))

        assert csv_path is not None
        assert csv_path.exists()

        df = pd.read_csv(csv_path)
        for column in ("worst_case", "average_case", "best_case"):
            assert column in df.columns

        inside_row = df[df["name"] == "inside"].iloc[0]
        assert inside_row["worst_case"] == True
        assert inside_row["average_case"] == True
        assert inside_row["best_case"] == True

        outside_row = df[df["name"] == "outside"].iloc[0]
        assert outside_row["worst_case"] == False


def test_get_exposure_points_loads_csv_crs():
    """get_exposure_points merges both CSVs into one EPSG:4326 GeoDataFrame."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir)
        hospital_csv = tmp_path / "hospitals.csv"
        school_csv = tmp_path / "schools.csv"

        # One record per facility type, each in its own source file.
        fixtures = [
            (hospital_csv, "test_hospital", 9.5, 30.0),
            (school_csv, "test_school", 8.5, 30.1),
        ]
        for path, name, lat, lon in fixtures:
            pd.DataFrame(
                {"name": [name], "latitude": [lat], "longitude": [lon]}
            ).to_csv(path, index=False)

        points = get_exposure_points(str(hospital_csv), str(school_csv))

        assert not points.empty
        assert points.crs.to_string() == "EPSG:4326"
        assert set(points["facility_type"]) == {"hospital", "school"}


@pytest.mark.skip(reason="Full run_full_spatial_analysis depends on JASMIN model data and may not be available in CI")
def test_run_full_spatial_analysis_smoke():
    """Smoke test: the full pipeline should run and emit the impact CSV."""
    from model.make_spatial_prediction import (
        load_latest_prediction_csv,
        run_full_spatial_analysis,
    )

    # If this runs, it should not raise and should generate the target CSV
    run_full_spatial_analysis()

    # Locate the most recent prediction folder and check its output artifact.
    _, folder_title = load_latest_prediction_csv()
    output_csv = Path(f"predictions/{folder_title}/spatial_predictions/impacted_schools_hospitals.csv")
    assert output_csv.exists()