From 54274370cdffda93b306559d188d142039e56281 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 4 Dec 2022 00:16:37 -0800 Subject: [PATCH 1/3] Use st3 as local queues --- Cargo.toml | 1 + src/lib.rs | 39 +++++++++++++++++---------------------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index faee198..be3d7ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ concurrent-queue = "2.0.0" fastrand = "1.3.4" futures-lite = "1.11.0" slab = "0.4.4" +st3 = "0.4" [dev-dependencies] async-channel = "1.4.1" diff --git a/src/lib.rs b/src/lib.rs index 24fbfac..36ea937 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::task::{Poll, Waker}; +use st3::lifo::{Worker, Stealer}; use async_lock::OnceCell; use async_task::Runnable; @@ -465,7 +466,7 @@ struct State { queue: ConcurrentQueue, /// Local queues created by runners. - local_queues: RwLock>>>, + local_queues: RwLock>>, /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. notified: AtomicBool, @@ -717,7 +718,7 @@ struct Runner<'a> { ticker: Ticker<'a>, /// The local queue. - local: Arc>, + local: Worker, /// Bumped every time a runnable task is found. ticks: AtomicUsize, @@ -726,17 +727,18 @@ struct Runner<'a> { impl Runner<'_> { /// Creates a runner and registers it in the executor state. fn new(state: &State) -> Runner<'_> { + let worker = Worker::new(512); let runner = Runner { state, ticker: Ticker::new(state), - local: Arc::new(ConcurrentQueue::bounded(512)), + local: worker, ticks: AtomicUsize::new(0), }; state .local_queues .write() .unwrap() - .push(runner.local.clone()); + .push(runner.local.stealer()); runner } @@ -746,7 +748,7 @@ impl Runner<'_> { .ticker .runnable_with(|| { // Try the local queue. - if let Ok(r) = self.local.pop() { + if let Some(r) = self.local.pop() { return Some(r); } @@ -768,13 +770,10 @@ impl Runner<'_> { .skip(start) .take(n); - // Remove this runner's local queue. - let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); - // Try stealing from each local queue in the list. for local in iter { - steal(local, &self.local); - if let Ok(r) = self.local.pop() { + let count_fn = |remaining| remaining / 2; + if let Ok((r, _)) = local.steal_and_pop(&self.local, count_fn) { return Some(r); } } @@ -798,30 +797,26 @@ impl Runner<'_> { impl Drop for Runner<'_> { fn drop(&mut self) { // Remove the local queue. - self.state - .local_queues - .write() - .unwrap() - .retain(|local| !Arc::ptr_eq(local, &self.local)); + // self.state + // .local_queues + // .write() + // .unwrap() + // .retain(|local| !Arc::ptr_eq(local, &self.local)); // Re-schedule remaining tasks in the local queue. - while let Ok(r) = self.local.pop() { + while let Some(r) = self.local.pop() { r.schedule(); } } } /// Steals some items from one queue into another. -fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { +fn steal(src: &ConcurrentQueue, dest: &Worker) { // Half of `src`'s length rounded up. let mut count = (src.len() + 1) / 2; + count = count.max(dest.spare_capacity()); if count > 0 { - // Don't steal more than fits into the queue. - if let Some(cap) = dest.capacity() { - count = count.min(cap - dest.len()); - } - // Steal tasks. for _ in 0..count { if let Ok(t) = src.pop() { From 97fe4a3157a0d196dabc572270504ac16cd14ad9 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 15:40:42 -0700 Subject: [PATCH 2/3] Use fifo instead of lifo --- src/lib.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bb1d86f..3aee2c4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,7 +45,7 @@ use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; use std::sync::{Arc, Mutex, RwLock, TryLockError}; use std::task::{Poll, Waker}; -use st3::lifo::{Worker, Stealer}; +use st3::fifo::{Worker, Stealer}; use async_task::{Builder, Runnable}; use concurrent_queue::ConcurrentQueue; @@ -1009,12 +1009,13 @@ impl Runner<'_> { impl Drop for Runner<'_> { fn drop(&mut self) { + let stealer_ref = self.local.stealer_ref(); // Remove the local queue. - // self.state - // .local_queues - // .write() - // .unwrap() - // .retain(|local| !Arc::ptr_eq(local, &self.local)); + self.state + .local_queues + .write() + .unwrap() + .retain(|stealer| !core::ptr::eq(stealer, stealer_ref)); // Re-schedule remaining tasks in the local queue. while let Some(r) = self.local.pop() { @@ -1027,16 +1028,12 @@ impl Drop for Runner<'_> { fn steal(src: &ConcurrentQueue, dest: &Worker) { // Half of `src`'s length rounded up. let mut count = (src.len() + 1) / 2; - count = count.max(dest.spare_capacity()); + count = count.min(dest.spare_capacity()); if count > 0 { // Steal tasks. - for _ in 0..count { - if let Ok(t) = src.pop() { - assert!(dest.push(t).is_ok()); - } else { - break; - } + for t in src.try_iter().take(count) { + assert!(dest.push(t).is_ok()); } } } From 0579b9ebd36ef9c2f1ed52c38d664c11b1fbb2f3 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 22 Apr 2024 07:47:44 -0700 Subject: [PATCH 3/3] Add back in the self-stealing avoidance --- src/lib.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3aee2c4..72460b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +use st3::fifo::{Stealer, Worker}; use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; @@ -45,7 +46,6 @@ use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; use std::sync::{Arc, Mutex, RwLock, TryLockError}; use std::task::{Poll, Waker}; -use st3::fifo::{Worker, Stealer}; use async_task::{Builder, Runnable}; use concurrent_queue::ConcurrentQueue; @@ -983,6 +983,10 @@ impl Runner<'_> { .skip(start) .take(n); + // Remove this runner's local queue. + let local_stealer = self.local.stealer_ref(); + let iter = iter.filter(|local| !core::ptr::eq(*local, local_stealer)); + // Try stealing from each local queue in the list. for local in iter { let count_fn = |remaining| remaining / 2; @@ -1028,9 +1032,10 @@ impl Drop for Runner<'_> { fn steal(src: &ConcurrentQueue, dest: &Worker) { // Half of `src`'s length rounded up. let mut count = (src.len() + 1) / 2; - count = count.min(dest.spare_capacity()); if count > 0 { + count = count.min(dest.spare_capacity()); + // Steal tasks. for t in src.try_iter().take(count) { assert!(dest.push(t).is_ok());