From ed2258e91fb5f05ed6e82b8c37be2cad4a6ae9db Mon Sep 17 00:00:00 2001 From: Mikhail Kuznetsov Date: Thu, 23 Apr 2026 11:26:20 +0000 Subject: [PATCH 1/8] feat(cli,slurm): isolate MODEL_DIR side effects for lightweight CLI invocations - core/constants.py: detect --version/--help via new _is_lightweight_cli_invocation() helper and skip MAD_SETUP_MODEL_DIR side effects; keep --version/--help output clean even when MAD_VERBOSE_CONFIG=true. - slurm job.sh.j2: run preflight 'madengine --version' and 'madengine --help' probes under env -u MODEL_DIR on both single-node and multi-node paths so the probes don't inherit MODEL_DIR and trigger file copies. Made-with: Cursor --- src/madengine/core/constants.py | 18 ++++++++++++++++-- .../deployment/templates/slurm/job.sh.j2 | 18 ++++++++++++++---- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/madengine/core/constants.py b/src/madengine/core/constants.py index 8ca10483..15d1c545 100644 --- a/src/madengine/core/constants.py +++ b/src/madengine/core/constants.py @@ -26,17 +26,28 @@ import os import json import logging +import sys # Utility function for optional verbose logging of configuration def _log_config_info(message: str, force_print: bool = False): """Log configuration information either to logger or print if specified.""" + # Keep --version/--help output clean even if MAD_VERBOSE_CONFIG=true. + if any(arg in {"--version", "-V", "--help", "-h"} for arg in sys.argv[1:]): + logging.debug(message) + return if force_print or os.environ.get("MAD_VERBOSE_CONFIG", "").lower() == "true": print(message) else: logging.debug(message) +def _is_lightweight_cli_invocation() -> bool: + """Return True for metadata/help invocations that should avoid side effects.""" + lightweight_flags = {"--version", "-V", "--help", "-h"} + return any(arg in lightweight_flags for arg in sys.argv[1:]) + + # third-party modules from madengine.core.console import Console @@ -65,9 +76,12 @@ def _setup_model_dir(): _log_config_info(f"Model dir: {MODEL_DIR} copied to current dir: {cwd_abs}") -# Only setup model directory if explicitly requested (when not just importing for constants) +# Only setup model directory if explicitly requested and invocation is not metadata-only. if os.environ.get("MAD_SETUP_MODEL_DIR", "").lower() == "true": - _setup_model_dir() + if _is_lightweight_cli_invocation(): + _log_config_info("Skipping MODEL_DIR setup for lightweight CLI invocation (--version/--help).") + else: + _setup_model_dir() # madengine credentials configuration CRED_FILE = "credential.json" diff --git a/src/madengine/deployment/templates/slurm/job.sh.j2 b/src/madengine/deployment/templates/slurm/job.sh.j2 index 5f8e8266..56115251 100644 --- a/src/madengine/deployment/templates/slurm/job.sh.j2 +++ b/src/madengine/deployment/templates/slurm/job.sh.j2 @@ -213,7 +213,8 @@ fi echo "" echo "Verifying madengine availability..." if command -v madengine >/dev/null 2>&1; then - MAD_CLI_VERSION=$(madengine --version 2>&1 | head -n1 || echo "unknown") + # MODEL_DIR can trigger side effects in madengine startup; unset it for preflight probes only. + MAD_CLI_VERSION=$(env -u MODEL_DIR madengine --version 2>&1 | head -n1 || echo "unknown") MAD_CLI_PATH=$(which madengine 2>/dev/null || echo "unknown") echo " ✓ madengine available" @@ -221,7 +222,7 @@ if command -v madengine >/dev/null 2>&1; then echo " Path: $MAD_CLI_PATH" # Verify it's executable - if madengine --help >/dev/null 2>&1; then + if env -u MODEL_DIR madengine --help >/dev/null 2>&1; then export MAD_CLI_COMMAND="madengine" else echo " ❌ ERROR: madengine found but not functional!" @@ -488,7 +489,11 @@ trap 'ec=$?; echo "[DEBUG] $(date -Iseconds) Node ${SLURM_PROCID} ($(hostname)): echo "Verifying madengine availability..." if command -v madengine >/dev/null 2>&1; then - MAD_CLI_VERSION=$(madengine --version 2>&1 | head -n1 || echo "unknown") + # MODEL_DIR can trigger side effects in madengine startup; isolate it for preflight probes. + set +e + MAD_VERSION_RAW_SANITIZED=$(env -u MODEL_DIR madengine --version 2>&1) + set -e + MAD_CLI_VERSION=$(printf "%s" "$MAD_VERSION_RAW_SANITIZED" | head -n1 || echo "unknown") MAD_CLI_PATH=$(which madengine 2>/dev/null || echo "unknown") echo "✓ madengine available" @@ -496,7 +501,12 @@ if command -v madengine >/dev/null 2>&1; then echo " Path: $MAD_CLI_PATH" # Verify it's executable - if madengine --help >/dev/null 2>&1; then + set +e + MAD_HELP_RAW_SANITIZED=$(env -u MODEL_DIR madengine --help 2>&1) + MAD_HELP_EXIT_SANITIZED=$? + set -e + + if [ "${MAD_HELP_EXIT_SANITIZED}" -eq 0 ]; then echo " ✓ Verified: madengine is functional" MAD_CLI_COMMAND="madengine" else From d703c6e40d31b09ed7c73a9422e3ac744b326c9d Mon Sep 17 00:00:00 2001 From: Mikhail Kuznetsov Date: Thu, 23 Apr 2026 11:26:57 +0000 Subject: [PATCH 2/8] feat(orchestration): restore runtime context fields from manifest on execution When replaying a manifest on a compute node, also hydrate docker_mounts, docker_build_arg, docker_gpus, gpu_vendor, and guest_os from the manifest context if not already present in the runtime context. Runtime-detected values keep priority; manifest values only fill in missing keys. This lets distributed runs reuse host path mounts and build args that were resolved at manifest-generation time but are not re-detected on compute nodes. Made-with: Cursor --- .../orchestration/run_orchestrator.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/madengine/orchestration/run_orchestrator.py b/src/madengine/orchestration/run_orchestrator.py index 6725a457..368bad8a 100644 --- a/src/madengine/orchestration/run_orchestrator.py +++ b/src/madengine/orchestration/run_orchestrator.py @@ -559,6 +559,27 @@ def _execute_local(self, manifest_file: str, timeout: int) -> Dict: # Restore context from manifest if present if "context" in manifest: manifest_context = manifest["context"] + # Restore host-level runtime context fields from manifest. + # Keep runtime-detected values as priority; bring missing keys from manifest + # (especially docker_mounts for host path visibility on compute nodes). + if "docker_mounts" in manifest_context: + if "docker_mounts" not in self.context.ctx: + self.context.ctx["docker_mounts"] = {} + for container_path, host_path in manifest_context["docker_mounts"].items(): + if container_path not in self.context.ctx["docker_mounts"]: + self.context.ctx["docker_mounts"][container_path] = host_path + if "docker_build_arg" in manifest_context: + if "docker_build_arg" not in self.context.ctx: + self.context.ctx["docker_build_arg"] = {} + for key, value in manifest_context["docker_build_arg"].items(): + if key not in self.context.ctx["docker_build_arg"]: + self.context.ctx["docker_build_arg"][key] = value + if "docker_gpus" in manifest_context and "docker_gpus" not in self.context.ctx: + self.context.ctx["docker_gpus"] = manifest_context["docker_gpus"] + if "gpu_vendor" in manifest_context and "gpu_vendor" not in self.context.ctx: + self.context.ctx["gpu_vendor"] = manifest_context["gpu_vendor"] + if "guest_os" in manifest_context and "guest_os" not in self.context.ctx: + self.context.ctx["guest_os"] = manifest_context["guest_os"] if "tools" in manifest_context: self.context.ctx["tools"] = manifest_context["tools"] if "pre_scripts" in manifest_context: From 0d73748ff9daa1510d0bffecb0e23f92b2fa9137 Mon Sep 17 00:00:00 2001 From: Mikhail Kuznetsov Date: Thu, 23 Apr 2026 11:28:41 +0000 Subject: [PATCH 3/8] feat(execution): deduplicate docker mounts and widen SLURM env var pass-through - Add ContainerRunner._extract_additional_mount_targets() to parse -v/--volume tokens out of model_info.additional_docker_run_options, and have get_mount_arg skip context-mounted paths whose container target is already covered, fixing "Duplicate mount point" errors from docker run. - Extend the SLURM shell-to-docker env var pass-through list with disaggregated launcher and benchmarking vars (MODEL_NAME, MODEL_DIR, xP, yD, PD_SYNC_ROOT, PD_RUN_ID, PROXY_TYPE, ROUTER_PORT, BENCHMARK_PORT, SLURM_JOB_ID, OUTPUT_DIR, BARRIER_TIMEOUT_S, PROXY_CLOSE_TIMEOUT_S, REQUIRE_RDMA, KV_UCX_TLS, KV_UCX_SOCKADDR_TLS_PRIORITY) so vLLM/sglang disagg workloads work end-to-end. - Add small ContainerRunner._get_build_args() helper that converts context.docker_build_arg into a docker --build-arg string; used by upcoming local-image build-from-manifest support. Made-with: Cursor --- src/madengine/execution/container_runner.py | 74 ++++++++++++++++++++- 1 file changed, 71 insertions(+), 3 deletions(-) diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index d5f27cf0..d1862acc 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -607,9 +607,62 @@ def get_env_arg(self, run_env: typing.Dict) -> str: print(f"Env arguments: {env_args}") return env_args - def get_mount_arg(self, mount_datapaths: typing.List) -> str: + def _get_build_args(self) -> str: + """Build ``docker build --build-arg`` string from ``docker_build_arg`` context.""" + docker_build_arg = self.context.ctx.get("docker_build_arg", {}) if self.context else {} + if not docker_build_arg: + return "" + build_args = "" + for key, value in docker_build_arg.items(): + build_args += f"--build-arg {key}='{value}' " + return build_args + + def _extract_additional_mount_targets(self, additional_opts: str) -> typing.Set[str]: + """Extract container-side mount targets from free-form docker run options. + + Parses ``-v`` / ``--volume`` tokens from ``additional_docker_run_options`` + and returns the set of container paths already being mounted, so that + :meth:`get_mount_arg` can skip duplicates that would otherwise cause + docker to reject the run ("Duplicate mount point"). + """ + targets: typing.Set[str] = set() + if not additional_opts: + return targets + try: + tokens = shlex.split(additional_opts) + except Exception: + return targets + + i = 0 + while i < len(tokens): + token = tokens[i] + if token in ("-v", "--volume") and i + 1 < len(tokens): + spec = tokens[i + 1] + i += 2 + elif token.startswith("-v") and len(token) > 2: + spec = token[2:] + i += 1 + elif token.startswith("--volume="): + spec = token.split("=", 1)[1] + i += 1 + else: + i += 1 + continue + + # spec format: /host:/container[:mode] + parts = spec.split(":") + if len(parts) >= 2: + targets.add(parts[1]) + return targets + + def get_mount_arg( + self, + mount_datapaths: typing.List, + excluded_container_targets: typing.Optional[typing.Set[str]] = None, + ) -> str: """Get the mount arguments for docker run.""" mount_args = "" + excluded_container_targets = excluded_container_targets or set() # Mount data paths if mount_datapaths: @@ -629,6 +682,10 @@ def get_mount_arg(self, mount_datapaths: typing.List) -> str: # Mount context paths if "docker_mounts" in self.context.ctx: for mount_arg in self.context.ctx["docker_mounts"].keys(): + # Avoid duplicate mount points when additional_docker_run_options + # already mounts the same container target. + if mount_arg in excluded_container_targets: + continue mount_args += ( f"-v {self.context.ctx['docker_mounts'][mount_arg]}:{mount_arg} " ) @@ -902,6 +959,12 @@ def run_container( 'PRIMUS_CONFIG_PATH', 'PRIMUS_CLI_EXTRA', # Rendezvous timeout so all nodes can join after pull 'TORCH_ELASTIC_RDZV_TIMEOUT', + # Workload-level settings commonly provided via deployment_config.env_vars + # (required for disaggregated launchers like vLLM / SGLang disagg) + 'MODEL_NAME', 'MODEL_DIR', 'xP', 'yD', 'PD_SYNC_ROOT', 'PD_RUN_ID', + 'PROXY_TYPE', 'ROUTER_PORT', 'BENCHMARK_PORT', 'SLURM_JOB_ID', + 'OUTPUT_DIR', 'BARRIER_TIMEOUT_S', 'PROXY_CLOSE_TIMEOUT_S', + 'REQUIRE_RDMA', 'KV_UCX_TLS', 'KV_UCX_SOCKADDR_TLS_PRIORITY', # GPU visibility variables for Ray-based launchers (vLLM, SGLang) # CRITICAL: These must be passed to Docker for proper GPU device mapping 'HIP_VISIBLE_DEVICES', 'ROCR_VISIBLE_DEVICES', 'CUDA_VISIBLE_DEVICES' @@ -978,9 +1041,14 @@ def run_container( self.context.ctx["docker_env_vars"]["MIOPEN_USER_DB_PATH"] = os.environ["MIOPEN_USER_DB_PATH"] print(f"ℹ️ Added MIOPEN_USER_DB_PATH to docker_env_vars: {os.environ['MIOPEN_USER_DB_PATH']}") + additional_docker_run_options = model_info.get("additional_docker_run_options", "") + additional_mount_targets = self._extract_additional_mount_targets(additional_docker_run_options) docker_options += self.get_env_arg(run_env) - docker_options += self.get_mount_arg(mount_datapaths) - docker_options += f" {model_info.get('additional_docker_run_options', '')}" + docker_options += self.get_mount_arg( + mount_datapaths, + excluded_container_targets=additional_mount_targets, + ) + docker_options += f" {additional_docker_run_options}" # Generate container name base_container_name = "container_" + re.sub( From b8fb50d0c00d031bc965ad69860759ff5209d8d5 Mon Sep 17 00:00:00 2001 From: Mikhail Kuznetsov Date: Thu, 23 Apr 2026 11:30:13 +0000 Subject: [PATCH 4/8] feat(execution): collect container diagnostics on model script failure - Wrap the model script's 'docker exec' with a try/except RuntimeError and, on failure, probe the failing container for a process table, listening socket snapshot (ss / netstat / lsof), and tails of /run_logs and /myworkspace/ logs via Console.sh before re-raising. Each probe is independently best-effort and never suppresses the original failure. - Extend benign log patterns in the failure-pattern scanner with rocEnvTool timeouts that don't affect run correctness ('RuntimeError: Console script timeout', 'rocEnvTool/console.py', 'rocEnvTool/rocenv_tool.py'). - Stage extra workload-level artifacts (perf_*.csv, perf-*.csv, benchmark_*_CONCURRENCY.log) from , /workdir and /run_logs/$SLURM_JOB_ID into cwd before run_directory cleanup so SLURM per-node result collection finds them for disagg launchers. Made-with: Cursor --- src/madengine/execution/container_runner.py | 83 ++++++++++++++++++++- 1 file changed, 79 insertions(+), 4 deletions(-) diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index d1862acc..a182dbd3 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -1255,10 +1255,61 @@ def run_container( ) # Use the container timeout (default 7200s) for script execution # to prevent indefinite hangs - model_output = model_docker.sh( - f"cd {model_dir} && {script_name} {model_args}", - timeout=timeout, - ) + try: + model_output = model_docker.sh( + f"cd {model_dir} && {script_name} {model_args}", + timeout=timeout, + ) + except RuntimeError as run_err: + # On script failure, collect lightweight diagnostics from the + # running container (process table, listening ports, log tails). + # These are printed via Console.sh so they land in the run log + # alongside the failure. Failures here are non-fatal. + run_err_str = str(run_err) + container_id_match = re.search( + r"docker exec\s+([a-f0-9]+)\s+bash", + run_err_str, + ) + failed_container_id = ( + container_id_match.group(1) + if container_id_match + else None + ) + if failed_container_id: + try: + self.console.sh( + f"docker exec {failed_container_id} bash -lc " + f"\"ps -eo pid,ppid,stat,etime,cmd | sed -n '1,160p'\"", + timeout=20, + ) + except Exception: + pass + try: + self.console.sh( + f"docker exec {failed_container_id} bash -lc " + f"\"(ss -lntp 2>/dev/null || netstat -lntp 2>/dev/null " + f"|| lsof -nP -iTCP -sTCP:LISTEN 2>/dev/null || true) " + f"| sed -n '1,200p'\"", + timeout=20, + ) + except Exception: + pass + try: + self.console.sh( + f"docker exec {failed_container_id} bash -lc " + f"\"for d in /run_logs /run_logs/${{SLURM_JOB_ID:-}} " + f"/myworkspace/{model_dir}; do " + f"if [ -d \\\"$d\\\" ]; then echo ===DIR:$d===; " + f"ls -lah \\\"$d\\\" | sed -n '1,80p'; fi; done; " + f"for f in /run_logs/*.log /run_logs/${{SLURM_JOB_ID:-}}/*.log " + f"/myworkspace/{model_dir}/*.log; do " + f"if [ -f \\\"$f\\\" ]; then echo ===$f===; " + f"tail -n 80 \\\"$f\\\"; fi; done\"", + timeout=30, + ) + except Exception: + pass + raise # When live_output is True, Console.sh() already streamed the output; avoid duplicate print. if not self.live_output: print(model_output) @@ -1412,6 +1463,10 @@ def run_container( "RpcError: Running out of retries to initialize the metrics agent", "Metrics will not be exported", "FutureWarning", + # rocEnvTool pre-script can timeout rocm-smi without affecting run correctness. + "RuntimeError: Console script timeout", + "rocEnvTool/console.py", + "rocEnvTool/rocenv_tool.py", "Opened result file:", "SQLite3 generation ::", "rocpd_op:", @@ -1648,6 +1703,26 @@ def run_container( model_docker.sh( _cp_model_dir_file_to_cwd_cmd(model_dir, "library_trace.csv") ) + # Additionally stage workload-level perf/benchmark artifacts that + # some frameworks (e.g. vLLM/SGLang disagg) write under workdir or + # /run_logs, so SLURM per-node result collection can find them. + _md_q = _bash_quote_path(_md) + _cwd_q = _bash_quote_path(".") + model_docker.sh(f"cp -- {_md_q}/perf_*.csv {_cwd_q} 2>/dev/null || true") + model_docker.sh(f"cp -- {_md_q}/perf-*.csv {_cwd_q} 2>/dev/null || true") + model_docker.sh( + f"cp -- {_md_q}/benchmark_*_CONCURRENCY.log {_cwd_q} 2>/dev/null || true" + ) + model_docker.sh(f"cp -- {_md_q}/workdir/perf_*.csv {_cwd_q} 2>/dev/null || true") + model_docker.sh(f"cp -- {_md_q}/workdir/perf-*.csv {_cwd_q} 2>/dev/null || true") + model_docker.sh( + f"cp -- {_md_q}/workdir/benchmark_*_CONCURRENCY.log {_cwd_q} 2>/dev/null || true" + ) + slurm_job_id = os.environ.get("SLURM_JOB_ID", "*") + model_docker.sh( + f"cp -- /run_logs/{shlex.quote(slurm_job_id)}/benchmark_*_CONCURRENCY.log " + f"{_cwd_q} 2>/dev/null || true" + ) except Exception as e: # Ignore errors if no profiler/trace output files exist pass From e89087d9f2f82d078554060f3ede186ff90ab0b4 Mon Sep 17 00:00:00 2001 From: Mikhail Kuznetsov Date: Thu, 23 Apr 2026 11:32:08 +0000 Subject: [PATCH 5/8] feat(execution): build missing local images from manifest and sync nodes via TCP barrier In distributed (multi-node) runs with --manifest-file, the pre-built local image may not be present on every compute node (e.g. when MAD_CONTAINER_IMAGE was built only on the submission node and no shared image registry is used). Previously, such nodes would fall back to `docker pull`, which fails for images that exist only locally. Add a self-healing path that rebuilds the image from the manifest's Dockerfile on the current node when it is missing, and fall back to pull only if the build attempt itself fails. Introduce a TCP-based rendezvous barrier between NODE_RANK=0 and worker nodes so that all nodes finish image preparation before any container starts, without requiring shared filesystem visibility between nodes. Made-with: Cursor --- src/madengine/execution/container_runner.py | 235 +++++++++++++++++++- 1 file changed, 228 insertions(+), 7 deletions(-) diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index a182dbd3..56f0b502 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -10,6 +10,7 @@ import os import re import shlex +import socket import subprocess import time import json @@ -617,6 +618,205 @@ def _get_build_args(self) -> str: build_args += f"--build-arg {key}='{value}' " return build_args + def _build_local_image_from_manifest( + self, run_image: str, build_info: typing.Dict, model_info: typing.Dict + ) -> None: + """Build ``run_image`` on the current compute node using its manifest dockerfile. + + Used by ``run --manifest-file`` in distributed mode when the local image + is not present on a compute node and pulling is not desired or possible. + """ + dockerfile = build_info.get("dockerfile", "") + if not dockerfile or dockerfile == "N/A (local image mode)": + raise RuntimeError( + f"Cannot build image {run_image}: dockerfile is missing in manifest" + ) + + if not os.path.exists(dockerfile): + raise RuntimeError( + f"Cannot build image {run_image}: dockerfile not found at '{dockerfile}'" + ) + + docker_context = model_info.get("dockercontext", "") or "./docker" + if not os.path.exists(docker_context): + # Fallback to dockerfile directory if the provided context path is unavailable + # on the compute node (e.g. workspace layout differs from submission node). + docker_context = os.path.dirname(dockerfile) or "." + + build_args = self._get_build_args() + build_command = ( + f"docker build --network=host -t {run_image} --pull -f {dockerfile} " + f"{build_args}{docker_context}" + ) + + self.rich_console.print( + f"[yellow]🔨 Building missing local image on this node:[/yellow] {run_image}" + ) + self.rich_console.print(f"[dim] Dockerfile: {dockerfile}[/dim]") + self.rich_console.print(f"[dim] Context: {docker_context}[/dim]") + self.console.sh(build_command, timeout=None) + self.console.sh(f"docker image inspect {run_image} > /dev/null 2>&1") + self.rich_console.print( + f"[green]✅ Built local image on this node:[/green] {run_image}" + ) + + def _sync_after_local_image_ready( + self, run_image: str, timeout_s: int = 1800 + ) -> None: + """Barrier for multi-node local-image runs so all nodes continue together. + + Relies on a TCP rendezvous between ``NODE_RANK=0`` and worker nodes so + that no shared filesystem visibility is required. No-op for single-node + runs (``NNODES<=1``). + """ + nnodes_raw = os.environ.get("NNODES") or os.environ.get("WORLD_SIZE") or "1" + node_rank = os.environ.get("NODE_RANK") or os.environ.get("RANK") or "0" + try: + nnodes = int(nnodes_raw) + except Exception: + nnodes = 1 + if nnodes <= 1: + return + + self._tcp_image_ready_barrier( + nnodes=nnodes, + node_rank=node_rank, + timeout_s=timeout_s, + ) + + def _tcp_image_ready_barrier( + self, nnodes: int, node_rank: str, timeout_s: int + ) -> None: + """TCP rendezvous barrier that does not require shared filesystem visibility. + + Node 0 listens on one of ``candidate_ports`` derived from ``MASTER_PORT`` + and ``SLURM_JOB_ID``; workers send ``READY `` and wait for + ``GO ``. The port range and token defend against multiple + concurrent jobs reusing the same master host. + """ + master_addr = os.environ.get("MASTER_ADDR", "127.0.0.1") + job_id_raw = os.environ.get("SLURM_JOB_ID", "0") + try: + job_id = int(job_id_raw) + except Exception: + job_id = 0 + token = f"JOB{job_id}" + master_port_raw = os.environ.get("MASTER_PORT", "29500") + try: + master_port = int(master_port_raw) + except Exception: + master_port = 29500 + base_port = 43000 + ((master_port + job_id) % 1000) + candidate_ports = [base_port + i for i in range(0, 16)] + deadline = time.time() + timeout_s + rank_int = int(node_rank) + + if rank_int == 0: + accepted = 0 + peers = [] + waiting: typing.Dict[int, socket.socket] = {} + server = None + port = None + try: + bind_errors = [] + for candidate in candidate_ports: + trial = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + trial.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + trial.bind(("0.0.0.0", candidate)) + server = trial + port = candidate + break + except Exception as e: + bind_errors.append({"port": candidate, "error": str(e)}) + try: + trial.close() + except Exception: + pass + if server is None or port is None: + raise RuntimeError( + f"TCP barrier bind failed on all candidate ports: {bind_errors}" + ) + server.listen(max(1, nnodes - 1)) + server.settimeout(2.0) + while accepted < max(0, nnodes - 1) and time.time() < deadline: + try: + conn, addr = server.accept() + conn.settimeout(2.0) + payload = conn.recv(128).decode("utf-8", errors="ignore").strip() + parts = payload.split() + if len(parts) != 3 or parts[0] != "READY" or parts[1] != token: + conn.close() + continue + try: + worker_rank = int(parts[2]) + except Exception: + conn.close() + continue + if worker_rank <= 0 or worker_rank >= nnodes: + conn.close() + continue + if worker_rank in waiting: + try: + waiting[worker_rank].close() + except Exception: + pass + waiting[worker_rank] = conn + peers.append(f"{addr[0]}:r{worker_rank}") + accepted = len(waiting) + except socket.timeout: + continue + if accepted < max(0, nnodes - 1): + raise RuntimeError( + f"TCP barrier timeout on master: accepted={accepted}/" + f"{max(0, nnodes - 1)} port={port}" + ) + for worker_rank, conn in waiting.items(): + try: + conn.sendall(f"GO {token} {worker_rank}\n".encode("utf-8")) + finally: + try: + conn.close() + except Exception: + pass + return + finally: + try: + if server is not None: + server.close() + except Exception: + pass + + last_error = "" + connect_attempts = 0 + while time.time() < deadline: + for candidate in candidate_ports: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + connect_attempts += 1 + try: + sock.settimeout(1.5) + sock.connect((master_addr, candidate)) + sock.sendall(f"READY {token} {node_rank}\n".encode("utf-8")) + remaining_s = max(1.0, deadline - time.time()) + sock.settimeout(remaining_s) + ack = sock.recv(128).decode("utf-8", errors="ignore").strip() + if ack == f"GO {token} {node_rank}": + return + last_error = f"unexpected_ack={ack!r} port={candidate}" + except Exception as e: + last_error = f"{e} port={candidate}" + finally: + try: + sock.close() + except Exception: + pass + time.sleep(1) + + raise RuntimeError( + f"TCP barrier timeout on worker rank={node_rank} master={master_addr} " + f"ports={candidate_ports} attempts={connect_attempts} last_error={last_error}" + ) + def _extract_additional_mount_targets(self, additional_opts: str) -> typing.Set[str]: """Extract container-side mount targets from free-form docker run options. @@ -1889,16 +2089,37 @@ def run_models_from_manifest( # Local image mode (MAD_CONTAINER_IMAGE): Use the provided image directly run_image = build_info.get("docker_image") self.rich_console.print(f"[yellow]🏠 Using local image: {run_image}[/yellow]") - - # Verify image exists + + # Verify image exists; if missing, build from manifest dockerfile on + # the current compute node first, and fall back to pull only if the + # build attempt fails. This makes distributed runs self-sufficient + # when the pre-built image has not been staged to every node. try: self.console.sh(f"docker image inspect {run_image} > /dev/null 2>&1") - except (subprocess.CalledProcessError, RuntimeError) as e: - self.rich_console.print(f"[yellow]⚠️ Image {run_image} not found, attempting to pull...[/yellow]") + except (subprocess.CalledProcessError, RuntimeError): + self.rich_console.print( + f"[yellow]⚠️ Image {run_image} not found on this node.[/yellow]" + ) try: - self.pull_image(run_image) - except Exception as e: - raise RuntimeError(f"Failed to find or pull local image {run_image}: {e}") + self._build_local_image_from_manifest( + run_image=run_image, + build_info=build_info, + model_info=model_info, + ) + except Exception as build_error: + self.rich_console.print( + "[yellow]⚠️ Local build failed, attempting pull as fallback...[/yellow]" + ) + try: + self.pull_image(run_image) + except Exception as pull_error: + raise RuntimeError( + f"Failed to build or pull local image {run_image}: " + f"build_error={build_error}; pull_error={pull_error}" + ) + # Ensure all nodes reach this point before entering container run, + # otherwise workers may start while node 0 is still building / loading. + self._sync_after_local_image_ready(run_image=run_image) elif build_info.get("registry_image"): # Registry image: Pull from registry From afcd7e36fa7bc22a275b4f41c29060460f7dd550 Mon Sep 17 00:00:00 2001 From: Mikhail Kuznetsov Date: Thu, 23 Apr 2026 11:35:21 +0000 Subject: [PATCH 6/8] feat(execution): add shared-tar cache for local images and node-coordinated image preparation Extract the local-image preparation logic into a dedicated helper (_ensure_local_image_available) and add an optional shared tar cache, enabled by the MAD_DOCKER_BUILDS environment variable. * Node 0 builds (or pulls) the image, then `docker save` it into MAD_DOCKER_BUILDS/.tar. * Worker nodes wait on the existing TCP rendezvous barrier and then `docker load` the tar instead of rebuilding or hitting the registry. * Without MAD_DOCKER_BUILDS, each node prepares the image independently (existing behavior). This avoids redundant `docker build` work and unnecessary registry traffic in multi-node runs, while keeping single-node behavior unchanged. Add integration tests covering the primary-saves-tar, existing-tar-is-loaded, and worker-waits-for-primary paths. Made-with: Cursor --- src/madengine/execution/container_runner.py | 184 +++++++++++++++--- tests/integration/test_container_execution.py | 100 ++++++++++ 2 files changed, 257 insertions(+), 27 deletions(-) diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index 56f0b502..962f8080 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -618,6 +618,158 @@ def _get_build_args(self) -> str: build_args += f"--build-arg {key}='{value}' " return build_args + def _get_node_rank(self) -> int: + """Return the current node rank for distributed runs.""" + node_rank_raw = os.environ.get("NODE_RANK") or os.environ.get("RANK") or "0" + try: + return int(node_rank_raw) + except Exception: + return 0 + + def _local_image_exists(self, run_image: str) -> bool: + """Check whether a Docker image already exists locally.""" + try: + self.console.sh( + f"docker image inspect {shlex.quote(run_image)} > /dev/null 2>&1" + ) + return True + except (subprocess.CalledProcessError, RuntimeError): + return False + + def _get_local_image_tar_path(self, run_image: str) -> typing.Optional[str]: + """Resolve the shared tar path for a local image, if configured. + + When ``MAD_DOCKER_BUILDS`` points at a shared directory (e.g. a network + filesystem visible to all nodes), this path is used to stage a + ``docker save`` tar of the pre-built local image so that worker nodes + can ``docker load`` it instead of rebuilding or pulling. + """ + builds_dir = (os.environ.get("MAD_DOCKER_BUILDS") or "").strip() + if not builds_dir: + return None + + safe_image_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", run_image).strip("._") + if not safe_image_name: + safe_image_name = "docker_image" + return os.path.join(builds_dir, f"{safe_image_name}.tar") + + def _load_local_image_from_tar(self, run_image: str, tar_path: str) -> None: + """Load a Docker image from a previously saved tar archive.""" + if not os.path.exists(tar_path): + raise RuntimeError(f"Image tar not found for {run_image}: {tar_path}") + + self.rich_console.print( + f"[yellow]📦 Loading local image tar:[/yellow] {tar_path}" + ) + self.console.sh(f"docker load -i {shlex.quote(tar_path)}", timeout=None) + self.console.sh( + f"docker image inspect {shlex.quote(run_image)} > /dev/null 2>&1" + ) + self.rich_console.print( + f"[green]✅ Loaded local image from tar:[/green] {run_image}" + ) + + def _save_local_image_to_tar(self, run_image: str, tar_path: str) -> None: + """Persist a local Docker image into the shared tar cache.""" + tar_dir = os.path.dirname(tar_path) + if tar_dir: + os.makedirs(tar_dir, exist_ok=True) + + self.rich_console.print( + f"[yellow]💾 Saving local image tar:[/yellow] {tar_path}" + ) + self.console.sh( + f"docker save -o {shlex.quote(tar_path)} {shlex.quote(run_image)}", + timeout=None, + ) + self.rich_console.print( + f"[green]✅ Saved local image tar:[/green] {tar_path}" + ) + + def _build_or_pull_local_image( + self, run_image: str, build_info: typing.Dict, model_info: typing.Dict + ) -> None: + """Ensure the local image exists by building it first and pulling as fallback.""" + self.rich_console.print( + f"[yellow]⚠️ Image {run_image} not found on this node.[/yellow]" + ) + try: + self._build_local_image_from_manifest( + run_image=run_image, + build_info=build_info, + model_info=model_info, + ) + except Exception as build_error: + self.rich_console.print( + "[yellow]⚠️ Local build failed, attempting pull as fallback...[/yellow]" + ) + try: + self.pull_image(run_image) + except Exception as pull_error: + raise RuntimeError( + f"Failed to build or pull local image {run_image}: " + f"build_error={build_error}; pull_error={pull_error}" + ) + + def _ensure_local_image_available( + self, run_image: str, build_info: typing.Dict, model_info: typing.Dict + ) -> None: + """Prepare a local image, optionally using a shared tar cache. + + Behavior by node and cache state: + + * If ``MAD_DOCKER_BUILDS`` is configured and the tar is missing, + only node 0 builds/pulls the image and saves the tar, while + worker nodes wait at :meth:`_sync_after_local_image_ready` + and then load the tar. + * If the tar already exists, any node loads it directly. + * Without a shared tar cache, each node builds/pulls independently. + """ + tar_path = self._get_local_image_tar_path(run_image) + node_rank = self._get_node_rank() + is_primary_node = node_rank == 0 + image_exists = self._local_image_exists(run_image) + tar_exists = bool(tar_path) and os.path.exists(tar_path) + tar_missing_at_start = bool(tar_path) and not tar_exists + + if tar_missing_at_start: + if is_primary_node: + if not image_exists: + self._build_or_pull_local_image( + run_image=run_image, + build_info=build_info, + model_info=model_info, + ) + image_exists = True + if not tar_exists: + self._save_local_image_to_tar(run_image, tar_path) + tar_exists = True + + self._sync_after_local_image_ready(run_image=run_image) + + if not image_exists: + if not tar_exists and not os.path.exists(tar_path): + raise RuntimeError( + f"Node 0 did not produce image tar for {run_image}: {tar_path}" + ) + self._load_local_image_from_tar(run_image, tar_path) + image_exists = True + + elif not image_exists: + if tar_exists: + self._load_local_image_from_tar(run_image, tar_path) + image_exists = True + else: + self._build_or_pull_local_image( + run_image=run_image, + build_info=build_info, + model_info=model_info, + ) + image_exists = True + + if tar_path and image_exists and is_primary_node and not tar_exists: + self._save_local_image_to_tar(run_image, tar_path) + def _build_local_image_from_manifest( self, run_image: str, build_info: typing.Dict, model_info: typing.Dict ) -> None: @@ -2090,33 +2242,11 @@ def run_models_from_manifest( run_image = build_info.get("docker_image") self.rich_console.print(f"[yellow]🏠 Using local image: {run_image}[/yellow]") - # Verify image exists; if missing, build from manifest dockerfile on - # the current compute node first, and fall back to pull only if the - # build attempt fails. This makes distributed runs self-sufficient - # when the pre-built image has not been staged to every node. - try: - self.console.sh(f"docker image inspect {run_image} > /dev/null 2>&1") - except (subprocess.CalledProcessError, RuntimeError): - self.rich_console.print( - f"[yellow]⚠️ Image {run_image} not found on this node.[/yellow]" - ) - try: - self._build_local_image_from_manifest( - run_image=run_image, - build_info=build_info, - model_info=model_info, - ) - except Exception as build_error: - self.rich_console.print( - "[yellow]⚠️ Local build failed, attempting pull as fallback...[/yellow]" - ) - try: - self.pull_image(run_image) - except Exception as pull_error: - raise RuntimeError( - f"Failed to build or pull local image {run_image}: " - f"build_error={build_error}; pull_error={pull_error}" - ) + self._ensure_local_image_available( + run_image=run_image, + build_info=build_info, + model_info=model_info, + ) # Ensure all nodes reach this point before entering container run, # otherwise workers may start while node 0 is still building / loading. self._sync_after_local_image_ready(run_image=run_image) diff --git a/tests/integration/test_container_execution.py b/tests/integration/test_container_execution.py index c11c2755..69cd7973 100644 --- a/tests/integration/test_container_execution.py +++ b/tests/integration/test_container_execution.py @@ -76,6 +76,106 @@ def test_load_build_manifest(self): assert "images" in result assert "model1" in result["images"] + @patch.dict(os.environ, {"MAD_DOCKER_BUILDS": "/shared/builds", "NODE_RANK": "0"}, clear=False) + @patch.object(ContainerRunner, "_sync_after_local_image_ready") + @patch.object(ContainerRunner, "_save_local_image_to_tar") + @patch.object(ContainerRunner, "_build_or_pull_local_image") + @patch.object(ContainerRunner, "_local_image_exists", return_value=True) + @patch("os.path.exists", return_value=False) + def test_ensure_local_image_available_saves_tar_on_primary_node( + self, + mock_exists, + mock_local_image_exists, + mock_build_or_pull, + mock_save_to_tar, + mock_sync, + ): + """Primary node should save a tar when image exists but cache file is missing.""" + runner = ContainerRunner() + + runner._ensure_local_image_available( + run_image="rocm/pyt_mlperf_training:full-tefix", + build_info={}, + model_info={}, + ) + + mock_build_or_pull.assert_not_called() + mock_save_to_tar.assert_called_once_with( + "rocm/pyt_mlperf_training:full-tefix", + "/shared/builds/rocm_pyt_mlperf_training_full-tefix.tar", + ) + assert mock_sync.call_count == 1 + + @patch.dict(os.environ, {"MAD_DOCKER_BUILDS": "/shared/builds", "NODE_RANK": "0"}, clear=False) + @patch.object(ContainerRunner, "_save_local_image_to_tar") + @patch.object(ContainerRunner, "_build_or_pull_local_image") + @patch.object(ContainerRunner, "_load_local_image_from_tar") + @patch.object(ContainerRunner, "_local_image_exists", return_value=False) + @patch("os.path.exists", return_value=True) + def test_ensure_local_image_available_loads_existing_tar( + self, + mock_exists, + mock_local_image_exists, + mock_load_from_tar, + mock_build_or_pull, + mock_save_to_tar, + ): + """Existing tar cache should be loaded instead of rebuilding.""" + runner = ContainerRunner() + + runner._ensure_local_image_available( + run_image="rocm/pyt_mlperf_training:full-tefix", + build_info={}, + model_info={}, + ) + + mock_load_from_tar.assert_called_once_with( + "rocm/pyt_mlperf_training:full-tefix", + "/shared/builds/rocm_pyt_mlperf_training_full-tefix.tar", + ) + mock_build_or_pull.assert_not_called() + mock_save_to_tar.assert_not_called() + + @patch.dict(os.environ, {"MAD_DOCKER_BUILDS": "/shared/builds", "NODE_RANK": "1"}, clear=False) + @patch.object(ContainerRunner, "_save_local_image_to_tar") + @patch.object(ContainerRunner, "_build_or_pull_local_image") + @patch.object(ContainerRunner, "_load_local_image_from_tar") + @patch.object(ContainerRunner, "_sync_after_local_image_ready") + @patch.object(ContainerRunner, "_local_image_exists", return_value=False) + @patch("os.path.exists", return_value=False) + def test_ensure_local_image_available_waits_for_primary_tar_on_worker( + self, + mock_exists, + mock_local_image_exists, + mock_sync, + mock_load_from_tar, + mock_build_or_pull, + mock_save_to_tar, + ): + """Worker nodes should wait for node 0 and then load the shared tar.""" + runner = ContainerRunner() + + def exists_side_effect(path): + if path == "/shared/builds/rocm_pyt_mlperf_training_full-tefix.tar": + return mock_sync.call_count > 0 + return False + + mock_exists.side_effect = exists_side_effect + + runner._ensure_local_image_available( + run_image="rocm/pyt_mlperf_training:full-tefix", + build_info={}, + model_info={}, + ) + + mock_sync.assert_called_once_with(run_image="rocm/pyt_mlperf_training:full-tefix") + mock_load_from_tar.assert_called_once_with( + "rocm/pyt_mlperf_training:full-tefix", + "/shared/builds/rocm_pyt_mlperf_training_full-tefix.tar", + ) + mock_build_or_pull.assert_not_called() + mock_save_to_tar.assert_not_called() + @patch.object(Console, "sh") def test_pull_image(self, mock_sh): """Test pulling image from registry.""" From 02d84b8ff45d5950e24f533bb1229c8dade01fc7 Mon Sep 17 00:00:00 2001 From: Mikhail Kuznetsov Date: Thu, 23 Apr 2026 11:38:03 +0000 Subject: [PATCH 7/8] feat(slurm): pick best multi-node CSV and defer empty perf check in multi-node runs Multi-node SLURM runs stage each node's copy of the workload-level multi-results CSV into the login node's job collection directory, but only some nodes observe the final throughput numbers and therefore populate the "performance" column. Previously, collect_results took the first matching candidate, which could be an empty file even when another node produced a complete one; and container_runner marked the result as failed on the first empty CSV it saw. * collect_results now ranks per-node candidates by the number of non-empty "performance" rows and picks the richest file. Falls back to the previous behavior when no candidate has a performance column. * container_runner no longer nulls out the performance path on an empty CSV in multi-node runs; it emits a warning instead and defers the final decision to the login-node aggregation step. Single-node runs keep the previous hard-error semantics. Made-with: Cursor --- src/madengine/deployment/slurm.py | 85 +++++++++++++++++++-- src/madengine/execution/container_runner.py | 23 +++++- 2 files changed, 98 insertions(+), 10 deletions(-) diff --git a/src/madengine/deployment/slurm.py b/src/madengine/deployment/slurm.py index a45f83d3..524db206 100644 --- a/src/madengine/deployment/slurm.py +++ b/src/madengine/deployment/slurm.py @@ -1270,18 +1270,24 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: model_key, {} ) if model_key else {} - # Multiple results path: resolve CSV from job_dir/node_*, then cwd/run_directory + # Multiple results path: resolve CSV from job_dir/node_*, then cwd/run_directory. + # In multi-node runs, different nodes may produce the CSV with different levels + # of completeness (e.g. only one node observes the final throughput numbers and + # populates the "performance" column). Prefer the candidate with the most + # non-empty "performance" rows so aggregation does not silently pick an empty one. mult_res = model_info_for_entry.get("multiple_results") if mult_res: resolved_csv: Optional[Path] = None + candidates: List[Path] = [] if (job_dir / mult_res).is_file(): - resolved_csv = job_dir / mult_res - else: - for i in range(self.nodes): - candidate = job_dir / f"node_{i}" / mult_res - if candidate.is_file(): - resolved_csv = candidate - break + candidates.append(job_dir / mult_res) + for i in range(self.nodes): + per_node_candidate = job_dir / f"node_{i}" / mult_res + if per_node_candidate.is_file(): + candidates.append(per_node_candidate) + + if candidates: + resolved_csv = self._select_best_multiple_results_csv(candidates) if not resolved_csv and Path(mult_res).is_file(): resolved_csv = Path(mult_res) if not resolved_csv and Path("run_directory", mult_res).is_file(): @@ -1492,6 +1498,69 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: ) return results + def _select_best_multiple_results_csv( + self, candidates: List[Path] + ) -> Optional[Path]: + """Pick the CSV with the most non-empty ``performance`` entries. + + In multi-node SLURM runs, every node copies its local copy of the + workload's multi-results CSV into ``job_dir/node_/``. Only + some nodes will observe the final throughput numbers and therefore + populate the ``performance`` column; others may have the file but + with empty values. Ranking candidates by the number of non-empty + ``performance`` rows lets downstream aggregation use the richest + available data without depending on node-0 winning every race. + + Falls back to the first candidate when none has a ``performance`` + column or when counting fails, preserving previous behavior. + """ + if not candidates: + return None + if len(candidates) == 1: + return candidates[0] + + import csv as _csv + + best_candidate: Optional[Path] = None + best_score = -1 + best_rows = -1 + for candidate in candidates: + non_empty_perf = 0 + total_rows = 0 + has_perf_column = False + try: + with open(candidate, "r", encoding="utf-8", errors="ignore") as f: + reader = _csv.DictReader(f) + fieldnames = reader.fieldnames or [] + stripped_fields = [fn.strip() for fn in fieldnames] + has_perf_column = "performance" in stripped_fields + for row in reader: + total_rows += 1 + if has_perf_column: + value = (row.get("performance") or "").strip() + if value: + non_empty_perf += 1 + except Exception: + continue + + score = non_empty_perf if has_perf_column else 0 + if ( + score > best_score + or (score == best_score and total_rows > best_rows) + ): + best_score = score + best_rows = total_rows + best_candidate = candidate + + if best_candidate is None: + return candidates[0] + if best_score > 0: + self.console.print( + f"[dim] Selected multiple_results CSV with {best_score} " + f"non-empty performance rows: {best_candidate}[/dim]" + ) + return best_candidate + def _collect_results_parse_perf_csv( self, results: Dict[str, Any], session_start_row: Optional[int] ) -> None: diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index 962f8080..05ee198d 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -1731,8 +1731,27 @@ def run_container( break if not has_valid_perf: - run_results["performance"] = None - print("Error: Performance metric is empty in all rows of multiple results file.") + nnodes_env = os.environ.get("NNODES", "1") + try: + nnodes = int(nnodes_env) + except (TypeError, ValueError): + nnodes = 1 + + if nnodes > 1: + # In multi-node runs the performance CSV on this + # node may legitimately lack values (metrics are + # only populated on the rank that parses the final + # output). Keep the path so downstream per-node + # aggregation on the login node can pick the best + # candidate across all nodes. + print( + "Warning: Performance metric is currently empty in " + "multiple results file during multi-node run; " + "deferring final decision to aggregation step." + ) + else: + run_results["performance"] = None + print("Error: Performance metric is empty in all rows of multiple results file.") except Exception as e: self.rich_console.print( f"[yellow]Warning: Could not validate multiple results file: {e}[/yellow]" From e5f0eff6fe365d35b4a270943956db64f5ad967d Mon Sep 17 00:00:00 2001 From: Mikhail Kuznetsov Date: Thu, 23 Apr 2026 12:47:25 +0000 Subject: [PATCH 8/8] fix(orchestration): do not overwrite runtime docker_env_vars with manifest placeholders The manifest-context restore path for docker_env_vars unconditionally overwrote values already populated by Context from os.environ (e.g. MAD_SECRETS_HFTOKEN exported on the submission node). When a previously generated manifest contained an unexpanded "${MAD_SECRETS_HFTOKEN}" literal (because the variable was not exported at manifest-creation time), the restore step clobbered the real token on rerun and the container received the literal placeholder, causing HF 401 on gated models (e.g. meta-llama/Llama-3.1-8B via Primus HuggingFaceTokenizer). Switch to the same "set only if absent" semantics we already use for docker_mounts and docker_build_arg, so os.environ-sourced values keep priority and stale/unexpanded placeholders from old manifests no longer override them. Made-with: Cursor --- src/madengine/orchestration/run_orchestrator.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/madengine/orchestration/run_orchestrator.py b/src/madengine/orchestration/run_orchestrator.py index 368bad8a..beb17953 100644 --- a/src/madengine/orchestration/run_orchestrator.py +++ b/src/madengine/orchestration/run_orchestrator.py @@ -588,12 +588,16 @@ def _execute_local(self, manifest_file: str, timeout: int) -> Dict: self.context.ctx["post_scripts"] = manifest_context["post_scripts"] if "encapsulate_script" in manifest_context: self.context.ctx["encapsulate_script"] = manifest_context["encapsulate_script"] - # Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs) + # Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs). + # Keep runtime-detected values as priority (consistent with docker_mounts / docker_build_arg): + # values already populated by Context (e.g. MAD_SECRETS_* read from os.environ) must not be + # overwritten by manifest entries that may still contain unexpanded "${VAR}" placeholders. if "docker_env_vars" in manifest_context and manifest_context["docker_env_vars"]: if "docker_env_vars" not in self.context.ctx: self.context.ctx["docker_env_vars"] = {} for k, v in manifest_context["docker_env_vars"].items(): - self.context.ctx["docker_env_vars"][k] = v + if k not in self.context.ctx["docker_env_vars"]: + self.context.ctx["docker_env_vars"][k] = v # Merge runtime additional_context (takes precedence over manifest) # This allows users to override tools/scripts at runtime