diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index d2f1b809..1ae637ef 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,8 @@ DEFINE_bool(active_generator, false, "When 'true' this host is generating the traffic, otherwise it is " "bouncing."); DEFINE_bool(verify, false, "Verify payload of received messages."); +DEFINE_uint32(blocking, 0, "Blocking receive"); +DEFINE_uint32(load, 100, "kRPS load"); static volatile int g_keep_running = 1; @@ -102,6 +105,7 @@ class ThreadCtx { hdr_histogram *latency_hist; size_t num_request_latency_samples; std::vector msg_latency_info_vec; + std::chrono::microseconds time_limit; struct { stats_t current; @@ -171,7 +175,7 @@ void ServerLoop(void *channel_ctx) { MachnetFlow_t rx_flow; const ssize_t rx_size = machnet_recv(channel_ctx, thread_ctx.rx_message.data(), - thread_ctx.rx_message.size(), &rx_flow); + thread_ctx.rx_message.size(), &rx_flow, FLAGS_blocking); if (rx_size <= 0) continue; stats_cur.rx_count++; stats_cur.rx_bytes += rx_size; @@ -235,75 +239,88 @@ void ClientSendOne(ThreadCtx *thread_ctx, uint64_t window_slot) { } // Return the window slot for which a response was received -uint64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { +int64_t ClientRecvOne(ThreadCtx *thread_ctx) { const auto *channel_ctx = thread_ctx->channel_ctx; - while (true) { - if (g_keep_running == 0) { - LOG(INFO) << "ClientRecvOneBlocking: Exiting."; - return 0; - } + MachnetFlow_t rx_flow; + const ssize_t rx_size = + machnet_recv(channel_ctx, thread_ctx->rx_message.data(), + thread_ctx->rx_message.size(), &rx_flow, FLAGS_blocking); + if (rx_size <= 0) return -1; - MachnetFlow_t rx_flow; - const ssize_t rx_size = - machnet_recv(channel_ctx, thread_ctx->rx_message.data(), - thread_ctx->rx_message.size(), &rx_flow); - if (rx_size <= 0) continue; + thread_ctx->stats.current.rx_count++; + thread_ctx->stats.current.rx_bytes += rx_size; - thread_ctx->stats.current.rx_count++; - thread_ctx->stats.current.rx_bytes += rx_size; - - const auto *msg_hdr = - reinterpret_cast(thread_ctx->rx_message.data()); - if (msg_hdr->window_slot >= FLAGS_msg_window) { - LOG(ERROR) << "Received invalid window slot: " << msg_hdr->window_slot; - continue; - } + const auto *msg_hdr = + reinterpret_cast(thread_ctx->rx_message.data()); + if (msg_hdr->window_slot > FLAGS_msg_window) { + LOG(ERROR) << "Received invalid window slot: " << msg_hdr->window_slot; + abort(); + } - const size_t latency_us = - thread_ctx->RecordRequestEnd(msg_hdr->window_slot); - VLOG(1) << "Client: Received message for window slot " - << msg_hdr->window_slot << " in " << latency_us << " us"; - - if (FLAGS_verify) { - for (uint32_t i = sizeof(msg_hdr_t); i < rx_size; i++) { - if (thread_ctx->rx_message[i] != thread_ctx->message_gold[i]) { - LOG(ERROR) << "Message data mismatch at index " << i << std::hex - << " " << static_cast(thread_ctx->rx_message[i]) - << " " - << static_cast(thread_ctx->message_gold[i]); - break; - } + const size_t latency_us = thread_ctx->RecordRequestEnd(msg_hdr->window_slot); + VLOG(1) << "Client: Received message for window slot " << msg_hdr->window_slot + << " in " << latency_us << " us"; + + if (FLAGS_verify) { + for (uint32_t i = sizeof(msg_hdr_t); i < rx_size; i++) { + if (thread_ctx->rx_message[i] != thread_ctx->message_gold[i]) { + LOG(ERROR) << "Message data mismatch at index " << i << std::hex << " " + << static_cast(thread_ctx->rx_message[i]) << " " + << static_cast(thread_ctx->message_gold[i]); + break; } } - - return msg_hdr->window_slot; } - - LOG(FATAL) << "Should not reach here"; - return 0; + return msg_hdr->window_slot; } void ClientLoop(void *channel_ctx, MachnetFlow *flow) { ThreadCtx thread_ctx(channel_ctx, flow); + thread_ctx.time_limit = std::chrono::duration_cast( + std::chrono::microseconds(1000) / (FLAGS_load)); LOG(INFO) << "Client Loop: Starting."; + LOG(INFO) << "Time limit is: " << thread_ctx.time_limit.count(); // Send a full window of messages for (uint32_t i = 0; i < FLAGS_msg_window; i++) { ClientSendOne(&thread_ctx, i /* window slot */); } - while (true) { - if (g_keep_running == 0) { - LOG(INFO) << "MsgGenLoop: Exiting."; - break; + auto next = std::chrono::steady_clock::now() + thread_ctx.time_limit; + std::deque backlog; + + while (g_keep_running) { + auto rx_window_slot = ClientRecvOne(&thread_ctx); + + if (rx_window_slot <= 0) { + // Inner loop to handle the case where no message is received + while (g_keep_running) { + rx_window_slot = ClientRecvOne(&thread_ctx); + if (rx_window_slot > 0) break; + + if (std::chrono::steady_clock::now() > next) { + // Handle timeout scenario + next = std::chrono::steady_clock::now() + thread_ctx.time_limit; + auto next_window = ++FLAGS_msg_window; + thread_ctx.msg_latency_info_vec.resize(next_window); + backlog.push_back(next_window); + ClientSendOne(&thread_ctx, backlog.front()); + backlog.pop_front(); + } + } } - - const uint64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); - ClientSendOne(&thread_ctx, rx_window_slot); - + if (g_keep_running == 0) break; + // Check if the time limit has passed and a message is received + if (std::chrono::steady_clock::now() > next) { + ClientSendOne(&thread_ctx, backlog.front()); + backlog.pop_front(); + next = std::chrono::steady_clock::now() + thread_ctx.time_limit; + } + backlog.push_back(rx_window_slot); ReportStats(&thread_ctx); } + LOG(INFO) << "MsgGenLoop: Exiting."; auto &stats_cur = thread_ctx.stats.current; LOG(INFO) << "Application Statistics (TOTAL) - [TX] Sent: " diff --git a/src/core/drivers/shm/channel.cc b/src/core/drivers/shm/channel.cc index 3e9e82e5..2c6d08bc 100644 --- a/src/core/drivers/shm/channel.cc +++ b/src/core/drivers/shm/channel.cc @@ -21,7 +21,8 @@ ShmChannel::ShmChannel(const std::string channel_name, channel_fd_(channel_fd), cached_buf_indices(), cached_bufs(), - cached_buf_count(0) {} + cached_buf_count(0), + posted(0) {} ShmChannel::~ShmChannel() { __machnet_channel_destroy( diff --git a/src/core/drivers/shm/channel_bench.cc b/src/core/drivers/shm/channel_bench.cc index ef18884a..6b0407e5 100644 --- a/src/core/drivers/shm/channel_bench.cc +++ b/src/core/drivers/shm/channel_bench.cc @@ -4,13 +4,15 @@ #include #include #include -#include -#include #include #include +#include #include #include +#include + +DEFINE_uint32(blocking, 0, "Block on receive"); static constexpr uint8_t kStackCpuCoreId = 3; static constexpr uint8_t kAppCpuCoreId = 5; @@ -39,7 +41,7 @@ struct thread_conf { thread_conf(std::shared_ptr ch, uint8_t core_id, uint64_t messages_to_send, uint64_t tx_message_size, uint64_t messages_to_receive) - : channel(ch), + : channel(std::move(ch)), cpu_core(core_id), messages_to_send(messages_to_send), tx_message_size(tx_message_size), @@ -107,7 +109,6 @@ void stack_loop(thread_conf *conf) { } continue; } - buf = channel->MsgBufAlloc(); if (buf == nullptr) { continue; @@ -126,7 +127,6 @@ void stack_loop(thread_conf *conf) { // Send the message. ret = channel->EnqueueMessages(&buf, 1); if (ret != 1) { - LOG(ERROR) << "Couldn't enqueue message. ret: " << ret; channel->MsgBufFree(buf); } conf->messages_sent += ret; @@ -159,15 +159,14 @@ void application_loop(thread_conf *conf) { while (!g_start.load()) { __asm__ volatile("pause" ::: "memory"); } - // Now start receiving messages. auto start = std::chrono::high_resolution_clock::now(); while (!g_should_stop.load()) { // RX. MachnetFlow_t flow; - auto nbytes = - machnet_recv(channel->ctx(), rx_buffer.data(), rx_buffer.size(), &flow); + auto nbytes = machnet_recv(channel->ctx(), rx_buffer.data(), + rx_buffer.size(), &flow, FLAGS_blocking); if (nbytes > 0) { conf->messages_received++; CHECK_EQ(nbytes, conf->tx_message_size); @@ -284,11 +283,18 @@ void print_results(const thread_conf &stack_conf, const thread_conf &app_conf) { 1e9)) : 0.0) << std::endl; + if (FLAGS_blocking) { + std::cout << juggler::utils::Format( + "Stack notified Application side %d times", channel->GetPosted()); + channel->ResetPosted(); + } std::cout << std::endl; } -int main() { +int main(int argc, char *argv[]) { google::InitGoogleLogging("channel_bench"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + FLAGS_logtostderr = 1; signal(SIGINT, [](int) { g_should_stop.store(true); }); @@ -306,7 +312,8 @@ int main() { std::vector> exp_config_vec; exp_config_vec.emplace_back(kMessagesToSend, 0); // Stack -> app only - exp_config_vec.emplace_back(0, kMessagesToSend); // App -> stack only + if (!FLAGS_blocking) + exp_config_vec.emplace_back(0, kMessagesToSend); // App -> stack only exp_config_vec.emplace_back(kMessagesToSend, kMessagesToSend); // Bi-dir LOG(INFO) << "Running channel_bench"; diff --git a/src/core/drivers/shm/channel_test.cc b/src/core/drivers/shm/channel_test.cc index 6ca481a2..0b73504d 100644 --- a/src/core/drivers/shm/channel_test.cc +++ b/src/core/drivers/shm/channel_test.cc @@ -212,7 +212,7 @@ TEST(BasicChannelTest, ChannelEnqueue) { rx_msghdr.msg_iov = &rx_iov; rx_msghdr.msg_iovlen = 1; - EXPECT_EQ(machnet_recvmsg(channel->ctx(), &rx_msghdr), 1); + EXPECT_EQ(machnet_recvmsg(channel->ctx(), &rx_msghdr, NON_BLOCKING), 1); EXPECT_EQ(rx_msghdr.msg_size, kMessageSize); EXPECT_EQ(rx_msg, tx_msg); } @@ -396,7 +396,7 @@ TEST(ChannelFullDuplex, SendRecvMsg) { msghdr.flags = 0; msghdr.flow_info = { .src_ip = 0, .dst_ip = 0, .src_port = 0, .dst_port = 0}; - auto ret = machnet_recvmsg(ctx, &msghdr); + auto ret = machnet_recvmsg(ctx, &msghdr, NON_BLOCKING); if (ret == 1) msg_rx++; // If already sent the amount of messages needed skip. diff --git a/src/core/flow_test.cc b/src/core/flow_test.cc index 3ce77169..6a88e00e 100644 --- a/src/core/flow_test.cc +++ b/src/core/flow_test.cc @@ -124,6 +124,8 @@ class FlowTest : public ::testing::Test { num_msgbufs -= msgbuf_nr; batch.Clear(); } + CHECK_NOTNULL(head); + CHECK_NOTNULL(tail); head->set_msg_length(data.size()); head->set_last(tail->index()); head->mark_first(); @@ -317,7 +319,7 @@ TEST_F(FlowTest, RXQueue_Push) { rx_msghdr.flow_info = {0, 0, 0, 0}; rx_msghdr.msg_iov = &rx_iov; rx_msghdr.msg_iovlen = 1; - auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr); + auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Failed to deliver message to application"; EXPECT_EQ(tx_message, rx_message); EXPECT_EQ(channel_->GetFreeBufCount(), channel_->GetTotalBufCount()); @@ -397,7 +399,7 @@ TEST_F(FlowTest, RXQueue_Push_OutOfOrder1) { rx_msghdr.flow_info = {0, 0, 0, 0}; rx_msghdr.msg_iov = &rx_iov; rx_msghdr.msg_iovlen = 1; - auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr); + auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Failed to deliver message to application"; EXPECT_EQ(tx_message, rx_message); EXPECT_EQ(channel_->GetFreeBufCount(), channel_->GetTotalBufCount()); @@ -496,7 +498,7 @@ TEST_F(FlowTest, RXQueue_Push_OutOfOrder2) { rx_msghdr.flow_info = {0, 0, 0, 0}; rx_msghdr.msg_iov = &rx_iov; rx_msghdr.msg_iovlen = 1; - auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr); + auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Failed to deliver message to application"; EXPECT_EQ(tx_message, rx_message); EXPECT_EQ(channel_->GetFreeBufCount(), channel_->GetTotalBufCount()); diff --git a/src/ext/Makefile b/src/ext/Makefile index 0a24964f..ec15148f 100644 --- a/src/ext/Makefile +++ b/src/ext/Makefile @@ -1,6 +1,6 @@ CC = gcc CFLAGS = -Wall -fPIC -LDFLAGS = -shared +LDFLAGS = -shared -pthread LIBS = -luuid TARGET = libmachnet_shim.so SRCS = machnet.c diff --git a/src/ext/machnet.c b/src/ext/machnet.c index d104c7f5..3e706434 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -639,7 +640,7 @@ int machnet_sendmmsg(const void *channel_ctx, } ssize_t machnet_recv(const void *channel_ctx, void *buf, size_t len, - MachnetFlow_t *flow) { + MachnetFlow_t *flow, uint32_t blocking) { MachnetMsgHdr_t msghdr; MachnetIovec_t iov; iov.base = buf; @@ -647,14 +648,15 @@ ssize_t machnet_recv(const void *channel_ctx, void *buf, size_t len, msghdr.msg_iov = &iov; msghdr.msg_iovlen = 1; - const int ret = machnet_recvmsg(channel_ctx, &msghdr); + const int ret = machnet_recvmsg(channel_ctx, &msghdr, blocking); if (ret <= 0) return ret; // No message available, or error code *flow = msghdr.flow_info; return msghdr.msg_size; } -int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) { +int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, + uint32_t blocking) { assert(channel_ctx != NULL); assert(msghdr != NULL); MachnetChannelCtx_t *ctx = (MachnetChannelCtx_t *)channel_ctx; @@ -664,8 +666,17 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) { // Deque a message from the ring. MachnetRingSlot_t buffer_index; uint32_t n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); - if (n != 1) return 0; // No message available. - + while (n == 0) { + if (!blocking) return 0; + __atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST); + // n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); + // if (n == 0) { + sem_wait(&ctx->sem); + n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); + // assert(n == 1); + // } + } + assert(n == 1); MachnetMsgBuf_t *buffer; buffer = __machnet_channel_buf(ctx, buffer_index); MachnetFlow_t flow_info = buffer->flow; diff --git a/src/ext/machnet.h b/src/ext/machnet.h index 831e2798..ba2f8ad8 100644 --- a/src/ext/machnet.h +++ b/src/ext/machnet.h @@ -18,6 +18,9 @@ extern "C" { #include "machnet_common.h" +#define BLOCKING 1 +#define NON_BLOCKING 0 + /** * @brief Descriptor for SG data that constitute a message. * @@ -158,12 +161,13 @@ int machnet_sendmmsg(const void *channel_ctx, * @param[out] buf The data buffer to receive the message * @param[in] len The length of \p buf in bytes * @param[out] flow The flow information of the sender + * @param blocking The flag to block and wait for a message to arrive * * @return 0 if no message is available, -1 on failure, otherwise the number of * bytes received. */ ssize_t machnet_recv(const void *channel_ctx, void *buf, size_t len, - MachnetFlow_t *flow); + MachnetFlow_t *flow, uint32_t blocking); /** * This function receives a pending message (destined to the application) from @@ -182,7 +186,8 @@ ssize_t machnet_recv(const void *channel_ctx, void *buf, size_t len, * @return 0 if no pending message, 1 if a message is * received, -1 on failure */ -int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr); +int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, + uint32_t blocking); #ifdef __cplusplus } diff --git a/src/ext/machnet_common.h b/src/ext/machnet_common.h index fdc3753c..9bed3d42 100644 --- a/src/ext/machnet_common.h +++ b/src/ext/machnet_common.h @@ -46,6 +46,7 @@ extern "C" { #include #include /* For O_* constants */ +#include #include #include /* For mode constants */ @@ -137,6 +138,8 @@ struct MachnetChannelCtx { #define MACHNET_CHANNEL_VERSION 0x01 uint16_t version; uint64_t size; // Size of the Channel's memory, including this context. + sem_t sem; // to implement blocking recv + volatile uint32_t receiver_active; #define MACHNET_CHANNEL_NAME_MAX_LEN 256 char name[MACHNET_CHANNEL_NAME_MAX_LEN]; MachnetChannelCtrlCtx_t ctrl_ctx; // Control channel's specific metadata. diff --git a/src/ext/machnet_private.h b/src/ext/machnet_private.h index 5d6494d2..ef643aad 100644 --- a/src/ext/machnet_private.h +++ b/src/ext/machnet_private.h @@ -154,6 +154,8 @@ static inline int __machnet_channel_dataplane_init( MachnetChannelCtx_t *ctx = (MachnetChannelCtx_t *)shm; ctx->version = MACHNET_CHANNEL_VERSION; ctx->size = total_size; + sem_init(&ctx->sem, 1, 0); + ctx->receiver_active = 1; strncpy(ctx->name, name, sizeof(ctx->name)); ctx->name[sizeof(ctx->name) - 1] = '\0'; diff --git a/src/ext/machnet_test.cc b/src/ext/machnet_test.cc index 6d7cb672..bd130f1c 100644 --- a/src/ext/machnet_test.cc +++ b/src/ext/machnet_test.cc @@ -270,7 +270,7 @@ TEST(MachnetTest, SimpleSendRecvMsg) { iov.len = recv_data.size(); msghdr.msg_size = 0; msghdr.flow_info = {.src_ip = 0, .dst_ip = 0, .src_port = 0, .dst_port = 0}; - EXPECT_EQ(machnet_recvmsg(g_channel_ctx, &msghdr), 1); + EXPECT_EQ(machnet_recvmsg(g_channel_ctx, &msghdr, NON_BLOCKING), 1); EXPECT_EQ(msghdr.msg_size, orig_data.size()); EXPECT_EQ(recv_data, orig_data); EXPECT_EQ(msghdr.flow_info.src_ip, UINT32_MAX); @@ -310,7 +310,7 @@ TEST(MachnetTest, MultiBufferSendRecvMsg) { prepare_segments(msg_size, 1, &rx_msg_data); prepare_rx_msg(&rx_iov, &rx_msghdr, &rx_msg_data, msg_size); - ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr); + ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Msg size: " << msg_size; EXPECT_EQ(rx_msg_data, tx_msg_data) << "Msg size: " << msg_size; EXPECT_EQ(memcmp(&rx_msghdr.flow_info, &flow, sizeof(flow)), 0); @@ -353,7 +353,7 @@ TEST(MachnetTest, MultiBufferSGSendRecvMsg) { MachnetMsgHdr_t rx_msghdr; prepare_rx_msg(&rx_iov, &rx_msghdr, &rx_segments, msg_size); - ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr); + ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Msg size: " << msg_size; auto flatten_msg = [](std::vector> *segments) { @@ -431,7 +431,7 @@ TEST(MachnetTest, InvalidSendRecv) { MachnetMsgHdr_t rx_msghdr; prepare_rx_msg(&rx_iov, &rx_msghdr, &rx_segments, msg_size / 2); - ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr); + ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, -1) << "Msg size: " << msg_size; EXPECT_TRUE(check_buffer_pool(g_channel_ctx)); } diff --git a/src/include/channel.h b/src/include/channel.h index 0b535325..8d631b06 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -174,8 +174,19 @@ class ShmChannel { */ uint32_t EnqueueMessages(MachnetRingSlot_t *msgbuf_indices, uint32_t nb_msgs) { - return __machnet_channel_machnet_ring_enqueue(ctx_, nb_msgs, - msgbuf_indices); + auto ret = + __machnet_channel_machnet_ring_enqueue(ctx_, nb_msgs, msgbuf_indices); + if (ret != 0) { + if (!__atomic_load_n(&ctx_->receiver_active, __ATOMIC_SEQ_CST)) { + __atomic_store_n(__DECONST(uint32_t *, &ctx_->receiver_active), 1, + __ATOMIC_SEQ_CST); + if (sem_post(__DECONST(sem_t *, &ctx_->sem)) < 0) { + fprintf(stderr, "Couldn't notify app side in case of blocking\n"); + } + posted++; + } + } + return ret; } /** @@ -369,6 +380,9 @@ class ShmChannel { return ret; } + [[nodiscard]] uint32_t GetPosted() const { return posted; } + void ResetPosted() { posted = 0; } + private: const std::string name_; const MachnetChannelCtx_t *ctx_; @@ -378,6 +392,7 @@ class ShmChannel { std::array cached_buf_indices; std::array cached_bufs; uint32_t cached_buf_count; + uint32_t posted; }; /** @@ -512,7 +527,7 @@ template