diff --git a/messaging/msgq.cc b/messaging/msgq.cc index af93bbf60..a06db165a 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -142,11 +142,26 @@ void msgq_close_queue(msgq_queue_t *q){ } } +static void thread_signal(uint32_t tid) { + #ifndef SYS_tkill + // TODO: this won't work for multithreaded programs + kill(tid, SIGUSR2); + #else + syscall(SYS_tkill, tid, SIGUSR2); + #endif +} void msgq_init_publisher(msgq_queue_t * q) { //std::cout << "Starting publisher" << std::endl; uint64_t uid = msgq_get_uid(); + // Notify readers to connect + uint64_t num_readers = *q->num_readers; + for (uint64_t i = 0; i < num_readers; i++){ + uint64_t reader_uid = *q->read_uids[i]; + thread_signal(reader_uid & 0xFFFFFFFF); + } + *q->write_uid = uid; *q->num_readers = 0; @@ -158,15 +173,6 @@ void msgq_init_publisher(msgq_queue_t * q) { q->write_uid_local = uid; } -static void thread_signal(uint32_t tid) { - #ifndef SYS_tkill - // TODO: this won't work for multithreaded programs - kill(tid, SIGUSR2); - #else - syscall(SYS_tkill, tid, SIGUSR2); - #endif -} - void msgq_init_subscriber(msgq_queue_t * q) { assert(q != NULL); assert(q->num_readers != NULL);