Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public RelationalResultSet executeInternal(@Nonnull final ExecutionContext execu
throw ExceptionUtil.toRelationalException(ipbe);
}
if (queryExecutionContext.isForExplain()) {
return executeExplain(parsedContinuation);
return executeExplain(parsedContinuation, executionContext);
} else {
return executePhysicalPlan(recordLayerSchema, typedEvaluationContext, executionContext, parsedContinuation);
}
Expand Down Expand Up @@ -280,12 +280,38 @@ protected <M extends Message> void validatePlanAgainstEnvironment(@Nonnull final
}

@Nonnull
private RelationalResultSet executeExplain(@Nonnull ContinuationImpl parsedContinuation) {
protected Optional<RecordQueryPlan> getSerializedPlanFromContinuation(@Nonnull ContinuationImpl parsedContinuation,
@Nonnull ExecutionContext executionContext) throws RelationalException {
final var compiledStatement = parsedContinuation.getCompiledStatement();
if (compiledStatement == null || !compiledStatement.hasPlan() || !compiledStatement.hasPlanSerializationMode()) {
return Optional.empty();
}

try {
final var planSerializationMode = PlanValidator.validateSerializedPlanSerializationMode(
compiledStatement,
OptionsUtils.getValidPlanHashModes(executionContext.getOptions())
);
final var serializationContext = new PlanSerializationContext(
DefaultPlanSerializationRegistry.INSTANCE, planSerializationMode
);
return Optional.of(
RecordQueryPlan.fromRecordQueryPlanProto(serializationContext, compiledStatement.getPlan()));
} catch (RelationalException ex) {
return Optional.empty();
}
}

@Nonnull
private RelationalResultSet executeExplain(@Nonnull ContinuationImpl parsedContinuation,
ExecutionContext executionContext) throws RelationalException {
final var continuationStructType = DataType.StructType.from(
"PLAN_CONTINUATION", List.of(
DataType.StructType.Field.from("EXECUTION_STATE", DataType.Primitives.BYTES.type(), 0),
DataType.StructType.Field.from("VERSION", DataType.Primitives.INTEGER.type(), 1),
DataType.StructType.Field.from("PLAN_HASH_MODE", DataType.Primitives.STRING.type(), 2)),
DataType.StructType.Field.from("PLAN_HASH_MODE", DataType.Primitives.STRING.type(), 2),
DataType.StructType.Field.from("PLAN_HASH", DataType.Primitives.INTEGER.type(), 3),
DataType.StructType.Field.from("SERIALIZED_PLAN_COMPLEXITY", DataType.Primitives.INTEGER.type(), 4)),
true);
final var plannerMetricsStructType = DataType.StructType.from(
"PLANNER_METRICS", List.of(
Expand All @@ -312,7 +338,9 @@ private RelationalResultSet executeExplain(@Nonnull ContinuationImpl parsedConti
new ImmutableRowStruct(new ArrayRow(
parsedContinuation.getExecutionState(),
parsedContinuation.getVersion(),
parsedContinuation.getCompiledStatement() == null ? null : parsedContinuation.getCompiledStatement().getPlanSerializationMode()
parsedContinuation.getCompiledStatement() == null ? null : parsedContinuation.getCompiledStatement().getPlanSerializationMode(),
parsedContinuation.getPlanHash(),
getSerializedPlanFromContinuation(parsedContinuation, executionContext).map(RecordQueryPlan::getComplexity).orElse(null)
), RelationalStructMetaData.of(continuationStructType));

final Struct plannerMetrics;
Expand Down Expand Up @@ -406,7 +434,7 @@ private Continuation enrichContinuation(@Nonnull final Continuation continuation
// 3. query constraints
//

final var serializationContext = new PlanSerializationContext(new DefaultPlanSerializationRegistry(), currentPlanHashMode);
final var serializationContext = new PlanSerializationContext(DefaultPlanSerializationRegistry.INSTANCE, currentPlanHashMode);

final var literals = queryExecutionContext.getLiterals();
final var compiledStatementBuilder = CompiledStatement.newBuilder()
Expand Down Expand Up @@ -442,6 +470,9 @@ public static class ContinuedPhysicalQueryPlan extends PhysicalQueryPlan {
@Nonnull
private final PlanHashMode serializedPlanHashMode;

@Nonnull
private final Supplier<Integer> serializedPlanHashSupplier;

public ContinuedPhysicalQueryPlan(@Nonnull final RecordQueryPlan recordQueryPlan,
@Nonnull final TypeRepository typeRepository,
@Nonnull final QueryPlanConstraint continuationConstraint,
Expand All @@ -452,6 +483,7 @@ public ContinuedPhysicalQueryPlan(@Nonnull final RecordQueryPlan recordQueryPlan
super(recordQueryPlan, null, typeRepository, QueryPlanConstraint.noConstraint(),
continuationConstraint, queryExecutionParameters, query, currentPlanHashMode);
this.serializedPlanHashMode = serializedPlanHashMode;
this.serializedPlanHashSupplier = Suppliers.memoize(() -> recordQueryPlan.planHash(serializedPlanHashMode));
}

@Nonnull
Expand Down Expand Up @@ -492,6 +524,18 @@ protected <M extends Message> void validatePlanAgainstEnvironment(@Nonnull final

metricCollector.increment(RelationalMetric.RelationalCount.CONTINUATION_ACCEPTED);
}

@Override
@Nonnull
protected Optional<RecordQueryPlan> getSerializedPlanFromContinuation(@Nonnull ContinuationImpl parsedContinuation,
@Nonnull ExecutionContext executionContext) throws RelationalException {
Assert.that(
Objects.requireNonNull(parsedContinuation.getPlanHash()).equals(serializedPlanHashSupplier.get()),
ErrorCode.INTERNAL_ERROR,
"unexpected mismatch between deserialized plan hash and continuation plan hash");
// No need to deserialize the plan from the continuation again
return Optional.of(getRecordQueryPlan());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,14 @@ public ExplainTests() {
void explainResultSetMetadataTest() throws Exception {
final var expectedLabels = List.of("PLAN", "PLAN_HASH", "PLAN_DOT", "PLAN_GML", "PLAN_CONTINUATION", "PLANNER_METRICS");
final var expectedTypes = List.of(Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.STRUCT, Types.STRUCT);
final var expectedContLabels = List.of("EXECUTION_STATE", "VERSION", "PLAN_HASH_MODE");
final var expectedContTypes = List.of(Types.BINARY, Types.INTEGER, Types.VARCHAR);
final var expectedContLabels = List.of(
"EXECUTION_STATE",
"VERSION",
"PLAN_HASH_MODE",
"PLAN_HASH",
"SERIALIZED_PLAN_COMPLEXITY"
);
final var expectedContTypes = List.of(Types.BINARY, Types.INTEGER, Types.VARCHAR, Types.INTEGER, Types.INTEGER);
try (var ddl = Ddl.builder().database(URI.create("/TEST/QT")).relationalExtension(relationalExtension).schemaTemplate(schemaTemplate).build()) {
executeInsert(ddl);
try (RelationalPreparedStatement ps = ddl.setSchemaAndGetConnection().prepareStatement("EXPLAIN SELECT * FROM RestaurantComplexRecord")) {
Expand Down Expand Up @@ -103,7 +109,7 @@ void explainWithNoContinuationTest() throws Exception {
final var assertResult = ResultSetAssert.assertThat(resultSet);
assertResult.hasNextRow()
.hasColumn("PLAN", "ISCAN(RECORD_NAME_IDX <,>)")
.hasColumn("PLAN_HASH", -1635569052L)
.hasColumn("PLAN_HASH", -1635569052)
.hasColumn("PLAN_CONTINUATION", null);
assertResult.hasNoNextRow();
}
Expand All @@ -127,15 +133,18 @@ void explainWithContinuationNoSerializedPlanTest() throws Exception {
ps.setObject("cont", continuation.serialize());
try (final RelationalResultSet resultSet = ps.executeQuery()) {
final var assertResult = ResultSetAssert.assertThat(resultSet);

assertResult.hasNextRow()
.hasColumn("PLAN", "ISCAN(RECORD_NAME_IDX <,>)")
.hasColumn("PLAN_HASH", -1635569052L);
.hasColumn("PLAN_HASH", -1635569052);
final var continuationInfo = resultSet.getStruct(5);
org.junit.jupiter.api.Assertions.assertNotNull(continuationInfo);
final var assertStruct = RelationalStructAssert.assertThat(continuationInfo);
assertStruct.hasValue("EXECUTION_STATE", new byte[]{0, 21, 1, 21, 11});
assertStruct.hasValue("VERSION", 1);
assertStruct.hasValue("PLAN_HASH_MODE", null);
assertStruct.hasValue("PLAN_HASH", -1635569052);
assertStruct.hasValue("SERIALIZED_PLAN_COMPLEXITY", null);
}
}
}
Expand All @@ -160,13 +169,48 @@ void explainWithContinuationSerializedPlanTest() throws Exception {
final var assertResult = ResultSetAssert.assertThat(resultSet);
assertResult.hasNextRow()
.hasColumn("PLAN", "ISCAN(RECORD_NAME_IDX <,>)")
.hasColumn("PLAN_HASH", -1635569052L);
.hasColumn("PLAN_HASH", -1635569052);
final var continuationInfo = resultSet.getStruct(5);
org.junit.jupiter.api.Assertions.assertNotNull(continuationInfo);
final var assertStruct = RelationalStructAssert.assertThat(continuationInfo);
assertStruct.hasValue("EXECUTION_STATE", new byte[]{0, 21, 1, 21, 11});
assertStruct.hasValue("VERSION", 1);
assertStruct.hasValue("PLAN_HASH_MODE", "VC0");
assertStruct.hasValue("PLAN_HASH", -1635569052);
assertStruct.hasValue("SERIALIZED_PLAN_COMPLEXITY", 1);
}
}
}
}
}

@Test
void explainWithContinuationSerializedPlanWithDifferentQueryTest() throws Exception {
try (var ddl = Ddl.builder().database(URI.create("/TEST/QT")).relationalExtension(relationalExtension).schemaTemplate(schemaTemplate).build()) {
executeInsert(ddl);
Continuation continuation;
try (final var connection = ddl.setSchemaAndGetConnection()) {
connection.setOption(Options.Name.CONTINUATIONS_CONTAIN_COMPILED_STATEMENTS, true);
try (RelationalPreparedStatement ps = ddl.setSchemaAndGetConnection().prepareStatement("SELECT * FROM RestaurantComplexRecord")) {
ps.setMaxRows(2);
continuation = consumeResultAndGetContinuation(ps, 2);
}

try (RelationalPreparedStatement ps = connection.prepareStatement("EXPLAIN SELECT rest_no FROM RestaurantComplexRecord WITH CONTINUATION ?cont")) {
ps.setObject("cont", continuation.serialize());
try (final RelationalResultSet resultSet = ps.executeQuery()) {
final var assertResult = ResultSetAssert.assertThat(resultSet);
assertResult.hasNextRow()
.hasColumn("PLAN", "COVERING(RECORD_NAME_IDX <,> -> [NAME: KEY[0], REST_NO: KEY[2]]) | MAP (_.REST_NO AS REST_NO)")
.hasColumn("PLAN_HASH", 4759756);
final var continuationInfo = resultSet.getStruct(5);
org.junit.jupiter.api.Assertions.assertNotNull(continuationInfo);
final var assertStruct = RelationalStructAssert.assertThat(continuationInfo);
assertStruct.hasValue("EXECUTION_STATE", new byte[]{0, 21, 1, 21, 11});
assertStruct.hasValue("VERSION", 1);
assertStruct.hasValue("PLAN_HASH_MODE", "VC0");
assertStruct.hasValue("PLAN_HASH", -1635569052);
assertStruct.hasValue("SERIALIZED_PLAN_COMPLEXITY", 1);
}
}
}
Expand All @@ -191,13 +235,15 @@ void explainExecuteStatementTest() throws Exception {
final var assertResult = ResultSetAssert.assertThat(resultSet);
assertResult.hasNextRow()
.hasColumn("PLAN", "ISCAN(RECORD_NAME_IDX <,>)")
.hasColumn("PLAN_HASH", -1635569052L);
.hasColumn("PLAN_HASH", -1635569052);
final var continuationInfo = resultSet.getStruct(5);
org.junit.jupiter.api.Assertions.assertNotNull(continuationInfo);
final var assertStruct = RelationalStructAssert.assertThat(continuationInfo);
assertStruct.hasValue("EXECUTION_STATE", new byte[]{0, 21, 1, 21, 11});
assertStruct.hasValue("VERSION", 1);
assertStruct.hasValue("PLAN_HASH_MODE", "VC0");
assertStruct.hasValue("PLAN_HASH", -1635569052);
assertStruct.hasValue("SERIALIZED_PLAN_COMPLEXITY", 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,33 +416,33 @@ void resolverState() throws RelationalException, SQLException {
try (RelationalConnection connection = db.connect(null)) {
connection.setSchema("dl");
try (RelationalStatement statement = connection.createStatement()) {
long version = interningLayer.getVersion(null).join();
int version = interningLayer.getVersion(null).join();
assertStateMatches(statement, version, "UNLOCKED");

interningLayer.incrementVersion().join();
assertStateMatches(statement, version + 1L, "UNLOCKED");
assertStateMatches(statement, version + 1, "UNLOCKED");

insertResolverState(statement, version + 2L, "UNLOCKED");
assertStateMatches(statement, version + 2L, "UNLOCKED");
insertResolverState(statement, version + 2, "UNLOCKED");
assertStateMatches(statement, version + 2, "UNLOCKED");

interningLayer.enableWriteLock().join();
assertStateMatches(statement, version + 2L, "WRITE_LOCKED");
assertStateMatches(statement, version + 2, "WRITE_LOCKED");

interningLayer.disableWriteLock().join();
assertStateMatches(statement, version + 2L, "UNLOCKED");
assertStateMatches(statement, version + 2, "UNLOCKED");

interningLayer.retireLayer().join();
assertStateMatches(statement, version + 2L, "RETIRED");
assertStateMatches(statement, version + 2, "RETIRED");

insertResolverState(statement, version + 3L, "RETIRED");
assertStateMatches(statement, version + 3L, "RETIRED");
insertResolverState(statement, version + 3, "RETIRED");
assertStateMatches(statement, version + 3, "RETIRED");

insertResolverState(statement, version + 3L, "UNLOCKED");
assertStateMatches(statement, version + 3L, "UNLOCKED");
insertResolverState(statement, version + 3, "UNLOCKED");
assertStateMatches(statement, version + 3, "UNLOCKED");

RelationalAssertions.assertThrowsSqlException(() -> insertResolverState(statement, version + 2L, "UNLOCKED"))
RelationalAssertions.assertThrowsSqlException(() -> insertResolverState(statement, version + 2, "UNLOCKED"))
.hasMessageContaining("resolver state version must monotonically increase");
assertStateMatches(statement, version + 3L, "UNLOCKED");
assertStateMatches(statement, version + 3, "UNLOCKED");
}
}
}
Expand All @@ -455,7 +455,7 @@ void listResolverState() throws RelationalException, SQLException {
try (RelationalConnection connection = db.connect(null)) {
connection.setSchema("dl");
try (RelationalStatement statement = connection.createStatement()) {
long version = interningLayer.getVersion(null).join();
int version = interningLayer.getVersion(null).join();

try (RelationalResultSet resultSet = scanResolverStates(statement)) {
ResultSetAssert.assertThat(resultSet)
Expand All @@ -482,7 +482,7 @@ void listResolverStateWithContinuation() throws RelationalException, SQLExceptio
try (RelationalConnection connection = db.connect(null)) {
connection.setSchema("dl");
try (RelationalStatement statement = connection.createStatement()) {
long version = interningLayer.getVersion(null).join();
int version = interningLayer.getVersion(null).join();

Continuation continuation;
try (RelationalResultSet resultSet = scanResolverStates(statement, 1, ContinuationImpl.BEGIN)) {
Expand Down Expand Up @@ -635,7 +635,7 @@ private void assertMapping(RelationalStatement statement, String key, long value
}
}

private void assertStateMatches(RelationalStatement statement, long version, String lock) throws SQLException {
private void assertStateMatches(RelationalStatement statement, int version, String lock) throws SQLException {
try (RelationalResultSet resultSet = statement.executeGet(LocatableResolverMetaDataProvider.RESOLVER_STATE_TYPE_NAME, new KeySet(), Options.NONE)) {
ResultSetAssert.assertThat(resultSet)
.hasNextRow()
Expand All @@ -657,9 +657,9 @@ private void assertStateMatches(RelationalStatement statement, long version, Str
}
}

private void insertResolverState(RelationalStatement statement, long version, String lock) throws SQLException {
private void insertResolverState(RelationalStatement statement, int version, String lock) throws SQLException {
final var struct = EmbeddedRelationalStruct.newBuilder()
.addInt(LocatableResolverMetaDataProvider.VERSION_FIELD_NAME, (int) version)
.addInt(LocatableResolverMetaDataProvider.VERSION_FIELD_NAME, version)
.addObject(LocatableResolverMetaDataProvider.LOCK_FIELD_NAME, lock)
.build();
Options options = Options.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ private Object readColumn(RelationalResultSet rs, int position, int columnType)
return rs.getBoolean(position);
case Types.SMALLINT:
case Types.INTEGER:
return rs.getInt(position);
case Types.BIGINT:
return rs.getLong(position);
case Types.FLOAT:
Expand Down
Loading