diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 24e91ca8a4..bd588ac675 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -49,6 +49,10 @@ pub struct FileIO { builder: FileIOBuilder, inner: Arc, + + /// Cached operator to reuse for all operations. + /// When present, this operator will be used instead of calling create_operator. + cached_operator: Option, } impl FileIO { @@ -83,13 +87,28 @@ impl FileIO { Ok(FileIOBuilder::new(url.scheme())) } + /// Get operator and relative path for a given path. + /// If cached_operator is present, use it; otherwise call create_operator. + fn get_operator<'a>( + &self, + path: &'a impl AsRef, + ) -> Result<(Operator, &'a str)> { + let (op, relative_path) = self.inner.create_operator(path)?; + + if let Some(ref cached_op) = self.cached_operator { + Ok((cached_op.clone(), relative_path)) + } else { + Ok((op, relative_path)) + } + } + /// Deletes file. /// /// # Arguments /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn delete(&self, path: impl AsRef) -> Result<()> { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.get_operator(&path)?; Ok(op.delete(relative_path).await?) } @@ -105,7 +124,7 @@ impl FileIO { /// - If the path is a empty directory, this function will remove the directory itself. /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories. pub async fn remove_dir_all(&self, path: impl AsRef) -> Result<()> { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.get_operator(&path)?; let path = if relative_path.ends_with('/') { relative_path.to_string() } else { @@ -120,7 +139,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn exists(&self, path: impl AsRef) -> Result { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.get_operator(&path)?; Ok(op.exists(relative_path).await?) } @@ -130,7 +149,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub fn new_input(&self, path: impl AsRef) -> Result { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.get_operator(&path)?; let path = path.as_ref().to_string(); let relative_path_pos = path.len() - relative_path.len(); Ok(InputFile { @@ -146,7 +165,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub fn new_output(&self, path: impl AsRef) -> Result { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.get_operator(&path)?; let path = path.as_ref().to_string(); let relative_path_pos = path.len() - relative_path.len(); Ok(OutputFile { @@ -193,6 +212,8 @@ pub struct FileIOBuilder { props: HashMap, /// Optional extensions to configure the underlying FileIO behavior. extensions: Extensions, + /// Optional operator to use for all file operations. + operator: Option, } impl FileIOBuilder { @@ -203,6 +224,7 @@ impl FileIOBuilder { scheme_str: Some(scheme_str.to_string()), props: HashMap::default(), extensions: Extensions::default(), + operator: None, } } @@ -212,6 +234,7 @@ impl FileIOBuilder { scheme_str: None, props: HashMap::default(), extensions: Extensions::default(), + operator: None, } } @@ -260,12 +283,39 @@ impl FileIOBuilder { self.extensions.get::() } + /// Sets a cached operator to reuse for all file operations. + /// This avoids creating a new operator for each operation. + pub fn with_operator(mut self, operator: Operator) -> Self { + self.operator = Some(operator); + self + } + + /// Creates an operator for the given path without building a full FileIO. + /// This is useful for creating a cached operator to pass to `with_operator`. + /// + /// # Arguments + /// + /// * path: A sample path to create the operator for. The operator will be configured + /// based on the scheme and properties of this builder. + /// + /// # Returns + /// + /// Returns a tuple of (Operator, relative_path) where the Operator can be used + /// with `with_operator` and relative_path is the path relative to the operator's root. + pub fn create_operator(&self, path: impl AsRef) -> Result<(Operator, String)> { + let storage = Storage::build(self.clone())?; + let (op, relative_path) = storage.create_operator(&path)?; + Ok((op, relative_path.to_string())) + } + /// Builds [`FileIO`]. pub fn build(self) -> Result { let storage = Storage::build(self.clone())?; + let cached_operator = self.operator.clone(); Ok(FileIO { builder: self, inner: Arc::new(storage), + cached_operator, }) } } @@ -605,4 +655,52 @@ mod tests { io.delete(&path).await.unwrap(); assert!(!io.exists(&path).await.unwrap()); } + + #[tokio::test] + async fn test_file_io_with_cached_operator() { + let tmp_dir = TempDir::new().unwrap(); + let tmp_dir_path = tmp_dir.path().to_str().unwrap(); + let file_name = "cached_test.txt"; + let content = "Testing cached operator"; + let full_path = format!("{}/{}", tmp_dir_path, file_name); + + write_to_file(content, &full_path); + + // Create a local filesystem operator using the public API + let fs_builder = FileIOBuilder::new_fs_io(); + let (operator, _) = fs_builder.create_operator(&full_path).unwrap(); + + // Create an S3 storage builder and FileIO with the cached local operator + // This allows us to pass S3 paths, but use the local filesystem operator + let s3_builder = FileIOBuilder::new("s3"); + + let io = s3_builder.with_operator(operator).build().unwrap(); + + // Use S3 paths, but the cached operator (local filesystem) should be used + // This validates that the operator is reused regardless of the scheme in the path + let s3_input_path = format!("s3://my-bucket{}/{}", tmp_dir_path, file_name); + let s3_output_path = format!("s3://another-bucket{}/{}", tmp_dir_path, file_name); + + // Test read with cached operator - despite S3 path, it should read from local filesystem + let input_file = io.new_input(&s3_input_path).unwrap(); + assert!(input_file.exists().await.unwrap()); + let read_content = input_file.read().await.unwrap(); + assert_eq!(read_content, Bytes::from(content)); + + // Test write with cached operator using different S3 path + let new_content = "Updated content"; + let output_file = io.new_output(&s3_output_path).unwrap(); + output_file.write(new_content.into()).await.unwrap(); + + // Verify the update by reading with the original local filesystem path + // This ensures the file was actually written to the local filesystem + let local_io = FileIOBuilder::new_fs_io().build().unwrap(); + let input_file = local_io.new_input(&full_path).unwrap(); + let updated_content = input_file.read().await.unwrap(); + assert_eq!(updated_content, Bytes::from(new_content)); + + // Test delete with cached operator using S3 path + io.delete(&s3_input_path).await.unwrap(); + assert!(!local_io.exists(&full_path).await.unwrap()); + } }