@@ -220,7 +220,7 @@ impl AggregationUpdateJob {
220220#[ derive( Default , Serialize , Deserialize , Clone , Debug ) ]
221221pub struct AggregatedDataUpdate {
222222 /// One of the inner tasks has changed its dirty state or aggregated dirty state.
223- dirty_container_update : Option < ( TaskId , DirtyContainerCount ) > ,
223+ dirty_container_update : Option < ( TaskId , i32 , Option < ( SessionId , i32 ) > ) > ,
224224 /// One of the inner tasks has changed its collectibles count or aggregated collectibles count.
225225 collectibles_update : Vec < ( CollectibleRef , i32 ) > ,
226226}
@@ -261,10 +261,8 @@ impl AggregatedDataUpdate {
261261 } = dirty_container_count;
262262 result = result. dirty_container_update (
263263 task. id ( ) ,
264- DirtyContainerCount {
265- count : if count > 0 { 1 } else { 0 } ,
266- count_in_session : count_in_session. map ( |( s, c) | ( s, if c > 0 { 1 } else { 0 } ) ) ,
267- } ,
264+ if count > 0 { 1 } else { 0 } ,
265+ count_in_session. map ( |( s, c) | ( s, if count > 0 && c == 0 { 1 } else { 0 } ) ) ,
268266 ) ;
269267 }
270268 result
@@ -276,8 +274,11 @@ impl AggregatedDataUpdate {
276274 dirty_container_update,
277275 collectibles_update,
278276 } = & mut self ;
279- if let Some ( ( _, value) ) = dirty_container_update. as_mut ( ) {
280- * value = value. negate ( )
277+ if let Some ( ( _, value, session_dependent_clean_update) ) = dirty_container_update. as_mut ( ) {
278+ * value = -* value;
279+ if let Some ( ( _, value) ) = session_dependent_clean_update. as_mut ( ) {
280+ * value = -* value;
281+ }
281282 }
282283 for ( _, value) in collectibles_update. iter_mut ( ) {
283284 * value = -* value;
@@ -299,11 +300,14 @@ impl AggregatedDataUpdate {
299300 collectibles_update,
300301 } = self ;
301302 let mut result = Self :: default ( ) ;
302- if let Some ( ( dirty_container_id, count) ) = dirty_container_update {
303+ if let Some ( ( dirty_container_id, count, session_dependent_clean_update) ) =
304+ dirty_container_update
305+ {
303306 if should_track_activeness {
304307 // When a dirty container count is increased and the task is considered as active
305308 // we need to schedule the dirty tasks in the new dirty container
306- let current_session_update = count. get ( session_id) ;
309+ let clean = get_clean ( * session_dependent_clean_update, session_id) ;
310+ let current_session_update = count - clean;
307311 if current_session_update > 0 && task. has_key ( & CachedDataItemKey :: Activeness { } ) {
308312 queue. push_find_and_schedule_dirty ( * dirty_container_id)
309313 }
@@ -317,7 +321,11 @@ impl AggregatedDataUpdate {
317321 } ,
318322 |old: Option <DirtyContainerCount >| {
319323 let mut new = old. unwrap_or_default( ) ;
320- aggregated_update = new. update_count( count) ;
324+ aggregated_update =
325+ new. update_count( & DirtyContainerCount :: from_session_dependent_clean(
326+ * count,
327+ * session_dependent_clean_update,
328+ ) ) ;
321329 ( !new. is_zero( ) ) . then_some( new)
322330 }
323331 ) ;
@@ -337,12 +345,17 @@ impl AggregatedDataUpdate {
337345 new. undo_update_with_dirtyness_and_session( dirtyness, clean_in_session) ;
338346 }
339347 if !aggregated_update. is_zero( ) {
340- result. dirty_container_update = Some ( ( task_id, aggregated_update) ) ;
348+ result. dirty_container_update = Some ( (
349+ task_id,
350+ aggregated_update. count,
351+ aggregated_update. session_dependent_clean( ) ,
352+ ) ) ;
341353 }
342354 ( !new. is_zero( ) ) . then_some( new)
343355 } ) ;
344- if let Some ( ( _, count) ) = result. dirty_container_update . as_ref ( )
345- && count. get ( session_id) < 0
356+ if let Some ( ( _, count, session_dependent_clean) ) =
357+ result. dirty_container_update . as_ref ( )
358+ && count - get_clean ( * session_dependent_clean, session_id) < 0
346359 {
347360 // When the current task is no longer dirty, we need to fire the
348361 // aggregate root events and do some cleanup
@@ -421,8 +434,13 @@ impl AggregatedDataUpdate {
421434 }
422435
423436 /// Adds a dirty container update to the update.
424- pub fn dirty_container_update ( mut self , task_id : TaskId , count : DirtyContainerCount ) -> Self {
425- self . dirty_container_update = Some ( ( task_id, count) ) ;
437+ pub fn dirty_container_update (
438+ mut self ,
439+ task_id : TaskId ,
440+ count : i32 ,
441+ session_dependent_clean_update : Option < ( SessionId , i32 ) > ,
442+ ) -> Self {
443+ self . dirty_container_update = Some ( ( task_id, count, session_dependent_clean_update) ) ;
426444 self
427445 }
428446
@@ -433,6 +451,12 @@ impl AggregatedDataUpdate {
433451 }
434452}
435453
454+ fn get_clean (
455+ session_dependent_clean_update : Option < ( SessionId , i32 ) > ,
456+ session_id : SessionId ,
457+ ) -> i32 {
458+ session_dependent_clean_update. map_or ( 0 , |( s, c) | if s == session_id { c } else { 0 } )
459+ }
436460/// An aggregation number update job that is enqueued.
437461#[ derive( Serialize , Deserialize , Clone ) ]
438462struct AggregationNumberUpdate {
0 commit comments