@@ -48,8 +48,8 @@ use crate::backend::operation::TaskDirtyCause;
4848use crate :: {
4949 backend:: {
5050 operation:: {
51- AggregatedDataUpdate , AggregationUpdateJob , AggregationUpdateQueue ,
52- CleanupOldEdgesOperation , ConnectChildOperation , ExecuteContext , ExecuteContextImpl ,
51+ AggregationUpdateJob , AggregationUpdateQueue , CleanupOldEdgesOperation ,
52+ ComputeDirtyAndCleanUpdate , ConnectChildOperation , ExecuteContext , ExecuteContextImpl ,
5353 Operation , OutdatedEdge , TaskGuard , connect_children, get_aggregation_number,
5454 get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
5555 } ,
@@ -570,11 +570,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
570570 let is_dirty = task. is_dirty ( self . session_id ) ;
571571
572572 // Check the dirty count of the root node
573- let dirty_tasks = get ! ( task, AggregatedDirtyContainerCount )
574- . cloned ( )
575- . unwrap_or_default ( )
576- . get ( self . session_id ) ;
577- if dirty_tasks > 0 || is_dirty {
573+ let has_dirty_containers = task. has_dirty_containers ( self . session_id ) ;
574+ if has_dirty_containers || is_dirty {
578575 let activeness = get_mut ! ( task, Activeness ) ;
579576 let mut task_ids_to_schedule: Vec < _ > = Vec :: new ( ) ;
580577 // When there are dirty task, subscribe to the all_clean_event
@@ -615,7 +612,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
615612 parent_and_count : Option < ( TaskId , i32 ) > ,
616613 visited : & mut FxHashSet < TaskId > ,
617614 ) -> String {
618- let task = ctx. task ( task_id, TaskDataCategory :: Data ) ;
615+ let task = ctx. task ( task_id, TaskDataCategory :: All ) ;
619616 let is_dirty = task. is_dirty ( ctx. session_id ( ) ) ;
620617 let in_progress =
621618 get ! ( task, InProgress ) . map_or ( "not in progress" , |p| match p {
@@ -637,21 +634,24 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
637634 } ;
638635
639636 // Check the dirty count of the root node
640- let dirty_tasks = get ! ( task, AggregatedDirtyContainerCount )
641- . cloned ( )
642- . unwrap_or_default ( )
643- . get ( ctx. session_id ( ) ) ;
637+ let has_dirty_containers = task. has_dirty_containers ( ctx. session_id ( ) ) ;
644638
645639 let task_description = ctx. get_task_description ( task_id) ;
646- let is_dirty = if is_dirty { ", dirty" } else { "" } ;
640+ let is_dirty_label = if is_dirty { ", dirty" } else { "" } ;
641+ let has_dirty_containers_label = if has_dirty_containers {
642+ ", dirty containers"
643+ } else {
644+ ""
645+ } ;
647646 let count = if let Some ( ( _, count) ) = parent_and_count {
648647 format ! ( " {count}" )
649648 } else {
650649 String :: new ( )
651650 } ;
652651 let mut info = format ! (
653652 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
654- {in_progress}, {activeness}{is_dirty})",
653+ {in_progress}, \
654+ {activeness}{is_dirty_label}{has_dirty_containers_label})",
655655 ) ;
656656 let children: Vec < _ > =
657657 task. dirty_containers_with_count ( ctx. session_id ( ) ) . collect ( ) ;
@@ -661,8 +661,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
661661 info. push_str ( "\n ERROR: missing upper connection" ) ;
662662 }
663663
664- if dirty_tasks > 0 || !children. is_empty ( ) {
665- writeln ! ( info, "\n {dirty_tasks} dirty tasks:" ) . unwrap ( ) ;
664+ if has_dirty_containers || !children. is_empty ( ) {
665+ writeln ! ( info, "\n dirty tasks:" ) . unwrap ( ) ;
666666
667667 for ( child_task_id, count) in children {
668668 let task_description = ctx. get_task_description ( child_task_id) ;
@@ -2363,68 +2363,92 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
23632363 } ,
23642364 ) ) ;
23652365
2366- // Update the dirty state
2367- let old_dirtyness = task. dirtyness_and_session ( ) ;
2366+ // Grab the old dirty state
2367+ let old_dirtyness = get ! ( task, Dirty ) . cloned ( ) ;
2368+ let ( old_self_dirty, old_current_session_self_clean, old_clean_in_session) =
2369+ match old_dirtyness {
2370+ None => ( false , false , None ) ,
2371+ Some ( Dirtyness :: Dirty ) => ( true , false , None ) ,
2372+ Some ( Dirtyness :: SessionDependent ) => {
2373+ let clean_in_session = get ! ( task, CleanInSession ) . copied ( ) ;
2374+ (
2375+ true ,
2376+ clean_in_session == Some ( self . session_id ) ,
2377+ clean_in_session,
2378+ )
2379+ }
2380+ } ;
23682381
2369- let new_dirtyness = if session_dependent {
2370- Some ( ( Dirtyness :: SessionDependent , Some ( self . session_id ) ) )
2371- } else {
2372- None
2373- } ;
2382+ // Compute the new dirty state
2383+ let ( new_dirtyness, new_clean_in_session, new_self_dirty, new_current_session_self_clean) =
2384+ if session_dependent {
2385+ (
2386+ Some ( Dirtyness :: SessionDependent ) ,
2387+ Some ( self . session_id ) ,
2388+ true ,
2389+ true ,
2390+ )
2391+ } else {
2392+ ( None , None , false , false )
2393+ } ;
23742394
2375- let dirty_changed = old_dirtyness != new_dirtyness ;
2376- let data_update = if dirty_changed {
2377- if let Some ( ( value, _ ) ) = new_dirtyness {
2395+ // Update the dirty state
2396+ if old_dirtyness != new_dirtyness {
2397+ if let Some ( value) = new_dirtyness {
23782398 task. insert ( CachedDataItem :: Dirty { value } ) ;
23792399 } else if old_dirtyness. is_some ( ) {
23802400 task. remove ( & CachedDataItemKey :: Dirty { } ) ;
23812401 }
2382- if let Some ( session_id) = new_dirtyness. and_then ( |t| t. 1 ) {
2402+ }
2403+ if old_clean_in_session != new_clean_in_session {
2404+ if let Some ( session_id) = new_clean_in_session {
23832405 task. insert ( CachedDataItem :: CleanInSession { value : session_id } ) ;
2384- } else if old_dirtyness . is_some_and ( |t| t . 1 . is_some ( ) ) {
2406+ } else if old_clean_in_session . is_some ( ) {
23852407 task. remove ( & CachedDataItemKey :: CleanInSession { } ) ;
23862408 }
2409+ }
23872410
2388- let mut dirty_containers = get ! ( task, AggregatedDirtyContainerCount )
2411+ // Propagate dirtyness changes
2412+ let data_update = if old_self_dirty != new_self_dirty
2413+ || old_current_session_self_clean != new_current_session_self_clean
2414+ {
2415+ let dirty_container_count = get ! ( task, AggregatedDirtyContainerCount )
23892416 . cloned ( )
23902417 . unwrap_or_default ( ) ;
2391- if let Some ( ( old_dirtyness, old_clean_in_session) ) = old_dirtyness {
2392- dirty_containers
2393- . update_with_dirtyness_and_session ( old_dirtyness, old_clean_in_session) ;
2394- }
2395- let aggregated_update = match ( old_dirtyness, new_dirtyness) {
2396- ( None , None ) => unreachable ! ( ) ,
2397- ( Some ( old) , None ) => {
2398- dirty_containers. undo_update_with_dirtyness_and_session ( old. 0 , old. 1 )
2399- }
2400- ( None , Some ( new) ) => {
2401- dirty_containers. update_with_dirtyness_and_session ( new. 0 , new. 1 )
2402- }
2403- ( Some ( old) , Some ( new) ) => {
2404- dirty_containers. replace_dirtyness_and_session ( old. 0 , old. 1 , new. 0 , new. 1 )
2418+ let current_session_clean_container_count = get ! (
2419+ task,
2420+ AggregatedSessionDependentCleanContainerCount {
2421+ session_id: self . session_id
24052422 }
2406- } ;
2407- if !aggregated_update. is_zero ( ) {
2408- if aggregated_update. get ( self . session_id ) < 0
2409- && let Some ( activeness_state) = get_mut ! ( task, Activeness )
2410- {
2423+ )
2424+ . copied ( )
2425+ . unwrap_or_default ( ) ;
2426+ let result = ComputeDirtyAndCleanUpdate {
2427+ old_dirty_container_count : dirty_container_count,
2428+ new_dirty_container_count : dirty_container_count,
2429+ old_current_session_clean_container_count : current_session_clean_container_count,
2430+ new_current_session_clean_container_count : current_session_clean_container_count,
2431+ old_self_dirty,
2432+ new_self_dirty,
2433+ old_current_session_self_clean,
2434+ new_current_session_self_clean,
2435+ }
2436+ . compute ( ) ;
2437+ if result. dirty_count_update - result. current_session_clean_update < 0 {
2438+ // The task is clean now
2439+ if let Some ( activeness_state) = get_mut ! ( task, Activeness ) {
24112440 activeness_state. all_clean_event . notify ( usize:: MAX ) ;
24122441 activeness_state. unset_active_until_clean ( ) ;
24132442 if activeness_state. is_empty ( ) {
24142443 task. remove ( & CachedDataItemKey :: Activeness { } ) ;
24152444 }
24162445 }
2417- AggregationUpdateJob :: data_update (
2418- & mut task,
2419- AggregatedDataUpdate :: new ( ) . dirty_container_update (
2420- task_id,
2421- aggregated_update. count ,
2422- aggregated_update. current_session_clean ( ctx. session_id ( ) ) ,
2423- ) ,
2424- )
2425- } else {
2426- None
24272446 }
2447+ result
2448+ . aggregated_update ( task_id)
2449+ . and_then ( |aggregated_update| {
2450+ AggregationUpdateJob :: data_update ( & mut task, aggregated_update)
2451+ } )
24282452 } else {
24292453 None
24302454 } ;
@@ -2891,10 +2915,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
28912915 let mut ctx = self . execute_context ( turbo_tasks) ;
28922916 let mut task = ctx. task ( task_id, TaskDataCategory :: All ) ;
28932917 let is_dirty = task. is_dirty ( self . session_id ) ;
2894- let has_dirty_containers = get ! ( task, AggregatedDirtyContainerCount )
2895- . map_or ( false , |dirty_containers| {
2896- dirty_containers. get ( self . session_id ) > 0
2897- } ) ;
2918+ let has_dirty_containers = task. has_dirty_containers ( self . session_id ) ;
28982919 if is_dirty || has_dirty_containers {
28992920 if let Some ( activeness_state) = get_mut ! ( task, Activeness ) {
29002921 // We will finish the task, but it would be removed after the task is done
@@ -2984,8 +3005,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
29843005 }
29853006
29863007 let is_dirty = get ! ( task, Dirty ) . is_some ( ) ;
2987- let has_dirty_container =
2988- get ! ( task, AggregatedDirtyContainerCount ) . is_some_and ( |count| count. count > 0 ) ;
3008+ let has_dirty_container = task. has_dirty_containers ( self . session_id ) ;
29893009 let should_be_in_upper = is_dirty || has_dirty_container;
29903010
29913011 let aggregation_number = get_aggregation_number ( & task) ;
@@ -3008,17 +3028,19 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
30083028
30093029 if should_be_in_upper {
30103030 for upper_id in uppers {
3011- let task = ctx. task ( task_id , TaskDataCategory :: All ) ;
3031+ let task = ctx. task ( upper_id , TaskDataCategory :: All ) ;
30123032 let in_upper = get ! ( task, AggregatedDirtyContainer { task: task_id } )
30133033 . is_some_and ( |& dirty| dirty > 0 ) ;
30143034 if !in_upper {
3035+ let containers: Vec < _ > = get_many ! ( task, AggregatedDirtyContainer { task: task_id } value => ( task_id, * value) ) ;
30153036 panic ! (
30163037 "Task {} ({}) is dirty, but is not listed in the upper task {} \
3017- ({})",
3038+ ({})\n These dirty containers are present: \n {:#?} ",
30183039 task_id,
30193040 ctx. get_task_description( task_id) ,
30203041 upper_id,
3021- ctx. get_task_description( upper_id)
3042+ ctx. get_task_description( upper_id) ,
3043+ containers,
30223044 ) ;
30233045 }
30243046 }
0 commit comments