@@ -48,8 +48,8 @@ use crate::backend::operation::TaskDirtyCause;
4848use crate :: {
4949 backend:: {
5050 operation:: {
51- AggregatedDataUpdate , AggregationUpdateJob , AggregationUpdateQueue ,
52- CleanupOldEdgesOperation , ConnectChildOperation , ExecuteContext , ExecuteContextImpl ,
51+ AggregationUpdateJob , AggregationUpdateQueue , CleanupOldEdgesOperation ,
52+ ComputeDirtyAndCleanUpdate , ConnectChildOperation , ExecuteContext , ExecuteContextImpl ,
5353 Operation , OutdatedEdge , TaskGuard , connect_children, get_aggregation_number,
5454 get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
5555 } ,
@@ -2345,72 +2345,96 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
23452345 } ,
23462346 ) ) ;
23472347
2348- // Update the dirty state
2349- let old_dirtyness = task. dirtyness_and_session ( ) ;
2350-
2351- let new_dirtyness = if session_dependent {
2352- Some ( ( Dirtyness :: SessionDependent , Some ( self . session_id ) ) )
2353- } else {
2354- None
2348+ // Grab the old dirty state
2349+ let old_dirtyness = get ! ( task, Dirty ) . cloned ( ) ;
2350+ let ( was_dirty, was_current_session_clean, old_clean_in_session) = match old_dirtyness {
2351+ None => ( false , false , None ) ,
2352+ Some ( Dirtyness :: Dirty ) => ( true , false , None ) ,
2353+ Some ( Dirtyness :: SessionDependent ) => {
2354+ let clean_in_session = get ! ( task, CleanInSession ) . copied ( ) ;
2355+ (
2356+ true ,
2357+ clean_in_session == Some ( self . session_id ) ,
2358+ clean_in_session,
2359+ )
2360+ }
23552361 } ;
2362+ let old_dirty_value = if was_dirty { 1 } else { 0 } ;
2363+ let old_current_session_clean_value = if was_current_session_clean { 1 } else { 0 } ;
23562364
2357- let dirty_changed = old_dirtyness != new_dirtyness;
2358- let data_update = if dirty_changed {
2359- if let Some ( ( value, _) ) = new_dirtyness {
2365+ // Compute the new dirty state
2366+ let ( new_dirtyness, new_clean_in_session, new_dirty_value, new_current_session_clean_value) =
2367+ if session_dependent {
2368+ (
2369+ Some ( Dirtyness :: SessionDependent ) ,
2370+ Some ( self . session_id ) ,
2371+ 1 ,
2372+ 1 ,
2373+ )
2374+ } else {
2375+ ( None , None , 0 , 0 )
2376+ } ;
2377+
2378+ // Update the dirty state
2379+ if old_dirtyness != new_dirtyness {
2380+ if let Some ( value) = new_dirtyness {
23602381 task. insert ( CachedDataItem :: Dirty { value } ) ;
23612382 } else if old_dirtyness. is_some ( ) {
23622383 task. remove ( & CachedDataItemKey :: Dirty { } ) ;
23632384 }
2364- if let Some ( session_id) = new_dirtyness. and_then ( |t| t. 1 ) {
2385+ }
2386+ if old_clean_in_session != new_clean_in_session {
2387+ if let Some ( session_id) = new_clean_in_session {
23652388 task. insert ( CachedDataItem :: CleanInSession { value : session_id } ) ;
2366- } else if old_dirtyness . is_some_and ( |t| t . 1 . is_some ( ) ) {
2389+ } else if old_clean_in_session . is_some ( ) {
23672390 task. remove ( & CachedDataItemKey :: CleanInSession { } ) ;
23682391 }
2392+ }
23692393
2370- let mut dirty_containers = get ! ( task, AggregatedDirtyContainerCount )
2394+ // Propagate dirtyness changes
2395+ let data_update = if old_dirty_value != new_dirty_value
2396+ || old_current_session_clean_value != new_current_session_clean_value
2397+ {
2398+ let dirty_container_count = get ! ( task, AggregatedDirtyContainerCount )
23712399 . cloned ( )
23722400 . unwrap_or_default ( ) ;
2373- if let Some ( ( old_dirtyness, old_clean_in_session) ) = old_dirtyness {
2374- dirty_containers
2375- . update_with_dirtyness_and_session ( old_dirtyness, old_clean_in_session) ;
2376- }
2377- let aggregated_update = match ( old_dirtyness, new_dirtyness) {
2378- ( None , None ) => unreachable ! ( ) ,
2379- ( Some ( old) , None ) => {
2380- dirty_containers. undo_update_with_dirtyness_and_session ( old. 0 , old. 1 )
2381- }
2382- ( None , Some ( new) ) => {
2383- dirty_containers. update_with_dirtyness_and_session ( new. 0 , new. 1 )
2384- }
2385- ( Some ( old) , Some ( new) ) => {
2386- dirty_containers. replace_dirtyness_and_session ( old. 0 , old. 1 , new. 0 , new. 1 )
2387- }
2388- } ;
2389- if !aggregated_update. is_zero ( ) {
2390- if aggregated_update. get ( self . session_id ) < 0
2391- && let Some ( activeness_state) = get_mut ! ( task, Activeness )
2392- {
2393- activeness_state. all_clean_event . notify ( usize:: MAX ) ;
2394- activeness_state. unset_active_until_clean ( ) ;
2395- if activeness_state. is_empty ( ) {
2396- task. remove ( & CachedDataItemKey :: Activeness { } ) ;
2397- }
2401+ let current_session_clean_container_count = get ! (
2402+ task,
2403+ AggregatedSessionDependentCleanContainerCount {
2404+ session_id: self . session_id
23982405 }
2399- AggregationUpdateJob :: data_update (
2400- & mut task,
2401- AggregatedDataUpdate :: new ( ) . dirty_container_update (
2402- task_id,
2403- aggregated_update. count ,
2404- aggregated_update. current_session_clean ( ctx. session_id ( ) ) ,
2405- ) ,
2406- )
2407- } else {
2408- None
2406+ )
2407+ . copied ( )
2408+ . unwrap_or_default ( ) ;
2409+ let result = ComputeDirtyAndCleanUpdate {
2410+ old_dirty_container_count : dirty_container_count,
2411+ new_dirty_container_count : dirty_container_count,
2412+ old_current_session_clean_container_count : current_session_clean_container_count,
2413+ new_current_session_clean_container_count : current_session_clean_container_count,
2414+ old_dirty_value,
2415+ new_dirty_value,
2416+ old_current_session_clean_value,
2417+ new_current_session_clean_value,
24092418 }
2419+ . compute ( ) ;
2420+ result
2421+ . aggregated_update ( task_id)
2422+ . and_then ( |aggregated_update| {
2423+ AggregationUpdateJob :: data_update ( & mut task, aggregated_update)
2424+ } )
24102425 } else {
24112426 None
24122427 } ;
24132428
2429+ if let Some ( activeness_state) = get_mut ! ( task, Activeness ) {
2430+ // The task is clean now
2431+ activeness_state. all_clean_event . notify ( usize:: MAX ) ;
2432+ activeness_state. unset_active_until_clean ( ) ;
2433+ if activeness_state. is_empty ( ) {
2434+ task. remove ( & CachedDataItemKey :: Activeness { } ) ;
2435+ }
2436+ }
2437+
24142438 #[ cfg( feature = "verify_determinism" ) ]
24152439 let reschedule = ( dirty_changed || no_output_set) && !task_id. is_transient ( ) ;
24162440 #[ cfg( not( feature = "verify_determinism" ) ) ]
0 commit comments