From 3bc23e523b53da3ff9e60fd96de156ffe1f9f123 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 4 Mar 2026 14:03:08 +0000 Subject: [PATCH 01/15] Split declaration and implementation of CapioFile class Tests --- .github/workflows/ci-tests.yaml | 1 + capio/common/logger.hpp | 1 + capio/server/include/remote/backend.hpp | 1 + capio/server/include/storage/capio_file.hpp | 695 +++++++------------- capio/server/include/utils/location.hpp | 3 + capio/server/src/capio_file.cpp | 383 +++++++++++ capio/tests/unit/server/src/capio_file.cpp | 221 ++++++- 7 files changed, 828 insertions(+), 477 deletions(-) create mode 100644 capio/server/src/capio_file.cpp diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index 81fecfabf..12812f74b 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -232,6 +232,7 @@ jobs: --exclude-throw-branches \ --xml coverage.xml \ --gcov-executable gcov \ + --exclude capio/tests \ ../build - name: "Compute Valid Artifact Name" diff --git a/capio/common/logger.hpp b/capio/common/logger.hpp index de3f6a649..0245ea2cb 100644 --- a/capio/common/logger.hpp +++ b/capio/common/logger.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include diff --git a/capio/server/include/remote/backend.hpp b/capio/server/include/remote/backend.hpp index 43ad8f5bb..342c511da 100644 --- a/capio/server/include/remote/backend.hpp +++ b/capio/server/include/remote/backend.hpp @@ -2,6 +2,7 @@ #define CAPIO_SERVER_REMOTE_BACKEND_HPP #include "common/logger.hpp" #include +#include class RemoteRequest { private: diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index c1f0b69bf..949f3efd4 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -1,506 +1,253 @@ -#ifndef CAPIO_SERVER_STORAGE_CAPIO_FILE_HPP -#define CAPIO_SERVER_STORAGE_CAPIO_FILE_HPP +#ifndef CAPIO_SERVER_CAPIO_FILE_HPP +#define CAPIO_SERVER_CAPIO_FILE_HPP -#include #include +#include +#include #include -#include +#include #include #include -#include -#include -#include - -#include "common/logger.hpp" #include "common/queue.hpp" -#include "remote/backend.hpp" - -/* - * Only the server have all the information - * A process that only read from a file doesn't have the info on the _sectors - * A process that writes only have info on the sector that he wrote - * the file size is in shm because all the processes need this info - * and it's easy to provide it to them using the shm +/** + * @class CapioFile + * @brief Manages file data, sparse sectors, and synchronization for the CAPIO server. */ - -struct compare { - bool operator()(const std::pair &lhs, - const std::pair &rhs) const { - return (lhs.first < rhs.first); - } -}; - class CapioFile { - char *_buf = nullptr; // buffer containing the data - off64_t _buf_size; - bool _directory = false; - // _fd is useful only when the file is memory-mapped - int _fd = -1; - bool _home_node = false; - int _n_links = 1; - long int _n_close = 0; - long int _n_close_expected = -1; - int _n_opens = 0; - bool _permanent = false; - // _sectors stored in memory of the files (only the home node is forced to - // be up to date) - std::set, compare> _sectors; - // vector of (tid, fd) + /** + * @struct compareSectors + * @brief Comparator for the sectors set, ordering by offset. + */ + struct compareSectors { + bool operator()(const std::pair &lhs, + const std::pair &rhs) const; + }; + + char *_buf = nullptr; ///< Raw pointer to memory buffer for file content + off64_t _buf_size = 0; ///< Allocated size of _buf + int _fd = -1; ///< File descriptor for permanent/mmap storage + int _n_links = 1; ///< Number of symbolic links to the file + long int _n_close_expected = -1; ///< Target close() operations for commitment + long int _n_close = 0; ///< Current count of close() operations + int _n_opens = 0; ///< Current count of open() operations + int _n_files = 0; ///< Count of dirent64 stored (if directory) + int _n_files_expected = -1; ///< Target dirent64 count (if directory) + + bool _home_node = false; ///< True if this is the home node + bool _directory = false; ///< True if this instance represents a directory + bool _permanent = false; ///< True if file persists after server exit + bool _committed = false; ///< True if file is finalized + bool _first_write = true; ///< True if no data has been written yet + + /// @brief Set of [start, end] pairs representing valid data regions + std::set, compareSectors> _sectors; + + off64_t _real_file_size = 0; ///< Total logical size of the file + + /// @brief List of {Thread ID, FD} pairs associated with this file std::vector> _threads_fd; - bool _committed = false; // whether the file is completed / committed - - /*sync variables*/ - mutable std::mutex _mutex; - mutable std::condition_variable _complete_cv; - mutable std::condition_variable _data_avail_cv; - - off64_t _getStoredSize() const { - auto it = _sectors.rbegin(); - return (it == _sectors.rend()) ? 0 : it->second; - } - bool _first_write = true; - long _n_files = 0; // useful for directories - long _n_files_expected = -1; // useful for directories - - /* - * file size in the home node. In a given moment could not be up-to-date. - * This member is useful because a node different from the home node - * could need to known the size of the file but not its content + + mutable std::mutex _mutex; ///< Synchronization primitive for thread safety + mutable std::condition_variable _committed_cv; ///< Wait for commitment + mutable std::condition_variable _data_avail_cv; ///< Wait for data at specific offsets + + /** + * @brief Internal helper to calculate stored size without locking. + * @return Logical size based on the furthest sector end. */ + off64_t _getStoredSize() const; - std::size_t _real_file_size = 0; + /** + * @brief Reallocates the buffer and copies existing sectors to their correct offsets. + * @param new_p The pointer to the newly allocated memory. + * @param old_p The pointer to the old memory buffer. + */ + void _memcopyCapioFile(char *new_p, char *old_p) const; public: - CapioFile() : _buf_size(0), _directory(false), _permanent(false) {} + /** @brief Default constructor. Initializes an empty file. */ + CapioFile(); - CapioFile(bool directory, long int n_files_expected, bool permanent, off64_t init_size, - long int n_close_expected) - : _buf_size(init_size), _directory(directory), _n_close_expected(n_close_expected), - _permanent(permanent), _n_files_expected(n_files_expected + 2) {} + /** + * @brief Explicit constructor for directory-specific initialization. + * @param directory Whether the file is a directory. + * @param n_files_expected Expected number of entries. + * @param permanent Persistence flag. + * @param init_size Initial buffer allocation size. + * @param n_close_expected Expected number of close calls. + */ + CapioFile(bool directory, int n_files_expected, bool permanent, off64_t init_size, + long int n_close_expected); - CapioFile(bool directory, bool permanent, off64_t init_size, long int n_close_expected = -1) - : _buf_size(init_size), _directory(directory), _n_close_expected(n_close_expected), - _permanent(permanent) {} + /** + * @brief Standard constructor for files. + * @param directory Whether the file is a directory. + * @param permanent Persistence flag. + * @param init_size Initial buffer allocation size. + * @param n_close_expected Expected number of close calls. + */ + CapioFile(bool directory, bool permanent, off64_t init_size, long int n_close_expected); CapioFile(const CapioFile &) = delete; CapioFile &operator=(const CapioFile &) = delete; - ~CapioFile() { - START_LOG(gettid(), "call()"); - LOG("Deleting capio_file"); - - if (_permanent && _home_node) { - if (_directory) { - delete[] _buf; - } else { - int res = munmap(_buf, _buf_size); - if (res == -1) { - ERR_EXIT("munmap CapioFile"); - } - } - } else { - delete[] _buf; - } - } - - [[nodiscard]] bool isCommitted() const { - START_LOG(gettid(), "capio_file is complete? %s", this->_committed ? "true" : "false"); - std::lock_guard lg(_mutex); - return this->_committed; - } - - void waitForCommit() const { - START_LOG(gettid(), "call()"); - LOG("Thread waiting for file to be committed"); - std::unique_lock lock(_mutex); - _complete_cv.wait(lock, [this] { return _committed; }); - } - - void waitForData(long offset) const { - START_LOG(gettid(), "call()"); - LOG("Thread waiting for data to be available"); - std::unique_lock lock(_mutex); - _data_avail_cv.wait(lock, [offset, this] { - return (offset >= this->_getStoredSize()) || this->_committed; - }); - } - - void setCommitted(bool complete = true) { - START_LOG(gettid(), "setting capio_file._complete=%s", complete ? "true" : "false"); - std::lock_guard lg(_mutex); - if (this->_committed != complete) { - this->_committed = complete; - if (this->_committed) { - _complete_cv.notify_all(); - _data_avail_cv.notify_all(); - } - } - } - - void addFd(int tid, int fd) { _threads_fd.emplace_back(tid, fd); } - - void close() { - _n_close++; - _n_opens--; - } - - void dump() { - START_LOG(gettid(), "call()"); - - if (_permanent && !_directory && _home_node) { - off64_t size = getFileSize(); - if (ftruncate(_fd, size) == -1) { - ERR_EXIT("ftruncate commit capio_file"); - } - _buf_size = size; - if (::close(_fd) == -1) { - ERR_EXIT("close commit capio_file"); - } - } - } - - /* - * To be called when a process - * execute a read or a write syscall + /** @brief Destructor. Cleans up allocated buffers and file descriptors. */ + ~CapioFile(); + + /** @return True if the file is committed and read-only. */ + [[nodiscard]] bool isCommitted() const; + + /** @return True if the internal buffer has not yet been allocated. */ + [[nodiscard]] bool bufferToAllocate() const; + + /** @return True if the close count matches expected closes. */ + [[nodiscard]] bool closed() const; + + /** @return True if the file is ready for removal. */ + [[nodiscard]] bool deletable() const; + + /** @return True if this is a directory. */ + [[nodiscard]] bool isDirectory() const; + + /** @return True if no write operations have been performed yet. */ + [[nodiscard]] bool isFirstWrite() const; + + /** @brief Blocks the calling thread until setCommitted() is called. */ + void waitForCommit() const; + + /** + * @brief Blocks until the requested offset is within a valid sector. + * @param offset The file offset to wait for. */ - void createBuffer(const std::filesystem::path &path, bool home_node) { - START_LOG(gettid(), "call(path=%s, home_node=%s)", path.c_str(), - home_node ? "true" : "false"); - std::lock_guard lock(_mutex); - if (this->_buf == nullptr) { - _home_node = home_node; - if (_permanent && home_node) { - if (_directory) { - std::filesystem::create_directory(path); - std::filesystem::permissions(path, std::filesystem::perms::owner_all); - _buf = new char[_buf_size]; - } else { - LOG("creating mem mapped file"); - _fd = ::open(path.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); - if (_fd == -1) { - ERR_EXIT("open %s CapioFile constructor", path.c_str()); - } - if (ftruncate(_fd, _buf_size) == -1) { - ERR_EXIT("ftruncate CapioFile constructor"); - } - _buf = (char *) mmap(nullptr, _buf_size, PROT_READ | PROT_WRITE, MAP_SHARED, - _fd, 0); - if (_buf == MAP_FAILED) { - ERR_EXIT("mmap CapioFile constructor"); - } - } - } else { - _buf = new char[_buf_size]; - } - } - } - - void memcopyCapioFile(char *new_p, char *old_p) const { - for (auto §or : _sectors) { - off64_t lbound = sector.first; - off64_t ubound = sector.second; - off64_t sector_length = ubound - lbound; - memcpy(new_p + lbound, old_p + lbound, sector_length); - } - } - - char *expandBuffer(off64_t data_size) { // TODO: use realloc - off64_t double_size = _buf_size * 2; - off64_t new_size = data_size > double_size ? data_size : double_size; - char *new_buf = new char[new_size]; - std::lock_guard lock(_mutex); - // memcpy(new_p, old_p, file_shm_size); //TODO memcpy only the - // sector - // stored in CapioFile - memcopyCapioFile(new_buf, _buf); - delete[] _buf; - _buf = new_buf; - _buf_size = new_size; - return new_buf; - } - - char *getBuffer() { return _buf; } - - [[nodiscard]] off64_t getBufSize() const { return _buf_size; } - - [[nodiscard]] const std::vector> &getFds() const { return _threads_fd; } - - [[nodiscard]] off64_t getFileSize() const { - std::lock_guard lock(_mutex); - if (!_sectors.empty()) { - return _sectors.rbegin()->second; - } else { - return 0; - } - } - - /* - * Returns the offset to the end of the sector - * if the offset parameter is inside the - * sector, -1 otherwise - * + void waitForData(long offset) const; + + /** + * @brief Marks the file as committed and notifies waiting threads. + * @param commit The new status (defaults to true). */ - [[nodiscard]] off64_t getSectorEnd(off64_t offset) const { - START_LOG(gettid(), "call(offset=%ld)", offset); - - off64_t sector_end = -1; - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - - if (!_sectors.empty() && it != _sectors.begin()) { - --it; - if (offset <= it->second) { - sector_end = it->second; - } - } - - return sector_end; - } - - [[nodiscard]] const std::set, compare> &getSectors() const { - return _sectors; - } - - /* - * get the size of the data stored in this node - * If the node is the home node then this is equals to - * the real size of the file + void setCommitted(bool commit = true); + + /** + * @brief Maps a Thread ID to a specific File Descriptor. + * @param tid Thread ID. + * @param fd File descriptor. */ - [[nodiscard]] off64_t getStoredSize() const { - std::lock_guard lock(_mutex); - return this->_getStoredSize(); - } - - /* - * Insert the new sector automatically modifying the - * existent _sectors if needed. - * - * Params: - * off64_t new_start: the beginning of the sector to insert - * off64_t new_end: the beginning of the sector to insert - * - * new_start must be > new_end otherwise the behaviour - * in undefined - * + void addFd(int tid, int fd); + + /** + * @brief Removes a Thread ID/FD mapping. + * @param tid Thread ID. + * @param fd File descriptor. */ - void insertSector(off64_t new_start, off64_t new_end) { - START_LOG(gettid(), "call(new_start=%ld, new_end=%ld)", new_start, new_end); - - auto p = std::make_pair(new_start, new_end); - std::lock_guard lock(_mutex); - - if (_sectors.empty()) { - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - return; - } - auto it_lbound = _sectors.upper_bound(p); - if (it_lbound == _sectors.begin()) { - if (new_end < it_lbound->first) { - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } else { - auto it = it_lbound; - bool end_before = false; - bool end_inside = false; - while (it != _sectors.end() && !end_before && !end_inside) { - end_before = p.second < it->first; - if (!end_before) { - end_inside = p.second <= it->second; - if (!end_inside) { - ++it; - } - } - } - - if (end_inside) { - p.second = it->second; - ++it; - } - _sectors.erase(it_lbound, it); - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } - } else { - --it_lbound; - auto it = it_lbound; - if (p.first <= it_lbound->second) { - // new sector starts inside a sector - p.first = it_lbound->first; - } else { // in this way the sector will not be deleted - ++it_lbound; - } - bool end_before = false; - bool end_inside = false; - while (it != _sectors.end() && !end_before && !end_inside) { - end_before = p.second < it->first; - if (!end_before) { - end_inside = p.second <= it->second; - if (!end_inside) { - ++it; - } - } - } - - if (end_inside) { - p.second = it->second; - ++it; - } - _sectors.erase(it_lbound, it); - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } - } - - [[nodiscard]] bool closed() const { - return _n_close_expected == -1 || _n_close == _n_close_expected; - } - - [[nodiscard]] bool deletable() const { return _n_opens <= 0; } - - [[nodiscard]] bool isDirectory() const { return _directory; } - - void open() { _n_opens++; } - - /* - * From the manual: - * - * Adjust the file offset to the next location in the file - * greater than or equal to offset containing data. If - * offset points to data, then the file offset is set to - * offset. - * - * Fails if offset points past the end of the file. - * + void removeFd(int tid, int fd); + + /** @brief Increments the internal open counter. */ + void open(); + + /** @brief Increments the _n_close counter, while decrementing the _n_open counter. */ + void close(); + + /** + * @brief Initializes the memory buffer or mmap area. + * @param path Path to the file. + * @param home_node Whether this node is the home for the file. */ - off64_t seekData(off64_t offset) { - if (_sectors.empty()) { - if (offset == 0) { - return 0; - } else { - return -1; - } - } - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - if (it == _sectors.begin()) { - return it->first; - } - --it; - if (offset <= it->second) { - return offset; - } else { - ++it; - if (it == _sectors.end()) { - return -1; - } else { - return it->first; - } - } - } - - /* - * From the manual: - * - * Adjust the file offset to the next hole in the file - * greater than or equal to offset. If offset points into - * the middle of a hole, then the file offset is set to - * offset. If there is no hole past offset, then the file - * offset is adjusted to the end of the file (i.e., there is - * an implicit hole at the end of any file). - * - * - * Fails if offset points past the end of the file. - * + void createBuffer(const std::filesystem::path &path, bool home_node); + + /** + * @brief Resizes the internal buffer to accommodate more data. + * @param data_size Required additional size. + * @return Pointer to the expanded buffer. + */ + char *expandBuffer(off64_t data_size); + + /** @brief Dump _buf buffer to the file system. */ + void dump(); + + /** + * @brief Tracks a new data range in the file. + * @param new_start Starting offset of the data. + * @param new_end Ending offset of the data. + */ + void insertSector(off64_t new_start, off64_t new_end); + + /** + * @brief Fetches data from a remote CAPIO node. + * @param dest Destination node identifier. + * @param offset File offset to read from. + * @param buffer_size Amount of data to fetch. + */ + void readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) const; + + /** + * @brief Transfers data from an SPSC queue into the file buffer. + * @param queue Source queue. + * @param offset Destination offset in this file. + * @param num_bytes Number of bytes to transfer. + */ + void readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) const; + + /** @brief Explicitly sets the total file size. */ + void setRealFileSize(off64_t size); + + /** @brief Marks that at least one write has occurred. */ + void registerFirstWrite(); + + /** + * @brief Increases the count of files contained in this directory. + * @param count the number to increase the internal counter + */ + void incrementDirectoryFileCount(int count = 1); + + /** @return Pointer to the raw memory buffer. */ + char *getBuffer() const; + + /** @return Vector of TID/FD pairs. */ + [[nodiscard]] const std::vector> &getFds() const; + + /** @return The physical size of the current buffer. */ + [[nodiscard]] off64_t getBufSize() const; + + /** @return The logical size of the file. */ + [[nodiscard]] off64_t getRealFileSize() const; + + /** @return The total size, accounting for holes and metadata. */ + [[nodiscard]] off64_t getFileSize() const; + + /** @return Size of data currently residing on this node. */ + [[nodiscard]] off64_t getStoredSize() const; + + /** @return Count of files currently indexed in this directory. */ + [[nodiscard]] int getCurrentDirectoryFileCount() const; + + /** @return Expected total files in this directory. */ + [[nodiscard]] int getDirectoryExpectedFileCount() const; + + /** @return Reference to the internal sector map. */ + [[nodiscard]] const std::set, compareSectors> &getSectors() const; + + /** + * @brief Finds the end of the sector containing the offset. + * @param offset Position to check. + * @return End offset of the sector, or -1 if in a hole. + */ + [[nodiscard]] off64_t getSectorEnd(off64_t offset) const; + + /** + * @brief Finds the next data segment. + * @param offset Start searching from here. + * @return Offset of data, or error if beyond end of file. */ - [[nodiscard]] off64_t seekHole(off64_t offset) const { - if (_sectors.empty()) { - if (offset == 0) { - return 0; - } else { - return -1; - } - } - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - if (it == _sectors.begin()) { - return offset; - } - --it; - if (offset <= it->second) { - return it->second; - } else { - ++it; - if (it == _sectors.end()) { - return -1; - } else { - return offset; - } - } - } - - void removeFd(int tid, int fd) { - auto it = std::find(_threads_fd.begin(), _threads_fd.end(), std::make_pair(tid, fd)); - if (it != _threads_fd.end()) { - _threads_fd.erase(it); - } - } + off64_t seekData(off64_t offset); /** - * Save data inside the capio_file buffer - * @param buffer - * @return + * @brief Finds the next hole in the file. + * @param offset Start searching from here. + * @return Offset of the hole. */ - void readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) { - std::unique_lock lock(_mutex); - backend->recv_file(_buf + offset, dest, buffer_size); - _data_avail_cv.notify_all(); - } - - void readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) { - START_LOG(gettid(), "call()"); - - std::unique_lock lock(_mutex); - queue.read(_buf + offset, num_bytes); - _data_avail_cv.notify_all(); - } - - // TODO: Understand the scope of this method and find a better name - std::size_t getRealFileSize() const { - std::lock_guard lg(_mutex); - return this->_real_file_size; - } - - void setRealFileSize(const off64_t size) { - std::lock_guard lg(_mutex); - this->_real_file_size = size; - } - - bool isFirstWrite() const { - std::lock_guard lg(_mutex); - return this->_first_write; - } - - // TODO: add a dedicated CapioFile::write() and CapioFile::read() method, remove this method - // from public scope - void registerFirstWrite() { - std::lock_guard lg(_mutex); - this->_first_write = false; - } - - void incrementDirectoryFileCount(const int count = 1) { - std::lock_guard lg(_mutex); - this->_n_files += count; - } - - long getCurrentDirectoryFileCount() const { - std::lock_guard lg(_mutex); - return this->_n_files; - } - - long getDirectoryExpectedFileCount() const { - std::lock_guard lg(_mutex); - return this->_n_files_expected; - } + [[nodiscard]] off64_t seekHole(off64_t offset) const; }; -#endif // CAPIO_SERVER_STORAGE_CAPIO_FILE_HPP \ No newline at end of file +#endif // CAPIO_SERVER_CAPIO_FILE_HPP \ No newline at end of file diff --git a/capio/server/include/utils/location.hpp b/capio/server/include/utils/location.hpp index c43489a7e..5d573f950 100644 --- a/capio/server/include/utils/location.hpp +++ b/capio/server/include/utils/location.hpp @@ -1,12 +1,15 @@ #ifndef CAPIO_SERVER_UTILS_LOCATIONS_HPP #define CAPIO_SERVER_UTILS_LOCATIONS_HPP +#include "remote/backend.hpp" + #include #include #include "utils/types.hpp" extern char *node_name; +extern Backend *backend; constexpr char CAPIO_SERVER_FILES_LOCATION_NAME[] = "files_location_%s.txt"; constexpr char CAPIO_SERVER_INVALIDATE_FILE_PATH_CHAR = '#'; diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp new file mode 100644 index 000000000..048a8de95 --- /dev/null +++ b/capio/server/src/capio_file.cpp @@ -0,0 +1,383 @@ +#include "server/include/storage/capio_file.hpp" +#include "common/logger.hpp" +#include "remote/backend.hpp" +#include "server/include/utils/common.hpp" +#include "utils/common.hpp" + +#include + +bool CapioFile::compareSectors::operator()(const std::pair &lhs, + const std::pair &rhs) const { + return (lhs.first < rhs.first); +} + +CapioFile::CapioFile() = default; + +CapioFile::CapioFile(const bool directory, const int n_files_expected, const bool permanent, + const off64_t init_size, const long int n_close_expected) + : _buf_size(init_size), _n_close_expected(n_close_expected), + _n_files_expected(n_files_expected + 2), _directory(directory), _permanent(permanent) {} + +CapioFile::CapioFile(const bool directory, const bool permanent, const off64_t init_size, + const long int n_close_expected) + : _buf_size(init_size), _n_close_expected(n_close_expected), _directory(directory), + _permanent(permanent) {} + +CapioFile::~CapioFile() { + START_LOG(gettid(), "call()"); + LOG("Deleting capio_file"); + + if (_permanent && _home_node) { + if (_directory) { + delete[] _buf; + } else { + if (munmap(_buf, _buf_size) == -1) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "WARN: unable to unmap CapioFile: " + std::string(strerror(errno))); + } + } + } else { + delete[] _buf; + } +} + +bool CapioFile::isCommitted() const { + START_LOG(gettid(), "capio_file is complete? %s", this->_committed ? "true" : "false"); + std::lock_guard lg(_mutex); + return this->_committed; +} + +void CapioFile::waitForData(long offset) const { + START_LOG(gettid(), "call()"); + LOG("Thread waiting for data to be available"); + std::unique_lock lock(_mutex); + _data_avail_cv.wait(lock, + [offset, this] { return this->_getStoredSize() >= offset || _committed; }); +} + +void CapioFile::setCommitted(bool commit) { + START_LOG(gettid(), "setting capio_file._complete=%s", commit ? "true" : "false"); + std::lock_guard lg(_mutex); + if (this->_committed != commit) { + this->_committed = commit; + if (this->_committed) { + _committed_cv.notify_all(); + _data_avail_cv.notify_all(); + } + } +} + +void CapioFile::addFd(int tid, int fd) { _threads_fd.emplace_back(tid, fd); } + +void CapioFile::waitForCommit() const { + START_LOG(gettid(), "call()"); + LOG("Thread waiting for file to be committed"); + std::unique_lock lock(_mutex); + _committed_cv.wait(lock, [this] { return _committed; }); +} + +void CapioFile::close() { + _n_close++; + _n_opens--; +} + +void CapioFile::dump() { + START_LOG(gettid(), "call()"); + + if (_permanent && !_directory && _home_node) { + off64_t size = getFileSize(); + if (ftruncate(_fd, size) == -1) { + ERR_EXIT("ftruncate commit capio_file"); + } + _buf_size = size; + if (::close(_fd) == -1) { + ERR_EXIT("close commit capio_file"); + } + } +} + +void CapioFile::createBuffer(const std::filesystem::path &path, const bool home_node) { + START_LOG(gettid(), "call(path=%s, home_node=%s)", path.c_str(), home_node ? "true" : "false"); + std::lock_guard lock(_mutex); + if (this->_buf == nullptr) { + _home_node = home_node; + if (_permanent && home_node) { + if (_directory) { + std::filesystem::create_directory(path); + std::filesystem::permissions(path, std::filesystem::perms::owner_all); + _buf = new char[_buf_size]; + } else { + LOG("creating mem mapped file"); + _fd = ::open(path.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); + if (_fd == -1) { + ERR_EXIT("open %s CapioFile constructor", path.c_str()); + } + if (ftruncate(_fd, _buf_size) == -1) { + ERR_EXIT("ftruncate CapioFile constructor"); + } + _buf = + (char *) mmap(nullptr, _buf_size, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0); + if (_buf == MAP_FAILED) { + ERR_EXIT("mmap CapioFile constructor"); + } + } + } else { + _buf = new char[_buf_size]; + } + } +} + +void CapioFile::_memcopyCapioFile(char *new_p, char *old_p) const { + for (auto §or : _sectors) { + off64_t lbound = sector.first; + off64_t ubound = sector.second; + off64_t sector_length = ubound - lbound; + memcpy(new_p + lbound, old_p + lbound, sector_length); + } +} + +char *CapioFile::expandBuffer(const off64_t data_size) { + const off64_t double_size = _buf_size * 2; + const off64_t new_size = std::max(data_size, double_size); + const auto new_buf = new char[new_size]; + std::lock_guard lock(_mutex); + _memcopyCapioFile(new_buf, _buf); + delete[] _buf; + _buf = new_buf; + _buf_size = new_size; + return new_buf; +} + +char *CapioFile::getBuffer() const { return _buf; } + +off64_t CapioFile::getBufSize() const { return _buf_size; } + +const std::vector> &CapioFile::getFds() const { return _threads_fd; } + +off64_t CapioFile::getFileSize() const { + std::lock_guard lock(_mutex); + if (!_sectors.empty()) { + return _sectors.rbegin()->second; + } else { + return 0; + } +} + +off64_t CapioFile::getSectorEnd(off64_t offset) const { + START_LOG(gettid(), "call(offset=%ld)", offset); + + off64_t sector_end = -1; + auto it = _sectors.upper_bound(std::make_pair(offset, 0)); + + if (!_sectors.empty() && it != _sectors.begin()) { + --it; + if (offset <= it->second) { + sector_end = it->second; + } + } + + return sector_end; +} + +const std::set, CapioFile::compareSectors> & +CapioFile::getSectors() const { + return _sectors; +} + +off64_t CapioFile::getStoredSize() const { + std::lock_guard lock(_mutex); + return this->_getStoredSize(); +} + +void CapioFile::insertSector(off64_t new_start, off64_t new_end) { + START_LOG(gettid(), "call(new_start=%ld, new_end=%ld)", new_start, new_end); + + auto p = std::make_pair(new_start, new_end); + std::lock_guard lock(_mutex); + + if (_sectors.empty()) { + LOG("Insert sector <%ld, %ld>", p.first, p.second); + _sectors.insert(p); + return; + } + auto it_lbound = _sectors.upper_bound(p); + if (it_lbound == _sectors.begin()) { + if (new_end < it_lbound->first) { + LOG("Insert sector <%ld, %ld>", p.first, p.second); + _sectors.insert(p); + } else { + auto it = it_lbound; + bool end_before = false; + bool end_inside = false; + while (it != _sectors.end() && !end_before && !end_inside) { + end_before = p.second < it->first; + if (!end_before) { + end_inside = p.second <= it->second; + if (!end_inside) { + ++it; + } + } + } + + if (end_inside) { + p.second = it->second; + ++it; + } + _sectors.erase(it_lbound, it); + LOG("Insert sector <%ld, %ld>", p.first, p.second); + _sectors.insert(p); + } + } else { + --it_lbound; + auto it = it_lbound; + if (p.first <= it_lbound->second) { + // new sector starts inside a sector + p.first = it_lbound->first; + } else { // in this way the sector will not be deleted + ++it_lbound; + } + bool end_before = false; + bool end_inside = false; + while (it != _sectors.end() && !end_before && !end_inside) { + end_before = p.second < it->first; + if (!end_before) { + end_inside = p.second <= it->second; + if (!end_inside) { + ++it; + } + } + } + + if (end_inside) { + p.second = it->second; + ++it; + } + _sectors.erase(it_lbound, it); + LOG("Insert sector <%ld, %ld>", p.first, p.second); + _sectors.insert(p); + } +} + +bool CapioFile::closed() const { return _n_close_expected == -1 || _n_close == _n_close_expected; } + +bool CapioFile::deletable() const { return _n_opens <= 0; } + +bool CapioFile::isDirectory() const { return _directory; } + +void CapioFile::open() { _n_opens++; } + +off64_t CapioFile::seekData(off64_t offset) { + if (_sectors.empty()) { + if (offset == 0) { + return 0; + } else { + return -1; + } + } + auto it = _sectors.upper_bound(std::make_pair(offset, 0)); + if (it == _sectors.begin()) { + return it->first; + } + --it; + if (offset <= it->second) { + return offset; + } else { + ++it; + if (it == _sectors.end()) { + return -1; + } else { + return it->first; + } + } +} + +off64_t CapioFile::seekHole(off64_t offset) const { + if (_sectors.empty()) { + if (offset == 0) { + return 0; + } else { + return -1; + } + } + auto it = _sectors.upper_bound(std::make_pair(offset, 0)); + if (it == _sectors.begin()) { + return offset; + } + --it; + if (offset <= it->second) { + return it->second; + } else { + ++it; + if (it == _sectors.end()) { + return -1; + } else { + return offset; + } + } +} + +void CapioFile::removeFd(int tid, int fd) { + auto it = std::find(_threads_fd.begin(), _threads_fd.end(), std::make_pair(tid, fd)); + if (it != _threads_fd.end()) { + _threads_fd.erase(it); + } +} + +void CapioFile::readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) const { + std::unique_lock lock(_mutex); + backend->recv_file(_buf + offset, dest, buffer_size); + _data_avail_cv.notify_all(); +} + +void CapioFile::readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) const { + START_LOG(gettid(), "call()"); + + std::unique_lock lock(_mutex); + queue.read(_buf + offset, num_bytes); + _data_avail_cv.notify_all(); +} + +off64_t CapioFile::_getStoredSize() const { + const auto it = _sectors.rbegin(); + return (it == _sectors.rend()) ? 0 : it->second; +} + +bool CapioFile::bufferToAllocate() const { + std::lock_guard lg(_mutex); + return _buf == nullptr; +} + +off64_t CapioFile::getRealFileSize() const { + std::lock_guard lg(_mutex); + return this->_real_file_size; +} + +void CapioFile::setRealFileSize(const off64_t size) { + std::lock_guard lg(_mutex); + this->_real_file_size = size; +} + +bool CapioFile::isFirstWrite() const { + std::lock_guard lg(_mutex); + return this->_first_write; +} + +void CapioFile::registerFirstWrite() { + std::lock_guard lg(_mutex); + this->_first_write = false; +} + +void CapioFile::incrementDirectoryFileCount(const int count) { + std::lock_guard lg(_mutex); + this->_n_files += count; +} + +int CapioFile::getCurrentDirectoryFileCount() const { + std::lock_guard lg(_mutex); + return this->_n_files; +} + +int CapioFile::getDirectoryExpectedFileCount() const { + std::lock_guard lg(_mutex); + return this->_n_files_expected; +} diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index c1e6d4c62..29bc25c79 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -1,8 +1,8 @@ -#ifndef CAPIO_CAPIO_FILE_HPP -#define CAPIO_CAPIO_FILE_HPP #include "server/include/storage/capio_file.hpp" #include "common/env.hpp" + #include +#include TEST(ServerTest, TestInsertSingleSector) { CapioFile c_file; @@ -59,4 +59,219 @@ TEST(ServerTest, TestInsertTwoOverlappingSectorsNested) { EXPECT_EQ(sectors.size(), 1); EXPECT_NE(sectors.find({1L, 4L}), sectors.end()); } -#endif // CAPIO_CAPIO_FILE_HPP + +TEST(ServerTest, TestDestructionOfPermanentCapioFile) { + auto *c_file = new CapioFile(false, true, 1000, 1); + c_file->createBuffer("test.dat", true); + delete c_file; + EXPECT_TRUE(std::filesystem::exists("test.dat")); + std::filesystem::remove("test.dat"); +} + +TEST(ServerTest, TestDestructionOfPermanentCapioFileDirectory) { + auto *c_file = new CapioFile(true, true, 1000, 1); + c_file->createBuffer("mydirectory", true); + delete c_file; + EXPECT_TRUE(std::filesystem::exists("mydirectory")); + EXPECT_TRUE(std::filesystem::is_directory("mydirectory")); + std::filesystem::remove("mydirectory"); +} + +TEST(ServerTest, TestCapioFileWaitForDataMultithreaded) { + CapioFile file; + + SPSCQueue queue("test_queue", get_cache_lines(), get_cache_line_size(), "test_wf"); + + std::mutex _lock; + _lock.lock(); + + std::thread t([&_lock, &file, &queue] { + _lock.lock(); + file.expandBuffer(1000); + file.registerFirstWrite(); + + EXPECT_NE(file.getBuffer(), nullptr); + char buffer[1000]; + for (std::size_t i = 0; i < 1000; ++i) { + buffer[i] = 33 + (i % 93); + } + + queue.write(buffer, 1000); + file.insertSector(0, 1000); + file.readFromQueue(queue, 0, 1000); + }); + + _lock.unlock(); + file.waitForData(1000); + + auto stored_size = file.getFileSize(); + EXPECT_EQ(stored_size, 1000); + + const auto buf = file.getBuffer(); + EXPECT_NE(buf, nullptr); + for (std::size_t i = 0; i < 1000; ++i) { + EXPECT_EQ(buf[i], 33 + (i % 93)); + } + + t.join(); +} + +TEST(ServerTest, TestCapioFileWaitForCompletion) { + CapioFile file; + + std::mutex _lock; + _lock.lock(); + + std::thread t([&] { + _lock.lock(); + file.expandBuffer(1000); + file.registerFirstWrite(); + + EXPECT_NE(file.getBuffer(), nullptr); + char buffer[1000]; + for (std::size_t i = 0; i < 1000; ++i) { + buffer[i] = 33 + (i % 93); + } + + memcpy(file.getBuffer(), buffer, 1000); + + file.insertSector(0, 1000); + file.setCommitted(); + }); + + _lock.unlock(); + file.waitForCommit(); + + auto stored_size = file.getFileSize(); + EXPECT_EQ(stored_size, 1000); + + const auto buf = file.getBuffer(); + EXPECT_NE(buf, nullptr); + for (std::size_t i = 0; i < 1000; ++i) { + EXPECT_EQ(buf[i], 33 + (i % 93)); + } + + t.join(); +} + +TEST(ServerTest, TestCommitCapioFile) { + auto file = new CapioFile(false, true, 1000, 1); + file->createBuffer("test.dat", true); + EXPECT_EQ(std::filesystem::file_size("test.dat"), 1000); + file->close(); + file->dump(); + EXPECT_EQ(std::filesystem::file_size("test.dat"), 0); + delete file; + EXPECT_TRUE(std::filesystem::exists("test.dat")); + std::filesystem::remove("test.dat"); +} + +TEST(ServerTest, TestCommitAndDeleteDirectory) { + EXPECT_FALSE(std::filesystem::exists("mydir")); + auto file = new CapioFile(true, true, 1000, 1); + file->createBuffer("mydir", true); + EXPECT_TRUE(std::filesystem::exists("mydir")); + EXPECT_TRUE(std::filesystem::is_directory("mydir")); + delete file; + EXPECT_TRUE(std::filesystem::exists("mydir")); + std::filesystem::remove("mydir"); +} + +TEST(ServerTest, TesMemcpyCapioFile) { + CapioFile file; + + file.createBuffer("test.dat", true); + + // NOTE: here we only simulate a write operation on the file, without actually writing to _buf + file.insertSector(0, 100); + file.insertSector(100, 200); + + file.expandBuffer(2000); + + EXPECT_EQ(file.getStoredSize(), 200); + EXPECT_EQ(file.getBufSize(), 2000); +} + +TEST(ServerTest, TestCloseCapioFile) { + CapioFile file(false, false, 0, -1); + EXPECT_TRUE(file.closed()); // TEST for n_close_expected == -1 + + CapioFile file1(false, false, 0, 10); + EXPECT_FALSE(file1.closed()); + for (std::size_t i = 0; i < 10; ++i) { + file1.open(); + EXPECT_FALSE(file1.closed()); + file1.close(); + } + EXPECT_TRUE(file1.closed()); +} + +TEST(ServerTest, TestCapioFileSeekData) { + CapioFile file; + + EXPECT_EQ(file.seekData(100), -1); + EXPECT_EQ(file.seekData(0), 0); + + file.insertSector(0, 1000); + EXPECT_EQ(file.seekData(100), 100); + EXPECT_EQ(file.seekData(2000), -1); + file.insertSector(2000, 3000); + EXPECT_NE(file.seekData(1500), 1500); // return here the closest offset... + + CapioFile file1; + file1.insertSector(200, 300); + EXPECT_EQ(file1.seekData(1), 200); +} + +TEST(ServerTest, TestCapioFileSeekHole) { + CapioFile file; + + EXPECT_EQ(file.seekHole(100), -1); + EXPECT_EQ(file.seekHole(0), 0); + file.insertSector(0, 1000); + EXPECT_EQ(file.seekHole(100), 1000); + EXPECT_EQ(file.seekHole(2000), -1); + file.insertSector(2000, 3000); + EXPECT_EQ(file.seekHole(1500), 1500); // return here the closest offset... + + CapioFile file1; + file1.insertSector(200, 300); + EXPECT_EQ(file1.seekHole(1), 1); +} + +TEST(ServerTest, TestAddAndRemoveFD) { + CapioFile file; + file.addFd(12345, 4); + file.addFd(12345, 5); + + file.removeFd(12345, 6); + EXPECT_EQ(file.getFds().size(), 2); + file.removeFd(12345, 5); + EXPECT_EQ(file.getFds().size(), 1); + file.removeFd(12345, 4); + EXPECT_EQ(file.getFds().size(), 0); +} + +TEST(ServerTest, TestSetGetRealFileSize) { + CapioFile file; + EXPECT_EQ(file.getRealFileSize(), 0); + file.setRealFileSize(1234); + EXPECT_EQ(file.getRealFileSize(), 1234); +} + +TEST(ServerTest, TestDeletePermanentDirectory) { + const auto file = new CapioFile(true, true, 1000, 1); + file->createBuffer("testDir", true); + delete file; + EXPECT_TRUE(std::filesystem::exists("testDir")); + EXPECT_TRUE(std::filesystem::is_directory("testDir")); + std::filesystem::remove("testDir"); +} + +TEST(ServerTest, TestFileSetCommitToFalse) { + CapioFile file; + file.setCommitted(); + EXPECT_TRUE(file.isCommitted()); + file.setCommitted(false); + EXPECT_FALSE(file.isCommitted()); +} From 6fc2a7e86f7d3c4c657f90bc84b144e958489014 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 18 Mar 2026 22:57:50 +0000 Subject: [PATCH 02/15] Removed useless locks --- capio/server/include/storage/capio_file.hpp | 30 ++++++++++----------- capio/server/src/capio_file.cpp | 5 +--- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index 949f3efd4..faed83b07 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -25,21 +25,21 @@ class CapioFile { const std::pair &rhs) const; }; - char *_buf = nullptr; ///< Raw pointer to memory buffer for file content - off64_t _buf_size = 0; ///< Allocated size of _buf - int _fd = -1; ///< File descriptor for permanent/mmap storage - int _n_links = 1; ///< Number of symbolic links to the file - long int _n_close_expected = -1; ///< Target close() operations for commitment - long int _n_close = 0; ///< Current count of close() operations - int _n_opens = 0; ///< Current count of open() operations - int _n_files = 0; ///< Count of dirent64 stored (if directory) - int _n_files_expected = -1; ///< Target dirent64 count (if directory) - - bool _home_node = false; ///< True if this is the home node - bool _directory = false; ///< True if this instance represents a directory - bool _permanent = false; ///< True if file persists after server exit - bool _committed = false; ///< True if file is finalized - bool _first_write = true; ///< True if no data has been written yet + char *_buf = nullptr; ///< Raw pointer to memory buffer for file content + off64_t _buf_size = 0; ///< Allocated size of _buf + int _fd = -1; ///< File descriptor for permanent/mmap storage + int _n_links = 1; ///< Number of symbolic links to the file + const long int _n_close_expected = -1; ///< Target close() operations for commitment + long int _n_close = 0; ///< Current count of close() operations + int _n_opens = 0; ///< Current count of open() operations + int _n_files = 0; ///< Count of dirent64 stored (if directory) + const int _n_files_expected = -1; ///< Target dirent64 count (if directory) + + bool _home_node = false; ///< True if this is the home node + const bool _directory = false; ///< True if this instance represents a directory + const bool _permanent = false; ///< True if file persists after server exit + bool _committed = false; ///< True if file is finalized + bool _first_write = true; ///< True if no data has been written yet /// @brief Set of [start, end] pairs representing valid data regions std::set, compareSectors> _sectors; diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 048a8de95..8505e807f 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -377,7 +377,4 @@ int CapioFile::getCurrentDirectoryFileCount() const { return this->_n_files; } -int CapioFile::getDirectoryExpectedFileCount() const { - std::lock_guard lg(_mutex); - return this->_n_files_expected; -} +int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; } From a4e73597f2a7d128dc72b57f1215aa7ff1c6a17c Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 08:51:01 +0000 Subject: [PATCH 03/15] Added mmissing lock guards --- capio/server/include/storage/capio_file.hpp | 21 +++--- capio/server/src/capio_file.cpp | 84 ++++++++++----------- 2 files changed, 49 insertions(+), 56 deletions(-) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index faed83b07..b0a4f0aa2 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -1,6 +1,7 @@ #ifndef CAPIO_SERVER_CAPIO_FILE_HPP #define CAPIO_SERVER_CAPIO_FILE_HPP +#include #include #include #include @@ -30,21 +31,21 @@ class CapioFile { int _fd = -1; ///< File descriptor for permanent/mmap storage int _n_links = 1; ///< Number of symbolic links to the file const long int _n_close_expected = -1; ///< Target close() operations for commitment - long int _n_close = 0; ///< Current count of close() operations - int _n_opens = 0; ///< Current count of open() operations - int _n_files = 0; ///< Count of dirent64 stored (if directory) + std::atomic _n_close = 0; ///< Current count of close() operations + std::atomic _n_opens = 0; ///< Current count of open() operations + std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) const int _n_files_expected = -1; ///< Target dirent64 count (if directory) - bool _home_node = false; ///< True if this is the home node - const bool _directory = false; ///< True if this instance represents a directory - const bool _permanent = false; ///< True if file persists after server exit - bool _committed = false; ///< True if file is finalized - bool _first_write = true; ///< True if no data has been written yet + bool _home_node = false; ///< True if this is the home node + const bool _directory = false; ///< True if this instance represents a directory + const bool _permanent = false; ///< True if file persists after server exit + bool _committed = false; ///< True if file is finalized + std::atomic _first_write = true; ///< True if no data has been written yet /// @brief Set of [start, end] pairs representing valid data regions std::set, compareSectors> _sectors; - off64_t _real_file_size = 0; ///< Total logical size of the file + std::atomic _real_file_size = 0; ///< Total logical size of the file /// @brief List of {Thread ID, FD} pairs associated with this file std::vector> _threads_fd; @@ -64,7 +65,7 @@ class CapioFile { * @param new_p The pointer to the newly allocated memory. * @param old_p The pointer to the old memory buffer. */ - void _memcopyCapioFile(char *new_p, char *old_p) const; + void _memcopyCapioFile(char *new_p, const char *old_p) const; public: /** @brief Default constructor. Initializes an empty file. */ diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 8505e807f..486748411 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -2,7 +2,6 @@ #include "common/logger.hpp" #include "remote/backend.hpp" #include "server/include/utils/common.hpp" -#include "utils/common.hpp" #include @@ -55,7 +54,7 @@ void CapioFile::waitForData(long offset) const { [offset, this] { return this->_getStoredSize() >= offset || _committed; }); } -void CapioFile::setCommitted(bool commit) { +void CapioFile::setCommitted(const bool commit) { START_LOG(gettid(), "setting capio_file._complete=%s", commit ? "true" : "false"); std::lock_guard lg(_mutex); if (this->_committed != commit) { @@ -67,7 +66,11 @@ void CapioFile::setCommitted(bool commit) { } } -void CapioFile::addFd(int tid, int fd) { _threads_fd.emplace_back(tid, fd); } +void CapioFile::addFd(int tid, int fd) { + START_LOG(gettid(), "call(tid=%d, fd=%d)", tid, fd); + std::lock_guard lg(_mutex); + _threads_fd.emplace_back(tid, fd); +} void CapioFile::waitForCommit() const { START_LOG(gettid(), "call()"); @@ -77,15 +80,17 @@ void CapioFile::waitForCommit() const { } void CapioFile::close() { - _n_close++; - _n_opens--; + START_LOG(gettid(), "call()"); + ++_n_close; + --_n_opens; + LOG("AFTER: _n_close=%d / _n_opens=%d", _n_close.load(), _n_opens.load()); } void CapioFile::dump() { START_LOG(gettid(), "call()"); if (_permanent && !_directory && _home_node) { - off64_t size = getFileSize(); + const off64_t size = getFileSize(); if (ftruncate(_fd, size) == -1) { ERR_EXIT("ftruncate commit capio_file"); } @@ -115,8 +120,8 @@ void CapioFile::createBuffer(const std::filesystem::path &path, const bool home_ if (ftruncate(_fd, _buf_size) == -1) { ERR_EXIT("ftruncate CapioFile constructor"); } - _buf = - (char *) mmap(nullptr, _buf_size, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0); + _buf = static_cast( + mmap(nullptr, _buf_size, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0)); if (_buf == MAP_FAILED) { ERR_EXIT("mmap CapioFile constructor"); } @@ -127,12 +132,12 @@ void CapioFile::createBuffer(const std::filesystem::path &path, const bool home_ } } -void CapioFile::_memcopyCapioFile(char *new_p, char *old_p) const { - for (auto §or : _sectors) { - off64_t lbound = sector.first; - off64_t ubound = sector.second; - off64_t sector_length = ubound - lbound; - memcpy(new_p + lbound, old_p + lbound, sector_length); +void CapioFile::_memcopyCapioFile(char *new_p, const char *old_p) const { + // NOTE: do not lock here: this method is private and when reaching control in this function, + // the caller has already locked the mutex + START_LOG(gettid(), "call()"); + for (const auto &[lbound, ubound] : _sectors) { + memcpy(new_p + lbound, old_p + lbound, ubound - lbound); } } @@ -140,6 +145,7 @@ char *CapioFile::expandBuffer(const off64_t data_size) { const off64_t double_size = _buf_size * 2; const off64_t new_size = std::max(data_size, double_size); const auto new_buf = new char[new_size]; + std::lock_guard lock(_mutex); _memcopyCapioFile(new_buf, _buf); delete[] _buf; @@ -167,9 +173,9 @@ off64_t CapioFile::getSectorEnd(off64_t offset) const { START_LOG(gettid(), "call(offset=%ld)", offset); off64_t sector_end = -1; - auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - - if (!_sectors.empty() && it != _sectors.begin()) { + std::lock_guard lock(_mutex); + if (auto it = _sectors.upper_bound(std::make_pair(offset, 0)); + !_sectors.empty() && it != _sectors.begin()) { --it; if (offset <= it->second) { sector_end = it->second; @@ -200,8 +206,7 @@ void CapioFile::insertSector(off64_t new_start, off64_t new_end) { _sectors.insert(p); return; } - auto it_lbound = _sectors.upper_bound(p); - if (it_lbound == _sectors.begin()) { + if (auto it_lbound = _sectors.upper_bound(p); it_lbound == _sectors.begin()) { if (new_end < it_lbound->first) { LOG("Insert sector <%ld, %ld>", p.first, p.second); _sectors.insert(p); @@ -264,9 +269,11 @@ bool CapioFile::deletable() const { return _n_opens <= 0; } bool CapioFile::isDirectory() const { return _directory; } -void CapioFile::open() { _n_opens++; } +void CapioFile::open() { ++_n_opens; } off64_t CapioFile::seekData(off64_t offset) { + START_LOG(gettid(), "call(offset=%ld)", offset); + std::lock_guard lock(_mutex); if (_sectors.empty()) { if (offset == 0) { return 0; @@ -292,6 +299,8 @@ off64_t CapioFile::seekData(off64_t offset) { } off64_t CapioFile::seekHole(off64_t offset) const { + START_LOG(gettid(), "call(offset=%ld)", offset); + std::lock_guard lock(_mutex); if (_sectors.empty()) { if (offset == 0) { return 0; @@ -317,8 +326,9 @@ off64_t CapioFile::seekHole(off64_t offset) const { } void CapioFile::removeFd(int tid, int fd) { - auto it = std::find(_threads_fd.begin(), _threads_fd.end(), std::make_pair(tid, fd)); - if (it != _threads_fd.end()) { + std::unique_lock lock(_mutex); + if (const auto it = std::find(_threads_fd.begin(), _threads_fd.end(), std::make_pair(tid, fd)); + it != _threads_fd.end()) { _threads_fd.erase(it); } } @@ -347,34 +357,16 @@ bool CapioFile::bufferToAllocate() const { return _buf == nullptr; } -off64_t CapioFile::getRealFileSize() const { - std::lock_guard lg(_mutex); - return this->_real_file_size; -} +off64_t CapioFile::getRealFileSize() const { return this->_real_file_size; } -void CapioFile::setRealFileSize(const off64_t size) { - std::lock_guard lg(_mutex); - this->_real_file_size = size; -} +void CapioFile::setRealFileSize(const off64_t size) { this->_real_file_size = size; } -bool CapioFile::isFirstWrite() const { - std::lock_guard lg(_mutex); - return this->_first_write; -} +bool CapioFile::isFirstWrite() const { return this->_first_write; } -void CapioFile::registerFirstWrite() { - std::lock_guard lg(_mutex); - this->_first_write = false; -} +void CapioFile::registerFirstWrite() { this->_first_write = false; } -void CapioFile::incrementDirectoryFileCount(const int count) { - std::lock_guard lg(_mutex); - this->_n_files += count; -} +void CapioFile::incrementDirectoryFileCount(const int count) { this->_n_files += count; } -int CapioFile::getCurrentDirectoryFileCount() const { - std::lock_guard lg(_mutex); - return this->_n_files; -} +int CapioFile::getCurrentDirectoryFileCount() const { return this->_n_files; } int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; } From 61372c0723bde37b87b41addb4da7b844ce44808 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 09:07:59 +0000 Subject: [PATCH 04/15] Added shared mutex when possible --- capio/server/include/storage/capio_file.hpp | 6 ++++-- capio/server/src/capio_file.cpp | 18 ++++++++++-------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index b0a4f0aa2..6f20d2d32 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -1,11 +1,11 @@ #ifndef CAPIO_SERVER_CAPIO_FILE_HPP #define CAPIO_SERVER_CAPIO_FILE_HPP - #include #include #include #include #include +#include #include #include #include @@ -44,11 +44,13 @@ class CapioFile { /// @brief Set of [start, end] pairs representing valid data regions std::set, compareSectors> _sectors; + mutable std::shared_mutex _mutex_sectors; ///< Shared lock to get concurrent access to _sectors std::atomic _real_file_size = 0; ///< Total logical size of the file /// @brief List of {Thread ID, FD} pairs associated with this file std::vector> _threads_fd; + std::mutex _mutex_threads_fd; ///< Mutex to access _threads_fd mutable std::mutex _mutex; ///< Synchronization primitive for thread safety mutable std::condition_variable _committed_cv; ///< Wait for commitment @@ -241,7 +243,7 @@ class CapioFile { * @param offset Start searching from here. * @return Offset of data, or error if beyond end of file. */ - off64_t seekData(off64_t offset); + off64_t seekData(off64_t offset) const; /** * @brief Finds the next hole in the file. diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 486748411..789d2d34b 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -2,6 +2,7 @@ #include "common/logger.hpp" #include "remote/backend.hpp" #include "server/include/utils/common.hpp" +#include "utils/shared_mutex.hpp" #include @@ -68,7 +69,7 @@ void CapioFile::setCommitted(const bool commit) { void CapioFile::addFd(int tid, int fd) { START_LOG(gettid(), "call(tid=%d, fd=%d)", tid, fd); - std::lock_guard lg(_mutex); + std::lock_guard lg(_mutex_threads_fd); _threads_fd.emplace_back(tid, fd); } @@ -161,7 +162,7 @@ off64_t CapioFile::getBufSize() const { return _buf_size; } const std::vector> &CapioFile::getFds() const { return _threads_fd; } off64_t CapioFile::getFileSize() const { - std::lock_guard lock(_mutex); + const shared_lock_guard slg(_mutex_sectors); if (!_sectors.empty()) { return _sectors.rbegin()->second; } else { @@ -173,7 +174,7 @@ off64_t CapioFile::getSectorEnd(off64_t offset) const { START_LOG(gettid(), "call(offset=%ld)", offset); off64_t sector_end = -1; - std::lock_guard lock(_mutex); + const shared_lock_guard slg(_mutex_sectors); if (auto it = _sectors.upper_bound(std::make_pair(offset, 0)); !_sectors.empty() && it != _sectors.begin()) { --it; @@ -199,7 +200,7 @@ void CapioFile::insertSector(off64_t new_start, off64_t new_end) { START_LOG(gettid(), "call(new_start=%ld, new_end=%ld)", new_start, new_end); auto p = std::make_pair(new_start, new_end); - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex_sectors); if (_sectors.empty()) { LOG("Insert sector <%ld, %ld>", p.first, p.second); @@ -271,9 +272,9 @@ bool CapioFile::isDirectory() const { return _directory; } void CapioFile::open() { ++_n_opens; } -off64_t CapioFile::seekData(off64_t offset) { +off64_t CapioFile::seekData(off64_t offset) const { START_LOG(gettid(), "call(offset=%ld)", offset); - std::lock_guard lock(_mutex); + const shared_lock_guard slg(_mutex_sectors); if (_sectors.empty()) { if (offset == 0) { return 0; @@ -300,7 +301,7 @@ off64_t CapioFile::seekData(off64_t offset) { off64_t CapioFile::seekHole(off64_t offset) const { START_LOG(gettid(), "call(offset=%ld)", offset); - std::lock_guard lock(_mutex); + const shared_lock_guard slg(_mutex_sectors); if (_sectors.empty()) { if (offset == 0) { return 0; @@ -326,7 +327,7 @@ off64_t CapioFile::seekHole(off64_t offset) const { } void CapioFile::removeFd(int tid, int fd) { - std::unique_lock lock(_mutex); + std::lock_guard lock(_mutex_threads_fd); if (const auto it = std::find(_threads_fd.begin(), _threads_fd.end(), std::make_pair(tid, fd)); it != _threads_fd.end()) { _threads_fd.erase(it); @@ -348,6 +349,7 @@ void CapioFile::readFromQueue(SPSCQueue &queue, size_t offset, long int num_byte } off64_t CapioFile::_getStoredSize() const { + const shared_lock_guard slg(_mutex_sectors); const auto it = _sectors.rbegin(); return (it == _sectors.rend()) ? 0 : it->second; } From c763778b65e9e8e8c661adabce677178b687a998 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 10:41:54 +0000 Subject: [PATCH 05/15] More tests --- capio/tests/unit/server/src/capio_file.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index 29bc25c79..d1725824c 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -12,6 +12,16 @@ TEST(ServerTest, TestInsertSingleSector) { EXPECT_NE(sectors.find({1L, 3L}), sectors.end()); } +TEST(ServerTest, TestBufferAllocation) { + CapioFile c_file; + EXPECT_TRUE(c_file.bufferToAllocate()); + c_file.createBuffer("test.dat", true); + EXPECT_FALSE(c_file.bufferToAllocate()); + EXPECT_EQ(c_file.getBufSize(), 0); + EXPECT_EQ(c_file.getRealFileSize(), 0); + EXPECT_NE(nullptr, c_file.getBuffer()); +} + TEST(ServerTest, TestInsertTwoNonOverlappingSectors) { CapioFile c_file; c_file.insertSector(5, 7); From 8921ccc686ff153690ba8c16e03b41498e43d5eb Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 10:45:10 +0000 Subject: [PATCH 06/15] More tests --- capio/tests/unit/server/src/capio_file.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index d1725824c..ff3e0c66f 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -176,6 +176,15 @@ TEST(ServerTest, TestCommitCapioFile) { std::filesystem::remove("test.dat"); } +TEST(ServerTest, TestDumpNotHomeNode) { + auto file = new CapioFile(false, true, 1000, 1); + file->createBuffer("test.dat", false); + file->close(); + file->dump(); + delete file; + EXPECT_FALSE(std::filesystem::exists("test.dat")); +} + TEST(ServerTest, TestCommitAndDeleteDirectory) { EXPECT_FALSE(std::filesystem::exists("mydir")); auto file = new CapioFile(true, true, 1000, 1); From d8bbc97f024264626c76961b488adb8f1ccf518e Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 10:57:43 +0000 Subject: [PATCH 07/15] More tests --- capio/server/src/capio_file.cpp | 2 -- capio/tests/unit/server/src/capio_file.cpp | 21 +++++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 789d2d34b..bb1956bd2 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -4,8 +4,6 @@ #include "server/include/utils/common.hpp" #include "utils/shared_mutex.hpp" -#include - bool CapioFile::compareSectors::operator()(const std::pair &lhs, const std::pair &rhs) const { return (lhs.first < rhs.first); diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index ff3e0c66f..65791528c 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -126,6 +126,27 @@ TEST(ServerTest, TestCapioFileWaitForDataMultithreaded) { t.join(); } +TEST(ServerTest, TestCapioFileWaitForDataMultithreadedWithCommit) { + CapioFile file; + + SPSCQueue queue("test_queue", get_cache_lines(), get_cache_line_size(), "test_wf"); + + std::mutex _lock; + _lock.lock(); + + std::thread t([&_lock, &file, &queue] { + _lock.lock(); + file.setCommitted(); + }); + + _lock.unlock(); + file.waitForData(1000); + + EXPECT_EQ(file.getFileSize(), 0); + + t.join(); +} + TEST(ServerTest, TestCapioFileWaitForCompletion) { CapioFile file; From f6a825772b0d776b60f5f3744a991c6457429aa3 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 11:09:33 +0000 Subject: [PATCH 08/15] todos --- capio/server/src/capio_file.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index bb1956bd2..d53f4140b 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -194,6 +194,10 @@ off64_t CapioFile::getStoredSize() const { return this->_getStoredSize(); } +// TODO: The logic of this function is most likely incorrect. Recheck it in the future. Also +// remember to check that the CapioFile::compareSectors::operator() method is wrong (but +// should be ok with this implementation) as the check should be: +// return (lhs.second < rhs.first); void CapioFile::insertSector(off64_t new_start, off64_t new_end) { START_LOG(gettid(), "call(new_start=%ld, new_end=%ld)", new_start, new_end); From dde269ec8263c582c6ab4e8774bddb2a75919bd2 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 11:18:53 +0000 Subject: [PATCH 09/15] Rewrote of insertSector for a better and correct version --- capio/server/src/capio_file.cpp | 81 ++++++++------------------------- 1 file changed, 20 insertions(+), 61 deletions(-) diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index d53f4140b..62ce05026 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -194,76 +194,35 @@ off64_t CapioFile::getStoredSize() const { return this->_getStoredSize(); } -// TODO: The logic of this function is most likely incorrect. Recheck it in the future. Also -// remember to check that the CapioFile::compareSectors::operator() method is wrong (but -// should be ok with this implementation) as the check should be: -// return (lhs.second < rhs.first); void CapioFile::insertSector(off64_t new_start, off64_t new_end) { START_LOG(gettid(), "call(new_start=%ld, new_end=%ld)", new_start, new_end); - - auto p = std::make_pair(new_start, new_end); std::lock_guard lock(_mutex_sectors); - if (_sectors.empty()) { - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - return; - } - if (auto it_lbound = _sectors.upper_bound(p); it_lbound == _sectors.begin()) { - if (new_end < it_lbound->first) { - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } else { - auto it = it_lbound; - bool end_before = false; - bool end_inside = false; - while (it != _sectors.end() && !end_before && !end_inside) { - end_before = p.second < it->first; - if (!end_before) { - end_inside = p.second <= it->second; - if (!end_inside) { - ++it; - } - } - } + // First sector whose start >= new_start + auto it_start = _sectors.lower_bound({new_start, LLONG_MIN}); - if (end_inside) { - p.second = it->second; - ++it; - } - _sectors.erase(it_lbound, it); - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); - } - } else { - --it_lbound; - auto it = it_lbound; - if (p.first <= it_lbound->second) { - // new sector starts inside a sector - p.first = it_lbound->first; - } else { // in this way the sector will not be deleted - ++it_lbound; - } - bool end_before = false; - bool end_inside = false; - while (it != _sectors.end() && !end_before && !end_inside) { - end_before = p.second < it->first; - if (!end_before) { - end_inside = p.second <= it->second; - if (!end_inside) { - ++it; - } - } + // Check if the previous sector reaches into our range + if (it_start != _sectors.begin()) { + if (const auto it_prev = std::prev(it_start); it_prev->second >= new_start) { + new_start = it_prev->first; + new_end = std::max(new_end, it_prev->second); + it_start = it_prev; } + } + + // First sector whose start > new_end (adjacent sectors included via +1) + const auto it_end = _sectors.upper_bound({new_end + 1, LLONG_MIN}); - if (end_inside) { - p.second = it->second; - ++it; + // Absorb the right edge of the last overlapping sector if needed + if (it_end != _sectors.begin()) { + if (const auto it_last = std::prev(it_end); it_last->second > new_end) { + new_end = it_last->second; } - _sectors.erase(it_lbound, it); - LOG("Insert sector <%ld, %ld>", p.first, p.second); - _sectors.insert(p); } + + _sectors.erase(it_start, it_end); + LOG("Insert sector <%ld, %ld>", new_start, new_end); + _sectors.emplace(new_start, new_end); } bool CapioFile::closed() const { return _n_close_expected == -1 || _n_close == _n_close_expected; } From 3f0da000e9fd992dd142fec06d4247fe34c93df5 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 11:33:34 +0000 Subject: [PATCH 10/15] Changed _n_close data type --- capio/server/include/storage/capio_file.hpp | 22 ++++++++++----------- capio/server/src/capio_file.cpp | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index 6f20d2d32..a34a25b35 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -26,15 +26,15 @@ class CapioFile { const std::pair &rhs) const; }; - char *_buf = nullptr; ///< Raw pointer to memory buffer for file content - off64_t _buf_size = 0; ///< Allocated size of _buf - int _fd = -1; ///< File descriptor for permanent/mmap storage - int _n_links = 1; ///< Number of symbolic links to the file - const long int _n_close_expected = -1; ///< Target close() operations for commitment - std::atomic _n_close = 0; ///< Current count of close() operations - std::atomic _n_opens = 0; ///< Current count of open() operations - std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) - const int _n_files_expected = -1; ///< Target dirent64 count (if directory) + char *_buf = nullptr; ///< Raw pointer to memory buffer for file content + off64_t _buf_size = 0; ///< Allocated size of _buf + int _fd = -1; ///< File descriptor for permanent/mmap storage + int _n_links = 1; ///< Number of symbolic links to the file + const int _n_close_expected = -1; ///< Target close() operations for commitment + std::atomic _n_close = 0; ///< Current count of close() operations + std::atomic _n_opens = 0; ///< Current count of open() operations + std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) + const int _n_files_expected = -1; ///< Target dirent64 count (if directory) bool _home_node = false; ///< True if this is the home node const bool _directory = false; ///< True if this instance represents a directory @@ -82,7 +82,7 @@ class CapioFile { * @param n_close_expected Expected number of close calls. */ CapioFile(bool directory, int n_files_expected, bool permanent, off64_t init_size, - long int n_close_expected); + int n_close_expected); /** * @brief Standard constructor for files. @@ -91,7 +91,7 @@ class CapioFile { * @param init_size Initial buffer allocation size. * @param n_close_expected Expected number of close calls. */ - CapioFile(bool directory, bool permanent, off64_t init_size, long int n_close_expected); + CapioFile(bool directory, bool permanent, off64_t init_size, int n_close_expected); CapioFile(const CapioFile &) = delete; CapioFile &operator=(const CapioFile &) = delete; diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 62ce05026..dc8142652 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -12,12 +12,12 @@ bool CapioFile::compareSectors::operator()(const std::pair &lh CapioFile::CapioFile() = default; CapioFile::CapioFile(const bool directory, const int n_files_expected, const bool permanent, - const off64_t init_size, const long int n_close_expected) + const off64_t init_size, const int n_close_expected) : _buf_size(init_size), _n_close_expected(n_close_expected), _n_files_expected(n_files_expected + 2), _directory(directory), _permanent(permanent) {} CapioFile::CapioFile(const bool directory, const bool permanent, const off64_t init_size, - const long int n_close_expected) + const int n_close_expected) : _buf_size(init_size), _n_close_expected(n_close_expected), _directory(directory), _permanent(permanent) {} From 1990160782c1e491a03814d92b4a9de769e219e4 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 11:37:31 +0000 Subject: [PATCH 11/15] data types --- capio/server/include/storage/capio_file.hpp | 30 ++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index a34a25b35..3da502fec 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -26,21 +26,21 @@ class CapioFile { const std::pair &rhs) const; }; - char *_buf = nullptr; ///< Raw pointer to memory buffer for file content - off64_t _buf_size = 0; ///< Allocated size of _buf - int _fd = -1; ///< File descriptor for permanent/mmap storage - int _n_links = 1; ///< Number of symbolic links to the file - const int _n_close_expected = -1; ///< Target close() operations for commitment - std::atomic _n_close = 0; ///< Current count of close() operations - std::atomic _n_opens = 0; ///< Current count of open() operations - std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) - const int _n_files_expected = -1; ///< Target dirent64 count (if directory) - - bool _home_node = false; ///< True if this is the home node - const bool _directory = false; ///< True if this instance represents a directory - const bool _permanent = false; ///< True if file persists after server exit - bool _committed = false; ///< True if file is finalized - std::atomic _first_write = true; ///< True if no data has been written yet + char *_buf = nullptr; ///< Raw pointer to memory buffer for file content + off64_t _buf_size = 0; ///< Allocated size of _buf + int _fd = -1; ///< File descriptor for permanent/mmap storage + std::atomic _n_close = 0; ///< Current count of close() operations + std::atomic _n_opens = 0; ///< Current count of open() operations + std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) + const int _n_files_expected = -1; ///< Target dirent64 count (if directory) + const int _n_close_expected = -1; ///< Target close() operations for commitment + + bool _home_node = false; ///< True if this is the home node + bool _committed = false; ///< True if file is finalized + const bool _directory = false; ///< True if this instance represents a directory + const bool _permanent = false; ///< True if file persists after server exit + + std::atomic _first_write = true; ///< True if no data has been written yet /// @brief Set of [start, end] pairs representing valid data regions std::set, compareSectors> _sectors; From f07126f339e36d0cc9346dbdc0da47238d3d13dd Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 11:42:28 +0000 Subject: [PATCH 12/15] Reverted data types --- capio/server/include/storage/capio_file.hpp | 16 ++++++++-------- capio/server/src/capio_file.cpp | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index 3da502fec..c42c312fd 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -26,14 +26,14 @@ class CapioFile { const std::pair &rhs) const; }; - char *_buf = nullptr; ///< Raw pointer to memory buffer for file content - off64_t _buf_size = 0; ///< Allocated size of _buf - int _fd = -1; ///< File descriptor for permanent/mmap storage - std::atomic _n_close = 0; ///< Current count of close() operations - std::atomic _n_opens = 0; ///< Current count of open() operations - std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) - const int _n_files_expected = -1; ///< Target dirent64 count (if directory) - const int _n_close_expected = -1; ///< Target close() operations for commitment + char *_buf = nullptr; ///< Raw pointer to memory buffer for file content + off64_t _buf_size = 0; ///< Allocated size of _buf + int _fd = -1; ///< File descriptor for permanent/mmap storage + std::atomic _n_close = 0; ///< Current count of close() operations + std::atomic _n_opens = 0; ///< Current count of open() operations + std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) + const int _n_files_expected = -1; ///< Target dirent64 count (if directory) + const int _n_close_expected = -1; ///< Target close() operations for commitment bool _home_node = false; ///< True if this is the home node bool _committed = false; ///< True if file is finalized diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index dc8142652..2fec9954b 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -13,8 +13,8 @@ CapioFile::CapioFile() = default; CapioFile::CapioFile(const bool directory, const int n_files_expected, const bool permanent, const off64_t init_size, const int n_close_expected) - : _buf_size(init_size), _n_close_expected(n_close_expected), - _n_files_expected(n_files_expected + 2), _directory(directory), _permanent(permanent) {} + : _buf_size(init_size), _n_files_expected(n_files_expected + 2), + _n_close_expected(n_close_expected), _directory(directory), _permanent(permanent) {} CapioFile::CapioFile(const bool directory, const bool permanent, const off64_t init_size, const int n_close_expected) From a53bb52d22569149c4d2897054d8ee72a6c0d2ce Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 11:45:25 +0000 Subject: [PATCH 13/15] Added TODO to convert atomic to atomic --- capio/server/include/storage/capio_file.hpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index c42c312fd..35ff7a843 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -26,14 +26,16 @@ class CapioFile { const std::pair &rhs) const; }; - char *_buf = nullptr; ///< Raw pointer to memory buffer for file content - off64_t _buf_size = 0; ///< Allocated size of _buf - int _fd = -1; ///< File descriptor for permanent/mmap storage - std::atomic _n_close = 0; ///< Current count of close() operations - std::atomic _n_opens = 0; ///< Current count of open() operations - std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) - const int _n_files_expected = -1; ///< Target dirent64 count (if directory) - const int _n_close_expected = -1; ///< Target close() operations for commitment + char *_buf = nullptr; ///< Raw pointer to memory buffer for file content + off64_t _buf_size = 0; ///< Allocated size of _buf + int _fd = -1; ///< File descriptor for permanent/mmap storage + + // TODO: check if it is possible to move from int to unsigned int + std::atomic _n_close = 0; ///< Current count of close() operations + std::atomic _n_opens = 0; ///< Current count of open() operations + std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) + const int _n_files_expected = -1; ///< Target dirent64 count (if directory) + const int _n_close_expected = -1; ///< Target close() operations for commitment bool _home_node = false; ///< True if this is the home node bool _committed = false; ///< True if file is finalized From 5883993652608d868c3a3f4bc89cb6aa521f49d6 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 12:01:48 +0000 Subject: [PATCH 14/15] More tests and more comments with future todos --- capio/server/src/capio_file.cpp | 7 +++-- capio/tests/unit/server/src/capio_file.cpp | 34 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 2fec9954b..ae6367092 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -153,10 +153,12 @@ char *CapioFile::expandBuffer(const off64_t data_size) { return new_buf; } +// TODO: This is a memory leak that may lead to race conditions. remove exposure of internal buffer char *CapioFile::getBuffer() const { return _buf; } off64_t CapioFile::getBufSize() const { return _buf_size; } +// TODO: This is a memory leak that may lead to race conditions. remove exposure of internal buffer const std::vector> &CapioFile::getFds() const { return _threads_fd; } off64_t CapioFile::getFileSize() const { @@ -184,6 +186,7 @@ off64_t CapioFile::getSectorEnd(off64_t offset) const { return sector_end; } +// TODO: This is a memory leak that may lead to race conditions. remove exposure of internal buffer const std::set, CapioFile::compareSectors> & CapioFile::getSectors() const { return _sectors; @@ -296,7 +299,7 @@ void CapioFile::removeFd(int tid, int fd) { } void CapioFile::readFromNode(const std::string &dest, off64_t offset, off64_t buffer_size) const { - std::unique_lock lock(_mutex); + std::lock_guard lock(_mutex); backend->recv_file(_buf + offset, dest, buffer_size); _data_avail_cv.notify_all(); } @@ -304,7 +307,7 @@ void CapioFile::readFromNode(const std::string &dest, off64_t offset, off64_t bu void CapioFile::readFromQueue(SPSCQueue &queue, size_t offset, long int num_bytes) const { START_LOG(gettid(), "call()"); - std::unique_lock lock(_mutex); + std::lock_guard lock(_mutex); queue.read(_buf + offset, num_bytes); _data_avail_cv.notify_all(); } diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index 65791528c..b95b2991d 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -1,5 +1,6 @@ #include "server/include/storage/capio_file.hpp" #include "common/env.hpp" +#include "remote/backend.hpp" #include #include @@ -315,3 +316,36 @@ TEST(ServerTest, TestFileSetCommitToFalse) { file.setCommitted(false); EXPECT_FALSE(file.isCommitted()); } + +class MockBackend : public Backend { + public: + void recv_file(char *shm, const std::string &source, const long int bytes_expected) override { + for (std::size_t i = 0; i < bytes_expected; ++i) { + shm[i] = 33 + (i % 93); + } + } + + const std::set get_nodes() override { return {}; } + void handshake_servers() override {} + RemoteRequest read_next_request() override { return {nullptr, ""}; } + void send_file(char *shm, long int nbytes, const std::string &target) override {} + void send_request(const char *message, int message_len, const std::string &target) override {} +}; + +TEST(ServerTest, TestReadFromNodeMockBackend) { + + backend = new MockBackend(); + + CapioFile file1; + file1.createBuffer("testDir", true); + file1.expandBuffer(1000); + file1.registerFirstWrite(); + file1.insertSector(0, 1000); + + file1.readFromNode("none", 0, 1000); + + const auto buf = file1.getBuffer(); + for (std::size_t i = 0; i < 1000; ++i) { + EXPECT_EQ(buf[i], 33 + (i % 93)); + } +} From 0b0e8e08072c1857184f0d84e6ace4ef42bee997 Mon Sep 17 00:00:00 2001 From: = <=> Date: Thu, 19 Mar 2026 13:02:08 +0000 Subject: [PATCH 15/15] Added missing tests --- capio/server/src/capio_file.cpp | 14 ++++++++------ capio/tests/unit/server/src/capio_file.cpp | 13 +++++++++++++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index ae6367092..45baf5c40 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -173,19 +173,21 @@ off64_t CapioFile::getFileSize() const { off64_t CapioFile::getSectorEnd(off64_t offset) const { START_LOG(gettid(), "call(offset=%ld)", offset); - off64_t sector_end = -1; const shared_lock_guard slg(_mutex_sectors); - if (auto it = _sectors.upper_bound(std::make_pair(offset, 0)); - !_sectors.empty() && it != _sectors.begin()) { + + if (_sectors.empty()) { + return -1; + } + + if (auto it = _sectors.upper_bound(std::make_pair(offset, 0)); it != _sectors.begin()) { --it; if (offset <= it->second) { - sector_end = it->second; + return it->second; } } - return sector_end; + return -1; } - // TODO: This is a memory leak that may lead to race conditions. remove exposure of internal buffer const std::set, CapioFile::compareSectors> & CapioFile::getSectors() const { diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index b95b2991d..7742b865c 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -348,4 +348,17 @@ TEST(ServerTest, TestReadFromNodeMockBackend) { for (std::size_t i = 0; i < 1000; ++i) { EXPECT_EQ(buf[i], 33 + (i % 93)); } + + delete backend; +} + +TEST(ServerTest, TestGetSectorEnd) { + CapioFile file; + + EXPECT_EQ(file.getSectorEnd(1234), -1); + + file.insertSector(0, 1234); + + EXPECT_EQ(file.getSectorEnd(120), 1234); + EXPECT_EQ(file.getSectorEnd(12000), -1); }