diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java index 42f51a9bd..cbbf9a3aa 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java @@ -13,6 +13,7 @@ import com.linkedin.openhouse.jobs.util.SparkJobUtil; import com.linkedin.openhouse.jobs.util.TableStatsCollector; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; @@ -347,8 +348,9 @@ private Map> prepareBackupDataManifests( private void writeBackupDataManifests( Map> manifestCache, Table table, String backupDir, ZonedDateTime now) { - for (String partitionPath : manifestCache.keySet()) { - List files = manifestCache.get(partitionPath); + for (Map.Entry> entry : manifestCache.entrySet()) { + String partitionPath = entry.getKey(); + List files = entry.getValue(); List backupFiles = files.stream() .map(file -> getTrashPath(table, file, backupDir).toString()) @@ -367,7 +369,7 @@ private void writeBackupDataManifests( fs.mkdirs(destPath.getParent()); } try (FSDataOutputStream out = fs.create(destPath, true)) { - out.write(jsonStr.getBytes()); + out.write(jsonStr.getBytes(StandardCharsets.UTF_8)); } log.info("Wrote {} with {} backup files", destPath, backupFiles.size()); } catch (IOException e) { diff --git a/gradle/spotbugs/spotbugsExclude.xml b/gradle/spotbugs/spotbugsExclude.xml index d0529a54e..efbb7e789 100644 --- a/gradle/spotbugs/spotbugsExclude.xml +++ b/gradle/spotbugs/spotbugsExclude.xml @@ -53,4 +53,38 @@ + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/SparkTestBase.java b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/SparkTestBase.java index ec5ee9c67..0e5080ee9 100644 --- a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/SparkTestBase.java +++ b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/SparkTestBase.java @@ -39,7 +39,7 @@ public void beforeEach(ExtensionContext context) throws Exception { "spark.sql.extensions", ("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions," + "com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions")) - .config("spark.sql.catalog.openhouse", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.openhouse", "com.linkedin.openhouse.spark.OHSparkCatalog") .config( "spark.sql.catalog.openhouse.catalog-impl", "com.linkedin.openhouse.spark.OpenHouseCatalog") diff --git a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CTASNonNullTest.java b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CTASNonNullTest.java index beb812daf..d969ae706 100644 --- a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CTASNonNullTest.java +++ b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CTASNonNullTest.java @@ -24,7 +24,8 @@ public void testCTASPreservesNonNull() throws Exception { // Verify spark catalogs have correct classes configured 
assertEquals( - "org.apache.iceberg.spark.SparkCatalog", spark.conf().get("spark.sql.catalog.openhouse")); + "com.linkedin.openhouse.spark.OHSparkCatalog", + spark.conf().get("spark.sql.catalog.openhouse")); // Verify id column is preserved in good catalog, not preserved in bad catalog assertFalse(sourceSchema.apply("id").nullable(), "Source table id column should be required"); diff --git a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java index 4d5641124..1618a2bdd 100644 --- a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java +++ b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java @@ -411,4 +411,179 @@ public void testAlterTableSortOrderCTAS() throws Exception { Assertions.assertEquals(SortOrder.unsorted(), newSqlTable.sortOrder()); } } + + // ===== Case-insensitive reads (OHCaseInsensitiveResolveRule) ===== + + @Test + public void testReadWithCaseMismatchSucceeds_andDoesNotChangeCaseSensitiveConfig() + throws Exception { + try (SparkSession spark = getSparkSession()) { + // Create a table with uppercase column "ID" — typical for tables originally created + // by Hive or engines that preserve user-specified casing. + Catalog catalog = getOpenHouseCatalog(spark); + Schema schema = + new Schema( + Types.NestedField.required(1, "ID", Types.StringType.get()), + Types.NestedField.optional(2, "value", Types.LongType.get())); + catalog.createTable(TableIdentifier.of("d1", "case_read_test"), schema); + spark.sql("INSERT INTO openhouse.d1.case_read_test VALUES ('row1', 42)"); + + // With caseSensitive=true, vanilla Spark would reject "id" as unresolved against "ID". + // OHCaseInsensitiveResolveRule normalizes the attribute before ResolveReferences runs, + // so the query must succeed — and crucially must NOT change the session setting. + spark.conf().set("spark.sql.caseSensitive", "true"); + try { + List rows = spark.sql("SELECT id FROM openhouse.d1.case_read_test").collectAsList(); + Assertions.assertEquals(1, rows.size()); + Assertions.assertEquals("row1", rows.get(0).getString(0)); + + // The rule must NOT mutate spark.sql.caseSensitive — that is the whole point of moving + // away from the session-level override approach. + Assertions.assertEquals( + "true", + spark.conf().get("spark.sql.caseSensitive"), + "OHCaseInsensitiveResolveRule must not modify spark.sql.caseSensitive"); + } finally { + spark.conf().set("spark.sql.caseSensitive", "false"); + spark.sql("DROP TABLE openhouse.d1.case_read_test"); + } + } + } + + @Test + public void testViewWithCaseMismatchResolvesViaRule() throws Exception { + try (SparkSession spark = getSparkSession()) { + // Create a table with uppercase "ID" column. + Catalog catalog = getOpenHouseCatalog(spark); + Schema schema = + new Schema( + Types.NestedField.required(1, "ID", Types.StringType.get()), + Types.NestedField.optional(2, "count", Types.LongType.get())); + catalog.createTable(TableIdentifier.of("d1", "view_case_test"), schema); + spark.sql("INSERT INTO openhouse.d1.view_case_test VALUES ('a', 1), ('b', 2)"); + + // View defined with explicit lowercase column references against a table that stores "ID". 
+ // With caseSensitive=true and without the rule both the view definition and any outer query + // referencing a mismatched column name would fail to resolve. + spark.sql( + "CREATE OR REPLACE TEMP VIEW v_case AS " + + "SELECT id, count FROM openhouse.d1.view_case_test"); + + // View defined with SELECT * — columns come from star-expansion over the OH table schema. + spark.sql( + "CREATE OR REPLACE TEMP VIEW v_case_star AS " + + "SELECT * FROM openhouse.d1.view_case_test"); + + spark.conf().set("spark.sql.caseSensitive", "true"); + try { + // SELECT * from explicit-column view: the rule normalises the inlined view SQL and the + // query must return all rows with correct values. + List selectStar = spark.sql("SELECT * FROM v_case ORDER BY id").collectAsList(); + Assertions.assertEquals(2, selectStar.size()); + Assertions.assertEquals("a", selectStar.get(0).getString(0)); + Assertions.assertEquals("b", selectStar.get(1).getString(0)); + + // SELECT id from explicit-column view: the outer query uses a case-mismatched column + // reference against the view output — the rule must normalise it at the outer level too. + List selectId = spark.sql("SELECT id FROM v_case ORDER BY id").collectAsList(); + Assertions.assertEquals(2, selectId.size()); + Assertions.assertEquals("a", selectId.get(0).getString(0)); + Assertions.assertEquals("b", selectId.get(1).getString(0)); + + // SELECT * from star view: star-expansion over the OH table schema produces "ID"; the + // rule must make reading it back case-insensitive. + List starViewSelectStar = + spark.sql("SELECT * FROM v_case_star ORDER BY id").collectAsList(); + Assertions.assertEquals(2, starViewSelectStar.size()); + Assertions.assertEquals("a", starViewSelectStar.get(0).getString(0)); + Assertions.assertEquals("b", starViewSelectStar.get(1).getString(0)); + + // SELECT id from star view: explicit column reference with case mismatch against + // star-expanded view output must also resolve correctly. + List starViewSelectId = + spark.sql("SELECT id FROM v_case_star ORDER BY id").collectAsList(); + Assertions.assertEquals(2, starViewSelectId.size()); + Assertions.assertEquals("a", starViewSelectId.get(0).getString(0)); + Assertions.assertEquals("b", starViewSelectId.get(1).getString(0)); + + Assertions.assertEquals( + "true", + spark.conf().get("spark.sql.caseSensitive"), + "OHCaseInsensitiveResolveRule must not modify spark.sql.caseSensitive"); + } finally { + spark.conf().set("spark.sql.caseSensitive", "false"); + spark.sql("DROP TABLE openhouse.d1.view_case_test"); + } + } + } + + @Test + public void testCaseDuplicateTableIsExcludedFromNormalization() throws Exception { + try (SparkSession spark = getSparkSession()) { + // Case-duplicate tables (columns "id" and "ID") are rejected at some enforcement layer. + // If the server validates at CREATE time it throws BadRequestException; if it allows + // creation then OHCaseInsensitiveResolveRule skips normalization for such tables and Spark's + // own ResolveReferences raises AnalysisException for the ambiguous column reference. + // Either way, case-duplicate column names cannot be silently misdirected. + Catalog catalog = getOpenHouseCatalog(spark); + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "ID", Types.StringType.get())); + + try { + catalog.createTable(TableIdentifier.of("d1", "case_dup_test"), schema); + // Server allowed creation; verify Spark raises an error for the ambiguous reference. 
+ try { + Assertions.assertThrows( + Exception.class, + () -> spark.sql("SELECT id FROM openhouse.d1.case_dup_test").collectAsList(), + "Ambiguous column reference on case-duplicate table must throw"); + } finally { + spark.sql("DROP TABLE openhouse.d1.case_dup_test"); + } + } catch (Exception ignored) { + // Server rejected the case-duplicate schema at CREATE time — also correct behavior. + } + } + } + + @Test + public void testWriteWithCaseMismatch_succeedsWithCaseSensitiveTrue() throws Exception { + try (SparkSession spark = getSparkSession()) { + // Create a table with uppercase column "ID" — the common case for tables originally created + // by Hive or engines that preserve user-specified casing. + Catalog catalog = getOpenHouseCatalog(spark); + Schema schema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get())); + catalog.createTable(TableIdentifier.of("d1", "write_case_test"), schema); + + // With caseSensitive=true, Spark's ResolveOutputRelation uses a case-sensitive resolver and + // cannot find source column "id" in the target schema column "ID". Vanilla Spark would throw + // "Cannot find data for output column 'ID'" at analysis time. + // + // OHSparkCatalog advertises ACCEPT_ANY_SCHEMA so outputResolved=true and + // ResolveOutputRelation skips OH writes. OHWriteSchemaNormalizationRule (post-hoc) then + // inserts a Project(Alias("id" → "ID")) so Iceberg sees the correct stored casing. + spark.conf().set("spark.sql.caseSensitive", "true"); + try { + Assertions.assertDoesNotThrow( + () -> spark.sql("SELECT 'row1' AS id").writeTo("openhouse.d1.write_case_test").append(), + "writeTo().append() must succeed when source has lowercase 'id' and OH table has 'ID'"); + + // Verify the row was written with the correct stored casing. + List rows = spark.sql("SELECT id FROM openhouse.d1.write_case_test").collectAsList(); + Assertions.assertEquals(1, rows.size()); + Assertions.assertEquals("row1", rows.get(0).getString(0)); + + // The rule must NOT mutate spark.sql.caseSensitive. 
+ Assertions.assertEquals( + "true", + spark.conf().get("spark.sql.caseSensitive"), + "OHWriteSchemaNormalizationRule must not modify spark.sql.caseSensitive"); + } finally { + spark.conf().set("spark.sql.caseSensitive", "false"); + spark.sql("DROP TABLE openhouse.d1.write_case_test"); + } + } + } } diff --git a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/extensions/OHCaseInsensitiveResolveRuleTest.java b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/extensions/OHCaseInsensitiveResolveRuleTest.java new file mode 100644 index 000000000..104a7ffc4 --- /dev/null +++ b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/extensions/OHCaseInsensitiveResolveRuleTest.java @@ -0,0 +1,328 @@ +package com.linkedin.openhouse.spark.e2e.extensions; + +import static com.linkedin.openhouse.spark.MockHelpers.*; +import static com.linkedin.openhouse.spark.SparkTestBase.*; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; + +import com.linkedin.openhouse.spark.SparkTestBase; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Random; +import lombok.SneakyThrows; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Tests for {@link com.linkedin.openhouse.spark.extensions.OHCaseInsensitiveResolveRule}. These + * tests use a mock OH server so they can simulate a case-duplicate table (one whose column names + * differ only in casing, e.g. "id" and "ID") without being blocked by the OH server's schema + * validation, which rejects such schemas on creation. + */ +@ExtendWith(SparkTestBase.class) +public class OHCaseInsensitiveResolveRuleTest { + + /** + * Verifies that {@code OHCaseInsensitiveResolveRule} excludes case-duplicate tables from + * normalization and does NOT silently resolve a mixed-case column reference to the wrong column. + * + *

Setup: a pre-existing OH table with columns "id" (field 1) and "ID" (field 2). With {@code + * spark.sql.caseSensitive=true}, a reference to {@code Id} (neither exact case) must NOT be + * silently redirected to "ID" by the rule. Instead the rule must leave the plan unchanged so + * Spark's own {@code ResolveReferences} reports an unresolved attribute. + * + *

Without the case-duplicate guard the rule's map would contain {@code "id" -> "ID"} (last + * write wins), causing {@code Id} to be renamed to {@code "ID"} and resolved silently to the + * wrong field. The guard prevents this by returning an empty mapping for case-duplicate tables. + */ + @SneakyThrows + @Test + public void testCaseDuplicateTable_mixedCaseRef_doesNotSilentlyNormalize() { + // Create an Iceberg table with case-duplicate schema directly via the Iceberg Java API, + // bypassing both Spark SQL and the OH server (neither allows duplicate-cased column names). + // This simulates a table created before the server-side validation was introduced. + TableIdentifier tableId = TableIdentifier.of("dbCaseDupRule", "caseDupTbl"); + Schema caseDupSchema = + new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.optional(2, "ID", Types.StringType.get())); + + String warehouse = spark.conf().get("spark.sql.catalog.testhelper.warehouse"); + Catalog hadoopCatalog = + CatalogUtil.buildIcebergCatalog( + "testhelper_caseduptable", + ImmutableMap.of( + CatalogProperties.WAREHOUSE_LOCATION, warehouse, ICEBERG_CATALOG_TYPE, "hadoop"), + new Configuration()); + hadoopCatalog.createTable(tableId, caseDupSchema); + + // Derive a valid metadata.json path the same way MockHelpers.craftMetadataLocation does. + Table table = hadoopCatalog.loadTable(tableId); + Path metadataPath = + Paths.get(((BaseTable) table).operations().current().metadataFileLocation()); + String copiedMetadata = + Files.copy( + metadataPath, + metadataPath.resolveSibling( + new Random().nextInt(Integer.MAX_VALUE) + "-.metadata.json")) + .toString(); + + // Mock the OH catalog to return the case-duplicate table. + mockTableService.enqueue( + mockResponse( + 200, + mockGetTableResponseBody( + "dbCaseDupRule", + "caseDupTbl", + "c1", + "dbCaseDupRule.caseDupTbl.c1", + "UUID", + copiedMetadata, + "v1", + SchemaParser.toJson(caseDupSchema), + null, + null))); + + // With caseSensitive=true, a mixed-case reference "Id" has no exact match in either "id" + // or "ID". The rule must skip normalization (empty mappings) so Spark reports an unresolved + // attribute rather than silently redirecting to the wrong column. + spark.conf().set("spark.sql.caseSensitive", "true"); + try { + Assertions.assertThrows( + Exception.class, + () -> spark.sql("SELECT Id FROM openhouse.dbCaseDupRule.caseDupTbl").collectAsList(), + "Rule must not normalize mixed-case ref against a case-duplicate table; query must throw"); + } finally { + spark.conf().set("spark.sql.caseSensitive", "false"); + } + } + + /** + * Verifies that {@code OHCaseInsensitiveResolveRule} does NOT rename column references that + * belong to a non-OH catalog relation when the OH table in the same plan has a column with the + * same case-folded name. + * + *

Setup: an OH table ({@code openhouse} catalog) has column {@code "ID"} (uppercase); a non-OH + * Iceberg table in the {@code testhelper} Hadoop catalog has column {@code "id"} (lowercase). + * Both appear in the same plan via a JOIN. Both catalogs resolve their tables as {@code + * DataSourceV2Relation} leaf nodes in the same analyzer pass, so the non-OH relation IS visible + * to {@code collectOHColumnMappings} when the rule fires. + * + *

Without the fix: the mapping {@code {"id" -> "ID"}} is built from the OH table and {@code + * transformExpressions} renames the non-OH table's {@code id} reference to {@code "ID"}, which + * fails to resolve under {@code caseSensitive=true}. + * + *

With the fix, {@code collectOHColumnMappings} detects that {@code "id"} (case-folded) also + * appears in the non-OH {@code DataSourceV2Relation}, excludes it from the mapping, and leaves + * the non-OH table's reference unchanged — the analysis succeeds. + */ + @SneakyThrows + @Test + public void testCrossCatalogJoin_nonOHTableColumnNotRenamedToMatchOHCasing() { + // Create a non-OH Iceberg table in the testhelper (Hadoop) catalog with lowercase column "id". + // testhelper uses type=hadoop (not catalog-impl), so isOHRelation returns false for it. + spark.sql( + "CREATE TABLE IF NOT EXISTS testhelper.dbCrossJoin.nonOhTable (id string) USING iceberg"); + + // Create an OH table with uppercase column "ID" directly via Hadoop catalog, + // bypassing Spark SQL DDL to ensure the column is stored as "ID" (not lowercased). + TableIdentifier ohTableId = TableIdentifier.of("dbCrossJoin", "ohJoinTable"); + Schema ohSchema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get())); + + String warehouse = spark.conf().get("spark.sql.catalog.testhelper.warehouse"); + Catalog hadoopCatalog = + CatalogUtil.buildIcebergCatalog( + "testhelper_crossjoin", + ImmutableMap.of( + CatalogProperties.WAREHOUSE_LOCATION, warehouse, ICEBERG_CATALOG_TYPE, "hadoop"), + new Configuration()); + hadoopCatalog.createTable(ohTableId, ohSchema); + + Table table = hadoopCatalog.loadTable(ohTableId); + Path metadataPath = + Paths.get(((BaseTable) table).operations().current().metadataFileLocation()); + String copiedMetadata = + Files.copy( + metadataPath, + metadataPath.resolveSibling( + new Random().nextInt(Integer.MAX_VALUE) + "-.metadata.json")) + .toString(); + + // Mock the OH catalog response for the OH table (column "ID"). + mockTableService.enqueue( + mockResponse( + 200, + mockGetTableResponseBody( + "dbCrossJoin", + "ohJoinTable", + "c1", + "dbCrossJoin.ohJoinTable.c1", + "UUID", + copiedMetadata, + "v1", + SchemaParser.toJson(ohSchema), + null, + null))); + + // Both tables appear in the same plan via a JOIN. With caseSensitive=true: + // - testhelper.dbCrossJoin.nonOhTable has "id" (lowercase, exact case in query) + // - openhouse.dbCrossJoin.ohJoinTable has "ID" (uppercase, exact case in query) + // The rule must exclude "id" from its mapping (shared name with the non-OH relation) and + // leave the non-OH table's reference unchanged — analysis must succeed. + spark.conf().set("spark.sql.caseSensitive", "true"); + try { + Assertions.assertDoesNotThrow( + () -> + spark + .sql( + "SELECT t.id FROM testhelper.dbCrossJoin.nonOhTable t" + + " JOIN openhouse.dbCrossJoin.ohJoinTable oh ON t.id = oh.ID") + .queryExecution() + .analyzed(), + "Analysis must succeed: the rule must not rename nonOhTable.id to ID"); + } finally { + spark.conf().set("spark.sql.caseSensitive", "false"); + spark.sql("DROP TABLE IF EXISTS testhelper.dbCrossJoin.nonOhTable"); + } + } + + /** + * Verifies that {@code OHCaseInsensitiveResolveRule} normalizes nested struct field name parts in + * addition to top-level column names. + * + *

Setup: an OH table whose top-level column and all struct field names are stored in UPPERCASE + * — mimicking Hive-migrated tables where the engine normalises every identifier to upper case. + * With {@code spark.sql.caseSensitive=true}, a lower-case reference to {@code payload.event_id} + * must be renamed to {@code PAYLOAD.EVENT_ID} so that {@code ResolveReferences} can resolve the + * struct field access. Similarly, {@code payload.nested.value} must become {@code + * PAYLOAD.NESTED.VALUE}. + * + *

Why the top-level name must also be upper-case in this test: {@code + * OHCaseInsensitiveResolveRule} is injected as an {@code extendedResolutionRule}, which Spark + * places after {@code ResolveReferences} in the Resolution batch. When the top-level + * column name is an exact case match (e.g. stored as {@code payload}, queried as {@code + * payload}), {@code ResolveReferences} finds the struct attribute and immediately calls {@code + * ExtractValue} on the nested field name — which throws {@code AnalysisException} before our rule + * gets a chance to run. Making the top-level name upper-case ({@code PAYLOAD}) means the + * case-sensitive resolver does not match {@code payload}, leaves the entire attribute + * unresolved, and our rule is reached on the same fixed-point iteration where it can normalise + * the full dotted path. + * + *

Without the nested-field normalization, the rule would leave {@code payload.event_id} + * unchanged and Spark would throw an {@code AnalysisException} because {@code EVENT_ID} ≠ {@code + * event_id} under {@code caseSensitive=true}. + */ + @SneakyThrows + @Test + public void testNestedStructField_normalizedCaseInsensitively() { + // Build an OH table schema where EVERY stored name is in uppercase (Hive convention). + // Schema: PAYLOAD STRUCT> + // Using all-uppercase ensures the top-level column also has a case mismatch with the + // lowercase query references, which is required for our rule to intercept the plan before + // Spark's ResolveReferences throws (see Javadoc above). + TableIdentifier tableId = TableIdentifier.of("dbNestedStruct", "nestedTbl"); + Schema nestedSchema = + new Schema( + Types.NestedField.required( + 1, + "PAYLOAD", + Types.StructType.of( + Types.NestedField.required(101, "EVENT_ID", Types.StringType.get()), + Types.NestedField.optional( + 102, + "NESTED", + Types.StructType.of( + Types.NestedField.optional(201, "VALUE", Types.LongType.get())))))); + + String warehouse = spark.conf().get("spark.sql.catalog.testhelper.warehouse"); + Catalog hadoopCatalog = + CatalogUtil.buildIcebergCatalog( + "testhelper_nested", + ImmutableMap.of( + CatalogProperties.WAREHOUSE_LOCATION, warehouse, ICEBERG_CATALOG_TYPE, "hadoop"), + new Configuration()); + hadoopCatalog.createTable(tableId, nestedSchema); + + Table table = hadoopCatalog.loadTable(tableId); + Path metadataPath = + Paths.get(((BaseTable) table).operations().current().metadataFileLocation()); + String copiedMetadata = + Files.copy( + metadataPath, + metadataPath.resolveSibling( + new Random().nextInt(Integer.MAX_VALUE) + "-.metadata.json")) + .toString(); + + mockTableService.enqueue( + mockResponse( + 200, + mockGetTableResponseBody( + "dbNestedStruct", + "nestedTbl", + "c1", + "dbNestedStruct.nestedTbl.c1", + "UUID", + copiedMetadata, + "v1", + SchemaParser.toJson(nestedSchema), + null, + null))); + + // Enqueue a second response for the second query below (each table load consumes one mock). + String copiedMetadata2 = + Files.copy( + metadataPath, + metadataPath.resolveSibling( + new Random().nextInt(Integer.MAX_VALUE) + "-.metadata.json")) + .toString(); + mockTableService.enqueue( + mockResponse( + 200, + mockGetTableResponseBody( + "dbNestedStruct", + "nestedTbl", + "c1", + "dbNestedStruct.nestedTbl.c1", + "UUID", + copiedMetadata2, + "v1", + SchemaParser.toJson(nestedSchema), + null, + null))); + + spark.conf().set("spark.sql.caseSensitive", "true"); + try { + // Single-level nested field: payload.event_id must resolve to PAYLOAD.EVENT_ID. + Assertions.assertDoesNotThrow( + () -> + spark + .sql("SELECT payload.event_id FROM openhouse.dbNestedStruct.nestedTbl") + .queryExecution() + .analyzed(), + "Single-level nested field reference must resolve case-insensitively"); + + // Two-level nested field: payload.nested.value must resolve to PAYLOAD.NESTED.VALUE. 
+ Assertions.assertDoesNotThrow( + () -> + spark + .sql("SELECT payload.nested.value FROM openhouse.dbNestedStruct.nestedTbl") + .queryExecution() + .analyzed(), + "Two-level nested field reference must resolve case-insensitively"); + } finally { + spark.conf().set("spark.sql.caseSensitive", "false"); + } + } +} diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/java/com/linkedin/openhouse/spark/OHSparkCatalog.java b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/java/com/linkedin/openhouse/spark/OHSparkCatalog.java new file mode 100644 index 000000000..8dd7aeca9 --- /dev/null +++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/java/com/linkedin/openhouse/spark/OHSparkCatalog.java @@ -0,0 +1,65 @@ +package com.linkedin.openhouse.spark; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.source.SparkTable; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCapability; + +/** + * Spark catalog wrapper for OpenHouse that extends {@link SparkCatalog} and annotates every loaded + * table with {@link TableCapability#ACCEPT_ANY_SCHEMA}. + * + *

Why {@code ACCEPT_ANY_SCHEMA}? Spark's {@code ResolveOutputRelation} analyzer rule uses the + * session resolver (case-sensitive when {@code spark.sql.caseSensitive=true}). If a client + * DataFrame has column {@code "id"} but the OH table stores {@code "ID"}, the write command never + * resolves — Spark throws "Cannot find data for output column 'ID'" before OH's own server-side + * case-insensitive schema validation runs. + * + *

Advertising {@code ACCEPT_ANY_SCHEMA} causes {@code DataSourceV2Relation.skipSchemaResolution} + * to return {@code true}, which makes {@code V2WriteCommand.outputResolved} return {@code true} + * immediately. {@code ResolveOutputRelation} therefore skips OH write commands, allowing {@link + * OHWriteSchemaNormalizationRule} (a post-hoc resolution rule) to insert the necessary + * column-renaming {@code Project} before execution. + * + *

For reads and DDL the capability has no effect; it is only consulted during write analysis. + * + *

Configuration: + * + *

+ *   spark.sql.catalog.openhouse=com.linkedin.openhouse.spark.OHSparkCatalog
+ *   spark.sql.catalog.openhouse.catalog-impl=com.linkedin.openhouse.spark.OpenHouseCatalog
+ * 
+ */ +public class OHSparkCatalog extends SparkCatalog { + + @Override + public SparkTable loadTable(Identifier ident) throws NoSuchTableException { + SparkTable original = super.loadTable(ident); + return withAcceptAnySchema(original); + } + + /** + * Wraps a {@link SparkTable} in an anonymous subclass that adds {@link + * TableCapability#ACCEPT_ANY_SCHEMA} to the table's capabilities. + * + *

The anonymous class delegates all other behaviour to the original table by invoking {@code + * super} (which delegates to the underlying Iceberg table object). {@code snapshotId=null} and + * {@code refreshEagerly=false} are the correct defaults for a standard (non-time-travel) table + * load; the original table's Iceberg {@code Table} object is passed unchanged so all reads and + * writes continue to use the real table state. + */ + private SparkTable withAcceptAnySchema(SparkTable original) { + return new SparkTable(original.table(), null /* snapshotId */, false /* refreshEagerly */) { + @Override + public Set capabilities() { + Set caps = new HashSet<>(original.capabilities()); + caps.add(TableCapability.ACCEPT_ANY_SCHEMA); + return Collections.unmodifiableSet(caps); + } + }; + } +} diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHCaseInsensitiveResolveRule.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHCaseInsensitiveResolveRule.scala new file mode 100644 index 000000000..02d9ff9ac --- /dev/null +++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHCaseInsensitiveResolveRule.scala @@ -0,0 +1,218 @@ +package com.linkedin.openhouse.spark.extensions + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.types.{DataType, StructType} + +/** + * Analyzer rule that normalizes [[UnresolvedAttribute]] names to use the exact casing stored in + * OpenHouse table schemas, enabling case-insensitive column resolution for OH tables regardless + * of the {@code spark.sql.caseSensitive} session setting. + * + *

Mechanism: When a query references a column by a name whose casing differs from the stored + * name (e.g. query uses {@code id}, table stores {@code ID}), Spark's built-in + * {@code ResolveReferences} rule fails to resolve the attribute when + * {@code spark.sql.caseSensitive=true}. This rule runs in the same analyzer pass and renames + * any [[UnresolvedAttribute]] whose name parts case-insensitively match a column (or nested struct + * field) in a resolved OpenHouse relation, replacing each part with the stored casing. Spark's + * {@code ResolveReferences} then finds an exact match on the next fixed-point iteration. + * + *

Nested struct fields: for a dotted attribute reference such as {@code PAYLOAD.event_id} + * where the stored schema has a struct column {@code PAYLOAD} with a field stored as {@code EVENT_ID}, + * the rule normalizes the full name-part chain — {@code ["PAYLOAD", "event_id"]} becomes {@code ["PAYLOAD", "EVENT_ID"]}. + * Normalization descends recursively through arbitrarily nested struct types. Case-duplicate + * struct fields at any level are skipped (ambiguous target). + * + *

Batch-ordering constraint: this rule is injected via {@code injectResolutionRule}, + * which places it after Spark's built-in {@code ResolveReferences} in the Resolution + * batch. {@code ResolveReferences} throws an {@code AnalysisException} immediately when it finds + * the top-level attribute (case-sensitively) but cannot resolve the nested field — so + * normalization of nested fields only succeeds when the top-level column name also has a + * case mismatch (e.g. query uses {@code payload}, stored as {@code PAYLOAD}). In that situation + * {@code ResolveReferences} leaves the whole dotted reference unresolved (no exception), our rule + * normalizes the full path, and the next fixed-point iteration resolves it. If the top-level name + * is an exact match but a nested field is not (e.g. {@code payload.event_id} where the stored + * column {@code payload} is a struct whose field is stored as {@code EVENT_ID}), {@code ResolveReferences} throws before this rule + * can act. The typical production case — Hive-migrated tables where all identifiers are + * upper-cased — is fully covered. + * + *

Scope: Only applies to tables backed by a catalog whose {@code catalog-impl} is configured + * with "openhouse" (checked via the Spark conf). Two exclusions keep non-OH catalogs (Hive, other + * v2 catalogs) safe: (1) tables where two or more columns share the same case-folded name are + * skipped (ambiguous target), consistent with the server-side write-path guard; (2) column names + * that also appear in any non-OH resolved relation in the same plan are excluded — because + * {@code transformExpressions} walks the whole plan tree and cannot tell which + * {@link UnresolvedAttribute} belongs to which catalog, renaming a shared name could break + * resolution for non-OH references under {@code caseSensitive=true}. + * + *

The rule does NOT modify {@code spark.sql.caseSensitive} and has no effect on non-OH tables + * or intermediate DataFrame operations in the same query. + */ +class OHCaseInsensitiveResolveRule(spark: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + val (topLevelMap, typeMap) = collectOHColumnMappings(plan) + if (topLevelMap.isEmpty) return plan + + // Use resolveOperatorsDown so the rename is applied to every *unresolved* plan node in the + // tree — Sort, Project, Filter, etc. — not just the top-level node. + // plan.transformExpressions alone only applies mapExpressions to the root plan node's own + // expression fields (via mapProductIterator) and does not descend into child plan nodes, so + // for queries like "SELECT id FROM v ORDER BY id" the Project's expressions were left + // untouched. resolveOperatorsDown visits every unanalyzed node (skipping already-resolved + // view bodies) and applies transformExpressions to each one. + plan.resolveOperatorsDown { + case p: LogicalPlan => + p.transformExpressions { + case attr: UnresolvedAttribute => + val newParts = normalizeNameParts(attr.nameParts, topLevelMap, typeMap) + if (newParts != attr.nameParts) UnresolvedAttribute(newParts) + else attr + } + } + } + + /** + * Normalizes a sequence of name parts from an [[UnresolvedAttribute]] to use the exact casing + * stored in OH table schemas, handling both top-level columns and nested struct field access. + * + *

Three patterns are handled: + *

    + *
  1. Single-part: {@code ["id"]} → {@code ["ID"]} (top-level column, no qualifier)
  2. Qualifier + column: {@code ["t", "id"]} → {@code ["t", "ID"]} (table alias or catalog + * prefix before the column name)
  3. Nested struct access: {@code ["payload", "event_id"]} → + * {@code ["payload", "EVENT_ID"]} where {@code payload} is an OH struct column whose + * stored field name is {@code EVENT_ID}; or {@code ["t", "payload", "event_id"]} → + * {@code ["t", "payload", "EVENT_ID"]} with a qualifier prefix. Normalization recurses + * into arbitrarily nested struct types. + *
+ * + *

The algorithm scans {@code parts} left-to-right looking for the first index {@code i} + * where {@code parts(i)} case-insensitively matches a top-level OH column. The prefix + * {@code parts(0..i-1)} (qualifiers) is preserved unchanged, {@code parts(i)} is replaced with + * the stored column name, and the remaining parts are recursively normalized against that + * column's struct schema (if any). If no top-level OH column is found, the parts are returned + * unchanged. + */ + private def normalizeNameParts( + parts: Seq[String], + topLevelMap: Map[String, String], + typeMap: Map[String, DataType]): Seq[String] = { + if (parts.isEmpty) return parts + + for (i <- parts.indices) { + topLevelMap.get(parts(i).toLowerCase) match { + case Some(storedName) => + val remainingNormalized = typeMap.get(storedName) match { + case Some(dt) => normalizeStructPath(parts.drop(i + 1), dt) + case None => parts.drop(i + 1) + } + return parts.take(i) ++ Seq(storedName) ++ remainingNormalized + case None => + } + } + parts + } + + /** + * Recursively normalizes a path of field name parts through a nested [[StructType]]. + * + *

For each part, looks up the stored field name case-insensitively within the current struct + * and recurses into that field's type for subsequent parts. Returns the input unchanged if: + *

    + *
  • The current type is not a {@link StructType} (no struct to traverse)
  • No field matches the current part (part is not a field name at this level)
  • Two or more fields at this level share the same case-folded name (ambiguous) + *
+ */ + private def normalizeStructPath(parts: Seq[String], dataType: DataType): Seq[String] = { + if (parts.isEmpty) return parts + dataType match { + case st: StructType => + val grouped = st.fields.groupBy(_.name.toLowerCase) + // Skip normalization at this level if any two fields share the same case-folded name. + if (grouped.values.exists(_.size > 1)) return parts + val fieldByLower = grouped.collect { case (lower, arr) if arr.size == 1 => lower -> arr.head } + fieldByLower.get(parts.head.toLowerCase) match { + case Some(field) => + Seq(field.name) ++ normalizeStructPath(parts.tail, field.dataType) + case None => + parts + } + case _ => + parts + } + } + + /** + * Scans the plan for resolved OpenHouse relations ([[DataSourceV2Relation]] nodes whose catalog + * is configured with an OpenHouse catalog-impl) and returns: + *
    + *
  • A top-level map of {@code lowercase_column_name -> stored_column_name}, with names + * that appear in any non-OH resolved relation excluded (cross-catalog safety).
  • A type map of {@code stored_column_name -> DataType} used to traverse nested struct + * schemas during normalization of dotted attribute paths. + *
+ * + *

Two exclusions apply: + *

    + *
  1. Tables with case-duplicate columns (e.g. both "id" and "ID") are skipped — the target + * column is ambiguous and normalization could silently misdirect a read.
  2. Column names that also appear (case-insensitively) in any non-OH resolved relation in + * the same plan are excluded from the top-level map. [[plan.transformExpressions]] is + * applied to the whole plan tree and cannot distinguish which [[UnresolvedAttribute]] + * belongs to which catalog. Renaming a name that also exists in a Hive/other-catalog + * relation would corrupt resolution for those references under + * {@code caseSensitive=true}. + *
+ */ + private def collectOHColumnMappings( + plan: LogicalPlan): (Map[String, String], Map[String, DataType]) = { + val ohBuilder = Map.newBuilder[String, String] + val typeBuilder = Map.newBuilder[String, DataType] + val nonOHLower = collection.mutable.Set[String]() + + plan.foreach { + case rel: DataSourceV2Relation if isOHRelation(rel) => + val fieldNames = rel.output.map(_.name) + // Skip tables where two columns share the same case-folded name. + val grouped = fieldNames.groupBy(_.toLowerCase) + if (grouped.values.forall(_.size == 1)) { + rel.output.foreach { attr => + ohBuilder += (attr.name.toLowerCase -> attr.name) + typeBuilder += (attr.name -> attr.dataType) + } + } + case node: LeafNode if node.resolved => + // Track all column names from every other resolved relation (Hive, other v2 catalogs, + // file scans, etc.) so we can exclude ambiguous names below. + node.output.foreach(attr => nonOHLower += attr.name.toLowerCase) + case _ => + } + + val rawMap = ohBuilder.result() + val typeMap = typeBuilder.result() + // Only keep names that are unambiguously OH-specific. + val topLevelMap = rawMap.filterKeys(k => !nonOHLower.contains(k)) + (topLevelMap, typeMap) + } + + /** + * Returns true if the relation is backed by an OpenHouse catalog, identified by checking + * whether the catalog-impl configured for this catalog's name contains "openhouse". + * This avoids hardcoding the catalog name and works with any registered OH catalog instance. + */ + private def isOHRelation(rel: DataSourceV2Relation): Boolean = { + rel.catalog match { + case Some(c) => + val key = s"spark.sql.catalog.${c.name()}.catalog-impl" + spark.conf.getOption(key).exists(_.toLowerCase.contains("openhouse")) + case None => + false + } + } +} diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHWriteSchemaNormalizationRule.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHWriteSchemaNormalizationRule.scala new file mode 100644 index 000000000..75f03e072 --- /dev/null +++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHWriteSchemaNormalizationRule.scala @@ -0,0 +1,163 @@ +package com.linkedin.openhouse.spark.extensions + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Alias, Cast} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, V2WriteCommand} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + +/** + * Post-hoc resolution rule that replicates the column-name and type normalization that Spark's + * {@code ResolveOutputRelation} would have applied to OpenHouse write commands, compensating for + * the fact that OH tables advertise {@link org.apache.spark.sql.connector.catalog.TableCapability#ACCEPT_ANY_SCHEMA} + * (via {@link OHSparkCatalog}) which causes {@code ResolveOutputRelation} to skip them entirely. + * + *

Why {@code ACCEPT_ANY_SCHEMA}? Spark's {@code ResolveOutputRelation} throws at analysis time + * when {@code caseSensitive=true} and a client DataFrame column (e.g. {@code "id"}) does not match + * the OH table column name exactly ({@code "ID"}). Advertising {@code ACCEPT_ANY_SCHEMA} prevents + * the throw. This rule then runs as a {@code Post-Hoc Resolution} rule and does the work that + * {@code ResolveOutputRelation} would have done: wrapping the source query in a {@code Project} + * that renames (and if necessary casts) each source column to the stored OH casing and type. + * + *

The rule handles both write modes: + *

    + *
  • By-name writes ({@code isByName=true}, e.g. {@code df.writeTo().append()}): each + * source column is matched to the target column whose name it equals case-insensitively. + * Tables with case-duplicate columns are skipped (ambiguous target).
  • By-position writes ({@code isByName=false}, e.g. {@code INSERT INTO … VALUES …}): + * source and target columns are zipped positionally and each source column is renamed (and + * if the types differ, cast) to match the target. This replicates the {@code Alias} + + * {@code Cast} that {@code ResolveOutputRelation} would have inserted. + *
+ * + *

In both modes, if source and target already match in name and type the rule returns the plan + * unchanged. If the column count differs the rule is a no-op (the mismatch is left for Iceberg or + * the OH server to report). + */ +class OHWriteSchemaNormalizationRule(spark: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.transformDown { + case write: V2WriteCommand + if write.table.resolved && write.query.resolved && isOHWrite(write) => + normalizeColumnNames(write).getOrElse(write) + } + } + + private def isOHWrite(write: V2WriteCommand): Boolean = { + write.table match { + case rel: DataSourceV2Relation => isOHRelation(rel) + case _ => false + } + } + + private def normalizeColumnNames(write: V2WriteCommand): Option[V2WriteCommand] = { + val ohRelation = write.table match { + case rel: DataSourceV2Relation => rel + case _ => return None + } + + val targetCols = ohRelation.output + val sourceCols = write.query.output + + // If column counts differ, leave it to Iceberg / the OH server to report the mismatch. + if (sourceCols.size != targetCols.size) return None + + val projections = + if (write.isByName) projectByName(sourceCols, targetCols) + else projectByPosition(sourceCols, targetCols) + + projections match { + case None => None + case Some(exprs) => Some(write.withNewQuery(Project(exprs, write.query))) + } + } + + /** + * By-name mode: replicate what {@code ResolveOutputRelation} does for by-name writes — produce a + * projection in target column order that renames (and if necessary casts) each source + * column to the stored OH casing. This also handles the case where the source DataFrame has + * columns in a different order than the stored schema (e.g. when the source is built from a bean + * whose fields are introspected alphabetically). + * + *

Tables with case-duplicate columns are skipped (the target is ambiguous). + */ + private def projectByName( + sourceCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute], + targetCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute]) + : Option[Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression]] = { + + // Case-duplicate target: skip normalization to avoid silently misdirecting the write. + val targetGrouped = targetCols.groupBy(_.name.toLowerCase) + if (targetGrouped.values.exists(_.size > 1)) return None + + // Case-duplicate source: skip to avoid ambiguous lookup. + val srcGrouped = sourceCols.groupBy(_.name.toLowerCase) + if (srcGrouped.values.exists(_.size > 1)) return None + val srcByLower: Map[String, org.apache.spark.sql.catalyst.expressions.Attribute] = + srcGrouped.map { case (lower, attrs) => lower -> attrs.head } + + // Produce expressions in TARGET column order (replicating ResolveOutputRelation). + // For each target column find the matching source column by case-insensitive name. + val exprs: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression] = + targetCols.map { tgt => + srcByLower.get(tgt.name.toLowerCase) match { + case Some(src) if src.name == tgt.name => src // correct casing, keep as-is + case Some(src) => Alias(src, tgt.name)() // rename to stored casing + case None => return None // unmatched column + } + } + + // No-op if the result is identical to the source (same column order, same names). + val unchanged = exprs.zip(sourceCols).forall { + case (expr: org.apache.spark.sql.catalyst.expressions.Attribute, src) => + expr.exprId == src.exprId + case _ => false + } + if (unchanged) None else Some(exprs) + } + + /** + * By-position mode (e.g. {@code INSERT INTO … VALUES …}): zip source and target by position. + * For each pair, replicate what {@code ResolveOutputRelation} would have done: + *

    + *
  • If names and types already match, keep the source attribute as-is.
  • Otherwise, wrap the source in {@code Alias(Cast(src, targetType), targetName)} to + * rename the column and coerce the type to the stored schema. + *
+ */ + private def projectByPosition( + sourceCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute], + targetCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute]) + : Option[Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression]] = { + + val pairsNeedingChange = sourceCols.zip(targetCols).filter { + case (src, tgt) => + src.name != tgt.name || + src.dataType != tgt.dataType || + src.metadata != tgt.metadata + } + if (pairsNeedingChange.isEmpty) return None + + val exprs = sourceCols.zip(targetCols).map { + case (src, tgt) + if src.name == tgt.name && src.dataType == tgt.dataType && src.metadata == tgt.metadata => + src + case (src, tgt) => + val castExpr = if (src.dataType == tgt.dataType) src + else Cast(src, tgt.dataType, Option(spark.conf.get("spark.sql.session.timeZone"))) + Alias(castExpr, tgt.name)(explicitMetadata = Some(tgt.metadata)) + } + Some(exprs) + } + + private def isOHRelation(rel: DataSourceV2Relation): Boolean = { + rel.catalog match { + case Some(c) => + val key = s"spark.sql.catalog.${c.name()}.catalog-impl" + spark.conf.getOption(key).exists(_.toLowerCase.contains("openhouse")) + case None => + false + } + } +} diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OpenhouseSparkSessionExtensions.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OpenhouseSparkSessionExtensions.scala index c8d911dc2..892fa63ae 100644 --- a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OpenhouseSparkSessionExtensions.scala +++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OpenhouseSparkSessionExtensions.scala @@ -7,6 +7,8 @@ import org.apache.spark.sql.SparkSessionExtensions class OpenhouseSparkSessionExtensions extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { extensions.injectParser { case (_, parser) => new OpenhouseSparkSqlExtensionsParser(parser) } - extensions.injectPlannerStrategy( spark => OpenhouseDataSourceV2Strategy(spark)) + extensions.injectPlannerStrategy(spark => OpenhouseDataSourceV2Strategy(spark)) + extensions.injectResolutionRule(spark => new OHCaseInsensitiveResolveRule(spark)) + extensions.injectPostHocResolutionRule(spark => new OHWriteSchemaNormalizationRule(spark)) } } diff --git a/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CTASNonNullTestSpark3_5.java b/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CTASNonNullTestSpark3_5.java index 46b47c9b8..6c7dd0898 100644 --- a/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CTASNonNullTestSpark3_5.java +++ b/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CTASNonNullTestSpark3_5.java @@ -24,7 +24,8 @@ public void testCTASPreservesNonNull() throws Exception { // Verify spark catalogs have correct classes configured assertEquals( - "org.apache.iceberg.spark.SparkCatalog", spark.conf().get("spark.sql.catalog.openhouse")); + "com.linkedin.openhouse.spark.OHSparkCatalog", + spark.conf().get("spark.sql.catalog.openhouse")); // Verify id column is preserved in good catalog, not preserved in bad catalog assertFalse(sourceSchema.apply("id").nullable(), "Source 
table id column should be required"); diff --git a/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/java/com/linkedin/openhouse/spark/OHSparkCatalog.java b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/java/com/linkedin/openhouse/spark/OHSparkCatalog.java new file mode 100644 index 000000000..34ab2d33f --- /dev/null +++ b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/java/com/linkedin/openhouse/spark/OHSparkCatalog.java @@ -0,0 +1,60 @@ +package com.linkedin.openhouse.spark; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.source.SparkTable; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCapability; + +/** + * OpenHouse catalog extension for Spark 3.5 / Iceberg 1.5. Overrides {@link + * SparkCatalog#loadTable(Identifier)} to advertise {@link TableCapability#ACCEPT_ANY_SCHEMA} on + * every OpenHouse table. This prevents Spark's {@code ResolveOutputRelation} from throwing at + * analysis time when {@code caseSensitive=true} and the source column casing differs from the + * stored column name. The companion rule {@link + * com.linkedin.openhouse.spark.extensions.OHWriteSchemaNormalizationRule} runs as a post-hoc + * resolution rule and applies the necessary column renaming / casting that {@code + * ResolveOutputRelation} would otherwise have done. + */ +public class OHSparkCatalog extends SparkCatalog { + + @Override + public Table loadTable(Identifier ident) throws NoSuchTableException { + Table original = super.loadTable(ident); + if (original instanceof SparkTable) { + return withAcceptAnySchema((SparkTable) original); + } + return original; + } + + private SparkTable withAcceptAnySchema(SparkTable original) { + // SparkTable carries a branch field (set when loading branch-qualified identifiers like + // "table.branch_feature_a"). We must use the SparkTable(Table, String, boolean) constructor + // when a branch is present so that newWriteBuilder() targets the correct branch. + // Using SparkTable(Table, Long, boolean) with snapshotId=null would silently drop the branch + // and cause all branch writes to land on the main table instead. 
+ String branch = original.branch(); + if (branch != null) { + return new SparkTable(original.table(), branch, false /* refreshEagerly */) { + @Override + public Set capabilities() { + Set caps = new HashSet<>(original.capabilities()); + caps.add(TableCapability.ACCEPT_ANY_SCHEMA); + return Collections.unmodifiableSet(caps); + } + }; + } + return new SparkTable(original.table(), original.snapshotId(), false /* refreshEagerly */) { + @Override + public Set capabilities() { + Set caps = new HashSet<>(original.capabilities()); + caps.add(TableCapability.ACCEPT_ANY_SCHEMA); + return Collections.unmodifiableSet(caps); + } + }; + } +} diff --git a/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHCaseInsensitiveResolveRule.scala b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHCaseInsensitiveResolveRule.scala new file mode 100644 index 000000000..0fc3e2967 --- /dev/null +++ b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHCaseInsensitiveResolveRule.scala @@ -0,0 +1,218 @@ +package com.linkedin.openhouse.spark.extensions + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.types.{DataType, StructType} + +/** + * Analyzer rule that normalizes [[UnresolvedAttribute]] names to use the exact casing stored in + * OpenHouse table schemas, enabling case-insensitive column resolution for OH tables regardless + * of the {@code spark.sql.caseSensitive} session setting. + * + *

Mechanism: When a query references a column by a name whose casing differs from the stored + * name (e.g. query uses {@code id}, table stores {@code ID}), Spark's built-in + * {@code ResolveReferences} rule fails to resolve the attribute when + * {@code spark.sql.caseSensitive=true}. This rule runs in the same analyzer pass and renames + * any [[UnresolvedAttribute]] whose name parts case-insensitively match a column (or nested struct + * field) in a resolved OpenHouse relation, replacing each part with the stored casing. Spark's + * {@code ResolveReferences} then finds an exact match on the next fixed-point iteration. + * + *

Nested struct fields: for a dotted attribute reference such as {@code PAYLOAD.event_id} + * where the stored schema has a struct column {@code PAYLOAD} with a field stored as {@code EVENT_ID}, + * the rule normalizes the full name-part chain — {@code ["PAYLOAD", "event_id"]} becomes {@code ["PAYLOAD", "EVENT_ID"]}. + * Normalization descends recursively through arbitrarily nested struct types. Case-duplicate + * struct fields at any level are skipped (ambiguous target). + * + *

Batch-ordering constraint: this rule is injected via {@code injectResolutionRule}, + * which places it after Spark's built-in {@code ResolveReferences} in the Resolution + * batch. {@code ResolveReferences} throws an {@code AnalysisException} immediately when it finds + * the top-level attribute (case-sensitively) but cannot resolve the nested field — so + * normalization of nested fields only succeeds when the top-level column name also has a + * case mismatch (e.g. query uses {@code payload}, stored as {@code PAYLOAD}). In that situation + * {@code ResolveReferences} leaves the whole dotted reference unresolved (no exception), our rule + * normalizes the full path, and the next fixed-point iteration resolves it. If the top-level name + * is an exact match but a nested field is not (e.g. {@code payload.event_id} with stored schema + * {@code payload STRUCT<EVENT_ID ...>}), {@code ResolveReferences} throws before this rule + * can act. The typical production case — Hive-migrated tables where all identifiers are + * upper-cased — is fully covered. + * + *
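A short sketch of that boundary (both table names are hypothetical):
<pre>{@code
// stored: PAYLOAD struct<EVENT_ID: string>; full-path case mismatch, normalized by this rule:
spark.sql("SELECT payload.event_id FROM openhouse.db.events_a")
// stored: payload struct<EVENT_ID: string>; exact top-level match but nested mismatch,
// ResolveReferences still throws under caseSensitive=true before this rule can help:
spark.sql("SELECT payload.event_id FROM openhouse.db.events_b")
}</pre>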

Scope: Only applies to tables backed by a catalog whose {@code catalog-impl} is configured + * with "openhouse" (checked via the Spark conf). Two exclusions keep non-OH catalogs (Hive, other + * v2 catalogs) safe: (1) tables where two or more columns share the same case-folded name are + * skipped (ambiguous target), consistent with the server-side write-path guard; (2) column names + * that also appear in any non-OH resolved relation in the same plan are excluded — because + * {@code resolveOperatorsDown} + {@code transformExpressions} walks the whole plan tree and cannot + * tell which {@link UnresolvedAttribute} belongs to which catalog, renaming a shared name could + * break resolution for non-OH references under {@code caseSensitive=true}. + * + *
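An illustrative cross-catalog case for exclusion (2); the Hive table and all names are hypothetical:
<pre>{@code
// "id" exists (as "ID") in the OH relation and (as "id") in spark_catalog.default.clicks,
// so "id" is dropped from the normalization map and both references are resolved as written.
spark.sql(
  """SELECT h.id, oh.ID
    |FROM openhouse.db.case_demo oh
    |JOIN spark_catalog.default.clicks h ON oh.ID = h.id""".stripMargin)
}</pre>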

The rule does NOT modify {@code spark.sql.caseSensitive} and has no effect on non-OH tables + * or intermediate DataFrame operations in the same query. + */ +class OHCaseInsensitiveResolveRule(spark: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + val (topLevelMap, typeMap) = collectOHColumnMappings(plan) + if (topLevelMap.isEmpty) return plan + + // Use resolveOperatorsDown so the rename is applied to every *unresolved* plan node in the + // tree — Sort, Project, Filter, etc. — not just the top-level node. + // plan.transformExpressions alone only applies mapExpressions to the root plan node's own + // expression fields (via mapProductIterator) and does not descend into child plan nodes, so + // for queries like "SELECT id FROM v ORDER BY id" the Project's expressions were left + // untouched. resolveOperatorsDown visits every unanalyzed node (skipping already-resolved + // view bodies) and applies transformExpressions to each one. + plan.resolveOperatorsDown { + case p: LogicalPlan => + p.transformExpressions { + case attr: UnresolvedAttribute => + val newParts = normalizeNameParts(attr.nameParts, topLevelMap, typeMap) + if (newParts != attr.nameParts) UnresolvedAttribute(newParts) + else attr + } + } + } + + /** + * Normalizes a sequence of name parts from an [[UnresolvedAttribute]] to use the exact casing + * stored in OH table schemas, handling both top-level columns and nested struct field access. + * + *

Three patterns are handled: + *

  1. Single-part: {@code ["id"]} → {@code ["ID"]} (top-level column, no qualifier)
  2. Qualifier + column: {@code ["t", "id"]} → {@code ["t", "ID"]} (table alias or catalog prefix before the column name)
  3. Nested struct access: {@code ["payload", "event_id"]} → {@code ["payload", "EVENT_ID"]} where {@code payload} is an OH struct column whose stored field name is {@code EVENT_ID}; or {@code ["t", "payload", "event_id"]} → {@code ["t", "payload", "EVENT_ID"]} with a qualifier prefix. Normalization recurses into arbitrarily nested struct types.

The algorithm scans {@code parts} left-to-right looking for the first index {@code i} + * where {@code parts(i)} case-insensitively matches a top-level OH column. The prefix + * {@code parts(0..i-1)} (qualifiers) is preserved unchanged, {@code parts(i)} is replaced with + * the stored column name, and the remaining parts are recursively normalized against that + * column's struct schema (if any). If no top-level OH column is found, the parts are returned + * unchanged. + */ + private def normalizeNameParts( + parts: Seq[String], + topLevelMap: Map[String, String], + typeMap: Map[String, DataType]): Seq[String] = { + if (parts.isEmpty) return parts + + for (i <- parts.indices) { + topLevelMap.get(parts(i).toLowerCase) match { + case Some(storedName) => + val remainingNormalized = typeMap.get(storedName) match { + case Some(dt) => normalizeStructPath(parts.drop(i + 1), dt) + case None => parts.drop(i + 1) + } + return parts.take(i) ++ Seq(storedName) ++ remainingNormalized + case None => + } + } + parts + } + + /** + * Recursively normalizes a path of field name parts through a nested [[StructType]]. + * + *

For each part, looks up the stored field name case-insensitively within the current struct + * and recurses into that field's type for subsequent parts. Returns the input unchanged if: + *

  • The current type is not a {@link StructType} (no struct to traverse)
  • No field matches the current part (part is not a field name at this level)
  • Two or more fields at this level share the same case-folded name (ambiguous)
+ */ + private def normalizeStructPath(parts: Seq[String], dataType: DataType): Seq[String] = { + if (parts.isEmpty) return parts + dataType match { + case st: StructType => + val grouped = st.fields.groupBy(_.name.toLowerCase) + // Skip normalization at this level if any two fields share the same case-folded name. + if (grouped.values.exists(_.size > 1)) return parts + val fieldByLower = grouped.collect { case (lower, arr) if arr.size == 1 => lower -> arr.head } + fieldByLower.get(parts.head.toLowerCase) match { + case Some(field) => + Seq(field.name) ++ normalizeStructPath(parts.tail, field.dataType) + case None => + parts + } + case _ => + parts + } + } + + /** + * Scans the plan for resolved OpenHouse relations ([[DataSourceV2Relation]] nodes whose catalog + * is configured with an OpenHouse catalog-impl) and returns: + *
  • A top-level map of {@code lowercase_column_name -> stored_column_name}, with names that appear in any non-OH resolved relation excluded (cross-catalog safety).
  • A type map of {@code stored_column_name -> DataType} used to traverse nested struct schemas during normalization of dotted attribute paths.

Two exclusions apply:
  1. Tables with case-duplicate columns (e.g. both "id" and "ID") are skipped — the target column is ambiguous and normalization could silently misdirect a read.
  2. Column names that also appear (case-insensitively) in any non-OH resolved relation in the same plan are excluded from the top-level map. [[resolveOperatorsDown]] + [[transformExpressions]] walks the whole plan tree and cannot distinguish which [[UnresolvedAttribute]] belongs to which catalog. Renaming a name that also exists in a Hive/other-catalog relation would corrupt resolution for those references under {@code caseSensitive=true}.
+ */ + private def collectOHColumnMappings( + plan: LogicalPlan): (Map[String, String], Map[String, DataType]) = { + val ohBuilder = Map.newBuilder[String, String] + val typeBuilder = Map.newBuilder[String, DataType] + val nonOHLower = collection.mutable.Set[String]() + + plan.foreach { + case rel: DataSourceV2Relation if isOHRelation(rel) => + val fieldNames = rel.output.map(_.name) + // Skip tables where two columns share the same case-folded name. + val grouped = fieldNames.groupBy(_.toLowerCase) + if (grouped.values.forall(_.size == 1)) { + rel.output.foreach { attr => + ohBuilder += (attr.name.toLowerCase -> attr.name) + typeBuilder += (attr.name -> attr.dataType) + } + } + case node: LeafNode if node.resolved => + // Track all column names from every other resolved relation (Hive, other v2 catalogs, + // file scans, etc.) so we can exclude ambiguous names below. + node.output.foreach(attr => nonOHLower += attr.name.toLowerCase) + case _ => + } + + val rawMap = ohBuilder.result() + val typeMap = typeBuilder.result() + // Only keep names that are unambiguously OH-specific. + val topLevelMap = rawMap.filterKeys(k => !nonOHLower.contains(k)) + (topLevelMap, typeMap) + } + + /** + * Returns true if the relation is backed by an OpenHouse catalog, identified by checking + * whether the catalog-impl configured for this catalog's name contains "openhouse". + * This avoids hardcoding the catalog name and works with any registered OH catalog instance. + */ + private def isOHRelation(rel: DataSourceV2Relation): Boolean = { + rel.catalog match { + case Some(c) => + val key = s"spark.sql.catalog.${c.name()}.catalog-impl" + spark.conf.getOption(key).exists(_.toLowerCase.contains("openhouse")) + case None => + false + } + } +} diff --git a/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHWriteSchemaNormalizationRule.scala b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHWriteSchemaNormalizationRule.scala new file mode 100644 index 000000000..75f03e072 --- /dev/null +++ b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OHWriteSchemaNormalizationRule.scala @@ -0,0 +1,163 @@ +package com.linkedin.openhouse.spark.extensions + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Alias, Cast} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, V2WriteCommand} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + +/** + * Post-hoc resolution rule that replicates the column-name and type normalization that Spark's + * {@code ResolveOutputRelation} would have applied to OpenHouse write commands, compensating for + * the fact that OH tables advertise {@link org.apache.spark.sql.connector.catalog.TableCapability#ACCEPT_ANY_SCHEMA} + * (via {@link OHSparkCatalog}) which causes {@code ResolveOutputRelation} to skip them entirely. + * + *

Why {@code ACCEPT_ANY_SCHEMA}? Spark's {@code ResolveOutputRelation} throws at analysis time + * when {@code caseSensitive=true} and a client DataFrame column (e.g. {@code "id"}) does not match + * the OH table column name exactly ({@code "ID"}). Advertising {@code ACCEPT_ANY_SCHEMA} prevents + * the throw. This rule then runs as a {@code Post-Hoc Resolution} rule and does the work that + * {@code ResolveOutputRelation} would have done: wrapping the source query in a {@code Project} + * that renames (and if necessary casts) each source column to the stored OH casing and type. + * + *
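A sketch of the end-to-end write behavior this enables; the DataFrame, table name, and stored schema below are assumptions for illustration:
<pre>{@code
import spark.implicits._
// Stored schema (hypothetical): ID string, VALUE bigint; the source uses lower-case names.
spark.conf.set("spark.sql.caseSensitive", "true")
val df = Seq(("r1", 1L)).toDF("id", "value")
// ACCEPT_ANY_SCHEMA lets analysis get past ResolveOutputRelation; this rule then wraps the
// query in a Project that aliases id -> ID and value -> VALUE so the write lands on the
// stored columns.
df.writeTo("openhouse.db.case_demo").append()
}</pre>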

The rule handles both write modes: + *

  • By-name writes ({@code isByName=true}, e.g. {@code df.writeTo().append()}): each source column is matched to the target column whose name it equals case-insensitively. Tables with case-duplicate columns are skipped (ambiguous target).
  • By-position writes ({@code isByName=false}, e.g. {@code INSERT INTO … VALUES …}): source and target columns are zipped positionally and each source column is renamed (and if the types differ, cast) to match the target. This replicates the {@code Alias} + {@code Cast} that {@code ResolveOutputRelation} would have inserted.

In both modes, if source and target already match in name and type the rule returns the plan + * unchanged. If the column count differs the rule is a no-op (the mismatch is left for Iceberg or + * the OH server to report). + */ +class OHWriteSchemaNormalizationRule(spark: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.transformDown { + case write: V2WriteCommand + if write.table.resolved && write.query.resolved && isOHWrite(write) => + normalizeColumnNames(write).getOrElse(write) + } + } + + private def isOHWrite(write: V2WriteCommand): Boolean = { + write.table match { + case rel: DataSourceV2Relation => isOHRelation(rel) + case _ => false + } + } + + private def normalizeColumnNames(write: V2WriteCommand): Option[V2WriteCommand] = { + val ohRelation = write.table match { + case rel: DataSourceV2Relation => rel + case _ => return None + } + + val targetCols = ohRelation.output + val sourceCols = write.query.output + + // If column counts differ, leave it to Iceberg / the OH server to report the mismatch. + if (sourceCols.size != targetCols.size) return None + + val projections = + if (write.isByName) projectByName(sourceCols, targetCols) + else projectByPosition(sourceCols, targetCols) + + projections match { + case None => None + case Some(exprs) => Some(write.withNewQuery(Project(exprs, write.query))) + } + } + + /** + * By-name mode: replicate what {@code ResolveOutputRelation} does for by-name writes — produce a + * projection in target column order that renames (and if necessary casts) each source + * column to the stored OH casing. This also handles the case where the source DataFrame has + * columns in a different order than the stored schema (e.g. when the source is built from a bean + * whose fields are introspected alphabetically). + * + *
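For example (an illustrative sketch, reusing the hypothetical table from above), a source whose columns arrive in a different order than the stored {@code (ID, VALUE)} schema:
<pre>{@code
import spark.implicits._
val df = Seq((1L, "r1")).toDF("value", "id")
// projectByName emits expressions in target order, so the resulting Project both
// reorders and re-cases the source columns: Alias(id, "ID"), Alias(value, "VALUE").
df.writeTo("openhouse.db.case_demo").append()
}</pre>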

Tables with case-duplicate columns are skipped (the target is ambiguous). + */ + private def projectByName( + sourceCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute], + targetCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute]) + : Option[Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression]] = { + + // Case-duplicate target: skip normalization to avoid silently misdirecting the write. + val targetGrouped = targetCols.groupBy(_.name.toLowerCase) + if (targetGrouped.values.exists(_.size > 1)) return None + + // Case-duplicate source: skip to avoid ambiguous lookup. + val srcGrouped = sourceCols.groupBy(_.name.toLowerCase) + if (srcGrouped.values.exists(_.size > 1)) return None + val srcByLower: Map[String, org.apache.spark.sql.catalyst.expressions.Attribute] = + srcGrouped.map { case (lower, attrs) => lower -> attrs.head } + + // Produce expressions in TARGET column order (replicating ResolveOutputRelation). + // For each target column find the matching source column by case-insensitive name. + val exprs: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression] = + targetCols.map { tgt => + srcByLower.get(tgt.name.toLowerCase) match { + case Some(src) if src.name == tgt.name => src // correct casing, keep as-is + case Some(src) => Alias(src, tgt.name)() // rename to stored casing + case None => return None // unmatched column + } + } + + // No-op if the result is identical to the source (same column order, same names). + val unchanged = exprs.zip(sourceCols).forall { + case (expr: org.apache.spark.sql.catalyst.expressions.Attribute, src) => + expr.exprId == src.exprId + case _ => false + } + if (unchanged) None else Some(exprs) + } + + /** + * By-position mode (e.g. {@code INSERT INTO … VALUES …}): zip source and target by position. + * For each pair, replicate what {@code ResolveOutputRelation} would have done: + *

  • If names and types already match, keep the source attribute as-is.
  • Otherwise, wrap the source in {@code Alias(Cast(src, targetType), targetName)} to rename the column and coerce the type to the stored schema.
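A hypothetical by-position example where both a rename and a cast are produced:
<pre>{@code
// Stored schema (hypothetical): ID string, VALUE bigint. The literal 1 arrives as an int,
// so the rule emits Alias(Cast(1 AS bigint), "VALUE") next to Alias('r1', "ID").
spark.sql("INSERT INTO openhouse.db.case_demo VALUES ('r1', 1)")
}</pre>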
+ */ + private def projectByPosition( + sourceCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute], + targetCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute]) + : Option[Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression]] = { + + val pairsNeedingChange = sourceCols.zip(targetCols).filter { + case (src, tgt) => + src.name != tgt.name || + src.dataType != tgt.dataType || + src.metadata != tgt.metadata + } + if (pairsNeedingChange.isEmpty) return None + + val exprs = sourceCols.zip(targetCols).map { + case (src, tgt) + if src.name == tgt.name && src.dataType == tgt.dataType && src.metadata == tgt.metadata => + src + case (src, tgt) => + val castExpr = if (src.dataType == tgt.dataType) src + else Cast(src, tgt.dataType, Option(spark.conf.get("spark.sql.session.timeZone"))) + Alias(castExpr, tgt.name)(explicitMetadata = Some(tgt.metadata)) + } + Some(exprs) + } + + private def isOHRelation(rel: DataSourceV2Relation): Boolean = { + rel.catalog match { + case Some(c) => + val key = s"spark.sql.catalog.${c.name()}.catalog-impl" + spark.conf.getOption(key).exists(_.toLowerCase.contains("openhouse")) + case None => + false + } + } +} diff --git a/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OpenhouseSparkSessionExtensions.scala b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OpenhouseSparkSessionExtensions.scala new file mode 100644 index 000000000..892fa63ae --- /dev/null +++ b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/extensions/OpenhouseSparkSessionExtensions.scala @@ -0,0 +1,14 @@ +package com.linkedin.openhouse.spark.extensions + +import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSparkSqlExtensionsParser +import com.linkedin.openhouse.spark.sql.execution.datasources.v2.OpenhouseDataSourceV2Strategy +import org.apache.spark.sql.SparkSessionExtensions + +class OpenhouseSparkSessionExtensions extends (SparkSessionExtensions => Unit) { + override def apply(extensions: SparkSessionExtensions): Unit = { + extensions.injectParser { case (_, parser) => new OpenhouseSparkSqlExtensionsParser(parser) } + extensions.injectPlannerStrategy(spark => OpenhouseDataSourceV2Strategy(spark)) + extensions.injectResolutionRule(spark => new OHCaseInsensitiveResolveRule(spark)) + extensions.injectPostHocResolutionRule(spark => new OHWriteSchemaNormalizationRule(spark)) + } +} diff --git a/tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/java/com/linkedin/openhouse/tablestest/TestSparkSessionUtil.java b/tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/java/com/linkedin/openhouse/tablestest/TestSparkSessionUtil.java index 5404c3248..d78278b8b 100644 --- a/tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/java/com/linkedin/openhouse/tablestest/TestSparkSessionUtil.java +++ b/tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/java/com/linkedin/openhouse/tablestest/TestSparkSessionUtil.java @@ -43,7 +43,7 @@ public static void configureCatalogs( builder .config( String.format("spark.sql.catalog.%s", catalogName), - "org.apache.iceberg.spark.SparkCatalog") + "com.linkedin.openhouse.spark.OHSparkCatalog") .config( String.format("spark.sql.catalog.%s.catalog-impl", catalogName), "com.linkedin.openhouse.spark.OpenHouseCatalog")
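For reference, a minimal sketch of a Spark 3.5 session wired up to exercise these changes; the catalog name and class names mirror the test fixtures above, while the application name is a placeholder and the OpenHouse connection properties are omitted (they come from deployment config):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("oh-case-insensitivity-demo") // placeholder
  .config(
    "spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions," +
      "com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions")
  .config("spark.sql.catalog.openhouse", "com.linkedin.openhouse.spark.OHSparkCatalog")
  .config(
    "spark.sql.catalog.openhouse.catalog-impl",
    "com.linkedin.openhouse.spark.OpenHouseCatalog")
  // additional OpenHouse connection properties omitted here
  .getOrCreate()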