From 1ef0b01de2af1921ec8c7ba33aaffbedcd633dd1 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 4 Mar 2026 11:06:05 +0000 Subject: [PATCH 1/2] Removed dead batch code --- capio/common/requests.hpp | 12 +- capio/server/capio_server.cpp | 7 - capio/server/include/handlers/close.hpp | 33 ---- capio/server/include/handlers/getdents.hpp | 27 --- capio/server/include/handlers/read.hpp | 25 --- capio/server/include/remote/handlers/read.hpp | 165 ------------------ capio/server/include/remote/listener.hpp | 3 - capio/server/include/remote/requests.hpp | 45 ----- 8 files changed, 5 insertions(+), 312 deletions(-) diff --git a/capio/common/requests.hpp b/capio/common/requests.hpp index 533a68178..2ae70083a 100644 --- a/capio/common/requests.hpp +++ b/capio/common/requests.hpp @@ -30,13 +30,11 @@ constexpr const int CAPIO_NR_REQUESTS = 24; /*REQUESTS FOR SERVER TO SERVER COMMUNICATION*/ -constexpr const int CAPIO_SERVER_REQUEST_READ = 0; -constexpr const int CAPIO_SERVER_REQUEST_READ_REPLY = 1; -constexpr const int CAPIO_SERVER_REQUEST_READ_BATCH = 2; -constexpr const int CAPIO_SERVER_REQUEST_READ_BATCH_REPLY = 3; -constexpr const int CAPIO_SERVER_REQUEST_STAT = 4; -constexpr const int CAPIO_SERVER_REQUEST_STAT_REPLY = 5; +constexpr const int CAPIO_SERVER_REQUEST_READ = 0; +constexpr const int CAPIO_SERVER_REQUEST_READ_REPLY = 1; +constexpr const int CAPIO_SERVER_REQUEST_STAT = 2; +constexpr const int CAPIO_SERVER_REQUEST_STAT_REPLY = 3; -constexpr const int CAPIO_SERVER_NR_REQUEST = 6; +constexpr const int CAPIO_SERVER_NR_REQUEST = 4; #endif // CAPIO_COMMON_REQUESTS_HPP diff --git a/capio/server/capio_server.cpp b/capio/server/capio_server.cpp index c0c29afa8..d6fcb2a01 100644 --- a/capio/server/capio_server.cpp +++ b/capio/server/capio_server.cpp @@ -42,13 +42,6 @@ int n_servers; // name of the node char *node_name; -// application name -> set of files already sent -CSFilesSentMap_t files_sent; - -CSClientsRemotePendingNFilesMap_t clients_remote_pending_nfiles; - -std::mutex nfiles_mutex; - #include "handlers.hpp" #include "utils/location.hpp" #include "utils/signals.hpp" diff --git a/capio/server/include/handlers/close.hpp b/capio/server/include/handlers/close.hpp index b3478fd5d..f50a886e7 100644 --- a/capio/server/include/handlers/close.hpp +++ b/capio/server/include/handlers/close.hpp @@ -5,38 +5,6 @@ extern StorageManager *storage_manager; -inline void handle_pending_remote_nfiles(const std::filesystem::path &path) { - START_LOG(gettid(), "call(%s)", path.c_str()); - - std::lock_guard lg(nfiles_mutex); - - for (auto &p : clients_remote_pending_nfiles) { - - auto &[app, app_pending_nfiles] = p; - LOG("Handling pending files for app: %s", app.c_str()); - - for (const auto &[prefix, batch_size, dest, files_path, sem] : app_pending_nfiles) { - LOG("Expanded iterator: prefix=%s, batch_size=%ld, dest=%s [others missing....]", - prefix.c_str(), batch_size, dest.c_str()); - auto &files = files_sent[app]; - LOG("Obtained files for app %s", app.c_str()); - auto file_location_opt = get_file_location_opt(path); - LOG("Handling files for prefix: %s. batch size is: %d", prefix.c_str(), batch_size); - if (files.find(path) == files.end() && file_location_opt && - std::get<0>(file_location_opt->get()) == std::string(node_name) && - path.native().compare(0, prefix.native().length(), prefix) == 0) { - files_path->push_back(path); - files.insert(path); - LOG("Inserted file %s in batch", path.c_str()); - if (files_path->size() == batch_size) { - LOG("Waking up thread to handle batch, as batch is full and can be served"); - sem->unlock(); - } - } - } - } -} - inline void handle_close(int tid, int fd) { START_LOG(gettid(), "call(tid=%d, fd=%d)", tid, fd); @@ -57,7 +25,6 @@ inline void handle_close(int tid, int fd) { "starting batch handling", path.c_str()); c_file.set_complete(); - handle_pending_remote_nfiles(path); c_file.commit(); } diff --git a/capio/server/include/handlers/getdents.hpp b/capio/server/include/handlers/getdents.hpp index 797943802..dee5d77f0 100644 --- a/capio/server/include/handlers/getdents.hpp +++ b/capio/server/include/handlers/getdents.hpp @@ -53,18 +53,6 @@ inline void handle_getdents(int tid, int fd, long int count) { if (strcmp(std::get<0>(get_file_location(path_to_check)), node_name) == 0) { handle_getdents(tid, fd, count); } else { - const CapioFile &c_file = storage_manager->get(path_to_check); - const auto &remote_app = client_manager->getAppName(tid); - if (!c_file.is_complete()) { - if (const off64_t batch_size = - CapioCLEngine::get().getDirectoryFileCount(path_to_check); - batch_size > 0) { - handle_remote_read_batch_request(tid, fd, count, remote_app, - path_to_check.parent_path(), batch_size, - true); - return; - } - } request_remote_getdents(tid, fd, count); } }); @@ -76,21 +64,6 @@ inline void handle_getdents(int tid, int fd, long int count) { send_dirent_to_client(tid, fd, c_file, offset, count); } else { LOG("File is remote"); - CapioFile &c_file = storage_manager->get(path_to_check); - - if (!c_file.is_complete()) { - LOG("File not complete"); - const std::string &app_name_inner = client_manager->getAppName(tid); - LOG("Glob matched"); - std::string prefix = path_to_check.parent_path(); - off64_t batch_size = CapioCLEngine::get().getDirectoryFileCount(path_to_check); - if (batch_size > 0) { - LOG("Handling batch file"); - handle_remote_read_batch_request(tid, fd, count, app_name_inner, prefix, batch_size, - true); - return; - } - } LOG("Delegating to backend remote read"); request_remote_getdents(tid, fd, count); } diff --git a/capio/server/include/handlers/read.hpp b/capio/server/include/handlers/read.hpp index e1944d200..29b9c2bc1 100644 --- a/capio/server/include/handlers/read.hpp +++ b/capio/server/include/handlers/read.hpp @@ -125,17 +125,6 @@ void wait_for_file(const std::filesystem::path &path, int tid, int fd, off64_t c if (strcmp(std::get<0>(get_file_location(path)), node_name) == 0) { handle_local_read(tid, fd, count, false); } else { - const CapioFile &c_file = storage_manager->get(path); - const auto &remote_app = client_manager->getAppName(tid); - if (!c_file.is_complete()) { - std::string prefix = path.parent_path(); - off64_t batch_size = CapioCLEngine::get().getDirectoryFileCount(path); - if (batch_size > 0) { - handle_remote_read_batch_request(tid, fd, count, remote_app, prefix, batch_size, - false); - return; - } - } request_remote_read(tid, fd, count); } } @@ -163,20 +152,6 @@ inline void handle_read(int tid, int fd, off64_t count) { handle_local_read(tid, fd, count, is_prod); } else { LOG("File is remote"); - CapioFile &c_file = storage_manager->get(path); - if (!c_file.is_complete()) { - LOG("File not complete"); - const std::string &app_name = client_manager->getAppName(tid); - - std::string prefix = path.parent_path(); - off64_t batch_size = CapioCLEngine::get().getDirectoryFileCount(path); - if (batch_size > 0) { - LOG("Handling batch file"); - handle_remote_read_batch_request(tid, fd, count, app_name, prefix, batch_size, - false); - return; - } - } LOG("Delegating to backend remote read"); request_remote_read(tid, fd, count); } diff --git a/capio/server/include/remote/handlers/read.hpp b/capio/server/include/remote/handlers/read.hpp index 9b4a55c5f..787d4f854 100644 --- a/capio/server/include/remote/handlers/read.hpp +++ b/capio/server/include/remote/handlers/read.hpp @@ -33,41 +33,6 @@ inline void serve_remote_read(const std::filesystem::path &path, const std::stri backend->send_file(c_file.get_buffer() + offset, nbytes, dest); } -std::vector *files_available(const std::string &prefix, const std::string &app_name, - const std::string &path) { - START_LOG(gettid(), "call(prefix=%s, app_name=%s, path=%s)", prefix.c_str(), app_name.c_str(), - path.c_str()); - - auto files_to_send = new std::vector; - std::unordered_set &files = files_sent[app_name]; - const auto capio_file_opt = storage_manager->tryGet(path); - - if (capio_file_opt) { - if (capio_file_opt->get().is_complete()) { - files_to_send->emplace_back(path); - files.insert(path); - } - } else { - return files_to_send; - } - - for (auto &file_path : storage_manager->getPaths()) { - auto file_location_opt = get_file_location_opt(file_path); - - if (files.find(file_path) == files.end() && file_location_opt && - strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 && - file_path.native().compare(0, prefix.length(), prefix) == 0) { - - CapioFile &c_file = storage_manager->get(file_path); - if (c_file.is_complete() && !c_file.is_dir()) { - files_to_send->emplace_back(file_path); - files.insert(file_path); - } - } - } - return files_to_send; -} - inline void handle_read_reply(int tid, int fd, long count, off64_t file_size, off64_t nbytes, bool complete, bool is_getdents) { START_LOG( @@ -113,100 +78,6 @@ void wait_for_data(const std::filesystem::path &path, const std::string &dest, i serve_remote_read(path, dest, tid, fd, count, offset, c_file.is_complete(), is_getdents); } -inline void send_files_batch(const std::string &prefix, const std::string &dest, int tid, int fd, - off64_t count, bool is_getdents, - const std::vector *files_to_send) { - START_LOG(gettid(), "call(prefix=%s, dest=%s, tid=%d, fd=%d, count=%ld, is_getdents=%s)", - prefix.c_str(), dest.c_str(), tid, fd, count, is_getdents ? "true" : "false"); - - // send request - send_files_batch_request(prefix, tid, fd, count, is_getdents, dest, files_to_send); - - // send data - for (const std::string &path : *files_to_send) { - LOG("Sending file %s to target %s", path.c_str(), dest.c_str()); - CapioFile &c_file = storage_manager->get(path); - backend->send_file(c_file.get_buffer(), c_file.get_stored_size(), dest); - } -} - -void wait_for_files_batch(const std::filesystem::path &prefix, const std::string &dest, int tid, - int fd, off64_t count, bool is_getdents, - const std::vector *files, Semaphore *n_files_ready) { - START_LOG(gettid(), "call(prefix=%s, dest=%s, tid=%d, fd=%d, count=%ld, is_getdents=%s)", - prefix.c_str(), dest.c_str(), tid, fd, count, is_getdents ? "true" : "false"); - - n_files_ready->lock(); - LOG("Files are available. sending batch of files"); - send_files_batch(prefix, dest, tid, fd, count, is_getdents, files); - - delete n_files_ready; -} - -inline void handle_remote_read_batch(const std::filesystem::path &path, const std::string &dest, - int tid, int fd, off64_t count, off64_t batch_size, - const std::string &app_name, - const std::filesystem::path &prefix, bool is_getdents) { - START_LOG( - gettid(), - "call(path=%s, dest=%s, tid=%d, fd=%d, count=%ld, batch_size=%ld, app_name=%s, prefix=%s, " - "is_getdents=%s)", - path.c_str(), dest.c_str(), tid, fd, count, batch_size, app_name.c_str(), prefix.c_str(), - is_getdents ? "true" : "false"); - - // FIXME: this assignment always overrides the request parameter, which is never used - batch_size = CapioCLEngine::get().getDirectoryFileCount(path); - auto *files = files_available(prefix, app_name, path); - LOG("files==nullptr? %s", files == nullptr ? "true" : "false"); - if (files->size() == batch_size) { - LOG("files->size() == batch_size"); - send_files_batch(prefix, dest, tid, fd, count, is_getdents, files); - } else { - /* - * create a thread that waits for the completion of such - * files and then send those files - */ - LOG("files->size() != batch_size"); - auto *sem = new Semaphore(0); - std::thread t(wait_for_files_batch, prefix, dest, tid, fd, count, is_getdents, files, sem); - t.detach(); - LOG("Thread for batch started."); - std::lock_guard lg(nfiles_mutex); - clients_remote_pending_nfiles[app_name].emplace_back(prefix, batch_size, dest, files, sem); - } -} - -inline void -handle_remote_read_batch_reply(const std::string &source, int tid, int fd, off64_t count, - const std::vector> &files, - bool is_getdents) { - START_LOG(gettid(), "call(source=%s, tid=%d, fd=%d, count=%ld, is_getdents=%s)", source.c_str(), - tid, fd, count, is_getdents ? "true" : "false"); - - for (const auto &[path, nbytes] : files) { - auto c_file_opt = storage_manager->tryGet(path); - if (c_file_opt) { - CapioFile &c_file = c_file_opt->get(); - c_file.create_buffer_if_needed(path, false); - size_t file_shm_size = c_file.get_buf_size(); - if (nbytes > file_shm_size) { - c_file.expand_buffer(nbytes); - } - c_file.first_write = false; - } else { - add_file_location(path, source.c_str(), -1); - CapioFile &c_file = storage_manager->add(path, false, nbytes); - c_file.insert_sector(0, nbytes); - c_file.real_file_size = nbytes; - c_file.first_write = false; - c_file.set_complete(); - } - // as was done previously, write to the capio file buffer from its beginning - c_file_opt->get().read_from_node(source, 0, nbytes); - handle_read_reply(tid, fd, count, nbytes, nbytes, true, is_getdents); - } -} - inline void handle_remote_read(const std::filesystem::path &path, const std::string &source, int tid, int fd, off64_t count, off64_t offset, bool is_getdents) { START_LOG(gettid(), @@ -249,42 +120,6 @@ inline void handle_remote_read_reply(const std::string &source, int tid, int fd, handle_read_reply(tid, fd, count, file_size, nbytes, complete, is_getdents); } -void remote_read_batch_handler(const RemoteRequest &request) { - const std::string &dest = request.get_source(); - int tid, fd, is_getdents; - off64_t count, batch_size; - char path[PATH_MAX], app_name[512], prefix[PATH_MAX]; - sscanf(request.get_content(), "%s %d %d %ld %ld %s %s %d", path, &tid, &fd, &count, &batch_size, - app_name, prefix, &is_getdents); - handle_remote_read_batch(path, dest, tid, fd, count, batch_size, app_name, prefix, is_getdents); -} - -// TODO: refactor this -void remote_read_batch_reply_handler(const RemoteRequest &request) { - std::string dest = request.get_source(); - std::string path, prefix, tmp; - std::vector> files; - - std::istringstream content(request.get_content()); - std::getline(content, prefix, ' '); - std::getline(content, tmp, ' '); - int tid = std::stoi(tmp); - std::getline(content, tmp, ' '); - int fd = std::stoi(tmp); - std::getline(content, tmp, ' '); - off64_t count = std::stol(tmp); - std::getline(content, tmp, ' '); - bool is_getdents = std::stoi(tmp); - - while (getline(content, path, ' ')) { - path = prefix.append(path); - std::getline(content, tmp, ' '); - files.emplace_back(path, std::stol(tmp)); - } - - handle_remote_read_batch_reply(dest, tid, fd, count, files, is_getdents); -} - void remote_read_handler(const RemoteRequest &request) { const std::string &dest = request.get_source(); char path[PATH_MAX]; diff --git a/capio/server/include/remote/listener.hpp b/capio/server/include/remote/listener.hpp index 90c908edd..61f89efbf 100644 --- a/capio/server/include/remote/listener.hpp +++ b/capio/server/include/remote/listener.hpp @@ -16,9 +16,6 @@ build_server_request_handlers_table() { _server_request_handlers[CAPIO_SERVER_REQUEST_READ] = remote_read_handler; _server_request_handlers[CAPIO_SERVER_REQUEST_READ_REPLY] = remote_read_reply_handler; - _server_request_handlers[CAPIO_SERVER_REQUEST_READ_BATCH] = remote_read_batch_handler; - _server_request_handlers[CAPIO_SERVER_REQUEST_READ_BATCH_REPLY] = - remote_read_batch_reply_handler; _server_request_handlers[CAPIO_SERVER_REQUEST_STAT] = remote_stat_handler; _server_request_handlers[CAPIO_SERVER_REQUEST_STAT_REPLY] = remote_stat_reply_handler; diff --git a/capio/server/include/remote/requests.hpp b/capio/server/include/remote/requests.hpp index b7175cbd7..b7515fa25 100644 --- a/capio/server/include/remote/requests.hpp +++ b/capio/server/include/remote/requests.hpp @@ -34,28 +34,6 @@ inline void serve_remote_read_request(int tid, int fd, int count, long int nbyte backend->send_request(message.get(), size + 1, dest); } -inline void send_files_batch_request(const std::string &prefix, int tid, int fd, int count, - bool is_getdents, const std::string &dest, - const std::vector *files_to_send) { - START_LOG(gettid(), "call()"); - const char *const format = "%04d %s %d %d %d %d"; - const int size = snprintf(nullptr, 0, format, CAPIO_SERVER_REQUEST_READ_BATCH_REPLY, - prefix.c_str(), tid, fd, count, is_getdents); - const std::unique_ptr header(new char[size + 1]); - sprintf(header.get(), format, CAPIO_SERVER_REQUEST_READ_BATCH_REPLY, prefix.c_str(), tid, fd, - count, is_getdents); - std::string message(header.get()); - for (const std::string &path : *files_to_send) { - CapioFile &c_file = storage_manager->get(path); - message.append(" " + path.substr(prefix.length()) + " " + - std::to_string(c_file.get_stored_size())); - } - LOG("Message = %s", message.c_str()); - - // send request - backend->send_request(message.c_str(), message.length(), dest); -} - inline void handle_remote_stat_request(int tid, const std::filesystem::path &path) { START_LOG(gettid(), "call(tid=%d, path=%s)", tid, path.c_str()); @@ -71,29 +49,6 @@ inline void handle_remote_stat_request(int tid, const std::filesystem::path &pat LOG("message sent"); } -inline void handle_remote_read_batch_request(int tid, int fd, off64_t count, - const std::string &app_name, const std::string &prefix, - off64_t batch_size, bool is_getdents) { - START_LOG(gettid(), - "call(tid=%d, fd=%d, count=%ld, app_name=%s, prefix=%s, " - "batch_size=%ld, is_getdents=%s)", - tid, fd, count, app_name.c_str(), prefix.c_str(), batch_size, - is_getdents ? "true" : "false"); - - const std::filesystem::path &path = storage_manager->getPath(tid, fd); - std::string dest = std::get<0>(get_file_location(path)); - - const char *const format = "%04d %s %d %d %ld %ld %s %s %d"; - const int size = - snprintf(nullptr, 0, format, CAPIO_SERVER_REQUEST_READ_BATCH, path.c_str(), tid, fd, count, - batch_size, app_name.c_str(), prefix.c_str(), is_getdents); - const std::unique_ptr message(new char[size + 1]); - sprintf(message.get(), format, CAPIO_SERVER_REQUEST_READ_BATCH, path.c_str(), tid, fd, count, - batch_size, app_name.c_str(), prefix.c_str(), is_getdents); - LOG("Message = %s", message.get()); - backend->send_request(message.get(), size + 1, dest); -} - inline void handle_remote_read_request(int tid, int fd, off64_t count, bool is_getdents) { START_LOG(gettid(), "call(tid=%d, fd=%d, count=%ld, is_getdents=%s)", tid, fd, count, is_getdents ? "true" : "false"); From 6ee9dc1e5facec19584296da3e6c05c00dbf2447 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria <39337626+marcoSanti@users.noreply.github.com> Date: Thu, 5 Mar 2026 08:57:16 +0000 Subject: [PATCH 2/2] Update capio/server/include/handlers/close.hpp Co-authored-by: Iacopo Colonnelli --- capio/server/include/handlers/close.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/capio/server/include/handlers/close.hpp b/capio/server/include/handlers/close.hpp index f50a886e7..e9be35c6c 100644 --- a/capio/server/include/handlers/close.hpp +++ b/capio/server/include/handlers/close.hpp @@ -21,8 +21,7 @@ inline void handle_close(int tid, int fd) { if (CapioCLEngine::get().getCommitRule(path) == capiocl::commitRules::ON_CLOSE && c_file.is_closed()) { - LOG("Capio File %s is closed and commit rule is on_close. setting it to complete and " - "starting batch handling", + LOG("Capio File %s is closed and commit rule is on_close. setting it to complete", path.c_str()); c_file.set_complete(); c_file.commit();