From 4a24887ba67c5eeb7d7176337c705c43855aea42 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Fri, 11 Apr 2025 16:01:11 +0200 Subject: [PATCH] Reduces SHM element allocation This commit introduces a new category of response queue for requests which uses two, instead of five, shared memory object for each instanced element. The size is fixed to 64 bit, and can only be used to reply to the posix application, offsets or numerical values that fits within 64 bits of space. --- src/common/capio/response_queue.hpp | 70 +++++++++++++++++++ .../utils/cache/consent_request_cache.hpp | 3 +- .../utils/cache/read_request_cache_fs.hpp | 9 ++- .../utils/cache/read_request_cache_mem.hpp | 3 +- src/posix/utils/clone.hpp | 3 +- src/posix/utils/requests.hpp | 13 ++-- src/server/client-manager/client_manager.hpp | 12 ++-- .../server/src/CapioCacheSPSCQueueTests.hpp | 17 ++--- 8 files changed, 99 insertions(+), 31 deletions(-) create mode 100644 src/common/capio/response_queue.hpp diff --git a/src/common/capio/response_queue.hpp b/src/common/capio/response_queue.hpp new file mode 100644 index 000000000..0f73458b5 --- /dev/null +++ b/src/common/capio/response_queue.hpp @@ -0,0 +1,70 @@ +#ifndef CAPIO_RESPONSE_QUEUE_HPP +#define CAPIO_RESPONSE_QUEUE_HPP + +#include +#include + +#include + +#include "capio/env.hpp" +#include "capio/logger.hpp" +#include "capio/semaphore.hpp" +#include "capio/shm.hpp" + +/** + * @brief Efficient implementation of class Queue for offset responses + * + * @tparam T Type of data that is being transported + */ +class ResponseQueue { + void *_shm; + const std::string _shm_name; + bool require_cleanup; + NamedSemaphore _shared_mutex; + + public: + explicit ResponseQueue(const std::string &shm_name, bool cleanup = true) + : _shm_name(get_capio_workflow_name() + "_" + shm_name), require_cleanup(cleanup), + _shared_mutex(get_capio_workflow_name() + SHM_SEM_ELEMS + shm_name, 0, cleanup) { + START_LOG(capio_syscall(SYS_gettid), "call(shm_name=%s, workflow_name=%s, cleanup=%s)", + shm_name.data(), get_capio_workflow_name().data(), cleanup ? "yes" : "no"); +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = true; +#endif + + _shm = get_shm_if_exist(_shm_name); + if (_shm == nullptr) { + _shm = create_shm(_shm_name, sizeof(capio_off64_t)); + } +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = false; +#endif + } + + ResponseQueue(const ResponseQueue &) = delete; + ResponseQueue &operator=(const ResponseQueue &) = delete; + ~ResponseQueue() { + START_LOG(capio_syscall(SYS_gettid), "call(_shm_name=%s)", _shm_name.c_str()); + if (require_cleanup) { + LOG("Performing cleanup of allocated resources"); +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = true; +#endif + SHM_DESTROY_CHECK(_shm_name.c_str()); +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = false; +#endif + } + } + + auto read() { + _shared_mutex.lock(); + return *static_cast(_shm); + } + + void write(const capio_off64_t data) { + *static_cast(_shm) = data; + _shared_mutex.unlock(); + } +}; +#endif // CAPIO_RESPONSE_QUEUE_HPP diff --git a/src/posix/utils/cache/consent_request_cache.hpp b/src/posix/utils/cache/consent_request_cache.hpp index 6e1cc7637..022eab605 100644 --- a/src/posix/utils/cache/consent_request_cache.hpp +++ b/src/posix/utils/cache/consent_request_cache.hpp @@ -14,8 +14,7 @@ class ConsentRequestCache { sprintf(req, "%04d %ld %s %s", CAPIO_REQUEST_CONSENT, tid, path.c_str(), source_func.c_str()); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); - capio_off64_t res; - bufs_response->at(tid)->read(&res); + capio_off64_t res = bufs_response->at(tid)->read(); LOG("Obtained from server %llu", res); return res; } diff --git a/src/posix/utils/cache/read_request_cache_fs.hpp b/src/posix/utils/cache/read_request_cache_fs.hpp index 9047f6485..4e1d184ff 100644 --- a/src/posix/utils/cache/read_request_cache_fs.hpp +++ b/src/posix/utils/cache/read_request_cache_fs.hpp @@ -16,8 +16,7 @@ class ReadRequestCacheFS { sprintf(req, "%04d %ld %ld %s %ld", CAPIO_REQUEST_READ, tid, fd, path.c_str(), end_of_Read); LOG("Sending read request %s", req); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); - capio_off64_t res; - bufs_response->at(tid)->read(&res); + const capio_off64_t res = bufs_response->at(tid)->read(); LOG("Response to request is %llu", res); return res; } @@ -29,7 +28,7 @@ class ReadRequestCacheFS { ~ReadRequestCacheFS() { delete available_read_cache; }; - void read_request(std::filesystem::path path, long end_of_read, int tid, int fd) { + void read_request(std::filesystem::path path, const long end_of_read, int tid, const int fd) { START_LOG(capio_syscall(SYS_gettid), "[cache] call(path=%s, end_of_read=%ld, tid=%ld)", path.c_str(), end_of_read, tid); if (fd != current_fd || path.compare(current_path) != 0) { @@ -38,8 +37,8 @@ class ReadRequestCacheFS { current_path = std::move(path); current_fd = fd; - auto item = available_read_cache->find(current_path); - if (item != available_read_cache->end()) { + if (const auto item = available_read_cache->find(current_path); + item != available_read_cache->end()) { LOG("[cache] Found file entry in cache"); max_read = item->second; } else { diff --git a/src/posix/utils/cache/read_request_cache_mem.hpp b/src/posix/utils/cache/read_request_cache_mem.hpp index a49465ba4..29930efb2 100644 --- a/src/posix/utils/cache/read_request_cache_mem.hpp +++ b/src/posix/utils/cache/read_request_cache_mem.hpp @@ -38,8 +38,7 @@ class ReadRequestCacheMEM { count, _max_line_size, get_capio_fd_path(fd).c_str()); LOG("Sending read request %s", req); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); - capio_off64_t stc_queue_read; - bufs_response->at(tid)->read(&stc_queue_read); + capio_off64_t stc_queue_read = bufs_response->at(tid)->read(); LOG("Response to request is %ld", stc_queue_read); // FIXME: if count > _max_line_size, a deadlock or SEGFAULT is foreseen Fix it asap. diff --git a/src/posix/utils/clone.hpp b/src/posix/utils/clone.hpp index 81398be32..0795c8c0d 100644 --- a/src/posix/utils/clone.hpp +++ b/src/posix/utils/clone.hpp @@ -38,8 +38,7 @@ inline void init_process(pid_t tid) { syscall_no_intercept_flag = true; - auto *p_buf_response = new CircularBuffer( - SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), CAPIO_REQ_BUFF_CNT, sizeof(capio_off64_t)); + auto *p_buf_response = new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid)); bufs_response->insert(std::make_pair(tid, p_buf_response)); LOG("Created request response buffer with name: %s", diff --git a/src/posix/utils/requests.hpp b/src/posix/utils/requests.hpp index 28041de17..97b236236 100644 --- a/src/posix/utils/requests.hpp +++ b/src/posix/utils/requests.hpp @@ -4,6 +4,8 @@ #include #include "capio/requests.hpp" +#include +#include #include "env.hpp" #include "filesystem.hpp" @@ -12,7 +14,7 @@ inline thread_local std::vector *paths_to_store_in_memory = nullptr; inline CircularBuffer *buf_requests; -inline CPBufResponse_t *bufs_response; +inline std::unordered_map *bufs_response; inline thread_local SPSCQueue *cts_queue; inline thread_local SPSCQueue *stc_queue; @@ -27,7 +29,7 @@ inline void init_client() { buf_requests = new CircularBuffer(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE); - bufs_response = new CPBufResponse_t(); + bufs_response = new std::unordered_map(); } /** @@ -55,11 +57,11 @@ inline void handshake_request(const long tid, const long pid, const std::string inline std::vector *file_in_memory_request(const long pid) { START_LOG(capio_syscall(SYS_gettid), "call(pid=%ld)", pid); char req[CAPIO_REQ_MAX_SIZE]; - capio_off64_t files_to_read_from_queue = 0; + sprintf(req, "%04d %ld ", CAPIO_REQUEST_QUERY_MEM_FILE, pid); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); LOG("Sent query for which file to store in memory"); - bufs_response->at(pid)->read(&files_to_read_from_queue, sizeof(files_to_read_from_queue)); + capio_off64_t files_to_read_from_queue = bufs_response->at(pid)->read(); LOG("Need to read %llu files from data queues", files_to_read_from_queue); const auto regex_vector = new std::vector; for (int i = 0; i < files_to_read_from_queue; i++) { @@ -107,8 +109,7 @@ inline void open_request(const int fd, const std::filesystem::path &path, const char req[CAPIO_REQ_MAX_SIZE]; sprintf(req, "%04d %ld %d %s", CAPIO_REQUEST_OPEN, tid, fd, path.c_str()); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); - capio_off64_t res; - bufs_response->at(tid)->read(&res); + capio_off64_t res = bufs_response->at(tid)->read(); LOG("Obtained from server %llu", res); } diff --git a/src/server/client-manager/client_manager.hpp b/src/server/client-manager/client_manager.hpp index a233b48e6..ac42d37bf 100644 --- a/src/server/client-manager/client_manager.hpp +++ b/src/server/client-manager/client_manager.hpp @@ -1,11 +1,12 @@ #ifndef CLIENT_MANAGER_HPP #define CLIENT_MANAGER_HPP +#include /** * @brief Class to handle libcapio_posix clients applications */ class ClientManager { - CSBufResponse_t *bufs_response; + std::unordered_map *bufs_response; std::unordered_map *app_names; /** @@ -16,7 +17,7 @@ class ClientManager { public: ClientManager() { START_LOG(gettid(), "call()"); - bufs_response = new CSBufResponse_t(); + bufs_response = new std::unordered_map(); app_names = new std::unordered_map; files_created_by_producer = new std::unordered_map *>; std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " @@ -40,9 +41,8 @@ class ClientManager { inline void register_client(const std::string &app_name, pid_t tid) const { START_LOG(gettid(), "call(tid=%ld, app_name=%s)", tid, app_name.c_str()); // TODO: replace numbers with constexpr - auto *p_buf_response = new CircularBuffer( - SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), CAPIO_REQ_BUFF_CNT, - sizeof(capio_off64_t), workflow_name, false); + auto *p_buf_response = + new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), false); bufs_response->insert(std::make_pair(tid, p_buf_response)); app_names->emplace(tid, app_name); @@ -72,7 +72,7 @@ class ClientManager { void reply_to_client(const pid_t tid, const capio_off64_t offset) const { START_LOG(gettid(), "call(tid=%ld, offset=%llu)", tid, offset); if (const auto out = bufs_response->find(tid); out != bufs_response->end()) { - out->second->write(&offset); + out->second->write(offset); return; } LOG("Err: no such buffer for provided tid"); diff --git a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp index 4a61a4de6..6912875c9 100644 --- a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp +++ b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp @@ -1,13 +1,15 @@ #ifndef CAPIOCACHESPSCQUEUETESTS_HPP #define CAPIOCACHESPSCQUEUETESTS_HPP +#include "../common/capio/response_queue.hpp" #include "../posix/utils/env.hpp" #include "../posix/utils/filesystem.hpp" #include "../posix/utils/types.hpp" + #include "storage-service/CapioFile/CapioMemoryFile.hpp" inline SPSCQueue *cts_queue, *stc_queue; -inline CPBufResponse_t *bufs_response; +inline std::unordered_map *bufs_response; inline CircularBuffer *buf_requests; #include "../posix/utils/cache.hpp" @@ -22,7 +24,7 @@ std::string test_file_name = "test.dat"; void init_server_data_structures() { writeCache = new WriteRequestCacheMEM(); readCache = new ReadRequestCacheMEM(); - bufs_response = new CPBufResponse_t; + bufs_response = new std::unordered_map(); files = new CPFiles_t(); capio_files_descriptors = new CPFileDescriptors_t(); cts_queue = new SPSCQueue("queue-tests.cts", get_cache_lines(), get_cache_line_size()); @@ -31,9 +33,8 @@ void init_server_data_structures() { new CircularBuffer(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE); auto tid = gettid(); - bufs_response->insert(std::make_pair( - tid, new CircularBuffer(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), - CAPIO_REQ_BUFF_CNT, sizeof(capio_off64_t)))); + bufs_response->insert( + std::make_pair(tid, new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid)))); } void delete_server_data_structures() { @@ -260,7 +261,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueRead) { auto size_to_send = read_size < client_cache_line_size ? read_size : client_cache_line_size; - bufs_response->at(response_tid)->write(&size_to_send, sizeof(capio_off64_t)); + bufs_response->at(response_tid)->write(size_to_send); stc_queue->write(SOURCE_TEST_TEXT + read_begin_offset, size_to_send); total_data_sent += size_to_send; iteration++; @@ -311,7 +312,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueReadWithCapioFile) { auto size_to_send = read_size < client_cache_line_size ? read_size : client_cache_line_size; - bufs_response->at(response_tid)->write(&size_to_send, sizeof(capio_off64_t)); + bufs_response->at(response_tid)->write(size_to_send); testFile.writeToQueue(*stc_queue, read_begin_offset, size_to_send); total_data_sent += size_to_send; iteration++; @@ -362,7 +363,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueReadWithCapioFileAndSeek) { auto size_to_send = read_size < client_cache_line_size ? read_size : client_cache_line_size; - bufs_response->at(response_tid)->write(&size_to_send, sizeof(capio_off64_t)); + bufs_response->at(response_tid)->write(size_to_send); testFile.writeToQueue(*stc_queue, read_begin_offset, size_to_send); total_data_sent += size_to_send; iteration++;