diff --git a/src/common/capio/constants.hpp b/src/common/capio/constants.hpp index d2f43044e..e07920bcf 100644 --- a/src/common/capio/constants.hpp +++ b/src/common/capio/constants.hpp @@ -155,6 +155,9 @@ constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_NCONTINUE_ON_ERROR_HELP[] = "specified, and a fatal termination point is reached, the behaviour of capio is undefined and " "should not be taken as valid"; +constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_CONTROL_PLANE_BACKEND[] = + "Which control plane backend to used. Options: . Defaults to "; + constexpr char CAPIO_LOG_SERVER_CLI_CONT_ON_ERR_WARNING[] = "[ \033[1;33m SERVER \033[0m ]\033[1;31m " "|==================================================================|\033[0m\n" @@ -181,5 +184,8 @@ constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_BACKEND_HELP[] = constexpr int DEFAULT_CAPIO_BACKEND_PORT = 2222; constexpr int CAPIO_BACKEND_DEFAULT_SLEEP_TIME = 300; - -#endif // CAPIO_COMMON_CONSTANTS_HPP +constexpr char MULTICAST_DISCOVERY_ADDR[] = "234.234.234.1"; +constexpr int MULTICAST_DISCOVERY_PORT = 2223; +constexpr int MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE = + HOST_NAME_MAX + 10; // hostname + : + sizeof(port) +#endif // CAPIO_COMMON_CONSTANTS_HPP diff --git a/src/common/capio/logger.hpp b/src/common/capio/logger.hpp index cbe2967b4..555827eba 100644 --- a/src/common/capio/logger.hpp +++ b/src/common/capio/logger.hpp @@ -22,8 +22,8 @@ inline bool continue_on_error = false; // change behaviour of ERR_EXIT to contin #ifndef __CAPIO_POSIX #include thread_local std::ofstream logfile; // if building for server, self-contained logfile -std::string log_master_dir_name = CAPIO_DEFAULT_LOG_FOLDER; -std::string logfile_prefix = CAPIO_SERVER_DEFAULT_LOG_FILE_PREFIX; +inline std::string log_master_dir_name = CAPIO_DEFAULT_LOG_FOLDER; +inline std::string logfile_prefix = CAPIO_SERVER_DEFAULT_LOG_FILE_PREFIX; #else inline thread_local bool logfileOpen = false; inline thread_local int logfileFD = -1; @@ -46,6 +46,10 @@ inline auto open_server_logfile() { auto hostname = new char[HOST_NAME_MAX]; gethostname(hostname, HOST_NAME_MAX); + if (log_master_dir_name.empty()) { + log_master_dir_name = CAPIO_DEFAULT_LOG_FOLDER; + } + const std::filesystem::path output_folder = std::string{log_master_dir_name + "/server/" + hostname}; diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index 1478e2897..ce6109535 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -1,103 +1,26 @@ #ifndef CAPIOCOMMUNICATIONSERVICE_HPP #define CAPIOCOMMUNICATIONSERVICE_HPP -#include "BackendInterface.hpp" -#include "MTCL_backend.hpp" +#include "control_plane/capio_control_plane.hpp" +#include "data_plane/BackendInterface.hpp" -#include +#include "control_plane/fs_control_plane.hpp" +#include "control_plane/multicast_control_plane.hpp" +#include "data_plane/MTCL_backend.hpp" class CapioCommunicationService { char ownHostname[HOST_NAME_MAX] = {0}; - bool *continue_execution = new bool; - std::thread *thread_server_finder_fs; - - void generate_aliveness_token(const int port) const { - START_LOG(gettid(), "call(port=d)", port); - - std::string token_filename(ownHostname); - token_filename += ".alive_connection"; - - LOG("Creating alive token %s", token_filename.c_str()); - - std::ofstream FilePort(token_filename); - FilePort << port; - FilePort.close(); - - LOG("Saved self token info to FS"); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "Generated token at " << token_filename << std::endl; - } - - void delete_aliveness_token() const { - START_LOG(gettid(), "call()"); - - std::string token_filename(ownHostname); - token_filename += ".alive_connection"; - - LOG("Removing alive token %s", token_filename.c_str()); - std::filesystem::remove(token_filename); - LOG("Removed token"); - } - - /* - * Monitor the file system for the presence of tokens. - */ - static void find_new_server_from_fs_token_thread(const bool *continue_execution) { - START_LOG(gettid(), "call()"); - - std::vector handled_tokens; - - if (!continue_execution) { - LOG("Terminating execution"); - return; - } - - auto dir_iterator = std::filesystem::directory_iterator(std::filesystem::current_path()); - for (const auto &entry : dir_iterator) { - const auto &token_path = entry.path(); - - if (!entry.is_regular_file() || token_path.extension() != ".alive_connection") { - LOG("Filename %s is not valid", entry.path().c_str()); - continue; - } - - if (std::find(handled_tokens.begin(), handled_tokens.end(), entry.path()) != - handled_tokens.end()) { - LOG("Token already handled... skipping it!"); - continue; - }; - - LOG("Found token %s", token_path.c_str()); - // TODO: as of now we will not connect with servers - // TODO: that terminates and then comes back up online... - handled_tokens.push_back(entry.path()); - - std::ifstream MyReadFile(token_path.filename()); - std::string remoteHost = entry.path().stem(), remotePort; - LOG("Testing for file: %s (hostname: %s, port=%s)", entry.path().filename().c_str(), - remoteHost.c_str(), remotePort.c_str()); - - getline(MyReadFile, remotePort); - MyReadFile.close(); - - capio_backend->connect_to(remoteHost, remotePort); - } - LOG("Terminated loop. sleeping one second"); - sleep(1); - } public: ~CapioCommunicationService() { - *continue_execution = false; - thread_server_finder_fs->join(); - delete_aliveness_token(); + delete capio_control_plane; delete capio_backend; }; - CapioCommunicationService(std::string &backend_name, const int port) { + CapioCommunicationService(std::string &backend_name, const int port, + const std::string &control_plane_backend = "multicast") { START_LOG(gettid(), "call(backend_name=%s)", backend_name.c_str()); - *continue_execution = true; gethostname(ownHostname, HOST_NAME_MAX); LOG("My hostname is %s. Starting to listen on connection", ownHostname); @@ -127,9 +50,19 @@ class CapioCommunicationService { << std::endl; ERR_EXIT("No valid backend was provided"); } - generate_aliveness_token(port); - thread_server_finder_fs = - new std::thread(find_new_server_from_fs_token_thread, std::ref(continue_execution)); + + if (control_plane_backend == "multicast") { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Starting multicast control plane" << std::endl; + capio_control_plane = new MulticastControlPlane(port); + } else { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Starting file system control plane" << std::endl; + capio_control_plane = new FSControlPlane(port); + } + + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " + << "CapioCommunicationService initialization completed." << std::endl; } }; diff --git a/src/server/communication-service/control_plane/capio_control_plane.hpp b/src/server/communication-service/control_plane/capio_control_plane.hpp new file mode 100644 index 000000000..4fbf45bbd --- /dev/null +++ b/src/server/communication-service/control_plane/capio_control_plane.hpp @@ -0,0 +1,11 @@ +#ifndef CAPIO_CONTROL_PLANE_HPP +#define CAPIO_CONTROL_PLANE_HPP + +class CapioControlPlane { + public: + virtual ~CapioControlPlane() = default; +}; + +inline CapioControlPlane *capio_control_plane; + +#endif // CAPIO_CONTROL_PLANE_HPP diff --git a/src/server/communication-service/control_plane/fs_control_plane.hpp b/src/server/communication-service/control_plane/fs_control_plane.hpp new file mode 100644 index 000000000..591288aae --- /dev/null +++ b/src/server/communication-service/control_plane/fs_control_plane.hpp @@ -0,0 +1,120 @@ +#ifndef FS_CONTROL_PLANE_HPP +#define FS_CONTROL_PLANE_HPP + +#include +#include +#include + +class FSControlPlane : public CapioControlPlane { + char ownHostname[HOST_NAME_MAX] = {0}; + int _backend_port; + bool *continue_execution; + std::thread *thread; + std::vector token_used_to_connect; + std::mutex *token_used_to_connect_mutex; + + void generate_aliveness_token(const int port) const { + START_LOG(gettid(), "call(port=d)", port); + + std::string token_filename(ownHostname); + token_filename += ".alive_connection"; + + LOG("Creating alive token %s", token_filename.c_str()); + + std::ofstream FilePort(token_filename); + FilePort << port; + FilePort.close(); + + LOG("Saved self token info to FS"); + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " + << "Generated token at " << token_filename << std::endl; + } + + void delete_aliveness_token() const { + START_LOG(gettid(), "call()"); + + std::string token_filename(ownHostname); + token_filename += ".alive_connection"; + if (!std::filesystem::exists(token_filename)) { + LOG("Token does not exists. Skipping delettion"); + return; + } + + LOG("Removing alive token %s", token_filename.c_str()); + std::filesystem::remove(token_filename); + LOG("Removed token"); + } + + /* + * Monitor the file system for the presence of tokens. + */ + static void fs_server_aliveness_detector_thread(const bool *continue_execution, + std::vector *token_used_to_connect, + std::mutex *token_used_to_connect_mutex) { + START_LOG(gettid(), "call()"); + + if (!continue_execution) { + LOG("Terminating execution"); + return; + } + + auto dir_iterator = std::filesystem::directory_iterator(std::filesystem::current_path()); + for (const auto &entry : dir_iterator) { + const auto token_path = entry.path(); + + if (!entry.is_regular_file() || token_path.extension() != ".alive_connection") { + LOG("Filename %s is not valid", entry.path().c_str()); + continue; + } + + LOG("Found token %s", token_path.c_str()); + + std::ifstream MyReadFile(token_path.filename()); + std::string remoteHost = entry.path().stem(), remotePort; + LOG("Testing for file: %s (hostname: %s, port=%s)", entry.path().filename().c_str(), + remoteHost.c_str(), remotePort.c_str()); + + getline(MyReadFile, remotePort); + MyReadFile.close(); + + const auto hostname_port = std::string(remoteHost) + ":" + remotePort; + std::lock_guard lock(*token_used_to_connect_mutex); + if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), + hostname_port) != token_used_to_connect->end()) { + LOG("Token already handled... skipping it!"); + continue; + }; + + // TODO: as of now we will not connect with servers + // TODO: that terminates and then comes back up online... + token_used_to_connect->push_back(hostname_port); + capio_backend->connect_to(std::string(remoteHost) + ":" + remotePort); + } + LOG("Terminated loop. sleeping one second"); + sleep(1); + } + + public: + explicit FSControlPlane(int backend_port) : _backend_port(backend_port) { + gethostname(ownHostname, HOST_NAME_MAX); + generate_aliveness_token(backend_port); + continue_execution = new bool(true); + token_used_to_connect_mutex = new std::mutex(); + thread = new std::thread(fs_server_aliveness_detector_thread, std::ref(continue_execution), + &token_used_to_connect, token_used_to_connect_mutex); + }; + + ~FSControlPlane() override { + delete_aliveness_token(); + pthread_cancel(thread->native_handle()); + thread->join(); + delete thread; + delete continue_execution; + delete token_used_to_connect_mutex; + + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "FSControlPlane correctly terminated" << std::endl; + } +}; + +#endif // FS_CONTROL_PLANE_HPP diff --git a/src/server/communication-service/control_plane/multicast_control_plane.hpp b/src/server/communication-service/control_plane/multicast_control_plane.hpp new file mode 100644 index 000000000..385e6e2e4 --- /dev/null +++ b/src/server/communication-service/control_plane/multicast_control_plane.hpp @@ -0,0 +1,194 @@ +#ifndef MULTICAST_CONTROL_PLANE_HPP +#define MULTICAST_CONTROL_PLANE_HPP +#include +#include +#include +#include +#include +#include + +class MulticastControlPlane : public CapioControlPlane { + int _backend_port; + bool *continue_execution; + std::thread *thread; + std::vector token_used_to_connect; + std::mutex *token_used_to_connect_mutex; + char ownHostname[HOST_NAME_MAX] = {0}; + + static void send_multicast_alive_token(const int data_plane_backend_port) { + START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); + char hostname[HOST_NAME_MAX]; + gethostname(hostname, HOST_NAME_MAX); + + int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); + if (transmission_socket < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " + << "WARNING: unable to bind multicast socket: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << hostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + + sockaddr_in addr = {}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); + addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); + + char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; + sprintf(message, "%s:%d", hostname, data_plane_backend_port); + + if (sendto(transmission_socket, message, strlen(message), 0, + reinterpret_cast(&addr), sizeof(addr)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " + << "WARNING: unable to send alive token(" << message + << ") to multicast address!: " << strerror(errno) << std::endl; + } + LOG("Sent multicast token"); + close(transmission_socket); + } + + static void multicast_server_aliveness_thread(const bool *continue_execution, + std::vector *token_used_to_connect, + std::mutex *token_used_to_connect_mutex, + const int data_plane_backend_port) { + + START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); + + char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE], ownHostname[HOST_NAME_MAX]; + gethostname(ownHostname, HOST_NAME_MAX); + + int loopback = 0; // disable receive loopback messages + u_int multiple_socket_on_same_address = 1; // enable multiple sockets on same address + + const std::string SELF_TOKEN = + std::string(ownHostname) + ":" + std::to_string(data_plane_backend_port); + + int discovery_socket = socket(AF_INET, SOCK_DGRAM, 0); + if (discovery_socket < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to open multicast socket: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + LOG("Created socket"); + + if (setsockopt(discovery_socket, SOL_SOCKET, SO_REUSEADDR, + (char *) &multiple_socket_on_same_address, + sizeof(multiple_socket_on_same_address)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to assign multiple sockets to same address: " + << strerror(errno) << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + LOG("Set IP address to accept multiple sockets on same address"); + + if (setsockopt(discovery_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loopback, + sizeof(loopback)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to filter out loopback incoming messages: " + << strerror(errno) << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + LOG("Disabled reception of loopback messages from socket"); + + sockaddr_in addr = {}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); + socklen_t addrlen = sizeof(addr); + LOG("Set socket on IP: %s - PORT: %d", MULTICAST_DISCOVERY_ADDR, MULTICAST_DISCOVERY_PORT); + + // bind to receive address + if (bind(discovery_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { + + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to bind multicast socket: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + LOG("Binded socket"); + + ip_mreq mreq{}; + mreq.imr_multiaddr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt(discovery_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: unable to join multicast group: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" << std::endl; + return; + } + LOG("Successfully joined multicast group"); + + while (*continue_execution) { + bzero(incomingMessage, sizeof(incomingMessage)); + send_multicast_alive_token(data_plane_backend_port); + LOG("Waiting for incoming token..."); + + do { + const auto recv_sice = + recvfrom(discovery_socket, incomingMessage, MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE, + 0, reinterpret_cast(&addr), &addrlen); + LOG("Received multicast data of size %ld and content %s", recv_sice, + incomingMessage); + if (recv_sice < 0) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + << "WARNING: received < 0 bytes from multicast: " << strerror(errno) + << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "Execution will continue only with FS discovery support" + << std::endl; + return; + } + } while (std::string(incomingMessage) == SELF_TOKEN); + + std::lock_guard lg(*token_used_to_connect_mutex); + if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), + incomingMessage) == token_used_to_connect->end()) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + << "Multicast adv: " << incomingMessage << std::endl; + LOG("Received message: %s", incomingMessage); + token_used_to_connect->push_back(incomingMessage); + capio_backend->connect_to(incomingMessage); + } + + sleep(1); + } + } + + public: + explicit MulticastControlPlane(int backend_port) : _backend_port(backend_port) { + gethostname(ownHostname, HOST_NAME_MAX); + continue_execution = new bool(true); + token_used_to_connect_mutex = new std::mutex(); + + thread = new std::thread(multicast_server_aliveness_thread, std::ref(continue_execution), + &token_used_to_connect, token_used_to_connect_mutex, backend_port); + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " + << "Multicast discovery service @ " << MULTICAST_DISCOVERY_ADDR << ":" + << MULTICAST_DISCOVERY_PORT << std::endl; + } + + ~MulticastControlPlane() override { + *continue_execution = false; + pthread_cancel(thread->native_handle()); + thread->join(); + delete token_used_to_connect_mutex; + delete thread; + delete continue_execution; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << "MulticastControlPlane correctly terminated" << std::endl; + } +}; + +#endif // MULTICAST_CONTROL_PLANE_HPP diff --git a/src/server/communication-service/BackendInterface.hpp b/src/server/communication-service/data_plane/BackendInterface.hpp similarity index 89% rename from src/server/communication-service/BackendInterface.hpp rename to src/server/communication-service/data_plane/BackendInterface.hpp index e54089274..20a001f5a 100644 --- a/src/server/communication-service/BackendInterface.hpp +++ b/src/server/communication-service/data_plane/BackendInterface.hpp @@ -16,9 +16,10 @@ class BackendInterface { public: virtual ~BackendInterface() = default; - virtual void connect_to(std::string remoteHost, const std::string &remotePort) { - throw NotImplementedBackendMethod(); - }; + /** + * @param hostname_port who to connect to in the form of hostname:port + */ + virtual void connect_to(std::string hostname_port) { throw NotImplementedBackendMethod(); }; /** * @brief Send data to target @@ -59,7 +60,7 @@ class BackendInterface { */ class NoBackend final : public BackendInterface { public: - void connect_to(std::string remoteHost, const std::string &remotePort) override { return; }; + void connect_to(std::string hostname_port) override { return; }; void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath, capio_off64_t start_offset) override { diff --git a/src/server/communication-service/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp similarity index 84% rename from src/server/communication-service/MTCL_backend.hpp rename to src/server/communication-service/data_plane/MTCL_backend.hpp index 37f6e7d07..05e4f2296 100644 --- a/src/server/communication-service/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -31,6 +31,7 @@ class TransportUnit { }; class MTCL_backend : public BackendInterface { + typedef enum { FROM_REMOTE, TO_REMOTE } CONN_HANDLER_ORIGIN; typedef std::tuple *, std::queue *, std::mutex *> TransportUnitInterface; std::unordered_map connected_hostnames_map; @@ -48,14 +49,19 @@ class MTCL_backend : public BackendInterface { size_t filepath_len; const auto unit = new TransportUnit(); HandlerPointer->receive(&filepath_len, sizeof(size_t)); + LOG("Incoming path of length %ld", filepath_len); unit->_filepath.reserve(filepath_len + 1); HandlerPointer->receive(unit->_filepath.data(), filepath_len); + LOG("Received message! Path : %s", unit->_filepath.c_str()); HandlerPointer->receive(&unit->_buffer_size, sizeof(capio_off64_t)); + LOG("Buffer size for incoming data is %ld", unit->_buffer_size); unit->_bytes = new char[unit->_buffer_size]; + LOG("Allocated space for incoming data"); HandlerPointer->receive(unit->_bytes, unit->_buffer_size); + LOG("Received file buffer data"); HandlerPointer->receive(&unit->_start_write_offset, sizeof(capio_off64_t)); - LOG("[recv] Receiving %ld bytes of file %s", unit->_buffer_size, unit->_filepath.c_str()); - LOG("[recv] Offset of received chunk is %ld", unit->_start_write_offset); + LOG("Received chunk of data should be stored on offset %ld of file %s", + unit->_start_write_offset, unit->_filepath.c_str()); return unit; } @@ -69,14 +75,25 @@ class MTCL_backend : public BackendInterface { * step2: send data */ const size_t file_path_length = unit->_filepath.length(); + HandlerPointer->send(&file_path_length, sizeof(size_t)); + LOG("Size of path that is being sent: %ld", file_path_length); + HandlerPointer->send(unit->_filepath.c_str(), file_path_length); + LOG("Sent file path: %s", unit->_filepath.c_str()); + HandlerPointer->send(&unit->_buffer_size, sizeof(capio_off64_t)); + LOG("Size of file buffer to be sent: %ld", unit->_buffer_size); + HandlerPointer->send(unit->_bytes, unit->_buffer_size); + LOG("Sent %ld bytes of data chunk", unit->_buffer_size); + HandlerPointer->send(&unit->_start_write_offset, sizeof(capio_off64_t)); - LOG("[send] Sent %ld bytes of file %s with offset of %ld", unit->_buffer_size, - unit->_filepath.c_str(), unit->_start_write_offset); - delete unit; + LOG("Sent start write offset : %ld", unit->_start_write_offset); + + // DO NOT DELETE unit: here just afterward, the unit experiences a pop() which + // effectively calls delete on the container. If I delete it here, a double delete is + // raised } /** @@ -84,8 +101,10 @@ class MTCL_backend : public BackendInterface { */ void static server_connection_handler(MTCL::HandleUser HandlerPointer, const std::string remote_hostname, const int sleep_time, - TransportUnitInterface interface, const bool *terminate) { - START_LOG(gettid(), "call(remote_hostname=%s)", remote_hostname.c_str()); + TransportUnitInterface interface, const bool *terminate, + CONN_HANDLER_ORIGIN source) { + START_LOG(gettid(), "call(remote_hostname=%s, kind=%s)", remote_hostname.c_str(), + source == FROM_REMOTE ? "from remote server" : "to remote server"); // out = data to sent to others // in = data from others auto [in, out, mutex] = interface; @@ -168,7 +187,7 @@ class MTCL_backend : public BackendInterface { UserManager.receive(connected_hostname, HOST_NAME_MAX); std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "Connected to " << connected_hostname << std::endl; + << "Connected from " << connected_hostname << std::endl; LOG("Received connection hostname: %s", connected_hostname); @@ -181,23 +200,28 @@ class MTCL_backend : public BackendInterface { _connection_threads->push_back(new std::thread( server_connection_handler, std::move(UserManager), connected_hostname, sleep_time, - open_connections->at(connected_hostname), terminate)); + open_connections->at(connected_hostname), terminate, FROM_REMOTE)); } } public: - void connect_to(std::string remoteHost, const std::string &remotePort) override { - START_LOG(gettid(), "call( remoteHost=%s, remotePort=%s)", remoteHost.c_str(), - remotePort.c_str()); - - const std::string remoteToken = usedProtocol + ":" + remoteHost + ":" + remotePort; - - if (remoteToken == selfToken || - remoteToken == usedProtocol + ":" + ownHostname + ":" + remotePort) { + void connect_to(std::string hostname_port) override { + START_LOG(gettid(), "call( hostname_port=%s)", hostname_port.c_str()); + std::string remoteHost = hostname_port.substr(0, hostname_port.find_last_of(':')); + const std::string remoteToken = usedProtocol + ":" + hostname_port; + + if (remoteToken == selfToken || // skip on 0.0.0.0 + remoteToken == usedProtocol + ":" + ownHostname + ":" + ownPort // skip on my real IP + ) { LOG("Skipping to connect to self"); return; } + if (connected_hostnames_map.find(remoteToken) != connected_hostnames_map.end()) { + LOG("Remote host %s is already connected", remoteHost.c_str()); + return; + } + LOG("Trying to connect on remote: %s", remoteToken.c_str()); if (auto UserManager = MTCL::Manager::connect(remoteToken); UserManager.isValid()) { std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " @@ -213,7 +237,7 @@ class MTCL_backend : public BackendInterface { connection_threads.push_back(new std::thread( server_connection_handler, std::move(UserManager), remoteHost.c_str(), - thread_sleep_times, connection_tuple, terminate)); + thread_sleep_times, connection_tuple, terminate, TO_REMOTE)); } else { std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << ownHostname << " ] " << "Warning: found token " << remoteHost << ".alive_token" @@ -246,7 +270,7 @@ class MTCL_backend : public BackendInterface { th = new std::thread(incoming_connection_listener, std::ref(continue_execution), sleep_time, &connected_hostnames_map, _guard, &connection_threads, terminate); std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "CapioCommunicationService initialization completed." << std::endl; + << "MTCL data plane initialization completed." << std::endl; } ~MTCL_backend() override { @@ -284,6 +308,7 @@ class MTCL_backend : public BackendInterface { inQueue = std::get<0>(data); interface = data; found = !inQueue->empty(); + LOG("Hostname %s, %s incoming data", hostname.c_str(), found ? "has" : "has not"); } if (!found) { LOG("No incoming messages. Putting thread to sleep"); @@ -301,7 +326,6 @@ class MTCL_backend : public BackendInterface { std::string filename(inputUnit->_filepath); - delete inputUnit; return filename; } diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index 9bf6af5f8..cfa40f38b 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -33,6 +33,10 @@ std::string parseCLI(int argc, char **argv) { args::Flag memStorageOnly(arguments, "mem-storage-only", CAPIO_SERVER_ARG_PARSER_CONFIG_NCONTINUE_ON_ERROR_HELP, {"mem-only"}); + args::ValueFlag controlPlaneBackend( + arguments, "backend", CAPIO_SERVER_ARG_PARSER_CONFIG_CONTROL_PLANE_BACKEND, + {"control-backend"}); + try { parser.ParseCLI(argc, argv); } catch (args::Help &) { @@ -147,7 +151,22 @@ std::string parseCLI(int argc, char **argv) { port = args::get(backend_port); } - capio_communication_service = new CapioCommunicationService(backend_name, port); + std::string constrol_backend_name = "multicast"; + if (controlPlaneBackend) { + auto tmp = args::get(controlPlaneBackend); + if (tmp != "multicast" && tmp != "fs") { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << "Unknown control plane backend " << tmp << std::endl; + } else { + constrol_backend_name = tmp; + } + } + + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + << "Using control plane backend: " << constrol_backend_name << std::endl; + + capio_communication_service = + new CapioCommunicationService(backend_name, port, constrol_backend_name); } else { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " diff --git a/src/server/utils/signals.hpp b/src/server/utils/signals.hpp index 857bdbfb5..1541b0042 100644 --- a/src/server/utils/signals.hpp +++ b/src/server/utils/signals.hpp @@ -1,7 +1,7 @@ #ifndef CAPIO_SERVER_HANDLERS_SIGNALS_HPP #define CAPIO_SERVER_HANDLERS_SIGNALS_HPP -#include "communication-service/BackendInterface.hpp" +#include "../communication-service/data_plane/BackendInterface.hpp" #include "communication-service/CapioCommunicationService.hpp" #include diff --git a/tests/multinode/backend/docker-compose.yml b/tests/multinode/backend/docker-compose.yml index 27c619def..ec80cbff3 100644 --- a/tests/multinode/backend/docker-compose.yml +++ b/tests/multinode/backend/docker-compose.yml @@ -5,10 +5,13 @@ services: working_dir: /shared volumes: - shared_data:/shared - command: | - capio_backend_unit_tests \ - --gtest_break_on_failure \ - --gtest_print_time=1 + networks: + capio_net: + aliases: + - node1 + environment: + - CAPIO_LOG_LEVEL=-1 + command: capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1 node2: image: alphaunito/capio:latest @@ -16,7 +19,17 @@ services: working_dir: /shared volumes: - shared_data:/shared - command: bash -c "sleep 5 && capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" + networks: + capio_net: + aliases: + - node2 + environment: + - CAPIO_LOG_LEVEL=-1 + command: bash -c "sleep 5 && capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" volumes: - shared_data: \ No newline at end of file + shared_data: + +networks: + capio_net: + driver: bridge \ No newline at end of file diff --git a/tests/multinode/backend/src/MTCL.hpp b/tests/multinode/backend/src/MTCL.hpp index e8ed52588..e3b04ba92 100644 --- a/tests/multinode/backend/src/MTCL.hpp +++ b/tests/multinode/backend/src/MTCL.hpp @@ -12,9 +12,9 @@ constexpr capio_off64_t BUFFER_SIZES = 1024; TEST(CapioCommServiceTest, TestPingPong) { START_LOG(gettid(), "INFO: TestPingPong"); gethostname(node_name.data(), HOST_NAME_MAX); - const int port = 1234; - std::string proto = "TCP"; - CapioCommunicationService communication_service(proto, port); + const int port = 1234; + std::string proto = "TCP"; + auto communication_service = new CapioCommunicationService(proto, port, "multicast"); capio_off64_t size_revc, offset; std::vector connections; @@ -39,6 +39,7 @@ TEST(CapioCommServiceTest, TestPingPong) { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping response from : " << i << std::endl; EXPECT_EQ(strcmp(buff, buff1), 0); + delete communication_service; return; } std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Listening for ping from: " << i @@ -49,6 +50,7 @@ TEST(CapioCommServiceTest, TestPingPong) { EXPECT_EQ(strcmp(recvBuff, TEST_MESSAGE), 0); capio_backend->send(i, recvBuff, size_revc, "./test", 0); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sent ping response to: " << i << std::endl; + delete communication_service; return; } } diff --git a/tests/multinode/integration/docker-compose.yml b/tests/multinode/integration/docker-compose.yml index 3e50178dd..12a0b086e 100644 --- a/tests/multinode/integration/docker-compose.yml +++ b/tests/multinode/integration/docker-compose.yml @@ -8,10 +8,11 @@ services: - CAPIO_LOG_LEVEL=-1 volumes: - shared_data:/shared - command: | - capio_integration_test_split \ - --gtest_break_on_failure \ - --gtest_print_time=1 + networks: + capio_net: + aliases: + - split + command: capio_integration_test_split --gtest_break_on_failure --gtest_print_time=1 map: image: alphaunito/capio:latest @@ -22,10 +23,11 @@ services: - CAPIO_LOG_LEVEL=-1 volumes: - shared_data:/shared - command: | - capio_integration_test_map \ - --gtest_break_on_failure \ - --gtest_print_time=1 + networks: + capio_net: + aliases: + - map + command: capio_integration_test_map --gtest_break_on_failure --gtest_print_time=1 merge: image: alphaunito/capio:latest @@ -36,9 +38,15 @@ services: - CAPIO_LOG_LEVEL=-1 volumes: - shared_data:/shared - command: | - capio_integration_test_merge \ - --gtest_break_on_failure \ - --gtest_print_time=1 + networks: + capio_net: + aliases: + - merge + command: capio_integration_test_merge --gtest_break_on_failure --gtest_print_time=1 + volumes: - shared_data: \ No newline at end of file + shared_data: + +networks: + capio_net: + driver: bridge \ No newline at end of file