@@ -150,6 +150,29 @@ public class GetStreamStateOptions<TState> : ReadStreamOptions where TState : no
150150}
151151
152152public static class KurrentClientGettingStateClientExtensions {
153+ public static async Task < StateAtPointInTime < TState > > GetStateAsync < TState > (
154+ this KurrentClient eventStore ,
155+ string streamName ,
156+ IStateBuilder < TState > stateBuilder ,
157+ GetStreamStateOptions < TState > ? options ,
158+ CancellationToken ct = default
159+ ) where TState : notnull {
160+ StateAtPointInTime < TState > ? stateAtPointInTime = null ;
161+
162+ options ??= new GetStreamStateOptions < TState > ( ) ;
163+
164+ if ( options . GetSnapshot != null )
165+ stateAtPointInTime = await options . GetSnapshot (
166+ GetSnapshotOptions . ForStream ( streamName ) ,
167+ ct
168+ ) ;
169+
170+ options . StreamPosition = stateAtPointInTime ? . LastStreamPosition ?? StreamPosition . Start ;
171+
172+ return await eventStore . ReadStreamAsync ( streamName , options , ct )
173+ . GetStateAsync ( stateBuilder , ct ) ;
174+ }
175+
153176 public static async Task < StateAtPointInTime < TState > > GetStateAsync < TState > (
154177 this IAsyncEnumerable < ResolvedEvent > messages ,
155178 TState initialState ,
@@ -178,21 +201,33 @@ public static async IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TS
178201 this IAsyncEnumerable < ResolvedEvent > messages ,
179202 TState initialState ,
180203 Func < TState , ResolvedEvent , TState > evolve ,
204+ Func < ResolvedEvent , string > ? getProjectedId ,
181205 [ EnumeratorCancellation ] CancellationToken ct
182206 ) where TState : notnull {
183- var state = initialState ;
184-
185207 if ( messages is KurrentClient . ReadStreamResult readStreamResult ) {
186208 if ( await readStreamResult . ReadState . ConfigureAwait ( false ) == ReadState . StreamNotFound ) {
187- yield return new StateAtPointInTime < TState > ( state ) ;
209+ yield return new StateAtPointInTime < TState > ( initialState ) ;
188210
189211 yield break ;
190212 }
191213 }
192214
215+ var states = new Dictionary < string , TState > ( ) ;
216+
217+ getProjectedId ??= resolvedEvent => resolvedEvent . OriginalStreamId ;
218+
193219 await foreach ( var resolvedEvent in messages . WithCancellation ( ct ) ) {
220+ var projectedId = getProjectedId ( resolvedEvent ) ;
221+ #if NET48
222+ var state = states . TryGetValue ( projectedId , out TState ? value ) ? value : initialState ;
223+ #else
224+ var state = states . GetValueOrDefault ( projectedId , initialState ) ;
225+ #endif
226+
194227 state = evolve ( state , resolvedEvent ) ;
195228
229+ states [ projectedId ] = state ;
230+
196231 yield return new StateAtPointInTime < TState > (
197232 state ,
198233 resolvedEvent . Event . EventNumber ,
@@ -201,29 +236,13 @@ [EnumeratorCancellation] CancellationToken ct
201236 }
202237 }
203238
204- public static async Task < StateAtPointInTime < TState > > GetStateAsync < TState > (
205- this KurrentClient eventStore ,
206- string streamName ,
207- IStateBuilder < TState > stateBuilder ,
208- GetStreamStateOptions < TState > ? options ,
209- CancellationToken ct = default
210- ) where TState : notnull {
211- StateAtPointInTime < TState > ? stateAtPointInTime = null ;
212-
213- options ??= new GetStreamStateOptions < TState > ( ) ;
214-
215- if ( options . GetSnapshot != null ) {
216- stateAtPointInTime = await options . GetSnapshot (
217- GetSnapshotOptions . ForStream ( streamName ) ,
218- ct
219- ) ;
220- }
221-
222- options . StreamPosition = stateAtPointInTime ? . LastStreamPosition ?? StreamPosition . Start ;
223-
224- return await eventStore . ReadStreamAsync ( streamName , options , ct )
225- . GetStateAsync ( stateBuilder , ct ) ;
226- }
239+ public static IAsyncEnumerable < StateAtPointInTime < TState > > ProjectState < TState > (
240+ this IAsyncEnumerable < ResolvedEvent > messages ,
241+ TState initialState ,
242+ Func < TState , ResolvedEvent , TState > evolve ,
243+ CancellationToken ct
244+ ) where TState : notnull =>
245+ messages . ProjectState ( initialState , evolve , null , ct ) ;
227246
228247 public static Task < StateAtPointInTime < TState > > GetStateAsync < TState > (
229248 this KurrentClient eventStore ,
0 commit comments