Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ python run.py # Builds image, starts MinIO, runs plugin (~2 min first r

Results at http://localhost:9001 (ccuser/ccpassword).

> Local dev serializes storm-search by default (1 worker) because
> no container memory limit is enforced. For a faster loop, set
> `CC_NUM_WORKERS=4` in `test/local.env` or pass `num_workers` in
> the payload `attributes`.

## Custom Payloads

Edit `test/examples/payload.json` or copy it and pass the path:
Expand All @@ -38,6 +43,7 @@ Storm parameters are in `attributes`. All values are strings (CC SDK convention)
| `min_precip_threshold` | no | `"0.0"` | Minimum mean precipitation (mm) |
| `check_every_n_hours` | no | `"24"` | How often to sample storm start times |
| `specific_dates` | no | | JSON array of dates to force-include |
| `num_workers` | no | auto | Parallel workers for storm search. Auto-sized from container memory (cgroup). Use `CC_NUM_WORKERS` env for a fleet default. Falls back to 1 worker when no memory limit is set. |
| `input_path` | yes | | S3 path to watershed/transposition geometries |
| `output_path` | yes | | S3 path for results |

Expand All @@ -53,6 +59,22 @@ python run.py clean # Remove containers, volumes, Local/
python run.py down # Stop containers
```

## Reproducing the OOM Failure Mode

The vendored stormhub library would spawn `os.cpu_count() - 2` workers,
which inside a container reads the *host* CPU count and can exceed the
container's memory ceiling. To reproduce the original failure under a
3 GB cap:

```bash
docker compose -f docker-compose.yaml -f docker-compose.mem-limit.yaml build
docker compose -f docker-compose.yaml -f docker-compose.mem-limit.yaml run --rm seed
docker compose -f docker-compose.yaml -f docker-compose.mem-limit.yaml run --rm storm-cloud-plugin
```

With the fix, the resolver reads the cgroup limit and picks 1 worker;
without it, the library would pick 6 workers and crash with `BrokenProcessPool`.

## Known Limitations

- **stormhub v0.5.0**: Workers hang during storm collection. Pinned to v0.4.0.
4 changes: 4 additions & 0 deletions docker-compose.mem-limit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Overlay that caps the plugin container at 3 GB of RAM + swap, used to
# reproduce the OOM-driven BrokenProcessPool failure mode under a 3 GB cap.
services:
  storm-cloud-plugin:
    mem_limit: 3g
    memswap_limit: 3g
17 changes: 13 additions & 4 deletions src/actions/process_storms.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
import json
import logging
import os
from concurrent.futures.process import BrokenProcessPool
from pathlib import Path
from typing import Any

from stormhub.met.storm_catalog import StormCatalog, new_catalog, new_collection

from worker_sizing import resolve_num_workers

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -65,6 +68,7 @@ def process_storms(ctx: dict[str, Any], action: Any) -> None:
"min_precip_threshold": float(attrs.get("min_precip_threshold", "0.0")),
"top_n_events": int(attrs.get("top_n_events", "10")),
"check_every_n_hours": int(attrs.get("check_every_n_hours", "24")),
"num_workers": resolve_num_workers(attrs),
"specific_dates": json.loads(attrs["specific_dates"])
if attrs.get("specific_dates")
else [],
Expand All @@ -83,11 +87,16 @@ def process_storms(ctx: dict[str, Any], action: Any) -> None:
catalog_description=attrs["catalog_description"],
)

collection = new_collection(catalog, **storm_params)
if collection is None:
try:
collection = new_collection(catalog, **storm_params)
except BrokenProcessPool as e:
raise RuntimeError(
"new_collection returned None — no storms found matching criteria"
)
f"Storm processing pool died with num_workers="
f"{storm_params['num_workers']} (likely OOM). Lower via "
"'num_workers' payload attribute or CC_NUM_WORKERS env."
) from e
if collection is None:
raise RuntimeError("no storms found matching criteria")

log.info("Catalog and collection ready")

Expand Down
57 changes: 57 additions & 0 deletions src/worker_sizing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Pick a worker count the container can afford.

The vendored stormhub library defaults to ``os.cpu_count() - 2`` workers,
which inside a container reads the *host* CPU count and can exceed the
cgroup memory ceiling — causing OOM-driven ``BrokenProcessPool``. This
module picks a safe count from the cgroup limit, with operator overrides.
"""

from __future__ import annotations

import logging
import os
from pathlib import Path

log = logging.getLogger(__name__)

# Per-worker memory budget: observed ~1–1.5 GB on trinity + 72 hr AORC;
# 2 GB absorbs transient spikes.
PER_WORKER_MB = 2048

# cgroup v2 memory-limit file: holds a byte count or the literal string "max".
CGROUP_MEM_MAX = "/sys/fs/cgroup/memory.max"


def resolve_num_workers(attrs: dict) -> int:
    """Return the worker count for storm search, logging where it came from.

    Precedence: payload attribute, then the CC_NUM_WORKERS env var, then a
    count auto-sized from the cgroup memory limit, then a fallback of 1.
    """
    origin, count = _resolve(attrs)
    log.info("num_workers=%d (%s)", count, origin)
    return count


def _resolve(attrs: dict) -> tuple[str, int]:
    """Return ``(source label, worker count)`` for the first source that applies."""
    attr_value = attrs.get("num_workers")
    if attr_value:
        return "from payload attribute", max(1, int(attr_value))

    env_value = os.environ.get("CC_NUM_WORKERS")
    if env_value:
        return "from CC_NUM_WORKERS env", max(1, int(env_value))

    mem_mb = _cgroup_mem_limit_mb()
    if mem_mb is not None:
        return "auto-sized from cgroup", max(1, mem_mb // PER_WORKER_MB)
    return "cgroup unset — fallback", 1


def _cgroup_mem_limit_mb() -> int | None:
"""Read cgroup v2 ``memory.max`` in MiB, or None if unlimited/absent."""
try:
raw = Path(CGROUP_MEM_MAX).read_text(encoding="utf-8").strip()
except (FileNotFoundError, OSError):
return None
if raw == "max":
return None
try:
bytes_ = int(raw)
except ValueError:
return None
# Kernel sentinels for "no limit" are huge.
if bytes_ <= 0 or bytes_ >= (1 << 62):
return None
return bytes_ // (1024 * 1024)
45 changes: 45 additions & 0 deletions test/examples/payload-repro.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"attributes": {
"catalog_id": "repro-brokenpool",
"catalog_description": "Reproduce BrokenProcessPool — wide search, many workers, tight memory",
"start_date": "2022-11-02",
"end_date": "2022-12-01",
"storm_duration": "72",
"top_n_events": "10",
"min_precip_threshold": "0",
"check_every_n_hours": "12",
"input_path": "conformance/storm-catalog",
"output_path": "conformance/storm-catalog/generated-outputs"
},
"stores": [
{
"name": "StormHubStore",
"store_type": "S3",
"profile": "FFRD",
"params": { "root": "/model-library/ffrd-trinity" }
}
],
"inputs": [
{
"name": "StormHubInputs",
"paths": {
"transposition": "{ATTR::input_path}/transposition-domain.geojson",
"watershed": "{ATTR::input_path}/watershed-boundary.geojson"
},
"store_name": "StormHubStore"
}
],
"outputs": [
{
"name": "StormHubOutputs",
"paths": { "root": "{ATTR::output_path}" },
"store_name": "StormHubStore"
}
],
"actions": [
{ "name": "download-inputs", "type": "utils", "description": "Download geometries", "attributes": {}, "stores": [], "inputs": [], "outputs": [] },
{ "name": "process-storms", "type": "run", "description": "Create STAC storm catalog", "attributes": {}, "stores": [], "inputs": [], "outputs": [] },
{ "name": "convert-to-dss", "type": "extract", "description": "Convert Zarr to HEC-DSS", "attributes": {}, "stores": [], "inputs": [], "outputs": [] },
{ "name": "upload-outputs", "type": "utils", "description": "Upload results", "attributes": {}, "stores": [], "inputs": [], "outputs": [] }
]
}
89 changes: 89 additions & 0 deletions test/test_worker_sizing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Unit tests for worker_sizing.resolve_num_workers."""

from __future__ import annotations

import sys
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))

import worker_sizing # noqa: E402


@pytest.fixture(autouse=True)
def _clean_env(monkeypatch):
    """Strip any ambient CC_NUM_WORKERS so every test starts clean."""
    monkeypatch.delenv("CC_NUM_WORKERS", raising=False)


@pytest.fixture
def no_cgroup(monkeypatch):
    """Simulate a host where no cgroup memory limit is visible."""
    monkeypatch.setattr(worker_sizing, "_cgroup_mem_limit_mb", lambda: None)


def test_payload_attribute_wins(monkeypatch, no_cgroup):
    """The payload attribute beats the env var when both are set."""
    monkeypatch.setenv("CC_NUM_WORKERS", "7")
    picked = worker_sizing.resolve_num_workers({"num_workers": "3"})
    assert picked == 3


def test_payload_attribute_floors_at_one(no_cgroup):
    """A zero attribute is clamped up to a single worker."""
    picked = worker_sizing.resolve_num_workers({"num_workers": "0"})
    assert picked == 1


def test_env_used_when_no_attribute(monkeypatch, no_cgroup):
    """The env var applies when the payload attribute is absent."""
    monkeypatch.setenv("CC_NUM_WORKERS", "5")
    picked = worker_sizing.resolve_num_workers({})
    assert picked == 5


def test_empty_attribute_falls_through(no_cgroup):
    """An empty-string attribute is ignored, landing on the 1-worker fallback."""
    picked = worker_sizing.resolve_num_workers({"num_workers": ""})
    assert picked == 1


def test_auto_sizes_from_cgroup(monkeypatch):
    """A 15000 MiB limit divided by the 2048 MiB budget yields 7 workers."""
    monkeypatch.setattr(worker_sizing, "_cgroup_mem_limit_mb", lambda: 15000)
    picked = worker_sizing.resolve_num_workers({})
    assert picked == 7


def test_auto_floors_at_one_when_budget_below_per_worker(monkeypatch):
    """A limit smaller than one worker's budget still yields one worker."""
    monkeypatch.setattr(worker_sizing, "_cgroup_mem_limit_mb", lambda: 512)
    picked = worker_sizing.resolve_num_workers({})
    assert picked == 1


def test_fallback_to_one_when_cgroup_unset(no_cgroup):
    """No attribute, no env var, no cgroup limit: serialize to one worker."""
    picked = worker_sizing.resolve_num_workers({})
    assert picked == 1


def _patch_cgroup_read(monkeypatch, contents):
    """Replace worker_sizing.Path with a stub whose read_text yields *contents*."""

    class FakePath:
        def __init__(self, *_):
            pass

        def read_text(self, **_):
            return contents

    monkeypatch.setattr(worker_sizing, "Path", FakePath)


def test_cgroup_max_means_unlimited(monkeypatch):
    """The literal 'max' file content means no limit is configured."""
    _patch_cgroup_read(monkeypatch, "max\n")
    limit = worker_sizing._cgroup_mem_limit_mb()
    assert limit is None


def test_cgroup_bytes_converted(monkeypatch):
    """A byte-count limit is converted to MiB (3 GiB -> 3072)."""
    three_gib = 3 * 1024 * 1024 * 1024
    _patch_cgroup_read(monkeypatch, f"{three_gib}\n")
    limit = worker_sizing._cgroup_mem_limit_mb()
    assert limit == 3072


def test_cgroup_huge_sentinel_treated_as_unlimited(monkeypatch):
    """Kernel 'no limit' sentinel values are treated as no limit at all."""
    _patch_cgroup_read(monkeypatch, str(1 << 63))
    limit = worker_sizing._cgroup_mem_limit_mb()
    assert limit is None


def test_cgroup_missing_returns_none(monkeypatch):
    """An absent cgroup file reads as 'no limit known'."""

    class MissingPath:
        def __init__(self, *_):
            pass

        def read_text(self, **_):
            raise FileNotFoundError

    monkeypatch.setattr(worker_sizing, "Path", MissingPath)
    limit = worker_sizing._cgroup_mem_limit_mb()
    assert limit is None


def test_cgroup_malformed_returns_none(monkeypatch):
    """Unparseable file contents are treated as no limit known."""
    _patch_cgroup_read(monkeypatch, "garbage")
    limit = worker_sizing._cgroup_mem_limit_mb()
    assert limit is None
Loading