From dd74085ed02bd98ad9123b37ed030033ffc58d57 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 10 Dec 2025 11:21:07 +0100 Subject: [PATCH 1/4] Introducing MemoryType::PINNED_HOST --- cpp/benchmarks/bench_shuffle.cpp | 6 +- .../streaming/bench_streaming_shuffle.cpp | 7 +- cpp/benchmarks/streaming/ndsh/q09.cpp | 4 +- cpp/include/rapidsmpf/memory/buffer.hpp | 4 +- .../rapidsmpf/memory/buffer_resource.hpp | 31 +++++++ cpp/include/rapidsmpf/memory/memory_type.hpp | 13 ++- cpp/src/memory/buffer_resource.cpp | 19 ++++- cpp/src/streaming/cudf/table_chunk.cpp | 1 + cpp/tests/streaming/test_table_chunk.cpp | 83 ++++++++++++++++--- cpp/tests/test_buffer_resource.cpp | 12 ++- cpp/tests/test_chunk.cpp | 1 + cpp/tests/test_shuffler.cpp | 9 +- cpp/tests/test_spill_manager.cpp | 2 + .../rapidsmpf/memory/buffer_resource.pxd | 15 +--- .../rapidsmpf/memory/buffer_resource.pyx | 7 +- 15 files changed, 173 insertions(+), 41 deletions(-) diff --git a/cpp/benchmarks/bench_shuffle.cpp b/cpp/benchmarks/bench_shuffle.cpp index ca7b8cc3a..d67818b4e 100644 --- a/cpp/benchmarks/bench_shuffle.cpp +++ b/cpp/benchmarks/bench_shuffle.cpp @@ -629,7 +629,11 @@ int main(int argc, char** argv) { } rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref(); - rapidsmpf::BufferResource br{mr, std::move(memory_available)}; + rapidsmpf::BufferResource br{ + mr, + rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, + std::move(memory_available) + }; auto& log = comm->logger(); rmm::cuda_stream_view stream = cudf::get_default_stream(); diff --git a/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp b/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp index 11684ae3f..473b98636 100644 --- a/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp +++ b/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp @@ -356,8 +356,11 @@ int main(int argc, char** argv) { } rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref(); - auto br = - std::make_shared(mr, std::move(memory_available)); + auto br = std::make_shared( + mr, + rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, + std::move(memory_available) + ); auto& log = comm->logger(); rmm::cuda_stream_view stream = cudf::get_default_stream(); diff --git a/cpp/benchmarks/streaming/ndsh/q09.cpp b/cpp/benchmarks/streaming/ndsh/q09.cpp index 6a6a78cfd..2275995ee 100644 --- a/cpp/benchmarks/streaming/ndsh/q09.cpp +++ b/cpp/benchmarks/streaming/ndsh/q09.cpp @@ -685,7 +685,9 @@ int main(int argc, char** argv) { }; } auto br = std::make_shared( - stats_mr, std::move(memory_available) + stats_mr, + rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, + std::move(memory_available) ); auto envvars = rapidsmpf::config::get_environment_variables(); envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads); diff --git a/cpp/include/rapidsmpf/memory/buffer.hpp b/cpp/include/rapidsmpf/memory/buffer.hpp index 73f4c8b43..ae655d7e3 100644 --- a/cpp/include/rapidsmpf/memory/buffer.hpp +++ b/cpp/include/rapidsmpf/memory/buffer.hpp @@ -69,7 +69,9 @@ class Buffer { * This ensures that the buffer is backed by memory that behaves as host * accessible memory. */ - static constexpr std::array host_buffer_types{MemoryType::HOST}; + static constexpr std::array host_buffer_types{ + MemoryType::HOST, MemoryType::PINNED_HOST + }; /** * @brief Access the underlying memory buffer (host or device memory). diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index f2f777bb1..8a42dc9b9 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -50,11 +51,18 @@ class BufferResource { */ using MemoryAvailable = std::function; + /// @brief Sentinel value used to disable pinned host memory. + static constexpr auto PinnedMemoryResourceDisabled = nullptr; + /** * @brief Constructs a buffer resource. * * @param device_mr Reference to the RMM device memory resource used for device * allocations. + * @param pinned_mr The pinned host memory resource used for `MemoryType::PINNED_HOST` + * allocations. If null, pinned host allocations are disabled. In that case, any + * attempt to allocate pinned memory will fail regardless of what @p memory_available + * reports. * @param memory_available Optional memory availability functions mapping memory types * to available memory checkers. Memory types without availability functions are * assumed to have unlimited memory. @@ -68,6 +76,7 @@ class BufferResource { */ BufferResource( rmm::device_async_resource_ref device_mr, + std::shared_ptr pinned_mr = PinnedMemoryResourceDisabled, std::unordered_map memory_available = {}, std::optional periodic_spill_check = std::chrono::milliseconds{1}, std::shared_ptr stream_pool = std::make_shared< @@ -95,6 +104,18 @@ class BufferResource { return host_mr_; } + /** + * @brief Get the RMM pinned host memory resource. + * + * @return Reference to the RMM resource used for pinned host allocations. + */ + [[nodiscard]] rmm::host_async_resource_ref pinned_mr() { + RAPIDSMPF_EXPECTS( + pinned_mr_, "no pinned memory resource is available", std::invalid_argument + ); + return *pinned_mr_; + } + /** * @brief Retrieves the memory availability function for a given memory type. * @@ -165,6 +186,10 @@ class BufferResource { /** * @brief Make a memory reservation or fail based on the given order of memory types. * + * The function attempts to reserve memory by iterating over @p mem_types in the given + * order of preference. For each memory type, it requests a reservation without + * overbooking. If no memory type can satisfy the request, the function throws. + * * @param size The size of the buffer to allocate. * @param mem_types Range of memory types to try to reserve memory from. * @return A memory reservation. @@ -176,6 +201,11 @@ class BufferResource { [[nodiscard]] MemoryReservation reserve_or_fail(size_t size, Range mem_types) { // try to reserve memory from the given order for (auto const& mem_type : mem_types) { + if (mem_type == MemoryType::PINNED_HOST && pinned_mr_ == nullptr) { + // Pinned host memory is only available if the memory resource is + // available. + continue; + } auto [res, _] = reserve(mem_type, size, false); if (res.size() == size) { return std::move(res); @@ -336,6 +366,7 @@ class BufferResource { private: std::mutex mutex_; rmm::device_async_resource_ref device_mr_; + std::shared_ptr pinned_mr_; HostMemoryResource host_mr_; std::unordered_map memory_available_; // Zero initialized reserved counters. diff --git a/cpp/include/rapidsmpf/memory/memory_type.hpp b/cpp/include/rapidsmpf/memory/memory_type.hpp index 91cf91178..ca41d0855 100644 --- a/cpp/include/rapidsmpf/memory/memory_type.hpp +++ b/cpp/include/rapidsmpf/memory/memory_type.hpp @@ -12,15 +12,18 @@ namespace rapidsmpf { /// @brief Enum representing the type of memory. enum class MemoryType : int { DEVICE = 0, ///< Device memory - HOST = 1 ///< Host memory + HOST = 1, ///< Host memory + PINNED_HOST = 2 ///< Pinned host memory }; /// @brief All memory types sorted in decreasing order of preference. -constexpr std::array MEMORY_TYPES{{MemoryType::DEVICE, MemoryType::HOST}}; +constexpr std::array MEMORY_TYPES{ + {MemoryType::DEVICE, MemoryType::PINNED_HOST, MemoryType::HOST} +}; /// @brief Memory type names sorted to match `MEMORY_TYPES`. constexpr std::array MEMORY_TYPE_NAMES{ - {"DEVICE", "HOST"} + {"DEVICE", "PINNED_HOST", "HOST"} }; /** @@ -31,7 +34,9 @@ constexpr std::array MEMORY_TYPE_NAMES{ * insufficient. The ordering reflects the policy of spilling in RapidsMPF, where * earlier entries are considered more desirable spill destinations. */ -constexpr std::array SPILL_TARGET_MEMORY_TYPES{{MemoryType::HOST}}; +constexpr std::array SPILL_TARGET_MEMORY_TYPES{ + {MemoryType::PINNED_HOST, MemoryType::HOST} +}; /** * @brief Get the name of a MemoryType. diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp index ca30f4841..eb04e4688 100644 --- a/cpp/src/memory/buffer_resource.cpp +++ b/cpp/src/memory/buffer_resource.cpp @@ -15,25 +15,33 @@ namespace rapidsmpf { namespace { /// @brief Helper that adds missing functions to the `memory_available` argument. auto add_missing_availability_functions( - std::unordered_map&& memory_available + std::unordered_map&& memory_available, + bool pinned_mr_is_not_available ) { for (MemoryType mem_type : MEMORY_TYPES) { // Add missing memory availability functions. memory_available.try_emplace(mem_type, std::numeric_limits::max); } + if (pinned_mr_is_not_available) { + memory_available[MemoryType::PINNED_HOST] = []() -> std::int64_t { return 0; }; + } return memory_available; } } // namespace BufferResource::BufferResource( rmm::device_async_resource_ref device_mr, + std::shared_ptr pinned_mr, std::unordered_map memory_available, std::optional periodic_spill_check, std::shared_ptr stream_pool, std::shared_ptr statistics ) : device_mr_{device_mr}, - memory_available_{add_missing_availability_functions(std::move(memory_available))}, + pinned_mr_{std::move(pinned_mr)}, + memory_available_{add_missing_availability_functions( + std::move(memory_available), pinned_mr_ == nullptr + )}, stream_pool_{std::move(stream_pool)}, spill_manager_{this, periodic_spill_check}, statistics_{std::move(statistics)} { @@ -112,6 +120,13 @@ std::unique_ptr BufferResource::allocate( MemoryType::HOST )); break; + case MemoryType::PINNED_HOST: + ret = std::unique_ptr(new Buffer( + std::make_unique(size, stream, pinned_mr()), + stream, + MemoryType::PINNED_HOST + )); + break; case MemoryType::DEVICE: ret = std::unique_ptr(new Buffer( std::make_unique(size, stream, device_mr()), diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index 8e052c26a..d3b02c6a5 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -135,6 +135,7 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { return TableChunk(std::move(table), stream()); } case MemoryType::HOST: + case MemoryType::PINNED_HOST: { // Get the packed data either from `packed_columns_` or `table_view(). std::unique_ptr packed_data; diff --git a/cpp/tests/streaming/test_table_chunk.cpp b/cpp/tests/streaming/test_table_chunk.cpp index 24fd92f68..74cd8b509 100644 --- a/cpp/tests/streaming/test_table_chunk.cpp +++ b/cpp/tests/streaming/test_table_chunk.cpp @@ -28,7 +28,44 @@ using namespace rapidsmpf; using namespace rapidsmpf::streaming; -using StreamingTableChunk = BaseStreamingFixture; +class StreamingTableChunk : public BaseStreamingFixture, + public ::testing::WithParamInterface { + protected: + void SetUp() override { + rapidsmpf::config::Options options( + rapidsmpf::config::get_environment_variables() + ); + + std::unordered_map + memory_available{}; + auto stream_pool = std::make_shared( + 16, rmm::cuda_stream::flags::non_blocking + ); + std::shared_ptr pinned_mr = + BufferResource::PinnedMemoryResourceDisabled; + if (is_pinned_memory_resources_supported()) { + pinned_mr = std::make_shared(); + } + + stream = cudf::get_default_stream(); + br = std::make_shared( + mr_cuda, // device_mr + pinned_mr, // pinned_mr + memory_available, // memory_available + std::chrono::milliseconds{1}, // periodic_spill_check + stream_pool, // stream_pool + Statistics::disabled() // statistics + ); + ctx = std::make_shared( + options, GlobalEnvironment->comm_, br + ); + } + + rmm::cuda_stream_view stream; + rmm::mr::cuda_memory_resource mr_cuda; + std::shared_ptr br; + std::shared_ptr ctx; +}; TEST_F(StreamingTableChunk, FromTable) { constexpr unsigned int num_rows = 100; @@ -143,7 +180,23 @@ TEST_F(StreamingTableChunk, FromPackedDataOnDevice) { EXPECT_EQ(chunk.make_available_cost(), 0); } -TEST_F(StreamingTableChunk, FromPackedDataOnHost) { +INSTANTIATE_TEST_SUITE_P( + StreamingTableChunkWithSpillTargets, + StreamingTableChunk, + ::testing::ValuesIn(rapidsmpf::SPILL_TARGET_MEMORY_TYPES), + [](testing::TestParamInfo const& info) { + return std::string{rapidsmpf::to_string(info.param)}; + } +); + +TEST_P(StreamingTableChunk, FromPackedDataOn) { + auto const spill_mem_type = GetParam(); + if (spill_mem_type == MemoryType::PINNED_HOST + && !is_pinned_memory_resources_supported()) + { + GTEST_SKIP() << "MemoryType::PINNED_HOST isn't supported on the system."; + } + constexpr unsigned int num_rows = 100; constexpr std::int64_t seed = 1337; @@ -154,12 +207,12 @@ TEST_F(StreamingTableChunk, FromPackedDataOnHost) { // Move the gpu_data to a Buffer (still device memory). auto gpu_data_on_device = br->move(std::move(packed_columns.gpu_data), stream); - // Copy the GPU data to host memory. - auto [res, _] = br->reserve(MemoryType::HOST, size, true); - auto gpu_data_on_host = br->move(std::move(gpu_data_on_device), res); + // Copy the GPU data to the current spill target memory type. + auto [res, _] = br->reserve(spill_mem_type, size, true); + auto gpu_data_in_spill_memory = br->move(std::move(gpu_data_on_device), res); auto packed_data = std::make_unique( - std::move(packed_columns.metadata), std::move(gpu_data_on_host) + std::move(packed_columns.metadata), std::move(gpu_data_in_spill_memory) ); TableChunk chunk{std::move(packed_data)}; @@ -187,7 +240,14 @@ TEST_F(StreamingTableChunk, DeviceToDeviceCopy) { CUDF_TEST_EXPECT_TABLES_EQUIVALENT(chunk2.table_view(), expect); } -TEST_F(StreamingTableChunk, DeviceToHostRoundTripCopy) { +TEST_P(StreamingTableChunk, DeviceToHostRoundTripCopy) { + auto const spill_mem_type = GetParam(); + if (spill_mem_type == MemoryType::PINNED_HOST + && !is_pinned_memory_resources_supported()) + { + GTEST_SKIP() << "MemoryType::PINNED_HOST isn't supported on the system."; + } + constexpr unsigned int num_rows = 64; constexpr std::int64_t seed = 2025; @@ -208,7 +268,7 @@ TEST_F(StreamingTableChunk, DeviceToHostRoundTripCopy) { // Copy to host memory -> new chunk should be unavailable. auto host_res = br->reserve_or_fail( - dev_chunk.data_alloc_size(MemoryType::DEVICE), MemoryType::HOST + dev_chunk.data_alloc_size(MemoryType::DEVICE), spill_mem_type ); auto host_copy = dev_chunk.copy(host_res); EXPECT_FALSE(host_copy.is_available()); @@ -224,9 +284,8 @@ TEST_F(StreamingTableChunk, DeviceToHostRoundTripCopy) { } // Host to host copy. - auto host_res2 = br->reserve_or_fail( - host_copy.data_alloc_size(MemoryType::HOST), MemoryType::HOST - ); + auto host_res2 = + br->reserve_or_fail(host_copy.data_alloc_size(spill_mem_type), spill_mem_type); auto host_copy2 = host_copy.copy(host_res2); EXPECT_FALSE(host_copy2.is_available()); EXPECT_TRUE(host_copy2.is_spillable()); @@ -242,7 +301,7 @@ TEST_F(StreamingTableChunk, DeviceToHostRoundTripCopy) { // Bring the new host copy back to device and verify equality. auto dev_res = br->reserve_or_fail( - host_copy2.data_alloc_size(MemoryType::HOST), MemoryType::DEVICE + host_copy2.data_alloc_size(spill_mem_type), MemoryType::DEVICE ); auto dev_back = host_copy2.make_available(dev_res); EXPECT_TRUE(dev_back.is_available()); diff --git a/cpp/tests/test_buffer_resource.cpp b/cpp/tests/test_buffer_resource.cpp index e59a51bbc..e0803d66a 100644 --- a/cpp/tests/test_buffer_resource.cpp +++ b/cpp/tests/test_buffer_resource.cpp @@ -56,7 +56,9 @@ TEST(BufferResource, ReservationOverbooking) { // Create a buffer resource that always have 10 KiB of available device memory. auto dev_mem_available = []() -> std::int64_t { return 10_KiB; }; BufferResource br{ - cudf::get_current_device_resource_ref(), {{MemoryType::DEVICE, dev_mem_available}} + cudf::get_current_device_resource_ref(), + BufferResource::PinnedMemoryResourceDisabled, + {{MemoryType::DEVICE, dev_mem_available}} }; EXPECT_EQ(br.memory_reserved(MemoryType::DEVICE), 0); EXPECT_EQ(br.memory_reserved(MemoryType::HOST), 0); @@ -118,6 +120,7 @@ TEST(BufferResource, ReservationReleasing) { auto dev_mem_available = []() -> std::int64_t { return 10_KiB; }; BufferResource br{ cudf::get_current_device_resource_ref(), + BufferResource::PinnedMemoryResourceDisabled, {{MemoryType::DEVICE, dev_mem_available}, {MemoryType::HOST, dev_mem_available}} }; EXPECT_EQ(br.memory_reserved(MemoryType::DEVICE), 0); @@ -167,7 +170,11 @@ TEST(BufferResource, LimitAvailableMemory) { // Create a buffer resource that limit available device memory to 10 KiB. LimitAvailableMemory dev_mem_available{&mr, 10_KiB}; - BufferResource br{mr, {{MemoryType::DEVICE, dev_mem_available}}}; + BufferResource br{ + mr, + BufferResource::PinnedMemoryResourceDisabled, + {{MemoryType::DEVICE, dev_mem_available}} + }; EXPECT_EQ(dev_mem_available(), 10_KiB); EXPECT_EQ(br.memory_reserved(MemoryType::DEVICE), 0); EXPECT_EQ(br.memory_reserved(MemoryType::HOST), 0); @@ -235,6 +242,7 @@ class BufferResourceReserveOrFailTest : public ::testing::Test { mr = std::make_unique(*cuda_mr); br = std::make_unique( *mr, + BufferResource::PinnedMemoryResourceDisabled, std::unordered_map{ {MemoryType::DEVICE, LimitAvailableMemory{mr.get(), 10_KiB}} } diff --git a/cpp/tests/test_chunk.cpp b/cpp/tests/test_chunk.cpp index 0d2b45001..a510855f0 100644 --- a/cpp/tests/test_chunk.cpp +++ b/cpp/tests/test_chunk.cpp @@ -415,6 +415,7 @@ TEST_F(ChunkTest, ChunkConcatHostBufferAllocation) { // create a new buffer resource with only host memory available br = std::make_unique( cudf::get_current_device_resource_ref(), + BufferResource::PinnedMemoryResourceDisabled, std::unordered_map{ {MemoryType::DEVICE, []() { return 0; }} } diff --git a/cpp/tests/test_shuffler.cpp b/cpp/tests/test_shuffler.cpp index 4a511f1a4..f58a139d1 100644 --- a/cpp/tests/test_shuffler.cpp +++ b/cpp/tests/test_shuffler.cpp @@ -191,7 +191,11 @@ class MemoryAvailable_NumPartition memory_available = std::get<0>(GetParam()); total_num_partitions = std::get<1>(GetParam()); total_num_rows = std::get<2>(GetParam()); - br = std::make_unique(mr(), memory_available); + br = std::make_unique( + mr(), + rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, + memory_available + ); shuffler = std::make_unique( GlobalEnvironment->comm_, @@ -629,6 +633,7 @@ TEST_P(ShuffleInsertGroupedTest, InsertPackedData) { // spilling chunks in the ready postbox br = std::make_unique( mr(), + rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, get_memory_available_map(rapidsmpf::MemoryType::DEVICE), std::nullopt // disable periodic spill check ); @@ -655,6 +660,7 @@ TEST_P(ShuffleInsertGroupedTest, InsertPackedDataNoHeadroom) { // spilling chunks in the ready postbox br = std::make_unique( mr(), + rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, get_memory_available_map(rapidsmpf::MemoryType::HOST), std::nullopt // disable periodic spill check ); @@ -703,6 +709,7 @@ TEST(Shuffler, SpillOnInsertAndExtraction) { std::int64_t device_memory_available{0}; rapidsmpf::BufferResource br{ mr, + rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, {{rapidsmpf::MemoryType::DEVICE, [&device_memory_available]() -> std::int64_t { return device_memory_available; diff --git a/cpp/tests/test_spill_manager.cpp b/cpp/tests/test_spill_manager.cpp index b4b42d308..f061ea462 100644 --- a/cpp/tests/test_spill_manager.cpp +++ b/cpp/tests/test_spill_manager.cpp @@ -31,6 +31,7 @@ TEST(SpillManager, SpillFunction) { std::int64_t mem_available = 10_KiB; BufferResource br{ cudf::get_current_device_resource_ref(), + rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, {{MemoryType::DEVICE, [&mem_available]() -> std::int64_t { return mem_available; }}} }; @@ -83,6 +84,7 @@ TEST(SpillManager, PeriodicSpillCheck) { std::chrono::milliseconds period{1}; BufferResource br{ cudf::get_current_device_resource_ref(), + BufferResource::PinnedMemoryResourceDisabled, {{MemoryType::DEVICE, []() -> std::int64_t { return -100_KiB; }}}, period, }; diff --git a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pxd b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pxd index 7ca0246a0..86f982ed7 100644 --- a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pxd +++ b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pxd @@ -1,16 +1,10 @@ # SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 -from cython.operator cimport dereference as deref from libc.stddef cimport size_t from libc.stdint cimport int64_t -from libcpp cimport bool as bool_t -from libcpp.memory cimport shared_ptr, unique_ptr -from libcpp.optional cimport optional -from libcpp.pair cimport pair -from libcpp.unordered_map cimport unordered_map +from libcpp.memory cimport shared_ptr from rmm.librmm.cuda_stream_pool cimport cuda_stream_pool -from rmm.librmm.memory_resource cimport device_memory_resource from rmm.pylibrmm.cuda_stream_pool cimport CudaStreamPool from rmm.pylibrmm.memory_resource cimport DeviceMemoryResource @@ -29,13 +23,6 @@ cdef extern from "" nogil: cdef extern from "" nogil: cdef cppclass cpp_BufferResource "rapidsmpf::BufferResource": - cpp_BufferResource( - device_memory_resource *device_mr, - unordered_map[MemoryType, cpp_MemoryAvailable] memory_available, - optional[cpp_Duration] periodic_spill_check, - shared_ptr[cuda_stream_pool] stream_pool, - shared_ptr[cpp_Statistics] statistics, - ) except + size_t memory_reserved(MemoryType mem_type) except + cpp_MemoryAvailable memory_available(MemoryType mem_type) except + cpp_SpillManager &spill_manager() except + diff --git a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx index 6439bc66c..50f3596e7 100644 --- a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx +++ b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx @@ -3,7 +3,11 @@ from cython.operator cimport dereference as deref from libc.stdint cimport int64_t -from libcpp.memory cimport make_shared, shared_ptr +from libcpp cimport bool as bool_t +from libcpp.memory cimport make_shared, nullptr, shared_ptr, unique_ptr +from libcpp.optional cimport optional +from libcpp.pair cimport pair +from libcpp.unordered_map cimport unordered_map from libcpp.utility cimport move from rmm.librmm.cuda_stream_pool cimport cuda_stream_pool @@ -188,6 +192,7 @@ cdef class BufferResource: with nogil: self._handle = make_shared[cpp_BufferResource]( device_mr.get_mr(), + nullptr, # TODO: Write Python bindings for PinnedMemoryResource move(_mem_available), period, cpp_stream_pool, From 7410e76b68c89cd31add282d29600d0d941efb5d Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 11 Dec 2025 09:40:47 +0100 Subject: [PATCH 2/4] sort MemoryType --- cpp/include/rapidsmpf/memory/memory_type.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/include/rapidsmpf/memory/memory_type.hpp b/cpp/include/rapidsmpf/memory/memory_type.hpp index ca41d0855..485c63f75 100644 --- a/cpp/include/rapidsmpf/memory/memory_type.hpp +++ b/cpp/include/rapidsmpf/memory/memory_type.hpp @@ -9,11 +9,11 @@ namespace rapidsmpf { -/// @brief Enum representing the type of memory. +/// @brief Enum representing the type of memory sorted in decreasing order of preference. enum class MemoryType : int { DEVICE = 0, ///< Device memory - HOST = 1, ///< Host memory - PINNED_HOST = 2 ///< Pinned host memory + PINNED_HOST = 1, ///< Pinned host memory + HOST = 2 ///< Host memory }; /// @brief All memory types sorted in decreasing order of preference. @@ -21,7 +21,7 @@ constexpr std::array MEMORY_TYPES{ {MemoryType::DEVICE, MemoryType::PINNED_HOST, MemoryType::HOST} }; -/// @brief Memory type names sorted to match `MEMORY_TYPES`. +/// @brief Memory type names sorted to match `MemoryType` and `MEMORY_TYPES`. constexpr std::array MEMORY_TYPE_NAMES{ {"DEVICE", "PINNED_HOST", "HOST"} }; From a80b49e593cc454b79cefa437c49f01c2f6036b5 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 11 Dec 2025 12:39:25 +0100 Subject: [PATCH 3/4] Update cpp/include/rapidsmpf/memory/buffer_resource.hpp Co-authored-by: Lawrence Mitchell --- cpp/include/rapidsmpf/memory/buffer_resource.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index 8a42dc9b9..46615a4f7 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -201,7 +201,7 @@ class BufferResource { [[nodiscard]] MemoryReservation reserve_or_fail(size_t size, Range mem_types) { // try to reserve memory from the given order for (auto const& mem_type : mem_types) { - if (mem_type == MemoryType::PINNED_HOST && pinned_mr_ == nullptr) { + if (mem_type == MemoryType::PINNED_HOST && pinned_mr_ == PinnedMemoryResourceDisabled) { // Pinned host memory is only available if the memory resource is // available. continue; From 8dff61cf8dc8beb2d4dac6f94592e9dc8e6f2556 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 11 Dec 2025 12:41:41 +0100 Subject: [PATCH 4/4] PinnedMemoryResourceDisabled --- cpp/include/rapidsmpf/memory/buffer_resource.hpp | 4 +++- cpp/src/memory/buffer_resource.cpp | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index 46615a4f7..56046e977 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -201,7 +201,9 @@ class BufferResource { [[nodiscard]] MemoryReservation reserve_or_fail(size_t size, Range mem_types) { // try to reserve memory from the given order for (auto const& mem_type : mem_types) { - if (mem_type == MemoryType::PINNED_HOST && pinned_mr_ == PinnedMemoryResourceDisabled) { + if (mem_type == MemoryType::PINNED_HOST + && pinned_mr_ == PinnedMemoryResourceDisabled) + { // Pinned host memory is only available if the memory resource is // available. continue; diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp index eb04e4688..db7f0d42a 100644 --- a/cpp/src/memory/buffer_resource.cpp +++ b/cpp/src/memory/buffer_resource.cpp @@ -40,7 +40,7 @@ BufferResource::BufferResource( : device_mr_{device_mr}, pinned_mr_{std::move(pinned_mr)}, memory_available_{add_missing_availability_functions( - std::move(memory_available), pinned_mr_ == nullptr + std::move(memory_available), pinned_mr_ == PinnedMemoryResourceDisabled )}, stream_pool_{std::move(stream_pool)}, spill_manager_{this, periodic_spill_check},