Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion messaging/impl_msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_en
}

q = new msgq_queue_t;
int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE, true);
if (r != 0){
return r;
}
Expand Down
14 changes: 13 additions & 1 deletion messaging/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "cereal/messaging/msgq.h"


void sigusr2_handler(int signal) {
assert(signal == SIGUSR2);
}
Expand Down Expand Up @@ -83,7 +84,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){
return;
}

int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size, bool preallocate) {
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
std::signal(SIGUSR2, sigusr2_handler);

Expand All @@ -105,6 +106,17 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
close(fd);
return -1;
}

if (preallocate && (std::getenv("MSGQ_PREALLOCATE") != nullptr)) {
do {
rc = fallocate(fd, 0, 0, size + sizeof(msgq_header_t));
} while (rc == EINTR);
if (rc < 0){
close(fd);
return -1;
}
}

char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);

Expand Down
2 changes: 1 addition & 1 deletion messaging/msgq.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ int msgq_msg_init_size(msgq_msg_t *msg, size_t size);
int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size);
int msgq_msg_close(msgq_msg_t *msg);

int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size);
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size, bool preallocate = false);
void msgq_close_queue(msgq_queue_t *q);
void msgq_init_publisher(msgq_queue_t * q);
void msgq_init_subscriber(msgq_queue_t * q);
Expand Down
16 changes: 15 additions & 1 deletion services.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,18 @@ def build_header():


if __name__ == "__main__":
print(build_header())
#print(build_header())

# get ms
import capnp
from cereal import log
for k, v in SERVICE_LIST.items():
sz = None
dat = log.Event.new_message()
try:
dat.init(k)
sz = dat.total_size.word_count*8
except capnp.lib.capnp.KjException:
# TODO: lists
pass
print(k.ljust(30), sz)