Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/common/capio/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_NCONTINUE_ON_ERROR_HELP[] =
"specified, and a fatal termination point is reached, the behaviour of capio is undefined and "
"should not be taken as valid";

constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_CONTROL_PLANE_BACKEND[] =
"Which control plane backend to used. Options: <multicast | fs>. Defaults to <multicast>";

constexpr char CAPIO_LOG_SERVER_CLI_CONT_ON_ERR_WARNING[] =
"[ \033[1;33m SERVER \033[0m ]\033[1;31m "
"|==================================================================|\033[0m\n"
Expand All @@ -181,5 +184,8 @@ constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_BACKEND_HELP[] =

constexpr int DEFAULT_CAPIO_BACKEND_PORT = 2222;
constexpr int CAPIO_BACKEND_DEFAULT_SLEEP_TIME = 300;

#endif // CAPIO_COMMON_CONSTANTS_HPP
constexpr char MULTICAST_DISCOVERY_ADDR[] = "234.234.234.1";
constexpr int MULTICAST_DISCOVERY_PORT = 2223;
constexpr int MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE =
HOST_NAME_MAX + 10; // hostname + : + sizeof(port)
#endif // CAPIO_COMMON_CONSTANTS_HPP
8 changes: 6 additions & 2 deletions src/common/capio/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ inline bool continue_on_error = false; // change behaviour of ERR_EXIT to contin
#ifndef __CAPIO_POSIX
#include <filesystem>
thread_local std::ofstream logfile; // if building for server, self-contained logfile
std::string log_master_dir_name = CAPIO_DEFAULT_LOG_FOLDER;
std::string logfile_prefix = CAPIO_SERVER_DEFAULT_LOG_FILE_PREFIX;
inline std::string log_master_dir_name = CAPIO_DEFAULT_LOG_FOLDER;
inline std::string logfile_prefix = CAPIO_SERVER_DEFAULT_LOG_FILE_PREFIX;
#else
inline thread_local bool logfileOpen = false;
inline thread_local int logfileFD = -1;
Expand All @@ -46,6 +46,10 @@ inline auto open_server_logfile() {
auto hostname = new char[HOST_NAME_MAX];
gethostname(hostname, HOST_NAME_MAX);

if (log_master_dir_name.empty()) {
log_master_dir_name = CAPIO_DEFAULT_LOG_FOLDER;
}

const std::filesystem::path output_folder =
std::string{log_master_dir_name + "/server/" + hostname};

Expand Down
109 changes: 21 additions & 88 deletions src/server/communication-service/CapioCommunicationService.hpp
Original file line number Diff line number Diff line change
@@ -1,103 +1,26 @@
#ifndef CAPIOCOMMUNICATIONSERVICE_HPP
#define CAPIOCOMMUNICATIONSERVICE_HPP

#include "BackendInterface.hpp"
#include "MTCL_backend.hpp"
#include "control_plane/capio_control_plane.hpp"
#include "data_plane/BackendInterface.hpp"

#include <algorithm>
#include "control_plane/fs_control_plane.hpp"
#include "control_plane/multicast_control_plane.hpp"
#include "data_plane/MTCL_backend.hpp"

class CapioCommunicationService {

char ownHostname[HOST_NAME_MAX] = {0};
bool *continue_execution = new bool;
std::thread *thread_server_finder_fs;

void generate_aliveness_token(const int port) const {
START_LOG(gettid(), "call(port=d)", port);

std::string token_filename(ownHostname);
token_filename += ".alive_connection";

LOG("Creating alive token %s", token_filename.c_str());

std::ofstream FilePort(token_filename);
FilePort << port;
FilePort.close();

LOG("Saved self token info to FS");
std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] "
<< "Generated token at " << token_filename << std::endl;
}

void delete_aliveness_token() const {
START_LOG(gettid(), "call()");

std::string token_filename(ownHostname);
token_filename += ".alive_connection";

LOG("Removing alive token %s", token_filename.c_str());
std::filesystem::remove(token_filename);
LOG("Removed token");
}

/*
* Monitor the file system for the presence of tokens.
*/
static void find_new_server_from_fs_token_thread(const bool *continue_execution) {
START_LOG(gettid(), "call()");

std::vector<std::string> handled_tokens;

if (!continue_execution) {
LOG("Terminating execution");
return;
}

auto dir_iterator = std::filesystem::directory_iterator(std::filesystem::current_path());
for (const auto &entry : dir_iterator) {
const auto &token_path = entry.path();

if (!entry.is_regular_file() || token_path.extension() != ".alive_connection") {
LOG("Filename %s is not valid", entry.path().c_str());
continue;
}

if (std::find(handled_tokens.begin(), handled_tokens.end(), entry.path()) !=
handled_tokens.end()) {
LOG("Token already handled... skipping it!");
continue;
};

LOG("Found token %s", token_path.c_str());
// TODO: as of now we will not connect with servers
// TODO: that terminates and then comes back up online...
handled_tokens.push_back(entry.path());

std::ifstream MyReadFile(token_path.filename());
std::string remoteHost = entry.path().stem(), remotePort;
LOG("Testing for file: %s (hostname: %s, port=%s)", entry.path().filename().c_str(),
remoteHost.c_str(), remotePort.c_str());

getline(MyReadFile, remotePort);
MyReadFile.close();

capio_backend->connect_to(remoteHost, remotePort);
}
LOG("Terminated loop. sleeping one second");
sleep(1);
}

public:
~CapioCommunicationService() {
*continue_execution = false;
thread_server_finder_fs->join();
delete_aliveness_token();
delete capio_control_plane;
delete capio_backend;
};

CapioCommunicationService(std::string &backend_name, const int port) {
CapioCommunicationService(std::string &backend_name, const int port,
const std::string &control_plane_backend = "multicast") {
START_LOG(gettid(), "call(backend_name=%s)", backend_name.c_str());
*continue_execution = true;
gethostname(ownHostname, HOST_NAME_MAX);
LOG("My hostname is %s. Starting to listen on connection", ownHostname);

Expand Down Expand Up @@ -127,9 +50,19 @@ class CapioCommunicationService {
<< std::endl;
ERR_EXIT("No valid backend was provided");
}
generate_aliveness_token(port);
thread_server_finder_fs =
new std::thread(find_new_server_from_fs_token_thread, std::ref(continue_execution));

if (control_plane_backend == "multicast") {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] "
<< "Starting multicast control plane" << std::endl;
capio_control_plane = new MulticastControlPlane(port);
} else {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] "
<< "Starting file system control plane" << std::endl;
capio_control_plane = new FSControlPlane(port);
}

std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] "
<< "CapioCommunicationService initialization completed." << std::endl;
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#ifndef CAPIO_CONTROL_PLANE_HPP
#define CAPIO_CONTROL_PLANE_HPP

class CapioControlPlane {
public:
virtual ~CapioControlPlane() = default;
};

inline CapioControlPlane *capio_control_plane;

#endif // CAPIO_CONTROL_PLANE_HPP
120 changes: 120 additions & 0 deletions src/server/communication-service/control_plane/fs_control_plane.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#ifndef FS_CONTROL_PLANE_HPP
#define FS_CONTROL_PLANE_HPP

#include <algorithm>
#include <capio/logger.hpp>
#include <filesystem>

class FSControlPlane : public CapioControlPlane {
char ownHostname[HOST_NAME_MAX] = {0};
int _backend_port;
bool *continue_execution;
std::thread *thread;
std::vector<std::string> token_used_to_connect;
std::mutex *token_used_to_connect_mutex;

void generate_aliveness_token(const int port) const {
START_LOG(gettid(), "call(port=d)", port);

std::string token_filename(ownHostname);
token_filename += ".alive_connection";

LOG("Creating alive token %s", token_filename.c_str());

std::ofstream FilePort(token_filename);
FilePort << port;
FilePort.close();

LOG("Saved self token info to FS");
std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] "
<< "Generated token at " << token_filename << std::endl;
}

void delete_aliveness_token() const {
START_LOG(gettid(), "call()");

std::string token_filename(ownHostname);
token_filename += ".alive_connection";
if (!std::filesystem::exists(token_filename)) {
LOG("Token does not exists. Skipping delettion");
return;
}

LOG("Removing alive token %s", token_filename.c_str());
std::filesystem::remove(token_filename);
LOG("Removed token");
}

/*
* Monitor the file system for the presence of tokens.
*/
static void fs_server_aliveness_detector_thread(const bool *continue_execution,
std::vector<std::string> *token_used_to_connect,
std::mutex *token_used_to_connect_mutex) {
START_LOG(gettid(), "call()");

if (!continue_execution) {
LOG("Terminating execution");
return;
}

auto dir_iterator = std::filesystem::directory_iterator(std::filesystem::current_path());
for (const auto &entry : dir_iterator) {
const auto token_path = entry.path();

if (!entry.is_regular_file() || token_path.extension() != ".alive_connection") {
LOG("Filename %s is not valid", entry.path().c_str());
continue;
}

LOG("Found token %s", token_path.c_str());

std::ifstream MyReadFile(token_path.filename());
std::string remoteHost = entry.path().stem(), remotePort;
LOG("Testing for file: %s (hostname: %s, port=%s)", entry.path().filename().c_str(),
remoteHost.c_str(), remotePort.c_str());

getline(MyReadFile, remotePort);
MyReadFile.close();

const auto hostname_port = std::string(remoteHost) + ":" + remotePort;
std::lock_guard lock(*token_used_to_connect_mutex);
if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(),
hostname_port) != token_used_to_connect->end()) {
LOG("Token already handled... skipping it!");
continue;
};

// TODO: as of now we will not connect with servers
// TODO: that terminates and then comes back up online...
token_used_to_connect->push_back(hostname_port);
capio_backend->connect_to(std::string(remoteHost) + ":" + remotePort);
}
LOG("Terminated loop. sleeping one second");
sleep(1);
}

public:
explicit FSControlPlane(int backend_port) : _backend_port(backend_port) {
gethostname(ownHostname, HOST_NAME_MAX);
generate_aliveness_token(backend_port);
continue_execution = new bool(true);
token_used_to_connect_mutex = new std::mutex();
thread = new std::thread(fs_server_aliveness_detector_thread, std::ref(continue_execution),
&token_used_to_connect, token_used_to_connect_mutex);
};

~FSControlPlane() override {
delete_aliveness_token();
pthread_cancel(thread->native_handle());
thread->join();
delete thread;
delete continue_execution;
delete token_used_to_connect_mutex;

std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] "
<< "FSControlPlane correctly terminated" << std::endl;
}
};

#endif // FS_CONTROL_PLANE_HPP
Loading