From 34f0e891cade5fcb4657c5d58e6ae1974c3f2d1d Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 23 Jul 2025 09:57:06 +0200 Subject: [PATCH 1/3] Added CapioRemoteFile - Added CapioRemoteFile class - Refactor and general cleanup --- .../capio-cl-engine/capio_cl_engine.hpp | 12 ++ src/server/capio_server.cpp | 184 +---------------- src/server/client-manager/handlers/create.hpp | 2 +- src/server/client-manager/handlers/open.hpp | 9 +- src/server/file-manager/file_manager_impl.hpp | 2 +- .../CapioFile/CapioRemoteFile.hpp | 59 ++++++ .../storage-service/capio_storage_service.hpp | 19 +- src/server/utils/parser.hpp | 188 ++++++++++++++++++ 8 files changed, 285 insertions(+), 190 deletions(-) create mode 100644 src/server/storage-service/CapioFile/CapioRemoteFile.hpp create mode 100644 src/server/utils/parser.hpp diff --git a/src/server/capio-cl-engine/capio_cl_engine.hpp b/src/server/capio-cl-engine/capio_cl_engine.hpp index 6d77511b9..5b3b829d6 100644 --- a/src/server/capio-cl-engine/capio_cl_engine.hpp +++ b/src/server/capio-cl-engine/capio_cl_engine.hpp @@ -440,6 +440,18 @@ class CapioCLEngine { return files; } + auto get_home_node(const std::string &path) { + // TODO: understand here how to get the home node policy. + START_LOG(gettid(), "call(path=%s)", path.c_str()); + if (const auto location = _locations.find(path); location == _locations.end()) { + LOG("No rule for home node. Returning create home node"); + return node_name; + }else{ + LOG("Found location entry"); + } + return node_name; + } + protected: const auto *getLocations() const { return &_locations; } }; diff --git a/src/server/capio_server.cpp b/src/server/capio_server.cpp index af05d4e9a..1be87502f 100644 --- a/src/server/capio_server.cpp +++ b/src/server/capio_server.cpp @@ -46,189 +46,7 @@ char node_name[HOST_NAME_MAX]; #include "communication-service/MTCL_backend.hpp" -std::string parseCLI(int argc, char **argv) { - Logger *log; - - args::ArgumentParser parser(CAPIO_SERVER_ARG_PARSER_PRE, CAPIO_SERVER_ARG_PARSER_EPILOGUE); - parser.LongSeparator(" "); - parser.LongPrefix("--"); - parser.ShortPrefix("-"); - - args::Group arguments(parser, "Arguments"); - args::HelpFlag help(arguments, "help", "Display this help menu", {'h', "help"}); - args::ValueFlag logfile_src(arguments, "filename", - CAPIO_SERVER_ARG_PARSER_LOGILE_OPT_HELP, {'l', "log"}); - args::ValueFlag logfile_folder( - arguments, "filename", CAPIO_SERVER_ARG_PARSER_LOGILE_DIR_OPT_HELP, {'d', "log-dir"}); - args::ValueFlag config(arguments, "filename", - CAPIO_SERVER_ARG_PARSER_CONFIG_OPT_HELP, {'c', "config"}); - args::Flag noConfigFile(arguments, "no-config", - CAPIO_SERVER_ARG_PARSER_CONFIG_NO_CONF_FILE_HELP, {"no-config"}); - - args::ValueFlag backend( - arguments, "backend", CAPIO_SERVER_ARG_PARSER_BACKEND_OPT_HELP, {'b', "backend"}); - - args::ValueFlag backend_port(arguments, "port", - CAPIO_SERVER_ARG_PARSER_BACKEND_PORT_OPT_HELP, {'p', "port"}); - - args::Flag continueOnErrorFlag(arguments, "continue-on-error", - CAPIO_SERVER_ARG_PARSER_MEM_STORAGE_ONLY_HELP, - {"continue-on-error"}); - - args::Flag memStorageOnly(arguments, "mem-storage-only", - CAPIO_SERVER_ARG_PARSER_CONFIG_NCONTINUE_ON_ERROR_HELP, {"mem-only"}); - - try { - parser.ParseCLI(argc, argv); - } catch (args::Help &) { - std::cout << CAPIO_SERVER_ARG_PARSER_PRE_COMMAND << parser; - exit(EXIT_SUCCESS); - } catch (args::ParseError &e) { - START_LOG(gettid(), "call()"); - std::cerr << e.what() << std::endl; - std::cerr << parser; - ERR_EXIT("%s", e.what()); - } catch (args::ValidationError &e) { - START_LOG(gettid(), "call()"); - std::cerr << e.what() << std::endl; - std::cerr << parser; - ERR_EXIT("%s", e.what()); - } - - if (continueOnErrorFlag) { -#ifdef CAPIO_LOG - continue_on_error = true; - std::cout << CAPIO_LOG_SERVER_CLI_CONT_ON_ERR_WARNING << std::endl; -#else - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING - << "--continue-on-error flag given, but logger is not compiled into CAPIO. Flag " - "is ignored." - << std::endl; -#endif - } - - if (memStorageOnly) { - StoreOnlyInMemory = true; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "All files will be stored in memory whenever possible." << std::endl; - } - - if (logfile_folder) { -#ifdef CAPIO_LOG - log_master_dir_name = args::get(logfile_folder); -#else - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING - << "Capio logfile folder, but logging capabilities not compiled into capio!" - << std::endl; -#endif - } - - if (logfile_src) { -#ifdef CAPIO_LOG - // log file was given - std::string token = args::get(logfile_src); - if (token.find(".log") != std::string::npos) { - token.erase(token.length() - 4); // delete .log if for some reason - // is given as parameter - } - logfile_prefix = token; -#else - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING - << "Capio logfile provided, but logging capabilities not compiled into capio!" - << std::endl; -#endif - } -#ifdef CAPIO_LOG - auto logname = open_server_logfile(); - log = new Logger(__func__, __FILE__, __LINE__, gettid(), "Created new log file"); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " - << "started logging to logfile " << logname << std::endl; -#endif - - if (config) { - std::string token = args::get(config); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "parsing config file: " << token << std::endl; - // TODO: pass config file path - } else if (noConfigFile) { - workflow_name = std::string_view(get_capio_workflow_name()); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " - << "skipping config file parsing." << std::endl - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " - << "Obtained from environment variable current workflow name: " - << workflow_name.data() << std::endl; - - } else { - START_LOG(gettid(), "call()"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " - << "Error: no config file provided. To skip config file use --no-config option!" - << std::endl; - ERR_EXIT("no config file provided, and --no-config not provided"); - } - - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "CAPIO_DIR=" << get_capio_dir().c_str() << std::endl; - -#ifdef CAPIO_LOG - CAPIO_LOG_LEVEL = get_capio_log_level(); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "LOG_LEVEL set to: " << CAPIO_LOG_LEVEL << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING; - log->log("LOG_LEVEL set to: %d", CAPIO_LOG_LEVEL); - delete log; -#else - if (std::getenv("CAPIO_LOG_LEVEL") != nullptr) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " - << CAPIO_LOG_SERVER_CLI_LOGGING_NOT_AVAILABLE << std::endl; - } -#endif - - if (backend) { - std::string backend_name = args::get(backend); - std::transform(backend_name.begin(), backend_name.end(), backend_name.begin(), ::toupper); - - int port = DEFAULT_CAPIO_BACKEND_PORT; - if (backend_port) { - port = args::get(backend_port); - } - - if (backend_name == "MQTT" || backend_name == "MPI") { - std::cout - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " - << "Warn: selected backend is not yet officially supported. Setting backend to TCP" - << std::endl; - backend_name = "TCP"; - } - - if (backend_name == "TCP" || backend_name == "UCX") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "Selected backend is: " << backend_name << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "Selected backend port is: " << port << std::endl; - capio_backend = new MTCL_backend(backend_name, std::to_string(port), - CAPIO_BACKEND_DEFAULT_SLEEP_TIME); - } else if (backend_name == "FS") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "Selected backend is File System" << std::endl; - capio_backend = new NoBackend(); - } else { - START_LOG(gettid(), "call()"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " - << "Provided communication backend " << backend_name << " is invalid" - << std::endl; - ERR_EXIT("No valid backend was provided"); - } - } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "Selected backend is File System" << std::endl; - capio_backend = new NoBackend(); - } - - if (config) { - return args::get(config); - } - return ""; -} +#include int main(int argc, char **argv) { diff --git a/src/server/client-manager/handlers/create.hpp b/src/server/client-manager/handlers/create.hpp index 39801ff69..7736f2c1d 100644 --- a/src/server/client-manager/handlers/create.hpp +++ b/src/server/client-manager/handlers/create.hpp @@ -23,7 +23,7 @@ inline void create_handler(const char *const str) { capio_cl_engine->addProducer(path, name); client_manager->register_produced_file(tid, path_str); - storage_service->createFile(path); + storage_service->createMemoryFile(path); } #endif // CAPIO_CREATE_HPP diff --git a/src/server/client-manager/handlers/open.hpp b/src/server/client-manager/handlers/open.hpp index 04aa072af..1fb6bc35a 100644 --- a/src/server/client-manager/handlers/open.hpp +++ b/src/server/client-manager/handlers/open.hpp @@ -17,14 +17,19 @@ inline void open_handler(const char *const str) { if (capio_cl_engine->isProducer(path, tid)) { LOG("Thread is producer. allowing to continue with open"); client_manager->reply_to_client(tid, 1); - storage_service->createFile(path); + storage_service->createMemoryFile(path); return; } if (std::filesystem::exists(path)) { LOG("File already exists! allowing to continue with open"); client_manager->reply_to_client(tid, 1); - storage_service->createFile(path); + + /* + * At this point, the file that needs to be created more likely than not is not local to the + * machine. As such, we call the creation of a new CapioRemoteFile + */ + storage_service->createRemoteFile(path); return; } diff --git a/src/server/file-manager/file_manager_impl.hpp b/src/server/file-manager/file_manager_impl.hpp index 977a414ad..8a37ca1d3 100644 --- a/src/server/file-manager/file_manager_impl.hpp +++ b/src/server/file-manager/file_manager_impl.hpp @@ -68,7 +68,7 @@ inline void CapioFileManager::_unlockThreadAwaitingCreation(const std::string &p START_LOG(gettid(), "call(path=%s)", path.c_str()); for (const auto tid : pids) { client_manager->reply_to_client(tid, 1); - storage_service->createFile(path); + storage_service->createMemoryFile(path); } } diff --git a/src/server/storage-service/CapioFile/CapioRemoteFile.hpp b/src/server/storage-service/CapioFile/CapioRemoteFile.hpp new file mode 100644 index 000000000..2e7966d64 --- /dev/null +++ b/src/server/storage-service/CapioFile/CapioRemoteFile.hpp @@ -0,0 +1,59 @@ +#ifndef CAPIOREMOTEFILE_HPP +#define CAPIOREMOTEFILE_HPP + +#include + +#include "CapioFile.hpp" + +class CapioRemoteFile : public CapioFile { + + public: + explicit CapioRemoteFile(const std::string &filePath) : CapioFile(filePath) {} + + ~CapioRemoteFile() override {} + + /** + * Write data to a file stored inside the memory + * @param buffer buffer to store inside memory (i.e. content of the file) + * @param file_offset offset internal to the file + * @param buffer_length Size of the buffer. + */ + std::size_t writeData(const char *buffer, const std::size_t file_offset, + std::size_t buffer_length) override { + return 0; + } + + /** + * Read from Capio File + * @param buffer Buffer to read to + * @param file_offset Starting offset of read operation from CapioMemFile + * @param buffer_size Length of buffer + * @return number of bytes read from CapioMemoryFile + */ + std::size_t readData(char *buffer, std::size_t file_offset, std::size_t buffer_size) { + return 0; + } + + /** + * Store data inside the CapioMemoryFile by reading it from a SPSCQueue object. Behaves just + * like the writeData method + * @param queue + * @param offset + * @param length + */ + void readFromQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) override {} + + /** + * Write the content of the capioFile to a SPSCQueue object + * @param queue + * @param offset + * @param length + * @return + */ + std::size_t writeToQueue(SPSCQueue &queue, std::size_t offset, + std::size_t length) const override { + return 0; + } +}; + +#endif // CAPIOMEMORYFILE_HPP diff --git a/src/server/storage-service/capio_storage_service.hpp b/src/server/storage-service/capio_storage_service.hpp index 88d8f1385..54f4187b3 100644 --- a/src/server/storage-service/capio_storage_service.hpp +++ b/src/server/storage-service/capio_storage_service.hpp @@ -4,6 +4,7 @@ #include "../../posix/utils/env.hpp" #include "CapioFile/CapioFile.hpp" #include "CapioFile/CapioMemoryFile.hpp" +#include "CapioFile/CapioRemoteFile.hpp" class CapioStorageService { @@ -23,7 +24,7 @@ class CapioStorageService { */ [[nodiscard]] auto getFile(const std::string &file_name) const { if (_stored_files->find(file_name) == _stored_files->end()) { - createFile(file_name); + createMemoryFile(file_name); } return _stored_files->at(file_name); } @@ -49,11 +50,23 @@ class CapioStorageService { delete _threads_waiting_for_memory_data; } - void createFile(const std::string &file_name) const { - // TODO: understand when is local or remte CapioFile + void createMemoryFile(const std::string &file_name) const { _stored_files->emplace(file_name, new CapioMemoryFile(file_name)); } + void createRemoteFile(const std::string &file_name) const { + /* + * First we check that the file associate does not yet exists, as it might be produced + * by another app running under the same server instance. if it is not found, we create + * the file + */ + START_LOG(gettid(), "call(file_name=%s)", file_name.c_str()); + if (_stored_files->find(file_name) == _stored_files->end()) { + LOG("File not found. Creating a new remote file"); + _stored_files->emplace(file_name, new CapioRemoteFile(file_name)); + } + } + void deleteFile(const std::string &file_name) const { _stored_files->erase(file_name); } /** diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp new file mode 100644 index 000000000..97084742d --- /dev/null +++ b/src/server/utils/parser.hpp @@ -0,0 +1,188 @@ +#ifndef PARSER_HPP +#define PARSER_HPP + +std::string parseCLI(int argc, char **argv) { + Logger *log; + + args::ArgumentParser parser(CAPIO_SERVER_ARG_PARSER_PRE, CAPIO_SERVER_ARG_PARSER_EPILOGUE); + parser.LongSeparator(" "); + parser.LongPrefix("--"); + parser.ShortPrefix("-"); + + args::Group arguments(parser, "Arguments"); + args::HelpFlag help(arguments, "help", "Display this help menu", {'h', "help"}); + args::ValueFlag logfile_src(arguments, "filename", + CAPIO_SERVER_ARG_PARSER_LOGILE_OPT_HELP, {'l', "log"}); + args::ValueFlag logfile_folder( + arguments, "filename", CAPIO_SERVER_ARG_PARSER_LOGILE_DIR_OPT_HELP, {'d', "log-dir"}); + args::ValueFlag config(arguments, "filename", + CAPIO_SERVER_ARG_PARSER_CONFIG_OPT_HELP, {'c', "config"}); + args::Flag noConfigFile(arguments, "no-config", + CAPIO_SERVER_ARG_PARSER_CONFIG_NO_CONF_FILE_HELP, {"no-config"}); + + args::ValueFlag backend( + arguments, "backend", CAPIO_SERVER_ARG_PARSER_BACKEND_OPT_HELP, {'b', "backend"}); + + args::ValueFlag backend_port(arguments, "port", + CAPIO_SERVER_ARG_PARSER_BACKEND_PORT_OPT_HELP, {'p', "port"}); + + args::Flag continueOnErrorFlag(arguments, "continue-on-error", + CAPIO_SERVER_ARG_PARSER_MEM_STORAGE_ONLY_HELP, + {"continue-on-error"}); + + args::Flag memStorageOnly(arguments, "mem-storage-only", + CAPIO_SERVER_ARG_PARSER_CONFIG_NCONTINUE_ON_ERROR_HELP, {"mem-only"}); + + try { + parser.ParseCLI(argc, argv); + } catch (args::Help &) { + std::cout << CAPIO_SERVER_ARG_PARSER_PRE_COMMAND << parser; + exit(EXIT_SUCCESS); + } catch (args::ParseError &e) { + START_LOG(gettid(), "call()"); + std::cerr << e.what() << std::endl; + std::cerr << parser; + ERR_EXIT("%s", e.what()); + } catch (args::ValidationError &e) { + START_LOG(gettid(), "call()"); + std::cerr << e.what() << std::endl; + std::cerr << parser; + ERR_EXIT("%s", e.what()); + } + + if (continueOnErrorFlag) { +#ifdef CAPIO_LOG + continue_on_error = true; + std::cout << CAPIO_LOG_SERVER_CLI_CONT_ON_ERR_WARNING << std::endl; +#else + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING + << "--continue-on-error flag given, but logger is not compiled into CAPIO. Flag " + "is ignored." + << std::endl; +#endif + } + + if (memStorageOnly) { + StoreOnlyInMemory = true; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "All files will be stored in memory whenever possible." << std::endl; + } + + if (logfile_folder) { +#ifdef CAPIO_LOG + log_master_dir_name = args::get(logfile_folder); +#else + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING + << "Capio logfile folder, but logging capabilities not compiled into capio!" + << std::endl; +#endif + } + + if (logfile_src) { +#ifdef CAPIO_LOG + // log file was given + std::string token = args::get(logfile_src); + if (token.find(".log") != std::string::npos) { + token.erase(token.length() - 4); // delete .log if for some reason + // is given as parameter + } + logfile_prefix = token; +#else + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING + << "Capio logfile provided, but logging capabilities not compiled into capio!" + << std::endl; +#endif + } +#ifdef CAPIO_LOG + auto logname = open_server_logfile(); + log = new Logger(__func__, __FILE__, __LINE__, gettid(), "Created new log file"); + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " + << "started logging to logfile " << logname << std::endl; +#endif + + if (config) { + std::string token = args::get(config); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "parsing config file: " << token << std::endl; + // TODO: pass config file path + } else if (noConfigFile) { + workflow_name = std::string_view(get_capio_workflow_name()); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << "skipping config file parsing." << std::endl + << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << "Obtained from environment variable current workflow name: " + << workflow_name.data() << std::endl; + + } else { + START_LOG(gettid(), "call()"); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + << "Error: no config file provided. To skip config file use --no-config option!" + << std::endl; + ERR_EXIT("no config file provided, and --no-config not provided"); + } + + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "CAPIO_DIR=" << get_capio_dir().c_str() << std::endl; + +#ifdef CAPIO_LOG + CAPIO_LOG_LEVEL = get_capio_log_level(); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "LOG_LEVEL set to: " << CAPIO_LOG_LEVEL << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING; + log->log("LOG_LEVEL set to: %d", CAPIO_LOG_LEVEL); + delete log; +#else + if (std::getenv("CAPIO_LOG_LEVEL") != nullptr) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << CAPIO_LOG_SERVER_CLI_LOGGING_NOT_AVAILABLE << std::endl; + } +#endif + + if (backend) { + std::string backend_name = args::get(backend); + std::transform(backend_name.begin(), backend_name.end(), backend_name.begin(), ::toupper); + + int port = DEFAULT_CAPIO_BACKEND_PORT; + if (backend_port) { + port = args::get(backend_port); + } + + if (backend_name == "MQTT" || backend_name == "MPI") { + std::cout + << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << "Warn: selected backend is not yet officially supported. Setting backend to TCP" + << std::endl; + backend_name = "TCP"; + } + + if (backend_name == "TCP" || backend_name == "UCX") { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "Selected backend is: " << backend_name << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "Selected backend port is: " << port << std::endl; + capio_backend = new MTCL_backend(backend_name, std::to_string(port), + CAPIO_BACKEND_DEFAULT_SLEEP_TIME); + } else if (backend_name == "FS") { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "Selected backend is File System" << std::endl; + capio_backend = new NoBackend(); + } else { + START_LOG(gettid(), "call()"); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + << "Provided communication backend " << backend_name << " is invalid" + << std::endl; + ERR_EXIT("No valid backend was provided"); + } + } else { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "Selected backend is File System" << std::endl; + capio_backend = new NoBackend(); + } + + if (config) { + return args::get(config); + } + return ""; +} + +#endif // PARSER_HPP From bd2a1a0fa035f1baa005185368949a18b4c96f9d Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 23 Jul 2025 10:06:32 +0200 Subject: [PATCH 2/3] Fixes --- src/server/capio-cl-engine/capio_cl_engine.hpp | 2 +- src/server/file-manager/file_manager_impl.hpp | 6 +++++- src/server/storage-service/capio_storage_service.hpp | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/server/capio-cl-engine/capio_cl_engine.hpp b/src/server/capio-cl-engine/capio_cl_engine.hpp index 5b3b829d6..2f56bc5de 100644 --- a/src/server/capio-cl-engine/capio_cl_engine.hpp +++ b/src/server/capio-cl-engine/capio_cl_engine.hpp @@ -446,7 +446,7 @@ class CapioCLEngine { if (const auto location = _locations.find(path); location == _locations.end()) { LOG("No rule for home node. Returning create home node"); return node_name; - }else{ + } else { LOG("Found location entry"); } return node_name; diff --git a/src/server/file-manager/file_manager_impl.hpp b/src/server/file-manager/file_manager_impl.hpp index 8a37ca1d3..dc8b4ca0d 100644 --- a/src/server/file-manager/file_manager_impl.hpp +++ b/src/server/file-manager/file_manager_impl.hpp @@ -68,7 +68,11 @@ inline void CapioFileManager::_unlockThreadAwaitingCreation(const std::string &p START_LOG(gettid(), "call(path=%s)", path.c_str()); for (const auto tid : pids) { client_manager->reply_to_client(tid, 1); - storage_service->createMemoryFile(path); + /* + * Here we need to create a new remote file, as it might be that the file is not + * produced by this node but by another remote one + */ + storage_service->createRemoteFile(path); } } diff --git a/src/server/storage-service/capio_storage_service.hpp b/src/server/storage-service/capio_storage_service.hpp index 54f4187b3..27b0b672d 100644 --- a/src/server/storage-service/capio_storage_service.hpp +++ b/src/server/storage-service/capio_storage_service.hpp @@ -54,6 +54,11 @@ class CapioStorageService { _stored_files->emplace(file_name, new CapioMemoryFile(file_name)); } + /** + * Create a CapioRemoteFile, after checking that an instance of CapioMemoryFile (meaning a local + * file) is not present + * @param file_name file path + */ void createRemoteFile(const std::string &file_name) const { /* * First we check that the file associate does not yet exists, as it might be produced From 909a2aae6dd30e662ef5d8b4e689dae8a7cce023 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 23 Jul 2025 11:11:20 +0200 Subject: [PATCH 3/3] Added CapioCommunicationService Added new class to abstract over CAPIO data plane and CAPIO control plane. Refactored code independent of the data plane backend --- src/server/capio_server.cpp | 4 +- .../BackendInterface.hpp | 7 + .../CapioCommunicationService.hpp | 136 ++++++++++++++++++ .../communication-service/MTCL_backend.hpp | 123 ++++++---------- src/server/utils/parser.hpp | 26 +--- src/server/utils/signals.hpp | 3 +- tests/multinode/backend/src/MTCL.hpp | 17 +-- 7 files changed, 198 insertions(+), 118 deletions(-) create mode 100644 src/server/communication-service/CapioCommunicationService.hpp diff --git a/src/server/capio_server.cpp b/src/server/capio_server.cpp index 1be87502f..835902552 100644 --- a/src/server/capio_server.cpp +++ b/src/server/capio_server.cpp @@ -42,9 +42,7 @@ char node_name[HOST_NAME_MAX]; #include "file-manager/file_manager.hpp" -#include "communication-service/BackendInterface.hpp" - -#include "communication-service/MTCL_backend.hpp" +#include "communication-service/CapioCommunicationService.hpp" #include diff --git a/src/server/communication-service/BackendInterface.hpp b/src/server/communication-service/BackendInterface.hpp index cc37bf147..e54089274 100644 --- a/src/server/communication-service/BackendInterface.hpp +++ b/src/server/communication-service/BackendInterface.hpp @@ -15,6 +15,11 @@ class NotImplementedBackendMethod : public std::exception { class BackendInterface { public: virtual ~BackendInterface() = default; + + virtual void connect_to(std::string remoteHost, const std::string &remotePort) { + throw NotImplementedBackendMethod(); + }; + /** * @brief Send data to target * @@ -54,6 +59,8 @@ class BackendInterface { */ class NoBackend final : public BackendInterface { public: + void connect_to(std::string remoteHost, const std::string &remotePort) override { return; }; + void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath, capio_off64_t start_offset) override { return; diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp new file mode 100644 index 000000000..9e24f9c04 --- /dev/null +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -0,0 +1,136 @@ +#ifndef CAPIOCOMMUNICATIONSERVICE_HPP +#define CAPIOCOMMUNICATIONSERVICE_HPP + +#include "BackendInterface.hpp" +#include "MTCL_backend.hpp" + +class CapioCommunicationService { + + char ownHostname[HOST_NAME_MAX] = {0}; + bool *continue_execution = new bool; + std::thread *thread_server_finder_fs; + + void generate_aliveness_token(const int port) const { + START_LOG(gettid(), "call(port=d)", port); + + std::string token_filename(ownHostname); + token_filename += ".alive_connection"; + + LOG("Creating alive token %s", token_filename.c_str()); + + std::ofstream FilePort(token_filename); + FilePort << port; + FilePort.close(); + + LOG("Saved self token info to FS"); + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " + << "Generated token at " << token_filename << std::endl; + } + + void delete_aliveness_token() const { + START_LOG(gettid(), "call()"); + + std::string token_filename(ownHostname); + token_filename += ".alive_connection"; + + LOG("Removing alive token %s", token_filename.c_str()); + std::filesystem::remove(token_filename); + LOG("Removed token"); + } + + /* + * Monitor the file system for the presence of tokens. + */ + static void find_new_server_from_fs_token_thread(const bool *continue_execution) { + START_LOG(gettid(), "call()"); + + std::vector handled_tokens; + + if (!continue_execution) { + LOG("Terminating execution"); + return; + } + + auto dir_iterator = std::filesystem::directory_iterator(std::filesystem::current_path()); + for (const auto &entry : dir_iterator) { + const auto &token_path = entry.path(); + + if (!entry.is_regular_file() || token_path.extension() != ".alive_connection") { + LOG("Filename %s is not valid", entry.path().c_str()); + continue; + } + + if (std::find(handled_tokens.begin(), handled_tokens.end(), entry.path()) != + handled_tokens.end()) { + LOG("Token already handled... skipping it!"); + continue; + }; + + LOG("Found token %s", token_path.c_str()); + // TODO: as of now we will not connect with servers + // TODO: that terminates and then comes back up online... + handled_tokens.push_back(entry.path()); + + std::ifstream MyReadFile(token_path.filename()); + std::string remoteHost = entry.path().stem(), remotePort; + LOG("Testing for file: %s (hostname: %s, port=%s)", entry.path().filename().c_str(), + remoteHost.c_str(), remotePort.c_str()); + + getline(MyReadFile, remotePort); + MyReadFile.close(); + + capio_backend->connect_to(remoteHost, remotePort); + } + LOG("Terminated loop. sleeping one second"); + sleep(1); + } + + public: + ~CapioCommunicationService() { + *continue_execution = false; + thread_server_finder_fs->join(); + delete_aliveness_token(); + delete capio_backend; + }; + + CapioCommunicationService(std::string &backend_name, const int port) { + START_LOG(gettid(), "call(backend_name=%s)", backend_name.c_str()); + *continue_execution = true; + gethostname(ownHostname, HOST_NAME_MAX); + LOG("My hostname is %s. Starting to listen on connection", ownHostname); + + if (backend_name == "MQTT" || backend_name == "MPI") { + std::cout + << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Warn: selected backend is not yet officially supported. Setting backend to TCP" + << std::endl; + backend_name = "TCP"; + } + + if (backend_name == "TCP" || backend_name == "UCX") { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Selected backend is: " << backend_name << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Selected backend port is: " << port << std::endl; + capio_backend = new MTCL_backend(backend_name, std::to_string(port), + CAPIO_BACKEND_DEFAULT_SLEEP_TIME); + } else if (backend_name == "FS") { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Selected backend is File System" << std::endl; + capio_backend = new NoBackend(); + } else { + START_LOG(gettid(), "call()"); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "Provided communication backend " << backend_name << " is invalid" + << std::endl; + ERR_EXIT("No valid backend was provided"); + } + generate_aliveness_token(port); + thread_server_finder_fs = + new std::thread(find_new_server_from_fs_token_thread, std::ref(continue_execution)); + } +}; + +inline CapioCommunicationService *capio_communication_service; + +#endif // CAPIOCOMMUNICATIONSERVICE_HPP diff --git a/src/server/communication-service/MTCL_backend.hpp b/src/server/communication-service/MTCL_backend.hpp index b17ab2224..37f6e7d07 100644 --- a/src/server/communication-service/MTCL_backend.hpp +++ b/src/server/communication-service/MTCL_backend.hpp @@ -34,7 +34,7 @@ class MTCL_backend : public BackendInterface { typedef std::tuple *, std::queue *, std::mutex *> TransportUnitInterface; std::unordered_map connected_hostnames_map; - std::string selfToken, connectedHostname, ownPort; + std::string selfToken, connectedHostname, ownPort, usedProtocol; char ownHostname[HOST_NAME_MAX] = {0}; int thread_sleep_times = 0; bool *continue_execution = new bool; @@ -151,7 +151,11 @@ class MTCL_backend : public BackendInterface { const bool *continue_execution, int sleep_time, std::unordered_map *open_connections, std::mutex *guard, std::vector *_connection_threads, bool *terminate) { - START_LOG(gettid(), "call(sleep_time=%d)", sleep_time); + + char ownHostname[HOST_NAME_MAX] = {0}; + gethostname(ownHostname, HOST_NAME_MAX); + + START_LOG(gettid(), "call(sleep_time=%d, hostname=%s)", sleep_time, ownHostname); while (*continue_execution) { auto UserManager = MTCL::Manager::getNext(std::chrono::microseconds(sleep_time)); @@ -163,7 +167,7 @@ class MTCL_backend : public BackendInterface { char connected_hostname[HOST_NAME_MAX] = {0}; UserManager.receive(connected_hostname, HOST_NAME_MAX); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " << "Connected to " << connected_hostname << std::endl; LOG("Received connection hostname: %s", connected_hostname); @@ -181,35 +185,45 @@ class MTCL_backend : public BackendInterface { } } - void generate_aliveness_token(const std::string &port) const { - START_LOG(gettid(), "call(port=%s)", port.c_str()); - - std::string token_filename(ownHostname); - token_filename += ".alive_connection"; - - LOG("Creating alive token %s", token_filename.c_str()); - - std::ofstream FilePort(token_filename); - FilePort << port; - FilePort.close(); - - LOG("Saved self token info to FS"); - } + public: + void connect_to(std::string remoteHost, const std::string &remotePort) override { + START_LOG(gettid(), "call( remoteHost=%s, remotePort=%s)", remoteHost.c_str(), + remotePort.c_str()); - void delete_aliveness_token(const std::string &port) const { - START_LOG(gettid(), "call(port=%s)", port.c_str()); + const std::string remoteToken = usedProtocol + ":" + remoteHost + ":" + remotePort; - std::string token_filename(ownHostname); - token_filename += ".alive_connection"; + if (remoteToken == selfToken || + remoteToken == usedProtocol + ":" + ownHostname + ":" + remotePort) { + LOG("Skipping to connect to self"); + return; + } - LOG("Removing alive token %s", token_filename.c_str()); - std::filesystem::remove(token_filename); - LOG("Removed token"); + LOG("Trying to connect on remote: %s", remoteToken.c_str()); + if (auto UserManager = MTCL::Manager::connect(remoteToken); UserManager.isValid()) { + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " + << "Connected to " << remoteToken << std::endl; + LOG("Connected to: %s", remoteToken.c_str()); + UserManager.send(ownHostname, HOST_NAME_MAX); + const std::lock_guard lg(*_guard); + + auto connection_tuple = + std::make_tuple(new std::queue(), + new std::queue(), new std::mutex()); + connected_hostnames_map.insert({remoteHost, connection_tuple}); + + connection_threads.push_back(new std::thread( + server_connection_handler, std::move(UserManager), remoteHost.c_str(), + thread_sleep_times, connection_tuple, terminate)); + } else { + std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << ownHostname << " ] " + << "Warning: found token " << remoteHost << ".alive_token" + << ", but connection is not valid" << std::endl; + } } - public: explicit MTCL_backend(const std::string &proto, const std::string &port, int sleep_time) - : selfToken(proto + ":0.0.0.0:" + port), ownPort(port), thread_sleep_times(sleep_time) { + : selfToken(proto + ":0.0.0.0:" + port), ownPort(port), usedProtocol(proto), + thread_sleep_times(sleep_time) { START_LOG(gettid(), "INFO: instance of CapioCommunicationService"); terminate = new bool; @@ -225,64 +239,13 @@ class MTCL_backend : public BackendInterface { hostname_id += ownHostname; MTCL::Manager::init(hostname_id); - generate_aliveness_token(ownPort); - - auto dir_iterator = std::filesystem::directory_iterator(std::filesystem::current_path()); - for (const auto &entry : dir_iterator) { - const auto &token_path = entry.path(); - - if (!entry.is_regular_file() || token_path.extension() != ".alive_connection") { - LOG("Filename %s is not valid", entry.path().c_str()); - continue; - } - LOG("Found token %s", token_path.c_str()); - - std::ifstream MyReadFile(token_path.filename()); - std::string remoteHost = entry.path().stem(), remotePort; - LOG("Testing for file: %s (token: %s, tryHostName=%s)", entry.path().filename().c_str(), - selfToken.c_str(), remoteHost.c_str()); - - getline(MyReadFile, remotePort); - MyReadFile.close(); - - std::string remoteToken = proto + ":" + remoteHost + ":" + ownPort; - - if (remoteToken == selfToken || remoteToken == proto + ":" + ownHostname + ":" + port) { - LOG("Skipping to connect to self"); - continue; - } - - LOG("Trying to connect on remote: %s", remoteToken.c_str()); - - if (auto UserManager = MTCL::Manager::connect(remoteToken); UserManager.isValid()) { - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " - << "Connected to " << remoteToken << std::endl; - LOG("Connected to: %s", remoteToken.c_str()); - UserManager.send(ownHostname, HOST_NAME_MAX); - const std::lock_guard lg(*_guard); - - auto connection_tuple = - std::make_tuple(new std::queue(), - new std::queue(), new std::mutex()); - connected_hostnames_map.insert({remoteHost, connection_tuple}); - - connection_threads.push_back( - new std::thread(server_connection_handler, std::move(UserManager), - remoteHost.c_str(), sleep_time, connection_tuple, terminate)); - } else { - std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << node_name << " ] " - << "Warning: found token " << token_path.filename() - << ", but connection is not valid" << std::endl; - } - } - *continue_execution = true; MTCL::Manager::listen(selfToken); th = new std::thread(incoming_connection_listener, std::ref(continue_execution), sleep_time, &connected_hostnames_map, _guard, &connection_threads, terminate); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " << "CapioCommunicationService initialization completed." << std::endl; } @@ -304,11 +267,9 @@ class MTCL_backend : public BackendInterface { LOG("Handler closed."); - delete_aliveness_token(ownPort); - MTCL::Manager::finalize(); LOG("Finalizing MTCL backend"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " << "MTCL backend correctly terminated" << std::endl; } diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index 97084742d..9bf6af5f8 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -147,32 +147,8 @@ std::string parseCLI(int argc, char **argv) { port = args::get(backend_port); } - if (backend_name == "MQTT" || backend_name == "MPI") { - std::cout - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " - << "Warn: selected backend is not yet officially supported. Setting backend to TCP" - << std::endl; - backend_name = "TCP"; - } + capio_communication_service = new CapioCommunicationService(backend_name, port); - if (backend_name == "TCP" || backend_name == "UCX") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "Selected backend is: " << backend_name << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "Selected backend port is: " << port << std::endl; - capio_backend = new MTCL_backend(backend_name, std::to_string(port), - CAPIO_BACKEND_DEFAULT_SLEEP_TIME); - } else if (backend_name == "FS") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "Selected backend is File System" << std::endl; - capio_backend = new NoBackend(); - } else { - START_LOG(gettid(), "call()"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " - << "Provided communication backend " << backend_name << " is invalid" - << std::endl; - ERR_EXIT("No valid backend was provided"); - } } else { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " << "Selected backend is File System" << std::endl; diff --git a/src/server/utils/signals.hpp b/src/server/utils/signals.hpp index 7398c7521..857bdbfb5 100644 --- a/src/server/utils/signals.hpp +++ b/src/server/utils/signals.hpp @@ -2,6 +2,7 @@ #define CAPIO_SERVER_HANDLERS_SIGNALS_HPP #include "communication-service/BackendInterface.hpp" +#include "communication-service/CapioCommunicationService.hpp" #include @@ -38,7 +39,7 @@ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) { delete request_handlers_engine; delete fs_monitor; - delete capio_backend; + delete capio_communication_service; delete shm_canary; std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " << "Bye!" << std::endl; diff --git a/tests/multinode/backend/src/MTCL.hpp b/tests/multinode/backend/src/MTCL.hpp index ee5fde92d..e8ed52588 100644 --- a/tests/multinode/backend/src/MTCL.hpp +++ b/tests/multinode/backend/src/MTCL.hpp @@ -2,8 +2,7 @@ #define TEST_CAPIOCOMMUNICATIONSERVICE_HPP #include -#include -#include +#include #include #include @@ -13,13 +12,15 @@ constexpr capio_off64_t BUFFER_SIZES = 1024; TEST(CapioCommServiceTest, TestPingPong) { START_LOG(gettid(), "INFO: TestPingPong"); gethostname(node_name.data(), HOST_NAME_MAX); - MTCL_backend backend("TCP", "1234", 300); + const int port = 1234; + std::string proto = "TCP"; + CapioCommunicationService communication_service(proto, port); capio_off64_t size_revc, offset; std::vector connections; do { - connections = backend.get_open_connections(); + connections = capio_backend->get_open_connections(); std::this_thread::sleep_for(std::chrono::milliseconds(300)); } while (connections.empty()); @@ -31,10 +32,10 @@ TEST(CapioCommServiceTest, TestPingPong) { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sending ping to: " << i << std::endl; char buff[BUFFER_SIZES]{0}, buff1[BUFFER_SIZES]{0}; memcpy(buff, TEST_MESSAGE, strlen(TEST_MESSAGE)); - backend.send(i, buff, BUFFER_SIZES, "./test", 0); + capio_backend->send(i, buff, BUFFER_SIZES, "./test", 0); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "sent ping to: " << i << ". Waiting for response" << std::endl; - backend.receive(buff1, &size_revc, &offset); + capio_backend->receive(buff1, &size_revc, &offset); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping response from : " << i << std::endl; EXPECT_EQ(strcmp(buff, buff1), 0); @@ -43,10 +44,10 @@ TEST(CapioCommServiceTest, TestPingPong) { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Listening for ping from: " << i << std::endl; char recvBuff[BUFFER_SIZES]; - backend.receive(recvBuff, &size_revc, &offset); + capio_backend->receive(recvBuff, &size_revc, &offset); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping from: " << i << std::endl; EXPECT_EQ(strcmp(recvBuff, TEST_MESSAGE), 0); - backend.send(i, recvBuff, size_revc, "./test", 0); + capio_backend->send(i, recvBuff, size_revc, "./test", 0); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sent ping response to: " << i << std::endl; return; }