diff --git a/src/common/capio/response_queue.hpp b/src/common/capio/response_queue.hpp new file mode 100644 index 000000000..0f73458b5 --- /dev/null +++ b/src/common/capio/response_queue.hpp @@ -0,0 +1,70 @@ +#ifndef CAPIO_RESPONSE_QUEUE_HPP +#define CAPIO_RESPONSE_QUEUE_HPP + +#include +#include + +#include + +#include "capio/env.hpp" +#include "capio/logger.hpp" +#include "capio/semaphore.hpp" +#include "capio/shm.hpp" + +/** + * @brief Efficient implementation of class Queue for offset responses + * + * @tparam T Type of data that is being transported + */ +class ResponseQueue { + void *_shm; + const std::string _shm_name; + bool require_cleanup; + NamedSemaphore _shared_mutex; + + public: + explicit ResponseQueue(const std::string &shm_name, bool cleanup = true) + : _shm_name(get_capio_workflow_name() + "_" + shm_name), require_cleanup(cleanup), + _shared_mutex(get_capio_workflow_name() + SHM_SEM_ELEMS + shm_name, 0, cleanup) { + START_LOG(capio_syscall(SYS_gettid), "call(shm_name=%s, workflow_name=%s, cleanup=%s)", + shm_name.data(), get_capio_workflow_name().data(), cleanup ? "yes" : "no"); +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = true; +#endif + + _shm = get_shm_if_exist(_shm_name); + if (_shm == nullptr) { + _shm = create_shm(_shm_name, sizeof(capio_off64_t)); + } +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = false; +#endif + } + + ResponseQueue(const ResponseQueue &) = delete; + ResponseQueue &operator=(const ResponseQueue &) = delete; + ~ResponseQueue() { + START_LOG(capio_syscall(SYS_gettid), "call(_shm_name=%s)", _shm_name.c_str()); + if (require_cleanup) { + LOG("Performing cleanup of allocated resources"); +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = true; +#endif + SHM_DESTROY_CHECK(_shm_name.c_str()); +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = false; +#endif + } + } + + auto read() { + _shared_mutex.lock(); + return *static_cast(_shm); + } + + void write(const capio_off64_t data) { + *static_cast(_shm) = data; + _shared_mutex.unlock(); + } +}; +#endif // CAPIO_RESPONSE_QUEUE_HPP diff --git a/src/posix/utils/cache/consent_request_cache.hpp b/src/posix/utils/cache/consent_request_cache.hpp index 6e1cc7637..022eab605 100644 --- a/src/posix/utils/cache/consent_request_cache.hpp +++ b/src/posix/utils/cache/consent_request_cache.hpp @@ -14,8 +14,7 @@ class ConsentRequestCache { sprintf(req, "%04d %ld %s %s", CAPIO_REQUEST_CONSENT, tid, path.c_str(), source_func.c_str()); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); - capio_off64_t res; - bufs_response->at(tid)->read(&res); + capio_off64_t res = bufs_response->at(tid)->read(); LOG("Obtained from server %llu", res); return res; } diff --git a/src/posix/utils/cache/read_request_cache_fs.hpp b/src/posix/utils/cache/read_request_cache_fs.hpp index 9047f6485..4e1d184ff 100644 --- a/src/posix/utils/cache/read_request_cache_fs.hpp +++ b/src/posix/utils/cache/read_request_cache_fs.hpp @@ -16,8 +16,7 @@ class ReadRequestCacheFS { sprintf(req, "%04d %ld %ld %s %ld", CAPIO_REQUEST_READ, tid, fd, path.c_str(), end_of_Read); LOG("Sending read request %s", req); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); - capio_off64_t res; - bufs_response->at(tid)->read(&res); + const capio_off64_t res = bufs_response->at(tid)->read(); LOG("Response to request is %llu", res); return res; } @@ -29,7 +28,7 @@ class ReadRequestCacheFS { ~ReadRequestCacheFS() { delete available_read_cache; }; - void read_request(std::filesystem::path path, long end_of_read, int tid, int fd) { + void read_request(std::filesystem::path path, const long end_of_read, int tid, const int fd) { START_LOG(capio_syscall(SYS_gettid), "[cache] call(path=%s, end_of_read=%ld, tid=%ld)", path.c_str(), end_of_read, tid); if (fd != current_fd || path.compare(current_path) != 0) { @@ -38,8 +37,8 @@ class ReadRequestCacheFS { current_path = std::move(path); current_fd = fd; - auto item = available_read_cache->find(current_path); - if (item != available_read_cache->end()) { + if (const auto item = available_read_cache->find(current_path); + item != available_read_cache->end()) { LOG("[cache] Found file entry in cache"); max_read = item->second; } else { diff --git a/src/posix/utils/cache/read_request_cache_mem.hpp b/src/posix/utils/cache/read_request_cache_mem.hpp index a49465ba4..29930efb2 100644 --- a/src/posix/utils/cache/read_request_cache_mem.hpp +++ b/src/posix/utils/cache/read_request_cache_mem.hpp @@ -38,8 +38,7 @@ class ReadRequestCacheMEM { count, _max_line_size, get_capio_fd_path(fd).c_str()); LOG("Sending read request %s", req); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); - capio_off64_t stc_queue_read; - bufs_response->at(tid)->read(&stc_queue_read); + capio_off64_t stc_queue_read = bufs_response->at(tid)->read(); LOG("Response to request is %ld", stc_queue_read); // FIXME: if count > _max_line_size, a deadlock or SEGFAULT is foreseen Fix it asap. diff --git a/src/posix/utils/clone.hpp b/src/posix/utils/clone.hpp index 81398be32..0795c8c0d 100644 --- a/src/posix/utils/clone.hpp +++ b/src/posix/utils/clone.hpp @@ -38,8 +38,7 @@ inline void init_process(pid_t tid) { syscall_no_intercept_flag = true; - auto *p_buf_response = new CircularBuffer( - SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), CAPIO_REQ_BUFF_CNT, sizeof(capio_off64_t)); + auto *p_buf_response = new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid)); bufs_response->insert(std::make_pair(tid, p_buf_response)); LOG("Created request response buffer with name: %s", diff --git a/src/posix/utils/requests.hpp b/src/posix/utils/requests.hpp index 28041de17..97b236236 100644 --- a/src/posix/utils/requests.hpp +++ b/src/posix/utils/requests.hpp @@ -4,6 +4,8 @@ #include #include "capio/requests.hpp" +#include +#include #include "env.hpp" #include "filesystem.hpp" @@ -12,7 +14,7 @@ inline thread_local std::vector *paths_to_store_in_memory = nullptr; inline CircularBuffer *buf_requests; -inline CPBufResponse_t *bufs_response; +inline std::unordered_map *bufs_response; inline thread_local SPSCQueue *cts_queue; inline thread_local SPSCQueue *stc_queue; @@ -27,7 +29,7 @@ inline void init_client() { buf_requests = new CircularBuffer(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE); - bufs_response = new CPBufResponse_t(); + bufs_response = new std::unordered_map(); } /** @@ -55,11 +57,11 @@ inline void handshake_request(const long tid, const long pid, const std::string inline std::vector *file_in_memory_request(const long pid) { START_LOG(capio_syscall(SYS_gettid), "call(pid=%ld)", pid); char req[CAPIO_REQ_MAX_SIZE]; - capio_off64_t files_to_read_from_queue = 0; + sprintf(req, "%04d %ld ", CAPIO_REQUEST_QUERY_MEM_FILE, pid); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); LOG("Sent query for which file to store in memory"); - bufs_response->at(pid)->read(&files_to_read_from_queue, sizeof(files_to_read_from_queue)); + capio_off64_t files_to_read_from_queue = bufs_response->at(pid)->read(); LOG("Need to read %llu files from data queues", files_to_read_from_queue); const auto regex_vector = new std::vector; for (int i = 0; i < files_to_read_from_queue; i++) { @@ -107,8 +109,7 @@ inline void open_request(const int fd, const std::filesystem::path &path, const char req[CAPIO_REQ_MAX_SIZE]; sprintf(req, "%04d %ld %d %s", CAPIO_REQUEST_OPEN, tid, fd, path.c_str()); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); - capio_off64_t res; - bufs_response->at(tid)->read(&res); + capio_off64_t res = bufs_response->at(tid)->read(); LOG("Obtained from server %llu", res); } diff --git a/src/server/client-manager/client_manager.hpp b/src/server/client-manager/client_manager.hpp index a233b48e6..ac42d37bf 100644 --- a/src/server/client-manager/client_manager.hpp +++ b/src/server/client-manager/client_manager.hpp @@ -1,11 +1,12 @@ #ifndef CLIENT_MANAGER_HPP #define CLIENT_MANAGER_HPP +#include /** * @brief Class to handle libcapio_posix clients applications */ class ClientManager { - CSBufResponse_t *bufs_response; + std::unordered_map *bufs_response; std::unordered_map *app_names; /** @@ -16,7 +17,7 @@ class ClientManager { public: ClientManager() { START_LOG(gettid(), "call()"); - bufs_response = new CSBufResponse_t(); + bufs_response = new std::unordered_map(); app_names = new std::unordered_map; files_created_by_producer = new std::unordered_map *>; std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " @@ -40,9 +41,8 @@ class ClientManager { inline void register_client(const std::string &app_name, pid_t tid) const { START_LOG(gettid(), "call(tid=%ld, app_name=%s)", tid, app_name.c_str()); // TODO: replace numbers with constexpr - auto *p_buf_response = new CircularBuffer( - SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), CAPIO_REQ_BUFF_CNT, - sizeof(capio_off64_t), workflow_name, false); + auto *p_buf_response = + new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), false); bufs_response->insert(std::make_pair(tid, p_buf_response)); app_names->emplace(tid, app_name); @@ -72,7 +72,7 @@ class ClientManager { void reply_to_client(const pid_t tid, const capio_off64_t offset) const { START_LOG(gettid(), "call(tid=%ld, offset=%llu)", tid, offset); if (const auto out = bufs_response->find(tid); out != bufs_response->end()) { - out->second->write(&offset); + out->second->write(offset); return; } LOG("Err: no such buffer for provided tid"); diff --git a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp index 4a61a4de6..6912875c9 100644 --- a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp +++ b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp @@ -1,13 +1,15 @@ #ifndef CAPIOCACHESPSCQUEUETESTS_HPP #define CAPIOCACHESPSCQUEUETESTS_HPP +#include "../common/capio/response_queue.hpp" #include "../posix/utils/env.hpp" #include "../posix/utils/filesystem.hpp" #include "../posix/utils/types.hpp" + #include "storage-service/CapioFile/CapioMemoryFile.hpp" inline SPSCQueue *cts_queue, *stc_queue; -inline CPBufResponse_t *bufs_response; +inline std::unordered_map *bufs_response; inline CircularBuffer *buf_requests; #include "../posix/utils/cache.hpp" @@ -22,7 +24,7 @@ std::string test_file_name = "test.dat"; void init_server_data_structures() { writeCache = new WriteRequestCacheMEM(); readCache = new ReadRequestCacheMEM(); - bufs_response = new CPBufResponse_t; + bufs_response = new std::unordered_map(); files = new CPFiles_t(); capio_files_descriptors = new CPFileDescriptors_t(); cts_queue = new SPSCQueue("queue-tests.cts", get_cache_lines(), get_cache_line_size()); @@ -31,9 +33,8 @@ void init_server_data_structures() { new CircularBuffer(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE); auto tid = gettid(); - bufs_response->insert(std::make_pair( - tid, new CircularBuffer(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), - CAPIO_REQ_BUFF_CNT, sizeof(capio_off64_t)))); + bufs_response->insert( + std::make_pair(tid, new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid)))); } void delete_server_data_structures() { @@ -260,7 +261,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueRead) { auto size_to_send = read_size < client_cache_line_size ? read_size : client_cache_line_size; - bufs_response->at(response_tid)->write(&size_to_send, sizeof(capio_off64_t)); + bufs_response->at(response_tid)->write(size_to_send); stc_queue->write(SOURCE_TEST_TEXT + read_begin_offset, size_to_send); total_data_sent += size_to_send; iteration++; @@ -311,7 +312,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueReadWithCapioFile) { auto size_to_send = read_size < client_cache_line_size ? read_size : client_cache_line_size; - bufs_response->at(response_tid)->write(&size_to_send, sizeof(capio_off64_t)); + bufs_response->at(response_tid)->write(size_to_send); testFile.writeToQueue(*stc_queue, read_begin_offset, size_to_send); total_data_sent += size_to_send; iteration++; @@ -362,7 +363,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueReadWithCapioFileAndSeek) { auto size_to_send = read_size < client_cache_line_size ? read_size : client_cache_line_size; - bufs_response->at(response_tid)->write(&size_to_send, sizeof(capio_off64_t)); + bufs_response->at(response_tid)->write(size_to_send); testFile.writeToQueue(*stc_queue, read_begin_offset, size_to_send); total_data_sent += size_to_send; iteration++;