diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 8ba9726704ba9..32cdbbf7ea002 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -2431,7 +2431,11 @@ impl TurboTasksBackendInner { } AggregationUpdateJob::data_update( &mut task, - AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update), + AggregatedDataUpdate::new().dirty_container_update( + task_id, + aggregated_update.count, + aggregated_update.current_session_clean(ctx.session_id()), + ), ) } else { None diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index c7fc49b3c100e..d483ce43c6e45 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -4,7 +4,7 @@ use std::{ hash::Hash, mem::take, num::NonZeroU32, - ops::ControlFlow, + ops::{ControlFlow, Deref}, thread::yield_now, time::{Duration, Instant}, }; @@ -216,11 +216,33 @@ impl AggregationUpdateJob { } } +#[derive(Default, Serialize, Deserialize, Clone, Copy, Debug)] +pub struct SessionDependent { + #[serde(skip, default)] + pub value: T, +} + +impl SessionDependent { + pub fn new(value: T) -> Self { + Self { value } + } +} + +impl Deref for SessionDependent { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.value + } +} + /// Aggregated data update. #[derive(Default, Serialize, Deserialize, Clone, Debug)] pub struct AggregatedDataUpdate { /// One of the inner tasks has changed its dirty state or aggregated dirty state. - dirty_container_update: Option<(TaskId, DirtyContainerCount)>, + /// (task id, dirty update, current session clean update) + // TODO Serialize the current session clean update as 0 + dirty_container_update: Option<(TaskId, i32, SessionDependent)>, /// One of the inner tasks has changed its collectibles count or aggregated collectibles count. collectibles_update: Vec<(CollectibleRef, i32)>, } @@ -228,7 +250,7 @@ pub struct AggregatedDataUpdate { impl AggregatedDataUpdate { /// Derives an `AggregatedDataUpdate` from a task. This is used when a task is connected to an /// upper task. - fn from_task(task: &mut impl TaskGuard) -> Self { + fn from_task(task: &mut impl TaskGuard, current_session_id: SessionId) -> Self { let aggregation = get_aggregation_number(task); let mut dirty_container_count = Default::default(); let mut collectibles_update: Vec<_> = @@ -255,15 +277,20 @@ impl AggregatedDataUpdate { let mut result = Self::new().collectibles_update(collectibles_update); if !dirty_container_count.is_zero() { - let DirtyContainerCount { - count, - count_in_session, - } = dirty_container_count; result = result.dirty_container_update( task.id(), - DirtyContainerCount { - count: if count > 0 { 1 } else { 0 }, - count_in_session: count_in_session.map(|(s, c)| (s, if c > 0 { 1 } else { 0 })), + if dirty_container_count.count > 0 { + 1 + } else { + 0 + }, + if dirty_container_count.count > 0 + && dirty_container_count.current_session_clean(current_session_id) + >= dirty_container_count.count + { + 1 + } else { + 0 }, ); } @@ -276,8 +303,9 @@ impl AggregatedDataUpdate { dirty_container_update, collectibles_update, } = &mut self; - if let Some((_, value)) = dirty_container_update.as_mut() { - *value = value.negate() + if let Some((_, value, current_session_clean_update)) = dirty_container_update.as_mut() { + *value = -*value; + current_session_clean_update.value = -current_session_clean_update.value; } for (_, value) in collectibles_update.iter_mut() { *value = -*value; @@ -290,7 +318,7 @@ impl AggregatedDataUpdate { fn apply( &self, task: &mut impl TaskGuard, - session_id: SessionId, + current_session_id: SessionId, should_track_activeness: bool, queue: &mut AggregationUpdateQueue, ) -> AggregatedDataUpdate { @@ -299,13 +327,15 @@ impl AggregatedDataUpdate { collectibles_update, } = self; let mut result = Self::default(); - if let Some((dirty_container_id, count)) = dirty_container_update { + if let &Some((dirty_container_id, count, session_dependent_clean_update)) = + dirty_container_update + { if should_track_activeness { // When a dirty container count is increased and the task is considered as active // we need to schedule the dirty tasks in the new dirty container - let current_session_update = count.get(session_id); + let current_session_update = count - *session_dependent_clean_update; if current_session_update > 0 && task.has_key(&CachedDataItemKey::Activeness {}) { - queue.push_find_and_schedule_dirty(*dirty_container_id) + queue.push_find_and_schedule_dirty(dirty_container_id) } } @@ -313,11 +343,16 @@ impl AggregatedDataUpdate { update!( task, AggregatedDirtyContainer { - task: *dirty_container_id + task: dirty_container_id }, |old: Option| { let mut new = old.unwrap_or_default(); - aggregated_update = new.update_count(count); + aggregated_update = + new.update_count(&DirtyContainerCount::from_current_session_clean( + count, + current_session_id, + *session_dependent_clean_update, + )); (!new.is_zero()).then_some(new) } ); @@ -337,12 +372,18 @@ impl AggregatedDataUpdate { new.undo_update_with_dirtyness_and_session(dirtyness, clean_in_session); } if !aggregated_update.is_zero() { - result.dirty_container_update = Some((task_id, aggregated_update)); + result.dirty_container_update = Some(( + task_id, + aggregated_update.count, + SessionDependent::new( + aggregated_update.current_session_clean(current_session_id), + ), + )); } (!new.is_zero()).then_some(new) }); - if let Some((_, count)) = result.dirty_container_update.as_ref() - && count.get(session_id) < 0 + if let Some((_, count, current_session_clean)) = result.dirty_container_update + && count - *current_session_clean < 0 { // When the current task is no longer dirty, we need to fire the // aggregate root events and do some cleanup @@ -421,8 +462,17 @@ impl AggregatedDataUpdate { } /// Adds a dirty container update to the update. - pub fn dirty_container_update(mut self, task_id: TaskId, count: DirtyContainerCount) -> Self { - self.dirty_container_update = Some((task_id, count)); + pub fn dirty_container_update( + mut self, + task_id: TaskId, + count: i32, + current_session_clean_update: i32, + ) -> Self { + self.dirty_container_update = Some(( + task_id, + count, + SessionDependent::new(current_session_clean_update), + )); self } @@ -1041,7 +1091,7 @@ impl AggregationUpdateQueue { } // When this is a new inner node, update aggregated data and // followers - let data = AggregatedDataUpdate::from_task(&mut task); + let data = AggregatedDataUpdate::from_task(&mut task, ctx.session_id()); let followers = get_followers(&task); let diff = data.apply( &mut upper, @@ -1138,7 +1188,8 @@ impl AggregationUpdateQueue { // Since this is no longer an inner node, update the aggregated data and // followers - let data = AggregatedDataUpdate::from_task(&mut task).invert(); + let data = + AggregatedDataUpdate::from_task(&mut task, ctx.session_id()).invert(); let followers = get_followers(&task); let diff = data.apply( &mut upper, @@ -1311,7 +1362,8 @@ impl AggregationUpdateQueue { follower_in_upper }); if !removed_uppers.is_empty() { - let data = AggregatedDataUpdate::from_task(&mut follower).invert(); + let data = + AggregatedDataUpdate::from_task(&mut follower, ctx.session_id()).invert(); let followers = get_followers(&follower); drop(follower); @@ -1473,7 +1525,8 @@ impl AggregationUpdateQueue { Some(old - 1) }); if remove_upper { - let data = AggregatedDataUpdate::from_task(&mut follower).invert(); + let data = + AggregatedDataUpdate::from_task(&mut follower, ctx.session_id()).invert(); let followers = get_followers(&follower); drop(follower); @@ -1711,7 +1764,7 @@ impl AggregationUpdateQueue { self.push_optimize_task(new_follower_id); } - let data = AggregatedDataUpdate::from_task(&mut follower); + let data = AggregatedDataUpdate::from_task(&mut follower, ctx.session_id()); let children = get_followers(&follower); drop(follower); @@ -1882,7 +1935,7 @@ impl AggregationUpdateQueue { } // It's a new upper - let data = AggregatedDataUpdate::from_task(&mut inner); + let data = AggregatedDataUpdate::from_task(&mut inner, ctx.session_id()); let children = get_followers(&inner); let follower_aggregation_number = get_aggregation_number(&inner); drop(inner); @@ -2094,7 +2147,7 @@ impl AggregationUpdateQueue { self.push_optimize_task(new_follower_id); } // It's a new upper - let data = AggregatedDataUpdate::from_task(&mut inner); + let data = AggregatedDataUpdate::from_task(&mut inner, ctx.session_id()); let followers = get_followers(&inner); drop(inner); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs index 1e7961e9c44a5..da1f5f03093cd 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs @@ -299,7 +299,11 @@ pub fn make_task_dirty_internal( if !aggregated_update.is_zero() { queue.extend(AggregationUpdateJob::data_update( &mut task, - AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update), + AggregatedDataUpdate::new().dirty_container_update( + task_id, + aggregated_update.count, + aggregated_update.current_session_clean(ctx.session_id()), + ), )); } !ctx.should_track_activeness() || task.has_key(&CachedDataItemKey::Activeness {}) diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 2bdb13f41619c..47db822e94bc3 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -171,6 +171,26 @@ pub struct DirtyContainerCount { } impl DirtyContainerCount { + pub fn from_current_session_clean( + count: i32, + current_session_id: SessionId, + current_session_clean: i32, + ) -> DirtyContainerCount { + DirtyContainerCount { + count, + count_in_session: Some((current_session_id, count - current_session_clean)), + } + } + + pub fn current_session_clean(&self, current_session_id: SessionId) -> i32 { + if let Some((s, c)) = self.count_in_session + && s == current_session_id + { + return self.count - c; + } + 0 + } + /// Get the count for a specific session. It's only expected to be asked for the current /// session, since old session counts might be dropped. pub fn get(&self, session: SessionId) -> i32 {