Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions io-uring-test/src/tests/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ pub fn test_tcp_buffer_select<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
match bid {
0 => assert_eq!(&buf0[..256], &input[1024..]),
1 => assert_eq!(&buf1[..256], &input[1024..]),
_ => panic!("{}", cqe.flags()),
_ => panic!("{}", cqe.flags().bits()),
}

// remove one remaining buf
Expand Down Expand Up @@ -1496,7 +1496,7 @@ pub fn test_socket<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert!(cqes[0].result() >= 0);
let io_uring_socket = unsafe { Socket::from_raw_fd(cqes[0].result()) };
assert!(io_uring_socket.as_raw_fd() != plain_fd);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());

// Try a setsockopt.
{
Expand Down Expand Up @@ -1534,7 +1534,7 @@ pub fn test_socket<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 1234);
assert_eq!(cqes[0].result(), 0);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());

// Check value actually set.
optval = 0;
Expand Down Expand Up @@ -1582,7 +1582,7 @@ pub fn test_socket<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 55);
assert_eq!(cqes[0].result(), 0);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());

// If the fixed-socket operation worked properly, this must not fail.
ring.submitter().unregister_files().unwrap();
Expand Down Expand Up @@ -1628,7 +1628,7 @@ pub fn test_socket_bind_listen<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert!(cqes[0].result() >= 0);
let io_uring_socket = unsafe { Socket::from_raw_fd(cqes[0].result()) };
assert!(io_uring_socket.as_raw_fd() != plain_fd);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());

// Try to bind.
{
Expand All @@ -1649,7 +1649,7 @@ pub fn test_socket_bind_listen<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 2345);
assert_eq!(cqes[0].result(), 0);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());

assert_eq!(
io_uring_socket
Expand All @@ -1676,7 +1676,7 @@ pub fn test_socket_bind_listen<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 3456);
assert_eq!(cqes[0].result(), 0);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());

// Ensure the socket is actually in the listening state.
_ = TcpStream::connect(
Expand Down Expand Up @@ -1719,7 +1719,7 @@ pub fn test_socket_bind_listen<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 55);
assert_eq!(cqes[0].result(), 0);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());

// If the fixed-socket operation worked properly, this must not fail.
ring.submitter().unregister_files().unwrap();
Expand Down Expand Up @@ -1768,7 +1768,7 @@ pub fn test_udp_recvmsg_multishot<S: squeue::EntryMarker, C: cqueue::EntryMarker
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 11);
assert_eq!(cqes[0].result(), 0);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());
}

// This structure is actually only used for input arguments to the kernel
Expand Down Expand Up @@ -1922,7 +1922,7 @@ pub fn test_udp_recvmsg_multishot_trunc<S: squeue::EntryMarker, C: cqueue::Entry
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 11);
assert_eq!(cqes[0].result(), 0);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());
}

// This structure is actually only used for input arguments to the kernel
Expand Down Expand Up @@ -2133,7 +2133,7 @@ pub fn test_udp_sendzc_with_dest<S: squeue::EntryMarker, C: cqueue::EntryMarker>
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 11);
assert_eq!(cqes[0].result(), 0);
assert_eq!(cqes[0].flags(), 0);
assert_eq!(cqes[0].flags(), cqueue::CompletionFlags::empty());
}

let recvmsg_e = opcode::RecvMulti::new(Fd(server_socket.as_raw_fd()), BUF_GROUP)
Expand Down
12 changes: 6 additions & 6 deletions io-uring-test/src/tests/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,13 @@ pub fn test_msg_ring_data<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert_eq!(source_cqes.len(), 1);
assert_eq!(source_cqes[0].user_data(), 0);
assert_eq!(source_cqes[0].result(), 0);
assert_eq!(source_cqes[0].flags(), 0);
assert_eq!(source_cqes[0].flags(), cqueue::CompletionFlags::empty());

let dest_cqes: Vec<cqueue::Entry> = dest_ring.completion().map(Into::into).collect();
assert_eq!(dest_cqes.len(), 1);
assert_eq!(dest_cqes[0].user_data(), user_data);
assert_eq!(dest_cqes[0].result(), result);
assert_eq!(dest_cqes[0].flags(), 0);
assert_eq!(dest_cqes[0].flags(), cqueue::CompletionFlags::empty());

Ok(())
}
Expand Down Expand Up @@ -254,13 +254,13 @@ pub fn test_msg_ring_send_fd<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert_eq!(source_cqes.len(), 1);
assert_eq!(source_cqes[0].user_data(), 0);
assert_eq!(source_cqes[0].result(), 0);
assert_eq!(source_cqes[0].flags(), 0);
assert_eq!(source_cqes[0].flags(), cqueue::CompletionFlags::empty());

let dest_cqes: Vec<cqueue::Entry> = temp_ring.completion().map(Into::into).collect();
assert_eq!(dest_cqes.len(), 1);
assert_eq!(dest_cqes[0].user_data(), 22);
assert_eq!(dest_cqes[0].result(), 0);
assert_eq!(dest_cqes[0].flags(), 0);
assert_eq!(dest_cqes[0].flags(), cqueue::CompletionFlags::empty());
}

// Unregister the fixed files from the source ring, then reserve some empty slots
Expand All @@ -285,13 +285,13 @@ pub fn test_msg_ring_send_fd<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
assert_eq!(source_cqes.len(), 1);
assert_eq!(source_cqes[0].user_data(), 0);
assert_eq!(source_cqes[0].result(), 0);
assert_eq!(source_cqes[0].flags(), 0);
assert_eq!(source_cqes[0].flags(), cqueue::CompletionFlags::empty());

let dest_cqes: Vec<cqueue::Entry> = ring.completion().map(Into::into).collect();
assert_eq!(dest_cqes.len(), 1);
assert_eq!(dest_cqes[0].user_data(), 44);
assert_eq!(dest_cqes[0].result(), 0);
assert_eq!(dest_cqes[0].flags(), 0);
assert_eq!(dest_cqes[0].flags(), cqueue::CompletionFlags::empty());
}

// Unregister the fixed files from both rings, then repeat again to
Expand Down
14 changes: 12 additions & 2 deletions io-uring-test/src/tests/register_buf_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,12 @@ impl InnerBufRing {

// Returns the buffer the uring interface picked from the buf_ring for the completion result
// represented by the res and flags.
fn get_buf(&self, buf_ring: FixedSizeBufRing, res: u32, flags: u32) -> io::Result<GBuf> {
fn get_buf(
&self,
buf_ring: FixedSizeBufRing,
res: u32,
flags: cqueue::CompletionFlags,
) -> io::Result<GBuf> {
// This fn does the odd thing of having self as the BufRing and taking an argument that is
// the same BufRing but wrapped in Rc<_> so the wrapped buf_ring can be passed to the
// outgoing GBuf.
Expand All @@ -279,7 +284,12 @@ impl InnerBufRing {
}

// Returns vector of buffers for completion results that can return a bundle
pub(crate) fn get_bufs(&self, buf_ring: &FixedSizeBufRing, res: u32, flags: u32) -> Vec<GBuf> {
pub(crate) fn get_bufs(
&self,
buf_ring: &FixedSizeBufRing,
res: u32,
flags: cqueue::CompletionFlags,
) -> Vec<GBuf> {
let mut bid = io_uring::cqueue::buffer_select(flags).unwrap();
let mut len = res as usize;
let mut output = Vec::with_capacity(len / self.buf_len);
Expand Down
77 changes: 62 additions & 15 deletions src/cqueue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Completion Queue

use bitflags::bitflags;
use std::fmt::{self, Debug};
use std::mem;
use std::mem::MaybeUninit;
Expand Down Expand Up @@ -212,8 +213,10 @@ impl Entry {
/// - Storing the selected buffer ID, if one was selected. See
/// [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
#[inline]
pub fn flags(&self) -> u32 {
self.0.flags
pub fn flags(&self) -> CompletionFlags {
// Use retain as the upper 16 bits store buffer ID if BUFFER_SELECT opt
// is enabled.
CompletionFlags::from_bits_retain(self.0.flags)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially missed the fact that upper 16 bits have the buffer ID.

We use from_bits_retain to ensure we dont lose that information.

}
}

Expand Down Expand Up @@ -261,8 +264,10 @@ impl Entry32 {
/// - Storing the selected buffer ID, if one was selected. See
/// [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
#[inline]
pub fn flags(&self) -> u32 {
self.0 .0.flags
pub fn flags(&self) -> CompletionFlags {
// Use retain as the upper 16 bits store buffer ID if BUFFER_SELECT opt
// is enabled.
CompletionFlags::from_bits_retain(self.0 .0.flags)
}

/// Additional data available in 32-byte completion queue entries (CQEs).
Expand Down Expand Up @@ -295,24 +300,66 @@ impl Debug for Entry32 {
}
}

bitflags!(
/// Request specific information carried in CQE flags field.
/// See man page for complete description:
/// https://man7.org/linux/man-pages/man7/io_uring.7.html
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct CompletionFlags: u32 {
/// If set, the upper 16 bits of the flags field carries the
/// buffer ID that was chosen for this request. The request
/// must have been issued with IOSQE_BUFFER_SELECT set, and
/// used with a request type that supports buffer selection.
/// Additionally, buffers must have been provided upfront
/// either via the IORING_OP_PROVIDE_BUFFERS or the
/// IORING_REGISTER_PBUF_RING methods.
const BUFFER = sys::IORING_CQE_F_BUFFER;

/// If set, the application should expect more completions from
/// the request. This is used for requests that can generate
/// multiple completions, such as multi-shot requests, receive,
/// or accept.
const MORE = sys::IORING_CQE_F_MORE;

/// If set, upon receiving the data from the socket in the
/// current request, the socket still had data left on
/// completion of this request.
const SOCK_NONEMPTY = sys::IORING_CQE_F_SOCK_NONEMPTY;

/// Set for notification CQEs, as seen with the zero-copy
/// networking send and receive support.
const NOTIF = sys::IORING_CQE_F_NOTIF;

/// If set, the buffer ID set in the completion will get more
/// completions. This means that the provided buffer has been
/// partially consumed and there's more buffer space left, and
/// hence the application should expect more completions with
/// this buffer ID. Each completion will continue where the
/// previous one left off. This can only happen if the provided
/// buffer ring has been setup with IOU_PBUF_RING_INC to allow
/// for incremental / partial consumption of buffers.
const BUF_MORE = sys::IORING_CQE_F_BUF_MORE;
}
);

/// Return whether the buffer will be reused by future CQE completions
///
/// This corresponds to the `IORING_CQE_BUF_MORE` flag, and it signals to
/// the consumer that it should expect further completions involging the
/// related buffer ID when the registered buffer ring was setup with
/// the `IOU_PBUF_RING_INC` flag.
pub fn buffer_more(flags: u32) -> bool {
flags & sys::IORING_CQE_F_BUF_MORE != 0
pub fn buffer_more(flags: CompletionFlags) -> bool {
flags.contains(CompletionFlags::BUF_MORE)
}

/// Return which dynamic buffer was used by this operation.
///
/// This corresponds to the `IORING_CQE_F_BUFFER` flag (and related bit-shifting),
/// and it signals to the consumer which provided contains the result of this
/// operation.
pub fn buffer_select(flags: u32) -> Option<u16> {
if flags & sys::IORING_CQE_F_BUFFER != 0 {
let id = flags >> sys::IORING_CQE_BUFFER_SHIFT;
pub fn buffer_select(flags: CompletionFlags) -> Option<u16> {
if flags.contains(CompletionFlags::BUFFER) {
let id = flags.bits() >> sys::IORING_CQE_BUFFER_SHIFT;

// FIXME
//
Expand All @@ -329,8 +376,8 @@ pub fn buffer_select(flags: u32) -> Option<u16> {
/// This corresponds to the `IORING_CQE_F_MORE` flag, and it signals to
/// the consumer that it should expect further CQE entries after this one,
/// still from the same original SQE request (e.g. for multishot operations).
pub fn more(flags: u32) -> bool {
flags & sys::IORING_CQE_F_MORE != 0
pub fn more(flags: CompletionFlags) -> bool {
flags.contains(CompletionFlags::MORE)
}

/// Return whether socket has more data ready to read.
Expand All @@ -340,14 +387,14 @@ pub fn more(flags: u32) -> bool {
///
/// The io_uring documentation says recv, recv-multishot, recvmsg, and recvmsg-multishot
/// can provide this bit in their respective CQE.
pub fn sock_nonempty(flags: u32) -> bool {
flags & sys::IORING_CQE_F_SOCK_NONEMPTY != 0
pub fn sock_nonempty(flags: CompletionFlags) -> bool {
flags.contains(CompletionFlags::SOCK_NONEMPTY)
}

/// Returns whether this completion event is a notification.
///
/// This corresponds to the `IORING_CQE_F_NOTIF` flag,
/// currently used by the [SendZc](crate::opcode::SendZc) operation.
pub fn notif(flags: u32) -> bool {
flags & sys::IORING_CQE_F_NOTIF != 0
pub fn notif(flags: CompletionFlags) -> bool {
flags.contains(CompletionFlags::NOTIF)
}
Loading