95 changes: 88 additions & 7 deletions turbonfs/inc/file_cache.h
@@ -1192,6 +1192,8 @@ class bytes_chunk_cache
*/
std::vector<bytes_chunk> get_commit_pending_bcs() const;

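/**
 * Return the first contiguous run of dirty (and not already flushing)
 * bytes_chunks, starting from the lowest such offset in the chunkmap.
 * The total byte count of the run is conveyed via 'size'. Each returned
 * membuf has an inuse reference held on it.
 */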
std::vector<bytes_chunk> get_contigious_dirty_bcs(uint64_t& size);

/**
* Drop cached data in the given range.
* This must be called only for file-backed caches. For non file-backed
@@ -1262,7 +1264,7 @@ class bytes_chunk_cache

/**
* Maximum size a dirty extent can grow before we should flush it.
* This is 60% of the allowed cache size or 1GB whichever is lower.
* This is 30% of the allowed cache size or 1GB whichever is lower.
* The reason for limiting it to 1GB is because there's not much value in
* holding more data than the Blob NFS server's scheduler cache size.
* We want to send as prompt as possible to utilize the n/w b/w but slow
@@ -1274,11 +1276,38 @@ class bytes_chunk_cache
static const uint64_t max_total =
(aznfsc_cfg.cache.data.user.max_size_mb * 1024 * 1024ULL);
assert(max_total != 0);
static const uint64_t max_dirty_extent = (max_total * 0.6);
static const uint64_t max_dirty_extent =
std::max((uint64_t)(max_total * 0.3), (uint64_t)AZNFSCFG_WSIZE_MAX);

return std::min(max_dirty_extent, uint64_t(1024 * 1024 * 1024ULL));
}
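To make the sizing math concrete, here is a minimal standalone sketch of the new 30% computation, using an assumed 4GiB cache and an assumed 1MiB `AZNFSCFG_WSIZE_MAX` (illustrative values, not the actual config defaults):

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

int main()
{
    // Assumed values, for illustration only.
    const uint64_t max_total = 4ULL * 1024 * 1024 * 1024;   // 4GiB cache
    const uint64_t wsize_max = 1ULL * 1024 * 1024;          // 1MiB max wsize

    // 30% of the cache, but never below one full-sized write.
    const uint64_t max_dirty_extent =
        std::max((uint64_t)(max_total * 0.3), wsize_max);

    // Cap at 1GiB, matching the Blob NFS server's scheduler cache size.
    const uint64_t result =
        std::min(max_dirty_extent, uint64_t(1024 * 1024 * 1024ULL));

    // 4GiB * 0.3 = 1.2GiB, capped to 1GiB.
    printf("max_dirty_extent_bytes = %llu\n", (unsigned long long)result);
    return 0;
}
```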

/*
* Maximum size of commit-pending bytes that can accumulate in the cache
* before we should commit them to the Blob.
* It should be greater than max_dirty_extent_bytes() and smaller than
* inline_dirty_threshold, so that inline pruning can be avoided.
* This is 60% of the allowed cache size.
* E.g., with a cache size of 4GB, max_commit_bytes = 2.4GB:
* - Flush will start for every 1GB of dirty data, and each 1GB of dirty
*   data is then converted to commit_pending_bytes.
*/
uint64_t max_commit_bytes() const
{
// Maximum cache size allowed in bytes.
static const uint64_t max_total =
(aznfsc_cfg.cache.data.user.max_size_mb * 1024 * 1024ULL);
assert(max_total != 0);

static const uint64_t max_commit_bytes =
(((uint64_t)(max_total * 0.6)) / max_dirty_extent_bytes()) * max_dirty_extent_bytes();

return std::max((max_commit_bytes - AZNFSCFG_WSIZE_MIN),
((2 * max_dirty_extent_bytes()) - AZNFSCFG_WSIZE_MIN));
}
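With the same assumed values, a sketch of the max_commit_bytes() rounding: 60% of the cache is rounded down to a whole multiple of the dirty-extent size, then backed off by one `AZNFSCFG_WSIZE_MIN` (also an assumed illustrative value):

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

int main()
{
    // Assumed values, for illustration only.
    const uint64_t max_total = 4ULL * 1024 * 1024 * 1024;     // 4GiB cache
    const uint64_t max_dirty_extent = 1024 * 1024 * 1024ULL;  // 1GiB, from above
    const uint64_t wsize_min = 1ULL * 1024 * 1024;            // assumed 1MiB

    // 60% of the cache, rounded down to a multiple of max_dirty_extent:
    // 2.4GiB -> 2GiB.
    const uint64_t max_commit =
        (((uint64_t)(max_total * 0.6)) / max_dirty_extent) * max_dirty_extent;

    // Never report less than two dirty extents minus one minimum write.
    const uint64_t result = std::max(max_commit - wsize_min,
                                     2 * max_dirty_extent - wsize_min);

    printf("max_commit_bytes = %llu\n", (unsigned long long)result);
    return 0;
}
```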

/**
* Get the amount of dirty data that needs to be flushed.
* This excludes the data which is already flushing.
@@ -1327,21 +1356,73 @@ class bytes_chunk_cache
return bytes_flushing > 0;
}

/*
* get_bytes_to_prune() returns the number of bytes that need to be flushed
* inline to free up space. If there are enough bytes_flushing then we
* can just wait for them to complete.
*
* Note: We do not consider bytes_commit_pending in this calculation.
* If bytes_commit_pending is high then a commit has already started, and
* if bytes_flushing is high then a commit will be triggered once flushing
* completes.
*/
uint64_t get_bytes_to_prune() const
{
static const uint64_t max_dirty_allowed_per_cache =
max_dirty_extent_bytes() * 2;
int64_t total_bytes =
std::max(int64_t(bytes_dirty - bytes_flushing), int64_t(0));
const bool local_pressure = total_bytes > (int64_t)max_dirty_allowed_per_cache;

if (local_pressure) {
return max_dirty_extent_bytes();
}

/*
* Global pressure is when get_prune_goals() returns non-zero bytes
* to be pruned inline.
*/
uint64_t inline_bytes;

/*
* TODO: Handle the noisy neighbor syndrome, where one file hogging the
* cache causes inline pruning to be triggered for other files.
*/
get_prune_goals(&inline_bytes, nullptr);
return std::max(int64_t(inline_bytes - (bytes_flushing + bytes_commit_pending)),
(int64_t)0);
}

/**
* This should be called by writer threads to find out if they must wait
* for the write to complete. This will check both the cache specific and
* global memory pressure.
*/
bool do_inline_write() const
{
// Maximum cache size allowed in bytes.
static const uint64_t max_total =
(aznfsc_cfg.cache.data.user.max_size_mb * 1024 * 1024ULL);
assert(max_total != 0);

/*
* Allow two dirty extents before we force inline write.
* This way one of the extents can be getting flushed while we populate
* the second one.
* Allow up to 80% of the cache size to be dirty.
* After this we enforce inline pruning for writers. This is to avoid
* writers getting blocked due to memory pressure. We allow up to 80%
* because we don't want to prune too aggressively, as it hurts write
* performance.
*/
static const uint64_t max_dirty_allowed_per_cache =
max_dirty_extent_bytes() * 2;
const bool local_pressure = bytes_dirty > max_dirty_allowed_per_cache;
max_total * 0.8;

/*
* If current cache usage is more than the max_dirty_allowed_per_cache
* limit, we need to enforce inline pruning. Cache usage is the sum of
* bytes_dirty and bytes_commit_pending. Membufs can be dirty or commit
* pending, but not both at the same time. Both dirty and commit pending
* occupy memory, so we need to account for both.
*/
const bool local_pressure = (bytes_dirty + bytes_commit_pending) > max_dirty_allowed_per_cache;

if (local_pressure) {
return true;
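Taken together, the writer path implied by these two helpers is: consult do_inline_write() for local/global pressure and, if it fires, flush get_bytes_to_prune() bytes inline before letting the writer proceed. A hedged caller-side sketch (the stand-in type and the flush_inline() helper are hypothetical):

```cpp
#include <cstdint>

// Minimal stand-ins for illustration; the real type lives in file_cache.h.
struct cache_like {
    bool do_inline_write() const;        // as added in this change
    uint64_t get_bytes_to_prune() const; // as added in this change
};

void flush_inline(cache_like& cache, uint64_t bytes); // hypothetical helper

// Hypothetical writer-side throttle, not part of this change.
void writer_throttle(cache_like& cache)
{
    // No local/global memory pressure: the writer can proceed async.
    if (!cache.do_inline_write()) {
        return;
    }

    // Under pressure: flush enough dirty bytes inline to relieve it,
    // then let the writer continue.
    const uint64_t to_prune = cache.get_bytes_to_prune();
    if (to_prune > 0) {
        flush_inline(cache, to_prune);
    }
}
```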
51 changes: 44 additions & 7 deletions turbonfs/inc/nfs_inode.h
@@ -279,6 +279,9 @@ struct nfs_inode
*/
struct stat attr;

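/*
 * in_cache_filesize tracks the file size as seen by the local cache
 * (advanced as writes land in the cache, see below). putblock_filesize
 * presumably tracks the size flushed to the Blob via PutBlock (assumed
 * semantics; these fields are not documented in this change).
 */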
off_t in_cache_filesize = 0;
off_t putblock_filesize = 0;

/*
* Has this inode seen any non-append write?
* This starts as false and remains false as long as copy_to_cache() only
@@ -300,7 +303,7 @@
* Note: As of now, we are not using this flag as commit changes not yet
* integrated, so we are setting this flag to true.
*/
bool stable_write = true;
bool stable_write = false;

public:
/*
@@ -418,6 +421,12 @@ struct nfs_inode

std::atomic<commit_state_t> commit_state = commit_state_t::COMMIT_NOT_NEEDED;

/*
* commit_lock_5 is used to synchronize the flush thread and the write
* thread for the commit operation.
*/
std::mutex commit_lock_5;

/**
* TODO: Initialize attr with postop attributes received in the RPC
* response.
@@ -947,14 +956,14 @@ struct nfs_inode
* updated from postop attributes. We will need to correctly
* update that when file is truncated f.e.
*/
if (!non_append_writes_seen && (offset != attr.st_size)) {
if (!non_append_writes_seen && (offset != in_cache_filesize)) {
non_append_writes_seen = true;
AZLogInfo("[{}] Non-append write seen [{}, {}), file size: {}",
ino, offset, offset+length, attr.st_size);
ino, offset, offset+length, in_cache_filesize);
}

if (new_size > attr.st_size) {
attr.st_size = new_size;
if (new_size > in_cache_filesize) {
in_cache_filesize = new_size;
}
}

@@ -1130,7 +1139,8 @@ struct nfs_inode
bool is_commit_in_progress() const
{
assert(commit_state != commit_state_t::INVALID);
return (commit_state == commit_state_t::COMMIT_IN_PROGRESS);
return ((commit_state == commit_state_t::COMMIT_IN_PROGRESS) ||
(commit_state == commit_state_t::NEEDS_COMMIT));
}

/**
@@ -1316,7 +1326,7 @@
* progress (which must have held the is_flushing lock).
*/
int flush_cache_and_wait(uint64_t start_off = 0,
uint64_t end_off = UINT64_MAX);
uint64_t end_off = UINT64_MAX, bool is_release = true);

/**
* Wait for currently flushing membufs to complete.
@@ -1328,6 +1338,33 @@
int wait_for_ongoing_flush(uint64_t start_off = 0,
uint64_t end_off = UINT64_MAX);

/*
* commit_membufs() is called to commit uncommitted membufs to the BLOB.
* It creates a commit RPC and sends it to the NFS server.
*/
void commit_membufs();


/*
* switch_to_stable_write() is called to switch the inode to stable write
* mode. There should be no ongoing commit/flushing operation when this
* is called. It creates a commit RPC to commit all the uncommitted membufs
* to the BLOB.
*/
void switch_to_stable_write();

/**
* Check if stable write is required for the given offset.
* The given offset is the start of the contiguous dirty membufs that
* need to be flushed to the BLOB.
*/
bool check_stable_write_required(off_t offset);

/**
* Wait for ongoing commit operation to complete.
*/
void wait_for_ongoing_commit();
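The new commit-path members suggest a flusher flow along these lines: pick the first contiguous dirty run, decide between continuing unstable writes plus commit or switching to stable writes, and serialize against any in-flight commit. A hedged sketch (stand-in types; the real flusher logic is not shown in this diff and may differ):

```cpp
#include <cstdint>
#include <sys/types.h>
#include <vector>

// Stand-in declarations mirroring this change, for illustration only.
struct bytes_chunk { uint64_t offset; uint64_t length; };
struct nfs_inode_like {
    std::vector<bytes_chunk> get_contigious_dirty_bcs(uint64_t& size);
    bool check_stable_write_required(off_t offset);
    void wait_for_ongoing_commit();
    void switch_to_stable_write();
    void commit_membufs();
};

// Inferred flusher-side flow, not the actual implementation.
void flush_one_extent(nfs_inode_like& inode)
{
    uint64_t size = 0;
    auto bc_vec = inode.get_contigious_dirty_bcs(size);
    if (bc_vec.empty()) {
        return;
    }

    if (inode.check_stable_write_required(bc_vec[0].offset)) {
        // Cannot continue unstable writes at this offset: drain any
        // in-flight commit and fall back to stable writes.
        inode.wait_for_ongoing_commit();
        inode.switch_to_stable_write();
    }

    // ... issue WRITE RPCs for bc_vec (stable or unstable) ...

    // For unstable writes, commit once enough commit-pending bytes
    // accumulate (see max_commit_bytes() in file_cache.h).
    inode.commit_membufs();
}
```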

/**
* Sync the dirty membufs in the file cache to the NFS server.
* All contiguous dirty membufs are clubbed together and sent to the
9 changes: 0 additions & 9 deletions turbonfs/inc/rpc_stats.h
@@ -116,9 +116,6 @@ class rpc_stats_az
*/
void on_rpc_issue()
{
// FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead.
assert(optype != FUSE_FLUSH);

assert(stamp.issue == 0);
stamp.issue = get_current_usecs();
assert(stamp.issue >= stamp.create);
@@ -133,9 +130,6 @@
*/
void on_rpc_cancel()
{
// FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead.
assert(optype != FUSE_FLUSH);

assert(stamp.issue != 0);
assert((int64_t) stamp.issue <= get_current_usecs());
assert(stamp.dispatch == 0);
@@ -155,9 +149,6 @@
*/
void on_rpc_complete(struct rpc_pdu *pdu, nfsstat3 status)
{
// FUSE_FLUSH is never issued as an RPC task to the server. FUSE_WRITE is issued instead.
assert(optype != FUSE_FLUSH);

assert(nfsstat3_to_errno(status) != -ERANGE);

req_size = rpc_pdu_get_req_size(pdu);
5 changes: 4 additions & 1 deletion turbonfs/inc/rpc_task.h
@@ -1474,7 +1474,6 @@ struct api_task_info
*/
fuse_req *req = nullptr;


/*
* Only valid for FUSE_READ.
*
@@ -2177,6 +2176,8 @@ struct rpc_task
return stats;
}

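/*
 * Presumably selects the connection scheduling for this task depending
 * on whether the write will be issued as stable or unstable (assumed
 * purpose; the declaration is not documented in this change).
 */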
void set_task_csched(bool stable_write);

struct nfs_context *get_nfs_context() const;

struct rpc_context *get_rpc_ctx() const
@@ -2503,6 +2504,8 @@ struct rpc_task
bool add_bc(const bytes_chunk& bc);
void issue_write_rpc();

void issue_commit_rpc();

#ifdef ENABLE_NO_FUSE
/*
* In nofuse mode we re-define these fuse_reply functions to copy the
36 changes: 36 additions & 0 deletions turbonfs/src/file_cache.cpp
@@ -2302,6 +2302,42 @@ std::vector<bytes_chunk> bytes_chunk_cache::get_flushing_bc_range(
return bc_vec;
}

std::vector<bytes_chunk> bytes_chunk_cache::get_contigious_dirty_bcs(uint64_t& size)
{
std::vector<bytes_chunk> bc_vec;
size = 0;

// TODO: Make it shared lock.
const std::unique_lock<std::mutex> _lock(chunkmap_lock_43);
auto it = chunkmap.lower_bound(0);
uint64_t prev_offset = AZNFSC_BAD_OFFSET;

while (it != chunkmap.cend()) {
const struct bytes_chunk& bc = it->second;
struct membuf *mb = bc.get_membuf();

if (mb->is_dirty() && !mb->is_flushing()) {
/*
* Stop at the first dirty chunk that is not contiguous with the
* previous one.
*/
if (prev_offset != AZNFSC_BAD_OFFSET && prev_offset != bc.offset) {
break;
}

size += bc.length;
prev_offset = bc.offset + bc.length;

// Hold an inuse ref on the membuf for the caller.
mb->set_inuse();
bc_vec.emplace_back(bc);
}

++it;
}

return bc_vec;
}
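Since each returned membuf has set_inuse() called on it, the consumer of the returned vector must drop that reference once the flush is done, along these lines (a hedged fragment; clear_inuse() is the presumed counterpart):

```cpp
uint64_t size = 0;
std::vector<bytes_chunk> bc_vec = cache->get_contigious_dirty_bcs(size);

// ... flush bc_vec to the server ...

for (bytes_chunk& bc : bc_vec) {
    // Presumed counterpart of the set_inuse() taken above.
    bc.get_membuf()->clear_inuse();
}
```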

std::vector<bytes_chunk> bytes_chunk_cache::get_dirty_bc_range(uint64_t start_off, uint64_t end_off) const
{
std::vector<bytes_chunk> bc_vec;
Expand Down