Skip to content
2 changes: 1 addition & 1 deletion bench/python/dlslime_custom_sendrecv_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def setup_rdma_connection(args):
"""Establish RDMA connection between sender and receiver"""
print(f'Initializing RDMA endpoint for {args.mode}...')

num_qp = 4
num_qp = 1
end_point = _slime_c.rdma_endpoint(args.device, args.rdmaport, args.type, num_qp)

zmq_ctx = zmq.Context()
Expand Down
13 changes: 7 additions & 6 deletions bench/python/dlslime_torch_dist_sendrecv_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
print(e, "please install dlslime backend first")
exit()


def benchmark_send_recv(args):
# Initialize process group
print("Initialize process group")
Expand All @@ -38,27 +39,28 @@ def benchmark_send_recv(args):
if args.sizes:
sizes = [int(s) for s in args.sizes]
else:
sizes = [2**n for n in range(11, 26)] # 256B to 256MB
sizes = [2**n for n in range(11, 12)] # 256B to 256MB

print("Prepare data sizes: ", sizes)
benchmark_data = []
num = 2
num = 1
print("Start to test the bench")
for size in sizes:
num_elements = max(1, size // 4)
send_batch = [
torch.ones(num_elements, device=device, dtype=torch.float32)
for _ in range(num)
]
# print(send_batch[0])
recv_batch = [
torch.zeros(num_elements, device=device, dtype=torch.float32)
for _ in range(num)
]

if args.use_gpu:
torch.cuda.synchronize()
for _ in range(25):

for _ in range(args.iterations):
all_work = []
reqs = []
for i in range(num):
Expand All @@ -76,15 +78,14 @@ def benchmark_send_recv(args):
if args.use_gpu:
torch.cuda.synchronize()
start_time = time.time()

for _ in range(args.iterations):
all_work = []
for i in range(num):
if rank == 0:
send_op = dist.isend(send_batch[i], dst=1, group=slime_group)
all_work.extend([send_op])
else:
recv_op = dist.irecv(recv_batch[i], src=0 ,group=slime_group)
recv_op = dist.irecv(recv_batch[i], src=0, group=slime_group)
all_work.extend([recv_op])

[w.wait() for w in all_work]
Expand Down
1 change: 1 addition & 0 deletions compile_commands.json
2 changes: 0 additions & 2 deletions csrc/engine/rdma/memory_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ int RDMAMemoryPool::register_memory_region(const std::string& mr_key, uintptr_t
/* MemoryRegion Access Right = 777 */
const static int access_rights = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
ibv_mr* mr = ibv_reg_mr(pd_, (void*)data_ptr, length, access_rights);

SLIME_ASSERT(mr, " Failed to register memory " << data_ptr);

SLIME_LOG_DEBUG("Memory region: " << mr_key << ", " << (void*)data_ptr << " -- " << (void*)(data_ptr + length)
<< ", Device name: " << pd_->context->device->dev_name << ", Length: " << length
<< " (" << length / 1024 / 1024 << " MB)"
Expand Down
15 changes: 9 additions & 6 deletions csrc/engine/rdma/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ typedef struct remote_mr {
remote_mr(uintptr_t addr, size_t length, uint32_t rkey): addr(addr), length(length), rkey(rkey) {}

uintptr_t addr{(uintptr_t) nullptr};
size_t length{};
uint32_t rkey{};
size_t length{0};
uint32_t rkey{0};
} remote_mr_t;

class RDMAMemoryPool {
Expand All @@ -34,6 +34,7 @@ class RDMAMemoryPool {

~RDMAMemoryPool()
{

for (auto& mr : mrs_) {
if (mr.second)
ibv_dereg_mr(mr.second);
Expand All @@ -51,19 +52,21 @@ class RDMAMemoryPool {
inline struct ibv_mr* get_mr(const std::string& mr_key)
{
std::unique_lock<std::mutex> lock(mrs_mutex_);
if (mrs_.find(mr_key) != mrs_.end())
if (mrs_.find(mr_key) != mrs_.end()) {
SLIME_LOG_DEBUG("mr_key: ", mr_key, " is found in mrs_");
return mrs_[mr_key];

}
SLIME_LOG_WARN("mr_key: ", mr_key, "not found in mrs_");
return nullptr;
}

inline remote_mr_t get_remote_mr(const std::string& mr_key)
{
std::unique_lock<std::mutex> lock(remote_mrs_mutex_);
if (remote_mrs_.find(mr_key) != remote_mrs_.end())
if (remote_mrs_.find(mr_key) != remote_mrs_.end()) {
SLIME_LOG_DEBUG("mr_key: ", mr_key, " is found in remote_mrs_");
return remote_mrs_[mr_key];

}
SLIME_LOG_WARN("mr_key: ", mr_key, " not found in remote_mrs_");
return remote_mr_t();
}
Expand Down
3 changes: 2 additions & 1 deletion csrc/engine/rdma/rdma_assignment.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ typedef struct callback_info {
callback_info() = default;
callback_info(OpCode opcode, size_t batch_size, callback_fn_t callback): opcode_(opcode), batch_size_(batch_size)
{
if (callback)
if (callback) {
callback_ = std::move(callback);
}
}

callback_fn_t callback_{[this](int code, int imm_data) {
Expand Down
24 changes: 13 additions & 11 deletions csrc/engine/rdma/rdma_buffer.cpp
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@

#include "engine/rdma/rdma_buffer.h"
#include "engine/assignment.h"

namespace slime {

void RDMABuffer::send()
{
endpoint_->addSendTask(shared_from_this());
endpoint_->addRDMABuffer(OpCode::SEND, shared_from_this());
}

void RDMABuffer::recv()
{
endpoint_->addRecvTask(shared_from_this());
endpoint_->addRDMABuffer(OpCode::RECV, shared_from_this());
}

void RDMABuffer::send_done_callback()
void RDMABuffer::sendDoneCallback()
{
std::unique_lock<std::mutex> lock(send_mutex_);
++send_completed_;
send_cv_.notify_all();
}

void RDMABuffer::recv_done_callback()
void RDMABuffer::recvDoneCallback()
{
std::unique_lock<std::mutex> lock(recv_mutex_);
++recv_completed_;
Expand All @@ -31,25 +32,26 @@ bool RDMABuffer::waitSend()
{
std::unique_lock<std::mutex> lock(send_mutex_);

if (send_completed_)
return send_completed_;

if (send_completed_) {
return true;
}
send_cv_.wait(lock, [this]() { return send_completed_ > 0; });
send_pending_ = false;
SLIME_LOG_INFO("complete to send the data.");
return send_completed_;
return true;
}

bool RDMABuffer::waitRecv()
{
std::unique_lock<std::mutex> lock(recv_mutex_);

if (recv_completed_)
return recv_completed_;
if (recv_completed_) {
return true;
}

recv_cv_.wait(lock, [this]() { return recv_completed_ > 0; });
recv_pending_ = false;
SLIME_LOG_INFO("complete to send the data.");
return recv_completed_;
return true;
}
} // namespace slime
46 changes: 18 additions & 28 deletions csrc/engine/rdma/rdma_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "engine/rdma/rdma_endpoint.h"

#include <condition_variable>
#include <cstdint>
#include <infiniband/verbs.h>
#include <map>
#include <memory>
Expand All @@ -13,6 +14,7 @@
#include <unordered_map>
#include <vector>

#include "logging.h"
#include "rdma_common.h"

namespace slime {
Expand All @@ -23,36 +25,24 @@ class RDMABuffer: public std::enable_shared_from_this<RDMABuffer> {
friend class RDMAEndpoint;

public:
RDMABuffer(std::shared_ptr<RDMAEndpoint> endpoint, storage_view_batch_t& batch):
endpoint_(endpoint), storage_view_batch_(std::move(batch))
RDMABuffer(std::shared_ptr<RDMAEndpoint> endpoint, uintptr_t ptr, size_t offset, size_t data_size):
endpoint_(endpoint), ptr_(ptr), offset_(offset), data_size_(data_size)
{
}

RDMABuffer(std::shared_ptr<RDMAEndpoint> endpoint,
std::vector<uintptr_t> ptrs,
std::vector<size_t> offset,
std::vector<size_t> data_size)
RDMABuffer(std::shared_ptr<RDMAEndpoint> endpoint, storage_view_batch_t& batch):
endpoint_(endpoint), storage_view_batch_(std::move(batch))
{

batch_size_ = ptrs.size();
ptrs_ = ptrs;
offset_ = offset;
data_size_ = data_size;
for (uint32_t i = 0; i < batch_size_; ++i) {
storage_view_t view{.data_ptr = ptrs[i], .storage_offset = offset[i], .length = data_size[i]};
storage_view_batch_.push_back(view);
}
endpoint_ = endpoint;
}

~RDMABuffer() = default;

const size_t batchSize()
const size_t batch_size()
{
return storage_view_batch_.size();
}

const storage_view_batch_t& storageViewBatch()
const storage_view_batch_t& view_batch()
{
return storage_view_batch_;
}
Expand All @@ -63,20 +53,20 @@ class RDMABuffer: public std::enable_shared_from_this<RDMABuffer> {
bool waitSend();
bool waitRecv();

void send_done_callback();
void recv_done_callback();

void get_time();
void sendDoneCallback();
void recvDoneCallback();

private:
storage_view_batch_t storage_view_batch_;

std::shared_ptr<RDMAEndpoint> endpoint_;
std::vector<uintptr_t> ptrs_;
std::vector<size_t> offset_;
std::vector<size_t> data_size_;
uintptr_t ptr_;
size_t offset_;
size_t data_size_;

size_t batch_size_;
std::vector<uintptr_t> ptrs_batch_;
std::vector<size_t> offset_batch_;
std::vector<size_t> data_size_batch_;

storage_view_batch_t storage_view_batch_;

std::atomic<int> send_pending_{0};
std::atomic<int> recv_pending_{0};
Expand Down
2 changes: 0 additions & 2 deletions csrc/engine/rdma/rdma_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ void split_assign_by_max_length(OpCode opcode,
AssignmentBatch& batch_split_after_max_length,
size_t max_length)
{
// split assignment by length
for (size_t i = 0; i < batch.size(); ++i) {
if (batch[i].length < max_length) {
batch_split_after_max_length.push_back(std::move(batch[i]));
Expand Down Expand Up @@ -434,7 +433,6 @@ RDMAContext::submit(OpCode opcode, AssignmentBatch& batch, callback_fn_t callbac
size_t length = SLIME_MAX_LENGTH_PER_ASSIGNMENT;
AssignmentBatch batch_split;
split_assign_by_max_length(opcode, batch, batch_split, length);

AssignmentBatch batch_after_agg_qp;
while (batch_split.size() < SLIME_AGG_QP_NUM) {
length = length / 2;
Expand Down
2 changes: 1 addition & 1 deletion csrc/engine/rdma/rdma_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class RDMAContext {

if (pd_)
ibv_dealloc_pd(pd_);

if (ib_ctx_)
ibv_close_device(ib_ctx_);

Expand Down
Loading