|
1 | 1 | using EventStore.Client; |
2 | 2 | using Kurrent.Client.Core.Serialization; |
3 | 3 | using Kurrent.Client.Streams.GettingState; |
| 4 | +using Polly; |
| 5 | +using Polly.Retry; |
4 | 6 |
|
5 | 7 | namespace Kurrent.Client.Streams.DecisionMaking; |
6 | 8 |
|
| 9 | +using static AsyncDecider; |
| 10 | + |
7 | 11 | public delegate ValueTask<Message[]> CommandHandler<in TState>(TState state, CancellationToken ct = default); |
8 | 12 |
|
9 | 13 | public record AsyncDecider<TState, TCommand>( |
@@ -56,43 +60,51 @@ resolvedEvent.DeserializedData is TEvent @event |
56 | 60 | public class DecideOptions<TState> where TState : notnull { |
57 | 61 | public GetStreamStateOptions<TState>? GetStateOptions { get; set; } |
58 | 62 | public AppendToStreamOptions? AppendToStreamOptions { get; set; } |
| 63 | + public IAsyncPolicy<IWriteResult>? RetryPolicy { get; set; } |
59 | 64 | } |
60 | 65 |
|
61 | 66 | public static class KurrentClientDecisionMakingExtensions { |
62 | | - public static async Task<IWriteResult> DecideAsync<TState>( |
| 67 | + public static Task<IWriteResult> DecideAsync<TState>( |
63 | 68 | this KurrentClient eventStore, |
64 | 69 | string streamName, |
65 | 70 | CommandHandler<TState> decide, |
66 | 71 | IStateBuilder<TState> stateBuilder, |
67 | 72 | DecideOptions<TState>? options, |
68 | | - CancellationToken ct = default |
69 | | - ) where TState : notnull { |
70 | | - var (state, streamPosition, position) = |
71 | | - await eventStore.GetStateAsync(streamName, stateBuilder, options?.GetStateOptions, ct); |
72 | | - |
73 | | - var events = await decide(state, ct); |
74 | | - |
75 | | - if (events.Length == 0) { |
76 | | - return new SuccessResult( |
77 | | - streamPosition.HasValue ? StreamRevision.FromStreamPosition(streamPosition.Value) : StreamRevision.None, |
78 | | - position ?? Position.Start |
79 | | - ); |
80 | | - } |
81 | | - |
82 | | - var appendToStreamOptions = options?.AppendToStreamOptions ?? new AppendToStreamOptions(); |
83 | | - |
84 | | - if (streamPosition.HasValue) |
85 | | - appendToStreamOptions.ExpectedStreamRevision = StreamRevision.FromStreamPosition(streamPosition.Value); |
86 | | - else |
87 | | - appendToStreamOptions.ExpectedStreamState = StreamState.NoStream; |
88 | | - |
89 | | - return await eventStore.AppendToStreamAsync( |
90 | | - streamName, |
91 | | - events.Cast<object>(), |
92 | | - appendToStreamOptions, |
93 | | - cancellationToken: ct |
| 73 | + CancellationToken cancellationToken = default |
| 74 | + ) where TState : notnull => |
| 75 | + DecideRetryPolicy(options).ExecuteAsync( |
| 76 | + async ct => { |
| 77 | + var (state, streamPosition, position) = |
| 78 | + await eventStore.GetStateAsync(streamName, stateBuilder, options?.GetStateOptions, ct); |
| 79 | + |
| 80 | + var messages = await decide(state, ct); |
| 81 | + |
| 82 | + if (messages.Length == 0) { |
| 83 | + return new SuccessResult( |
| 84 | + streamPosition.HasValue |
| 85 | + ? StreamRevision.FromStreamPosition(streamPosition.Value) |
| 86 | + : StreamRevision.None, |
| 87 | + position ?? Position.Start |
| 88 | + ); |
| 89 | + } |
| 90 | + |
| 91 | + var appendToStreamOptions = options?.AppendToStreamOptions ?? new AppendToStreamOptions(); |
| 92 | + |
| 93 | + if (streamPosition.HasValue) |
| 94 | + appendToStreamOptions.ExpectedStreamRevision ??= |
| 95 | + StreamRevision.FromStreamPosition(streamPosition.Value); |
| 96 | + else |
| 97 | + appendToStreamOptions.ExpectedStreamState ??= StreamState.NoStream; |
| 98 | + |
| 99 | + return await eventStore.AppendToStreamAsync( |
| 100 | + streamName, |
| 101 | + messages, |
| 102 | + appendToStreamOptions, |
| 103 | + ct |
| 104 | + ); |
| 105 | + }, |
| 106 | + cancellationToken |
94 | 107 | ); |
95 | | - } |
96 | 108 |
|
97 | 109 | public static Task<IWriteResult> DecideAsync<TState, TCommand>( |
98 | 110 | this KurrentClient eventStore, |
@@ -183,3 +195,24 @@ public static Task<IWriteResult> DecideAsync<TState, TEvent>( |
183 | 195 | ct |
184 | 196 | ); |
185 | 197 | } |
| 198 | + |
| 199 | +public static class AsyncDecider { |
| 200 | + public static readonly IAsyncPolicy<IWriteResult> DefaultRetryPolicy = |
| 201 | + Policy<IWriteResult> |
| 202 | + .Handle<WrongExpectedVersionException>() |
| 203 | + .WaitAndRetryAsync( |
| 204 | + retryCount: 3, |
| 205 | + sleepDurationProvider: retryAttempt => TimeSpan.FromMilliseconds(20 * retryAttempt) |
| 206 | + ); |
| 207 | + |
| 208 | + public static bool HasUserProvidedExpectedVersioning(AppendToStreamOptions? options) => |
| 209 | + options != null && (options.ExpectedStreamState.HasValue || options.ExpectedStreamRevision.HasValue); |
| 210 | + |
| 211 | + public static IAsyncPolicy<IWriteResult> DecideRetryPolicy<TState>(DecideOptions<TState>? options) |
| 212 | + where TState : notnull => |
| 213 | + options?.RetryPolicy ?? |
| 214 | + (HasUserProvidedExpectedVersioning(options?.AppendToStreamOptions) |
| 215 | + // it doesn't make sense to retry, as expected state will be always the same |
| 216 | + ? Policy.NoOpAsync<IWriteResult>() |
| 217 | + : DefaultRetryPolicy); |
| 218 | +} |
0 commit comments