From 0b5bfd4d69d43908e1ba81c329a0a5e3127bdcec Mon Sep 17 00:00:00 2001 From: Levi Jiang Date: Tue, 28 Apr 2026 00:14:25 -0700 Subject: [PATCH 1/3] opt in 1 day ttl for OFD --- .../spark/OrphanFilesDeletionSparkApp.java | 19 ++++-- .../openhouse/jobs/util/AppConstants.java | 1 + .../OrphanFilesDeletionSparkAppTest.java | 67 +++++++++++++++++++ 3 files changed, 81 insertions(+), 6 deletions(-) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java index f3a8ddb65..983f6a8ef 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java @@ -57,21 +57,22 @@ public OrphanFilesDeletionSparkApp( @Override protected void runInner(Operations ops) { updateTtlSeconds(ops); + Table table = ops.getTable(fqtn); long olderThanTimestampMillis = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlSeconds); - Table table = ops.getTable(fqtn); boolean backupEnabled = Boolean.parseBoolean( table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false")); log.info( - "Orphan files deletion app start for table={} with olderThanTimestampMillis={} backupEnabled={} and backupDir={}", + "Orphan files deletion app start for table={} with olderThanTimestampMillis={} ttlSeconds={} backupEnabled={} and backupDir={}", fqtn, olderThanTimestampMillis, + ttlSeconds, backupEnabled, backupDir); DeleteOrphanFiles.Result result = ops.deleteOrphanFiles( - ops.getTable(fqtn), + table, olderThanTimestampMillis, backupEnabled, backupDir, @@ -91,20 +92,26 @@ protected void runInner(Operations ops) { } /** - * Validate and keep min OFD TTL for replica table as 3 days if provided TTL is less than 3 days + * Apply the one-day OFD TTL opt-in when the table property is set, then enforce the 3-day + * minimum for replica tables. * * @param ops */ private void updateTtlSeconds(Operations ops) { Table table = ops.getTable(fqtn); + boolean oneDayTtlEnabled = + Boolean.parseBoolean( + table.properties().getOrDefault(AppConstants.OFD_ONE_DAY_TTL_ENABLED_KEY, "false")); + if (oneDayTtlEnabled) { + ttlSeconds = TimeUnit.DAYS.toSeconds(1); + } String tableType = table .properties() .getOrDefault(AppConstants.OPENHOUSE_TABLE_TYPE_KEY, AppConstants.TABLE_TYPE_PRIMARY); - // Check if replica table and update TTL + // Keep the min default OFD TTL for replica tables if (AppConstants.TABLE_TYPE_REPLICA.equals(tableType)) { long days = Duration.ofSeconds(ttlSeconds).toDays(); - // Keep the min default OFD TTL for replica tables if (days < DEFAULT_MIN_OFD_TTL_IN_DAYS) { ttlSeconds = TimeUnit.DAYS.toSeconds(DEFAULT_MIN_OFD_TTL_IN_DAYS); } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java index 6f4918635..84a595826 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java @@ -59,6 +59,7 @@ public final class AppConstants { public static final String OPENHOUSE_TABLE_TYPE_KEY = "openhouse.tableType"; public static final String TABLE_TYPE_PRIMARY = "PRIMARY_TABLE"; public static final String TABLE_TYPE_REPLICA = "REPLICA_TABLE"; + public static final String OFD_ONE_DAY_TTL_ENABLED_KEY = "ofd.one_day_ttl.enabled"; private AppConstants() {} } diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java index 2e56b2c0a..c7f32e95b 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java @@ -2,13 +2,23 @@ import com.linkedin.openhouse.common.metrics.DefaultOtelConfig; import com.linkedin.openhouse.common.metrics.OtelEmitter; +import com.linkedin.openhouse.jobs.util.AppConstants; import com.linkedin.openhouse.jobs.util.AppsOtelEmitter; import com.linkedin.openhouse.tablestest.OpenHouseSparkITest; import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Table; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +@Slf4j public class OrphanFilesDeletionSparkAppTest extends OpenHouseSparkITest { + private static final String BACKUP_DIR = ".backup"; + private static final String ORPHAN_FILE_NAME = "data/test_orphan_file.orc"; + private static final long DEFAULT_TTL_SECONDS = TimeUnit.DAYS.toSeconds(7); private final OtelEmitter otelEmitter = new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry())); @@ -74,6 +84,39 @@ public void testTtlSecondsReplicaTableThreeDayTtl() throws Exception { } } + @Test + public void testOneDayTtlEnabled() throws Exception { + final String tableName = "db.test_ofd_one_day_ttl_enabled"; + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + Path orphanFilePath = setupTableWithOrphanFile(ops, tableName, 2); + ops.spark() + .sql( + String.format( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')", + tableName, AppConstants.OFD_ONE_DAY_TTL_ENABLED_KEY)); + + newApp(tableName).runInner(ops); + + Assertions.assertFalse( + ops.fs().exists(orphanFilePath), + "Orphan file older than 1 day should be deleted when ofd.one_day_ttl.enabled=true"); + } + } + + @Test + public void testDefaultTtl() throws Exception { + final String tableName = "db.test_ofd_default_ttl"; + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + Path orphanFilePath = setupTableWithOrphanFile(ops, tableName, 2); + + newApp(tableName).runInner(ops); + + Assertions.assertTrue( + ops.fs().exists(orphanFilePath), + "Orphan file younger than the default 7d TTL should be preserved when the property is absent"); + } + } + private static void prepareTable(Operations ops, String tableName) { ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show(); ops.spark() @@ -95,4 +138,28 @@ private static void prepareReplicaTable(Operations ops, String tableName) { .show(); ops.spark().sql(String.format("SHOW TBLPROPERTIES %s", tableName)).show(false); } + + private OrphanFilesDeletionSparkApp newApp(String tableName) { + return new OrphanFilesDeletionSparkApp( + "test-job-id", null, tableName, DEFAULT_TTL_SECONDS, otelEmitter, BACKUP_DIR, 5, false, + 20000); + } + + private Path setupTableWithOrphanFile(Operations ops, String tableName, int orphanAgeDays) + throws Exception { + ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show(); + ops.spark().sql(String.format("CREATE TABLE %s (data string, ts timestamp)", tableName)).show(); + for (int row = 0; row < 3; ++row) { + ops.spark() + .sql(String.format("INSERT INTO %s VALUES ('v%d', current_timestamp())", tableName, row)) + .show(); + } + Table table = ops.getTable(tableName); + Path orphanFilePath = new Path(table.location(), ORPHAN_FILE_NAME); + FileSystem fs = ops.fs(); + fs.createNewFile(orphanFilePath); + long mtimeMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(orphanAgeDays); + fs.setTimes(orphanFilePath, mtimeMs, -1); + return orphanFilePath; + } } From 356a60193e246943798dec12882af24b61df2190 Mon Sep 17 00:00:00 2001 From: Levi Jiang Date: Tue, 28 Apr 2026 11:41:42 -0700 Subject: [PATCH 2/3] fix --- .../jobs/spark/OrphanFilesDeletionSparkAppTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java index c7f32e95b..ba761c4b9 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java @@ -141,7 +141,14 @@ private static void prepareReplicaTable(Operations ops, String tableName) { private OrphanFilesDeletionSparkApp newApp(String tableName) { return new OrphanFilesDeletionSparkApp( - "test-job-id", null, tableName, DEFAULT_TTL_SECONDS, otelEmitter, BACKUP_DIR, 5, false, + "test-job-id", + null, + tableName, + DEFAULT_TTL_SECONDS, + otelEmitter, + BACKUP_DIR, + 5, + false, 20000); } From e640c68c964ad56c1940cc86a393b7af36049f90 Mon Sep 17 00:00:00 2001 From: Levi Jiang Date: Thu, 30 Apr 2026 22:43:11 -0700 Subject: [PATCH 3/3] rebase --- .../openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java index 983f6a8ef..e85832de0 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java @@ -92,8 +92,8 @@ protected void runInner(Operations ops) { } /** - * Apply the one-day OFD TTL opt-in when the table property is set, then enforce the 3-day - * minimum for replica tables. + * Apply the one-day OFD TTL opt-in when the table property is set, then enforce the 3-day minimum + * for replica tables. * * @param ops */