diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java index c497355d4..123a6122b 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java @@ -38,7 +38,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; @@ -46,6 +45,8 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; import org.apache.iceberg.actions.DeleteOrphanFiles; +import org.apache.iceberg.actions.ExpireSnapshots; +import org.apache.iceberg.actions.ImmutableExpireSnapshots; import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -249,7 +250,26 @@ public void deleteStagedOrphanDirectory( /** Expire snapshots on a given fully-qualified table name. */ public void expireSnapshots(String fqtn, int maxAge, String granularity, int versions) { - expireSnapshots(getTable(fqtn), maxAge, granularity, versions); + expireSnapshots(fqtn, maxAge, granularity, versions, false); + } + + /** Expire snapshots on a given fully-qualified table name with deleteFiles parameter. */ + public ExpireSnapshots.Result expireSnapshots( + String fqtn, int maxAge, String granularity, int versions, boolean deleteFiles) { + return expireSnapshots(fqtn, maxAge, granularity, versions, deleteFiles, false, ".backup"); + } + + /** Expire snapshots with backup support. */ + public ExpireSnapshots.Result expireSnapshots( + String fqtn, + int maxAge, + String granularity, + int versions, + boolean deleteFiles, + boolean backupEnabled, + String backupDir) { + return expireSnapshots( + getTable(fqtn), maxAge, granularity, versions, deleteFiles, backupEnabled, backupDir); } /** @@ -259,26 +279,192 @@ public void expireSnapshots(String fqtn, int maxAge, String granularity, int ver * number of snapshots younger than the maxAge */ public void expireSnapshots(Table table, int maxAge, String granularity, int versions) { - ExpireSnapshots expireSnapshotsCommand = table.expireSnapshots().cleanExpiredFiles(false); + // Call the Result-returning version but ignore the result for backward compatibility + expireSnapshots(table, maxAge, granularity, versions, false); + } + + /** + * Expire snapshots on a given {@link Table} with deleteFiles parameter. If maxAge is provided, it + * will expire snapshots older than maxAge in granularity timeunit. If versions is provided, it + * will retain the last versions snapshots. If both are provided, it will prioritize maxAge; only + * retain up to versions number of snapshots younger than the maxAge. Returns {@link + * ExpireSnapshots.Result} containing metrics about deleted files. + */ + public ExpireSnapshots.Result expireSnapshots( + Table table, int maxAge, String granularity, int versions, boolean deleteFiles) { + return expireSnapshots(table, maxAge, granularity, versions, deleteFiles, false, ".backup"); + } + + /** + * Expire snapshots with backup support. Main orchestration method that delegates to helper + * methods based on deleteFiles flag. + */ + public ExpireSnapshots.Result expireSnapshots( + Table table, + int maxAge, + String granularity, + int versions, + boolean deleteFiles, + boolean backupEnabled, + String backupDir) { - // maxAge will always be defined ChronoUnit timeUnitGranularity = ChronoUnit.valueOf( SparkJobUtil.convertGranularityToChrono(granularity.toUpperCase()).name()); long expireBeforeTimestampMs = System.currentTimeMillis() - timeUnitGranularity.getDuration().multipliedBy(maxAge).toMillis(); - log.info("Expiring snapshots for table: {} older than {}ms", table, expireBeforeTimestampMs); - expireSnapshotsCommand.expireOlderThan(expireBeforeTimestampMs).commit(); + log.info( + "Expiring snapshots for table: {} older than {}ms with deleteFiles={}, backupEnabled={}, backupDir={}", + table, + expireBeforeTimestampMs, + deleteFiles, + backupEnabled, + backupDir); + + // First expiration: based on maxAge + ExpireSnapshots.Result result = + deleteFiles + ? expireSnapshotsWithFiles( + table, expireBeforeTimestampMs, null, backupEnabled, backupDir) + : expireSnapshotsMetadataOnly(table, expireBeforeTimestampMs, null); + + // Second expiration: based on versions (if needed) if (versions > 0 && Iterators.size(table.snapshots().iterator()) > versions) { log.info("Expiring snapshots for table: {} retaining last {} versions", table, versions); - // Note: retainLast keeps the last N snapshots that WOULD be expired, hence expireOlderThan - // currentTime - expireSnapshotsCommand - .expireOlderThan(System.currentTimeMillis()) - .retainLast(versions) - .commit(); + result = + deleteFiles + ? expireSnapshotsWithFiles( + table, System.currentTimeMillis(), versions, backupEnabled, backupDir) + : expireSnapshotsMetadataOnly(table, System.currentTimeMillis(), versions); + } + + return result; + } + + /** + * Expire snapshots using Table API (metadata-only, no file deletion). Efficient - skips file + * enumeration entirely. + */ + private ExpireSnapshots.Result expireSnapshotsMetadataOnly( + Table table, long expireBeforeTimestampMs, Integer versions) { + + org.apache.iceberg.ExpireSnapshots expireSnapshots = + table.expireSnapshots().cleanExpiredFiles(false).expireOlderThan(expireBeforeTimestampMs); + + if (versions != null) { + expireSnapshots = expireSnapshots.retainLast(versions); + } + + expireSnapshots.commit(); + + // Return empty result for metadata-only operation + return ImmutableExpireSnapshots.Result.builder() + .deletedDataFilesCount(0L) + .deletedManifestsCount(0L) + .deletedManifestListsCount(0L) + .deletedPositionDeleteFilesCount(0L) + .deletedEqualityDeleteFilesCount(0L) + .build(); + } + + /** + * Expire snapshots using SparkActions API with file deletion. Distributed execution via Spark, + * with optional backup support. + */ + private ExpireSnapshots.Result expireSnapshotsWithFiles( + Table table, + long expireBeforeTimestampMs, + Integer versions, + boolean backupEnabled, + String backupDir) { + + ExpireSnapshots expireSnapshotsAction = + backupEnabled + ? createExpireSnapshotsActionWithBackup(table, backupDir) + : SparkActions.get(spark).expireSnapshots(table); + + expireSnapshotsAction = expireSnapshotsAction.expireOlderThan(expireBeforeTimestampMs); + + if (versions != null) { + expireSnapshotsAction = expireSnapshotsAction.retainLast(versions); + } + + return expireSnapshotsAction.execute(); + } + + /** + * Create ExpireSnapshots action with custom backup logic for data files. Backup logic runs + * distributed on Spark executors. + */ + private ExpireSnapshots createExpireSnapshotsActionWithBackup(Table table, String backupDir) { + Map dataManifestsCache = new ConcurrentHashMap<>(); + Path backupDirRoot = new Path(table.location(), backupDir); + Path dataDirRoot = new Path(table.location(), "data"); + + return SparkActions.get(spark) + .expireSnapshots(table) + .deleteWith( + file -> + processExpiredFileWithBackup( + file, table, backupDir, backupDirRoot, dataDirRoot, dataManifestsCache)); + } + + /** + * Process a single expired file: backup if needed, otherwise delete. This method runs on Spark + * executors in a distributed manner. + */ + private void processExpiredFileWithBackup( + String file, + Table table, + String backupDir, + Path backupDirRoot, + Path dataDirRoot, + Map dataManifestsCache) { + + log.info("Processing expired file {}", file); + + // Skip metadata.json - Iceberg commits handle these + if (file.endsWith("metadata.json")) { + log.info("Skipped deleting metadata file {}", file); + return; + } + + // Skip files already in backup directory + if (file.contains(backupDirRoot.toString())) { + log.info("Skipped deleting backup file {}", file); + return; + } + + // Backup data files with manifests, delete everything else + if (file.contains(dataDirRoot.toString()) + && isExistBackupDataManifests(table, file, backupDir, dataManifestsCache)) { + backupDataFile(file, table, backupDir); + } else { + deleteFile(file); + } + } + + /** Move a data file to the backup directory. */ + private void backupDataFile(String file, Table table, String backupDir) { + Path backupFilePath = getTrashPath(table, file, backupDir); + log.info("Moving expired data file {} to {}", file, backupFilePath); + try { + rename(new Path(file), backupFilePath); + fs().setTimes(backupFilePath, System.currentTimeMillis(), -1); + } catch (IOException e) { + log.error(String.format("Move operation failed for file: %s", file), e); + } + } + + /** Delete a file directly (manifests, manifest lists, data files without manifests). */ + private void deleteFile(String file) { + log.info("Deleting expired file {}", file); + try { + fs().delete(new Path(file), false); + } catch (IOException e) { + log.error(String.format("Delete operation failed for file: %s", file), e); } } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java index 36c40efd3..d5273580a 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java @@ -3,6 +3,7 @@ import com.linkedin.openhouse.common.metrics.DefaultOtelConfig; import com.linkedin.openhouse.common.metrics.OtelEmitter; import com.linkedin.openhouse.jobs.spark.state.StateManager; +import com.linkedin.openhouse.jobs.util.AppConstants; import com.linkedin.openhouse.jobs.util.AppsOtelEmitter; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -11,6 +12,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.ExpireSnapshots; /** * Class with main entry point to run as a table snapshot expiration job. Snapshots for table which @@ -24,11 +27,14 @@ public class SnapshotsExpirationSparkApp extends BaseTableSparkApp { private final String granularity; private final int maxAge; private final int versions; + private final boolean deleteFiles; + private final String backupDir; public static class DEFAULT_CONFIGURATION { public static final int MAX_AGE = 3; public static final String GRANULARITY = ChronoUnit.DAYS.toString(); public static final int VERSIONS = 0; + public static final boolean DELETE_FILES = false; } public SnapshotsExpirationSparkApp( @@ -38,6 +44,8 @@ public SnapshotsExpirationSparkApp( int maxAge, String granularity, int versions, + boolean deleteFiles, + String backupDir, OtelEmitter otelEmitter) { super(jobId, stateManager, fqtn, otelEmitter); // By default, always enforce a time to live for snapshots even if unconfigured @@ -49,17 +57,59 @@ public SnapshotsExpirationSparkApp( this.granularity = granularity; } this.versions = versions; + this.deleteFiles = deleteFiles; + this.backupDir = backupDir; } @Override protected void runInner(Operations ops) { + Table table = ops.getTable(fqtn); + boolean backupEnabled = + Boolean.parseBoolean( + table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false")); + log.info( - "Snapshot expiration app start for table {}, expiring older than {} {}s or with more than {} versions", + "Snapshot expiration app start for table {}, expiring older than {} {}s or with more than {} versions, deleteFiles={}, backupEnabled={}, backupDir={}", fqtn, maxAge, granularity, - versions); - ops.expireSnapshots(fqtn, maxAge, granularity, versions); + versions, + deleteFiles, + backupEnabled, + backupDir); + + long startTime = System.currentTimeMillis(); + ExpireSnapshots.Result result = + ops.expireSnapshots( + fqtn, maxAge, granularity, versions, deleteFiles, backupEnabled, backupDir); + long duration = System.currentTimeMillis() - startTime; + + // Log results + log.info( + "Snapshot expiration completed for table {}. Deleted {} data files, {} equality delete files, {} position delete files, {} manifests, {} manifest lists", + fqtn, + result.deletedDataFilesCount(), + result.deletedEqualityDeleteFilesCount(), + result.deletedPositionDeleteFilesCount(), + result.deletedManifestsCount(), + result.deletedManifestListsCount()); + + // Emit metrics + recordMetrics(duration); + } + + private void recordMetrics(long duration) { + io.opentelemetry.api.common.Attributes attributes = + io.opentelemetry.api.common.Attributes.of( + io.opentelemetry.api.common.AttributeKey.stringKey(AppConstants.TABLE_NAME), + fqtn, + io.opentelemetry.api.common.AttributeKey.booleanKey(AppConstants.DELETE_FILES_ENABLED), + deleteFiles); + otelEmitter.time( + SnapshotsExpirationSparkApp.class.getName(), + AppConstants.SNAPSHOTS_EXPIRATION_DURATION, + duration, + attributes); } public static void main(String[] args) { @@ -76,6 +126,14 @@ public static SnapshotsExpirationSparkApp createApp(String[] args, OtelEmitter o extraOptions.add(new Option("g", "granularity", true, "Granularity: day")); extraOptions.add( new Option("v", "versions", true, "Number of versions to keep after snapshot expiration")); + extraOptions.add( + new Option( + "d", + "deleteFiles", + false, + "Delete expired snapshot files (data, manifests, manifest lists)")); + extraOptions.add( + new Option("b", "backupDir", true, "Backup directory for data files (default: .backup)")); CommandLine cmdLine = createCommandLine(args, extraOptions); return new SnapshotsExpirationSparkApp( getJobId(cmdLine), @@ -84,6 +142,8 @@ public static SnapshotsExpirationSparkApp createApp(String[] args, OtelEmitter o Integer.parseInt(cmdLine.getOptionValue("maxAge", "0")), cmdLine.getOptionValue("granularity", ""), Integer.parseInt(cmdLine.getOptionValue("versions", "0")), + cmdLine.hasOption("deleteFiles"), + cmdLine.getOptionValue("backupDir", ".backup"), otelEmitter); } } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java index a756a65f8..e17c1e10c 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java @@ -21,6 +21,10 @@ public final class AppConstants { public static final String REWRITTEN_DATA_FILE_BYTES = "rewritten_data_file_bytes"; public static final String REWRITTEN_DATA_FILE_GROUP_COUNT = "rewritten_data_file_group_count"; + // Snapshot Expiration metrics + public static final String SNAPSHOTS_EXPIRATION_DURATION = "snapshots_expiration_duration"; + public static final String DELETE_FILES_ENABLED = "delete_files_enabled"; + // Openhouse jobs status tags public static final String STATUS = "status"; public static final String STATUS_CODE = "status_code"; diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/SnapshotExpirationTaskTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/SnapshotExpirationTaskTest.java index bc284bb46..3da5cfc42 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/SnapshotExpirationTaskTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/SnapshotExpirationTaskTest.java @@ -104,4 +104,113 @@ void testSnapshotExpirationJobWithMaxAgeAndVersions() { .collect(Collectors.toList()); Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); } + + @Test + void testSnapshotExpirationWithDeleteFilesDisabled() { + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata); + + List expectedArgs = + Stream.of("--tableName", tableMetadata.fqtn()).collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } + + @Test + void testSnapshotExpirationWithDeleteFilesEnabled() { + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata, 1000, 2000, 3000); + + // deleteFiles is no longer passed via scheduler, so it won't be in args + List expectedArgs = + Stream.of("--tableName", tableMetadata.fqtn()).collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } + + @Test + void testSnapshotExpirationWithDeleteFilesAndMaxAgeConfig() { + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata, 1000, 2000, 3000); + + HistoryConfig historyConfigMock = Mockito.mock(HistoryConfig.class); + int maxAge = 1; + History.GranularityEnum granularity = History.GranularityEnum.DAY; + + Mockito.when(tableMetadata.getHistoryConfig()).thenReturn(historyConfigMock); + Mockito.when(historyConfigMock.getMaxAge()).thenReturn(maxAge); + Mockito.when(historyConfigMock.getGranularity()).thenReturn(granularity); + List expectedArgs = + Stream.of( + "--tableName", + tableMetadata.fqtn(), + "--maxAge", + String.valueOf(maxAge), + "--granularity", + granularity.getValue()) + .collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } + + @Test + void testSnapshotExpirationWithDeleteFilesAndVersionsConfig() { + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata, 1000, 2000, 3000); + + HistoryConfig historyConfigMock = Mockito.mock(HistoryConfig.class); + int versions = 3; + + Mockito.when(tableMetadata.getHistoryConfig()).thenReturn(historyConfigMock); + Mockito.when(historyConfigMock.getVersions()).thenReturn(versions); + List expectedArgs = + Stream.of("--tableName", tableMetadata.fqtn(), "--versions", String.valueOf(versions)) + .collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } + + @Test + void testSnapshotExpirationWithDeleteFilesAndFullConfig() { + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata, 1000, 2000, 3000); + + HistoryConfig historyConfigMock = Mockito.mock(HistoryConfig.class); + int maxAge = 3; + History.GranularityEnum granularity = History.GranularityEnum.DAY; + int versions = 3; + + Mockito.when(tableMetadata.getHistoryConfig()).thenReturn(historyConfigMock); + Mockito.when(historyConfigMock.getMaxAge()).thenReturn(maxAge); + Mockito.when(historyConfigMock.getGranularity()).thenReturn(granularity); + Mockito.when(historyConfigMock.getVersions()).thenReturn(versions); + + List expectedArgs = + Stream.of( + "--tableName", + tableMetadata.fqtn(), + "--maxAge", + String.valueOf(maxAge), + "--granularity", + granularity.getValue(), + "--versions", + String.valueOf(versions)) + .collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } + + @Test + void testSnapshotExpirationWithTimeoutsAndDeleteFiles() { + long pollIntervalMs = 1000L; + long queuedTimeoutMs = 5000L; + long taskTimeoutMs = 10000L; + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask( + jobsClient, + tablesClient, + tableMetadata, + pollIntervalMs, + queuedTimeoutMs, + taskTimeoutMs); + + List expectedArgs = + Stream.of("--tableName", tableMetadata.fqtn()).collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } } diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java index 4f3d6ea3e..132538364 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java @@ -674,6 +674,112 @@ public void testSnapshotExpirationWithHoursDaysMonthsYears() throws Exception { } } + @Test + public void testSnapshotsExpirationWithFilesDeletion() throws Exception { + final String tableName = "db.test_es_delete_files"; + final int numInserts = 5; + final int maxAge = 0; + final String timeGranularity = "DAYS"; + + List snapshotIds; + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + prepareTable(ops, tableName); + populateTable(ops, tableName, numInserts); + snapshotIds = getSnapshotIds(ops, tableName); + Assertions.assertEquals( + numInserts, + snapshotIds.size(), + String.format("There must be %d snapshot(s) after inserts", numInserts)); + Table table = ops.getTable(tableName); + log.info("Loaded table {}, location {}", table.name(), table.location()); + + // Expire snapshots with deleteFiles=true + org.apache.iceberg.actions.ExpireSnapshots.Result resultWithDeletion = + ops.expireSnapshots(table, maxAge, timeGranularity, 0, true); + + // Verify that the result object is returned properly + log.info( + "Snapshot expiration with deleteFiles=true: deleted {} data files, {} equality delete files, {} position delete files, {} manifests, {} manifest lists", + resultWithDeletion.deletedDataFilesCount(), + resultWithDeletion.deletedEqualityDeleteFilesCount(), + resultWithDeletion.deletedPositionDeleteFilesCount(), + resultWithDeletion.deletedManifestsCount(), + resultWithDeletion.deletedManifestListsCount()); + + // Verify result is not null and has the expected structure + Assertions.assertNotNull(resultWithDeletion, "Result should not be null"); + + // When deleteFiles=true, manifests and manifest lists should be deleted + // Data files may or may not be deleted depending on whether they're still referenced + Assertions.assertTrue( + resultWithDeletion.deletedManifestsCount() > 0 + || resultWithDeletion.deletedManifestListsCount() > 0, + "Should have deleted manifests or manifest lists from expired snapshots"); + + // Only retain the last snapshot + checkSnapshots(table, snapshotIds.subList(snapshotIds.size() - 1, snapshotIds.size())); + } + + // restart the app to reload catalog cache + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // verify that new apps see snapshots correctly + checkSnapshots( + ops, tableName, snapshotIds.subList(snapshotIds.size() - 1, snapshotIds.size())); + } + } + + @Test + public void testSnapshotsExpirationWithoutFilesDeletion() throws Exception { + final String tableName = "db.test_es_no_delete_files"; + final int numInserts = 5; + final int maxAge = 0; + final String timeGranularity = "DAYS"; + + List snapshotIds; + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + prepareTable(ops, tableName); + populateTable(ops, tableName, numInserts); + snapshotIds = getSnapshotIds(ops, tableName); + Assertions.assertEquals( + numInserts, + snapshotIds.size(), + String.format("There must be %d snapshot(s) after inserts", numInserts)); + Table table = ops.getTable(tableName); + log.info("Loaded table {}, location {}", table.name(), table.location()); + + // Expire snapshots with deleteFiles=false (default behavior) + org.apache.iceberg.actions.ExpireSnapshots.Result resultWithoutDeletion = + ops.expireSnapshots(table, maxAge, timeGranularity, 0, false); + + // Verify that no files were deleted (custom delete function prevents deletion) + log.info( + "Snapshot expiration with deleteFiles=false: deleted {} data files, {} manifests, {} manifest lists", + resultWithoutDeletion.deletedDataFilesCount(), + resultWithoutDeletion.deletedManifestsCount(), + resultWithoutDeletion.deletedManifestListsCount()); + + // With deleteFiles=false, no files should be physically deleted + Assertions.assertEquals( + 0, + resultWithoutDeletion.deletedDataFilesCount(), + "Should not delete data files when deleteFiles=false"); + Assertions.assertEquals( + 0, + resultWithoutDeletion.deletedManifestsCount(), + "Should not delete manifest files when deleteFiles=false"); + + // Only retain the last snapshot + checkSnapshots(table, snapshotIds.subList(snapshotIds.size() - 1, snapshotIds.size())); + } + + // restart the app to reload catalog cache + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // verify that new apps see snapshots correctly + checkSnapshots( + ops, tableName, snapshotIds.subList(snapshotIds.size() - 1, snapshotIds.size())); + } + } + @Test public void testStagedFilesDelete() throws Exception { final String tableName = "db.test_staged_delete"; diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml index 3e795d4db..13857b74c 100644 --- a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml +++ b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml @@ -47,7 +47,7 @@ jobs: << : *livy-engine - type: SNAPSHOTS_EXPIRATION class-name: com.linkedin.openhouse.jobs.spark.SnapshotsExpirationSparkApp - args: [] + args: ["--deleteFiles", "--backupDir", ".backup"] << : *apps-defaults << : *livy-engine - type: ORPHAN_FILES_DELETION diff --git a/jobs-scheduler.Dockerfile b/jobs-scheduler.Dockerfile index 7ae586333..5ec0c11c3 100644 --- a/jobs-scheduler.Dockerfile +++ b/jobs-scheduler.Dockerfile @@ -31,6 +31,10 @@ RUN chown -R openhouse:openhouse $USER_HOME # Setup default path for Java RUN mkdir -p /usr/java && ln -sfn /export/apps/jdk/JDK-1_8_0_172 /usr/java/default +RUN echo '#!/bin/sh' > /usr/local/bin/entrypoint.sh && \ + echo 'exec java -Xmx256M -Xms64M -XX:NativeMemoryTracking=summary -cp ${APP_NAME}.jar com.linkedin.openhouse.jobs.scheduler.JobsScheduler --tokenFile /var/config/openhouse.token "$@"' >> /usr/local/bin/entrypoint.sh && \ + chmod +x /usr/local/bin/entrypoint.sh + USER $USER -ENTRYPOINT ["sh", "-c", "java -Xmx256M -Xms64M -XX:NativeMemoryTracking=summary -cp $APP_NAME.jar com.linkedin.openhouse.jobs.scheduler.JobsScheduler --tokenFile /var/config/openhouse.token $@"] +ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]