diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs index d9958fae0..6437f38cc 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -33,8 +33,10 @@ protected override async Task ProcessChannelAsync() work.DeliveryTag, work.BasicProperties!, work.Body.Size)) { await work.Consumer.HandleBasicDeliverAsync( - work.ConsumerTag!, work.DeliveryTag, work.Redelivered, - work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory) + work.ConsumerTag!, work.DeliveryTag, work.Redelivered, + work.Exchange!, work.RoutingKey!, work.BasicProperties!, + work.Body.Memory, + work.Consumer.Channel?.ChannelCancellationToken ?? default) .ConfigureAwait(false); } break; diff --git a/projects/RabbitMQ.Client/IChannel.cs b/projects/RabbitMQ.Client/IChannel.cs index b3d4cfb74..044a36fba 100644 --- a/projects/RabbitMQ.Client/IChannel.cs +++ b/projects/RabbitMQ.Client/IChannel.cs @@ -450,5 +450,10 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey, /// timing out. /// TimeSpan ContinuationTimeout { get; set; } + + /// + /// The associated with channel closure. + /// + CancellationToken ChannelCancellationToken { get; } } } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 0ea8a2699..10622bee7 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -136,6 +136,7 @@ public IEnumerable ConsumerTags public int ChannelNumber => InnerChannel.ChannelNumber; public ShutdownEventArgs? CloseReason => InnerChannel.CloseReason; + public CancellationToken ChannelCancellationToken => InnerChannel.ChannelCancellationToken; public IAsyncBasicConsumer? DefaultConsumer { diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 53bb47c80..aabf888ac 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -66,6 +66,8 @@ internal partial class Channel : IChannel, IRecoverable private bool _disposed; private int _isDisposing; + private CancellationTokenSource _closeAsyncCts = new CancellationTokenSource(); + public Channel(ISession session, CreateChannelOptions createChannelOptions) { ContinuationTimeout = createChannelOptions.ContinuationTimeout; @@ -85,6 +87,8 @@ public Channel(ISession session, CreateChannelOptions createChannelOptions) Session = session; } + public CancellationToken ChannelCancellationToken => _closeAsyncCts.Token; + internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan ContinuationTimeout { get; set; } @@ -208,13 +212,6 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort, public async Task CloseAsync(ShutdownEventArgs args, bool abort, CancellationToken cancellationToken) { - CancellationToken argCancellationToken = cancellationToken; - if (IsOpen) - { - // Note: we really do need to try and close this channel! - cancellationToken = CancellationToken.None; - } - bool enqueued = false; var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken); @@ -236,6 +233,8 @@ await ModelSendAsync(in method, k.CancellationToken) AssertResultIsTrue(await k); + _closeAsyncCts.Cancel(); + await ConsumerDispatcher.WaitForShutdownAsync() .ConfigureAwait(false); } @@ -265,7 +264,7 @@ await ConsumerDispatcher.WaitForShutdownAsync() MaybeDisposeContinuation(enqueued, k); _rpcSemaphore.Release(); ChannelShutdownAsync -= k.OnConnectionShutdownAsync; - argCancellationToken.ThrowIfCancellationRequested(); + cancellationToken.ThrowIfCancellationRequested(); } } @@ -591,6 +590,7 @@ protected virtual void Dispose(bool disposing) { _rpcSemaphore.Dispose(); _confirmSemaphore.Dispose(); + _closeAsyncCts.Dispose(); } catch { diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index b78bae4b2..8f8c345ee 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -5,6 +5,7 @@ RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong p RabbitMQ.Client.Exceptions.PublishReturnException.ReplyCode.get -> ushort RabbitMQ.Client.Exceptions.PublishReturnException.ReplyText.get -> string! RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string! +RabbitMQ.Client.IChannel.ChannelCancellationToken.get -> System.Threading.CancellationToken RabbitMQ.Client.RabbitMQTracingOptions RabbitMQ.Client.RabbitMQTracingOptions.RabbitMQTracingOptions() -> void RabbitMQ.Client.RabbitMQTracingOptions.UsePublisherAsParent.get -> bool