diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index 81fecfabf..12812f74b 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -232,6 +232,7 @@ jobs: --exclude-throw-branches \ --xml coverage.xml \ --gcov-executable gcov \ + --exclude capio/tests \ ../build - name: "Compute Valid Artifact Name" diff --git a/capio/common/logger.hpp b/capio/common/logger.hpp index de3f6a649..0245ea2cb 100644 --- a/capio/common/logger.hpp +++ b/capio/common/logger.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include diff --git a/capio/server/include/remote/backend.hpp b/capio/server/include/remote/backend.hpp index 43ad8f5bb..342c511da 100644 --- a/capio/server/include/remote/backend.hpp +++ b/capio/server/include/remote/backend.hpp @@ -2,6 +2,7 @@ #define CAPIO_SERVER_REMOTE_BACKEND_HPP #include "common/logger.hpp" #include +#include class RemoteRequest { private: diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index c1f0b69bf..35ff7a843 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -1,506 +1,258 @@ -#ifndef CAPIO_SERVER_STORAGE_CAPIO_FILE_HPP -#define CAPIO_SERVER_STORAGE_CAPIO_FILE_HPP - -#include +#ifndef CAPIO_SERVER_CAPIO_FILE_HPP +#define CAPIO_SERVER_CAPIO_FILE_HPP +#include #include +#include +#include #include -#include +#include +#include #include #include -#include -#include -#include - -#include "common/logger.hpp" #include "common/queue.hpp" -#include "remote/backend.hpp" - -/* - * Only the server have all the information - * A process that only read from a file doesn't have the info on the _sectors - * A process that writes only have info on the sector that he wrote - * the file size is in shm because all the processes need this info - * and it's easy to provide it to them using the shm +/** + * @class CapioFile + * @brief Manages file data, sparse sectors, 
and synchronization for the CAPIO server. */ +class CapioFile { + /** + * @struct compareSectors + * @brief Comparator for the sectors set, ordering by offset. + */ + struct compareSectors { + bool operator()(const std::pair &lhs, + const std::pair &rhs) const; + }; -struct compare { - bool operator()(const std::pair &lhs, - const std::pair &rhs) const { - return (lhs.first < rhs.first); - } -}; + char *_buf = nullptr; ///< Raw pointer to memory buffer for file content + off64_t _buf_size = 0; ///< Allocated size of _buf + int _fd = -1; ///< File descriptor for permanent/mmap storage -class CapioFile { - char *_buf = nullptr; // buffer containing the data - off64_t _buf_size; - bool _directory = false; - // _fd is useful only when the file is memory-mapped - int _fd = -1; - bool _home_node = false; - int _n_links = 1; - long int _n_close = 0; - long int _n_close_expected = -1; - int _n_opens = 0; - bool _permanent = false; - // _sectors stored in memory of the files (only the home node is forced to - // be up to date) - std::set, compare> _sectors; - // vector of (tid, fd) + // TODO: check if it is possible to move from int to unsigned int + std::atomic _n_close = 0; ///< Current count of close() operations + std::atomic _n_opens = 0; ///< Current count of open() operations + std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) + const int _n_files_expected = -1; ///< Target dirent64 count (if directory) + const int _n_close_expected = -1; ///< Target close() operations for commitment + + bool _home_node = false; ///< True if this is the home node + bool _committed = false; ///< True if file is finalized + const bool _directory = false; ///< True if this instance represents a directory + const bool _permanent = false; ///< True if file persists after server exit + + std::atomic _first_write = true; ///< True if no data has been written yet + + /// @brief Set of [start, end] pairs representing valid data regions + std::set, compareSectors> 
_sectors; + mutable std::shared_mutex _mutex_sectors; ///< Shared lock to get concurrent access to _sectors + + std::atomic _real_file_size = 0; ///< Total logical size of the file + + /// @brief List of {Thread ID, FD} pairs associated with this file std::vector> _threads_fd; - bool _committed = false; // whether the file is completed / committed - - /*sync variables*/ - mutable std::mutex _mutex; - mutable std::condition_variable _complete_cv; - mutable std::condition_variable _data_avail_cv; - - off64_t _getStoredSize() const { - auto it = _sectors.rbegin(); - return (it == _sectors.rend()) ? 0 : it->second; - } - bool _first_write = true; - long _n_files = 0; // useful for directories - long _n_files_expected = -1; // useful for directories - - /* - * file size in the home node. In a given moment could not be up-to-date. - * This member is useful because a node different from the home node - * could need to known the size of the file but not its content + std::mutex _mutex_threads_fd; ///< Mutex to access _threads_fd + + mutable std::mutex _mutex; ///< Synchronization primitive for thread safety + mutable std::condition_variable _committed_cv; ///< Wait for commitment + mutable std::condition_variable _data_avail_cv; ///< Wait for data at specific offsets + + /** + * @brief Internal helper to calculate stored size without locking. + * @return Logical size based on the furthest sector end. */ + off64_t _getStoredSize() const; - std::size_t _real_file_size = 0; + /** + * @brief Reallocates the buffer and copies existing sectors to their correct offsets. + * @param new_p The pointer to the newly allocated memory. + * @param old_p The pointer to the old memory buffer. + */ + void _memcopyCapioFile(char *new_p, const char *old_p) const; public: - CapioFile() : _buf_size(0), _directory(false), _permanent(false) {} + /** @brief Default constructor. Initializes an empty file. 
*/ + CapioFile(); - CapioFile(bool directory, long int n_files_expected, bool permanent, off64_t init_size, - long int n_close_expected) - : _buf_size(init_size), _directory(directory), _n_close_expected(n_close_expected), - _permanent(permanent), _n_files_expected(n_files_expected + 2) {} + /** + * @brief Explicit constructor for directory-specific initialization. + * @param directory Whether the file is a directory. + * @param n_files_expected Expected number of entries. + * @param permanent Persistence flag. + * @param init_size Initial buffer allocation size. + * @param n_close_expected Expected number of close calls. + */ + CapioFile(bool directory, int n_files_expected, bool permanent, off64_t init_size, + int n_close_expected); - CapioFile(bool directory, bool permanent, off64_t init_size, long int n_close_expected = -1) - : _buf_size(init_size), _directory(directory), _n_close_expected(n_close_expected), - _permanent(permanent) {} + /** + * @brief Standard constructor for files. + * @param directory Whether the file is a directory. + * @param permanent Persistence flag. + * @param init_size Initial buffer allocation size. + * @param n_close_expected Expected number of close calls. + */ + CapioFile(bool directory, bool permanent, off64_t init_size, int n_close_expected); CapioFile(const CapioFile &) = delete; CapioFile &operator=(const CapioFile &) = delete; - ~CapioFile() { - START_LOG(gettid(), "call()"); - LOG("Deleting capio_file"); - - if (_permanent && _home_node) { - if (_directory) { - delete[] _buf; - } else { - int res = munmap(_buf, _buf_size); - if (res == -1) { - ERR_EXIT("munmap CapioFile"); - } - } - } else { - delete[] _buf; - } - } - - [[nodiscard]] bool isCommitted() const { - START_LOG(gettid(), "capio_file is complete? %s", this->_committed ? 
"true" : "false"); - std::lock_guard lg(_mutex); - return this->_committed; - } - - void waitForCommit() const { - START_LOG(gettid(), "call()"); - LOG("Thread waiting for file to be committed"); - std::unique_lock lock(_mutex); - _complete_cv.wait(lock, [this] { return _committed; }); - } - - void waitForData(long offset) const { - START_LOG(gettid(), "call()"); - LOG("Thread waiting for data to be available"); - std::unique_lock lock(_mutex); - _data_avail_cv.wait(lock, [offset, this] { - return (offset >= this->_getStoredSize()) || this->_committed; - }); - } - - void setCommitted(bool complete = true) { - START_LOG(gettid(), "setting capio_file._complete=%s", complete ? "true" : "false"); - std::lock_guard lg(_mutex); - if (this->_committed != complete) { - this->_committed = complete; - if (this->_committed) { - _complete_cv.notify_all(); - _data_avail_cv.notify_all(); - } - } - } - - void addFd(int tid, int fd) { _threads_fd.emplace_back(tid, fd); } - - void close() { - _n_close++; - _n_opens--; - } - - void dump() { - START_LOG(gettid(), "call()"); - - if (_permanent && !_directory && _home_node) { - off64_t size = getFileSize(); - if (ftruncate(_fd, size) == -1) { - ERR_EXIT("ftruncate commit capio_file"); - } - _buf_size = size; - if (::close(_fd) == -1) { - ERR_EXIT("close commit capio_file"); - } - } - } - - /* - * To be called when a process - * execute a read or a write syscall + /** @brief Destructor. Cleans up allocated buffers and file descriptors. */ + ~CapioFile(); + + /** @return True if the file is committed and read-only. */ + [[nodiscard]] bool isCommitted() const; + + /** @return True if the internal buffer has not yet been allocated. */ + [[nodiscard]] bool bufferToAllocate() const; + + /** @return True if the close count matches expected closes. */ + [[nodiscard]] bool closed() const; + + /** @return True if the file is ready for removal. */ + [[nodiscard]] bool deletable() const; + + /** @return True if this is a directory. 
*/ + [[nodiscard]] bool isDirectory() const; + + /** @return True if no write operations have been performed yet. */ + [[nodiscard]] bool isFirstWrite() const; + + /** @brief Blocks the calling thread until setCommitted() is called. */ + void waitForCommit() const; + + /** + * @brief Blocks until the requested offset is within a valid sector. + * @param offset The file offset to wait for. */ - void createBuffer(const std::filesystem::path &path, bool home_node) { - START_LOG(gettid(), "call(path=%s, home_node=%s)", path.c_str(), - home_node ? "true" : "false"); - std::lock_guard lock(_mutex); - if (this->_buf == nullptr) { - _home_node = home_node; - if (_permanent && home_node) { - if (_directory) { - std::filesystem::create_directory(path); - std::filesystem::permissions(path, std::filesystem::perms::owner_all); - _buf = new char[_buf_size]; - } else { - LOG("creating mem mapped file"); - _fd = ::open(path.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); - if (_fd == -1) { - ERR_EXIT("open %s CapioFile constructor", path.c_str()); - } - if (ftruncate(_fd, _buf_size) == -1) { - ERR_EXIT("ftruncate CapioFile constructor"); - } - _buf = (char *) mmap(nullptr, _buf_size, PROT_READ | PROT_WRITE, MAP_SHARED, - _fd, 0); - if (_buf == MAP_FAILED) { - ERR_EXIT("mmap CapioFile constructor"); - } - } - } else { - _buf = new char[_buf_size]; - } - } - } - - void memcopyCapioFile(char *new_p, char *old_p) const { - for (auto §or : _sectors) { - off64_t lbound = sector.first; - off64_t ubound = sector.second; - off64_t sector_length = ubound - lbound; - memcpy(new_p + lbound, old_p + lbound, sector_length); - } - } - - char *expandBuffer(off64_t data_size) { // TODO: use realloc - off64_t double_size = _buf_size * 2; - off64_t new_size = data_size > double_size ? 
data_size : double_size; - char *new_buf = new char[new_size]; - std::lock_guard lock(_mutex); - // memcpy(new_p, old_p, file_shm_size); //TODO memcpy only the - // sector - // stored in CapioFile - memcopyCapioFile(new_buf, _buf); - delete[] _buf; - _buf = new_buf; - _buf_size = new_size; - return new_buf; - } - - char *getBuffer() { return _buf; } - - [[nodiscard]] off64_t getBufSize() const { return _buf_size; } - - [[nodiscard]] const std::vector> &getFds() const { return _threads_fd; } - - [[nodiscard]] off64_t getFileSize() const { - std::lock_guard lock(_mutex); - if (!_sectors.empty()) { - return _sectors.rbegin()->second; - } else { - return 0; - } - } - - /* - * Returns the offset to the end of the sector - * if the offset parameter is inside the - * sector, -1 otherwise - * + void waitForData(long offset) const; + + /** + * @brief Marks the file as committed and notifies waiting threads. + * @param commit The new status (defaults to true). */ - [[nodiscard]] off64_t getSectorEnd(off64_t offset) const { - START_LOG(gettid(), "call(offset=%ld)", offset); - - off64_t sector_end = -1; - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - - if (!_sectors.empty() && it != _sectors.begin()) { - --it; - if (offset <= it->second) { - sector_end = it->second; - } - } - - return sector_end; - } - - [[nodiscard]] const std::set, compare> &getSectors() const { - return _sectors; - } - - /* - * get the size of the data stored in this node - * If the node is the home node then this is equals to - * the real size of the file + void setCommitted(bool commit = true); + + /** + * @brief Maps a Thread ID to a specific File Descriptor. + * @param tid Thread ID. + * @param fd File descriptor. */ - [[nodiscard]] off64_t getStoredSize() const { - std::lock_guard lock(_mutex); - return this->_getStoredSize(); - } - - /* - * Insert the new sector automatically modifying the - * existent _sectors if needed. 
- * - * Params: - * off64_t new_start: the beginning of the sector to insert - * off64_t new_end: the beginning of the sector to insert - * - * new_start must be > new_end otherwise the behaviour - * in undefined - * + void addFd(int tid, int fd); + + /** + * @brief Removes a Thread ID/FD mapping. + * @param tid Thread ID. + * @param fd File descriptor. */ - void insertSector(off64_t new_start, off64_t new_end) { - START_LOG(gettid(), "call(new_start=%ld, new_end=%ld)", new_start, new_end); - - auto p = std::make_pair(new_start, new_end); - std::lock_guard lock(_mutex); - - if (_sectors.empty()) { - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - return; - } - auto it_lbound = _sectors.upper_bound(p); - if (it_lbound == _sectors.begin()) { - if (new_end < it_lbound->first) { - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } else { - auto it = it_lbound; - bool end_before = false; - bool end_inside = false; - while (it != _sectors.end() && !end_before && !end_inside) { - end_before = p.second < it->first; - if (!end_before) { - end_inside = p.second <= it->second; - if (!end_inside) { - ++it; - } - } - } - - if (end_inside) { - p.second = it->second; - ++it; - } - _sectors.erase(it_lbound, it); - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } - } else { - --it_lbound; - auto it = it_lbound; - if (p.first <= it_lbound->second) { - // new sector starts inside a sector - p.first = it_lbound->first; - } else { // in this way the sector will not be deleted - ++it_lbound; - } - bool end_before = false; - bool end_inside = false; - while (it != _sectors.end() && !end_before && !end_inside) { - end_before = p.second < it->first; - if (!end_before) { - end_inside = p.second <= it->second; - if (!end_inside) { - ++it; - } - } - } - - if (end_inside) { - p.second = it->second; - ++it; - } - _sectors.erase(it_lbound, it); - LOG("Insert sector <%ld, %ld>", p.first, p.second); - 
_sectors.insert(p); - } - } - - [[nodiscard]] bool closed() const { - return _n_close_expected == -1 || _n_close == _n_close_expected; - } - - [[nodiscard]] bool deletable() const { return _n_opens <= 0; } - - [[nodiscard]] bool isDirectory() const { return _directory; } - - void open() { _n_opens++; } - - /* - * From the manual: - * - * Adjust the file offset to the next location in the file - * greater than or equal to offset containing data. If - * offset points to data, then the file offset is set to - * offset. - * - * Fails if offset points past the end of the file. - * + void removeFd(int tid, int fd); + + /** @brief Increments the internal open counter. */ + void open(); + + /** @brief Increments the _n_close counter, while decrementing the _n_open counter. */ + void close(); + + /** + * @brief Initializes the memory buffer or mmap area. + * @param path Path to the file. + * @param home_node Whether this node is the home for the file. + */ + void createBuffer(const std::filesystem::path &path, bool home_node); + + /** + * @brief Resizes the internal buffer to accommodate more data. + * @param data_size Required additional size. + * @return Pointer to the expanded buffer. + */ + char *expandBuffer(off64_t data_size); + + /** @brief Dump _buf buffer to the file system. */ + void dump(); + + /** + * @brief Tracks a new data range in the file. + * @param new_start Starting offset of the data. + * @param new_end Ending offset of the data. + */ + void insertSector(off64_t new_start, off64_t new_end); + + /** + * @brief Fetches data from a remote CAPIO node. + * @param dest Destination node identifier. + * @param offset File offset to read from. + * @param buffer_size Amount of data to fetch. + */ + void readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) const; + + /** + * @brief Transfers data from an SPSC queue into the file buffer. + * @param queue Source queue. + * @param offset Destination offset in this file. 
+ * @param num_bytes Number of bytes to transfer. + */ + void readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) const; + + /** @brief Explicitly sets the total file size. */ + void setRealFileSize(off64_t size); + + /** @brief Marks that at least one write has occurred. */ + void registerFirstWrite(); + + /** + * @brief Increases the count of files contained in this directory. + * @param count the number to increase the internal counter + */ + void incrementDirectoryFileCount(int count = 1); + + /** @return Pointer to the raw memory buffer. */ + char *getBuffer() const; + + /** @return Vector of TID/FD pairs. */ + [[nodiscard]] const std::vector> &getFds() const; + + /** @return The physical size of the current buffer. */ + [[nodiscard]] off64_t getBufSize() const; + + /** @return The logical size of the file. */ + [[nodiscard]] off64_t getRealFileSize() const; + + /** @return The total size, accounting for holes and metadata. */ + [[nodiscard]] off64_t getFileSize() const; + + /** @return Size of data currently residing on this node. */ + [[nodiscard]] off64_t getStoredSize() const; + + /** @return Count of files currently indexed in this directory. */ + [[nodiscard]] int getCurrentDirectoryFileCount() const; + + /** @return Expected total files in this directory. */ + [[nodiscard]] int getDirectoryExpectedFileCount() const; + + /** @return Reference to the internal sector map. */ + [[nodiscard]] const std::set, compareSectors> &getSectors() const; + + /** + * @brief Finds the end of the sector containing the offset. + * @param offset Position to check. + * @return End offset of the sector, or -1 if in a hole. 
*/ - off64_t seekData(off64_t offset) { - if (_sectors.empty()) { - if (offset == 0) { - return 0; - } else { - return -1; - } - } - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - if (it == _sectors.begin()) { - return it->first; - } - --it; - if (offset <= it->second) { - return offset; - } else { - ++it; - if (it == _sectors.end()) { - return -1; - } else { - return it->first; - } - } - } - - /* - * From the manual: - * - * Adjust the file offset to the next hole in the file - * greater than or equal to offset. If offset points into - * the middle of a hole, then the file offset is set to - * offset. If there is no hole past offset, then the file - * offset is adjusted to the end of the file (i.e., there is - * an implicit hole at the end of any file). - * - * - * Fails if offset points past the end of the file. - * + [[nodiscard]] off64_t getSectorEnd(off64_t offset) const; + + /** + * @brief Finds the next data segment. + * @param offset Start searching from here. + * @return Offset of data, or error if beyond end of file. */ - [[nodiscard]] off64_t seekHole(off64_t offset) const { - if (_sectors.empty()) { - if (offset == 0) { - return 0; - } else { - return -1; - } - } - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - if (it == _sectors.begin()) { - return offset; - } - --it; - if (offset <= it->second) { - return it->second; - } else { - ++it; - if (it == _sectors.end()) { - return -1; - } else { - return offset; - } - } - } - - void removeFd(int tid, int fd) { - auto it = std::find(_threads_fd.begin(), _threads_fd.end(), std::make_pair(tid, fd)); - if (it != _threads_fd.end()) { - _threads_fd.erase(it); - } - } + off64_t seekData(off64_t offset) const; /** - * Save data inside the capio_file buffer - * @param buffer - * @return + * @brief Finds the next hole in the file. + * @param offset Start searching from here. + * @return Offset of the hole. 
*/ - void readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) { - std::unique_lock lock(_mutex); - backend->recv_file(_buf + offset, dest, buffer_size); - _data_avail_cv.notify_all(); - } - - void readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) { - START_LOG(gettid(), "call()"); - - std::unique_lock lock(_mutex); - queue.read(_buf + offset, num_bytes); - _data_avail_cv.notify_all(); - } - - // TODO: Understand the scope of this method and find a better name - std::size_t getRealFileSize() const { - std::lock_guard lg(_mutex); - return this->_real_file_size; - } - - void setRealFileSize(const off64_t size) { - std::lock_guard lg(_mutex); - this->_real_file_size = size; - } - - bool isFirstWrite() const { - std::lock_guard lg(_mutex); - return this->_first_write; - } - - // TODO: add a dedicated CapioFile::write() and CapioFile::read() method, remove this method - // from public scope - void registerFirstWrite() { - std::lock_guard lg(_mutex); - this->_first_write = false; - } - - void incrementDirectoryFileCount(const int count = 1) { - std::lock_guard lg(_mutex); - this->_n_files += count; - } - - long getCurrentDirectoryFileCount() const { - std::lock_guard lg(_mutex); - return this->_n_files; - } - - long getDirectoryExpectedFileCount() const { - std::lock_guard lg(_mutex); - return this->_n_files_expected; - } + [[nodiscard]] off64_t seekHole(off64_t offset) const; }; -#endif // CAPIO_SERVER_STORAGE_CAPIO_FILE_HPP \ No newline at end of file +#endif // CAPIO_SERVER_CAPIO_FILE_HPP \ No newline at end of file diff --git a/capio/server/include/utils/location.hpp b/capio/server/include/utils/location.hpp index c43489a7e..5d573f950 100644 --- a/capio/server/include/utils/location.hpp +++ b/capio/server/include/utils/location.hpp @@ -1,12 +1,15 @@ #ifndef CAPIO_SERVER_UTILS_LOCATIONS_HPP #define CAPIO_SERVER_UTILS_LOCATIONS_HPP +#include "remote/backend.hpp" + #include #include #include "utils/types.hpp" extern char 
*node_name; +extern Backend *backend; constexpr char CAPIO_SERVER_FILES_LOCATION_NAME[] = "files_location_%s.txt"; constexpr char CAPIO_SERVER_INVALIDATE_FILE_PATH_CHAR = '#'; diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp new file mode 100644 index 000000000..45baf5c40 --- /dev/null +++ b/capio/server/src/capio_file.cpp @@ -0,0 +1,340 @@ +#include "server/include/storage/capio_file.hpp" +#include "common/logger.hpp" +#include "remote/backend.hpp" +#include "server/include/utils/common.hpp" +#include "utils/shared_mutex.hpp" + +bool CapioFile::compareSectors::operator()(const std::pair &lhs, + const std::pair &rhs) const { + return (lhs.first < rhs.first); +} + +CapioFile::CapioFile() = default; + +CapioFile::CapioFile(const bool directory, const int n_files_expected, const bool permanent, + const off64_t init_size, const int n_close_expected) + : _buf_size(init_size), _n_files_expected(n_files_expected + 2), + _n_close_expected(n_close_expected), _directory(directory), _permanent(permanent) {} + +CapioFile::CapioFile(const bool directory, const bool permanent, const off64_t init_size, + const int n_close_expected) + : _buf_size(init_size), _n_close_expected(n_close_expected), _directory(directory), + _permanent(permanent) {} + +CapioFile::~CapioFile() { + START_LOG(gettid(), "call()"); + LOG("Deleting capio_file"); + + if (_permanent && _home_node) { + if (_directory) { + delete[] _buf; + } else { + if (munmap(_buf, _buf_size) == -1) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "WARN: unable to unmap CapioFile: " + std::string(strerror(errno))); + } + } + } else { + delete[] _buf; + } +} + +bool CapioFile::isCommitted() const { + START_LOG(gettid(), "capio_file is complete? %s", this->_committed ? 
"true" : "false"); + std::lock_guard lg(_mutex); + return this->_committed; +} + +void CapioFile::waitForData(long offset) const { + START_LOG(gettid(), "call()"); + LOG("Thread waiting for data to be available"); + std::unique_lock lock(_mutex); + _data_avail_cv.wait(lock, + [offset, this] { return this->_getStoredSize() >= offset || _committed; }); +} + +void CapioFile::setCommitted(const bool commit) { + START_LOG(gettid(), "setting capio_file._complete=%s", commit ? "true" : "false"); + std::lock_guard lg(_mutex); + if (this->_committed != commit) { + this->_committed = commit; + if (this->_committed) { + _committed_cv.notify_all(); + _data_avail_cv.notify_all(); + } + } +} + +void CapioFile::addFd(int tid, int fd) { + START_LOG(gettid(), "call(tid=%d, fd=%d)", tid, fd); + std::lock_guard lg(_mutex_threads_fd); + _threads_fd.emplace_back(tid, fd); +} + +void CapioFile::waitForCommit() const { + START_LOG(gettid(), "call()"); + LOG("Thread waiting for file to be committed"); + std::unique_lock lock(_mutex); + _committed_cv.wait(lock, [this] { return _committed; }); +} + +void CapioFile::close() { + START_LOG(gettid(), "call()"); + ++_n_close; + --_n_opens; + LOG("AFTER: _n_close=%d / _n_opens=%d", _n_close.load(), _n_opens.load()); +} + +void CapioFile::dump() { + START_LOG(gettid(), "call()"); + + if (_permanent && !_directory && _home_node) { + const off64_t size = getFileSize(); + if (ftruncate(_fd, size) == -1) { + ERR_EXIT("ftruncate commit capio_file"); + } + _buf_size = size; + if (::close(_fd) == -1) { + ERR_EXIT("close commit capio_file"); + } + } +} + +void CapioFile::createBuffer(const std::filesystem::path &path, const bool home_node) { + START_LOG(gettid(), "call(path=%s, home_node=%s)", path.c_str(), home_node ? 
"true" : "false"); + std::lock_guard lock(_mutex); + if (this->_buf == nullptr) { + _home_node = home_node; + if (_permanent && home_node) { + if (_directory) { + std::filesystem::create_directory(path); + std::filesystem::permissions(path, std::filesystem::perms::owner_all); + _buf = new char[_buf_size]; + } else { + LOG("creating mem mapped file"); + _fd = ::open(path.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); + if (_fd == -1) { + ERR_EXIT("open %s CapioFile constructor", path.c_str()); + } + if (ftruncate(_fd, _buf_size) == -1) { + ERR_EXIT("ftruncate CapioFile constructor"); + } + _buf = static_cast( + mmap(nullptr, _buf_size, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0)); + if (_buf == MAP_FAILED) { + ERR_EXIT("mmap CapioFile constructor"); + } + } + } else { + _buf = new char[_buf_size]; + } + } +} + +void CapioFile::_memcopyCapioFile(char *new_p, const char *old_p) const { + // NOTE: do not lock here: this method is private and when reaching control in this function, + // the caller has already locked the mutex + START_LOG(gettid(), "call()"); + for (const auto &[lbound, ubound] : _sectors) { + memcpy(new_p + lbound, old_p + lbound, ubound - lbound); + } +} + +char *CapioFile::expandBuffer(const off64_t data_size) { + const off64_t double_size = _buf_size * 2; + const off64_t new_size = std::max(data_size, double_size); + const auto new_buf = new char[new_size]; + + std::lock_guard lock(_mutex); + _memcopyCapioFile(new_buf, _buf); + delete[] _buf; + _buf = new_buf; + _buf_size = new_size; + return new_buf; +} + +// TODO: This is a memory leak that may lead to race conditions. remove exposure of internal buffer +char *CapioFile::getBuffer() const { return _buf; } + +off64_t CapioFile::getBufSize() const { return _buf_size; } + +// TODO: This is a memory leak that may lead to race conditions. 
remove exposure of internal buffer +const std::vector> &CapioFile::getFds() const { return _threads_fd; } + +off64_t CapioFile::getFileSize() const { + const shared_lock_guard slg(_mutex_sectors); + if (!_sectors.empty()) { + return _sectors.rbegin()->second; + } else { + return 0; + } +} + +off64_t CapioFile::getSectorEnd(off64_t offset) const { + START_LOG(gettid(), "call(offset=%ld)", offset); + + const shared_lock_guard slg(_mutex_sectors); + + if (_sectors.empty()) { + return -1; + } + + if (auto it = _sectors.upper_bound(std::make_pair(offset, 0)); it != _sectors.begin()) { + --it; + if (offset <= it->second) { + return it->second; + } + } + + return -1; +} +// TODO: This is a memory leak that may lead to race conditions. remove exposure of internal buffer +const std::set, CapioFile::compareSectors> & +CapioFile::getSectors() const { + return _sectors; +} + +off64_t CapioFile::getStoredSize() const { + std::lock_guard lock(_mutex); + return this->_getStoredSize(); +} + +void CapioFile::insertSector(off64_t new_start, off64_t new_end) { + START_LOG(gettid(), "call(new_start=%ld, new_end=%ld)", new_start, new_end); + std::lock_guard lock(_mutex_sectors); + + // First sector whose start >= new_start + auto it_start = _sectors.lower_bound({new_start, LLONG_MIN}); + + // Check if the previous sector reaches into our range + if (it_start != _sectors.begin()) { + if (const auto it_prev = std::prev(it_start); it_prev->second >= new_start) { + new_start = it_prev->first; + new_end = std::max(new_end, it_prev->second); + it_start = it_prev; + } + } + + // First sector whose start > new_end (adjacent sectors included via +1) + const auto it_end = _sectors.upper_bound({new_end + 1, LLONG_MIN}); + + // Absorb the right edge of the last overlapping sector if needed + if (it_end != _sectors.begin()) { + if (const auto it_last = std::prev(it_end); it_last->second > new_end) { + new_end = it_last->second; + } + } + + _sectors.erase(it_start, it_end); + LOG("Insert sector 
<%ld, %ld>", new_start, new_end); + _sectors.emplace(new_start, new_end); +} + +bool CapioFile::closed() const { return _n_close_expected == -1 || _n_close == _n_close_expected; } + +bool CapioFile::deletable() const { return _n_opens <= 0; } + +bool CapioFile::isDirectory() const { return _directory; } + +void CapioFile::open() { ++_n_opens; } + +off64_t CapioFile::seekData(off64_t offset) const { + START_LOG(gettid(), "call(offset=%ld)", offset); + const shared_lock_guard slg(_mutex_sectors); + if (_sectors.empty()) { + if (offset == 0) { + return 0; + } else { + return -1; + } + } + auto it = _sectors.upper_bound(std::make_pair(offset, 0)); + if (it == _sectors.begin()) { + return it->first; + } + --it; + if (offset <= it->second) { + return offset; + } else { + ++it; + if (it == _sectors.end()) { + return -1; + } else { + return it->first; + } + } +} + +off64_t CapioFile::seekHole(off64_t offset) const { + START_LOG(gettid(), "call(offset=%ld)", offset); + const shared_lock_guard slg(_mutex_sectors); + if (_sectors.empty()) { + if (offset == 0) { + return 0; + } else { + return -1; + } + } + auto it = _sectors.upper_bound(std::make_pair(offset, 0)); + if (it == _sectors.begin()) { + return offset; + } + --it; + if (offset <= it->second) { + return it->second; + } else { + ++it; + if (it == _sectors.end()) { + return -1; + } else { + return offset; + } + } +} + +void CapioFile::removeFd(int tid, int fd) { + std::lock_guard lock(_mutex_threads_fd); + if (const auto it = std::find(_threads_fd.begin(), _threads_fd.end(), std::make_pair(tid, fd)); + it != _threads_fd.end()) { + _threads_fd.erase(it); + } +} + +void CapioFile::readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) const { + std::lock_guard lock(_mutex); + backend->recv_file(_buf + offset, dest, buffer_size); + _data_avail_cv.notify_all(); +} + +void CapioFile::readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) const { + START_LOG(gettid(), "call()"); + + 
std::lock_guard lock(_mutex); + queue.read(_buf + offset, num_bytes); + _data_avail_cv.notify_all(); +} + +off64_t CapioFile::_getStoredSize() const { + const shared_lock_guard slg(_mutex_sectors); + const auto it = _sectors.rbegin(); + return (it == _sectors.rend()) ? 0 : it->second; +} + +bool CapioFile::bufferToAllocate() const { + std::lock_guard lg(_mutex); + return _buf == nullptr; +} + +off64_t CapioFile::getRealFileSize() const { return this->_real_file_size; } + +void CapioFile::setRealFileSize(const off64_t size) { this->_real_file_size = size; } + +bool CapioFile::isFirstWrite() const { return this->_first_write; } + +void CapioFile::registerFirstWrite() { this->_first_write = false; } + +void CapioFile::incrementDirectoryFileCount(const int count) { this->_n_files += count; } + +int CapioFile::getCurrentDirectoryFileCount() const { return this->_n_files; } + +int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; } diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index c1e6d4c62..7742b865c 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -1,8 +1,9 @@ -#ifndef CAPIO_CAPIO_FILE_HPP -#define CAPIO_CAPIO_FILE_HPP #include "server/include/storage/capio_file.hpp" #include "common/env.hpp" +#include "remote/backend.hpp" + #include +#include TEST(ServerTest, TestInsertSingleSector) { CapioFile c_file; @@ -12,6 +13,16 @@ TEST(ServerTest, TestInsertSingleSector) { EXPECT_NE(sectors.find({1L, 3L}), sectors.end()); } +TEST(ServerTest, TestBufferAllocation) { + CapioFile c_file; + EXPECT_TRUE(c_file.bufferToAllocate()); + c_file.createBuffer("test.dat", true); + EXPECT_FALSE(c_file.bufferToAllocate()); + EXPECT_EQ(c_file.getBufSize(), 0); + EXPECT_EQ(c_file.getRealFileSize(), 0); + EXPECT_NE(nullptr, c_file.getBuffer()); +} + TEST(ServerTest, TestInsertTwoNonOverlappingSectors) { CapioFile c_file; c_file.insertSector(5, 7); 
@@ -59,4 +70,320 @@ TEST(ServerTest, TestInsertTwoOverlappingSectorsNested) {
     EXPECT_EQ(sectors.size(), 1);
     EXPECT_NE(sectors.find({1L, 4L}), sectors.end());
 }
-#endif // CAPIO_CAPIO_FILE_HPP
+
+// After the object is destroyed, "test.dat" must still exist on disk.
+TEST(ServerTest, TestDestructionOfPermanentCapioFile) {
+    auto *c_file = new CapioFile(false, true, 1000, 1);
+    c_file->createBuffer("test.dat", true);
+    delete c_file;
+    EXPECT_TRUE(std::filesystem::exists("test.dat"));
+    std::filesystem::remove("test.dat");
+}
+
+// Same check for a directory: "mydirectory" must survive destruction and still be a directory.
+TEST(ServerTest, TestDestructionOfPermanentCapioFileDirectory) {
+    auto *c_file = new CapioFile(true, true, 1000, 1);
+    c_file->createBuffer("mydirectory", true);
+    delete c_file;
+    EXPECT_TRUE(std::filesystem::exists("mydirectory"));
+    EXPECT_TRUE(std::filesystem::is_directory("mydirectory"));
+    std::filesystem::remove("mydirectory");
+}
+
+// A helper thread fills the buffer through readFromQueue(); the main thread blocks in
+// waitForData(1000) and then checks the stored size and every byte.
+TEST(ServerTest, TestCapioFileWaitForDataMultithreaded) {
+    CapioFile file;
+
+    SPSCQueue queue("test_queue", get_cache_lines(), get_cache_line_size(), "test_wf");
+
+    std::mutex _lock;
+    _lock.lock();
+
+    std::thread t([&_lock, &file, &queue] {
+        // Hold the start gate via RAII: a std::mutex must not be destroyed while locked, and a
+        // thread must not terminate while owning it (both are undefined behaviour).
+        const std::lock_guard<std::mutex> guard(_lock);
+        file.expandBuffer(1000);
+        file.registerFirstWrite();
+
+        EXPECT_NE(file.getBuffer(), nullptr);
+        char buffer[1000];
+        for (std::size_t i = 0; i < 1000; ++i) {
+            buffer[i] = static_cast<char>(33 + (i % 93));
+        }
+
+        queue.write(buffer, 1000);
+        file.insertSector(0, 1000);
+        file.readFromQueue(queue, 0, 1000);
+    });
+
+    _lock.unlock();
+    file.waitForData(1000);
+
+    auto stored_size = file.getFileSize();
+    EXPECT_EQ(stored_size, 1000);
+
+    const auto buf = file.getBuffer();
+    EXPECT_NE(buf, nullptr);
+    for (std::size_t i = 0; i < 1000; ++i) {
+        EXPECT_EQ(buf[i], static_cast<char>(33 + (i % 93)));
+    }
+
+    t.join();
+}
+
+// The helper thread only calls setCommitted(); waitForData(1000) must return with a size of 0.
+TEST(ServerTest, TestCapioFileWaitForDataMultithreadedWithCommit) {
+    CapioFile file;
+
+    SPSCQueue queue("test_queue", get_cache_lines(), get_cache_line_size(), "test_wf");
+
+    std::mutex _lock;
+    _lock.lock();
+
+    std::thread t([&_lock, &file] {
+        const std::lock_guard<std::mutex> guard(_lock); // released when the thread body ends
+        file.setCommitted();
+    });
+
+    _lock.unlock();
+    file.waitForData(1000);
+
+    EXPECT_EQ(file.getFileSize(), 0);
+
+    t.join();
+}
+
+// waitForCommit() must return once the helper thread calls setCommitted(); data is then verified.
+TEST(ServerTest, TestCapioFileWaitForCompletion) {
+    CapioFile file;
+
+    std::mutex _lock;
+    _lock.lock();
+
+    std::thread t([&] {
+        const std::lock_guard<std::mutex> guard(_lock); // released when the thread body ends
+        file.expandBuffer(1000);
+        file.registerFirstWrite();
+
+        EXPECT_NE(file.getBuffer(), nullptr);
+        char buffer[1000];
+        for (std::size_t i = 0; i < 1000; ++i) {
+            buffer[i] = static_cast<char>(33 + (i % 93));
+        }
+
+        memcpy(file.getBuffer(), buffer, 1000);
+
+        file.insertSector(0, 1000);
+        file.setCommitted();
+    });
+
+    _lock.unlock();
+    file.waitForCommit();
+
+    auto stored_size = file.getFileSize();
+    EXPECT_EQ(stored_size, 1000);
+
+    const auto buf = file.getBuffer();
+    EXPECT_NE(buf, nullptr);
+    for (std::size_t i = 0; i < 1000; ++i) {
+        EXPECT_EQ(buf[i], static_cast<char>(33 + (i % 93)));
+    }
+
+    t.join();
+}
+
+// Checks the on-disk size of "test.dat" before (1000) and after (0) close() + dump().
+TEST(ServerTest, TestCommitCapioFile) {
+    auto file = new CapioFile(false, true, 1000, 1);
+    file->createBuffer("test.dat", true);
+    EXPECT_EQ(std::filesystem::file_size("test.dat"), 1000);
+    file->close();
+    file->dump();
+    EXPECT_EQ(std::filesystem::file_size("test.dat"), 0);
+    delete file;
+    EXPECT_TRUE(std::filesystem::exists("test.dat"));
+    std::filesystem::remove("test.dat");
+}
+
+// With createBuffer(..., false), close() + dump() must not leave "test.dat" on disk.
+TEST(ServerTest, TestDumpNotHomeNode) {
+    auto file = new CapioFile(false, true, 1000, 1);
+    file->createBuffer("test.dat", false);
+    file->close();
+    file->dump();
+    delete file;
+    EXPECT_FALSE(std::filesystem::exists("test.dat"));
+}
+
+// "mydir" must not exist beforehand, must exist after createBuffer(), and must survive deletion.
+TEST(ServerTest, TestCommitAndDeleteDirectory) {
+    EXPECT_FALSE(std::filesystem::exists("mydir"));
+    auto file = new CapioFile(true, true, 1000, 1);
+    file->createBuffer("mydir", true);
+    EXPECT_TRUE(std::filesystem::exists("mydir"));
+    EXPECT_TRUE(std::filesystem::is_directory("mydir"));
+    delete file;
+    EXPECT_TRUE(std::filesystem::exists("mydir"));
+    std::filesystem::remove("mydir");
+}
+
+// Inserted sectors determine getStoredSize(); expandBuffer() determines getBufSize().
+TEST(ServerTest, TesMemcpyCapioFile) {
+    CapioFile file;
+
+    file.createBuffer("test.dat", true);
+
+    // NOTE: here we only simulate a write operation on the file, without actually writing to _buf
+    file.insertSector(0, 100);
+    file.insertSector(100, 200);
+
+    file.expandBuffer(2000);
+
+    EXPECT_EQ(file.getStoredSize(), 200);
+    EXPECT_EQ(file.getBufSize(), 2000);
+}
+
+// closed() is true for an expected close count of -1, or after close() ran the expected 10 times.
+TEST(ServerTest, TestCloseCapioFile) {
+    CapioFile file(false, false, 0, -1);
+    EXPECT_TRUE(file.closed()); // n_close_expected == -1: reported as closed right away
+
+    CapioFile file1(false, false, 0, 10);
+    EXPECT_FALSE(file1.closed());
+    for (std::size_t i = 0; i < 10; ++i) {
+        file1.open();
+        EXPECT_FALSE(file1.closed());
+        file1.close();
+    }
+    EXPECT_TRUE(file1.closed());
+}
+
+// seekData(): -1/0 on an empty file, the offset itself inside a sector, the next sector in a gap.
+TEST(ServerTest, TestCapioFileSeekData) {
+    CapioFile file;
+
+    EXPECT_EQ(file.seekData(100), -1);
+    EXPECT_EQ(file.seekData(0), 0);
+
+    file.insertSector(0, 1000);
+    EXPECT_EQ(file.seekData(100), 100);
+    EXPECT_EQ(file.seekData(2000), -1);
+    file.insertSector(2000, 3000);
+    EXPECT_EQ(file.seekData(1500), 2000); // 1500 is in the gap; the next data starts at 2000
+
+    CapioFile file1;
+    file1.insertSector(200, 300);
+    EXPECT_EQ(file1.seekData(1), 200);
+}
+
+// seekHole(): -1/0 on an empty file, the sector end inside a sector, the offset itself in a gap.
+TEST(ServerTest, TestCapioFileSeekHole) {
+    CapioFile file;
+
+    EXPECT_EQ(file.seekHole(100), -1);
+    EXPECT_EQ(file.seekHole(0), 0);
+    file.insertSector(0, 1000);
+    EXPECT_EQ(file.seekHole(100), 1000);
+    EXPECT_EQ(file.seekHole(2000), -1);
+    file.insertSector(2000, 3000);
+    EXPECT_EQ(file.seekHole(1500), 1500); // 1500 already lies in the gap between the two sectors
+
+    CapioFile file1;
+    file1.insertSector(200, 300);
+    EXPECT_EQ(file1.seekHole(1), 1);
+}
+
+// removeFd() ignores unknown (tid, fd) pairs and removes known ones one at a time.
+TEST(ServerTest, TestAddAndRemoveFD) {
+    CapioFile file;
+    file.addFd(12345, 4);
+    file.addFd(12345, 5);
+
+    file.removeFd(12345, 6);
+    EXPECT_EQ(file.getFds().size(), 2);
+    file.removeFd(12345, 5);
+    EXPECT_EQ(file.getFds().size(), 1);
+    file.removeFd(12345, 4);
+    EXPECT_EQ(file.getFds().size(), 0);
+}
+
+// getRealFileSize() starts at 0 and reflects the last setRealFileSize() call.
+TEST(ServerTest, TestSetGetRealFileSize) {
+    CapioFile file;
+    EXPECT_EQ(file.getRealFileSize(), 0);
+    file.setRealFileSize(1234);
+    EXPECT_EQ(file.getRealFileSize(), 1234);
+}
+
+// Same scenario as TestDestructionOfPermanentCapioFileDirectory, using "testDir".
+TEST(ServerTest, TestDeletePermanentDirectory) {
+    const auto file = new CapioFile(true, true, 1000, 1);
+    file->createBuffer("testDir", true);
+    delete file;
+    EXPECT_TRUE(std::filesystem::exists("testDir"));
+    EXPECT_TRUE(std::filesystem::is_directory("testDir"));
+    std::filesystem::remove("testDir");
+}
+
+// setCommitted() sets the flag; setCommitted(false) clears it again.
+TEST(ServerTest, TestFileSetCommitToFalse) {
+    CapioFile file;
+    file.setCommitted();
+    EXPECT_TRUE(file.isCommitted());
+    file.setCommitted(false);
+    EXPECT_FALSE(file.isCommitted());
+}
+
+// Backend stub: recv_file() fills the target with a fixed printable pattern, the rest are no-ops.
+class MockBackend : public Backend {
+  public:
+    void recv_file(char *shm, const std::string &source, const long int bytes_expected) override {
+        for (long int i = 0; i < bytes_expected; ++i) { // signed index: a negative count writes nothing
+            shm[i] = static_cast<char>(33 + (i % 93));
+        }
+    }
+
+    // NOTE(review): element type restored as std::string - confirm against Backend::get_nodes().
+    const std::set<std::string> get_nodes() override { return {}; }
+    void handshake_servers() override {}
+    RemoteRequest read_next_request() override { return {nullptr, ""}; }
+    void send_file(char *shm, long int nbytes, const std::string &target) override {}
+    void send_request(const char *message, int message_len, const std::string &target) override {}
+};
+
+// readFromNode() must copy what the backend delivers into the file buffer.
+TEST(ServerTest, TestReadFromNodeMockBackend) {
+    // Install the mock, remembering the previous global so it can be restored afterwards.
+    auto *const previous_backend = backend;
+    backend = new MockBackend();
+
+    CapioFile file1;
+    file1.createBuffer("testDir", true);
+    file1.expandBuffer(1000);
+    file1.registerFirstWrite();
+    file1.insertSector(0, 1000);
+
+    file1.readFromNode("none", 0, 1000);
+
+    const auto buf = file1.getBuffer();
+    for (std::size_t i = 0; i < 1000; ++i) {
+        EXPECT_EQ(buf[i], static_cast<char>(33 + (i % 93)));
+    }
+
+    delete backend;
+    backend = previous_backend; // do not leave the global pointing at freed memory
+}
+
+// getSectorEnd(): -1 when no sector covers the offset, otherwise the end of the covering sector.
+TEST(ServerTest, TestGetSectorEnd) {
+    CapioFile file;
+
+    EXPECT_EQ(file.getSectorEnd(1234), -1);
+
+    file.insertSector(0, 1234);
+
+    EXPECT_EQ(file.getSectorEnd(120), 1234);
+    EXPECT_EQ(file.getSectorEnd(12000), -1);
+}