From 882ea20efdb34be3e0a46785bfd4dd2759bb2c2e Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Mon, 26 Jan 2026 17:39:52 -0800 Subject: [PATCH 1/5] Fix GrpcChannel handle leak in AzureManaged backend - Add GrpcChannelCache for thread-safe channel caching by endpoint - Update Client/Worker extensions to use shared cache - Ensure channels are disposed when ServiceProvider disposes - Add comprehensive unit and integration tests --- .../DurableTaskSchedulerClientExtensions.cs | 20 +- src/Shared/AzureManaged/GrpcChannelCache.cs | 173 +++++++ .../DurableTaskSchedulerWorkerExtensions.cs | 20 +- ...rableTaskSchedulerClientExtensionsTests.cs | 53 +++ .../GrpcChannelCacheTests.cs | 427 ++++++++++++++++++ ...rableTaskSchedulerWorkerExtensionsTests.cs | 54 +++ 6 files changed, 741 insertions(+), 6 deletions(-) create mode 100644 src/Shared/AzureManaged/GrpcChannelCache.cs create mode 100644 test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index b11f450a6..bdcb1246a 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using Azure.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -91,6 +92,10 @@ static void ConfigureSchedulerOptions( options.EnableEntitySupport = true; }); + // Register the channel cache as a singleton to ensure channels are reused + // and properly disposed when the service provider is disposed. + builder.Services.TryAddSingleton(); + builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); @@ -101,7 +106,10 @@ static void ConfigureSchedulerOptions( /// using the provided Durable Task Scheduler options. /// /// Monitor for accessing the current scheduler options configuration. - class ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) : + /// Cache for gRPC channels to ensure reuse and proper disposal. + class ConfigureGrpcChannel( + IOptionsMonitor schedulerOptions, + GrpcChannelCache channelCache) : IConfigureNamedOptions { /// @@ -117,8 +125,14 @@ class ConfigureGrpcChannel(IOptionsMonitor sc /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskClientOptions options) { - DurableTaskSchedulerClientOptions source = schedulerOptions.Get(name ?? Options.DefaultName); - options.Channel = source.CreateChannel(); + string optionsName = name ?? Options.DefaultName; + DurableTaskSchedulerClientOptions source = schedulerOptions.Get(optionsName); + + // Create a cache key based on the options name, endpoint, and task hub. + // This ensures channels are reused for the same configuration + // but separate channels are created for different configurations. + string cacheKey = $"client:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel()); } } } diff --git a/src/Shared/AzureManaged/GrpcChannelCache.cs b/src/Shared/AzureManaged/GrpcChannelCache.cs new file mode 100644 index 000000000..3235ae828 --- /dev/null +++ b/src/Shared/AzureManaged/GrpcChannelCache.cs @@ -0,0 +1,173 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Concurrent; +using Grpc.Net.Client; + +namespace Microsoft.DurableTask; + +/// +/// Thread-safe cache for gRPC channels that ensures channels are reused across retries/calls +/// and properly disposed when replaced or evicted. +/// +sealed class GrpcChannelCache : IDisposable +{ + readonly ConcurrentDictionary channels = new(); + readonly object syncLock = new(); + volatile bool disposed; + + /// + /// Gets or creates a cached gRPC channel for the specified key. + /// If a channel already exists for the key, it is returned. + /// If the factory creates a new channel, any existing channel for the key is disposed. + /// + /// The cache key (typically endpoint + taskhub combination). + /// Factory function to create a new channel if needed. + /// The cached or newly created gRPC channel. + public GrpcChannel GetOrCreate(string key, Func channelFactory) + { + Check.NotNullOrEmpty(key); + Check.NotNull(channelFactory); + + if (this.disposed) + { + throw new ObjectDisposedException(nameof(GrpcChannelCache)); + } + + // Fast path: return existing channel + if (this.channels.TryGetValue(key, out GrpcChannel? existingChannel)) + { + return existingChannel; + } + + // Slow path: create new channel with synchronization to avoid creating multiple channels + lock (this.syncLock) + { + if (this.disposed) + { + throw new ObjectDisposedException(nameof(GrpcChannelCache)); + } + + // Double-check after acquiring lock + if (this.channels.TryGetValue(key, out existingChannel)) + { + return existingChannel; + } + + GrpcChannel newChannel = channelFactory(); + this.channels[key] = newChannel; + return newChannel; + } + } + + /// + /// Replaces an existing channel for the specified key with a new one, + /// disposing the old channel if it exists. + /// + /// The cache key. + /// The new channel to cache. + public void Replace(string key, GrpcChannel newChannel) + { + Check.NotNullOrEmpty(key); + Check.NotNull(newChannel); + + if (this.disposed) + { + throw new ObjectDisposedException(nameof(GrpcChannelCache)); + } + + GrpcChannel? oldChannel = null; + + lock (this.syncLock) + { + if (this.disposed) + { + throw new ObjectDisposedException(nameof(GrpcChannelCache)); + } + + if (this.channels.TryGetValue(key, out oldChannel)) + { + // Only replace if it's actually a different channel + if (ReferenceEquals(oldChannel, newChannel)) + { + return; + } + } + + this.channels[key] = newChannel; + } + + // Dispose the old channel outside the lock to avoid potential deadlocks + DisposeChannelAsync(oldChannel); + } + + /// + /// Removes and disposes a channel for the specified key. + /// + /// The cache key. + /// True if a channel was removed; otherwise, false. + public bool TryRemove(string key) + { + Check.NotNullOrEmpty(key); + + if (this.channels.TryRemove(key, out GrpcChannel? channel)) + { + DisposeChannelAsync(channel); + return true; + } + + return false; + } + + /// + public void Dispose() + { + if (this.disposed) + { + return; + } + + lock (this.syncLock) + { + if (this.disposed) + { + return; + } + + this.disposed = true; + + foreach (KeyValuePair kvp in this.channels) + { + DisposeChannelAsync(kvp.Value); + } + + this.channels.Clear(); + } + } + + static void DisposeChannelAsync(GrpcChannel? channel) + { + if (channel == null) + { + return; + } + + // ShutdownAsync is the graceful way to close a gRPC channel + // We fire-and-forget but ensure the channel is eventually disposed + _ = Task.Run(async () => + { + try + { + await channel.ShutdownAsync(); + } + catch + { + // Ignore shutdown errors during disposal + } + finally + { + channel.Dispose(); + } + }); + } +} diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 7a3baa414..7360dd124 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using Azure.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; using Microsoft.DurableTask.Worker.Grpc.Internal; using Microsoft.Extensions.DependencyInjection; @@ -93,6 +94,10 @@ static void ConfigureSchedulerOptions( options.EnableEntitySupport = true; }); + // Register the channel cache as a singleton to ensure channels are reused + // and properly disposed when the service provider is disposed. + builder.Services.TryAddSingleton(); + builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); @@ -103,7 +108,10 @@ static void ConfigureSchedulerOptions( /// using the provided Durable Task Scheduler options. /// /// Monitor for accessing the current scheduler options configuration. - class ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) : + /// Cache for gRPC channels to ensure reuse and proper disposal. + class ConfigureGrpcChannel( + IOptionsMonitor schedulerOptions, + GrpcChannelCache channelCache) : IConfigureNamedOptions { /// @@ -119,8 +127,14 @@ class ConfigureGrpcChannel(IOptionsMonitor sc /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskWorkerOptions options) { - DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(name ?? Options.DefaultName); - options.Channel = source.CreateChannel(); + string optionsName = name ?? Options.DefaultName; + DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(optionsName); + + // Create a cache key based on the options name, endpoint, and task hub. + // This ensures channels are reused for the same configuration + // but separate channels are created for different configurations. + string cacheKey = $"worker:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel()); options.ConfigureForAzureManaged(); } } diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index 3c760ddf6..77e64055e 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -280,4 +280,57 @@ public void UseDurableTaskScheduler_WithConnectionStringAndRetryOptions_ShouldCo clientOptions.RetryOptions.RetryableStatusCodes.Should().Contain(StatusCode.Unknown); } } + + [Fact] + public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options multiple times to trigger channel configuration + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel"); + } + + [Fact] + public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two different named clients with different endpoints + mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential); + mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); + } } diff --git a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs new file mode 100644 index 000000000..bb5b7cd7e --- /dev/null +++ b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs @@ -0,0 +1,427 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Grpc.Net.Client; +using Xunit; + +namespace Microsoft.DurableTask.Tests; + +public class GrpcChannelCacheTests +{ + const string TestEndpoint = "http://localhost:5000"; + + [Fact] + public void GetOrCreate_SameKey_ReturnsSameChannel() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "test-key"; + int factoryCallCount = 0; + GrpcChannel Factory() + { + factoryCallCount++; + return GrpcChannel.ForAddress(TestEndpoint); + } + + // Act + GrpcChannel channel1 = cache.GetOrCreate(key, Factory); + GrpcChannel channel2 = cache.GetOrCreate(key, Factory); + + // Assert + channel1.Should().BeSameAs(channel2); + factoryCallCount.Should().Be(1, "factory should only be called once for the same key"); + } + + [Fact] + public void GetOrCreate_DifferentKeys_ReturnsDifferentChannels() + { + // Arrange + using GrpcChannelCache cache = new(); + string key1 = "key1"; + string key2 = "key2"; + + // Act + GrpcChannel channel1 = cache.GetOrCreate(key1, () => GrpcChannel.ForAddress(TestEndpoint)); + GrpcChannel channel2 = cache.GetOrCreate(key2, () => GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + channel1.Should().NotBeSameAs(channel2); + } + + [Fact] + public void GetOrCreate_ConcurrentAccess_CreatesSingleChannel() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "concurrent-key"; + int factoryCallCount = 0; + object countLock = new(); + GrpcChannel Factory() + { + lock (countLock) + { + factoryCallCount++; + } + + // Add small delay to increase chance of race conditions + Thread.Sleep(10); + return GrpcChannel.ForAddress(TestEndpoint); + } + + // Act + GrpcChannel[] channels = new GrpcChannel[10]; + Parallel.For(0, 10, i => + { + channels[i] = cache.GetOrCreate(key, Factory); + }); + + // Assert + factoryCallCount.Should().Be(1, "factory should only be called once even with concurrent access"); + channels.All(c => ReferenceEquals(c, channels[0])).Should().BeTrue("all channels should be the same instance"); + } + + [Fact] + public void Replace_ExistingChannel_DisposesOldChannel() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "replace-key"; + GrpcChannel oldChannel = GrpcChannel.ForAddress(TestEndpoint); + GrpcChannel newChannel = GrpcChannel.ForAddress(TestEndpoint); + cache.GetOrCreate(key, () => oldChannel); + + // Act + cache.Replace(key, newChannel); + GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); + + // Assert + retrievedChannel.Should().BeSameAs(newChannel); + retrievedChannel.Should().NotBeSameAs(oldChannel); + } + + [Fact] + public void Replace_SameChannel_DoesNothing() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "same-channel-key"; + GrpcChannel channel = GrpcChannel.ForAddress(TestEndpoint); + cache.GetOrCreate(key, () => channel); + + // Act & Assert - should not throw or change anything + cache.Replace(key, channel); + GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); + retrievedChannel.Should().BeSameAs(channel); + } + + [Fact] + public void Replace_NonExistingKey_AddsChannel() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "new-key"; + GrpcChannel channel = GrpcChannel.ForAddress(TestEndpoint); + + // Act + cache.Replace(key, channel); + GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); + + // Assert + retrievedChannel.Should().BeSameAs(channel); + } + + [Fact] + public void TryRemove_ExistingKey_RemovesAndReturnsTrue() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "remove-key"; + cache.GetOrCreate(key, () => GrpcChannel.ForAddress(TestEndpoint)); + + // Act + bool result = cache.TryRemove(key); + + // Assert + result.Should().BeTrue(); + + // Verify the key is removed by checking that a new channel is created + int factoryCallCount = 0; + cache.GetOrCreate(key, () => + { + factoryCallCount++; + return GrpcChannel.ForAddress(TestEndpoint); + }); + factoryCallCount.Should().Be(1, "a new channel should be created after removal"); + } + + [Fact] + public void TryRemove_NonExistingKey_ReturnsFalse() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "non-existing-key"; + + // Act + bool result = cache.TryRemove(key); + + // Assert + result.Should().BeFalse(); + } + + [Fact] + public void Dispose_DisposesAllChannels() + { + // Arrange + GrpcChannelCache cache = new(); + cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); + cache.GetOrCreate("key2", () => GrpcChannel.ForAddress(TestEndpoint)); + cache.GetOrCreate("key3", () => GrpcChannel.ForAddress(TestEndpoint)); + + // Act + cache.Dispose(); + + // Assert - attempting to use the cache after disposal should throw + Action action = () => cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); + action.Should().Throw(); + } + + [Fact] + public void Dispose_MultipleCalls_DoesNotThrow() + { + // Arrange + GrpcChannelCache cache = new(); + cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); + + // Act & Assert - multiple dispose calls should not throw + cache.Dispose(); + cache.Dispose(); + cache.Dispose(); + } + + [Fact] + public void GetOrCreate_AfterDispose_ThrowsObjectDisposedException() + { + // Arrange + GrpcChannelCache cache = new(); + cache.Dispose(); + + // Act + Action action = () => cache.GetOrCreate("key", () => GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw() + .WithMessage("*GrpcChannelCache*"); + } + + [Fact] + public void Replace_AfterDispose_ThrowsObjectDisposedException() + { + // Arrange + GrpcChannelCache cache = new(); + cache.Dispose(); + + // Act + Action action = () => cache.Replace("key", GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw() + .WithMessage("*GrpcChannelCache*"); + } + + [Fact] + public void GetOrCreate_WithNullKey_ThrowsArgumentException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.GetOrCreate(null!, () => GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void GetOrCreate_WithEmptyKey_ThrowsArgumentException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.GetOrCreate(string.Empty, () => GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void GetOrCreate_WithNullFactory_ThrowsArgumentNullException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.GetOrCreate("key", null!); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void Replace_WithNullKey_ThrowsArgumentException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.Replace(null!, GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void Replace_WithNullChannel_ThrowsArgumentNullException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.Replace("key", null!); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void TryRemove_WithNullKey_ThrowsArgumentException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.TryRemove(null!); + + // Assert + action.Should().Throw(); + } + + /// + /// This test verifies the core fix for the handle leak issue. + /// Without the cache, each call to configure options would create a new GrpcChannel, + /// causing handle count to grow unbounded when the service is unreachable. + /// With the cache, repeated calls reuse the same channel, preventing handle leaks. + /// + [Fact] + public void GetOrCreate_SimulatesRetryScenario_DoesNotCreateMultipleChannels() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "client:default:myendpoint.durabletask.io:myhub"; + int factoryCallCount = 0; + + GrpcChannel CreateChannel() + { + factoryCallCount++; + // Each GrpcChannel creates HttpClient + SocketsHttpHandler internally, + // which allocates socket handles. Without caching, this would leak handles. + return GrpcChannel.ForAddress(TestEndpoint); + } + + // Act - Simulate what happens during retries when service is unreachable: + // The options configuration callback may be invoked multiple times + const int retryAttempts = 100; + GrpcChannel[] channels = new GrpcChannel[retryAttempts]; + for (int i = 0; i < retryAttempts; i++) + { + channels[i] = cache.GetOrCreate(key, CreateChannel); + } + + // Assert - The factory should only be called ONCE, not 100 times + // This is the key behavior that prevents handle accumulation + factoryCallCount.Should().Be(1, + "the channel factory should only be called once regardless of how many times GetOrCreate is called - " + + "this is what prevents handle leaks when the service is unreachable"); + + // All returned channels should be the exact same instance + channels.All(c => ReferenceEquals(c, channels[0])).Should().BeTrue( + "all calls should return the same cached channel instance"); + } + + /// + /// Verifies that the old behavior (without cache) would create multiple channels. + /// This demonstrates what the cache prevents. + /// + [Fact] + public void WithoutCache_MultipleCallsCreateMultipleChannels() + { + // Arrange - simulate old behavior without cache + int factoryCallCount = 0; + List channels = new(); + + GrpcChannel CreateChannelWithoutCache() + { + factoryCallCount++; + return GrpcChannel.ForAddress(TestEndpoint); + } + + // Act - Without caching, each "retry" creates a new channel + const int retryAttempts = 10; + for (int i = 0; i < retryAttempts; i++) + { + // This simulates the OLD behavior before the fix + channels.Add(CreateChannelWithoutCache()); + } + + // Assert - Each call creates a new channel (the problematic behavior we fixed) + factoryCallCount.Should().Be(retryAttempts, + "without caching, each call creates a new channel - this causes handle leaks"); + + // All channels are different instances + channels.Distinct().Count().Should().Be(retryAttempts, + "without caching, each channel is a unique instance with its own handles"); + + // Cleanup + foreach (var channel in channels) + { + channel.Dispose(); + } + } + + /// + /// Verifies channels are properly disposed when the cache is disposed, + /// which releases the associated handles. + /// + [Fact] + public async Task Dispose_ReleasesChannelResources() + { + // Arrange + GrpcChannelCache cache = new(); + List createdChannels = new(); + + // Create multiple channels through the cache + for (int i = 0; i < 5; i++) + { + string key = $"key{i}"; + GrpcChannel channel = cache.GetOrCreate(key, () => + { + var c = GrpcChannel.ForAddress(TestEndpoint); + createdChannels.Add(c); + return c; + }); + } + + createdChannels.Count.Should().Be(5); + + // Act - Dispose the cache (this should dispose all channels) + cache.Dispose(); + + // Wait a bit for async disposal to complete + await Task.Delay(100); + + // Assert - The cache should be disposed and unusable + Action action = () => cache.GetOrCreate("new-key", () => GrpcChannel.ForAddress(TestEndpoint)); + action.Should().Throw(); + } +} diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index a661b53d3..1510c7e56 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -198,4 +198,58 @@ public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() options.ResourceId.Should().Be("https://durabletask.io"); options.AllowInsecureCredentials.Should().BeFalse(); } + + [Fact] + public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options multiple times to trigger channel configuration + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get(Options.DefaultName); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get(Options.DefaultName); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel"); + } + + [Fact] + public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two different named workers with different endpoints + mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential); + mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); + } } + From 387c960168bf90b136e4e24f781e0d33d661a06b Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Mon, 26 Jan 2026 17:53:23 -0800 Subject: [PATCH 2/5] Address PR review comments and fix potential deadlock - Move channel factory call outside lock to prevent deadlock - Combine nested if statements in Replace method - Use 'using' statement for channel disposal - Catch Exception instead of bare catch - Remove unused variable in test --- src/Shared/AzureManaged/GrpcChannelCache.cs | 40 ++++++++++--------- .../GrpcChannelCacheTests.cs | 4 +- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/Shared/AzureManaged/GrpcChannelCache.cs b/src/Shared/AzureManaged/GrpcChannelCache.cs index 3235ae828..13694aeb9 100644 --- a/src/Shared/AzureManaged/GrpcChannelCache.cs +++ b/src/Shared/AzureManaged/GrpcChannelCache.cs @@ -40,21 +40,26 @@ public GrpcChannel GetOrCreate(string key, Func channelFactory) return existingChannel; } - // Slow path: create new channel with synchronization to avoid creating multiple channels + // Create channel outside lock to avoid potential deadlock if factory calls back into cache + GrpcChannel newChannel = channelFactory(); + lock (this.syncLock) { if (this.disposed) { + // Cache was disposed while we were creating the channel - dispose and throw + DisposeChannelAsync(newChannel); throw new ObjectDisposedException(nameof(GrpcChannelCache)); } - // Double-check after acquiring lock + // Check if another thread added a channel while we were creating ours if (this.channels.TryGetValue(key, out existingChannel)) { + // Dispose our duplicate and return the existing one + DisposeChannelAsync(newChannel); return existingChannel; } - GrpcChannel newChannel = channelFactory(); this.channels[key] = newChannel; return newChannel; } @@ -85,13 +90,11 @@ public void Replace(string key, GrpcChannel newChannel) throw new ObjectDisposedException(nameof(GrpcChannelCache)); } - if (this.channels.TryGetValue(key, out oldChannel)) + // Only replace if it's actually a different channel + if (this.channels.TryGetValue(key, out oldChannel) && + ReferenceEquals(oldChannel, newChannel)) { - // Only replace if it's actually a different channel - if (ReferenceEquals(oldChannel, newChannel)) - { - return; - } + return; } this.channels[key] = newChannel; @@ -156,17 +159,16 @@ static void DisposeChannelAsync(GrpcChannel? channel) // We fire-and-forget but ensure the channel is eventually disposed _ = Task.Run(async () => { - try - { - await channel.ShutdownAsync(); - } - catch + using (channel) { - // Ignore shutdown errors during disposal - } - finally - { - channel.Dispose(); + try + { + await channel.ShutdownAsync(); + } + catch (Exception) + { + // Ignore shutdown errors during disposal + } } }); } diff --git a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs index bb5b7cd7e..eb483f00c 100644 --- a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs +++ b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs @@ -404,9 +404,9 @@ public async Task Dispose_ReleasesChannelResources() for (int i = 0; i < 5; i++) { string key = $"key{i}"; - GrpcChannel channel = cache.GetOrCreate(key, () => + cache.GetOrCreate(key, () => { - var c = GrpcChannel.ForAddress(TestEndpoint); + GrpcChannel c = GrpcChannel.ForAddress(TestEndpoint); createdChannels.Add(c); return c; }); From 9ef84a9242632ba4d63ddbcd0e293e05215c0fdd Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Mon, 26 Jan 2026 18:01:56 -0800 Subject: [PATCH 3/5] Simplify channel caching per review feedback - Remove separate GrpcChannelCache class - Inline channel caching directly in ConfigureGrpcChannel using ConcurrentDictionary> - Make ConfigureGrpcChannel implement IDisposable for proper channel disposal - Remove unused Replace() and TryRemove() methods - Add disposal verification tests - Reduces complexity from 170+ LOC to ~40 LOC per extension --- .../DurableTaskSchedulerClientExtensions.cs | 84 +++- src/Shared/AzureManaged/GrpcChannelCache.cs | 175 ------- .../DurableTaskSchedulerWorkerExtensions.cs | 84 +++- ...rableTaskSchedulerClientExtensionsTests.cs | 35 ++ .../GrpcChannelCacheTests.cs | 427 ------------------ ...rableTaskSchedulerWorkerExtensionsTests.cs | 35 ++ 6 files changed, 212 insertions(+), 628 deletions(-) delete mode 100644 src/Shared/AzureManaged/GrpcChannelCache.cs delete mode 100644 test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index bdcb1246a..f57458c00 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Concurrent; using Azure.Core; using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; @@ -92,10 +93,6 @@ static void ConfigureSchedulerOptions( options.EnableEntitySupport = true; }); - // Register the channel cache as a singleton to ensure channels are reused - // and properly disposed when the service provider is disposed. - builder.Services.TryAddSingleton(); - builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); @@ -104,14 +101,23 @@ static void ConfigureSchedulerOptions( /// /// Configuration class that sets up gRPC channels for client options /// using the provided Durable Task Scheduler options. + /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - /// Monitor for accessing the current scheduler options configuration. - /// Cache for gRPC channels to ensure reuse and proper disposal. - class ConfigureGrpcChannel( - IOptionsMonitor schedulerOptions, - GrpcChannelCache channelCache) : - IConfigureNamedOptions + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable { + readonly IOptionsMonitor schedulerOptions; + readonly ConcurrentDictionary> channels = new(); + volatile bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// Monitor for accessing the current scheduler options configuration. + public ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) + { + this.schedulerOptions = schedulerOptions; + } + /// /// Configures the default named options instance. /// @@ -125,14 +131,66 @@ class ConfigureGrpcChannel( /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskClientOptions options) { +#if NET7_0_OR_GREATER + ObjectDisposedException.ThrowIf(this.disposed, this); +#else + if (this.disposed) + { + throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); + } +#endif + string optionsName = name ?? Options.DefaultName; - DurableTaskSchedulerClientOptions source = schedulerOptions.Get(optionsName); + DurableTaskSchedulerClientOptions source = this.schedulerOptions.Get(optionsName); // Create a cache key based on the options name, endpoint, and task hub. // This ensures channels are reused for the same configuration // but separate channels are created for different configurations. - string cacheKey = $"client:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; - options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel()); + string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = this.channels.GetOrAdd( + cacheKey, + _ => new Lazy(source.CreateChannel)).Value; + } + + /// + public void Dispose() + { + if (this.disposed) + { + return; + } + + this.disposed = true; + + foreach (KeyValuePair> kvp in this.channels) + { + if (kvp.Value.IsValueCreated) + { + DisposeChannel(kvp.Value.Value); + } + } + + this.channels.Clear(); + } + + static void DisposeChannel(GrpcChannel channel) + { + // ShutdownAsync is the graceful way to close a gRPC channel. + // Fire-and-forget but ensure the channel is eventually disposed. + _ = Task.Run(async () => + { + using (channel) + { + try + { + await channel.ShutdownAsync(); + } + catch (Exception) + { + // Ignore shutdown errors during disposal + } + } + }); } } } diff --git a/src/Shared/AzureManaged/GrpcChannelCache.cs b/src/Shared/AzureManaged/GrpcChannelCache.cs deleted file mode 100644 index 13694aeb9..000000000 --- a/src/Shared/AzureManaged/GrpcChannelCache.cs +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Collections.Concurrent; -using Grpc.Net.Client; - -namespace Microsoft.DurableTask; - -/// -/// Thread-safe cache for gRPC channels that ensures channels are reused across retries/calls -/// and properly disposed when replaced or evicted. -/// -sealed class GrpcChannelCache : IDisposable -{ - readonly ConcurrentDictionary channels = new(); - readonly object syncLock = new(); - volatile bool disposed; - - /// - /// Gets or creates a cached gRPC channel for the specified key. - /// If a channel already exists for the key, it is returned. - /// If the factory creates a new channel, any existing channel for the key is disposed. - /// - /// The cache key (typically endpoint + taskhub combination). - /// Factory function to create a new channel if needed. - /// The cached or newly created gRPC channel. - public GrpcChannel GetOrCreate(string key, Func channelFactory) - { - Check.NotNullOrEmpty(key); - Check.NotNull(channelFactory); - - if (this.disposed) - { - throw new ObjectDisposedException(nameof(GrpcChannelCache)); - } - - // Fast path: return existing channel - if (this.channels.TryGetValue(key, out GrpcChannel? existingChannel)) - { - return existingChannel; - } - - // Create channel outside lock to avoid potential deadlock if factory calls back into cache - GrpcChannel newChannel = channelFactory(); - - lock (this.syncLock) - { - if (this.disposed) - { - // Cache was disposed while we were creating the channel - dispose and throw - DisposeChannelAsync(newChannel); - throw new ObjectDisposedException(nameof(GrpcChannelCache)); - } - - // Check if another thread added a channel while we were creating ours - if (this.channels.TryGetValue(key, out existingChannel)) - { - // Dispose our duplicate and return the existing one - DisposeChannelAsync(newChannel); - return existingChannel; - } - - this.channels[key] = newChannel; - return newChannel; - } - } - - /// - /// Replaces an existing channel for the specified key with a new one, - /// disposing the old channel if it exists. - /// - /// The cache key. - /// The new channel to cache. - public void Replace(string key, GrpcChannel newChannel) - { - Check.NotNullOrEmpty(key); - Check.NotNull(newChannel); - - if (this.disposed) - { - throw new ObjectDisposedException(nameof(GrpcChannelCache)); - } - - GrpcChannel? oldChannel = null; - - lock (this.syncLock) - { - if (this.disposed) - { - throw new ObjectDisposedException(nameof(GrpcChannelCache)); - } - - // Only replace if it's actually a different channel - if (this.channels.TryGetValue(key, out oldChannel) && - ReferenceEquals(oldChannel, newChannel)) - { - return; - } - - this.channels[key] = newChannel; - } - - // Dispose the old channel outside the lock to avoid potential deadlocks - DisposeChannelAsync(oldChannel); - } - - /// - /// Removes and disposes a channel for the specified key. - /// - /// The cache key. - /// True if a channel was removed; otherwise, false. - public bool TryRemove(string key) - { - Check.NotNullOrEmpty(key); - - if (this.channels.TryRemove(key, out GrpcChannel? channel)) - { - DisposeChannelAsync(channel); - return true; - } - - return false; - } - - /// - public void Dispose() - { - if (this.disposed) - { - return; - } - - lock (this.syncLock) - { - if (this.disposed) - { - return; - } - - this.disposed = true; - - foreach (KeyValuePair kvp in this.channels) - { - DisposeChannelAsync(kvp.Value); - } - - this.channels.Clear(); - } - } - - static void DisposeChannelAsync(GrpcChannel? channel) - { - if (channel == null) - { - return; - } - - // ShutdownAsync is the graceful way to close a gRPC channel - // We fire-and-forget but ensure the channel is eventually disposed - _ = Task.Run(async () => - { - using (channel) - { - try - { - await channel.ShutdownAsync(); - } - catch (Exception) - { - // Ignore shutdown errors during disposal - } - } - }); - } -} diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 7360dd124..21d5137f2 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Concurrent; using Azure.Core; using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; @@ -94,10 +95,6 @@ static void ConfigureSchedulerOptions( options.EnableEntitySupport = true; }); - // Register the channel cache as a singleton to ensure channels are reused - // and properly disposed when the service provider is disposed. - builder.Services.TryAddSingleton(); - builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); @@ -106,14 +103,23 @@ static void ConfigureSchedulerOptions( /// /// Configuration class that sets up gRPC channels for worker options /// using the provided Durable Task Scheduler options. + /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - /// Monitor for accessing the current scheduler options configuration. - /// Cache for gRPC channels to ensure reuse and proper disposal. - class ConfigureGrpcChannel( - IOptionsMonitor schedulerOptions, - GrpcChannelCache channelCache) : - IConfigureNamedOptions + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable { + readonly IOptionsMonitor schedulerOptions; + readonly ConcurrentDictionary> channels = new(); + volatile bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// Monitor for accessing the current scheduler options configuration. + public ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) + { + this.schedulerOptions = schedulerOptions; + } + /// /// Configures the default named options instance. /// @@ -127,15 +133,67 @@ class ConfigureGrpcChannel( /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskWorkerOptions options) { +#if NET7_0_OR_GREATER + ObjectDisposedException.ThrowIf(this.disposed, this); +#else + if (this.disposed) + { + throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); + } +#endif + string optionsName = name ?? Options.DefaultName; - DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(optionsName); + DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName); // Create a cache key based on the options name, endpoint, and task hub. // This ensures channels are reused for the same configuration // but separate channels are created for different configurations. - string cacheKey = $"worker:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; - options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel()); + string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = this.channels.GetOrAdd( + cacheKey, + _ => new Lazy(source.CreateChannel)).Value; options.ConfigureForAzureManaged(); } + + /// + public void Dispose() + { + if (this.disposed) + { + return; + } + + this.disposed = true; + + foreach (KeyValuePair> kvp in this.channels) + { + if (kvp.Value.IsValueCreated) + { + DisposeChannel(kvp.Value.Value); + } + } + + this.channels.Clear(); + } + + static void DisposeChannel(GrpcChannel channel) + { + // ShutdownAsync is the graceful way to close a gRPC channel. + // Fire-and-forget but ensure the channel is eventually disposed. + _ = Task.Run(async () => + { + using (channel) + { + try + { + await channel.ShutdownAsync(); + } + catch (Exception) + { + // Ignore shutdown errors during disposal + } + } + }); + } } } diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index 77e64055e..2b7fbdd22 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -333,4 +333,39 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() options2.Channel.Should().NotBeNull(); options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); } + + [Fact] + public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options to trigger channel creation + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options = optionsMonitor.Get(Options.DefaultName); + options.Channel.Should().NotBeNull(); + + // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels + provider.Dispose(); + + // Assert - after disposal, creating a new provider and getting options should work + // (this verifies the old provider was properly cleaned up) + ServiceCollection services2 = new ServiceCollection(); + Mock mockBuilder2 = new Mock(); + mockBuilder2.Setup(b => b.Services).Returns(services2); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider2 = services2.BuildServiceProvider(); + + IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); + GrpcDurableTaskClientOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); + newOptions.Channel.Should().NotBeNull(); + newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + } } diff --git a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs deleted file mode 100644 index eb483f00c..000000000 --- a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs +++ /dev/null @@ -1,427 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using FluentAssertions; -using Grpc.Net.Client; -using Xunit; - -namespace Microsoft.DurableTask.Tests; - -public class GrpcChannelCacheTests -{ - const string TestEndpoint = "http://localhost:5000"; - - [Fact] - public void GetOrCreate_SameKey_ReturnsSameChannel() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "test-key"; - int factoryCallCount = 0; - GrpcChannel Factory() - { - factoryCallCount++; - return GrpcChannel.ForAddress(TestEndpoint); - } - - // Act - GrpcChannel channel1 = cache.GetOrCreate(key, Factory); - GrpcChannel channel2 = cache.GetOrCreate(key, Factory); - - // Assert - channel1.Should().BeSameAs(channel2); - factoryCallCount.Should().Be(1, "factory should only be called once for the same key"); - } - - [Fact] - public void GetOrCreate_DifferentKeys_ReturnsDifferentChannels() - { - // Arrange - using GrpcChannelCache cache = new(); - string key1 = "key1"; - string key2 = "key2"; - - // Act - GrpcChannel channel1 = cache.GetOrCreate(key1, () => GrpcChannel.ForAddress(TestEndpoint)); - GrpcChannel channel2 = cache.GetOrCreate(key2, () => GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - channel1.Should().NotBeSameAs(channel2); - } - - [Fact] - public void GetOrCreate_ConcurrentAccess_CreatesSingleChannel() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "concurrent-key"; - int factoryCallCount = 0; - object countLock = new(); - GrpcChannel Factory() - { - lock (countLock) - { - factoryCallCount++; - } - - // Add small delay to increase chance of race conditions - Thread.Sleep(10); - return GrpcChannel.ForAddress(TestEndpoint); - } - - // Act - GrpcChannel[] channels = new GrpcChannel[10]; - Parallel.For(0, 10, i => - { - channels[i] = cache.GetOrCreate(key, Factory); - }); - - // Assert - factoryCallCount.Should().Be(1, "factory should only be called once even with concurrent access"); - channels.All(c => ReferenceEquals(c, channels[0])).Should().BeTrue("all channels should be the same instance"); - } - - [Fact] - public void Replace_ExistingChannel_DisposesOldChannel() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "replace-key"; - GrpcChannel oldChannel = GrpcChannel.ForAddress(TestEndpoint); - GrpcChannel newChannel = GrpcChannel.ForAddress(TestEndpoint); - cache.GetOrCreate(key, () => oldChannel); - - // Act - cache.Replace(key, newChannel); - GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); - - // Assert - retrievedChannel.Should().BeSameAs(newChannel); - retrievedChannel.Should().NotBeSameAs(oldChannel); - } - - [Fact] - public void Replace_SameChannel_DoesNothing() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "same-channel-key"; - GrpcChannel channel = GrpcChannel.ForAddress(TestEndpoint); - cache.GetOrCreate(key, () => channel); - - // Act & Assert - should not throw or change anything - cache.Replace(key, channel); - GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); - retrievedChannel.Should().BeSameAs(channel); - } - - [Fact] - public void Replace_NonExistingKey_AddsChannel() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "new-key"; - GrpcChannel channel = GrpcChannel.ForAddress(TestEndpoint); - - // Act - cache.Replace(key, channel); - GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); - - // Assert - retrievedChannel.Should().BeSameAs(channel); - } - - [Fact] - public void TryRemove_ExistingKey_RemovesAndReturnsTrue() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "remove-key"; - cache.GetOrCreate(key, () => GrpcChannel.ForAddress(TestEndpoint)); - - // Act - bool result = cache.TryRemove(key); - - // Assert - result.Should().BeTrue(); - - // Verify the key is removed by checking that a new channel is created - int factoryCallCount = 0; - cache.GetOrCreate(key, () => - { - factoryCallCount++; - return GrpcChannel.ForAddress(TestEndpoint); - }); - factoryCallCount.Should().Be(1, "a new channel should be created after removal"); - } - - [Fact] - public void TryRemove_NonExistingKey_ReturnsFalse() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "non-existing-key"; - - // Act - bool result = cache.TryRemove(key); - - // Assert - result.Should().BeFalse(); - } - - [Fact] - public void Dispose_DisposesAllChannels() - { - // Arrange - GrpcChannelCache cache = new(); - cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); - cache.GetOrCreate("key2", () => GrpcChannel.ForAddress(TestEndpoint)); - cache.GetOrCreate("key3", () => GrpcChannel.ForAddress(TestEndpoint)); - - // Act - cache.Dispose(); - - // Assert - attempting to use the cache after disposal should throw - Action action = () => cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); - action.Should().Throw(); - } - - [Fact] - public void Dispose_MultipleCalls_DoesNotThrow() - { - // Arrange - GrpcChannelCache cache = new(); - cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); - - // Act & Assert - multiple dispose calls should not throw - cache.Dispose(); - cache.Dispose(); - cache.Dispose(); - } - - [Fact] - public void GetOrCreate_AfterDispose_ThrowsObjectDisposedException() - { - // Arrange - GrpcChannelCache cache = new(); - cache.Dispose(); - - // Act - Action action = () => cache.GetOrCreate("key", () => GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw() - .WithMessage("*GrpcChannelCache*"); - } - - [Fact] - public void Replace_AfterDispose_ThrowsObjectDisposedException() - { - // Arrange - GrpcChannelCache cache = new(); - cache.Dispose(); - - // Act - Action action = () => cache.Replace("key", GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw() - .WithMessage("*GrpcChannelCache*"); - } - - [Fact] - public void GetOrCreate_WithNullKey_ThrowsArgumentException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.GetOrCreate(null!, () => GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void GetOrCreate_WithEmptyKey_ThrowsArgumentException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.GetOrCreate(string.Empty, () => GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void GetOrCreate_WithNullFactory_ThrowsArgumentNullException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.GetOrCreate("key", null!); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void Replace_WithNullKey_ThrowsArgumentException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.Replace(null!, GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void Replace_WithNullChannel_ThrowsArgumentNullException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.Replace("key", null!); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void TryRemove_WithNullKey_ThrowsArgumentException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.TryRemove(null!); - - // Assert - action.Should().Throw(); - } - - /// - /// This test verifies the core fix for the handle leak issue. - /// Without the cache, each call to configure options would create a new GrpcChannel, - /// causing handle count to grow unbounded when the service is unreachable. - /// With the cache, repeated calls reuse the same channel, preventing handle leaks. - /// - [Fact] - public void GetOrCreate_SimulatesRetryScenario_DoesNotCreateMultipleChannels() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "client:default:myendpoint.durabletask.io:myhub"; - int factoryCallCount = 0; - - GrpcChannel CreateChannel() - { - factoryCallCount++; - // Each GrpcChannel creates HttpClient + SocketsHttpHandler internally, - // which allocates socket handles. Without caching, this would leak handles. - return GrpcChannel.ForAddress(TestEndpoint); - } - - // Act - Simulate what happens during retries when service is unreachable: - // The options configuration callback may be invoked multiple times - const int retryAttempts = 100; - GrpcChannel[] channels = new GrpcChannel[retryAttempts]; - for (int i = 0; i < retryAttempts; i++) - { - channels[i] = cache.GetOrCreate(key, CreateChannel); - } - - // Assert - The factory should only be called ONCE, not 100 times - // This is the key behavior that prevents handle accumulation - factoryCallCount.Should().Be(1, - "the channel factory should only be called once regardless of how many times GetOrCreate is called - " + - "this is what prevents handle leaks when the service is unreachable"); - - // All returned channels should be the exact same instance - channels.All(c => ReferenceEquals(c, channels[0])).Should().BeTrue( - "all calls should return the same cached channel instance"); - } - - /// - /// Verifies that the old behavior (without cache) would create multiple channels. - /// This demonstrates what the cache prevents. - /// - [Fact] - public void WithoutCache_MultipleCallsCreateMultipleChannels() - { - // Arrange - simulate old behavior without cache - int factoryCallCount = 0; - List channels = new(); - - GrpcChannel CreateChannelWithoutCache() - { - factoryCallCount++; - return GrpcChannel.ForAddress(TestEndpoint); - } - - // Act - Without caching, each "retry" creates a new channel - const int retryAttempts = 10; - for (int i = 0; i < retryAttempts; i++) - { - // This simulates the OLD behavior before the fix - channels.Add(CreateChannelWithoutCache()); - } - - // Assert - Each call creates a new channel (the problematic behavior we fixed) - factoryCallCount.Should().Be(retryAttempts, - "without caching, each call creates a new channel - this causes handle leaks"); - - // All channels are different instances - channels.Distinct().Count().Should().Be(retryAttempts, - "without caching, each channel is a unique instance with its own handles"); - - // Cleanup - foreach (var channel in channels) - { - channel.Dispose(); - } - } - - /// - /// Verifies channels are properly disposed when the cache is disposed, - /// which releases the associated handles. - /// - [Fact] - public async Task Dispose_ReleasesChannelResources() - { - // Arrange - GrpcChannelCache cache = new(); - List createdChannels = new(); - - // Create multiple channels through the cache - for (int i = 0; i < 5; i++) - { - string key = $"key{i}"; - cache.GetOrCreate(key, () => - { - GrpcChannel c = GrpcChannel.ForAddress(TestEndpoint); - createdChannels.Add(c); - return c; - }); - } - - createdChannels.Count.Should().Be(5); - - // Act - Dispose the cache (this should dispose all channels) - cache.Dispose(); - - // Wait a bit for async disposal to complete - await Task.Delay(100); - - // Assert - The cache should be disposed and unusable - Action action = () => cache.GetOrCreate("new-key", () => GrpcChannel.ForAddress(TestEndpoint)); - action.Should().Throw(); - } -} diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index 1510c7e56..370924a4c 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -251,5 +251,40 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() options2.Channel.Should().NotBeNull(); options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); } + + [Fact] + public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options to trigger channel creation + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options = optionsMonitor.Get(Options.DefaultName); + options.Channel.Should().NotBeNull(); + + // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels + provider.Dispose(); + + // Assert - after disposal, creating a new provider and getting options should work + // (this verifies the old provider was properly cleaned up) + ServiceCollection services2 = new ServiceCollection(); + Mock mockBuilder2 = new Mock(); + mockBuilder2.Setup(b => b.Services).Returns(services2); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider2 = services2.BuildServiceProvider(); + + IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); + GrpcDurableTaskWorkerOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); + newOptions.Channel.Should().NotBeNull(); + newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + } } From 19f521f1371672458deab4ff0a1a9fc5b130131c Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Mon, 26 Jan 2026 18:10:06 -0800 Subject: [PATCH 4/5] Address remaining CodeQL comments - Use LINQ Where() instead of if inside foreach for filtering channels - Narrow catch (Exception) to specific types (OperationCanceledException, ObjectDisposedException) --- .../DurableTaskSchedulerClientExtensions.cs | 12 +++++------- .../DurableTaskSchedulerWorkerExtensions.cs | 12 +++++------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index f57458c00..c92fc15fa 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System.Collections.Concurrent; +using System.Linq; using Azure.Core; using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; @@ -162,12 +163,9 @@ public void Dispose() this.disposed = true; - foreach (KeyValuePair> kvp in this.channels) + foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) { - if (kvp.Value.IsValueCreated) - { - DisposeChannel(kvp.Value.Value); - } + DisposeChannel(channel.Value); } this.channels.Clear(); @@ -185,9 +183,9 @@ static void DisposeChannel(GrpcChannel channel) { await channel.ShutdownAsync(); } - catch (Exception) + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) { - // Ignore shutdown errors during disposal + // Ignore expected shutdown/disposal errors } } }); diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 21d5137f2..42d5f4f70 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System.Collections.Concurrent; +using System.Linq; using Azure.Core; using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; @@ -165,12 +166,9 @@ public void Dispose() this.disposed = true; - foreach (KeyValuePair> kvp in this.channels) + foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) { - if (kvp.Value.IsValueCreated) - { - DisposeChannel(kvp.Value.Value); - } + DisposeChannel(channel.Value); } this.channels.Clear(); @@ -188,9 +186,9 @@ static void DisposeChannel(GrpcChannel channel) { await channel.ShutdownAsync(); } - catch (Exception) + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) { - // Ignore shutdown errors during disposal + // Ignore expected shutdown/disposal errors } } }); From 9b6703204dc25f5c88d9414a02c22a0ea664de27 Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Tue, 27 Jan 2026 14:45:13 -0800 Subject: [PATCH 5/5] Refactor gRPC channel disposal to use IAsyncDisposable and improve exception handling --- .../DurableTaskSchedulerClientExtensions.cs | 53 +++++++++++-------- .../DurableTaskSchedulerWorkerExtensions.cs | 53 +++++++++++-------- ...rableTaskSchedulerClientExtensionsTests.cs | 41 +++++++++++--- ...rableTaskSchedulerWorkerExtensionsTests.cs | 41 +++++++++++--- 4 files changed, 132 insertions(+), 56 deletions(-) diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index c92fc15fa..d2f81fcd5 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -104,11 +104,11 @@ static void ConfigureSchedulerOptions( /// using the provided Durable Task Scheduler options. /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IAsyncDisposable { readonly IOptionsMonitor schedulerOptions; readonly ConcurrentDictionary> channels = new(); - volatile bool disposed; + int disposed; /// /// Initializes a new instance of the class. @@ -133,9 +133,9 @@ public ConfigureGrpcChannel(IOptionsMonitor s public void Configure(string? name, GrpcDurableTaskClientOptions options) { #if NET7_0_OR_GREATER - ObjectDisposedException.ThrowIf(this.disposed, this); + ObjectDisposedException.ThrowIf(this.disposed == 1, this); #else - if (this.disposed) + if (this.disposed == 1) { throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); } @@ -154,41 +154,50 @@ public void Configure(string? name, GrpcDurableTaskClientOptions options) } /// - public void Dispose() + public async ValueTask DisposeAsync() { - if (this.disposed) + if (Interlocked.Exchange(ref this.disposed, 1) == 1) { return; } - this.disposed = true; - + List? exceptions = null; foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) { - DisposeChannel(channel.Value); + try + { + await DisposeChannelAsync(channel.Value).ConfigureAwait(false); + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); + } } this.channels.Clear(); + GC.SuppressFinalize(this); + + if (exceptions is { Count: > 0 }) + { + throw new AggregateException(exceptions); + } } - static void DisposeChannel(GrpcChannel channel) + static async Task DisposeChannelAsync(GrpcChannel channel) { // ShutdownAsync is the graceful way to close a gRPC channel. - // Fire-and-forget but ensure the channel is eventually disposed. - _ = Task.Run(async () => + using (channel) { - using (channel) + try { - try - { - await channel.ShutdownAsync(); - } - catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) - { - // Ignore expected shutdown/disposal errors - } + await channel.ShutdownAsync().ConfigureAwait(false); } - }); + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) + { + // Ignore expected shutdown/disposal errors + } + } } } } diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 42d5f4f70..d2534f9be 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -106,11 +106,11 @@ static void ConfigureSchedulerOptions( /// using the provided Durable Task Scheduler options. /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IAsyncDisposable { readonly IOptionsMonitor schedulerOptions; readonly ConcurrentDictionary> channels = new(); - volatile bool disposed; + int disposed; /// /// Initializes a new instance of the class. @@ -135,9 +135,9 @@ public ConfigureGrpcChannel(IOptionsMonitor s public void Configure(string? name, GrpcDurableTaskWorkerOptions options) { #if NET7_0_OR_GREATER - ObjectDisposedException.ThrowIf(this.disposed, this); + ObjectDisposedException.ThrowIf(this.disposed == 1, this); #else - if (this.disposed) + if (this.disposed == 1) { throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); } @@ -157,41 +157,50 @@ public void Configure(string? name, GrpcDurableTaskWorkerOptions options) } /// - public void Dispose() + public async ValueTask DisposeAsync() { - if (this.disposed) + if (Interlocked.Exchange(ref this.disposed, 1) == 1) { return; } - this.disposed = true; - + List? exceptions = null; foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) { - DisposeChannel(channel.Value); + try + { + await DisposeChannelAsync(channel.Value).ConfigureAwait(false); + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); + } } this.channels.Clear(); + GC.SuppressFinalize(this); + + if (exceptions is { Count: > 0 }) + { + throw new AggregateException(exceptions); + } } - static void DisposeChannel(GrpcChannel channel) + static async Task DisposeChannelAsync(GrpcChannel channel) { // ShutdownAsync is the graceful way to close a gRPC channel. - // Fire-and-forget but ensure the channel is eventually disposed. - _ = Task.Run(async () => + using (channel) { - using (channel) + try { - try - { - await channel.ShutdownAsync(); - } - catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) - { - // Ignore expected shutdown/disposal errors - } + await channel.ShutdownAsync().ConfigureAwait(false); } - }); + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) + { + // Ignore expected shutdown/disposal errors + } + } } } } diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index 2b7fbdd22..e64485637 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -5,6 +5,7 @@ using Azure.Identity; using FluentAssertions; using Grpc.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -335,7 +336,7 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() } [Fact] - public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + public async Task UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -351,21 +352,49 @@ public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); GrpcDurableTaskClientOptions options = optionsMonitor.Get(Options.DefaultName); options.Channel.Should().NotBeNull(); + GrpcChannel channel = options.Channel!; // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels - provider.Dispose(); + await provider.DisposeAsync(); - // Assert - after disposal, creating a new provider and getting options should work - // (this verifies the old provider was properly cleaned up) + // Assert - verify the channel was disposed by checking it throws ObjectDisposedException + Action action = () => channel.CreateCallInvoker(); + action.Should().Throw("channel should be disposed after provider disposal"); + + // Also verify that creating a new provider and getting options still works ServiceCollection services2 = new ServiceCollection(); Mock mockBuilder2 = new Mock(); mockBuilder2.Setup(b => b.Services).Returns(services2); mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - ServiceProvider provider2 = services2.BuildServiceProvider(); + await using ServiceProvider provider2 = services2.BuildServiceProvider(); IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); GrpcDurableTaskClientOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); newOptions.Channel.Should().NotBeNull(); - newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + newOptions.Channel.Should().NotBeSameAs(channel, "new provider should create a new channel"); + } + + [Fact] + public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisposedException() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options monitor before disposal + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + + // Dispose the service provider + await provider.DisposeAsync(); + + // Assert - attempting to get options after disposal should throw + Action action = () => optionsMonitor.Get(Options.DefaultName); + action.Should().Throw("configuring options after disposal should throw"); } } diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index 370924a4c..591ffaebf 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -4,6 +4,7 @@ using Azure.Core; using Azure.Identity; using FluentAssertions; +using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -253,7 +254,7 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() } [Fact] - public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + public async Task UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -269,22 +270,50 @@ public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); GrpcDurableTaskWorkerOptions options = optionsMonitor.Get(Options.DefaultName); options.Channel.Should().NotBeNull(); + GrpcChannel channel = options.Channel!; // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels - provider.Dispose(); + await provider.DisposeAsync(); - // Assert - after disposal, creating a new provider and getting options should work - // (this verifies the old provider was properly cleaned up) + // Assert - verify the channel was disposed by checking it throws ObjectDisposedException + Action action = () => channel.CreateCallInvoker(); + action.Should().Throw("channel should be disposed after provider disposal"); + + // Also verify that creating a new provider and getting options still works ServiceCollection services2 = new ServiceCollection(); Mock mockBuilder2 = new Mock(); mockBuilder2.Setup(b => b.Services).Returns(services2); mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - ServiceProvider provider2 = services2.BuildServiceProvider(); + await using ServiceProvider provider2 = services2.BuildServiceProvider(); IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); GrpcDurableTaskWorkerOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); newOptions.Channel.Should().NotBeNull(); - newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + newOptions.Channel.Should().NotBeSameAs(channel, "new provider should create a new channel"); + } + + [Fact] + public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisposedException() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options monitor before disposal + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + + // Dispose the service provider + await provider.DisposeAsync(); + + // Assert - attempting to get options after disposal should throw + Action action = () => optionsMonitor.Get(Options.DefaultName); + action.Should().Throw("configuring options after disposal should throw"); } }