diff --git a/apps/optimizer-scheduler/build.gradle b/apps/optimizer-scheduler/build.gradle new file mode 100644 index 000000000..a232ad28a --- /dev/null +++ b/apps/optimizer-scheduler/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'openhouse.springboot-ext-conventions' + id 'org.springframework.boot' version '2.7.8' +} + +dependencies { + implementation project(':apps:optimizer') + implementation 'org.springframework.boot:spring-boot-starter:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8' + runtimeOnly 'mysql:mysql-connector-java:8.0.33' + testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' + testRuntimeOnly 'com.h2database:h2' +} + +test { + useJUnitPlatform() +} diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/BinPacker.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/BinPacker.java new file mode 100644 index 000000000..240a50661 --- /dev/null +++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/BinPacker.java @@ -0,0 +1,74 @@ +package com.linkedin.openhouse.scheduler; + +import com.linkedin.openhouse.optimizer.entity.TableOperationRow; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Greedy first-fit descending bin-packer for table operations. + * + *
<p>Tables are sorted by descending file count, then assigned to the first bin whose running total, + * including the new table, stays at or below {@code maxFilesPerBin}. A table larger than the limit + * gets its own bin (oversized bins are allowed; we never drop a table). + * + *
<p>Tables with no stats entry are treated as file count = 0 and are still schedulable. + */ +public final class BinPacker { + + private BinPacker() {} + + /** + * Pack {@code pending} rows into bins. + * + * @param pending operations to pack; may be empty + * @param fileCountByUuid map from tableUuid to number of current data files; missing entries + * default to 0 + * @param maxFilesPerBin maximum total file count per bin + * @return list of bins, each bin being a non-empty list of rows + */ + public static List<List<TableOperationRow>> pack( + List<TableOperationRow> pending, Map<String, Long> fileCountByUuid, long maxFilesPerBin) { + + if (pending.isEmpty()) { + return List.of(); + } + + List<TableOperationRow> sorted = + pending.stream() + .sorted( + Comparator.comparingLong( + (TableOperationRow r) -> fileCountByUuid.getOrDefault(r.getTableUuid(), 0L)) + .reversed()) + .collect(Collectors.toList()); + + List<List<TableOperationRow>> bins = new ArrayList<>(); + List<Long> binTotals = new ArrayList<>(); + + for (TableOperationRow row : sorted) { + long cost = fileCountByUuid.getOrDefault(row.getTableUuid(), 0L); + + int placed = -1; + for (int i = 0; i < bins.size(); i++) { + if (binTotals.get(i) + cost <= maxFilesPerBin || binTotals.get(i) == 0) { + placed = i; + break; + } + } + + if (placed >= 0) { + bins.get(placed).add(row); + binTotals.set(placed, binTotals.get(placed) + cost); + } else { + List<TableOperationRow> newBin = new ArrayList<>(); + newBin.add(row); + bins.add(newBin); + binTotals.add(cost); + } + } + + return bins; + } +}
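For reference, a minimal standalone sketch of the same first-fit-descending loop on plain file counts; the FfdDemo class name and sample numbers are invented for illustration and mirror the tablesOverLimit test case below.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FfdDemo {
  public static void main(String[] args) {
    // File counts for four hypothetical tables, with a limit of 1M files per bin.
    List<Long> costs = new ArrayList<>(List.of(600_000L, 400_000L, 600_000L, 100L));
    long maxPerBin = 1_000_000L;

    costs.sort(Comparator.reverseOrder()); // descending, as BinPacker sorts

    List<Long> binTotals = new ArrayList<>();
    for (long cost : costs) {
      int placed = -1;
      for (int i = 0; i < binTotals.size(); i++) {
        if (binTotals.get(i) + cost <= maxPerBin) { // first fit
          placed = i;
          break;
        }
      }
      if (placed >= 0) {
        binTotals.set(placed, binTotals.get(placed) + cost);
      } else {
        binTotals.add(cost); // new bin; an oversized table would simply sit alone here
      }
    }
    // Prints [1000000, 600100]: bin 0 holds {600k, 400k}, bin 1 holds {600k, 100}.
    System.out.println(binTotals);
  }
}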
diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerApplication.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerApplication.java new file mode 100644 index 000000000..216ae9748 --- /dev/null +++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerApplication.java @@ -0,0 +1,24 @@ +package com.linkedin.openhouse.scheduler; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.domain.EntityScan; +import org.springframework.context.annotation.Bean; +import org.springframework.data.jpa.repository.config.EnableJpaRepositories; + +/** Entry point for the Optimizer Scheduler application. */ +@SpringBootApplication +@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.entity") +@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository") +public class SchedulerApplication { + + public static void main(String[] args) { + SpringApplication.run(SchedulerApplication.class, args); + } + + @Bean + public CommandLineRunner run(SchedulerRunner runner) { + return args -> runner.schedule(); + } +} diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerRunner.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerRunner.java new file mode 100644 index 000000000..d6f3d404f --- /dev/null +++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerRunner.java @@ -0,0 +1,121 @@ +package com.linkedin.openhouse.scheduler; + +import com.linkedin.openhouse.optimizer.entity.TableOperationRow; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import com.linkedin.openhouse.scheduler.client.JobsServiceClient; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +/** Reads PENDING rows from the optimizer DB, bin-packs them, and submits one Spark job per bin. */ +@Slf4j +@Component +@RequiredArgsConstructor +public class SchedulerRunner { + + private final TableOperationsRepository operationsRepo; + private final TableStatsRepository statsRepo; + private final JobsServiceClient jobsClient; + + @Value("${scheduler.bin-size-max-files}") + private long maxFiles; + + @Value("${scheduler.operation-type}") + private String operationType; + + @Value("${scheduler.results-endpoint}") + private String resultsEndpoint; + + @Transactional + public void schedule() { + List<TableOperationRow> pending = + operationsRepo.find(operationType, "PENDING", null, null, null); + if (pending.isEmpty()) { + log.info("No PENDING operations of type {}; nothing to schedule", operationType); + return; + } + + Set<String> uuids = + pending.stream().map(TableOperationRow::getTableUuid).collect(Collectors.toSet()); + + Map<String, Long> fileCountByUuid = + statsRepo.findAllById(uuids).stream() + .collect( + Collectors.toMap( + row -> row.getTableUuid(), + row -> { + if (row.getStats() == null || row.getStats().getSnapshot() == null) { + return 0L; + } + Long count = row.getStats().getSnapshot().getNumCurrentFiles(); + return count != null ? count : 0L; + })); + + List<List<TableOperationRow>> bins = BinPacker.pack(pending, fileCountByUuid, maxFiles); + log.info( + "Packed {} PENDING operations into {} bins (maxFiles={})", + pending.size(), + bins.size(), + maxFiles); + + bins.forEach(this::submitBin); + } + + private void submitBin(List<TableOperationRow> bin) { + // 0. Cancel duplicate PENDING rows per (tableUuid, operationType) — keep only the first row. + bin.stream() + .collect(Collectors.groupingBy(TableOperationRow::getTableUuid)) + .forEach( + (uuid, rows) -> { + TableOperationRow keep = rows.get(0); + operationsRepo.cancelDuplicatePending(uuid, operationType, keep.getId()); + }); + + // 1. Claim all rows (PENDING → SCHEDULING) — prevents a second scheduler instance from + // double-submitting.
Any row already claimed (markScheduling returns 0) is excluded from this batch. + List<TableOperationRow> claimed = + bin.stream() + .filter( + r -> operationsRepo.markScheduling(r.getId(), r.getVersion(), Instant.now()) == 1) + .collect(Collectors.toList()); + + if (claimed.isEmpty()) { + log.info("All rows in bin already claimed by another scheduler instance; skipping"); + return; + } + + // 2. Submit a job for the claimed rows only. + List<String> tableNames = + claimed.stream() + .map(r -> r.getDatabaseName() + "." + r.getTableName()) + .collect(Collectors.toList()); + List<String> opIds = + claimed.stream().map(TableOperationRow::getId).collect(Collectors.toList()); + + String jobName = "batched-" + operationType.toLowerCase() + "-" + Instant.now().toEpochMilli(); + Optional<String> jobId = + jobsClient.launch(jobName, operationType, tableNames, opIds, resultsEndpoint); + + if (jobId.isPresent()) { + // 3. Transition SCHEDULING → SCHEDULED with the jobId. + for (TableOperationRow r : claimed) { + operationsRepo.markScheduled(r.getId(), r.getVersion() + 1, jobId.get()); + } + log.info("Submitted job {} for {} tables", jobId.get(), claimed.size()); + } else { + log.warn( + "Job submission failed for {} SCHEDULING rows; the analyzer's scheduledTimeout will" + + " detect and overwrite stale rows", + claimed.size()); + } + } +}
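A minimal sketch of the optimistic claim in step 1, assuming nothing beyond the JDK: the ClaimDemo class is invented, and an in-memory AtomicLong stands in for the row's version column, while the real compare-and-set is the markScheduling UPDATE guarded by version and status.

import java.util.concurrent.atomic.AtomicLong;

public class ClaimDemo {
  // Stand-in for one row's version column.
  static final AtomicLong version = new AtomicLong(0);

  // Returns true iff this caller claimed the row (the UPDATE affected 1 row).
  static boolean markScheduling(long expectedVersion) {
    return version.compareAndSet(expectedVersion, expectedVersion + 1);
  }

  public static void main(String[] args) {
    long seen = version.get(); // both scheduler instances read version = 0
    boolean a = markScheduling(seen); // instance A claims the row: true
    boolean b = markScheduling(seen); // instance B loses the race: false
    System.out.println(a + " " + b); // true false: exactly one submitter per row
  }
}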
diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/client/JobsServiceClient.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/client/JobsServiceClient.java new file mode 100644 index 000000000..76a96f957 --- /dev/null +++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/client/JobsServiceClient.java @@ -0,0 +1,80 @@ +package com.linkedin.openhouse.scheduler.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * Client for the OpenHouse Jobs Service. + * + *
<p>Submits one {@code BatchedOrphanFilesDeletionSparkApp} job per bin via {@code POST /jobs}. + */ +@Slf4j +public class JobsServiceClient { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final Duration TIMEOUT = Duration.ofSeconds(30); + + private final WebClient webClient; + private final String clusterId; + + public JobsServiceClient(WebClient webClient, String clusterId) { + this.webClient = webClient; + this.clusterId = clusterId; + } + + /** + * Submit a batched Spark job for the given tables. + * + * @param jobName human-readable name for the job + * @param jobType operation type string (e.g. "ORPHAN_FILES_DELETION") + * @param tableNames fully-qualified table names (db.table) + * @param operationIds operation UUIDs — parallel to tableNames + * @param resultsEndpoint base URL the Spark app PATCHes results back to + * @return job ID if the submission succeeded, empty if an error occurred + */ + public Optional<String> launch( + String jobName, + String jobType, + List<String> tableNames, + List<String> operationIds, + String resultsEndpoint) { + try { + ObjectNode body = MAPPER.createObjectNode(); + body.put("jobName", jobName); + body.put("clusterId", clusterId); + + ObjectNode jobConf = body.putObject("jobConf"); + jobConf.put("jobType", jobType); + + ArrayNode args = jobConf.putArray("args"); + args.add("--tableNames"); + args.add(String.join(",", tableNames)); + args.add("--operationIds"); + args.add(String.join(",", operationIds)); + args.add("--resultsEndpoint"); + args.add(resultsEndpoint); + + String responseBody = + webClient + .post() + .uri("/jobs") + .bodyValue(body) + .retrieve() + .bodyToMono(String.class) + .timeout(TIMEOUT) + .block(); + + String jobId = MAPPER.readTree(responseBody).path("jobId").asText(null); + return Optional.ofNullable(jobId); + } catch (Exception e) { + log.error("Failed to submit job '{}': {}", jobName, e.getMessage()); + return Optional.empty(); + } + } +} diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/config/SchedulerConfig.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/config/SchedulerConfig.java new file mode 100644 index 000000000..afdc36240 --- /dev/null +++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/config/SchedulerConfig.java @@ -0,0 +1,27 @@ +package com.linkedin.openhouse.scheduler.config; + +import com.linkedin.openhouse.scheduler.client.JobsServiceClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.function.client.WebClient; + +@Configuration +public class SchedulerConfig { + + @Value("${jobs.base-uri}") + private String jobsBaseUri; + + @Value("${scheduler.cluster-id}") + private String clusterId; + + @Bean + public WebClient jobsWebClient() { + return WebClient.builder().baseUrl(jobsBaseUri).build(); + } + + @Bean + public JobsServiceClient jobsServiceClient(WebClient jobsWebClient) { + return new JobsServiceClient(jobsWebClient, clusterId); + } +}
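For reference, a sketch that prints the request body launch() assembles for two tables; the PayloadDemo class and all sample values are invented, and the exact shape the Jobs Service expects is assumed from the client code above rather than verified.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class PayloadDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode body = mapper.createObjectNode();
    // jobName follows the "batched-" + lowercased type + "-" + epochMillis pattern.
    body.put("jobName", "batched-orphan_files_deletion-1700000000000");
    body.put("clusterId", "LocalHadoopCluster");

    ObjectNode jobConf = body.putObject("jobConf");
    jobConf.put("jobType", "ORPHAN_FILES_DELETION");

    // Table names and operation IDs are comma-joined into parallel args.
    ArrayNode jobArgs = jobConf.putArray("args");
    jobArgs.add("--tableNames").add("db1.tbl1,db1.tbl2");
    jobArgs.add("--operationIds").add("op-1,op-2");
    jobArgs.add("--resultsEndpoint").add("http://openhouse-optimizer:8080/v1/table-operations");

    System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(body));
  }
}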
diff --git a/apps/optimizer-scheduler/src/main/resources/application.properties b/apps/optimizer-scheduler/src/main/resources/application.properties new file mode 100644 index 000000000..75074b0fe --- /dev/null +++ b/apps/optimizer-scheduler/src/main/resources/application.properties @@ -0,0 +1,11 @@ +spring.application.name=openhouse-optimizer-scheduler +spring.main.web-application-type=none +spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:schedulerdb;DB_CLOSE_DELAY=-1;MODE=MySQL} +spring.datasource.username=${OPTIMIZER_DB_USER:sa} +spring.datasource.password=${OPTIMIZER_DB_PASSWORD:} +spring.jpa.hibernate.ddl-auto=none +jobs.base-uri=${JOBS_BASE_URI:http://localhost:8002} +scheduler.bin-size-max-files=${SCHEDULER_BIN_SIZE_MAX_FILES:1000000} +scheduler.operation-type=${SCHEDULER_OPERATION_TYPE:ORPHAN_FILES_DELETION} +scheduler.results-endpoint=${SCHEDULER_RESULTS_ENDPOINT:http://openhouse-optimizer:8080/v1/table-operations} +scheduler.cluster-id=${SCHEDULER_CLUSTER_ID:LocalHadoopCluster} diff --git a/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/BinPackerTest.java b/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/BinPackerTest.java new file mode 100644 index 000000000..69ea4d3bc --- /dev/null +++ b/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/BinPackerTest.java @@ -0,0 +1,104 @@ +package com.linkedin.openhouse.scheduler; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.optimizer.entity.TableOperationRow; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class BinPackerTest { + + private static TableOperationRow row(String uuid) { + return TableOperationRow.builder() + .id(UUID.randomUUID().toString()) + .tableUuid(uuid) + .databaseName("db") + .tableName("tbl_" + uuid) + .operationType("ORPHAN_FILES_DELETION") + .status("PENDING") + .build(); + } + + @Test + void emptyInput_returnsEmptyBins() { + assertThat(BinPacker.pack(List.of(), Map.of(), 1_000_000L)).isEmpty(); + } + + @Test + void singleTable_oneBin() { + TableOperationRow r = row("uuid-1"); + List<List<TableOperationRow>> bins = + BinPacker.pack(List.of(r), Map.of("uuid-1", 100L), 1_000_000L); + + assertThat(bins).hasSize(1); + assertThat(bins.get(0)).containsExactly(r); + } + + @Test + void tablesUnderLimit_oneBin() { + TableOperationRow r1 = row("a"); + TableOperationRow r2 = row("b"); + TableOperationRow r3 = row("c"); + + List<List<TableOperationRow>> bins = + BinPacker.pack( + List.of(r1, r2, r3), Map.of("a", 300_000L, "b", 300_000L, "c", 300_000L), 1_000_000L); + + assertThat(bins).hasSize(1); + assertThat(bins.get(0)).hasSize(3); + } + + @Test + void tablesOverLimit_twoBins() { + TableOperationRow r1 = row("a"); + TableOperationRow r2 = row("b"); + TableOperationRow r3 = row("c"); + + // 600k + 600k would exceed 1M; 400k fits after 600k + List<List<TableOperationRow>> bins = + BinPacker.pack( + List.of(r1, r2, r3), Map.of("a", 600_000L, "b", 600_000L, "c", 400_000L), 1_000_000L); + + assertThat(bins).hasSize(2); + assertThat(bins.get(0)).hasSize(2); // 600k + 400k + assertThat(bins.get(1)).hasSize(1); // 600k alone + } + + @Test + void largeTableAlone_exceedsLimitSingleBin() { + TableOperationRow r = row("big"); + List<List<TableOperationRow>> bins = + BinPacker.pack(List.of(r), Map.of("big", 5_000_000L), 1_000_000L); + + assertThat(bins).hasSize(1); + assertThat(bins.get(0)).containsExactly(r); + } + + @Test + void noStats_fileCountZero_groupedNormally() { + TableOperationRow r1 = row("x"); + TableOperationRow r2 = row("y"); + + // No stats entries — both get cost 0, both fit in one bin + List<List<TableOperationRow>> bins = BinPacker.pack(List.of(r1, r2), Map.of(), 1_000_000L); + + assertThat(bins).hasSize(1); + assertThat(bins.get(0)).hasSize(2); + } + + @Test + void sortedDescending_largestTablesFirst() { + TableOperationRow small = row("small"); + TableOperationRow large = row("large"); + + List<List<TableOperationRow>> bins = + BinPacker.pack(List.of(small, large), 
Map.of("small", 100L, "large", 900_000L), 1_000_000L); + + // Both fit in one bin, large should appear first due to descending sort + assertThat(bins).hasSize(1); + assertThat(bins.get(0).get(0).getTableUuid()).isEqualTo("large"); + assertThat(bins.get(0).get(1).getTableUuid()).isEqualTo("small"); + } +} diff --git a/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/SchedulerRunnerTest.java b/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/SchedulerRunnerTest.java new file mode 100644 index 000000000..bc3578572 --- /dev/null +++ b/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/SchedulerRunnerTest.java @@ -0,0 +1,186 @@ +package com.linkedin.openhouse.scheduler; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.openhouse.optimizer.entity.TableOperationRow; +import com.linkedin.openhouse.optimizer.entity.TableStatsRow; +import com.linkedin.openhouse.optimizer.model.TableStats; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import com.linkedin.openhouse.scheduler.client.JobsServiceClient; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; + +@ExtendWith(MockitoExtension.class) +class SchedulerRunnerTest { + + @Mock private TableOperationsRepository operationsRepo; + @Mock private TableStatsRepository statsRepo; + @Mock private JobsServiceClient jobsClient; + + @InjectMocks private SchedulerRunner runner; + + @BeforeEach + void injectConfig() { + ReflectionTestUtils.setField(runner, "maxFiles", 1_000_000L); + ReflectionTestUtils.setField(runner, "operationType", "ORPHAN_FILES_DELETION"); + ReflectionTestUtils.setField( + runner, "resultsEndpoint", "http://localhost:8080/v1/table-operations"); + } + + private TableOperationRow pendingRow(String uuid, String db, String table) { + return TableOperationRow.builder() + .id(UUID.randomUUID().toString()) + .tableUuid(uuid) + .databaseName(db) + .tableName(table) + .operationType("ORPHAN_FILES_DELETION") + .status("PENDING") + .version(0L) + .build(); + } + + private TableStatsRow statsRow(String uuid, long numCurrentFiles) { + TableStats stats = + TableStats.builder() + .snapshot(TableStats.SnapshotMetrics.builder().numCurrentFiles(numCurrentFiles).build()) + .build(); + return TableStatsRow.builder().tableUuid(uuid).stats(stats).build(); + } + + @Test + void schedule_noPendingOps_noJobSubmitted() { + when(operationsRepo.find("ORPHAN_FILES_DELETION", "PENDING", null, null, null)) + .thenReturn(List.of()); + + runner.schedule(); + + verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); + } + + @Test + void 
schedule_twoStepClaim_claimsAndSchedules() { + String uuid = UUID.randomUUID().toString(); + TableOperationRow row = pendingRow(uuid, "db1", "tbl1"); + + when(operationsRepo.find("ORPHAN_FILES_DELETION", "PENDING", null, null, null)) + .thenReturn(List.of(row)); + when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100_000L))); + when(operationsRepo.markScheduling(anyString(), anyLong(), any())).thenReturn(1); + when(operationsRepo.markScheduled(anyString(), anyLong(), anyString())).thenReturn(1); + when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) + .thenReturn(Optional.of("job-123")); + + runner.schedule(); + + // Step 1: PENDING → SCHEDULING + verify(operationsRepo).markScheduling(eq(row.getId()), eq(0L), any()); + // Step 2: SCHEDULING → SCHEDULED with jobId + verify(operationsRepo).markScheduled(eq(row.getId()), eq(1L), eq("job-123")); + + ArgumentCaptor<List<String>> tableNamesCaptor = ArgumentCaptor.forClass(List.class); + ArgumentCaptor<List<String>> opIdsCaptor = ArgumentCaptor.forClass(List.class); + verify(jobsClient) + .launch( + anyString(), + eq("ORPHAN_FILES_DELETION"), + tableNamesCaptor.capture(), + opIdsCaptor.capture(), + anyString()); + + assertThat(tableNamesCaptor.getValue()).containsExactly("db1.tbl1"); + assertThat(opIdsCaptor.getValue()).containsExactly(row.getId()); + } + + @Test + void schedule_jobLaunchFails_rowsStayScheduling() { + String uuid = UUID.randomUUID().toString(); + TableOperationRow row = pendingRow(uuid, "db1", "tbl1"); + + when(operationsRepo.find("ORPHAN_FILES_DELETION", "PENDING", null, null, null)) + .thenReturn(List.of(row)); + when(statsRepo.findAllById(any())).thenReturn(List.of()); + when(operationsRepo.markScheduling(anyString(), anyLong(), any())).thenReturn(1); + when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) + .thenReturn(Optional.empty()); + + runner.schedule(); + + verify(operationsRepo).markScheduling(eq(row.getId()), eq(0L), any()); + verify(operationsRepo, never()).markScheduled(anyString(), anyLong(), anyString()); + } + + @Test + void schedule_rowAlreadyClaimed_skipsSubmit() { + String uuid = UUID.randomUUID().toString(); + TableOperationRow row = pendingRow(uuid, "db1", "tbl1"); + + when(operationsRepo.find("ORPHAN_FILES_DELETION", "PENDING", null, null, null)) + .thenReturn(List.of(row)); + when(statsRepo.findAllById(any())).thenReturn(List.of()); + when(operationsRepo.markScheduling(anyString(), anyLong(), any())).thenReturn(0); + + runner.schedule(); + + verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); + } + + @Test + void schedule_cancelsDuplicatePendingBeforeClaim() { + String uuid = UUID.randomUUID().toString(); + TableOperationRow row1 = pendingRow(uuid, "db1", "tbl1"); + TableOperationRow row2 = pendingRow(uuid, "db1", "tbl1"); + + when(operationsRepo.find("ORPHAN_FILES_DELETION", "PENDING", null, null, null)) + .thenReturn(List.of(row1, row2)); + when(statsRepo.findAllById(any())).thenReturn(List.of()); + when(operationsRepo.markScheduling(anyString(), anyLong(), any())).thenReturn(1); + when(operationsRepo.markScheduled(anyString(), anyLong(), anyString())).thenReturn(1); + when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) + .thenReturn(Optional.of("job-789")); + + runner.schedule(); + + verify(operationsRepo) + .cancelDuplicatePending(eq(uuid), eq("ORPHAN_FILES_DELETION"), eq(row1.getId())); + } + + @Test + void schedule_claimsAllRowsInBin() { + String uuid1 = 
UUID.randomUUID().toString(); + String uuid2 = UUID.randomUUID().toString(); + TableOperationRow row1 = pendingRow(uuid1, "db1", "tbl1"); + TableOperationRow row2 = pendingRow(uuid2, "db1", "tbl2"); + + when(operationsRepo.find("ORPHAN_FILES_DELETION", "PENDING", null, null, null)) + .thenReturn(List.of(row1, row2)); + when(statsRepo.findAllById(any())).thenReturn(List.of()); + when(operationsRepo.markScheduling(anyString(), anyLong(), any())).thenReturn(1); + when(operationsRepo.markScheduled(anyString(), anyLong(), anyString())).thenReturn(1); + when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) + .thenReturn(Optional.of("job-456")); + + runner.schedule(); + + verify(operationsRepo, times(2)).markScheduling(anyString(), eq(0L), any()); + verify(operationsRepo, times(2)).markScheduled(anyString(), eq(1L), eq("job-456")); + } +} diff --git a/apps/optimizer-scheduler/src/test/resources/application-test.properties b/apps/optimizer-scheduler/src/test/resources/application-test.properties new file mode 100644 index 000000000..3dcec13da --- /dev/null +++ b/apps/optimizer-scheduler/src/test/resources/application-test.properties @@ -0,0 +1,11 @@ +spring.datasource.url=jdbc:h2:mem:schedulertestdb;DB_CLOSE_DELAY=-1;MODE=MySQL +spring.datasource.username=sa +spring.datasource.password= +spring.jpa.hibernate.ddl-auto=none +spring.sql.init.mode=always +spring.sql.init.schema-locations=classpath:schema.sql +jobs.base-uri=http://localhost:9999 +scheduler.bin-size-max-files=1000000 +scheduler.operation-type=ORPHAN_FILES_DELETION +scheduler.results-endpoint=http://localhost:8080/v1/table-operations +scheduler.cluster-id=test-cluster diff --git a/apps/optimizer-scheduler/src/test/resources/schema.sql b/apps/optimizer-scheduler/src/test/resources/schema.sql new file mode 100644 index 000000000..75de24d3d --- /dev/null +++ b/apps/optimizer-scheduler/src/test/resources/schema.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS table_operations ( + id VARCHAR(36) NOT NULL, + table_uuid VARCHAR(36) NOT NULL, + database_name VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + operation_type VARCHAR(50) NOT NULL, + status VARCHAR(20) NOT NULL, + created_at TIMESTAMP(6) NOT NULL, + scheduled_at TIMESTAMP(6), + job_id VARCHAR(255), + version BIGINT, + PRIMARY KEY (id) +); + +CREATE TABLE IF NOT EXISTS table_stats ( + table_uuid VARCHAR(36) NOT NULL, + database_id VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + stats TEXT, + table_properties TEXT, + updated_at TIMESTAMP(6) NOT NULL, + PRIMARY KEY (table_uuid) +); diff --git a/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java index 27424dfdc..0ba241b97 100644 --- a/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java +++ b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java @@ -1,8 +1,10 @@ package com.linkedin.openhouse.optimizer.repository; import com.linkedin.openhouse.optimizer.entity.TableOperationRow; +import java.time.Instant; import java.util.List; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; @@ -26,4 +28,46 @@ List<TableOperationRow> find( @Param("tableUuid") String tableUuid, 
@Param("databaseName") String databaseName, @Param("tableName") String tableName); + + /** + * Delete duplicate PENDING rows for the same (tableUuid, operationType), keeping only the + * specified row. Used by the Scheduler to deduplicate before claiming. + */ + @Modifying + @Query( + "DELETE FROM TableOperationRow r " + + "WHERE r.tableUuid = :tableUuid " + + "AND r.operationType = :operationType " + + "AND r.status = 'PENDING' " + + "AND r.id <> :keepId") + int cancelDuplicatePending( + @Param("tableUuid") String tableUuid, + @Param("operationType") String operationType, + @Param("keepId") String keepId); + + /** + * CAS transition: PENDING → SCHEDULING. Returns 1 if the row was claimed, 0 if already claimed by + * another instance or the version has changed. + */ + @Modifying + @Query( + "UPDATE TableOperationRow r " + + "SET r.status = 'SCHEDULING', r.scheduledAt = :scheduledAt, r.version = r.version + 1 " + + "WHERE r.id = :id AND r.version = :version AND r.status = 'PENDING'") + int markScheduling( + @Param("id") String id, + @Param("version") long version, + @Param("scheduledAt") Instant scheduledAt); + + /** + * CAS transition: SCHEDULING → SCHEDULED with the external job ID. Returns 1 on success, 0 if the + * row is no longer in SCHEDULING state at the expected version. + */ + @Modifying + @Query( + "UPDATE TableOperationRow r " + + "SET r.status = 'SCHEDULED', r.jobId = :jobId, r.version = r.version + 1 " + + "WHERE r.id = :id AND r.version = :version AND r.status = 'SCHEDULING'") + int markScheduled( + @Param("id") String id, @Param("version") long version, @Param("jobId") String jobId); } diff --git a/settings.gradle b/settings.gradle index 52873b677..48c81f458 100644 --- a/settings.gradle +++ b/settings.gradle @@ -52,6 +52,7 @@ include ':services:jobs' include ':services:optimizer' include ':apps:optimizer' include ':apps:optimizer-analyzer' +include ':apps:optimizer-scheduler' include ':services:tables' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'