Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
efd659a
Fixed mechanism to send memory storage regex
Apr 9, 2025
498bda5
Fixed mechanism to send memory storage regex
Apr 10, 2025
b8fd13c
Added flag to store everything in memory
Apr 24, 2025
bf1d4ba
Added request for memory file in init_client()
Apr 24, 2025
a59f2b9
fixed an issue in return value for posix in memory write handler, and…
Apr 25, 2025
a260dde
Fixed write in memory for simple test
Apr 25, 2025
7194357
began work on integrating read from memory
Apr 25, 2025
3df961d
Fixes on deadlock when reading in memory
Apr 27, 2025
0eb705a
Continuing work on fixing read in memory. now segfaults when server r…
Apr 28, 2025
2e02f26
Fixed in memory read
Apr 28, 2025
cb068c5
Bug: data is being messed up somewhere. find where
Apr 28, 2025
d9f644e
Fix on writev
Apr 29, 2025
69638d0
Fix on cache wrong bytes. TODO: fix wrong read size
Apr 30, 2025
ad61cf7
Completed IO operations on single machine
Apr 30, 2025
ac1240c
Added test for read and write in memory file. broken when reaching 1.…
Apr 30, 2025
9abf0ff
Bugfix on offset calculation in capio memory file
May 2, 2025
36bb4d6
Fixed reads on different mem pages
May 2, 2025
6221bd0
Fixed custom cache line size issues
May 2, 2025
0685915
Fixes
May 2, 2025
739ae8b
Improved test with bandwidth data
May 5, 2025
a8fa522
Performance improvement for data transfer
May 5, 2025
a095f7d
Fixes
May 6, 2025
f7bc6bf
code cleanup
May 6, 2025
c9ab329
Added tests
May 6, 2025
2718483
fix
May 6, 2025
4dbaff1
fix
May 6, 2025
94811b1
test
May 6, 2025
1025ad0
test
May 6, 2025
1f88af1
Investigating test failure
May 7, 2025
d695ea2
code cleanup
May 12, 2025
c9cb02d
code cleanup
May 12, 2025
c4a85d2
code cleanup
May 12, 2025
e2b0460
code cleanup
May 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions .github/workflows/ci-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ on:
- capio-v2
concurrency:
group: build-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
codespell-check:
name: "Check codespell conformance"
runs-on: ubuntu-22.04
continue-on-error: true
steps:
- uses: actions/checkout@v4
- name: "Run codespell"
Expand Down Expand Up @@ -75,13 +75,14 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: "Run clang-format style check"
uses: jidicula/clang-format-action@v4.13.0
uses: jidicula/clang-format-action@v4.15.0
with:
clang-format-version: "16"
clang-format-version: "20"
check-path: "${{ matrix.path }}"
unit-tests:
name: "Build ${{ matrix.build_type }} with ${{ matrix.cxx }}"
runs-on: ubuntu-22.04
continue-on-error: true
strategy:
matrix:
build_type:
Expand Down Expand Up @@ -170,6 +171,12 @@ jobs:
capio_posix_unit_tests \
--gtest_break_on_failure \
--gtest_print_time=1

echo "Run CAPIO memory file integration tests"
LD_PRELOAD=libcapio_posix.so \
capio_memory_file_unit_tests \
--gtest_break_on_failure \
--gtest_print_time=1

- name: "Show client logs on failure"
if: ${{ always() && steps.run-tests.outcome == 'failure' && matrix.build_type == 'Debug' }}
Expand Down
5 changes: 4 additions & 1 deletion src/common/capio/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ constexpr char CAPIO_SHM_CANARY_ERROR[] =
// CAPIO communication constants
constexpr int CAPIO_REQ_BUFF_CNT = 512; // Max number of elements inside buffers
constexpr int CAPIO_CACHE_LINES_DEFAULT = 10;
constexpr int CAPIO_CACHE_LINE_SIZE_DEFAULT = 4096;
constexpr int CAPIO_CACHE_LINE_SIZE_DEFAULT = 32768; // 32K of default size for cache lines
// TODO: use that in communication only uses the file descriptor instead of the path to save on the
// PATH_MAX
constexpr size_t CAPIO_REQ_MAX_SIZE = (PATH_MAX + 256) * sizeof(char);
Expand Down Expand Up @@ -144,6 +144,9 @@ constexpr char CAPIO_SERVER_ARG_PARSER_BACKEND_PORT_OPT_HELP[] =
"A valid PORT for the Communication backend";
constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_NO_CONF_FILE_HELP[] =
"If specified, server application will start without a config file, using default settings.";
constexpr char CAPIO_SERVER_ARG_PARSER_MEM_STORAGE_ONLY_HELP[] =
"If set, all files will be stored inside the home node server memory and never on file system "
"(unless memory limit is reached, or server instance terminates).";
constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_NCONTINUE_ON_ERROR_HELP[] =
"If specified, Capio will try to continue its execution to continue even if it has reached a "
"fatal termination point. This flag should be used only to debug capio. If this flag is "
Expand Down
7 changes: 5 additions & 2 deletions src/common/capio/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
* @tparam Mutex Type of semaphore
*/
template <class T, class Mutex> class Queue {
private:
void *_shm;
const long int _max_num_elems, _elem_size; // elements size in bytes
long int _buff_size; // buffer size in bytes
Expand Down Expand Up @@ -83,6 +82,7 @@ template <class T, class Mutex> class Queue {

Queue(const Queue &) = delete;
Queue &operator=(const Queue &) = delete;

~Queue() {
START_LOG(capio_syscall(SYS_gettid),
"call(_shm_name=%s, _first_elem_name=%s, _last_elem_name=%s)", _shm_name.c_str(),
Expand All @@ -93,8 +93,11 @@ template <class T, class Mutex> class Queue {
syscall_no_intercept_flag = true;
#endif
SHM_DESTROY_CHECK(_shm_name.c_str());
LOG("Removed %s", _shm_name.c_str());
SHM_DESTROY_CHECK(_first_elem_name.c_str());
LOG("Removed %s", _first_elem_name.c_str());
SHM_DESTROY_CHECK(_last_elem_name.c_str());
LOG("Removed %s", _last_elem_name.c_str());
#ifdef __CAPIO_POSIX
syscall_no_intercept_flag = false;
#endif
Expand Down Expand Up @@ -176,4 +179,4 @@ template <class T> using CircularBuffer = Queue<T, NamedSemaphore>;

// Single Producer Single Consumer queue
using SPSCQueue = Queue<char, NoLock>;
#endif // CAPIO_QUEUE_HPP
#endif // CAPIO_QUEUE_HPP
5 changes: 4 additions & 1 deletion src/common/capio/semaphore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class NoLock {

NoLock(const NoLock &) = delete;
NoLock &operator=(const NoLock &) = delete;
~NoLock() = default;

~NoLock() { START_LOG(capio_syscall(SYS_gettid), "call()"); };

static inline void lock() { START_LOG(capio_syscall(SYS_gettid), "call()"); };

Expand Down Expand Up @@ -58,6 +59,7 @@ class NamedSemaphore {

NamedSemaphore(const NamedSemaphore &) = delete;
NamedSemaphore &operator=(const NamedSemaphore &) = delete;

~NamedSemaphore() {
START_LOG(capio_syscall(SYS_gettid), "call()");
if (_require_cleanup) {
Expand Down Expand Up @@ -125,6 +127,7 @@ class Semaphore {

Semaphore(const Semaphore &) = delete;
Semaphore &operator=(const Semaphore &) = delete;

~Semaphore() {
START_LOG(capio_syscall(SYS_gettid), "call()");
if (_require_cleanup) {
Expand Down
1 change: 1 addition & 0 deletions src/posix/handlers/close.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ int close_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar

if (exists_capio_fd(fd)) {
close_request(get_capio_fd_path(fd), tid);
write_request_cache_mem->flush();
delete_capio_fd(fd);
}

Expand Down
17 changes: 9 additions & 8 deletions src/posix/handlers/exit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,28 @@ int exit_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg
auto tid = static_cast<pid_t>(syscall_no_intercept(SYS_gettid));
START_LOG(tid, "call()");

syscall_no_intercept_flag = true;
LOG("syscall_no_intercept_flag = true");

if (is_capio_tid(tid)) {
LOG("Thread %d is a CAPIO thread: clean up", tid);
exit_group_request(tid);
remove_capio_tid(tid);
}

delete_caches();
LOG("Removed caches");

if (const auto itm = bufs_response->find(tid); itm != bufs_response->end()) {
delete itm->second;
bufs_response->erase(tid);
LOG("Removed response buffer");
}

delete_caches();
LOG("Removed caches");

delete stc_queue;
delete cts_queue;
LOG("Removed data queues");
syscall_no_intercept_flag = false;
LOG("syscall_no_intercept_flag = false");

return CAPIO_POSIX_SYSCALL_SKIP;
}

#endif // SYS_exit || SYS_exit_group
#endif // CAPIO_POSIX_HANDLERS_EXIT_GROUP_HPP
#endif // CAPIO_POSIX_HANDLERS_EXIT_GROUP_HPP
8 changes: 7 additions & 1 deletion src/posix/handlers/open.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ int open_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg
LOG("not O_CREAT");
open_request(-1, path.data(), tid);
}
} else {
LOG("Not a CAPIO path. skipping...");
return CAPIO_POSIX_SYSCALL_REQUEST_SKIP;
}

int fd = static_cast<int>(syscall_no_intercept(SYS_open, arg0, arg1, arg2, arg3, arg4, arg5));
Expand Down Expand Up @@ -112,6 +115,9 @@ int openat_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long a
LOG("not O_CREAT");
open_request(-1, path.data(), tid);
}
} else {
LOG("Not a CAPIO path. skipping...");
return CAPIO_POSIX_SYSCALL_REQUEST_SKIP;
}

int fd = static_cast<int>(syscall_no_intercept(SYS_openat, arg0, arg1, arg2, arg3, arg4, arg5));
Expand All @@ -127,4 +133,4 @@ int openat_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long a
}
#endif // SYS_openat

#endif // CAPIO_POSIX_HANDLERS_OPENAT_HPP
#endif // CAPIO_POSIX_HANDLERS_OPENAT_HPP
47 changes: 38 additions & 9 deletions src/posix/handlers/read.hpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
#ifndef CAPIO_POSIX_HANDLERS_READ_HPP
#define CAPIO_POSIX_HANDLERS_READ_HPP

#if defined(SYS_read)
int read_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg5, long *result) {
int fd = static_cast<int>(arg0);
auto count = static_cast<capio_off64_t>(arg2);
auto tid = static_cast<pid_t>(syscall_no_intercept(SYS_gettid));

START_LOG(capio_syscall(SYS_gettid), "call(fd=%d, tid=%d, count=%ld)", fd, tid, count);

inline off64_t capio_read_fs(int fd, size_t count, pid_t tid) {
START_LOG(capio_syscall(SYS_gettid), "call(fd=%d, count=%ld, tid=%ld)", fd, count, tid);
if (exists_capio_fd(fd)) {
auto computed_offset = get_capio_fd_offset(fd) + count;

Expand All @@ -21,6 +15,41 @@ int read_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg
}
return CAPIO_POSIX_SYSCALL_REQUEST_SKIP;
}

inline off64_t capio_read_mem(int fd, size_t count, void *buffer, long *result) {
START_LOG(capio_syscall(SYS_gettid), "call(fd=%d, count=%ld)", fd, count);
if (exists_capio_fd(fd)) {
auto computed_offset = get_capio_fd_offset(fd) + count;

LOG("Handling read on file %s up to byte %ld", get_capio_fd_path(fd).c_str(),
computed_offset);

*result = read_request_cache_mem->read(fd, buffer, count);
LOG("Result of read is %lu", *result);
return CAPIO_POSIX_SYSCALL_SUCCESS;
}
return CAPIO_POSIX_SYSCALL_REQUEST_SKIP;
}

#if defined(SYS_read)
int read_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg5, long *result) {
int fd = static_cast<int>(arg0);
auto count = static_cast<capio_off64_t>(arg2);
auto buffer = reinterpret_cast<void *>(arg1);
auto tid = static_cast<pid_t>(syscall_no_intercept(SYS_gettid));

START_LOG(capio_syscall(SYS_gettid), "call(fd=%d, tid=%d, count=%ld)", fd, tid, count);
if (exists_capio_fd(fd)) {
auto read_result = store_file_in_memory(get_capio_fd_path(fd), tid)
? capio_read_mem(fd, count, buffer, result)
: capio_read_fs(fd, count, tid);

LOG("read result: %ld", read_result);
return read_result;
}
LOG("Not a CAPIO fd... skipping...");
return CAPIO_POSIX_SYSCALL_REQUEST_SKIP;
}
#endif // SYS_read

#if defined(SYS_readv)
Expand All @@ -40,4 +69,4 @@ int readv_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar
}
#endif // SYS_readv

#endif // CAPIO_POSIX_HANDLERS_READ_HPP
#endif // CAPIO_POSIX_HANDLERS_READ_HPP
39 changes: 30 additions & 9 deletions src/posix/handlers/write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ inline off64_t capio_write_fs(int fd, capio_off64_t count, pid_t tid) {
return CAPIO_POSIX_SYSCALL_REQUEST_SKIP;
}

inline off64_t capio_write_mem(int fd, char *buffer, capio_off64_t count, pid_t tid) {
inline off64_t capio_writev_fs(int fd, char *buffer, off64_t count, pid_t tid) {
START_LOG(tid, "Handling FS write within writev");
capio_write_fs(fd, count, tid);
const off64_t write_count = syscall_no_intercept(SYS_write, fd, buffer, count);
LOG("Wrote %ld bytes", write_count);
return write_count;
}

inline off64_t capio_write_mem(int fd, char *buffer, capio_off64_t count, pid_t tid) {
START_LOG(tid, "call(fd=%d, count=%ld)", fd, count);
return 0;
write_request_cache_mem->write(fd, buffer, count);
return count;
}

#if defined(SYS_write)
Expand All @@ -34,28 +42,41 @@ int write_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar
? capio_write_mem(fd, buffer, count, tid)
: capio_write_fs(fd, count, tid);

LOG("Write result: %ld", write_result);

return posix_return_value(write_result, result);
}
#endif // SYS_write

#if defined(SYS_writev)
int writev_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg5, long *result) {
auto fd = static_cast<int>(arg0);
auto buffer = reinterpret_cast<char *>(arg1);
auto io_vec = reinterpret_cast<const struct iovec *>(arg1);
auto iovcnt = static_cast<int>(arg2);
long tid = syscall_no_intercept(SYS_gettid);
START_LOG(tid, "call(fd=%d, buffer=%p, count=%ld, id=%ld)", fd, buffer, iovcnt, tid);
START_LOG(tid, "call(fd=%d, buffer=%p, count=%ld, pid=%ld)", fd, io_vec->iov_base,
io_vec->iov_len, tid);
if (!exists_capio_fd(fd)) {
LOG("FD %d is not handled by capio... skipping syscall", fd);
LOG("FD %d is not handled by CAPIO... skipping syscall", fd);
return CAPIO_POSIX_SYSCALL_REQUEST_SKIP;
}

auto write_result = store_file_in_memory(get_capio_fd_path(fd), tid)
? capio_write_mem(fd, buffer, iovcnt, tid)
: capio_write_fs(fd, iovcnt, tid);
LOG("Need to handle %ld IOVEC objects", iovcnt);
int write_result = 0;
for (auto i = 0; i < iovcnt; ++i) {
const auto [iov_base, iov_len] = io_vec[i];
if (iov_len == 0) {
LOG("Size of IOVEC is 0. Skipping write request");
continue;
}
LOG("Handling IOVEC elements %d of size %ld", i, iov_len);
write_result += store_file_in_memory(get_capio_fd_path(fd), tid)
? capio_write_mem(fd, static_cast<char *>(iov_base), iov_len, tid)
: capio_writev_fs(fd, static_cast<char *>(iov_base), iov_len, tid);
}

return posix_return_value(write_result, result);
}
#endif // SYS_writev

#endif // CAPIO_POSIX_HANDLERS_WRITE_HPP
#endif // CAPIO_POSIX_HANDLERS_WRITE_HPP
7 changes: 6 additions & 1 deletion src/posix/utils/cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ inline void delete_caches() {
delete consent_request_cache_fs;
delete write_request_cache_mem;
delete read_request_cache_mem;

delete cts_queue;
LOG("Removed cts_queue");
delete stc_queue;
LOG("Removed stc_queue");
}

#endif // CAPIO_CACHE_HPP
#endif // CAPIO_CACHE_HPP
5 changes: 4 additions & 1 deletion src/posix/utils/cache/consent_request_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ class ConsentRequestCache {
available_consent = new std::unordered_map<std::string, capio_off64_t>;
};

~ConsentRequestCache() { delete available_consent; };
~ConsentRequestCache() {
START_LOG(capio_syscall(SYS_gettid), "call()");
delete available_consent;
};

void consent_request(const std::filesystem::path &path, long tid,
const std::string &source_func) const {
Expand Down
5 changes: 4 additions & 1 deletion src/posix/utils/cache/read_request_cache_fs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class ReadRequestCacheFS {
available_read_cache = new std::unordered_map<std::string, capio_off64_t>;
};

~ReadRequestCacheFS() { delete available_read_cache; };
~ReadRequestCacheFS() {
START_LOG(capio_syscall(SYS_gettid), "call()");
delete available_read_cache;
};

void read_request(std::filesystem::path path, const long end_of_read, int tid, const int fd) {
START_LOG(capio_syscall(SYS_gettid), "[cache] call(path=%s, end_of_read=%ld, tid=%ld)",
Expand Down
Loading