From d22952d0d6f6ade89c1b031d79fcb6cb58b47de4 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Wed, 1 Nov 2023 11:38:03 -0700 Subject: [PATCH 01/33] implement blocking recv Vanilla implementation resulting in 2x latency increase per message --- src/ext/Makefile | 2 +- src/ext/machnet.c | 2 ++ src/ext/machnet_common.h | 2 ++ src/ext/machnet_private.h | 1 + src/include/channel.h | 13 +++++++++---- 5 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/ext/Makefile b/src/ext/Makefile index 0a24964f..ec15148f 100644 --- a/src/ext/Makefile +++ b/src/ext/Makefile @@ -1,6 +1,6 @@ CC = gcc CFLAGS = -Wall -fPIC -LDFLAGS = -shared +LDFLAGS = -shared -pthread LIBS = -luuid TARGET = libmachnet_shim.so SRCS = machnet.c diff --git a/src/ext/machnet.c b/src/ext/machnet.c index d104c7f5..236b08a3 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -661,6 +662,7 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) { const uint32_t kBufferBatchSize = 16; + sem_wait(&ctx->sem); // Deque a message from the ring. MachnetRingSlot_t buffer_index; uint32_t n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); diff --git a/src/ext/machnet_common.h b/src/ext/machnet_common.h index fdc3753c..c9f2bb40 100644 --- a/src/ext/machnet_common.h +++ b/src/ext/machnet_common.h @@ -46,6 +46,7 @@ extern "C" { #include #include /* For O_* constants */ +#include #include #include /* For mode constants */ @@ -137,6 +138,7 @@ struct MachnetChannelCtx { #define MACHNET_CHANNEL_VERSION 0x01 uint16_t version; uint64_t size; // Size of the Channel's memory, including this context. + sem_t sem; // to implement blocking recv #define MACHNET_CHANNEL_NAME_MAX_LEN 256 char name[MACHNET_CHANNEL_NAME_MAX_LEN]; MachnetChannelCtrlCtx_t ctrl_ctx; // Control channel's specific metadata. diff --git a/src/ext/machnet_private.h b/src/ext/machnet_private.h index 5d6494d2..c7b24213 100644 --- a/src/ext/machnet_private.h +++ b/src/ext/machnet_private.h @@ -154,6 +154,7 @@ static inline int __machnet_channel_dataplane_init( MachnetChannelCtx_t *ctx = (MachnetChannelCtx_t *)shm; ctx->version = MACHNET_CHANNEL_VERSION; ctx->size = total_size; + sem_init(&ctx->sem, 0, 0); strncpy(ctx->name, name, sizeof(ctx->name)); ctx->name[sizeof(ctx->name) - 1] = '\0'; diff --git a/src/include/channel.h b/src/include/channel.h index 0b535325..2c244fc0 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -174,8 +174,14 @@ class ShmChannel { */ uint32_t EnqueueMessages(MachnetRingSlot_t *msgbuf_indices, uint32_t nb_msgs) { - return __machnet_channel_machnet_ring_enqueue(ctx_, nb_msgs, - msgbuf_indices); + auto ret = + __machnet_channel_machnet_ring_enqueue(ctx_, nb_msgs, msgbuf_indices); + if (ret != 0) { + if (sem_post(__DECONST(sem_t *, &ctx_->sem)) < 0) { + fprintf(stderr, "Couldn't notify app side in case of blocking\n"); + } + } + return ret; } /** @@ -330,8 +336,7 @@ class ShmChannel { auto ret = MsgBufBulkFree(batch->buf_indices(), batch->GetSize()); - if (ret == 0) [[unlikely]] - return false; // NOLINT + if (ret == 0) [[unlikely]] return false; // NOLINT batch->Clear(); return true; } From 85166246e6c178591513cb52ce657f12a977f74e Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Sat, 11 Nov 2023 10:53:08 -0500 Subject: [PATCH 02/33] WIP: blocking_recv --- src/core/drivers/shm/channel_bench.cc | 43 ++++++++++++++------------- src/ext/machnet.c | 13 ++++++-- src/ext/machnet_common.h | 1 + src/ext/machnet_private.h | 1 + src/include/channel.h | 7 +++-- 5 files changed, 39 insertions(+), 26 deletions(-) diff --git a/src/core/drivers/shm/channel_bench.cc b/src/core/drivers/shm/channel_bench.cc index ef18884a..0478c2c5 100644 --- a/src/core/drivers/shm/channel_bench.cc +++ b/src/core/drivers/shm/channel_bench.cc @@ -107,29 +107,30 @@ void stack_loop(thread_conf *conf) { } continue; } + for (size_t i = 0; i < 100; i++) { + buf = channel->MsgBufAlloc(); + if (buf == nullptr) { + continue; + } - buf = channel->MsgBufAlloc(); - if (buf == nullptr) { - continue; - } - - CHECK_NOTNULL(buf->append(tx_buffer.size())); - - juggler::utils::Copy(buf->head_data(), tx_buffer.data(), buf->length()); - buf->set_src_ip(kDummyIp); - buf->set_src_port(kDummyPort); - buf->set_dst_ip(kDummyIp); - buf->set_dst_port(kDummyPort); - buf->mark_first(); - buf->mark_last(); - // TODO(ilias): Copy some data. - // Send the message. - ret = channel->EnqueueMessages(&buf, 1); - if (ret != 1) { - LOG(ERROR) << "Couldn't enqueue message. ret: " << ret; - channel->MsgBufFree(buf); + CHECK_NOTNULL(buf->append(tx_buffer.size())); + + juggler::utils::Copy(buf->head_data(), tx_buffer.data(), buf->length()); + buf->set_src_ip(kDummyIp); + buf->set_src_port(kDummyPort); + buf->set_dst_ip(kDummyIp); + buf->set_dst_port(kDummyPort); + buf->mark_first(); + buf->mark_last(); + // TODO(ilias): Copy some data. + // Send the message. + ret = channel->EnqueueMessages(&buf, 1); + if (ret != 1) { + channel->MsgBufFree(buf); + } + conf->messages_sent += ret; } - conf->messages_sent += ret; + fprintf(stderr, "Msgs_sent: %lu\n", conf->messages_sent); } // Calculate the duration in nanoseconds. auto end = std::chrono::high_resolution_clock::now(); diff --git a/src/ext/machnet.c b/src/ext/machnet.c index 236b08a3..d7ccaf32 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -662,12 +662,19 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) { const uint32_t kBufferBatchSize = 16; - sem_wait(&ctx->sem); + // sem_wait(&ctx->sem); // Deque a message from the ring. MachnetRingSlot_t buffer_index; uint32_t n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); - if (n != 1) return 0; // No message available. - + // if (n != 1) return 0; // No message available. + if (n == 0) { + ctx->receiver_active = 0; + fprintf(stderr, "No msgs in ring. sleeping...\n"); + sem_wait(&ctx->sem); + n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); + assert(n == 1); + ctx->receiver_active = 1; + } MachnetMsgBuf_t *buffer; buffer = __machnet_channel_buf(ctx, buffer_index); MachnetFlow_t flow_info = buffer->flow; diff --git a/src/ext/machnet_common.h b/src/ext/machnet_common.h index c9f2bb40..aeafdf25 100644 --- a/src/ext/machnet_common.h +++ b/src/ext/machnet_common.h @@ -139,6 +139,7 @@ struct MachnetChannelCtx { uint16_t version; uint64_t size; // Size of the Channel's memory, including this context. sem_t sem; // to implement blocking recv + uint32_t receiver_active; #define MACHNET_CHANNEL_NAME_MAX_LEN 256 char name[MACHNET_CHANNEL_NAME_MAX_LEN]; MachnetChannelCtrlCtx_t ctrl_ctx; // Control channel's specific metadata. diff --git a/src/ext/machnet_private.h b/src/ext/machnet_private.h index c7b24213..fc0e0686 100644 --- a/src/ext/machnet_private.h +++ b/src/ext/machnet_private.h @@ -155,6 +155,7 @@ static inline int __machnet_channel_dataplane_init( ctx->version = MACHNET_CHANNEL_VERSION; ctx->size = total_size; sem_init(&ctx->sem, 0, 0); + ctx->receiver_active = 1; strncpy(ctx->name, name, sizeof(ctx->name)); ctx->name[sizeof(ctx->name) - 1] = '\0'; diff --git a/src/include/channel.h b/src/include/channel.h index 2c244fc0..5fecc468 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -177,8 +177,11 @@ class ShmChannel { auto ret = __machnet_channel_machnet_ring_enqueue(ctx_, nb_msgs, msgbuf_indices); if (ret != 0) { - if (sem_post(__DECONST(sem_t *, &ctx_->sem)) < 0) { - fprintf(stderr, "Couldn't notify app side in case of blocking\n"); + if (!ctx_->receiver_active) { + fprintf(stderr, "Notify app side\n"); + if (sem_post(__DECONST(sem_t *, &ctx_->sem)) < 0) { + fprintf(stderr, "Couldn't notify app side in case of blocking\n"); + } } } return ret; From 66c3517c7801c3ca8e852c23760dc6e54793a587 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Sat, 11 Nov 2023 15:15:11 -0500 Subject: [PATCH 03/33] implement blocking recv * Fullly working version * Temporary `posted` variable added to track number of `sem_post` calls; to be removed upon PR --- src/core/drivers/shm/channel.cc | 4 ++- src/core/drivers/shm/channel_bench.cc | 46 ++++++++++++--------------- src/ext/machnet.c | 4 +-- src/ext/machnet_common.h | 2 +- src/include/channel.h | 9 ++++-- 5 files changed, 32 insertions(+), 33 deletions(-) diff --git a/src/core/drivers/shm/channel.cc b/src/core/drivers/shm/channel.cc index 3e9e82e5..c7e14aeb 100644 --- a/src/core/drivers/shm/channel.cc +++ b/src/core/drivers/shm/channel.cc @@ -21,12 +21,14 @@ ShmChannel::ShmChannel(const std::string channel_name, channel_fd_(channel_fd), cached_buf_indices(), cached_bufs(), - cached_buf_count(0) {} + cached_buf_count(0), + posted(0) {} ShmChannel::~ShmChannel() { __machnet_channel_destroy( const_cast(reinterpret_cast(ctx_)), mem_size_, &channel_fd_, is_posix_shm_, name_.c_str()); + LOG(INFO) << "Notified application side " << posted << " times"; } Channel::Channel(const std::string &channel_name, diff --git a/src/core/drivers/shm/channel_bench.cc b/src/core/drivers/shm/channel_bench.cc index 0478c2c5..ab048981 100644 --- a/src/core/drivers/shm/channel_bench.cc +++ b/src/core/drivers/shm/channel_bench.cc @@ -107,30 +107,27 @@ void stack_loop(thread_conf *conf) { } continue; } - for (size_t i = 0; i < 100; i++) { - buf = channel->MsgBufAlloc(); - if (buf == nullptr) { - continue; - } + buf = channel->MsgBufAlloc(); + if (buf == nullptr) { + continue; + } - CHECK_NOTNULL(buf->append(tx_buffer.size())); - - juggler::utils::Copy(buf->head_data(), tx_buffer.data(), buf->length()); - buf->set_src_ip(kDummyIp); - buf->set_src_port(kDummyPort); - buf->set_dst_ip(kDummyIp); - buf->set_dst_port(kDummyPort); - buf->mark_first(); - buf->mark_last(); - // TODO(ilias): Copy some data. - // Send the message. - ret = channel->EnqueueMessages(&buf, 1); - if (ret != 1) { - channel->MsgBufFree(buf); - } - conf->messages_sent += ret; + CHECK_NOTNULL(buf->append(tx_buffer.size())); + + juggler::utils::Copy(buf->head_data(), tx_buffer.data(), buf->length()); + buf->set_src_ip(kDummyIp); + buf->set_src_port(kDummyPort); + buf->set_dst_ip(kDummyIp); + buf->set_dst_port(kDummyPort); + buf->mark_first(); + buf->mark_last(); + // TODO(ilias): Copy some data. + // Send the message. + ret = channel->EnqueueMessages(&buf, 1); + if (ret != 1) { + channel->MsgBufFree(buf); } - fprintf(stderr, "Msgs_sent: %lu\n", conf->messages_sent); + conf->messages_sent += ret; } // Calculate the duration in nanoseconds. auto end = std::chrono::high_resolution_clock::now(); @@ -160,7 +157,6 @@ void application_loop(thread_conf *conf) { while (!g_start.load()) { __asm__ volatile("pause" ::: "memory"); } - // Now start receiving messages. auto start = std::chrono::high_resolution_clock::now(); while (!g_should_stop.load()) { @@ -306,8 +302,8 @@ int main() { const uint64_t kTxMessageSize = 64; std::vector> exp_config_vec; - exp_config_vec.emplace_back(kMessagesToSend, 0); // Stack -> app only - exp_config_vec.emplace_back(0, kMessagesToSend); // App -> stack only + // exp_config_vec.emplace_back(kMessagesToSend, 0); // Stack -> app only + // exp_config_vec.emplace_back(0, kMessagesToSend); // App -> stack only exp_config_vec.emplace_back(kMessagesToSend, kMessagesToSend); // Bi-dir LOG(INFO) << "Running channel_bench"; diff --git a/src/ext/machnet.c b/src/ext/machnet.c index d7ccaf32..ac41cec8 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -668,12 +668,10 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) { uint32_t n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); // if (n != 1) return 0; // No message available. if (n == 0) { - ctx->receiver_active = 0; - fprintf(stderr, "No msgs in ring. sleeping...\n"); + __atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST); sem_wait(&ctx->sem); n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); assert(n == 1); - ctx->receiver_active = 1; } MachnetMsgBuf_t *buffer; buffer = __machnet_channel_buf(ctx, buffer_index); diff --git a/src/ext/machnet_common.h b/src/ext/machnet_common.h index aeafdf25..9bed3d42 100644 --- a/src/ext/machnet_common.h +++ b/src/ext/machnet_common.h @@ -139,7 +139,7 @@ struct MachnetChannelCtx { uint16_t version; uint64_t size; // Size of the Channel's memory, including this context. sem_t sem; // to implement blocking recv - uint32_t receiver_active; + volatile uint32_t receiver_active; #define MACHNET_CHANNEL_NAME_MAX_LEN 256 char name[MACHNET_CHANNEL_NAME_MAX_LEN]; MachnetChannelCtrlCtx_t ctrl_ctx; // Control channel's specific metadata. diff --git a/src/include/channel.h b/src/include/channel.h index 5fecc468..0a4c0fdb 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -177,11 +177,12 @@ class ShmChannel { auto ret = __machnet_channel_machnet_ring_enqueue(ctx_, nb_msgs, msgbuf_indices); if (ret != 0) { - if (!ctx_->receiver_active) { - fprintf(stderr, "Notify app side\n"); + if (!__atomic_load_n(&ctx_->receiver_active, __ATOMIC_SEQ_CST)) { + *__DECONST(uint32_t *, &ctx_->receiver_active) = 1; if (sem_post(__DECONST(sem_t *, &ctx_->sem)) < 0) { fprintf(stderr, "Couldn't notify app side in case of blocking\n"); } + posted++; } } return ret; @@ -339,7 +340,8 @@ class ShmChannel { auto ret = MsgBufBulkFree(batch->buf_indices(), batch->GetSize()); - if (ret == 0) [[unlikely]] return false; // NOLINT + if (ret == 0) [[unlikely]] + return false; // NOLINT batch->Clear(); return true; } @@ -386,6 +388,7 @@ class ShmChannel { std::array cached_buf_indices; std::array cached_bufs; uint32_t cached_buf_count; + uint32_t posted; }; /** From 36d4400dbfdea7a65f0bc70b78c5452ddc79301b Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Sat, 11 Nov 2023 16:24:57 -0500 Subject: [PATCH 04/33] change machnet_recv API Added `blocking` argument to recv related calls, when set(1) calling thread will block after a message available to receive --- src/apps/msg_gen/main.cc | 4 ++-- src/core/drivers/shm/channel.cc | 1 - src/core/drivers/shm/channel_bench.cc | 26 ++++++++++++++++++-------- src/core/drivers/shm/channel_test.cc | 4 ++-- src/core/flow_test.cc | 8 +++++--- src/ext/machnet.c | 8 +++++--- src/ext/machnet.h | 9 +++++++-- src/ext/machnet_test.cc | 8 ++++---- src/include/channel.h | 3 +++ 9 files changed, 46 insertions(+), 25 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index d2f1b809..51a106e5 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -171,7 +171,7 @@ void ServerLoop(void *channel_ctx) { MachnetFlow_t rx_flow; const ssize_t rx_size = machnet_recv(channel_ctx, thread_ctx.rx_message.data(), - thread_ctx.rx_message.size(), &rx_flow); + thread_ctx.rx_message.size(), &rx_flow, NON_BLOCKING); if (rx_size <= 0) continue; stats_cur.rx_count++; stats_cur.rx_bytes += rx_size; @@ -247,7 +247,7 @@ uint64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { MachnetFlow_t rx_flow; const ssize_t rx_size = machnet_recv(channel_ctx, thread_ctx->rx_message.data(), - thread_ctx->rx_message.size(), &rx_flow); + thread_ctx->rx_message.size(), &rx_flow, NON_BLOCKING); if (rx_size <= 0) continue; thread_ctx->stats.current.rx_count++; diff --git a/src/core/drivers/shm/channel.cc b/src/core/drivers/shm/channel.cc index c7e14aeb..2c6d08bc 100644 --- a/src/core/drivers/shm/channel.cc +++ b/src/core/drivers/shm/channel.cc @@ -28,7 +28,6 @@ ShmChannel::~ShmChannel() { __machnet_channel_destroy( const_cast(reinterpret_cast(ctx_)), mem_size_, &channel_fd_, is_posix_shm_, name_.c_str()); - LOG(INFO) << "Notified application side " << posted << " times"; } Channel::Channel(const std::string &channel_name, diff --git a/src/core/drivers/shm/channel_bench.cc b/src/core/drivers/shm/channel_bench.cc index ab048981..6b0407e5 100644 --- a/src/core/drivers/shm/channel_bench.cc +++ b/src/core/drivers/shm/channel_bench.cc @@ -4,13 +4,15 @@ #include #include #include -#include -#include #include #include +#include #include #include +#include + +DEFINE_uint32(blocking, 0, "Block on receive"); static constexpr uint8_t kStackCpuCoreId = 3; static constexpr uint8_t kAppCpuCoreId = 5; @@ -39,7 +41,7 @@ struct thread_conf { thread_conf(std::shared_ptr ch, uint8_t core_id, uint64_t messages_to_send, uint64_t tx_message_size, uint64_t messages_to_receive) - : channel(ch), + : channel(std::move(ch)), cpu_core(core_id), messages_to_send(messages_to_send), tx_message_size(tx_message_size), @@ -163,8 +165,8 @@ void application_loop(thread_conf *conf) { // RX. MachnetFlow_t flow; - auto nbytes = - machnet_recv(channel->ctx(), rx_buffer.data(), rx_buffer.size(), &flow); + auto nbytes = machnet_recv(channel->ctx(), rx_buffer.data(), + rx_buffer.size(), &flow, FLAGS_blocking); if (nbytes > 0) { conf->messages_received++; CHECK_EQ(nbytes, conf->tx_message_size); @@ -281,11 +283,18 @@ void print_results(const thread_conf &stack_conf, const thread_conf &app_conf) { 1e9)) : 0.0) << std::endl; + if (FLAGS_blocking) { + std::cout << juggler::utils::Format( + "Stack notified Application side %d times", channel->GetPosted()); + channel->ResetPosted(); + } std::cout << std::endl; } -int main() { +int main(int argc, char *argv[]) { google::InitGoogleLogging("channel_bench"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + FLAGS_logtostderr = 1; signal(SIGINT, [](int) { g_should_stop.store(true); }); @@ -302,8 +311,9 @@ int main() { const uint64_t kTxMessageSize = 64; std::vector> exp_config_vec; - // exp_config_vec.emplace_back(kMessagesToSend, 0); // Stack -> app only - // exp_config_vec.emplace_back(0, kMessagesToSend); // App -> stack only + exp_config_vec.emplace_back(kMessagesToSend, 0); // Stack -> app only + if (!FLAGS_blocking) + exp_config_vec.emplace_back(0, kMessagesToSend); // App -> stack only exp_config_vec.emplace_back(kMessagesToSend, kMessagesToSend); // Bi-dir LOG(INFO) << "Running channel_bench"; diff --git a/src/core/drivers/shm/channel_test.cc b/src/core/drivers/shm/channel_test.cc index 6ca481a2..0b73504d 100644 --- a/src/core/drivers/shm/channel_test.cc +++ b/src/core/drivers/shm/channel_test.cc @@ -212,7 +212,7 @@ TEST(BasicChannelTest, ChannelEnqueue) { rx_msghdr.msg_iov = &rx_iov; rx_msghdr.msg_iovlen = 1; - EXPECT_EQ(machnet_recvmsg(channel->ctx(), &rx_msghdr), 1); + EXPECT_EQ(machnet_recvmsg(channel->ctx(), &rx_msghdr, NON_BLOCKING), 1); EXPECT_EQ(rx_msghdr.msg_size, kMessageSize); EXPECT_EQ(rx_msg, tx_msg); } @@ -396,7 +396,7 @@ TEST(ChannelFullDuplex, SendRecvMsg) { msghdr.flags = 0; msghdr.flow_info = { .src_ip = 0, .dst_ip = 0, .src_port = 0, .dst_port = 0}; - auto ret = machnet_recvmsg(ctx, &msghdr); + auto ret = machnet_recvmsg(ctx, &msghdr, NON_BLOCKING); if (ret == 1) msg_rx++; // If already sent the amount of messages needed skip. diff --git a/src/core/flow_test.cc b/src/core/flow_test.cc index 3ce77169..6a88e00e 100644 --- a/src/core/flow_test.cc +++ b/src/core/flow_test.cc @@ -124,6 +124,8 @@ class FlowTest : public ::testing::Test { num_msgbufs -= msgbuf_nr; batch.Clear(); } + CHECK_NOTNULL(head); + CHECK_NOTNULL(tail); head->set_msg_length(data.size()); head->set_last(tail->index()); head->mark_first(); @@ -317,7 +319,7 @@ TEST_F(FlowTest, RXQueue_Push) { rx_msghdr.flow_info = {0, 0, 0, 0}; rx_msghdr.msg_iov = &rx_iov; rx_msghdr.msg_iovlen = 1; - auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr); + auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Failed to deliver message to application"; EXPECT_EQ(tx_message, rx_message); EXPECT_EQ(channel_->GetFreeBufCount(), channel_->GetTotalBufCount()); @@ -397,7 +399,7 @@ TEST_F(FlowTest, RXQueue_Push_OutOfOrder1) { rx_msghdr.flow_info = {0, 0, 0, 0}; rx_msghdr.msg_iov = &rx_iov; rx_msghdr.msg_iovlen = 1; - auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr); + auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Failed to deliver message to application"; EXPECT_EQ(tx_message, rx_message); EXPECT_EQ(channel_->GetFreeBufCount(), channel_->GetTotalBufCount()); @@ -496,7 +498,7 @@ TEST_F(FlowTest, RXQueue_Push_OutOfOrder2) { rx_msghdr.flow_info = {0, 0, 0, 0}; rx_msghdr.msg_iov = &rx_iov; rx_msghdr.msg_iovlen = 1; - auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr); + auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Failed to deliver message to application"; EXPECT_EQ(tx_message, rx_message); EXPECT_EQ(channel_->GetFreeBufCount(), channel_->GetTotalBufCount()); diff --git a/src/ext/machnet.c b/src/ext/machnet.c index ac41cec8..e511b119 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -640,7 +640,7 @@ int machnet_sendmmsg(const void *channel_ctx, } ssize_t machnet_recv(const void *channel_ctx, void *buf, size_t len, - MachnetFlow_t *flow) { + MachnetFlow_t *flow, uint32_t blocking) { MachnetMsgHdr_t msghdr; MachnetIovec_t iov; iov.base = buf; @@ -648,14 +648,15 @@ ssize_t machnet_recv(const void *channel_ctx, void *buf, size_t len, msghdr.msg_iov = &iov; msghdr.msg_iovlen = 1; - const int ret = machnet_recvmsg(channel_ctx, &msghdr); + const int ret = machnet_recvmsg(channel_ctx, &msghdr, blocking); if (ret <= 0) return ret; // No message available, or error code *flow = msghdr.flow_info; return msghdr.msg_size; } -int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) { +int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, + uint32_t blocking) { assert(channel_ctx != NULL); assert(msghdr != NULL); MachnetChannelCtx_t *ctx = (MachnetChannelCtx_t *)channel_ctx; @@ -668,6 +669,7 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) { uint32_t n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); // if (n != 1) return 0; // No message available. if (n == 0) { + if (!blocking) return 0; __atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST); sem_wait(&ctx->sem); n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); diff --git a/src/ext/machnet.h b/src/ext/machnet.h index 831e2798..ba2f8ad8 100644 --- a/src/ext/machnet.h +++ b/src/ext/machnet.h @@ -18,6 +18,9 @@ extern "C" { #include "machnet_common.h" +#define BLOCKING 1 +#define NON_BLOCKING 0 + /** * @brief Descriptor for SG data that constitute a message. * @@ -158,12 +161,13 @@ int machnet_sendmmsg(const void *channel_ctx, * @param[out] buf The data buffer to receive the message * @param[in] len The length of \p buf in bytes * @param[out] flow The flow information of the sender + * @param blocking The flag to block and wait for a message to arrive * * @return 0 if no message is available, -1 on failure, otherwise the number of * bytes received. */ ssize_t machnet_recv(const void *channel_ctx, void *buf, size_t len, - MachnetFlow_t *flow); + MachnetFlow_t *flow, uint32_t blocking); /** * This function receives a pending message (destined to the application) from @@ -182,7 +186,8 @@ ssize_t machnet_recv(const void *channel_ctx, void *buf, size_t len, * @return 0 if no pending message, 1 if a message is * received, -1 on failure */ -int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr); +int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, + uint32_t blocking); #ifdef __cplusplus } diff --git a/src/ext/machnet_test.cc b/src/ext/machnet_test.cc index 6d7cb672..bd130f1c 100644 --- a/src/ext/machnet_test.cc +++ b/src/ext/machnet_test.cc @@ -270,7 +270,7 @@ TEST(MachnetTest, SimpleSendRecvMsg) { iov.len = recv_data.size(); msghdr.msg_size = 0; msghdr.flow_info = {.src_ip = 0, .dst_ip = 0, .src_port = 0, .dst_port = 0}; - EXPECT_EQ(machnet_recvmsg(g_channel_ctx, &msghdr), 1); + EXPECT_EQ(machnet_recvmsg(g_channel_ctx, &msghdr, NON_BLOCKING), 1); EXPECT_EQ(msghdr.msg_size, orig_data.size()); EXPECT_EQ(recv_data, orig_data); EXPECT_EQ(msghdr.flow_info.src_ip, UINT32_MAX); @@ -310,7 +310,7 @@ TEST(MachnetTest, MultiBufferSendRecvMsg) { prepare_segments(msg_size, 1, &rx_msg_data); prepare_rx_msg(&rx_iov, &rx_msghdr, &rx_msg_data, msg_size); - ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr); + ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Msg size: " << msg_size; EXPECT_EQ(rx_msg_data, tx_msg_data) << "Msg size: " << msg_size; EXPECT_EQ(memcmp(&rx_msghdr.flow_info, &flow, sizeof(flow)), 0); @@ -353,7 +353,7 @@ TEST(MachnetTest, MultiBufferSGSendRecvMsg) { MachnetMsgHdr_t rx_msghdr; prepare_rx_msg(&rx_iov, &rx_msghdr, &rx_segments, msg_size); - ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr); + ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, 1) << "Msg size: " << msg_size; auto flatten_msg = [](std::vector> *segments) { @@ -431,7 +431,7 @@ TEST(MachnetTest, InvalidSendRecv) { MachnetMsgHdr_t rx_msghdr; prepare_rx_msg(&rx_iov, &rx_msghdr, &rx_segments, msg_size / 2); - ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr); + ret = machnet_recvmsg(g_channel_ctx, &rx_msghdr, NON_BLOCKING); EXPECT_EQ(ret, -1) << "Msg size: " << msg_size; EXPECT_TRUE(check_buffer_pool(g_channel_ctx)); } diff --git a/src/include/channel.h b/src/include/channel.h index 0a4c0fdb..df5b467d 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -379,6 +379,9 @@ class ShmChannel { return ret; } + [[nodiscard]] uint32_t GetPosted() const { return posted; } + void ResetPosted() { posted = 0; } + private: const std::string name_; const MachnetChannelCtx_t *ctx_; From d4e8d1b04eb768ff1f114cb6d7148ce81cad8c8a Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Sat, 11 Nov 2023 16:32:53 -0500 Subject: [PATCH 05/33] remove unnecessary code parts --- src/ext/machnet.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ext/machnet.c b/src/ext/machnet.c index e511b119..fe327c01 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -663,11 +663,9 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, const uint32_t kBufferBatchSize = 16; - // sem_wait(&ctx->sem); // Deque a message from the ring. MachnetRingSlot_t buffer_index; uint32_t n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); - // if (n != 1) return 0; // No message available. if (n == 0) { if (!blocking) return 0; __atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST); From d1d79292b70972c43b38f29d5c5f87afada19fd9 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 17:23:10 -0500 Subject: [PATCH 06/33] make operation atomic --- src/include/channel.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/include/channel.h b/src/include/channel.h index df5b467d..54c7acd7 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -178,7 +178,8 @@ class ShmChannel { __machnet_channel_machnet_ring_enqueue(ctx_, nb_msgs, msgbuf_indices); if (ret != 0) { if (!__atomic_load_n(&ctx_->receiver_active, __ATOMIC_SEQ_CST)) { - *__DECONST(uint32_t *, &ctx_->receiver_active) = 1; + __atomic_store_n(__DECONST(uint32_t *, &ctx_->receiver_active), 1, + __ATOMIC_SEQ_CST); if (sem_post(__DECONST(sem_t *, &ctx_->sem)) < 0) { fprintf(stderr, "Couldn't notify app side in case of blocking\n"); } From c139ea9770eec5dff8325ca8747e7c8603d8aec1 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 17:23:27 -0500 Subject: [PATCH 07/33] msg_gen blocking recv support --- src/apps/msg_gen/main.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 51a106e5..127b7d9b 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -32,6 +32,7 @@ DEFINE_bool(active_generator, false, "When 'true' this host is generating the traffic, otherwise it is " "bouncing."); DEFINE_bool(verify, false, "Verify payload of received messages."); +DEFINE_uint32(blocking, 0, "Blocking receive"); static volatile int g_keep_running = 1; @@ -171,7 +172,7 @@ void ServerLoop(void *channel_ctx) { MachnetFlow_t rx_flow; const ssize_t rx_size = machnet_recv(channel_ctx, thread_ctx.rx_message.data(), - thread_ctx.rx_message.size(), &rx_flow, NON_BLOCKING); + thread_ctx.rx_message.size(), &rx_flow, FLAGS_blocking); if (rx_size <= 0) continue; stats_cur.rx_count++; stats_cur.rx_bytes += rx_size; @@ -247,7 +248,7 @@ uint64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { MachnetFlow_t rx_flow; const ssize_t rx_size = machnet_recv(channel_ctx, thread_ctx->rx_message.data(), - thread_ctx->rx_message.size(), &rx_flow, NON_BLOCKING); + thread_ctx->rx_message.size(), &rx_flow, FLAGS_blocking); if (rx_size <= 0) continue; thread_ctx->stats.current.rx_count++; From ec29287e25e7e3da747416b6a54dd1585b7327f0 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 17:45:16 -0500 Subject: [PATCH 08/33] replace if by while --- src/ext/machnet.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ext/machnet.c b/src/ext/machnet.c index fe327c01..91a6f4d2 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -666,7 +666,7 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, // Deque a message from the ring. MachnetRingSlot_t buffer_index; uint32_t n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); - if (n == 0) { + while (n == 0) { if (!blocking) return 0; __atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST); sem_wait(&ctx->sem); From e4da3cf4ed56359fe7d9bc127aa19d6516721f00 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 18:01:24 -0500 Subject: [PATCH 09/33] test after setting inactive --- src/ext/machnet.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/ext/machnet.c b/src/ext/machnet.c index 91a6f4d2..cbffe012 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -669,9 +669,12 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, while (n == 0) { if (!blocking) return 0; __atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST); - sem_wait(&ctx->sem); n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); - assert(n == 1); + if (n == 0) { + sem_wait(&ctx->sem); + n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); + assert(n == 1); + } } MachnetMsgBuf_t *buffer; buffer = __machnet_channel_buf(ctx, buffer_index); From 581bec6d8c26e0c05b8ee17b78ae1771c6bf1628 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 18:07:31 -0500 Subject: [PATCH 10/33] adding log message --- src/ext/machnet.c | 1 + src/include/channel.h | 1 + 2 files changed, 2 insertions(+) diff --git a/src/ext/machnet.c b/src/ext/machnet.c index cbffe012..334146f8 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -671,6 +671,7 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, __atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST); n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); if (n == 0) { + fprintf(stderr, "recv: machnet_ring empty, going to sleep....\n"); sem_wait(&ctx->sem); n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); assert(n == 1); diff --git a/src/include/channel.h b/src/include/channel.h index 54c7acd7..70a2a4f8 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -183,6 +183,7 @@ class ShmChannel { if (sem_post(__DECONST(sem_t *, &ctx_->sem)) < 0) { fprintf(stderr, "Couldn't notify app side in case of blocking\n"); } + fprintf(stderr, "enqueue: notified app side ...\n"); posted++; } } From 54452e02308616490c4f91fea94a43567545e91d Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 18:22:53 -0500 Subject: [PATCH 11/33] init sem 1 --- src/ext/machnet_private.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ext/machnet_private.h b/src/ext/machnet_private.h index fc0e0686..2090009f 100644 --- a/src/ext/machnet_private.h +++ b/src/ext/machnet_private.h @@ -154,7 +154,7 @@ static inline int __machnet_channel_dataplane_init( MachnetChannelCtx_t *ctx = (MachnetChannelCtx_t *)shm; ctx->version = MACHNET_CHANNEL_VERSION; ctx->size = total_size; - sem_init(&ctx->sem, 0, 0); + sem_init(&ctx->sem, 0, 1); ctx->receiver_active = 1; strncpy(ctx->name, name, sizeof(ctx->name)); ctx->name[sizeof(ctx->name) - 1] = '\0'; From 56e70c0e450c7d9a146db6bdb2a1189df33cb2ac Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 18:32:30 -0500 Subject: [PATCH 12/33] debug --- src/ext/machnet_private.h | 2 +- src/include/channel.h | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ext/machnet_private.h b/src/ext/machnet_private.h index 2090009f..fc0e0686 100644 --- a/src/ext/machnet_private.h +++ b/src/ext/machnet_private.h @@ -154,7 +154,7 @@ static inline int __machnet_channel_dataplane_init( MachnetChannelCtx_t *ctx = (MachnetChannelCtx_t *)shm; ctx->version = MACHNET_CHANNEL_VERSION; ctx->size = total_size; - sem_init(&ctx->sem, 0, 1); + sem_init(&ctx->sem, 0, 0); ctx->receiver_active = 1; strncpy(ctx->name, name, sizeof(ctx->name)); ctx->name[sizeof(ctx->name) - 1] = '\0'; diff --git a/src/include/channel.h b/src/include/channel.h index 70a2a4f8..09fd94d2 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -180,10 +180,15 @@ class ShmChannel { if (!__atomic_load_n(&ctx_->receiver_active, __ATOMIC_SEQ_CST)) { __atomic_store_n(__DECONST(uint32_t *, &ctx_->receiver_active), 1, __ATOMIC_SEQ_CST); + int value; + sem_getvalue(__DECONST(sem_t *, &ctx_->sem), &value); + fprintf(stderr, "semaphore's prev value %d\n", value); if (sem_post(__DECONST(sem_t *, &ctx_->sem)) < 0) { fprintf(stderr, "Couldn't notify app side in case of blocking\n"); } fprintf(stderr, "enqueue: notified app side ...\n"); + sem_getvalue(__DECONST(sem_t *, &ctx_->sem), &value); + fprintf(stderr, "semaphore's next value %d\n", value); posted++; } } From 85c4c0b589206511bf117d7c7f33b3ebb93dc813 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 18:34:54 -0500 Subject: [PATCH 13/33] add debug for sem value --- src/ext/machnet.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ext/machnet.c b/src/ext/machnet.c index 334146f8..a21b8a1e 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -671,6 +671,9 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, __atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST); n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); if (n == 0) { + int value; + sem_getvalue(&ctx->sem, &value); + fprintf(stderr, "semaphore's value %d\n", value); fprintf(stderr, "recv: machnet_ring empty, going to sleep....\n"); sem_wait(&ctx->sem); n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); From 597963dab02744bad562a86666a3d662ba34d91d Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 18:45:42 -0500 Subject: [PATCH 14/33] debug --- src/ext/machnet.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ext/machnet.c b/src/ext/machnet.c index a21b8a1e..c45890bc 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -676,6 +676,7 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, fprintf(stderr, "semaphore's value %d\n", value); fprintf(stderr, "recv: machnet_ring empty, going to sleep....\n"); sem_wait(&ctx->sem); + fprintf(stderr, "recv: woken up ...\n"); n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); assert(n == 1); } From 98556bde5b93f91871d5216a64a87e6f7f09d676 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 18:52:16 -0500 Subject: [PATCH 15/33] make sem shared --- src/ext/machnet_private.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ext/machnet_private.h b/src/ext/machnet_private.h index fc0e0686..ef643aad 100644 --- a/src/ext/machnet_private.h +++ b/src/ext/machnet_private.h @@ -154,7 +154,7 @@ static inline int __machnet_channel_dataplane_init( MachnetChannelCtx_t *ctx = (MachnetChannelCtx_t *)shm; ctx->version = MACHNET_CHANNEL_VERSION; ctx->size = total_size; - sem_init(&ctx->sem, 0, 0); + sem_init(&ctx->sem, 1, 0); ctx->receiver_active = 1; strncpy(ctx->name, name, sizeof(ctx->name)); ctx->name[sizeof(ctx->name) - 1] = '\0'; From 05b3e6816ac0dfe4476ef4f0b17193991596d314 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Mon, 13 Nov 2023 18:59:29 -0500 Subject: [PATCH 16/33] make sem shared --- src/ext/machnet.c | 16 ++++++---------- src/include/channel.h | 6 ------ 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/ext/machnet.c b/src/ext/machnet.c index c45890bc..3e706434 100644 --- a/src/ext/machnet.c +++ b/src/ext/machnet.c @@ -669,18 +669,14 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr, while (n == 0) { if (!blocking) return 0; __atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST); + // n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); + // if (n == 0) { + sem_wait(&ctx->sem); n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); - if (n == 0) { - int value; - sem_getvalue(&ctx->sem, &value); - fprintf(stderr, "semaphore's value %d\n", value); - fprintf(stderr, "recv: machnet_ring empty, going to sleep....\n"); - sem_wait(&ctx->sem); - fprintf(stderr, "recv: woken up ...\n"); - n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index); - assert(n == 1); - } + // assert(n == 1); + // } } + assert(n == 1); MachnetMsgBuf_t *buffer; buffer = __machnet_channel_buf(ctx, buffer_index); MachnetFlow_t flow_info = buffer->flow; diff --git a/src/include/channel.h b/src/include/channel.h index 09fd94d2..54c7acd7 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -180,15 +180,9 @@ class ShmChannel { if (!__atomic_load_n(&ctx_->receiver_active, __ATOMIC_SEQ_CST)) { __atomic_store_n(__DECONST(uint32_t *, &ctx_->receiver_active), 1, __ATOMIC_SEQ_CST); - int value; - sem_getvalue(__DECONST(sem_t *, &ctx_->sem), &value); - fprintf(stderr, "semaphore's prev value %d\n", value); if (sem_post(__DECONST(sem_t *, &ctx_->sem)) < 0) { fprintf(stderr, "Couldn't notify app side in case of blocking\n"); } - fprintf(stderr, "enqueue: notified app side ...\n"); - sem_getvalue(__DECONST(sem_t *, &ctx_->sem), &value); - fprintf(stderr, "semaphore's next value %d\n", value); posted++; } } From 10339c075052389f689191f5b7ea3fc7c7325140 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Tue, 14 Nov 2023 21:53:53 -0500 Subject: [PATCH 17/33] make msg_gen open_loop --- src/apps/msg_gen/main.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 127b7d9b..74d93047 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -236,7 +236,7 @@ void ClientSendOne(ThreadCtx *thread_ctx, uint64_t window_slot) { } // Return the window slot for which a response was received -uint64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { +int64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { const auto *channel_ctx = thread_ctx->channel_ctx; while (true) { @@ -249,7 +249,7 @@ uint64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { const ssize_t rx_size = machnet_recv(channel_ctx, thread_ctx->rx_message.data(), thread_ctx->rx_message.size(), &rx_flow, FLAGS_blocking); - if (rx_size <= 0) continue; + if (rx_size <= 0) return -1; thread_ctx->stats.current.rx_count++; thread_ctx->stats.current.rx_bytes += rx_size; @@ -300,7 +300,11 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { break; } - const uint64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); + int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); + if (rx_window_slot < 0) { + rx_window_slot = ++FLAGS_msg_window; + thread_ctx.msg_latency_info_vec.resize(rx_window_slot); + } ClientSendOne(&thread_ctx, rx_window_slot); ReportStats(&thread_ctx); From 27d4dbe0c0e97f38fd10e6a0c1f78cb50208090c Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Tue, 14 Nov 2023 22:02:17 -0500 Subject: [PATCH 18/33] add log when increasing window size --- src/apps/msg_gen/main.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 74d93047..fff5303f 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -303,6 +303,8 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot < 0) { rx_window_slot = ++FLAGS_msg_window; + LOG(INFO) << "Server busy ... increasing window size to " + << rx_window_slot; thread_ctx.msg_latency_info_vec.resize(rx_window_slot); } ClientSendOne(&thread_ctx, rx_window_slot); From cb47c5af14db4f85694c7d4f1a28930e65104392 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Tue, 14 Nov 2023 22:11:03 -0500 Subject: [PATCH 19/33] add sleep before increasing window_size --- src/apps/msg_gen/main.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index fff5303f..39a66674 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -302,6 +302,7 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot < 0) { + std::this_thread::sleep_for(std::chrono::microseconds(1)); rx_window_slot = ++FLAGS_msg_window; LOG(INFO) << "Server busy ... increasing window size to " << rx_window_slot; From 27123daab7b3390ee2dc449772bf1c8058819a4f Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Tue, 14 Nov 2023 22:13:37 -0500 Subject: [PATCH 20/33] fix window size check --- src/apps/msg_gen/main.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 39a66674..6bd8226f 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -256,7 +256,7 @@ int64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { const auto *msg_hdr = reinterpret_cast(thread_ctx->rx_message.data()); - if (msg_hdr->window_slot >= FLAGS_msg_window) { + if (msg_hdr->window_slot > FLAGS_msg_window) { LOG(ERROR) << "Received invalid window slot: " << msg_hdr->window_slot; continue; } From d30f6208c28ba68059fda0f8951bb5ad81b3ec37 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Tue, 14 Nov 2023 22:15:06 -0500 Subject: [PATCH 21/33] increase sleep duration --- src/apps/msg_gen/main.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 6bd8226f..f7c752c3 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -302,7 +302,7 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot < 0) { - std::this_thread::sleep_for(std::chrono::microseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(500)); rx_window_slot = ++FLAGS_msg_window; LOG(INFO) << "Server busy ... increasing window size to " << rx_window_slot; From 3aa3cbe3384f9c41b88df12985deada4e6551569 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Tue, 14 Nov 2023 22:18:39 -0500 Subject: [PATCH 22/33] rm logging --- src/apps/msg_gen/main.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index f7c752c3..5eeafa82 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -304,8 +304,8 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { if (rx_window_slot < 0) { std::this_thread::sleep_for(std::chrono::microseconds(500)); rx_window_slot = ++FLAGS_msg_window; - LOG(INFO) << "Server busy ... increasing window size to " - << rx_window_slot; + // LOG(INFO) << "Server busy ... increasing window size to " + // << rx_window_slot; thread_ctx.msg_latency_info_vec.resize(rx_window_slot); } ClientSendOne(&thread_ctx, rx_window_slot); From 2aac9f69235190185fc245d84e0edda5d882150f Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Tue, 14 Nov 2023 22:19:44 -0500 Subject: [PATCH 23/33] set sleep --- src/apps/msg_gen/main.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 5eeafa82..a6a5bccf 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -302,7 +302,7 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot < 0) { - std::this_thread::sleep_for(std::chrono::microseconds(500)); + std::this_thread::sleep_for(std::chrono::microseconds(150)); rx_window_slot = ++FLAGS_msg_window; // LOG(INFO) << "Server busy ... increasing window size to " // << rx_window_slot; From 5c3f6efbef645c85eab6e9430c608ba164a472bd Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Tue, 14 Nov 2023 22:47:09 -0500 Subject: [PATCH 24/33] replace sleep by busy poll --- src/apps/msg_gen/main.cc | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index a6a5bccf..affea172 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -302,11 +302,21 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot < 0) { - std::this_thread::sleep_for(std::chrono::microseconds(150)); - rx_window_slot = ++FLAGS_msg_window; + auto next = + std::chrono::steady_clock::now() + std::chrono::milliseconds(150); + while (true) { + rx_window_slot = ClientRecvOneBlocking(&thread_ctx); + if (rx_window_slot > 0) break; + if (std::chrono::steady_clock::now() > next) { + rx_window_slot = ++FLAGS_msg_window; + thread_ctx.msg_latency_info_vec.resize(rx_window_slot); + break; + } + } + // std::this_thread::sleep_for(std::chrono::microseconds(150)); + // LOG(INFO) << "Server busy ... increasing window size to " // << rx_window_slot; - thread_ctx.msg_latency_info_vec.resize(rx_window_slot); } ClientSendOne(&thread_ctx, rx_window_slot); From 13cc7cfb1a5ef7e742d18b8a2708537681e8ffcb Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Tue, 14 Nov 2023 22:51:30 -0500 Subject: [PATCH 25/33] send next window in 70ms --- src/apps/msg_gen/main.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index affea172..a8989219 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -303,7 +303,7 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot < 0) { auto next = - std::chrono::steady_clock::now() + std::chrono::milliseconds(150); + std::chrono::steady_clock::now() + std::chrono::milliseconds(70); while (true) { rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot > 0) break; From 6e0edadcc8ad9a245c1fd059324cf9be6f410c77 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Wed, 15 Nov 2023 12:25:03 -0500 Subject: [PATCH 26/33] add load flag Default to 100k RPS --- src/apps/msg_gen/main.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index a8989219..5f041a78 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -33,6 +33,7 @@ DEFINE_bool(active_generator, false, "bouncing."); DEFINE_bool(verify, false, "Verify payload of received messages."); DEFINE_uint32(blocking, 0, "Blocking receive"); +DEFINE_uint32(load, 100, "kRPS load"); static volatile int g_keep_running = 1; @@ -103,6 +104,7 @@ class ThreadCtx { hdr_histogram *latency_hist; size_t num_request_latency_samples; std::vector msg_latency_info_vec; + std::chrono::milliseconds time_limit; struct { stats_t current; @@ -287,6 +289,8 @@ int64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { void ClientLoop(void *channel_ctx, MachnetFlow *flow) { ThreadCtx thread_ctx(channel_ctx, flow); + thread_ctx.time_limit = std::chrono::duration_cast( + std::chrono::seconds(1) / (FLAGS_load * 1000)); LOG(INFO) << "Client Loop: Starting."; // Send a full window of messages @@ -302,8 +306,7 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot < 0) { - auto next = - std::chrono::steady_clock::now() + std::chrono::milliseconds(70); + auto next = std::chrono::steady_clock::now() + thread_ctx.time_limit; while (true) { rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot > 0) break; From 14b5aad7507dc2e40a8d01ba6a6258ee4a80c57b Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Wed, 15 Nov 2023 12:33:09 -0500 Subject: [PATCH 27/33] add logging --- src/apps/msg_gen/main.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 5f041a78..8a942217 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -292,6 +292,7 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { thread_ctx.time_limit = std::chrono::duration_cast( std::chrono::seconds(1) / (FLAGS_load * 1000)); LOG(INFO) << "Client Loop: Starting."; + LOG(INFO) << "Time limit is: " << thread_ctx.time_limit.count(); // Send a full window of messages for (uint32_t i = 0; i < FLAGS_msg_window; i++) { From e5336752a77813e912130bac01378816531b39c7 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Wed, 15 Nov 2023 13:07:28 -0500 Subject: [PATCH 28/33] improve rate limiting --- src/apps/msg_gen/main.cc | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 8a942217..e8a0ad38 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -299,6 +300,8 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { ClientSendOne(&thread_ctx, i /* window slot */); } + auto next = std::chrono::steady_clock::now() + thread_ctx.time_limit; + std::deque backlog; while (true) { if (g_keep_running == 0) { LOG(INFO) << "MsgGenLoop: Exiting."; @@ -307,23 +310,27 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot < 0) { - auto next = std::chrono::steady_clock::now() + thread_ctx.time_limit; while (true) { rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot > 0) break; if (std::chrono::steady_clock::now() > next) { - rx_window_slot = ++FLAGS_msg_window; - thread_ctx.msg_latency_info_vec.resize(rx_window_slot); - break; + // timeout but no message received yet --> increase window + auto next_window = ++FLAGS_msg_window; + thread_ctx.msg_latency_info_vec.resize(next_window); + backlog.push_back(next_window); + ClientSendOne(&thread_ctx, backlog.front()); + backlog.pop_front(); + next = std::chrono::steady_clock::now() + thread_ctx.time_limit; } } - // std::this_thread::sleep_for(std::chrono::microseconds(150)); - - // LOG(INFO) << "Server busy ... increasing window size to " - // << rx_window_slot; } - ClientSendOne(&thread_ctx, rx_window_slot); - + // msg received, if time limit passed send next msg from backlog + if (std::chrono::steady_clock::now() > next) { + ClientSendOne(&thread_ctx, backlog.front()); + backlog.pop_front(); + next = std::chrono::steady_clock::now() + thread_ctx.time_limit; + } + backlog.push_back(rx_window_slot); ReportStats(&thread_ctx); } From f10a9677084bc5827246cc811772ffb825472dd8 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Wed, 15 Nov 2023 13:15:51 -0500 Subject: [PATCH 29/33] change measure to microseconds --- src/apps/msg_gen/main.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index e8a0ad38..59c26106 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -105,7 +105,7 @@ class ThreadCtx { hdr_histogram *latency_hist; size_t num_request_latency_samples; std::vector msg_latency_info_vec; - std::chrono::milliseconds time_limit; + std::chrono::microseconds time_limit; struct { stats_t current; @@ -290,8 +290,8 @@ int64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { void ClientLoop(void *channel_ctx, MachnetFlow *flow) { ThreadCtx thread_ctx(channel_ctx, flow); - thread_ctx.time_limit = std::chrono::duration_cast( - std::chrono::seconds(1) / (FLAGS_load * 1000)); + thread_ctx.time_limit = std::chrono::duration_cast( + std::chrono::microseconds(1000) / (FLAGS_load)); LOG(INFO) << "Client Loop: Starting."; LOG(INFO) << "Time limit is: " << thread_ctx.time_limit.count(); From bcaf0950eb4c5b4ac2aec63b09d89217a4418eef Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Wed, 15 Nov 2023 14:12:49 -0500 Subject: [PATCH 30/33] handle correct exit --- src/apps/msg_gen/main.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 59c26106..d25f8e98 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -322,6 +322,7 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { backlog.pop_front(); next = std::chrono::steady_clock::now() + thread_ctx.time_limit; } + if (g_keep_running == 0) continue; } } // msg received, if time limit passed send next msg from backlog From ebe93f93aa94028ed55a6179450707b7d2f77aa1 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Wed, 15 Nov 2023 14:24:24 -0500 Subject: [PATCH 31/33] polish --- src/apps/msg_gen/main.cc | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index d25f8e98..3dcc8526 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -302,30 +302,29 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { auto next = std::chrono::steady_clock::now() + thread_ctx.time_limit; std::deque backlog; - while (true) { - if (g_keep_running == 0) { - LOG(INFO) << "MsgGenLoop: Exiting."; - break; - } - int64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx); - if (rx_window_slot < 0) { - while (true) { + while (g_keep_running) { + auto rx_window_slot = ClientRecvOneBlocking(&thread_ctx); + + if (rx_window_slot <= 0) { + // Inner loop to handle the case where no message is received + while (g_keep_running) { rx_window_slot = ClientRecvOneBlocking(&thread_ctx); if (rx_window_slot > 0) break; + if (std::chrono::steady_clock::now() > next) { - // timeout but no message received yet --> increase window + // Handle timeout scenario + next = std::chrono::steady_clock::now() + thread_ctx.time_limit; auto next_window = ++FLAGS_msg_window; thread_ctx.msg_latency_info_vec.resize(next_window); backlog.push_back(next_window); ClientSendOne(&thread_ctx, backlog.front()); backlog.pop_front(); - next = std::chrono::steady_clock::now() + thread_ctx.time_limit; } - if (g_keep_running == 0) continue; } } - // msg received, if time limit passed send next msg from backlog + if (g_keep_running == 0) break; + // Check if the time limit has passed and a message is received if (std::chrono::steady_clock::now() > next) { ClientSendOne(&thread_ctx, backlog.front()); backlog.pop_front(); @@ -334,6 +333,7 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { backlog.push_back(rx_window_slot); ReportStats(&thread_ctx); } + LOG(INFO) << "MsgGenLoop: Exiting."; auto &stats_cur = thread_ctx.stats.current; LOG(INFO) << "Application Statistics (TOTAL) - [TX] Sent: " From 2a6c68e4619146578a05c5366b96bdc647b7c046 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Wed, 15 Nov 2023 17:18:47 -0500 Subject: [PATCH 32/33] minor changes to improve code quality --- src/apps/msg_gen/main.cc | 71 ++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 42 deletions(-) diff --git a/src/apps/msg_gen/main.cc b/src/apps/msg_gen/main.cc index 3dcc8526..1ae637ef 100644 --- a/src/apps/msg_gen/main.cc +++ b/src/apps/msg_gen/main.cc @@ -239,53 +239,40 @@ void ClientSendOne(ThreadCtx *thread_ctx, uint64_t window_slot) { } // Return the window slot for which a response was received -int64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) { +int64_t ClientRecvOne(ThreadCtx *thread_ctx) { const auto *channel_ctx = thread_ctx->channel_ctx; - while (true) { - if (g_keep_running == 0) { - LOG(INFO) << "ClientRecvOneBlocking: Exiting."; - return 0; - } + MachnetFlow_t rx_flow; + const ssize_t rx_size = + machnet_recv(channel_ctx, thread_ctx->rx_message.data(), + thread_ctx->rx_message.size(), &rx_flow, FLAGS_blocking); + if (rx_size <= 0) return -1; - MachnetFlow_t rx_flow; - const ssize_t rx_size = - machnet_recv(channel_ctx, thread_ctx->rx_message.data(), - thread_ctx->rx_message.size(), &rx_flow, FLAGS_blocking); - if (rx_size <= 0) return -1; - - thread_ctx->stats.current.rx_count++; - thread_ctx->stats.current.rx_bytes += rx_size; - - const auto *msg_hdr = - reinterpret_cast(thread_ctx->rx_message.data()); - if (msg_hdr->window_slot > FLAGS_msg_window) { - LOG(ERROR) << "Received invalid window slot: " << msg_hdr->window_slot; - continue; - } + thread_ctx->stats.current.rx_count++; + thread_ctx->stats.current.rx_bytes += rx_size; - const size_t latency_us = - thread_ctx->RecordRequestEnd(msg_hdr->window_slot); - VLOG(1) << "Client: Received message for window slot " - << msg_hdr->window_slot << " in " << latency_us << " us"; - - if (FLAGS_verify) { - for (uint32_t i = sizeof(msg_hdr_t); i < rx_size; i++) { - if (thread_ctx->rx_message[i] != thread_ctx->message_gold[i]) { - LOG(ERROR) << "Message data mismatch at index " << i << std::hex - << " " << static_cast(thread_ctx->rx_message[i]) - << " " - << static_cast(thread_ctx->message_gold[i]); - break; - } + const auto *msg_hdr = + reinterpret_cast(thread_ctx->rx_message.data()); + if (msg_hdr->window_slot > FLAGS_msg_window) { + LOG(ERROR) << "Received invalid window slot: " << msg_hdr->window_slot; + abort(); + } + + const size_t latency_us = thread_ctx->RecordRequestEnd(msg_hdr->window_slot); + VLOG(1) << "Client: Received message for window slot " << msg_hdr->window_slot + << " in " << latency_us << " us"; + + if (FLAGS_verify) { + for (uint32_t i = sizeof(msg_hdr_t); i < rx_size; i++) { + if (thread_ctx->rx_message[i] != thread_ctx->message_gold[i]) { + LOG(ERROR) << "Message data mismatch at index " << i << std::hex << " " + << static_cast(thread_ctx->rx_message[i]) << " " + << static_cast(thread_ctx->message_gold[i]); + break; } } - - return msg_hdr->window_slot; } - - LOG(FATAL) << "Should not reach here"; - return 0; + return msg_hdr->window_slot; } void ClientLoop(void *channel_ctx, MachnetFlow *flow) { @@ -304,12 +291,12 @@ void ClientLoop(void *channel_ctx, MachnetFlow *flow) { std::deque backlog; while (g_keep_running) { - auto rx_window_slot = ClientRecvOneBlocking(&thread_ctx); + auto rx_window_slot = ClientRecvOne(&thread_ctx); if (rx_window_slot <= 0) { // Inner loop to handle the case where no message is received while (g_keep_running) { - rx_window_slot = ClientRecvOneBlocking(&thread_ctx); + rx_window_slot = ClientRecvOne(&thread_ctx); if (rx_window_slot > 0) break; if (std::chrono::steady_clock::now() > next) { From 5c1b94f0f2d7581aa95c23f38b87524627a1a098 Mon Sep 17 00:00:00 2001 From: vjabrayilov Date: Thu, 16 Nov 2023 16:00:28 -0500 Subject: [PATCH 33/33] increase default ring size To accommodate CPU pinning experiments --- src/include/channel.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/include/channel.h b/src/include/channel.h index 54c7acd7..8d631b06 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -527,7 +527,7 @@ template