From 1d28b4e5ecdb6e3240d95828e40058de28f81902 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Tue, 9 Dec 2025 14:05:57 -0800 Subject: [PATCH 01/11] adding chunked_pack Signed-off-by: niranda perera --- .../rapidsmpf/integrations/cudf/partition.hpp | 20 +++++ cpp/include/rapidsmpf/memory/buffer.hpp | 7 ++ cpp/src/integrations/cudf/partition.cpp | 39 +++++++++ cpp/src/memory/buffer.cpp | 10 +++ cpp/tests/test_partition.cpp | 87 +++++++++++++++++++ 5 files changed, 163 insertions(+) diff --git a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp index 4b03c6826..b8bb1ce2c 100644 --- a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp +++ b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp @@ -209,4 +209,24 @@ std::vector unspill_partitions( std::shared_ptr statistics = Statistics::disabled() ); +/** + * @brief Pack a table using a @p chunk_size device buffer using `cudf::chunked_pack`. + * + * @param table The table to pack. + * @param chunk_size The size of the temporary device buffer to use (must be at least 1 + * MiB enforced by cudf::chunked_pack). + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param br Buffer resource for memory allocations. + * + * @return A `PackedData` containing the packed table. + * + * @throws std::runtime_error If the memory allocation fails. + */ +PackedData chunked_pack( + cudf::table_view const& table, + size_t chunk_size, + rmm::cuda_stream_view stream, + BufferResource* br +); + } // namespace rapidsmpf diff --git a/cpp/include/rapidsmpf/memory/buffer.hpp b/cpp/include/rapidsmpf/memory/buffer.hpp index b6c73c16e..9ecdc66a6 100644 --- a/cpp/include/rapidsmpf/memory/buffer.hpp +++ b/cpp/include/rapidsmpf/memory/buffer.hpp @@ -172,6 +172,13 @@ class Buffer { */ std::byte* exclusive_data_access(); + /** + * @brief Record the latest write event on a locked buffer and unlock the buffer. + * + * @throws std::logic_error If the buffer is not locked. + */ + void record_lastest_write_and_unlock(); + /** * @brief Release the exclusive lock acquired by `exclusive_data_access()`. 
*/ diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index 31b023cb6..295fa6092 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -267,4 +267,43 @@ std::vector unspill_partitions( statistics->add_bytes_stat("spill-bytes-host-to-device", non_device_size); return ret; } + +PackedData chunked_pack( + cudf::table_view const& table, + size_t chunk_size, + rmm::cuda_stream_view stream, + BufferResource* br +) { + cudf::chunked_pack packer(table, chunk_size, stream, br->device_mr()); + + // make a reservation for the bounce buffer with overbooking and hold it until we are + // done + auto [bounce_buf_res, _] = br->reserve(MemoryType::DEVICE, chunk_size, true); + auto bounce_buf = br->allocate(chunk_size, stream, bounce_buf_res); + cudf::device_span buf_span( + reinterpret_cast(bounce_buf->exclusive_data_access()), chunk_size + ); + + auto const total_size = packer.get_total_contiguous_size(); + // make a reservation from any available memory type + auto reservation = br->reserve_or_fail(total_size, MEMORY_TYPES); + + auto data = br->allocate(total_size, stream, reservation); + std::byte* data_ptr = data->exclusive_data_access(); + size_t offset = 0; + while (packer.has_next()) { + size_t n_bytes = packer.next(buf_span); + RAPIDSMPF_CUDA_TRY(cudaMemcpyAsync( + data_ptr + offset, buf_span.data(), n_bytes, cudaMemcpyDefault, stream + )); + offset += n_bytes; + } + // record the latest write and unlock the data buffer + data->record_lastest_write_and_unlock(); + + // release the exclusive lock on the bounce buffer + bounce_buf->unlock(); + + return {packer.build_metadata(), std::move(data)}; +} } // namespace rapidsmpf diff --git a/cpp/src/memory/buffer.cpp b/cpp/src/memory/buffer.cpp index 04c467b90..581ff5fa3 100644 --- a/cpp/src/memory/buffer.cpp +++ b/cpp/src/memory/buffer.cpp @@ -107,6 +107,16 @@ std::byte* Buffer::exclusive_data_access() { ); } +void Buffer::record_lastest_write_and_unlock() { + RAPIDSMPF_EXPECTS( + lock_.load(std::memory_order_acquire), + "the buffer is not locked", + std::logic_error + ); + latest_write_event_.record(stream_); + unlock(); +} + void Buffer::unlock() { lock_.store(false, std::memory_order_release); } diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp index 8433ce630..30c9593a1 100644 --- a/cpp/tests/test_partition.cpp +++ b/cpp/tests/test_partition.cpp @@ -132,3 +132,90 @@ TEST_F(SpillingTest, SpillUnspillRoundtripPreservesDataAndMetadata) { auto actual = br->move_to_host_buffer(std::move(back_on_host[0].data), res); EXPECT_EQ(actual->copy_to_uint8_vector(), payload); } + +class NumOfRows : public ::testing::TestWithParam> { + protected: + // cudf::chunked_pack requires at least a 1 MiB bounce buffer + static constexpr size_t chunk_size = 1 << 20; + + void SetUp() override { + auto mem_type = std::get<1>(GetParam()); + + std::unordered_map memory_available; + // disable all memory types except mem_type + for (auto mt : MEMORY_TYPES) { + if (mt != mem_type) { + memory_available[mt] = []() { return 0; }; + } + } + + br = std::make_unique( + cudf::get_current_device_resource_ref(), memory_available + ); + stream = cudf::get_default_stream(); + } + + std::unique_ptr br; + rmm::cuda_stream_view stream; +}; + +// test different `num_rows` and `MemoryType`. 
+INSTANTIATE_TEST_SUITE_P( + ChunkedPack, + NumOfRows, + ::testing::Combine( + ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000), + ::testing::Values(MemoryType::DEVICE, MemoryType::HOST) + ), + [](const testing::TestParamInfo& info) { + return "nrows_" + std::to_string(std::get<0>(info.param)) + "_type_" + + MEMORY_TYPE_NAMES[static_cast(std::get<1>(info.param))]; + } +); + +TEST_P(NumOfRows, chunked_pack) { + auto const [num_rows, mem_type] = GetParam(); + std::int64_t const seed = 42; + + cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10); + + // Get result from split_and_pack with empty splits (single partition) + std::vector empty_splits{}; + auto split_result = + rapidsmpf::split_and_pack(input_table, empty_splits, stream, br.get()); + ASSERT_EQ(split_result.size(), 1); + auto& split_packed = split_result.at(0); + + // Get result from chunked_pack + auto chunked_packed = + rapidsmpf::chunked_pack(input_table, chunk_size, stream, br.get()); + + if (num_rows == 0) { // empty buffers are always marked as device memory + EXPECT_EQ(MemoryType::DEVICE, chunked_packed.data->mem_type()); + } else { + EXPECT_EQ(mem_type, chunked_packed.data->mem_type()); + } + + // Unpack both and compare the resulting tables + cudf::packed_columns split_columns{ + std::move(split_packed.metadata), + std::make_unique( + split_packed.data->data(), split_packed.data->size, stream, br->device_mr() + ) + }; + cudf::packed_columns chunked_columns{ + std::move(chunked_packed.metadata), + std::make_unique( + chunked_packed.data->data(), + chunked_packed.data->size, + stream, + br->device_mr() + ) + }; + + stream.synchronize(); + + auto split_table = cudf::unpack(split_columns); + auto chunked_table = cudf::unpack(chunked_columns); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(split_table, chunked_table); +} From f6224a8a964039cdf31d3f6a2663d593b17ae886 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Tue, 9 Dec 2025 16:08:02 -0800 Subject: [PATCH 02/11] adding chunked_pack to table chunk Signed-off-by: niranda perera --- .../rapidsmpf/integrations/cudf/partition.hpp | 20 +++++- cpp/src/integrations/cudf/partition.cpp | 33 ++++++--- cpp/src/streaming/cudf/table_chunk.cpp | 69 +++++++++++++------ cpp/tests/test_partition.cpp | 15 ++-- 4 files changed, 97 insertions(+), 40 deletions(-) diff --git a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp index b8bb1ce2c..c0df7c4e4 100644 --- a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp +++ b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp @@ -209,6 +209,19 @@ std::vector unspill_partitions( std::shared_ptr statistics = Statistics::disabled() ); +/// @brief The amount of extra memory to reserve for packing. +constexpr size_t packing_wiggle_room_per_column = 1024; ///< 1 KiB per column + +/** + * @brief The total amount of extra memory to reserve for packing. + * + * @param table The table to pack. + * @return The total amount of extra memory to reserve for packing. + */ +inline size_t total_packing_wiggle_room(cudf::table_view const& table) { + return packing_wiggle_room_per_column * static_cast(table.num_columns()); +} + /** * @brief Pack a table using a @p chunk_size device buffer using `cudf::chunked_pack`. * @@ -216,7 +229,9 @@ std::vector unspill_partitions( * @param chunk_size The size of the temporary device buffer to use (must be at least 1 * MiB enforced by cudf::chunked_pack). * @param stream CUDA stream used for device memory operations and kernel launches. 
- * @param br Buffer resource for memory allocations. + * @param bounce_buf_res Device memory reservation for the bounce buffer. + * @param data_res Memory reservation for the data buffer. If the final packed buffer size + * is with in a wiggle room, this @p data_res will be padded to the packed buffer size. * * @return A `PackedData` containing the packed table. * @@ -226,7 +241,8 @@ PackedData chunked_pack( cudf::table_view const& table, size_t chunk_size, rmm::cuda_stream_view stream, - BufferResource* br + MemoryReservation& bounce_buf_res, + MemoryReservation& data_res ); } // namespace rapidsmpf diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index 295fa6092..c3091f61d 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -272,23 +272,36 @@ PackedData chunked_pack( cudf::table_view const& table, size_t chunk_size, rmm::cuda_stream_view stream, - BufferResource* br + MemoryReservation& bounce_buf_res, + MemoryReservation& data_res ) { - cudf::chunked_pack packer(table, chunk_size, stream, br->device_mr()); + RAPIDSMPF_EXPECTS( + bounce_buf_res.mem_type() == MemoryType::DEVICE, + "bounce buffer must be in device memory" + ); + cudf::chunked_pack packer( + table, chunk_size, stream, bounce_buf_res.br()->device_mr() + ); + auto const packed_size = packer.get_total_contiguous_size(); + + // if the packed size > data reservation, and it is within the wiggle room, pad the + // data reservation to the packed size from the same memory type. + if (packed_size > data_res.size()) { + if (packed_size <= data_res.size() + total_packing_wiggle_room(table)) { + data_res = + data_res.br()->reserve(data_res.mem_type(), packed_size, true).first; + } + } // make a reservation for the bounce buffer with overbooking and hold it until we are // done - auto [bounce_buf_res, _] = br->reserve(MemoryType::DEVICE, chunk_size, true); - auto bounce_buf = br->allocate(chunk_size, stream, bounce_buf_res); + auto bounce_buf = bounce_buf_res.br()->allocate(chunk_size, stream, bounce_buf_res); cudf::device_span buf_span( reinterpret_cast(bounce_buf->exclusive_data_access()), chunk_size ); - auto const total_size = packer.get_total_contiguous_size(); - // make a reservation from any available memory type - auto reservation = br->reserve_or_fail(total_size, MEMORY_TYPES); - - auto data = br->allocate(total_size, stream, reservation); + // allocate the data buffer + auto data = data_res.br()->allocate(packed_size, stream, data_res); std::byte* data_ptr = data->exclusive_data_access(); size_t offset = 0; while (packer.has_next()) { @@ -302,6 +315,8 @@ PackedData chunked_pack( data->record_lastest_write_and_unlock(); // release the exclusive lock on the bounce buffer + // TODO: there is a possibility that bounce buffer destructor is a called before the + // async copies are completed. Should we synchronize the stream here? bounce_buf->unlock(); return {packer.build_metadata(), std::move(data)}; diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index 8e052c26a..d1b4ec66b 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -5,6 +5,7 @@ #include +#include #include #include #include @@ -169,32 +170,56 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // serialize `table_view()` into a packed_columns and then we move // the packed_columns' gpu_data to a new host buffer. 
- // TODO: use `cudf::chunked_pack()` with a bounce buffer. Currently, - // `cudf::pack()` allocates device memory we haven't reserved. - auto packed_columns = - cudf::pack(table_view(), stream(), br->device_mr()); - packed_data = std::make_unique( - std::move(packed_columns.metadata), - br->move(std::move(packed_columns.gpu_data), stream()) + // make a reservation for packing + auto [pack_res, overbooking] = br->reserve( + MemoryType::DEVICE, + estimated_memory_usage(table_view(), stream()), + true ); - // Handle the case where `cudf::pack` allocates slightly more than the - // input size. This can occur because cudf uses aligned allocations, - // which may exceed the requested size. To accommodate this, we - // allow some wiggle room. - if (packed_data->data->size > reservation.size()) { - auto const wiggle_room = - 1024 * static_cast(table_view().num_columns()); - if (packed_data->data->size <= reservation.size() + wiggle_room) { - reservation = - br->reserve( - MemoryType::HOST, packed_data->data->size, true - ) - .first; + if (overbooking > 0) { + // there is not enough memory to pack the table. + size_t avail_dev_mem = pack_res.size() - overbooking; + RAPIDSMPF_EXPECTS( + avail_dev_mem > 1 << 20, + "not enough device memory for the bounce buffer", + std::runtime_error + ); + + packed_data = std::make_unique(chunked_pack( + table_view(), avail_dev_mem, stream(), pack_res, reservation + )); + } else { + // if there is enough memory to pack the table, use `cudf::pack` + auto packed_columns = + cudf::pack(table_view(), stream(), br->device_mr()); + // clear the reservation as we are done with it. + pack_res.clear(); + packed_data = std::make_unique( + std::move(packed_columns.metadata), + br->move(std::move(packed_columns.gpu_data), stream()) + ); + + // Handle the case where `cudf::pack` allocates slightly more than + // the input size. This can occur because cudf uses aligned + // allocations, which may exceed the requested size. To + // accommodate this, we allow some wiggle room. 
+ if (packed_data->data->size > reservation.size()) { + if (packed_data->data->size + <= reservation.size() + + total_packing_wiggle_room(table_view())) + { + reservation = + br->reserve( + MemoryType::HOST, packed_data->data->size, true + ) + .first; + } } + // finally copy the packed data device buffer to HOST memory + packed_data->data = + br->move(std::move(packed_data->data), reservation); } - packed_data->data = - br->move(std::move(packed_data->data), reservation); } return TableChunk(std::move(packed_data)); } diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp index 30c9593a1..a2a4c94dc 100644 --- a/cpp/tests/test_partition.cpp +++ b/cpp/tests/test_partition.cpp @@ -186,15 +186,16 @@ TEST_P(NumOfRows, chunked_pack) { ASSERT_EQ(split_result.size(), 1); auto& split_packed = split_result.at(0); + auto [bounce_buf_res, _] = br->reserve(MemoryType::DEVICE, chunk_size, true); + auto data_res = + br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type); + // Get result from chunked_pack - auto chunked_packed = - rapidsmpf::chunked_pack(input_table, chunk_size, stream, br.get()); + auto chunked_packed = rapidsmpf::chunked_pack( + input_table, chunk_size, stream, bounce_buf_res, data_res + ); - if (num_rows == 0) { // empty buffers are always marked as device memory - EXPECT_EQ(MemoryType::DEVICE, chunked_packed.data->mem_type()); - } else { - EXPECT_EQ(mem_type, chunked_packed.data->mem_type()); - } + EXPECT_EQ(mem_type, chunked_packed.data->mem_type()); // Unpack both and compare the resulting tables cudf::packed_columns split_columns{ From 3f67fce4472de0e407db1220691e56c61233d2e5 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Tue, 9 Dec 2025 16:42:06 -0800 Subject: [PATCH 03/11] fix tests Signed-off-by: niranda perera --- cpp/include/rapidsmpf/memory/buffer.hpp | 7 ----- cpp/src/integrations/cudf/partition.cpp | 40 ++++++++++++++----------- cpp/src/memory/buffer.cpp | 11 +------ 3 files changed, 23 insertions(+), 35 deletions(-) diff --git a/cpp/include/rapidsmpf/memory/buffer.hpp b/cpp/include/rapidsmpf/memory/buffer.hpp index 9ecdc66a6..b6c73c16e 100644 --- a/cpp/include/rapidsmpf/memory/buffer.hpp +++ b/cpp/include/rapidsmpf/memory/buffer.hpp @@ -172,13 +172,6 @@ class Buffer { */ std::byte* exclusive_data_access(); - /** - * @brief Record the latest write event on a locked buffer and unlock the buffer. - * - * @throws std::logic_error If the buffer is not locked. - */ - void record_lastest_write_and_unlock(); - /** * @brief Release the exclusive lock acquired by `exclusive_data_access()`. 
*/ diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index c3091f61d..5431366a3 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -296,28 +296,32 @@ PackedData chunked_pack( // make a reservation for the bounce buffer with overbooking and hold it until we are // done auto bounce_buf = bounce_buf_res.br()->allocate(chunk_size, stream, bounce_buf_res); - cudf::device_span buf_span( - reinterpret_cast(bounce_buf->exclusive_data_access()), chunk_size - ); - // allocate the data buffer - auto data = data_res.br()->allocate(packed_size, stream, data_res); - std::byte* data_ptr = data->exclusive_data_access(); - size_t offset = 0; - while (packer.has_next()) { - size_t n_bytes = packer.next(buf_span); - RAPIDSMPF_CUDA_TRY(cudaMemcpyAsync( - data_ptr + offset, buf_span.data(), n_bytes, cudaMemcpyDefault, stream - )); - offset += n_bytes; - } - // record the latest write and unlock the data buffer - data->record_lastest_write_and_unlock(); + auto data = bounce_buf->write_access([&](std::byte* bounce_buf_ptr, + rmm::cuda_stream_view stream) { + cudf::device_span buf_span( + reinterpret_cast(bounce_buf_ptr), chunk_size + ); + + // allocate the data buffer + auto data = data_res.br()->allocate(packed_size, stream, data_res); + + data->write_access([&](std::byte* data_ptr, rmm::cuda_stream_view stream) { + size_t offset = 0; + while (packer.has_next()) { + size_t n_bytes = packer.next(buf_span); + RAPIDSMPF_CUDA_TRY(cudaMemcpyAsync( + data_ptr + offset, buf_span.data(), n_bytes, cudaMemcpyDefault, stream + )); + offset += n_bytes; + } + }); + + return data; + }); - // release the exclusive lock on the bounce buffer // TODO: there is a possibility that bounce buffer destructor is a called before the // async copies are completed. Should we synchronize the stream here? 
- bounce_buf->unlock(); return {packer.build_metadata(), std::move(data)}; } diff --git a/cpp/src/memory/buffer.cpp b/cpp/src/memory/buffer.cpp index 581ff5fa3..fe53ee116 100644 --- a/cpp/src/memory/buffer.cpp +++ b/cpp/src/memory/buffer.cpp @@ -25,6 +25,7 @@ Buffer::Buffer(std::unique_ptr host_buffer, rmm::cuda_stream_view st RAPIDSMPF_EXPECTS( std::get(storage_) != nullptr, "the host_buffer cannot be NULL" ); + latest_write_event_.record(stream_); } Buffer::Buffer(std::unique_ptr device_buffer) @@ -107,16 +108,6 @@ std::byte* Buffer::exclusive_data_access() { ); } -void Buffer::record_lastest_write_and_unlock() { - RAPIDSMPF_EXPECTS( - lock_.load(std::memory_order_acquire), - "the buffer is not locked", - std::logic_error - ); - latest_write_event_.record(stream_); - unlock(); -} - void Buffer::unlock() { lock_.store(false, std::memory_order_release); } From 241d69774c305e6a6f92f45b20ec21cbfbb825fd Mon Sep 17 00:00:00 2001 From: niranda perera Date: Tue, 9 Dec 2025 22:04:52 -0800 Subject: [PATCH 04/11] minor change Signed-off-by: niranda perera --- cpp/src/memory/buffer.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/memory/buffer.cpp b/cpp/src/memory/buffer.cpp index fe53ee116..04c467b90 100644 --- a/cpp/src/memory/buffer.cpp +++ b/cpp/src/memory/buffer.cpp @@ -25,7 +25,6 @@ Buffer::Buffer(std::unique_ptr host_buffer, rmm::cuda_stream_view st RAPIDSMPF_EXPECTS( std::get(storage_) != nullptr, "the host_buffer cannot be NULL" ); - latest_write_event_.record(stream_); } Buffer::Buffer(std::unique_ptr device_buffer) From d12c5eb531b09e4ac5c8af9a98005e743254e3d4 Mon Sep 17 00:00:00 2001 From: Niranda Perera Date: Wed, 10 Dec 2025 09:39:44 -0800 Subject: [PATCH 05/11] Apply suggestions from code review Co-authored-by: Mads R. B. Kristensen --- cpp/include/rapidsmpf/integrations/cudf/partition.hpp | 2 +- cpp/src/integrations/cudf/partition.cpp | 2 -- cpp/tests/test_partition.cpp | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp index c0df7c4e4..7d7df7a4b 100644 --- a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp +++ b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp @@ -228,7 +228,7 @@ inline size_t total_packing_wiggle_room(cudf::table_view const& table) { * @param table The table to pack. * @param chunk_size The size of the temporary device buffer to use (must be at least 1 * MiB enforced by cudf::chunked_pack). - * @param stream CUDA stream used for device memory operations and kernel launches. + * @param stream The CUDA stream @p table was created on. * @param bounce_buf_res Device memory reservation for the bounce buffer. * @param data_res Memory reservation for the data buffer. If the final packed buffer size * is with in a wiggle room, this @p data_res will be padded to the packed buffer size. 
diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index 5431366a3..e8fe64119 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -293,8 +293,6 @@ PackedData chunked_pack( } } - // make a reservation for the bounce buffer with overbooking and hold it until we are - // done auto bounce_buf = bounce_buf_res.br()->allocate(chunk_size, stream, bounce_buf_res); auto data = bounce_buf->write_access([&](std::byte* bounce_buf_ptr, diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp index a2a4c94dc..1b4cec292 100644 --- a/cpp/tests/test_partition.cpp +++ b/cpp/tests/test_partition.cpp @@ -165,7 +165,7 @@ INSTANTIATE_TEST_SUITE_P( NumOfRows, ::testing::Combine( ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000), - ::testing::Values(MemoryType::DEVICE, MemoryType::HOST) + ::testing::ValuesIn(MEMORY_TYPES) ), [](const testing::TestParamInfo& info) { return "nrows_" + std::to_string(std::get<0>(info.param)) + "_type_" From 2a08a1d86f78b6e550868792ecc9a3deb3b09b90 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 10 Dec 2025 10:12:21 -0800 Subject: [PATCH 06/11] addressing PR comments Signed-off-by: niranda perera --- .../rapidsmpf/integrations/cudf/partition.hpp | 15 +++---- cpp/src/integrations/cudf/partition.cpp | 40 ++++++++----------- cpp/src/streaming/cudf/table_chunk.cpp | 7 ++-- cpp/tests/test_partition.cpp | 6 +-- 4 files changed, 30 insertions(+), 38 deletions(-) diff --git a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp index 7d7df7a4b..355265d68 100644 --- a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp +++ b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp @@ -225,24 +225,21 @@ inline size_t total_packing_wiggle_room(cudf::table_view const& table) { /** * @brief Pack a table using a @p chunk_size device buffer using `cudf::chunked_pack`. * + * All device operations will be performed on @p bounce_buf 's stream. + * `cudf::chunked_pack` requires the buffer to be at least 1 MiB in size. + * * @param table The table to pack. - * @param chunk_size The size of the temporary device buffer to use (must be at least 1 - * MiB enforced by cudf::chunked_pack). - * @param stream The CUDA stream @p table was created on. - * @param bounce_buf_res Device memory reservation for the bounce buffer. + * @param bounce_buf A device bounce buffer to use for packing. * @param data_res Memory reservation for the data buffer. If the final packed buffer size * is with in a wiggle room, this @p data_res will be padded to the packed buffer size. * * @return A `PackedData` containing the packed table. * * @throws std::runtime_error If the memory allocation fails. + * @throws std::invalid_argument If the bounce buffer is not in device memory. 
*/ PackedData chunked_pack( - cudf::table_view const& table, - size_t chunk_size, - rmm::cuda_stream_view stream, - MemoryReservation& bounce_buf_res, - MemoryReservation& data_res + cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res ); } // namespace rapidsmpf diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index e8fe64119..53a857d6c 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -269,19 +269,19 @@ std::vector unspill_partitions( } PackedData chunked_pack( - cudf::table_view const& table, - size_t chunk_size, - rmm::cuda_stream_view stream, - MemoryReservation& bounce_buf_res, - MemoryReservation& data_res + cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res ) { RAPIDSMPF_EXPECTS( - bounce_buf_res.mem_type() == MemoryType::DEVICE, - "bounce buffer must be in device memory" - ); - cudf::chunked_pack packer( - table, chunk_size, stream, bounce_buf_res.br()->device_mr() + bounce_buf.mem_type() == MemoryType::DEVICE, + "bounce buffer must be in device memory", + std::invalid_argument ); + // all copies will be done on the bounce buffer's stream + auto const& stream = bounce_buf.stream(); + auto* br = data_res.br(); + size_t chunk_size = bounce_buf.size; + + cudf::chunked_pack packer(table, chunk_size, stream, br->device_mr()); auto const packed_size = packer.get_total_contiguous_size(); // if the packed size > data reservation, and it is within the wiggle room, pad the @@ -293,18 +293,16 @@ PackedData chunked_pack( } } - auto bounce_buf = bounce_buf_res.br()->allocate(chunk_size, stream, bounce_buf_res); + // allocate the data buffer + auto data_buf = br->allocate(packed_size, stream, data_res); - auto data = bounce_buf->write_access([&](std::byte* bounce_buf_ptr, - rmm::cuda_stream_view stream) { + bounce_buf.write_access([&](std::byte* bounce_buf_ptr, rmm::cuda_stream_view) { + // all copies are done on the same stream, so we can omit the stream parameter cudf::device_span buf_span( reinterpret_cast(bounce_buf_ptr), chunk_size ); - // allocate the data buffer - auto data = data_res.br()->allocate(packed_size, stream, data_res); - - data->write_access([&](std::byte* data_ptr, rmm::cuda_stream_view stream) { + data_buf->write_access([&](std::byte* data_ptr, rmm::cuda_stream_view) { size_t offset = 0; while (packer.has_next()) { size_t n_bytes = packer.next(buf_span); @@ -314,13 +312,9 @@ PackedData chunked_pack( offset += n_bytes; } }); - - return data; }); - // TODO: there is a possibility that bounce buffer destructor is a called before the - // async copies are completed. Should we synchronize the stream here? 
- - return {packer.build_metadata(), std::move(data)}; + return {packer.build_metadata(), std::move(data_buf)}; } + } // namespace rapidsmpf diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index d1b4ec66b..c9d7e889a 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -185,10 +185,11 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { "not enough device memory for the bounce buffer", std::runtime_error ); + auto bounce_buf = br->allocate(avail_dev_mem, stream(), pack_res); - packed_data = std::make_unique(chunked_pack( - table_view(), avail_dev_mem, stream(), pack_res, reservation - )); + packed_data = std::make_unique( + chunked_pack(table_view(), *bounce_buf, reservation) + ); } else { // if there is enough memory to pack the table, use `cudf::pack` auto packed_columns = diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp index 1b4cec292..e30608d94 100644 --- a/cpp/tests/test_partition.cpp +++ b/cpp/tests/test_partition.cpp @@ -187,13 +187,13 @@ TEST_P(NumOfRows, chunked_pack) { auto& split_packed = split_result.at(0); auto [bounce_buf_res, _] = br->reserve(MemoryType::DEVICE, chunk_size, true); + auto bounce_buf = br->allocate(chunk_size, stream, bounce_buf_res); + auto data_res = br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type); // Get result from chunked_pack - auto chunked_packed = rapidsmpf::chunked_pack( - input_table, chunk_size, stream, bounce_buf_res, data_res - ); + auto chunked_packed = rapidsmpf::chunked_pack(input_table, *bounce_buf, data_res); EXPECT_EQ(mem_type, chunked_packed.data->mem_type()); From bf5088da7d066ad933e97e2468e4b476d2ce1f0e Mon Sep 17 00:00:00 2001 From: Niranda Perera Date: Thu, 11 Dec 2025 09:26:10 -0800 Subject: [PATCH 07/11] Apply suggestions from code review Co-authored-by: Mads R. B. 
Kristensen Co-authored-by: Lawrence Mitchell --- cpp/src/integrations/cudf/partition.cpp | 3 +-- cpp/tests/test_partition.cpp | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index 53a857d6c..af8a41c2f 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -277,7 +277,7 @@ PackedData chunked_pack( std::invalid_argument ); // all copies will be done on the bounce buffer's stream - auto const& stream = bounce_buf.stream(); + auto stream = bounce_buf.stream(); auto* br = data_res.br(); size_t chunk_size = bounce_buf.size; @@ -293,7 +293,6 @@ PackedData chunked_pack( } } - // allocate the data buffer auto data_buf = br->allocate(packed_size, stream, data_res); bounce_buf.write_access([&](std::byte* bounce_buf_ptr, rmm::cuda_stream_view) { diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp index e30608d94..36ea74405 100644 --- a/cpp/tests/test_partition.cpp +++ b/cpp/tests/test_partition.cpp @@ -179,10 +179,8 @@ TEST_P(NumOfRows, chunked_pack) { cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10); - // Get result from split_and_pack with empty splits (single partition) - std::vector empty_splits{}; auto split_result = - rapidsmpf::split_and_pack(input_table, empty_splits, stream, br.get()); + rapidsmpf::split_and_pack(input_table, {}, stream, br.get()); ASSERT_EQ(split_result.size(), 1); auto& split_packed = split_result.at(0); From 5f984f62c7822e3eeb11d56173c32fb8e65a7209 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 17 Dec 2025 11:20:03 -0800 Subject: [PATCH 08/11] adding pack_to_host Signed-off-by: niranda perera --- .../rapidsmpf/integrations/cudf/partition.hpp | 32 +++++ .../rapidsmpf/memory/buffer_resource.hpp | 4 +- cpp/include/rapidsmpf/memory/memory_type.hpp | 36 +++++ cpp/src/integrations/cudf/partition.cpp | 71 +++++++++- cpp/src/streaming/cudf/table_chunk.cpp | 52 +------ cpp/tests/test_partition.cpp | 134 ++++++++++++++---- 6 files changed, 246 insertions(+), 83 deletions(-) diff --git a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp index 355265d68..aeb78e1a8 100644 --- a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp +++ b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp @@ -242,4 +242,36 @@ PackedData chunked_pack( cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res ); +/// @brief The minimum buffer size for `cudf::chunked_pack`. +constexpr size_t cudf_chunked_pack_min_buffer_size = 1 << 20; ///< 1 MiB + +/** + * @brief Pack a table to host memory using `cudf::pack` or `cudf::chunked_pack`. + * + * If device memory reservation can be made for the estimated table size, `cudf::pack` + * is used. Otherwise, `cudf::chunked_pack` is used with a bounce buffer size of the + * estimated table size * @p cpack_buf_size_factor (with at least + * `cudf_chunked_pack_min_buffer_size`). + * + * @param table The table to pack. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param host_data_res Memory reservation for the host data buffer. + * @param cpack_buf_size_factor The factor to use for the chunked pack buffer size. + * Default is 0.1, i.e. 10% of the estimated table size. + * @param cpack_buf_mem_types The memory types to use for the bounce buffer. Default is + * `DEVICE_ACCESSIBLE_MEMORY_TYPES`. 
+ * + * @return A `PackedData` containing the packed table. + * + * @throws std::invalid_argument If the memory reservation is not host accessible. + * @throws std::runtime_error If the memory reservation fails. + */ +std::unique_ptr pack_to_host( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& host_data_res, + float cpack_buf_size_factor = 0.1, + std::span cpack_buf_mem_types = DEVICE_ACCESSIBLE_MEMORY_TYPES +); + } // namespace rapidsmpf diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index 8bc7df73c..eb594cd6d 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -211,7 +211,9 @@ class BufferResource { return std::move(res); } } - RAPIDSMPF_FAIL("failed to reserve memory", std::runtime_error); + RAPIDSMPF_FAIL( + "failed to reserve memory " + std::to_string(size), std::runtime_error + ); } /** diff --git a/cpp/include/rapidsmpf/memory/memory_type.hpp b/cpp/include/rapidsmpf/memory/memory_type.hpp index 01b75d9a6..04250d3ba 100644 --- a/cpp/include/rapidsmpf/memory/memory_type.hpp +++ b/cpp/include/rapidsmpf/memory/memory_type.hpp @@ -40,6 +40,20 @@ constexpr std::array SPILL_TARGET_MEMORY_TYPES{ {MemoryType::PINNED_HOST, MemoryType::HOST} }; +/** + * @brief Memory types that are device accessible in the order of preference. + */ +constexpr std::array DEVICE_ACCESSIBLE_MEMORY_TYPES{ + {MemoryType::DEVICE, MemoryType::PINNED_HOST} +}; + +/** + * @brief Memory types that are host accessible in the order of preference. + */ +constexpr std::array HOST_ACCESSIBLE_MEMORY_TYPES{ + {MemoryType::PINNED_HOST, MemoryType::HOST} +}; + /** * @brief Get the memory types with preference lower than or equal to @p mem_type. * @@ -65,6 +79,28 @@ static_assert(std::ranges::equal( leq_memory_types(static_cast(-1)), std::ranges::empty_view{} )); +/** + * @brief Check if the memory type is host accessible. + * + * @param mem_type The memory type. + * @return True if the memory type is host accessible, false otherwise. + */ +constexpr bool is_host_accessible(MemoryType mem_type) noexcept { + return std::ranges::find(HOST_ACCESSIBLE_MEMORY_TYPES, mem_type) + != std::ranges::end(HOST_ACCESSIBLE_MEMORY_TYPES); +}; + +/** + * @brief Check if the memory type is device accessible. + * + * @param mem_type The memory type. + * @return True if the memory type is device accessible, false otherwise. + */ +constexpr bool is_device_accessible(MemoryType mem_type) noexcept { + return std::ranges::find(DEVICE_ACCESSIBLE_MEMORY_TYPES, mem_type) + != std::ranges::end(DEVICE_ACCESSIBLE_MEMORY_TYPES); +}; + /** * @brief Get the name of a MemoryType. 
* diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index af8a41c2f..a3cabccf5 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -272,10 +272,11 @@ PackedData chunked_pack( cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res ) { RAPIDSMPF_EXPECTS( - bounce_buf.mem_type() == MemoryType::DEVICE, - "bounce buffer must be in device memory", + is_device_accessible(bounce_buf.mem_type()), + "bounce buffer is not device accessible", std::invalid_argument ); + // all copies will be done on the bounce buffer's stream auto stream = bounce_buf.stream(); auto* br = data_res.br(); @@ -316,4 +317,70 @@ PackedData chunked_pack( return {packer.build_metadata(), std::move(data_buf)}; } +std::unique_ptr pack_to_host( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& host_data_res, + float chunked_pack_buffer_size_factor, + std::span cpack_buf_mem_types +) { + RAPIDSMPF_EXPECTS( + is_host_accessible(host_data_res.mem_type()), + "memory reservation is not host accessible", + std::invalid_argument + ); + + auto* br = host_data_res.br(); + + size_t est_table_size = estimated_memory_usage(table, stream); + { + // make a device reservation for packing + auto [pack_res, overbooking] = + br->reserve(MemoryType::DEVICE, est_table_size, true); + + if (overbooking == 0) { + // if there is enough memory to pack the table, use `cudf::pack` + auto packed_columns = cudf::pack(table, stream, br->device_mr()); + // clear the reservation as we are done with it. + pack_res.clear(); + + // note that this is a device buffer, so we need to move it to host memory + auto packed_data = std::make_unique( + std::move(packed_columns.metadata), + br->move(std::move(packed_columns.gpu_data), stream) + ); + + // Handle the case where `cudf::pack` allocates slightly more than + // the input size. This can occur because cudf uses aligned + // allocations, which may exceed the requested size. To + // accommodate this, we allow some wiggle room. + if (packed_data->data->size > host_data_res.size()) { + if (packed_data->data->size + <= host_data_res.size() + total_packing_wiggle_room(table)) + { + host_data_res = + br->reserve( + host_data_res.mem_type(), packed_data->data->size, true + ) + .first; + } + } + + // finally copy the packed data device buffer to HOST memory + packed_data->data = br->move(std::move(packed_data->data), host_data_res); + return packed_data; + } + } + + // there is not enough memory to use cudf::pack. Use chunked_pack. + auto chunk_size = std::max( + static_cast(est_table_size * chunked_pack_buffer_size_factor), + cudf_chunked_pack_min_buffer_size + ); + auto bounce_res = br->reserve_or_fail(chunk_size, cpack_buf_mem_types); + auto bounce_buf = br->allocate(chunk_size, stream, bounce_res); + + return std::make_unique(chunked_pack(table, *bounce_buf, host_data_res)); +} + } // namespace rapidsmpf diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index c221e8a9c..a363b30cc 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -171,57 +171,7 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // serialize `table_view()` into a packed_columns and then we move // the packed_columns' gpu_data to a new host buffer. 
- // make a reservation for packing - auto [pack_res, overbooking] = br->reserve( - MemoryType::DEVICE, - estimated_memory_usage(table_view(), stream()), - true - ); - - if (overbooking > 0) { - // there is not enough memory to pack the table. - size_t avail_dev_mem = pack_res.size() - overbooking; - RAPIDSMPF_EXPECTS( - avail_dev_mem > 1 << 20, - "not enough device memory for the bounce buffer", - std::runtime_error - ); - auto bounce_buf = br->allocate(avail_dev_mem, stream(), pack_res); - - packed_data = std::make_unique( - chunked_pack(table_view(), *bounce_buf, reservation) - ); - } else { - // if there is enough memory to pack the table, use `cudf::pack` - auto packed_columns = - cudf::pack(table_view(), stream(), br->device_mr()); - // clear the reservation as we are done with it. - pack_res.clear(); - packed_data = std::make_unique( - std::move(packed_columns.metadata), - br->move(std::move(packed_columns.gpu_data), stream()) - ); - - // Handle the case where `cudf::pack` allocates slightly more than - // the input size. This can occur because cudf uses aligned - // allocations, which may exceed the requested size. To - // accommodate this, we allow some wiggle room. - if (packed_data->data->size > reservation.size()) { - if (packed_data->data->size - <= reservation.size() - + total_packing_wiggle_room(table_view())) - { - reservation = - br->reserve( - MemoryType::HOST, packed_data->data->size, true - ) - .first; - } - } - // finally copy the packed data device buffer to HOST memory - packed_data->data = - br->move(std::move(packed_data->data), reservation); - } + packed_data = pack_to_host(table_view(), stream(), reservation); } return TableChunk(std::move(packed_data)); } diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp index 36ea74405..34edac0a8 100644 --- a/cpp/tests/test_partition.cpp +++ b/cpp/tests/test_partition.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -149,13 +150,26 @@ class NumOfRows : public ::testing::TestWithParam> { } } + if (rapidsmpf::is_pinned_memory_resources_supported()) { + pinned_mr = std::make_shared(); + } else { + pinned_mr = PinnedMemoryResource::Disabled; + } + + if (mem_type == MemoryType::PINNED_HOST + && pinned_mr == PinnedMemoryResource::Disabled) + { + GTEST_SKIP() << "MemoryType::PINNED_HOST isn't supported on the system."; + } + br = std::make_unique( - cudf::get_current_device_resource_ref(), memory_available + cudf::get_current_device_resource_ref(), pinned_mr, memory_available ); stream = cudf::get_default_stream(); } std::unique_ptr br; + std::shared_ptr pinned_mr; rmm::cuda_stream_view stream; }; @@ -164,8 +178,8 @@ INSTANTIATE_TEST_SUITE_P( ChunkedPack, NumOfRows, ::testing::Combine( - ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000), - ::testing::ValuesIn(MEMORY_TYPES) + ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000), // num rows + ::testing::ValuesIn(MEMORY_TYPES) // output memory type ), [](const testing::TestParamInfo& info) { return "nrows_" + std::to_string(std::get<0>(info.param)) + "_type_" @@ -176,45 +190,107 @@ INSTANTIATE_TEST_SUITE_P( TEST_P(NumOfRows, chunked_pack) { auto const [num_rows, mem_type] = GetParam(); std::int64_t const seed = 42; - cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10); - auto split_result = - rapidsmpf::split_and_pack(input_table, {}, stream, br.get()); - ASSERT_EQ(split_result.size(), 1); - auto& split_packed = split_result.at(0); - auto [bounce_buf_res, _] = br->reserve(MemoryType::DEVICE, chunk_size, 
true); auto bounce_buf = br->allocate(chunk_size, stream, bounce_buf_res); auto data_res = br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type); - // Get result from chunked_pack auto chunked_packed = rapidsmpf::chunked_pack(input_table, *bounce_buf, data_res); EXPECT_EQ(mem_type, chunked_packed.data->mem_type()); - // Unpack both and compare the resulting tables - cudf::packed_columns split_columns{ - std::move(split_packed.metadata), - std::make_unique( - split_packed.data->data(), split_packed.data->size, stream, br->device_mr() - ) - }; - cudf::packed_columns chunked_columns{ - std::move(chunked_packed.metadata), - std::make_unique( - chunked_packed.data->data(), - chunked_packed.data->size, - stream, - br->device_mr() - ) - }; + auto to_device = std::make_unique( + chunked_packed.data->data(), chunked_packed.data->size, stream, br->device_mr() + ); + stream.synchronize(); + + cudf::packed_columns packed_columns( + std::move(chunked_packed.metadata), std::move(to_device) + ); + auto unpacked_table = cudf::unpack(packed_columns); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(input_table, unpacked_table); +} + +class PackToHost : public ::testing::TestWithParam> { + protected: + void SetUp() override { + auto mem_type = std::get<1>(GetParam()); + + std::unordered_map memory_available; + memory_available[rapidsmpf::MemoryType::DEVICE] = []() { + return 1 << 20; + }; // 1 MiB + + if (rapidsmpf::is_pinned_memory_resources_supported()) { + pinned_mr = std::make_shared(); + } else { + pinned_mr = PinnedMemoryResource::Disabled; + } + + if (mem_type == MemoryType::PINNED_HOST + && pinned_mr == PinnedMemoryResource::Disabled) + { + GTEST_SKIP() << "MemoryType::PINNED_HOST isn't supported on the system."; + } + + br = std::make_unique( + cudf::get_current_device_resource_ref(), pinned_mr, memory_available + ); + stream = cudf::get_default_stream(); + } + + std::int64_t const seed = 42; + std::unique_ptr br; + std::shared_ptr pinned_mr; + rmm::cuda_stream_view stream; +}; + +INSTANTIATE_TEST_SUITE_P( + PackToHost, + PackToHost, + ::testing::Combine( + ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000), // num rows + ::testing::ValuesIn(SPILL_TARGET_MEMORY_TYPES) // output memory type + ), + [](const testing::TestParamInfo& info) { + return "nrows_" + std::to_string(std::get<0>(info.param)) + "_type_" + + MEMORY_TYPE_NAMES[static_cast(std::get<1>(info.param))]; + } +); +TEST_P(PackToHost, pack_to_host) { + auto const [num_rows, mem_type] = GetParam(); + std::int64_t const seed = 42; + cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10); + + auto data_res = + br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type); + + std::unique_ptr packed_data; + if (rapidsmpf::is_pinned_memory_resources_supported()) { + packed_data = rapidsmpf::pack_to_host(input_table, stream, data_res); + } else { + // only try to use device memory for the bounce buffer since pinned memory is not + // supported. Also force to use a 1MB bounce buffer since device memory is limited + // to 1MB. 
+ std::array cpack_buf_mem_types = {MemoryType::DEVICE}; + packed_data = rapidsmpf::pack_to_host( + input_table, stream, data_res, 0.001, cpack_buf_mem_types + ); + } + EXPECT_EQ(mem_type, packed_data->data->mem_type()); + + auto to_device = std::make_unique( + packed_data->data->data(), packed_data->data->size, stream, br->device_mr() + ); stream.synchronize(); - auto split_table = cudf::unpack(split_columns); - auto chunked_table = cudf::unpack(chunked_columns); - CUDF_TEST_EXPECT_TABLES_EQUIVALENT(split_table, chunked_table); + cudf::packed_columns packed_columns( + std::move(packed_data->metadata), std::move(to_device) + ); + auto unpacked_table = cudf::unpack(packed_columns); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(input_table, unpacked_table); } From 78ed513d99ae15f75e7a1a086fd3c36b8e9c328e Mon Sep 17 00:00:00 2001 From: niranda perera Date: Thu, 18 Dec 2025 17:01:00 -0800 Subject: [PATCH 09/11] finalize packing Signed-off-by: niranda perera --- .../rapidsmpf/integrations/cudf/partition.hpp | 45 ++-- .../rapidsmpf/memory/buffer_resource.hpp | 22 +- cpp/include/rapidsmpf/memory/memory_type.hpp | 4 +- cpp/src/integrations/cudf/partition.cpp | 152 +++++++----- cpp/src/memory/buffer_resource.cpp | 18 ++ cpp/src/streaming/cudf/table_chunk.cpp | 2 +- cpp/tests/test_partition.cpp | 216 ++++++++++-------- 7 files changed, 292 insertions(+), 167 deletions(-) diff --git a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp index aeb78e1a8..120de2534 100644 --- a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp +++ b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp @@ -243,35 +243,50 @@ PackedData chunked_pack( ); /// @brief The minimum buffer size for `cudf::chunked_pack`. -constexpr size_t cudf_chunked_pack_min_buffer_size = 1 << 20; ///< 1 MiB +constexpr size_t cudf_chunked_pack_min_buffer_size = size_t(1) << 20; ///< 1 MiB /** * @brief Pack a table to host memory using `cudf::pack` or `cudf::chunked_pack`. * - * If device memory reservation can be made for the estimated table size, `cudf::pack` - * is used. Otherwise, `cudf::chunked_pack` is used with a bounce buffer size of the - * estimated table size * @p cpack_buf_size_factor (with at least - * `cudf_chunked_pack_min_buffer_size`). + * Based on benchmarks (rapidsai/rapidsmpf#745), the order of packing performance is as + * follows: + * - `cudf::pack` -> DEVICE + * - `cudf::chunked_pack` -> DEVICE + * - `cudf::pack` -> PINNED_HOST + * - `cudf::chunked_pack` -> PINNED HOST + * + * This utility using the following strategy: + * - data reservation must be big enough to pack the table. + * - if the data reservation is from device accessible memory, use cudf::pack, as it + * requires O(estimated_table_size) memory, which is already reserved up front. + * - if the data reservation is from host memory, for each memory type in @p + * bounce_buf_types, do the following: + * - try to reserve estimated_table_size for the memory type. + * - if the reservation is successful without overbooking, use cudf::pack, and move the + * packed data device buffer to the data reservation. + * - else if the leftover memory `>= cudf_chunked_pack_min_buffer_size`, allocate a + * device accessible bounce buffer, and use chunked_pack to pack to the data + * reservation. + * - else loop again with the next memory type. + * - if all memory types are tried and no success, fail. * * @param table The table to pack. * @param stream CUDA stream used for device memory operations and kernel launches. 
- * @param host_data_res Memory reservation for the host data buffer. - * @param cpack_buf_size_factor The factor to use for the chunked pack buffer size. - * Default is 0.1, i.e. 10% of the estimated table size. - * @param cpack_buf_mem_types The memory types to use for the bounce buffer. Default is + * @param data_res Memory reservation for the host data buffer. + * @param bounce_buf_types The memory types to use for the bounce buffer. Default is * `DEVICE_ACCESSIBLE_MEMORY_TYPES`. * * @return A `PackedData` containing the packed table. * - * @throws std::invalid_argument If the memory reservation is not host accessible. - * @throws std::runtime_error If the memory reservation fails. + * @throws std::invalid_argument If the memory reservation is not big enough to pack the + * table. + * @throws std::runtime_error If all attempts to pack the table fail. */ -std::unique_ptr pack_to_host( +std::unique_ptr pack( cudf::table_view const& table, rmm::cuda_stream_view stream, - MemoryReservation& host_data_res, - float cpack_buf_size_factor = 0.1, - std::span cpack_buf_mem_types = DEVICE_ACCESSIBLE_MEMORY_TYPES + MemoryReservation& data_res, + std::span bounce_buf_types = DEVICE_ACCESSIBLE_MEMORY_TYPES ); } // namespace rapidsmpf diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index eb594cd6d..6d960660e 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -107,13 +107,33 @@ class BufferResource { * * @return Reference to the RMM resource used for pinned host allocations. */ - [[nodiscard]] rmm::host_async_resource_ref pinned_mr() { + [[nodiscard]] rmm::host_device_async_resource_ref pinned_mr() { RAPIDSMPF_EXPECTS( pinned_mr_, "no pinned memory resource is available", std::invalid_argument ); return *pinned_mr_; } + /** + * @brief Get the RMM device memory resource for a given memory type. + * + * @param mem_type The memory type. + * @return Reference to the RMM resource used for device allocations. + * @throws std::invalid_argument if the memory type is not device accessible. + */ + [[nodiscard]] rmm::device_async_resource_ref get_device_mr( + MemoryType const& mem_type + ); + + /** + * @brief Get the RMM host memory resource for a given memory type. + * + * @param mem_type The memory type. + * @return Reference to the RMM resource used for host allocations. + * @throws std::invalid_argument if the memory type is not host accessible. + */ + [[nodiscard]] rmm::host_async_resource_ref get_host_mr(MemoryType const& mem_type); + /** * @brief Retrieves the memory availability function for a given memory type. * diff --git a/cpp/include/rapidsmpf/memory/memory_type.hpp b/cpp/include/rapidsmpf/memory/memory_type.hpp index 04250d3ba..31243b0d9 100644 --- a/cpp/include/rapidsmpf/memory/memory_type.hpp +++ b/cpp/include/rapidsmpf/memory/memory_type.hpp @@ -85,7 +85,7 @@ static_assert(std::ranges::equal( * @param mem_type The memory type. * @return True if the memory type is host accessible, false otherwise. */ -constexpr bool is_host_accessible(MemoryType mem_type) noexcept { +constexpr bool is_host_accessible(MemoryType const& mem_type) noexcept { return std::ranges::find(HOST_ACCESSIBLE_MEMORY_TYPES, mem_type) != std::ranges::end(HOST_ACCESSIBLE_MEMORY_TYPES); }; @@ -96,7 +96,7 @@ constexpr bool is_host_accessible(MemoryType mem_type) noexcept { * @param mem_type The memory type. * @return True if the memory type is device accessible, false otherwise. 
*/ -constexpr bool is_device_accessible(MemoryType mem_type) noexcept { +constexpr bool is_device_accessible(MemoryType const& mem_type) noexcept { return std::ranges::find(DEVICE_ACCESSIBLE_MEMORY_TYPES, mem_type) != std::ranges::end(DEVICE_ACCESSIBLE_MEMORY_TYPES); }; diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index a3cabccf5..c6b3799b8 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -268,6 +268,30 @@ std::vector unspill_partitions( return ret; } +namespace { + +/** + * @brief Pad the data reservation to the packed size if the packed size is within the + * wiggle room. + * + * @param data_res The data reservation to pad. + * @param packed_size The size of the packed data. + * @param table The table to pack. + */ +void pad_data_reservation( + MemoryReservation& data_res, size_t packed_size, cudf::table_view const& table +) { + if (packed_size > data_res.size()) { + if (packed_size <= data_res.size() + total_packing_wiggle_room(table)) { + data_res.clear(); // clear the current reservation + data_res = std::get<0>( + data_res.br()->reserve(data_res.mem_type(), packed_size, true) + ); + } + } +} +} // namespace + PackedData chunked_pack( cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res ) { @@ -287,12 +311,7 @@ PackedData chunked_pack( // if the packed size > data reservation, and it is within the wiggle room, pad the // data reservation to the packed size from the same memory type. - if (packed_size > data_res.size()) { - if (packed_size <= data_res.size() + total_packing_wiggle_room(table)) { - data_res = - data_res.br()->reserve(data_res.mem_type(), packed_size, true).first; - } - } + pad_data_reservation(data_res, packed_size, table); auto data_buf = br->allocate(packed_size, stream, data_res); @@ -317,70 +336,89 @@ PackedData chunked_pack( return {packer.build_metadata(), std::move(data_buf)}; } -std::unique_ptr pack_to_host( +std::unique_ptr pack( cudf::table_view const& table, rmm::cuda_stream_view stream, - MemoryReservation& host_data_res, - float chunked_pack_buffer_size_factor, + MemoryReservation& data_res, std::span cpack_buf_mem_types ) { + auto* br = data_res.br(); + + auto cudf_pack = + [&](rmm::device_async_resource_ref device_mr) -> std::unique_ptr { + // if there is enough memory to pack the table, use `cudf::pack` + auto packed_columns = cudf::pack(table, stream, device_mr); + + auto packed_data = std::make_unique( + std::move(packed_columns.metadata), + br->move(std::move(packed_columns.gpu_data), stream) + ); + + pad_data_reservation(data_res, packed_data->data->size, table); + + // Note: in when using pinned memory, data is returned as a rmm::device_buffer. + // This data can not be released. Therefore, we need to make a copy. + // if the data res is device, this will be a no-op. + packed_data->data = br->move(std::move(packed_data->data), data_res); + + return packed_data; + }; + + size_t est_table_size = estimated_memory_usage(table, stream); + + // irrepective of the memory type, the reservation must be big enough to copy the + // output data buffer. 
RAPIDSMPF_EXPECTS( - is_host_accessible(host_data_res.mem_type()), - "memory reservation is not host accessible", + data_res.size() >= est_table_size, + "data reservation is not big enough to pack the table", std::invalid_argument ); - auto* br = host_data_res.br(); + // if the data reservation is from device accessible memory, use cudf::pack, as it + // performs better than chunked_pack. cudf::pack will require O(estimated_table_size) + // memory. + if (is_device_accessible(data_res.mem_type())) { + // use the memory resource corresponding to the data reservation, so that + // cudf::pack will allocate memory from that memory type. + return cudf_pack(br->get_device_mr(data_res.mem_type())); + } else { // HOST data reservations. + + // try to allocate as much device accessible memory as possible for the bounce + // buffer (max est_table_size). + for (auto const& mem_type : cpack_buf_mem_types) { + auto [res, overbooking] = br->reserve(mem_type, est_table_size, true); + + if (overbooking == 0) { + // there is enough memory to pack the table, use `cudf::pack` + auto packed_data = cudf_pack(br->get_device_mr(mem_type)); + + // finally copy the packed data device buffer to data reservation + + // if the packed data size is within a certain wiggle room, pad the data + // reservation to that size. + pad_data_reservation(data_res, packed_data->data->size, table); + + // finally copy the packed data device buffer to HOST memory. + // Note that if the padding exceeds the wiggle room, the following move + // will likely OOM. + packed_data->data = br->move(std::move(packed_data->data), data_res); + return packed_data; + } - size_t est_table_size = estimated_memory_usage(table, stream); - { - // make a device reservation for packing - auto [pack_res, overbooking] = - br->reserve(MemoryType::DEVICE, est_table_size, true); - - if (overbooking == 0) { - // if there is enough memory to pack the table, use `cudf::pack` - auto packed_columns = cudf::pack(table, stream, br->device_mr()); - // clear the reservation as we are done with it. - pack_res.clear(); - - // note that this is a device buffer, so we need to move it to host memory - auto packed_data = std::make_unique( - std::move(packed_columns.metadata), - br->move(std::move(packed_columns.gpu_data), stream) - ); + size_t leftover_mem = res.size() > overbooking ? res.size() - overbooking : 0; - // Handle the case where `cudf::pack` allocates slightly more than - // the input size. This can occur because cudf uses aligned - // allocations, which may exceed the requested size. To - // accommodate this, we allow some wiggle room. - if (packed_data->data->size > host_data_res.size()) { - if (packed_data->data->size - <= host_data_res.size() + total_packing_wiggle_room(table)) - { - host_data_res = - br->reserve( - host_data_res.mem_type(), packed_data->data->size, true - ) - .first; - } + if (leftover_mem >= cudf_chunked_pack_min_buffer_size) { + // use device memory for the bounce buffer + auto bounce_buf = br->allocate(leftover_mem, stream, res); + return std::make_unique( + chunked_pack(table, *bounce_buf, data_res) + ); } - - // finally copy the packed data device buffer to HOST memory - packed_data->data = br->move(std::move(packed_data->data), host_data_res); - return packed_data; } - } - - // there is not enough memory to use cudf::pack. Use chunked_pack. 
- auto chunk_size = std::max( - static_cast(est_table_size * chunked_pack_buffer_size_factor), - cudf_chunked_pack_min_buffer_size - ); - auto bounce_res = br->reserve_or_fail(chunk_size, cpack_buf_mem_types); - auto bounce_buf = br->allocate(chunk_size, stream, bounce_res); - return std::make_unique(chunked_pack(table, *bounce_buf, host_data_res)); + // if we get here, all attempts to pack the table have failed. + RAPIDSMPF_FAIL("failed to pack the table", std::runtime_error); + } } } // namespace rapidsmpf diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp index f2cf48aff..0ddb4a184 100644 --- a/cpp/src/memory/buffer_resource.cpp +++ b/cpp/src/memory/buffer_resource.cpp @@ -49,6 +49,24 @@ BufferResource::BufferResource( RAPIDSMPF_EXPECTS(statistics_ != nullptr, "the statistics pointer cannot be NULL"); } +rmm::device_async_resource_ref BufferResource::get_device_mr(MemoryType const& mem_type) { + RAPIDSMPF_EXPECTS( + is_device_accessible(mem_type), + "memory type must be device accessible", + std::invalid_argument + ); + return mem_type == MemoryType::DEVICE ? device_mr() : pinned_mr(); +} + +rmm::host_async_resource_ref BufferResource::get_host_mr(MemoryType const& mem_type) { + RAPIDSMPF_EXPECTS( + is_host_accessible(mem_type), + "memory type must be host accessible", + std::invalid_argument + ); + return mem_type == MemoryType::PINNED_HOST ? pinned_mr() : host_mr(); +} + std::pair BufferResource::reserve( MemoryType mem_type, std::size_t size, bool allow_overbooking ) { diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index a363b30cc..c4deb4dc8 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -171,7 +171,7 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // serialize `table_view()` into a packed_columns and then we move // the packed_columns' gpu_data to a new host buffer. 
- packed_data = pack_to_host(table_view(), stream(), reservation); + packed_data = pack(table_view(), stream(), reservation); } return TableChunk(std::move(packed_data)); } diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp index 34edac0a8..5b2009bbb 100644 --- a/cpp/tests/test_partition.cpp +++ b/cpp/tests/test_partition.cpp @@ -134,22 +134,16 @@ TEST_F(SpillingTest, SpillUnspillRoundtripPreservesDataAndMetadata) { EXPECT_EQ(actual->copy_to_uint8_vector(), payload); } -class NumOfRows : public ::testing::TestWithParam> { +class NumOfRows_MemType : public ::testing::TestWithParam> { protected: // cudf::chunked_pack requires at least a 1 MiB bounce buffer static constexpr size_t chunk_size = 1 << 20; + static constexpr int64_t seed = 42; - void SetUp() override { - auto mem_type = std::get<1>(GetParam()); - - std::unordered_map memory_available; - // disable all memory types except mem_type - for (auto mt : MEMORY_TYPES) { - if (mt != mem_type) { - memory_available[mt] = []() { return 0; }; - } - } - + void setup_br( + MemoryType mem_type, + std::unordered_map&& memory_available + ) { if (rapidsmpf::is_pinned_memory_resources_supported()) { pinned_mr = std::make_shared(); } else { @@ -163,11 +157,46 @@ class NumOfRows : public ::testing::TestWithParam> { } br = std::make_unique( - cudf::get_current_device_resource_ref(), pinned_mr, memory_available + cudf::get_current_device_resource_ref(), + pinned_mr, + std::move(memory_available) ); + } + + void SetUp() override { + std::tie(num_rows, mem_type) = GetParam(); + + std::unordered_map memory_available; + // disable all memory types except mem_type + for (auto mt : MEMORY_TYPES) { + if (mt != mem_type) { + memory_available[mt] = []() { return 0; }; + } + } + setup_br(mem_type, std::move(memory_available)); stream = cudf::get_default_stream(); } + void validate_packed_table( + cudf::table_view const& input_table, PackedData&& packed_data + ) { + EXPECT_EQ(mem_type, packed_data.data->mem_type()); + + auto to_device = std::make_unique( + packed_data.data->data(), packed_data.data->size, stream, br->device_mr() + ); + stream.synchronize(); + + cudf::packed_columns packed_columns( + std::move(packed_data.metadata), std::move(to_device) + ); + auto unpacked_table = cudf::unpack(packed_columns); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(input_table, unpacked_table); + } + + int num_rows; + MemoryType mem_type; + cudf::table input_table; std::unique_ptr br; std::shared_ptr pinned_mr; rmm::cuda_stream_view stream; @@ -176,20 +205,18 @@ class NumOfRows : public ::testing::TestWithParam> { // test different `num_rows` and `MemoryType`. 
INSTANTIATE_TEST_SUITE_P( ChunkedPack, - NumOfRows, + NumOfRows_MemType, ::testing::Combine( ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000), // num rows ::testing::ValuesIn(MEMORY_TYPES) // output memory type ), - [](const testing::TestParamInfo& info) { + [](const testing::TestParamInfo& info) { return "nrows_" + std::to_string(std::get<0>(info.param)) + "_type_" + MEMORY_TYPE_NAMES[static_cast(std::get<1>(info.param))]; } ); -TEST_P(NumOfRows, chunked_pack) { - auto const [num_rows, mem_type] = GetParam(); - std::int64_t const seed = 42; +TEST_P(NumOfRows_MemType, chunked_pack) { cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10); auto [bounce_buf_res, _] = br->reserve(MemoryType::DEVICE, chunk_size, true); @@ -200,97 +227,104 @@ TEST_P(NumOfRows, chunked_pack) { auto chunked_packed = rapidsmpf::chunked_pack(input_table, *bounce_buf, data_res); - EXPECT_EQ(mem_type, chunked_packed.data->mem_type()); - - auto to_device = std::make_unique( - chunked_packed.data->data(), chunked_packed.data->size, stream, br->device_mr() - ); - stream.synchronize(); - - cudf::packed_columns packed_columns( - std::move(chunked_packed.metadata), std::move(to_device) - ); - auto unpacked_table = cudf::unpack(packed_columns); - CUDF_TEST_EXPECT_TABLES_EQUIVALENT(input_table, unpacked_table); + EXPECT_NO_THROW(validate_packed_table(input_table, std::move(chunked_packed))); } -class PackToHost : public ::testing::TestWithParam> { - protected: - void SetUp() override { - auto mem_type = std::get<1>(GetParam()); - - std::unordered_map memory_available; - memory_available[rapidsmpf::MemoryType::DEVICE] = []() { - return 1 << 20; - }; // 1 MiB - - if (rapidsmpf::is_pinned_memory_resources_supported()) { - pinned_mr = std::make_shared(); - } else { - pinned_mr = PinnedMemoryResource::Disabled; - } - - if (mem_type == MemoryType::PINNED_HOST - && pinned_mr == PinnedMemoryResource::Disabled) - { - GTEST_SKIP() << "MemoryType::PINNED_HOST isn't supported on the system."; - } - - br = std::make_unique( - cudf::get_current_device_resource_ref(), pinned_mr, memory_available - ); - stream = cudf::get_default_stream(); - } - - std::int64_t const seed = 42; - std::unique_ptr br; - std::shared_ptr pinned_mr; - rmm::cuda_stream_view stream; -}; +class NumOfRows_MemType2 : public NumOfRows_MemType {}; INSTANTIATE_TEST_SUITE_P( - PackToHost, - PackToHost, + PackTable, + NumOfRows_MemType2, ::testing::Combine( ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000), // num rows - ::testing::ValuesIn(SPILL_TARGET_MEMORY_TYPES) // output memory type + ::testing::Values(MemoryType::HOST) // output memory type ), - [](const testing::TestParamInfo& info) { + [](const testing::TestParamInfo& info) { return "nrows_" + std::to_string(std::get<0>(info.param)) + "_type_" + MEMORY_TYPE_NAMES[static_cast(std::get<1>(info.param))]; } ); -TEST_P(PackToHost, pack_to_host) { - auto const [num_rows, mem_type] = GetParam(); - std::int64_t const seed = 42; +// device table to host packed data using 1MB device buffer +TEST_P(NumOfRows_MemType2, pack_to_host_with_1MB_device_buffer) { + // override br with just 1MB device memory. 
+    std::unordered_map memory_available{
+        {rapidsmpf::MemoryType::DEVICE, [] { return 1 << 20; }}
+    };
+    setup_br(mem_type, std::move(memory_available));
+
     cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);
 
     auto data_res =
         br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type);
 
-    std::unique_ptr packed_data;
-    if (rapidsmpf::is_pinned_memory_resources_supported()) {
-        packed_data = rapidsmpf::pack_to_host(input_table, stream, data_res);
-    } else {
-        // only try to use device memory for the bounce buffer since pinned memory is not
-        // supported. Also force to use a 1MB bounce buffer since device memory is limited
-        // to 1MB.
-        std::array cpack_buf_mem_types = {MemoryType::DEVICE};
-        packed_data = rapidsmpf::pack_to_host(
-            input_table, stream, data_res, 0.001, cpack_buf_mem_types
-        );
+    std::array device_type{MemoryType::DEVICE};
+    auto packed_data = rapidsmpf::pack(input_table, stream, data_res, device_type);
+
+    EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data)));
+}
+
+// device table to host packed data using unlimited device memory
+TEST_P(NumOfRows_MemType2, pack_to_host_with_unlimited_device_buffer) {
+    // override br with unlimited device memory.
+    std::unordered_map memory_available{
+        {rapidsmpf::MemoryType::DEVICE,
+         [] { return std::numeric_limits::max(); }}
+    };
+    setup_br(mem_type, std::move(memory_available));
+
+    cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);
+
+    auto data_res =
+        br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type);
+
+    std::array device_type{MemoryType::DEVICE};
+    auto packed_data = rapidsmpf::pack(input_table, stream, data_res, device_type);
+
+    EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data)));
+}
+
+// device table to host packed data using 1MB pinned buffer
+TEST_P(NumOfRows_MemType2, pack_to_host_with_1MB_pinned_buffer) {
+    if (!rapidsmpf::is_pinned_memory_resources_supported()) {
+        GTEST_SKIP() << "Pinned memory resources are not supported on the system.";
+        return;
     }
 
-    EXPECT_EQ(mem_type, packed_data->data->mem_type());
-    auto to_device = std::make_unique(
-        packed_data->data->data(), packed_data->data->size, stream, br->device_mr()
-    );
-    stream.synchronize();
+    std::unordered_map memory_available{
+        {rapidsmpf::MemoryType::PINNED_HOST, [] { return 1 << 20; }}
+    };
+    setup_br(mem_type, std::move(memory_available));
 
-    cudf::packed_columns packed_columns(
-        std::move(packed_data->metadata), std::move(to_device)
-    );
-    auto unpacked_table = cudf::unpack(packed_columns);
-    CUDF_TEST_EXPECT_TABLES_EQUIVALENT(input_table, unpacked_table);
+    cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);
+
+    auto data_res =
+        br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type);
+
+    auto packed_data = rapidsmpf::pack(input_table, stream, data_res);
+
+    EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data)));
+}
+
+// device table to host packed data using unlimited pinned memory
+TEST_P(NumOfRows_MemType2, pack_to_host_with_unlimited_pinned_buffer) {
+    if (!rapidsmpf::is_pinned_memory_resources_supported()) {
+        GTEST_SKIP() << "Pinned memory resources are not supported on the system.";
+        return;
+    }
+
+    std::unordered_map memory_available{
+        {rapidsmpf::MemoryType::PINNED_HOST,
+         [] { return std::numeric_limits::max(); }}
+    };
+    setup_br(mem_type, std::move(memory_available));
+
+    cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);
+
+    auto
data_res = + br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type); + + auto packed_data = rapidsmpf::pack(input_table, stream, data_res); + + EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data))); } From d35feb53ac3d0b607d1824c7721b101a20c373f1 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 7 Jan 2026 16:36:48 -0800 Subject: [PATCH 10/11] fix build Signed-off-by: niranda perera --- .../rapidsmpf/integrations/cudf/partition.hpp | 2 +- .../rapidsmpf/memory/buffer_resource.hpp | 19 ++++++++++++++----- cpp/include/rapidsmpf/memory/memory_type.hpp | 2 +- cpp/src/integrations/cudf/partition.cpp | 2 +- cpp/src/memory/buffer_resource.cpp | 6 +++--- cpp/src/streaming/cudf/table_chunk.cpp | 2 +- cpp/tests/test_partition.cpp | 2 +- 7 files changed, 22 insertions(+), 13 deletions(-) diff --git a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp index 120de2534..67b8b72e1 100644 --- a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp +++ b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: Apache-2.0 */ #pragma once diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index 6d960660e..b8379b73b 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: Apache-2.0 */ @@ -108,10 +108,7 @@ class BufferResource { * @return Reference to the RMM resource used for pinned host allocations. */ [[nodiscard]] rmm::host_device_async_resource_ref pinned_mr() { - RAPIDSMPF_EXPECTS( - pinned_mr_, "no pinned memory resource is available", std::invalid_argument - ); - return *pinned_mr_; + return get_checked_pinned_mr(); } /** @@ -386,6 +383,18 @@ class BufferResource { std::shared_ptr statistics(); private: + /** + * @brief Get the RMM pinned host memory resource. + * + * @return Reference to the RMM resource used for pinned host allocations. + */ + [[nodiscard]] PinnedMemoryResource& get_checked_pinned_mr() { + RAPIDSMPF_EXPECTS( + pinned_mr_, "no pinned memory resource is available", std::invalid_argument + ); + return *pinned_mr_; + } + std::mutex mutex_; rmm::device_async_resource_ref device_mr_; std::shared_ptr pinned_mr_; diff --git a/cpp/include/rapidsmpf/memory/memory_type.hpp b/cpp/include/rapidsmpf/memory/memory_type.hpp index 31243b0d9..b4406f854 100644 --- a/cpp/include/rapidsmpf/memory/memory_type.hpp +++ b/cpp/include/rapidsmpf/memory/memory_type.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: Apache-2.0 */ #pragma once diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index c057388bc..fe5e0a56f 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. 
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 
diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp
index fdaeef661..4116a03f9 100644
--- a/cpp/src/memory/buffer_resource.cpp
+++ b/cpp/src/memory/buffer_resource.cpp
@@ -1,5 +1,5 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 
@@ -56,7 +56,7 @@ rmm::device_async_resource_ref BufferResource::get_device_mr(MemoryType const& m
         "memory type must be device accessible",
         std::invalid_argument
     );
-    return mem_type == MemoryType::DEVICE ? device_mr() : pinned_mr();
+    return mem_type == MemoryType::DEVICE ? device_mr() : get_checked_pinned_mr();
 }
 
 rmm::host_async_resource_ref BufferResource::get_host_mr(MemoryType const& mem_t
         "memory type must be host accessible",
         std::invalid_argument
     );
-    return mem_type == MemoryType::PINNED_HOST ? pinned_mr() : host_mr();
+    return mem_type == MemoryType::PINNED_HOST ? get_checked_pinned_mr() : host_mr();
 }
 
 std::pair BufferResource::reserve(
diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp
index c4deb4dc8..f94c67afb 100644
--- a/cpp/src/streaming/cudf/table_chunk.cpp
+++ b/cpp/src/streaming/cudf/table_chunk.cpp
@@ -1,5 +1,5 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 
diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp
index 5b2009bbb..d8e2b3bc5 100644
--- a/cpp/tests/test_partition.cpp
+++ b/cpp/tests/test_partition.cpp
@@ -1,5 +1,5 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 

From e1fcf233e10b662faa721e60c0ff0054f3e29ac6 Mon Sep 17 00:00:00 2001
From: niranda perera 
Date: Mon, 12 Jan 2026 16:41:39 -0800
Subject: [PATCH 11/11] fixing tests

Signed-off-by: niranda perera 

---
 .../rapidsmpf/memory/buffer_resource.hpp     | 11 +++++--
 cpp/include/rapidsmpf/memory/memory_type.hpp |  8 ++---
 cpp/src/cuda_event.cpp                       |  6 ++--
 cpp/src/integrations/cudf/partition.cpp      | 32 ++++++++-----------
 cpp/src/memory/buffer_resource.cpp           | 16 +++++++++-
 cpp/src/memory/host_buffer.cpp               |  5 ++-
 cpp/tests/test_partition.cpp                 | 31 +++++++++++-------
 7 files changed, 68 insertions(+), 41 deletions(-)

diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp
index b8379b73b..cd5f225de 100644
--- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp
+++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp
@@ -290,10 +290,10 @@ class BufferResource {
     );
 
     /**
-     * @brief Move device buffer data into a Buffer.
+     * @brief Move device or pinned host buffer data into a Buffer.
      *
      * This operation is cheap; no copy is performed. The resulting Buffer resides in
-     * device memory.
+     * device or pinned host memory.
* * If @p stream differs from the device buffer's current stream: * - @p stream is synchronized with the device buffer's current stream, and @@ -302,10 +302,15 @@ class BufferResource { * @param data Unique pointer to the device buffer. * @param stream CUDA stream associated with the new Buffer. Use or synchronize with * this stream when operating on the Buffer. + * @param mem_type The memory type of the device buffer. Defaults to + * `MemoryType::DEVICE`. + * * @return Unique pointer to the resulting Buffer. */ std::unique_ptr move( - std::unique_ptr data, rmm::cuda_stream_view stream + std::unique_ptr data, + rmm::cuda_stream_view stream, + MemoryType mem_type = MemoryType::DEVICE ); /** diff --git a/cpp/include/rapidsmpf/memory/memory_type.hpp b/cpp/include/rapidsmpf/memory/memory_type.hpp index b4406f854..d91181c85 100644 --- a/cpp/include/rapidsmpf/memory/memory_type.hpp +++ b/cpp/include/rapidsmpf/memory/memory_type.hpp @@ -9,6 +9,8 @@ #include #include +#include + namespace rapidsmpf { /// @brief Enum representing the type of memory sorted in decreasing order of preference. @@ -86,8 +88,7 @@ static_assert(std::ranges::equal( * @return True if the memory type is host accessible, false otherwise. */ constexpr bool is_host_accessible(MemoryType const& mem_type) noexcept { - return std::ranges::find(HOST_ACCESSIBLE_MEMORY_TYPES, mem_type) - != std::ranges::end(HOST_ACCESSIBLE_MEMORY_TYPES); + return contains(HOST_ACCESSIBLE_MEMORY_TYPES, mem_type); }; /** @@ -97,8 +98,7 @@ constexpr bool is_host_accessible(MemoryType const& mem_type) noexcept { * @return True if the memory type is device accessible, false otherwise. */ constexpr bool is_device_accessible(MemoryType const& mem_type) noexcept { - return std::ranges::find(DEVICE_ACCESSIBLE_MEMORY_TYPES, mem_type) - != std::ranges::end(DEVICE_ACCESSIBLE_MEMORY_TYPES); + return contains(DEVICE_ACCESSIBLE_MEMORY_TYPES, mem_type); }; /** diff --git a/cpp/src/cuda_event.cpp b/cpp/src/cuda_event.cpp index 03e4a539a..8db79c4f7 100644 --- a/cpp/src/cuda_event.cpp +++ b/cpp/src/cuda_event.cpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: Apache-2.0 */ @@ -45,7 +45,7 @@ void CudaEvent::record(rmm::cuda_stream_view stream) { RAPIDSMPF_CUDA_TRY(cudaEventRecord(event_, stream)); } -[[nodiscard]] bool CudaEvent::CudaEvent::is_ready() const { +[[nodiscard]] bool CudaEvent::is_ready() const { auto result = cudaEventQuery(event_); if (result != cudaSuccess && result != cudaErrorNotReady) { RAPIDSMPF_CUDA_TRY(result); @@ -53,7 +53,7 @@ void CudaEvent::record(rmm::cuda_stream_view stream) { return result == cudaSuccess; } -void CudaEvent::CudaEvent::host_wait() const { +void CudaEvent::host_wait() const { RAPIDSMPF_CUDA_TRY(cudaEventSynchronize(event_)); } diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp index fe5e0a56f..e458cde03 100644 --- a/cpp/src/integrations/cudf/partition.cpp +++ b/cpp/src/integrations/cudf/partition.cpp @@ -345,24 +345,15 @@ std::unique_ptr pack( ) { auto* br = data_res.br(); - auto cudf_pack = - [&](rmm::device_async_resource_ref device_mr) -> std::unique_ptr { - // if there is enough memory to pack the table, use `cudf::pack` - auto packed_columns = cudf::pack(table, stream, device_mr); + // use cudf::pack to pack the table to the data reservation. 
+    auto cudf_pack = [&](MemoryReservation& res) -> std::unique_ptr {
+        auto dev_mr = br->get_device_mr(res.mem_type());
+        auto packed_columns = cudf::pack(table, stream, dev_mr);
 
-        auto packed_data = std::make_unique(
+        return std::make_unique(
             std::move(packed_columns.metadata),
-            br->move(std::move(packed_columns.gpu_data), stream)
+            br->move(std::move(packed_columns.gpu_data), stream, res.mem_type())
         );
-
-        pad_data_reservation(data_res, packed_data->data->size, table);
-
-        // Note: when using pinned memory, data is returned as a rmm::device_buffer.
-        // This data cannot be released. Therefore, we need to make a copy.
-        // If the data reservation is device memory, this is a no-op.
-        packed_data->data = br->move(std::move(packed_data->data), data_res);
-
-        return packed_data;
     };
 
     size_t est_table_size = estimated_memory_usage(table, stream);
@@ -381,7 +372,12 @@ std::unique_ptr pack(
     if (is_device_accessible(data_res.mem_type())) {
         // use the memory resource corresponding to the data reservation, so that
         // cudf::pack will allocate memory from that memory type.
-        return cudf_pack(br->get_device_mr(data_res.mem_type()));
+        auto packed_data = cudf_pack(data_res);
+
+        // release the amount of memory used by the packed data.
+        br->release(data_res, packed_data->data->size);
+
+        return packed_data;
     } else {  // HOST data reservations.
 
         // try to allocate as much device accessible memory as possible for the bounce
@@ -391,9 +387,9 @@ std::unique_ptr pack(
 
             if (overbooking == 0) {
                 // there is enough memory to pack the table, use `cudf::pack`
-                auto packed_data = cudf_pack(br->get_device_mr(mem_type));
+                auto packed_data = cudf_pack(res);
 
-                // finally copy the packed data device buffer to data reservation
+                // finally copy the packed data device buffer to data reservation.
 
                 // if the packed data size is within a certain wiggle room, pad the data
                 // reservation to that size.
diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp
index 4116a03f9..c024f7efd 100644
--- a/cpp/src/memory/buffer_resource.cpp
+++ b/cpp/src/memory/buffer_resource.cpp
@@ -6,6 +6,8 @@
 #include 
 #include 
 
+#include 
+
 #include 
 #include 
 #include 
@@ -166,13 +168,25 @@ std::unique_ptr BufferResource::allocate(
 }
 
 std::unique_ptr BufferResource::move(
-    std::unique_ptr data, rmm::cuda_stream_view stream
+    std::unique_ptr data,
+    rmm::cuda_stream_view stream,
+    MemoryType mem_type
 ) {
     auto upstream = data->stream();
     if (upstream.value() != stream.value()) {
         cuda_stream_join(stream, upstream);
         data->set_stream(stream);
     }
+    // if the device_buffer is pinned host memory, wrap it in a HostBuffer
+    if (mem_type == MemoryType::PINNED_HOST) {
+        return std::unique_ptr(new Buffer(
+            std::make_unique(HostBuffer::from_rmm_device_buffer(
+                std::move(data), stream, get_checked_pinned_mr()
+            )),
+            stream,
+            MemoryType::PINNED_HOST
+        ));
+    }
     return std::unique_ptr(new Buffer(std::move(data), MemoryType::DEVICE));
 }
 
diff --git a/cpp/src/memory/host_buffer.cpp b/cpp/src/memory/host_buffer.cpp
index 3e54a60e8..bf4d34027 100644
--- a/cpp/src/memory/host_buffer.cpp
+++ b/cpp/src/memory/host_buffer.cpp
@@ -145,8 +145,11 @@ HostBuffer HostBuffer::from_rmm_device_buffer(
         std::invalid_argument
     );
 
+    // cuda::is_host_accessible will return false for an empty device buffer. So, bypass
+    // the check if the device buffer is empty.
     RAPIDSMPF_EXPECTS(
-        cuda::is_host_accessible(pinned_host_buffer->data()),
+        pinned_host_buffer->data() == nullptr
+            || cuda::is_host_accessible(pinned_host_buffer->data()),
         "pinned_host_buffer must be host accessible",
         std::invalid_argument
     );
diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp
index d8e2b3bc5..819b3634a 100644
--- a/cpp/tests/test_partition.cpp
+++ b/cpp/tests/test_partition.cpp
@@ -7,6 +7,8 @@
 
 #include 
 
+#include 
+
 #include 
 #include 
 #include 
@@ -202,10 +204,12 @@ class NumOfRows_MemType : public ::testing::TestWithParamreserve(MemoryType::DEVICE, chunk_size, true);
@@ -230,11 +234,11 @@ TEST_P(NumOfRows_MemType, chunked_pack) {
     EXPECT_NO_THROW(validate_packed_table(input_table, std::move(chunked_packed)));
 }
 
-class NumOfRows_MemType2 : public NumOfRows_MemType {};
+class PackToHostTest : public NumOfRows_MemType {};
 
 INSTANTIATE_TEST_SUITE_P(
-    PackTable,
-    NumOfRows_MemType2,
+    PackTableToHost,
+    PackToHostTest,
     ::testing::Combine(
         ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000),  // num rows
         ::testing::Values(MemoryType::HOST)  // output memory type
@@ -246,7 +250,7 @@ INSTANTIATE_TEST_SUITE_P(
 );
 
 // device table to host packed data using 1MB device buffer
-TEST_P(NumOfRows_MemType2, pack_to_host_with_1MB_device_buffer) {
+TEST_P(PackToHostTest, pack_to_host_with_1MB_device_buffer) {
     // override br with just 1MB device memory.
     std::unordered_map memory_available{
         {rapidsmpf::MemoryType::DEVICE, [] { return 1 << 20; }}
@@ -265,7 +269,7 @@ TEST_P(NumOfRows_MemType2, pack_to_host_with_1MB_device_buffer) {
 }
 
 // device table to host packed data using unlimited device memory
-TEST_P(NumOfRows_MemType2, pack_to_host_with_unlimited_device_buffer) {
+TEST_P(PackToHostTest, pack_to_host_with_unlimited_device_buffer) {
     // override br with unlimited device memory.
std::unordered_map memory_available{ {rapidsmpf::MemoryType::DEVICE, @@ -278,6 +282,9 @@ TEST_P(NumOfRows_MemType2, pack_to_host_with_unlimited_device_buffer) { auto data_res = br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type); + std::cout << data_res.size() << " " + << rapidsmpf::total_packing_wiggle_room(input_table) << std::endl; + std::array device_type{MemoryType::DEVICE}; auto packed_data = rapidsmpf::pack(input_table, stream, data_res, device_type); @@ -285,7 +292,7 @@ TEST_P(NumOfRows_MemType2, pack_to_host_with_unlimited_device_buffer) { } // device table to host packed data using 1MB pinned buffer -TEST_P(NumOfRows_MemType2, pack_to_host_with_1MB_pinned_buffer) { +TEST_P(PackToHostTest, pack_to_host_with_1MB_pinned_buffer) { if (!rapidsmpf::is_pinned_memory_resources_supported()) { GTEST_SKIP() << "Pinned memory resources are not supported on the system."; return; @@ -301,13 +308,14 @@ TEST_P(NumOfRows_MemType2, pack_to_host_with_1MB_pinned_buffer) { auto data_res = br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type); - auto packed_data = rapidsmpf::pack(input_table, stream, data_res); + auto packed_data = + rapidsmpf::pack(input_table, stream, data_res, DEVICE_ACCESSIBLE_MEMORY_TYPES); EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data))); } // device table to host packed data using unlimited pinned memory -TEST_P(NumOfRows_MemType2, pack_to_host_with_unlimited_pinned_buffer) { +TEST_P(PackToHostTest, pack_to_host_with_unlimited_pinned_buffer) { if (!rapidsmpf::is_pinned_memory_resources_supported()) { GTEST_SKIP() << "Pinned memory resources are not supported on the system."; return; @@ -324,7 +332,8 @@ TEST_P(NumOfRows_MemType2, pack_to_host_with_unlimited_pinned_buffer) { auto data_res = br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type); - auto packed_data = rapidsmpf::pack(input_table, stream, data_res); + auto packed_data = + rapidsmpf::pack(input_table, stream, data_res, DEVICE_ACCESSIBLE_MEMORY_TYPES); EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data))); }
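
For reference, the reworked entry point is meant to be driven roughly as in the sketch below, which mirrors the tests above. The header paths, the rapidsmpf:: qualification of estimated_memory_usage(), and the wrapper function itself are illustrative assumptions rather than part of this series.

// Caller-side sketch (illustration only, not part of the patches): pack a device
// table into a HOST-backed PackedData through the pack() entry point above.
// Header paths and the rapidsmpf:: qualification of estimated_memory_usage()
// are assumptions.
#include <rapidsmpf/integrations/cudf/partition.hpp>
#include <rapidsmpf/memory/buffer_resource.hpp>

#include <cudf/table/table_view.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <utility>

rapidsmpf::PackedData pack_table_to_host(
    cudf::table_view const& table,
    rmm::cuda_stream_view stream,
    rapidsmpf::BufferResource& br
) {
    // Reserve HOST memory for the estimated packed size; pack() requires the
    // reservation to cover the estimate and pads it within the wiggle room when
    // cudf::pack allocates slightly more because of aligned allocations.
    auto data_res = br.reserve_or_fail(
        rapidsmpf::estimated_memory_usage(table, stream), rapidsmpf::MemoryType::HOST
    );

    // With a host reservation, pack() first tries cudf::pack against a
    // device-accessible staging reservation and falls back to chunked_pack with a
    // bounce buffer when device or pinned memory is constrained.
    auto packed = rapidsmpf::pack(
        table, stream, data_res, rapidsmpf::DEVICE_ACCESSIBLE_MEMORY_TYPES
    );
    return std::move(*packed);
}

When the reservation is DEVICE or PINNED_HOST instead, the same call stays on the cudf::pack path of the patched pack() and returns without an extra host copy.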