Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 75 additions & 5 deletions src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Linq;
using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
Expand Down Expand Up @@ -99,11 +102,23 @@ static void ConfigureSchedulerOptions(
/// <summary>
/// Configuration class that sets up gRPC channels for client options
/// using the provided Durable Task Scheduler options.
/// Channels are cached per configuration key and disposed when the service provider is disposed.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions) :
IConfigureNamedOptions<GrpcDurableTaskClientOptions>
sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskClientOptions>, IDisposable
{
readonly IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions;
readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
volatile bool disposed;

/// <summary>
/// Initializes a new instance of the <see cref="ConfigureGrpcChannel"/> class.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
public ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions)
{
this.schedulerOptions = schedulerOptions;
}

/// <summary>
/// Configures the default named options instance.
/// </summary>
Expand All @@ -117,8 +132,63 @@ class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> sc
/// <param name="options">The options instance to configure.</param>
public void Configure(string? name, GrpcDurableTaskClientOptions options)
{
DurableTaskSchedulerClientOptions source = schedulerOptions.Get(name ?? Options.DefaultName);
options.Channel = source.CreateChannel();
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(this.disposed, this);
#else
if (this.disposed)
{
throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
}
#endif

string optionsName = name ?? Options.DefaultName;
DurableTaskSchedulerClientOptions source = this.schedulerOptions.Get(optionsName);

// Create a cache key based on the options name, endpoint, and task hub.
// This ensures channels are reused for the same configuration
// but separate channels are created for different configurations.
string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
options.Channel = this.channels.GetOrAdd(
cacheKey,
_ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
}

/// <inheritdoc/>
public void Dispose()
{
if (this.disposed)
{
return;
}

this.disposed = true;

foreach (Lazy<GrpcChannel> channel in this.channels.Values.Where(lazy => lazy.IsValueCreated))
{
DisposeChannel(channel.Value);
}

this.channels.Clear();
}

static void DisposeChannel(GrpcChannel channel)
{
// ShutdownAsync is the graceful way to close a gRPC channel.
// Fire-and-forget but ensure the channel is eventually disposed.
_ = Task.Run(async () =>
{
using (channel)
{
try
{
await channel.ShutdownAsync();
}
catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException)
{
// Ignore expected shutdown/disposal errors
}
}
});
}
}
}
80 changes: 75 additions & 5 deletions src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Linq;
using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Worker.Grpc;
using Microsoft.DurableTask.Worker.Grpc.Internal;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -101,11 +104,23 @@ static void ConfigureSchedulerOptions(
/// <summary>
/// Configuration class that sets up gRPC channels for worker options
/// using the provided Durable Task Scheduler options.
/// Channels are cached per configuration key and disposed when the service provider is disposed.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions) :
IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>
sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>, IDisposable
{
readonly IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions;
readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
volatile bool disposed;

/// <summary>
/// Initializes a new instance of the <see cref="ConfigureGrpcChannel"/> class.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
public ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions)
{
this.schedulerOptions = schedulerOptions;
}

/// <summary>
/// Configures the default named options instance.
/// </summary>
Expand All @@ -119,9 +134,64 @@ class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> sc
/// <param name="options">The options instance to configure.</param>
public void Configure(string? name, GrpcDurableTaskWorkerOptions options)
{
DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(name ?? Options.DefaultName);
options.Channel = source.CreateChannel();
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(this.disposed, this);
#else
if (this.disposed)
{
throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
}
#endif

string optionsName = name ?? Options.DefaultName;
DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName);

// Create a cache key based on the options name, endpoint, and task hub.
// This ensures channels are reused for the same configuration
// but separate channels are created for different configurations.
string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
options.Channel = this.channels.GetOrAdd(
cacheKey,
_ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
options.ConfigureForAzureManaged();
}

/// <inheritdoc/>
public void Dispose()
{
if (this.disposed)
{
return;
}

this.disposed = true;

foreach (Lazy<GrpcChannel> channel in this.channels.Values.Where(lazy => lazy.IsValueCreated))
{
DisposeChannel(channel.Value);
}

this.channels.Clear();
}

static void DisposeChannel(GrpcChannel channel)
{
// ShutdownAsync is the graceful way to close a gRPC channel.
// Fire-and-forget but ensure the channel is eventually disposed.
_ = Task.Run(async () =>
{
using (channel)
{
try
{
await channel.ShutdownAsync();
}
catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException)
{
// Ignore expected shutdown/disposal errors
}
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,92 @@ public void UseDurableTaskScheduler_WithConnectionStringAndRetryOptions_ShouldCo
clientOptions.RetryOptions.RetryableStatusCodes.Should().Contain(StatusCode.Unknown);
}
}

[Fact]
public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel()
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskClientBuilder> mockBuilder = new Mock<IDurableTaskClientBuilder>();
mockBuilder.Setup(b => b.Services).Returns(services);
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act
mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

// Resolve options multiple times to trigger channel configuration
IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName);
GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName);

// Assert
options1.Channel.Should().NotBeNull();
options2.Channel.Should().NotBeNull();
options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel");
}

[Fact]
public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels()
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskClientBuilder> mockBuilder1 = new Mock<IDurableTaskClientBuilder>();
Mock<IDurableTaskClientBuilder> mockBuilder2 = new Mock<IDurableTaskClientBuilder>();
mockBuilder1.Setup(b => b.Services).Returns(services);
mockBuilder1.Setup(b => b.Name).Returns("client1");
mockBuilder2.Setup(b => b.Services).Returns(services);
mockBuilder2.Setup(b => b.Name).Returns("client2");
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act - configure two different named clients with different endpoints
mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential);
mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

// Resolve options for both named clients
IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1");
GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2");

// Assert
options1.Channel.Should().NotBeNull();
options2.Channel.Should().NotBeNull();
options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels");
}

[Fact]
public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels()
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskClientBuilder> mockBuilder = new Mock<IDurableTaskClientBuilder>();
mockBuilder.Setup(b => b.Services).Returns(services);
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act
mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

// Resolve options to trigger channel creation
IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions options = optionsMonitor.Get(Options.DefaultName);
options.Channel.Should().NotBeNull();

// Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels
provider.Dispose();

// Assert - after disposal, creating a new provider and getting options should work
// (this verifies the old provider was properly cleaned up)
ServiceCollection services2 = new ServiceCollection();
Mock<IDurableTaskClientBuilder> mockBuilder2 = new Mock<IDurableTaskClientBuilder>();
mockBuilder2.Setup(b => b.Services).Returns(services2);
mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
ServiceProvider provider2 = services2.BuildServiceProvider();

IOptionsMonitor<GrpcDurableTaskClientOptions> newOptionsMonitor = provider2.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions newOptions = newOptionsMonitor.Get(Options.DefaultName);
newOptions.Channel.Should().NotBeNull();
newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,93 @@ public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly()
options.ResourceId.Should().Be("https://durabletask.io");
options.AllowInsecureCredentials.Should().BeFalse();
}

[Fact]
public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel()
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskWorkerBuilder> mockBuilder = new Mock<IDurableTaskWorkerBuilder>();
mockBuilder.Setup(b => b.Services).Returns(services);
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act
mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

// Resolve options multiple times to trigger channel configuration
IOptionsMonitor<GrpcDurableTaskWorkerOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskWorkerOptions>>();
GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get(Options.DefaultName);
GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get(Options.DefaultName);

// Assert
options1.Channel.Should().NotBeNull();
options2.Channel.Should().NotBeNull();
options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel");
}

[Fact]
public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels()
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskWorkerBuilder> mockBuilder1 = new Mock<IDurableTaskWorkerBuilder>();
Mock<IDurableTaskWorkerBuilder> mockBuilder2 = new Mock<IDurableTaskWorkerBuilder>();
mockBuilder1.Setup(b => b.Services).Returns(services);
mockBuilder1.Setup(b => b.Name).Returns("worker1");
mockBuilder2.Setup(b => b.Services).Returns(services);
mockBuilder2.Setup(b => b.Name).Returns("worker2");
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act - configure two different named workers with different endpoints
mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential);
mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

// Resolve options for both named workers
IOptionsMonitor<GrpcDurableTaskWorkerOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskWorkerOptions>>();
GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1");
GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2");

// Assert
options1.Channel.Should().NotBeNull();
options2.Channel.Should().NotBeNull();
options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels");
}

[Fact]
public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels()
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskWorkerBuilder> mockBuilder = new Mock<IDurableTaskWorkerBuilder>();
mockBuilder.Setup(b => b.Services).Returns(services);
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act
mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

// Resolve options to trigger channel creation
IOptionsMonitor<GrpcDurableTaskWorkerOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskWorkerOptions>>();
GrpcDurableTaskWorkerOptions options = optionsMonitor.Get(Options.DefaultName);
options.Channel.Should().NotBeNull();

// Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels
provider.Dispose();

// Assert - after disposal, creating a new provider and getting options should work
// (this verifies the old provider was properly cleaned up)
ServiceCollection services2 = new ServiceCollection();
Mock<IDurableTaskWorkerBuilder> mockBuilder2 = new Mock<IDurableTaskWorkerBuilder>();
mockBuilder2.Setup(b => b.Services).Returns(services2);
mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
ServiceProvider provider2 = services2.BuildServiceProvider();

IOptionsMonitor<GrpcDurableTaskWorkerOptions> newOptionsMonitor = provider2.GetRequiredService<IOptionsMonitor<GrpcDurableTaskWorkerOptions>>();
GrpcDurableTaskWorkerOptions newOptions = newOptionsMonitor.Get(Options.DefaultName);
newOptions.Channel.Should().NotBeNull();
newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel");
}
}

Loading