diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs index 89c3ca2b303a..d8307df02e29 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs @@ -23,6 +23,7 @@ using System.Threading; using System.Threading.Tasks; using Xunit; +using static Google.Cloud.PubSub.V1.SubscriberClient; // Tests create quite a few tasks that don't need awaiting. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed @@ -135,6 +136,7 @@ private async Task RunBulkMessagingImpl( long recvSum = 0L; // Sum of bytes of received messages var recvedIds = new ConcurrentDictionary(); var nackedIds = new HashSet(); + using CancellationTokenSource stopCancellation = new CancellationTokenSource(); Task subTask = subscriber.StartAsync((msg, ct) => { int id = BitConverter.ToInt32(msg.Data.ToArray(), 0); @@ -159,7 +161,8 @@ private async Task RunBulkMessagingImpl( { // Test finished, so stop subscriber Console.WriteLine("All msgs received, stopping subscriber."); - Task unused = subscriber.StopAsync(TimeSpan.FromSeconds(15)); + stopCancellation.CancelAfter(TimeSpan.FromSeconds(15)); + Task unused = subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately }, stopCancellation.Token); } } else @@ -196,7 +199,7 @@ private async Task RunBulkMessagingImpl( { // Deadlock, shutdown subscriber, and cancel Console.WriteLine("Deadlock detected. Cancelling test"); - subscriber.StopAsync(new CancellationToken(true)); + subscriber.StopAsync(new ShutdownOptions(), new CancellationToken(true)); watchdogCts.Cancel(); break; } @@ -257,7 +260,11 @@ private async Task RunBulkMessagingImpl( if (cancelAfterRecvCount is int cancelAfter) { - Assert.True(recvCount >= cancelAfter && recvCount <= cancelAfter + maxMessagesInFlight, $"Incorrect recvCount: {recvCount}"); + // Because we are using linked tokens for cancellation, which do not guarantee + // atomicity between when a parent token is cancelled to when the linked token is cancelled, + // there's still a chance that we get some more messages after cancelling, even with NackInmediately. + // So we expecte up to 3 times messages in flight after the cutoff message count. + Assert.True(recvCount >= cancelAfter && recvCount <= cancelAfter + 3 * maxMessagesInFlight, $"Incorrect recvCount: {recvCount}"); } else { @@ -431,9 +438,12 @@ public async Task StopStartSubscriber(int totalMessageCount, double publisherFre }); await Task.Delay(subscriberLifetime); Console.WriteLine("Stopping subscriber"); - Task stopTask = subscriber.StopAsync(TimeSpan.FromSeconds(15)); - // If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail. - await Task.WhenAll(subscribeTask, stopTask); + using var stopCancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + { + Task stopTask = subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately }, stopCancellationSource.Token); + // If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail. + await Task.WhenAll(subscribeTask, stopTask); + } int recvCount = recvedMsgs.Locked(() => recvedMsgs.Count); Console.WriteLine($"Stopped subscriber. Recv count: {recvCount}"); if (prevRecvCount == recvCount) @@ -534,12 +544,18 @@ await subscriberApi.IAMPolicyClient.SetIamPolicyAsync(new SetIamPolicyRequest return Task.FromResult(SubscriberClient.Reply.Nack); }); + using CancellationTokenSource stopCancellation = new CancellationTokenSource(); var taskDlqSub = dlqSub.StartAsync((msg, ct) => { result.Add((msg.GetDeliveryAttempt(), true)); // Received DLQ message, so stop test. - sub.StopAsync(TimeSpan.FromSeconds(10)); - dlqSub.StopAsync(TimeSpan.FromSeconds(10)); + var shutdownOptions = new ShutdownOptions + { + Mode = ShutdownMode.NackImmediately, + }; + stopCancellation.CancelAfter(TimeSpan.FromSeconds(10)); + sub.StopAsync(shutdownOptions, stopCancellation.Token); + dlqSub.StopAsync(shutdownOptions, stopCancellation.Token); return Task.FromResult(SubscriberClient.Reply.Ack); }); diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs index 52c4505c29b0..b67d9dfe2f6c 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs @@ -133,7 +133,7 @@ Task Subscribe() if (recvCount == inputLines.Count) { Console.WriteLine("Received all messages, shutting down"); - var dummyTask = sub.StopAsync(CancellationToken.None); + var dummyTask = sub.StopAsync(new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately}); } } if (rnd.Next(3) == 0) diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs index eb8e1fdd814f..8a4d02725327 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs @@ -142,8 +142,11 @@ await _subscriberClient.StartAsync((msg, token) => return Task.FromResult(SubscriberClient.Reply.Ack); }); - public override async Task StopAsync(CancellationToken stoppingToken) => - await _subscriberClient.StopAsync(stoppingToken); + public override async Task StopAsync(CancellationToken stoppingToken) + { + var shutdownOptions = new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately}; + await _subscriberClient.StopAsync(shutdownOptions, cancellationToken: stoppingToken); + } } // End sample } diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs index a69cbc7442c8..aa2d362dbf4c 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs @@ -129,7 +129,11 @@ await subscriber.StartAsync((msg, cancellationToken) => Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'"); // Stop this subscriber after one message is received. // This is non-blocking, and the returned Task may be awaited. - subscriber.StopAsync(TimeSpan.FromSeconds(15)); + subscriber.StopAsync(new SubscriberClient.ShutdownOptions + { + Mode = SubscriberClient.ShutdownMode.NackImmediately, + Timeout =TimeSpan.FromSeconds(15) + }); // Return Reply.Ack to indicate this message has been handled. return Task.FromResult(SubscriberClient.Reply.Ack); }); diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs index 72d839098982..83f26c21ecd4 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs @@ -23,10 +23,10 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Xunit; +using static Google.Cloud.PubSub.V1.SubscriberClient; namespace Google.Cloud.PubSub.V1.Tests { @@ -583,7 +583,7 @@ private static RpcException GetExactlyOnceDeliveryMixedException(Rpc.ErrorInfo e } [Theory, CombinatorialData] - public void ImmediateStop( + public void ImmediateStop_Obsolete( [CombinatorialValues(false, true)] bool hardStop) { using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) @@ -596,20 +596,49 @@ public void ImmediateStop( }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( +#pragma warning disable CS0618 // allow use of obsolete method () => fake.Subscriber.StopAsync(new CancellationToken(hardStop))); +#pragma warning restore CS0618 + Assert.Equal(hardStop, isCancelled); + Assert.Equal(1, fake.Subscribers.Count); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + Assert.Empty(fake.Subscribers[0].Extends); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.Subscribers[0].WriteCompletes); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.ClientShutdowns); + }); + } + } + + [Theory, CombinatorialData] + public void ImmediateStop( + [CombinatorialValues(false, true)] bool hardStop, + ShutdownMode shutdownMode) + { + using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) + { + fake.Scheduler.Run(async () => + { + var doneTask = fake.Subscriber.StartAsync((msg, ct) => + { + throw new Exception("Should never get here"); + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( + () => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode }, cancellationToken: new CancellationToken(hardStop))); Assert.Equal(hardStop, isCancelled); Assert.Equal(1, fake.Subscribers.Count); Assert.Empty(fake.Subscribers[0].Acks); Assert.Empty(fake.Subscribers[0].Nacks); Assert.Empty(fake.Subscribers[0].Extends); Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.Subscribers[0].WriteCompletes); - Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(3) }, fake.ClientShutdowns); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.ClientShutdowns); }); } } [Fact] - public void StopBeforeStart() + public void StopBeforeStart_Obsolete() { using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) { @@ -618,7 +647,35 @@ public void StopBeforeStart() await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); var exception = await Assert.ThrowsAsync( async () => await fake.TaskHelper.ConfigureAwaitHideCancellation( +#pragma warning disable CS0618 // allow use of obsolete method () => fake.Subscriber.StopAsync(TimeSpan.FromHours(1)))); +#pragma warning restore CS0618 + Assert.Equal("Can only stop a started instance.", exception.Message); + Assert.Equal(1, fake.Subscribers.Count); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + Assert.Empty(fake.Subscribers[0].Extends); + Assert.Equal(Array.Empty(), fake.Subscribers[0].WriteCompletes); + Assert.Equal(Array.Empty(), fake.ClientShutdowns); + }); + } + } + + [Theory, CombinatorialData] + public void StopBeforeStart(ShutdownMode shutdownMode) + { + // Ensure StopAsync cannot be called before the subscriber has started. + using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) + { + fake.Scheduler.Run(async () => + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + + // Attempting to stop an unstarted subscriber should throw InvalidOperationException. + var exception = await Assert.ThrowsAsync( + async () => await fake.TaskHelper.ConfigureAwait( + fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode, Timeout = TimeSpan.FromHours(1) }))); + Assert.Equal("Can only stop a started instance.", exception.Message); Assert.Equal(1, fake.Subscribers.Count); Assert.Empty(fake.Subscribers[0].Acks); @@ -633,7 +690,7 @@ public void StopBeforeStart() // The test is similar to ImmediateStop but checks that calling DisposeAsync() instead of StopAsync() works. // It also tests that DisposeAsync() or StopAsync() can be called multiple times, without throwing exception. [Fact] - public void Dispose() + public void Dispose_Obsolete() { using (var fake = Fake.CreateClientForSingleResponseStream(new[] { ServerAction.Inf() })) { @@ -652,14 +709,47 @@ await fake.TaskHelper.ConfigureAwaitHideCancellation( () => fake.Subscriber.DisposeAsync().AsTask()); // Call StopAsync. It shouldn't throw an exception. await fake.TaskHelper.ConfigureAwaitHideCancellation( +#pragma warning disable CS0618 // allow use of obsolete method () => fake.Subscriber.StopAsync(CancellationToken.None)); +#pragma warning restore CS0618 Assert.Equal(1, fake.Subscribers.Count); Assert.Empty(fake.Subscribers[0].Acks); Assert.Empty(fake.Subscribers[0].Nacks); Assert.Empty(fake.Subscribers[0].Extends); Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.Subscribers[0].WriteCompletes); - Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(3) }, fake.ClientShutdowns); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.ClientShutdowns); + }); + } + } + + [Theory, CombinatorialData] + public void Dispose(ShutdownMode shutdownMode) + { + // Ensure StopAsync is idempotent and safe to call after disposal. + using (var fake = Fake.CreateClientForSingleResponseStream(new[] { ServerAction.Inf() })) + { + fake.Scheduler.Run(async () => + { + var doneTask = fake.Subscriber.StartAsync((msg, ct) => + { + throw new Exception("Should never get here"); + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + + // Perform disposal then multiple StopAsync calls. + await fake.TaskHelper.ConfigureAwaitHideCancellation( + () => fake.Subscriber.DisposeAsync().AsTask()); + await fake.TaskHelper.ConfigureAwait( + fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode, Timeout = TimeSpan.FromHours(1) })); + + // Verify the client shutdown correctly without exceptions. + Assert.Equal(1, fake.Subscribers.Count); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + Assert.Empty(fake.Subscribers[0].Extends); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.Subscribers[0].WriteCompletes); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.ClientShutdowns); }); } } @@ -691,7 +781,8 @@ await fake.TaskHelper.ConfigureAwaitHideCancellation( [Theory, PairwiseData] public void RecvManyMsgsNoErrors( - [CombinatorialValues(false, true)] bool hardStop, + bool hardStop, + ShutdownMode shutdownMode, [CombinatorialValues(2, 5, 6, 9, 10)] int batchCount, [CombinatorialValues(1, 10, 13, 44, 45)] int batchSize, [CombinatorialValues(0, 1, 4, 6, 60)] int interBatchIntervalSeconds, @@ -700,9 +791,16 @@ public void RecvManyMsgsNoErrors( [CombinatorialValues(1, 2, 3, 4, 5, 7, 13)] int threadCount, [CombinatorialValues(1, 2, 3, 4, 8, 16, 33)] int clientCount) { + int delayToSubtract = shutdownMode switch + { + ShutdownMode.WaitForProcessing => hardStop ? handlerDelaySeconds : 0, + ShutdownMode.NackImmediately => handlerDelaySeconds, + _ => throw new InvalidOperationException() + }; + var expectedCompletedBatches = interBatchIntervalSeconds == 0 - ? (hardStop && stopAfterSeconds < handlerDelaySeconds ? 0 : batchCount) - : Math.Max(0, stopAfterSeconds - (hardStop ? handlerDelaySeconds : 0)) / interBatchIntervalSeconds; + ? (stopAfterSeconds < delayToSubtract ? 0 : batchCount) + : Math.Max(0, stopAfterSeconds - delayToSubtract) / interBatchIntervalSeconds; var expectedMsgCount = Math.Min(expectedCompletedBatches, batchCount) * batchSize * clientCount; var expectedAcks = Enumerable.Range(0, batchCount) .SelectMany(batchIndex => Enumerable.Range(0, batchSize).Select(msgIndex => FakeSubscriberServiceApiClient.MakeMsgId(batchIndex, msgIndex))) @@ -729,7 +827,7 @@ public void RecvManyMsgsNoErrors( }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( - () => fake.Subscriber.StopAsync(new CancellationToken(hardStop))); + () => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode }, cancellationToken: new CancellationToken(hardStop))); Assert.Equal(hardStop, isCancelled); Assert.Equal(clientCount, fake.Subscribers.Count); Assert.Equal(expectedMsgCount, handledMsgs.Locked(() => handledMsgs.Count)); @@ -783,7 +881,7 @@ public void OneClientManyMsgs([CombinatorialValues(1, 2, 3, 4, 5)] int rndSeed) recvedMsgs.Add(msgString); if (recvedMsgs.Count == totalMsgCount) { - Task unused = fake.Subscriber.StopAsync(CancellationToken.None); + Task unused = fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing }); } } return SubscriberClient.Reply.Ack; @@ -810,7 +908,7 @@ public void FlowControl( const int msgsPerClient = 100; var oneMsgByteCount = FakeSubscriberServiceApiClient.MakeReceivedMessage("0000.0000", "0000").CalculateSize(); var combinedFlowMaxElements = Math.Min(flowMaxElements, flowMaxBytes / oneMsgByteCount + 1); - var expectedMsgCount = Math.Min(msgsPerClient * clientCount, combinedFlowMaxElements * stopAfterSeconds + (hardStop ? 0 : combinedFlowMaxElements)); + var expectedMsgCount = Math.Min(msgsPerClient * clientCount, combinedFlowMaxElements * stopAfterSeconds); var msgss = Enumerable.Range(0, msgsPerClient) .Select(i => ServerAction.Data(TimeSpan.Zero, new[] { i.ToString("D4") })) .Concat(new[] { ServerAction.Inf() }); @@ -842,14 +940,14 @@ public void FlowControl( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwaitHideCancellation(() => fake.Subscriber.StopAsync(new CancellationToken(hardStop))); + await fake.TaskHelper.ConfigureAwaitHideCancellation(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately }, cancellationToken: new CancellationToken(hardStop))); Assert.Equal(expectedMsgCount, handledMsgs.Count); }); } } - [Fact] - public void UserHandlerFaults() + [Theory, CombinatorialData] + public void UserHandlerFaults(ShutdownMode shutdownMode) { var msgs = Enumerable.Repeat(ServerAction.Data(TimeSpan.Zero, new[] { "m" }), 10).Concat(new[] { ServerAction.Inf() }); using (var fake = Fake.Create(new[] { msgs })) @@ -869,7 +967,7 @@ public void UserHandlerFaults() return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(Enumerable.Repeat("m", 5), handledMsgs); Assert.Equal(5, fake.Subscribers[0].Acks.Count); Assert.Equal(5, fake.Subscribers[0].Nacks.Count); @@ -879,7 +977,8 @@ public void UserHandlerFaults() [Theory, PairwiseData] public void ServerFaultsRecoverable( - [CombinatorialValues(1, 3, 9, 14)] int threadCount) + [CombinatorialValues(1, 3, 9, 14)] int threadCount, + ShutdownMode shutdownMode) { var zero = TimeSpan.Zero; var recoverableEx = new RpcException(new Status(StatusCode.DeadlineExceeded, ""), ""); @@ -889,7 +988,7 @@ public void ServerFaultsRecoverable( new[] { ServerAction.Data(zero, new[] { "2" }), ServerAction.BadCurrent(zero, recoverableEx) }, new[] { ServerAction.Data(zero, new[] { "3" }), ServerAction.Inf() } }; - using (var fake = Fake.Create(msgs, threadCount: threadCount)) + using (var fake = Fake.Create(msgs, threadCount: threadCount, useMsgAsId: true)) { fake.Scheduler.Run(async () => { @@ -901,7 +1000,7 @@ public void ServerFaultsRecoverable( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(new[] { "1", "2", "3" }, handledMsgs); Assert.Equal(3, fake.Subscribers[0].Acks.Count); }); @@ -912,7 +1011,8 @@ public void ServerFaultsRecoverable( public void ServerFaultsUnrecoverable( [CombinatorialValues(true, false)] bool badMoveNext, [CombinatorialValues(1, 2, 3, 4, 10)] int clientCount, - [CombinatorialValues(1, 3, 9, 14)] int threadCount) + [CombinatorialValues(1, 3, 9, 14)] int threadCount, + ShutdownMode shutdownMode) { var zero = TimeSpan.Zero; var unrecoverableEx = new RpcException(new Status(StatusCode.Unimplemented, ""), ""); @@ -936,7 +1036,7 @@ public void ServerFaultsUnrecoverable( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(unrecoverableEx, ex.AllExceptions().FirstOrDefault()); Assert.NotEmpty(handledMsgs); Assert.True(handledMsgs[0] == "1" || handledMsgs[0] == "2"); @@ -965,7 +1065,7 @@ public void OnlyOneStart() } [Theory, CombinatorialData] - public void LeaseExtension(bool isExactlyOnceDelivery) + public void LeaseExtension(bool isExactlyOnceDelivery, ShutdownMode shutdownMode) { var msgs = new[] { new[] { ServerAction.Data(TimeSpan.Zero, new[] { "1" }), @@ -982,7 +1082,7 @@ public void LeaseExtension(bool isExactlyOnceDelivery) return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await fake.TaskHelper.ConfigureAwait(doneTask); Assert.Equal(1, fake.Subscribers.Count); DateTime S(int seconds) => fake.Time0 + TimeSpan.FromSeconds(seconds); @@ -993,7 +1093,7 @@ public void LeaseExtension(bool isExactlyOnceDelivery) } [Theory, CombinatorialData] - public void LeaseMaxExtension(bool isExactlyOnceDelivery) + public void LeaseMaxExtension(bool isExactlyOnceDelivery, ShutdownMode shutdownMode) { var msgs = new[] { new[] { ServerAction.Data(TimeSpan.Zero, new[] { "1" }), @@ -1010,7 +1110,7 @@ public void LeaseMaxExtension(bool isExactlyOnceDelivery) return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromHours(12), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await fake.TaskHelper.ConfigureAwait(doneTask); Assert.Equal(1, fake.Subscribers.Count); // Check that the lease was extended for 60 minutes only. @@ -1020,8 +1120,8 @@ public void LeaseMaxExtension(bool isExactlyOnceDelivery) } } - [Fact] - public void SlowUplinkThrottlesPull() + [Theory, CombinatorialData] + public void SlowUplinkThrottlesPull(ShutdownMode shutdownMode) { const int msgCount = 20; const int flowMaxEls = 5; @@ -1037,7 +1137,7 @@ public void SlowUplinkThrottlesPull() { var subTask = fake.Subscriber.StartAsync((msg, ct) => Task.FromResult(SubscriberClient.Reply.Ack)); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1000), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await fake.TaskHelper.ConfigureAwait(subTask); var sub = fake.Subscribers[0]; Assert.True(sub.Extends.Count <= msgCount); // Difficult to predict, must be <= total message count @@ -1050,8 +1150,8 @@ public void SlowUplinkThrottlesPull() } } - [Fact] - public void StreamPings() + [Theory, CombinatorialData] + public void StreamPings(ShutdownMode shutdownMode) { const int pingPeriodSeconds = 25; // From SubscriberClient. const int pingCount = 10; @@ -1070,7 +1170,7 @@ public void StreamPings() // Wait a bit longer, to check no more pings happen. await th.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(pingPeriodSeconds * 4), CancellationToken.None)); // Stop subscriber. - await th.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await th.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await th.ConfigureAwait(subTask); var expectedPings = Enumerable.Range(0, pingCount).Select(i => fake.Time0 + TimeSpan.FromSeconds(pingPeriodSeconds * (i + 1))); Assert.Equal(expectedPings, fake.Subscribers[0].StreamPings); @@ -1084,8 +1184,8 @@ public void OrderingKeysManyMsgs( [CombinatorialValues(1, 3, 6, 10, 55)] int orderingKeysCount, [CombinatorialValues(1, 2, 5, 11, 44)] int flowMaxElements, [CombinatorialValues(1, 2, 3, 9)] int threadCount, - [CombinatorialValues(1, 2, 3, 4, 5)] int rndSeed - ) + [CombinatorialValues(1, 2, 3, 4, 5)] int rndSeed, + ShutdownMode shutdownMode) { var rnd = new Random(rndSeed); var msgs = ServerAction.Data(TimeSpan.Zero, Enumerable.Range(0, msgCount).Select(i => $"order{i % orderingKeysCount}|{i}").ToList()); @@ -1099,14 +1199,14 @@ public void OrderingKeysManyMsgs( var startTask = fake.Subscriber.StartAsync(async (msg, ct) => { var delay = TimeSpan.FromMilliseconds(rnd.Next(1000)); - await th.ConfigureAwait(fake.Scheduler.Delay(delay, default)); + await th.ConfigureAwait(fake.Scheduler.Delay(delay, ct)); lock (recvedMsgs) { recvedMsgs.Add(msg.Data.ToStringUtf8()); recvCount += 1; if (recvCount == msgCount) { - var dummyTask = fake.Subscriber.StopAsync(CancellationToken.None); + var dummyTask = fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode }); } } return SubscriberClient.Reply.Ack; @@ -1296,8 +1396,8 @@ public void InvalidParameters() //Assert.Equal("MaxTotalAckExtension", ex9.ParamName); There's a bug in GaxPreconditions.CheckNonNegativeDelay() which uses the wrong paramName } - [Fact] - public void DeliveryAttempt() + [Theory, CombinatorialData] + public void DeliveryAttempt(ShutdownMode shutdownMode) { var msgs = new[] { ServerAction.Data(TimeSpan.Zero, new[] { "m" }, deliveryAttempt: null), @@ -1315,7 +1415,7 @@ public void DeliveryAttempt() return Task.FromResult(SubscriberClient.Reply.Ack); }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(new int?[] { null, 2 }, deliveryAttempts); }); } @@ -1323,7 +1423,7 @@ public void DeliveryAttempt() // Acknowledge / ModifyAcknowledgeDeadline calls may throw RpcException. RpcExceptions should not be thrown to the client. [Theory, CombinatorialData] - public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false, null)] bool? ackOrModifyAck) + public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false, null)] bool? ackOrModifyAck, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1356,7 +1456,7 @@ public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Despite RpcException being thrown, all 4 messages should be handled. Assert.Equal(new[] { "1", "2", "3", "4" }, handledMsgs); }); @@ -1364,7 +1464,7 @@ public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false // If Acknowledge / ModifyAcknowledgeDeadline calls throw exceptions other than RpcExceptions, they should be thrown to the client. [Theory, CombinatorialData] - public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] bool ackOrModifyAck) + public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] bool ackOrModifyAck, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1394,7 +1494,7 @@ public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(exception, ex.AllExceptions().FirstOrDefault()); Assert.Equal(1, fake.ClientShutdowns.Count); }); @@ -1404,7 +1504,7 @@ public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] // In non exactly once delivery, if we use the new SubscriptionHandler to see Ack/NackResponse, // the acknowledgement status should be returned as Success, they are treated as "fire and forget" operations. [Theory, CombinatorialData] - public void AckModifyAckDeadlineFault_SubscriptionHandler([CombinatorialValues(true, false)] bool ackOrModifyAck) + public void AckModifyAckDeadlineFault_SubscriptionHandler(bool ackOrModifyAck, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1429,14 +1529,14 @@ public void AckModifyAckDeadlineFault_SubscriptionHandler([CombinatorialValues(t { var doneTask = fake.Subscriber.StartAsync(testSubscriptionHandler); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // All the 4 test messages have encountered a recoverable RpcException, but their status should be success. Assert.Equal(4, testSubscriptionHandler.Responses.Count(j => j.Status == AcknowledgementStatus.Success)); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_TemporaryFault([CombinatorialValues(true, false, null)] bool? ackNackOrExtends) + public void ExactlyOnceDelivery_TemporaryFault([CombinatorialValues(true, false, null)] bool? ackNackOrExtends, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1473,14 +1573,14 @@ public void ExactlyOnceDelivery_TemporaryFault([CombinatorialValues(true, false, return ackNackOrExtends == false ? SubscriberClient.Reply.Nack : SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Despite temporary failures, all 4 messages should be handled. Assert.Equal(new[] { "1", "2", "3", "4" }, handledMsgs); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_AckNack_PermanentFault([CombinatorialValues(true, false)] bool ackOrNack) + public void ExactlyOnceDelivery_AckNack_PermanentFault([CombinatorialValues(true, false)] bool ackOrNack, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1504,15 +1604,15 @@ public void ExactlyOnceDelivery_AckNack_PermanentFault([CombinatorialValues(true { var doneTask = fake.Subscriber.StartAsync(testSubscriptionHandler); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Exception should not be thrown. Assert.Null(ex); Assert.Equal(new[] { "1", "2", "3", "4" }, testSubscriptionHandler.Responses.Where(j => j.Status == AcknowledgementStatus.FailedPrecondition).Select(j => j.MessageId)); }); } - [Fact] - public void ExactlyOnceDelivery_Extends_PermanentFault() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_Extends_PermanentFault(ShutdownMode shutdownMode) { var msgs = new[] { @@ -1538,14 +1638,14 @@ public void ExactlyOnceDelivery_Extends_PermanentFault() return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Exception shouldn't be thrown in case of permanent failure. Assert.Null(ex); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, false)] bool ackOrNack) + public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, false)] bool ackOrNack, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1573,7 +1673,7 @@ public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, fa { var doneTask = fake.Subscriber.StartAsync(testHandler); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Null(ex); // "1" is success and "3" is permanent failure. Assert.Equal("1", testHandler.Responses.First(j => j.Status == AcknowledgementStatus.Success).MessageId); @@ -1581,8 +1681,8 @@ public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, fa }); } - [Fact] - public void ExactlyOnceDelivery_Extends_MixedFault() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_Extends_MixedFault(ShutdownMode shutdownMode) { var msgs = new[] { @@ -1609,7 +1709,7 @@ public void ExactlyOnceDelivery_Extends_MixedFault() return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Permanent exception shouldn't be thrown. // Extends are not user initiated, so we can't get the success and temporary failed status from the client. Assert.Null(ex); @@ -1617,8 +1717,8 @@ public void ExactlyOnceDelivery_Extends_MixedFault() } // All successful receipt ModAcks. - [Fact] - public void ExactlyOnceDelivery_ReceiptModAck() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_ReceiptModAck(ShutdownMode shutdownMode) { var msgs = new[] { @@ -1645,7 +1745,7 @@ public void ExactlyOnceDelivery_ReceiptModAck() }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // All 4 messages are handled. Assert.Equal(4, handledMsgs.Count); Assert.Equal(new[] { "1", "2", "3", "4" }, handledMsgs); @@ -1653,7 +1753,7 @@ public void ExactlyOnceDelivery_ReceiptModAck() } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_ReceiptModAck_MixedFault([CombinatorialValues(true, false)] bool succeedOnRetry) + public void ExactlyOnceDelivery_ReceiptModAck_MixedFault([CombinatorialValues(true, false)] bool succeedOnRetry, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1685,15 +1785,15 @@ public void ExactlyOnceDelivery_ReceiptModAck_MixedFault([CombinatorialValues(tr }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Permanently failed receipt ModAcks won't be passed to the user handler, so 3 is not handled. // Temporary failed ModAck for message 2 becomes successful or permanent failure based on succeedOnRetry flag. Assert.Equal(succeedOnRetry ? new[] { "1", "2", "4" } : new[] { "1", "4" }, handledMsgs); }); } - [Fact] - public void ExactlyOnceDelivery_ReceiptModAck_PermanentFaults() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_ReceiptModAck_PermanentFaults(ShutdownMode shutdownMode) { var msgs = new[] { @@ -1736,14 +1836,14 @@ public void ExactlyOnceDelivery_ReceiptModAck_PermanentFaults() }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Permanently failed receipt ModAcks won't be passed to the user handler, so all 4 messages are not handled. Assert.Equal(0, handledMsgs.Count); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_ReceiptModAck_TemporaryFaults([CombinatorialValues(true, false)] bool succeedOnRetry) + public void ExactlyOnceDelivery_ReceiptModAck_TemporaryFaults([CombinatorialValues(true, false)] bool succeedOnRetry, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1787,7 +1887,7 @@ public void ExactlyOnceDelivery_ReceiptModAck_TemporaryFaults([CombinatorialValu }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Temporary failed receipt ModAcks can succeed after 1 retry or stay failed, so based on the succeedOnRetry flag, 4 or 0 messages are handled. Assert.Equal(succeedOnRetry ? 4 : 0, handledMsgs.Count); Assert.Equal(succeedOnRetry ? new[] { "1", "2", "3", "4" } : Array.Empty(), handledMsgs); @@ -1908,14 +2008,12 @@ public void StreamingPullRetry_NonRetriableException() var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); var subscriberEx = await Assert.ThrowsAsync(() => subscriberTask); Assert.Equal(rpcEx.Status, subscriberEx.Status); - // We should have failed immediately - but there's a two second delay in SubscriberClient.StopCompletionAsync - // to avoid a race condition. - Assert.Equal(start.AddSeconds(2), fake.Scheduler.Clock.GetCurrentDateTimeUtc()); + Assert.Equal(start, fake.Scheduler.Clock.GetCurrentDateTimeUtc()); }); } - [Fact] - public void StreamingPullRetry_InternalErrorContinuesRetrying() + [Theory, CombinatorialData] + public void StreamingPullRetry_InternalErrorContinuesRetrying(ShutdownMode shutdownMode) { // A regular internal failure that's not due to an auth error. var exception = new RpcException(new Status(StatusCode.Internal, "Bang")); @@ -1927,7 +2025,7 @@ public void StreamingPullRetry_InternalErrorContinuesRetrying() var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); Assert.False(subscriberTask.IsCompleted); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await subscriberTask; }); } @@ -1954,8 +2052,8 @@ public void StreamingPullRetry_RetriableErrorEventuallyFails() /// If the streaming pull call fails in MoveNext after a short time (e.g. 10 seconds) /// we should retry with backoff. /// - [Fact] - public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackoff() + [Theory, CombinatorialData] + public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackoff(ShutdownMode shutdownMode) { var exception = new RpcException(new Status(StatusCode.Unavailable, "Stream terminated")); TimeSpan streamDuration = TimeSpan.FromSeconds(30); @@ -1967,7 +2065,7 @@ public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackof var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromMinutes(100), CancellationToken.None)); Assert.False(subscriberTask.IsCompleted); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await subscriberTask; // Check the pull times indicate a backoff. @@ -1993,8 +2091,8 @@ public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackof /// We *expect* the streaming pull to fail (in MoveNext) after about a minute... we should /// retry immediately each time. /// - [Fact] - public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBackoff() + [Theory, CombinatorialData] + public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBackoff(ShutdownMode shutdownMode) { var exception = new RpcException(new Status(StatusCode.Unavailable, "Stream terminated")); TimeSpan streamDuration = TimeSpan.FromSeconds(60); @@ -2007,7 +2105,7 @@ public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBack var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromMinutes(100), CancellationToken.None)); Assert.False(subscriberTask.IsCompleted); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await subscriberTask; // Check the pull times indicate no backoff. @@ -2018,48 +2116,230 @@ public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBack } [Fact] - public void NackMessagesOnShutdown() + public void Shutdown_NackImmediately_Success() { - var msgs = new[] { new[] { + var msgs = new[] { ServerAction.Data(TimeSpan.Zero, ["msg0"]), - ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2", "msg3"]), - ServerAction.Data(TimeSpan.Zero, ["msg4", "msg5"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2"]), + ServerAction.Data(TimeSpan.Zero, ["msg3", "msg4"]), + ServerAction.Inf() + }; + // flowMaxElements=2: msg1 and msg2 block flow control. + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var handled = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + handled.Locked(() => handled.Add(data)); + if (data == "msg1" || data == "msg2") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(30), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + + // NackImmediately: Nacks pending messages (msg3, msg4) and messages currently being handled (msg1, msg2). + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately, Timeout = TimeSpan.FromSeconds(60) })); + + Assert.Equivalent(new[] { "msg0" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_WaitForProcessing_CompletesBeforeNack() + { + // Ensure WaitForProcessing allows all received messages to finish if time permits. + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2"]), + ServerAction.Data(TimeSpan.Zero, ["msg3", "msg4"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var handled = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + handled.Locked(() => handled.Add(data)); + if (data == "msg1" || data == "msg2") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(15), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + + // Stop and wait for processing with a generous timeout. + // It waits for msg1, msg2 to finish and allows msg3, msg4 to be processed. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = TimeSpan.FromSeconds(60) })); + + // All messages should be Acked and none Nacked. + Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Empty(fake.Subscribers[0].Nacks); + }); + } + + [Fact] + public void Shutdown_WaitForProcessing_NacksOnTimeout() + { + // Verify that remaining messages are Nacked if the WaitForProcessing timeout is reached. + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2", "msg3", "msg4"]), + ServerAction.Data(TimeSpan.FromSeconds(55), ["msg5", "msg6"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var givenToMessageHandler = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + givenToMessageHandler.Locked(() => givenToMessageHandler.Add(data)); + if (data != "msg0") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(15), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, givenToMessageHandler, strict: true); + + // Stop with a timeout that expires before processing completes. + // Timeout=50s: NackDelay=20s. At T=5+20=25s, NackImmediately is triggered. + // msg1, msg2 finish at T=20s. msg3, msg4 start at T=20s but are Nacked as NackImmediately is triggered at T=25s. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = TimeSpan.FromSeconds(50) })); + + // Verify the switch to Nacking for the remaining messages and msg5 & msg6 were not read from stream + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg3", "msg4" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_CancellationToken_AbortsWaitForProcessing() + { + // Verify that cancelling the CancellationToken passed to StopAsync + // immediately triggers a Hard Stop, aborting any graceful shutdown. + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var givenToMessageHandler = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + givenToMessageHandler.Locked(() => givenToMessageHandler.Add(msg.Data.ToStringUtf8())); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(60), ct)); + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Single(givenToMessageHandler); + + // Request graceful shutdown with a long timeout. + var cts = new CancellationTokenSource(); + var stopTask = fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = TimeSpan.FromHours(1) }, cts.Token); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + + // Cancel the token after 5 seconds of "graceful" shutdown. + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), CancellationToken.None)); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + cts.Cancel(); + + await fake.TaskHelper.ConfigureAwaitHideCancellation(() => stopTask); + + // No ack response will be provided, all work should be dropped as we didn't have a chance to switch to + // NackImmediately + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + }); + } + + [Fact] + public void Shutdown_WaitForProcessing_NacksWhenTimeoutLessThanMinimum() + { + // Ensure immediate Nacking if the requested timeout is shorter than the 30s grace period. + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2"]), + ServerAction.Data(TimeSpan.Zero, ["msg3", "msg4"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var handled = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + handled.Locked(() => handled.Add(data)); + if (data != "msg0") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(6), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + + // Request shutdown with a short timeout. + // Timeout=15s: NackDelay=0s (since 15 < GracePeriod=30). NackImmediately triggered at T=5s. + // msg1, msg2 are already being handled and will be Nacked upon completion. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = TimeSpan.FromSeconds(15) })); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(15), CancellationToken.None)); + + // All unhandled messages should be Nacked immediately. + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + Assert.Equivalent(new[] { "msg0" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_NackImmediately_LeaseExtensionStops() + { + // Ensure lease extensions stop and current leases are Nacked during immediate shutdown. + var msgs = new[] { new[] { + ServerAction.Data(TimeSpan.Zero, new[] { "msg1" }), ServerAction.Inf() } }; - // Set flowMaxElements to 2 to ensure that "msg0" and "msg2" block flow control preventing - // "msg3","msg4","msg5" from being processed. - using (var fake = Fake.Create(msgs, flowMaxElements: 2, useMsgAsId: true, disposeTimeout: TimeSpan.FromSeconds(10))) + using (var fake = Fake.Create(msgs, ackDeadline: TimeSpan.FromSeconds(30), ackExtendWindow: TimeSpan.FromSeconds(10))) { fake.Scheduler.Run(async () => { - var handledMsgs = new List(); var doneTask = fake.Subscriber.StartAsync(async (msg, ct) => { - var data = msg.Data.ToStringUtf8(); - handledMsgs.Locked(() => handledMsgs.Add(data)); - if (data == "msg0" || data == "msg2") - { - // Delay handling so that StopAsync is called while the rest - // are still pulled but waiting for a flow control slot. - await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), ct)); - } + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(120), ct)); return SubscriberClient.Reply.Ack; }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - // Wait for "msg0" and "msg2" to start being handled. "msg1" will also be handled, but not hold onto - // flow control, releasing it so "msg2" can begin being handled. - await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); - Assert.Equivalent(new [] {"msg0", "msg1", "msg2"}, handledMsgs); + int numExtensionsBeforeShutdown = fake.Subscribers[0].Extends.Count(); - // Stop the subscriber. This should ensure pulled messages that haven't entered the user handler yet - // will be NAck'ed. Specifically for messages already in flow control, they will have to wait for pending - // messages to be processed before they are NAck'ed. - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + // Request immediate shutdown. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately })); + int numExtensionsAfterShutdown = fake.Subscribers[0].Extends.Count(); - // Verify that "msg0", "msg1" and "msg2" completed handling normally, while the rest were - // automatically Nacked during shutdown. - Assert.Equivalent(new [] {"msg0", "msg1", "msg2"}, fake.Subscribers[0].Acks.Select(x => x.Id)); - Assert.Equivalent(new [] {"msg3", "msg4", "msg5"}, fake.Subscribers[0].Nacks.Select(x => x.Id)); + // Verify no more lease extensions occurred after shutdown was initiated. + Assert.Equal(numExtensionsAfterShutdown, numExtensionsBeforeShutdown); }); } } @@ -2073,4 +2353,4 @@ private static IReadOnlyList> CreateBadMoveNextSequenc .Concat(Enumerable.Repeat(ServerAction.Sequence(ServerAction.Inf()), includeTrailing ? 1 : 0)) .ToList(); } -} \ No newline at end of file +} diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.ShutdownOptions.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.ShutdownOptions.cs new file mode 100644 index 000000000000..f86590929f92 --- /dev/null +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.ShutdownOptions.cs @@ -0,0 +1,52 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; + +namespace Google.Cloud.PubSub.V1; + +public abstract partial class SubscriberClient +{ + /// + /// Modes available for subscriber shutdown. + /// + public enum ShutdownMode + { + /// + /// Stops receiving new messages and continues processing all received messages. + /// + WaitForProcessing = 0, + + /// + /// Stops receiving new messages and immediately Nacks all unhandled messages, releasing them for redelivery. + /// + NackImmediately = 1, + } + + /// + /// Settings available for subscriber shutdown. + /// + public sealed class ShutdownOptions + { + /// + /// The to use for shutdown. Defaults to . + /// + public ShutdownMode Mode { get; set; } = ShutdownMode.WaitForProcessing; + + /// + /// The timeout for the shutdown process. If null, a default timeout based on the maximum extension duration is used. + /// + public TimeSpan? Timeout { get; set; } + } +} diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs index cf96fa133751..a35e2f159c7c 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs @@ -217,7 +217,9 @@ public virtual Task StartAsync(FuncCancel this to abort handlers and acknowledgement. /// A that completes when all handled messages have been acknowledged; /// faults on unrecoverable service errors; or cancels if is cancelled. - public virtual Task StopAsync(CancellationToken hardStopToken) => throw new NotImplementedException(); + [Obsolete("Use StopAsync(ShutdownOption, TimeSpan?, CancellationToken) instead.")] + public virtual Task StopAsync(CancellationToken hardStopToken) => + StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing }, hardStopToken); /// /// Stop this . If expires, the @@ -229,7 +231,21 @@ public virtual Task StartAsync(FuncAfter this period, abort handling and acknowledging messages. /// A that completes when all handled messages have been acknowledged; /// faults on unrecoverable service errors; or cancels if expires. - public virtual Task StopAsync(TimeSpan timeout) => StopAsync(new CancellationTokenSource(timeout).Token); + [Obsolete("Use StopAsync(ShutdownOption, TimeSpan?, CancellationToken) instead.")] + public virtual Task StopAsync(TimeSpan timeout) => + StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = timeout }); + + /// + /// Stops this . + /// The returned completes when the shutdown is finished according to the + /// or when the is cancelled. + /// The returned faults if an unrecoverable error occurs in the underlying service. + /// The returned cancels if is cancelled. + /// + /// The to use for shutdown. + /// Cancel this to immediately abort handlers and acknowledgement. + /// A that completes when the subscriber is stopped, or if an unrecoverable error occurs. + public virtual Task StopAsync(ShutdownOptions shutdownOptions, CancellationToken cancellationToken = default) => throw new NotImplementedException(); /// /// Disposes this asynchronously. diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.RequeueableQueue.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.RequeueableQueue.cs index 7e7911573452..64b498332013 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.RequeueableQueue.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.RequeueableQueue.cs @@ -115,5 +115,12 @@ internal bool TryPeek(out T value) value = default(T); return false; } + + internal void Clear() + { + _qs.Clear(); + _q.Clear(); + _requeueCount = 0; + } } } diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs index 280b203de18b..8907960658fb 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs @@ -139,16 +139,18 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) // The keys in the dictionary represents the Ack ID of the message, while value contains the receipt ModAck status. // A value of null indicates that the status is not yet started or in progress. A value of true indicates success, and false indicates failure (which can be temporary or permanent). private readonly ConcurrentDictionary _receiptModAckStatusLookup = new(); - private readonly object _lock = new object(); // For: _ackQueue, _nackQueue, _userHandlerInFlight + private readonly object _lock = new object(); // For: _ackQueue, _nackQueue, _messageSetsExpired, _messageSetsInLeasing. private readonly Action _registerTaskFn; private readonly TaskHelper _taskHelper; private readonly IScheduler _scheduler; private readonly IClock _clock; private readonly SubscriberServiceApiClient _client; private readonly SubscriptionHandler _handler; - private readonly CancellationTokenSource _hardStopCts; - private readonly CancellationTokenSource _pushStopCts; - private readonly CancellationTokenSource _softStopCts; + private readonly CancellationTokenSource _mainCts; // Cancels the main processing loop, immediately stoping all processing. This may drop messages. + private readonly CancellationTokenSource _pushCts; // Cancels all push operations. + private readonly CancellationTokenSource _handlerCts; // Cancels all handling operations, including waiting for the handler and sending new messages to the handler. + private readonly CancellationTokenSource _pullCts; // Cancels the streaming pull, preventing new messages from being consumed. + private readonly SubscriptionName _subscriptionName; private readonly LeaseTiming _normalLeaseTiming; private readonly LeaseTiming _exactlyOnceDeliveryLeaseTiming; @@ -166,9 +168,10 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) private readonly RequeueableQueue _ackQueue = new RequeueableQueue(); private readonly RequeueableQueue _nackQueue = new RequeueableQueue(); - private int _pushInFlight = 0; - private int _userHandlerInFlight = 0; + private readonly HashSet> _messageSetsExpired = new HashSet>(); + private readonly HashSet> _messageSetsInLeasing = new HashSet>(); private SubscriberServiceApiClient.StreamingPullStream _pull = null; + private int _pushInFlight = 0; private int _concurrentPushCount = 0; private bool _pullComplete = false; private bool _receiptModAckForExactlyOnceDelivery = false; // True if the lease is being extended for the messages for very first time in an exactly-once delivery flow; otherwise false. @@ -193,9 +196,16 @@ internal SingleChannel(SubscriberClientImpl subscriber, _client = client; _clientIndex = clientIndex; _handler = handler; - _hardStopCts = subscriber._globalHardStopCts; - _pushStopCts = CancellationTokenSource.CreateLinkedTokenSource(_hardStopCts.Token); - _softStopCts = subscriber._globalSoftStopCts; + + // Arrange cancellation token sources for shutdown + var hardstopToken = subscriber._globalHardStopCts.Token; + var nackImmediatelyToken = subscriber._globalNackImmediatelyCts.Token; + var waitForProcessingToken = subscriber._globalWaitForProcessingCts.Token; + _mainCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken); + _pushCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken); + _handlerCts = CancellationTokenSource.CreateLinkedTokenSource(nackImmediatelyToken, hardstopToken); + _pullCts = CancellationTokenSource.CreateLinkedTokenSource(waitForProcessingToken, nackImmediatelyToken, hardstopToken); + _subscriptionName = subscriber.SubscriptionName; _normalLeaseTiming = subscriber._normalLeaseTiming; _exactlyOnceDeliveryLeaseTiming = subscriber._exactlyOnceDeliveryLeaseTiming; @@ -220,16 +230,18 @@ internal async Task StartAsync() HandlePush(); // Start stream-keep-alive ping HandleStreamPing(); + // On NackInmediately we nack everything that we have client side. + using var _ = NackOnShutdownRegistration(); // Start event loop. // This loop exits by an action throwing a error or cancellation exception. while (!(_pullComplete && IsPushComplete())) { // Wait for, then process next continuation. TaskNextAction nextContinuation = await _taskHelper.ConfigureAwait(_continuationQueue.DequeueAsync()); - // On hardstop just immediately stop this event loop. + // Once _mainCts is cancelled just immediately stop this event loop. // The registered-task code ensures that all currently-active tasks finish before // return to user code. - if (_hardStopCts.IsCancellationRequested) + if (_mainCts.IsCancellationRequested) { StopStreamingPull(); throw new OperationCanceledException(); @@ -250,8 +262,11 @@ internal async Task StartAsync() } } _logger?.LogDebug("Subscriber task completed."); - // Stop waiting for data to push. - _pushStopCts.Cancel(); + // If we are here, we shutdown gracefully, but there might still be some registered tasks + // waiting to be triggered. + // Let's explicitly cancel everything. + _handlerCts.Cancel(); + _pushCts.Cancel(); } private LeaseTiming EffectiveLeaseTiming => _exactlyOnceDeliveryEnabled ? _exactlyOnceDeliveryLeaseTiming : _normalLeaseTiming; @@ -259,10 +274,14 @@ internal async Task StartAsync() private bool IsPushComplete() { // extend-queue not included, as these have no effect after shutdown. - // Lock required for ackQueue and nackQueue. + // Lock required for ackQueue and nackQueue and the message sets. lock (_lock) { - return _ackQueue.Count == 0 && _nackQueue.Count == 0 && _pushInFlight == 0 && _userHandlerInFlight == 0; + return _ackQueue.Count == 0 && + _nackQueue.Count == 0 && + _pushInFlight == 0 && + _messageSetsInLeasing.Count == 0 && + _messageSetsExpired.Count == 0; } } @@ -272,8 +291,13 @@ private void Add(Task task, NextAction next) { _registerTaskFn(task); var taskNext = new TaskNextAction(task, next); - task.ContinueWith(_ => _continuationQueue.Enqueue(taskNext), - CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler); + // We create the continuation on a new task to avoid the continuation to be executed on this thread + // if `task` is already completed. Note that although the continuation "only equeues", if there's + // a task (i.e. the main loop) waiting to dequeue, the dequeu would happen synchronously after the + // synchronous enqueue as well as the execution of the next task. + _taskHelper.Run(() => + task.ContinueWith(_ => _continuationQueue.Enqueue(taskNext), + CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler)); } private void StopStreamingPull() @@ -284,7 +308,7 @@ private void StopStreamingPull() // Ignore all errors; the stream may be in any state. try { - Add(_pull.WriteCompleteAsync(), Next(false, () => + Add(pullToDispose.WriteCompleteAsync(), Next(false, () => { try { @@ -313,7 +337,7 @@ private void StartStreamingPull() { // Delay, then start the streaming-pull. _logger?.LogDebug("Client {index} delaying for {seconds}s before streaming pull call.", _clientIndex, (int) backoff.TotalSeconds); - Task delayTask = _scheduler.Delay(backoff, _softStopCts.Token); + Task delayTask = _scheduler.Delay(backoff, _pullCts.Token); Add(delayTask, Next(true, HandleStartStreamingPullWithoutBackoff)); } else @@ -327,9 +351,9 @@ private void StartStreamingPull() private void HandleStartStreamingPullWithoutBackoff() { _retryState.OnStartAttempt(); - _pull = _client.StreamingPull(CallSettings.FromCancellationToken(_softStopCts.Token)); + _pull = _client.StreamingPull(CallSettings.FromCancellationToken(_pullCts.Token)); // Cancellation not needed in this WriteAsync call. The StreamingPull() cancellation - // (above) will cause this call to cancel if _softStopCts is cancelled. + // (above) will cause this call to cancel if _pullCts is cancelled. Task initTask = _pull.WriteAsync(new StreamingPullRequest { SubscriptionAsSubscriptionName = _subscriptionName, @@ -391,7 +415,7 @@ private void HandlePullMoveNext(Task initTask) if (throttle) { // Too many queued ack/nack/extend ids. Loop until the queue has drained a bit. - Add(_scheduler.Delay(TimeSpan.FromMilliseconds(100), _softStopCts.Token), Next(true, () => HandlePullMoveNext(null))); + Add(_scheduler.Delay(TimeSpan.FromMilliseconds(100), _pullCts.Token), Next(true, () => HandlePullMoveNext(null))); } else { @@ -446,8 +470,8 @@ private void HandlePullMessageData(Task moveNextTask) // However, temporary failures are retried for up to three times and may eventaully succeed, result in permanent failure, or remain as temporary failure. // Therefore, we must wait for all receipt ModAck responses to complete to obtain the final status. // Then, the messages with successful receipt ModAcks are sent to the user, while those with failed ModAcks are removed from further processing. - Add(_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_softStopCts.Token) - .ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler) + Add(_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_handlerCts.Token) + .ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds), _handlerCts.Token, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler) , Next(true, () => HandlePullMoveNext(null))); } else @@ -475,8 +499,8 @@ private void HandlePullMessageData(Task moveNextTask) Task ProcessSuccessfulMessages(List messages, HashSet messageIds) { // Remove all Ack IDs that failed to extend successfully. - // The value of false in the dictionary represents the failure. - var failures = new HashSet(_receiptModAckStatusLookup.Where(keyValue => keyValue.Value == false).Select(keyValue => keyValue.Key)); + // Everything that's not true is considered to have failed. + var failures = new HashSet(_receiptModAckStatusLookup.Where(keyValue => keyValue.Value != true).Select(keyValue => keyValue.Key)); if (failures.Any()) { // Remove failed messages. @@ -501,68 +525,56 @@ private async Task ProcessPullMessagesAsync(List msgs, HashSet< // Running async. Common data needs locking for (int msgIndex = 0; msgIndex < msgs.Count; msgIndex++) { - if (_softStopCts.IsCancellationRequested) - { - // If the subscriber was shutdown we should stop processing and nack remaining messages, releasing - // the message for re-delivery. - var remainingAckIds = msgs.Skip(msgIndex).Select(x => x.AckId); - Nack(remainingAckIds); - _softStopCts.Token.ThrowIfCancellationRequested(); - } + // If handler operations are cancelled,throw. + _handlerCts.Token.ThrowIfCancellationRequested(); var msg = msgs[msgIndex]; msgs[msgIndex] = null; // Prepare to call user message handler, _flow.Process(...) enforces the user-handler concurrency constraints. await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), _messageOrderingEnabled ? msg.Message.OrderingKey ?? "" : "", async () => { - // Running async. Common data needs locking - lock (_lock) - { - _userHandlerInFlight += 1; - } + // If handler operations are cancelled,throw. + // We do it here also because we may have been delayed by flow control. + _handlerCts.Token.ThrowIfCancellationRequested(); if (msg.DeliveryAttempt > 0) { msg.Message.Attributes[DeliveryAttemptAttrKey] = msg.DeliveryAttempt.ToString(CultureInfo.InvariantCulture); } // Call user message handler - var reply = await _taskHelper.ConfigureAwaitHideErrors(() => - { - // If the subscriber shut down while waiting for flow control, skip the handler. - // Throwing here triggers a Nack, releasing the message for redelivery. - _softStopCts.Token.ThrowIfCancellationRequested(); - return _handler.HandleMessage(msg.Message, _hardStopCts.Token); - }, Reply.Nack); - - // Lock msgsIds, this is accessed concurrently here and in HandleExtendLease(). + var reply = await _taskHelper.ConfigureAwaitHideErrors( + () => _handler.HandleMessage(msg.Message, _handlerCts.Token), + Reply.Nack); + + // On shutdown we nack everything that we are tracking in either `_messageSetsInLeasing` or + // `_messageSetsExpired` and at that point we clear the nacked `leaseTracking` sets. + // If this message was still on its `leaseTracking` we are not on shutdown and we + // forward the handler's response to the service. Otherwise the message was already nacked. + bool wasInLeaseTracking; + // Lock leaseTracking, this is accessed concurrently here and in HandleExtendLease() and on Shutdown. lock (leaseTracking) { - leaseTracking.Remove(msg.AckId); + wasInLeaseTracking = leaseTracking.Remove(msg.AckId); + if (leaseTracking.Count == 0) + { + lock (_lock) + { + _messageSetsInLeasing.Remove(leaseTracking); + _messageSetsExpired.Remove(leaseTracking); + } + } } - // Lock ack/nack-queues, this is accessed concurrently here and in "master" thread. - lock (_lock) + if (wasInLeaseTracking) { - _userHandlerInFlight -= 1; - var queue = reply == Reply.Ack ? _ackQueue : _nackQueue; - queue.Enqueue(msg.AckId); + // Lock ack/nack-queues, this is accessed concurrently here and in "master" thread. + lock (_lock) + { + var queue = reply == Reply.Ack ? _ackQueue : _nackQueue; + queue.Enqueue(msg.AckId); + } + _eventPush.Set(); } - // Ids have been added to ack/nack-queue, so trigger a push. - _eventPush.Set(); })); } - - void Nack(IEnumerable ackIds) - { - lock (_lock) - { - _nackQueue.Enqueue(ackIds); - } - lock (leaseTracking) - { - leaseTracking.ExceptWith(ackIds); - } - // Ids have been added to nack-queue, so trigger a push. - _eventPush.Set(); - } } private class LeaseCancellation @@ -581,8 +593,8 @@ public CancellationToken Token } } - public LeaseCancellation(CancellationTokenSource softStopCts) => - _cts = CancellationTokenSource.CreateLinkedTokenSource(softStopCts.Token); + public LeaseCancellation(CancellationTokenSource parentCts) => + _cts = CancellationTokenSource.CreateLinkedTokenSource(parentCts.Token); public void Dispose() { @@ -617,43 +629,75 @@ public void Cancel() } } - private void HandleExtendLease(HashSet msgIds, LeaseCancellation cancellation) + private void HandleExtendLease(HashSet leaseTracking, LeaseCancellation cancellation) { - if (_softStopCts.IsCancellationRequested) - { - // No further lease extensions once stop is requested. - return; - } // The first call to this method happens as soon as messages in this chunk start to be processed. // This triggers the server to start its lease timer. if (cancellation == null) { - // Create a task to cancel lease-extension once `_maxExtensionDuration` has been reached. // This set up once for each chunk of received messages, and passed through to each future call to this method. - cancellation = new LeaseCancellation(_softStopCts); - Add(_scheduler.Delay(_maxExtensionDuration, cancellation.Token), Next(false, () => + cancellation = new LeaseCancellation(_handlerCts); + // Let's track these messages as being in leasing. + lock (_lock) + { + _messageSetsInLeasing.Add(leaseTracking); + } + // But, if these messages came in and we are alreading stopping handler activities set them up to be nacked and dispose + // of the cancellation so they are not leased. + if (_handlerCts.IsCancellationRequested) { - // This is executed when `_maxExtensionDuration` has expired, or when `cancellation` is cancelled, - // Which ensures `cancellation` is aways disposed of. + // Note we setup this once on startup and for most cases that should be enough. + // But because cancellation of all tokens is not atomic there's a small chance that + // a batch of pulled messages comes through right after cancellation has started + // and after the nack on shutdown has been executed. So we need to setup nack on shutdown again + // to make certain these are nacked as well. + // Note we can dispose of the registration inmediately here because the token is already cancelled, + // so the registered callback will execute inmidiately and synchronously. + using var _ = NackOnShutdownRegistration(); cancellation.Dispose(); - lock (msgIds) + } + else + { + // Create a task to cancel lease-extension once `_maxExtensionDuration` has been reached or we have stopped + // all handler activities. + Add(_scheduler.Delay(_maxExtensionDuration, cancellation.Token), Next(false, () => { - msgIds.Clear(); - } - })); + // This is executed when `_maxExtensionDuration` has expired, or when `cancellation` is cancelled, + // Which ensures `cancellation` is aways disposed of. + cancellation.Dispose(); + // If we are here because of `_maxExtensionDuration` has expired, + // move these messages to _maxLeaseExpire. + // Otherwise we are in shutdown mode, the messages won't be leased again and will be nacked elsewhere. + if (!cancellation.Token.IsCancellationRequested) + { + lock (leaseTracking) + { + lock (_lock) + { + _messageSetsInLeasing.Remove(leaseTracking); + if (leaseTracking.Count > 0) + { + _messageSetsExpired.Add(leaseTracking); + } + } + } + + } + })); + } } - if (!cancellation.IsDisposed) + if (!cancellation.IsDisposed && !cancellation.Token.IsCancellationRequested) { - // If `_maxExtensionDuration` has not expired, then schedule a further lease extension. + // If leasing hasn't been cancelled , then schedule a further lease extension. bool anyMsgIds; - lock (msgIds) + lock (leaseTracking) { - anyMsgIds = msgIds.Count > 0; + anyMsgIds = leaseTracking.Count > 0; if (anyMsgIds) { lock (_lock) { - _extendQueue.Enqueue(msgIds.Select(x => new TimedId(_extendThrottleHigh + 1, x))); + _extendQueue.Enqueue(leaseTracking.Select(x => new TimedId(_extendThrottleHigh + 1, x))); } } } @@ -663,31 +707,48 @@ private void HandleExtendLease(HashSet msgIds, LeaseCancellation cancell _eventPush.Set(); // Some ids still exist, schedule another extension. // The overall `_maxExtensionDuration` is maintained by passing through the existing `cancellation`. - Add(_scheduler.Delay(EffectiveLeaseTiming.AutoExtendDelay, _softStopCts.Token), Next(false, () => HandleExtendLease(msgIds, cancellation))); + Add(_scheduler.Delay(EffectiveLeaseTiming.AutoExtendDelay, cancellation.Token), Next(false, () => HandleExtendLease(leaseTracking, cancellation))); // Increment _extendThrottles. _extendThrottleHigh += 1; - Add(_scheduler.Delay(EffectiveLeaseTiming.ExtendQueueThrottleInterval, _softStopCts.Token), Next(false, () => _extendThrottleLow += 1)); + Add(_scheduler.Delay(EffectiveLeaseTiming.ExtendQueueThrottleInterval, _handlerCts.Token), Next(false, () => _extendThrottleLow += 1)); } else { // All messages have been handled in this chunk, so cancel the max-lease-time monitoring. // This will also cause `cancellation` to be disposed in the anonymous function above. cancellation.Cancel(); + // Note that we don't remove the empty leasing collection from _messageSetsInLeasing because + // that's done in when the leasing collection becomes empty after the user handler returns for + // the last message in the chunk. } } } private void HandlePush() { - // Always re-listen for push events. - Add(_eventPush.WaitAsync(_pushStopCts.Token), Next(false, HandlePush)); + // Re-listen for push events if push operations are not cancelled yet. + if (!_pushCts.IsCancellationRequested) + { + Add(_eventPush.WaitAsync(_pushCts.Token), Next(false, HandlePush)); + } // Send data, if there is any to send. StartPush(); } private void StartPush() { - // Send data, if there is any to send. + // If push has been cancelled, let's just clear the queues. + if (_pushCts.IsCancellationRequested) + { + lock (_lock) + { + _ackQueue.Clear(); + _nackQueue.Clear(); + _extendQueue.Clear(); + } + return; + } + if (_concurrentPushCount >= _maxConcurrentPush) { // Too many existing concurrent pushes; do nothing. @@ -705,6 +766,12 @@ private void StartPush() nacks = _nackQueue.Dequeue(_maxAckExtendSendCount, null); var ackSet = new HashSet(acks); var nackSet = new HashSet(nacks); + // If user handler operations have been cancelled, let's clear the extend queue. + // But acks and nacks continue to be sent as usual. + if (_handlerCts.IsCancellationRequested) + { + _extendQueue.Clear(); + } // Only send extends for ids that aren't also about to ack or nack. extends = _extendQueue.Dequeue(_maxAckExtendSendCount, x => !ackSet.Contains(x.Id) && !nackSet.Contains(x.Id)); } @@ -712,21 +779,21 @@ private void StartPush() { _pushInFlight += acks.Count; _concurrentPushCount += 1; - Task ackTask = _client.AcknowledgeAsync(_subscriptionName, acks, _hardStopCts.Token); + Task ackTask = _client.AcknowledgeAsync(_subscriptionName, acks, _pushCts.Token); Add(ackTask, Next(false, () => HandleAckResponse(ackTask, acks, null, null))); } if (extends.Count > 0) { _pushInFlight += extends.Count; _concurrentPushCount += 1; - Task extendTask = _client.ModifyAckDeadlineAsync(_subscriptionName, extends.Select(x => x.Id), EffectiveLeaseTiming.AckDeadlineSeconds, _hardStopCts.Token); + Task extendTask = _client.ModifyAckDeadlineAsync(_subscriptionName, extends.Select(x => x.Id), EffectiveLeaseTiming.AckDeadlineSeconds, _handlerCts.Token); Add(extendTask, Next(false, () => HandleAckResponse(extendTask, null, null, extends))); } if (nacks.Count > 0) { _pushInFlight += nacks.Count; _concurrentPushCount += 1; - Task nackTask = _client.ModifyAckDeadlineAsync(_subscriptionName, nacks, 0, _hardStopCts.Token); + Task nackTask = _client.ModifyAckDeadlineAsync(_subscriptionName, nacks, 0, _pushCts.Token); Add(nackTask, Next(false, () => HandleAckResponse(nackTask, null, nacks, null))); } } @@ -929,10 +996,10 @@ void RetryTemporaryFailures( { var backoff = retryGroup.Key ?? TimeSpan.Zero; var retryIds = retryGroup.Select(j => j.id); - Task delayTask = _scheduler.Delay(backoff, _softStopCts.Token); + Task delayTask = _scheduler.Delay(backoff, hasAcksOrNacks ? _pushCts.Token : _handlerCts.Token); Add(delayTask, new NextAction(false, hasAcksOrNacks ? () => { ackActionToRetry(retryIds); StartPush(); } - : () => { extendActionToRetry(extendIds.Where(j => retryIds.Contains(j.Id))); StartPush(); })); + : () => { extendActionToRetry(extendIds.Where(j => retryIds.Contains(j.Id))); StartPush(); })); } } @@ -1005,13 +1072,15 @@ private void HandleAckResponse(Task writeTask, List ackIds, List var ackNackResponses = ids.Count > 0 ? GetAckNackResponses(ids, writeTask) : new List(); - if (hasAckIds) + // On shutdown, we only send ack responses if we are on EOD. + if (hasAckIds && (_exactlyOnceDeliveryEnabled || !_handlerCts.IsCancellationRequested)) { // Invoke the handler for ACKs. // Any uncaught exception in the handler will terminate the client. _handler.HandleAckResponses(ackNackResponses); } - else if (hasNackIds) + // We only send nack responses if we are not on shutdown. + else if (hasNackIds && !_handlerCts.IsCancellationRequested) { // Invoke the handler for NACKs. // Any uncaught exception in the handler will terminate the client. @@ -1092,13 +1161,13 @@ private void HandleStreamPing() { // Need to explicitly check this, as the continuation passed to Add() may be executed // regardless of the fault/cancellation state of the Task. - if (_softStopCts.IsCancellationRequested) + if (_pullCts.IsCancellationRequested) { // No more pings when subscriber stopping. return; } // Schedule next ping, this never stops whilst this subscriber as active - Add(_scheduler.Delay(s_streamPingPeriod, _softStopCts.Token), Next(false, HandleStreamPing)); + Add(_scheduler.Delay(s_streamPingPeriod, _pullCts.Token), Next(false, HandleStreamPing)); // If messages are currently being processed, then ping the stream periodically; // this ensures the stream isn't closed. // If the stream is closed, then all gRPC-buffered messages have their server-side @@ -1120,6 +1189,45 @@ private void HandleStreamPing() } } + private CancellationTokenRegistration NackOnShutdownRegistration() + { + return _handlerCts.Token.Register(() => Add(Task.CompletedTask, Next(false, NackAll))); + + void NackAll() + { + HashSet> messageSetsToNack; + lock (_lock) + { + messageSetsToNack = new HashSet>(_messageSetsInLeasing.Union(_messageSetsExpired)); + _messageSetsInLeasing.Clear(); + _messageSetsExpired.Clear(); + } + + HashSet idsToNack = new HashSet(); + foreach (var leasingSet in messageSetsToNack) + { + lock (leasingSet) + { + if (leasingSet.Count > 0) + { + idsToNack.UnionWith(leasingSet); + leasingSet.Clear(); + } + } + } + + // If we need to nack anything, do so. + if (idsToNack?.Count > 0) + { + lock (_lock) + { + _nackQueue.Enqueue(idsToNack); + } + _eventPush.Set(); + } + } + } + /// /// Updates the receipt ModAck status for the specified Ack IDs. /// @@ -1287,5 +1395,6 @@ internal void OnServerDisconnect() Backoff = _disconnectBackoff; } } + } } diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs index 9ff2e30d9c8f..0e0b61a41f2d 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs @@ -36,8 +36,10 @@ namespace Google.Cloud.PubSub.V1; public sealed partial class SubscriberClientImpl : SubscriberClient { // TODO: Logging - internal const string DeliveryAttemptAttrKey = "googclient_deliveryattempt"; + // If shutting down with WaitForProcessing we automatically switch to NackImmediately + // this time before the hard stop triggers. + private static readonly TimeSpan s_nackImmediatelyBeforeHardStopWindow = TimeSpan.FromSeconds(30); /// /// Instantiate a associated with the specified . @@ -70,7 +72,11 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable _mainTcs; - private CancellationTokenSource _globalSoftStopCts; // soft-stop is guarenteed to occur before hard-stop. - private CancellationTokenSource _globalHardStopCts; + private CancellationTokenSource _globalHardStopCts; // Immediately stops all processing, dropping any unhandled messages. + private CancellationTokenSource _globalNackImmediatelyCts; + private CancellationTokenSource _globalWaitForProcessingCts; + + // A boolean that is true if stop has been initiated with the StopAsync methods. + private bool IsStopStarted => _globalHardStopCts.IsCancellationRequested || + _globalNackImmediatelyCts.IsCancellationRequested || + _globalWaitForProcessingCts.IsCancellationRequested; // This property only exists for testing. // This is the delay between obtaining a lease on a message and then further extending the lease on that message @@ -120,8 +132,9 @@ public override Task StartAsync(SubscriptionHandler handler) { GaxPreconditions.CheckState(_mainTcs == null, "Can only start an instance once."); _mainTcs = new TaskCompletionSource(); - _globalSoftStopCts = new CancellationTokenSource(); _globalHardStopCts = new CancellationTokenSource(); + _globalNackImmediatelyCts = new CancellationTokenSource(); + _globalWaitForProcessingCts = new CancellationTokenSource(); } var registeredTasks = new HashSet(); Action registerTask = task => @@ -139,7 +152,7 @@ public override Task StartAsync(SubscriptionHandler handler) return _taskHelper.Run(() => singleChannel.StartAsync()); }).ToArray(); // Set up finish task; code that executes when this subscriber is being shutdown (for whatever reason). - _taskHelper.Run(async () => await StopCompletionAsync(subscriberTasks, registeredTasks)); + _taskHelper.Run(() => StopCompletionAsync(subscriberTasks, registeredTasks)); return _mainTcs.Task; } @@ -149,7 +162,6 @@ private async Task StopCompletionAsync(Task[] subscriberTasks, HashSet reg var task = await _taskHelper.ConfigureAwait(_taskHelper.WhenAny(subscriberTasks)); if (task.IsFaulted) { - _globalSoftStopCts.Cancel(); _globalHardStopCts.Cancel(); } // Wait for all subscribers to stop @@ -160,10 +172,6 @@ await _taskHelper.ConfigureAwaitHideErrors( // Call shutdown function if (_shutdown != null) { - // TODO: Remove this 2 second delay. - // This is a temporary patch to avoid race condition between gRPC call cancellation and channel dispose. - // Please see https://github.com/grpc/grpc-dotnet/issues/2119 for the deadlock issue in Grpc.Net.Client. - await _scheduler.Delay(TimeSpan.FromSeconds(2), CancellationToken.None); await _taskHelper.ConfigureAwaitHideErrors(_shutdown); } // Return final result @@ -194,29 +202,96 @@ public override ValueTask DisposeAsync() return new ValueTask(Task.CompletedTask); } } - return new ValueTask(StopAsync(_disposeTimeout)); + return new ValueTask(StopAsync(_disposeShutdownOptions)); } /// - public override Task StopAsync(CancellationToken hardStopToken) + public override Task StopAsync(ShutdownOptions shutdownOptions, CancellationToken cancellationToken = default) { + GaxPreconditions.CheckNotNull(shutdownOptions, nameof(shutdownOptions)); + + TimeSpan hardStopIn; + TimeSpan nackImmediatelyIn; + bool hardStop = false; + bool nackImmediately = false; + CancellationTokenSource tokenToCancel; lock (_lock) { - // Note: If multiple stop requests are made, only the first cancellation token is observed. - if (_mainTcs is not null && _globalSoftStopCts.IsCancellationRequested) + GaxPreconditions.CheckState(_mainTcs != null, "Can only stop a started instance."); + // Only the first call to StopAsync is observed + if (IsStopStarted) { // No-op. We don't want to throw exceptions if DisposeAsync or StopAsync is called a second time. return _mainTcs.Task; } - GaxPreconditions.CheckState(_mainTcs != null, "Can only stop a started instance."); - _globalSoftStopCts.Cancel(); + + hardStopIn = shutdownOptions.Timeout ?? _maxExtensionDuration; + nackImmediatelyIn = hardStopIn - s_nackImmediatelyBeforeHardStopWindow; + + if (hardStopIn <= TimeSpan.Zero || cancellationToken.IsCancellationRequested) + { + tokenToCancel = _globalHardStopCts; + hardStop = true; + } + else if (nackImmediatelyIn <= TimeSpan.Zero || shutdownOptions.Mode == ShutdownMode.NackImmediately) + { + tokenToCancel = _globalNackImmediatelyCts; + nackImmediately = true; + } + else + { + tokenToCancel = _globalWaitForProcessingCts; + } + } + // Cancel outside the lock because callbacks may execute synchronously. + tokenToCancel.Cancel(); + + if (!hardStop) + { + // If this is not a hard stop we trigger hard stop, eventually + CancelTargetOnTrigger(targetSourceToCancel: _globalHardStopCts, triggerToken: cancellationToken); + CancelAfterDelay(_globalHardStopCts, hardStopIn); + if (!nackImmediately) + { + // If this is neither hard stop nor nack immediately, we are in wait for processing, + // we need to trigger nack immediately, eventually. + CancelAfterDelay(_globalNackImmediatelyCts, nackImmediatelyIn); + } + } + // In case we are in an other than expected shutdown mode, let's log. + if (hardStop) + { + Logger?.LogWarning("Shutdown timeout is {ShutdownTimeout}. " + + "Can't fulfill request for shutdown mode {ShutdownMode}. " + + "Shutting down immediately.", hardStopIn, shutdownOptions.Mode); + } + else if (nackImmediately && shutdownOptions.Mode != ShutdownMode.NackImmediately) + { + Logger?.LogWarning("Shutdown timeout is {ShutdownTimeout}. " + + "Not enough time to fulfill request for shutdown mode {ShutdownMode} " + + "and {NackImmediately} grace period before shutdown of {GracePeriod} " + + "Going to {ActualMode} shutdown mode immediately.", + hardStopIn, shutdownOptions.Mode, ShutdownMode.NackImmediately, s_nackImmediatelyBeforeHardStopWindow, ShutdownMode.NackImmediately); } - var registration = hardStopToken.Register(() => _globalHardStopCts.Cancel()); - // Do not register this Task to be awaited on at shutdown. - // It completes *after* _mainTcs, and all registered tasks must complete before _mainTcs - _taskHelper.Run(async () => - await _taskHelper.ConfigureAwaitWithFinally(() => _mainTcs.Task, () => registration.Dispose())); return _mainTcs.Task; + + void CancelTargetOnTrigger(CancellationTokenSource targetSourceToCancel, CancellationToken triggerToken) + { + var registration = triggerToken.Register(() => targetSourceToCancel.Cancel()); + // Do not register this Task to be awaited on at shutdown. + // It completes *after* _mainTcs, and all registered tasks must complete before _mainTcs + _taskHelper.Run(async () => + await _taskHelper.ConfigureAwaitWithFinally(() => _mainTcs.Task, () => registration.Dispose())); + } + + // Triggers cancellation after provided delay. This is used instead of .CancelAfter to integrate with + // IScheduler, allowing this shutdown method to be tested. + void CancelAfterDelay(CancellationTokenSource cts, TimeSpan delay) => + _taskHelper.Run(async () => + { + await _taskHelper.ConfigureAwait(_scheduler.Delay(delay, CancellationToken.None)); + cts.Cancel(); + }); } } diff --git a/apis/Google.Cloud.PubSub.V1/docs/index.md b/apis/Google.Cloud.PubSub.V1/docs/index.md index b50e41fb7f69..83877dd92ad1 100644 --- a/apis/Google.Cloud.PubSub.V1/docs/index.md +++ b/apis/Google.Cloud.PubSub.V1/docs/index.md @@ -172,6 +172,15 @@ Below is an example implementation of a console application that utilizes the de {{sample:SubscriberClient.UseSubscriberServiceInConsoleApp}} +## Subscriber shutdown + +When shutting down a `SubscriberClient`, two different shutdown flows are available via the `StopAsync(ShutdownOptions, CancellationToken)` method: + +- **NackImmediately**: This immediately stops streaming new messages and then actively sends "Nack" (Negative Acknowledgement) responses for any messages that have been received but have not yet finished being handled. This allows those messages to be quickly redelivered to other active subscribers. +- **WaitForProcessing**: This immediately stops streaming new messages from the server but continues to process all messages that have already been received. If processing does not complete 30s before the specified timeout, the client will switch to NackImmediately to release any remaining messages. + +By default, a 1-hour timeout is used for the shutdown process, which can be customized as needed. When the timeout is reached or if the `CancellationToken` is invoked this will trigger an immediate hard stop, aborting all outstanding tasks. + ## Disposing of the publisher and subscriber clients Both `PublisherClient` and `SubscriberClient` implement the `IAsyncDisposable` interface,