diff --git a/turbonfs/inc/file_cache.h b/turbonfs/inc/file_cache.h index e51cee3c..2ac4bcaf 100644 --- a/turbonfs/inc/file_cache.h +++ b/turbonfs/inc/file_cache.h @@ -1192,6 +1192,8 @@ class bytes_chunk_cache */ std::vector get_commit_pending_bcs() const; + std::vector get_contigious_dirty_bcs(uint64_t& size); + /** * Drop cached data in the given range. * This must be called only for file-backed caches. For non file-backed @@ -1262,7 +1264,7 @@ class bytes_chunk_cache /** * Maximum size a dirty extent can grow before we should flush it. - * This is 60% of the allowed cache size or 1GB whichever is lower. + * This is 30% of the allowed cache size or 1GB whichever is lower. * The reason for limiting it to 1GB is because there's not much value in * holding more data than the Blob NFS server's scheduler cache size. * We want to send as prompt as possible to utilize the n/w b/w but slow @@ -1274,11 +1276,38 @@ class bytes_chunk_cache static const uint64_t max_total = (aznfsc_cfg.cache.data.user.max_size_mb * 1024 * 1024ULL); assert(max_total != 0); - static const uint64_t max_dirty_extent = (max_total * 0.6); + static const uint64_t max_dirty_extent = + std::max((uint64_t)(max_total * 0.3), (uint64_t)AZNFSCFG_WSIZE_MAX); return std::min(max_dirty_extent, uint64_t(1024 * 1024 * 1024ULL)); } + /* + * Maximum size of commit bytes that can be pending in cache, before we + * should commit it to Blob. + * It should be greater than max_dirty_extent_bytes() and smaller than + * inline_dirty_threshold. So, that inline pruning can be avoided. + * This is 60% of the allowed cache size. + * f.e = Cache size of 4GB then max_commit_bytes = 2.4GB + * - Flush will start every 1GB dirty data and each 1GB dirty data + * converted to commit_pending_bytes. + * + * This is 60% of the allowed cache size. + */ + uint64_t max_commit_bytes() const + { + // Maximum cache size allowed in bytes. + static const uint64_t max_total = + (aznfsc_cfg.cache.data.user.max_size_mb * 1024 * 1024ULL); + assert(max_total != 0); + + static const uint64_t max_commit_bytes = + (((uint64_t)(max_total * 0.6))/ max_dirty_extent_bytes()) * max_dirty_extent_bytes(); + + return std::max((max_commit_bytes - AZNFSCFG_WSIZE_MIN), + ((2 * max_dirty_extent_bytes()) - AZNFSCFG_WSIZE_MIN)); + } + /** * Get the amount of dirty data that needs to be flushed. * This excludes the data which is already flushing. @@ -1327,6 +1356,43 @@ class bytes_chunk_cache return bytes_flushing > 0; } + /* + * get_bytes_to_prune() returns the number of bytes that need to be flushed + * inline to free up the space. If there are enough bytes_flushing then we + * can just wait for them to complete. + * + * Note : We are not considering bytes_commit_pending in this calculation. + * If bytes_commit_pending are high then commit already started and + * if bytes_flushing are high once flushing is done commit will be + * triggered. + */ + uint64_t get_bytes_to_prune() const + { + static const uint64_t max_dirty_allowed_per_cache = + max_dirty_extent_bytes() * 2; + int64_t total_bytes = + std::max(int64_t(bytes_dirty - bytes_flushing), int64_t(0)); + const bool local_pressure = total_bytes > (int64_t)max_dirty_allowed_per_cache; + + if (local_pressure) { + return max_dirty_extent_bytes(); + } + + /* + * Global pressure is when get_prune_goals() returns non-zero bytes + * to be pruned inline. + */ + uint64_t inline_bytes; + + /* + * TODO: Noisy neighbor syndrome, where one file is hogging the cache, + * inline pruning will be triggered for other files. 
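+     *
+     * Worked example (illustrative numbers only, assuming a 4 GB cache, i.e.
+     * cache.data.user.max_size_mb = 4096):
+     *   max_dirty_extent_bytes()       = min(max(0.3 * 4 GB, AZNFSCFG_WSIZE_MAX), 1 GB)
+     *                                  = 1 GB
+     *   local pressure threshold here  = 2 * max_dirty_extent_bytes() = 2 GB of
+     *                                    dirty-but-not-flushing data
+     *   max_commit_bytes()             = (0.6 * 4 GB = 2.4 GB, rounded down to a
+     *                                    multiple of 1 GB) - AZNFSCFG_WSIZE_MIN ~= 2 GB
+     *   do_inline_write() threshold    = 0.8 * 4 GB = 3.2 GB of dirty + commit-pending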
+ */ + get_prune_goals(&inline_bytes, nullptr); + return std::max(int64_t(inline_bytes - (bytes_flushing + bytes_commit_pending)), + (int64_t)0); + } + /** * This should be called by writer threads to find out if they must wait * for the write to complete. This will check both the cache specific and @@ -1334,14 +1400,29 @@ class bytes_chunk_cache */ bool do_inline_write() const { + // Maximum cache size allowed in bytes. + static const uint64_t max_total = + (aznfsc_cfg.cache.data.user.max_size_mb * 1024 * 1024ULL); + assert(max_total != 0); + /* - * Allow two dirty extents before we force inline write. - * This way one of the extent can be getting flushed and we can populate - * the second one. + * Allow upto 80% of the cache size to be dirty. + * After this we enforce inline pruning for writers. This is to avoid + * writers getting blocked due to memory pressure. We allow upto 80% + * because we don't want to prune too aggressively, as it hurts write + * performance. */ static const uint64_t max_dirty_allowed_per_cache = - max_dirty_extent_bytes() * 2; - const bool local_pressure = bytes_dirty > max_dirty_allowed_per_cache; + max_total * 0.8; + + /* + * If current cache usage is more than the max_dirty_allowed_per_cache + * limit, we need to enforce inline pruning. Cache usage is the sum of + * bytes_dirty and bytes_commit_pending. Membufs can be dirty or commit + * pending, but not both at the same time. Both dirty and commit pending + * occupy memory, so we need to account for both. + */ + const bool local_pressure = (bytes_dirty + bytes_commit_pending) > max_dirty_allowed_per_cache; if (local_pressure) { return true; diff --git a/turbonfs/inc/nfs_inode.h b/turbonfs/inc/nfs_inode.h index 0d46a719..2660a2fb 100644 --- a/turbonfs/inc/nfs_inode.h +++ b/turbonfs/inc/nfs_inode.h @@ -279,6 +279,9 @@ struct nfs_inode */ struct stat attr; + off_t in_cache_filesize = 0; + off_t putblock_filesize = 0; + /* * Has this inode seen any non-append write? * This starts as false and remains false as long as copy_to_cache() only @@ -300,7 +303,7 @@ struct nfs_inode * Note: As of now, we are not using this flag as commit changes not yet * integrated, so we are setting this flag to true. */ - bool stable_write = true; + bool stable_write = false; public: /* @@ -418,6 +421,12 @@ struct nfs_inode std::atomic commit_state = commit_state_t::COMMIT_NOT_NEEDED; + /* + * commit_lock_5 is used to synchronize flush thread and write thread + * for commit operation. + */ + std::mutex commit_lock_5; + /** * TODO: Initialize attr with postop attributes received in the RPC * response. @@ -947,14 +956,14 @@ struct nfs_inode * updated from postop attributes. We will need to correctly * update that when file is truncated f.e. 
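     *
     * (As wired up in this change: in_cache_filesize tracks the file size as
     * seen by the cache, i.e. it is advanced by the size update just below as
     * data is copied into the cache, while putblock_filesize tracks how far
     * data has been handed off to WRITE/PutBlock RPCs by sync_membufs();
     * truncate_start() resets both to the truncated size.)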
*/ - if (!non_append_writes_seen && (offset != attr.st_size)) { + if (!non_append_writes_seen && (offset != in_cache_filesize)) { non_append_writes_seen = true; AZLogInfo("[{}] Non-append write seen [{}, {}), file size: {}", - ino, offset, offset+length, attr.st_size); + ino, offset, offset+length, in_cache_filesize); } - if (new_size > attr.st_size) { - attr.st_size = new_size; + if (new_size > in_cache_filesize) { + in_cache_filesize = new_size; } } @@ -1130,7 +1139,8 @@ struct nfs_inode bool is_commit_in_progress() const { assert(commit_state != commit_state_t::INVALID); - return (commit_state == commit_state_t::COMMIT_IN_PROGRESS); + return ((commit_state == commit_state_t::COMMIT_IN_PROGRESS) || + (commit_state == commit_state_t::NEEDS_COMMIT)); } /** @@ -1316,7 +1326,7 @@ struct nfs_inode * progress (which must have held the is_flushing lock). */ int flush_cache_and_wait(uint64_t start_off = 0, - uint64_t end_off = UINT64_MAX); + uint64_t end_off = UINT64_MAX, bool is_release = true); /** * Wait for currently flushing membufs to complete. @@ -1328,6 +1338,33 @@ struct nfs_inode int wait_for_ongoing_flush(uint64_t start_off = 0, uint64_t end_off = UINT64_MAX); + /* + * commit_membufs() is called to commit uncommitted membufs to the BLOB. + * It creates a commit RPC and sends it to the NFS server. + */ + void commit_membufs(); + + /* + * switch_to_stable_write() is called to switch the inode to stable write + * mode. There should be no ongoing commit/flushing operation when this + * is called. It creates a commit RPC to commit all the uncommitted membufs + * to the BLOB. + */ + void switch_to_stable_write(); + + /** + * Check if stable write is required for the given offset. + * The given offset is the start of the contiguous dirty membufs that need + * to be flushed to the BLOB. + */ + bool check_stable_write_required(off_t offset); + + /** + * Wait for the ongoing commit operation to complete. + */ + void wait_for_ongoing_commit(); + /** * Sync the dirty membufs in the file cache to the NFS server. * All contiguous dirty membufs are clubbed together and sent to the diff --git a/turbonfs/inc/rpc_stats.h b/turbonfs/inc/rpc_stats.h index 08736572..215378f0 100644 --- a/turbonfs/inc/rpc_stats.h +++ b/turbonfs/inc/rpc_stats.h @@ -116,9 +116,6 @@ class rpc_stats_az */ void on_rpc_issue() { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert(stamp.issue == 0); stamp.issue = get_current_usecs(); assert(stamp.issue >= stamp.create); @@ -133,9 +130,6 @@ */ void on_rpc_cancel() { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert(stamp.issue != 0); assert((int64_t) stamp.issue <= get_current_usecs()); assert(stamp.dispatch == 0); @@ -155,9 +149,6 @@ */ void on_rpc_complete(struct rpc_pdu *pdu, nfsstat3 status) { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert(nfsstat3_to_errno(status) != -ERANGE); req_size = rpc_pdu_get_req_size(pdu); diff --git a/turbonfs/inc/rpc_task.h b/turbonfs/inc/rpc_task.h index 99e6a3d1..ff0fcddd 100644 --- a/turbonfs/inc/rpc_task.h +++ b/turbonfs/inc/rpc_task.h @@ -1474,7 +1474,6 @@ struct api_task_info */ fuse_req *req = nullptr; - /* * Only valid for FUSE_READ. 
* @@ -2177,6 +2176,8 @@ struct rpc_task return stats; } + void set_task_csched(bool stable_write); + struct nfs_context *get_nfs_context() const; struct rpc_context *get_rpc_ctx() const @@ -2503,6 +2504,8 @@ struct rpc_task bool add_bc(const bytes_chunk& bc); void issue_write_rpc(); + void issue_commit_rpc(); + #ifdef ENABLE_NO_FUSE /* * In nofuse mode we re-define these fuse_reply functions to copy the diff --git a/turbonfs/src/file_cache.cpp b/turbonfs/src/file_cache.cpp index 42884742..fb579e14 100644 --- a/turbonfs/src/file_cache.cpp +++ b/turbonfs/src/file_cache.cpp @@ -2302,6 +2302,42 @@ std::vector bytes_chunk_cache::get_flushing_bc_range( return bc_vec; } +std::vector bytes_chunk_cache::get_contigious_dirty_bcs(uint64_t& size) +{ + std::vector bc_vec; + size = 0; + + // TODO: Make it shared lock. + const std::unique_lock _lock(chunkmap_lock_43); + auto it = chunkmap.lower_bound(0); + uint64_t prev_offset = AZNFSC_BAD_OFFSET; + + while (it != chunkmap.cend()) { + const struct bytes_chunk& bc = it->second; + struct membuf *mb = bc.get_membuf(); + + if (mb->is_dirty() && !mb->is_flushing()) { + if (prev_offset != AZNFSC_BAD_OFFSET) { + if (prev_offset != bc.offset) { + break; + } else { + size += bc.length; + prev_offset = bc.offset + bc.length; + } + } else { + size += bc.length; + prev_offset = bc.offset + bc.length; + } + mb->set_inuse(); + bc_vec.emplace_back(bc); + } + + ++it; + } + + return bc_vec; +} + std::vector bytes_chunk_cache::get_dirty_bc_range(uint64_t start_off, uint64_t end_off) const { std::vector bc_vec; diff --git a/turbonfs/src/nfs_inode.cpp b/turbonfs/src/nfs_inode.cpp index 74715fc6..bf2536aa 100644 --- a/turbonfs/src/nfs_inode.cpp +++ b/turbonfs/src/nfs_inode.cpp @@ -34,7 +34,7 @@ nfs_inode::nfs_inode(const struct nfs_fh3 *filehandle, assert(write_error == 0); // TODO: Revert this to false once commit changes integrated. - assert(stable_write == true); + assert(stable_write == false); assert(commit_state == commit_state_t::COMMIT_NOT_NEEDED); #ifndef ENABLE_NON_AZURE_NFS @@ -375,12 +375,144 @@ int nfs_inode::get_actimeo_max() const } } +/* + * Caller should hold flush_lock(). + */ +void nfs_inode::wait_for_ongoing_commit() +{ + assert(is_flushing); + + while (is_commit_in_progress()) { + assert(!get_filecache()->is_flushing_in_progress()); + ::usleep(1000); + } + + assert(is_commit_in_progress() == false); +} + +/* + * This function is called with flush_lock() held. + * In normal case this function should be no-op as + * when we reach here all the previous flushes/commits + * should have been completed. But in case of committing + * at the close of file, we need to wait if any flush + * going on and commit all the dirty membufs which are + * not yet committed. + */ +void nfs_inode::switch_to_stable_write() +{ + AZLogInfo("[{}] Switching to stable write", ino); + + /* + * We should not be in commit in progress state. + * Switch_to_stable_write() is called by flush thread + * and it shouldn't be called when commit is in progress. + */ + assert(is_commit_in_progress() == false); + + /* + * switch_to_stable_write() called from following paths. + * 1. sync_membufs(), No new flush should be started if + * there is ongoing flush is in progress. + * 2. flush_cache_and_wait(), Wait for ongoing flush to + * complete. + * These are only two paths from where we flush writes. + */ + assert(!get_filecache()->is_flushing_in_progress()); + + /* + * Check if there is anything to commit, if not then + * switch to stable write. 
+ */ + if (get_filecache()->get_bytes_to_commit() == 0) { + AZLogDebug("[{}] Nothing to commit, switching to stable write", ino); + set_stable_write(); + return; + } + + /* + * If there is something to commit, then commit it and + * wait for the commit to complete. + */ + set_commit_in_progress(); + commit_membufs(); + wait_for_ongoing_commit(); + + assert(get_filecache()->get_bytes_to_commit() == 0); + assert(!get_filecache()->is_flushing_in_progress()); + assert(!is_commit_in_progress()); + + set_stable_write(); + return; +} + +/* + * This function checks whether we need to switch to stable write for a flush + * starting at the given offset. + */ +bool nfs_inode::check_stable_write_required(off_t offset) +{ + /* + * If stable_write is already set, we don't need to do anything. + * We don't need lock here as once stable_write is set it's never + * unset. + */ + if (is_stable_write()) { + return false; + } + + /* + * If the current write is not an append write, we can't go for unstable + * writes. It may overwrite existing data and we don't have knowledge of the + * existing block list, so it may require a read-modify-write. So, we can't + * go for unstable write. Similarly, if the offset is beyond the end of the + * flushed data, we would need to write zero blocks between the current end + * of the flushed data and the offset. + */ + if (putblock_filesize != offset) { + AZLogInfo("Stable write required as flushed_size:{} is not at the offset:{}", putblock_filesize, offset); + + return true; + } + + return false; +} + +/** + * commit_membufs() is called by the writer thread to commit flushed membufs. + */ +void nfs_inode::commit_membufs() +{ + /* + * Create the commit task to carry out the commit. + */ + struct rpc_task *commit_task = + get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_FLUSH); + commit_task->init_flush(nullptr /* fuse_req */, ino); + assert(commit_task->rpc_api->pvt == nullptr); + + commit_task->issue_commit_rpc(); +} + void nfs_inode::sync_membufs(std::vector<bytes_chunk> &bc_vec, bool is_flush) { + /* + * A new flush must not be started while a flush/commit is in progress. + */ + assert(!is_commit_in_progress()); + assert(!get_filecache()->is_flushing_in_progress()); + if (bc_vec.empty()) { return; } + /* + * If the new offset is not at the end of the data flushed so far, + * then we need to switch to stable write. + */ + if (check_stable_write_required(bc_vec[0].offset)) { + switch_to_stable_write(); + } + /* * Create the flush task to carry out the write. */ @@ -470,6 +602,9 @@ void nfs_inode::sync_membufs(std::vector<bytes_chunk> &bc_vec, bool is_flush) * Once packed completely, then dispatch the write. */ if (write_task->add_bc(bc)) { + if ((uint64_t) putblock_filesize == bc.offset) { + putblock_filesize += bc.length; + } continue; } else { /* @@ -490,6 +625,9 @@ void nfs_inode::sync_membufs(std::vector<bytes_chunk> &bc_vec, bool is_flush) // Single bc addition should not fail. [[maybe_unused]] bool res = write_task->add_bc(bc); assert(res == true); + if ((uint64_t)putblock_filesize == bc.offset) { + putblock_filesize += bc.length; + } } }
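A small standalone illustration of the decision check_stable_write_required() makes for sync_membufs() above. This is a sketch with made-up numbers, not code from the repository: needs_stable_write() stands in for the real member function and `flushed` for putblock_filesize.

#include <cassert>
#include <cstdint>

static bool needs_stable_write(uint64_t flushed, uint64_t flush_start_offset)
{
    // Only a flush starting exactly where previously flushed data ends can stay
    // on the unstable (PutBlock + COMMIT) path; overwrites of flushed data and
    // holes past it force stable FILE_SYNC writes.
    return flush_start_offset != flushed;
}

static void example()
{
    const uint64_t MB = 1024 * 1024;
    assert(needs_stable_write(4 * MB, 4 * MB) == false);   // append: keep unstable
    assert(needs_stable_write(4 * MB, 1 * MB) == true);    // overwrite of flushed data
    assert(needs_stable_write(4 * MB, 16 * MB) == true);   // hole past flushed data
}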
@@ -499,6 +637,7 @@ } } + /** * Note: This takes exclusive lock on ilock_1. */ @@ -685,6 +824,9 @@ int nfs_inode::copy_to_cache(const struct fuse_bufvec* bufv, /** * Note: Caller should call with flush_lock() held. + * + * TODO: When all dirty membufs are committed on file close, the truncate + * call should also commit all the dirty membufs. + */ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) { @@ -712,20 +854,29 @@ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) /* * Flushing not in progress and no new flushing can be started as we hold * the flush_lock(). + * It may be possible that a flush is not in progress but we have a commit + * in progress. In that case we need to wait for the commit to complete. */ - if (!get_filecache()->is_flushing_in_progress()) { + if (!get_filecache()->is_flushing_in_progress() && !is_commit_in_progress()) { AZLogDebug("[{}] No flush in progress, returning", ino); return 0; } + std::vector<bytes_chunk> bc_vec = {}; /* - * Get the flushing bytes_chunk from the filecache handle. - * This will grab an exclusive lock on the file cache and return the list - * of flushing bytes_chunks at that point. Note that we can have new dirty - * bytes_chunks created but we don't want to wait for those. + * Try to get the flushing bytes_chunk from the filecache handle if a + * flush is in progress. */ - std::vector<bytes_chunk> bc_vec = - filecache_handle->get_flushing_bc_range(start_off, end_off); + if (get_filecache()->is_flushing_in_progress()) { + /* + * Get the flushing bytes_chunk from the filecache handle. + * This will grab an exclusive lock on the file cache and return the list + * of flushing bytes_chunks at that point. Note that we can have new dirty + * bytes_chunks created but we don't want to wait for those. + */ + bc_vec = + filecache_handle->get_flushing_bc_range(start_off, end_off); + } /* * Our caller expects us to return only after the flush completes. @@ -768,16 +919,33 @@ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) */ filecache_handle->release(bc.offset, bc.length); } + AZLogDebug("[{}] wait_for_ongoing_flush() returning with error: {}", ino, get_write_error()); + + wait_for_ongoing_commit(); return get_write_error(); } /** - * Note: This takes shared lock on ilock_1. + * flush_cache_and_wait() is called by the writer/release thread to do an + * inline flush of the dirty membufs. + * - flush_lock is held the whole time, so that no new flush/commit can be + * started. + * - Before flushing it needs to wait for any ongoing commit to complete. + * - After that it gets the dirty membufs from the filecache handle and + * flushes them. + * - First it tries to get the contiguous dirty membufs and checks whether + * that meets prune_bytes; if not, it gets all the dirty membufs and + * switches to stable write. + * - For the release case, it gets all the dirty membufs and switches to + * stable write. The reason is that we don't know whether + * get_contigious_dirty_bcs() returned all the dirty membufs or not. This + * can be optimized, but for now we keep it simple. */ -int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off) +int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off, bool is_release) { + assert(start_off < end_off); + /* * MUST be called only for regular files. * Leave the assert to catch if fuse ever calls flush() on non-reg files. @@ -816,14 +984,91 @@ int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off) */ flush_lock(); + /* + * Check whether inline write is still required. + * It may happen that a previous thread has already flushed + * the dirty membufs and we don't need to do an inline write. 
+ */ + if (!is_release && !get_filecache()->do_inline_write()) { + flush_unlock(); + AZLogDebug("[{}] Inline write not required", ino); + return 0; + } + + /* + * If flush is in progress, wait for it to complete. + * It may happen that last flush is still in progress and we need to wait + * for it to complete before we start the new flush. + * As we have the flush_lock() we are guaranteed that no new flush can be + * started till we release the flush_lock(). + * + * wait_for_ongoing_flush() will wait for the ongoing flush/commt to complete. + */ + const int err = wait_for_ongoing_flush(0, UINT64_MAX); + if (err != 0) { + AZLogError("[{}] Failed to flush cache to stable storage, " + "error={}", ino, err); + flush_unlock(); + return err; + } + /* * Get the dirty bytes_chunk from the filecache handle. * This will grab an exclusive lock on the file cache and return the list * of dirty bytes_chunks at that point. Note that we can have new dirty * bytes_chunks created but we don't want to wait for those. */ - std::vector bc_vec = - filecache_handle->get_dirty_bc_range(start_off, end_off); + std::vector bc_vec; + uint64_t size = 0; + if (is_release) { + /* + * Get the dirty bytes_chunk from the filecache handle. + * This will grab an exclusive lock on the file cache and return the list + * of dirty bytes_chunks at that point. Note that we can have new dirty + * bytes_chunks created but we don't want to wait for those. + */ + if (!is_stable_write()) { + bc_vec = filecache_handle->get_contigious_dirty_bcs(size); + + if (size != get_filecache()->get_bytes_to_flush()) { + AZLogInfo("SWITCHING TO STABLE WRITE"); + switch_to_stable_write(); + for (auto& bc : bc_vec) { + bc.get_membuf()->clear_inuse(); + } + bc_vec = filecache_handle->get_dirty_bc_range(start_off, end_off); + } + } else { + assert(get_filecache()->get_bytes_to_commit() == 0); + bc_vec = filecache_handle->get_dirty_bc_range(start_off, end_off); + } + } else { + uint64_t prune_bytes = get_filecache()->get_bytes_to_prune(); + assert(prune_bytes > 0); + + /* + * If prune_bytes is greater than the size of the bytes_chunk_vector returned + * then we need to switch to stable write. + */ + bc_vec = filecache_handle->get_contigious_dirty_bcs(size); + if (prune_bytes > size) { + AZLogInfo("[{}] Prune bytes: {} > size: {}", ino, prune_bytes, size); + AZLogInfo("SWITCHING TO STABLE WRITE"); + for (auto& bc : bc_vec) { + bc.get_membuf()->clear_inuse(); + } + + /* + * Switch to stable write and get all the dirty + * membufs from the filecache handle. + */ + switch_to_stable_write(); + bc_vec = filecache_handle->get_dirty_bc_range(start_off, end_off); + } else { + AZLogInfo("[{}] Prune bytes: {} <= size: {}", ino, prune_bytes, size); + AZLogInfo("Flush the contigious dirty membufs"); + } + } /* * sync_membufs() iterate over the bc_vec and starts flushing the dirty @@ -874,6 +1119,19 @@ int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off) filecache_handle->release(bc.offset, bc.length); } + /* + * If this is a release call, then we need to commit all the dirty membufs + * which are not yet committed and need to wait for the commit to complete. + * For regular write, we don't need to commit as next write will take care + * of it. 
+ */ + if (is_release && (get_filecache()->get_bytes_to_commit() > 0)) { + assert(!is_commit_in_progress()); + set_commit_in_progress(); + commit_membufs(); + wait_for_ongoing_commit(); + } + flush_unlock(); return get_write_error(); } @@ -974,6 +1232,12 @@ bool nfs_inode::truncate_start(size_t size) AZLogDebug("[{}] Filecache truncated to size={}", ino, truncate_size); + /* + * Set the new file size in the inode's cached attributes. + */ + in_cache_filesize = size; + putblock_filesize = size; + return true; } diff --git a/turbonfs/src/rpc_stats.cpp b/turbonfs/src/rpc_stats.cpp index afbda0aa..daf57cdb 100644 --- a/turbonfs/src/rpc_stats.cpp +++ b/turbonfs/src/rpc_stats.cpp @@ -241,6 +241,7 @@ do { \ DUMP_OP(FUSE_READDIRPLUS); DUMP_OP(FUSE_READ); DUMP_OP(FUSE_WRITE); + DUMP_OP(FUSE_FLUSH); /* * TODO: Add more ops. diff --git a/turbonfs/src/rpc_task.cpp b/turbonfs/src/rpc_task.cpp index 28da4c9c..2b02a6ed 100644 --- a/turbonfs/src/rpc_task.cpp +++ b/turbonfs/src/rpc_task.cpp @@ -794,6 +794,146 @@ void access_callback( } } +static void commit_callback( + struct rpc_context *rpc, + int rpc_status, + void *data, + void *private_data) +{ + rpc_task *task = (rpc_task*) private_data; + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_FLUSH); + assert(task->rpc_api->pvt != nullptr); + + auto res = (COMMIT3res*)data; + + INJECT_JUKEBOX(res, task); + + const fuse_ino_t ino = + task->rpc_api->flush_task.get_ino(); + struct nfs_inode *inode = + task->get_client()->get_nfs_inode_from_ino(ino); + auto bc_vec_ptr = (std::vector *)task->rpc_api->pvt; + + const int status = task->status(rpc_status, NFS_STATUS(res)); + UPDATE_INODE_WCC(inode, res->COMMIT3res_u.resok.file_wcc); + AZLogDebug("commit_callback"); + /* + * Now that the request has completed, we can query libnfs for the + * dispatch time. + */ + task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUS(res)); + if (status == 0) { + uint64_t offset = 0; + uint64_t length = 0; + auto &bc_vec = *bc_vec_ptr; // get the vector from the pointer. + + for (auto &bc : bc_vec) + { + struct membuf *mb = bc.get_membuf(); + assert(mb->is_inuse()); + assert(mb->is_locked()); + assert(mb->is_commit_pending()); + assert(mb->is_uptodate()); + + mb->clear_commit_pending(); + mb->clear_locked(); + mb->clear_inuse(); + + /** + * Collect the contiguous range of bc's to release. + */ + if (offset == 0 && length == 0) { + offset = bc.offset; + length = bc.length; + } else if (offset + length == bc.offset) { + length += bc.length; + } else { + auto release = inode->get_filecache()->release(offset, length); + AZLogInfo("BC released {}, asked for {}", release, length); + offset = bc.offset; + length = bc.length; + } + } + + if (length != 0) { + assert(length != 0); + auto release = inode->get_filecache()->release(offset, length); + + AZLogInfo("BC offset {}, asked for {}, release {}", (offset/1048576), (length/1048576), release); + } + + if (task->get_fuse_req() != nullptr) { + task->reply_error(status); + } else { + task->free_rpc_task(); + } + } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) { + task->get_client()->jukebox_retry(task); + return; + } else { + if (task->get_fuse_req() != nullptr) { + task->reply_error(status); + } else { + inode->set_write_error(status); + task->free_rpc_task(); + } + } + + delete bc_vec_ptr; + inode->clear_commit_in_progress(); +} + +void rpc_task::issue_commit_rpc() +{ + // Must only be called for a flush task. 
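+    /*
+     * Note: the COMMIT3 args below use offset = 0 and count = 0; in NFSv3
+     * (RFC 1813) a zero count means "commit all data from offset to the end
+     * of file", so this always commits the whole file rather than a specific
+     * byte range.
+     */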
+ assert(get_op_type() == FUSE_FLUSH); + + const fuse_ino_t ino = rpc_api->flush_task.get_ino(); + struct nfs_inode *inode = get_client()->get_nfs_inode_from_ino(ino); + assert(inode->is_stable_write() == false); + + COMMIT3args args; + ::memset(&args, 0, sizeof(args)); + bool rpc_retry = false; + + AZLogInfo("issue_commit_rpc"); + + /* + * Get the bcs marked for commit_pending. + */ + if (rpc_api->pvt == nullptr) { + rpc_api->pvt = static_cast(new std::vector(inode->get_filecache()->get_commit_pending_bcs())); + } + + args.file = inode->get_fh(); + args.offset = 0; + args.count = 0; + + do { + rpc_retry = false; + stats.on_rpc_issue(); + + if (rpc_nfs3_commit_task(get_rpc_ctx(), + commit_callback, &args, + this) == NULL) { + stats.on_rpc_cancel(); + /* + * Most common reason for this is memory allocation failure, + * hence wait for some time before retrying. Also block the + * current thread as we really want to slow down things. + * + * TODO: For soft mount should we fail this? + */ + rpc_retry = true; + + AZLogWarn("rpc_nfs3_write_task failed to issue, retrying " + "after 5 secs!"); + ::sleep(5); + } + } while (rpc_retry); +} + /* * Called when libnfs completes a WRITE_IOV RPC. */ @@ -886,7 +1026,7 @@ static void write_iov_callback( bciov->orig_offset + bciov->orig_length); // Update bciov after the current write. - bciov->on_io_complete(res->WRITE3res_u.resok.count); + bciov->on_io_complete(res->WRITE3res_u.resok.count, !inode->is_stable_write()); // Create a new write_task for the remaining bc_iovec. struct rpc_task *write_task = @@ -914,7 +1054,7 @@ static void write_iov_callback( return; } else { // Complete bc_iovec IO completed. - bciov->on_io_complete(res->WRITE3res_u.resok.count); + bciov->on_io_complete(res->WRITE3res_u.resok.count, !inode->is_stable_write()); // Complete data writen to blob. AZLogDebug("[{}] <{}> Completed write, off: {}, len: {}", @@ -947,6 +1087,30 @@ static void write_iov_callback( } delete bciov; + + // check if commit is pending. + if (!inode->get_filecache()->is_flushing_in_progress()) { + bool create_commit_task = false; + + std::unique_lock lock(inode->commit_lock_5); + { + if (inode->is_commit_pending()) { + assert(inode->is_stable_write() == false); + inode->set_commit_in_progress(); + create_commit_task = true; + } + } + lock.unlock(); + + if (create_commit_task) { + // Create a new flush_task for the remaining bc_iovec. + struct rpc_task *commit_task = + client->get_rpc_task_helper()->alloc_rpc_task_reserved(FUSE_FLUSH); + commit_task->init_flush(nullptr /* fuse_req */, ino); + commit_task->issue_commit_rpc(); + } + } + task->rpc_api->pvt = nullptr; // Release the task. @@ -993,7 +1157,8 @@ void rpc_task::issue_write_rpc() args.file = inode->get_fh(); args.offset = offset; args.count = length; - args.stable = FILE_SYNC; + args.stable = inode->is_stable_write() ? FILE_SYNC : UNSTABLE; + set_task_csched(inode->is_stable_write()); do { rpc_retry = false; @@ -1925,8 +2090,6 @@ void rpc_task::run_access() } while (rpc_retry); } - - void rpc_task::run_write() { // This must be called only for front end tasks. 
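A compact sketch of the membuf lifecycle that the unstable-write path above implements. Illustrative only: the names below are hypothetical; the real state lives in the membuf flag bits and the inode's commit_state.

enum class mb_state { DIRTY, FLUSHING, COMMIT_PENDING, CLEAN };

static mb_state next_state(mb_state s, bool stable_write)
{
    switch (s) {
    case mb_state::DIRTY:
        return mb_state::FLUSHING;                  // packed into a WRITE RPC by sync_membufs()
    case mb_state::FLUSHING:
        return stable_write ? mb_state::CLEAN       // FILE_SYNC write, nothing left to do
                            : mb_state::COMMIT_PENDING; // UNSTABLE write, needs a COMMIT
    case mb_state::COMMIT_PENDING:
        return mb_state::CLEAN;                     // commit_callback() clears and releases
    default:
        return mb_state::CLEAN;
    }
}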
@@ -1937,7 +2100,6 @@ void rpc_task::run_write() const size_t length = rpc_api->write_task.get_size(); struct fuse_bufvec *const bufv = rpc_api->write_task.get_buffer_vector(); const off_t offset = rpc_api->write_task.get_offset(); - const bool sparse_write = (offset > inode->get_file_size(true)); uint64_t extent_left = 0; uint64_t extent_right = 0; @@ -1948,6 +2110,9 @@ void rpc_task::run_write() assert(inode->has_filecache()); // Update cached write timestamp, if needed. + // Question: Should not be updated after copy_to_cache(), as technically + // the write is not yet done. It may happen copy_to_cache() fails + // and we don't write anything. inode->stamp_cached_write(); /* @@ -2024,75 +2189,216 @@ void rpc_task::run_write() assert(extent_right >= (extent_left + length)); /* - * If this is a sparse write beyond eof we perform inline write, w/o this - * we can have some reader read from the sparse part of the file which is - * not yet in the bytes_chunk_cache. This read will be issued to the server - * and since server doesn't know the updated file size (as the write may - * still be sitting in our bytes_chunk_cache) it will return eof. - * This is not correct as such reads issued after successful write, are - * valid and should return 0 bytes for the sparse range. + * If the extent size exceeds the max allowed dirty size as returned by + * max_dirty_extent_bytes(), then it's time to flush the extent. + * Note that this will cause sequential writes to be flushed at just the + * right intervals to optimize fewer write calls and also allowing the + * server scheduler to merge better. + * See bytes_to_flush for how random writes are flushed. + * + * Note: max_dirty_extent is static as it doesn't change after it's + * queried for the first time. + */ + static const uint64_t max_dirty_extent = + inode->get_filecache()->max_dirty_extent_bytes(); + assert(max_dirty_extent > 0); + + /* + * If the commit_pending_bytes exceeds the max_commit_bytes, then it's time + * to commit the data. + */ + static const uint64_t max_commit_bytes = + inode->get_filecache()->max_commit_bytes(); + assert(max_commit_bytes > 0); + + /* + * How many bytes in the cache need to be flushed and commit. */ - if (!sparse_write) { + const uint64_t bytes_to_flush = + inode->get_filecache()->get_bytes_to_flush(); + const uint64_t bytes_to_commit = + inode->get_filecache()->get_bytes_to_commit(); + + /* + * Check if we need to flush/commit/inline now. + */ + const bool needs_flush = bytes_to_flush >= max_dirty_extent; + const bool needs_commit = (bytes_to_commit >= max_commit_bytes); + const bool inline_write = inode->get_filecache()->do_inline_write(); + + AZLogDebug("[{}] extent_left: {}, extent_right: {}, size: {}, " + "bytes_to_flush: {}, bytes_to_commit: {} (max_dirty_extent: {})" + " (max_commit_bytes: {})", + ino, extent_left, extent_right, + (extent_right - extent_left), + bytes_to_flush, + bytes_to_commit, + max_dirty_extent, + max_commit_bytes); + + /* + * Most common case is that we don't need to flush/commit/inline, so we + * can reply immediately. + */ + if (!needs_flush && !needs_commit && !inline_write) { + AZLogDebug("Reply write without syncing to Blob"); + reply_write(length); + return; + } + + if ((extent_right - extent_left) < max_dirty_extent) { /* - * If the extent size exceeds the max allowed dirty size as returned by - * max_dirty_extent_bytes(), then it's time to flush the extent. 
- * Note that this will cause sequential writes to be flushed at just the - * right intervals to optimize fewer write calls and also allowing the - * server scheduler to merge better. - * See bytes_to_flush for how random writes are flushed. - * - * Note: max_dirty_extent is static as it doesn't change after it's - * queried for the first time. + * This is the case of non-sequential writes causing enough dirty + * data to be accumulated, need to flush all of that. */ - static const uint64_t max_dirty_extent = - inode->get_filecache()->max_dirty_extent_bytes(); - assert(max_dirty_extent > 0); + extent_left = 0; + extent_right = UINT64_MAX; + } + /* + * Write Process + * - Gather enough writes in the cache. + * - If size of dirty data exceeds max_dirty_extent, + * flush the contigious extent from the start of file. + * - Flushing of the dirty membufs on completion mark those membufs + * as commit_pending. + * - If size of commit_pending data exceeds max_commit_bytes and inline_write, + * not required starts the commit task. Before starting the commit task, + * check for any flushing in progress, if flushing is in progress, set + * commit_pending. Once flushing completes, it will start the commit task. + * If flushing is not in progress, then start the commit task. + * - If inline_write is required, then check if bytes_flushing + + * bytes_commit_pending enough to make space for new writes, wait for + * flushing/commit to complete. If bytes_flushing + bytes_commit_pending + * is not enough, then try to flush more data and wait for commit to make space + * for new writes. + * - If commit/flushing in progress no new flush can be started. + * + * f.e. + * 1. Flush_Limit = 1GB, Commit_Limit = 2GB, inline_limit = 3GB + * First case + * Write = 1GB/s + * Flush = 500MB/s + * 1sec = 1GB dirty_membuf + * 2 sec = 1GB dirty_membuf + 500MB flushing + 500 commit_pending + * --> Can't start the flushing as going on. + * 3 sec = 2GB flushing + 1GB commit_pending --- inline should trigger here + * (Wait for flushing to complete) --> 2GB takes 4 sec to flush. + * ======= Flushing completes ======== + * 7 sec = 3GB commit_pending + * 8 sec = 1GB dirty + * 9 sec = 1GB dirty + 500MB flushing + 500MB commit_pending + * + * Second case + * Write = 1GB/s + * Flush = 1GB/s + * 1sec = 1GB dirty_membuf (Trigger async flushing) + * 2 sec = 1GB flushing + 1GB commit_pending + * 3 sec = 1GB dirty + 2GB commit_pending + * =======start commits============= + * =======inline trigger============ + * ======= Wait for commit to complete ========= + * 4sec = 800MB dirty (approx 100 msec to complete commit) + * 5sec = 1GB dirty + 200MB flushing + 800MB commit_pending + * + * 2. Flush_Limit = 1GB, Commit_Limit = 1GB, inline_limit = 2.4GB + * + */ + if (needs_commit) { /* - * How many bytes in the cache need to be flushed. + * We want to commit, but we need to wait for current flushing to complete. + * 1. If flushing is going on, set commit_pending to inode, when last membuf flushed, + * write_iov_callback(), start new task to commit the data. + * 2. If flushing is not going on, then create the task and start commit of data. + * + * Note: Ensure lock is held till we set the commit state depending on flushing + * going on or not. This lock is required to async IOs. 
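+     *
+     * (The code below is a double-checked pattern: is_commit_in_progress() is
+     * tested once without the lock and once under commit_lock_5, so exactly
+     * one thread either marks the commit as pending while a flush is still
+     * running, or moves the inode to COMMIT_IN_PROGRESS and starts the
+     * commit task.)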
*/ - const uint64_t bytes_to_flush = - inode->get_filecache()->get_bytes_to_flush(); - - AZLogDebug("[{}] extent_left: {}, extent_right: {}, size: {}, " - "bytes_to_flush: {} (max_dirty_extent: {})", - ino, extent_left, extent_right, - (extent_right - extent_left), - bytes_to_flush, - max_dirty_extent); + bool start_commit_task = false; + if (!inode->is_commit_in_progress()) { + std::unique_lock lock(inode->commit_lock_5); + if (!inode->is_commit_in_progress()) + { + if (inode->get_filecache()->is_flushing_in_progress()) { + inode->set_commit_pending(); + } else { + inode->set_commit_in_progress(); + start_commit_task = true; + } + } + lock.unlock(); - if ((extent_right - extent_left) < max_dirty_extent) { - /* - * Current extent is not big enough to be flushed, see if we have - * enough dirty data that needs to be flushed. This is to cause - * random writes to be periodically flushed. - */ - if (bytes_to_flush < max_dirty_extent) { - AZLogDebug("Reply write without syncing to Blob"); - reply_write(length); - return; + if (start_commit_task) { + inode->commit_membufs(); } + } - /* - * This is the case of non-sequential writes causing enough dirty - * data to be accumulated, need to flush all of that. - */ - extent_left = 0; - extent_right = UINT64_MAX; + assert(inode->is_commit_in_progress() == true); + + /* + * If commit is in progress, then no new flush or commit task is started. + */ + if (inode->is_commit_in_progress() && !inline_write) { + AZLogDebug("Commit in progress, skip commit, inline_write is false"); + reply_write(length); + return; } } /* * Ok, we need to flush the extent now, check if we must do it inline. */ - if (sparse_write || inode->get_filecache()->do_inline_write()) { + if (inline_write) { INC_GBL_STATS(inline_writes, 1); - AZLogDebug("[{}] Inline write (sparse={}), {} bytes, extent @ [{}, {})", - ino, sparse_write, (extent_right - extent_left), + AZLogDebug("[{}] Inline write ({} bytes, extent @ [{}, {})", + ino, (extent_right - extent_left), extent_left, extent_right); - const int err = inode->flush_cache_and_wait(extent_left, extent_right); + assert(!needs_commit || inode->is_commit_in_progress()); + + /* + * prune_bytes is the number of bytes that need to be flushed to + * make space for the new writes. If it's 0, then we don't need to + * flush more data we can just wait for the ongoing flush/commit + * to complete. + */ + uint64_t prune_bytes = inode->get_filecache()->get_bytes_to_prune(); + + int err = 0; + if (prune_bytes == 0) { + AZLogDebug("No bytes to prune, Wait for ongoing flush/commit" + "to complete."); + /* + * wait_for_ongoing_flush() will wait for the ongoing flush and + * commit to complete. + * In case of inline_write, we want to block the writes till the + * enough space is made for new writes. Every write thread will + * come here and try to take the lock, only one thread succeeds + * and wait for the ongoing flush/commit to complete. Rest of the + * threads will wait for the lock to be released by the first + * thread. After first thread completes the wait, check inline write + * condition again and if it's still true. Mostly it will be false + * as the first thread would have made enough space for new writes. + * + * Note: flush_lock() is held so that no new flush or commit task + * is started. 
+ */ + inode->flush_lock(); + if (inode->get_filecache()->do_inline_write()) { + err = inode->wait_for_ongoing_flush(0, UINT64_MAX); + } + inode->flush_unlock(); + } else { + AZLogDebug("Prune {} bytes to make space for new writes", prune_bytes); + /* + * flush_cache_and_wait() wait for any commit in progress to + * complete and then flush the cache. + */ + err = inode->flush_cache_and_wait(extent_left, extent_right, + false /* is_release*/); + } + if (err == 0) { reply_write(length); return; @@ -2113,8 +2419,36 @@ void rpc_task::run_write() */ inode->flush_lock(); - std::vector bc_vec = - inode->get_filecache()->get_dirty_bc_range(extent_left, extent_right); + /* + * If commit/flushing is in progress, then no new flush is started. + * Commit is hard requirement, On server side ongoing commit will race + * with this new write, it can cause the new write to be lost. + * For Async writes/kernel writeback cache, multiple writes can + * reach here as needs_flush true for them at very instance. First + * write thread have started the flush with big enough extent, rest of + * threads have smaller extent. It's not worth to start the flush for + * them as it increases the number of put blocks on server side. + * That's why we need to check for ongoing flush as well. + */ + if (inode->is_commit_in_progress() || + inode->get_filecache()->is_flushing_in_progress()) { + inode->flush_unlock(); + AZLogDebug("Commit/Flush in progress, skip flush, inline_write is false"); + reply_write(length); + return; + } + + uint64_t size = 0; + std::vector bc_vec; + /* + * If the inode is stable write, then get the dirty membufs in the range. + * otherwise get the dirty membufs contigious range. + */ + if (inode->is_stable_write()) { + bc_vec = inode->get_filecache()->get_dirty_bc_range(extent_left, extent_right); + } else { + bc_vec = inode->get_filecache()->get_contigious_dirty_bcs(size); + } if (bc_vec.size() == 0) { inode->flush_unlock(); @@ -3564,6 +3898,14 @@ void rpc_task::free_rpc_task() client->get_rpc_task_helper()->free_rpc_task(this); } +void rpc_task::set_task_csched(bool stable_write) +{ + if (client->mnt_options.nfs_port == 2048) { + csched = stable_write ? CONN_SCHED_FH_HASH : CONN_SCHED_RR; + } + return; +} + struct nfs_context* rpc_task::get_nfs_context() const { return client->get_nfs_context(csched, fh_hash);
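For reference, a simplified recap (illustrative only, not repository code; the helper name is hypothetical) of the decision ladder run_write() now follows for cached writes, using the thresholds from the file_cache helpers changed earlier in this patch:

static void on_cached_write_sketch(struct nfs_inode *inode)
{
    auto fc = inode->get_filecache();

    const bool needs_flush  = fc->get_bytes_to_flush()  >= fc->max_dirty_extent_bytes();
    const bool needs_commit = fc->get_bytes_to_commit() >= fc->max_commit_bytes();
    const bool inline_write = fc->do_inline_write();

    if (!needs_flush && !needs_commit && !inline_write) {
        return;     // common case: ack the write immediately, no sync needed
    }
    if (needs_commit) {
        // Start the COMMIT now, or defer it to the last WRITE completion
        // (commit_pending) if a flush is still in progress.
    }
    if (inline_write) {
        // Writer blocks: either wait for the ongoing flush/commit, or flush
        // (possibly switching to stable writes) via flush_cache_and_wait().
        return;
    }
    if (needs_flush) {
        // Async flush under flush_lock(): the contiguous dirty run for
        // unstable writes, or the dirty range for stable writes.
    }
}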