diff --git a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp
index 4b03c6826..67b8b72e1 100644
--- a/cpp/include/rapidsmpf/integrations/cudf/partition.hpp
+++ b/cpp/include/rapidsmpf/integrations/cudf/partition.hpp
@@ -1,5 +1,5 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 #pragma once
@@ -209,4 +209,84 @@ std::vector<PackedData> unspill_partitions(
     std::shared_ptr<Statistics> statistics = Statistics::disabled()
 );
 
+/// @brief The amount of extra memory to reserve for packing.
+constexpr size_t packing_wiggle_room_per_column = 1024;  ///< 1 KiB per column
+
+/**
+ * @brief The total amount of extra memory to reserve for packing.
+ *
+ * @param table The table to pack.
+ * @return The total amount of extra memory to reserve for packing.
+ */
+inline size_t total_packing_wiggle_room(cudf::table_view const& table) {
+    return packing_wiggle_room_per_column * static_cast<size_t>(table.num_columns());
+}
+
+/**
+ * @brief Pack a table using a device bounce buffer and `cudf::chunked_pack`.
+ *
+ * All device operations will be performed on @p bounce_buf 's stream.
+ * `cudf::chunked_pack` requires the buffer to be at least 1 MiB in size.
+ *
+ * @param table The table to pack.
+ * @param bounce_buf A device bounce buffer to use for packing.
+ * @param data_res Memory reservation for the data buffer. If the final packed buffer
+ * size exceeds the reservation but is within the wiggle room, @p data_res will be
+ * padded to the packed buffer size.
+ *
+ * @return A `PackedData` containing the packed table.
+ *
+ * @throws std::runtime_error If the memory allocation fails.
+ * @throws std::invalid_argument If the bounce buffer is not in device memory.
+ */
+PackedData chunked_pack(
+    cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res
+);
+
+/// @brief The minimum buffer size for `cudf::chunked_pack`.
+constexpr size_t cudf_chunked_pack_min_buffer_size = size_t(1) << 20;  ///< 1 MiB
+
+/**
+ * @brief Pack a table to host memory using `cudf::pack` or `cudf::chunked_pack`.
+ *
+ * Based on benchmarks (rapidsai/rapidsmpf#745), the order of packing performance is as
+ * follows:
+ * - `cudf::pack` -> DEVICE
+ * - `cudf::chunked_pack` -> DEVICE
+ * - `cudf::pack` -> PINNED_HOST
+ * - `cudf::chunked_pack` -> PINNED_HOST
+ *
+ * This utility uses the following strategy:
+ * - The data reservation must be big enough to pack the table.
+ * - If the data reservation is from device-accessible memory, use `cudf::pack`, as it
+ *   requires O(estimated_table_size) memory, which is already reserved up front.
+ * - If the data reservation is from host memory, for each memory type in @p
+ *   bounce_buf_types, do the following:
+ *   - Try to reserve estimated_table_size for the memory type.
+ *   - If the reservation succeeds without overbooking, use `cudf::pack` and move the
+ *     packed data device buffer to the data reservation.
+ *   - Else, if the leftover memory is `>= cudf_chunked_pack_min_buffer_size`, allocate
+ *     a device-accessible bounce buffer and use `chunked_pack` to pack to the data
+ *     reservation.
+ *   - Else, loop again with the next memory type.
+ * - If all memory types have been tried without success, fail.
+ *
+ * @param table The table to pack.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ * @param data_res Memory reservation for the host data buffer.
+ * @param bounce_buf_types The memory types to use for the bounce buffer. Default is
+ * `DEVICE_ACCESSIBLE_MEMORY_TYPES`.
+ *
+ * @return A `PackedData` containing the packed table.
+ *
+ * @throws std::invalid_argument If the memory reservation is not big enough to pack the
+ * table.
+ * @throws std::runtime_error If all attempts to pack the table fail.
+ */
+std::unique_ptr<PackedData> pack(
+    cudf::table_view const& table,
+    rmm::cuda_stream_view stream,
+    MemoryReservation& data_res,
+    std::span<MemoryType const> bounce_buf_types = DEVICE_ACCESSIBLE_MEMORY_TYPES
+);
+
 }  // namespace rapidsmpf
diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp
index 8bc7df73c..cd5f225de 100644
--- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp
+++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp
@@ -1,5 +1,5 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 
@@ -107,13 +107,30 @@ class BufferResource {
      *
      * @return Reference to the RMM resource used for pinned host allocations.
      */
-    [[nodiscard]] rmm::host_async_resource_ref pinned_mr() {
-        RAPIDSMPF_EXPECTS(
-            pinned_mr_, "no pinned memory resource is available", std::invalid_argument
-        );
-        return *pinned_mr_;
+    [[nodiscard]] rmm::host_device_async_resource_ref pinned_mr() {
+        return get_checked_pinned_mr();
     }
 
+    /**
+     * @brief Get the RMM device memory resource for a given memory type.
+     *
+     * @param mem_type The memory type.
+     * @return Reference to the RMM resource used for device allocations.
+     * @throws std::invalid_argument if the memory type is not device accessible.
+     */
+    [[nodiscard]] rmm::device_async_resource_ref get_device_mr(
+        MemoryType const& mem_type
+    );
+
+    /**
+     * @brief Get the RMM host memory resource for a given memory type.
+     *
+     * @param mem_type The memory type.
+     * @return Reference to the RMM resource used for host allocations.
+     * @throws std::invalid_argument if the memory type is not host accessible.
+     */
+    [[nodiscard]] rmm::host_async_resource_ref get_host_mr(MemoryType const& mem_type);
+
     /**
      * @brief Retrieves the memory availability function for a given memory type.
      *
@@ -211,7 +228,9 @@ class BufferResource {
                 return std::move(res);
             }
         }
-        RAPIDSMPF_FAIL("failed to reserve memory", std::runtime_error);
+        RAPIDSMPF_FAIL(
+            "failed to reserve memory " + std::to_string(size), std::runtime_error
+        );
     }
 
     /**
@@ -271,10 +290,10 @@ class BufferResource {
     );
 
     /**
-     * @brief Move device buffer data into a Buffer.
+     * @brief Move device or pinned host buffer data into a Buffer.
      *
      * This operation is cheap; no copy is performed. The resulting Buffer resides in
-     * device memory.
+     * device or pinned host memory.
      *
      * If @p stream differs from the device buffer's current stream:
      * - @p stream is synchronized with the device buffer's current stream, and
      *
@@ -283,10 +302,15 @@ class BufferResource {
      * @param data Unique pointer to the device buffer.
      * @param stream CUDA stream associated with the new Buffer. Use or synchronize with
      * this stream when operating on the Buffer.
+     * @param mem_type The memory type of the device buffer. Defaults to
+     * `MemoryType::DEVICE`.
+     *
      * @return Unique pointer to the resulting Buffer.
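+     *
+     * @code
+     * // Illustrative sketch (`pinned_dev_buf` is a hypothetical, already-pinned
+     * // rmm::device_buffer); wraps the allocation in a Buffer without copying:
+     * auto buf = br.move(std::move(pinned_dev_buf), stream, MemoryType::PINNED_HOST);
+     * @endcode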
     */
    std::unique_ptr<Buffer> move(
-        std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
+        std::unique_ptr<rmm::device_buffer> data,
+        rmm::cuda_stream_view stream,
+        MemoryType mem_type = MemoryType::DEVICE
     );
 
     /**
@@ -364,6 +388,18 @@ class BufferResource {
     std::shared_ptr<Statistics> statistics();
 
   private:
+    /**
+     * @brief Get the RMM pinned host memory resource.
+     *
+     * @return Reference to the RMM resource used for pinned host allocations.
+     */
+    [[nodiscard]] PinnedMemoryResource& get_checked_pinned_mr() {
+        RAPIDSMPF_EXPECTS(
+            pinned_mr_, "no pinned memory resource is available", std::invalid_argument
+        );
+        return *pinned_mr_;
+    }
+
     std::mutex mutex_;
     rmm::device_async_resource_ref device_mr_;
     std::shared_ptr<PinnedMemoryResource> pinned_mr_;
diff --git a/cpp/include/rapidsmpf/memory/memory_type.hpp b/cpp/include/rapidsmpf/memory/memory_type.hpp
index 01b75d9a6..d91181c85 100644
--- a/cpp/include/rapidsmpf/memory/memory_type.hpp
+++ b/cpp/include/rapidsmpf/memory/memory_type.hpp
@@ -1,5 +1,5 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 #pragma once
@@ -9,6 +9,8 @@
 #include
 #include
 
+#include
+
 namespace rapidsmpf {
 
 /// @brief Enum representing the type of memory sorted in decreasing order of preference.
@@ -40,6 +42,20 @@ constexpr std::array<MemoryType, 2> SPILL_TARGET_MEMORY_TYPES{
    {MemoryType::PINNED_HOST, MemoryType::HOST}
 };
 
+/**
+ * @brief Memory types that are device accessible, in the order of preference.
+ */
+constexpr std::array<MemoryType, 2> DEVICE_ACCESSIBLE_MEMORY_TYPES{
+    {MemoryType::DEVICE, MemoryType::PINNED_HOST}
+};
+
+/**
+ * @brief Memory types that are host accessible, in the order of preference.
+ */
+constexpr std::array<MemoryType, 2> HOST_ACCESSIBLE_MEMORY_TYPES{
+    {MemoryType::PINNED_HOST, MemoryType::HOST}
+};
+
 /**
  * @brief Get the memory types with preference lower than or equal to @p mem_type.
  *
@@ -65,6 +81,26 @@ static_assert(std::ranges::equal(
     leq_memory_types(static_cast<MemoryType>(-1)), std::ranges::empty_view<MemoryType>{}
 ));
 
+/**
+ * @brief Check if the memory type is host accessible.
+ *
+ * @param mem_type The memory type.
+ * @return True if the memory type is host accessible, false otherwise.
+ */
+constexpr bool is_host_accessible(MemoryType const& mem_type) noexcept {
+    return contains(HOST_ACCESSIBLE_MEMORY_TYPES, mem_type);
+}
+
+/**
+ * @brief Check if the memory type is device accessible.
+ *
+ * @param mem_type The memory type.
+ * @return True if the memory type is device accessible, false otherwise.
+ */
+constexpr bool is_device_accessible(MemoryType const& mem_type) noexcept {
+    return contains(DEVICE_ACCESSIBLE_MEMORY_TYPES, mem_type);
+}
+
 /**
  * @brief Get the name of a MemoryType.
  *
diff --git a/cpp/src/cuda_event.cpp b/cpp/src/cuda_event.cpp
index 03e4a539a..8db79c4f7 100644
--- a/cpp/src/cuda_event.cpp
+++ b/cpp/src/cuda_event.cpp
@@ -1,5 +1,5 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 
@@ -45,7 +45,7 @@ void CudaEvent::record(rmm::cuda_stream_view stream) {
     RAPIDSMPF_CUDA_TRY(cudaEventRecord(event_, stream));
 }
 
-[[nodiscard]] bool CudaEvent::CudaEvent::is_ready() const {
+[[nodiscard]] bool CudaEvent::is_ready() const {
     auto result = cudaEventQuery(event_);
     if (result != cudaSuccess && result != cudaErrorNotReady) {
         RAPIDSMPF_CUDA_TRY(result);
@@ -53,7 +53,7 @@ void CudaEvent::record(rmm::cuda_stream_view stream) {
     return result == cudaSuccess;
 }
 
-void CudaEvent::CudaEvent::host_wait() const {
+void CudaEvent::host_wait() const {
     RAPIDSMPF_CUDA_TRY(cudaEventSynchronize(event_));
 }
 
diff --git a/cpp/src/integrations/cudf/partition.cpp b/cpp/src/integrations/cudf/partition.cpp
index ec5d57a8c..e458cde03 100644
--- a/cpp/src/integrations/cudf/partition.cpp
+++ b/cpp/src/integrations/cudf/partition.cpp
@@ -1,5 +1,5 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 
@@ -268,4 +268,154 @@ std::vector<PackedData> unspill_partitions(
     statistics->add_bytes_stat("spill-bytes-host-to-device", non_device_size);
     return ret;
 }
+
+namespace {
+
+/**
+ * @brief Pad the data reservation to the packed size if the packed size is within the
+ * wiggle room.
+ *
+ * @param data_res The data reservation to pad.
+ * @param packed_size The size of the packed data.
+ * @param table The table to pack.
+ */
+void pad_data_reservation(
+    MemoryReservation& data_res, size_t packed_size, cudf::table_view const& table
+) {
+    if (packed_size > data_res.size()) {
+        if (packed_size <= data_res.size() + total_packing_wiggle_room(table)) {
+            data_res.clear();  // clear the current reservation
+            data_res = std::get<0>(
+                data_res.br()->reserve(data_res.mem_type(), packed_size, true)
+            );
+        }
+    }
+}
+
+}  // namespace
+
+PackedData chunked_pack(
+    cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res
+) {
+    RAPIDSMPF_EXPECTS(
+        is_device_accessible(bounce_buf.mem_type()),
+        "bounce buffer is not device accessible",
+        std::invalid_argument
+    );
+
+    // all copies will be done on the bounce buffer's stream
+    auto stream = bounce_buf.stream();
+    auto* br = data_res.br();
+    size_t chunk_size = bounce_buf.size;
+
+    cudf::chunked_pack packer(table, chunk_size, stream, br->device_mr());
+    auto const packed_size = packer.get_total_contiguous_size();
+
+    // if the packed size exceeds the data reservation but is within the wiggle room,
+    // pad the data reservation to the packed size from the same memory type.
+    pad_data_reservation(data_res, packed_size, table);
+
+    auto data_buf = br->allocate(packed_size, stream, data_res);
+
+    bounce_buf.write_access([&](std::byte* bounce_buf_ptr, rmm::cuda_stream_view) {
+        // all copies are done on the same stream, so we can omit the stream parameter
+        cudf::device_span<uint8_t> buf_span(
+            reinterpret_cast<uint8_t*>(bounce_buf_ptr), chunk_size
+        );
+
+        data_buf->write_access([&](std::byte* data_ptr, rmm::cuda_stream_view) {
+            size_t offset = 0;
+            while (packer.has_next()) {
+                size_t n_bytes = packer.next(buf_span);
+                RAPIDSMPF_CUDA_TRY(cudaMemcpyAsync(
+                    data_ptr + offset, buf_span.data(), n_bytes, cudaMemcpyDefault, stream
+                ));
+                offset += n_bytes;
+            }
+        });
+    });
+
+    return {packer.build_metadata(), std::move(data_buf)};
+}
+
+std::unique_ptr<PackedData> pack(
+    cudf::table_view const& table,
+    rmm::cuda_stream_view stream,
+    MemoryReservation& data_res,
+    std::span<MemoryType const> cpack_buf_mem_types
+) {
+    auto* br = data_res.br();
+
+    // use cudf::pack to pack the table to the data reservation.
+    auto cudf_pack = [&](MemoryReservation& res) -> std::unique_ptr<PackedData> {
+        auto dev_mr = br->get_device_mr(res.mem_type());
+        auto packed_columns = cudf::pack(table, stream, dev_mr);
+
+        return std::make_unique<PackedData>(
+            std::move(packed_columns.metadata),
+            br->move(std::move(packed_columns.gpu_data), stream, res.mem_type())
+        );
+    };
+
+    size_t est_table_size = estimated_memory_usage(table, stream);
+
+    // irrespective of the memory type, the reservation must be big enough to copy the
+    // output data buffer.
+    RAPIDSMPF_EXPECTS(
+        data_res.size() >= est_table_size,
+        "data reservation is not big enough to pack the table",
+        std::invalid_argument
+    );
+
+    // if the data reservation is from device-accessible memory, use cudf::pack, as it
+    // performs better than chunked_pack. cudf::pack will require O(estimated_table_size)
+    // memory.
+    if (is_device_accessible(data_res.mem_type())) {
+        // use the memory resource corresponding to the data reservation, so that
+        // cudf::pack will allocate memory from that memory type.
+        auto packed_data = cudf_pack(data_res);
+
+        // release the amount of memory used by the packed data.
+        br->release(data_res, packed_data->data->size);
+
+        return packed_data;
+    } else {  // HOST data reservations.
+
+        // try to allocate as much device-accessible memory as possible for the bounce
+        // buffer (at most est_table_size).
+        for (auto const& mem_type : cpack_buf_mem_types) {
+            auto [res, overbooking] = br->reserve(mem_type, est_table_size, true);
+
+            if (overbooking == 0) {
+                // there is enough memory to pack the table, use `cudf::pack`
+                auto packed_data = cudf_pack(res);
+
+                // if the packed data size is within the wiggle room, pad the data
+                // reservation to that size.
+                pad_data_reservation(data_res, packed_data->data->size, table);
+
+                // finally, copy the packed data device buffer to HOST memory.
+                // Note that if the packed size exceeds the reservation by more than
+                // the wiggle room, the following move will likely OOM.
+                packed_data->data = br->move(std::move(packed_data->data), data_res);
+                return packed_data;
+            }
+
+            size_t leftover_mem = res.size() > overbooking ? res.size() - overbooking : 0;
+
+            if (leftover_mem >= cudf_chunked_pack_min_buffer_size) {
+                // use device-accessible memory for the bounce buffer
+                auto bounce_buf = br->allocate(leftover_mem, stream, res);
+                return std::make_unique<PackedData>(
+                    chunked_pack(table, *bounce_buf, data_res)
+                );
+            }
+        }
+
+        // if we get here, all attempts to pack the table have failed.
+        RAPIDSMPF_FAIL("failed to pack the table", std::runtime_error);
+    }
+}
+
 }  // namespace rapidsmpf
diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp
index f1e99949d..c024f7efd 100644
--- a/cpp/src/memory/buffer_resource.cpp
+++ b/cpp/src/memory/buffer_resource.cpp
@@ -1,11 +1,13 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 
 #include
 #include
 
+#include
+
 #include
 #include
 #include
@@ -50,6 +52,24 @@ BufferResource::BufferResource(
     RAPIDSMPF_EXPECTS(statistics_ != nullptr, "the statistics pointer cannot be NULL");
 }
 
+rmm::device_async_resource_ref BufferResource::get_device_mr(MemoryType const& mem_type) {
+    RAPIDSMPF_EXPECTS(
+        is_device_accessible(mem_type),
+        "memory type must be device accessible",
+        std::invalid_argument
+    );
+    return mem_type == MemoryType::DEVICE ? device_mr() : get_checked_pinned_mr();
+}
+
+rmm::host_async_resource_ref BufferResource::get_host_mr(MemoryType const& mem_type) {
+    RAPIDSMPF_EXPECTS(
+        is_host_accessible(mem_type),
+        "memory type must be host accessible",
+        std::invalid_argument
+    );
+    return mem_type == MemoryType::PINNED_HOST ? get_checked_pinned_mr() : host_mr();
+}
+
 std::pair<MemoryReservation, std::size_t> BufferResource::reserve(
     MemoryType mem_type, std::size_t size, bool allow_overbooking
 ) {
@@ -148,13 +168,25 @@ std::unique_ptr<Buffer> BufferResource::allocate(
 }
 
 std::unique_ptr<Buffer> BufferResource::move(
-    std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
+    std::unique_ptr<rmm::device_buffer> data,
+    rmm::cuda_stream_view stream,
+    MemoryType mem_type
 ) {
     auto upstream = data->stream();
     if (upstream.value() != stream.value()) {
         cuda_stream_join(stream, upstream);
         data->set_stream(stream);
     }
+    // if the device_buffer is host accessible, wrap it in a HostBuffer
+    if (mem_type == MemoryType::PINNED_HOST) {
+        return std::unique_ptr<Buffer>(new Buffer(
+            std::make_unique<HostBuffer>(HostBuffer::from_rmm_device_buffer(
+                std::move(data), stream, get_checked_pinned_mr()
+            )),
+            stream,
+            MemoryType::PINNED_HOST
+        ));
+    }
     return std::unique_ptr<Buffer>(new Buffer(std::move(data), MemoryType::DEVICE));
 }
diff --git a/cpp/src/memory/host_buffer.cpp b/cpp/src/memory/host_buffer.cpp
index 3e54a60e8..bf4d34027 100644
--- a/cpp/src/memory/host_buffer.cpp
+++ b/cpp/src/memory/host_buffer.cpp
@@ -145,8 +145,11 @@ HostBuffer HostBuffer::from_rmm_device_buffer(
         std::invalid_argument
     );
 
+    // cuda::is_host_accessible will return false for an empty device buffer. So, bypass
+    // the check if the device buffer is empty.
     RAPIDSMPF_EXPECTS(
-        cuda::is_host_accessible(pinned_host_buffer->data()),
+        pinned_host_buffer->data() == nullptr
+            || cuda::is_host_accessible(pinned_host_buffer->data()),
         "pinned_host_buffer must be host accessible",
         std::invalid_argument
     );
diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp
index d3b02c6a5..f94c67afb 100644
--- a/cpp/src/streaming/cudf/table_chunk.cpp
+++ b/cpp/src/streaming/cudf/table_chunk.cpp
@@ -1,10 +1,11 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 
 #include
 
+#include
 #include
 #include
 #include
@@ -170,32 +171,7 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const {
             // serialize `table_view()` into a packed_columns and then we move
             // the packed_columns' gpu_data to a new host buffer.
 
-            // TODO: use `cudf::chunked_pack()` with a bounce buffer. Currently,
-            // `cudf::pack()` allocates device memory we haven't reserved.
-            auto packed_columns =
-                cudf::pack(table_view(), stream(), br->device_mr());
-            packed_data = std::make_unique<PackedData>(
-                std::move(packed_columns.metadata),
-                br->move(std::move(packed_columns.gpu_data), stream())
-            );
-
-            // Handle the case where `cudf::pack` allocates slightly more than the
-            // input size. This can occur because cudf uses aligned allocations,
-            // which may exceed the requested size. To accommodate this, we
-            // allow some wiggle room.
-            if (packed_data->data->size > reservation.size()) {
-                auto const wiggle_room =
-                    1024 * static_cast<size_t>(table_view().num_columns());
-                if (packed_data->data->size <= reservation.size() + wiggle_room) {
-                    reservation =
-                        br->reserve(
-                              MemoryType::HOST, packed_data->data->size, true
-                        )
-                            .first;
-                }
-            }
-            packed_data->data =
-                br->move(std::move(packed_data->data), reservation);
+            packed_data = pack(table_view(), stream(), reservation);
         }
         return TableChunk(std::move(packed_data));
 }
diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp
index 8433ce630..819b3634a 100644
--- a/cpp/tests/test_partition.cpp
+++ b/cpp/tests/test_partition.cpp
@@ -1,5 +1,5 @@
 /**
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
  * SPDX-License-Identifier: Apache-2.0
  */
 
@@ -7,6 +7,8 @@
 
 #include
 
+#include
+
 #include
 #include
 #include
@@ -19,6 +21,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -132,3 +135,205 @@ TEST_F(SpillingTest, SpillUnspillRoundtripPreservesDataAndMetadata) {
     auto actual = br->move_to_host_buffer(std::move(back_on_host[0].data), res);
     EXPECT_EQ(actual->copy_to_uint8_vector(), payload);
 }
+
+class NumOfRows_MemType
+    : public ::testing::TestWithParam<std::tuple<int, MemoryType>> {
+  protected:
+    // cudf::chunked_pack requires at least a 1 MiB bounce buffer
+    static constexpr size_t chunk_size = 1 << 20;
+    static constexpr int64_t seed = 42;
+
+    void setup_br(
+        MemoryType mem_type,
+        std::unordered_map<MemoryType, BufferResource::MemoryAvailable>&& memory_available
+    ) {
+        if (rapidsmpf::is_pinned_memory_resources_supported()) {
+            pinned_mr = std::make_shared<PinnedMemoryResource>();
+        } else {
+            pinned_mr = PinnedMemoryResource::Disabled;
+        }
+
+        if (mem_type == MemoryType::PINNED_HOST
+            && pinned_mr == PinnedMemoryResource::Disabled)
+        {
+            GTEST_SKIP() << "MemoryType::PINNED_HOST isn't supported on the system.";
+        }
+
+        br = std::make_unique<BufferResource>(
+            cudf::get_current_device_resource_ref(),
+            pinned_mr,
+            std::move(memory_available)
+        );
+    }
+
+    void SetUp() override {
+        std::tie(num_rows, mem_type) = GetParam();
+
+        std::unordered_map<MemoryType, BufferResource::MemoryAvailable> memory_available;
+        // disable all memory types except mem_type
+        for (auto mt : MEMORY_TYPES) {
+            if (mt != mem_type) {
+                memory_available[mt] = []() { return 0; };
+            }
+        }
+        setup_br(mem_type, std::move(memory_available));
+        stream = cudf::get_default_stream();
+    }
+
+    void validate_packed_table(
+        cudf::table_view const& input_table, PackedData&& packed_data
+    ) {
+        EXPECT_EQ(mem_type, packed_data.data->mem_type());
+
+        auto to_device = std::make_unique<rmm::device_buffer>(
+            packed_data.data->data(), packed_data.data->size, stream, br->device_mr()
+        );
+
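+        // make sure the async copy into `to_device` has completed before
+        // unpacking it below.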
+        stream.synchronize();
+
+        cudf::packed_columns packed_columns(
+            std::move(packed_data.metadata), std::move(to_device)
+        );
+        auto unpacked_table = cudf::unpack(packed_columns);
+        CUDF_TEST_EXPECT_TABLES_EQUIVALENT(input_table, unpacked_table);
+    }
+
+    int num_rows;
+    MemoryType mem_type;
+    cudf::table input_table;
+    std::unique_ptr<BufferResource> br;
+    std::shared_ptr<PinnedMemoryResource> pinned_mr;
+    rmm::cuda_stream_view stream;
+};
+
+class ChunkedPackTest : public NumOfRows_MemType {};
+
+// test different `num_rows` and `MemoryType`.
+INSTANTIATE_TEST_SUITE_P(
+    ChunkedPack,
+    ChunkedPackTest,
+    ::testing::Combine(
+        ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000),  // num rows
+        ::testing::ValuesIn(MEMORY_TYPES)  // output memory type
+    ),
+    [](const testing::TestParamInfo<ChunkedPackTest::ParamType>& info) {
+        return "nrows_" + std::to_string(std::get<0>(info.param)) + "_type_"
+               + MEMORY_TYPE_NAMES[static_cast<size_t>(std::get<1>(info.param))];
+    }
+);
+
+TEST_P(ChunkedPackTest, chunked_pack) {
+    cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);
+
+    auto [bounce_buf_res, _] = br->reserve(MemoryType::DEVICE, chunk_size, true);
+    auto bounce_buf = br->allocate(chunk_size, stream, bounce_buf_res);
+
+    auto data_res =
+        br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type);
+
+    auto chunked_packed = rapidsmpf::chunked_pack(input_table, *bounce_buf, data_res);
+
+    EXPECT_NO_THROW(validate_packed_table(input_table, std::move(chunked_packed)));
+}
+
+class PackToHostTest : public NumOfRows_MemType {};
+
+INSTANTIATE_TEST_SUITE_P(
+    PackTableToHost,
+    PackToHostTest,
+    ::testing::Combine(
+        ::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000),  // num rows
+        ::testing::Values(MemoryType::HOST)  // output memory type
+    ),
+    [](const testing::TestParamInfo<PackToHostTest::ParamType>& info) {
+        return "nrows_" + std::to_string(std::get<0>(info.param)) + "_type_"
+               + MEMORY_TYPE_NAMES[static_cast<size_t>(std::get<1>(info.param))];
+    }
+);
+
+// device table to host packed data using a 1 MiB device buffer
+TEST_P(PackToHostTest, pack_to_host_with_1MB_device_buffer) {
+    // override br with just 1 MiB of device memory.
+    std::unordered_map<MemoryType, BufferResource::MemoryAvailable> memory_available{
+        {rapidsmpf::MemoryType::DEVICE, [] { return 1 << 20; }}
+    };
+    setup_br(mem_type, std::move(memory_available));
+
+    cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);
+
+    auto data_res =
+        br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type);
+
+    std::array device_type{MemoryType::DEVICE};
+    auto packed_data = rapidsmpf::pack(input_table, stream, data_res, device_type);
+
+    EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data)));
+}
+
+// device table to host packed data using unlimited device memory
+TEST_P(PackToHostTest, pack_to_host_with_unlimited_device_buffer) {
+    // override br with unlimited device memory.
+    std::unordered_map<MemoryType, BufferResource::MemoryAvailable> memory_available{
+        {rapidsmpf::MemoryType::DEVICE,
+         [] { return std::numeric_limits<std::int64_t>::max(); }}
+    };
+    setup_br(mem_type, std::move(memory_available));
+
+    cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);
+
+    auto data_res =
+        br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type);
+
+    std::array device_type{MemoryType::DEVICE};
+    auto packed_data = rapidsmpf::pack(input_table, stream, data_res, device_type);
+
+    EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data)));
+}
+
+// device table to host packed data using a 1 MiB pinned buffer
+TEST_P(PackToHostTest, pack_to_host_with_1MB_pinned_buffer) {
+    if (!rapidsmpf::is_pinned_memory_resources_supported()) {
+        GTEST_SKIP() << "Pinned memory resources are not supported on the system.";
+        return;
+    }
+
+    std::unordered_map<MemoryType, BufferResource::MemoryAvailable> memory_available{
+        {rapidsmpf::MemoryType::PINNED_HOST, [] { return 1 << 20; }}
+    };
+    setup_br(mem_type, std::move(memory_available));
+
+    cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);
+
+    auto data_res =
+        br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type);
+
+    auto packed_data =
+        rapidsmpf::pack(input_table, stream, data_res, DEVICE_ACCESSIBLE_MEMORY_TYPES);
+
+    EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data)));
+}
+
+// device table to host packed data using unlimited pinned memory
+TEST_P(PackToHostTest, pack_to_host_with_unlimited_pinned_buffer) {
+    if (!rapidsmpf::is_pinned_memory_resources_supported()) {
+        GTEST_SKIP() << "Pinned memory resources are not supported on the system.";
+        return;
+    }
+
+    std::unordered_map<MemoryType, BufferResource::MemoryAvailable> memory_available{
+        {rapidsmpf::MemoryType::PINNED_HOST,
+         [] { return std::numeric_limits<std::int64_t>::max(); }}
+    };
+    setup_br(mem_type, std::move(memory_available));
+
+    cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);
+
+    auto data_res =
+        br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type);
+
+    auto packed_data =
+        rapidsmpf::pack(input_table, stream, data_res, DEVICE_ACCESSIBLE_MEMORY_TYPES);
+
+    EXPECT_NO_THROW(validate_packed_table(input_table, std::move(*packed_data)));
+}
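
A minimal end-to-end sketch of the new pack-to-host path, assuming a configured `BufferResource* br` and an input `cudf::table_view table`; the variable names, and the `rapidsmpf::` qualification of `estimated_memory_usage`, are illustrative assumptions:

    // Reserve host memory sized to the table's estimated packed footprint.
    auto stream = cudf::get_default_stream();
    auto data_res = br->reserve_or_fail(
        rapidsmpf::estimated_memory_usage(table, stream), rapidsmpf::MemoryType::HOST
    );

    // pack() first tries cudf::pack via a device-accessible staging reservation and
    // falls back to chunked_pack with a bounce buffer when device memory is scarce.
    auto packed = rapidsmpf::pack(table, stream, data_res);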