diff --git a/cpp/include/rapidsmpf/coll/allgather.hpp b/cpp/include/rapidsmpf/coll/allgather.hpp index 4f3eebdb0..6b5b8e5a8 100644 --- a/cpp/include/rapidsmpf/coll/allgather.hpp +++ b/cpp/include/rapidsmpf/coll/allgather.hpp @@ -63,6 +63,8 @@ class Chunk { * @param id Unique chunk identifier. * @param metadata Serialized metadata for the chunk. * @param data Data buffer containing the chunk's payload. + * + * @throw std::invalid_argument If either @p metadata and @p data are null. */ Chunk( ChunkID id, diff --git a/cpp/include/rapidsmpf/communicator/communicator.hpp b/cpp/include/rapidsmpf/communicator/communicator.hpp index a5b7123d5..94fdc5a4d 100644 --- a/cpp/include/rapidsmpf/communicator/communicator.hpp +++ b/cpp/include/rapidsmpf/communicator/communicator.hpp @@ -430,6 +430,8 @@ class Communicator { * @param rank The destination rank. * @param tag Message tag for identification. * @return A unique pointer to a `Future` representing the asynchronous operation. + * + * @throws std::invalid_argument If @p msg is `nullptr`. */ [[nodiscard]] virtual std::unique_ptr send( std::unique_ptr> msg, Rank rank, Tag tag @@ -443,11 +445,13 @@ class Communicator { * @param tag Message tag for identification. * @return A unique pointer to a `Future` representing the asynchronous operation. * + * @throws std::invalid_argument If @p msg is `nullptr` or it is not ready (see + * warning for more details). + * * @warning The caller is responsible to ensure the underlying `Buffer` allocation * and data are already valid before calling, for example, when a CUDA allocation * and/or copy are done asynchronously. Specifically, the caller should ensure - * `Buffer::is_ready()` returns true before calling this function, if not, a - * warning is printed and the application will terminate. + * `Buffer::is_ready()` returns true before calling this function. */ [[nodiscard]] virtual std::unique_ptr send( std::unique_ptr msg, Rank rank, Tag tag @@ -462,11 +466,13 @@ class Communicator { * @param recv_buffer The receive buffer. * @return A unique pointer to a `Future` representing the asynchronous operation. * + * @throws std::invalid_argument If @p recv_buffer is `nullptr` or it is not ready + * (see warning for more details). + * * @warning The caller is responsible to ensure the underlying `Buffer` allocation * is already valid before calling, for example, when a CUDA allocation * and/or copy are done asynchronously. Specifically, the caller should ensure - * `Buffer::is_ready()` returns true before calling this function, if not, a - * warning is printed and the application will terminate. + * `Buffer::is_ready()` returns true before calling this function. */ [[nodiscard]] virtual std::unique_ptr recv( Rank rank, Tag tag, std::unique_ptr recv_buffer @@ -481,6 +487,8 @@ class Communicator { * @param tag Message tag for identification. * @param synced_buffer The receive buffer. * @return A unique pointer to a `Future` representing the asynchronous operation. + * + * @throws std::invalid_argument If @p synced_buffer is `nullptr`. */ [[nodiscard]] virtual std::unique_ptr recv_sync_host_data( Rank rank, Tag tag, std::unique_ptr> synced_buffer diff --git a/cpp/include/rapidsmpf/communicator/mpi.hpp b/cpp/include/rapidsmpf/communicator/mpi.hpp index 2eeb8e7e1..bcd995a0a 100644 --- a/cpp/include/rapidsmpf/communicator/mpi.hpp +++ b/cpp/include/rapidsmpf/communicator/mpi.hpp @@ -139,6 +139,8 @@ class MPI final : public Communicator { /** * @copydoc Communicator::send + * + * @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes). */ [[nodiscard]] std::unique_ptr send( std::unique_ptr> msg, Rank rank, Tag tag @@ -147,6 +149,8 @@ class MPI final : public Communicator { // clang-format off /** * @copydoc Communicator::send(std::unique_ptr msg, Rank rank, Tag tag) + * + * @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes). */ // clang-format on [[nodiscard]] std::unique_ptr send( @@ -155,6 +159,8 @@ class MPI final : public Communicator { /** * @copydoc Communicator::recv + * + * @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes). */ [[nodiscard]] std::unique_ptr recv( Rank rank, Tag tag, std::unique_ptr recv_buffer @@ -163,6 +169,8 @@ class MPI final : public Communicator { // clang-format off /** * @copydoc Communicator::recv_sync_host_data(Rank rank, Tag tag, std::unique_ptr> synced_buffer) + * + * @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes). */ // clang-format on [[nodiscard]] std::unique_ptr recv_sync_host_data( diff --git a/cpp/include/rapidsmpf/memory/buffer.hpp b/cpp/include/rapidsmpf/memory/buffer.hpp index 70fd8cb46..b61fcca80 100644 --- a/cpp/include/rapidsmpf/memory/buffer.hpp +++ b/cpp/include/rapidsmpf/memory/buffer.hpp @@ -280,8 +280,8 @@ class Buffer { * @param stream CUDA stream to associate with the Buffer for future operations. * @param mem_type The memory type of the underlying @p host_buffer. * - * @throws std::invalid_argument If @p host_buffer is null. - * @throws std::invalid_argument If @p mem_type is not suitable for host buffers. + * @throws std::invalid_argument If @p host_buffer is null or @p mem_type is not + * suitable for @p host_buffer. * @throws std::logic_error If the buffer is locked. */ Buffer( @@ -305,8 +305,8 @@ class Buffer { * @param device_buffer Unique pointer to a device buffer. Must be non-null. * @param mem_type The memory type of the underlying @p device_buffer. * - * @throws std::invalid_argument If @p device_buffer is null. - * @throws std::invalid_argument If @p mem_type is not suitable for device buffers. + * @throws std::invalid_argument If @p device_buffer is null or @p mem_type is not + * suitable for @p device_buffer. * @throws std::logic_error If the buffer is locked. */ Buffer(std::unique_ptr device_buffer, MemoryType mem_type); diff --git a/cpp/include/rapidsmpf/memory/host_buffer.hpp b/cpp/include/rapidsmpf/memory/host_buffer.hpp index 3ad35d4a4..27c25d55d 100644 --- a/cpp/include/rapidsmpf/memory/host_buffer.hpp +++ b/cpp/include/rapidsmpf/memory/host_buffer.hpp @@ -203,8 +203,8 @@ class HostBuffer { * * @return A new `HostBuffer` owning the device buffer's memory. * - * @throws std::invalid_argument if `pinned_host_buffer` is null or if the memory type - * of the buffer is not pinned host. + * @throws std::invalid_argument if @p pinned_host_buffer is null or if it is not + * of pinned host memory type. */ static HostBuffer from_rmm_device_buffer( std::unique_ptr pinned_host_buffer, diff --git a/cpp/src/coll/allgather.cpp b/cpp/src/coll/allgather.cpp index 606a6f7c1..02a61f7e8 100644 --- a/cpp/src/coll/allgather.cpp +++ b/cpp/src/coll/allgather.cpp @@ -31,7 +31,11 @@ Chunk::Chunk( metadata_{std::move(metadata)}, data_{std::move(data)}, data_size_{data_ ? data_->size : 0} { - RAPIDSMPF_EXPECTS(metadata_ && data_, "Non-finish chunk must have metadata and data"); + RAPIDSMPF_EXPECTS( + metadata_ && data_, + "Non-finish chunk must have metadata and data", + std::invalid_argument + ); } Chunk::Chunk(ChunkID id) : id_{id}, metadata_{nullptr}, data_{nullptr}, data_size_{0} {} diff --git a/cpp/src/communicator/mpi.cpp b/cpp/src/communicator/mpi.cpp index 6213cf18a..27f1acefa 100644 --- a/cpp/src/communicator/mpi.cpp +++ b/cpp/src/communicator/mpi.cpp @@ -110,6 +110,7 @@ MPI::MPI(MPI_Comm comm, config::Options options) std::unique_ptr MPI::send( std::unique_ptr> msg, Rank rank, Tag tag ) { + RAPIDSMPF_EXPECTS(msg != nullptr, "msg cannot be null", std::invalid_argument); RAPIDSMPF_EXPECTS( msg->size() <= std::numeric_limits::max(), "send buffer size exceeds MPI max count" @@ -124,6 +125,7 @@ std::unique_ptr MPI::send( std::unique_ptr MPI::send( std::unique_ptr msg, Rank rank, Tag tag ) { + RAPIDSMPF_EXPECTS(msg != nullptr, "msg buffer cannot be null", std::invalid_argument); RAPIDSMPF_EXPECTS(msg->is_latest_write_done(), "msg must be ready"); RAPIDSMPF_EXPECTS( msg->size <= std::numeric_limits::max(), diff --git a/cpp/src/communicator/ucxx.cpp b/cpp/src/communicator/ucxx.cpp index 043f830ac..ba1f136b9 100644 --- a/cpp/src/communicator/ucxx.cpp +++ b/cpp/src/communicator/ucxx.cpp @@ -1134,6 +1134,7 @@ std::shared_ptr<::ucxx::Endpoint> UCXX::get_endpoint(Rank rank) { std::unique_ptr UCXX::send( std::unique_ptr> msg, Rank rank, Tag tag ) { + RAPIDSMPF_EXPECTS(msg != nullptr, "msg cannot be null", std::invalid_argument); auto req = get_endpoint(rank)->tagSend( msg->data(), msg->size(), @@ -1145,6 +1146,7 @@ std::unique_ptr UCXX::send( std::unique_ptr UCXX::send( std::unique_ptr msg, Rank rank, Tag tag ) { + RAPIDSMPF_EXPECTS(msg != nullptr, "msg buffer cannot be null", std::invalid_argument); RAPIDSMPF_EXPECTS(msg->is_latest_write_done(), "msg must be ready"); auto req = get_endpoint(rank)->tagSend( msg->data(), msg->size, tag_with_rank(shared_resources_->rank(), tag) @@ -1155,7 +1157,9 @@ std::unique_ptr UCXX::send( std::unique_ptr UCXX::recv( Rank rank, Tag tag, std::unique_ptr recv_buffer ) { - RAPIDSMPF_EXPECTS(recv_buffer != nullptr, "recv buffer is nullptr"); + RAPIDSMPF_EXPECTS( + recv_buffer != nullptr, "recv buffer cannot be null", std::invalid_argument + ); RAPIDSMPF_EXPECTS(recv_buffer->is_latest_write_done(), "msg must be ready"); auto req = get_endpoint(rank)->tagRecv( recv_buffer->exclusive_data_access(), @@ -1169,7 +1173,9 @@ std::unique_ptr UCXX::recv( std::unique_ptr UCXX::recv_sync_host_data( Rank rank, Tag tag, std::unique_ptr> synced_buffer ) { - RAPIDSMPF_EXPECTS(synced_buffer != nullptr, "recv host buffer is nullptr"); + RAPIDSMPF_EXPECTS( + synced_buffer != nullptr, "recv host buffer cannot be null", std::invalid_argument + ); auto req = get_endpoint(rank)->tagRecv( synced_buffer->data(), synced_buffer->size(), diff --git a/cpp/src/memory/buffer.cpp b/cpp/src/memory/buffer.cpp index 385685903..c8425ccde 100644 --- a/cpp/src/memory/buffer.cpp +++ b/cpp/src/memory/buffer.cpp @@ -28,7 +28,9 @@ Buffer::Buffer( storage_{std::move(host_buffer)}, stream_{stream} { RAPIDSMPF_EXPECTS( - std::get(storage_) != nullptr, "the host_buffer cannot be NULL" + std::get(storage_) != nullptr, + "the host_buffer cannot be NULL", + std::invalid_argument ); RAPIDSMPF_EXPECTS( contains(host_buffer_types, mem_type_),