From 63c360eed0a29365ad437069a423e79381015ba1 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 5 Feb 2026 04:37:18 -0800 Subject: [PATCH] Add hybrid Slurm support to rrun with PMIx-based coordination --- cmake/thirdparty/get_pmix.cmake | 210 +++ .../all_cuda-129_arch-aarch64.yaml | 1 + .../all_cuda-129_arch-x86_64.yaml | 1 + .../all_cuda-131_arch-aarch64.yaml | 1 + .../all_cuda-131_arch-x86_64.yaml | 1 + conda/recipes/librapidsmpf/recipe.yaml | 5 +- cpp/CMakeLists.txt | 21 + cpp/benchmarks/bench_comm.cpp | 2 +- cpp/benchmarks/bench_shuffle.cpp | 2 +- .../streaming/bench_streaming_shuffle.cpp | 2 +- cpp/benchmarks/streaming/ndsh/utils.cpp | 2 +- cpp/include/rapidsmpf/bootstrap/backend.hpp | 124 ++ cpp/include/rapidsmpf/bootstrap/bootstrap.hpp | 50 +- .../rapidsmpf/bootstrap/file_backend.hpp | 41 +- .../rapidsmpf/bootstrap/slurm_backend.hpp | 145 +++ cpp/include/rapidsmpf/bootstrap/ucxx.hpp | 4 +- cpp/include/rapidsmpf/bootstrap/utils.hpp | 62 +- cpp/src/bootstrap/bootstrap.cpp | 262 ++-- cpp/src/bootstrap/file_backend.cpp | 8 +- cpp/src/bootstrap/slurm_backend.cpp | 313 +++++ cpp/src/bootstrap/ucxx.cpp | 107 +- cpp/src/bootstrap/utils.cpp | 86 +- cpp/tools/CMakeLists.txt | 13 +- cpp/tools/rrun.cpp | 1139 +++++++++++++---- dependencies.yaml | 7 + .../rapidsmpf/rapidsmpf/bootstrap/__init__.py | 6 +- .../rapidsmpf/bootstrap/bootstrap.pyi | 4 +- .../rapidsmpf/bootstrap/bootstrap.pyx | 14 +- .../rapidsmpf/examples/bulk_mpi_shuffle.py | 4 +- 29 files changed, 2153 insertions(+), 484 deletions(-) create mode 100644 cmake/thirdparty/get_pmix.cmake create mode 100644 cpp/include/rapidsmpf/bootstrap/backend.hpp create mode 100644 cpp/include/rapidsmpf/bootstrap/slurm_backend.hpp create mode 100644 cpp/src/bootstrap/slurm_backend.cpp diff --git a/cmake/thirdparty/get_pmix.cmake b/cmake/thirdparty/get_pmix.cmake new file mode 100644 index 000000000..aaaacc272 --- /dev/null +++ b/cmake/thirdparty/get_pmix.cmake @@ -0,0 +1,210 @@ +# 
============================================================================= +# cmake-format: off +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +# cmake-format: on +# ============================================================================= + +#[=======================================================================[.rst: +get_pmix +-------- + +Find the PMIx (Process Management Interface - Exascale) library. + +This module finds the PMIx library, which is typically provided by Slurm +or OpenPMIx installations. PMIx enables scalable process coordination +without requiring a shared filesystem. + +Imported Targets +^^^^^^^^^^^^^^^^ + +This module provides the following imported targets, if found: + +``PMIx::PMIx`` + The PMIx library + +Result Variables +^^^^^^^^^^^^^^^^ + +This will define the following variables: + +``PMIx_FOUND`` + True if the system has the PMIx library. +``PMIx_VERSION`` + The version of the PMIx library which was found. +``PMIx_INCLUDE_DIRS`` + Include directories needed to use PMIx. +``PMIx_LIBRARIES`` + Libraries needed to link to PMIx. + +Hints +^^^^^ + +The following variables can be set to help find PMIx: + +``PMIx_ROOT`` + Root directory of PMIx installation. +``PMIX_ROOT`` + Alternative root directory variable. +``SLURM_ROOT`` + Slurm installation directory (PMIx may be bundled with Slurm). + +#]=======================================================================] + +# Extract PMIx version from header file. Sets PMIx_VERSION in parent scope if version can be +# determined. 
+function(_pmix_extract_version include_dir) + if(NOT EXISTS "${include_dir}/pmix_version.h") + return() + endif() + + file(STRINGS "${include_dir}/pmix_version.h" _pmix_version_lines + REGEX "#define[ \t]+PMIX_(MAJOR|MINOR|RELEASE)_VERSION" + ) + + foreach(_line ${_pmix_version_lines}) + if(_line MATCHES "#define[ \t]+PMIX_MAJOR_VERSION[ \t]+([0-9]+)") + set(_pmix_major "${CMAKE_MATCH_1}") + elseif(_line MATCHES "#define[ \t]+PMIX_MINOR_VERSION[ \t]+([0-9]+)") + set(_pmix_minor "${CMAKE_MATCH_1}") + elseif(_line MATCHES "#define[ \t]+PMIX_RELEASE_VERSION[ \t]+([0-9]+)") + set(_pmix_release "${CMAKE_MATCH_1}") + endif() + endforeach() + + if(DEFINED _pmix_major + AND DEFINED _pmix_minor + AND DEFINED _pmix_release + ) + set(PMIx_VERSION + "${_pmix_major}.${_pmix_minor}.${_pmix_release}" + PARENT_SCOPE + ) + elseif(DEFINED _pmix_major AND DEFINED _pmix_minor) + set(PMIx_VERSION + "${_pmix_major}.${_pmix_minor}" + PARENT_SCOPE + ) + endif() +endfunction() + +# Create the PMIx::PMIx imported target and find optional dependencies. +function(_pmix_create_target library include_dir) + if(TARGET PMIx::PMIx) + return() + endif() + + add_library(PMIx::PMIx UNKNOWN IMPORTED) + set_target_properties( + PMIx::PMIx PROPERTIES IMPORTED_LOCATION "${library}" INTERFACE_INCLUDE_DIRECTORIES + "${include_dir}" + ) + + # PMIx may have dependencies on libevent or hwloc. Try to find and link them if available. 
+ find_library(EVENT_CORE_LIBRARY event_core) + find_library(EVENT_PTHREADS_LIBRARY event_pthreads) + find_library(HWLOC_LIBRARY hwloc) + + set(_pmix_extra_libs "") + foreach(_lib EVENT_CORE_LIBRARY EVENT_PTHREADS_LIBRARY HWLOC_LIBRARY) + if(${_lib}) + list(APPEND _pmix_extra_libs "${${_lib}}") + endif() + endforeach() + + if(_pmix_extra_libs) + set_property( + TARGET PMIx::PMIx + APPEND + PROPERTY INTERFACE_LINK_LIBRARIES "${_pmix_extra_libs}" + ) + endif() + + mark_as_advanced( + PMIx_INCLUDE_DIR PMIx_LIBRARY EVENT_CORE_LIBRARY EVENT_PTHREADS_LIBRARY HWLOC_LIBRARY + ) +endfunction() + +# Find and configure the PMIx library. Sets PMIx_FOUND, PMIx_VERSION, PMIx_INCLUDE_DIRS, +# PMIx_LIBRARIES in parent scope. Creates PMIx::PMIx imported target if found. +function(find_and_configure_pmix) + # Return early if already found + if(TARGET PMIx::PMIx) + set(PMIx_FOUND + TRUE + PARENT_SCOPE + ) + return() + endif() + + # First try pkg-config (most reliable method) + find_package(PkgConfig QUIET) + if(PKG_CONFIG_FOUND) + pkg_check_modules(PC_PMIx QUIET pmix) + endif() + + # Find include directory + find_path( + PMIx_INCLUDE_DIR + NAMES pmix.h + HINTS ${PC_PMIx_INCLUDEDIR} ${PC_PMIx_INCLUDE_DIRS} ${PMIx_ROOT}/include $ENV{PMIx_ROOT}/include + $ENV{PMIX_ROOT}/include ${SLURM_ROOT}/include $ENV{SLURM_ROOT}/include + PATHS /usr/include /usr/local/include /opt/pmix/include /usr/include/slurm + /usr/local/include/slurm + ) + + # Find library + find_library( + PMIx_LIBRARY + NAMES pmix + HINTS ${PC_PMIx_LIBDIR} + ${PC_PMIx_LIBRARY_DIRS} + ${PMIx_ROOT}/lib + ${PMIx_ROOT}/lib64 + $ENV{PMIx_ROOT}/lib + $ENV{PMIx_ROOT}/lib64 + $ENV{PMIX_ROOT}/lib + $ENV{PMIX_ROOT}/lib64 + ${SLURM_ROOT}/lib + ${SLURM_ROOT}/lib64 + $ENV{SLURM_ROOT}/lib + $ENV{SLURM_ROOT}/lib64 + PATHS /usr/lib /usr/lib64 /usr/local/lib /usr/local/lib64 /opt/pmix/lib /opt/pmix/lib64 + ) + + # Get version from header if found + if(PMIx_INCLUDE_DIR) + _pmix_extract_version("${PMIx_INCLUDE_DIR}") + endif() + + 
include(FindPackageHandleStandardArgs) + find_package_handle_standard_args( + PMIx + REQUIRED_VARS PMIx_LIBRARY PMIx_INCLUDE_DIR + VERSION_VAR PMIx_VERSION + ) + + if(PMIx_FOUND) + _pmix_create_target("${PMIx_LIBRARY}" "${PMIx_INCLUDE_DIR}") + endif() + + # Export results to parent scope + set(PMIx_FOUND + ${PMIx_FOUND} + PARENT_SCOPE + ) + if(DEFINED PMIx_VERSION) + set(PMIx_VERSION + ${PMIx_VERSION} + PARENT_SCOPE + ) + endif() + set(PMIx_INCLUDE_DIRS + ${PMIx_INCLUDE_DIR} + PARENT_SCOPE + ) + set(PMIx_LIBRARIES + ${PMIx_LIBRARY} + PARENT_SCOPE + ) +endfunction() diff --git a/conda/environments/all_cuda-129_arch-aarch64.yaml b/conda/environments/all_cuda-129_arch-aarch64.yaml index d15e47a02..fe15de305 100644 --- a/conda/environments/all_cuda-129_arch-aarch64.yaml +++ b/conda/environments/all_cuda-129_arch-aarch64.yaml @@ -31,6 +31,7 @@ dependencies: - ipython - libcudf==26.4.*,>=0.0.0a0 - libnuma +- libpmix-devel >=5.0,<6.0 - librmm==26.4.*,>=0.0.0a0 - libucxx==0.49.*,>=0.0.0a0 - make diff --git a/conda/environments/all_cuda-129_arch-x86_64.yaml b/conda/environments/all_cuda-129_arch-x86_64.yaml index 9c64f5c5c..bfd1ee279 100644 --- a/conda/environments/all_cuda-129_arch-x86_64.yaml +++ b/conda/environments/all_cuda-129_arch-x86_64.yaml @@ -31,6 +31,7 @@ dependencies: - ipython - libcudf==26.4.*,>=0.0.0a0 - libnuma +- libpmix-devel >=5.0,<6.0 - librmm==26.4.*,>=0.0.0a0 - libucxx==0.49.*,>=0.0.0a0 - make diff --git a/conda/environments/all_cuda-131_arch-aarch64.yaml b/conda/environments/all_cuda-131_arch-aarch64.yaml index b588df7c1..078dda33d 100644 --- a/conda/environments/all_cuda-131_arch-aarch64.yaml +++ b/conda/environments/all_cuda-131_arch-aarch64.yaml @@ -31,6 +31,7 @@ dependencies: - ipython - libcudf==26.4.*,>=0.0.0a0 - libnuma +- libpmix-devel >=5.0,<6.0 - librmm==26.4.*,>=0.0.0a0 - libucxx==0.49.*,>=0.0.0a0 - make diff --git a/conda/environments/all_cuda-131_arch-x86_64.yaml b/conda/environments/all_cuda-131_arch-x86_64.yaml index 
d55c6835f..979757ec2 100644 --- a/conda/environments/all_cuda-131_arch-x86_64.yaml +++ b/conda/environments/all_cuda-131_arch-x86_64.yaml @@ -31,6 +31,7 @@ dependencies: - ipython - libcudf==26.4.*,>=0.0.0a0 - libnuma +- libpmix-devel >=5.0,<6.0 - librmm==26.4.*,>=0.0.0a0 - libucxx==0.49.*,>=0.0.0a0 - make diff --git a/conda/recipes/librapidsmpf/recipe.yaml b/conda/recipes/librapidsmpf/recipe.yaml index d4a48ac68..b6de7059d 100644 --- a/conda/recipes/librapidsmpf/recipe.yaml +++ b/conda/recipes/librapidsmpf/recipe.yaml @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 schema_version: 1 @@ -79,6 +79,7 @@ cache: - cuda-nvml-dev - libcudf =${{ minor_version }} - libnuma + - libpmix-devel >=5.0,<6.0 - librmm =${{ minor_version }} - openmpi >=5.0 # See - ucxx ${{ ucxx_version }} @@ -115,6 +116,7 @@ outputs: - cuda-cupti-dev - cuda-nvml-dev - libcudf =${{ minor_version }} + - libpmix-devel >=5.0,<6.0 - openmpi >=5.0 - ucxx ${{ ucxx_version }} run: @@ -123,6 +125,7 @@ outputs: - cuda-cupti - librmm =${{ minor_version }} - libcudf =${{ minor_version }} + - libpmix >=5.0,<6.0 - openmpi >=5.0 # See - ucxx ${{ ucxx_version }} ignore_run_exports: diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 61af23d54..62e3c1581 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -46,6 +46,7 @@ option(BUILD_UCXX_SUPPORT "Build RapidsMPF with UCXX support" ON) option(BUILD_STREAMING_SUPPORT "Build RapidsMPF with streaming support" ON) option(BUILD_CUPTI_SUPPORT "Build RapidsMPF with CUPTI support" OFF) option(BUILD_NUMA_SUPPORT "Build RapidsMPF with NUMA support" ON) +option(BUILD_SLURM_SUPPORT "Build RapidsMPF with Slurm/PMIx bootstrap support" ON) option(BUILD_TESTS "Configure CMake to build tests" ON) option(BUILD_BENCHMARKS "Configure CMake to build benchmarks" ON) option(BUILD_EXAMPLES "Configure CMake to build examples" ON) @@ 
-62,6 +63,7 @@ message(STATUS " BUILD_UCXX_SUPPORT : ${BUILD_UCXX_SUPPORT}") message(STATUS " BUILD_STREAMING_SUPPORT : ${BUILD_STREAMING_SUPPORT}") message(STATUS " BUILD_CUPTI_SUPPORT : ${BUILD_CUPTI_SUPPORT}") message(STATUS " BUILD_NUMA_SUPPORT : ${BUILD_NUMA_SUPPORT}") +message(STATUS " BUILD_SLURM_SUPPORT : ${BUILD_SLURM_SUPPORT}") message(STATUS " BUILD_TESTS : ${BUILD_TESTS}") message(STATUS " BUILD_BENCHMARKS : ${BUILD_BENCHMARKS}") message(STATUS " BUILD_EXAMPLES : ${BUILD_EXAMPLES}") @@ -77,6 +79,7 @@ set(RAPIDSMPF_HAVE_UCXX ${BUILD_UCXX_SUPPORT}) set(RAPIDSMPF_HAVE_STREAMING ${BUILD_STREAMING_SUPPORT}) set(RAPIDSMPF_HAVE_CUPTI ${BUILD_CUPTI_SUPPORT}) set(RAPIDSMPF_HAVE_NUMA ${BUILD_NUMA_SUPPORT}) +set(RAPIDSMPF_HAVE_SLURM OFF) # Will be set to ON if PMIx is found set(RAPIDSMPF_BUILD_TESTS ${BUILD_TESTS}) set(RAPIDSMPF_BUILD_BENCHMARKS ${BUILD_BENCHMARKS}) set(RAPIDSMPF_BUILD_EXAMPLES ${BUILD_EXAMPLES}) @@ -156,6 +159,18 @@ endif() if(RAPIDSMPF_HAVE_STREAMING) include(../cmake/thirdparty/get_libcoro.cmake) endif() +if(BUILD_SLURM_SUPPORT) + include(../cmake/thirdparty/get_pmix.cmake) + find_and_configure_pmix() + if(PMIx_FOUND) + set(RAPIDSMPF_HAVE_SLURM ON) + message(STATUS "PMIx found (version ${PMIx_VERSION}) - Slurm bootstrap backend enabled") + else() + message(WARNING "PMIx not found - Slurm bootstrap backend will be disabled. " + "Set PMIx_ROOT or PMIX_ROOT to the PMIx installation directory." 
+ ) + endif() +endif() # ################################################################################################## # * library targets -------------------------------------------------------------------------------- @@ -236,6 +251,9 @@ endif() if(RAPIDSMPF_HAVE_CUPTI) target_sources(rapidsmpf PRIVATE src/cupti.cpp) endif() +if(RAPIDSMPF_HAVE_SLURM) + target_sources(rapidsmpf PRIVATE src/bootstrap/slurm_backend.cpp) +endif() set_target_properties( rapidsmpf @@ -300,6 +318,7 @@ target_link_libraries( $<$:numa> $ $<$:CUDA::cupti> + $ $ maybe_asan $ @@ -315,6 +334,7 @@ target_compile_definitions( $<$:RAPIDSMPF_HAVE_STREAMING> $<$:RAPIDSMPF_HAVE_CUPTI> $<$:RAPIDSMPF_HAVE_NUMA> + $<$:RAPIDSMPF_HAVE_SLURM> $<$:RAPIDSMPF_VERBOSE_INFO> ) @@ -434,6 +454,7 @@ string( "set(RAPIDSMPF_HAVE_UCXX [=[${RAPIDSMPF_HAVE_UCXX}]=])" "set(RAPIDSMPF_HAVE_STREAMING [=[${RAPIDSMPF_HAVE_STREAMING}]=])" "set(RAPIDSMPF_HAVE_CUPTI [=[${RAPIDSMPF_HAVE_CUPTI}]=])" + "set(RAPIDSMPF_HAVE_SLURM [=[${RAPIDSMPF_HAVE_SLURM}]=])" ) rapids_export( diff --git a/cpp/benchmarks/bench_comm.cpp b/cpp/benchmarks/bench_comm.cpp index ce5166d88..3924d6820 100644 --- a/cpp/benchmarks/bench_comm.cpp +++ b/cpp/benchmarks/bench_comm.cpp @@ -290,7 +290,7 @@ int main(int argc, char** argv) { if (use_bootstrap) { // Launched with rrun - use bootstrap backend comm = rapidsmpf::bootstrap::create_ucxx_comm( - rapidsmpf::bootstrap::Backend::AUTO, options + rapidsmpf::bootstrap::BackendType::AUTO, options ); } else { // Launched with mpirun - use MPI bootstrap diff --git a/cpp/benchmarks/bench_shuffle.cpp b/cpp/benchmarks/bench_shuffle.cpp index 126d1a32a..224d3c5a5 100644 --- a/cpp/benchmarks/bench_shuffle.cpp +++ b/cpp/benchmarks/bench_shuffle.cpp @@ -614,7 +614,7 @@ int main(int argc, char** argv) { if (use_bootstrap) { // Launched with rrun - use bootstrap backend comm = rapidsmpf::bootstrap::create_ucxx_comm( - rapidsmpf::bootstrap::Backend::AUTO, options + rapidsmpf::bootstrap::BackendType::AUTO, options ); } 
else { // Launched with mpirun - use MPI bootstrap diff --git a/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp b/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp index 0b73fd763..5b2c4932d 100644 --- a/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp +++ b/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp @@ -335,7 +335,7 @@ int main(int argc, char** argv) { if (use_bootstrap) { // Launched with rrun - use bootstrap backend comm = rapidsmpf::bootstrap::create_ucxx_comm( - rapidsmpf::bootstrap::Backend::AUTO, options + rapidsmpf::bootstrap::BackendType::AUTO, options ); } else { // Launched with mpirun - use MPI bootstrap diff --git a/cpp/benchmarks/streaming/ndsh/utils.cpp b/cpp/benchmarks/streaming/ndsh/utils.cpp index 72c9f0994..a8a5ffdd8 100644 --- a/cpp/benchmarks/streaming/ndsh/utils.cpp +++ b/cpp/benchmarks/streaming/ndsh/utils.cpp @@ -197,7 +197,7 @@ std::shared_ptr create_context( break; case CommType::UCXX: if (bootstrap::is_running_with_rrun()) { - comm = bootstrap::create_ucxx_comm(bootstrap::Backend::AUTO, options); + comm = bootstrap::create_ucxx_comm(bootstrap::BackendType::AUTO, options); } else { mpi::init(nullptr, nullptr); comm = ucxx::init_using_mpi(MPI_COMM_WORLD, options); diff --git a/cpp/include/rapidsmpf/bootstrap/backend.hpp b/cpp/include/rapidsmpf/bootstrap/backend.hpp new file mode 100644 index 000000000..dd7c842be --- /dev/null +++ b/cpp/include/rapidsmpf/bootstrap/backend.hpp @@ -0,0 +1,124 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +#include + +namespace rapidsmpf::bootstrap { + +/** + * @brief Backend types for process coordination and bootstrapping. + */ +enum class BackendType { + /** + * @brief Automatically detect the best backend based on environment. + * + * Detection order: + * 1. 
File-based (if RAPIDSMPF_COORD_DIR or RAPIDSMPF_ROOT_ADDRESS set by rrun) + * 2. Slurm/PMIx (if SLURM environment detected) + * 3. File-based (default fallback) + */ + AUTO, + + /** + * @brief File-based coordination using a shared directory. + * + * Uses filesystem for rank coordination and address exchange. Works on single-node + * and multi-node with shared storage (e.g., NFS) via SSH. Requires RAPIDSMPF_RANK, + * RAPIDSMPF_NRANKS, RAPIDSMPF_COORD_DIR environment variables. + */ + FILE, + + /** + * @brief Slurm-based coordination using PMIx. + * + * Uses PMIx (Process Management Interface for Exascale) for scalable process + * coordination without requiring a shared filesystem. Designed for Slurm clusters + * and supports multi-node deployments. + * + * Run with: `srun --mpi=pmix -n ./program` + * + * Environment variables (automatically set by Slurm): + * - PMIX_NAMESPACE: PMIx namespace identifier + * - SLURM_PROCID: Process rank + * - SLURM_NPROCS/SLURM_NTASKS: Total number of processes + */ + SLURM, +}; + +namespace detail { + +/** + * @brief Abstract interface for bootstrap coordination backends. + * + * This interface defines the common operations that all backend implementations + * must support. Backend instances are stored in Context and reused across + * multiple operations to preserve state. + */ +class Backend { + public: + virtual ~Backend() = default; + + /** + * @brief Store a key-value pair. + * + * The key-value pair is committed immediately and made visible to other + * ranks after a collective `sync()`. + * + * @param key Key name. + * @param value Value to store. + */ + virtual void put(std::string const& key, std::string const& value) = 0; + + /** + * @brief Retrieve a value, blocking until available or timeout occurs. + * + * @param key Key name. + * @param timeout Timeout duration. + * @return Value associated with key. + * + * @throws std::runtime_error if key not found within timeout. 
+ */ + virtual std::string get(std::string const& key, Duration timeout) = 0; + + /** + * @brief Perform a barrier synchronization. + * + * All ranks must call this before any rank proceeds. + */ + virtual void barrier() = 0; + + /** + * @brief Ensure all previous put() operations are globally visible. + */ + virtual void sync() = 0; + + /** + * @brief Broadcast data from root to all ranks. + * + * @param data Data buffer (input on root, output on other ranks). + * @param size Size in bytes. + * @param root Root rank. + * + * @throws std::runtime_error if broadcast fails or size mismatch occurs. + */ + virtual void broadcast(void* data, std::size_t size, Rank root) = 0; + + // Non-copyable, non-movable (backends manage resources) + Backend(Backend const&) = delete; + Backend& operator=(Backend const&) = delete; + Backend(Backend&&) = delete; + Backend& operator=(Backend&&) = delete; + + protected: + Backend() = default; +}; + +} // namespace detail +} // namespace rapidsmpf::bootstrap diff --git a/cpp/include/rapidsmpf/bootstrap/bootstrap.hpp b/cpp/include/rapidsmpf/bootstrap/bootstrap.hpp index 11cf0d7ae..5bba00a17 100644 --- a/cpp/include/rapidsmpf/bootstrap/bootstrap.hpp +++ b/cpp/include/rapidsmpf/bootstrap/bootstrap.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ @@ -11,32 +11,11 @@ #include #include +#include #include namespace rapidsmpf::bootstrap { -/** - * @brief Backend types for process coordination and bootstrapping. - */ -enum class Backend { - /** - * @brief Automatically detect the best backend based on environment. - * - * Detection order: - * 1. File-based (default fallback) - */ - AUTO, - - /** - * @brief File-based coordination using a shared directory. - * - * Uses filesystem for rank coordination and address exchange. 
Works on single-node - * and multi-node with shared storage (e.g., NFS) via SSH. Requires RAPIDSMPF_RANK, - * RAPIDSMPF_NRANKS, RAPIDSMPF_COORD_DIR environment variables. - */ - FILE, -}; - /** * @brief Context information for the current process/rank. * @@ -50,11 +29,14 @@ struct Context { /** @brief Total number of ranks in the job. */ Rank nranks; - /** @brief Backend used for coordination. */ - Backend backend; + /** @brief Backend type used for coordination. */ + BackendType type; /** @brief Coordination directory (for FILE backend). */ std::optional coord_dir; + + /** @brief Backend implementation (internal, do not access directly). */ + std::shared_ptr backend; }; /** @@ -68,7 +50,7 @@ struct Context { * - RAPIDSMPF_NRANKS: Explicitly set total rank count * - RAPIDSMPF_COORD_DIR: File-based coordination directory * - * @param backend Backend to use (default: AUTO for auto-detection). + * @param type Backend type to use (default: AUTO for auto-detection). * @return Context object containing rank and coordination information. * @throws std::runtime_error if environment is not properly configured. * @@ -77,7 +59,7 @@ struct Context { * std::cout << "I am rank " << ctx.rank << " of " << ctx.nranks << std::endl; * @endcode */ -Context init(Backend backend = Backend::AUTO); +Context init(BackendType type = BackendType::AUTO); /** * @brief Broadcast data from root rank to all other ranks. @@ -101,6 +83,20 @@ void broadcast(Context const& ctx, void* data, std::size_t size, Rank root = 0); */ void barrier(Context const& ctx); +/** + * @brief Ensure all previous put() operations are globally visible. + * + * Different backends have different visibility semantics for put() operations: + * - Slurm/PMIx: Requires explicit fence (PMIx_Fence) to make data visible across nodes. + * - FILE: put() operations are immediately visible via atomic filesystem operations. + * + * This function abstracts these differences. 
Call sync() after put() operations + * to ensure data is visible to other ranks before they attempt get(). + * + * @param ctx Bootstrap context. + */ +void sync(Context const& ctx); + /** * @brief Store a key-value pair in the coordination backend. * diff --git a/cpp/include/rapidsmpf/bootstrap/file_backend.hpp b/cpp/include/rapidsmpf/bootstrap/file_backend.hpp index 3805ad798..1c25ba4dc 100644 --- a/cpp/include/rapidsmpf/bootstrap/file_backend.hpp +++ b/cpp/include/rapidsmpf/bootstrap/file_backend.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ @@ -8,6 +8,7 @@ #include #include +#include #include namespace rapidsmpf::bootstrap::detail { @@ -29,7 +30,7 @@ namespace rapidsmpf::bootstrap::detail { * └── barrier_ # Barrier synchronization * ``` */ -class FileBackend { +class FileBackend : public Backend { public: /** * @brief Construct a file backend. @@ -38,40 +39,32 @@ class FileBackend { */ explicit FileBackend(Context ctx); - ~FileBackend(); + ~FileBackend() override; /** - * @brief Store a key-value pair. - * - * @param key Key name. - * @param value Value to store. + * @copydoc Backend::put */ - void put(std::string const& key, std::string const& value); + void put(std::string const& key, std::string const& value) override; /** - * @brief Retrieve a value, blocking until available or timeout occurs. - * - * @param key Key name. - * @param timeout Timeout duration. - * @return Value associated with key. + * @copydoc Backend::get */ - std::string get(std::string const& key, Duration timeout); + std::string get(std::string const& key, Duration timeout) override; /** - * @brief Perform a barrier synchronization. - * - * All ranks must call this before any rank proceeds. 
+ * @copydoc Backend::barrier */ - void barrier(); + void barrier() override; /** - * @brief Broadcast data from root to all ranks. - * - * @param data Data buffer. - * @param size Size in bytes. - * @param root Root rank. + * @copydoc Backend::sync + */ + void sync() override; + + /** + * @copydoc Backend::broadcast */ - void broadcast(void* data, std::size_t size, Rank root); + void broadcast(void* data, std::size_t size, Rank root) override; private: Context ctx_; diff --git a/cpp/include/rapidsmpf/bootstrap/slurm_backend.hpp b/cpp/include/rapidsmpf/bootstrap/slurm_backend.hpp new file mode 100644 index 000000000..6550b90c0 --- /dev/null +++ b/cpp/include/rapidsmpf/bootstrap/slurm_backend.hpp @@ -0,0 +1,145 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#ifdef RAPIDSMPF_HAVE_SLURM + +#include +#include +#include + +#include + +#include +#include + +namespace rapidsmpf::bootstrap::detail { + +/** + * @brief Slurm-based coordination backend using PMIx. + * + * This class implements coordination using PMIx (Process Management Interface + * for Exascale), which provides scalable process coordination without requiring + * a shared filesystem. It is designed for Slurm clusters and supports multi-node + * deployments. + * + * Usage: + * ```bash + * # Passthrough: multiple (4) tasks per node, one task per GPU, two nodes. + * srun \ + * --mpi=pmix \ + * --nodes=2 \ + * --ntasks-per-node=4 \ + * --cpus-per-task=36 \ + * --gpus-per-task=1 \ + * --gres=gpu:4 \ + * rrun ./benchmarks/bench_shuffle -C ucxx + * + * # Hybrid mode: one task per node, 4 GPUs per task, two nodes. 
+ * srun \ + * --mpi=pmix \ + * --nodes=2 \ + * --ntasks-per-node=1 \ + * --cpus-per-task=144 \ + * --gpus-per-task=4 \ + * --gres=gpu:4 \ + * rrun -n 4 ./benchmarks/bench_shuffle -C ucxx + * ``` + */ +class SlurmBackend : public Backend { + public: + /** + * @brief Construct a Slurm backend using PMIx. + * + * Initializes PMIx and retrieves process information from the runtime. + * + * @param ctx Bootstrap context containing rank information. + * + * @throws std::runtime_error if PMIx initialization fails. + */ + explicit SlurmBackend(Context ctx); + + /** + * @brief Destructor that finalizes PMIx. + */ + ~SlurmBackend() override; + + // Non-copyable, non-movable (PMIx state is process-global) + SlurmBackend(SlurmBackend const&) = delete; + SlurmBackend& operator=(SlurmBackend const&) = delete; + SlurmBackend(SlurmBackend&&) = delete; + SlurmBackend& operator=(SlurmBackend&&) = delete; + + /** + * @copydoc Backend::put() + * + * @throws std::runtime_error if PMIx operation fails. + */ + void put(std::string const& key, std::string const& value) override; + + /** + * @copydoc Backend::get() + */ + std::string get(std::string const& key, Duration timeout) override; + + /** + * @copydoc Backend::barrier() + * + * @throws std::runtime_error if PMIx_Fence fails. + */ + void barrier() override; + + /** + * @copydoc Backend::sync() + * + * @throws std::runtime_error if PMIx_Fence fails. + */ + void sync() override; + + /** + * @copydoc Backend::broadcast() + * + * @throws std::runtime_error if PMIx operation fails. + */ + void broadcast(void* data, std::size_t size, Rank root) override; + + /** + * @brief Explicitly finalize the global PMIx session. + * + * This is useful for scenarios like rrun parent coordination where PMIx + * needs to be finalized before process exit (e.g., after child processes + * complete). If not called explicitly, PMIx will be finalized when the + * process exits via the PmixGlobalState destructor. 
+ * + * This function is safe to call multiple times, subsequent calls are no-ops. + * + * @throws std::runtime_error if PMIx_Finalize fails. + */ + static void finalize_pmix(); + + private: + Context ctx_; + std::size_t barrier_count_{0}; + bool pmix_initialized_{false}; + pmix_proc_t proc_{}; ///< PMIx process identifier + std::array nspace_{}; ///< PMIx namespace (job identifier) + + /** + * @brief Commit local key-value pairs to make them visible. + * + * Must be called after put() operations. The subsequent fence() + * or barrier() will make the data globally visible. + * + * @throws std::runtime_error if PMIx_Commit fails. + */ + void commit(); +}; + +} // namespace rapidsmpf::bootstrap::detail + +#endif // RAPIDSMPF_HAVE_SLURM diff --git a/cpp/include/rapidsmpf/bootstrap/ucxx.hpp b/cpp/include/rapidsmpf/bootstrap/ucxx.hpp index 636748947..18f7faac5 100644 --- a/cpp/include/rapidsmpf/bootstrap/ucxx.hpp +++ b/cpp/include/rapidsmpf/bootstrap/ucxx.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ @@ -44,7 +44,7 @@ namespace bootstrap { * @endcode */ std::shared_ptr create_ucxx_comm( - Backend backend = Backend::AUTO, config::Options options = config::Options{} + BackendType type = BackendType::AUTO, config::Options options = config::Options{} ); } // namespace bootstrap diff --git a/cpp/include/rapidsmpf/bootstrap/utils.hpp b/cpp/include/rapidsmpf/bootstrap/utils.hpp index f71374e5c..a42460914 100644 --- a/cpp/include/rapidsmpf/bootstrap/utils.hpp +++ b/cpp/include/rapidsmpf/bootstrap/utils.hpp @@ -5,13 +5,37 @@ #pragma once +#include #include +#include #include #include namespace rapidsmpf::bootstrap { +/** + * @brief Get environment variable as optional string. 
+ * + * Retrieves the value of an environment variable by name, returning it as + * std::optional. Returns std::nullopt if the variable is not set. + * + * @param name Name of the environment variable to retrieve. + * @return Value of the environment variable, or std::nullopt if not set. + */ +std::optional getenv_optional(std::string_view name); + +/** + * @brief Parse integer from environment variable. + * + * Retrieves an environment variable and parses it as an integer. + * + * @param name Name of the environment variable to retrieve. + * @return Parsed integer value, or std::nullopt if not set. + * @throws std::runtime_error if the variable is set but cannot be parsed as an integer. + */ +std::optional getenv_int(std::string_view name); + /** * @brief Get current CPU affinity as a string. * @@ -55,24 +79,44 @@ int get_gpu_id(); bool is_running_with_rrun(); /** - * @brief Get the current `rrun` rank. + * @brief Check if the current process is running under Slurm with PMIx. + * + * This helper detects Slurm environment by checking for PMIx namespace + * or Slurm job step environment variables. + * + * @return true if running under Slurm with PMIx, false otherwise. + */ +bool is_running_with_slurm(); + +/** + * @brief Get the current bootstrap rank. * - * This helper retrieves the rank of the current process when running with `rrun`. - * The rank is fetched from the `RAPIDSMPF_RANK` environment variable. + * This helper retrieves the rank of the current process when running with a + * bootstrap launcher (rrun or Slurm). Checks environment variables in order: + * 1. RAPIDSMPF_RANK (set by rrun) + * 2. PMIX_RANK (set by PMIx) + * 3. SLURM_PROCID (set by Slurm) * - * @return Rank of the current process (>= 0) if found, -1 otherwise. + * @return Rank of the current process. + * + * @throws std::runtime_error if not running with a bootstrap launcher or if + * the environment variable cannot be parsed. */ Rank get_rank(); /** - * @brief Get the number of `rrun` ranks. 
+ * @brief Get the number of bootstrap ranks. * - * This helper retrieves the number of ranks when running with `rrun`. - * The number of ranks is fetched from the `RAPIDSMPF_NRANKS` environment variable. + * This helper retrieves the number of ranks when running with a bootstrap + * launcher (rrun or Slurm). Checks environment variables in order: + * 1. RAPIDSMPF_NRANKS (set by rrun) + * 2. SLURM_NPROCS (set by Slurm) + * 3. SLURM_NTASKS (set by Slurm) * * @return Number of ranks. - * @throws std::runtime_error if not running with `rrun` or if `RAPIDSMPF_NRANKS` is not - * set or cannot be parsed. + * + * @throws std::runtime_error if not running with a bootstrap launcher or if + * the environment variable cannot be parsed. */ Rank get_nranks(); diff --git a/cpp/src/bootstrap/bootstrap.cpp b/cpp/src/bootstrap/bootstrap.cpp index a2ad0f7d3..85a62ed16 100644 --- a/cpp/src/bootstrap/bootstrap.cpp +++ b/cpp/src/bootstrap/bootstrap.cpp @@ -9,6 +9,12 @@ #include #include +#include +#include + +#ifdef RAPIDSMPF_HAVE_SLURM +#include +#endif // NOTE: Do not use RAPIDSMPF_EXPECTS or RAPIDSMPF_FAIL in this file. // Using these macros introduces a CUDA dependency via rapidsmpf/error.hpp. @@ -18,154 +24,188 @@ namespace rapidsmpf::bootstrap { namespace { /** - * @brief Get environment variable as string. + * @brief Detect backend from environment variables. */ -std::optional getenv_optional(std::string_view name) { - // std::getenv requires a null-terminated string; construct a std::string - // to ensure this even when called with a non-literal std::string_view. - char const* value = std::getenv(std::string{name}.c_str()); - if (value == nullptr) { - return std::nullopt; - } - return std::string{value}; +BackendType detect_backend() { + // Check for rrun coordination first (explicit configuration takes priority). + // If RAPIDSMPF_COORD_DIR or RAPIDSMPF_ROOT_ADDRESS is set, rrun is coordinating + // and we should use FILE backend (with or without pre-coordinated address). 
+ if (getenv_optional("RAPIDSMPF_COORD_DIR") + || getenv_optional("RAPIDSMPF_ROOT_ADDRESS")) + { + return BackendType::FILE; + } + +#ifdef RAPIDSMPF_HAVE_SLURM + // Check for Slurm-specific environment variables ONLY if rrun is NOT coordinating. + // This allows direct use of Slurm/PMIx backend when NOT launched via rrun. + // Note: We don't check PMIX_NAMESPACE alone because OpenMPI also uses PMIx + // internally and sets PMIX_NAMESPACE when launched with mpirun. + // SLURM_JOB_ID + SLURM_PROCID is specific to Slurm srun tasks. + // + // Important: This path should only be taken by Slurm parent processes that are + // NOT launched by rrun. Child processes launched by rrun will have RAPIDSMPF_* + // variables set and will use FILE backend above. + if (is_running_with_slurm()) { + return BackendType::SLURM; + } +#endif + + // Default to file-based + return BackendType::FILE; } /** - * @brief Parse integer from environment variable. + * @brief Initialize context for FILE backend. */ -std::optional getenv_int(std::string_view name) { - auto value = getenv_optional(name); - if (!value) { - return std::nullopt; +Context file_backend_init() { + Context ctx; + ctx.type = BackendType::FILE; + + // Require explicit RAPIDSMPF_RANK and RAPIDSMPF_NRANKS + auto rank_opt = getenv_int("RAPIDSMPF_RANK"); + auto nranks_opt = getenv_int("RAPIDSMPF_NRANKS"); + auto coord_dir_opt = getenv_optional("RAPIDSMPF_COORD_DIR"); + + if (!rank_opt.has_value()) { + throw std::runtime_error( + "RAPIDSMPF_RANK environment variable not set. " + "Set it or use a launcher like 'rrun'." + ); } - try { - return std::stoi(*value); - } catch (...) { + + if (!nranks_opt.has_value()) { throw std::runtime_error( - std::string{"Failed to parse integer from environment variable "} - + std::string{name} + ": " + *value + "RAPIDSMPF_NRANKS environment variable not set. " + "Set it or use a launcher like 'rrun'." 
); } + + if (!coord_dir_opt.has_value()) { + throw std::runtime_error( + "RAPIDSMPF_COORD_DIR environment variable not set. " + "Set it or use a launcher like 'rrun'." + ); + } + + ctx.rank = static_cast(*rank_opt); + ctx.nranks = static_cast(*nranks_opt); + ctx.coord_dir = *coord_dir_opt; + + if (!(ctx.rank >= 0 && ctx.rank < ctx.nranks)) { + throw std::runtime_error( + "Invalid rank: RAPIDSMPF_RANK=" + std::to_string(ctx.rank) + + " must be in range [0, " + std::to_string(ctx.nranks) + ")" + ); + } + + return ctx; } +#ifdef RAPIDSMPF_HAVE_SLURM /** - * @brief Detect backend from environment variables. + * @brief Initialize context for SLURM backend. */ -Backend detect_backend() { - // Check for file-based coordination - if (getenv_optional("RAPIDSMPF_COORD_DIR")) { - return Backend::FILE; +Context slurm_backend_init() { + Context ctx; + ctx.type = BackendType::SLURM; + + try { + ctx.rank = get_rank(); + } catch (const std::runtime_error& e) { + throw std::runtime_error( + "Could not determine rank for Slurm backend. " + "Ensure you're running with 'srun --mpi=pmix'." + ); } - // Default to file-based - return Backend::FILE; + try { + ctx.nranks = get_nranks(); + } catch (const std::runtime_error& e) { + throw std::runtime_error( + "Could not determine nranks for Slurm backend. " + "Ensure you're running with 'srun --mpi=pmix'." + ); + } + + if (!(ctx.rank >= 0 && ctx.rank < ctx.nranks)) { + throw std::runtime_error( + "Invalid rank: " + std::to_string(ctx.rank) + " must be in range [0, " + + std::to_string(ctx.nranks) + ")" + ); + } + + return ctx; } +#endif } // namespace -Context init(Backend backend) { +Context init(BackendType type) { + if (type == BackendType::AUTO) { + type = detect_backend(); + } + Context ctx; - ctx.backend = (backend == Backend::AUTO) ? 
detect_backend() : backend; - - // Get rank and nranks based on backend - switch (ctx.backend) { - case Backend::FILE: - { - // Require explicit RAPIDSMPF_RANK and RAPIDSMPF_NRANKS - auto rank_opt = getenv_int("RAPIDSMPF_RANK"); - auto nranks_opt = getenv_int("RAPIDSMPF_NRANKS"); - auto coord_dir_opt = getenv_optional("RAPIDSMPF_COORD_DIR"); - - if (!rank_opt.has_value()) { - throw std::runtime_error( - "RAPIDSMPF_RANK environment variable not set. " - "Set it or use a launcher like 'rrun'." - ); - } - - if (!nranks_opt.has_value()) { - throw std::runtime_error( - "RAPIDSMPF_NRANKS environment variable not set. " - "Set it or use a launcher like 'rrun'." - ); - } - - if (!coord_dir_opt.has_value()) { - throw std::runtime_error( - "RAPIDSMPF_COORD_DIR environment variable not set. " - "Set it or use a launcher like 'rrun'." - ); - } - - ctx.rank = static_cast(*rank_opt); - ctx.nranks = static_cast(*nranks_opt); - ctx.coord_dir = *coord_dir_opt; - - if (!(ctx.rank >= 0 && ctx.rank < ctx.nranks)) { - throw std::runtime_error( - "Invalid rank: RAPIDSMPF_RANK=" + std::to_string(ctx.rank) - + " must be in range [0, " + std::to_string(ctx.nranks) + ")" - ); - } - break; - } - case Backend::AUTO: - { - // Should have been resolved above - throw std::logic_error("Backend::AUTO should have been resolved"); - } + + // Get rank and nranks based on backend, then create backend instance + switch (type) { + case BackendType::FILE: + ctx = file_backend_init(); + ctx.backend = std::make_shared(ctx); + break; +#ifdef RAPIDSMPF_HAVE_SLURM + case BackendType::SLURM: + ctx = slurm_backend_init(); + ctx.backend = std::make_shared(ctx); + break; +#else + case BackendType::SLURM: + throw std::runtime_error( + "SLURM backend requested but rapidsmpf was not built with PMIx support. " + "Rebuild with RAPIDSMPF_ENABLE_SLURM=ON and ensure PMIx is available." 
+ ); +#endif + case BackendType::AUTO: + // Should have been resolved above + throw std::logic_error("BackendType::AUTO should have been resolved"); } + return ctx; } void broadcast(Context const& ctx, void* data, std::size_t size, Rank root) { - switch (ctx.backend) { - case Backend::FILE: - { - detail::FileBackend backend{ctx}; - backend.broadcast(data, size, root); - break; - } - default: - throw std::runtime_error("broadcast not implemented for this backend"); + if (!ctx.backend) { + throw std::runtime_error("Context not properly initialized - backend is null"); } + ctx.backend->broadcast(data, size, root); } void barrier(Context const& ctx) { - switch (ctx.backend) { - case Backend::FILE: - { - detail::FileBackend backend{ctx}; - backend.barrier(); - break; - } - default: - throw std::runtime_error("barrier not implemented for this backend"); + if (!ctx.backend) { + throw std::runtime_error("Context not properly initialized - backend is null"); + } + ctx.backend->barrier(); +} + +void sync(Context const& ctx) { + if (!ctx.backend) { + throw std::runtime_error("Context not properly initialized - backend is null"); } + ctx.backend->sync(); } void put(Context const& ctx, std::string const& key, std::string const& value) { - switch (ctx.backend) { - case Backend::FILE: - { - detail::FileBackend backend{ctx}; - backend.put(key, value); - break; - } - default: - throw std::runtime_error("put not implemented for this backend"); + if (!ctx.backend) { + throw std::runtime_error("Context not properly initialized - backend is null"); } + ctx.backend->put(key, value); } std::string get(Context const& ctx, std::string const& key, Duration timeout) { - switch (ctx.backend) { - case Backend::FILE: - { - detail::FileBackend backend{ctx}; - return backend.get(key, timeout); - } - default: - throw std::runtime_error("get not implemented for this backend"); + if (!ctx.backend) { + throw std::runtime_error("Context not properly initialized - backend is null"); } + return 
ctx.backend->get(key, timeout); } } // namespace rapidsmpf::bootstrap diff --git a/cpp/src/bootstrap/file_backend.cpp b/cpp/src/bootstrap/file_backend.cpp index c91a3fbce..5525162cf 100644 --- a/cpp/src/bootstrap/file_backend.cpp +++ b/cpp/src/bootstrap/file_backend.cpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ @@ -108,6 +108,12 @@ void FileBackend::barrier() { std::filesystem::remove(my_barrier_file, ec); } +void FileBackend::sync() { + // For FileBackend, this is a no-op since put() operations use atomic + // file writes that are immediately visible to all processes via the + // shared filesystem. +} + void FileBackend::broadcast(void* data, std::size_t size, Rank root) { if (ctx_.rank == root) { // Root writes data diff --git a/cpp/src/bootstrap/slurm_backend.cpp b/cpp/src/bootstrap/slurm_backend.cpp new file mode 100644 index 000000000..b94b1fb6a --- /dev/null +++ b/cpp/src/bootstrap/slurm_backend.cpp @@ -0,0 +1,313 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#ifdef RAPIDSMPF_HAVE_SLURM + +#include +#include +#include +#include +#include +#include + +#include + +// NOTE: Do not use RAPIDSMPF_EXPECTS or RAPIDSMPF_FAIL in this file. +// Using these macros introduces a CUDA dependency via rapidsmpf/error.hpp. +// Prefer throwing standard exceptions instead. + +namespace rapidsmpf::bootstrap::detail { + +namespace { + +/** + * @brief Process-global PMIx state and lifecycle management. + * + * PMIx initialization is process-global and must only happen once. + * This singleton encapsulates all PMIx state and ensures proper cleanup + * when the process exits, without breaking ongoing collective operations. 
+ */ +class PmixGlobalState { + public: + std::mutex mutex; + bool initialized{false}; + pmix_proc_t proc{}; + std::array nspace{}; + + /** + * @brief Get the singleton instance. + */ + static PmixGlobalState& instance() { + static PmixGlobalState state; + return state; + } + + /** + * @brief Explicitly finalize PMIx session. + * + * Safe to call multiple times, subsequent calls are no-ops. + */ + void finalize() { + std::lock_guard lock{mutex}; + if (initialized) { + PMIx_Finalize(nullptr, 0); + initialized = false; + } + } + + PmixGlobalState(PmixGlobalState const&) = delete; + PmixGlobalState& operator=(PmixGlobalState const&) = delete; + PmixGlobalState(PmixGlobalState&&) = delete; + PmixGlobalState& operator=(PmixGlobalState&&) = delete; + + private: + PmixGlobalState() = default; + + /** + * @brief Destructor ensuring PMIx finalization only at program exit. + */ + ~PmixGlobalState() { + if (initialized) { + PMIx_Finalize(nullptr, 0); + initialized = false; + } + } +}; + +/** + * @brief Convert PMIx status to string for error messages. + * + * @param status PMIx status code to convert. + * @return Human-readable string describing the status. + */ +std::string pmix_error_string(pmix_status_t status) { + return std::string{PMIx_Error_string(status)}; +} + +/** + * @brief Check PMIx status and throw on error. + * + * @param status PMIx status code to check. + * @param operation Description of the operation (used in error message). + * @throws std::runtime_error if status is not PMIX_SUCCESS. + */ +void check_pmix_status(pmix_status_t status, std::string const& operation) { + if (status != PMIX_SUCCESS) { + throw std::runtime_error(operation + " failed: " + pmix_error_string(status)); + } +} + +/** + * @brief Perform PMIx fence operation across all ranks. + * + * Executes PMIx_Fence with PMIX_COLLECT_DATA to synchronize all ranks + * in the namespace and exchange data. 
Accepts both PMIX_SUCCESS and + * PMIX_ERR_PARTIAL_SUCCESS as success conditions, since PARTIAL_SUCCESS + * can occur in some PMIx implementations when not all processes have + * data to contribute, but the synchronization succeeded. + * + * @param nspace The PMIx namespace to fence across. + * @param operation_name Name of the operation for error messages (e.g., "barrier", + * "sync"). + * @throws std::runtime_error if the fence operation fails. + */ +void pmix_fence_all( + std::array const& nspace, std::string const& operation_name +) { + pmix_proc_t proc; + PMIX_PROC_CONSTRUCT(&proc); + std::memcpy(proc.nspace, nspace.data(), nspace.size()); + proc.rank = PMIX_RANK_WILDCARD; + + pmix_info_t info; + bool collect = true; + PMIX_INFO_CONSTRUCT(&info); + PMIX_INFO_LOAD(&info, PMIX_COLLECT_DATA, &collect, PMIX_BOOL); + + pmix_status_t rc = PMIx_Fence(&proc, 1, &info, 1); + PMIX_INFO_DESTRUCT(&info); + + if (rc != PMIX_SUCCESS && rc != PMIX_ERR_PARTIAL_SUCCESS) { + throw std::runtime_error( + "PMIx_Fence (" + operation_name + ") failed: " + pmix_error_string(rc) + ); + } +} + +} // namespace + +SlurmBackend::SlurmBackend(Context ctx) : ctx_{std::move(ctx)} { + auto& pmix_state = PmixGlobalState::instance(); + std::lock_guard lock{pmix_state.mutex}; + + if (!pmix_state.initialized) { + pmix_proc_t proc; + pmix_status_t rc = PMIx_Init(&proc, nullptr, 0); + if (rc != PMIX_SUCCESS) { + throw std::runtime_error( + "PMIx_Init failed: " + pmix_error_string(rc) + + ". 
Ensure you're running under Slurm with --mpi=pmix" + ); + } + + pmix_state.proc = proc; + // Copy full nspace buffer (both are PMIX_MAX_NSLEN + 1 in size) + static_assert(sizeof(proc.nspace) == PMIX_MAX_NSLEN + 1); + std::memcpy(pmix_state.nspace.data(), proc.nspace, pmix_state.nspace.size()); + pmix_state.initialized = true; + } + + pmix_initialized_ = true; + + // Copy global state to instance members + proc_ = pmix_state.proc; + nspace_ = pmix_state.nspace; + + // Verify rank matches what we expect (if context has a valid rank) + // Note: For Slurm backend, ctx_.rank may be set from environment variables + // before PMIx_Init, so we verify they match + if (ctx_.rank >= 0 && std::cmp_not_equal(pmix_state.proc.rank, ctx_.rank)) { + throw std::runtime_error( + "PMIx rank (" + std::to_string(pmix_state.proc.rank) + + ") doesn't match context rank (" + std::to_string(ctx_.rank) + ")" + ); + } + + // Update context rank from PMIx if not already set + if (ctx_.rank < 0) { + ctx_.rank = static_cast(pmix_state.proc.rank); + } +} + +SlurmBackend::~SlurmBackend() { + // PMIx must stay initialized for the lifetime of the process because + // multiple SlurmBackend instances may be created and destroyed during + // bootstrap operations, and finalizing PMIx while other processes are + // still in collective operations (fence/barrier) will cause errors. 
+} + +void SlurmBackend::put(std::string const& key, std::string const& value) { + pmix_value_t pmix_value; + PMIX_VALUE_CONSTRUCT(&pmix_value); + pmix_value.type = PMIX_BYTE_OBJECT; + pmix_value.data.bo.bytes = const_cast(value.data()); + pmix_value.data.bo.size = value.size(); + + pmix_status_t rc = PMIx_Put(PMIX_GLOBAL, key.c_str(), &pmix_value); + if (rc != PMIX_SUCCESS) { + throw std::runtime_error( + "PMIx_Put for key '" + key + "' failed: " + pmix_error_string(rc) + ); + } + + // Commit to make the data available + commit(); +} + +void SlurmBackend::commit() { + pmix_status_t rc = PMIx_Commit(); + check_pmix_status(rc, "PMIx_Commit"); +} + +std::string SlurmBackend::get(std::string const& key, Duration timeout) { + auto start = std::chrono::steady_clock::now(); + auto poll_interval = std::chrono::milliseconds{100}; + + // Get from rank 0 specifically (since that's where the key is stored) + // Using PMIX_RANK_WILDCARD doesn't seem to work reliably + pmix_proc_t proc; + PMIX_PROC_CONSTRUCT(&proc); + std::memcpy(proc.nspace, nspace_.data(), nspace_.size()); + proc.rank = 0; + + while (true) { + pmix_value_t* val = nullptr; + pmix_status_t rc = PMIx_Get(&proc, key.c_str(), nullptr, 0, &val); + + if (rc == PMIX_SUCCESS && val != nullptr) { + std::string result; + + if (val->type == PMIX_BYTE_OBJECT) { + result = std::string{ + static_cast(val->data.bo.bytes), val->data.bo.size + }; + } else if (val->type == PMIX_STRING) { + result = std::string{val->data.string}; + } else { + PMIX_VALUE_RELEASE(val); + throw std::runtime_error( + "Unexpected PMIx value type for key '" + key + + "': " + std::to_string(static_cast(val->type)) + ); + } + + PMIX_VALUE_RELEASE(val); + return result; + } + + auto elapsed = std::chrono::steady_clock::now() - start; + if (elapsed >= timeout) { + throw std::runtime_error( + "Key '" + key + "' not available within " + + std::to_string( + std::chrono::duration_cast(timeout).count() + ) + + "s timeout (last error: " + pmix_error_string(rc) 
+ ")" + ); + } + + std::this_thread::sleep_for(poll_interval); + } +} + +void SlurmBackend::barrier() { + pmix_fence_all(nspace_, "barrier"); +} + +void SlurmBackend::sync() { + // For Slurm/PMIx backend, this executes PMIx_Fence to make all committed + // key-value pairs visible across all nodes. This is required because + // PMIx_Put + PMIx_Commit only makes data locally visible; PMIx_Fence + // performs the global synchronization and data exchange. + pmix_fence_all(nspace_, "sync"); +} + +void SlurmBackend::broadcast(void* data, std::size_t size, Rank root) { + // Use unique key for each broadcast to avoid collisions + std::string bcast_key = + "bcast_" + std::to_string(root) + "_" + std::to_string(barrier_count_++); + + if (ctx_.rank == root) { + // Root publishes data + std::string bcast_data{static_cast(data), size}; + put(bcast_key, bcast_data); + } + + barrier(); + + if (ctx_.rank != root) { + // Non-root ranks retrieve data + std::string bcast_data = get(bcast_key, std::chrono::seconds{30}); + if (bcast_data.size() != size) { + throw std::runtime_error( + "Broadcast size mismatch: expected " + std::to_string(size) + ", got " + + std::to_string(bcast_data.size()) + ); + } + std::memcpy(data, bcast_data.data(), size); + } + + barrier(); +} + +void SlurmBackend::finalize_pmix() { + PmixGlobalState::instance().finalize(); +} + +} // namespace rapidsmpf::bootstrap::detail + +#endif // RAPIDSMPF_HAVE_SLURM diff --git a/cpp/src/bootstrap/ucxx.cpp b/cpp/src/bootstrap/ucxx.cpp index 6f42aa34a..8fc788d03 100644 --- a/cpp/src/bootstrap/ucxx.cpp +++ b/cpp/src/bootstrap/ucxx.cpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
* SPDX-License-Identifier: Apache-2.0 */ @@ -9,40 +9,129 @@ #include #include +#include #include +#include #include #include +#include // for unsetenv #include #include +#include #include #include namespace rapidsmpf::bootstrap { -std::shared_ptr create_ucxx_comm(Backend backend, config::Options options) { - auto ctx = init(backend); +namespace { +// Hex encoding for binary-safe address transmission +std::string hex_encode(std::string const& input) { + static constexpr const char* hex_chars = "0123456789abcdef"; + std::string result; + result.reserve(input.size() * 2); + for (char ch : input) { + auto c = static_cast(ch); + result.push_back(hex_chars[c >> 4]); + result.push_back(hex_chars[c & 0x0F]); + } + return result; +} + +std::string hex_decode(std::string const& input) { + std::string result; + result.reserve(input.size() / 2); + for (size_t i = 0; i < input.size(); i += 2) { + auto high = static_cast( + (input[i] >= 'a') ? (input[i] - 'a' + 10) : (input[i] - '0') + ); + auto low = static_cast( + (input[i + 1] >= 'a') ? (input[i + 1] - 'a' + 10) : (input[i + 1] - '0') + ); + result.push_back(static_cast((high << 4) | low)); + } + return result; +} +} // namespace + +std::shared_ptr create_ucxx_comm(BackendType type, config::Options options) { + auto ctx = init(type); // Ensure CUDA context is created before UCX is initialized cudaFree(nullptr); std::shared_ptr comm; - if (ctx.rank == 0) { - // Create root UCXX communicator + auto precomputed_address_encoded = getenv_optional("RAPIDSMPF_ROOT_ADDRESS"); + auto address_file = getenv_optional("RAPIDSMPF_ROOT_ADDRESS_FILE"); + + // Path 1: Early address mode for root rank in Slurm hybrid mode. + // Rank 0 is launched first to create its address and write it to a file. + // Parent will coordinate with other parents via PMIx, then launch worker ranks + // with RAPIDSMPF_ROOT_ADDRESS set. No PMIx put/barrier/get bootstrap coordination. 
+ if (ctx.rank == 0 && address_file.has_value()) { + auto ucxx_initialized_rank = + ucxx::init(nullptr, ctx.nranks, std::nullopt, options); + comm = std::make_shared(std::move(ucxx_initialized_rank), options); + + auto listener_address = comm->listener_address(); + auto root_worker_address_str = + std::get>(listener_address.address) + ->getString(); + + std::string encoded_address = hex_encode(root_worker_address_str); + std::ofstream addr_file(*address_file); + if (!addr_file) { + throw std::runtime_error( + "Failed to write root address to file: " + *address_file + ); + } + addr_file << encoded_address << std::endl; + addr_file.close(); + + auto verbose = getenv_optional("RAPIDSMPF_VERBOSE"); + if (verbose && *verbose == "1") { + std::cerr << "[rank 0] Wrote address to " << *address_file + << ", skipping bootstrap coordination" << std::endl; + } + + // Unset the flag so rank 0 participates in the final barrier + unsetenv("RAPIDSMPF_ROOT_ADDRESS_FILE"); + } + // Path 2: Slurm hybrid mode for non-root ranks. + // Parent process already coordinated the root address via PMIx and provided it + // via RAPIDSMPF_ROOT_ADDRESS environment variable (hex-encoded). + else if (precomputed_address_encoded.has_value() && ctx.rank != 0) + { + std::string precomputed_address = hex_decode(*precomputed_address_encoded); + auto root_worker_address = ::ucxx::createAddressFromString(precomputed_address); + auto ucxx_initialized_rank = + ucxx::init(nullptr, ctx.nranks, root_worker_address, options); + comm = std::make_shared(std::move(ucxx_initialized_rank), options); + } + // Path 3: Normal bootstrap mode for root rank. + // Create listener and publish address via put() for non-root ranks to retrieve. 
+ else if (ctx.rank == 0) + { auto ucxx_initialized_rank = ucxx::init(nullptr, ctx.nranks, std::nullopt, options); comm = std::make_shared(std::move(ucxx_initialized_rank), options); - // Get the listener address and publish auto listener_address = comm->listener_address(); auto root_worker_address_str = std::get>(listener_address.address) ->getString(); + put(ctx, "ucxx_root_address", root_worker_address_str); - } else { - // Worker ranks retrieve the root address and connect + sync(ctx); + } + // Path 4: Normal bootstrap mode for non-root ranks. + // Retrieve root address via get() and connect. + else + { + sync(ctx); + auto root_worker_address_str = get(ctx, "ucxx_root_address", std::chrono::seconds{30}); auto root_worker_address = @@ -52,7 +141,9 @@ std::shared_ptr create_ucxx_comm(Backend backend, config::Options op ucxx::init(nullptr, ctx.nranks, root_worker_address, options); comm = std::make_shared(std::move(ucxx_initialized_rank), options); } + comm->barrier(); + return comm; } } // namespace rapidsmpf::bootstrap diff --git a/cpp/src/bootstrap/utils.cpp b/cpp/src/bootstrap/utils.cpp index 490362fba..6203b8118 100644 --- a/cpp/src/bootstrap/utils.cpp +++ b/cpp/src/bootstrap/utils.cpp @@ -26,6 +26,31 @@ namespace rapidsmpf::bootstrap { +std::optional getenv_optional(std::string_view name) { + // std::getenv requires a null-terminated string; construct a std::string + // to ensure this even when called with a non-literal std::string_view. + char const* value = std::getenv(std::string{name}.c_str()); + if (value == nullptr) { + return std::nullopt; + } + return std::string{value}; +} + +std::optional getenv_int(std::string_view name) { + auto value = getenv_optional(name); + if (!value) { + return std::nullopt; + } + try { + return std::stoi(*value); + } catch (...) 
{ + throw std::runtime_error( + std::string{"Failed to parse integer from environment variable "} + + std::string{name} + ": " + *value + ); + } +} + std::string get_current_cpu_affinity() { cpu_set_t cpuset; CPU_ZERO(&cpuset); @@ -73,61 +98,56 @@ std::string get_current_cpu_affinity() { } std::string get_ucx_net_devices() { - char* env = std::getenv("UCX_NET_DEVICES"); - return env ? std::string(env) : std::string(); + return getenv_optional("UCX_NET_DEVICES").value_or(""); } int get_gpu_id() { - char* cuda_visible = std::getenv("CUDA_VISIBLE_DEVICES"); + auto cuda_visible = getenv_optional("CUDA_VISIBLE_DEVICES"); if (cuda_visible) { try { - return std::stoi(cuda_visible); + return std::stoi(*cuda_visible); } catch (...) { // Ignore parse errors } } - return -1; } bool is_running_with_rrun() { - return std::getenv("RAPIDSMPF_RANK") != nullptr; + return getenv_optional("RAPIDSMPF_RANK").has_value(); } -Rank get_rank() { - char* rank_env = std::getenv("RAPIDSMPF_RANK"); - if (rank_env) { - try { - return std::stoi(rank_env); - } catch (...) { - // Ignore parse errors - } - } - return -1; +bool is_running_with_slurm() { + return getenv_optional("SLURM_JOB_ID").has_value() + && getenv_optional("SLURM_PROCID").has_value(); } -Rank get_nranks() { - if (!is_running_with_rrun()) { - throw std::runtime_error( - "get_nranks() can only be called when running with `rrun`. " - "Set RAPIDSMPF_RANK environment variable or use a launcher like 'rrun'." - ); - } - - char const* nranks_str = std::getenv("RAPIDSMPF_NRANKS"); - if (nranks_str == nullptr) { +Rank get_rank() { + if (auto rank_opt = getenv_int("RAPIDSMPF_RANK")) { + return *rank_opt; + } else if (auto rank_opt = getenv_int("PMIX_RANK")) { + return *rank_opt; + } else if (auto rank_opt = getenv_int("SLURM_PROCID")) { + return *rank_opt; + } else { throw std::runtime_error( - "RAPIDSMPF_NRANKS environment variable not set. " - "Make sure to use a rrun launcher to call this function." 
+ "Could not determine number of ranks. " + "Ensure RAPIDSMPF_RANK, PMIX_RANK, or SLURM_PROCID is set." ); } +} - try { - return std::stoi(nranks_str); - } catch (...) { +Rank get_nranks() { + if (auto nranks_opt = getenv_int("RAPIDSMPF_NRANKS")) { + return *nranks_opt; + } else if (auto nranks_opt = getenv_int("SLURM_NPROCS")) { + return *nranks_opt; + } else if (auto nranks_opt = getenv_int("SLURM_NTASKS")) { + return *nranks_opt; + } else { throw std::runtime_error( - "Failed to parse integer from RAPIDSMPF_NRANKS environment variable: " - + std::string(nranks_str) + "Could not determine number of ranks. " + "Ensure RAPIDSMPF_NRANKS, SLURM_NPROCS, or SLURM_NTASKS is set." ); } } diff --git a/cpp/tools/CMakeLists.txt b/cpp/tools/CMakeLists.txt index a3c1aff29..7df00008f 100644 --- a/cpp/tools/CMakeLists.txt +++ b/cpp/tools/CMakeLists.txt @@ -5,7 +5,12 @@ # cmake-format: on # ================================================================================= -add_executable(rrun "rrun.cpp" "$") +add_executable( + rrun + "rrun.cpp" + "$" + "$<$:$>" +) set_target_properties( rrun PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${RAPIDSMPF_BINARY_DIR}/tools" @@ -14,7 +19,10 @@ set_target_properties( ) target_include_directories(rrun PRIVATE "$") target_compile_options(rrun PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>") -target_compile_definitions(rrun PRIVATE $<$:RAPIDSMPF_HAVE_NUMA>) +target_compile_definitions( + rrun PRIVATE $<$:RAPIDSMPF_HAVE_NUMA> + $<$:RAPIDSMPF_HAVE_SLURM> +) target_link_libraries( rrun PRIVATE Threads::Threads @@ -23,6 +31,7 @@ target_link_libraries( $ maybe_asan $<$:numa> + $ ${CMAKE_DL_LIBS} ) install( diff --git a/cpp/tools/rrun.cpp b/cpp/tools/rrun.cpp index 5489da3c6..8f61eb837 100644 --- a/cpp/tools/rrun.cpp +++ b/cpp/tools/rrun.cpp @@ -46,12 +46,50 @@ #include +#ifdef RAPIDSMPF_HAVE_SLURM +#include + +#include +#endif + +namespace { + +// Forward declarations of mode execution functions (defined later, outside namespace) +struct Config; +[[noreturn]] void 
execute_slurm_passthrough_mode(Config const& cfg); +int execute_single_node_mode(Config& cfg); +#ifdef RAPIDSMPF_HAVE_SLURM +int execute_slurm_hybrid_mode(Config& cfg); +std::string launch_rank0_and_get_address( + Config const& cfg, std::string const& address_file, int total_ranks +); +std::string coordinate_root_address_via_pmix( + std::optional const& root_address_to_publish, bool verbose +); +#endif +int launch_ranks_fork_based( + Config const& cfg, + int rank_offset, + int ranks_per_task, + int total_ranks, + std::optional const& root_address, + bool is_root_parent +); +[[noreturn]] void exec_application(Config const& cfg); +pid_t launch_rank_local( + Config const& cfg, + int global_rank, + int local_rank, + int total_ranks, + std::optional const& root_address, + int* out_fd_stdout, + int* out_fd_stderr +); + // NOTE: Do not use RAPIDSMPF_EXPECTS or RAPIDSMPF_FAIL in this file. // Using these macros introduces a CUDA dependency via rapidsmpf/error.hpp. // Prefer throwing standard exceptions instead. 
-namespace { - static std::mutex output_mutex; /** @@ -87,6 +125,10 @@ struct Config { topology; // Discovered topology information std::map gpu_topology_map; // Map GPU ID to topology info + bool slurm_mode{false}; // Running under Slurm (--slurm or auto-detected) + int slurm_local_id{-1}; // Local rank within node (SLURM_LOCALID) + int slurm_global_rank{-1}; // Global rank (SLURM_PROCID) + int slurm_ntasks{-1}; // Total number of tasks (SLURM_NTASKS) }; /** @@ -123,7 +165,8 @@ std::vector detect_gpus() { FILE* pipe = popen("nvidia-smi --query-gpu=index --format=csv,noheader 2>/dev/null", "r"); if (!pipe) { - std::cerr << "Warning: Could not detect GPUs using nvidia-smi" << std::endl; + std::cerr << "[rrun] Warning: Could not detect GPUs using nvidia-smi" + << std::endl; return {}; } @@ -147,17 +190,28 @@ void print_usage(std::string_view prog_name) { << "rrun - RapidsMPF Process Launcher\n\n" << "Usage: " << prog_name << " [options] [app_args...]\n\n" << "Single-Node Options:\n" - << " -n Number of ranks to launch (required)\n" + << " -n Number of ranks to launch (required in single-node " + << " mode)\n" << " -g Comma-separated list of GPU IDs (e.g., 0,1,2,3)\n" << " If not specified, auto-detect available GPUs\n\n" + << "Slurm Options:\n" + << " --slurm Run in Slurm mode (auto-detected when SLURM_JOB_ID is " + << " set)\n" + << " Two sub-modes:\n" + << " 1. Passthrough (no -n): Apply bindings and exec\n" + << " 2. 
Hybrid (with -n): Launch N ranks per Slurm task\n" + << " In hybrid mode, each Slurm task launches multiple\n" + << " ranks with coordinated global rank numbering\n\n" << "Common Options:\n" << " -d Coordination directory (default: /tmp/rrun_)\n" + << " Not applicable in Slurm mode\n" << " --tag-output Tag stdout and stderr with rank number\n" + << " Not applicable in Slurm mode\n" << " --bind-to Bind to topology resources (default: all)\n" << " Can be specified multiple times\n" << " Options: cpu, memory, network, all, none\n" << " Examples: --bind-to cpu --bind-to network\n" - << " --bind-to none (disable all bindings)\n" + << " --bind-to none (disable all bindings)\n" << " -x, --set-env \n" << " Set environment variable for all ranks\n" << " Can be specified multiple times\n" @@ -175,6 +229,15 @@ void print_usage(std::string_view prog_name) { << " # Launch with custom environment variables:\n" << " rrun -n 2 -x UCX_TLS=cuda_copy,cuda_ipc,rc,tcp -x MY_VAR=value " "./bench_comm\n\n" + << "Slurm Examples:\n" + << " # Passthrough: multiple (4) tasks per node, one task per GPU, two nodes.\n" + << " srun --mpi=pmix --nodes=2 --ntasks-per-node=4 --cpus-per-task=36 \\\n" + << " --gpus-per-task=1 --gres=gpu:4 \\\n" + << " rrun ./benchmarks/bench_shuffle -C ucxx\n\n" + << " # Hybrid mode: one task per node, 4 GPUs per task, two nodes.\n" + << " srun --mpi=pmix --nodes=2 --ntasks-per-node=1 --cpus-per-task=144 \\\n" + << " --gpus-per-task=4 --gres=gpu:4 \\\n" + << " rrun -n 4 ./benchmarks/bench_shuffle -C ucxx\n\n" << std::endl; } @@ -289,6 +352,105 @@ bool set_numa_memory_binding(std::vector const& memory_binding) { #endif } +/** + * @brief Check if running under Slurm and populate Slurm-related config fields. + * + * @param cfg Configuration to populate with Slurm information. + * @return true if running under Slurm with required environment variables. 
+ */ +bool detect_slurm_environment(Config& cfg) { + // Check for required Slurm environment variables + char const* slurm_job_id = std::getenv("SLURM_JOB_ID"); + char const* slurm_local_id = std::getenv("SLURM_LOCALID"); + char const* slurm_procid = std::getenv("SLURM_PROCID"); + char const* slurm_ntasks = std::getenv("SLURM_NTASKS"); + + // Need at least job ID and local ID to be in Slurm mode + if (!slurm_job_id || !slurm_local_id) { + return false; + } + + try { + cfg.slurm_local_id = std::stoi(slurm_local_id); + + if (slurm_procid) { + cfg.slurm_global_rank = std::stoi(slurm_procid); + } + + if (slurm_ntasks) { + cfg.slurm_ntasks = std::stoi(slurm_ntasks); + } else { + // Try SLURM_NPROCS as fallback + char const* slurm_nprocs = std::getenv("SLURM_NPROCS"); + if (slurm_nprocs) { + cfg.slurm_ntasks = std::stoi(slurm_nprocs); + } + } + + return true; + } catch (...) { + return false; + } +} + +/** + * @brief Apply topology-based bindings for a specific GPU. + * + * This function sets CPU affinity, NUMA memory binding, and network device + * environment variables based on the topology information for the given GPU. + * + * @param cfg Configuration containing topology information. + * @param gpu_id GPU ID to apply bindings for. + * @param verbose Print warnings on failure. 
+ */ +void apply_topology_bindings(Config const& cfg, int gpu_id, bool verbose) { + if (!cfg.topology.has_value() || gpu_id < 0) { + return; + } + + auto it = cfg.gpu_topology_map.find(gpu_id); + if (it == cfg.gpu_topology_map.end()) { + if (verbose) { + std::cerr << "[rrun] Warning: No topology information for GPU " << gpu_id + << std::endl; + } + return; + } + + auto const& gpu_info = *it->second; + + if (cfg.bind_cpu && !gpu_info.cpu_affinity_list.empty()) { + if (!set_cpu_affinity(gpu_info.cpu_affinity_list)) { + if (verbose) { + std::cerr << "[rrun] Warning: Failed to set CPU affinity for GPU " + << gpu_id << std::endl; + } + } + } + + if (cfg.bind_memory && !gpu_info.memory_binding.empty()) { + if (!set_numa_memory_binding(gpu_info.memory_binding)) { +#if RAPIDSMPF_HAVE_NUMA + if (verbose) { + std::cerr << "[rrun] Warning: Failed to set NUMA memory binding for GPU " + << gpu_id << std::endl; + } +#endif + } + } + + if (cfg.bind_network && !gpu_info.network_devices.empty()) { + std::string ucx_net_devices; + for (size_t i = 0; i < gpu_info.network_devices.size(); ++i) { + if (i > 0) { + ucx_net_devices += ","; + } + ucx_net_devices += gpu_info.network_devices[i]; + } + setenv("UCX_NET_DEVICES", ucx_net_devices.c_str(), 1); + } +} + /** * @brief Parse GPU list from comma-separated string. 
*/ @@ -414,6 +576,17 @@ Config parse_args(int argc, char* argv[]) { cfg.verbose = true; } else if (arg == "--no-cleanup") { cfg.cleanup = false; + } else if (arg == "--slurm") { + cfg.slurm_mode = true; + } else if (arg == "--") { + // Everything after -- is the application and its arguments + if (i + 1 < argc) { + cfg.app_binary = argv[i + 1]; + for (int j = i + 2; j < argc; ++j) { + cfg.app_args.push_back(argv[j]); + } + } + break; } else if (arg[0] == '-') { throw std::runtime_error("Unknown option: " + arg); } else { @@ -433,30 +606,65 @@ Config parse_args(int argc, char* argv[]) { throw std::runtime_error("Missing application binary"); } - // Single-node mode validation - if (cfg.nranks <= 0) { - throw std::runtime_error("Number of ranks (-n) must be specified and positive"); + // Auto-detect Slurm mode if not explicitly specified + if (!cfg.slurm_mode) { + cfg.slurm_mode = detect_slurm_environment(cfg); + } else { + // --slurm was specified, populate Slurm info + if (!detect_slurm_environment(cfg)) { + throw std::runtime_error( + "--slurm specified but required Slurm environment variables " + "(SLURM_JOB_ID, SLURM_LOCALID) are not set. " + "Ensure you're running under srun." + ); + } + } + + if (cfg.slurm_mode) { + // Slurm mode validation + if (cfg.slurm_local_id < 0) { + throw std::runtime_error( + "SLURM_LOCALID environment variable not set or invalid" + ); + } + + // In Slurm mode: + // - If -n is specified: launch N ranks per Slurm task (hybrid mode) + // - If -n is not specified: just apply bindings and exec (passthrough mode, + // one rank per task) + if (cfg.nranks <= 0) { + cfg.nranks = 1; + } + } else { + // Single-node mode validation + if (cfg.nranks <= 0) { + throw std::runtime_error( + "Number of ranks (-n) must be specified and positive" + ); + } } // Auto-detect GPUs if not specified if (cfg.gpus.empty()) { cfg.gpus = detect_gpus(); if (cfg.gpus.empty()) { - std::cerr - << "Warning: No GPUs detected. CUDA_VISIBLE_DEVICES will not be set." 
- << std::endl; + std::cerr << "[rrun] Warning: No GPUs detected. CUDA_VISIBLE_DEVICES will " + "not be set." + << std::endl; } } - // Validate GPU count vs rank count - if (!cfg.gpus.empty() && cfg.nranks > static_cast(cfg.gpus.size())) { - std::cerr << "Warning: Number of ranks (" << cfg.nranks + // Validate GPU count vs rank count (only warn in single-node mode) + if (!cfg.slurm_mode && !cfg.gpus.empty() + && cfg.nranks > static_cast(cfg.gpus.size())) + { + std::cerr << "[rrun] Warning: Number of ranks (" << cfg.nranks << ") exceeds number of GPUs (" << cfg.gpus.size() << "). Multiple ranks will share GPUs." << std::endl; } - // Generate coordination directory if not specified - if (cfg.coord_dir.empty()) { + // Generate coordination directory if not specified (not needed in Slurm mode) + if (cfg.coord_dir.empty() && !cfg.slurm_mode) { cfg.coord_dir = "/tmp/rrun_" + generate_session_id(); } @@ -477,7 +685,7 @@ Config parse_args(int argc, char* argv[]) { } } else { if (cfg.verbose) { - std::cerr << "Warning: Failed to discover system topology. " + std::cerr << "[rrun] Warning: Failed to discover system topology. " << "CPU affinity, NUMA binding, and UCX network device " << "configuration will be skipped." << std::endl; } @@ -569,130 +777,665 @@ pid_t fork_with_piped_stdio( } /** - * @brief Launch a single rank locally (fork-based). + * @brief Common helper to set up coordination, launch ranks, and cleanup. + * + * This function encapsulates the common workflow shared by both Slurm hybrid mode + * and single-node mode: create coordination directory, launch ranks via fork, + * cleanup, and report results. + * + * A task here denotes a Slurm unit of execution, e.g., a single instance of a + * program or process, e.g., an instance of the `rrun` executable itself. + * + * @param cfg Configuration (will modify coord_dir if empty). + * @param rank_offset Starting global rank for this task. + * @param ranks_per_task Number of ranks to launch locally. 
+ * @param total_ranks Total ranks across all tasks. + * @param root_address Pre-coordinated root address (empty for FILE backend). + * @param is_root_parent Whether this is root parent (affects launch logic). + * @param coord_dir_hint Hint for coordination directory name (e.g., job ID). + * @return Exit status (0 for success). */ -pid_t launch_rank_local( - Config const& cfg, int rank, int* out_fd_stdout, int* out_fd_stderr +int setup_launch_and_cleanup( + Config& cfg, + int rank_offset, + int ranks_per_task, + int total_ranks, + std::optional const& root_address, + bool is_root_parent, + std::string const& coord_dir_hint = "" ) { - // Capture rank by value explicitly to avoid any potential issues - int captured_rank = rank; - return fork_with_piped_stdio( - out_fd_stdout, - out_fd_stderr, - /*combine_stderr*/ false, - [&cfg, captured_rank]() { - // Set custom environment variables first (can be overridden by specific vars) - for (auto const& env_pair : cfg.env_vars) { - setenv(env_pair.first.c_str(), env_pair.second.c_str(), 1); - } + // Set up coordination directory + if (cfg.coord_dir.empty()) { + if (!coord_dir_hint.empty()) { + cfg.coord_dir = "/tmp/rrun_" + coord_dir_hint; + } else { + cfg.coord_dir = "/tmp/rrun_" + generate_session_id(); + } + } + std::filesystem::create_directories(cfg.coord_dir); - // Set environment variables - setenv("RAPIDSMPF_RANK", std::to_string(captured_rank).c_str(), 1); - setenv("RAPIDSMPF_NRANKS", std::to_string(cfg.nranks).c_str(), 1); - setenv("RAPIDSMPF_COORD_DIR", cfg.coord_dir.c_str(), 1); + // Launch ranks and wait for completion + int exit_status = launch_ranks_fork_based( + cfg, rank_offset, ranks_per_task, total_ranks, root_address, is_root_parent + ); - // Set CUDA_VISIBLE_DEVICES if GPUs are available - int gpu_id = -1; - if (!cfg.gpus.empty()) { - gpu_id = cfg.gpus[static_cast(captured_rank) % cfg.gpus.size()]; - setenv("CUDA_VISIBLE_DEVICES", std::to_string(gpu_id).c_str(), 1); - } + if (cfg.cleanup) { + if 
(cfg.verbose) { + std::cout << "[rrun] Cleaning up coordination directory: " << cfg.coord_dir + << std::endl; + } + std::error_code ec; + std::filesystem::remove_all(cfg.coord_dir, ec); + if (ec) { + std::cerr << "[rrun] Warning: Failed to cleanup directory: " << cfg.coord_dir + << ": " << ec.message() << std::endl; + } + } else if (cfg.verbose) { + std::cout << "[rrun] Coordination directory preserved: " << cfg.coord_dir + << std::endl; + } - // Apply topology-based configuration if available - if (cfg.topology.has_value() && gpu_id >= 0) { - auto it = cfg.gpu_topology_map.find(gpu_id); - if (it != cfg.gpu_topology_map.end()) { - auto const& gpu_info = *it->second; + if (cfg.verbose && exit_status == 0) { + std::cout << "\n[rrun] All ranks completed successfully." << std::endl; + } - if (cfg.bind_cpu && !gpu_info.cpu_affinity_list.empty()) { - if (!set_cpu_affinity(gpu_info.cpu_affinity_list)) { - std::cerr << "Warning: Failed to set CPU affinity for rank " - << captured_rank << " (GPU " << gpu_id << ")" - << std::endl; - } - } + return exit_status; +} - if (cfg.bind_memory && !gpu_info.memory_binding.empty()) { - if (!set_numa_memory_binding(gpu_info.memory_binding)) { -#if RAPIDSMPF_HAVE_NUMA - std::cerr - << "Warning: Failed to set NUMA memory binding for rank " - << captured_rank << " (GPU " << gpu_id << ")" - << std::endl; -#endif - } - } +/** + * @brief Execute application via execvp (never returns). + * + * Prepares arguments and calls execvp. On failure, prints error and exits. + * This function never returns - it either replaces the current process + * or calls _exit(1) on error. + * + * @param cfg Configuration containing application binary and arguments. 
+ */ +[[noreturn]] void exec_application(Config const& cfg) { + // Prepare arguments for execvp + std::vector<char*> exec_args; + exec_args.push_back(const_cast<char*>(cfg.app_binary.c_str())); + for (auto const& arg : cfg.app_args) { + exec_args.push_back(const_cast<char*>(arg.c_str())); + } + exec_args.push_back(nullptr); - if (cfg.bind_network && !gpu_info.network_devices.empty()) { - std::string ucx_net_devices; - for (size_t i = 0; i < gpu_info.network_devices.size(); ++i) { - if (i > 0) { - ucx_net_devices += ","; - } - ucx_net_devices += gpu_info.network_devices[i]; - } - setenv("UCX_NET_DEVICES", ucx_net_devices.c_str(), 1); - } - } + // Exec the application (this replaces the current process) + execvp(cfg.app_binary.c_str(), exec_args.data()); + + // If we get here, execvp failed + std::cerr << "[rrun] Failed to execute " << cfg.app_binary << ": " + << std::strerror(errno) << std::endl; + _exit(1); +} + +#ifdef RAPIDSMPF_HAVE_SLURM +/** + * @brief Execute application in Slurm hybrid mode with PMIx coordination. + * + * Root parent launches rank 0 first to get address, coordinates via PMIx, then parents + * on all nodes launch their remaining ranks. Uses fork-based execution. + * + * @param cfg Configuration. + * @return Exit status (0 for success). 
+ */ +int execute_slurm_hybrid_mode(Config& cfg) { + if (cfg.verbose) { + std::cout << "[rrun] Slurm hybrid mode: task " << cfg.slurm_global_rank + << " launching " << cfg.nranks << " ranks per task" << std::endl; + std::cout << "[rrun] Using PMIx for parent coordination (no file I/O)" + << std::endl; + } + + // Set up coordination directory FIRST (needed by rank 0 when it's launched early) + char const* job_id = std::getenv("SLURM_JOB_ID"); + if (cfg.coord_dir.empty()) { + if (job_id) { + cfg.coord_dir = "/tmp/rrun_slurm_" + std::string{job_id}; + } else { + cfg.coord_dir = "/tmp/rrun_" + generate_session_id(); + } + } + std::filesystem::create_directories(cfg.coord_dir); + + // Root parent needs to launch rank 0 first to get address + bool is_root_parent = (cfg.slurm_global_rank == 0); + + // Coordinate root address with other nodes via PMIx + int slurm_ntasks = cfg.slurm_ntasks > 0 ? cfg.slurm_ntasks : 1; + int total_ranks = slurm_ntasks * cfg.nranks; + std::string encoded_root_address, coordinated_root_address; + + if (is_root_parent) { + // Root parent: Launch rank 0, get address, coordinate via PMIx + std::string address_file = + "/tmp/rapidsmpf_root_address_" + std::string{job_id ? job_id : "unknown"}; + encoded_root_address = + launch_rank0_and_get_address(cfg, address_file, total_ranks); + coordinated_root_address = + coordinate_root_address_via_pmix(encoded_root_address, cfg.verbose); + } else { + // Non-root parent: Get address from root via PMIx + coordinated_root_address = + coordinate_root_address_via_pmix(std::nullopt, cfg.verbose); + } + + unsetenv("RAPIDSMPF_ROOT_ADDRESS_FILE"); + + int rank_offset = cfg.slurm_global_rank * cfg.nranks; + + if (cfg.verbose) { + std::cout << "[rrun] Task " << cfg.slurm_global_rank << " launching ranks " + << rank_offset << "-" << (rank_offset + cfg.nranks - 1) + << " (total: " << total_ranks << " ranks)" << std::endl; + } + + std::string coord_hint = job_id ? 
("slurm_" + std::string{job_id}) : ""; + int exit_status = setup_launch_and_cleanup( + cfg, + rank_offset, + cfg.nranks, + total_ranks, + coordinated_root_address, + is_root_parent, + coord_hint + ); + + // Finalize PMIx session used for parent coordination + if (!coordinated_root_address.empty()) { + if (cfg.verbose) { + std::cout << "[rrun] Finalizing PMIx in parent" << std::endl; + } + rapidsmpf::bootstrap::detail::SlurmBackend::finalize_pmix(); + } + + return exit_status; +} +#endif // RAPIDSMPF_HAVE_SLURM + +/** + * @brief Execute application in single-node mode with FILE backend. + * + * Uses fork-based execution with file-based coordination. + * + * @param cfg Configuration. + * @return Exit status (0 for success). + */ +int execute_single_node_mode(Config& cfg) { + if (cfg.verbose) { + std::cout << "[rrun] Single-node mode: launching " << cfg.nranks << " ranks" + << std::endl; + } + + return setup_launch_and_cleanup(cfg, 0, cfg.nranks, cfg.nranks, std::nullopt, false); +} + +/** + * @brief Execute application in Slurm passthrough mode (single rank per task). + * + * Applies topology bindings and executes the application directly without forking. + * This function never returns - it either replaces the current process or exits on error. + * + * @param cfg Configuration. 
+ */ +[[noreturn]] void execute_slurm_passthrough_mode(Config const& cfg) { + if (cfg.verbose) { + std::cout << "[rrun] Slurm passthrough mode: applying bindings and exec'ing" + << std::endl; + } + + // Set rrun coordination environment variables so the application knows + // it's being launched by rrun and should use bootstrap mode + setenv("RAPIDSMPF_RANK", std::to_string(cfg.slurm_global_rank).c_str(), 1); + setenv("RAPIDSMPF_NRANKS", std::to_string(cfg.slurm_ntasks).c_str(), 1); + + // Determine GPU for this Slurm task + int gpu_id = -1; + if (!cfg.gpus.empty()) { + gpu_id = cfg.gpus[static_cast(cfg.slurm_local_id) % cfg.gpus.size()]; + setenv("CUDA_VISIBLE_DEVICES", std::to_string(gpu_id).c_str(), 1); + + if (cfg.verbose) { + std::cout << "[rrun] Slurm task (passthrough) local_id=" << cfg.slurm_local_id + << " assigned to GPU " << gpu_id << std::endl; + } + } + + // Set custom environment variables + for (auto const& env_pair : cfg.env_vars) { + setenv(env_pair.first.c_str(), env_pair.second.c_str(), 1); + } + + apply_topology_bindings(cfg, gpu_id, cfg.verbose); + + exec_application(cfg); +} + +#ifdef RAPIDSMPF_HAVE_SLURM +/** + * @brief Launch rank 0 first to obtain its UCXX root address. + * + * @param cfg Configuration. + * @param address_file Path to file where rank 0 will write its address. + * @param total_ranks Total number of ranks across all tasks. + * @return Hex-encoded root address. + * + * @throws std::runtime_error on timeout or launch failure. 
+ */ +std::string launch_rank0_and_get_address( + Config const& cfg, std::string const& address_file, int total_ranks +) { + if (cfg.verbose) { + std::cout << "[rrun] Root parent: launching rank 0 first to get address" + << std::endl; + } + + setenv("RAPIDSMPF_ROOT_ADDRESS_FILE", address_file.c_str(), 1); + + int fd_out = -1, fd_err = -1; + pid_t rank0_pid = + launch_rank_local(cfg, 0, 0, total_ranks, std::nullopt, &fd_out, &fd_err); + + // Start forwarders for rank 0 output + std::thread rank0_stdout_forwarder; + std::thread rank0_stderr_forwarder; + auto suppress = std::make_shared>(false); + + if (fd_out >= 0) { + rank0_stdout_forwarder = std::thread([fd_out, suppress]() { + FILE* stream = fdopen(fd_out, "r"); + if (!stream) { + close(fd_out); + return; + } + char buffer[4096]; + while (fgets(buffer, sizeof(buffer), stream) != nullptr) { + if (suppress->load()) + continue; + std::lock_guard lock(output_mutex); + fputs(buffer, stdout); + fflush(stdout); } + fclose(stream); + }); + } - // Prepare arguments for execvp - std::vector exec_args; - exec_args.push_back(const_cast(cfg.app_binary.c_str())); - for (auto const& arg : cfg.app_args) { - exec_args.push_back(const_cast(arg.c_str())); + if (fd_err >= 0) { + rank0_stderr_forwarder = std::thread([fd_err, suppress]() { + FILE* stream = fdopen(fd_err, "r"); + if (!stream) { + close(fd_err); + return; } - exec_args.push_back(nullptr); + char buffer[4096]; + while (fgets(buffer, sizeof(buffer), stream) != nullptr) { + if (suppress->load()) + continue; + std::lock_guard lock(output_mutex); + fputs(buffer, stderr); + fflush(stderr); + } + fclose(stream); + }); + } - execvp(cfg.app_binary.c_str(), exec_args.data()); - std::cerr << "Failed to execute " << cfg.app_binary << ": " - << std::strerror(errno) << std::endl; - _exit(1); + // Wait for rank 0 to write the address file (with timeout) + auto start = std::chrono::steady_clock::now(); + while (!std::filesystem::exists(address_file)) { + 
std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto elapsed = std::chrono::steady_clock::now() - start; + if (elapsed > std::chrono::seconds(30)) { + suppress->store(true); + kill(rank0_pid, SIGKILL); + waitpid(rank0_pid, nullptr, 0); + if (rank0_stdout_forwarder.joinable()) + rank0_stdout_forwarder.join(); + if (rank0_stderr_forwarder.joinable()) + rank0_stderr_forwarder.join(); + throw std::runtime_error("Timeout waiting for rank 0 to write root address"); } - ); + } + + // Read the hex-encoded address and remove file + std::string encoded_address; + std::ifstream addr_stream(address_file); + std::getline(addr_stream, encoded_address); + addr_stream.close(); + std::filesystem::remove(address_file); + + if (cfg.verbose) { + std::cout << "[rrun] Got root address from rank 0 (hex-encoded, " + << encoded_address.size() << " chars)" << std::endl; + } + + // Rank 0 is already running - detach forwarders + if (rank0_stdout_forwarder.joinable()) + rank0_stdout_forwarder.detach(); + if (rank0_stderr_forwarder.joinable()) + rank0_stderr_forwarder.detach(); + + return encoded_address; } /** - * @brief Wait for all child processes and check their exit status. + * @brief Coordinate root address between parent processes using SlurmBackend. + * + * This function is called by parent rrun processes in Slurm hybrid mode. + * The root parent (SLURM_PROCID=0) publishes the root address, and non-root + * parents retrieve it. + * + * @param root_address_to_publish Root address to publish. If set (has_value()), this is + * the root parent and it will publish. If empty (nullopt), + * this is a non-root parent and it will retrieve. + * @param verbose Whether to print debug messages. + * @return Root address (either published or retrieved). + * + * @throws std::runtime_error on coordination errors. 
*/ -int wait_for_ranks(std::vector const& pids) { - int overall_status = 0; +std::string coordinate_root_address_via_pmix( + std::optional const& root_address_to_publish, bool verbose +) { + // Get Slurm rank information for parent coordination + char const* slurm_procid = std::getenv("SLURM_PROCID"); + char const* slurm_ntasks = std::getenv("SLURM_NTASKS"); - for (size_t i = 0; i < pids.size(); ++i) { - int status; - while (true) { - pid_t result = waitpid(pids[i], &status, 0); + if (!slurm_procid || !slurm_ntasks) { + throw std::runtime_error( + "SLURM_PROCID and SLURM_NTASKS must be set for parent coordination" + ); + } + + int parent_rank = std::stoi(slurm_procid); + int parent_nranks = std::stoi(slurm_ntasks); + + // Create SlurmBackend for parent-level coordination + rapidsmpf::bootstrap::Context parent_ctx{ + parent_rank, + parent_nranks, + rapidsmpf::bootstrap::BackendType::SLURM, + std::nullopt, + nullptr + }; - if (result < 0) { - if (errno == EINTR) { - // Retry waitpid for the same pid + auto backend = + std::make_shared(parent_ctx); + + if (verbose) { + std::cout << "[rrun] Parent coordination initialized: rank " << parent_rank + << " of " << parent_nranks << std::endl; + } + + std::string root_address; + + if (root_address_to_publish.has_value()) { + // Root parent publishes the address (already hex-encoded for binary safety) + if (verbose) { + std::cout << "[rrun] Publishing root address via SlurmBackend (hex-encoded, " + << root_address_to_publish.value().size() << " chars)" << std::endl; + } + + backend->put("rapidsmpf_root_address", root_address_to_publish.value()); + root_address = root_address_to_publish.value(); + } + + backend->sync(); + + if (!root_address_to_publish.has_value()) { + // Non-root parents retrieve the address + root_address = backend->get("rapidsmpf_root_address", std::chrono::seconds{30}); + + if (verbose) { + std::cout << "[rrun] Retrieved root address via SlurmBackend (hex-encoded, " + << root_address.size() << " chars)" << 
std::endl; + } + } + + // Note: PMIx session will be explicitly finalized after children complete + // (see execute_slurm_hybrid_mode where finalize_pmix() is called) + + return root_address; +} +#endif // RAPIDSMPF_HAVE_SLURM + +/** + * @brief Launch multiple ranks locally using fork. + * + * A task here denotes a Slurm unit of execution, e.g., a single instance of a + * program or process, e.g., an instance of the `rrun` executable itself. + * + * @param cfg Configuration. + * @param rank_offset Starting global rank for this task. + * @param ranks_per_task Number of ranks to launch. + * @param total_ranks Total ranks across all tasks. + * @param root_address Pre-coordinated root address (empty for FILE backend). + * @param is_root_parent Whether this is root parent (affects which ranks to launch). + * @return Exit status (0 for success). + */ +int launch_ranks_fork_based( + Config const& cfg, + int rank_offset, + int ranks_per_task, + int total_ranks, + std::optional const& root_address, + bool is_root_parent +) { + std::vector pids; + pids.reserve(static_cast(ranks_per_task)); + + // Block SIGINT/SIGTERM in this thread; a dedicated thread will handle them. + sigset_t signal_set; + sigemptyset(&signal_set); + sigaddset(&signal_set, SIGINT); + sigaddset(&signal_set, SIGTERM); + sigprocmask(SIG_BLOCK, &signal_set, nullptr); + + // Output suppression flag and forwarder threads + auto suppress_output = std::make_shared>(false); + std::vector forwarders; + forwarders.reserve(static_cast(ranks_per_task) * 2); + + // Helper to start a forwarder thread for a given fd + auto start_forwarder = [&](int fd, int rank, bool to_stderr) { + if (fd < 0) { + return; + } + forwarders.emplace_back([fd, rank, to_stderr, &cfg, suppress_output]() { + FILE* stream = fdopen(fd, "r"); + if (!stream) { + close(fd); + return; + } + std::string tag = + cfg.tag_output ? 
("[" + std::to_string(rank) + "] ") : std::string{}; + char buffer[4096]; + while (fgets(buffer, sizeof(buffer), stream) != nullptr) { + if (suppress_output->load(std::memory_order_relaxed)) { continue; } - std::cerr << "Error waiting for rank " << i << ": " - << std::strerror(errno) << std::endl; - overall_status = 1; - break; - } - - if (WIFEXITED(status)) { - int exit_code = WEXITSTATUS(status); - if (exit_code != 0) { - std::cerr << "Rank " << i << " (PID " << pids[i] - << ") exited with code " << exit_code << std::endl; - overall_status = exit_code; + FILE* out = to_stderr ? stderr : stdout; + { + std::lock_guard lock(output_mutex); + if (!tag.empty()) { + fputs(tag.c_str(), out); + } + fputs(buffer, out); + fflush(out); } - } else if (WIFSIGNALED(status)) { - int signal = WTERMSIG(status); - std::cerr << "Rank " << i << " (PID " << pids[i] - << ") terminated by signal " << signal << std::endl; - overall_status = 128 + signal; } - break; + fclose(stream); + }); + }; + + // Launch ranks (skip rank 0 if root parent already launched it) + int start_local_rank = (is_root_parent && root_address.has_value()) ? 1 : 0; + + for (int local_rank = start_local_rank; local_rank < ranks_per_task; ++local_rank) { + int global_rank = rank_offset + local_rank; + int fd_out = -1; + int fd_err = -1; + pid_t pid = launch_rank_local( + cfg, global_rank, local_rank, total_ranks, root_address, &fd_out, &fd_err + ); + pids.push_back(pid); + + if (cfg.verbose) { + std::ostringstream msg; + msg << "[rrun] Launched rank " << global_rank << " (PID " << pid << ")"; + if (!cfg.gpus.empty()) { + msg << " on GPU " + << cfg.gpus[static_cast(local_rank) % cfg.gpus.size()]; + } + msg << std::endl; + std::string msg_str = msg.str(); + + std::cout << msg_str; + std::cout.flush(); } + start_forwarder(fd_out, global_rank, false); + start_forwarder(fd_err, global_rank, true); } - return overall_status; + // Start a signal-waiting thread to forward signals. 
+ std::thread([signal_set, &pids, suppress_output]() mutable { + for (;;) { + int sig = 0; + int rc = sigwait(&signal_set, &sig); + if (rc != 0) { + continue; + } + suppress_output->store(true, std::memory_order_relaxed); + for (pid_t pid : pids) { + kill(pid, sig); + } + return; + } + }).detach(); + + std::cout << "\n[rrun] All ranks launched. Waiting for completion...\n" << std::endl; + + // Wait for all processes + int exit_status = 0; + for (size_t i = 0; i < pids.size(); ++i) { + int status = 0; + pid_t pid = pids[i]; + if (waitpid(pid, &status, 0) < 0) { + std::cerr << "[rrun] Failed to wait for rank " << i << " (PID " << pid + << "): " << std::strerror(errno) << std::endl; + exit_status = 1; + continue; + } + + if (WIFEXITED(status)) { + int code = WEXITSTATUS(status); + if (code != 0) { + std::cerr << "[rrun] Rank " + << (static_cast(rank_offset) + + (is_root_parent && root_address.has_value() ? i + 1 : i)) + << " (PID " << pid << ") exited with code " << code + << std::endl; + exit_status = code; + } + } else if (WIFSIGNALED(status)) { + int sig = WTERMSIG(status); + std::cerr << "[rrun] Rank " + << (static_cast(rank_offset) + + (is_root_parent && root_address.has_value() ? i + 1 : i)) + << " (PID " << pid << ") terminated by signal " << sig << std::endl; + exit_status = 128 + sig; + } + } + + // Wait for forwarder threads to finish + for (auto& t : forwarders) { + if (t.joinable()) { + t.join(); + } + } + + return exit_status; } + +/** + * @brief Launch a single rank locally (fork-based). + * + * @param cfg Configuration. + * @param global_rank Global rank number (used for RAPIDSMPF_RANK). + * @param local_rank Local rank for GPU assignment (defaults to global_rank). + * @param total_ranks Total number of ranks across all tasks (used for RAPIDSMPF_NRANKS). + * @param root_address Optional pre-coordinated root address (for hybrid mode). + * @param out_fd_stdout Output file descriptor for stdout. + * @param out_fd_stderr Output file descriptor for stderr. 
+ * @return Child process PID. + */ +pid_t launch_rank_local( + Config const& cfg, + int global_rank, + int local_rank, + int total_ranks, + std::optional<std::string> const& root_address, + int* out_fd_stdout, + int* out_fd_stderr +) { + // Capture all parameters by value to avoid any potential issues + int captured_global_rank = global_rank; + int captured_local_rank = local_rank; + int captured_total_ranks = total_ranks; + std::optional<std::string> captured_root_address = root_address; + + return fork_with_piped_stdio( + out_fd_stdout, + out_fd_stderr, + /*combine_stderr*/ false, + [&cfg, + captured_global_rank, + captured_local_rank, + captured_total_ranks, + captured_root_address]() { + // Set custom environment variables first (can be overridden by specific vars) + for (auto const& env_pair : cfg.env_vars) { + setenv(env_pair.first.c_str(), env_pair.second.c_str(), 1); + } + + setenv("RAPIDSMPF_RANK", std::to_string(captured_global_rank).c_str(), 1); + setenv("RAPIDSMPF_NRANKS", std::to_string(captured_total_ranks).c_str(), 1); + + // Always set coord_dir for bootstrap initialization + // (needed even if using RAPIDSMPF_ROOT_ADDRESS for coordination) + if (!cfg.coord_dir.empty()) { + setenv("RAPIDSMPF_COORD_DIR", cfg.coord_dir.c_str(), 1); + } + + // If root address was pre-coordinated by parent, set it (already hex-encoded) + // This allows children to skip bootstrap coordination entirely + if (captured_root_address.has_value()) { + setenv("RAPIDSMPF_ROOT_ADDRESS", captured_root_address->c_str(), 1); + } + + // In Slurm hybrid mode, unset Slurm/PMIx rank variables to avoid confusion + // Children should not try to initialize PMIx themselves + if (cfg.slurm_mode) { + unsetenv("SLURM_PROCID"); + unsetenv("SLURM_LOCALID"); + unsetenv("PMIX_RANK"); + unsetenv("PMIX_NAMESPACE"); + } + + // Set CUDA_VISIBLE_DEVICES if GPUs are available + // Use local_rank for GPU assignment (for Slurm hybrid mode) + int gpu_id = -1; + if (!cfg.gpus.empty()) { + gpu_id = + 
cfg.gpus[static_cast(captured_local_rank) % cfg.gpus.size()]; + setenv("CUDA_VISIBLE_DEVICES", std::to_string(gpu_id).c_str(), 1); + } + + apply_topology_bindings(cfg, gpu_id, cfg.verbose); + + exec_application(cfg); + } + ); +} + } // namespace int main(int argc, char* argv[]) { @@ -702,7 +1445,8 @@ int main(int argc, char* argv[]) { if (cfg.verbose) { std::cout << "rrun configuration:\n"; - std::cout << " Mode: Single-node\n" + std::cout << " Mode: " << (cfg.slurm_mode ? "Slurm" : "Single-node") + << "\n" << " GPUs: "; if (cfg.gpus.empty()) { std::cout << "(none)\n"; @@ -714,13 +1458,19 @@ int main(int argc, char* argv[]) { } std::cout << "\n"; } - if (cfg.tag_output) { - std::cout << " Tag Output: Yes\n"; + if (cfg.slurm_mode) { + std::cout << " Slurm Local ID: " << cfg.slurm_local_id << "\n" + << " Slurm Rank: " << cfg.slurm_global_rank << "\n" + << " Slurm NTasks: " << cfg.slurm_ntasks << "\n"; + } else { + if (cfg.tag_output) { + std::cout << " Tag Output: Yes\n"; + } + std::cout << " Ranks: " << cfg.nranks << "\n" + << " Coord Dir: " << cfg.coord_dir << "\n" + << " Cleanup: " << (cfg.cleanup ? "yes" : "no") << "\n"; } - std::cout << " Ranks: " << cfg.nranks << "\n" - << " Application: " << cfg.app_binary << "\n" - << " Coord Dir: " << cfg.coord_dir << "\n" - << " Cleanup: " << (cfg.cleanup ? "yes" : "no") << "\n"; + std::cout << " Application: " << cfg.app_binary << "\n"; std::vector bind_types; if (cfg.bind_cpu) bind_types.push_back("cpu"); @@ -752,137 +1502,30 @@ int main(int argc, char* argv[]) { std::cout << std::endl; } - std::filesystem::create_directories(cfg.coord_dir); - - std::vector pids; - pids.reserve(static_cast(cfg.nranks)); - - // Block SIGINT/SIGTERM in this thread; a dedicated thread will handle them. 
- sigset_t signal_set; - sigemptyset(&signal_set); - sigaddset(&signal_set, SIGINT); - sigaddset(&signal_set, SIGTERM); - sigprocmask(SIG_BLOCK, &signal_set, nullptr); - - // Output suppression flag and forwarder threads - auto suppress_output = std::make_shared>(false); - std::vector forwarders; - forwarders.reserve(static_cast(cfg.nranks) * 2); - - // Helper to start a forwarder thread for a given fd - auto start_forwarder = [&](int fd, int rank, bool to_stderr) { - if (fd < 0) { - return; - } - forwarders.emplace_back([fd, rank, to_stderr, &cfg, suppress_output]() { - FILE* stream = fdopen(fd, "r"); - if (!stream) { - close(fd); - return; - } - std::string tag = - cfg.tag_output ? ("[" + std::to_string(rank) + "] ") : std::string{}; - char buffer[4096]; - while (fgets(buffer, sizeof(buffer), stream) != nullptr) { - if (suppress_output->load(std::memory_order_relaxed)) { - // Discard further lines after suppression - continue; - } - FILE* out = to_stderr ? stderr : stdout; - { - std::lock_guard lock(output_mutex); - if (!tag.empty()) { - fputs(tag.c_str(), out); - } - fputs(buffer, out); - fflush(out); - } - } - fclose(stream); - }); - }; - - // Single-node local mode - for (int rank = 0; rank < cfg.nranks; ++rank) { - int fd_out = -1; - int fd_err = -1; - pid_t pid = launch_rank_local(cfg, rank, &fd_out, &fd_err); - pids.push_back(pid); - - if (cfg.verbose) { - std::ostringstream msg; - msg << "Launched rank " << rank << " (PID " << pid << ")"; - if (!cfg.gpus.empty()) { - msg << " on GPU " - << cfg.gpus[static_cast(rank) % cfg.gpus.size()]; - } - msg << std::endl; - std::string msg_str = msg.str(); - - std::cout << msg_str; - std::cout.flush(); - } - // Parent-side forwarders for local stdout and stderr - start_forwarder(fd_out, rank, false); - start_forwarder(fd_err, rank, true); - } - - // Start a signal-waiting thread to forward signals. 
- std::thread([signal_set, &pids, suppress_output]() mutable { - for (;;) { - int sig = 0; - int rc = sigwait(&signal_set, &sig); - if (rc != 0) { - return; - } - // Stop printing further output immediately - suppress_output->store(true, std::memory_order_relaxed); - // Forward signal to all local children - for (pid_t pid : pids) { - std::ignore = kill(pid, sig); - } - } - }).detach(); - - if (cfg.verbose) { - std::cout << "\nAll ranks launched. Waiting for completion...\n" << std::endl; - } - - // Wait for all ranks to complete - int exit_status = wait_for_ranks(pids); - - // Join forwarders before cleanup - for (auto& th : forwarders) { - if (th.joinable()) { - th.join(); - } - } - - if (cfg.cleanup) { - if (cfg.verbose) { - std::cout << "Cleaning up coordination directory: " << cfg.coord_dir - << std::endl; - } - std::error_code ec; - std::filesystem::remove_all(cfg.coord_dir, ec); - if (ec) { - std::cerr << "Warning: Failed to cleanup directory: " << cfg.coord_dir - << ": " << ec.message() << std::endl; + if (cfg.slurm_mode) { + if (cfg.nranks == 1) { + // Slurm passthrough mode: single rank per task, no forking + execute_slurm_passthrough_mode(cfg); } - } else if (cfg.verbose) { - std::cout << "Coordination directory preserved: " << cfg.coord_dir - << std::endl; - } - - if (cfg.verbose && exit_status == 0) { - std::cout << "\nAll ranks completed successfully." << std::endl; + // Slurm hybrid mode: multiple ranks per task with PMIx coordination +#ifdef RAPIDSMPF_HAVE_SLURM + return execute_slurm_hybrid_mode(cfg); +#else + std::cerr << "[rrun] Error: Slurm hybrid mode requires PMIx support but " + << "rapidsmpf was not built with PMIx." << std::endl; + std::cerr << "[rrun] Rebuild with -DBUILD_SLURM_SUPPORT=ON or use " + "passthrough mode " + << "(without -n flag)." 
<< std::endl; + return 1; +#endif + } else { + // Single-node mode with file backend + return execute_single_node_mode(cfg); } - return exit_status; - } catch (std::exception const& e) { - std::cerr << "Error: " << e.what() << std::endl; - std::cerr << "Run with -h or --help for usage information." << std::endl; + std::cerr << "[rrun] Error: " << e.what() << std::endl; + std::cerr << "[rrun] Run with -h or --help for usage information." << std::endl; return 1; } } diff --git a/dependencies.yaml b/dependencies.yaml index 4f59362d6..0b2a4c8ee 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -13,6 +13,7 @@ files: - build-cpp - build-python - build-mpi + - build-pmix - checks - clang_tidy - cuda @@ -176,6 +177,7 @@ dependencies: - cuda-nvcc - cxx-compiler - libnuma + - libpmix-devel >=5.0,<6.0 - openmpi >=5.0 # See specific: - output_types: conda @@ -217,6 +219,11 @@ dependencies: packages: - openmpi >=5.0 # See - mpi4py + build-pmix: + common: + - output_types: conda + packages: + - libpmix-devel >=5.0,<6.0 build-python: common: - output_types: [conda, pyproject, requirements] diff --git a/python/rapidsmpf/rapidsmpf/bootstrap/__init__.py b/python/rapidsmpf/rapidsmpf/bootstrap/__init__.py index d1e3e5a54..bc92910c0 100644 --- a/python/rapidsmpf/rapidsmpf/bootstrap/__init__.py +++ b/python/rapidsmpf/rapidsmpf/bootstrap/__init__.py @@ -1,13 +1,13 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. 
# SPDX-License-Identifier: Apache-2.0 """Bootstrap utilities for communicator creation.""" from __future__ import annotations from rapidsmpf.bootstrap.bootstrap import ( - Backend, + BackendType, create_ucxx_comm, is_running_with_rrun, ) -__all__ = ["Backend", "create_ucxx_comm", "is_running_with_rrun"] +__all__ = ["BackendType", "create_ucxx_comm", "is_running_with_rrun"] diff --git a/python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyi b/python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyi index bf3c0477d..fc5d39b4f 100644 --- a/python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyi +++ b/python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyi @@ -6,12 +6,12 @@ from enum import IntEnum from rapidsmpf.communicator.communicator import Communicator from rapidsmpf.config import Options -class Backend(IntEnum): +class BackendType(IntEnum): AUTO = ... FILE = ... def create_ucxx_comm( - backend: Backend = ..., + type: BackendType = ..., options: Options | None = ..., ) -> Communicator: ... def is_running_with_rrun() -> bool: ... 
diff --git a/python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyx b/python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyx index e85b1b60b..b43511ce5 100644 --- a/python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyx +++ b/python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyx @@ -8,9 +8,9 @@ from rapidsmpf.communicator.communicator cimport Communicator, cpp_Communicator from rapidsmpf.config cimport Options, cpp_Options -cdef extern from "" namespace \ +cdef extern from "" namespace \ "rapidsmpf::bootstrap" nogil: - cpdef enum class Backend(int): + cpdef enum class BackendType(int): AUTO FILE @@ -31,12 +31,12 @@ cdef extern from "" nogil: cdef extern from "" nogil: shared_ptr[cpp_UCXX_Communicator] cpp_create_ucxx_comm \ "rapidsmpf::bootstrap::create_ucxx_comm"( - Backend backend, + BackendType type, cpp_Options options, ) except +ex_handler -def create_ucxx_comm(Backend backend = Backend.AUTO, options = None): +def create_ucxx_comm(BackendType type = BackendType.AUTO, options = None): """ Create a UCXX communicator using the bootstrap backend. @@ -47,8 +47,8 @@ def create_ucxx_comm(Backend backend = Backend.AUTO, options = None): Parameters ---------- - backend - Backend to use for coordination. By default, ``Backend.AUTO`` is used, + type + Backend type to use for coordination. By default, ``BackendType.AUTO`` is used, which currently resolves to the file-based backend. options Configuration options for the UCXX communicator. 
If ``None``, a default @@ -73,7 +73,7 @@ def create_ucxx_comm(Backend backend = Backend.AUTO, options = None): cpp_options = options with nogil: - ucxx_comm = cpp_create_ucxx_comm(backend, cpp_options._handle) + ucxx_comm = cpp_create_ucxx_comm(type, cpp_options._handle) base_comm = dynamic_pointer_cast[cpp_Communicator, cpp_UCXX_Communicator]( ucxx_comm ) diff --git a/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py b/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py index 7e6b271f4..efd9193d8 100644 --- a/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py +++ b/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 """Bulk-synchronous MPI shuffle.""" @@ -299,7 +299,7 @@ def setup_and_run(args: argparse.Namespace) -> None: elif args.cluster_type == "ucxx": if rapidsmpf.bootstrap.is_running_with_rrun(): comm = rapidsmpf.bootstrap.create_ucxx_comm( - backend=rapidsmpf.bootstrap.Backend.AUTO, options=options + type=rapidsmpf.bootstrap.BackendType.AUTO, options=options ) else: comm = ucxx_mpi_setup(options)