Skip to content

Commit 4444c8c

Browse files
committed
[DEVEX-227] Added Polly to support retries of business logic
1 parent efdf35b commit 4444c8c

File tree

4 files changed

+47
-14
lines changed

4 files changed

+47
-14
lines changed

src/Kurrent.Client/Kurrent.Client.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
<PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.0"/>
1313
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="9.0.0"/>
1414
<PackageReference Include="System.Linq.Async" Version="6.0.1"/>
15-
<PackageReference Include="OpenTelemetry.Api" Version="1.10.0" />
15+
<PackageReference Include="OpenTelemetry.Api" Version="1.11.2" />
16+
<PackageReference Include="Polly" Version="8.5.2" />
1617

1718
<PackageReference Include="Grpc.Tools" Version="2.68.1">
1819
<PrivateAssets>all</PrivateAssets>

src/Kurrent.Client/Streams/DecisionMaking/Aggregate.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ public interface IAggregate<in TEvent> : IState<TEvent> {
77
Message[] DequeueUncommittedMessages();
88
}
99

10-
public interface IAggregate : IAggregate<object>;
10+
public interface IAggregate : IAggregate<object>, IState;
1111

1212
public class Aggregate : Aggregate<object>, IAggregate;
1313

@@ -24,9 +24,9 @@ Message[] IAggregate<TEvent>.DequeueUncommittedMessages() {
2424
return dequeuedEvents;
2525
}
2626

27-
protected void Enqueue(TEvent message) {
28-
Apply(message);
29-
_uncommittedEvents.Enqueue(Message.From(message));
27+
protected void Enqueue(TEvent @event) {
28+
Apply(@event);
29+
_uncommittedEvents.Enqueue(Message.From(@event));
3030
}
3131

3232
protected void Enqueue(Message message) {

src/Kurrent.Client/Streams/DecisionMaking/StateStore.cs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public static Task<IWriteResult> AddAsync<TState>(
5555
null,
5656
ct
5757
);
58-
58+
5959
public static Task<IWriteResult> AddAsync<TState>(
6060
this IStateStore<TState> stateStore,
6161
string streamName,
@@ -68,13 +68,14 @@ public static Task<IWriteResult> AddAsync<TState>(
6868
new AppendToStreamOptions { ExpectedStreamState = StreamState.NoStream },
6969
ct
7070
);
71-
71+
7272
public static Task<IWriteResult> AddAsync<TState, TEvent>(
7373
this IStateStore<TState> stateStore,
7474
string streamName,
7575
IEnumerable<TEvent> events,
7676
CancellationToken ct = default
77-
) where TState : notnull where TEvent : notnull =>
77+
) where TState : notnull
78+
where TEvent : notnull =>
7879
stateStore.AddAsync(
7980
streamName,
8081
events.Select(e => Message.From(e)),
@@ -87,7 +88,8 @@ public static Task<IWriteResult> UpdateAsync<TState, TEvent>(
8788
string streamName,
8889
IEnumerable<TEvent> events,
8990
CancellationToken ct = default
90-
) where TState : notnull where TEvent : notnull =>
91+
) where TState : notnull
92+
where TEvent : notnull =>
9193
stateStore.UpdateAsync(
9294
streamName,
9395
events.Select(e => Message.From(e)),
@@ -114,7 +116,8 @@ public static Task<IWriteResult> UpdateAsync<TState, TEvent>(
114116
IEnumerable<TEvent> events,
115117
StreamRevision expectedStreamRevision,
116118
CancellationToken ct = default
117-
) where TState : notnull where TEvent : notnull =>
119+
) where TState : notnull
120+
where TEvent : notnull =>
118121
stateStore.UpdateAsync(
119122
streamName,
120123
events.Select(e => Message.From(e)),
@@ -154,7 +157,21 @@ public static Task<IWriteResult> Handle<TState, TEvent>(
154157
string streamName,
155158
Func<TState, TEvent[]> handle,
156159
CancellationToken ct = default
157-
) where TState : notnull where TEvent : notnull =>
160+
) where TState : notnull
161+
where TEvent : notnull =>
162+
stateStore.Handle(
163+
streamName,
164+
(state, _) => new ValueTask<Message[]>(handle(state).Select(m => Message.From(m)).ToArray()),
165+
null,
166+
ct
167+
);
168+
169+
public static Task<IWriteResult> Handle<TState>(
170+
this IStateStore<TState> stateStore,
171+
string streamName,
172+
Func<TState, object[]> handle,
173+
CancellationToken ct = default
174+
) where TState : notnull =>
158175
stateStore.Handle(
159176
streamName,
160177
(state, _) => new ValueTask<Message[]>(handle(state).Select(m => Message.From(m)).ToArray()),
@@ -168,7 +185,22 @@ public static Task<IWriteResult> Handle<TState, TEvent>(
168185
Func<TState, TEvent[]> handle,
169186
DecideOptions<TState>? decideOptions,
170187
CancellationToken ct = default
171-
) where TState : notnull where TEvent : notnull =>
188+
) where TState : notnull
189+
where TEvent : notnull =>
190+
stateStore.Handle(
191+
streamName,
192+
(state, _) => new ValueTask<Message[]>(handle(state).Select(m => Message.From(m)).ToArray()),
193+
decideOptions,
194+
ct
195+
);
196+
197+
public static Task<IWriteResult> Handle<TState>(
198+
this IStateStore<TState> stateStore,
199+
string streamName,
200+
Func<TState, object[]> handle,
201+
DecideOptions<TState>? decideOptions,
202+
CancellationToken ct = default
203+
) where TState : notnull =>
172204
stateStore.Handle(
173205
streamName,
174206
(state, _) => new ValueTask<Message[]>(handle(state).Select(m => Message.From(m)).ToArray()),
@@ -179,7 +211,7 @@ public static Task<IWriteResult> Handle<TState, TEvent>(
179211

180212
public class StateStore<TState>(KurrentClient client, StateStoreOptions<TState> options)
181213
: IStateStore<TState>
182-
where TState : notnull{
214+
where TState : notnull {
183215
public virtual Task<StateAtPointInTime<TState>> Get(string streamName, CancellationToken ct = default) =>
184216
client.GetStateAsync(streamName, options.StateBuilder, options.GetStreamStateOptions, ct);
185217

test/Kurrent.Client.Tests.Common/Kurrent.Client.Tests.Common.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<ItemGroup>
1111
<PackageReference Include="Ductus.FluentDocker" Version="2.10.59"/>
1212
<PackageReference Include="Humanizer.Core" Version="2.14.1"/>
13-
<PackageReference Include="Polly" Version="8.5.0"/>
13+
<PackageReference Include="Polly" Version="8.5.2"/>
1414
<PackageReference Include="Polly.Contrib.WaitAndRetry" Version="1.1.1"/>
1515
<PackageReference Include="Serilog" Version="4.2.0"/>
1616
<PackageReference Include="Serilog.AspNetCore" Version="9.0.0"/>

0 commit comments

Comments
 (0)