Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/include/rapidsmpf/coll/allgather.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class Chunk {
* @param id Unique chunk identifier.
* @param metadata Serialized metadata for the chunk.
* @param data Data buffer containing the chunk's payload.
*
* @throw std::invalid_argument If either @p metadata and @p data are null.
*/
Chunk(
ChunkID id,
Expand Down
16 changes: 12 additions & 4 deletions cpp/include/rapidsmpf/communicator/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@ class Communicator {
* @param rank The destination rank.
* @param tag Message tag for identification.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @throws std::invalid_argument If @p msg is `nullptr`.
*/
[[nodiscard]] virtual std::unique_ptr<Future> send(
std::unique_ptr<std::vector<uint8_t>> msg, Rank rank, Tag tag
Expand All @@ -443,11 +445,13 @@ class Communicator {
* @param tag Message tag for identification.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @throws std::invalid_argument If @p msg is `nullptr` or it is not ready (see
* warning for more details).
*
* @warning The caller is responsible to ensure the underlying `Buffer` allocation
* and data are already valid before calling, for example, when a CUDA allocation
* and/or copy are done asynchronously. Specifically, the caller should ensure
* `Buffer::is_ready()` returns true before calling this function, if not, a
* warning is printed and the application will terminate.
* `Buffer::is_ready()` returns true before calling this function.
*/
[[nodiscard]] virtual std::unique_ptr<Future> send(
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
Expand All @@ -462,11 +466,13 @@ class Communicator {
* @param recv_buffer The receive buffer.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @throws std::invalid_argument If @p recv_buffer is `nullptr` or it is not ready
* (see warning for more details).
*
* @warning The caller is responsible to ensure the underlying `Buffer` allocation
* is already valid before calling, for example, when a CUDA allocation
* and/or copy are done asynchronously. Specifically, the caller should ensure
* `Buffer::is_ready()` returns true before calling this function, if not, a
* warning is printed and the application will terminate.
* `Buffer::is_ready()` returns true before calling this function.
*/
[[nodiscard]] virtual std::unique_ptr<Future> recv(
Rank rank, Tag tag, std::unique_ptr<Buffer> recv_buffer
Expand All @@ -481,6 +487,8 @@ class Communicator {
* @param tag Message tag for identification.
* @param synced_buffer The receive buffer.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @throws std::invalid_argument If @p synced_buffer is `nullptr`.
*/
[[nodiscard]] virtual std::unique_ptr<Future> recv_sync_host_data(
Rank rank, Tag tag, std::unique_ptr<std::vector<uint8_t>> synced_buffer
Expand Down
8 changes: 8 additions & 0 deletions cpp/include/rapidsmpf/communicator/mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class MPI final : public Communicator {

/**
* @copydoc Communicator::send
*
* @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes).
*/
[[nodiscard]] std::unique_ptr<Communicator::Future> send(
std::unique_ptr<std::vector<uint8_t>> msg, Rank rank, Tag tag
Expand All @@ -147,6 +149,8 @@ class MPI final : public Communicator {
// clang-format off
/**
* @copydoc Communicator::send(std::unique_ptr<Buffer> msg, Rank rank, Tag tag)
*
* @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes).
*/
// clang-format on
[[nodiscard]] std::unique_ptr<Communicator::Future> send(
Expand All @@ -155,6 +159,8 @@ class MPI final : public Communicator {

/**
* @copydoc Communicator::recv
*
* @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes).
*/
[[nodiscard]] std::unique_ptr<Communicator::Future> recv(
Rank rank, Tag tag, std::unique_ptr<Buffer> recv_buffer
Expand All @@ -163,6 +169,8 @@ class MPI final : public Communicator {
// clang-format off
/**
* @copydoc Communicator::recv_sync_host_data(Rank rank, Tag tag, std::unique_ptr<std::vector<uint8_t>> synced_buffer)
*
* @throws std::runtime_error If the message exceeds MPI size limit (2^31 bytes).
*/
// clang-format on
[[nodiscard]] std::unique_ptr<Communicator::Future> recv_sync_host_data(
Expand Down
8 changes: 4 additions & 4 deletions cpp/include/rapidsmpf/memory/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ class Buffer {
* @param stream CUDA stream to associate with the Buffer for future operations.
* @param mem_type The memory type of the underlying @p host_buffer.
*
* @throws std::invalid_argument If @p host_buffer is null.
* @throws std::invalid_argument If @p mem_type is not suitable for host buffers.
* @throws std::invalid_argument If @p host_buffer is null or @p mem_type is not
* suitable for @p host_buffer.
* @throws std::logic_error If the buffer is locked.
*/
Buffer(
Expand All @@ -305,8 +305,8 @@ class Buffer {
* @param device_buffer Unique pointer to a device buffer. Must be non-null.
* @param mem_type The memory type of the underlying @p device_buffer.
*
* @throws std::invalid_argument If @p device_buffer is null.
* @throws std::invalid_argument If @p mem_type is not suitable for device buffers.
* @throws std::invalid_argument If @p device_buffer is null or @p mem_type is not
* suitable for @p device_buffer.
* @throws std::logic_error If the buffer is locked.
*/
Buffer(std::unique_ptr<rmm::device_buffer> device_buffer, MemoryType mem_type);
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/rapidsmpf/memory/host_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ class HostBuffer {
*
* @return A new `HostBuffer` owning the device buffer's memory.
*
* @throws std::invalid_argument if `pinned_host_buffer` is null or if the memory type
* of the buffer is not pinned host.
* @throws std::invalid_argument if @p pinned_host_buffer is null or if it is not
* of pinned host memory type.
*/
static HostBuffer from_rmm_device_buffer(
std::unique_ptr<rmm::device_buffer> pinned_host_buffer,
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/coll/allgather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ Chunk::Chunk(
metadata_{std::move(metadata)},
data_{std::move(data)},
data_size_{data_ ? data_->size : 0} {
RAPIDSMPF_EXPECTS(metadata_ && data_, "Non-finish chunk must have metadata and data");
RAPIDSMPF_EXPECTS(
metadata_ && data_,
"Non-finish chunk must have metadata and data",
std::invalid_argument
);
}

Chunk::Chunk(ChunkID id) : id_{id}, metadata_{nullptr}, data_{nullptr}, data_size_{0} {}
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/communicator/mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ MPI::MPI(MPI_Comm comm, config::Options options)
std::unique_ptr<Communicator::Future> MPI::send(
std::unique_ptr<std::vector<uint8_t>> msg, Rank rank, Tag tag
) {
RAPIDSMPF_EXPECTS(msg != nullptr, "msg cannot be null", std::invalid_argument);
RAPIDSMPF_EXPECTS(
msg->size() <= std::numeric_limits<int>::max(),
"send buffer size exceeds MPI max count"
Expand All @@ -124,6 +125,7 @@ std::unique_ptr<Communicator::Future> MPI::send(
std::unique_ptr<Communicator::Future> MPI::send(
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
) {
RAPIDSMPF_EXPECTS(msg != nullptr, "msg buffer cannot be null", std::invalid_argument);
RAPIDSMPF_EXPECTS(msg->is_latest_write_done(), "msg must be ready");
RAPIDSMPF_EXPECTS(
msg->size <= std::numeric_limits<int>::max(),
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/communicator/ucxx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,7 @@ std::shared_ptr<::ucxx::Endpoint> UCXX::get_endpoint(Rank rank) {
std::unique_ptr<Communicator::Future> UCXX::send(
std::unique_ptr<std::vector<uint8_t>> msg, Rank rank, Tag tag
) {
RAPIDSMPF_EXPECTS(msg != nullptr, "msg cannot be null", std::invalid_argument);
auto req = get_endpoint(rank)->tagSend(
msg->data(),
msg->size(),
Expand All @@ -1145,6 +1146,7 @@ std::unique_ptr<Communicator::Future> UCXX::send(
std::unique_ptr<Communicator::Future> UCXX::send(
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
) {
RAPIDSMPF_EXPECTS(msg != nullptr, "msg buffer cannot be null", std::invalid_argument);
RAPIDSMPF_EXPECTS(msg->is_latest_write_done(), "msg must be ready");
auto req = get_endpoint(rank)->tagSend(
msg->data(), msg->size, tag_with_rank(shared_resources_->rank(), tag)
Expand All @@ -1155,7 +1157,9 @@ std::unique_ptr<Communicator::Future> UCXX::send(
std::unique_ptr<Communicator::Future> UCXX::recv(
Rank rank, Tag tag, std::unique_ptr<Buffer> recv_buffer
) {
RAPIDSMPF_EXPECTS(recv_buffer != nullptr, "recv buffer is nullptr");
RAPIDSMPF_EXPECTS(
recv_buffer != nullptr, "recv buffer cannot be null", std::invalid_argument
);
RAPIDSMPF_EXPECTS(recv_buffer->is_latest_write_done(), "msg must be ready");
auto req = get_endpoint(rank)->tagRecv(
recv_buffer->exclusive_data_access(),
Expand All @@ -1169,7 +1173,9 @@ std::unique_ptr<Communicator::Future> UCXX::recv(
std::unique_ptr<Communicator::Future> UCXX::recv_sync_host_data(
Rank rank, Tag tag, std::unique_ptr<std::vector<uint8_t>> synced_buffer
) {
RAPIDSMPF_EXPECTS(synced_buffer != nullptr, "recv host buffer is nullptr");
RAPIDSMPF_EXPECTS(
synced_buffer != nullptr, "recv host buffer cannot be null", std::invalid_argument
);
auto req = get_endpoint(rank)->tagRecv(
synced_buffer->data(),
synced_buffer->size(),
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/memory/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ Buffer::Buffer(
storage_{std::move(host_buffer)},
stream_{stream} {
RAPIDSMPF_EXPECTS(
std::get<HostBufferT>(storage_) != nullptr, "the host_buffer cannot be NULL"
std::get<HostBufferT>(storage_) != nullptr,
"the host_buffer cannot be NULL",
std::invalid_argument
);
RAPIDSMPF_EXPECTS(
contains(host_buffer_types, mem_type_),
Expand Down
Loading