Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/common/capio/response_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#ifndef CAPIO_RESPONSE_QUEUE_HPP
#define CAPIO_RESPONSE_QUEUE_HPP

#include <iostream>
#include <mutex>

#include <semaphore.h>

#include "capio/env.hpp"
#include "capio/logger.hpp"
#include "capio/semaphore.hpp"
#include "capio/shm.hpp"

/**
* @brief Efficient implementation of class Queue for offset responses
*
* @tparam T Type of data that is being transported
*/
class ResponseQueue {
void *_shm;
const std::string _shm_name;
bool require_cleanup;
NamedSemaphore _shared_mutex;

public:
explicit ResponseQueue(const std::string &shm_name, bool cleanup = true)
: _shm_name(get_capio_workflow_name() + "_" + shm_name), require_cleanup(cleanup),
_shared_mutex(get_capio_workflow_name() + SHM_SEM_ELEMS + shm_name, 0, cleanup) {
START_LOG(capio_syscall(SYS_gettid), "call(shm_name=%s, workflow_name=%s, cleanup=%s)",
shm_name.data(), get_capio_workflow_name().data(), cleanup ? "yes" : "no");
#ifdef __CAPIO_POSIX
syscall_no_intercept_flag = true;
#endif

_shm = get_shm_if_exist(_shm_name);
if (_shm == nullptr) {
_shm = create_shm(_shm_name, sizeof(capio_off64_t));
}
#ifdef __CAPIO_POSIX
syscall_no_intercept_flag = false;
#endif
}

ResponseQueue(const ResponseQueue &) = delete;
ResponseQueue &operator=(const ResponseQueue &) = delete;
~ResponseQueue() {
START_LOG(capio_syscall(SYS_gettid), "call(_shm_name=%s)", _shm_name.c_str());
if (require_cleanup) {
LOG("Performing cleanup of allocated resources");
#ifdef __CAPIO_POSIX
syscall_no_intercept_flag = true;
#endif
SHM_DESTROY_CHECK(_shm_name.c_str());
#ifdef __CAPIO_POSIX
syscall_no_intercept_flag = false;
#endif
}
}

auto read() {
_shared_mutex.lock();
return *static_cast<capio_off64_t *>(_shm);
}

void write(const capio_off64_t data) {
*static_cast<capio_off64_t *>(_shm) = data;
_shared_mutex.unlock();
}
};
#endif // CAPIO_RESPONSE_QUEUE_HPP
3 changes: 1 addition & 2 deletions src/posix/utils/cache/consent_request_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ class ConsentRequestCache {
sprintf(req, "%04d %ld %s %s", CAPIO_REQUEST_CONSENT, tid, path.c_str(),
source_func.c_str());
buf_requests->write(req, CAPIO_REQ_MAX_SIZE);
capio_off64_t res;
bufs_response->at(tid)->read(&res);
capio_off64_t res = bufs_response->at(tid)->read();
LOG("Obtained from server %llu", res);
return res;
}
Expand Down
9 changes: 4 additions & 5 deletions src/posix/utils/cache/read_request_cache_fs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ class ReadRequestCacheFS {
sprintf(req, "%04d %ld %ld %s %ld", CAPIO_REQUEST_READ, tid, fd, path.c_str(), end_of_Read);
LOG("Sending read request %s", req);
buf_requests->write(req, CAPIO_REQ_MAX_SIZE);
capio_off64_t res;
bufs_response->at(tid)->read(&res);
const capio_off64_t res = bufs_response->at(tid)->read();
LOG("Response to request is %llu", res);
return res;
}
Expand All @@ -29,7 +28,7 @@ class ReadRequestCacheFS {

~ReadRequestCacheFS() { delete available_read_cache; };

void read_request(std::filesystem::path path, long end_of_read, int tid, int fd) {
void read_request(std::filesystem::path path, const long end_of_read, int tid, const int fd) {
START_LOG(capio_syscall(SYS_gettid), "[cache] call(path=%s, end_of_read=%ld, tid=%ld)",
path.c_str(), end_of_read, tid);
if (fd != current_fd || path.compare(current_path) != 0) {
Expand All @@ -38,8 +37,8 @@ class ReadRequestCacheFS {
current_path = std::move(path);
current_fd = fd;

auto item = available_read_cache->find(current_path);
if (item != available_read_cache->end()) {
if (const auto item = available_read_cache->find(current_path);
item != available_read_cache->end()) {
LOG("[cache] Found file entry in cache");
max_read = item->second;
} else {
Expand Down
3 changes: 1 addition & 2 deletions src/posix/utils/cache/read_request_cache_mem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ class ReadRequestCacheMEM {
count, _max_line_size, get_capio_fd_path(fd).c_str());
LOG("Sending read request %s", req);
buf_requests->write(req, CAPIO_REQ_MAX_SIZE);
capio_off64_t stc_queue_read;
bufs_response->at(tid)->read(&stc_queue_read);
capio_off64_t stc_queue_read = bufs_response->at(tid)->read();
LOG("Response to request is %ld", stc_queue_read);

// FIXME: if count > _max_line_size, a deadlock or SEGFAULT is foreseen Fix it asap.
Expand Down
3 changes: 1 addition & 2 deletions src/posix/utils/clone.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ inline void init_process(pid_t tid) {

syscall_no_intercept_flag = true;

auto *p_buf_response = new CircularBuffer<capio_off64_t>(
SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), CAPIO_REQ_BUFF_CNT, sizeof(capio_off64_t));
auto *p_buf_response = new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid));
bufs_response->insert(std::make_pair(tid, p_buf_response));

LOG("Created request response buffer with name: %s",
Expand Down
13 changes: 7 additions & 6 deletions src/posix/utils/requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <utility>

#include "capio/requests.hpp"
#include <capio/queue.hpp>
#include <capio/response_queue.hpp>

#include "env.hpp"
#include "filesystem.hpp"
Expand All @@ -12,7 +14,7 @@
inline thread_local std::vector<std::regex> *paths_to_store_in_memory = nullptr;

inline CircularBuffer<char> *buf_requests;
inline CPBufResponse_t *bufs_response;
inline std::unordered_map<long, ResponseQueue *> *bufs_response;

inline thread_local SPSCQueue *cts_queue;
inline thread_local SPSCQueue *stc_queue;
Expand All @@ -27,7 +29,7 @@ inline void init_client() {

buf_requests =
new CircularBuffer<char>(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE);
bufs_response = new CPBufResponse_t();
bufs_response = new std::unordered_map<long, ResponseQueue *>();
}

/**
Expand Down Expand Up @@ -55,11 +57,11 @@ inline void handshake_request(const long tid, const long pid, const std::string
inline std::vector<std::regex> *file_in_memory_request(const long pid) {
START_LOG(capio_syscall(SYS_gettid), "call(pid=%ld)", pid);
char req[CAPIO_REQ_MAX_SIZE];
capio_off64_t files_to_read_from_queue = 0;

sprintf(req, "%04d %ld ", CAPIO_REQUEST_QUERY_MEM_FILE, pid);
buf_requests->write(req, CAPIO_REQ_MAX_SIZE);
LOG("Sent query for which file to store in memory");
bufs_response->at(pid)->read(&files_to_read_from_queue, sizeof(files_to_read_from_queue));
capio_off64_t files_to_read_from_queue = bufs_response->at(pid)->read();
LOG("Need to read %llu files from data queues", files_to_read_from_queue);
const auto regex_vector = new std::vector<std::regex>;
for (int i = 0; i < files_to_read_from_queue; i++) {
Expand Down Expand Up @@ -107,8 +109,7 @@ inline void open_request(const int fd, const std::filesystem::path &path, const
char req[CAPIO_REQ_MAX_SIZE];
sprintf(req, "%04d %ld %d %s", CAPIO_REQUEST_OPEN, tid, fd, path.c_str());
buf_requests->write(req, CAPIO_REQ_MAX_SIZE);
capio_off64_t res;
bufs_response->at(tid)->read(&res);
capio_off64_t res = bufs_response->at(tid)->read();
LOG("Obtained from server %llu", res);
}

Expand Down
12 changes: 6 additions & 6 deletions src/server/client-manager/client_manager.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#ifndef CLIENT_MANAGER_HPP
#define CLIENT_MANAGER_HPP
#include <capio/response_queue.hpp>

/**
* @brief Class to handle libcapio_posix clients applications
*/
class ClientManager {
CSBufResponse_t *bufs_response;
std::unordered_map<long, ResponseQueue *> *bufs_response;
std::unordered_map<int, const std::string> *app_names;

/**
Expand All @@ -16,7 +17,7 @@ class ClientManager {
public:
ClientManager() {
START_LOG(gettid(), "call()");
bufs_response = new CSBufResponse_t();
bufs_response = new std::unordered_map<long, ResponseQueue *>();
app_names = new std::unordered_map<int, const std::string>;
files_created_by_producer = new std::unordered_map<pid_t, std::vector<std::string> *>;
std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] "
Expand All @@ -40,9 +41,8 @@ class ClientManager {
inline void register_client(const std::string &app_name, pid_t tid) const {
START_LOG(gettid(), "call(tid=%ld, app_name=%s)", tid, app_name.c_str());
// TODO: replace numbers with constexpr
auto *p_buf_response = new CircularBuffer<capio_off64_t>(
SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), CAPIO_REQ_BUFF_CNT,
sizeof(capio_off64_t), workflow_name, false);
auto *p_buf_response =
new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid), false);

bufs_response->insert(std::make_pair(tid, p_buf_response));
app_names->emplace(tid, app_name);
Expand Down Expand Up @@ -72,7 +72,7 @@ class ClientManager {
void reply_to_client(const pid_t tid, const capio_off64_t offset) const {
START_LOG(gettid(), "call(tid=%ld, offset=%llu)", tid, offset);
if (const auto out = bufs_response->find(tid); out != bufs_response->end()) {
out->second->write(&offset);
out->second->write(offset);
return;
}
LOG("Err: no such buffer for provided tid");
Expand Down
17 changes: 9 additions & 8 deletions tests/unit/server/src/CapioCacheSPSCQueueTests.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#ifndef CAPIOCACHESPSCQUEUETESTS_HPP
#define CAPIOCACHESPSCQUEUETESTS_HPP

#include "../common/capio/response_queue.hpp"
#include "../posix/utils/env.hpp"
#include "../posix/utils/filesystem.hpp"
#include "../posix/utils/types.hpp"

#include "storage-service/CapioFile/CapioMemoryFile.hpp"

inline SPSCQueue *cts_queue, *stc_queue;
inline CPBufResponse_t *bufs_response;
inline std::unordered_map<long, ResponseQueue *> *bufs_response;
inline CircularBuffer<char> *buf_requests;

#include "../posix/utils/cache.hpp"
Expand All @@ -22,7 +24,7 @@ std::string test_file_name = "test.dat";
void init_server_data_structures() {
writeCache = new WriteRequestCacheMEM();
readCache = new ReadRequestCacheMEM();
bufs_response = new CPBufResponse_t;
bufs_response = new std::unordered_map<long, ResponseQueue *>();
files = new CPFiles_t();
capio_files_descriptors = new CPFileDescriptors_t();
cts_queue = new SPSCQueue("queue-tests.cts", get_cache_lines(), get_cache_line_size());
Expand All @@ -31,9 +33,8 @@ void init_server_data_structures() {
new CircularBuffer<char>(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE);

auto tid = gettid();
bufs_response->insert(std::make_pair(
tid, new CircularBuffer<capio_off64_t>(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid),
CAPIO_REQ_BUFF_CNT, sizeof(capio_off64_t))));
bufs_response->insert(
std::make_pair(tid, new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid))));
}

void delete_server_data_structures() {
Expand Down Expand Up @@ -260,7 +261,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueRead) {
auto size_to_send =
read_size < client_cache_line_size ? read_size : client_cache_line_size;

bufs_response->at(response_tid)->write(&size_to_send, sizeof(capio_off64_t));
bufs_response->at(response_tid)->write(size_to_send);
stc_queue->write(SOURCE_TEST_TEXT + read_begin_offset, size_to_send);
total_data_sent += size_to_send;
iteration++;
Expand Down Expand Up @@ -311,7 +312,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueReadWithCapioFile) {
auto size_to_send =
read_size < client_cache_line_size ? read_size : client_cache_line_size;

bufs_response->at(response_tid)->write(&size_to_send, sizeof(capio_off64_t));
bufs_response->at(response_tid)->write(size_to_send);
testFile.writeToQueue(*stc_queue, read_begin_offset, size_to_send);
total_data_sent += size_to_send;
iteration++;
Expand Down Expand Up @@ -362,7 +363,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueReadWithCapioFileAndSeek) {
auto size_to_send =
read_size < client_cache_line_size ? read_size : client_cache_line_size;

bufs_response->at(response_tid)->write(&size_to_send, sizeof(capio_off64_t));
bufs_response->at(response_tid)->write(size_to_send);
testFile.writeToQueue(*stc_queue, read_begin_offset, size_to_send);
total_data_sent += size_to_send;
iteration++;
Expand Down