diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java
index 55931a7ac..f673d4f8b 100644
--- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidator.java
@@ -2,9 +2,13 @@
import com.linkedin.openhouse.common.exception.InvalidSchemaEvolutionException;
import com.linkedin.openhouse.tables.repository.SchemaValidator;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.MapDifference;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.springframework.stereotype.Component;
@@ -13,9 +17,97 @@
* A base implementation which delegates the validation to iceberg library that is configured to: -
* not allow to write optional values to a required field. - allow input schema to have different
* ordering than table.
+ *
+ *
+ * <p>Also provides static helpers used by {@link OpenHouseInternalRepositoryImpl} to normalize a
+ * write schema's column names — at top level and at any nesting depth — to the casing already
+ * present in the table schema, enabling case-insensitive writes without mutating the table's
+ * existing column casing.
*/
@Component
public class BaseIcebergSchemaValidator implements SchemaValidator {
+
+ /**
+ * Rewrites every field name in {@code writeSchema} — at top level and inside nested
+ * structs, list elements, and map keys/values — to use the casing from {@code tableSchema},
+ * matched by Iceberg field ID. Fields whose ID is absent from {@code tableSchema} (genuinely new
+ * columns) are left unchanged. All other field attributes (type, doc, optional/required,
+ * identifier-field-ids) are preserved.
+ *
+ *
+ * <p>Uses {@code TypeUtil.indexById} to build a single flat {@code Map<Integer, NestedField>} for
+ * the entire table schema in one O(n) walk, giving O(1) name lookup at any depth. Uses {@code
+ * TypeUtil.visit} to recurse through the write schema — covering struct, list, and map container
+ * types exhaustively via Iceberg's visitor contract.
+ */
+ static Schema normalizeSchemaCasingToTable(Schema writeSchema, Schema tableSchema) {
+ // One O(n) walk over the full table schema tree; lookup at any depth is then O(1).
+ final Map<Integer, Types.NestedField> tableById = TypeUtil.indexById(tableSchema.asStruct());
+
+ Type rewritten =
+ TypeUtil.visit(
+ writeSchema,
+ new TypeUtil.SchemaVisitor<Type>() {
+ @Override
+ public Type schema(Schema s, Type structResult) {
+ return structResult;
+ }
+
+ @Override
+ public Type struct(Types.StructType struct, List<Type> fieldTypes) {
+ List<Types.NestedField> originals = struct.fields();
+ List<Types.NestedField> rebuilt = new ArrayList<>(originals.size());
+ for (int i = 0; i < originals.size(); i++) {
+ Types.NestedField original = originals.get(i);
+ Types.NestedField tableField = tableById.get(original.fieldId());
+ String name = (tableField != null) ? tableField.name() : original.name();
+ Type type = fieldTypes.get(i);
+ // Reuse the original if nothing changed (cheap reference-equality short-circuit).
+ if (name.equals(original.name()) && type == original.type()) {
+ rebuilt.add(original);
+ } else {
+ rebuilt.add(
+ original.isOptional()
+ ? Types.NestedField.optional(
+ original.fieldId(), name, type, original.doc())
+ : Types.NestedField.required(
+ original.fieldId(), name, type, original.doc()));
+ }
+ }
+ return Types.StructType.of(rebuilt);
+ }
+
+ @Override
+ public Type field(Types.NestedField f, Type fieldResult) {
+ return fieldResult;
+ }
+
+ @Override
+ public Type list(Types.ListType list, Type elementResult) {
+ Types.NestedField elem = list.fields().get(0);
+ return elem.isOptional()
+ ? Types.ListType.ofOptional(elem.fieldId(), elementResult)
+ : Types.ListType.ofRequired(elem.fieldId(), elementResult);
+ }
+
+ @Override
+ public Type map(Types.MapType map, Type keyResult, Type valueResult) {
+ Types.NestedField k = map.fields().get(0);
+ Types.NestedField v = map.fields().get(1);
+ return v.isOptional()
+ ? Types.MapType.ofOptional(k.fieldId(), v.fieldId(), keyResult, valueResult)
+ : Types.MapType.ofRequired(k.fieldId(), v.fieldId(), keyResult, valueResult);
+ }
+
+ @Override
+ public Type primitive(Type.PrimitiveType p) {
+ return p;
+ }
+ });
+
+ Types.StructType normalizedStruct = (Types.StructType) rewritten;
+ return new Schema(
+ writeSchema.schemaId(), normalizedStruct.fields(), writeSchema.identifierFieldIds());
+ }
+
@Override
public void validateWriteSchema(Schema oldSchema, Schema newSchema, String tableUri)
throws InvalidSchemaEvolutionException, IllegalArgumentException {
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java
index ff67eff19..ab545c2f5 100644
--- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java
@@ -461,6 +461,15 @@ private boolean doUpdateSchemaIfNeeded(
Schema tableSchema,
TableDto tableDto,
UpdateProperties updateProperties) {
+ // Normalize top-level column names in writeSchema to use the casing already present in
+ // tableSchema (matched by Iceberg field ID). This enables case-insensitive writes: a writer
+ // that submits "id" for a table column named "ID" will have its schema normalized to "ID"
+ // before any comparison or storage, so the table's existing casing is never changed.
+ // Tables where two columns share a case-folded name are excluded (ambiguous target column).
+ if (!SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(tableSchema)) {
+ writeSchema =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+ }
if (!writeSchema.sameSchema(tableSchema)) {
try {
schemaValidator.validateWriteSchema(tableSchema, writeSchema, tableDto.getTableUri());
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtil.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtil.java
new file mode 100644
index 000000000..710c35c97
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtil.java
@@ -0,0 +1,95 @@
+package com.linkedin.openhouse.tables.repository.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Utility for validating Iceberg schemas with respect to case-insensitive column naming.
+ *
+ * <p>Promoted from li-openhouse's {@code SchemaValidationUtil} to the upstream open-source repo so
+ * that both sides share a single implementation. The two callers want opposite outcomes from the
+ * same predicate:
+ *
+ *
+ * <ul>
+ *   <li>li-openhouse write rejection: if duplicates exist → reject the write.
+ *   <li>{@link BaseIcebergSchemaValidator} normalization guard: if duplicates exist → skip
+ *       normalization (write may still succeed for exact-casing).
+ * </ul>
+ *
+ * <p>Coverage: struct fields at any nesting depth, list element types, and map key/value types.
+ * Fields with the same name in different structs (e.g. {@code user.id} and {@code
+ * session.id}) are correctly treated as independent — only siblings within the same struct
+ * are compared.
+ */
+public final class SchemaValidationUtil {
+
+ private SchemaValidationUtil() {}
+
+ /**
+ * Finds all sibling-field pairs at any nesting depth whose names are equal case-insensitively but
+ * not case-sensitively.
+ *
+ * @param schema the Iceberg schema to validate
+ * @return list of conflict descriptions, e.g. {@code "[userid, userId]"} or {@code "[col1.userid,
+ * col1.userId]"}; empty if no conflicts
+ */
+ public static List<String> findDuplicateCaseInsensitiveColumnNames(Schema schema) {
+ List<String> conflicts = new ArrayList<>();
+ checkFieldsForDuplicates(schema.columns(), "", conflicts);
+ return conflicts;
+ }
+
+ /**
+ * Returns {@code true} if the schema has any case-insensitive duplicate field names at any
+ * nesting depth.
+ */
+ public static boolean hasDuplicateCaseInsensitiveColumnNames(Schema schema) {
+ return !findDuplicateCaseInsensitiveColumnNames(schema).isEmpty();
+ }
+
+ /**
+ * Checks {@code fields} for sibling-level case-insensitive duplicates, records any conflicts, and
+ * recurses into each field's type.
+ */
+ private static void checkFieldsForDuplicates(
+ List<Types.NestedField> fields, String pathPrefix, List<String> conflicts) {
+ Map<String, String> seenLowerToName = new HashMap<>();
+ for (Types.NestedField field : fields) {
+ String name = field.name();
+ String lower = name.toLowerCase(Locale.ROOT);
+ if (seenLowerToName.containsKey(lower)) {
+ String first = seenLowerToName.get(lower);
+ if (!first.equals(name)) {
+ String qualifier = pathPrefix.isEmpty() ? "" : pathPrefix + ".";
+ conflicts.add(String.format("[%s%s, %s%s]", qualifier, first, qualifier, name));
+ }
+ } else {
+ seenLowerToName.put(lower, name);
+ }
+ }
+ for (Types.NestedField field : fields) {
+ String childPath = pathPrefix.isEmpty() ? field.name() : pathPrefix + "." + field.name();
+ checkTypeForDuplicates(field.type(), childPath, conflicts);
+ }
+ }
+
+ /** Recurses into compound types (struct, list element, map key/value). Primitives are a no-op. */
+ private static void checkTypeForDuplicates(Type type, String path, List<String> conflicts) {
+ if (type instanceof Types.StructType) {
+ checkFieldsForDuplicates(((Types.StructType) type).fields(), path, conflicts);
+ } else if (type instanceof Types.ListType) {
+ checkTypeForDuplicates(((Types.ListType) type).elementType(), path, conflicts);
+ } else if (type instanceof Types.MapType) {
+ Types.MapType mapType = (Types.MapType) type;
+ checkTypeForDuplicates(mapType.keyType(), path, conflicts);
+ checkTypeForDuplicates(mapType.valueType(), path, conflicts);
+ }
+ // Primitive types have no nested fields; nothing to recurse into.
+ }
+}
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java
index a4ee20adc..6626a6da4 100644
--- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java
@@ -870,6 +870,213 @@ public void testDefaultFileFormatWithFeatureToggle() {
Assertions.assertFalse(openHouseInternalRepository.existsById(key3));
}
+ // ===== Case-insensitive write normalization =====
+
+ @Test
+ void testCaseInsensitiveWrite_succeeds_andPreservesTableCasing() {
+ // Table is created with uppercase column name "ID".
+ // A subsequent save with lowercase "id" (same field id) should succeed and leave
+ // the stored schema using the original "ID" casing.
+ Schema tableSchema =
+ new Schema(
+ required(1, "ID", Types.StringType.get()), optional(2, "value", Types.LongType.get()));
+
+ TableDto createDto =
+ TABLE_DTO
+ .toBuilder()
+ .tableId("case_insensitive_write_test")
+ .schema(SchemaParser.toJson(tableSchema, false))
+ .timePartitioning(null)
+ .clustering(null)
+ .tableVersion(INITIAL_TABLE_VERSION)
+ .build();
+
+ TableDto savedDto = openHouseInternalRepository.save(createDto);
+
+ // Writer submits schema with lowercase "id" (same field id=1) — simulates a case-insensitive
+ // Spark/Trino write that sends column names in a different casing than the table.
+ Schema writeSchema =
+ new Schema(
+ required(1, "id", Types.StringType.get()), optional(2, "value", Types.LongType.get()));
+
+ TableDto updateDto =
+ savedDto
+ .toBuilder()
+ .schema(SchemaParser.toJson(writeSchema, false))
+ .tableVersion(savedDto.getTableLocation())
+ .build();
+
+ // Save should succeed without throwing InvalidSchemaEvolutionException
+ TableDto updatedDto =
+ Assertions.assertDoesNotThrow(
+ () -> openHouseInternalRepository.save(updateDto),
+ "save() with differently-cased column names should succeed");
+
+ // The stored schema must preserve the original table casing ("ID", not "id")
+ Schema storedSchema = SchemaParser.fromJson(updatedDto.getSchema());
+ Assertions.assertEquals(
+ "ID",
+ storedSchema.findField(1).name(),
+ "Table casing must be preserved after a case-insensitive write");
+
+ TableDtoPrimaryKey primaryKey =
+ TableDtoPrimaryKey.builder()
+ .tableId("case_insensitive_write_test")
+ .databaseId(TABLE_DTO.getDatabaseId())
+ .build();
+ openHouseInternalRepository.deleteById(primaryKey);
+ Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey));
+ }
+
+ @Test
+ void testCaseInsensitiveWrite_blockedForCaseDuplicateTable() {
+ // A table with case-duplicate columns (both "id" and "ID") must NOT apply normalization.
+ // A write with mismatched casing on such a table should still throw.
+ Schema tableSchema =
+ new Schema(
+ required(1, "id", Types.StringType.get()), optional(2, "ID", Types.StringType.get()));
+
+ TableDto createDto =
+ TABLE_DTO
+ .toBuilder()
+ .tableId("case_duplicate_write_test")
+ .schema(SchemaParser.toJson(tableSchema, false))
+ .timePartitioning(null)
+ .clustering(null)
+ .tableVersion(INITIAL_TABLE_VERSION)
+ .build();
+
+ TableDto savedDto = openHouseInternalRepository.save(createDto);
+
+ // Writer sends "Id" for field id=1 (table has "id") — casing mismatch on a case-dup table
+ Schema writeSchema =
+ new Schema(
+ required(1, "Id", Types.StringType.get()), optional(2, "ID", Types.StringType.get()));
+
+ TableDto updateDto =
+ savedDto
+ .toBuilder()
+ .schema(SchemaParser.toJson(writeSchema, false))
+ .tableVersion(savedDto.getTableLocation())
+ .build();
+
+ Assertions.assertThrows(
+ InvalidSchemaEvolutionException.class,
+ () -> openHouseInternalRepository.save(updateDto),
+ "save() with mismatched casing on a case-duplicate table must throw");
+
+ TableDtoPrimaryKey primaryKey =
+ TableDtoPrimaryKey.builder()
+ .tableId("case_duplicate_write_test")
+ .databaseId(TABLE_DTO.getDatabaseId())
+ .build();
+ openHouseInternalRepository.deleteById(primaryKey);
+ Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey));
+ }
+
+ @Test
+ void testCaseInsensitiveWrite_succeedsWithColumnAddition_andPreservesTableCasing() {
+ // Path B: writer has wrong casing on an existing column AND adds a new column.
+ // Normalization must fix the existing column casing first; then validateWriteSchema
+ // sees a valid evolution (existing IDs intact, new column appended) and must accept it.
+ Schema tableSchema =
+ new Schema(
+ required(1, "ID", Types.StringType.get()), optional(2, "value", Types.LongType.get()));
+
+ TableDto createDto =
+ TABLE_DTO
+ .toBuilder()
+ .tableId("case_insensitive_evolution_test")
+ .schema(SchemaParser.toJson(tableSchema, false))
+ .timePartitioning(null)
+ .clustering(null)
+ .tableVersion(INITIAL_TABLE_VERSION)
+ .build();
+
+ TableDto savedDto = openHouseInternalRepository.save(createDto);
+
+ // Writer submits wrong casing on "ID" (sends "id") and also adds new column "new_col" (id=3).
+ Schema writeSchema =
+ new Schema(
+ required(1, "id", Types.StringType.get()),
+ optional(2, "value", Types.LongType.get()),
+ optional(3, "new_col", Types.LongType.get()));
+
+ TableDto updateDto =
+ savedDto
+ .toBuilder()
+ .schema(SchemaParser.toJson(writeSchema, false))
+ .tableVersion(savedDto.getTableLocation())
+ .build();
+
+ // After normalization "id" → "ID", sameSchema is false (new_col added), so
+ // validateWriteSchema is invoked and must accept the valid column addition.
+ TableDto updatedDto =
+ Assertions.assertDoesNotThrow(
+ () -> openHouseInternalRepository.save(updateDto),
+ "save() with casing mismatch + new column should succeed");
+
+ Schema storedSchema = SchemaParser.fromJson(updatedDto.getSchema());
+ Assertions.assertEquals(
+ "ID", storedSchema.findField(1).name(), "Existing column casing must be preserved");
+ Assertions.assertNotNull(
+ storedSchema.findField(3), "New column must be present in stored schema");
+
+ TableDtoPrimaryKey primaryKey =
+ TableDtoPrimaryKey.builder()
+ .tableId("case_insensitive_evolution_test")
+ .databaseId(TABLE_DTO.getDatabaseId())
+ .build();
+ openHouseInternalRepository.deleteById(primaryKey);
+ Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey));
+ }
+
+ @Test
+ void testCaseInsensitiveWrite_caseDuplicateTable_succeedsWithExactCasing() {
+ // Path D: table has case-duplicate columns — normalization guard skips normalization.
+ // A write with exactly matching casing must still succeed (sameSchema = true).
+ // This verifies the guard does not break legitimate writes to legacy case-duplicate tables.
+ Schema tableSchema =
+ new Schema(
+ required(1, "id", Types.StringType.get()), optional(2, "ID", Types.StringType.get()));
+
+ TableDto createDto =
+ TABLE_DTO
+ .toBuilder()
+ .tableId("case_duplicate_exact_write_test")
+ .schema(SchemaParser.toJson(tableSchema, false))
+ .timePartitioning(null)
+ .clustering(null)
+ .tableVersion(INITIAL_TABLE_VERSION)
+ .build();
+
+ TableDto savedDto = openHouseInternalRepository.save(createDto);
+
+ // Write with exact same casing as the table — normalization is skipped but sameSchema = true.
+ Schema writeSchema =
+ new Schema(
+ required(1, "id", Types.StringType.get()), optional(2, "ID", Types.StringType.get()));
+
+ TableDto updateDto =
+ savedDto
+ .toBuilder()
+ .schema(SchemaParser.toJson(writeSchema, false))
+ .tableVersion(savedDto.getTableLocation())
+ .build();
+
+ Assertions.assertDoesNotThrow(
+ () -> openHouseInternalRepository.save(updateDto),
+ "save() with exact casing on a case-duplicate table must succeed");
+
+ TableDtoPrimaryKey primaryKey =
+ TableDtoPrimaryKey.builder()
+ .tableId("case_duplicate_exact_write_test")
+ .databaseId(TABLE_DTO.getDatabaseId())
+ .build();
+ openHouseInternalRepository.deleteById(primaryKey);
+ Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey));
+ }
+
private TableDtoPrimaryKey getPrimaryKey(TableDto tableDto) {
return TableDtoPrimaryKey.builder()
.databaseId(tableDto.getDatabaseId())
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java
new file mode 100644
index 000000000..69d6b90c3
--- /dev/null
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/BaseIcebergSchemaValidatorTest.java
@@ -0,0 +1,262 @@
+package com.linkedin.openhouse.tables.repository.impl;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.linkedin.openhouse.common.exception.InvalidSchemaEvolutionException;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class BaseIcebergSchemaValidatorTest {
+
+ private static final BaseIcebergSchemaValidator VALIDATOR = new BaseIcebergSchemaValidator();
+
+ // ===== normalizeSchemaCasingToTable =====
+
+ @Test
+ void normalizeSchemaCasingToTable_noChange_whenCasingAlreadyMatches() {
+ Schema tableSchema =
+ new Schema(
+ Types.NestedField.required(1, "ID", Types.StringType.get()),
+ Types.NestedField.optional(2, "Name", Types.StringType.get()));
+ Schema writeSchema =
+ new Schema(
+ Types.NestedField.required(1, "ID", Types.StringType.get()),
+ Types.NestedField.optional(2, "Name", Types.StringType.get()));
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ assertEquals("ID", normalized.findField(1).name());
+ assertEquals("Name", normalized.findField(2).name());
+ }
+
+ @Test
+ void normalizeSchemaCasingToTable_renamesColumn_toMatchTableCasing() {
+ // Table has "ID" (uppercase), writer submits "id" (lowercase)
+ Schema tableSchema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get()));
+ Schema writeSchema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get()));
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ assertEquals("ID", normalized.findField(1).name());
+ }
+
+ @Test
+ void normalizeSchemaCasingToTable_preservesNewColumns_unchanged() {
+ // Table has id=1, writer adds a new column id=2 with different-cased name
+ Schema tableSchema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get()));
+ Schema writeSchema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.StringType.get()),
+ Types.NestedField.optional(2, "newCol", Types.LongType.get()));
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ // Existing column normalized to table casing
+ assertEquals("ID", normalized.findField(1).name());
+ // New column left unchanged
+ assertEquals("newCol", normalized.findField(2).name());
+ }
+
+ @Test
+ void normalizeSchemaCasingToTable_preservesOptionalRequired() {
+ Schema tableSchema =
+ new Schema(
+ Types.NestedField.required(1, "ID", Types.StringType.get()),
+ Types.NestedField.optional(2, "Name", Types.StringType.get()));
+ Schema writeSchema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.StringType.get()),
+ Types.NestedField.optional(2, "name", Types.StringType.get()));
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ assertFalse(normalized.findField(1).isOptional(), "id=1 should remain required");
+ assertTrue(normalized.findField(2).isOptional(), "id=2 should remain optional");
+ }
+
+ @Test
+ void normalizeSchemaCasingToTable_preservesFieldDoc() {
+ Schema tableSchema =
+ new Schema(Types.NestedField.optional(1, "ID", Types.StringType.get(), "the identifier"));
+ Schema writeSchema =
+ new Schema(Types.NestedField.optional(1, "id", Types.StringType.get(), "the identifier"));
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ assertEquals("the identifier", normalized.findField(1).doc());
+ }
+
+ @Test
+ void normalizeSchemaCasingToTable_renamesFieldsInsideNestedStruct() {
+ // Table: event: struct<EventId: string, Value: long>
+ // Writer: event: struct<eventid: string, value: long> (wrong casing at nested level)
+ Schema tableSchema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "event",
+ Types.StructType.of(
+ Types.NestedField.required(2, "EventId", Types.StringType.get()),
+ Types.NestedField.optional(3, "Value", Types.LongType.get()))));
+ Schema writeSchema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "event",
+ Types.StructType.of(
+ Types.NestedField.required(2, "eventid", Types.StringType.get()),
+ Types.NestedField.optional(3, "value", Types.LongType.get()))));
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ Types.StructType nestedStruct = normalized.findField(1).type().asStructType();
+ assertEquals("EventId", nestedStruct.field(2).name());
+ assertEquals("Value", nestedStruct.field(3).name());
+ }
+
+ @Test
+ void normalizeSchemaCasingToTable_renamesFieldsInsideListElement() {
+ // Table: events: list<struct<EventId: string>>
+ // Writer: events: list<struct<eventid: string>>
+ Schema tableSchema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "events",
+ Types.ListType.ofOptional(
+ 2,
+ Types.StructType.of(
+ Types.NestedField.required(3, "EventId", Types.StringType.get())))));
+ Schema writeSchema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "events",
+ Types.ListType.ofOptional(
+ 2,
+ Types.StructType.of(
+ Types.NestedField.required(3, "eventid", Types.StringType.get())))));
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ Types.ListType listType = (Types.ListType) normalized.findField(1).type();
+ Types.StructType elementStruct = listType.elementType().asStructType();
+ assertEquals("EventId", elementStruct.field(3).name());
+ }
+
+ @Test
+ void normalizeSchemaCasingToTable_renamesFieldsInsideMapValue() {
+ // Table: metadata: map<string, struct<Key: string, Val: long>>
+ // Writer: metadata: map<string, struct<key: string, val: long>>
+ Schema tableSchema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "metadata",
+ Types.MapType.ofOptional(
+ 2,
+ 3,
+ Types.StringType.get(),
+ Types.StructType.of(
+ Types.NestedField.required(4, "Key", Types.StringType.get()),
+ Types.NestedField.optional(5, "Val", Types.LongType.get())))));
+ Schema writeSchema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "metadata",
+ Types.MapType.ofOptional(
+ 2,
+ 3,
+ Types.StringType.get(),
+ Types.StructType.of(
+ Types.NestedField.required(4, "key", Types.StringType.get()),
+ Types.NestedField.optional(5, "val", Types.LongType.get())))));
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ Types.MapType mapType = (Types.MapType) normalized.findField(1).type();
+ Types.StructType valueStruct = mapType.valueType().asStructType();
+ assertEquals("Key", valueStruct.field(4).name());
+ assertEquals("Val", valueStruct.field(5).name());
+ }
+
+ // ===== Integration: validateWriteSchema passes after normalization =====
+
+ @Test
+ void validateWriteSchema_passes_afterCasingNormalization() {
+ // Table has "ID" (uppercase); writer submits "id" (lowercase) — same field ID
+ Schema tableSchema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get()));
+ Schema writeSchema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get()));
+
+ assertFalse(
+ SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(tableSchema),
+ "table has no case duplicates, normalization should apply");
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ // After normalization, validation should pass without exception
+ assertDoesNotThrow(
+ () -> VALIDATOR.validateWriteSchema(tableSchema, normalized, "db.table"),
+ "validateWriteSchema should succeed after casing normalization");
+ }
+
+ @Test
+ void validateWriteSchema_passes_afterCasingNormalization_withColumnAddition() {
+ // Table has "ID" (id=1); writer submits "id" (id=1) with an extra new column (id=2).
+ // After normalization "id" → "ID", sameSchema is false so validateWriteSchema is called.
+ // The evolution is valid (existing field ID unchanged, new column appended) — must succeed.
+ Schema tableSchema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get()));
+ Schema writeSchema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.StringType.get()),
+ Types.NestedField.optional(2, "new_col", Types.LongType.get()));
+
+ assertFalse(
+ SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(tableSchema),
+ "table has no case duplicates, normalization should apply");
+
+ Schema normalized =
+ BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
+
+ assertEquals("ID", normalized.findField(1).name(), "existing column must be normalized");
+ assertEquals("new_col", normalized.findField(2).name(), "new column must be preserved as-is");
+
+ assertDoesNotThrow(
+ () -> VALIDATOR.validateWriteSchema(tableSchema, normalized, "db.table"),
+ "validateWriteSchema must accept valid column addition after casing normalization");
+ }
+
+ @Test
+ void validateWriteSchema_fails_forCaseDuplicateTable_withMismatchedCasing() {
+ // Table has case-duplicate columns — normalization is skipped for such tables.
+ // Validator should still reject a write with mismatched casing on these tables.
+ Schema tableSchema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.StringType.get()),
+ Types.NestedField.optional(2, "ID", Types.StringType.get()));
+ Schema writeSchema =
+ new Schema(
+ Types.NestedField.required(1, "Id", Types.StringType.get()),
+ Types.NestedField.optional(2, "ID", Types.StringType.get()));
+
+ assertTrue(
+ SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(tableSchema),
+ "table should be detected as having case-duplicate columns");
+
+ // Since normalization is skipped, validateWriteSchema sees "Id" vs expected "id" → failure
+ assertThrows(
+ InvalidSchemaEvolutionException.class,
+ () -> VALIDATOR.validateWriteSchema(tableSchema, writeSchema, "db.table"));
+ }
+}
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtilTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtilTest.java
new file mode 100644
index 000000000..1ef134b7c
--- /dev/null
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/repository/impl/SchemaValidationUtilTest.java
@@ -0,0 +1,168 @@
+package com.linkedin.openhouse.tables.repository.impl;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+class SchemaValidationUtilTest {
+
+ // ===== hasDuplicateCaseInsensitiveColumnNames =====
+
+ @Test
+ void hasDuplicateCaseInsensitiveColumnNames_returnsFalse_whenAllNamesUnique() {
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.StringType.get()),
+ Types.NestedField.optional(2, "name", Types.StringType.get()));
+ assertFalse(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema));
+ }
+
+ @Test
+ void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_whenTwoColumnsDifferOnlyInCase() {
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.StringType.get()),
+ Types.NestedField.optional(2, "ID", Types.StringType.get()));
+ assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema));
+ }
+
+ @Test
+ void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_forMixedCaseDuplicate() {
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(1, "datePartition", Types.StringType.get()),
+ Types.NestedField.optional(2, "datepartition", Types.StringType.get()),
+ Types.NestedField.optional(3, "value", Types.LongType.get()));
+ assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema));
+ }
+
+ @Test
+ void hasDuplicateCaseInsensitiveColumnNames_returnsFalse_whenSingleColumn() {
+ Schema schema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get()));
+ assertFalse(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema));
+ }
+
+ @Test
+ void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_forCaseDuplicateInsideNestedStruct() {
+ // event: struct<ID: string, id: string> — siblings inside the nested struct are duplicates
+ Schema schema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "event",
+ Types.StructType.of(
+ Types.NestedField.required(2, "ID", Types.StringType.get()),
+ Types.NestedField.optional(3, "id", Types.StringType.get()))));
+ assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema));
+ }
+
+ @Test
+ void hasDuplicateCaseInsensitiveColumnNames_returnsFalse_forSameNameInDifferentStructs() {
+ // user.id and session.id are in different structs — not siblings, not duplicates
+ Schema schema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "user",
+ Types.StructType.of(Types.NestedField.required(2, "id", Types.StringType.get()))),
+ Types.NestedField.optional(
+ 3,
+ "session",
+ Types.StructType.of(Types.NestedField.required(4, "id", Types.StringType.get()))));
+ assertFalse(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema));
+ }
+
+ @Test
+ void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_forCaseDuplicateInsideListElement() {
+ // events: list<struct<ID: string, id: string>> — duplicate inside the list element struct
+ Schema schema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "events",
+ Types.ListType.ofOptional(
+ 2,
+ Types.StructType.of(
+ Types.NestedField.required(3, "ID", Types.StringType.get()),
+ Types.NestedField.optional(4, "id", Types.StringType.get())))));
+ assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema));
+ }
+
+ @Test
+ void hasDuplicateCaseInsensitiveColumnNames_returnsTrue_forCaseDuplicateInsideMapValue() {
+ // metadata: map<string, struct<ID: string, id: string>> — duplicate in map value struct
+ Schema schema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "metadata",
+ Types.MapType.ofOptional(
+ 2,
+ 3,
+ Types.StringType.get(),
+ Types.StructType.of(
+ Types.NestedField.required(4, "ID", Types.StringType.get()),
+ Types.NestedField.optional(5, "id", Types.StringType.get())))));
+ assertTrue(SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(schema));
+ }
+
+ // ===== findDuplicateCaseInsensitiveColumnNames =====
+
+ @Test
+ void findDuplicateCaseInsensitiveColumnNames_returnsEmpty_whenNoConflicts() {
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(1, "userid", Types.StringType.get()),
+ Types.NestedField.optional(2, "name", Types.StringType.get()));
+ assertTrue(SchemaValidationUtil.findDuplicateCaseInsensitiveColumnNames(schema).isEmpty());
+ }
+
+ @Test
+ void findDuplicateCaseInsensitiveColumnNames_returnsConflict_forTopLevelDuplicate() {
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(1, "userid", Types.StringType.get()),
+ Types.NestedField.required(2, "userId", Types.StringType.get()));
+ List<String> conflicts = SchemaValidationUtil.findDuplicateCaseInsensitiveColumnNames(schema);
+ assertEquals(1, conflicts.size());
+ assertTrue(conflicts.get(0).contains("userid") && conflicts.get(0).contains("userId"));
+ }
+
+ @Test
+ void findDuplicateCaseInsensitiveColumnNames_returnsConflictWithPath_forNestedDuplicate() {
+ // col1: struct<userid: string, userId: string>
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(
+ 1,
+ "col1",
+ Types.StructType.of(
+ Types.NestedField.required(2, "userid", Types.StringType.get()),
+ Types.NestedField.required(3, "userId", Types.StringType.get()))));
+ List<String> conflicts = SchemaValidationUtil.findDuplicateCaseInsensitiveColumnNames(schema);
+ assertEquals(1, conflicts.size());
+ assertTrue(
+ conflicts.get(0).contains("col1.userid") && conflicts.get(0).contains("col1.userId"));
+ }
+
+ @Test
+ void findDuplicateCaseInsensitiveColumnNames_returnsConflictWithPath_forListElementDuplicate() {
+ // events: list<struct<ID: string, id: string>>
+ Schema schema =
+ new Schema(
+ Types.NestedField.optional(
+ 1,
+ "events",
+ Types.ListType.ofOptional(
+ 2,
+ Types.StructType.of(
+ Types.NestedField.required(3, "ID", Types.StringType.get()),
+ Types.NestedField.optional(4, "id", Types.StringType.get())))));
+ List<String> conflicts = SchemaValidationUtil.findDuplicateCaseInsensitiveColumnNames(schema);
+ assertEquals(1, conflicts.size());
+ assertTrue(conflicts.get(0).contains("events.ID") && conflicts.get(0).contains("events.id"));
+ }
+}