diff --git a/turbonfs/inc/fcsm.h b/turbonfs/inc/fcsm.h index 09ce10be..db034b4b 100644 --- a/turbonfs/inc/fcsm.h +++ b/turbonfs/inc/fcsm.h @@ -95,8 +95,7 @@ class fcsm * * LOCKS: flush_lock. */ - void ensure_flush(uint64_t flush_bytes, - uint64_t write_off, + void ensure_flush(uint64_t write_off, uint64_t write_len, struct rpc_task *task = nullptr); @@ -107,8 +106,7 @@ class fcsm * it'll add a blocking commit target for completing task when given commit * goal is met. */ - void ensure_commit(uint64_t commit_bytes, - struct rpc_task *task = nullptr); + void ensure_commit(struct rpc_task *task = nullptr); /** * Callbacks to be called when flush/commit successfully complete. @@ -116,7 +114,7 @@ class fcsm * targets from ftgtq/ctgtq as appropriate. */ void on_flush_complete(uint64_t flush_bytes); - void on_commit_complete(); + void on_commit_complete(uint64_t commit_bytes); /** * Is the state machine currently running, i.e. it has sent (one or more) @@ -148,6 +146,10 @@ class fcsm void mark_running(); void clear_running(); + void run(struct rpc_task *task, + uint64_t extent_left, + uint64_t extent_right); + /** * Call when more writes are dispatched, or prepared to be dispatched. * This MUST be called before the write callback can be called. @@ -180,6 +182,21 @@ class fcsm return (fc_cb_count() > 0); } + /** + * Call when more commit are dispatched, or prepared to be dispatched. + * This MUST be called before the commit_callback can be called. + */ + void add_committing(uint64_t bytes) + { + assert(committed_seq_num <= committing_seq_num); + committing_seq_num += bytes; + } + + /* + * ctgtq_cleanup() is called when we switch to stable writes. + */ + void ctgtq_cleanup(); + private: /* * The singleton nfs_client, for convenience. diff --git a/turbonfs/inc/file_cache.h b/turbonfs/inc/file_cache.h index a00a4f45..731482c3 100644 --- a/turbonfs/inc/file_cache.h +++ b/turbonfs/inc/file_cache.h @@ -1239,6 +1239,16 @@ class bytes_chunk_cache std::vector get_flushing_bc_range(uint64_t st_off, uint64_t end_off) const; + /** + * Returns contiguous dirty (and not flushing) chunks from chunmap, starting + * with the lowest dirty offset, and returns the total number of (dirty) + * bytes contained in the returned chunks. + * Before returning it increases the inuse count of underlying membuf(s). + * Caller will typically flush these to the backing Blob as UNSTABLE + * writes. + */ + std::vector get_contiguous_dirty_bcs(uint64_t *bytes) const; + /* * Returns *all* commit pending chunks in chunkmap. * Before returning it increases the inuse count of underlying membuf(s) @@ -1257,7 +1267,7 @@ class bytes_chunk_cache * MUST check for that after holding the membuf lock, before it tries * to commit those membuf(s). */ - std::vector get_commit_pending_bcs() const; + std::vector get_commit_pending_bcs(uint64_t *bytes) const; /** * Drop cached data in the given range. @@ -1341,7 +1351,7 @@ class bytes_chunk_cache * We want to send as prompt as possible to utilize the n/w b/w but slow * enough to give the write scheduler an opportunity to merge better. */ - uint64_t max_dirty_extent_bytes() const + static uint64_t max_dirty_extent_bytes() { // Maximum cache size allowed in bytes. static const uint64_t max_total = @@ -1407,12 +1417,137 @@ class bytes_chunk_cache return bytes_flushing > 0; } + /** + * Maximum size of commit_pending data that can be in cache, before we + * must commit it to Blob. 
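+     *
+     * Illustrative arithmetic only (the real values depend on the cache
+     * config): the value computed below is
+     * min(0.6 * max cache size, 2 * max_dirty_extent_bytes()), so with a
+     * 4 GiB cache and a 1 GiB flush limit it works out to
+     * min(2.4 GiB, 2 GiB) = 2 GiB.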
+ * It should be greater than or equal to the flush threshold (as returned + * by max_dirty_extent_bytes()) and smaller than the inline write threshold + * (as suggested by do_inline_write()), to minimize inline flush waits as + * much as possible, in steady state. + */ + static uint64_t max_commit_bytes() + { + // Maximum cache size allowed in bytes. + static const uint64_t max_total = + (aznfsc_cfg.cache.data.user.max_size_mb * 1024 * 1024ULL); + assert(max_total != 0); + + /* + * Minimum of 60% of max cache and 2 times the flush limit. + * We want to commit as soon as possible w/o affecting performance. + * If we commit too often, since commit is a serializing operation, + * it'll affect the write throughput, otoh, if we commit too late + * then we might hit the inline write threshold, which again would + * serialize writes, bringing down throughput. + */ + static const uint64_t max_commit_bytes = + std::min((uint64_t)(max_total * 0.6), + 2 * max_dirty_extent_bytes()); + assert(max_commit_bytes > 0); + + return max_commit_bytes; + } + + /** + * Check if we must initiate a COMMIT RPC now. Note that the caller would + * just send the COMMIT RPC and not necessarily block the user write + * request till the COMMIT RPC completes, i.e., it's not an inline commit. + * + * We must start commit if: + * 1. We have enough commit_pending data for this file/cache, or, + * 2. Global memory pressure dictates that we commit now to free up + * memory. In this case we might be committing more frequently which + * won't necessarily be optimal, but we have no choice due to the + * memory pressure. + */ + bool commit_required() const + { + const bool local_pressure = + (bytes_commit_pending >= max_commit_bytes()); + + if (local_pressure) { + return true; + } + + /* + * TODO: Take cue from global memory pressure. + */ + return false; + } + + /** + * Check if we must initiate flush of some cached data. Note that the caller + * would just send the corresponding WRITE RPC and not necessarily block the + * user write request till the WRITE RPC completes, i.e., it's not an inline + * write. + * + * We must start flush/write if: + * 1. We have enough bytes to flush so that we can write a full sized + * block, or for the case of stable write, we have enough data to fill + * the scheduler queue. + * 2. Global memory pressure dictates that we flush now to free up memory. + * In this case we might be flushing more frequently which won't + * necessarily be optimal, but we have no choice due to the memory + * pressure. + */ + bool flush_required() const + { + const bool local_pressure = + (get_bytes_to_flush() >= max_dirty_extent_bytes()); + + if (local_pressure) { + return true; + } + + /* + * TODO: Take cue from global memory pressure. + */ + return false; + } + /** * This should be called by writer threads to find out if they must wait * for the write to complete. This will check both the cache specific and * global memory pressure. */ bool do_inline_write() const + { + /* + * Allow four dirty extents before we force inline write. + * This way 2 extents are commit_pending, 1 is dirty. We can issue commit + * for the commit_pending extents and accumulate new writes in the dirty. + * Before we hit inline limit, 2GB worth of space is freed up. This cycle + * should be good enough to keep the cache size in check. 
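+     *
+     * Illustrative arithmetic only (assuming, purely as an example, that
+     * max_dirty_extent_bytes() evaluates to 1 GiB):
+     *
+     *   max_dirty_allowed_per_cache = 4 * 1 GiB = 4 GiB
+     *   local_pressure = (bytes_dirty + bytes_commit_pending) > 4 GiB
+     *
+     * i.e. with ~2 GiB commit_pending and ~1 GiB dirty we are still below
+     * the limit, and committing the pending extents frees ~2 GiB well
+     * before the inline write threshold is hit.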
+ */ + static const uint64_t max_dirty_allowed_per_cache = + max_dirty_extent_bytes() * 4; + const bool local_pressure = (bytes_dirty + bytes_commit_pending) > max_dirty_allowed_per_cache; + + if (local_pressure) { + return true; + } + + /* + * Global pressure is when get_prune_goals() returns non-zero bytes + * to be pruned inline. + */ + uint64_t inline_bytes; + + get_prune_goals(&inline_bytes, nullptr); + return (inline_bytes > 0); + } + + /** + * Inline write/flush means that we are under sufficient memory pressure + * that we want to slow down the application writes by not completing them + * after copying the data to cache (using copy_to_cache()) but instead + * complete application writes only after the flush completes. + * + * This function returns a non-zero number representing the number of bytes + * that we need to write inline, else if not under memory pressure returns + * zero. + */ + uint64_t get_inline_flush_bytes() const { /* * Allow two dirty extents before we force inline write. @@ -1420,11 +1555,18 @@ class bytes_chunk_cache * the second one. */ static const uint64_t max_dirty_allowed_per_cache = - max_dirty_extent_bytes() * 2; - const bool local_pressure = bytes_dirty > max_dirty_allowed_per_cache; + (max_dirty_extent_bytes() * 2); + const bool local_pressure = + ((int64_t) bytes_dirty > (int64_t) max_dirty_allowed_per_cache); if (local_pressure) { - return true; + /* + * Leave one max_dirty_extent_bytes worth of dirty bytes, and + * flush the rest. + */ + const int64_t flush_now = + (bytes_dirty - max_dirty_extent_bytes()); + return flush_now; } /* @@ -1433,8 +1575,20 @@ class bytes_chunk_cache */ uint64_t inline_bytes; + /* + * TODO: Noisy neighbor syndrome, where one file is hogging the cache, + * inline pruning will be triggered for other files. + */ get_prune_goals(&inline_bytes, nullptr); - return (inline_bytes > 0); + + /* + * (bytes_flushing + bytes_commit_pending) represents the data which + * is either already flushed or being flushed. Exclude that from the + * needs-to-be-flushed data. + */ + const int64_t flush_now = + (inline_bytes - (bytes_flushing + bytes_commit_pending)); + return std::max((int64_t) 0, flush_now); } /** diff --git a/turbonfs/inc/nfs_client.h b/turbonfs/inc/nfs_client.h index 40c365e4..84a78c09 100644 --- a/turbonfs/inc/nfs_client.h +++ b/turbonfs/inc/nfs_client.h @@ -547,6 +547,8 @@ struct nfs_client void jukebox_write(struct api_task_info *rpc_api); + void jukebox_flush(struct api_task_info *rpc_api); + /** * Convert between NFS fattr3 and POSIX struct stat. */ diff --git a/turbonfs/inc/nfs_inode.h b/turbonfs/inc/nfs_inode.h index f374f98c..cfa39341 100644 --- a/turbonfs/inc/nfs_inode.h +++ b/turbonfs/inc/nfs_inode.h @@ -289,6 +289,29 @@ struct nfs_inode */ struct stat attr; + /** + * We maintain following multiple views of the file and thus multiple file + * sizes for those views. + * - Cached. + * This is the view of the file that comprises of data that has been + * written by the application and saved in file cache. It may or may not + * have been flushed and/or committed. This is the most uptodate view of + * the file and applications must use this view. + * cached_filesize tracks the file size for this view. + * - Uncommited. + * This is the view of the file that tracks data that has been flushed + * using UNSTABLE writes but not yet COMMITted to the Blob. This view of + * the file is only used to see if the next PB call will write after the + * last PB'ed byte and thus can be appended. 
+ * putblock_filesize tracks the file size for this view. + * - Committed. + * This is the view of the file that tracks data committed to the Blob. + * Other clients will see this view. + * attr.st_size tracks the file size for this view. + */ + off_t cached_filesize = 0; + mutable off_t putblock_filesize = 0; + /* * Has this inode seen any non-append write? * This starts as false and remains false as long as copy_to_cache() only @@ -310,7 +333,7 @@ struct nfs_inode * Note: As of now, we are not using this flag as commit changes not yet * integrated, so we are setting this flag to true. */ - bool stable_write = true; + bool stable_write = false; public: /* @@ -1400,11 +1423,10 @@ struct nfs_inode * initiate any new flush operations while some truncate call is in * progress (which must have held the flush_lock). */ - int flush_cache_and_wait(uint64_t start_off = 0, - uint64_t end_off = UINT64_MAX); + int flush_cache_and_wait(); /** - * Wait for currently flushing membufs to complete. + * Wait for currently flushing/committing membufs to complete. * Returns 0 on success and a positive errno value on error. * * Note : Caller must hold the inode flush_lock to ensure that @@ -1416,6 +1438,34 @@ struct nfs_inode int wait_for_ongoing_flush(uint64_t start_off = 0, uint64_t end_off = UINT64_MAX); + /** + * commit_membufs() is called to commit uncommitted membufs to the Blob. + * It creates commit RPC and sends it to the NFS server. + */ + void commit_membufs(std::vector &bcs); + + /** + * switch_to_stable_write() is called to switch the inode to stable write + * mode. It waits for all ongoing flush and subsequent commit to complete. + * If not already scheduled, it'll perform an explicit commit after the + * flush complete. + * Post that it'll mark inode for stable write and return. From then on + * any writes to this inode will be sent as stable writes. + */ + void switch_to_stable_write(); + + /** + * Check if stable write is required for the given offset. + * Given offset is the start of contiguous dirty membufs that need to be + * flushed to the Blob. + */ + bool check_stable_write_required(off_t offset); + + /** + * Wait for ongoing commit operation to complete. + */ + void wait_for_ongoing_commit(); + /** * Sync the dirty membufs in the file cache to the NFS server. * All contiguous dirty membufs are clubbed together and sent to the diff --git a/turbonfs/inc/rpc_stats.h b/turbonfs/inc/rpc_stats.h index 626914b1..d0a7bbed 100644 --- a/turbonfs/inc/rpc_stats.h +++ b/turbonfs/inc/rpc_stats.h @@ -139,9 +139,6 @@ class rpc_stats_az */ void on_rpc_issue() { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert(stamp.issue == 0); stamp.issue = get_current_usecs(); assert(stamp.issue >= stamp.create); @@ -156,9 +153,6 @@ class rpc_stats_az */ void on_rpc_cancel() { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert(stamp.issue != 0); assert((int64_t) stamp.issue <= get_current_usecs()); assert(stamp.dispatch == 0); @@ -178,9 +172,6 @@ class rpc_stats_az */ void on_rpc_complete(struct rpc_pdu *pdu, nfsstat3 status) { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. 
- assert(optype != FUSE_FLUSH); - assert((status == NFS3ERR_RPC_ERROR) || (nfsstat3_to_errno(status) != -ERANGE)); diff --git a/turbonfs/inc/rpc_task.h b/turbonfs/inc/rpc_task.h index 2468d60d..94f09d18 100644 --- a/turbonfs/inc/rpc_task.h +++ b/turbonfs/inc/rpc_task.h @@ -2428,6 +2428,7 @@ struct rpc_task */ bool add_bc(const bytes_chunk& bc); void issue_write_rpc(); + void issue_commit_rpc(); #ifdef ENABLE_NO_FUSE /* diff --git a/turbonfs/src/fcsm.cpp b/turbonfs/src/fcsm.cpp index 7231591d..76ebb043 100644 --- a/turbonfs/src/fcsm.cpp +++ b/turbonfs/src/fcsm.cpp @@ -45,7 +45,7 @@ fcsm::fctgt::fctgt(struct fcsm *fcsm, } #endif - AZLogDebug("[{}] [FCSM] {} fctgt queued (F: {}, C: {}, T: {})", + AZLogInfo("[{}] [FCSM] {} fctgt queued (F: {}, C: {}, T: {})", fcsm->get_inode()->get_fuse_ino(), task ? "Blocking" : "Non-blocking", flush_seq, @@ -88,12 +88,358 @@ void fcsm::add_flushing(uint64_t bytes) flushing_seq_num += bytes; } -void fcsm::ensure_flush(uint64_t flush_bytes, - uint64_t write_off, +void fcsm::run(struct rpc_task *task, + uint64_t extent_left, + uint64_t extent_right) +{ +#ifdef NDEBUG + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + assert(task->rpc_api->write_task.get_size() > 0); +#endif + + const size_t length = task->rpc_api->write_task.get_size(); + const off_t offset = task->rpc_api->write_task.get_offset(); + const bool sparse_write = false; + const fuse_ino_t ino = task->rpc_api->write_task.get_ino(); + + /* + * We have successfully copied the user data into the cache. + * We can have the following cases (in decreasing order of criticality): + * 1. Cache has dirty+uncommitted data beyond the "inline write threshold", + * ref do_inline_write(). + * In this case we begin flush of the dirty data and/or commit of the + * uncommitted data. + * This is a memory pressure situation and hence we do not complete the + * application write till all the above backend writes complete. + * This will happen when application is writing faster than our backend + * write throughput, eventually dirty data will grow beyond the "inline + * write threshold" and then we have to slow down the writers by delaying + * completion. + * 2. Cache has uncommitted data beyond the "commit threshold", ref + * commit_required(). + * In this case we free up space in the cache by committing data. + * We just initiate a commit while the current write request is + * completed. Note that we want to delay commits so that we can reduce + * the commit calls (commit more data per call) as commit calls sort + * of serialize the writes as we cannot send any other write to the + * server while commit is on. + * 3. Cache has enough dirty data that we can flush. + * For sequential writes, this means the new write caused a contiguous + * extent to be greater than max_dirty_extent_bytes(), aka MDEB, while + * for non-seq writes it would mean that the total dirty data grew beyond + * MDEB. + * In this case we begin flush of this contiguous extent (in multiple + * parallel wsize sized blocks) since there's no benefit in waiting more + * as the data is sufficient for the server scheduler to effectively + * write, in optimal sized blocks. + * We complete the application write rightaway without waiting for the + * flush to complete as we are not under memory pressure. + * + * Other than this we have a special case of "write beyond eof" (termed + * sparse write). In sparse write case also we perform "inline write" of + * all the dirty data. 
This is needed for correct read behaviour. Imagine + * a reader reading from the sparse part of the file which is not yet in + * the bytes_chunk_cache. This read will be issued to the server and since + * server doesn't know the updated file size (as the write may still be + * sitting in our bytes_chunk_cache) it will return eof. This is not correct + * as such reads issued after successful write, are valid and should return + * 0 bytes for the sparse range. + */ + + /* + * If the extent size exceeds the max allowed dirty size as returned by + * max_dirty_extent_bytes(), then it's time to flush the extent. + * Note that this will cause sequential writes to be flushed at just the + * right intervals to optimize fewer write calls and also allowing the + * server scheduler to merge better. + * See bytes_to_flush for how random writes are flushed. + * + * Note: max_dirty_extent is static as it doesn't change after it's + * queried for the first time. + */ + static const uint64_t max_dirty_extent = + inode->get_filecache()->max_dirty_extent_bytes(); + assert(max_dirty_extent > 0); + + /* + * Check what kind of limit we have hit. + */ + const bool need_inline_write = + (sparse_write || inode->get_filecache()->do_inline_write()); + const bool need_commit = + inode->get_filecache()->commit_required(); + const bool need_flush = + ((extent_right - extent_left) >= max_dirty_extent) || + inode->get_filecache()->flush_required(); + + AZLogDebug("[{}] handle write (sparse={}, need_inline_write={}, " + "need_commit={}, need_flush={}, extent=[{}, {}))", + inode->get_fuse_ino(), sparse_write, need_inline_write, + need_commit, need_flush, extent_left, extent_right); + + /* + * Nothing to do, we can complete the application write rightaway. + * This should be the happy path! + */ + if (!need_inline_write && !need_commit && !need_flush) { + task->reply_write(length); + return; + } + + /* + * Do we need to perform "inline write"? + * Inline write implies, we flush all the dirty data and wait for all the + * corresponding backend writes to complete. + */ + if (need_inline_write) { + INC_GBL_STATS(inline_writes, 1); + + AZLogDebug("[{}] Inline write (sparse={}), {} bytes, extent @ [{}, {})", + ino, sparse_write, (extent_right - extent_left), + extent_left, extent_right); + + /* + * ensure_flush() will arrange to flush all the dirty data and complete + * the write task when the flush completes. + */ + inode->flush_lock(); + if (inode->is_stable_write()) { + inode->get_fcsm()->ensure_flush(offset, length, task); + } else { + inode->get_fcsm()->ensure_commit(task); + } + inode->flush_unlock(); + + // Free up the fuse thread without completing the application write. + return; + } + + // Case 2: Commit + /* + * We don't need to do inline writes. See if we need to commit the + * uncommitted data to the backend. We just need to stat the commit + * and not hold the current write task till the commit completes. + */ + if (need_commit) { + inode->flush_lock(); + if (inode->is_commit_in_progress()) { + AZLogDebug("[{}] Commit already in progress, skipping commit", + ino); + } else { + AZLogDebug("[{}] Committing {} bytes", ino, + inode->get_filecache()->max_commit_bytes()); + inode->get_fcsm()->ensure_commit(nullptr); + } + inode->flush_unlock(); + + assert(!need_inline_write); + } + + /* + * Ok, we don't need to do inline writes. See if we have enough dirty + * data and we need to start async flush. 
+ */ + + // Case 3: Flush + if ((extent_right - extent_left) < max_dirty_extent) { + /* + * This is the case of non-sequential writes causing enough dirty + * data to be accumulated, need to flush all of that. + */ + extent_left = 0; + extent_right = UINT64_MAX; + } + + /* + * Queue a non-blocking flush target for flushing all the dirty data. + */ + inode->flush_lock(); + inode->get_fcsm()->ensure_flush(offset, length, nullptr); + inode->flush_unlock(); + + /* + * Complete the write request without waiting for the backend flush to + * complete. + */ + task->reply_write(length); +} + +void fcsm::ctgtq_cleanup() +{ + assert(inode->is_flushing); + assert(inode->is_stable_write()); + + AZLogDebug("[FCSM][{}] ctgtq_cleanup()", inode->get_fuse_ino()); + while (!ctgtq.empty()) { + struct fctgt &ctgt = ctgtq.front(); + assert(ctgt.fcsm == this); + + struct rpc_task *task = ctgt.task; + if (task) { + #ifdef NDEBUG + // Complete the commit task as we switch to stable writes. + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + assert(task->rpc_api->write_task.get_size() > 0); + #endif + task->reply_write(task->rpc_api->write_task.get_size()); + } + + AZLogInfo("[FCSM][{}] ctgtq_cleanup(): completed task: {} commit_seq: {}", + inode->get_fuse_ino(), + fmt::ptr(task), + ctgt.commit_seq); + ctgtq.pop(); + } + + assert(ctgtq.empty()); +} + +void fcsm::ensure_commit(struct rpc_task *task) +{ + uint64_t commit_bytes = 0; + + assert(inode->is_flushing == true); + AZLogDebug("[{}] [FCSM] ensure_commit<{}> task: {}", + inode->get_fuse_ino(), + task ? "blocking" : "non-blocking", + fmt::ptr(task)); + + assert(!inode->is_stable_write()); + + // committed_seq_num can never be more than committing_seq_num. + assert(committed_seq_num <= committing_seq_num); + +#ifdef NDEBUG + if (task) { + // task provided must be a frontend write task. + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + assert(task->rpc_api->write_task.get_size() > 0); + } +#endif + + /* + * What will be the committed_seq_num value after commit_bytes are committed? + */ + if (inode->get_filecache()->get_bytes_to_commit() == 0) { + int64_t bytes = inode->get_filecache()->get_bytes_to_flush() - + inode->get_filecache()->max_dirty_extent_bytes(); + commit_bytes = std::max(bytes, (int64_t) 0); + } else { + commit_bytes = inode->get_filecache()->get_bytes_to_commit(); + } + + if (commit_bytes == 0) { + AZLogDebug("COMMIT BYTES ZERO"); + } + + const uint64_t target_committed_seq_num = committed_seq_num + commit_bytes; + + /* + * If the state machine is already running, we just need to add an + * appropriate commit target and return. When the ongoing operation + * completes, this commit would be dispatched. + */ + if (is_running()) { +#ifdef NDEBUG + /* + * Make sure commit targets are always added in an increasing commit_seq. + */ + if (!ctgtq.empty()) { + assert(ctgtq.front().commit_seq <= target_committed_seq_num); + assert(ctgtq.front().flush_seq == 0); + } +#endif + ctgtq.emplace(this, + 0 /* target flush_seq */, + target_committed_seq_num /* target commit_seq */, + task); + assert(is_running()); + assert(inode->is_flushing); + assert(!inode->is_stable_write()); + return; + } + + /* + * FCSM not running. 
+ * Flushed_seq_num tells us how much data is already flushed, If it's less + * than the target_committed_seq_num, we need to schedule a flush to catch up + * with the target_committed_seq_num. + */ + if (flushed_seq_num < target_committed_seq_num) { + AZLogDebug("[{}] [FCSM] not running, schedule a new flush to catch up, " + "flushed_seq_num: {}, target_committed_seq_num: {}", + inode->get_fuse_ino(), + flushed_seq_num.load(), + target_committed_seq_num); + + ensure_flush(0, 0, nullptr); + assert(is_running()); + assert(flushing_seq_num >= target_committed_seq_num); + + if (!inode->is_stable_write()) { + /** + * Enqueue a commit target to be triggered once the flush completes. + */ + ctgtq.emplace(this, + 0 /* target flush_seq */, + target_committed_seq_num /* target commit_seq */, + task); + } else { + task->reply_write(task->rpc_api->write_task.get_size()); + } + + return; + } else { + AZLogDebug("[{}] [FCSM] not running, schedule a new commit," + " flushed_seq_num: {}, " + "target_committed_seq_num: {}", + inode->get_fuse_ino(), + flushed_seq_num.load(), + target_committed_seq_num); + + uint64_t bytes; + std::vector bc_vec = + inode->get_filecache()->get_commit_pending_bcs(&bytes); + assert(bc_vec.empty() == (bytes == 0)); + + /* + * FCSM not running and nothing to commit, complete the task and return. + * It may happen due writes are changed to stable write. + */ + if (bytes == 0) { + assert(inode->is_stable_write() || commit_bytes == 0); + if (task) { + AZLogDebug("[{}] [FCSM] not running and nothing to commit, " + "completing fuse write rightaway", + inode->get_fuse_ino()); + task->reply_write(task->rpc_api->write_task.get_size()); + } + return; + } + + inode->commit_membufs(bc_vec); + assert(is_running()); + assert(committing_seq_num >= commit_bytes); + } +} + +/** + * Must be called with flush_lock() held. + */ +void fcsm::ensure_flush(uint64_t write_off, uint64_t write_len, struct rpc_task *task) { - AZLogDebug("[{}] [FCSM] ensure_flush<{}>({}), write req [{}, {}), task: {}", + uint64_t flush_bytes = 0; + + assert(inode->is_flushing == true); + AZLogDebug("[{}] [FCSM] ensure_flush<{}>({}), write req [{}, {}], task: {}", inode->get_fuse_ino(), task ? "blocking" : "non-blocking", flush_bytes, @@ -113,30 +459,29 @@ void fcsm::ensure_flush(uint64_t flush_bytes, if (task) { // task provided must be a frontend write task. assert(task->magic == RPC_TASK_MAGIC); - assert(task->get_op_type() == FUSE_WRITE); - assert(task->rpc_api->write_task.is_fe()); - assert(task->rpc_api->write_task.get_size() > 0); - - // write_len and write_off must match that of the task. - assert(task->rpc_api->write_task.get_size() == write_len); - assert(task->rpc_api->write_task.get_offset() == (off_t) write_off); + if (task->get_op_type() == FUSE_WRITE) { + assert(task->rpc_api->write_task.is_fe()); + assert(task->rpc_api->write_task.get_size() > 0); + // write_len and write_off must match that of the task. + assert(task->rpc_api->write_task.get_size() == write_len); + assert((uint64_t) task->rpc_api->write_task.get_offset() == + write_off); + } else { + assert(0); + } } #endif - /* - * Grab flush_lock to atomically get list of dirty chunks, which are not - * already being flushed. This also protects us racing with a truncate - * call and growing the file size after truncate shrinks the file. - */ - inode->flush_lock(); - /* * What will be the flushed_seq_num value after all current dirty bytes are * flushed? That becomes our target flushed_seq_num. 
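+     * For example (illustrative numbers only): with flushing_seq_num at
+     * 100 MiB, get_bytes_to_flush() returning 16 MiB and an already queued
+     * flush target at 120 MiB, the target computed below is
+     * max(100 MiB + 16 MiB, 120 MiB) = 120 MiB, i.e. we never pick a target
+     * lower than one already queued.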
*/ const uint64_t bytes_to_flush = inode->get_filecache()->get_bytes_to_flush(); - const uint64_t target_flushed_seq_num = flushing_seq_num + bytes_to_flush; + const uint64_t last_flush_seq = + !ftgtq.empty() ? ftgtq.front().flush_seq : 0; + const uint64_t target_flushed_seq_num = + std::max((flushing_seq_num + bytes_to_flush), last_flush_seq); /* * If the state machine is already running, we just need to add an @@ -171,11 +516,17 @@ void fcsm::ensure_flush(uint64_t flush_bytes, } } #endif + if (task == nullptr && + (target_flushed_seq_num == last_flush_seq)) { + assert(is_running()); + return; + } + ftgtq.emplace(this, target_flushed_seq_num /* target flush_seq */, 0 /* commit_seq */, task); - inode->flush_unlock(); + assert(is_running()); return; } @@ -183,27 +534,16 @@ void fcsm::ensure_flush(uint64_t flush_bytes, * FCSM not running. */ uint64_t bytes; - std::vector bc_vec = - inode->get_filecache()->get_dirty_nonflushing_bcs_range(0, UINT64_MAX, - &bytes); - assert(bc_vec.empty() == (bytes == 0)); + std::vector bc_vec; - /* - * FCSM not running and nothing to flush, complete the task and return. - * Note that moe dirty data can be added after we get the list of dirty - * and non-flushing bcs above, but they can be flushed at a later point. - */ - if (bytes == 0) { - inode->flush_unlock(); - - if (task) { - AZLogDebug("[{}] [FCSM] not running and nothing to flush, " - "completing fuse write [{}, {}) rightaway", - inode->get_fuse_ino(), write_off, write_off+write_len); - task->reply_write(write_len); - } - return; + if (inode->is_stable_write()) { + bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range( + 0, UINT64_MAX, &bytes); + } else { + bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(&bytes); } + assert(bc_vec.empty() == (bytes == 0)); + assert(bytes > 0); /* * Kickstart the state machine. @@ -231,6 +571,152 @@ void fcsm::ensure_flush(uint64_t flush_bytes, assert(is_running()); assert(flushing_seq_num == (flushing_seq_num_before + bytes)); assert(flushed_seq_num <= flushing_seq_num); +} + +/** + * TODO: We MUST ensure that on_commit_complete() doesn't block else it'll + * block a libnfs thread which may stall further request processing + * which may cause deadlock. + */ +void fcsm::on_commit_complete(uint64_t commit_bytes) +{ + // Must be called only for success. + assert(commit_bytes > 0); + assert(!inode->get_filecache()->is_flushing_in_progress()); + + // Commit callback can only be called if FCSM is running. + assert(is_running()); + + // Update committed_seq_num to account for the commit_bytes. + committed_seq_num += commit_bytes; + + // committed_seq_num can never go more than committing_seq_num. + assert(committed_seq_num == committing_seq_num); + + AZLogDebug("[{}] [FCSM] on_commit_complete, committed_seq_num now: {}, " + "committing_seq_num: {}", + inode->get_fuse_ino(), + committed_seq_num.load(), + committing_seq_num.load()); + + inode->flush_lock(); + + /* + * Go over all queued commit targets to see if any can be completed after + * the latest commit completed. + */ + while (!ctgtq.empty()) { + struct fctgt& tgt = ctgtq.front(); + + assert(tgt.fcsm == this); + + /* + * ftgtq has commit targets in increasing order of committed_seq_num, so + * as soon as we find one that's greater than committed_seq_num, we can + * safely skip the rest. 
+ */ + if (tgt.commit_seq > committed_seq_num) { + break; + } + + if (tgt.task) { + assert(tgt.task->magic == RPC_TASK_MAGIC); + if (tgt.task->get_op_type() == FUSE_WRITE) { + assert(tgt.task->rpc_api->write_task.is_fe()); + assert(tgt.task->rpc_api->write_task.get_size() > 0); + + AZLogInfo("[{}] [FCSM] completing blocking commit target: {}, " + "committed_seq_num: {}, write task: [{}, {})", + inode->get_fuse_ino(), + tgt.commit_seq, + committed_seq_num.load(), + tgt.task->rpc_api->write_task.get_offset(), + tgt.task->rpc_api->write_task.get_offset() + + tgt.task->rpc_api->write_task.get_size()); + + tgt.task->reply_write( + tgt.task->rpc_api->write_task.get_size()); + } else { + assert(0); + } + } else { + AZLogDebug("[{}] [FCSM] completing non-blocking flush target: {}, " + "flushed_seq_num: {}", + inode->get_fuse_ino(), + tgt.flush_seq, + flushed_seq_num.load()); + } + + // Flush target accomplished, remove from queue. + ctgtq.pop(); + } + + assert(!inode->get_filecache()->is_flushing_in_progress()); + assert(!inode->is_commit_in_progress()); + assert(committed_seq_num == committing_seq_num); + assert(flushed_seq_num == committed_seq_num); + assert(!inode->is_stable_write() || ctgtq.empty()); + + /* + * See if we have more commit targets and issue flush for the same. + */ + if (!ftgtq.empty() || !ctgtq.empty()) { + assert(ftgtq.empty() || ftgtq.front().flush_seq > flushed_seq_num); + assert(ctgtq.empty() || ctgtq.front().commit_seq > committed_seq_num); + /* + * See if we have more flush targets and issue them. + */ + uint64_t bytes; + std::vector bc_vec; + if (inode->is_stable_write()) { + bc_vec = + inode->get_filecache()->get_dirty_nonflushing_bcs_range( + 0, UINT64_MAX, &bytes); + } else { + bc_vec = + inode->get_filecache()->get_contiguous_dirty_bcs(&bytes); + } + + /* + * Since we have a flush target asking more data to be flushed, we must + * have the corresponding bcs in the file cache. + */ + assert(!bc_vec.empty()); + assert(bytes > 0); + + // flushed_seq_num can never be more than flushing_seq_num. + assert(flushed_seq_num <= flushing_seq_num); + + AZLogDebug("[{}] [FCSM] continuing, flushing_seq_num now: {}, " + "flushed_seq_num: {}", + inode->get_fuse_ino(), + flushing_seq_num.load(), + flushed_seq_num.load()); + + // sync_membufs() will update flushing_seq_num() and mark fcsm running. + inode->sync_membufs(bc_vec, false /* is_flush */); + assert(flushing_seq_num >= bytes); + } else { + AZLogDebug("[{}] [FCSM] idling, flushed_seq_num now: {}, " + "committed_seq_num: {}", + inode->get_fuse_ino(), + flushed_seq_num.load(), + committed_seq_num.load()); + + // FCSM should not idle when there's any ongoing flush. + assert(flushing_seq_num == flushed_seq_num); + assert(committed_seq_num == committing_seq_num); + assert(flushed_seq_num == committed_seq_num); + + /* + * It may happen flush initiated from flush_cache_and_wait(), so + * we should not clear the running flag. + */ + if(!inode->get_filecache()->is_flushing_in_progress()) { + assert(ctgtq.empty() && ftgtq.empty()); + clear_running(); + } + } inode->flush_unlock(); } @@ -252,7 +738,7 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) assert(fc_cb_running()); // Flush callback can only be called if FCSM is running. - assert(is_running()); + // assert(is_running()); // Update flushed_seq_num to account for the newly flushed bytes. 
flushed_seq_num += flush_bytes; @@ -285,7 +771,14 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) * queued flush targets which includes completing the waiting tasks and/or * trigger pending flush/commit. */ - if (inode->get_filecache()->is_flushing_in_progress()) { + if (inode->get_filecache()->is_flushing_in_progress() || + !is_running() || + inode->is_commit_in_progress()) { + AZLogDebug("[{}] [FCSM] not running, flushing_seq_num now: {}, " + "flushed_seq_num: {}", + inode->get_fuse_ino(), + flushing_seq_num.load(), + flushed_seq_num.load()); inode->flush_unlock(); return; } @@ -310,21 +803,24 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) if (tgt.task) { assert(tgt.task->magic == RPC_TASK_MAGIC); - assert(tgt.task->get_op_type() == FUSE_WRITE); - assert(tgt.task->rpc_api->write_task.is_fe()); - assert(tgt.task->rpc_api->write_task.get_size() > 0); - - AZLogDebug("[{}] [FCSM] completing blocking flush target: {}, " - "flushed_seq_num: {}, write task: [{}, {})", - inode->get_fuse_ino(), - tgt.flush_seq, - flushed_seq_num.load(), - tgt.task->rpc_api->write_task.get_offset(), - tgt.task->rpc_api->write_task.get_offset() + - tgt.task->rpc_api->write_task.get_size()); - - tgt.task->reply_write( - tgt.task->rpc_api->write_task.get_size()); + if (tgt.task->get_op_type() == FUSE_WRITE) { + assert(tgt.task->rpc_api->write_task.is_fe()); + assert(tgt.task->rpc_api->write_task.get_size() > 0); + + AZLogInfo("[{}] [FCSM] completing blocking flush target: {}, " + "flushed_seq_num: {}, write task: [{}, {})", + inode->get_fuse_ino(), + tgt.flush_seq, + flushed_seq_num.load(), + tgt.task->rpc_api->write_task.get_offset(), + tgt.task->rpc_api->write_task.get_offset() + + tgt.task->rpc_api->write_task.get_size()); + + tgt.task->reply_write( + tgt.task->rpc_api->write_task.get_size()); + } else { + assert(0); + } } else { AZLogDebug("[{}] [FCSM] completing non-blocking flush target: {}, " "flushed_seq_num: {}", @@ -338,22 +834,56 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) } /* - * See if we have more flush targets and check if the next flush target - * has its flush issued. If yes, then we need to wait for this flush to - * complete and we will take stock of flush targets when that completes. + * Check if there commit target to be triggered. + * If we flushed, we should trigger commit so that memory is released. */ - if (!ftgtq.empty() && (ftgtq.front().flush_seq > flushing_seq_num)) { + if (!ctgtq.empty() && (ctgtq.front().commit_seq < flushed_seq_num)) { + assert(!inode->is_stable_write()); + uint64_t bytes; std::vector bc_vec = - inode->get_filecache()->get_dirty_nonflushing_bcs_range( - 0, UINT64_MAX, &bytes); + inode->get_filecache()->get_commit_pending_bcs(&bytes); + assert(bc_vec.empty() == (bytes == 0)); + + /* + * Since we have a commit target asking more data to be committed, we + * must have the corresponding bcs in the file cache. + */ + assert(!bc_vec.empty()); + assert(bytes > 0); + + // commit_membufs() will update committing_seq_num() and mark fcsm running. + inode->commit_membufs(bc_vec); + assert(committing_seq_num >= bytes); + } else if ((!ftgtq.empty() && (ftgtq.front().flush_seq > flushed_seq_num)) || + (!ctgtq.empty() && (ctgtq.front().commit_seq > committed_seq_num))) { + /* + * See if we have more flush targets and check if the next flush target + * has its flush issued. If yes, then we need to wait for this flush to + * complete and we will take stock of flush targets when that completes. 
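+         * Which bcs get picked below depends on the write mode: for unstable
+         * (PutBlock) writes only the leading contiguous dirty run can be
+         * flushed, since it must land right after the last flushed byte,
+         * whereas for stable writes any dirty, non-flushing range can be
+         * sent.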
+ */ + uint64_t bytes; + std::vector bc_vec; + if (inode->is_stable_write()) { + bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range( + 0, UINT64_MAX, + &bytes); + } else { + bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(&bytes); + } /* * Since we have a flush target asking more data to be flushed, we must * have the corresponding bcs in the file cache. */ assert(!bc_vec.empty()); - assert(bytes >= (ftgtq.front().flush_seq - flushing_seq_num)); + + if (inode->is_stable_write()) { + // We should have all the dirty data in the chunkmap. + assert(bytes >= (ftgtq.front().flush_seq - flushed_seq_num)); + } else { + assert(bytes > 0); + } // flushed_seq_num can never be more than flushing_seq_num. assert(flushed_seq_num <= flushing_seq_num); @@ -367,7 +897,7 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) // sync_membufs() will update flushing_seq_num() and mark fcsm running. inode->sync_membufs(bc_vec, false /* is_flush */); assert(flushing_seq_num >= bytes); - } else if (ftgtq.empty()) { + } else if (ftgtq.empty() && ctgtq.empty()) { AZLogDebug("[{}] [FCSM] idling, flushing_seq_num now: {}, " "flushed_seq_num: {}", inode->get_fuse_ino(), @@ -375,13 +905,19 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) flushed_seq_num.load()); // FCSM should not idle when there's any ongoing flush. - assert(flushing_seq_num == flushed_seq_num); + assert(flushing_seq_num >= flushed_seq_num); /* - * No more flush targets, pause the state machine. - * TODO: We need to clear running when flush fails. + * If commit is in progress, then we should not clear + * the running flag. Most likely it's issued from flush_cache_and_wait(). */ - clear_running(); + if (!inode->is_commit_in_progress()) { + assert(ftgtq.empty() || ctgtq.empty()); + clear_running(); + } + } else { + assert(0); + AZLogDebug("SHould not reach here"); } inode->flush_unlock(); diff --git a/turbonfs/src/file_cache.cpp b/turbonfs/src/file_cache.cpp index 10284f2b..5fc2989d 100644 --- a/turbonfs/src/file_cache.cpp +++ b/turbonfs/src/file_cache.cpp @@ -2651,10 +2651,14 @@ void bytes_chunk_cache::clear_nolock(bool shutdown) * TODO: As bcs returned by this function are locked, it block parallel read * on these bcs. Need to check if we can relax lock condition. */ -std::vector bytes_chunk_cache::get_commit_pending_bcs() const +std::vector bytes_chunk_cache::get_commit_pending_bcs(uint64_t *bytes) const { std::vector bc_vec; + if (bytes) { + *bytes = 0; + } + // TODO: Make it shared lock. const std::unique_lock _lock(chunkmap_lock_43); auto it = chunkmap.lower_bound(0); @@ -2669,6 +2673,53 @@ std::vector bytes_chunk_cache::get_commit_pending_bcs() const assert(!mb->is_dirty()); assert(mb->is_uptodate()); bc_vec.emplace_back(bc); + + if (bytes) { + *bytes += bc.length; + } + } + + ++it; + } + + return bc_vec; +} + +std::vector bytes_chunk_cache::get_contiguous_dirty_bcs( + uint64_t *bytes) const +{ + std::vector bc_vec; + if (bytes) { + *bytes = 0; + } + + // TODO: Make it shared lock. 
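+    /*
+     * Walk the chunkmap from the lowest offset and collect the first run of
+     * dirty (and not already flushing) chunks whose offsets are strictly
+     * contiguous, stopping at the first gap. Every returned membuf carries
+     * an extra inuse ref which the caller must drop (or hand over to the
+     * flush path) once it is done with the bc.
+     */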
+ const std::unique_lock _lock(chunkmap_lock_43); + auto it = chunkmap.lower_bound(0); + uint64_t prev_offset = AZNFSC_BAD_OFFSET; + + while (it != chunkmap.cend()) { + const struct bytes_chunk& bc = it->second; + struct membuf *mb = bc.get_membuf(); + + if (mb->is_dirty() && !mb->is_flushing()) { + if (prev_offset != AZNFSC_BAD_OFFSET) { + if (prev_offset != bc.offset) { + break; + } else { + if (bytes) { + *bytes += bc.length; + } + prev_offset = bc.offset + bc.length; + } + } else { + if (bytes) { + *bytes += bc.length; + } + prev_offset = bc.offset + bc.length; + } + mb->set_inuse(); + bc_vec.emplace_back(bc); } ++it; diff --git a/turbonfs/src/nfs_client.cpp b/turbonfs/src/nfs_client.cpp index 77ef0cd1..978887dd 100644 --- a/turbonfs/src/nfs_client.cpp +++ b/turbonfs/src/nfs_client.cpp @@ -461,6 +461,13 @@ void nfs_client::jukebox_runner() js->rpc_api->write_task.get_ino()); jukebox_write(js->rpc_api); break; + case FUSE_FLUSH: + AZLogWarn("[JUKEBOX REISSUE] flush(req={}, ino={})", + fmt::ptr(js->rpc_api->req), + js->rpc_api->flush_task.get_ino()); + jukebox_flush(js->rpc_api); + break; + /* TODO: Add other request types */ /* TODO: Add other request types */ default: AZLogError("Unknown jukebox seed type: {}", (int) js->rpc_api->optype); @@ -1636,6 +1643,39 @@ void nfs_client::jukebox_write(struct api_task_info *rpc_api) write_task->issue_write_rpc(); } +/* + * This function will be called only to retry the commit requests that failed + * with JUKEBOX error. + * rpc_api defines the RPC request that need to be retried. + */ +void nfs_client::jukebox_flush(struct api_task_info *rpc_api) +{ + /* + * For commit task pvt has bc_vec, which has copy of byte_chunk vector. + * To proceed it should be valid. + * + * Note: Commit task is always a backend task, 'req' is nullptr. + */ + assert(rpc_api->pvt != nullptr); + assert(rpc_api->optype == FUSE_FLUSH); + assert(rpc_api->req == nullptr); + + /* + * Create a new task to retry the commit request. + */ + struct rpc_task *commit_task = + get_rpc_task_helper()->alloc_rpc_task(FUSE_FLUSH); + commit_task->init_flush(rpc_api->req /* fuse_req */, + rpc_api->flush_task.get_ino()); + commit_task->rpc_api->pvt = rpc_api->pvt; + rpc_api->pvt = nullptr; + + // Any new task should start fresh as a parent task. + assert(commit_task->rpc_api->parent_task == nullptr); + + commit_task->issue_commit_rpc(); +} + /* * This function will be called only to retry the read requests that failed * with JUKEBOX error. diff --git a/turbonfs/src/nfs_inode.cpp b/turbonfs/src/nfs_inode.cpp index d7468eff..55f5c3f9 100644 --- a/turbonfs/src/nfs_inode.cpp +++ b/turbonfs/src/nfs_inode.cpp @@ -34,7 +34,7 @@ nfs_inode::nfs_inode(const struct nfs_fh3 *filehandle, assert(write_error == 0); // TODO: Revert this to false once commit changes integrated. - assert(stable_write == true); + assert(stable_write == false); assert(commit_state == commit_state_t::COMMIT_NOT_NEEDED); #ifndef ENABLE_NON_AZURE_NFS @@ -380,6 +380,175 @@ int nfs_inode::get_actimeo_max() const } } +/* + * Caller should hold flush_lock(). + */ +void nfs_inode::wait_for_ongoing_commit() +{ + /* + * TODO: See if we can eliminate inline sleep. + */ + if (is_commit_in_progress()) { + AZLogWarn("[{}] wait_for_ongoing_commit() will sleep inline!!", + get_fuse_ino()); + } + + while (is_commit_in_progress()) { + assert(!get_filecache()->is_flushing_in_progress()); + ::usleep(1000); + } + + assert(!is_commit_in_progress()); +} + +/* + * This function is called with flush_lock() held. 
+ * This should be called whenever we figure out that we cannot proceed with + * unstable writes (most common reason being, next write is not an append + * write). Once this function returns, following is guaranteed: + * - There will be no flushes in progress. + * - There will be no commit_pending data and no commit inprogress. + * - inode->stable_write will be set to true. + */ +void nfs_inode::switch_to_stable_write() +{ + assert(is_flushing); + assert(!is_stable_write()); + + AZLogInfo("[{}] Switching to stable write", ino); + + /* + * switch_to_stable_write() is called from places where we are about to + * start a flush operation. Before that we check to see if we need to + * change to stable write. Since we are not flushing yet and since we + * do not support multiple ongoing flushes, we are guaranteed that no + * flush should be in progress when we reach here. + * Similarly commit should not be in progress. + */ + assert(!is_commit_in_progress()); + assert(!get_filecache()->is_flushing_in_progress()); + + /* + * Check if there is anything to commit, if not then + * switch to stable write. + */ + if (get_filecache()->get_bytes_to_commit() == 0) { + AZLogDebug("[{}] Nothing to commit, switching to stable write", ino); + set_stable_write(); + + /* + * Now we moved to stable write, cleanup the commit target queue. + */ + get_fcsm()->ctgtq_cleanup(); + return; + } + + uint64_t bytes; + std::vector bc_vec = + get_filecache()->get_commit_pending_bcs(&bytes); + assert(bc_vec.empty() == (bytes == 0)); + assert(bytes > 0); + + /* + * Issue the commit RPC to commit the pending data. + */ + commit_membufs(bc_vec); + + /* + * Wait for the commit to complete. + */ + wait_for_ongoing_commit(); + + assert(get_filecache()->get_bytes_to_commit() == 0); + assert(!get_filecache()->is_flushing_in_progress()); + assert(!is_commit_in_progress()); + + set_stable_write(); + + /* + * Now we moved to stable write, cleanup the commit target queue. + */ + get_fcsm()->ctgtq_cleanup(); + return; +} + +/* + * This function checks, whether switch to stable write or not. + */ +bool nfs_inode::check_stable_write_required(off_t offset) +{ + /* + * If stable_write is already set, we don't need to do anything. + * We don't need lock here as once stable_write is set it's never + * unset. + */ + if (is_stable_write()) { + return false; + } + + /* + * If current write is not append write, then we can't go for unstable writes + * It may be overwrite to existing data and we don't have the knowldege of + * existing block list, it maye require read modified write. So, we can't go + * for unstable write. Similarly, if the offset is more than end of the file, + * we need to write zero block in between the current end of the file and the + * offset. + */ + if (putblock_filesize != offset) { + AZLogInfo("Stable write required as putblock_filesize:{} is not at the" + "offset:{}", putblock_filesize, offset); + + return true; + } + + return false; +} + +/** + * commit_membufs() is called by writer thread to commit flushed membufs. + * It's always issued under flush_lock(). 
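+ * Typical call pattern (sketch, mirroring switch_to_stable_write() and
+ * wait_for_ongoing_flush() in this file, with flush_lock() held):
+ *
+ *   uint64_t bytes = 0;
+ *   std::vector<bytes_chunk> bc_vec =
+ *       get_filecache()->get_commit_pending_bcs(&bytes);
+ *   if (bytes > 0) {
+ *       commit_membufs(bc_vec);      // issues the COMMIT RPC.
+ *       wait_for_ongoing_commit();   // only if the caller must block.
+ *   }
+ *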
+ */ +void nfs_inode::commit_membufs(std::vector &bc_vec) +{ + assert(is_flushing); + + set_commit_in_progress(); + + uint64_t prev_offset = 0; + for (bytes_chunk& bc : bc_vec) { + [[maybe_unused]] struct membuf *mb = bc.get_membuf(); + assert(mb != nullptr); + assert(mb->is_inuse()); + assert(mb->is_commit_pending()); + + if (prev_offset == 0) { + prev_offset = bc.offset + bc.length; + } else { + assert(prev_offset == bc.offset); + prev_offset += bc.length; + } + + /* + * We have at least one commit to issue, mark fcsm as running, + * if not already marked. + */ + get_fcsm()->mark_running(); + get_fcsm()->add_committing(bc.length); + } + + /* + * Create the commit task to carry out the write. + */ + struct rpc_task *commit_task = + get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_FLUSH); + commit_task->init_flush(nullptr /* fuse_req */, ino); + assert(commit_task->rpc_api->pvt == nullptr); + + commit_task->rpc_api->pvt = static_cast(new std::vector(bc_vec)); + + commit_task->issue_commit_rpc(); +} + /** * Note: We dispatch WRITE RPCs as we gather full wsize sized data bytes, * while there may be more bcs that we have not yet processed. This means @@ -394,6 +563,20 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, // Caller must hold the flush_lock. assert(is_flushing); + if (!is_stable_write()) { + /* + * We do not allow a new flush while there's an ongoing one, in case + * of unstable writes. + */ + assert(!get_filecache()->is_flushing_in_progress()); + } + + /* + * Stable won't have commit and for unstable we cannot flush while + * commit is going on. + */ + assert(!is_commit_in_progress()); + if (bc_vec.empty()) { return; } @@ -422,6 +605,14 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, parent_task->num_ongoing_backend_writes = 1; } + /* + * If new offset is not at the end of the file, + * then we need to switch to stable write. + */ + if (check_stable_write_required(bc_vec[0].offset)) { + switch_to_stable_write(); + } + /* * Create the flush task to carry out the write. */ @@ -564,6 +755,7 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, * Once packed completely, then dispatch the write. */ if (write_task->add_bc(bc)) { + putblock_filesize += bc.length; continue; } else { /* @@ -589,6 +781,8 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, // Single bc addition should not fail. [[maybe_unused]] bool res = write_task->add_bc(bc); assert(res == true); + + putblock_filesize += bc.length; } } @@ -845,9 +1039,25 @@ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) * Flushing not in progress and no new flushing can be started as we hold * the flush_lock(), and callback drained. */ - if (!get_filecache()->is_flushing_in_progress() && - !get_fcsm()->fc_cb_running()) { - AZLogDebug("[{}] No flush in progress, returning", ino); + /* + * Stable writes do not need commit, so no commit inprogress and no pending + * commit data. + */ + if (is_stable_write()) { + assert(!is_commit_in_progress()); + assert(get_filecache()->bytes_commit_pending == 0); + } + + if (get_filecache()->is_flushing_in_progress()) { + assert(!is_commit_in_progress()); + } else if (!is_commit_in_progress() && !get_fcsm()->fc_cb_running() && + get_filecache()->bytes_commit_pending == 0) { + /* + * Flushing not in progress and no new flushing can be started as we hold + * the flush_lock(), and callback drained. + * No commit inprogress and no pending commit data, return. 
+ */ + AZLogDebug("[{}] No flush or commit in progress, returning", ino); return 0; } @@ -979,14 +1189,57 @@ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) assert(is_flushing); assert(!get_fcsm()->fc_cb_running()); - return err; + /* + * Unstable write case, we need to wait for the commit to complete. + */ + if (get_filecache()->get_bytes_to_commit() > 0) { + assert(!is_stable_write()); + + if (!is_commit_in_progress()) { + uint64_t bytes = 0; + std::vector bc_vec = + get_filecache()->get_commit_pending_bcs(&bytes); + assert(bc_vec.empty() == (bytes == 0)); + assert(bytes > 0); + + /* + * Issue the commit RPC to commit the pending data. + */ + commit_membufs(bc_vec); + } + } + + if (is_commit_in_progress()) { + wait_for_ongoing_commit(); + } + + return get_write_error(); } /** - * Note: This takes shared lock on ilock_1. + * flush_cache_and_wait() is called only from the release/flush call. + * It's called with flush_lock() held. + * Things it does: + * - Before flushing it needs to wait for any ongoing commit to complete. + * - First it try to get membufs through get_contiguous_dirty_bcs() or + * get_dirty_bc_range() based on the unstable write or stable write. + * - Issue flush for the dirty membufs and wait for the flush to complete. + * - If it's unstable write, it issue commit for commit pending membufs. + * - Wait for the commit to complete. + * - Returns the error code if any. + * + * Note: Flush_cache_and_wait() blocks the fuse thread till the flush completes. + * It's called from the release(), flush() and getattr() calls. It's ok + * as of now as it's not very often called. We can optimize to complete + * the flush in background and return immediately. For that we need to add + * special handling for the getattr() call. */ -int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off) +int nfs_inode::flush_cache_and_wait() { + assert(!is_stable_write() || + get_filecache()->bytes_commit_pending == 0); + assert(!is_stable_write() || !is_commit_in_progress()); + /* * MUST be called only for regular files. * Leave the assert to catch if fuse ever calls flush() on non-reg files. @@ -1016,85 +1269,147 @@ int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off) } /* - * Grab the inode flush_lock to ensure that we don't initiate any new flush - * operation while some truncate call is in progress (which must have taken - * the flush_lock). - * Once flush_lock() returns we have the flush_lock and we are + * Grab the flush_lock to ensure that we don't initiate any new flushes + * while some truncate call is in progress (which must have taken the + * flush_lock). Once flush_lock() returns we have the lock and we are * guaranteed that no new truncate operation can start till we release * the flush_lock. We can safely start the flush then. */ flush_lock(); /* - * Get the dirty bytes_chunk from the filecache handle. - * This will grab an exclusive lock on the file cache and return the list - * of dirty bytes_chunks at that point. Note that we can have new dirty - * bytes_chunks created but we don't want to wait for those. + * If flush is in progress, wait for it to complete. + * It may happen that last flush is still in progress and we need to wait + * for it to complete before we start the new flush. + * As we have the flush_lock() we are guaranteed that no new flush can be + * started till we release the flush_lock(). + * + * wait_for_ongoing_flush() will wait for the ongoing flush/commt to + * complete. 
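+     *
+     * The loop below retries a bounded number of times (max_retry), briefly
+     * dropping flush_lock and sleeping so that an on_commit_complete()
+     * callback racing with us gets a chance to drain before we re-check
+     * fc_cb_running().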
+ * + * Note: For stable write case, we don't need to wait for the ongoing flush, + * as there is no commit required for stable write. */ - std::vector bc_vec = - filecache_handle->get_dirty_bc_range(start_off, end_off); + if (!is_stable_write()) { + int retry, err = 0; + const int max_retry = 200; + for (retry = 0; retry < max_retry; retry++) { + assert(is_flushing); + err = wait_for_ongoing_flush(); + assert(is_flushing); + + if (!get_fcsm()->fc_cb_running()) { + break; + } + flush_unlock(); + + /* + * We issue commit as part of wait_for_ongoing_flush() + * with flush_lock() held. This cause on_commit_complete() + * not run completely. + * Give 10ms to the callback to drain completely. + */ + if (get_fcsm()->fc_cb_running()) { + ::usleep(10 * 1000); + } + + /* + * Lock again to check if something flushing again, + * and wait for it to complete in wait_for_ongoing_flush(). + * Repeat again to check if on_commit_complete() completed. + */ + flush_lock(); + } + + if (err != 0) { + flush_unlock(); + AZLogError("[{}] Failed to flush cache to stable storage, " + "error={}", ino, err); + return err; + } + } + assert(is_flushing); /* - * sync_membufs() iterate over the bc_vec and starts flushing the dirty - * membufs. It batches the contiguous dirty membufs and issues a single - * write RPC for them. + * For unstable write, If no bytes to flush, then return. + * For stable write, If no bytes to flush and no flush in + * progress, then return. As for stable write, we didn't + * wait for the flush to complete as above. + * + * TODO: Need to check getattr() call. As that can cause this + * assert to fail. */ - sync_membufs(bc_vec, true); - - flush_unlock(); + assert(!get_filecache()->is_flushing_in_progress() || + is_stable_write()); + assert(!is_commit_in_progress()); /* - * Our caller expects us to return only after the flush completes. - * Wait for all the membufs to flush and get result back. + * For stable case we didn't wait for ongoing flush to complete. + * And, it may be last flush initiated to BLOB, we need to wait + * for this flush to complete. Application not writing anymore + * get_bytes_to_flush() can return 0. */ - for (bytes_chunk &bc : bc_vec) { - struct membuf *mb = bc.get_membuf(); + if (!get_filecache()->is_flushing_in_progress() && + get_filecache()->get_bytes_to_flush() == 0) { + assert(get_filecache()->bytes_dirty == 0); + assert(get_filecache()->bytes_flushing == 0); - assert(mb != nullptr); - assert(mb->is_inuse()); - mb->set_locked(); + AZLogDebug("[{}] Nothing to flush, returning", ino); + flush_unlock(); + return 0; + } - /* - * If still dirty after we get the lock, it may mean two things: - * - Write failed. - * - Some other thread got the lock before us and it made the - * membuf dirty again. - */ - if (mb->is_dirty() && get_write_error()) { - AZLogError("[{}] Flush [{}, {}) failed with error: {}", - ino, - bc.offset, bc.offset + bc.length, - get_write_error()); - } + /* + * Get the dirty bytes_chunk from the filecache handle. + * This will grab an exclusive lock on the file cache and return the list + * of dirty bytes_chunks at that point. 
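+     * For unstable writes we first try get_contiguous_dirty_bcs(); if that
+     * does not cover everything still left to flush we switch to stable
+     * writes and grab the full dirty range with
+     * get_dirty_bc_range(0, UINT64_MAX), as done below.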
+     */
+    std::vector<bytes_chunk> bc_vec;
+    uint64_t size = 0;
+    if (!is_stable_write()) {
+        bc_vec = filecache_handle->get_contiguous_dirty_bcs(&size);

-        mb->clear_locked();
-        mb->clear_inuse();
+        if (size != get_filecache()->get_bytes_to_flush()) {
+            AZLogInfo("[{}] Flushed size: {} != bytes to flush: {}",
+                      ino, size, get_filecache()->get_bytes_to_flush());
+            AZLogInfo("SWITCHING TO STABLE WRITE");

-        /*
-         * Release the bytes_chunk back to the filecache.
-         * These bytes_chunks are not needed anymore as the flush is done.
-         *
-         * Note: We come here for bytes_chunks which were found dirty by the
-         *       above loop. These writes may or may not have been issued by
-         *       us (if not issued by us it was because some other thread,
-         *       mostly the writer issued the write so we found it flushing
-         *       and hence didn't issue). In any case since we have an inuse
-         *       count, release() called from write_callback() would not have
-         *       released it, so we need to release it now.
-         */
-        filecache_handle->release(bc.offset, bc.length);
+            /*
+             * This is rare and a no-op as we have already completed the
+             * pending flush and commit.
+             */
+            switch_to_stable_write();
+            for (auto& bc : bc_vec) {
+                bc.get_membuf()->clear_inuse();
+            }
+            bc_vec = filecache_handle->get_dirty_bc_range(0, UINT64_MAX);
+        }
+    } else {
+        bc_vec = filecache_handle->get_dirty_bc_range(0, UINT64_MAX);
     }

     /*
-     * If the file is deleted while we still have data in the cache, don't
-     * treat it as a failure to flush. The file has gone and we don't really
-     * care about the unwritten data.
+     * sync_membufs() iterates over the bc_vec and starts flushing the dirty
+     * membufs. It batches the contiguous dirty membufs and issues a single
+     * write RPC for them.
     */
-    if (get_write_error() == ENOENT || get_write_error() == ESTALE) {
-        return 0;
+    sync_membufs(bc_vec, false /* is_flush */);
+
+    if (wait_for_ongoing_flush()) {
+        AZLogError("[{}] Failed to flush cache to stable storage, "
+                   "error={}", ino, get_write_error());
+        flush_unlock();
+        return get_write_error();
     }

-    return get_write_error();
+    assert(!is_commit_in_progress());
+    assert(!get_filecache()->is_flushing_in_progress());
+    assert(get_filecache()->bytes_dirty == 0);
+    assert(get_filecache()->bytes_commit_pending == 0);
+    assert(get_filecache()->bytes_flushing == 0);
+
+    flush_unlock();
+    return 0;
 }

 void nfs_inode::flush_lock() const
@@ -1152,6 +1467,11 @@ void nfs_inode::truncate_end(size_t size) const
     [[maybe_unused]] const uint64_t bytes_truncated =
         filecache_handle->truncate(size, true /* post */);

+    /*
+     * Update the in-cache putblock_filesize to reflect the new size.
+     */
+    putblock_filesize = size;
+
     AZLogDebug("[{}] Filecache truncated to size={} "
                "(bytes truncated: {})",
                ino, size, bytes_truncated);
diff --git a/turbonfs/src/rpc_task.cpp b/turbonfs/src/rpc_task.cpp
index 8632e631..f7e5b3ef 100644
--- a/turbonfs/src/rpc_task.cpp
+++ b/turbonfs/src/rpc_task.cpp
@@ -828,6 +828,212 @@ void access_callback(
     }
 }

+static void commit_callback(
+    struct rpc_context *rpc,
+    int rpc_status,
+    void *data,
+    void *private_data)
+{
+    rpc_task *task = (rpc_task*) private_data;
+
+    /*
+     * Commit is issued as a FUSE_FLUSH.
+     * TODO: Maybe it should have a type of its own.
+     */
+    assert(task->magic == RPC_TASK_MAGIC);
+    assert(task->get_op_type() == FUSE_FLUSH);
+    assert(task->rpc_api->pvt != nullptr);
+    // Commit is never called for a fuse request.
+    assert(task->get_fuse_req() == nullptr);
+
+    auto res = (COMMIT3res*) data;
+
+    INJECT_JUKEBOX(res, task);
+
+    const fuse_ino_t ino = task->rpc_api->flush_task.get_ino();
+    struct nfs_inode *inode =
+        task->get_client()->get_nfs_inode_from_ino(ino);
+    // List of bcs committed by this commit call.
+    auto bc_vec_ptr = (std::vector<bytes_chunk> *) task->rpc_api->pvt;
+
+    // We should call commit only when the inode is doing unstable+commit.
+    assert(!inode->is_stable_write());
+    // Caller must have marked commit as in progress.
+    assert(inode->is_commit_in_progress());
+    assert(!inode->get_filecache()->is_flushing_in_progress());
+
+    const int status = task->status(rpc_status, NFS_STATUS(res));
+    UPDATE_INODE_WCC(inode, res->COMMIT3res_u.resok.file_wcc);
+    FC_CB_TRACKER fccbt(inode);
+
+    AZLogDebug("[{}] commit_callback", ino);
+
+    uint64_t commit_bytes = 0;
+    /*
+     * Now that the request has completed, we can query libnfs for the
+     * dispatch time.
+     */
+    task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUS(res));
+
+    if (status == 0) {
+        uint64_t offset = 0;
+        uint64_t length = 0;
+
+        /*
+         * Go over all the successfully committed bcs and release them from
+         * the file cache (note that a successful commit confirms that the
+         * server has persisted the data and the client can free it).
+         * Also complete any tasks waiting for these membufs to be committed.
+         */
+        for (auto &bc : *bc_vec_ptr) {
+            struct membuf *mb = bc.get_membuf();
+            assert(mb->is_inuse());
+            assert(mb->is_locked());
+            assert(mb->is_commit_pending());
+            assert(mb->is_uptodate());
+            // Dirty membufs must not be committed.
+            assert(!mb->is_dirty());
+
+            mb->clear_commit_pending();
+            mb->clear_locked();
+            mb->clear_inuse();
+
+            /**
+             * Collect the contiguous range of bc's to release.
+             */
+            if (offset == 0 && length == 0) {
+                offset = bc.offset;
+                length = bc.length;
+            } else if (offset + length == bc.offset) {
+                length += bc.length;
+            } else {
+                commit_bytes += length;
+                const uint64_t released =
+                    inode->get_filecache()->release(offset, length);
+                AZLogInfo("[{}] commit_callback releasing bc [{}, {}), "
+                          "released {} bytes",
+                          ino, offset, offset+length, released);
+                offset = bc.offset;
+                length = bc.length;
+            }
+        }
+
+        // Release the last bc range not released by the above loop.
+        if (length != 0) {
+            commit_bytes += length;
+            const uint64_t released =
+                inode->get_filecache()->release(offset, length);
+            AZLogInfo("[{}] commit_callback releasing bc [{}, {}), "
+                      "released {} bytes",
+                      ino, offset, offset+length, released);
+        }
+
+    } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
+        task->get_client()->jukebox_retry(task);
+        return;
+    } else {
+        /*
+         * Commit has failed.
+         * Go over all the bcs that this commit was targeting, mark the
+         * membufs back as dirty and clear the commit_pending flag.
+         * The next write will initiate the flush again with stable writes.
+         */
+        for (auto &bc : *bc_vec_ptr) {
+            struct membuf *mb = bc.get_membuf();
+            assert(mb->is_inuse());
+            assert(mb->is_locked());
+            assert(mb->is_commit_pending());
+            assert(mb->is_uptodate());
+            assert(!mb->is_dirty());
+
+            mb->clear_commit_pending();
+            mb->set_dirty();
+            mb->clear_locked();
+            mb->clear_inuse();
+        }
+        /*
+         * Set the inode to stable write, so that the next write will
+         * initiate the flush again with stable writes.
+         * There should be no flush in progress at this moment.
+         */
+        assert(inode->get_filecache()->is_flushing_in_progress() == false);
+        inode->set_stable_write();
+    }
+
+    /*
+     * Clear the commit in progress flag.
+     */
+    inode->clear_commit_in_progress();
+
+    if (status == 0) {
+        assert(commit_bytes > 0);
+
+        // Update the commit bytes in the inode.
+        inode->get_fcsm()->on_commit_complete(commit_bytes);
+    } else {
+        assert(0);
+    }
+
+    delete bc_vec_ptr;
+    task->rpc_api->pvt = nullptr;
+    task->free_rpc_task();
+}
+
+void rpc_task::issue_commit_rpc()
+{
+    // Must only be called for a flush task.
+    assert(get_op_type() == FUSE_FLUSH);
+    // Commit is never called for a fuse request.
+    assert(get_fuse_req() == nullptr);
+
+    const fuse_ino_t ino = rpc_api->flush_task.get_ino();
+    struct nfs_inode *inode = get_client()->get_nfs_inode_from_ino(ino);
+
+    // Commit must not be called if we are doing stable writes.
+    assert(!inode->is_stable_write());
+    // Commit must not be sent with ongoing flushes.
+    assert(!inode->get_filecache()->is_flushing_in_progress());
+    // Caller must have marked commit as in progress.
+    assert(inode->is_commit_in_progress());
+
+    COMMIT3args args;
+    ::memset(&args, 0, sizeof(args));
+    bool rpc_retry = false;
+
+    AZLogDebug("[{}] issue_commit_rpc", ino);
+
+    /*
+     * Caller must have stashed the bcs marked commit_pending in pvt.
+     */
+    assert(rpc_api->pvt != nullptr);
+
+    args.file = inode->get_fh();
+    args.offset = 0;
+    args.count = 0;
+
+    do {
+        rpc_retry = false;
+        stats.on_rpc_issue();
+
+        if (rpc_nfs3_commit_task(get_rpc_ctx(),
+                                 commit_callback, &args, this) == NULL) {
+            stats.on_rpc_cancel();
+            /*
+             * Most common reason for this is memory allocation failure,
+             * hence wait for some time before retrying. Also block the
+             * current thread as we really want to slow down things.
+             *
+             * TODO: For soft mount should we fail this?
+             */
+            rpc_retry = true;
+
+            AZLogWarn("rpc_nfs3_commit_task failed to issue, retrying "
+                      "after 5 secs!");
+            ::sleep(5);
+        }
+    } while (rpc_retry);
+}
+
 /**
  * Must be called when bytes_completed bytes are successfully read/written.
  */
@@ -1085,7 +1291,7 @@ static void write_iov_callback(
                bciov->orig_offset + bciov->orig_length);

     // Update bciov after the current write.
-    bciov->on_io_complete(res->WRITE3res_u.resok.count);
+    bciov->on_io_complete(res->WRITE3res_u.resok.count, !inode->is_stable_write());

     // Create a new write_task for the remaining bc_iovec.
     struct rpc_task *write_task =
@@ -1122,7 +1328,7 @@ static void write_iov_callback(
         return;
     } else {
         // Complete bc_iovec IO completed.
-        bciov->on_io_complete(res->WRITE3res_u.resok.count);
+        bciov->on_io_complete(res->WRITE3res_u.resok.count, !inode->is_stable_write());

         // Complete data writen to blob.
         AZLogDebug("[{}] <{}> Completed write, off: {}, len: {}",
@@ -1205,6 +1411,8 @@ static void write_iov_callback(
      */
     if (inode->get_write_error() == 0) {
         inode->get_fcsm()->on_flush_complete(bciov->orig_length);
+    } else {
+        assert(0);
     }

     delete bciov;
@@ -1278,7 +1486,16 @@ void rpc_task::issue_write_rpc()
     args.file = inode->get_fh();
     args.offset = offset;
     args.count = length;
-    args.stable = FILE_SYNC;
+    args.stable = inode->is_stable_write() ? FILE_SYNC : UNSTABLE;
+
+    /*
+     * Unstable writes want to make use of multiple connections for better
+     * performance. Note that they aren't affected by the optimistic
+     * concurrency backoff issues seen by stable writes.
+     */
+    if (!inode->is_stable_write()) {
+        set_csched(CONN_SCHED_RR);
+    }

     do {
         rpc_retry = false;
@@ -2339,7 +2556,6 @@ void rpc_task::run_write()
     const size_t length = rpc_api->write_task.get_size();
     struct fuse_bufvec *const bufv = rpc_api->write_task.get_buffer_vector();
     const off_t offset = rpc_api->write_task.get_offset();
-    const bool sparse_write = (offset > inode->get_file_size(true));

     uint64_t extent_left = 0;
     uint64_t extent_right = 0;
@@ -2429,140 +2645,8 @@ void rpc_task::run_write()

     assert(extent_right >= (extent_left + length));

-    /*
-     * We have successfully copied the user data into the cache.
-     * We can have the following cases:
-     * 1. This new write caused a contiguous extent to be greater than
-     *    max_dirty_extent_bytes(), aka MDEB.
-     *    In this case we begin flush of this contiguous extent (in multiple
-     *    parallel wsize sized blocks) since there's no benefit in waiting more
-     *    as the data is sufficient for the server scheduler to effectively
-     *    write, in optimal sized blocks.
-     *    We complete the application write rightaway without waiting for the
-     *    flush to complete as we are not under memory pressure.
-     *    This will happen when user is sequentially writing to the file.
-     * 2. A single contiguous extent is not greater than MDEB but total dirty
-     *    bytes waiting to be flushed is more than MDEB.
-     *    In this case we begin flush of the entire dirty data from the cache
-     *    as we have sufficient data for the server scheduler to perform
-     *    batched writes effectively.
-     *    We complete the application write rightaway without waiting for the
-     *    flush to complete as we are not under memory pressure.
-     *    This will happen when user is writing to the file in a random or
-     *    pseudo-sequential fashion.
-     * 3. Cache has dirty data beyond the "inline write" threshold, ref
-     *    do_inline_write().
-     *    In this case we begin flush of the entire dirty data.
-     *    This is a memory pressure situation and hence we do not complete the
-     *    application write till all the backend writes complete.
-     *    This will happen when user is writing faster than our backend write
-     *    throughput, eventually dirty data will grow beyond the "inline write"
-     *    threshold and then we have to slow down the writers by delaying
-     *    completion.
-     *
-     * Other than this we have a special case of "write beyond eof" (termed
-     * sparse write). In sparse write case also we perform "inline write" of
-     * all the dirty data. This is needed for correct read behaviour. Imagine
-     * a reader reading from the sparse part of the file which is not yet in
-     * the bytes_chunk_cache. This read will be issued to the server and since
-     * server doesn't know the updated file size (as the write may still be
-     * sitting in our bytes_chunk_cache) it will return eof. This is not correct
-     * as such reads issued after successful write, are valid and should return
-     * 0 bytes for the sparse range.
-     */
-
-    /*
-     * Do we need to perform "inline write"?
-     * Inline write implies, we flush all the dirty data and wait for all the
-     * corresponding backend writes to complete.
-     */
-    const bool need_inline_write =
-        (sparse_write || inode->get_filecache()->do_inline_write());
-
-    if (need_inline_write) {
-        INC_GBL_STATS(inline_writes, 1);
-
-        AZLogDebug("[{}] Inline write (sparse={}), {} bytes, extent @ [{}, {})",
-                   ino, sparse_write, (extent_right - extent_left),
-                   extent_left, extent_right);
-
-        /*
-         * ensure_flush() will arrange to flush all the dirty data and complete
-         * the write task when the flush completes.
-         */
-        inode->get_fcsm()->ensure_flush(0, offset, length, this);
-
-        // Free up the fuse thread without completing the application write.
-        return;
-    }
-
-    /*
-     * Ok, we don't need to do inline writes. See if we have enough dirty
-     * data and we need to start async flush.
-     */
-
-    /*
-     * If the extent size exceeds the max allowed dirty size as returned by
-     * max_dirty_extent_bytes(), then it's time to flush the extent.
-     * Note that this will cause sequential writes to be flushed at just the
-     * right intervals to optimize fewer write calls and also allowing the
-     * server scheduler to merge better.
-     * See bytes_to_flush for how random writes are flushed.
-     *
-     * Note: max_dirty_extent is static as it doesn't change after it's
-     *       queried for the first time.
-     */
-    static const uint64_t max_dirty_extent =
-        inode->get_filecache()->max_dirty_extent_bytes();
-    assert(max_dirty_extent > 0);
-
-    /*
-     * How many bytes in the cache need to be flushed. These are dirty chunks
-     * which have not started flushing yet.
-     */
-    const uint64_t bytes_to_flush =
-        inode->get_filecache()->get_bytes_to_flush();
-
-    AZLogDebug("[{}] extent_left: {}, extent_right: {}, size: {}, "
-               "bytes_to_flush: {} (max_dirty_extent: {})",
-               ino, extent_left, extent_right,
-               (extent_right - extent_left),
-               bytes_to_flush,
-               max_dirty_extent);
-
-    if ((extent_right - extent_left) < max_dirty_extent) {
-        /*
-         * Current extent is not large enough to be flushed, see if we have
-         * enough total dirty data that needs to be flushed. This is to cause
-         * random writes to be periodically flushed.
-         */
-        if (bytes_to_flush < max_dirty_extent) {
-            /*
-             * No memory pressure.
-             */
-            AZLogDebug("Reply write without syncing to Blob");
-            reply_write(length);
-            return;
-        }
-
-        /*
-         * This is the case of non-sequential writes causing enough dirty
-         * data to be accumulated, need to flush all of that.
-         */
-        extent_left = 0;
-        extent_right = UINT64_MAX;
-    }
-
-    /*
-     * Queue a non-blocking flush target for flushing all the dirty data.
-     */
-    inode->get_fcsm()->ensure_flush(0, offset, length, nullptr);
-
-    /*
-     * Complete the write request without waiting for the backend flush to
-     * complete.
-     */
-    reply_write(length);
+    // Run this task through the flush/commit state machine.
+    inode->get_fcsm()->run(this, extent_left, extent_right);
 }

 void rpc_task::run_flush()
@@ -2575,6 +2659,7 @@ void rpc_task::run_flush()
      * to wait. Waiting in libnfs thread context will cause deadlocks.
      */
     assert(get_fuse_req() != nullptr);
+    reply_error(inode->flush_cache_and_wait());
 }
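
Reviewer note (not part of the patch): the asserts in issue_commit_rpc() and commit_callback() imply a specific contract for whoever dispatches a commit (commit_membufs() in nfs_inode, which is not shown in this diff). The sketch below restates that contract for clarity only; it is a minimal illustration under stated assumptions, not the actual implementation. The helpers alloc_commit_task() and set_commit_in_progress() are assumptions and may not exist with these names or signatures.

// Illustrative sketch of the commit-dispatch sequence implied by the
// asserts in issue_commit_rpc() and commit_callback() above.
// alloc_commit_task() and set_commit_in_progress() are hypothetical.
static void dispatch_commit_sketch(struct nfs_inode *inode)
{
    // Commits are only issued on the unstable-write path, and never while
    // a flush or another commit is in progress.
    assert(!inode->is_stable_write());
    assert(!inode->get_filecache()->is_flushing_in_progress());
    assert(!inode->is_commit_in_progress());

    // Collect the commit-pending chunks; get_commit_pending_bcs() bumps the
    // inuse count and returns the total commit-pending bytes.
    uint64_t bytes = 0;
    auto *bc_vec = new std::vector<bytes_chunk>(
        inode->get_filecache()->get_commit_pending_bcs(&bytes));
    if (bytes == 0) {
        delete bc_vec;
        return;
    }

    // commit_callback() expects every membuf locked, so the real code must
    // lock them before handing them over.
    for (auto &bc : *bc_vec) {
        bc.get_membuf()->set_locked();
    }

    // Mark commit as in progress before issuing the RPC (hypothetical setter).
    inode->set_commit_in_progress();

    // Stash the bc list in rpc_api->pvt; commit_callback() releases the
    // committed ranges from the cache and deletes this vector.
    struct rpc_task *task = alloc_commit_task(inode);   // hypothetical helper
    task->rpc_api->pvt = bc_vec;
    task->issue_commit_rpc();
}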