-
Notifications
You must be signed in to change notification settings - Fork 5
Open
Description
The code excerpt below triggers an assertion failure. The error messages are related to eventfd.
ERROR: tll.channel.reader_channel: Failed to read from eventfd: Resource temporarily unavailable
ERROR: tll.channel.reader_channel: Failed to clear event
ERROR: tll.channel.reader_channel: Process failed
#include "gtest/gtest.h"
#include "tll/channel/base.h"
#include <thread>
#include <mutex>
TEST(MemChannel, Mem)
{
tll::Logger::set("tll", tll::Logger::Info, true);
tll::Logger::set("tll.channel.writer_channel", tll::Logger::Critical, true);
auto ctx = tll::channel::Context(tll::Config());
auto base_url = *tll::Channel::Url::parse("mem://;size=32kb;framed=yes;fd=yes;dump=no");
std::mutex mtx;
auto writer_url = base_url.copy();
writer_url.set("name", "writer_channel");
writer_url.set("dir", "out");
auto reader_url = base_url.copy();
reader_url.set("name", "reader_channel");
reader_url.set("dir", "in");
reader_url.set("master", "writer_channel");
constexpr int64_t max_seq = 50000;
for (int iter = 0; iter < 1000; ++iter) {
std::atomic_int start_reader_latch={0};
std::atomic_int stop_writer_latch={0};
std::thread writer_thread {[&](){
std::unique_ptr<tll::Channel> writer;
{
std::lock_guard<std::mutex> lk(mtx);
writer = ctx.channel(writer_url);
}
writer->open("");
for (int64_t i = 0; i < max_seq+1; ++i) {
if (i==100) {
start_reader_latch.store(1);
}
tll_msg_t msg;
msg.type = TLL_MESSAGE_DATA;
msg.data = &i;
msg.size = 8;
msg.seq = i;
while (writer->post(&msg)==EAGAIN) {
std::this_thread::yield();
}
}
while (stop_writer_latch.load()==0) {
std::this_thread::yield();
}
return;
}};
std::thread reader_thread {[&](){
while (start_reader_latch.load()==0) {
std::this_thread::yield();
}
std::unique_ptr<tll::Channel> reader;
{
std::lock_guard<std::mutex> lk(mtx);
reader = ctx.channel(reader_url);
}
int64_t last_seq = -1;
auto cb = [](const tll_channel_t *, const tll_msg_t *m, void * user) -> int {
int64_t& seq = *(int64_t*)user;
seq = std::max(seq, (int64_t)m->seq);
return 0;
};
reader->callback_add(cb, &last_seq, TLL_MESSAGE_MASK_DATA);
reader->open("");
while (reader->state()!=tll::state::Active)
reader->process(100);
do {
int process_res = reader->process(100);
if (last_seq==max_seq) {
stop_writer_latch.store(1);
break;
}
if (process_res==EAGAIN) {
std::this_thread::yield();
continue;
}
if (process_res==0) {
continue;
}
ASSERT_EQ(process_res, 0);
} while (1);
return;
}};
writer_thread.join();
reader_thread.join();
}
}
Metadata
Metadata
Assignees
Labels
No labels