diff --git a/Dockerfile b/Dockerfile index 7a4d3b1..c77cfea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,5 +27,13 @@ COPY src src RUN chown -R stormhub:stormhub /usr/src/app USER stormhub +# Cap intra-process thread fan-out so worker memory scales with worker count +# only, not num_workers × cpu_count. See src/worker_sizing.py for context. +ENV DASK_SCHEDULER=synchronous \ + OMP_NUM_THREADS=1 \ + OPENBLAS_NUM_THREADS=1 \ + MKL_NUM_THREADS=1 \ + NUMEXPR_NUM_THREADS=1 + ENTRYPOINT ["python3.12", "-u"] CMD ["src/plugin.py"] diff --git a/README.md b/README.md index 39d63b9..3ae9622 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ A [USACE Cloud Compute](https://github.com/USACE-Cloud-Compute/cloudcompute) plugin that creates storm catalogs from NOAA AORC precipitation data and converts them to HEC-DSS files. ``` -S3 payload --> download-inputs --> process-storms --> convert-to-dss --> upload-outputs +S3 payload --> download-inputs --> process-storms --> convert-to-dss --> create-grid-file --> upload-outputs ``` ## Quick Start @@ -72,9 +72,14 @@ docker compose -f docker-compose.yaml -f docker-compose.mem-limit.yaml run --rm docker compose -f docker-compose.yaml -f docker-compose.mem-limit.yaml run --rm storm-cloud-plugin ``` -With the fix, the resolver reads the cgroup limit and picks 1 worker; -without it, the library would pick 6 and `BrokenProcessPool`. +With the fix, the resolver reads the cgroup limit and picks a safe worker +count; without it, the library would pick 6 and `BrokenProcessPool`. + +**Re-run this repro after bumping the `lib/stormhub` submodule** — it's +the regression test for both the worker-count heuristic and the +thread-cap env vars in the Dockerfile. ## Known Limitations - **stormhub v0.5.0**: Workers hang during storm collection. Pinned to v0.4.0. +- **stormhub thread fan-out**: `num_workers` only caps the *process* pool. Each worker still appears to fan out internally (likely via dask's threaded scheduler in the AORC loader and/or BLAS threads), so peak RSS scales with the container's visible vCPU count even at `num_workers=1`. **Workaround:** in addition to setting `num_workers=1` (payload attribute or `CC_NUM_WORKERS=1`), cap the container's CPU allocation so intra-worker threads can't fan out past what the memory budget tolerates. For a 15 GB cap, `cpus: "4"` (Docker Compose `deploy.resources.limits` or `--cpus 4` on `docker run`) has held under the limit in our runs. Tighten further if OOMs reappear. diff --git a/docker-compose.yaml b/docker-compose.yaml index f488f0e..22ed601 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -60,6 +60,7 @@ services: " storm-cloud-plugin: + image: storm-cloud-plugin:latest build: . depends_on: seed: diff --git a/src/actions/convert_to_dss.py b/src/actions/convert_to_dss.py index 51d9538..426b462 100644 --- a/src/actions/convert_to_dss.py +++ b/src/actions/convert_to_dss.py @@ -62,7 +62,7 @@ def convert_to_dss(ctx: dict[str, Any], action: Any) -> None: attrs = payload.attributes catalog_id = attrs["catalog_id"] output_dir = local_root / catalog_id - dss_dir = output_dir / "dss" + dss_dir = output_dir / "data" dss_dir.mkdir(parents=True, exist_ok=True) watershed_file = str(local_root / Path(payload.inputs[0].paths["watershed"]).name) diff --git a/src/actions/create_grid_file.py b/src/actions/create_grid_file.py new file mode 100644 index 0000000..1198f6a --- /dev/null +++ b/src/actions/create_grid_file.py @@ -0,0 +1,307 @@ +"""Action: create-grid-file — Emit a HEC-HMS Grid Manager (.grid) file for the catalog. + +Uses the older verbose schema (Variant blocks, ``DSS File Name``/``DSS Pathname``, +``Reference Height``, ``Use Lookup Table``) so the file is consumable by HMS 4.x +as well as newer releases. Modern HMS readers still parse Variant blocks for +DSS data sources, so this format is a safe lowest common denominator. + + - 5-space indent on grid sub-keys, 7-space indent inside Variant block + - LF line endings, UTF-8 + - Date "d MMMM yyyy", Time "HH:mm:ss" +""" + +from __future__ import annotations + +import logging +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable + +from pyproj import Transformer + +log = logging.getLogger(__name__) + +INDENT = " " # 5 spaces, matches HMS GridManagerWriter.LARGE_INDENT +VARIANT_INDENT = " " # 7 spaces, nested inside Variant block +GRID_MANAGER_VERSION = "4.11" +FILEPATH_SEPARATOR = "/" +DEFAULT_VARIANT_NAME = "Variant-1" +DEFAULT_REF_HEIGHT = 10.0 +DEFAULT_REF_UNITS = "Meters" + +MAX_FAILURE_RATIO = float(os.environ.get("GRID_MAX_FAILURE_RATIO", "0.5")) + +# USA Contiguous Albers Equal Area Conic (USGS), US survey feet. +# HEC's SHG reference frame — Storm Center X/Y are expected in this projection. +_ALBERS_CRS_WKT = ( + 'PROJCRS["USA_Contiguous_Albers_Equal_Area_Conic_USGS_version",' + 'BASEGEOGCRS["NAD83",DATUM["North American Datum 1983",' + 'ELLIPSOID["GRS 1980",6378137,298.257222101,LENGTHUNIT["metre",1]],' + 'ID["EPSG",6269]],PRIMEM["Greenwich",0,ANGLEUNIT["Degree",0.0174532925199433]]],' + 'CONVERSION["unnamed",METHOD["Albers Equal Area",ID["EPSG",9822]],' + 'PARAMETER["Latitude of false origin",23,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8821]],' + 'PARAMETER["Longitude of false origin",-96,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8822]],' + 'PARAMETER["Latitude of 1st standard parallel",29.5,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8823]],' + 'PARAMETER["Latitude of 2nd standard parallel",45.5,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8824]],' + 'PARAMETER["Easting at false origin",0,LENGTHUNIT["US survey foot",0.304800609601219],ID["EPSG",8826]],' + 'PARAMETER["Northing at false origin",0,LENGTHUNIT["US survey foot",0.304800609601219],ID["EPSG",8827]]],' + 'CS[Cartesian,2],AXIS["(E)",east,ORDER[1],LENGTHUNIT["US survey foot",0.304800609601219,ID["EPSG",9003]]],' + 'AXIS["(N)",north,ORDER[2],LENGTHUNIT["US survey foot",0.304800609601219,ID["EPSG",9003]]]]' +) + + +def _parse_storm_datetime(item: Any) -> datetime | None: + """Replicates convert_to_dss._parse_storm_datetime to pair items with DSS files.""" + try: + return datetime.strptime(item.id, "%Y-%m-%dT%H") + except ValueError: + return item.datetime if getattr(item, "datetime", None) else None + + +def _centroid_lonlat(item: Any) -> tuple[float, float] | None: + """Extract (lon, lat) from item.geometry (GeoJSON Point set in aorc.py:253).""" + geom = getattr(item, "geometry", None) + if not isinstance(geom, dict): + return None + if geom.get("type") != "Point": + return None + coords = geom.get("coordinates") + if not (isinstance(coords, (list, tuple)) and len(coords) >= 2): + return None + try: + return float(coords[0]), float(coords[1]) + except (TypeError, ValueError): + return None + + +def _earliest_dss_paths(dss_file: Path) -> tuple[str | None, str | None]: + """Return earliest PRECIPITATION and TEMPERATURE pathnames in a DSS file.""" + from hecdss import HecDss # runtime dep; not needed for pure-format tests + + precip_path: str | None = None + temp_path: str | None = None + earliest_precip: datetime | None = None + earliest_temp: datetime | None = None + + with HecDss(str(dss_file)) as dss: + for path_obj in dss.get_catalog(): + path_str = str(path_obj) + parts = path_str.strip("/").split("/") + if len(parts) < 6: + continue + part_c = parts[2].upper() + try: + dt = datetime.strptime(parts[3], "%d%b%Y:%H%M") + except ValueError: + continue + if part_c == "PRECIPITATION": + if earliest_precip is None or dt < earliest_precip: + precip_path, earliest_precip = path_str, dt + elif part_c == "TEMPERATURE": + if earliest_temp is None or dt < earliest_temp: + temp_path, earliest_temp = path_str, dt + + return precip_path, temp_path + + +def _render_grid_block( + *, + name: str, + grid_type: str, + modified_date: str, + modified_time: str, + storm_center_xy: tuple[float, float] | None, + dss_filename: str, + dss_pathname: str, +) -> list[str]: + """One grid record in the legacy verbose format (HMS 4.x compatible).""" + lines = [ + f"Grid: {name}\n", + f"{INDENT}Grid Type: {grid_type}\n", + f"{INDENT}Last Modified Date: {modified_date}\n", + f"{INDENT}Last Modified Time: {modified_time}\n", + f"{INDENT}Reference Height Units: {DEFAULT_REF_UNITS}\n", + f"{INDENT}Reference Height: {DEFAULT_REF_HEIGHT}\n", + f"{INDENT}Data Source Type: External DSS\n", + f"{INDENT}Variant: {DEFAULT_VARIANT_NAME}\n", + f"{VARIANT_INDENT}Last Variant Modified Date: {modified_date}\n", + f"{VARIANT_INDENT}Last Variant Modified Time: {modified_time}\n", + f"{VARIANT_INDENT}Default Variant: Yes\n", + f"{VARIANT_INDENT}DSS File Name: {dss_filename}\n", + f"{VARIANT_INDENT}DSS Pathname: {dss_pathname}\n", + f"{INDENT}End Variant: {DEFAULT_VARIANT_NAME}\n", + f"{INDENT}Use Lookup Table: No\n", + ] + if storm_center_xy is not None: + x, y = storm_center_xy + lines.append(f"{INDENT}Storm Center X: {x}\n") + lines.append(f"{INDENT}Storm Center Y: {y}\n") + lines.append("End:\n\n") + return lines + + +def build_grid_file( + entries: Iterable[dict[str, Any]], + *, + manager_name: str, + modified_date: str, + modified_time: str, + transformer: Transformer, +) -> str: + """Compose a full .grid file from ordered entries. + + Each entry: {name, grid_type, dss_filename, dss_pathname, storm_center_lonlat?}. + Caller controls ordering (typically by storm rank). + """ + out: list[str] = [ + f"Grid Manager: {manager_name}\n", + f"{INDENT}Version: {GRID_MANAGER_VERSION}\n", + f"{INDENT}Filepath Separator: {FILEPATH_SEPARATOR}\n", + "End:\n\n", + ] + for e in entries: + xy: tuple[float, float] | None = None + lonlat = e.get("storm_center_lonlat") + if lonlat is not None: + x, y = transformer.transform(lonlat[0], lonlat[1]) + xy = (x, y) + out.extend( + _render_grid_block( + name=e["name"], + grid_type=e["grid_type"], + modified_date=modified_date, + modified_time=modified_time, + storm_center_xy=xy, + dss_filename=e["dss_filename"], + dss_pathname=e["dss_pathname"], + ) + ) + return "".join(out) + + +def create_grid_file(ctx: dict[str, Any], action: Any) -> None: + payload = ctx["payload"] + local_root: Path = ctx["local_root"] + collection = ctx.get("collection") + storm_params = ctx.get("storm_params") + + if collection is None or storm_params is None: + raise RuntimeError( + "create-grid-file requires ctx['collection'] and ctx['storm_params']; " + "ensure 'process-storms' ran earlier in the action list" + ) + + attrs = payload.attributes + catalog_id = attrs["catalog_id"] + storm_duration = storm_params["storm_duration"] + + output_dir = local_root / catalog_id + dss_dir = output_dir / "data" + if not dss_dir.is_dir(): + raise FileNotFoundError( + f"Data directory not found at {dss_dir}; run 'convert-to-dss' first" + ) + + grid_path = output_dir / "catalog.grid" + if grid_path.exists(): + log.info("Skipping — %s already exists", grid_path) + return + + items = list(collection.get_all_items()) + if not items: + raise RuntimeError("No storm items in collection — nothing to grid") + + transformer = Transformer.from_crs("EPSG:4326", _ALBERS_CRS_WKT, always_xy=True) + now = datetime.now(timezone.utc) + modified_date = now.strftime("%d %B %Y").lstrip("0") # "d MMMM yyyy" + modified_time = now.strftime("%H:%M:%S") + + entries: list[dict[str, Any]] = [] + failed: list[str] = [] + + for idx, item in enumerate(items, start=1): + storm_start = _parse_storm_datetime(item) + if storm_start is None: + log.warning("Skipping item %s: unparseable datetime", item.id) + failed.append(item.id) + continue + + date_str = storm_start.strftime("%Y%m%d") + rank_padded = str(idx).zfill(3) + dss_filename = f"{date_str}_{storm_duration}hr_st1_r{rank_padded}.dss" + dss_path = dss_dir / dss_filename + + if not dss_path.exists(): + log.warning("Skipping %s: %s not found", item.id, dss_filename) + failed.append(item.id) + continue + + try: + precip_pn, temp_pn = _earliest_dss_paths(dss_path) + except Exception as e: + log.error("Skipping %s: could not read DSS catalog (%s)", item.id, e) + failed.append(item.id) + continue + + if precip_pn is None and temp_pn is None: + log.warning("Skipping %s: no PRECIPITATION or TEMPERATURE paths", item.id) + failed.append(item.id) + continue + + lonlat = _centroid_lonlat(item) + if lonlat is None: + log.warning( + "No centroid for %s — emitting grid without Storm Center", item.id + ) + + grid_base = dss_filename[:-4] # drop ".dss" + rel_dss = f"data/{dss_filename}" + + if precip_pn is not None: + entries.append( + { + "name": grid_base, + "grid_type": "Precipitation", + "dss_filename": rel_dss, + "dss_pathname": precip_pn, + "storm_center_lonlat": lonlat, + } + ) + if temp_pn is not None: + entries.append( + { + "name": grid_base, + "grid_type": "Temperature", + "dss_filename": rel_dss, + "dss_pathname": temp_pn, + "storm_center_lonlat": lonlat, + } + ) + + total = len(items) + n_failed = len(failed) + if n_failed == total: + raise RuntimeError( + f"All {total} storms failed grid entry construction: {failed}" + ) + if total > 0 and n_failed / total > MAX_FAILURE_RATIO: + raise RuntimeError( + f"Grid entry failure rate {n_failed}/{total} " + f"({n_failed / total:.0%}) exceeds threshold ({MAX_FAILURE_RATIO:.0%}): {failed}" + ) + + text = build_grid_file( + entries, + manager_name=catalog_id, + modified_date=modified_date, + modified_time=modified_time, + transformer=transformer, + ) + grid_path.write_text(text, encoding="utf-8", newline="\n") + log.info( + "Wrote %s (%d grid records, %d storms)", + grid_path, + len(entries), + total - n_failed, + ) diff --git a/src/plugin.py b/src/plugin.py index e081fa4..fc3c25c 100644 --- a/src/plugin.py +++ b/src/plugin.py @@ -23,6 +23,7 @@ from actions.download_inputs import download_inputs from actions.process_storms import process_storms from actions.convert_to_dss import convert_to_dss +from actions.create_grid_file import create_grid_file from actions.upload_outputs import upload_outputs @@ -81,6 +82,7 @@ class ExitCode(IntEnum): "download-inputs": download_inputs, "process-storms": process_storms, "convert-to-dss": convert_to_dss, + "create-grid-file": create_grid_file, "upload-outputs": upload_outputs, } diff --git a/src/worker_sizing.py b/src/worker_sizing.py index 7770426..513f8a8 100644 --- a/src/worker_sizing.py +++ b/src/worker_sizing.py @@ -4,6 +4,11 @@ which inside a container reads the *host* CPU count and can exceed the cgroup memory ceiling — causing OOM-driven ``BrokenProcessPool``. This module picks a safe count from the cgroup limit, with operator overrides. + +Assumes each worker runs single-threaded: dask's synchronous scheduler +and ``*_NUM_THREADS=1`` are set in the image (see Dockerfile). Without +those, per-worker RSS would also scale with visible vCPU count and this +heuristic would under-count memory pressure. """ from __future__ import annotations @@ -14,9 +19,10 @@ log = logging.getLogger(__name__) -# Per-worker memory budget: observed ~1–1.5 GB on trinity + 72 hr AORC; -# 2 GB absorbs transient spikes. -PER_WORKER_MB = 2048 +# Per-worker memory budget. With threads capped at 1, observed ~1.5 GB on +# a 72 hr AORC slice; 3 GB absorbs transient spikes and unmeasured headroom +# for larger domains. +PER_WORKER_MB = 3072 CGROUP_MEM_MAX = "/sys/fs/cgroup/memory.max" diff --git a/test/examples/payload-repro.json b/test/examples/payload-repro.json index 8b94f06..f76de1d 100644 --- a/test/examples/payload-repro.json +++ b/test/examples/payload-repro.json @@ -40,6 +40,7 @@ { "name": "download-inputs", "type": "utils", "description": "Download geometries", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, { "name": "process-storms", "type": "run", "description": "Create STAC storm catalog", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, { "name": "convert-to-dss", "type": "extract", "description": "Convert Zarr to HEC-DSS", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, + { "name": "create-grid-file", "type": "run", "description": "Emit HEC-HMS Grid Manager file", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, { "name": "upload-outputs", "type": "utils", "description": "Upload results", "attributes": {}, "stores": [], "inputs": [], "outputs": [] } ] } diff --git a/test/examples/payload.json b/test/examples/payload.json index d1f02c4..ba58a53 100644 --- a/test/examples/payload.json +++ b/test/examples/payload.json @@ -40,6 +40,7 @@ { "name": "download-inputs", "type": "utils", "description": "Download geometries", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, { "name": "process-storms", "type": "run", "description": "Create STAC storm catalog", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, { "name": "convert-to-dss", "type": "extract", "description": "Convert Zarr to HEC-DSS", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, + { "name": "create-grid-file", "type": "run", "description": "Emit HEC-HMS Grid Manager file", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, { "name": "upload-outputs", "type": "utils", "description": "Upload results", "attributes": {}, "stores": [], "inputs": [], "outputs": [] } ] } diff --git a/test/test_create_grid_file.py b/test/test_create_grid_file.py new file mode 100644 index 0000000..4930835 --- /dev/null +++ b/test/test_create_grid_file.py @@ -0,0 +1,101 @@ +"""Unit tests for create_grid_file.build_grid_file.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest +from pyproj import Transformer + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) + +from actions.create_grid_file import ( # noqa: E402 + _ALBERS_CRS_WKT, + build_grid_file, +) + + +@pytest.fixture +def transformer() -> Transformer: + return Transformer.from_crs("EPSG:4326", _ALBERS_CRS_WKT, always_xy=True) + + +def _entry(name: str, grid_type: str, lonlat=None) -> dict: + return { + "name": name, + "grid_type": grid_type, + "dss_filename": f"data/{name}.dss", + "dss_pathname": f"/SHG4K/TRINITY/{grid_type.upper()}/01JAN2020:0000/01JAN2020:0100/AORC/", + "storm_center_lonlat": lonlat, + } + + +def test_header_and_trailing_blank_line(transformer): + text = build_grid_file( + [], manager_name="cat-1", modified_date="1 January 2020", + modified_time="00:00:00", transformer=transformer, + ) + assert text.startswith("Grid Manager: cat-1\n") + assert " Version: 4.11\n" in text + assert " Filepath Separator: /\n" in text + assert text.endswith("End:\n\n") + + +def test_variant_block_and_legacy_keys(transformer): + text = build_grid_file( + [_entry("storm1", "Precipitation", (-90.0, 31.0))], + manager_name="cat-1", modified_date="1 January 2020", + modified_time="12:34:56", transformer=transformer, + ) + assert "Grid: storm1\n" in text + assert " Grid Type: Precipitation\n" in text + assert " Reference Height Units: Meters\n" in text + assert " Reference Height: 10.0\n" in text + assert " Data Source Type: External DSS\n" in text + assert " Variant: Variant-1\n" in text + assert " Default Variant: Yes\n" in text + assert " DSS File Name: data/storm1.dss\n" in text + assert " DSS Pathname: /SHG4K/TRINITY/PRECIPITATION/" in text + assert " End Variant: Variant-1\n" in text + assert " Use Lookup Table: No\n" in text + + +def test_storm_center_projected_to_albers(transformer): + text = build_grid_file( + [_entry("s", "Precipitation", (-96.0, 23.0))], # Albers false origin + manager_name="c", modified_date="1 January 2020", + modified_time="00:00:00", transformer=transformer, + ) + # False origin → (0, 0) in Albers, regardless of units + assert " Storm Center X: 0" in text + assert " Storm Center Y: 0" in text + + +def test_missing_centroid_omits_storm_center(transformer): + text = build_grid_file( + [_entry("s", "Precipitation", None)], + manager_name="c", modified_date="1 January 2020", + modified_time="00:00:00", transformer=transformer, + ) + assert "Storm Center X" not in text + assert "Storm Center Y" not in text + + +def test_lf_line_endings(transformer): + text = build_grid_file( + [_entry("s", "Temperature", (-90.0, 31.0))], + manager_name="c", modified_date="1 January 2020", + modified_time="00:00:00", transformer=transformer, + ) + assert "\r\n" not in text + + +def test_multiple_entries_each_end_with_end_marker(transformer): + text = build_grid_file( + [_entry("a", "Precipitation"), _entry("a", "Temperature")], + manager_name="c", modified_date="1 January 2020", + modified_time="00:00:00", transformer=transformer, + ) + # Header End: + two grid End: markers = 3 total + assert text.count("\nEnd:\n") + text.startswith("End:\n") == 3 diff --git a/test/test_worker_sizing.py b/test/test_worker_sizing.py index 5320837..8ccfba9 100644 --- a/test/test_worker_sizing.py +++ b/test/test_worker_sizing.py @@ -42,11 +42,13 @@ def test_empty_attribute_falls_through(no_cgroup): def test_auto_sizes_from_cgroup(monkeypatch): monkeypatch.setattr(worker_sizing, "_cgroup_mem_limit_mb", lambda: 15000) - assert worker_sizing.resolve_num_workers({}) == 7 # 15000 // 2048 + # 15000 // 3072 == 4 — with thread caps in the image, workers fit + # memory only, independent of visible CPU count. + assert worker_sizing.resolve_num_workers({}) == 4 def test_auto_floors_at_one_when_budget_below_per_worker(monkeypatch): - monkeypatch.setattr(worker_sizing, "_cgroup_mem_limit_mb", lambda: 512) + monkeypatch.setattr(worker_sizing, "_cgroup_mem_limit_mb", lambda: 2048) assert worker_sizing.resolve_num_workers({}) == 1