Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions messaging/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,26 @@ void msgq_close_queue(msgq_queue_t *q){
}
}

static void thread_signal(uint32_t tid) {
#ifndef SYS_tkill
// TODO: this won't work for multithreaded programs
kill(tid, SIGUSR2);
#else
syscall(SYS_tkill, tid, SIGUSR2);
#endif
}

void msgq_init_publisher(msgq_queue_t * q) {
//std::cout << "Starting publisher" << std::endl;
uint64_t uid = msgq_get_uid();

// Notify readers to connect
uint64_t num_readers = *q->num_readers;
for (uint64_t i = 0; i < num_readers; i++){
uint64_t reader_uid = *q->read_uids[i];
thread_signal(reader_uid & 0xFFFFFFFF);
}

*q->write_uid = uid;
*q->num_readers = 0;

Expand All @@ -158,15 +173,6 @@ void msgq_init_publisher(msgq_queue_t * q) {
q->write_uid_local = uid;
}

static void thread_signal(uint32_t tid) {
#ifndef SYS_tkill
// TODO: this won't work for multithreaded programs
kill(tid, SIGUSR2);
#else
syscall(SYS_tkill, tid, SIGUSR2);
#endif
}

void msgq_init_subscriber(msgq_queue_t * q) {
assert(q != NULL);
assert(q->num_readers != NULL);
Expand Down