diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index aceeae49f7..a6a2ec5f52 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -30,6 +30,7 @@ use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate::AlwaysTrue;
 use crate::expr::{Predicate, Reference};
 use crate::io::FileIO;
+use crate::puffin::{DELETION_VECTOR_V1, PuffinReader};
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
 use crate::spec::{
     DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor,
@@ -46,7 +47,13 @@ pub(crate) struct CachingDeleteFileLoader {
 
 // Intermediate context during processing of a delete file task.
 enum DeleteFileContext {
-    // TODO: Delete Vector loader from Puffin files
+    /// Deletion vector loaded from a Puffin file
+    DeletionVector {
+        /// The data file path this deletion vector applies to
+        referenced_data_file: String,
+        /// The deserialized deletion vector
+        delete_vector: DeleteVector,
+    },
     ExistingEqDel,
     PosDels(ArrowRecordBatchStream),
     FreshEqDel {
@@ -209,6 +216,11 @@ impl CachingDeleteFileLoader {
         del_filter: DeleteFilter,
         schema: SchemaRef,
     ) -> Result<DeleteFileContext> {
+        // Check if this is a deletion vector (stored in Puffin file)
+        if task.is_deletion_vector() {
+            return Self::load_deletion_vector(task, &basic_delete_file_loader).await;
+        }
+
         match task.file_type {
             DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
                 basic_delete_file_loader
@@ -250,10 +262,113 @@ impl CachingDeleteFileLoader {
         }
     }
 
+    /// Load a deletion vector from a Puffin file.
+    ///
+    /// Per the Iceberg spec, deletion vectors are stored as serialized Roaring Bitmaps
+    /// in Puffin files with blob type "deletion-vector-v1".
+    ///
+    /// The blob is located through the metadata entry whose offset and length equal the task's
+    /// `content_offset` and `content_size_in_bytes`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::DataInvalid` when the task lacks `referenced_data_file`,
+    /// `content_offset` or `content_size_in_bytes`, when `content_size_in_bytes` is not a valid
+    /// length, when no blob matches, or when the matching blob has a different type. Errors
+    /// from opening or reading the Puffin file and from decoding the blob are propagated.
+    async fn load_deletion_vector(
+        task: &FileScanTaskDeleteFile,
+        basic_delete_file_loader: &BasicDeleteFileLoader,
+    ) -> Result<DeleteFileContext> {
+        let referenced_data_file = task.referenced_data_file.as_ref().ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Deletion vector must have referenced_data_file set",
+            )
+        })?;
+
+        let content_offset = task.content_offset.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Deletion vector must have content_offset set",
+            )
+        })?;
+
+        let content_size = task.content_size_in_bytes.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Deletion vector must have content_size_in_bytes set",
+            )
+        })?;
+
+        // Convert with a check instead of an `as` cast: a negative or oversized value would
+        // otherwise wrap silently and be compared against blob lengths it does not represent.
+        let content_length = usize::try_from(content_size).map_err(|_| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Deletion vector content_size_in_bytes {} is not a valid blob length",
+                    content_size
+                ),
+            )
+        })?;
+
+        // Open the Puffin file
+        let input_file = basic_delete_file_loader.file_io().new_input(&task.file_path)?;
+        let puffin_reader = PuffinReader::new(input_file);
+
+        // Get file metadata to find the blob
+        let file_metadata = puffin_reader.file_metadata().await?;
+
+        // Find the deletion vector blob at the specified offset
+        let blob_metadata = file_metadata
+            .blobs
+            .iter()
+            .find(|blob| blob.offset == content_offset && blob.length == content_length)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Could not find deletion vector blob at offset {} with length {} in Puffin file {}",
+                        content_offset, content_size, &task.file_path
+                    ),
+                )
+            })?;
+
+        // Verify blob type
+        if blob_metadata.r#type != DELETION_VECTOR_V1 {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Expected blob type '{}', found '{}' in Puffin file {}",
+                    DELETION_VECTOR_V1, blob_metadata.r#type, &task.file_path
+                ),
+            ));
+        }
+
+        // Read and deserialize the blob
+        let blob = puffin_reader.blob(blob_metadata).await?;
+        let delete_vector = DeleteVector::deserialize_from(blob.data())?;
+
+        Ok(DeleteFileContext::DeletionVector {
+            referenced_data_file: referenced_data_file.clone(),
+            delete_vector,
+        })
+    }
+
     async fn parse_file_content_for_task(
         ctx: DeleteFileContext,
     ) -> Result<ParsedDeleteFileContext> {
         match ctx {
+            DeleteFileContext::DeletionVector {
+                referenced_data_file,
+                delete_vector,
+            } => {
+                // Deletion vectors are already deserialized, just wrap in a HashMap
+                let mut map = HashMap::new();
+                map.insert(referenced_data_file, delete_vector);
+                Ok(ParsedDeleteFileContext::DelVecs(map))
+            }
             DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
             DeleteFileContext::PosDels(batch_stream) => {
                 let del_vecs =
@@ -889,6 +1004,9 @@ mod tests {
             file_type: DataContentType::PositionDeletes,
             partition_spec_id: 0,
             equality_ids: None,
+            referenced_data_file: None,
+            content_offset: None,
+            content_size_in_bytes: None,
         };
 
         let eq_del = FileScanTaskDeleteFile {
@@ -896,6 +1014,9 @@ mod tests {
             file_type: DataContentType::EqualityDeletes,
             partition_spec_id: 0,
             equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
+            referenced_data_file: None,
+            content_offset: None,
+            content_size_in_bytes: None,
         };
 
         let file_scan_task = FileScanTask {
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index e12daf5324..bd022af562 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -50,6 +50,12 @@ impl BasicDeleteFileLoader {
     pub fn new(file_io: FileIO) -> Self {
         BasicDeleteFileLoader { file_io }
     }
+
+    /// Returns a reference to the FileIO used by this loader.
+    pub fn file_io(&self) -> &FileIO {
+        &self.file_io
+    }
+
     /// Loads a RecordBatchStream for a given datafile.
     pub(crate) async fn parquet_to_batch_stream(
         &self,
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index d05e028997..d0a25b70fc 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -315,6 +315,9 @@ pub(crate) mod tests {
             file_type: DataContentType::PositionDeletes,
             partition_spec_id: 0,
             equality_ids: None,
+            referenced_data_file: None,
+            content_offset: None,
+            content_size_in_bytes: None,
         };
 
         let pos_del_2 = FileScanTaskDeleteFile {
@@ -322,6 +325,9 @@ pub(crate) mod tests {
             file_type: DataContentType::PositionDeletes,
             partition_spec_id: 0,
             equality_ids: None,
+            referenced_data_file: None,
+            content_offset: None,
+            content_size_in_bytes: None,
         };
 
         let pos_del_3 = FileScanTaskDeleteFile {
@@ -329,6 +335,9 @@ pub(crate) mod tests {
             file_type: DataContentType::PositionDeletes,
             partition_spec_id: 0,
             equality_ids: None,
+            referenced_data_file: None,
+            content_offset: None,
+            content_size_in_bytes: None,
         };
 
         let file_scan_tasks = vec![
@@ -411,6 +420,9 @@ pub(crate) mod tests {
                 file_type: DataContentType::EqualityDeletes,
                 partition_spec_id: 0,
                 equality_ids: None,
+                referenced_data_file: None,
+                content_offset: None,
+                content_size_in_bytes: None,
             }],
             partition: None,
             partition_spec: None,
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index f7f90663a5..d7b7f5b81c 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -2717,6 +2717,9 @@ message schema {
                 file_type: DataContentType::PositionDeletes,
                 partition_spec_id: 0,
                 equality_ids: None,
+                referenced_data_file: None,
+                content_offset: None,
+                content_size_in_bytes: None,
             }],
             partition: None,
             partition_spec: None,
@@ -2935,6 +2938,9 @@ message schema {
                 file_type: DataContentType::PositionDeletes,
                 partition_spec_id: 0,
                 equality_ids: None,
+                referenced_data_file: None,
+                content_offset: None,
+                content_size_in_bytes: None,
             }],
             partition: None,
             partition_spec: None,
@@ -3146,6 +3152,9 @@ message schema {
                 file_type: DataContentType::PositionDeletes,
                 partition_spec_id: 0,
                 equality_ids: None,
+                referenced_data_file: None,
+                content_offset: None,
+                content_size_in_bytes: None,
             }],
             partition: None,
             partition_spec: None,
diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs
index df8a10193c..9f41072a41 100644
--- a/crates/iceberg/src/delete_vector.rs
+++ b/crates/iceberg/src/delete_vector.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::io::Cursor;
 use std::ops::BitOrAssign;
 
 use roaring::RoaringTreemap;
@@ -36,6 +37,123 @@ impl DeleteVector {
         }
     }
 
+    /// Magic bytes that precede the serialized bitmap in a `deletion-vector-v1` blob.
+    const BLOB_MAGIC: [u8; 4] = [0xD1, 0xD3, 0x39, 0x64];
+    /// Width in bytes of the big-endian length prefix and of the big-endian CRC-32 trailer.
+    const BLOB_FIELD_SIZE: usize = 4;
+
+    /// Deserialize a DeleteVector from the content of a `deletion-vector-v1` Puffin blob.
+    ///
+    /// Per the Iceberg Puffin spec the blob is laid out as:
+    /// 1. combined length of the magic bytes and the bitmap, 4 bytes big-endian
+    /// 2. the magic bytes `D1 D3 39 64`
+    /// 3. the positions as a 64-bit Roaring bitmap in the "portable" format
+    /// 4. CRC-32 checksum of the magic bytes and the bitmap, 4 bytes big-endian
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::DataInvalid` when the blob is truncated, the declared length or the
+    /// magic bytes do not match, the checksum differs, or the bitmap itself cannot be decoded.
+    pub fn deserialize_from(data: &[u8]) -> Result<Self> {
+        let invalid = |message: String| Error::new(ErrorKind::DataInvalid, message);
+        let be_u32 = |bytes: &[u8]| u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
+
+        let min_len = Self::BLOB_MAGIC.len() + 2 * Self::BLOB_FIELD_SIZE;
+        if data.len() < min_len {
+            return Err(invalid(format!(
+                "Deletion vector blob is too short: expected at least {} bytes, found {}",
+                min_len,
+                data.len()
+            )));
+        }
+
+        // `checked` is the span covered by both the length prefix and the checksum.
+        let (length_bytes, rest) = data.split_at(Self::BLOB_FIELD_SIZE);
+        let (checked, crc_bytes) = rest.split_at(rest.len() - Self::BLOB_FIELD_SIZE);
+
+        let declared_len = be_u32(length_bytes);
+        if usize::try_from(declared_len).ok() != Some(checked.len()) {
+            return Err(invalid(format!(
+                "Deletion vector blob declares {} bytes of content but contains {}",
+                declared_len,
+                checked.len()
+            )));
+        }
+
+        let bitmap_bytes = checked.strip_prefix(&Self::BLOB_MAGIC[..]).ok_or_else(|| {
+            invalid("Deletion vector blob does not start with the expected magic bytes".to_string())
+        })?;
+
+        let expected_crc = be_u32(crc_bytes);
+        let actual_crc = Self::crc32(checked);
+        if expected_crc != actual_crc {
+            return Err(invalid(format!(
+                "Deletion vector blob checksum mismatch: expected {:#010x}, computed {:#010x}",
+                expected_crc, actual_crc
+            )));
+        }
+
+        let treemap = RoaringTreemap::deserialize_from(Cursor::new(bitmap_bytes)).map_err(|e| {
+            invalid(format!(
+                "Failed to deserialize deletion vector from Puffin blob: {}",
+                e
+            ))
+        })?;
+        Ok(DeleteVector { inner: treemap })
+    }
+
+    /// Serialize a DeleteVector into the content of a `deletion-vector-v1` Puffin blob.
+    ///
+    /// The output uses the same layout that `deserialize_from` reads: big-endian length,
+    /// magic bytes, portable Roaring bitmap, big-endian CRC-32.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when the bitmap cannot be written or its serialized size does not fit
+    /// the 4-byte length prefix.
+    // Not called outside of tests in this change, hence the lint suppression.
+    #[allow(unused)]
+    pub fn serialize_to_vec(&self) -> Result<Vec<u8>> {
+        // Magic bytes followed by the bitmap: the span covered by the length and the checksum.
+        let mut checked = Self::BLOB_MAGIC.to_vec();
+        self.inner.serialize_into(&mut checked).map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("Failed to serialize deletion vector: {}", e),
+            )
+        })?;
+
+        let checked_len = u32::try_from(checked.len()).map_err(|_| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Deletion vector is too large to serialize: {} bytes",
+                    checked.len()
+                ),
+            )
+        })?;
+
+        let mut buffer = Vec::with_capacity(checked.len() + 2 * Self::BLOB_FIELD_SIZE);
+        buffer.extend_from_slice(&checked_len.to_be_bytes());
+        buffer.extend_from_slice(&checked);
+        buffer.extend_from_slice(&Self::crc32(&checked).to_be_bytes());
+        Ok(buffer)
+    }
+
+    /// CRC-32 as computed by zlib (reflected polynomial `0xEDB88320`), used by the blob trailer.
+    fn crc32(bytes: &[u8]) -> u32 {
+        let mut crc = u32::MAX;
+        for &byte in bytes {
+            crc ^= u32::from(byte);
+            for _ in 0..8 {
+                // All-ones when the low bit is set, so the polynomial is applied only then.
+                let mask = (crc & 1).wrapping_neg();
+                crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
+            }
+        }
+        !crc
+    }
+
     pub fn iter(&self) -> DeleteVectorIterator<'_> {
         let outer = self.inner.bitmaps();
         DeleteVectorIterator { outer, inner: None }
@@ -198,4 +316,71 @@ mod tests {
         let res = dv.insert_positions(&positions);
         assert!(res.is_err());
     }
+
+    #[test]
+    fn test_serialize_deserialize_round_trip() {
+        let mut dv = DeleteVector::default();
+        dv.insert(42);
+        dv.insert(100);
+        dv.insert(1 << 33); // Test high bits
+
+        let serialized = dv.serialize_to_vec().unwrap();
+        let deserialized = DeleteVector::deserialize_from(&serialized).unwrap();
+
+        let original_items: Vec<u64> = dv.iter().collect();
+        let deserialized_items: Vec<u64> = deserialized.iter().collect();
+
+        assert_eq!(original_items, deserialized_items);
+    }
+
+    #[test]
+    fn test_deserialize_empty() {
+        let empty_dv = DeleteVector::default();
+        let serialized = empty_dv.serialize_to_vec().unwrap();
+        let deserialized = DeleteVector::deserialize_from(&serialized).unwrap();
+
+        assert_eq!(deserialized.len(), 0);
+    }
+
+    #[test]
+    fn test_deserialize_invalid_data() {
+        let invalid_data = vec![0xFF, 0xFE, 0xFD]; // Too short to be a framed blob
+        let result = DeleteVector::deserialize_from(&invalid_data);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_serialized_blob_layout() {
+        let mut dv = DeleteVector::default();
+        dv.insert(7);
+
+        let serialized = dv.serialize_to_vec().unwrap();
+        // Everything between the 4-byte length prefix and the 4-byte CRC-32 trailer.
+        let content_len = serialized.len() - 8;
+        let declared_len = u32::try_from(content_len).unwrap();
+
+        assert_eq!(serialized[..4], declared_len.to_be_bytes());
+        assert_eq!(serialized[4..8], [0xD1, 0xD3, 0x39, 0x64]);
+        let crc = DeleteVector::crc32(&serialized[4..4 + content_len]);
+        assert_eq!(serialized[4 + content_len..], crc.to_be_bytes());
+    }
+
+    #[test]
+    fn test_crc32_check_value() {
+        // Standard CRC-32 check value for the ASCII string "123456789".
+        assert_eq!(DeleteVector::crc32(b"123456789"), 0xCBF4_3926);
+    }
+
+    #[test]
+    fn test_deserialize_rejects_corrupted_blob() {
+        let mut dv = DeleteVector::default();
+        dv.insert(42);
+        let mut serialized = dv.serialize_to_vec().unwrap();
+
+        // Damage the last bitmap byte, which sits right before the 4-byte CRC-32 trailer.
+        let last_bitmap_byte = serialized.len() - 5;
+        serialized[last_bitmap_byte] ^= 0x01;
+
+        assert!(DeleteVector::deserialize_from(&serialized).is_err());
+    }
 }
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 5349a9bdd2..f6afc1ccac 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -144,27 +144,51 @@ pub(crate) struct DeleteFileContext {
 
 impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
     fn from(ctx: &DeleteFileContext) -> Self {
+        let data_file = &ctx.manifest_entry.data_file;
         FileScanTaskDeleteFile {
             file_path: ctx.manifest_entry.file_path().to_string(),
             file_type: ctx.manifest_entry.content_type(),
             partition_spec_id: ctx.partition_spec_id,
-            equality_ids: ctx.manifest_entry.data_file.equality_ids.clone(),
+            equality_ids: data_file.equality_ids.clone(),
+            // Deletion vector fields from DataFile
+            referenced_data_file: data_file.referenced_data_file.clone(),
+            content_offset: data_file.content_offset,
+            content_size_in_bytes: data_file.content_size_in_bytes,
         }
     }
 }
 
-/// A task to scan part of file.
+/// A task to scan part of a delete file.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct FileScanTaskDeleteFile {
     /// The delete file path
     pub file_path: String,
 
-    /// delete file type
+    /// Delete file type (PositionDeletes or EqualityDeletes)
     pub file_type: DataContentType,
 
-    /// partition id
+    /// Partition spec id
     pub partition_spec_id: i32,
 
-    /// equality ids for equality deletes (null for anything other than equality-deletes)
+    /// Equality ids for equality deletes (None for positional deletes and deletion vectors)
     pub equality_ids: Option<Vec<i32>>,
+
+    /// Referenced data file for deletion vectors.
+    /// When set along with content_offset and content_size_in_bytes, indicates this is a deletion vector.
+    pub referenced_data_file: Option<String>,
+
+    /// Content offset in the Puffin file for deletion vectors.
+    pub content_offset: Option<i64>,
+
+    /// Content size in bytes for deletion vectors.
+    pub content_size_in_bytes: Option<i64>,
+}
+
+impl FileScanTaskDeleteFile {
+    /// Returns true if this delete file is a deletion vector stored in a Puffin file.
+    ///
+    /// Deletion vectors are identified by having both content_offset and referenced_data_file set.
+    pub fn is_deletion_vector(&self) -> bool {
+        self.content_offset.is_some() && self.referenced_data_file.is_some()
+    }
 }