From 73e94c0fb94e187b44a4d2beb1de0c6c9d6526bb Mon Sep 17 00:00:00 2001 From: Ravi Gupta Date: Thu, 16 Apr 2026 04:11:30 +0000 Subject: [PATCH 1/2] [DistInf] Enable RDMA over Ionic AINICs for MoRI EP disaggregated inference Enable MoRI IO KV cache transfer over Ionic RDMA NICs on clusters where public IPs are not routable between compute nodes. Key changes: - Mount host RDMA libraries (libionic, libibverbs, librdmacm) and provider directory into the container so MORI IO can discover Ionic NICs - Set VLLM_HOST_IP to each node's overlay IP so MoRIIO control plane (ZMQ handshake, block allocation notifications, proxy registration) routes through the overlay network instead of unreachable public IPs - Pass through MORI RDMA env vars (MORI_IB_GID_INDEX, MORI_RDMA_DEVICES, MORI_IO_LOG_LEVEL) from the launcher into the container - Switch from docker to podman for rootless container execution - Use --overlap on srun commands to avoid blocking the SLURM job step - Prefer 10.x.x.x overlay IPs for MASTER_ADDR and inter-node comms - Prefer MODEL_DIR for model path resolution before standard paths - Add PYTHONUNBUFFERED=1 for real-time Python log output - Add launch_mori_1p1d.sh convenience launcher for 1P/1D benchmarks - Update Dockerfile to install MORI from pinned commit on main Tested: DeepSeek-V3 1P/1D on 2x MI355X nodes with Ionic AINICs, full benchmark suite (ISL/OSL: 1024/1024, 8192/1024, 1024/8192, concurrency: 8-512), all requests successful with 0 failures. Made-with: Cursor --- docker/update_mori.Dockerfile | 21 +++++ ...llm_disagg_inference.ubuntu.amd.Dockerfile | 15 +++- scripts/vllm_dissag/launch_mori_1p1d.sh | 38 +++++++++ scripts/vllm_dissag/run_xPyD_models.slurm | 81 ++++++++++++------- scripts/vllm_dissag/vllm_disagg_mori_ep.sh | 23 +++++- 5 files changed, 142 insertions(+), 36 deletions(-) create mode 100644 docker/update_mori.Dockerfile create mode 100644 scripts/vllm_dissag/launch_mori_1p1d.sh diff --git a/docker/update_mori.Dockerfile b/docker/update_mori.Dockerfile new file mode 100644 index 0000000..0cea72e --- /dev/null +++ b/docker/update_mori.Dockerfile @@ -0,0 +1,21 @@ +FROM localhost/mad-mori-ep:gfx950-v1 + +ARG MORI_COMMIT=241461c0aaf8be2a502397668d4b3e1aab90a188 + +WORKDIR /app + +# Remove old mori completely (including stale C++ extensions that confuse profiler detection) +RUN pip uninstall -y mori amd-mori amd_mori 2>/dev/null || true && \ + rm -rf /usr/local/lib/python3.12/dist-packages/mori* && \ + rm -rf /app/mori + +RUN git clone --recursive https://github.com/ROCm/mori.git /app/mori && \ + cd /app/mori && \ + git checkout ${MORI_COMMIT} && \ + PYTORCH_ROCM_ARCH=gfx950 pip install -e . && \ + echo "MORI updated to $(git rev-parse --short HEAD) on $(date -u +%Y-%m-%d)" + +RUN sed -i "s|^MORI_BRANCH:.*|MORI_BRANCH: $(cd /app/mori && git rev-parse --short HEAD)|" /app/versions.txt && \ + cat /app/versions.txt + +WORKDIR /app diff --git a/docker/vllm_disagg_inference.ubuntu.amd.Dockerfile b/docker/vllm_disagg_inference.ubuntu.amd.Dockerfile index 882fc0d..30effeb 100644 --- a/docker/vllm_disagg_inference.ubuntu.amd.Dockerfile +++ b/docker/vllm_disagg_inference.ubuntu.amd.Dockerfile @@ -113,11 +113,18 @@ RUN pip install vllm-router WORKDIR /app -# versions.txt is provided by the base image and contains MORI_REPO / MORI_BRANCH entries. +# Install MORI from latest main (pinned commit). 
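+# The pin can be overridden at build time without editing this file, e.g.
+# (substitute your own sha):
+#   podman build --build-arg MORI_COMMIT=<sha> \
+#     -f docker/vllm_disagg_inference.ubuntu.amd.Dockerfile .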
+ARG MORI_REPO=https://github.com/ROCm/mori.git +ARG MORI_COMMIT=241461c0aaf8be2a502397668d4b3e1aab90a188 RUN pip install tqdm prettytable -RUN git clone --recursive $(grep '^MORI_REPO:' /app/versions.txt | cut -d' ' -f2) && \ - cd mori && \ - git checkout $(grep '^MORI_BRANCH:' /app/versions.txt | cut -d' ' -f2) +RUN pip uninstall -y mori 2>/dev/null || true && \ + rm -rf /app/mori && \ + git clone --recursive ${MORI_REPO} /app/mori && \ + cd /app/mori && \ + git checkout ${MORI_COMMIT} && \ + PYTORCH_ROCM_ARCH=${GFX_COMPILATION_ARCH} pip install . && \ + echo "MORI_REPO: ${MORI_REPO}" > /tmp/_mori_ver && \ + echo "MORI_BRANCH: $(git rev-parse --short HEAD)" >> /tmp/_mori_ver RUN git clone --no-checkout --filter=blob:none https://github.com/ROCm/rocm-systems.git && cd rocm-systems && \ git sparse-checkout set --cone projects/rocshmem && \ diff --git a/scripts/vllm_dissag/launch_mori_1p1d.sh b/scripts/vllm_dissag/launch_mori_1p1d.sh new file mode 100644 index 0000000..8b40fbe --- /dev/null +++ b/scripts/vllm_dissag/launch_mori_1p1d.sh @@ -0,0 +1,38 @@ +#!/bin/bash +set -euo pipefail + +JOBID=1300 +NODE0=mi355-gpu-39 +NODE1=mi355-gpu-40 + +export DOCKER_IMAGE_NAME="localhost/mad-mori-ep:gfx950-v2" +export MODEL_NAME="DeepSeek-V3" +export MODEL_DIR="/shared/amdgpu/home/ravgupta_qle/models" +export xP=1 +export yD=1 +export RUN_MORI=1 +export BENCHMARK_COMBINATIONS="1024/1024 8192/1024 1024/8192" +export BENCHMARK_CON="8 16 32 64 128 256 512" +export BENCHMARK_ITR=1 +export LOG_PATH="/shared/amdgpu/home/ravgupta_qle/logs_mori" +export GLOO_SOCKET_IFNAME=enp193s0f1np1 +export NCCL_SOCKET_IFNAME=enp193s0f1np1 + +# MORI IO RDMA: use global IPv6 GID for routed RoCE over Ionic AINICs +export MORI_IB_GID_INDEX=1 +export MORI_IO_LOG_LEVEL=INFO + +export SLURM_JOB_ID=$JOBID +export SLURM_JOB_NODELIST="${NODE0},${NODE1}" +export SLURM_NNODES=2 +export SLURM_NTASKS=2 +export SLURM_NTASKS_PER_NODE=1 +export SLURM_SUBMIT_DIR="$(cd "$(dirname "$0")" && pwd)" + +echo "=== Launching MORI EP 1P/1D ===" +echo "Nodes: $NODE0, $NODE1" +echo "Model: $MODEL_NAME" +echo "Image: $DOCKER_IMAGE_NAME" +echo "" + +bash "$(dirname "$0")/run_xPyD_models.slurm" 2>&1 | tee log_mori_${MODEL_NAME}_1p1d.log diff --git a/scripts/vllm_dissag/run_xPyD_models.slurm b/scripts/vllm_dissag/run_xPyD_models.slurm index 87c9713..7985fc3 100755 --- a/scripts/vllm_dissag/run_xPyD_models.slurm +++ b/scripts/vllm_dissag/run_xPyD_models.slurm @@ -191,7 +191,7 @@ check_model_path() { echo "Checking $check_name: $path" # Run check on all nodes in parallel - srun --nodes=$SLURM_NNODES --ntasks=$SLURM_NNODES /bin/bash -c " + srun --overlap --nodes=$SLURM_NNODES --ntasks=$SLURM_NNODES /bin/bash -c " if [ -d '$path' ]; then echo \"\$(hostname): ✓ Found $path\" exit 0 @@ -212,24 +212,23 @@ check_model_path() { fi } -# Check /mnt/m2m_nobackup/models_blog first -MODEL_PATH_1="/mnt/m2m_nobackup/models_blog/$MODEL_NAME" -if check_model_path "$MODEL_PATH_1" "/mnt/m2m_nobackup/models_blog"; then - MODEL_PATH="$MODEL_PATH_1" +# Check MODEL_DIR first (user-provided), then standard paths +if check_model_path "$MODEL_DIR/$MODEL_NAME" "$MODEL_DIR"; then + MODEL_PATH="$MODEL_DIR/$MODEL_NAME" echo "" echo "✓ Selected MODEL_PATH: $MODEL_PATH (available on all nodes)" -# Check /shared-inference/models_blog -elif check_model_path "/shared_inference/models_blog/$MODEL_NAME" "/shared_inference/models_blog"; then - MODEL_PATH="/shared_inference/models_blog/$MODEL_NAME" +elif check_model_path "/mnt/m2m_nobackup/models_blog/$MODEL_NAME" 
"/mnt/m2m_nobackup/models_blog"; then + MODEL_PATH="/mnt/m2m_nobackup/models_blog/$MODEL_NAME" echo "" echo "✓ Selected MODEL_PATH: $MODEL_PATH (available on all nodes)" -elif check_model_path "$MODEL_DIR/$MODEL_NAME" "$MODEL_DIR"; then - MODEL_PATH="$MODEL_DIR/$MODEL_NAME" +elif check_model_path "/shared_inference/models_blog/$MODEL_NAME" "/shared_inference/models_blog"; then + MODEL_PATH="/shared_inference/models_blog/$MODEL_NAME" echo "" echo "✓ Selected MODEL_PATH: $MODEL_PATH (available on all nodes)" else echo "" echo "✗ FATAL ERROR: Model '$MODEL_NAME' not found on ALL allocated nodes in either:" + echo " - $MODEL_DIR/$MODEL_NAME" echo " - /mnt/m2m_nobackup/models_blog/$MODEL_NAME" echo " - /shared_inference/models_blog/$MODEL_NAME" echo "" @@ -322,16 +321,23 @@ echo "" # Node information USER_NAME=$(whoami) MASTER_NODE=$(echo "$SELECTED_NODES" | head -n 1) -MASTER_ADDR=$(srun --nodes=1 --ntasks=1 --time=00:20:00 --nodelist="$MASTER_NODE" bash -c 'hostname -I') -MASTER_ADDR=$(echo "$MASTER_ADDR" | awk 'NR==1 {print $1}') -MASTER_PORT=39566 # Choose an open port +# Prefer 10.x.x.x overlay IPs for inter-node communication. +_pick_overlay_ip() { + local raw + raw=$(srun --overlap --nodes=1 --ntasks=1 --time=00:20:00 --nodelist="$1" bash -c 'hostname -I') + local ip + for ip in $raw; do + case "$ip" in 10.*) echo "$ip"; return ;; esac + done + echo "$raw" | awk 'NR==1 {print $1}' +} -IPS=() +MASTER_ADDR=$(_pick_overlay_ip "$MASTER_NODE") +MASTER_PORT=39566 +IPS=() for NODE in $SELECTED_NODES; do - IP=$(srun --nodes=1 --ntasks=1 --time=00:20:00 --nodelist="$NODE" bash -c 'hostname -I') - IP=$(echo "$IP" | awk 'NR==1 {print $1}') - IPS+=("$IP") + IPS+=("$(_pick_overlay_ip "$NODE")") done echo "Selected node IPs: ${IPS[*]}" | sed 's/ /,/g' @@ -378,32 +384,37 @@ export RUN_FILE_FULL="$NIXL_COOKBOOK_PATH/${RUN_FILE}" # Use only the selected nodes for srun execution SELECTED_NODELIST_SRUN=$(echo "$SELECTED_NODES" | paste -sd,) -srun --nodelist="$SELECTED_NODELIST_SRUN" bash -c ' +srun --overlap --nodelist="$SELECTED_NODELIST_SRUN" bash -c ' echo "Rank $SLURM_PROCID on $(hostname)"; -docker ps -q | xargs --no-run-if-empty docker stop; +podman ps -q | xargs --no-run-if-empty podman stop 2>/dev/null || true; fuser -k 5000/tcp 2>/dev/null || true; fuser -k 2222/tcp 2>/dev/null || true; fuser -k 15000/tcp 2>/dev/null || true; +fuser -k 20005/tcp 2>/dev/null || true; +fuser -k 10001/tcp 2>/dev/null || true; sleep 2; -docker pull $DOCKER_IMAGE_NAME 2>/dev/null || true; + +# Set VLLM_HOST_IP to this node'\''s overlay IP so MoRIIO uses routable addresses +_VLLM_HOST_IP=$(echo "$IPADDRS" | tr "," "\n" | sed -n "$((SLURM_PROCID + 1))p") +echo "[vllm-host-ip] SLURM_PROCID=$SLURM_PROCID VLLM_HOST_IP=$_VLLM_HOST_IP" # --- Build host RDMA library mounts --- -# Mount the host MLNX OFED userspace libraries into the container so that -# libmlx5 / libibverbs always match the host kernel module, preventing -# mlx5dv_devx_alloc_uar failures from ABI mismatches. 
_RDMA_MOUNTS="" _LIBDIR=/usr/lib/x86_64-linux-gnu -for _lib in libmlx5.so libmlx5.so.1 libibverbs.so libibverbs.so.1 librdmacm.so librdmacm.so.1; do +for _lib in libmlx5.so libmlx5.so.1 libibverbs.so libibverbs.so.1 librdmacm.so librdmacm.so.1 libionic.so libionic.so.1; do [ -e "$_LIBDIR/$_lib" ] && _RDMA_MOUNTS="$_RDMA_MOUNTS -v $_LIBDIR/$_lib:$_LIBDIR/$_lib:ro" done -for _vlib in $_LIBDIR/libmlx5.so.1.* $_LIBDIR/libibverbs.so.1.* $_LIBDIR/librdmacm.so.1.*; do +for _vlib in $_LIBDIR/libmlx5.so.1.* $_LIBDIR/libibverbs.so.1.* $_LIBDIR/librdmacm.so.1.* $_LIBDIR/libionic.so.1.*; do [ -e "$_vlib" ] && _RDMA_MOUNTS="$_RDMA_MOUNTS -v $_vlib:$_vlib:ro" done [ -d "$_LIBDIR/libibverbs" ] && _RDMA_MOUNTS="$_RDMA_MOUNTS -v $_LIBDIR/libibverbs:$_LIBDIR/libibverbs:ro" [ -d /etc/libibverbs.d ] && _RDMA_MOUNTS="$_RDMA_MOUNTS -v /etc/libibverbs.d:/etc/libibverbs.d:ro" + +# Ionic RDMA: if libibverbs dir is already mounted from host, the ionic +# provider inside it is included automatically; no extra mounts needed. echo "[host-rdma] mounts: $_RDMA_MOUNTS" -docker run --rm \ +podman run --rm \ --device /dev/dri \ --device /dev/kfd \ --device /dev/infiniband \ @@ -414,10 +425,11 @@ docker run --rm \ --security-opt seccomp=unconfined \ --privileged \ -v $HOME:$HOME \ - -v /shared_inference:/shared_inference \ - -v /mnt/m2m_nobackup:/mnt/m2m_nobackup \ + $([ -d /shared_inference ] && echo "-v /shared_inference:/shared_inference") \ + $([ -d /shared/amdgpu ] && echo "-v /shared/amdgpu:/shared/amdgpu") \ + $([ -d /mnt/m2m_nobackup ] && echo "-v /mnt/m2m_nobackup:/mnt/m2m_nobackup") \ + -v /sys:/sys:ro \ -v $HOME/.ssh:/root/.ssh \ - --shm-size 64G \ --ulimit nofile=524288:524288 \ -v ${LOG_PATH}:/run_logs \ -v $NIXL_REPO_DIR:$NIXL_COOKBOOK_PATH \ @@ -447,11 +459,20 @@ docker run --rm \ -e ENABLE_DBO=${ENABLE_DBO:-} \ -e DBO_COMM_SMS=${DBO_COMM_SMS:-} \ -e ENABLE_PROFILING=${ENABLE_PROFILING:-} \ + -e GLOO_SOCKET_IFNAME=${GLOO_SOCKET_IFNAME:-eth0} \ + -e NCCL_SOCKET_IFNAME=${NCCL_SOCKET_IFNAME:-eth0} \ + -e MORI_GPU_ARCHS=${MORI_GPU_ARCHS:-gfx950} \ + -e MORI_RDMA_DEVICES="${MORI_RDMA_DEVICES:-}" \ + -e MORI_IB_GID_INDEX="${MORI_IB_GID_INDEX:-}" \ + -e MORI_IO_LOG_LEVEL="${MORI_IO_LOG_LEVEL:-INFO}" \ + -e VLLM_HOST_IP=$_VLLM_HOST_IP \ + -e PYTHONUNBUFFERED=1 \ --name $DOCKER_CONT_NAME \ $DOCKER_IMAGE_NAME -c " mkdir -p /run_logs/${SLURM_JOB_ID} + rm -rf /root/.triton/cache/ /tmp/triton* /tmp/torchinductor_root/ $RUN_FILE_FULL 2>&1 | tee /run_logs/${SLURM_JOB_ID}/pd_vllm_bench_NODE${SLURM_PROCID}.log " ' -srun --nodelist="$SELECTED_NODELIST_SRUN" bash -c 'docker stop $DOCKER_CONT_NAME 2>/dev/null || true; docker rm $DOCKER_CONT_NAME 2>/dev/null || true' +srun --overlap --nodelist="$SELECTED_NODELIST_SRUN" bash -c 'podman stop $DOCKER_CONT_NAME 2>/dev/null || true; podman rm $DOCKER_CONT_NAME 2>/dev/null || true' diff --git a/scripts/vllm_dissag/vllm_disagg_mori_ep.sh b/scripts/vllm_dissag/vllm_disagg_mori_ep.sh index db5dd57..e96dacd 100755 --- a/scripts/vllm_dissag/vllm_disagg_mori_ep.sh +++ b/scripts/vllm_dissag/vllm_disagg_mori_ep.sh @@ -59,7 +59,12 @@ echo "PREFILL_DP_START_RANK=${PREFILL_DP_START_RANK}" echo "PREFILL_MASTER_ADDR=${PREFILL_MASTER_ADDR}" echo "DECODE_DP_START_RANK=${DECODE_DP_START_RANK}" echo "DECODE_MASTER_ADDR=${DECODE_MASTER_ADDR}" -host_ip=$(hostname -I | awk '{print $1}') +# Prefer 10.x.x.x overlay IP for inter-node communication +host_ip="" +for _ip in $(hostname -I); do + case "$_ip" in 10.*) host_ip="$_ip"; break ;; esac +done +[ -z "$host_ip" ] && host_ip=$(hostname -I | awk '{print 
$1}') host_name=$(hostname) # ============================================================================= @@ -74,10 +79,24 @@ setup_mori_env() { export VLLM_ROCM_USE_AITER_MLA=1 export VLLM_ROCM_USE_AITER_FUSION_SHARED_EXPERTS=0 export VLLM_ALL2ALL_BACKEND=mori - export GLOO_SOCKET_IFNAME=eth0 + export GLOO_SOCKET_IFNAME=${GLOO_SOCKET_IFNAME:-eth0} export VLLM_ENGINE_READY_TIMEOUT_S=3600 export VLLM_RINGBUFFER_WARNING_INTERVAL=3600 export VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS=3600 + export MORI_GPU_ARCHS=${MORI_GPU_ARCHS:-gfx950} + export MORI_SHMEM_HEAP_SIZE=${MORI_SHMEM_HEAP_SIZE:-6442450944} + export ROCSHMEM_TEST_UUID=1 + + # MORI IO RDMA configuration for Ionic AINIC + if [ -n "${MORI_RDMA_DEVICES:-}" ]; then + export MORI_RDMA_DEVICES="${MORI_RDMA_DEVICES}" + fi + if [ -n "${MORI_IB_GID_INDEX:-}" ]; then + export MORI_IB_GID_INDEX="${MORI_IB_GID_INDEX}" + fi + if [ -n "${MORI_IO_LOG_LEVEL:-}" ]; then + export MORI_IO_LOG_LEVEL="${MORI_IO_LOG_LEVEL}" + fi } build_kv_transfer_config() { From 7f25e71ba9acbe9924009bc700287b435c209956 Mon Sep 17 00:00:00 2001 From: Ravi Gupta Date: Thu, 16 Apr 2026 09:46:00 +0000 Subject: [PATCH 2/2] [DistInf] Enable multi-node 2P/2D MoRI EP disaggregated inference on Ionic AINIC Extends the Ionic AINIC RDMA support to multi-node disaggregated inference with 2 Prefill + 2 Decode nodes (DP=16). Key changes: - Remove 1P/1D restriction from run_xPyD_models.slurm and vllm_disagg_mori_ep.sh to allow xP>1 / yD>1 topologies - Add --ulimit memlock=-1:-1 to podman for large RDMA memory registrations (>32GB) required by MoRI IO - Pass NCCL_IB_HCA, NCCL_IB_GID_INDEX, NCCL_NET_GDR_LEVEL, NCCL_CROSS_NIC, and MORI_SOCKET_IFNAME into containers for proper multi-node RCCL and MoRI bootstrap over Ionic AINICs - Add apply_moriio_2pd_patches.sh for runtime vLLM patches (PR vllm-project/vllm#39276) fixing engine_id collisions and MoRIIO robustness in multi-node DP configurations - Restrict --kv-transfer-config to master nodes only (child nodes join via --headless and participate in EP all-to-all) - Add launch_mori_2p2d.sh example launcher for 2P/2D benchmarks Tested on AAC MI355X cluster with Ionic RDMA NICs achieving balanced RDMA traffic across all 4 nodes and 1,344 tok/s total throughput on DeepSeek-V3-5layer. Made-with: Cursor --- .../vllm_dissag/apply_moriio_2pd_patches.sh | 331 ++++++++++++++++++ scripts/vllm_dissag/launch_mori_2p2d.sh | 69 ++++ scripts/vllm_dissag/run_xPyD_models.slurm | 11 +- scripts/vllm_dissag/vllm_disagg_mori_ep.sh | 31 +- 4 files changed, 431 insertions(+), 11 deletions(-) create mode 100755 scripts/vllm_dissag/apply_moriio_2pd_patches.sh create mode 100755 scripts/vllm_dissag/launch_mori_2p2d.sh diff --git a/scripts/vllm_dissag/apply_moriio_2pd_patches.sh b/scripts/vllm_dissag/apply_moriio_2pd_patches.sh new file mode 100755 index 0000000..984b408 --- /dev/null +++ b/scripts/vllm_dissag/apply_moriio_2pd_patches.sh @@ -0,0 +1,331 @@ +#!/bin/bash +# apply_moriio_2pd_patches.sh — Runtime patches for multi-node disagg DP (PR #39276) +# Applied at container startup before vLLM launches. +# Once PR #39276 is merged upstream, this script becomes a no-op. +set -euo pipefail + +VLLM_ROOT=$(python3 -c "import vllm, os; print(os.path.dirname(vllm.__file__))") +MORIIO_DIR="${VLLM_ROOT}/distributed/kv_transfer/kv_connector/v1/moriio" +ENGINE_DIR="${VLLM_ROOT}/v1/engine" + +echo "[patch] vLLM root: ${VLLM_ROOT}" +echo "[patch] Applying PR #39276 runtime patches for multi-node disagg DP..." 
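+#
+# Background for Patches 1-2: with multi-node data parallelism every node
+# numbers its engines from local rank 0, so building engine_id from the
+# *local* DP rank can give two nodes the same id (both advertise "..._dp0")
+# and their MoRIIO handshakes collide. Keying off the global dp_rank keeps
+# engine ids unique across the whole DP group.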
+
+# --------------------------------------------------------------------------
+# Patch 1: core.py — engine_id: local_dp_rank → dp_rank
+# --------------------------------------------------------------------------
+CORE_PY="${ENGINE_DIR}/core.py"
+if grep -q 'engine_id}_dp{local_dp_rank}' "$CORE_PY" 2>/dev/null; then
+    sed -i 's/engine_id}_dp{local_dp_rank}/engine_id}_dp{dp_rank}/g' "$CORE_PY"
+    echo "[patch] core.py: engine_id fixed (local_dp_rank → dp_rank)"
+else
+    echo "[patch] core.py: already patched or not applicable"
+fi
+
+# --------------------------------------------------------------------------
+# Patch 2: utils.py — engine_id: local_index → index
+# --------------------------------------------------------------------------
+UTILS_PY="${ENGINE_DIR}/utils.py"
+if grep -q 'engine_id}_dp{local_index}' "$UTILS_PY" 2>/dev/null; then
+    sed -i 's/engine_id}_dp{local_index}/engine_id}_dp{index}/g' "$UTILS_PY"
+    echo "[patch] utils.py: engine_id fixed (local_index → index)"
+else
+    echo "[patch] utils.py: already patched or not applicable"
+fi
+
+# --------------------------------------------------------------------------
+# Patch 3: moriio_common.py — Use data_parallel_size_local and local dp_rank
+# --------------------------------------------------------------------------
+COMMON_PY="${MORIIO_DIR}/moriio_common.py"
+
+# 3a: dp_rank = ... % data_parallel_size_local
+if grep -q 'dp_rank = vllm_config.parallel_config.data_parallel_rank$' "$COMMON_PY" 2>/dev/null; then
+    sed -i 's/dp_rank = vllm_config.parallel_config.data_parallel_rank$/dp_rank = (vllm_config.parallel_config.data_parallel_rank % vllm_config.parallel_config.data_parallel_size_local)/' "$COMMON_PY"
+    echo "[patch] moriio_common.py: dp_rank uses local modulo"
+else
+    echo "[patch] moriio_common.py: dp_rank already patched"
+fi
+
+# 3b: dp_size = data_parallel_size_local
+if grep -q 'dp_size = vllm_config.parallel_config.data_parallel_size$' "$COMMON_PY" 2>/dev/null; then
+    sed -i 's/dp_size = vllm_config.parallel_config.data_parallel_size$/dp_size = vllm_config.parallel_config.data_parallel_size_local/' "$COMMON_PY"
+    echo "[patch] moriio_common.py: dp_size uses local size"
+else
+    echo "[patch] moriio_common.py: dp_size already patched"
+fi
+
+# 3c: Default ports for remote_handshake_port and remote_notify_port
+if grep -q 'remote_handshake_port=kv_transfer_params\["remote_handshake_port"\]' "$COMMON_PY" 2>/dev/null; then
+    sed -i 's/remote_handshake_port=kv_transfer_params\["remote_handshake_port"\]/remote_handshake_port=kv_transfer_params.get("remote_handshake_port", 8405)/' "$COMMON_PY"
+    sed -i 's/remote_notify_port=kv_transfer_params\["remote_notify_port"\]/remote_notify_port=kv_transfer_params.get("remote_notify_port", 61005)/' "$COMMON_PY"
+    echo "[patch] moriio_common.py: default ports added"
+else
+    echo "[patch] moriio_common.py: default ports already patched"
+fi
+
+# --------------------------------------------------------------------------
+# Patch 4: moriio_connector.py — Full multi-node DP fixes
+# --------------------------------------------------------------------------
+CONNECTOR_PY="${MORIIO_DIR}/moriio_connector.py"
+
+# 4a: Add _is_kv_master flag, _req_kv_params cache, and local dp_rank.
+# Pass the target file as argv[1]; "python3 -" reads the program from stdin,
+# so the heredoc script knows which file to patch.
+python3 - "$CONNECTOR_PY" << 'PYEOF' || echo "[patch] WARNING: connector patch script had errors"
+import sys
+
+fpath = sys.argv[1] if len(sys.argv) > 1 else ""
+if not fpath:
+    sys.exit(1)
+
+with open(fpath, "r") as f:
+    src = f.read()
+
+changed = False
+
+# Fix: dp_rank use local modulo in scheduler connector
+old = "self.dp_rank = 
self.vllm_config.parallel_config.data_parallel_rank\n" +new = ("self.dp_rank = (self.vllm_config.parallel_config.data_parallel_rank\n" + " % self.vllm_config.parallel_config\n" + " .data_parallel_size_local)\n" + " self._is_kv_master = (\n" + " self.vllm_config.parallel_config.data_parallel_rank\n" + " < self.vllm_config.parallel_config.data_parallel_size_local)\n") +if old in src and "_is_kv_master" not in src: + src = src.replace(old, new, 1) + changed = True + print("[patch] moriio_connector.py: _is_kv_master + local dp_rank added") + +# Fix: Add _req_kv_params dict +old2 = "self._reqs_need_save: dict[ReqId, tuple[Request, list[int]]] = {}" +new2 = (old2 + "\n self._req_kv_params: dict[ReqId, dict] = {}") +if old2 in src and "_req_kv_params" not in src: + src = src.replace(old2, new2, 1) + changed = True + print("[patch] moriio_connector.py: _req_kv_params dict added") + +# Fix: Cache kv_transfer_params on do_remote_decode +old3 = 'self._reqs_need_save[request.request_id] = (request, local_block_ids)' +new3 = (old3 + '\n self._req_kv_params[request.request_id] = dict(params)') +if old3 in src and '_req_kv_params[request.request_id] = dict(params)' not in src: + src = src.replace(old3, new3, 1) + changed = True + print("[patch] moriio_connector.py: cached kv_params for do_remote_decode") + +# Fix: Guard send_notify_block with _is_kv_master +old4 = " for tp_index in range(self.tp_size):\n target_port = request.kv_transfer_params[\n" +new4 = " if self._is_kv_master:\n for tp_index in range(self.tp_size):\n target_port = request.kv_transfer_params[\n" +if old4 in src and "if self._is_kv_master:" not in src: + src = src.replace(old4, new4, 1) + changed = True + print("[patch] moriio_connector.py: send_notify_block guarded with _is_kv_master") + +# Fix: block_size assertion → graceful override +old5 = " assert block_size == self.block_size" +new5 = (" if block_size != self.block_size:\n" + " logger.info(\n" + ' "KV cache block_size=%d differs from config block_size=%d; "\n' + ' "using actual tensor shape (attention backend override).",\n' + " block_size, self.block_size)\n" + " self.block_size = block_size") +if old5 in src: + src = src.replace(old5, new5, 1) + changed = True + print("[patch] moriio_connector.py: block_size assert → graceful override") + +# Fix: Use cached _req_kv_params in build_connector_meta for _reqs_need_recv +old6 = " for req_id, (req, block_ids) in self._reqs_need_recv.items():\n assert req.kv_transfer_params is not None\n" +new6 = (" for req_id, (req, block_ids) in self._reqs_need_recv.items():\n" + " kv_params = self._req_kv_params.get(\n" + " req_id, req.kv_transfer_params or {}\n" + " )\n") +if old6 in src: + src = src.replace(old6, new6, 1) + changed = True + print("[patch] moriio_connector.py: _reqs_need_recv uses cached params") + +# Fix: Use cached _req_kv_params for _reqs_need_save +old7 = " for req_id, (req, block_ids) in self._reqs_need_save.items():\n assert req.kv_transfer_params is not None\n" +new7 = (" for req_id, (req, block_ids) in self._reqs_need_save.items():\n" + " kv_params = self._req_kv_params.get(\n" + " req_id, req.kv_transfer_params or {}\n" + " )\n") +if old7 in src: + src = src.replace(old7, new7, 1) + changed = True + print("[patch] moriio_connector.py: _reqs_need_save uses cached params") + +# Fix: Add _recving_transfers_start dict +old8 = "self._recving_transfers_callback_addr: dict[ReqId, tuple[str, str]] = {}" +new8 = (old8 + "\n self._recving_transfers_start: dict[str, float] = {}") +if old8 in src and "_recving_transfers_start" not 
in src:
+    src = src.replace(old8, new8, 1)
+    changed = True
+    print("[patch] moriio_connector.py: _recving_transfers_start dict added")
+
+if changed:
+    with open(fpath, "w") as f:
+        f.write(src)
+    print("[patch] moriio_connector.py: all patches applied")
+else:
+    print("[patch] moriio_connector.py: already patched or no changes needed")
+PYEOF
+
+# --------------------------------------------------------------------------
+# Patch 5: moriio_engine.py — Timeouts, failure detection, ZMQ retry
+# --------------------------------------------------------------------------
+ENGINE_PY="${MORIIO_DIR}/moriio_engine.py"
+
+# 5a: Add imports (os, time)
+if ! grep -q '^import os$' "$ENGINE_PY" 2>/dev/null; then
+    sed -i '1,/^import threading/s/^import threading/import os\nimport threading\nimport time/' "$ENGINE_PY"
+    echo "[patch] moriio_engine.py: added os/time imports"
+fi
+
+# 5b: Deferred task timeout (target file passed as argv[1], as in Patch 4)
+if ! grep -q 'VLLM_MORIIO_DEFERRED_TIMEOUT_S' "$ENGINE_PY" 2>/dev/null; then
+    python3 - "$ENGINE_PY" << 'PYEOF2' || echo "[patch] WARNING: engine patch script had errors"
+import sys
+
+fpath = sys.argv[1] if len(sys.argv) > 1 else ""
+if not fpath:
+    sys.exit(1)
+
+with open(fpath, "r") as f:
+    src = f.read()
+
+changed = False
+
+# Deferred task timeout
+old = """        still_deferred: list[WriteTask] = []
+        for task in self._deferred_tasks:
+            if self._is_remote_ready(task):
+                self._execute_write_task(task)
+            else:
+                still_deferred.append(task)"""
+
+new = """        _defer_timeout = int(
+            os.environ.get("VLLM_MORIIO_DEFERRED_TIMEOUT_S", "300"))
+        still_deferred: list[WriteTask] = []
+        _now = time.monotonic()
+        for task in self._deferred_tasks:
+            if self._is_remote_ready(task):
+                self._execute_write_task(task)
+            elif (hasattr(task, '_defer_time')
+                  and (_now - task._defer_time) > _defer_timeout):
+                logger.error(
+                    "Deferred write task EXPIRED for req %s "
+                    "(transfer %s) after %ds",
+                    task.request_id, task.transfer_id, _defer_timeout)
+            else:
+                if not hasattr(task, '_defer_time'):
+                    task._defer_time = _now
+                still_deferred.append(task)"""
+
+if old in src:
+    src = src.replace(old, new, 1)
+    changed = True
+    print("[patch] moriio_engine.py: deferred task timeout added")
+
+# Transfer timeout (replace blocking Wait)
+old2 = """        for status in transfers_to_wait:
+            try:
+                status.Wait()
+                if not status.Succeeded():
+                    logger.error(
+                        "Transfer failed: %s, Code: %s", status.Message(), status.Code()
+                    )
+                    raise TransferError("MoRIIO transfer failed!")
+            except Exception as e:
+                logger.error("Transfer %s failed: %s", status, e)
+                raise"""
+
+new2 = """        _xfer_timeout = int(
+            os.environ.get("VLLM_MORIIO_TRANSFER_TIMEOUT_S", "120"))
+        for status in transfers_to_wait:
+            _deadline = time.monotonic() + _xfer_timeout
+            while status.InProgress():
+                if time.monotonic() > _deadline:
+                    logger.error(
+                        "RDMA write timed out after %ds "
+                        "(Code=%s, Msg=%s)",
+                        _xfer_timeout, status.Code(),
+                        status.Message())
+                    break
+                time.sleep(0.001)
+            if status.Failed():
+                logger.error(
+                    "Transfer failed: %s, Code: %s",
+                    status.Message(), status.Code())
+            elif not status.Succeeded():
+                logger.error(
+                    "Transfer did not succeed "
+                    "(timeout or unknown state, Code=%s)",
+                    status.Code())"""
+
+if old2 in src:
+    src = src.replace(old2, new2, 1)
+    changed = True
+    print("[patch] moriio_engine.py: transfer timeout added")
+
+# ZMQ retry
+old3 = """            sock = self.paths[path]
+            try:
+                for req_id in req_list:
+                    if not isinstance(req_id, str):
+                        logger.warning(
+                            "Invalid req_id type: %s, expected str", 
type(req_id)
+                        )
+                        continue
+                    sock.send(req_id.encode("utf-8"))
+            except Exception as e:
+                logger.error("Failed to send notification to %s: %s", path, e)
+                self.paths.pop(path, None)
+                raise"""
+
+new3 = """            sock = self.paths[path]
+            _MAX_RETRIES = 3
+            for req_id in req_list:
+                if not isinstance(req_id, str):
+                    logger.warning(
+                        "Invalid req_id type: %s, expected str", type(req_id))
+                    continue
+                for _attempt in range(_MAX_RETRIES):
+                    try:
+                        sock.send(req_id.encode("utf-8"), zmq.NOBLOCK)
+                        break
+                    except zmq.Again:
+                        if _attempt < _MAX_RETRIES - 1:
+                            time.sleep(0.01 * (_attempt + 1))
+                            logger.warning(
+                                "ZMQ send retry %d for req %s to %s",
+                                _attempt + 1, req_id, path)
+                        else:
+                            logger.error(
+                                "ZMQ send FAILED after %d retries "
+                                "for req %s to %s",
+                                _MAX_RETRIES, req_id, path)
+                    except Exception as e:
+                        logger.error(
+                            "Failed to send notification to %s: %s",
+                            path, e)
+                        self.paths.pop(path, None)
+                        raise"""
+
+if old3 in src:
+    src = src.replace(old3, new3, 1)
+    changed = True
+    print("[patch] moriio_engine.py: ZMQ retry added")
+
+if changed:
+    with open(fpath, "w") as f:
+        f.write(src)
+    print("[patch] moriio_engine.py: all patches applied")
+else:
+    print("[patch] moriio_engine.py: already patched or no changes needed")
+PYEOF2
+else
+    echo "[patch] moriio_engine.py: already patched"
+fi
+
+echo "[patch] All PR #39276 patches applied successfully."
diff --git a/scripts/vllm_dissag/launch_mori_2p2d.sh b/scripts/vllm_dissag/launch_mori_2p2d.sh
new file mode 100755
index 0000000..0058a3b
--- /dev/null
+++ b/scripts/vllm_dissag/launch_mori_2p2d.sh
@@ -0,0 +1,69 @@
+#!/bin/bash
+# launch_mori_2p2d.sh — Example launcher for 2-Prefill / 2-Decode MoRI EP
+# disaggregated inference over Ionic AINIC RDMA on AMD MI355X clusters.
+#
+# Prerequisites:
+#   1. 4-node SLURM allocation with full CPUs:
+#        sbatch --wrap="sleep infinity" -N4 --cpus-per-task=256 \
+#          --nodelist=<node0>,<node1>,<node2>,<node3> ...
+#   2. Container image loaded on all nodes (via podman):
+#        localhost/mad-mori-ep:gfx950-v2
+#   3. Model downloaded to shared storage.
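+#      (run_xPyD_models.slurm resolves $MODEL_DIR/$MODEL_NAME first, then
+#      falls back to the models_blog paths)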
+# +# Usage: Edit JOBID and NODE0-3 below, then: +# bash launch_mori_2p2d.sh +set -euo pipefail + +# ── Cluster-specific settings (edit these) ─────────────────────────────────── +JOBID=${JOBID:-1346} +NODE0=${NODE0:-mi355-gpu-39} # Prefill master + proxy +NODE1=${NODE1:-mi355-gpu-40} # Prefill child +NODE2=${NODE2:-mi355-gpu-51} # Decode master +NODE3=${NODE3:-mi355-gpu-55} # Decode child + +export DOCKER_IMAGE_NAME="${DOCKER_IMAGE_NAME:-localhost/mad-mori-ep:gfx950-v2}" +export MODEL_NAME="${MODEL_NAME:-DeepSeek-V3-5layer}" +export MODEL_DIR="${MODEL_DIR:-/shared/amdgpu/home/ravgupta_qle/models}" + +# ── Disaggregated inference topology ───────────────────────────────────────── +export xP=2 +export yD=2 +export RUN_MORI=1 +export BENCHMARK_COMBINATIONS="${BENCHMARK_COMBINATIONS:-512/512}" +export BENCHMARK_CON="${BENCHMARK_CON:-8 16}" +export BENCHMARK_ITR="${BENCHMARK_ITR:-1}" +export LOG_PATH="${LOG_PATH:-/shared/amdgpu/home/ravgupta_qle/logs_mori}" + +# ── Network interface for TCP control plane ────────────────────────────────── +export GLOO_SOCKET_IFNAME=${GLOO_SOCKET_IFNAME:-enp193s0f1np1} +export NCCL_SOCKET_IFNAME=${NCCL_SOCKET_IFNAME:-enp193s0f1np1} +export MORI_SOCKET_IFNAME=${MORI_SOCKET_IFNAME:-enp193s0f1np1} + +# ── NCCL/RCCL multi-node IB transport over Ionic AINICs ───────────────────── +# Exclude Broadcom frontend NICs whose GID[1] is fe80:: (link-local, not routable) +export NCCL_IB_HCA="${NCCL_IB_HCA:-^rocep193s0f0,rocep193s0f1}" +export NCCL_IB_GID_INDEX=${NCCL_IB_GID_INDEX:-1} +export NCCL_NET_GDR_LEVEL=${NCCL_NET_GDR_LEVEL:-3} +export NCCL_CROSS_NIC=${NCCL_CROSS_NIC:-1} +export NCCL_DEBUG=${NCCL_DEBUG:-WARN} + +# ── MORI IO RDMA configuration ────────────────────────────────────────────── +export MORI_IB_GID_INDEX=${MORI_IB_GID_INDEX:-1} +export MORI_IO_LOG_LEVEL=${MORI_IO_LOG_LEVEL:-INFO} + +# ── SLURM environment (synthesized for the launcher) ───────────────────────── +export SLURM_JOB_ID=$JOBID +export SLURM_JOB_NODELIST="${NODE0},${NODE1},${NODE2},${NODE3}" +export SLURM_NNODES=4 +export SLURM_NTASKS=4 +export SLURM_NTASKS_PER_NODE=1 +export SLURM_SUBMIT_DIR="$(cd "$(dirname "$0")" && pwd)" + +echo "=== Launching MORI EP 2P/2D ===" +echo "Nodes: ${NODE0} (prefill master+proxy), ${NODE1} (prefill child)," +echo " ${NODE2} (decode master), ${NODE3} (decode child)" +echo "Model: $MODEL_NAME" +echo "Image: $DOCKER_IMAGE_NAME" +echo "" + +bash "$(dirname "$0")/run_xPyD_models.slurm" 2>&1 | tee log_mori_${MODEL_NAME}_2p2d.log diff --git a/scripts/vllm_dissag/run_xPyD_models.slurm b/scripts/vllm_dissag/run_xPyD_models.slurm index 7985fc3..97c6816 100755 --- a/scripts/vllm_dissag/run_xPyD_models.slurm +++ b/scripts/vllm_dissag/run_xPyD_models.slurm @@ -122,10 +122,6 @@ if [[ "$_run_mori" == "1" && "$_run_deepep" == "1" ]]; then fi if [[ "$_run_mori" == "1" ]]; then - if (( xP != 1 || yD != 1 )); then - echo "Error: RUN_MORI=1 requires xP=1 and yD=1 (got xP=$xP, yD=$yD). Multi-node MoRI EP is not yet supported." 
>&2 - exit 1 - fi if model_allows_mori_ep "$MODEL_NAME"; then RUN_FILE="vllm_disagg_mori_ep.sh" echo "RUN_MORI=1: using $RUN_FILE for model '$MODEL_NAME'" @@ -431,6 +427,7 @@ podman run --rm \ -v /sys:/sys:ro \ -v $HOME/.ssh:/root/.ssh \ --ulimit nofile=524288:524288 \ + --ulimit memlock=-1:-1 \ -v ${LOG_PATH}:/run_logs \ -v $NIXL_REPO_DIR:$NIXL_COOKBOOK_PATH \ $_RDMA_MOUNTS \ @@ -461,6 +458,12 @@ podman run --rm \ -e ENABLE_PROFILING=${ENABLE_PROFILING:-} \ -e GLOO_SOCKET_IFNAME=${GLOO_SOCKET_IFNAME:-eth0} \ -e NCCL_SOCKET_IFNAME=${NCCL_SOCKET_IFNAME:-eth0} \ + -e NCCL_IB_HCA="${NCCL_IB_HCA:-}" \ + -e NCCL_IB_GID_INDEX=${NCCL_IB_GID_INDEX:-} \ + -e NCCL_NET_GDR_LEVEL=${NCCL_NET_GDR_LEVEL:-} \ + -e NCCL_CROSS_NIC=${NCCL_CROSS_NIC:-} \ + -e NCCL_DEBUG=${NCCL_DEBUG:-WARN} \ + -e MORI_SOCKET_IFNAME=${MORI_SOCKET_IFNAME:-} \ -e MORI_GPU_ARCHS=${MORI_GPU_ARCHS:-gfx950} \ -e MORI_RDMA_DEVICES="${MORI_RDMA_DEVICES:-}" \ -e MORI_IB_GID_INDEX="${MORI_IB_GID_INDEX:-}" \ diff --git a/scripts/vllm_dissag/vllm_disagg_mori_ep.sh b/scripts/vllm_dissag/vllm_disagg_mori_ep.sh index e96dacd..32af0e0 100755 --- a/scripts/vllm_dissag/vllm_disagg_mori_ep.sh +++ b/scripts/vllm_dissag/vllm_disagg_mori_ep.sh @@ -14,10 +14,8 @@ MODEL_PATH=$MODEL_PATH MODEL_NAME="${MODEL_NAME:-}" xP="${xP:-1}" yD="${yD:-1}" -if [ "$xP" -gt 1 ] || [ "$yD" -gt 1 ]; then - echo "Error: xP > 1 or yD > 1 is not supported yet due to MoRI IO connector issues." >&2 - exit 1 -fi +# Multi-node DP (xP>1 or yD>1) requires PR #39276 patches. +# Patches are applied at runtime by apply_moriio_2pd_patches.sh. IPADDRS="${IPADDRS:-localhost}" IFS=',' read -ra IP_ARRAY <<< "${IPADDRS}" @@ -67,6 +65,18 @@ done [ -z "$host_ip" ] && host_ip=$(hostname -I | awk '{print $1}') host_name=$(hostname) +# ============================================================================= +# Apply PR #39276 patches for multi-node DP (idempotent) +# ============================================================================= +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +if [ -f "${SCRIPT_DIR}/apply_moriio_2pd_patches.sh" ]; then + bash "${SCRIPT_DIR}/apply_moriio_2pd_patches.sh" +elif [ -f "/app/scripts/apply_moriio_2pd_patches.sh" ]; then + bash "/app/scripts/apply_moriio_2pd_patches.sh" +else + echo "[WARNING] apply_moriio_2pd_patches.sh not found — skipping runtime patches" +fi + # ============================================================================= # Helper Functions # ============================================================================= @@ -80,6 +90,9 @@ setup_mori_env() { export VLLM_ROCM_USE_AITER_FUSION_SHARED_EXPERTS=0 export VLLM_ALL2ALL_BACKEND=mori export GLOO_SOCKET_IFNAME=${GLOO_SOCKET_IFNAME:-eth0} + if [ -n "${MORI_SOCKET_IFNAME:-}" ]; then + export MORI_SOCKET_IFNAME="${MORI_SOCKET_IFNAME}" + fi export VLLM_ENGINE_READY_TIMEOUT_S=3600 export VLLM_RINGBUFFER_WARNING_INTERVAL=3600 export VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS=3600 @@ -128,8 +141,12 @@ launch_vllm_worker() { extra_args+=(--data-parallel-start-rank "${dp_start_rank}" --headless) fi - local kv_config - kv_config=$(build_kv_transfer_config "${kv_role}") + local kv_args=() + if [[ "$role" == "master" ]]; then + local kv_config + kv_config=$(build_kv_transfer_config "${kv_role}") + kv_args+=(--kv-transfer-config "${kv_config}") + fi vllm serve ${MODEL_PATH} \ -tp 1 \ @@ -147,7 +164,7 @@ launch_vllm_worker() { --trust-remote-code \ --enforce-eager \ "${extra_args[@]}" \ - --kv-transfer-config "${kv_config}" \ + "${kv_args[@]}" \ 2>&1 | tee 
/run_logs/${SLURM_JOB_ID}/${log_prefix}_NODE${NODE_RANK}.log >/dev/null & WORKER_PID=$!