Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ the [CAPIO-CL Docs](https://capio.hpc4ai.it/docs/coord-language/) for details.

To launch your workflow with capio you can follow two routes:

#### A) Use `capiorun` for simplfied operations
#### A) Use `capiorun` for simplified operations

You can simplify the execution of workflow steps with CAPIO using the `capiorun` utility. See the
[`capiorun` documentation](capiorun/readme.md) for usage and examples. `capiorun` provides an easier way to manage
Expand Down
2 changes: 1 addition & 1 deletion src/common/capio/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ inline auto open_server_logfile() {
std::to_string(capio_syscall(SYS_gettid)) + ".log";

logfile.open(logfile_name, std::ofstream::out);
delete[] hostname;
capio_delete_vec(&hostname);

return logfile_name;
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/capio/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ template <class T, class Mutex> class Queue {

_sem_num_elems.lock();

std::lock_guard<Mutex> lg(_mutex);
const std::lock_guard<Mutex> lg(_mutex);
T *segment = reinterpret_cast<char *>(_shm) + *_first_elem;
*_first_elem = (*_first_elem + _elem_size) % _buff_size;

Expand Down Expand Up @@ -145,7 +145,7 @@ template <class T, class Mutex> class Queue {

_sem_num_empty.lock();

std::lock_guard<Mutex> lg(_mutex);
const std::lock_guard<Mutex> lg(_mutex);
T *segment = reinterpret_cast<char *>(_shm) + *_last_elem;
*_last_elem = (*_last_elem + _elem_size) % _buff_size;

Expand Down
2 changes: 1 addition & 1 deletion src/common/capio/shm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class CapioShmCanary {
auto message = new char[strlen(CAPIO_SHM_CANARY_ERROR)];
sprintf(message, CAPIO_SHM_CANARY_ERROR, _canary_name.data());
std::cout << CAPIO_SERVER_CLI_LOG_SERVER_ERROR << message << std::endl;
delete[] message;
capio_delete_vec(&message);
#endif
ERR_EXIT("ERR: shm canary flag already exists");
}
Expand Down
48 changes: 48 additions & 0 deletions src/common/capio/utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#ifndef UTILS_H
#define UTILS_H
#include <capio/constants.hpp>
#include <iostream>
#include <unistd.h>

template <typename T> void capio_delete(T **ptr) {
if (*ptr != nullptr) {
delete *ptr;
*ptr = nullptr;
}
#ifndef __CAPIO_POSIX
else {
char nodename[HOST_NAME_MAX]{0};
gethostname(nodename, HOST_NAME_MAX);
std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << nodename << " ] "
<< "Double delete detected! avoided segfault..." << std::endl;
}
#endif
}

template <typename T> void capio_delete_vec(T **ptr) {
if (*ptr != nullptr) {
delete[] *ptr;
*ptr = nullptr;
}
#ifndef __CAPIO_POSIX
else {
char nodename[HOST_NAME_MAX]{0};
gethostname(nodename, HOST_NAME_MAX);
std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << nodename << " ] "
<< "Double delete[] detected! avoided segfault..." << std::endl;
}
#endif
}

#define lockguard_guard(expr) \
try { \
expr; \
} catch (const std::system_error &e) { \
char nodename[HOST_NAME_MAX]{0}; \
gethostname(nodename, HOST_NAME_MAX); \
std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << nodename << " ] " \
<< "Danger! caught possible deadlock due to already acquired semaphore" \
<< std::endl; \
}

#endif // UTILS_H
2 changes: 1 addition & 1 deletion src/posix/handlers/exit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int exit_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg
LOG("Removed caches");

if (const auto itm = bufs_response->find(tid); itm != bufs_response->end()) {
delete itm->second;
capio_delete(&itm->second);
bufs_response->erase(tid);
LOG("Removed response buffer");
}
Expand Down
42 changes: 24 additions & 18 deletions src/posix/handlers/posix_readdir.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
#include <utils/requests.hpp>

// Map &DIR -> <dir_path, files already served>
inline std::unordered_map<unsigned long int, std::pair<std::string, int>> opened_directory;
inline std::unordered_map<unsigned long int, std::pair<std::string, int>> *opened_directory =
nullptr;

inline std::unordered_map<std::string, std::vector<dirent64 *> *> *directory_items;

Expand Down Expand Up @@ -49,8 +50,7 @@ inline int count_files_in_directory(const char *path) {
LOG("Directory abs path = %s", dir_abs_path.c_str());
}

if (auto directory_object = directory_items->find(dir_abs_path.c_str());
directory_object == directory_items->end()) {
if (directory_items->find(path) == directory_items->end()) {
LOG("Directory vector not present. Adding it at path %s", path);
directory_items->emplace(path, new std::vector<dirent64 *>());
}
Expand Down Expand Up @@ -108,6 +108,10 @@ DIR *opendir(const char *name) {
directory_items = new std::unordered_map<std::string, std::vector<dirent64 *> *>();
}

if (opened_directory == nullptr) {
opened_directory = new std::unordered_map<unsigned long, std::pair<std::string, int>>();
}

if (!is_capio_path(absolute_path)) {
LOG("Not a CAPIO path. continuing execution");
syscall_no_intercept_flag = true;
Expand All @@ -125,7 +129,7 @@ DIR *opendir(const char *name) {
syscall_no_intercept_flag = false;

LOG("Opened directory with offset %ld", dir);
opened_directory.insert(
opened_directory->insert(
{reinterpret_cast<unsigned long int>(dir), {std::string(absolute_path), 0}});
directory_items->emplace(std::string(absolute_path), new std::vector<dirent64 *>());

Expand All @@ -150,17 +154,18 @@ int closedir(DIR *dirp) {
}
}

if (const auto pos = opened_directory.find(reinterpret_cast<unsigned long int>(dirp));
pos != opened_directory.end()) {

if (const auto pos = opened_directory->find(reinterpret_cast<unsigned long int>(dirp));
pos != opened_directory->end()) {
LOG("Closing directory with path %s", pos->second.first.c_str());
close_request(pos->second.first.c_str(), capio_syscall(SYS_gettid));
syscall_no_intercept_flag = true;
delete_capio_fd(dirfd(dirp));
syscall_no_intercept_flag = false;

if (auto pos1 = directory_items->find(pos->second.first); pos1 != directory_items->end()) {
directory_items->erase(pos1);
}
opened_directory.erase(pos);
opened_directory->erase(pos);
LOG("removed dir from map of opened files");
}

Expand All @@ -175,20 +180,21 @@ int closedir(DIR *dirp) {
inline struct dirent64 *capio_internal_readdir(DIR *dirp, long pid) {
START_LOG(pid, "call(dirp=%ld)", dirp);

const auto directory_path =
std::get<0>(opened_directory.at(reinterpret_cast<unsigned long int>(dirp)));
auto directory_path =
std::get<0>(opened_directory->at(reinterpret_cast<unsigned long int>(dirp)));

if (directory_commit_token_path.find(directory_path) == directory_commit_token_path.end()) {
char token_path[PATH_MAX]{0};
posix_directory_committed_request(pid, directory_path.c_str(), token_path);
LOG("Commit token path was not found for path %s", directory_path.c_str());
auto token_path = new char[PATH_MAX]{0};
posix_directory_committed_request(pid, directory_path, token_path);
LOG("Inserting token path %s", token_path);
directory_commit_token_path.insert({directory_path, token_path});
}

const auto token_path = directory_commit_token_path.at(directory_path);

if (const auto item = opened_directory.find(reinterpret_cast<unsigned long int>(dirp));
item != opened_directory.end() || std::filesystem::exists(token_path)) {
if (const auto item = opened_directory->find(reinterpret_cast<unsigned long int>(dirp));
item != opened_directory->end() || std::filesystem::exists(token_path)) {
LOG("Found dirp.");
const auto dir_path_name = std::get<0>(item->second);
const auto capio_internal_offset = std::get<1>(item->second);
Expand Down Expand Up @@ -252,8 +258,8 @@ struct dirent *readdir(DIR *dirp) {
syscall_no_intercept_flag = false;
}

if (opened_directory.find(reinterpret_cast<unsigned long int>(dirp)) ==
opened_directory.end()) {
if (opened_directory->find(reinterpret_cast<unsigned long int>(dirp)) ==
opened_directory->end()) {
LOG("Directory is not handled by CAPIO. Returning false");
syscall_no_intercept_flag = true;
auto result = real_readdir(dirp);
Expand All @@ -279,8 +285,8 @@ struct dirent64 *readdir64(DIR *dirp) {
syscall_no_intercept_flag = false;
}

if (opened_directory.find(reinterpret_cast<unsigned long int>(dirp)) ==
opened_directory.end()) {
if (opened_directory->find(reinterpret_cast<unsigned long int>(dirp)) ==
opened_directory->end()) {
LOG("Directory is not handled by CAPIO. Returning false");
syscall_no_intercept_flag = true;
auto result = real_readdir64(dirp);
Expand Down
2 changes: 2 additions & 0 deletions src/posix/libcapio_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* logs up to CAPIO_MAX_LOG_LEVEL function calls
*/

#include <capio/utils.h>

#include <array>
#include <string>

Expand Down
14 changes: 7 additions & 7 deletions src/posix/utils/cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ inline void init_caches() {

inline void delete_caches() {
START_LOG(capio_syscall(SYS_gettid), "call()");
delete write_request_cache_fs;
delete read_request_cache_fs;
delete consent_request_cache_fs;
delete write_request_cache_mem;
delete read_request_cache_mem;
capio_delete(&write_request_cache_fs);
capio_delete(&read_request_cache_fs);
capio_delete(&consent_request_cache_fs);
capio_delete(&write_request_cache_mem);
capio_delete(&read_request_cache_mem);

delete cts_queue;
capio_delete(&cts_queue);
LOG("Removed cts_queue");
delete stc_queue;
capio_delete(&stc_queue);
LOG("Removed stc_queue");
}

Expand Down
2 changes: 1 addition & 1 deletion src/posix/utils/cache/consent_request_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class ConsentRequestCache {

~ConsentRequestCache() {
START_LOG(capio_syscall(SYS_gettid), "call()");
delete available_consent;
capio_delete(&available_consent);
};

void consent_request(const std::filesystem::path &path, long tid,
Expand Down
2 changes: 1 addition & 1 deletion src/posix/utils/cache/read_request_cache_fs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ReadRequestCacheFS {

~ReadRequestCacheFS() {
START_LOG(capio_syscall(SYS_gettid), "call()");
delete available_read_cache;
capio_delete(&available_read_cache);
};

void read_request(std::filesystem::path path, const long end_of_read, int tid, const int fd) {
Expand Down
2 changes: 1 addition & 1 deletion src/posix/utils/cache/read_request_cache_mem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ReadRequestCacheMEM {

~ReadRequestCacheMEM() {
START_LOG(capio_syscall(SYS_gettid), "call()");
delete[] _cache;
capio_delete_vec(&_cache);
}

void flush() {
Expand Down
6 changes: 3 additions & 3 deletions src/posix/utils/clone.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ inline std::condition_variable clone_cv;
inline std::unordered_set<pid_t> *tids;

inline bool is_capio_tid(const pid_t tid) {
const std::lock_guard<std::mutex> lg(clone_mutex);
lockguard_guard(const std::lock_guard lg(clone_mutex));
return tids->find(tid) != tids->end();
}

inline void register_capio_tid(const pid_t tid) {
START_LOG(syscall_no_intercept(SYS_gettid), "call(tid=%ld)", tid);
const std::lock_guard<std::mutex> lg(clone_mutex);
lockguard_guard(const std::lock_guard lg(clone_mutex));
tids->insert(tid);
}

inline void remove_capio_tid(const pid_t tid) {
START_LOG(syscall_no_intercept(SYS_gettid), "call(tid=%ld)", tid);
const std::lock_guard<std::mutex> lg(clone_mutex);
lockguard_guard(std::lock_guard lg(clone_mutex));
if (tids->find(tid) != tids->end()) {
tids->erase(tid);
}
Expand Down
6 changes: 3 additions & 3 deletions src/posix/utils/filesystem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ inline void delete_capio_path(const std::string &path) {
*/
inline void destroy_filesystem() {
current_dir.reset();
delete capio_files_descriptors;
delete capio_files_paths;
delete files;
capio_delete(&capio_files_descriptors);
capio_delete(&capio_files_paths);
capio_delete(&files);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/posix/utils/requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ inline std::vector<std::regex> *file_in_memory_request(const long pid) {
stc_queue->read(file, PATH_MAX);
LOG("Obtained path %s", file);
regex_vector->emplace_back(generateCapioRegex(file));
delete[] file;
capio_delete_vec(&file);
}
return regex_vector;
}
Expand Down
1 change: 0 additions & 1 deletion src/server/capio-cl-engine/capio_cl_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ class CapioCLEngine {
}

bool isFile(const std::string &path) const {
START_LOG(gettid(), "call(path=%s)", path.c_str());
if (const auto itm = _locations.find(path); itm != _locations.end()) {
return std::get<6>(itm->second);
}
Expand Down
4 changes: 3 additions & 1 deletion src/server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ std::string workflow_name;
inline bool StoreOnlyInMemory = false;
char node_name[HOST_NAME_MAX];

#include <capio/utils.h>

#include "utils/types.hpp"

#include "capio/env.hpp"
Expand Down Expand Up @@ -174,7 +176,7 @@ std::string parseCLI(int argc, char **argv) {
<< "LOG_LEVEL set to: " << CAPIO_LOG_LEVEL << std::endl;
std::cout << CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING;
log->log("LOG_LEVEL set to: %d", CAPIO_LOG_LEVEL);
delete log;
capio_delete(&log);
#else
if (std::getenv("CAPIO_LOG_LEVEL") != nullptr) {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] "
Expand Down
8 changes: 4 additions & 4 deletions src/server/client-manager/client_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ class ClientManager {

~ClientManager() {
START_LOG(gettid(), "call()");
delete bufs_response;
delete app_names;
delete files_created_by_producer;
capio_delete(&bufs_response);
capio_delete(&app_names);
capio_delete(&files_created_by_producer);
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] "
<< "buf_response cleanup completed" << std::endl;
}
Expand Down Expand Up @@ -57,7 +57,7 @@ class ClientManager {
inline void remove_client(pid_t tid) const {
START_LOG(gettid(), "call(tid=%ld)", tid);
if (const auto it_resp = bufs_response->find(tid); it_resp != bufs_response->end()) {
delete it_resp->second;
capio_delete(&it_resp->second);
bufs_response->erase(it_resp);
}
files_created_by_producer->erase(tid);
Expand Down
10 changes: 3 additions & 7 deletions src/server/client-manager/request_handler_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class RequestHandlerEngine {
} else {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] "
<< "Received invalid code: " << code << std::endl;
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] "
<< "Offending request: " << ptr << " / " << req << std::endl;
ERR_EXIT("Invalid request %d%s", code, ptr);
}
return code;
Expand All @@ -91,7 +93,7 @@ class RequestHandlerEngine {

~RequestHandlerEngine() {
START_LOG(gettid(), "call()");
delete buf_requests;
capio_delete(&buf_requests);

std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] "
<< "buf_requests cleanup completed" << std::endl;
Expand All @@ -111,12 +113,6 @@ class RequestHandlerEngine {
while (true) {
LOG(CAPIO_LOG_SERVER_REQUEST_START);
int code = read_next_request(str.get());
if (code < 0 || code > CAPIO_NR_REQUESTS) {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] "
<< "Received invalid code: " << code << std::endl;

ERR_EXIT("Error: received invalid request code");
}
request_handlers[code](str.get());
LOG(CAPIO_LOG_SERVER_REQUEST_END);
}
Expand Down
Loading
Loading