Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 103 additions & 5 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ pub struct FileIO {
builder: FileIOBuilder,

inner: Arc<Storage>,

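/// Optional operator, supplied through the builder, that is reused for all operations.
/// When present, `get_operator` returns a clone of it in place of the operator produced by
/// `create_operator`; `create_operator` is still called to resolve the relative path.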
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should document this better

cached_operator: Option<Operator>,
}

impl FileIO {
Expand Down Expand Up @@ -83,13 +87,28 @@ impl FileIO {
Ok(FileIOBuilder::new(url.scheme()))
}

/// Resolves the operator and relative path to use for `path`.
///
/// `create_operator` on the inner storage is always called, because its result supplies the
/// relative path; any error it returns is propagated. When `cached_operator` is present, a
/// clone of it is returned in place of the freshly created operator; otherwise the freshly
/// created operator is returned.
fn get_operator<'a>(
&self,
path: &'a impl AsRef<str>,
) -> Result<(Operator, &'a str)> {
// Always resolve through the storage first: this call is the only source of `relative_path`.
let (op, relative_path) = self.inner.create_operator(path)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we create the operator even when there's a cached one? Shouldn't this go into the else?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that it can resolve relative path. It's hacky, I'll pause on this for now. I don't like that we have to specify what we're translating from, seems to me that the code should be detecting that...


// Prefer the cached operator when one was supplied; the operator created above is then unused.
if let Some(ref cached_op) = self.cached_operator {
Ok((cached_op.clone(), relative_path))
} else {
Ok((op, relative_path))
}
}

/// Deletes file.
///
/// # Arguments
///
/// * path: It should be an *absolute* path starting with the scheme string used to construct [`FileIO`].
///
/// # Errors
///
/// Returns an error if the operator for `path` cannot be resolved or if the delete call fails.
pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
    // Resolve once: `get_operator` already performs the storage-level path resolution, so a
    // separate `create_operator` call beforehand would only be shadowed and thrown away.
    let (op, relative_path) = self.get_operator(&path)?;
    Ok(op.delete(relative_path).await?)
}

Expand All @@ -105,7 +124,7 @@ impl FileIO {
/// - If the path is an empty directory, this function will remove the directory itself.
/// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let (op, relative_path) = self.get_operator(&path)?;
let path = if relative_path.ends_with('/') {
relative_path.to_string()
} else {
Expand All @@ -120,7 +139,7 @@ impl FileIO {
///
/// * path: It should be an *absolute* path starting with the scheme string used to construct [`FileIO`].
///
/// # Errors
///
/// Returns an error if the operator for `path` cannot be resolved or if the existence check fails.
pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
    // Resolve once: `get_operator` already performs the storage-level path resolution, so a
    // separate `create_operator` call beforehand would only be shadowed and thrown away.
    let (op, relative_path) = self.get_operator(&path)?;
    Ok(op.exists(relative_path).await?)
}

Expand All @@ -130,7 +149,7 @@ impl FileIO {
///
/// * path: It should be an *absolute* path starting with the scheme string used to construct [`FileIO`].
pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let (op, relative_path) = self.get_operator(&path)?;
let path = path.as_ref().to_string();
let relative_path_pos = path.len() - relative_path.len();
Ok(InputFile {
Expand All @@ -146,7 +165,7 @@ impl FileIO {
///
/// * path: It should be an *absolute* path starting with the scheme string used to construct [`FileIO`].
pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let (op, relative_path) = self.get_operator(&path)?;
let path = path.as_ref().to_string();
let relative_path_pos = path.len() - relative_path.len();
Ok(OutputFile {
Expand Down Expand Up @@ -193,6 +212,8 @@ pub struct FileIOBuilder {
props: HashMap<String, String>,
/// Optional extensions to configure the underlying FileIO behavior.
extensions: Extensions,
/// Optional operator to use for all file operations.
operator: Option<Operator>,
}

impl FileIOBuilder {
Expand All @@ -203,6 +224,7 @@ impl FileIOBuilder {
scheme_str: Some(scheme_str.to_string()),
props: HashMap::default(),
extensions: Extensions::default(),
operator: None,
}
}

Expand All @@ -212,6 +234,7 @@ impl FileIOBuilder {
scheme_str: None,
props: HashMap::default(),
extensions: Extensions::default(),
operator: None,
}
}

Expand Down Expand Up @@ -260,12 +283,39 @@ impl FileIOBuilder {
self.extensions.get::<T>()
}

/// Sets a cached operator to reuse for all file operations.
/// This avoids creating a new operator for each operation.
pub fn with_operator(mut self, operator: Operator) -> Self {
self.operator = Some(operator);
self
}

/// Builds a storage from a clone of this builder and asks it for the operator serving `path`,
/// without constructing a full [`FileIO`].
///
/// The returned operator is suitable for passing to `with_operator`.
///
/// # Arguments
///
/// * path: A sample path; the storage built from this builder resolves it into an operator
///   and a relative path.
///
/// # Returns
///
/// A tuple of `(Operator, relative_path)`, where `relative_path` is an owned copy of the
/// relative path reported by the storage.
///
/// # Errors
///
/// Propagates any error from building the storage or from creating the operator.
pub fn create_operator(&self, path: impl AsRef<str>) -> Result<(Operator, String)> {
    let storage = Storage::build(self.clone())?;
    storage
        .create_operator(&path)
        .map(|(operator, relative)| (operator, relative.to_string()))
}

/// Builds [`FileIO`].
///
/// A storage is built from a clone of this builder; the builder itself is moved into the
/// returned [`FileIO`], together with a clone of the operator set on it (if any) as the
/// cached operator.
///
/// # Errors
///
/// Propagates any error from building the storage.
pub fn build(self) -> Result<FileIO> {
    let inner = Arc::new(Storage::build(self.clone())?);
    Ok(FileIO {
        // Clone the operator before `self` is moved into `builder` below.
        cached_operator: self.operator.clone(),
        inner,
        builder: self,
    })
}
}
Expand Down Expand Up @@ -605,4 +655,52 @@ mod tests {
io.delete(&path).await.unwrap();
assert!(!io.exists(&path).await.unwrap());
}

#[tokio::test]
async fn test_file_io_with_cached_operator() {
    // Seed a real file on the local filesystem.
    let scratch = TempDir::new().unwrap();
    let scratch_root = scratch.path().to_str().unwrap();
    let file_name = "cached_test.txt";
    let original_text = "Testing cached operator";
    let local_path = format!("{}/{}", scratch_root, file_name);
    write_to_file(original_text, &local_path);

    // Obtain a local-filesystem operator through the builder's public API.
    let local_builder = FileIOBuilder::new_fs_io();
    let (fs_operator, _) = local_builder.create_operator(&local_path).unwrap();

    // An "s3" FileIO carrying the local operator: paths use the S3 scheme, while the
    // operations are expected to run on the local filesystem operator.
    let cached_io = FileIOBuilder::new("s3")
        .with_operator(fs_operator)
        .build()
        .unwrap();

    // Two different buckets, same trailing path: both should land on the same local file.
    let s3_read_path = format!("s3://my-bucket{}/{}", scratch_root, file_name);
    let s3_write_path = format!("s3://another-bucket{}/{}", scratch_root, file_name);

    // Read: the S3-style path should yield the bytes seeded on the local filesystem.
    let cached_input = cached_io.new_input(&s3_read_path).unwrap();
    assert!(cached_input.exists().await.unwrap());
    let seen = cached_input.read().await.unwrap();
    assert_eq!(seen, Bytes::from(original_text));

    // Write: use the other S3-style path.
    let replacement_text = "Updated content";
    let cached_output = cached_io.new_output(&s3_write_path).unwrap();
    cached_output.write(replacement_text.into()).await.unwrap();

    // Verify via a plain local FileIO and the original local path that the write hit disk.
    let plain_io = FileIOBuilder::new_fs_io().build().unwrap();
    let reread = plain_io.new_input(&local_path).unwrap().read().await.unwrap();
    assert_eq!(reread, Bytes::from(replacement_text));

    // Delete: through the S3-style path, then confirm locally that the file is gone.
    cached_io.delete(&s3_read_path).await.unwrap();
    assert!(!plain_io.exists(&local_path).await.unwrap());
}
}
Loading