Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions capio/common/requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ constexpr const int CAPIO_NR_REQUESTS = 24;

/*REQUESTS FOR SERVER TO SERVER COMMUNICATION*/

constexpr const int CAPIO_SERVER_REQUEST_READ = 0;
constexpr const int CAPIO_SERVER_REQUEST_READ_REPLY = 1;
constexpr const int CAPIO_SERVER_REQUEST_READ_BATCH = 2;
constexpr const int CAPIO_SERVER_REQUEST_READ_BATCH_REPLY = 3;
constexpr const int CAPIO_SERVER_REQUEST_STAT = 4;
constexpr const int CAPIO_SERVER_REQUEST_STAT_REPLY = 5;
constexpr const int CAPIO_SERVER_REQUEST_READ = 0;
constexpr const int CAPIO_SERVER_REQUEST_READ_REPLY = 1;
constexpr const int CAPIO_SERVER_REQUEST_STAT = 2;
constexpr const int CAPIO_SERVER_REQUEST_STAT_REPLY = 3;

constexpr const int CAPIO_SERVER_NR_REQUEST = 6;
constexpr const int CAPIO_SERVER_NR_REQUEST = 4;

#endif // CAPIO_COMMON_REQUESTS_HPP
7 changes: 0 additions & 7 deletions capio/server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ int n_servers;
// name of the node
char *node_name;

// application name -> set of files already sent
CSFilesSentMap_t files_sent;

CSClientsRemotePendingNFilesMap_t clients_remote_pending_nfiles;

std::mutex nfiles_mutex;

#include "handlers.hpp"
#include "utils/location.hpp"
#include "utils/signals.hpp"
Expand Down
36 changes: 1 addition & 35 deletions capio/server/include/handlers/close.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,6 @@

extern StorageManager *storage_manager;

inline void handle_pending_remote_nfiles(const std::filesystem::path &path) {
START_LOG(gettid(), "call(%s)", path.c_str());

std::lock_guard<std::mutex> lg(nfiles_mutex);

for (auto &p : clients_remote_pending_nfiles) {

auto &[app, app_pending_nfiles] = p;
LOG("Handling pending files for app: %s", app.c_str());

for (const auto &[prefix, batch_size, dest, files_path, sem] : app_pending_nfiles) {
LOG("Expanded iterator: prefix=%s, batch_size=%ld, dest=%s [others missing....]",
prefix.c_str(), batch_size, dest.c_str());
auto &files = files_sent[app];
LOG("Obtained files for app %s", app.c_str());
auto file_location_opt = get_file_location_opt(path);
LOG("Handling files for prefix: %s. batch size is: %d", prefix.c_str(), batch_size);
if (files.find(path) == files.end() && file_location_opt &&
std::get<0>(file_location_opt->get()) == std::string(node_name) &&
path.native().compare(0, prefix.native().length(), prefix) == 0) {
files_path->push_back(path);
files.insert(path);
LOG("Inserted file %s in batch", path.c_str());
if (files_path->size() == batch_size) {
LOG("Waking up thread to handle batch, as batch is full and can be served");
sem->unlock();
}
}
}
}
}

inline void handle_close(int tid, int fd) {
START_LOG(gettid(), "call(tid=%d, fd=%d)", tid, fd);

Expand All @@ -53,11 +21,9 @@ inline void handle_close(int tid, int fd) {

if (CapioCLEngine::get().getCommitRule(path) == capiocl::commitRules::ON_CLOSE &&
c_file.is_closed()) {
LOG("Capio File %s is closed and commit rule is on_close. setting it to complete and "
"starting batch handling",
LOG("Capio File %s is closed and commit rule is on_close. setting it to complete",
path.c_str());
c_file.set_complete();
handle_pending_remote_nfiles(path);
c_file.commit();
}

Expand Down
27 changes: 0 additions & 27 deletions capio/server/include/handlers/getdents.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,6 @@ inline void handle_getdents(int tid, int fd, long int count) {
if (strcmp(std::get<0>(get_file_location(path_to_check)), node_name) == 0) {
handle_getdents(tid, fd, count);
} else {
const CapioFile &c_file = storage_manager->get(path_to_check);
const auto &remote_app = client_manager->getAppName(tid);
if (!c_file.is_complete()) {
if (const off64_t batch_size =
CapioCLEngine::get().getDirectoryFileCount(path_to_check);
batch_size > 0) {
handle_remote_read_batch_request(tid, fd, count, remote_app,
path_to_check.parent_path(), batch_size,
true);
return;
}
}
request_remote_getdents(tid, fd, count);
}
});
Expand All @@ -76,21 +64,6 @@ inline void handle_getdents(int tid, int fd, long int count) {
send_dirent_to_client(tid, fd, c_file, offset, count);
} else {
LOG("File is remote");
CapioFile &c_file = storage_manager->get(path_to_check);

if (!c_file.is_complete()) {
LOG("File not complete");
const std::string &app_name_inner = client_manager->getAppName(tid);
LOG("Glob matched");
std::string prefix = path_to_check.parent_path();
off64_t batch_size = CapioCLEngine::get().getDirectoryFileCount(path_to_check);
if (batch_size > 0) {
LOG("Handling batch file");
handle_remote_read_batch_request(tid, fd, count, app_name_inner, prefix, batch_size,
true);
return;
}
}
LOG("Delegating to backend remote read");
request_remote_getdents(tid, fd, count);
}
Expand Down
25 changes: 0 additions & 25 deletions capio/server/include/handlers/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,6 @@ void wait_for_file(const std::filesystem::path &path, int tid, int fd, off64_t c
if (strcmp(std::get<0>(get_file_location(path)), node_name) == 0) {
handle_local_read(tid, fd, count, false);
} else {
const CapioFile &c_file = storage_manager->get(path);
const auto &remote_app = client_manager->getAppName(tid);
if (!c_file.is_complete()) {
std::string prefix = path.parent_path();
off64_t batch_size = CapioCLEngine::get().getDirectoryFileCount(path);
if (batch_size > 0) {
handle_remote_read_batch_request(tid, fd, count, remote_app, prefix, batch_size,
false);
return;
}
}
request_remote_read(tid, fd, count);
}
}
Expand Down Expand Up @@ -163,20 +152,6 @@ inline void handle_read(int tid, int fd, off64_t count) {
handle_local_read(tid, fd, count, is_prod);
} else {
LOG("File is remote");
CapioFile &c_file = storage_manager->get(path);
if (!c_file.is_complete()) {
LOG("File not complete");
const std::string &app_name = client_manager->getAppName(tid);

std::string prefix = path.parent_path();
off64_t batch_size = CapioCLEngine::get().getDirectoryFileCount(path);
if (batch_size > 0) {
LOG("Handling batch file");
handle_remote_read_batch_request(tid, fd, count, app_name, prefix, batch_size,
false);
return;
}
}
LOG("Delegating to backend remote read");
request_remote_read(tid, fd, count);
}
Expand Down
165 changes: 0 additions & 165 deletions capio/server/include/remote/handlers/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,41 +33,6 @@ inline void serve_remote_read(const std::filesystem::path &path, const std::stri
backend->send_file(c_file.get_buffer() + offset, nbytes, dest);
}

std::vector<std::string> *files_available(const std::string &prefix, const std::string &app_name,
const std::string &path) {
START_LOG(gettid(), "call(prefix=%s, app_name=%s, path=%s)", prefix.c_str(), app_name.c_str(),
path.c_str());

auto files_to_send = new std::vector<std::string>;
std::unordered_set<std::string> &files = files_sent[app_name];
const auto capio_file_opt = storage_manager->tryGet(path);

if (capio_file_opt) {
if (capio_file_opt->get().is_complete()) {
files_to_send->emplace_back(path);
files.insert(path);
}
} else {
return files_to_send;
}

for (auto &file_path : storage_manager->getPaths()) {
auto file_location_opt = get_file_location_opt(file_path);

if (files.find(file_path) == files.end() && file_location_opt &&
strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 &&
file_path.native().compare(0, prefix.length(), prefix) == 0) {

CapioFile &c_file = storage_manager->get(file_path);
if (c_file.is_complete() && !c_file.is_dir()) {
files_to_send->emplace_back(file_path);
files.insert(file_path);
}
}
}
return files_to_send;
}

inline void handle_read_reply(int tid, int fd, long count, off64_t file_size, off64_t nbytes,
bool complete, bool is_getdents) {
START_LOG(
Expand Down Expand Up @@ -113,100 +78,6 @@ void wait_for_data(const std::filesystem::path &path, const std::string &dest, i
serve_remote_read(path, dest, tid, fd, count, offset, c_file.is_complete(), is_getdents);
}

inline void send_files_batch(const std::string &prefix, const std::string &dest, int tid, int fd,
off64_t count, bool is_getdents,
const std::vector<std::string> *files_to_send) {
START_LOG(gettid(), "call(prefix=%s, dest=%s, tid=%d, fd=%d, count=%ld, is_getdents=%s)",
prefix.c_str(), dest.c_str(), tid, fd, count, is_getdents ? "true" : "false");

// send request
send_files_batch_request(prefix, tid, fd, count, is_getdents, dest, files_to_send);

// send data
for (const std::string &path : *files_to_send) {
LOG("Sending file %s to target %s", path.c_str(), dest.c_str());
CapioFile &c_file = storage_manager->get(path);
backend->send_file(c_file.get_buffer(), c_file.get_stored_size(), dest);
}
}

void wait_for_files_batch(const std::filesystem::path &prefix, const std::string &dest, int tid,
int fd, off64_t count, bool is_getdents,
const std::vector<std::string> *files, Semaphore *n_files_ready) {
START_LOG(gettid(), "call(prefix=%s, dest=%s, tid=%d, fd=%d, count=%ld, is_getdents=%s)",
prefix.c_str(), dest.c_str(), tid, fd, count, is_getdents ? "true" : "false");

n_files_ready->lock();
LOG("Files are available. sending batch of files");
send_files_batch(prefix, dest, tid, fd, count, is_getdents, files);

delete n_files_ready;
}

inline void handle_remote_read_batch(const std::filesystem::path &path, const std::string &dest,
int tid, int fd, off64_t count, off64_t batch_size,
const std::string &app_name,
const std::filesystem::path &prefix, bool is_getdents) {
START_LOG(
gettid(),
"call(path=%s, dest=%s, tid=%d, fd=%d, count=%ld, batch_size=%ld, app_name=%s, prefix=%s, "
"is_getdents=%s)",
path.c_str(), dest.c_str(), tid, fd, count, batch_size, app_name.c_str(), prefix.c_str(),
is_getdents ? "true" : "false");

// FIXME: this assignment always overrides the request parameter, which is never used
batch_size = CapioCLEngine::get().getDirectoryFileCount(path);
auto *files = files_available(prefix, app_name, path);
LOG("files==nullptr? %s", files == nullptr ? "true" : "false");
if (files->size() == batch_size) {
LOG("files->size() == batch_size");
send_files_batch(prefix, dest, tid, fd, count, is_getdents, files);
} else {
/*
* create a thread that waits for the completion of such
* files and then send those files
*/
LOG("files->size() != batch_size");
auto *sem = new Semaphore(0);
std::thread t(wait_for_files_batch, prefix, dest, tid, fd, count, is_getdents, files, sem);
t.detach();
LOG("Thread for batch started.");
std::lock_guard<std::mutex> lg(nfiles_mutex);
clients_remote_pending_nfiles[app_name].emplace_back(prefix, batch_size, dest, files, sem);
}
}

inline void
handle_remote_read_batch_reply(const std::string &source, int tid, int fd, off64_t count,
const std::vector<std::pair<std::filesystem::path, off64_t>> &files,
bool is_getdents) {
START_LOG(gettid(), "call(source=%s, tid=%d, fd=%d, count=%ld, is_getdents=%s)", source.c_str(),
tid, fd, count, is_getdents ? "true" : "false");

for (const auto &[path, nbytes] : files) {
auto c_file_opt = storage_manager->tryGet(path);
if (c_file_opt) {
CapioFile &c_file = c_file_opt->get();
c_file.create_buffer_if_needed(path, false);
size_t file_shm_size = c_file.get_buf_size();
if (nbytes > file_shm_size) {
c_file.expand_buffer(nbytes);
}
c_file.first_write = false;
} else {
add_file_location(path, source.c_str(), -1);
CapioFile &c_file = storage_manager->add(path, false, nbytes);
c_file.insert_sector(0, nbytes);
c_file.real_file_size = nbytes;
c_file.first_write = false;
c_file.set_complete();
}
// as was done previously, write to the capio file buffer from its beginning
c_file_opt->get().read_from_node(source, 0, nbytes);
handle_read_reply(tid, fd, count, nbytes, nbytes, true, is_getdents);
}
}

inline void handle_remote_read(const std::filesystem::path &path, const std::string &source,
int tid, int fd, off64_t count, off64_t offset, bool is_getdents) {
START_LOG(gettid(),
Expand Down Expand Up @@ -249,42 +120,6 @@ inline void handle_remote_read_reply(const std::string &source, int tid, int fd,
handle_read_reply(tid, fd, count, file_size, nbytes, complete, is_getdents);
}

void remote_read_batch_handler(const RemoteRequest &request) {
const std::string &dest = request.get_source();
int tid, fd, is_getdents;
off64_t count, batch_size;
char path[PATH_MAX], app_name[512], prefix[PATH_MAX];
sscanf(request.get_content(), "%s %d %d %ld %ld %s %s %d", path, &tid, &fd, &count, &batch_size,
app_name, prefix, &is_getdents);
handle_remote_read_batch(path, dest, tid, fd, count, batch_size, app_name, prefix, is_getdents);
}

// TODO: refactor this
void remote_read_batch_reply_handler(const RemoteRequest &request) {
std::string dest = request.get_source();
std::string path, prefix, tmp;
std::vector<std::pair<std::filesystem::path, off64_t>> files;

std::istringstream content(request.get_content());
std::getline(content, prefix, ' ');
std::getline(content, tmp, ' ');
int tid = std::stoi(tmp);
std::getline(content, tmp, ' ');
int fd = std::stoi(tmp);
std::getline(content, tmp, ' ');
off64_t count = std::stol(tmp);
std::getline(content, tmp, ' ');
bool is_getdents = std::stoi(tmp);

while (getline(content, path, ' ')) {
path = prefix.append(path);
std::getline(content, tmp, ' ');
files.emplace_back(path, std::stol(tmp));
}

handle_remote_read_batch_reply(dest, tid, fd, count, files, is_getdents);
}

void remote_read_handler(const RemoteRequest &request) {
const std::string &dest = request.get_source();
char path[PATH_MAX];
Expand Down
3 changes: 0 additions & 3 deletions capio/server/include/remote/listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ build_server_request_handlers_table() {

_server_request_handlers[CAPIO_SERVER_REQUEST_READ] = remote_read_handler;
_server_request_handlers[CAPIO_SERVER_REQUEST_READ_REPLY] = remote_read_reply_handler;
_server_request_handlers[CAPIO_SERVER_REQUEST_READ_BATCH] = remote_read_batch_handler;
_server_request_handlers[CAPIO_SERVER_REQUEST_READ_BATCH_REPLY] =
remote_read_batch_reply_handler;
_server_request_handlers[CAPIO_SERVER_REQUEST_STAT] = remote_stat_handler;
_server_request_handlers[CAPIO_SERVER_REQUEST_STAT_REPLY] = remote_stat_reply_handler;

Expand Down
Loading