Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/server/capio-cl-engine/capio_cl_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,18 @@ class CapioCLEngine {
return files;
}

auto get_home_node(const std::string &path) {
// TODO: understand here how to get the home node policy.
START_LOG(gettid(), "call(path=%s)", path.c_str());
if (const auto location = _locations.find(path); location == _locations.end()) {
LOG("No rule for home node. Returning create home node");
return node_name;
} else {
LOG("Found location entry");
}
return node_name;
}

protected:
const auto *getLocations() const { return &_locations; }
};
Expand Down
188 changes: 2 additions & 186 deletions src/server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,193 +42,9 @@ char node_name[HOST_NAME_MAX];

#include "file-manager/file_manager.hpp"

#include "communication-service/BackendInterface.hpp"
#include "communication-service/CapioCommunicationService.hpp"

#include "communication-service/MTCL_backend.hpp"

std::string parseCLI(int argc, char **argv) {
Logger *log;

args::ArgumentParser parser(CAPIO_SERVER_ARG_PARSER_PRE, CAPIO_SERVER_ARG_PARSER_EPILOGUE);
parser.LongSeparator(" ");
parser.LongPrefix("--");
parser.ShortPrefix("-");

args::Group arguments(parser, "Arguments");
args::HelpFlag help(arguments, "help", "Display this help menu", {'h', "help"});
args::ValueFlag<std::string> logfile_src(arguments, "filename",
CAPIO_SERVER_ARG_PARSER_LOGILE_OPT_HELP, {'l', "log"});
args::ValueFlag<std::string> logfile_folder(
arguments, "filename", CAPIO_SERVER_ARG_PARSER_LOGILE_DIR_OPT_HELP, {'d', "log-dir"});
args::ValueFlag<std::string> config(arguments, "filename",
CAPIO_SERVER_ARG_PARSER_CONFIG_OPT_HELP, {'c', "config"});
args::Flag noConfigFile(arguments, "no-config",
CAPIO_SERVER_ARG_PARSER_CONFIG_NO_CONF_FILE_HELP, {"no-config"});

args::ValueFlag<std::string> backend(
arguments, "backend", CAPIO_SERVER_ARG_PARSER_BACKEND_OPT_HELP, {'b', "backend"});

args::ValueFlag<int> backend_port(arguments, "port",
CAPIO_SERVER_ARG_PARSER_BACKEND_PORT_OPT_HELP, {'p', "port"});

args::Flag continueOnErrorFlag(arguments, "continue-on-error",
CAPIO_SERVER_ARG_PARSER_MEM_STORAGE_ONLY_HELP,
{"continue-on-error"});

args::Flag memStorageOnly(arguments, "mem-storage-only",
CAPIO_SERVER_ARG_PARSER_CONFIG_NCONTINUE_ON_ERROR_HELP, {"mem-only"});

try {
parser.ParseCLI(argc, argv);
} catch (args::Help &) {
std::cout << CAPIO_SERVER_ARG_PARSER_PRE_COMMAND << parser;
exit(EXIT_SUCCESS);
} catch (args::ParseError &e) {
START_LOG(gettid(), "call()");
std::cerr << e.what() << std::endl;
std::cerr << parser;
ERR_EXIT("%s", e.what());
} catch (args::ValidationError &e) {
START_LOG(gettid(), "call()");
std::cerr << e.what() << std::endl;
std::cerr << parser;
ERR_EXIT("%s", e.what());
}

if (continueOnErrorFlag) {
#ifdef CAPIO_LOG
continue_on_error = true;
std::cout << CAPIO_LOG_SERVER_CLI_CONT_ON_ERR_WARNING << std::endl;
#else
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING
<< "--continue-on-error flag given, but logger is not compiled into CAPIO. Flag "
"is ignored."
<< std::endl;
#endif
}

if (memStorageOnly) {
StoreOnlyInMemory = true;
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] "
<< "All files will be stored in memory whenever possible." << std::endl;
}

if (logfile_folder) {
#ifdef CAPIO_LOG
log_master_dir_name = args::get(logfile_folder);
#else
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING
<< "Capio logfile folder, but logging capabilities not compiled into capio!"
<< std::endl;
#endif
}

if (logfile_src) {
#ifdef CAPIO_LOG
// log file was given
std::string token = args::get(logfile_src);
if (token.find(".log") != std::string::npos) {
token.erase(token.length() - 4); // delete .log if for some reason
// is given as parameter
}
logfile_prefix = token;
#else
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING
<< "Capio logfile provided, but logging capabilities not compiled into capio!"
<< std::endl;
#endif
}
#ifdef CAPIO_LOG
auto logname = open_server_logfile();
log = new Logger(__func__, __FILE__, __LINE__, gettid(), "Created new log file");
std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] "
<< "started logging to logfile " << logname << std::endl;
#endif

if (config) {
std::string token = args::get(config);
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] "
<< "parsing config file: " << token << std::endl;
// TODO: pass config file path
} else if (noConfigFile) {
workflow_name = std::string_view(get_capio_workflow_name());
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] "
<< "skipping config file parsing." << std::endl
<< CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] "
<< "Obtained from environment variable current workflow name: "
<< workflow_name.data() << std::endl;

} else {
START_LOG(gettid(), "call()");
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] "
<< "Error: no config file provided. To skip config file use --no-config option!"
<< std::endl;
ERR_EXIT("no config file provided, and --no-config not provided");
}

std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] "
<< "CAPIO_DIR=" << get_capio_dir().c_str() << std::endl;

#ifdef CAPIO_LOG
CAPIO_LOG_LEVEL = get_capio_log_level();
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] "
<< "LOG_LEVEL set to: " << CAPIO_LOG_LEVEL << std::endl;
std::cout << CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING;
log->log("LOG_LEVEL set to: %d", CAPIO_LOG_LEVEL);
delete log;
#else
if (std::getenv("CAPIO_LOG_LEVEL") != nullptr) {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] "
<< CAPIO_LOG_SERVER_CLI_LOGGING_NOT_AVAILABLE << std::endl;
}
#endif

if (backend) {
std::string backend_name = args::get(backend);
std::transform(backend_name.begin(), backend_name.end(), backend_name.begin(), ::toupper);

int port = DEFAULT_CAPIO_BACKEND_PORT;
if (backend_port) {
port = args::get(backend_port);
}

if (backend_name == "MQTT" || backend_name == "MPI") {
std::cout
<< CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] "
<< "Warn: selected backend is not yet officially supported. Setting backend to TCP"
<< std::endl;
backend_name = "TCP";
}

if (backend_name == "TCP" || backend_name == "UCX") {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] "
<< "Selected backend is: " << backend_name << std::endl;
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] "
<< "Selected backend port is: " << port << std::endl;
capio_backend = new MTCL_backend(backend_name, std::to_string(port),
CAPIO_BACKEND_DEFAULT_SLEEP_TIME);
} else if (backend_name == "FS") {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] "
<< "Selected backend is File System" << std::endl;
capio_backend = new NoBackend();
} else {
START_LOG(gettid(), "call()");
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] "
<< "Provided communication backend " << backend_name << " is invalid"
<< std::endl;
ERR_EXIT("No valid backend was provided");
}
} else {
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] "
<< "Selected backend is File System" << std::endl;
capio_backend = new NoBackend();
}

if (config) {
return args::get(config);
}
return "";
}
#include <utils/parser.hpp>

int main(int argc, char **argv) {

Expand Down
2 changes: 1 addition & 1 deletion src/server/client-manager/handlers/create.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ inline void create_handler(const char *const str) {

capio_cl_engine->addProducer(path, name);
client_manager->register_produced_file(tid, path_str);
storage_service->createFile(path);
storage_service->createMemoryFile(path);
}

#endif // CAPIO_CREATE_HPP
9 changes: 7 additions & 2 deletions src/server/client-manager/handlers/open.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ inline void open_handler(const char *const str) {
if (capio_cl_engine->isProducer(path, tid)) {
LOG("Thread is producer. allowing to continue with open");
client_manager->reply_to_client(tid, 1);
storage_service->createFile(path);
storage_service->createMemoryFile(path);
return;
}

if (std::filesystem::exists(path)) {
LOG("File already exists! allowing to continue with open");
client_manager->reply_to_client(tid, 1);
storage_service->createFile(path);

/*
* At this point, the file that needs to be created more likely than not is not local to the
* machine. As such, we call the creation of a new CapioRemoteFile
*/
storage_service->createRemoteFile(path);
return;
}

Expand Down
7 changes: 7 additions & 0 deletions src/server/communication-service/BackendInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ class NotImplementedBackendMethod : public std::exception {
class BackendInterface {
public:
virtual ~BackendInterface() = default;

virtual void connect_to(std::string remoteHost, const std::string &remotePort) {
throw NotImplementedBackendMethod();
};

/**
* @brief Send data to target
*
Expand Down Expand Up @@ -54,6 +59,8 @@ class BackendInterface {
*/
class NoBackend final : public BackendInterface {
public:
void connect_to(std::string remoteHost, const std::string &remotePort) override { return; };

void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath,
capio_off64_t start_offset) override {
return;
Expand Down
Loading