diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java index 8ee552c98930..dbda38344b71 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java @@ -26,6 +26,7 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.transaction.IsolationLevel; @@ -51,6 +52,7 @@ public class LakehouseConnector private final LakehouseSessionProperties sessionProperties; private final LakehouseTableProperties tableProperties; private final IcebergMaterializedViewProperties materializedViewProperties; + private final Set<TableProcedureMetadata> tableProcedures; @Inject public LakehouseConnector( @@ -62,7 +64,8 @@ public LakehouseConnector( LakehouseNodePartitioningProvider nodePartitioningProvider, LakehouseSessionProperties sessionProperties, LakehouseTableProperties tableProperties, - IcebergMaterializedViewProperties materializedViewProperties) + IcebergMaterializedViewProperties materializedViewProperties, + Set<TableProcedureMetadata> tableProcedures) { this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); @@ -73,6 +76,7 @@ public LakehouseConnector( this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null"); this.tableProperties = requireNonNull(tableProperties, "tableProperties is null"); this.materializedViewProperties = requireNonNull(materializedViewProperties, "materializedViewProperties is null"); + this.tableProcedures = requireNonNull(tableProcedures, "tableProcedures is null"); } 
@Override @@ -148,6 +152,12 @@ public List<PropertyMetadata<?>> getMaterializedViewProperties() return materializedViewProperties.getMaterializedViewProperties(); } + @Override + public Set<TableProcedureMetadata> getTableProcedures() + { + return tableProcedures; + } + @Override public void shutdown() { diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java index 6050c8fa54f0..6f3817454afe 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java @@ -16,6 +16,7 @@ import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Scopes; +import com.google.inject.multibindings.Multibinder; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.plugin.base.metrics.FileFormatDataSourceStats; import io.trino.plugin.hive.HideDeltaLakeTables; @@ -24,7 +25,17 @@ import io.trino.plugin.hive.orc.OrcWriterConfig; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.ParquetWriterConfig; +import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure; +import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure; +import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure; +import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure; +import io.trino.plugin.iceberg.procedure.OptimizeManifestsTableProcedure; +import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure; +import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure; +import io.trino.plugin.iceberg.procedure.RollbackToSnapshotTableProcedure; +import io.trino.spi.connector.TableProcedureMetadata; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConfigBinder.configBinder; import static 
org.weakref.jmx.guice.ExportBinder.newExporter; @@ -53,6 +64,16 @@ protected void setup(Binder binder) binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON); newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName(); + Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class); + tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(OptimizeManifestsTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(RollbackToSnapshotTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON); + binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false); } } diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java index d5faf3eb11a1..c2c7357a90e5 100644 --- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java +++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java @@ -62,6 +62,7 @@ protected QueryRunner createQueryRunner() .addLakehouseProperty("s3.endpoint", hiveMinio.getMinio().getMinioAddress()) .addLakehouseProperty("s3.path-style-access", "true") .addLakehouseProperty("s3.streaming.part-size", "5MB") + 
.addLakehouseProperty("iceberg.add-files-procedure.enabled", "true") .build(); } diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java index 955bdd2da1ee..953c6911888f 100644 --- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java +++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.Test; import static io.trino.plugin.lakehouse.TableType.DELTA; +import static io.trino.testing.TestingNames.randomNameSuffix; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -59,4 +60,17 @@ public void testShowCreateTable() type = 'DELTA' )\\E"""); } + + @Test + public void testOptimize() + { + String tableName = "test_optimize_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar)"); + try { + assertThat(query("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')")).succeeds(); + } + finally { + assertUpdate("DROP TABLE " + tableName); + } + } } diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java index 1745312c5c41..1e17619c1d28 100644 --- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java +++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java @@ -13,11 +13,13 @@ */ package io.trino.plugin.lakehouse; +import io.trino.Session; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import 
org.junit.jupiter.api.Test; import static io.trino.plugin.lakehouse.TableType.HIVE; +import static io.trino.testing.TestingNames.randomNameSuffix; import static org.assertj.core.api.Assertions.assertThat; public class TestLakehouseHiveConnectorSmokeTest @@ -66,4 +68,28 @@ comment varchar(152) type = 'HIVE' )"""); } + + @Test + public void testOptimize() + { + String tableName = "test_optimize_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar)"); + try { + assertThat(query("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')")) + .failure().hasMessage("OPTIMIZE procedure must be explicitly enabled via non_transactional_optimize_enabled session property"); + + Session session = optimizeEnabledSession(); + assertThat(query(session, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')")).succeeds(); + } + finally { + assertUpdate("DROP TABLE " + tableName); + } + } + + private Session optimizeEnabledSession() + { + return Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "non_transactional_optimize_enabled", "true") + .build(); + } } diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java index 42bfaa102b22..2b3effe9c298 100644 --- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java +++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.Test; import static io.trino.plugin.lakehouse.TableType.ICEBERG; +import static io.trino.testing.TestingNames.randomNameSuffix; import static org.assertj.core.api.Assertions.assertThat; public class TestLakehouseIcebergConnectorSmokeTest @@ -44,4 +45,46 @@ 
public void testShowCreateTable() type = 'ICEBERG' )\\E"""); } + + @Test + public void testOptimize() + { + String tableName = "test_optimize_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar)"); + try { + assertThat(query("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')")).succeeds(); + + assertThat(query("ALTER TABLE " + tableName + " EXECUTE optimize_manifests")).succeeds(); + + assertThat(query("ALTER TABLE " + tableName + " EXECUTE drop_extended_stats")).succeeds(); + + long currentSnapshotId = getCurrentSnapshotId(tableName); + assertThat(currentSnapshotId).isGreaterThan(0); + assertThat(query("ALTER TABLE " + tableName + " EXECUTE rollback_to_snapshot(" + currentSnapshotId + ")")).succeeds(); + + assertThat(query("ALTER TABLE " + tableName + " EXECUTE expire_snapshots(retention_threshold => '7d')")).succeeds(); + + assertThat(query("ALTER TABLE " + tableName + " EXECUTE remove_orphan_files(retention_threshold => '7d')")).succeeds(); + + assertThat(query("ALTER TABLE " + tableName + " EXECUTE add_files(" + + " location => 's3://my-bucket/a/path'," + + " format => 'ORC')")) + .failure().hasMessage("Failed to add files: Failed to list location: s3://my-bucket/a/path"); + + String tableName2 = "test_optimize2_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName2 + " (key integer, value varchar)"); + assertThat(query("ALTER TABLE " + tableName + " EXECUTE add_files_from_table(" + + " schema_name => CURRENT_SCHEMA," + + " table_name => '" + tableName2 + "')")) + .failure().hasMessage("Adding files from non-Hive tables is unsupported"); + } + finally { + assertUpdate("DROP TABLE " + tableName); + } + } + + private long getCurrentSnapshotId(String tableName) + { + return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"); + } }