From 5aa90e12abf1442880419fd6c9be950ecb2286ac Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 2 Feb 2026 04:27:19 -0800 Subject: [PATCH 1/4] Use `RAPIDSMPF_EXPECTED_FATAL` in communicator, raise for `nullptr` --- .../rapidsmpf/communicator/communicator.hpp | 8 ++++++++ cpp/src/communicator/mpi.cpp | 10 ++++++---- cpp/src/communicator/ucxx.cpp | 14 ++++++++++---- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/cpp/include/rapidsmpf/communicator/communicator.hpp b/cpp/include/rapidsmpf/communicator/communicator.hpp index a5b7123d5..86651eb4b 100644 --- a/cpp/include/rapidsmpf/communicator/communicator.hpp +++ b/cpp/include/rapidsmpf/communicator/communicator.hpp @@ -430,6 +430,8 @@ class Communicator { * @param rank The destination rank. * @param tag Message tag for identification. * @return A unique pointer to a `Future` representing the asynchronous operation. + * + * @throws std::invalid_argument If @p msg is `nullptr`. */ [[nodiscard]] virtual std::unique_ptr send( std::unique_ptr> msg, Rank rank, Tag tag @@ -443,6 +445,8 @@ class Communicator { * @param tag Message tag for identification. * @return A unique pointer to a `Future` representing the asynchronous operation. * + * @throws std::invalid_argument If @p msg is `nullptr`. + * * @warning The caller is responsible to ensure the underlying `Buffer` allocation * and data are already valid before calling, for example, when a CUDA allocation * and/or copy are done asynchronously. Specifically, the caller should ensure @@ -462,6 +466,8 @@ class Communicator { * @param recv_buffer The receive buffer. * @return A unique pointer to a `Future` representing the asynchronous operation. * + * @throws std::invalid_argument If @p recv_buffer is `nullptr`. + * * @warning The caller is responsible to ensure the underlying `Buffer` allocation * is already valid before calling, for example, when a CUDA allocation * and/or copy are done asynchronously. Specifically, the caller should ensure @@ -481,6 +487,8 @@ class Communicator { * @param tag Message tag for identification. * @param synced_buffer The receive buffer. * @return A unique pointer to a `Future` representing the asynchronous operation. + * + * @throws std::invalid_argument If @p synced_buffer is `nullptr`. */ [[nodiscard]] virtual std::unique_ptr recv_sync_host_data( Rank rank, Tag tag, std::unique_ptr> synced_buffer diff --git a/cpp/src/communicator/mpi.cpp b/cpp/src/communicator/mpi.cpp index 6213cf18a..7e7aac9b9 100644 --- a/cpp/src/communicator/mpi.cpp +++ b/cpp/src/communicator/mpi.cpp @@ -110,7 +110,8 @@ MPI::MPI(MPI_Comm comm, config::Options options) std::unique_ptr MPI::send( std::unique_ptr> msg, Rank rank, Tag tag ) { - RAPIDSMPF_EXPECTS( + RAPIDSMPF_EXPECTS(msg != nullptr, "msg cannot be null", std::invalid_argument); + RAPIDSMPF_EXPECTS_FATAL( msg->size() <= std::numeric_limits::max(), "send buffer size exceeds MPI max count" ); @@ -124,8 +125,9 @@ std::unique_ptr MPI::send( std::unique_ptr MPI::send( std::unique_ptr msg, Rank rank, Tag tag ) { - RAPIDSMPF_EXPECTS(msg->is_latest_write_done(), "msg must be ready"); - RAPIDSMPF_EXPECTS( + RAPIDSMPF_EXPECTS(msg != nullptr, "msg buffer cannot be null", std::invalid_argument); + RAPIDSMPF_EXPECTS_FATAL(msg->is_latest_write_done(), "msg must be ready"); + RAPIDSMPF_EXPECTS_FATAL( msg->size <= std::numeric_limits::max(), "send buffer size exceeds MPI max count" ); @@ -153,7 +155,7 @@ std::unique_ptr MPI::recv( RAPIDSMPF_EXPECTS( recv_buffer != nullptr, "recv buffer cannot be null", std::invalid_argument ); - RAPIDSMPF_EXPECTS(recv_buffer->is_latest_write_done(), "msg must be ready"); + RAPIDSMPF_EXPECTS_FATAL(recv_buffer->is_latest_write_done(), "msg must be ready"); MPI_Request req; mpi_recv_impl( rank, tag, recv_buffer->exclusive_data_access(), recv_buffer->size, comm_, &req diff --git a/cpp/src/communicator/ucxx.cpp b/cpp/src/communicator/ucxx.cpp index 043f830ac..d234fb0e6 100644 --- a/cpp/src/communicator/ucxx.cpp +++ b/cpp/src/communicator/ucxx.cpp @@ -1134,6 +1134,7 @@ std::shared_ptr<::ucxx::Endpoint> UCXX::get_endpoint(Rank rank) { std::unique_ptr UCXX::send( std::unique_ptr> msg, Rank rank, Tag tag ) { + RAPIDSMPF_EXPECTS(msg != nullptr, "msg cannot be null", std::invalid_argument); auto req = get_endpoint(rank)->tagSend( msg->data(), msg->size(), @@ -1145,7 +1146,8 @@ std::unique_ptr UCXX::send( std::unique_ptr UCXX::send( std::unique_ptr msg, Rank rank, Tag tag ) { - RAPIDSMPF_EXPECTS(msg->is_latest_write_done(), "msg must be ready"); + RAPIDSMPF_EXPECTS(msg != nullptr, "msg buffer cannot be null", std::invalid_argument); + RAPIDSMPF_EXPECTS_FATAL(msg->is_latest_write_done(), "msg must be ready"); auto req = get_endpoint(rank)->tagSend( msg->data(), msg->size, tag_with_rank(shared_resources_->rank(), tag) ); @@ -1155,8 +1157,10 @@ std::unique_ptr UCXX::send( std::unique_ptr UCXX::recv( Rank rank, Tag tag, std::unique_ptr recv_buffer ) { - RAPIDSMPF_EXPECTS(recv_buffer != nullptr, "recv buffer is nullptr"); - RAPIDSMPF_EXPECTS(recv_buffer->is_latest_write_done(), "msg must be ready"); + RAPIDSMPF_EXPECTS( + recv_buffer != nullptr, "recv buffer cannot be null", std::invalid_argument + ); + RAPIDSMPF_EXPECTS_FATAL(recv_buffer->is_latest_write_done(), "msg must be ready"); auto req = get_endpoint(rank)->tagRecv( recv_buffer->exclusive_data_access(), recv_buffer->size, @@ -1169,7 +1173,9 @@ std::unique_ptr UCXX::recv( std::unique_ptr UCXX::recv_sync_host_data( Rank rank, Tag tag, std::unique_ptr> synced_buffer ) { - RAPIDSMPF_EXPECTS(synced_buffer != nullptr, "recv host buffer is nullptr"); + RAPIDSMPF_EXPECTS( + synced_buffer != nullptr, "recv host buffer cannot be null", std::invalid_argument + ); auto req = get_endpoint(rank)->tagRecv( synced_buffer->data(), synced_buffer->size(), From 20b8ead6764073d571cb5accb516193b92609337 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 2 Feb 2026 04:36:46 -0800 Subject: [PATCH 2/4] Use `RAPIDSMPF_EXPECTED_FATAL` on unsuitable memory conversion --- cpp/include/rapidsmpf/memory/buffer.hpp | 10 ++++++++-- cpp/include/rapidsmpf/memory/host_buffer.hpp | 7 +++++-- cpp/src/memory/buffer.cpp | 14 +++++--------- cpp/src/memory/host_buffer.cpp | 9 +++------ 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/cpp/include/rapidsmpf/memory/buffer.hpp b/cpp/include/rapidsmpf/memory/buffer.hpp index 70fd8cb46..7fab5aecf 100644 --- a/cpp/include/rapidsmpf/memory/buffer.hpp +++ b/cpp/include/rapidsmpf/memory/buffer.hpp @@ -281,8 +281,11 @@ class Buffer { * @param mem_type The memory type of the underlying @p host_buffer. * * @throws std::invalid_argument If @p host_buffer is null. - * @throws std::invalid_argument If @p mem_type is not suitable for host buffers. * @throws std::logic_error If the buffer is locked. + * + * @warning The caller is responsible to ensure @p mem_type is suitable for + * @p host_buffer, if not, a warning is printed and the application will + * terminate. */ Buffer( std::unique_ptr host_buffer, @@ -306,8 +309,11 @@ class Buffer { * @param mem_type The memory type of the underlying @p device_buffer. * * @throws std::invalid_argument If @p device_buffer is null. - * @throws std::invalid_argument If @p mem_type is not suitable for device buffers. * @throws std::logic_error If the buffer is locked. + * + * @warning The caller is responsible to ensure @p mem_type is suitable for + * @p device_buffer, if not, a warning is printed and the application will + * terminate. */ Buffer(std::unique_ptr device_buffer, MemoryType mem_type); diff --git a/cpp/include/rapidsmpf/memory/host_buffer.hpp b/cpp/include/rapidsmpf/memory/host_buffer.hpp index 3ad35d4a4..c3813a77d 100644 --- a/cpp/include/rapidsmpf/memory/host_buffer.hpp +++ b/cpp/include/rapidsmpf/memory/host_buffer.hpp @@ -203,8 +203,11 @@ class HostBuffer { * * @return A new `HostBuffer` owning the device buffer's memory. * - * @throws std::invalid_argument if `pinned_host_buffer` is null or if the memory type - * of the buffer is not pinned host. + * @throws std::invalid_argument if `pinned_host_buffer` is null. + * + * @warning The caller is responsible to ensure @p pinned_host_buffer is of pinned + * host memory type, if not, a warning is printed and the application will + * terminate. */ static HostBuffer from_rmm_device_buffer( std::unique_ptr pinned_host_buffer, diff --git a/cpp/src/memory/buffer.cpp b/cpp/src/memory/buffer.cpp index 385685903..4df61f729 100644 --- a/cpp/src/memory/buffer.cpp +++ b/cpp/src/memory/buffer.cpp @@ -30,10 +30,9 @@ Buffer::Buffer( RAPIDSMPF_EXPECTS( std::get(storage_) != nullptr, "the host_buffer cannot be NULL" ); - RAPIDSMPF_EXPECTS( + RAPIDSMPF_EXPECTS_FATAL( contains(host_buffer_types, mem_type_), - "memory type is not suitable for a host buffer", - std::invalid_argument + "memory type is not suitable for a host buffer" ); } @@ -42,14 +41,11 @@ Buffer::Buffer(std::unique_ptr device_buffer, MemoryType mem mem_type_{mem_type}, storage_{std::move(device_buffer)} { RAPIDSMPF_EXPECTS( - std::get(storage_) != nullptr, - "the device buffer cannot be NULL", - std::invalid_argument + std::get(storage_) != nullptr, "the device buffer cannot be NULL" ); - RAPIDSMPF_EXPECTS( + RAPIDSMPF_EXPECTS_FATAL( contains(device_buffer_types, mem_type_), - "memory type is not suitable for a device buffer", - std::invalid_argument + "memory type is not suitable for a device buffer" ); stream_ = std::get(storage_)->stream(); latest_write_event_.record(stream_); diff --git a/cpp/src/memory/host_buffer.cpp b/cpp/src/memory/host_buffer.cpp index 4c2594257..c5035a68f 100644 --- a/cpp/src/memory/host_buffer.cpp +++ b/cpp/src/memory/host_buffer.cpp @@ -144,15 +144,12 @@ HostBuffer HostBuffer::from_rmm_device_buffer( PinnedMemoryResource& mr ) { RAPIDSMPF_EXPECTS( - pinned_host_buffer != nullptr, - "pinned_host_buffer must not be null", - std::invalid_argument + pinned_host_buffer != nullptr, "pinned_host_buffer must not be null" ); - RAPIDSMPF_EXPECTS( + RAPIDSMPF_EXPECTS_FATAL( cuda::is_host_accessible(pinned_host_buffer->data()), - "pinned_host_buffer must be host accessible", - std::invalid_argument + "pinned_host_buffer must be host accessible" ); // Wrap in shared_ptr so the lambda is copyable (required by std::function). From 3c9803ba4e225474d64331ea51020ea7b86b344e Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 2 Feb 2026 04:45:40 -0800 Subject: [PATCH 3/4] Use `RAPIDSMPF_EXPECTS_FATAL` for `Chunk` --- cpp/include/rapidsmpf/coll/allgather.hpp | 6 ++++++ cpp/src/coll/allgather.cpp | 10 +++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/cpp/include/rapidsmpf/coll/allgather.hpp b/cpp/include/rapidsmpf/coll/allgather.hpp index 4f3eebdb0..8308b77e0 100644 --- a/cpp/include/rapidsmpf/coll/allgather.hpp +++ b/cpp/include/rapidsmpf/coll/allgather.hpp @@ -63,6 +63,12 @@ class Chunk { * @param id Unique chunk identifier. * @param metadata Serialized metadata for the chunk. * @param data Data buffer containing the chunk's payload. + * + * @throw std::invalid_argument If both @p metadata and @p data are null. + * + * @warning The caller is responsible to ensure both @p metadata and @p data are + * non-null, but if one of them is null a warning is printed and the application + * will terminate. */ Chunk( ChunkID id, diff --git a/cpp/src/coll/allgather.cpp b/cpp/src/coll/allgather.cpp index 606a6f7c1..11a24a5f0 100644 --- a/cpp/src/coll/allgather.cpp +++ b/cpp/src/coll/allgather.cpp @@ -31,7 +31,15 @@ Chunk::Chunk( metadata_{std::move(metadata)}, data_{std::move(data)}, data_size_{data_ ? data_->size : 0} { - RAPIDSMPF_EXPECTS(metadata_ && data_, "Non-finish chunk must have metadata and data"); + RAPIDSMPF_EXPECTS( + metadata_ && data_, + "Non-finish chunk must have metadata and data", + std::invalid_argument + ); + RAPIDSMPF_EXPECTS_FATAL( + (metadata_ != nullptr) == (data_ != nullptr), + "Non-finish chunk must have both metadata and data, but one of them is null" + ); } Chunk::Chunk(ChunkID id) : id_{id}, metadata_{nullptr}, data_{nullptr}, data_size_{0} {} From 1e75c3e4f03af7e9a6ce812bd43d8358ea1abdec Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 4 Feb 2026 04:34:44 -0800 Subject: [PATCH 4/4] Remove fatal --- cpp/include/rapidsmpf/coll/allgather.hpp | 6 +----- .../rapidsmpf/communicator/communicator.hpp | 12 ++++++------ cpp/include/rapidsmpf/communicator/mpi.hpp | 8 ++++++++ cpp/include/rapidsmpf/memory/buffer.hpp | 14 ++++---------- cpp/include/rapidsmpf/memory/host_buffer.hpp | 7 ++----- cpp/src/coll/allgather.cpp | 4 ---- cpp/src/communicator/mpi.cpp | 8 ++++---- cpp/src/communicator/ucxx.cpp | 4 ++-- cpp/src/memory/buffer.cpp | 18 ++++++++++++------ cpp/src/memory/host_buffer.cpp | 9 ++++++--- 10 files changed, 45 insertions(+), 45 deletions(-) diff --git a/cpp/include/rapidsmpf/coll/allgather.hpp b/cpp/include/rapidsmpf/coll/allgather.hpp index 8308b77e0..6b5b8e5a8 100644 --- a/cpp/include/rapidsmpf/coll/allgather.hpp +++ b/cpp/include/rapidsmpf/coll/allgather.hpp @@ -64,11 +64,7 @@ class Chunk { * @param metadata Serialized metadata for the chunk. * @param data Data buffer containing the chunk's payload. * - * @throw std::invalid_argument If both @p metadata and @p data are null. - * - * @warning The caller is responsible to ensure both @p metadata and @p data are - * non-null, but if one of them is null a warning is printed and the application - * will terminate. + * @throw std::invalid_argument If either @p metadata and @p data are null. */ Chunk( ChunkID id, diff --git a/cpp/include/rapidsmpf/communicator/communicator.hpp b/cpp/include/rapidsmpf/communicator/communicator.hpp index 86651eb4b..94fdc5a4d 100644 --- a/cpp/include/rapidsmpf/communicator/communicator.hpp +++ b/cpp/include/rapidsmpf/communicator/communicator.hpp @@ -445,13 +445,13 @@ class Communicator { * @param tag Message tag for identification. * @return A unique pointer to a `Future` representing the asynchronous operation. * - * @throws std::invalid_argument If @p msg is `nullptr`. + * @throws std::invalid_argument If @p msg is `nullptr` or it is not ready (see + * warning for more details). * * @warning The caller is responsible to ensure the underlying `Buffer` allocation * and data are already valid before calling, for example, when a CUDA allocation * and/or copy are done asynchronously. Specifically, the caller should ensure - * `Buffer::is_ready()` returns true before calling this function, if not, a - * warning is printed and the application will terminate. + * `Buffer::is_ready()` returns true before calling this function. */ [[nodiscard]] virtual std::unique_ptr send( std::unique_ptr msg, Rank rank, Tag tag @@ -466,13 +466,13 @@ class Communicator { * @param recv_buffer The receive buffer. * @return A unique pointer to a `Future` representing the asynchronous operation. * - * @throws std::invalid_argument If @p recv_buffer is `nullptr`. + * @throws std::invalid_argument If @p recv_buffer is `nullptr` or it is not ready + * (see warning for more details). * * @warning The caller is responsible to ensure the underlying `Buffer` allocation * is already valid before calling, for example, when a CUDA allocation * and/or copy are done asynchronously. Specifically, the caller should ensure - * `Buffer::is_ready()` returns true before calling this function, if not, a - * warning is printed and the application will terminate. + * `Buffer::is_ready()` returns true before calling this function. */ [[nodiscard]] virtual std::unique_ptr recv( Rank rank, Tag tag, std::unique_ptr recv_buffer diff --git a/cpp/include/rapidsmpf/communicator/mpi.hpp b/cpp/include/rapidsmpf/communicator/mpi.hpp index 2eeb8e7e1..bcd995a0a 100644 --- a/cpp/include/rapidsmpf/communicator/mpi.hpp +++ b/cpp/include/rapidsmpf/communicator/mpi.hpp @@ -139,6 +139,8 @@ class MPI final : public Communicator { /** * @copydoc Communicator::send + * + * @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes). */ [[nodiscard]] std::unique_ptr send( std::unique_ptr> msg, Rank rank, Tag tag @@ -147,6 +149,8 @@ class MPI final : public Communicator { // clang-format off /** * @copydoc Communicator::send(std::unique_ptr msg, Rank rank, Tag tag) + * + * @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes). */ // clang-format on [[nodiscard]] std::unique_ptr send( @@ -155,6 +159,8 @@ class MPI final : public Communicator { /** * @copydoc Communicator::recv + * + * @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes). */ [[nodiscard]] std::unique_ptr recv( Rank rank, Tag tag, std::unique_ptr recv_buffer @@ -163,6 +169,8 @@ class MPI final : public Communicator { // clang-format off /** * @copydoc Communicator::recv_sync_host_data(Rank rank, Tag tag, std::unique_ptr> synced_buffer) + * + * @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes). */ // clang-format on [[nodiscard]] std::unique_ptr recv_sync_host_data( diff --git a/cpp/include/rapidsmpf/memory/buffer.hpp b/cpp/include/rapidsmpf/memory/buffer.hpp index 7fab5aecf..b61fcca80 100644 --- a/cpp/include/rapidsmpf/memory/buffer.hpp +++ b/cpp/include/rapidsmpf/memory/buffer.hpp @@ -280,12 +280,9 @@ class Buffer { * @param stream CUDA stream to associate with the Buffer for future operations. * @param mem_type The memory type of the underlying @p host_buffer. * - * @throws std::invalid_argument If @p host_buffer is null. + * @throws std::invalid_argument If @p host_buffer is null or @p mem_type is not + * suitable for @p host_buffer. * @throws std::logic_error If the buffer is locked. - * - * @warning The caller is responsible to ensure @p mem_type is suitable for - * @p host_buffer, if not, a warning is printed and the application will - * terminate. */ Buffer( std::unique_ptr host_buffer, @@ -308,12 +305,9 @@ class Buffer { * @param device_buffer Unique pointer to a device buffer. Must be non-null. * @param mem_type The memory type of the underlying @p device_buffer. * - * @throws std::invalid_argument If @p device_buffer is null. + * @throws std::invalid_argument If @p device_buffer is null or @p mem_type is not + * suitable for @p device_buffer. * @throws std::logic_error If the buffer is locked. - * - * @warning The caller is responsible to ensure @p mem_type is suitable for - * @p device_buffer, if not, a warning is printed and the application will - * terminate. */ Buffer(std::unique_ptr device_buffer, MemoryType mem_type); diff --git a/cpp/include/rapidsmpf/memory/host_buffer.hpp b/cpp/include/rapidsmpf/memory/host_buffer.hpp index c3813a77d..27c25d55d 100644 --- a/cpp/include/rapidsmpf/memory/host_buffer.hpp +++ b/cpp/include/rapidsmpf/memory/host_buffer.hpp @@ -203,11 +203,8 @@ class HostBuffer { * * @return A new `HostBuffer` owning the device buffer's memory. * - * @throws std::invalid_argument if `pinned_host_buffer` is null. - * - * @warning The caller is responsible to ensure @p pinned_host_buffer is of pinned - * host memory type, if not, a warning is printed and the application will - * terminate. + * @throws std::invalid_argument if @p pinned_host_buffer is null or if it is not + * of pinned host memory type. */ static HostBuffer from_rmm_device_buffer( std::unique_ptr pinned_host_buffer, diff --git a/cpp/src/coll/allgather.cpp b/cpp/src/coll/allgather.cpp index 11a24a5f0..02a61f7e8 100644 --- a/cpp/src/coll/allgather.cpp +++ b/cpp/src/coll/allgather.cpp @@ -36,10 +36,6 @@ Chunk::Chunk( "Non-finish chunk must have metadata and data", std::invalid_argument ); - RAPIDSMPF_EXPECTS_FATAL( - (metadata_ != nullptr) == (data_ != nullptr), - "Non-finish chunk must have both metadata and data, but one of them is null" - ); } Chunk::Chunk(ChunkID id) : id_{id}, metadata_{nullptr}, data_{nullptr}, data_size_{0} {} diff --git a/cpp/src/communicator/mpi.cpp b/cpp/src/communicator/mpi.cpp index 7e7aac9b9..27f1acefa 100644 --- a/cpp/src/communicator/mpi.cpp +++ b/cpp/src/communicator/mpi.cpp @@ -111,7 +111,7 @@ std::unique_ptr MPI::send( std::unique_ptr> msg, Rank rank, Tag tag ) { RAPIDSMPF_EXPECTS(msg != nullptr, "msg cannot be null", std::invalid_argument); - RAPIDSMPF_EXPECTS_FATAL( + RAPIDSMPF_EXPECTS( msg->size() <= std::numeric_limits::max(), "send buffer size exceeds MPI max count" ); @@ -126,8 +126,8 @@ std::unique_ptr MPI::send( std::unique_ptr msg, Rank rank, Tag tag ) { RAPIDSMPF_EXPECTS(msg != nullptr, "msg buffer cannot be null", std::invalid_argument); - RAPIDSMPF_EXPECTS_FATAL(msg->is_latest_write_done(), "msg must be ready"); - RAPIDSMPF_EXPECTS_FATAL( + RAPIDSMPF_EXPECTS(msg->is_latest_write_done(), "msg must be ready"); + RAPIDSMPF_EXPECTS( msg->size <= std::numeric_limits::max(), "send buffer size exceeds MPI max count" ); @@ -155,7 +155,7 @@ std::unique_ptr MPI::recv( RAPIDSMPF_EXPECTS( recv_buffer != nullptr, "recv buffer cannot be null", std::invalid_argument ); - RAPIDSMPF_EXPECTS_FATAL(recv_buffer->is_latest_write_done(), "msg must be ready"); + RAPIDSMPF_EXPECTS(recv_buffer->is_latest_write_done(), "msg must be ready"); MPI_Request req; mpi_recv_impl( rank, tag, recv_buffer->exclusive_data_access(), recv_buffer->size, comm_, &req diff --git a/cpp/src/communicator/ucxx.cpp b/cpp/src/communicator/ucxx.cpp index d234fb0e6..ba1f136b9 100644 --- a/cpp/src/communicator/ucxx.cpp +++ b/cpp/src/communicator/ucxx.cpp @@ -1147,7 +1147,7 @@ std::unique_ptr UCXX::send( std::unique_ptr msg, Rank rank, Tag tag ) { RAPIDSMPF_EXPECTS(msg != nullptr, "msg buffer cannot be null", std::invalid_argument); - RAPIDSMPF_EXPECTS_FATAL(msg->is_latest_write_done(), "msg must be ready"); + RAPIDSMPF_EXPECTS(msg->is_latest_write_done(), "msg must be ready"); auto req = get_endpoint(rank)->tagSend( msg->data(), msg->size, tag_with_rank(shared_resources_->rank(), tag) ); @@ -1160,7 +1160,7 @@ std::unique_ptr UCXX::recv( RAPIDSMPF_EXPECTS( recv_buffer != nullptr, "recv buffer cannot be null", std::invalid_argument ); - RAPIDSMPF_EXPECTS_FATAL(recv_buffer->is_latest_write_done(), "msg must be ready"); + RAPIDSMPF_EXPECTS(recv_buffer->is_latest_write_done(), "msg must be ready"); auto req = get_endpoint(rank)->tagRecv( recv_buffer->exclusive_data_access(), recv_buffer->size, diff --git a/cpp/src/memory/buffer.cpp b/cpp/src/memory/buffer.cpp index 4df61f729..c8425ccde 100644 --- a/cpp/src/memory/buffer.cpp +++ b/cpp/src/memory/buffer.cpp @@ -28,11 +28,14 @@ Buffer::Buffer( storage_{std::move(host_buffer)}, stream_{stream} { RAPIDSMPF_EXPECTS( - std::get(storage_) != nullptr, "the host_buffer cannot be NULL" + std::get(storage_) != nullptr, + "the host_buffer cannot be NULL", + std::invalid_argument ); - RAPIDSMPF_EXPECTS_FATAL( + RAPIDSMPF_EXPECTS( contains(host_buffer_types, mem_type_), - "memory type is not suitable for a host buffer" + "memory type is not suitable for a host buffer", + std::invalid_argument ); } @@ -41,11 +44,14 @@ Buffer::Buffer(std::unique_ptr device_buffer, MemoryType mem mem_type_{mem_type}, storage_{std::move(device_buffer)} { RAPIDSMPF_EXPECTS( - std::get(storage_) != nullptr, "the device buffer cannot be NULL" + std::get(storage_) != nullptr, + "the device buffer cannot be NULL", + std::invalid_argument ); - RAPIDSMPF_EXPECTS_FATAL( + RAPIDSMPF_EXPECTS( contains(device_buffer_types, mem_type_), - "memory type is not suitable for a device buffer" + "memory type is not suitable for a device buffer", + std::invalid_argument ); stream_ = std::get(storage_)->stream(); latest_write_event_.record(stream_); diff --git a/cpp/src/memory/host_buffer.cpp b/cpp/src/memory/host_buffer.cpp index c5035a68f..4c2594257 100644 --- a/cpp/src/memory/host_buffer.cpp +++ b/cpp/src/memory/host_buffer.cpp @@ -144,12 +144,15 @@ HostBuffer HostBuffer::from_rmm_device_buffer( PinnedMemoryResource& mr ) { RAPIDSMPF_EXPECTS( - pinned_host_buffer != nullptr, "pinned_host_buffer must not be null" + pinned_host_buffer != nullptr, + "pinned_host_buffer must not be null", + std::invalid_argument ); - RAPIDSMPF_EXPECTS_FATAL( + RAPIDSMPF_EXPECTS( cuda::is_host_accessible(pinned_host_buffer->data()), - "pinned_host_buffer must be host accessible" + "pinned_host_buffer must be host accessible", + std::invalid_argument ); // Wrap in shared_ptr so the lambda is copyable (required by std::function).