Skip to content

`thingbuf::mpsc::blocking::{SendRef, RecvRef}`: hang or crash when the channel is closed while a borrowed slot is active #100

@XCemaXX

Description

@XCemaXX

I’ve found a reproducible hang and an allocator crash (`tcache_thread_shutdown(): unaligned tcache chunk detected`) in `thingbuf::mpsc::blocking` when:

  • a background thread is repeatedly acquiring a SendRef
  • the receiver holds an active RecvRef
  • the channel is closed while the RecvRef is still alive

This seems to corrupt the internal wait queue during shutdown.
The issue reproduces both in ordinary test runs and under AddressSanitizer (ASAN).

Minimal reproduction (hang scenario)

This is the smallest example I have found in which the sender thread hangs forever inside `send_ref_timeout()` (or `try_send_ref()`):

Thread hangs
use std::{
    sync::{atomic::AtomicBool, Arc},
    thread,
    time::Duration,
};
use thingbuf::{Recycle, mpsc::blocking::with_recycle};

/// Recycling policy for the `u32` slots used by this test.
///
/// Fresh slots start out as `0`; recycled slots are left untouched, so a slot
/// keeps whatever value it held before being reused.
#[derive(Default)]
struct Policy;

impl Recycle<u32> for Policy {
    fn new_element(&self) -> u32 {
        u32::default()
    }

    fn recycle(&self, _slot: &mut u32) {
        // Deliberately a no-op.
    }
}

/// Reproduces the hang: the sender thread keeps asking for a slot while the
/// receiver holds the channel's only slot borrowed.
///
/// The channel has capacity 1. `tx.send(1)` fills the slot, and the `RecvRef`
/// `r` borrows it until after the sender thread has been joined. Expected: each
/// `send_ref_timeout` call gives up after 100 ms, the loop observes `stop`, and
/// `t.join()` returns. Reported: `t.join()` never returns.
#[test]
fn hang_test() {
    let (tx, rx) = with_recycle(1, Policy);
    // Fill the single slot so the receiver has something to borrow.
    tx.send(1).unwrap();
    let stop = Arc::new(AtomicBool::new(false));
    let stop_thread = stop.clone();
    let t = thread::spawn(move || {
        while !stop_thread.load(std::sync::atomic::Ordering::SeqCst) {
            let r = tx.send_ref_timeout(Duration::from_millis(100));  // hangs
            // let r = tx.try_send_ref(); // hangs too
            drop(r);
        }
    });

    // Borrow the only slot; `r` stays alive until after the join below.
    let r = rx.recv_ref().unwrap();
    std::thread::sleep(Duration::from_millis(10));
    stop.store(true, std::sync::atomic::Ordering::SeqCst);
    // Reported to block forever: the sender thread never exits its loop.
    t.join().unwrap();
    drop(r);
}

Expected:

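Each `send_ref_timeout()` call times out (or `try_send_ref()` returns `TrySendError::Full`). The sender loop then observes the stop flag and the thread exits, so `t.join()` returns.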

Actual:

The sender thread blocks forever, so `t.join()` never returns and the test hangs.

Crash scenario (SendRef + RecvRef + thread shutdown)

The following example causes a hard crash inside thingbuf:
tcache_thread_shutdown(): unaligned tcache chunk detected

ASAN backtrace
#0  0x00005555556782f2 in core::sync::atomic::atomic_swap<usize> (dst=0x7bfff36f5880, val=2, order=core::sync::atomic::Ordering::Release)
    at /.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/sync/atomic.rs:4025
#1  core::sync::atomic::AtomicUsize::swap (self=0x7bfff36f5880, val=2, order=core::sync::atomic::Ordering::Release)
    at /.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/sync/atomic.rs:2977
#2  0x0000555555665b82 in thingbuf::wait::queue::List<std::thread::Thread>::dequeue<std::thread::Thread> (self=0x7d9ff70efe88, new_state=2)
    at /.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/thingbuf-0.1.6/src/wait/queue.rs:581
#3  0x000055555566675d in thingbuf::wait::queue::WaitQueue<std::thread::Thread>::notify_slow<std::thread::Thread> (self=0x7d9ff70efe00, state=1)
    at /.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/thingbuf-0.1.6/src/wait/queue.rs:410
#4  0x000055555565a8e4 in thingbuf::wait::queue::WaitQueue<std::thread::Thread>::notify<std::thread::Thread> (self=0x7d9ff70efe00)
    at /.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/thingbuf-0.1.6/src/wait/queue.rs:391
#5  thingbuf::mpsc::{impl#15}::drop<std::thread::Thread> (self=0x7bfff4400470) at /.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/thingbuf-0.1.6/src/mpsc.rs:571
#6  0x0000555555658b71 in core::ptr::drop_in_place<thingbuf::mpsc::NotifyTx<std::thread::Thread>> ()
    at /.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:805
#7  0x0000555555658abd in core::ptr::drop_in_place<thingbuf::mpsc::RecvRefInner<test_proj::PollerPayload, std::thread::Thread>> ()
    at /.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:805
#8  0x0000555555658ec1 in core::ptr::drop_in_place<thingbuf::mpsc::blocking::RecvRef<test_proj::PollerPayload>> ()
    at /.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:805
#9  0x0000555555668ff9 in core::mem::drop<thingbuf::mpsc::blocking::RecvRef<test_proj::PollerPayload>> (
    _x=<error reading variable: Cannot access memory at address 0x7bfff36f5880>)
    at /.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/mod.rs:969
#10 0x0000555555675a89 in test_proj::crash_test::{async_block#0} () at src/main.rs:198
Code to reproduce
use std::{
    sync::{atomic::AtomicBool, Arc, Weak},
    thread,
    time::Duration,
};

use thingbuf::{
    mpsc::{
        blocking::{with_recycle, Receiver, RecvRef, SendRef},
        errors::{SendTimeoutError, TryRecvError},
    },
    Recycle,
};
use tokio::sync::Notify;

/// Recycling policy for the channel's `u32` slots.
///
/// New slots are created as `0`; `recycle` leaves a slot's contents as they
/// were, so the producer's `+ 1` accumulates across reuses of the same slot.
#[derive(Default)]
struct Policy;

impl Recycle<u32> for Policy {
    fn new_element(&self) -> u32 {
        u32::default()
    }

    fn recycle(&self, _value: &mut u32) {
        // Intentionally empty: nothing is reset between uses.
    }
}

/// State shared between the producer thread, `Poller`, and `Handle`.
struct Shared {
    /// Notified by the producer after every published slot, and once more when it exits.
    notify: Notify,
    /// Set by `Handle::drop` to ask the producer thread to exit.
    stop: AtomicBool,
}

/// Consumer side of the reproduction: owns the channel's `Receiver` and
/// shares the stop flag and wake-up `Notify` with the producer thread.
pub struct Poller {
    // Strong reference; the producer thread holds another, `Handle` a weak one.
    shared: Arc<Shared>,
    // NOTE: fields drop in declaration order (`shared`, then `rx`). Keep this
    // order so the reproduction's shutdown sequence is not perturbed.
    rx: Receiver<u32, Policy>,
}

/// Owner of the producer thread; see its `Drop` impl for the shutdown sequence.
pub struct Handle {
    /// Weak reference to the state shared with the producer, used to raise `stop`.
    shared: Weak<Shared>,
    /// Join handle of the producer thread; taken (left `None`) when joined.
    t: Option<thread::JoinHandle<()>>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        if let Some(s) = self.shared.upgrade() {
            s.stop.store(true, std::sync::atomic::Ordering::SeqCst);
        }
        if let Some(t) = self.t.take() {
            let _ = t.join();
        }
    }
}

impl Poller {
    /// Spawns the producer thread and returns the consumer (`Poller`) together
    /// with the `Handle` that stops and joins that thread when dropped.
    ///
    /// The producer repeatedly reserves the channel's single slot with
    /// `send_ref_timeout(10 ms)`, increments it, publishes it by dropping the
    /// `SendRef`, and wakes the consumer through `Shared::notify`. It exits when
    /// `stop` is set or the channel reports `Closed`.
    pub fn start() -> (Self, Handle) {
        let (tx, rx) = with_recycle(1, Policy);
        let shared = Arc::new(Shared {
            stop: AtomicBool::new(false),
            notify: Notify::new(),
        });
        let shared_state = shared.clone();
        let t = thread::spawn(move || {
            let mut slot: Option<SendRef<'_, u32>> = None;
            loop {
                if shared_state.stop.load(std::sync::atomic::Ordering::Acquire) {
                    drop(slot.take());
                    break;
                }
                if slot.is_none() {
                    match tx.send_ref_timeout(Duration::from_millis(10)) {
                        Ok(s) => slot = Some(s),
                        Err(SendTimeoutError::Closed(_)) => break,
                        Err(_) => continue,
                    }
                }
                let mut s = slot.take().unwrap();
                *s += 1;
                // Dropping the `SendRef` commits the slot to the receiver.
                drop(s);
                shared_state.notify.notify_one();
            }
            // Close the channel *before* the final wake-up. If `tx` were only
            // dropped when this closure returns (after `notify_one`), a consumer
            // woken by that last notification could still observe `Empty`,
            // re-enter `notified().await`, and never be woken again.
            // `slot` borrows `tx`, so it must be released first.
            drop(slot);
            drop(tx);
            shared_state.notify.notify_one();
        });
        (
            Self {
                shared: shared.clone(),
                rx,
            },
            Handle {
                t: Some(t),
                shared: Arc::downgrade(&shared),
            },
        )
    }

    /// Waits until the producer publishes a slot and returns it borrowed.
    ///
    /// Returns `Err(())` on any receive error other than `Empty` (e.g. once the
    /// channel is closed). A notification sent between `try_recv_ref` and
    /// `notified()` is not lost, because `Notify::notify_one` stores a permit
    /// when no task is currently waiting.
    pub async fn get(&self) -> Result<RecvRef<'_, u32>, ()> {
        loop {
            match self.rx.try_recv_ref() {
                Ok(v) => return Ok(v),
                Err(TryRecvError::Empty) => {}
                Err(_) => return Err(()),
            }
            self.shared.notify.notified().await;
        }
    }
}

/// Reproduces the crash: a `RecvRef` obtained from `Poller::get` is dropped
/// only after the producer thread has been stopped and joined, which also
/// drops the `tx` moved into that thread.
///
/// Reported: dropping `event` aborts the process with
/// `tcache_thread_shutdown(): unaligned tcache chunk detected`.
#[tokio::test]
async fn crash_test() {
    let (poller, handle) = Poller::start();
    // Hold a borrowed receive slot across the producer shutdown below.
    let event = poller.get().await;
    drop(handle); // stop thread
    drop(event );
    drop(poller);
}

Expected:

The sender thread should shut down cleanly and the test should complete without errors.

Actual:

The process aborts with `tcache_thread_shutdown(): unaligned tcache chunk detected`.

Env

thingbuf version: 0.1.6
Rust: 1.91 nightly and stable (reproduces on both)
OS: Linux x86_64
Sanitizers: ASAN confirms memory corruption on the `RecvRef` drop path (frames #5–#9 of the backtrace above).

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions