Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ jobs:
- name: Run cargo test (with valgrind)
run: cargo test -- --test-threads=1
env:
# TODO: use --errors-for-leak-kinds=definite,indirect due to upstream bug (https://github.com/rust-lang/rust/issues/135608)
CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER: valgrind -v --error-exitcode=1 --error-limit=no --leak-check=full --show-leak-kinds=all --errors-for-leak-kinds=definite,indirect --track-origins=yes --fair-sched=yes
# TODO: ignore possible and reachable leaks due to upstream issue (https://github.com/rust-lang/rust/issues/135608)
CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER: valgrind -v --error-exitcode=1 --error-limit=no --leak-check=full --show-leak-kinds=definite,indirect --errors-for-leak-kinds=definite,indirect --track-origins=yes --fair-sched=yes --gen-suppressions=all
- name: Run cargo test (with portable-atomic enabled)
run: cargo test --features portable-atomic
- name: Clone async-executor
Expand Down
4 changes: 1 addition & 3 deletions examples/with-metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ thread_local! {
static QUEUE: RefCell<BinaryHeap<ByDuration>> = RefCell::new(BinaryHeap::new());
}

fn make_future_fn<'a, F>(
future: F,
) -> impl (FnOnce(&'a DurationMetadata) -> MeasureRuntime<'a, F>) {
fn make_future_fn<'a, F>(future: F) -> impl FnOnce(&'a DurationMetadata) -> MeasureRuntime<'a, F> {
move |duration_meta| MeasureRuntime {
f: future,
duration: &duration_meta.inner,
Expand Down
94 changes: 79 additions & 15 deletions src/header.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::cell::UnsafeCell;
use core::fmt;
use core::task::Waker;
use core::task::{RawWaker, Waker};

#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic::AtomicUsize;
Expand All @@ -9,13 +9,23 @@ use core::sync::atomic::Ordering;
use portable_atomic::AtomicUsize;

use crate::raw::TaskVTable;
use crate::state::*;
use crate::utils::abort_on_panic;
use crate::{
state::*,
utils::{abort, abort_on_panic},
Runnable, ScheduleInfo,
};

/// Actions to take upon calling [`Header::drop_waker`].
pub(crate) enum DropWakerAction {
/// Re-schedule the task
Schedule,
/// Destroy the task.
Destroy,
/// Do nothing.
None,
}

/// The header of a task.
///
/// This header is stored in memory at the beginning of the heap-allocated task.
pub(crate) struct Header<M> {
pub(crate) struct Header {
/// Current state of the task.
///
/// Contains flags representing the current state and the reference count.
Expand All @@ -32,17 +42,13 @@ pub(crate) struct Header<M> {
/// methods necessary for bookkeeping the heap-allocated task.
pub(crate) vtable: &'static TaskVTable,

/// Metadata associated with the task.
///
/// This metadata may be provided to the user.
pub(crate) metadata: M,

pub(crate) schedule: fn(Runnable<M>, ScheduleInfo),
/// Whether or not a panic that occurs in the task should be propagated.
#[cfg(feature = "std")]
pub(crate) propagate_panic: bool,
}

impl<M> Header<M> {
impl Header {
/// Notifies the awaiter blocked on this task.
///
/// If the awaiter is the same as the current waker, it will not be notified.
Expand Down Expand Up @@ -157,11 +163,69 @@ impl<M> Header<M> {
abort_on_panic(|| w.wake());
}
}

/// Clones a waker.
pub(crate) unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
let header = ptr as *const Header;

// Increment the reference count. With any kind of reference-counted data structure,
// relaxed ordering is appropriate when incrementing the counter.
let state = (*header).state.fetch_add(REFERENCE, Ordering::Relaxed);

// If the reference count overflowed, abort.
if state > isize::MAX as usize {
abort();
}

RawWaker::new(ptr, (*header).vtable.raw_waker_vtable)
}

#[inline(never)]
pub(crate) unsafe fn drop_waker(ptr: *const ()) -> DropWakerAction {
let header = ptr as *const Header;

// Decrement the reference count.
let new = (*header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

// If this was the last reference to the task and the `Task` has been dropped too,
// then we need to decide how to destroy the task.
if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
if new & (COMPLETED | CLOSED) == 0 {
// If the task was not completed nor closed, close it and schedule one more time so
// that its future gets dropped by the executor.
(*header)
.state
.store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
DropWakerAction::Schedule
} else {
// Otherwise, destroy the task right away.
DropWakerAction::Destroy
}
} else {
DropWakerAction::None
}
}
}

// SAFETY: repr(C) is explicitly used here so that casts between `Header` and `HeaderWithMetadata`
// can be done safely without additional offsets.
//
/// The header of a task.
///
/// This header is stored in memory at the beginning of the heap-allocated task.
#[repr(C)]
pub(crate) struct HeaderWithMetadata<M> {
pub(crate) header: Header,

/// Metadata associated with the task.
///
/// This metadata may be provided to the user.
pub(crate) metadata: M,
}

impl<M: fmt::Debug> fmt::Debug for Header<M> {
impl<M: fmt::Debug> fmt::Debug for HeaderWithMetadata<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);
let state = self.header.state.load(Ordering::SeqCst);

f.debug_struct("Header")
.field("scheduled", &(state & SCHEDULED != 0))
Expand Down
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
//!
//! // Push the task into the queue by invoking its schedule function.
//! runnable.schedule();
//! # let handle = std::thread::spawn(move || { for runnable in receiver { runnable.run(); }});
//! # smol::future::block_on(task);
//! # handle.join().unwrap();
//! ```
//!
//! The [`Runnable`] is used to poll the task's future, and the [`Task`] is used to await its
Expand Down Expand Up @@ -70,7 +73,7 @@
#![no_std]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
#![doc(test(attr(allow(unused_variables))))]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
Expand Down Expand Up @@ -110,9 +113,6 @@ mod task;
mod utils;

pub use crate::runnable::{
spawn, spawn_unchecked, Builder, Runnable, Schedule, ScheduleInfo, WithInfo,
Builder, Runnable, ScheduleInfo,
};
pub use crate::task::{FallibleTask, Task};

#[cfg(feature = "std")]
pub use crate::runnable::spawn_local;
Loading
Loading