Skip to content

Commit 7ba8e6f

Browse files
committed
Add metastore metrics to LakehouseMetadata.getMetrics
1 parent 69d8d4b commit 7ba8e6f

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.plugin.lakehouse;
1515

16+
import com.google.common.collect.ImmutableMap;
1617
import com.google.common.collect.Iterators;
1718
import io.airlift.slice.Slice;
1819
import io.trino.metastore.Table;
@@ -87,6 +88,8 @@
8788
import io.trino.spi.expression.Constant;
8889
import io.trino.spi.function.LanguageFunction;
8990
import io.trino.spi.function.SchemaFunctionName;
91+
import io.trino.spi.metrics.Metric;
92+
import io.trino.spi.metrics.Metrics;
9093
import io.trino.spi.security.GrantInfo;
9194
import io.trino.spi.security.Privilege;
9295
import io.trino.spi.security.RoleGrant;
@@ -251,6 +254,25 @@ public Optional<Object> getInfo(ConnectorSession session, ConnectorTableHandle t
251254
return forHandle(table).getInfo(session, table);
252255
}
253256

257+
@Override
258+
public Metrics getMetrics(ConnectorSession session)
259+
{
260+
ImmutableMap.Builder<String, Metric<?>> metrics = ImmutableMap.<String, Metric<?>>builder();
261+
hiveMetadata.getMetrics(session).getMetrics().forEach((metricName, metric) -> {
262+
metrics.put("hive." + metricName, metric);
263+
});
264+
icebergMetadata.getMetrics(session).getMetrics().forEach((metricName, metric) -> {
265+
metrics.put("iceberg." + metricName, metric);
266+
});
267+
deltaMetadata.getMetrics(session).getMetrics().forEach((metricName, metric) -> {
268+
metrics.put("delta." + metricName, metric);
269+
});
270+
hudiMetadata.getMetrics(session).getMetrics().forEach((metricName, metric) -> {
271+
metrics.put("hudi." + metricName, metric);
272+
});
273+
return new Metrics(metrics.buildOrThrow());
274+
}
275+
254276
@Override
255277
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
256278
{

plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConnectorTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,30 @@
1313
*/
1414
package io.trino.plugin.lakehouse;
1515

16+
import com.google.common.collect.ImmutableList;
1617
import io.trino.Session;
18+
import io.trino.connector.MockConnectorFactory;
19+
import io.trino.connector.MockConnectorPlugin;
1720
import io.trino.plugin.hive.containers.Hive3MinioDataLake;
21+
import io.trino.spi.metrics.Metrics;
22+
import io.trino.spi.statistics.TableStatistics;
1823
import io.trino.testing.BaseConnectorTest;
1924
import io.trino.testing.MaterializedResult;
25+
import io.trino.testing.QueryFailedException;
2026
import io.trino.testing.QueryRunner;
2127
import io.trino.testing.TestingConnectorBehavior;
28+
import io.trino.testing.TestingSession;
2229
import io.trino.testing.sql.TestTable;
2330
import org.junit.jupiter.api.BeforeAll;
2431
import org.junit.jupiter.api.Disabled;
2532
import org.junit.jupiter.api.Test;
2633
import org.junit.jupiter.api.TestInstance;
2734

35+
import java.util.Map;
2836
import java.util.Optional;
2937
import java.util.OptionalInt;
3038

39+
import static io.trino.SystemSessionProperties.ITERATIVE_OPTIMIZER_TIMEOUT;
3140
import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
3241
import static io.trino.spi.type.VarcharType.VARCHAR;
3342
import static io.trino.testing.MaterializedResult.resultBuilder;
@@ -36,8 +45,10 @@
3645
import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
3746
import static io.trino.testing.containers.Minio.MINIO_REGION;
3847
import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
48+
import static java.lang.String.format;
3949
import static org.assertj.core.api.Assertions.assertThat;
4050
import static org.assertj.core.api.Assertions.assertThatThrownBy;
51+
import static org.assertj.core.api.Fail.fail;
4152
import static org.junit.jupiter.api.Assumptions.abort;
4253
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
4354

@@ -81,6 +92,29 @@ public void setUp()
8192
copyTpchTables(getQueryRunner(), "tpch", TINY_SCHEMA_NAME, REQUIRED_TPCH_TABLES);
8293
}
8394

95+
@BeforeAll
96+
public void initMockMetricsCatalog()
97+
{
98+
QueryRunner queryRunner = getQueryRunner();
99+
String mockConnector = "mock_metrics";
100+
queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder()
101+
.withName(mockConnector)
102+
.withListSchemaNames(_ -> ImmutableList.of("default"))
103+
.withGetTableStatistics(_ -> {
104+
try {
105+
Thread.sleep(110);
106+
}
107+
catch (InterruptedException e) {
108+
Thread.currentThread().interrupt();
109+
throw new RuntimeException(e);
110+
}
111+
return TableStatistics.empty();
112+
})
113+
.build()));
114+
115+
queryRunner.createCatalog("mock_metrics", mockConnector);
116+
}
117+
84118
@Override
85119
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
86120
{
@@ -365,4 +399,36 @@ public void testShowCreateTable()
365399
type = 'ICEBERG'
366400
)\\E""");
367401
}
402+
403+
@Test
404+
public void testCatalogMetadataMetrics()
405+
{
406+
QueryRunner.MaterializedResultWithPlan result = getQueryRunner().executeWithPlan(
407+
getSession(),
408+
"SELECT count(*) FROM region r, nation n WHERE r.regionkey = n.regionkey");
409+
Map<String, Metrics> metrics = getCatalogMetadataMetrics(result.queryId());
410+
assertCountMetricExists(metrics, "lakehouse", "iceberg.metastore.all.time.total");
411+
assertDistributionMetricExists(metrics, "lakehouse", "iceberg.metastore.all.time.distribution");
412+
assertCountMetricExists(metrics, "lakehouse", "iceberg.metastore.getTable.time.total");
413+
assertDistributionMetricExists(metrics, "lakehouse", "iceberg.metastore.getTable.time.distribution");
414+
}
415+
416+
@Test
417+
public void testCatalogMetadataMetricsWithOptimizerTimeoutExceeded()
418+
{
419+
String query = "SELECT count(*) FROM region r, nation n, mock_metrics.default.mock_table m WHERE r.regionkey = n.regionkey";
420+
try {
421+
Session smallOptimizerTimeout = TestingSession.testSessionBuilder(getSession())
422+
.setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "100ms")
423+
.build();
424+
QueryRunner.MaterializedResultWithPlan result = getQueryRunner().executeWithPlan(smallOptimizerTimeout, query);
425+
fail(format("Expected query to fail: %s [QueryId: %s]", query, result.queryId()));
426+
}
427+
catch (QueryFailedException e) {
428+
assertThat(e.getMessage()).contains("The optimizer exhausted the time limit");
429+
Map<String, Metrics> metrics = getCatalogMetadataMetrics(e.getQueryId());
430+
assertCountMetricExists(metrics, "lakehouse", "iceberg.metastore.all.time.total");
431+
assertCountMetricExists(metrics, "lakehouse", "iceberg.metastore.getTable.time.total");
432+
}
433+
}
368434
}

0 commit comments

Comments
 (0)