@@ -13,6 +13,7 @@
import com.linkedin.openhouse.jobs.util.SparkJobUtil;
import com.linkedin.openhouse.jobs.util.TableStatsCollector;
import java.io.IOException;
+ import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
@@ -347,8 +348,9 @@ private Map<String, List<String>> prepareBackupDataManifests(

private void writeBackupDataManifests(
Map<String, List<String>> manifestCache, Table table, String backupDir, ZonedDateTime now) {
- for (String partitionPath : manifestCache.keySet()) {
- List<String> files = manifestCache.get(partitionPath);
+ for (Map.Entry<String, List<String>> entry : manifestCache.entrySet()) {
+ String partitionPath = entry.getKey();
+ List<String> files = entry.getValue();
List<String> backupFiles =
files.stream()
.map(file -> getTrashPath(table, file, backupDir).toString())
@@ -367,7 +369,7 @@ private void writeBackupDataManifests(
fs.mkdirs(destPath.getParent());
}
try (FSDataOutputStream out = fs.create(destPath, true)) {
- out.write(jsonStr.getBytes());
+ out.write(jsonStr.getBytes(StandardCharsets.UTF_8));
}
log.info("Wrote {} with {} backup files", destPath, backupFiles.size());
} catch (IOException e) {
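Why the one-line charset change above matters: with no argument, String.getBytes() encodes using the JVM's default charset, which varies by host and locale on Java versions before 18 (JEP 400), so two runs of the same job could serialize identical manifests into different bytes. A minimal sketch of the contrast, with a hypothetical class name:

import java.nio.charset.StandardCharsets;

class ManifestEncodingSketch {
  static byte[] encode(String jsonStr) {
    // jsonStr.getBytes() with no argument would use the JVM default charset,
    // so a non-UTF-8 host could corrupt non-ASCII paths in the manifest.
    // Pinning UTF-8 makes the bytes written to the backup dir deterministic.
    return jsonStr.getBytes(StandardCharsets.UTF_8);
  }
}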
gradle/spotbugs/spotbugsExclude.xml (34 additions, 0 deletions)
@@ -53,4 +53,38 @@
<Bug pattern="MS_EXPOSE_REP"/>
</Match>

<Match>
<!--
SpotBugs false positive: try-with-resources generates a synthetic null check on the
AutoCloseable variable in the compiler-emitted finally block. SpotBugs (4.x) sees this as
"null check of value previously dereferenced" (RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE)
even though the dereference and the null check are in separate compiler-generated paths.
-->
<Class name="com.linkedin.openhouse.jobs.spark.Operations"/>
<Method name="prepareBackupDataManifests"/>
<Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"/>
</Match>

<Match>
<!--
Test-fixture interface: softDeletedTables is a mutable static field declared directly in the
HouseTablesH2Repository interface (implicitly public). Moving it to a concrete class would
require significant refactoring of test infrastructure; suppressing here is safe because this
field is only used in test code and is never exposed to untrusted callers.
-->
<Class name="com.linkedin.openhouse.tablestest.HouseTablesH2Repository"/>
<Bug pattern="MS_MUTABLE_COLLECTION_PKGPROTECT"/>
</Match>

<Match>
<!--
Test-fixture class: OpenHouseSparkITest.getBuilder() performs lazy initialization of the
static openHouseLocalServer field without synchronization. This is intentional for test
infrastructure where single-threaded sequential test execution is guaranteed by JUnit 5.
-->
<Class name="com.linkedin.openhouse.tablestest.OpenHouseSparkITest"/>
<Method name="getBuilder"/>
<Bug pattern="LI_LAZY_INIT_STATIC"/>
</Match>

</FindBugsFilter>
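The RCN exclusion above suppresses a known SpotBugs 4.x false positive on try-with-resources. A minimal, self-contained reproduction of the flagged pattern, with hypothetical names:

import java.io.FileInputStream;
import java.io.IOException;

class TryWithResourcesSketch {
  static int firstByte(String path) throws IOException {
    // javac desugars the block below roughly to:
    //   FileInputStream in = new FileInputStream(path);
    //   try { return in.read(); }
    //   finally { if (in != null) in.close(); }
    // SpotBugs sees in.read() dereference `in`, then sees the synthetic
    // `in != null` check and reports RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE,
    // even though both come from the same try-with-resources statement.
    try (FileInputStream in = new FileInputStream(path)) {
      return in.read();
    }
  }
}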
@@ -39,7 +39,7 @@ public void beforeEach(ExtensionContext context) throws Exception {
"spark.sql.extensions",
("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
+ "com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions"))
.config("spark.sql.catalog.openhouse", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.openhouse", "com.linkedin.openhouse.spark.OHSparkCatalog")
.config(
"spark.sql.catalog.openhouse.catalog-impl",
"com.linkedin.openhouse.spark.OpenHouseCatalog")
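Spark instantiates whatever class is named under spark.sql.catalog.<name> as the V2 catalog plugin for that namespace, so this one-line config change routes every openhouse.* reference in the test session through the OpenHouse catalog. One plausible shape for such a class, sketched under the assumption that it subclasses Iceberg's SparkCatalog (the real OHSparkCatalog in this repo may differ):

import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;

// Hypothetical sketch; not the repo's actual implementation.
public class OHSparkCatalogSketch extends SparkCatalog {
  @Override
  public Table loadTable(Identifier ident) throws NoSuchTableException {
    Table table = super.loadTable(ident);
    // A real implementation could wrap `table` so that capabilities() also
    // reports TableCapability.ACCEPT_ANY_SCHEMA, which makes Spark's
    // ResolveOutputRelation accept writes whose column casing differs from
    // the stored schema (see the write-path test at the end of this diff).
    return table;
  }
}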
@@ -24,7 +24,8 @@ public void testCTASPreservesNonNull() throws Exception {

// Verify spark catalogs have correct classes configured
assertEquals(
"org.apache.iceberg.spark.SparkCatalog", spark.conf().get("spark.sql.catalog.openhouse"));
"com.linkedin.openhouse.spark.OHSparkCatalog",
spark.conf().get("spark.sql.catalog.openhouse"));

// Verify id column is preserved in good catalog, not preserved in bad catalog
assertFalse(sourceSchema.apply("id").nullable(), "Source table id column should be required");
@@ -411,4 +411,179 @@ public void testAlterTableSortOrderCTAS() throws Exception {
Assertions.assertEquals(SortOrder.unsorted(), newSqlTable.sortOrder());
}
}

// ===== Case-insensitive reads (OHCaseInsensitiveResolveRule) =====

@Test
public void testReadWithCaseMismatchSucceeds_andDoesNotChangeCaseSensitiveConfig()
throws Exception {
try (SparkSession spark = getSparkSession()) {
// Create a table with uppercase column "ID" — typical for tables originally created
// by Hive or engines that preserve user-specified casing.
Catalog catalog = getOpenHouseCatalog(spark);
Schema schema =
new Schema(
Types.NestedField.required(1, "ID", Types.StringType.get()),
Types.NestedField.optional(2, "value", Types.LongType.get()));
catalog.createTable(TableIdentifier.of("d1", "case_read_test"), schema);
spark.sql("INSERT INTO openhouse.d1.case_read_test VALUES ('row1', 42)");

// With caseSensitive=true, vanilla Spark would reject "id" as unresolved against "ID".
// OHCaseInsensitiveResolveRule normalizes the attribute before ResolveReferences runs,
// so the query must succeed — and crucially must NOT change the session setting.
spark.conf().set("spark.sql.caseSensitive", "true");
try {
List<Row> rows = spark.sql("SELECT id FROM openhouse.d1.case_read_test").collectAsList();
Assertions.assertEquals(1, rows.size());
Assertions.assertEquals("row1", rows.get(0).getString(0));

// The rule must NOT mutate spark.sql.caseSensitive — that is the whole point of moving
// away from the session-level override approach.
Assertions.assertEquals(
"true",
spark.conf().get("spark.sql.caseSensitive"),
"OHCaseInsensitiveResolveRule must not modify spark.sql.caseSensitive");
} finally {
spark.conf().set("spark.sql.caseSensitive", "false");
spark.sql("DROP TABLE openhouse.d1.case_read_test");
}
}
}

@Test
public void testViewWithCaseMismatchResolvesViaRule() throws Exception {
try (SparkSession spark = getSparkSession()) {
// Create a table with uppercase "ID" column.
Catalog catalog = getOpenHouseCatalog(spark);
Schema schema =
new Schema(
Types.NestedField.required(1, "ID", Types.StringType.get()),
Types.NestedField.optional(2, "count", Types.LongType.get()));
catalog.createTable(TableIdentifier.of("d1", "view_case_test"), schema);
spark.sql("INSERT INTO openhouse.d1.view_case_test VALUES ('a', 1), ('b', 2)");

// View defined with explicit lowercase column references against a table that stores "ID".
// With caseSensitive=true and without the rule, both the view definition and any outer
// query referencing a mismatched column name would fail to resolve.
spark.sql(
"CREATE OR REPLACE TEMP VIEW v_case AS "
+ "SELECT id, count FROM openhouse.d1.view_case_test");

// View defined with SELECT * — columns come from star-expansion over the OH table schema.
spark.sql(
"CREATE OR REPLACE TEMP VIEW v_case_star AS "
+ "SELECT * FROM openhouse.d1.view_case_test");

spark.conf().set("spark.sql.caseSensitive", "true");
try {
// SELECT * from explicit-column view: the rule normalizes the inlined view SQL and the
// query must return all rows with correct values.
List<Row> selectStar = spark.sql("SELECT * FROM v_case ORDER BY id").collectAsList();
Assertions.assertEquals(2, selectStar.size());
Assertions.assertEquals("a", selectStar.get(0).getString(0));
Assertions.assertEquals("b", selectStar.get(1).getString(0));

// SELECT id from explicit-column view: the outer query uses a case-mismatched column
// reference against the view output — the rule must normalize it at the outer level too.
List<Row> selectId = spark.sql("SELECT id FROM v_case ORDER BY id").collectAsList();
Assertions.assertEquals(2, selectId.size());
Assertions.assertEquals("a", selectId.get(0).getString(0));
Assertions.assertEquals("b", selectId.get(1).getString(0));

// SELECT * from star view: star-expansion over the OH table schema produces "ID"; the
// rule must make reading it back case-insensitive.
List<Row> starViewSelectStar =
spark.sql("SELECT * FROM v_case_star ORDER BY id").collectAsList();
Assertions.assertEquals(2, starViewSelectStar.size());
Assertions.assertEquals("a", starViewSelectStar.get(0).getString(0));
Assertions.assertEquals("b", starViewSelectStar.get(1).getString(0));

// SELECT id from star view: explicit column reference with case mismatch against
// star-expanded view output must also resolve correctly.
List<Row> starViewSelectId =
spark.sql("SELECT id FROM v_case_star ORDER BY id").collectAsList();
Assertions.assertEquals(2, starViewSelectId.size());
Assertions.assertEquals("a", starViewSelectId.get(0).getString(0));
Assertions.assertEquals("b", starViewSelectId.get(1).getString(0));

Assertions.assertEquals(
"true",
spark.conf().get("spark.sql.caseSensitive"),
"OHCaseInsensitiveResolveRule must not modify spark.sql.caseSensitive");
} finally {
spark.conf().set("spark.sql.caseSensitive", "false");
spark.sql("DROP TABLE openhouse.d1.view_case_test");
}
}
}

@Test
public void testCaseDuplicateTableIsExcludedFromNormalization() throws Exception {
try (SparkSession spark = getSparkSession()) {
// Case-duplicate tables (columns "id" and "ID") are rejected at some enforcement layer.
// If the server validates at CREATE time it throws BadRequestException; if it allows
// creation then OHCaseInsensitiveResolveRule skips normalization for such tables and Spark's
// own ResolveReferences raises AnalysisException for the ambiguous column reference.
// Either way, case-duplicate column names cannot be silently misdirected.
Catalog catalog = getOpenHouseCatalog(spark);
Schema schema =
new Schema(
Types.NestedField.required(1, "id", Types.StringType.get()),
Types.NestedField.optional(2, "ID", Types.StringType.get()));

try {
catalog.createTable(TableIdentifier.of("d1", "case_dup_test"), schema);
// Server allowed creation; verify Spark raises an error for the ambiguous reference.
try {
Assertions.assertThrows(
Exception.class,
() -> spark.sql("SELECT id FROM openhouse.d1.case_dup_test").collectAsList(),
"Ambiguous column reference on case-duplicate table must throw");
} finally {
spark.sql("DROP TABLE openhouse.d1.case_dup_test");
}
} catch (Exception ignored) {
// Server rejected the case-duplicate schema at CREATE time — also correct behavior.
}
}
}

@Test
public void testWriteWithCaseMismatch_succeedsWithCaseSensitiveTrue() throws Exception {
try (SparkSession spark = getSparkSession()) {
// Create a table with uppercase column "ID" — the common case for tables originally created
// by Hive or engines that preserve user-specified casing.
Catalog catalog = getOpenHouseCatalog(spark);
Schema schema = new Schema(Types.NestedField.required(1, "ID", Types.StringType.get()));
catalog.createTable(TableIdentifier.of("d1", "write_case_test"), schema);

// With caseSensitive=true, Spark's ResolveOutputRelation uses a case-sensitive resolver and
// cannot find source column "id" in the target schema column "ID". Vanilla Spark would throw
// "Cannot find data for output column 'ID'" at analysis time.
//
// OHSparkCatalog advertises ACCEPT_ANY_SCHEMA, so outputResolved=true and
// ResolveOutputRelation skips output matching for OH writes.
// OHWriteSchemaNormalizationRule (post-hoc) then
// inserts a Project(Alias("id" → "ID")) so Iceberg sees the correct stored casing.
spark.conf().set("spark.sql.caseSensitive", "true");
try {
Assertions.assertDoesNotThrow(
() -> spark.sql("SELECT 'row1' AS id").writeTo("openhouse.d1.write_case_test").append(),
"writeTo().append() must succeed when source has lowercase 'id' and OH table has 'ID'");

// Verify the row was written with the correct stored casing.
List<Row> rows = spark.sql("SELECT id FROM openhouse.d1.write_case_test").collectAsList();
Assertions.assertEquals(1, rows.size());
Assertions.assertEquals("row1", rows.get(0).getString(0));

// The rule must NOT mutate spark.sql.caseSensitive.
Assertions.assertEquals(
"true",
spark.conf().get("spark.sql.caseSensitive"),
"OHWriteSchemaNormalizationRule must not modify spark.sql.caseSensitive");
} finally {
spark.conf().set("spark.sql.caseSensitive", "false");
spark.sql("DROP TABLE openhouse.d1.write_case_test");
}
}
}
}
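All four tests above exercise the same underlying contract: map a user-supplied column name onto the stored casing, unless the schema is case-duplicate, in which case stand aside and let Spark surface the ambiguity. A self-contained sketch of that resolution logic, with hypothetical names (the real rules rewrite Catalyst plans rather than operate on plain lists):

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the contract the tests assert.
final class CaseInsensitiveResolverSketch {
  private CaseInsensitiveResolverSketch() {}

  static String resolve(String requested, List<String> storedColumns) {
    // Group stored columns by lower-cased name to detect case-duplicates.
    Map<String, List<String>> byLower =
        storedColumns.stream()
            .collect(Collectors.groupingBy(c -> c.toLowerCase(Locale.ROOT)));
    List<String> candidates = byLower.get(requested.toLowerCase(Locale.ROOT));
    if (candidates == null) {
      throw new IllegalArgumentException("No such column: " + requested);
    }
    if (candidates.size() > 1) {
      // Case-duplicate schema ("id" and "ID"): skip normalization so Spark's
      // own resolver reports the ambiguity, as the exclusion test expects.
      throw new IllegalStateException("Ambiguous column reference: " + requested);
    }
    return candidates.get(0); // e.g. resolve("id", ["ID", "value"]) -> "ID"
  }
}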