@@ -4,7 +4,7 @@ use std::{
44 hash:: Hash ,
55 mem:: take,
66 num:: NonZeroU32 ,
7- ops:: ControlFlow ,
7+ ops:: { ControlFlow , Deref } ,
88 thread:: yield_now,
99 time:: { Duration , Instant } ,
1010} ;
@@ -216,19 +216,41 @@ impl AggregationUpdateJob {
216216 }
217217}
218218
219+ #[ derive( Default , Serialize , Deserialize , Clone , Copy , Debug ) ]
220+ pub struct SessionDependent < T > {
221+ #[ serde( skip, default ) ]
222+ pub value : T ,
223+ }
224+
225+ impl < T > SessionDependent < T > {
226+ pub fn new ( value : T ) -> Self {
227+ Self { value }
228+ }
229+ }
230+
231+ impl < T > Deref for SessionDependent < T > {
232+ type Target = T ;
233+
234+ fn deref ( & self ) -> & Self :: Target {
235+ & self . value
236+ }
237+ }
238+
219239/// Aggregated data update.
220240#[ derive( Default , Serialize , Deserialize , Clone , Debug ) ]
221241pub struct AggregatedDataUpdate {
222242 /// One of the inner tasks has changed its dirty state or aggregated dirty state.
223- dirty_container_update : Option < ( TaskId , i32 , Option < ( SessionId , i32 ) > ) > ,
243+ /// (task id, dirty update, current session clean update)
244+ // TODO Serialize the current session clean update as 0
245+ dirty_container_update : Option < ( TaskId , i32 , SessionDependent < i32 > ) > ,
224246 /// One of the inner tasks has changed its collectibles count or aggregated collectibles count.
225247 collectibles_update : Vec < ( CollectibleRef , i32 ) > ,
226248}
227249
228250impl AggregatedDataUpdate {
229251 /// Derives an `AggregatedDataUpdate` from a task. This is used when a task is connected to an
230252 /// upper task.
231- fn from_task ( task : & mut impl TaskGuard ) -> Self {
253+ fn from_task ( task : & mut impl TaskGuard , current_session_id : SessionId ) -> Self {
232254 let aggregation = get_aggregation_number ( task) ;
233255 let mut dirty_container_count = Default :: default ( ) ;
234256 let mut collectibles_update: Vec < _ > =
@@ -255,14 +277,21 @@ impl AggregatedDataUpdate {
255277
256278 let mut result = Self :: new ( ) . collectibles_update ( collectibles_update) ;
257279 if !dirty_container_count. is_zero ( ) {
258- let DirtyContainerCount {
259- count,
260- count_in_session,
261- } = dirty_container_count;
262280 result = result. dirty_container_update (
263281 task. id ( ) ,
264- if count > 0 { 1 } else { 0 } ,
265- count_in_session. map ( |( s, c) | ( s, if count > 0 && c == 0 { 1 } else { 0 } ) ) ,
282+ if dirty_container_count. count > 0 {
283+ 1
284+ } else {
285+ 0
286+ } ,
287+ if dirty_container_count. count > 0
288+ && dirty_container_count. current_session_clean ( current_session_id)
289+ >= dirty_container_count. count
290+ {
291+ 1
292+ } else {
293+ 0
294+ } ,
266295 ) ;
267296 }
268297 result
@@ -274,11 +303,9 @@ impl AggregatedDataUpdate {
274303 dirty_container_update,
275304 collectibles_update,
276305 } = & mut self ;
277- if let Some ( ( _, value, session_dependent_clean_update ) ) = dirty_container_update. as_mut ( ) {
306+ if let Some ( ( _, value, current_session_clean_update ) ) = dirty_container_update. as_mut ( ) {
278307 * value = -* value;
279- if let Some ( ( _, value) ) = session_dependent_clean_update. as_mut ( ) {
280- * value = -* value;
281- }
308+ current_session_clean_update. value = -current_session_clean_update. value ;
282309 }
283310 for ( _, value) in collectibles_update. iter_mut ( ) {
284311 * value = -* value;
@@ -291,7 +318,7 @@ impl AggregatedDataUpdate {
291318 fn apply (
292319 & self ,
293320 task : & mut impl TaskGuard ,
294- session_id : SessionId ,
321+ current_session_id : SessionId ,
295322 should_track_activeness : bool ,
296323 queue : & mut AggregationUpdateQueue ,
297324 ) -> AggregatedDataUpdate {
@@ -300,30 +327,30 @@ impl AggregatedDataUpdate {
300327 collectibles_update,
301328 } = self ;
302329 let mut result = Self :: default ( ) ;
303- if let Some ( ( dirty_container_id, count, session_dependent_clean_update) ) =
330+ if let & Some ( ( dirty_container_id, count, session_dependent_clean_update) ) =
304331 dirty_container_update
305332 {
306333 if should_track_activeness {
307334 // When a dirty container count is increased and the task is considered as active
308335 // we need to schedule the dirty tasks in the new dirty container
309- let clean = get_clean ( * session_dependent_clean_update, session_id) ;
310- let current_session_update = count - clean;
336+ let current_session_update = count - * session_dependent_clean_update;
311337 if current_session_update > 0 && task. has_key ( & CachedDataItemKey :: Activeness { } ) {
312- queue. push_find_and_schedule_dirty ( * dirty_container_id)
338+ queue. push_find_and_schedule_dirty ( dirty_container_id)
313339 }
314340 }
315341
316342 let mut aggregated_update = DirtyContainerCount :: default ( ) ;
317343 update ! (
318344 task,
319345 AggregatedDirtyContainer {
320- task: * dirty_container_id
346+ task: dirty_container_id
321347 } ,
322348 |old: Option <DirtyContainerCount >| {
323349 let mut new = old. unwrap_or_default( ) ;
324350 aggregated_update =
325- new. update_count( & DirtyContainerCount :: from_session_dependent_clean(
326- * count,
351+ new. update_count( & DirtyContainerCount :: from_current_session_clean(
352+ count,
353+ current_session_id,
327354 * session_dependent_clean_update,
328355 ) ) ;
329356 ( !new. is_zero( ) ) . then_some( new)
@@ -348,14 +375,15 @@ impl AggregatedDataUpdate {
348375 result. dirty_container_update = Some ( (
349376 task_id,
350377 aggregated_update. count,
351- aggregated_update. session_dependent_clean( ) ,
378+ SessionDependent :: new(
379+ aggregated_update. current_session_clean( current_session_id) ,
380+ ) ,
352381 ) ) ;
353382 }
354383 ( !new. is_zero( ) ) . then_some( new)
355384 } ) ;
356- if let Some ( ( _, count, session_dependent_clean) ) =
357- result. dirty_container_update . as_ref ( )
358- && count - get_clean ( * session_dependent_clean, session_id) < 0
385+ if let Some ( ( _, count, current_session_clean) ) = result. dirty_container_update
386+ && count - * current_session_clean < 0
359387 {
360388 // When the current task is no longer dirty, we need to fire the
361389 // aggregate root events and do some cleanup
@@ -438,9 +466,13 @@ impl AggregatedDataUpdate {
438466 mut self ,
439467 task_id : TaskId ,
440468 count : i32 ,
441- session_dependent_clean_update : Option < ( SessionId , i32 ) > ,
469+ current_session_clean_update : i32 ,
442470 ) -> Self {
443- self . dirty_container_update = Some ( ( task_id, count, session_dependent_clean_update) ) ;
471+ self . dirty_container_update = Some ( (
472+ task_id,
473+ count,
474+ SessionDependent :: new ( current_session_clean_update) ,
475+ ) ) ;
444476 self
445477 }
446478
@@ -451,12 +483,6 @@ impl AggregatedDataUpdate {
451483 }
452484}
453485
454- fn get_clean (
455- session_dependent_clean_update : Option < ( SessionId , i32 ) > ,
456- session_id : SessionId ,
457- ) -> i32 {
458- session_dependent_clean_update. map_or ( 0 , |( s, c) | if s == session_id { c } else { 0 } )
459- }
460486/// An aggregation number update job that is enqueued.
461487#[ derive( Serialize , Deserialize , Clone ) ]
462488struct AggregationNumberUpdate {
@@ -1065,7 +1091,7 @@ impl AggregationUpdateQueue {
10651091 }
10661092 // When this is a new inner node, update aggregated data and
10671093 // followers
1068- let data = AggregatedDataUpdate :: from_task ( & mut task) ;
1094+ let data = AggregatedDataUpdate :: from_task ( & mut task, ctx . session_id ( ) ) ;
10691095 let followers = get_followers ( & task) ;
10701096 let diff = data. apply (
10711097 & mut upper,
@@ -1162,7 +1188,8 @@ impl AggregationUpdateQueue {
11621188
11631189 // Since this is no longer an inner node, update the aggregated data and
11641190 // followers
1165- let data = AggregatedDataUpdate :: from_task ( & mut task) . invert ( ) ;
1191+ let data =
1192+ AggregatedDataUpdate :: from_task ( & mut task, ctx. session_id ( ) ) . invert ( ) ;
11661193 let followers = get_followers ( & task) ;
11671194 let diff = data. apply (
11681195 & mut upper,
@@ -1335,7 +1362,8 @@ impl AggregationUpdateQueue {
13351362 follower_in_upper
13361363 } ) ;
13371364 if !removed_uppers. is_empty ( ) {
1338- let data = AggregatedDataUpdate :: from_task ( & mut follower) . invert ( ) ;
1365+ let data =
1366+ AggregatedDataUpdate :: from_task ( & mut follower, ctx. session_id ( ) ) . invert ( ) ;
13391367 let followers = get_followers ( & follower) ;
13401368 drop ( follower) ;
13411369
@@ -1497,7 +1525,8 @@ impl AggregationUpdateQueue {
14971525 Some ( old - 1 )
14981526 } ) ;
14991527 if remove_upper {
1500- let data = AggregatedDataUpdate :: from_task ( & mut follower) . invert ( ) ;
1528+ let data =
1529+ AggregatedDataUpdate :: from_task ( & mut follower, ctx. session_id ( ) ) . invert ( ) ;
15011530 let followers = get_followers ( & follower) ;
15021531 drop ( follower) ;
15031532
@@ -1735,7 +1764,7 @@ impl AggregationUpdateQueue {
17351764 self . push_optimize_task ( new_follower_id) ;
17361765 }
17371766
1738- let data = AggregatedDataUpdate :: from_task ( & mut follower) ;
1767+ let data = AggregatedDataUpdate :: from_task ( & mut follower, ctx . session_id ( ) ) ;
17391768 let children = get_followers ( & follower) ;
17401769 drop ( follower) ;
17411770
@@ -1906,7 +1935,7 @@ impl AggregationUpdateQueue {
19061935 }
19071936
19081937 // It's a new upper
1909- let data = AggregatedDataUpdate :: from_task ( & mut inner) ;
1938+ let data = AggregatedDataUpdate :: from_task ( & mut inner, ctx . session_id ( ) ) ;
19101939 let children = get_followers ( & inner) ;
19111940 let follower_aggregation_number = get_aggregation_number ( & inner) ;
19121941 drop ( inner) ;
@@ -2118,7 +2147,7 @@ impl AggregationUpdateQueue {
21182147 self . push_optimize_task ( new_follower_id) ;
21192148 }
21202149 // It's a new upper
2121- let data = AggregatedDataUpdate :: from_task ( & mut inner) ;
2150+ let data = AggregatedDataUpdate :: from_task ( & mut inner, ctx . session_id ( ) ) ;
21222151 let followers = get_followers ( & inner) ;
21232152 drop ( inner) ;
21242153
0 commit comments