From de436974a96386112acae320ba641e1ae790c5a4 Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Wed, 18 Mar 2026 21:50:41 +0000 Subject: [PATCH 1/5] refactor(PubSub): Rename fields/properties and encapsulate stop cancellation token sources. --- .../SubscriberClientImpl.SingleChannel.cs | 175 +++++++++++------- .../SubscriberClientImpl.cs | 14 +- 2 files changed, 115 insertions(+), 74 deletions(-) diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs index 280b203de18b..6b2f30870ec7 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs @@ -139,16 +139,18 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) // The keys in the dictionary represents the Ack ID of the message, while value contains the receipt ModAck status. // A value of null indicates that the status is not yet started or in progress. A value of true indicates success, and false indicates failure (which can be temporary or permanent). 
private readonly ConcurrentDictionary _receiptModAckStatusLookup = new(); - private readonly object _lock = new object(); // For: _ackQueue, _nackQueue, _userHandlerInFlight + private readonly object _lock = new object(); // For: _ackQueue, _nackQueue, _messagesInFlight private readonly Action _registerTaskFn; private readonly TaskHelper _taskHelper; private readonly IScheduler _scheduler; private readonly IClock _clock; private readonly SubscriberServiceApiClient _client; private readonly SubscriptionHandler _handler; - private readonly CancellationTokenSource _hardStopCts; - private readonly CancellationTokenSource _pushStopCts; - private readonly CancellationTokenSource _softStopCts; + private readonly CancellationTokenSource _mainCts; // Cancels the main processing loop, immediately stoping all processing. This may drop messages. + private readonly CancellationTokenSource _pullCts; // Cancels the streaming pull, preventing new messages from being consumed. + private readonly CancellationTokenSource _ackNackCts; // Cancels all ACK/NACK pushes to the Pub/Sub service. + private readonly CancellationTokenSource _sendToHandlerCts; // Cancels sending new messages to be handled. + private readonly CancellationTokenSource _waitForHandlerCts; // Cancels all message handler operations. private readonly SubscriptionName _subscriptionName; private readonly LeaseTiming _normalLeaseTiming; private readonly LeaseTiming _exactlyOnceDeliveryLeaseTiming; @@ -167,7 +169,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? 
backoff = null) private readonly RequeueableQueue _nackQueue = new RequeueableQueue(); private int _pushInFlight = 0; - private int _userHandlerInFlight = 0; + private readonly HashSet _messagesInFlight = new HashSet(); private SubscriberServiceApiClient.StreamingPullStream _pull = null; private int _concurrentPushCount = 0; private bool _pullComplete = false; @@ -193,9 +195,16 @@ internal SingleChannel(SubscriberClientImpl subscriber, _client = client; _clientIndex = clientIndex; _handler = handler; - _hardStopCts = subscriber._globalHardStopCts; - _pushStopCts = CancellationTokenSource.CreateLinkedTokenSource(_hardStopCts.Token); - _softStopCts = subscriber._globalSoftStopCts; + + // Arrange cancellation token sources for shutdown + var nackAndWaitToken = subscriber._globalNackAndWaitCts.Token; + var hardstopToken = subscriber._globalHardStopCts.Token; + _mainCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken); + _ackNackCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken); + _pullCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken, nackAndWaitToken); + _sendToHandlerCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken, nackAndWaitToken); // This is guaranteed to be cancelled before _waitForHandlerCts + _waitForHandlerCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken); + _subscriptionName = subscriber.SubscriptionName; _normalLeaseTiming = subscriber._normalLeaseTiming; _exactlyOnceDeliveryLeaseTiming = subscriber._exactlyOnceDeliveryLeaseTiming; @@ -226,10 +235,10 @@ internal async Task StartAsync() { // Wait for, then process next continuation. TaskNextAction nextContinuation = await _taskHelper.ConfigureAwait(_continuationQueue.DequeueAsync()); - // On hardstop just immediately stop this event loop. + // Once _mainCts is cancelled just immediately stop this event loop. // The registered-task code ensures that all currently-active tasks finish before // return to user code. 
- if (_hardStopCts.IsCancellationRequested) + if (_mainCts.IsCancellationRequested) { StopStreamingPull(); throw new OperationCanceledException(); @@ -250,8 +259,9 @@ internal async Task StartAsync() } } _logger?.LogDebug("Subscriber task completed."); - // Stop waiting for data to push. - _pushStopCts.Cancel(); + // Stop waiting for data to push and handler tasks. + _waitForHandlerCts.Cancel(); + _ackNackCts.Cancel(); } private LeaseTiming EffectiveLeaseTiming => _exactlyOnceDeliveryEnabled ? _exactlyOnceDeliveryLeaseTiming : _normalLeaseTiming; @@ -262,7 +272,7 @@ private bool IsPushComplete() // Lock required for ackQueue and nackQueue. lock (_lock) { - return _ackQueue.Count == 0 && _nackQueue.Count == 0 && _pushInFlight == 0 && _userHandlerInFlight == 0; + return _ackQueue.Count == 0 && _nackQueue.Count == 0 && _pushInFlight == 0 && _messagesInFlight.Count == 0; } } @@ -313,7 +323,7 @@ private void StartStreamingPull() { // Delay, then start the streaming-pull. _logger?.LogDebug("Client {index} delaying for {seconds}s before streaming pull call.", _clientIndex, (int) backoff.TotalSeconds); - Task delayTask = _scheduler.Delay(backoff, _softStopCts.Token); + Task delayTask = _scheduler.Delay(backoff, _pullCts.Token); Add(delayTask, Next(true, HandleStartStreamingPullWithoutBackoff)); } else @@ -327,9 +337,9 @@ private void StartStreamingPull() private void HandleStartStreamingPullWithoutBackoff() { _retryState.OnStartAttempt(); - _pull = _client.StreamingPull(CallSettings.FromCancellationToken(_softStopCts.Token)); + _pull = _client.StreamingPull(CallSettings.FromCancellationToken(_pullCts.Token)); // Cancellation not needed in this WriteAsync call. The StreamingPull() cancellation - // (above) will cause this call to cancel if _softStopCts is cancelled. + // (above) will cause this call to cancel if _pullCts is cancelled. 
Task initTask = _pull.WriteAsync(new StreamingPullRequest { SubscriptionAsSubscriptionName = _subscriptionName, @@ -391,7 +401,7 @@ private void HandlePullMoveNext(Task initTask) if (throttle) { // Too many queued ack/nack/extend ids. Loop until the queue has drained a bit. - Add(_scheduler.Delay(TimeSpan.FromMilliseconds(100), _softStopCts.Token), Next(true, () => HandlePullMoveNext(null))); + Add(_scheduler.Delay(TimeSpan.FromMilliseconds(100), _pullCts.Token), Next(true, () => HandlePullMoveNext(null))); } else { @@ -446,8 +456,8 @@ private void HandlePullMessageData(Task moveNextTask) // However, temporary failures are retried for up to three times and may eventaully succeed, result in permanent failure, or remain as temporary failure. // Therefore, we must wait for all receipt ModAck responses to complete to obtain the final status. // Then, the messages with successful receipt ModAcks are sent to the user, while those with failed ModAcks are removed from further processing. - Add(_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_softStopCts.Token) - .ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler) + Add(_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_sendToHandlerCts.Token) + .ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds), _sendToHandlerCts.Token, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler) , Next(true, () => HandlePullMoveNext(null))); } else @@ -501,13 +511,9 @@ private async Task ProcessPullMessagesAsync(List msgs, HashSet< // Running async. Common data needs locking for (int msgIndex = 0; msgIndex < msgs.Count; msgIndex++) { - if (_softStopCts.IsCancellationRequested) + if (_sendToHandlerCts.IsCancellationRequested) { - // If the subscriber was shutdown we should stop processing and nack remaining messages, releasing - // the message for re-delivery. 
- var remainingAckIds = msgs.Skip(msgIndex).Select(x => x.AckId); - Nack(remainingAckIds); - _softStopCts.Token.ThrowIfCancellationRequested(); + break; } var msg = msgs[msgIndex]; @@ -515,10 +521,14 @@ private async Task ProcessPullMessagesAsync(List msgs, HashSet< // Prepare to call user message handler, _flow.Process(...) enforces the user-handler concurrency constraints. await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), _messageOrderingEnabled ? msg.Message.OrderingKey ?? "" : "", async () => { + if (_sendToHandlerCts.IsCancellationRequested || _waitForHandlerCts.IsCancellationRequested) + { + return; + } // Running async. Common data needs locking lock (_lock) { - _userHandlerInFlight += 1; + _messagesInFlight.Add(msg.AckId); } if (msg.DeliveryAttempt > 0) { @@ -527,42 +537,55 @@ await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), _messageOrde // Call user message handler var reply = await _taskHelper.ConfigureAwaitHideErrors(() => { - // If the subscriber shut down while waiting for flow control, skip the handler. - // Throwing here triggers a Nack, releasing the message for redelivery. - _softStopCts.Token.ThrowIfCancellationRequested(); - return _handler.HandleMessage(msg.Message, _hardStopCts.Token); + return _handler.HandleMessage(msg.Message, _waitForHandlerCts.Token); }, Reply.Nack); - - // Lock msgsIds, this is accessed concurrently here and in HandleExtendLease(). - lock (leaseTracking) + if (_waitForHandlerCts.IsCancellationRequested) { - leaseTracking.Remove(msg.AckId); + // Don't return any ack/nack responses if we are no longer waiting on handlers + lock (_lock) + { + _messagesInFlight.Remove(msg.AckId); + } + // Signal the event loop to re-evaluate the shutdown state and exit early without forwarding ack + // replies, all unhandled messages will be NAck'ed in HandleExtendLease(). + _eventPush.Set(); + return; } // Lock ack/nack-queues, this is accessed concurrently here and in "master" thread. 
lock (_lock) { - _userHandlerInFlight -= 1; + _messagesInFlight.Remove(msg.AckId); var queue = reply == Reply.Ack ? _ackQueue : _nackQueue; queue.Enqueue(msg.AckId); } + // Lock msgsIds, this is accessed concurrently here and in HandleExtendLease(). + lock (leaseTracking) + { + leaseTracking.Remove(msg.AckId); + } // Ids have been added to ack/nack-queue, so trigger a push. _eventPush.Set(); })); } + } - void Nack(IEnumerable ackIds) + private void Nack(IEnumerable ackIds, HashSet leaseTracking) + { + var idsToNack = ackIds.ToList(); + if (idsToNack.Count == 0) { - lock (_lock) - { - _nackQueue.Enqueue(ackIds); - } - lock (leaseTracking) - { - leaseTracking.ExceptWith(ackIds); - } - // Ids have been added to nack-queue, so trigger a push. - _eventPush.Set(); + return; } + + lock (_lock) + { + _nackQueue.Enqueue(idsToNack); + } + lock (leaseTracking) + { + leaseTracking.ExceptWith(idsToNack); + } + _eventPush.Set(); } private class LeaseCancellation @@ -581,8 +604,8 @@ public CancellationToken Token } } - public LeaseCancellation(CancellationTokenSource softStopCts) => - _cts = CancellationTokenSource.CreateLinkedTokenSource(softStopCts.Token); + public LeaseCancellation(CancellationTokenSource leaseExtensionCts) => + _cts = CancellationTokenSource.CreateLinkedTokenSource(leaseExtensionCts.Token); public void Dispose() { @@ -617,43 +640,59 @@ public void Cancel() } } - private void HandleExtendLease(HashSet msgIds, LeaseCancellation cancellation) + private void HandleExtendLease(HashSet leaseTracking, LeaseCancellation cancellation) { - if (_softStopCts.IsCancellationRequested) + if (_waitForHandlerCts.IsCancellationRequested) { - // No further lease extensions once stop is requested. + // if we are no longer waiting on handlers then NAck messages for redelivery. + Nack(ackIds: leaseTracking, leaseTracking); return; } + + if (_sendToHandlerCts.IsCancellationRequested) + { + // Nack messages that will not be sent to handler freeing them for redelivery. 
+ // We continue extending leases for in flight messages. + var messagesNotBeingHandled = leaseTracking.Except(_messagesInFlight); + Nack(messagesNotBeingHandled, leaseTracking); + + } + // The first call to this method happens as soon as messages in this chunk start to be processed. // This triggers the server to start its lease timer. if (cancellation == null) { // Create a task to cancel lease-extension once `_maxExtensionDuration` has been reached. // This set up once for each chunk of received messages, and passed through to each future call to this method. - cancellation = new LeaseCancellation(_softStopCts); + cancellation = new LeaseCancellation(_waitForHandlerCts); Add(_scheduler.Delay(_maxExtensionDuration, cancellation.Token), Next(false, () => { + // On cancellation we may have not reached the max extension duration so don't clear the lease + // tracking allowing the messages to be Nack'ed for redelivery. + if (!_waitForHandlerCts.IsCancellationRequested) + { + lock (leaseTracking) + { + leaseTracking.Clear(); + } + } // This is executed when `_maxExtensionDuration` has expired, or when `cancellation` is cancelled, // Which ensures `cancellation` is aways disposed of. cancellation.Dispose(); - lock (msgIds) - { - msgIds.Clear(); - } })); } if (!cancellation.IsDisposed) { // If `_maxExtensionDuration` has not expired, then schedule a further lease extension. bool anyMsgIds; - lock (msgIds) + lock (leaseTracking) { - anyMsgIds = msgIds.Count > 0; + anyMsgIds = leaseTracking.Count > 0; if (anyMsgIds) { lock (_lock) { - _extendQueue.Enqueue(msgIds.Select(x => new TimedId(_extendThrottleHigh + 1, x))); + _extendQueue.Enqueue(leaseTracking.Select(x => new TimedId(_extendThrottleHigh + 1, x))); } } } @@ -663,10 +702,10 @@ private void HandleExtendLease(HashSet msgIds, LeaseCancellation cancell _eventPush.Set(); // Some ids still exist, schedule another extension. 
// The overall `_maxExtensionDuration` is maintained by passing through the existing `cancellation`. - Add(_scheduler.Delay(EffectiveLeaseTiming.AutoExtendDelay, _softStopCts.Token), Next(false, () => HandleExtendLease(msgIds, cancellation))); + Add(_scheduler.Delay(EffectiveLeaseTiming.AutoExtendDelay, _waitForHandlerCts.Token), Next(false, () => HandleExtendLease(leaseTracking, cancellation))); // Increment _extendThrottles. _extendThrottleHigh += 1; - Add(_scheduler.Delay(EffectiveLeaseTiming.ExtendQueueThrottleInterval, _softStopCts.Token), Next(false, () => _extendThrottleLow += 1)); + Add(_scheduler.Delay(EffectiveLeaseTiming.ExtendQueueThrottleInterval, _waitForHandlerCts.Token), Next(false, () => _extendThrottleLow += 1)); } else { @@ -680,7 +719,7 @@ private void HandleExtendLease(HashSet msgIds, LeaseCancellation cancell private void HandlePush() { // Always re-listen for push events. - Add(_eventPush.WaitAsync(_pushStopCts.Token), Next(false, HandlePush)); + Add(_eventPush.WaitAsync(_ackNackCts.Token), Next(false, HandlePush)); // Send data, if there is any to send. 
StartPush(); } @@ -712,21 +751,21 @@ private void StartPush() { _pushInFlight += acks.Count; _concurrentPushCount += 1; - Task ackTask = _client.AcknowledgeAsync(_subscriptionName, acks, _hardStopCts.Token); + Task ackTask = _client.AcknowledgeAsync(_subscriptionName, acks, _ackNackCts.Token); Add(ackTask, Next(false, () => HandleAckResponse(ackTask, acks, null, null))); } if (extends.Count > 0) { _pushInFlight += extends.Count; _concurrentPushCount += 1; - Task extendTask = _client.ModifyAckDeadlineAsync(_subscriptionName, extends.Select(x => x.Id), EffectiveLeaseTiming.AckDeadlineSeconds, _hardStopCts.Token); + Task extendTask = _client.ModifyAckDeadlineAsync(_subscriptionName, extends.Select(x => x.Id), EffectiveLeaseTiming.AckDeadlineSeconds, _waitForHandlerCts.Token); Add(extendTask, Next(false, () => HandleAckResponse(extendTask, null, null, extends))); } if (nacks.Count > 0) { _pushInFlight += nacks.Count; _concurrentPushCount += 1; - Task nackTask = _client.ModifyAckDeadlineAsync(_subscriptionName, nacks, 0, _hardStopCts.Token); + Task nackTask = _client.ModifyAckDeadlineAsync(_subscriptionName, nacks, 0, _ackNackCts.Token); Add(nackTask, Next(false, () => HandleAckResponse(nackTask, null, nacks, null))); } } @@ -929,7 +968,7 @@ void RetryTemporaryFailures( { var backoff = retryGroup.Key ?? TimeSpan.Zero; var retryIds = retryGroup.Select(j => j.id); - Task delayTask = _scheduler.Delay(backoff, _softStopCts.Token); + Task delayTask = _scheduler.Delay(backoff, hasAcksOrNacks ? _ackNackCts.Token : _waitForHandlerCts.Token); Add(delayTask, new NextAction(false, hasAcksOrNacks ? () => { ackActionToRetry(retryIds); StartPush(); } : () => { extendActionToRetry(extendIds.Where(j => retryIds.Contains(j.Id))); StartPush(); })); @@ -1092,13 +1131,13 @@ private void HandleStreamPing() { // Need to explicitly check this, as the continuation passed to Add() may be executed // regardless of the fault/cancellation state of the Task. 
- if (_softStopCts.IsCancellationRequested) + if (_pullCts.IsCancellationRequested) { // No more pings when subscriber stopping. return; } // Schedule next ping, this never stops whilst this subscriber as active - Add(_scheduler.Delay(s_streamPingPeriod, _softStopCts.Token), Next(false, HandleStreamPing)); + Add(_scheduler.Delay(s_streamPingPeriod, _pullCts.Token), Next(false, HandleStreamPing)); // If messages are currently being processed, then ping the stream periodically; // this ensures the stream isn't closed. // If the stream is closed, then all gRPC-buffered messages have their server-side diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs index 9ff2e30d9c8f..acc1a35fd998 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs @@ -89,8 +89,11 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable _mainTcs; - private CancellationTokenSource _globalSoftStopCts; // soft-stop is guarenteed to occur before hard-stop. - private CancellationTokenSource _globalHardStopCts; + private CancellationTokenSource _globalNackAndWaitCts; // Nack and wait is guarenteed to occur before hard-stop. + private CancellationTokenSource _globalHardStopCts; // Immediately stops all processing, dropping any unhandled messages. + + // A boolean that is true if stop has been initiated with the StopAsync methods. + private bool IsStopStarted => _globalHardStopCts.IsCancellationRequested || _globalNackAndWaitCts.IsCancellationRequested; // This property only exists for testing. 
// This is the delay between obtaining a lease on a message and then further extending the lease on that message @@ -120,7 +123,7 @@ public override Task StartAsync(SubscriptionHandler handler) { GaxPreconditions.CheckState(_mainTcs == null, "Can only start an instance once."); _mainTcs = new TaskCompletionSource(); - _globalSoftStopCts = new CancellationTokenSource(); + _globalNackAndWaitCts = new CancellationTokenSource(); _globalHardStopCts = new CancellationTokenSource(); } var registeredTasks = new HashSet(); @@ -149,7 +152,6 @@ private async Task StopCompletionAsync(Task[] subscriberTasks, HashSet reg var task = await _taskHelper.ConfigureAwait(_taskHelper.WhenAny(subscriberTasks)); if (task.IsFaulted) { - _globalSoftStopCts.Cancel(); _globalHardStopCts.Cancel(); } // Wait for all subscribers to stop @@ -203,13 +205,13 @@ public override Task StopAsync(CancellationToken hardStopToken) lock (_lock) { // Note: If multiple stop requests are made, only the first cancellation token is observed. - if (_mainTcs is not null && _globalSoftStopCts.IsCancellationRequested) + if (_mainTcs is not null && IsStopStarted) { // No-op. We don't want to throw exceptions if DisposeAsync or StopAsync is called a second time. return _mainTcs.Task; } GaxPreconditions.CheckState(_mainTcs != null, "Can only stop a started instance."); - _globalSoftStopCts.Cancel(); + _globalNackAndWaitCts.Cancel(); } var registration = hardStopToken.Register(() => _globalHardStopCts.Cancel()); From 738e14b62651b9ee87c5e19aefe97ab023b37b1d Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Tue, 24 Mar 2026 23:38:44 +0000 Subject: [PATCH 2/5] feat(PubSub): Add LinkedCancellationTokenSource. 
--- .../SubscriberClientImpl.SingleChannel.cs | 107 ++++++++++++++++-- 1 file changed, 96 insertions(+), 11 deletions(-) diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs index 6b2f30870ec7..69946d4a9351 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs @@ -146,11 +146,11 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) private readonly IClock _clock; private readonly SubscriberServiceApiClient _client; private readonly SubscriptionHandler _handler; - private readonly CancellationTokenSource _mainCts; // Cancels the main processing loop, immediately stoping all processing. This may drop messages. - private readonly CancellationTokenSource _pullCts; // Cancels the streaming pull, preventing new messages from being consumed. - private readonly CancellationTokenSource _ackNackCts; // Cancels all ACK/NACK pushes to the Pub/Sub service. - private readonly CancellationTokenSource _sendToHandlerCts; // Cancels sending new messages to be handled. - private readonly CancellationTokenSource _waitForHandlerCts; // Cancels all message handler operations. + private readonly LinkedCancellationTokenSource _mainCts; // Cancels the main processing loop, immediately stoping all processing. This may drop messages. + private readonly LinkedCancellationTokenSource _pullCts; // Cancels the streaming pull, preventing new messages from being consumed. + private readonly LinkedCancellationTokenSource _ackNackCts; // Cancels all ACK/NACK pushes to the Pub/Sub service. + private readonly LinkedCancellationTokenSource _sendToHandlerCts; // Cancels sending new messages to be handled. 
+ private readonly LinkedCancellationTokenSource _waitForHandlerCts; // Cancels all message handler operations. private readonly SubscriptionName _subscriptionName; private readonly LeaseTiming _normalLeaseTiming; private readonly LeaseTiming _exactlyOnceDeliveryLeaseTiming; @@ -199,11 +199,11 @@ internal SingleChannel(SubscriberClientImpl subscriber, // Arrange cancellation token sources for shutdown var nackAndWaitToken = subscriber._globalNackAndWaitCts.Token; var hardstopToken = subscriber._globalHardStopCts.Token; - _mainCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken); - _ackNackCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken); - _pullCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken, nackAndWaitToken); - _sendToHandlerCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken, nackAndWaitToken); // This is guaranteed to be cancelled before _waitForHandlerCts - _waitForHandlerCts = CancellationTokenSource.CreateLinkedTokenSource(hardstopToken); + _mainCts = new LinkedCancellationTokenSource(hardstopToken); + _ackNackCts = new LinkedCancellationTokenSource(hardstopToken); + _pullCts = new LinkedCancellationTokenSource(hardstopToken, nackAndWaitToken); + _sendToHandlerCts = new LinkedCancellationTokenSource(hardstopToken, nackAndWaitToken);// This is guaranteed to be cancelled before _waitForHandlerCts + _waitForHandlerCts = new LinkedCancellationTokenSource(hardstopToken); _subscriptionName = subscriber.SubscriptionName; _normalLeaseTiming = subscriber._normalLeaseTiming; @@ -664,7 +664,7 @@ private void HandleExtendLease(HashSet leaseTracking, LeaseCancellation { // Create a task to cancel lease-extension once `_maxExtensionDuration` has been reached. // This set up once for each chunk of received messages, and passed through to each future call to this method. 
- cancellation = new LeaseCancellation(_waitForHandlerCts); + cancellation = new LeaseCancellation(_waitForHandlerCts.Source); Add(_scheduler.Delay(_maxExtensionDuration, cancellation.Token), Next(false, () => { // On cancellation we may have not reached the max extension duration so don't clear the lease @@ -1326,5 +1326,90 @@ internal void OnServerDisconnect() Backoff = _disconnectBackoff; } } + + /// + /// Evaluates the underlying parent tokens directly for cancellation rather than relying on standard callback mechanisms. + /// This guarantees atomicity and precise synchronization across multiple sources linked to the same parent tokens. + /// + private sealed class LinkedCancellationTokenSource + { + private readonly CancellationTokenSource _linkedSource; + private readonly CancellationToken[] _parentTokens; + + /// + /// Initializes a new instance linked to the provided parent tokens. + /// + public LinkedCancellationTokenSource(params CancellationToken[] parentTokens) + { + _parentTokens = parentTokens ?? Array.Empty(); + + // Fallback to a standard source if no parent tokens are provided + _linkedSource = _parentTokens.Length > 0 + ? CancellationTokenSource.CreateLinkedTokenSource(_parentTokens) + : new CancellationTokenSource(); + } + + /// + /// Gets whether cancellation has been requested for this source or any parent tokens. + /// + public bool IsCancellationRequested + { + get + { + // Fast path: inner source already registered the cancellation + if (_linkedSource.IsCancellationRequested) + { + return true; + } + + // Explicitly check parents to bypass callback delays + for (int i = 0; i < _parentTokens.Length; i++) + { + if (_parentTokens[i].IsCancellationRequested) + { + return true; + } + } + return false; + } + } + + /// + /// Gets the CancellationToken associated with this source. + /// + public CancellationToken Token => _linkedSource.Token; + + + /// + /// Gets the CancellationTokenSource associated with the linked token. 
+ /// + public CancellationTokenSource Source => _linkedSource; + + /// + /// Throws an OperationCanceledException if this source or any parent token has been canceled. + /// + public void ThrowIfCancellationRequested() + { + // Fast path: inner source already registered the cancellation + if (_linkedSource.IsCancellationRequested) + { + _linkedSource.Token.ThrowIfCancellationRequested(); + } + + // Explicitly check parents to bypass callback delays + for (int i = 0; i < _parentTokens.Length; i++) + { + if (_parentTokens[i].IsCancellationRequested) + { + _parentTokens[i].ThrowIfCancellationRequested(); + } + } + } + + /// + /// Communicates a request for cancellation to the linked cancellation token. Does not cancel parent tokens. + /// + public void Cancel() => _linkedSource.Cancel(); + } } } From 69200a071e98d32a26a2ecd6db0862bba4a1cfdf Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Thu, 12 Mar 2026 15:41:18 +0000 Subject: [PATCH 3/5] feat(PubSub): Add NackImmediately and WaitForProcessing shutdown modes. 
--- .../SubscriberClientTest.cs | 656 +++++++++++++++--- .../SubscriberClient.ShutdownOptions.cs | 52 ++ .../SubscriberClient.cs | 12 + .../SubscriberClientImpl.SingleChannel.cs | 63 +- .../SubscriberClientImpl.cs | 100 ++- 5 files changed, 759 insertions(+), 124 deletions(-) create mode 100644 apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.ShutdownOptions.cs diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs index 72d839098982..e518397b14ef 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs @@ -23,10 +23,10 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Xunit; +using static Google.Cloud.PubSub.V1.SubscriberClient; namespace Google.Cloud.PubSub.V1.Tests { @@ -456,18 +456,16 @@ private void MaybeThrowException(IEnumerable ackIds) /// internal sealed class TestSubscriptionHandler : SubscriptionHandler { - private readonly bool _ackOrNack; + private readonly Func> _handler; - internal List Responses { get; } + internal List Responses { get; } = new List(); - internal TestSubscriptionHandler(bool ackOrNack) - { - _ackOrNack = ackOrNack; - Responses = new List(); - } + internal TestSubscriptionHandler(Func> handler) => _handler = handler; + + internal TestSubscriptionHandler(bool ackOrNack) => _handler = (msg, ct) => Task.FromResult(ackOrNack ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack); public override Task HandleMessage(PubsubMessage message, CancellationToken cancellationToken) => - Task.FromResult(_ackOrNack ? 
SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack); + _handler(message, cancellationToken); public override void HandleAckResponses(IReadOnlyList responses) => // For exactly once delivery, only messages that succeed or fail permanently appear here, i.e., only messages whose status is finalized. @@ -583,7 +581,7 @@ private static RpcException GetExactlyOnceDeliveryMixedException(Rpc.ErrorInfo e } [Theory, CombinatorialData] - public void ImmediateStop( + public void ImmediateStop_Obsolete( [CombinatorialValues(false, true)] bool hardStop) { using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) @@ -608,8 +606,35 @@ public void ImmediateStop( } } + [Theory, CombinatorialData] + public void ImmediateStop( + [CombinatorialValues(false, true)] bool hardStop, + ShutdownMode shutdownMode) + { + using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) + { + fake.Scheduler.Run(async () => + { + var doneTask = fake.Subscriber.StartAsync((msg, ct) => + { + throw new Exception("Should never get here"); + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( + () => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode }, cancellationToken: new CancellationToken(hardStop))); + Assert.Equal(hardStop, isCancelled); + Assert.Equal(1, fake.Subscribers.Count); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + Assert.Empty(fake.Subscribers[0].Extends); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.Subscribers[0].WriteCompletes); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(3) }, fake.ClientShutdowns); + }); + } + } + [Fact] - public void StopBeforeStart() + public void StopBeforeStart_Obsolete() { using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) { @@ -630,10 +655,36 @@ public void StopBeforeStart() } } + 
[Theory, CombinatorialData] + public void StopBeforeStart(ShutdownMode shutdownMode) + { + // Ensure StopAsync cannot be called before the subscriber has started. + using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) + { + fake.Scheduler.Run(async () => + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + + // Attempting to stop an unstarted subscriber should throw InvalidOperationException. + var exception = await Assert.ThrowsAsync( + async () => await fake.TaskHelper.ConfigureAwait( + fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode, Timeout = TimeSpan.FromHours(1) }))); + + Assert.Equal("Can only stop a started instance.", exception.Message); + Assert.Equal(1, fake.Subscribers.Count); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + Assert.Empty(fake.Subscribers[0].Extends); + Assert.Equal(Array.Empty(), fake.Subscribers[0].WriteCompletes); + Assert.Equal(Array.Empty(), fake.ClientShutdowns); + }); + } + } + // The test is similar to ImmediateStop but checks that calling DisposeAsync() instead of StopAsync() works. // It also tests that DisposeAsync() or StopAsync() can be called multiple times, without throwing exception. [Fact] - public void Dispose() + public void Dispose_Obsolete() { using (var fake = Fake.CreateClientForSingleResponseStream(new[] { ServerAction.Inf() })) { @@ -664,6 +715,37 @@ await fake.TaskHelper.ConfigureAwaitHideCancellation( } } + [Theory, CombinatorialData] + public void Dispose(ShutdownMode shutdownMode) + { + // Ensure StopAsync is idempotent and safe to call after disposal. 
+ using (var fake = Fake.CreateClientForSingleResponseStream(new[] { ServerAction.Inf() })) + { + fake.Scheduler.Run(async () => + { + var doneTask = fake.Subscriber.StartAsync((msg, ct) => + { + throw new Exception("Should never get here"); + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + + // Dispose first, then call StopAsync on the already-disposed subscriber. + await fake.TaskHelper.ConfigureAwaitHideCancellation( + () => fake.Subscriber.DisposeAsync().AsTask()); + await fake.TaskHelper.ConfigureAwait( + fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode, Timeout = TimeSpan.FromHours(1) })); + + // Verify the client shut down correctly without exceptions. + Assert.Equal(1, fake.Subscribers.Count); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + Assert.Empty(fake.Subscribers[0].Extends); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.Subscribers[0].WriteCompletes); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(3) }, fake.ClientShutdowns); + }); + } + } + [Fact] public void DisposeBeforeStart() { @@ -690,7 +772,7 @@ await fake.TaskHelper.ConfigureAwaitHideCancellation( } [Theory, PairwiseData] - public void RecvManyMsgsNoErrors( + public void RecvManyMsgsNoErrors_Obsolete( [CombinatorialValues(false, true)] bool hardStop, [CombinatorialValues(2, 5, 6, 9, 10)] int batchCount, [CombinatorialValues(1, 10, 13, 44, 45)] int batchSize, @@ -741,6 +823,66 @@ public void RecvManyMsgsNoErrors( } } + [Theory, PairwiseData] + public void RecvManyMsgsNoErrors( + bool hardStop, + ShutdownMode shutdownMode, + [CombinatorialValues(2, 5, 6, 9, 10)] int batchCount, + [CombinatorialValues(1, 10, 13, 44, 45)] int batchSize, + [CombinatorialValues(0, 1, 4, 6, 60)] int interBatchIntervalSeconds, + [CombinatorialValues(0, 1, 5, 8, 21)] int handlerDelaySeconds, + [CombinatorialValues(2, 8, 11, 34, 102)] int stopAfterSeconds, + [CombinatorialValues(1, 2, 
3, 4, 5, 7, 13)] int threadCount, + [CombinatorialValues(1, 2, 3, 4, 8, 16, 33)] int clientCount) + { + int delayToSubtract = shutdownMode switch + { + ShutdownMode.WaitForProcessing => hardStop ? handlerDelaySeconds : 0, + ShutdownMode.NackImmediately => handlerDelaySeconds, + _ => throw new InvalidOperationException() + }; + + var expectedCompletedBatches = interBatchIntervalSeconds == 0 + ? (stopAfterSeconds < delayToSubtract ? 0 : batchCount) + : Math.Max(0, stopAfterSeconds - delayToSubtract) / interBatchIntervalSeconds; + var expectedMsgCount = Math.Min(expectedCompletedBatches, batchCount) * batchSize * clientCount; + var expectedAcks = Enumerable.Range(0, batchCount) + .SelectMany(batchIndex => Enumerable.Range(0, batchSize).Select(msgIndex => FakeSubscriberServiceApiClient.MakeMsgId(batchIndex, msgIndex))) + .Take(expectedMsgCount / clientCount) + .OrderBy(x => x); + + var msgss = Enumerable.Range(0, batchCount) + .Select(batchIndex => + ServerAction.Data(TimeSpan.FromSeconds(interBatchIntervalSeconds), Enumerable.Range(0, batchSize).Select(i => (batchIndex * batchSize + i).ToString()))) + .Concat(new[] { ServerAction.Inf() }); + using (var fake = Fake.Create(Enumerable.Repeat(msgss, clientCount).ToList(), clientCount: clientCount, threadCount: threadCount)) + { + fake.Scheduler.Run(async () => + { + List handledMsgs = new List(); + var doneTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(handlerDelaySeconds), ct)); + lock (handledMsgs) + { + handledMsgs.Add(msg.Data.ToStringUtf8()); + } + return SubscriberClient.Reply.Ack; + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); + var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( + () => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode }, cancellationToken: new 
CancellationToken(hardStop))); + Assert.Equal(hardStop, isCancelled); + Assert.Equal(clientCount, fake.Subscribers.Count); + Assert.Equal(expectedMsgCount, handledMsgs.Locked(() => handledMsgs.Count)); + Assert.Equal(expectedMsgCount, fake.Subscribers.Sum(x => x.Acks.Count)); + Assert.Equal(Enumerable.Repeat(expectedAcks, clientCount), fake.Subscribers.Select(x => x.Acks.Select(y => y.Id).OrderBy(y => y))); + Assert.Equal(Enumerable.Repeat(1, clientCount).ToArray(), fake.Subscribers.Select(x => x.WriteCompletes.Count).ToArray()); + Assert.Equal(1, fake.ClientShutdowns.Count); + }); + } + } + [Theory, CombinatorialData] public void OneClientManyMsgs([CombinatorialValues(1, 2, 3, 4, 5)] int rndSeed) { @@ -783,7 +925,7 @@ public void OneClientManyMsgs([CombinatorialValues(1, 2, 3, 4, 5)] int rndSeed) recvedMsgs.Add(msgString); if (recvedMsgs.Count == totalMsgCount) { - Task unused = fake.Subscriber.StopAsync(CancellationToken.None); + Task unused = fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing }); } } return SubscriberClient.Reply.Ack; @@ -799,7 +941,7 @@ public void OneClientManyMsgs([CombinatorialValues(1, 2, 3, 4, 5)] int rndSeed) } [Theory, PairwiseData] - public void FlowControl( + public void FlowControl_Obsolete( [CombinatorialValues(false, true)] bool hardStop, [CombinatorialValues(1, 2, 3)] int clientCount, [CombinatorialValues(2, 9, 25, 600)] int stopAfterSeconds, @@ -848,8 +990,58 @@ public void FlowControl( } } - [Fact] - public void UserHandlerFaults() + [Theory, PairwiseData] + public void FlowControl( + [CombinatorialValues(false, true)] bool hardStop, + [CombinatorialValues(1, 2, 3)] int clientCount, + [CombinatorialValues(2, 9, 25, 600)] int stopAfterSeconds, + [CombinatorialValues(1, 2, 3, 4, 5, 10, 21, 99, 148)] int flowMaxElements, + [CombinatorialValues(1, 10, 14, 18, 25, 39, 81, 255)] int flowMaxBytes, + [CombinatorialValues(1, 3, 9, 19)] int threadCount) + { + const int msgsPerClient = 100; + var 
oneMsgByteCount = FakeSubscriberServiceApiClient.MakeReceivedMessage("0000.0000", "0000").CalculateSize(); + var combinedFlowMaxElements = Math.Min(flowMaxElements, flowMaxBytes / oneMsgByteCount + 1); + var expectedMsgCount = Math.Min(msgsPerClient * clientCount, combinedFlowMaxElements * stopAfterSeconds); + var msgss = Enumerable.Range(0, msgsPerClient) + .Select(i => ServerAction.Data(TimeSpan.Zero, new[] { i.ToString("D4") })) + .Concat(new[] { ServerAction.Inf() }); + using (var fake = Fake.Create(Enumerable.Repeat(msgss, clientCount).ToList(), + flowMaxElements: flowMaxElements, flowMaxBytes: flowMaxBytes, clientCount: clientCount, threadCount: threadCount)) + { + fake.Scheduler.Run(async () => + { + List handledMsgs = new List(); + int concurrentElementCount = 0; + int concurrentByteCount = 0; + var doneTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var msgSize = msg.CalculateSize(); + lock (handledMsgs) + { + Assert.True((concurrentElementCount += 1) <= flowMaxElements, "Flow has exceeded max elements."); + // Exceeding the flow byte limit is allowed for individual messages that exceed that size. 
+ Assert.True((concurrentByteCount += msgSize) <= flowMaxBytes || concurrentElementCount == 1, "Flow has exceeded max bytes."); + } + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), ct)); + lock (handledMsgs) + { + handledMsgs.Add(msg.Data.ToStringUtf8()); + // Check just for sanity + Assert.True((concurrentElementCount -= 1) >= 0); + Assert.True((concurrentByteCount -= msgSize) >= 0); + } + return SubscriberClient.Reply.Ack; + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); + await fake.TaskHelper.ConfigureAwaitHideCancellation(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately }, cancellationToken: new CancellationToken(hardStop))); + Assert.Equal(expectedMsgCount, handledMsgs.Count); + }); + } + } + + [Theory, CombinatorialData] + public void UserHandlerFaults(ShutdownMode shutdownMode) { var msgs = Enumerable.Repeat(ServerAction.Data(TimeSpan.Zero, new[] { "m" }), 10).Concat(new[] { ServerAction.Inf() }); using (var fake = Fake.Create(new[] { msgs })) @@ -869,7 +1061,7 @@ public void UserHandlerFaults() return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(Enumerable.Repeat("m", 5), handledMsgs); Assert.Equal(5, fake.Subscribers[0].Acks.Count); Assert.Equal(5, fake.Subscribers[0].Nacks.Count); @@ -879,7 +1071,8 @@ public void UserHandlerFaults() [Theory, PairwiseData] public void ServerFaultsRecoverable( - [CombinatorialValues(1, 3, 9, 14)] int threadCount) + [CombinatorialValues(1, 3, 9, 14)] int threadCount, + ShutdownMode shutdownMode) { var zero = TimeSpan.Zero; var 
recoverableEx = new RpcException(new Status(StatusCode.DeadlineExceeded, ""), ""); @@ -901,7 +1094,7 @@ public void ServerFaultsRecoverable( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(new[] { "1", "2", "3" }, handledMsgs); Assert.Equal(3, fake.Subscribers[0].Acks.Count); }); @@ -912,7 +1105,8 @@ public void ServerFaultsRecoverable( public void ServerFaultsUnrecoverable( [CombinatorialValues(true, false)] bool badMoveNext, [CombinatorialValues(1, 2, 3, 4, 10)] int clientCount, - [CombinatorialValues(1, 3, 9, 14)] int threadCount) + [CombinatorialValues(1, 3, 9, 14)] int threadCount, + ShutdownMode shutdownMode) { var zero = TimeSpan.Zero; var unrecoverableEx = new RpcException(new Status(StatusCode.Unimplemented, ""), ""); @@ -936,7 +1130,7 @@ public void ServerFaultsUnrecoverable( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(unrecoverableEx, ex.AllExceptions().FirstOrDefault()); Assert.NotEmpty(handledMsgs); Assert.True(handledMsgs[0] == "1" || handledMsgs[0] == "2"); @@ -965,7 +1159,7 @@ public void OnlyOneStart() } [Theory, CombinatorialData] - public void LeaseExtension(bool isExactlyOnceDelivery) + public void LeaseExtension(bool isExactlyOnceDelivery, ShutdownMode shutdownMode) { var msgs = new[] { new[] { ServerAction.Data(TimeSpan.Zero, new[] { "1" }), @@ -982,7 
+1176,7 @@ public void LeaseExtension(bool isExactlyOnceDelivery) return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await fake.TaskHelper.ConfigureAwait(doneTask); Assert.Equal(1, fake.Subscribers.Count); DateTime S(int seconds) => fake.Time0 + TimeSpan.FromSeconds(seconds); @@ -993,7 +1187,7 @@ public void LeaseExtension(bool isExactlyOnceDelivery) } [Theory, CombinatorialData] - public void LeaseMaxExtension(bool isExactlyOnceDelivery) + public void LeaseMaxExtension(bool isExactlyOnceDelivery, ShutdownMode shutdownMode) { var msgs = new[] { new[] { ServerAction.Data(TimeSpan.Zero, new[] { "1" }), @@ -1010,7 +1204,7 @@ public void LeaseMaxExtension(bool isExactlyOnceDelivery) return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromHours(12), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode, Timeout = TimeSpan.FromHours(24) })); await fake.TaskHelper.ConfigureAwait(doneTask); Assert.Equal(1, fake.Subscribers.Count); // Check that the lease was extended for 60 minutes only. 
@@ -1020,8 +1214,8 @@ public void LeaseMaxExtension(bool isExactlyOnceDelivery) } } - [Fact] - public void SlowUplinkThrottlesPull() + [Theory, CombinatorialData] + public void SlowUplinkThrottlesPull(ShutdownMode shutdownMode) { const int msgCount = 20; const int flowMaxEls = 5; @@ -1037,7 +1231,7 @@ public void SlowUplinkThrottlesPull() { var subTask = fake.Subscriber.StartAsync((msg, ct) => Task.FromResult(SubscriberClient.Reply.Ack)); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1000), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await fake.TaskHelper.ConfigureAwait(subTask); var sub = fake.Subscribers[0]; Assert.True(sub.Extends.Count <= msgCount); // Difficult to predict, must be <= total message count @@ -1050,8 +1244,8 @@ public void SlowUplinkThrottlesPull() } } - [Fact] - public void StreamPings() + [Theory, CombinatorialData] + public void StreamPings(ShutdownMode shutdownMode) { const int pingPeriodSeconds = 25; // From SubscriberClient. const int pingCount = 10; @@ -1070,7 +1264,7 @@ public void StreamPings() // Wait a bit longer, to check no more pings happen. await th.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(pingPeriodSeconds * 4), CancellationToken.None)); // Stop subscriber. 
- await th.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await th.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await th.ConfigureAwait(subTask); var expectedPings = Enumerable.Range(0, pingCount).Select(i => fake.Time0 + TimeSpan.FromSeconds(pingPeriodSeconds * (i + 1))); Assert.Equal(expectedPings, fake.Subscribers[0].StreamPings); @@ -1099,14 +1293,14 @@ public void OrderingKeysManyMsgs( var startTask = fake.Subscriber.StartAsync(async (msg, ct) => { var delay = TimeSpan.FromMilliseconds(rnd.Next(1000)); - await th.ConfigureAwait(fake.Scheduler.Delay(delay, default)); + await th.ConfigureAwait(fake.Scheduler.Delay(delay, ct)); lock (recvedMsgs) { recvedMsgs.Add(msg.Data.ToStringUtf8()); recvCount += 1; if (recvCount == msgCount) { - var dummyTask = fake.Subscriber.StopAsync(CancellationToken.None); + var dummyTask = fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = TimeSpan.FromHours(24) }); } } return SubscriberClient.Reply.Ack; @@ -1296,8 +1490,8 @@ public void InvalidParameters() //Assert.Equal("MaxTotalAckExtension", ex9.ParamName); There's a bug in GaxPreconditions.CheckNonNegativeDelay() which uses the wrong paramName } - [Fact] - public void DeliveryAttempt() + [Theory, CombinatorialData] + public void DeliveryAttempt(ShutdownMode shutdownMode) { var msgs = new[] { ServerAction.Data(TimeSpan.Zero, new[] { "m" }, deliveryAttempt: null), @@ -1315,7 +1509,7 @@ public void DeliveryAttempt() return Task.FromResult(SubscriberClient.Reply.Ack); }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(new int?[] { null, 2 }, deliveryAttempts); }); } @@ -1323,7 +1517,7 @@ public void 
DeliveryAttempt() // Acknowledge / ModifyAcknowledgeDeadline calls may throw RpcException. RpcExceptions should not be thrown to the client. [Theory, CombinatorialData] - public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false, null)] bool? ackOrModifyAck) + public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false, null)] bool? ackOrModifyAck, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1356,7 +1550,7 @@ public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Despite RpcException being thrown, all 4 messages should be handled. Assert.Equal(new[] { "1", "2", "3", "4" }, handledMsgs); }); @@ -1364,7 +1558,7 @@ public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false // If Acknowledge / ModifyAcknowledgeDeadline calls throw exceptions other than RpcExceptions, they should be thrown to the client. 
[Theory, CombinatorialData] - public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] bool ackOrModifyAck) + public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] bool ackOrModifyAck, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1394,7 +1588,7 @@ public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Equal(exception, ex.AllExceptions().FirstOrDefault()); Assert.Equal(1, fake.ClientShutdowns.Count); }); @@ -1404,7 +1598,7 @@ public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] // In non exactly once delivery, if we use the new SubscriptionHandler to see Ack/NackResponse, // the acknowledgement status should be returned as Success, they are treated as "fire and forget" operations. 
[Theory, CombinatorialData] - public void AckModifyAckDeadlineFault_SubscriptionHandler([CombinatorialValues(true, false)] bool ackOrModifyAck) + public void AckModifyAckDeadlineFault_SubscriptionHandler(bool ackOrModifyAck, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1429,14 +1623,14 @@ public void AckModifyAckDeadlineFault_SubscriptionHandler([CombinatorialValues(t { var doneTask = fake.Subscriber.StartAsync(testSubscriptionHandler); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // All the 4 test messages have encountered a recoverable RpcException, but their status should be success. Assert.Equal(4, testSubscriptionHandler.Responses.Count(j => j.Status == AcknowledgementStatus.Success)); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_TemporaryFault([CombinatorialValues(true, false, null)] bool? ackNackOrExtends) + public void ExactlyOnceDelivery_TemporaryFault([CombinatorialValues(true, false, null)] bool? ackNackOrExtends, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1473,14 +1667,14 @@ public void ExactlyOnceDelivery_TemporaryFault([CombinatorialValues(true, false, return ackNackOrExtends == false ? SubscriberClient.Reply.Nack : SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Despite temporary failures, all 4 messages should be handled. 
Assert.Equal(new[] { "1", "2", "3", "4" }, handledMsgs); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_AckNack_PermanentFault([CombinatorialValues(true, false)] bool ackOrNack) + public void ExactlyOnceDelivery_AckNack_PermanentFault([CombinatorialValues(true, false)] bool ackOrNack, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1504,15 +1698,15 @@ public void ExactlyOnceDelivery_AckNack_PermanentFault([CombinatorialValues(true { var doneTask = fake.Subscriber.StartAsync(testSubscriptionHandler); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Exception should not be thrown. Assert.Null(ex); Assert.Equal(new[] { "1", "2", "3", "4" }, testSubscriptionHandler.Responses.Where(j => j.Status == AcknowledgementStatus.FailedPrecondition).Select(j => j.MessageId)); }); } - [Fact] - public void ExactlyOnceDelivery_Extends_PermanentFault() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_Extends_PermanentFault(ShutdownMode shutdownMode) { var msgs = new[] { @@ -1538,14 +1732,14 @@ public void ExactlyOnceDelivery_Extends_PermanentFault() return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Exception shouldn't be thrown in case of permanent failure. 
Assert.Null(ex); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, false)] bool ackOrNack) + public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, false)] bool ackOrNack, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1573,7 +1767,7 @@ public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, fa { var doneTask = fake.Subscriber.StartAsync(testHandler); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); Assert.Null(ex); // "1" is success and "3" is permanent failure. Assert.Equal("1", testHandler.Responses.First(j => j.Status == AcknowledgementStatus.Success).MessageId); @@ -1581,8 +1775,8 @@ public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, fa }); } - [Fact] - public void ExactlyOnceDelivery_Extends_MixedFault() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_Extends_MixedFault(ShutdownMode shutdownMode) { var msgs = new[] { @@ -1609,7 +1803,7 @@ public void ExactlyOnceDelivery_Extends_MixedFault() return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Permanent exception shouldn't be thrown. // Extends are not user initiated, so we can't get the success and temporary failed status from the client. 
Assert.Null(ex); @@ -1617,8 +1811,8 @@ public void ExactlyOnceDelivery_Extends_MixedFault() } // All successful receipt ModAcks. - [Fact] - public void ExactlyOnceDelivery_ReceiptModAck() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_ReceiptModAck(ShutdownMode shutdownMode) { var msgs = new[] { @@ -1645,7 +1839,7 @@ public void ExactlyOnceDelivery_ReceiptModAck() }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // All 4 messages are handled. Assert.Equal(4, handledMsgs.Count); Assert.Equal(new[] { "1", "2", "3", "4" }, handledMsgs); @@ -1653,7 +1847,7 @@ public void ExactlyOnceDelivery_ReceiptModAck() } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_ReceiptModAck_MixedFault([CombinatorialValues(true, false)] bool succeedOnRetry) + public void ExactlyOnceDelivery_ReceiptModAck_MixedFault([CombinatorialValues(true, false)] bool succeedOnRetry, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1685,15 +1879,15 @@ public void ExactlyOnceDelivery_ReceiptModAck_MixedFault([CombinatorialValues(tr }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Permanently failed receipt ModAcks won't be passed to the user handler, so 3 is not handled. // Temporary failed ModAck for message 2 becomes successful or permanent failure based on succeedOnRetry flag. Assert.Equal(succeedOnRetry ? 
new[] { "1", "2", "4" } : new[] { "1", "4" }, handledMsgs); }); } - [Fact] - public void ExactlyOnceDelivery_ReceiptModAck_PermanentFaults() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_ReceiptModAck_PermanentFaults(ShutdownMode shutdownMode) { var msgs = new[] { @@ -1736,14 +1930,14 @@ public void ExactlyOnceDelivery_ReceiptModAck_PermanentFaults() }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Permanently failed receipt ModAcks won't be passed to the user handler, so all 4 messages are not handled. Assert.Equal(0, handledMsgs.Count); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_ReceiptModAck_TemporaryFaults([CombinatorialValues(true, false)] bool succeedOnRetry) + public void ExactlyOnceDelivery_ReceiptModAck_TemporaryFaults([CombinatorialValues(true, false)] bool succeedOnRetry, ShutdownMode shutdownMode) { var msgs = new[] { @@ -1787,7 +1981,7 @@ public void ExactlyOnceDelivery_ReceiptModAck_TemporaryFaults([CombinatorialValu }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); // Temporary failed receipt ModAcks can succeed after 1 retry or stay failed, so based on the succeedOnRetry flag, 4 or 0 messages are handled. Assert.Equal(succeedOnRetry ? 4 : 0, handledMsgs.Count); Assert.Equal(succeedOnRetry ? 
new[] { "1", "2", "3", "4" } : Array.Empty(), handledMsgs); @@ -1914,8 +2108,8 @@ public void StreamingPullRetry_NonRetriableException() }); } - [Fact] - public void StreamingPullRetry_InternalErrorContinuesRetrying() + [Theory, CombinatorialData] + public void StreamingPullRetry_InternalErrorContinuesRetrying(ShutdownMode shutdownMode) { // A regular internal failure that's not due to an auth error. var exception = new RpcException(new Status(StatusCode.Internal, "Bang")); @@ -1927,7 +2121,7 @@ public void StreamingPullRetry_InternalErrorContinuesRetrying() var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); Assert.False(subscriberTask.IsCompleted); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await subscriberTask; }); } @@ -1954,8 +2148,8 @@ public void StreamingPullRetry_RetriableErrorEventuallyFails() /// If the streaming pull call fails in MoveNext after a short time (e.g. 10 seconds) /// we should retry with backoff. 
/// - [Fact] - public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackoff() + [Theory, CombinatorialData] + public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackoff(ShutdownMode shutdownMode) { var exception = new RpcException(new Status(StatusCode.Unavailable, "Stream terminated")); TimeSpan streamDuration = TimeSpan.FromSeconds(30); @@ -1967,7 +2161,7 @@ public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackof var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromMinutes(100), CancellationToken.None)); Assert.False(subscriberTask.IsCompleted); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await subscriberTask; // Check the pull times indicate a backoff. @@ -1993,8 +2187,8 @@ public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackof /// We *expect* the streaming pull to fail (in MoveNext) after about a minute... we should /// retry immediately each time. 
/// - [Fact] - public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBackoff() + [Theory, CombinatorialData] + public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBackoff(ShutdownMode shutdownMode) { var exception = new RpcException(new Status(StatusCode.Unavailable, "Stream terminated")); TimeSpan streamDuration = TimeSpan.FromSeconds(60); @@ -2007,7 +2201,7 @@ public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBack var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromMinutes(100), CancellationToken.None)); Assert.False(subscriberTask.IsCompleted); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = shutdownMode })); await subscriberTask; // Check the pull times indicate no backoff. @@ -2018,48 +2212,306 @@ public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBack } [Fact] - public void NackMessagesOnShutdown() + public void Shutdown_SoftStop_NacksMessages() { - var msgs = new[] { new[] { + var msgs = new[] { ServerAction.Data(TimeSpan.Zero, ["msg0"]), ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2", "msg3"]), - ServerAction.Data(TimeSpan.Zero, ["msg4", "msg5"]), + ServerAction.Data(TimeSpan.Zero, ["msg4", "msg5", "msg6"]), + ServerAction.Inf() + }; + // flowMaxElements=2: msg0 and msg4 block flow control; msg1, msg2, msg3 are processed quickly. + // msg5-6 wait for flow control and should be Nacked on shutdown. 
+ using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var givenToMessageHandler = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + givenToMessageHandler.Locked(() => givenToMessageHandler.Add(data)); + if (data == "msg0" || data == "msg4") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromMinutes(1), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, givenToMessageHandler, strict: true); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + + // msg0-4 were handled/acked; msg5-6 were pulled but nacked on shutdown. + Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg5", "msg6" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_NackImmediately_Success() + { + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2"]), + ServerAction.Data(TimeSpan.Zero, ["msg3", "msg4"]), + ServerAction.Inf() + }; + // flowMaxElements=2: msg1 and msg2 block flow control. 
+ using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var handled = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + handled.Locked(() => handled.Add(data)); + if (data == "msg1" || data == "msg2") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(30), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + + // NackImmediately: Nacks pending messages (msg3, msg4) and messages currently being handled (msg1, msg2). + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately, Timeout = TimeSpan.FromSeconds(60) })); + + Assert.Equivalent(new[] { "msg0" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Theory, CombinatorialData] + public void Shutdown_NackImmediately_AckResponseStatus(bool isExactlyOnceDelivery) + { + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1"]), + ServerAction.Inf() + }; + // flowMaxElements=1: msg1 blocks flow control until msg0 is handled. + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 1, useMsgAsId: true, isExactlyOnceDelivery: isExactlyOnceDelivery); + + var testHandler = new TestSubscriptionHandler(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + if (data == "msg1") + { + // Delay msg1 to ensure it's in-flight when shutdown happens. 
+ await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(30), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + fake.Scheduler.Run(async () => + { + var startTask = fake.Subscriber.StartAsync(testHandler); + + // Wait for msg0 to be handled and msg1 to start being handled. + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + + // Stop immediately. This should forcefully nack msg1. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately, Timeout = TimeSpan.FromSeconds(60) })); + + // Check captured responses. + // msg0 should be Success as it was handled before shutdown. + Assert.Contains(testHandler.Responses, r => r.MessageId == "msg0" && r.Status == AcknowledgementStatus.Success); + // msg1 should be Other as it was nacked due to shutdown, even though the handler returned Ack. + Assert.Contains(testHandler.Responses, r => r.MessageId == "msg1" && r.Status == AcknowledgementStatus.Other); + }); + } + + [Fact] + public void Shutdown_WaitForProcessing_CompletesBeforeNack() + { + // Ensure WaitForProcessing allows all received messages to finish if time permits. 
+ var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2"]), + ServerAction.Data(TimeSpan.Zero, ["msg3", "msg4"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var handled = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + handled.Locked(() => handled.Add(data)); + if (data == "msg1" || data == "msg2") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(15), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + + // Stop and wait for processing with a generous timeout. + // It waits for msg1, msg2 to finish and allows msg3, msg4 to be processed. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = TimeSpan.FromSeconds(60) })); + + // All messages should be Acked and none Nacked. + Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Empty(fake.Subscribers[0].Nacks); + }); + } + + [Fact] + public void Shutdown_WaitForProcessing_NacksOnTimeout() + { + // Verify that remaining messages are Nacked if the WaitForProcessing timeout is reached. 
+ var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2", "msg3", "msg4"]), + ServerAction.Data(TimeSpan.FromSeconds(55), ["msg5", "msg6"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var givenToMessageHandler = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + givenToMessageHandler.Locked(() => givenToMessageHandler.Add(data)); + if (data != "msg0") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(15), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, givenToMessageHandler, strict: true); + + // Stop with a timeout that expires before processing completes. + // Timeout=50s: NackDelay=20s. At T=5+20=25s, NackImmediately is triggered. + // msg1, msg2 finish at T=20s. msg3, msg4 start at T=20s but are Nacked as NackImmediately is triggered at T=25s. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = TimeSpan.FromSeconds(50) })); + + // Verify the switch to Nacking for the remaining messages and msg5 & msg6 were not read from stream + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg3", "msg4" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_CancellationToken_AbortsWaitForProcessing() + { + // Verify that cancelling the CancellationToken passed to StopAsync + // immediately triggers a Hard Stop, aborting any graceful shutdown. 
+ var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var givenToMessageHandler = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + givenToMessageHandler.Locked(() => givenToMessageHandler.Add(msg.Data.ToStringUtf8())); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(60), ct)); + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Single(givenToMessageHandler); + + // Request graceful shutdown with a long timeout. + var cts = new CancellationTokenSource(); + var stopTask = fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = TimeSpan.FromHours(1) }, cts.Token); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + + // Cancel the token after 5 seconds of "graceful" shutdown. + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), CancellationToken.None)); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + cts.Cancel(); + + await fake.TaskHelper.ConfigureAwaitHideCancellation(() => stopTask); + + // No ack response will be provided, all work should be dropped as we didn't have a chance to switch to + // NackImmediately + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + }); + } + + [Fact] + public void Shutdown_WaitForProcessing_NacksWhenTimeoutLessThanMinimum() + { + // Ensure immediate Nacking if the requested timeout is shorter than the 30s grace period. 
+ var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2"]), + ServerAction.Data(TimeSpan.Zero, ["msg3", "msg4"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var handled = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + handled.Locked(() => handled.Add(data)); + if (data != "msg0") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(6), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + + // Request shutdown with a short timeout. + // Timeout=15s: NackDelay=0s (since 15 < GracePeriod=30). NackImmediately triggered at T=5s. + // msg1, msg2 are already being handled and will be Nacked upon completion. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.WaitForProcessing, Timeout = TimeSpan.FromSeconds(15) })); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(15), CancellationToken.None)); + + // All unhandled messages should be Nacked immediately. + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + Assert.Equivalent(new[] { "msg0" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_NackImmediately_LeaseExtensionStops() + { + // Ensure lease extensions stop and current leases are Nacked during immediate shutdown. 
+ var msgs = new[] { new[] { + ServerAction.Data(TimeSpan.Zero, new[] { "msg1" }), ServerAction.Inf() } }; - // Set flowMaxElements to 2 to ensure that "msg0" and "msg2" block flow control preventing - // "msg3","msg4","msg5" from being processed. - using (var fake = Fake.Create(msgs, flowMaxElements: 2, useMsgAsId: true, disposeTimeout: TimeSpan.FromSeconds(10))) + using (var fake = Fake.Create(msgs, ackDeadline: TimeSpan.FromSeconds(30), ackExtendWindow: TimeSpan.FromSeconds(10))) { fake.Scheduler.Run(async () => { - var handledMsgs = new List(); var doneTask = fake.Subscriber.StartAsync(async (msg, ct) => { - var data = msg.Data.ToStringUtf8(); - handledMsgs.Locked(() => handledMsgs.Add(data)); - if (data == "msg0" || data == "msg2") - { - // Delay handling so that StopAsync is called while the rest - // are still pulled but waiting for a flow control slot. - await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), ct)); - } + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(120), ct)); return SubscriberClient.Reply.Ack; }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - // Wait for "msg0" and "msg2" to start being handled. "msg1" will also be handled, but not hold onto - // flow control, releasing it so "msg2" can begin being handled. - await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); - Assert.Equivalent(new [] {"msg0", "msg1", "msg2"}, handledMsgs); + int numExtensionsBeforeShutdown = fake.Subscribers[0].Extends.Count(); - // Stop the subscriber. This should ensure pulled messages that haven't entered the user handler yet - // will be NAck'ed. Specifically for messages already in flow control, they will have to wait for pending - // messages to be processed before they are NAck'ed. 
- await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + // Request immediate shutdown. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately })); + int numExtensionsAfterShutdown = fake.Subscribers[0].Extends.Count(); - // Verify that "msg0", "msg1" and "msg2" completed handling normally, while the rest were - // automatically Nacked during shutdown. - Assert.Equivalent(new [] {"msg0", "msg1", "msg2"}, fake.Subscribers[0].Acks.Select(x => x.Id)); - Assert.Equivalent(new [] {"msg3", "msg4", "msg5"}, fake.Subscribers[0].Nacks.Select(x => x.Id)); + // Verify no more lease extensions occurred after shutdown was initiated. + Assert.Equal(numExtensionsAfterShutdown, numExtensionsBeforeShutdown); }); } } @@ -2073,4 +2525,4 @@ private static IReadOnlyList> CreateBadMoveNextSequenc .Concat(Enumerable.Repeat(ServerAction.Sequence(ServerAction.Inf()), includeTrailing ? 1 : 0)) .ToList(); } -} \ No newline at end of file +} diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.ShutdownOptions.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.ShutdownOptions.cs new file mode 100644 index 000000000000..f86590929f92 --- /dev/null +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.ShutdownOptions.cs @@ -0,0 +1,52 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; + +namespace Google.Cloud.PubSub.V1; + +public abstract partial class SubscriberClient +{ + /// + /// Modes available for subscriber shutdown. + /// + public enum ShutdownMode + { + /// + /// Stops receiving new messages and continues processing all received messages. + /// + WaitForProcessing = 0, + + /// + /// Stops receiving new messages and immediately Nacks all unhandled messages, releasing them for redelivery. + /// + NackImmediately = 1, + } + + /// + /// Settings available for subscriber shutdown. + /// + public sealed class ShutdownOptions + { + /// + /// The to use for shutdown. Defaults to . + /// + public ShutdownMode Mode { get; set; } = ShutdownMode.WaitForProcessing; + + /// + /// The timeout for the shutdown process. If null, a default timeout based on the maximum extension duration is used. + /// + public TimeSpan? Timeout { get; set; } + } +} diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs index cf96fa133751..fa9cc066649c 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs @@ -231,6 +231,18 @@ public virtual Task StartAsync(Func expires. public virtual Task StopAsync(TimeSpan timeout) => StopAsync(new CancellationTokenSource(timeout).Token); + /// + /// Stops this . + /// The returned completes when the shutdown is finished according to the + /// or the is cancelled. + /// The returned faults if an unrecoverable error occurs in the underlying service. + /// The returned cancels if is cancelled. + /// + /// The to use for shutdown. + /// Cancel this to immediately abort handlers and acknowledgement. + /// A that completes when the subscriber is stopped, or if an unrecoverable error occurs. 
+ public virtual Task StopAsync(ShutdownOptions shutdownOptions, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + /// /// Disposes this asynchronously. /// diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs index 69946d4a9351..97a5e32cc280 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs @@ -170,6 +170,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) private int _pushInFlight = 0; private readonly HashSet _messagesInFlight = new HashSet(); + private readonly HashSet _nackedOnShutdownIds = new HashSet(); private SubscriberServiceApiClient.StreamingPullStream _pull = null; private int _concurrentPushCount = 0; private bool _pullComplete = false; @@ -199,11 +200,13 @@ internal SingleChannel(SubscriberClientImpl subscriber, // Arrange cancellation token sources for shutdown var nackAndWaitToken = subscriber._globalNackAndWaitCts.Token; var hardstopToken = subscriber._globalHardStopCts.Token; + var nackImmediatelyToken = subscriber._globalNackImmediatelyCts.Token; + var waitForProcessingToken = subscriber._globalWaitForProcessingCts.Token; _mainCts = new LinkedCancellationTokenSource(hardstopToken); _ackNackCts = new LinkedCancellationTokenSource(hardstopToken); - _pullCts = new LinkedCancellationTokenSource(hardstopToken, nackAndWaitToken); - _sendToHandlerCts = new LinkedCancellationTokenSource(hardstopToken, nackAndWaitToken);// This is guaranteed to be cancelled before _waitForHandlerCts - _waitForHandlerCts = new LinkedCancellationTokenSource(hardstopToken); + _pullCts = new LinkedCancellationTokenSource(hardstopToken, nackAndWaitToken, nackImmediatelyToken, waitForProcessingToken); + _sendToHandlerCts = 
new LinkedCancellationTokenSource(hardstopToken, nackAndWaitToken, nackImmediatelyToken); // This is guaranteed to be cancelled before _waitForHandlerCts + _waitForHandlerCts = new LinkedCancellationTokenSource(hardstopToken, nackImmediatelyToken); _subscriptionName = subscriber.SubscriptionName; _normalLeaseTiming = subscriber._normalLeaseTiming; @@ -539,22 +542,17 @@ await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), _messageOrde { return _handler.HandleMessage(msg.Message, _waitForHandlerCts.Token); }, Reply.Nack); - if (_waitForHandlerCts.IsCancellationRequested) - { - // Don't return any ack/nack responses if we are no longer waiting on handlers - lock (_lock) - { - _messagesInFlight.Remove(msg.AckId); - } - // Signal the event loop to re-evaluate the shutdown state and exit early without forwarding ack - // replies, all unhandled messages will be NAck'ed in HandleExtendLease(). - _eventPush.Set(); - return; - } // Lock ack/nack-queues, this is accessed concurrently here and in "master" thread. lock (_lock) { _messagesInFlight.Remove(msg.AckId); + if (_waitForHandlerCts.IsCancellationRequested) + { + // Signal the event loop to re-evaluate the shutdown state and exit early without forwarding ack + // replies, all unhandled messages will be NAck'ed in HandleExtendLease(). + _eventPush.Set(); + return; + } var queue = reply == Reply.Ack ? _ackQueue : _nackQueue; queue.Enqueue(msg.AckId); } @@ -645,6 +643,7 @@ private void HandleExtendLease(HashSet leaseTracking, LeaseCancellation if (_waitForHandlerCts.IsCancellationRequested) { // if we are no longer waiting on handlers then NAck messages for redelivery. + _nackedOnShutdownIds.UnionWith(leaseTracking); Nack(ackIds: leaseTracking, leaseTracking); return; } @@ -654,6 +653,7 @@ private void HandleExtendLease(HashSet leaseTracking, LeaseCancellation // Nack messages that will not be sent to handler freeing them for redelivery. // We continue extending leases for in flight messages. 
var messagesNotBeingHandled = leaseTracking.Except(_messagesInFlight); + _nackedOnShutdownIds.UnionWith(messagesNotBeingHandled); Nack(messagesNotBeingHandled, leaseTracking); } @@ -1097,7 +1097,7 @@ IReadOnlyList GetAckNackResponses(List ids, Task writeT // Temporary errors are deliberately ignored for maintaining consistency with the other client libraries. // Temporary errors will eventually become successful or fail permanently. Their status would be shared then. var ackError = AckError.ForRpcException(rpcException, ids); - return ackError.GetAckNackResponses().ToList(); + return UpdateOnShutdown(ackError.GetAckNackResponses().ToList()); } else if (rpcException != null) { @@ -1122,11 +1122,38 @@ IReadOnlyList GetAckNackResponses(List ids, Task writeT // This method is local to GetAckNackResponses and gets the successful AckNackResponse for every given ID. // It takes the list of IDs that are deemed to be successfully ACK-ed or NACK-ed. // The corresponding AckNackResponse with success status is created for every ID and returned. - static IReadOnlyList GetSuccessResponses(List successfulIds) => - successfulIds.ConvertAll(item => new AckNackResponse(item, AcknowledgementStatus.Success, default)); + IReadOnlyList GetSuccessResponses(List successfulIds) => + UpdateOnShutdown(successfulIds.ConvertAll(item => new AckNackResponse(item, AcknowledgementStatus.Success, default))); + + // Updates response statuses to failure if the messages were Nacked because of shutdown. + // This way the message handler will recognize the message processing ultimately failed. + IReadOnlyList UpdateOnShutdown(List responses) + { + // If we are no longer waiting on message handlers then we may have NAck'ed some of the in-flight messages. 
+ if (IsStopStarted) + { + for (int i = 0; i < responses.Count; i++) + { + var response = responses[i]; + if (_nackedOnShutdownIds.Contains(response.MessageId) && response.Status == AcknowledgementStatus.Success) + { + // Override status to reflect the shutdown failure + responses[i] = new AckNackResponse(response.MessageId, AcknowledgementStatus.Other, "Failed on NackImmediately shutdown."); + } + } + } + + return responses; + } } } + // A stop is initiated if any of the cancellation tokens have been invoked. + private bool IsStopStarted => _pullCts.IsCancellationRequested || + _sendToHandlerCts.IsCancellationRequested || + _waitForHandlerCts.IsCancellationRequested || + _mainCts.IsCancellationRequested; + private void HandleStreamPing() { // Need to explicitly check this, as the continuation passed to Add() may be executed diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs index acc1a35fd998..692933d3d9ff 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs @@ -36,7 +36,7 @@ namespace Google.Cloud.PubSub.V1; public sealed partial class SubscriberClientImpl : SubscriberClient { // TODO: Logging - + private static readonly TimeSpan s_finalNackReservedTime = TimeSpan.FromSeconds(30); // The amount of time reserved for a final nack during wait for processing shutdown. internal const string DeliveryAttemptAttrKey = "googclient_deliveryattempt"; /// @@ -91,9 +91,14 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable _mainTcs; private CancellationTokenSource _globalNackAndWaitCts; // Nack and wait is guarenteed to occur before hard-stop. private CancellationTokenSource _globalHardStopCts; // Immediately stops all processing, dropping any unhandled messages. 
+ private CancellationTokenSource _globalWaitForProcessingCts; + private CancellationTokenSource _globalNackImmediatelyCts; // A boolean that is true if stop has been initiated with the StopAsync methods. - private bool IsStopStarted => _globalHardStopCts.IsCancellationRequested || _globalNackAndWaitCts.IsCancellationRequested; + private bool IsStopStarted => _globalHardStopCts.IsCancellationRequested || + _globalWaitForProcessingCts.IsCancellationRequested || + _globalNackAndWaitCts.IsCancellationRequested || + _globalNackImmediatelyCts.IsCancellationRequested; // This property only exists for testing. // This is the delay between obtaining a lease on a message and then further extending the lease on that message @@ -125,6 +130,8 @@ public override Task StartAsync(SubscriptionHandler handler) _mainTcs = new TaskCompletionSource(); _globalNackAndWaitCts = new CancellationTokenSource(); _globalHardStopCts = new CancellationTokenSource(); + _globalNackImmediatelyCts = new CancellationTokenSource(); + _globalWaitForProcessingCts = CancellationTokenSource.CreateLinkedTokenSource(_globalNackImmediatelyCts.Token); } var registeredTasks = new HashSet(); Action registerTask = task => @@ -214,11 +221,96 @@ public override Task StopAsync(CancellationToken hardStopToken) _globalNackAndWaitCts.Cancel(); } - var registration = hardStopToken.Register(() => _globalHardStopCts.Cancel()); + CancelTargetOnTrigger(targetSourceToCancel: _globalHardStopCts, triggerToken: hardStopToken); + return _mainTcs.Task; + } + + /// + public override Task StopAsync(ShutdownOptions shutdownOptions, CancellationToken cancellationToken = default) + { + GaxPreconditions.CheckArgument(shutdownOptions != null, nameof(shutdownOptions), "ShutdownOptions cannot be null"); + lock (_lock) + { + GaxPreconditions.CheckState(_mainTcs != null, "Can only stop a started instance."); + // Only the first call to StopAsync is observed + if (IsStopStarted) + { + // No-op. 
We don't want to throw exceptions if DisposeAsync or StopAsync is called a second time. + return _mainTcs.Task; + } + // Signal to stop retrieving new messages immediately. + _globalWaitForProcessingCts.Cancel(); + } + + TimeSpan shutdownTimeout = shutdownOptions.Timeout ?? _maxExtensionDuration; + if (shutdownTimeout <= TimeSpan.Zero) + { + Logger?.LogWarning("Shutdown timeout is 0! Stopping {Client} immediately; work may be dropped.", nameof(SubscriberClient)); + } + + // Schedule the shutdown, we can always think of a shutdown as occurring in the following steps + // WaitForProcessing -> NackImmediately and maybe a HardStop afterwards if the timeout is reached + // or the cancellation token is cancelled. Note that the Nack delay will be zero in the case of NackImmediately + CancelAfterDelay(_globalNackImmediatelyCts, CalculateNackDelay()); + CancelAfterDelay(_globalHardStopCts, shutdownTimeout); + CancelTargetOnTrigger(targetSourceToCancel: _globalHardStopCts, triggerToken: cancellationToken); + + return _mainTcs.Task; + + // Calculates the time until we switch to NackImmediately mode. + TimeSpan CalculateNackDelay() + { + switch (shutdownOptions.Mode) + { + case ShutdownMode.NackImmediately: + return TimeSpan.Zero; + case ShutdownMode.WaitForProcessing: + // WaitForProcessing enters a Nack grace period before final shutdown if it was unable to finish + // handling all received messages. + TimeSpan delay = shutdownTimeout > s_finalNackReservedTime ? shutdownTimeout - s_finalNackReservedTime : TimeSpan.Zero; + if (delay == TimeSpan.Zero && shutdownTimeout > TimeSpan.Zero) + { + Logger?.LogWarning("Shutdown timeout ({Timeout}) <= GracePeriod ({GracePeriod}). Nacking immediately.", + shutdownTimeout, s_finalNackReservedTime); + } + return delay; + default: + throw new ArgumentOutOfRangeException(nameof(shutdownOptions.Mode), shutdownOptions.Mode, null); + } + } + } + + // Triggers cancellation after provided delay.
This is used instead of CancellationTokenSource.CancelAfter to integrate with + // IScheduler, allowing this shutdown method to be tested. A zero or negative delay cancels synchronously. + private void CancelAfterDelay(CancellationTokenSource cts, TimeSpan delay) + { + if (delay <= TimeSpan.Zero) + { + cts.Cancel(); + return; + } + _taskHelper.Run(async () => + { + await _taskHelper.ConfigureAwait(_scheduler.Delay(delay, CancellationToken.None)); + cts.Cancel(); + }); + } + + /// <summary> + /// Configures a <see cref="CancellationTokenSource"/> to be cancelled automatically + /// whenever a specific <see cref="CancellationToken"/> is triggered. + /// The registration is automatically disposed when _mainTcs completes. + /// </summary> + /// <param name="targetSourceToCancel">The destination that should be cancelled.</param> + /// <param name="triggerToken">The source that provides the cancellation signal.</param> + private void CancelTargetOnTrigger( + CancellationTokenSource targetSourceToCancel, + CancellationToken triggerToken) + { + var registration = triggerToken.Register(() => targetSourceToCancel.Cancel()); // Do not register this Task to be awaited on at shutdown. // It completes *after* _mainTcs, and all registered tasks must complete before _mainTcs _taskHelper.Run(async () => await _taskHelper.ConfigureAwaitWithFinally(() => _mainTcs.Task, () => registration.Dispose())); - return _mainTcs.Task; } } From 764cd6a726bb10ff9eef5c322743d803c699028d Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Mon, 23 Feb 2026 20:42:53 +0000 Subject: [PATCH 4/5] chore(PubSub): Deprecate legacy StopAsync and update docs.
--- .../PubSubClientTest.cs | 38 ++++++++++++++----- .../Program.cs | 2 +- .../SubscriberClientSnippets.cs | 7 +++- .../SubscriberServiceApiClientSnippets.cs | 6 ++- .../SubscriberClientTest.cs | 12 ++++++ .../SubscriberClient.cs | 2 + .../SubscriberClientImpl.cs | 3 ++ apis/Google.Cloud.PubSub.V1/docs/index.md | 9 +++++ 8 files changed, 66 insertions(+), 13 deletions(-) diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs index 89c3ca2b303a..a662313f0ef8 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs @@ -23,6 +23,7 @@ using System.Threading; using System.Threading.Tasks; using Xunit; +using static Google.Cloud.PubSub.V1.SubscriberClient; // Tests create quite a few tasks that don't need awaiting. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed @@ -159,7 +160,12 @@ private async Task RunBulkMessagingImpl( { // Test finished, so stop subscriber Console.WriteLine("All msgs received, stopping subscriber."); - Task unused = subscriber.StopAsync(TimeSpan.FromSeconds(15)); + var shutdownOptions = new ShutdownOptions + { + Mode = ShutdownMode.NackImmediately, + Timeout = TimeSpan.FromSeconds(15) + }; + Task unused = subscriber.StopAsync(shutdownOptions); } } else @@ -194,11 +200,15 @@ private async Task RunBulkMessagingImpl( { if (noProgressCount > 60) { - // Deadlock, shutdown subscriber, and cancel - Console.WriteLine("Deadlock detected. Cancelling test"); - subscriber.StopAsync(new CancellationToken(true)); - watchdogCts.Cancel(); - break; + // Deadlock, shutdown subscriber, and cancel + Console.WriteLine("Deadlock detected. 
Cancelling test"); + var shutdownOptions = new ShutdownOptions + { + Mode = ShutdownMode.NackImmediately + }; + subscriber.StopAsync(shutdownOptions, cancellationToken: new CancellationToken(true)); + watchdogCts.Cancel(); + break; } noProgressCount += 1; } @@ -431,7 +441,12 @@ public async Task StopStartSubscriber(int totalMessageCount, double publisherFre }); await Task.Delay(subscriberLifetime); Console.WriteLine("Stopping subscriber"); - Task stopTask = subscriber.StopAsync(TimeSpan.FromSeconds(15)); + var shutdownOptions = new ShutdownOptions + { + Mode = ShutdownMode.NackImmediately, + Timeout = TimeSpan.FromSeconds(15) + }; + Task stopTask = subscriber.StopAsync(shutdownOptions); // If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail. await Task.WhenAll(subscribeTask, stopTask); int recvCount = recvedMsgs.Locked(() => recvedMsgs.Count); @@ -538,8 +553,13 @@ await subscriberApi.IAMPolicyClient.SetIamPolicyAsync(new SetIamPolicyRequest { result.Add((msg.GetDeliveryAttempt(), true)); // Received DLQ message, so stop test. 
- sub.StopAsync(TimeSpan.FromSeconds(10)); - dlqSub.StopAsync(TimeSpan.FromSeconds(10)); + var shutdownOptions = new ShutdownOptions + { + Mode = ShutdownMode.NackImmediately, + Timeout = TimeSpan.FromSeconds(10) + }; + sub.StopAsync(shutdownOptions); + dlqSub.StopAsync(shutdownOptions); return Task.FromResult(SubscriberClient.Reply.Ack); }); diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs index 52c4505c29b0..b67d9dfe2f6c 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs @@ -133,7 +133,7 @@ Task Subscribe() if (recvCount == inputLines.Count) { Console.WriteLine("Received all messages, shutting down"); - var dummyTask = sub.StopAsync(CancellationToken.None); + var dummyTask = sub.StopAsync(new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately}); } } if (rnd.Next(3) == 0) diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs index eb8e1fdd814f..8a4d02725327 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs @@ -142,8 +142,11 @@ await _subscriberClient.StartAsync((msg, token) => return Task.FromResult(SubscriberClient.Reply.Ack); }); - public override async Task StopAsync(CancellationToken stoppingToken) => - await _subscriberClient.StopAsync(stoppingToken); + public override async Task StopAsync(CancellationToken stoppingToken) + { + var shutdownOptions = new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately}; + await _subscriberClient.StopAsync(shutdownOptions, 
cancellationToken: stoppingToken); + } } // End sample } diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs index a69cbc7442c8..aa2d362dbf4c 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs @@ -129,7 +129,11 @@ await subscriber.StartAsync((msg, cancellationToken) => Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'"); // Stop this subscriber after one message is received. // This is non-blocking, and the returned Task may be awaited. - subscriber.StopAsync(TimeSpan.FromSeconds(15)); + subscriber.StopAsync(new SubscriberClient.ShutdownOptions + { + Mode = SubscriberClient.ShutdownMode.NackImmediately, + Timeout =TimeSpan.FromSeconds(15) + }); // Return Reply.Ack to indicate this message has been handled. 
return Task.FromResult(SubscriberClient.Reply.Ack); }); diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs index e518397b14ef..3c72b74421e8 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs @@ -594,7 +594,9 @@ public void ImmediateStop_Obsolete( }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( +#pragma warning disable CS0618 // allow use of obsolete method () => fake.Subscriber.StopAsync(new CancellationToken(hardStop))); +#pragma warning restore CS0618 Assert.Equal(hardStop, isCancelled); Assert.Equal(1, fake.Subscribers.Count); Assert.Empty(fake.Subscribers[0].Acks); @@ -643,7 +645,9 @@ public void StopBeforeStart_Obsolete() await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); var exception = await Assert.ThrowsAsync( async () => await fake.TaskHelper.ConfigureAwaitHideCancellation( +#pragma warning disable CS0618 // allow use of obsolete method () => fake.Subscriber.StopAsync(TimeSpan.FromHours(1)))); +#pragma warning restore CS0618 Assert.Equal("Can only stop a started instance.", exception.Message); Assert.Equal(1, fake.Subscribers.Count); Assert.Empty(fake.Subscribers[0].Acks); @@ -703,7 +707,9 @@ await fake.TaskHelper.ConfigureAwaitHideCancellation( () => fake.Subscriber.DisposeAsync().AsTask()); // Call StopAsync. It shouldn't throw an exception. 
await fake.TaskHelper.ConfigureAwaitHideCancellation( +#pragma warning disable CS0618 // allow use of obsolete method () => fake.Subscriber.StopAsync(CancellationToken.None)); +#pragma warning restore CS0618 Assert.Equal(1, fake.Subscribers.Count); Assert.Empty(fake.Subscribers[0].Acks); @@ -810,8 +816,10 @@ public void RecvManyMsgsNoErrors_Obsolete( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); +#pragma warning disable CS0618 // allow use of obsolete method var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( () => fake.Subscriber.StopAsync(new CancellationToken(hardStop))); +#pragma warning restore CS0618 Assert.Equal(hardStop, isCancelled); Assert.Equal(clientCount, fake.Subscribers.Count); Assert.Equal(expectedMsgCount, handledMsgs.Locked(() => handledMsgs.Count)); @@ -984,7 +992,9 @@ public void FlowControl_Obsolete( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); +#pragma warning disable CS0618 // allow use of obsolete method await fake.TaskHelper.ConfigureAwaitHideCancellation(() => fake.Subscriber.StopAsync(new CancellationToken(hardStop))); +#pragma warning restore CS0618 Assert.Equal(expectedMsgCount, handledMsgs.Count); }); } @@ -2239,7 +2249,9 @@ public void Shutdown_SoftStop_NacksMessages() await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, givenToMessageHandler, strict: true); +#pragma warning disable CS0618 // allow use of obsolete method await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); +#pragma warning restore CS0618 // msg0-4 were handled/acked; msg5-6 were pulled but nacked on 
shutdown. Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs index fa9cc066649c..635367c09a9e 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs @@ -217,6 +217,7 @@ public virtual Task StartAsync(FuncCancel this to abort handlers and acknowledgement. /// A that completes when all handled messages have been acknowledged; /// faults on unrecoverable service errors; or cancels if is cancelled. + [Obsolete("Use StopAsync(ShutdownOption, TimeSpan?, CancellationToken) instead.")] public virtual Task StopAsync(CancellationToken hardStopToken) => throw new NotImplementedException(); /// @@ -229,6 +230,7 @@ public virtual Task StartAsync(FuncAfter this period, abort handling and acknowledging messages. /// A that completes when all handled messages have been acknowledged; /// faults on unrecoverable service errors; or cancels if expires. 
+ [Obsolete("Use StopAsync(ShutdownOption, TimeSpan?, CancellationToken) instead.")] public virtual Task StopAsync(TimeSpan timeout) => StopAsync(new CancellationTokenSource(timeout).Token); /// diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs index 692933d3d9ff..e04c9106d84f 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs @@ -203,10 +203,13 @@ public override ValueTask DisposeAsync() return new ValueTask(Task.CompletedTask); } } +#pragma warning disable CS0618 // allow use of obsolete method return new ValueTask(StopAsync(_disposeTimeout)); +#pragma warning restore CS0618 } /// + [Obsolete("Use StopAsync(ShutdownOption, TimeSpan?, CancellationToken) instead.")] public override Task StopAsync(CancellationToken hardStopToken) { lock (_lock) diff --git a/apis/Google.Cloud.PubSub.V1/docs/index.md b/apis/Google.Cloud.PubSub.V1/docs/index.md index b50e41fb7f69..83877dd92ad1 100644 --- a/apis/Google.Cloud.PubSub.V1/docs/index.md +++ b/apis/Google.Cloud.PubSub.V1/docs/index.md @@ -172,6 +172,15 @@ Below is an example implementation of a console application that utilizes the de {{sample:SubscriberClient.UseSubscriberServiceInConsoleApp}} +## Subscriber shutdown + +When shutting down a `SubscriberClient`, two different shutdown flows are available via the `StopAsync(ShutdownOptions, CancellationToken)` method: + +- **NackImmediately**: This immediately stops streaming new messages and then actively sends "Nack" (Negative Acknowledgement) responses for any messages that have been received but have not yet finished being handled. This allows those messages to be quickly redelivered to other active subscribers. 
+- **WaitForProcessing**: This immediately stops streaming new messages from the server but continues to process all messages that have already been received. If processing does not complete 30s before the specified timeout, the client will switch to NackImmediately to release any remaining messages. + +By default, a 1-hour timeout is used for the shutdown process, which can be customized as needed. When the timeout is reached, or if the `CancellationToken` is cancelled, an immediate hard stop is triggered, aborting all outstanding tasks. + ## Disposing of the publisher and subscriber clients Both `PublisherClient` and `SubscriberClient` implement the `IAsyncDisposable` interface, From 73fcceae5dc007e80ef2ba70e9ccc484c02a3873 Mon Sep 17 00:00:00 2001 From: Robert Voinescu Date: Thu, 26 Mar 2026 17:17:21 +0000 Subject: [PATCH 5/5] SQUASH BEFORE MERGE - Address Comments. --- .../SubscriberClientImpl.SingleChannel.cs | 185 +++++++++--------- .../SubscriberClientImpl.cs | 46 +++-- 2 files changed, 121 insertions(+), 110 deletions(-) diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs index 97a5e32cc280..42166e8a2572 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs @@ -139,7 +139,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) // The keys in the dictionary represents the Ack ID of the message, while value contains the receipt ModAck status. // A value of null indicates that the status is not yet started or in progress. A value of true indicates success, and false indicates failure (which can be temporary or permanent). 
private readonly ConcurrentDictionary _receiptModAckStatusLookup = new(); - private readonly object _lock = new object(); // For: _ackQueue, _nackQueue, _messagesInFlight + private readonly object _lock = new object(); // For: _ackQueue, _nackQueue, _messagesInHandler private readonly Action _registerTaskFn; private readonly TaskHelper _taskHelper; private readonly IScheduler _scheduler; @@ -169,7 +169,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) private readonly RequeueableQueue _nackQueue = new RequeueableQueue(); private int _pushInFlight = 0; - private readonly HashSet _messagesInFlight = new HashSet(); + private readonly HashSet _messagesInHandler = new HashSet(); private readonly HashSet _nackedOnShutdownIds = new HashSet(); private SubscriberServiceApiClient.StreamingPullStream _pull = null; private int _concurrentPushCount = 0; @@ -262,8 +262,7 @@ internal async Task StartAsync() } } _logger?.LogDebug("Subscriber task completed."); - // Stop waiting for data to push and handler tasks. - _waitForHandlerCts.Cancel(); + // Stop waiting for data to push. _ackNackCts.Cancel(); } @@ -275,7 +274,7 @@ private bool IsPushComplete() // Lock required for ackQueue and nackQueue. lock (_lock) { - return _ackQueue.Count == 0 && _nackQueue.Count == 0 && _pushInFlight == 0 && _messagesInFlight.Count == 0; + return _ackQueue.Count == 0 && _nackQueue.Count == 0 && _pushInFlight == 0 && _messagesInHandler.Count == 0; } } @@ -514,52 +513,62 @@ private async Task ProcessPullMessagesAsync(List msgs, HashSet< // Running async. Common data needs locking for (int msgIndex = 0; msgIndex < msgs.Count; msgIndex++) { - if (_sendToHandlerCts.IsCancellationRequested) - { - break; - } + _sendToHandlerCts.ThrowIfCancellationRequested(); var msg = msgs[msgIndex]; msgs[msgIndex] = null; // Prepare to call user message handler, _flow.Process(...) enforces the user-handler concurrency constraints. 
await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), _messageOrderingEnabled ? msg.Message.OrderingKey ?? "" : "", async () => { - if (_sendToHandlerCts.IsCancellationRequested || _waitForHandlerCts.IsCancellationRequested) - { - return; - } // Running async. Common data needs locking lock (_lock) { - _messagesInFlight.Add(msg.AckId); + _messagesInHandler.Add(msg.AckId); } if (msg.DeliveryAttempt > 0) { msg.Message.Attributes[DeliveryAttemptAttrKey] = msg.DeliveryAttempt.ToString(CultureInfo.InvariantCulture); } // Call user message handler - var reply = await _taskHelper.ConfigureAwaitHideErrors(() => + bool ignoreReply = false; + var reply = await _taskHelper.ConfigureAwaitHideErrors(async () => { - return _handler.HandleMessage(msg.Message, _waitForHandlerCts.Token); + try + { + _sendToHandlerCts.ThrowIfCancellationRequested(); + var reply = await _taskHelper.ConfigureAwait(_handler.HandleMessage(msg.Message, _waitForHandlerCts.Token)); + _waitForHandlerCts.ThrowIfCancellationRequested(); + return reply; + } + catch (OperationCanceledException) + { + ignoreReply = true; + // A reply needs to be returned, so just throw here to return NAck which will just be + // ignored as the operation was cancelled. + throw; + } }, Reply.Nack); - // Lock ack/nack-queues, this is accessed concurrently here and in "master" thread. - lock (_lock) + if (ignoreReply) { - _messagesInFlight.Remove(msg.AckId); - if (_waitForHandlerCts.IsCancellationRequested) + lock (_lock) { - // Signal the event loop to re-evaluate the shutdown state and exit early without forwarding ack - // replies, all unhandled messages will be NAck'ed in HandleExtendLease(). - _eventPush.Set(); - return; + _messagesInHandler.Remove(msg.AckId); } - var queue = reply == Reply.Ack ? _ackQueue : _nackQueue; - queue.Enqueue(msg.AckId); } - // Lock msgsIds, this is accessed concurrently here and in HandleExtendLease(). 
- lock (leaseTracking) + else { - leaseTracking.Remove(msg.AckId); + // Lock msgsIds, this is accessed concurrently here and in HandleExtendLease(). + lock (leaseTracking) + { + leaseTracking.Remove(msg.AckId); + } + // Lock ack/nack-queues, this is accessed concurrently here and in "master" thread. + lock (_lock) + { + _messagesInHandler.Remove(msg.AckId); + var queue = reply == Reply.Ack ? _ackQueue : _nackQueue; + queue.Enqueue(msg.AckId); + } } // Ids have been added to ack/nack-queue, so trigger a push. _eventPush.Set(); @@ -567,24 +576,6 @@ await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), _messageOrde } } - private void Nack(IEnumerable ackIds, HashSet leaseTracking) - { - var idsToNack = ackIds.ToList(); - if (idsToNack.Count == 0) - { - return; - } - - lock (_lock) - { - _nackQueue.Enqueue(idsToNack); - } - lock (leaseTracking) - { - leaseTracking.ExceptWith(idsToNack); - } - _eventPush.Set(); - } private class LeaseCancellation { @@ -602,8 +593,8 @@ public CancellationToken Token } } - public LeaseCancellation(CancellationTokenSource leaseExtensionCts) => - _cts = CancellationTokenSource.CreateLinkedTokenSource(leaseExtensionCts.Token); + public LeaseCancellation(CancellationTokenSource parentCts) => + _cts = CancellationTokenSource.CreateLinkedTokenSource(parentCts.Token); public void Dispose() { @@ -640,23 +631,9 @@ public void Cancel() private void HandleExtendLease(HashSet leaseTracking, LeaseCancellation cancellation) { - if (_waitForHandlerCts.IsCancellationRequested) - { - // if we are no longer waiting on handlers then NAck messages for redelivery. - _nackedOnShutdownIds.UnionWith(leaseTracking); - Nack(ackIds: leaseTracking, leaseTracking); - return; - } - - if (_sendToHandlerCts.IsCancellationRequested) - { - // Nack messages that will not be sent to handler freeing them for redelivery. - // We continue extending leases for in flight messages. 
- var messagesNotBeingHandled = leaseTracking.Except(_messagesInFlight); - _nackedOnShutdownIds.UnionWith(messagesNotBeingHandled); - Nack(messagesNotBeingHandled, leaseTracking); - - } + // NAck any messages whose leases we no longer want to extend, + // freeing them for redelivery. + MaybeNack(); // The first call to this method happens as soon as messages in this chunk start to be processed. // This triggers the server to start its lease timer. @@ -667,14 +644,11 @@ private void HandleExtendLease(HashSet leaseTracking, LeaseCancellation cancellation = new LeaseCancellation(_waitForHandlerCts.Source); Add(_scheduler.Delay(_maxExtensionDuration, cancellation.Token), Next(false, () => { - // On cancellation we may have not reached the max extension duration so don't clear the lease - // tracking allowing the messages to be Nack'ed for redelivery. - if (!_waitForHandlerCts.IsCancellationRequested) + // Before stopping lease extension, attempt to NAck messages that have not expired but were cancelled. + MaybeNack(); + lock (leaseTracking) { - lock (leaseTracking) - { - leaseTracking.Clear(); - } + leaseTracking.Clear(); } // This is executed when `_maxExtensionDuration` has expired, or when `cancellation` is cancelled, // Which ensures `cancellation` is aways disposed of. @@ -714,6 +688,40 @@ private void HandleExtendLease(HashSet leaseTracking, LeaseCancellation cancellation.Cancel(); } } + + void MaybeNack() + { + HashSet idsToNack = null; + if (_waitForHandlerCts.IsCancellationRequested) + { + // We are no longer waiting on handlers so Nack everything. + idsToNack = leaseTracking; + } + else if (_sendToHandlerCts.IsCancellationRequested) + { + // Nack messages that will not be sent to handler freeing them for redelivery. + // We continue extending leases for in flight messages. 
+ idsToNack = new HashSet(leaseTracking.Except(_messagesInHandler)); + } + if (idsToNack == null || idsToNack.Count == 0) + { + return; + } + lock(_nackedOnShutdownIds) + { + _nackedOnShutdownIds.UnionWith(idsToNack); + } + lock (_lock) + { + _nackQueue.Enqueue(idsToNack); + } + lock (leaseTracking) + { + leaseTracking.ExceptWith(idsToNack); + } + _eventPush.Set(); + } + } private void HandlePush() @@ -1132,13 +1140,17 @@ IReadOnlyList UpdateOnShutdown(List responses) // If we are no longer waiting on message handlers then we may have NAck'ed some of the in-flight messages. if (IsStopStarted) { - for (int i = 0; i < responses.Count; i++) + lock (_nackedOnShutdownIds) { - var response = responses[i]; - if (_nackedOnShutdownIds.Contains(response.MessageId) && response.Status == AcknowledgementStatus.Success) + for (int i = 0; i < responses.Count; i++) { - // Override status to reflect the shutdown failure - responses[i] = new AckNackResponse(response.MessageId, AcknowledgementStatus.Other, "Failed on NackImmediately shutdown."); + + var response = responses[i]; + if (_nackedOnShutdownIds.Contains(response.MessageId) && response.Status == AcknowledgementStatus.Success) + { + // Override status to reflect the shutdown failure + responses[i] = new AckNackResponse(response.MessageId, AcknowledgementStatus.Other, "Failed on NackImmediately shutdown."); + } } } } @@ -1150,6 +1162,7 @@ IReadOnlyList UpdateOnShutdown(List responses) // A stop is initiated if any of the cancellation tokens have been invoked. private bool IsStopStarted => _pullCts.IsCancellationRequested || + _ackNackCts.IsCancellationRequested || _sendToHandlerCts.IsCancellationRequested || _waitForHandlerCts.IsCancellationRequested || _mainCts.IsCancellationRequested; @@ -1394,6 +1407,8 @@ public bool IsCancellationRequested { if (_parentTokens[i].IsCancellationRequested) { + // Cancel the linked source, since we already know one of its parents is cancelled. 
+ _linkedSource.Cancel(); return true; } } @@ -1417,20 +1432,12 @@ public bool IsCancellationRequested /// public void ThrowIfCancellationRequested() { - // Fast path: inner source already registered the cancellation - if (_linkedSource.IsCancellationRequested) + if (IsCancellationRequested) { + // IsCancellationRequested will synchronize _linkedSource with its parents, + // so we only need to throw from the linked token here. _linkedSource.Token.ThrowIfCancellationRequested(); } - - // Explicitly check parents to bypass callback delays - for (int i = 0; i < _parentTokens.Length; i++) - { - if (_parentTokens[i].IsCancellationRequested) - { - _parentTokens[i].ThrowIfCancellationRequested(); - } - } } /// diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs index e04c9106d84f..e30e9f793da4 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs @@ -131,7 +131,7 @@ public override Task StartAsync(SubscriptionHandler handler) _globalNackAndWaitCts = new CancellationTokenSource(); _globalHardStopCts = new CancellationTokenSource(); _globalNackImmediatelyCts = new CancellationTokenSource(); - _globalWaitForProcessingCts = CancellationTokenSource.CreateLinkedTokenSource(_globalNackImmediatelyCts.Token); + _globalWaitForProcessingCts = new CancellationTokenSource(); } var registeredTasks = new HashSet(); Action registerTask = task => @@ -242,7 +242,15 @@ public override Task StopAsync(ShutdownOptions shutdownOptions, CancellationToke return _mainTcs.Task; } // Signal to stop retrieving new messages immediately. 
- _globalWaitForProcessingCts.Cancel(); + switch (shutdownOptions.Mode) + { + case ShutdownMode.NackImmediately: + _globalNackImmediatelyCts.Cancel(); + break; + case ShutdownMode.WaitForProcessing: + _globalWaitForProcessingCts.Cancel(); + break; + } } TimeSpan shutdownTimeout = shutdownOptions.Timeout ?? _maxExtensionDuration; @@ -251,10 +259,14 @@ public override Task StopAsync(ShutdownOptions shutdownOptions, CancellationToke Logger?.LogWarning("Shutdown timeout is 0! Stopping {Client} immediately; work may be dropped.", nameof(SubscriberClient)); } - // Schedule the shutdown, we can always think of a shutdown as occuring in the following steps - // WaitForProcessing -> NackImmedieately and maybe a HardStop afterwards if the timeout is reached - // or the cancellation token is cancelled. Note a the Nack delay will be zero in the case of NackImmediately - CancelAfterDelay(_globalNackImmediatelyCts, CalculateNackDelay()); + if (shutdownOptions.Mode == ShutdownMode.WaitForProcessing) + { + // Schedule a NackImmediately shutdown mode to follow WaitForProcessing. + CancelAfterDelay(_globalNackImmediatelyCts, CalculateNackDelay()); + } + + // In all cases perform final HardStop if either the timeout is reached or the + // cancellation token is cancelled. CancelAfterDelay(_globalHardStopCts, shutdownTimeout); CancelTargetOnTrigger(targetSourceToCancel: _globalHardStopCts, triggerToken: cancellationToken); @@ -263,23 +275,15 @@ public override Task StopAsync(ShutdownOptions shutdownOptions, CancellationToke // Calculates the time until we switch to NackImmediately mode. TimeSpan CalculateNackDelay() { - switch (shutdownOptions.Mode) + // WaitForProcessing enters a Nack grace period before final shutdown if it was unable to finish + // handling all received messages. + TimeSpan delay = shutdownTimeout > s_finalNackReservedTime ? 
shutdownTimeout - s_finalNackReservedTime : TimeSpan.Zero; + if (delay == TimeSpan.Zero && shutdownTimeout > TimeSpan.Zero) { - case ShutdownMode.NackImmediately: - return TimeSpan.Zero; - case ShutdownMode.WaitForProcessing: - // WaitForProcessing enters a Nack grace period before final shutdown if it was unable to finish - // handling all received messages. - TimeSpan delay = shutdownTimeout > s_finalNackReservedTime ? shutdownTimeout - s_finalNackReservedTime : TimeSpan.Zero; - if (delay == TimeSpan.Zero && shutdownTimeout > TimeSpan.Zero) - { - Logger?.LogWarning("Shutdown timeout ({Timeout}) <= GracePeriod ({GracePeriod}). Nacking immediately.", - shutdownTimeout, s_finalNackReservedTime); - } - return delay; - default: - throw new ArgumentOutOfRangeException(nameof(shutdownOptions.Mode), shutdownOptions.Mode, null); + Logger?.LogWarning("Shutdown timeout ({Timeout}) <= GracePeriod ({GracePeriod}). Nacking immediately.", + shutdownTimeout, s_finalNackReservedTime); } + return delay; } }