diff --git a/README.md b/README.md index 966ce761..e13a7a0c 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ madengine is a modern CLI tool for running Large Language Models (LLMs) and Deep - [Key Features](#-key-features) - [Quick Start](#-quick-start) +- [Smoke Testing](#-smoke-testing) - [Commands](#-commands) - [Documentation](#-documentation) - [Architecture](#-architecture) @@ -80,6 +81,31 @@ madengine run --tags dummy --rocm-path /path/to/rocm **Results:** Performance data is written to `perf.csv` (and optionally `perf_entry.csv`). The file is created automatically if missing. Failed runs (including pre-run setup failures) are recorded with status `FAILURE` so every attempted model appears in the table. See [Exit Codes](docs/cli-reference.md#exit-codes) for CI/script usage. +## ๐Ÿงช Smoke Testing + +Use the prebuilt smoke configs and wrapper script under `examples/` to quickly validate: + +- RDMA recommender on SLURM + Kubernetes +- GCM preflight/collector on SLURM (phase-1 scope) + +```bash +# SLURM smoke (build + run) + artifact verification +examples/run-smoke.sh slurm MODEL_DIR=/path/to/model MODEL_TAG=your_tag +examples/run-smoke.sh verify-slurm + +# Kubernetes smoke (build + run) + artifact verification +examples/run-smoke.sh k8s MODEL_DIR=/path/to/model MODEL_TAG=your_tag +examples/run-smoke.sh verify-k8s +``` + +Smoke assets: + +- `examples/run-smoke.sh` +- `examples/Makefile.smoke` +- `examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json` +- `examples/k8s-configs/configs/smoke-rdma-k8s.json` +- `examples/cluster-smoke-checklist.md` + ## ๐Ÿ“‹ Commands madengine provides five main commands for model automation and benchmarking: diff --git a/docs/configuration.md b/docs/configuration.md index f6ae79a1..313c7147 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -119,6 +119,93 @@ madengine run --tags my_unit_test_suite \ Disabling the scan does **not** change performance metric extraction from the log; it only affects the 
post-hoc grep used to set `has_errors` for status. +## Cluster Feature Layer (`additional_context.cluster`) + +`cluster` is an additive feature-flag namespace for RDMA and (SLURM-only) GCM integration. +Nothing changes unless you explicitly set `cluster.*.enabled: true`. + +### Schema + +```json +{ + "cluster": { + "rdma": { + "enabled": false, + "strict": false, + "mode": "recommend", + "apply_env": true, + "artifact_name": "rdma_recommendation.json" + }, + "gcm": { + "enabled": false, + "enabled_platforms": ["slurm"], + "source": { + "repo": "https://github.com/coketaste/gcm", + "ref": "9fed02cd0721d3937f8749672951185f31955bd4" + }, + "strict": false, + "health_checks": ["check-hca", "check-ibstat"], + "collector": { + "enabled": false, + "command": "slurm_job_monitor", + "once": true, + "sink": "file", + "timeout_sec": 120, + "max_retries": 1, + "best_effort": true + }, + "artifacts": { + "dir": "./slurm_results/cluster_artifacts", + "files": { + "health_summary_json": "gcm_health_summary.json", + "health_raw_log": "gcm_health_raw.log", + "collector_output": "gcm_collector_output.log" + } + } + } + } +} +``` + +### RDMA behavior (SLURM + Kubernetes) + +- `mode: "recommend"` keeps user `env_vars` precedence; only missing RDMA vars are injected. +- `mode: "enforce"` lets recommender output override existing conflicting RDMA env vars. +- `strict: true` fails the workload when no valid RDMA recommendation can be produced. +- Artifacts are written per node/pod and included in deployment result summaries. + +### GCM behavior (SLURM only in this phase) + +- Health checks run in preflight (`check-hca`, `check-ibstat` allowlist only). +- `strict: true` gates submission on health-check failures; `strict: false` warns and continues. +- Collector runs as one-shot `gcm slurm_job_monitor --once` during result collection. +- Collector defaults to best effort (`best_effort: true`) and does not gate workload success. 
+- Source is pinned to `coketaste/gcm` with fixed commit ref for reproducibility checks. + +### Rollout guidance + +1. Start with `cluster.rdma.enabled=true`, `strict=false`, `mode="recommend"`. +2. Validate RDMA artifacts and selected env vars on single-node, then multi-node. +3. Enable `cluster.gcm.enabled=true` with `strict=false` to observe health output. +4. Turn on `cluster.gcm.strict=true` only after cluster baseline is stable. +5. Keep collector best-effort initially; tighten only after runtime overhead is validated. + +### Smoke configs and one-line runner + +Prebuilt smoke configs are available at: + +- `examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json` +- `examples/k8s-configs/configs/smoke-rdma-k8s.json` + +Run them with: + +```bash +examples/run-smoke.sh slurm MODEL_DIR=/path/to/model MODEL_TAG=your_tag +examples/run-smoke.sh k8s MODEL_DIR=/path/to/model MODEL_TAG=your_tag +``` + +Artifact verification commands are documented in `examples/cluster-smoke-checklist.md`. + ## Basic Configuration **gpu_vendor** (case-insensitive): diff --git a/docs/deployment.md b/docs/deployment.md index 07ae870b..d1e11554 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -1,6 +1,7 @@ # Deployment Guide Deploy madengine workloads to Kubernetes or SLURM clusters for distributed execution. +For quick end-to-end validation commands, see the README [Smoke Testing](../README.md#-smoke-testing) section. ## Overview @@ -11,6 +12,41 @@ madengine supports two deployment backends: Deployment is configured via `--additional-context` and happens automatically during the run phase. +## Cluster Feature Stages + +When `additional_context.cluster` is enabled, stage placement is: + +- **RDMA recommender**: runtime stage on both SLURM and Kubernetes (before workload launch). +- **GCM health checks**: SLURM preflight stage before `sbatch` submission. +- **GCM collector snapshot**: SLURM result-collection stage (post-run, one-shot). 
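Per the mode semantics documented in `docs/configuration.md` (`recommend` injects only missing RDMA vars; `enforce` lets the recommendation override conflicts), the env application at the runtime stage reduces to a simple precedence rule. A minimal sketch of that rule — `merge_rdma_env` is a hypothetical helper for illustration, not the shipped implementation:

```python
# Illustrative sketch of the documented precedence rules (not the shipped code):
# "recommend" injects only RDMA vars the user has not already set;
# "enforce" lets the recommender's output win on conflicts.
def merge_rdma_env(user_env: dict, recommended: dict, mode: str = "recommend") -> dict:
    merged = dict(user_env)
    for key, value in recommended.items():
        if mode == "enforce" or key not in merged:
            merged[key] = value
    return merged

user = {"NCCL_SOCKET_IFNAME": "bond0"}
rec = {"NCCL_SOCKET_IFNAME": "eth0", "NCCL_IB_DISABLE": "0"}
print(merge_rdma_env(user, rec, "recommend"))  # keeps bond0, adds NCCL_IB_DISABLE
print(merge_rdma_env(user, rec, "enforce"))    # overrides interface to eth0
```

Either way, vars the recommendation does not mention are left untouched.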
+
+Current scope is intentionally phased:
+
+- RDMA: **SLURM + Kubernetes**
+- GCM: **SLURM only**
+
+## Cluster Smoke Runner
+
+For quick validation of the new cluster feature layer, use the smoke assets under `examples/`:
+
+- Configs:
+  - `examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json`
+  - `examples/k8s-configs/configs/smoke-rdma-k8s.json`
+- One-line wrapper:
+  - `examples/run-smoke.sh`
+- Full checklist:
+  - `examples/cluster-smoke-checklist.md`
+
+Example:
+
+```bash
+examples/run-smoke.sh slurm MODEL_DIR=/path/to/model MODEL_TAG=your_tag
+examples/run-smoke.sh verify-slurm
+
+examples/run-smoke.sh k8s MODEL_DIR=/path/to/model MODEL_TAG=your_tag
+examples/run-smoke.sh verify-k8s
+```
+
 ## Deployment Workflow
 
 ```
diff --git a/examples/Makefile.smoke b/examples/Makefile.smoke
new file mode 100644
index 00000000..b9fafaac
--- /dev/null
+++ b/examples/Makefile.smoke
@@ -0,0 +1,47 @@
+.PHONY: smoke-slurm smoke-k8s smoke-slurm-verify smoke-k8s-verify
+
+SLURM_SMOKE_CONFIG ?= examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json
+K8S_SMOKE_CONFIG ?= examples/k8s-configs/configs/smoke-rdma-k8s.json
+
+SLURM_SMOKE_MANIFEST ?= build_manifest.slurm.smoke.json
+K8S_SMOKE_MANIFEST ?= build_manifest.k8s.smoke.json
+
+_require-model-vars:
+	@test -n "$(MODEL_DIR)" || (echo "ERROR: MODEL_DIR is required"; exit 1)
+	@test -n "$(MODEL_TAG)" || (echo "ERROR: MODEL_TAG is required"; exit 1)
+
+smoke-slurm: _require-model-vars
+	MODEL_DIR="$(MODEL_DIR)" madengine build \
+	--tags "$(MODEL_TAG)" \
+	--additional-context-file "$(SLURM_SMOKE_CONFIG)" \
+	--manifest-output "$(SLURM_SMOKE_MANIFEST)"
+	MODEL_DIR="$(MODEL_DIR)" madengine run \
+	--manifest-file "$(SLURM_SMOKE_MANIFEST)" \
+	--timeout 3600
+
+smoke-k8s: _require-model-vars
+	MODEL_DIR="$(MODEL_DIR)" madengine build \
+	--tags "$(MODEL_TAG)" \
+	--additional-context-file "$(K8S_SMOKE_CONFIG)" \
+	--manifest-output "$(K8S_SMOKE_MANIFEST)"
+	MODEL_DIR="$(MODEL_DIR)" madengine run \
+	--manifest-file "$(K8S_SMOKE_MANIFEST)" \
+	--timeout 3600
+
+# Each recipe line runs in its own shell, so verification is a single
+# python3 -c invocation (a multi-line here-doc would not survive make's line splitting).
+smoke-slurm-verify:
+	@python3 -c "import glob, json; \
+	health = glob.glob('slurm_results/cluster_artifacts/**/gcm_health_summary.json', recursive=True); \
+	collector = glob.glob('slurm_results/cluster_artifacts/**/gcm_collector_output.log', recursive=True); \
+	rdma = glob.glob('slurm_results/**/rdma_recommendation.json', recursive=True); \
+	print('gcm health:', health); \
+	print('gcm collector:', collector); \
+	print('rdma artifacts:', rdma); \
+	[print(p, json.load(open(p)).get('status')) for p in rdma[:3]]"
+
+smoke-k8s-verify:
+	@python3 -c "import glob, json; \
+	rdma = glob.glob('k8s_results/**/rdma_recommendation.json', recursive=True); \
+	print('rdma artifacts:', rdma); \
+	[print(p, json.load(open(p)).get('status')) for p in rdma[:3]]"
diff --git a/examples/cluster-smoke-checklist.md b/examples/cluster-smoke-checklist.md
new file mode 100644
index 00000000..477eebb6
--- /dev/null
+++ b/examples/cluster-smoke-checklist.md
@@ -0,0 +1,135 @@
+# Cluster Smoke Checklist (RDMA + GCM Phase 1)
+
+This checklist validates:
+
+- RDMA recommender on **SLURM + Kubernetes**
+- GCM health checks + one-shot collector on **SLURM only**
+
+If you prefer one-liners, use:
+
+```bash
+make -f examples/Makefile.smoke smoke-slurm MODEL_DIR=/path/to/model MODEL_TAG=your_tag
+make -f examples/Makefile.smoke smoke-k8s MODEL_DIR=/path/to/model MODEL_TAG=your_tag
+```
+
+Or use the wrapper script:
+
+```bash
+examples/run-smoke.sh slurm MODEL_DIR=/path/to/model MODEL_TAG=your_tag
+examples/run-smoke.sh verify-slurm
+examples/run-smoke.sh k8s MODEL_DIR=/path/to/model MODEL_TAG=your_tag
+examples/run-smoke.sh verify-k8s
+```
+
+## 0) Set shared variables
+
+```bash
+cd /path/to/madengine
+export MODEL_DIR="/path/to/model"
+export MODEL_TAG="your_tag"
+```
+
+---
+
+## 1) SLURM smoke (RDMA + GCM)
+
+### 1.1 Build
+
+```bash
+madengine build \
+  --tags "${MODEL_TAG}" \
+  --additional-context-file "examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json" \
+  --manifest-output "build_manifest.slurm.smoke.json"
+```
+
+### 1.2 Run
+ +```bash +madengine run \ + --manifest-file "build_manifest.slurm.smoke.json" \ + --timeout 3600 +``` + +### 1.3 Verify artifacts + +```bash +# GCM health summary +python3 - <<'PY' +import glob, json, os +matches = glob.glob("slurm_results/cluster_artifacts/**/gcm_health_summary.json", recursive=True) +print("gcm_health_summary files:", matches) +for p in matches[:2]: + print(p, json.load(open(p)).get("status")) +PY + +# GCM collector output +python3 - <<'PY' +import glob +matches = glob.glob("slurm_results/cluster_artifacts/**/gcm_collector_output.log", recursive=True) +print("gcm_collector_output files:", matches) +PY + +# RDMA artifacts copied per node collection directory +python3 - <<'PY' +import glob, json +matches = glob.glob("slurm_results/**/rdma_recommendation.json", recursive=True) +print("rdma_recommendation files:", matches) +for p in matches[:3]: + data = json.load(open(p)) + print(p, data.get("status"), sorted((data.get("recommended_env") or {}).keys())[:6]) +PY +``` + +--- + +## 2) Kubernetes smoke (RDMA only) + +### 2.1 Build + +```bash +madengine build \ + --tags "${MODEL_TAG}" \ + --additional-context-file "examples/k8s-configs/configs/smoke-rdma-k8s.json" \ + --manifest-output "build_manifest.k8s.smoke.json" +``` + +### 2.2 Run + +```bash +madengine run \ + --manifest-file "build_manifest.k8s.smoke.json" \ + --timeout 3600 +``` + +### 2.3 Verify artifacts + +```bash +python3 - <<'PY' +import glob, json +matches = glob.glob("k8s_results/**/rdma_recommendation.json", recursive=True) +print("rdma_recommendation files:", matches) +for p in matches[:3]: + data = json.load(open(p)) + print(p, data.get("status"), sorted((data.get("recommended_env") or {}).keys())[:6]) +PY +``` + +--- + +## 3) Optional strict-mode gate checks + +### 3.1 SLURM GCM strict gate + +Set in `examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json`: + +- `cluster.gcm.strict: true` + +Then rerun section 1. If health checks fail, submission should fail early. 
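Both strict gates reduce to the same contract as the recommender CLI (`run_cli` in `src/madengine/deployment/rdma_recommender.py`): under strict mode, any payload status other than `"ok"` (e.g. `"no_rdma"`) produces a non-zero exit that gates the workload. A minimal sketch of that gate logic — `strict_exit_code` is a hypothetical helper, not the shipped code:

```python
# Sketch of the strict-gate rule mirrored from rdma_recommender's CLI:
# a payload whose status is not "ok" fails the run only when strict mode is on.
def strict_exit_code(payload: dict, strict: bool) -> int:
    if strict and payload.get("status") != "ok":
        return 2  # non-zero exit gates the workload
    return 0

print(strict_exit_code({"status": "no_rdma"}, strict=True))   # 2
print(strict_exit_code({"status": "no_rdma"}, strict=False))  # 0
print(strict_exit_code({"status": "ok"}, strict=True))        # 0
```

With `strict: false` (the smoke default), the same failures are reported in artifacts but never block the run.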
+ +### 3.2 RDMA strict gate + +Set in smoke config(s): + +- `cluster.rdma.strict: true` + +Then rerun section 1 or 2. Workload should fail when RDMA recommendation cannot be produced. diff --git a/examples/k8s-configs/README.md b/examples/k8s-configs/README.md index 09cf492e..bbe2d9fc 100644 --- a/examples/k8s-configs/README.md +++ b/examples/k8s-configs/README.md @@ -146,6 +146,24 @@ MODEL_DIR=tests/fixtures/dummy madengine run \ --live-output ``` +## ๐Ÿงช Cluster Feature Smoke Config (RDMA) + +Use this phase-1 smoke config to validate RDMA recommender behavior on Kubernetes. +(`cluster.gcm` remains SLURM-only in this phase.) + +Config file: + +- `examples/k8s-configs/configs/smoke-rdma-k8s.json` + +One-line runner: + +```bash +examples/run-smoke.sh k8s MODEL_DIR=/path/to/model MODEL_TAG=your_tag +examples/run-smoke.sh verify-k8s +``` + +For full command-by-command verification, see `examples/cluster-smoke-checklist.md`. + --- ## ๐Ÿ“ Available Configurations diff --git a/examples/k8s-configs/configs/smoke-rdma-k8s.json b/examples/k8s-configs/configs/smoke-rdma-k8s.json new file mode 100644 index 00000000..8ab7dca7 --- /dev/null +++ b/examples/k8s-configs/configs/smoke-rdma-k8s.json @@ -0,0 +1,21 @@ +{ + "_comment": "Phase-1 smoke config: RDMA on Kubernetes (GCM remains SLURM-only).", + "k8s": { + "gpu_count": 1, + "namespace": "default" + }, + "distributed": { + "launcher": "torchrun", + "nnodes": 1, + "nproc_per_node": 1 + }, + "cluster": { + "rdma": { + "enabled": true, + "strict": false, + "mode": "recommend", + "apply_env": true, + "artifact_name": "rdma_recommendation.json" + } + } +} diff --git a/examples/run-smoke.sh b/examples/run-smoke.sh new file mode 100755 index 00000000..f5723fc3 --- /dev/null +++ b/examples/run-smoke.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 
&& pwd)"
+MAKEFILE_PATH="${ROOT_DIR}/examples/Makefile.smoke"
+
+usage() {
+  cat <<'EOF'
+Usage:
+  examples/run-smoke.sh COMMAND MODEL_DIR=/path/to/model MODEL_TAG=your_tag [extra make vars...]
+
+Commands:
+  slurm          Run SLURM smoke (build + run)
+  k8s            Run Kubernetes smoke (build + run)
+  verify-slurm   Verify SLURM smoke artifacts
+  verify-k8s     Verify Kubernetes smoke artifacts
+
+Examples:
+  examples/run-smoke.sh slurm MODEL_DIR=/path/to/model MODEL_TAG=dummy
+  examples/run-smoke.sh k8s MODEL_DIR=/path/to/model MODEL_TAG=dummy
+  examples/run-smoke.sh verify-slurm
+  examples/run-smoke.sh verify-k8s
+EOF
+}
+
+if [[ $# -lt 1 ]]; then
+  usage
+  exit 1
+fi
+
+cmd="$1"
+shift
+
+case "${cmd}" in
+  slurm)
+    exec make -f "${MAKEFILE_PATH}" smoke-slurm "$@"
+    ;;
+  k8s)
+    exec make -f "${MAKEFILE_PATH}" smoke-k8s "$@"
+    ;;
+  verify-slurm)
+    exec make -f "${MAKEFILE_PATH}" smoke-slurm-verify "$@"
+    ;;
+  verify-k8s)
+    exec make -f "${MAKEFILE_PATH}" smoke-k8s-verify "$@"
+    ;;
+  -h|--help|help)
+    usage
+    ;;
+  *)
+    echo "Unknown command: ${cmd}" >&2
+    usage
+    exit 2
+    ;;
+esac
diff --git a/examples/slurm-configs/README.md b/examples/slurm-configs/README.md
index 7d714a08..f0ca733f 100644
--- a/examples/slurm-configs/README.md
+++ b/examples/slurm-configs/README.md
@@ -170,6 +170,26 @@ madengine run --tags model_tag \
     --additional-context '{"slurm": {"nodes": 4, "time": "48:00:00"}}'
 ```
 
+## 🧪 Cluster Feature Smoke Config (RDMA + GCM)
+
+Use this phase-1 smoke config to validate:
+
+- RDMA recommender in the runtime path
+- GCM health preflight + one-shot collector (SLURM only)
+
+Config file:
+
+- `examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json`
+
+One-line runner:
+
+```bash
+examples/run-smoke.sh slurm MODEL_DIR=/path/to/model MODEL_TAG=your_tag
+examples/run-smoke.sh verify-slurm
+```
+
+For full command-by-command verification, see `examples/cluster-smoke-checklist.md`.
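Once the default smoke run is green, the same config can be tightened into a gate per the rollout guidance in `docs/configuration.md`. An illustrative `cluster` block variant (keys follow the documented schema; the particular value combination here is an example, not a shipped config):

```json
{
  "cluster": {
    "rdma": { "enabled": true, "strict": true, "mode": "enforce", "apply_env": true },
    "gcm": { "enabled": true, "enabled_platforms": ["slurm"], "strict": true }
  }
}
```

With `strict: true`, failed GCM health checks block `sbatch` submission and a missing RDMA recommendation fails the workload instead of merely being logged.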
+ ## ๐Ÿ”„ Distributed Workload Support The SLURM deployment **automatically configures distributed execution** for multi-node and multi-GPU setups (training with torchrun/deepspeed or inference with vLLM/SGLang): diff --git a/examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json b/examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json new file mode 100644 index 00000000..0d6ad1bd --- /dev/null +++ b/examples/slurm-configs/configs/smoke-rdma-gcm-slurm.json @@ -0,0 +1,56 @@ +{ + "_comment": "Phase-1 smoke config: RDMA + SLURM-only GCM checks/collector.", + "slurm": { + "partition": "gpu", + "nodes": 1, + "gpus_per_node": 1, + "time": "00:30:00", + "output_dir": "./slurm_results" + }, + "distributed": { + "launcher": "torchrun", + "nnodes": 1, + "nproc_per_node": 1 + }, + "cluster": { + "rdma": { + "enabled": true, + "strict": false, + "mode": "recommend", + "apply_env": true, + "artifact_name": "rdma_recommendation.json" + }, + "gcm": { + "enabled": true, + "enabled_platforms": [ + "slurm" + ], + "source": { + "repo": "https://github.com/coketaste/gcm", + "ref": "9fed02cd0721d3937f8749672951185f31955bd4" + }, + "strict": false, + "health_checks": [ + "check-hca", + "check-ibstat" + ], + "collector": { + "enabled": true, + "command": "slurm_job_monitor", + "once": true, + "sink": "file", + "timeout_sec": 120, + "max_retries": 1, + "best_effort": true + }, + "artifacts": { + "dir": "./slurm_results/cluster_artifacts", + "files": { + "health_summary_json": "gcm_health_summary.json", + "health_raw_log": "gcm_health_raw.log", + "collector_output": "gcm_collector_output.log" + } + } + } + } +} diff --git a/src/madengine/cli/validators.py b/src/madengine/cli/validators.py index d99e87f7..c6f60b48 100644 --- a/src/madengine/cli/validators.py +++ b/src/madengine/cli/validators.py @@ -104,6 +104,11 @@ def _fail_structure(key: str, expected: str) -> None: raise typer.Exit(ExitCode.INVALID_ARGS) +def _fail_value(key: str, message: str) -> None: + console.print(f"โŒ Invalid 
additional context: key [red]{key}[/red] {message}.") + raise typer.Exit(ExitCode.INVALID_ARGS) + + def validate_additional_context_structure(context: Dict[str, Any]) -> None: """Validate types of known keys after defaults are applied.""" if "docker_build_arg" in context: @@ -142,6 +147,7 @@ def validate_additional_context_structure(context: Dict[str, Any]) -> None: "kubernetes", "distributed", "vllm", + "cluster", "deployment_config", "deploy", ): @@ -198,6 +204,104 @@ def validate_additional_context_structure(context: Dict[str, Any]) -> None: "a non-empty array of strings", ) + if "cluster" in context: + cluster = context["cluster"] + if not isinstance(cluster, dict): + _fail_structure("cluster", "an object") + + rdma = cluster.get("rdma") + if rdma is not None: + if not isinstance(rdma, dict): + _fail_structure("cluster.rdma", "an object") + if "enabled" in rdma and not isinstance(rdma["enabled"], bool): + _fail_structure("cluster.rdma.enabled", "a boolean") + if "strict" in rdma and not isinstance(rdma["strict"], bool): + _fail_structure("cluster.rdma.strict", "a boolean") + if "apply_env" in rdma and not isinstance(rdma["apply_env"], bool): + _fail_structure("cluster.rdma.apply_env", "a boolean") + if "artifact_name" in rdma and not isinstance(rdma["artifact_name"], str): + _fail_structure("cluster.rdma.artifact_name", "a string") + mode = rdma.get("mode") + if mode is not None and mode not in {"recommend", "enforce"}: + _fail_value("cluster.rdma.mode", "must be 'recommend' or 'enforce'") + + gcm = cluster.get("gcm") + if gcm is not None: + if not isinstance(gcm, dict): + _fail_structure("cluster.gcm", "an object") + if "enabled" in gcm and not isinstance(gcm["enabled"], bool): + _fail_structure("cluster.gcm.enabled", "a boolean") + if "strict" in gcm and not isinstance(gcm["strict"], bool): + _fail_structure("cluster.gcm.strict", "a boolean") + if "enabled_platforms" in gcm: + platforms = gcm["enabled_platforms"] + if not isinstance(platforms, list) or not 
all( + isinstance(x, str) for x in platforms + ): + _fail_structure("cluster.gcm.enabled_platforms", "an array of strings") + if "health_checks" in gcm: + checks = gcm["health_checks"] + if not isinstance(checks, list) or not all( + isinstance(x, str) for x in checks + ): + _fail_structure("cluster.gcm.health_checks", "an array of strings") + allowed_checks = {"check-hca", "check-ibstat"} + disallowed = [x for x in checks if x not in allowed_checks] + if disallowed: + _fail_value( + "cluster.gcm.health_checks", + f"contains unsupported value(s) {disallowed}; allowed={sorted(allowed_checks)}", + ) + source = gcm.get("source") + if source is not None: + if not isinstance(source, dict): + _fail_structure("cluster.gcm.source", "an object") + if "repo" in source and not isinstance(source["repo"], str): + _fail_structure("cluster.gcm.source.repo", "a string") + if "ref" in source and not isinstance(source["ref"], str): + _fail_structure("cluster.gcm.source.ref", "a string") + collector = gcm.get("collector") + if collector is not None: + if not isinstance(collector, dict): + _fail_structure("cluster.gcm.collector", "an object") + for bool_key in ("enabled", "once", "best_effort"): + if bool_key in collector and not isinstance( + collector[bool_key], bool + ): + _fail_structure(f"cluster.gcm.collector.{bool_key}", "a boolean") + for int_key in ("timeout_sec", "max_retries"): + if int_key in collector and not isinstance(collector[int_key], int): + _fail_structure( + f"cluster.gcm.collector.{int_key}", "an integer" + ) + if "command" in collector and not isinstance(collector["command"], str): + _fail_structure("cluster.gcm.collector.command", "a string") + if "sink" in collector and not isinstance(collector["sink"], str): + _fail_structure("cluster.gcm.collector.sink", "a string") + if collector.get("command") and collector["command"] not in { + "slurm_job_monitor" + }: + _fail_value( + "cluster.gcm.collector.command", + "must be 'slurm_job_monitor' in this phase", + ) + 
artifacts = gcm.get("artifacts") + if artifacts is not None: + if not isinstance(artifacts, dict): + _fail_structure("cluster.gcm.artifacts", "an object") + if "dir" in artifacts and not isinstance(artifacts["dir"], str): + _fail_structure("cluster.gcm.artifacts.dir", "a string") + files = artifacts.get("files") + if files is not None: + if not isinstance(files, dict): + _fail_structure("cluster.gcm.artifacts.files", "an object") + for key, value in files.items(): + if not isinstance(key, str) or not isinstance(value, str): + _fail_structure( + "cluster.gcm.artifacts.files", + "an object with string keys and values", + ) + def _normalize_docker_build_arg_values(context: Dict[str, Any]) -> None: dba = context.get("docker_build_arg") diff --git a/src/madengine/deployment/kubernetes.py b/src/madengine/deployment/kubernetes.py index 3430c9d0..cd8eb74e 100644 --- a/src/madengine/deployment/kubernetes.py +++ b/src/madengine/deployment/kubernetes.py @@ -180,6 +180,8 @@ def __init__(self, config: DeploymentConfig): self.namespace = self.k8s_config.get("namespace", "default") self.gpu_resource_name = self.k8s_config.get("gpu_resource_name", "amd.com/gpu") + self.cluster_config = config.additional_context.get("cluster", {}) + self.rdma_config = self.cluster_config.get("rdma", {}) # Setup Jinja2 template environment template_dir = Path(__file__).parent / "templates" / "kubernetes" @@ -1074,6 +1076,24 @@ def _prepare_template_context( # Load pre/post script contents for ConfigMap (since madengine not installed in container) pre_post_script_contents = self._load_common_scripts(pre_scripts + post_scripts) + rdma_enabled = bool(self.rdma_config.get("enabled", False)) + rdma_strict = bool(self.rdma_config.get("strict", False)) + rdma_mode = self.rdma_config.get("mode", "recommend") + rdma_apply_env = bool(self.rdma_config.get("apply_env", True)) + rdma_artifact_name = self.rdma_config.get( + "artifact_name", "rdma_recommendation.json" + ) + if rdma_enabled: + rdma_script = 
Path(__file__).parent / "rdma_recommender.py" + if rdma_script.exists(): + pre_post_script_contents[ + "scripts/common/tools/rdma_recommender.py" + ] = rdma_script.read_text(encoding="utf-8") + else: + self.console.print( + "[yellow]Warning: RDMA recommender module not found; runtime RDMA stage will be skipped.[/yellow]" + ) + merged_sec = merge_secrets_config(self.k8s_config) strategy = merged_sec.get("strategy", SECRETS_STRATEGY_FROM_LOCAL) cred_path = Path("credential.json") @@ -1212,6 +1232,12 @@ def _prepare_template_context( "common_script_contents": pre_post_script_contents, # Multiple results file (e.g. perf_dummy.csv) - copied to PVC for K8s result collection "multiple_results": model_info.get("multiple_results") or "", + # Cluster RDMA feature + "rdma_enabled": rdma_enabled, + "rdma_strict": rdma_strict, + "rdma_mode": rdma_mode, + "rdma_apply_env": rdma_apply_env, + "rdma_artifact_name": rdma_artifact_name, } est = estimate_configmap_payload_bytes(context) @@ -2160,6 +2186,7 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: "artifacts": [], "successful_runs": [], "failed_runs": [], + "cluster_features": {}, } # Create results directory for this deployment @@ -2544,6 +2571,31 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: ) # 4. 
Generate summary + rdma_cfg = self.config.additional_context.get("cluster", {}).get("rdma", {}) + if rdma_cfg.get("enabled", False): + artifact_name = rdma_cfg.get( + "artifact_name", "rdma_recommendation.json" + ) + rdma_files = sorted(results_dir.glob(f"**/{artifact_name}")) + rdma_entries = [] + for artifact in rdma_files: + try: + payload = json.loads(artifact.read_text(encoding="utf-8")) + except json.JSONDecodeError: + payload = {"status": "invalid_json"} + rdma_entries.append( + { + "artifact": str(artifact), + "status": payload.get("status", "unknown"), + "recommended_env": payload.get("recommended_env", {}), + } + ) + if rdma_entries: + results["cluster_features"]["rdma"] = { + "artifact_name": artifact_name, + "entries": rdma_entries, + } + self._generate_results_summary(results, results_dir) except Exception as e: diff --git a/src/madengine/deployment/rdma_recommender.py b/src/madengine/deployment/rdma_recommender.py new file mode 100644 index 00000000..d1e52e1a --- /dev/null +++ b/src/madengine/deployment/rdma_recommender.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +""" +Native RDMA environment recommender for cluster deployments. + +Produces a stable JSON contract plus optional KEY=VALUE env file that can be +consumed by SLURM/Kubernetes runtime scripts. 
+""" + +from __future__ import annotations + +import argparse +import glob +import json +import os +import re +import subprocess +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + + +def _run(cmd: List[str], timeout_sec: int = 8) -> str: + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout_sec, + check=False, + ) + except (OSError, subprocess.SubprocessError): + return "" + if result.returncode != 0: + return "" + return result.stdout.strip() + + +def _pci_from_device(device_path: str) -> str: + link = os.path.join(device_path, "device") + if os.path.islink(link): + try: + return os.path.basename(os.readlink(link)) + except OSError: + return "UNKNOWN_PCI" + return "UNKNOWN_PCI" + + +def _netdev_for_pci(target_pci: str) -> str: + for netdev in glob.glob("/sys/class/net/*"): + link = os.path.join(netdev, "device") + if not os.path.islink(link): + continue + try: + if os.path.basename(os.readlink(link)) == target_pci: + return os.path.basename(netdev) + except OSError: + continue + return "NO_NETDEV" + + +def _vendor_from_pci(pci_addr: str) -> str: + if not pci_addr or pci_addr == "UNKNOWN_PCI": + return "UNKNOWN" + out = _run(["lspci", "-s", pci_addr, "-nn"]).lower() + if "pensando" in out: + return "AINIC" + if "broadcom" in out: + return "BNXT" + if "mellanox" in out or "nvidia" in out: + return "MLNX" + return "UNKNOWN" + + +def _ibv_devinfo(device: str) -> str: + return _run(["ibv_devinfo", "-d", device, "-v"], timeout_sec=10) + + +def _firmware_from_ibv(output: str) -> str: + for line in output.splitlines(): + line = line.strip() + if line.startswith("fw_ver:"): + return line.split("fw_ver:", 1)[1].strip() + return "UNKNOWN" + + +def _gid_from_ibv(output: str) -> Tuple[str, str]: + for line in output.splitlines(): + if "::ffff:" not in line or "GID[" not in line: + continue + idx = re.search(r"GID\[\s*(\d+)\]", line) + ip = 
re.search(r"(::ffff:[0-9.]+)", line) + if idx and ip: + return idx.group(1), ip.group(1) + return "-", "N/A" + + +def _socket_ifname() -> str: + route = _run(["bash", "-lc", "ip route show default | awk '{print $5}'"]) + if not route: + return "eth0" + ifnames = list(dict.fromkeys(route.splitlines())) + return ifnames[0] if ifnames else "eth0" + + +def discover_rdma_devices() -> List[Dict[str, str]]: + devices: List[Dict[str, str]] = [] + for path in sorted(glob.glob("/sys/class/infiniband/*")): + rdma_name = os.path.basename(path) + pci = _pci_from_device(path) + netdev = _netdev_for_pci(pci) + ibv = _ibv_devinfo(rdma_name) + gid_index, gid_value = _gid_from_ibv(ibv) + devices.append( + { + "rdma": rdma_name, + "pci": pci, + "netdev": netdev, + "firmware": _firmware_from_ibv(ibv), + "gid_index": gid_index, + "gid_value": gid_value, + "vendor": _vendor_from_pci(pci), + } + ) + return devices + + +def recommend_env_vars(devices: List[Dict[str, str]]) -> Dict[str, str]: + if not devices: + return {"NCCL_IB_DISABLE": "1"} + + env: Dict[str, str] = { + "NCCL_IB_DISABLE": "0", + "NCCL_IGNORE_CPU_AFFINITY": "1", + "NCCL_SOCKET_IFNAME": _socket_ifname(), + "GLOO_SOCKET_IFNAME": _socket_ifname(), + } + + gid_indexes = [d["gid_index"] for d in devices if str(d["gid_index"]).isdigit()] + if gid_indexes: + env["NCCL_IB_GID_INDEX"] = str(max(int(g) for g in gid_indexes)) + + prioritized = sorted( + devices, + key=lambda d: ( + d.get("firmware", ""), + 1 if str(d.get("gid_index", "")).isdigit() else 0, + ), + reverse=True, + ) + hca = [d["rdma"] for d in prioritized if d.get("rdma")] + if hca: + env["NCCL_IB_HCA"] = ",".join(hca) + return env + + +def build_recommendation() -> Dict[str, Any]: + devices = discover_rdma_devices() + status = "ok" if devices else "no_rdma" + confidence = "high" if devices else "low" + env = recommend_env_vars(devices) + return { + "schema_version": "1.0", + "timestamp_utc": datetime.now(timezone.utc).isoformat(), + "status": status, + "confidence": 
confidence,
+        "errors": [],
+        "devices": devices,
+        "recommended_env": env,
+    }
+
+
+def write_artifact(payload: Dict[str, Any], output_path: Optional[str]) -> None:
+    if not output_path:
+        return
+    out = Path(output_path)
+    out.parent.mkdir(parents=True, exist_ok=True)
+    out.write_text(json.dumps(payload, indent=2), encoding="utf-8")
+
+
+def write_env_file(env: Dict[str, str], env_file: Optional[str]) -> None:
+    if not env_file:
+        return
+    out = Path(env_file)
+    out.parent.mkdir(parents=True, exist_ok=True)
+    lines = [f"{k}={v}" for k, v in env.items()]
+    out.write_text("\n".join(lines) + "\n", encoding="utf-8")
+
+
+def run_cli() -> int:
+    parser = argparse.ArgumentParser(description="Generate RDMA env recommendations.")
+    parser.add_argument("--output", default="", help="JSON artifact path")
+    parser.add_argument("--env-file", default="", help="KEY=VALUE env output path")
+    parser.add_argument(
+        "--strict",
+        action="store_true",
+        help="Exit non-zero when no valid RDMA setup is detected",
+    )
+    args = parser.parse_args()
+
+    payload = build_recommendation()
+    write_artifact(payload, args.output or None)
+    write_env_file(payload.get("recommended_env", {}), args.env_file or None)
+
+    if args.strict and payload.get("status") != "ok":
+        return 2
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(run_cli())
diff --git a/src/madengine/deployment/slurm.py b/src/madengine/deployment/slurm.py
index a45f83d3..8ea9980e 100644
--- a/src/madengine/deployment/slurm.py
+++ b/src/madengine/deployment/slurm.py
@@ -12,6 +12,8 @@
 """
 
 import os
+import json
+import shutil
 import subprocess
 from pathlib import Path
 from typing import Any, Dict, List, Optional
@@ -48,6 +49,8 @@ class SlurmDeployment(BaseDeployment):
 
     DEPLOYMENT_TYPE = "slurm"
     REQUIRED_TOOLS = ["sbatch", "squeue", "scontrol"]  # Must be available locally
+    GCM_ALLOWED_HEALTH_CHECKS = {"check-hca", "check-ibstat"}
+    GCM_ALLOWED_COLLECTORS = {"slurm_job_monitor"}
 
     def __init__(self, config: DeploymentConfig):
         """
@@ -62,6
+65,9 @@ def __init__(self, config: DeploymentConfig): # Parse SLURM configuration (now with defaults applied) self.slurm_config = config.additional_context.get("slurm", {}) self.distributed_config = config.additional_context.get("distributed", {}) + self.cluster_config = config.additional_context.get("cluster", {}) + self.rdma_config = self.cluster_config.get("rdma", {}) + self.gcm_config = self.cluster_config.get("gcm", {}) # SLURM parameters self.partition = self.slurm_config.get("partition", "gpu") @@ -77,6 +83,112 @@ def __init__(self, config: DeploymentConfig): # Generated script path self.script_path = None + def _gcm_enabled(self) -> bool: + return bool(self.gcm_config.get("enabled", False)) + + def _gcm_artifact_layout(self) -> Dict[str, str]: + source = self.gcm_config.get("source", {}) + artifacts = self.gcm_config.get("artifacts", {}) + files = artifacts.get("files", {}) + return { + "repo": source.get("repo", "https://github.com/coketaste/gcm"), + "ref": source.get("ref", "9fed02cd0721d3937f8749672951185f31955bd4"), + "dir": artifacts.get("dir", str(self.output_dir / "cluster_artifacts")), + "health_summary_json": files.get( + "health_summary_json", "gcm_health_summary.json" + ), + "health_raw_log": files.get("health_raw_log", "gcm_health_raw.log"), + "collector_output": files.get("collector_output", "gcm_collector_output.log"), + "provenance_json": files.get( + "provenance_json", "gcm_source_provenance.json" + ), + } + + def _validate_gcm_allowlist(self) -> None: + checks = self.gcm_config.get("health_checks", ["check-hca", "check-ibstat"]) + invalid_checks = [c for c in checks if c not in self.GCM_ALLOWED_HEALTH_CHECKS] + if invalid_checks: + raise ValueError( + f"Invalid cluster.gcm.health_checks value(s): {invalid_checks}. 
" + f"Allowed: {sorted(self.GCM_ALLOWED_HEALTH_CHECKS)}" + ) + + collector_cfg = self.gcm_config.get("collector", {}) + if not collector_cfg.get("enabled", False): + return + collector_cmd = collector_cfg.get("command", "slurm_job_monitor") + if collector_cmd not in self.GCM_ALLOWED_COLLECTORS: + raise ValueError( + f"Invalid cluster.gcm.collector.command: {collector_cmd}. " + f"Allowed: {sorted(self.GCM_ALLOWED_COLLECTORS)}" + ) + + def _resolve_gcm_provenance(self) -> Dict[str, str]: + code = ( + "import importlib.metadata as m, json, pathlib; " + "d=m.distribution('gcm'); " + "u=d.read_text('direct_url.json') or '{}'; " + "print(json.dumps({'version': d.version, 'direct_url': json.loads(u)}))" + ) + result = subprocess.run( + ["python3", "-c", code], + capture_output=True, + text=True, + timeout=15, + check=False, + ) + if result.returncode != 0 or not result.stdout.strip(): + return {} + try: + parsed = json.loads(result.stdout) + except json.JSONDecodeError: + return {} + direct_url = parsed.get("direct_url", {}) + vcs_info = direct_url.get("vcs_info", {}) + return { + "version": str(parsed.get("version", "")), + "url": str(direct_url.get("url", "")), + "commit_id": str(vcs_info.get("commit_id", "")), + } + + def _ensure_gcm_installation(self) -> Dict[str, Any]: + layout = self._gcm_artifact_layout() + expected_repo = layout["repo"] + expected_ref = layout["ref"] + strict = bool(self.gcm_config.get("strict", False)) + + existing = self._resolve_gcm_provenance() + has_expected = ( + existing.get("url", "").startswith(expected_repo) + and existing.get("commit_id", "").startswith(expected_ref) + ) + + install_log = {"installed": False, "provenance": existing} + if has_expected: + return install_log + + install_target = f"git+{expected_repo}.git@{expected_ref}" + pip_cmd = ["python3", "-m", "pip", "install", install_target] + result = subprocess.run( + pip_cmd, capture_output=True, text=True, timeout=300, check=False + ) + install_log["installed"] = 
result.returncode == 0 + install_log["install_stdout"] = result.stdout[-4000:] + install_log["install_stderr"] = result.stderr[-4000:] + + refreshed = self._resolve_gcm_provenance() + install_log["provenance"] = refreshed + now_expected = ( + refreshed.get("url", "").startswith(expected_repo) + and refreshed.get("commit_id", "").startswith(expected_ref) + ) + if strict and not now_expected: + raise RuntimeError( + "GCM strict mode: installed package provenance does not match " + f"{expected_repo}@{expected_ref}" + ) + return install_log + def validate(self) -> bool: """Validate SLURM commands are available locally.""" # Check required SLURM CLI tools @@ -316,6 +428,14 @@ def debug(self, msg): "nproc_per_node": nproc_per_node, # Profiling tools (processed for multi-node compatibility) "tools": tools, + # Cluster feature flags + "rdma_enabled": self.rdma_config.get("enabled", False), + "rdma_strict": self.rdma_config.get("strict", False), + "rdma_mode": self.rdma_config.get("mode", "recommend"), + "rdma_apply_env": self.rdma_config.get("apply_env", True), + "rdma_artifact_name": self.rdma_config.get( + "artifact_name", "rdma_recommendation.json" + ), } def _generate_launcher_command( @@ -707,6 +827,117 @@ def _generate_basic_env_command( export MASTER_PORT={master_port} # Model script should handle launcher invocation''' + def _run_gcm_health_preflight(self) -> None: + if not self._gcm_enabled(): + return + enabled_platforms = self.gcm_config.get("enabled_platforms", ["slurm"]) + if "slurm" not in enabled_platforms: + self.console.print( + "[dim]GCM configured but slurm not enabled in enabled_platforms; skipping.[/dim]" + ) + return + + strict = bool(self.gcm_config.get("strict", False)) + layout = self._gcm_artifact_layout() + artifact_dir = Path(layout["dir"]) + artifact_dir.mkdir(parents=True, exist_ok=True) + summary_path = artifact_dir / layout["health_summary_json"] + raw_log_path = artifact_dir / layout["health_raw_log"] + provenance_path = artifact_dir / 
layout["provenance_json"] + + self._validate_gcm_allowlist() + install_log = self._ensure_gcm_installation() + provenance_path.write_text( + json.dumps( + { + "expected_repo": layout["repo"], + "expected_ref": layout["ref"], + "install": install_log, + }, + indent=2, + ), + encoding="utf-8", + ) + + gcm_bin = shutil.which("gcm") + health_bin = shutil.which("health_checks") + missing = [] + if not gcm_bin: + missing.append("gcm") + if not health_bin: + missing.append("health_checks") + if missing: + msg = f"Missing GCM binaries: {missing}" + if strict: + raise RuntimeError(msg) + self.console.print(f"[yellow]{msg}; continuing (strict=false).[/yellow]") + summary_path.write_text( + json.dumps({"status": "skipped", "reason": msg}, indent=2), + encoding="utf-8", + ) + return + + checks = self.gcm_config.get("health_checks", ["check-hca", "check-ibstat"]) + per_check: List[Dict[str, Any]] = [] + raw_chunks: List[str] = [] + timeout_sec = int(self.gcm_config.get("timeout_sec", 60)) + for check in checks: + cmd = [ + "srun", + "--nodes", + str(self.nodes), + "--ntasks", + str(self.nodes), + "--ntasks-per-node", + "1", + "health_checks", + check, + ] + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout_sec, + check=False, + ) + returncode = result.returncode + stdout = result.stdout + stderr = result.stderr + except subprocess.TimeoutExpired as exc: + returncode = 124 + stdout = exc.stdout or "" + stderr = (exc.stderr or "") + "\nhealth check timeout" + per_check.append( + { + "check": check, + "returncode": returncode, + "passed": returncode == 0, + } + ) + raw_chunks.append(f"## {check}\n{stdout}\n{stderr}\n") + + raw_log_path.write_text("\n".join(raw_chunks), encoding="utf-8") + failed = [c for c in per_check if not c["passed"]] + summary = { + "status": "failed" if failed else "passed", + "strict": strict, + "checks": per_check, + "artifact_raw_log": str(raw_log_path), + "artifact_provenance": str(provenance_path), + } + 
summary_path.write_text(json.dumps(summary, indent=2), encoding="utf-8") + + if failed and strict: + raise RuntimeError( + "GCM strict preflight failed: " + + ", ".join(c["check"] for c in failed) + ) + if failed: + self.console.print( + "[yellow]GCM preflight reported failures; continuing (strict=false).[/yellow]" + ) + def deploy(self) -> DeploymentResult: """Submit sbatch script to SLURM scheduler (locally).""" if not self.script_path or not self.script_path.exists(): @@ -782,6 +1013,20 @@ def deploy(self) -> DeploymentResult: SlurmNodeSelector.cancel_health_check_jobs(health_check_job_name, self.console) # ==================== END PREFLIGHT ==================== + try: + self._run_gcm_health_preflight() + except Exception as e: + strict = bool(self.gcm_config.get("strict", False)) + if strict: + return DeploymentResult( + status=DeploymentStatus.FAILED, + deployment_id="", + message=f"GCM preflight failed: {e}", + ) + self.console.print( + f"[yellow]โš  GCM preflight warning: {e} (strict=false, continuing)[/yellow]" + ) + try: # Submit job to SLURM (runs locally on login node) result = subprocess.run( @@ -1166,7 +1411,21 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: "successful_runs": [], "failed_runs": [], "session_start_row": session_start_row, + "cluster_features": {}, } + if self._gcm_enabled(): + layout = self._gcm_artifact_layout() + health_summary = Path(layout["dir"]) / layout["health_summary_json"] + if health_summary.exists(): + try: + results["cluster_features"]["gcm_health"] = json.loads( + health_summary.read_text(encoding="utf-8") + ) + except json.JSONDecodeError: + results["cluster_features"]["gcm_health"] = { + "status": "error", + "error": "Invalid health summary JSON", + } model_keys = list(self.manifest.get("built_models") or {}) model_key = model_keys[0] if model_keys else None @@ -1189,6 +1448,34 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: job_dir = self.output_dir / model_name_for_path / 
deployment_id job_dir.mkdir(parents=True, exist_ok=True) + rdma_cfg = self.cluster_config.get("rdma", {}) + if rdma_cfg.get("enabled", False): + rdma_artifact_name = rdma_cfg.get( + "artifact_name", "rdma_recommendation.json" + ) + rdma_entries: List[Dict[str, Any]] = [] + for node_dir in sorted(job_dir.glob("node_*")): + artifact = node_dir / rdma_artifact_name + if not artifact.exists(): + continue + try: + payload = json.loads(artifact.read_text(encoding="utf-8")) + except json.JSONDecodeError: + payload = {"status": "invalid_json"} + rdma_entries.append( + { + "node": node_dir.name, + "artifact": str(artifact), + "status": payload.get("status", "unknown"), + "recommended_env": payload.get("recommended_env", {}), + } + ) + if rdma_entries: + results["cluster_features"]["rdma"] = { + "nodes": rdma_entries, + "artifact_name": rdma_artifact_name, + } + # Gather log content per node: from job_dir/node_N/ (new) or flat output_dir .out files per_node_log_contents: List[tuple] = [] flat_out_files = sorted(self.output_dir.glob(f"madengine-*_{deployment_id}_*.out")) @@ -1340,7 +1627,7 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: self.console.print( f"[green]โœ“ Updated perf.csv, perf_super.* from multiple_results (Docker-compatible)[/green]" ) - return results + return self._attach_gcm_collector_summary(results, deployment_id) # multiple_results set but CSV not found: fall through to single-result path (may write FAILURE) if self.nodes > 1 and per_node_metrics: @@ -1404,7 +1691,7 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: f"{len(results['logs'])} log files[/green]" ) self._collect_results_parse_perf_csv(results, session_start_row) - return results + return self._attach_gcm_collector_summary(results, deployment_id) else: # Multi-node but no metrics parsed - optional failure record if per_node_metrics and model_info_for_entry: @@ -1490,7 +1777,7 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: f"[green]โœ“ 
Collected results: {len(results['perf_files'])} perf files, " f"{len(results['logs'])} log files[/green]" ) - return results + return self._attach_gcm_collector_summary(results, deployment_id) def _collect_results_parse_perf_csv( self, results: Dict[str, Any], session_start_row: Optional[int] @@ -1527,6 +1814,95 @@ def _collect_results_parse_perf_csv( except Exception as e: self.console.print(f"[yellow]โš  Could not parse perf.csv: {e}[/yellow]") + def _run_gcm_collector_snapshot(self, deployment_id: str) -> Dict[str, Any]: + if not self._gcm_enabled(): + return {"status": "disabled"} + + collector_cfg = self.gcm_config.get("collector", {}) + if not collector_cfg.get("enabled", False): + return {"status": "disabled"} + + self._validate_gcm_allowlist() + layout = self._gcm_artifact_layout() + artifact_dir = Path(layout["dir"]) / deployment_id + artifact_dir.mkdir(parents=True, exist_ok=True) + output_path = artifact_dir / layout["collector_output"] + + collector_cmd = collector_cfg.get("command", "slurm_job_monitor") + timeout_sec = int(collector_cfg.get("timeout_sec", 120)) + max_retries = int(collector_cfg.get("max_retries", 1)) + best_effort = bool(collector_cfg.get("best_effort", True)) + sink = collector_cfg.get("sink", "file") + once_enabled = bool(collector_cfg.get("once", True)) + + if not shutil.which("gcm"): + msg = "gcm binary not found; collector skipped" + if best_effort: + output_path.write_text(msg + "\n", encoding="utf-8") + return {"status": "skipped", "reason": msg, "output": str(output_path)} + raise RuntimeError(msg) + + args = ["gcm", collector_cmd] + if once_enabled: + args.append("--once") + if sink in {"stdout", "file"}: + args.extend(["--sink", sink]) + + attempts = 0 + last_error = "" + while attempts < max_retries: + attempts += 1 + try: + result = subprocess.run( + args, + capture_output=True, + text=True, + timeout=timeout_sec, + check=False, + ) + returncode = result.returncode + stdout = result.stdout + stderr = result.stderr + except 
subprocess.TimeoutExpired as exc: + returncode = 124 + stdout = exc.stdout or "" + stderr = (exc.stderr or "") + "\ncollector timeout" + output_path.write_text( + f"# attempt={attempts}\n{stdout}\n{stderr}\n", + encoding="utf-8", + ) + if returncode == 0: + return { + "status": "success", + "attempts": attempts, + "output": str(output_path), + "command": " ".join(args), + } + last_error = f"returncode={returncode}" + + if best_effort: + return { + "status": "failed_best_effort", + "attempts": attempts, + "output": str(output_path), + "error": last_error, + } + raise RuntimeError(f"GCM collector failed after {attempts} attempt(s): {last_error}") + + def _attach_gcm_collector_summary( + self, results: Dict[str, Any], deployment_id: str + ) -> Dict[str, Any]: + results.setdefault("cluster_features", {}) + try: + collector_summary = self._run_gcm_collector_snapshot(deployment_id) + results["cluster_features"]["gcm_collector"] = collector_summary + except Exception as e: + results["cluster_features"]["gcm_collector"] = { + "status": "error", + "error": str(e), + } + return results + def cleanup(self, deployment_id: str) -> bool: """Cancel SLURM job if still running (locally).""" try: diff --git a/src/madengine/deployment/templates/kubernetes/job.yaml.j2 b/src/madengine/deployment/templates/kubernetes/job.yaml.j2 index 320d049f..1b6f01e0 100644 --- a/src/madengine/deployment/templates/kubernetes/job.yaml.j2 +++ b/src/madengine/deployment/templates/kubernetes/job.yaml.j2 @@ -180,6 +180,50 @@ spec: export MAD_K8S_NAMESPACE={{ namespace }} export MAD_K8S_JOB=true export MAD_DEPLOYMENT_TYPE=kubernetes + + {% if rdma_enabled %} + echo "" + echo "=== Running RDMA recommender ===" + RDMA_ARTIFACT="/workspace/{{ rdma_artifact_name }}" + RDMA_ENV_FILE="/workspace/rdma_env.list" + if [ -f /workspace/scripts/common/tools/rdma_recommender.py ]; then + if [ "{{ rdma_strict }}" = "True" ]; then + python3 /workspace/scripts/common/tools/rdma_recommender.py \ + --output 
"${RDMA_ARTIFACT}" \ + --env-file "${RDMA_ENV_FILE}" \ + --strict + else + python3 /workspace/scripts/common/tools/rdma_recommender.py \ + --output "${RDMA_ARTIFACT}" \ + --env-file "${RDMA_ENV_FILE}" + fi + RDMA_EXIT=$? + if [ ${RDMA_EXIT} -ne 0 ]; then + echo "RDMA recommender failed with exit code ${RDMA_EXIT}" + {% if rdma_strict %} + echo "RDMA strict mode enabled; failing pod." + exit ${RDMA_EXIT} + {% else %} + echo "Continuing (strict=false)." + {% endif %} + fi + if [ "{{ rdma_apply_env }}" = "True" ] && [ -f "${RDMA_ENV_FILE}" ]; then + while IFS='=' read -r key value; do + [ -z "${key}" ] && continue + if [ "{{ rdma_mode }}" = "enforce" ] || [ -z "${!key+x}" ]; then + export "${key}=${value}" + fi + done < "${RDMA_ENV_FILE}" + echo "Applied RDMA env vars (mode={{ rdma_mode }})." + fi + else + echo "RDMA recommender script not found in workspace." + {% if rdma_strict %} + echo "RDMA strict mode enabled; failing pod." + exit 2 + {% endif %} + fi + {% endif %} {% if launcher_type == "torchrun" or launcher_type == "deepspeed" or launcher_type == "megatron" or launcher_type == "primus" or launcher_type == "torchtitan" %} # {{ launcher_type }} distributed environment (auto-configured from K8s) @@ -340,6 +384,12 @@ spec: cp perf.csv /results/${HOSTNAME}/perf.csv echo "โœ“ Copied perf.csv" fi + {% if rdma_enabled %} + if [ -f "/workspace/{{ rdma_artifact_name }}" ]; then + cp "/workspace/{{ rdma_artifact_name }}" /results/${HOSTNAME}/ + echo "โœ“ Copied {{ rdma_artifact_name }}" + fi + {% endif %} {% if multiple_results %} # Copy multiple_results CSV (same contract as Docker: file may be in CWD or parent; use absolute paths for reliability) if [ -f "/workspace/{{ multiple_results }}" ]; then @@ -529,6 +579,12 @@ spec: cp perf.csv /results/${HOSTNAME}/perf.csv echo "โœ“ Copied perf.csv" fi + {% if rdma_enabled %} + if [ -f "/workspace/{{ rdma_artifact_name }}" ]; then + cp "/workspace/{{ rdma_artifact_name }}" /results/${HOSTNAME}/ + echo "โœ“ Copied {{ 
rdma_artifact_name }}" + fi + {% endif %} {% if multiple_results %} # Copy multiple_results CSV (same contract as Docker: file may be in CWD or parent; use absolute paths for reliability) if [ -f "/workspace/{{ multiple_results }}" ]; then diff --git a/src/madengine/deployment/templates/slurm/job.sh.j2 b/src/madengine/deployment/templates/slurm/job.sh.j2 index 5f8e8266..a461f911 100644 --- a/src/madengine/deployment/templates/slurm/job.sh.j2 +++ b/src/madengine/deployment/templates/slurm/job.sh.j2 @@ -305,6 +305,43 @@ echo " CUDA_VISIBLE_DEVICES: ${CUDA_VISIBLE_DEVICES:-not set}" echo " ROCR_VISIBLE_DEVICES: ${ROCR_VISIBLE_DEVICES:-not set}" echo " Node: ${SLURM_NODEID}/${SLURM_NNODES} (Rank ${SLURM_PROCID}/${SLURM_NTASKS})" +{% if rdma_enabled and nodes == 1 %} +# Native RDMA recommender (submission node / single-node path) +echo "" +echo "Running RDMA recommender..." +RDMA_ARTIFACT="${WORKSPACE}/{{ rdma_artifact_name }}" +RDMA_ENV_FILE="${WORKSPACE}/rdma_env.list" +if [ "{{ rdma_strict }}" = "True" ]; then + python3 -m madengine.deployment.rdma_recommender \ + --output "${RDMA_ARTIFACT}" \ + --env-file "${RDMA_ENV_FILE}" \ + --strict +else + python3 -m madengine.deployment.rdma_recommender \ + --output "${RDMA_ARTIFACT}" \ + --env-file "${RDMA_ENV_FILE}" +fi +RDMA_EXIT=$? +if [ ${RDMA_EXIT} -ne 0 ]; then + echo "RDMA recommender failed with exit code ${RDMA_EXIT}" + {% if rdma_strict %} + echo "RDMA strict mode enabled; stopping job." + exit ${RDMA_EXIT} + {% else %} + echo "Continuing (strict=false)." + {% endif %} +fi +if [ "{{ rdma_apply_env }}" = "True" ] && [ -f "${RDMA_ENV_FILE}" ]; then + while IFS='=' read -r key value; do + [ -z "${key}" ] && continue + if [ "{{ rdma_mode }}" = "enforce" ] || [ -z "${!key+x}" ]; then + export "${key}=${value}" + fi + done < "${RDMA_ENV_FILE}" + echo "Applied RDMA environment variables (mode={{ rdma_mode }})." 
+fi +{% endif %} + # Set deployment environment flags export MAD_IN_SLURM_JOB=1 export MAD_DEPLOYMENT_TYPE=slurm @@ -580,6 +617,41 @@ echo " Manifest: $EXEC_MANIFEST" echo " Command: $MAD_CLI_COMMAND" echo "" +{% if rdma_enabled %} +echo "Running RDMA recommender (node ${SLURM_PROCID})..." +RDMA_ARTIFACT="${WORKSPACE}/{{ rdma_artifact_name }}" +RDMA_ENV_FILE="${WORKSPACE}/rdma_env.list" +if [ "{{ rdma_strict }}" = "True" ]; then + python3 -m madengine.deployment.rdma_recommender \ + --output "${RDMA_ARTIFACT}" \ + --env-file "${RDMA_ENV_FILE}" \ + --strict +else + python3 -m madengine.deployment.rdma_recommender \ + --output "${RDMA_ARTIFACT}" \ + --env-file "${RDMA_ENV_FILE}" +fi +RDMA_EXIT=$? +if [ ${RDMA_EXIT} -ne 0 ]; then + echo "RDMA recommender failed with exit code ${RDMA_EXIT}" + {% if rdma_strict %} + echo "RDMA strict mode enabled; failing node task." + exit ${RDMA_EXIT} + {% else %} + echo "Continuing (strict=false)." + {% endif %} +fi +if [ "{{ rdma_apply_env }}" = "True" ] && [ -f "${RDMA_ENV_FILE}" ]; then + while IFS='=' read -r key value; do + [ -z "${key}" ] && continue + if [ "{{ rdma_mode }}" = "enforce" ] || [ -z "${!key+x}" ]; then + export "${key}=${value}" + fi + done < "${RDMA_ENV_FILE}" + echo "Applied RDMA environment variables (mode={{ rdma_mode }})." +fi +{% endif %} + # Execute madengine echo "Executing madengine in LOCAL mode..." 
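The RDMA env-application loops above (repeated for the single-node, per-node, and local-mode paths of `job.sh.j2`) all encode the same precedence rule: `recommend` mode injects only variables the user has not already exported, while `enforce` mode lets the recommendation win. A minimal Python sketch of that merge — the function name and example variables are illustrative, not part of the PR:

```python
def merge_rdma_env(current_env, recommended, mode="recommend"):
    """Pick which recommended RDMA variables to export.

    recommend: inject only variables absent from current_env
    enforce:   recommended values override existing ones
    """
    if mode not in ("recommend", "enforce"):
        raise ValueError(f"unknown rdma mode: {mode}")
    return {
        key: value
        for key, value in recommended.items()
        if mode == "enforce" or key not in current_env
    }
```

Under this rule a user-set variable survives `recommend` mode untouched, which is exactly what the `[ -z "${!key+x}" ]` guard in the shell templates checks before exporting.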
@@ -687,6 +759,9 @@ if [ $TASK_EXIT -eq 0 ]; then for f in "$WORKSPACE"/*_env.csv "$WORKSPACE"/*.log "$WORKSPACE"/multi_results*.csv "$WORKSPACE"/results* "$WORKSPACE"/gpu_info_*.csv "$WORKSPACE"/library_trace.csv; do if [ -f "$f" ]; then cp "$f" "$NODE_COLLECTION_DIR/" 2>/dev/null || true; fi done + {% if rdma_enabled %} + if [ -f "$WORKSPACE/{{ rdma_artifact_name }}" ]; then cp "$WORKSPACE/{{ rdma_artifact_name }}" "$NODE_COLLECTION_DIR/" 2>/dev/null || true; fi + {% endif %} {% if multiple_results %} if [ -f "$WORKSPACE/run_directory/{{ multiple_results }}" ]; then cp "$WORKSPACE/run_directory/{{ multiple_results }}" "$NODE_COLLECTION_DIR/" 2>/dev/null || true; fi if [ -f "$WORKSPACE/{{ multiple_results }}" ]; then cp "$WORKSPACE/{{ multiple_results }}" "$NODE_COLLECTION_DIR/" 2>/dev/null || true; fi @@ -703,6 +778,9 @@ else # Copy node logs so collect_results can still inspect them if [ -f "${NODE_LOG_OUT}" ]; then cp "${NODE_LOG_OUT}" "$NODE_COLLECTION_DIR/stdout.out" 2>/dev/null || true; fi if [ -f "${NODE_LOG_ERR}" ]; then cp "${NODE_LOG_ERR}" "$NODE_COLLECTION_DIR/stderr.err" 2>/dev/null || true; fi + {% if rdma_enabled %} + if [ -f "$WORKSPACE/{{ rdma_artifact_name }}" ]; then cp "$WORKSPACE/{{ rdma_artifact_name }}" "$NODE_COLLECTION_DIR/" 2>/dev/null || true; fi + {% endif %} fi exit $TASK_EXIT @@ -734,6 +812,41 @@ echo " MAD_MULTI_NODE_RUNNER: ${MAD_MULTI_NODE_RUNNER}" echo "==========================================" echo "" +{% if rdma_enabled %} +echo "Running RDMA recommender..." +RDMA_ARTIFACT="${WORKSPACE}/{{ rdma_artifact_name }}" +RDMA_ENV_FILE="${WORKSPACE}/rdma_env.list" +if [ "{{ rdma_strict }}" = "True" ]; then + python3 -m madengine.deployment.rdma_recommender \ + --output "${RDMA_ARTIFACT}" \ + --env-file "${RDMA_ENV_FILE}" \ + --strict +else + python3 -m madengine.deployment.rdma_recommender \ + --output "${RDMA_ARTIFACT}" \ + --env-file "${RDMA_ENV_FILE}" +fi +RDMA_EXIT=$? 
+if [ ${RDMA_EXIT} -ne 0 ]; then + echo "RDMA recommender failed with exit code ${RDMA_EXIT}" + {% if rdma_strict %} + echo "RDMA strict mode enabled; stopping job." + exit ${RDMA_EXIT} + {% else %} + echo "Continuing (strict=false)." + {% endif %} +fi +if [ "{{ rdma_apply_env }}" = "True" ] && [ -f "${RDMA_ENV_FILE}" ]; then + while IFS='=' read -r key value; do + [ -z "${key}" ] && continue + if [ "{{ rdma_mode }}" = "enforce" ] || [ -z "${!key+x}" ]; then + export "${key}=${value}" + fi + done < "${RDMA_ENV_FILE}" + echo "Applied RDMA environment variables (mode={{ rdma_mode }})." +fi +{% endif %} + echo "Executing madengine in LOCAL mode (inside SLURM job)" echo " Command: $MAD_CLI_COMMAND" echo "" @@ -754,6 +867,9 @@ if [ $EXIT_CODE -eq 0 ]; then JOB_COLLECTION_DIR="${SUBMISSION_DIR}/{{ output_dir }}/{{ model_name }}/${SLURM_JOB_ID}" NODE_COLLECTION_DIR="${JOB_COLLECTION_DIR}/node_0" mkdir -p "$NODE_COLLECTION_DIR" + {% if rdma_enabled %} + if [ -f "$WORKSPACE/{{ rdma_artifact_name }}" ]; then cp "$WORKSPACE/{{ rdma_artifact_name }}" "$NODE_COLLECTION_DIR/" 2>/dev/null || true; fi + {% endif %} if [ -f "$WORKSPACE/run_directory/{{ multiple_results }}" ]; then cp "$WORKSPACE/run_directory/{{ multiple_results }}" "$NODE_COLLECTION_DIR/" 2>/dev/null || true; fi if [ -f "$WORKSPACE/{{ multiple_results }}" ]; then cp "$WORKSPACE/{{ multiple_results }}" "$NODE_COLLECTION_DIR/" 2>/dev/null || true; fi fi diff --git a/src/madengine/orchestration/build_orchestrator.py b/src/madengine/orchestration/build_orchestrator.py index da06f91f..6a0fda21 100644 --- a/src/madengine/orchestration/build_orchestrator.py +++ b/src/madengine/orchestration/build_orchestrator.py @@ -442,6 +442,7 @@ def _save_deployment_config(self, manifest_file: str): "kubernetes": self.additional_context.get("kubernetes"), "distributed": self.additional_context.get("distributed"), "vllm": self.additional_context.get("vllm"), + "cluster": self.additional_context.get("cluster"), "env_vars": env_vars, 
"debug": self.additional_context.get("debug", False), } diff --git a/src/madengine/orchestration/run_orchestrator.py b/src/madengine/orchestration/run_orchestrator.py index 6725a457..561fa900 100644 --- a/src/madengine/orchestration/run_orchestrator.py +++ b/src/madengine/orchestration/run_orchestrator.py @@ -231,7 +231,7 @@ def execute( self.additional_context = {} # Merge deployment_config into additional_context (for deployment layer to use) - for key in ["slurm", "k8s", "kubernetes", "distributed", "vllm", "env_vars", "debug"]: + for key in ["slurm", "k8s", "kubernetes", "distributed", "vllm", "cluster", "env_vars", "debug"]: if key in deployment_config and key not in self.additional_context: self.additional_context[key] = deployment_config[key] @@ -431,13 +431,28 @@ def _create_manifest_from_local_image( ) # Create manifest structure + deployment_config = self.additional_context.get("deployment_config", {}).copy() + if not deployment_config and self.additional_context: + deployment_config = { + "target": self._infer_deployment_target(self.additional_context), + "slurm": self.additional_context.get("slurm"), + "k8s": self.additional_context.get("k8s"), + "kubernetes": self.additional_context.get("kubernetes"), + "distributed": self.additional_context.get("distributed"), + "vllm": self.additional_context.get("vllm"), + "cluster": self.additional_context.get("cluster"), + "env_vars": self.additional_context.get("env_vars", {}), + "debug": self.additional_context.get("debug", False), + } + deployment_config = {k: v for k, v in deployment_config.items() if v is not None} + manifest = { "built_images": {}, "built_models": {}, "context": build_context.ctx, "local_image_mode": True, "local_image_name": image_name, - "deployment_config": self.additional_context.get("deployment_config", {}), + "deployment_config": deployment_config, } # For each model, create a synthetic entry using the provided image @@ -506,7 +521,7 @@ def _load_and_merge_manifest(self, manifest_file: 
str) -> str: if "deployment_config" in manifest: stored_config = manifest["deployment_config"] # Runtime --additional-context overrides stored config - for key in ["deploy", "slurm", "k8s", "kubernetes", "distributed", "vllm", "env_vars", "debug"]: + for key in ["deploy", "slurm", "k8s", "kubernetes", "distributed", "vllm", "cluster", "env_vars", "debug"]: if key in self.additional_context: stored_config[key] = self.additional_context[key] manifest["deployment_config"] = stored_config
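The key loop in `_load_and_merge_manifest` gives runtime `--additional-context` values precedence over the deployment config stored in the manifest at build time, one known key at a time. A hedged sketch of that merge, with an illustrative helper name:

```python
# Keys mirrored from the orchestrator's merge loop in this PR.
CONFIG_KEYS = (
    "deploy", "slurm", "k8s", "kubernetes",
    "distributed", "vllm", "cluster", "env_vars", "debug",
)

def merge_stored_config(stored, runtime, keys=CONFIG_KEYS):
    """Runtime additional-context entries shadow the stored deployment config.

    Keys outside the allowlist are ignored, so arbitrary runtime context
    cannot leak into the persisted deployment config.
    """
    merged = dict(stored)
    for key in keys:
        if key in runtime:
            merged[key] = runtime[key]
    return merged
```

This is why adding `"cluster"` to the key list in both orchestrators matters: without it, a runtime `cluster` override would be silently dropped when replaying a stored manifest.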
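Similarly, `_run_gcm_collector_snapshot` follows a best-effort contract: a bounded number of attempts, timeouts converted into a recorded failure, and no exception raised unless `best_effort` is disabled. Stripped of the artifact handling, the retry shape looks roughly like this (names are illustrative, not the PR's API):

```python
import subprocess

def run_best_effort(args, timeout_sec=120, max_attempts=1):
    """Run a command up to max_attempts times without ever raising.

    Non-zero exits and timeouts are reported in the returned summary
    instead of failing the caller, matching the collector's contract
    that monitoring must not gate workload success.
    """
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        try:
            result = subprocess.run(
                args, capture_output=True, text=True,
                timeout=timeout_sec, check=False,
            )
            if result.returncode == 0:
                return {"status": "success", "attempts": attempt}
            last_error = f"returncode={result.returncode}"
        except subprocess.TimeoutExpired:
            last_error = "timeout"
    return {"status": "failed_best_effort", "attempts": max_attempts, "error": last_error}
```

Note the diff's `while attempts < max_retries` loop gives the same behavior: with the default `max_retries: 1` the command runs exactly once, so the setting is effectively a maximum attempt count rather than a count of retries after the first try.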