From 06454907ff603d0d48d3b4301e9f532a8f96c008 Mon Sep 17 00:00:00 2001 From: = <=> Date: Mon, 23 Mar 2026 14:41:30 +0000 Subject: [PATCH 1/6] Added directory streaming feature --- capio/posix/utils/cache.hpp | 3 ++- capio/server/include/storage/capio_file.hpp | 2 +- capio/server/src/capio_file.cpp | 7 +++++-- capio/server/src/storage_manager.cpp | 3 ++- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/capio/posix/utils/cache.hpp b/capio/posix/utils/cache.hpp index d7c7e4307..597d644a5 100644 --- a/capio/posix/utils/cache.hpp +++ b/capio/posix/utils/cache.hpp @@ -70,7 +70,8 @@ class ReadCache { _read(buffer, remaining_bytes); buffer = reinterpret_cast(buffer) + remaining_bytes; - if (read_size > _max_line_size) { + // NOTE: if getdents send a request for exactly the correct amount of data. + if (read_size > _max_line_size || is_getdents) { LOG("count - remaining_bytes %ld > _max_line_size %ld", read_size, _max_line_size); LOG("Reading exactly requested size"); off64_t end_of_read = is_getdents ? getdents_request(fd, read_size, is64bit, _tid) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index 35ff7a843..9d4fd4637 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -35,7 +35,7 @@ class CapioFile { std::atomic _n_opens = 0; ///< Current count of open() operations std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) const int _n_files_expected = -1; ///< Target dirent64 count (if directory) - const int _n_close_expected = -1; ///< Target close() operations for commitment + const int _n_close_expected = 0; ///< Target close() operations for commitment bool _home_node = false; ///< True if this is the home node bool _committed = false; ///< True if file is finalized diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 45baf5c40..1595f0652 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -230,7 +230,7 @@ void CapioFile::insertSector(off64_t new_start, off64_t new_end) { _sectors.emplace(new_start, new_end); } -bool CapioFile::closed() const { return _n_close_expected == -1 || _n_close == _n_close_expected; } +bool CapioFile::closed() const { return _n_close_expected <= 0 || _n_close == _n_close_expected; } bool CapioFile::deletable() const { return _n_opens <= 0; } @@ -333,7 +333,10 @@ bool CapioFile::isFirstWrite() const { return this->_first_write; } void CapioFile::registerFirstWrite() { this->_first_write = false; } -void CapioFile::incrementDirectoryFileCount(const int count) { this->_n_files += count; } +void CapioFile::incrementDirectoryFileCount(const int count) { + this->_n_files += count; + this->_data_avail_cv.notify_all(); +} int CapioFile::getCurrentDirectoryFileCount() const { return this->_n_files; } diff --git a/capio/server/src/storage_manager.cpp b/capio/server/src/storage_manager.cpp index 93639cfc1..79e409e53 100644 --- a/capio/server/src/storage_manager.cpp +++ b/capio/server/src/storage_manager.cpp @@ -61,7 +61,8 @@ void StorageManager::addDirectoryEntry(const pid_t tid, const std::filesystem::p c_file.insertSector(base_offset, data_size); c_file.incrementDirectoryFileCount(); client_manager->registerProducedFile(tid, dir); - if (c_file.getCurrentDirectoryFileCount() == c_file.getDirectoryExpectedFileCount()) { + if (c_file.getCurrentDirectoryFileCount() == c_file.getDirectoryExpectedFileCount()&& + CapioCLEngine::get().getCommitRule(file_path) == capiocl::commitRules::ON_N_FILES) { c_file.setCommitted(); } } From 3c5395be12d24357a8a7003a7965165091222010 Mon Sep 17 00:00:00 2001 From: = <=> Date: Mon, 23 Mar 2026 16:22:15 +0000 Subject: [PATCH 2/6] Format --- .github/workflows/ci-tests.yaml | 1 + capio/server/include/storage/capio_file.hpp | 2 +- capio/server/src/storage_manager.cpp | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index 12812f74b..64a56db95 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -233,6 +233,7 @@ jobs: --xml coverage.xml \ --gcov-executable gcov \ --exclude capio/tests \ + --gcov-ignore-parse-errors=negative_hits.warn \ ../build - name: "Compute Valid Artifact Name" diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index 9d4fd4637..2859d7b5a 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -35,7 +35,7 @@ class CapioFile { std::atomic _n_opens = 0; ///< Current count of open() operations std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) const int _n_files_expected = -1; ///< Target dirent64 count (if directory) - const int _n_close_expected = 0; ///< Target close() operations for commitment + const int _n_close_expected = 0; ///< Target close() operations for commitment bool _home_node = false; ///< True if this is the home node bool _committed = false; ///< True if file is finalized diff --git a/capio/server/src/storage_manager.cpp b/capio/server/src/storage_manager.cpp index 79e409e53..ef193a942 100644 --- a/capio/server/src/storage_manager.cpp +++ b/capio/server/src/storage_manager.cpp @@ -61,7 +61,7 @@ void StorageManager::addDirectoryEntry(const pid_t tid, const std::filesystem::p c_file.insertSector(base_offset, data_size); c_file.incrementDirectoryFileCount(); client_manager->registerProducedFile(tid, dir); - if (c_file.getCurrentDirectoryFileCount() == c_file.getDirectoryExpectedFileCount()&& + if (c_file.getCurrentDirectoryFileCount() == c_file.getDirectoryExpectedFileCount() && CapioCLEngine::get().getCommitRule(file_path) == capiocl::commitRules::ON_N_FILES) { c_file.setCommitted(); } From fd29010b7845cffd3b5a9e2c4e002c85bf79cf98 Mon Sep 17 00:00:00 2001 From: = <=> Date: Mon, 23 Mar 2026 17:45:27 +0000 Subject: [PATCH 3/6] Added regression unit test --- capio/tests/unit/server/src/capio_file.cpp | 62 +++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index 7742b865c..96c921b8a 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -1,6 +1,10 @@ -#include "server/include/storage/capio_file.hpp" +#include "storage/capio_file.hpp" +#include "common/dirent.hpp" #include "common/env.hpp" #include "remote/backend.hpp" +#include "storage/manager.hpp" + +extern StorageManager *storage_manager; #include #include @@ -362,3 +366,59 @@ TEST(ServerTest, TestGetSectorEnd) { EXPECT_EQ(file.getSectorEnd(120), 1234); EXPECT_EQ(file.getSectorEnd(12000), -1); } + +TEST(ServerTest, TestSimulateDirectorySrteaming) { + + constexpr int NUM_FILES_EXPECTED = 10; + + const std::filesystem::path CAPIO_DIR = "/tmp"; + const std::filesystem::path stream_directory = CAPIO_DIR / "my_streaming_directory"; + + setenv("CAPIO_DIR", CAPIO_DIR.c_str(), 1); + storage_manager->addDirectory(1234, CAPIO_DIR); + storage_manager->addDirectory(1234, stream_directory); + + std::mutex mutex_continue; + + std::thread t([&] { + for (auto i = 0; i < NUM_FILES_EXPECTED; ++i) { + mutex_continue.lock(); + const std::string filename = "file." + std::to_string(i); + storage_manager->updateDirectory(1234, stream_directory / filename); + } + }); + + const auto &file = storage_manager->get(stream_directory); + + long current_offset = 0; + + linux_dirent64 dirent{}; + + file.waitForData(current_offset + sizeof(linux_dirent64)); + memcpy(&dirent, file.getBuffer() + current_offset, sizeof(linux_dirent64)); + current_offset += sizeof(linux_dirent64); + EXPECT_EQ(strcmp(dirent.d_name, "."), 0); + bzero(&dirent, sizeof(linux_dirent64)); + + file.waitForData(current_offset + sizeof(linux_dirent64)); + memcpy(&dirent, file.getBuffer() + current_offset, sizeof(linux_dirent64)); + current_offset += sizeof(linux_dirent64); + EXPECT_EQ(strcmp(dirent.d_name, ".."), 0); + bzero(&dirent, sizeof(linux_dirent64)); + + for (auto i = 0; i < NUM_FILES_EXPECTED; ++i) { + + file.waitForData(current_offset + sizeof(linux_dirent64)); + memcpy(&dirent, file.getBuffer() + current_offset, sizeof(linux_dirent64)); + const std::string expected_filename = "file." + std::to_string(i); + EXPECT_EQ(strcmp(dirent.d_name, expected_filename.c_str()), 0); + bzero(&dirent, sizeof(linux_dirent64)); + mutex_continue.unlock(); + current_offset += sizeof(linux_dirent64); + } + + t.join(); + + storage_manager->remove(stream_directory); + storage_manager->remove(CAPIO_DIR); +} \ No newline at end of file From 3c7287389ab0d77e77c4a3c410ce9e975612163a Mon Sep 17 00:00:00 2001 From: = <=> Date: Mon, 23 Mar 2026 19:19:29 +0000 Subject: [PATCH 4/6] Made _expected_n_close unsigned --- capio/server/include/storage/capio_file.hpp | 12 ++++++------ capio/server/src/capio_file.cpp | 2 +- capio/tests/unit/server/src/capio_file.cpp | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index 2859d7b5a..d8648f7f3 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -31,11 +31,11 @@ class CapioFile { int _fd = -1; ///< File descriptor for permanent/mmap storage // TODO: check if it is possible to move from int to unsigned int - std::atomic _n_close = 0; ///< Current count of close() operations - std::atomic _n_opens = 0; ///< Current count of open() operations - std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) - const int _n_files_expected = -1; ///< Target dirent64 count (if directory) - const int _n_close_expected = 0; ///< Target close() operations for commitment + std::atomic _n_close = 0; ///< Current count of close() operations + std::atomic _n_opens = 0; ///< Current count of open() operations + std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) + const int _n_files_expected = -1; ///< Target dirent64 count (if directory) + const unsigned int _n_close_expected = 0; ///< Target close() operations for commitment bool _home_node = false; ///< True if this is the home node bool _committed = false; ///< True if file is finalized @@ -93,7 +93,7 @@ class CapioFile { * @param init_size Initial buffer allocation size. * @param n_close_expected Expected number of close calls. */ - CapioFile(bool directory, bool permanent, off64_t init_size, int n_close_expected); + CapioFile(bool directory, bool permanent, off64_t init_size, unsigned int n_close_expected); CapioFile(const CapioFile &) = delete; CapioFile &operator=(const CapioFile &) = delete; diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 1595f0652..23f7341e9 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -17,7 +17,7 @@ CapioFile::CapioFile(const bool directory, const int n_files_expected, const boo _n_close_expected(n_close_expected), _directory(directory), _permanent(permanent) {} CapioFile::CapioFile(const bool directory, const bool permanent, const off64_t init_size, - const int n_close_expected) + const unsigned int n_close_expected) : _buf_size(init_size), _n_close_expected(n_close_expected), _directory(directory), _permanent(permanent) {} diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index 96c921b8a..834b04d24 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -238,7 +238,7 @@ TEST(ServerTest, TesMemcpyCapioFile) { } TEST(ServerTest, TestCloseCapioFile) { - CapioFile file(false, false, 0, -1); + CapioFile file(false, false, 0, 0); EXPECT_TRUE(file.closed()); // TEST for n_close_expected == -1 CapioFile file1(false, false, 0, 10); From 41f8e3306f70179c7410a98894fc88a914f8445e Mon Sep 17 00:00:00 2001 From: = <=> Date: Mon, 23 Mar 2026 19:29:49 +0000 Subject: [PATCH 5/6] Remooved last signed int from CapioFile --- capio/server/include/storage/capio_file.hpp | 14 +++++++------- capio/server/src/capio_file.cpp | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/capio/server/include/storage/capio_file.hpp b/capio/server/include/storage/capio_file.hpp index d8648f7f3..d9295d420 100644 --- a/capio/server/include/storage/capio_file.hpp +++ b/capio/server/include/storage/capio_file.hpp @@ -31,11 +31,11 @@ class CapioFile { int _fd = -1; ///< File descriptor for permanent/mmap storage // TODO: check if it is possible to move from int to unsigned int - std::atomic _n_close = 0; ///< Current count of close() operations - std::atomic _n_opens = 0; ///< Current count of open() operations - std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) - const int _n_files_expected = -1; ///< Target dirent64 count (if directory) - const unsigned int _n_close_expected = 0; ///< Target close() operations for commitment + std::atomic _n_close = 0; ///< Current count of close() operations + std::atomic _n_opens = 0; ///< Current count of open() operations + std::atomic _n_files = 0; ///< Count of dirent64 stored (if directory) + const unsigned int _n_files_expected = 0; ///< Target dirent64 count (if directory) + const unsigned int _n_close_expected = 0; ///< Target close() operations for commitment bool _home_node = false; ///< True if this is the home node bool _committed = false; ///< True if file is finalized @@ -83,7 +83,7 @@ class CapioFile { * @param init_size Initial buffer allocation size. * @param n_close_expected Expected number of close calls. */ - CapioFile(bool directory, int n_files_expected, bool permanent, off64_t init_size, + CapioFile(bool directory, unsigned int n_files_expected, bool permanent, off64_t init_size, int n_close_expected); /** @@ -228,7 +228,7 @@ class CapioFile { [[nodiscard]] int getCurrentDirectoryFileCount() const; /** @return Expected total files in this directory. */ - [[nodiscard]] int getDirectoryExpectedFileCount() const; + [[nodiscard]] unsigned int getDirectoryExpectedFileCount() const; /** @return Reference to the internal sector map. */ [[nodiscard]] const std::set, compareSectors> &getSectors() const; diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index 23f7341e9..a871c796b 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -11,8 +11,8 @@ bool CapioFile::compareSectors::operator()(const std::pair &lh CapioFile::CapioFile() = default; -CapioFile::CapioFile(const bool directory, const int n_files_expected, const bool permanent, - const off64_t init_size, const int n_close_expected) +CapioFile::CapioFile(const bool directory, const unsigned int n_files_expected, + const bool permanent, const off64_t init_size, const int n_close_expected) : _buf_size(init_size), _n_files_expected(n_files_expected + 2), _n_close_expected(n_close_expected), _directory(directory), _permanent(permanent) {} @@ -340,4 +340,4 @@ void CapioFile::incrementDirectoryFileCount(const int count) { int CapioFile::getCurrentDirectoryFileCount() const { return this->_n_files; } -int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; } +unsigned int CapioFile::getDirectoryExpectedFileCount() const { return this->_n_files_expected; } From 9c1504c917f8e8e57bdd8a1554afc14785dca942 Mon Sep 17 00:00:00 2001 From: = <=> Date: Tue, 24 Mar 2026 07:53:52 +0000 Subject: [PATCH 6/6] format --- capio/server/src/capio_file.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/capio/server/src/capio_file.cpp b/capio/server/src/capio_file.cpp index a871c796b..855dda932 100644 --- a/capio/server/src/capio_file.cpp +++ b/capio/server/src/capio_file.cpp @@ -1,4 +1,5 @@ #include "server/include/storage/capio_file.hpp" + #include "common/logger.hpp" #include "remote/backend.hpp" #include "server/include/utils/common.hpp"