19 changes: 19 additions & 0 deletions apps/optimizer-scheduler/build.gradle
@@ -0,0 +1,19 @@
plugins {
id 'openhouse.springboot-ext-conventions'
id 'org.springframework.boot' version '2.7.8'
}

dependencies {
implementation project(':apps:optimizer')
implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8'
runtimeOnly 'mysql:mysql-connector-java:8.0.33'
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
testRuntimeOnly 'com.h2database:h2'
}

test {
useJUnitPlatform()
}
74 changes: 74 additions & 0 deletions apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/BinPacker.java
@@ -0,0 +1,74 @@
package com.linkedin.openhouse.scheduler;

import com.linkedin.openhouse.optimizer.entity.TableOperationRow;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Greedy first-fit descending bin-packer for table operations.
*
* <p>Tables are sorted by descending file count, then assigned to the first bin whose running total
* stays at or below {@code maxFilesPerBin}. A table larger than the limit gets its own bin (oversized
* bins are allowed — we never drop a table).
*
* <p>Tables with no stats entry are treated as file count = 0 and are still schedulable.
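*
* <p>A minimal usage sketch (the UUIDs and file counts are illustrative, and {@code pendingRows}
* is assumed to hold one pending row per table); with {@code maxFilesPerBin = 1_000_000} this
* packs into two bins, {@code [a]} and {@code [b, c]}:
*
* <pre>{@code
* Map<String, Long> fileCounts = Map.of("a", 900_000L, "b", 400_000L, "c", 300_000L);
* List<List<TableOperationRow>> bins = BinPacker.pack(pendingRows, fileCounts, 1_000_000L);
* }</pre>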
*/
public final class BinPacker {

private BinPacker() {}

/**
* Pack {@code pending} rows into bins.
*
* @param pending operations to pack; may be empty, in which case an empty list is returned
* @param fileCountByUuid map from tableUuid to number of current data files; missing entries
* default to 0
* @param maxFilesPerBin maximum total file count per bin
* @return list of bins, each bin being a non-empty list of rows
*/
public static List<List<TableOperationRow>> pack(
List<TableOperationRow> pending, Map<String, Long> fileCountByUuid, long maxFilesPerBin) {

if (pending.isEmpty()) {
return List.of();
}

List<TableOperationRow> sorted =
pending.stream()
.sorted(
Comparator.comparingLong(
(TableOperationRow r) -> fileCountByUuid.getOrDefault(r.getTableUuid(), 0L))
.reversed())
.collect(Collectors.toList());

List<List<TableOperationRow>> bins = new ArrayList<>();
List<Long> binTotals = new ArrayList<>();

for (TableOperationRow row : sorted) {
long cost = fileCountByUuid.getOrDefault(row.getTableUuid(), 0L);

int placed = -1;
for (int i = 0; i < bins.size(); i++) {
if (binTotals.get(i) + cost <= maxFilesPerBin || binTotals.get(i) == 0) {
placed = i;
break;
}
}

if (placed >= 0) {
bins.get(placed).add(row);
binTotals.set(placed, binTotals.get(placed) + cost);
} else {
List<TableOperationRow> newBin = new ArrayList<>();
newBin.add(row);
bins.add(newBin);
binTotals.add(cost);
}
}

return bins;
}
}
24 changes: 24 additions & 0 deletions apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerApplication.java
@@ -0,0 +1,24 @@
package com.linkedin.openhouse.scheduler;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

/** Entry point for the Optimizer Scheduler application. */
@SpringBootApplication
@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.entity")
@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository")
public class SchedulerApplication {

public static void main(String[] args) {
SpringApplication.run(SchedulerApplication.class, args);
}

/**
* Runs a single scheduling pass at startup. With {@code spring.main.web-application-type=none}
* the application is a short-lived batch process, assumed to be launched periodically by an
* external trigger rather than running as a long-lived service.
*/
@Bean
public CommandLineRunner run(SchedulerRunner runner) {
return args -> runner.schedule();
}
}
121 changes: 121 additions & 0 deletions apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerRunner.java
@@ -0,0 +1,121 @@
package com.linkedin.openhouse.scheduler;

import com.linkedin.openhouse.optimizer.entity.TableOperationRow;
import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
import com.linkedin.openhouse.scheduler.client.JobsServiceClient;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

/** Reads PENDING rows from the optimizer DB, bin-packs them, and submits one Spark job per bin. */
@Slf4j
@Component
@RequiredArgsConstructor
public class SchedulerRunner {

private final TableOperationsRepository operationsRepo;
private final TableStatsRepository statsRepo;
private final JobsServiceClient jobsClient;

@Value("${scheduler.bin-size-max-files}")
private long maxFiles;

@Value("${scheduler.operation-type}")
private String operationType;

@Value("${scheduler.results-endpoint}")
private String resultsEndpoint;

@Transactional
public void schedule() {
List<TableOperationRow> pending =
operationsRepo.find(operationType, "PENDING", null, null, null);
if (pending.isEmpty()) {
log.info("No PENDING operations of type {}; nothing to schedule", operationType);
return;
}

Set<String> uuids =
pending.stream().map(TableOperationRow::getTableUuid).collect(Collectors.toSet());

// Tables whose stats row is missing, or whose snapshot has no file count, default to 0 files so
// they remain schedulable (BinPacker treats a 0-cost table like any other).
Map<String, Long> fileCountByUuid =
statsRepo.findAllById(uuids).stream()
.collect(
Collectors.toMap(
row -> row.getTableUuid(),
row -> {
if (row.getStats() == null || row.getStats().getSnapshot() == null) {
return 0L;
}
Long count = row.getStats().getSnapshot().getNumCurrentFiles();
return count != null ? count : 0L;
}));

List<List<TableOperationRow>> bins = BinPacker.pack(pending, fileCountByUuid, maxFiles);
log.info(
"Packed {} PENDING operations into {} bins (maxFiles={})",
pending.size(),
bins.size(),
maxFiles);

bins.forEach(this::submitBin);
}

private void submitBin(List<TableOperationRow> bin) {
// 0. Cancel duplicate PENDING rows per (tableUuid, operationType) — keep only the first row.
bin.stream()
.collect(Collectors.groupingBy(TableOperationRow::getTableUuid))
.forEach(
(uuid, rows) -> {
TableOperationRow keep = rows.get(0);
operationsRepo.cancelDuplicatePending(uuid, operationType, keep.getId());
});

// 1. Claim all rows (PENDING → SCHEDULING) — prevents a second scheduler instance from
// double-submitting. Any row already claimed (returns 0) is excluded from this batch.
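// markScheduling is assumed to issue a conditional, optimistic-locking update along the lines of
// (illustrative SQL; the real query and column names live in TableOperationsRepository):
//   UPDATE table_operations SET state = 'SCHEDULING', version = version + 1, last_updated = :now
//   WHERE id = :id AND version = :expectedVersion AND state = 'PENDING'
// An update count of 1 means this instance won the claim; 0 means another instance already has it.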
List<TableOperationRow> claimed =
bin.stream()
.filter(
r -> operationsRepo.markScheduling(r.getId(), r.getVersion(), Instant.now()) == 1)
.collect(Collectors.toList());

if (claimed.isEmpty()) {
log.info("All rows in bin already claimed by another scheduler instance; skipping");
return;
}

// 2. Submit a job for the claimed rows only.
List<String> tableNames =
claimed.stream()
.map(r -> r.getDatabaseName() + "." + r.getTableName())
.collect(Collectors.toList());
List<String> opIds =
claimed.stream().map(TableOperationRow::getId).collect(Collectors.toList());

String jobName = "batched-" + operationType.toLowerCase() + "-" + Instant.now().toEpochMilli();
Optional<String> jobId =
jobsClient.launch(jobName, operationType, tableNames, opIds, resultsEndpoint);

if (jobId.isPresent()) {
// 3. Transition SCHEDULING → SCHEDULED with the jobId.
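// The version is passed as r.getVersion() + 1 because markScheduling is assumed to have bumped
// the row version by one when it claimed the row in step 1.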
for (TableOperationRow r : claimed) {
operationsRepo.markScheduled(r.getId(), r.getVersion() + 1, jobId.get());
}
log.info("Submitted job {} for {} tables", jobId.get(), claimed.size());
} else {
log.warn(
"Job submission failed for {} SCHEDULING rows; the analyzer's scheduledTimeout will"
+ " detect and overwrite stale rows",
claimed.size());
}
}
}
80 changes: 80 additions & 0 deletions apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/client/JobsServiceClient.java
@@ -0,0 +1,80 @@
package com.linkedin.openhouse.scheduler.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClient;

/**
* Client for the OpenHouse Jobs Service.
*
* <p>Submits one {@code BatchedOrphanFilesDeletionSparkApp} job per bin via {@code POST /jobs}.
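*
* <p>Shape of the request body built by {@link #launch} (values are illustrative; the numeric
* suffix of {@code jobName} is the submission timestamp in epoch milliseconds):
*
* <pre>{@code
* {
*   "jobName": "batched-orphan_files_deletion-1700000000000",
*   "clusterId": "LocalHadoopCluster",
*   "jobConf": {
*     "jobType": "ORPHAN_FILES_DELETION",
*     "args": ["--tableNames", "db1.table1,db2.table2",
*              "--operationIds", "op-id-1,op-id-2",
*              "--resultsEndpoint", "http://openhouse-optimizer:8080/v1/table-operations"]
*   }
* }
* }</pre>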
*/
@Slf4j
public class JobsServiceClient {

private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Duration TIMEOUT = Duration.ofSeconds(30);

private final WebClient webClient;
private final String clusterId;

public JobsServiceClient(WebClient webClient, String clusterId) {
this.webClient = webClient;
this.clusterId = clusterId;
}

/**
* Submit a batched Spark job for the given tables.
*
* @param jobName human-readable name for the job
* @param jobType operation type string (e.g. "ORPHAN_FILES_DELETION")
* @param tableNames fully-qualified table names (db.table)
* @param operationIds operation UUIDs — parallel to tableNames
* @param resultsEndpoint base URL the Spark app PATCHes results back to
* @return job ID if the submission succeeded, empty if an error occurred
*/
public Optional<String> launch(
String jobName,
String jobType,
List<String> tableNames,
List<String> operationIds,
String resultsEndpoint) {
try {
ObjectNode body = MAPPER.createObjectNode();
body.put("jobName", jobName);
body.put("clusterId", clusterId);

ObjectNode jobConf = body.putObject("jobConf");
jobConf.put("jobType", jobType);

ArrayNode args = jobConf.putArray("args");
args.add("--tableNames");
args.add(String.join(",", tableNames));
args.add("--operationIds");
args.add(String.join(",", operationIds));
args.add("--resultsEndpoint");
args.add(resultsEndpoint);

String responseBody =
webClient
.post()
.uri("/jobs")
.bodyValue(body)
.retrieve()
.bodyToMono(String.class)
.timeout(TIMEOUT)
.block();

String jobId = MAPPER.readTree(responseBody).path("jobId").asText(null);
return Optional.ofNullable(jobId);
} catch (Exception e) {
log.error("Failed to submit job '{}': {}", jobName, e.getMessage());
return Optional.empty();
}
}
}
27 changes: 27 additions & 0 deletions apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/config/SchedulerConfig.java
@@ -0,0 +1,27 @@
package com.linkedin.openhouse.scheduler.config;

import com.linkedin.openhouse.scheduler.client.JobsServiceClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class SchedulerConfig {

@Value("${jobs.base-uri}")
private String jobsBaseUri;

@Value("${scheduler.cluster-id}")
private String clusterId;

@Bean
public WebClient jobsWebClient() {
return WebClient.builder().baseUrl(jobsBaseUri).build();
}

@Bean
public JobsServiceClient jobsServiceClient(WebClient jobsWebClient) {
return new JobsServiceClient(jobsWebClient, clusterId);
}
}
11 changes: 11 additions & 0 deletions apps/optimizer-scheduler/src/main/resources/application.properties
@@ -0,0 +1,11 @@
spring.application.name=openhouse-optimizer-scheduler
spring.main.web-application-type=none
spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:schedulerdb;DB_CLOSE_DELAY=-1;MODE=MySQL}
spring.datasource.username=${OPTIMIZER_DB_USER:sa}
spring.datasource.password=${OPTIMIZER_DB_PASSWORD:}
spring.jpa.hibernate.ddl-auto=none
jobs.base-uri=${JOBS_BASE_URI:http://localhost:8002}
scheduler.bin-size-max-files=${SCHEDULER_BIN_SIZE_MAX_FILES:1000000}
scheduler.operation-type=${SCHEDULER_OPERATION_TYPE:ORPHAN_FILES_DELETION}
scheduler.results-endpoint=${SCHEDULER_RESULTS_ENDPOINT:http://openhouse-optimizer:8080/v1/table-operations}
scheduler.cluster-id=${SCHEDULER_CLUSTER_ID:LocalHadoopCluster}
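# The defaults above target a local setup: an in-memory H2 database in MySQL compatibility mode and
# local/in-cluster service endpoints. Real deployments are expected to override them through the
# environment variables referenced in the ${VAR:default} placeholders (assumption; actual deployment
# wiring is outside this change).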