Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/Client/Core/DurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@
/// <remarks>
/// <para>All orchestrations must have a unique instance ID. You can provide an instance ID using the
/// <paramref name="options"/> parameter or you can omit this and a random instance ID will be
/// generated for you automatically. If an orchestration with the specified instance ID already exists and is in a
/// non-terminal state (Pending, Running, etc.), then this operation may fail silently. However, if an orchestration
/// instance with this ID already exists in a terminal state (Completed, Terminated, Failed, etc.) then the instance
/// may be recreated automatically, depending on the configuration of the backend instance store.
/// generated for you automatically. If an orchestration with the specified instance ID already exists and its status
/// is not in the <see cref="StartOrchestrationOptions.DedupeStatuses"/> field of <paramref name="options"/>, then
/// a new orchestration may be recreated automatically, depending on the configuration of the backend instance store.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "depending on the configuration of the backend instance store." in the existing method comment mean? Do not all backends support dedupe statuses?

/// If the existing orchestration is in a non-terminal state (Pending, Running, etc.), then the orchestration will first
/// be terminated before the new orchestration is created.
/// </para><para>
/// Orchestration instances started with this method will be created in the
/// <see cref="OrchestrationRuntimeStatus.Pending"/> state and will transition to the
Expand All @@ -98,8 +99,9 @@
/// </param>
/// <param name="options">The options to start the new orchestration with.</param>
/// <param name="cancellation">
/// The cancellation token. This only cancels enqueueing the new orchestration to the backend. Does not cancel the
/// orchestration once enqueued.
/// The cancellation token. This only cancels enqueueing the new orchestration to the backend, or waiting for the
/// termination of an existing non-terminal instance if its status is not in
/// <see cref="StartOrchestrationOptions.DedupeStatuses"/>. Does not cancel the orchestration once enqueued.
/// </param>
/// <returns>
/// A task that completes when the orchestration instance is successfully scheduled. The value of this task is
Expand Down Expand Up @@ -529,7 +531,7 @@
throw new NotSupportedException(
$"{this.GetType()} does not support listing orchestration instance IDs filtered by completed time.");
}

Check warning on line 534 in src/Client/Core/DurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / smoke-tests

// TODO: Create task hub

// TODO: Delete task hub
Expand All @@ -539,3 +541,3 @@
/// </summary>
/// <returns>A <see cref="ValueTask"/> that completes when the disposal completes.</returns>
public abstract ValueTask DisposeAsync();
Expand Down
23 changes: 7 additions & 16 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,9 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
}

// Set orchestration ID reuse policy for deduplication support
// Note: This requires the protobuf to support OrchestrationIdReusePolicy field
// If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
{
// Parse and validate all status strings to enum first
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options.DedupeStatuses
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options?.DedupeStatuses is null
? []
: [.. options.DedupeStatuses
.Select(s =>
{
if (!System.Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out OrchestrationRuntimeStatus status))
Expand All @@ -139,17 +136,11 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
}

return status;
}).ToImmutableHashSet();

// Convert dedupe statuses to protobuf statuses and create reuse policy
IEnumerable<P.OrchestrationStatus> dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus());
P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto);
})];

if (policy != null)
{
request.OrchestrationIdReusePolicy = policy;
}
}
// Convert dedupe statuses to protobuf statuses and create reuse policy
IEnumerable<P.OrchestrationStatus> dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus());
request.OrchestrationIdReusePolicy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add logic to wrap the "AlreadyExists" RpcException thrown if an orchestration already exists with a status in dedupe statuses?


using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request);

Expand Down
51 changes: 27 additions & 24 deletions src/Client/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ namespace Microsoft.DurableTask.Client.Grpc;
public static class ProtoUtils
{
/// <summary>
/// Gets the terminal orchestration statuses that are commonly used for deduplication.
/// Gets an array of all orchestration statuses.
/// These are the statuses that can be used in OrchestrationIdReusePolicy.
/// </summary>
/// <returns>An immutable array of terminal orchestration statuses.</returns>
public static ImmutableArray<P.OrchestrationStatus> GetTerminalStatuses()
/// <returns>An immutable array of all orchestration statuses.</returns>
public static ImmutableArray<P.OrchestrationStatus> GetAllStatuses()
Comment on lines +16 to +20
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change removes/renames public APIs in ProtoUtils (e.g., GetTerminalStatuses -> GetAllStatuses, nullable return types removed). If ProtoUtils is part of the public surface of Client.Grpc, this is a breaking change for SDK consumers beyond the server-side behavior change described in the PR. Consider keeping the old APIs as [Obsolete] shims (delegating to the new behavior) or explicitly calling out the API break in the PR summary/release notes.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point. Is this something we need to worry about?

{
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compatibility with what?

return ImmutableArray.Create(
P.OrchestrationStatus.Completed,
P.OrchestrationStatus.Failed,
P.OrchestrationStatus.Terminated,
P.OrchestrationStatus.Canceled);
P.OrchestrationStatus.Terminated,
P.OrchestrationStatus.Canceled,
P.OrchestrationStatus.Pending,
P.OrchestrationStatus.Running,
P.OrchestrationStatus.Suspended);
#pragma warning restore CS0618
}

Expand All @@ -33,55 +36,55 @@ public static class ProtoUtils
/// with replaceable statuses (statuses that CAN be replaced).
/// </summary>
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced.
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
/// So replaceableStatus = all statuses MINUS dedupeStatuses.
/// </remarks>
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
public static P.OrchestrationIdReusePolicy ConvertDedupeStatusesToReusePolicy(
IEnumerable<P.OrchestrationStatus>? dedupeStatuses)
{
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableArray<P.OrchestrationStatus> statuses = GetAllStatuses();
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses?.ToImmutableHashSet() ?? ImmutableHashSet<P.OrchestrationStatus>.Empty;

P.OrchestrationIdReusePolicy policy = new();

// Add terminal statuses that are NOT in dedupeStatuses as replaceable
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status)))
// Add statuses that are NOT in dedupeStatuses as replaceable
foreach (P.OrchestrationStatus status in statuses.Where(status => !dedupeStatusSet.Contains(status)))
{
policy.ReplaceableStatus.Add(terminalStatus);
policy.ReplaceableStatus.Add(status);
}

// Only return policy if we have replaceable statuses
return policy.ReplaceableStatus.Count > 0 ? policy : null;
return policy;
}

/// <summary>
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
/// (statuses that should NOT be replaced).
/// </summary>
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses. If this parameter is null,
/// then all statuses are considered replaceable.</param>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all statuses are replaceable.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
/// So dedupeStatuses = all statuses MINUS replaceableStatus.
/// </remarks>
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
P.OrchestrationIdReusePolicy? policy)
{
if (policy == null || policy.ReplaceableStatus.Count == 0)
{
if (policy == null)
{
return null;
}
}

ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableArray<P.OrchestrationStatus> allStatuses = GetAllStatuses();
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();

// Calculate dedupe statuses = terminal statuses - replaceable statuses
P.OrchestrationStatus[] dedupeStatuses = terminalStatuses
.Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus))
// Calculate dedupe statuses = all statuses - replaceable statuses
P.OrchestrationStatus[] dedupeStatuses = allStatuses
.Where(status => !replaceableStatusSet.Contains(status))
.ToArray();

// Only return if there are dedupe statuses
Expand Down
51 changes: 51 additions & 0 deletions src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Microsoft.DurableTask.Client;
Expand Down Expand Up @@ -224,6 +225,7 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
.ToArray();
}

await this.TerminateTaskOrchestrationWithReusableRunningStatusAndWaitAsync(instanceId, dedupeStatuses, cancellation);
await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses);
return instanceId;
}
Expand Down Expand Up @@ -385,4 +387,53 @@ Task SendInstanceMessageAsync(string instanceId, HistoryEvent @event, Cancellati

return this.Client.SendTaskOrchestrationMessageAsync(message);
}

async Task TerminateTaskOrchestrationWithReusableRunningStatusAndWaitAsync(
string instanceId,
OrchestrationStatus[]? dedupeStatuses,
CancellationToken cancellation)
{
var runningStatuses = new List<OrchestrationStatus>()
{
OrchestrationStatus.Running,
OrchestrationStatus.Pending,
OrchestrationStatus.Suspended,
};

// At least one running status is reusable, so determine if an orchestration already exists with this status and terminate it if so
if (dedupeStatuses == null || runningStatuses.Any(status => !dedupeStatuses.Contains(status)))
{
OrchestrationMetadata? metadata = await this.GetInstancesAsync(instanceId, getInputsAndOutputs: false, cancellation);

if (metadata != null)
{
OrchestrationStatus orchestrationStatus = metadata.RuntimeStatus.ConvertToCore();
if (dedupeStatuses?.Contains(orchestrationStatus) == true)
{
throw new OrchestrationAlreadyExistsException($"An orchestration with instance ID '{instanceId}' and status " +
$"'{metadata.RuntimeStatus}' already exists");
}

if (runningStatuses.Contains(orchestrationStatus))
{
// Check for cancellation before attempting to terminate the orchestration
cancellation.ThrowIfCancellationRequested();

string terminationReason = $"A new instance creation request has been issued for instance {instanceId} which " +
$"currently has status {metadata.RuntimeStatus}. Since the dedupe statuses of the creation request, " +
$"{(dedupeStatuses == null ? "[]" : string.Join(", ", dedupeStatuses))}, do not contain the orchestration's status, " +
$"the orchestration has been terminated and a new instance with the same instance ID will be created.";

await this.TerminateInstanceAsync(instanceId, terminationReason, cancellation);

while (metadata != null && !metadata.IsCompleted)
{
cancellation.ThrowIfCancellationRequested();
await Task.Delay(TimeSpan.FromSeconds(1), cancellation);
metadata = await this.GetInstancesAsync(instanceId, getInputsAndOutputs: false, cancellation);
}
}
}
}
}
}
2 changes: 1 addition & 1 deletion src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ async Task WaitForWorkItemClientConnection()
// Convert OrchestrationIdReusePolicy to dedupeStatuses
// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
// So dedupeStatuses = all terminal statuses MINUS replaceableStatus
// So dedupeStatuses = all statuses MINUS replaceableStatus
OrchestrationStatus[]? dedupeStatuses = null;
P.OrchestrationStatus[]? dedupeStatusesProto = ProtoUtils.ConvertReusePolicyToDedupeStatuses(request.OrchestrationIdReusePolicy);
if (dedupeStatusesProto != null)
Expand Down
Loading
Loading