From 967261702aaf2f90fd23fc386f6eb1be6193f192 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 23 Jul 2025 13:51:42 +0200 Subject: [PATCH 01/11] bugfix for std::find due to missing import --- .../communication-service/CapioCommunicationService.hpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index 1478e2897..253842316 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -4,6 +4,7 @@ #include "BackendInterface.hpp" #include "MTCL_backend.hpp" +#include #include class CapioCommunicationService { @@ -46,7 +47,7 @@ class CapioCommunicationService { static void find_new_server_from_fs_token_thread(const bool *continue_execution) { START_LOG(gettid(), "call()"); - std::vector handled_tokens; + std::vector handled_tokens; if (!continue_execution) { LOG("Terminating execution"); @@ -55,14 +56,14 @@ class CapioCommunicationService { auto dir_iterator = std::filesystem::directory_iterator(std::filesystem::current_path()); for (const auto &entry : dir_iterator) { - const auto &token_path = entry.path(); + const auto token_path = entry.path(); if (!entry.is_regular_file() || token_path.extension() != ".alive_connection") { LOG("Filename %s is not valid", entry.path().c_str()); continue; } - if (std::find(handled_tokens.begin(), handled_tokens.end(), entry.path()) != + if (std::find(handled_tokens.begin(), handled_tokens.end(), token_path) != handled_tokens.end()) { LOG("Token already handled... skipping it!"); continue; From c89d517da2e91a0719202e38baf77e46eb057363 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 23 Jul 2025 16:24:19 +0200 Subject: [PATCH 02/11] Work in progress: server discovery over multicast Added support for server multicast connections and discovery in parallel with file system aliveness tokens --- src/common/capio/constants.hpp | 7 +- .../BackendInterface.hpp | 9 +- .../CapioCommunicationService.hpp | 183 ++++++++++++++++-- .../communication-service/MTCL_backend.hpp | 21 +- tests/multinode/backend/docker-compose.yml | 25 ++- .../multinode/integration/docker-compose.yml | 34 ++-- 6 files changed, 230 insertions(+), 49 deletions(-) diff --git a/src/common/capio/constants.hpp b/src/common/capio/constants.hpp index d2f43044e..05131e832 100644 --- a/src/common/capio/constants.hpp +++ b/src/common/capio/constants.hpp @@ -181,5 +181,8 @@ constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_BACKEND_HELP[] = constexpr int DEFAULT_CAPIO_BACKEND_PORT = 2222; constexpr int CAPIO_BACKEND_DEFAULT_SLEEP_TIME = 300; - -#endif // CAPIO_COMMON_CONSTANTS_HPP +constexpr char MULTICAST_DISCOVERY_ADDR[] = "234.234.234.1"; +constexpr int MULTICAST_DISCOVERY_PORT = 2223; +constexpr int MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE = + HOST_NAME_MAX + 10; // hostname + : + sizeof(port) +#endif // CAPIO_COMMON_CONSTANTS_HPP diff --git a/src/server/communication-service/BackendInterface.hpp b/src/server/communication-service/BackendInterface.hpp index e54089274..20a001f5a 100644 --- a/src/server/communication-service/BackendInterface.hpp +++ b/src/server/communication-service/BackendInterface.hpp @@ -16,9 +16,10 @@ class BackendInterface { public: virtual ~BackendInterface() = default; - virtual void connect_to(std::string remoteHost, const std::string &remotePort) { - throw NotImplementedBackendMethod(); - }; + /** + * @param hostname_port who to connect to in the form of hostname:port + */ + virtual void connect_to(std::string hostname_port) { throw NotImplementedBackendMethod(); }; /** * @brief Send data to target @@ -59,7 +60,7 @@ class BackendInterface { */ class NoBackend final : public BackendInterface { public: - void connect_to(std::string remoteHost, const std::string &remotePort) override { return; }; + void connect_to(std::string hostname_port) override { return; }; void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath, capio_off64_t start_offset) override { diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index 253842316..0293b2cc3 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -4,14 +4,23 @@ #include "BackendInterface.hpp" #include "MTCL_backend.hpp" -#include #include +#include + +#include +#include +#include +#include +#include class CapioCommunicationService { char ownHostname[HOST_NAME_MAX] = {0}; bool *continue_execution = new bool; - std::thread *thread_server_finder_fs; + std::thread *thread_server_finder_fs, *thread_server_finder_multicast; + + std::vector token_used_to_connect; + std::mutex *token_used_to_connect_mutex; void generate_aliveness_token(const int port) const { START_LOG(gettid(), "call(port=d)", port); @@ -35,20 +44,147 @@ class CapioCommunicationService { std::string token_filename(ownHostname); token_filename += ".alive_connection"; + if (!std::filesystem::exists(token_filename)) { + LOG("Token does not exists. Skipping delettion"); + return; + } LOG("Removing alive token %s", token_filename.c_str()); std::filesystem::remove(token_filename); LOG("Removed token"); } + static void send_multicast_alive_token(int backend_port) { + char hostname[HOST_NAME_MAX]; + gethostname(hostname, HOST_NAME_MAX); + + int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); + if (transmission_socket < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " + << "WARNING: unable to bind multicast socket: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << hostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + sockaddr_in addr = {}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); + addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); + + char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; + sprintf(message, "%s:%d", hostname, backend_port); + + if (sendto(transmission_socket, message, strlen(message), 0, + reinterpret_cast(&addr), sizeof(addr)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " + << "WARNING: unable to send alive token(" << message + << ") to multicast address!: " << strerror(errno) << std::endl; + } + close(transmission_socket); + } + + static void find_new_server_from_multicast_thread( + const bool *continue_execution, std::vector *token_used_to_connect, + std::mutex *token_used_to_connect_mutex, int backend_port) { + char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; + char ownHostname[HOST_NAME_MAX]; + gethostname(ownHostname, HOST_NAME_MAX); + + int discovery_socket = socket(AF_INET, SOCK_DGRAM, 0); + if (discovery_socket < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to open multicast socket: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + u_int multiple_socket_on_same_address = 1; + if (setsockopt(discovery_socket, SOL_SOCKET, SO_REUSEADDR, + (char *) &multiple_socket_on_same_address, + sizeof(multiple_socket_on_same_address)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to assign multiple sockets to same address: " + << strerror(errno) << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + struct sockaddr_in addr = {}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); + socklen_t addrlen = sizeof(addr); + + // bind to receive address + if (bind(discovery_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { + + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to bind multicast socket: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt(discovery_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to join multicast group: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + while (*continue_execution) { + bzero(incomingMessage, sizeof(incomingMessage)); + send_multicast_alive_token(backend_port); + if (recvfrom(discovery_socket, incomingMessage, MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE, 0, + reinterpret_cast(&addr), &addrlen) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: recvied < 0 bytes from multicast: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + if (std::string(incomingMessage) == + std::string(ownHostname) + ":" + std::to_string(backend_port)) { + // skip myself + continue; + } + + std::lock_guard lg(*token_used_to_connect_mutex); + if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), + incomingMessage) == token_used_to_connect->end()) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Connecting to " << incomingMessage << " from multicast advert." + << std::endl; + token_used_to_connect->push_back(incomingMessage); + capio_backend->connect_to(incomingMessage); + } + + sleep(1); + } + } + /* * Monitor the file system for the presence of tokens. */ - static void find_new_server_from_fs_token_thread(const bool *continue_execution) { + static void + find_new_server_from_fs_token_thread(const bool *continue_execution, + std::vector *token_used_to_connect, + std::mutex *token_used_to_connect_mutex) { START_LOG(gettid(), "call()"); - std::vector handled_tokens; - if (!continue_execution) { LOG("Terminating execution"); return; @@ -63,16 +199,7 @@ class CapioCommunicationService { continue; } - if (std::find(handled_tokens.begin(), handled_tokens.end(), token_path) != - handled_tokens.end()) { - LOG("Token already handled... skipping it!"); - continue; - }; - LOG("Found token %s", token_path.c_str()); - // TODO: as of now we will not connect with servers - // TODO: that terminates and then comes back up online... - handled_tokens.push_back(entry.path()); std::ifstream MyReadFile(token_path.filename()); std::string remoteHost = entry.path().stem(), remotePort; @@ -82,7 +209,18 @@ class CapioCommunicationService { getline(MyReadFile, remotePort); MyReadFile.close(); - capio_backend->connect_to(remoteHost, remotePort); + const auto hostname_port = std::string(remoteHost) + ":" + remotePort; + std::lock_guard lock(*token_used_to_connect_mutex); + if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), + hostname_port) != token_used_to_connect->end()) { + LOG("Token already handled... skipping it!"); + continue; + }; + + // TODO: as of now we will not connect with servers + // TODO: that terminates and then comes back up online... + token_used_to_connect->push_back(hostname_port); + capio_backend->connect_to(std::string(remoteHost) + ":" + remotePort); } LOG("Terminated loop. sleeping one second"); sleep(1); @@ -91,9 +229,15 @@ class CapioCommunicationService { public: ~CapioCommunicationService() { *continue_execution = false; + + pthread_cancel(thread_server_finder_multicast->native_handle()); + pthread_cancel(thread_server_finder_fs->native_handle()); thread_server_finder_fs->join(); + thread_server_finder_multicast->join(); + delete_aliveness_token(); delete capio_backend; + delete token_used_to_connect_mutex; }; CapioCommunicationService(std::string &backend_name, const int port) { @@ -102,6 +246,8 @@ class CapioCommunicationService { gethostname(ownHostname, HOST_NAME_MAX); LOG("My hostname is %s. Starting to listen on connection", ownHostname); + token_used_to_connect_mutex = new std::mutex(); + if (backend_name == "MQTT" || backend_name == "MPI") { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " @@ -130,7 +276,12 @@ class CapioCommunicationService { } generate_aliveness_token(port); thread_server_finder_fs = - new std::thread(find_new_server_from_fs_token_thread, std::ref(continue_execution)); + new std::thread(find_new_server_from_fs_token_thread, std::ref(continue_execution), + &token_used_to_connect, token_used_to_connect_mutex); + + thread_server_finder_multicast = + new std::thread(find_new_server_from_multicast_thread, std::ref(continue_execution), + &token_used_to_connect, token_used_to_connect_mutex, port); } }; diff --git a/src/server/communication-service/MTCL_backend.hpp b/src/server/communication-service/MTCL_backend.hpp index 37f6e7d07..f4a582367 100644 --- a/src/server/communication-service/MTCL_backend.hpp +++ b/src/server/communication-service/MTCL_backend.hpp @@ -186,18 +186,23 @@ class MTCL_backend : public BackendInterface { } public: - void connect_to(std::string remoteHost, const std::string &remotePort) override { - START_LOG(gettid(), "call( remoteHost=%s, remotePort=%s)", remoteHost.c_str(), - remotePort.c_str()); - - const std::string remoteToken = usedProtocol + ":" + remoteHost + ":" + remotePort; - - if (remoteToken == selfToken || - remoteToken == usedProtocol + ":" + ownHostname + ":" + remotePort) { + void connect_to(std::string hostname_port) override { + START_LOG(gettid(), "call( hostname_port=%s)", hostname_port.c_str()); + std::string remoteHost = hostname_port.substr(0, hostname_port.find_last_of(':')); + const std::string remoteToken = usedProtocol + ":" + hostname_port; + + if (remoteToken == selfToken || // skip on 0.0.0.0 + remoteToken == usedProtocol + ":" + ownHostname + ":" + ownPort // skip on my real IP + ) { LOG("Skipping to connect to self"); return; } + if (connected_hostnames_map.find(remoteToken) != connected_hostnames_map.end()) { + LOG("Remote host %s is already connected", remoteHost.c_str()); + return; + } + LOG("Trying to connect on remote: %s", remoteToken.c_str()); if (auto UserManager = MTCL::Manager::connect(remoteToken); UserManager.isValid()) { std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " diff --git a/tests/multinode/backend/docker-compose.yml b/tests/multinode/backend/docker-compose.yml index 27c619def..ec80cbff3 100644 --- a/tests/multinode/backend/docker-compose.yml +++ b/tests/multinode/backend/docker-compose.yml @@ -5,10 +5,13 @@ services: working_dir: /shared volumes: - shared_data:/shared - command: | - capio_backend_unit_tests \ - --gtest_break_on_failure \ - --gtest_print_time=1 + networks: + capio_net: + aliases: + - node1 + environment: + - CAPIO_LOG_LEVEL=-1 + command: capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1 node2: image: alphaunito/capio:latest @@ -16,7 +19,17 @@ services: working_dir: /shared volumes: - shared_data:/shared - command: bash -c "sleep 5 && capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" + networks: + capio_net: + aliases: + - node2 + environment: + - CAPIO_LOG_LEVEL=-1 + command: bash -c "sleep 5 && capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" volumes: - shared_data: \ No newline at end of file + shared_data: + +networks: + capio_net: + driver: bridge \ No newline at end of file diff --git a/tests/multinode/integration/docker-compose.yml b/tests/multinode/integration/docker-compose.yml index 3e50178dd..12a0b086e 100644 --- a/tests/multinode/integration/docker-compose.yml +++ b/tests/multinode/integration/docker-compose.yml @@ -8,10 +8,11 @@ services: - CAPIO_LOG_LEVEL=-1 volumes: - shared_data:/shared - command: | - capio_integration_test_split \ - --gtest_break_on_failure \ - --gtest_print_time=1 + networks: + capio_net: + aliases: + - split + command: capio_integration_test_split --gtest_break_on_failure --gtest_print_time=1 map: image: alphaunito/capio:latest @@ -22,10 +23,11 @@ services: - CAPIO_LOG_LEVEL=-1 volumes: - shared_data:/shared - command: | - capio_integration_test_map \ - --gtest_break_on_failure \ - --gtest_print_time=1 + networks: + capio_net: + aliases: + - map + command: capio_integration_test_map --gtest_break_on_failure --gtest_print_time=1 merge: image: alphaunito/capio:latest @@ -36,9 +38,15 @@ services: - CAPIO_LOG_LEVEL=-1 volumes: - shared_data:/shared - command: | - capio_integration_test_merge \ - --gtest_break_on_failure \ - --gtest_print_time=1 + networks: + capio_net: + aliases: + - merge + command: capio_integration_test_merge --gtest_break_on_failure --gtest_print_time=1 + volumes: - shared_data: \ No newline at end of file + shared_data: + +networks: + capio_net: + driver: bridge \ No newline at end of file From c3912014985c6e7206097c27faa3b68e30bd6b8c Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 24 Jul 2025 14:41:06 +0200 Subject: [PATCH 03/11] Refactored structure of backend code --- .../CapioCommunicationService.hpp | 256 ++---------------- .../control_plane/capio_control_plane.hpp | 11 + .../control_plane/fs_control_plane.hpp | 120 ++++++++ .../control_plane/multicast_control_plane.hpp | 163 +++++++++++ .../{ => data_plane}/BackendInterface.hpp | 0 .../{ => data_plane}/MTCL_backend.hpp | 2 +- src/server/utils/signals.hpp | 2 +- tests/multinode/backend/src/MTCL.hpp | 8 +- 8 files changed, 318 insertions(+), 244 deletions(-) create mode 100644 src/server/communication-service/control_plane/capio_control_plane.hpp create mode 100644 src/server/communication-service/control_plane/fs_control_plane.hpp create mode 100644 src/server/communication-service/control_plane/multicast_control_plane.hpp rename src/server/communication-service/{ => data_plane}/BackendInterface.hpp (100%) rename src/server/communication-service/{ => data_plane}/MTCL_backend.hpp (99%) diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index 0293b2cc3..60d2acb21 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -1,253 +1,29 @@ #ifndef CAPIOCOMMUNICATIONSERVICE_HPP #define CAPIOCOMMUNICATIONSERVICE_HPP -#include "BackendInterface.hpp" -#include "MTCL_backend.hpp" +#include "control_plane/capio_control_plane.hpp" +#include "data_plane/BackendInterface.hpp" -#include -#include - -#include -#include -#include -#include -#include +#include "control_plane/fs_control_plane.hpp" +#include "control_plane/multicast_control_plane.hpp" +#include "data_plane/MTCL_backend.hpp" class CapioCommunicationService { char ownHostname[HOST_NAME_MAX] = {0}; - bool *continue_execution = new bool; - std::thread *thread_server_finder_fs, *thread_server_finder_multicast; - - std::vector token_used_to_connect; - std::mutex *token_used_to_connect_mutex; - - void generate_aliveness_token(const int port) const { - START_LOG(gettid(), "call(port=d)", port); - - std::string token_filename(ownHostname); - token_filename += ".alive_connection"; - - LOG("Creating alive token %s", token_filename.c_str()); - - std::ofstream FilePort(token_filename); - FilePort << port; - FilePort.close(); - - LOG("Saved self token info to FS"); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "Generated token at " << token_filename << std::endl; - } - - void delete_aliveness_token() const { - START_LOG(gettid(), "call()"); - - std::string token_filename(ownHostname); - token_filename += ".alive_connection"; - if (!std::filesystem::exists(token_filename)) { - LOG("Token does not exists. Skipping delettion"); - return; - } - - LOG("Removing alive token %s", token_filename.c_str()); - std::filesystem::remove(token_filename); - LOG("Removed token"); - } - - static void send_multicast_alive_token(int backend_port) { - char hostname[HOST_NAME_MAX]; - gethostname(hostname, HOST_NAME_MAX); - - int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); - if (transmission_socket < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " - << "WARNING: unable to bind multicast socket: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << hostname << " ] " - << "Execution will continue only with FS discovery support" << std::endl; - return; - } - - sockaddr_in addr = {}; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); - addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); - - char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; - sprintf(message, "%s:%d", hostname, backend_port); - - if (sendto(transmission_socket, message, strlen(message), 0, - reinterpret_cast(&addr), sizeof(addr)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " - << "WARNING: unable to send alive token(" << message - << ") to multicast address!: " << strerror(errno) << std::endl; - } - close(transmission_socket); - } - - static void find_new_server_from_multicast_thread( - const bool *continue_execution, std::vector *token_used_to_connect, - std::mutex *token_used_to_connect_mutex, int backend_port) { - char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; - char ownHostname[HOST_NAME_MAX]; - gethostname(ownHostname, HOST_NAME_MAX); - - int discovery_socket = socket(AF_INET, SOCK_DGRAM, 0); - if (discovery_socket < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " - << "WARNING: unable to open multicast socket: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " - << "Execution will continue only with FS discovery support" << std::endl; - return; - } - - u_int multiple_socket_on_same_address = 1; - if (setsockopt(discovery_socket, SOL_SOCKET, SO_REUSEADDR, - (char *) &multiple_socket_on_same_address, - sizeof(multiple_socket_on_same_address)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " - << "WARNING: unable to assign multiple sockets to same address: " - << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " - << "Execution will continue only with FS discovery support" << std::endl; - return; - } - - struct sockaddr_in addr = {}; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); - addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); - socklen_t addrlen = sizeof(addr); - - // bind to receive address - if (bind(discovery_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { - - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " - << "WARNING: unable to bind multicast socket: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " - << "Execution will continue only with FS discovery support" << std::endl; - return; - } - - struct ip_mreq mreq; - mreq.imr_multiaddr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); - mreq.imr_interface.s_addr = htonl(INADDR_ANY); - if (setsockopt(discovery_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " - << "WARNING: unable to join multicast group: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " - << "Execution will continue only with FS discovery support" << std::endl; - return; - } - - while (*continue_execution) { - bzero(incomingMessage, sizeof(incomingMessage)); - send_multicast_alive_token(backend_port); - if (recvfrom(discovery_socket, incomingMessage, MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE, 0, - reinterpret_cast(&addr), &addrlen) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " - << "WARNING: recvied < 0 bytes from multicast: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " - << "Execution will continue only with FS discovery support" << std::endl; - return; - } - - if (std::string(incomingMessage) == - std::string(ownHostname) + ":" + std::to_string(backend_port)) { - // skip myself - continue; - } - - std::lock_guard lg(*token_used_to_connect_mutex); - if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), - incomingMessage) == token_used_to_connect->end()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " - << "Connecting to " << incomingMessage << " from multicast advert." - << std::endl; - token_used_to_connect->push_back(incomingMessage); - capio_backend->connect_to(incomingMessage); - } - - sleep(1); - } - } - - /* - * Monitor the file system for the presence of tokens. - */ - static void - find_new_server_from_fs_token_thread(const bool *continue_execution, - std::vector *token_used_to_connect, - std::mutex *token_used_to_connect_mutex) { - START_LOG(gettid(), "call()"); - - if (!continue_execution) { - LOG("Terminating execution"); - return; - } - - auto dir_iterator = std::filesystem::directory_iterator(std::filesystem::current_path()); - for (const auto &entry : dir_iterator) { - const auto token_path = entry.path(); - - if (!entry.is_regular_file() || token_path.extension() != ".alive_connection") { - LOG("Filename %s is not valid", entry.path().c_str()); - continue; - } - - LOG("Found token %s", token_path.c_str()); - - std::ifstream MyReadFile(token_path.filename()); - std::string remoteHost = entry.path().stem(), remotePort; - LOG("Testing for file: %s (hostname: %s, port=%s)", entry.path().filename().c_str(), - remoteHost.c_str(), remotePort.c_str()); - - getline(MyReadFile, remotePort); - MyReadFile.close(); - - const auto hostname_port = std::string(remoteHost) + ":" + remotePort; - std::lock_guard lock(*token_used_to_connect_mutex); - if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), - hostname_port) != token_used_to_connect->end()) { - LOG("Token already handled... skipping it!"); - continue; - }; - - // TODO: as of now we will not connect with servers - // TODO: that terminates and then comes back up online... - token_used_to_connect->push_back(hostname_port); - capio_backend->connect_to(std::string(remoteHost) + ":" + remotePort); - } - LOG("Terminated loop. sleeping one second"); - sleep(1); - } public: ~CapioCommunicationService() { - *continue_execution = false; - - pthread_cancel(thread_server_finder_multicast->native_handle()); - pthread_cancel(thread_server_finder_fs->native_handle()); - thread_server_finder_fs->join(); - thread_server_finder_multicast->join(); - - delete_aliveness_token(); + delete capio_control_plane; delete capio_backend; - delete token_used_to_connect_mutex; }; - CapioCommunicationService(std::string &backend_name, const int port) { + CapioCommunicationService(std::string &backend_name, const int port, + const std::string &control_plane_backend = "multicast") { START_LOG(gettid(), "call(backend_name=%s)", backend_name.c_str()); - *continue_execution = true; gethostname(ownHostname, HOST_NAME_MAX); LOG("My hostname is %s. Starting to listen on connection", ownHostname); - token_used_to_connect_mutex = new std::mutex(); - if (backend_name == "MQTT" || backend_name == "MPI") { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " @@ -274,14 +50,16 @@ class CapioCommunicationService { << std::endl; ERR_EXIT("No valid backend was provided"); } - generate_aliveness_token(port); - thread_server_finder_fs = - new std::thread(find_new_server_from_fs_token_thread, std::ref(continue_execution), - &token_used_to_connect, token_used_to_connect_mutex); - thread_server_finder_multicast = - new std::thread(find_new_server_from_multicast_thread, std::ref(continue_execution), - &token_used_to_connect, token_used_to_connect_mutex, port); + if (control_plane_backend == "multicast") { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Starting multicast control plane" << std::endl; + capio_control_plane = new MulticastControlPlane(port); + } else { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Starting file system control plane" << std::endl; + capio_control_plane = new FSControlPlane(port); + } } }; diff --git a/src/server/communication-service/control_plane/capio_control_plane.hpp b/src/server/communication-service/control_plane/capio_control_plane.hpp new file mode 100644 index 000000000..4fbf45bbd --- /dev/null +++ b/src/server/communication-service/control_plane/capio_control_plane.hpp @@ -0,0 +1,11 @@ +#ifndef CAPIO_CONTROL_PLANE_HPP +#define CAPIO_CONTROL_PLANE_HPP + +class CapioControlPlane { + public: + virtual ~CapioControlPlane() = default; +}; + +inline CapioControlPlane *capio_control_plane; + +#endif // CAPIO_CONTROL_PLANE_HPP diff --git a/src/server/communication-service/control_plane/fs_control_plane.hpp b/src/server/communication-service/control_plane/fs_control_plane.hpp new file mode 100644 index 000000000..591288aae --- /dev/null +++ b/src/server/communication-service/control_plane/fs_control_plane.hpp @@ -0,0 +1,120 @@ +#ifndef FS_CONTROL_PLANE_HPP +#define FS_CONTROL_PLANE_HPP + +#include +#include +#include + +class FSControlPlane : public CapioControlPlane { + char ownHostname[HOST_NAME_MAX] = {0}; + int _backend_port; + bool *continue_execution; + std::thread *thread; + std::vector token_used_to_connect; + std::mutex *token_used_to_connect_mutex; + + void generate_aliveness_token(const int port) const { + START_LOG(gettid(), "call(port=d)", port); + + std::string token_filename(ownHostname); + token_filename += ".alive_connection"; + + LOG("Creating alive token %s", token_filename.c_str()); + + std::ofstream FilePort(token_filename); + FilePort << port; + FilePort.close(); + + LOG("Saved self token info to FS"); + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " + << "Generated token at " << token_filename << std::endl; + } + + void delete_aliveness_token() const { + START_LOG(gettid(), "call()"); + + std::string token_filename(ownHostname); + token_filename += ".alive_connection"; + if (!std::filesystem::exists(token_filename)) { + LOG("Token does not exists. Skipping delettion"); + return; + } + + LOG("Removing alive token %s", token_filename.c_str()); + std::filesystem::remove(token_filename); + LOG("Removed token"); + } + + /* + * Monitor the file system for the presence of tokens. + */ + static void fs_server_aliveness_detector_thread(const bool *continue_execution, + std::vector *token_used_to_connect, + std::mutex *token_used_to_connect_mutex) { + START_LOG(gettid(), "call()"); + + if (!continue_execution) { + LOG("Terminating execution"); + return; + } + + auto dir_iterator = std::filesystem::directory_iterator(std::filesystem::current_path()); + for (const auto &entry : dir_iterator) { + const auto token_path = entry.path(); + + if (!entry.is_regular_file() || token_path.extension() != ".alive_connection") { + LOG("Filename %s is not valid", entry.path().c_str()); + continue; + } + + LOG("Found token %s", token_path.c_str()); + + std::ifstream MyReadFile(token_path.filename()); + std::string remoteHost = entry.path().stem(), remotePort; + LOG("Testing for file: %s (hostname: %s, port=%s)", entry.path().filename().c_str(), + remoteHost.c_str(), remotePort.c_str()); + + getline(MyReadFile, remotePort); + MyReadFile.close(); + + const auto hostname_port = std::string(remoteHost) + ":" + remotePort; + std::lock_guard lock(*token_used_to_connect_mutex); + if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), + hostname_port) != token_used_to_connect->end()) { + LOG("Token already handled... skipping it!"); + continue; + }; + + // TODO: as of now we will not connect with servers + // TODO: that terminates and then comes back up online... + token_used_to_connect->push_back(hostname_port); + capio_backend->connect_to(std::string(remoteHost) + ":" + remotePort); + } + LOG("Terminated loop. sleeping one second"); + sleep(1); + } + + public: + explicit FSControlPlane(int backend_port) : _backend_port(backend_port) { + gethostname(ownHostname, HOST_NAME_MAX); + generate_aliveness_token(backend_port); + continue_execution = new bool(true); + token_used_to_connect_mutex = new std::mutex(); + thread = new std::thread(fs_server_aliveness_detector_thread, std::ref(continue_execution), + &token_used_to_connect, token_used_to_connect_mutex); + }; + + ~FSControlPlane() override { + delete_aliveness_token(); + pthread_cancel(thread->native_handle()); + thread->join(); + delete thread; + delete continue_execution; + delete token_used_to_connect_mutex; + + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "FSControlPlane correctly terminated" << std::endl; + } +}; + +#endif // FS_CONTROL_PLANE_HPP diff --git a/src/server/communication-service/control_plane/multicast_control_plane.hpp b/src/server/communication-service/control_plane/multicast_control_plane.hpp new file mode 100644 index 000000000..21d13aa72 --- /dev/null +++ b/src/server/communication-service/control_plane/multicast_control_plane.hpp @@ -0,0 +1,163 @@ +#ifndef MULTICAST_CONTROL_PLANE_HPP +#define MULTICAST_CONTROL_PLANE_HPP +#include +#include +#include +#include +#include +#include + +class MulticastControlPlane : public CapioControlPlane { + int _backend_port; + bool *continue_execution; + std::thread *thread; + std::vector token_used_to_connect; + std::mutex *token_used_to_connect_mutex; + char ownHostname[HOST_NAME_MAX] = {0}; + + static void send_multicast_alive_token(const int backend_port) { + char hostname[HOST_NAME_MAX]; + gethostname(hostname, HOST_NAME_MAX); + + int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); + if (transmission_socket < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " + << "WARNING: unable to bind multicast socket: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << hostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + sockaddr_in addr = {}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); + addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); + + char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; + sprintf(message, "%s:%d", hostname, backend_port); + + if (sendto(transmission_socket, message, strlen(message), 0, + reinterpret_cast(&addr), sizeof(addr)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " + << "WARNING: unable to send alive token(" << message + << ") to multicast address!: " << strerror(errno) << std::endl; + } + close(transmission_socket); + } + + static void multicast_server_aliveness_detector_thread( + const bool *continue_execution, std::vector *token_used_to_connect, + std::mutex *token_used_to_connect_mutex, const int backend_port) { + + char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; + char ownHostname[HOST_NAME_MAX]; + gethostname(ownHostname, HOST_NAME_MAX); + + int discovery_socket = socket(AF_INET, SOCK_DGRAM, 0); + if (discovery_socket < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to open multicast socket: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + u_int multiple_socket_on_same_address = 1; + if (setsockopt(discovery_socket, SOL_SOCKET, SO_REUSEADDR, + (char *) &multiple_socket_on_same_address, + sizeof(multiple_socket_on_same_address)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to assign multiple sockets to same address: " + << strerror(errno) << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + struct sockaddr_in addr = {}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); + socklen_t addrlen = sizeof(addr); + + // bind to receive address + if (bind(discovery_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { + + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to bind multicast socket: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt(discovery_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to join multicast group: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + while (*continue_execution) { + bzero(incomingMessage, sizeof(incomingMessage)); + send_multicast_alive_token(backend_port); + if (recvfrom(discovery_socket, incomingMessage, MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE, 0, + reinterpret_cast(&addr), &addrlen) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: recvied < 0 bytes from multicast: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + if (std::string(incomingMessage) == + std::string(ownHostname) + ":" + std::to_string(backend_port)) { + // skip myself + continue; + } + + std::lock_guard lg(*token_used_to_connect_mutex); + if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), + incomingMessage) == token_used_to_connect->end()) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Multicast adv: " << incomingMessage << std::endl; + token_used_to_connect->push_back(incomingMessage); + capio_backend->connect_to(incomingMessage); + } + + sleep(1); + } + } + + public: + explicit MulticastControlPlane(int backend_port) : _backend_port(backend_port) { + gethostname(ownHostname, HOST_NAME_MAX); + continue_execution = new bool(true); + token_used_to_connect_mutex = new std::mutex(); + + thread = new std::thread(multicast_server_aliveness_detector_thread, + std::ref(continue_execution), &token_used_to_connect, + token_used_to_connect_mutex, backend_port); + } + + ~MulticastControlPlane() override { + *continue_execution = false; + pthread_cancel(thread->native_handle()); + thread->join(); + delete token_used_to_connect_mutex; + delete thread; + delete continue_execution; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "MulticastControlPlane correctly terminated" << std::endl; + } +}; + +#endif // MULTICAST_CONTROL_PLANE_HPP diff --git a/src/server/communication-service/BackendInterface.hpp b/src/server/communication-service/data_plane/BackendInterface.hpp similarity index 100% rename from src/server/communication-service/BackendInterface.hpp rename to src/server/communication-service/data_plane/BackendInterface.hpp diff --git a/src/server/communication-service/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp similarity index 99% rename from src/server/communication-service/MTCL_backend.hpp rename to src/server/communication-service/data_plane/MTCL_backend.hpp index f4a582367..4d8cd4542 100644 --- a/src/server/communication-service/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -168,7 +168,7 @@ class MTCL_backend : public BackendInterface { UserManager.receive(connected_hostname, HOST_NAME_MAX); std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "Connected to " << connected_hostname << std::endl; + << "Connected from " << connected_hostname << std::endl; LOG("Received connection hostname: %s", connected_hostname); diff --git a/src/server/utils/signals.hpp b/src/server/utils/signals.hpp index 857bdbfb5..1541b0042 100644 --- a/src/server/utils/signals.hpp +++ b/src/server/utils/signals.hpp @@ -1,7 +1,7 @@ #ifndef CAPIO_SERVER_HANDLERS_SIGNALS_HPP #define CAPIO_SERVER_HANDLERS_SIGNALS_HPP -#include "communication-service/BackendInterface.hpp" +#include "../communication-service/data_plane/BackendInterface.hpp" #include "communication-service/CapioCommunicationService.hpp" #include diff --git a/tests/multinode/backend/src/MTCL.hpp b/tests/multinode/backend/src/MTCL.hpp index e8ed52588..e3b04ba92 100644 --- a/tests/multinode/backend/src/MTCL.hpp +++ b/tests/multinode/backend/src/MTCL.hpp @@ -12,9 +12,9 @@ constexpr capio_off64_t BUFFER_SIZES = 1024; TEST(CapioCommServiceTest, TestPingPong) { START_LOG(gettid(), "INFO: TestPingPong"); gethostname(node_name.data(), HOST_NAME_MAX); - const int port = 1234; - std::string proto = "TCP"; - CapioCommunicationService communication_service(proto, port); + const int port = 1234; + std::string proto = "TCP"; + auto communication_service = new CapioCommunicationService(proto, port, "multicast"); capio_off64_t size_revc, offset; std::vector connections; @@ -39,6 +39,7 @@ TEST(CapioCommServiceTest, TestPingPong) { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping response from : " << i << std::endl; EXPECT_EQ(strcmp(buff, buff1), 0); + delete communication_service; return; } std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Listening for ping from: " << i @@ -49,6 +50,7 @@ TEST(CapioCommServiceTest, TestPingPong) { EXPECT_EQ(strcmp(recvBuff, TEST_MESSAGE), 0); capio_backend->send(i, recvBuff, size_revc, "./test", 0); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sent ping response to: " << i << std::endl; + delete communication_service; return; } } From 89bd5a0a02f0ba44cec08ca741e58c36593bf743 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 24 Jul 2025 16:47:04 +0200 Subject: [PATCH 04/11] Added multicast flags to cli --- src/common/capio/constants.hpp | 3 +++ .../CapioCommunicationService.hpp | 3 +++ .../control_plane/multicast_control_plane.hpp | 3 +++ .../data_plane/MTCL_backend.hpp | 2 +- src/server/utils/parser.hpp | 21 ++++++++++++++++++- 5 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/common/capio/constants.hpp b/src/common/capio/constants.hpp index 05131e832..e07920bcf 100644 --- a/src/common/capio/constants.hpp +++ b/src/common/capio/constants.hpp @@ -155,6 +155,9 @@ constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_NCONTINUE_ON_ERROR_HELP[] = "specified, and a fatal termination point is reached, the behaviour of capio is undefined and " "should not be taken as valid"; +constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_CONTROL_PLANE_BACKEND[] = + "Which control plane backend to used. Options: . Defaults to "; + constexpr char CAPIO_LOG_SERVER_CLI_CONT_ON_ERR_WARNING[] = "[ \033[1;33m SERVER \033[0m ]\033[1;31m " "|==================================================================|\033[0m\n" diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index 60d2acb21..ce6109535 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -60,6 +60,9 @@ class CapioCommunicationService { << "Starting file system control plane" << std::endl; capio_control_plane = new FSControlPlane(port); } + + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " + << "CapioCommunicationService initialization completed." << std::endl; } }; diff --git a/src/server/communication-service/control_plane/multicast_control_plane.hpp b/src/server/communication-service/control_plane/multicast_control_plane.hpp index 21d13aa72..fb122d946 100644 --- a/src/server/communication-service/control_plane/multicast_control_plane.hpp +++ b/src/server/communication-service/control_plane/multicast_control_plane.hpp @@ -146,6 +146,9 @@ class MulticastControlPlane : public CapioControlPlane { thread = new std::thread(multicast_server_aliveness_detector_thread, std::ref(continue_execution), &token_used_to_connect, token_used_to_connect_mutex, backend_port); + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " + << "Multicast discovery service @ " << MULTICAST_DISCOVERY_ADDR << ":" + << MULTICAST_DISCOVERY_PORT << std::endl; } ~MulticastControlPlane() override { diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index 4d8cd4542..5f7ac8d35 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -251,7 +251,7 @@ class MTCL_backend : public BackendInterface { th = new std::thread(incoming_connection_listener, std::ref(continue_execution), sleep_time, &connected_hostnames_map, _guard, &connection_threads, terminate); std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "CapioCommunicationService initialization completed." << std::endl; + << "MTCL data plane initialization completed." << std::endl; } ~MTCL_backend() override { diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index 9bf6af5f8..cfa40f38b 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -33,6 +33,10 @@ std::string parseCLI(int argc, char **argv) { args::Flag memStorageOnly(arguments, "mem-storage-only", CAPIO_SERVER_ARG_PARSER_CONFIG_NCONTINUE_ON_ERROR_HELP, {"mem-only"}); + args::ValueFlag controlPlaneBackend( + arguments, "backend", CAPIO_SERVER_ARG_PARSER_CONFIG_CONTROL_PLANE_BACKEND, + {"control-backend"}); + try { parser.ParseCLI(argc, argv); } catch (args::Help &) { @@ -147,7 +151,22 @@ std::string parseCLI(int argc, char **argv) { port = args::get(backend_port); } - capio_communication_service = new CapioCommunicationService(backend_name, port); + std::string constrol_backend_name = "multicast"; + if (controlPlaneBackend) { + auto tmp = args::get(controlPlaneBackend); + if (tmp != "multicast" && tmp != "fs") { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << "Unknown control plane backend " << tmp << std::endl; + } else { + constrol_backend_name = tmp; + } + } + + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "Using control plane backend: " << constrol_backend_name << std::endl; + + capio_communication_service = + new CapioCommunicationService(backend_name, port, constrol_backend_name); } else { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " From 686e1a63a448e1a325265a8bf710eb13daa269bf Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 30 Jul 2025 17:20:20 +0200 Subject: [PATCH 05/11] logs --- .../data_plane/MTCL_backend.hpp | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index 5f7ac8d35..1722396d2 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -48,14 +48,19 @@ class MTCL_backend : public BackendInterface { size_t filepath_len; const auto unit = new TransportUnit(); HandlerPointer->receive(&filepath_len, sizeof(size_t)); + LOG("Incoming path of length %ld", filepath_len); unit->_filepath.reserve(filepath_len + 1); HandlerPointer->receive(unit->_filepath.data(), filepath_len); + LOG("Received message! Path : %s", unit->_filepath.c_str()); HandlerPointer->receive(&unit->_buffer_size, sizeof(capio_off64_t)); + LOG("Buffer size for incoming data is %ld", unit->_buffer_size); unit->_bytes = new char[unit->_buffer_size]; + LOG("Allocated space for incoming data"); HandlerPointer->receive(unit->_bytes, unit->_buffer_size); + LOG("Received file buffer data"); HandlerPointer->receive(&unit->_start_write_offset, sizeof(capio_off64_t)); - LOG("[recv] Receiving %ld bytes of file %s", unit->_buffer_size, unit->_filepath.c_str()); - LOG("[recv] Offset of received chunk is %ld", unit->_start_write_offset); + LOG("Received chunk of data should be stored on offset %ld of file %s", + unit->_start_write_offset, unit->_filepath.c_str()); return unit; } @@ -69,14 +74,27 @@ class MTCL_backend : public BackendInterface { * step2: send data */ const size_t file_path_length = unit->_filepath.length(); + HandlerPointer->send(&file_path_length, sizeof(size_t)); + LOG("Size of path that is being sent: %ld", file_path_length); + HandlerPointer->send(unit->_filepath.c_str(), file_path_length); + LOG("Sent file path: %s", unit->_filepath.c_str()); + HandlerPointer->send(&unit->_buffer_size, sizeof(capio_off64_t)); + LOG("Size of file buffer to be sent: %ld", unit->_buffer_size); + HandlerPointer->send(unit->_bytes, unit->_buffer_size); + LOG("Sent %ld bytes of data chunk", unit->_buffer_size); + HandlerPointer->send(&unit->_start_write_offset, sizeof(capio_off64_t)); - LOG("[send] Sent %ld bytes of file %s with offset of %ld", unit->_buffer_size, - unit->_filepath.c_str(), unit->_start_write_offset); + LOG("Sent start write offset : %ld", unit->_start_write_offset); + + delete unit->_bytes; + LOG("Freed transfer unit _bytes buffer"); + delete unit; + LOG("Deleted transfer unit"); } /** From a19e3fe85fdae70efe3e27a22a3437b9db1029eb Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 31 Jul 2025 10:48:28 +0200 Subject: [PATCH 06/11] logs --- .../control_plane/multicast_control_plane.hpp | 94 ++++++++++++------- .../data_plane/MTCL_backend.hpp | 12 ++- 2 files changed, 69 insertions(+), 37 deletions(-) diff --git a/src/server/communication-service/control_plane/multicast_control_plane.hpp b/src/server/communication-service/control_plane/multicast_control_plane.hpp index fb122d946..385e6e2e4 100644 --- a/src/server/communication-service/control_plane/multicast_control_plane.hpp +++ b/src/server/communication-service/control_plane/multicast_control_plane.hpp @@ -15,7 +15,8 @@ class MulticastControlPlane : public CapioControlPlane { std::mutex *token_used_to_connect_mutex; char ownHostname[HOST_NAME_MAX] = {0}; - static void send_multicast_alive_token(const int backend_port) { + static void send_multicast_alive_token(const int data_plane_backend_port) { + START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); char hostname[HOST_NAME_MAX]; gethostname(hostname, HOST_NAME_MAX); @@ -35,7 +36,7 @@ class MulticastControlPlane : public CapioControlPlane { addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; - sprintf(message, "%s:%d", hostname, backend_port); + sprintf(message, "%s:%d", hostname, data_plane_backend_port); if (sendto(transmission_socket, message, strlen(message), 0, reinterpret_cast(&addr), sizeof(addr)) < 0) { @@ -43,17 +44,26 @@ class MulticastControlPlane : public CapioControlPlane { << "WARNING: unable to send alive token(" << message << ") to multicast address!: " << strerror(errno) << std::endl; } + LOG("Sent multicast token"); close(transmission_socket); } - static void multicast_server_aliveness_detector_thread( - const bool *continue_execution, std::vector *token_used_to_connect, - std::mutex *token_used_to_connect_mutex, const int backend_port) { + static void multicast_server_aliveness_thread(const bool *continue_execution, + std::vector *token_used_to_connect, + std::mutex *token_used_to_connect_mutex, + const int data_plane_backend_port) { - char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; - char ownHostname[HOST_NAME_MAX]; + START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); + + char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE], ownHostname[HOST_NAME_MAX]; gethostname(ownHostname, HOST_NAME_MAX); + int loopback = 0; // disable receive loopback messages + u_int multiple_socket_on_same_address = 1; // enable multiple sockets on same address + + const std::string SELF_TOKEN = + std::string(ownHostname) + ":" + std::to_string(data_plane_backend_port); + int discovery_socket = socket(AF_INET, SOCK_DGRAM, 0); if (discovery_socket < 0) { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " @@ -63,8 +73,8 @@ class MulticastControlPlane : public CapioControlPlane { << "Execution will continue only with FS discovery support" << std::endl; return; } + LOG("Created socket"); - u_int multiple_socket_on_same_address = 1; if (setsockopt(discovery_socket, SOL_SOCKET, SO_REUSEADDR, (char *) &multiple_socket_on_same_address, sizeof(multiple_socket_on_same_address)) < 0) { @@ -75,12 +85,25 @@ class MulticastControlPlane : public CapioControlPlane { << "Execution will continue only with FS discovery support" << std::endl; return; } + LOG("Set IP address to accept multiple sockets on same address"); + + if (setsockopt(discovery_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loopback, + sizeof(loopback)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to filter out loopback incoming messages: " + << strerror(errno) << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + LOG("Disabled reception of loopback messages from socket"); - struct sockaddr_in addr = {}; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); - addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); - socklen_t addrlen = sizeof(addr); + sockaddr_in addr = {}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); + socklen_t addrlen = sizeof(addr); + LOG("Set socket on IP: %s - PORT: %d", MULTICAST_DISCOVERY_ADDR, MULTICAST_DISCOVERY_PORT); // bind to receive address if (bind(discovery_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { @@ -92,8 +115,9 @@ class MulticastControlPlane : public CapioControlPlane { << "Execution will continue only with FS discovery support" << std::endl; return; } + LOG("Binded socket"); - struct ip_mreq mreq; + ip_mreq mreq{}; mreq.imr_multiaddr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(discovery_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { @@ -104,31 +128,36 @@ class MulticastControlPlane : public CapioControlPlane { << "Execution will continue only with FS discovery support" << std::endl; return; } + LOG("Successfully joined multicast group"); while (*continue_execution) { bzero(incomingMessage, sizeof(incomingMessage)); - send_multicast_alive_token(backend_port); - if (recvfrom(discovery_socket, incomingMessage, MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE, 0, - reinterpret_cast(&addr), &addrlen) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " - << "WARNING: recvied < 0 bytes from multicast: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " - << "Execution will continue only with FS discovery support" << std::endl; - return; - } - - if (std::string(incomingMessage) == - std::string(ownHostname) + ":" + std::to_string(backend_port)) { - // skip myself - continue; - } + send_multicast_alive_token(data_plane_backend_port); + LOG("Waiting for incoming token..."); + + do { + const auto recv_sice = + recvfrom(discovery_socket, incomingMessage, MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE, + 0, reinterpret_cast(&addr), &addrlen); + LOG("Received multicast data of size %ld and content %s", recv_sice, + incomingMessage); + if (recv_sice < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: received < 0 bytes from multicast: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" + << std::endl; + return; + } + } while (std::string(incomingMessage) == SELF_TOKEN); std::lock_guard lg(*token_used_to_connect_mutex); if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), incomingMessage) == token_used_to_connect->end()) { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " << "Multicast adv: " << incomingMessage << std::endl; + LOG("Received message: %s", incomingMessage); token_used_to_connect->push_back(incomingMessage); capio_backend->connect_to(incomingMessage); } @@ -143,9 +172,8 @@ class MulticastControlPlane : public CapioControlPlane { continue_execution = new bool(true); token_used_to_connect_mutex = new std::mutex(); - thread = new std::thread(multicast_server_aliveness_detector_thread, - std::ref(continue_execution), &token_used_to_connect, - token_used_to_connect_mutex, backend_port); + thread = new std::thread(multicast_server_aliveness_thread, std::ref(continue_execution), + &token_used_to_connect, token_used_to_connect_mutex, backend_port); std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " << "Multicast discovery service @ " << MULTICAST_DISCOVERY_ADDR << ":" << MULTICAST_DISCOVERY_PORT << std::endl; diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index 1722396d2..b6696c823 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -31,6 +31,7 @@ class TransportUnit { }; class MTCL_backend : public BackendInterface { + typedef enum { FROM_REMOTE, TO_REMOTE } CONN_HANDLER_ORIGIN; typedef std::tuple *, std::queue *, std::mutex *> TransportUnitInterface; std::unordered_map connected_hostnames_map; @@ -102,8 +103,10 @@ class MTCL_backend : public BackendInterface { */ void static server_connection_handler(MTCL::HandleUser HandlerPointer, const std::string remote_hostname, const int sleep_time, - TransportUnitInterface interface, const bool *terminate) { - START_LOG(gettid(), "call(remote_hostname=%s)", remote_hostname.c_str()); + TransportUnitInterface interface, const bool *terminate, + CONN_HANDLER_ORIGIN source) { + START_LOG(gettid(), "call(remote_hostname=%s, kind=%s)", remote_hostname.c_str(), + source == FROM_REMOTE ? "from remote server" : "to remote server"); // out = data to sent to others // in = data from others auto [in, out, mutex] = interface; @@ -199,7 +202,7 @@ class MTCL_backend : public BackendInterface { _connection_threads->push_back(new std::thread( server_connection_handler, std::move(UserManager), connected_hostname, sleep_time, - open_connections->at(connected_hostname), terminate)); + open_connections->at(connected_hostname), terminate, FROM_REMOTE)); } } @@ -236,7 +239,7 @@ class MTCL_backend : public BackendInterface { connection_threads.push_back(new std::thread( server_connection_handler, std::move(UserManager), remoteHost.c_str(), - thread_sleep_times, connection_tuple, terminate)); + thread_sleep_times, connection_tuple, terminate, TO_REMOTE)); } else { std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << ownHostname << " ] " << "Warning: found token " << remoteHost << ".alive_token" @@ -307,6 +310,7 @@ class MTCL_backend : public BackendInterface { inQueue = std::get<0>(data); interface = data; found = !inQueue->empty(); + LOG("Hostname %s, %s incoming data", hostname.c_str(), found ? "has" : "has not"); } if (!found) { LOG("No incoming messages. Putting thread to sleep"); From c1f2a7127dac0a2b9fded3c8442dddd02a320c83 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 31 Jul 2025 11:08:36 +0200 Subject: [PATCH 07/11] fix --- src/common/capio/logger.hpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/common/capio/logger.hpp b/src/common/capio/logger.hpp index cbe2967b4..555827eba 100644 --- a/src/common/capio/logger.hpp +++ b/src/common/capio/logger.hpp @@ -22,8 +22,8 @@ inline bool continue_on_error = false; // change behaviour of ERR_EXIT to contin #ifndef __CAPIO_POSIX #include thread_local std::ofstream logfile; // if building for server, self-contained logfile -std::string log_master_dir_name = CAPIO_DEFAULT_LOG_FOLDER; -std::string logfile_prefix = CAPIO_SERVER_DEFAULT_LOG_FILE_PREFIX; +inline std::string log_master_dir_name = CAPIO_DEFAULT_LOG_FOLDER; +inline std::string logfile_prefix = CAPIO_SERVER_DEFAULT_LOG_FILE_PREFIX; #else inline thread_local bool logfileOpen = false; inline thread_local int logfileFD = -1; @@ -46,6 +46,10 @@ inline auto open_server_logfile() { auto hostname = new char[HOST_NAME_MAX]; gethostname(hostname, HOST_NAME_MAX); + if (log_master_dir_name.empty()) { + log_master_dir_name = CAPIO_DEFAULT_LOG_FOLDER; + } + const std::filesystem::path output_folder = std::string{log_master_dir_name + "/server/" + hostname}; From c9a6574db56fe5223ba8ea481cccf41ce5a85892 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 31 Jul 2025 12:41:39 +0200 Subject: [PATCH 08/11] Fixes --- src/server/communication-service/data_plane/MTCL_backend.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index b6696c823..acfe9a655 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -94,8 +94,9 @@ class MTCL_backend : public BackendInterface { delete unit->_bytes; LOG("Freed transfer unit _bytes buffer"); - delete unit; - LOG("Deleted transfer unit"); + // DO NOT DELETE unit: here just afterwards, the unit experiences a pop() which + // effectively calls a delete on the container. If I delete it here, a double delete is raised + } /** From 193d3dd7d81f27c4967bea06fe298588090a3d40 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 31 Jul 2025 12:48:01 +0200 Subject: [PATCH 09/11] fix --- src/server/communication-service/data_plane/MTCL_backend.hpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index acfe9a655..3dc1632d1 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -91,9 +91,6 @@ class MTCL_backend : public BackendInterface { HandlerPointer->send(&unit->_start_write_offset, sizeof(capio_off64_t)); LOG("Sent start write offset : %ld", unit->_start_write_offset); - delete unit->_bytes; - LOG("Freed transfer unit _bytes buffer"); - // DO NOT DELETE unit: here just afterwards, the unit experiences a pop() which // effectively calls a delete on the container. If I delete it here, a double delete is raised From bae1a32303f0827643cd4495ff5e0ab72d547db6 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 31 Jul 2025 12:53:11 +0200 Subject: [PATCH 10/11] fix --- src/server/communication-service/data_plane/MTCL_backend.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index 3dc1632d1..f9b97d59e 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -326,7 +326,6 @@ class MTCL_backend : public BackendInterface { std::string filename(inputUnit->_filepath); - delete inputUnit; return filename; } From 477490dbe29c961d3349129730688238dd813d15 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 31 Jul 2025 13:18:17 +0200 Subject: [PATCH 11/11] clang-format --- .../communication-service/data_plane/MTCL_backend.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index f9b97d59e..05e4f2296 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -91,9 +91,9 @@ class MTCL_backend : public BackendInterface { HandlerPointer->send(&unit->_start_write_offset, sizeof(capio_off64_t)); LOG("Sent start write offset : %ld", unit->_start_write_offset); - // DO NOT DELETE unit: here just afterwards, the unit experiences a pop() which - // effectively calls a delete on the container. If I delete it here, a double delete is raised - + // DO NOT DELETE unit: here just afterward, the unit experiences a pop() which + // effectively calls delete on the container. If I delete it here, a double delete is + // raised } /**