Skip to content

Commit 1f6f829

Browse files
committed
[DEVEX-227] Added missing options in Get state methods of State and Aggregate Store
Also split the Decider information and Decision processing
1 parent 0a5bc23 commit 1f6f829

File tree

4 files changed

+330
-291
lines changed

4 files changed

+330
-291
lines changed

src/Kurrent.Client/Streams/DecisionMaking/AggregateStore.cs

Lines changed: 81 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
using EventStore.Client;
2-
using Kurrent.Client.Core.Serialization;
32
using Kurrent.Client.Streams.GettingState;
43

54
namespace Kurrent.Client.Streams.DecisionMaking;
65

76
public interface IAggregateStore<TAggregate, TEvent>
87
where TAggregate : IAggregate<TEvent> {
9-
Task<StateAtPointInTime<TAggregate>> Get(
8+
Task<StateAtPointInTime<TAggregate>> GetAsync(
109
string streamName,
10+
GetStreamStateOptions<TAggregate>? getStreamStateOptions,
1111
CancellationToken ct = default
1212
);
1313

@@ -36,66 +36,6 @@ Task<IWriteResult> HandleAsync(
3636
public interface IAggregateStore<TAggregate> : IAggregateStore<TAggregate, object>
3737
where TAggregate : IAggregate<object>;
3838

39-
public static class AggregateStoreExtensions {
40-
public static Task<IWriteResult> AddAsync<TAggregate, TEvent>(
41-
this IAggregateStore<TAggregate, TEvent> aggregateStore,
42-
string streamName,
43-
TAggregate aggregate,
44-
CancellationToken ct = default
45-
) where TAggregate : IAggregate<TEvent> =>
46-
aggregateStore.AddAsync(
47-
streamName,
48-
aggregate,
49-
new AppendToStreamOptions { ExpectedStreamState = StreamState.NoStream },
50-
ct
51-
);
52-
53-
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
54-
this IAggregateStore<TAggregate, TEvent> aggregateStore,
55-
string streamName,
56-
Func<TAggregate, CancellationToken, ValueTask> handle,
57-
CancellationToken ct = default
58-
) where TAggregate : IAggregate<TEvent> =>
59-
aggregateStore.HandleAsync(
60-
streamName,
61-
handle,
62-
new DecideOptions<TAggregate>(),
63-
ct
64-
);
65-
66-
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
67-
this IAggregateStore<TAggregate, TEvent> aggregateStore,
68-
string streamName,
69-
Action<TAggregate> handle,
70-
CancellationToken ct = default
71-
) where TAggregate : IAggregate<TEvent> =>
72-
aggregateStore.HandleAsync(
73-
streamName,
74-
(state, _) => {
75-
handle(state);
76-
return new ValueTask();
77-
},
78-
new DecideOptions<TAggregate>(),
79-
ct
80-
);
81-
82-
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
83-
this IAggregateStore<TAggregate, TEvent> aggregateStore,
84-
string streamName,
85-
Action<TAggregate> handle,
86-
DecideOptions<TAggregate>? decideOption,
87-
CancellationToken ct = default
88-
) where TAggregate : IAggregate<TEvent> =>
89-
aggregateStore.HandleAsync(
90-
streamName,
91-
(state, _) => {
92-
handle(state);
93-
return new ValueTask();
94-
},
95-
decideOption,
96-
ct
97-
);
98-
}
9939

10040
public class AggregateStoreOptions<TState> where TState : notnull {
10141
#if NET48
@@ -111,8 +51,17 @@ public class AggregateStore<TAggregate, TEvent>(KurrentClient client, AggregateS
11151
: IAggregateStore<TAggregate, TEvent>
11252
where TAggregate : IAggregate<TEvent>
11353
where TEvent : notnull {
114-
public virtual Task<StateAtPointInTime<TAggregate>> Get(string streamName, CancellationToken ct = default) =>
115-
client.GetStateAsync(streamName, options.StateBuilder, ct);
54+
public virtual Task<StateAtPointInTime<TAggregate>> GetAsync(
55+
string streamName,
56+
GetStreamStateOptions<TAggregate>? getStreamStateOptions,
57+
CancellationToken ct = default
58+
) =>
59+
client.GetStateAsync(
60+
streamName,
61+
options.StateBuilder,
62+
getStreamStateOptions ?? options.DecideOptions?.GetStateOptions,
63+
ct
64+
);
11665

11766
public virtual Task<IWriteResult> AddAsync(
11867
string streamName,
@@ -173,3 +122,71 @@ public virtual Task<IWriteResult> HandleAsync(
173122
public class AggregateStore<TAggregate>(KurrentClient client, AggregateStoreOptions<TAggregate> options)
174123
: AggregateStore<TAggregate, object>(client, options), IAggregateStore<TAggregate>
175124
where TAggregate : IAggregate<object>;
125+
126+
public static class AggregateStoreExtensions {
127+
public static Task<StateAtPointInTime<TAggregate>> GetAsync<TAggregate, TEvent>(
128+
this IAggregateStore<TAggregate, TEvent> aggregateStore,
129+
string streamName,
130+
CancellationToken ct = default
131+
) where TAggregate : IAggregate<TEvent> =>
132+
aggregateStore.GetAsync(streamName, null, ct);
133+
134+
public static Task<IWriteResult> AddAsync<TAggregate, TEvent>(
135+
this IAggregateStore<TAggregate, TEvent> aggregateStore,
136+
string streamName,
137+
TAggregate aggregate,
138+
CancellationToken ct = default
139+
) where TAggregate : IAggregate<TEvent> =>
140+
aggregateStore.AddAsync(
141+
streamName,
142+
aggregate,
143+
new AppendToStreamOptions { ExpectedStreamState = StreamState.NoStream },
144+
ct
145+
);
146+
147+
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
148+
this IAggregateStore<TAggregate, TEvent> aggregateStore,
149+
string streamName,
150+
Func<TAggregate, CancellationToken, ValueTask> handle,
151+
CancellationToken ct = default
152+
) where TAggregate : IAggregate<TEvent> =>
153+
aggregateStore.HandleAsync(
154+
streamName,
155+
handle,
156+
new DecideOptions<TAggregate>(),
157+
ct
158+
);
159+
160+
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
161+
this IAggregateStore<TAggregate, TEvent> aggregateStore,
162+
string streamName,
163+
Action<TAggregate> handle,
164+
CancellationToken ct = default
165+
) where TAggregate : IAggregate<TEvent> =>
166+
aggregateStore.HandleAsync(
167+
streamName,
168+
(state, _) => {
169+
handle(state);
170+
return new ValueTask();
171+
},
172+
new DecideOptions<TAggregate>(),
173+
ct
174+
);
175+
176+
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
177+
this IAggregateStore<TAggregate, TEvent> aggregateStore,
178+
string streamName,
179+
Action<TAggregate> handle,
180+
DecideOptions<TAggregate>? decideOption,
181+
CancellationToken ct = default
182+
) where TAggregate : IAggregate<TEvent> =>
183+
aggregateStore.HandleAsync(
184+
streamName,
185+
(state, _) => {
186+
handle(state);
187+
return new ValueTask();
188+
},
189+
decideOption,
190+
ct
191+
);
192+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
using EventStore.Client;
2+
using Kurrent.Client.Streams.GettingState;
3+
using Polly;
4+
5+
namespace Kurrent.Client.Streams.DecisionMaking;
6+
7+
using static AsyncDecider;
8+
9+
public class DecideOptions<TState> where TState : notnull {
10+
public GetStreamStateOptions<TState>? GetStateOptions { get; set; }
11+
public AppendToStreamOptions? AppendToStreamOptions { get; set; }
12+
public IAsyncPolicy<IWriteResult>? RetryPolicy { get; set; }
13+
}
14+
15+
public static class KurrentClientDecisionMakingExtensions {
16+
public static Task<IWriteResult> DecideAsync<TState>(
17+
this KurrentClient eventStore,
18+
string streamName,
19+
CommandHandler<TState> decide,
20+
IStateBuilder<TState> stateBuilder,
21+
DecideOptions<TState>? options,
22+
CancellationToken cancellationToken = default
23+
) where TState : notnull =>
24+
DecideRetryPolicy(options).ExecuteAsync(
25+
async ct => {
26+
var (state, streamPosition, position) =
27+
await eventStore.GetStateAsync(streamName, stateBuilder, options?.GetStateOptions, ct);
28+
29+
var messages = await decide(state, ct);
30+
31+
if (messages.Length == 0) {
32+
return new SuccessResult(
33+
streamPosition.HasValue
34+
? StreamRevision.FromStreamPosition(streamPosition.Value)
35+
: StreamRevision.None,
36+
position ?? Position.Start
37+
);
38+
}
39+
40+
var appendToStreamOptions = options?.AppendToStreamOptions ?? new AppendToStreamOptions();
41+
42+
if (streamPosition.HasValue)
43+
appendToStreamOptions.ExpectedStreamRevision ??=
44+
StreamRevision.FromStreamPosition(streamPosition.Value);
45+
else
46+
appendToStreamOptions.ExpectedStreamState ??= StreamState.NoStream;
47+
48+
return await eventStore.AppendToStreamAsync(
49+
streamName,
50+
messages,
51+
appendToStreamOptions,
52+
ct
53+
);
54+
},
55+
cancellationToken
56+
);
57+
58+
public static Task<IWriteResult> DecideAsync<TState, TCommand>(
59+
this KurrentClient eventStore,
60+
string streamName,
61+
TCommand command,
62+
Decider<TState, TCommand> decider,
63+
CancellationToken ct
64+
) where TState : notnull =>
65+
eventStore.DecideAsync(
66+
streamName,
67+
command,
68+
decider.ToAsyncDecider(),
69+
ct
70+
);
71+
72+
public static Task<IWriteResult> DecideAsync<TState, TCommand>(
73+
this KurrentClient eventStore,
74+
string streamName,
75+
TCommand command,
76+
Decider<TState, TCommand> decider,
77+
DecideOptions<TState>? options,
78+
CancellationToken ct
79+
) where TState : notnull =>
80+
eventStore.DecideAsync(
81+
streamName,
82+
command,
83+
decider.ToAsyncDecider(),
84+
options,
85+
ct
86+
);
87+
88+
public static Task<IWriteResult> DecideAsync<TState, TCommand>(
89+
this KurrentClient eventStore,
90+
string streamName,
91+
TCommand command,
92+
AsyncDecider<TState, TCommand> asyncDecider,
93+
CancellationToken ct
94+
) where TState : notnull =>
95+
eventStore.DecideAsync(
96+
streamName,
97+
(state, token) => asyncDecider.Decide(command, state, token),
98+
asyncDecider,
99+
ct
100+
);
101+
102+
public static Task<IWriteResult> DecideAsync<TState, TCommand>(
103+
this KurrentClient eventStore,
104+
string streamName,
105+
TCommand command,
106+
AsyncDecider<TState, TCommand> asyncDecider,
107+
DecideOptions<TState>? options,
108+
CancellationToken ct
109+
) where TState : notnull =>
110+
eventStore.DecideAsync(
111+
streamName,
112+
(state, token) => asyncDecider.Decide(command, state, token),
113+
asyncDecider,
114+
options,
115+
ct
116+
);
117+
118+
public static Task<IWriteResult> DecideAsync<TState>(
119+
this KurrentClient eventStore,
120+
string streamName,
121+
CommandHandler<TState> handle,
122+
IStateBuilder<TState> stateBuilder,
123+
CancellationToken ct = default
124+
) where TState : notnull =>
125+
eventStore.DecideAsync(
126+
streamName,
127+
handle,
128+
stateBuilder,
129+
new DecideOptions<TState>(),
130+
ct
131+
);
132+
133+
public static Task<IWriteResult> DecideAsync<TState, TEvent>(
134+
this KurrentClient eventStore,
135+
string streamName,
136+
CommandHandler<TState> handle,
137+
CancellationToken ct = default
138+
) where TState : IState<TEvent>, new() =>
139+
eventStore.DecideAsync(
140+
streamName,
141+
handle,
142+
StateBuilder.For<TState, TEvent>(),
143+
new DecideOptions<TState>(),
144+
ct
145+
);
146+
}
147+
148+
public static class AsyncDecider {
149+
public static readonly IAsyncPolicy<IWriteResult> DefaultRetryPolicy =
150+
Policy<IWriteResult>
151+
.Handle<WrongExpectedVersionException>()
152+
.WaitAndRetryAsync(
153+
retryCount: 3,
154+
sleepDurationProvider: retryAttempt => TimeSpan.FromMilliseconds(20 * retryAttempt)
155+
);
156+
157+
public static bool HasUserProvidedExpectedVersioning(AppendToStreamOptions? options) =>
158+
options != null && (options.ExpectedStreamState.HasValue || options.ExpectedStreamRevision.HasValue);
159+
160+
public static IAsyncPolicy<IWriteResult> DecideRetryPolicy<TState>(DecideOptions<TState>? options)
161+
where TState : notnull =>
162+
options?.RetryPolicy ??
163+
(HasUserProvidedExpectedVersioning(options?.AppendToStreamOptions)
164+
// it doesn't make sense to retry, as expected state will be always the same
165+
? Policy.NoOpAsync<IWriteResult>()
166+
: DefaultRetryPolicy);
167+
}

0 commit comments

Comments
 (0)