diff --git a/src/common/capio/filesystem.hpp b/src/common/capio/filesystem.hpp index bd295df86..eafdb1018 100644 --- a/src/common/capio/filesystem.hpp +++ b/src/common/capio/filesystem.hpp @@ -106,7 +106,7 @@ inline bool is_capio_path(const std::filesystem::path &path_to_check) { START_LOG(gettid(), "call(capio_path=%s)", capio_path.c_str()); auto computed = replaceSymbol(capio_path, '.', "\\."); computed = replaceSymbol(computed, '/', "\\/"); - computed = replaceSymbol(computed, '*', R"([a-zA-Z0-9\/\.\-_]*)"); + computed = replaceSymbol(computed, '*', R"([a-zA-Z0-9\/\.\-_:]*)"); computed = replaceSymbol(computed, '+', "."); LOG("Computed regex: %s", computed.c_str()); return std::regex(computed); diff --git a/src/common/capio/requests.hpp b/src/common/capio/requests.hpp index 5e4b68863..627cdff92 100644 --- a/src/common/capio/requests.hpp +++ b/src/common/capio/requests.hpp @@ -1,20 +1,21 @@ #ifndef CAPIO_COMMON_REQUESTS_HPP #define CAPIO_COMMON_REQUESTS_HPP -constexpr const int CAPIO_REQUEST_CONSENT = 0; -constexpr const int CAPIO_REQUEST_CLOSE = 1; -constexpr const int CAPIO_REQUEST_CREATE = 2; -constexpr const int CAPIO_REQUEST_EXIT_GROUP = 3; -constexpr const int CAPIO_REQUEST_HANDSHAKE = 4; -constexpr const int CAPIO_REQUEST_MKDIR = 5; -constexpr const int CAPIO_REQUEST_OPEN = 6; -constexpr const int CAPIO_REQUEST_READ = 7; -constexpr const int CAPIO_REQUEST_READ_MEM = 8; -constexpr const int CAPIO_REQUEST_RENAME = 9; -constexpr const int CAPIO_REQUEST_WRITE = 10; -constexpr const int CAPIO_REQUEST_WRITE_MEM = 11; -constexpr const int CAPIO_REQUEST_QUERY_MEM_FILE = 12; +constexpr const int CAPIO_REQUEST_CONSENT = 0; +constexpr const int CAPIO_REQUEST_CLOSE = 1; +constexpr const int CAPIO_REQUEST_CREATE = 2; +constexpr const int CAPIO_REQUEST_EXIT_GROUP = 3; +constexpr const int CAPIO_REQUEST_HANDSHAKE = 4; +constexpr const int CAPIO_REQUEST_MKDIR = 5; +constexpr const int CAPIO_REQUEST_OPEN = 6; +constexpr const int CAPIO_REQUEST_READ = 7; +constexpr const int CAPIO_REQUEST_READ_MEM = 8; +constexpr const int CAPIO_REQUEST_RENAME = 9; +constexpr const int CAPIO_REQUEST_WRITE = 10; +constexpr const int CAPIO_REQUEST_WRITE_MEM = 11; +constexpr const int CAPIO_REQUEST_QUERY_MEM_FILE = 12; +constexpr const int CAPIO_REQUEST_POSIX_DIR_COMMITTED = 13; -constexpr const int CAPIO_NR_REQUESTS = 13; +constexpr const int CAPIO_NR_REQUESTS = 14; #endif // CAPIO_COMMON_REQUESTS_HPP diff --git a/src/posix/handlers.hpp b/src/posix/handlers.hpp index 653620fa0..6cbd7172d 100644 --- a/src/posix/handlers.hpp +++ b/src/posix/handlers.hpp @@ -1,6 +1,10 @@ #ifndef CAPIO_POSIX_HANDLERS_HPP #define CAPIO_POSIX_HANDLERS_HPP +/********************/ +// SYSCALL HANDLERS // +/********************/ + #include "handlers/access.hpp" #include "handlers/chdir.hpp" #include "handlers/close.hpp" @@ -26,4 +30,10 @@ #include "handlers/unlink.hpp" #include "handlers/write.hpp" +/********************/ +// POSIX HANDLERS // +/********************/ + +#include "handlers/posix_readdir.hpp" + #endif // CAPIO_POSIX_HANDLERS_HPP diff --git a/src/posix/handlers/open.hpp b/src/posix/handlers/open.hpp index 65340469f..f8424a25d 100644 --- a/src/posix/handlers/open.hpp +++ b/src/posix/handlers/open.hpp @@ -103,7 +103,8 @@ int openat_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long a int flags = static_cast(arg2); mode_t mode = static_cast(arg3); auto tid = static_cast(syscall_no_intercept(SYS_gettid)); - START_LOG(tid, "call(path=%s, flags=%d, mode=%d)", pathname.data(), flags, mode); + START_LOG(tid, "call(dirfd=%ld, path=%s, flags=%d, mode=%d)", dirfd, pathname.data(), flags, + mode); std::string path = compute_abs_path(pathname.data(), dirfd); diff --git a/src/posix/handlers/posix_readdir.hpp b/src/posix/handlers/posix_readdir.hpp new file mode 100644 index 000000000..2f9c8d8d6 --- /dev/null +++ b/src/posix/handlers/posix_readdir.hpp @@ -0,0 +1,297 @@ +#ifndef POSIX_READDIR_HPP +#define POSIX_READDIR_HPP + +#include +#include +#include +#include +#include +#include +#include + +// Map &DIR -> +inline std::unordered_map> opened_directory; + +inline std::unordered_map *> *directory_items; + +inline std::unordered_map directory_commit_token_path; + +inline int count_files_in_directory(const char *path) { + static struct dirent64 *(*real_readdir64)(DIR *) = NULL; + static DIR *(*real_opendir)(const char *) = NULL; + static int (*real_closedir)(DIR *) = NULL; + + START_LOG(capio_syscall(SYS_gettid), "call(path=%s)", path); + syscall_no_intercept_flag = true; + if (!real_readdir64) { + real_readdir64 = (struct dirent64 * (*) (DIR *) ) dlsym(RTLD_NEXT, "readdir64"); + } + + if (!real_opendir) { + real_opendir = (DIR * (*) (const char *) ) dlsym(RTLD_NEXT, "opendir"); + } + + if (!real_closedir) { + real_closedir = (int (*)(DIR *)) dlsym(RTLD_NEXT, "closedir"); + } + + struct dirent64 *entry; + DIR *dir = real_opendir(path); + int count = 0; + + while ((entry = real_readdir64(dir)) != NULL) { + + std::filesystem::path dir_abs_path(entry->d_name); + + if (!(strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)) { + LOG("Entry name is %s. computing absolute path", entry->d_name); + dir_abs_path = std::filesystem::path(path) / entry->d_name; + LOG("Directory abs path = %s", dir_abs_path.c_str()); + } + + if (auto directory_object = directory_items->find(dir_abs_path.c_str()); + directory_object == directory_items->end()) { + LOG("Directory vector not present. Adding it at path %s", path); + directory_items->emplace(path, new std::vector()); + } + + auto directory_object = directory_items->at(path); + + auto itm = std::find_if(directory_object->begin(), directory_object->end(), + [&](const dirent64 *_scope_entry) { + return std::string(entry->d_name) == _scope_entry->d_name; + }); + + if (itm == directory_object->end()) { + LOG("Item %s is not stored within internal capio data structure. adding it", + dir_abs_path.c_str()); + auto *new_entry = new dirent64(); + memcpy(new_entry->d_name, entry->d_name, sizeof(entry->d_name)); + new_entry->d_ino = entry->d_ino; + new_entry->d_off = entry->d_off; + new_entry->d_reclen = entry->d_reclen; + new_entry->d_type = entry->d_type; + directory_object->emplace_back(new_entry); + } + count++; + } + + LOG("Found %ld items.", count); + + real_closedir(dir); + syscall_no_intercept_flag = false; + return count; +} + +DIR *opendir(const char *name) { + + auto tmp = std::string(name); + + START_LOG(capio_syscall(SYS_gettid), "call(path=%s)", tmp.c_str()); + + auto absolute_path = capio_absolute(name); + + LOG("Resolved absolute path = %s", absolute_path.c_str()); + + static DIR *(*real_opendir)(const char *) = NULL; + + if (!real_opendir) { + syscall_no_intercept_flag = true; + real_opendir = (DIR * (*) (const char *) ) dlsym(RTLD_NEXT, "opendir"); + syscall_no_intercept_flag = false; + if (!real_opendir) { + ERR_EXIT("Failed to find original opendir: %s\n", dlerror()); + } + } + + if (directory_items == nullptr) { + directory_items = new std::unordered_map *>(); + } + + if (!is_capio_path(absolute_path)) { + LOG("Not a CAPIO path. continuing execution"); + syscall_no_intercept_flag = true; + auto dir = real_opendir(absolute_path.c_str()); + syscall_no_intercept_flag = false; + + return dir; + } + + LOG("Performing consent request to open directory %s", absolute_path.c_str()); + consent_request_cache_fs->consent_request(absolute_path.c_str(), gettid(), __FUNCTION__); + + syscall_no_intercept_flag = true; + auto dir = real_opendir(absolute_path.c_str()); + syscall_no_intercept_flag = false; + + LOG("Opened directory with offset %ld", dir); + opened_directory.insert( + {reinterpret_cast(dir), {std::string(absolute_path), 0}}); + directory_items->emplace(std::string(absolute_path), new std::vector()); + + auto fd = dirfd(dir); + LOG("File descriptor for directory %s is %d", absolute_path.c_str(), fd); + + add_capio_fd(capio_syscall(SYS_gettid), absolute_path.c_str(), fd, 0, 0); + + return dir; +} + +int closedir(DIR *dirp) { + START_LOG(capio_syscall(SYS_gettid), "call(dir=%ld)", dirp); + + static int (*real_closedir)(DIR *) = NULL; + if (!real_closedir) { + syscall_no_intercept_flag = true; + real_closedir = (int (*)(DIR *)) dlsym(RTLD_NEXT, "closedir"); + syscall_no_intercept_flag = false; + if (!real_closedir) { + ERR_EXIT("Failed to find original closedir: %s\n", dlerror()); + } + } + + if (const auto pos = opened_directory.find(reinterpret_cast(dirp)); + pos != opened_directory.end()) { + + syscall_no_intercept_flag = true; + delete_capio_fd(dirfd(dirp)); + syscall_no_intercept_flag = false; + + if (auto pos1 = directory_items->find(pos->second.first); pos1 != directory_items->end()) { + directory_items->erase(pos1); + } + opened_directory.erase(pos); + LOG("removed dir from map of opened files"); + } + + syscall_no_intercept_flag = true; + auto return_code = real_closedir(dirp); + syscall_no_intercept_flag = false; + LOG("Return code of closedir = %d", return_code); + + return return_code; +} + +inline struct dirent64 *capio_internal_readdir(DIR *dirp, long pid) { + START_LOG(pid, "call(dirp=%ld)", dirp); + + const auto directory_path = + std::get<0>(opened_directory.at(reinterpret_cast(dirp))); + + if (directory_commit_token_path.find(directory_path) == directory_commit_token_path.end()) { + char token_path[PATH_MAX]{0}; + posix_directory_committed_request(pid, directory_path.c_str(), token_path); + LOG("Inserting token path %s", token_path); + directory_commit_token_path.insert({directory_path, token_path}); + } + + const auto token_path = directory_commit_token_path.at(directory_path); + + if (const auto item = opened_directory.find(reinterpret_cast(dirp)); + item != opened_directory.end() || std::filesystem::exists(token_path)) { + LOG("Found dirp."); + const auto dir_path_name = std::get<0>(item->second); + const auto capio_internal_offset = std::get<1>(item->second); + + auto files_in_directory = count_files_in_directory(dir_path_name.c_str()); + LOG("There are %ld files inside %s", files_in_directory, dir_path_name.c_str()); + while (files_in_directory <= capio_internal_offset) { + LOG("Not enough files: expected %ld, got %ld... waiting", files_in_directory, + capio_internal_offset); + LOG("Checking for commit token existence (%s)", token_path.c_str()); + syscall_no_intercept_flag = true; + bool is_committed = std::filesystem::exists(token_path); + syscall_no_intercept_flag = false; + LOG("File %s committed", is_committed ? "is" : "is not"); + if (is_committed) { + LOG("Returning NULL as result"); + errno = 0; + return NULL; + } + + struct timespec req{0}; + req.tv_sec = 0; + req.tv_nsec = 100 * 1000000L; // 100 ms + syscall_no_intercept(SYS_nanosleep, &req, NULL); + files_in_directory = count_files_in_directory(dir_path_name.c_str()); + LOG("There are %ld files inside %s", files_in_directory, dir_path_name.c_str()); + } + + LOG("Returning item %d", std::get<1>(item->second)); + + char real_path[PATH_MAX]; + capio_realpath(dir_path_name.c_str(), real_path); + + LOG("Getting files inside directory %s", real_path); + + const auto return_value = directory_items->at(real_path)->at(std::get<1>(item->second)); + std::get<1>(item->second)++; + + LOG("Returned dirent structure:"); + LOG("dirent.d_name = %s", return_value->d_name); + LOG("dirent.d_type = %d", return_value->d_type); + LOG("dirent.d_ino = %d", return_value->d_ino); + LOG("dirent.d_off = %d", return_value->d_off); + LOG("dirent.d_reclen = %d", return_value->d_reclen); + return return_value; + } + LOG("Reached end of branch... something might be amiss.. returning EOS"); + errno = 0; + return NULL; +} + +struct dirent *readdir(DIR *dirp) { + long pid = capio_syscall(SYS_gettid); + START_LOG(pid, "call(dir=%ld)", dirp); + + static struct dirent *(*real_readdir)(DIR *) = NULL; + if (!real_readdir) { + LOG("Loading real glibc method"); + syscall_no_intercept_flag = true; + real_readdir = (struct dirent * (*) (DIR *) ) dlsym(RTLD_NEXT, "readdir"); + syscall_no_intercept_flag = false; + } + + if (opened_directory.find(reinterpret_cast(dirp)) == + opened_directory.end()) { + LOG("Directory is not handled by CAPIO. Returning false"); + syscall_no_intercept_flag = true; + auto result = real_readdir(dirp); + syscall_no_intercept_flag = false; + + return result; + } + + struct dirent64 *capio_internal_dirent64 = capio_internal_readdir(dirp, pid); + LOG("return value == NULL ? %s", capio_internal_dirent64 == NULL ? "TRUE" : "FALSE"); + return reinterpret_cast(capio_internal_dirent64); +} + +struct dirent64 *readdir64(DIR *dirp) { + long pid = capio_syscall(SYS_gettid); + START_LOG(pid, "call(dir=%ld)", dirp); + + static struct dirent64 *(*real_readdir64)(DIR *) = NULL; + if (!real_readdir64) { + LOG("Loading real glibc method"); + syscall_no_intercept_flag = true; + real_readdir64 = (struct dirent64 * (*) (DIR *) ) dlsym(RTLD_NEXT, "readdir64"); + syscall_no_intercept_flag = false; + } + + if (opened_directory.find(reinterpret_cast(dirp)) == + opened_directory.end()) { + LOG("Directory is not handled by CAPIO. Returning false"); + syscall_no_intercept_flag = true; + auto result = real_readdir64(dirp); + syscall_no_intercept_flag = false; + + return result; + } + + auto capio_internal_dirent64 = capio_internal_readdir(dirp, pid); + LOG("return value == NULL ? %s", capio_internal_dirent64 == NULL ? "TRUE" : "FALSE"); + return capio_internal_dirent64; +} + +#endif // POSIX_READDIR_HPP diff --git a/src/posix/utils/requests.hpp b/src/posix/utils/requests.hpp index e32858515..661642cad 100644 --- a/src/posix/utils/requests.hpp +++ b/src/posix/utils/requests.hpp @@ -26,7 +26,6 @@ inline thread_local SPSCQueue *stc_queue; * @return */ inline void init_client() { - buf_requests = new CircularBuffer(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE); bufs_response = new std::unordered_map(); @@ -75,6 +74,23 @@ inline std::vector *file_in_memory_request(const long pid) { return regex_vector; } +inline capio_off64_t posix_directory_committed_request(const long pid, + const std::filesystem::path &path, + char *token_path) { + START_LOG(capio_syscall(SYS_gettid), "call(path=%s)", path.c_str()); + char req[CAPIO_REQ_MAX_SIZE]; + + sprintf(req, "%04d %ld %s ", CAPIO_REQUEST_POSIX_DIR_COMMITTED, pid, path.c_str()); + buf_requests->write(req, CAPIO_REQ_MAX_SIZE); + LOG("Sent query for directory committement"); + capio_off64_t path_len = bufs_response->at(pid)->read(); + LOG("Directory %s has the token length of %llu", path.c_str(), path_len); + + stc_queue->read(token_path, path_len); + LOG("commit token path will exist at %s", token_path); + return path_len; +} + // non blocking inline void close_request(const std::filesystem::path &path, const long tid) { START_LOG(capio_syscall(SYS_gettid), "call(path=%s, tid=%ld)", path.c_str(), tid); @@ -125,4 +141,4 @@ inline void rename_request(const std::filesystem::path &old_path, #include "utils/storage.hpp" -#endif // CAPIO_POSIX_UTILS_REQUESTS_HPP \ No newline at end of file +#endif // CAPIO_POSIX_UTILS_REQUESTS_HPP diff --git a/src/server/capio-cl-engine/capio_cl_engine.hpp b/src/server/capio-cl-engine/capio_cl_engine.hpp index bbd8ac3cc..89214524c 100644 --- a/src/server/capio-cl-engine/capio_cl_engine.hpp +++ b/src/server/capio-cl-engine/capio_cl_engine.hpp @@ -141,6 +141,7 @@ class CapioCLEngine { * @return */ bool contains(const std::filesystem::path &file) { + START_LOG(gettid(), "call(file=%s)", file.c_str()); return std::any_of(_locations.begin(), _locations.end(), [&](auto &itm) { return std::regex_match(file.c_str(), std::get<10>(itm.second)); }); @@ -377,6 +378,12 @@ class CapioCLEngine { void setFileDeps(const std::filesystem::path &path, const std::vector &dependencies) { START_LOG(gettid(), "call()"); + if (dependencies.empty()) { + return; + } + if (_locations.find(path) == _locations.end()) { + this->newFile(path); + } std::get<9>(_locations.at(path)) = dependencies; for (const auto &itm : dependencies) { LOG("Creating new fie (if it exists) for path %s", itm.c_str()); @@ -431,6 +438,13 @@ class CapioCLEngine { return files; } + + std::vector getPathsInConfig() { + std::vector paths; + std::transform(_locations.begin(), _locations.end(), std::back_inserter(paths), + [](auto pair) { return pair.first; }); + return paths; + } }; inline CapioCLEngine *capio_cl_engine; diff --git a/src/server/client-manager/handlers/posix_readdir.hpp b/src/server/client-manager/handlers/posix_readdir.hpp new file mode 100644 index 000000000..3e9377d0d --- /dev/null +++ b/src/server/client-manager/handlers/posix_readdir.hpp @@ -0,0 +1,18 @@ + +#ifndef POSIX_READDIR_HPP +#define POSIX_READDIR_HPP + +inline void posix_readdir_handler(const char *const str) { + pid_t pid; + char path[PATH_MAX]; + sscanf(str, "%d %s", &pid, path); + START_LOG(gettid(), "call(pid=%d, path=%s", pid, path); + + auto metadata_token = file_manager->getMetadataPath(path); + LOG("sending to pid %ld token path of %s", pid, metadata_token.c_str()); + + client_manager->reply_to_client(pid, metadata_token.length()); + storage_service->reply_to_client_raw(pid, metadata_token.c_str(), metadata_token.length()); +} + +#endif // POSIX_READDIR_HPP diff --git a/src/server/client-manager/request_handler_engine.hpp b/src/server/client-manager/request_handler_engine.hpp index fc96a0857..db1c340af 100644 --- a/src/server/client-manager/request_handler_engine.hpp +++ b/src/server/client-manager/request_handler_engine.hpp @@ -8,7 +8,7 @@ #include "file-manager/file_manager.hpp" /* - * REQUESTS handlers + * SYSCALL REQUESTS handlers */ #include "handlers/close.hpp" #include "handlers/consent.hpp" @@ -21,6 +21,11 @@ #include "handlers/rename.hpp" #include "handlers/write.hpp" +/* + * POSIX GLIBC REQUESTS handlers + */ +#include "handlers/posix_readdir.hpp" + /** * @brief Class that handles the system calls received from the posix client application * @@ -32,19 +37,20 @@ class RequestHandlerEngine { static constexpr std::array build_request_handlers_table() { std::array _request_handlers{0}; - _request_handlers[CAPIO_REQUEST_CONSENT] = consent_to_proceed_handler; - _request_handlers[CAPIO_REQUEST_CLOSE] = close_handler; - _request_handlers[CAPIO_REQUEST_CREATE] = create_handler; - _request_handlers[CAPIO_REQUEST_EXIT_GROUP] = exit_handler; - _request_handlers[CAPIO_REQUEST_HANDSHAKE] = handshake_handler; - _request_handlers[CAPIO_REQUEST_MKDIR] = create_handler; - _request_handlers[CAPIO_REQUEST_OPEN] = open_handler; - _request_handlers[CAPIO_REQUEST_READ] = read_handler; - _request_handlers[CAPIO_REQUEST_RENAME] = rename_handler; - _request_handlers[CAPIO_REQUEST_WRITE] = write_handler; - _request_handlers[CAPIO_REQUEST_QUERY_MEM_FILE] = files_to_store_in_memory_handler; - _request_handlers[CAPIO_REQUEST_READ_MEM] = read_mem_handler; - _request_handlers[CAPIO_REQUEST_WRITE_MEM] = write_mem_handler; + _request_handlers[CAPIO_REQUEST_CONSENT] = consent_to_proceed_handler; + _request_handlers[CAPIO_REQUEST_CLOSE] = close_handler; + _request_handlers[CAPIO_REQUEST_CREATE] = create_handler; + _request_handlers[CAPIO_REQUEST_EXIT_GROUP] = exit_handler; + _request_handlers[CAPIO_REQUEST_HANDSHAKE] = handshake_handler; + _request_handlers[CAPIO_REQUEST_MKDIR] = create_handler; + _request_handlers[CAPIO_REQUEST_OPEN] = open_handler; + _request_handlers[CAPIO_REQUEST_READ] = read_handler; + _request_handlers[CAPIO_REQUEST_RENAME] = rename_handler; + _request_handlers[CAPIO_REQUEST_WRITE] = write_handler; + _request_handlers[CAPIO_REQUEST_QUERY_MEM_FILE] = files_to_store_in_memory_handler; + _request_handlers[CAPIO_REQUEST_READ_MEM] = read_mem_handler; + _request_handlers[CAPIO_REQUEST_WRITE_MEM] = write_mem_handler; + _request_handlers[CAPIO_REQUEST_POSIX_DIR_COMMITTED] = posix_readdir_handler; return _request_handlers; } diff --git a/src/server/file-manager/file_manager.hpp b/src/server/file-manager/file_manager.hpp index 05664a0db..1fab3e965 100644 --- a/src/server/file-manager/file_manager.hpp +++ b/src/server/file-manager/file_manager.hpp @@ -31,6 +31,7 @@ class CapioFileManager { ~CapioFileManager() { START_LOG(gettid(), "call()"); } static uintmax_t get_file_size_if_exists(const std::filesystem::path &path); + static std::string getMetadataPath(const std::string &path); static void increaseCloseCount(const std::filesystem::path &path); static bool isCommitted(const std::filesystem::path &path); static void setCommitted(const std::filesystem::path &path); @@ -39,6 +40,7 @@ class CapioFileManager { void addThreadAwaitingCreation(const std::string &path, pid_t tid); void checkFilesAwaitingCreation(); void checkFileAwaitingData(); + void checkDirectoriesNFiles() const; }; inline CapioFileManager *file_manager; diff --git a/src/server/file-manager/file_manager_impl.hpp b/src/server/file-manager/file_manager_impl.hpp index 775b36ca3..deb99110c 100644 --- a/src/server/file-manager/file_manager_impl.hpp +++ b/src/server/file-manager/file_manager_impl.hpp @@ -6,10 +6,14 @@ #include "storage-service/capio_storage_service.hpp" #include "utils/distributed_semaphore.hpp" +inline std::string CapioFileManager::getMetadataPath(const std::string &path) { + return get_capio_metadata_path() / (path.substr(path.find(get_capio_dir()) + 1) + ".capio"); +} + /** * @brief Creates the directory structure for the metadata file and proceed to return the path * pointing to the metadata token file. For improvements in performances, a hash map is included to - * cache the computed paths. For thread safety conserns, see + * cache the computed paths. For thread safety concerns, see * https://en.cppreference.com/w/cpp/container#Thread_safety * * @param path real path of the file @@ -19,8 +23,8 @@ inline std::string CapioFileManager::getAndCreateMetadataPath(const std::string START_LOG(gettid(), "call(path=%s)", path.c_str()); static std::unordered_map metadata_paths; if (metadata_paths.find(path) == metadata_paths.end()) { - std::filesystem::path result = - get_capio_metadata_path() / (path.substr(path.find(get_capio_dir()) + 1) + ".capio"); + std::filesystem::path result = getMetadataPath(path); + metadata_paths.emplace(path, result); LOG("Creating metadata directory (%s)", result.parent_path().c_str()); std::filesystem::create_directories(result.parent_path()); @@ -382,4 +386,36 @@ inline void CapioFileManager::checkFileAwaitingData() { } } +/** + * @brief commit firectories that have NFILES inside them if their commit rule is n_files + */ +inline void CapioFileManager::checkDirectoriesNFiles() const { + + for (const auto &path_config : capio_cl_engine->getPathsInConfig()) { + if (!capio_cl_engine->isDirectory(path_config)) { + continue; + } + START_LOG(gettid(), "call()"); + auto n_files = capio_cl_engine->getDirectoryFileCount(path_config); + if (n_files > 0) { + LOG("Directory %s needs %ld files before being committed", path_config.c_str(), + n_files); + // There must be n_files inside the directory to commit the file + long count = 0; + if (std::filesystem::exists(path_config)) { + auto iterator = std::filesystem::directory_iterator(path_config); + for (const auto &entry : iterator) { + ++count; + } + } + + LOG("Directory %s has %ld files inside", path_config.c_str(), count); + if (count >= n_files) { + LOG("Committing directory"); + this->setCommitted(path_config); + } + } + } +} + #endif // FILE_MANAGER_HPP \ No newline at end of file diff --git a/src/server/file-manager/fs_monitor.hpp b/src/server/file-manager/fs_monitor.hpp index 0be699842..92af56dda 100644 --- a/src/server/file-manager/fs_monitor.hpp +++ b/src/server/file-manager/fs_monitor.hpp @@ -32,6 +32,7 @@ class FileSystemMonitor { file_manager->checkFilesAwaitingCreation(); file_manager->checkFileAwaitingData(); + file_manager->checkDirectoriesNFiles(); nanosleep(&sleep, nullptr); } diff --git a/src/server/storage-service/capio_storage_service.hpp b/src/server/storage-service/capio_storage_service.hpp index 5de1c3d6d..88d8f1385 100644 --- a/src/server/storage-service/capio_storage_service.hpp +++ b/src/server/storage-service/capio_storage_service.hpp @@ -137,6 +137,16 @@ class CapioStorageService { getFile(file)->writeToQueue(*_server_to_client_queue->at(pid), offset, size); } + /** + * Send raw data to client without fetching from the storage manager itself + * @param pid + * @param data + * @param len + */ + void reply_to_client_raw(pid_t pid, const char *data, const capio_off64_t len) const { + _server_to_client_queue->at(pid)->write(data, len); + } + /** * Receive the file content from the client application * @param tid