diff --git a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
index 66e2898532..4ef5bc3046 100644
--- a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
@@ -20,7 +20,9 @@
 use std::collections::HashMap;
 
 use fnv::FnvHashSet;
+use num_bigint::BigInt;
 use ordered_float::OrderedFloat;
+use rust_decimal::prelude::ToPrimitive;
 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
 use parquet::file::metadata::RowGroupMetaData;
 use parquet::file::page_index::column_index::ColumnIndexMetaData;
@@ -362,11 +364,129 @@ impl<'a> PageIndexEvaluator<'a> {
                     )
                 })
                 .collect(),
-            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(_) => {
-                return Err(Error::new(
-                    ErrorKind::FeatureUnsupported,
-                    "unsupported 'FIXED_LEN_BYTE_ARRAY' index type in column_index",
-                ));
+            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(idx) => {
+                // FIXED_LEN_BYTE_ARRAY is used for:
+                // - Decimal types (stored as big-endian signed integers)
+                // - UUID types (stored as 16-byte binary)
+                // - Fixed types (raw binary)
+                match &field_type {
+                    PrimitiveType::Decimal { .. } => {
+                        // Convert fixed-length bytes to i128 for decimal comparison
+                        idx.min_values_iter()
+                            .zip(idx.max_values_iter())
+                            .enumerate()
+                            .zip(row_counts.iter())
+                            .map(|((i, (min, max)), &row_count)| {
+                                let min_datum = min.and_then(|bytes| {
+                                    BigInt::from_signed_bytes_be(bytes)
+                                        .to_i128()
+                                        .map(|val| {
+                                            Datum::new(
+                                                field_type.clone(),
+                                                PrimitiveLiteral::Int128(val),
+                                            )
+                                        })
+                                });
+                                let max_datum = max.and_then(|bytes| {
+                                    BigInt::from_signed_bytes_be(bytes)
+                                        .to_i128()
+                                        .map(|val| {
+                                            Datum::new(
+                                                field_type.clone(),
+                                                PrimitiveLiteral::Int128(val),
+                                            )
+                                        })
+                                });
+                                predicate(
+                                    min_datum,
+                                    max_datum,
+                                    PageNullCount::from_row_and_null_counts(
+                                        row_count,
+                                        idx.null_count(i),
+                                    ),
+                                )
+                            })
+                            .collect()
+                    }
+                    PrimitiveType::Uuid => {
+                        // UUID stored as 16-byte fixed-length binary
+                        idx.min_values_iter()
+                            .zip(idx.max_values_iter())
+                            .enumerate()
+                            .zip(row_counts.iter())
+                            .map(|((i, (min, max)), &row_count)| {
+                                let min_datum = min.and_then(|bytes| {
+                                    if bytes.len() == 16 {
+                                        let arr: [u8; 16] = bytes.try_into().ok()?;
+                                        Some(Datum::new(
+                                            field_type.clone(),
+                                            PrimitiveLiteral::UInt128(u128::from_be_bytes(arr)),
+                                        ))
+                                    } else {
+                                        None
+                                    }
+                                });
+                                let max_datum = max.and_then(|bytes| {
+                                    if bytes.len() == 16 {
+                                        let arr: [u8; 16] = bytes.try_into().ok()?;
+                                        Some(Datum::new(
+                                            field_type.clone(),
+                                            PrimitiveLiteral::UInt128(u128::from_be_bytes(arr)),
+                                        ))
+                                    } else {
+                                        None
+                                    }
+                                });
+                                predicate(
+                                    min_datum,
+                                    max_datum,
+                                    PageNullCount::from_row_and_null_counts(
+                                        row_count,
+                                        idx.null_count(i),
+                                    ),
+                                )
+                            })
+                            .collect()
+                    }
+                    PrimitiveType::Fixed(_) => {
+                        // Fixed types are raw binary - use binary comparison
+                        idx.min_values_iter()
+                            .zip(idx.max_values_iter())
+                            .enumerate()
+                            .zip(row_counts.iter())
+                            .map(|((i, (min, max)), &row_count)| {
+                                predicate(
+                                    min.map(|bytes| {
+                                        Datum::new(
+                                            field_type.clone(),
+                                            PrimitiveLiteral::Binary(bytes.to_vec()),
+                                        )
+                                    }),
+                                    max.map(|bytes| {
+                                        Datum::new(
+                                            field_type.clone(),
+                                            PrimitiveLiteral::Binary(bytes.to_vec()),
+                                        )
+                                    }),
+                                    PageNullCount::from_row_and_null_counts(
+                                        row_count,
+                                        idx.null_count(i),
+                                    ),
+                                )
+                            })
+                            .collect()
+                    }
+                    _ => {
+                        // Other types using FIXED_LEN_BYTE_ARRAY are not supported
+                        return Err(Error::new(
+                            ErrorKind::FeatureUnsupported,
+                            format!(
+                                "unsupported 'FIXED_LEN_BYTE_ARRAY' index type for {:?} in column_index",
+                                field_type
+                            ),
+                        ));
+                    }
+                }
             }
             ColumnIndexMetaData::INT96(_) => {
                 return Err(Error::new(
@@ -1367,4 +1487,389 @@ mod tests {
         Ok((iceberg_schema_ref, field_id_map))
     }
+
+    // ============================================================
+    // Tests for FIXED_LEN_BYTE_ARRAY support (UUID, Fixed)
+    // Note: Decimal with precision <= 18 is stored as INT64, not FIXED_LEN_BYTE_ARRAY
+    // ============================================================
+
+    /// Helper function to create a test parquet file with a Fixed(16) column
+    fn create_fixed_parquet_file() -> Result<(Arc<ParquetMetaData>, NamedTempFile)> {
+        use arrow_array::FixedSizeBinaryArray;
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_fixed",
+            DataType::FixedSizeBinary(16),
+            true,
+        )]));
+
+        let temp_file = NamedTempFile::new().unwrap();
+        let file = temp_file.reopen().unwrap();
+
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(100)
+            .set_write_batch_size(50)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+
+        // Page 1: Fixed bytes starting with 0x00
+        let fixed_vals: Vec<Option<Vec<u8>>> = (0..100)
+            .map(|i| {
+                let mut arr = vec![0u8; 16];
+                arr[0] = 0x00;
+                arr[15] = i as u8;
+                Some(arr)
+            })
+            .collect();
+        let batch = RecordBatch::try_new(
+            arrow_schema.clone(),
+            vec![Arc::new(
+                FixedSizeBinaryArray::try_from_sparse_iter_with_size(fixed_vals.into_iter(), 16)
+                    .unwrap(),
+            )],
+        )
+        .unwrap();
+        writer.write(&batch).unwrap();
+
+        // Page 2: Fixed bytes starting with 0x80
+        let fixed_vals: Vec<Option<Vec<u8>>> = (0..100)
+            .map(|i| {
+                let mut arr = vec![0u8; 16];
+                arr[0] = 0x80;
+                arr[15] = i as u8;
+                Some(arr)
+            })
+            .collect();
+        let batch = RecordBatch::try_new(
+            arrow_schema.clone(),
+            vec![Arc::new(
+                FixedSizeBinaryArray::try_from_sparse_iter_with_size(fixed_vals.into_iter(), 16)
+                    .unwrap(),
+            )],
+        )
+        .unwrap();
+        writer.write(&batch).unwrap();
+
+        // Page 3: Fixed bytes starting with 0xFF
+        let fixed_vals: Vec<Option<Vec<u8>>> = (0..100)
+            .map(|i| {
+                let mut arr = vec![0u8; 16];
+                arr[0] = 0xFF;
+                arr[15] = i as u8;
+                Some(arr)
+            })
+            .collect();
+        let batch = RecordBatch::try_new(
+            arrow_schema.clone(),
+            vec![Arc::new(
+                FixedSizeBinaryArray::try_from_sparse_iter_with_size(fixed_vals.into_iter(), 16)
+                    .unwrap(),
+            )],
+        )
+        .unwrap();
+        writer.write(&batch).unwrap();
+
+        writer.close().unwrap();
+
+        let file = temp_file.reopen().unwrap();
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+        let metadata = reader.metadata().clone();
+
+        Ok((metadata, temp_file))
+    }
+
+    fn build_fixed_schema_and_field_map() -> Result<(Arc<Schema>, HashMap<i32, usize>)> {
+        let iceberg_schema = Schema::builder()
+            .with_fields([Arc::new(NestedField::new(
+                1,
+                "col_fixed",
+                Type::Primitive(PrimitiveType::Fixed(16)),
+                true,
+            ))])
+            .build()?;
+        let iceberg_schema_ref = Arc::new(iceberg_schema);
+        let field_id_map = HashMap::from_iter([(1, 0)]);
+        Ok((iceberg_schema_ref, field_id_map))
+    }
+
+    #[test]
+    fn eval_fixed_greater_than_uses_binary_comparison() -> Result<()> {
+        let (metadata, _temp_file) = create_fixed_parquet_file()?;
+        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
+        let (iceberg_schema_ref, field_id_map) = build_fixed_schema_and_field_map()?;
+
+        // Query: col_fixed > [0x7F, 0, 0, ...] (midpoint)
+        // Page 1: starts with 0x00 -> SKIP
+        // Page 2: starts with 0x80 -> READ (0x80 > 0x7F)
+        // Page 3: starts with 0xFF -> READ (0xFF > 0x7F)
+        let mut target = vec![0u8; 16];
+        target[0] = 0x7F;
+
+        let filter = Reference::new("col_fixed")
+            .greater_than(Datum::fixed(target))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        // Should skip page 1, read pages 2+3 (merged)
+        let expected = vec![
+            RowSelector::skip(100),   // Page 1 skipped
+            RowSelector::select(200), // Pages 2+3 selected (merged)
+        ];
+
+        assert_eq!(result, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn eval_fixed_equal_selects_correct_page() -> Result<()> {
+        let (metadata, _temp_file) = create_fixed_parquet_file()?;
+        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
+        let (iceberg_schema_ref, field_id_map) = build_fixed_schema_and_field_map()?;
+
+        // Query: col_fixed = [0x80, 0, 0, ..., 50]
+        // This value is in page 2
+        let mut target = vec![0u8; 16];
+        target[0] = 0x80;
+        target[15] = 50;
+
+        let filter = Reference::new("col_fixed")
+            .equal_to(Datum::fixed(target))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        // Should skip pages 1 and 3, read only page 2
+        let expected = vec![
+            RowSelector::skip(100),   // Page 1 skipped
+            RowSelector::select(100), // Page 2 selected
+            RowSelector::skip(100),   // Page 3 skipped
+        ];
+
+        assert_eq!(result, expected);
+        Ok(())
+    }
+
+    /// Helper function to create a test parquet file with a UUID column
+    fn create_uuid_parquet_file() -> Result<(Arc<ParquetMetaData>, NamedTempFile)> {
+        use arrow_array::FixedSizeBinaryArray;
+
+        // UUIDs are stored as 16-byte fixed binary in Parquet
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_uuid",
+            DataType::FixedSizeBinary(16),
+            true,
+        )]));
+
+        let temp_file = NamedTempFile::new().unwrap();
+        let file = temp_file.reopen().unwrap();
+
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(100)
+            .set_write_batch_size(50)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+
+        // Page 1: UUIDs 00000000-0000-0000-0000-000000000000 to ...000063
+        let uuid_vals: Vec<Option<Vec<u8>>> = (0..100)
+            .map(|i| {
+                let mut arr = vec![0u8; 16];
+                arr[15] = i as u8;
+                Some(arr)
+            })
+            .collect();
+        let batch = RecordBatch::try_new(
+            arrow_schema.clone(),
+            vec![Arc::new(
+                FixedSizeBinaryArray::try_from_sparse_iter_with_size(uuid_vals.into_iter(), 16)
+                    .unwrap(),
+            )],
+        )
+        .unwrap();
+        writer.write(&batch).unwrap();
+
+        // Page 2: UUIDs with leading bytes 0x44 (44444444-...), last byte varying
+        let uuid_vals: Vec<Option<Vec<u8>>> = (0..100)
+            .map(|i| {
+                let mut arr = vec![0x44u8; 16];
+                arr[15] = i as u8;
+                Some(arr)
+            })
+            .collect();
+        let batch = RecordBatch::try_new(
+            arrow_schema.clone(),
+            vec![Arc::new(
+                FixedSizeBinaryArray::try_from_sparse_iter_with_size(uuid_vals.into_iter(), 16)
+                    .unwrap(),
+            )],
+        )
+        .unwrap();
+        writer.write(&batch).unwrap();
+
+        // Page 3: UUIDs with leading bytes 0xFF, last byte varying
+        let uuid_vals: Vec<Option<Vec<u8>>> = (0..100)
+            .map(|i| {
+                let mut arr = vec![0xFFu8; 16];
+                arr[15] = i as u8;
+                Some(arr)
+            })
+            .collect();
+        let batch = RecordBatch::try_new(
+            arrow_schema.clone(),
+            vec![Arc::new(
+                FixedSizeBinaryArray::try_from_sparse_iter_with_size(uuid_vals.into_iter(), 16)
+                    .unwrap(),
+            )],
+        )
+        .unwrap();
+        writer.write(&batch).unwrap();
+
+        writer.close().unwrap();
+
+        let file = temp_file.reopen().unwrap();
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+        let metadata = reader.metadata().clone();
+
+        Ok((metadata, temp_file))
+    }
+
+    fn build_uuid_schema_and_field_map() -> Result<(Arc<Schema>, HashMap<i32, usize>)> {
+        let iceberg_schema = Schema::builder()
+            .with_fields([Arc::new(NestedField::new(
+                1,
+                "col_uuid",
+                Type::Primitive(PrimitiveType::Uuid),
+                true,
+            ))])
+            .build()?;
+        let iceberg_schema_ref = Arc::new(iceberg_schema);
+        let field_id_map = HashMap::from_iter([(1, 0)]);
+        Ok((iceberg_schema_ref, field_id_map))
+    }
+
+    #[test]
+    fn eval_uuid_greater_than_skips_lower_pages() -> Result<()> {
+        let (metadata, _temp_file) = create_uuid_parquet_file()?;
+        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
+        let (iceberg_schema_ref, field_id_map) = build_uuid_schema_and_field_map()?;
+
+        // Query: col_uuid > value with all bytes 0x44
+        // Page 1: starts with 0x00 -> SKIP
+        // Page 2: starts with 0x44 -> READ (contains boundary)
+        // Page 3: starts with 0xFF -> READ (all greater)
+        let mut uuid_bytes = [0x44u8; 16];
+        uuid_bytes[15] = 0x44;
+        let uuid = uuid::Uuid::from_bytes(uuid_bytes);
+
+        let filter = Reference::new("col_uuid")
+            .greater_than(Datum::uuid(uuid))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        // Should skip page 1, read pages 2+3 (merged)
+        let expected = vec![
+            RowSelector::skip(100),   // Page 1 skipped
+            RowSelector::select(200), // Pages 2+3 selected (merged)
+        ];
+
+        assert_eq!(result, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn eval_uuid_equal_selects_correct_page() -> Result<()> {
+        let (metadata, _temp_file) = create_uuid_parquet_file()?;
+        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
+        let (iceberg_schema_ref, field_id_map) = build_uuid_schema_and_field_map()?;
+
+        // Query: col_uuid = value in page 2
+        let mut uuid_bytes = [0x44u8; 16];
+        uuid_bytes[15] = 50; // Within page 2 range
+        let uuid = uuid::Uuid::from_bytes(uuid_bytes);
+
+        let filter = Reference::new("col_uuid")
+            .equal_to(Datum::uuid(uuid))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        // Should skip pages 1 and 3, read only page 2
+        let expected = vec![
+            RowSelector::skip(100),   // Page 1 skipped
+            RowSelector::select(100), // Page 2 selected
+            RowSelector::skip(100),   // Page 3 skipped
+        ];
+
+        assert_eq!(result, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn eval_uuid_less_than_skips_higher_pages() -> Result<()> {
+        let (metadata, _temp_file) = create_uuid_parquet_file()?;
+        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
+        let (iceberg_schema_ref, field_id_map) = build_uuid_schema_and_field_map()?;
+
+        // Query: col_uuid < value with all bytes 0x22
+        // Page 1: starts with 0x00, max is 0x00...63 -> READ (all less than 0x22)
+        // Page 2: starts with 0x44 -> SKIP (min > 0x22)
+        // Page 3: starts with 0xFF -> SKIP (min > 0x22)
+        let uuid_bytes = [0x22u8; 16];
+        let uuid = uuid::Uuid::from_bytes(uuid_bytes);
+
+        let filter = Reference::new("col_uuid")
+            .less_than(Datum::uuid(uuid))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        // Should read page 1, skip pages 2+3 (merged)
+        let expected = vec![
+            RowSelector::select(100), // Page 1 selected
+            RowSelector::skip(200),   // Pages 2+3 skipped (merged)
+        ];
+
+        assert_eq!(result, expected);
+        Ok(())
+    }
 }
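
Reviewer note (illustrative only, not part of the patch): the Decimal branch above relies on reading the FIXED_LEN_BYTE_ARRAY page statistics as big-endian, two's-complement integers. The sketch below shows that decoding in isolation, assuming the num-bigint and num-traits crates are available; decode_decimal_be is a hypothetical helper name used only for this example.

    // Decode a big-endian, two's-complement byte string into the i128 unscaled
    // value that Datum uses for decimals. Returns None when the value does not
    // fit in i128 (more than 16 significant bytes); in the patch a None here
    // simply leaves that page bound unset instead of producing a wrong comparison.
    use num_bigint::BigInt;
    use num_traits::ToPrimitive;

    fn decode_decimal_be(bytes: &[u8]) -> Option<i128> {
        BigInt::from_signed_bytes_be(bytes).to_i128()
    }

    fn main() {
        // 0x04D2 == 1234; with scale 2 this is the decimal value 12.34.
        assert_eq!(decode_decimal_be(&[0x04, 0xD2]), Some(1234));
        // 0xFB2E is the two's-complement encoding of -1234.
        assert_eq!(decode_decimal_be(&[0xFB, 0x2E]), Some(-1234));
        // A 17-byte positive value overflows i128 and decodes to None.
        assert_eq!(decode_decimal_be(&[0x7F; 17]), None);
    }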