From 76e0f08af3ec60e8fcecd46c64b81e0d0c34aa0e Mon Sep 17 00:00:00 2001 From: Amit Panda Date: Thu, 23 Apr 2026 20:32:46 -0700 Subject: [PATCH 1/4] feat: normalize write schema casing to table casing at catalog layer Writers (DaliSpark, Spark SQL, Trino DML, Java Iceberg API) may submit column names with different casing than what the table stores (e.g. "id" vs "ID"). Because validateWriteSchema is case-sensitive, these writes were rejected with InvalidSchemaEvolutionException even when the field IDs matched, making case-insensitive writes impossible without changing the table's existing column names. Fix: in doUpdateSchemaIfNeeded, normalize the write schema's top-level column names to match the table schema's casing (matched by Iceberg field ID, not by name) before any comparison or storage. The table's existing casing is never mutated, and writers do not need to know or match the exact casing stored in the catalog. Tables where two or more top-level columns share the same case-folded name (e.g. "id" and "ID") are excluded from normalization because the target column would be ambiguous, so writes to such tables must still use exact casing. Testing: - BaseIcebergSchemaValidatorTest: 11 unit tests covering hasCaseDuplicateFields and normalizeSchemaCasingToTable in isolation, including new-column passthrough and field attribute preservation. - RepositoryTest (H2 Spring integration): 2 new tests exercising the full save() path through the Spring context: - testCaseInsensitiveWrite_succeeds_andPreservesTableCasing: creates a table with "ID", saves an update with "id", asserts no exception and that the stored schema still shows "ID". - testCaseInsensitiveWrite_blockedForCaseDuplicateTable: creates a table with case-duplicate columns, asserts that a mismatched-casing save still throws InvalidSchemaEvolutionException. 
Co-Authored-By: Claude Sonnet 4.6 --- .../impl/BaseIcebergSchemaValidator.java | 58 ++++++ .../impl/OpenHouseInternalRepositoryImpl.java | 9 + .../tables/e2e/h2/RepositoryTest.java | 104 +++++++++++ .../impl/BaseIcebergSchemaValidatorTest.java | 174 ++++++++++++++++++ 4 files changed, 345 insertions(+) create mode 100644 services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java index 55931a7ac..989659fde 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java @@ -2,6 +2,10 @@ import com.linkedin.openhouse.common.exception.InvalidSchemaEvolutionException; import com.linkedin.openhouse.tables.repository.SchemaValidator; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.MapDifference; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -13,9 +17,63 @@ * A base implementation which delegates the validation to iceberg library that is configured to: - * not allow to write optional values to a required field. - allow input schema to have different * ordering than table. + * + *

Also provides static helpers used by {@link OpenHouseInternalRepositoryImpl} to normalize a + * write schema's top-level column names to the casing already present in the table schema, enabling + * case-insensitive writes without mutating the table's existing column casing. */ @Component public class BaseIcebergSchemaValidator implements SchemaValidator { + + /** + * Returns true if {@code schema} has two or more top-level columns whose names differ only in + * case (e.g. "id" and "ID"). Such tables are excluded from case-insensitive write normalization + * because the target column would be ambiguous. + */ + static boolean hasCaseDuplicateFields(Schema schema) { + Map seen = new HashMap<>(); + for (Types.NestedField field : schema.columns()) { + String lower = field.name().toLowerCase(); + if (seen.containsKey(lower) && !seen.get(lower).equals(field.fieldId())) { + return true; + } + seen.put(lower, field.fieldId()); + } + return false; + } + + /** + * Renames top-level fields in {@code writeSchema} to use the casing from {@code tableSchema}, + * matched by Iceberg field ID (not by name). Fields in {@code writeSchema} whose ID does not + * appear in {@code tableSchema} (genuinely new columns) are left unchanged. + * + *

This preserves the table's existing column casing when a writer submits columns with + * different casing (e.g. writer sends "id", table has "ID" → normalized to "ID"). + */ + static Schema normalizeSchemaCasingToTable(Schema writeSchema, Schema tableSchema) { + // Build fieldId → table column name map for O(1) lookup + Map tableNameById = new HashMap<>(); + for (Types.NestedField field : tableSchema.columns()) { + tableNameById.put(field.fieldId(), field.name()); + } + + List normalizedFields = new ArrayList<>(); + for (Types.NestedField field : writeSchema.columns()) { + String tableName = tableNameById.get(field.fieldId()); + if (tableName != null && !tableName.equals(field.name())) { + // Existing column with different casing: rename to table casing, keep all other attributes + normalizedFields.add( + field.isOptional() + ? Types.NestedField.optional(field.fieldId(), tableName, field.type(), field.doc()) + : Types.NestedField.required( + field.fieldId(), tableName, field.type(), field.doc())); + } else { + normalizedFields.add(field); + } + } + return new Schema(writeSchema.schemaId(), normalizedFields, writeSchema.identifierFieldIds()); + } + @Override public void validateWriteSchema(Schema oldSchema, Schema newSchema, String tableUri) throws InvalidSchemaEvolutionException, IllegalArgumentException { diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java index ff67eff19..7818bf579 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java @@ -461,6 +461,15 @@ private boolean doUpdateSchemaIfNeeded( Schema tableSchema, TableDto tableDto, UpdateProperties updateProperties) { + // Normalize 
top-level column names in writeSchema to use the casing already present in + // tableSchema (matched by Iceberg field ID). This enables case-insensitive writes: a writer + // that submits "id" for a table column named "ID" will have its schema normalized to "ID" + // before any comparison or storage, so the table's existing casing is never changed. + // Tables where two columns share a case-folded name are excluded (ambiguous target column). + if (!BaseIcebergSchemaValidator.hasCaseDuplicateFields(tableSchema)) { + writeSchema = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + } if (!writeSchema.sameSchema(tableSchema)) { try { schemaValidator.validateWriteSchema(tableSchema, writeSchema, tableDto.getTableUri()); diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java index a4ee20adc..a30b0f437 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java @@ -870,6 +870,110 @@ public void testDefaultFileFormatWithFeatureToggle() { Assertions.assertFalse(openHouseInternalRepository.existsById(key3)); } + // ===== Case-insensitive write normalization ===== + + @Test + void testCaseInsensitiveWrite_succeeds_andPreservesTableCasing() { + // Table is created with uppercase column name "ID". + // A subsequent save with lowercase "id" (same field id) should succeed and leave + // the stored schema using the original "ID" casing. 
+ Schema tableSchema = + new Schema( + required(1, "ID", Types.StringType.get()), optional(2, "value", Types.LongType.get())); + + TableDto createDto = + TABLE_DTO + .toBuilder() + .tableId("case_insensitive_write_test") + .schema(SchemaParser.toJson(tableSchema, false)) + .timePartitioning(null) + .clustering(null) + .tableVersion(INITIAL_TABLE_VERSION) + .build(); + + TableDto savedDto = openHouseInternalRepository.save(createDto); + + // Writer submits schema with lowercase "id" (same field id=1) — simulates a case-insensitive + // Spark/Trino write that sends column names in a different casing than the table. + Schema writeSchema = + new Schema( + required(1, "id", Types.StringType.get()), optional(2, "value", Types.LongType.get())); + + TableDto updateDto = + savedDto + .toBuilder() + .schema(SchemaParser.toJson(writeSchema, false)) + .tableVersion(savedDto.getTableLocation()) + .build(); + + // Save should succeed without throwing InvalidSchemaEvolutionException + TableDto updatedDto = + Assertions.assertDoesNotThrow( + () -> openHouseInternalRepository.save(updateDto), + "save() with differently-cased column names should succeed"); + + // The stored schema must preserve the original table casing ("ID", not "id") + Schema storedSchema = SchemaParser.fromJson(updatedDto.getSchema()); + Assertions.assertEquals( + "ID", + storedSchema.findField(1).name(), + "Table casing must be preserved after a case-insensitive write"); + + TableDtoPrimaryKey primaryKey = + TableDtoPrimaryKey.builder() + .tableId("case_insensitive_write_test") + .databaseId(TABLE_DTO.getDatabaseId()) + .build(); + openHouseInternalRepository.deleteById(primaryKey); + Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey)); + } + + @Test + void testCaseInsensitiveWrite_blockedForCaseDuplicateTable() { + // A table with case-duplicate columns (both "id" and "ID") must NOT apply normalization. + // A write with mismatched casing on such a table should still throw. 
+ Schema tableSchema = + new Schema( + required(1, "id", Types.StringType.get()), optional(2, "ID", Types.StringType.get())); + + TableDto createDto = + TABLE_DTO + .toBuilder() + .tableId("case_duplicate_write_test") + .schema(SchemaParser.toJson(tableSchema, false)) + .timePartitioning(null) + .clustering(null) + .tableVersion(INITIAL_TABLE_VERSION) + .build(); + + TableDto savedDto = openHouseInternalRepository.save(createDto); + + // Writer sends "Id" for field id=1 (table has "id") — casing mismatch on a case-dup table + Schema writeSchema = + new Schema( + required(1, "Id", Types.StringType.get()), optional(2, "ID", Types.StringType.get())); + + TableDto updateDto = + savedDto + .toBuilder() + .schema(SchemaParser.toJson(writeSchema, false)) + .tableVersion(savedDto.getTableLocation()) + .build(); + + Assertions.assertThrows( + InvalidSchemaEvolutionException.class, + () -> openHouseInternalRepository.save(updateDto), + "save() with mismatched casing on a case-duplicate table must throw"); + + TableDtoPrimaryKey primaryKey = + TableDtoPrimaryKey.builder() + .tableId("case_duplicate_write_test") + .databaseId(TABLE_DTO.getDatabaseId()) + .build(); + openHouseInternalRepository.deleteById(primaryKey); + Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey)); + } + private TableDtoPrimaryKey getPrimaryKey(TableDto tableDto) { return TableDtoPrimaryKey.builder() .databaseId(tableDto.getDatabaseId()) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java new file mode 100644 index 000000000..75859f05c --- /dev/null +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java @@ -0,0 +1,174 @@ +package com.linkedin.openhouse.tables.repository.impl; + +import static org.junit.jupiter.api.Assertions.*; + 
+import com.linkedin.openhouse.common.exception.InvalidSchemaEvolutionException; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class BaseIcebergSchemaValidatorTest { + + private static final BaseIcebergSchemaValidator VALIDATOR = new BaseIcebergSchemaValidator(); + + // ===== hasCaseDuplicateFields ===== + + @Test + void hasCaseDuplicateFields_returnsFalse_whenAllNamesUnique() { + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); + assertFalse(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); + } + + @Test + void hasCaseDuplicateFields_returnsTrue_whenTwoColumnsDifferOnlyInCase() { + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "ID", Types.StringType.get())); + assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); + } + + @Test + void hasCaseDuplicateFields_returnsTrue_forMixedCaseDuplicate() { + Schema schema = + new Schema( + Types.NestedField.required(1, "datePartition", Types.StringType.get()), + Types.NestedField.optional(2, "datepartition", Types.StringType.get()), + Types.NestedField.optional(3, "value", Types.LongType.get())); + assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); + } + + @Test + void hasCaseDuplicateFields_returnsFalse_whenSingleColumn() { + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); + assertFalse(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); + } + + // ===== normalizeSchemaCasingToTable ===== + + @Test + void normalizeSchemaCasingToTable_noChange_whenCasingAlreadyMatches() { + Schema tableSchema = + new Schema( + Types.NestedField.required(1, "ID", Types.StringType.get()), + Types.NestedField.optional(2, "Name", Types.StringType.get())); + Schema writeSchema = + new Schema( + 
Types.NestedField.required(1, "ID", Types.StringType.get()), + Types.NestedField.optional(2, "Name", Types.StringType.get())); + + Schema normalized = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + + assertEquals("ID", normalized.findField(1).name()); + assertEquals("Name", normalized.findField(2).name()); + } + + @Test + void normalizeSchemaCasingToTable_renamesColumn_toMatchTableCasing() { + // Table has "ID" (uppercase), writer submits "id" (lowercase) + Schema tableSchema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get())); + Schema writeSchema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); + + Schema normalized = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + + assertEquals("ID", normalized.findField(1).name()); + } + + @Test + void normalizeSchemaCasingToTable_preservesNewColumns_unchanged() { + // Table has id=1, writer adds a new column id=2 with different-cased name + Schema tableSchema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get())); + Schema writeSchema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "newCol", Types.LongType.get())); + + Schema normalized = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + + // Existing column normalized to table casing + assertEquals("ID", normalized.findField(1).name()); + // New column left unchanged + assertEquals("newCol", normalized.findField(2).name()); + } + + @Test + void normalizeSchemaCasingToTable_preservesOptionalRequired() { + Schema tableSchema = + new Schema( + Types.NestedField.required(1, "ID", Types.StringType.get()), + Types.NestedField.optional(2, "Name", Types.StringType.get())); + Schema writeSchema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); + + 
Schema normalized = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + + assertFalse(normalized.findField(1).isOptional(), "id=1 should remain required"); + assertTrue(normalized.findField(2).isOptional(), "id=2 should remain optional"); + } + + @Test + void normalizeSchemaCasingToTable_preservesFieldDoc() { + Schema tableSchema = + new Schema(Types.NestedField.optional(1, "ID", Types.StringType.get(), "the identifier")); + Schema writeSchema = + new Schema(Types.NestedField.optional(1, "id", Types.StringType.get(), "the identifier")); + + Schema normalized = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + + assertEquals("the identifier", normalized.findField(1).doc()); + } + + // ===== Integration: validateWriteSchema passes after normalization ===== + + @Test + void validateWriteSchema_passes_afterCasingNormalization() { + // Table has "ID" (uppercase); writer submits "id" (lowercase) — same field ID + Schema tableSchema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get())); + Schema writeSchema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); + + assertFalse( + BaseIcebergSchemaValidator.hasCaseDuplicateFields(tableSchema), + "table has no case duplicates, normalization should apply"); + + Schema normalized = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + + // After normalization, validation should pass without exception + assertDoesNotThrow( + () -> VALIDATOR.validateWriteSchema(tableSchema, normalized, "db.table"), + "validateWriteSchema should succeed after casing normalization"); + } + + @Test + void validateWriteSchema_fails_forCaseDuplicateTable_withMismatchedCasing() { + // Table has case-duplicate columns — normalization is skipped for such tables. + // Validator should still reject a write with mismatched casing on these tables. 
+ Schema tableSchema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "ID", Types.StringType.get())); + Schema writeSchema = + new Schema( + Types.NestedField.required(1, "Id", Types.StringType.get()), + Types.NestedField.optional(2, "ID", Types.StringType.get())); + + assertTrue( + BaseIcebergSchemaValidator.hasCaseDuplicateFields(tableSchema), + "table should be detected as having case-duplicate columns"); + + // Since normalization is skipped, validateWriteSchema sees "Id" vs expected "id" → failure + assertThrows( + InvalidSchemaEvolutionException.class, + () -> VALIDATOR.validateWriteSchema(tableSchema, writeSchema, "db.table")); + } +} From 609aaf015a6ce0f9be9ad453ed5b590931f5c63d Mon Sep 17 00:00:00 2001 From: Amit Panda Date: Wed, 29 Apr 2026 17:56:37 -0700 Subject: [PATCH 2/4] refactor: extend casing normalization to nested fields via TypeUtil.visit Replace the top-level-only loop implementations of hasCaseDuplicateFields and normalizeSchemaCasingToTable with TypeUtil.SchemaVisitor-based implementations that recurse through structs, list elements, and map key/value types at any depth. Key changes: - hasCaseDuplicateFields: uses TypeUtil.visit to detect case-duplicate sibling fields at any nesting level; correctly treats same-named fields in different structs as non-duplicates; adds Locale.ROOT to toLowerCase - normalizeSchemaCasingToTable: uses TypeUtil.indexById to build a single flat Map for the entire table schema in one O(n) walk, then TypeUtil.visit to rewrite field names at any depth; uses reference-equality short-circuit to avoid allocation for unchanged fields - Both methods use Iceberg's visitor contract so new Type variants cause a compile error rather than a silent passthrough Add tests covering nested struct, list element, and map value normalization as well as the hasCaseDuplicateFields depth cases. 
Co-Authored-By: Claude Sonnet 4.6 --- .../impl/BaseIcebergSchemaValidator.java | 168 +++++++++++++----- .../impl/BaseIcebergSchemaValidatorTest.java | 162 +++++++++++++++++ 2 files changed, 290 insertions(+), 40 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java index 989659fde..7af2cbe59 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java @@ -3,12 +3,15 @@ import com.linkedin.openhouse.common.exception.InvalidSchemaEvolutionException; import com.linkedin.openhouse.tables.repository.SchemaValidator; import java.util.ArrayList; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Set; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.MapDifference; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.springframework.stereotype.Component; @@ -19,59 +22,144 @@ * ordering than table. * *

Also provides static helpers used by {@link OpenHouseInternalRepositoryImpl} to normalize a - * write schema's top-level column names to the casing already present in the table schema, enabling - * case-insensitive writes without mutating the table's existing column casing. + * write schema's column names — at top level and at any nesting depth — to the casing already + * present in the table schema, enabling case-insensitive writes without mutating the table's + * existing column casing. */ @Component public class BaseIcebergSchemaValidator implements SchemaValidator { /** - * Returns true if {@code schema} has two or more top-level columns whose names differ only in - * case (e.g. "id" and "ID"). Such tables are excluded from case-insensitive write normalization - * because the target column would be ambiguous. + * Returns true if {@code schema} has, at any nesting depth, two sibling fields whose names differ + * only in case (e.g. {@code "id"} and {@code "ID"} inside the same struct). Such schemas are + * excluded from case-insensitive write normalization because the target field would be ambiguous. + * + *

Note: fields with the same name in different structs (e.g. {@code user.id} and + * {@code session.id}) are not considered duplicates — Iceberg's field-ID semantics treat them as + * independent fields. */ static boolean hasCaseDuplicateFields(Schema schema) { - Map seen = new HashMap<>(); - for (Types.NestedField field : schema.columns()) { - String lower = field.name().toLowerCase(); - if (seen.containsKey(lower) && !seen.get(lower).equals(field.fieldId())) { - return true; - } - seen.put(lower, field.fieldId()); - } - return false; + return Boolean.TRUE.equals( + TypeUtil.visit( + schema, + new TypeUtil.SchemaVisitor() { + @Override + public Boolean schema(Schema s, Boolean structResult) { + return structResult; + } + + @Override + public Boolean struct(Types.StructType struct, List childResults) { + if (childResults.stream().anyMatch(Boolean.TRUE::equals)) return true; + Set seen = new HashSet<>(); + for (Types.NestedField f : struct.fields()) { + if (!seen.add(f.name().toLowerCase(Locale.ROOT))) return true; + } + return false; + } + + @Override + public Boolean field(Types.NestedField f, Boolean fieldResult) { + return fieldResult; + } + + @Override + public Boolean list(Types.ListType l, Boolean elementResult) { + return elementResult; + } + + @Override + public Boolean map(Types.MapType m, Boolean kRes, Boolean vRes) { + return Boolean.TRUE.equals(kRes) || Boolean.TRUE.equals(vRes); + } + + @Override + public Boolean primitive(Type.PrimitiveType p) { + return false; + } + })); } /** - * Renames top-level fields in {@code writeSchema} to use the casing from {@code tableSchema}, - * matched by Iceberg field ID (not by name). Fields in {@code writeSchema} whose ID does not - * appear in {@code tableSchema} (genuinely new columns) are left unchanged. + * Rewrites every field name in {@code writeSchema} — at top level and inside nested + * structs, list elements, and map keys/values — to use the casing from {@code tableSchema}, + * matched by Iceberg field ID. 
Fields whose ID is absent from {@code tableSchema} (genuinely new + * columns) are left unchanged. All other field attributes (type, doc, optional/required, + * identifier-field-ids) are preserved. * - *

This preserves the table's existing column casing when a writer submits columns with - * different casing (e.g. writer sends "id", table has "ID" → normalized to "ID"). + *

Uses {@code TypeUtil.indexById} to build a single flat {@code Map} for + * the entire table schema in one O(n) walk, giving O(1) name lookup at any depth. Uses {@code + * TypeUtil.visit} to recurse through the write schema — covering struct, list, and map container + * types exhaustively via Iceberg's visitor contract. */ static Schema normalizeSchemaCasingToTable(Schema writeSchema, Schema tableSchema) { - // Build fieldId → table column name map for O(1) lookup - Map tableNameById = new HashMap<>(); - for (Types.NestedField field : tableSchema.columns()) { - tableNameById.put(field.fieldId(), field.name()); - } + // One O(n) walk over the full table schema tree; lookup at any depth is then O(1). + final Map tableById = TypeUtil.indexById(tableSchema.asStruct()); - List normalizedFields = new ArrayList<>(); - for (Types.NestedField field : writeSchema.columns()) { - String tableName = tableNameById.get(field.fieldId()); - if (tableName != null && !tableName.equals(field.name())) { - // Existing column with different casing: rename to table casing, keep all other attributes - normalizedFields.add( - field.isOptional() - ? 
Types.NestedField.optional(field.fieldId(), tableName, field.type(), field.doc()) - : Types.NestedField.required( - field.fieldId(), tableName, field.type(), field.doc())); - } else { - normalizedFields.add(field); - } - } - return new Schema(writeSchema.schemaId(), normalizedFields, writeSchema.identifierFieldIds()); + Type rewritten = + TypeUtil.visit( + writeSchema, + new TypeUtil.SchemaVisitor() { + @Override + public Type schema(Schema s, Type structResult) { + return structResult; + } + + @Override + public Type struct(Types.StructType struct, List fieldTypes) { + List originals = struct.fields(); + List rebuilt = new ArrayList<>(originals.size()); + for (int i = 0; i < originals.size(); i++) { + Types.NestedField original = originals.get(i); + Types.NestedField tableField = tableById.get(original.fieldId()); + String name = (tableField != null) ? tableField.name() : original.name(); + Type type = fieldTypes.get(i); + // Reuse the original if nothing changed (cheap reference-equality short-circuit). + if (name.equals(original.name()) && type == original.type()) { + rebuilt.add(original); + } else { + rebuilt.add( + original.isOptional() + ? Types.NestedField.optional( + original.fieldId(), name, type, original.doc()) + : Types.NestedField.required( + original.fieldId(), name, type, original.doc())); + } + } + return Types.StructType.of(rebuilt); + } + + @Override + public Type field(Types.NestedField f, Type fieldResult) { + return fieldResult; + } + + @Override + public Type list(Types.ListType list, Type elementResult) { + Types.NestedField elem = list.fields().get(0); + return elem.isOptional() + ? Types.ListType.ofOptional(elem.fieldId(), elementResult) + : Types.ListType.ofRequired(elem.fieldId(), elementResult); + } + + @Override + public Type map(Types.MapType map, Type keyResult, Type valueResult) { + Types.NestedField k = map.fields().get(0); + Types.NestedField v = map.fields().get(1); + return v.isOptional() + ? 
Types.MapType.ofOptional(k.fieldId(), v.fieldId(), keyResult, valueResult) + : Types.MapType.ofRequired(k.fieldId(), v.fieldId(), keyResult, valueResult); + } + + @Override + public Type primitive(Type.PrimitiveType p) { + return p; + } + }); + + Types.StructType normalizedStruct = (Types.StructType) rewritten; + return new Schema( + writeSchema.schemaId(), normalizedStruct.fields(), writeSchema.identifierFieldIds()); } @Override diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java index 75859f05c..8db6c4cdc 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java @@ -47,6 +47,70 @@ void hasCaseDuplicateFields_returnsFalse_whenSingleColumn() { assertFalse(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); } + @Test + void hasCaseDuplicateFields_returnsTrue_forCaseDuplicateInsideNestedStruct() { + // event: struct — siblings inside the nested struct are duplicates + Schema schema = + new Schema( + Types.NestedField.optional( + 1, + "event", + Types.StructType.of( + Types.NestedField.required(2, "ID", Types.StringType.get()), + Types.NestedField.optional(3, "id", Types.StringType.get())))); + assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); + } + + @Test + void hasCaseDuplicateFields_returnsFalse_forSameNameInDifferentStructs() { + // user.id and session.id are in different structs — not siblings, not duplicates + Schema schema = + new Schema( + Types.NestedField.optional( + 1, + "user", + Types.StructType.of(Types.NestedField.required(2, "id", Types.StringType.get()))), + Types.NestedField.optional( + 3, + "session", + Types.StructType.of(Types.NestedField.required(4, 
"id", Types.StringType.get())))); + assertFalse(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); + } + + @Test + void hasCaseDuplicateFields_returnsTrue_forCaseDuplicateInsideListElement() { + // events: list> — duplicate inside the list element struct + Schema schema = + new Schema( + Types.NestedField.optional( + 1, + "events", + Types.ListType.ofOptional( + 2, + Types.StructType.of( + Types.NestedField.required(3, "ID", Types.StringType.get()), + Types.NestedField.optional(4, "id", Types.StringType.get()))))); + assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); + } + + @Test + void hasCaseDuplicateFields_returnsTrue_forCaseDuplicateInsideMapValue() { + // metadata: map> — duplicate in map value struct + Schema schema = + new Schema( + Types.NestedField.optional( + 1, + "metadata", + Types.MapType.ofOptional( + 2, + 3, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.required(4, "ID", Types.StringType.get()), + Types.NestedField.optional(5, "id", Types.StringType.get()))))); + assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); + } + // ===== normalizeSchemaCasingToTable ===== @Test @@ -128,6 +192,104 @@ void normalizeSchemaCasingToTable_preservesFieldDoc() { assertEquals("the identifier", normalized.findField(1).doc()); } + @Test + void normalizeSchemaCasingToTable_renamesFieldsInsideNestedStruct() { + // Table: event: struct + // Writer: event: struct (wrong casing at nested level) + Schema tableSchema = + new Schema( + Types.NestedField.optional( + 1, + "event", + Types.StructType.of( + Types.NestedField.required(2, "EventId", Types.StringType.get()), + Types.NestedField.optional(3, "Value", Types.LongType.get())))); + Schema writeSchema = + new Schema( + Types.NestedField.optional( + 1, + "event", + Types.StructType.of( + Types.NestedField.required(2, "eventid", Types.StringType.get()), + Types.NestedField.optional(3, "value", Types.LongType.get())))); + + Schema normalized = + 
BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + + Types.StructType nestedStruct = normalized.findField(1).type().asStructType(); + assertEquals("EventId", nestedStruct.field(2).name()); + assertEquals("Value", nestedStruct.field(3).name()); + } + + @Test + void normalizeSchemaCasingToTable_renamesFieldsInsideListElement() { + // Table: events: list> + // Writer: events: list> + Schema tableSchema = + new Schema( + Types.NestedField.optional( + 1, + "events", + Types.ListType.ofOptional( + 2, + Types.StructType.of( + Types.NestedField.required(3, "EventId", Types.StringType.get()))))); + Schema writeSchema = + new Schema( + Types.NestedField.optional( + 1, + "events", + Types.ListType.ofOptional( + 2, + Types.StructType.of( + Types.NestedField.required(3, "eventid", Types.StringType.get()))))); + + Schema normalized = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + + Types.ListType listType = (Types.ListType) normalized.findField(1).type(); + Types.StructType elementStruct = listType.elementType().asStructType(); + assertEquals("EventId", elementStruct.field(3).name()); + } + + @Test + void normalizeSchemaCasingToTable_renamesFieldsInsideMapValue() { + // Table: metadata: map> + // Writer: metadata: map> + Schema tableSchema = + new Schema( + Types.NestedField.optional( + 1, + "metadata", + Types.MapType.ofOptional( + 2, + 3, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.required(4, "Key", Types.StringType.get()), + Types.NestedField.optional(5, "Val", Types.LongType.get()))))); + Schema writeSchema = + new Schema( + Types.NestedField.optional( + 1, + "metadata", + Types.MapType.ofOptional( + 2, + 3, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.required(4, "key", Types.StringType.get()), + Types.NestedField.optional(5, "val", Types.LongType.get()))))); + + Schema normalized = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, 
tableSchema); + + Types.MapType mapType = (Types.MapType) normalized.findField(1).type(); + Types.StructType valueStruct = mapType.valueType().asStructType(); + assertEquals("Key", valueStruct.field(4).name()); + assertEquals("Val", valueStruct.field(5).name()); + } + // ===== Integration: validateWriteSchema passes after normalization ===== @Test From 197983c0c96b621fe9100a99dab67a40238ee81e Mon Sep 17 00:00:00 2001 From: Amit Panda Date: Wed, 29 Apr 2026 18:44:12 -0700 Subject: [PATCH 3/4] refactor: promote SchemaValidationUtil upstream from li-openhouse Introduce SchemaValidationUtil in the open-source repo so both open-source BaseIcebergSchemaValidator and li-openhouse share one duplicate-detection implementation instead of maintaining divergent subsets. Changes: - New SchemaValidationUtil with findDuplicateCaseInsensitiveColumnNames (returns conflict paths) and hasDuplicateCaseInsensitiveColumnNames (boolean), covering struct fields at any depth, list element types, and map key/value types; uses Locale.ROOT to fix the Turkish-i locale bug - Remove hasCaseDuplicateFields from BaseIcebergSchemaValidator; the normalization guard in OpenHouseInternalRepositoryImpl now calls SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames directly - Move hasCaseDuplicateFields tests to SchemaValidationUtilTest; update the two integration-test precondition assertions in BaseIcebergSchemaValidatorTest to use the new utility The two callers retain opposite semantics from the same predicate: li-openhouse rejects writes when duplicates exist; the normalization guard skips normalization (writes may still succeed for exact-casing). 
Co-Authored-By: Claude Sonnet 4.6 --- .../impl/BaseIcebergSchemaValidator.java | 54 ------ .../impl/OpenHouseInternalRepositoryImpl.java | 2 +- .../repository/impl/SchemaValidationUtil.java | 95 ++++++++++ .../impl/BaseIcebergSchemaValidatorTest.java | 104 +---------- .../impl/SchemaValidationUtilTest.java | 168 ++++++++++++++++++ 5 files changed, 266 insertions(+), 157 deletions(-) create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtil.java create mode 100644 services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtilTest.java diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java index 7af2cbe59..f673d4f8b 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java @@ -3,11 +3,8 @@ import com.linkedin.openhouse.common.exception.InvalidSchemaEvolutionException; import com.linkedin.openhouse.tables.repository.SchemaValidator; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.Set; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.MapDifference; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -29,57 +26,6 @@ @Component public class BaseIcebergSchemaValidator implements SchemaValidator { - /** - * Returns true if {@code schema} has, at any nesting depth, two sibling fields whose names differ - * only in case (e.g. {@code "id"} and {@code "ID"} inside the same struct). 
Such schemas are - * excluded from case-insensitive write normalization because the target field would be ambiguous. - * - *

Note: fields with the same name in different structs (e.g. {@code user.id} and - * {@code session.id}) are not considered duplicates — Iceberg's field-ID semantics treat them as - * independent fields. - */ - static boolean hasCaseDuplicateFields(Schema schema) { - return Boolean.TRUE.equals( - TypeUtil.visit( - schema, - new TypeUtil.SchemaVisitor() { - @Override - public Boolean schema(Schema s, Boolean structResult) { - return structResult; - } - - @Override - public Boolean struct(Types.StructType struct, List childResults) { - if (childResults.stream().anyMatch(Boolean.TRUE::equals)) return true; - Set seen = new HashSet<>(); - for (Types.NestedField f : struct.fields()) { - if (!seen.add(f.name().toLowerCase(Locale.ROOT))) return true; - } - return false; - } - - @Override - public Boolean field(Types.NestedField f, Boolean fieldResult) { - return fieldResult; - } - - @Override - public Boolean list(Types.ListType l, Boolean elementResult) { - return elementResult; - } - - @Override - public Boolean map(Types.MapType m, Boolean kRes, Boolean vRes) { - return Boolean.TRUE.equals(kRes) || Boolean.TRUE.equals(vRes); - } - - @Override - public Boolean primitive(Type.PrimitiveType p) { - return false; - } - })); - } - /** * Rewrites every field name in {@code writeSchema} — at top level and inside nested * structs, list elements, and map keys/values — to use the casing from {@code tableSchema}, diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java index 7818bf579..ab545c2f5 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java @@ -466,7 +466,7 @@ private boolean doUpdateSchemaIfNeeded( // that 
submits "id" for a table column named "ID" will have its schema normalized to "ID" // before any comparison or storage, so the table's existing casing is never changed. // Tables where two columns share a case-folded name are excluded (ambiguous target column). - if (!BaseIcebergSchemaValidator.hasCaseDuplicateFields(tableSchema)) { + if (!SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(tableSchema)) { writeSchema = BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtil.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtil.java new file mode 100644 index 000000000..710c35c97 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtil.java @@ -0,0 +1,95 @@ +package com.linkedin.openhouse.tables.repository.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * Utility for validating Iceberg schemas with respect to case-insensitive column naming. + * + *

Promoted from li-openhouse's {@code SchemaValidationUtil} to the upstream open-source repo so + * that both sides share a single implementation. The two callers want opposite outcomes from the + * same predicate: + * + *

+ * + *

Coverage: struct fields at any nesting depth, list element types, and map key/value types. + * Fields with the same name in different structs (e.g. {@code user.id} and {@code + * session.id}) are correctly treated as independent — only siblings within the same struct + * are compared. + */ +public final class SchemaValidationUtil { + + private SchemaValidationUtil() {} + + /** + * Finds all sibling-field pairs at any nesting depth whose names are equal case-insensitively but + * not case-sensitively. + * + * @param schema the Iceberg schema to validate + * @return list of conflict descriptions, e.g. {@code "[userid, userId]"} or {@code "[col1.userid, + * col1.userId]"}; empty if no conflicts + */ + public static List findDuplicateCaseInsensitiveColumnNames(Schema schema) { + List conflicts = new ArrayList<>(); + checkFieldsForDuplicates(schema.columns(), "", conflicts); + return conflicts; + } + + /** + * Returns {@code true} if the schema has any case-insensitive duplicate field names at any + * nesting depth. + */ + public static boolean hasDuplicateCaseInsensitiveColumnNames(Schema schema) { + return !findDuplicateCaseInsensitiveColumnNames(schema).isEmpty(); + } + + /** + * Checks {@code fields} for sibling-level case-insensitive duplicates, records any conflicts, and + * recurses into each field's type. + */ + private static void checkFieldsForDuplicates( + List fields, String pathPrefix, List conflicts) { + Map seenLowerToName = new HashMap<>(); + for (Types.NestedField field : fields) { + String name = field.name(); + String lower = name.toLowerCase(Locale.ROOT); + if (seenLowerToName.containsKey(lower)) { + String first = seenLowerToName.get(lower); + if (!first.equals(name)) { + String qualifier = pathPrefix.isEmpty() ? 
"" : pathPrefix + "."; + conflicts.add(String.format("[%s%s, %s%s]", qualifier, first, qualifier, name)); + } + } else { + seenLowerToName.put(lower, name); + } + } + for (Types.NestedField field : fields) { + String childPath = pathPrefix.isEmpty() ? field.name() : pathPrefix + "." + field.name(); + checkTypeForDuplicates(field.type(), childPath, conflicts); + } + } + + /** Recurses into compound types (struct, list element, map key/value). Primitives are a no-op. */ + private static void checkTypeForDuplicates(Type type, String path, List conflicts) { + if (type instanceof Types.StructType) { + checkFieldsForDuplicates(((Types.StructType) type).fields(), path, conflicts); + } else if (type instanceof Types.ListType) { + checkTypeForDuplicates(((Types.ListType) type).elementType(), path, conflicts); + } else if (type instanceof Types.MapType) { + Types.MapType mapType = (Types.MapType) type; + checkTypeForDuplicates(mapType.keyType(), path, conflicts); + checkTypeForDuplicates(mapType.valueType(), path, conflicts); + } + // Primitive types have no nested fields; nothing to recurse into. 
+ } +} diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java index 8db6c4cdc..2848b0f8e 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java @@ -11,106 +11,6 @@ public class BaseIcebergSchemaValidatorTest { private static final BaseIcebergSchemaValidator VALIDATOR = new BaseIcebergSchemaValidator(); - // ===== hasCaseDuplicateFields ===== - - @Test - void hasCaseDuplicateFields_returnsFalse_whenAllNamesUnique() { - Schema schema = - new Schema( - Types.NestedField.required(1, "id", Types.StringType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get())); - assertFalse(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); - } - - @Test - void hasCaseDuplicateFields_returnsTrue_whenTwoColumnsDifferOnlyInCase() { - Schema schema = - new Schema( - Types.NestedField.required(1, "id", Types.StringType.get()), - Types.NestedField.optional(2, "ID", Types.StringType.get())); - assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); - } - - @Test - void hasCaseDuplicateFields_returnsTrue_forMixedCaseDuplicate() { - Schema schema = - new Schema( - Types.NestedField.required(1, "datePartition", Types.StringType.get()), - Types.NestedField.optional(2, "datepartition", Types.StringType.get()), - Types.NestedField.optional(3, "value", Types.LongType.get())); - assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); - } - - @Test - void hasCaseDuplicateFields_returnsFalse_whenSingleColumn() { - Schema schema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); - assertFalse(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); - } - 
- @Test - void hasCaseDuplicateFields_returnsTrue_forCaseDuplicateInsideNestedStruct() { - // event: struct — siblings inside the nested struct are duplicates - Schema schema = - new Schema( - Types.NestedField.optional( - 1, - "event", - Types.StructType.of( - Types.NestedField.required(2, "ID", Types.StringType.get()), - Types.NestedField.optional(3, "id", Types.StringType.get())))); - assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); - } - - @Test - void hasCaseDuplicateFields_returnsFalse_forSameNameInDifferentStructs() { - // user.id and session.id are in different structs — not siblings, not duplicates - Schema schema = - new Schema( - Types.NestedField.optional( - 1, - "user", - Types.StructType.of(Types.NestedField.required(2, "id", Types.StringType.get()))), - Types.NestedField.optional( - 3, - "session", - Types.StructType.of(Types.NestedField.required(4, "id", Types.StringType.get())))); - assertFalse(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); - } - - @Test - void hasCaseDuplicateFields_returnsTrue_forCaseDuplicateInsideListElement() { - // events: list> — duplicate inside the list element struct - Schema schema = - new Schema( - Types.NestedField.optional( - 1, - "events", - Types.ListType.ofOptional( - 2, - Types.StructType.of( - Types.NestedField.required(3, "ID", Types.StringType.get()), - Types.NestedField.optional(4, "id", Types.StringType.get()))))); - assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); - } - - @Test - void hasCaseDuplicateFields_returnsTrue_forCaseDuplicateInsideMapValue() { - // metadata: map> — duplicate in map value struct - Schema schema = - new Schema( - Types.NestedField.optional( - 1, - "metadata", - Types.MapType.ofOptional( - 2, - 3, - Types.StringType.get(), - Types.StructType.of( - Types.NestedField.required(4, "ID", Types.StringType.get()), - Types.NestedField.optional(5, "id", Types.StringType.get()))))); - 
assertTrue(BaseIcebergSchemaValidator.hasCaseDuplicateFields(schema)); - } - // ===== normalizeSchemaCasingToTable ===== @Test @@ -299,7 +199,7 @@ void validateWriteSchema_passes_afterCasingNormalization() { Schema writeSchema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); assertFalse( - BaseIcebergSchemaValidator.hasCaseDuplicateFields(tableSchema), + SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(tableSchema), "table has no case duplicates, normalization should apply"); Schema normalized = @@ -325,7 +225,7 @@ void validateWriteSchema_fails_forCaseDuplicateTable_withMismatchedCasing() { Types.NestedField.optional(2, "ID", Types.StringType.get())); assertTrue( - BaseIcebergSchemaValidator.hasCaseDuplicateFields(tableSchema), + SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(tableSchema), "table should be detected as having case-duplicate columns"); // Since normalization is skipped, validateWriteSchema sees "Id" vs expected "id" → failure diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtilTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtilTest.java new file mode 100644 index 000000000..1ef134b7c --- /dev/null +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtilTest.java @@ -0,0 +1,168 @@ +package com.linkedin.openhouse.tables.repository.impl; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +class SchemaValidationUtilTest { + + // ===== hasDuplicateCaseInsensitiveColumnNames ===== + + @Test + void hasDuplicateCaseInsensitiveColumnNames_returnsFalse_whenAllNamesUnique() { + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "name", 
Types.StringType.get())); + assertFalse(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema)); + } + + @Test + void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_whenTwoColumnsDifferOnlyInCase() { + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "ID", Types.StringType.get())); + assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema)); + } + + @Test + void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_forMixedCaseDuplicate() { + Schema schema = + new Schema( + Types.NestedField.required(1, "datePartition", Types.StringType.get()), + Types.NestedField.optional(2, "datepartition", Types.StringType.get()), + Types.NestedField.optional(3, "value", Types.LongType.get())); + assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema)); + } + + @Test + void hasDuplicateCaseInsensitiveColumnNames_returnsFalse_whenSingleColumn() { + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); + assertFalse(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema)); + } + + @Test + void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_forCaseDuplicateInsideNestedStruct() { + // event: struct — siblings inside the nested struct are duplicates + Schema schema = + new Schema( + Types.NestedField.optional( + 1, + "event", + Types.StructType.of( + Types.NestedField.required(2, "ID", Types.StringType.get()), + Types.NestedField.optional(3, "id", Types.StringType.get())))); + assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema)); + } + + @Test + void hasDuplicateCaseInsensitiveColumnNames_returnsFalse_forSameNameInDifferentStructs() { + // user.id and session.id are in different structs — not siblings, not duplicates + Schema schema = + new Schema( + Types.NestedField.optional( + 1, + "user", + Types.StructType.of(Types.NestedField.required(2, "id", 
Types.StringType.get()))), + Types.NestedField.optional( + 3, + "session", + Types.StructType.of(Types.NestedField.required(4, "id", Types.StringType.get())))); + assertFalse(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema)); + } + + @Test + void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_forCaseDuplicateInsideListElement() { + // events: list> — duplicate inside the list element struct + Schema schema = + new Schema( + Types.NestedField.optional( + 1, + "events", + Types.ListType.ofOptional( + 2, + Types.StructType.of( + Types.NestedField.required(3, "ID", Types.StringType.get()), + Types.NestedField.optional(4, "id", Types.StringType.get()))))); + assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema)); + } + + @Test + void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_forCaseDuplicateInsideMapValue() { + // metadata: map> — duplicate in map value struct + Schema schema = + new Schema( + Types.NestedField.optional( + 1, + "metadata", + Types.MapType.ofOptional( + 2, + 3, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.required(4, "ID", Types.StringType.get()), + Types.NestedField.optional(5, "id", Types.StringType.get()))))); + assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema)); + } + + // ===== findDuplicateCaseInsensitiveColumnNames ===== + + @Test + void findDuplicateCaseInsensitiveColumnNames_returnsEmpty_whenNoConflicts() { + Schema schema = + new Schema( + Types.NestedField.required(1, "userid", Types.StringType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); + assertTrue(SchemaValidationUtil.findDuplicateCaseInsensitiveColumnNames(schema).isEmpty()); + } + + @Test + void findDuplicateCaseInsensitiveColumnNames_returnsConflict_forTopLevelDuplicate() { + Schema schema = + new Schema( + Types.NestedField.required(1, "userid", Types.StringType.get()), + Types.NestedField.required(2, "userId", Types.StringType.get())); + List 
conflicts = SchemaValidationUtil.findDuplicateCaseInsensitiveColumnNames(schema); + assertEquals(1, conflicts.size()); + assertTrue(conflicts.get(0).contains("userid") && conflicts.get(0).contains("userId")); + } + + @Test + void findDuplicateCaseInsensitiveColumnNames_returnsConflictWithPath_forNestedDuplicate() { + // col1: struct + Schema schema = + new Schema( + Types.NestedField.required( + 1, + "col1", + Types.StructType.of( + Types.NestedField.required(2, "userid", Types.StringType.get()), + Types.NestedField.required(3, "userId", Types.StringType.get())))); + List conflicts = SchemaValidationUtil.findDuplicateCaseInsensitiveColumnNames(schema); + assertEquals(1, conflicts.size()); + assertTrue( + conflicts.get(0).contains("col1.userid") && conflicts.get(0).contains("col1.userId")); + } + + @Test + void findDuplicateCaseInsensitiveColumnNames_returnsConflictWithPath_forListElementDuplicate() { + // events: list> + Schema schema = + new Schema( + Types.NestedField.optional( + 1, + "events", + Types.ListType.ofOptional( + 2, + Types.StructType.of( + Types.NestedField.required(3, "ID", Types.StringType.get()), + Types.NestedField.optional(4, "id", Types.StringType.get()))))); + List conflicts = SchemaValidationUtil.findDuplicateCaseInsensitiveColumnNames(schema); + assertEquals(1, conflicts.size()); + assertTrue(conflicts.get(0).contains("events.ID") && conflicts.get(0).contains("events.id")); + } +} From 910bd70a2fb6efabdd9044562cf923ea4b7fb021 Mon Sep 17 00:00:00 2001 From: Amit Panda Date: Thu, 30 Apr 2026 11:10:55 -0700 Subject: [PATCH 4/4] test: add missing write-path coverage for case-insensitive normalization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three gaps identified in the E2E and unit test suites: Path B (RepositoryTest): writer submits wrong casing on an existing column AND adds a new column in the same write. 
Normalization fixes the existing column; validateWriteSchema then sees a valid evolution and must accept it. This is the realistic migration scenario where a Spark/Trino client sends a schema update alongside a column addition. Path D (RepositoryTest): table has case-duplicate columns (guard skips normalization). A write with exactly matching casing must still succeed (sameSchema = true). Verifies the guard does not break legitimate writes to legacy case-duplicate tables. Unit (BaseIcebergSchemaValidatorTest): same as Path B but at the unit level — normalize "id"→"ID", then confirm validateWriteSchema accepts the resulting evolution (new column appended, existing IDs intact). Co-Authored-By: Claude Sonnet 4.6 --- .../tables/e2e/h2/RepositoryTest.java | 103 ++++++++++++++++++ .../impl/BaseIcebergSchemaValidatorTest.java | 26 +++++ 2 files changed, 129 insertions(+) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java index a30b0f437..6626a6da4 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java @@ -974,6 +974,109 @@ void testCaseInsensitiveWrite_blockedForCaseDuplicateTable() { Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey)); } + @Test + void testCaseInsensitiveWrite_succeedsWithColumnAddition_andPreservesTableCasing() { + // Path B: writer has wrong casing on an existing column AND adds a new column. + // Normalization must fix the existing column casing first; then validateWriteSchema + // sees a valid evolution (existing IDs intact, new column appended) and must accept it. 
+ Schema tableSchema = + new Schema( + required(1, "ID", Types.StringType.get()), optional(2, "value", Types.LongType.get())); + + TableDto createDto = + TABLE_DTO + .toBuilder() + .tableId("case_insensitive_evolution_test") + .schema(SchemaParser.toJson(tableSchema, false)) + .timePartitioning(null) + .clustering(null) + .tableVersion(INITIAL_TABLE_VERSION) + .build(); + + TableDto savedDto = openHouseInternalRepository.save(createDto); + + // Writer submits wrong casing on "ID" (sends "id") and also adds new column "new_col" (id=3). + Schema writeSchema = + new Schema( + required(1, "id", Types.StringType.get()), + optional(2, "value", Types.LongType.get()), + optional(3, "new_col", Types.LongType.get())); + + TableDto updateDto = + savedDto + .toBuilder() + .schema(SchemaParser.toJson(writeSchema, false)) + .tableVersion(savedDto.getTableLocation()) + .build(); + + // After normalization "id" → "ID", sameSchema is false (new_col added), so + // validateWriteSchema is invoked and must accept the valid column addition. + TableDto updatedDto = + Assertions.assertDoesNotThrow( + () -> openHouseInternalRepository.save(updateDto), + "save() with casing mismatch + new column should succeed"); + + Schema storedSchema = SchemaParser.fromJson(updatedDto.getSchema()); + Assertions.assertEquals( + "ID", storedSchema.findField(1).name(), "Existing column casing must be preserved"); + Assertions.assertNotNull( + storedSchema.findField(3), "New column must be present in stored schema"); + + TableDtoPrimaryKey primaryKey = + TableDtoPrimaryKey.builder() + .tableId("case_insensitive_evolution_test") + .databaseId(TABLE_DTO.getDatabaseId()) + .build(); + openHouseInternalRepository.deleteById(primaryKey); + Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey)); + } + + @Test + void testCaseInsensitiveWrite_caseDuplicateTable_succeedsWithExactCasing() { + // Path D: table has case-duplicate columns — normalization guard skips normalization. 
+ // A write with exactly matching casing must still succeed (sameSchema = true). + // This verifies the guard does not break legitimate writes to legacy case-duplicate tables. + Schema tableSchema = + new Schema( + required(1, "id", Types.StringType.get()), optional(2, "ID", Types.StringType.get())); + + TableDto createDto = + TABLE_DTO + .toBuilder() + .tableId("case_duplicate_exact_write_test") + .schema(SchemaParser.toJson(tableSchema, false)) + .timePartitioning(null) + .clustering(null) + .tableVersion(INITIAL_TABLE_VERSION) + .build(); + + TableDto savedDto = openHouseInternalRepository.save(createDto); + + // Write with exact same casing as the table — normalization is skipped but sameSchema = true. + Schema writeSchema = + new Schema( + required(1, "id", Types.StringType.get()), optional(2, "ID", Types.StringType.get())); + + TableDto updateDto = + savedDto + .toBuilder() + .schema(SchemaParser.toJson(writeSchema, false)) + .tableVersion(savedDto.getTableLocation()) + .build(); + + Assertions.assertDoesNotThrow( + () -> openHouseInternalRepository.save(updateDto), + "save() with exact casing on a case-duplicate table must succeed"); + + TableDtoPrimaryKey primaryKey = + TableDtoPrimaryKey.builder() + .tableId("case_duplicate_exact_write_test") + .databaseId(TABLE_DTO.getDatabaseId()) + .build(); + openHouseInternalRepository.deleteById(primaryKey); + Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey)); + } + private TableDtoPrimaryKey getPrimaryKey(TableDto tableDto) { return TableDtoPrimaryKey.builder() .databaseId(tableDto.getDatabaseId()) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java index 2848b0f8e..69d6b90c3 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java +++ 
b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java @@ -211,6 +211,32 @@ void validateWriteSchema_passes_afterCasingNormalization() { "validateWriteSchema should succeed after casing normalization"); } + @Test + void validateWriteSchema_passes_afterCasingNormalization_withColumnAddition() { + // Table has "ID" (id=1); writer submits "id" (id=1) with an extra new column (id=2). + // After normalization "id" → "ID", sameSchema is false so validateWriteSchema is called. + // The evolution is valid (existing field ID unchanged, new column appended) — must succeed. + Schema tableSchema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get())); + Schema writeSchema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "new_col", Types.LongType.get())); + + assertFalse( + SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(tableSchema), + "table has no case duplicates, normalization should apply"); + + Schema normalized = + BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema); + + assertEquals("ID", normalized.findField(1).name(), "existing column must be normalized"); + assertEquals("new_col", normalized.findField(2).name(), "new column must be preserved as-is"); + + assertDoesNotThrow( + () -> VALIDATOR.validateWriteSchema(tableSchema, normalized, "db.table"), + "validateWriteSchema must accept valid column addition after casing normalization"); + } + @Test void validateWriteSchema_fails_forCaseDuplicateTable_withMismatchedCasing() { // Table has case-duplicate columns — normalization is skipped for such tables.