From 6542cb1c8d548434542915cf53c2e67aa885bc36 Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Fri, 26 Sep 2025 23:18:55 +0000 Subject: [PATCH 1/6] feat(BatchWrite): Add BatchWriteAsync to Spanner.V1.PooledSession Introduce the BatchWriteAsync method to the PooledSession class in Google.Cloud.Spanner.V1. --- .../Google.Cloud.Spanner.V1/PooledSession.cs | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs index f4e6be8fab47..41a3640366a3 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs @@ -17,8 +17,11 @@ using Google.Cloud.Spanner.V1.Internal; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; +using Grpc.Core; using System; +using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using static Google.Cloud.Spanner.V1.TransactionOptions; @@ -578,7 +581,7 @@ public Task CommitAsync(CommitRequest request, CallSettings call skipTransactionCreation: request.Mutations.Count == 0, // If there are only mutations we won't have a transaction but we need one. callSettings?.CancellationToken ?? default); - void SetCommandTransaction(TransactionSelector transactionSelector) + void SetCommandTransaction(TransactionSelector transactionSelector) { switch (transactionSelector.SelectorCase) { @@ -850,6 +853,37 @@ public Task ExecuteBatchDmlAsync(ExecuteBatchDmlRequest Transaction GetInlinedTransaction(ExecuteBatchDmlResponse response) => response?.ResultSets?.FirstOrDefault()?.Metadata?.Transaction; } + /// + /// Executes a BatchWrite RPC asynchronously, returning a stream of responses. + /// + /// + /// This operation does not participate in the session's active transaction. + /// + /// The batch write request. Must not be null. + /// If not null, applies overrides to this RPC call. + /// An asynchronous stream of messages. + public async IAsyncEnumerable BatchWriteAsync(BatchWriteRequest request, CallSettings callSettings) + { + CheckNotDisposed(); + GaxPreconditions.CheckNotNull(request, nameof(request)); + request.SessionAsSessionName = SessionName; + + SpannerClient.BatchWriteStream stream = Client.BatchWrite(request, callSettings); + AsyncResponseStream responseStream = stream.GetResponseStream(); + + try + { + while (await responseStream.MoveNextAsync().ConfigureAwait(false)) + { + yield return responseStream.Current; + } + } + finally + { + await responseStream.DisposeAsync().ConfigureAwait(false); + } + } + private void MaybeApplyDirectedReadOptions(IReadOrQueryRequest request) { if (TransactionMode == ModeOneofCase.ReadOnly // Directed reads apply only to single use or read only transactions. Single use are read only. From b22137f38b2cc6c36f5b86da8e913c21c1536468 Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Mon, 6 Oct 2025 21:38:08 +0000 Subject: [PATCH 2/6] review - feat(BatchWrite): Add BatchWriteAsync to Spanner.V1.PooledSession Respond to review comments. --- .../Internal/ExecuteHelper.cs | 15 +++++++++ .../Google.Cloud.Spanner.V1/PooledSession.cs | 32 +++++++++++-------- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/Internal/ExecuteHelper.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/Internal/ExecuteHelper.cs index a42bfbb39d11..8e35dc124cad 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/Internal/ExecuteHelper.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/Internal/ExecuteHelper.cs @@ -57,6 +57,21 @@ internal static async Task WithSessionExpiryChecking(this Task task, Session ses } } + /// + /// Waits for to complete, handling session expiry by marking the session appropriately. + /// + internal static async ValueTask WithSessionExpiryChecking(this ValueTask task, Session session) + { + try + { + return await task.ConfigureAwait(false); + } + catch (RpcException ex) when (ex.CheckForSessionExpiredError(session)) + { + throw; + } + } + private static bool CheckForSessionExpiredError(this RpcException rpcException, Session session) { if (rpcException.IsSessionExpiredError()) diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs index 41a3640366a3..5e5eb5ca2a74 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs @@ -862,26 +862,14 @@ public Task ExecuteBatchDmlAsync(ExecuteBatchDmlRequest /// The batch write request. Must not be null. /// If not null, applies overrides to this RPC call. /// An asynchronous stream of messages. - public async IAsyncEnumerable BatchWriteAsync(BatchWriteRequest request, CallSettings callSettings) + public IAsyncEnumerable BatchWriteAsync(BatchWriteRequest request, CallSettings callSettings) { CheckNotDisposed(); GaxPreconditions.CheckNotNull(request, nameof(request)); request.SessionAsSessionName = SessionName; SpannerClient.BatchWriteStream stream = Client.BatchWrite(request, callSettings); - AsyncResponseStream responseStream = stream.GetResponseStream(); - - try - { - while (await responseStream.MoveNextAsync().ConfigureAwait(false)) - { - yield return responseStream.Current; - } - } - finally - { - await responseStream.DisposeAsync().ConfigureAwait(false); - } + return StreamResponsesAndRecordSuccessAsync(stream.GetResponseStream()); } private void MaybeApplyDirectedReadOptions(IReadOrQueryRequest request) @@ -910,6 +898,22 @@ private async Task RecordSuccessAndExpiredSessions(Task task) UpdateRefreshTime(); } + private async IAsyncEnumerable StreamResponsesAndRecordSuccessAsync(AsyncResponseStream responseStream) + { + try + { + while (await responseStream.MoveNextAsync().WithSessionExpiryChecking(Session).ConfigureAwait(false)) + { + UpdateRefreshTime(); + yield return responseStream.Current; + } + } + finally + { + await responseStream.DisposeAsync().ConfigureAwait(false); + } + } + /// /// Updates the refresh time for the session based on the current time. This should /// be called when a successful RPC is made with the associated session. From a91b1b4d9430b25500544da39c3e58d52d1c9620 Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Mon, 6 Oct 2025 21:34:02 +0000 Subject: [PATCH 3/6] test(BatchWrite): PooledSession Integration Tests Add integration tests for Spanner.V1.BatchWrite. --- .../V1/BatchWriteTableFixture.cs | 83 +++++++++++ .../V1/BatchWriteTests.cs | 136 ++++++++++++++++++ 2 files changed, 219 insertions(+) create mode 100644 apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/BatchWriteTableFixture.cs create mode 100644 apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/BatchWriteTests.cs diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/BatchWriteTableFixture.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/BatchWriteTableFixture.cs new file mode 100644 index 000000000000..a433b630b8ca --- /dev/null +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/BatchWriteTableFixture.cs @@ -0,0 +1,83 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Api.Gax.Grpc; +using Google.Cloud.Spanner.V1; +using System; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Google.Cloud.Spanner.Data.IntegrationTests; + +[CollectionDefinition(nameof(BatchWriteTableFixture))] +public class BatchWriteTableFixture : CommonDataTableFixture, ICollectionFixture, IAsyncLifetime +{ + private const int TimeoutSeconds = 60; + private SessionPoolManager _poolManager; + private SessionPool _pool; + + /// + /// The name of the key column in the table. + /// + public readonly string KeyName = "Key"; + + public BatchWriteTableFixture() : base("BatchWrite") + { + } + + /// + protected override void CreateTable() + { + ExecuteDdl($@"CREATE TABLE {this.TableName} ( + {this.KeyName} STRING(256), + ) PRIMARY KEY ({this.KeyName})"); + } + + /// + protected override void PopulateTable(bool fresh) + { + // Do nothing. The BatchWriteTests don't need pre-populated data. + } + + /// + /// Initializes the session pool for the fixture. + /// + public async Task InitializeAsync() + { + _poolManager = SessionPoolManager.Create(new SessionPoolOptions()); + _pool = await _poolManager.AcquireSessionPoolAsync(SpannerClientCreationOptions); + } + + + public Task GetPooledSessionAsync() => + _pool.AcquireSessionAsync(DatabaseName, new TransactionOptions(), CancellationToken.None); + + /// + /// Gets the call settings used to interact with the database. + /// + public CallSettings GetCallSettings() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(TimeoutSeconds)); + using SpannerConnection connection = GetConnection(); + return connection.CreateCallSettings(settings => settings.BatchWriteSettings, TimeoutSeconds, cts.Token); + } + + /// + public Task DisposeAsync() + { + _poolManager.Release(_pool); + return Task.CompletedTask; + } +} diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/BatchWriteTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/BatchWriteTests.cs new file mode 100644 index 000000000000..e189c5389aee --- /dev/null +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/BatchWriteTests.cs @@ -0,0 +1,136 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Cloud.Spanner.Data.IntegrationTests; +using Google.Protobuf.WellKnownTypes; +using System.Threading.Tasks; +using Xunit; +using System.Collections.Generic; +using System; +using Newtonsoft.Json; + +namespace Google.Cloud.Spanner.V1.IntegrationTests; + +[CommonTestDiagnostics] +[Collection(nameof(BatchWriteTableFixture))] +public class BatchWriteTests +{ + private record BatchWriteResponseAnalysis(int successCount, int failureCount); + + private readonly BatchWriteTableFixture _fixture; + private const int TimeoutSeconds = 60; + + public BatchWriteTests(BatchWriteTableFixture fixture) => + _fixture = fixture; + + [Fact] + public async Task BatchWrite_Success() + { + // Create two non-conflicting write mutations + PooledSession pooledSession = await _fixture.GetPooledSessionAsync(); + var mutationGroup = new BatchWriteRequest.Types.MutationGroup(); + mutationGroup.Mutations.AddRange([ + new Mutation + { + Insert = new Mutation.Types.Write + { + Table = _fixture.TableName, + Columns = { _fixture.KeyName }, + Values = { new ListValue { Values = { Value.ForString(Guid.NewGuid().ToString()) } } } + } + }, + new Mutation + { + Insert = new Mutation.Types.Write + { + Table = _fixture.TableName, + Columns = { _fixture.KeyName }, + Values = { new ListValue { Values = { Value.ForString(Guid.NewGuid().ToString()) } } } + } + } + ]); + + BatchWriteRequest batchWriteRequest = new() + { + Session = pooledSession.Session.Name, + MutationGroups = { mutationGroup } + }; + + IAsyncEnumerable responseStream = pooledSession.BatchWriteAsync(batchWriteRequest, _fixture.GetCallSettings()); + BatchWriteResponseAnalysis responseAnalysis = await GetResponseAnalysis(responseStream); + + // The single mutation group will succeed as one atomic unit + Assert.Equal(0, responseAnalysis.failureCount); + Assert.Equal(1, responseAnalysis.successCount); + } + + [Fact] + public async Task BatchWrite_Failure_Conflict() + { + // Create a valid and a conflicting write mutation group. + PooledSession pooledSession = await _fixture.GetPooledSessionAsync(); + var mutation = new Mutation + { + Insert = new Mutation.Types.Write + { + Table = _fixture.TableName, + Columns = { _fixture.KeyName }, + Values = { new ListValue { Values = { Value.ForString("Conflict because matching write.") } } }, + } + }; + + var mutationGroupNoConflict = new BatchWriteRequest.Types.MutationGroup(); + var mutationGroupConflict = new BatchWriteRequest.Types.MutationGroup(); + mutationGroupConflict.Mutations.AddRange([mutation, mutation]); + mutationGroupNoConflict.Mutations.AddRange([mutation]); + + BatchWriteRequest batchWriteRequest = new() + { + Session = pooledSession.Session.Name, + MutationGroups = { mutationGroupConflict, mutationGroupNoConflict } + }; + + // Send the batch write request and parse the response + IAsyncEnumerable responseStream = pooledSession.BatchWriteAsync(batchWriteRequest, _fixture.GetCallSettings()); + BatchWriteResponseAnalysis responseAnalysis = await GetResponseAnalysis(responseStream); + + // The mutation group with a conflict will result in one failure + Assert.Equal(1, responseAnalysis.failureCount); + // The mutation group without a conflict will result in one success + Assert.Equal(1, responseAnalysis.successCount); + } + + private static async Task GetResponseAnalysis(IAsyncEnumerable responseStream) + { + var successCount = 0; + var failureCount = 0; + string result = ""; + await foreach(var response in responseStream) + { + // STATUS CODE of 0 is a success + if (response.Status.Code == 0) + { + successCount += response.Indexes.Count; + } + else + { + failureCount += response.Indexes.Count; + } + result += JsonConvert.SerializeObject(response); + } + + return new BatchWriteResponseAnalysis(successCount: successCount, failureCount: failureCount); + } + +} From 875c06ded656ff410b18810def1a9b2930beb18c Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Tue, 7 Oct 2025 21:14:40 +0000 Subject: [PATCH 4/6] feat: Add SpannerBatchWriteCommand --- .../SpannerBatchWriteCommand.cs | 147 ++++++++++++++++++ .../SpannerCommand.ExecutableCommand.cs | 86 +++++----- .../SpannerCommand.cs | 3 + .../SpannerConnection.cs | 6 + 4 files changed, 200 insertions(+), 42 deletions(-) create mode 100644 apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerBatchWriteCommand.cs diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerBatchWriteCommand.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerBatchWriteCommand.cs new file mode 100644 index 000000000000..a4d0df9a8374 --- /dev/null +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerBatchWriteCommand.cs @@ -0,0 +1,147 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Api.Gax; +using Google.Api.Gax.Grpc; +using Google.Cloud.Spanner.V1; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using static Google.Cloud.Spanner.V1.BatchWriteRequest.Types; + +namespace Google.Cloud.Spanner.Data; + +/// +/// Represents a batch of mutation groups to be sent to Spanner via the BatchWrite RPC. +/// Each group of mutations is committed atomically, but independent of other groups. +/// This command is non-transactional and cannot be used with an explicit transaction. +/// +public sealed class SpannerBatchWriteCommand +{ + // Visible for testing + internal List MutationGroups { get; } = []; + + /// + /// The connection to the data source. This is never null. + /// + public SpannerConnection Connection { get; } + + /// + /// Gets or sets the wait time before terminating the attempt to execute a command and generating an error. + /// Defaults to the timeout from the connection string. + /// + public int CommandTimeout { get; set; } + + /// + /// The statement tag to send to Cloud Spanner for this command. + /// + public string Tag { get; set; } + + /// + /// The RPC priority to use for this command. The default priority is Unspecified. + /// + public Priority Priority { get; set; } + + /// + /// If set to true then any change streams monitoring columns modified + /// by transactions will capture the updates made within that transaction. + /// + public bool ExcludeTxnFromChangeStream {get; set;} + + internal SpannerBatchWriteCommand(SpannerConnection connection) + { + Connection = GaxPreconditions.CheckNotNull(connection, nameof(connection)); + CommandTimeout = connection.Builder.Timeout; + } + + /// + /// Adds a command or multiple commands as a new mutation group to be resolved atomically. + /// + /// The command or commands to add as a single mutation group + public void Add(params SpannerCommand[] commands)=> Add(commands.AsEnumerable()); + + /// + /// Adds a collection of commands as new mutation group to be resolved atomically to be resolved atomically. + /// + /// The commands to add as a single mutation group. + public void Add(IEnumerable commands) + { + var groupAsList = GaxPreconditions.CheckNotNull(commands, nameof(commands)).ToList(); + GaxPreconditions.CheckArgument(groupAsList.Any(), nameof(commands), "Command group cannot be empty."); + + var mutations = new List(); + foreach (SpannerCommand cmd in groupAsList) + { + mutations.Add(cmd.AsMutation()); + } + + // These mutations will be resolved as a single mutation group. + MutationGroups.Add(new MutationGroup + { + Mutations = { mutations } + }); + } + + /// + /// Executes the batch of mutation groups using the BatchWrite RPC, streaming the results. + /// + public async IAsyncEnumerable ExecuteAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + if (MutationGroups.Count == 0) + { + yield break; + } + + var session = await Connection.AcquireSessionAsync(null, cancellationToken, out _).ConfigureAwait(false); + try + { + BatchWriteRequest request = CreateBatchWriteRequest(); + CallSettings callSettings = Connection.CreateCallSettings(null, CommandTimeout, cancellationToken); + IAsyncEnumerable responseStream = session.BatchWriteAsync(request, callSettings); + + await foreach (BatchWriteResponse response in responseStream.ConfigureAwait(false)) + { + yield return response; + } + } + finally + { + session.ReleaseToPool(forceDelete: false); + } + } + + private BatchWriteRequest CreateBatchWriteRequest() + { + var request = new BatchWriteRequest() + { + ExcludeTxnFromChangeStreams = ExcludeTxnFromChangeStream + }; + + if (Tag != null) + { + request.RequestOptions = new RequestOptions { RequestTag = Tag }; + } + + if (Priority != Priority.Unspecified) + { + request.RequestOptions ??= new RequestOptions(); + request.RequestOptions.Priority = PriorityConverter.ToProto(Priority); + } + + request.MutationGroups.AddRange(MutationGroups); + return request; + } +} diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.ExecutableCommand.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.ExecutableCommand.cs index c4f9f7b1ecf9..e7719effe67f 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.ExecutableCommand.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.ExecutableCommand.cs @@ -202,6 +202,49 @@ internal async Task ExecutePartitionedUpdateAsync(CancellationToken cancel return await transaction.ExecutePartitionedDmlAsync(request, cancellationToken, CommandTimeout).ConfigureAwait(false); } + internal Mutation AsMutation() + { + // Avoid calling method multiple times in the loop. + var conversionOptions = ConversionOptions; + // Whatever we do with the parameters, we'll need them in a ListValue. + var listValue = new ListValue + { + Values = { Parameters.Select(x => x.GetConfiguredSpannerDbType(conversionOptions).ToProtobufValue(x.GetValidatedValue())) } + }; + + if (CommandTextBuilder.SpannerCommandType != SpannerCommandType.Delete) + { + var w = new Mutation.Types.Write + { + Table = CommandTextBuilder.TargetTable, + Columns = { Parameters.Select(x => x.SourceColumn ?? x.ParameterName) }, + Values = { listValue } + }; + switch (CommandTextBuilder.SpannerCommandType) + { + case SpannerCommandType.Update: + return new Mutation { Update = w }; + case SpannerCommandType.Insert: + return new Mutation { Insert = w }; + case SpannerCommandType.InsertOrUpdate: + return new Mutation { InsertOrUpdate = w }; + default: + throw new ArgumentOutOfRangeException(); + } + } + else + { + var w = new Mutation.Types.Delete + { + Table = CommandTextBuilder.TargetTable, + KeySet = new V1.KeySet { Keys = { listValue } } + }; + return new Mutation { Delete = w }; + } + } + + private List AsMutations() => new List{ this.AsMutation() }; + private void ValidateConnectionAndCommandTextBuilder() { GaxPreconditions.CheckState(Connection != null, "SpannerCommand can only be executed when a connection is assigned."); @@ -318,7 +361,7 @@ private async Task ExecuteDdlAsync(CancellationToken cancellationToken) private async Task ExecuteMutationsAsync(CancellationToken cancellationToken) { await Connection.EnsureIsOpenAsync(cancellationToken).ConfigureAwait(false); - var mutations = GetMutations(); + var mutations = AsMutations(); var transaction = Transaction ?? Connection.AmbientTransaction ?? new EphemeralTransaction(Connection, EphemeralTransactionCreationOptions, EphemeralTransactionOptions); // Make the request. This will commit immediately or not depending on whether a transaction was explicitly created. await transaction.ExecuteMutationsAsync(mutations, cancellationToken, CommandTimeout).ConfigureAwait(false); @@ -326,47 +369,6 @@ private async Task ExecuteMutationsAsync(CancellationToken cancellationToke return mutations.Count; } - private List GetMutations() - { - // Avoid calling method multiple times in the loop. - var conversionOptions = ConversionOptions; - // Whatever we do with the parameters, we'll need them in a ListValue. - var listValue = new ListValue - { - Values = { Parameters.Select(x => x.GetConfiguredSpannerDbType(conversionOptions).ToProtobufValue(x.GetValidatedValue())) } - }; - - if (CommandTextBuilder.SpannerCommandType != SpannerCommandType.Delete) - { - var w = new Mutation.Types.Write - { - Table = CommandTextBuilder.TargetTable, - Columns = { Parameters.Select(x => x.SourceColumn ?? x.ParameterName) }, - Values = { listValue } - }; - switch (CommandTextBuilder.SpannerCommandType) - { - case SpannerCommandType.Update: - return new List { new Mutation { Update = w } }; - case SpannerCommandType.Insert: - return new List { new Mutation { Insert = w } }; - case SpannerCommandType.InsertOrUpdate: - return new List { new Mutation { InsertOrUpdate = w } }; - default: - throw new ArgumentOutOfRangeException(); - } - } - else - { - var w = new Mutation.Types.Delete - { - Table = CommandTextBuilder.TargetTable, - KeySet = new V1.KeySet { Keys = { listValue } } - }; - return new List { new Mutation { Delete = w } }; - } - } - // Based on the QueryOptions set at various levels (connection, environment and command), // constructs the QueryOptions proto to set in the ExecuteSqlRequest. // Options set at the SpannerCommand-level has the highest precedence. diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.cs index 94f56c21ab4a..2517e2c537f1 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.cs @@ -507,6 +507,9 @@ public long ExecutePartitionedUpdate() => public Task ExecutePartitionedUpdateAsync(CancellationToken cancellationToken = default) => CreateExecutableCommand().ExecutePartitionedUpdateAsync(cancellationToken); + + internal Mutation AsMutation() => CreateExecutableCommand().AsMutation(); + /// /// Creates an executable command that captures all the necessary information from this command. /// diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs index 2e19c88ee074..409c6332fd41 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs @@ -973,6 +973,12 @@ public SpannerCommand CreateDmlCommand(string dmlStatement, SpannerParameterColl /// public SpannerBatchCommand CreateBatchDmlCommand() => new SpannerBatchCommand(this); + /// + /// Creates a new to execute batched mutation groups with this connection. + /// This command is non-transactional. + /// + public SpannerBatchWriteCommand CreateBatchWriteCommand() => new SpannerBatchWriteCommand(this); + /// protected override DbCommand CreateDbCommand() => new SpannerCommand(this); From b986c785d6d6293428f37b474a5723f274ab2170 Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Tue, 7 Oct 2025 21:15:22 +0000 Subject: [PATCH 5/6] test: Add tests for SpannerBatchWriteCommand --- .../SpannerBatchWriteCommandTests.cs | 128 ++++++++++++++++++ .../SpannerBatchWriteCommandTests.cs | 121 +++++++++++++++++ 2 files changed, 249 insertions(+) create mode 100644 apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/SpannerBatchWriteCommandTests.cs create mode 100644 apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchWriteCommandTests.cs diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/SpannerBatchWriteCommandTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/SpannerBatchWriteCommandTests.cs new file mode 100644 index 000000000000..386ec0c36600 --- /dev/null +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/SpannerBatchWriteCommandTests.cs @@ -0,0 +1,128 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Cloud.Spanner.V1; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace Google.Cloud.Spanner.Data.IntegrationTests; + +[Collection(nameof(BatchWriteTableFixture))] +public class SpannerBatchWriteCommandTests +{ + private readonly BatchWriteTableFixture _fixture; + + public SpannerBatchWriteCommandTests(BatchWriteTableFixture fixture) => + _fixture = fixture; + + [Fact] + public async Task ExecuteAsync_Success() + { + using var connection = _fixture.GetConnection(); + var command = connection.CreateBatchWriteCommand(); + var key1 = Guid.NewGuid().ToString(); + var key2 = Guid.NewGuid().ToString(); + + var insertCommand1 = connection.CreateInsertCommand(_fixture.TableName, new SpannerParameterCollection { { _fixture.KeyName, SpannerDbType.String, key1 } }); + var insertCommand2 = connection.CreateInsertCommand(_fixture.TableName, new SpannerParameterCollection { { _fixture.KeyName, SpannerDbType.String, key2 } }); + + command.Add(new[] { insertCommand1, insertCommand2 }); + + var responses = await command.ExecuteAsync().ToListAsync(); + + Assert.Single(responses); + var response = responses[0]; + Assert.Equal(0, response.Status.Code); // OK + Assert.Single(response.Indexes); + Assert.Equal(0, response.Indexes[0]); + + using var snapshot = await connection.BeginTransactionAsync(); + var readCommand = connection.CreateReadCommand(_fixture.TableName, ReadOptions.FromColumns(_fixture.KeyName), KeySet.All); + readCommand.Transaction = snapshot; + using var reader = await readCommand.ExecuteReaderAsync(); + var keys = new HashSet(); + while(await reader.ReadAsync()) + { + keys.Add(reader.GetString(0)); + } + Assert.Contains(key1, keys); + Assert.Contains(key2, keys); + } + + [Fact] + public async Task ExecuteAsync_PartialFailure() + { + using var connection = _fixture.GetConnection(); + var command = connection.CreateBatchWriteCommand(); + var conflictKey = Guid.NewGuid().ToString(); + var successKey = Guid.NewGuid().ToString(); + + var insertConflict1 = connection.CreateInsertCommand(_fixture.TableName, new SpannerParameterCollection { { _fixture.KeyName, SpannerDbType.String, conflictKey } }); + var insertConflict2 = connection.CreateInsertCommand(_fixture.TableName, new SpannerParameterCollection { { _fixture.KeyName, SpannerDbType.String, conflictKey } }); + var insertSuccess = connection.CreateInsertCommand(_fixture.TableName, new SpannerParameterCollection { { _fixture.KeyName, SpannerDbType.String, successKey } }); + + // This group will fail due to a primary key conflict. + command.Add(new[] { insertConflict1, insertConflict2 }); + // This group will succeed. + command.Add(insertSuccess); + + var responses = await command.ExecuteAsync().ToListAsync(); + + Assert.Equal(2, responses.Count); + var failedResponse = responses.Single(r => r.Status.Code != 0); + var successResponse = responses.Single(r => r.Status.Code == 0); + + Assert.Single(failedResponse.Indexes); + Assert.Equal(0, failedResponse.Indexes[0]); + Assert.Single(successResponse.Indexes); + Assert.Equal(1, successResponse.Indexes[0]); + + using var snapshot = await connection.BeginTransactionAsync(); + var readCommand = connection.CreateReadCommand(_fixture.TableName, ReadOptions.FromColumns(_fixture.KeyName), KeySet.All); + readCommand.Transaction = snapshot; + using var reader = await readCommand.ExecuteReaderAsync(); + var keys = new HashSet(); + while (await reader.ReadAsync()) + { + keys.Add(reader.GetString(0)); + } + Assert.Contains(successKey, keys); + Assert.DoesNotContain(conflictKey, keys); + } + + [Fact] + public async Task ExecuteAsync_EmptyCommand() + { + using var connection = _fixture.GetConnection(); + var command = connection.CreateBatchWriteCommand(); + + var responses = await command.ExecuteAsync().ToListAsync(); + + Assert.Empty(responses); + } + + [Fact] + public void Add_InvalidCommandType_Throws() + { + using var connection = _fixture.GetConnection(); + var command = connection.CreateBatchWriteCommand(); + var selectCommand = connection.CreateSelectCommand($"SELECT * FROM {_fixture.TableName}"); + + Assert.Throws(() => command.Add(selectCommand)); + } +} diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchWriteCommandTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchWriteCommandTests.cs new file mode 100644 index 000000000000..fe62b8544331 --- /dev/null +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchWriteCommandTests.cs @@ -0,0 +1,121 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Api.Gax; +using Google.Api.Gax.Grpc; +using Google.Cloud.Spanner.V1; +using Google.Cloud.Spanner.V1.Internal.Logging; +using Google.Cloud.Spanner.V1.Tests; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using NSubstitute; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Transactions; +using Xunit; + +namespace Google.Cloud.Spanner.Data.Tests; + +public class SpannerBatchWriteCommandTests +{ + [Fact] + public void ConnectionConstructor() + { + var connection = new SpannerConnection(); + var command = new SpannerBatchWriteCommand(connection); + + Assert.Empty(command.MutationGroups); + Assert.Same(connection, command.Connection); + } + + [Fact] + public void CommandPriorityDefaultsToUnspecified() + { + SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger); + spannerClientMock + .SetupBatchCreateSessionsAsync(); + SpannerConnection connection = SpannerCommandTests.BuildSpannerConnection(spannerClientMock); + + var command = connection.CreateBatchWriteCommand(); + Assert.Equal(Priority.Unspecified, command.Priority); + } + + [Fact] + public async Task CommandIncludesPriority() + { + var priority = Priority.High; + SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger); + spannerClientMock + .SetupBatchCreateSessionsAsync(); + SpannerConnection connection = SpannerCommandTests.BuildSpannerConnection(spannerClientMock); + var command = connection.CreateBatchWriteCommand(); + + command.Priority = priority; + await foreach (var result in command.ExecuteAsync()) + { + // do nothing just let it process + } + + spannerClientMock.Received(1).BatchWrite( + Arg.Is(request => request.RequestOptions.Priority == PriorityConverter.ToProto(priority)), + Arg.Any()); + } + + [Fact] + public async Task CommandIncludesRequestTag() + { + var requestTag = "request-tag-1"; + SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger); + spannerClientMock + .SetupBatchCreateSessionsAsync() + .SetupExecuteBatchDmlAsync() + .SetupCommitAsync(); + SpannerConnection connection = SpannerCommandTests.BuildSpannerConnection(spannerClientMock); + SpannerBatchWriteCommand command = connection.CreateBatchWriteCommand(); + + command.Tag = requestTag; + await foreach (var result in command.ExecuteAsync()) + { + // do nothing just let it process + } + + spannerClientMock.Received(1).BatchWrite( + Arg.Is(request => request.RequestOptions.RequestTag == requestTag), + Arg.Any()); + } + + [Fact] + public async Task CommandIncludesExcludeTxnFromChangeStream() + { + SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger); + spannerClientMock + .SetupBatchCreateSessionsAsync() + .SetupExecuteBatchDmlAsync() + .SetupCommitAsync(); + SpannerConnection connection = SpannerCommandTests.BuildSpannerConnection(spannerClientMock); + SpannerBatchWriteCommand command = connection.CreateBatchWriteCommand(); + + command.ExcludeTxnFromChangeStream = true; + await foreach (var result in command.ExecuteAsync()) + { + // do nothing just let it process + } + + spannerClientMock.Received(1).BatchWrite( + Arg.Is(request => request.ExcludeTxnFromChangeStreams), + Arg.Any()); + } +} From 44557113f12ba079477ea2fd2b6f86b1dc7e5060 Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Mon, 20 Oct 2025 20:55:31 +0000 Subject: [PATCH 6/6] temp changes for unit tests - pop off before sharing --- .../SpannerBatchWriteCommandTests.cs | 3 +++ .../Google.Cloud.Spanner.Data/SpannerBatchWriteCommand.cs | 1 + 2 files changed, 4 insertions(+) diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchWriteCommandTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchWriteCommandTests.cs index fe62b8544331..73bb60e23490 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchWriteCommandTests.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchWriteCommandTests.cs @@ -63,6 +63,9 @@ public async Task CommandIncludesPriority() SpannerConnection connection = SpannerCommandTests.BuildSpannerConnection(spannerClientMock); var command = connection.CreateBatchWriteCommand(); + SpannerCommand insertCommand = connection.CreateInsertCommand("SomeTableName", new SpannerParameterCollection { { "SomeKeyName", SpannerDbType.String, "SomeKey" } }); + command.Add(insertCommand); + command.Priority = priority; await foreach (var result in command.ExecuteAsync()) { diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerBatchWriteCommand.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerBatchWriteCommand.cs index a4d0df9a8374..a165998aaf18 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerBatchWriteCommand.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerBatchWriteCommand.cs @@ -116,6 +116,7 @@ public async IAsyncEnumerable ExecuteAsync([EnumeratorCancel { yield return response; } + throw new System.Exception(); } finally {