diff --git a/docker/update_mori.Dockerfile b/docker/update_mori.Dockerfile
new file mode 100644
index 0000000..0cea72e
--- /dev/null
+++ b/docker/update_mori.Dockerfile
@@ -0,0 +1,21 @@
+FROM localhost/mad-mori-ep:gfx950-v1
+
+ARG MORI_COMMIT=241461c0aaf8be2a502397668d4b3e1aab90a188
+
+WORKDIR /app
+
+# Remove old mori completely (including stale C++ extensions that confuse profiler detection)
+RUN pip uninstall -y mori amd-mori amd_mori 2>/dev/null || true && \
+    rm -rf /usr/local/lib/python3.12/dist-packages/mori* && \
+    rm -rf /app/mori
+
+RUN git clone --recursive https://github.com/ROCm/mori.git /app/mori && \
+    cd /app/mori && \
+    git checkout ${MORI_COMMIT} && \
+    PYTORCH_ROCM_ARCH=gfx950 pip install -e . && \
+    echo "MORI updated to $(git rev-parse --short HEAD) on $(date -u +%Y-%m-%d)"
+
+RUN sed -i "s|^MORI_BRANCH:.*|MORI_BRANCH: $(cd /app/mori && git rev-parse --short HEAD)|" /app/versions.txt && \
+    cat /app/versions.txt
+
+WORKDIR /app
diff --git a/docker/vllm_disagg_inference.ubuntu.amd.Dockerfile b/docker/vllm_disagg_inference.ubuntu.amd.Dockerfile
index 882fc0d..30effeb 100644
--- a/docker/vllm_disagg_inference.ubuntu.amd.Dockerfile
+++ b/docker/vllm_disagg_inference.ubuntu.amd.Dockerfile
@@ -113,11 +113,18 @@ RUN pip install vllm-router
 
 WORKDIR /app
 
-# versions.txt is provided by the base image and contains MORI_REPO / MORI_BRANCH entries.
+# Install MORI from a pinned commit on main.
+ARG MORI_REPO=https://github.com/ROCm/mori.git
+ARG MORI_COMMIT=241461c0aaf8be2a502397668d4b3e1aab90a188
 RUN pip install tqdm prettytable
-RUN git clone --recursive $(grep '^MORI_REPO:' /app/versions.txt | cut -d' ' -f2) && \
-    cd mori && \
-    git checkout $(grep '^MORI_BRANCH:' /app/versions.txt | cut -d' ' -f2)
+RUN pip uninstall -y mori 2>/dev/null || true && \
+    rm -rf /app/mori && \
+    git clone --recursive ${MORI_REPO} /app/mori && \
+    cd /app/mori && \
+    git checkout ${MORI_COMMIT} && \
+    PYTORCH_ROCM_ARCH=${GFX_COMPILATION_ARCH} pip install . && \
+    echo "MORI_REPO: ${MORI_REPO}" > /tmp/_mori_ver && \
+    echo "MORI_BRANCH: $(git rev-parse --short HEAD)" >> /tmp/_mori_ver
 
 RUN git clone --no-checkout --filter=blob:none https://github.com/ROCm/rocm-systems.git && cd rocm-systems && \
     git sparse-checkout set --cone projects/rocshmem && \
diff --git a/scripts/vllm_dissag/apply_moriio_2pd_patches.sh b/scripts/vllm_dissag/apply_moriio_2pd_patches.sh
new file mode 100755
index 0000000..984b408
--- /dev/null
+++ b/scripts/vllm_dissag/apply_moriio_2pd_patches.sh
@@ -0,0 +1,331 @@
+#!/bin/bash
+# apply_moriio_2pd_patches.sh — Runtime patches for multi-node disagg DP (PR #39276)
+# Applied at container startup before vLLM launches.
+# Once PR #39276 is merged upstream, this script becomes a no-op.
+set -euo pipefail
+
+VLLM_ROOT=$(python3 -c "import vllm, os; print(os.path.dirname(vllm.__file__))")
+MORIIO_DIR="${VLLM_ROOT}/distributed/kv_transfer/kv_connector/v1/moriio"
+ENGINE_DIR="${VLLM_ROOT}/v1/engine"
+
+echo "[patch] vLLM root: ${VLLM_ROOT}"
+echo "[patch] Applying PR #39276 runtime patches for multi-node disagg DP..."
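+
+# Added guard (sketch; assumption: a missing connector directory means this
+# vLLM build has nothing to patch, e.g. after PR #39276 lands upstream):
+# exit cleanly rather than letting the edits below fail on absent files.
+if [ ! -d "${MORIIO_DIR}" ]; then
+    echo "[patch] MoRIIO connector dir not found (${MORIIO_DIR}); nothing to patch."
+    exit 0
+fi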
+
+# --------------------------------------------------------------------------
+# Patch 1: core.py — engine_id: local_dp_rank → dp_rank
+# --------------------------------------------------------------------------
+CORE_PY="${ENGINE_DIR}/core.py"
+if grep -q 'engine_id}_dp{local_dp_rank}' "$CORE_PY" 2>/dev/null; then
+    sed -i 's/engine_id}_dp{local_dp_rank}/engine_id}_dp{dp_rank}/g' "$CORE_PY"
+    echo "[patch] core.py: engine_id fixed (local_dp_rank → dp_rank)"
+else
+    echo "[patch] core.py: already patched or not applicable"
+fi
+
+# --------------------------------------------------------------------------
+# Patch 2: utils.py — engine_id: local_index → index
+# --------------------------------------------------------------------------
+UTILS_PY="${ENGINE_DIR}/utils.py"
+if grep -q 'engine_id}_dp{local_index}' "$UTILS_PY" 2>/dev/null; then
+    sed -i 's/engine_id}_dp{local_index}/engine_id}_dp{index}/g' "$UTILS_PY"
+    echo "[patch] utils.py: engine_id fixed (local_index → index)"
+else
+    echo "[patch] utils.py: already patched or not applicable"
+fi
+
+# --------------------------------------------------------------------------
+# Patch 3: moriio_common.py — Use data_parallel_size_local and local dp_rank
+# --------------------------------------------------------------------------
+COMMON_PY="${MORIIO_DIR}/moriio_common.py"
+
+# 3a: dp_rank = ... % data_parallel_size_local
+if grep -q 'dp_rank = vllm_config.parallel_config.data_parallel_rank$' "$COMMON_PY" 2>/dev/null; then
+    sed -i 's/dp_rank = vllm_config.parallel_config.data_parallel_rank$/dp_rank = (vllm_config.parallel_config.data_parallel_rank % vllm_config.parallel_config.data_parallel_size_local)/' "$COMMON_PY"
+    echo "[patch] moriio_common.py: dp_rank uses local modulo"
+else
+    echo "[patch] moriio_common.py: dp_rank already patched"
+fi
+
+# 3b: dp_size = data_parallel_size_local
+if grep -q 'dp_size = vllm_config.parallel_config.data_parallel_size$' "$COMMON_PY" 2>/dev/null; then
+    sed -i 's/dp_size = vllm_config.parallel_config.data_parallel_size$/dp_size = vllm_config.parallel_config.data_parallel_size_local/' "$COMMON_PY"
+    echo "[patch] moriio_common.py: dp_size uses local size"
+else
+    echo "[patch] moriio_common.py: dp_size already patched"
+fi
+
+# 3c: Default ports for remote_handshake_port and remote_notify_port
+if grep -q 'remote_handshake_port=kv_transfer_params\["remote_handshake_port"\]' "$COMMON_PY" 2>/dev/null; then
+    sed -i 's/remote_handshake_port=kv_transfer_params\["remote_handshake_port"\]/remote_handshake_port=kv_transfer_params.get("remote_handshake_port", 8405)/' "$COMMON_PY"
+    sed -i 's/remote_notify_port=kv_transfer_params\["remote_notify_port"\]/remote_notify_port=kv_transfer_params.get("remote_notify_port", 61005)/' "$COMMON_PY"
+    echo "[patch] moriio_common.py: default ports added"
+else
+    echo "[patch] moriio_common.py: default ports already patched"
+fi
+
+# --------------------------------------------------------------------------
+# Patch 4: moriio_connector.py — Full multi-node DP fixes
+# --------------------------------------------------------------------------
+CONNECTOR_PY="${MORIIO_DIR}/moriio_connector.py"
+
+# 4a: Add _is_kv_master flag, _req_kv_params cache, and local dp_rank.
+# Pass the target file as argv[1]; "-" makes python3 read the script itself
+# from the heredoc. (A bare "python3 << 'PYEOF'" would leave sys.argv without
+# the file path, so the script would exit before patching anything.)
+python3 - "$CONNECTOR_PY" << 'PYEOF' || echo "[patch] WARNING: connector patch script had errors"
+import re, sys
+
+fpath = sys.argv[1] if len(sys.argv) > 1 else ""
+if not fpath:
+    sys.exit(1)
+
+with open(fpath, "r") as f:
+    src = f.read()
+
+changed = False
+
+# Fix: dp_rank use local modulo in scheduler connector
+old = "self.dp_rank = self.vllm_config.parallel_config.data_parallel_rank\n"
+new = ("self.dp_rank = (self.vllm_config.parallel_config.data_parallel_rank\n"
+       "                % self.vllm_config.parallel_config\n"
+       "                .data_parallel_size_local)\n"
+       "        self._is_kv_master = (\n"
+       "            self.vllm_config.parallel_config.data_parallel_rank\n"
+       "            < self.vllm_config.parallel_config.data_parallel_size_local)\n")
+if old in src and "_is_kv_master" not in src:
+    src = src.replace(old, new, 1)
+    changed = True
+    print("[patch] moriio_connector.py: _is_kv_master + local dp_rank added")
+
+# Fix: Add _req_kv_params dict
+old2 = "self._reqs_need_save: dict[ReqId, tuple[Request, list[int]]] = {}"
+new2 = (old2 + "\n        self._req_kv_params: dict[ReqId, dict] = {}")
+if old2 in src and "_req_kv_params" not in src:
+    src = src.replace(old2, new2, 1)
+    changed = True
+    print("[patch] moriio_connector.py: _req_kv_params dict added")
+
+# Fix: Cache kv_transfer_params on do_remote_decode
+old3 = 'self._reqs_need_save[request.request_id] = (request, local_block_ids)'
+new3 = (old3 + '\n            self._req_kv_params[request.request_id] = dict(params)')
+if old3 in src and '_req_kv_params[request.request_id] = dict(params)' not in src:
+    src = src.replace(old3, new3, 1)
+    changed = True
+    print("[patch] moriio_connector.py: cached kv_params for do_remote_decode")
+
+# Fix: Guard send_notify_block with _is_kv_master
+old4 = "        for tp_index in range(self.tp_size):\n            target_port = request.kv_transfer_params[\n"
+new4 = "        if self._is_kv_master:\n            for tp_index in range(self.tp_size):\n                target_port = request.kv_transfer_params[\n"
+if old4 in src and "if self._is_kv_master:" not in src:
+    src = src.replace(old4, new4, 1)
+    changed = True
+    print("[patch] moriio_connector.py: send_notify_block guarded with _is_kv_master")
+
+# Fix: block_size assertion → graceful override
+old5 = "        assert block_size == self.block_size"
+new5 = ("        if block_size != self.block_size:\n"
+        "            logger.info(\n"
+        '                "KV cache block_size=%d differs from config block_size=%d; "\n'
+        '                "using actual tensor shape (attention backend override).",\n'
+        "                block_size, self.block_size)\n"
+        "            self.block_size = block_size")
+if old5 in src:
+    src = src.replace(old5, new5, 1)
+    changed = True
+    print("[patch] moriio_connector.py: block_size assert → graceful override")
+
+# Fix: Use cached _req_kv_params in build_connector_meta for _reqs_need_recv
+old6 = "        for req_id, (req, block_ids) in self._reqs_need_recv.items():\n            assert req.kv_transfer_params is not None\n"
+new6 = ("        for req_id, (req, block_ids) in self._reqs_need_recv.items():\n"
+        "            kv_params = self._req_kv_params.get(\n"
+        "                req_id, req.kv_transfer_params or {}\n"
+        "            )\n")
+if old6 in src:
+    src = src.replace(old6, new6, 1)
+    changed = True
+    print("[patch] moriio_connector.py: _reqs_need_recv uses cached params")
+
+# Fix: Use cached _req_kv_params for _reqs_need_save
+old7 = "        for req_id, (req, block_ids) in self._reqs_need_save.items():\n            assert req.kv_transfer_params is not None\n"
+new7 = ("        for req_id, (req, block_ids) in self._reqs_need_save.items():\n"
+        "            kv_params = self._req_kv_params.get(\n"
+        "                req_id, req.kv_transfer_params or {}\n"
+        "            )\n")
+if old7 in src:
+    src = src.replace(old7, new7, 1)
+    changed = True
+    print("[patch] moriio_connector.py: _reqs_need_save uses cached params")
+
+# Fix: Add _recving_transfers_start dict
+old8 = "self._recving_transfers_callback_addr: dict[ReqId, tuple[str, str]] = {}"
+new8 = (old8 + "\n        self._recving_transfers_start: dict[str, float] = {}")
+if old8 in src and "_recving_transfers_start" not in src:
+    src = src.replace(old8, new8, 1)
+    changed = True
+    print("[patch] moriio_connector.py: _recving_transfers_start dict added")
+
+if changed:
+    with open(fpath, "w") as f:
+        f.write(src)
+    print("[patch] moriio_connector.py: all patches applied")
+else:
+    print("[patch] moriio_connector.py: already patched or no changes needed")
+PYEOF
+
+# --------------------------------------------------------------------------
+# Patch 5: moriio_engine.py — Timeouts, failure detection, ZMQ retry
+# --------------------------------------------------------------------------
+ENGINE_PY="${MORIIO_DIR}/moriio_engine.py"
+
+# 5a: Add imports (os, time)
+if ! grep -q '^import os$' "$ENGINE_PY" 2>/dev/null; then
+    sed -i '1,/^import threading/s/^import threading/import os\nimport threading\nimport time/' "$ENGINE_PY"
+    echo "[patch] moriio_engine.py: added os/time imports"
+fi
+
+# 5b: Deferred task timeout (same heredoc idiom: "-" plus target file as argv[1])
+if ! grep -q 'VLLM_MORIIO_DEFERRED_TIMEOUT_S' "$ENGINE_PY" 2>/dev/null; then
+    python3 - "$ENGINE_PY" << 'PYEOF2' || echo "[patch] WARNING: engine patch script had errors"
+import sys
+
+fpath = sys.argv[1] if len(sys.argv) > 1 else ""
+if not fpath:
+    sys.exit(1)
+
+with open(fpath, "r") as f:
+    src = f.read()
+
+changed = False
+
+# Deferred task timeout
+old = """        still_deferred: list[WriteTask] = []
+        for task in self._deferred_tasks:
+            if self._is_remote_ready(task):
+                self._execute_write_task(task)
+            else:
+                still_deferred.append(task)"""
+
+new = """        _defer_timeout = int(
+            os.environ.get("VLLM_MORIIO_DEFERRED_TIMEOUT_S", "300"))
+        still_deferred: list[WriteTask] = []
+        _now = time.monotonic()
+        for task in self._deferred_tasks:
+            if self._is_remote_ready(task):
+                self._execute_write_task(task)
+            elif (hasattr(task, '_defer_time')
+                  and (_now - task._defer_time) > _defer_timeout):
+                logger.error(
+                    "Deferred write task EXPIRED for req %s "
+                    "(transfer %s) after %ds",
+                    task.request_id, task.transfer_id, _defer_timeout)
+            else:
+                if not hasattr(task, '_defer_time'):
+                    task._defer_time = _now
+                still_deferred.append(task)"""
+
+if old in src:
+    src = src.replace(old, new, 1)
+    changed = True
+    print("[patch] moriio_engine.py: deferred task timeout added")
+
+# Transfer timeout (replace blocking Wait)
+old2 = """        for status in transfers_to_wait:
+            try:
+                status.Wait()
+                if not status.Succeeded():
+                    logger.error(
+                        "Transfer failed: %s, Code: %s", status.Message(), status.Code()
+                    )
+                    raise TransferError("MoRIIO transfer failed!")
+            except Exception as e:
+                logger.error("Transfer %s failed: %s", status, e)
+                raise"""
+
+new2 = """        _xfer_timeout = int(
+            os.environ.get("VLLM_MORIIO_TRANSFER_TIMEOUT_S", "120"))
+        for status in transfers_to_wait:
+            _deadline = time.monotonic() + _xfer_timeout
+            while status.InProgress():
+                if time.monotonic() > _deadline:
+                    logger.error(
+                        "RDMA write timed out after %ds "
+                        "(Code=%s, Msg=%s)",
+                        _xfer_timeout, status.Code(),
+                        status.Message())
+                    break
+                time.sleep(0.001)
+            if status.Failed():
+                logger.error(
+                    "Transfer failed: %s, Code: %s",
+                    status.Message(), status.Code())
+            elif not status.Succeeded():
+                logger.error(
+                    "Transfer did not succeed "
+                    "(timeout or unknown state, Code=%s)",
+                    status.Code())"""
+
+if old2 in src:
+    src = src.replace(old2, new2, 1)
+    changed = True
+    print("[patch] moriio_engine.py: transfer timeout added")
+
+# ZMQ retry
+old3 = """            sock = self.paths[path]
+            try:
+                for req_id in req_list:
+                    if not isinstance(req_id, str):
+                        logger.warning(
+                            "Invalid req_id type: %s, expected str", type(req_id)
+                        )
+                        continue
+                    sock.send(req_id.encode("utf-8"))
+            except Exception as e:
+                logger.error("Failed to send notification to %s: %s", path, e)
+                self.paths.pop(path, None)
+                raise"""
+
+new3 = """            sock = self.paths[path]
+            _MAX_RETRIES = 3
+            for req_id in req_list:
+                if not isinstance(req_id, str):
+                    logger.warning(
+                        "Invalid req_id type: %s, expected str", type(req_id))
+                    continue
+                for _attempt in range(_MAX_RETRIES):
+                    try:
+                        sock.send(req_id.encode("utf-8"), zmq.NOBLOCK)
+                        break
+                    except zmq.Again:
+                        if _attempt < _MAX_RETRIES - 1:
+                            time.sleep(0.01 * (_attempt + 1))
+                            logger.warning(
+                                "ZMQ send retry %d for req %s to %s",
+                                _attempt + 1, req_id, path)
+                        else:
+                            logger.error(
+                                "ZMQ send FAILED after %d retries "
+                                "for req %s to %s",
+                                _MAX_RETRIES, req_id, path)
+                    except Exception as e:
+                        logger.error(
+                            "Failed to send notification to %s: %s",
+                            path, e)
+                        self.paths.pop(path, None)
+                        raise"""
+
+if old3 in src:
+    src = src.replace(old3, new3, 1)
+    changed = True
+    print("[patch] moriio_engine.py: ZMQ retry added")
+
+if changed:
+    with open(fpath, "w") as f:
+        f.write(src)
+    print("[patch] moriio_engine.py: all patches applied")
+else:
+    print("[patch] moriio_engine.py: already patched or no changes needed")
+PYEOF2
+else
+    echo "[patch] moriio_engine.py: already patched"
+fi
+
+echo "[patch] All PR #39276 patches applied successfully."
diff --git a/scripts/vllm_dissag/launch_mori_1p1d.sh b/scripts/vllm_dissag/launch_mori_1p1d.sh
new file mode 100644
index 0000000..8b40fbe
--- /dev/null
+++ b/scripts/vllm_dissag/launch_mori_1p1d.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+set -euo pipefail
+
+JOBID=1300
+NODE0=mi355-gpu-39
+NODE1=mi355-gpu-40
+
+export DOCKER_IMAGE_NAME="localhost/mad-mori-ep:gfx950-v2"
+export MODEL_NAME="DeepSeek-V3"
+export MODEL_DIR="/shared/amdgpu/home/ravgupta_qle/models"
+export xP=1
+export yD=1
+export RUN_MORI=1
+export BENCHMARK_COMBINATIONS="1024/1024 8192/1024 1024/8192"
+export BENCHMARK_CON="8 16 32 64 128 256 512"
+export BENCHMARK_ITR=1
+export LOG_PATH="/shared/amdgpu/home/ravgupta_qle/logs_mori"
+export GLOO_SOCKET_IFNAME=enp193s0f1np1
+export NCCL_SOCKET_IFNAME=enp193s0f1np1
+
+# MORI IO RDMA: use global IPv6 GID for routed RoCE over Ionic AINICs
+export MORI_IB_GID_INDEX=1
+export MORI_IO_LOG_LEVEL=INFO
+
+export SLURM_JOB_ID=$JOBID
+export SLURM_JOB_NODELIST="${NODE0},${NODE1}"
+export SLURM_NNODES=2
+export SLURM_NTASKS=2
+export SLURM_NTASKS_PER_NODE=1
+export SLURM_SUBMIT_DIR="$(cd "$(dirname "$0")" && pwd)"
+
+echo "=== Launching MORI EP 1P/1D ==="
+echo "Nodes: $NODE0, $NODE1"
+echo "Model: $MODEL_NAME"
+echo "Image: $DOCKER_IMAGE_NAME"
+echo ""
+
+bash "$(dirname "$0")/run_xPyD_models.slurm" 2>&1 | tee log_mori_${MODEL_NAME}_1p1d.log
diff --git a/scripts/vllm_dissag/launch_mori_2p2d.sh b/scripts/vllm_dissag/launch_mori_2p2d.sh
new file mode 100755
index 0000000..0058a3b
--- /dev/null
+++ b/scripts/vllm_dissag/launch_mori_2p2d.sh
@@ -0,0 +1,69 @@
+#!/bin/bash
+# launch_mori_2p2d.sh — Example launcher for 2-Prefill / 2-Decode MoRI EP
+# disaggregated inference over Ionic AINIC RDMA on AMD MI355X clusters.
+#
+# Prerequisites:
+#   1. 4-node SLURM allocation with full CPUs:
+#      sbatch --wrap="sleep infinity" -N4 --cpus-per-task=256 \
+#             --nodelist=,,, ...
+#   2. Docker image loaded on all nodes:
+#      localhost/mad-mori-ep:gfx950-v2
+#   3. Model downloaded to shared storage.
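+#      One way to fetch a model (illustrative sketch; deepseek-ai/DeepSeek-V3
+#      is an example repo id, substitute the checkpoint you actually run):
+#        huggingface-cli download deepseek-ai/DeepSeek-V3 \
+#          --local-dir "$MODEL_DIR/DeepSeek-V3"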
+#
+# Usage: Edit JOBID and NODE0-3 below, then:
+#   bash launch_mori_2p2d.sh
+set -euo pipefail
+
+# ── Cluster-specific settings (edit these) ───────────────────────────────────
+JOBID=${JOBID:-1346}
+NODE0=${NODE0:-mi355-gpu-39}   # Prefill master + proxy
+NODE1=${NODE1:-mi355-gpu-40}   # Prefill child
+NODE2=${NODE2:-mi355-gpu-51}   # Decode master
+NODE3=${NODE3:-mi355-gpu-55}   # Decode child
+
+export DOCKER_IMAGE_NAME="${DOCKER_IMAGE_NAME:-localhost/mad-mori-ep:gfx950-v2}"
+export MODEL_NAME="${MODEL_NAME:-DeepSeek-V3-5layer}"
+export MODEL_DIR="${MODEL_DIR:-/shared/amdgpu/home/ravgupta_qle/models}"
+
+# ── Disaggregated inference topology ─────────────────────────────────────────
+export xP=2
+export yD=2
+export RUN_MORI=1
+export BENCHMARK_COMBINATIONS="${BENCHMARK_COMBINATIONS:-512/512}"
+export BENCHMARK_CON="${BENCHMARK_CON:-8 16}"
+export BENCHMARK_ITR="${BENCHMARK_ITR:-1}"
+export LOG_PATH="${LOG_PATH:-/shared/amdgpu/home/ravgupta_qle/logs_mori}"
+
+# ── Network interface for TCP control plane ──────────────────────────────────
+export GLOO_SOCKET_IFNAME=${GLOO_SOCKET_IFNAME:-enp193s0f1np1}
+export NCCL_SOCKET_IFNAME=${NCCL_SOCKET_IFNAME:-enp193s0f1np1}
+export MORI_SOCKET_IFNAME=${MORI_SOCKET_IFNAME:-enp193s0f1np1}
+
+# ── NCCL/RCCL multi-node IB transport over Ionic AINICs ─────────────────────
+# Exclude Broadcom frontend NICs whose GID[1] is fe80:: (link-local, not routable)
+export NCCL_IB_HCA="${NCCL_IB_HCA:-^rocep193s0f0,rocep193s0f1}"
+export NCCL_IB_GID_INDEX=${NCCL_IB_GID_INDEX:-1}
+export NCCL_NET_GDR_LEVEL=${NCCL_NET_GDR_LEVEL:-3}
+export NCCL_CROSS_NIC=${NCCL_CROSS_NIC:-1}
+export NCCL_DEBUG=${NCCL_DEBUG:-WARN}
+
+# ── MORI IO RDMA configuration ──────────────────────────────────────────────
+export MORI_IB_GID_INDEX=${MORI_IB_GID_INDEX:-1}
+export MORI_IO_LOG_LEVEL=${MORI_IO_LOG_LEVEL:-INFO}
+
+# ── SLURM environment (synthesized for the launcher) ─────────────────────────
+export SLURM_JOB_ID=$JOBID
+export SLURM_JOB_NODELIST="${NODE0},${NODE1},${NODE2},${NODE3}"
+export SLURM_NNODES=4
+export SLURM_NTASKS=4
+export SLURM_NTASKS_PER_NODE=1
+export SLURM_SUBMIT_DIR="$(cd "$(dirname "$0")" && pwd)"
+
+echo "=== Launching MORI EP 2P/2D ==="
+echo "Nodes: ${NODE0} (prefill master+proxy), ${NODE1} (prefill child),"
+echo "       ${NODE2} (decode master), ${NODE3} (decode child)"
+echo "Model: $MODEL_NAME"
+echo "Image: $DOCKER_IMAGE_NAME"
+echo ""
+
+bash "$(dirname "$0")/run_xPyD_models.slurm" 2>&1 | tee log_mori_${MODEL_NAME}_2p2d.log
diff --git a/scripts/vllm_dissag/run_xPyD_models.slurm b/scripts/vllm_dissag/run_xPyD_models.slurm
index 87c9713..97c6816 100755
--- a/scripts/vllm_dissag/run_xPyD_models.slurm
+++ b/scripts/vllm_dissag/run_xPyD_models.slurm
@@ -122,10 +122,6 @@ if [[ "$_run_mori" == "1" && "$_run_deepep" == "1" ]]; then
 fi
 
 if [[ "$_run_mori" == "1" ]]; then
-    if (( xP != 1 || yD != 1 )); then
-        echo "Error: RUN_MORI=1 requires xP=1 and yD=1 (got xP=$xP, yD=$yD). Multi-node MoRI EP is not yet supported." >&2
-        exit 1
-    fi
     if model_allows_mori_ep "$MODEL_NAME"; then
         RUN_FILE="vllm_disagg_mori_ep.sh"
         echo "RUN_MORI=1: using $RUN_FILE for model '$MODEL_NAME'"
@@ -191,7 +187,7 @@ check_model_path() {
     echo "Checking $check_name: $path"
 
     # Run check on all nodes in parallel
-    srun --nodes=$SLURM_NNODES --ntasks=$SLURM_NNODES /bin/bash -c "
+    srun --overlap --nodes=$SLURM_NNODES --ntasks=$SLURM_NNODES /bin/bash -c "
         if [ -d '$path' ]; then
            echo \"\$(hostname): ✓ Found $path\"
            exit 0
@@ -212,24 +208,23 @@ check_model_path() {
     fi
 }
 
-# Check /mnt/m2m_nobackup/models_blog first
-MODEL_PATH_1="/mnt/m2m_nobackup/models_blog/$MODEL_NAME"
-if check_model_path "$MODEL_PATH_1" "/mnt/m2m_nobackup/models_blog"; then
-    MODEL_PATH="$MODEL_PATH_1"
+# Check MODEL_DIR first (user-provided), then standard paths
+if check_model_path "$MODEL_DIR/$MODEL_NAME" "$MODEL_DIR"; then
+    MODEL_PATH="$MODEL_DIR/$MODEL_NAME"
     echo ""
     echo "✓ Selected MODEL_PATH: $MODEL_PATH (available on all nodes)"
-# Check /shared-inference/models_blog
-elif check_model_path "/shared_inference/models_blog/$MODEL_NAME" "/shared_inference/models_blog"; then
-    MODEL_PATH="/shared_inference/models_blog/$MODEL_NAME"
+elif check_model_path "/mnt/m2m_nobackup/models_blog/$MODEL_NAME" "/mnt/m2m_nobackup/models_blog"; then
+    MODEL_PATH="/mnt/m2m_nobackup/models_blog/$MODEL_NAME"
     echo ""
     echo "✓ Selected MODEL_PATH: $MODEL_PATH (available on all nodes)"
-elif check_model_path "$MODEL_DIR/$MODEL_NAME" "$MODEL_DIR"; then
-    MODEL_PATH="$MODEL_DIR/$MODEL_NAME"
+elif check_model_path "/shared_inference/models_blog/$MODEL_NAME" "/shared_inference/models_blog"; then
+    MODEL_PATH="/shared_inference/models_blog/$MODEL_NAME"
     echo ""
     echo "✓ Selected MODEL_PATH: $MODEL_PATH (available on all nodes)"
 else
     echo ""
     echo "✗ FATAL ERROR: Model '$MODEL_NAME' not found on ALL allocated nodes in either:"
+    echo "  - $MODEL_DIR/$MODEL_NAME"
     echo "  - /mnt/m2m_nobackup/models_blog/$MODEL_NAME"
     echo "  - /shared_inference/models_blog/$MODEL_NAME"
     echo ""
@@ -322,16 +317,23 @@ echo ""
 
 # Node information
 USER_NAME=$(whoami)
 MASTER_NODE=$(echo "$SELECTED_NODES" | head -n 1)
-MASTER_ADDR=$(srun --nodes=1 --ntasks=1 --time=00:20:00 --nodelist="$MASTER_NODE" bash -c 'hostname -I')
-MASTER_ADDR=$(echo "$MASTER_ADDR" | awk 'NR==1 {print $1}')
-MASTER_PORT=39566 # Choose an open port
+# Prefer 10.x.x.x overlay IPs for inter-node communication.
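+# Note (added): hostname -I returns addresses in arbitrary order, and
+# frontend NICs may expose link-local or otherwise non-routable addresses;
+# assuming the routed fabric sits in 10.0.0.0/8, take the first 10.x
+# address and fall back to the first address reported.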
+_pick_overlay_ip() {
+    local raw
+    raw=$(srun --overlap --nodes=1 --ntasks=1 --time=00:20:00 --nodelist="$1" bash -c 'hostname -I')
+    local ip
+    for ip in $raw; do
+        case "$ip" in 10.*) echo "$ip"; return ;; esac
+    done
+    echo "$raw" | awk 'NR==1 {print $1}'
+}
 
-IPS=()
+MASTER_ADDR=$(_pick_overlay_ip "$MASTER_NODE")
+MASTER_PORT=39566
+IPS=()
 for NODE in $SELECTED_NODES; do
-    IP=$(srun --nodes=1 --ntasks=1 --time=00:20:00 --nodelist="$NODE" bash -c 'hostname -I')
-    IP=$(echo "$IP" | awk 'NR==1 {print $1}')
-    IPS+=("$IP")
+    IPS+=("$(_pick_overlay_ip "$NODE")")
 done
 
 echo "Selected node IPs: ${IPS[*]}" | sed 's/ /,/g'
@@ -378,32 +380,37 @@ export RUN_FILE_FULL="$NIXL_COOKBOOK_PATH/${RUN_FILE}"
 
 # Use only the selected nodes for srun execution
 SELECTED_NODELIST_SRUN=$(echo "$SELECTED_NODES" | paste -sd,)
-srun --nodelist="$SELECTED_NODELIST_SRUN" bash -c '
+srun --overlap --nodelist="$SELECTED_NODELIST_SRUN" bash -c '
 echo "Rank $SLURM_PROCID on $(hostname)";
-docker ps -q | xargs --no-run-if-empty docker stop;
+podman ps -q | xargs --no-run-if-empty podman stop 2>/dev/null || true;
 fuser -k 5000/tcp 2>/dev/null || true;
 fuser -k 2222/tcp 2>/dev/null || true;
 fuser -k 15000/tcp 2>/dev/null || true;
+fuser -k 20005/tcp 2>/dev/null || true;
+fuser -k 10001/tcp 2>/dev/null || true;
 sleep 2;
-docker pull $DOCKER_IMAGE_NAME 2>/dev/null || true;
+
+# Set VLLM_HOST_IP to this node'\''s overlay IP so MoRIIO uses routable addresses
+_VLLM_HOST_IP=$(echo "$IPADDRS" | tr "," "\n" | sed -n "$((SLURM_PROCID + 1))p")
+echo "[vllm-host-ip] SLURM_PROCID=$SLURM_PROCID VLLM_HOST_IP=$_VLLM_HOST_IP"
 
 # --- Build host RDMA library mounts ---
-# Mount the host MLNX OFED userspace libraries into the container so that
-# libmlx5 / libibverbs always match the host kernel module, preventing
-# mlx5dv_devx_alloc_uar failures from ABI mismatches.
 _RDMA_MOUNTS=""
 _LIBDIR=/usr/lib/x86_64-linux-gnu
-for _lib in libmlx5.so libmlx5.so.1 libibverbs.so libibverbs.so.1 librdmacm.so librdmacm.so.1; do
+for _lib in libmlx5.so libmlx5.so.1 libibverbs.so libibverbs.so.1 librdmacm.so librdmacm.so.1 libionic.so libionic.so.1; do
    [ -e "$_LIBDIR/$_lib" ] && _RDMA_MOUNTS="$_RDMA_MOUNTS -v $_LIBDIR/$_lib:$_LIBDIR/$_lib:ro"
 done
-for _vlib in $_LIBDIR/libmlx5.so.1.* $_LIBDIR/libibverbs.so.1.* $_LIBDIR/librdmacm.so.1.*; do
+for _vlib in $_LIBDIR/libmlx5.so.1.* $_LIBDIR/libibverbs.so.1.* $_LIBDIR/librdmacm.so.1.* $_LIBDIR/libionic.so.1.*; do
    [ -e "$_vlib" ] && _RDMA_MOUNTS="$_RDMA_MOUNTS -v $_vlib:$_vlib:ro"
 done
 [ -d "$_LIBDIR/libibverbs" ] && _RDMA_MOUNTS="$_RDMA_MOUNTS -v $_LIBDIR/libibverbs:$_LIBDIR/libibverbs:ro"
 [ -d /etc/libibverbs.d ] && _RDMA_MOUNTS="$_RDMA_MOUNTS -v /etc/libibverbs.d:/etc/libibverbs.d:ro"
+
+# Ionic RDMA: if libibverbs dir is already mounted from host, the ionic
+# provider inside it is included automatically; no extra mounts needed.
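+
+# Optional sanity check (added suggestion, left commented out): confirm the
+# host actually exposes RDMA devices before launching; ibv_devices ships
+# with rdma-core.
+# ibv_devices || echo "[host-rdma] WARNING: no RDMA devices visible"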
echo "[host-rdma] mounts: $_RDMA_MOUNTS" -docker run --rm \ +podman run --rm \ --device /dev/dri \ --device /dev/kfd \ --device /dev/infiniband \ @@ -414,11 +421,13 @@ docker run --rm \ --security-opt seccomp=unconfined \ --privileged \ -v $HOME:$HOME \ - -v /shared_inference:/shared_inference \ - -v /mnt/m2m_nobackup:/mnt/m2m_nobackup \ + $([ -d /shared_inference ] && echo "-v /shared_inference:/shared_inference") \ + $([ -d /shared/amdgpu ] && echo "-v /shared/amdgpu:/shared/amdgpu") \ + $([ -d /mnt/m2m_nobackup ] && echo "-v /mnt/m2m_nobackup:/mnt/m2m_nobackup") \ + -v /sys:/sys:ro \ -v $HOME/.ssh:/root/.ssh \ - --shm-size 64G \ --ulimit nofile=524288:524288 \ + --ulimit memlock=-1:-1 \ -v ${LOG_PATH}:/run_logs \ -v $NIXL_REPO_DIR:$NIXL_COOKBOOK_PATH \ $_RDMA_MOUNTS \ @@ -447,11 +456,26 @@ docker run --rm \ -e ENABLE_DBO=${ENABLE_DBO:-} \ -e DBO_COMM_SMS=${DBO_COMM_SMS:-} \ -e ENABLE_PROFILING=${ENABLE_PROFILING:-} \ + -e GLOO_SOCKET_IFNAME=${GLOO_SOCKET_IFNAME:-eth0} \ + -e NCCL_SOCKET_IFNAME=${NCCL_SOCKET_IFNAME:-eth0} \ + -e NCCL_IB_HCA="${NCCL_IB_HCA:-}" \ + -e NCCL_IB_GID_INDEX=${NCCL_IB_GID_INDEX:-} \ + -e NCCL_NET_GDR_LEVEL=${NCCL_NET_GDR_LEVEL:-} \ + -e NCCL_CROSS_NIC=${NCCL_CROSS_NIC:-} \ + -e NCCL_DEBUG=${NCCL_DEBUG:-WARN} \ + -e MORI_SOCKET_IFNAME=${MORI_SOCKET_IFNAME:-} \ + -e MORI_GPU_ARCHS=${MORI_GPU_ARCHS:-gfx950} \ + -e MORI_RDMA_DEVICES="${MORI_RDMA_DEVICES:-}" \ + -e MORI_IB_GID_INDEX="${MORI_IB_GID_INDEX:-}" \ + -e MORI_IO_LOG_LEVEL="${MORI_IO_LOG_LEVEL:-INFO}" \ + -e VLLM_HOST_IP=$_VLLM_HOST_IP \ + -e PYTHONUNBUFFERED=1 \ --name $DOCKER_CONT_NAME \ $DOCKER_IMAGE_NAME -c " mkdir -p /run_logs/${SLURM_JOB_ID} + rm -rf /root/.triton/cache/ /tmp/triton* /tmp/torchinductor_root/ $RUN_FILE_FULL 2>&1 | tee /run_logs/${SLURM_JOB_ID}/pd_vllm_bench_NODE${SLURM_PROCID}.log " ' -srun --nodelist="$SELECTED_NODELIST_SRUN" bash -c 'docker stop $DOCKER_CONT_NAME 2>/dev/null || true; docker rm $DOCKER_CONT_NAME 2>/dev/null || true' +srun --overlap --nodelist="$SELECTED_NODELIST_SRUN" bash -c 'podman stop $DOCKER_CONT_NAME 2>/dev/null || true; podman rm $DOCKER_CONT_NAME 2>/dev/null || true' diff --git a/scripts/vllm_dissag/vllm_disagg_mori_ep.sh b/scripts/vllm_dissag/vllm_disagg_mori_ep.sh index db5dd57..32af0e0 100755 --- a/scripts/vllm_dissag/vllm_disagg_mori_ep.sh +++ b/scripts/vllm_dissag/vllm_disagg_mori_ep.sh @@ -14,10 +14,8 @@ MODEL_PATH=$MODEL_PATH MODEL_NAME="${MODEL_NAME:-}" xP="${xP:-1}" yD="${yD:-1}" -if [ "$xP" -gt 1 ] || [ "$yD" -gt 1 ]; then - echo "Error: xP > 1 or yD > 1 is not supported yet due to MoRI IO connector issues." >&2 - exit 1 -fi +# Multi-node DP (xP>1 or yD>1) requires PR #39276 patches. +# Patches are applied at runtime by apply_moriio_2pd_patches.sh. 
IPADDRS="${IPADDRS:-localhost}" IFS=',' read -ra IP_ARRAY <<< "${IPADDRS}" @@ -59,9 +57,26 @@ echo "PREFILL_DP_START_RANK=${PREFILL_DP_START_RANK}" echo "PREFILL_MASTER_ADDR=${PREFILL_MASTER_ADDR}" echo "DECODE_DP_START_RANK=${DECODE_DP_START_RANK}" echo "DECODE_MASTER_ADDR=${DECODE_MASTER_ADDR}" -host_ip=$(hostname -I | awk '{print $1}') +# Prefer 10.x.x.x overlay IP for inter-node communication +host_ip="" +for _ip in $(hostname -I); do + case "$_ip" in 10.*) host_ip="$_ip"; break ;; esac +done +[ -z "$host_ip" ] && host_ip=$(hostname -I | awk '{print $1}') host_name=$(hostname) +# ============================================================================= +# Apply PR #39276 patches for multi-node DP (idempotent) +# ============================================================================= +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +if [ -f "${SCRIPT_DIR}/apply_moriio_2pd_patches.sh" ]; then + bash "${SCRIPT_DIR}/apply_moriio_2pd_patches.sh" +elif [ -f "/app/scripts/apply_moriio_2pd_patches.sh" ]; then + bash "/app/scripts/apply_moriio_2pd_patches.sh" +else + echo "[WARNING] apply_moriio_2pd_patches.sh not found — skipping runtime patches" +fi + # ============================================================================= # Helper Functions # ============================================================================= @@ -74,10 +89,27 @@ setup_mori_env() { export VLLM_ROCM_USE_AITER_MLA=1 export VLLM_ROCM_USE_AITER_FUSION_SHARED_EXPERTS=0 export VLLM_ALL2ALL_BACKEND=mori - export GLOO_SOCKET_IFNAME=eth0 + export GLOO_SOCKET_IFNAME=${GLOO_SOCKET_IFNAME:-eth0} + if [ -n "${MORI_SOCKET_IFNAME:-}" ]; then + export MORI_SOCKET_IFNAME="${MORI_SOCKET_IFNAME}" + fi export VLLM_ENGINE_READY_TIMEOUT_S=3600 export VLLM_RINGBUFFER_WARNING_INTERVAL=3600 export VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS=3600 + export MORI_GPU_ARCHS=${MORI_GPU_ARCHS:-gfx950} + export MORI_SHMEM_HEAP_SIZE=${MORI_SHMEM_HEAP_SIZE:-6442450944} + export ROCSHMEM_TEST_UUID=1 + + # MORI IO RDMA configuration for Ionic AINIC + if [ -n "${MORI_RDMA_DEVICES:-}" ]; then + export MORI_RDMA_DEVICES="${MORI_RDMA_DEVICES}" + fi + if [ -n "${MORI_IB_GID_INDEX:-}" ]; then + export MORI_IB_GID_INDEX="${MORI_IB_GID_INDEX}" + fi + if [ -n "${MORI_IO_LOG_LEVEL:-}" ]; then + export MORI_IO_LOG_LEVEL="${MORI_IO_LOG_LEVEL}" + fi } build_kv_transfer_config() { @@ -109,8 +141,12 @@ launch_vllm_worker() { extra_args+=(--data-parallel-start-rank "${dp_start_rank}" --headless) fi - local kv_config - kv_config=$(build_kv_transfer_config "${kv_role}") + local kv_args=() + if [[ "$role" == "master" ]]; then + local kv_config + kv_config=$(build_kv_transfer_config "${kv_role}") + kv_args+=(--kv-transfer-config "${kv_config}") + fi vllm serve ${MODEL_PATH} \ -tp 1 \ @@ -128,7 +164,7 @@ launch_vllm_worker() { --trust-remote-code \ --enforce-eager \ "${extra_args[@]}" \ - --kv-transfer-config "${kv_config}" \ + "${kv_args[@]}" \ 2>&1 | tee /run_logs/${SLURM_JOB_ID}/${log_prefix}_NODE${NODE_RANK}.log >/dev/null & WORKER_PID=$!