From f7f6812639a9a478d6abe9f003f17464af1f80d0 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Mon, 6 Apr 2026 10:59:15 -0700 Subject: [PATCH 1/4] feat(optimizer): add REST service layer, controllers, and shared module Service interface and implementation for all optimizer CRUD operations including complete-operation lifecycle, stats upsert with history double-write, and filtered queries. Three REST controllers expose the endpoints. The apps/optimizer shared module provides lightweight entity/repo copies for the analyzer and scheduler apps. Co-Authored-By: Claude Opus 4.6 --- apps/optimizer/build.gradle | 13 ++ .../entity/TableOperationHistoryRow.java | 37 ++++ .../optimizer/entity/TableOperationRow.java | 55 +++++ .../optimizer/entity/TableStatsRow.java | 53 +++++ .../openhouse/optimizer/model/TableStats.java | 45 ++++ .../TableOperationHistoryRepository.java | 23 ++ .../repository/TableOperationsRepository.java | 75 +++++++ .../repository/TableStatsRepository.java | 26 +++ .../controller/TableOperationsController.java | 66 ++++++ .../TableOperationsHistoryController.java | 60 ++++++ .../api/controller/TableStatsController.java | 69 ++++++ .../api/model/CompleteOperationRequest.java | 6 - .../service/OptimizerDataService.java | 98 +++++++++ .../service/OptimizerDataServiceImpl.java | 202 ++++++++++++++++++ settings.gradle | 1 + 15 files changed, 823 insertions(+), 6 deletions(-) create mode 100644 apps/optimizer/build.gradle create mode 100644 apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationHistoryRow.java create mode 100644 apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationRow.java create mode 100644 apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java create mode 100644 apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStats.java create mode 100644 
apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationHistoryRepository.java create mode 100644 apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java create mode 100644 apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepository.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java create mode 100644 services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java diff --git a/apps/optimizer/build.gradle b/apps/optimizer/build.gradle new file mode 100644 index 000000000..f14969274 --- /dev/null +++ b/apps/optimizer/build.gradle @@ -0,0 +1,13 @@ +plugins { + id 'openhouse.java-minimal-conventions' +} + +// Avoid build-directory collision with services:optimizer (same project.name 'optimizer'). 
+buildDir = "${rootProject.buildDir}/apps-optimizer" + +dependencies { + implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' + implementation 'com.vladmihalcea:hibernate-types-55:2.21.1' + testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' + testRuntimeOnly 'com.h2database:h2' +} diff --git a/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationHistoryRow.java b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationHistoryRow.java new file mode 100644 index 000000000..4e638e2e1 --- /dev/null +++ b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationHistoryRow.java @@ -0,0 +1,37 @@ +package com.linkedin.openhouse.optimizer.entity; + +import java.time.Instant; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** Lightweight JPA entity for reading {@code table_operations_history} rows. 
*/ +@Entity +@Table(name = "table_operations_history") +@Getter +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableOperationHistoryRow { + + @Id + @Column(name = "id", nullable = false, length = 36) + private String id; + + @Column(name = "table_uuid", nullable = false, length = 36) + private String tableUuid; + + @Column(name = "operation_type", nullable = false, length = 50) + private String operationType; + + @Column(name = "submitted_at", nullable = false) + private Instant submittedAt; + + @Column(name = "status", nullable = false, length = 20) + private String status; +} diff --git a/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationRow.java b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationRow.java new file mode 100644 index 000000000..fc0104604 --- /dev/null +++ b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationRow.java @@ -0,0 +1,55 @@ +package com.linkedin.openhouse.optimizer.entity; + +import java.time.Instant; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** JPA entity mapping to the {@code table_operations} table in the optimizer DB. 
*/ +@Entity +@Table(name = "table_operations") +@Getter +@Setter +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableOperationRow { + + @Id + @Column(name = "id", nullable = false, length = 36) + private String id; + + @Column(name = "table_uuid", nullable = false, length = 36) + private String tableUuid; + + @Column(name = "database_name", nullable = false, length = 255) + private String databaseName; + + @Column(name = "table_name", nullable = false, length = 255) + private String tableName; + + @Column(name = "operation_type", nullable = false, length = 50) + private String operationType; + + @Column(name = "status", nullable = false, length = 20) + private String status; + + @Column(name = "created_at") + private Instant createdAt; + + @Column(name = "scheduled_at") + private Instant scheduledAt; + + @Column(name = "job_id", length = 255) + private String jobId; + + /** Plain version column — not managed by JPA optimistic locking. */ + @Column(name = "version") + private Long version; +} diff --git a/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java new file mode 100644 index 000000000..5cdf16a97 --- /dev/null +++ b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java @@ -0,0 +1,53 @@ +package com.linkedin.openhouse.optimizer.entity; + +import com.linkedin.openhouse.optimizer.model.TableStats; +import com.vladmihalcea.hibernate.type.json.JsonStringType; +import java.time.Instant; +import java.util.Map; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.hibernate.annotations.Type; +import org.hibernate.annotations.TypeDef; + +/** + * JPA entity for the 
optimizer {@code table_stats} table. Written by the Tables Service on every + * Iceberg commit; read by the Analyzer and Scheduler directly via JPA. + */ +@TypeDef(name = "json", typeClass = JsonStringType.class) +@Entity +@Table(name = "table_stats") +@Getter +@Setter +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableStatsRow { + + @Id + @Column(name = "table_uuid", nullable = false, length = 36) + private String tableUuid; + + @Column(name = "database_id", nullable = false, length = 255) + private String databaseId; + + @Column(name = "table_name", nullable = false, length = 255) + private String tableName; + + @Type(type = "json") + @Column(name = "stats", columnDefinition = "TEXT") + private TableStats stats; + + @Type(type = "json") + @Column(name = "table_properties", columnDefinition = "TEXT") + private Map tableProperties; + + @Column(name = "updated_at", nullable = false) + private Instant updatedAt; +} diff --git a/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStats.java b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStats.java new file mode 100644 index 000000000..5e0f51468 --- /dev/null +++ b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStats.java @@ -0,0 +1,45 @@ +package com.linkedin.openhouse.optimizer.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Combined stats payload stored as a single JSON blob per table in {@code table_stats}. */ +@Data +@Builder(toBuilder = true) +@NoArgsConstructor +@AllArgsConstructor +public class TableStats { + + /** Snapshot fields — overwritten on every upsert. */ + private SnapshotMetrics snapshot; + + /** Delta fields — accumulated across commit events. */ + private CommitDelta delta; + + /** Point-in-time metadata read from Iceberg at scan time. 
*/ + @Data + @Builder(toBuilder = true) + @NoArgsConstructor + @AllArgsConstructor + public static class SnapshotMetrics { + private String clusterId; + private String tableVersion; + private String tableLocation; + private Long tableSizeBytes; + /** Total number of data files as of the latest snapshot — used for bin-packing. */ + private Long numCurrentFiles; + } + + /** Per-commit incremental counters accumulated across all recorded commit events. */ + @Data + @Builder(toBuilder = true) + @NoArgsConstructor + @AllArgsConstructor + public static class CommitDelta { + private Long numFilesAdded; + private Long numFilesDeleted; + private Long deletedSizeBytes; + } +} diff --git a/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationHistoryRepository.java b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationHistoryRepository.java new file mode 100644 index 000000000..a9434b4b7 --- /dev/null +++ b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationHistoryRepository.java @@ -0,0 +1,23 @@ +package com.linkedin.openhouse.optimizer.repository; + +import com.linkedin.openhouse.optimizer.entity.TableOperationHistoryRow; +import java.util.List; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +/** Repository for reading {@code table_operations_history} in the Analyzer. */ +public interface TableOperationHistoryRepository + extends JpaRepository<TableOperationHistoryRow, String> { + + /** + * Returns all history rows for an operation type, newest first. Loaded once per analysis run and + * grouped in memory by {@code tableUuid} to eliminate per-table N+1 queries in the circuit + * breaker check. 
+ */ + @Query( + "SELECT r FROM TableOperationHistoryRow r " + + "WHERE r.operationType = :opType " + + "ORDER BY r.submittedAt DESC") + List<TableOperationHistoryRow> findAllByOperationType(@Param("opType") String operationType); +} diff --git a/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java new file mode 100644 index 000000000..404aaf873 --- /dev/null +++ b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java @@ -0,0 +1,75 @@ +package com.linkedin.openhouse.optimizer.repository; + +import com.linkedin.openhouse.optimizer.entity.TableOperationRow; +import java.time.Instant; +import java.util.Collection; +import java.util.List; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +/** Spring Data JPA repository for {@code table_operations} rows in the optimizer DB. */ +public interface TableOperationsRepository extends JpaRepository<TableOperationRow, String> { + + /** + * Returns rows for the given operation type whose status is in {@code statuses}. Used by the + * Scheduler to load all PENDING rows in one query. + */ + @Query( + "SELECT r FROM TableOperationRow r WHERE r.operationType = :type" + + " AND r.status IN :statuses") + List<TableOperationRow> findByTypeAndStatuses( + @Param("type") String operationType, @Param("statuses") Collection<String> statuses); + + /** + * Returns all rows for the given operation type regardless of status. Used by the Analyzer to + * find the most recent row per table_uuid for scheduling decisions. 
+ */ + @Query("SELECT r FROM TableOperationRow r WHERE r.operationType = :type") + List<TableOperationRow> findByType(@Param("type") String operationType); + + /** + * Cancel older duplicate PENDING rows for the same (table_uuid, operation_type), keeping only the + * row identified by {@code keepId}. Called by the Scheduler before claiming to prevent duplicate + * job submissions from concurrent Analyzer runs. + * + * @return the number of rows marked CANCELED + */ + @Modifying(flushAutomatically = true, clearAutomatically = true) + @Query( + "UPDATE TableOperationRow r SET r.status = 'CANCELED' " + + "WHERE r.tableUuid = :tableUuid AND r.operationType = :opType " + + "AND r.status = 'PENDING' AND r.id != :keepId") + int cancelDuplicatePending( + @Param("tableUuid") String tableUuid, + @Param("opType") String operationType, + @Param("keepId") String keepId); + + /** + * Atomically claim a PENDING row by flipping its status to SCHEDULING. + * + *
<p>The {@code version} guard prevents double-scheduling when multiple scheduler instances run + * concurrently; the {@code status} guard rejects rows canceled without a version bump. Returns + * 1 if the claim succeeded, 0 if the row was already claimed or is no longer PENDING. + */ + @Modifying(flushAutomatically = true, clearAutomatically = true) + @Query( + "UPDATE TableOperationRow r SET r.status = 'SCHEDULING', r.scheduledAt = :now," + + " r.version = r.version + 1 WHERE r.id = :id AND r.version = :version AND r.status = 'PENDING'") + int markScheduling( + @Param("id") String id, @Param("version") Long version, @Param("now") Instant now); + + /** + * Transition a SCHEDULING row to SCHEDULED after the Jobs Service returns a job ID. + * + * @return 1 if updated, 0 if not found or wrong version/status + */ + @Modifying(flushAutomatically = true, clearAutomatically = true) + @Query( + "UPDATE TableOperationRow r SET r.status = 'SCHEDULED', r.jobId = :jobId," + + " r.version = r.version + 1" + + " WHERE r.id = :id AND r.version = :version AND r.status = 'SCHEDULING'") + int markScheduled( + @Param("id") String id, @Param("version") Long version, @Param("jobId") String jobId); +} diff --git a/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepository.java b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepository.java new file mode 100644 index 000000000..3c0ef40b8 --- /dev/null +++ b/apps/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepository.java @@ -0,0 +1,26 @@ +package com.linkedin.openhouse.optimizer.repository; + +import com.linkedin.openhouse.optimizer.entity.TableStatsRow; +import java.util.stream.Stream; +import javax.persistence.QueryHint; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.jpa.repository.QueryHints; + +/** Spring Data JPA repository for {@code table_stats} rows in the optimizer DB. 
*/ +public interface TableStatsRepository extends JpaRepository<TableStatsRow, String> { + + /** + * Streams all rows as a JDBC cursor rather than buffering them in memory. The caller must consume + * the stream inside an active {@code @Transactional} method and close it when done. + * + *
<p>{@code Integer.MIN_VALUE} is MySQL Connector/J's signal to enable row-by-row streaming + * instead of loading the full result set into the driver buffer. + */ + @Query("SELECT r FROM TableStatsRow r") + @QueryHints( + @QueryHint( + name = org.hibernate.jpa.QueryHints.HINT_FETCH_SIZE, + value = "" + Integer.MIN_VALUE)) + Stream<TableStatsRow> streamAll(); +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java new file mode 100644 index 000000000..d8ba13b11 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java @@ -0,0 +1,66 @@ +package com.linkedin.openhouse.optimizer.api.controller; + +import com.linkedin.openhouse.optimizer.api.model.CompleteOperationRequest; +import com.linkedin.openhouse.optimizer.api.model.OperationStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationType; +import com.linkedin.openhouse.optimizer.api.model.TableOperationsDto; +import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto; +import com.linkedin.openhouse.optimizer.service.OptimizerDataService; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** REST controller for {@code table_operations}. 
*/ +@RestController +@RequestMapping("/v1/table-operations") +@RequiredArgsConstructor +public class TableOperationsController { + + private final OptimizerDataService service; + + /** + * Report that an operation has completed. The backend looks up the operation row, writes a + * history entry with the operation's table metadata and the supplied result. Returns 201 Created + * with the history row, or 404 if the operation does not exist. + */ + @PostMapping("/{id}/complete") + public ResponseEntity<TableOperationsHistoryDto> completeOperation( + @PathVariable String id, @RequestBody CompleteOperationRequest request) { + return service + .completeOperation(id, request) + .map(dto -> ResponseEntity.status(HttpStatus.CREATED).body(dto)) + .orElse(ResponseEntity.notFound().build()); + } + + /** Fetch a single operation row by its ID, regardless of status. Returns 404 if not found. */ + @GetMapping("/{id}") + public ResponseEntity<TableOperationsDto> getTableOperation(@PathVariable String id) { + return service + .getTableOperation(id) + .map(ResponseEntity::ok) + .orElse(ResponseEntity.notFound().build()); + } + + /** + * List operations matching the given filters. All parameters are optional — omit all to return + * every row. 
+ */ + @GetMapping + public ResponseEntity<List<TableOperationsDto>> listTableOperations( + @RequestParam(required = false) OperationType operationType, + @RequestParam(required = false) OperationStatus status, + @RequestParam(required = false) String databaseName, + @RequestParam(required = false) String tableName, + @RequestParam(required = false) String tableUuid) { + return ResponseEntity.ok( + service.listTableOperations(operationType, status, databaseName, tableName, tableUuid)); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java new file mode 100644 index 000000000..11c77a15d --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java @@ -0,0 +1,60 @@ +package com.linkedin.openhouse.optimizer.api.controller; + +import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationType; +import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto; +import com.linkedin.openhouse.optimizer.service.OptimizerDataService; +import java.time.Instant; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** REST controller for {@code table_operations_history}. 
*/ +@RestController +@RequestMapping("/v1/table-operations-history") +@RequiredArgsConstructor +public class TableOperationsHistoryController { + + private final OptimizerDataService service; + + /** Append a completed-job result. Called by the SparkJob after each run (success or failure). */ + @PostMapping + public ResponseEntity<TableOperationsHistoryDto> appendHistory( + @RequestBody TableOperationsHistoryDto dto) { + return ResponseEntity.status(HttpStatus.CREATED).body(service.appendHistory(dto)); + } + + /** Return the most recent history for a table, newest first, up to {@code limit} rows. */ + @GetMapping("/{tableUuid}") + public ResponseEntity<List<TableOperationsHistoryDto>> getHistory( + @PathVariable String tableUuid, @RequestParam(defaultValue = "100") int limit) { + return ResponseEntity.ok(service.getHistory(tableUuid, limit)); + } + + /** + * List history rows matching the given filters, ordered newest first. All parameters are optional + * — omit all to return every row up to {@code limit}. + */ + @GetMapping + public ResponseEntity<List<TableOperationsHistoryDto>> listHistory( + @RequestParam(required = false) String databaseName, + @RequestParam(required = false) String tableName, + @RequestParam(required = false) String tableUuid, + @RequestParam(required = false) OperationType operationType, + @RequestParam(required = false) OperationHistoryStatus status, + @RequestParam(required = false) Instant since, + @RequestParam(required = false) Instant until, + @RequestParam(defaultValue = "100") int limit) { + return ResponseEntity.ok( + service.listHistory( + databaseName, tableName, tableUuid, operationType, status, since, until, limit)); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java new file mode 100644 index 000000000..d469586a2 --- /dev/null +++ 
b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java @@ -0,0 +1,69 @@ +package com.linkedin.openhouse.optimizer.api.controller; + +import com.linkedin.openhouse.optimizer.api.model.TableStatsDto; +import com.linkedin.openhouse.optimizer.api.model.TableStatsHistoryDto; +import com.linkedin.openhouse.optimizer.api.model.UpsertTableStatsRequest; +import com.linkedin.openhouse.optimizer.service.OptimizerDataService; +import java.time.Instant; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** REST controller for managing per-table stats in the optimizer DB. */ +@RestController +@RequestMapping("/v1/table-stats") +@RequiredArgsConstructor +public class TableStatsController { + + private final OptimizerDataService service; + + /** + * Create or overwrite the stats row for {@code tableUuid}. Called by the Tables Service on every + * Iceberg commit. Idempotent. + */ + @PutMapping("/{tableUuid}") + public ResponseEntity<TableStatsDto> upsertTableStats( + @PathVariable String tableUuid, @RequestBody UpsertTableStatsRequest request) { + return ResponseEntity.ok(service.upsertTableStats(tableUuid, request)); + } + + /** Fetch the stats row for {@code tableUuid}. Returns 404 if no stats have been written yet. 
*/ + @GetMapping("/{tableUuid}") + public ResponseEntity<TableStatsDto> getTableStats(@PathVariable String tableUuid) { + return service + .getTableStats(tableUuid) + .map(ResponseEntity::ok) + .orElse(ResponseEntity.notFound().build()); + } + + /** + * List stats rows matching the given filters. All parameters are optional — omit all to return + * every row. + */ + @GetMapping + public ResponseEntity<List<TableStatsDto>> listTableStats( + @RequestParam(required = false) String databaseId, + @RequestParam(required = false) String tableName, + @RequestParam(required = false) String tableUuid) { + return ResponseEntity.ok(service.listTableStats(databaseId, tableName, tableUuid)); + } + + /** + * Return per-commit stats history for {@code tableUuid}, newest first. Optionally filter by + * {@code since} (inclusive) and cap at {@code limit} rows. + */ + @GetMapping("/{tableUuid}/history") + public ResponseEntity<List<TableStatsHistoryDto>> getStatsHistory( + @PathVariable String tableUuid, + @RequestParam(required = false) Instant since, + @RequestParam(defaultValue = "100") int limit) { + return ResponseEntity.ok(service.getStatsHistory(tableUuid, since, limit)); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java index c26893197..35f7ba782 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java @@ -22,10 +22,4 @@ public class CompleteOperationRequest { /** Error details on failure; {@code null} on success. */ private JobResult result; - - /** Number of orphan files deleted; set by OFD Spark app on success. */ - private Integer orphanFilesDeleted; - - /** Bytes reclaimed by orphan file deletion; set by OFD Spark app on success. 
*/ - private Long orphanBytesDeleted; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java new file mode 100644 index 000000000..ce3120400 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java @@ -0,0 +1,98 @@ +package com.linkedin.openhouse.optimizer.service; + +import com.linkedin.openhouse.optimizer.api.model.CompleteOperationRequest; +import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationType; +import com.linkedin.openhouse.optimizer.api.model.TableOperationsDto; +import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto; +import com.linkedin.openhouse.optimizer.api.model.TableStatsDto; +import com.linkedin.openhouse.optimizer.api.model.TableStatsHistoryDto; +import com.linkedin.openhouse.optimizer.api.model.UpsertTableStatsRequest; +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +/** Service interface for optimizer data operations. */ +public interface OptimizerDataService { + + // --- TableOperations --- + + /** + * List operations matching the given filters. Every parameter is optional — pass {@code null} to + * skip that filter. No filters returns all rows. + */ + List<TableOperationsDto> listTableOperations( + OperationType operationType, + OperationStatus status, + String databaseName, + String tableName, + String tableUuid); + + /** + * Complete an operation by writing a history entry. Looks up the operation row by {@code id}, + * copies its table metadata into a new history row, and saves it. Returns the history DTO, or + * empty if the operation does not exist. 
+ */ + Optional<TableOperationsHistoryDto> completeOperation( + String id, CompleteOperationRequest request); + + /** + * Return the operation row for {@code id} regardless of status, or empty if it does not exist. + * Used to poll a specific operation (e.g. waiting for SUCCESS after a Spark job completes). + */ + Optional<TableOperationsDto> getTableOperation(String id); + + // --- TableStats --- + + /** + * Create or update the stats row for {@code tableUuid}. Fully idempotent: the same call + * overwrites the previous snapshot with the latest commit values. + */ + TableStatsDto upsertTableStats(String tableUuid, UpsertTableStatsRequest request); + + /** Return the stats row for {@code tableUuid}, or empty if none exists. */ + Optional<TableStatsDto> getTableStats(String tableUuid); + + /** + * List stats rows matching the given filters. Every parameter is optional — pass {@code null} to + * skip that filter. No filters returns all rows. + */ + List<TableStatsDto> listTableStats(String databaseId, String tableName, String tableUuid); + + /** + * Return per-commit stats history for {@code tableUuid}, newest first. + * + * @param tableUuid the stable table UUID + * @param since if non-null, only return rows recorded at or after this instant + * @param limit maximum number of rows to return + */ + List<TableStatsHistoryDto> getStatsHistory(String tableUuid, Instant since, int limit); + + // --- TableOperationsHistory --- + + /** Append a completed-job result record. */ + TableOperationsHistoryDto appendHistory(TableOperationsHistoryDto dto); + + /** + * Return the most recent history rows for a table UUID, newest first. + * + * @param tableUuid the stable table UUID + * @param limit maximum number of rows to return + */ + List<TableOperationsHistoryDto> getHistory(String tableUuid, int limit); + + /** + * List history rows matching the given filters, ordered newest first. Every parameter is optional + * — pass {@code null} to skip that filter. No filters returns all rows up to {@code limit}. 
+ */ + List<TableOperationsHistoryDto> listHistory( + String databaseName, + String tableName, + String tableUuid, + OperationType operationType, + OperationHistoryStatus status, + Instant since, + Instant until, + int limit); +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java new file mode 100644 index 000000000..dbc5f466b --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java @@ -0,0 +1,202 @@ +package com.linkedin.openhouse.optimizer.service; + +import com.linkedin.openhouse.optimizer.api.mapper.OptimizerMapper; +import com.linkedin.openhouse.optimizer.api.model.CompleteOperationRequest; +import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationType; +import com.linkedin.openhouse.optimizer.api.model.TableOperationsDto; +import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto; +import com.linkedin.openhouse.optimizer.api.model.TableStatsDto; +import com.linkedin.openhouse.optimizer.api.model.TableStatsHistoryDto; +import com.linkedin.openhouse.optimizer.api.model.UpsertTableStatsRequest; +import com.linkedin.openhouse.optimizer.entity.TableOperationsHistoryRow; +import com.linkedin.openhouse.optimizer.entity.TableStatsHistoryRow; +import com.linkedin.openhouse.optimizer.entity.TableStatsRow; +import com.linkedin.openhouse.optimizer.repository.TableOperationsHistoryRepository; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsHistoryRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import java.time.Instant; +import java.util.List; +import java.util.Optional; 
+import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.springframework.data.domain.PageRequest; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +/** Implementation of {@link OptimizerDataService}. */ +@Service +@RequiredArgsConstructor +public class OptimizerDataServiceImpl implements OptimizerDataService { + + private final TableOperationsRepository operationsRepository; + private final TableOperationsHistoryRepository historyRepository; + private final TableStatsRepository statsRepository; + private final TableStatsHistoryRepository statsHistoryRepository; + private final OptimizerMapper mapper; + + // --- TableOperations --- + + @Override + public List listTableOperations( + OperationType operationType, + OperationStatus status, + String databaseName, + String tableName, + String tableUuid) { + return operationsRepository + .findFiltered(operationType, status, databaseName, tableName, tableUuid).stream() + .map(mapper::toDto) + .collect(Collectors.toList()); + } + + @Override + @Transactional + public Optional completeOperation( + String id, CompleteOperationRequest request) { + return operationsRepository + .findById(id) + .map( + row -> { + TableOperationsHistoryRow historyRow = + TableOperationsHistoryRow.builder() + .id(row.getId()) + .tableUuid(row.getTableUuid()) + .databaseName(row.getDatabaseName()) + .tableName(row.getTableName()) + .operationType(row.getOperationType()) + .submittedAt(Instant.now()) + .status(request.getStatus()) + .jobId(row.getJobId()) + .result(request.getResult()) + .build(); + return mapper.toDto(historyRepository.save(historyRow)); + }); + } + + @Override + public Optional getTableOperation(String id) { + return operationsRepository.findById(id).map(mapper::toDto); + } + + // --- TableStats --- + + @Override + @Transactional + public TableStatsDto upsertTableStats(String tableUuid, UpsertTableStatsRequest request) { + Instant now 
= Instant.now(); + TableStatsRow row = + statsRepository + .findById(tableUuid) + .map( + existing -> + existing + .toBuilder() + .databaseId(request.getDatabaseId()) + .tableName(request.getTableName()) + .stats(request.getStats()) + .tableProperties(request.getTableProperties()) + .updatedAt(now) + .build()) + .orElse( + TableStatsRow.builder() + .tableUuid(tableUuid) + .databaseId(request.getDatabaseId()) + .tableName(request.getTableName()) + .stats(request.getStats()) + .tableProperties(request.getTableProperties()) + .updatedAt(now) + .build()); + TableStatsDto saved = mapper.toDto(statsRepository.save(row)); + + statsHistoryRepository.save( + TableStatsHistoryRow.builder() + .tableUuid(tableUuid) + .databaseId(request.getDatabaseId()) + .tableName(request.getTableName()) + .stats(request.getStats()) + .recordedAt(now) + .build()); + + return saved; + } + + @Override + public Optional getTableStats(String tableUuid) { + return statsRepository.findById(tableUuid).map(mapper::toDto); + } + + @Override + public List listTableStats(String databaseId, String tableName, String tableUuid) { + return statsRepository.findFiltered(databaseId, tableName, tableUuid).stream() + .map(mapper::toDto) + .collect(Collectors.toList()); + } + + @Override + public List getStatsHistory(String tableUuid, Instant since, int limit) { + PageRequest page = PageRequest.of(0, limit); + if (since != null) { + return statsHistoryRepository.findByTableUuidSince(tableUuid, since, page).stream() + .map(mapper::toDto) + .collect(Collectors.toList()); + } + return statsHistoryRepository.findByTableUuid(tableUuid, page).stream() + .map(mapper::toDto) + .collect(Collectors.toList()); + } + + // --- TableOperationsHistory --- + + @Override + @Transactional + public TableOperationsHistoryDto appendHistory(TableOperationsHistoryDto dto) { + TableOperationsHistoryRow row = + TableOperationsHistoryRow.builder() + .id(dto.getId()) + .tableUuid(dto.getTableUuid()) + .databaseName(dto.getDatabaseName()) 
+ .tableName(dto.getTableName()) + .operationType(dto.getOperationType()) + .submittedAt(dto.getSubmittedAt() != null ? dto.getSubmittedAt() : Instant.now()) + .status(dto.getStatus()) + .jobId(dto.getJobId()) + .result(dto.getResult()) + .build(); + return mapper.toDto(historyRepository.save(row)); + } + + @Override + public List getHistory(String tableUuid, int limit) { + return historyRepository.find(tableUuid, limit).stream() + .map(mapper::toDto) + .collect(Collectors.toList()); + } + + @Override + public List listHistory( + String databaseName, + String tableName, + String tableUuid, + OperationType operationType, + OperationHistoryStatus status, + Instant since, + Instant until, + int limit) { + return historyRepository + .findFiltered( + databaseName, + tableName, + tableUuid, + operationType, + status, + since, + until, + PageRequest.of(0, limit)) + .stream() + .map(mapper::toDto) + .collect(Collectors.toList()); + } +} diff --git a/settings.gradle b/settings.gradle index cad06785e..0d64dad53 100644 --- a/settings.gradle +++ b/settings.gradle @@ -50,6 +50,7 @@ include ':services:common' include ':services:housetables' include ':services:jobs' include ':services:optimizer' +include ':apps:optimizer' include ':services:tables' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5' From ef3260f9303a692f218f9d72985c42da421da5d3 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Mon, 6 Apr 2026 11:37:07 -0700 Subject: [PATCH 2/4] fix: update service impl to use consolidated find methods Align OptimizerDataServiceImpl with renamed repository methods from optimizer-1 review feedback. 
Co-Authored-By: Claude Opus 4.6 --- .../service/OptimizerDataServiceImpl.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java index dbc5f466b..629853156 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java @@ -46,8 +46,8 @@ public List listTableOperations( String databaseName, String tableName, String tableUuid) { - return operationsRepository - .findFiltered(operationType, status, databaseName, tableName, tableUuid).stream() + return operationsRepository.find(operationType, status, databaseName, tableName, tableUuid) + .stream() .map(mapper::toDto) .collect(Collectors.toList()); } @@ -130,20 +130,14 @@ public Optional getTableStats(String tableUuid) { @Override public List listTableStats(String databaseId, String tableName, String tableUuid) { - return statsRepository.findFiltered(databaseId, tableName, tableUuid).stream() + return statsRepository.find(databaseId, tableName, tableUuid).stream() .map(mapper::toDto) .collect(Collectors.toList()); } @Override public List getStatsHistory(String tableUuid, Instant since, int limit) { - PageRequest page = PageRequest.of(0, limit); - if (since != null) { - return statsHistoryRepository.findByTableUuidSince(tableUuid, since, page).stream() - .map(mapper::toDto) - .collect(Collectors.toList()); - } - return statsHistoryRepository.findByTableUuid(tableUuid, page).stream() + return statsHistoryRepository.find(tableUuid, since, PageRequest.of(0, limit)).stream() .map(mapper::toDto) .collect(Collectors.toList()); } @@ -170,7 +164,8 @@ public TableOperationsHistoryDto appendHistory(TableOperationsHistoryDto dto) 
{ @Override public List getHistory(String tableUuid, int limit) { - return historyRepository.find(tableUuid, limit).stream() + return historyRepository + .find(null, null, tableUuid, null, null, null, null, PageRequest.of(0, limit)).stream() .map(mapper::toDto) .collect(Collectors.toList()); } @@ -186,7 +181,7 @@ public List listHistory( Instant until, int limit) { return historyRepository - .findFiltered( + .find( databaseName, tableName, tableUuid, From 01466c70cd4f7ad4f56db31897e23f681512a31a Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Mon, 6 Apr 2026 12:34:29 -0700 Subject: [PATCH 3/4] feat(optimizer): add service-layer integration tests H2 integration tests for OptimizerDataServiceImpl covering completeOperation (write history, not-found) and upsertTableStats (create, update, history append). Co-Authored-By: Claude Opus 4.6 --- .../service/OptimizerDataServiceImplTest.java | 159 ++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java new file mode 100644 index 000000000..6e3194018 --- /dev/null +++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java @@ -0,0 +1,159 @@ +package com.linkedin.openhouse.optimizer.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.optimizer.api.model.CompleteOperationRequest; +import com.linkedin.openhouse.optimizer.api.model.JobResult; +import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationStatus; +import com.linkedin.openhouse.optimizer.api.model.OperationType; +import 
com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto; +import com.linkedin.openhouse.optimizer.api.model.TableStats; +import com.linkedin.openhouse.optimizer.api.model.TableStatsDto; +import com.linkedin.openhouse.optimizer.api.model.UpsertTableStatsRequest; +import com.linkedin.openhouse.optimizer.entity.TableOperationsRow; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsHistoryRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.domain.PageRequest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.transaction.annotation.Transactional; + +@SpringBootTest +@ActiveProfiles("test") +@Transactional +class OptimizerDataServiceImplTest { + + @Autowired OptimizerDataService service; + @Autowired TableOperationsRepository operationsRepository; + @Autowired TableStatsRepository statsRepository; + @Autowired TableStatsHistoryRepository statsHistoryRepository; + + // --- completeOperation --- + + @Test + void completeOperation_writesHistoryFromOperationRow() { + String id = UUID.randomUUID().toString(); + String tableUuid = UUID.randomUUID().toString(); + operationsRepository.save( + TableOperationsRow.builder() + .id(id) + .tableUuid(tableUuid) + .databaseName("db1") + .tableName("tbl1") + .operationType(OperationType.ORPHAN_FILES_DELETION) + .status(OperationStatus.SCHEDULED) + .createdAt(Instant.now()) + .scheduledAt(Instant.now()) + .jobId("spark-job-123") + .build()); + + Optional result = + service.completeOperation( + id, 
CompleteOperationRequest.builder().status(OperationHistoryStatus.SUCCESS).build()); + + assertThat(result).isPresent(); + assertThat(result.get().getStatus()).isEqualTo(OperationHistoryStatus.SUCCESS); + assertThat(result.get().getTableUuid()).isEqualTo(tableUuid); + assertThat(result.get().getJobId()).isEqualTo("spark-job-123"); + assertThat(result.get().getOperationType()).isEqualTo(OperationType.ORPHAN_FILES_DELETION); + assertThat(result.get().getDatabaseName()).isEqualTo("db1"); + assertThat(result.get().getSubmittedAt()).isNotNull(); + } + + @Test + void completeOperation_notFound_returnsEmpty() { + Optional result = + service.completeOperation( + UUID.randomUUID().toString(), + CompleteOperationRequest.builder() + .status(OperationHistoryStatus.FAILED) + .result( + JobResult.builder().errorMessage("boom").errorType("RuntimeException").build()) + .build()); + + assertThat(result).isEmpty(); + } + + // --- upsertTableStats --- + + @Test + void upsertTableStats_createsNewRow() { + String tableUuid = UUID.randomUUID().toString(); + TableStats stats = + TableStats.builder() + .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(1024L).build()) + .build(); + + TableStatsDto dto = + service.upsertTableStats( + tableUuid, + UpsertTableStatsRequest.builder() + .databaseId("db1") + .tableName("tbl1") + .stats(stats) + .tableProperties(Map.of("maintenance.optimizer.ofd.enabled", "true")) + .build()); + + assertThat(dto.getTableUuid()).isEqualTo(tableUuid); + assertThat(dto.getDatabaseId()).isEqualTo("db1"); + assertThat(dto.getStats().getSnapshot().getTableSizeBytes()).isEqualTo(1024L); + assertThat(dto.getTableProperties()).containsEntry("maintenance.optimizer.ofd.enabled", "true"); + assertThat(statsRepository.findById(tableUuid)).isPresent(); + } + + @Test + void upsertTableStats_updatesExistingRow() { + String tableUuid = UUID.randomUUID().toString(); + UpsertTableStatsRequest first = + UpsertTableStatsRequest.builder() + .databaseId("db1") + 
.tableName("tbl1") + .stats( + TableStats.builder() + .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(100L).build()) + .build()) + .build(); + UpsertTableStatsRequest second = + UpsertTableStatsRequest.builder() + .databaseId("db1") + .tableName("tbl1") + .stats( + TableStats.builder() + .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(200L).build()) + .build()) + .build(); + + service.upsertTableStats(tableUuid, first); + TableStatsDto dto = service.upsertTableStats(tableUuid, second); + + assertThat(dto.getStats().getSnapshot().getTableSizeBytes()).isEqualTo(200L); + assertThat(statsRepository.findAll()).hasSize(1); + } + + @Test + void upsertTableStats_appendsHistoryOnEveryCall() { + String tableUuid = UUID.randomUUID().toString(); + UpsertTableStatsRequest request = + UpsertTableStatsRequest.builder() + .databaseId("db1") + .tableName("tbl1") + .stats( + TableStats.builder() + .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(100L).build()) + .build()) + .build(); + + service.upsertTableStats(tableUuid, request); + service.upsertTableStats(tableUuid, request); + + assertThat(statsHistoryRepository.find(tableUuid, null, PageRequest.of(0, 100))).hasSize(2); + } +} From ff07fde3cbfc8dd0cb2c2fde49748dc84ee6734c Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Mon, 6 Apr 2026 12:43:44 -0700 Subject: [PATCH 4/4] fix: assert stats history delta values in upsert test Strengthen upsertTableStats test to verify history rows contain the raw delta stats from each call, not just the row count. 
Co-Authored-By: Claude Opus 4.6 --- .../service/OptimizerDataServiceImplTest.java | 69 +++++++++---------- 1 file changed, 33 insertions(+), 36 deletions(-) diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java index 6e3194018..244acb204 100644 --- a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java +++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java @@ -12,10 +12,12 @@ import com.linkedin.openhouse.optimizer.api.model.TableStatsDto; import com.linkedin.openhouse.optimizer.api.model.UpsertTableStatsRequest; import com.linkedin.openhouse.optimizer.entity.TableOperationsRow; +import com.linkedin.openhouse.optimizer.entity.TableStatsHistoryRow; import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; import com.linkedin.openhouse.optimizer.repository.TableStatsHistoryRepository; import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -110,50 +112,45 @@ void upsertTableStats_createsNewRow() { } @Test - void upsertTableStats_updatesExistingRow() { + void upsertTableStats_updatesExistingRow_andAppendsHistory() { String tableUuid = UUID.randomUUID().toString(); - UpsertTableStatsRequest first = - UpsertTableStatsRequest.builder() - .databaseId("db1") - .tableName("tbl1") - .stats( - TableStats.builder() - .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(100L).build()) - .build()) + TableStats firstStats = + TableStats.builder() + .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(100L).build()) + .delta(TableStats.CommitDelta.builder().numFilesAdded(5L).numFilesDeleted(1L).build()) .build(); - 
UpsertTableStatsRequest second = - UpsertTableStatsRequest.builder() - .databaseId("db1") - .tableName("tbl1") - .stats( - TableStats.builder() - .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(200L).build()) - .build()) + TableStats secondStats = + TableStats.builder() + .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(200L).build()) + .delta(TableStats.CommitDelta.builder().numFilesAdded(3L).numFilesDeleted(0L).build()) .build(); - service.upsertTableStats(tableUuid, first); - TableStatsDto dto = service.upsertTableStats(tableUuid, second); - - assertThat(dto.getStats().getSnapshot().getTableSizeBytes()).isEqualTo(200L); - assertThat(statsRepository.findAll()).hasSize(1); - } - - @Test - void upsertTableStats_appendsHistoryOnEveryCall() { - String tableUuid = UUID.randomUUID().toString(); - UpsertTableStatsRequest request = + service.upsertTableStats( + tableUuid, UpsertTableStatsRequest.builder() .databaseId("db1") .tableName("tbl1") - .stats( - TableStats.builder() - .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(100L).build()) - .build()) - .build(); + .stats(firstStats) + .build()); + TableStatsDto dto = + service.upsertTableStats( + tableUuid, + UpsertTableStatsRequest.builder() + .databaseId("db1") + .tableName("tbl1") + .stats(secondStats) + .build()); - service.upsertTableStats(tableUuid, request); - service.upsertTableStats(tableUuid, request); + // Current row reflects the latest upsert + assertThat(dto.getStats().getSnapshot().getTableSizeBytes()).isEqualTo(200L); + assertThat(statsRepository.findAll()).hasSize(1); - assertThat(statsHistoryRepository.find(tableUuid, null, PageRequest.of(0, 100))).hasSize(2); + // History has one row per upsert with the raw delta from each call + List history = + statsHistoryRepository.find(tableUuid, null, PageRequest.of(0, 100)); + assertThat(history).hasSize(2); + // Newest first + assertThat(history.get(0).getStats().getDelta().getNumFilesAdded()).isEqualTo(3L); + 
assertThat(history.get(1).getStats().getDelta().getNumFilesAdded()).isEqualTo(5L); } }