91 changes: 68 additions & 23 deletions turbonfs/inc/fcsm.h
@@ -69,54 +69,55 @@ class fcsm
struct nfs_inode *_inode);

/**
* Ensure 'flush_bytes' additional bytes are flushed from the cache, above
* and beyond what's already flushed or flushing now. flush_bytes==0 implies
* flush all dirty bytes.
* Ensure all or some dirty bytes are flushed or scheduled for flushing (if
* a flush or commit is already ongoing).
* If the state machine is currently not running, it'll kick off the state
* machine by calling sync_membufs(), else it'll add a new flush target to
* ftgtq, which will be run by on_flush_complete() when the ongoing flush
* completes. If task is non-null it is the frontend write task which
* initiated this flush and in that case a blocking target will be added
* which means the specified task will block till the requested target
* completes, else a non-blocking target will be added which just requests
* the specific amount of bytes to be flushed w/o having the application
* wait.
* dirty bytes to be flushed w/o having the application wait.
*
* ensure_flush() provides the following guarantees:
* - Additional flush_bytes bytes will be flushed. This is beyond what's
* already flushed or scheduled for flush.
* - It'll flush all or part of cached dirty bytes starting from the lowest
* offset. The actual number of bytes flushed is decided based on various
* config settings and the memory pressure.
* - On completion of that flush, task will be completed.
* If after grabbing flush_lock it figures out that the requested flush
* target is already met, it completes the task right away.
*
* write_off and write_len describe the current application write call.
* write_len is needed for completing the write right away for non-blocking
* cases, where 'task' is null.
* They are needed for logging when task is nullptr.
*
* LOCKS: flush_lock.
* Caller MUST hold the flush_lock.
*/
void ensure_flush(uint64_t flush_bytes,
uint64_t write_off,
void ensure_flush(uint64_t write_off,
uint64_t write_len,
struct rpc_task *task = nullptr);
struct rpc_task *task = nullptr,
std::atomic<bool> *conditional_variable = nullptr);

/**
* Ensure 'commit_bytes' additional bytes are committed from the cache,
* above and beyond what's already committed or committing now.
* Ensure all or some commit-pending bytes are committed or scheduled for
* commit (if a flush or commit is already ongoing).
* If 'task' is null it'll add a non-blocking commit target to ctgtq, else
* it'll add a blocking commit target for completing task when given commit
* goal is met.
*
* Caller MUST hold the flush_lock.
*/
void ensure_commit(uint64_t commit_bytes,
struct rpc_task *task = nullptr);
void ensure_commit(uint64_t write_off,
uint64_t write_len,
struct rpc_task *task = nullptr,
std::atomic<bool> *conditional_variable = nullptr,
bool commit_full = false);

/**
* Callbacks to be called when flush/commit successfully complete.
* These will update flushed_seq_num/committed_seq_num and run flush/commit
* targets from ftgtq/ctgtq as appropriate.
*/
void on_flush_complete(uint64_t flush_bytes);
void on_commit_complete();
void on_commit_complete(uint64_t commit_bytes);

/**
* Is the state machine currently running, i.e. it has sent (one or more)
@@ -148,9 +149,27 @@ class fcsm
void mark_running();
void clear_running();

/**
* Nudge the flush-commit state machine.
* After the fuse thread copies the application data into the cache, it
* must call this to let FCSM know that some more dirty data has been
* added. It checks the dirty data against the configured limits and
* decides which of the following actions to take:
* - Do nothing, as dirty data is still within limits.
* - Start flushing.
* - Start committing.
* - Flush/commit while blocking the task till flush/commit completes.
*/
void run(struct rpc_task *task,
uint64_t extent_left,
uint64_t extent_right);

/**
* Call when more writes are dispatched, or prepared to be dispatched.
* This MUST be called before the write callback can be called.
* These must correspond to membufs which are dirty and not already
* flushing.
* This MUST be called before the write_iov_callback() can be called, i.e.,
* before the actual write call is issued.
*/
void add_flushing(uint64_t bytes);

@@ -180,6 +199,22 @@
return (fc_cb_count() > 0);
}

/**
* Call when more commits are dispatched, or prepared to be dispatched.
* These must correspond to membufs which are already flushed, i.e., they
* must not be dirty or flushing and must be commit pending.
* This MUST be called before the commit_callback can be called, i.e.,
* before the actual commit call is issued.
*/
void add_committing(uint64_t bytes);

/**
* ctgtq_cleanup() is called when we switch to stable writes.
* It clears up all the queued commit targets, as stable writes will not
* cause a commit and those targets would otherwise never complete.
*/
void ctgtq_cleanup();

private:
/*
* The singleton nfs_client, for convenience.
@@ -216,7 +251,8 @@
fctgt(struct fcsm *fcsm,
uint64_t _flush_seq,
uint64_t _commit_seq,
struct rpc_task *_task = nullptr);
struct rpc_task *_task = nullptr,
std::atomic<bool> *conditional_variable = nullptr);

/*
* Flush and commit targets (in terms of flushed_seq_num/committed_seq_num)
@@ -235,6 +271,12 @@
* Pointer to the containing fcsm.
*/
struct fcsm *const fcsm = nullptr;

/*
* If non-null, its initial value is false.
* The caller that enqueues the target waits for it to become true.
*/
std::atomic<bool> *conditional_variable = nullptr;
#if 0
/*
* Has the required flush/commit task started?
@@ -262,7 +304,10 @@
/*
* Continually increasing seq number of the last byte successfully flushed
* and committed. Flush-commit targets (fctgt) are expressed in terms of
* these.
* these. These are actually numerically one more than the last
* flushing/committing and flushed/committed byte's seq number, e.g., if the
* last byte flushed was byte #1 (0 and 1 are flushed), then flushed_seq_num
* would be 2.
* Note that these are *not* offsets in the file but these are cumulative
* values for total bytes flushed/committed till now since the file was
* opened. In case of overwrite same byte(s) may be repeatedly flushed
15 changes: 10 additions & 5 deletions turbonfs/inc/file_cache.h
@@ -694,6 +694,11 @@ struct bytes_chunk
*/
bool is_new = false;

/**
* Get the inode corresponding to this bc.
*/
struct nfs_inode *get_inode() const;

/**
* Return membuf corresponding to this bytes_chunk.
* This will be used by caller to synchronize operations on the membuf.
@@ -1200,8 +1205,8 @@ class bytes_chunk_cache
* check for that after holding the membuf lock, before it tries to
* flush those membuf(s).
*/
std::vector<bytes_chunk> get_dirty_bc_range(uint64_t st_off,
uint64_t end_off) const;
std::vector<bytes_chunk> get_dirty_bc_range(
uint64_t st_off = 0, uint64_t end_off = UINT64_MAX) const;

/*
* Returns dirty chunks which are not already flushing, in the given range,
@@ -1239,8 +1244,8 @@
* check for that after holding the membuf lock, before it tries to
* commit those membuf(s).
*/
std::vector<bytes_chunk> get_flushing_bc_range(uint64_t st_off,
uint64_t end_off) const;
std::vector<bytes_chunk> get_flushing_bc_range(
uint64_t st_off = 0, uint64_t end_off = UINT64_MAX) const;

/**
* Returns contiguous dirty (and not flushing) chunks from chunmap, starting
@@ -1526,7 +1531,7 @@ class bytes_chunk_cache
* e.g.,
* if the max_dirty_extent_bytes() is 1GB, then we have
* flush_required() @ 1GB
* commit_required() @ 1GB
* commit_required() @ 2GB
* do_inline_write() @ 4GB.
*
* Assuming backend flush speed of 1GB/s and memory write speed of
66 changes: 60 additions & 6 deletions turbonfs/inc/nfs_inode.h
@@ -289,6 +289,29 @@ struct nfs_inode
*/
struct stat attr;

/**
* We maintain following multiple views of the file and thus multiple file
* sizes for those views.
* - Cached.
* This is the view of the file that comprises the data written by the
* application and saved in the file cache. It may or may not have been
* flushed and/or committed. This is the most up-to-date view of the file
* and applications must use this view.
* cached_filesize tracks the file size for this view.
* - Uncommitted.
* This is the view of the file that tracks data that has been flushed
* using UNSTABLE writes but not yet COMMITted to the Blob. This view of
* the file is only used to see if the next PB call will write after the
* last PB'ed byte and thus can be appended.
* putblock_filesize tracks the file size for this view.
* - Committed.
* This is the view of the file that tracks data committed to the Blob.
* Other clients will see this view.
* attr.st_size tracks the file size for this view.
*/
off_t cached_filesize = 0;
mutable off_t putblock_filesize = 0;

/*
* Has this inode seen any non-append write?
* This starts as false and remains false as long as copy_to_cache() only
@@ -310,7 +333,7 @@
* Note: As of now, we are not using this flag as commit changes not yet
* integrated, so we are setting this flag to true.
*/
bool stable_write = true;
bool stable_write = false;

public:
/*
@@ -1400,21 +1423,52 @@
* initiate any new flush operations while some truncate call is in
* progress (which must have held the flush_lock).
*/
int flush_cache_and_wait(uint64_t start_off = 0,
uint64_t end_off = UINT64_MAX);
int flush_cache_and_wait();

/**
* Wait for currently flushing membufs to complete.
* Wait for currently flushing/committing membufs to complete.
* It will wait till the currently flushing membufs complete and then
* issue a commit and wait for that. If no flush is ongoing but there's
* commit_pending data, it'll commit that and return after the commit
* completes.
* Returns 0 on success and a positive errno value on error.
* Once it returns, commit_pending will be 0.
*
* Note: Caller must hold the inode flush_lock to ensure that
* no new membufs are added till this call completes.
* It may release the flush_lock() if it has to wait for ongoing
* flush/write requests to complete, but it'll exit with flush_lock
* held.
*/
int wait_for_ongoing_flush(uint64_t start_off = 0,
uint64_t end_off = UINT64_MAX);
int wait_for_ongoing_flush();

/**
* commit_membufs() is called to commit uncommitted membufs to the Blob.
* It creates a commit RPC and sends it to the NFS server.
*/
void commit_membufs(std::vector<bytes_chunk> &bcs);

/**
* switch_to_stable_write() is called to switch the inode to stable write
* mode. It waits for all ongoing flushes and the subsequent commit to
* complete. If not already scheduled, it'll perform an explicit commit
* after the flush completes.
* After that it'll mark the inode for stable write and return. From then on
* any writes to this inode will be sent as stable writes.
*/
void switch_to_stable_write();

/**
* Check if stable write is required for the given offset.
* Given offset is the start of contiguous dirty membufs that need to be
* flushed to the Blob.
*/
bool check_stable_write_required(off_t offset);

/**
* Wait for ongoing commit operation to complete.
*/
void wait_for_ongoing_commit();

/**
* Sync the dirty membufs in the file cache to the NFS server.
9 changes: 0 additions & 9 deletions turbonfs/inc/rpc_stats.h
@@ -139,9 +139,6 @@ class rpc_stats_az
*/
void on_rpc_issue()
{
// FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead.
assert(optype != FUSE_FLUSH);

assert(stamp.issue == 0);
stamp.issue = get_current_usecs();
assert(stamp.issue >= stamp.create);
@@ -156,9 +153,6 @@
*/
void on_rpc_cancel()
{
// FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead.
assert(optype != FUSE_FLUSH);

assert(stamp.issue != 0);
assert((int64_t) stamp.issue <= get_current_usecs());
assert(stamp.dispatch == 0);
@@ -178,9 +172,6 @@
*/
void on_rpc_complete(struct rpc_pdu *pdu, nfsstat3 status)
{
// FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead.
assert(optype != FUSE_FLUSH);

assert((status == NFS3ERR_RPC_ERROR) ||
(nfsstat3_to_errno(status) != -ERANGE));

1 change: 1 addition & 0 deletions turbonfs/inc/rpc_task.h
@@ -2428,6 +2428,7 @@ struct rpc_task
*/
bool add_bc(const bytes_chunk& bc);
void issue_write_rpc();
void issue_commit_rpc();

#ifdef ENABLE_NO_FUSE
/*