diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6f8bac9..0eeb3eb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,8 +60,8 @@ jobs: - name: Run cargo test (with valgrind) run: cargo test -- --test-threads=1 env: - # TODO: use --errors-for-leak-kinds=definite,indirect due to upstream bug (https://github.com/rust-lang/rust/issues/135608) - CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER: valgrind -v --error-exitcode=1 --error-limit=no --leak-check=full --show-leak-kinds=all --errors-for-leak-kinds=definite,indirect --track-origins=yes --fair-sched=yes + # TODO: ignore possible and reachable leaks due to upstream issue (https://github.com/rust-lang/rust/issues/135608) + CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER: valgrind -v --error-exitcode=1 --error-limit=no --leak-check=full --show-leak-kinds=definite,indirect --errors-for-leak-kinds=definite,indirect --track-origins=yes --fair-sched=yes --gen-suppressions=all - name: Run cargo test (with portable-atomic enabled) run: cargo test --features portable-atomic - name: Clone async-executor diff --git a/examples/with-metadata.rs b/examples/with-metadata.rs index 1bd1bc7..7a71f50 100644 --- a/examples/with-metadata.rs +++ b/examples/with-metadata.rs @@ -72,9 +72,7 @@ thread_local! { static QUEUE: RefCell> = RefCell::new(BinaryHeap::new()); } -fn make_future_fn<'a, F>( - future: F, -) -> impl (FnOnce(&'a DurationMetadata) -> MeasureRuntime<'a, F>) { +fn make_future_fn<'a, F>(future: F) -> impl FnOnce(&'a DurationMetadata) -> MeasureRuntime<'a, F> { move |duration_meta| MeasureRuntime { f: future, duration: &duration_meta.inner, diff --git a/src/header.rs b/src/header.rs index ee84035..8494a69 100644 --- a/src/header.rs +++ b/src/header.rs @@ -1,6 +1,6 @@ use core::cell::UnsafeCell; use core::fmt; -use core::task::Waker; +use core::task::{RawWaker, Waker}; #[cfg(not(feature = "portable-atomic"))] use core::sync::atomic::AtomicUsize; @@ -9,13 +9,23 @@ use core::sync::atomic::Ordering; use portable_atomic::AtomicUsize; use crate::raw::TaskVTable; -use crate::state::*; -use crate::utils::abort_on_panic; +use crate::{ + state::*, + utils::{abort, abort_on_panic}, + Runnable, ScheduleInfo, +}; + +/// Actions to take upon calling [`Header::drop_waker`]. +pub(crate) enum DropWakerAction { + /// Re-schedule the task + Schedule, + /// Destroy the task. + Destroy, + /// Do nothing. + None, +} -/// The header of a task. -/// -/// This header is stored in memory at the beginning of the heap-allocated task. -pub(crate) struct Header { +pub(crate) struct Header { /// Current state of the task. /// /// Contains flags representing the current state and the reference count. @@ -32,17 +42,13 @@ pub(crate) struct Header { /// methods necessary for bookkeeping the heap-allocated task. pub(crate) vtable: &'static TaskVTable, - /// Metadata associated with the task. - /// - /// This metadata may be provided to the user. - pub(crate) metadata: M, - + pub(crate) schedule: fn(Runnable, ScheduleInfo), /// Whether or not a panic that occurs in the task should be propagated. #[cfg(feature = "std")] pub(crate) propagate_panic: bool, } -impl Header { +impl Header { /// Notifies the awaiter blocked on this task. /// /// If the awaiter is the same as the current waker, it will not be notified. @@ -157,11 +163,69 @@ impl Header { abort_on_panic(|| w.wake()); } } + + /// Clones a waker. + pub(crate) unsafe fn clone_waker(ptr: *const ()) -> RawWaker { + let header = ptr as *const Header; + + // Increment the reference count. With any kind of reference-counted data structure, + // relaxed ordering is appropriate when incrementing the counter. + let state = (*header).state.fetch_add(REFERENCE, Ordering::Relaxed); + + // If the reference count overflowed, abort. + if state > isize::MAX as usize { + abort(); + } + + RawWaker::new(ptr, (*header).vtable.raw_waker_vtable) + } + + #[inline(never)] + pub(crate) unsafe fn drop_waker(ptr: *const ()) -> DropWakerAction { + let header = ptr as *const Header; + + // Decrement the reference count. + let new = (*header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; + + // If this was the last reference to the task and the `Task` has been dropped too, + // then we need to decide how to destroy the task. + if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { + if new & (COMPLETED | CLOSED) == 0 { + // If the task was not completed nor closed, close it and schedule one more time so + // that its future gets dropped by the executor. + (*header) + .state + .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); + DropWakerAction::Schedule + } else { + // Otherwise, destroy the task right away. + DropWakerAction::Destroy + } + } else { + DropWakerAction::None + } + } +} + +// SAFETY: repr(C) is explicitly used here so that casts between `Header` and `HeaderWithMetadata` +// can be done safely without additional offsets. +// +/// The header of a task. +/// +/// This header is stored in memory at the beginning of the heap-allocated task. +#[repr(C)] +pub(crate) struct HeaderWithMetadata { + pub(crate) header: Header, + + /// Metadata associated with the task. + /// + /// This metadata may be provided to the user. + pub(crate) metadata: M, } -impl fmt::Debug for Header { +impl fmt::Debug for HeaderWithMetadata { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let state = self.state.load(Ordering::SeqCst); + let state = self.header.state.load(Ordering::SeqCst); f.debug_struct("Header") .field("scheduled", &(state & SCHEDULED != 0)) diff --git a/src/lib.rs b/src/lib.rs index c8f6702..e0198dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,9 @@ //! //! // Push the task into the queue by invoking its schedule function. //! runnable.schedule(); +//! # let handle = std::thread::spawn(move || { for runnable in receiver { runnable.run(); }}); +//! # smol::future::block_on(task); +//! # handle.join().unwrap(); //! ``` //! //! The [`Runnable`] is used to poll the task's future, and the [`Task`] is used to await its @@ -70,7 +73,7 @@ #![no_std] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![doc(test(attr(deny(rust_2018_idioms, warnings))))] -#![doc(test(attr(allow(unused_extern_crates, unused_variables))))] +#![doc(test(attr(allow(unused_variables))))] #![doc( html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] @@ -110,9 +113,6 @@ mod task; mod utils; pub use crate::runnable::{ - spawn, spawn_unchecked, Builder, Runnable, Schedule, ScheduleInfo, WithInfo, + Builder, Runnable, ScheduleInfo, }; pub use crate::task::{FallibleTask, Task}; - -#[cfg(feature = "std")] -pub use crate::runnable::spawn_local; diff --git a/src/raw.rs b/src/raw.rs index 7a45dad..cf6925b 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -1,19 +1,15 @@ use alloc::alloc::Layout as StdLayout; -use core::cell::UnsafeCell; use core::future::Future; +use core::marker::PhantomData; use core::mem::{self, ManuallyDrop}; use core::pin::Pin; use core::ptr::NonNull; use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -#[cfg(not(feature = "portable-atomic"))] -use core::sync::atomic::AtomicUsize; use core::sync::atomic::Ordering; -#[cfg(feature = "portable-atomic")] -use portable_atomic::AtomicUsize; -use crate::header::Header; -use crate::runnable::{Schedule, ScheduleInfo}; +use crate::header::{DropWakerAction, Header, HeaderWithMetadata}; +use crate::runnable::ScheduleInfo; use crate::state::*; use crate::utils::{abort, abort_on_panic, max, Layout}; use crate::Runnable; @@ -26,17 +22,13 @@ pub(crate) type Panic = core::convert::Infallible; /// The vtable for a task. pub(crate) struct TaskVTable { + pub(crate) raw_waker_vtable: &'static RawWakerVTable, + /// Schedules the task. pub(crate) schedule: unsafe fn(*const (), ScheduleInfo), /// Drops the future inside the task. - pub(crate) drop_future: unsafe fn(*const ()), - - /// Returns a pointer to the output stored after completion. - pub(crate) get_output: unsafe fn(*const ()) -> *const (), - - /// Drops the task reference (`Runnable` or `Waker`). - pub(crate) drop_ref: unsafe fn(ptr: *const ()), + pub(crate) drop_future: unsafe fn(*const (), &TaskLayout), /// Destroys the task. pub(crate) destroy: unsafe fn(*const ()), @@ -44,16 +36,19 @@ pub(crate) struct TaskVTable { /// Runs the task. pub(crate) run: unsafe fn(*const ()) -> bool, - /// Creates a new waker associated with the task. - pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker, - /// The memory layout of the task. This information enables /// debuggers to decode raw task memory blobs. Do not remove /// the field, even if it appears to be unused. - #[allow(unused)] pub(crate) layout_info: &'static TaskLayout, } +impl TaskVTable { + /// Returns a pointer to the output inside a task. + pub(crate) unsafe fn get_output(&self, ptr: *const ()) -> *const () { + ptr.add_byte(self.layout_info.offset_r) + } +} + /// Memory layout of a task. /// /// This struct contains the following information: @@ -65,9 +60,6 @@ pub(crate) struct TaskLayout { /// Memory layout of the whole task. pub(crate) layout: StdLayout, - /// Offset into the task at which the schedule function is stored. - pub(crate) offset_s: usize, - /// Offset into the task at which the future is stored. pub(crate) offset_f: usize, @@ -76,12 +68,9 @@ pub(crate) struct TaskLayout { } /// Raw pointers to the fields inside a task. -pub(crate) struct RawTask { +pub(crate) struct RawTask { /// The task header. - pub(crate) header: *const Header, - - /// The schedule function. - pub(crate) schedule: *const S, + pub(crate) header: *const HeaderWithMetadata, /// The future. pub(crate) future: *mut F, @@ -90,23 +79,22 @@ pub(crate) struct RawTask { pub(crate) output: *mut Result, } -impl Copy for RawTask {} +impl Copy for RawTask {} -impl Clone for RawTask { +impl Clone for RawTask { fn clone(&self) -> Self { *self } } -impl RawTask { - const TASK_LAYOUT: TaskLayout = Self::eval_task_layout(); +impl RawTask { + pub(crate) const TASK_LAYOUT: TaskLayout = Self::eval_task_layout(); /// Computes the memory layout for a task. #[inline] const fn eval_task_layout() -> TaskLayout { // Compute the layouts for `Header`, `S`, `F`, and `T`. - let layout_header = Layout::new::>(); - let layout_s = Layout::new::(); + let layout_header = Layout::new::>(); let layout_f = Layout::new::(); let layout_r = Layout::new::>(); @@ -117,397 +105,137 @@ impl RawTask { // Compute the layout for `Header` followed `S` and `union { F, T }`. let layout = layout_header; - let (layout, offset_s) = leap_unwrap!(layout.extend(layout_s)); let (layout, offset_union) = leap_unwrap!(layout.extend(layout_union)); let offset_f = offset_union; let offset_r = offset_union; TaskLayout { layout: unsafe { layout.into_std() }, - offset_s, offset_f, offset_r, } } } -impl RawTask -where - F: Future, - S: Schedule, -{ - const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - Self::clone_waker, - Self::wake, - Self::wake_by_ref, - Self::drop_waker, - ); - - /// Allocates a task with the given `future` and `schedule` function. - /// - /// It is assumed that initially only the `Runnable` and the `Task` exist. - pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>( - future: Gen, - schedule: S, - builder: crate::Builder, - ) -> NonNull<()> - where - F: 'a, - M: 'a, - { - // Compute the layout of the task for allocation. Abort if the computation fails. - // - // n.b. notgull: task_layout now automatically aborts instead of panicking - let task_layout = Self::task_layout(); +/// Allocates a task with the given `future` and `schedule` function. +/// +/// It is assumed that initially only the `Runnable` and the `Task` exist. +/// +/// Use a macro to brute force inlining to minimize stack copies of potentially +/// large futures. +macro_rules! allocate_task { + ($f:tt, $s:tt, $m:tt, $builder:ident, $schedule:ident, $raw:ident => $future:block) => {{ + let allocation = + alloc::alloc::alloc(RawTask::<$f, <$f as Future>::Output, $s, $m>::TASK_LAYOUT.layout); + // Allocate enough space for the entire task. + let ptr = NonNull::new(allocation as *mut ()).unwrap_or_else(|| crate::utils::abort()); + + let $raw = RawTask::<$f, <$f as Future>::Output, $s, $m>::from_ptr(ptr.as_ptr()); + + let crate::Builder { + metadata, + #[cfg(feature = "std")] + propagate_panic, + } = $builder; + + // Write the header as the first field of the task. + ($raw.header as *mut HeaderWithMetadata<$m>).write(HeaderWithMetadata { + header: Header { + #[cfg(not(feature = "portable-atomic"))] + state: core::sync::atomic::AtomicUsize::new(SCHEDULED | TASK | REFERENCE), + #[cfg(feature = "portable-atomic")] + state: portable_atomic::AtomicUsize::new(SCHEDULED | TASK | REFERENCE), + awaiter: core::cell::UnsafeCell::new(None), + vtable: &RawTask::<$f, <$f as Future>::Output, $s, $m>::TASK_VTABLE, + #[cfg(feature = "std")] + propagate_panic, + }, + metadata, + }); - unsafe { - // Allocate enough space for the entire task. - let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) { - None => abort(), - Some(p) => p, - }; + // Write the schedule function as the third field of the task. + ($raw.schedule as *mut S).write($schedule); - let raw = Self::from_ptr(ptr.as_ptr()); + // Explicitly avoid using abort_on_panic here to avoid extra stack + // copies of the future on lower optimization levels. + let bomb = crate::utils::Bomb; - let crate::Builder { - metadata, - #[cfg(feature = "std")] - propagate_panic, - } = builder; - - // Write the header as the first field of the task. - (raw.header as *mut Header).write(Header { - state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), - awaiter: UnsafeCell::new(None), - vtable: &TaskVTable { - schedule: Self::schedule, - drop_future: Self::drop_future, - get_output: Self::get_output, - drop_ref: Self::drop_ref, - destroy: Self::destroy, - run: Self::run, - clone_waker: Self::clone_waker, - layout_info: &Self::TASK_LAYOUT, - }, - metadata, - #[cfg(feature = "std")] - propagate_panic, - }); + // Generate the future, now that the metadata has been pinned in place. + // Write the future as the fourth field of the task. + $raw.future.write($future); + // (&(*raw.header).metadata) - // Write the schedule function as the third field of the task. - (raw.schedule as *mut S).write(schedule); + mem::forget(bomb); + ptr + }}; +} - // Generate the future, now that the metadata has been pinned in place. - let future = abort_on_panic(|| future(&(*raw.header).metadata)); +pub(crate) use allocate_task; - // Write the future as the fourth field of the task. - raw.future.write(future); +impl RawTask +where + F: Future, +{ + pub(crate) const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + Header::clone_waker, + wake::, + wake_by_ref::, + drop_waker, + ); - ptr - } - } + pub(crate) const TASK_VTABLE: TaskVTable = TaskVTable { + raw_waker_vtable: &Self::RAW_WAKER_VTABLE, + schedule: schedule::, + drop_future: drop_future::, + destroy: destroy::, + run: Self::run, + layout_info: &Self::TASK_LAYOUT, + }; /// Creates a `RawTask` from a raw task pointer. #[inline] pub(crate) fn from_ptr(ptr: *const ()) -> Self { - let task_layout = Self::task_layout(); - let p = ptr as *const u8; - unsafe { Self { header: p as *const Header, - schedule: p.add(task_layout.offset_s) as *const S, future: p.add(task_layout.offset_f) as *mut F, output: p.add(task_layout.offset_r) as *mut Result, } } } - /// Returns the layout of the task. - #[inline] - fn task_layout() -> TaskLayout { - Self::TASK_LAYOUT - } - /// Wakes a waker. - unsafe fn wake(ptr: *const ()) { - // This is just an optimization. If the schedule function has captured variables, then - // we'll do less reference counting if we wake the waker by reference and then drop it. - if mem::size_of::() > 0 { - Self::wake_by_ref(ptr); - Self::drop_waker(ptr); - return; - } - - let raw = Self::from_ptr(ptr); - - let mut state = (*raw.header).state.load(Ordering::Acquire); - - loop { - // If the task is completed or closed, it can't be woken up. - if state & (COMPLETED | CLOSED) != 0 { - // Drop the waker. - Self::drop_waker(ptr); - break; - } - - // If the task is already scheduled, we just need to synchronize with the thread that - // will run the task by "publishing" our current view of the memory. - if state & SCHEDULED != 0 { - // Update the state without actually modifying it. - match (*raw.header).state.compare_exchange_weak( - state, - state, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Drop the waker. - Self::drop_waker(ptr); - break; - } - Err(s) => state = s, - } - } else { - // Mark the task as scheduled. - match (*raw.header).state.compare_exchange_weak( - state, - state | SCHEDULED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the task is not yet scheduled and isn't currently running, now is the - // time to schedule it. - if state & RUNNING == 0 { - // Schedule the task. - Self::schedule(ptr, ScheduleInfo::new(false)); - } else { - // Drop the waker. - Self::drop_waker(ptr); - } - - break; - } - Err(s) => state = s, - } - } - } - } - - /// Wakes a waker by reference. - unsafe fn wake_by_ref(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - let mut state = (*raw.header).state.load(Ordering::Acquire); - - loop { - // If the task is completed or closed, it can't be woken up. - if state & (COMPLETED | CLOSED) != 0 { - break; - } - - // If the task is already scheduled, we just need to synchronize with the thread that - // will run the task by "publishing" our current view of the memory. - if state & SCHEDULED != 0 { - // Update the state without actually modifying it. - match (*raw.header).state.compare_exchange_weak( - state, - state, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break, - Err(s) => state = s, - } - } else { - // If the task is not running, we can schedule right away. - let new = if state & RUNNING == 0 { - (state | SCHEDULED) + REFERENCE - } else { - state | SCHEDULED - }; - - // Mark the task as scheduled. - match (*raw.header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the task is not running, now is the time to schedule. - if state & RUNNING == 0 { - // If the reference count overflowed, abort. - if state > isize::MAX as usize { - abort(); - } - - // Schedule the task. There is no need to call `Self::schedule(ptr)` - // because the schedule function cannot be destroyed while the waker is - // still alive. - let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ())); - (*raw.schedule).schedule(task, ScheduleInfo::new(false)); - } - - break; - } - Err(s) => state = s, - } - } - } - } - - /// Clones a waker. - unsafe fn clone_waker(ptr: *const ()) -> RawWaker { - let raw = Self::from_ptr(ptr); - - // Increment the reference count. With any kind of reference-counted data structure, - // relaxed ordering is appropriate when incrementing the counter. - let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); - - // If the reference count overflowed, abort. - if state > isize::MAX as usize { - abort(); - } - - RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) - } - - /// Drops a waker. - /// - /// This function will decrement the reference count. If it drops down to zero, the associated - /// `Task` has been dropped too, and the task has not been completed, then it will get - /// scheduled one more time so that its future gets dropped by the executor. - #[inline] - unsafe fn drop_waker(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - // Decrement the reference count. - let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; - - // If this was the last reference to the task and the `Task` has been dropped too, - // then we need to decide how to destroy the task. - if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { - if new & (COMPLETED | CLOSED) == 0 { - // If the task was not completed nor closed, close it and schedule one more time so - // that its future gets dropped by the executor. - (*raw.header) - .state - .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); - Self::schedule(ptr, ScheduleInfo::new(false)); - } else { - // Otherwise, destroy the task right away. - Self::destroy(ptr); - } - } - } - - /// Drops a task reference (`Runnable` or `Waker`). - /// - /// This function will decrement the reference count. If it drops down to zero and the - /// associated `Task` handle has been dropped too, then the task gets destroyed. - #[inline] - unsafe fn drop_ref(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - // Decrement the reference count. - let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; - - // If this was the last reference to the task and the `Task` has been dropped too, - // then destroy the task. - if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { - Self::destroy(ptr); - } - } - - /// Schedules a task for running. - /// - /// This function doesn't modify the state of the task. It only passes the task reference to - /// its schedule function. - unsafe fn schedule(ptr: *const (), info: ScheduleInfo) { - let raw = Self::from_ptr(ptr); - - // If the schedule function has captured variables, create a temporary waker that prevents - // the task from getting deallocated while the function is being invoked. - let _waker; - if mem::size_of::() > 0 { - _waker = Waker::from_raw(Self::clone_waker(ptr)); - } - - let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ())); - (*raw.schedule).schedule(task, info); - } - - /// Drops the future inside a task. - #[inline] - unsafe fn drop_future(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - // We need a safeguard against panics because the destructor can panic. - abort_on_panic(|| { - raw.future.drop_in_place(); - }) - } - - /// Returns a pointer to the output inside a task. - unsafe fn get_output(ptr: *const ()) -> *const () { - let raw = Self::from_ptr(ptr); - raw.output as *const () - } - - /// Cleans up task's resources and deallocates it. - /// - /// The schedule function will be dropped, and the task will then get deallocated. - /// The task must be closed before this function is called. - #[inline] - unsafe fn destroy(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - let task_layout = Self::task_layout(); - - // We need a safeguard against panics because destructors can panic. - abort_on_panic(|| { - // Drop the header along with the metadata. - (raw.header as *mut Header).drop_in_place(); - - // Drop the schedule function. - (raw.schedule as *mut S).drop_in_place(); - }); - - // Finally, deallocate the memory reserved by the task. - alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); - } - /// Runs a task. /// /// If polling its future panics, the task will be closed and the panic will be propagated into /// the caller. unsafe fn run(ptr: *const ()) -> bool { let raw = Self::from_ptr(ptr); + let header = ptr as *const Header; // Create a context from the raw task pointer and the vtable inside the its header. let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE))); let cx = &mut Context::from_waker(&waker); - let mut state = (*raw.header).state.load(Ordering::Acquire); + let mut state = (*header).state.load(Ordering::Acquire); // Update the task's state before polling its future. loop { // If the task has already been closed, drop the task reference and return. if state & CLOSED != 0 { // Drop the future. - Self::drop_future(ptr); + drop_future::(ptr, &Self::TASK_LAYOUT); // Mark the task as unscheduled. - let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); + awaiter = (*header).take(None); } // Drop the task reference. - Self::drop_ref(ptr); + drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { @@ -517,7 +245,7 @@ where } // Mark the task as unscheduled and running. - match (*raw.header).state.compare_exchange_weak( + match (*header).state.compare_exchange_weak( state, (state & !SCHEDULED) | RUNNING, Ordering::AcqRel, @@ -535,7 +263,7 @@ where // Poll the inner future, but surround it with a guard that closes the task in case polling // panics. // If available, we should also try to catch the panic so that it is propagated correctly. - let guard = Guard(raw); + let guard = Guard::(ptr, &Self::TASK_LAYOUT, PhantomData); // Panic propagation is not available for no_std. #[cfg(not(feature = "std"))] @@ -544,7 +272,7 @@ where #[cfg(feature = "std")] let poll = { // Check if we should propagate panics. - if (*raw.header).propagate_panic { + if (*header).propagate_panic { // Use catch_unwind to catch the panic. match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { ::poll(Pin::new_unchecked(&mut *raw.future), cx) @@ -563,7 +291,7 @@ where match poll { Poll::Ready(out) => { // Replace the future with its output. - Self::drop_future(ptr); + drop_future::(ptr, &Self::TASK_LAYOUT); raw.output.write(out); // The task is now completed. @@ -576,7 +304,7 @@ where }; // Mark the task as not running and completed. - match (*raw.header).state.compare_exchange_weak( + match (*header).state.compare_exchange_weak( state, new, Ordering::AcqRel, @@ -593,11 +321,11 @@ where // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); + awaiter = (*header).take(None); } // Drop the task reference. - Self::drop_ref(ptr); + drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { @@ -625,12 +353,12 @@ where if state & CLOSED != 0 && !future_dropped { // The thread that closed the task didn't drop the future because it was // running so now it's our responsibility to do so. - Self::drop_future(ptr); + drop_future::(ptr, &Self::TASK_LAYOUT); future_dropped = true; } // Mark the task as not running. - match (*raw.header).state.compare_exchange_weak( + match (*header).state.compare_exchange_weak( state, new, Ordering::AcqRel, @@ -644,11 +372,11 @@ where // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); + awaiter = (*header).take(None); } // Drop the task reference. - Self::drop_ref(ptr); + drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { @@ -657,11 +385,11 @@ where } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. - Self::schedule(ptr, ScheduleInfo::new(true)); + schedule::(ptr, ScheduleInfo::new(true)); return true; } else { // Drop the task reference. - Self::drop_ref(ptr); + drop_ref(ptr); } break; } @@ -671,86 +399,337 @@ where } } - return false; - - /// A guard that closes the task if polling its future panics. - struct Guard(RawTask) - where - F: Future, - S: Schedule; - - impl Drop for Guard - where - F: Future, - S: Schedule, - { - fn drop(&mut self) { - let raw = self.0; - let ptr = raw.header as *const (); - - unsafe { - let mut state = (*raw.header).state.load(Ordering::Acquire); - - loop { - // If the task was closed while running, then unschedule it, drop its - // future, and drop the task reference. - if state & CLOSED != 0 { - // The thread that closed the task didn't drop the future because it - // was running so now it's our responsibility to do so. - RawTask::::drop_future(ptr); - - // Mark the task as not running and not scheduled. - (*raw.header) - .state - .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); + false + } +} - // Take the awaiter out. - let mut awaiter = None; - if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); - } +/// A guard that closes the task if polling its future panics. +struct Guard(*const (), &'static TaskLayout, PhantomData F>) +where + F: Future; - // Drop the task reference. - RawTask::::drop_ref(ptr); +impl Drop for Guard +where + F: Future, +{ + fn drop(&mut self) { + let ptr = self.0; + let task_layout = self.1; + let header = ptr as *const Header; - // Notify the awaiter that the future has been dropped. - if let Some(w) = awaiter { - abort_on_panic(|| w.wake()); - } - break; + unsafe { + let header = &*header; + let mut state = header.state.load(Ordering::Acquire); + + loop { + // If the task was closed while running, then unschedule it, drop its + // future, and drop the task reference. + if state & CLOSED != 0 { + // The thread that closed the task didn't drop the future because it + // was running so now it's our responsibility to do so. + drop_future::(ptr, task_layout); + + // Mark the task as not running and not scheduled. + header + .state + .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = header.take(None); + } + + // Drop the task reference. + drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + + // Mark the task as not running, not scheduled, and closed. + match header.state.compare_exchange_weak( + state, + (state & !RUNNING & !SCHEDULED) | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // Drop the future because the task is now closed. + drop_future::(ptr, task_layout); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = header.take(None); } - // Mark the task as not running, not scheduled, and closed. - match (*raw.header).state.compare_exchange_weak( - state, - (state & !RUNNING & !SCHEDULED) | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(state) => { - // Drop the future because the task is now closed. - RawTask::::drop_future(ptr); + // Drop the task reference. + drop_ref(ptr); - // Take the awaiter out. - let mut awaiter = None; - if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); - } + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + Err(s) => state = s, + } + } + } + } +} - // Drop the task reference. - RawTask::::drop_ref(ptr); +/// Schedules a task for running. +/// +/// This function doesn't modify the state of the task. It only passes the task reference to +/// its schedule function. +unsafe fn schedule, M>(ptr: *const (), info: ScheduleInfo) { + let header = ptr as *const Header; + let task_layout = (*header).vtable.layout_info; + let schedule = ptr.add_byte(task_layout.offset_s) as *mut S; + + // If the schedule function has captured variables, create a temporary waker that prevents + // the task from getting deallocated while the function is being invoked. + let _waker; + if mem::size_of::() > 0 { + _waker = Waker::from_raw(Header::clone_waker(ptr)); + } - // Notify the awaiter that the future has been dropped. - if let Some(w) = awaiter { - abort_on_panic(|| w.wake()); - } - break; - } - Err(s) => state = s, + let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ())); + (*schedule).schedule(task, info); +} + +/// Drops a waker. +/// +/// This function will decrement the reference count. If it drops down to zero, the associated +/// `Task` has been dropped too, and the task has not been completed, then it will get +/// scheduled one more time so that its future gets dropped by the executor. +#[inline] +unsafe fn drop_waker(ptr: *const ()) { + let header = ptr as *const Header; + match Header::drop_waker(ptr) { + DropWakerAction::Schedule => ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)), + DropWakerAction::Destroy => ((*header).vtable.destroy)(ptr), + DropWakerAction::None => {} + } +} + +/// Drops the future inside a task. +#[inline] +unsafe fn drop_future(ptr: *const (), task_layout: &TaskLayout) { + let future_ptr = ptr.add_byte(task_layout.offset_f) as *mut F; + + // We need a safeguard against panics because the destructor can panic. + abort_on_panic(|| { + future_ptr.drop_in_place(); + }) +} + +/// Wakes a waker. +unsafe fn wake, M>(ptr: *const ()) { + // This is just an optimization. If the schedule function has captured variables, then + // we'll do less reference counting if we wake the waker by reference and then drop it. + if mem::size_of::() > 0 { + wake_by_ref::(ptr); + drop_waker(ptr); + return; + } + + let header = ptr as *const Header; + + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task is completed or closed, it can't be woken up. + if state & (COMPLETED | CLOSED) != 0 { + // Drop the waker. + drop_waker(ptr); + break; + } + + // If the task is already scheduled, we just need to synchronize with the thread that + // will run the task by "publishing" our current view of the memory. + if state & SCHEDULED != 0 { + // Update the state without actually modifying it. + match (*header).state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Drop the waker. + drop_waker(ptr); + break; + } + Err(s) => state = s, + } + } else { + // Mark the task as scheduled. + match (*header).state.compare_exchange_weak( + state, + state | SCHEDULED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not yet scheduled and isn't currently running, now is the + // time to schedule it. + if state & RUNNING == 0 { + // Schedule the task. + schedule::(ptr, ScheduleInfo::new(false)); + } else { + // Drop the waker. + drop_waker(ptr); + } + + break; + } + Err(s) => state = s, + } + } + } +} + +/// Wakes a waker by reference. +unsafe fn wake_by_ref, M>(ptr: *const ()) { + let header = ptr as *const Header; + let header = &*header; + let task_layout = header.vtable.layout_info; + + let mut state = header.state.load(Ordering::Acquire); + + loop { + // If the task is completed or closed, it can't be woken up. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // If the task is already scheduled, we just need to synchronize with the thread that + // will run the task by "publishing" our current view of the memory. + if state & SCHEDULED != 0 { + // Update the state without actually modifying it. + match header.state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } else { + // If the task is not running, we can schedule right away. + let new = if state & RUNNING == 0 { + (state | SCHEDULED) + REFERENCE + } else { + state | SCHEDULED + }; + + // Mark the task as scheduled. + match header.state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not running, now is the time to schedule. + if state & RUNNING == 0 { + // If the reference count overflowed, abort. + if state > isize::MAX as usize { + abort(); } + + let schedule = ptr.add_byte(task_layout.offset_s) as *mut S; + + // Schedule the task. There is no need to call `Self::schedule(ptr)` + // because the schedule function cannot be destroyed while the waker is + // still alive. + let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ())); + (*schedule).schedule(task, ScheduleInfo::new(false)); } + + break; } + Err(s) => state = s, } } } } + +/// Cleans up task's resources and deallocates it. +/// +/// The schedule function will be dropped, and the task will then get deallocated. +/// The task must be closed before this function is called. +#[inline] +unsafe fn destroy(ptr: *const ()) { + let header = ptr as *const Header; + let task_layout = (*header).vtable.layout_info; + let schedule = ptr.add_byte(task_layout.offset_s); + + // We need a safeguard against panics because destructors can panic. + abort_on_panic(|| { + // Drop the header along with the metadata. + (ptr as *mut HeaderWithMetadata).drop_in_place(); + + // Drop the schedule function. + (schedule as *mut S).drop_in_place(); + }); + + // Finally, deallocate the memory reserved by the task. + alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); +} + +/// Drops a task reference (`Runnable` or `Waker`). +/// +/// This function will decrement the reference count. If it drops down to zero and the +/// associated `Task` handle has been dropped too, then the task gets destroyed. +#[inline] +pub(crate) unsafe fn drop_ref(ptr: *const ()) { + let header = ptr as *const Header; + let header = &*header; + + // Decrement the reference count. + let new = header.state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; + + // If this was the last reference to the task and the `Task` has been dropped too, + // then destroy the task. + if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { + (header.vtable.destroy)(ptr); + } +} + +trait PointerPolyfill { + // Polyfill for `byte_add`. + // TODO: Replace this with `byte_add` once the MSRV should be bumped past 1.75 + /// Adds an unsigned offset in bytes to a pointer. + /// + /// `count` is in units of bytes. + /// + /// This is purely a convenience for casting to a `u8` pointer and + /// using [add][pointer::add] on it. See that method for documentation + /// and safety requirements. + /// + /// # Safety + /// If any of the following conditions are violated, the result is Undefined Behavior: + /// + /// - The offset in bytes, count * size_of::(), computed on mathematical integers + /// (without “wrapping around”), must fit in an isize. + /// - If the computed offset is non-zero, then self must be derived from a pointer to + /// some allocation, and the entire memory range between self and the result must be + /// in bounds of that allocation. In particular, this range must not “wrap around” + /// the edge of the address space. + unsafe fn add_byte(self, size: usize) -> Self; +} + +impl PointerPolyfill for *const T { + #[inline] + unsafe fn add_byte(self, size: usize) -> Self { + (self.cast::().add(size)).cast::() + } +} diff --git a/src/runnable.rs b/src/runnable.rs index 17945b2..269cc3a 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -6,22 +6,13 @@ use core::ptr::NonNull; use core::sync::atomic::Ordering; use core::task::Waker; -use alloc::boxed::Box; - use crate::header::Header; -use crate::raw::RawTask; +use crate::header::HeaderWithMetadata; +use crate::raw::drop_ref; +use crate::raw::{allocate_task, RawTask}; use crate::state::*; use crate::Task; -mod sealed { - use super::*; - pub trait Sealed {} - - impl Sealed for F where F: Fn(Runnable) {} - - impl Sealed for WithInfo where F: Fn(Runnable, ScheduleInfo) {} -} - /// A builder that creates a new task. #[derive(Debug)] pub struct Builder { @@ -94,80 +85,6 @@ impl ScheduleInfo { } } -/// The trait for scheduling functions. -pub trait Schedule: sealed::Sealed { - /// The actual scheduling procedure. - fn schedule(&self, runnable: Runnable, info: ScheduleInfo); -} - -impl Schedule for F -where - F: Fn(Runnable), -{ - fn schedule(&self, runnable: Runnable, _: ScheduleInfo) { - self(runnable) - } -} - -/// Pass a scheduling function with more scheduling information - a.k.a. -/// [`ScheduleInfo`]. -/// -/// Sometimes, it's useful to pass the runnable's state directly to the -/// scheduling function, such as whether it's woken up while running. The -/// scheduler can thus use the information to determine its scheduling -/// strategy. -/// -/// The data source of [`ScheduleInfo`] is directly from the actual -/// implementation of the crate itself, different from [`Runnable`]'s metadata, -/// which is managed by the caller. -/// -/// # Examples -/// -/// ``` -/// use async_task::{ScheduleInfo, WithInfo}; -/// use std::sync::{Arc, Mutex}; -/// -/// // The future inside the task. -/// let future = async { -/// println!("Hello, world!"); -/// }; -/// -/// // If the task gets woken up while running, it will be sent into this channel. -/// let (s, r) = flume::unbounded(); -/// // Otherwise, it will be placed into this slot. -/// let lifo_slot = Arc::new(Mutex::new(None)); -/// let schedule = move |runnable, info: ScheduleInfo| { -/// if info.woken_while_running { -/// s.send(runnable).unwrap() -/// } else { -/// let last = lifo_slot.lock().unwrap().replace(runnable); -/// if let Some(last) = last { -/// s.send(last).unwrap() -/// } -/// } -/// }; -/// -/// // Create a task with the future and the schedule function. -/// let (runnable, task) = async_task::spawn(future, WithInfo(schedule)); -/// ``` -#[derive(Debug)] -pub struct WithInfo(pub F); - -impl From for WithInfo { - fn from(value: F) -> Self { - WithInfo(value) - } -} - -impl Schedule for WithInfo -where - F: Fn(Runnable, ScheduleInfo), -{ - fn schedule(&self, runnable: Runnable, info: ScheduleInfo) { - (self.0)(runnable, info) - } -} - impl Builder<()> { /// Creates a new task builder. /// @@ -277,6 +194,23 @@ impl Builder<()> { } } +// Use a macro to brute force inlining to minimize stack copies of potentially +// large futures. +macro_rules! spawn_unchecked { + ($f:tt, $s:tt, $m:tt, $builder:ident, $schedule:ident, $raw:ident => $future:block) => {{ + let ptr = allocate_task!($f, $s, $m, $builder, $schedule, $raw => $future); + + #[allow(unused_unsafe)] + // SAFTETY: The task was just allocated above. + let runnable = unsafe { Runnable::from_raw(ptr) }; + let task = Task { + ptr, + _marker: PhantomData, + }; + (runnable, task) + }}; +} + impl Builder { /// Propagates panics that occur in the task. /// @@ -359,14 +293,19 @@ impl Builder { /// // Create a task with the future and the schedule function. /// let (runnable, task) = Builder::new().spawn(|()| future, schedule); /// ``` - pub fn spawn(self, future: F, schedule: S) -> (Runnable, Task) + pub fn spawn( + self, + future: F, + schedule: fn(Runnable, ScheduleInfo), + ) -> (Runnable, Task) where F: FnOnce(&M) -> Fut, Fut: Future + Send + 'static, Fut::Output: Send + 'static, - S: Schedule + Send + Sync + 'static, { - unsafe { self.spawn_unchecked(future, schedule) } + unsafe { + spawn_unchecked!(Fut, S, M, self, schedule, raw => { future(&(*raw.header).metadata) }) + } } /// Creates a new thread-local task. @@ -402,16 +341,15 @@ impl Builder { /// let (runnable, task) = Builder::new().spawn_local(move |()| future, schedule); /// ``` #[cfg(feature = "std")] - pub fn spawn_local( + pub fn spawn_local( self, future: F, - schedule: S, + schedule: fn(Runnable, ScheduleInfo), ) -> (Runnable, Task) where F: FnOnce(&M) -> Fut, Fut: Future + 'static, Fut::Output: 'static, - S: Schedule + Send + Sync + 'static, { use std::mem::ManuallyDrop; use std::pin::Pin; @@ -501,158 +439,20 @@ impl Builder { /// // Create a task with the future and the schedule function. /// let (runnable, task) = unsafe { Builder::new().spawn_unchecked(move |()| future, schedule) }; /// ``` - pub unsafe fn spawn_unchecked<'a, F, Fut, S>( + pub unsafe fn spawn_unchecked<'a, F, Fut>( self, future: F, - schedule: S, + schedule: fn(Runnable, ScheduleInfo), ) -> (Runnable, Task) where F: FnOnce(&'a M) -> Fut, Fut: Future + 'a, - S: Schedule, M: 'a, { - // Allocate large futures on the heap. - let ptr = if mem::size_of::() >= 2048 { - let future = |meta| { - let future = future(meta); - Box::pin(future) - }; - - RawTask::<_, Fut::Output, S, M>::allocate(future, schedule, self) - } else { - RawTask::::allocate(future, schedule, self) - }; - - let runnable = Runnable::from_raw(ptr); - let task = Task { - ptr, - _marker: PhantomData, - }; - (runnable, task) + spawn_unchecked!(Fut, S, M, self, schedule, raw => { future(&(*raw.header).metadata) }) } } -/// Creates a new task. -/// -/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its -/// output. -/// -/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] -/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run -/// again. -/// -/// When the task is woken, its [`Runnable`] is passed to the `schedule` function. -/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it -/// should push it into a task queue so that it can be processed later. -/// -/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider -/// using [`spawn_local()`] or [`spawn_unchecked()`] instead. -/// -/// # Examples -/// -/// ``` -/// // The future inside the task. -/// let future = async { -/// println!("Hello, world!"); -/// }; -/// -/// // A function that schedules the task when it gets woken up. -/// let (s, r) = flume::unbounded(); -/// let schedule = move |runnable| s.send(runnable).unwrap(); -/// -/// // Create a task with the future and the schedule function. -/// let (runnable, task) = async_task::spawn(future, schedule); -/// ``` -pub fn spawn(future: F, schedule: S) -> (Runnable, Task) -where - F: Future + Send + 'static, - F::Output: Send + 'static, - S: Schedule + Send + Sync + 'static, -{ - unsafe { spawn_unchecked(future, schedule) } -} - -/// Creates a new thread-local task. -/// -/// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the -/// [`Runnable`] is used or dropped on another thread, a panic will occur. -/// -/// This function is only available when the `std` feature for this crate is enabled. -/// -/// # Examples -/// -/// ``` -/// use async_task::Runnable; -/// use flume::{Receiver, Sender}; -/// use std::rc::Rc; -/// -/// thread_local! { -/// // A queue that holds scheduled tasks. -/// static QUEUE: (Sender, Receiver) = flume::unbounded(); -/// } -/// -/// // Make a non-Send future. -/// let msg: Rc = "Hello, world!".into(); -/// let future = async move { -/// println!("{}", msg); -/// }; -/// -/// // A function that schedules the task when it gets woken up. -/// let s = QUEUE.with(|(s, _)| s.clone()); -/// let schedule = move |runnable| s.send(runnable).unwrap(); -/// -/// // Create a task with the future and the schedule function. -/// let (runnable, task) = async_task::spawn_local(future, schedule); -/// ``` -#[cfg(feature = "std")] -pub fn spawn_local(future: F, schedule: S) -> (Runnable, Task) -where - F: Future + 'static, - F::Output: 'static, - S: Schedule + Send + Sync + 'static, -{ - Builder::new().spawn_local(move |()| future, schedule) -} - -/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. -/// -/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and -/// `'static` on `future` and `schedule`. -/// -/// # Safety -/// -/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original -/// thread. -/// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`]. -/// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`] -/// must be used and dropped on the original thread. -/// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the -/// [`Runnable`]'s [`Waker`]. -/// -/// # Examples -/// -/// ``` -/// // The future inside the task. -/// let future = async { -/// println!("Hello, world!"); -/// }; -/// -/// // If the task gets woken up, it will be sent into this channel. -/// let (s, r) = flume::unbounded(); -/// let schedule = move |runnable| s.send(runnable).unwrap(); -/// -/// // Create a task with the future and the schedule function. -/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; -/// ``` -pub unsafe fn spawn_unchecked(future: F, schedule: S) -> (Runnable, Task) -where - F: Future, - S: Schedule, -{ - Builder::new().spawn_unchecked(move |()| future, schedule) -} - /// A handle to a runnable task. /// /// Every spawned task has a single [`Runnable`] handle, which only exists when the task is @@ -713,36 +513,7 @@ impl Runnable { /// Tasks can be created with a metadata object associated with them; by default, this /// is a `()` value. See the [`Builder::metadata()`] method for more information. pub fn metadata(&self) -> &M { - &self.header().metadata - } - - /// Schedules the task. - /// - /// This is a convenience method that passes the [`Runnable`] to the schedule function. - /// - /// # Examples - /// - /// ``` - /// // A function that schedules the task when it gets woken up. - /// let (s, r) = flume::unbounded(); - /// let schedule = move |runnable| s.send(runnable).unwrap(); - /// - /// // Create a task with a simple future and the schedule function. - /// let (runnable, task) = async_task::spawn(async {}, schedule); - /// - /// // Schedule the task. - /// assert_eq!(r.len(), 0); - /// runnable.schedule(); - /// assert_eq!(r.len(), 1); - /// ``` - pub fn schedule(self) { - let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; - mem::forget(self); - - unsafe { - ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); - } + &self.header_with_metadata().metadata } /// Runs the task by polling its future. @@ -775,7 +546,7 @@ impl Runnable { /// ``` pub fn run(self) -> bool { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; mem::forget(self); unsafe { ((*header).vtable.run)(ptr) } @@ -803,19 +574,23 @@ impl Runnable { /// assert_eq!(r.len(), 0); /// waker.wake(); /// assert_eq!(r.len(), 1); + /// # let handle = std::thread::spawn(move || { for runnable in r { runnable.run(); }}); + /// # smol::future::block_on(task.cancel()); // cancel because the future is future::pending + /// # handle.join().unwrap(); /// ``` pub fn waker(&self) -> Waker { - let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; - unsafe { - let raw_waker = ((*header).vtable.clone_waker)(ptr); + let raw_waker = Header::clone_waker(self.ptr.as_ptr()); Waker::from_raw(raw_waker) } } - fn header(&self) -> &Header { - unsafe { &*(self.ptr.as_ptr() as *const Header) } + fn header(&self) -> &Header { + unsafe { &*(self.ptr.as_ptr() as *const Header) } + } + + fn header_with_metadata(&self) -> &HeaderWithMetadata { + unsafe { &*(self.ptr.as_ptr() as *const HeaderWithMetadata) } } /// Converts this task into a raw pointer. @@ -917,7 +692,7 @@ impl Drop for Runnable { } // Drop the future. - (header.vtable.drop_future)(ptr); + (header.vtable.drop_future)(ptr, header.vtable.layout_info); // Mark the task as unscheduled. let state = header.state.fetch_and(!SCHEDULED, Ordering::AcqRel); @@ -928,7 +703,7 @@ impl Drop for Runnable { } // Drop the task reference. - (header.vtable.drop_ref)(ptr); + drop_ref(ptr); } } } @@ -936,7 +711,7 @@ impl Drop for Runnable { impl fmt::Debug for Runnable { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const HeaderWithMetadata; f.debug_struct("Runnable") .field("header", unsafe { &(*header) }) diff --git a/src/state.rs b/src/state.rs index 2fc6cf3..40ed5ca 100644 --- a/src/state.rs +++ b/src/state.rs @@ -55,7 +55,7 @@ pub(crate) const REGISTERING: usize = 1 << 6; /// Set if the awaiter is being notified. /// /// This flag is set when notifying the awaiter. If an awaiter is concurrently registered and -/// notified, whichever side came first will take over the reposibility of resolving the race. +/// notified, whichever side came first will take over the responsibility of resolving the race. pub(crate) const NOTIFYING: usize = 1 << 7; /// A single reference. diff --git a/src/task.rs b/src/task.rs index a3dfd17..d92172f 100644 --- a/src/task.rs +++ b/src/task.rs @@ -7,10 +7,10 @@ use core::ptr::NonNull; use core::sync::atomic::Ordering; use core::task::{Context, Poll}; -use crate::header::Header; +use crate::header::{Header, HeaderWithMetadata}; use crate::raw::Panic; -use crate::runnable::ScheduleInfo; use crate::state::*; +use crate::ScheduleInfo; /// A spawned task. /// @@ -85,8 +85,8 @@ impl Task { /// .detach(); /// ``` pub fn detach(self) { - let mut this = self; - let _out = this.set_detached(); + let this = self; + let _out = set_detached::(this.ptr.as_ptr()); mem::forget(this); } @@ -125,8 +125,8 @@ impl Task { /// }); /// ``` pub async fn cancel(self) -> Option { - let mut this = self; - this.set_canceled(); + let this = self; + set_canceled(this.ptr.as_ptr()); this.fallible().await } @@ -179,277 +179,277 @@ impl Task { FallibleTask { task: self } } - /// Puts the task in canceled state. - fn set_canceled(&mut self) { + fn header_with_metadata(&self) -> &HeaderWithMetadata { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; - - unsafe { - let mut state = (*header).state.load(Ordering::Acquire); - - loop { - // If the task has been completed or closed, it can't be canceled. - if state & (COMPLETED | CLOSED) != 0 { - break; - } - - // If the task is not scheduled nor running, we'll need to schedule it. - let new = if state & (SCHEDULED | RUNNING) == 0 { - (state | SCHEDULED | CLOSED) + REFERENCE - } else { - state | CLOSED - }; - - // Mark the task as closed. - match (*header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the task is not scheduled nor running, schedule it one more time so - // that its future gets dropped by the executor. - if state & (SCHEDULED | RUNNING) == 0 { - ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); - } + let header = ptr as *const HeaderWithMetadata; + unsafe { &*header } + } - // Notify the awaiter that the task has been closed. - if state & AWAITER != 0 { - (*header).notify(None); - } + /// Returns `true` if the current task is finished. + /// + /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. + pub fn is_finished(&self) -> bool { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; - break; - } - Err(s) => state = s, - } - } + unsafe { + let state = (*header).state.load(Ordering::Acquire); + state & (CLOSED | COMPLETED) != 0 } } - /// Puts the task in detached state. - fn set_detached(&mut self) -> Option> { + /// Get the metadata associated with this task. + /// + /// Tasks can be created with a metadata object associated with them; by default, this + /// is a `()` value. See the [`Builder::metadata()`] method for more information. + pub fn metadata(&self) -> &M { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const HeaderWithMetadata; + &unsafe { &*header }.metadata + } +} - unsafe { - // A place where the output will be stored in case it needs to be dropped. - let mut output = None; - - // Optimistically assume the `Task` is being detached just after creating the task. - // This is a common case so if the `Task` is datached, the overhead of it is only one - // compare-exchange operation. - if let Err(mut state) = (*header).state.compare_exchange_weak( - SCHEDULED | TASK | REFERENCE, - SCHEDULED | REFERENCE, - Ordering::AcqRel, - Ordering::Acquire, - ) { - loop { - // If the task has been completed but not yet closed, that means its output - // must be dropped. - if state & COMPLETED != 0 && state & CLOSED == 0 { - // Mark the task as closed in order to grab its output. - match (*header).state.compare_exchange_weak( - state, - state | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Read the output. - output = Some( - (((*header).vtable.get_output)(ptr) as *mut Result) - .read(), - ); - - // Update the state variable because we're continuing the loop. - state |= CLOSED; - } - Err(s) => state = s, +/// Puts the task in detached state. +#[inline(never)] +fn set_detached(ptr: *const ()) -> Option> { + let header = ptr as *const Header; + + unsafe { + // A place where the output will be stored in case it needs to be dropped. + let mut output = None; + + // Optimistically assume the `Task` is being detached just after creating the task. + // This is a common case so if the `Task` is datached, the overhead of it is only one + // compare-exchange operation. + if let Err(mut state) = (*header).state.compare_exchange_weak( + SCHEDULED | TASK | REFERENCE, + SCHEDULED | REFERENCE, + Ordering::AcqRel, + Ordering::Acquire, + ) { + loop { + // If the task has been completed but not yet closed, that means its output + // must be dropped. + if state & COMPLETED != 0 && state & CLOSED == 0 { + // Mark the task as closed in order to grab its output. + match (*header).state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Read the output. + output = Some( + ((*header).vtable.get_output(ptr) as *mut Result).read(), + ); + + // Update the state variable because we're continuing the loop. + state |= CLOSED; } + Err(s) => state = s, + } + } else { + // If this is the last reference to the task and it's not closed, then + // close it and schedule one more time so that its future gets dropped by + // the executor. + let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { + SCHEDULED | CLOSED | REFERENCE } else { - // If this is the last reference to the task and it's not closed, then - // close it and schedule one more time so that its future gets dropped by - // the executor. - let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { - SCHEDULED | CLOSED | REFERENCE - } else { - state & !TASK - }; - - // Unset the `TASK` flag. - match (*header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If this is the last reference to the task, we need to either - // schedule dropping its future or destroy it. - if state & !(REFERENCE - 1) == 0 { - if state & CLOSED == 0 { - ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); - } else { - ((*header).vtable.destroy)(ptr); - } + state & !TASK + }; + + // Unset the `TASK` flag. + match (*header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If this is the last reference to the task, we need to either + // schedule dropping its future or destroy it. + if state & !(REFERENCE - 1) == 0 { + if state & CLOSED == 0 { + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); + } else { + ((*header).vtable.destroy)(ptr); } - - break; } - Err(s) => state = s, + + break; } + Err(s) => state = s, } } } - - output } + + output } +} - /// Polls the task to retrieve its output. - /// - /// Returns `Some` if the task has completed or `None` if it was closed. - /// - /// A task becomes closed in the following cases: - /// - /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. - /// 2. Its output gets awaited by the `Task`. - /// 3. It panics while polling the future. - /// 4. It is completed and the `Task` gets dropped. - fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll> { - let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; +/// Puts the task in canceled state. +#[inline(never)] +fn set_canceled(ptr: *const ()) { + let header = ptr as *const Header; - unsafe { - let mut state = (*header).state.load(Ordering::Acquire); + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); - loop { - // If the task has been closed, notify the awaiter and return `None`. - if state & CLOSED != 0 { - // If the task is scheduled or running, we need to wait until its future is - // dropped. - if state & (SCHEDULED | RUNNING) != 0 { - // Replace the waker with one associated with the current task. - (*header).register(cx.waker()); + loop { + // If the task has been completed or closed, it can't be canceled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } - // Reload the state after registering. It is possible changes occurred just - // before registration so we need to check for that. - state = (*header).state.load(Ordering::Acquire); + // If the task is not scheduled nor running, we'll need to schedule it. + let new = if state & (SCHEDULED | RUNNING) == 0 { + (state | SCHEDULED | CLOSED) + REFERENCE + } else { + state | CLOSED + }; + + // Mark the task as closed. + match (*header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not scheduled nor running, schedule it one more time so + // that its future gets dropped by the executor. + if state & (SCHEDULED | RUNNING) == 0 { + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); + } - // If the task is still scheduled or running, we need to wait because its - // future is not dropped yet. - if state & (SCHEDULED | RUNNING) != 0 { - return Poll::Pending; - } + // Notify the awaiter that the task has been closed. + if state & AWAITER != 0 { + (*header).notify(None); } - // Even though the awaiter is most likely the current task, it could also be - // another task. - (*header).notify(Some(cx.waker())); - return Poll::Ready(None); + break; } + Err(s) => state = s, + } + } + } +} - // If the task is not completed, register the current task. - if state & COMPLETED == 0 { +/// Polls the task to retrieve its output. +/// +/// Returns `Some` if the task has completed or `None` if it was closed. +/// +/// A task becomes closed in the following cases: +/// +/// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. +/// 2. Its output gets awaited by the `Task`. +/// 3. It panics while polling the future. +/// 4. It is completed and the `Task` gets dropped. +fn poll_task(ptr: *const (), cx: &mut Context<'_>) -> Poll> { + let header = ptr as *const Header; + + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task has been closed, notify the awaiter and return `None`. + if state & CLOSED != 0 { + // If the task is scheduled or running, we need to wait until its future is + // dropped. + if state & (SCHEDULED | RUNNING) != 0 { // Replace the waker with one associated with the current task. (*header).register(cx.waker()); - // Reload the state after registering. It is possible that the task became - // completed or closed just before registration so we need to check for that. + // Reload the state after registering. It is possible changes occurred just + // before registration so we need to check for that. state = (*header).state.load(Ordering::Acquire); - // If the task has been closed, restart. - if state & CLOSED != 0 { - continue; - } - - // If the task is still not completed, we're blocked on it. - if state & COMPLETED == 0 { + // If the task is still scheduled or running, we need to wait because its + // future is not dropped yet. + if state & (SCHEDULED | RUNNING) != 0 { return Poll::Pending; } } - // Since the task is now completed, mark it as closed in order to grab its output. - match (*header).state.compare_exchange( - state, - state | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Notify the awaiter. Even though the awaiter is most likely the current - // task, it could also be another task. - if state & AWAITER != 0 { - (*header).notify(Some(cx.waker())); - } + // Even though the awaiter is most likely the current task, it could also be + // another task. + (*header).notify(Some(cx.waker())); + return Poll::Ready(None); + } - // Take the output from the task. - let output = ((*header).vtable.get_output)(ptr) as *mut Result; - let output = output.read(); + // If the task is not completed, register the current task. + if state & COMPLETED == 0 { + // Replace the waker with one associated with the current task. + (*header).register(cx.waker()); - // Propagate the panic if the task panicked. - let output = match output { - Ok(output) => output, - #[allow(unreachable_patterns)] - Err(panic) => { - #[cfg(feature = "std")] - std::panic::resume_unwind(panic); + // Reload the state after registering. It is possible that the task became + // completed or closed just before registration so we need to check for that. + state = (*header).state.load(Ordering::Acquire); - #[cfg(not(feature = "std"))] - match panic {} - } - }; + // If the task has been closed, restart. + if state & CLOSED != 0 { + continue; + } - return Poll::Ready(Some(output)); - } - Err(s) => state = s, + // If the task is still not completed, we're blocked on it. + if state & COMPLETED == 0 { + return Poll::Pending; } } - } - } - fn header(&self) -> &Header { - let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; - unsafe { &*header } - } + // Since the task is now completed, mark it as closed in order to grab its output. + match (*header).state.compare_exchange( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Notify the awaiter. Even though the awaiter is most likely the current + // task, it could also be another task. + if state & AWAITER != 0 { + (*header).notify(Some(cx.waker())); + } - /// Returns `true` if the current task is finished. - /// - /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. - pub fn is_finished(&self) -> bool { - let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + // Take the output from the task. + let output = (*header).vtable.get_output(ptr) as *mut Result; + let output = output.read(); - unsafe { - let state = (*header).state.load(Ordering::Acquire); - state & (CLOSED | COMPLETED) != 0 - } - } + // Propagate the panic if the task panicked. + let output = match output { + Ok(output) => output, + Err(panic) => { + #[cfg(feature = "std")] + std::panic::resume_unwind(panic); - /// Get the metadata associated with this task. - /// - /// Tasks can be created with a metadata object associated with them; by default, this - /// is a `()` value. See the [`Builder::metadata()`] method for more information. - pub fn metadata(&self) -> &M { - &self.header().metadata + #[cfg(not(feature = "std"))] + match panic {} + } + }; + + return Poll::Ready(Some(output)); + } + Err(s) => state = s, + } + } } } impl Drop for Task { fn drop(&mut self) { - self.set_canceled(); - self.set_detached(); + let ptr = self.ptr.as_ptr(); + set_canceled(ptr); + set_detached::(ptr); } } impl Future for Task { type Output = T; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.poll_task(cx) { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match poll_task::(self.ptr.as_ptr(), cx) { Poll::Ready(t) => Poll::Ready(t.expect("Task polled after completion")), Poll::Pending => Poll::Pending, } @@ -459,7 +459,7 @@ impl Future for Task { impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Task") - .field("header", self.header()) + .field("header", self.header_with_metadata()) .finish() } } @@ -552,15 +552,15 @@ impl FallibleTask { impl Future for FallibleTask { type Output = Option; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.task.poll_task(cx) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + poll_task::(self.task.ptr.as_ptr(), cx) } } impl fmt::Debug for FallibleTask { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FallibleTask") - .field("header", self.task.header()) + .field("header", self.task.header_with_metadata()) .finish() } } diff --git a/src/utils.rs b/src/utils.rs index 5c2170c..0e8da5f 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -17,19 +17,19 @@ pub(crate) fn abort() -> ! { panic!("aborting the process"); } +pub(crate) struct Bomb; + +impl Drop for Bomb { + fn drop(&mut self) { + abort(); + } +} + /// Calls a function and aborts if it panics. /// /// This is useful in unsafe code where we can't recover from panics. #[inline] pub(crate) fn abort_on_panic(f: impl FnOnce() -> T) -> T { - struct Bomb; - - impl Drop for Bomb { - fn drop(&mut self) { - abort(); - } - } - let bomb = Bomb; let t = f(); mem::forget(bomb);