diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index e21e3f59fa66f..90be149ec7a7d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -48,8 +48,8 @@ use crate::backend::operation::TaskDirtyCause; use crate::{ backend::{ operation::{ - AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue, - CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl, + AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation, + ComputeDirtyAndCleanUpdate, ConnectChildOperation, ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number, get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children, }, @@ -570,11 +570,8 @@ impl TurboTasksBackendInner { let is_dirty = task.is_dirty(self.session_id); // Check the dirty count of the root node - let dirty_tasks = get!(task, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default() - .get(self.session_id); - if dirty_tasks > 0 || is_dirty { + let has_dirty_containers = task.has_dirty_containers(self.session_id); + if has_dirty_containers || is_dirty { let activeness = get_mut!(task, Activeness); let mut task_ids_to_schedule: Vec<_> = Vec::new(); // When there are dirty task, subscribe to the all_clean_event @@ -615,7 +612,7 @@ impl TurboTasksBackendInner { parent_and_count: Option<(TaskId, i32)>, visited: &mut FxHashSet, ) -> String { - let task = ctx.task(task_id, TaskDataCategory::Data); + let task = ctx.task(task_id, TaskDataCategory::All); let is_dirty = task.is_dirty(ctx.session_id()); let in_progress = get!(task, InProgress).map_or("not in progress", |p| match p { @@ -637,13 +634,15 @@ impl TurboTasksBackendInner { }; // Check the dirty count of the root node - let dirty_tasks = get!(task, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default() - .get(ctx.session_id()); + let has_dirty_containers = task.has_dirty_containers(ctx.session_id()); let task_description = ctx.get_task_description(task_id); - let is_dirty = if is_dirty { ", dirty" } else { "" }; + let is_dirty_label = if is_dirty { ", dirty" } else { "" }; + let has_dirty_containers_label = if has_dirty_containers { + ", dirty containers" + } else { + "" + }; let count = if let Some((_, count)) = parent_and_count { format!(" {count}") } else { @@ -651,7 +650,8 @@ impl TurboTasksBackendInner { }; let mut info = format!( "{task_id} {task_description}{count} (aggr={aggregation_number}, \ - {in_progress}, {activeness}{is_dirty})", + {in_progress}, \ + {activeness}{is_dirty_label}{has_dirty_containers_label})", ); let children: Vec<_> = task.dirty_containers_with_count(ctx.session_id()).collect(); @@ -661,8 +661,8 @@ impl TurboTasksBackendInner { info.push_str("\n ERROR: missing upper connection"); } - if dirty_tasks > 0 || !children.is_empty() { - writeln!(info, "\n {dirty_tasks} dirty tasks:").unwrap(); + if has_dirty_containers || !children.is_empty() { + writeln!(info, "\n dirty tasks:").unwrap(); for (child_task_id, count) in children { let task_description = ctx.get_task_description(child_task_id); @@ -2363,68 +2363,92 @@ impl TurboTasksBackendInner { }, )); - // Update the dirty state - let old_dirtyness = task.dirtyness_and_session(); + // Grab the old dirty state + let old_dirtyness = get!(task, Dirty).cloned(); + let (old_self_dirty, old_current_session_self_clean, old_clean_in_session) = + match old_dirtyness { + None => (false, false, None), + Some(Dirtyness::Dirty) => (true, false, None), + Some(Dirtyness::SessionDependent) => { + let clean_in_session = get!(task, CleanInSession).copied(); + ( + true, + clean_in_session == Some(self.session_id), + clean_in_session, + ) + } + }; - let new_dirtyness = if session_dependent { - Some((Dirtyness::SessionDependent, Some(self.session_id))) - } else { - None - }; + // Compute the new dirty state + let (new_dirtyness, new_clean_in_session, new_self_dirty, new_current_session_self_clean) = + if session_dependent { + ( + Some(Dirtyness::SessionDependent), + Some(self.session_id), + true, + true, + ) + } else { + (None, None, false, false) + }; - let dirty_changed = old_dirtyness != new_dirtyness; - let data_update = if dirty_changed { - if let Some((value, _)) = new_dirtyness { + // Update the dirty state + if old_dirtyness != new_dirtyness { + if let Some(value) = new_dirtyness { task.insert(CachedDataItem::Dirty { value }); } else if old_dirtyness.is_some() { task.remove(&CachedDataItemKey::Dirty {}); } - if let Some(session_id) = new_dirtyness.and_then(|t| t.1) { + } + if old_clean_in_session != new_clean_in_session { + if let Some(session_id) = new_clean_in_session { task.insert(CachedDataItem::CleanInSession { value: session_id }); - } else if old_dirtyness.is_some_and(|t| t.1.is_some()) { + } else if old_clean_in_session.is_some() { task.remove(&CachedDataItemKey::CleanInSession {}); } + } - let mut dirty_containers = get!(task, AggregatedDirtyContainerCount) + // Propagate dirtyness changes + let data_update = if old_self_dirty != new_self_dirty + || old_current_session_self_clean != new_current_session_self_clean + { + let dirty_container_count = get!(task, AggregatedDirtyContainerCount) .cloned() .unwrap_or_default(); - if let Some((old_dirtyness, old_clean_in_session)) = old_dirtyness { - dirty_containers - .update_with_dirtyness_and_session(old_dirtyness, old_clean_in_session); - } - let aggregated_update = match (old_dirtyness, new_dirtyness) { - (None, None) => unreachable!(), - (Some(old), None) => { - dirty_containers.undo_update_with_dirtyness_and_session(old.0, old.1) - } - (None, Some(new)) => { - dirty_containers.update_with_dirtyness_and_session(new.0, new.1) - } - (Some(old), Some(new)) => { - dirty_containers.replace_dirtyness_and_session(old.0, old.1, new.0, new.1) + let current_session_clean_container_count = get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: self.session_id } - }; - if !aggregated_update.is_zero() { - if aggregated_update.get(self.session_id) < 0 - && let Some(activeness_state) = get_mut!(task, Activeness) - { + ) + .copied() + .unwrap_or_default(); + let result = ComputeDirtyAndCleanUpdate { + old_dirty_container_count: dirty_container_count, + new_dirty_container_count: dirty_container_count, + old_current_session_clean_container_count: current_session_clean_container_count, + new_current_session_clean_container_count: current_session_clean_container_count, + old_self_dirty, + new_self_dirty, + old_current_session_self_clean, + new_current_session_self_clean, + } + .compute(); + if result.dirty_count_update - result.current_session_clean_update < 0 { + // The task is clean now + if let Some(activeness_state) = get_mut!(task, Activeness) { activeness_state.all_clean_event.notify(usize::MAX); activeness_state.unset_active_until_clean(); if activeness_state.is_empty() { task.remove(&CachedDataItemKey::Activeness {}); } } - AggregationUpdateJob::data_update( - &mut task, - AggregatedDataUpdate::new().dirty_container_update( - task_id, - aggregated_update.count, - aggregated_update.current_session_clean(ctx.session_id()), - ), - ) - } else { - None } + result + .aggregated_update(task_id) + .and_then(|aggregated_update| { + AggregationUpdateJob::data_update(&mut task, aggregated_update) + }) } else { None }; @@ -2891,10 +2915,7 @@ impl TurboTasksBackendInner { let mut ctx = self.execute_context(turbo_tasks); let mut task = ctx.task(task_id, TaskDataCategory::All); let is_dirty = task.is_dirty(self.session_id); - let has_dirty_containers = get!(task, AggregatedDirtyContainerCount) - .map_or(false, |dirty_containers| { - dirty_containers.get(self.session_id) > 0 - }); + let has_dirty_containers = task.has_dirty_containers(self.session_id); if is_dirty || has_dirty_containers { if let Some(activeness_state) = get_mut!(task, Activeness) { // We will finish the task, but it would be removed after the task is done @@ -2984,8 +3005,7 @@ impl TurboTasksBackendInner { } let is_dirty = get!(task, Dirty).is_some(); - let has_dirty_container = - get!(task, AggregatedDirtyContainerCount).is_some_and(|count| count.count > 0); + let has_dirty_container = task.has_dirty_containers(self.session_id); let should_be_in_upper = is_dirty || has_dirty_container; let aggregation_number = get_aggregation_number(&task); @@ -3008,17 +3028,19 @@ impl TurboTasksBackendInner { if should_be_in_upper { for upper_id in uppers { - let task = ctx.task(task_id, TaskDataCategory::All); + let task = ctx.task(upper_id, TaskDataCategory::All); let in_upper = get!(task, AggregatedDirtyContainer { task: task_id }) .is_some_and(|&dirty| dirty > 0); if !in_upper { + let containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task: task_id } value => (task_id, *value)); panic!( "Task {} ({}) is dirty, but is not listed in the upper task {} \ - ({})", + ({})\nThese dirty containers are present:\n{:#?}", task_id, ctx.get_task_description(task_id), upper_id, - ctx.get_task_description(upper_id) + ctx.get_task_description(upper_id), + containers, ); } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 82679af48347e..1103f2aa40367 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -34,7 +34,7 @@ use crate::{ }, data::{ ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType, - CollectibleRef, DirtyContainerCount, + CollectibleRef, }, utils::swap_retain, }; @@ -93,6 +93,79 @@ pub fn get_aggregation_number(task: &impl TaskGuard) -> u32 { .unwrap_or_default() } +#[derive(Debug)] +pub struct ComputeDirtyAndCleanUpdate { + pub old_dirty_container_count: i32, + pub new_dirty_container_count: i32, + pub old_current_session_clean_container_count: i32, + pub new_current_session_clean_container_count: i32, + pub old_self_dirty: bool, + pub new_self_dirty: bool, + pub old_current_session_self_clean: bool, + pub new_current_session_self_clean: bool, +} + +pub struct ComputeDirtyAndCleanUpdateResult { + pub dirty_count_update: i32, + pub current_session_clean_update: i32, +} + +impl ComputeDirtyAndCleanUpdate { + pub fn compute(self) -> ComputeDirtyAndCleanUpdateResult { + let ComputeDirtyAndCleanUpdate { + old_dirty_container_count, + new_dirty_container_count, + old_current_session_clean_container_count, + new_current_session_clean_container_count, + old_self_dirty, + new_self_dirty, + old_current_session_self_clean, + new_current_session_self_clean, + } = self; + let was_dirty_without_clean = old_self_dirty || old_dirty_container_count > 0; + let is_dirty_without_clean = new_self_dirty || new_dirty_container_count > 0; + let was_dirty = old_self_dirty && !old_current_session_self_clean + || old_dirty_container_count > 0 + && old_dirty_container_count > old_current_session_clean_container_count; + let is_dirty = new_self_dirty && !new_current_session_self_clean + || new_dirty_container_count > 0 + && new_dirty_container_count > new_current_session_clean_container_count; + let was_flagged_clean = was_dirty_without_clean && !was_dirty; + let is_flagged_clean = is_dirty_without_clean && !is_dirty; + + fn before_after_to_diff_value(before: bool, after: bool) -> i32 { + match (before, after) { + (true, false) => -1, + (false, true) => 1, + _ => 0, + } + } + let dirty_count_update = + before_after_to_diff_value(was_dirty_without_clean, is_dirty_without_clean); + let current_session_clean_update = + before_after_to_diff_value(was_flagged_clean, is_flagged_clean); + + ComputeDirtyAndCleanUpdateResult { + dirty_count_update, + current_session_clean_update, + } + } +} + +impl ComputeDirtyAndCleanUpdateResult { + pub fn aggregated_update(&self, task_id: TaskId) -> Option { + if self.dirty_count_update != 0 || self.current_session_clean_update != 0 { + Some(AggregatedDataUpdate::new().dirty_container_update( + task_id, + self.dirty_count_update, + self.current_session_clean_update, + )) + } else { + None + } + } +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct InnerOfUppersHasNewFollowersJob { pub upper_ids: TaskIdVec, @@ -254,13 +327,22 @@ impl AggregatedDataUpdate { /// upper task. fn from_task(task: &mut impl TaskGuard, current_session_id: SessionId) -> Self { let aggregation = get_aggregation_number(task); - let mut dirty_container_count = Default::default(); + let mut dirty_count = 0; + let mut current_session_clean_count = 0; let mut collectibles_update: Vec<_> = get_many!(task, Collectible { collectible } count => (collectible, *count)); if is_aggregating_node(aggregation) { - dirty_container_count = get!(task, AggregatedDirtyContainerCount) - .cloned() + dirty_count = get!(task, AggregatedDirtyContainerCount) + .copied() .unwrap_or_default(); + current_session_clean_count = get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: current_session_id + } + ) + .copied() + .unwrap_or_default(); let collectibles = iter_many!( task, AggregatedCollectible { @@ -273,9 +355,6 @@ impl AggregatedDataUpdate { collectibles_update.push((collectible, 1)); } } - let mut dirty_count = dirty_container_count.count; - let mut current_session_clean_count = - dirty_container_count.current_session_clean(current_session_id); let (dirty, current_session_clean) = task.dirty(current_session_id); if dirty { dirty_count += 1; @@ -410,56 +489,88 @@ impl AggregatedDataUpdate { // compute aggregated update let was_single_container_clean = old_dirty_single_container_count > 0 && old_dirty_single_container_count - - old_single_container_current_session_clean_count - <= 0; + <= old_single_container_current_session_clean_count; let is_single_container_clean = new_dirty_single_container_count > 0 && new_dirty_single_container_count - - new_single_container_current_session_clean_count - <= 0; + <= new_single_container_current_session_clean_count; let current_session_clean_update = before_after_to_diff_value(was_single_container_clean, is_single_container_clean); - let aggregated_update = DirtyContainerCount::from_current_session_clean( - dirty_container_count_update, - current_session_id, - current_session_clean_update, - ); + if dirty_container_count_update != 0 || current_session_clean_update != 0 { + let (is_self_dirty, current_session_self_clean) = task.dirty(current_session_id); - if !aggregated_update.is_zero() { - let dirtyness_and_session = task.dirtyness_and_session(); let task_id = task.id(); - update!(task, AggregatedDirtyContainerCount, |old: Option< - DirtyContainerCount, - >| { - let mut new = old.unwrap_or_default(); - if let Some((dirtyness, clean_in_session)) = dirtyness_and_session { - new.update_with_dirtyness_and_session(dirtyness, clean_in_session); - } - let aggregated_update = new.update_count(&aggregated_update); - if let Some((dirtyness, clean_in_session)) = dirtyness_and_session { - new.undo_update_with_dirtyness_and_session(dirtyness, clean_in_session); - } - if !aggregated_update.is_zero() { - result.dirty_container_update = Some(( - task_id, - aggregated_update.count, - SessionDependent::new( - aggregated_update.current_session_clean(current_session_id), - ), - )); - } - (!new.is_zero()).then_some(new) - }); - if let Some((_, count, current_session_clean)) = result.dirty_container_update - && count - *current_session_clean < 0 - { - // When the current task is no longer dirty, we need to fire the - // aggregate root events and do some cleanup - if let Some(activeness_state) = get_mut!(task, Activeness) { - activeness_state.all_clean_event.notify(usize::MAX); - activeness_state.unset_active_until_clean(); - if activeness_state.is_empty() { - task.remove(&CachedDataItemKey::Activeness {}); + + // Update AggregatedDirtyContainerCount and compute aggregate value + let old_dirty_container_count; + let new_dirty_container_count; + if dirty_container_count_update != 0 { + new_dirty_container_count = update_count_and_get!( + task, + AggregatedDirtyContainerCount, + dirty_container_count_update + ); + old_dirty_container_count = + new_dirty_container_count - dirty_container_count_update; + } else { + new_dirty_container_count = get!(task, AggregatedDirtyContainerCount) + .copied() + .unwrap_or_default(); + old_dirty_container_count = new_dirty_container_count; + }; + + // Update AggregatedSessionDependentCleanContainerCount and compute aggregate value + let new_current_session_clean_container_count; + let old_current_session_clean_container_count; + if current_session_clean_update != 0 { + new_current_session_clean_container_count = update_count_and_get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: current_session_id + }, + current_session_clean_update + ); + old_current_session_clean_container_count = + new_current_session_clean_container_count - current_session_clean_update; + } else { + new_current_session_clean_container_count = get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: current_session_id + } + ) + .copied() + .unwrap_or_default(); + old_current_session_clean_container_count = + new_current_session_clean_container_count; + }; + + let compute_result = ComputeDirtyAndCleanUpdate { + old_dirty_container_count, + new_dirty_container_count, + old_current_session_clean_container_count, + new_current_session_clean_container_count, + old_self_dirty: is_self_dirty, + new_self_dirty: is_self_dirty, + old_current_session_self_clean: current_session_self_clean, + new_current_session_self_clean: current_session_self_clean, + } + .compute(); + + if let Some(aggregated_update) = compute_result.aggregated_update(task_id) { + result = aggregated_update; + + if let Some((_, count, current_session_clean)) = result.dirty_container_update + && count - *current_session_clean < 0 + { + // When the current task is no longer dirty, we need to fire the + // aggregate root events and do some cleanup + if let Some(activeness_state) = get_mut!(task, Activeness) { + activeness_state.all_clean_event.notify(usize::MAX); + activeness_state.unset_active_until_clean(); + if activeness_state.is_empty() { + task.remove(&CachedDataItemKey::Activeness {}); + } } } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs index da1f5f03093cd..6d2005eda5aa3 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs @@ -8,7 +8,7 @@ use crate::{ operation::{ ExecuteContext, Operation, TaskGuard, aggregation_update::{ - AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue, + AggregationUpdateJob, AggregationUpdateQueue, ComputeDirtyAndCleanUpdate, }, }, storage::{get, get_mut, remove}, @@ -234,7 +234,7 @@ pub fn make_task_dirty_internal( let old = task.insert(CachedDataItem::Dirty { value: Dirtyness::Dirty, }); - let mut dirty_container = match old { + let (old_self_dirty, old_current_session_self_clean) = match old { Some(CachedDataItemValue::Dirty { value: Dirtyness::Dirty, }) => { @@ -252,38 +252,48 @@ pub fn make_task_dirty_internal( Some(CachedDataItemValue::Dirty { value: Dirtyness::SessionDependent, }) => { + // It was a session-dependent dirty before, so we need to remove that clean count let old = remove!(task, CleanInSession); - match old { - None => { - #[cfg(feature = "trace_task_dirty")] - let _span = tracing::trace_span!( - "session-dependent task already dirty", - name = ctx.get_task_description(task_id), - cause = %TaskDirtyCauseInContext::new(&cause, ctx) - ) - .entered(); - // already dirty - return; - } - Some(session_id) => { - // Got dirty in that one session only - let mut dirty_container = get!(task, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default(); - dirty_container.update_session_dependent(session_id, 1); - dirty_container - } + if let Some(session_id) = old + && session_id == ctx.session_id() + { + // There was a clean count for a session. If it was the current session, we need to + // propagate that change. + (true, true) + } else { + #[cfg(feature = "trace_task_dirty")] + let _span = tracing::trace_span!( + "session-dependent task already dirty", + name = ctx.get_task_description(task_id), + cause = %TaskDirtyCauseInContext::new(&cause, ctx) + ) + .entered(); + // already dirty + return; } } None => { - // Get dirty for all sessions - get!(task, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default() + // It was clean before, so we need to increase the dirty count + (false, false) } _ => unreachable!(), }; + let new_self_dirty = true; + let new_current_session_self_clean = false; + + let dirty_container_count = get!(task, AggregatedDirtyContainerCount) + .copied() + .unwrap_or_default(); + let current_session_clean_container_count = get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: ctx.session_id(), + } + ) + .copied() + .unwrap_or_default(); + #[cfg(feature = "trace_task_dirty")] let _span = tracing::trace_span!( "make task dirty", @@ -293,21 +303,27 @@ pub fn make_task_dirty_internal( ) .entered(); - let should_schedule = { - let aggregated_update = - dirty_container.update_with_dirtyness_and_session(Dirtyness::Dirty, None); - if !aggregated_update.is_zero() { - queue.extend(AggregationUpdateJob::data_update( - &mut task, - AggregatedDataUpdate::new().dirty_container_update( - task_id, - aggregated_update.count, - aggregated_update.current_session_clean(ctx.session_id()), - ), - )); - } - !ctx.should_track_activeness() || task.has_key(&CachedDataItemKey::Activeness {}) - }; + let result = ComputeDirtyAndCleanUpdate { + old_dirty_container_count: dirty_container_count, + new_dirty_container_count: dirty_container_count, + old_current_session_clean_container_count: current_session_clean_container_count, + new_current_session_clean_container_count: current_session_clean_container_count, + old_self_dirty, + new_self_dirty, + old_current_session_self_clean, + new_current_session_self_clean, + } + .compute(); + + if let Some(aggregated_update) = result.aggregated_update(task_id) { + queue.extend(AggregationUpdateJob::data_update( + &mut task, + aggregated_update, + )); + } + + let should_schedule = + !ctx.should_track_activeness() || task.has_key(&CachedDataItemKey::Activeness {}); if should_schedule { let description = || ctx.get_task_desc_fn(task_id); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 63127a58b7939..ffc103e1d703d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -25,7 +25,7 @@ use crate::{ backing_storage::BackingStorage, data::{ CachedDataItem, CachedDataItemKey, CachedDataItemType, CachedDataItemValue, - CachedDataItemValueRef, CachedDataItemValueRefMut, DirtyContainerCount, Dirtyness, + CachedDataItemValueRef, CachedDataItemValueRefMut, Dirtyness, }, }; @@ -469,10 +469,20 @@ pub trait TaskGuard: Debug { ) } - fn dirty_container_count(&self) -> DirtyContainerCount { - get!(self, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default() + fn has_dirty_containers(&self, session_id: SessionId) -> bool { + let dirty_count = get!(self, AggregatedDirtyContainerCount) + .copied() + .unwrap_or_default(); + if dirty_count <= 0 { + return false; + } + let clean_count = get!( + self, + AggregatedSessionDependentCleanContainerCount { session_id } + ) + .copied() + .unwrap_or_default(); + dirty_count > clean_count } } @@ -799,8 +809,8 @@ impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue); pub use self::invalidate::TaskDirtyCause; pub use self::{ aggregation_update::{ - AggregatedDataUpdate, AggregationUpdateJob, get_aggregation_number, get_uppers, - is_aggregating_node, is_root_node, + AggregatedDataUpdate, AggregationUpdateJob, ComputeDirtyAndCleanUpdate, + get_aggregation_number, get_uppers, is_aggregating_node, is_root_node, }, cleanup_old_edges::OutdatedEdge, connect_children::connect_children, diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 02b4dbb18d677..54763bf9c70c5 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use turbo_tasks::{ @@ -149,363 +147,6 @@ pub enum Dirtyness { SessionDependent, } -fn add_with_diff(v: &mut i32, u: i32) -> i32 { - let old = *v; - *v += u; - if old <= 0 && *v > 0 { - 1 - } else if old > 0 && *v <= 0 { - -1 - } else { - 0 - } -} - -/// Represents a count of dirty containers. Since dirtiness can be session dependent, there might be -/// a different count for a specific session. It only need to store the highest session count, since -/// old sessions can't be visited again, so we can ignore their counts. -#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct DirtyContainerCount { - pub count: i32, - pub count_in_session: Option<(SessionId, i32)>, -} - -impl DirtyContainerCount { - pub fn from_current_session_clean( - count: i32, - current_session_id: SessionId, - current_session_clean: i32, - ) -> DirtyContainerCount { - DirtyContainerCount { - count, - count_in_session: Some((current_session_id, count - current_session_clean)), - } - } - - pub fn current_session_clean(&self, current_session_id: SessionId) -> i32 { - if let Some((s, c)) = self.count_in_session - && s == current_session_id - { - return self.count - c; - } - 0 - } - - /// Get the count for a specific session. It's only expected to be asked for the current - /// session, since old session counts might be dropped. - pub fn get(&self, session: SessionId) -> i32 { - if let Some((s, count)) = self.count_in_session - && s == session - { - return count; - } - self.count - } - - /// Increase/decrease the count by the given value. - pub fn update(&mut self, count: i32) -> DirtyContainerCount { - self.update_count(&DirtyContainerCount { - count, - count_in_session: None, - }) - } - - /// Increase/decrease the count by the given value, but does not update the count for a specific - /// session. This matches the "dirty, but clean in one session" behavior. - pub fn update_session_dependent( - &mut self, - ignore_session: SessionId, - count: i32, - ) -> DirtyContainerCount { - self.update_count(&DirtyContainerCount { - count, - count_in_session: Some((ignore_session, 0)), - }) - } - - /// Adds the `count` to the current count. This correctly handles session dependent counts. - /// Returns a new count object that represents the aggregated count. The aggregated count will - /// be +1 when the self count changes from <= 0 to > 0 and -1 when the self count changes from > - /// 0 to <= 0. The same for the session dependent count. - pub fn update_count(&mut self, count: &DirtyContainerCount) -> DirtyContainerCount { - let mut diff = DirtyContainerCount::default(); - match ( - self.count_in_session.as_mut(), - count.count_in_session.as_ref(), - ) { - (None, None) => {} - (Some((s, c)), None) => { - let d = add_with_diff(c, count.count); - diff.count_in_session = Some((*s, d)); - } - (None, Some((s, c))) => { - let mut new = self.count; - let d = add_with_diff(&mut new, *c); - self.count_in_session = Some((*s, new)); - diff.count_in_session = Some((*s, d)); - } - (Some((s1, c1)), Some((s2, c2))) => match (*s1).cmp(s2) { - Ordering::Less => { - let mut new = self.count; - let d = add_with_diff(&mut new, *c2); - self.count_in_session = Some((*s2, new)); - diff.count_in_session = Some((*s2, d)); - } - Ordering::Equal => { - let d = add_with_diff(c1, *c2); - diff.count_in_session = Some((*s1, d)); - } - Ordering::Greater => { - let d = add_with_diff(c1, count.count); - diff.count_in_session = Some((*s1, d)); - } - }, - } - let d = add_with_diff(&mut self.count, count.count); - diff.count = d; - diff - } - - /// Applies a dirtyness to the count. Returns an aggregated count that represents the change. - pub fn update_with_dirtyness_and_session( - &mut self, - dirtyness: Dirtyness, - clean_in_session: Option, - ) -> DirtyContainerCount { - if let (Dirtyness::SessionDependent, Some(session_id)) = (dirtyness, clean_in_session) { - self.update_session_dependent(session_id, 1) - } else { - self.update(1) - } - } - - /// Undoes the effect of a dirtyness on the count. Returns an aggregated count that represents - /// the change. - pub fn undo_update_with_dirtyness_and_session( - &mut self, - dirtyness: Dirtyness, - clean_in_session: Option, - ) -> DirtyContainerCount { - if let (Dirtyness::SessionDependent, Some(session_id)) = (dirtyness, clean_in_session) { - self.update_session_dependent(session_id, -1) - } else { - self.update(-1) - } - } - - /// Replaces the old dirtyness with the new one. Returns an aggregated count that represents - /// the change. - pub fn replace_dirtyness_and_session( - &mut self, - old_dirtyness: Dirtyness, - old_clean_in_session: Option, - new_dirtyness: Dirtyness, - new_clean_in_session: Option, - ) -> DirtyContainerCount { - let mut diff = - self.undo_update_with_dirtyness_and_session(old_dirtyness, old_clean_in_session); - diff.update_count( - &self.update_with_dirtyness_and_session(new_dirtyness, new_clean_in_session), - ); - diff - } - - /// Returns true if the count is zero and applying it would have no effect - pub fn is_zero(&self) -> bool { - self.count == 0 && self.count_in_session.map(|(_, c)| c == 0).unwrap_or(true) - } - - /// Negates the counts. - pub fn negate(&self) -> Self { - Self { - count: -self.count, - count_in_session: self.count_in_session.map(|(s, c)| (s, -c)), - } - } -} - -#[cfg(test)] -mod dirty_container_count_tests { - use turbo_tasks::SessionId; - - use super::*; - - const SESSION_1: SessionId = unsafe { SessionId::new_unchecked(1) }; - const SESSION_2: SessionId = unsafe { SessionId::new_unchecked(2) }; - const SESSION_3: SessionId = unsafe { SessionId::new_unchecked(3) }; - - #[test] - fn test_update() { - let mut count = DirtyContainerCount::default(); - assert!(count.is_zero()); - - let diff = count.update(1); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 1); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update(-1); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - - let diff = count.update(2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 2); - assert_eq!(diff.get(SESSION_1), 1); - assert_eq!(count.get(SESSION_2), 2); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update(-1); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 0); - - let diff = count.update(-1); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - - let diff = count.update(-1); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), -1); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), -1); - assert_eq!(diff.get(SESSION_2), 0); - - let diff = count.update(2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 1); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update(-2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), -1); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), -1); - assert_eq!(diff.get(SESSION_2), -1); - - let diff = count.update(1); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), 0); - } - - #[test] - fn test_session_dependent() { - let mut count = DirtyContainerCount::default(); - assert!(count.is_zero()); - - let diff = count.update_session_dependent(SESSION_1, 1); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update_session_dependent(SESSION_1, -1); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - - let diff = count.update_session_dependent(SESSION_1, 2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 2); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update_session_dependent(SESSION_2, -2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 2); - assert_eq!(diff.get(SESSION_2), 0); - assert_eq!(count.get(SESSION_3), 0); - assert_eq!(diff.get(SESSION_3), -1); - } - - #[test] - fn test_update_with_dirtyness_and_session() { - let mut count = DirtyContainerCount::default(); - let diff = count.update_with_dirtyness_and_session(Dirtyness::Dirty, None); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 1); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.undo_update_with_dirtyness_and_session(Dirtyness::Dirty, None); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - - let mut count = DirtyContainerCount::default(); - let diff = - count.update_with_dirtyness_and_session(Dirtyness::SessionDependent, Some(SESSION_1)); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count - .undo_update_with_dirtyness_and_session(Dirtyness::SessionDependent, Some(SESSION_1)); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - } - - #[test] - fn test_replace_dirtyness_and_session() { - let mut count = DirtyContainerCount::default(); - count.update_with_dirtyness_and_session(Dirtyness::Dirty, None); - let diff = count.replace_dirtyness_and_session( - Dirtyness::Dirty, - None, - Dirtyness::SessionDependent, - Some(SESSION_1), - ); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 0); - - let mut count = DirtyContainerCount::default(); - count.update_with_dirtyness_and_session(Dirtyness::SessionDependent, Some(SESSION_1)); - let diff = count.replace_dirtyness_and_session( - Dirtyness::SessionDependent, - Some(SESSION_1), - Dirtyness::Dirty, - None, - ); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 1); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 0); - } -} - #[derive(Debug, Clone, Copy)] pub enum RootType { RootTask, @@ -665,7 +306,11 @@ pub enum CachedDataItem { value: i32, }, AggregatedDirtyContainerCount { - value: DirtyContainerCount, + value: i32, + }, + AggregatedSessionDependentCleanContainerCount { + session_id: SessionId, + value: i32, }, // Flags @@ -746,6 +391,7 @@ impl CachedDataItem { !collectible.cell.task.is_transient() } CachedDataItem::AggregatedDirtyContainerCount { .. } => true, + CachedDataItem::AggregatedSessionDependentCleanContainerCount { .. } => true, CachedDataItem::Stateful { .. } => true, CachedDataItem::HasInvalidator { .. } => true, CachedDataItem::Immutable { .. } => true, @@ -819,6 +465,7 @@ impl CachedDataItem { | Self::AggregatedSessionDependentCleanContainer { .. } | Self::AggregatedCollectible { .. } | Self::AggregatedDirtyContainerCount { .. } + | Self::AggregatedSessionDependentCleanContainerCount { .. } | Self::Stateful { .. } | Self::HasInvalidator { .. } | Self::Immutable { .. } @@ -868,6 +515,7 @@ impl CachedDataItemKey { !collectible.cell.task.is_transient() } CachedDataItemKey::AggregatedDirtyContainerCount { .. } => true, + CachedDataItemKey::AggregatedSessionDependentCleanContainerCount { .. } => true, CachedDataItemKey::Stateful { .. } => true, CachedDataItemKey::HasInvalidator { .. } => true, CachedDataItemKey::Immutable { .. } => true, @@ -909,6 +557,7 @@ impl CachedDataItemType { | Self::AggregatedSessionDependentCleanContainer { .. } | Self::AggregatedCollectible { .. } | Self::AggregatedDirtyContainerCount { .. } + | Self::AggregatedSessionDependentCleanContainerCount { .. } | Self::Stateful { .. } | Self::HasInvalidator { .. } | Self::Immutable { .. } @@ -946,6 +595,7 @@ impl CachedDataItemType { | Self::AggregatedSessionDependentCleanContainer | Self::AggregatedCollectible | Self::AggregatedDirtyContainerCount + | Self::AggregatedSessionDependentCleanContainerCount | Self::Stateful | Self::HasInvalidator | Self::Immutable => true,