From 87ce95cad48e46bb0c88ec064f521542ee332304 Mon Sep 17 00:00:00 2001 From: "hippolyte.barraud" Date: Fri, 22 Oct 2021 02:03:50 +0200 Subject: [PATCH 1/2] add a poll variant to `DmaStreamReader::get_buffer_aligned` Since `DmaStreamReader` is a stream, we have to play well with other streams and make our functions easy to use from a polling context. `get_buffer_aligned` is an async function and, as such, is hard to use from there (not impossible, just hard). If we look at the `AsyncRead` trait, one may see that it defines only poll functions. Async functions are then built on top in the `AsyncReadExt.` This is done because creating a future from a poll function is trivial, while the other way around is hard. Therefore, we create a new function `poll_get_buffer_aligned` and we reimplement `get_buffer_aligned` as a wrapper around it: ```rust pub async fn get_buffer_aligned(&mut self, len: u64) -> Result { poll_fn(|cx| self.poll_get_buffer_aligned(cx, len)).await } ``` --- glommio/src/io/dma_file_stream.rs | 55 +++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 7 deletions(-) diff --git a/glommio/src/io/dma_file_stream.rs b/glommio/src/io/dma_file_stream.rs index e274c41ff..2ab20f092 100644 --- a/glommio/src/io/dma_file_stream.rs +++ b/glommio/src/io/dma_file_stream.rs @@ -17,6 +17,7 @@ use core::task::Waker; use futures_lite::{ future::poll_fn, io::{AsyncRead, AsyncWrite}, + ready, stream::{self, StreamExt}, }; use std::{ @@ -519,7 +520,7 @@ impl DmaStreamReader { /// Allows access to the buffer that holds the current position with no /// extra copy - //_ In order to use this API, one must guarantee that reading the specified + /// In order to use this API, one must guarantee that reading the specified /// length may not cross into a different buffer. Users of this API are /// expected to be aware of their buffer size (selectable in the /// [`DmaStreamReaderBuilder`]) and act accordingly. @@ -562,8 +563,45 @@ impl DmaStreamReader { /// [`AsyncReadExt`]: https://docs.rs/futures-lite/1.11.2/futures_lite/io/trait.AsyncReadExt.html /// [`ReadResult`]: struct.ReadResult.html pub async fn get_buffer_aligned(&mut self, len: u64) -> Result { + poll_fn(|cx| self.poll_get_buffer_aligned(cx, len)).await + } + + /// A variant of [`get_buffer_aligned`] that can be called from a poll + /// function context. + /// + /// Allows access to the buffer that holds the current position with no + /// extra copy + /// In order to use this API, one must guarantee that reading the specified + /// length may not cross into a different buffer. Users of this API are + /// expected to be aware of their buffer size (selectable in the + /// [`DmaStreamReaderBuilder`]) and act accordingly. + /// + /// The buffer is also not released until the returned [`ReadResult`] goes + /// out of scope. So if you plan to keep this alive for a long time this + /// is probably the wrong API. + /// + /// If EOF is hit while reading with this method, the number of bytes in the + /// returned buffer will be less than number requested. + /// + /// Let's say you want to open a file and check if its header is sane: this + /// is a good API for that. + /// + /// But if after such header there is an index that you want to keep in + /// memory, then you are probably better off with one of the methods + /// from [`AsyncReadExt`]. + /// + /// [`get_buffer_aligned`]: Self::get_buffer_aligned + /// [`DmaStreamReader`]: struct.DmaStreamReader.html + /// [`DmaStreamReaderBuilder`]: struct.DmaStreamReaderBuilder.html + /// [`AsyncReadExt`]: https://docs.rs/futures-lite/1.11.2/futures_lite/io/trait.AsyncReadExt.html + /// [`ReadResult`]: struct.ReadResult.html + pub fn poll_get_buffer_aligned( + &mut self, + cx: &mut Context<'_>, + len: u64, + ) -> Poll> { if len == 0 { - return Ok(ReadResult::empty_buffer()); + return Poll::Ready(Ok(ReadResult::empty_buffer())); } let (start_id, end_id, buffer_size) = { @@ -574,15 +612,18 @@ impl DmaStreamReader { }; if start_id != end_id { - return Err(GlommioError::<()>::WouldBlock(ResourceType::File(format!( - "Reading {} bytes from position {} would cross a buffer boundary (Buffer size {})", - len, self.current_pos, buffer_size + return Poll::Ready(Err(GlommioError::<()>::WouldBlock(ResourceType::File( + format!( + "Reading {} bytes from position {} would cross a buffer boundary (Buffer size \ + {})", + len, self.current_pos, buffer_size + ), )))); } - let x = poll_fn(|cx| self.get_buffer(cx, len, start_id)).await?; + let x = ready!(self.poll_get_buffer(cx, len, start_id))?; self.skip(len); - Ok(x) + Poll::Ready(Ok(x)) } fn get_buffer( From 58506581c4cd1972d06e9eff0c99588cffd7f160 Mon Sep 17 00:00:00 2001 From: "hippolyte.barraud" Date: Fri, 22 Oct 2021 02:06:49 +0200 Subject: [PATCH 2/2] rename `get_buffer` to `poll_get_buffer` for consistency Function returning `std::task::Poll` enums are usually called `poll_*` to tell them apart from their async variants. --- glommio/src/io/dma_file_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glommio/src/io/dma_file_stream.rs b/glommio/src/io/dma_file_stream.rs index 2ab20f092..e363e2d28 100644 --- a/glommio/src/io/dma_file_stream.rs +++ b/glommio/src/io/dma_file_stream.rs @@ -626,7 +626,7 @@ impl DmaStreamReader { Poll::Ready(Ok(x)) } - fn get_buffer( + fn poll_get_buffer( &mut self, cx: &mut Context<'_>, len: u64,