diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 867786be84..1021e1c469 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -53,6 +53,15 @@ * transactions. */ final class MultiplexedSessionDatabaseClient extends AbstractMultiplexedSessionDatabaseClient { + /** + * The maximum number of attempts that the client will try to execute CreateSession for the + * initial multiplexed session. This value is only used for the very first multiplexed session + * that is created, and it is only used if the application has not set a waitForMinSessions value. + * If waitForMinSessions has been set, then the client will retry until the duration in + * waitForMinSessions has been reached. + */ + private static final int MAX_INITIAL_CREATE_SESSION_ATTEMPTS = 10; + @VisibleForTesting static final Statement DETERMINE_DIALECT_STATEMENT = Statement.newBuilder( @@ -226,14 +235,19 @@ public void close() { final SettableApiFuture initialSessionReferenceFuture = SettableApiFuture.create(); this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture); - asyncCreateMultiplexedSession(initialSessionReferenceFuture); + + Duration waitDuration = + sessionClient.getSpanner().getOptions().getSessionPoolOptions().getWaitForMinSessions(); + int initialAttempts = + waitDuration == null || waitDuration.isZero() ? MAX_INITIAL_CREATE_SESSION_ATTEMPTS : 1; + asyncCreateMultiplexedSession(initialSessionReferenceFuture, initialAttempts); maybeWaitForSessionCreation( sessionClient.getSpanner().getOptions().getSessionPoolOptions(), initialSessionReferenceFuture); } private void asyncCreateMultiplexedSession( - SettableApiFuture sessionReferenceFuture) { + SettableApiFuture sessionReferenceFuture, int remainingAttempts) { this.sessionClient.asyncCreateMultiplexedSession( new SessionConsumer() { @Override @@ -263,7 +277,15 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount MultiplexedSessionDatabaseClient.this.resourceNotFoundException.set( (ResourceNotFoundException) spannerException); } + // Set the exception to trigger an error for all waiters. + // Then retry the session creation if the error is (potentially) transient. sessionReferenceFuture.setException(t); + if (remainingAttempts > 1 + && RETRYABLE_ERROR_CODES.contains(spannerException.getErrorCode())) { + final SettableApiFuture future = SettableApiFuture.create(); + MultiplexedSessionDatabaseClient.this.multiplexedSessionReference.set(future); + asyncCreateMultiplexedSession(future, remainingAttempts - 1); + } } }); } @@ -283,7 +305,7 @@ private void maybeWaitForSessionCreation( // If any exception is thrown, then retry the multiplexed session creation if (sessionReferenceFuture == null) { sessionReferenceFuture = SettableApiFuture.create(); - asyncCreateMultiplexedSession(sessionReferenceFuture); + asyncCreateMultiplexedSession(sessionReferenceFuture, 1); this.multiplexedSessionReference.set(sessionReferenceFuture); } try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ExcludeFromChangeStreamTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ExcludeFromChangeStreamTest.java new file mode 100644 index 0000000000..498e2fab10 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ExcludeFromChangeStreamTest.java @@ -0,0 +1,299 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.connection.AbstractMockServerTest; +import com.google.cloud.spanner.connection.RandomResultSetGenerator; +import com.google.common.collect.ImmutableList; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.ReadRequest; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ExcludeFromChangeStreamTest extends AbstractMockServerTest { + + @BeforeClass + public static void setupReadResult() { + RandomResultSetGenerator generator = new RandomResultSetGenerator(10); + mockSpanner.putStatementResult( + StatementResult.query( + Statement.of("SELECT my-column FROM my-table WHERE 1=1"), generator.generate())); + } + + private Spanner createSpanner() { + return SpannerOptions.newBuilder() + .setProjectId("fake-project") + .setHost("http://localhost:" + getPort()) + .setCredentials(NoCredentials.getInstance()) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build() + .getService(); + } + + @Test + public void testStandardTransaction() { + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + + ReadRequest readRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(readRequest.hasTransaction()); + assertTrue(readRequest.getTransaction().hasBegin()); + assertTrue(readRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(readRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0); + assertNotNull(commitRequest.getTransactionId()); + + mockSpanner.clearRequests(); + } + } + } + + @Test + public void testTransactionAbortedDuringRead() { + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + AtomicBoolean hasAborted = new AtomicBoolean(false); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + if (hasAborted.compareAndSet(false, true)) { + mockSpanner.abortNextStatement(); + } + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(1, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + + BeginTransactionRequest beginRequest = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class).get(0); + assertTrue(beginRequest.getOptions().hasReadWrite()); + assertTrue(beginRequest.getOptions().getExcludeTxnFromChangeStreams()); + + ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(firstReadRequest.hasTransaction()); + assertTrue(firstReadRequest.getTransaction().hasBegin()); + assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1); + assertTrue(secondReadRequest.hasTransaction()); + assertTrue(secondReadRequest.getTransaction().hasId()); + + CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0); + assertNotNull(commitRequest.getTransactionId()); + + mockSpanner.clearRequests(); + } + } + } + + @Test + public void testTransactionAbortedDuringCommit() { + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + AtomicBoolean hasAborted = new AtomicBoolean(false); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + if (hasAborted.compareAndSet(false, true)) { + mockSpanner.abortNextStatement(); + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(CommitRequest.class)); + + ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(firstReadRequest.hasTransaction()); + assertTrue(firstReadRequest.getTransaction().hasBegin()); + assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1); + assertTrue(secondReadRequest.hasTransaction()); + assertTrue(secondReadRequest.getTransaction().hasBegin()); + assertTrue(secondReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(secondReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + for (CommitRequest commitRequest : mockSpanner.getRequestsOfType(CommitRequest.class)) { + assertNotNull(commitRequest.getTransactionId()); + } + mockSpanner.clearRequests(); + } + } + } + + @Test + public void testReadReturnsUnavailable() { + + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + + ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(firstReadRequest.hasTransaction()); + assertTrue(firstReadRequest.getTransaction().hasBegin()); + assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1); + assertTrue(secondReadRequest.hasTransaction()); + assertTrue(secondReadRequest.getTransaction().hasBegin()); + assertTrue(secondReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(secondReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0); + assertNotNull(commitRequest.getTransactionId()); + + mockSpanner.clearRequests(); + } + } + } + + @Test + public void testReadReturnsUnavailableHalfway() { + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 2)); + + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + + ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(firstReadRequest.hasTransaction()); + assertTrue(firstReadRequest.getTransaction().hasBegin()); + assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1); + assertTrue(secondReadRequest.hasTransaction()); + assertTrue(secondReadRequest.getTransaction().hasId()); + + CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0); + assertNotNull(commitRequest.getTransactionId()); + + mockSpanner.clearRequests(); + } + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 720b088f14..629b561186 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -101,6 +101,37 @@ public void createSpannerInstance() { .getService(); } + @Test + public void testCreateSessionDeadlineExceeded() { + // Simulate a problem with the CreateSession RPC making it slow. + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .build() + .getService(); + DatabaseClient client = testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + // The first attempt should lead to a DEADLINE_EXCEEDED error being propagated from the + // CreateSession attempt. + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + SpannerException exception = assertThrows(SpannerException.class, resultSet::next); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode()); + } + + // Remove the simulated problem on the mock server. + // The next attempt should then succeed. + mockSpanner.removeAllExecutionTimes(); + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) {} + } + } + @Test public void testMultiUseReadOnlyTransactionUsesSameSession() { // Execute two queries using the same transaction. Both queries should use the same @@ -346,12 +377,21 @@ public void testRetryWithNoSessionCreationWaitTime() { }); assertEquals(ErrorCode.DEADLINE_EXCEEDED, spannerException.getErrorCode()); + // The CreateSession RPC will be retried, and as the exception is removed by the first call, + // the second attempt will succeed. + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + List createSessionRequests = mockSpanner.getRequestsOfType(CreateSessionRequest.class); - assertEquals(1, createSessionRequests.size()); + assertEquals(2, createSessionRequests.size()); List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); - assertEquals(0, requests.size()); + assertEquals(1, requests.size()); testSpanner.close(); }