diff --git a/turbonfs/inc/file_cache.h b/turbonfs/inc/file_cache.h index 33729300..867abe50 100644 --- a/turbonfs/inc/file_cache.h +++ b/turbonfs/inc/file_cache.h @@ -233,10 +233,37 @@ struct membuf * immediately so that it can handle other interactive requests (note that * fuse threads are very limited). Later when the flush/write to Blob * actually completes, it completes the fuse task(s) queued in this list. + * + * Note: Depending on whether a membuf is written using UNSTABLE or STABLE + * write, it will or won't need to be committed. A membuf written + * using UNSTABLE write will be just added to the TBL (Temporary Block + * List) of the Blob and won't be visible to other readers and + * writers, till it's committed (which will add it to the Committed + * Block List, the CBL). + * A membuf written using STABLE write will be committed, i.e., it'll + * be added to the CBL, and won't need an extra commit call. + * So, with UNSTABLE write a membuf will have a flush call to write + * the membuf data and then a commit call to commit it to the Blob, + * (usually) along with more such membufs. Various membufs can be + * flushed in parallel, which allows us to get higher write throughput + * than what a single connection can support. */ std::vector *flush_waiters = nullptr; std::mutex flush_waiters_lock_44; + /** + * Like flush_waiters, commit_waiters is the list of tasks(s) waiting for + * this membuf to be committed, i.e., data in this membuf to be added to + * the CBL (Committed Block List) of the backing Blob. Note that the membuf + * must have started flushing, but it may not have yet completed flushing. + * When the membuf successfully completes flushing, the flush callback will + * start the commit and on completion of commit, the commit_waiters will be + * called. + * Also see flush_waiters. + */ + std::vector *commit_waiters = nullptr; + std::mutex commit_waiters_lock_44; + /** * get_flush_waiters() returns the list of tasks waiting for this * membuf to be flushed to the Blob. All these tasks must be completed @@ -247,6 +274,16 @@ struct membuf */ std::vector get_flush_waiters(); + /** + * get_commit_waiters() returns the list of tasks waiting for this + * membuf to be committed to the Blob. All these tasks must be completed + * appropriately when membuf commit completes (success or failure). + * + * Note: This splices the commit_waiters list and returns, and hence + * it's not idempotent. + */ + std::vector get_commit_waiters(); + /* * If is_file_backed() is true then 'allocated_buffer' is the mmap()ed * address o/w it's the heap allocation address. @@ -1222,6 +1259,31 @@ class bytes_chunk_cache uint64_t length, struct rpc_task *task); + /** + * Add 'task' to the commit_waiters list for membuf covering the region + * [offset, offset+list). If the region is covered by more than one membuf + * then 'task' is added to the commit_waiters list of the last membuf. + * + * Returns true if task was successfully added. This will happen when the + * following conditions are met: + * 1. There is a membuf with at least one byte overlapping with the given + * region [offset, offset+list). + * 2. The membuf is either in flushing or commit_pending state. + * If in flushing state, it'll first move to commit_pending state + * after successful flush and then after commit completed, this + * task will be completed. + * + * If it returns true, caller need not call the fuse callback as it'll + * get called when the membuf is committed. 
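To make the caller-side contract concrete, the snippet below is a minimal, hypothetical sketch of the resulting complete-or-defer pattern. The types (cache_stub, rpc_task_stub) are stand-ins and not the real turbonfs classes; only the rule they illustrate comes from the comment above: if the task could not be queued the writer replies to fuse itself, otherwise the commit callback will reply later.

```cpp
// Illustrative only: simplified stand-ins for the real cache/task types.
#include <cstdint>
#include <iostream>

struct rpc_task_stub {
    uint64_t write_size;
    void reply_write(uint64_t sz) { std::cout << "write of " << sz << " bytes completed\n"; }
};

struct cache_stub {
    // Pretend implementation: returns true iff the covering membuf is still
    // flushing or commit_pending, i.e., the commit callback will eventually
    // complete the task.
    bool add_commit_waiter(uint64_t /*off*/, uint64_t /*len*/, rpc_task_stub* /*t*/) {
        return false;   // assume the membuf already finished committing
    }
};

// The contract described above: if the task could not be queued, the caller
// must complete the fuse write itself; if it could, it must not.
void complete_or_defer(cache_stub& cache, rpc_task_stub* task,
                       uint64_t offset, uint64_t length)
{
    if (!cache.add_commit_waiter(offset, length, task)) {
        // Commit already done (or no matching membuf): reply inline.
        task->reply_write(task->write_size);
    }
    // else: the commit callback will call reply_write() when commit completes.
}

int main()
{
    cache_stub cache;
    rpc_task_stub task{4096};
    complete_or_defer(cache, &task, 0 /*offset*/, 4096 /*length*/);
}
```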
+ * If task cannot be added to the commit_waiters list, it returns false, + * and in that case caller must complete the fuse callback. + * + * LOCK: This takes chunkmap_lock_43 lock exclusively. + */ + bool add_commit_waiter(uint64_t offset, + uint64_t length, + struct rpc_task *task); + /* * Returns all dirty chunks for a given range in chunkmap. * Before returning it increases the inuse count of underlying membuf(s). @@ -1237,8 +1299,8 @@ class bytes_chunk_cache * check for that after holding the membuf lock, before it tries to * flush those membuf(s). */ - std::vector get_dirty_bc_range(uint64_t st_off, - uint64_t end_off) const; + std::vector get_dirty_bc_range(uint64_t st_off = 0, + uint64_t end_off = UINT64_MAX) const; /* * Returns dirty chunks which are not already flushing, in the given range, @@ -1259,6 +1321,16 @@ class bytes_chunk_cache std::vector get_dirty_nonflushing_bcs_range( uint64_t st_off = 0, uint64_t end_off = UINT64_MAX) const; + /** + * Returns contiguous dirty (and not flushing) chunks from chunmap, starting + * with the lowest dirty offset, and returns the total number of (dirty) + * bytes contained in the returned chunks. + * Before returning it increases the inuse count of underlying membuf(s). + * Caller will typically flush these to the backing Blob as UNSTABLE + * writes. + */ + std::vector get_contiguous_dirty_bcs(uint64_t& size) const; + /* * Returns all dirty chunks which are currently flushing for a given range * in chunkmap. Before returning it increases the inuse count of underlying @@ -1274,8 +1346,8 @@ class bytes_chunk_cache * check for that after holding the membuf lock, before it tries to * commit those membuf(s). */ - std::vector get_flushing_bc_range(uint64_t st_off, - uint64_t end_off) const; + std::vector get_flushing_bc_range(uint64_t st_off = 0, + uint64_t end_off = UINT64_MAX) const; /* * Returns *all* commit pending chunks in chunkmap. @@ -1366,22 +1438,65 @@ class bytes_chunk_cache } /** - * Maximum size a dirty extent can grow before we should flush it. + * Maximum size a dirty extent can grow before we must flush it. * This is 60% of the allowed cache size or 1GB whichever is lower. * The reason for limiting it to 1GB is because there's not much value in * holding more data than the Blob NFS server's scheduler cache size. * We want to send as prompt as possible to utilize the n/w b/w but slow * enough to give the write scheduler an opportunity to merge better. + * + * TODO: For unstable writes, it may not make sense to wait after we have + * a full block worth of data. */ - uint64_t max_dirty_extent_bytes() const + static uint64_t max_dirty_extent_bytes() { // Maximum cache size allowed in bytes. static const uint64_t max_total = (aznfsc_cfg.cache.data.user.max_size_mb * 1024 * 1024ULL); assert(max_total != 0); - static const uint64_t max_dirty_extent = (max_total * 0.6); - return std::min(max_dirty_extent, uint64_t(1024 * 1024 * 1024ULL)); + /* + * Minimum of 60% of max cache and 1000MiB. + * 1000MiB, as it can be equally divided into 100MiB blocks. + */ + static const uint64_t max_dirty_extent = + std::min((uint64_t)(max_total * 0.6), 1000 * 1024 * 1024UL); + + // Must be non-zero. + assert(max_dirty_extent != 0); + + return max_dirty_extent; + } + + /** + * Maximum size of commit_pending data that can be in cache, before we + * must commit it to Blob. 
+ * It should be greater than or equal to the flush threshold (as returned + * by max_dirty_extent_bytes()) and smaller than the inline write threshold + * (as suggested by do_inline_write()), to minimize inline flush waits as + * much as possible, in steady state. + */ + static uint64_t max_commit_bytes() + { + // Maximum cache size allowed in bytes. + static const uint64_t max_total = + (aznfsc_cfg.cache.data.user.max_size_mb * 1024 * 1024ULL); + assert(max_total != 0); + + /* + * Minimum of 60% of max cache and 2 times the flush limit. + * We want to commit as soon as possible w/o affecting performance. + * If we commit too often, since commit is a serializing operation, + * it'll affect the write throughput, otoh, if we commit too late + * then we might hit the inline write threshold, which again would + * serialize writes, bringing down throughput. + */ + static const uint64_t max_commit_bytes = + std::min((uint64_t)(max_total * 0.6), + 2 * max_dirty_extent_bytes()); + assert(max_commit_bytes > 0); + + return max_commit_bytes; } /** @@ -1432,12 +1547,84 @@ class bytes_chunk_cache return bytes_flushing > 0; } + /** + * Check if we must initiate a COMMIT RPC now. Note that the caller would + * just send the COMMIT RPC and not necessarily block the user write + * request till the COMMIT RPC completes, i.e., it's not an inline commit. + * + * We must start commit if: + * 1. We have enough commit_pending data for this file/cache, or, + * 2. Global memory pressure dictates that we commit now to free up + * memory. In this case we might be committing more frequently which + * won't necessarily be optimal, but we have no choice due to the + * memory pressure. + */ + bool commit_required() const + { + const bool local_pressure = + (bytes_commit_pending >= max_commit_bytes()); + + if (local_pressure) { + return true; + } + + /* + * TODO: Take cue from global memory pressure. + */ + return false; + } + + /** + * Check if we must initiate flush of some cached data. Note that the caller + * would just send the corresponding WRITE RPC and not necessarily block the + * user write request till the WRITE RPC completes, i.e., it's not an inline + * write. + * + * We must start flush/write if: + * 1. We have enough bytes to flush so that we can write a full sized + * block, or for the case of stable write, we have enough data to fill + * the scheduler queue. + * 2. Global memory pressure dictates that we flush now to free up memory. + * In this case we might be flushing more frequently which won't + * necessarily be optimal, but we have no choice due to the memory + * pressure. + */ + bool flush_required() const + { + const bool local_pressure = + (get_bytes_to_flush() >= max_dirty_extent_bytes()); + + if (local_pressure) { + return true; + } + + /* + * TODO: Take cue from global memory pressure. + */ + return false; + } + /** * This should be called by writer threads to find out if they must wait * for the write to complete. This will check both the cache specific and * global memory pressure. */ bool do_inline_write() const + { + return get_inline_flush_bytes() > 0; + } + + /** + * Inline write/flush means that we are under sufficient memory pressure + * that we want to slow down the application writes by not completing them + * after copying the data to cache (using copy_to_cache()) but instead + * complete application writes only after the flush completes. 
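As a rough numeric illustration of how the flush, commit and inline-write thresholds relate, the standalone program below recomputes them for an assumed 4 GiB per-file cache limit. The constants mirror max_dirty_extent_bytes(), max_commit_bytes() and the 4x dirty-extent limit used for inline writes, but this is only a sketch, not the real bytes_chunk_cache.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

int main()
{
    const uint64_t MiB = 1024ULL * 1024;
    const uint64_t max_total = 4096 * MiB;   // assumed per-file cache limit (4 GiB)

    // Flush threshold: 60% of the cache or 1000 MiB, whichever is lower.
    const uint64_t max_dirty_extent =
        std::min((uint64_t)(max_total * 0.6), 1000 * MiB);

    // Commit threshold: 60% of the cache or 2x the flush threshold.
    const uint64_t max_commit =
        std::min((uint64_t)(max_total * 0.6), 2 * max_dirty_extent);

    // Inline-write threshold: up to 4 dirty extents before writers must wait.
    const uint64_t inline_threshold = 4 * max_dirty_extent;

    std::cout << "flush at  " << max_dirty_extent / MiB << " MiB\n"    // 1000
              << "commit at " << max_commit / MiB << " MiB\n"          // 2000
              << "inline at " << inline_threshold / MiB << " MiB\n";   // 4000

    // Intended steady-state ordering: flush <= commit < inline.
    return !(max_dirty_extent <= max_commit && max_commit < inline_threshold);
}
```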
+ * + * This function returns a non-zero number representing the number of bytes + * that we need to write inline, else if not under memory pressure returns + * zero. + */ + uint64_t get_inline_flush_bytes() const { /* * Allow two dirty extents before we force inline write. @@ -1445,11 +1632,18 @@ class bytes_chunk_cache * the second one. */ static const uint64_t max_dirty_allowed_per_cache = - max_dirty_extent_bytes() * 2; - const bool local_pressure = bytes_dirty > max_dirty_allowed_per_cache; + (max_dirty_extent_bytes() * 4); + const bool local_pressure = + ((int64_t) get_bytes_to_flush() > (int64_t) max_dirty_allowed_per_cache); if (local_pressure) { - return true; + /* + * Leave one max_dirty_extent_bytes worth of dirty bytes, and + * flush the rest. + */ + const int64_t flush_now = + (get_bytes_to_flush() - max_dirty_extent_bytes()); + return std::max((int64_t) 0, flush_now); } /* @@ -1458,8 +1652,20 @@ class bytes_chunk_cache */ uint64_t inline_bytes; + /* + * TODO: Noisy neighbor syndrome, where one file is hogging the cache, + * inline pruning will be triggered for other files. + */ get_prune_goals(&inline_bytes, nullptr); - return (inline_bytes > 0); + + /* + * (bytes_flushing + bytes_commit_pending) represents the data which + * is either already flushed or being flushed. Exclude that from the + * needs-to-be-flushed data. + */ + const int64_t flush_now = + (inline_bytes - (bytes_flushing + bytes_commit_pending)); + return std::max((int64_t) 0, flush_now); } /** diff --git a/turbonfs/inc/nfs_inode.h b/turbonfs/inc/nfs_inode.h index bef85bb9..35e8ff54 100644 --- a/turbonfs/inc/nfs_inode.h +++ b/turbonfs/inc/nfs_inode.h @@ -127,8 +127,16 @@ struct nfs_inode * * See flush_lock()/flush_unlock() for the actual locking. * + * Note: While flush_lock is held it's guaranteed that no new flush can + * start, but there can be ongoing flushes at the time when flush_lock + * is called (and it returns with the lock held). If the caller wants + * to proceed only after all ongoing flushes complete (and no new + * flushes are started), it must hold the flush_lock and then wait + * for ongoing flushes to complete. Ref flush_cache_and_wait(). + * * Note: Though it's called flush lock, but it protects backend file size - * changes through both flush and/or commit. + * changes through both flush and/or commit, i.e., if flush_lock is + * held it's guaranteed that no new flush or commit can start. */ mutable std::atomic is_flushing = false; mutable std::condition_variable_any flush_cv; @@ -279,6 +287,29 @@ struct nfs_inode */ struct stat attr; + /** + * We maintain following multiple views of the file and thus multiple file + * sizes for those views. + * - Cached. + * This is the view of the file that comprises of data that has been + * written by the application and saved in file cache. It may or may not + * have been flushed and/or committed. This is the most uptodate view of + * the file and applications must use this view. + * cached_filesize tracks the file size for this view. + * - Uncommited. + * This is the view of the file that tracks data that has been flushed + * using UNSTABLE writes but not yet COMMITted to the Blob. This view of + * the file is only used to see if the next PB call will write after the + * last PB'ed byte and thus can be appended. + * putblock_filesize tracks the file size for this view. + * - Committed. + * This is the view of the file that tracks data committed to the Blob. + * Other clients will see this view. + * attr.st_size tracks the file size for this view. 
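The three views can be pictured as three counters that advance at different points in a write's life. The toy model below (not the real nfs_inode; committed_filesize stands in for attr.st_size) shows when each size would move for a simple append workload.

```cpp
// Toy model of the three file-size views. Member names mirror the real
// fields but the class is purely illustrative.
#include <algorithm>
#include <cstdint>
#include <iostream>

struct filesize_views {
    uint64_t cached_filesize = 0;     // grows when data lands in the file cache
    uint64_t putblock_filesize = 0;   // grows when data is flushed (PutBlock)
    uint64_t committed_filesize = 0;  // grows when data is committed (attr.st_size)

    void on_cache_write(uint64_t off, uint64_t len) {
        cached_filesize = std::max(cached_filesize, off + len);
    }
    void on_flush_issued(uint64_t off, uint64_t len) {
        // Unstable flushes are append-only: the real code asserts that
        // off == putblock_filesize before advancing it.
        putblock_filesize = off + len;
    }
    void on_commit_complete() {
        // Commit makes everything flushed so far visible to other clients.
        committed_filesize = putblock_filesize;
    }
};

int main()
{
    filesize_views f;
    f.on_cache_write(0, 1048576);        // application writes 1 MiB
    f.on_cache_write(1048576, 1048576);  // and another 1 MiB
    f.on_flush_issued(0, 1048576);       // first MiB flushed (still uncommitted)
    std::cout << f.cached_filesize << " " << f.putblock_filesize
              << " " << f.committed_filesize << "\n";   // 2097152 1048576 0
    f.on_commit_complete();              // COMMIT persists the flushed data
    std::cout << f.committed_filesize << "\n";          // 1048576
    return 0;
}
```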
+ */ + off_t cached_filesize = 0; + off_t putblock_filesize = 0; + /* * Has this inode seen any non-append write? * This starts as false and remains false as long as copy_to_cache() only @@ -296,11 +327,8 @@ struct nfs_inode * writes (either overwrite or sparse write) must go as a stable write * (since server knows best how to allocate blocks for them). * Once set to true, it remains true for the life of the inode. - * - * Note: As of now, we are not using this flag as commit changes not yet - * integrated, so we are setting this flag to true. */ - bool stable_write = true; + bool stable_write = false; public: /* @@ -384,6 +412,15 @@ struct nfs_inode */ int write_error = 0; + /* + * Offset and length of the latest flush operation. + * Tasks that want to wait for multiple membufs to be flushed/committed, + * will add themselves to flush_waiters/commit_waiters list for this + * membuf. See add_task_to_flush_waiters()/add_task_to_commit_waiters(). + */ + uint64_t last_flushing_offset = 0; + uint64_t last_flushing_length = 0; + /* * Commit state for this inode. * This is used to track the state of commit operation for this inode, which @@ -522,7 +559,7 @@ struct nfs_inode * change the file size after truncate sets it. */ bool truncate_start(size_t size); - void truncate_end() const; + void truncate_end(); /** * This MUST be called only after has_filecache() returns true, else @@ -952,7 +989,7 @@ struct nfs_inode */ if (!non_append_writes_seen && (offset != attr.st_size)) { non_append_writes_seen = true; - AZLogInfo("[{}] Non-append write seen [{}, {}), file size: {}", + AZLogDebug("[{}] Non-append write seen [{}, {}), file size: {}", ino, offset, offset+length, attr.st_size); } @@ -1108,41 +1145,86 @@ struct nfs_inode } /** - * Is commit pending for this inode? + * Is commit pending/scheduled for this inode? + * + * It's called from write_iov_callback() when flush/write completes to see + * if some other thread has scheduled commit (as it could not start commit + * since flush was in progress) and it needs to start commit now. + * + * This MUST be called with flush_lock held. */ bool is_commit_pending() const { + assert(is_flushing); assert(commit_state != commit_state_t::INVALID); return (commit_state == commit_state_t::NEEDS_COMMIT); } /** - * set needs_commit state for this inode. - * Note this is set to let flushing task know that commit is pending and start commit task. + * Schedule commit for this inode. + * + * This MUST be called with flush_lock held, as only one thread must + * schedule commit. Note that a thread wanting to commit some data, checks + * if some other thread is already flushing data from that cache, if yes it + * cannot start the commit till the flush completes (as we need consistent + * TBL at the time of commit), so it just schedules the commit and + * write_iov_callback() then starts the commit when the last ongoing flush + * completes. If there's no ongoing flush at the time of this call, then the + * thread that wants to commit must start the commit itself, in that case + * it'll call set_commit_in_progress() and not set_commit_pending(). + * See schedule_or_start_commit(). + * + * XXX Do not confuse it with similar function in membuf. + * That is used to set "commit needed" for a membuf after it's written + * using UNSTABLE write. This one is for the inode and conveys that one + * or more membufs in this inode's filecache need to be committed. */ void set_commit_pending() { - // Commit can be set to pending only if it is in commit_not_needed state. 
+ assert(is_flushing); + /* + * Commit MUST be scheduled only if it is not already running or + * scheduled. + */ assert(commit_state == commit_state_t::COMMIT_NOT_NEEDED); commit_state = commit_state_t::NEEDS_COMMIT; } /** * Is commit in progress for this inode? + * Note that we consider commit to be in-progress even if it's scheduled + * (commit_state == NEEDS_COMMIT) but not yet running. This is because + * once scheduled the commit WILL DEFINITELY run (when the last flush + * completes) and we usually don't need to distinguish between that and + * whether it's actually running now. + * + * This can only be safely called with flush_lock held. + * Depending on other conditions, caller may be able to call it w/o the + * flush_lock but be careful and think through scenarios where some + * other thread can call set_commit_pending()/set_commit_in_progress(), + * right after or before our call to is_commit_in_progress(). */ bool is_commit_in_progress() const { assert(commit_state != commit_state_t::INVALID); - return (commit_state == commit_state_t::COMMIT_IN_PROGRESS); + return ((commit_state == commit_state_t::COMMIT_IN_PROGRESS) || + (commit_state == commit_state_t::NEEDS_COMMIT)); } /** * Set commit_in_progress state for this inode. + * + * This MUST be called with flush_lock held, as only one thread must + * start commit. */ void set_commit_in_progress() { + assert(is_flushing); + assert(commit_state != commit_state_t::INVALID); + // Must not already be in-progress or scheduled. assert(commit_state != commit_state_t::COMMIT_IN_PROGRESS); + commit_state = commit_state_t::COMMIT_IN_PROGRESS; } @@ -1301,9 +1383,7 @@ struct nfs_inode /** * Flush the dirty file cache represented by filecache_handle and wait - * till all dirty data is sync'ed with the NFS server. Only dirty data - * in the given range is flushed if provided, else all dirty data is - * flushed. + * till all dirty data is sync'ed with the NFS server. * Note that filecache_handle is the only writeback cache that we have * and hence this only flushes that. * For a non-reg file inode this will be a no-op. @@ -1318,18 +1398,76 @@ struct nfs_inode * initiate any new flush operations while some truncate call is in * progress (which must have held the is_flushing lock). */ - int flush_cache_and_wait(uint64_t start_off = 0, - uint64_t end_off = UINT64_MAX); + int flush_cache_and_wait(); + + /** + * Start commit of (all) uncommitted data. Since commit and flush cannot + * run in parallel, if there's an ongoing flush it schedules commit by + * calling set_commit_pending() which will cause the last completing flush + * to trigger the commit, else it starts the commit. + * MUST be called only when doing unstable writes. + */ + void schedule_or_start_commit(); + + /** + * Helper method for calling file_cache's add_flush_waiter() method. + * If not able to add successfully, then it completes the task else the + * task will be completed when ongoing flush completes. + * Call to this function guarantees that task will be completed and + * caller need not complete the task. + */ + void add_task_to_flush_waiters(struct rpc_task *task); + + /** + * Helper method for calling file_cache's add_commit_waiter() method. + * If not able to add successfully, then it completes the task else the + * task will be completed when ongoing commit or commit initiated after + * the ongoing flush completes. + * Call to this function guarantees that task will be completed and + * caller need not complete the task. 
+ */ + void add_task_to_commit_waiters(struct rpc_task *task); /** * Wait for currently flushing membufs to complete. + * If parent_task is non-null, then it's added to the commit_waiters list + * and returned, otherwise it waits for the ongoing flush (and subsequent + * commit) to complete. + * * Returns 0 on success and a positive errno value on error. * * Note : Caller must hold the inode is_flushing lock to ensure that - * no new membufs are added till this call completes. + * no new flush is started till this call completes. + */ + int wait_for_ongoing_flush_commit(struct rpc_task *parent_task = nullptr); + + /** + * commit_membufs() is called to commit uncommitted membufs to the Blob. + * It creates commit RPC and sends it to the NFS server. + */ + void commit_membufs(); + + /** + * switch_to_stable_write() is called to switch the inode to stable write + * mode. It waits for all ongoing flush and subsequent commit to complete. + * If not already scheduled, it'll perform an explicit commit after the + * flush complete. + * Post that it'll mark inode for stable write and return. From then on + * any writes to this inode will be sent as stable writes. */ - int wait_for_ongoing_flush(uint64_t start_off = 0, - uint64_t end_off = UINT64_MAX); + void switch_to_stable_write(); + + /** + * Check if stable write is required for the given offset. + * Given offset is the start of contiguous dirty membufs that need to be + * flushed to the Blob. + */ + bool check_stable_write_required(off_t offset); + + /** + * Wait for ongoing commit operation to complete. + */ + void wait_for_ongoing_commit(); /** * Sync the dirty membufs in the file cache to the NFS server. @@ -1340,6 +1478,8 @@ struct nfs_inode * caller in case of memory pressure when we want to delay fuse callbacks * to slow down writes which can cause more memory to be dirtied. * + * Note: Must be called with flush_lock held. + * * Note: sync_membufs() can free parent_task if all issued backend * writes complete before sync_membufs() could return. * DO NOT access parent_task after sync_membufs() returns. @@ -1372,7 +1512,12 @@ struct nfs_inode * Lock the inode for flushing. */ void flush_lock() const; - void flush_unlock() const; + void flush_unlock(); + + bool flushtry_lock() + { + return !is_flushing.exchange(true); + } /** * Revalidate the inode. diff --git a/turbonfs/inc/rpc_stats.h b/turbonfs/inc/rpc_stats.h index 626914b1..d0a7bbed 100644 --- a/turbonfs/inc/rpc_stats.h +++ b/turbonfs/inc/rpc_stats.h @@ -139,9 +139,6 @@ class rpc_stats_az */ void on_rpc_issue() { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert(stamp.issue == 0); stamp.issue = get_current_usecs(); assert(stamp.issue >= stamp.create); @@ -156,9 +153,6 @@ class rpc_stats_az */ void on_rpc_cancel() { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. - assert(optype != FUSE_FLUSH); - assert(stamp.issue != 0); assert((int64_t) stamp.issue <= get_current_usecs()); assert(stamp.dispatch == 0); @@ -178,9 +172,6 @@ class rpc_stats_az */ void on_rpc_complete(struct rpc_pdu *pdu, nfsstat3 status) { - // FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead. 
- assert(optype != FUSE_FLUSH); - assert((status == NFS3ERR_RPC_ERROR) || (nfsstat3_to_errno(status) != -ERANGE)); diff --git a/turbonfs/inc/rpc_task.h b/turbonfs/inc/rpc_task.h index a7f049ae..c9fd4303 100644 --- a/turbonfs/inc/rpc_task.h +++ b/turbonfs/inc/rpc_task.h @@ -2387,6 +2387,7 @@ struct rpc_task */ bool add_bc(const bytes_chunk& bc); void issue_write_rpc(); + void issue_commit_rpc(); #ifdef ENABLE_NO_FUSE /* diff --git a/turbonfs/src/file_cache.cpp b/turbonfs/src/file_cache.cpp index 2b51abbb..f948c3c1 100644 --- a/turbonfs/src/file_cache.cpp +++ b/turbonfs/src/file_cache.cpp @@ -483,7 +483,7 @@ void membuf::clear_flushing() * TODO: Remove me once we have enough testing. * Don't release it to production. */ - assert(!is_dirty()); + // assert(!is_dirty()); flag &= ~MB_Flag::Flushing; @@ -712,6 +712,36 @@ void membuf::clear_inuse() inuse--; } +std::vector membuf::get_commit_waiters() +{ + /* + * We cannot assert for !is_dirty() as in case of write failure + * we complete flush_waiters w/o clearing dirty flag. + */ + assert(is_inuse()); + assert(is_locked()); + assert(!is_flushing()); + assert(!is_commit_pending()); + + std::unique_lock _lock(commit_waiters_lock_44); + std::vector tasks; + + if (commit_waiters) { + // flush_waiters is dynamically allocated when first task is added. + assert(!commit_waiters->empty()); + tasks.swap(*commit_waiters); + assert(commit_waiters->empty()); + + /* + * Free the commit_waiters vector. + */ + delete commit_waiters; + commit_waiters = nullptr; + } + + return tasks; +} + std::vector membuf::get_flush_waiters() { /* @@ -2238,7 +2268,7 @@ void bytes_chunk_cache::clear_nolock() assert(bytes_cached == 0); if (bytes_allocated != 0) { - AZLogWarn("[{}] Cache purge: bytes_allocated is still {}, some user " + AZLogDebug("[{}] Cache purge: bytes_allocated is still {}, some user " "is still holding on to the bytes_chunk/membuf even after " "dropping the inuse count: backing_file_name={}", CACHE_TAG, bytes_allocated.load(), backing_file_name); @@ -2362,6 +2392,43 @@ std::vector bytes_chunk_cache::get_dirty_nonflushing_bcs_range( return bc_vec; } +std::vector bytes_chunk_cache::get_contiguous_dirty_bcs( + uint64_t& size) const +{ + std::vector bc_vec; + size = 0; + + // TODO: Make it shared lock. + const std::unique_lock _lock(chunkmap_lock_43); + auto it = chunkmap.lower_bound(0); + uint64_t prev_offset = AZNFSC_BAD_OFFSET; + + while (it != chunkmap.cend()) { + const struct bytes_chunk& bc = it->second; + struct membuf *mb = bc.get_membuf(); + + if (mb->is_dirty() && !mb->is_flushing()) { + if (prev_offset != AZNFSC_BAD_OFFSET) { + if (prev_offset != bc.offset) { + break; + } else { + size += bc.length; + prev_offset = bc.offset + bc.length; + } + } else { + size += bc.length; + prev_offset = bc.offset + bc.length; + } + mb->set_inuse(); + bc_vec.emplace_back(bc); + } + + ++it; + } + + return bc_vec; +} + bool bytes_chunk_cache::add_flush_waiter(uint64_t offset, uint64_t length, struct rpc_task *task) @@ -2409,6 +2476,53 @@ bool bytes_chunk_cache::add_flush_waiter(uint64_t offset, return false; } +bool bytes_chunk_cache::add_commit_waiter(uint64_t offset, + uint64_t length, + struct rpc_task *task) +{ + assert(offset < AZNFSC_MAX_FILE_SIZE); + assert(length > 0); + + // Only frontend write tasks must ever wait. 
+ assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + + const std::unique_lock _lock(chunkmap_lock_43); + auto it = chunkmap.lower_bound(offset); + struct bytes_chunk *last_bc = nullptr; + + // Get the last bc for the given range. + while (it != chunkmap.cend() && (it->first < (offset + length))) { + last_bc = &(it->second); + ++it; + } + + if (last_bc != nullptr) { + // membuf must have at least one byte in the requested range. + assert(last_bc->offset >= offset && + last_bc->offset < (offset + length)); + + struct membuf *mb = last_bc->get_membuf(); + + std::unique_lock _lock2(mb->commit_waiters_lock_44); + /* + * Only add flush waiters to dirty membufs, else it may have already + * completed flush and we don't want the task to be waiting forever. + */ + if (mb->is_flushing() || mb->is_commit_pending()) { + if (mb->commit_waiters == nullptr) { + mb->commit_waiters = new std::vector(); + } + + mb->commit_waiters->emplace_back(task); + return true; + } + } + + return false; +} + std::vector bytes_chunk_cache::get_dirty_bc_range( uint64_t start_off, uint64_t end_off) const { diff --git a/turbonfs/src/nfs_client.cpp b/turbonfs/src/nfs_client.cpp index 0589c5e4..d5ec131e 100644 --- a/turbonfs/src/nfs_client.cpp +++ b/turbonfs/src/nfs_client.cpp @@ -1180,6 +1180,8 @@ void nfs_client::getattr( * updating nfs_inode::attr during cached writes and then returning * attributes from that instead of making a getattr call here. * We need to think carefully though. + * + * TODO: flush_cache_and_wait() */ if (inode->is_regfile()) { AZLogDebug("[{}] Flushing file data ahead of getattr", diff --git a/turbonfs/src/nfs_inode.cpp b/turbonfs/src/nfs_inode.cpp index 1fb31583..50886418 100644 --- a/turbonfs/src/nfs_inode.cpp +++ b/turbonfs/src/nfs_inode.cpp @@ -34,7 +34,7 @@ nfs_inode::nfs_inode(const struct nfs_fh3 *filehandle, assert(write_error == 0); // TODO: Revert this to false once commit changes integrated. - assert(stable_write == true); + assert(stable_write == false); assert(commit_state == commit_state_t::COMMIT_NOT_NEEDED); #ifndef ENABLE_NON_AZURE_NFS @@ -375,10 +375,161 @@ int nfs_inode::get_actimeo_max() const } } +/* + * Caller should hold flush_lock(). + */ +void nfs_inode::wait_for_ongoing_commit() +{ + assert(is_flushing); + + /* + * TODO: See if we can eliminate inline sleep. + */ + if (is_commit_in_progress()) { + AZLogWarn("[{}] wait_for_ongoing_commit() will sleep inline!!", + get_fuse_ino()); + } + + while (is_commit_in_progress()) { + assert(!get_filecache()->is_flushing_in_progress()); + ::usleep(1000); + } + + assert(!is_commit_in_progress()); +} + +/* + * This function is called with flush_lock() held. + * This should be called whenever we figure out that we cannot proceed with + * unstable writes (most common reason being, next write is not an append + * write). Once this function returns, following is guaranteed: + * - There will be no flushes in progress. + * - There will be no commit_pending data and no commit inprogress. + * - inode->stable_write will be set to true. + */ +void nfs_inode::switch_to_stable_write() +{ + assert(is_flushing); + assert(!is_stable_write()); + + AZLogInfo("[{}] Switching to stable write", ino); + + /* + * switch_to_stable_write() is called from places where we are about to + * start a flush operation. Before that we check to see if we need to + * change to stable write. 
Since we are not flushing yet and since we + * do not support multiple ongoing flushes, we are guaranteed that no + * flush should be in progress when we reach here. + * Similarly commit should not be in progress. + */ + assert(!is_commit_in_progress()); + assert(!get_filecache()->is_flushing_in_progress()); + + /* + * Check if there is anything to commit, if not then + * switch to stable write. + */ + if (get_filecache()->get_bytes_to_commit() == 0) { + AZLogDebug("[{}] Nothing to commit, switching to stable write", ino); + set_stable_write(); + return; + } + + /* + * If there is something to commit, then commit it and + * wait for the commit to complete. + */ + schedule_or_start_commit(); + + /* + * Wait for the commit to complete. + */ + wait_for_ongoing_commit(); + + assert(get_filecache()->get_bytes_to_commit() == 0); + assert(!get_filecache()->is_flushing_in_progress()); + assert(!is_commit_in_progress()); + + set_stable_write(); + return; +} + +/* + * This function checks, whether switch to stable write or not. + */ +bool nfs_inode::check_stable_write_required(off_t offset) +{ + /* + * If stable_write is already set, we don't need to do anything. + * We don't need lock here as once stable_write is set it's never + * unset. + */ + if (is_stable_write()) { + return false; + } + + /* + * If current write is not append write, then we can't go for unstable writes + * It may be overwrite to existing data and we don't have the knowldege of + * existing block list, it maye require read modified write. So, we can't go + * for unstable write. Similarly, if the offset is more than end of the file, + * we need to write zero block in between the current end of the file and the + * offset. + */ + if (putblock_filesize != offset) { + AZLogInfo("Stable write required as putblock_filesize:{} is not at the" + "offset:{}", putblock_filesize, offset); + + return true; + } + + return false; +} + +/** + * commit_membufs() is called by writer thread to commit flushed membufs. + * It's always issued under flush_lock(). + */ +void nfs_inode::commit_membufs() +{ + assert(is_flushing); + + set_commit_in_progress(); + + /* + * Create the commit task to carry out the write. + */ + struct rpc_task *commit_task = + get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_FLUSH); + commit_task->init_flush(nullptr /* fuse_req */, ino); + assert(commit_task->rpc_api->pvt == nullptr); + + commit_task->issue_commit_rpc(); +} + +/** + * sync_membufs() + */ void nfs_inode::sync_membufs(std::vector &bc_vec, bool is_flush, struct rpc_task *parent_task) { + assert(is_flushing); + + if (!is_stable_write()) { + /* + * We do not allow a new flush while there's an ongoing one, in case + * of unstable writes. + */ + assert(!get_filecache()->is_flushing_in_progress()); + } + + /* + * Stable won't have commit and for unstable we cannot flush while + * commit is going on. + */ + assert(!is_commit_in_progress()); + if (bc_vec.empty()) { return; } @@ -407,12 +558,33 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, parent_task->num_ongoing_backend_writes = 1; } + /* + * If new offset is not at the end of the file, + * then we need to switch to stable write. + */ + if (check_stable_write_required(bc_vec[0].offset)) { + switch_to_stable_write(); + } + /* * Create the flush task to carry out the write. */ struct rpc_task *write_task = nullptr; - // Flush dirty membufs to backend. + /* + * Flush dirty membufs to backend. 
+ * We increment bytes_flushing by 1 before starting any of the flush to + * protect is against the condition where whatever flush we have issued + * completes till we could issue all the flushes. In write_iov_callback() + * we will wrongly treat it as "all flush completed" and start the commit, + * while this thread may still keep doing flush for membufs. + * + * Also assert that for unstable writes, we should never start a new flush + * while there is an ongoing flush. + */ + assert((get_filecache()->bytes_flushing == 0) || is_stable_write()); + get_filecache()->bytes_flushing++; + for (bytes_chunk& bc : bc_vec) { /* * Get the underlying membuf for bc. @@ -495,6 +667,14 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, continue; } + #if 0 + /* + * Update the last flushing offset and length. + */ + last_flushing_offset = bc.offset; + last_flushing_length = bc.length; + #endif + if (write_task == nullptr) { write_task = get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_WRITE); @@ -518,6 +698,10 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, * Once packed completely, then dispatch the write. */ if (write_task->add_bc(bc)) { + assert(is_stable_write() || ((uint64_t) putblock_filesize == bc.offset)); + putblock_filesize += bc.length; + last_flushing_offset = bc.offset; + last_flushing_length = bc.length; continue; } else { /* @@ -543,6 +727,10 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, // Single bc addition should not fail. [[maybe_unused]] bool res = write_task->add_bc(bc); assert(res == true); + assert(is_stable_write() || ((uint64_t) putblock_filesize == bc.offset)); + putblock_filesize += bc.length; + last_flushing_offset = bc.offset; + last_flushing_length = bc.length; } } @@ -551,6 +739,22 @@ void nfs_inode::sync_membufs(std::vector &bc_vec, write_task->issue_write_rpc(); } + /* + * Drop the protective bytes_flushing count. + * If bytes_flushing becomes 0 then all flushes issued above have completed + * by the time we reached here. Check if we need to start the commit. + * Any thread that decides to commit and finds that there's an ongoing + * flush, will mark the inode as 'NEEDS_COMMIT' and when the flush + * completes, we need to start the commit. + */ + if (--get_filecache()->bytes_flushing == 0) { + if (is_commit_pending()) { + assert(!is_stable_write()); + + commit_membufs(); + } + } + /* * Drop the protective num_ongoing_backend_writes count taken at the start * of this function, and if it's the only one remaining that means all @@ -767,14 +971,117 @@ int nfs_inode::copy_to_cache(const struct fuse_bufvec* bufv, return err; } -/** - * Note: Caller should call with flush_lock() held. +/* + * This function is called with flush_lock() held. + * Once this function is called it's guaranteed that a commit will be done, + * either from here or from the last flush (write_iov_callback()). */ -int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) +void nfs_inode::schedule_or_start_commit() +{ + assert(is_flushing); + assert(!is_stable_write()); + /* + * We want to commit, but we need to wait for current flushing to complete. + * 1. If flushing is going on, set commit_pending for the inode, when last + * membuf flush completes write_iov_callback() will commit the data. + * 2. If flushing is not going on, then start commit of data rightaway. + * + * Note: Ensure lock is held till we set the commit state depending on + * flushing going on or not. This lock is to synchronize the commit + * and flush task. 
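The decision described in this note can be compressed into a small state machine. The sketch below is only a model: commit_state, flush_in_progress and the helpers stand in for commit_state_t, bytes_chunk_cache::is_flushing_in_progress() and the commit trigger in write_iov_callback(), and the caller is assumed to hold the flush_lock.

```cpp
// Illustrative state machine for the schedule-or-start-commit decision.
// Not the real nfs_inode; assumes the caller holds the flush_lock.
#include <cassert>
#include <iostream>

enum class commit_state { COMMIT_NOT_NEEDED, NEEDS_COMMIT, COMMIT_IN_PROGRESS };

struct inode_stub {
    commit_state state = commit_state::COMMIT_NOT_NEEDED;
    bool flush_in_progress = false;

    void start_commit() {
        state = commit_state::COMMIT_IN_PROGRESS;
        std::cout << "COMMIT RPC issued\n";
    }

    // Called by a thread that wants the uncommitted data committed.
    void schedule_or_start_commit() {
        if (state == commit_state::COMMIT_IN_PROGRESS ||
            state == commit_state::NEEDS_COMMIT) {
            return;                                 // already running or scheduled
        }
        if (flush_in_progress) {
            state = commit_state::NEEDS_COMMIT;     // last completing flush commits
        } else {
            start_commit();                         // no flush ongoing, commit now
        }
    }

    // Called when the last in-flight flush completes (write_iov_callback).
    void on_last_flush_complete() {
        flush_in_progress = false;
        if (state == commit_state::NEEDS_COMMIT) {
            start_commit();
        }
    }
};

int main()
{
    inode_stub ino;
    ino.flush_in_progress = true;
    ino.schedule_or_start_commit();   // flush ongoing -> commit only scheduled
    assert(ino.state == commit_state::NEEDS_COMMIT);
    ino.on_last_flush_complete();     // last flush done -> commit actually starts
    assert(ino.state == commit_state::COMMIT_IN_PROGRESS);
}
```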
+ */ + if (!is_commit_in_progress()) { + if (get_filecache()->is_flushing_in_progress()) { + AZLogWarn("[{}] Flushing in progress, setting commit_pending", ino); + set_commit_pending(); + } else { + commit_membufs(); + } + } +} + +void nfs_inode::add_task_to_flush_waiters(struct rpc_task *task) +{ + // Must be called only for unstable writes. + assert(!is_stable_write()); + assert(task->magic == RPC_TASK_MAGIC); + // Must be a frontend write task. + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + + /* + * Add this task to the flush_waiter list, so that it can be + * completed after the flush is done. + */ + if (get_filecache()->add_flush_waiter(last_flushing_offset, + last_flushing_length, + task)) { + AZLogDebug("[{}] flush in progress, added to flush_waiter list" + " for [{}, {})", ino, last_flushing_offset, + last_flushing_length); + return; + } else { + /* + * flush is done, but this task is not added to flush_waiter + * list, so complete the write. + */ + AZLogDebug("[{}] Could not add to flush_waiters list, completing " + "write task {}", ino, fmt::ptr(task)); + task->reply_write(task->rpc_api->write_task.get_size()); + return; + } +} + +void nfs_inode::add_task_to_commit_waiters(struct rpc_task *task) { - assert(start_off < end_off); + // Must be called only for unstable writes. + assert(!is_stable_write()); + assert(task->magic == RPC_TASK_MAGIC); + // Must be a frontend write task. + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); - // Caller must call us with is_flushing lock held. + /* + * Add this task to the commit_waiters list, so that it can be + * completed after the commit is done. + */ + if (get_filecache()->add_commit_waiter(last_flushing_offset, + last_flushing_length, + task)) { + AZLogDebug("[{}] Commit in progress, added to commit_waiters list" + " for [{}, {})", ino, last_flushing_offset, + last_flushing_length); + /* + * Since we don't complete the write task here, add_commit_waiter() + * MUST ensure task will be completed if it returns true. + */ + return; + } else { + /* + * Commit is done, but this task is not added to commit_waiters + * list, so complete the write. + */ + AZLogDebug("[{}] Could not add to commit_waiters list, completing " + "write task {}", ino, fmt::ptr(task)); + task->reply_write(task->rpc_api->write_task.get_size()); + return; + } +} + +/** + * Wait for ongoing flushes and commit all uncommitted data. + * Caller must call with flush_lock() held. + * On return there would be no ongoing flush/commit in progress and all + * uncommitted data would have been committed. + * + * If caller passes a valid frontend write task, the call itself will + * return immediately but the task will be completed after flush and commit + * have completed. If caller does not pass a tak pointer, then the call + * itself would block till all flush and commit complete. + */ +int nfs_inode::wait_for_ongoing_flush_commit(struct rpc_task *task) +{ + // Caller must call us with flush_lock held. assert(is_flushing); /* @@ -786,6 +1093,14 @@ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) return 0; } + if (task) { + assert(task->magic == RPC_TASK_MAGIC); + // Must be a frontend write task. + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + assert(task->rpc_api->write_task.get_size() > 0); + } + /* * If flush() is called w/o open(), there won't be any cache, skip. 
*/ @@ -794,80 +1109,175 @@ int nfs_inode::wait_for_ongoing_flush(uint64_t start_off, uint64_t end_off) } /* - * Flushing not in progress and no new flushing can be started as we hold - * the flush_lock(). + * Stable writes do not need commit, so no commit inprogress and no pending + * commit data. */ - if (!get_filecache()->is_flushing_in_progress()) { - AZLogDebug("[{}] No flush in progress, returning", ino); + if (is_stable_write()) { + assert(!is_commit_in_progress()); + assert(get_filecache()->bytes_commit_pending == 0); + } + + if (get_filecache()->is_flushing_in_progress()) { + /* + * Flushing and committing cannot happen simultaneously. + * This is effectively asserting the following: + * assert(commit_state != commit_state_t::COMMIT_IN_PROGRESS); + */ + assert(!is_commit_in_progress() || is_commit_pending()); + } else if (!is_commit_in_progress() && + get_filecache()->bytes_commit_pending == 0) { + /* + * No flush or commit in progress and nothing to commit, return. + */ + AZLogDebug("[{}] No flush or commit in progress, returning", ino); + + /* + * Complete the write task if it was passed. + */ + if (task) { + task->reply_write(task->rpc_api->write_task.get_size()); + } return 0; } /* - * Get the flushing bytes_chunk from the filecache handle. - * This will grab an exclusive lock on the file cache and return the list - * of flushing bytes_chunks at that point. Note that we can have new dirty - * bytes_chunks created but we don't want to wait for those. + * Ok, so either flushing is going on or commit is going on or we have at + * least one byte to commit. */ - std::vector bc_vec = - filecache_handle->get_flushing_bc_range(start_off, end_off); + std::vector bc_vec = {}; + if ((task == nullptr) && get_filecache()->is_flushing_in_progress()) { + /* + * Get the flushing bc from the filecache. + * We will need to wait for these inline. + * Note that we can have new dirty membufs created after the call but + * we don't want to wait for those. + */ + bc_vec = filecache_handle->get_flushing_bc_range(); + } /* - * Our caller expects us to return only after the flush completes. + * If there is no task passed, then our caller expects us to return + * only after the flush completes (f.e. truncate, switch_to_stable_write). * Wait for all the membufs to flush and get result back. + * Otherwise add the task to the flush_waiter list for stable write and + * commit_waiter list for unstable write. + * Task will wait on last flushed offset and length. */ - for (bytes_chunk &bc : bc_vec) { - struct membuf *mb = bc.get_membuf(); + if (task == nullptr) { + for (bytes_chunk& bc : bc_vec) { + struct membuf *mb = bc.get_membuf(); - assert(mb != nullptr); - assert(mb->is_inuse()); + assert(mb != nullptr); + assert(mb->is_inuse()); + mb->set_locked(); - /* - * sync_membufs() would have taken the membuf lock for the duration - * of the backend wite that flushes the membuf, so once we get the - * lock we know that the flush write has completed. - */ - mb->set_locked(); + /* + * If still dirty after we get the lock, it may mean two things: + * - Write failed. We ignore the failure and pretend as if flush + * completed. + * - Some other thread got the lock before us and it made the + * membuf dirty again. Since we wanted to wait only for ongoing + * flushes and not newly written data, we ignore this. 
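The waiting idiom used here — the flusher holds the membuf lock for the whole backend write, so a waiter that merely acquires and drops that lock knows the flush has finished — can be illustrated with a plain std::mutex. The real membuf lock is a custom flag-based lock, so this is only an analogy.

```cpp
// Analogy for the "lock as completion barrier" idiom.
#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

struct buffer_stub {
    std::mutex lock;        // stands in for membuf set_locked()/clear_locked()
    bool flushed = false;
};

int main()
{
    buffer_stub buf;
    std::atomic<bool> flush_started{false};

    std::thread flusher([&] {
        std::lock_guard<std::mutex> g(buf.lock);        // set_locked()
        flush_started = true;                           // membuf is now "flushing"
        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // backend WRITE
        buf.flushed = true;
    });                                                 // clear_locked()

    while (!flush_started) {                            // only wait for *ongoing* flushes
        std::this_thread::yield();
    }

    {
        std::lock_guard<std::mutex> g(buf.lock);        // blocks until the flush is done
        std::cout << "flush completed: " << std::boolalpha << buf.flushed << "\n";
    }
    flusher.join();
}
```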
+ */ + if (mb->is_dirty() && get_write_error()) { + AZLogError("[{}] Flush [{}, {}) failed with error: {}", + ino, bc.offset, bc.offset + bc.length, + get_write_error()); + } - /* - * If still dirty after we get the lock, it may mean two things: - * - Write failed. - * - Some other thread got the lock before us and it made the - * membuf dirty again. - */ - if (mb->is_dirty() && get_write_error()) { - AZLogError("[{}] Flush [{}, {}) failed with error: {}", - ino, - bc.offset, bc.offset + bc.length, - get_write_error()); + mb->clear_locked(); + mb->clear_inuse(); + + /* + * Release the bytes_chunk back to the filecache. + * These bytes_chunks are not needed anymore as the flush is done. + * + * Note: We come here for bytes_chunks which were found dirty by the + * above loop. These writes may or may not have been issued by + * us (if not issued by us it was because some other thread, + * mostly the writer issued the write so we found it flushing + * and hence didn't issue). In any case since we have an inuse + * count, release() called from write_callback() would not have + * released it, so we need to release it now. + */ + filecache_handle->release(bc.offset, bc.length); + } + } else { + if (is_stable_write()) { + /* + * Add task to flush_waiters list. + */ + add_task_to_flush_waiters(task); + } else { + /* + * Add task to commit_waiters list. + */ + add_task_to_commit_waiters(task); } + } - mb->clear_locked(); - mb->clear_inuse(); + /* + * Unstable write case, we need to wait for the commit to complete. + * We must start/schedule commit. + */ + if (!is_stable_write()) { + schedule_or_start_commit(); + if (task) { + return 0; + } /* - * Release the bytes_chunk back to the filecache. - * These bytes_chunks are not needed anymore as the flush is done. - * - * Note: We come here for bytes_chunks which were found dirty by the - * above loop. These writes may or may not have been issued by - * us (if not issued by us it was because some other thread, - * mostly the writer issued the write so we found it flushing - * and hence didn't issue). In any case since we have an inuse - * count, release() called from write_callback() would not have - * released it, so we need to release it now. + * Wait for the ongoing commit to complete. + * If task is not valid we would have waited for all ongoing flushes, + * and no new flush can start because we hold the flush_lock, so when + * we come here we should have flushing going on. */ - filecache_handle->release(bc.offset, bc.length); + assert(!get_filecache()->is_flushing_in_progress()); + if (is_commit_pending()) { + AZLogInfo("[{}] Commit is pending, we need to start the commit", ino); + commit_membufs(); + } + wait_for_ongoing_commit(); } - AZLogDebug("[{}] wait_for_ongoing_flush() returning with error: {}", - ino, get_write_error()); + + /* + * As we have flush_lock held, no new flush can be started. + * We already waited for the ongoing flush and commit to complete. + * For stable write case there should not be any commit pending. + * So, following conditions should be true for both stable and + * unstable writes.: + */ + assert(!get_filecache()->is_flushing_in_progress()); + assert(!is_commit_in_progress()); + assert(get_filecache()->bytes_commit_pending == 0); + return get_write_error(); } /** - * Note: This takes shared lock on ilock_1. + * flush_cache_and_wait() is called only from the release/flush call. + * It's called with flush_lock() held. + * Things it does: + * - Before flushing it needs to wait for any ongoing commit to complete. 
+ * - First it try to get membufs through get_contiguous_dirty_bcs() or + * get_dirty_bc_range() based on the unstable write or stable write. + * - Issue flush for the dirty membufs and wait for the flush to complete. + * - If it's unstable write, it issue commit for commit pending membufs. + * - Wait for the commit to complete. + * - Returns the error code if any. + * + * Note: Flush_cache_and_wait() blocks the fuse thread till the flush completes. + * It's called from the release(), flush() and getattr() calls. It's ok + * as of now as it's not very often called. We can optimize to complete + * the flush in background and return immediately. For that we need to add + * special handling for the getattr() call. */ -int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off) +int nfs_inode::flush_cache_and_wait() { + assert(!is_stable_write() || + get_filecache()->bytes_commit_pending == 0); + assert(!is_stable_write() || !is_commit_in_progress()); + /* * MUST be called only for regular files. * Leave the assert to catch if fuse ever calls flush() on non-reg files. @@ -897,36 +1307,101 @@ int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off) } /* - * Grab the inode is_flushing lock to ensure that we doen't initiate - * any new flush operation while some truncate call is in progress - * (which must have taken the is_flushing lock). - * Once flush_lock() returns we have the is_flushing lock and we are + * Grab the flush_lock to ensure that we don't initiate any new flushes + * while some truncate call is in progress (which must have taken the + * flush_lock). Once flush_lock() returns we have the lock and we are * guaranteed that no new truncate operation can start till we release - * the is_flushing lock. We can safely start the flush then. + * the flush_lock. We can safely start the flush then. */ flush_lock(); + /* + * If flush is in progress, wait for it to complete. + * It may happen that last flush is still in progress and we need to wait + * for it to complete before we start the new flush. + * As we have the flush_lock() we are guaranteed that no new flush can be + * started till we release the flush_lock(). + * + * wait_for_ongoing_flush_commit() will wait for the ongoing flush/commt to + * complete. + * + * Note: For stable write case, we don't need to wait for the ongoing flush, + * as there is no commit required for stable write. + */ + if (!is_stable_write()) { + const int err = wait_for_ongoing_flush_commit(); + if (err != 0) { + flush_unlock(); + AZLogError("[{}] Failed to flush cache to stable storage, " + "error={}", ino, err); + return err; + } + } + + /* + * For unstable write, If no bytes to flush, then return. + * For stable write, If no bytes to flush and no flush in + * progress, then return. As for stable write, we didn't + * wait for the flush to complete as above. + * + * TODO: Need to check getattr() call. As that can cause this + * assert to fail. + */ + assert(!get_filecache()->is_flushing_in_progress() || + is_stable_write()); + assert(!is_commit_in_progress()); + assert(get_filecache()->bytes_commit_pending == 0); + + if (get_filecache()->get_bytes_to_flush() == 0) { + assert(get_filecache()->bytes_dirty == 0); + assert(get_filecache()->bytes_flushing == 0); + + AZLogDebug("[{}] Nothing to flush, returning", ino); + flush_unlock(); + return 0; + } + /* * Get the dirty bytes_chunk from the filecache handle. * This will grab an exclusive lock on the file cache and return the list - * of dirty bytes_chunks at that point. 
Note that we can have new dirty - * bytes_chunks created but we don't want to wait for those. + * of dirty bytes_chunks at that point. */ - std::vector bc_vec = - filecache_handle->get_dirty_bc_range(start_off, end_off); + std::vector bc_vec; + uint64_t size = 0; + if (!is_stable_write()) { + bc_vec = filecache_handle->get_contiguous_dirty_bcs(size); + + if (size != get_filecache()->get_bytes_to_flush()) { + AZLogInfo("[{}] Flushed size: {} != bytes to flush: {}", + ino, size, get_filecache()->get_bytes_to_flush()); + AZLogInfo("SWITCHING TO STABLE WRITE"); + + /* + * This is rare and no-op as we already completed the pending + * flush and commit. + */ + switch_to_stable_write(); + for (auto& bc : bc_vec) { + bc.get_membuf()->clear_inuse(); + } + bc_vec = filecache_handle->get_dirty_bc_range(); + } + } else { + bc_vec = filecache_handle->get_dirty_bc_range(); + } /* * sync_membufs() iterate over the bc_vec and starts flushing the dirty * membufs. It batches the contiguous dirty membufs and issues a single * write RPC for them. */ - sync_membufs(bc_vec, true); + sync_membufs(bc_vec, true /* is_flush */); /* * Our caller expects us to return only after the flush completes. * Wait for all the membufs to flush and get result back. */ - for (bytes_chunk &bc : bc_vec) { + for (bytes_chunk& bc : bc_vec) { struct membuf *mb = bc.get_membuf(); assert(mb != nullptr); @@ -964,20 +1439,43 @@ int nfs_inode::flush_cache_and_wait(uint64_t start_off, uint64_t end_off) filecache_handle->release(bc.offset, bc.length); } + /* + * Flush_cache_and_wait called from a release call only, + * we need to commit all the dirty membufs which are not yet committed. + */ + if (get_filecache()->get_bytes_to_commit() > 0) { + assert(!is_stable_write()); + assert(!is_commit_in_progress()); + assert(get_filecache()->get_bytes_to_flush() == 0 || + get_write_error() != 0); + + schedule_or_start_commit(); + } + + /* + * Wait for the ongoing commit to complete. + */ + wait_for_ongoing_commit(); flush_unlock(); /* - * If the file is deleted while we still have data in the cache, don't - * treat it as a failure to flush. The file has gone and we don't really - * care about the unwritten data. + * Cleanup on error. 
*/ - if (get_write_error() == ENOENT || get_write_error() == ESTALE) { - return 0; + if (get_write_error()) { + AZLogError("[{}] Failed to flush cache to stable storage, " + "error={}", ino, get_write_error()); } + assert(!is_commit_in_progress()); + assert(!get_filecache()->is_flushing_in_progress()); + assert(get_filecache()->bytes_dirty == 0); + assert(get_filecache()->bytes_commit_pending == 0); + assert(get_filecache()->bytes_flushing == 0); + return get_write_error(); } + void nfs_inode::flush_lock() const { AZLogDebug("[{}] flush_lock() called", ino); @@ -1002,7 +1500,7 @@ void nfs_inode::flush_lock() const return; } -void nfs_inode::flush_unlock() const +void nfs_inode::flush_unlock() { AZLogDebug("[{}] flush_unlock() called", ino); @@ -1011,7 +1509,9 @@ void nfs_inode::flush_unlock() const */ assert(has_filecache()); assert(is_flushing == true); - + if (!filecache_handle->is_flushing_in_progress() && is_commit_pending()) { + commit_membufs(); + } { std::unique_lock _lock(iflush_lock_3); is_flushing = false; @@ -1021,7 +1521,7 @@ void nfs_inode::flush_unlock() const flush_cv.notify_one(); } -void nfs_inode::truncate_end() const +void nfs_inode::truncate_end() { AZLogDebug("[{}] truncate_end() called", ino); @@ -1053,7 +1553,7 @@ bool nfs_inode::truncate_start(size_t size) */ flush_lock(); - wait_for_ongoing_flush(0, UINT64_MAX); + wait_for_ongoing_flush_commit(); AZLogDebug("[{}] Ongoing flush operations completed", ino); diff --git a/turbonfs/src/rpc_stats.cpp b/turbonfs/src/rpc_stats.cpp index ad26697a..c3fe7be4 100644 --- a/turbonfs/src/rpc_stats.cpp +++ b/turbonfs/src/rpc_stats.cpp @@ -303,6 +303,7 @@ do { \ DUMP_OP(FUSE_READDIRPLUS); DUMP_OP(FUSE_READ); DUMP_OP(FUSE_WRITE); + DUMP_OP(FUSE_FLUSH); /* * TODO: Add more ops. diff --git a/turbonfs/src/rpc_task.cpp b/turbonfs/src/rpc_task.cpp index 9f62bbbe..c458955d 100644 --- a/turbonfs/src/rpc_task.cpp +++ b/turbonfs/src/rpc_task.cpp @@ -806,6 +806,233 @@ void access_callback( } } +/** + * Helper function to complete all write tasks waiting for this membuf to be + * committed. + */ +static void complete_commit_waiter_tasks(struct membuf *mb) +{ + /* + * membuf must have finished committing. + * Obviously must not be dirty, as we will only ever commit a clean membuf + * (after it completes flushing). + */ + assert(!mb->is_commit_pending()); + assert(!mb->is_dirty()); + + std::vector tvec = mb->get_commit_waiters(); + for (auto& task : tvec) { + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + assert(task->rpc_api->write_task.get_size() > 0); + + AZLogDebug("Completing commit waiter task {} for [{}, {})", + fmt::ptr(task), + task->rpc_api->write_task.get_offset(), + task->rpc_api->write_task.get_offset() + + task->rpc_api->write_task.get_size()); + task->reply_write(task->rpc_api->write_task.get_size()); + } +} + +static void commit_callback( + struct rpc_context *rpc, + int rpc_status, + void *data, + void *private_data) +{ + rpc_task *task = (rpc_task*) private_data; + + /* + * Commit is issued as a FUSE_FLUSH. + * TODO: Maybe it should have a type of its own. + */ + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_FLUSH); + assert(task->rpc_api->pvt != nullptr); + // Commit is never called for a fuse request. 
+ assert(task->get_fuse_req() == nullptr); + + auto res = (COMMIT3res*) data; + + INJECT_JUKEBOX(res, task); + + const fuse_ino_t ino = task->rpc_api->flush_task.get_ino(); + struct nfs_inode *inode = + task->get_client()->get_nfs_inode_from_ino(ino); + // List of bcs committed by this commit call. + auto bc_vec_ptr = (std::vector *) task->rpc_api->pvt; + + // We should call commit only when inode is doing unstable+commit. + assert(!inode->is_stable_write()); + // Caller must be inprogress. + assert(inode->is_commit_in_progress()); + assert(!inode->get_filecache()->is_flushing_in_progress()); + + const int status = task->status(rpc_status, NFS_STATUS(res)); + UPDATE_INODE_WCC(inode, res->COMMIT3res_u.resok.file_wcc); + + AZLogDebug("[{}] commit_callback", ino); + + /* + * Now that the request has completed, we can query libnfs for the + * dispatch time. + */ + task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUS(res)); + + if (status == 0) { + uint64_t offset = 0; + uint64_t length = 0; + + /* + * Go over all the successfully committed bcs and release them from + * file cache (note that successful commit confirms that server has + * persisted the data and client can fee it). + * Also complete any tasks waiting for these membufs to be committed. + */ + for (auto &bc : *bc_vec_ptr) { + struct membuf *mb = bc.get_membuf(); + assert(mb->is_inuse()); + assert(mb->is_locked()); + assert(mb->is_commit_pending()); + assert(mb->is_uptodate()); + // Dirty membufs must not be committed. + assert(!mb->is_dirty()); + + mb->clear_commit_pending(); + complete_commit_waiter_tasks(mb); + mb->clear_locked(); + mb->clear_inuse(); + + /** + * Collect the contiguous range of bc's to release. + */ + if (offset == 0 && length == 0) { + offset = bc.offset; + length = bc.length; + } else if (offset + length == bc.offset) { + length += bc.length; + } else { + const uint64_t released = + inode->get_filecache()->release(offset, length); + AZLogDebug("[{}] commit_callback releasing bc [{}, {}), " + "released {} bytes", + ino, offset, offset+length, released); + offset = bc.offset; + length = bc.length; + } + } + + // Release the last bc not released by the above loop. + if (length != 0) { + const uint64_t released = + inode->get_filecache()->release(offset, length); + AZLogDebug("[{}] commit_callback releasing bc [{}, {}), " + "released {} bytes", + ino, offset, offset+length, released); + } + } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) { + task->get_client()->jukebox_retry(task); + return; + } else { + /* + * Commit has failed. + * Go over all the bcs that this commit was targetting, mark the + * membufs back as dirty and clear the commit_pending flag. + * Next write will initiate the flush again with stable write. + */ + for (auto &bc : *bc_vec_ptr) { + struct membuf *mb = bc.get_membuf(); + assert(mb->is_inuse()); + assert(mb->is_locked()); + assert(mb->is_commit_pending()); + assert(mb->is_uptodate()); + assert(!mb->is_dirty()); + + mb->clear_commit_pending(); + complete_commit_waiter_tasks(mb); + mb->set_dirty(); + mb->clear_locked(); + mb->clear_inuse(); + } + /* + * Set the inode to stable write, so that next write will initiate + * the flush again with stable write. + * There should be no flush in progress as this moment. + */ + assert(inode->get_filecache()->is_flushing_in_progress() == false); + inode->set_stable_write(); + } + + /* + * Clear commit inprogress for this inode. + * Only after any new flushes or commits can be sent for this file. 
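The success path of commit_callback() above walks the committed bcs in offset order and merges adjacent ones, so release() is issued once per contiguous run rather than once per chunk. A compilable model of that coalescing, using a hypothetical chunk type and a stubbed release():

#include <cstdint>
#include <cstdio>
#include <vector>

// Hypothetical chunk type; in the real code these are the bytes_chunk entries
// of the committed bc vector, already ordered by offset.
struct chunk { uint64_t offset; uint64_t length; };

// Stand-in for bytes_chunk_cache::release().
static void release(uint64_t off, uint64_t len) {
    std::printf("release [%llu, %llu)\n",
                (unsigned long long)off,
                (unsigned long long)(off + len));
}

static void release_coalesced(const std::vector<chunk>& chunks) {
    uint64_t off = 0, len = 0;
    for (const chunk& c : chunks) {
        if (len == 0) {                      // first run
            off = c.offset; len = c.length;
        } else if (off + len == c.offset) {  // extends the current run
            len += c.length;
        } else {                             // gap: release the run, start anew
            release(off, len);
            off = c.offset; len = c.length;
        }
    }
    if (len != 0)                            // release the final run
        release(off, len);
}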
+ */ + inode->clear_commit_in_progress(); + + delete bc_vec_ptr; + task->rpc_api->pvt = nullptr; + task->free_rpc_task(); +} + +void rpc_task::issue_commit_rpc() +{ + // Must only be called for a flush task. + assert(get_op_type() == FUSE_FLUSH); + // Commit is never called for a fuse request. + assert(get_fuse_req() == nullptr); + + const fuse_ino_t ino = rpc_api->flush_task.get_ino(); + struct nfs_inode *inode = get_client()->get_nfs_inode_from_ino(ino); + + // Commit must not be called if we are doing stable writes. + assert(!inode->is_stable_write()); + // Commit must mot be sent with ongoing flushes. + assert(!inode->get_filecache()->is_flushing_in_progress()); + // Caller must have marked commit inprogress. + assert(inode->is_commit_in_progress()); + + COMMIT3args args; + ::memset(&args, 0, sizeof(args)); + bool rpc_retry = false; + + AZLogDebug("[{}] issue_commit_rpc", ino); + + /* + * Get the bcs marked for commit_pending. + */ + assert(rpc_api->pvt == nullptr); + rpc_api->pvt = static_cast(new std::vector( + inode->get_filecache()->get_commit_pending_bcs())); + + args.file = inode->get_fh(); + args.offset = 0; + args.count = 0; + + do { + rpc_retry = false; + stats.on_rpc_issue(); + + if (rpc_nfs3_commit_task(get_rpc_ctx(), + commit_callback, &args, this) == NULL) { + stats.on_rpc_cancel(); + /* + * Most common reason for this is memory allocation failure, + * hence wait for some time before retrying. Also block the + * current thread as we really want to slow down things. + * + * TODO: For soft mount should we fail this? + */ + rpc_retry = true; + + AZLogWarn("rpc_nfs3_write_task failed to issue, retrying " + "after 5 secs!"); + ::sleep(5); + } + } while (rpc_retry); +} + /** * Must be called when bytes_completed bytes are successfully read/written. */ @@ -986,6 +1213,29 @@ void bc_iovec::on_io_fail(int status) task->reply_error(status); } + /* + * This membuf has completed flushing albeit with failure, check if any + * task is waiting for this membuf to be committed. Now this membuf + * failed to flush, so we don't issue commit for this membuf. Hence + * we must complete the commit waiter tasks. + */ + std::vector tvec_commit = mb->get_commit_waiters(); + for (auto& task : tvec_commit) { + assert(task->magic == RPC_TASK_MAGIC); + assert(task->get_op_type() == FUSE_WRITE); + assert(task->rpc_api->write_task.is_fe()); + + AZLogError("Completing commit waiter task {} for [{}, {}), " + "failed: {}", + fmt::ptr(task), + task->rpc_api->write_task.get_offset(), + task->rpc_api->write_task.get_offset() + + task->rpc_api->write_task.get_size(), + status); + + task->reply_error(status); + } + mb->clear_locked(); mb->clear_inuse(); iov++; @@ -1103,7 +1353,7 @@ static void write_iov_callback( bciov->orig_offset + bciov->orig_length); // Update bciov after the current write. - bciov->on_io_complete(res->WRITE3res_u.resok.count); + bciov->on_io_complete(res->WRITE3res_u.resok.count, !inode->is_stable_write()); // Create a new write_task for the remaining bc_iovec. struct rpc_task *write_task = @@ -1141,7 +1391,7 @@ static void write_iov_callback( return; } else { // Complete bc_iovec IO completed. - bciov->on_io_complete(res->WRITE3res_u.resok.count); + bciov->on_io_complete(res->WRITE3res_u.resok.count, !inode->is_stable_write()); // Complete data writen to blob. AZLogDebug("[{}] <{}> Completed write, off: {}, len: {}", @@ -1173,8 +1423,21 @@ static void write_iov_callback( bciov->on_io_fail(status); } - delete bciov; - task->rpc_api->pvt = nullptr; + /* + * Check if commit is pending. 
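Elsewhere in this hunk the flush completion path now passes !inode->is_stable_write() to bc_iovec::on_io_complete(), which presumably decides whether a fully flushed membuf becomes commit_pending or is simply done. A tiny model of that transition, with hypothetical state names (the real state is a set of flags on struct membuf):

enum class mb_state { dirty, flushing, commit_pending, clean };

// An UNSTABLE write is not durable until a later COMMIT succeeds, so the
// membuf has to wait as commit_pending; a FILE_SYNC write is durable as soon
// as the WRITE RPC succeeds, so the membuf is clean right away.
static mb_state after_full_flush(bool written_unstable) {
    return written_unstable ? mb_state::commit_pending : mb_state::clean;
}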
+ * Any thread that decides to commit and finds that there's an ongoing + * flush, will mark the inode as 'NEEDS_COMMIT' and when the flush + * completes, we need to start the commit. + */ + if (!inode->get_filecache()->is_flushing_in_progress()) { + if(inode->flushtry_lock()) { + if (!inode->get_filecache()->is_flushing_in_progress() && + inode->is_commit_pending()) { + inode->commit_membufs(); + } + inode->flush_unlock(); + } + } /* * If this write_rpc was issued as part of a parent write task, then @@ -1200,6 +1463,9 @@ static void write_iov_callback( } } + delete bciov; + task->rpc_api->pvt = nullptr; + // Release the task. task->free_rpc_task(); } @@ -1258,17 +1524,26 @@ void rpc_task::issue_write_rpc() args.file = inode->get_fh(); args.offset = offset; args.count = length; - args.stable = FILE_SYNC; + args.stable = inode->is_stable_write() ? FILE_SYNC : UNSTABLE; + + /* + * Unstable writes want to make use of multiple connections for better perf. + * Note that they aren't affected by the optimistic concurrency backoff + * issues as seen by stable writes. + */ + if (!inode->is_stable_write()) { + set_csched(CONN_SCHED_RR); + } do { rpc_retry = false; stats.on_rpc_issue(); if (rpc_nfs3_writev_task(get_rpc_ctx(), - write_iov_callback, &args, - bciov->iov, - bciov->iovcnt, - this) == NULL) { + write_iov_callback, &args, + bciov->iov, + bciov->iovcnt, + this) == NULL) { stats.on_rpc_cancel(); /* * Most common reason for this is memory allocation failure, @@ -2308,6 +2583,150 @@ void rpc_task::run_access() } while (rpc_retry); } +static void perform_inline_writes(struct rpc_task *task, + struct nfs_inode *inode) +{ + AZLogInfo("[{}] Performing inline write", inode->get_fuse_ino()); + + const size_t length = task->rpc_api->write_task.get_size(); + std::vector bc_vec; + + /* + * Grab flush_lock to get exclusive list of dirty chunks, which are not + * already being flushed. This also protects us racing with a truncate + * call and growing the file size after truncate shrinks the file. + */ + inode->flush_lock(); + + if (!inode->get_filecache()->do_inline_write()) { + inode->flush_unlock(); + task->reply_write(length); + return; + } + + /* + * flush_now is the number of bytes that need to be flushed to + * make space for the new writes. If it's 0, then we don't need to + * flush more data we can just wait for the ongoing flush/commit + * to complete. + */ + const uint64_t flush_now = + inode->get_filecache()->get_inline_flush_bytes(); + + if ((flush_now == 0) && !inode->is_stable_write()) { + AZLogInfo("No new bytes to flush, Wait for ongoing flush/commit " + "to complete"); + + /* + * get_inline_flush_bytes() tells us that we have no new data to flush + * inline, but since we are in memory pressure we need to slow down the + * writers by waiting till all the ongoing flush+commit complete. + * wait_for_ongoing_flush_commit() will complete task after all the + * ongoing flush and subsequent commit completes. + */ + inode->wait_for_ongoing_flush_commit(task); + inode->flush_unlock(); + return; + } else { + uint64_t size = 0; + if (!inode->is_stable_write()) { + assert(flush_now > 0); + /* + * We need to wait for the ongoing flush/commit to complete, as + * for unstable write case we only issue one active flush at a + * time. + * This is only for the case where we have to prune some data + * to make space for the new writes. Usually this will not be + * happen so often, it's ok to wait here. + * + * TODO: See if we can eliminate this wait as it blocks the fuse + * thread. 
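The hand-off above in write_iov_callback() lets the last completing flush kick off a deferred commit. A standalone sketch of that pattern with hypothetical names: atomics stand in for the cache's flushing accounting, and flushtry_lock() is modeled as a plain try_lock.

#include <atomic>
#include <mutex>

struct inode_model {
    std::atomic<int>  flushes_in_flight{0};
    std::atomic<bool> commit_pending{false};
    std::mutex        flush_mtx;

    void start_commit() { /* issue the COMMIT RPC here */ }

    // Called from the flush (WRITE) completion callback.
    void on_flush_complete() {
        flushes_in_flight.fetch_sub(1);
        if (flushes_in_flight.load() == 0 && commit_pending.load()) {
            // try_lock: if another thread holds the flush lock it will run
            // this same check when it drops the lock, so it's safe to skip.
            if (flush_mtx.try_lock()) {
                if (flushes_in_flight.load() == 0 &&
                    commit_pending.exchange(false)) {
                    start_commit();
                }
                flush_mtx.unlock();
            }
        }
    }
};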
+ */ + AZLogWarn("Wait for ongoing flush/commit to complete!!"); + inode->wait_for_ongoing_flush_commit(); + + /* + * We already waited for flush and commit to complete and we have + * the flush_lock held, so no new flush and commit can start. + */ + assert(!inode->is_commit_in_progress()); + assert(!inode->get_filecache()->is_flushing_in_progress()); + + /* + * If the write is not stable, we need to copy the application + * data into the membufs and issue the writes to the backend. + * If we don't find flush_now contiguous bytes, then we switch + * to stable write. + */ + bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(size); + if (size >= flush_now) { + assert(!bc_vec.empty()); + /* + * Perform inline sync. + * In case of unstable writes, we need to complete the write + * on commit complete. So, we don't pass the 3rd arg to + * sync_membufs(), but add the task to the commit_waiters list. + */ + inode->sync_membufs(bc_vec, false /* is_flush */, nullptr); + inode->schedule_or_start_commit(); + + inode->flush_unlock(); + + inode->add_task_to_commit_waiters(task); + return; + } + } + + if (!inode->is_stable_write()) { + AZLogWarn("Not enough contiguous dirty data to flush, may be " + "pattern is not sequential. flush_now={}, size={}", + flush_now, size); + + inode->switch_to_stable_write(); + } + + /* + * If the write is stable, we can directly write to the + * membufs and issue the writes to the backend. + */ + assert(inode->is_stable_write()); + bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range(); + if (!bc_vec.empty()) { + /* + * Perform inline sync. + * Since we pass the 3rd arg to sync_membufs, it tells sync_membufs() + * to call the fuse callback after all the issued backend writes + * complete. This will be done asynchronously while the sync_membufs() + * call will return after issuing the writes. + */ + inode->sync_membufs(bc_vec, false /* is_flush */, task); + inode->flush_unlock(); + + // Free up the fuse thread without completing the application write. + return; + } + + /* + * Another application write raced with us and it got to do the + * inline write before us. Try adding this task to the flush_waiters + * list for the membuf where we copied the application data. If + * we are able to add successfully, then we will call the fuse + * callback when the flush completes at the backend. This will + * slow down the writer application, thus relieving the memory + * pressure. + * If add_flush_waiter() returns false, that means this membuf + * is already flushed by that other thread and we can complete + * the fuse callback in this context. + */ + inode->flush_unlock(); + + inode->add_task_to_flush_waiters(task); + return; + } + + assert(false); +} + void rpc_task::run_write() { // This must be called only for front end tasks. @@ -2318,7 +2737,10 @@ void rpc_task::run_write() const size_t length = rpc_api->write_task.get_size(); struct fuse_bufvec *const bufv = rpc_api->write_task.get_buffer_vector(); const off_t offset = rpc_api->write_task.get_offset(); - const bool sparse_write = (offset > inode->get_file_size(true)); + /* + * TODO: Fix sparse writes. + */ + const bool sparse_write = false; uint64_t extent_left = 0; uint64_t extent_right = 0; @@ -2409,34 +2831,36 @@ void rpc_task::run_write() /* * We have successfully copied the user data into the cache. - * We can have the following cases: - * 1. This new write caused a contiguous extent to be greater than - * max_dirty_extent_bytes(), aka MDEB. 
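perform_inline_writes() above boils down to a three-way decision for an unstable-write inode under memory pressure. A hypothetical condensed helper (not in the tree) that captures the same branching on flush_now versus the contiguous dirty bytes returned by get_contiguous_dirty_bcs():

#include <cstdint>

enum class inline_action {
    wait_for_ongoing,   // nothing new to flush: wait for the ongoing flush+commit
    flush_then_commit,  // enough contiguous dirty data: flush it, commit it, and
                        // reply only when the commit completes
    switch_to_stable    // dirty data too fragmented: fall back to stable writes
};

static inline_action choose_inline_action(uint64_t flush_now,
                                           uint64_t contiguous_dirty_bytes)
{
    if (flush_now == 0)
        return inline_action::wait_for_ongoing;
    if (contiguous_dirty_bytes >= flush_now)
        return inline_action::flush_then_commit;
    return inline_action::switch_to_stable;
}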
+ * We can have the following cases (in decreasing order of criticality): + * 1. Cache has dirty+uncommitted data beyond the "inline write threshold", + * ref do_inline_write(). + * In this case we begin flush of the dirty data and/or commit of the + * uncommitted data. + * This is a memory pressure situation and hence we do not complete the + * application write till all the above backend writes complete. + * This will happen when application is writing faster than our backend + * write throughput, eventually dirty data will grow beyond the "inline + * write threshold" and then we have to slow down the writers by delaying + * completion. + * 2. Cache has uncommitted data beyond the "commit threshold", ref + * commit_required(). + * In this case we free up space in the cache by committing data. + * We just initiate a commit while the current write request is + * completed. Note that we want to delay commits so that we can reduce + * the commit calls (commit more data per call) as commit calls sort + * of serialize the writes as we cannot send any other write to the + * server while commit is on. + * 3. Cache has enough dirty data that we can flush. + * For sequential writes, this means the new write caused a contiguous + * extent to be greater than max_dirty_extent_bytes(), aka MDEB, while + * for non-seq writes it would mean that the total dirty data grew beyond + * MDEB. * In this case we begin flush of this contiguous extent (in multiple * parallel wsize sized blocks) since there's no benefit in waiting more * as the data is sufficient for the server scheduler to effectively * write, in optimal sized blocks. * We complete the application write rightaway without waiting for the * flush to complete as we are not under memory pressure. - * This will happen when user is sequentially writing to the file. - * 2. A single contiguous extent is not greater than MDEB but total dirty - * bytes waiting to be flushed is more than MDEB. - * In this case we begin flush of the entire dirty data from the cache - * as we have sufficient data for the server scheduler to perform - * batched writes effectively. - * We complete the application write rightaway without waiting for the - * flush to complete as we are not under memory pressure. - * This will happen when user is writing to the file in a random or - * pseudo-sequential fashion. - * 3. Cache has dirty data beyond the "inline write" threshold, ref - * do_inline_write(). - * In this case we begin flush of the entire dirty data. - * This is a memory pressure situation and hence we do not complete the - * application write till all the backend writes complete. - * This will happen when user is writing faster than our backend write - * throughput, eventually dirty data will grow beyond the "inline write" - * threshold and then we have to slow down the writers by delaying - * completion. * * Other than this we have a special case of "write beyond eof" (termed * sparse write). In sparse write case also we perform "inline write" of @@ -2450,163 +2874,187 @@ void rpc_task::run_write() */ /* - * Do we need to perform "inline write"? - * Inline write implies, we flush all the dirty data and wait for all the - * corresponding backend writes to complete. + * If the extent size exceeds the max allowed dirty size as returned by + * max_dirty_extent_bytes(), then it's time to flush the extent. 
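For case 3 the trigger is simply "the single extent is big enough" or "the total un-flushed dirty data is big enough" (sequential versus random writers). A one-function illustration with hypothetical parameter names, mirroring the need_flush computation that follows:

#include <cstdint>

// total_dirty_exceeded stands in for flush_required() on the file cache.
static bool should_flush(uint64_t extent_left, uint64_t extent_right,
                         uint64_t max_dirty_extent, bool total_dirty_exceeded)
{
    return (extent_right - extent_left) >= max_dirty_extent ||
           total_dirty_exceeded;
}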
+ * Note that this will cause sequential writes to be flushed at just the + * right intervals to optimize fewer write calls and also allowing the + * server scheduler to merge better. + * See bytes_to_flush for how random writes are flushed. + * + * Note: max_dirty_extent is static as it doesn't change after it's + * queried for the first time. + */ + static const uint64_t max_dirty_extent = + inode->get_filecache()->max_dirty_extent_bytes(); + assert(max_dirty_extent > 0); + + /* + * Check what kind of limit we have hit. */ const bool need_inline_write = (sparse_write || inode->get_filecache()->do_inline_write()); + const bool need_commit = + inode->get_filecache()->commit_required(); + const bool need_flush = + ((extent_right - extent_left) >= max_dirty_extent) || + inode->get_filecache()->flush_required(); - if (need_inline_write) { - INC_GBL_STATS(inline_writes, 1); + AZLogDebug("[{}] handle write (sparse={}, need_inline_write={}, " + "need_commit={}, need_flush={}, extent=[{}, {}))", + inode->get_fuse_ino(), sparse_write, need_inline_write, + need_commit, need_flush, extent_left, extent_right); - AZLogDebug("[{}] Inline write (sparse={}), {} bytes, extent @ [{}, {})", - ino, sparse_write, (extent_right - extent_left), - extent_left, extent_right); + /* + * Nothing to do, we can complete the application write rightaway. + * This should be the happy path! + */ + if (!need_inline_write && !need_commit && !need_flush) { + reply_write(length); + return; + } + if (need_flush && ((extent_right - extent_left) < max_dirty_extent)) { /* - * Grab flush_lock to get exclusive list of dirty chunks, which are not - * already being flushed. This also protects us racing with a truncate - * call and growing the file size after truncate shrinks the file. + * This is the case of non-sequential writes causing enough dirty + * data to be accumulated, need to flush all of that. */ - inode->flush_lock(); - - std::vector bc_vec = - inode->get_filecache()->get_dirty_nonflushing_bcs_range(); - - if (bc_vec.empty()) { - /* - * Another application write raced with us and it got to do the - * inline write before us. Try adding this task to the flush_waiters - * list for the membuf where we copied the application data. If - * we are able to add successfully, then we will call the fuse - * callback when the flush completes at the backend. This will - * slow down the writer application, thus relieving the memory - * pressure. - * If add_flush_waiter() returns false, that means this membuf - * is already flushed by that other thread and we can complete - * the fuse callback in this context. - */ - inode->flush_unlock(); + extent_left = 0; + extent_right = UINT64_MAX; + } - if (inode->get_filecache()->add_flush_waiter(offset, - length, - this)) { - AZLogDebug("[{}] Inline write, membuf not flushed, write " - "[{}, {}) will be completed when membuf is flushed", - ino, offset, offset+length); - return; - } else { - AZLogDebug("[{}] Inline write, membuf already flushed, " - "completing fuse write [{}, {})", - ino, offset, offset+length); - reply_write(length); - return; - } - } - /* - * Perform inline sync. - * Since we pass the 3rd arg to sync_membufs, it tells sync_membufs() - * to call the fuse callback after all the issued backend writes - * complete. This will be done asynchronously while the sync_membufs() - * call will return after issuing the writes. - * - * Note: sync_membufs() can free this rpc_task if all issued backend - * writes complete before sync_membufs() can return. 
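Both the inline-write path and case 3 below defer the fuse reply by registering the task as a flush/commit waiter, and the helper's boolean return tells the caller whether it must reply itself. A minimal sketch of that contract, with a hypothetical task_model type standing in for rpc_task:

#include <cstddef>

struct task_model {
    size_t len;
    void reply_write(size_t n) { (void)n; /* send the fuse reply here */ }
};

// added_as_waiter is the return value of add_flush_waiter()/add_commit_waiter().
static void complete_now_or_later(bool added_as_waiter, task_model *t) {
    if (!added_as_waiter) {
        // Membuf already flushed/committed by another thread: nobody will
        // call us back, so complete the application write right here.
        t->reply_write(t->len);
    }
    // else: the flush/commit completion callback replies for us later.
}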
- * DO NOT access rpc_task after sync_membufs() call. - */ - inode->sync_membufs(bc_vec, false /* is_flush */, this); - inode->flush_unlock(); + // Case 1: Inline writes + /* + * Do we need to perform "inline write"? + * Inline write implies, we flush dirty data and commit uncommitted data + * and wait for all the corresponding backend writes to complete. + */ + if (need_inline_write) { + INC_GBL_STATS(inline_writes, 1); // Free up the fuse thread without completing the application write. + perform_inline_writes(this, inode); return; } + // Case 2: Commit /* - * Ok, we don't need to do inline writes. See if we have enough dirty - * data and we need to start async flush. + * We don't need to do inline writes. See if we need to commit the + * uncommitted data to the backend. We just need to stat the commit + * and not hold the current write task till the commit completes. */ + if (need_commit) { + inode->flush_lock(); - /* - * If the extent size exceeds the max allowed dirty size as returned by - * max_dirty_extent_bytes(), then it's time to flush the extent. - * Note that this will cause sequential writes to be flushed at just the - * right intervals to optimize fewer write calls and also allowing the - * server scheduler to merge better. - * See bytes_to_flush for how random writes are flushed. - * - * Note: max_dirty_extent is static as it doesn't change after it's - * queried for the first time. - */ - static const uint64_t max_dirty_extent = - inode->get_filecache()->max_dirty_extent_bytes(); - assert(max_dirty_extent > 0); + /* + * Start commit if no flushing is in progress else mark commit + * pending, so that last flush can then start the commit. + */ + inode->schedule_or_start_commit(); - /* - * How many bytes in the cache need to be flushed. These are dirty chunks - * which have not started flushing yet. - */ - const uint64_t bytes_to_flush = - inode->get_filecache()->get_bytes_to_flush(); + /* + * Note: Commit request sent by the above call can potentially + * complete before we reach here. In that case the following + * assert will fail. It's very unlikely, so leave the assert + * as it's useful. + */ + assert(inode->is_commit_in_progress()); - AZLogDebug("[{}] extent_left: {}, extent_right: {}, size: {}, " - "bytes_to_flush: {} (max_dirty_extent: {})", - ino, extent_left, extent_right, - (extent_right - extent_left), - bytes_to_flush, - max_dirty_extent); + inode->flush_unlock(); + assert(!need_inline_write); + } - if ((extent_right - extent_left) < max_dirty_extent) { + // Case 3: Flush + if (need_flush) { /* - * Current extent is not large enough to be flushed, see if we have - * enough total dirty data that needs to be flushed. This is to cause - * random writes to be periodically flushed. + * We need to flush the dirty data in the range [extent_left, extent_right), + * get the membufs for the dirty data. + * We don't want to run over an inprogress truncate and resetting the file + * size set by truncate, so grab the is_flushing lock. */ - if (bytes_to_flush < max_dirty_extent) { + inode->flush_lock(); + + /* + * If flush is required but flush/commit already in progress, then + * add this task to the respective waiters list and return. + * As we already have 1GB worth of dirty data in the cache, we don't + * want to add more data to the cache. So we wait for the ongoing + * flush/commit. + * + * XXX What if is_commit_in_progress() returns false but some other + * thread starts commit right after that? 
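schedule_or_start_commit(), used in case 2 above, is expected (per the surrounding comments) to either start the commit immediately or just mark it pending when flushes are still in flight, leaving the last flush completion to start it. A standalone model with hypothetical names and stubbed I/O:

#include <atomic>

struct commit_scheduler {
    std::atomic<bool> commit_pending{false};
    std::atomic<bool> commit_in_progress{false};

    // Stand-in for is_flushing_in_progress() on the file cache.
    bool flushing_in_progress() const { return false; }

    // Stand-in for issuing the COMMIT RPC (issue_commit_rpc in the real code).
    void issue_commit() {}

    void schedule_or_start_commit() {
        if (flushing_in_progress()) {
            // Deferred: the last completing flush checks commit_pending and
            // starts the commit (see the write_iov_callback hunk above).
            commit_pending = true;
        } else if (!commit_in_progress.exchange(true)) {
            issue_commit();
        }
    }
};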
+ * Note that we hold flush_lock which only protects us against some + * other thread starting a flush but commit is protected by + * commit_lock_5. + */ + if (inode->is_commit_in_progress()) { + inode->flush_unlock(); /* - * No memory pressure. + * If commit is in progress, we need to wait for the commit + * to complete before we can complete the write. */ - AZLogDebug("Reply write without syncing to Blob"); - reply_write(length); + inode->add_task_to_commit_waiters(this); + return; + } + + if (!inode->is_stable_write() && + inode->get_filecache()->is_flushing_in_progress()) { + inode->flush_unlock(); + /* + * If flush is in progress, we need to wait for the flush to + * complete before we can proceed with the write. + * Note that for unstable write we don't start a new flush if + * there's an ongoing one, but for stable it's fine to start + * another flush. + */ + inode->add_task_to_flush_waiters(this); return; } /* - * This is the case of non-sequential writes causing enough dirty - * data to be accumulated, need to flush all of that. + * If we are here, then we need to flush the dirty data to the backend. + * We don't need to wait for the flush to complete, we can complete the + * application write rightaway. */ - extent_left = 0; - extent_right = UINT64_MAX; - } + uint64_t size = 0; + std::vector bc_vec; - /* - * We need to flush the dirty data in the range [extent_left, extent_right), - * get the membufs for the dirty data. - * We don't want to run over an inprogress truncate and resetting the file - * size set by truncate, so grab the is_flushing lock. - */ - inode->flush_lock(); + /* + * If the inode is stable write, then get the dirty membufs in the range. + * otherwise get the dirty membufs contigious range. + */ + if (inode->is_stable_write()) { + bc_vec = + inode->get_filecache()->get_dirty_nonflushing_bcs_range(extent_left, + extent_right); + } else { + bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(size); + } - std::vector bc_vec = - inode->get_filecache()->get_dirty_nonflushing_bcs_range(extent_left, - extent_right); + if (bc_vec.size() == 0) { + inode->flush_unlock(); + reply_write(length); + return; + } - if (bc_vec.size() == 0) { + /* + * Pass is_flush as false, since we don't want the writes to complete + * before returning. + */ + inode->sync_membufs(bc_vec, false /* is_flush */); inode->flush_unlock(); + + // Send reply to original request without waiting for the backend write to complete. reply_write(length); return; } /* - * Pass is_flush as false, since we don't want the writes to complete - * before returning. + * If we reached true, then we have need_commit true, but need_flush false. */ - inode->sync_membufs(bc_vec, false /* is_flush */); - inode->flush_unlock(); - - // Send reply to original request without waiting for the backend write to complete. + assert(need_commit && !need_flush); reply_write(length); + return; } void rpc_task::run_flush()
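Taken together, this change gives an unstable-write membuf the following lifecycle. A small runnable walk-through with hypothetical state names (the real state is carried by the dirty/flushing/commit_pending flags on struct membuf):

#include <cassert>

enum class mb_state { dirty, flushing, commit_pending, released, redirtied };

static mb_state flush_issued(mb_state s) {
    assert(s == mb_state::dirty);
    return mb_state::flushing;              // UNSTABLE WRITE RPC in flight
}

static mb_state flush_completed(mb_state s) {
    assert(s == mb_state::flushing);
    return mb_state::commit_pending;        // awaiting COMMIT
}

static mb_state commit_completed(mb_state s, bool ok) {
    assert(s == mb_state::commit_pending);
    // Success: data is durable and the chunk can be released from the cache.
    // Failure: the membuf is marked dirty again and the inode switches to
    // stable writes (see the commit_callback error path above).
    return ok ? mb_state::released : mb_state::redirtied;
}

int main() {
    mb_state s = mb_state::dirty;
    s = flush_issued(s);
    s = flush_completed(s);
    s = commit_completed(s, /*ok=*/true);
    assert(s == mb_state::released);
    return 0;
}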