diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java
new file mode 100644
index 000000000..d8ba13b11
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java
@@ -0,0 +1,66 @@
+package com.linkedin.openhouse.optimizer.api.controller;
+
+import com.linkedin.openhouse.optimizer.api.model.CompleteOperationRequest;
+import com.linkedin.openhouse.optimizer.api.model.OperationStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationType;
+import com.linkedin.openhouse.optimizer.api.model.TableOperationsDto;
+import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto;
+import com.linkedin.openhouse.optimizer.service.OptimizerDataService;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/** REST controller for {@code table_operations}. */
+@RestController
+@RequestMapping("/v1/table-operations")
+@RequiredArgsConstructor
+public class TableOperationsController {
+
+  private final OptimizerDataService service;
+
+  /**
+   * Report that an operation has completed. The backend looks up the operation row, writes a
+   * history entry with the operation's table metadata and the supplied result. Returns 201 Created
+   * with the history row, or 404 if the operation does not exist.
+   */
+  @PostMapping("/{id}/complete")
+  public ResponseEntity<TableOperationsHistoryDto> completeOperation(
+      @PathVariable String id, @RequestBody CompleteOperationRequest request) {
+    return service
+        .completeOperation(id, request)
+        .map(dto -> ResponseEntity.status(HttpStatus.CREATED).body(dto))
+        .orElse(ResponseEntity.notFound().build());
+  }
+
+  /** Fetch a single operation row by its ID, regardless of status. Returns 404 if not found. */
+  @GetMapping("/{id}")
+  public ResponseEntity<TableOperationsDto> getTableOperation(@PathVariable String id) {
+    return service
+        .getTableOperation(id)
+        .map(ResponseEntity::ok)
+        .orElse(ResponseEntity.notFound().build());
+  }
+
+  /**
+   * List operations matching the given filters. All parameters are optional — omit all to return
+   * every row.
+   */
+  @GetMapping
+  public ResponseEntity<List<TableOperationsDto>> listTableOperations(
+      @RequestParam(required = false) OperationType operationType,
+      @RequestParam(required = false) OperationStatus status,
+      @RequestParam(required = false) String databaseName,
+      @RequestParam(required = false) String tableName,
+      @RequestParam(required = false) String tableUuid) {
+    return ResponseEntity.ok(
+        service.listTableOperations(operationType, status, databaseName, tableName, tableUuid));
+  }
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java
new file mode 100644
index 000000000..11c77a15d
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java
@@ -0,0 +1,60 @@
+package com.linkedin.openhouse.optimizer.api.controller;
+
+import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationType;
+import 
com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto;
+import com.linkedin.openhouse.optimizer.service.OptimizerDataService;
+import java.time.Instant;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/** REST controller for {@code table_operations_history}. */
+@RestController
+@RequestMapping("/v1/table-operations-history")
+@RequiredArgsConstructor
+public class TableOperationsHistoryController {
+
+  private final OptimizerDataService service;
+
+  /** Append a completed-job result. Called by the SparkJob after each run (success or failure). */
+  @PostMapping
+  public ResponseEntity<TableOperationsHistoryDto> appendHistory(
+      @RequestBody TableOperationsHistoryDto dto) {
+    return ResponseEntity.status(HttpStatus.CREATED).body(service.appendHistory(dto));
+  }
+
+  /** Return the most recent history for a table, newest first, up to {@code limit} rows. */
+  @GetMapping("/{tableUuid}")
+  public ResponseEntity<List<TableOperationsHistoryDto>> getHistory(
+      @PathVariable String tableUuid, @RequestParam(defaultValue = "100") int limit) {
+    return ResponseEntity.ok(service.getHistory(tableUuid, limit));
+  }
+
+  /**
+   * List history rows matching the given filters, ordered newest first. All parameters are optional
+   * — omit all to return every row up to {@code limit}.
+   */
+  @GetMapping
+  public ResponseEntity<List<TableOperationsHistoryDto>> listHistory(
+      @RequestParam(required = false) String databaseName,
+      @RequestParam(required = false) String tableName,
+      @RequestParam(required = false) String tableUuid,
+      @RequestParam(required = false) OperationType operationType,
+      @RequestParam(required = false) OperationHistoryStatus status,
+      @RequestParam(required = false) Instant since,
+      @RequestParam(required = false) Instant until,
+      @RequestParam(defaultValue = "100") int limit) {
+    return ResponseEntity.ok(
+        service.listHistory(
+            databaseName, tableName, tableUuid, operationType, status, since, until, limit));
+  }
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java
new file mode 100644
index 000000000..d469586a2
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java
@@ -0,0 +1,69 @@
+package com.linkedin.openhouse.optimizer.api.controller;
+
+import com.linkedin.openhouse.optimizer.api.model.TableStatsDto;
+import com.linkedin.openhouse.optimizer.api.model.TableStatsHistoryDto;
+import com.linkedin.openhouse.optimizer.api.model.UpsertTableStatsRequest;
+import com.linkedin.openhouse.optimizer.service.OptimizerDataService;
+import java.time.Instant;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/** REST controller 
for managing per-table stats in the optimizer DB. */
+@RestController
+@RequestMapping("/v1/table-stats")
+@RequiredArgsConstructor
+public class TableStatsController {
+
+  private final OptimizerDataService service;
+
+  /**
+   * Create or overwrite the stats row for {@code tableUuid}. Called by the Tables Service on every
+   * Iceberg commit. Idempotent.
+   */
+  @PutMapping("/{tableUuid}")
+  public ResponseEntity<TableStatsDto> upsertTableStats(
+      @PathVariable String tableUuid, @RequestBody UpsertTableStatsRequest request) {
+    return ResponseEntity.ok(service.upsertTableStats(tableUuid, request));
+  }
+
+  /** Fetch the stats row for {@code tableUuid}. Returns 404 if no stats have been written yet. */
+  @GetMapping("/{tableUuid}")
+  public ResponseEntity<TableStatsDto> getTableStats(@PathVariable String tableUuid) {
+    return service
+        .getTableStats(tableUuid)
+        .map(ResponseEntity::ok)
+        .orElse(ResponseEntity.notFound().build());
+  }
+
+  /**
+   * List stats rows matching the given filters. All parameters are optional — omit all to return
+   * every row.
+   */
+  @GetMapping
+  public ResponseEntity<List<TableStatsDto>> listTableStats(
+      @RequestParam(required = false) String databaseId,
+      @RequestParam(required = false) String tableName,
+      @RequestParam(required = false) String tableUuid) {
+    return ResponseEntity.ok(service.listTableStats(databaseId, tableName, tableUuid));
+  }
+
+  /**
+   * Return per-commit stats history for {@code tableUuid}, newest first. Optionally filter by
+   * {@code since} (inclusive) and cap at {@code limit} rows.
+   */
+  @GetMapping("/{tableUuid}/history")
+  public ResponseEntity<List<TableStatsHistoryDto>> getStatsHistory(
+      @PathVariable String tableUuid,
+      @RequestParam(required = false) Instant since,
+      @RequestParam(defaultValue = "100") int limit) {
+    return ResponseEntity.ok(service.getStatsHistory(tableUuid, since, limit));
+  }
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java
new file mode 100644
index 000000000..ce3120400
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java
@@ -0,0 +1,98 @@
+package com.linkedin.openhouse.optimizer.service;
+
+import com.linkedin.openhouse.optimizer.api.model.CompleteOperationRequest;
+import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationType;
+import com.linkedin.openhouse.optimizer.api.model.TableOperationsDto;
+import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto;
+import com.linkedin.openhouse.optimizer.api.model.TableStatsDto;
+import com.linkedin.openhouse.optimizer.api.model.TableStatsHistoryDto;
+import com.linkedin.openhouse.optimizer.api.model.UpsertTableStatsRequest;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+
+/** Service interface for optimizer data operations. */
+public interface OptimizerDataService {
+
+  // --- TableOperations ---
+
+  /**
+   * List operations matching the given filters. Every parameter is optional — pass {@code null} to
+   * skip that filter. No filters returns all rows.
+   */
+  List<TableOperationsDto> listTableOperations(
+      OperationType operationType,
+      OperationStatus status,
+      String databaseName,
+      String tableName,
+      String tableUuid);
+
+  /**
+   * Complete an operation by writing a history entry. 
Looks up the operation row by {@code id},
+   * copies its table metadata into a new history row, and saves it. Returns the history DTO, or
+   * empty if the operation does not exist.
+   */
+  Optional<TableOperationsHistoryDto> completeOperation(
+      String id, CompleteOperationRequest request);
+
+  /**
+   * Return the operation row for {@code id} regardless of status, or empty if it does not exist.
+   * Used to poll a specific operation (e.g. waiting for SUCCESS after a Spark job completes).
+   */
+  Optional<TableOperationsDto> getTableOperation(String id);
+
+  // --- TableStats ---
+
+  /**
+   * Create or update the stats row for {@code tableUuid}. Fully idempotent: the same call
+   * overwrites the previous snapshot with the latest commit values.
+   */
+  TableStatsDto upsertTableStats(String tableUuid, UpsertTableStatsRequest request);
+
+  /** Return the stats row for {@code tableUuid}, or empty if none exists. */
+  Optional<TableStatsDto> getTableStats(String tableUuid);
+
+  /**
+   * List stats rows matching the given filters. Every parameter is optional — pass {@code null} to
+   * skip that filter. No filters returns all rows.
+   */
+  List<TableStatsDto> listTableStats(String databaseId, String tableName, String tableUuid);
+
+  /**
+   * Return per-commit stats history for {@code tableUuid}, newest first.
+   *
+   * @param tableUuid the stable table UUID
+   * @param since if non-null, only return rows recorded at or after this instant
+   * @param limit maximum number of rows to return
+   */
+  List<TableStatsHistoryDto> getStatsHistory(String tableUuid, Instant since, int limit);
+
+  // --- TableOperationsHistory ---
+
+  /** Append a completed-job result record. */
+  TableOperationsHistoryDto appendHistory(TableOperationsHistoryDto dto);
+
+  /**
+   * Return the most recent history rows for a table UUID, newest first.
+   *
+   * @param tableUuid the stable table UUID
+   * @param limit maximum number of rows to return
+   */
+  List<TableOperationsHistoryDto> getHistory(String tableUuid, int limit);
+
+  /**
+   * List history rows matching the given filters, ordered newest first. Every parameter is optional
+   * — pass {@code null} to skip that filter. No filters returns all rows up to {@code limit}.
+   */
+  List<TableOperationsHistoryDto> listHistory(
+      String databaseName,
+      String tableName,
+      String tableUuid,
+      OperationType operationType,
+      OperationHistoryStatus status,
+      Instant since,
+      Instant until,
+      int limit);
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java
new file mode 100644
index 000000000..629853156
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java
@@ -0,0 +1,197 @@
+package com.linkedin.openhouse.optimizer.service;
+
+import com.linkedin.openhouse.optimizer.api.mapper.OptimizerMapper;
+import com.linkedin.openhouse.optimizer.api.model.CompleteOperationRequest;
+import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationType;
+import com.linkedin.openhouse.optimizer.api.model.TableOperationsDto;
+import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto;
+import com.linkedin.openhouse.optimizer.api.model.TableStatsDto;
+import com.linkedin.openhouse.optimizer.api.model.TableStatsHistoryDto;
+import com.linkedin.openhouse.optimizer.api.model.UpsertTableStatsRequest;
+import com.linkedin.openhouse.optimizer.entity.TableOperationsHistoryRow;
+import com.linkedin.openhouse.optimizer.entity.TableStatsHistoryRow;
+import com.linkedin.openhouse.optimizer.entity.TableStatsRow;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsHistoryRepository;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsHistoryRepository;
+import 
com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+/** Implementation of {@link OptimizerDataService}. */
+@Service
+@RequiredArgsConstructor
+public class OptimizerDataServiceImpl implements OptimizerDataService {
+
+  private final TableOperationsRepository operationsRepository;
+  private final TableOperationsHistoryRepository historyRepository;
+  private final TableStatsRepository statsRepository;
+  private final TableStatsHistoryRepository statsHistoryRepository;
+  private final OptimizerMapper mapper;
+
+  // --- TableOperations ---
+
+  @Override
+  public List<TableOperationsDto> listTableOperations(
+      OperationType operationType,
+      OperationStatus status,
+      String databaseName,
+      String tableName,
+      String tableUuid) {
+    return operationsRepository.find(operationType, status, databaseName, tableName, tableUuid)
+        .stream()
+        .map(mapper::toDto)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  @Transactional
+  public Optional<TableOperationsHistoryDto> completeOperation(
+      String id, CompleteOperationRequest request) {
+    return operationsRepository
+        .findById(id)
+        .map(
+            row -> {
+              TableOperationsHistoryRow historyRow =
+                  TableOperationsHistoryRow.builder()
+                      .id(row.getId())
+                      .tableUuid(row.getTableUuid())
+                      .databaseName(row.getDatabaseName())
+                      .tableName(row.getTableName())
+                      .operationType(row.getOperationType())
+                      .submittedAt(Instant.now())
+                      .status(request.getStatus())
+                      .jobId(row.getJobId())
+                      .result(request.getResult())
+                      .build();
+              return mapper.toDto(historyRepository.save(historyRow));
+            });
+  }
+
+  @Override
+  public Optional<TableOperationsDto> getTableOperation(String id) {
+    return operationsRepository.findById(id).map(mapper::toDto);
+  }
+
+  // --- TableStats ---
+
+  @Override
+  @Transactional
+  public TableStatsDto upsertTableStats(String tableUuid, UpsertTableStatsRequest request) {
+    Instant now = Instant.now();
+    TableStatsRow row =
+        statsRepository
+            .findById(tableUuid)
+            .map(
+                existing ->
+                    existing
+                        .toBuilder()
+                        .databaseId(request.getDatabaseId())
+                        .tableName(request.getTableName())
+                        .stats(request.getStats())
+                        .tableProperties(request.getTableProperties())
+                        .updatedAt(now)
+                        .build())
+            .orElse(
+                TableStatsRow.builder()
+                    .tableUuid(tableUuid)
+                    .databaseId(request.getDatabaseId())
+                    .tableName(request.getTableName())
+                    .stats(request.getStats())
+                    .tableProperties(request.getTableProperties())
+                    .updatedAt(now)
+                    .build());
+    TableStatsDto saved = mapper.toDto(statsRepository.save(row));
+
+    statsHistoryRepository.save(
+        TableStatsHistoryRow.builder()
+            .tableUuid(tableUuid)
+            .databaseId(request.getDatabaseId())
+            .tableName(request.getTableName())
+            .stats(request.getStats())
+            .recordedAt(now)
+            .build());
+
+    return saved;
+  }
+
+  @Override
+  public Optional<TableStatsDto> getTableStats(String tableUuid) {
+    return statsRepository.findById(tableUuid).map(mapper::toDto);
+  }
+
+  @Override
+  public List<TableStatsDto> listTableStats(String databaseId, String tableName, String tableUuid) {
+    return statsRepository.find(databaseId, tableName, tableUuid).stream()
+        .map(mapper::toDto)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<TableStatsHistoryDto> getStatsHistory(String tableUuid, Instant since, int limit) {
+    return statsHistoryRepository.find(tableUuid, since, PageRequest.of(0, limit)).stream()
+        .map(mapper::toDto)
+        .collect(Collectors.toList());
+  }
+
+  // --- TableOperationsHistory ---
+
+  @Override
+  @Transactional
+  public TableOperationsHistoryDto appendHistory(TableOperationsHistoryDto dto) {
+    TableOperationsHistoryRow row =
+        TableOperationsHistoryRow.builder()
+            .id(dto.getId())
+            .tableUuid(dto.getTableUuid())
+            .databaseName(dto.getDatabaseName())
+            .tableName(dto.getTableName())
+            .operationType(dto.getOperationType())
+            .submittedAt(dto.getSubmittedAt() != null ? dto.getSubmittedAt() : Instant.now())
+            .status(dto.getStatus())
+            .jobId(dto.getJobId())
+            .result(dto.getResult())
+            .build();
+    return mapper.toDto(historyRepository.save(row));
+  }
+
+  @Override
+  public List<TableOperationsHistoryDto> getHistory(String tableUuid, int limit) {
+    return historyRepository
+        .find(null, null, tableUuid, null, null, null, null, PageRequest.of(0, limit)).stream()
+        .map(mapper::toDto)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<TableOperationsHistoryDto> listHistory(
+      String databaseName,
+      String tableName,
+      String tableUuid,
+      OperationType operationType,
+      OperationHistoryStatus status,
+      Instant since,
+      Instant until,
+      int limit) {
+    return historyRepository
+        .find(
+            databaseName,
+            tableName,
+            tableUuid,
+            operationType,
+            status,
+            since,
+            until,
+            PageRequest.of(0, limit))
+        .stream()
+        .map(mapper::toDto)
+        .collect(Collectors.toList());
+  }
+}
diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java
new file mode 100644
index 000000000..244acb204
--- /dev/null
+++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java
@@ -0,0 +1,156 @@
+package com.linkedin.openhouse.optimizer.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.optimizer.api.model.CompleteOperationRequest;
+import com.linkedin.openhouse.optimizer.api.model.JobResult;
+import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationType;
+import com.linkedin.openhouse.optimizer.api.model.TableOperationsHistoryDto;
+import com.linkedin.openhouse.optimizer.api.model.TableStats;
+import 
com.linkedin.openhouse.optimizer.api.model.TableStatsDto;
+import com.linkedin.openhouse.optimizer.api.model.UpsertTableStatsRequest;
+import com.linkedin.openhouse.optimizer.entity.TableOperationsRow;
+import com.linkedin.openhouse.optimizer.entity.TableStatsHistoryRow;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsHistoryRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.transaction.annotation.Transactional;
+
+@SpringBootTest
+@ActiveProfiles("test")
+@Transactional
+class OptimizerDataServiceImplTest {
+
+  @Autowired OptimizerDataService service;
+  @Autowired TableOperationsRepository operationsRepository;
+  @Autowired TableStatsRepository statsRepository;
+  @Autowired TableStatsHistoryRepository statsHistoryRepository;
+
+  // --- completeOperation ---
+
+  @Test
+  void completeOperation_writesHistoryFromOperationRow() {
+    String id = UUID.randomUUID().toString();
+    String tableUuid = UUID.randomUUID().toString();
+    operationsRepository.save(
+        TableOperationsRow.builder()
+            .id(id)
+            .tableUuid(tableUuid)
+            .databaseName("db1")
+            .tableName("tbl1")
+            .operationType(OperationType.ORPHAN_FILES_DELETION)
+            .status(OperationStatus.SCHEDULED)
+            .createdAt(Instant.now())
+            .scheduledAt(Instant.now())
+            .jobId("spark-job-123")
+            .build());
+
+    Optional<TableOperationsHistoryDto> result =
+        service.completeOperation(
+            id, CompleteOperationRequest.builder().status(OperationHistoryStatus.SUCCESS).build());
+
+    assertThat(result).isPresent();
+    assertThat(result.get().getStatus()).isEqualTo(OperationHistoryStatus.SUCCESS);
+    assertThat(result.get().getTableUuid()).isEqualTo(tableUuid);
+    assertThat(result.get().getJobId()).isEqualTo("spark-job-123");
+    assertThat(result.get().getOperationType()).isEqualTo(OperationType.ORPHAN_FILES_DELETION);
+    assertThat(result.get().getDatabaseName()).isEqualTo("db1");
+    assertThat(result.get().getSubmittedAt()).isNotNull();
+  }
+
+  @Test
+  void completeOperation_notFound_returnsEmpty() {
+    Optional<TableOperationsHistoryDto> result =
+        service.completeOperation(
+            UUID.randomUUID().toString(),
+            CompleteOperationRequest.builder()
+                .status(OperationHistoryStatus.FAILED)
+                .result(
+                    JobResult.builder().errorMessage("boom").errorType("RuntimeException").build())
+                .build());
+
+    assertThat(result).isEmpty();
+  }
+
+  // --- upsertTableStats ---
+
+  @Test
+  void upsertTableStats_createsNewRow() {
+    String tableUuid = UUID.randomUUID().toString();
+    TableStats stats =
+        TableStats.builder()
+            .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(1024L).build())
+            .build();
+
+    TableStatsDto dto =
+        service.upsertTableStats(
+            tableUuid,
+            UpsertTableStatsRequest.builder()
+                .databaseId("db1")
+                .tableName("tbl1")
+                .stats(stats)
+                .tableProperties(Map.of("maintenance.optimizer.ofd.enabled", "true"))
+                .build());
+
+    assertThat(dto.getTableUuid()).isEqualTo(tableUuid);
+    assertThat(dto.getDatabaseId()).isEqualTo("db1");
+    assertThat(dto.getStats().getSnapshot().getTableSizeBytes()).isEqualTo(1024L);
+    assertThat(dto.getTableProperties()).containsEntry("maintenance.optimizer.ofd.enabled", "true");
+    assertThat(statsRepository.findById(tableUuid)).isPresent();
+  }
+
+  @Test
+  void upsertTableStats_updatesExistingRow_andAppendsHistory() {
+    String tableUuid = UUID.randomUUID().toString();
+    TableStats firstStats =
+        TableStats.builder()
+            .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(100L).build())
+            .delta(TableStats.CommitDelta.builder().numFilesAdded(5L).numFilesDeleted(1L).build())
+            .build();
+    TableStats secondStats =
+        TableStats.builder()
+            .snapshot(TableStats.SnapshotMetrics.builder().tableSizeBytes(200L).build())
+            .delta(TableStats.CommitDelta.builder().numFilesAdded(3L).numFilesDeleted(0L).build())
+            .build();
+
+    service.upsertTableStats(
+        tableUuid,
+        UpsertTableStatsRequest.builder()
+            .databaseId("db1")
+            .tableName("tbl1")
+            .stats(firstStats)
+            .build());
+    TableStatsDto dto =
+        service.upsertTableStats(
+            tableUuid,
+            UpsertTableStatsRequest.builder()
+                .databaseId("db1")
+                .tableName("tbl1")
+                .stats(secondStats)
+                .build());
+
+    // Current row reflects the latest upsert
+    assertThat(dto.getStats().getSnapshot().getTableSizeBytes()).isEqualTo(200L);
+    assertThat(statsRepository.findAll()).hasSize(1);
+
+    // History has one row per upsert with the raw delta from each call
+    List<TableStatsHistoryRow> history =
+        statsHistoryRepository.find(tableUuid, null, PageRequest.of(0, 100));
+    assertThat(history).hasSize(2);
+    // Newest first
+    assertThat(history.get(0).getStats().getDelta().getNumFilesAdded()).isEqualTo(3L);
+    assertThat(history.get(1).getStats().getDelta().getNumFilesAdded()).isEqualTo(5L);
+  }
+}