From b61a1cacdb51d3b1e616c3c04b5c7dce78dc52c2 Mon Sep 17 00:00:00 2001 From: Fahmi Akbar Wildana Date: Tue, 9 Jan 2024 15:45:58 +0700 Subject: [PATCH 1/4] Unwrap redundant Option<_> --- src/lib.rs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7c978c1..578cc53 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -178,7 +178,7 @@ use default::{get_length_and_block_size, set_sparse, trim}; pub struct RandomAccessDisk { #[allow(dead_code)] filename: path::PathBuf, - file: Option, + file: fs::File, length: u64, block_size: u64, auto_sync: bool, @@ -206,7 +206,7 @@ impl RandomAccess for RandomAccessDisk { offset: u64, data: &[u8], ) -> Result<(), RandomAccessError> { - let file = self.file.as_mut().expect("self.file was None."); + let ref mut file = self.file; file.seek(SeekFrom::Start(offset)).await?; file.write_all(data).await?; if self.auto_sync { @@ -243,7 +243,7 @@ impl RandomAccess for RandomAccessDisk { }); } - let file = self.file.as_mut().expect("self.file was None."); + let ref mut file = self.file; let mut buffer = vec![0; length as usize]; file.seek(SeekFrom::Start(offset)).await?; let _bytes_read = file.read(&mut buffer[..]).await?; @@ -273,7 +273,7 @@ impl RandomAccess for RandomAccessDisk { return self.truncate(offset).await; } - let file = self.file.as_mut().expect("self.file was None."); + let ref mut file = self.file; trim(file, offset, length, self.block_size).await?; if self.auto_sync { file.sync_all().await?; @@ -282,7 +282,7 @@ impl RandomAccess for RandomAccessDisk { } async fn truncate(&mut self, length: u64) -> Result<(), RandomAccessError> { - let file = self.file.as_ref().expect("self.file was None."); + let ref file = self.file; self.length = length; file.set_len(self.length).await?; if self.auto_sync { @@ -301,7 +301,7 @@ impl RandomAccess for RandomAccessDisk { async fn sync_all(&mut self) -> Result<(), RandomAccessError> { if !self.auto_sync { - let file = self.file.as_ref().expect("self.file was None."); + let ref file = self.file; file.sync_all().await?; } Ok(()) @@ -310,15 +310,14 @@ impl RandomAccess for RandomAccessDisk { impl Drop for RandomAccessDisk { fn drop(&mut self) { + let ref file = self.file; // We need to flush the file on drop. Unfortunately, that is not possible to do in a // non-blocking fashion, but our only other option here is losing data remaining in the // write cache. Good task schedulers should be resilient to occasional blocking hiccups in // file destructors so we don't expect this to be a common problem in practice. // (from async_std::fs::File::drop) #[cfg(feature = "async-std")] - if let Some(file) = &self.file { - let _ = async_std::task::block_on(file.sync_all()); - } + let _ = async_std::task::block_on(file.sync_all()); // For tokio, the below errors with: // // "Cannot start a runtime from within a runtime. This happens because a function (like @@ -329,11 +328,9 @@ impl Drop for RandomAccessDisk { // in a sync drop(), so for tokio, we'll need to wait for a real AsyncDrop to arrive. // // #[cfg(feature = "tokio")] - // if let Some(file) = &self.file { - // tokio::runtime::Handle::current() - // .block_on(file.sync_all()) - // .expect("Could not sync file changes on drop."); - // } + // tokio::runtime::Handle::current() + // .block_on(file.sync_all()) + // .expect("Could not sync file changes on drop."); } } @@ -379,7 +376,7 @@ impl Builder { let (length, block_size) = get_length_and_block_size(&file).await?; Ok(RandomAccessDisk { filename: self.filename, - file: Some(file), + file, length, auto_sync: self.auto_sync, block_size, From 6d02ac8e2514ae5301b3a9ec6b0e2a735cf56394 Mon Sep 17 00:00:00 2001 From: Fahmi Akbar Wildana Date: Wed, 10 Jan 2024 09:35:16 +0700 Subject: [PATCH 2/4] Add method to acquire BufReader --- src/lib.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 578cc53..fea56f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -109,7 +109,7 @@ compile_error!("features `random-access-disk/async-std` and `random-access-disk/ use async_std::{ fs::{self, OpenOptions}, io::prelude::{SeekExt, WriteExt}, - io::{ReadExt, SeekFrom}, + io::{BufReader, ReadExt, SeekFrom}, }; use random_access_storage::{RandomAccess, RandomAccessError}; use std::ops::Drop; @@ -120,7 +120,7 @@ use std::io::SeekFrom; #[cfg(feature = "tokio")] use tokio::{ fs::{self, OpenOptions}, - io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader}, }; #[cfg(all( @@ -197,6 +197,11 @@ impl RandomAccessDisk { pub fn builder(filename: impl AsRef) -> Builder { Builder::new(filename) } + + /// Acquire buffered reader. + pub fn reader(&self) -> BufReader<&fs::File> { + BufReader::new(&self.file) + } } #[async_trait::async_trait] From 319b6c374eb27d7f9cd9b3dc7233fec4b5c9feb1 Mon Sep 17 00:00:00 2001 From: Fahmi Akbar Wildana Date: Wed, 10 Jan 2024 09:38:09 +0700 Subject: [PATCH 3/4] Add rustdoc example for reader() --- src/lib.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index fea56f9..6d75e52 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -199,6 +199,29 @@ impl RandomAccessDisk { } /// Acquire buffered reader. + /// # Examples + /// Read each lines to delete on specific line/column + /// + /// ```no_run + /// # use random_access_disk::RandomAccessDisk; + /// # use random_access_storage::RandomAccess; + /// # use async_std::prelude::*; + /// # #[async_std::main] + /// # async fn main() { + /// let line = 10; + /// let column = 10; + /// let length = 5; + /// + /// let mut file = RandomAccessDisk::open("text.db").await.unwrap(); + /// + /// let offset = column + file.reader() + /// .lines().take(line - 1) + /// .fold(column, |acc, line| acc + line.unwrap().len() as u64) + /// .await; + /// + /// let _ = file.del(offset, length).await; + /// # } + /// ``` pub fn reader(&self) -> BufReader<&fs::File> { BufReader::new(&self.file) } From 0f4ab1cad8479e1e2c29ea411be72ab86f71537c Mon Sep 17 00:00:00 2001 From: Fahmi Akbar Wildana Date: Wed, 10 Jan 2024 13:30:16 +0700 Subject: [PATCH 4/4] Make reader() works on tokio --- Cargo.toml | 1 + src/lib.rs | 27 +++++++++++++++++++++------ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9d8e1bf..e660b9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ async-std = { version = "1.12.0", features = ["attributes"] } tokio = { version = "1.27.0", features = ["macros", "rt", "rt-multi-thread"] } criterion = { version = "0.4", features = ["async_std", "async_tokio"] } tokio-test = "0.4" +tokio-stream = { version="0.1.14", features = ["io-util"] } [features] default = ["sparse", "async-std"] diff --git a/src/lib.rs b/src/lib.rs index 6d75e52..ae25ef0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -200,12 +200,20 @@ impl RandomAccessDisk { /// Acquire buffered reader. /// # Examples - /// Read each lines to delete on specific line/column + /// Read each lines to delete at specific line/column /// /// ```no_run /// # use random_access_disk::RandomAccessDisk; /// # use random_access_storage::RandomAccess; - /// # use async_std::prelude::*; + /// # + /// # #[cfg(feature = "async-std")] + /// # use async_std::{io::prelude::BufReadExt, stream::StreamExt}; + /// # + /// # #[cfg(feature = "tokio")] + /// # use tokio::io::{AsyncBufReadExt}; + /// # #[cfg(feature = "tokio")] + /// # use tokio_stream::{StreamExt, wrappers::LinesStream}; + /// # /// # #[async_std::main] /// # async fn main() { /// let line = 10; @@ -214,16 +222,22 @@ impl RandomAccessDisk { /// /// let mut file = RandomAccessDisk::open("text.db").await.unwrap(); /// - /// let offset = column + file.reader() - /// .lines().take(line - 1) + /// #[cfg(feature = "tokio")] + /// let lines = LinesStream::new(file.reader().lines()); + /// + /// #[cfg(not(feature = "tokio"))] + /// let lines = file.reader().lines(); + /// + /// let offset = column + lines + /// .take(line - 1) /// .fold(column, |acc, line| acc + line.unwrap().len() as u64) /// .await; /// /// let _ = file.del(offset, length).await; /// # } /// ``` - pub fn reader(&self) -> BufReader<&fs::File> { - BufReader::new(&self.file) + pub fn reader(&mut self) -> BufReader<&mut fs::File> { + BufReader::new(&mut self.file) // currently BufReader<&fs::File> didn't work on tokio } } @@ -338,6 +352,7 @@ impl RandomAccess for RandomAccessDisk { impl Drop for RandomAccessDisk { fn drop(&mut self) { + #[cfg(feature = "async-std")] let ref file = self.file; // We need to flush the file on drop. Unfortunately, that is not possible to do in a // non-blocking fashion, but our only other option here is losing data remaining in the