Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions capio/server/include/handlers/close.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ inline void handle_close(int tid, int fd) {
c_file.closed()) {
LOG("Capio File %s is closed and commit rule is on_close. setting it to complete",
path.c_str());
c_file.setComplete();
c_file.commit();
c_file.setCommitted();
c_file.dump();
}

LOG("Deleting capio file %s from tid=%d", path.c_str(), tid);
Expand Down
12 changes: 6 additions & 6 deletions capio/server/include/handlers/exig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ inline void handle_exit_group(int tid) {
LOG("Handling file %s", path.c_str());
if (CapioCLEngine::get().getCommitRule(path) == capiocl::commitRules::ON_TERMINATION) {
CapioFile &c_file = storage_manager->get(path);
if (c_file.directory()) {
if (c_file.isDirectory()) {
LOG("file %s is dir", path.c_str());
long int n_committed = c_file.n_files_expected;
if (n_committed <= c_file.n_files) {
long int n_committed = c_file.getDirectoryExpectedFileCount();
if (n_committed <= c_file.getCurrentDirectoryFileCount()) {
LOG("Setting file %s to complete", path.c_str());
c_file.setComplete();
c_file.setCommitted();
}
} else {
LOG("Setting file %s to complete", path.c_str());
c_file.setComplete();
c_file.commit();
c_file.setCommitted();
c_file.dump();
}
c_file.close();
}
Expand Down
7 changes: 4 additions & 3 deletions capio/server/include/handlers/getdents.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ inline void request_remote_getdents(int tid, int fd, off64_t count) {
off64_t end_of_read = offset + count;
off64_t end_of_sector = c_file.getSectorEnd(offset);

if (c_file.complete() && (end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) {
if (c_file.isCommitted() &&
(end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.getRealFileSize())) {
LOG("Handling local read");
send_dirent_to_client(tid, fd, c_file, offset, count);
} else if (end_of_read <= end_of_sector) {
LOG("?");
c_file.createBufferIfNeeded(storage_manager->getPath(tid, fd), false);
c_file.createBuffer(storage_manager->getPath(tid, fd), false);
client_manager->replyToClient(tid, offset, c_file.getBuffer(), count);
storage_manager->setFileOffset(tid, fd, offset + count);
} else {
Expand Down
4 changes: 2 additions & 2 deletions capio/server/include/handlers/open.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ inline void update_file_metadata(const std::filesystem::path &path, int tid, int
: storage_manager->add(path, false, get_file_initial_size());
storage_manager->addFileToTid(tid, fd, path, offset);

if (c_file.first_write && is_creat) {
if (c_file.isFirstWrite() && is_creat) {
client_manager->registerProducedFile(tid, path);
c_file.first_write = false;
c_file.registerFirstWrite();
write_file_location(path);
storage_manager->updateDirectory(tid, path);
}
Expand Down
15 changes: 8 additions & 7 deletions capio/server/include/handlers/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ inline void handle_pending_read(int tid, int fd, long int process_offset, long i
bytes_read = end_of_sector - process_offset;
}

c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);
client_manager->replyToClient(tid, process_offset, c_file.getBuffer(), bytes_read);
storage_manager->setFileOffset(tid, fd, process_offset + bytes_read);

Expand All @@ -47,13 +47,13 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) {
off64_t process_offset = storage_manager->getFileOffset(tid, fd);

// if a process is the producer of a file, then the file is always complete for that process
const bool file_complete = c_file.complete() || is_prod;
const bool file_complete = c_file.isCommitted() || is_prod;

if (!(file_complete || CapioCLEngine::get().isFirable(path))) {
// wait for file to be completed and then do what is done inside handle pending read
LOG("Data is not available yet. Starting async thread to wait for file availability");
std::thread t([&c_file, tid, fd, count, process_offset] {
c_file.waitForCompletion();
c_file.waitForCommit();
handle_pending_read(tid, fd, process_offset, count);
});
t.detach();
Expand Down Expand Up @@ -85,7 +85,7 @@ inline void handle_local_read(int tid, int fd, off64_t count, bool is_prod) {
const auto read_size = std::min(count, end_of_sector - process_offset);
LOG("Requested read within end of sector, and data is available. Serving %ld bytes", read_size);

c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);
client_manager->replyToClient(tid, process_offset, c_file.getBuffer(), read_size);
storage_manager->setFileOffset(tid, fd, process_offset + read_size);
}
Expand All @@ -99,13 +99,14 @@ inline void request_remote_read(int tid, int fd, off64_t count) {
off64_t end_of_read = offset + count;
off64_t end_of_sector = c_file.getSectorEnd(offset);

if (c_file.complete() && (end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.real_file_size)) {
if (c_file.isCommitted() &&
(end_of_read <= end_of_sector ||
(end_of_sector == -1 ? 0 : end_of_sector) == c_file.getRealFileSize())) {
LOG("Handling local read");
handle_local_read(tid, fd, count, true);
} else if (end_of_read <= end_of_sector) {
LOG("Data is present locally and can be served to client");
c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);

client_manager->replyToClient(tid, offset, c_file.getBuffer(), count);
storage_manager->setFileOffset(tid, fd, offset + count);
Expand Down
10 changes: 5 additions & 5 deletions capio/server/include/handlers/stat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ void wait_for_file_completion(int tid, const std::filesystem::path &path) {
CapioFile &c_file = storage_manager->get(path);

// if file is streamable
if (c_file.complete() || CapioCLEngine::get().isFirable(path) ||
if (c_file.isCommitted() || CapioCLEngine::get().isFirable(path) ||
strcmp(std::get<0>(get_file_location(path)), node_name) == 0) {

client_manager->replyToClient(tid, c_file.getFileSize());
client_manager->replyToClient(tid, static_cast<int>(c_file.directory() ? 1 : 0));
client_manager->replyToClient(tid, static_cast<int>(c_file.isDirectory() ? 1 : 0));

} else {
handle_remote_stat_request(tid, path);
Expand Down Expand Up @@ -72,15 +72,15 @@ inline void reply_stat(int tid, const std::filesystem::path &path) {
LOG("File is now present from remote node. retrieving file again.");
file_location_opt = get_file_location_opt(path);
}
if (c_file.complete() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 ||
if (c_file.isCommitted() || strcmp(std::get<0>(file_location_opt->get()), node_name) == 0 ||
CapioCLEngine::get().isFirable(path) || capio_dir == path) {
LOG("Sending response to client");
client_manager->replyToClient(tid, c_file.getFileSize());
client_manager->replyToClient(tid, static_cast<int>(c_file.directory() ? 1 : 0));
client_manager->replyToClient(tid, static_cast<int>(c_file.isDirectory() ? 1 : 0));
} else {
LOG("Delegating backend to reply to remote stats");
// send a request for file. then start a thread to wait for the request completion
c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);
handle_remote_stat_request(tid, path);
}
}
Expand Down
8 changes: 4 additions & 4 deletions capio/server/include/handlers/write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ void write_handler(const char *const str) {
off64_t end_of_write = offset + count;
const std::filesystem::path &path = storage_manager->getPath(tid, fd);
CapioFile &c_file = storage_manager->get(path);
off64_t file_shm_size = c_file.getBufferSize();
off64_t file_shm_size = c_file.getBufSize();
SPSCQueue &data_buf = client_manager->getClientToServerDataBuffers(tid);

c_file.createBufferIfNeeded(path, true);
c_file.createBuffer(path, true);
if (end_of_write > file_shm_size) {
c_file.expandBuffer(end_of_write);
}
c_file.readFromQueue(data_buf, offset, count);

client_manager->registerProducedFile(tid, path);
c_file.insertSector(offset, end_of_write);
if (c_file.first_write) {
c_file.first_write = false;
if (c_file.isFirstWrite()) {
c_file.registerFirstWrite();
write_file_location(path);
// TODO: it works only if there is one prod per file
storage_manager->updateDirectory(tid, path);
Expand Down
16 changes: 8 additions & 8 deletions capio/server/include/remote/handlers/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ inline void handle_read_reply(int tid, int fd, long count, off64_t file_size, of
const std::filesystem::path &path = storage_manager->getPath(tid, fd);
CapioFile &c_file = storage_manager->get(path);
off64_t offset = storage_manager->getFileOffset(tid, fd);
c_file.real_file_size = file_size;
c_file.setRealFileSize(file_size);
c_file.insertSector(offset, offset + nbytes);
c_file.setComplete(complete);
c_file.setCommitted(complete);

off64_t end_of_sector = c_file.getSectorEnd(offset);
c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);
off64_t bytes_read;
off64_t end_of_read = offset + count;
if (end_of_sector > end_of_read) {
Expand All @@ -75,7 +75,7 @@ void wait_for_data(const std::filesystem::path &path, const std::string &dest, i
const CapioFile &c_file = storage_manager->get(path);
// wait that nbytes are written
c_file.waitForData(offset + count);
serve_remote_read(path, dest, tid, fd, count, offset, c_file.complete(), is_getdents);
serve_remote_read(path, dest, tid, fd, count, offset, c_file.isCommitted(), is_getdents);
}

inline void handle_remote_read(const std::filesystem::path &path, const std::string &source,
Expand All @@ -86,8 +86,8 @@ inline void handle_remote_read(const std::filesystem::path &path, const std::str

CapioFile &c_file = storage_manager->get(path);
bool data_available = (offset + count <= c_file.getStoredSize());
if (c_file.complete() || (CapioCLEngine::get().isFirable(path) && data_available)) {
serve_remote_read(path, source, tid, fd, count, offset, c_file.complete(), is_getdents);
if (c_file.isCommitted() || (CapioCLEngine::get().isFirable(path) && data_available)) {
serve_remote_read(path, source, tid, fd, count, offset, c_file.isCommitted(), is_getdents);
} else {
std::thread t(wait_for_data, path, source, tid, fd, count, offset, is_getdents);
t.detach();
Expand All @@ -107,9 +107,9 @@ inline void handle_remote_read_reply(const std::string &source, int tid, int fd,
off64_t offset = storage_manager->getFileOffset(tid, fd);
CapioFile &c_file = storage_manager->get(path);

c_file.createBufferIfNeeded(path, false);
c_file.createBuffer(path, false);
if (nbytes != 0) {
auto file_shm_size = c_file.getBufferSize();
auto file_shm_size = c_file.getBufSize();
auto file_size_recv = offset + nbytes;
if (file_size_recv > file_shm_size) {
c_file.expandBuffer(file_size_recv);
Expand Down
6 changes: 3 additions & 3 deletions capio/server/include/remote/handlers/stat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ inline void serve_remote_stat(const std::filesystem::path &path, const std::stri

const CapioFile &c_file = storage_manager->get(path);
off64_t file_size = c_file.getFileSize();
bool is_dir = c_file.directory();
bool is_dir = c_file.isDirectory();
serve_remote_stat_request(path, source_tid, file_size, is_dir, dest);
}

Expand All @@ -26,7 +26,7 @@ void wait_for_completion(const std::filesystem::path &path, int source_tid,
dest.c_str());

const CapioFile &c_file = storage_manager->get(path);
c_file.waitForCompletion();
c_file.waitForCommit();
LOG("File %s has been completed. serving stats data", path.c_str());
serve_remote_stat(path, dest, source_tid);
}
Expand All @@ -39,7 +39,7 @@ inline void handle_remote_stat(int source_tid, const std::filesystem::path &path
const auto c_file = storage_manager->tryGet(path);
if (c_file) {
LOG("File %s is present on capio file system", path.c_str());
if (c_file->get().complete() || CapioCLEngine::get().isFirable(path)) {
if (c_file->get().isCommitted() || CapioCLEngine::get().isFirable(path)) {
LOG("file is complete. serving file");
serve_remote_stat(path, dest, source_tid);
} else { // wait for completion
Expand Down
Loading
Loading