Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using static Google.Cloud.PubSub.V1.SubscriberClient;

// Tests create quite a few tasks that don't need awaiting.
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Expand Down Expand Up @@ -135,6 +136,7 @@ private async Task RunBulkMessagingImpl(
long recvSum = 0L; // Sum of bytes of received messages
var recvedIds = new ConcurrentDictionary<int, bool>();
var nackedIds = new HashSet<int>();
using CancellationTokenSource stopCancellation = new CancellationTokenSource();
Task subTask = subscriber.StartAsync((msg, ct) =>
{
int id = BitConverter.ToInt32(msg.Data.ToArray(), 0);
Expand All @@ -159,7 +161,8 @@ private async Task RunBulkMessagingImpl(
{
// Test finished, so stop subscriber
Console.WriteLine("All msgs received, stopping subscriber.");
Task unused = subscriber.StopAsync(TimeSpan.FromSeconds(15));
stopCancellation.CancelAfter(TimeSpan.FromSeconds(15));
Task unused = subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately }, stopCancellation.Token);
}
}
else
Expand Down Expand Up @@ -196,7 +199,7 @@ private async Task RunBulkMessagingImpl(
{
// Deadlock, shutdown subscriber, and cancel
Console.WriteLine("Deadlock detected. Cancelling test");
subscriber.StopAsync(new CancellationToken(true));
subscriber.StopAsync(new ShutdownOptions(), new CancellationToken(true));
watchdogCts.Cancel();
break;
}
Expand Down Expand Up @@ -257,7 +260,11 @@ private async Task RunBulkMessagingImpl(

if (cancelAfterRecvCount is int cancelAfter)
{
Assert.True(recvCount >= cancelAfter && recvCount <= cancelAfter + maxMessagesInFlight, $"Incorrect recvCount: {recvCount}");
// Because we are using linked tokens for cancellation, which do not guarantee
// atomicity between when a parent token is cancelled to when the linked token is cancelled,
// there's still a chance that we get some more messages after cancelling, even with NackInmediately.
// So we expecte up to 3 times messages in flight after the cutoff message count.
Assert.True(recvCount >= cancelAfter && recvCount <= cancelAfter + 3 * maxMessagesInFlight, $"Incorrect recvCount: {recvCount}");
}
else
{
Expand Down Expand Up @@ -431,9 +438,12 @@ public async Task StopStartSubscriber(int totalMessageCount, double publisherFre
});
await Task.Delay(subscriberLifetime);
Console.WriteLine("Stopping subscriber");
Task stopTask = subscriber.StopAsync(TimeSpan.FromSeconds(15));
// If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail.
await Task.WhenAll(subscribeTask, stopTask);
using var stopCancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(15));
{
Task stopTask = subscriber.StopAsync(new ShutdownOptions { Mode = ShutdownMode.NackImmediately }, stopCancellationSource.Token);
// If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail.
await Task.WhenAll(subscribeTask, stopTask);
}
int recvCount = recvedMsgs.Locked(() => recvedMsgs.Count);
Console.WriteLine($"Stopped subscriber. Recv count: {recvCount}");
if (prevRecvCount == recvCount)
Expand Down Expand Up @@ -534,12 +544,18 @@ await subscriberApi.IAMPolicyClient.SetIamPolicyAsync(new SetIamPolicyRequest
return Task.FromResult(SubscriberClient.Reply.Nack);
});

using CancellationTokenSource stopCancellation = new CancellationTokenSource();
var taskDlqSub = dlqSub.StartAsync((msg, ct) =>
{
result.Add((msg.GetDeliveryAttempt(), true));
// Received DLQ message, so stop test.
sub.StopAsync(TimeSpan.FromSeconds(10));
dlqSub.StopAsync(TimeSpan.FromSeconds(10));
var shutdownOptions = new ShutdownOptions
{
Mode = ShutdownMode.NackImmediately,
};
stopCancellation.CancelAfter(TimeSpan.FromSeconds(10));
sub.StopAsync(shutdownOptions, stopCancellation.Token);
dlqSub.StopAsync(shutdownOptions, stopCancellation.Token);
return Task.FromResult(SubscriberClient.Reply.Ack);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Task Subscribe()
if (recvCount == inputLines.Count)
{
Console.WriteLine("Received all messages, shutting down");
var dummyTask = sub.StopAsync(CancellationToken.None);
var dummyTask = sub.StopAsync(new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately});
}
}
if (rnd.Next(3) == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ await _subscriberClient.StartAsync((msg, token) =>
return Task.FromResult(SubscriberClient.Reply.Ack);
});

public override async Task StopAsync(CancellationToken stoppingToken) =>
await _subscriberClient.StopAsync(stoppingToken);
public override async Task StopAsync(CancellationToken stoppingToken)
{
var shutdownOptions = new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately};
await _subscriberClient.StopAsync(shutdownOptions, cancellationToken: stoppingToken);
}
}
// End sample
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ await subscriber.StartAsync((msg, cancellationToken) =>
Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
// Stop this subscriber after one message is received.
// This is non-blocking, and the returned Task may be awaited.
subscriber.StopAsync(TimeSpan.FromSeconds(15));
subscriber.StopAsync(new SubscriberClient.ShutdownOptions
{
Mode = SubscriberClient.ShutdownMode.NackImmediately,
Timeout =TimeSpan.FromSeconds(15)
});
// Return Reply.Ack to indicate this message has been handled.
return Task.FromResult(SubscriberClient.Reply.Ack);
});
Expand Down
Loading
Loading