diff --git a/turbonfs/inc/fcsm.h b/turbonfs/inc/fcsm.h index 09ce10be..0582ce25 100644 --- a/turbonfs/inc/fcsm.h +++ b/turbonfs/inc/fcsm.h @@ -69,9 +69,8 @@ class fcsm struct nfs_inode *_inode); /** - * Ensure 'flush_bytes' additional bytes are flushed from the cache, above - * and beyond what's already flushed or flushing now. flush_bytes==0 implies - * flush all dirty bytes. + * Ensure all or some dirty bytes are flushed or scheduled for flushing (if + * a flush or commit is already ongoing). * If the state machine is currently not running, it'll kick off the state * machine by calling sync_membufs(), else it'll add a new flush target to * ftgtq, which will be run by on_flush_complete() when the ongoing flush @@ -79,36 +78,38 @@ class fcsm * initiated this flush and in that case a blocking target will be added * which means the specified task will block till the requested target * completes, else a non-blocking target will be added which just requests - * the specific amount of bytes to be flushed w/o having the application - * wait. + * dirty bytes to be flushed w/o having the application wait. * * ensure_flush() provides the following guarantees: - * - Additional flush_bytes bytes will be flushed. This is beyond what's - * already flushed or scheduled for flush. + * - It'll flush all or part of cached dirty bytes starting from the lowest + * offset. The actual number of bytes flushed is decided based on various + * config settings and the memory pressure. * - On completion of that flush, task will be completed. - * If after grabbing flush_lock it figures out that the requested flush - * target is already met, it completes the task rightaway. * * write_off and write_len describe the current application write call. - * write_len is needed for completing the write rightaway for non-blocking - * cases, where 'task' is null. + * They are needed for logging when task is nullptr. * - * LOCKS: flush_lock. + * Caller MUST hold the flush_lock. */ - void ensure_flush(uint64_t flush_bytes, - uint64_t write_off, + void ensure_flush(uint64_t write_off, uint64_t write_len, - struct rpc_task *task = nullptr); + struct rpc_task *task = nullptr, + std::atomic *conditional_variable = nullptr); /** - * Ensure 'commit_bytes' additional bytes are committed from the cache, - * above and beyond what's already committed or committing now. + * Ensure all or some commit-pending bytes are committed or scheduled for + * commit (if a flush or commit is already ongoing). * If 'task' is null it'll add a non-blocking commit target to ctgtq, else * it'll add a blocking commit target for completing task when given commit * goal is met. + * + * Caller MUST hold the flush_lock. */ - void ensure_commit(uint64_t commit_bytes, - struct rpc_task *task = nullptr); + void ensure_commit(uint64_t write_off, + uint64_t write_len, + struct rpc_task *task = nullptr, + std::atomic *conditional_variable = nullptr, + bool commit_full = false); /** * Callbacks to be called when flush/commit successfully complete. @@ -116,7 +117,7 @@ class fcsm * targets from ftgtq/ctgtq as appropriate. */ void on_flush_complete(uint64_t flush_bytes); - void on_commit_complete(); + void on_commit_complete(uint64_t commit_bytes); /** * Is the state machine currently running, i.e. it has sent (one or more) @@ -148,9 +149,27 @@ class fcsm void mark_running(); void clear_running(); + /** + * Nudge the flush-commit state machine. 
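+     * A minimal usage sketch (illustrative only, not part of this patch;
+     * the copy_to_cache() extent out-params are assumed here):
+     *
+     *     uint64_t extent_left = 0, extent_right = 0;
+     *     inode->copy_to_cache(bufv, offset, &extent_left, &extent_right);
+     *     inode->get_fcsm()->run(write_task, extent_left, extent_right);
+     *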
+ * After the fuse thread copies the application data into the cache, it + * must call this to let FCSM know that some more dirty data has been + * added. It checks the dirty data against the configured limits and + * decides which of the following action to take: + * - Do nothing, as dirty data is still within limits. + * - Start flushing. + * - Start committing. + * - Flush/commit while blocking the task till flush/commit completes. + */ + void run(struct rpc_task *task, + uint64_t extent_left, + uint64_t extent_right); + /** * Call when more writes are dispatched, or prepared to be dispatched. - * This MUST be called before the write callback can be called. + * These must correspond to membufs which are dirty and not already + * flushing. + * This MUST be called before the write_iov_callback() can be called, i.e., + * before the actual write call is issued. */ void add_flushing(uint64_t bytes); @@ -180,6 +199,22 @@ class fcsm return (fc_cb_count() > 0); } + /** + * Call when more commit are dispatched, or prepared to be dispatched. + * These must correspond to membufs which are already flushed, i.e., they + * must not be dirty or flushing and must be commit pending. + * This MUST be called before the commit_callback can be called, i.e., + * before the actual commit call is issued. + */ + void add_committing(uint64_t bytes); + + /** + * ctgtq_cleanup() is called when we switch to stable writes. + * It clears up all the queued commit targets as stable writes will not + * cause a commit and those targets would never normally complete. + */ + void ctgtq_cleanup(); + private: /* * The singleton nfs_client, for convenience. @@ -216,7 +251,8 @@ class fcsm fctgt(struct fcsm *fcsm, uint64_t _flush_seq, uint64_t _commit_seq, - struct rpc_task *_task = nullptr); + struct rpc_task *_task = nullptr, + std::atomic *conditional_variable = nullptr); /* * Flush and commit targets (in terms of flushed_seq_num/committed_seq_num) @@ -235,6 +271,12 @@ class fcsm * Pointer to the containing fcsm. */ struct fcsm *const fcsm = nullptr; + + /* + * If non-null, it's initial value is false. + * Caller who enqueue the target waiting for it to be true. + */ + std::atomic *conditional_variable = nullptr; #if 0 /* * Has the required flush/commit task started? @@ -262,7 +304,10 @@ class fcsm /* * Continually increasing seq number of the last byte successfully flushed * and committed. Flush-commit targets (fctgt) are expressed in terms of - * these. + * these. These are actually numerically one more than the last + * flushing/commiting and flushed/committed byte's seq number, e.g., if the + * last byte flushed was byte #1 (0 and 1 are flushed), then flushed_seq_num + * would be 2. * Note that these are *not* offsets in the file but these are cumulative * values for total bytes flushed/committed till now since the file was * opened. In case of overwrite same byte(s) may be repeatedly flushed diff --git a/turbonfs/inc/file_cache.h b/turbonfs/inc/file_cache.h index f91c7767..5d63bd02 100644 --- a/turbonfs/inc/file_cache.h +++ b/turbonfs/inc/file_cache.h @@ -694,6 +694,11 @@ struct bytes_chunk */ bool is_new = false; + /** + * Get the inode corresponding to this bc. + */ + struct nfs_inode *get_inode() const; + /** * Return membuf corresponding to this bytes_chunk. * This will be used by caller to synchronize operations on the membuf. @@ -1200,8 +1205,8 @@ class bytes_chunk_cache * check for that after holding the membuf lock, before it tries to * flush those membuf(s). 
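      * E.g., calling get_dirty_bc_range() with no arguments (the defaults
      * 0 and UINT64_MAX added below) returns every dirty chunk currently
      * in the cache.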
*/ - std::vector get_dirty_bc_range(uint64_t st_off, - uint64_t end_off) const; + std::vector get_dirty_bc_range( + uint64_t st_off = 0, uint64_t end_off = UINT64_MAX) const; /* * Returns dirty chunks which are not already flushing, in the given range, @@ -1239,8 +1244,8 @@ class bytes_chunk_cache * check for that after holding the membuf lock, before it tries to * commit those membuf(s). */ - std::vector get_flushing_bc_range(uint64_t st_off, - uint64_t end_off) const; + std::vector get_flushing_bc_range( + uint64_t st_off = 0, uint64_t end_off = UINT64_MAX) const; /** * Returns contiguous dirty (and not flushing) chunks from chunmap, starting @@ -1526,7 +1531,7 @@ class bytes_chunk_cache * e.g., * if the max_dirty_extent_bytes() is 1GB, then we have * flush_required() @ 1GB - * commit_required() @ 1GB + * commit_required() @ 2GB * do_inline_write() @ 4GB. * * Assuming backend flush speed of 1GB/s and memory write speed of diff --git a/turbonfs/inc/nfs_inode.h b/turbonfs/inc/nfs_inode.h index f374f98c..bae13533 100644 --- a/turbonfs/inc/nfs_inode.h +++ b/turbonfs/inc/nfs_inode.h @@ -289,6 +289,29 @@ struct nfs_inode */ struct stat attr; + /** + * We maintain following multiple views of the file and thus multiple file + * sizes for those views. + * - Cached. + * This is the view of the file that comprises of data that has been + * written by the application and saved in file cache. It may or may not + * have been flushed and/or committed. This is the most uptodate view of + * the file and applications must use this view. + * cached_filesize tracks the file size for this view. + * - Uncommited. + * This is the view of the file that tracks data that has been flushed + * using UNSTABLE writes but not yet COMMITted to the Blob. This view of + * the file is only used to see if the next PB call will write after the + * last PB'ed byte and thus can be appended. + * putblock_filesize tracks the file size for this view. + * - Committed. + * This is the view of the file that tracks data committed to the Blob. + * Other clients will see this view. + * attr.st_size tracks the file size for this view. + */ + off_t cached_filesize = 0; + mutable off_t putblock_filesize = 0; + /* * Has this inode seen any non-append write? * This starts as false and remains false as long as copy_to_cache() only @@ -310,7 +333,7 @@ struct nfs_inode * Note: As of now, we are not using this flag as commit changes not yet * integrated, so we are setting this flag to true. */ - bool stable_write = true; + bool stable_write = false; public: /* @@ -1400,12 +1423,16 @@ struct nfs_inode * initiate any new flush operations while some truncate call is in * progress (which must have held the flush_lock). */ - int flush_cache_and_wait(uint64_t start_off = 0, - uint64_t end_off = UINT64_MAX); + int flush_cache_and_wait(); /** - * Wait for currently flushing membufs to complete. + * Wait for currently flushing/committing membufs to complete. + * It will wait till the currently flushing membufs complete and then + * issue a commit and wait for that. If no flush is ongoing but there's + * commit_pending data, it'll commit that and return after the commit + * completes. * Returns 0 on success and a positive errno value on error. + * Once it returns, commit_pending will be 0. * * Note : Caller must hold the inode flush_lock to ensure that * no new membufs are added till this call completes. @@ -1413,8 +1440,35 @@ struct nfs_inode * flush/write requests to complete, but it'll exit with flush_lock * held. 
*/ - int wait_for_ongoing_flush(uint64_t start_off = 0, - uint64_t end_off = UINT64_MAX); + int wait_for_ongoing_flush(); + + /** + * commit_membufs() is called to commit uncommitted membufs to the Blob. + * It creates commit RPC and sends it to the NFS server. + */ + void commit_membufs(std::vector &bcs); + + /** + * switch_to_stable_write() is called to switch the inode to stable write + * mode. It waits for all ongoing flush and subsequent commit to complete. + * If not already scheduled, it'll perform an explicit commit after the + * flush complete. + * Post that it'll mark inode for stable write and return. From then on + * any writes to this inode will be sent as stable writes. + */ + void switch_to_stable_write(); + + /** + * Check if stable write is required for the given offset. + * Given offset is the start of contiguous dirty membufs that need to be + * flushed to the Blob. + */ + bool check_stable_write_required(off_t offset); + + /** + * Wait for ongoing commit operation to complete. + */ + void wait_for_ongoing_commit(); /** * Sync the dirty membufs in the file cache to the NFS server. diff --git a/turbonfs/inc/rpc_stats.h b/turbonfs/inc/rpc_stats.h index 626914b1..d0a7bbed 100644 --- a/turbonfs/inc/rpc_stats.h +++ b/turbonfs/inc/rpc_stats.h @@ -139,9 +139,6 @@ class rpc_stats_az */ void on_rpc_issue() { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert(stamp.issue == 0); stamp.issue = get_current_usecs(); assert(stamp.issue >= stamp.create); @@ -156,9 +153,6 @@ class rpc_stats_az */ void on_rpc_cancel() { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert(stamp.issue != 0); assert((int64_t) stamp.issue <= get_current_usecs()); assert(stamp.dispatch == 0); @@ -178,9 +172,6 @@ class rpc_stats_az */ void on_rpc_complete(struct rpc_pdu *pdu, nfsstat3 status) { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert((status == NFS3ERR_RPC_ERROR) || (nfsstat3_to_errno(status) != -ERANGE)); diff --git a/turbonfs/inc/rpc_task.h b/turbonfs/inc/rpc_task.h index 2468d60d..94f09d18 100644 --- a/turbonfs/inc/rpc_task.h +++ b/turbonfs/inc/rpc_task.h @@ -2428,6 +2428,7 @@ struct rpc_task */ bool add_bc(const bytes_chunk& bc); void issue_write_rpc(); + void issue_commit_rpc(); #ifdef ENABLE_NO_FUSE /* diff --git a/turbonfs/src/fcsm.cpp b/turbonfs/src/fcsm.cpp index 7231591d..c86a4e93 100644 --- a/turbonfs/src/fcsm.cpp +++ b/turbonfs/src/fcsm.cpp @@ -25,17 +25,21 @@ fcsm::fcsm(struct nfs_client *_client, fcsm::fctgt::fctgt(struct fcsm *fcsm, uint64_t _flush_seq, uint64_t _commit_seq, - struct rpc_task *_task) : + struct rpc_task *_task, + std::atomic *_cv) : flush_seq(_flush_seq), commit_seq(_commit_seq), task(_task), - fcsm(fcsm) + fcsm(fcsm), + conditional_variable(_cv) { assert(fcsm->magic == FCSM_MAGIC); // At least one of flush/commit goals must be set. assert((flush_seq != 0) || (commit_seq != 0)); -#ifndef NDEBUG + // If conditional variable, it's initial value should be false. + assert(!conditional_variable || *conditional_variable == false); + if (task) { // Only frontend write tasks must be specified. 
assert(task->magic == RPC_TASK_MAGIC); @@ -43,7 +47,6 @@ fcsm::fctgt::fctgt(struct fcsm *fcsm, assert(task->rpc_api->write_task.is_fe()); assert(task->rpc_api->write_task.get_size() > 0); } -#endif AZLogDebug("[{}] [FCSM] {} fctgt queued (F: {}, C: {}, T: {})", fcsm->get_inode()->get_fuse_ino(), @@ -85,58 +88,519 @@ void fcsm::add_flushing(uint64_t bytes) { assert(inode->is_flushing); assert(flushed_seq_num <= flushing_seq_num); + assert(committed_seq_num <= committing_seq_num); + assert(committing_seq_num <= flushed_seq_num); + flushing_seq_num += bytes; } -void fcsm::ensure_flush(uint64_t flush_bytes, - uint64_t write_off, - uint64_t write_len, - struct rpc_task *task) +void fcsm::add_committing(uint64_t bytes) +{ + assert(inode->is_flushing); + // Must be called only for unstable writes. + assert(!inode->is_stable_write()); + assert(committed_seq_num <= committing_seq_num); + + committing_seq_num += bytes; + + // We can only commit a byte that's flushed. + assert(flushed_seq_num <= flushing_seq_num); + assert(committing_seq_num <= flushed_seq_num); +} + +void fcsm::run(struct rpc_task *task, + uint64_t extent_left, + uint64_t extent_right) +{ + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + assert(task->rpc_api->write_task.get_size() > 0); + + const size_t length = task->rpc_api->write_task.get_size(); + const off_t offset = task->rpc_api->write_task.get_offset(); + const bool sparse_write = false; + const fuse_ino_t ino = task->rpc_api->write_task.get_ino(); + + /* + * fcsm::run() is called after fuse thread successfully copies user data + * into the cache. We can have the following cases (in decreasing order of + * criticality): + * 1. Cache has dirty+uncommitted data beyond the "inline write threshold", + * ref do_inline_write(). + * In this case we begin flush of the dirty data and/or commit of the + * uncommitted data. + * This is a memory pressure situation and hence we do not complete the + * application write till all the above backend writes complete. + * This will happen when application is writing faster than our backend + * write throughput, eventually dirty data will grow beyond the "inline + * write threshold" and then we have to slow down the writers by delaying + * completion. + * 2. Cache has uncommitted data beyond the "commit threshold", ref + * commit_required(). + * In this case we free up space in the cache by committing data. + * We just initiate a commit while the current write request is + * completed. Note that we want to delay commits so that we can reduce + * the commit calls (commit more data per call) as commit calls sort + * of serialize the writes as we cannot send any other write to the + * server while commit is on. + * 3. Cache has enough dirty data that we can flush. + * For sequential writes, this means the new write caused a contiguous + * extent to be greater than max_dirty_extent_bytes(), aka MDEB, while + * for non-seq writes it would mean that the total dirty data grew beyond + * MDEB. + * In this case we begin flush of this contiguous extent (in multiple + * parallel wsize sized blocks) since there's no benefit in waiting more + * as the data is sufficient for the server scheduler to effectively + * write, in optimal sized blocks. + * We complete the application write rightaway without waiting for the + * flush to complete as we are not under memory pressure. + * + * Other than this we have a special case of "write beyond eof" (termed + * sparse write). 
In sparse write case also we perform "inline write" of + * all the dirty data. This is needed for correct read behaviour. Imagine + * a reader reading from the sparse part of the file which is not yet in + * the bytes_chunk_cache. This read will be issued to the server and since + * server doesn't know the updated file size (as the write may still be + * sitting in our bytes_chunk_cache) it will return eof. This is not correct + * as such reads issued after successful write, are valid and should return + * 0 bytes for the sparse range. + */ + + /* + * If the extent size exceeds the max allowed dirty size as returned by + * max_dirty_extent_bytes(), then it's time to flush the extent. + * Note that this will cause sequential writes to be flushed at just the + * right intervals to optimize fewer write calls and also allowing the + * server scheduler to merge better. + * See bytes_to_flush for how random writes are flushed. + * + * Note: max_dirty_extent is static as it doesn't change after it's + * queried for the first time. + */ + static const uint64_t max_dirty_extent = + inode->get_filecache()->max_dirty_extent_bytes(); + assert(max_dirty_extent > 0); + + /* + * Check what kind of limit we have hit. + */ + const bool need_inline_write = + (sparse_write || inode->get_filecache()->do_inline_write()); + const bool need_commit = + inode->get_filecache()->commit_required(); + const bool need_flush = + ((extent_right - extent_left) >= max_dirty_extent) || + inode->get_filecache()->flush_required(); + + AZLogDebug("[{}] fcsm::run() (sparse={}, need_inline_write={}, " + "need_commit={}, need_flush={}, extent=[{}, {}))", + inode->get_fuse_ino(), sparse_write, need_inline_write, + need_commit, need_flush, extent_left, extent_right); + + /* + * Nothing to do, we can complete the application write rightaway. + * This should be the happy path! + */ + if (!need_inline_write && !need_commit && !need_flush) { + task->reply_write(length); + return; + } + + /* + * Do we need to perform "inline write"? + * Inline write implies, we flush all the dirty data and wait for all the + * corresponding backend writes to complete. + */ + if (need_inline_write) { + INC_GBL_STATS(inline_writes, 1); + + AZLogDebug("[{}] Inline write (sparse={}), {} bytes, extent @ [{}, {})", + ino, sparse_write, (extent_right - extent_left), + extent_left, extent_right); + + /* + * Queue a blocking flush/commit target, which will complete the fuse + * write after flush/commit completes. + * In case of stable writes we queue a flush target while in case of + * unstable writes we queue a commit target. Commit target implicitly + * performs flush before the commit. + */ + inode->flush_lock(); + if (inode->is_stable_write()) { + inode->get_fcsm()->ensure_flush(offset, length, task); + } else { + inode->get_fcsm()->ensure_commit(offset, length, task); + } + inode->flush_unlock(); + + // Free up the fuse thread without completing the application write. + return; + } + + // Case 2: Commit + /* + * We don't need to do inline writes. See if we need to commit the + * uncommitted data to the backend. We just need to stat the commit + * and not hold the current write task till the commit completes. + */ + if (need_commit) { + assert(!inode->is_stable_write()); + + inode->flush_lock(); + /* + * Commit will only start after all ongoing flush complete and no new + * flush can start (as we have the flush_lock). 
This means no new + * commit-pending data can be added and hence the current inprogress + * commit will finish committing all commit-pending bytes. + */ + if (inode->is_commit_in_progress()) { + assert(!inode->get_filecache()->is_flushing_in_progress()); + + AZLogDebug("[{}] Commit already in progress, skipping commit", + ino); + } else { + AZLogDebug("[{}] Committing {} bytes", ino, + inode->get_filecache()->max_commit_bytes()); + inode->get_fcsm()->ensure_commit(offset, length, nullptr); + } + inode->flush_unlock(); + } + + /* + * Ok, we don't need to do inline writes. See if we have enough dirty + * data and we need to start async flush. + */ + + // Case 3: Flush + if ((extent_right - extent_left) < max_dirty_extent) { + /* + * This is the case of non-sequential writes causing enough dirty + * data to be accumulated, need to flush all of that. + */ + extent_left = 0; + extent_right = UINT64_MAX; + } + + /* + * Queue a non-blocking flush target for flushing all the dirty data. + */ + if (need_flush) { + inode->flush_lock(); + inode->get_fcsm()->ensure_flush(offset, length, nullptr); + inode->flush_unlock(); + } + + /* + * Complete the write request without waiting for the backend flush/commit + * to complete. For need_inline_write we should not complete the task now. + */ + assert(!need_inline_write); + task->reply_write(length); +} + +void fcsm::ctgtq_cleanup() { - AZLogDebug("[{}] [FCSM] ensure_flush<{}>({}), write req [{}, {}), task: {}", + assert(inode->is_flushing); + // Must be called after switching the inode to stable write. + assert(inode->is_stable_write()); + + AZLogDebug("[FCSM][{}] ctgtq_cleanup()", inode->get_fuse_ino()); + + while (!ctgtq.empty()) { + struct fctgt &ctgt = ctgtq.front(); + assert(ctgt.fcsm == this); + + struct rpc_task *task = ctgt.task; + if (task) { + /* + * ctgtq_cleanup() is called when we have decided to swith to + * stable writes. Since commit is never called for stable writes, + * these tasks waiting for commit must be completed now. + */ + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + assert(task->rpc_api->write_task.get_size() > 0); + + task->reply_write(task->rpc_api->write_task.get_size()); + } + + AZLogInfo("[FCSM][{}] ctgtq_cleanup(): completed write task: {} " + "commit_seq: {}", + inode->get_fuse_ino(), + fmt::ptr(task), + ctgt.commit_seq); + + ctgtq.pop(); + } + + assert(ctgtq.empty()); +} + +void fcsm::ensure_commit(uint64_t write_off, + uint64_t write_len, + struct rpc_task *task, + std::atomic *conditional_variable, + bool commit_full) +{ + assert(inode->is_flushing); + assert(!inode->is_stable_write()); + assert(!commit_full || task == nullptr); + + /* + * If any of the flush/commit targets are waiting completion, state machine + * must be running. + */ + assert(is_running() || (ctgtq.empty() && ftgtq.empty())); + assert(is_running() || (flushed_seq_num == flushing_seq_num)); + assert(is_running() || (committed_seq_num == committing_seq_num)); + + AZLogDebug("[{}] [FCSM] ensure_commit<{}> write req [{}, {}], task: {}", inode->get_fuse_ino(), task ? "blocking" : "non-blocking", - flush_bytes, write_off, write_off + write_len, fmt::ptr(task)); + // committed_seq_num can never be more than committing_seq_num. + assert(committed_seq_num <= committing_seq_num); + + // we can only commit bytes which are flushed. + assert(committing_seq_num <= flushed_seq_num); + + if (task) { + // task provided must be a frontend write task. 
+ assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + assert(task->rpc_api->write_task.get_size() > 0); + } + + /* + * Find how many bytes we would like to commit. + * If there are some commit-pending bytes we commit all of those, else + * we set a commit target large enough to flush+commit all leaving + * one full sized dirty extent. + */ + uint64_t commit_bytes = + inode->get_filecache()->get_bytes_to_commit(); + /* - * TODO: Do we have a usecase for caller specifying how many bytes to - * flush. + * If commit_full flag is true, wait_for_ongoing_flush() + * committed the commit_pending bytes. Now we need to flush + * dirty bytes and commit them. */ - assert(flush_bytes == 0); + if (commit_full) { + assert(commit_bytes == 0); + commit_bytes = inode->get_filecache()->get_bytes_to_flush(); + } + + if (commit_bytes == 0) { + /* + * TODO: Make sure this doesn't result in small-blocks being written. + */ + const int64_t bytes = (inode->get_filecache()->get_bytes_to_flush() - + inode->get_filecache()->max_dirty_extent_bytes()); + + commit_bytes = std::max(bytes, (int64_t) 0); + } + + /* + * No new bytes to commit, complete the task if it was a blocking call. + */ + if (commit_bytes == 0) { + AZLogDebug("COMMIT BYTES ZERO"); + if (task) { + task->reply_write(task->rpc_api->write_task.get_size()); + } + if (conditional_variable) { + *conditional_variable = true; + } + return; + } + + /* + * What will be the committed_seq_num value after commit_bytes are committed? + * Since commit_pending_bytes can reduce as another thread could be parallely + * running commit completion, so we may set target_commited_seq_num lower than + * the last queued commit_seq, so take the max. + */ + const uint64_t last_commit_seq = + !ctgtq.empty() ? ctgtq.front().commit_seq : 0; + const uint64_t target_committed_seq_num = + std::max((committed_seq_num + commit_bytes), last_commit_seq); + + /* + * If the state machine is already running, we just need to add an + * appropriate commit target and return. When the ongoing operation + * completes, this commit would be dispatched. + */ + if (is_running()) { +#ifndef NDEBUG + /* + * Make sure commit targets are always added in an increasing commit_seq. + */ + if (!ctgtq.empty()) { + assert(ctgtq.front().commit_seq <= target_committed_seq_num); + assert(ctgtq.front().flush_seq == 0); + } +#endif + ctgtq.emplace(this, + 0 /* target flush_seq */, + target_committed_seq_num /* target commit_seq */, + task, + conditional_variable); + return; + } + + /* + * FCSM not running. + * Flushed_seq_num tells us how much data is already flushed, If it's less + * than the target_committed_seq_num, we need to schedule a flush to catch up + * with the target_committed_seq_num. + */ + if (flushed_seq_num < target_committed_seq_num) { + AZLogDebug("[{}] [FCSM] not running, schedule a new flush to catch up, " + "flushed_seq_num: {}, target_committed_seq_num: {}", + inode->get_fuse_ino(), + flushed_seq_num.load(), + target_committed_seq_num); + + ensure_flush(task ? task->rpc_api->write_task.get_offset() : 0, + task ? task->rpc_api->write_task.get_size() : 0, + nullptr); + /* + * ensure_flush() must have scheduled flushing till + * target_committed_seq_num. + * + * TODO: We need to ensure that ensure_flush() does indeed flush till + * target_committed_seq_num, maybe convey it to ensure_flush(). 
+ */ + assert(flushing_seq_num >= target_committed_seq_num); + + /* + * ensure_flush()->sync_membufs() might have converted this inode to + * stable writes. + */ + if (!inode->is_stable_write()) { + /** + * Enqueue a commit target to be triggered once the flush completes. + */ + ctgtq.emplace(this, + 0 /* target flush_seq */, + target_committed_seq_num /* target commit_seq */, + task, + conditional_variable); + } else { + if (task) { + task->reply_write(task->rpc_api->write_task.get_size()); + } + + /* + * Flush_cache_and_wait() waiting for dirty_bytes to flushed, + * can't complete until all dirty bytes flushed. so add the + * flush target. + */ + if (commit_full) { + ensure_flush(0, 0, nullptr, conditional_variable); + } + } + + return; + } else { + /* + * No new data to flush for the current commit goal, just add a commit. + * target and we are done. + * Since FCSM is not running and we discovered that we have one or more + * bytes to be committed, get_commit_pending_bcs() MUST return those. + */ + AZLogDebug("[{}] [FCSM] not running, schedule a new commit, " + "flushed_seq_num: {}, " + "target_committed_seq_num: {}", + inode->get_fuse_ino(), + flushed_seq_num.load(), + target_committed_seq_num); + + uint64_t bytes; + std::vector bc_vec = + inode->get_filecache()->get_commit_pending_bcs(&bytes); + assert(!bc_vec.empty()); + assert(bytes > 0); + + // With FCSM not running, these should be same. + assert(committing_seq_num == committed_seq_num); + [[maybe_unused]] + const uint64_t prev_committing_seq_num = committing_seq_num; + inode->commit_membufs(bc_vec); + assert(is_running()); + + assert(committing_seq_num == (prev_committing_seq_num + bytes)); + assert(committing_seq_num > committed_seq_num); + + /* + * Enqueue this target, on_commit_callback() completes + * this target. Otherwise task/conditional_variable + * waiting for this to complete never called. + */ + ctgtq.emplace(this, + 0 /* target flush_seq */, + target_committed_seq_num /* target commit_seq */, + task, + conditional_variable); + } +} + +/** + * Must be called with flush_lock() held. + */ +void fcsm::ensure_flush(uint64_t write_off, + uint64_t write_len, + struct rpc_task *task, + std::atomic *conditional_variable) +{ + assert(inode->is_flushing); + /* + * If any of the flush/commit targets are waiting completion, state machine + * must be running. + */ + assert(is_running() || (ctgtq.empty() && ftgtq.empty())); + assert(is_running() || (flushed_seq_num == flushing_seq_num)); + assert(is_running() || (committed_seq_num == committing_seq_num)); + + AZLogDebug("[{}] [FCSM] ensure_flush<{}> write req [{}, {}], task: {}", + inode->get_fuse_ino(), + task ? "blocking" : "non-blocking", + write_off, write_off + write_len, + fmt::ptr(task)); // flushed_seq_num can never be more than flushing_seq_num. assert(flushed_seq_num <= flushing_seq_num); -#ifndef NDEBUG if (task) { // task provided must be a frontend write task. assert(task->magic == RPC_TASK_MAGIC); assert(task->get_op_type() == FUSE_WRITE); assert(task->rpc_api->write_task.is_fe()); assert(task->rpc_api->write_task.get_size() > 0); - // write_len and write_off must match that of the task. assert(task->rpc_api->write_task.get_size() == write_len); assert(task->rpc_api->write_task.get_offset() == (off_t) write_off); } -#endif - - /* - * Grab flush_lock to atomically get list of dirty chunks, which are not - * already being flushed. This also protects us racing with a truncate - * call and growing the file size after truncate shrinks the file. 
- */ - inode->flush_lock(); /* * What will be the flushed_seq_num value after all current dirty bytes are * flushed? That becomes our target flushed_seq_num. + * Since bytes_chunk_cache::{bytes_dirty,bytes_flushing} are not updated + * inside flush_lock, we can have race conditions where later values of + * target_flushed_seq_num may be less than what we have already queued in + * the latest flush target. In such case, just wait for the larger value. */ const uint64_t bytes_to_flush = inode->get_filecache()->get_bytes_to_flush(); - const uint64_t target_flushed_seq_num = flushing_seq_num + bytes_to_flush; + const uint64_t last_flush_seq = + !ftgtq.empty() ? ftgtq.front().flush_seq : 0; + const uint64_t target_flushed_seq_num = + std::max((flushing_seq_num + bytes_to_flush), last_flush_seq); /* * If the state machine is already running, we just need to add an @@ -171,40 +635,53 @@ void fcsm::ensure_flush(uint64_t flush_bytes, } } #endif + + /* + * If no task and no new flush target, don't add a dup target. + */ + if (!task && conditional_variable && + (target_flushed_seq_num == last_flush_seq)) { + return; + } + ftgtq.emplace(this, target_flushed_seq_num /* target flush_seq */, 0 /* commit_seq */, - task); - inode->flush_unlock(); + task, + conditional_variable); return; } /* * FCSM not running. */ - uint64_t bytes; - std::vector bc_vec = - inode->get_filecache()->get_dirty_nonflushing_bcs_range(0, UINT64_MAX, - &bytes); - assert(bc_vec.empty() == (bytes == 0)); - - /* - * FCSM not running and nothing to flush, complete the task and return. - * Note that moe dirty data can be added after we get the list of dirty - * and non-flushing bcs above, but they can be flushed at a later point. - */ - if (bytes == 0) { - inode->flush_unlock(); + assert(flushed_seq_num == flushing_seq_num); + assert(target_flushed_seq_num >= flushing_seq_num); + // No new data to flush. + if (target_flushed_seq_num == flushed_seq_num) { if (task) { - AZLogDebug("[{}] [FCSM] not running and nothing to flush, " - "completing fuse write [{}, {}) rightaway", - inode->get_fuse_ino(), write_off, write_off+write_len); - task->reply_write(write_len); + task->reply_write(task->rpc_api->write_task.get_size()); + } + + if (conditional_variable) { + *conditional_variable = true; } return; } + uint64_t bytes; + std::vector bc_vec; + + if (inode->is_stable_write()) { + bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range( + 0, UINT64_MAX, &bytes); + } else { + bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(&bytes); + } + assert(bc_vec.empty() == (bytes == 0)); + assert(bytes > 0); + /* * Kickstart the state machine. * Since we pass the 3rd arg to sync_membufs, it tells sync_membufs() @@ -216,22 +693,235 @@ void fcsm::ensure_flush(uint64_t flush_bytes, * writes complete before sync_membufs() can return. * DO NOT access rpc_task after sync_membufs() call. */ - AZLogDebug("[{}] [FCSM] kicking, flushing_seq_num now: {}", + AZLogDebug("[{}] [FCSM] kicking, flushing_seq_num now: {} " + "flushed_seq_num: {}", inode->get_fuse_ino(), - flushing_seq_num.load()); + flushing_seq_num.load(), + flushed_seq_num.load()); -#ifndef NDEBUG + [[maybe_unused]] const uint64_t flushing_seq_num_before = flushing_seq_num; -#endif assert(flushed_seq_num <= flushing_seq_num); - // sync_membufs() will update flushing_seq_num() and mark fcsm running. - inode->sync_membufs(bc_vec, false /* is_flush */, task); + /* + * sync_membufs() will update flushing_seq_num() and mark fcsm running. 
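+     * (Illustrative note: sync_membufs() dispatches the selected bcs as
+     * wsize sized WRITE RPCs, so flushing_seq_num is bumped by 'bytes'
+     * right here, while flushed_seq_num only catches up as the write
+     * callbacks complete.)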
+ * Task is not passed to sync_membufs, but enqueued to ftgtq. + */ + inode->sync_membufs(bc_vec, false /* is_flush */, nullptr); assert(is_running()); assert(flushing_seq_num == (flushing_seq_num_before + bytes)); assert(flushed_seq_num <= flushing_seq_num); + /* + * Enqueue this target, on_flush_complete() completes + * this target. Otherwise task/conditional_variable + * waiting for this to complete never called. + */ + ftgtq.emplace(this, + target_flushed_seq_num /* target flush_seq */, + 0 /* commit_seq */, + task, + conditional_variable); +} + +/** + * TODO: We MUST ensure that on_commit_complete() doesn't block else it'll + * block a libnfs thread which may stall further request processing + * which may cause deadlock. + */ +void fcsm::on_commit_complete(uint64_t commit_bytes) +{ + // Commit must be called only for unstable writes. + assert(!inode->is_stable_write()); + + // Must be called only for success. + assert(inode->get_write_error() == 0); + assert(commit_bytes > 0); + + // Must be called from flush/write callback. + assert(fc_cb_running()); + + // Commit callback can only be called if FCSM is running. + assert(is_running()); + + /* + * Commit callback can be called only when commit is in progress, clear + * it now. Must do it before grabbing the flush_lock, note that + * wait_for_ongoing_commit() is waiting for commit-in-progress to be + * cleared, with flush_lock held. + */ + assert(inode->is_commit_in_progress()); + inode->clear_commit_in_progress(); + + // If commit is running, flush cannot be running. + assert(!inode->get_filecache()->is_flushing_in_progress()); + + // commit_pending_bytes must be 0 here. + assert(inode->get_filecache()->get_bytes_to_commit() == 0); + + // a byte can only be committed after it's flushed successfully. + assert(committing_seq_num <= flushed_seq_num); + assert(committing_seq_num <= flushing_seq_num); + + // Update committed_seq_num to account for the commit_bytes. + committed_seq_num += commit_bytes; + + /* + * When a commit completes it commits everything that has been flushed + * till now also whatever has been scheduled for commit. + */ + assert(flushed_seq_num == committed_seq_num); + assert(committed_seq_num == committing_seq_num); + + AZLogDebug("[{}] [FCSM] on_commit_complete({}), Fd: {}, Fing: {}, " + "Cd: {}, Cing: {}, Fq: {}, Cq: {}, bytes_flushing: {}", + inode->get_fuse_ino(), + commit_bytes, + flushed_seq_num.load(), + flushing_seq_num.load(), + committed_seq_num.load(), + committing_seq_num.load(), + ftgtq.size(), + ctgtq.size(), + inode->get_filecache()->bytes_flushing.load()); + + inode->flush_lock(); + + /* + * This can only come here with stable write true when + * switch_to_stable_write() was waiting for ongoing commits to complete + * and it went ahead and set inode stable write after we cleared the + * commit_in_progress above. + */ + assert(!inode->is_stable_write() || ctgtq.empty()); + + /* + * Go over all queued commit targets to see if any can be completed after + * the latest commit completed. + */ + while (!ctgtq.empty()) { + struct fctgt& tgt = ctgtq.front(); + + assert(tgt.fcsm == this); + + /* + * ftgtq has commit targets in increasing order of committed_seq_num, so + * as soon as we find one that's greater than committed_seq_num, we can + * safely skip the rest. 
+ */ + if (tgt.commit_seq > committed_seq_num) { + break; + } + + if (tgt.task) { + assert(tgt.task->magic == RPC_TASK_MAGIC); + assert(tgt.task->get_op_type() == FUSE_WRITE); + assert(tgt.task->rpc_api->write_task.is_fe()); + assert(tgt.task->rpc_api->write_task.get_size() > 0); + + AZLogInfo("[{}] [FCSM] completing blocking commit target: {}, " + "committed_seq_num: {}, write task: [{}, {})", + inode->get_fuse_ino(), + tgt.commit_seq, + committed_seq_num.load(), + tgt.task->rpc_api->write_task.get_offset(), + tgt.task->rpc_api->write_task.get_offset() + + tgt.task->rpc_api->write_task.get_size()); + + tgt.task->reply_write( + tgt.task->rpc_api->write_task.get_size()); + } else if (tgt.conditional_variable) { + assert(*tgt.conditional_variable == false); + *tgt.conditional_variable = true; + } else { + AZLogDebug("[{}] [FCSM] completing non-blocking commit target: {}, " + "committed_seq_num: {}", + inode->get_fuse_ino(), + tgt.commit_seq, + committed_seq_num.load()); + } + + // Commit target accomplished, remove from queue. + ctgtq.pop(); + } + + /* + * See if we have more commit targets and issue flush for the same. + */ + if (!ftgtq.empty() || !ctgtq.empty()) { + /* + * If we have any commit target here it must have commit_seq greater + * than committed_seq_num, else it would have been completed by the + * above loop. + * If we have any flush target it must have flush_seq greater than + * flushed_seq_num. This is because commit would have started after + * the flush and we would have completed all eligible flush targets. + */ + assert(ftgtq.empty() || ftgtq.front().flush_seq > flushed_seq_num); + assert(ctgtq.empty() || ctgtq.front().commit_seq > committed_seq_num); + + uint64_t bytes; + std::vector bc_vec; + + /* + * This means we are here after switch_to_stable_write() switched to + * stable, we need to handle that. + */ + if (inode->is_stable_write()) { + assert(ctgtq.empty()); + bc_vec = + inode->get_filecache()->get_dirty_nonflushing_bcs_range( + 0, UINT64_MAX, &bytes); + // Here bc_vec can be empty, sync_membufs() can handle that. + } else { + bc_vec = + inode->get_filecache()->get_contiguous_dirty_bcs(&bytes); + /* + * Since we have a flush target asking more data to be flushed, we + * must have the corresponding bcs in the file cache. + * + * Note: We cannot have this assert for the stable write case, as + * sync_membufs() that called switch_to_stable_write(), might + * have consumed all these bcs and marked them flushing. When + * we come here we won't find any dirty-and-not-flushing bcs. + */ + assert(!bc_vec.empty()); + assert(bytes > 0); + } + + // flushed_seq_num can never be more than flushing_seq_num. + assert(flushed_seq_num <= flushing_seq_num); + + AZLogDebug("[{}] [FCSM] continuing, flushing_seq_num now: {}, " + "flushed_seq_num: {}", + inode->get_fuse_ino(), + flushing_seq_num.load(), + flushed_seq_num.load()); + + // sync_membufs() will update flushing_seq_num() and mark fcsm running. + [[maybe_unused]] + const uint64_t prev_flushing_seq_num = flushing_seq_num; + inode->sync_membufs(bc_vec, false /* is_flush */); + assert(flushing_seq_num == (prev_flushing_seq_num + bytes)); + } else { + AZLogDebug("[{}] [FCSM] idling, flushed_seq_num now: {}, " + "committed_seq_num: {}", + inode->get_fuse_ino(), + flushed_seq_num.load(), + committed_seq_num.load()); + + // FCSM should not idle when there's any ongoing flush or commit. 
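+        // E.g., after a single 8 KiB write that has been fully flushed and
+        // committed, all four seq counters read 8192 at this point
+        // (illustrative numbers).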
+ assert(flushing_seq_num == flushed_seq_num); + assert(committing_seq_num == committed_seq_num); + assert(flushed_seq_num == committed_seq_num); + + assert(!inode->get_filecache()->is_flushing_in_progress()); + assert(!inode->is_commit_in_progress()); + + clear_running(); + } + inode->flush_unlock(); } @@ -248,11 +938,27 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) // Must be called only for success. assert(inode->get_write_error() == 0); assert(flush_bytes > 0); + // Must be called from flush/write callback. assert(fc_cb_running()); + // See below why we cannot assert this. +#if 0 // Flush callback can only be called if FCSM is running. - assert(is_running()); + assert(is_running); +#endif + + /* + * Commit will only be run after current flush completes. + * Since we are inside flush completion callback, commit cannot be + * running yet. + */ + assert(!inode->is_commit_in_progress()); + + // a byte can only be committed after it's flushed successfully. + assert(committing_seq_num <= flushed_seq_num); + assert(committed_seq_num <= committing_seq_num); + assert(committing_seq_num <= flushing_seq_num); // Update flushed_seq_num to account for the newly flushed bytes. flushed_seq_num += flush_bytes; @@ -260,12 +966,17 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) // flushed_seq_num can never go more than flushing_seq_num. assert(flushed_seq_num <= flushing_seq_num); - AZLogDebug("[{}] [FCSM] on_flush_complete({}), flushed_seq_num now: {}, " - "flushing_in_progress: {}", + AZLogDebug("[{}] [FCSM] on_flush_complete({}), Fd: {}, Fing: {}, " + "Cd: {}, Cing: {}, Fq: {}, Cq: {}, bytes_flushing: {}", inode->get_fuse_ino(), flush_bytes, flushed_seq_num.load(), - inode->get_filecache()->is_flushing_in_progress()); + flushing_seq_num.load(), + committed_seq_num.load(), + committing_seq_num.load(), + ftgtq.size(), + ctgtq.size(), + inode->get_filecache()->bytes_flushing.load()); /* * If this is not the last completing flush (of the multiple parallel @@ -283,13 +994,27 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) * Multiple libnfs (callback) threads can find is_flushing_in_progress() * return false. The first one to get the flush_lock, gets to run the * queued flush targets which includes completing the waiting tasks and/or - * trigger pending flush/commit. + * trigger pending flush/commit. Other flush callback threads which get + * the lock after the first one, should simply return. They check for + * one of the following conditions to avoid duplicating work: + * 1. The first one didn't find anything to do, so it stopped the FSCM. + * 2. The first one triggered a flush target. + * 3. The first one triggered a commit target. */ - if (inode->get_filecache()->is_flushing_in_progress()) { + if (inode->get_filecache()->is_flushing_in_progress() || + inode->is_commit_in_progress() || + !is_running()) { + assert(is_running() || ftgtq.empty()); inode->flush_unlock(); return; } + /* + * Entire flush is done and no new flush can start, so flushed_seq_num must + * match flushing_seq_num. + */ + assert(flushed_seq_num == flushing_seq_num); + /* * Go over all queued flush targets to see if any can be completed after * the latest flush completed. 
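/*
 * (Illustrative sketch, not part of this patch.) The conditional_variable
 * added to fctgt lets a synchronous caller wait for a queued target without
 * a blocking rpc_task; on_flush_complete()/on_commit_complete() set it once
 * the target is met, roughly:
 *
 *     std::atomic<bool> flushed = false;
 *     inode->flush_lock();
 *     inode->get_fcsm()->ensure_flush(0, 0, nullptr, &flushed);
 *     inode->flush_unlock();
 *     while (!flushed)
 *         ::usleep(1000);   // set by on_flush_complete()
 */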
@@ -314,17 +1039,20 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) assert(tgt.task->rpc_api->write_task.is_fe()); assert(tgt.task->rpc_api->write_task.get_size() > 0); - AZLogDebug("[{}] [FCSM] completing blocking flush target: {}, " - "flushed_seq_num: {}, write task: [{}, {})", - inode->get_fuse_ino(), - tgt.flush_seq, - flushed_seq_num.load(), - tgt.task->rpc_api->write_task.get_offset(), - tgt.task->rpc_api->write_task.get_offset() + - tgt.task->rpc_api->write_task.get_size()); + AZLogInfo("[{}] [FCSM] completing blocking flush target: {}, " + "flushed_seq_num: {}, write task: [{}, {})", + inode->get_fuse_ino(), + tgt.flush_seq, + flushed_seq_num.load(), + tgt.task->rpc_api->write_task.get_offset(), + tgt.task->rpc_api->write_task.get_offset() + + tgt.task->rpc_api->write_task.get_size()); tgt.task->reply_write( tgt.task->rpc_api->write_task.get_size()); + } else if (tgt.conditional_variable) { + assert(*tgt.conditional_variable == false); + *tgt.conditional_variable = true; } else { AZLogDebug("[{}] [FCSM] completing non-blocking flush target: {}, " "flushed_seq_num: {}", @@ -338,22 +1066,69 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) } /* - * See if we have more flush targets and check if the next flush target - * has its flush issued. If yes, then we need to wait for this flush to - * complete and we will take stock of flush targets when that completes. + * We just completed a flush. See if we have some commit targets that we + * should trigger now. A commit target can only be triggered if we have + * flushed all bytes till the commit target. + * We check commit target before any other flush targets as committing + * helps us free memory. */ - if (!ftgtq.empty() && (ftgtq.front().flush_seq > flushing_seq_num)) { + if (!ctgtq.empty() && (flushed_seq_num >= ctgtq.front().commit_seq)) { + assert(!inode->is_stable_write()); + uint64_t bytes; std::vector bc_vec = - inode->get_filecache()->get_dirty_nonflushing_bcs_range( - 0, UINT64_MAX, &bytes); + inode->get_filecache()->get_commit_pending_bcs(&bytes); + assert(bc_vec.empty() == (bytes == 0)); + + /* + * Since we have a commit target asking more data to be committed, we + * must have the corresponding bcs in the file cache. + */ + assert(!bc_vec.empty()); + assert(bytes > 0); + + /* + * commit_membufs() must increase committing_seq_num exactly by bytes, + * as all the bcs in bc_vec should be committed. + */ + [[maybe_unused]] + const uint64_t prev_committing_seq_num = committing_seq_num; + inode->commit_membufs(bc_vec); + assert(committing_seq_num == (prev_committing_seq_num + bytes)); + + } else if ((!ftgtq.empty() && (ftgtq.front().flush_seq > flushing_seq_num)) || + (!ctgtq.empty() && (ctgtq.front().commit_seq > flushing_seq_num))) { + /* + * Nothing to commit, or what we want to commit has not yet flushed + * successfully. Do we want to flush more? We check two things: + * 1. Is there an explicit flush target which has not yet started + * flushing? + * 2. Is there an commit target which implies flushing? + * + * If the next flush or commit target has its flush issued, then we + * just have to wait for that flush to complete and then we will decide + * the next action, else issue it now. 
+ */ + uint64_t bytes; + std::vector bc_vec; + if (inode->is_stable_write()) { + bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range( + 0, UINT64_MAX, + &bytes); + } else { + bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(&bytes); + } /* * Since we have a flush target asking more data to be flushed, we must * have the corresponding bcs in the file cache. */ assert(!bc_vec.empty()); - assert(bytes >= (ftgtq.front().flush_seq - flushing_seq_num)); + // We should flush all the dirty data in the chunkmap. + const uint64_t next_goal = + std::max((ftgtq.empty() ? 0 : ftgtq.front().flush_seq), + (ctgtq.empty() ? 0 : ctgtq.front().commit_seq)); + assert(bytes >= (next_goal - flushing_seq_num)); // flushed_seq_num can never be more than flushing_seq_num. assert(flushed_seq_num <= flushing_seq_num); @@ -364,10 +1139,16 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) flushing_seq_num.load(), flushed_seq_num.load()); - // sync_membufs() will update flushing_seq_num() and mark fcsm running. + // sync_membufs() will update flushing_seq_num. + [[maybe_unused]] + const uint64_t prev_flushing_seq_num = flushing_seq_num; inode->sync_membufs(bc_vec, false /* is_flush */); - assert(flushing_seq_num >= bytes); - } else if (ftgtq.empty()) { + assert(flushing_seq_num == (prev_flushing_seq_num + bytes)); + } else if (ftgtq.empty() && ctgtq.empty()) { + /* + * No flush to issue, if we don't have any to wait for, then we can + * stop the state machine. + */ AZLogDebug("[{}] [FCSM] idling, flushing_seq_num now: {}, " "flushed_seq_num: {}", inode->get_fuse_ino(), @@ -375,13 +1156,20 @@ void fcsm::on_flush_complete(uint64_t flush_bytes) flushed_seq_num.load()); // FCSM should not idle when there's any ongoing flush. - assert(flushing_seq_num == flushed_seq_num); + assert(flushing_seq_num >= flushed_seq_num); /* - * No more flush targets, pause the state machine. - * TODO: We need to clear running when flush fails. + * TODO: Modify flush_cache_and_wait() to also use the FCSM for + * performing the flush. Then we have any flush or commit + * only peformed by the state machine. */ + assert(!inode->get_filecache()->is_flushing_in_progress()); + assert(!inode->is_commit_in_progress()); + clear_running(); + } else { + AZLogCrit("Should not reach here"); + assert(0); } inode->flush_unlock(); diff --git a/turbonfs/src/file_cache.cpp b/turbonfs/src/file_cache.cpp index 46040037..30c488c9 100644 --- a/turbonfs/src/file_cache.cpp +++ b/turbonfs/src/file_cache.cpp @@ -601,7 +601,8 @@ void membuf::clear_flushing() bcc->bytes_flushing_g -= length; AZLogDebug("Clear flushing membuf [{}, {}), fd={}", - offset.load(), offset.load()+length.load(), backing_file_fd); + offset.load(), offset.load()+length.load(), + backing_file_fd); } /** @@ -923,6 +924,14 @@ bytes_chunk::bytes_chunk(bytes_chunk_cache *_bcc, assert(get_buffer() != nullptr); } +/** + * Get the inode corresponding to this bc. + */ +struct nfs_inode *bytes_chunk::get_inode() const +{ + return bcc->get_inode(); +} + bytes_chunk_cache::bytes_chunk_cache(struct nfs_inode *_inode, const char *_backing_file_name) : inode(_inode), diff --git a/turbonfs/src/nfs_inode.cpp b/turbonfs/src/nfs_inode.cpp index d7468eff..312ed324 100644 --- a/turbonfs/src/nfs_inode.cpp +++ b/turbonfs/src/nfs_inode.cpp @@ -33,8 +33,8 @@ nfs_inode::nfs_inode(const struct nfs_fh3 *filehandle, assert(client->magic == NFS_CLIENT_MAGIC); assert(write_error == 0); - // TODO: Revert this to false once commit changes integrated. 
- assert(stable_write == true); + // We start doing unstable writes until proven o/w. + assert(stable_write == false); assert(commit_state == commit_state_t::COMMIT_NOT_NEEDED); #ifndef ENABLE_NON_AZURE_NFS @@ -380,6 +380,222 @@ int nfs_inode::get_actimeo_max() const } } +/* + * Caller should hold flush_lock(). + */ +void nfs_inode::wait_for_ongoing_commit() +{ + assert(is_flushing); + /* + * FCSM must be running, we only FCSM can complete commit that we are + * waiting for. + */ + assert(get_fcsm()->is_running()); + + /* + * TODO: See if we can eliminate inline sleep. + */ + if (is_commit_in_progress()) { + AZLogWarn("[{}] wait_for_ongoing_commit() will sleep inline!!", + get_fuse_ino()); + } + + int iter = 0; + while (is_commit_in_progress()) { + // Flush can commit are mutually exclusive operations. + assert(!get_filecache()->is_flushing_in_progress()); + + if (++iter % 1000 == 0) { + AZLogWarn("[{}] wait_for_ongoing_commit() still waiting, iter: {}", + get_fuse_ino(), iter); + } + ::usleep(1000); + } + + assert(!is_commit_in_progress()); +} + +/* + * This function is called with flush_lock() held. + * This should be called whenever we figure out that we cannot proceed with + * unstable writes (most common reason being, next write is not an append + * write). Once this function returns, following is guaranteed: + * - There will be no flushes in progress. + * - There will be no commit_pending data and no commit inprogress. + * - inode->stable_write will be set to true. + */ +void nfs_inode::switch_to_stable_write() +{ + assert(is_flushing); + assert(!is_stable_write()); + + AZLogInfo("[{}] Switching to stable write", ino); + + /* + * switch_to_stable_write() is called from places where we are about to + * start a flush operation. Before that we check to see if we need to + * change to stable write. Since we are not flushing yet and since we + * do not support multiple ongoing flushes, we are guaranteed that no + * flush should be in progress when we reach here. + * Similarly commit should not be in progress as flush and commit are + * mutually exclusive. + */ + assert(!is_commit_in_progress()); + assert(!get_filecache()->is_flushing_in_progress()); + + /* + * Check if there is anything to commit, if not then simply update the + * inode state to "doing stable writes". + */ + if (get_filecache()->get_bytes_to_commit() == 0) { + AZLogDebug("[{}] Nothing to commit, switching to stable write", ino); + + set_stable_write(); + + /* + * Now we moved to stable write, cleanup the commit target queue. + */ + get_fcsm()->ctgtq_cleanup(); + return; + } + + /* + * There is some commit_pending data that we need to commit before we can + * make the switch to stable writes. + */ + + uint64_t bytes; + std::vector bc_vec = + get_filecache()->get_commit_pending_bcs(&bytes); + assert(bc_vec.empty() == (bytes == 0)); + assert(bytes > 0); + + /* + * Since switch_to_stable_write() can be called from libnfs threads + * too, we have a risk that it may block the only libnfs thread and + * everything will stall. To avoid this, let's not try to commit, but + * instead make all the commit_pending data back to dirty. + */ + for (bytes_chunk& bc : bc_vec) { + [[maybe_unused]] struct membuf *mb = bc.get_membuf(); + assert(mb != nullptr); + assert(mb->is_inuse()); + assert(mb->is_locked()); + assert(mb->is_uptodate()); + assert(mb->is_commit_pending()); + assert(!mb->is_dirty()); + + /* + * Clear the commit_pending bit, now these are rewritten + * as stable writes. 
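+         * (These membufs go back to dirty below, so the next sync_membufs()
+         * pass re-flushes them as ordinary stable WRITEs.)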
+ */ + mb->clear_commit_pending(); + mb->set_dirty(); + mb->clear_locked(); + mb->clear_inuse(); + } + + assert(get_filecache()->get_bytes_to_commit() == 0); + assert(!get_filecache()->is_flushing_in_progress()); + assert(!is_commit_in_progress()); + + set_stable_write(); + + putblock_filesize = 0; + + /* + * Now we moved to stable write, cleanup the commit target queue. + */ + get_fcsm()->ctgtq_cleanup(); + return; +} + +/* + * This function checks whether we need to switch to stable write or not. + */ +bool nfs_inode::check_stable_write_required(off_t offset) +{ + // Caller must hold the flush_lock. + assert(is_flushing); + assert(offset <= (off_t) AZNFSC_MAX_FILE_SIZE); + + /* + * If stable_write is already set, we don't need to do anything. + * We don't need lock here as once stable_write is set it's never + * unset. + */ + if (is_stable_write()) { + return false; + } + + /* + * If current write is not append write, then we can't go for unstable writes + * It may be overwrite to existing data and we don't have the knowldege of + * existing block list, it maye require read modified write. So, we can't go + * for unstable write. Similarly, if the offset is more than end of the file, + * we need to write zero block in between the current end of the file and the + * offset. + */ + if (putblock_filesize != offset) { + AZLogInfo("[{}] Non-append write detected (expected: {}, got: {}), " + "will switch to stable writes", + get_fuse_ino(), putblock_filesize, offset); + + return true; + } + + return false; +} + +/** + * commit_membufs() is called by writer thread to commit flushed membufs. + * It's always issued under flush_lock(). + */ +void nfs_inode::commit_membufs(std::vector& bc_vec) +{ + assert(is_flushing); + + /* + * Commit is only called by FCSM, either callback of completion + * or inline as part of ensure_commit. + */ + assert(get_fcsm()->is_running() || !get_fcsm()->fc_cb_count()); + + set_commit_in_progress(); + + uint64_t prev_offset = 0; + for (bytes_chunk& bc : bc_vec) { + [[maybe_unused]] struct membuf *mb = bc.get_membuf(); + assert(mb != nullptr); + assert(mb->is_inuse()); + assert(mb->is_commit_pending()); + + if (prev_offset == 0) { + prev_offset = bc.offset + bc.length; + } else { + // Caller must pass us contiguous membufs for committing. + assert(prev_offset == bc.offset); + prev_offset += bc.length; + } + + get_fcsm()->add_committing(bc.length); + get_fcsm()->mark_running(); + } + + /* + * Create the commit task to carry out the write. + */ + struct rpc_task *commit_task = + get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_FLUSH); + // XXX Do we need to ever call flush with fuse_req? + commit_task->init_flush(nullptr /* fuse_req */, ino); + assert(commit_task->rpc_api->pvt == nullptr); + + commit_task->rpc_api->pvt = static_cast(new std::vector(bc_vec)); + + commit_task->issue_commit_rpc(); +} + /** * Note: We dispatch WRITE RPCs as we gather full wsize sized data bytes, * while there may be more bcs that we have not yet processed. This means @@ -394,6 +610,20 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, // Caller must hold the flush_lock. assert(is_flushing); + if (!is_stable_write()) { + /* + * We do not allow a new flush while there's an ongoing one, in case + * of unstable writes. + */ + assert(!get_filecache()->is_flushing_in_progress()); + } + + /* + * Stable won't have commit and for unstable we cannot flush while + * commit is going on. 
+ */ + assert(!is_commit_in_progress()); + if (bc_vec.empty()) { return; } @@ -422,6 +652,14 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, parent_task->num_ongoing_backend_writes = 1; } + /* + * If the new data being written is not right after the last one written + * we need to switch to stable write. + */ + if (check_stable_write_required(bc_vec[0].offset)) { + switch_to_stable_write(); + } + /* * Create the flush task to carry out the write. */ @@ -559,11 +797,21 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, get_fcsm()->mark_running(); } + /* + * XXX Add an assert that unstable writes should only have contiguous + * bcs . + */ + /* * Add as many bytes_chunk to the write_task as it allows. * Once packed completely, then dispatch the write. */ if (write_task->add_bc(bc)) { + if (!is_stable_write()) { + putblock_filesize += bc.length; + } else { + assert(putblock_filesize == 0); + } continue; } else { /* @@ -589,6 +837,12 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, // Single bc addition should not fail. [[maybe_unused]] bool res = write_task->add_bc(bc); assert(res == true); + + if (!is_stable_write()) { + putblock_filesize += bc.length; + } else { + assert(putblock_filesize == 0); + } } } @@ -818,10 +1072,8 @@ int nfs_inode::copy_to_cache(const struct fuse_bufvec* bufv, * It will release the flush_lock if it has to wait for flush to * complete. Before returning it'll re-acquire the flush_lock. */ -int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) +int nfs_inode::wait_for_ongoing_flush() { - assert(start_off < end_off); - // Caller must call us with flush_lock held. assert(is_flushing); @@ -845,9 +1097,26 @@ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) * Flushing not in progress and no new flushing can be started as we hold * the flush_lock(), and callback drained. */ - if (!get_filecache()->is_flushing_in_progress() && - !get_fcsm()->fc_cb_running()) { - AZLogDebug("[{}] No flush in progress, returning", ino); + /* + * Stable writes do not need commit, so no commit inprogress and no pending + * commit data. + */ + if (is_stable_write()) { + assert(!is_commit_in_progress()); + assert(get_filecache()->bytes_commit_pending == 0); + } + + if (get_filecache()->is_flushing_in_progress()) { + assert(!is_commit_in_progress()); + } else if (!is_commit_in_progress() && + !get_fcsm()->fc_cb_running() && + (get_filecache()->bytes_commit_pending == 0)) { + /* + * Flushing not in progress and no new flushing can be started as we hold + * the flush_lock(), and callback drained. + * No commit inprogress and no pending commit data, return. + */ + AZLogDebug("[{}] No flush or commit in progress, returning", ino); return 0; } @@ -873,7 +1142,7 @@ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) * new dirty bytes_chunks created but we don't want to wait for those. */ std::vector bc_vec = - filecache_handle->get_flushing_bc_range(start_off, end_off); + filecache_handle->get_flushing_bc_range(); /* * Nothing to flush and callback drained, job done! @@ -979,14 +1248,73 @@ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) assert(is_flushing); assert(!get_fcsm()->fc_cb_running()); - return err; + /* + * Unstable write case, we need to wait for the commit to complete. 
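+     * If there is commit-pending data but no commit has been issued yet,
+     * issue it ourselves (we hold the flush_lock), then wait for it to
+     * complete.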
+     */
+    if (get_filecache()->get_bytes_to_commit() > 0) {
+        assert(!is_stable_write());
+
+        if (!is_commit_in_progress()) {
+            uint64_t bytes = 0;
+            std::vector<bytes_chunk> bc_vec =
+                get_filecache()->get_commit_pending_bcs(&bytes);
+            assert(bc_vec.empty() == (bytes == 0));
+            assert(bytes > 0);
+
+            /*
+             * Issue the commit RPC to commit the pending data.
+             */
+            commit_membufs(bc_vec);
+        }
+    }
+
+    if (is_commit_in_progress()) {
+        wait_for_ongoing_commit();
+    }
+
+    assert(!is_commit_in_progress());
+    assert(!get_filecache()->is_flushing_in_progress());
+
+    return get_write_error();
 }
 
 /**
- * Note: This takes shared lock on ilock_1.
+ * flush_cache_and_wait() is called only from the release/flush path.
+ * It's called with flush_lock() held.
+ * Things it does:
+ * - Before flushing it waits for any ongoing commit to complete.
+ * - First it tries to get membufs through get_contiguous_dirty_bcs() or
+ *   get_dirty_bc_range(), depending on whether we are doing unstable or
+ *   stable writes.
+ * - Issues a flush for the dirty membufs and waits for the flush to complete.
+ * - If it's an unstable write, it issues a commit for the commit-pending
+ *   membufs.
+ * - Waits for the commit to complete.
+ * - Returns the error code, if any.
+ *
+ * Note: flush_cache_and_wait() blocks the fuse thread till the flush
+ *       completes. It's called from the release(), flush() and getattr()
+ *       calls. That's ok for now as it's not called very often. We can
+ *       optimize it to complete the flush in the background and return
+ *       immediately. For that we need to add special handling for the
+ *       getattr() call.
  */
-int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off)
+int nfs_inode::flush_cache_and_wait()
 {
+    /*
+     * If flush() is called w/o open(), there won't be any cache, skip.
+     */
+    if (!has_filecache()) {
+        return 0;
+    }
+
+    /*
+     * TODO: Let's use ensure_flush()/ensure_commit() here to perform the
+     *       flush+commit using the FCSM. It can convey a pointer to an
+     *       atomic boolean which the callback can set and which we can
+     *       wait on here.
+     */
+    if (is_stable_write()) {
+        assert(get_filecache()->bytes_commit_pending == 0);
+        assert(!is_commit_in_progress());
+    }
+
     /*
      * MUST be called only for regular files.
      * Leave the assert to catch if fuse ever calls flush() on non-reg files.
@@ -1009,89 +1337,44 @@ int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off)
     }
 
     /*
-     * If flush() is called w/o open(), there won't be any cache, skip.
-     */
-    if (!has_filecache()) {
-        return 0;
-    }
-
-    /*
-     * Grab the inode flush_lock to ensure that we don't initiate any new flush
-     * operation while some truncate call is in progress (which must have taken
-     * the flush_lock).
-     * Once flush_lock() returns we have the flush_lock and we are
+     * Grab the flush_lock to ensure that we don't initiate any new flushes
+     * while some truncate call is in progress (which must have taken the
+     * flush_lock). Once flush_lock() returns we have the lock and we are
      * guaranteed that no new truncate operation can start till we release
      * the flush_lock. We can safely start the flush then.
      */
     flush_lock();
 
-    /*
-     * Get the dirty bytes_chunk from the filecache handle.
-     * This will grab an exclusive lock on the file cache and return the list
-     * of dirty bytes_chunks at that point. Note that we can have new dirty
-     * bytes_chunks created but we don't want to wait for those.
-     */
-    std::vector<bytes_chunk> bc_vec =
-        filecache_handle->get_dirty_bc_range(start_off, end_off);
     /*
-     * sync_membufs() iterate over the bc_vec and starts flushing the dirty
-     * membufs.
It batches the contiguous dirty membufs and issues a single - * write RPC for them. + * Wait for ongoing flush/commit to complete, so that + * ensure_flush() able to get right dirty_bytes, as + * bytes_flushing = 0, and nobody changes those values. */ - sync_membufs(bc_vec, true); - - flush_unlock(); + wait_for_ongoing_flush(); + std::atomic_bool complete = false; /* - * Our caller expects us to return only after the flush completes. - * Wait for all the membufs to flush and get result back. + * Set the flush target for both stable/unstable write. + * ensure_commit() doesn't set target for whole unflushed + * bytes. */ - for (bytes_chunk &bc : bc_vec) { - struct membuf *mb = bc.get_membuf(); - - assert(mb != nullptr); - assert(mb->is_inuse()); - mb->set_locked(); - - /* - * If still dirty after we get the lock, it may mean two things: - * - Write failed. - * - Some other thread got the lock before us and it made the - * membuf dirty again. - */ - if (mb->is_dirty() && get_write_error()) { - AZLogError("[{}] Flush [{}, {}) failed with error: {}", - ino, - bc.offset, bc.offset + bc.length, - get_write_error()); - } - - mb->clear_locked(); - mb->clear_inuse(); - - /* - * Release the bytes_chunk back to the filecache. - * These bytes_chunks are not needed anymore as the flush is done. - * - * Note: We come here for bytes_chunks which were found dirty by the - * above loop. These writes may or may not have been issued by - * us (if not issued by us it was because some other thread, - * mostly the writer issued the write so we found it flushing - * and hence didn't issue). In any case since we have an inuse - * count, release() called from write_callback() would not have - * released it, so we need to release it now. - */ - filecache_handle->release(bc.offset, bc.length); + if (is_stable_write()) { + get_fcsm()->ensure_flush(0, 0, nullptr, &complete); + } else { + get_fcsm()->ensure_commit(0, 0, nullptr, &complete, true); } + flush_unlock(); - /* - * If the file is deleted while we still have data in the cache, don't - * treat it as a failure to flush. The file has gone and we don't really - * care about the unwritten data. - */ - if (get_write_error() == ENOENT || get_write_error() == ESTALE) { - return 0; + // Wait for flush to complete. + int iter = 0; + while (complete != true) { + if (++iter % 1000 == 0) { + AZLogWarn("[{}] flush_cache_and_wait() waiting for ongoing" + " flush to complete still waiting, iter: {}", + get_fuse_ino(), iter); + } + ::usleep(1000); } return get_write_error(); @@ -1152,6 +1435,11 @@ void nfs_inode::truncate_end(size_t size) const [[maybe_unused]] const uint64_t bytes_truncated = filecache_handle->truncate(size, true /* post */); + /* + * Update the in cache putblock_filesize to reflect the new size. + */ + putblock_filesize = size; + AZLogDebug("[{}] Filecache truncated to size={} " "(bytes truncated: {})", ino, size, bytes_truncated); @@ -1207,7 +1495,7 @@ bool nfs_inode::truncate_start(size_t size) */ flush_lock(); - wait_for_ongoing_flush(0, UINT64_MAX); + wait_for_ongoing_flush(); AZLogDebug("[{}] Ongoing flush operations completed", ino); diff --git a/turbonfs/src/rpc_task.cpp b/turbonfs/src/rpc_task.cpp index 8632e631..3836d20e 100644 --- a/turbonfs/src/rpc_task.cpp +++ b/turbonfs/src/rpc_task.cpp @@ -828,6 +828,268 @@ void access_callback( } } +static void commit_callback( + struct rpc_context *rpc, + int rpc_status, + void *data, + void *private_data) +{ + rpc_task *task = (rpc_task*) private_data; + + /* + * Commit is issued as a FUSE_FLUSH. 
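+     * (commit_membufs() allocates the commit task with
+     * alloc_rpc_task(FUSE_FLUSH) and issue_commit_rpc() then sends the
+     * COMMIT3 RPC.)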
+ * TODO: Maybe it should have a type of its own. + * + * Note that flush/write is issued as FUSE_WRITE. + */ + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_FLUSH); + // rpc_api->pvt must contain the list of bcs which were committed. + assert(task->rpc_api->pvt != nullptr); + // Commit is never called for a fuse request. + assert(task->get_fuse_req() == nullptr); + + auto res = (COMMIT3res*) data; + + INJECT_JUKEBOX(res, task); + + const fuse_ino_t ino = task->rpc_api->flush_task.get_ino(); + struct nfs_inode *inode = + task->get_client()->get_nfs_inode_from_ino(ino); + // List of bcs committed by this commit call. + auto bc_vec_ptr = (std::vector *) task->rpc_api->pvt; + + // We should call commit only when inode is doing unstable+commit. + assert(!inode->is_stable_write()); + // Inode must be marked "commit in progress". + assert(inode->is_commit_in_progress()); + // Commit and flush/write are exclusive. + assert(!inode->get_filecache()->is_flushing_in_progress()); + // bytes_commit_pending will be reduced later below. + assert(inode->get_filecache()->get_bytes_to_commit() > 0); + + const int status = task->status(rpc_status, NFS_STATUS(res)); + UPDATE_INODE_WCC(inode, res->COMMIT3res_u.resok.file_wcc); + + // Set "in commit callback". + FC_CB_TRACKER fccbt(inode); + + AZLogDebug("[{}] commit_callback, number of bc committed: {}", + ino, bc_vec_ptr->size()); + + uint64_t commit_bytes = 0; + /* + * Now that the request has completed, we can query libnfs for the + * dispatch time. + */ + task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUS(res)); + + if (status == 0) { + uint64_t offset = 0; + uint64_t length = 0; + + /* + * Go over all the successfully committed bcs and release them from + * file cache (note that successful commit confirms that server has + * persisted the data and client can fee it). + * Also complete any tasks waiting for these membufs to be committed. + */ + for (auto& bc : *bc_vec_ptr) { + struct membuf *mb = bc.get_membuf(); + assert(mb->is_inuse()); + assert(mb->is_locked()); + assert(mb->is_commit_pending()); + assert(mb->is_uptodate()); + // Dirty membufs must not be committed. + assert(!mb->is_dirty()); + + // We should not have a zero length bc. + assert(bc.length > 0); + assert(bc.length <= AZNFSC_MAX_CHUNK_SIZE); + assert((bc.offset + bc.length) <= AZNFSC_MAX_FILE_SIZE); + + mb->clear_commit_pending(); + mb->clear_locked(); + mb->clear_inuse(); + + /** + * Release commited data from file cache, one contiguous range at + * a time. + */ + if (offset == 0 && length == 0) { + offset = bc.offset; + length = bc.length; + } else if (offset + length == bc.offset) { + length += bc.length; + assert(length <= AZNFSC_MAX_FILE_SIZE); + } else { + commit_bytes += length; + const uint64_t released = + inode->get_filecache()->release(offset, length); + AZLogInfo("[{}] commit_callback releasing bc [{}, {}), " + "released {} bytes", + ino, offset, offset+length, released); + offset = bc.offset; + length = bc.length; + } + } + + // Release the last bc not released by the above loop. + if (length != 0) { + commit_bytes += length; + const uint64_t released = + inode->get_filecache()->release(offset, length); + AZLogInfo("[{}] commit_callback releasing bc [{}, {}), " + "released {} bytes", + ino, offset, offset+length, released); + } + } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) { + task->get_client()->jukebox_retry(task); + return; + } else { + /* + * Commit has failed. 
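+         * (Either the RPC itself failed or the server returned a non-JUKEBOX
+         * NFS error; the JUKEBOX case was already retried above.)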
+         * Go over all the bcs that this commit was targeting, mark the
+         * membufs back as dirty and clear the commit_pending flag.
+         * The next write will initiate the flush again, as a stable write.
+         */
+        for (auto& bc : *bc_vec_ptr) {
+            struct membuf *mb = bc.get_membuf();
+            assert(mb->is_inuse());
+            assert(mb->is_locked());
+            assert(mb->is_commit_pending());
+            assert(mb->is_uptodate());
+            /*
+             * TODO: What happens if application writes over these
+             *       commit_pending bcs?
+             */
+            assert(!mb->is_dirty());
+
+            mb->clear_commit_pending();
+            mb->set_dirty();
+            mb->clear_locked();
+            mb->clear_inuse();
+        }
+
+        /*
+         * Set the inode to stable write, so that the next write will initiate
+         * the flush again as a stable write.
+         * There should be no flush in progress at this moment, and since
+         * we always commit *all* the commit pending bcs, there should not
+         * be any more commit pending bcs, so we can safely just enable
+         * stable writes for the inode w/o the elaborate
+         * switch_to_stable_write().
+         */
+        assert(inode->get_filecache()->is_flushing_in_progress() == false);
+        inode->set_stable_write();
+    }
+
+#ifdef ENABLE_PARANOID
+    /*
+     * We always pick *all* commit-pending bcs for committing, and while commit
+     * is going on no other flush can run, hence new commit pending bcs cannot
+     * be added, so when an ongoing commit completes we must not have any
+     * commit pending bcs in the cache.
+     */
+    {
+        std::vector<bytes_chunk> bc_vec =
+            inode->get_filecache()->get_commit_pending_bcs();
+        assert(bc_vec.empty());
+    }
+#endif
+
+    if (status == 0) {
+        assert(commit_bytes > 0);
+        /*
+         * Update the commit bytes in the inode.
+         * This will also clear the commit-in-progress flag in the inode as
+         * the very first thing.
+         */
+        inode->get_fcsm()->on_commit_complete(commit_bytes);
+    } else {
+        // TODO: Add fcsm::on_commit_fail() and call it from here.
+        // Clear the commit in progress flag.
+        inode->clear_commit_in_progress();
+        assert(0);
+    }
+
+    delete bc_vec_ptr;
+
+    task->rpc_api->pvt = nullptr;
+    task->free_rpc_task();
+}
+
+void rpc_task::issue_commit_rpc()
+{
+    // Must only be called for a flush task.
+    assert(get_op_type() == FUSE_FLUSH);
+    // Commit is never called for a fuse request.
+    assert(get_fuse_req() == nullptr);
+
+    const fuse_ino_t ino = rpc_api->flush_task.get_ino();
+    struct nfs_inode *inode = get_client()->get_nfs_inode_from_ino(ino);
+
+    // Commit must not be called if we are doing stable writes.
+    assert(!inode->is_stable_write());
+    // Commit must not be sent with ongoing flushes.
+    assert(!inode->get_filecache()->is_flushing_in_progress());
+    // Caller must have marked commit in progress.
+    assert(inode->is_commit_in_progress());
+
+    // Must be called by FCSM.
+    assert(inode->get_fcsm()->is_running());
+
+    // List of bcs to be committed by this commit call.
+    assert(rpc_api->pvt != nullptr);
+    auto bc_vec_ptr = (std::vector<bytes_chunk> *) rpc_api->pvt;
+
+    assert(bc_vec_ptr->size() > 0);
+    AZLogDebug("[{}] issue_commit_rpc, num bc: {}", ino, bc_vec_ptr->size());
+
+#ifdef ENABLE_PARANOID
+    // Make sure bcs being committed have correct state.
+    for (auto& bc : *bc_vec_ptr) {
+        struct membuf *mb = bc.get_membuf();
+        assert(mb->is_inuse());
+        assert(mb->is_locked());
+        assert(mb->is_commit_pending());
+        assert(mb->is_uptodate());
+        // Dirty membufs must not be committed.
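+        // (COMMIT only persists data that has already been flushed to the
+        // server; a dirty membuf still needs a flush, so it must not be
+        // commit_pending here.)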
+ assert(!mb->is_dirty()); + } +#endif + + COMMIT3args args; + ::memset(&args, 0, sizeof(args)); + bool rpc_retry = false; + + args.file = inode->get_fh(); + args.offset = 0; + args.count = 0; + + do { + rpc_retry = false; + stats.on_rpc_issue(); + + if (rpc_nfs3_commit_task(get_rpc_ctx(), + commit_callback, &args, this) == NULL) { + stats.on_rpc_cancel(); + /* + * Most common reason for this is memory allocation failure, + * hence wait for some time before retrying. Also block the + * current thread as we really want to slow down things. + * + * TODO: For soft mount should we fail this? + */ + rpc_retry = true; + + AZLogWarn("rpc_nfs3_write_task failed to issue, retrying " + "after 5 secs!"); + ::sleep(5); + } + } while (rpc_retry); +} + /** * Must be called when bytes_completed bytes are successfully read/written. */ @@ -853,6 +1115,9 @@ void bc_iovec::on_io_complete(uint64_t bytes_completed, bool is_unstable_write) assert(!bcq.empty()); struct bytes_chunk& bc = bcq.front(); + // Caller must not send incorrect value for unstable write. + assert(bc.get_inode()->is_stable_write() == !is_unstable_write); + /* * Absolute offset and length represented by this bytes_chunk. * bc->pvt is adjusted as partial reads/writes complete part of @@ -1085,7 +1350,8 @@ static void write_iov_callback( bciov->orig_offset + bciov->orig_length); // Update bciov after the current write. - bciov->on_io_complete(res->WRITE3res_u.resok.count); + bciov->on_io_complete(res->WRITE3res_u.resok.count, + !inode->is_stable_write()); // Create a new write_task for the remaining bc_iovec. struct rpc_task *write_task = @@ -1122,7 +1388,8 @@ static void write_iov_callback( return; } else { // Complete bc_iovec IO completed. - bciov->on_io_complete(res->WRITE3res_u.resok.count); + bciov->on_io_complete(res->WRITE3res_u.resok.count, + !inode->is_stable_write()); // Complete data writen to blob. AZLogDebug("[{}] <{}> Completed write, off: {}, len: {}", @@ -1205,6 +1472,9 @@ static void write_iov_callback( */ if (inode->get_write_error() == 0) { inode->get_fcsm()->on_flush_complete(bciov->orig_length); + } else { + // TODO: Add fcsm::on_flush_fail() and call it from here. + assert(0); } delete bciov; @@ -1278,7 +1548,16 @@ void rpc_task::issue_write_rpc() args.file = inode->get_fh(); args.offset = offset; args.count = length; - args.stable = FILE_SYNC; + args.stable = inode->is_stable_write() ? FILE_SYNC : UNSTABLE; + + /* + * Unstable writes want to make use of multiple connections for better perf. + * Note that they aren't affected by the optimistic concurrency backoff + * issues as seen by stable writes. + */ + if (!inode->is_stable_write()) { + set_csched(CONN_SCHED_RR); + } do { rpc_retry = false; @@ -2339,7 +2618,6 @@ void rpc_task::run_write() const size_t length = rpc_api->write_task.get_size(); struct fuse_bufvec *const bufv = rpc_api->write_task.get_buffer_vector(); const off_t offset = rpc_api->write_task.get_offset(); - const bool sparse_write = (offset > inode->get_file_size(true)); uint64_t extent_left = 0; uint64_t extent_right = 0; @@ -2430,139 +2708,11 @@ void rpc_task::run_write() assert(extent_right >= (extent_left + length)); /* - * We have successfully copied the user data into the cache. - * We can have the following cases: - * 1. This new write caused a contiguous extent to be greater than - * max_dirty_extent_bytes(), aka MDEB. 
- * In this case we begin flush of this contiguous extent (in multiple - * parallel wsize sized blocks) since there's no benefit in waiting more - * as the data is sufficient for the server scheduler to effectively - * write, in optimal sized blocks. - * We complete the application write rightaway without waiting for the - * flush to complete as we are not under memory pressure. - * This will happen when user is sequentially writing to the file. - * 2. A single contiguous extent is not greater than MDEB but total dirty - * bytes waiting to be flushed is more than MDEB. - * In this case we begin flush of the entire dirty data from the cache - * as we have sufficient data for the server scheduler to perform - * batched writes effectively. - * We complete the application write rightaway without waiting for the - * flush to complete as we are not under memory pressure. - * This will happen when user is writing to the file in a random or - * pseudo-sequential fashion. - * 3. Cache has dirty data beyond the "inline write" threshold, ref - * do_inline_write(). - * In this case we begin flush of the entire dirty data. - * This is a memory pressure situation and hence we do not complete the - * application write till all the backend writes complete. - * This will happen when user is writing faster than our backend write - * throughput, eventually dirty data will grow beyond the "inline write" - * threshold and then we have to slow down the writers by delaying - * completion. - * - * Other than this we have a special case of "write beyond eof" (termed - * sparse write). In sparse write case also we perform "inline write" of - * all the dirty data. This is needed for correct read behaviour. Imagine - * a reader reading from the sparse part of the file which is not yet in - * the bytes_chunk_cache. This read will be issued to the server and since - * server doesn't know the updated file size (as the write may still be - * sitting in our bytes_chunk_cache) it will return eof. This is not correct - * as such reads issued after successful write, are valid and should return - * 0 bytes for the sparse range. - */ - - /* - * Do we need to perform "inline write"? - * Inline write implies, we flush all the dirty data and wait for all the - * corresponding backend writes to complete. - */ - const bool need_inline_write = - (sparse_write || inode->get_filecache()->do_inline_write()); - - if (need_inline_write) { - INC_GBL_STATS(inline_writes, 1); - - AZLogDebug("[{}] Inline write (sparse={}), {} bytes, extent @ [{}, {})", - ino, sparse_write, (extent_right - extent_left), - extent_left, extent_right); - - /* - * ensure_flush() will arrange to flush all the dirty data and complete - * the write task when the flush completes. - */ - inode->get_fcsm()->ensure_flush(0, offset, length, this); - - // Free up the fuse thread without completing the application write. - return; - } - - /* - * Ok, we don't need to do inline writes. See if we have enough dirty - * data and we need to start async flush. - */ - - /* - * If the extent size exceeds the max allowed dirty size as returned by - * max_dirty_extent_bytes(), then it's time to flush the extent. - * Note that this will cause sequential writes to be flushed at just the - * right intervals to optimize fewer write calls and also allowing the - * server scheduler to merge better. - * See bytes_to_flush for how random writes are flushed. - * - * Note: max_dirty_extent is static as it doesn't change after it's - * queried for the first time. 
- */ - static const uint64_t max_dirty_extent = - inode->get_filecache()->max_dirty_extent_bytes(); - assert(max_dirty_extent > 0); - - /* - * How many bytes in the cache need to be flushed. These are dirty chunks - * which have not started flushing yet. + * copy_to_cache() has added some more dirty data to the file cache, + * let FCSM know about it. It may want to flush and/or commit if we are + * above some configured limit. */ - const uint64_t bytes_to_flush = - inode->get_filecache()->get_bytes_to_flush(); - - AZLogDebug("[{}] extent_left: {}, extent_right: {}, size: {}, " - "bytes_to_flush: {} (max_dirty_extent: {})", - ino, extent_left, extent_right, - (extent_right - extent_left), - bytes_to_flush, - max_dirty_extent); - - if ((extent_right - extent_left) < max_dirty_extent) { - /* - * Current extent is not large enough to be flushed, see if we have - * enough total dirty data that needs to be flushed. This is to cause - * random writes to be periodically flushed. - */ - if (bytes_to_flush < max_dirty_extent) { - /* - * No memory pressure. - */ - AZLogDebug("Reply write without syncing to Blob"); - reply_write(length); - return; - } - - /* - * This is the case of non-sequential writes causing enough dirty - * data to be accumulated, need to flush all of that. - */ - extent_left = 0; - extent_right = UINT64_MAX; - } - - /* - * Queue a non-blocking flush target for flushing all the dirty data. - */ - inode->get_fcsm()->ensure_flush(0, offset, length, nullptr); - - /* - * Complete the write request without waiting for the backend flush to - * complete. - */ - reply_write(length); + inode->get_fcsm()->run(this, extent_left, extent_right); } void rpc_task::run_flush() @@ -2575,6 +2725,7 @@ void rpc_task::run_flush() * to wait. Waiting in libnfs thread context will cause deadlocks. */ assert(get_fuse_req() != nullptr); + reply_error(inode->flush_cache_and_wait()); }
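For illustration, here is a minimal standalone sketch (not part of the patch) of the contiguous-range coalescing that commit_callback() performs before calling release(): committed chunks are walked in offset order and adjacent ones are merged, so release() is issued once per contiguous run rather than once per bc. The range/coalesce names are hypothetical, introduced only for this sketch.

    #include <cassert>
    #include <cstdint>
    #include <vector>

    // Hypothetical stand-in for a committed bytes_chunk; only offset/length
    // matter for this sketch.
    struct range { uint64_t offset; uint64_t length; };

    // Merge adjacent chunks into maximal contiguous runs, mirroring how
    // commit_callback() batches its file cache release() calls: a new run is
    // started whenever the next chunk does not begin exactly where the
    // previous one ended. Input is assumed sorted by offset, which is what
    // the commit path provides (contiguous bcs).
    std::vector<range> coalesce(const std::vector<range>& chunks)
    {
        std::vector<range> runs;
        for (const range& r : chunks) {
            assert(r.length > 0);
            if (!runs.empty() &&
                runs.back().offset + runs.back().length == r.offset) {
                runs.back().length += r.length;     // extend the current run
            } else {
                runs.push_back(r);                  // start a new run
            }
        }
        return runs;
    }

    // e.g. {0,4096}, {4096,4096}, {16384,4096} coalesce into two runs,
    // [0, 8192) and [16384, 20480), i.e. two release() calls instead of three.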