Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ the [CAPIO-CL Docs](https://capio.hpc4ai.it/docs/coord-language/) for details.

To launch your workflow with capio you can follow two routes:

#### A) Use `capiorun` for simplfied operations
#### A) Use `capiorun` for simplified operations

You can simplify the execution of workflow steps with CAPIO using the `capiorun` utility. See the
[`capiorun` documentation](capiorun/readme.md) for usage and examples. `capiorun` provides an easier way to manage
Expand Down
1 change: 1 addition & 0 deletions src/common/capio/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ template <class T, class Mutex> class Queue {
_last_elem = (long int *) create_shm(_last_elem_name, sizeof(long int));
_shm = get_shm_if_exist(_shm_name);
if (_shm == nullptr) {
LOG("Creating shared memory");
*_first_elem = 0;
*_last_elem = 0;
_shm = create_shm(_shm_name, _buff_size);
Expand Down
1 change: 1 addition & 0 deletions src/posix/handlers/exit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ int exit_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg
}

delete_caches();
delete_queues();
LOG("Removed caches");

if (const auto itm = bufs_response->find(tid); itm != bufs_response->end()) {
Expand Down
51 changes: 47 additions & 4 deletions src/posix/libcapio_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@
#ifdef CAPIO_LOG
CAPIO_LOG_LEVEL = get_capio_log_level();
#endif

START_LOG(syscall_no_intercept(SYS_gettid), "call(syscall_number=%ld)", syscall_number);
long tid = syscall_no_intercept(SYS_gettid);
START_LOG(tid, "call(syscall_number=%ld)", syscall_number);

// If the syscall_number is higher than the maximum
// syscall captured by CAPIO, simply return
Expand All @@ -408,8 +408,51 @@
return 1;
}

if (clone_after_null_child_stack) {

LOG("Initializing bufs_response to new empty object.");
bufs_response = new std::unordered_map<long, ResponseQueue *>();
LOG("Inizializing process");

Check failure on line 415 in src/posix/libcapio_posix.cpp

View workflow job for this annotation

GitHub Actions / Check codespell conformance

Inizializing ==> Initializing
init_process(tid);
LOG("Child thread %d initialized", tid);
LOG("Starting child thread %d", tid);
init_caches();
clone_after_null_child_stack = false;
}

if (syscall_number == SYS_clone
#ifdef SYS_clone3
|| syscall_number == SYS_clone3
#endif
) {
clone_after_null_child_stack = arg1 == 0;
LOG("Clone will occur with child_stack == NULL ? %s ",
clone_after_null_child_stack ? "true" : "false");
}

LOG("Handling syscall NO %ld (max num is %ld)", syscall_number, CAPIO_NR_SYSCALLS);
return syscallTable[syscall_number](arg0, arg1, arg2, arg3, arg4, arg5, result);
try {
return syscallTable[syscall_number](arg0, arg1, arg2, arg3, arg4, arg5, result);
} catch (const std::exception &exception) {
syscall_no_intercept_flag = true;

std::cout
<< std::endl
<< "~~~~~~~~~~~~~~[\033[31mlibcapio_posix.so: FATAL EXCEPTION\033[0m]~~~~~~~~~~~~~~"
<< std::endl
<< "| Exception thrown while handling syscall " << syscall_number << std::endl
<< "| TID of offending thread: " << syscall_no_intercept(SYS_gettid) << std::endl
<< "| PID of offending thread: " << syscall_no_intercept(SYS_getpid) << std::endl
<< "| PPID of offending thread: " << syscall_no_intercept(SYS_getppid) << std::endl
<< "| " << std::endl
<< "| `" << typeid(exception).name() << ": " << exception.what() << std::endl
<< "|" << std::endl
<< "~~~~~~~~~~~~~~[\033[31mlibcapio_posix.so: FATAL EXCEPTION\033[0m]~~~~~~~~~~~~~~"
<< std::endl
<< std::endl;

exit(EXIT_FAILURE);
}
}

static __attribute__((constructor)) void init() {
Expand All @@ -434,4 +477,4 @@
intercept_hook_point_clone_parent = hook_clone_parent;
intercept_hook_point = hook;
START_SYSCALL_LOGGING();
}
}
5 changes: 3 additions & 2 deletions src/posix/syscall_intercept/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ include(ExternalProject)
# Import external project from git
#####################################
ExternalProject_Add(syscall_intercept
GIT_REPOSITORY https://github.com/pmem/syscall_intercept.git
GIT_TAG ca4b13531f883597c2f04a40e095f76f6c3a6d22
GIT_REPOSITORY https://github.com/alpha-unito/syscall_intercept
GIT_TAG 623aa8415b2ff0b9b81bd08f791850723edea18c
PREFIX ${CMAKE_CURRENT_BINARY_DIR}
CMAKE_ARGS
-DBUILD_TESTS=OFF
-DSTATIC_CAPSTONE=ON
-DBUILD_EXAMPLES=OFF
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR>
Expand Down
3 changes: 3 additions & 0 deletions src/posix/utils/cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ inline void delete_caches() {
delete consent_request_cache_fs;
delete write_request_cache_mem;
delete read_request_cache_mem;
}

inline void delete_queues() {
START_LOG(capio_syscall(SYS_gettid), "call()");
delete cts_queue;
LOG("Removed cts_queue");
delete stc_queue;
Expand Down
72 changes: 55 additions & 17 deletions src/posix/utils/clone.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ inline std::mutex clone_mutex;
inline std::condition_variable clone_cv;
inline std::unordered_set<pid_t> *tids;

inline bool clone_after_null_child_stack = false;

inline bool is_capio_tid(const pid_t tid) {
const std::lock_guard<std::mutex> lg(clone_mutex);
return tids->find(tid) != tids->end();
Expand All @@ -21,6 +23,7 @@ inline void register_capio_tid(const pid_t tid) {
START_LOG(syscall_no_intercept(SYS_gettid), "call(tid=%ld)", tid);
const std::lock_guard<std::mutex> lg(clone_mutex);
tids->insert(tid);
LOG("Pid inserted ? %s", tids->find(tid) == tids->end() ? "NO" : "YES");
}

inline void remove_capio_tid(const pid_t tid) {
Expand All @@ -39,13 +42,24 @@ inline void init_process(pid_t tid) {
syscall_no_intercept_flag = true;

auto *p_buf_response = new ResponseQueue(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid));
bufs_response->insert(std::make_pair(tid, p_buf_response));

DBG(tid, [](auto bufs_response, auto tid) {
START_LOG(tid, "call(DBG)");
LOG("Created buf response. buf_response map initialized ? %s",
bufs_response != nullptr ? "YES" : "NO");
LOG("buf_Response size for tid %d: %ld", tid, bufs_response->size());
for (auto &[fst, snd] : *bufs_response) {
LOG("Found entry for tid %ld", fst);
}
}(bufs_response, tid));

bufs_response->insert(std::make_pair(tid, p_buf_response));
LOG("Created request response buffer with name: %s",
(SHM_COMM_CHAN_NAME_RESP + std::to_string(tid)).c_str());

const char *capio_app_name = get_capio_app_name();
auto pid = static_cast<pid_t>(syscall_no_intercept(SYS_gettid));
LOG("sending handshake with tid=%ld, pid=%ld", tid, pid);

/**
* The previous if, for an anonymous handshake was present, however the get_capio_app_name()
Expand All @@ -60,44 +74,66 @@ inline void init_process(pid_t tid) {
inline void hook_clone_child() {
auto tid = static_cast<pid_t>(syscall_no_intercept(SYS_gettid));

#ifdef __CAPIO_POSIX
syscall_no_intercept_flag = true;

/*
* This piece of code is aimed at addressing issues with applications that spawn several
* thousand threads that only do computations. When this occurs, under some circumstances CAPIO
* might fail to allocate shared memory objects. As such, if child threads ONLY do computation,
* we can safely ignore them with CAPIO.
*/
syscall_no_intercept_flag = true;
thread_local char *skip_child = std::getenv("CAPIO_IGNORE_CHILD_THREADS");
if (skip_child != nullptr) {
auto skip_child_str = std::string(skip_child);
if (skip_child_str == "ON" || skip_child_str == "TRUE" || skip_child_str == "YES") {
return;
}
}
syscall_no_intercept_flag = false;

if (!clone_after_null_child_stack) {
syscall_no_intercept_flag = true;
std::unique_lock<std::mutex> lock(clone_mutex);
clone_cv.wait(lock, [&tid] { return tids->find(tid) != tids->end(); });
/**
* Freeing memory here through `tids.erase()` can cause a SIGSEGV error
* in the libc, which tries to load the `__ctype_b_loc` table but fails
* because it is not initialized yet. For this reason, a thread's `tid`
* is removed from the `tids` set only when the thread terminates.
*/
lock.unlock();
syscall_no_intercept_flag = false;
} else {
/*
* Needed to enable logging when SYS_clone is called with child_stack==NULL.
* In this case, the thread_local variables are initialized and not set to a nullptr.
* For this reason, we reset them here
*/

#ifdef CAPIO_LOG
logfileOpen = false;
logfileFD = -1;
bzero(logfile_path, PATH_MAX);
#endif
std::unique_lock<std::mutex> lock(clone_mutex);
clone_cv.wait(lock, [&tid] { return tids->find(tid) != tids->end(); });
// We cannot perform delete, as it will destroy also shm objects. put ptr to nullptr
// and accept a small memory leak
stc_queue = nullptr;
cts_queue = nullptr;
write_request_cache_fs = nullptr;
read_request_cache_fs = nullptr;
consent_request_cache_fs = nullptr;
write_request_cache_mem = nullptr;
read_request_cache_mem = nullptr;
START_SYSCALL_LOGGING();
return;
}

/**
* Freeing memory here through `tids.erase()` can cause a SIGSEGV error
* in the libc, which tries to load the `__ctype_b_loc` table but fails
* because it is not initialized yet. For this reason, a thread's `tid`
* is removed from the `tids` set only when the thread terminates.
*/
lock.unlock();
START_SYSCALL_LOGGING();
START_LOG(tid, "call()");
LOG("Initializing child thread %d", tid);
init_process(tid);
LOG("Child thread %d initialized", tid);
LOG("Starting child thread %d", tid);
init_caches();
#ifdef __CAPIO_POSIX
syscall_no_intercept_flag = false;
#endif
}

inline void hook_clone_parent(long child_tid) {
Expand All @@ -110,8 +146,10 @@ inline void hook_clone_parent(long child_tid) {
return;
}

clone_after_null_child_stack = false;

register_capio_tid(child_tid);
clone_cv.notify_all();
}

#endif // CAPIO_POSIX_UTILS_CLONE_HPP
#endif // CAPIO_POSIX_UTILS_CLONE_HPP
4 changes: 4 additions & 0 deletions src/posix/utils/requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ inline thread_local SPSCQueue *stc_queue;
* @return
*/
inline void init_client() {
START_LOG(capio_syscall(SYS_gettid), "call()");

buf_requests =
new CircularBuffer<char>(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE);
LOG("Initialized buf_requests");
bufs_response = new std::unordered_map<long, ResponseQueue *>();
LOG("Initialized bufs_response");
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ set(TARGET_SOURCES
FetchContent_Declare(
args
GIT_REPOSITORY https://github.com/Taywee/args.git
GIT_TAG 6.4.6
GIT_TAG 6.4.7
)
FetchContent_Declare(
simdjson
Expand Down
Loading