From baaba6ef0391c880afd0282db93c8a2c6c9e71a8 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Thu, 19 May 2022 11:44:12 -0700 Subject: [PATCH 1/7] Cluster lock invalidation and shrink spinlocks Cluster lock invalidation and shrinking have very similar work flows. They rarely modify the state of locks and put them on lists for work to process. Today the lists and state modification are protected by the mount-wide lock_info spinlock, which we want to break up. This creates a little work_list struct that has a work_queue, list, and lock. Invalidation and shrinking use this to track locks that are being processed and protect the list with the new spinlock in the struct. This leaves some awkward nesting with the lock_info spinlock because it still protects invididual lock state. That will be fixed as we move towards individual lock refcounting and spinlocks. Signed-off-by: Zach Brown --- kmod/src/lock.c | 124 +++++++++++++++++++++++++++--------------------- kmod/src/lock.h | 2 +- 2 files changed, 71 insertions(+), 55 deletions(-) diff --git a/kmod/src/lock.c b/kmod/src/lock.c index f27fb1e1d..68ba0f7ba 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -69,6 +69,13 @@ * relative to that lock state we resend. */ +struct work_list { + struct work_struct work; + spinlock_t lock; + struct list_head list; +}; + + /* * allocated per-super, freed on unmount. 
*/ @@ -83,10 +90,8 @@ struct lock_info { struct list_head lru_list; unsigned long long lru_nr; struct workqueue_struct *workq; - struct work_struct inv_work; - struct list_head inv_list; - struct work_struct shrink_work; - struct list_head shrink_list; + struct work_list inv_wlist; + struct work_list shrink_wlist; atomic64_t next_refresh_gen; struct dentry *tseq_dentry; @@ -111,6 +116,21 @@ static bool lock_mode_can_write(enum scoutfs_lock_mode mode) return mode == SCOUTFS_LOCK_WRITE || mode == SCOUTFS_LOCK_WRITE_ONLY; } +static void init_work_list(struct work_list *wlist, work_func_t func) +{ + spin_lock_init(&wlist->lock); + INIT_WORK(&wlist->work, func); + INIT_LIST_HEAD(&wlist->list); +} + +static void queue_nonempty_work_list(struct lock_info *linfo, struct work_list *wlist) +{ + assert_spin_locked(&wlist->lock); + + if (!list_empty(&wlist->list)) + queue_work(linfo->workq, &wlist->work); +} + /* * Returns true if a lock with the granted mode can satisfy a requested * mode. This is directional. A read lock is satisfied by a write lock @@ -276,7 +296,7 @@ static struct scoutfs_lock *lock_alloc(struct super_block *sb, RB_CLEAR_NODE(&lock->range_node); INIT_LIST_HEAD(&lock->lru_head); INIT_LIST_HEAD(&lock->inv_head); - INIT_LIST_HEAD(&lock->inv_list); + INIT_LIST_HEAD(&lock->inv_req_list); INIT_LIST_HEAD(&lock->shrink_head); spin_lock_init(&lock->cov_list_lock); INIT_LIST_HEAD(&lock->cov_list); @@ -540,18 +560,6 @@ static void put_lock(struct lock_info *linfo,struct scoutfs_lock *lock) } } -/* - * The caller has made a change (set a lock mode) which can let one of the - * invalidating locks make forward progress. - */ -static void queue_inv_work(struct lock_info *linfo) -{ - assert_spin_locked(&linfo->lock); - - if (!list_empty(&linfo->inv_list)) - queue_work(linfo->workq, &linfo->inv_work); -} - /* * The given lock is processing a received a grant response. Trigger a * bug if the cache is inconsistent. 
@@ -671,7 +679,7 @@ struct inv_req { */ static void lock_invalidate_worker(struct work_struct *work) { - struct lock_info *linfo = container_of(work, struct lock_info, inv_work); + struct lock_info *linfo = container_of(work, struct lock_info, inv_wlist.work); struct super_block *sb = linfo->sb; struct scoutfs_net_lock *nl; struct scoutfs_lock *lock; @@ -683,9 +691,10 @@ static void lock_invalidate_worker(struct work_struct *work) scoutfs_inc_counter(sb, lock_invalidate_work); spin_lock(&linfo->lock); + spin_lock(&linfo->inv_wlist.lock); - list_for_each_entry_safe(lock, tmp, &linfo->inv_list, inv_head) { - ireq = list_first_entry(&lock->inv_list, struct inv_req, head); + list_for_each_entry_safe(lock, tmp, &linfo->inv_wlist.list, inv_head) { + ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); nl = &ireq->nl; /* wait until incompatible holders unlock */ @@ -700,6 +709,7 @@ static void lock_invalidate_worker(struct work_struct *work) list_move_tail(&lock->inv_head, &ready); } + spin_unlock(&linfo->inv_wlist.lock); spin_unlock(&linfo->lock); if (list_empty(&ready)) @@ -707,7 +717,7 @@ static void lock_invalidate_worker(struct work_struct *work) /* invalidate once the lock is read */ list_for_each_entry(lock, &ready, inv_head) { - ireq = list_first_entry(&lock->inv_list, struct inv_req, head); + ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); nl = &ireq->nl; /* only lock protocol, inv can't call subsystems after shutdown */ @@ -727,9 +737,10 @@ static void lock_invalidate_worker(struct work_struct *work) /* and finish all the invalidated locks */ spin_lock(&linfo->lock); + spin_lock(&linfo->inv_wlist.lock); list_for_each_entry_safe(lock, tmp, &ready, inv_head) { - ireq = list_first_entry(&lock->inv_list, struct inv_req, head); + ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); trace_scoutfs_lock_invalidated(sb, lock); @@ -738,20 +749,21 @@ static void lock_invalidate_worker(struct work_struct *work) 
lock->invalidating_mode = SCOUTFS_LOCK_NULL; - if (list_empty(&lock->inv_list)) { + if (list_empty(&lock->inv_req_list)) { /* finish if another request didn't arrive */ list_del_init(&lock->inv_head); lock->invalidate_pending = 0; wake_up(&lock->waitq); } else { /* another request arrived, back on the list and requeue */ - list_move_tail(&lock->inv_head, &linfo->inv_list); - queue_inv_work(linfo); + list_move_tail(&lock->inv_head, &linfo->inv_wlist.list); + queue_nonempty_work_list(linfo, &linfo->inv_wlist); } put_lock(linfo, lock); } + spin_unlock(&linfo->inv_wlist.lock); spin_unlock(&linfo->lock); } @@ -798,12 +810,14 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id, ireq->lock = lock; ireq->net_id = net_id; ireq->nl = *nl; - if (list_empty(&lock->inv_list)) { - list_add_tail(&lock->inv_head, &linfo->inv_list); + if (list_empty(&lock->inv_req_list)) { + spin_lock(&linfo->inv_wlist.lock); + list_add_tail(&lock->inv_head, &linfo->inv_wlist.list); lock->invalidate_pending = 1; - queue_inv_work(linfo); + queue_nonempty_work_list(linfo, &linfo->inv_wlist); + spin_unlock(&linfo->inv_wlist.lock); } - list_add_tail(&ireq->head, &lock->inv_list); + list_add_tail(&ireq->head, &lock->inv_req_list); } spin_unlock(&linfo->lock); @@ -1296,7 +1310,11 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scou trace_scoutfs_lock_unlock(sb, lock); wake_up(&lock->waitq); - queue_inv_work(linfo); + + spin_lock(&linfo->inv_wlist.lock); + queue_nonempty_work_list(linfo, &linfo->inv_wlist); + spin_unlock(&linfo->inv_wlist.lock); + put_lock(linfo, lock); spin_unlock(&linfo->lock); @@ -1389,8 +1407,7 @@ bool scoutfs_lock_protected(struct scoutfs_lock *lock, struct scoutfs_key *key, */ static void lock_shrink_worker(struct work_struct *work) { - struct lock_info *linfo = container_of(work, struct lock_info, - shrink_work); + struct lock_info *linfo = container_of(work, struct lock_info, shrink_wlist.work); struct super_block *sb = 
linfo->sb; struct scoutfs_net_lock nl; struct scoutfs_lock *lock; @@ -1400,9 +1417,9 @@ static void lock_shrink_worker(struct work_struct *work) scoutfs_inc_counter(sb, lock_shrink_work); - spin_lock(&linfo->lock); - list_splice_init(&linfo->shrink_list, &list); - spin_unlock(&linfo->lock); + spin_lock(&linfo->shrink_wlist.lock); + list_splice_init(&linfo->shrink_wlist.list, &list); + spin_unlock(&linfo->shrink_wlist.lock); list_for_each_entry_safe(lock, tmp, &list, shrink_head) { list_del_init(&lock->shrink_head); @@ -1460,13 +1477,12 @@ static unsigned long lock_scan_objects(struct shrinker *shrink, struct scoutfs_lock *tmp; unsigned long freed = 0; unsigned long nr = sc->nr_to_scan; - bool added = false; scoutfs_inc_counter(sb, lock_scan_objects); spin_lock(&linfo->lock); + spin_lock(&linfo->shrink_wlist.lock); -restart: list_for_each_entry_safe(lock, tmp, &linfo->lru_list, lru_head) { BUG_ON(!lock_idle(lock)); @@ -1478,22 +1494,17 @@ static unsigned long lock_scan_objects(struct shrinker *shrink, __lock_del_lru(linfo, lock); lock->request_pending = 1; - list_add_tail(&lock->shrink_head, &linfo->shrink_list); - added = true; freed++; + list_add_tail(&lock->shrink_head, &linfo->shrink_wlist.list); scoutfs_inc_counter(sb, lock_shrink_attempted); trace_scoutfs_lock_shrink(sb, lock); - - /* could have bazillions of idle locks */ - if (cond_resched_lock(&linfo->lock)) - goto restart; } - spin_unlock(&linfo->lock); + queue_nonempty_work_list(linfo, &linfo->shrink_wlist); - if (added) - queue_work(linfo->workq, &linfo->shrink_work); + spin_unlock(&linfo->shrink_wlist.lock); + spin_unlock(&linfo->lock); trace_scoutfs_lock_shrink_exit(sb, sc->nr_to_scan, freed); return freed; @@ -1537,7 +1548,7 @@ void scoutfs_lock_unmount_begin(struct super_block *sb) if (linfo) { linfo->unmounting = true; - flush_work(&linfo->inv_work); + flush_work(&linfo->inv_wlist.work); } } @@ -1546,7 +1557,7 @@ void scoutfs_lock_flush_invalidate(struct super_block *sb) DECLARE_LOCK_INFO(sb, 
linfo); if (linfo) - flush_work(&linfo->inv_work); + flush_work(&linfo->inv_wlist.work); } static u64 get_held_lock_refresh_gen(struct super_block *sb, struct scoutfs_key *start) @@ -1615,7 +1626,7 @@ void scoutfs_lock_shutdown(struct super_block *sb) /* stop the shrinker from queueing work */ KC_UNREGISTER_SHRINKER(&linfo->shrinker); - flush_work(&linfo->shrink_work); + flush_work(&linfo->shrink_wlist.work); /* cause current and future lock calls to return errors */ spin_lock(&linfo->lock); @@ -1694,7 +1705,7 @@ void scoutfs_lock_destroy(struct super_block *sb) lock = rb_entry(node, struct scoutfs_lock, node); node = rb_next(node); - list_for_each_entry_safe(ireq, ireq_tmp, &lock->inv_list, head) { + list_for_each_entry_safe(ireq, ireq_tmp, &lock->inv_req_list, head) { list_del_init(&ireq->head); put_lock(linfo, ireq->lock); kfree(ireq); @@ -1703,12 +1714,19 @@ void scoutfs_lock_destroy(struct super_block *sb) lock->request_pending = 0; if (!list_empty(&lock->lru_head)) __lock_del_lru(linfo, lock); + + spin_lock(&linfo->inv_wlist.lock); if (!list_empty(&lock->inv_head)) { list_del_init(&lock->inv_head); lock->invalidate_pending = 0; } + spin_unlock(&linfo->inv_wlist.lock); + + spin_lock(&linfo->shrink_wlist.lock); if (!list_empty(&lock->shrink_head)) list_del_init(&lock->shrink_head); + spin_unlock(&linfo->shrink_wlist.lock); + lock_remove(linfo, lock); lock_free(linfo, lock); } @@ -1737,10 +1755,8 @@ int scoutfs_lock_setup(struct super_block *sb) lock_scan_objects); KC_REGISTER_SHRINKER(&linfo->shrinker, "scoutfs-lock:" SCSBF, SCSB_ARGS(sb)); INIT_LIST_HEAD(&linfo->lru_list); - INIT_WORK(&linfo->inv_work, lock_invalidate_worker); - INIT_LIST_HEAD(&linfo->inv_list); - INIT_WORK(&linfo->shrink_work, lock_shrink_worker); - INIT_LIST_HEAD(&linfo->shrink_list); + init_work_list(&linfo->inv_wlist, lock_invalidate_worker); + init_work_list(&linfo->shrink_wlist, lock_shrink_worker); atomic64_set(&linfo->next_refresh_gen, 0); scoutfs_tseq_tree_init(&linfo->tseq_tree, 
lock_tseq_show); diff --git a/kmod/src/lock.h b/kmod/src/lock.h index 07908d624..a1869ccda 100644 --- a/kmod/src/lock.h +++ b/kmod/src/lock.h @@ -32,7 +32,7 @@ struct scoutfs_lock { invalidate_pending:1; struct list_head inv_head; /* entry in linfo's list of locks with invalidations */ - struct list_head inv_list; /* list of lock's invalidation requests */ + struct list_head inv_req_list; /* list of lock's invalidation requests */ struct list_head shrink_head; spinlock_t cov_list_lock; From 363cc0051991e22db3b2dc5cc77ad2445dafd534 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Fri, 20 May 2022 09:45:51 -0700 Subject: [PATCH 2/7] Add per-cluster lock spinlock Add a spinlock to the scoutfs_lock cluster lock which protects its state. This replaces the use of the mount-wide lock_info spinlock. In practice, for now, this largely just mirrors the continued use of the lock_info spinlock because it's still needed to protect the mount-wide structures that are used during put_lock. That'll be fixed in future patches as the use of global structures is reduced. 
Signed-off-by: Zach Brown --- kmod/src/lock.c | 50 ++++++++++++++++++++++++++++++++++++------------- kmod/src/lock.h | 1 + 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/kmod/src/lock.c b/kmod/src/lock.c index 68ba0f7ba..1be0c9872 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -250,12 +250,9 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock) { struct super_block *sb = lock->sb; - assert_spin_locked(&linfo->lock); - trace_scoutfs_lock_free(sb, lock); scoutfs_inc_counter(sb, lock_free); - /* manually checking lock_idle gives identifying line numbers */ BUG_ON(lock->request_pending); BUG_ON(lock->invalidate_pending); BUG_ON(lock->waiters[SCOUTFS_LOCK_READ]); @@ -292,6 +289,7 @@ static struct scoutfs_lock *lock_alloc(struct super_block *sb, scoutfs_inc_counter(sb, lock_alloc); + spin_lock_init(&lock->lock); RB_CLEAR_NODE(&lock->node); RB_CLEAR_NODE(&lock->range_node); INIT_LIST_HEAD(&lock->lru_head); @@ -631,6 +629,8 @@ int scoutfs_lock_grant_response(struct super_block *sb, bug_on_inconsistent_grant_cache(sb, lock, nl->old_mode, nl->new_mode); + spin_lock(&lock->lock); + if (!lock_mode_can_read(nl->old_mode) && lock_mode_can_read(nl->new_mode)) lock->refresh_gen = atomic64_inc_return(&linfo->next_refresh_gen); @@ -639,6 +639,8 @@ int scoutfs_lock_grant_response(struct super_block *sb, lock->write_seq = le64_to_cpu(nl->write_seq); trace_scoutfs_lock_granted(sb, lock); + + spin_unlock(&lock->lock); wake_up(&lock->waitq); put_lock(linfo, lock); @@ -694,19 +696,21 @@ static void lock_invalidate_worker(struct work_struct *work) spin_lock(&linfo->inv_wlist.lock); list_for_each_entry_safe(lock, tmp, &linfo->inv_wlist.list, inv_head) { + spin_lock(&lock->lock); ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); nl = &ireq->nl; /* wait until incompatible holders unlock */ - if (!lock_counts_match(nl->new_mode, lock->users)) - continue; + if (lock_counts_match(nl->new_mode, lock->users)) { + /* set the new mode, 
no incompatible users during inval, recov needs old */ + lock->invalidating_mode = lock->mode; + lock->mode = nl->new_mode; - /* set the new mode, no incompatible users during inval, recov needs old */ - lock->invalidating_mode = lock->mode; - lock->mode = nl->new_mode; + /* move everyone that's ready to our private list */ + list_move_tail(&lock->inv_head, &ready); + } - /* move everyone that's ready to our private list */ - list_move_tail(&lock->inv_head, &ready); + spin_unlock(&lock->lock); } spin_unlock(&linfo->inv_wlist.lock); @@ -740,6 +744,7 @@ static void lock_invalidate_worker(struct work_struct *work) spin_lock(&linfo->inv_wlist.lock); list_for_each_entry_safe(lock, tmp, &ready, inv_head) { + spin_lock(&lock->lock); ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); trace_scoutfs_lock_invalidated(sb, lock); @@ -760,6 +765,7 @@ static void lock_invalidate_worker(struct work_struct *work) queue_nonempty_work_list(linfo, &linfo->inv_wlist); } + spin_unlock(&lock->lock); put_lock(linfo, lock); } @@ -806,6 +812,7 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id, spin_lock(&linfo->lock); lock = get_lock(sb, &nl->key); if (lock) { + spin_lock(&lock->lock); trace_scoutfs_lock_invalidate_request(sb, lock); ireq->lock = lock; ireq->net_id = net_id; @@ -818,6 +825,7 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id, spin_unlock(&linfo->inv_wlist.lock); } list_add_tail(&ireq->head, &lock->inv_req_list); + spin_unlock(&lock->lock); } spin_unlock(&linfo->lock); @@ -863,6 +871,8 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id, for (i = 0; lock && i < SCOUTFS_NET_LOCK_MAX_RECOVER_NR; i++) { + spin_lock(&lock->lock); + if (lock->invalidating_mode != SCOUTFS_LOCK_NULL) mode = lock->invalidating_mode; else @@ -873,6 +883,8 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id, nlr->locks[i].old_mode = mode; nlr->locks[i].new_mode = mode; + spin_unlock(&lock->lock); + 
node = rb_next(&lock->node); if (node) lock = rb_entry(node, struct scoutfs_lock, node); @@ -895,10 +907,10 @@ static bool lock_wait_cond(struct super_block *sb, struct scoutfs_lock *lock, DECLARE_LOCK_INFO(sb, linfo); bool wake; - spin_lock(&linfo->lock); + spin_lock(&lock->lock); wake = linfo->shutdown || lock_modes_match(lock->mode, mode) || !lock->request_pending; - spin_unlock(&linfo->lock); + spin_unlock(&lock->lock); if (!wake) scoutfs_inc_counter(sb, lock_wait); @@ -960,6 +972,8 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i goto out_unlock; } + spin_lock(&lock->lock); + /* the waiters count is only used by debugging output */ lock_inc_count(lock->waiters, mode); @@ -991,6 +1005,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i should_send = false; } + spin_unlock(&lock->lock); spin_unlock(&linfo->lock); if (should_send) { @@ -1001,6 +1016,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i ret = scoutfs_client_lock_request(sb, &nl); if (ret) { spin_lock(&linfo->lock); + spin_lock(&lock->lock); lock->request_pending = 0; break; } @@ -1018,6 +1034,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i } spin_lock(&linfo->lock); + spin_lock(&lock->lock); if (ret) break; } @@ -1026,6 +1043,8 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i if (ret == 0) trace_scoutfs_lock_locked(sb, lock); + + spin_unlock(&lock->lock); wake_up(&lock->waitq); put_lock(linfo, lock); @@ -1303,18 +1322,20 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scou scoutfs_inc_counter(sb, lock_unlock); spin_lock(&linfo->lock); + spin_lock(&lock->lock); lock_dec_count(lock->users, mode); if (lock_mode_can_write(mode)) lock->dirty_trans_seq = scoutfs_trans_sample_seq(sb); trace_scoutfs_lock_unlock(sb, lock); - wake_up(&lock->waitq); spin_lock(&linfo->inv_wlist.lock); 
queue_nonempty_work_list(linfo, &linfo->inv_wlist); spin_unlock(&linfo->inv_wlist.lock); + spin_unlock(&lock->lock); + wake_up(&lock->waitq); put_lock(linfo, lock); spin_unlock(&linfo->lock); @@ -1435,8 +1456,11 @@ static void lock_shrink_worker(struct work_struct *work) scoutfs_inc_counter(sb, lock_shrink_aborted); spin_lock(&linfo->lock); + spin_lock(&lock->lock); lock->request_pending = 0; + + spin_unlock(&lock->lock); wake_up(&lock->waitq); put_lock(linfo, lock); diff --git a/kmod/src/lock.h b/kmod/src/lock.h index a1869ccda..a04f02e57 100644 --- a/kmod/src/lock.h +++ b/kmod/src/lock.h @@ -19,6 +19,7 @@ struct inode_deletion_lock_data; */ struct scoutfs_lock { struct super_block *sb; + spinlock_t lock; struct scoutfs_key start; struct scoutfs_key end; struct rb_node node; From 4c2a287474f2cce98ebf92757ded1b478e757adf Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Wed, 25 May 2022 11:03:27 -0700 Subject: [PATCH 3/7] Protect cluster locks with refcounts The first pass at managing the cluster lock state machine used a simple global spinlock. It's time to break it up. This adds refcounting to the cluster lock struct. Rather than managing global data structures and individual lock state all under a global spinlock, we use per-structure locks, a lock spinlock, and a lock refcount. Active users of the cluster lock hold a reference. This primarily lets unlock only check global structures once the refcounts say that it's time to remove the lock from the structures. The careful use of the refcount to avoid locks that are being freed during lookup also paves the way for using mostly read-only RCU lookup structures soon. The global LRU is still modified on every lock use, that'll also be removed in future work. The linfo spinlock is now only used for the LRU and lookup structures. Other uses are removed, which causes more careful use of the finer grained locks that initially just mirrored the use of the linfo spinlock to keep those introduction patches safe. 
The move from a single global lock to more fine grained locks creates nesting that needs to be managed. Shrinking and recovery in particular need to be careful as they transition from spinlocks used to find cluster locks to getting the cluster lock spinlock. The presence of freeing locks in the lookup indexes means that some callers need to retry if they hit freeing locks. We have to add this protection to recovery iterating over locks by their key value, but it wouldn't have made sense to build that around the lookup rbtree as it's going away. It makes sense to use the range tree that we're going to keep using to make sure we don't accidentally introduce locks whose ranges overlap (which would lead to item cache inconsistency). Signed-off-by: Zach Brown Signed-off-by: Chris Kirby --- kmod/src/inode.c | 2 +- kmod/src/lock.c | 475 ++++++++++++++++++++++++++++------------------- kmod/src/lock.h | 1 + 3 files changed, 282 insertions(+), 196 deletions(-) diff --git a/kmod/src/inode.c b/kmod/src/inode.c index 3553c520b..0681850e3 100644 --- a/kmod/src/inode.c +++ b/kmod/src/inode.c @@ -482,7 +482,7 @@ int scoutfs_complete_truncate(struct inode *inode, struct scoutfs_lock *lock) } /* - * If we're changing the file size than the contents of the file are + * If we're changing the file size then the contents of the file are * changing and we increment the data_version. This would prevent * staging because the data_version is per-inode today, not per-extent. 
* So if there are any offline extents within the new size then we need diff --git a/kmod/src/lock.c b/kmod/src/lock.c index 1be0c9872..a0daa3fdf 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -253,6 +253,7 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock) trace_scoutfs_lock_free(sb, lock); scoutfs_inc_counter(sb, lock_free); + BUG_ON(atomic_read(&lock->refcount) != 0); BUG_ON(lock->request_pending); BUG_ON(lock->invalidate_pending); BUG_ON(lock->waiters[SCOUTFS_LOCK_READ]); @@ -289,6 +290,7 @@ static struct scoutfs_lock *lock_alloc(struct super_block *sb, scoutfs_inc_counter(sb, lock_alloc); + atomic_set(&lock->refcount, 0); spin_lock_init(&lock->lock); RB_CLEAR_NODE(&lock->node); RB_CLEAR_NODE(&lock->range_node); @@ -342,26 +344,32 @@ static bool lock_counts_match(int granted, unsigned int *counts) return true; } -/* - * An idle lock has nothing going on. It can be present in the lru and - * can be freed by the final put when it has a null mode. - */ -static bool lock_idle(struct scoutfs_lock *lock) +static void __lock_add_lru(struct lock_info *linfo, struct scoutfs_lock *lock) { - enum scoutfs_lock_mode mode; - - if (lock->request_pending || lock->invalidate_pending) - return false; + assert_spin_locked(&linfo->lock); - for (mode = 0; mode < SCOUTFS_LOCK_NR_MODES; mode++) { - if (lock->waiters[mode] || lock->users[mode]) - return false; + if (list_empty(&lock->lru_head)) { + list_add_tail(&lock->lru_head, &linfo->lru_list); + linfo->lru_nr++; } +} - return true; +static void __lock_del_lru(struct lock_info *linfo, struct scoutfs_lock *lock) +{ + assert_spin_locked(&linfo->lock); + + if (!list_empty(&lock->lru_head)) { + list_del_init(&lock->lru_head); + linfo->lru_nr--; + } } -static bool insert_range_node(struct super_block *sb, struct scoutfs_lock *ins) +/* + * Insert the lock into the tree that tracks their non-overlapping key + * ranges. 
Warn if we see an attempt to insert a lock that overlaps + * with an existing lock that isn't being freed. + */ +static int insert_lock_range(struct super_block *sb, struct scoutfs_lock *ins) { DECLARE_LOCK_INFO(sb, linfo); struct rb_root *root = &linfo->lock_range_tree; @@ -370,17 +378,28 @@ static bool insert_range_node(struct super_block *sb, struct scoutfs_lock *ins) struct scoutfs_lock *lock; int cmp; + assert_spin_locked(&linfo->lock); + while (*node) { parent = *node; lock = container_of(*node, struct scoutfs_lock, range_node); cmp = scoutfs_key_compare_ranges(&ins->start, &ins->end, &lock->start, &lock->end); - if (WARN_ON_ONCE(cmp == 0)) { - scoutfs_warn(sb, "inserting lock start "SK_FMT" end "SK_FMT" overlaps with existing lock start "SK_FMT" end "SK_FMT, + if (cmp == 0) { + if (WARN_ON_ONCE(atomic_read(&lock->refcount) >= 2)) { + /* Overlap with an in-use lock */ + scoutfs_warn(sb, "inserting lock start "SK_FMT" end "SK_FMT" overlaps with existing lock start "SK_FMT" end "SK_FMT, SK_ARG(&ins->start), SK_ARG(&ins->end), SK_ARG(&lock->start), SK_ARG(&lock->end)); - return false; + return -EINVAL; + } else { + /* + * Overlap with a lock that's being freed. Tell + * the caller to retry. 
+ */ + return -EEXIST; + } } if (cmp < 0) @@ -389,15 +408,47 @@ static bool insert_range_node(struct super_block *sb, struct scoutfs_lock *ins) node = &(*node)->rb_right; } - rb_link_node(&ins->range_node, parent, node); rb_insert_color(&ins->range_node, root); - return true; + return 0; } -/* returns true if the lock was inserted at its start key */ -static bool lock_insert(struct super_block *sb, struct scoutfs_lock *ins) +static struct scoutfs_lock *next_lock_range(struct super_block *sb, struct scoutfs_key *key) +{ + DECLARE_LOCK_INFO(sb, linfo); + struct rb_node *node = linfo->lock_range_tree.rb_node; + struct scoutfs_lock *next = NULL; + struct scoutfs_lock *lock; + int cmp; + + assert_spin_locked(&linfo->lock); + + while (node) { + lock = container_of(node, struct scoutfs_lock, range_node); + + cmp = scoutfs_key_compare(key, &lock->start); + if (cmp < 0) { + next = lock; + node = node->rb_left; + } else if (cmp > 0) { + node = node->rb_right; + } else { + return lock; + } + } + + return next; +} + +/* + * Insert a lock into the lookup hash table, keyed by its start key. If + * another lock is already present then we return EEXIST and the caller + * will retry. The locks are inserted with a 0 refcount so that they + * won't be used until they've been inserted into the range tree without + * overlaps. 
+ */ +static struct scoutfs_lock *lock_insert(struct super_block *sb, struct scoutfs_lock *ins) { DECLARE_LOCK_INFO(sb, linfo); struct scoutfs_lock *lock; @@ -419,24 +470,33 @@ static bool lock_insert(struct super_block *sb, struct scoutfs_lock *ins) else if (cmp > 0) node = &(*node)->rb_right; else - return false; + return lock; } - if (!insert_range_node(sb, ins)) - return false; + if (insert_lock_range(sb, ins) != 0) + return NULL; rb_link_node(&ins->node, parent, node); rb_insert_color(&ins->node, &linfo->lock_tree); - scoutfs_tseq_add(&linfo->tseq_tree, &ins->tseq_entry); + __lock_add_lru(linfo, ins); + atomic_add(2, &ins->refcount); - return true; + return ins; } +/* + * Remove the lock from all the active indexes. The caller has already + * established the exclusive ability to remove by atomically removing + * the 2 refs that were added by insertion. There should be no more + * references once those refs were removed. + */ static void lock_remove(struct lock_info *linfo, struct scoutfs_lock *lock) { assert_spin_locked(&linfo->lock); + WARN_ON_ONCE(atomic_read(&lock->refcount) != 1); + rb_erase(&lock->node, &linfo->lock_tree); RB_CLEAR_NODE(&lock->node); rb_erase(&lock->range_node, &linfo->lock_range_tree); @@ -445,117 +505,129 @@ static void lock_remove(struct lock_info *linfo, struct scoutfs_lock *lock) scoutfs_tseq_del(&linfo->tseq_tree, &lock->tseq_entry); } -static struct scoutfs_lock *lock_lookup(struct super_block *sb, - struct scoutfs_key *start, - struct scoutfs_lock **next) +/* should be in the core */ +static int atomic_add_unless_lessthan(atomic_t *v, int a, int u) { - DECLARE_LOCK_INFO(sb, linfo); - struct rb_node *node = linfo->lock_tree.rb_node; - struct scoutfs_lock *lock; - int cmp; - - assert_spin_locked(&linfo->lock); - - if (next) - *next = NULL; + int c, old; + + c = atomic_read(v); + for (;;) { + if (unlikely(c < (u))) + break; + old = atomic_cmpxchg((v), c, c + (a)); + if (likely(old == c)) + return 1; + c = old; + } - while (node) { - 
lock = container_of(node, struct scoutfs_lock, node); + return 0; +} - cmp = scoutfs_key_compare(start, &lock->start); - if (cmp < 0) { - if (next) - *next = lock; - node = node->rb_left; - } else if (cmp > 0) { - node = node->rb_right; - } else { - return lock; - } - } +/* + * Get a reference to a lock that's still active and present in the + * lookup index. + */ +static struct scoutfs_lock *get_lock(struct scoutfs_lock *lock) +{ + if (lock && atomic_add_unless_lessthan(&lock->refcount, 1, 2)) + return lock; return NULL; } -static void __lock_del_lru(struct lock_info *linfo, struct scoutfs_lock *lock) +/* + * The caller has a referenced lock and is holding its spinlock. If + * it's null, and we're the only user, and we're able to atomically + * remove the 2 refs for its presence in the lookup index, then we can + * lock the lookup index and remove it. This creates a window where the + * lock is in the index but won't allow new references, lookups and + * insertions need to be careful. + * + * This nests the global linfo spinlock under the per-lock spinlock only + * to keep callers from having to free on the other side of dropping + * the refs and unlocking the lock's spinlock. + */ +static bool try_remove_null_lock(struct lock_info *linfo, struct scoutfs_lock *lock) { - assert_spin_locked(&linfo->lock); + assert_spin_locked(&lock->lock); - if (!list_empty(&lock->lru_head)) { - list_del_init(&lock->lru_head); - linfo->lru_nr--; + if (lock && lock->mode == SCOUTFS_LOCK_NULL && + atomic_cmpxchg(&lock->refcount, 3, 1) == 3) { + spin_lock(&linfo->lock); + lock_remove(linfo, lock); + spin_unlock(&linfo->lock); + return true; } + + return false; } /* - * Get a lock and remove it from the lru. The caller must set state on - * the lock that indicates that it's busy before dropping the lock. - * Then later they call add_lru_or_free once they've cleared that state. + * Search for a lock by its key in the lookup index and return with a + * reference held. 
*/ -static struct scoutfs_lock *get_lock(struct super_block *sb, - struct scoutfs_key *start) +static struct scoutfs_lock *find_lock(struct super_block *sb, struct scoutfs_key *start) { DECLARE_LOCK_INFO(sb, linfo); struct scoutfs_lock *lock; - assert_spin_locked(&linfo->lock); - + spin_lock(&linfo->lock); lock = lock_lookup(sb, start, NULL); - if (lock) - __lock_del_lru(linfo, lock); + if (lock) { + lock = get_lock(lock); + if (lock) { + __lock_del_lru(linfo, lock); + __lock_add_lru(linfo, lock); + } + } + spin_unlock(&linfo->lock); return lock; } /* - * Get a lock, creating it if it doesn't exist. The caller must treat - * the lock like it came from get lock (mark sate, drop lock, clear - * state, put lock). Allocated locks aren't on the lru. + * Find a lock, allocating and inserting a new lock if it doesn't exist. */ -static struct scoutfs_lock *create_lock(struct super_block *sb, - struct scoutfs_key *start, - struct scoutfs_key *end) +static struct scoutfs_lock *find_or_alloc_lock(struct super_block *sb, + struct scoutfs_key *start, struct scoutfs_key *end) { DECLARE_LOCK_INFO(sb, linfo); + struct scoutfs_lock *found; struct scoutfs_lock *lock; + struct scoutfs_lock *ins; - assert_spin_locked(&linfo->lock); +retry: + lock = find_lock(sb, start); + while (!lock) { + ins = lock_alloc(sb, start, end); + if (!ins) + break; - lock = get_lock(sb, start); - if (!lock) { - spin_unlock(&linfo->lock); - lock = lock_alloc(sb, start, end); spin_lock(&linfo->lock); + found = lock_insert(sb, ins); + lock = found ? get_lock(found) : NULL; + spin_unlock(&linfo->lock); - if (lock) { - if (!lock_insert(sb, lock)) { - lock_free(linfo, lock); - lock = get_lock(sb, start); - } + if (lock != ins) + lock_free(linfo, ins); + + if (found && !lock) { + cpu_relax(); + goto retry; } } return lock; } -/* - * The caller is done using a lock and has cleared state that used to - * indicate that the lock wasn't idle. 
If it really is idle then we - * either free it if it's null or put it back on the lru. - */ -static void put_lock(struct lock_info *linfo,struct scoutfs_lock *lock) +static bool put_lock(struct lock_info *linfo, struct scoutfs_lock *lock) { - assert_spin_locked(&linfo->lock); - - if (lock_idle(lock)) { - if (lock->mode != SCOUTFS_LOCK_NULL) { - list_add_tail(&lock->lru_head, &linfo->lru_list); - linfo->lru_nr++; - } else { - lock_remove(linfo, lock); - lock_free(linfo, lock); - } + if (lock && atomic_dec_and_test(&lock->refcount)) { + lock_free(linfo, lock); + return true; } + + return false; } /* @@ -589,7 +661,7 @@ static void bug_on_inconsistent_grant_cache(struct super_block *sb, if (dirty || (cached && (!lock_mode_can_read(old_mode) || !lock_mode_can_read(new_mode)))) { - scoutfs_err(sb, "granted lock item cache inconsistency, cached %u dirty %u old_mode %d new_mode %d: start "SK_FMT" end "SK_FMT" refresh_gen %llu mode %u waiters: rd %u wr %u wo %u users: rd %u wr %u wo %u", + scoutfs_err(sb, "granted lock item cache inconsistency, cached %u dirty %u old_mode %d new_mode %d: start "SK_FMT" end "SK_FMT" refresh_gen %llu mode %u waiters: rd %u wr %u wo %u users: rd %u wr %u wo %u refcnt %d", cached, dirty, old_mode, new_mode, SK_ARG(&lock->start), SK_ARG(&lock->end), lock->refresh_gen, lock->mode, lock->waiters[SCOUTFS_LOCK_READ], @@ -597,7 +669,8 @@ static void bug_on_inconsistent_grant_cache(struct super_block *sb, lock->waiters[SCOUTFS_LOCK_WRITE_ONLY], lock->users[SCOUTFS_LOCK_READ], lock->users[SCOUTFS_LOCK_WRITE], - lock->users[SCOUTFS_LOCK_WRITE_ONLY]); + lock->users[SCOUTFS_LOCK_WRITE_ONLY], + atomic_read(&lock->refcount)); BUG(); } } @@ -619,10 +692,8 @@ int scoutfs_lock_grant_response(struct super_block *sb, scoutfs_inc_counter(sb, lock_grant_response); - spin_lock(&linfo->lock); - /* lock must already be busy with request_pending */ - lock = lock_lookup(sb, &nl->key, NULL); + lock = find_lock(sb, &nl->key); BUG_ON(!lock); 
trace_scoutfs_lock_grant_response(sb, lock); BUG_ON(!lock->request_pending); @@ -635,17 +706,17 @@ int scoutfs_lock_grant_response(struct super_block *sb, lock->refresh_gen = atomic64_inc_return(&linfo->next_refresh_gen); lock->request_pending = 0; + put_lock(linfo, lock); lock->mode = nl->new_mode; lock->write_seq = le64_to_cpu(nl->write_seq); trace_scoutfs_lock_granted(sb, lock); + try_remove_null_lock(linfo, lock); spin_unlock(&lock->lock); wake_up(&lock->waitq); put_lock(linfo, lock); - spin_unlock(&linfo->lock); - return 0; } @@ -692,11 +763,17 @@ static void lock_invalidate_worker(struct work_struct *work) scoutfs_inc_counter(sb, lock_invalidate_work); - spin_lock(&linfo->lock); +retry: spin_lock(&linfo->inv_wlist.lock); list_for_each_entry_safe(lock, tmp, &linfo->inv_wlist.list, inv_head) { - spin_lock(&lock->lock); + /* inversion, usually we get the inv spinlock under the lock spinlock */ + if (!spin_trylock(&lock->lock)) { + spin_unlock(&linfo->inv_wlist.lock); + cpu_relax(); + goto retry; + } + ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); nl = &ireq->nl; @@ -714,15 +791,16 @@ static void lock_invalidate_worker(struct work_struct *work) } spin_unlock(&linfo->inv_wlist.lock); - spin_unlock(&linfo->lock); if (list_empty(&ready)) return; - /* invalidate once the lock is read */ + /* invalidate once the lock is ready */ list_for_each_entry(lock, &ready, inv_head) { + spin_lock(&lock->lock); ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); nl = &ireq->nl; + spin_unlock(&lock->lock); /* only lock protocol, inv can't call subsystems after shutdown */ if (!linfo->shutdown) { @@ -740,9 +818,6 @@ static void lock_invalidate_worker(struct work_struct *work) } /* and finish all the invalidated locks */ - spin_lock(&linfo->lock); - spin_lock(&linfo->inv_wlist.lock); - list_for_each_entry_safe(lock, tmp, &ready, inv_head) { spin_lock(&lock->lock); ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); @@ -761,16 
+836,16 @@ static void lock_invalidate_worker(struct work_struct *work) wake_up(&lock->waitq); } else { /* another request arrived, back on the list and requeue */ + spin_lock(&linfo->inv_wlist.lock); list_move_tail(&lock->inv_head, &linfo->inv_wlist.list); queue_nonempty_work_list(linfo, &linfo->inv_wlist); + spin_unlock(&linfo->inv_wlist.lock); } + try_remove_null_lock(linfo, lock); spin_unlock(&lock->lock); put_lock(linfo, lock); } - - spin_unlock(&linfo->inv_wlist.lock); - spin_unlock(&linfo->lock); } /* @@ -809,8 +884,7 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id, goto out; } - spin_lock(&linfo->lock); - lock = get_lock(sb, &nl->key); + lock = find_lock(sb, &nl->key); if (lock) { spin_lock(&lock->lock); trace_scoutfs_lock_invalidate_request(sb, lock); @@ -825,9 +899,10 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id, spin_unlock(&linfo->inv_wlist.lock); } list_add_tail(&ireq->head, &lock->inv_req_list); + get_lock(lock); spin_unlock(&lock->lock); + put_lock(linfo, lock); } - spin_unlock(&linfo->lock); out: if (!lock) { @@ -851,9 +926,9 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id, DECLARE_LOCK_INFO(sb, linfo); struct scoutfs_net_lock_recover *nlr; enum scoutfs_lock_mode mode; + struct scoutfs_lock *found; struct scoutfs_lock *lock; - struct scoutfs_lock *next; - struct rb_node *node; + struct scoutfs_key pos; int ret; int i; @@ -865,11 +940,24 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id, if (!nlr) return -ENOMEM; - spin_lock(&linfo->lock); + pos = *key; - lock = lock_lookup(sb, key, &next) ?: next; + for (i = 0; i < SCOUTFS_NET_LOCK_MAX_RECOVER_NR; i++) { - for (i = 0; lock && i < SCOUTFS_NET_LOCK_MAX_RECOVER_NR; i++) { + spin_lock(&linfo->lock); + found = next_lock_range(sb, &pos); + lock = found ? 
get_lock(found) : NULL; + spin_unlock(&linfo->lock); + + /* retry to avoid freeing locks */ + if (found && !lock) { + cpu_relax(); + i--; + continue; + } + + if (lock == NULL) + break; spin_lock(&lock->lock); @@ -883,19 +971,15 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id, nlr->locks[i].old_mode = mode; nlr->locks[i].new_mode = mode; - spin_unlock(&lock->lock); + pos = lock->start; + scoutfs_key_inc(&pos); - node = rb_next(&lock->node); - if (node) - lock = rb_entry(node, struct scoutfs_lock, node); - else - lock = NULL; + spin_unlock(&lock->lock); + put_lock(linfo, lock); } nlr->nr = cpu_to_le16(i); - spin_unlock(&linfo->lock); - ret = scoutfs_client_lock_recover_response(sb, net_id, nlr); kfree(nlr); return ret; @@ -963,14 +1047,9 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i if (WARN_ON_ONCE(scoutfs_trans_held())) return -EDEADLK; - spin_lock(&linfo->lock); - - /* drops and re-acquires lock if it allocates */ - lock = create_lock(sb, start, end); - if (!lock) { - ret = -ENOMEM; - goto out_unlock; - } + lock = find_or_alloc_lock(sb, start, end); + if (!lock) + return -ENOMEM; spin_lock(&lock->lock); @@ -986,6 +1065,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i /* the fast path where we can use the granted mode */ if (lock_modes_match(lock->mode, mode)) { lock_inc_count(lock->users, mode); + get_lock(lock); *ret_lock = lock; ret = 0; break; @@ -1000,13 +1080,13 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i if (!lock->request_pending) { lock->request_pending = 1; + get_lock(lock); should_send = true; } else { should_send = false; } spin_unlock(&lock->lock); - spin_unlock(&linfo->lock); if (should_send) { nl.key = lock->start; @@ -1015,9 +1095,9 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i ret = scoutfs_client_lock_request(sb, &nl); if (ret) { - spin_lock(&linfo->lock); 
spin_lock(&lock->lock); lock->request_pending = 0; + put_lock(linfo, lock); break; } scoutfs_inc_counter(sb, lock_grant_request); @@ -1033,7 +1113,6 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i ret = 0; } - spin_lock(&linfo->lock); spin_lock(&lock->lock); if (ret) break; @@ -1048,9 +1127,6 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i wake_up(&lock->waitq); put_lock(linfo, lock); -out_unlock: - spin_unlock(&linfo->lock); - if (ret && ret != -EAGAIN && ret != -ERESTARTSYS) scoutfs_inc_counter(sb, lock_lock_error); @@ -1321,7 +1397,6 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scou scoutfs_inc_counter(sb, lock_unlock); - spin_lock(&linfo->lock); spin_lock(&lock->lock); lock_dec_count(lock->users, mode); @@ -1337,8 +1412,6 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scou spin_unlock(&lock->lock); wake_up(&lock->waitq); put_lock(linfo, lock); - - spin_unlock(&linfo->lock); } void scoutfs_lock_init_coverage(struct scoutfs_lock_coverage *cov) @@ -1406,7 +1479,7 @@ void scoutfs_lock_del_coverage(struct super_block *sb, * with the access mode and the access key must be in the lock's key * range. * - * This is called by lock holders who's use of the lock must be preventing + * This is called by lock holders whose use of the lock must be preventing * the mode and keys from changing. 
*/ bool scoutfs_lock_protected(struct scoutfs_lock *lock, struct scoutfs_key *key, @@ -1443,6 +1516,8 @@ static void lock_shrink_worker(struct work_struct *work) spin_unlock(&linfo->shrink_wlist.lock); list_for_each_entry_safe(lock, tmp, &list, shrink_head) { + + spin_lock(&lock->lock); list_del_init(&lock->shrink_head); /* unlocked lock access, but should be stable since we queued */ @@ -1450,12 +1525,13 @@ static void lock_shrink_worker(struct work_struct *work) nl.old_mode = lock->mode; nl.new_mode = SCOUTFS_LOCK_NULL; + spin_unlock(&lock->lock); + ret = scoutfs_client_lock_request(sb, &nl); if (ret) { /* oh well, not freeing */ scoutfs_inc_counter(sb, lock_shrink_aborted); - spin_lock(&linfo->lock); spin_lock(&lock->lock); lock->request_pending = 0; @@ -1463,8 +1539,6 @@ static void lock_shrink_worker(struct work_struct *work) spin_unlock(&lock->lock); wake_up(&lock->waitq); put_lock(linfo, lock); - - spin_unlock(&linfo->lock); } } } @@ -1481,11 +1555,17 @@ static unsigned long lock_count_objects(struct shrinker *shrink, } /* - * Start the shrinking process for locks on the lru. If a lock is on - * the lru then it can't have any active users. We don't want to block - * or allocate here so all we do is get the lock, mark it request - * pending, and kick off the work. The work sends a null request and - * eventually the lock is freed by its response. + * Start the shrinking process for locks on the lru. Locks are always + * on the lru so we skip any locks that are being used by any other + * references. Lock put/free defines nesting of the linfo spinlock + * inside the lock's spinlock so we're careful to honor that here. Our + * reference to the lock protects its presence on the lru so we can + * always resume iterating from it after dropping and reacquiring the + * linfo lock. + * + * We don't want to block or allocate here so all we do is get the lock, + * mark it request pending, and kick off the work. 
The work sends a + * null request and eventually the lock is freed by its response. * * Only a racing lock attempt that isn't matched can prevent the lock * from being freed. It'll block waiting to send its request for its @@ -1497,38 +1577,47 @@ static unsigned long lock_scan_objects(struct shrinker *shrink, { struct lock_info *linfo = KC_SHRINKER_CONTAINER_OF(shrink, struct lock_info); struct super_block *sb = linfo->sb; - struct scoutfs_lock *lock; - struct scoutfs_lock *tmp; + struct scoutfs_lock *lock = NULL; unsigned long freed = 0; unsigned long nr = sc->nr_to_scan; scoutfs_inc_counter(sb, lock_scan_objects); spin_lock(&linfo->lock); - spin_lock(&linfo->shrink_wlist.lock); - list_for_each_entry_safe(lock, tmp, &linfo->lru_list, lru_head) { + lock = list_first_entry_or_null(&linfo->lru_list, struct scoutfs_lock, lru_head); + while (lock && nr > 0) { - BUG_ON(!lock_idle(lock)); - BUG_ON(lock->mode == SCOUTFS_LOCK_NULL); - BUG_ON(!list_empty(&lock->shrink_head)); + if (get_lock(lock)) { + spin_unlock(&linfo->lock); - if (nr-- == 0) - break; + spin_lock(&lock->lock); + if (lock->mode != SCOUTFS_LOCK_NULL && atomic_read(&lock->refcount) == 3) { + lock->request_pending = 1; + spin_lock(&linfo->shrink_wlist.lock); + list_add_tail(&lock->shrink_head, &linfo->shrink_wlist.list); + spin_unlock(&linfo->shrink_wlist.lock); + get_lock(lock); + nr--; + freed++; + } + spin_unlock(&lock->lock); + put_lock(linfo, lock); - __lock_del_lru(linfo, lock); - lock->request_pending = 1; - freed++; - list_add_tail(&lock->shrink_head, &linfo->shrink_wlist.list); + spin_lock(&linfo->lock); + } - scoutfs_inc_counter(sb, lock_shrink_attempted); - trace_scoutfs_lock_shrink(sb, lock); + if (lock->lru_head.next != &linfo->lru_list) + lock = list_next_entry(lock, lru_head); + else + lock = NULL; } - queue_nonempty_work_list(linfo, &linfo->shrink_wlist); + spin_unlock(&linfo->lock); + spin_lock(&linfo->shrink_wlist.lock); + queue_nonempty_work_list(linfo, &linfo->shrink_wlist); 
spin_unlock(&linfo->shrink_wlist.lock); - spin_unlock(&linfo->lock); trace_scoutfs_lock_shrink_exit(sb, sc->nr_to_scan, freed); return freed; @@ -1594,13 +1683,12 @@ static u64 get_held_lock_refresh_gen(struct super_block *sb, struct scoutfs_key if (!linfo) return 0; - spin_lock(&linfo->lock); - lock = lock_lookup(sb, start, NULL); + lock = find_lock(sb, start); if (lock) { if (lock_mode_can_read(lock->mode)) refresh_gen = lock->refresh_gen; + put_lock(linfo, lock); } - spin_unlock(&linfo->lock); return refresh_gen; } @@ -1689,7 +1777,6 @@ void scoutfs_lock_destroy(struct super_block *sb) trace_scoutfs_lock_destroy(sb, linfo); - /* make sure that no one's actively using locks */ spin_lock(&linfo->lock); for (node = rb_first(&linfo->lock_tree); node; node = rb_next(node)) { @@ -1717,45 +1804,43 @@ void scoutfs_lock_destroy(struct super_block *sb) /* * Usually lock_free is only called once locks are idle but all * locks are idle by definition during shutdown. We need to - * manually update the lock's state to reflect that we've given - * up on pending work that would otherwise prevent free from - * being called (and would trip assertions in our manual calling - * of free). + * drop references for any pending work that we've canceled so + * that we can tear down the locks. 
*/ - spin_lock(&linfo->lock); - node = rb_first(&linfo->lock_tree); while (node) { lock = rb_entry(node, struct scoutfs_lock, node); node = rb_next(node); + atomic_inc(&lock->refcount); + list_for_each_entry_safe(ireq, ireq_tmp, &lock->inv_req_list, head) { list_del_init(&ireq->head); - put_lock(linfo, ireq->lock); + put_lock(linfo, lock); kfree(ireq); } - lock->request_pending = 0; - if (!list_empty(&lock->lru_head)) - __lock_del_lru(linfo, lock); + if (lock->request_pending) { + lock->request_pending = 0; + put_lock(linfo, lock); + } - spin_lock(&linfo->inv_wlist.lock); if (!list_empty(&lock->inv_head)) { list_del_init(&lock->inv_head); lock->invalidate_pending = 0; } - spin_unlock(&linfo->inv_wlist.lock); - spin_lock(&linfo->shrink_wlist.lock); - if (!list_empty(&lock->shrink_head)) + if (!list_empty(&lock->shrink_head)) { list_del_init(&lock->shrink_head); - spin_unlock(&linfo->shrink_wlist.lock); + put_lock(linfo, lock); + } + /* manually forcing removal for non-null locks */ + atomic_sub(2, &lock->refcount); lock_remove(linfo, lock); - lock_free(linfo, lock); - } - spin_unlock(&linfo->lock); + WARN_ON_ONCE(!put_lock(linfo, lock)); + } kfree(linfo); sbi->lock_info = NULL; diff --git a/kmod/src/lock.h b/kmod/src/lock.h index a04f02e57..dc6bd0f1e 100644 --- a/kmod/src/lock.h +++ b/kmod/src/lock.h @@ -19,6 +19,7 @@ struct inode_deletion_lock_data; */ struct scoutfs_lock { struct super_block *sb; + atomic_t refcount; spinlock_t lock; struct scoutfs_key start; struct scoutfs_key end; From f2a11d77777895567be82195075c615a5a831c07 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Thu, 26 May 2022 11:24:55 -0700 Subject: [PATCH 4/7] Lookup cluster locks with an RCU hash table The previous work we did introduce the per-lock spinlock and the refcount now make it easy to switch from an rbtree protected by a spinlock to a hash table protected by RCU read critical sections. The cluster lock lookup fast path now only dirties fields in the scoutfs_lock struct itself. 
We have to be a little careful when inserting so that users can't get references to locks that made it into the hash table but which then had to be removed because they were found to overlap. Freeing is straight forward and we only have to make sure to free the locks in RCU grace periods so that read sections can continue to reference the memory and see the refcount that indicates that the locks are freeing. A few remaining places were using the lookup rbtree to walk all locks, they're converted to using the range tree that we're keeping around to resolve overlapping ranges but which is also handy for iteration that isn't performance sensitive. The LRU still does create contention on the linfo spinlock on every lookup, fixing that is next. Signed-off-by: Zach Brown Signed-off-by: Chris Kirby --- kmod/src/lock.c | 160 ++++++++++++++++++++++++++---------------------- kmod/src/lock.h | 5 +- 2 files changed, 91 insertions(+), 74 deletions(-) diff --git a/kmod/src/lock.c b/kmod/src/lock.c index a0daa3fdf..98ed1359a 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -18,6 +18,7 @@ #include #include #include +#include #include "super.h" #include "lock.h" @@ -84,7 +85,7 @@ struct lock_info { spinlock_t lock; bool shutdown; bool unmounting; - struct rb_root lock_tree; + struct rhashtable ht; struct rb_root lock_range_tree; KC_DEFINE_SHRINKER(shrinker); struct list_head lru_list; @@ -263,7 +264,6 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock) BUG_ON(lock->users[SCOUTFS_LOCK_WRITE]); BUG_ON(lock->users[SCOUTFS_LOCK_WRITE_ONLY]); BUG_ON(!linfo->shutdown && lock->mode != SCOUTFS_LOCK_NULL); - BUG_ON(!RB_EMPTY_NODE(&lock->node)); BUG_ON(!RB_EMPTY_NODE(&lock->range_node)); BUG_ON(!list_empty(&lock->lru_head)); BUG_ON(!list_empty(&lock->inv_head)); @@ -271,7 +271,7 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock) BUG_ON(!list_empty(&lock->cov_list)); kfree(lock->inode_deletion_data); - kfree(lock); + kfree_rcu(lock, 
rcu_head); } static struct scoutfs_lock *lock_alloc(struct super_block *sb, @@ -292,7 +292,6 @@ static struct scoutfs_lock *lock_alloc(struct super_block *sb, atomic_set(&lock->refcount, 0); spin_lock_init(&lock->lock); - RB_CLEAR_NODE(&lock->node); RB_CLEAR_NODE(&lock->range_node); INIT_LIST_HEAD(&lock->lru_head); INIT_LIST_HEAD(&lock->inv_head); @@ -441,6 +440,12 @@ static struct scoutfs_lock *next_lock_range(struct super_block *sb, struct scout return next; } +static const struct rhashtable_params lock_ht_params = { + .key_len = member_sizeof(struct scoutfs_lock, start), + .key_offset = offsetof(struct scoutfs_lock, start), + .head_offset = offsetof(struct scoutfs_lock, ht_head), +}; + /* * Insert a lock into the lookup hash table, keyed by its start key. If * another lock is already present then we return EEXIST and the caller @@ -448,41 +453,41 @@ static struct scoutfs_lock *next_lock_range(struct super_block *sb, struct scout * won't be used until they've been inserted into the range tree without * overlaps. 
*/ -static struct scoutfs_lock *lock_insert(struct super_block *sb, struct scoutfs_lock *ins) +static int lock_insert(struct super_block *sb, struct scoutfs_lock *lock) { DECLARE_LOCK_INFO(sb, linfo); - struct scoutfs_lock *lock; - struct rb_node *parent; - struct rb_node **node; - int cmp; - - assert_spin_locked(&linfo->lock); + int ret; - node = &linfo->lock_tree.rb_node; - parent = NULL; - while (*node) { - parent = *node; - lock = container_of(*node, struct scoutfs_lock, node); + if (WARN_ON_ONCE(atomic_read(&lock->refcount) != 0)) + return -EINVAL; - cmp = scoutfs_key_compare(&ins->start, &lock->start); - if (cmp < 0) - node = &(*node)->rb_left; - else if (cmp > 0) - node = &(*node)->rb_right; - else - return lock; +retry: + ret = rhashtable_lookup_insert_fast(&linfo->ht, &lock->ht_head, lock_ht_params); + if (ret < 0) { + if (ret == -EBUSY) { + /* wait for pending rebalance to finish */ + synchronize_rcu(); + goto retry; + } } - if (insert_lock_range(sb, ins) != 0) - return NULL; + if (ret == 0) { + spin_lock(&linfo->lock); + + ret = insert_lock_range(sb, lock); + if (ret == 0) { + scoutfs_tseq_add(&linfo->tseq_tree, &lock->tseq_entry); + __lock_add_lru(linfo, lock); + atomic_add(2, &lock->refcount); + } - rb_link_node(&ins->node, parent, node); - rb_insert_color(&ins->node, &linfo->lock_tree); - scoutfs_tseq_add(&linfo->tseq_tree, &ins->tseq_entry); - __lock_add_lru(linfo, ins); - atomic_add(2, &ins->refcount); + spin_unlock(&linfo->lock); + + if (ret < 0) + rhashtable_remove_fast(&linfo->ht, &lock->ht_head, lock_ht_params); + } - return ins; + return ret; } /* @@ -493,14 +498,15 @@ static struct scoutfs_lock *lock_insert(struct super_block *sb, struct scoutfs_l */ static void lock_remove(struct lock_info *linfo, struct scoutfs_lock *lock) { - assert_spin_locked(&linfo->lock); - WARN_ON_ONCE(atomic_read(&lock->refcount) != 1); - rb_erase(&lock->node, &linfo->lock_tree); - RB_CLEAR_NODE(&lock->node); + rhashtable_remove_fast(&linfo->ht, &lock->ht_head, 
lock_ht_params); + + spin_lock(&linfo->lock); rb_erase(&lock->range_node, &linfo->lock_range_tree); RB_CLEAR_NODE(&lock->range_node); + __lock_del_lru(linfo, lock); + spin_unlock(&linfo->lock); scoutfs_tseq_del(&linfo->tseq_tree, &lock->tseq_entry); } @@ -553,9 +559,7 @@ static bool try_remove_null_lock(struct lock_info *linfo, struct scoutfs_lock *l if (lock && lock->mode == SCOUTFS_LOCK_NULL && atomic_cmpxchg(&lock->refcount, 3, 1) == 3) { - spin_lock(&linfo->lock); lock_remove(linfo, lock); - spin_unlock(&linfo->lock); return true; } @@ -571,53 +575,56 @@ static struct scoutfs_lock *find_lock(struct super_block *sb, struct scoutfs_key DECLARE_LOCK_INFO(sb, linfo); struct scoutfs_lock *lock; - spin_lock(&linfo->lock); - lock = lock_lookup(sb, start, NULL); - if (lock) { + rcu_read_lock(); + lock = rhashtable_lookup(&linfo->ht, start, lock_ht_params); + if (lock) lock = get_lock(lock); - if (lock) { - __lock_del_lru(linfo, lock); - __lock_add_lru(linfo, lock); - } + rcu_read_unlock(); + + if (lock) { + spin_lock(&linfo->lock); + __lock_del_lru(linfo, lock); + __lock_add_lru(linfo, lock); + spin_unlock(&linfo->lock); } - spin_unlock(&linfo->lock); return lock; } /* * Find a lock, allocating and inserting a new lock if it doesn't exist. + * Concurrent insertion attempts that fail with eexist will retry + * finding the lock. This can return hard errors from insertion. 
*/ -static struct scoutfs_lock *find_or_alloc_lock(struct super_block *sb, - struct scoutfs_key *start, struct scoutfs_key *end) +static int find_or_alloc_lock(struct super_block *sb, struct scoutfs_key *start, + struct scoutfs_key *end, struct scoutfs_lock **lock_ret) { DECLARE_LOCK_INFO(sb, linfo); - struct scoutfs_lock *found; struct scoutfs_lock *lock; struct scoutfs_lock *ins; + int ret = 0; + + while (!(lock = find_lock(sb, start))) { -retry: - lock = find_lock(sb, start); - while (!lock) { ins = lock_alloc(sb, start, end); - if (!ins) + if (!ins) { + ret = -ENOMEM; break; + } - spin_lock(&linfo->lock); - found = lock_insert(sb, ins); - lock = found ? get_lock(found) : NULL; - spin_unlock(&linfo->lock); - - if (lock != ins) + ret = lock_insert(sb, ins); + if (ret < 0) { lock_free(linfo, ins); - - if (found && !lock) { - cpu_relax(); - goto retry; + if (ret != -EEXIST) + break; + ret = 0; } + + cpu_relax(); } - return lock; + *lock_ret = lock; + return ret; } static bool put_lock(struct lock_info *linfo, struct scoutfs_lock *lock) @@ -1047,9 +1054,9 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i if (WARN_ON_ONCE(scoutfs_trans_held())) return -EDEADLK; - lock = find_or_alloc_lock(sb, start, end); - if (!lock) - return -ENOMEM; + ret = find_or_alloc_lock(sb, start, end, &lock); + if (ret < 0) + return ret; spin_lock(&lock->lock); @@ -1743,8 +1750,8 @@ void scoutfs_lock_shutdown(struct super_block *sb) /* cause current and future lock calls to return errors */ spin_lock(&linfo->lock); linfo->shutdown = true; - for (node = rb_first(&linfo->lock_tree); node; node = rb_next(node)) { - lock = rb_entry(node, struct scoutfs_lock, node); + for (node = rb_first(&linfo->lock_range_tree); node; node = rb_next(node)) { + lock = rb_entry(node, struct scoutfs_lock, range_node); wake_up(&lock->waitq); } spin_unlock(&linfo->lock); @@ -1779,8 +1786,8 @@ void scoutfs_lock_destroy(struct super_block *sb) /* make sure that no one's actively 
using locks */ spin_lock(&linfo->lock); - for (node = rb_first(&linfo->lock_tree); node; node = rb_next(node)) { - lock = rb_entry(node, struct scoutfs_lock, node); + for (node = rb_first(&linfo->lock_range_tree); node; node = rb_next(node)) { + lock = rb_entry(node, struct scoutfs_lock, range_node); for (mode = 0; mode < SCOUTFS_LOCK_NR_MODES; mode++) { if (lock->waiters[mode] || lock->users[mode]) { @@ -1807,9 +1814,9 @@ void scoutfs_lock_destroy(struct super_block *sb) * drop references for any pending work that we've canceled so * that we can tear down the locks. */ - node = rb_first(&linfo->lock_tree); + node = rb_first(&linfo->lock_range_tree); while (node) { - lock = rb_entry(node, struct scoutfs_lock, node); + lock = rb_entry(node, struct scoutfs_lock, range_node); node = rb_next(node); atomic_inc(&lock->refcount); @@ -1842,6 +1849,8 @@ void scoutfs_lock_destroy(struct super_block *sb) WARN_ON_ONCE(!put_lock(linfo, lock)); } + rhashtable_destroy(&linfo->ht); + kfree(linfo); sbi->lock_info = NULL; } @@ -1858,7 +1867,6 @@ int scoutfs_lock_setup(struct super_block *sb) linfo->sb = sb; spin_lock_init(&linfo->lock); - linfo->lock_tree = RB_ROOT; linfo->lock_range_tree = RB_ROOT; KC_INIT_SHRINKER_FUNCS(&linfo->shrinker, lock_count_objects, lock_scan_objects); @@ -1872,6 +1880,12 @@ int scoutfs_lock_setup(struct super_block *sb) sbi->lock_info = linfo; trace_scoutfs_lock_setup(sb, linfo); + ret = rhashtable_init(&linfo->ht, &lock_ht_params); + if (ret < 0) { + kfree(linfo); + return -ENOMEM; + } + linfo->tseq_dentry = scoutfs_tseq_create("client_locks", sbi->debug_root, &linfo->tseq_tree); diff --git a/kmod/src/lock.h b/kmod/src/lock.h index dc6bd0f1e..970af4ce5 100644 --- a/kmod/src/lock.h +++ b/kmod/src/lock.h @@ -1,6 +1,8 @@ #ifndef _SCOUTFS_LOCK_H_ #define _SCOUTFS_LOCK_H_ +#include + #include "key.h" #include "tseq.h" @@ -21,9 +23,10 @@ struct scoutfs_lock { struct super_block *sb; atomic_t refcount; spinlock_t lock; + struct rcu_head rcu_head; struct 
scoutfs_key start; struct scoutfs_key end; - struct rb_node node; + struct rhash_head ht_head; struct rb_node range_node; u64 refresh_gen; u64 write_seq; From 09fe4fddd40659fcbbfd5bdfca3b2e4fbe0bd8ef Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Fri, 3 Jun 2022 14:42:51 -0700 Subject: [PATCH 5/7] Two cluster lock LRU lists with less precision Currently we maintain a single LRU list of cluster locks and every time we acquire a cluster lock we move it to the head of the LRU, creating significant contention acquiring the spinlock that protects the LRU list. This moves to two LRU lists, a list of cluster locks ready to be reclaimed and one for locks that are in active use. We mark locks with which list they're on and only move them to the active list if they're on the reclaim list. We track imbalance between the two lists so that they're always roughly the same size. This removes contention maintaining a precise LRU amongst a set of active cluster locks. It doesn't address contention creating or removing locks, which are already very expensive operations. It also loses strict ordering by access time. Reclaim has to make it through the oldest half of locks before getting to the newer half, though there is no guaranteed ordering amongst the newest half. 
Signed-off-by: Zach Brown Signed-off-by: Chris Kirby --- kmod/src/lock.c | 196 ++++++++++++++++++++++++++++++++++++------------ kmod/src/lock.h | 1 + 2 files changed, 147 insertions(+), 50 deletions(-) diff --git a/kmod/src/lock.c b/kmod/src/lock.c index 98ed1359a..8f04337af 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -88,7 +88,9 @@ struct lock_info { struct rhashtable ht; struct rb_root lock_range_tree; KC_DEFINE_SHRINKER(shrinker); - struct list_head lru_list; + struct list_head lru_active; + struct list_head lru_reclaim; + long lru_imbalance; unsigned long long lru_nr; struct workqueue_struct *workq; struct work_list inv_wlist; @@ -343,26 +345,104 @@ static bool lock_counts_match(int granted, unsigned int *counts) return true; } -static void __lock_add_lru(struct lock_info *linfo, struct scoutfs_lock *lock) +enum { LOCK_LRU_ACTIVE, LOCK_LRU_RECLAIM }; + +/* + * Restore balance between the active and reclaim lru lists. This is + * called after single operations on the lists could have created + * imbalance so we can always restore balance with one operation. + * + * @lru_imbalance is the difference between the number of entries on the + * active list and the number on the reclaim list. It's positive if + * there are more entries on the active list. 
+ */ +static void lock_lru_rebalance(struct lock_info *linfo) { + struct scoutfs_lock *lock; + assert_spin_locked(&linfo->lock); - if (list_empty(&lock->lru_head)) { - list_add_tail(&lock->lru_head, &linfo->lru_list); - linfo->lru_nr++; + if (linfo->lru_imbalance > 1) { + BUG_ON(list_empty(&linfo->lru_active)); + lock = list_first_entry(&linfo->lru_active, struct scoutfs_lock, lru_head); + list_move_tail(&lock->lru_head, &linfo->lru_reclaim); + lock->lru_on_list = LOCK_LRU_RECLAIM; + linfo->lru_imbalance -= 2; + + } else if (linfo->lru_imbalance < -1) { + BUG_ON(list_empty(&linfo->lru_reclaim)); + lock = list_last_entry(&linfo->lru_reclaim, struct scoutfs_lock, lru_head); + list_move(&lock->lru_head, &linfo->lru_active); + lock->lru_on_list = LOCK_LRU_ACTIVE; + linfo->lru_imbalance += 2; } + + BUG_ON(linfo->lru_imbalance < -1 || linfo->lru_imbalance > 1); } -static void __lock_del_lru(struct lock_info *linfo, struct scoutfs_lock *lock) +static void lock_lru_insert(struct lock_info *linfo, struct scoutfs_lock *lock) { assert_spin_locked(&linfo->lock); + BUG_ON(!list_empty(&lock->lru_head)); + + list_add_tail(&lock->lru_head, &linfo->lru_active); + lock->lru_on_list = LOCK_LRU_ACTIVE; + linfo->lru_imbalance++; + linfo->lru_nr++; + + lock_lru_rebalance(linfo); +} - if (!list_empty(&lock->lru_head)) { - list_del_init(&lock->lru_head); - linfo->lru_nr--; +/* + * As we use a lock we move it to the end of the active list if it was + * on the reclaim list. + * + * This is meant to reduce contention on use of active locks. It + * doesn't maintain a precise ordering of lock access times and only + * ensures that reclaim has to go through the oldest half of locks + * before it can get to any of the newest half. That does mean that the + * first lock in the newest half could well be the most recently used. + * + * The caller only has a reference to the lock. We use an unlocked test + * of which list it's on to avoid acquiring the global lru lock. 
We + * don't mind if the load is rarely racey. It's always safe to reclaim + * and reacquire locks, so the LRU being rarely a bit off doesn't + * matter. Shrinking costs the most for locks that are actively in use, + * and in that case there are lots of chances for the load to be + * consistent and move a lock to protect it from shrinking. + */ +static void lock_lru_update(struct lock_info *linfo, struct scoutfs_lock *lock) +{ + BUG_ON(atomic_read(&lock->refcount) < 3); + BUG_ON(list_empty(&lock->lru_head)); + + if (lock->lru_on_list != LOCK_LRU_ACTIVE) { + spin_lock(&linfo->lock); + if (lock->lru_on_list != LOCK_LRU_ACTIVE) { + list_move_tail(&lock->lru_head, &linfo->lru_active); + lock->lru_on_list = LOCK_LRU_ACTIVE; + linfo->lru_imbalance += 2; + lock_lru_rebalance(linfo); + } + spin_unlock(&linfo->lock); } } +static void lock_lru_remove(struct lock_info *linfo, struct scoutfs_lock *lock) +{ + assert_spin_locked(&linfo->lock); + BUG_ON(list_empty(&lock->lru_head)); + + list_del_init(&lock->lru_head); + if (lock->lru_on_list == LOCK_LRU_ACTIVE) + linfo->lru_imbalance--; + else + linfo->lru_imbalance++; + linfo->lru_nr--; + + lock_lru_rebalance(linfo); +} + /* * Insert the lock into the tree that tracks their non-overlapping key * ranges. 
Warn if we see an attempt to insert a lock that overlaps @@ -477,7 +557,7 @@ static int lock_insert(struct super_block *sb, struct scoutfs_lock *lock) ret = insert_lock_range(sb, lock); if (ret == 0) { scoutfs_tseq_add(&linfo->tseq_tree, &lock->tseq_entry); - __lock_add_lru(linfo, lock); + lock_lru_insert(linfo, lock); atomic_add(2, &lock->refcount); } @@ -505,7 +585,7 @@ static void lock_remove(struct lock_info *linfo, struct scoutfs_lock *lock) spin_lock(&linfo->lock); rb_erase(&lock->range_node, &linfo->lock_range_tree); RB_CLEAR_NODE(&lock->range_node); - __lock_del_lru(linfo, lock); + lock_lru_remove(linfo, lock); spin_unlock(&linfo->lock); scoutfs_tseq_del(&linfo->tseq_tree, &lock->tseq_entry); @@ -581,12 +661,8 @@ static struct scoutfs_lock *find_lock(struct super_block *sb, struct scoutfs_key lock = get_lock(lock); rcu_read_unlock(); - if (lock) { - spin_lock(&linfo->lock); - __lock_del_lru(linfo, lock); - __lock_add_lru(linfo, lock); - spin_unlock(&linfo->lock); - } + if (lock) + lock_lru_update(linfo, lock); return lock; } @@ -1562,17 +1638,21 @@ static unsigned long lock_count_objects(struct shrinker *shrink, } /* - * Start the shrinking process for locks on the lru. Locks are always - * on the lru so we skip any locks that are being used by any other - * references. Lock put/free defines nesting of the linfo spinlock - * inside the lock's spinlock so we're careful to honor that here. Our - * reference to the lock protects its presence on the lru so we can - * always resume iterating from it after dropping and reacquiring the - * linfo lock. + * Start the shrinking process for locks on the lru. The reclaim and + * active lists are walked from head to tail. We hand locks off to the + * shrink worker if we can get a reference and acquire the lock's + * spinlock and find it idle. + * + * The global linfo spinlock is ordered under the lock's spinlock as a + * convenience to freeing null locks. 
We use trylock to check each + * lock and just skip locks when trylock fails. It seemed easier and + * more reliable than stopping and restarting iteration around spinlock + * reacquisition. * - * We don't want to block or allocate here so all we do is get the lock, - * mark it request pending, and kick off the work. The work sends a - * null request and eventually the lock is freed by its response. + * This is only a best effort scan to start freeing locks. We return + * after having queued work that will do the blocking work to kick off + * the null requests, and even then it will be some time before we get + * the responses and free the null locks. * * Only a racing lock attempt that isn't matched can prevent the lock * from being freed. It'll block waiting to send its request for its @@ -1585,39 +1665,53 @@ static unsigned long lock_scan_objects(struct shrinker *shrink, struct lock_info *linfo = KC_SHRINKER_CONTAINER_OF(shrink, struct lock_info); struct super_block *sb = linfo->sb; struct scoutfs_lock *lock = NULL; + struct list_head *list; unsigned long freed = 0; unsigned long nr = sc->nr_to_scan; scoutfs_inc_counter(sb, lock_scan_objects); - spin_lock(&linfo->lock); + if (nr == 0) + goto out; - lock = list_first_entry_or_null(&linfo->lru_list, struct scoutfs_lock, lru_head); - while (lock && nr > 0) { + spin_lock(&linfo->lock); + list = &linfo->lru_reclaim; + list_for_each_entry(lock, list, lru_head) { if (get_lock(lock)) { - spin_unlock(&linfo->lock); - - spin_lock(&lock->lock); - if (lock->mode != SCOUTFS_LOCK_NULL && atomic_read(&lock->refcount) == 3) { - lock->request_pending = 1; - spin_lock(&linfo->shrink_wlist.lock); - list_add_tail(&lock->shrink_head, &linfo->shrink_wlist.list); - spin_unlock(&linfo->shrink_wlist.lock); - get_lock(lock); - nr--; - freed++; + if (spin_trylock(&lock->lock)) { + if (lock->mode != SCOUTFS_LOCK_NULL && + !lock->request_pending && + !lock->invalidate_pending && + atomic_read(&lock->refcount) == 3) { + get_lock(lock); + 
lock->request_pending = 1; + spin_lock(&linfo->shrink_wlist.lock); + list_add_tail(&lock->shrink_head, + &linfo->shrink_wlist.list); + spin_unlock(&linfo->shrink_wlist.lock); + nr--; + freed++; + } + spin_unlock(&lock->lock); + put_lock(linfo, lock); + } else { + /* + * The put_lock() is intentionally not factored + * out since it confuses the sparse checker. + */ + put_lock(linfo, lock); } - spin_unlock(&lock->lock); - put_lock(linfo, lock); - - spin_lock(&linfo->lock); } - if (lock->lru_head.next != &linfo->lru_list) - lock = list_next_entry(lock, lru_head); - else - lock = NULL; + if (nr == 0) + break; + + /* switch to active at last reclaim entry, _for_each_ stops if active empty */ + if (lock->lru_head.next == &linfo->lru_reclaim) { + list = &linfo->lru_active; + lock = list_first_entry(list, struct scoutfs_lock, lru_head); + } } spin_unlock(&linfo->lock); @@ -1626,6 +1720,7 @@ static unsigned long lock_scan_objects(struct shrinker *shrink, queue_nonempty_work_list(linfo, &linfo->shrink_wlist); spin_unlock(&linfo->shrink_wlist.lock); +out: trace_scoutfs_lock_shrink_exit(sb, sc->nr_to_scan, freed); return freed; } @@ -1871,7 +1966,8 @@ int scoutfs_lock_setup(struct super_block *sb) KC_INIT_SHRINKER_FUNCS(&linfo->shrinker, lock_count_objects, lock_scan_objects); KC_REGISTER_SHRINKER(&linfo->shrinker, "scoutfs-lock:" SCSBF, SCSB_ARGS(sb)); - INIT_LIST_HEAD(&linfo->lru_list); + INIT_LIST_HEAD(&linfo->lru_active); + INIT_LIST_HEAD(&linfo->lru_reclaim); init_work_list(&linfo->inv_wlist, lock_invalidate_worker); init_work_list(&linfo->shrink_wlist, lock_shrink_worker); atomic64_set(&linfo->next_refresh_gen, 0); diff --git a/kmod/src/lock.h b/kmod/src/lock.h index 970af4ce5..e1fc7c93d 100644 --- a/kmod/src/lock.h +++ b/kmod/src/lock.h @@ -32,6 +32,7 @@ struct scoutfs_lock { u64 write_seq; u64 dirty_trans_seq; struct list_head lru_head; + int lru_on_list; wait_queue_head_t waitq; unsigned long request_pending:1, invalidate_pending:1; From 
6b67aee2e30f568f37c717152f6c62716c610320 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Tue, 7 Jun 2022 11:02:40 -0700 Subject: [PATCH 6/7] Directly queue cluster lock work We had a little helper that scheduled work after testing the list, which required holding the spinlock. This was a little too crude and required scoutfs_unlock() acquiring the invalidate work list spinlock even though it already had the cluster lock spinlock held and could see that there are invalidate requests pending and should queue the invalidation work. Signed-off-by: Zach Brown --- kmod/src/lock.c | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/kmod/src/lock.c b/kmod/src/lock.c index 8f04337af..718bf4c9b 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -126,14 +126,6 @@ static void init_work_list(struct work_list *wlist, work_func_t func) INIT_LIST_HEAD(&wlist->list); } -static void queue_nonempty_work_list(struct lock_info *linfo, struct work_list *wlist) -{ - assert_spin_locked(&wlist->lock); - - if (!list_empty(&wlist->list)) - queue_work(linfo->workq, &wlist->work); -} - /* * Returns true if a lock with the granted mode can satisfy a requested * mode. This is directional. 
A read lock is satisfied by a write lock @@ -921,7 +913,7 @@ static void lock_invalidate_worker(struct work_struct *work) /* another request arrived, back on the list and requeue */ spin_lock(&linfo->inv_wlist.lock); list_move_tail(&lock->inv_head, &linfo->inv_wlist.list); - queue_nonempty_work_list(linfo, &linfo->inv_wlist); + queue_work(linfo->workq, &linfo->inv_wlist.work); spin_unlock(&linfo->inv_wlist.lock); } @@ -978,7 +970,7 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id, spin_lock(&linfo->inv_wlist.lock); list_add_tail(&lock->inv_head, &linfo->inv_wlist.list); lock->invalidate_pending = 1; - queue_nonempty_work_list(linfo, &linfo->inv_wlist); + queue_work(linfo->workq, &linfo->inv_wlist.work); spin_unlock(&linfo->inv_wlist.lock); } list_add_tail(&ireq->head, &lock->inv_req_list); @@ -1488,9 +1480,8 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scou trace_scoutfs_lock_unlock(sb, lock); - spin_lock(&linfo->inv_wlist.lock); - queue_nonempty_work_list(linfo, &linfo->inv_wlist); - spin_unlock(&linfo->inv_wlist.lock); + if (!list_empty(&lock->inv_req_list)) + queue_work(linfo->workq, &linfo->inv_wlist.work); spin_unlock(&lock->lock); wake_up(&lock->waitq); @@ -1717,7 +1708,8 @@ static unsigned long lock_scan_objects(struct shrinker *shrink, spin_unlock(&linfo->lock); spin_lock(&linfo->shrink_wlist.lock); - queue_nonempty_work_list(linfo, &linfo->shrink_wlist); + if (!list_empty(&linfo->shrink_wlist.list)) + queue_work(linfo->workq, &linfo->shrink_wlist.work); spin_unlock(&linfo->shrink_wlist.lock); out: From 96049fe4f92cc45916f891c420e731383ab6f5ac Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Wed, 8 Jun 2022 14:06:06 -0700 Subject: [PATCH 7/7] Update tracing with cluster lock changes Signed-off-by: Zach Brown --- kmod/src/scoutfs_trace.h | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h index 1316418b9..072f05b3b 100644 --- 
a/kmod/src/scoutfs_trace.h +++ b/kmod/src/scoutfs_trace.h @@ -1100,6 +1100,7 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class, __field(unsigned char, invalidate_pending) __field(int, mode) __field(int, invalidating_mode) + __field(unsigned int, refcount) __field(unsigned int, waiters_cw) __field(unsigned int, waiters_pr) __field(unsigned int, waiters_ex) @@ -1118,6 +1119,7 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class, __entry->invalidate_pending = lck->invalidate_pending; __entry->mode = lck->mode; __entry->invalidating_mode = lck->invalidating_mode; + __entry->refcount = atomic_read(&lck->refcount); __entry->waiters_pr = lck->waiters[SCOUTFS_LOCK_READ]; __entry->waiters_ex = lck->waiters[SCOUTFS_LOCK_WRITE]; __entry->waiters_cw = lck->waiters[SCOUTFS_LOCK_WRITE_ONLY]; @@ -1125,11 +1127,11 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class, __entry->users_ex = lck->users[SCOUTFS_LOCK_WRITE]; __entry->users_cw = lck->users[SCOUTFS_LOCK_WRITE_ONLY]; ), - TP_printk(SCSBF" start "SK_FMT" end "SK_FMT" mode %u invmd %u reqp %u invp %u refg %llu wris %llu dts %llu waiters: pr %u ex %u cw %u users: pr %u ex %u cw %u", + TP_printk(SCSBF" start "SK_FMT" end "SK_FMT" mode %u invmd %u reqp %u invp %u refg %llu rfcnt %d wris %llu dts %llu waiters: pr %u ex %u cw %u users: pr %u ex %u cw %u", SCSB_TRACE_ARGS, sk_trace_args(start), sk_trace_args(end), __entry->mode, __entry->invalidating_mode, __entry->request_pending, - __entry->invalidate_pending, __entry->refresh_gen, __entry->write_seq, - __entry->dirty_trans_seq, + __entry->invalidate_pending, __entry->refresh_gen, __entry->refcount, + __entry->write_seq, __entry->dirty_trans_seq, __entry->waiters_pr, __entry->waiters_ex, __entry->waiters_cw, __entry->users_pr, __entry->users_ex, __entry->users_cw) );