From 9a2ab34706d32c3fd12fb7d014302228b2ca39db Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Wed, 14 Feb 2024 17:57:37 -0800 Subject: [PATCH 1/4] stash --- messaging/impl_msgq.cc | 1 + messaging/msgq.cc | 27 +++++++++++++++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index 8f2c10a08..70c89f44b 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -155,6 +155,7 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_en } q = new msgq_queue_t; + printf("endpoint: %s\n", endpoint.c_str()); int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); if (r != 0){ return r; diff --git a/messaging/msgq.cc b/messaging/msgq.cc index af93bbf60..d994f6574 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -142,6 +142,14 @@ void msgq_close_queue(msgq_queue_t *q){ } } +static void thread_signal(uint32_t tid) { + #ifndef SYS_tkill + // TODO: this won't work for multithreaded programs + kill(tid, SIGUSR2); + #else + syscall(SYS_tkill, tid, SIGUSR2); + #endif +} void msgq_init_publisher(msgq_queue_t * q) { //std::cout << "Starting publisher" << std::endl; @@ -156,15 +164,15 @@ void msgq_init_publisher(msgq_queue_t * q) { } q->write_uid_local = uid; -} -static void thread_signal(uint32_t tid) { - #ifndef SYS_tkill - // TODO: this won't work for multithreaded programs - kill(tid, SIGUSR2); - #else - syscall(SYS_tkill, tid, SIGUSR2); - #endif + // Notify readers new +// uint64_t num_readers = *q->num_readers; +// std::cout << "Notifying " << num_readers << " readers" << std::endl; +// for (uint64_t i = 0; i < num_readers; i++){ +//// std::cout << "notifying reader: " << i << std::endl; +// uint64_t reader_uid = *q->read_uids[i]; +// thread_signal(reader_uid & 0xFFFFFFFF); +// } } void msgq_init_subscriber(msgq_queue_t * q) { @@ -459,10 +467,13 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ bool msgq_all_readers_updated(msgq_queue_t *q) { uint64_t num_readers = *q->num_readers; +// std::cout << "all_readers_updated: " << num_readers << std::endl; for (uint64_t i = 0; i < num_readers; i++) { if (*q->read_valids[i] && *q->write_pointer != *q->read_pointers[i]) { +// std::cout << "returning false"; return false; } } +// std::cout << "here"; return num_readers > 0; } From d0a0099aab43d2cca34552f2f30344a13f16c392 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Wed, 14 Feb 2024 17:58:33 -0800 Subject: [PATCH 2/4] needs to be before reader uid reset --- messaging/msgq.cc | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index d994f6574..d7d8d590e 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -155,6 +155,13 @@ void msgq_init_publisher(msgq_queue_t * q) { //std::cout << "Starting publisher" << std::endl; uint64_t uid = msgq_get_uid(); + // Notify readers to connect + uint64_t num_readers = *q->num_readers; + for (uint64_t i = 0; i < num_readers; i++){ + uint64_t reader_uid = *q->read_uids[i]; + thread_signal(reader_uid & 0xFFFFFFFF); + } + *q->write_uid = uid; *q->num_readers = 0; @@ -164,15 +171,6 @@ void msgq_init_publisher(msgq_queue_t * q) { } q->write_uid_local = uid; - - // Notify readers new -// uint64_t num_readers = *q->num_readers; -// std::cout << "Notifying " << num_readers << " readers" << std::endl; -// for (uint64_t i = 0; i < num_readers; i++){ -//// std::cout << "notifying reader: " << i << std::endl; -// uint64_t reader_uid = *q->read_uids[i]; -// thread_signal(reader_uid & 0xFFFFFFFF); -// } } void msgq_init_subscriber(msgq_queue_t * q) { From 384c6a566d3c48badf614b4f9a86963600cb6bc6 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Wed, 14 Feb 2024 19:11:01 -0800 Subject: [PATCH 3/4] clean up --- messaging/msgq.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index d7d8d590e..a06db165a 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -465,13 +465,10 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ bool msgq_all_readers_updated(msgq_queue_t *q) { uint64_t num_readers = *q->num_readers; -// std::cout << "all_readers_updated: " << num_readers << std::endl; for (uint64_t i = 0; i < num_readers; i++) { if (*q->read_valids[i] && *q->write_pointer != *q->read_pointers[i]) { -// std::cout << "returning false"; return false; } } -// std::cout << "here"; return num_readers > 0; } From 7fa84db484401836f67a58fac64cf429e54a0583 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Wed, 14 Feb 2024 19:11:32 -0800 Subject: [PATCH 4/4] Update messaging/impl_msgq.cc --- messaging/impl_msgq.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index 70c89f44b..8f2c10a08 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -155,7 +155,6 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_en } q = new msgq_queue_t; - printf("endpoint: %s\n", endpoint.c_str()); int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); if (r != 0){ return r;