diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 4f4f083c73..d312e6b50c 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -35,8 +35,8 @@ use uuid::Uuid; use crate::error::Result; use crate::spec::{ - Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema, - SchemaVisitor, StructType, Type, + Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, + PrimitiveType, Schema, SchemaVisitor, StructType, Type, }; use crate::{Error, ErrorKind}; @@ -221,6 +221,19 @@ pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result { visit_schema(schema, &mut visitor) } +/// Convert Arrow schema to Iceberg schema with automatically assigned field IDs. +/// +/// Unlike [`arrow_schema_to_schema`], this function does not require field IDs in the Arrow +/// schema metadata. Instead, it automatically assigns unique field IDs starting from 1, +/// following Iceberg's field ID assignment rules. +/// +/// This is useful when converting Arrow schemas that don't originate from Iceberg tables, +/// such as schemas from DataFusion or other Arrow-based systems. +pub fn arrow_schema_to_schema_auto_assign_ids(schema: &ArrowSchema) -> Result { + let mut visitor = ArrowSchemaConverter::new_with_field_ids_from(FIRST_FIELD_ID); + visit_schema(schema, &mut visitor) +} + /// Convert Arrow type to iceberg type. pub fn arrow_type_to_type(ty: &DataType) -> Result { let mut visitor = ArrowSchemaConverter::new(); @@ -229,7 +242,7 @@ pub fn arrow_type_to_type(ty: &DataType) -> Result { const ARROW_FIELD_DOC_KEY: &str = "doc"; -pub(super) fn get_field_id(field: &Field) -> Result { +pub(super) fn get_field_id_from_metadata(field: &Field) -> Result { if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) { return value.parse::().map_err(|e| { Error::new( @@ -253,19 +266,54 @@ fn get_field_doc(field: &Field) -> Option { None } -struct ArrowSchemaConverter; +struct ArrowSchemaConverter { + /// When set, the schema builder will reassign field IDs starting from this value + /// using level-order traversal (breadth-first). + reassign_field_ids_from: Option, + /// Generates unique placeholder IDs for fields before reassignment. + /// Required because `ReassignFieldIds` builds an old-to-new ID mapping + /// that expects unique input IDs. + temp_field_id_counter: i32, +} impl ArrowSchemaConverter { fn new() -> Self { - Self {} + Self { + reassign_field_ids_from: None, + temp_field_id_counter: 0, + } + } + + fn new_with_field_ids_from(start_from: i32) -> Self { + Self { + reassign_field_ids_from: Some(start_from), + temp_field_id_counter: 0, + } + } + + fn get_field_id(&mut self, field: &Field) -> Result { + if self.reassign_field_ids_from.is_some() { + // Field IDs will be reassigned by the schema builder. + // We need unique temporary IDs because ReassignFieldIds builds an + // old->new ID mapping that requires unique input IDs. + let temp_id = self.temp_field_id_counter; + self.temp_field_id_counter += 1; + Ok(temp_id) + } else { + get_field_id_from_metadata(field) + } } - fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result> { + fn convert_fields( + &mut self, + fields: &Fields, + field_results: &[Type], + ) -> Result> { let mut results = Vec::with_capacity(fields.len()); for i in 0..fields.len() { let field = &fields[i]; let field_type = &field_results[i]; - let id = get_field_id(field)?; + let id = self.get_field_id(field)?; let doc = get_field_doc(field); let nested_field = NestedField { id, @@ -287,13 +335,16 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { type U = Schema; fn schema(&mut self, schema: &ArrowSchema, values: Vec) -> Result { - let fields = Self::convert_fields(schema.fields(), &values)?; - let builder = Schema::builder().with_fields(fields); + let fields = self.convert_fields(schema.fields(), &values)?; + let mut builder = Schema::builder().with_fields(fields); + if let Some(start_from) = self.reassign_field_ids_from { + builder = builder.with_reassigned_field_ids(start_from) + } builder.build() } fn r#struct(&mut self, fields: &Fields, results: Vec) -> Result { - let fields = Self::convert_fields(fields, &results)?; + let fields = self.convert_fields(fields, &results)?; Ok(Type::Struct(StructType::new(fields))) } @@ -310,7 +361,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { } }; - let id = get_field_id(element_field)?; + let id = self.get_field_id(element_field)?; let doc = get_field_doc(element_field); let mut element_field = NestedField::list_element(id, value.clone(), !element_field.is_nullable()); @@ -335,7 +386,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { let key_field = &fields[0]; let value_field = &fields[1]; - let key_id = get_field_id(key_field)?; + let key_id = self.get_field_id(key_field)?; let key_doc = get_field_doc(key_field); let mut key_field = NestedField::map_key_element(key_id, key_value.clone()); if let Some(doc) = key_doc { @@ -343,7 +394,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { } let key_field = Arc::new(key_field); - let value_id = get_field_id(value_field)?; + let value_id = self.get_field_id(value_field)?; let value_doc = get_field_doc(value_field); let mut value_field = NestedField::map_value_element( value_id, @@ -1932,4 +1983,159 @@ mod tests { assert_eq!(array.value(0), [66u8; 16]); } } + + #[test] + fn test_arrow_schema_to_schema_with_field_id() { + // Create a complex Arrow schema without field ID metadata + // Including: primitives, list, nested struct, map, and nested list of structs + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("price", DataType::Decimal128(10, 2), false), + Field::new( + "created_at", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + Field::new( + "tags", + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + ), + Field::new( + "address", + DataType::Struct(Fields::from(vec![ + Field::new("street", DataType::Utf8, true), + Field::new("city", DataType::Utf8, false), + Field::new("zip", DataType::Int32, true), + ])), + true, + ), + Field::new( + "attributes", + DataType::Map( + Arc::new(Field::new( + DEFAULT_MAP_FIELD_NAME, + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, true), + ])), + false, + )), + false, + ), + true, + ), + Field::new( + "orders", + DataType::List(Arc::new(Field::new( + "element", + DataType::Struct(Fields::from(vec![ + Field::new("order_id", DataType::Int64, false), + Field::new("amount", DataType::Float64, false), + ])), + true, + ))), + true, + ), + ]); + + let schema = arrow_schema_to_schema_auto_assign_ids(&arrow_schema).unwrap(); + + // Build expected schema with exact field IDs following level-order assignment: + // Level 0: id=1, name=2, price=3, created_at=4, tags=5, address=6, attributes=7, orders=8 + // Level 1: tags.element=9, address.{street=10,city=11,zip=12}, attributes.{key=13,value=14}, orders.element=15 + // Level 2: orders.element.{order_id=16,amount=17} + let expected = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required( + 3, + "price", + Type::Primitive(PrimitiveType::Decimal { + precision: 10, + scale: 2, + }), + ) + .into(), + NestedField::optional(4, "created_at", Type::Primitive(PrimitiveType::Timestamptz)) + .into(), + NestedField::optional( + 5, + "tags", + Type::List(ListType { + element_field: NestedField::list_element( + 9, + Type::Primitive(PrimitiveType::String), + false, + ) + .into(), + }), + ) + .into(), + NestedField::optional( + 6, + "address", + Type::Struct(StructType::new(vec![ + NestedField::optional(10, "street", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(11, "city", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::optional(12, "zip", Type::Primitive(PrimitiveType::Int)) + .into(), + ])), + ) + .into(), + NestedField::optional( + 7, + "attributes", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 13, + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::map_value_element( + 14, + Type::Primitive(PrimitiveType::String), + false, + ) + .into(), + }), + ) + .into(), + NestedField::optional( + 8, + "orders", + Type::List(ListType { + element_field: NestedField::list_element( + 15, + Type::Struct(StructType::new(vec![ + NestedField::required( + 16, + "order_id", + Type::Primitive(PrimitiveType::Long), + ) + .into(), + NestedField::required( + 17, + "amount", + Type::Primitive(PrimitiveType::Double), + ) + .into(), + ])), + false, + ) + .into(), + }), + ) + .into(), + ]) + .build() + .unwrap(); + + pretty_assertions::assert_eq!(schema, expected); + assert_eq!(schema.highest_field_id(), 17); + } } diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 190aba08e8..30b47d83fc 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -27,7 +27,7 @@ use arrow_buffer::NullBuffer; use arrow_schema::{DataType, FieldRef}; use uuid::Uuid; -use super::get_field_id; +use super::get_field_id_from_metadata; use crate::spec::{ ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType, SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner, @@ -450,7 +450,7 @@ impl FieldMatchMode { /// Determines if an Arrow field matches an Iceberg field based on the matching mode. pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool { match self { - FieldMatchMode::Id => get_field_id(arrow_field) + FieldMatchMode::Id => get_field_id_from_metadata(arrow_field) .map(|id| id == iceberg_field.id) .unwrap_or(false), FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name, diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 44b35e5a6b..a2b540f08b 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -49,6 +49,7 @@ pub use snapshot_summary::*; pub use sort::*; pub use statistic_file::*; pub use table_metadata::*; +pub(crate) use table_metadata_builder::FIRST_FIELD_ID; pub use table_properties::*; pub use transform::*; pub use values::*; diff --git a/crates/iceberg/src/spec/schema/mod.rs b/crates/iceberg/src/spec/schema/mod.rs index 7080b6e700..da51cd7080 100644 --- a/crates/iceberg/src/spec/schema/mod.rs +++ b/crates/iceberg/src/spec/schema/mod.rs @@ -102,8 +102,8 @@ impl SchemaBuilder { /// Reassignment starts from the field-id specified in `start_from` (inclusive). /// /// All specified aliases and identifier fields will be updated to the new field-ids. - pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self { - self.reassign_field_ids_from = Some(start_from.try_into().unwrap_or(i32::MAX)); + pub(crate) fn with_reassigned_field_ids(mut self, start_from: i32) -> Self { + self.reassign_field_ids_from = Some(start_from); self } @@ -127,6 +127,22 @@ impl SchemaBuilder { /// Builds the schema. pub fn build(self) -> Result { + // If field IDs need to be reassigned, do it first before validation + if let Some(start_from) = self.reassign_field_ids_from { + let mut id_reassigner = ReassignFieldIds::new(start_from); + let new_fields = id_reassigner.reassign_field_ids(self.fields)?; + let new_identifier_field_ids = + id_reassigner.apply_to_identifier_fields(self.identifier_field_ids)?; + let new_alias_to_id = id_reassigner.apply_to_aliases(self.alias_to_id)?; + + return Schema::builder() + .with_schema_id(self.schema_id) + .with_fields(new_fields) + .with_identifier_field_ids(new_identifier_field_ids) + .with_alias(new_alias_to_id) + .build(); + } + let field_id_to_accessor = self.build_accessors(); let r#struct = StructType::new(self.fields); @@ -151,7 +167,7 @@ impl SchemaBuilder { let highest_field_id = id_to_field.keys().max().cloned().unwrap_or(0); - let mut schema = Schema { + Ok(Schema { r#struct, schema_id: self.schema_id, highest_field_id, @@ -164,24 +180,7 @@ impl SchemaBuilder { id_to_name, field_id_to_accessor, - }; - - if let Some(start_from) = self.reassign_field_ids_from { - let mut id_reassigner = ReassignFieldIds::new(start_from); - let new_fields = id_reassigner.reassign_field_ids(schema.r#struct.fields().to_vec())?; - let new_identifier_field_ids = - id_reassigner.apply_to_identifier_fields(schema.identifier_field_ids)?; - let new_alias_to_id = id_reassigner.apply_to_aliases(schema.alias_to_id.clone())?; - - schema = Schema::builder() - .with_schema_id(schema.schema_id) - .with_fields(new_fields) - .with_identifier_field_ids(new_identifier_field_ids) - .with_alias(new_alias_to_id) - .build()?; - } - - Ok(schema) + }) } fn build_accessors(&self) -> HashMap> { diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index eee4fec345..3db327d48a 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -31,7 +31,7 @@ use crate::error::{Error, ErrorKind, Result}; use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE}; use crate::{TableCreation, TableUpdate}; -const FIRST_FIELD_ID: u32 = 1; +pub(crate) const FIRST_FIELD_ID: i32 = 1; /// Manipulating table metadata. ///