Commit 12c4c21

feat(reader): null struct default values in create_column (#1847)
Fixes `TestSparkReaderDeletes.testPosDeletesOnParquetFileWithMultipleRowGroups` in Iceberg Java 1.10 with DataFusion Comet.

## Which issue does this PR close?

- Partially addresses #1749.

## What changes are included in this PR?

- While `RecordBatchTransformer` does not yet have exhaustive nested-type support, this adds logic to `create_column` for the specific schema-evolution scenario of a new struct column whose default value is NULL.
- If the column has a non-NULL default value defined, it falls into the existing match arm and is reported as unsupported.

## Are these changes tested?

A new test reflects what happens with Iceberg Java 1.10's `TestSparkReaderDeletes.testPosDeletesOnParquetFileWithMultipleRowGroups`. The test name is misleading: I expected positional deletes to be just a delete vector and therefore schema-agnostic, but [it includes a schema change with binary and struct types, so we need default NULL values](https://github.com/apache/iceberg/blob/53c046efda5d6c6ac67caf7de29849ab7ac6d406/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java#L65).
1 parent 8d4851f commit 12c4c21
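
For context, the core of the change is building an all-null `StructArray` whose child arrays are themselves all-null and whose validity buffer marks every row null. Below is a minimal standalone sketch of that construction with arrow-rs; the field name `inner_field` and the row count are illustrative only, not taken from the patch.

```rust
// A minimal sketch (not iceberg-rust code): assembling an all-null struct column
// with arrow-rs. Field name and row count are made up for illustration.
use arrow_array::{new_null_array, Array, ArrayRef, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, Fields};

fn main() {
    let num_rows = 3;

    // Shape of the newly added column, e.g. struct_col: struct<inner_field: string>.
    let fields = Fields::from(vec![Field::new("inner_field", DataType::Utf8, true)]);

    // Every child array is all-null with the right length...
    let children: Vec<ArrayRef> = fields
        .iter()
        .map(|f| new_null_array(f.data_type(), num_rows))
        .collect();

    // ...and the struct-level validity buffer marks each row of the struct as null.
    let col = StructArray::new(fields, children, Some(NullBuffer::new_null(num_rows)));

    assert_eq!(col.null_count(), num_rows);
    assert!(col.is_null(0) && col.is_null(2));
}
```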

File tree

1 file changed: +93 -0 lines changed

crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 93 additions & 0 deletions
@@ -21,7 +21,9 @@ use std::sync::Arc;
 use arrow_array::{
     Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array,
     Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray,
+    StructArray,
 };
+use arrow_buffer::NullBuffer;
 use arrow_cast::cast;
 use arrow_schema::{
     DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
@@ -594,6 +596,21 @@ impl RecordBatchTransformer {
                 let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
                 Arc::new(BinaryArray::from_opt_vec(vals))
             }
+            (DataType::Struct(fields), None) => {
+                // Create a StructArray filled with nulls. Per Iceberg spec, optional struct fields
+                // default to null when added to the schema. We defer non-null default struct values
+                // and leave them as not implemented yet.
+                let null_arrays: Vec<ArrayRef> = fields
+                    .iter()
+                    .map(|field| Self::create_column(field.data_type(), &None, num_rows))
+                    .collect::<Result<Vec<_>>>()?;
+
+                Arc::new(StructArray::new(
+                    fields.clone(),
+                    null_arrays,
+                    Some(NullBuffer::new_null(num_rows)),
+                ))
+            }
             (DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
             (dt, _) => {
                 return Err(Error::new(
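
The new `(DataType::Struct(fields), None)` arm above only matches when no default value is set; a struct column with a non-NULL default still falls through to the final `(dt, _)` arm and returns the existing unsupported error. Because each child is built with a recursive `Self::create_column` call, nested structs come out null all the way down. A rough standalone sketch of that recursion, written as a hypothetical free function (`all_null_column` is not part of the crate) that uses arrow-rs's `new_null_array` for the non-struct leaves:

```rust
use std::sync::Arc;

use arrow_array::{new_null_array, Array, ArrayRef, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, Fields};

// Hypothetical helper mirroring the shape of the new match arm: for a struct,
// build each child as all-null and mark every parent row null; for any other
// type, fall back to arrow's generic all-null constructor.
fn all_null_column(data_type: &DataType, num_rows: usize) -> ArrayRef {
    match data_type {
        DataType::Struct(fields) => {
            let children: Vec<ArrayRef> = fields
                .iter()
                .map(|f| all_null_column(f.data_type(), num_rows))
                .collect();
            Arc::new(StructArray::new(
                fields.clone(),
                children,
                Some(NullBuffer::new_null(num_rows)),
            ))
        }
        other => new_null_array(other, num_rows),
    }
}

fn main() {
    // A struct nested inside another struct, the kind of shape schema evolution can add.
    let inner = Fields::from(vec![Field::new("leaf", DataType::Int32, true)]);
    let outer = DataType::Struct(Fields::from(vec![Field::new(
        "inner",
        DataType::Struct(inner),
        true,
    )]));

    let col = all_null_column(&outer, 2);
    assert_eq!(col.null_count(), 2);
    assert!(col.is_null(0) && col.is_null(1));
}
```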
@@ -743,6 +760,82 @@ mod test {
         assert!(date_column.is_null(2));
     }
 
+    #[test]
+    fn schema_evolution_adds_struct_column_with_nulls() {
+        // Test that when a struct column is added after data files are written,
+        // the transformer can materialize the missing struct column with null values.
+        // This reproduces the scenario from Iceberg 1.10.0 TestSparkReaderDeletes tests
+        // where binaryData and structData columns were added to the schema.
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(
+                        3,
+                        "struct_col",
+                        Type::Struct(crate::spec::StructType::new(vec![
+                            NestedField::optional(
+                                100,
+                                "inner_field",
+                                Type::Primitive(PrimitiveType::String),
+                            )
+                            .into(),
+                        ])),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+        let projected_iceberg_field_ids = [1, 2, 3];
+
+        let mut transformer =
+            RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);
+
+        let file_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("data", DataType::Utf8, false, "2"),
+        ]));
+
+        let file_batch = RecordBatch::try_new(file_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["a", "b", "c"])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(file_batch).unwrap();
+
+        assert_eq!(result.num_columns(), 3);
+        assert_eq!(result.num_rows(), 3);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.values(), &[1, 2, 3]);
+
+        let data_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(data_column.value(0), "a");
+        assert_eq!(data_column.value(1), "b");
+        assert_eq!(data_column.value(2), "c");
+
+        let struct_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<arrow_array::StructArray>()
+            .unwrap();
+        assert!(struct_column.is_null(0));
+        assert!(struct_column.is_null(1));
+        assert!(struct_column.is_null(2));
+    }
+
     pub fn source_record_batch() -> RecordBatch {
         RecordBatch::try_new(
             arrow_schema_promotion_addition_and_renaming_required(),
