From 7263bb7131bc3b39b79b3bfd908d27f58a7c7a28 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Sun, 25 May 2025 18:02:54 +0000 Subject: [PATCH 1/8] WIP --- src/registered_memory.cc | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/registered_memory.cc b/src/registered_memory.cc index 3e2bc92a1..bc552545f 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -277,7 +277,15 @@ RegisteredMemory::Impl::Impl(const std::vector& serialization) { RegisteredMemory::Impl::~Impl() { // Close the CUDA IPC handle if it was opened during deserialization - if (data && transports.has(Transport::CudaIpc) && getHostHash() == this->hostHash && getPidHash() != this->pidHash) { + if (data && transports.has(Transport::CudaIpc) && getHostHash() == this->hostHash) { + if (getPidHash() == this->pidHash) { + // For local registered memory + if (fileDesc >= 0) { + close(fileDesc); + fileDesc = -1; + } + return; + } void* base = static_cast(data) - getTransportInfo(Transport::CudaIpc).cudaIpcOffsetFromBase; if (this->isCuMemMapAlloc) { CUmemGenericAllocationHandle handle; @@ -288,9 +296,6 @@ RegisteredMemory::Impl::~Impl() { MSCCLPP_CULOG_WARN(cuMemUnmap((CUdeviceptr)base, size)); MSCCLPP_CULOG_WARN(cuMemRelease(handle)); MSCCLPP_CULOG_WARN(cuMemAddressFree((CUdeviceptr)base, size)); - if (getNvlsMemHandleType() == CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR && fileDesc >= 0) { - close(fileDesc); - } } else { cudaError_t err = cudaIpcCloseMemHandle(base); if (err != cudaSuccess) { From 75ad66aaf6054785a873d3476dc74e8639d192af Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 27 May 2025 16:22:22 +0000 Subject: [PATCH 2/8] Add dma buf check --- src/ib.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ib.cc b/src/ib.cc index 475579638..e7468c56b 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -59,8 +59,11 @@ IbMr::IbMr(ibv_pd* pd, void* buff, std::size_t size) : buff(buff) { MSCCLPP_CUTHROW(cuCtxGetDevice(&dev)); MSCCLPP_CUTHROW(cuDeviceGetAttribute(&dmaBufSupported, CU_DEVICE_ATTRIBUTE_DMA_BUF_SUPPORTED, dev)); #endif // !defined(__HIP_PLATFORM_AMD__) - if (cuMemAlloc && dmaBufSupported) { + if (cuMemAlloc) { #if !defined(__HIP_PLATFORM_AMD__) + if (!dmaBufSupported) { + throw mscclpp::Error("Please make sure dma buffer is supported by the device", ErrorCode::InvalidUsage); + } int fd; MSCCLPP_CUTHROW(cuMemGetHandleForAddressRange(&fd, addr, pages * pageSize, CU_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD, 0)); From 594c269adbadddc4ce0c2d29d5f1c51ac6ea9d07 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 3 Jun 2025 22:31:35 +0000 Subject: [PATCH 3/8] WIP --- src/nvls.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/nvls.cc b/src/nvls.cc index 188028aad..9317cb640 100644 --- a/src/nvls.cc +++ b/src/nvls.cc @@ -109,7 +109,8 @@ NvlsConnection::Impl::Impl(const std::vector& data) { } NvlsConnection::Impl::~Impl() { - // we don't need to free multicast handle object according to NCCL. + // Please ensure that all memory mappings are unmapped from the handle before calling the connection destructor. + cuMemRelease(mcHandle_); if (rootPid_ == getpid()) { close(mcFileDesc_); } From 36c88931607a72e5973568108469f22caa0c296b Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 3 Jun 2025 23:49:18 +0000 Subject: [PATCH 4/8] add removeMemory API --- include/mscclpp/port_channel.hpp | 7 +++++++ src/port_channel.cc | 16 ++++++++++++++++ test/unit/core_tests.cc | 12 ++++++++++++ 3 files changed, 35 insertions(+) diff --git a/include/mscclpp/port_channel.hpp b/include/mscclpp/port_channel.hpp index dfcdf0cd3..2f6eab5af 100644 --- a/include/mscclpp/port_channel.hpp +++ b/include/mscclpp/port_channel.hpp @@ -4,6 +4,8 @@ #ifndef MSCCLPP_PORT_CHANNEL_HPP_ #define MSCCLPP_PORT_CHANNEL_HPP_ +#include + #include "core.hpp" #include "port_channel_device.hpp" #include "proxy.hpp" @@ -45,6 +47,10 @@ class ProxyService : public BaseProxyService { /// @return The ID of the memory region. MemoryId addMemory(RegisteredMemory memory); + /// Unregister a memory region from the proxy service. + /// @param memoryId The ID of the memory region to unregister. + void removeMemory(MemoryId memoryId); + /// Get a semaphore by ID. /// @param id The ID of the semaphore. /// @return The semaphore. @@ -72,6 +78,7 @@ class ProxyService : public BaseProxyService { std::vector> semaphores_; std::vector memories_; std::shared_ptr proxy_; + std::set reusableMemoryIds_; int deviceNumaNode; std::unordered_map, int> inflightRequests; diff --git a/src/port_channel.cc b/src/port_channel.cc index d05162be5..a3f006716 100644 --- a/src/port_channel.cc +++ b/src/port_channel.cc @@ -38,10 +38,26 @@ MSCCLPP_API_CPP SemaphoreId ProxyService::addSemaphore(std::shared_ptr= memories_.size()) { + WARN("Attempted to remove a memory that is not registered or already removed: %u", memoryId); + return; + } + memories_[memoryId] = RegisteredMemory(); + reusableMemoryIds_.insert(memoryId); +} + MSCCLPP_API_CPP std::shared_ptr ProxyService::semaphore(SemaphoreId id) const { return semaphores_[id]; } diff --git a/test/unit/core_tests.cc b/test/unit/core_tests.cc index d2a53d434..0a0fb7ef1 100644 --- a/test/unit/core_tests.cc +++ b/test/unit/core_tests.cc @@ -5,6 +5,7 @@ #include #include +#include class LocalCommunicatorTest : public ::testing::Test { protected: @@ -12,10 +13,12 @@ class LocalCommunicatorTest : public ::testing::Test { bootstrap = std::make_shared(0, 1); bootstrap->initialize(bootstrap->createUniqueId()); comm = std::make_shared(bootstrap); + proxyService = std::make_shared(); } std::shared_ptr bootstrap; std::shared_ptr comm; + std::shared_ptr proxyService; }; TEST_F(LocalCommunicatorTest, RegisterMemory) { @@ -36,3 +39,12 @@ TEST_F(LocalCommunicatorTest, SendMemoryToSelf) { EXPECT_EQ(sameMemory.size(), memory.size()); EXPECT_EQ(sameMemory.transports(), memory.transports()); } + +TEST_F(LocalCommunicatorTest, ProxyServiceAddRemoveMemory) { + auto memory = mscclpp::RegisteredMemory(); + auto memoryId = proxyService->addMemory(memory); + EXPECT_EQ(memoryId, 0); + proxyService->removeMemory(memoryId); + memoryId = proxyService->addMemory(memory); + EXPECT_EQ(memoryId, 0); +} From f4f23955151d41531a8e9929257a9023b3a41d58 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Thu, 12 Jun 2025 23:50:34 +0000 Subject: [PATCH 5/8] WIP --- include/mscclpp/port_channel.hpp | 8 ++++++-- include/mscclpp/utils.hpp | 10 ++++++++++ src/port_channel.cc | 19 +++++++++++-------- src/utils.cc | 8 ++++++++ 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/include/mscclpp/port_channel.hpp b/include/mscclpp/port_channel.hpp index 2f6eab5af..ff374f11e 100644 --- a/include/mscclpp/port_channel.hpp +++ b/include/mscclpp/port_channel.hpp @@ -48,6 +48,9 @@ class ProxyService : public BaseProxyService { MemoryId addMemory(RegisteredMemory memory); /// Unregister a memory region from the proxy service. + /// @note It is the caller’s responsibility to manage memory lifetimes safely. + /// ProxyService only ensures that memory remains valid while it is in use by the service; + /// other peers may still hold references to that memory beyond this scope. /// @param memoryId The ID of the memory region to unregister. void removeMemory(MemoryId memoryId); @@ -79,8 +82,9 @@ class ProxyService : public BaseProxyService { std::vector memories_; std::shared_ptr proxy_; std::set reusableMemoryIds_; - int deviceNumaNode; - std::unordered_map, int> inflightRequests; + int deviceNumaNode_; + std::unordered_map, int> inflightRequests_; + std::atomic_flag lock_; void bindThread(); diff --git a/include/mscclpp/utils.hpp b/include/mscclpp/utils.hpp index 882622fca..69e98beff 100644 --- a/include/mscclpp/utils.hpp +++ b/include/mscclpp/utils.hpp @@ -35,6 +35,16 @@ std::string getIBDeviceName(Transport ibTransport); /// @return The InfiniBand transport associated with the specified device name. Transport getIBTransportByDeviceName(const std::string& ibDeviceName); +/// A simple spinlock implementation using std::atomic_flag. +/// It is used to protect shared resources in a multi-threaded environment. +class SpinLock { + public: + SpinLock(std::atomic_flag& flag); + ~SpinLock(); + + private: + std::atomic_flag& flag_; +}; } // namespace mscclpp #endif // MSCCLPP_UTILS_HPP_ diff --git a/src/port_channel.cc b/src/port_channel.cc index a3f006716..4115cab2e 100644 --- a/src/port_channel.cc +++ b/src/port_channel.cc @@ -23,7 +23,8 @@ MSCCLPP_API_CPP ProxyService::ProxyService(size_t fifoSize) [&]() { bindThread(); }, fifoSize)) { int cudaDevice; MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice)); - deviceNumaNode = getDeviceNumaNode(cudaDevice); + deviceNumaNode_ = getDeviceNumaNode(cudaDevice); + inflightRequests_.reserve(32); // Reserve to prevent frequent reallocations. } MSCCLPP_API_CPP SemaphoreId ProxyService::buildAndAddSemaphore(Communicator& communicator, @@ -50,6 +51,7 @@ MSCCLPP_API_CPP MemoryId ProxyService::addMemory(RegisteredMemory memory) { } MSCCLPP_API_CPP void ProxyService::removeMemory(MemoryId memoryId) { + SpinLock spin(lock_); if (reusableMemoryIds_.find(memoryId) != reusableMemoryIds_.end() || memoryId >= memories_.size()) { WARN("Attempted to remove a memory that is not registered or already removed: %u", memoryId); return; @@ -75,9 +77,9 @@ MSCCLPP_API_CPP void ProxyService::startProxy() { proxy_->start(); } MSCCLPP_API_CPP void ProxyService::stopProxy() { proxy_->stop(); } MSCCLPP_API_CPP void ProxyService::bindThread() { - if (deviceNumaNode >= 0) { - numaBind(deviceNumaNode); - INFO(MSCCLPP_INIT, "NUMA node of ProxyService proxy thread is set to %d", deviceNumaNode); + if (deviceNumaNode_ >= 0) { + numaBind(deviceNumaNode_); + INFO(MSCCLPP_INIT, "NUMA node of ProxyService proxy thread is set to %d", deviceNumaNode_); } } @@ -89,23 +91,24 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) { int maxWriteQueueSize = semaphore->connection()->getMaxWriteQueueSize(); if (trigger->fields.type & TriggerData) { + SpinLock spin(lock_); RegisteredMemory& dst = memories_[trigger->fields.dstMemoryId]; RegisteredMemory& src = memories_[trigger->fields.srcMemoryId]; semaphore->connection()->write(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset, trigger->fields.size); - inflightRequests[semaphore->connection()]++; + inflightRequests_[semaphore->connection()]++; } if (trigger->fields.type & TriggerFlag) { semaphore->signal(); - inflightRequests[semaphore->connection()]++; + inflightRequests_[semaphore->connection()]++; } if (trigger->fields.type & TriggerSync || - (maxWriteQueueSize != -1 && inflightRequests[semaphore->connection()] > maxWriteQueueSize)) { + (maxWriteQueueSize != -1 && inflightRequests_[semaphore->connection()] > maxWriteQueueSize)) { semaphore->connection()->flush(); result = ProxyHandlerResult::FlushFifoTailAndContinue; - inflightRequests[semaphore->connection()] = 0; + inflightRequests_[semaphore->connection()] = 0; } return result; diff --git a/src/utils.cc b/src/utils.cc index 803a5fc62..932359676 100644 --- a/src/utils.cc +++ b/src/utils.cc @@ -21,4 +21,12 @@ std::string getHostName(int maxlen, const char delim) { return hostname.substr(0, i); } +SpinLock::SpinLock(std::atomic_flag& flag) : flag_(flag) { + while (flag_.test_and_set(std::memory_order_acq_rel)) { + // Spin until the lock is acquired. + } +} + +SpinLock::~SpinLock() { flag_.clear(std::memory_order_release); } + } // namespace mscclpp From 3189de4f0c4e8cf9bb6659893b6d2e62ad3f2f28 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 13 Jun 2025 02:30:20 +0000 Subject: [PATCH 6/8] WIP --- src/port_channel.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/port_channel.cc b/src/port_channel.cc index 4115cab2e..46375a666 100644 --- a/src/port_channel.cc +++ b/src/port_channel.cc @@ -24,7 +24,6 @@ MSCCLPP_API_CPP ProxyService::ProxyService(size_t fifoSize) int cudaDevice; MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice)); deviceNumaNode_ = getDeviceNumaNode(cudaDevice); - inflightRequests_.reserve(32); // Reserve to prevent frequent reallocations. } MSCCLPP_API_CPP SemaphoreId ProxyService::buildAndAddSemaphore(Communicator& communicator, From 631c665c29277f296af280eb39161e696e5cd190 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 13 Jun 2025 18:23:34 +0000 Subject: [PATCH 7/8] WIP --- include/mscclpp/utils.hpp | 2 +- src/port_channel.cc | 5 ++++- src/utils.cc | 6 ++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/include/mscclpp/utils.hpp b/include/mscclpp/utils.hpp index 69e98beff..b04a2f63f 100644 --- a/include/mscclpp/utils.hpp +++ b/include/mscclpp/utils.hpp @@ -39,7 +39,7 @@ Transport getIBTransportByDeviceName(const std::string& ibDeviceName); /// It is used to protect shared resources in a multi-threaded environment. class SpinLock { public: - SpinLock(std::atomic_flag& flag); + SpinLock(std::atomic_flag& flag, bool yield = true); ~SpinLock(); private: diff --git a/src/port_channel.cc b/src/port_channel.cc index 46375a666..d6869019f 100644 --- a/src/port_channel.cc +++ b/src/port_channel.cc @@ -28,16 +28,19 @@ MSCCLPP_API_CPP ProxyService::ProxyService(size_t fifoSize) MSCCLPP_API_CPP SemaphoreId ProxyService::buildAndAddSemaphore(Communicator& communicator, std::shared_ptr connection) { + SpinLock spin(lock_); semaphores_.push_back(std::make_shared(communicator, connection)); return semaphores_.size() - 1; } MSCCLPP_API_CPP SemaphoreId ProxyService::addSemaphore(std::shared_ptr semaphore) { + SpinLock spin(lock_); semaphores_.push_back(semaphore); return semaphores_.size() - 1; } MSCCLPP_API_CPP MemoryId ProxyService::addMemory(RegisteredMemory memory) { + SpinLock spin(lock_); if (!reusableMemoryIds_.empty()) { auto it = reusableMemoryIds_.begin(); MemoryId memoryId = *it; @@ -83,6 +86,7 @@ MSCCLPP_API_CPP void ProxyService::bindThread() { } ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) { + SpinLock spin(lock_, false); ChannelTrigger* trigger = reinterpret_cast(&triggerRaw); std::shared_ptr semaphore = semaphores_[trigger->fields.semaphoreId]; @@ -90,7 +94,6 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) { int maxWriteQueueSize = semaphore->connection()->getMaxWriteQueueSize(); if (trigger->fields.type & TriggerData) { - SpinLock spin(lock_); RegisteredMemory& dst = memories_[trigger->fields.dstMemoryId]; RegisteredMemory& src = memories_[trigger->fields.srcMemoryId]; semaphore->connection()->write(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset, diff --git a/src/utils.cc b/src/utils.cc index 932359676..e450e2bd3 100644 --- a/src/utils.cc +++ b/src/utils.cc @@ -21,9 +21,11 @@ std::string getHostName(int maxlen, const char delim) { return hostname.substr(0, i); } -SpinLock::SpinLock(std::atomic_flag& flag) : flag_(flag) { +SpinLock::SpinLock(std::atomic_flag& flag, bool yield) : flag_(flag) { while (flag_.test_and_set(std::memory_order_acq_rel)) { - // Spin until the lock is acquired. + if (yield) { + std::this_thread::yield(); + } } } From 11e7b7e1973a45f1be247c9db43be977e7d76564 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 13 Jun 2025 18:29:53 +0000 Subject: [PATCH 8/8] WIP --- src/port_channel.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/port_channel.cc b/src/port_channel.cc index d6869019f..74119ffdb 100644 --- a/src/port_channel.cc +++ b/src/port_channel.cc @@ -20,7 +20,8 @@ MSCCLPP_API_CPP PortChannel::PortChannel(SemaphoreId semaphoreId, std::shared_pt MSCCLPP_API_CPP ProxyService::ProxyService(size_t fifoSize) : proxy_(std::make_shared([&](ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, - [&]() { bindThread(); }, fifoSize)) { + [&]() { bindThread(); }, fifoSize)), + lock_(false) { int cudaDevice; MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice)); deviceNumaNode_ = getDeviceNumaNode(cudaDevice);