From cb7b910301d6af810fc3ed8fa7294e1d7beb61e4 Mon Sep 17 00:00:00 2001 From: Nghiem Date: Tue, 14 Apr 2026 21:14:24 +0000 Subject: [PATCH] fix: cap worker pool to cgroup memory budget MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The vendored stormhub library defaulted to os.cpu_count()-2 workers, which inside a container reads the host CPU count and can exceed the cgroup memory ceiling — causing OOM-driven BrokenProcessPool on hosts with high CPU count relative to the container's memory budget. Resolve num_workers from: payload attribute > CC_NUM_WORKERS env > cgroup memory limit / 2 GB per worker > 1 (safe fallback). Plumb the resolved value into new_collection and translate BrokenProcessPool into an actionable RuntimeError. Includes a local reproducer (test/examples/payload-repro.json plus docker-compose.mem-limit.yaml capping the container at 3 GB) that deterministically triggers the original failure without the fix. --- README.md | 22 ++++++++ docker-compose.mem-limit.yaml | 4 ++ src/actions/process_storms.py | 17 ++++-- src/worker_sizing.py | 57 ++++++++++++++++++++ test/examples/payload-repro.json | 45 ++++++++++++++++ test/test_worker_sizing.py | 89 ++++++++++++++++++++++++++++++++ 6 files changed, 230 insertions(+), 4 deletions(-) create mode 100644 docker-compose.mem-limit.yaml create mode 100644 src/worker_sizing.py create mode 100644 test/examples/payload-repro.json create mode 100644 test/test_worker_sizing.py diff --git a/README.md b/README.md index 76fc01c..39d63b9 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,11 @@ python run.py # Builds image, starts MinIO, runs plugin (~2 min first r Results at http://localhost:9001 (ccuser/ccpassword). +> Local dev runs serialize storm-search by default (1 worker) because +> no container memory limit is enforced. For a faster loop, set +> `CC_NUM_WORKERS=4` in `test/local.env` or pass `num_workers` in +> the payload `attributes`. + ## Custom Payloads Edit `test/examples/payload.json` or copy it and pass the path: @@ -38,6 +43,7 @@ Storm parameters are in `attributes`. All values are strings (CC SDK convention) | `min_precip_threshold` | no | `"0.0"` | Minimum mean precipitation (mm) | | `check_every_n_hours` | no | `"24"` | How often to sample storm start times | | `specific_dates` | no | | JSON array of dates to force-include | +| `num_workers` | no | auto | Parallel workers for storm search. Auto-sized from container memory (cgroup). Use `CC_NUM_WORKERS` env for a fleet default. Falls back to 1 worker when no memory limit is set. | | `input_path` | yes | | S3 path to watershed/transposition geometries | | `output_path` | yes | | S3 path for results | @@ -53,6 +59,22 @@ python run.py clean # Remove containers, volumes, Local/ python run.py down # Stop containers ``` +## Reproducing the OOM Failure Mode + +The vendored stormhub library would spawn `os.cpu_count() - 2` workers, +which inside a container reads the *host* CPU count and can exceed the +container's memory ceiling. To reproduce the original failure under a +3 GB cap: + +```bash +docker compose -f docker-compose.yaml -f docker-compose.mem-limit.yaml build +docker compose -f docker-compose.yaml -f docker-compose.mem-limit.yaml run --rm seed +docker compose -f docker-compose.yaml -f docker-compose.mem-limit.yaml run --rm storm-cloud-plugin +``` + +With the fix, the resolver reads the cgroup limit and picks 1 worker; +without it, the library would pick 6 and `BrokenProcessPool`. + ## Known Limitations - **stormhub v0.5.0**: Workers hang during storm collection. Pinned to v0.4.0. diff --git a/docker-compose.mem-limit.yaml b/docker-compose.mem-limit.yaml new file mode 100644 index 0000000..cadf267 --- /dev/null +++ b/docker-compose.mem-limit.yaml @@ -0,0 +1,4 @@ +services: + storm-cloud-plugin: + mem_limit: 3g + memswap_limit: 3g diff --git a/src/actions/process_storms.py b/src/actions/process_storms.py index f61f836..dcf3ef4 100644 --- a/src/actions/process_storms.py +++ b/src/actions/process_storms.py @@ -5,11 +5,14 @@ import json import logging import os +from concurrent.futures.process import BrokenProcessPool from pathlib import Path from typing import Any from stormhub.met.storm_catalog import StormCatalog, new_catalog, new_collection +from worker_sizing import resolve_num_workers + log = logging.getLogger(__name__) @@ -65,6 +68,7 @@ def process_storms(ctx: dict[str, Any], action: Any) -> None: "min_precip_threshold": float(attrs.get("min_precip_threshold", "0.0")), "top_n_events": int(attrs.get("top_n_events", "10")), "check_every_n_hours": int(attrs.get("check_every_n_hours", "24")), + "num_workers": resolve_num_workers(attrs), "specific_dates": json.loads(attrs["specific_dates"]) if attrs.get("specific_dates") else [], @@ -83,11 +87,16 @@ def process_storms(ctx: dict[str, Any], action: Any) -> None: catalog_description=attrs["catalog_description"], ) - collection = new_collection(catalog, **storm_params) - if collection is None: + try: + collection = new_collection(catalog, **storm_params) + except BrokenProcessPool as e: raise RuntimeError( - "new_collection returned None — no storms found matching criteria" - ) + f"Storm processing pool died with num_workers=" + f"{storm_params['num_workers']} (likely OOM). Lower via " + "'num_workers' payload attribute or CC_NUM_WORKERS env." + ) from e + if collection is None: + raise RuntimeError("no storms found matching criteria") log.info("Catalog and collection ready") diff --git a/src/worker_sizing.py b/src/worker_sizing.py new file mode 100644 index 0000000..7770426 --- /dev/null +++ b/src/worker_sizing.py @@ -0,0 +1,57 @@ +"""Pick a worker count the container can afford. + +The vendored stormhub library defaults to ``os.cpu_count() - 2`` workers, +which inside a container reads the *host* CPU count and can exceed the +cgroup memory ceiling — causing OOM-driven ``BrokenProcessPool``. This +module picks a safe count from the cgroup limit, with operator overrides. +""" + +from __future__ import annotations + +import logging +import os +from pathlib import Path + +log = logging.getLogger(__name__) + +# Per-worker memory budget: observed ~1–1.5 GB on trinity + 72 hr AORC; +# 2 GB absorbs transient spikes. +PER_WORKER_MB = 2048 + +CGROUP_MEM_MAX = "/sys/fs/cgroup/memory.max" + + +def resolve_num_workers(attrs: dict) -> int: + """Payload attribute > CC_NUM_WORKERS env > cgroup-derived > 1.""" + source, n = _resolve(attrs) + log.info("num_workers=%d (%s)", n, source) + return n + + +def _resolve(attrs: dict) -> tuple[str, int]: + if attrs.get("num_workers"): + return "from payload attribute", max(1, int(attrs["num_workers"])) + if os.environ.get("CC_NUM_WORKERS"): + return "from CC_NUM_WORKERS env", max(1, int(os.environ["CC_NUM_WORKERS"])) + mem_mb = _cgroup_mem_limit_mb() + if mem_mb is None: + return "cgroup unset — fallback", 1 + return "auto-sized from cgroup", max(1, mem_mb // PER_WORKER_MB) + + +def _cgroup_mem_limit_mb() -> int | None: + """Read cgroup v2 ``memory.max`` in MiB, or None if unlimited/absent.""" + try: + raw = Path(CGROUP_MEM_MAX).read_text(encoding="utf-8").strip() + except (FileNotFoundError, OSError): + return None + if raw == "max": + return None + try: + bytes_ = int(raw) + except ValueError: + return None + # Kernel sentinels for "no limit" are huge. + if bytes_ <= 0 or bytes_ >= (1 << 62): + return None + return bytes_ // (1024 * 1024) diff --git a/test/examples/payload-repro.json b/test/examples/payload-repro.json new file mode 100644 index 0000000..8b94f06 --- /dev/null +++ b/test/examples/payload-repro.json @@ -0,0 +1,45 @@ +{ + "attributes": { + "catalog_id": "repro-brokenpool", + "catalog_description": "Reproduce BrokenProcessPool — wide search, many workers, tight memory", + "start_date": "2022-11-02", + "end_date": "2022-12-01", + "storm_duration": "72", + "top_n_events": "10", + "min_precip_threshold": "0", + "check_every_n_hours": "12", + "input_path": "conformance/storm-catalog", + "output_path": "conformance/storm-catalog/generated-outputs" + }, + "stores": [ + { + "name": "StormHubStore", + "store_type": "S3", + "profile": "FFRD", + "params": { "root": "/model-library/ffrd-trinity" } + } + ], + "inputs": [ + { + "name": "StormHubInputs", + "paths": { + "transposition": "{ATTR::input_path}/transposition-domain.geojson", + "watershed": "{ATTR::input_path}/watershed-boundary.geojson" + }, + "store_name": "StormHubStore" + } + ], + "outputs": [ + { + "name": "StormHubOutputs", + "paths": { "root": "{ATTR::output_path}" }, + "store_name": "StormHubStore" + } + ], + "actions": [ + { "name": "download-inputs", "type": "utils", "description": "Download geometries", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, + { "name": "process-storms", "type": "run", "description": "Create STAC storm catalog", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, + { "name": "convert-to-dss", "type": "extract", "description": "Convert Zarr to HEC-DSS", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }, + { "name": "upload-outputs", "type": "utils", "description": "Upload results", "attributes": {}, "stores": [], "inputs": [], "outputs": [] } + ] +} diff --git a/test/test_worker_sizing.py b/test/test_worker_sizing.py new file mode 100644 index 0000000..5320837 --- /dev/null +++ b/test/test_worker_sizing.py @@ -0,0 +1,89 @@ +"""Unit tests for worker_sizing.resolve_num_workers.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) + +import worker_sizing # noqa: E402 + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch): + monkeypatch.delenv("CC_NUM_WORKERS", raising=False) + + +@pytest.fixture +def no_cgroup(monkeypatch): + monkeypatch.setattr(worker_sizing, "_cgroup_mem_limit_mb", lambda: None) + + +def test_payload_attribute_wins(monkeypatch, no_cgroup): + monkeypatch.setenv("CC_NUM_WORKERS", "7") + assert worker_sizing.resolve_num_workers({"num_workers": "3"}) == 3 + + +def test_payload_attribute_floors_at_one(no_cgroup): + assert worker_sizing.resolve_num_workers({"num_workers": "0"}) == 1 + + +def test_env_used_when_no_attribute(monkeypatch, no_cgroup): + monkeypatch.setenv("CC_NUM_WORKERS", "5") + assert worker_sizing.resolve_num_workers({}) == 5 + + +def test_empty_attribute_falls_through(no_cgroup): + assert worker_sizing.resolve_num_workers({"num_workers": ""}) == 1 + + +def test_auto_sizes_from_cgroup(monkeypatch): + monkeypatch.setattr(worker_sizing, "_cgroup_mem_limit_mb", lambda: 15000) + assert worker_sizing.resolve_num_workers({}) == 7 # 15000 // 2048 + + +def test_auto_floors_at_one_when_budget_below_per_worker(monkeypatch): + monkeypatch.setattr(worker_sizing, "_cgroup_mem_limit_mb", lambda: 512) + assert worker_sizing.resolve_num_workers({}) == 1 + + +def test_fallback_to_one_when_cgroup_unset(no_cgroup): + assert worker_sizing.resolve_num_workers({}) == 1 + + +def _patch_cgroup_read(monkeypatch, contents): + class FakePath: + def __init__(self, *_): pass + def read_text(self, **_): return contents + monkeypatch.setattr(worker_sizing, "Path", FakePath) + + +def test_cgroup_max_means_unlimited(monkeypatch): + _patch_cgroup_read(monkeypatch, "max\n") + assert worker_sizing._cgroup_mem_limit_mb() is None + + +def test_cgroup_bytes_converted(monkeypatch): + _patch_cgroup_read(monkeypatch, f"{3 * 1024 * 1024 * 1024}\n") + assert worker_sizing._cgroup_mem_limit_mb() == 3072 + + +def test_cgroup_huge_sentinel_treated_as_unlimited(monkeypatch): + _patch_cgroup_read(monkeypatch, str(1 << 63)) + assert worker_sizing._cgroup_mem_limit_mb() is None + + +def test_cgroup_missing_returns_none(monkeypatch): + class MissingPath: + def __init__(self, *_): pass + def read_text(self, **_): raise FileNotFoundError + monkeypatch.setattr(worker_sizing, "Path", MissingPath) + assert worker_sizing._cgroup_mem_limit_mb() is None + + +def test_cgroup_malformed_returns_none(monkeypatch): + _patch_cgroup_read(monkeypatch, "garbage") + assert worker_sizing._cgroup_mem_limit_mb() is None