diff --git a/build.gradle b/build.gradle index f9cd741f2..f7cdd71a8 100644 --- a/build.gradle +++ b/build.gradle @@ -177,6 +177,7 @@ tasks.register('CopyGitHooksTask', Copy) { // tables-service.Dockerfile -> :services:tables:bootJar // housetables-service.Dockerfile -> :services:housetables:bootJar // jobs-service.Dockerfile -> :services:jobs:bootJar +// optimizer-service.Dockerfile -> :services:optimizer:bootJar // jobs-scheduler.Dockerfile -> :apps:openhouse-spark-apps_2.12:shadowJar (uber JAR) // spark-base-hadoop2.8.dockerfile -> // :integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar (uber JAR) @@ -196,6 +197,7 @@ tasks.register('dockerPrereqs') { dependsOn ':services:tables:bootJar' dependsOn ':services:housetables:bootJar' dependsOn ':services:jobs:bootJar' + dependsOn ':services:optimizer:bootJar' // Spark runtime uber JARs (shadowJar) dependsOn ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar' @@ -219,6 +221,7 @@ tasks.register('dockerPrereqs') { println ' build/tables/libs/tables.jar' println ' build/housetables/libs/housetables.jar' println ' build/jobs/libs/jobs.jar' + println ' build/optimizer/libs/optimizer.jar' println ' build/openhouse-spark-runtime_2.12/libs/openhouse-spark-runtime_2.12-uber.jar' println ' build/openhouse-spark-3.5-runtime_2.12/libs/openhouse-spark-3.5-runtime_2.12-uber.jar' println ' build/openhouse-spark-apps_2.12/libs/openhouse-spark-apps_2.12-uber.jar' diff --git a/services/optimizer/build.gradle b/services/optimizer/build.gradle new file mode 100644 index 000000000..c05c7f9c3 --- /dev/null +++ b/services/optimizer/build.gradle @@ -0,0 +1,17 @@ +plugins { + id 'openhouse.springboot-ext-conventions' + id 'org.springframework.boot' version '2.7.8' +} + +dependencies { + implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' + implementation 'com.vladmihalcea:hibernate-types-55:2.21.1' + implementation 'org.springframework.boot:spring-boot-starter-web:2.7.8' + 
+  implementation 'mysql:mysql-connector-java:8.+'
+  testImplementation 'com.h2database:h2:2.2.224'
+  testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
+}
+
+test {
+  useJUnitPlatform()
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java
new file mode 100644
index 000000000..38eb363a8
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java
@@ -0,0 +1,13 @@
+package com.linkedin.openhouse.optimizer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/** Spring Boot entry point for the Optimizer Service. */
+@SpringBootApplication
+public class OptimizerServiceApplication {
+
+  public static void main(String[] args) {
+    SpringApplication.run(OptimizerServiceApplication.class, args);
+  }
+}
com.linkedin.openhouse.optimizer.entity.TableStatsRow; +import org.mapstruct.Mapper; + +/** + * MapStruct mapper for converting between optimizer JPA entities and their corresponding DTOs. + * + *
Spring-instantiated at compile time. Inject via {@code @Autowired} or constructor injection. + */ +@Mapper(componentModel = "spring") +public interface OptimizerMapper { + + /** Map a {@link TableOperationsRow} to its DTO. */ + TableOperationsDto toDto(TableOperationsRow row); + + /** Map a {@link TableOperationsHistoryRow} to its DTO. */ + TableOperationsHistoryDto toDto(TableOperationsHistoryRow row); + + /** Map a {@link TableStatsRow} to its DTO. */ + TableStatsDto toDto(TableStatsRow row); + + /** Map a {@link TableStatsHistoryRow} to its DTO. */ + TableStatsHistoryDto toDto(TableStatsHistoryRow row); +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java new file mode 100644 index 000000000..35f7ba782 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/CompleteOperationRequest.java @@ -0,0 +1,25 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Request body for {@code POST /v1/table-operations/{id}/complete}. + * + *
Reports the outcome of a completed operation. The backend looks up the operation row by {@code + * id} and writes a history entry with the operation's table metadata and the supplied result. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CompleteOperationRequest { + + /** Outcome of the operation. */ + private OperationHistoryStatus status; + + /** Error details on failure; {@code null} on success. */ + private JobResult result; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/JobResult.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/JobResult.java new file mode 100644 index 000000000..74942243c --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/JobResult.java @@ -0,0 +1,25 @@ +package com.linkedin.openhouse.optimizer.api.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Result payload for a completed Spark maintenance job. + * + *
Stored as JSON in the {@code result} column of {@code table_operations_history}. Both fields
+ * are {@code null} on success; populated on failure.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class JobResult {
+
+ /** Human-readable error message; {@code null} if the job succeeded. */
+ private String errorMessage;
+
+ /** Error category (e.g., {@code OOM}, {@code TIMEOUT}); {@code null} if the job succeeded. */
+ private String errorType;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationHistoryStatus.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationHistoryStatus.java
new file mode 100644
index 000000000..791d910a6
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationHistoryStatus.java
@@ -0,0 +1,7 @@
+package com.linkedin.openhouse.optimizer.api.model;
+
+/** Terminal states for a completed Spark maintenance job. */
+public enum OperationHistoryStatus {
+ SUCCESS,
+ FAILED
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationStatus.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationStatus.java
new file mode 100644
index 000000000..c97be441b
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationStatus.java
@@ -0,0 +1,21 @@
+package com.linkedin.openhouse.optimizer.api.model;
+
+/** Lifecycle states for a table operation recommendation. */
+public enum OperationStatus {
+
+ /** Recommended by the Analyzer but not yet claimed by the Scheduler. */
+ PENDING,
+
+ /** Claimed by the Scheduler; waiting for the Jobs Service to return a job ID. */
+ SCHEDULING,
+
+ /** Job submitted to the Jobs Service; the row now carries a {@code jobId}. */
+ SCHEDULED,
+
+ /**
+ * Marked by the Scheduler when it detects duplicate PENDING rows for the same {@code (table_uuid,
+ * operation_type)}. Only the most-recent PENDING row is claimed; older duplicates are CANCELED
+ * before the claim step.
+ */
+ CANCELED
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java
new file mode 100644
index 000000000..8507bae12
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/OperationType.java
@@ -0,0 +1,7 @@
+package com.linkedin.openhouse.optimizer.api.model;
+
+/** Maintenance operation types supported by the continuous optimizer. */
+public enum OperationType {
+ /** Removes orphaned data files no longer referenced by table metadata. */
+ ORPHAN_FILES_DELETION
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java
new file mode 100644
index 000000000..d41bd6906
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsDto.java
@@ -0,0 +1,42 @@
+package com.linkedin.openhouse.optimizer.api.model;
+
+import java.time.Instant;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** DTO for {@code table_operations} — Analyzer recommendations read by the Scheduler. */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TableOperationsDto {
+
+ /** Client-generated UUID identifying this specific operation recommendation. */
+ private String id;
+
+ /** Stable table identity from the Tables Service. */
+ private String tableUuid;
+
+ /** Denormalized database name for display; not part of the primary key. */
+ private String databaseName;
+
+ /** Denormalized table name for display; not part of the primary key. */
+ private String tableName;
+
+ /** The type of maintenance operation (e.g. ORPHAN_FILES_DELETION). */
+ private OperationType operationType;
+
+  /** Current lifecycle state (any {@code OperationStatus} value). Defaults to {@code PENDING} on creation. */
+ private OperationStatus status;
+
+ /** Server-set when the row is first created by the Analyzer. */
+ private Instant createdAt;
+
+ /** Set by the Scheduler when claiming; {@code null} while PENDING. */
+ private Instant scheduledAt;
+
+ /** Job ID returned by the Jobs Service after successful submission. */
+ private String jobId;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java
new file mode 100644
index 000000000..2a901ad2b
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableOperationsHistoryDto.java
@@ -0,0 +1,37 @@
+package com.linkedin.openhouse.optimizer.api.model;
+
+import java.time.Instant;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** DTO for {@code table_operations_history} — append-only operation results. */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TableOperationsHistoryDto {
+
+ /** Same UUID as the originating {@code table_operations.id}; supplied by the caller. */
+ private String id;
+
+ /** Stable table identity from the Tables Service. */
+ private String tableUuid;
+
+ private String databaseName;
+ private String tableName;
+ private OperationType operationType;
+
+ /** When the operation completed, as recorded by the complete endpoint. */
+ private Instant completedAt;
+
+ /** {@code SUCCESS} or {@code FAILED}. */
+ private OperationHistoryStatus status;
+
+ /** Job ID from the Jobs Service. */
+ private String jobId;
+
+ /** Job result payload; both fields null on success. */
+ private JobResult result;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java
new file mode 100644
index 000000000..64c99061a
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStats.java
@@ -0,0 +1,50 @@
+package com.linkedin.openhouse.optimizer.api.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Combined stats payload stored as a single JSON blob per table. */
+@Data
+@Builder(toBuilder = true)
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TableStats {
+
+ /** Snapshot fields — overwritten on every upsert. */
+ private SnapshotMetrics snapshot;
+
+ /** Delta fields — accumulated across commit events. */
+ private CommitDelta delta;
+
+ /** Point-in-time metadata read from Iceberg at scan time. */
+ @Data
+ @Builder(toBuilder = true)
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class SnapshotMetrics {
+ private String clusterId;
+ private String tableVersion;
+ private String tableLocation;
+ private Long tableSizeBytes;
+ /** Total number of data files as of the latest snapshot — used for bin-packing. */
+ private Long numCurrentFiles;
+ }
+
+ /** Per-commit incremental counters; accumulated across all recorded commit events. */
+ @Data
+ @Builder(toBuilder = true)
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class CommitDelta {
+ private Long numFilesAdded;
+ private Long numFilesDeleted;
+ private Long addedSizeBytes;
+ private Long deletedSizeBytes;
+ }
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java
new file mode 100644
index 000000000..81dd6b802
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/TableStatsDto.java
@@ -0,0 +1,34 @@
+package com.linkedin.openhouse.optimizer.api.model;
+
+import java.time.Instant;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** DTO for {@code table_stats} — used for response payloads. */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TableStatsDto {
+
+ /** Stable Iceberg table UUID. Primary key of the stats row. */
+ private String tableUuid;
+
+ /** Denormalized database name for display. */
+ private String databaseName;
+
+ /** Denormalized table name for display. */
+ private String tableName;
+
+ /** Combined snapshot + delta stats payload, stored as JSON. */
+ private TableStats stats;
+
+ /** Current table properties snapshot (e.g. maintenance opt-in flags). */
+  private Map<String, String> tableProperties;
+
+  /** When this stats row was last written. */
+  private Instant lastUpdatedAt;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java
new file mode 100644
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableOperationsRequest.java
@@ -0,0 +1,31 @@
+package com.linkedin.openhouse.optimizer.api.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Request body for {@code PUT /v1/table-operations/{id}}.
+ *
+ * <p>The Analyzer supplies the operation {@code id} (client-generated UUID) in the path and all
+ * table-identifying fields in this body. The service creates the row on first call.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class UpsertTableOperationsRequest {
+
+ /** Stable Iceberg table UUID identifying the target table. */
+ private String tableUuid;
+
+ /** Denormalized database name for display. */
+ private String databaseName;
+
+ /** Denormalized table name for display. */
+ private String tableName;
+
+ /** The type of maintenance operation to create. */
+ private OperationType operationType;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java
new file mode 100644
index 000000000..02290bad5
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/model/UpsertTableStatsRequest.java
@@ -0,0 +1,32 @@
+package com.linkedin.openhouse.optimizer.api.model;
+
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Request body for {@code PUT /v1/table-stats/{tableUuid}}.
+ *
+ * <p>{@code tableUuid} comes from the path variable. {@code databaseName} and {@code tableName} are
+ * denormalized display columns carried in the body.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class UpsertTableStatsRequest {
+
+ /** Denormalized database name for display. */
+ private String databaseName;
+
+ /** Denormalized table name for display. */
+ private String tableName;
+
+ /** Combined snapshot + delta stats payload from this commit. */
+ private TableStats stats;
+
+ /** Current table properties snapshot (e.g. maintenance opt-in flags). */
+  private Map<String, String> tableProperties;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java
new file mode 100644
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsHistoryRow.java
@@ -0,0 +1,83 @@
+package com.linkedin.openhouse.optimizer.entity;
+
+import com.linkedin.openhouse.optimizer.api.model.JobResult;
+import com.linkedin.openhouse.optimizer.api.model.OperationHistoryStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationType;
+import java.time.Instant;
+import javax.persistence.Column;
+import javax.persistence.Convert;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * JPA entity recording the terminal result of a completed table maintenance operation.
+ *
+ * <p>Written when the operation-complete endpoint is called. The {@code id} is the same UUID as the
+ * originating {@code table_operations.id}, tying each history entry back to the operation cycle
+ * that produced it. Multiple runs of the same operation on the same table produce multiple rows
+ * (each cycle gets a new UUID from the Analyzer).
+ */
+@Entity
+@Table(
+ name = "table_operations_history",
+ indexes = {
+ @Index(name = "idx_table_uuid_hist", columnList = "table_uuid"),
+ @Index(name = "idx_op_type_hist", columnList = "operation_type"),
+ @Index(name = "idx_completed_at", columnList = "completed_at"),
+ @Index(name = "idx_status_hist", columnList = "status"),
+ @Index(name = "idx_job_id", columnList = "job_id"),
+ @Index(name = "idx_toph_db_table", columnList = "database_name, table_name")
+ })
+@Getter
+@EqualsAndHashCode
+@Builder(toBuilder = true)
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@AllArgsConstructor(access = AccessLevel.PROTECTED)
+public class TableOperationsHistoryRow {
+
+ /** Same UUID as the originating {@code table_operations.id}. Set by the caller; not generated. */
+ @Id
+ @Column(name = "id", nullable = false, length = 36)
+ private String id;
+
+ @Column(name = "table_uuid", nullable = false, length = 36)
+ private String tableUuid;
+
+ @Column(name = "database_name", nullable = false, length = 128)
+ private String databaseName;
+
+ @Column(name = "table_name", nullable = false, length = 128)
+ private String tableName;
+
+ @Enumerated(EnumType.STRING)
+ @Column(name = "operation_type", nullable = false, length = 50)
+ private OperationType operationType;
+
+ /** When the operation completed, as recorded by the complete endpoint. */
+ @Column(name = "completed_at", nullable = false)
+ private Instant completedAt;
+
+ /** {@code SUCCESS} or {@code FAILED}. */
+ @Enumerated(EnumType.STRING)
+ @Column(name = "status", nullable = false, length = 20)
+ private OperationHistoryStatus status;
+
+ /** Spark job ID; indexed for job → result lookups. */
+ @Column(name = "job_id", length = 255)
+ private String jobId;
+
+ /** Job result: error details on failure, both fields null on success. */
+ @Convert(converter = JobResultConverter.class)
+ @Column(name = "result")
+ private JobResult result;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java
new file mode 100644
index 000000000..43778495a
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableOperationsRow.java
@@ -0,0 +1,88 @@
+package com.linkedin.openhouse.optimizer.entity;
+
+import com.linkedin.openhouse.optimizer.api.model.OperationStatus;
+import com.linkedin.openhouse.optimizer.api.model.OperationType;
+import java.time.Instant;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * JPA entity representing an Analyzer recommendation for a table maintenance operation.
+ *
+ * <p>Each row is identified by a client-generated UUID ({@code id}). The Analyzer creates a new row
+ * when it first recommends an operation for a table, or when re-recommending after a prior terminal
+ * state (SUCCESS/FAILED). Old terminal rows accumulate — they serve as implicit history. {@code
+ * table_uuid} is the stable identity for the table (survives renames; rotates on drop+recreate).
+ * The application enforces one active (PENDING or SCHEDULED) row per {@code (table_uuid,
+ * operation_type)} at a time.
+ */
+@Entity
+@Table(
+ name = "table_operations",
+ indexes = {
+ @Index(name = "idx_table_uuid", columnList = "table_uuid"),
+ @Index(name = "idx_op_type", columnList = "operation_type"),
+ @Index(name = "idx_status", columnList = "status"),
+ @Index(name = "idx_created_at", columnList = "created_at"),
+ @Index(name = "idx_scheduled_at", columnList = "scheduled_at")
+ })
+@Getter
+@EqualsAndHashCode
+@Builder(toBuilder = true)
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@AllArgsConstructor(access = AccessLevel.PROTECTED)
+public class TableOperationsRow {
+
+ /** Client-generated UUID identifying this specific operation recommendation. */
+ @Id
+ @Column(name = "id", nullable = false, length = 36)
+ private String id;
+
+ /** Stable table identity from the Tables Service. Survives renames; rotates on drop+recreate. */
+ @Column(name = "table_uuid", nullable = false, length = 36)
+ private String tableUuid;
+
+ @Column(name = "database_name", nullable = false, length = 128)
+ private String databaseName;
+
+ @Column(name = "table_name", nullable = false, length = 128)
+ private String tableName;
+
+ @Enumerated(EnumType.STRING)
+ @Column(name = "operation_type", nullable = false, length = 50)
+ private OperationType operationType;
+
+ @Enumerated(EnumType.STRING)
+ @Column(name = "status", nullable = false, length = 20)
+ private OperationStatus status;
+
+ /** When the Analyzer first created this row. Set by the service on insert; never updated. */
+ @Column(name = "created_at", nullable = false)
+ private Instant createdAt;
+
+ /** Set when the operation is claimed; {@code null} while {@code PENDING}. */
+ @Column(name = "scheduled_at")
+ private Instant scheduledAt;
+
+ /** Job ID returned by the Jobs Service after successful submission. */
+ @Column(name = "job_id", length = 255)
+ private String jobId;
+
+ /**
+ * Manual optimistic lock for the Scheduler claim. Incremented by the raw {@code claimOperation}
+ * UPDATE query; must NOT use JPA {@code @Version} since the claim bypasses JPA entity management.
+ */
+ @Column(name = "version")
+ private Long version;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsHistoryRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsHistoryRow.java
new file mode 100644
index 000000000..b0d92fc81
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsHistoryRow.java
@@ -0,0 +1,61 @@
+package com.linkedin.openhouse.optimizer.entity;
+
+import com.linkedin.openhouse.optimizer.api.model.TableStats;
+import com.vladmihalcea.hibernate.type.json.JsonStringType;
+import java.time.Instant;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.hibernate.annotations.Type;
+import org.hibernate.annotations.TypeDef;
+
+/**
+ * Append-only record of per-commit stats reported by the Tables Service.
+ *
+ * <p>Each Iceberg commit produces one row. The {@code stats} JSON contains both the snapshot
+ * metrics (point-in-time) and the commit delta (files added/deleted in this commit). Consumers can
+ * query this table to reconstruct change rates over arbitrary time windows.
+ */
+@TypeDef(name = "json", typeClass = JsonStringType.class)
+@Entity
+@Table(
+ name = "table_stats_history",
+ indexes = {
+ @Index(name = "idx_tsh_table_uuid", columnList = "table_uuid"),
+ @Index(name = "idx_tsh_recorded_at", columnList = "recorded_at")
+ })
+@Getter
+@EqualsAndHashCode
+@Builder(toBuilder = true)
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@AllArgsConstructor(access = AccessLevel.PROTECTED)
+public class TableStatsHistoryRow {
+
+ @Id
+ @Column(name = "id", nullable = false, length = 36)
+ private String id;
+
+ @Column(name = "table_uuid", nullable = false, length = 36)
+ private String tableUuid;
+
+ @Column(name = "database_name", nullable = false, length = 128)
+ private String databaseName;
+
+ @Column(name = "table_name", nullable = false, length = 128)
+ private String tableName;
+
+ @Type(type = "json")
+ @Column(name = "stats", columnDefinition = "TEXT")
+ private TableStats stats;
+
+ @Column(name = "recorded_at", nullable = false)
+ private Instant recordedAt;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java
new file mode 100644
index 000000000..f682a3485
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/entity/TableStatsRow.java
@@ -0,0 +1,57 @@
+package com.linkedin.openhouse.optimizer.entity;
+
+import com.linkedin.openhouse.optimizer.api.model.TableStats;
+import com.vladmihalcea.hibernate.type.json.JsonStringType;
+import java.time.Instant;
+import java.util.Map;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.hibernate.annotations.Type;
+import org.hibernate.annotations.TypeDef;
+
+/**
+ * JPA entity representing a per-table stats snapshot in the optimizer DB.
+ *
+ * <p>Written by the Tables Service on every Iceberg commit. Read by the Analyzer directly via JPA
+ * to enumerate tables and check scheduling eligibility.
+ */
+@TypeDef(name = "json", typeClass = JsonStringType.class)
+@Entity
+@Table(name = "table_stats")
+@Getter
+@EqualsAndHashCode
+@Builder(toBuilder = true)
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@AllArgsConstructor(access = AccessLevel.PROTECTED)
+public class TableStatsRow {
+
+ @Id
+ @Column(name = "table_uuid", nullable = false, length = 36)
+ private String tableUuid;
+
+ @Column(name = "database_name", nullable = false, length = 128)
+ private String databaseName;
+
+ @Column(name = "table_name", nullable = false, length = 128)
+ private String tableName;
+
+ @Type(type = "json")
+ @Column(name = "stats", columnDefinition = "TEXT")
+ private TableStats stats;
+
+ @Type(type = "json")
+ @Column(name = "table_properties", columnDefinition = "TEXT")
+ private Map