Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 84 additions & 5 deletions src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Linq;
using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
Expand Down Expand Up @@ -99,11 +102,23 @@ static void ConfigureSchedulerOptions(
/// <summary>
/// Configuration class that sets up gRPC channels for client options
/// using the provided Durable Task Scheduler options.
/// Channels are cached per configuration key and disposed when the service provider is disposed.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions) :
IConfigureNamedOptions<GrpcDurableTaskClientOptions>
sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskClientOptions>, IAsyncDisposable
{
readonly IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions;
readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
int disposed;
Comment on lines +107 to +111
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description mentions IDisposable and a volatile disposed flag, but this implementation is IAsyncDisposable-only and the disposed field isn’t volatile (and is read without Volatile.Read). Either update the PR description or adjust the implementation to match (e.g., implement IDisposable and use volatile/Volatile.Read for the disposed check).

Copilot uses AI. Check for mistakes.

/// <summary>
/// Initializes a new instance of the <see cref="ConfigureGrpcChannel"/> class.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="schedulerOptions"/> is <c>null</c>.</exception>
public ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions)
{
    // Fail fast here: Configure dereferences this field, so a null monitor would otherwise
    // only surface later as a NullReferenceException far from the actual mistake.
    this.schedulerOptions = schedulerOptions ?? throw new ArgumentNullException(nameof(schedulerOptions));
}

/// <summary>
/// Configures the default named options instance.
/// </summary>
Expand All @@ -117,8 +132,72 @@ class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> sc
/// <param name="options">The options instance to configure.</param>
public void Configure(string? name, GrpcDurableTaskClientOptions options)
{
DurableTaskSchedulerClientOptions source = schedulerOptions.Get(name ?? Options.DefaultName);
options.Channel = source.CreateChannel();
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(this.disposed == 1, this);
#else
if (this.disposed == 1)
{
throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
}
#endif

string optionsName = name ?? Options.DefaultName;
DurableTaskSchedulerClientOptions source = this.schedulerOptions.Get(optionsName);

// Create a cache key based on the options name, endpoint, and task hub.
// This ensures channels are reused for the same configuration
Comment on lines +144 to +148
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateChannel() behavior depends on more than endpoint/task hub (e.g., ResourceId, Credential, AllowInsecureCredentials, RetryOptions). If any of those values change while EndpointAddress/TaskHubName stay the same (e.g., via options reload), the cached channel will be reused with stale settings. Consider including the channel-affecting fields in the cache key or enforcing immutability for them.

Copilot uses AI. Check for mistakes.
// but separate channels are created for different configurations.
string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel cache key is built by concatenating strings with ':' delimiters, but EndpointAddress commonly contains ':' (e.g., "https://" or ports). This can create ambiguous keys and potential collisions. Prefer a composite key type (e.g., ValueTuple/record struct) rather than a delimited string.

Suggested change
string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
string cacheKey = string.Concat(optionsName, "\u001F", source.EndpointAddress, "\u001F", source.TaskHubName);

Copilot uses AI. Check for mistakes.
options.Channel = this.channels.GetOrAdd(
cacheKey,
_ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
}

/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref this.disposed, 1) == 1)
{
return;
}

List<Exception>? exceptions = null;
foreach (Lazy<GrpcChannel> channel in this.channels.Values.Where(lazy => lazy.IsValueCreated))
{
try
{
await DisposeChannelAsync(channel.Value).ConfigureAwait(false);
}
catch (Exception ex)
{
exceptions ??= new List<Exception>();
exceptions.Add(ex);
}
Comment on lines +171 to +175
}

this.channels.Clear();
GC.SuppressFinalize(this);

if (exceptions is { Count: > 0 })
{
throw new AggregateException(exceptions);
}
Comment on lines +181 to +184
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DisposeAsync throws an AggregateException when any channel shutdown/dispose fails. Throwing from ServiceProvider disposal can surface as app shutdown failures and is difficult for callers to handle. Consider making this best-effort (swallow/log disposal errors) instead of throwing.

Copilot uses AI. Check for mistakes.
}

/// <summary>
/// Attempts a graceful shutdown of <paramref name="channel"/> and then disposes it.
/// </summary>
/// <param name="channel">The gRPC channel to shut down and dispose.</param>
/// <returns>A task that completes once the shutdown attempt has finished and the channel is disposed.</returns>
static async Task DisposeChannelAsync(GrpcChannel channel)
{
    try
    {
        // ShutdownAsync is the graceful way to close a gRPC channel.
        await channel.ShutdownAsync().ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // Ignore expected shutdown/disposal errors
    }
    catch (ObjectDisposedException)
    {
        // Ignore expected shutdown/disposal errors
    }
    finally
    {
        // Dispose runs whether or not the shutdown attempt succeeded.
        channel?.Dispose();
    }
}
}
}
89 changes: 84 additions & 5 deletions src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Linq;
using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Worker.Grpc;
using Microsoft.DurableTask.Worker.Grpc.Internal;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -101,11 +104,23 @@ static void ConfigureSchedulerOptions(
/// <summary>
/// Configuration class that sets up gRPC channels for worker options
/// using the provided Durable Task Scheduler options.
/// Channels are cached per configuration key and disposed when the service provider is disposed.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions) :
IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>
sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>, IAsyncDisposable
{
readonly IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions;
readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
int disposed;
Comment on lines +109 to +113
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description mentions IDisposable and a volatile disposed flag, but this implementation is IAsyncDisposable-only and the disposed field isn’t volatile (and is read without Volatile.Read). Either update the PR description or adjust the implementation to match (e.g., implement IDisposable and use volatile/Volatile.Read for the disposed check).

Copilot uses AI. Check for mistakes.

/// <summary>
/// Initializes a new instance of the <see cref="ConfigureGrpcChannel"/> class.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="schedulerOptions"/> is <c>null</c>.</exception>
public ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions)
{
    // Fail fast here: Configure dereferences this field, so a null monitor would otherwise
    // only surface later as a NullReferenceException far from the actual mistake.
    this.schedulerOptions = schedulerOptions ?? throw new ArgumentNullException(nameof(schedulerOptions));
}

/// <summary>
/// Configures the default named options instance.
/// </summary>
Expand All @@ -119,9 +134,73 @@ class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> sc
/// <param name="options">The options instance to configure.</param>
public void Configure(string? name, GrpcDurableTaskWorkerOptions options)
{
DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(name ?? Options.DefaultName);
options.Channel = source.CreateChannel();
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(this.disposed == 1, this);
#else
if (this.disposed == 1)
{
throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
}
#endif

string optionsName = name ?? Options.DefaultName;
DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName);

// Create a cache key based on the options name, endpoint, and task hub.
// This ensures channels are reused for the same configuration
Comment on lines +146 to +150
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateChannel() behavior depends on more than endpoint/task hub (e.g., ResourceId, Credential, AllowInsecureCredentials, and WorkerId via the call credentials interceptor). If any of these values change while EndpointAddress/TaskHubName stay the same (e.g., via options reload), the cached channel will be reused with stale settings. Consider including these fields in the cache key or enforcing immutability for them.

Copilot uses AI. Check for mistakes.
// but separate channels are created for different configurations.
string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel cache key is built by concatenating strings with ':' delimiters, but EndpointAddress commonly contains ':' (e.g., "https://" or ports). This can create ambiguous keys and potential collisions. Prefer a composite key type (e.g., ValueTuple/record struct) rather than a delimited string.

Suggested change
string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
// Use a delimiter character (\u001F) that will not appear in typical endpoint URIs.
string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}";

Copilot uses AI. Check for mistakes.
options.Channel = this.channels.GetOrAdd(
cacheKey,
_ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
options.ConfigureForAzureManaged();
}

/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref this.disposed, 1) == 1)
{
return;
}

List<Exception>? exceptions = null;
foreach (Lazy<GrpcChannel> channel in this.channels.Values.Where(lazy => lazy.IsValueCreated))
{
try
{
await DisposeChannelAsync(channel.Value).ConfigureAwait(false);
}
catch (Exception ex)
{
exceptions ??= new List<Exception>();
exceptions.Add(ex);
}
Comment on lines +174 to +178
}

this.channels.Clear();
GC.SuppressFinalize(this);

if (exceptions is { Count: > 0 })
{
throw new AggregateException(exceptions);
}
Comment on lines +184 to +187
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DisposeAsync throws an AggregateException when any channel shutdown/dispose fails. Throwing from ServiceProvider disposal can surface as app shutdown failures and is difficult for callers to handle. Consider making this best-effort (swallow/log disposal errors) instead of throwing.

Copilot uses AI. Check for mistakes.
}

/// <summary>
/// Attempts a graceful shutdown of <paramref name="channel"/> and then disposes it.
/// </summary>
/// <param name="channel">The gRPC channel to shut down and dispose.</param>
/// <returns>A task that completes once the shutdown attempt has finished and the channel is disposed.</returns>
static async Task DisposeChannelAsync(GrpcChannel channel)
{
    try
    {
        // ShutdownAsync is the graceful way to close a gRPC channel.
        await channel.ShutdownAsync().ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // Ignore expected shutdown/disposal errors
    }
    catch (ObjectDisposedException)
    {
        // Ignore expected shutdown/disposal errors
    }
    finally
    {
        // Dispose runs whether or not the shutdown attempt succeeded.
        channel?.Dispose();
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Azure.Identity;
using FluentAssertions;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -280,4 +281,120 @@ public void UseDurableTaskScheduler_WithConnectionStringAndRetryOptions_ShouldCo
clientOptions.RetryOptions.RetryableStatusCodes.Should().Contain(StatusCode.Unknown);
}
}

[Fact]
public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel()
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskClientBuilder> mockBuilder = new Mock<IDurableTaskClientBuilder>();
mockBuilder.Setup(b => b.Services).Returns(services);
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act
mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

// Resolve options multiple times to trigger channel configuration
IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName);
GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName);
Comment on lines +296 to +301
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test calls IOptionsMonitor.Get() twice; OptionsMonitor caches options per name, so the second call typically returns the same options instance and won’t re-run Configure (so it doesn’t validate the new channel-caching behavior). Consider forcing new options creation (e.g., via IOptionsFactory/IOptionsSnapshot scopes) and disposing the ServiceProvider to avoid leaking channels/handlers during the test run.

Suggested change
ServiceProvider provider = services.BuildServiceProvider();
// Resolve options multiple times to trigger channel configuration
IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName);
GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName);
using ServiceProvider provider = services.BuildServiceProvider();
// Resolve options multiple times to trigger channel configuration
IOptionsFactory<GrpcDurableTaskClientOptions> optionsFactory = provider.GetRequiredService<IOptionsFactory<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions options1 = optionsFactory.Create(Options.DefaultName);
GrpcDurableTaskClientOptions options2 = optionsFactory.Create(Options.DefaultName);

Copilot uses AI. Check for mistakes.

// Assert
options1.Channel.Should().NotBeNull();
options2.Channel.Should().NotBeNull();
options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel");
}

[Fact]
public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels()
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskClientBuilder> mockBuilder1 = new Mock<IDurableTaskClientBuilder>();
Mock<IDurableTaskClientBuilder> mockBuilder2 = new Mock<IDurableTaskClientBuilder>();
mockBuilder1.Setup(b => b.Services).Returns(services);
mockBuilder1.Setup(b => b.Name).Returns("client1");
mockBuilder2.Setup(b => b.Services).Returns(services);
mockBuilder2.Setup(b => b.Name).Returns("client2");
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act - configure two different named clients with different endpoints
mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential);
mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

Comment on lines +322 to +326
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test uses different endpoints for different named options, so it will pass even if the cache key accidentally ignores the options name. To validate name isolation in the cache key, use the same endpoint/task hub for both names and assert the channels differ; also dispose the ServiceProvider to avoid leaking channels.

Copilot uses AI. Check for mistakes.
// Resolve options for both named clients
IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1");
GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2");

// Assert
options1.Channel.Should().NotBeNull();
options2.Channel.Should().NotBeNull();
options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels");
}

[Fact]
public async Task UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels()
{
    // Arrange
    DefaultAzureCredential credential = new();

    // Builds a fresh container registered through UseDurableTaskScheduler with the shared credential.
    ServiceProvider BuildProvider()
    {
        ServiceCollection registrations = new();
        Mock<IDurableTaskClientBuilder> builder = new();
        builder.Setup(b => b.Services).Returns(registrations);
        builder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
        return registrations.BuildServiceProvider();
    }

    ServiceProvider firstProvider = BuildProvider();

    // Resolve options to trigger channel creation
    GrpcDurableTaskClientOptions options = firstProvider
        .GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>()
        .Get(Options.DefaultName);
    options.Channel.Should().NotBeNull();
    GrpcChannel channel = options.Channel!;

    // Act - disposing the provider should dispose the ConfigureGrpcChannel, which disposes its channels
    await firstProvider.DisposeAsync();

    // Assert - using the channel after provider disposal is expected to throw ObjectDisposedException
    Action useDisposedChannel = () => channel.CreateCallInvoker();
    useDisposedChannel.Should().Throw<ObjectDisposedException>("channel should be disposed after provider disposal");

    // Also verify that a brand-new provider still yields a usable, distinct channel
    await using ServiceProvider secondProvider = BuildProvider();
    GrpcDurableTaskClientOptions newOptions = secondProvider
        .GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>()
        .Get(Options.DefaultName);
    newOptions.Channel.Should().NotBeNull();
    newOptions.Channel.Should().NotBeSameAs(channel, "new provider should create a new channel");
}

[Fact]
public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisposedException()
{
    // Arrange
    ServiceCollection registrations = new();
    Mock<IDurableTaskClientBuilder> builder = new();
    builder.Setup(b => b.Services).Returns(registrations);
    DefaultAzureCredential credential = new();
    builder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
    ServiceProvider container = registrations.BuildServiceProvider();

    // Grab the monitor while the container is still alive so it can be exercised after disposal.
    IOptionsMonitor<GrpcDurableTaskClientOptions> monitor =
        container.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();

    // Act - dispose the container, then try to materialize options through the stale monitor
    await container.DisposeAsync();
    Action getOptionsAfterDispose = () => monitor.Get(Options.DefaultName);

    // Assert
    getOptionsAfterDispose.Should().Throw<ObjectDisposedException>("configuring options after disposal should throw");
}
}
Loading
Loading