Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import com.linkedin.openhouse.common.exception.InvalidSchemaEvolutionException;
import com.linkedin.openhouse.tables.repository.SchemaValidator;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.MapDifference;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.springframework.stereotype.Component;
Expand All @@ -13,9 +17,97 @@
* A base implementation which delegates the validation to iceberg library that is configured to: -
* not allow to write optional values to a required field. - allow input schema to have different
* ordering than table.
*
* <p>Also provides static helpers used by {@link OpenHouseInternalRepositoryImpl} to normalize a
* write schema's column names — at top level and at any nesting depth — to the casing already
* present in the table schema, enabling case-insensitive writes without mutating the table's
* existing column casing.
*/
@Component
public class BaseIcebergSchemaValidator implements SchemaValidator {

/**
* Rewrites every field name in {@code writeSchema} — at top level <em>and</em> inside nested
* structs, list elements, and map keys/values — to use the casing from {@code tableSchema},
* matched by Iceberg field ID. Fields whose ID is absent from {@code tableSchema} (genuinely new
* columns) are left unchanged. All other field attributes (type, doc, optional/required,
* identifier-field-ids) are preserved.
*
* <p>Uses {@code TypeUtil.indexById} to build a single flat {@code Map<Integer, NestedField>} for
* the entire table schema in one O(n) walk, giving O(1) name lookup at any depth. Uses {@code
* TypeUtil.visit} to recurse through the write schema — covering struct, list, and map container
* types exhaustively via Iceberg's visitor contract.
*/
static Schema normalizeSchemaCasingToTable(Schema writeSchema, Schema tableSchema) {
Comment thread
pandaamit91 marked this conversation as resolved.
// One O(n) walk over the full table schema tree; lookup at any depth is then O(1).
final Map<Integer, Types.NestedField> tableById = TypeUtil.indexById(tableSchema.asStruct());

Type rewritten =
TypeUtil.visit(
writeSchema,
new TypeUtil.SchemaVisitor<Type>() {
@Override
public Type schema(Schema s, Type structResult) {
return structResult;
}

@Override
public Type struct(Types.StructType struct, List<Type> fieldTypes) {
List<Types.NestedField> originals = struct.fields();
List<Types.NestedField> rebuilt = new ArrayList<>(originals.size());
for (int i = 0; i < originals.size(); i++) {
Types.NestedField original = originals.get(i);
Types.NestedField tableField = tableById.get(original.fieldId());
String name = (tableField != null) ? tableField.name() : original.name();
Type type = fieldTypes.get(i);
// Reuse the original if nothing changed (cheap reference-equality short-circuit).
if (name.equals(original.name()) && type == original.type()) {
rebuilt.add(original);
} else {
rebuilt.add(
original.isOptional()
? Types.NestedField.optional(
original.fieldId(), name, type, original.doc())
: Types.NestedField.required(
original.fieldId(), name, type, original.doc()));
}
}
return Types.StructType.of(rebuilt);
}

@Override
public Type field(Types.NestedField f, Type fieldResult) {
return fieldResult;
}

@Override
public Type list(Types.ListType list, Type elementResult) {
Types.NestedField elem = list.fields().get(0);
return elem.isOptional()
? Types.ListType.ofOptional(elem.fieldId(), elementResult)
: Types.ListType.ofRequired(elem.fieldId(), elementResult);
}

@Override
public Type map(Types.MapType map, Type keyResult, Type valueResult) {
Types.NestedField k = map.fields().get(0);
Types.NestedField v = map.fields().get(1);
return v.isOptional()
? Types.MapType.ofOptional(k.fieldId(), v.fieldId(), keyResult, valueResult)
: Types.MapType.ofRequired(k.fieldId(), v.fieldId(), keyResult, valueResult);
}

@Override
public Type primitive(Type.PrimitiveType p) {
return p;
}
});

Types.StructType normalizedStruct = (Types.StructType) rewritten;
return new Schema(
writeSchema.schemaId(), normalizedStruct.fields(), writeSchema.identifierFieldIds());
}

@Override
public void validateWriteSchema(Schema oldSchema, Schema newSchema, String tableUri)
throws InvalidSchemaEvolutionException, IllegalArgumentException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,15 @@ private boolean doUpdateSchemaIfNeeded(
Schema tableSchema,
TableDto tableDto,
UpdateProperties updateProperties) {
// Normalize top-level column names in writeSchema to use the casing already present in
// tableSchema (matched by Iceberg field ID). This enables case-insensitive writes: a writer
// that submits "id" for a table column named "ID" will have its schema normalized to "ID"
// before any comparison or storage, so the table's existing casing is never changed.
// Tables where two columns share a case-folded name are excluded (ambiguous target column).
if (!SchemaValidationUtil.hasDuplicateCaseInsensitiveColumnNames(tableSchema)) {
writeSchema =
BaseIcebergSchemaValidator.normalizeSchemaCasingToTable(writeSchema, tableSchema);
}
if (!writeSchema.sameSchema(tableSchema)) {
try {
schemaValidator.validateWriteSchema(tableSchema, writeSchema, tableDto.getTableUri());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.linkedin.openhouse.tables.repository.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/**
* Utility for validating Iceberg schemas with respect to case-insensitive column naming.
*
* <p>Promoted from li-openhouse's {@code SchemaValidationUtil} to the upstream open-source repo so
* that both sides share a single implementation. The two callers want opposite outcomes from the
* same predicate:
*
* <ul>
* <li><b>li-openhouse write rejection</b>: if duplicates exist → reject the write.
* <li><b>{@link BaseIcebergSchemaValidator} normalization guard</b>: if duplicates exist → skip
* normalization (write may still succeed for exact-casing).
* </ul>
*
* <p>Coverage: struct fields at any nesting depth, list element types, and map key/value types.
* Fields with the same name in <em>different</em> structs (e.g. {@code user.id} and {@code
* session.id}) are correctly treated as independent — only <em>siblings</em> within the same struct
* are compared.
*/
public final class SchemaValidationUtil {

  private SchemaValidationUtil() {
    // Static utility; never instantiated.
  }

  /**
   * Collects every pair of sibling fields, at any nesting depth, whose names collide
   * case-insensitively while differing case-sensitively.
   *
   * @param schema the Iceberg schema to inspect
   * @return conflict descriptions such as {@code "[userid, userId]"} or {@code
   *     "[col1.userid, col1.userId]"}; an empty list when the schema is conflict-free
   */
  public static List<String> findDuplicateCaseInsensitiveColumnNames(Schema schema) {
    List<String> conflicts = new ArrayList<>();
    scanSiblings(schema.columns(), "", conflicts);
    return conflicts;
  }

  /**
   * Convenience predicate: {@code true} when {@link #findDuplicateCaseInsensitiveColumnNames}
   * would report at least one conflict anywhere in the schema.
   */
  public static boolean hasDuplicateCaseInsensitiveColumnNames(Schema schema) {
    return !findDuplicateCaseInsensitiveColumnNames(schema).isEmpty();
  }

  /**
   * Compares the given sibling fields against each other (case-folded via {@link Locale#ROOT}),
   * records conflicts, then recurses into each sibling's type. Fields with identical names in
   * different structs never meet here, so they are naturally treated as independent.
   */
  private static void scanSiblings(
      List<Types.NestedField> siblings, String prefix, List<String> conflicts) {
    Map<String, String> firstSeenByFolded = new HashMap<>();
    for (Types.NestedField sibling : siblings) {
      String name = sibling.name();
      String firstSeen = firstSeenByFolded.putIfAbsent(name.toLowerCase(Locale.ROOT), name);
      // A later sibling with the same folded name but different exact name is a conflict.
      if (firstSeen != null && !firstSeen.equals(name)) {
        String qualifier = prefix.isEmpty() ? "" : prefix + ".";
        conflicts.add(String.format("[%s%s, %s%s]", qualifier, firstSeen, qualifier, name));
      }
    }
    for (Types.NestedField sibling : siblings) {
      String path = prefix.isEmpty() ? sibling.name() : prefix + "." + sibling.name();
      descend(sibling.type(), path, conflicts);
    }
  }

  /** Walks into compound types (struct, list element, map key/value); primitives end the walk. */
  private static void descend(Type type, String path, List<String> conflicts) {
    if (type instanceof Types.StructType) {
      scanSiblings(((Types.StructType) type).fields(), path, conflicts);
    } else if (type instanceof Types.ListType) {
      descend(((Types.ListType) type).elementType(), path, conflicts);
    } else if (type instanceof Types.MapType) {
      Types.MapType map = (Types.MapType) type;
      descend(map.keyType(), path, conflicts);
      descend(map.valueType(), path, conflicts);
    }
    // Primitive types have nothing nested; no-op.
  }
}
Loading