Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cpp/benchmarks/bench_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,11 @@ int main(int argc, char** argv) {
}

rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();
rapidsmpf::BufferResource br{mr, std::move(memory_available)};
rapidsmpf::BufferResource br{
mr,
rapidsmpf::BufferResource::PinnedMemoryResourceDisabled,
std::move(memory_available)
};

auto& log = comm->logger();
rmm::cuda_stream_view stream = cudf::get_default_stream();
Expand Down
7 changes: 5 additions & 2 deletions cpp/benchmarks/streaming/bench_streaming_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,11 @@ int main(int argc, char** argv) {
}

rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();
auto br =
std::make_shared<rapidsmpf::BufferResource>(mr, std::move(memory_available));
auto br = std::make_shared<rapidsmpf::BufferResource>(
mr,
rapidsmpf::BufferResource::PinnedMemoryResourceDisabled,
std::move(memory_available)
);

auto& log = comm->logger();
rmm::cuda_stream_view stream = cudf::get_default_stream();
Expand Down
4 changes: 3 additions & 1 deletion cpp/benchmarks/streaming/ndsh/q09.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,9 @@ int main(int argc, char** argv) {
};
}
auto br = std::make_shared<rapidsmpf::BufferResource>(
stats_mr, std::move(memory_available)
stats_mr,
rapidsmpf::BufferResource::PinnedMemoryResourceDisabled,
std::move(memory_available)
);
auto envvars = rapidsmpf::config::get_environment_variables();
envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads);
Expand Down
4 changes: 3 additions & 1 deletion cpp/include/rapidsmpf/memory/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ class Buffer {
* This ensures that the buffer is backed by memory that behaves as host
* accessible memory.
*/
static constexpr std::array<MemoryType, 1> host_buffer_types{MemoryType::HOST};
static constexpr std::array<MemoryType, 2> host_buffer_types{
MemoryType::HOST, MemoryType::PINNED_HOST
};

/**
* @brief Access the underlying memory buffer (host or device memory).
Expand Down
33 changes: 33 additions & 0 deletions cpp/include/rapidsmpf/memory/buffer_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <rapidsmpf/memory/buffer.hpp>
#include <rapidsmpf/memory/host_memory_resource.hpp>
#include <rapidsmpf/memory/memory_reservation.hpp>
#include <rapidsmpf/memory/pinned_memory_resource.hpp>
#include <rapidsmpf/memory/spill_manager.hpp>
#include <rapidsmpf/rmm_resource_adaptor.hpp>
#include <rapidsmpf/statistics.hpp>
Expand Down Expand Up @@ -50,11 +51,18 @@ class BufferResource {
*/
using MemoryAvailable = std::function<std::int64_t()>;

/// @brief Sentinel value used to disable pinned host memory.
static constexpr auto PinnedMemoryResourceDisabled = nullptr;

/**
* @brief Constructs a buffer resource.
*
* @param device_mr Reference to the RMM device memory resource used for device
* allocations.
* @param pinned_mr The pinned host memory resource used for `MemoryType::PINNED_HOST`
* allocations. If null, pinned host allocations are disabled. In that case, any
* attempt to allocate pinned memory will fail regardless of what @p memory_available
* reports.
* @param memory_available Optional memory availability functions mapping memory types
* to available memory checkers. Memory types without availability functions are
* assumed to have unlimited memory.
Expand All @@ -68,6 +76,7 @@ class BufferResource {
*/
BufferResource(
rmm::device_async_resource_ref device_mr,
std::shared_ptr<PinnedMemoryResource> pinned_mr = PinnedMemoryResourceDisabled,
std::unordered_map<MemoryType, MemoryAvailable> memory_available = {},
std::optional<Duration> periodic_spill_check = std::chrono::milliseconds{1},
std::shared_ptr<rmm::cuda_stream_pool> stream_pool = std::make_shared<
Expand Down Expand Up @@ -95,6 +104,18 @@ class BufferResource {
return host_mr_;
}

/**
* @brief Get the RMM pinned host memory resource.
*
* @return Reference to the RMM resource used for pinned host allocations.
*/
[[nodiscard]] rmm::host_async_resource_ref pinned_mr() {
RAPIDSMPF_EXPECTS(
pinned_mr_, "no pinned memory resource is available", std::invalid_argument
);
return *pinned_mr_;
}

/**
* @brief Retrieves the memory availability function for a given memory type.
*
Expand Down Expand Up @@ -165,6 +186,10 @@ class BufferResource {
/**
* @brief Make a memory reservation or fail based on the given order of memory types.
*
* The function attempts to reserve memory by iterating over @p mem_types in the given
* order of preference. For each memory type, it requests a reservation without
* overbooking. If no memory type can satisfy the request, the function throws.
*
* @param size The size of the buffer to allocate.
* @param mem_types Range of memory types to try to reserve memory from.
* @return A memory reservation.
Expand All @@ -176,6 +201,13 @@ class BufferResource {
[[nodiscard]] MemoryReservation reserve_or_fail(size_t size, Range mem_types) {
// try to reserve memory from the given order
for (auto const& mem_type : mem_types) {
if (mem_type == MemoryType::PINNED_HOST
&& pinned_mr_ == PinnedMemoryResourceDisabled)
{
// Pinned host memory is only available if the memory resource is
// available.
continue;
}
auto [res, _] = reserve(mem_type, size, false);
if (res.size() == size) {
return std::move(res);
Expand Down Expand Up @@ -336,6 +368,7 @@ class BufferResource {
private:
std::mutex mutex_;
rmm::device_async_resource_ref device_mr_;
std::shared_ptr<PinnedMemoryResource> pinned_mr_;
HostMemoryResource host_mr_;
std::unordered_map<MemoryType, MemoryAvailable> memory_available_;
// Zero initialized reserved counters.
Expand Down
17 changes: 11 additions & 6 deletions cpp/include/rapidsmpf/memory/memory_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@

namespace rapidsmpf {

/// @brief Enum representing the type of memory.
/// @brief Enum representing the type of memory sorted in decreasing order of preference.
enum class MemoryType : int {
DEVICE = 0, ///< Device memory
HOST = 1 ///< Host memory
PINNED_HOST = 1, ///< Pinned host memory
HOST = 2 ///< Host memory
};

/// @brief All memory types sorted in decreasing order of preference.
constexpr std::array<MemoryType, 2> MEMORY_TYPES{{MemoryType::DEVICE, MemoryType::HOST}};
constexpr std::array<MemoryType, 3> MEMORY_TYPES{
{MemoryType::DEVICE, MemoryType::PINNED_HOST, MemoryType::HOST}
};

/// @brief Memory type names sorted to match `MEMORY_TYPES`.
/// @brief Memory type names sorted to match `MemoryType` and `MEMORY_TYPES`.
constexpr std::array<char const*, MEMORY_TYPES.size()> MEMORY_TYPE_NAMES{
{"DEVICE", "HOST"}
{"DEVICE", "PINNED_HOST", "HOST"}
};

/**
Expand All @@ -31,7 +34,9 @@ constexpr std::array<char const*, MEMORY_TYPES.size()> MEMORY_TYPE_NAMES{
* insufficient. The ordering reflects the policy of spilling in RapidsMPF, where
* earlier entries are considered more desirable spill destinations.
*/
constexpr std::array<MemoryType, 1> SPILL_TARGET_MEMORY_TYPES{{MemoryType::HOST}};
constexpr std::array<MemoryType, 2> SPILL_TARGET_MEMORY_TYPES{
{MemoryType::PINNED_HOST, MemoryType::HOST}
};

/**
* @brief Get the name of a MemoryType.
Expand Down
19 changes: 17 additions & 2 deletions cpp/src/memory/buffer_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,33 @@ namespace rapidsmpf {
namespace {
/// @brief Helper that adds missing functions to the `memory_available` argument.
auto add_missing_availability_functions(
std::unordered_map<MemoryType, BufferResource::MemoryAvailable>&& memory_available
std::unordered_map<MemoryType, BufferResource::MemoryAvailable>&& memory_available,
bool pinned_mr_is_not_available
) {
for (MemoryType mem_type : MEMORY_TYPES) {
// Add missing memory availability functions.
memory_available.try_emplace(mem_type, std::numeric_limits<std::int64_t>::max);
}
if (pinned_mr_is_not_available) {
memory_available[MemoryType::PINNED_HOST] = []() -> std::int64_t { return 0; };
}
return memory_available;
}
} // namespace

BufferResource::BufferResource(
rmm::device_async_resource_ref device_mr,
std::shared_ptr<PinnedMemoryResource> pinned_mr,
std::unordered_map<MemoryType, MemoryAvailable> memory_available,
std::optional<Duration> periodic_spill_check,
std::shared_ptr<rmm::cuda_stream_pool> stream_pool,
std::shared_ptr<Statistics> statistics
)
: device_mr_{device_mr},
memory_available_{add_missing_availability_functions(std::move(memory_available))},
pinned_mr_{std::move(pinned_mr)},
memory_available_{add_missing_availability_functions(
std::move(memory_available), pinned_mr_ == PinnedMemoryResourceDisabled
)},
stream_pool_{std::move(stream_pool)},
spill_manager_{this, periodic_spill_check},
statistics_{std::move(statistics)} {
Expand Down Expand Up @@ -112,6 +120,13 @@ std::unique_ptr<Buffer> BufferResource::allocate(
MemoryType::HOST
));
break;
case MemoryType::PINNED_HOST:
ret = std::unique_ptr<Buffer>(new Buffer(
std::make_unique<HostBuffer>(size, stream, pinned_mr()),
stream,
MemoryType::PINNED_HOST
));
break;
case MemoryType::DEVICE:
ret = std::unique_ptr<Buffer>(new Buffer(
std::make_unique<rmm::device_buffer>(size, stream, device_mr()),
Expand Down
1 change: 1 addition & 0 deletions cpp/src/streaming/cudf/table_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const {
return TableChunk(std::move(table), stream());
}
case MemoryType::HOST:
case MemoryType::PINNED_HOST:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we implement an abstraction, or an type checker perhaps to use in such situations? E.g.:

bool is_host_memory(MemoryType mem_type) {
    switch (mem_type) {
    case MemoryType::HOST:
    case MemoryType::PINNED_HOST:
        return true;
    default:
        return false;
    }
}

Copy link
Member Author

@madsbk madsbk Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's wait. In this case, I think we want to change behavior for PINNED_HOST: #726 (comment)

{
// Get the packed data either from `packed_columns_` or `table_view().
std::unique_ptr<PackedData> packed_data;
Expand Down
83 changes: 71 additions & 12 deletions cpp/tests/streaming/test_table_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,44 @@
using namespace rapidsmpf;
using namespace rapidsmpf::streaming;

using StreamingTableChunk = BaseStreamingFixture;
class StreamingTableChunk : public BaseStreamingFixture,
public ::testing::WithParamInterface<rapidsmpf::MemoryType> {
protected:
void SetUp() override {
rapidsmpf::config::Options options(
rapidsmpf::config::get_environment_variables()
);

std::unordered_map<MemoryType, rapidsmpf::BufferResource::MemoryAvailable>
memory_available{};
auto stream_pool = std::make_shared<rmm::cuda_stream_pool>(
16, rmm::cuda_stream::flags::non_blocking
);
std::shared_ptr<PinnedMemoryResource> pinned_mr =
BufferResource::PinnedMemoryResourceDisabled;
if (is_pinned_memory_resources_supported()) {
pinned_mr = std::make_shared<PinnedMemoryResource>();
}

stream = cudf::get_default_stream();
br = std::make_shared<rapidsmpf::BufferResource>(
mr_cuda, // device_mr
pinned_mr, // pinned_mr
memory_available, // memory_available
std::chrono::milliseconds{1}, // periodic_spill_check
stream_pool, // stream_pool
Statistics::disabled() // statistics
);
ctx = std::make_shared<rapidsmpf::streaming::Context>(
options, GlobalEnvironment->comm_, br
);
}

rmm::cuda_stream_view stream;
rmm::mr::cuda_memory_resource mr_cuda;
std::shared_ptr<rapidsmpf::BufferResource> br;
std::shared_ptr<rapidsmpf::streaming::Context> ctx;
};

TEST_F(StreamingTableChunk, FromTable) {
constexpr unsigned int num_rows = 100;
Expand Down Expand Up @@ -143,7 +180,23 @@ TEST_F(StreamingTableChunk, FromPackedDataOnDevice) {
EXPECT_EQ(chunk.make_available_cost(), 0);
}

TEST_F(StreamingTableChunk, FromPackedDataOnHost) {
INSTANTIATE_TEST_SUITE_P(
StreamingTableChunkWithSpillTargets,
StreamingTableChunk,
::testing::ValuesIn(rapidsmpf::SPILL_TARGET_MEMORY_TYPES),
[](testing::TestParamInfo<rapidsmpf::MemoryType> const& info) {
return std::string{rapidsmpf::to_string(info.param)};
}
);

TEST_P(StreamingTableChunk, FromPackedDataOn) {
auto const spill_mem_type = GetParam();
if (spill_mem_type == MemoryType::PINNED_HOST
&& !is_pinned_memory_resources_supported())
{
GTEST_SKIP() << "MemoryType::PINNED_HOST isn't supported on the system.";
}

constexpr unsigned int num_rows = 100;
constexpr std::int64_t seed = 1337;

Expand All @@ -154,12 +207,12 @@ TEST_F(StreamingTableChunk, FromPackedDataOnHost) {
// Move the gpu_data to a Buffer (still device memory).
auto gpu_data_on_device = br->move(std::move(packed_columns.gpu_data), stream);

// Copy the GPU data to host memory.
auto [res, _] = br->reserve(MemoryType::HOST, size, true);
auto gpu_data_on_host = br->move(std::move(gpu_data_on_device), res);
// Copy the GPU data to the current spill target memory type.
auto [res, _] = br->reserve(spill_mem_type, size, true);
auto gpu_data_in_spill_memory = br->move(std::move(gpu_data_on_device), res);

auto packed_data = std::make_unique<PackedData>(
std::move(packed_columns.metadata), std::move(gpu_data_on_host)
std::move(packed_columns.metadata), std::move(gpu_data_in_spill_memory)
);
TableChunk chunk{std::move(packed_data)};

Expand Down Expand Up @@ -187,7 +240,14 @@ TEST_F(StreamingTableChunk, DeviceToDeviceCopy) {
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(chunk2.table_view(), expect);
}

TEST_F(StreamingTableChunk, DeviceToHostRoundTripCopy) {
TEST_P(StreamingTableChunk, DeviceToHostRoundTripCopy) {
auto const spill_mem_type = GetParam();
if (spill_mem_type == MemoryType::PINNED_HOST
&& !is_pinned_memory_resources_supported())
{
GTEST_SKIP() << "MemoryType::PINNED_HOST isn't supported on the system.";
}

constexpr unsigned int num_rows = 64;
constexpr std::int64_t seed = 2025;

Expand All @@ -208,7 +268,7 @@ TEST_F(StreamingTableChunk, DeviceToHostRoundTripCopy) {

// Copy to host memory -> new chunk should be unavailable.
auto host_res = br->reserve_or_fail(
dev_chunk.data_alloc_size(MemoryType::DEVICE), MemoryType::HOST
dev_chunk.data_alloc_size(MemoryType::DEVICE), spill_mem_type
);
auto host_copy = dev_chunk.copy(host_res);
EXPECT_FALSE(host_copy.is_available());
Expand All @@ -224,9 +284,8 @@ TEST_F(StreamingTableChunk, DeviceToHostRoundTripCopy) {
}

// Host to host copy.
auto host_res2 = br->reserve_or_fail(
host_copy.data_alloc_size(MemoryType::HOST), MemoryType::HOST
);
auto host_res2 =
br->reserve_or_fail(host_copy.data_alloc_size(spill_mem_type), spill_mem_type);
auto host_copy2 = host_copy.copy(host_res2);
EXPECT_FALSE(host_copy2.is_available());
EXPECT_TRUE(host_copy2.is_spillable());
Expand All @@ -242,7 +301,7 @@ TEST_F(StreamingTableChunk, DeviceToHostRoundTripCopy) {

// Bring the new host copy back to device and verify equality.
auto dev_res = br->reserve_or_fail(
host_copy2.data_alloc_size(MemoryType::HOST), MemoryType::DEVICE
host_copy2.data_alloc_size(spill_mem_type), MemoryType::DEVICE
);
auto dev_back = host_copy2.make_available(dev_res);
EXPECT_TRUE(dev_back.is_available());
Expand Down
Loading