Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 100 additions & 1 deletion crates/iceberg/src/arrow/caching_delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::delete_vector::DeleteVector;
use crate::expr::Predicate::AlwaysTrue;
use crate::expr::{Predicate, Reference};
use crate::io::FileIO;
use crate::puffin::{DELETION_VECTOR_V1, PuffinReader};
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{
DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor,
Expand All @@ -46,7 +47,13 @@ pub(crate) struct CachingDeleteFileLoader {

// Intermediate context during processing of a delete file task.
enum DeleteFileContext {
// TODO: Delete Vector loader from Puffin files
/// Deletion vector loaded from a Puffin file
DeletionVector {
/// The data file path this deletion vector applies to
referenced_data_file: String,
/// The deserialized deletion vector
delete_vector: DeleteVector,
},
ExistingEqDel,
PosDels(ArrowRecordBatchStream),
FreshEqDel {
Expand Down Expand Up @@ -209,6 +216,11 @@ impl CachingDeleteFileLoader {
del_filter: DeleteFilter,
schema: SchemaRef,
) -> Result<DeleteFileContext> {
// Check if this is a deletion vector (stored in Puffin file)
if task.is_deletion_vector() {
return Self::load_deletion_vector(task, &basic_delete_file_loader).await;
}

match task.file_type {
DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
basic_delete_file_loader
Expand Down Expand Up @@ -250,10 +262,91 @@ impl CachingDeleteFileLoader {
}
}

/// Load a deletion vector from a Puffin file.
///
/// Per the Iceberg spec, deletion vectors are stored as serialized Roaring Bitmaps
/// in Puffin files with blob type "deletion-vector-v1". The task must carry the
/// referenced data file path plus the blob's byte offset and size so the blob
/// can be located without scanning the whole Puffin file.
async fn load_deletion_vector(
    task: &FileScanTaskDeleteFile,
    basic_delete_file_loader: &BasicDeleteFileLoader,
) -> Result<DeleteFileContext> {
    // All three addressing fields are mandatory for a deletion vector;
    // reject the task up front if any of them is absent.
    let referenced_data_file = task
        .referenced_data_file
        .as_ref()
        .ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                "Deletion vector must have referenced_data_file set",
            )
        })?
        .clone();

    let offset = task.content_offset.ok_or_else(|| {
        Error::new(
            ErrorKind::DataInvalid,
            "Deletion vector must have content_offset set",
        )
    })?;

    let size = task.content_size_in_bytes.ok_or_else(|| {
        Error::new(
            ErrorKind::DataInvalid,
            "Deletion vector must have content_size_in_bytes set",
        )
    })?;

    // Open the containing Puffin file and read its footer metadata so the
    // deletion-vector blob can be located.
    let input_file = basic_delete_file_loader
        .file_io()
        .new_input(&task.file_path)?;
    let puffin_reader = PuffinReader::new(input_file);
    let puffin_meta = puffin_reader.file_metadata().await?;

    // The blob is addressed by its byte offset and length within the file.
    let dv_blob_meta = puffin_meta
        .blobs
        .iter()
        .find(|b| b.offset == offset && b.length == size as usize)
        .ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Could not find deletion vector blob at offset {} with length {} in Puffin file {}",
                    offset, size, &task.file_path
                ),
            )
        })?;

    // Guard against a mislabelled blob: only "deletion-vector-v1" is valid here.
    if dv_blob_meta.r#type != DELETION_VECTOR_V1 {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            format!(
                "Expected blob type '{}', found '{}' in Puffin file {}",
                DELETION_VECTOR_V1, dv_blob_meta.r#type, &task.file_path
            ),
        ));
    }

    // Fetch the blob payload and deserialize it into an in-memory bitmap.
    let blob = puffin_reader.blob(dv_blob_meta).await?;
    let delete_vector = DeleteVector::deserialize_from(blob.data())?;

    Ok(DeleteFileContext::DeletionVector {
        referenced_data_file,
        delete_vector,
    })
}

async fn parse_file_content_for_task(
ctx: DeleteFileContext,
) -> Result<ParsedDeleteFileContext> {
match ctx {
DeleteFileContext::DeletionVector {
referenced_data_file,
delete_vector,
} => {
// Deletion vectors are already deserialized, just wrap in a HashMap
let mut map = HashMap::new();
map.insert(referenced_data_file, delete_vector);
Ok(ParsedDeleteFileContext::DelVecs(map))
}
DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
DeleteFileContext::PosDels(batch_stream) => {
let del_vecs =
Expand Down Expand Up @@ -889,13 +982,19 @@ mod tests {
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
};

let eq_del = FileScanTaskDeleteFile {
file_path: eq_delete_path.clone(),
file_type: DataContentType::EqualityDeletes,
partition_spec_id: 0,
equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
};

let file_scan_task = FileScanTask {
Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/arrow/delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ impl BasicDeleteFileLoader {
pub fn new(file_io: FileIO) -> Self {
BasicDeleteFileLoader { file_io }
}

/// Returns a reference to the FileIO used by this loader.
///
/// Exposed so callers (e.g. the caching delete file loader reading Puffin
/// files) can open additional inputs with the same I/O configuration.
pub fn file_io(&self) -> &FileIO {
&self.file_io
}

/// Loads a RecordBatchStream for a given datafile.
pub(crate) async fn parquet_to_batch_stream(
&self,
Expand Down
12 changes: 12 additions & 0 deletions crates/iceberg/src/arrow/delete_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,20 +315,29 @@ pub(crate) mod tests {
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
};

let pos_del_2 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
};

let pos_del_3 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
};

let file_scan_tasks = vec![
Expand Down Expand Up @@ -411,6 +420,9 @@ pub(crate) mod tests {
file_type: DataContentType::EqualityDeletes,
partition_spec_id: 0,
equality_ids: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
}],
partition: None,
partition_spec: None,
Expand Down
9 changes: 9 additions & 0 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2717,6 +2717,9 @@ message schema {
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
}],
partition: None,
partition_spec: None,
Expand Down Expand Up @@ -2935,6 +2938,9 @@ message schema {
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
}],
partition: None,
partition_spec: None,
Expand Down Expand Up @@ -3146,6 +3152,9 @@ message schema {
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
}],
partition: None,
partition_spec: None,
Expand Down
61 changes: 61 additions & 0 deletions crates/iceberg/src/delete_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::io::Cursor;
use std::ops::BitOrAssign;

use roaring::RoaringTreemap;
Expand All @@ -36,6 +37,34 @@ impl DeleteVector {
}
}

/// Deserialize a DeleteVector from bytes (Puffin blob data).
///
/// Per the Iceberg spec, deletion vectors are stored as serialized Roaring Bitmaps
/// in Puffin files with blob type "deletion-vector-v1".
///
/// # Errors
///
/// Returns `ErrorKind::DataInvalid` if the bytes are not a valid serialized
/// `RoaringTreemap`.
///
/// NOTE(review): this deserializes `data` directly as a roaring treemap. The
/// Iceberg v3 spec describes the deletion-vector blob payload as carrying
/// additional framing (a length prefix, magic bytes, and a trailing CRC)
/// around the bitmap — confirm whether the caller strips that framing before
/// invoking this, or whether it must be handled here.
pub fn deserialize_from(data: &[u8]) -> Result<Self> {
// Cursor gives the roaring crate a `Read` over the in-memory bytes.
let cursor = Cursor::new(data);
let treemap = RoaringTreemap::deserialize_from(cursor).map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
format!("Failed to deserialize deletion vector from Puffin blob: {}", e),
)
})?;
Ok(DeleteVector { inner: treemap })
}

/// Serialize a DeleteVector to bytes for storage in a Puffin blob.
///
/// Inverse of [`DeleteVector::deserialize_from`]; the two are exercised
/// together by the round-trip tests in this module.
///
/// # Errors
///
/// Returns `ErrorKind::Unexpected` if the underlying bitmap fails to
/// serialize (I/O into a `Vec` should not normally fail).
///
/// NOTE(review): emits only the raw roaring-treemap bytes, without the
/// length/magic/CRC framing the Iceberg v3 spec describes for
/// deletion-vector-v1 blobs — confirm the writer adds that framing.
#[allow(unused)]
pub fn serialize_to_vec(&self) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
self.inner.serialize_into(&mut buffer).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to serialize deletion vector: {}", e),
)
})?;
Ok(buffer)
}

pub fn iter(&self) -> DeleteVectorIterator<'_> {
let outer = self.inner.bitmaps();
DeleteVectorIterator { outer, inner: None }
Expand Down Expand Up @@ -198,4 +227,36 @@ mod tests {
let res = dv.insert_positions(&positions);
assert!(res.is_err());
}

#[test]
fn test_serialize_deserialize_round_trip() {
    // Populate a vector with positions in both the low and high u64 ranges
    // (1 << 33 exercises the upper 32 bits of the treemap keys).
    let mut original = DeleteVector::default();
    for pos in [42u64, 100, 1 << 33] {
        original.insert(pos);
    }

    let bytes = original.serialize_to_vec().unwrap();
    let restored = DeleteVector::deserialize_from(&bytes).unwrap();

    // The round trip must preserve the exact ordered set of positions.
    assert_eq!(
        original.iter().collect::<Vec<u64>>(),
        restored.iter().collect::<Vec<u64>>()
    );
}

#[test]
fn test_deserialize_empty() {
// An empty vector must survive the round trip and stay empty.
let empty_dv = DeleteVector::default();
let serialized = empty_dv.serialize_to_vec().unwrap();
let deserialized = DeleteVector::deserialize_from(&serialized).unwrap();

assert_eq!(deserialized.len(), 0);
}

#[test]
fn test_deserialize_invalid_data() {
// Garbage bytes must surface as an error, not a panic or a silent
// empty vector.
let invalid_data = vec![0xFF, 0xFE, 0xFD]; // Invalid roaring bitmap data
let result = DeleteVector::deserialize_from(&invalid_data);
assert!(result.is_err());
}
}
34 changes: 29 additions & 5 deletions crates/iceberg/src/scan/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,51 @@ pub(crate) struct DeleteFileContext {

impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
fn from(ctx: &DeleteFileContext) -> Self {
let data_file = &ctx.manifest_entry.data_file;
FileScanTaskDeleteFile {
file_path: ctx.manifest_entry.file_path().to_string(),
file_type: ctx.manifest_entry.content_type(),
partition_spec_id: ctx.partition_spec_id,
equality_ids: ctx.manifest_entry.data_file.equality_ids.clone(),
equality_ids: data_file.equality_ids.clone(),
// Deletion vector fields from DataFile
referenced_data_file: data_file.referenced_data_file.clone(),
content_offset: data_file.content_offset,
content_size_in_bytes: data_file.content_size_in_bytes,
}
}
}

/// A task to scan part of a delete file.
///
/// Describes one positional-delete, equality-delete, or deletion-vector
/// file that applies to a data file being scanned.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FileScanTaskDeleteFile {
/// The delete file path (for deletion vectors, the path of the Puffin file)
pub file_path: String,

/// Delete file type (PositionDeletes or EqualityDeletes)
pub file_type: DataContentType,

/// Partition spec id
pub partition_spec_id: i32,

/// Equality ids for equality deletes (None for positional deletes and deletion vectors)
pub equality_ids: Option<Vec<i32>>,

/// Referenced data file for deletion vectors.
/// When set along with content_offset and content_size_in_bytes, indicates this is a deletion vector.
pub referenced_data_file: Option<String>,

/// Content offset in the Puffin file for deletion vectors.
/// Used to locate the blob within the file.
pub content_offset: Option<i64>,

/// Content size in bytes for deletion vectors.
/// Together with content_offset, identifies the blob within the Puffin file.
pub content_size_in_bytes: Option<i64>,
}

impl FileScanTaskDeleteFile {
    /// Returns true if this delete file is a deletion vector stored in a Puffin file.
    ///
    /// Deletion vectors are identified by having both content_offset and
    /// referenced_data_file set; ordinary positional/equality delete files
    /// leave both as None.
    pub fn is_deletion_vector(&self) -> bool {
        matches!(
            (&self.content_offset, &self.referenced_data_file),
            (Some(_), Some(_))
        )
    }
}
Loading