diff --git a/kmod/src/inode.c b/kmod/src/inode.c index 3553c520..0681850e 100644 --- a/kmod/src/inode.c +++ b/kmod/src/inode.c @@ -482,7 +482,7 @@ int scoutfs_complete_truncate(struct inode *inode, struct scoutfs_lock *lock) } /* - * If we're changing the file size than the contents of the file are + * If we're changing the file size then the contents of the file are * changing and we increment the data_version. This would prevent * staging because the data_version is per-inode today, not per-extent. * So if there are any offline extents within the new size then we need diff --git a/kmod/src/lock.c b/kmod/src/lock.c index f27fb1e1..718bf4c9 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -18,6 +18,7 @@ #include #include #include +#include #include "super.h" #include "lock.h" @@ -69,6 +70,13 @@ * relative to that lock state we resend. */ +struct work_list { + struct work_struct work; + spinlock_t lock; + struct list_head list; +}; + + /* * allocated per-super, freed on unmount. */ @@ -77,16 +85,16 @@ struct lock_info { spinlock_t lock; bool shutdown; bool unmounting; - struct rb_root lock_tree; + struct rhashtable ht; struct rb_root lock_range_tree; KC_DEFINE_SHRINKER(shrinker); - struct list_head lru_list; + struct list_head lru_active; + struct list_head lru_reclaim; + long lru_imbalance; unsigned long long lru_nr; struct workqueue_struct *workq; - struct work_struct inv_work; - struct list_head inv_list; - struct work_struct shrink_work; - struct list_head shrink_list; + struct work_list inv_wlist; + struct work_list shrink_wlist; atomic64_t next_refresh_gen; struct dentry *tseq_dentry; @@ -111,6 +119,13 @@ static bool lock_mode_can_write(enum scoutfs_lock_mode mode) return mode == SCOUTFS_LOCK_WRITE || mode == SCOUTFS_LOCK_WRITE_ONLY; } +static void init_work_list(struct work_list *wlist, work_func_t func) +{ + spin_lock_init(&wlist->lock); + INIT_WORK(&wlist->work, func); + INIT_LIST_HEAD(&wlist->list); +} + /* * Returns true if a lock with the granted mode can satisfy a requested * mode. This is directional. 
A read lock is satisfied by a write lock @@ -230,12 +245,10 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock) { struct super_block *sb = lock->sb; - assert_spin_locked(&linfo->lock); - trace_scoutfs_lock_free(sb, lock); scoutfs_inc_counter(sb, lock_free); - /* manually checking lock_idle gives identifying line numbers */ + BUG_ON(atomic_read(&lock->refcount) != 0); BUG_ON(lock->request_pending); BUG_ON(lock->invalidate_pending); BUG_ON(lock->waiters[SCOUTFS_LOCK_READ]); @@ -245,7 +258,6 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock) BUG_ON(lock->users[SCOUTFS_LOCK_WRITE]); BUG_ON(lock->users[SCOUTFS_LOCK_WRITE_ONLY]); BUG_ON(!linfo->shutdown && lock->mode != SCOUTFS_LOCK_NULL); - BUG_ON(!RB_EMPTY_NODE(&lock->node)); BUG_ON(!RB_EMPTY_NODE(&lock->range_node)); BUG_ON(!list_empty(&lock->lru_head)); BUG_ON(!list_empty(&lock->inv_head)); @@ -253,7 +265,7 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock) BUG_ON(!list_empty(&lock->cov_list)); kfree(lock->inode_deletion_data); - kfree(lock); + kfree_rcu(lock, rcu_head); } static struct scoutfs_lock *lock_alloc(struct super_block *sb, @@ -272,11 +284,12 @@ static struct scoutfs_lock *lock_alloc(struct super_block *sb, scoutfs_inc_counter(sb, lock_alloc); - RB_CLEAR_NODE(&lock->node); + atomic_set(&lock->refcount, 0); + spin_lock_init(&lock->lock); RB_CLEAR_NODE(&lock->range_node); INIT_LIST_HEAD(&lock->lru_head); INIT_LIST_HEAD(&lock->inv_head); - INIT_LIST_HEAD(&lock->inv_list); + INIT_LIST_HEAD(&lock->inv_req_list); INIT_LIST_HEAD(&lock->shrink_head); spin_lock_init(&lock->cov_list_lock); INIT_LIST_HEAD(&lock->cov_list); @@ -324,26 +337,110 @@ static bool lock_counts_match(int granted, unsigned int *counts) return true; } +enum { LOCK_LRU_ACTIVE, LOCK_LRU_RECLAIM }; + /* - * An idle lock has nothing going on. It can be present in the lru and - * can be freed by the final put when it has a null mode. + * Restore balance between the active and reclaim lru lists. This is + * called after single operations on the lists could have created + * imbalance so we can always restore balance with one operation. + * + * @lru_imbalance is the difference between the number of entries on the + * active list and the number on the reclaim list. It's positive if + * there are more entries on the active list. 
*/ -static bool lock_idle(struct scoutfs_lock *lock) +static void lock_lru_rebalance(struct lock_info *linfo) { - enum scoutfs_lock_mode mode; + struct scoutfs_lock *lock; - if (lock->request_pending || lock->invalidate_pending) - return false; + assert_spin_locked(&linfo->lock); - for (mode = 0; mode < SCOUTFS_LOCK_NR_MODES; mode++) { - if (lock->waiters[mode] || lock->users[mode]) - return false; + if (linfo->lru_imbalance > 1) { + BUG_ON(list_empty(&linfo->lru_active)); + lock = list_first_entry(&linfo->lru_active, struct scoutfs_lock, lru_head); + list_move_tail(&lock->lru_head, &linfo->lru_reclaim); + lock->lru_on_list = LOCK_LRU_RECLAIM; + linfo->lru_imbalance -= 2; + + } else if (linfo->lru_imbalance < -1) { + BUG_ON(list_empty(&linfo->lru_reclaim)); + lock = list_last_entry(&linfo->lru_reclaim, struct scoutfs_lock, lru_head); + list_move(&lock->lru_head, &linfo->lru_active); + lock->lru_on_list = LOCK_LRU_ACTIVE; + linfo->lru_imbalance += 2; } - return true; + BUG_ON(linfo->lru_imbalance < -1 || linfo->lru_imbalance > 1); +} + +static void lock_lru_insert(struct lock_info *linfo, struct scoutfs_lock *lock) +{ + assert_spin_locked(&linfo->lock); + BUG_ON(!list_empty(&lock->lru_head)); + + list_add_tail(&lock->lru_head, &linfo->lru_active); + lock->lru_on_list = LOCK_LRU_ACTIVE; + linfo->lru_imbalance++; + linfo->lru_nr++; + + lock_lru_rebalance(linfo); +} + +/* + * As we use a lock we move it to the end of the active list if it was + * on the reclaim list. + * + * This is meant to reduce contention on use of active locks. It + * doesn't maintain a precise ordering of lock access times and only + * ensures that reclaim has to go through the oldest half of locks + * before it can get to any of the newest half. That does mean that the + * first lock in the newest half could well be the most recently used. + * + * The caller only has a reference to the lock. We use an unlocked test + * of which list it's on to avoid acquiring the global lru lock. We + * don't mind if the load is rarely racey. It's always safe to reclaim + * and reacquire locks, so the LRU being rarely a bit off doesn't + * matter. Shrinking costs the most for locks that are actively in use, + * and in that case there are lots of chances for the load to be + * consistent and move a lock to protect it from shrinking. + */ +static void lock_lru_update(struct lock_info *linfo, struct scoutfs_lock *lock) +{ + BUG_ON(atomic_read(&lock->refcount) < 3); + BUG_ON(list_empty(&lock->lru_head)); + + if (lock->lru_on_list != LOCK_LRU_ACTIVE) { + spin_lock(&linfo->lock); + if (lock->lru_on_list != LOCK_LRU_ACTIVE) { + list_move_tail(&lock->lru_head, &linfo->lru_active); + lock->lru_on_list = LOCK_LRU_ACTIVE; + linfo->lru_imbalance += 2; + lock_lru_rebalance(linfo); + } + spin_unlock(&linfo->lock); + } +} + +static void lock_lru_remove(struct lock_info *linfo, struct scoutfs_lock *lock) +{ + assert_spin_locked(&linfo->lock); + BUG_ON(list_empty(&lock->lru_head)); + + list_del_init(&lock->lru_head); + if (lock->lru_on_list == LOCK_LRU_ACTIVE) + linfo->lru_imbalance--; + else + linfo->lru_imbalance++; + linfo->lru_nr--; + + lock_lru_rebalance(linfo); } -static bool insert_range_node(struct super_block *sb, struct scoutfs_lock *ins) +/* + * Insert the lock into the tree that tracks their non-overlapping key + * ranges. Warn if we see an attempt to insert a lock that overlaps + * with an existing lock that isn't being freed. 
+ */ +static int insert_lock_range(struct super_block *sb, struct scoutfs_lock *ins) { DECLARE_LOCK_INFO(sb, linfo); struct rb_root *root = &linfo->lock_range_tree; @@ -352,17 +449,28 @@ static bool insert_range_node(struct super_block *sb, struct scoutfs_lock *ins) struct scoutfs_lock *lock; int cmp; + assert_spin_locked(&linfo->lock); + while (*node) { parent = *node; lock = container_of(*node, struct scoutfs_lock, range_node); cmp = scoutfs_key_compare_ranges(&ins->start, &ins->end, &lock->start, &lock->end); - if (WARN_ON_ONCE(cmp == 0)) { - scoutfs_warn(sb, "inserting lock start "SK_FMT" end "SK_FMT" overlaps with existing lock start "SK_FMT" end "SK_FMT, + if (cmp == 0) { + if (WARN_ON_ONCE(atomic_read(&lock->refcount) >= 2)) { + /* Overlap with an in-use lock */ + scoutfs_warn(sb, "inserting lock start "SK_FMT" end "SK_FMT" overlaps with existing lock start "SK_FMT" end "SK_FMT, SK_ARG(&ins->start), SK_ARG(&ins->end), SK_ARG(&lock->start), SK_ARG(&lock->end)); - return false; + return -EINVAL; + } else { + /* + * Overlap with a lock that's being freed. Tell + * the caller to retry. + */ + return -EEXIST; + } } if (cmp < 0) @@ -371,185 +479,230 @@ static bool insert_range_node(struct super_block *sb, struct scoutfs_lock *ins) node = &(*node)->rb_right; } - rb_link_node(&ins->range_node, parent, node); rb_insert_color(&ins->range_node, root); - return true; + return 0; } -/* returns true if the lock was inserted at its start key */ -static bool lock_insert(struct super_block *sb, struct scoutfs_lock *ins) +static struct scoutfs_lock *next_lock_range(struct super_block *sb, struct scoutfs_key *key) { DECLARE_LOCK_INFO(sb, linfo); + struct rb_node *node = linfo->lock_range_tree.rb_node; + struct scoutfs_lock *next = NULL; struct scoutfs_lock *lock; - struct rb_node *parent; - struct rb_node **node; int cmp; assert_spin_locked(&linfo->lock); - node = &linfo->lock_tree.rb_node; - parent = NULL; - while (*node) { - parent = *node; - lock = container_of(*node, struct scoutfs_lock, node); + while (node) { + lock = container_of(node, struct scoutfs_lock, range_node); - cmp = scoutfs_key_compare(&ins->start, &lock->start); - if (cmp < 0) - node = &(*node)->rb_left; - else if (cmp > 0) - node = &(*node)->rb_right; - else - return false; + cmp = scoutfs_key_compare(key, &lock->start); + if (cmp < 0) { + next = lock; + node = node->rb_left; + } else if (cmp > 0) { + node = node->rb_right; + } else { + return lock; + } } - if (!insert_range_node(sb, ins)) - return false; + return next; +} + +static const struct rhashtable_params lock_ht_params = { + .key_len = member_sizeof(struct scoutfs_lock, start), + .key_offset = offsetof(struct scoutfs_lock, start), + .head_offset = offsetof(struct scoutfs_lock, ht_head), +}; - rb_link_node(&ins->node, parent, node); - rb_insert_color(&ins->node, &linfo->lock_tree); +/* + * Insert a lock into the lookup hash table, keyed by its start key. If + * another lock is already present then we return EEXIST and the caller + * will retry. The locks are inserted with a 0 refcount so that they + * won't be used until they've been inserted into the range tree without + * overlaps. 
+ */ +static int lock_insert(struct super_block *sb, struct scoutfs_lock *lock) +{ + DECLARE_LOCK_INFO(sb, linfo); + int ret; - scoutfs_tseq_add(&linfo->tseq_tree, &ins->tseq_entry); + if (WARN_ON_ONCE(atomic_read(&lock->refcount) != 0)) + return -EINVAL; - return true; +retry: + ret = rhashtable_lookup_insert_fast(&linfo->ht, &lock->ht_head, lock_ht_params); + if (ret < 0) { + if (ret == -EBUSY) { + /* wait for pending rebalance to finish */ + synchronize_rcu(); + goto retry; + } + } + + if (ret == 0) { + spin_lock(&linfo->lock); + + ret = insert_lock_range(sb, lock); + if (ret == 0) { + scoutfs_tseq_add(&linfo->tseq_tree, &lock->tseq_entry); + lock_lru_insert(linfo, lock); + atomic_add(2, &lock->refcount); + } + + spin_unlock(&linfo->lock); + + if (ret < 0) + rhashtable_remove_fast(&linfo->ht, &lock->ht_head, lock_ht_params); + } + + return ret; } +/* + * Remove the lock from all the active indexes. The caller has already + * established the exclusive ability to remove by atomically removing + * the 2 refs that were added by insertion. There should be no more + * references once those refs were removed. + */ static void lock_remove(struct lock_info *linfo, struct scoutfs_lock *lock) { - assert_spin_locked(&linfo->lock); + WARN_ON_ONCE(atomic_read(&lock->refcount) != 1); + + rhashtable_remove_fast(&linfo->ht, &lock->ht_head, lock_ht_params); - rb_erase(&lock->node, &linfo->lock_tree); - RB_CLEAR_NODE(&lock->node); + spin_lock(&linfo->lock); rb_erase(&lock->range_node, &linfo->lock_range_tree); RB_CLEAR_NODE(&lock->range_node); + lock_lru_remove(linfo, lock); + spin_unlock(&linfo->lock); scoutfs_tseq_del(&linfo->tseq_tree, &lock->tseq_entry); } -static struct scoutfs_lock *lock_lookup(struct super_block *sb, - struct scoutfs_key *start, - struct scoutfs_lock **next) +/* should be in the core */ +static int atomic_add_unless_lessthan(atomic_t *v, int a, int u) { - DECLARE_LOCK_INFO(sb, linfo); - struct rb_node *node = linfo->lock_tree.rb_node; - struct scoutfs_lock *lock; - int cmp; - - assert_spin_locked(&linfo->lock); - - if (next) - *next = NULL; + int c, old; + + c = atomic_read(v); + for (;;) { + if (unlikely(c < (u))) + break; + old = atomic_cmpxchg((v), c, c + (a)); + if (likely(old == c)) + return 1; + c = old; + } - while (node) { - lock = container_of(node, struct scoutfs_lock, node); + return 0; +} - cmp = scoutfs_key_compare(start, &lock->start); - if (cmp < 0) { - if (next) - *next = lock; - node = node->rb_left; - } else if (cmp > 0) { - node = node->rb_right; - } else { - return lock; - } - } +/* + * Get a reference to a lock that's still active and present in the + * lookup index. + */ +static struct scoutfs_lock *get_lock(struct scoutfs_lock *lock) +{ + if (lock && atomic_add_unless_lessthan(&lock->refcount, 1, 2)) + return lock; return NULL; } -static void __lock_del_lru(struct lock_info *linfo, struct scoutfs_lock *lock) +/* + * The caller has a referenced lock and is holding its spinlock. If + * it's null, and we're the only user, and we're able to atomically + * remove the 2 refs for its presence in the lookup index, then we can + * lock the lookup index and remove it. This creates a window where the + * lock is in the index but won't allow new references, lookups and + * insertions need to be careful. + * + * This nests the global linfo spinlock under the per-lock spinlock only + * to keep callers from having to free on the other side of dropping + * the refs and unlocking the lock's spinlock. 
+ */ +static bool try_remove_null_lock(struct lock_info *linfo, struct scoutfs_lock *lock) { - assert_spin_locked(&linfo->lock); + assert_spin_locked(&lock->lock); - if (!list_empty(&lock->lru_head)) { - list_del_init(&lock->lru_head); - linfo->lru_nr--; + if (lock && lock->mode == SCOUTFS_LOCK_NULL && + atomic_cmpxchg(&lock->refcount, 3, 1) == 3) { + lock_remove(linfo, lock); + return true; } + + return false; } /* - * Get a lock and remove it from the lru. The caller must set state on - * the lock that indicates that it's busy before dropping the lock. - * Then later they call add_lru_or_free once they've cleared that state. + * Search for a lock by its key in the lookup index and return with a + * reference held. */ -static struct scoutfs_lock *get_lock(struct super_block *sb, - struct scoutfs_key *start) +static struct scoutfs_lock *find_lock(struct super_block *sb, struct scoutfs_key *start) { DECLARE_LOCK_INFO(sb, linfo); struct scoutfs_lock *lock; - assert_spin_locked(&linfo->lock); + rcu_read_lock(); + lock = rhashtable_lookup(&linfo->ht, start, lock_ht_params); + if (lock) + lock = get_lock(lock); + rcu_read_unlock(); - lock = lock_lookup(sb, start, NULL); if (lock) - __lock_del_lru(linfo, lock); + lock_lru_update(linfo, lock); return lock; } /* - * Get a lock, creating it if it doesn't exist. The caller must treat - * the lock like it came from get lock (mark sate, drop lock, clear - * state, put lock). Allocated locks aren't on the lru. + * Find a lock, allocating and inserting a new lock if it doesn't exist. + * Concurrent insertion attempts that fail with eexist will retry + * finding the lock. This can return hard errors from insertion. */ -static struct scoutfs_lock *create_lock(struct super_block *sb, - struct scoutfs_key *start, - struct scoutfs_key *end) +static int find_or_alloc_lock(struct super_block *sb, struct scoutfs_key *start, + struct scoutfs_key *end, struct scoutfs_lock **lock_ret) { DECLARE_LOCK_INFO(sb, linfo); struct scoutfs_lock *lock; + struct scoutfs_lock *ins; + int ret = 0; - assert_spin_locked(&linfo->lock); + while (!(lock = find_lock(sb, start))) { - lock = get_lock(sb, start); - if (!lock) { - spin_unlock(&linfo->lock); - lock = lock_alloc(sb, start, end); - spin_lock(&linfo->lock); + ins = lock_alloc(sb, start, end); + if (!ins) { + ret = -ENOMEM; + break; + } - if (lock) { - if (!lock_insert(sb, lock)) { - lock_free(linfo, lock); - lock = get_lock(sb, start); - } + ret = lock_insert(sb, ins); + if (ret < 0) { + lock_free(linfo, ins); + if (ret != -EEXIST) + break; + ret = 0; } + + cpu_relax(); } - return lock; + *lock_ret = lock; + return ret; } -/* - * The caller is done using a lock and has cleared state that used to - * indicate that the lock wasn't idle. If it really is idle then we - * either free it if it's null or put it back on the lru. - */ -static void put_lock(struct lock_info *linfo,struct scoutfs_lock *lock) +static bool put_lock(struct lock_info *linfo, struct scoutfs_lock *lock) { - assert_spin_locked(&linfo->lock); - - if (lock_idle(lock)) { - if (lock->mode != SCOUTFS_LOCK_NULL) { - list_add_tail(&lock->lru_head, &linfo->lru_list); - linfo->lru_nr++; - } else { - lock_remove(linfo, lock); - lock_free(linfo, lock); - } + if (lock && atomic_dec_and_test(&lock->refcount)) { + lock_free(linfo, lock); + return true; } -} -/* - * The caller has made a change (set a lock mode) which can let one of the - * invalidating locks make forward progress. 
- */ -static void queue_inv_work(struct lock_info *linfo) -{ - assert_spin_locked(&linfo->lock); - - if (!list_empty(&linfo->inv_list)) - queue_work(linfo->workq, &linfo->inv_work); + return false; } /* @@ -583,7 +736,7 @@ static void bug_on_inconsistent_grant_cache(struct super_block *sb, if (dirty || (cached && (!lock_mode_can_read(old_mode) || !lock_mode_can_read(new_mode)))) { - scoutfs_err(sb, "granted lock item cache inconsistency, cached %u dirty %u old_mode %d new_mode %d: start "SK_FMT" end "SK_FMT" refresh_gen %llu mode %u waiters: rd %u wr %u wo %u users: rd %u wr %u wo %u", + scoutfs_err(sb, "granted lock item cache inconsistency, cached %u dirty %u old_mode %d new_mode %d: start "SK_FMT" end "SK_FMT" refresh_gen %llu mode %u waiters: rd %u wr %u wo %u users: rd %u wr %u wo %u refcnt %d", cached, dirty, old_mode, new_mode, SK_ARG(&lock->start), SK_ARG(&lock->end), lock->refresh_gen, lock->mode, lock->waiters[SCOUTFS_LOCK_READ], @@ -591,7 +744,8 @@ static void bug_on_inconsistent_grant_cache(struct super_block *sb, lock->waiters[SCOUTFS_LOCK_WRITE_ONLY], lock->users[SCOUTFS_LOCK_READ], lock->users[SCOUTFS_LOCK_WRITE], - lock->users[SCOUTFS_LOCK_WRITE_ONLY]); + lock->users[SCOUTFS_LOCK_WRITE_ONLY], + atomic_read(&lock->refcount)); BUG(); } } @@ -613,29 +767,31 @@ int scoutfs_lock_grant_response(struct super_block *sb, scoutfs_inc_counter(sb, lock_grant_response); - spin_lock(&linfo->lock); - /* lock must already be busy with request_pending */ - lock = lock_lookup(sb, &nl->key, NULL); + lock = find_lock(sb, &nl->key); BUG_ON(!lock); trace_scoutfs_lock_grant_response(sb, lock); BUG_ON(!lock->request_pending); bug_on_inconsistent_grant_cache(sb, lock, nl->old_mode, nl->new_mode); + spin_lock(&lock->lock); + if (!lock_mode_can_read(nl->old_mode) && lock_mode_can_read(nl->new_mode)) lock->refresh_gen = atomic64_inc_return(&linfo->next_refresh_gen); lock->request_pending = 0; + put_lock(linfo, lock); lock->mode = nl->new_mode; lock->write_seq = le64_to_cpu(nl->write_seq); trace_scoutfs_lock_granted(sb, lock); + + try_remove_null_lock(linfo, lock); + spin_unlock(&lock->lock); wake_up(&lock->waitq); put_lock(linfo, lock); - spin_unlock(&linfo->lock); - return 0; } @@ -671,7 +827,7 @@ struct inv_req { */ static void lock_invalidate_worker(struct work_struct *work) { - struct lock_info *linfo = container_of(work, struct lock_info, inv_work); + struct lock_info *linfo = container_of(work, struct lock_info, inv_wlist.work); struct super_block *sb = linfo->sb; struct scoutfs_net_lock *nl; struct scoutfs_lock *lock; @@ -682,33 +838,44 @@ static void lock_invalidate_worker(struct work_struct *work) scoutfs_inc_counter(sb, lock_invalidate_work); - spin_lock(&linfo->lock); +retry: + spin_lock(&linfo->inv_wlist.lock); + + list_for_each_entry_safe(lock, tmp, &linfo->inv_wlist.list, inv_head) { + /* inversion, usually we get the inv spinlock under the lock spinlock */ + if (!spin_trylock(&lock->lock)) { + spin_unlock(&linfo->inv_wlist.lock); + cpu_relax(); + goto retry; + } - list_for_each_entry_safe(lock, tmp, &linfo->inv_list, inv_head) { - ireq = list_first_entry(&lock->inv_list, struct inv_req, head); + ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); nl = &ireq->nl; /* wait until incompatible holders unlock */ - if (!lock_counts_match(nl->new_mode, lock->users)) - continue; + if (lock_counts_match(nl->new_mode, lock->users)) { + /* set the new mode, no incompatible users during inval, recov needs old */ + lock->invalidating_mode = lock->mode; + lock->mode = nl->new_mode; - 
/* set the new mode, no incompatible users during inval, recov needs old */ - lock->invalidating_mode = lock->mode; - lock->mode = nl->new_mode; + /* move everyone that's ready to our private list */ + list_move_tail(&lock->inv_head, &ready); + } - /* move everyone that's ready to our private list */ - list_move_tail(&lock->inv_head, &ready); + spin_unlock(&lock->lock); } - spin_unlock(&linfo->lock); + spin_unlock(&linfo->inv_wlist.lock); if (list_empty(&ready)) return; - /* invalidate once the lock is read */ + /* invalidate once the lock is ready */ list_for_each_entry(lock, &ready, inv_head) { - ireq = list_first_entry(&lock->inv_list, struct inv_req, head); + spin_lock(&lock->lock); + ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); nl = &ireq->nl; + spin_unlock(&lock->lock); /* only lock protocol, inv can't call subsystems after shutdown */ if (!linfo->shutdown) { @@ -726,10 +893,9 @@ static void lock_invalidate_worker(struct work_struct *work) } /* and finish all the invalidated locks */ - spin_lock(&linfo->lock); - list_for_each_entry_safe(lock, tmp, &ready, inv_head) { - ireq = list_first_entry(&lock->inv_list, struct inv_req, head); + spin_lock(&lock->lock); + ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head); trace_scoutfs_lock_invalidated(sb, lock); @@ -738,21 +904,23 @@ static void lock_invalidate_worker(struct work_struct *work) lock->invalidating_mode = SCOUTFS_LOCK_NULL; - if (list_empty(&lock->inv_list)) { + if (list_empty(&lock->inv_req_list)) { /* finish if another request didn't arrive */ list_del_init(&lock->inv_head); lock->invalidate_pending = 0; wake_up(&lock->waitq); } else { /* another request arrived, back on the list and requeue */ - list_move_tail(&lock->inv_head, &linfo->inv_list); - queue_inv_work(linfo); + spin_lock(&linfo->inv_wlist.lock); + list_move_tail(&lock->inv_head, &linfo->inv_wlist.list); + queue_work(linfo->workq, &linfo->inv_wlist.work); + spin_unlock(&linfo->inv_wlist.lock); } + try_remove_null_lock(linfo, lock); + spin_unlock(&lock->lock); put_lock(linfo, lock); } - - spin_unlock(&linfo->lock); } /* @@ -791,21 +959,25 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id, goto out; } - spin_lock(&linfo->lock); - lock = get_lock(sb, &nl->key); + lock = find_lock(sb, &nl->key); if (lock) { + spin_lock(&lock->lock); trace_scoutfs_lock_invalidate_request(sb, lock); ireq->lock = lock; ireq->net_id = net_id; ireq->nl = *nl; - if (list_empty(&lock->inv_list)) { - list_add_tail(&lock->inv_head, &linfo->inv_list); + if (list_empty(&lock->inv_req_list)) { + spin_lock(&linfo->inv_wlist.lock); + list_add_tail(&lock->inv_head, &linfo->inv_wlist.list); lock->invalidate_pending = 1; - queue_inv_work(linfo); + queue_work(linfo->workq, &linfo->inv_wlist.work); + spin_unlock(&linfo->inv_wlist.lock); } - list_add_tail(&ireq->head, &lock->inv_list); + list_add_tail(&ireq->head, &lock->inv_req_list); + get_lock(lock); + spin_unlock(&lock->lock); + put_lock(linfo, lock); } - spin_unlock(&linfo->lock); out: if (!lock) { @@ -829,9 +1001,9 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id, DECLARE_LOCK_INFO(sb, linfo); struct scoutfs_net_lock_recover *nlr; enum scoutfs_lock_mode mode; + struct scoutfs_lock *found; struct scoutfs_lock *lock; - struct scoutfs_lock *next; - struct rb_node *node; + struct scoutfs_key pos; int ret; int i; @@ -843,11 +1015,26 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id, if (!nlr) return -ENOMEM; - spin_lock(&linfo->lock); + pos = *key; + + 
for (i = 0; i < SCOUTFS_NET_LOCK_MAX_RECOVER_NR; i++) { - lock = lock_lookup(sb, key, &next) ?: next; + spin_lock(&linfo->lock); + found = next_lock_range(sb, &pos); + lock = found ? get_lock(found) : NULL; + spin_unlock(&linfo->lock); - for (i = 0; lock && i < SCOUTFS_NET_LOCK_MAX_RECOVER_NR; i++) { + /* retry to avoid freeing locks */ + if (found && !lock) { + cpu_relax(); + i--; + continue; + } + + if (lock == NULL) + break; + + spin_lock(&lock->lock); if (lock->invalidating_mode != SCOUTFS_LOCK_NULL) mode = lock->invalidating_mode; @@ -859,17 +1046,15 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id, nlr->locks[i].old_mode = mode; nlr->locks[i].new_mode = mode; - node = rb_next(&lock->node); - if (node) - lock = rb_entry(node, struct scoutfs_lock, node); - else - lock = NULL; + pos = lock->start; + scoutfs_key_inc(&pos); + + spin_unlock(&lock->lock); + put_lock(linfo, lock); } nlr->nr = cpu_to_le16(i); - spin_unlock(&linfo->lock); - ret = scoutfs_client_lock_recover_response(sb, net_id, nlr); kfree(nlr); return ret; @@ -881,10 +1066,10 @@ static bool lock_wait_cond(struct super_block *sb, struct scoutfs_lock *lock, DECLARE_LOCK_INFO(sb, linfo); bool wake; - spin_lock(&linfo->lock); + spin_lock(&lock->lock); wake = linfo->shutdown || lock_modes_match(lock->mode, mode) || !lock->request_pending; - spin_unlock(&linfo->lock); + spin_unlock(&lock->lock); if (!wake) scoutfs_inc_counter(sb, lock_wait); @@ -937,14 +1122,11 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i if (WARN_ON_ONCE(scoutfs_trans_held())) return -EDEADLK; - spin_lock(&linfo->lock); + ret = find_or_alloc_lock(sb, start, end, &lock); + if (ret < 0) + return ret; - /* drops and re-acquires lock if it allocates */ - lock = create_lock(sb, start, end); - if (!lock) { - ret = -ENOMEM; - goto out_unlock; - } + spin_lock(&lock->lock); /* the waiters count is only used by debugging output */ lock_inc_count(lock->waiters, mode); @@ -958,6 +1140,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i /* the fast path where we can use the granted mode */ if (lock_modes_match(lock->mode, mode)) { lock_inc_count(lock->users, mode); + get_lock(lock); *ret_lock = lock; ret = 0; break; @@ -972,12 +1155,13 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i if (!lock->request_pending) { lock->request_pending = 1; + get_lock(lock); should_send = true; } else { should_send = false; } - spin_unlock(&linfo->lock); + spin_unlock(&lock->lock); if (should_send) { nl.key = lock->start; @@ -986,8 +1170,9 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i ret = scoutfs_client_lock_request(sb, &nl); if (ret) { - spin_lock(&linfo->lock); + spin_lock(&lock->lock); lock->request_pending = 0; + put_lock(linfo, lock); break; } scoutfs_inc_counter(sb, lock_grant_request); @@ -1003,7 +1188,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i ret = 0; } - spin_lock(&linfo->lock); + spin_lock(&lock->lock); if (ret) break; } @@ -1012,12 +1197,11 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i if (ret == 0) trace_scoutfs_lock_locked(sb, lock); + + spin_unlock(&lock->lock); wake_up(&lock->waitq); put_lock(linfo, lock); -out_unlock: - spin_unlock(&linfo->lock); - if (ret && ret != -EAGAIN && ret != -ERESTARTSYS) scoutfs_inc_counter(sb, lock_lock_error); @@ -1288,18 +1472,20 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, 
enum scou scoutfs_inc_counter(sb, lock_unlock); - spin_lock(&linfo->lock); + spin_lock(&lock->lock); lock_dec_count(lock->users, mode); if (lock_mode_can_write(mode)) lock->dirty_trans_seq = scoutfs_trans_sample_seq(sb); trace_scoutfs_lock_unlock(sb, lock); + + if (!list_empty(&lock->inv_req_list)) + queue_work(linfo->workq, &linfo->inv_wlist.work); + + spin_unlock(&lock->lock); wake_up(&lock->waitq); - queue_inv_work(linfo); put_lock(linfo, lock); - - spin_unlock(&linfo->lock); } void scoutfs_lock_init_coverage(struct scoutfs_lock_coverage *cov) @@ -1367,7 +1553,7 @@ void scoutfs_lock_del_coverage(struct super_block *sb, * with the access mode and the access key must be in the lock's key * range. * - * This is called by lock holders who's use of the lock must be preventing + * This is called by lock holders whose use of the lock must be preventing * the mode and keys from changing. */ bool scoutfs_lock_protected(struct scoutfs_lock *lock, struct scoutfs_key *key, @@ -1389,8 +1575,7 @@ bool scoutfs_lock_protected(struct scoutfs_lock *lock, struct scoutfs_key *key, */ static void lock_shrink_worker(struct work_struct *work) { - struct lock_info *linfo = container_of(work, struct lock_info, - shrink_work); + struct lock_info *linfo = container_of(work, struct lock_info, shrink_wlist.work); struct super_block *sb = linfo->sb; struct scoutfs_net_lock nl; struct scoutfs_lock *lock; @@ -1400,11 +1585,13 @@ static void lock_shrink_worker(struct work_struct *work) scoutfs_inc_counter(sb, lock_shrink_work); - spin_lock(&linfo->lock); - list_splice_init(&linfo->shrink_list, &list); - spin_unlock(&linfo->lock); + spin_lock(&linfo->shrink_wlist.lock); + list_splice_init(&linfo->shrink_wlist.list, &list); + spin_unlock(&linfo->shrink_wlist.lock); list_for_each_entry_safe(lock, tmp, &list, shrink_head) { + + spin_lock(&lock->lock); list_del_init(&lock->shrink_head); /* unlocked lock access, but should be stable since we queued */ @@ -1412,18 +1599,20 @@ static void lock_shrink_worker(struct work_struct *work) nl.old_mode = lock->mode; nl.new_mode = SCOUTFS_LOCK_NULL; + spin_unlock(&lock->lock); + ret = scoutfs_client_lock_request(sb, &nl); if (ret) { /* oh well, not freeing */ scoutfs_inc_counter(sb, lock_shrink_aborted); - spin_lock(&linfo->lock); + spin_lock(&lock->lock); lock->request_pending = 0; + + spin_unlock(&lock->lock); wake_up(&lock->waitq); put_lock(linfo, lock); - - spin_unlock(&linfo->lock); } } } @@ -1440,11 +1629,21 @@ static unsigned long lock_count_objects(struct shrinker *shrink, } /* - * Start the shrinking process for locks on the lru. If a lock is on - * the lru then it can't have any active users. We don't want to block - * or allocate here so all we do is get the lock, mark it request - * pending, and kick off the work. The work sends a null request and - * eventually the lock is freed by its response. + * Start the shrinking process for locks on the lru. The reclaim and + * active lists are walked from head to tail. We hand locks off to the + * shrink worker if we can get a reference and acquire the lock's + * spinlock and find it idle. + * + * The global linfo spinlock is ordered under the lock's spinlock as a + * convenience to freeing null locks. We use trylock to check each + * lock and just skip locks when trylock fails. It seemed easier and + * more reliable than stopping and restarting iteration around spinlock + * reacquisition. + * + * This is only a best effort scan to start freeing locks. 
We return + * after having queued work that will do the blocking work to kick off + * the null requests, and even then it will be some time before we get + * the responses and free the null locks. * * Only a racing lock attempt that isn't matched can prevent the lock * from being freed. It'll block waiting to send its request for its @@ -1456,45 +1655,64 @@ static unsigned long lock_scan_objects(struct shrinker *shrink, { struct lock_info *linfo = KC_SHRINKER_CONTAINER_OF(shrink, struct lock_info); struct super_block *sb = linfo->sb; - struct scoutfs_lock *lock; - struct scoutfs_lock *tmp; + struct scoutfs_lock *lock = NULL; + struct list_head *list; unsigned long freed = 0; unsigned long nr = sc->nr_to_scan; - bool added = false; scoutfs_inc_counter(sb, lock_scan_objects); - spin_lock(&linfo->lock); + if (nr == 0) + goto out; -restart: - list_for_each_entry_safe(lock, tmp, &linfo->lru_list, lru_head) { + spin_lock(&linfo->lock); - BUG_ON(!lock_idle(lock)); - BUG_ON(lock->mode == SCOUTFS_LOCK_NULL); - BUG_ON(!list_empty(&lock->shrink_head)); + list = &linfo->lru_reclaim; + list_for_each_entry(lock, list, lru_head) { + if (get_lock(lock)) { + if (spin_trylock(&lock->lock)) { + if (lock->mode != SCOUTFS_LOCK_NULL && + !lock->request_pending && + !lock->invalidate_pending && + atomic_read(&lock->refcount) == 3) { + get_lock(lock); + lock->request_pending = 1; + spin_lock(&linfo->shrink_wlist.lock); + list_add_tail(&lock->shrink_head, + &linfo->shrink_wlist.list); + spin_unlock(&linfo->shrink_wlist.lock); + nr--; + freed++; + } + spin_unlock(&lock->lock); + put_lock(linfo, lock); + } else { + /* + * The put_lock() is intentionally not factored + * out since it confuses the sparse checker. + */ + put_lock(linfo, lock); + } + } - if (nr-- == 0) + if (nr == 0) break; - __lock_del_lru(linfo, lock); - lock->request_pending = 1; - list_add_tail(&lock->shrink_head, &linfo->shrink_list); - added = true; - freed++; - - scoutfs_inc_counter(sb, lock_shrink_attempted); - trace_scoutfs_lock_shrink(sb, lock); - - /* could have bazillions of idle locks */ - if (cond_resched_lock(&linfo->lock)) - goto restart; + /* switch to active at last reclaim entry, _for_each_ stops if active empty */ + if (lock->lru_head.next == &linfo->lru_reclaim) { + list = &linfo->lru_active; + lock = list_first_entry(list, struct scoutfs_lock, lru_head); + } } spin_unlock(&linfo->lock); - if (added) - queue_work(linfo->workq, &linfo->shrink_work); + spin_lock(&linfo->shrink_wlist.lock); + if (!list_empty(&linfo->shrink_wlist.list)) + queue_work(linfo->workq, &linfo->shrink_wlist.work); + spin_unlock(&linfo->shrink_wlist.lock); +out: trace_scoutfs_lock_shrink_exit(sb, sc->nr_to_scan, freed); return freed; } @@ -1537,7 +1755,7 @@ void scoutfs_lock_unmount_begin(struct super_block *sb) if (linfo) { linfo->unmounting = true; - flush_work(&linfo->inv_work); + flush_work(&linfo->inv_wlist.work); } } @@ -1546,7 +1764,7 @@ void scoutfs_lock_flush_invalidate(struct super_block *sb) DECLARE_LOCK_INFO(sb, linfo); if (linfo) - flush_work(&linfo->inv_work); + flush_work(&linfo->inv_wlist.work); } static u64 get_held_lock_refresh_gen(struct super_block *sb, struct scoutfs_key *start) @@ -1559,13 +1777,12 @@ static u64 get_held_lock_refresh_gen(struct super_block *sb, struct scoutfs_key if (!linfo) return 0; - spin_lock(&linfo->lock); - lock = lock_lookup(sb, start, NULL); + lock = find_lock(sb, start); if (lock) { if (lock_mode_can_read(lock->mode)) refresh_gen = lock->refresh_gen; + put_lock(linfo, lock); } - spin_unlock(&linfo->lock); return 
refresh_gen; } @@ -1615,13 +1832,13 @@ void scoutfs_lock_shutdown(struct super_block *sb) /* stop the shrinker from queueing work */ KC_UNREGISTER_SHRINKER(&linfo->shrinker); - flush_work(&linfo->shrink_work); + flush_work(&linfo->shrink_wlist.work); /* cause current and future lock calls to return errors */ spin_lock(&linfo->lock); linfo->shutdown = true; - for (node = rb_first(&linfo->lock_tree); node; node = rb_next(node)) { - lock = rb_entry(node, struct scoutfs_lock, node); + for (node = rb_first(&linfo->lock_range_tree); node; node = rb_next(node)) { + lock = rb_entry(node, struct scoutfs_lock, range_node); wake_up(&lock->waitq); } spin_unlock(&linfo->lock); @@ -1654,11 +1871,10 @@ void scoutfs_lock_destroy(struct super_block *sb) trace_scoutfs_lock_destroy(sb, linfo); - /* make sure that no one's actively using locks */ spin_lock(&linfo->lock); - for (node = rb_first(&linfo->lock_tree); node; node = rb_next(node)) { - lock = rb_entry(node, struct scoutfs_lock, node); + for (node = rb_first(&linfo->lock_range_tree); node; node = rb_next(node)) { + lock = rb_entry(node, struct scoutfs_lock, range_node); for (mode = 0; mode < SCOUTFS_LOCK_NR_MODES; mode++) { if (lock->waiters[mode] || lock->users[mode]) { @@ -1682,38 +1898,45 @@ void scoutfs_lock_destroy(struct super_block *sb) /* * Usually lock_free is only called once locks are idle but all * locks are idle by definition during shutdown. We need to - * manually update the lock's state to reflect that we've given - * up on pending work that would otherwise prevent free from - * being called (and would trip assertions in our manual calling - * of free). + * drop references for any pending work that we've canceled so + * that we can tear down the locks. */ - spin_lock(&linfo->lock); - - node = rb_first(&linfo->lock_tree); + node = rb_first(&linfo->lock_range_tree); while (node) { - lock = rb_entry(node, struct scoutfs_lock, node); + lock = rb_entry(node, struct scoutfs_lock, range_node); node = rb_next(node); - list_for_each_entry_safe(ireq, ireq_tmp, &lock->inv_list, head) { + atomic_inc(&lock->refcount); + + list_for_each_entry_safe(ireq, ireq_tmp, &lock->inv_req_list, head) { list_del_init(&ireq->head); - put_lock(linfo, ireq->lock); + put_lock(linfo, lock); kfree(ireq); } - lock->request_pending = 0; - if (!list_empty(&lock->lru_head)) - __lock_del_lru(linfo, lock); + if (lock->request_pending) { + lock->request_pending = 0; + put_lock(linfo, lock); + } + if (!list_empty(&lock->inv_head)) { list_del_init(&lock->inv_head); lock->invalidate_pending = 0; } - if (!list_empty(&lock->shrink_head)) + + if (!list_empty(&lock->shrink_head)) { list_del_init(&lock->shrink_head); + put_lock(linfo, lock); + } + + /* manually forcing removal for non-null locks */ + atomic_sub(2, &lock->refcount); lock_remove(linfo, lock); - lock_free(linfo, lock); + + WARN_ON_ONCE(!put_lock(linfo, lock)); } - spin_unlock(&linfo->lock); + rhashtable_destroy(&linfo->ht); kfree(linfo); sbi->lock_info = NULL; @@ -1731,22 +1954,26 @@ int scoutfs_lock_setup(struct super_block *sb) linfo->sb = sb; spin_lock_init(&linfo->lock); - linfo->lock_tree = RB_ROOT; linfo->lock_range_tree = RB_ROOT; KC_INIT_SHRINKER_FUNCS(&linfo->shrinker, lock_count_objects, lock_scan_objects); KC_REGISTER_SHRINKER(&linfo->shrinker, "scoutfs-lock:" SCSBF, SCSB_ARGS(sb)); - INIT_LIST_HEAD(&linfo->lru_list); - INIT_WORK(&linfo->inv_work, lock_invalidate_worker); - INIT_LIST_HEAD(&linfo->inv_list); - INIT_WORK(&linfo->shrink_work, lock_shrink_worker); - INIT_LIST_HEAD(&linfo->shrink_list); + 
INIT_LIST_HEAD(&linfo->lru_active); + INIT_LIST_HEAD(&linfo->lru_reclaim); + init_work_list(&linfo->inv_wlist, lock_invalidate_worker); + init_work_list(&linfo->shrink_wlist, lock_shrink_worker); atomic64_set(&linfo->next_refresh_gen, 0); scoutfs_tseq_tree_init(&linfo->tseq_tree, lock_tseq_show); sbi->lock_info = linfo; trace_scoutfs_lock_setup(sb, linfo); + ret = rhashtable_init(&linfo->ht, &lock_ht_params); + if (ret < 0) { + kfree(linfo); + return -ENOMEM; + } + linfo->tseq_dentry = scoutfs_tseq_create("client_locks", sbi->debug_root, &linfo->tseq_tree); diff --git a/kmod/src/lock.h b/kmod/src/lock.h index 07908d62..e1fc7c93 100644 --- a/kmod/src/lock.h +++ b/kmod/src/lock.h @@ -1,6 +1,8 @@ #ifndef _SCOUTFS_LOCK_H_ #define _SCOUTFS_LOCK_H_ +#include + #include "key.h" #include "tseq.h" @@ -19,20 +21,24 @@ struct inode_deletion_lock_data; */ struct scoutfs_lock { struct super_block *sb; + atomic_t refcount; + spinlock_t lock; + struct rcu_head rcu_head; struct scoutfs_key start; struct scoutfs_key end; - struct rb_node node; + struct rhash_head ht_head; struct rb_node range_node; u64 refresh_gen; u64 write_seq; u64 dirty_trans_seq; struct list_head lru_head; + int lru_on_list; wait_queue_head_t waitq; unsigned long request_pending:1, invalidate_pending:1; struct list_head inv_head; /* entry in linfo's list of locks with invalidations */ - struct list_head inv_list; /* list of lock's invalidation requests */ + struct list_head inv_req_list; /* list of lock's invalidation requests */ struct list_head shrink_head; spinlock_t cov_list_lock; diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h index 1316418b..072f05b3 100644 --- a/kmod/src/scoutfs_trace.h +++ b/kmod/src/scoutfs_trace.h @@ -1100,6 +1100,7 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class, __field(unsigned char, invalidate_pending) __field(int, mode) __field(int, invalidating_mode) + __field(unsigned int, refcount) __field(unsigned int, waiters_cw) __field(unsigned int, waiters_pr) __field(unsigned int, waiters_ex) @@ -1118,6 +1119,7 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class, __entry->invalidate_pending = lck->invalidate_pending; __entry->mode = lck->mode; __entry->invalidating_mode = lck->invalidating_mode; + __entry->refcount = atomic_read(&lck->refcount); __entry->waiters_pr = lck->waiters[SCOUTFS_LOCK_READ]; __entry->waiters_ex = lck->waiters[SCOUTFS_LOCK_WRITE]; __entry->waiters_cw = lck->waiters[SCOUTFS_LOCK_WRITE_ONLY]; @@ -1125,11 +1127,11 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class, __entry->users_ex = lck->users[SCOUTFS_LOCK_WRITE]; __entry->users_cw = lck->users[SCOUTFS_LOCK_WRITE_ONLY]; ), - TP_printk(SCSBF" start "SK_FMT" end "SK_FMT" mode %u invmd %u reqp %u invp %u refg %llu wris %llu dts %llu waiters: pr %u ex %u cw %u users: pr %u ex %u cw %u", + TP_printk(SCSBF" start "SK_FMT" end "SK_FMT" mode %u invmd %u reqp %u invp %u refg %llu rfcnt %d wris %llu dts %llu waiters: pr %u ex %u cw %u users: pr %u ex %u cw %u", SCSB_TRACE_ARGS, sk_trace_args(start), sk_trace_args(end), __entry->mode, __entry->invalidating_mode, __entry->request_pending, - __entry->invalidate_pending, __entry->refresh_gen, __entry->write_seq, - __entry->dirty_trans_seq, + __entry->invalidate_pending, __entry->refresh_gen, __entry->refcount, + __entry->write_seq, __entry->dirty_trans_seq, __entry->waiters_pr, __entry->waiters_ex, __entry->waiters_cw, __entry->users_pr, __entry->users_ex, __entry->users_cw) );
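Taken together, the lock.c changes replace the single rb-tree index under linfo->lock with an rhashtable keyed by the lock's start key, looked up under rcu_read_lock() and torn down through kfree_rcu(). Below is a minimal sketch of that combination using a hypothetical object with a u64 key; it shows the common single-reference variant built on atomic_inc_not_zero(), whereas the patch keeps two references on behalf of the index and gates lookups with its stricter atomic_add_unless_lessthan() helper.

#include <linux/atomic.h>
#include <linux/rhashtable.h>
#include <linux/slab.h>

/* Hypothetical object indexed the way the patch indexes struct
 * scoutfs_lock: hash lookup under RCU, refcounted use, RCU-deferred free.
 * Sketch only, not the patch's code. */
struct obj {
	u64 key;
	struct rhash_head ht_head;
	atomic_t refcount;	/* active references; unhashed and freed at zero */
	struct rcu_head rcu_head;
};

static const struct rhashtable_params obj_ht_params = {
	.key_len     = sizeof(u64),
	.key_offset  = offsetof(struct obj, key),
	.head_offset = offsetof(struct obj, ht_head),
};

static struct obj *obj_get(struct rhashtable *ht, u64 key)
{
	struct obj *obj;

	rcu_read_lock();
	obj = rhashtable_lookup(ht, &key, obj_ht_params);
	/* the object may already be on its way out; it's only ours if we
	 * can still raise a non-zero count */
	if (obj && !atomic_inc_not_zero(&obj->refcount))
		obj = NULL;
	rcu_read_unlock();

	return obj;
}

static void obj_put(struct rhashtable *ht, struct obj *obj)
{
	if (atomic_dec_and_test(&obj->refcount)) {
		rhashtable_remove_fast(ht, &obj->ht_head, obj_ht_params);
		/* readers that found the object before removal may still
		 * be looking at it, so the free must wait for them */
		kfree_rcu(obj, rcu_head);
	}
}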
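The lock lifetime in this patch hangs off a few refcount values: lock_insert() publishes a lock with two references held on behalf of the hash table and range tree, find_lock()/get_lock() only succeed while the count is still at least 2, try_remove_null_lock() claims removal with a single 3 -> 1 cmpxchg, and the final put_lock() frees through kfree_rcu(). A small userspace model of those transitions, using C11 atomics in place of the kernel's atomic_t; add_unless_lessthan() mirrors the helper the patch adds.

#include <assert.h>
#include <stdatomic.h>
#include <stdio.h>

/* Userspace stand-in for the patch's atomic_add_unless_lessthan(): add
 * 'a' unless the current value is below 'u', return whether we added. */
static int add_unless_lessthan(atomic_int *v, int a, int u)
{
	int c = atomic_load(v);

	while (c >= u) {
		if (atomic_compare_exchange_weak(v, &c, c + a))
			return 1;
		/* c now holds the freshly observed value; try again */
	}
	return 0;
}

int main(void)
{
	atomic_int refcount;
	int expected = 3;

	atomic_init(&refcount, 2);			/* lock_insert(): two index refs    */
	assert(add_unless_lessthan(&refcount, 1, 2));	/* get_lock(): still indexed, 2 -> 3 */

	/* try_remove_null_lock(): idle null lock, exactly the index refs plus ours */
	assert(atomic_compare_exchange_strong(&refcount, &expected, 1));

	assert(!add_unless_lessthan(&refcount, 1, 2));	/* racing lookups now fail */
	printf("refcount %d\n", atomic_load(&refcount));/* 1: the remover's own ref */
	return 0;
}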
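lock_lru_rebalance() keeps the split LRU honest with one signed counter: inserts and removals nudge lru_imbalance by one, a move between lists changes it by two, and a single rebalance step, itself a shift of two, is always enough to pull the counter back into [-1, 1]. A toy userspace model of just the counts, showing the invariant across the operations the patch performs; the real code moves struct scoutfs_lock entries between lru_active and lru_reclaim.

#include <assert.h>
#include <stdio.h>

static long active_nr, reclaim_nr, imbalance;	/* imbalance == active_nr - reclaim_nr */

static void rebalance(void)
{
	if (imbalance > 1) {		/* oldest active entry -> reclaim tail */
		active_nr--; reclaim_nr++; imbalance -= 2;
	} else if (imbalance < -1) {	/* newest reclaim entry -> active list */
		reclaim_nr--; active_nr++; imbalance += 2;
	}
	assert(imbalance >= -1 && imbalance <= 1);
	assert(imbalance == active_nr - reclaim_nr);
}

/* lock_lru_insert(): a new lock goes on the active list */
static void insert(void)
{
	active_nr++; imbalance++;
	rebalance();
}

/* lock_lru_update(): a lock found on the reclaim list is used again */
static void touch_reclaimed(void)
{
	reclaim_nr--; active_nr++; imbalance += 2;
	rebalance();
}

/* lock_lru_remove() for a lock that sat on the reclaim list */
static void remove_reclaimed(void)
{
	reclaim_nr--; imbalance++;
	rebalance();
}

int main(void)
{
	int i;

	for (i = 0; i < 9; i++)
		insert();
	touch_reclaimed();	/* a reclaim-list lock gets used again       */
	remove_reclaimed();	/* an idle lock on the reclaim list is freed */
	printf("active %ld reclaim %ld imbalance %ld\n",
	       active_nr, reclaim_nr, imbalance);
	return 0;
}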
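The inv_work/inv_list and shrink_work/shrink_list pairs collapse into struct work_list, a work_struct bundled with its own spinlock-protected list, so producers no longer need the global linfo->lock to hand items to the workers. The usual shape of that pattern, sketched against the patch's struct work_list with a hypothetical demo_item; the batch splice is what both workers do, the per-item processing is elided.

#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>

/* Hypothetical queued item; in the patch the items are struct scoutfs_lock
 * entries linked through inv_head or shrink_head. */
struct demo_item {
	struct list_head head;
};

static void demo_worker(struct work_struct *work)
{
	struct work_list *wlist = container_of(work, struct work_list, work);
	struct demo_item *item, *tmp;
	LIST_HEAD(batch);

	/* grab the whole batch under the list's own lock, then work unlocked */
	spin_lock(&wlist->lock);
	list_splice_init(&wlist->list, &batch);
	spin_unlock(&wlist->lock);

	list_for_each_entry_safe(item, tmp, &batch, head) {
		list_del_init(&item->head);
		/* ... process item ... */
	}
}

/* producer: add an item and make sure the worker will run */
static void demo_queue(struct workqueue_struct *wq, struct work_list *wlist,
		       struct demo_item *item)
{
	spin_lock(&wlist->lock);
	list_add_tail(&item->head, &wlist->list);
	queue_work(wq, &wlist->work);
	spin_unlock(&wlist->lock);
}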
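lock_invalidate_worker() calls out an ordering inversion: producers take the per-lock spinlock first and the inv_wlist lock second, while the worker walks the list with inv_wlist.lock held and only then needs each lock's spinlock. It resolves this with spin_trylock() plus a full back-off and restart, the standard way to dodge an ABBA deadlock. A minimal sketch of that shape; the lock names here are hypothetical.

#include <linux/processor.h>
#include <linux/spinlock.h>

/* Take 'inner' while already holding 'outer' when other paths acquire
 * them in the opposite order: never block on inner, back off instead. */
static void take_both_backoff(spinlock_t *outer, spinlock_t *inner)
{
retry:
	spin_lock(outer);
	if (!spin_trylock(inner)) {
		/* someone holds inner and may be spinning on outer */
		spin_unlock(outer);
		cpu_relax();
		goto retry;
	}

	/* ... critical section with both locks held ... */

	spin_unlock(inner);
	spin_unlock(outer);
}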