From 6a4a494002cd02a9530b0b02caf1d1437e968a26 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 10 Nov 2025 12:55:56 -0800 Subject: [PATCH 1/3] Integrate `MetadataPayloadExchange` in `Shuffler` --- cpp/include/rapidsmpf/shuffler/shuffler.hpp | 15 +- .../metadata_payload_exchange/tag.cpp | 16 +- cpp/src/shuffler/shuffler.cpp | 308 +++++------------- python/rapidsmpf/rapidsmpf/tests/test_dask.py | 12 +- 4 files changed, 115 insertions(+), 236 deletions(-) diff --git a/cpp/include/rapidsmpf/shuffler/shuffler.hpp b/cpp/include/rapidsmpf/shuffler/shuffler.hpp index 8b4ba0d8a..16e7e8214 100644 --- a/cpp/include/rapidsmpf/shuffler/shuffler.hpp +++ b/cpp/include/rapidsmpf/shuffler/shuffler.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -94,6 +95,8 @@ class Shuffler { * @param finished_callback Callback to notify when a partition is finished. * @param statistics The statistics instance to use (disabled by default). * @param partition_owner Function to determine partition ownership. + * @param mpe Optional custom metadata payload exchange. If not provided, + * uses the default tag-based implementation. * * @note The caller promises that inserted buffers are stream-ordered with respect * to their own stream, and extracted buffers are likewise guaranteed to be stream- @@ -107,7 +110,8 @@ class Shuffler { BufferResource* br, FinishedCallback&& finished_callback, std::shared_ptr statistics = Statistics::disabled(), - PartitionOwner partition_owner = round_robin + PartitionOwner partition_owner = round_robin, + std::unique_ptr mpe = nullptr ); /** @@ -121,6 +125,8 @@ class Shuffler { * @param br Buffer resource used to allocate temporary and the shuffle result. * @param statistics The statistics instance to use (disabled by default). * @param partition_owner Function to determine partition ownership. + * @param mpe Optional custom metadata payload exchange. If not provided, + * uses the default tag-based implementation. * * @note The caller promises that inserted buffers are stream-ordered with respect * to their own stream, and extracted buffers are likewise guaranteed to be stream- @@ -133,7 +139,8 @@ class Shuffler { PartID total_num_partitions, BufferResource* br, std::shared_ptr statistics = Statistics::disabled(), - PartitionOwner partition_owner = round_robin + PartitionOwner partition_owner = round_robin, + std::unique_ptr mpe = nullptr ) : Shuffler( comm, @@ -143,7 +150,8 @@ class Shuffler { br, nullptr, statistics, - partition_owner + partition_owner, + std::move(mpe) ) {} ~Shuffler(); @@ -348,6 +356,7 @@ class Shuffler { ///< ready to be extracted by the user. std::shared_ptr comm_; + std::unique_ptr mpe_; std::shared_ptr progress_thread_; ProgressThread::FunctionID progress_thread_function_id_; OpID const op_id_; diff --git a/cpp/src/communicator/metadata_payload_exchange/tag.cpp b/cpp/src/communicator/metadata_payload_exchange/tag.cpp index 354c56eed..23f04f79d 100644 --- a/cpp/src/communicator/metadata_payload_exchange/tag.cpp +++ b/cpp/src/communicator/metadata_payload_exchange/tag.cpp @@ -94,7 +94,9 @@ void TagMetadataPayloadExchange::send( } } - statistics_->add_duration_stat("comms-interface-send-messages", Clock::now() - t0); + statistics_->add_duration_stat( + "metadata-payload-exchange-send-messages", Clock::now() - t0 + ); } void TagMetadataPayloadExchange::progress() { @@ -111,7 +113,9 @@ void TagMetadataPayloadExchange::progress() { cleanup_completed_operations(); - statistics_->add_duration_stat("comms-interface-progress", Clock::now() - t0); + statistics_->add_duration_stat( + "metadata-payload-exchange-progress", Clock::now() - t0 + ); } std::vector> @@ -178,7 +182,9 @@ void TagMetadataPayloadExchange::receive_metadata() { ); } - statistics_->add_duration_stat("comms-interface-receive-metadata", Clock::now() - t0); + statistics_->add_duration_stat( + "metadata-payload-exchange-receive-metadata", Clock::now() - t0 + ); } std::vector> @@ -259,7 +265,7 @@ TagMetadataPayloadExchange::setup_data_receives() { } statistics_->add_duration_stat( - "comms-interface-setup-data-receives", Clock::now() - t0 + "metadata-payload-exchange-setup-data-receives", Clock::now() - t0 ); return completed_messages; @@ -320,7 +326,7 @@ TagMetadataPayloadExchange::complete_data_transfers() { } statistics_->add_duration_stat( - "comms-interface-complete-data-transfers", Clock::now() - t0 + "metadata-payload-exchange-complete-data-transfers", Clock::now() - t0 ); return completed_messages; diff --git a/cpp/src/shuffler/shuffler.cpp b/cpp/src/shuffler/shuffler.cpp index 00f911daf..28e4bee75 100644 --- a/cpp/src/shuffler/shuffler.cpp +++ b/cpp/src/shuffler/shuffler.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include #include #include @@ -174,239 +176,102 @@ class Shuffler::Progress { RAPIDSMPF_NVTX_SCOPED_RANGE("Shuffler.Progress", p_iters++); auto const t0_event_loop = Clock::now(); - // Tags for each stage of the shuffle - Tag const ready_for_data_tag{shuffler_.op_id_, 1}; - Tag const metadata_tag{shuffler_.op_id_, 2}; - Tag const gpu_data_tag{shuffler_.op_id_, 3}; - - auto& log = shuffler_.comm_->logger(); auto& stats = *shuffler_.statistics_; - // Check for new chunks in the inbox and send off their metadata. + // Submit outgoing chunks to the metadata payload exchange { - auto const t0_send_metadata = Clock::now(); + auto const t0_submit_outgoing = Clock::now(); auto ready_chunks = shuffler_.outgoing_postbox_.extract_all_ready(); - RAPIDSMPF_NVTX_SCOPED_RANGE("meta_send", ready_chunks.size()); - for (auto&& chunk : ready_chunks) { - // All messages in the chunk maps to the same key (checked by the PostBox) - // thus we can use the partition ID of the first message in the chunk to - // determine the source rank of all of them. - auto dst = shuffler_.partition_owner(shuffler_.comm_, chunk.part_id(0)); - log.trace("send metadata to ", dst, ": ", chunk); - RAPIDSMPF_EXPECTS( - dst != shuffler_.comm_->rank(), "sending chunk to ourselves" - ); - - fire_and_forget_.push_back( - shuffler_.comm_->send(chunk.serialize(), dst, metadata_tag) - ); - if (chunk.concat_data_size() > 0) { + RAPIDSMPF_NVTX_SCOPED_RANGE("submit_outgoing", ready_chunks.size()); + + if (!ready_chunks.empty()) { + // Define peer rank function for chunks + auto peer_rank_fn = [&shuffler = + shuffler_](detail::Chunk const& chunk) -> Rank { + auto dst = shuffler.partition_owner(shuffler.comm_, chunk.part_id(0)); + shuffler.comm_->logger().trace( + "submitting message to ", dst, ": ", chunk.str() + ); RAPIDSMPF_EXPECTS( - outgoing_chunks_.emplace(chunk.chunk_id(), std::move(chunk)) - .second, - "outgoing chunk already exist" + dst != shuffler.comm_->rank(), "sending message to ourselves" ); - ready_ack_receives_[dst].push_back( - shuffler_.comm_->recv_sync_host_data( - dst, - ready_for_data_tag, - std::make_unique>( - ReadyForDataMessage::byte_size - ) + return dst; + }; + + // Convert chunks to simple messages manually + std::vector< + std::unique_ptr> + messages; + messages.reserve(ready_chunks.size()); + + for (auto&& chunk : ready_chunks) { + auto dst = peer_rank_fn(chunk); + auto metadata = *chunk.serialize(); + auto data = chunk.release_data_buffer(); + + messages.push_back( + std::make_unique( + dst, std::move(metadata), std::move(data) ) ); } - } - stats.add_duration_stat( - "event-loop-metadata-send", Clock::now() - t0_send_metadata - ); - } - // Receive any incoming metadata of remote chunks and place them in - // `incoming_chunks_`. - { - auto const t0_metadata_recv = Clock::now(); - RAPIDSMPF_NVTX_SCOPED_RANGE("meta_recv"); - int i = 0; - while (true) { - auto const [msg, src] = shuffler_.comm_->recv_any(metadata_tag); - if (msg) { - auto chunk = Chunk::deserialize(*msg, false); - log.trace("recv_any from ", src, ": ", chunk); - // All messages in the chunk maps to the same Rank (checked by the - // PostBox) thus we can use the partition ID of the first message in - // the chunk to determine the source rank of all of them. - RAPIDSMPF_EXPECTS( - shuffler_.partition_owner(shuffler_.comm_, chunk.part_id(0)) - == shuffler_.comm_->rank(), - "receiving chunk not owned by us" - ); - incoming_chunks_.emplace(src, std::move(chunk)); - } else { - break; - } - i++; + shuffler_.mpe_->send(std::move(messages)); } stats.add_duration_stat( - "event-loop-metadata-recv", Clock::now() - t0_metadata_recv + "event-loop-submit-outgoing", Clock::now() - t0_submit_outgoing ); - RAPIDSMPF_NVTX_MARKER("meta_recv_iters", i); } - // Post receives for incoming chunks + // Process all communication operations and get completed chunks { - RAPIDSMPF_NVTX_SCOPED_RANGE("post_chunk_recv", incoming_chunks_.size()); - auto const t0_post_incoming_chunk_recv = Clock::now(); - for (auto it = incoming_chunks_.begin(); it != incoming_chunks_.end();) { - auto& [src, chunk] = *it; - log.trace("checking incoming chunk data from ", src, ": ", chunk); - - // If the chunk contains GPU data, we need to receive it. Otherwise, it - // goes directly to the ready postbox. - if (chunk.concat_data_size() > 0) { - if (!chunk.is_data_buffer_set()) { - // Create a new buffer and let the buffer resource decide the - // memory type. - chunk.set_data_buffer(allocate_buffer( - chunk.concat_data_size(), - shuffler_.br_->stream_pool().get_stream(), - shuffler_.br_ - )); - if (chunk.data_memory_type() == MemoryType::HOST) { - stats.add_bytes_stat( - "spill-bytes-recv-to-host", chunk.concat_data_size() - ); - } - } + auto const t0_process_comm = Clock::now(); + RAPIDSMPF_NVTX_SCOPED_RANGE("process_communication"); - // Check if the buffer is ready to be used - if (!chunk.is_ready()) { - // Buffer is not ready yet, skip to next item - ++it; - continue; - } + shuffler_.mpe_->progress(); + auto completed_messages = shuffler_.mpe_->recv(); - // At this point we know we can process this item, so extract it. - // Note: extract_item invalidates the iterator, so must increment - // here. - auto [src, chunk] = extract_item(incoming_chunks_, it++); - auto chunk_id = chunk.chunk_id(); - auto data_size = chunk.concat_data_size(); - - // Setup to receive the chunk into `in_transit_*`. - // transfer the data buffer from the chunk to the future - auto future = shuffler_.comm_->recv( - src, gpu_data_tag, chunk.release_data_buffer() - ); - RAPIDSMPF_EXPECTS( - in_transit_futures_.emplace(chunk_id, std::move(future)).second, - "in transit future already exist" - ); - RAPIDSMPF_EXPECTS( - in_transit_chunks_.emplace(chunk_id, std::move(chunk)).second, - "in transit chunk already exist" - ); - shuffler_.statistics_->add_bytes_stat( - "shuffle-payload-recv", data_size - ); - // Tell the source of the chunk that we are ready to receive it. - // All partition IDs in the chunk must map to the same key (rank). - fire_and_forget_.push_back(shuffler_.comm_->send( - ReadyForDataMessage{chunk_id}.pack(), src, ready_for_data_tag - )); - } else { // chunk contains control messages and/or metadata-only messages - // At this point we know we can process this item, so extract it. - // Note: extract_item invalidates the iterator, so must increment - // here. - auto [src, chunk] = extract_item(incoming_chunks_, it++); - - // iterate over all messages in the chunk - for (size_t i = 0; i < chunk.n_messages(); ++i) { - // ready postbox uniquely identifies chunks by their [partition - // ID, chunk ID] pair. We can reuse the same chunk ID for the - // copy because the partition IDs are unique within a chunk. - auto chunk_copy = - chunk.get_data(chunk.chunk_id(), i, shuffler_.br_); - shuffler_.insert_into_ready_postbox(std::move(chunk_copy)); - } - } - } + // Convert simple messages back to chunks manually + std::vector final_chunks; + final_chunks.reserve(completed_messages.size()); - stats.add_duration_stat( - "event-loop-post-incoming-chunk-recv", - Clock::now() - t0_post_incoming_chunk_recv - ); - } - - // Receive any incoming ready-for-data messages and start sending the - // requested data. - { - auto const t0_init_gpu_data_send = Clock::now(); - RAPIDSMPF_NVTX_SCOPED_RANGE( - "init_gpu_send", - std::transform_reduce( - ready_ack_receives_.begin(), - ready_ack_receives_.end(), - 0, - std::plus<>(), - [](auto& kv) { return kv.second.size(); } - ) - ); - // ready_ack_receives_ are separated by rank so that we - // can guarantee that we don't match messages out of order - // when using the UCXX communicator. See comment in - // ucxx.cpp::test_some. - for (auto& [dst, futures] : ready_ack_receives_) { - auto [finished, _] = shuffler_.comm_->test_some(futures); - for (auto&& future : finished) { - auto const msg_data = - shuffler_.comm_->release_sync_host_data(std::move(future)); - auto msg = ReadyForDataMessage::unpack(msg_data); - auto chunk = extract_value(outgoing_chunks_, msg.cid); - shuffler_.statistics_->add_bytes_stat( - "shuffle-payload-send", chunk.concat_data_size() - ); - fire_and_forget_.push_back(shuffler_.comm_->send( - chunk.release_data_buffer(), dst, gpu_data_tag - )); + for (auto&& message : completed_messages) { + auto chunk = detail::Chunk::deserialize(message->metadata(), false); + if (message->data() != nullptr) { + chunk.set_data_buffer(message->release_data()); } + final_chunks.push_back(std::move(chunk)); } - stats.add_duration_stat( - "event-loop-init-gpu-data-send", Clock::now() - t0_init_gpu_data_send - ); - } - // Check if any data in transit is finished. - { - auto const t0_check_future_finish = Clock::now(); - RAPIDSMPF_NVTX_SCOPED_RANGE("check_fut_finish", in_transit_futures_.size()); - if (!in_transit_futures_.empty()) { - std::vector finished = - shuffler_.comm_->test_some(in_transit_futures_); - for (auto cid : finished) { - auto chunk = extract_value(in_transit_chunks_, cid); - auto future = extract_value(in_transit_futures_, cid); - chunk.set_data_buffer( - shuffler_.comm_->release_data(std::move(future)) - ); + // Process completed chunks and insert them into the ready postbox + for (auto&& chunk : final_chunks) { + // Validate ownership + RAPIDSMPF_EXPECTS( + shuffler_.partition_owner(shuffler_.comm_, chunk.part_id(0)) + == shuffler_.comm_->rank(), + "receiving chunk not owned by us" + ); - for (size_t i = 0; i < chunk.n_messages(); ++i) { - // ready postbox uniquely identifies chunks by their [partition - // ID, chunk ID] pair. We can reuse the same chunk ID for the - // copy because the partition IDs are unique within a chunk. - shuffler_.insert_into_ready_postbox( - chunk.get_data(chunk.chunk_id(), i, shuffler_.br_) + if (chunk.concat_data_size() > 0) { + stats.add_bytes_stat( + "shuffle-payload-recv", chunk.concat_data_size() + ); + if (chunk.data_memory_type() == MemoryType::HOST) { + stats.add_bytes_stat( + "spill-bytes-recv-to-host", chunk.concat_data_size() ); } } - } - // Check if we can free some of the outstanding futures. - if (!fire_and_forget_.empty()) { - std::ignore = shuffler_.comm_->test_some(fire_and_forget_); + // Split multi-message chunks into individual chunks for the ready postbox + for (size_t i = 0; i < chunk.n_messages(); ++i) { + auto chunk_copy = chunk.get_data(chunk.chunk_id(), i, shuffler_.br_); + shuffler_.insert_into_ready_postbox(std::move(chunk_copy)); + } } + stats.add_duration_stat( - "event-loop-check-future-finish", Clock::now() - t0_check_future_finish + "event-loop-process-communication", Clock::now() - t0_process_comm ); } @@ -415,30 +280,13 @@ class Shuffler::Progress { // Return Done only if the shuffler is inactive (shutdown was called) _and_ // all containers are empty (all work is done). return (shuffler_.active_.load(std::memory_order_acquire) - || !( - fire_and_forget_.empty() && incoming_chunks_.empty() - && outgoing_chunks_.empty() && in_transit_chunks_.empty() - && in_transit_futures_.empty() && shuffler_.outgoing_postbox_.empty() - )) + || !shuffler_.mpe_->is_idle() || !shuffler_.outgoing_postbox_.empty()) ? ProgressThread::ProgressState::InProgress : ProgressThread::ProgressState::Done; } private: Shuffler& shuffler_; - std::vector> - fire_and_forget_; ///< Ongoing "fire-and-forget" operations (non-blocking sends). - std::multimap - incoming_chunks_; ///< Chunks ready to be received. - std::unordered_map - outgoing_chunks_; ///< Chunks ready to be sent. - std::unordered_map - in_transit_chunks_; ///< Chunks currently in transit. - std::unordered_map> - in_transit_futures_; ///< Futures corresponding to in-transit chunks. - std::unordered_map>> - ready_ack_receives_; ///< Receives matching ready for data messages. - int64_t p_iters = 0; ///< Number of progress iterations (for NVTX) }; @@ -464,7 +312,8 @@ Shuffler::Shuffler( BufferResource* br, FinishedCallback&& finished_callback, std::shared_ptr statistics, - PartitionOwner partition_owner_fn + PartitionOwner partition_owner_fn, + std::unique_ptr mpe ) : total_num_partitions{total_num_partitions}, partition_owner{std::move(partition_owner_fn)}, @@ -480,6 +329,19 @@ Shuffler::Shuffler( static_cast(total_num_partitions), }, comm_{std::move(comm)}, + mpe_{ + mpe ? std::move(mpe) + : std::make_unique( + comm_, + op_id, + [this](std::size_t size) -> std::unique_ptr { + return allocate_buffer( + size, br_->stream_pool().get_stream(), br_ + ); + }, + statistics + ) + }, progress_thread_{std::move(progress_thread)}, op_id_{op_id}, local_partitions_{local_partitions(comm_, total_num_partitions, partition_owner)}, diff --git a/python/rapidsmpf/rapidsmpf/tests/test_dask.py b/python/rapidsmpf/rapidsmpf/tests/test_dask.py index 3741edf0f..21127b1bd 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_dask.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_dask.py @@ -377,11 +377,13 @@ def test_gather_shuffle_statistics() -> None: stats = gather_shuffle_statistics(client) expected_stats = { - "event-loop-check-future-finish", - "event-loop-init-gpu-data-send", - "event-loop-metadata-recv", - "event-loop-metadata-send", - "event-loop-post-incoming-chunk-recv", + "metadata-payload-exchange-complete-data-transfers", + "metadata-payload-exchange-progress", + "metadata-payload-exchange-receive-metadata", + "metadata-payload-exchange-send-messages", + "metadata-payload-exchange-setup-data-receives", + "event-loop-process-communication", + "event-loop-submit-outgoing", "event-loop-total", "shuffle-payload-recv", "shuffle-payload-send", From 095c14e34a864933cd7d5a8364581a6090f24187 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 10 Nov 2025 14:03:30 -0800 Subject: [PATCH 2/3] Fix send of zero-sized payloads --- cpp/src/communicator/metadata_payload_exchange/tag.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/communicator/metadata_payload_exchange/tag.cpp b/cpp/src/communicator/metadata_payload_exchange/tag.cpp index 23f04f79d..ea9d26fff 100644 --- a/cpp/src/communicator/metadata_payload_exchange/tag.cpp +++ b/cpp/src/communicator/metadata_payload_exchange/tag.cpp @@ -87,7 +87,7 @@ void TagMetadataPayloadExchange::send( ); // Send data immediately after metadata (if any) - if (message->data() != nullptr) { + if (payload_size > 0) { fire_and_forget_.push_back( comm_->send(message->release_data(), dst, gpu_data_tag_) ); From 5cf22a18732849bf5fe8e4958d2ac0731c6d5a91 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 11 Nov 2025 01:49:28 -0800 Subject: [PATCH 3/3] Add a helper function to convert Chunks into Messages --- cpp/src/shuffler/shuffler.cpp | 55 ++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/cpp/src/shuffler/shuffler.cpp b/cpp/src/shuffler/shuffler.cpp index 28e4bee75..af17ec2b1 100644 --- a/cpp/src/shuffler/shuffler.cpp +++ b/cpp/src/shuffler/shuffler.cpp @@ -87,6 +87,41 @@ std::unique_ptr allocate_buffer( return ret; } +/** + * @brief Convert chunks into messages for communication. + * + * This function converts a vector of chunks into messages suitable for sending + * through the metadata payload exchange. Each chunk is serialized and its data + * buffer is released to create the message. + * + * @param chunks Vector of chunks to convert (will be moved from). + * @param peer_rank_fn Function to determine the destination rank for each chunk. + * + * @return A vector of message unique pointers ready to be sent. + */ +template +std::vector> +convert_chunks_to_messages( + std::vector&& chunks, PeerRankFn&& peer_rank_fn +) { + std::vector> messages; + messages.reserve(chunks.size()); + + for (auto&& chunk : chunks) { + auto dst = peer_rank_fn(chunk); + auto metadata = *chunk.serialize(); + auto data = chunk.release_data_buffer(); + + messages.push_back( + std::make_unique( + dst, std::move(metadata), std::move(data) + ) + ); + } + + return messages; +} + /** * @brief Spills memory buffers within a postbox, e.g., from device to host memory. * @@ -198,23 +233,9 @@ class Shuffler::Progress { return dst; }; - // Convert chunks to simple messages manually - std::vector< - std::unique_ptr> - messages; - messages.reserve(ready_chunks.size()); - - for (auto&& chunk : ready_chunks) { - auto dst = peer_rank_fn(chunk); - auto metadata = *chunk.serialize(); - auto data = chunk.release_data_buffer(); - - messages.push_back( - std::make_unique( - dst, std::move(metadata), std::move(data) - ) - ); - } + // Convert chunks to messages using the helper function + auto messages = + convert_chunks_to_messages(std::move(ready_chunks), peer_rank_fn); shuffler_.mpe_->send(std::move(messages)); }