Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
[workspace]
members = [
"examples",
"glommio",
]
members = ["examples", "glommio"]
resolver = "2"
6 changes: 3 additions & 3 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ edition = "2021"

[dev-dependencies]
byte-unit = "5.1.4"
yansi = "~0.5.1"
clap = "4.5.3"
fastrand = "2"
futures = "~0.3.5"
futures-lite = "2.6.0"
glommio = { path = "../glommio" }
http-body-util = "0.1.0"

# hyper and tokio for the hyper example. We just need the traits from Tokio
hyper = { version = "1.2.0", features = ["full"] }
num_cpus = "1.13.0"
sys-info = "~0.8.0"
http-body-util = "0.1.0"
serde_json = "1.0.114"
sys-info = "~0.8.0"
yansi = "~0.5.1"

[[example]]
name = "echo"
Expand Down
2 changes: 2 additions & 0 deletions examples/hyper_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ mod hyper_compat {
}
}

#[allow(unused)]
struct GlommioSleep(glommio::timer::Timer);

impl Future for GlommioSleep {
Expand All @@ -91,6 +92,7 @@ mod hyper_compat {
unsafe impl Send for GlommioSleep {}
unsafe impl Sync for GlommioSleep {}

#[allow(unused)]
#[derive(Clone, Copy, Debug)]
pub struct GlommioTimer;

Expand Down
24 changes: 12 additions & 12 deletions glommio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.9.0"
authors = [
"Glauber Costa <glommer@gmail.com>",
"Hippolyte Barraud <hippolyte.barraud@datadoghq.com>",
"DataDog <info@datadoghq.com>"
"DataDog <info@datadoghq.com>",
]
edition = "2021"
description = "Glommio is a thread-per-core crate that makes writing highly parallel asynchronous applications in a thread-per-core architecture easier for rustaceans."
Expand All @@ -17,6 +17,14 @@ readme = "../README.md"
# This is also documented in the README.md under "Supported Rust Versions"
rust-version = "1.70"

[features]
bench = []
debugging = []

# Unstable features based on nightly
native-tls = []
nightly = ["native-tls"]

[dependencies]
ahash = "0.7"
backtrace = { version = "0.3" }
Expand Down Expand Up @@ -45,6 +53,9 @@ socket2 = { version = "0.4", features = ["all"] }
tracing = "0.1"
typenum = "1.15"

[build-dependencies]
cc = "1.0"

[dev-dependencies]
fastrand = "2"
futures = "0"
Expand All @@ -54,17 +65,6 @@ rand = "0"
tokio = { version = "1", default-features = false, features = ["rt", "macros", "rt-multi-thread", "net", "io-util", "time", "sync"] }
tracing-subscriber = { version = "0", features = ["env-filter"] }

[build-dependencies]
cc = "1.0"

[features]
bench = []
debugging = []

# Unstable features based on nightly
native-tls = []
nightly = ["native-tls"]

[[bench]]
name = "executor"
harness = false
Expand Down
10 changes: 8 additions & 2 deletions glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl Ord for TaskQueue {

impl PartialOrd for TaskQueue {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(other.vruntime.cmp(&self.vruntime))
Some(self.cmp(other))
}
}

Expand Down Expand Up @@ -3241,7 +3241,13 @@ mod test {
} else {
// 100 ms may have passed without us running for 100ms in case
// there are other threads. Need to be a bit more relaxed
Duration::from_millis(90)
if cfg!(target_env = "musl") {
// For my musl-based compilation, it'll also sometime returns
// lower `getrusage` value
Duration::from_millis(40)
} else {
Duration::from_millis(90)
}
};

let ex_ru_start = getrusage();
Expand Down
1 change: 1 addition & 0 deletions glommio/src/executor/stall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl StallDetector {
unsafe impl Send for SendWrapper {}
let tid = SendWrapper(unsafe { nix::libc::pthread_self() });
std::thread::spawn(enclose::enclose! { (terminated, timer) move || {
let tid = tid;
while timer.wait().is_ok() {
if terminated.load(Ordering::Relaxed) {
return
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/free_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<T> FreeList<T> {
}
pub(crate) fn dealloc(&mut self, idx: Idx<T>) -> T {
let slot = Slot::Free {
next_free: mem::replace(&mut self.first_free, Some(idx)),
next_free: self.first_free.replace(idx),
};
match mem::replace(&mut self.slots[idx.to_raw()], slot) {
Slot::Full { item } => item,
Expand Down
1 change: 0 additions & 1 deletion glommio/src/io/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ impl Directory {
pub fn sync_read_dir(&self) -> Result<std::fs::ReadDir> {
let path = self.file.path_required("read directory")?;
enhanced_try!(std::fs::read_dir(&*path), "Reading a directory", self.file)
.map_err(Into::into)
}

/// Issues fdatasync into the underlying file.
Expand Down
71 changes: 69 additions & 2 deletions glommio/src/io/dma_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl DmaFile {
pos,
self.pollable,
);
enhanced_try!(source.collect_rw().await, "Writing", self.file).map_err(Into::into)
enhanced_try!(source.collect_rw().await, "Writing", self.file)
}

/// Equivalent to [`DmaFile::write_at`] except that the caller retains
Expand Down Expand Up @@ -442,7 +442,7 @@ impl DmaFile {
pos,
self.pollable,
);
enhanced_try!(source.collect_rw().await, "Writing", self.file).map_err(Into::into)
enhanced_try!(source.collect_rw().await, "Writing", self.file)
}

/// Reads from a specific position in the file and returns the buffer.
Expand Down Expand Up @@ -771,6 +771,7 @@ impl DmaFile {
/// NOTE: Clones are allowed to exist on any thread and all share the same underlying
/// fd safely. try_take_last_clone is also safe to invoke from any thread and will
/// behave correctly with respect to clones on other threads.
#[expect(clippy::result_large_err)]
pub fn try_take_last_clone(mut self) -> std::result::Result<Self, Self> {
match self.file.try_take_last_clone() {
Ok(took) => {
Expand Down Expand Up @@ -1546,6 +1547,12 @@ pub(crate) mod test {
}

dma_file_test!(per_queue_stats, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let q1 =
crate::executor().create_task_queue(Shares::default(), Latency::NotImportant, "q1");
let q2 = crate::executor().create_task_queue(
Expand Down Expand Up @@ -1586,6 +1593,12 @@ pub(crate) mod test {
});

dma_file_test!(file_many_reads, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let new_file = Rc::new(write_dma_file(path.join("testfile"), 4096).await);

println!("{new_file:?}");
Expand Down Expand Up @@ -1622,6 +1635,12 @@ pub(crate) mod test {
});

dma_file_test!(file_many_reads_unaligned, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let new_file = Rc::new(write_dma_file(path.join("testfile"), 4096).await);

let total_reads = Rc::new(RefCell::new(0));
Expand Down Expand Up @@ -1655,6 +1674,12 @@ pub(crate) mod test {
});

dma_file_test!(file_many_reads_no_coalescing, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let new_file = Rc::new(write_dma_file(path.join("testfile"), 4096).await);

let total_reads = Rc::new(RefCell::new(0));
Expand Down Expand Up @@ -1823,6 +1848,12 @@ pub(crate) mod test {
});

dma_file_test!(mirror_buffer_to_two_files, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let (file1, file2) = join!(
async {
OpenOptions::new()
Expand Down Expand Up @@ -1898,6 +1929,12 @@ pub(crate) mod test {
});

dma_file_test!(send_file_across_threads, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let file = OpenOptions::new()
.create_new(true)
.read(true)
Expand Down Expand Up @@ -1953,6 +1990,12 @@ pub(crate) mod test {
});

dma_file_test!(dup, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

fn populate(buf: &mut DmaBuffer) {
buf.as_bytes_mut()[0..5].copy_from_slice(b"hello");
buf.as_bytes_mut()[5..].fill(0);
Expand Down Expand Up @@ -2052,6 +2095,12 @@ pub(crate) mod test {
});

dma_file_test!(resize_dma_buf, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let file = OpenOptions::new()
.create_new(true)
.read(true)
Expand All @@ -2077,6 +2126,12 @@ pub(crate) mod test {
});

dma_file_test!(copy_file_range, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let file1 = OpenOptions::new()
.create_new(true)
.read(true)
Expand Down Expand Up @@ -2119,6 +2174,12 @@ pub(crate) mod test {
});

dma_file_test!(zero_copy_between_files, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let file1 = OpenOptions::new()
.create_new(true)
.read(true)
Expand Down Expand Up @@ -2252,6 +2313,12 @@ pub(crate) mod test {
});

dma_file_test!(share_file_between_threads, path, _k, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let file = OpenOptions::new()
.create_new(true)
.read(true)
Expand Down
4 changes: 2 additions & 2 deletions glommio/src/io/glommio_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,10 +750,10 @@ pub(crate) mod test {
files
};

assert!(file_list().iter().any(|x| *x == gf_fd)); // sanity check that file is open
assert!(file_list().contains(&gf_fd)); // sanity check that file is open
let _ = { gf }; // moves scope and drops
sleep(Duration::from_millis(10)).await; // forces the reactor to run, which will drop the file
assert!(!file_list().iter().any(|x| *x == gf_fd)); // file is gone
assert!(!file_list().contains(&gf_fd)); // file is gone
});
}
}
24 changes: 24 additions & 0 deletions glommio/src/io/immutable_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,12 @@ mod test {
});

immutable_file_test!(seal_and_stream, path, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let fname = path.join("testfile");
let mut immutable = ImmutableFileBuilder::new(fname).build_sink().await.unwrap();
let written = immutable.write(&[0, 1, 2, 3, 4, 5]).await.unwrap();
Expand All @@ -506,6 +512,12 @@ mod test {
});

immutable_file_test!(stream_pos, path, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let fname = path.join("testfile");
let mut immutable = ImmutableFileBuilder::new(fname).build_sink().await.unwrap();
assert_eq!(immutable.current_pos(), 0);
Expand All @@ -531,6 +543,12 @@ mod test {
});

immutable_file_test!(seal_and_random, path, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let fname = path.join("testfile");
let mut immutable = ImmutableFileBuilder::new(fname).build_sink().await.unwrap();
let written = immutable.write(&[0, 1, 2, 3, 4, 5]).await.unwrap();
Expand All @@ -556,6 +574,12 @@ mod test {
});

immutable_file_test!(seal_ready_many, path, {
// For CI tests, some operations are unsupported
if std::env::var("CI").is_ok_and(|val| val == "1" || val == "true") {
eprintln!("Operation unsupported on CI");
return;
}

let fname = path.join("testfile");
let mut immutable = ImmutableFileBuilder::new(fname).build_sink().await.unwrap();
let written = immutable.write(&[0, 1, 2, 3, 4, 5]).await.unwrap();
Expand Down
10 changes: 10 additions & 0 deletions glommio/src/iou/registrar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,16 @@ mod tests {

#[test]
#[should_panic(expected = "Device or resource busy")]
#[cfg(not(target_env = "musl"))]
fn double_register() {
let ring = IoUring::new(1).unwrap();
let _ = ring.registrar().register_files(&[1]).unwrap();
let _ = ring.registrar().register_files(&[1]).unwrap();
}

#[test]
#[should_panic(expected = "Resource busy")] // Different panic message for `musl`
#[cfg(target_env = "musl")]
fn double_register() {
let ring = IoUring::new(1).unwrap();
let _ = ring.registrar().register_files(&[1]).unwrap();
Expand Down