diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 90f0447f7..4d96dbb88 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,7 +58,7 @@ jobs: cat << EOF > "run-gha-workflow.sh" PATH=$PATH:/usr/share/rust/.cargo/bin echo "`nproc` CPU(s) available" - rustup install 1.70 + rustup install 1.92 rustup show rustup default stable cargo install cargo-sort diff --git a/Cargo.toml b/Cargo.toml index 0f48b4536..677c64675 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,3 @@ [workspace] -members = [ - "examples", - "glommio", -] +members = ["examples", "glommio"] resolver = "2" diff --git a/README.md b/README.md index 6972af6f3..b14c0808f 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ an [introductory article.](https://www.datadoghq.com/blog/engineering/introducin ## Supported Rust Versions -Glommio is built against the latest stable release. The minimum supported version is 1.70. The current Glommio version +Glommio is built against the latest stable release. The minimum supported version is 1.92. The current Glommio version is not guaranteed to build on Rust versions earlier than the minimum supported version. ## Supported Linux kernels diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 7c277314b..c47a5eb34 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -3,23 +3,23 @@ name = "examples" version = "0.0.0" license = "Apache-2.0 OR MIT" publish = false -edition = "2021" +edition = "2024" [dev-dependencies] byte-unit = "5.1.4" -yansi = "~0.5.1" clap = "4.5.3" fastrand = "2" futures = "~0.3.5" futures-lite = "2.6.0" glommio = { path = "../glommio" } +http-body-util = "0.1.0" # hyper and tokio for the hyper example. We just need the traits from Tokio hyper = { version = "1.2.0", features = ["full"] } num_cpus = "1.13.0" -sys-info = "~0.8.0" -http-body-util = "0.1.0" serde_json = "1.0.114" +sys-info = "~0.9.1" +yansi = "~1.0.1" [[example]] name = "echo" diff --git a/examples/deadline_writer.rs b/examples/deadline_writer.rs index e3feaaa14..472a61519 100644 --- a/examples/deadline_writer.rs +++ b/examples/deadline_writer.rs @@ -69,11 +69,11 @@ impl IntWriter { println!( "{}: Wrote {} ({}%), {:.0} int/s, scheduler shares: {} , {:.2} % CPU", - Paint::blue(format!("{}s", elapsed.as_secs())), + format!("{}s", elapsed.as_secs()).blue(), self.count.get(), - Paint::new(format!("{:.0}", ratio)).bold(), + format!("{:.0}", ratio).bold(), intratio, - Paint::new(tq_stats.current_shares().to_string()).bold(), + tq_stats.current_shares().to_string().bold(), cpuratio ); self.next_print @@ -168,10 +168,7 @@ fn main() { "cpuhog", ); - println!( - "{}", - Paint::new("Welcome to the Deadline Writer example").bold() - ); + println!("{}", "Welcome to the Deadline Writer example".bold()); println!( "In this example we will write a sequence of integers to a variable, busy looping \ for 500us after each write" @@ -183,7 +180,7 @@ fn main() { println!( "For {} results, this test is pinned to your CPU0. Make sure nothing else of \ significance is running there. You should be able to see it at 100% at all times!", - Paint::new("best").bold() + "best".bold() ); println!("\n\nPlease tell me how many integers you would like to write"); @@ -191,28 +188,25 @@ fn main() { println!( "Ok, now let's write {} integers with both the writer and the CPU hog having the \ same priority", - Paint::blue(to_write.to_string()) + to_write.to_string().blue() ); let dur = static_writer(to_write, 1000, cpuhog_tq).await; - println!( - "Finished writing in {}", - Paint::green(format!("{dur:#.0?}")) - ); + println!("Finished writing in {}", format!("{dur:#.0?}").green()); println!( "This was using {} shares, and short of reducing the priority of the CPU hog. {}", - Paint::green("1000"), - Paint::new("This is as fast as we can do!").bold() + "1000".green(), + "This is as fast as we can do!".bold() ); println!( "With {} shares, this would have taken approximately {}", - Paint::green("100"), - Paint::green(format!("{:#.1?}", dur * 10)) + "100".green(), + format!("{:#.1?}", dur * 10).green() ); println!( "With {} shares, this would have taken approximately {}. {}.", - Paint::green("1"), - Paint::green(format!("{:#.1?}", dur * 1000)), - Paint::new("Can't go any slower than that!").bold() + "1".green(), + format!("{:#.1?}", dur * 1000).green(), + "Can't go any slower than that!".bold() ); println!( @@ -233,10 +227,7 @@ fn main() { let deadline = DeadlineQueue::new("example", Duration::from_millis(250)); let test = IntWriter::new(to_write, Duration::from_secs(duration as u64)); let dur = deadline.push_work(test).await.unwrap(); - println!( - "Finished writing in {}", - Paint::green(format!("{dur:#.2?}")) - ); + println!("Finished writing in {}", format!("{dur:#.2?}").green()); stop.set(true); hog.await.unwrap(); println!( diff --git a/examples/defer.rs b/examples/defer.rs index 98a5a9c98..96019b2ef 100644 --- a/examples/defer.rs +++ b/examples/defer.rs @@ -3,7 +3,7 @@ // // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc. // -use glommio::{defer, timer::TimerActionOnce, LocalExecutorBuilder}; +use glommio::{LocalExecutorBuilder, defer, timer::TimerActionOnce}; use std::time::Duration; fn main() { diff --git a/examples/hyper_client.rs b/examples/hyper_client.rs index 6a02e2edb..ca8c925b8 100644 --- a/examples/hyper_client.rs +++ b/examples/hyper_client.rs @@ -13,9 +13,9 @@ mod hyper_compat { use glommio::net::TcpStream; use http_body_util::BodyExt; - use hyper::body::{Body as HttpBody, Bytes, Frame}; use hyper::Error; use hyper::Request; + use hyper::body::{Body as HttpBody, Bytes, Frame}; use std::io; use std::marker::PhantomData; @@ -74,38 +74,6 @@ mod hyper_compat { } } - struct GlommioSleep(glommio::timer::Timer); - - impl Future for GlommioSleep { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - match Pin::new(&mut self.0).poll(cx) { - Poll::Ready(_) => Poll::Ready(()), - Poll::Pending => Poll::Pending, - } - } - } - - impl hyper::rt::Sleep for GlommioSleep {} - unsafe impl Send for GlommioSleep {} - unsafe impl Sync for GlommioSleep {} - - #[derive(Clone, Copy, Debug)] - pub struct GlommioTimer; - - impl hyper::rt::Timer for GlommioTimer { - fn sleep(&self, duration: std::time::Duration) -> Pin> { - Box::pin(GlommioSleep(glommio::timer::Timer::new(duration))) - } - - fn sleep_until(&self, deadline: std::time::Instant) -> Pin> { - Box::pin(GlommioSleep(glommio::timer::Timer::new( - deadline - std::time::Instant::now(), - ))) - } - } - struct Body { // Our Body type is !Send and !Sync: _marker: PhantomData<*const ()>, diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs index f20cd4708..dcb648222 100644 --- a/examples/hyper_server.rs +++ b/examples/hyper_server.rs @@ -8,9 +8,9 @@ mod hyper_compat { sync::Semaphore, }; use hyper::{ + Error, Request, Response, body::{Body as HttpBody, Bytes, Frame, Incoming}, service::service_fn, - Error, Request, Response, }; use std::{ @@ -173,7 +173,7 @@ mod hyper_compat { } use glommio::{CpuSet, LocalExecutorPoolBuilder, PoolPlacement}; -use hyper::{body::Incoming, Method, Request, Response, StatusCode}; +use hyper::{Method, Request, Response, StatusCode, body::Incoming}; use hyper_compat::ResponseBody; use std::convert::Infallible; diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs index 91f5089d9..bda628aa4 100644 --- a/examples/ping_pong.rs +++ b/examples/ping_pong.rs @@ -3,7 +3,7 @@ // // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc. // -use glommio::{enclose, LocalExecutor}; +use glommio::{LocalExecutor, enclose}; use std::{cell::RefCell, rc::Rc}; fn main() { diff --git a/examples/sharding.rs b/examples/sharding.rs index 8f617131f..8f6ccdea0 100644 --- a/examples/sharding.rs +++ b/examples/sharding.rs @@ -1,4 +1,4 @@ -use futures_lite::{future::ready, stream::repeat_with, FutureExt, StreamExt}; +use futures_lite::{FutureExt, StreamExt, future::ready, stream::repeat_with}; use glommio::{ channels::{ diff --git a/examples/storage.rs b/examples/storage.rs index 8beb8edd4..061f9cbc7 100644 --- a/examples/storage.rs +++ b/examples/storage.rs @@ -1,16 +1,15 @@ use byte_unit::{Byte, UnitType}; use clap::{Arg, Command}; use futures_lite::{ - stream::{self, StreamExt}, AsyncReadExt, AsyncWriteExt, + stream::{self, StreamExt}, }; use glommio::{ - enclose, + LocalExecutorBuilder, Placement, enclose, io::{ BufferedFile, DmaFile, DmaStreamReader, DmaStreamReaderBuilder, DmaStreamWriterBuilder, MergedBufferLimit, ReadAmplificationLimit, StreamReaderBuilder, StreamWriterBuilder, }, - LocalExecutorBuilder, Placement, }; use std::{ cell::Cell, diff --git a/glommio/Cargo.toml b/glommio/Cargo.toml index 8b6c381fc..8f1943c93 100755 --- a/glommio/Cargo.toml +++ b/glommio/Cargo.toml @@ -4,7 +4,7 @@ version = "0.9.0" authors = [ "Glauber Costa ", "Hippolyte Barraud ", - "DataDog " + "DataDog ", ] edition = "2021" description = "Glommio is a thread-per-core crate that makes writing highly parallel asynchronous applications in a thread-per-core architecture easier for rustaceans." @@ -15,55 +15,55 @@ keywords = ["linux", "rust", "async", "iouring", "thread-per-core"] categories = ["asynchronous", "concurrency", "os", "filesystem", "network-programming"] readme = "../README.md" # This is also documented in the README.md under "Supported Rust Versions" -rust-version = "1.70" +rust-version = "1.92" + +[features] +bench = [] +debugging = [] + +# Unstable features based on nightly +native-tls = [] +nightly = ["native-tls"] [dependencies] -ahash = "0.7" +ahash = "0.8" backtrace = { version = "0.3" } -bitflags = "2.4" -bitmaps = "3.1" -buddy-alloc = "0.4" -concurrent-queue = "1.2" +bencher = "0.1.5" +bitflags = "2.10" +bitmaps = "3.2" +buddy-alloc = "0.6" +concurrent-queue = "2.5" crossbeam = "0.8" -enclose = "1.1" -flume = { version = "0.11", features = ["async"] } -futures-lite = "2.6.0" -intrusive-collections = "0.9" -lazy_static = "1.4" +enclose = "1.2" +flume = { version = "0.12", features = ["async"] } +futures-lite = "2.6" +intrusive-collections = "0.10" +lazy_static = "1.5" libc = "0.2" -lockfree = "0.5" log = "0.4" -nix = { version = "0.27", features = ["event", "fs", "ioctl", "mman", "net", "poll", "sched", "time"] } +nix = { version = "0.30", features = ["event", "fs", "ioctl", "mman", "net", "poll", "sched", "time"] } pin-project-lite = "0.2" -rlimit = "0.6" +rlimit = "0.10" scoped-tls = "1.0" -scopeguard = "1.1" -signal-hook = { version = "0.3" } -sketches-ddsketch = "0.1" -smallvec = { version = "1.7", features = ["union"] } -socket2 = { version = "0.4", features = ["all"] } +scopeguard = "1.2" +signal-hook = { version = "0.4" } +sketches-ddsketch = "0.3" +smallvec = { version = "1.15", features = ["union"] } +socket2 = { version = "0.6", features = ["all"] } tracing = "0.1" -typenum = "1.15" - -[dev-dependencies] -fastrand = "2" -futures = "0" -hdrhistogram = "7" -pretty_env_logger = "0" -rand = "0" -tokio = { version = "1", default-features = false, features = ["rt", "macros", "rt-multi-thread", "net", "io-util", "time", "sync"] } -tracing-subscriber = { version = "0", features = ["env-filter"] } +typenum = "1.19" [build-dependencies] -cc = "1.0" - -[features] -bench = [] -debugging = [] +cc = "1.2" -# Unstable features based on nightly -native-tls = [] -nightly = ["native-tls"] +[dev-dependencies] +fastrand = "2.3" +futures = "0.3" +hdrhistogram = "7.5" +pretty_env_logger = "0.5" +rand = "0.9" +tokio = { version = "1.49", default-features = false, features = ["rt", "macros", "rt-multi-thread", "net", "io-util", "time", "sync"] } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [[bench]] name = "executor" diff --git a/glommio/benches/spsc_queue.rs b/glommio/benches/spsc_queue.rs index cb79f0b21..ddd5da724 100644 --- a/glommio/benches/spsc_queue.rs +++ b/glommio/benches/spsc_queue.rs @@ -1,32 +1,76 @@ use glommio::channels::spsc_queue; +use std::hint::{black_box, spin_loop}; +use std::thread; use std::time::Instant; -fn test_spsc(capacity: usize) { - const RUNS: u32 = 10 * 1000 * 1000; - let (sender, receiver) = spsc_queue::make::(1024); - let consumer = std::thread::spawn(move || { - let t = Instant::now(); +const RUNS: usize = 10_000_000; + +fn pin_to_cpu(cpu: usize) { + use std::mem::MaybeUninit; + + unsafe { + let mut set = MaybeUninit::::zeroed().assume_init(); + libc::CPU_ZERO(&mut set); + libc::CPU_SET(cpu, &mut set); + + let ret = libc::sched_setaffinity(0, std::mem::size_of::(), &set); + assert_eq!(ret, 0, "sched_setaffinity failed"); + } +} + +fn bench_spsc(capacity: usize, cpu_producer: usize, cpu_consumer: usize) { + let (sender, receiver) = spsc_queue::make::(capacity); + + let consumer = thread::spawn(move || { + pin_to_cpu(cpu_consumer); + + let start = Instant::now(); for _ in 0..RUNS { - while receiver.try_pop().is_none() {} + loop { + if black_box(receiver.try_pop()).is_some() { + break; + } + spin_loop(); + } } - println!( - "cost of receiving {:#?}, capacity {}", - t.elapsed() / RUNS, - capacity, - ); + start.elapsed() }); - let t = Instant::now(); + + // Producer timing + pin_to_cpu(cpu_producer); + + let start = Instant::now(); for i in 0..RUNS { - while sender.try_push(i as u64).is_some() {} + loop { + if black_box(sender.try_push(i as u32)).is_none() { + break; + } + spin_loop(); + } } + let prod_elapsed = start.elapsed(); + let cons_elapsed = consumer.join().unwrap(); + + let prod_ns = prod_elapsed.as_nanos() as f64 / RUNS as f64; + let cons_ns = cons_elapsed.as_nanos() as f64 / RUNS as f64; + + let prod_kops = (1e9 / prod_ns) / 1e3; + let cons_kops = (1e9 / cons_ns) / 1e3; + println!( - "cost of sending {:#?}, capacity {}", - t.elapsed() / RUNS, - capacity + "Cap {:>6} | Prod {:>8.2} ns/op ({:>10.2} KOPS) | Cons {:>8.2} ns/op ({:>10.2} KOPS)", + capacity, prod_ns, prod_kops, cons_ns, cons_kops ); - consumer.join().unwrap(); } fn main() { - test_spsc(1024); + let pairs = &[(0usize, 1usize), (0usize, 2usize)]; + + for &(cpu_p, cpu_c) in pairs { + println!("CPU {}->{}", cpu_p, cpu_c); + for &capacity in &[1, 16, 1024, 4096, 10_000] { + bench_spsc(capacity, cpu_p, cpu_c); + } + println!("--"); + } } diff --git a/glommio/build.rs b/glommio/build.rs index a4eb0f2f4..3395b0078 100644 --- a/glommio/build.rs +++ b/glommio/build.rs @@ -1,4 +1,4 @@ -use std::{env, fs, path::*, process::Command}; +use std::{env, path::*, process::Command}; use cc::Build; @@ -21,9 +21,13 @@ fn main() { } }; - // Run the configure script in OUT_DIR to get `compat.h` - let configured_include = configure(&liburing); + // Run the configure script to get `compat.h` + Command::new("./configure") + .current_dir(&liburing) + .output() + .expect("configure script failed"); + let configured_include = liburing.join("src/include"); let src = liburing.join("src"); // liburing @@ -33,7 +37,6 @@ fn main() { .file(src.join("syscall.c")) .file(src.join("register.c")) .flag("-D_GNU_SOURCE") - .include(src.join("include")) .include(&configured_include) .extra_warnings(false) .compile("uring"); @@ -42,20 +45,6 @@ fn main() { Build::new() .file(project.join("rusturing.c")) .flag("-D_GNU_SOURCE") - .include(src.join("include")) .include(&configured_include) .compile("rusturing"); } - -fn configure(liburing: &Path) -> PathBuf { - let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()) - .canonicalize() - .unwrap(); - fs::copy(liburing.join("configure"), out_dir.join("configure")).unwrap(); - fs::create_dir_all(out_dir.join("src/include/liburing")).unwrap(); - Command::new("./configure") - .current_dir(&out_dir) - .output() - .expect("configure script failed"); - out_dir.join("src/include") -} diff --git a/glommio/liburing b/glommio/liburing index 41a61c97c..e07a859d4 160000 --- a/glommio/liburing +++ b/glommio/liburing @@ -1 +1 @@ -Subproject commit 41a61c97c2e3df4475c93fdf5026d575ce3f1377 +Subproject commit e07a859d4b39583c0fe0290730a9d75bccc24b5e diff --git a/glommio/rusturing.c b/glommio/rusturing.c index 541d03161..296e948d4 100644 --- a/glommio/rusturing.c +++ b/glommio/rusturing.c @@ -108,7 +108,7 @@ extern inline void rust_io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, io_uring_prep_poll_add(sqe, fd, poll_mask); } -extern inline void rust_io_uring_prep_poll_remove(struct io_uring_sqe *sqe, void *user_data) +extern inline void rust_io_uring_prep_poll_remove(struct io_uring_sqe *sqe, __u64 user_data) { io_uring_prep_poll_remove(sqe, user_data); } @@ -312,3 +312,8 @@ extern inline int rust_io_uring_wait_cqe(struct io_uring *ring, struct io_uring_ { return io_uring_wait_cqe(ring, cqe_ptr); } + +extern inline struct io_uring_sqe *rust_io_uring_get_sqe(struct io_uring *ring) +{ + return io_uring_get_sqe(ring); +} diff --git a/glommio/src/channels/channel_mesh.rs b/glommio/src/channels/channel_mesh.rs index ec09fff83..49fdbfe32 100644 --- a/glommio/src/channels/channel_mesh.rs +++ b/glommio/src/channels/channel_mesh.rs @@ -4,14 +4,11 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc. // use std::{ - cell::Cell, fmt::{self, Debug, Formatter}, io::{Error, ErrorKind}, - rc::Rc, + sync::{Arc, Mutex}, }; -use std::sync::{Arc, RwLock}; - use crate::{ channels::shared_channel::{self, *}, GlommioError, Result, @@ -161,27 +158,18 @@ impl Receivers { struct Peer { executor_id: usize, - notifier: Option>, role: Role, } impl Peer { - fn new(sender: Option>, role: Role) -> Self { + fn new(role: Role) -> Self { Self { executor_id: crate::executor().id(), - notifier: sender, role, } } } -type SharedChannel = ( - Cell>>, - Cell>>, -); - -type SharedChannels = Vec>>; - /// The role an executor plays in the mesh #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Role { @@ -203,6 +191,32 @@ impl Role { } } +struct JoinState { + peers: Vec, + ready: bool, + senders: Vec>>>, + receivers: Vec>>>, +} + +impl JoinState { + fn new(nr_peers: usize) -> Self { + let mut senders: Vec>>> = Vec::with_capacity(nr_peers); + let mut receivers: Vec>>> = Vec::with_capacity(nr_peers); + + for _ in 0..nr_peers { + senders.push((0..nr_peers).map(|_| None).collect()); + receivers.push((0..nr_peers).map(|_| None).collect()); + } + + Self { + peers: Vec::new(), + ready: false, + senders, + receivers, + } + } +} + /// An adapter for MeshBuilder pub trait MeshAdapter: Clone { /// Determine whether a channel should be created between a pair of peers @@ -252,20 +266,16 @@ pub type PartialMesh = MeshBuilder; pub struct MeshBuilder { nr_peers: usize, channel_size: usize, - peers: Rc>>, - channels: Arc>, + state: Arc>>, adapter: A, } -unsafe impl Send for MeshBuilder {} - impl Clone for MeshBuilder { fn clone(&self) -> Self { Self { nr_peers: self.nr_peers, channel_size: self.channel_size, - peers: self.peers.clone(), - channels: self.channels.clone(), + state: self.state.clone(), adapter: self.adapter.clone(), } } @@ -309,8 +319,7 @@ impl MeshBuilder { MeshBuilder { nr_peers, channel_size, - peers: Rc::new(RwLock::new(Vec::new())), - channels: Arc::new(Self::placeholder(nr_peers)), + state: Arc::new(Mutex::new(JoinState::new(nr_peers))), adapter, } } @@ -320,80 +329,99 @@ impl MeshBuilder { self.nr_peers } - fn placeholder(nr_peers: usize) -> SharedChannels { - (0..nr_peers) - .map(|_| { - (0..nr_peers) - .map(|_| (Cell::new(None), Cell::new(None))) - .collect() - }) - .collect() - } + async fn register(&self, role: Role) -> Result<(usize, usize), ()> { + let (is_last, exec_id) = { + let mut state = self.state.lock().unwrap(); - fn register(&self, role: Role) -> Result, ()> { - let mut peers = self.peers.write().unwrap(); + if state.peers.len() == self.nr_peers { + return Err(GlommioError::IoError(Error::other( + "The channel mesh is full.", + ))); + } - if peers.len() == self.nr_peers { - return Err(GlommioError::IoError(Error::new( - ErrorKind::Other, - "The channel mesh is full.", - ))); - } + let exec_id = crate::executor().id(); + let index = state + .peers + .binary_search_by(|n| n.executor_id.cmp(&exec_id)) + .expect_err("Should not join a mesh more than once."); - let index = peers - .binary_search_by(|n| n.executor_id.cmp(&crate::executor().id())) - .expect_err("Should not join a mesh more than once."); + state.peers.insert(index, Peer::new(role)); - if peers.len() == self.nr_peers - 1 { - peers.insert(index, Peer::new(None, role)); + (state.peers.len() == self.nr_peers, exec_id) + }; + + if is_last { + // If this was the last joiner, build the channels + let mut state = self.state.lock().unwrap(); + if !state.ready { + let n = state.peers.len(); + for from_idx in 0..n { + for to_idx in 0..n { + if from_idx == to_idx { + continue; + } - for (idx_from, from) in peers.iter().enumerate() { - for (idx_to, to) in peers.iter().enumerate() { - let channel = &self.channels[idx_from][idx_to]; - if idx_from != idx_to && self.adapter.connect(&from.role, &to.role) { - let (sender, receiver) = shared_channel::new_bounded(self.channel_size); - channel.0.set(Some(sender)); - channel.1.set(Some(receiver)); + let from_role = state.peers[from_idx].role; + let to_role = state.peers[to_idx].role; + + if self.adapter.connect(&from_role, &to_role) { + let (tx, rx) = shared_channel::new_bounded(self.channel_size); + state.senders[from_idx][to_idx] = Some(tx); + state.receivers[from_idx][to_idx] = Some(rx); + } } } + + state.ready = true; } + } else { + // Wait until last joiner finishes building + loop { + let ready = { + let state = self.state.lock().unwrap(); + state.ready + }; - let peers: Vec<_> = peers - .iter_mut() - .filter_map(|notifier| notifier.notifier.take()) - .collect(); + if ready { + break; + } - Ok(RegisterResult::NotificationSenders(peers)) - } else { - let (sender, receiver) = shared_channel::new_bounded(1); - peers.insert(index, Peer::new(Some(sender), role)); - Ok(RegisterResult::NotificationReceiver(receiver)) + futures_lite::future::yield_now().await; + } } + + let state = self.state.lock().unwrap(); + // At this point st.ready == true and st.peers is stable. + + let peer_id = state + .peers + .binary_search_by(|p| p.executor_id.cmp(&exec_id)) + .unwrap(); + + let role_id = state.peers[..peer_id] + .iter() + .filter(|p| p.role == role) + .count(); + + Ok((peer_id, role_id)) } async fn join_with(self, role: Role) -> Result<(Senders, Receivers), ()> { - match Self::register(&self, role)? { - RegisterResult::NotificationReceiver(receiver) => { - receiver.connect().await.recv().await.unwrap(); - } - RegisterResult::NotificationSenders(peers) => { - for peer in peers { - peer.connect().await.send(true).await.unwrap(); - } + let (peer_id, role_id) = self.register(role).await?; + + // Extract Senders and Receivers for this peer + let (mut senders, mut receivers) = { + let mut st = self.state.lock().unwrap(); + + let mut row = Vec::with_capacity(self.nr_peers); + let mut col = Vec::with_capacity(self.nr_peers); + + for i in 0..self.nr_peers { + row.push(st.senders[peer_id][i].take()); + col.push(st.receivers[i][peer_id].take()); } - } - let (peer_id, role_id) = { - let peers = self.peers.read().unwrap(); - let peer_id = peers - .binary_search_by(|n| n.executor_id.cmp(&crate::executor().id())) - .unwrap(); - let role_id = peers - .iter() - .take(peer_id) - .filter(|r| r.role.eq(&role)) - .count(); - (peer_id, role_id) + (row, col) }; let producer_id = if role.is_producer() { @@ -408,32 +436,32 @@ impl MeshBuilder { None }; - let mut senders = Vec::with_capacity(self.nr_peers); - let mut receivers = Vec::with_capacity(self.nr_peers); + let mut senders_vec = Vec::with_capacity(self.nr_peers); + let mut receivers_vec = Vec::with_capacity(self.nr_peers); for i in 0..self.nr_peers { - let sender = self.channels[peer_id][i].0.take(); - let receiver = self.channels[i][peer_id].1.take(); + let sender = senders[i].take(); + let receiver = receivers[i].take(); - let sender = sender.map(|sender| crate::spawn_local(sender.connect()).detach()); - let receiver = receiver.map(|receiver| crate::spawn_local(receiver.connect()).detach()); + let sender = sender.map(|s| crate::spawn_local(s.connect()).detach()); + let receiver = receiver.map(|r| crate::spawn_local(r.connect()).detach()); match sender { None => { if self.adapter.is_full() { - senders.push(None) + senders_vec.push(None) } } - Some(sender) => senders.push(Some(sender.await.unwrap())), + Some(h) => senders_vec.push(Some(h.await.unwrap())), } match receiver { None => { if self.adapter.is_full() { - receivers.push(None) + receivers_vec.push(None) } } - Some(receiver) => receivers.push(Some(receiver.await.unwrap())), + Some(h) => receivers_vec.push(Some(h.await.unwrap())), } } @@ -441,22 +469,17 @@ impl MeshBuilder { Senders { peer_id, producer_id, - senders, + senders: senders_vec, }, Receivers { peer_id, consumer_id, - receivers, + receivers: receivers_vec, }, )) } } -enum RegisterResult { - NotificationReceiver(SharedReceiver), - NotificationSenders(Vec>), -} - #[cfg(test)] mod tests { use futures::future; diff --git a/glommio/src/channels/shared_channel.rs b/glommio/src/channels/shared_channel.rs index eae45d541..8d48a9094 100644 --- a/glommio/src/channels/shared_channel.rs +++ b/glommio/src/channels/shared_channel.rs @@ -36,7 +36,7 @@ type Result = crate::Result; /// [`ConnectedReceiver`]: struct.ConnectedReceiver.html /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html pub struct SharedReceiver { - state: Option>>, + state: Option>>, } /// The `SharedSender` is the sending end of the Shared Channel. @@ -52,7 +52,7 @@ pub struct SharedReceiver { /// [`ConnectedSender`]: struct.ConnectedSender.html /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html pub struct SharedSender { - state: Option>>, + state: Option>>, } impl fmt::Debug for SharedSender { @@ -73,13 +73,10 @@ impl fmt::Debug for SharedReceiver { } } -unsafe impl Send for SharedReceiver {} -unsafe impl Send for SharedSender {} - /// The `ConnectedReceiver` is the receiving end of the Shared Channel. pub struct ConnectedReceiver { id: u64, - state: Rc>, + state: Arc>, reactor: Weak, notifier: Arc, } @@ -87,7 +84,7 @@ pub struct ConnectedReceiver { /// The `ConnectedSender` is the sending end of the Shared Channel. pub struct ConnectedSender { id: u64, - state: Rc>, + state: Arc>, reactor: Weak, notifier: Arc, } @@ -114,18 +111,18 @@ struct ReceiverState { buffer: Consumer, } -struct Connector { +struct Connector { buffer: T, reactor: Weak, } -impl Connector { +impl Connector { fn new(buffer: T, reactor: Weak) -> Self { Self { buffer, reactor } } } -impl Future for Connector { +impl Future for Connector { type Output = Arc; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let reactor = self.reactor.upgrade().unwrap(); @@ -150,10 +147,10 @@ pub fn new_bounded(size: usize) -> (SharedSender, SharedRece let (producer, consumer) = make(size); ( SharedSender { - state: Some(Rc::new(SenderState { buffer: producer })), + state: Some(Arc::new(SenderState { buffer: producer })), }, SharedReceiver { - state: Some(Rc::new(ReceiverState { buffer: consumer })), + state: Some(Arc::new(ReceiverState { buffer: consumer })), }, ) } @@ -176,7 +173,7 @@ impl SharedSender { }})); let reactor = Rc::downgrade(&reactor); - let peer = Connector::new(state.buffer.clone(), reactor.clone()); + let peer = Connector::new(state.buffer.clone_internal(), reactor.clone()); let notifier = peer.await; ConnectedSender { id, @@ -341,7 +338,7 @@ impl SharedReceiver { }})); let reactor = Rc::downgrade(&reactor); - let peer = Connector::new(state.buffer.clone(), reactor.clone()); + let peer = Connector::new(state.buffer.clone_internal(), reactor.clone()); let notifier = peer.await; ConnectedReceiver { id, diff --git a/glommio/src/channels/spsc_queue.rs b/glommio/src/channels/spsc_queue.rs index ca46df7c1..70629a389 100644 --- a/glommio/src/channels/spsc_queue.rs +++ b/glommio/src/channels/spsc_queue.rs @@ -1,9 +1,8 @@ use std::{ - cell::{Cell, UnsafeCell}, + cell::UnsafeCell, fmt, marker::PhantomData, - mem::{self, MaybeUninit}, - slice::from_raw_parts_mut, + mem::MaybeUninit, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, @@ -15,7 +14,7 @@ use std::{ struct ProducerCacheline { /// Index position of current tail tail: AtomicUsize, - limit: Cell, + limit: AtomicUsize, /// Id == 0 : never connected /// Id == usize::MAX: disconnected consumer_id: AtomicUsize, @@ -37,6 +36,8 @@ struct Slot { has_value: AtomicBool, } +unsafe impl Sync for Slot {} + /// The internal memory buffer used by the queue. /// /// `Buffer` holds a pointer to allocated memory which represents the bounded @@ -44,7 +45,7 @@ struct Slot { /// consumer use to track location in the ring. #[repr(C)] pub(crate) struct Buffer { - buffer_storage: *mut Slot, + buffer_storage: Box<[Slot]>, capacity: usize, mask: usize, lookahead: usize, @@ -59,7 +60,7 @@ impl fmt::Debug for Buffer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let head = self.ccache.head.load(Ordering::Relaxed); let tail = self.pcache.tail.load(Ordering::Relaxed); - let limit = self.pcache.limit.get(); + let limit = self.pcache.limit.load(Ordering::Relaxed); let id_to_str = |id| match id { 0 => "not connected".into(), usize::MAX => "disconnected".into(), @@ -79,29 +80,80 @@ impl fmt::Debug for Buffer { .finish() } } - unsafe impl Sync for Buffer {} -/// A handle to the queue which allows consuming values from the buffer +/// A handle to the queue which allows consuming values from the buffer. +/// +/// # SPSC Guarantee +/// +/// This type does NOT implement `Clone`. The queue is designed as a +/// Single-Producer-Single-Consumer (SPSC) queue, and allowing multiple +/// consumers would violate memory safety guarantees. +/// +/// Cloning the consumer would allow multiple threads to concurrently call +/// `try_pop()`, which uses `Ordering::Relaxed` and assumes exclusive access. +/// This can lead to: +/// - Multiple consumers reading the same value from the queue +/// - Double-free when both consumers drop the same value +/// - Heap corruption and undefined behavior +/// +/// See issue #700 for details. pub struct Consumer { pub(crate) buffer: Arc>, } -impl Clone for Consumer { - fn clone(&self) -> Self { +impl Consumer { + /// Internal cloning method for use within glommio. + /// + /// # Safety + /// + /// This is intentionally NOT exposed via the Clone trait to prevent + /// external code from creating multiple consumers, which would violate + /// SPSC guarantees and cause memory corruption. + /// + /// Internal use within glommio (e.g., shared_channel) must ensure that + /// only one consumer is actually active at any time, even if multiple + /// Consumer handles exist temporarily during handoff. + pub(crate) fn clone_internal(&self) -> Self { Consumer { buffer: self.buffer.clone(), } } } -/// A handle to the queue which allows adding values onto the buffer +/// A handle to the queue which allows adding values onto the buffer. +/// +/// # SPSC Guarantee +/// +/// This type does NOT implement `Clone`. The queue is designed as a +/// Single-Producer-Single-Consumer (SPSC) queue, and allowing multiple +/// producers would violate memory safety guarantees. +/// +/// Cloning the producer would allow multiple threads to concurrently call +/// `try_push()`, which uses `Ordering::Relaxed` and assumes exclusive access. +/// This can lead to: +/// - Multiple producers writing to the same slot +/// - Values being overwritten without being dropped (memory leak) +/// - Lost updates and data corruption +/// +/// See issue #700 for details. pub struct Producer { pub(crate) buffer: Arc>, } -impl Clone for Producer { - fn clone(&self) -> Self { +impl Producer { + /// Internal cloning method for use within glommio. + /// + /// # Safety + /// + /// This is intentionally NOT exposed via the Clone trait to prevent + /// external code from creating multiple producers, which would violate + /// SPSC guarantees and cause memory corruption. + /// + /// Internal use within glommio (e.g., shared_channel) must ensure that + /// only one producer is actually active at any time, even if multiple + /// Producer handles exist temporarily during handoff. + pub(crate) fn clone_internal(&self) -> Self { Producer { buffer: self.buffer.clone(), } @@ -120,9 +172,6 @@ impl fmt::Debug for Producer { } } -unsafe impl Send for Consumer {} -unsafe impl Send for Producer {} - impl Buffer { /// Attempt to pop a value off the buffer. /// @@ -132,28 +181,18 @@ impl Buffer { /// else, etc.) fn try_pop(&self) -> Option { let head = self.ccache.head.load(Ordering::Relaxed); - let slot = unsafe { &*self.buffer_storage.add(head & self.mask) }; + let slot = &self.buffer_storage[head & self.mask]; if !slot.has_value.load(Ordering::Acquire) { return None; } let v = Some(unsafe { slot.value.get().read().assume_init() }); slot.has_value.store(false, Ordering::Release); - self.ccache.head.store(head + 1, Ordering::Relaxed); + self.ccache + .head + .store(head.wrapping_add(1), Ordering::Relaxed); v } - fn has_space(&self, tail: usize) -> bool { - let index = (tail + self.lookahead) & self.mask; - let slot = unsafe { &*self.buffer_storage.add(index) }; - if !slot.has_value.load(Ordering::Acquire) { - self.pcache.limit.set(tail + self.lookahead + 1); - true - } else { - let slot = unsafe { &*self.buffer_storage.add(tail & self.mask) }; - !slot.has_value.load(Ordering::Acquire) - } - } - /// Attempt to push a value onto the buffer. /// /// If the buffer is full, this method will not block. Instead, it will @@ -164,17 +203,34 @@ impl Buffer { if self.consumer_disconnected() { return Some(v); } + let tail = self.pcache.tail.load(Ordering::Relaxed); - if tail >= self.pcache.limit.get() && !self.has_space(tail) { - return Some(v); + let limit = self.pcache.limit.load(Ordering::Relaxed); + + if tail == limit { + let idx = tail.wrapping_add(self.lookahead); + let slot = &self.buffer_storage[idx & self.mask]; + if !slot.has_value.load(Ordering::Acquire) { + self.pcache.limit.store(idx, Ordering::Relaxed); + } else { + let slot: &Slot = &self.buffer_storage[tail & self.mask]; + if slot.has_value.load(Ordering::Acquire) { + return Some(v); + } + self.pcache + .limit + .store(tail.wrapping_add(1), Ordering::Relaxed); + } } - let slot = unsafe { - let slot = &*self.buffer_storage.add(tail & self.mask); + + let slot = &self.buffer_storage[tail & self.mask]; + unsafe { slot.value.get().write(MaybeUninit::new(v)); - slot - }; + } slot.has_value.store(true, Ordering::Release); - self.pcache.tail.store(tail + 1, Ordering::Relaxed); + self.pcache + .tail + .store(tail.wrapping_add(1), Ordering::Relaxed); None } @@ -221,21 +277,12 @@ impl Drop for Buffer { // Pop the rest of the values off the queue. By moving them into this scope, // we implicitly call their destructor while self.try_pop().is_some() {} - // We don't want to run any destructors here, because we didn't run - // any of the constructors through the vector. And whatever object was - // in fact still alive we popped above. - let _drop = unsafe { - // Nightly clippy warns about this but ptr::from_raw_parts_mut isn't stable yet. - #[allow(clippy::cast_slice_from_raw_parts)] - let ptr = from_raw_parts_mut(self.buffer_storage, self.capacity) as *mut [Slot]; - Box::from_raw(ptr) - }; } } /// Creates a new `spsc_queue` returning its producer and consumer /// endpoints. -pub fn make(capacity: usize) -> (Producer, Consumer) { +pub fn make(capacity: usize) -> (Producer, Consumer) { inner_make(capacity, 0) } @@ -248,10 +295,10 @@ fn inner_make(capacity: usize, initial_value: usize) -> (Producer, Consume buffer_storage, capacity, mask: capacity - 1, - lookahead: std::cmp::min(capacity / 4, MAX_LOOKAHEAD), + lookahead: (capacity / 4).clamp(1, MAX_LOOKAHEAD), pcache: ProducerCacheline { tail: AtomicUsize::new(initial_value), - limit: Cell::new(0), + limit: AtomicUsize::new(initial_value), consumer_id: AtomicUsize::new(0), }, ccache: ConsumerCacheline { @@ -268,16 +315,14 @@ fn inner_make(capacity: usize, initial_value: usize) -> (Producer, Consume ) } -fn allocate_buffer(capacity: usize) -> *mut Slot { - let mut boxed: Box<[Slot]> = (0..capacity) - .map(|_| Slot { - has_value: AtomicBool::new(false), - value: UnsafeCell::new(MaybeUninit::uninit()), - }) - .collect(); - let ptr = boxed.as_mut_ptr(); - mem::forget(boxed); - ptr +fn allocate_buffer(capacity: usize) -> Box<[Slot]> { + std::iter::repeat_with(|| Slot { + value: UnsafeCell::new(MaybeUninit::uninit()), + has_value: AtomicBool::new(false), + }) + .take(capacity) + .collect::>() + .into_boxed_slice() } pub(crate) trait BufferHalf { @@ -469,7 +514,6 @@ mod tests { } } - #[should_panic] #[test] fn test_wrap() { let (p, c) = super::inner_make(10, usize::MAX - 1); @@ -482,4 +526,11 @@ mod tests { assert_eq!(c.try_pop(), Some(i)); } } + + fn assert_send_sync() {} + #[test] + fn producer_consumer_are_send() { + assert_send_sync::>(); + assert_send_sync::>(); + } } diff --git a/glommio/src/controllers/deadline_queue.rs b/glommio/src/controllers/deadline_queue.rs index 5d623ccaf..ae7156f7e 100644 --- a/glommio/src/controllers/deadline_queue.rs +++ b/glommio/src/controllers/deadline_queue.rs @@ -496,9 +496,10 @@ impl DeadlineQueue { pub async fn push_work(&self, source: Rc>) -> io::Result { self.queue.admit(source.clone())?; self.sender.send(source.clone()).await?; - self.responder.recv().await.ok_or_else(|| { - io::Error::new(io::ErrorKind::Other, "no response from response channel") - }) + self.responder + .recv() + .await + .ok_or_else(|| io::Error::other("no response from response channel")) } /// Returns the TaskQueueHandle associated with this controller diff --git a/glommio/src/error.rs b/glommio/src/error.rs index d3da4193f..7cd66e0a8 100644 --- a/glommio/src/error.rs +++ b/glommio/src/error.rs @@ -494,7 +494,7 @@ impl From> for io::Error { GlommioError::ExecutorError(ExecutorErrorKind::QueueError { index, kind }) => { match kind { QueueErrorKind::StillActive => { - io::Error::new(io::ErrorKind::Other, format!("Queue #{index} still active")) + io::Error::other(format!("Queue #{index} still active")) } QueueErrorKind::NotFound => { io::Error::new(io::ErrorKind::NotFound, format!("Queue #{index} not found")) @@ -508,10 +508,9 @@ impl From> for io::Error { GlommioError::BuilderError(BuilderErrorKind::NonExistentCpus { .. }) | GlommioError::BuilderError(BuilderErrorKind::InsufficientCpus { .. }) | GlommioError::BuilderError(BuilderErrorKind::NrShards { .. }) - | GlommioError::BuilderError(BuilderErrorKind::ThreadPanic(_)) => io::Error::new( - io::ErrorKind::Other, - format!("Executor builder error: {display_err}"), - ), + | GlommioError::BuilderError(BuilderErrorKind::ThreadPanic(_)) => { + io::Error::other(format!("Executor builder error: {display_err}")) + } GlommioError::EnhancedIoError { source, .. } => { io::Error::new(source.kind(), display_err) } @@ -521,7 +520,7 @@ impl From> for io::Error { format!("IncorrectSourceType {x:?}"), ), ReactorErrorKind::MemLockLimit(a, b) => { - io::Error::new(io::ErrorKind::Other, format!("MemLockLimit({a:?}/{b:?})")) + io::Error::other(format!("MemLockLimit({a:?}/{b:?})")) } }, GlommioError::TimedOut(dur) => { @@ -647,8 +646,7 @@ mod test { #[test] fn composite_error_from_into() { - let err: GlommioError<()> = - io::Error::new(io::ErrorKind::Other, "test other io-error").into(); + let err: GlommioError<()> = io::Error::other("test other io-error").into(); let _: io::Error = err.into(); let err: GlommioError<()> = diff --git a/glommio/src/executor/mod.rs b/glommio/src/executor/mod.rs index 5cc2d67ee..5fab92ae8 100644 --- a/glommio/src/executor/mod.rs +++ b/glommio/src/executor/mod.rs @@ -177,7 +177,7 @@ impl Ord for TaskQueue { impl PartialOrd for TaskQueue { fn partial_cmp(&self, other: &Self) -> Option { - Some(other.vruntime.cmp(&self.vruntime)) + Some(self.cmp(other)) } } @@ -1025,7 +1025,7 @@ impl LocalExecutorPoolBuilder { } else { // this `Err` isn't visible to the user; the pool builder directly returns an // `Err` from the `std::thread::Builder` - Err(io::Error::new(io::ErrorKind::Other, "spawn failed").into()) + Err(io::Error::other("spawn failed").into()) } } }); diff --git a/glommio/src/executor/placement/pq_tree.rs b/glommio/src/executor/placement/pq_tree.rs index 3c5e58835..db0c75f2d 100644 --- a/glommio/src/executor/placement/pq_tree.rs +++ b/glommio/src/executor/placement/pq_tree.rs @@ -78,9 +78,9 @@ where fn priority_pack(&self, other: &Self) -> Ordering { // if a node is partially saturated (i.e. currently being filled by the packing // iterator), then it takes priority - if self.nr_slots_selected % self.nr_slots != 0 { + if !self.nr_slots_selected.is_multiple_of(self.nr_slots) { Ordering::Greater - } else if other.nr_slots_selected % other.nr_slots != 0 { + } else if !other.nr_slots_selected.is_multiple_of(other.nr_slots) { Ordering::Less } else { // if the node is not partially saturated, then it is either equally saturated diff --git a/glommio/src/free_list.rs b/glommio/src/free_list.rs index 73bd8b11e..3a657365c 100644 --- a/glommio/src/free_list.rs +++ b/glommio/src/free_list.rs @@ -72,7 +72,7 @@ impl FreeList { } pub(crate) fn dealloc(&mut self, idx: Idx) -> T { let slot = Slot::Free { - next_free: mem::replace(&mut self.first_free, Some(idx)), + next_free: Option::replace(&mut self.first_free, idx), }; match mem::replace(&mut self.slots[idx.to_raw()], slot) { Slot::Full { item } => item, diff --git a/glommio/src/io/bulk_io.rs b/glommio/src/io/bulk_io.rs index e86018dde..61b1f8ee1 100644 --- a/glommio/src/io/bulk_io.rs +++ b/glommio/src/io/bulk_io.rs @@ -13,7 +13,7 @@ use std::{ }; /// Set a limit to the size of merged IO requests. -#[derive(Debug)] +#[derive(Debug, Default)] pub enum MergedBufferLimit { /// Disables request coalescing NoMerging, @@ -21,6 +21,7 @@ pub enum MergedBufferLimit { /// Sets the limit to the maximum the kernel allows for the underlying /// device without breaking down the request into smaller ones /// (/sys/block/.../queue/max_sectors_kb) + #[default] DeviceMaxSingleRequest, /// Sets a custom limit. @@ -29,15 +30,9 @@ pub enum MergedBufferLimit { Custom(usize), } -impl Default for MergedBufferLimit { - fn default() -> Self { - Self::DeviceMaxSingleRequest - } -} - /// Set a limit to the amount of read amplification in-between two mergeable IO /// requests. -#[derive(Debug)] +#[derive(Debug, Default)] pub enum ReadAmplificationLimit { /// Deny read amplification. /// @@ -46,6 +41,7 @@ pub enum ReadAmplificationLimit { /// size. For instance, if the minimum IO size is 4KiB and the user reads /// [0..256] and [2048..2560] then the two will be merged into [0..4096] to /// accommodate the 4KiB minimum IO size. + #[default] NoAmplification, /// Merge two consecutive IO requests if the read amplification is below a @@ -59,12 +55,6 @@ pub enum ReadAmplificationLimit { NoLimit, } -impl Default for ReadAmplificationLimit { - fn default() -> Self { - Self::NoAmplification - } -} - /// An interface to an IO vector. pub trait IoVec { /// The read position (the offset) in the file diff --git a/glommio/src/io/directory.rs b/glommio/src/io/directory.rs index 1cec4987c..dd9cb1577 100644 --- a/glommio/src/io/directory.rs +++ b/glommio/src/io/directory.rs @@ -118,7 +118,6 @@ impl Directory { pub fn sync_read_dir(&self) -> Result { let path = self.file.path_required("read directory")?; enhanced_try!(std::fs::read_dir(&*path), "Reading a directory", self.file) - .map_err(Into::into) } /// Issues fdatasync into the underlying file. diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index d0d48c0eb..eecf32b0b 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -21,7 +21,10 @@ use nix::sys::statfs::*; use std::{ cell::Ref, io, - os::unix::io::{AsRawFd, RawFd}, + os::{ + fd::BorrowedFd, + unix::io::{AsFd, AsRawFd, RawFd}, + }, path::Path, rc::Rc, sync::{Arc, Weak as AWeak}, @@ -60,8 +63,8 @@ pub struct AdvisoryLockGuard(Option>); impl Drop for AdvisoryLockGuard { fn drop(&mut self) { - if let Some(mut locked) = self.0.take().and_then(Arc::into_inner) { - unsafe { locked.funlock_immediately() } + if let Some(locked) = self.0.take().and_then(Arc::into_inner) { + locked.funlock_immediately().expect("Cannopt unlock fd") } } } @@ -159,6 +162,12 @@ impl AsRawFd for DmaFile { } } +impl AsFd for DmaFile { + fn as_fd(&self) -> BorrowedFd<'_> { + self.file.as_fd() + } +} + impl DmaFile { /// Returns true if the DmaFiles represent the same file on the underlying /// device. @@ -213,16 +222,14 @@ impl DmaFile { let pollable = if (fstype.0 as u64) == (libc::TMPFS_MAGIC as u64) { PollableStatus::NonPollable(DirectIo::Disabled) } else { - // Allow this to work on non direct I/O devices, but only if this is in-memory sys::direct_io_ify(file.as_raw_fd(), flags)?; - let reactor = file.reactor.upgrade().unwrap(); - if reactor - .probe_iopoll_support(file.as_raw_fd(), o_direct_alignment, major, minor, path) - .await + if !sysfs::BlockDevice::has_io_poll(major, minor) + || sysfs::BlockDevice::is_rotational(major, minor) + || sysfs::BlockDevice::memory_device(major, minor) { - PollableStatus::Pollable - } else { PollableStatus::NonPollable(DirectIo::Enabled) + } else { + PollableStatus::Pollable } }; @@ -247,7 +254,7 @@ impl DmaFile { | (opts.custom_flags as libc::c_int & !libc::O_ACCMODE); let res = DmaFile::open_at(dir, path, flags, opts.mode).await; - Ok(enhanced_try!(res, opdesc, Some(path), None)?) + Ok(enhanced_try!(res, opdesc, Some(path.to_path_buf()), None)?) } pub(super) fn attach_scheduler(&self) { @@ -382,7 +389,7 @@ impl DmaFile { pos, self.pollable, ); - enhanced_try!(source.collect_rw().await, "Writing", self.file).map_err(Into::into) + enhanced_try!(source.collect_rw().await, "Writing", self.file) } /// Equivalent to [`DmaFile::write_at`] except that the caller retains @@ -442,7 +449,7 @@ impl DmaFile { pos, self.pollable, ); - enhanced_try!(source.collect_rw().await, "Writing", self.file).map_err(Into::into) + enhanced_try!(source.collect_rw().await, "Writing", self.file) } /// Reads from a specific position in the file and returns the buffer. @@ -771,7 +778,7 @@ impl DmaFile { /// NOTE: Clones are allowed to exist on any thread and all share the same underlying /// fd safely. try_take_last_clone is also safe to invoke from any thread and will /// behave correctly with respect to clones on other threads. - pub fn try_take_last_clone(mut self) -> std::result::Result { + pub fn try_take_last_clone(mut self) -> std::result::Result> { match self.file.try_take_last_clone() { Ok(took) => { self.file = took; @@ -779,7 +786,7 @@ impl DmaFile { } Err(still_cloned) => { self.file = still_cloned; - Err(self) + Err(Box::new(self)) } } } @@ -955,8 +962,6 @@ impl DmaFile { /// .join() /// .unwrap(); /// -/// assert_eq!(nix::fcntl::fcntl(original_fd, nix::fcntl::FcntlArg::F_GETFD), Err(nix::errno::Errno::EBADF)); -/// /// let file: DmaFile = result.into(); /// assert_ne!(file.as_raw_fd(), original_fd); /// assert_eq!(file.inode(), original_inode); @@ -995,7 +1000,7 @@ impl OwnedDmaFile { file: enhanced_try!( self.file.dup(), "Duplicating", - self.file.path.as_ref(), + self.file.path.clone(), self.file.fd.as_ref().map(|fd| fd.as_raw_fd()) )?, o_direct_alignment: self.o_direct_alignment, diff --git a/glommio/src/io/glommio_file.rs b/glommio/src/io/glommio_file.rs index 6368ce5a4..00962ba2b 100644 --- a/glommio/src/io/glommio_file.rs +++ b/glommio/src/io/glommio_file.rs @@ -15,7 +15,10 @@ use std::{ cell::{Ref, RefCell}, convert::TryInto, io, - os::unix::io::{AsRawFd, FromRawFd, RawFd}, + os::{ + fd::{BorrowedFd, IntoRawFd, OwnedFd}, + unix::io::{AsFd, AsRawFd, FromRawFd, RawFd}, + }, path::{Path, PathBuf}, rc::{Rc, Weak}, sync::{Arc, Weak as AWeak}, @@ -141,7 +144,7 @@ impl AdvisoryLockStateHolder { /// non-Buffered files #[derive(Debug, Clone)] pub(crate) struct GlommioFile { - pub(crate) file: Option>, + pub(crate) file: Option>, pub(crate) lock_state: Option>, // A file can appear in many paths, through renaming and linking. // If we do that, each path should have its own object. This is to @@ -164,10 +167,10 @@ That means that while the file is already out of scope, the file descriptor is s This is likely fine, but in extreme situations can lead to resource exhaustion. An explicit \ asynchronous close is still preferred", self.path.borrow(), - file + file.as_raw_fd() ); if let Some(r) = self.reactor.upgrade() { - r.sys.async_close(file); + r.sys.async_close(file.into_raw_fd()); } } } @@ -182,7 +185,7 @@ impl AsRawFd for GlommioFile { impl FromRawFd for GlommioFile { unsafe fn from_raw_fd(fd: RawFd) -> Self { GlommioFile { - file: Some(Arc::new(fd)), + file: Some(Arc::new(OwnedFd::from_raw_fd(fd))), lock_state: Some(Arc::new(AdvisoryLockStateHolder::default())), path: RefCell::new(None), inode: 0, @@ -194,6 +197,12 @@ impl FromRawFd for GlommioFile { } } +impl AsFd for GlommioFile { + fn as_fd(&self) -> BorrowedFd<'_> { + self.file.as_ref().unwrap().as_fd() + } +} + impl GlommioFile { pub(crate) async fn open_at( dir: RawFd, @@ -214,7 +223,7 @@ impl GlommioFile { let fd = source.collect_rw().await?; let mut file = GlommioFile { - file: Some(Arc::new(fd as _)), + file: Some(Arc::new(unsafe { OwnedFd::from_raw_fd(fd as _) })), lock_state: Some(Arc::new(Default::default())), path: RefCell::new(Some(path)), inode: 0, @@ -234,10 +243,14 @@ impl GlommioFile { pub(crate) fn dup(&self) -> io::Result { let reactor = crate::executor().reactor(); + let file = self + .file + .as_ref() + .expect("dup called on closed file") + .try_clone()?; + let duped = Self { - file: Some(Arc::new(nix::unistd::dup( - self.file.as_ref().unwrap().as_raw_fd(), - )?)), + file: Some(Arc::new(file)), lock_state: self.lock_state.clone(), path: self.path.clone(), inode: self.inode, @@ -331,7 +344,14 @@ impl GlommioFile { // exists), then we can't safely unlock yet anyway. self.lock_state.take().unwrap(); - (Arc::into_inner(self.file.take().unwrap()), self.path.take()) + let file = self.file.take().unwrap(); + let owned = Arc::into_inner(file); + + if let Some(fd) = owned { + (Some(fd.into_raw_fd()), self.path.take()) + } else { + (None, self.path.take()) + } } pub(crate) async fn close(self) -> Result<()> { @@ -581,7 +601,7 @@ impl GlommioFile { /// This lets you open a DmaFile on one thread and then send it safely to another thread for processing. #[derive(Debug, Clone)] pub(crate) struct OwnedGlommioFile { - pub(crate) fd: Option>, + pub(crate) fd: Option>, pub(crate) lock_state: Option>, pub(crate) path: Option, pub(crate) inode: u64, @@ -592,7 +612,7 @@ pub(crate) struct OwnedGlommioFile { impl OwnedGlommioFile { pub(crate) fn dup(&self) -> io::Result { let fd = match self.fd.as_ref() { - Some(fd) => Some(Arc::new(nix::unistd::dup(fd.as_raw_fd())?)), + Some(fd) => Some(Arc::new(fd.try_clone()?)), None => None, }; @@ -624,21 +644,27 @@ impl OwnedGlommioFile { } } - pub(crate) unsafe fn funlock_immediately(&mut self) { - let advisory_lock = self.lock_state.take().unwrap(); + pub(crate) fn funlock_immediately(&self) -> Result<()> { + let advisory_lock = self.lock_state.as_ref().unwrap(); assert_eq!( advisory_lock.state(), AdvisoryLockState::Locked, "Not holding the lock!" ); - nix::fcntl::flock( - self.fd.as_ref().unwrap().as_raw_fd(), - nix::fcntl::FlockArg::Unlock, - ) - .unwrap(); + let res = unsafe { libc::flock(self.as_raw_fd(), libc::LOCK_UN) }; + if res == -1 { + return Err(GlommioError::create_enhanced( + std::io::Error::last_os_error(), + "funlock", + self.path.clone(), + Some(self.as_raw_fd()), + )); + } advisory_lock.unlock(); + + Ok(()) } } @@ -688,7 +714,7 @@ impl From for OwnedGlommioFile { #[derive(Default, Debug, Clone)] pub(crate) struct WeakGlommioFile { - pub(crate) fd: AWeak, + pub(crate) fd: AWeak, pub(crate) lock_state: AWeak, pub(crate) path: Option, pub(crate) inode: u64, @@ -750,10 +776,10 @@ pub(crate) mod test { files }; - assert!(file_list().iter().any(|x| *x == gf_fd)); // sanity check that file is open + assert!(file_list().contains(&gf_fd)); // sanity check that file is open let _ = { gf }; // moves scope and drops sleep(Duration::from_millis(10)).await; // forces the reactor to run, which will drop the file - assert!(!file_list().iter().any(|x| *x == gf_fd)); // file is gone + assert!(!file_list().contains(&gf_fd)); // file is gone }); } } diff --git a/glommio/src/io/sched.rs b/glommio/src/io/sched.rs index e0bbbea6e..657eb6279 100644 --- a/glommio/src/io/sched.rs +++ b/glommio/src/io/sched.rs @@ -71,7 +71,7 @@ struct FileSchedulerInner { sources: RefCell>, } -intrusive_adapter!(FileSchedulerAdapter = Rc: FileSchedulerInner { link: RBTreeLink }); +intrusive_adapter!(FileSchedulerAdapter = Rc: FileSchedulerInner { link => RBTreeLink }); impl<'a> KeyAdapter<'a> for FileSchedulerAdapter { type Key = Identity; fn get_key(&self, s: &'a FileSchedulerInner) -> Self::Key { @@ -177,7 +177,7 @@ struct ScheduledSourceInner { data_range: Range, } -intrusive_adapter!(ScheduledSourceAdapter = Rc: ScheduledSourceInner { link: RBTreeLink }); +intrusive_adapter!(ScheduledSourceAdapter = Rc: ScheduledSourceInner { link => RBTreeLink }); impl<'a> KeyAdapter<'a> for ScheduledSourceAdapter { type Key = u64; fn get_key(&self, s: &'a ScheduledSourceInner) -> Self::Key { diff --git a/glommio/src/iou/probe.rs b/glommio/src/iou/probe.rs index 450c04d96..14d2ac747 100644 --- a/glommio/src/iou/probe.rs +++ b/glommio/src/iou/probe.rs @@ -34,6 +34,6 @@ impl Probe { impl Drop for Probe { fn drop(&mut self) { - unsafe { libc::free(self.probe.as_ptr() as *mut _) } + unsafe { uring_sys::io_uring_free_probe(self.probe.as_ptr()) } } } diff --git a/glommio/src/iou/sqe.rs b/glommio/src/iou/sqe.rs index 97731b628..cebefd29e 100644 --- a/glommio/src/iou/sqe.rs +++ b/glommio/src/iou/sqe.rs @@ -349,7 +349,7 @@ impl<'a> SQE<'a> { #[inline] pub unsafe fn prep_poll_remove(&mut self, user_data: u64) { - uring_sys::io_uring_prep_poll_remove(self.sqe, user_data as _) + uring_sys::io_uring_prep_poll_remove(self.sqe, user_data) } #[inline] diff --git a/glommio/src/net/mod.rs b/glommio/src/net/mod.rs index 47b1f881a..ba539914f 100644 --- a/glommio/src/net/mod.rs +++ b/glommio/src/net/mod.rs @@ -6,9 +6,15 @@ //! This module provides glommio's networking support. use crate::sys; use nix::sys::socket::{MsgFlags, SockaddrLike}; -use std::{io, os::unix::io::RawFd}; +use std::{ + io, + os::{ + fd::AsRawFd, + unix::io::{BorrowedFd, RawFd}, + }, +}; -fn yolo_accept(fd: RawFd) -> Option> { +fn yolo_accept(fd: BorrowedFd<'_>) -> Option> { let flags = nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap()).unwrap(); nix::fcntl::fcntl( @@ -16,7 +22,7 @@ fn yolo_accept(fd: RawFd) -> Option> { nix::fcntl::F_SETFL(flags | nix::fcntl::OFlag::O_NONBLOCK), ) .unwrap(); - let r = sys::accept_syscall(fd); + let r = sys::accept_syscall(fd.as_raw_fd()); nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags)).unwrap(); match r { Ok(x) => Some(Ok(x)), diff --git a/glommio/src/net/tcp_socket.rs b/glommio/src/net/tcp_socket.rs index 37cc2b1da..96db5ca1c 100644 --- a/glommio/src/net/tcp_socket.rs +++ b/glommio/src/net/tcp_socket.rs @@ -26,7 +26,10 @@ use std::{ cell::RefCell, io, net::{self, Shutdown, SocketAddr, ToSocketAddrs}, - os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}, + os::{ + fd::AsFd, + unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}, + }, pin::Pin, rc::{Rc, Weak}, task::{Context, Poll}, @@ -120,7 +123,7 @@ impl TcpListener { .to_socket_addrs() .unwrap() .next() - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; + .ok_or_else(|| io::Error::other("empty address"))?; let domain = if addr.is_ipv6() { Domain::IPV6 @@ -189,8 +192,8 @@ impl TcpListener { Some(source) => poll_source(source), None => { let reactor = self.reactor.upgrade().unwrap(); - let raw_fd = self.listener.as_raw_fd(); - match yolo_accept(raw_fd) { + let fd = self.listener.as_fd(); + match yolo_accept(fd) { Some(r) => match r { Ok(fd) => Poll::Ready(Ok(AcceptedTcpStream { fd })), Err(err) => Poll::Ready(Err(GlommioError::IoError(err))), diff --git a/glommio/src/net/udp_socket.rs b/glommio/src/net/udp_socket.rs index 175839f2c..8d78b9360 100644 --- a/glommio/src/net/udp_socket.rs +++ b/glommio/src/net/udp_socket.rs @@ -46,7 +46,7 @@ fn sockaddr_storage_to_std(addr: SockaddrStorage) -> Option { match addr.family() { Some(nix::sys::socket::AddressFamily::Inet) => addr .as_sockaddr_in() - .map(|x| SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(x.ip()), x.port()))), + .map(|x| SocketAddr::V4(SocketAddrV4::new(x.ip(), x.port()))), Some(nix::sys::socket::AddressFamily::Inet6) => addr.as_sockaddr_in6().map(|x| { SocketAddr::V6(SocketAddrV6::new( x.ip(), @@ -84,7 +84,7 @@ impl UdpSocket { .to_socket_addrs() .unwrap() .next() - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; + .ok_or_else(|| io::Error::other("empty address"))?; let domain = if addr.is_ipv6() { Domain::IPV6 @@ -136,7 +136,7 @@ impl UdpSocket { /// [`recv`]: UdpSocket::recv pub async fn connect(&self, addr: A) -> Result<()> { let iter = addr.to_socket_addrs()?; - let mut err = io::Error::new(io::ErrorKind::Other, "No Valid addresses"); + let mut err = io::Error::other("No Valid addresses"); for addr in iter { let reactor = self.socket.reactor.upgrade().unwrap(); let source = reactor.connect(self.socket.as_raw_fd(), SockaddrStorage::from(addr)); @@ -628,7 +628,7 @@ impl UdpSocket { .to_socket_addrs() .unwrap() .next() - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?; + .ok_or_else(|| io::Error::other("empty address"))?; let sockaddr = SockaddrStorage::from(addr); self.socket.send_to(buf, sockaddr).await.map_err(Into::into) diff --git a/glommio/src/net/unix.rs b/glommio/src/net/unix.rs index 28d93a62f..339cd7826 100644 --- a/glommio/src/net/unix.rs +++ b/glommio/src/net/unix.rs @@ -329,8 +329,7 @@ impl UnixStream { let reactor = crate::executor().reactor(); let socket = Socket::new(Domain::UNIX, Type::STREAM, None)?; - let addr = - UnixAddr::new(addr.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let addr = UnixAddr::new(addr.as_ref()).map_err(io::Error::other)?; let source = reactor.connect(socket.as_raw_fd(), addr); source.collect_rw().await?; @@ -536,8 +535,7 @@ impl UnixDatagram { /// [`send`]: UnixDatagram::send /// [`recv`]: UnixDatagram::recv pub async fn connect>(&self, addr: A) -> Result<()> { - let addr = - UnixAddr::new(addr.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let addr = UnixAddr::new(addr.as_ref()).map_err(io::Error::other)?; let reactor = self.socket.reactor.upgrade().unwrap(); let source = reactor.connect(self.socket.as_raw_fd(), addr); @@ -684,8 +682,7 @@ impl UnixDatagram { /// }) /// ``` pub async fn send_to>(&self, buf: &[u8], addr: A) -> Result { - let addr = nix::sys::socket::UnixAddr::new(addr.as_ref()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let addr = nix::sys::socket::UnixAddr::new(addr.as_ref()).map_err(io::Error::other)?; self.socket.send_to(buf, addr).await.map_err(Into::into) } diff --git a/glommio/src/reactor.rs b/glommio/src/reactor.rs index 25ba89ad5..e9484b7d2 100644 --- a/glommio/src/reactor.rs +++ b/glommio/src/reactor.rs @@ -24,7 +24,6 @@ use std::{ }; use ahash::AHashMap; -use log::error; use nix::sys::socket::{MsgFlags, SockaddrLike, SockaddrStorage}; use smallvec::SmallVec; @@ -32,7 +31,7 @@ use crate::{ io::{FileScheduler, IoScheduler, ScheduledSource}, iou::sqe::SockAddrStorage, sys::{ - self, blocking::BlockingThreadPool, common_flags, read_flags, sysfs, DirectIo, DmaBuffer, + self, blocking::BlockingThreadPool, common_flags, read_flags, DirectIo, DmaBuffer, DmaSource, IoBuffer, PollableStatus, SleepNotifier, Source, SourceType, StatsCollection, Statx, }, @@ -236,52 +235,6 @@ impl Reactor { self.io_scheduler.inform_requirements(req); } - pub(crate) async fn probe_iopoll_support( - &self, - raw: RawFd, - alignment: u64, - major: usize, - minor: usize, - path: &Path, - ) -> bool { - match sysfs::BlockDevice::iopoll(major, minor) { - None => { - // This routine exercises the iopoll code path in the kernel and asserts that we - // can successfully read something. If we can't and receive `ENOTSUP` then - // the kernel doesn't support iopoll for this device. - // - // In a perfect world, we would issue a read of size 0 because we are not - // interested in any data, just to check whether we _could_. Unfortunately it - // seems we are not allowed to have nice things, and the kernel can - // short-circuit its internal logic and return an early - // "success" for reads of size 0. We are therefore forced to do - // an actual read. - - let source = - self.new_source(raw, SourceType::Read(PollableStatus::Pollable, None), None); - self.sys.read_dma(&source, 0, alignment as usize); - let iopoll = if let Err(err) = source.collect_rw().await { - if let Some(libc::ENOTSUP) = err.raw_os_error() { - false - } else { - // The IO requests failed, but not because the poll ring doesn't work. - error!( - "got unexpected error when probing iopoll support for file {path:?} \ - (fd: {raw}) hosted on ({major}, {minor}); the poll ring will be \ - disabled for this device: {err}" - ); - false - } - } else { - true - }; - sysfs::BlockDevice::set_iopoll_support(major, minor, iopoll); - iopoll - } - Some(iopoll) => iopoll, - } - } - pub(crate) fn register_shared_channel(&self, test_function: Box) -> u64 where F: Fn() -> usize + 'static, diff --git a/glommio/src/sys/hardware_topology.rs b/glommio/src/sys/hardware_topology.rs index 903fe0bb5..d096092e3 100644 --- a/glommio/src/sys/hardware_topology.rs +++ b/glommio/src/sys/hardware_topology.rs @@ -7,7 +7,7 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, - io::{self, ErrorKind}, + io::{self}, path::Path, }; @@ -39,7 +39,7 @@ fn build_cpu_location( let package_id = ListIterator::from_path(&cpu_path.join("physical_package_id"))? .next() - .ok_or_else(|| io::Error::new(ErrorKind::Other, "failed to parse physical_package_id"))??; + .ok_or_else(|| io::Error::other("failed to parse physical_package_id"))??; Ok(CpuLocation { cpu, @@ -146,7 +146,7 @@ fn get_core_id( let core = ListIterator::from_path(&cpu_path.join("core_id"))? .next() .transpose()? - .ok_or_else(|| io::Error::new(ErrorKind::Other, "failed to parse core_id"))?; + .ok_or_else(|| io::Error::other("failed to parse core_id"))?; for sibling in cpu_siblings { cpu_to_core.insert(sibling?, core); } diff --git a/glommio/src/sys/membarrier.rs b/glommio/src/sys/membarrier.rs index 66566c682..b26af41eb 100644 --- a/glommio/src/sys/membarrier.rs +++ b/glommio/src/sys/membarrier.rs @@ -160,7 +160,7 @@ mod mprotect { 0 as libc::off_t, ); fatal_assert!(page != libc::MAP_FAILED); - fatal_assert!(page as libc::size_t % page_size == 0); + fatal_assert!((page as libc::size_t).is_multiple_of(page_size)); // Locking the page ensures that it stays in memory during the two mprotect // calls in `Barrier::barrier()`. If the page was unmapped between those calls, diff --git a/glommio/src/sys/sysfs.rs b/glommio/src/sys/sysfs.rs index 8cecd3969..512eda061 100644 --- a/glommio/src/sys/sysfs.rs +++ b/glommio/src/sys/sysfs.rs @@ -6,7 +6,7 @@ use ahash::AHashMap; use std::{ cell::RefCell, - fs::{canonicalize, read_dir, read_to_string}, + fs::{read_dir, read_to_string}, io, marker::PhantomData, path::{Path, PathBuf}, @@ -39,6 +39,7 @@ impl FromStr for StorageCache { pub(crate) struct BlockDevice { memory_device: bool, rotational: bool, + io_poll: bool, minimum_io_size: usize, optimal_io_size: usize, logical_block_size: usize, @@ -46,12 +47,11 @@ pub(crate) struct BlockDevice { max_sectors_size: usize, max_segment_size: usize, cache: StorageCache, - iopoll: Option, subcomponents: Vec, } macro_rules! block_property { - ( $map:expr, $property:tt, $major:expr, $minor:expr ) => { + ($property:tt, $major:expr, $minor:expr ) => { DEV_MAP.with(|x| { let key = ($major, $minor); let mut map = x.borrow_mut(); @@ -62,24 +62,35 @@ macro_rules! block_property { }; } -macro_rules! set_block_property { - ( $map:expr, $property:tt, $major:expr, $minor:expr, $value:expr ) => { - DEV_MAP.with(|x| { - let key = ($major, $minor); - let mut map = x.borrow_mut(); - let bdev = map.entry(key).or_insert_with(|| BlockDevice::new(key)); +fn read_int(path: &Path) -> io::Result { + let data = read_to_string(path)?; + let contents = data.trim_matches('\n'); + Ok(contents.parse::().unwrap()) +} - bdev.$property = $value; - }) - }; +fn read_int_opt(path: &Path) -> Option { + match read_int(path) { + Ok(v) => Some(v), + Err(e) if e.kind() == io::ErrorKind::NotFound => None, + Err(_) => None, + } } -fn read_int>(path: P) -> isize { - let path = path.as_ref(); - let data = - read_to_string(path).unwrap_or_else(|err| panic!("reading {} ({})", path.display(), err)); - let contents = data.trim_matches('\n'); - contents.parse::().unwrap() +/// Walk up from `dev_dir` until we find a dir with `queue/rotational`. +/// This handles partitions (e.g. .../block/sdb/sdb1) where the queue is on the parent disk (sdb). +fn find_queue_dir(mut dev_dir: PathBuf) -> Option { + loop { + let q = dev_dir.join("queue"); + if q.join("rotational").exists() { + return Some(q); + } + if q.is_dir() { + return Some(q); + } + if !dev_dir.pop() { + return None; + } + } } #[allow(dead_code)] @@ -88,6 +99,7 @@ impl BlockDevice { BlockDevice { memory_device: true, rotational: false, + io_poll: false, minimum_io_size: 512, optimal_io_size: 128 << 10, logical_block_size: 512, @@ -95,43 +107,75 @@ impl BlockDevice { max_sectors_size: 128 << 10, max_segment_size: (u32::MAX - 1) as usize, cache: StorageCache::WriteBack, - iopoll: Some(false), subcomponents: Vec::new(), } } fn new(dev_id: (usize, usize)) -> BlockDevice { - // /sys/dev/block/major:minor is a symlink to the device in /sys/devices/ - // However if this a partition, we actually want to look at the main device. - // So minor is always zero. - let dir_path = format!("/sys/dev/block/{}:{}", dev_id.0, 0); - let dir = match canonicalize(Path::new(dir_path.as_str())) { - Ok(path) => path, - Err(x) => match x.kind() { - io::ErrorKind::NotFound => return BlockDevice::in_memory(), - _ => panic!("Unexpected error: {:?}", x), - }, + // Prefer /sys/dev/block/major:minor, then resolve it. + let link = PathBuf::from(format!("/sys/dev/block/{}:{}", dev_id.0, dev_id.1)); + + let dir = match link.canonicalize() { + Ok(p) => p, + Err(e) if e.kind() == io::ErrorKind::NotFound => return BlockDevice::in_memory(), + Err(e) => panic!("Unexpected error canonicalizing {}: {e:?}", link.display()), }; - let queue = dir.join("queue"); - - let rotational = read_int(queue.join("rotational")) != 0; - let minimum_io_size = read_int(queue.join("minimum_io_size")) as _; - let optimal_io_size = read_int(queue.join("optimal_io_size")) as _; - let logical_block_size = read_int(queue.join("logical_block_size")) as _; - let physical_block_size = read_int(queue.join("physical_block_size")) as _; - let max_sectors_kb = read_int(queue.join("max_sectors_kb")) as usize; - let max_segment_size = read_int(queue.join("max_segment_size")) as usize; - - let cache_data = read_to_string(queue.join("write_cache")).unwrap(); - let cache = cache_data.parse::().unwrap(); + + // Find the correct queue directory (partition-safe). + let queue = match find_queue_dir(dir.clone()) { + Some(q) => q, + None => { + // If we can’t find queue, treat it as non-pollable and return conservative defaults. + // (But keep it as "not memory".) + return BlockDevice { + memory_device: false, + rotational: true, + io_poll: false, + minimum_io_size: 512, + optimal_io_size: 128 << 10, + logical_block_size: 512, + physical_block_size: 512, + max_sectors_size: 128 << 10, + max_segment_size: (u32::MAX - 1) as usize, + cache: StorageCache::WriteBack, + subcomponents: Vec::new(), + }; + } + }; + + // Rotational should exist for real block queues, but don’t hard-fail. + let rotational = read_int_opt(&queue.join("rotational")).unwrap_or(1) != 0; + + // io_poll may not exist on older kernels/drivers; treat missing as false. + let io_poll = read_int_opt(&queue.join("io_poll")).unwrap_or(0) != 0; + + let minimum_io_size = read_int_opt(&queue.join("minimum_io_size")).unwrap_or(512) as usize; + let optimal_io_size = + read_int_opt(&queue.join("optimal_io_size")).unwrap_or(128 << 10) as usize; + let logical_block_size = + read_int_opt(&queue.join("logical_block_size")).unwrap_or(512) as usize; + let physical_block_size = + read_int_opt(&queue.join("physical_block_size")).unwrap_or(512) as usize; + + let max_sectors_kb = read_int_opt(&queue.join("max_sectors_kb")).unwrap_or(128) as usize; + let max_segment_size = read_int_opt(&queue.join("max_segment_size")) + .unwrap_or((u32::MAX - 1) as isize) as usize; + + // write_cache might be missing on some virtual devices; default to write back. + let cache = read_to_string(queue.join("write_cache")) + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(StorageCache::WriteBack); + let subcomponents = read_dir(dir.join("slaves")) - .unwrap() - .map(|x| x.unwrap().path()) - .collect(); + .ok() + .map(|it| it.filter_map(|x| x.ok().map(|e| e.path())).collect()) + .unwrap_or_default(); BlockDevice { memory_device: false, rotational, + io_poll, minimum_io_size, optimal_io_size, logical_block_size, @@ -139,53 +183,48 @@ impl BlockDevice { max_sectors_size: max_sectors_kb << 10, max_segment_size, cache, - iopoll: None, subcomponents, } } pub(crate) fn memory_device(major: usize, minor: usize) -> bool { - block_property!(DEV_MAP, memory_device, major, minor) + block_property!(memory_device, major, minor) } pub(crate) fn is_rotational(major: usize, minor: usize) -> bool { - block_property!(DEV_MAP, rotational, major, minor) + block_property!(rotational, major, minor) + } + + pub(crate) fn has_io_poll(major: usize, minor: usize) -> bool { + block_property!(io_poll, major, minor) } pub(crate) fn minimum_io_size(major: usize, minor: usize) -> usize { - block_property!(DEV_MAP, minimum_io_size, major, minor) + block_property!(minimum_io_size, major, minor) } pub(crate) fn optimal_io_size(major: usize, minor: usize) -> usize { - block_property!(DEV_MAP, optimal_io_size, major, minor) + block_property!(optimal_io_size, major, minor) } pub(crate) fn logical_block_size(major: usize, minor: usize) -> usize { - block_property!(DEV_MAP, logical_block_size, major, minor) + block_property!(logical_block_size, major, minor) } pub(crate) fn physical_block_size(major: usize, minor: usize) -> usize { - block_property!(DEV_MAP, physical_block_size, major, minor) + block_property!(physical_block_size, major, minor) } pub(crate) fn max_sectors_size(major: usize, minor: usize) -> usize { - block_property!(DEV_MAP, max_sectors_size, major, minor) + block_property!(max_sectors_size, major, minor) } pub(crate) fn max_segment_size(major: usize, minor: usize) -> usize { - block_property!(DEV_MAP, max_segment_size, major, minor) + block_property!(max_segment_size, major, minor) } pub(crate) fn is_md(major: usize, minor: usize) -> bool { - !block_property!(DEV_MAP, subcomponents, major, minor).is_empty() - } - - pub(crate) fn iopoll(major: usize, minor: usize) -> Option { - block_property!(DEV_MAP, iopoll, major, minor) - } - - pub(crate) fn set_iopoll_support(major: usize, minor: usize, supported: bool) { - set_block_property!(DEV_MAP, iopoll, major, minor, Some(supported)) + !block_property!(subcomponents, major, minor).is_empty() } } diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index affdfa9b4..19184fa6b 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -886,9 +886,8 @@ impl UringCommon for PollRing { }, source_map, ) - .map(|x| { + .inspect(|_| { self.in_kernel -= 1; - x }) } @@ -1199,9 +1198,8 @@ impl UringCommon for SleepableRing { }, source_map, ) - .map(|x| { + .inspect(|_| { self.in_kernel -= 1; - x }) } diff --git a/glommio/src/uring_sys/mod.rs b/glommio/src/uring_sys/mod.rs index d7cac10b3..56fb40705 100644 --- a/glommio/src/uring_sys/mod.rs +++ b/glommio/src/uring_sys/mod.rs @@ -350,8 +350,6 @@ extern "C" { pub fn io_uring_submit_and_wait(ring: *mut io_uring, wait_nr: libc::c_uint) -> libc::c_int; - pub fn io_uring_get_sqe(ring: *mut io_uring) -> *mut io_uring_sqe; - pub fn io_uring_register_buffers( ring: *mut io_uring, iovecs: *const libc::iovec, @@ -495,7 +493,7 @@ extern "C" { ); #[link_name = "rust_io_uring_prep_poll_remove"] - pub fn io_uring_prep_poll_remove(sqe: *mut io_uring_sqe, user_data: *mut libc::c_void); + pub fn io_uring_prep_poll_remove(sqe: *mut io_uring_sqe, user_data: libc::__u64); #[link_name = "rust_io_uring_prep_fsync"] pub fn io_uring_prep_fsync(sqe: *mut io_uring_sqe, fd: libc::c_int, fsync_flags: libc::c_uint); @@ -698,4 +696,7 @@ extern "C" { #[link_name = "rust_io_uring_wait_cqe"] pub fn io_uring_wait_cqe(ring: *mut io_uring, cqe_ptr: *mut *mut io_uring_cqe) -> libc::c_int; + + #[link_name = "rust_io_uring_get_sqe"] + pub fn io_uring_get_sqe(ring: *mut io_uring) -> *mut io_uring_sqe; }