diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index b11f450a..c92fc15f 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -1,7 +1,10 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Concurrent; +using System.Linq; using Azure.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -99,11 +102,23 @@ static void ConfigureSchedulerOptions( /// /// Configuration class that sets up gRPC channels for client options /// using the provided Durable Task Scheduler options. + /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - /// Monitor for accessing the current scheduler options configuration. - class ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) : - IConfigureNamedOptions + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable { + readonly IOptionsMonitor schedulerOptions; + readonly ConcurrentDictionary> channels = new(); + volatile bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// Monitor for accessing the current scheduler options configuration. + public ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) + { + this.schedulerOptions = schedulerOptions; + } + /// /// Configures the default named options instance. /// @@ -117,8 +132,63 @@ class ConfigureGrpcChannel(IOptionsMonitor sc /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskClientOptions options) { - DurableTaskSchedulerClientOptions source = schedulerOptions.Get(name ?? Options.DefaultName); - options.Channel = source.CreateChannel(); +#if NET7_0_OR_GREATER + ObjectDisposedException.ThrowIf(this.disposed, this); +#else + if (this.disposed) + { + throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); + } +#endif + + string optionsName = name ?? Options.DefaultName; + DurableTaskSchedulerClientOptions source = this.schedulerOptions.Get(optionsName); + + // Create a cache key based on the options name, endpoint, and task hub. + // This ensures channels are reused for the same configuration + // but separate channels are created for different configurations. + string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = this.channels.GetOrAdd( + cacheKey, + _ => new Lazy(source.CreateChannel)).Value; + } + + /// + public void Dispose() + { + if (this.disposed) + { + return; + } + + this.disposed = true; + + foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) + { + DisposeChannel(channel.Value); + } + + this.channels.Clear(); + } + + static void DisposeChannel(GrpcChannel channel) + { + // ShutdownAsync is the graceful way to close a gRPC channel. + // Fire-and-forget but ensure the channel is eventually disposed. + _ = Task.Run(async () => + { + using (channel) + { + try + { + await channel.ShutdownAsync(); + } + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) + { + // Ignore expected shutdown/disposal errors + } + } + }); } } } diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 7a3baa41..42d5f4f7 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -1,7 +1,10 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Concurrent; +using System.Linq; using Azure.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; using Microsoft.DurableTask.Worker.Grpc.Internal; using Microsoft.Extensions.DependencyInjection; @@ -101,11 +104,23 @@ static void ConfigureSchedulerOptions( /// /// Configuration class that sets up gRPC channels for worker options /// using the provided Durable Task Scheduler options. + /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - /// Monitor for accessing the current scheduler options configuration. - class ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) : - IConfigureNamedOptions + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable { + readonly IOptionsMonitor schedulerOptions; + readonly ConcurrentDictionary> channels = new(); + volatile bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// Monitor for accessing the current scheduler options configuration. + public ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) + { + this.schedulerOptions = schedulerOptions; + } + /// /// Configures the default named options instance. /// @@ -119,9 +134,64 @@ class ConfigureGrpcChannel(IOptionsMonitor sc /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskWorkerOptions options) { - DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(name ?? Options.DefaultName); - options.Channel = source.CreateChannel(); +#if NET7_0_OR_GREATER + ObjectDisposedException.ThrowIf(this.disposed, this); +#else + if (this.disposed) + { + throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); + } +#endif + + string optionsName = name ?? Options.DefaultName; + DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName); + + // Create a cache key based on the options name, endpoint, and task hub. + // This ensures channels are reused for the same configuration + // but separate channels are created for different configurations. + string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = this.channels.GetOrAdd( + cacheKey, + _ => new Lazy(source.CreateChannel)).Value; options.ConfigureForAzureManaged(); } + + /// + public void Dispose() + { + if (this.disposed) + { + return; + } + + this.disposed = true; + + foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) + { + DisposeChannel(channel.Value); + } + + this.channels.Clear(); + } + + static void DisposeChannel(GrpcChannel channel) + { + // ShutdownAsync is the graceful way to close a gRPC channel. + // Fire-and-forget but ensure the channel is eventually disposed. + _ = Task.Run(async () => + { + using (channel) + { + try + { + await channel.ShutdownAsync(); + } + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) + { + // Ignore expected shutdown/disposal errors + } + } + }); + } } } diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index 3c760ddf..2b7fbdd2 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -280,4 +280,92 @@ public void UseDurableTaskScheduler_WithConnectionStringAndRetryOptions_ShouldCo clientOptions.RetryOptions.RetryableStatusCodes.Should().Contain(StatusCode.Unknown); } } + + [Fact] + public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options multiple times to trigger channel configuration + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel"); + } + + [Fact] + public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two different named clients with different endpoints + mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential); + mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); + } + + [Fact] + public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options to trigger channel creation + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options = optionsMonitor.Get(Options.DefaultName); + options.Channel.Should().NotBeNull(); + + // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels + provider.Dispose(); + + // Assert - after disposal, creating a new provider and getting options should work + // (this verifies the old provider was properly cleaned up) + ServiceCollection services2 = new ServiceCollection(); + Mock mockBuilder2 = new Mock(); + mockBuilder2.Setup(b => b.Services).Returns(services2); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider2 = services2.BuildServiceProvider(); + + IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); + GrpcDurableTaskClientOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); + newOptions.Channel.Should().NotBeNull(); + newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + } } diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index a661b53d..370924a4 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -198,4 +198,93 @@ public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() options.ResourceId.Should().Be("https://durabletask.io"); options.AllowInsecureCredentials.Should().BeFalse(); } + + [Fact] + public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options multiple times to trigger channel configuration + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get(Options.DefaultName); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get(Options.DefaultName); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel"); + } + + [Fact] + public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two different named workers with different endpoints + mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential); + mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); + } + + [Fact] + public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options to trigger channel creation + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options = optionsMonitor.Get(Options.DefaultName); + options.Channel.Should().NotBeNull(); + + // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels + provider.Dispose(); + + // Assert - after disposal, creating a new provider and getting options should work + // (this verifies the old provider was properly cleaned up) + ServiceCollection services2 = new ServiceCollection(); + Mock mockBuilder2 = new Mock(); + mockBuilder2.Setup(b => b.Services).Returns(services2); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider2 = services2.BuildServiceProvider(); + + IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); + GrpcDurableTaskWorkerOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); + newOptions.Channel.Should().NotBeNull(); + newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + } } +