From ce6b21dd70e4b22ea54c400e116317df169d1d0b Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Mon, 30 Jun 2025 16:46:17 +0200 Subject: [PATCH 1/4] Bugfixes on directory streaming related code --- src/posix/handlers/posix_readdir.hpp | 42 +++++++++++-------- .../capio-cl-engine/capio_cl_engine.hpp | 1 - src/server/file-manager/file_manager_impl.hpp | 25 ++++++----- 3 files changed, 38 insertions(+), 30 deletions(-) diff --git a/src/posix/handlers/posix_readdir.hpp b/src/posix/handlers/posix_readdir.hpp index 2f9c8d8d6..f9adfc026 100644 --- a/src/posix/handlers/posix_readdir.hpp +++ b/src/posix/handlers/posix_readdir.hpp @@ -10,7 +10,8 @@ #include // Map &DIR -> -inline std::unordered_map> opened_directory; +inline std::unordered_map> *opened_directory = + nullptr; inline std::unordered_map *> *directory_items; @@ -49,8 +50,7 @@ inline int count_files_in_directory(const char *path) { LOG("Directory abs path = %s", dir_abs_path.c_str()); } - if (auto directory_object = directory_items->find(dir_abs_path.c_str()); - directory_object == directory_items->end()) { + if (directory_items->find(path) == directory_items->end()) { LOG("Directory vector not present. Adding it at path %s", path); directory_items->emplace(path, new std::vector()); } @@ -108,6 +108,10 @@ DIR *opendir(const char *name) { directory_items = new std::unordered_map *>(); } + if (opened_directory == nullptr) { + opened_directory = new std::unordered_map>(); + } + if (!is_capio_path(absolute_path)) { LOG("Not a CAPIO path. continuing execution"); syscall_no_intercept_flag = true; @@ -125,7 +129,7 @@ DIR *opendir(const char *name) { syscall_no_intercept_flag = false; LOG("Opened directory with offset %ld", dir); - opened_directory.insert( + opened_directory->insert( {reinterpret_cast(dir), {std::string(absolute_path), 0}}); directory_items->emplace(std::string(absolute_path), new std::vector()); @@ -150,9 +154,10 @@ int closedir(DIR *dirp) { } } - if (const auto pos = opened_directory.find(reinterpret_cast(dirp)); - pos != opened_directory.end()) { - + if (const auto pos = opened_directory->find(reinterpret_cast(dirp)); + pos != opened_directory->end()) { + LOG("Closing directory with path %s", pos->second.first.c_str()); + close_request(pos->second.first.c_str(), capio_syscall(SYS_gettid)); syscall_no_intercept_flag = true; delete_capio_fd(dirfd(dirp)); syscall_no_intercept_flag = false; @@ -160,7 +165,7 @@ int closedir(DIR *dirp) { if (auto pos1 = directory_items->find(pos->second.first); pos1 != directory_items->end()) { directory_items->erase(pos1); } - opened_directory.erase(pos); + opened_directory->erase(pos); LOG("removed dir from map of opened files"); } @@ -175,20 +180,21 @@ int closedir(DIR *dirp) { inline struct dirent64 *capio_internal_readdir(DIR *dirp, long pid) { START_LOG(pid, "call(dirp=%ld)", dirp); - const auto directory_path = - std::get<0>(opened_directory.at(reinterpret_cast(dirp))); + auto directory_path = + std::get<0>(opened_directory->at(reinterpret_cast(dirp))); if (directory_commit_token_path.find(directory_path) == directory_commit_token_path.end()) { - char token_path[PATH_MAX]{0}; - posix_directory_committed_request(pid, directory_path.c_str(), token_path); + LOG("Commit token path was not found for path %s", directory_path.c_str()); + auto token_path = new char[PATH_MAX]{0}; + posix_directory_committed_request(pid, directory_path, token_path); LOG("Inserting token path %s", token_path); directory_commit_token_path.insert({directory_path, token_path}); } const auto token_path = directory_commit_token_path.at(directory_path); - if (const auto item = opened_directory.find(reinterpret_cast(dirp)); - item != opened_directory.end() || std::filesystem::exists(token_path)) { + if (const auto item = opened_directory->find(reinterpret_cast(dirp)); + item != opened_directory->end() || std::filesystem::exists(token_path)) { LOG("Found dirp."); const auto dir_path_name = std::get<0>(item->second); const auto capio_internal_offset = std::get<1>(item->second); @@ -252,8 +258,8 @@ struct dirent *readdir(DIR *dirp) { syscall_no_intercept_flag = false; } - if (opened_directory.find(reinterpret_cast(dirp)) == - opened_directory.end()) { + if (opened_directory->find(reinterpret_cast(dirp)) == + opened_directory->end()) { LOG("Directory is not handled by CAPIO. Returning false"); syscall_no_intercept_flag = true; auto result = real_readdir(dirp); @@ -279,8 +285,8 @@ struct dirent64 *readdir64(DIR *dirp) { syscall_no_intercept_flag = false; } - if (opened_directory.find(reinterpret_cast(dirp)) == - opened_directory.end()) { + if (opened_directory->find(reinterpret_cast(dirp)) == + opened_directory->end()) { LOG("Directory is not handled by CAPIO. Returning false"); syscall_no_intercept_flag = true; auto result = real_readdir64(dirp); diff --git a/src/server/capio-cl-engine/capio_cl_engine.hpp b/src/server/capio-cl-engine/capio_cl_engine.hpp index 89214524c..caceabba1 100644 --- a/src/server/capio-cl-engine/capio_cl_engine.hpp +++ b/src/server/capio-cl-engine/capio_cl_engine.hpp @@ -291,7 +291,6 @@ class CapioCLEngine { } bool isFile(const std::string &path) const { - START_LOG(gettid(), "call(path=%s)", path.c_str()); if (const auto itm = _locations.find(path); itm != _locations.end()) { return std::get<6>(itm->second); } diff --git a/src/server/file-manager/file_manager_impl.hpp b/src/server/file-manager/file_manager_impl.hpp index deb99110c..dd79af0f3 100644 --- a/src/server/file-manager/file_manager_impl.hpp +++ b/src/server/file-manager/file_manager_impl.hpp @@ -343,19 +343,21 @@ inline bool CapioFileManager::isCommitted(const std::filesystem::path &path) { inline void CapioFileManager::checkFilesAwaitingCreation() { // NOTE: do not put inside here log code as it will generate a lot of useless log std::lock_guard lg(creation_mutex); + std::vector path_to_delete; - for (auto element = thread_awaiting_file_creation.begin(); - element != thread_awaiting_file_creation.end();) { - if (std::filesystem::exists(element->first)) { + for (auto element : thread_awaiting_file_creation) { + if (std::filesystem::exists(element.first)) { START_LOG(gettid(), "\n\ncall()"); - LOG("File %s exists. Unlocking thread awaiting for creation", element->first.c_str()); - CapioFileManager::_unlockThreadAwaitingCreation(element->first, element->second); + LOG("File %s exists. Unlocking thread awaiting for creation", element.first.c_str()); + CapioFileManager::_unlockThreadAwaitingCreation(element.first, element.second); LOG("Completed handling."); - element = thread_awaiting_file_creation.erase(element); - } else { - ++element; + path_to_delete.push_back(element.first); } } + + for (auto path : path_to_delete) { + thread_awaiting_file_creation.erase(path); + } } /** @@ -387,17 +389,18 @@ inline void CapioFileManager::checkFileAwaitingData() { } /** - * @brief commit firectories that have NFILES inside them if their commit rule is n_files + * @brief commit directories that have NFILES inside them if their commit rule is n_files */ inline void CapioFileManager::checkDirectoriesNFiles() const { for (const auto &path_config : capio_cl_engine->getPathsInConfig()) { - if (!capio_cl_engine->isDirectory(path_config)) { + if (capio_cl_engine->isFile(path_config)) { continue; } - START_LOG(gettid(), "call()"); + auto n_files = capio_cl_engine->getDirectoryFileCount(path_config); if (n_files > 0) { + START_LOG(gettid(), "call()"); LOG("Directory %s needs %ld files before being committed", path_config.c_str(), n_files); // There must be n_files inside the directory to commit the file From 2d8bba8ae4ea0fe4f1bf1a41415de904d2f9b31b Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Tue, 1 Jul 2025 15:47:35 +0200 Subject: [PATCH 2/4] Protected against double free errors --- src/common/capio/logger.hpp | 2 +- src/common/capio/shm.hpp | 2 +- src/common/capio/utils.h | 38 +++++++++++++++++++ src/posix/handlers/exit.hpp | 2 +- src/posix/libcapio_posix.cpp | 2 + src/posix/utils/cache.hpp | 14 +++---- .../utils/cache/consent_request_cache.hpp | 2 +- .../utils/cache/read_request_cache_fs.hpp | 2 +- .../utils/cache/read_request_cache_mem.hpp | 2 +- src/posix/utils/filesystem.hpp | 6 +-- src/posix/utils/requests.hpp | 2 +- src/server/capio_server.cpp | 4 +- src/server/client-manager/client_manager.hpp | 8 ++-- .../client-manager/request_handler_engine.hpp | 2 +- .../communication-service/MTCL_backend.hpp | 18 ++++----- src/server/file-manager/file_manager_impl.hpp | 4 +- src/server/file-manager/fs_monitor.hpp | 4 +- .../CapioFile/CapioMemoryFile.hpp | 2 +- .../storage-service/capio_storage_service.hpp | 8 ++-- src/server/utils/signals.hpp | 8 ++-- 20 files changed, 87 insertions(+), 45 deletions(-) create mode 100644 src/common/capio/utils.h diff --git a/src/common/capio/logger.hpp b/src/common/capio/logger.hpp index 43534871e..c5ec15d5c 100644 --- a/src/common/capio/logger.hpp +++ b/src/common/capio/logger.hpp @@ -55,7 +55,7 @@ inline auto open_server_logfile() { std::to_string(capio_syscall(SYS_gettid)) + ".log"; logfile.open(logfile_name, std::ofstream::out); - delete[] hostname; + capio_delete_vec(&hostname); return logfile_name; } diff --git a/src/common/capio/shm.hpp b/src/common/capio/shm.hpp index 71128a652..66d6b27c8 100644 --- a/src/common/capio/shm.hpp +++ b/src/common/capio/shm.hpp @@ -66,7 +66,7 @@ class CapioShmCanary { auto message = new char[strlen(CAPIO_SHM_CANARY_ERROR)]; sprintf(message, CAPIO_SHM_CANARY_ERROR, _canary_name.data()); std::cout << CAPIO_SERVER_CLI_LOG_SERVER_ERROR << message << std::endl; - delete[] message; + capio_delete_vec(&message); #endif ERR_EXIT("ERR: shm canary flag already exists"); } diff --git a/src/common/capio/utils.h b/src/common/capio/utils.h new file mode 100644 index 000000000..223c8fb5a --- /dev/null +++ b/src/common/capio/utils.h @@ -0,0 +1,38 @@ +#ifndef UTILS_H +#define UTILS_H +#include +#include +#include + +template void capio_delete(T **ptr) { + if (*ptr != nullptr) { + delete *ptr; + *ptr = nullptr; + } +#ifndef __CAPIO_POSIX + else { + char nodename[HOST_NAME_MAX]{0}; + gethostname(nodename, HOST_NAME_MAX); + std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << nodename << " ] " + << "Double delete detected! avoided segfault..." << std::endl; + } +#endif +} + + +template void capio_delete_vec(T **ptr) { + if (*ptr != nullptr) { + delete[] *ptr; + *ptr = nullptr; + } +#ifndef __CAPIO_POSIX + else { + char nodename[HOST_NAME_MAX]{0}; + gethostname(nodename, HOST_NAME_MAX); + std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << nodename << " ] " + << "Double delete[] detected! avoided segfault..." << std::endl; + } +#endif +} + +#endif // UTILS_H diff --git a/src/posix/handlers/exit.hpp b/src/posix/handlers/exit.hpp index c966035fc..f8bcbb69a 100644 --- a/src/posix/handlers/exit.hpp +++ b/src/posix/handlers/exit.hpp @@ -31,7 +31,7 @@ int exit_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg LOG("Removed caches"); if (const auto itm = bufs_response->find(tid); itm != bufs_response->end()) { - delete itm->second; + capio_delete(&itm->second); bufs_response->erase(tid); LOG("Removed response buffer"); } diff --git a/src/posix/libcapio_posix.cpp b/src/posix/libcapio_posix.cpp index 93dc776eb..8a70343f5 100644 --- a/src/posix/libcapio_posix.cpp +++ b/src/posix/libcapio_posix.cpp @@ -4,6 +4,8 @@ * logs up to CAPIO_MAX_LOG_LEVEL function calls */ +#include + #include #include diff --git a/src/posix/utils/cache.hpp b/src/posix/utils/cache.hpp index 67e07977f..6a2e9060d 100644 --- a/src/posix/utils/cache.hpp +++ b/src/posix/utils/cache.hpp @@ -26,15 +26,15 @@ inline void init_caches() { inline void delete_caches() { START_LOG(capio_syscall(SYS_gettid), "call()"); - delete write_request_cache_fs; - delete read_request_cache_fs; - delete consent_request_cache_fs; - delete write_request_cache_mem; - delete read_request_cache_mem; + capio_delete(&write_request_cache_fs); + capio_delete(&read_request_cache_fs); + capio_delete(&consent_request_cache_fs); + capio_delete(&write_request_cache_mem); + capio_delete(&read_request_cache_mem); - delete cts_queue; + capio_delete(&cts_queue); LOG("Removed cts_queue"); - delete stc_queue; + capio_delete(&stc_queue); LOG("Removed stc_queue"); } diff --git a/src/posix/utils/cache/consent_request_cache.hpp b/src/posix/utils/cache/consent_request_cache.hpp index bf5cced10..b060c4e7e 100644 --- a/src/posix/utils/cache/consent_request_cache.hpp +++ b/src/posix/utils/cache/consent_request_cache.hpp @@ -26,7 +26,7 @@ class ConsentRequestCache { ~ConsentRequestCache() { START_LOG(capio_syscall(SYS_gettid), "call()"); - delete available_consent; + capio_delete(&available_consent); }; void consent_request(const std::filesystem::path &path, long tid, diff --git a/src/posix/utils/cache/read_request_cache_fs.hpp b/src/posix/utils/cache/read_request_cache_fs.hpp index d8a6f07c2..f55c674c2 100644 --- a/src/posix/utils/cache/read_request_cache_fs.hpp +++ b/src/posix/utils/cache/read_request_cache_fs.hpp @@ -28,7 +28,7 @@ class ReadRequestCacheFS { ~ReadRequestCacheFS() { START_LOG(capio_syscall(SYS_gettid), "call()"); - delete available_read_cache; + capio_delete(&available_read_cache); }; void read_request(std::filesystem::path path, const long end_of_read, int tid, const int fd) { diff --git a/src/posix/utils/cache/read_request_cache_mem.hpp b/src/posix/utils/cache/read_request_cache_mem.hpp index 16661c02a..9816dc522 100644 --- a/src/posix/utils/cache/read_request_cache_mem.hpp +++ b/src/posix/utils/cache/read_request_cache_mem.hpp @@ -73,7 +73,7 @@ class ReadRequestCacheMEM { ~ReadRequestCacheMEM() { START_LOG(capio_syscall(SYS_gettid), "call()"); - delete[] _cache; + capio_delete_vec(&_cache); } void flush() { diff --git a/src/posix/utils/filesystem.hpp b/src/posix/utils/filesystem.hpp index c64d8c4e6..1dbe26da1 100644 --- a/src/posix/utils/filesystem.hpp +++ b/src/posix/utils/filesystem.hpp @@ -156,9 +156,9 @@ inline void delete_capio_path(const std::string &path) { */ inline void destroy_filesystem() { current_dir.reset(); - delete capio_files_descriptors; - delete capio_files_paths; - delete files; + capio_delete(&capio_files_descriptors); + capio_delete(&capio_files_paths); + capio_delete(&files); } /** diff --git a/src/posix/utils/requests.hpp b/src/posix/utils/requests.hpp index 661642cad..0e6e49aec 100644 --- a/src/posix/utils/requests.hpp +++ b/src/posix/utils/requests.hpp @@ -69,7 +69,7 @@ inline std::vector *file_in_memory_request(const long pid) { stc_queue->read(file, PATH_MAX); LOG("Obtained path %s", file); regex_vector->emplace_back(generateCapioRegex(file)); - delete[] file; + capio_delete_vec(&file); } return regex_vector; } diff --git a/src/server/capio_server.cpp b/src/server/capio_server.cpp index 367b35991..78dbc6db2 100644 --- a/src/server/capio_server.cpp +++ b/src/server/capio_server.cpp @@ -28,6 +28,8 @@ std::string workflow_name; inline bool StoreOnlyInMemory = false; char node_name[HOST_NAME_MAX]; +#include + #include "utils/types.hpp" #include "capio/env.hpp" @@ -174,7 +176,7 @@ std::string parseCLI(int argc, char **argv) { << "LOG_LEVEL set to: " << CAPIO_LOG_LEVEL << std::endl; std::cout << CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING; log->log("LOG_LEVEL set to: %d", CAPIO_LOG_LEVEL); - delete log; + capio_delete(&log); #else if (std::getenv("CAPIO_LOG_LEVEL") != nullptr) { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " diff --git a/src/server/client-manager/client_manager.hpp b/src/server/client-manager/client_manager.hpp index ac42d37bf..d39490533 100644 --- a/src/server/client-manager/client_manager.hpp +++ b/src/server/client-manager/client_manager.hpp @@ -26,9 +26,9 @@ class ClientManager { ~ClientManager() { START_LOG(gettid(), "call()"); - delete bufs_response; - delete app_names; - delete files_created_by_producer; + capio_delete(&bufs_response); + capio_delete(&app_names); + capio_delete(&files_created_by_producer); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " << "buf_response cleanup completed" << std::endl; } @@ -57,7 +57,7 @@ class ClientManager { inline void remove_client(pid_t tid) const { START_LOG(gettid(), "call(tid=%ld)", tid); if (const auto it_resp = bufs_response->find(tid); it_resp != bufs_response->end()) { - delete it_resp->second; + capio_delete(&it_resp->second); bufs_response->erase(it_resp); } files_created_by_producer->erase(tid); diff --git a/src/server/client-manager/request_handler_engine.hpp b/src/server/client-manager/request_handler_engine.hpp index db1c340af..a0b5855bf 100644 --- a/src/server/client-manager/request_handler_engine.hpp +++ b/src/server/client-manager/request_handler_engine.hpp @@ -91,7 +91,7 @@ class RequestHandlerEngine { ~RequestHandlerEngine() { START_LOG(gettid(), "call()"); - delete buf_requests; + capio_delete(&buf_requests); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " << "buf_requests cleanup completed" << std::endl; diff --git a/src/server/communication-service/MTCL_backend.hpp b/src/server/communication-service/MTCL_backend.hpp index 49b7f0723..431f4a413 100644 --- a/src/server/communication-service/MTCL_backend.hpp +++ b/src/server/communication-service/MTCL_backend.hpp @@ -25,7 +25,7 @@ class TransportUnit { public: TransportUnit() = default; - ~TransportUnit() { delete[] _bytes; } + ~TransportUnit() { capio_delete_vec(&_bytes); } friend class MTCL_backend; }; @@ -76,7 +76,7 @@ class MTCL_backend : public BackendInterface { HandlerPointer->send(&unit->_start_write_offset, sizeof(capio_off64_t)); LOG("[send] Sent %ld bytes of file %s with offset of %ld", unit->_buffer_size, unit->_filepath.c_str(), unit->_start_write_offset); - delete unit; + capio_delete(&unit); } /** @@ -298,9 +298,9 @@ class MTCL_backend : public BackendInterface { pthread_cancel(th->native_handle()); th->join(); - delete th; - delete continue_execution; - delete terminate; + capio_delete(&th); + capio_delete(&continue_execution); + capio_delete(&terminate); LOG("Handler closed."); @@ -331,16 +331,16 @@ class MTCL_backend : public BackendInterface { } LOG("Found incoming message"); std::lock_guard lg(*std::get<2>(interface)); - const auto inputUnit = inQueue->front(); - *buf_size = inputUnit->_buffer_size; - *start_offset = inputUnit->_start_write_offset; + auto inputUnit = inQueue->front(); + *buf_size = inputUnit->_buffer_size; + *start_offset = inputUnit->_start_write_offset; memcpy(buf, inputUnit->_bytes, *buf_size); LOG("Received buffer: %s", inputUnit->_bytes); inQueue->pop(); std::string filename(inputUnit->_filepath); - delete inputUnit; + capio_delete(&inputUnit); return filename; } diff --git a/src/server/file-manager/file_manager_impl.hpp b/src/server/file-manager/file_manager_impl.hpp index dd79af0f3..cd2d639c8 100644 --- a/src/server/file-manager/file_manager_impl.hpp +++ b/src/server/file-manager/file_manager_impl.hpp @@ -172,7 +172,7 @@ inline void CapioFileManager::_unlockThreadAwaitingData( inline void CapioFileManager::increaseCloseCount(const std::filesystem::path &path) { START_LOG(gettid(), "call(path=%s)", path.c_str()); auto metadata_path = getAndCreateMetadataPath(path); - const auto lock = new DistributedSemaphore(metadata_path + ".lock", 300); + auto lock = new DistributedSemaphore(metadata_path + ".lock", 300); long long close_count = 0; LOG("Gained mutual exclusive access to token file %s", (metadata_path + ".lock").c_str()); @@ -189,7 +189,7 @@ inline void CapioFileManager::increaseCloseCount(const std::filesystem::path &pa LOG("Updated close count to %llu", close_count); - delete lock; + capio_delete(&lock); } /** diff --git a/src/server/file-manager/fs_monitor.hpp b/src/server/file-manager/fs_monitor.hpp index 92af56dda..de2ca8f2d 100644 --- a/src/server/file-manager/fs_monitor.hpp +++ b/src/server/file-manager/fs_monitor.hpp @@ -50,8 +50,8 @@ class FileSystemMonitor { START_LOG(gettid(), "call()"); *continue_execution = false; th->join(); - delete th; - delete continue_execution; + capio_delete(&th); + capio_delete(&continue_execution); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " << "fs_monitor cleanup completed" << std::endl; } diff --git a/src/server/storage-service/CapioFile/CapioMemoryFile.hpp b/src/server/storage-service/CapioFile/CapioMemoryFile.hpp index 5fd98bd8d..3de28527f 100644 --- a/src/server/storage-service/CapioFile/CapioMemoryFile.hpp +++ b/src/server/storage-service/CapioFile/CapioMemoryFile.hpp @@ -64,7 +64,7 @@ class CapioMemoryFile : public CapioFile { cross_page_buffer_view = new char[_pageSizeBytes]; } - ~CapioMemoryFile() override { delete[] cross_page_buffer_view; } + ~CapioMemoryFile() override { capio_delete_vec(&cross_page_buffer_view); } /** * Write data to a file stored inside the memory diff --git a/src/server/storage-service/capio_storage_service.hpp b/src/server/storage-service/capio_storage_service.hpp index 88d8f1385..ff61c5ab3 100644 --- a/src/server/storage-service/capio_storage_service.hpp +++ b/src/server/storage-service/capio_storage_service.hpp @@ -43,10 +43,10 @@ class CapioStorageService { ~CapioStorageService() { // TODO: dump files to FS - delete _stored_files; - delete _client_to_server_queue; - delete _server_to_client_queue; - delete _threads_waiting_for_memory_data; + capio_delete(&_stored_files); + capio_delete(&_client_to_server_queue); + capio_delete(&_server_to_client_queue); + capio_delete(&_threads_waiting_for_memory_data); } void createFile(const std::string &file_name) const { diff --git a/src/server/utils/signals.hpp b/src/server/utils/signals.hpp index 0332cc914..51f08ae2a 100644 --- a/src/server/utils/signals.hpp +++ b/src/server/utils/signals.hpp @@ -33,10 +33,10 @@ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) { __gcov_dump(); #endif - delete request_handlers_engine; - delete fs_monitor; - delete capio_backend; - delete shm_canary; + capio_delete(&request_handlers_engine); + capio_delete(&fs_monitor); + capio_delete(&capio_backend); + capio_delete(&shm_canary); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " << "Bye!" << std::endl; exit(EXIT_SUCCESS); From f4b98ae43616f0af4b5f7c993cd820057e25fb51 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 2 Jul 2025 09:34:55 +0200 Subject: [PATCH 3/4] Protected against system errors --- src/common/capio/queue.hpp | 4 ++-- src/common/capio/utils.h | 12 +++++++++++- src/posix/utils/clone.hpp | 6 +++--- .../client-manager/request_handler_engine.hpp | 8 ++------ .../communication-service/BackendInterface.hpp | 7 +++---- .../communication-service/MTCL_backend.hpp | 16 ++++++++-------- src/server/file-manager/file_manager_impl.hpp | 10 +++++----- 7 files changed, 34 insertions(+), 29 deletions(-) diff --git a/src/common/capio/queue.hpp b/src/common/capio/queue.hpp index fcdb3ed15..73cccb414 100644 --- a/src/common/capio/queue.hpp +++ b/src/common/capio/queue.hpp @@ -109,7 +109,7 @@ template class Queue { _sem_num_elems.lock(); - std::lock_guard lg(_mutex); + const std::lock_guard lg(_mutex); T *segment = reinterpret_cast(_shm) + *_first_elem; *_first_elem = (*_first_elem + _elem_size) % _buff_size; @@ -145,7 +145,7 @@ template class Queue { _sem_num_empty.lock(); - std::lock_guard lg(_mutex); + const std::lock_guard lg(_mutex); T *segment = reinterpret_cast(_shm) + *_last_elem; *_last_elem = (*_last_elem + _elem_size) % _buff_size; diff --git a/src/common/capio/utils.h b/src/common/capio/utils.h index 223c8fb5a..5b29dbffc 100644 --- a/src/common/capio/utils.h +++ b/src/common/capio/utils.h @@ -19,7 +19,6 @@ template void capio_delete(T **ptr) { #endif } - template void capio_delete_vec(T **ptr) { if (*ptr != nullptr) { delete[] *ptr; @@ -35,4 +34,15 @@ template void capio_delete_vec(T **ptr) { #endif } +#define lockguard_guard(expr) \ + try { \ + expr; \ + } catch (const std::system_error &e) { \ + char nodename[HOST_NAME_MAX]{0}; \ + gethostname(nodename, HOST_NAME_MAX); \ + std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << nodename << " ] " \ + << "Danger! caught possible deadlock due to already acquired semaphore" \ + << std::endl; \ + } + #endif // UTILS_H diff --git a/src/posix/utils/clone.hpp b/src/posix/utils/clone.hpp index 0795c8c0d..d137c9cda 100644 --- a/src/posix/utils/clone.hpp +++ b/src/posix/utils/clone.hpp @@ -13,19 +13,19 @@ inline std::condition_variable clone_cv; inline std::unordered_set *tids; inline bool is_capio_tid(const pid_t tid) { - const std::lock_guard lg(clone_mutex); + lockguard_guard(const std::lock_guard lg(clone_mutex)); return tids->find(tid) != tids->end(); } inline void register_capio_tid(const pid_t tid) { START_LOG(syscall_no_intercept(SYS_gettid), "call(tid=%ld)", tid); - const std::lock_guard lg(clone_mutex); + lockguard_guard(const std::lock_guard lg(clone_mutex)); tids->insert(tid); } inline void remove_capio_tid(const pid_t tid) { START_LOG(syscall_no_intercept(SYS_gettid), "call(tid=%ld)", tid); - const std::lock_guard lg(clone_mutex); + lockguard_guard(std::lock_guard lg(clone_mutex)); if (tids->find(tid) != tids->end()) { tids->erase(tid); } diff --git a/src/server/client-manager/request_handler_engine.hpp b/src/server/client-manager/request_handler_engine.hpp index a0b5855bf..6aaaf1da6 100644 --- a/src/server/client-manager/request_handler_engine.hpp +++ b/src/server/client-manager/request_handler_engine.hpp @@ -71,6 +71,8 @@ class RequestHandlerEngine { } else { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " << "Received invalid code: " << code << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + << "Offending request: " << ptr << " / " << req << std::endl; ERR_EXIT("Invalid request %d%s", code, ptr); } return code; @@ -111,12 +113,6 @@ class RequestHandlerEngine { while (true) { LOG(CAPIO_LOG_SERVER_REQUEST_START); int code = read_next_request(str.get()); - if (code < 0 || code > CAPIO_NR_REQUESTS) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " - << "Received invalid code: " << code << std::endl; - - ERR_EXIT("Error: received invalid request code"); - } request_handlers[code](str.get()); LOG(CAPIO_LOG_SERVER_REQUEST_END); } diff --git a/src/server/communication-service/BackendInterface.hpp b/src/server/communication-service/BackendInterface.hpp index 1232cb323..cc37bf147 100644 --- a/src/server/communication-service/BackendInterface.hpp +++ b/src/server/communication-service/BackendInterface.hpp @@ -37,7 +37,7 @@ class BackendInterface { * @param start_offset * @return std::string hostname of sender */ - virtual std::string &receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) { + virtual std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) { throw NotImplementedBackendMethod(); }; @@ -59,9 +59,8 @@ class NoBackend final : public BackendInterface { return; }; - std::string &receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override { - auto s = std::string("no-backend"); - return s; + std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override { + return {"no-backend"}; } std::vector get_open_connections() override { return {}; } diff --git a/src/server/communication-service/MTCL_backend.hpp b/src/server/communication-service/MTCL_backend.hpp index 431f4a413..223309252 100644 --- a/src/server/communication-service/MTCL_backend.hpp +++ b/src/server/communication-service/MTCL_backend.hpp @@ -106,7 +106,7 @@ class MTCL_backend : public BackendInterface { send_unit(&HandlerPointer, unit); LOG("[send] Message sent"); - std::lock_guard lg(*mutex); + lockguard_guard(const std::lock_guard lg(*mutex)); LOG("[send] Locked guard"); out->pop(); } @@ -118,7 +118,7 @@ class MTCL_backend : public BackendInterface { LOG("[recv] Receiving data"); auto unit = receive_unit(&HandlerPointer); LOG("[recv] Lock guard"); - std::lock_guard lg(*mutex); + lockguard_guard(const std::lock_guard lg(*mutex)); in->push(unit); LOG("[recv] Pushed %ld bytes to be stored on file %s", unit->_buffer_size, unit->_filepath.c_str()); @@ -130,7 +130,7 @@ class MTCL_backend : public BackendInterface { // terminate phase if (*terminate) { - std::lock_guard lg(*mutex); + lockguard_guard(const std::lock_guard lg(*mutex)); LOG("[TERM PHASE] Locked access send and receive queues"); while (!out->empty()) { const auto unit = out->front(); @@ -168,7 +168,7 @@ class MTCL_backend : public BackendInterface { LOG("Received connection hostname: %s", connected_hostname); - std::lock_guard lock(*guard); + lockguard_guard(const std::lock_guard lock(*guard)); open_connections->insert( {connected_hostname, @@ -259,7 +259,7 @@ class MTCL_backend : public BackendInterface { << "Connected to " << remoteToken << std::endl; LOG("Connected to: %s", remoteToken.c_str()); UserManager.send(ownHostname, HOST_NAME_MAX); - std::lock_guard lg(*_guard); + lockguard_guard(const std::lock_guard lg(*_guard)); auto connection_tuple = std::make_tuple(new std::queue(), @@ -312,7 +312,7 @@ class MTCL_backend : public BackendInterface { << "MTCL backend correctly terminated" << std::endl; } - std::string &receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override { + std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override { START_LOG(gettid(), "call()"); std::queue *inQueue = nullptr; @@ -330,7 +330,7 @@ class MTCL_backend : public BackendInterface { } } LOG("Found incoming message"); - std::lock_guard lg(*std::get<2>(interface)); + lockguard_guard(const std::lock_guard lg(*std::get<2>(interface))); auto inputUnit = inQueue->front(); *buf_size = inputUnit->_buffer_size; *start_offset = inputUnit->_start_write_offset; @@ -364,7 +364,7 @@ class MTCL_backend : public BackendInterface { memcpy(outputUnit->_bytes, buf, buf_size); LOG("Copied buffer: %s", outputUnit->_bytes); - std::lock_guard lg(*std::get<2>(interface)); + lockguard_guard(const std::lock_guard lg(*std::get<2>(interface))); LOG("Pushing Transport unit to out queue"); out->push(outputUnit); } else { diff --git a/src/server/file-manager/file_manager_impl.hpp b/src/server/file-manager/file_manager_impl.hpp index cd2d639c8..dd918fea2 100644 --- a/src/server/file-manager/file_manager_impl.hpp +++ b/src/server/file-manager/file_manager_impl.hpp @@ -53,7 +53,7 @@ inline uintmax_t CapioFileManager::get_file_size_if_exists(const std::filesystem */ inline void CapioFileManager::addThreadAwaitingCreation(const std::string &path, pid_t tid) { START_LOG(gettid(), "call(path=%s, tid=%ld)", path.c_str(), tid); - std::lock_guard lg(creation_mutex); + lockguard_guard(const std::lock_guard lg(creation_mutex)); thread_awaiting_file_creation[path].push_back(tid); } @@ -92,7 +92,7 @@ inline void CapioFileManager::addThreadAwaitingData(const std::string &path, int return; } - std::lock_guard lg(data_mutex); + lockguard_guard(const std::lock_guard lg(data_mutex)); thread_awaiting_data[path].emplace(tid, expected_size); } @@ -172,7 +172,7 @@ inline void CapioFileManager::_unlockThreadAwaitingData( inline void CapioFileManager::increaseCloseCount(const std::filesystem::path &path) { START_LOG(gettid(), "call(path=%s)", path.c_str()); auto metadata_path = getAndCreateMetadataPath(path); - auto lock = new DistributedSemaphore(metadata_path + ".lock", 300); + auto lock = new DistributedSemaphore(metadata_path + ".lock", 300); long long close_count = 0; LOG("Gained mutual exclusive access to token file %s", (metadata_path + ".lock").c_str()); @@ -342,7 +342,7 @@ inline bool CapioFileManager::isCommitted(const std::filesystem::path &path) { */ inline void CapioFileManager::checkFilesAwaitingCreation() { // NOTE: do not put inside here log code as it will generate a lot of useless log - std::lock_guard lg(creation_mutex); + lockguard_guard(const std::lock_guard lg(creation_mutex)); std::vector path_to_delete; for (auto element : thread_awaiting_file_creation) { @@ -367,7 +367,7 @@ inline void CapioFileManager::checkFilesAwaitingCreation() { */ inline void CapioFileManager::checkFileAwaitingData() { // NOTE: do not put inside here log code as it will generate a lot of useless log - std::lock_guard lg(data_mutex); + lockguard_guard(const std::lock_guard lg(data_mutex)); for (auto iter = thread_awaiting_data.begin(); iter != thread_awaiting_data.end();) { START_LOG(gettid(), "\n\ncall()"); // no need to check if file exists as this method is called only by read_handler From 73eda871f7b31ee44db110f0f14b563f8af1472d Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 2 Jul 2025 09:53:36 +0200 Subject: [PATCH 4/4] fixed includes --- README.md | 2 +- tests/multinode/backend/src/main.cpp | 1 + tests/multinode/integration/src/common.hpp | 2 +- tests/unit/MemoryFiles/src/main.cpp | 1 + tests/unit/posix/src/realpath.cpp | 6 +++--- tests/unit/server/src/CapioCacheSPSCQueueTests.hpp | 2 ++ tests/unit/server/src/main.cpp | 4 ++-- tests/unit/syscall/src/main.cpp | 2 ++ 8 files changed, 13 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index ed472d009..4bd88bf4b 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ the [CAPIO-CL Docs](https://capio.hpc4ai.it/docs/coord-language/) for details. To launch your workflow with capio you can follow two routes: -#### A) Use `capiorun` for simplfied operations +#### A) Use `capiorun` for simplified operations You can simplify the execution of workflow steps with CAPIO using the `capiorun` utility. See the [`capiorun` documentation](capiorun/readme.md) for usage and examples. `capiorun` provides an easier way to manage diff --git a/tests/multinode/backend/src/main.cpp b/tests/multinode/backend/src/main.cpp index 85422f163..274f7800b 100644 --- a/tests/multinode/backend/src/main.cpp +++ b/tests/multinode/backend/src/main.cpp @@ -1,3 +1,4 @@ +#include #include #include diff --git a/tests/multinode/integration/src/common.hpp b/tests/multinode/integration/src/common.hpp index 44b5f93ff..868a1e055 100644 --- a/tests/multinode/integration/src/common.hpp +++ b/tests/multinode/integration/src/common.hpp @@ -1,6 +1,6 @@ #ifndef COMMON_HPP #define COMMON_HPP - +#include #include #include diff --git a/tests/unit/MemoryFiles/src/main.cpp b/tests/unit/MemoryFiles/src/main.cpp index 4c51b8cff..03485460f 100644 --- a/tests/unit/MemoryFiles/src/main.cpp +++ b/tests/unit/MemoryFiles/src/main.cpp @@ -1,3 +1,4 @@ +#include #include #include diff --git a/tests/unit/posix/src/realpath.cpp b/tests/unit/posix/src/realpath.cpp index cfa38e8ae..8da761fff 100644 --- a/tests/unit/posix/src/realpath.cpp +++ b/tests/unit/posix/src/realpath.cpp @@ -1,8 +1,8 @@ -#include - +#include #include +#include -#include +#include #include "utils/filesystem.hpp" diff --git a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp index 331aecdd2..4e9b94843 100644 --- a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp +++ b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp @@ -1,6 +1,8 @@ #ifndef CAPIOCACHESPSCQUEUETESTS_HPP #define CAPIOCACHESPSCQUEUETESTS_HPP +#include + #include "../common/capio/response_queue.hpp" #include "../posix/utils/env.hpp" #include "../posix/utils/filesystem.hpp" diff --git a/tests/unit/server/src/main.cpp b/tests/unit/server/src/main.cpp index c94508840..dda929955 100644 --- a/tests/unit/server/src/main.cpp +++ b/tests/unit/server/src/main.cpp @@ -1,6 +1,6 @@ -#include "capio/constants.hpp" +#include +#include #include -#include std::string workflow_name = CAPIO_DEFAULT_WORKFLOW_NAME; diff --git a/tests/unit/syscall/src/main.cpp b/tests/unit/syscall/src/main.cpp index c3d8572e1..9098c72ea 100644 --- a/tests/unit/syscall/src/main.cpp +++ b/tests/unit/syscall/src/main.cpp @@ -1,3 +1,5 @@ +#include + #include "capio/constants.hpp" #include