Skip to content

Commit efdf35b

Browse files
committed
[DEVEX-227] Added AppendToStream options to Decide options
Made also other smaller improvements like: - removing event from state store interface, - made both StateStore and AggregateStore method virtual, - decoupled Decider from AsyncDecider moving the mapping method to extension classes.
1 parent 48cec84 commit efdf35b

File tree

3 files changed

+177
-71
lines changed

3 files changed

+177
-71
lines changed

src/Kurrent.Client/Streams/DecisionMaking/AggregateStore.cs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,27 @@ public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
9494
);
9595
}
9696

97+
public interface IAggregateStore<TAggregate> : IAggregateStore<TAggregate, object>
98+
where TAggregate : IAggregate<object>;
99+
100+
public class AggregateStoreOptions<TState> where TState : notnull {
101+
#if NET48
102+
public IStateBuilder<TState> StateBuilder { get; set; } = null!;
103+
#else
104+
public required IStateBuilder<TState> StateBuilder { get; set; }
105+
#endif
106+
107+
public DecideOptions<TState>? DecideOptions { get; set; }
108+
}
109+
97110
public class AggregateStore<TAggregate, TEvent>(KurrentClient client, AggregateStoreOptions<TAggregate> options)
98111
: IAggregateStore<TAggregate, TEvent>
99112
where TAggregate : IAggregate<TEvent>
100113
where TEvent : notnull {
101-
public Task<StateAtPointInTime<TAggregate>> Get(string streamName, CancellationToken ct = default) =>
114+
public virtual Task<StateAtPointInTime<TAggregate>> Get(string streamName, CancellationToken ct = default) =>
102115
client.GetStateAsync(streamName, options.StateBuilder, ct);
103116

104-
public Task<IWriteResult> AddAsync(
117+
public virtual Task<IWriteResult> AddAsync(
105118
string streamName,
106119
TAggregate aggregate,
107120
AppendToStreamOptions? appendToStreamOptions,
@@ -112,10 +125,15 @@ public Task<IWriteResult> AddAsync(
112125
if (appendToStreamOptions.ExpectedStreamState == null && appendToStreamOptions.ExpectedStreamRevision == null)
113126
appendToStreamOptions.ExpectedStreamState = StreamState.NoStream;
114127

115-
return client.AppendToStreamAsync(streamName, aggregate.DequeueUncommittedMessages(), appendToStreamOptions, ct);
128+
return client.AppendToStreamAsync(
129+
streamName,
130+
aggregate.DequeueUncommittedMessages(),
131+
appendToStreamOptions,
132+
ct
133+
);
116134
}
117135

118-
public Task<IWriteResult> UpdateAsync(
136+
public virtual Task<IWriteResult> UpdateAsync(
119137
string streamName,
120138
TAggregate aggregate,
121139
AppendToStreamOptions? appendToStreamOptions,
@@ -126,10 +144,15 @@ public Task<IWriteResult> UpdateAsync(
126144
if (appendToStreamOptions.ExpectedStreamState == null && appendToStreamOptions.ExpectedStreamRevision == null)
127145
appendToStreamOptions.ExpectedStreamState = StreamState.StreamExists;
128146

129-
return client.AppendToStreamAsync(streamName, aggregate.DequeueUncommittedMessages(), appendToStreamOptions, ct);
147+
return client.AppendToStreamAsync(
148+
streamName,
149+
aggregate.DequeueUncommittedMessages(),
150+
appendToStreamOptions,
151+
ct
152+
);
130153
}
131154

132-
public Task<IWriteResult> HandleAsync(
155+
public virtual Task<IWriteResult> HandleAsync(
133156
string streamName,
134157
Func<TAggregate, CancellationToken, ValueTask> handle,
135158
DecideOptions<TAggregate>? decideOption,
@@ -146,3 +169,7 @@ public Task<IWriteResult> HandleAsync(
146169
ct
147170
);
148171
}
172+
173+
public class AggregateStore<TAggregate>(KurrentClient client, AggregateStoreOptions<TAggregate> options)
174+
: AggregateStore<TAggregate, object>(client, options), IAggregateStore<TAggregate>
175+
where TAggregate : IAggregate<object>;

src/Kurrent.Client/Streams/DecisionMaking/Decider.cs

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,52 +13,64 @@ Func<TState> GetInitialState
1313
) : StateBuilder<TState>(
1414
Evolve,
1515
GetInitialState
16-
) where TState : notnull;
16+
)
17+
where TState : notnull;
1718

1819
public record AsyncDecider<TState>(
1920
Func<object, TState, CancellationToken, ValueTask<Message[]>> Decide,
2021
Func<TState, ResolvedEvent, TState> Evolve,
2122
Func<TState> GetInitialState
22-
) : AsyncDecider<TState, object>(Decide, Evolve, GetInitialState) where TState : notnull;
23+
) : AsyncDecider<TState, object>(Decide, Evolve, GetInitialState)
24+
where TState : notnull;
2325

2426
public record Decider<TState, TCommand, TEvent>(
2527
Func<TCommand, TState, TEvent[]> Decide,
2628
Func<TState, TEvent, TState> Evolve,
2729
Func<TState> GetInitialState
28-
) where TEvent : notnull where TState : notnull {
29-
public AsyncDecider<TState, TCommand> ToAsyncDecider() =>
30+
) where TEvent : notnull
31+
where TState : notnull;
32+
33+
public record Decider<TState, TCommand>(
34+
Func<TCommand, TState, object[]> Decide,
35+
Func<TState, object, TState> Evolve,
36+
Func<TState> GetInitialState
37+
) : Decider<TState, TCommand, object>(Decide, Evolve, GetInitialState)
38+
where TState : notnull;
39+
40+
public static class AsyncDeciderExtensions {
41+
public static AsyncDecider<TState, TCommand> ToAsyncDecider<TState, TCommand, TEvent>(
42+
this Decider<TState, TCommand, TEvent> decider
43+
) where TEvent : notnull
44+
where TState : notnull =>
3045
new AsyncDecider<TState, TCommand>(
3146
(command, state, _) =>
32-
new ValueTask<Message[]>(Decide(command, state).Select(m => Message.From(m)).ToArray()),
47+
new ValueTask<Message[]>(decider.Decide(command, state).Select(m => Message.From(m)).ToArray()),
3348
(state, resolvedEvent) =>
3449
resolvedEvent.DeserializedData is TEvent @event
35-
? Evolve(state, @event)
50+
? decider.Evolve(state, @event)
3651
: state,
37-
GetInitialState
52+
decider.GetInitialState
3853
);
3954
}
4055

41-
public record Decider<TState, TCommand>(
42-
Func<TCommand, TState, object[]> Decide,
43-
Func<TState, object, TState> Evolve,
44-
Func<TState> GetInitialState
45-
) : Decider<TState, TCommand, object>(Decide, Evolve, GetInitialState) where TState : notnull;
46-
47-
public class DecideOptions<TState> : GetStreamStateOptions<TState> where TState : notnull;
56+
public class DecideOptions<TState> where TState : notnull {
57+
public GetStreamStateOptions<TState>? GetStateOptions { get; set; }
58+
public AppendToStreamOptions? AppendToStreamOptions { get; set; }
59+
}
4860

4961
public static class KurrentClientDecisionMakingExtensions {
5062
public static async Task<IWriteResult> DecideAsync<TState>(
5163
this KurrentClient eventStore,
5264
string streamName,
53-
CommandHandler<TState> handle,
65+
CommandHandler<TState> decide,
5466
IStateBuilder<TState> stateBuilder,
5567
DecideOptions<TState>? options,
5668
CancellationToken ct = default
5769
) where TState : notnull {
5870
var (state, streamPosition, position) =
59-
await eventStore.GetStateAsync(streamName, stateBuilder, options, ct);
71+
await eventStore.GetStateAsync(streamName, stateBuilder, options?.GetStateOptions, ct);
6072

61-
var events = await handle(state, ct);
73+
var events = await decide(state, ct);
6274

6375
if (events.Length == 0) {
6476
return new SuccessResult(
@@ -67,10 +79,12 @@ public static async Task<IWriteResult> DecideAsync<TState>(
6779
);
6880
}
6981

70-
var appendToStreamOptions = streamPosition.HasValue
71-
? new AppendToStreamOptions
72-
{ ExpectedStreamRevision = StreamRevision.FromStreamPosition(streamPosition.Value) }
73-
: new AppendToStreamOptions { ExpectedStreamState = StreamState.NoStream };
82+
var appendToStreamOptions = options?.AppendToStreamOptions ?? new AppendToStreamOptions();
83+
84+
if (streamPosition.HasValue)
85+
appendToStreamOptions.ExpectedStreamRevision = StreamRevision.FromStreamPosition(streamPosition.Value);
86+
else
87+
appendToStreamOptions.ExpectedStreamState = StreamState.NoStream;
7488

7589
return await eventStore.AppendToStreamAsync(
7690
streamName,
@@ -94,6 +108,22 @@ CancellationToken ct
94108
ct
95109
);
96110

111+
public static Task<IWriteResult> DecideAsync<TState, TCommand>(
112+
this KurrentClient eventStore,
113+
string streamName,
114+
TCommand command,
115+
Decider<TState, TCommand> decider,
116+
DecideOptions<TState>? options,
117+
CancellationToken ct
118+
) where TState : notnull =>
119+
eventStore.DecideAsync(
120+
streamName,
121+
command,
122+
decider.ToAsyncDecider(),
123+
options,
124+
ct
125+
);
126+
97127
public static Task<IWriteResult> DecideAsync<TState, TCommand>(
98128
this KurrentClient eventStore,
99129
string streamName,
@@ -108,6 +138,22 @@ CancellationToken ct
108138
ct
109139
);
110140

141+
public static Task<IWriteResult> DecideAsync<TState, TCommand>(
142+
this KurrentClient eventStore,
143+
string streamName,
144+
TCommand command,
145+
AsyncDecider<TState, TCommand> asyncDecider,
146+
DecideOptions<TState>? options,
147+
CancellationToken ct
148+
) where TState : notnull =>
149+
eventStore.DecideAsync(
150+
streamName,
151+
(state, token) => asyncDecider.Decide(command, state, token),
152+
asyncDecider,
153+
options,
154+
ct
155+
);
156+
111157
public static Task<IWriteResult> DecideAsync<TState>(
112158
this KurrentClient eventStore,
113159
string streamName,

0 commit comments

Comments
 (0)