Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ add_library(
src/config.cpp
src/cuda_event.cpp
src/integrations/cudf/bloom_filter.cu
src/integrations/cudf/pack.cpp
src/integrations/cudf/partition.cpp
src/integrations/cudf/utils.cpp
src/memory/buffer.cpp
Expand Down
111 changes: 111 additions & 0 deletions cpp/include/rapidsmpf/integrations/cudf/pack.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once

#include <memory>

#include <cudf/table/table_view.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <rapidsmpf/memory/buffer.hpp>
#include <rapidsmpf/memory/memory_reservation.hpp>
#include <rapidsmpf/memory/memory_type.hpp>
#include <rapidsmpf/memory/packed_data.hpp>

namespace rapidsmpf {


/**
* @brief Pack a cudf table view into a contiguous buffer using chunked packing.
*
* Serializes @p table into a `PackedData` object, staging the transfer through
* @p bounce_buffer chunk by chunk. This is useful when packing to host memory,
* because it avoids allocating temporary device memory for the entire table.
*
* @param table The table view to pack.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param bounce_buffer Device buffer used as intermediate storage during chunked
* packing. It is taken by non-const reference. NOTE(review): presumably its contents
* are overwritten and its size bounds the chunk size — confirm against the
* implementation.
* @param pack_temp_mr Temporary memory resource used for packing. NOTE(review):
* presumably forwarded to cudf::chunked_pack for its temporary allocations — confirm.
* @param reservation Memory reservation to use for allocating the packed data buffer.
* @return A unique pointer to the packed data containing the serialized table.
*
* @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation.
*
* @see cudf::chunked_pack
*/
[[nodiscard]] std::unique_ptr<PackedData> chunked_pack(
cudf::table_view const& table,
rmm::cuda_stream_view stream,
rmm::device_buffer& bounce_buffer,
rmm::device_async_resource_ref pack_temp_mr,
MemoryReservation& reservation
);

namespace detail {

/**
* @brief Pack a cudf table view into a contiguous buffer of the specified memory type.
*
* This function serializes the given table view into a `PackedData` object
* with the data buffer residing in the memory type specified by the template parameter.
* The memory for the packed data is allocated using the provided reservation.
*
* The packing strategy depends on @p Destination:
*
* - Device:
* Uses cudf::pack(). The packed data buffer is a `Buffer` with a `rmm::device_buffer`.
*
* - Pinned Host:
* Uses cudf::pack() with the pinned memory resource passed as the device memory
* resource. The packed data buffer is a `Buffer` with a pinned `HostBuffer`.
*
* - Host:
* Uses cudf::chunked_pack() with a device bounce buffer if one is available,
* otherwise with a pinned bounce buffer. The packed data buffer is a `Buffer` with a
* `HostBuffer`.
*
* @tparam Destination The destination memory type for the packed data buffer.
* @param table The table view to pack.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param reservation Memory reservation to use for allocating the packed data buffer.
* Must match the destination memory type.
* @return A unique pointer to the packed data containing the serialized table.
*
* @throws std::invalid_argument If the reservation's memory type does not match
* Destination.
* @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation.
*
* @see rapidsmpf::pack
* @see rapidsmpf::chunked_pack
* @see cudf::pack
*/
template <MemoryType Destination>
[[nodiscard]] std::unique_ptr<PackedData> pack(
cudf::table_view const& table,
rmm::cuda_stream_view stream,
MemoryReservation& reservation
);

} // namespace detail

/**
* @brief Pack a cudf table view into a contiguous buffer.
*
* This function serializes the given table view into a `PackedData` object
* with the data buffer residing in the memory type of the provided reservation.
* The memory for the packed data is allocated using the provided reservation.
*
* Unlike `detail::pack`, the destination memory type is not a template parameter;
* it is taken from @p reservation at run time. NOTE(review): presumably this
* dispatches to the matching `detail::pack<Destination>` instantiation — confirm
* against the implementation.
*
* @param table The table view to pack.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param reservation Memory reservation to use for allocating the packed data buffer.
* Its memory type determines where the packed data resides.
* @return A unique pointer to the packed data containing the serialized table.
*
* @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation.
*
* @see rapidsmpf::detail::pack
* @see cudf::pack
*/
[[nodiscard]] std::unique_ptr<PackedData> pack(
cudf::table_view const& table,
rmm::cuda_stream_view stream,
MemoryReservation& reservation
);

} // namespace rapidsmpf
23 changes: 21 additions & 2 deletions cpp/include/rapidsmpf/memory/buffer_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ class BufferResource {
*/
[[nodiscard]] rmm::host_async_resource_ref pinned_mr();

/**
* @brief Get the RMM pinned host memory resource as a device resource reference.
*
* This exposes the pinned host memory resource through the device resource
* interface, so that it can be handed to APIs that expect a device memory
* resource.
*
* NOTE(review): the behavior when pinned memory is disabled is not visible from
* this declaration — callers should presumably check
* is_pinned_memory_available() first; confirm against the implementation.
*
* @return Reference to the RMM resource used for pinned host allocations.
*/
[[nodiscard]] rmm::device_async_resource_ref pinned_mr_as_device();

/**
 * @brief Report whether this resource has pinned memory enabled.
 *
 * Pinned memory counts as available whenever the stored pinned memory resource
 * differs from `PinnedMemoryResource::Disabled`.
 *
 * @return true if pinned memory is available, false otherwise.
 */
[[nodiscard]] bool is_pinned_memory_available() const noexcept {
    bool const pinned_enabled = pinned_mr_ != PinnedMemoryResource::Disabled;
    return pinned_enabled;
}

/**
* @brief Retrieves the memory availability function for a given memory type.
*
Expand Down Expand Up @@ -293,15 +309,18 @@ class BufferResource {
);

/**
* @brief Move device buffer data into a Buffer.
* @brief Move an rmm::device_buffer (residing in device or pinned host memory) into
* a Buffer.
*
* This operation is cheap; no copy is performed. The resulting Buffer resides in
* device memory.
* device memory or pinned host memory.
*
* If @p stream differs from the device buffer's current stream:
* - @p stream is synchronized with the device buffer's current stream, and
* - the device buffer's current stream is updated to @p stream.
*
* @note If @p data is empty, the resulting Buffer has the DEVICE memory type.
*
* @param data Unique pointer to the device buffer.
* @param stream CUDA stream associated with the new Buffer. Use or synchronize with
* this stream when operating on the Buffer.
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/rapidsmpf/memory/host_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class HostBuffer {
static HostBuffer from_rmm_device_buffer(
std::unique_ptr<rmm::device_buffer> pinned_host_buffer,
rmm::cuda_stream_view stream,
PinnedMemoryResource& mr
rmm::host_async_resource_ref mr
);

private:
Expand Down
49 changes: 49 additions & 0 deletions cpp/include/rapidsmpf/memory/memory_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <ranges>
#include <span>

#include <rapidsmpf/utils/misc.hpp>

namespace rapidsmpf {

/// @brief Enum representing the type of memory sorted in decreasing order of preference.
Expand Down Expand Up @@ -66,6 +68,53 @@ static_assert(std::ranges::equal(
leq_memory_types(static_cast<MemoryType>(-1)), std::ranges::empty_view<MemoryType>{}
));

/**
 * @brief List the memory types that are device accessible.
 *
 * The result is a view over the first two entries of `MEMORY_TYPES`.
 *
 * @return A span of memory types that are device accessible.
 */
constexpr std::span<MemoryType const> device_accessible_memory_types() noexcept {
    auto const all_types = std::span{MEMORY_TYPES};
    return all_types.first<2>();
}

// Compile-time check of the expected contents and order.
static_assert(std::ranges::equal(
    device_accessible_memory_types(),
    std::array{MemoryType::DEVICE, MemoryType::PINNED_HOST}
));

/**
 * @brief Tell whether the given memory type is device accessible.
 *
 * A memory type is device accessible when it appears in
 * device_accessible_memory_types().
 *
 * @param mem_type The memory type to check.
 * @return true if the memory type is device accessible, false otherwise.
 */
constexpr bool is_device_accessible(MemoryType mem_type) noexcept {
    auto const candidates = device_accessible_memory_types();
    return contains(candidates, mem_type);
}

/**
 * @brief Get the memory types that are host accessible.
 *
 * The result is a view over the last two entries of `MEMORY_TYPES`. The function
 * is `noexcept`, matching device_accessible_memory_types(): taking a fixed-size
 * sub-span of a static-extent span cannot throw.
 *
 * @return A span of memory types that are host accessible.
 */
constexpr std::span<MemoryType const> host_accessible_memory_types() noexcept {
    return std::span{MEMORY_TYPES}.last<2>();
}

// Compile-time check of the expected contents and order.
static_assert(std::ranges::equal(
    host_accessible_memory_types(), std::array{MemoryType::PINNED_HOST, MemoryType::HOST}
));

/**
 * @brief Tell whether the given memory type is host accessible.
 *
 * A memory type is host accessible when it appears in
 * host_accessible_memory_types().
 *
 * @param mem_type The memory type to check.
 * @return true if the memory type is host accessible, false otherwise.
 */
constexpr bool is_host_accessible(MemoryType mem_type) noexcept {
    auto const candidates = host_accessible_memory_types();
    return contains(candidates, mem_type);
}

/**
* @brief Get the name of a MemoryType.
*
Expand Down
Loading