diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index b550fd8b71f95..1291dbb2b964f 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -592,14 +592,7 @@ impl TurboTasksBackendInner { .set_active_until_clean(); if ctx.should_track_activeness() { // A newly added Activeness need to make sure to schedule the tasks - task_ids_to_schedule = get_many!( - task, - AggregatedDirtyContainer { - task - } count if count.get(self.session_id) > 0 => { - task - } - ); + task_ids_to_schedule = task.dirty_containers(self.session_id).collect(); task_ids_to_schedule.push(task_id); } get!(task, Activeness).unwrap() @@ -660,16 +653,8 @@ impl TurboTasksBackendInner { "{task_id} {task_description}{count} (aggr={aggregation_number}, \ {in_progress}, {activeness}{is_dirty})", ); - let children: Vec<_> = iter_many!( - task, - AggregatedDirtyContainer { - task - } count => { - (task, count.get(ctx.session_id())) - } - ) - .filter(|(_, count)| *count > 0) - .collect(); + let children: Vec<_> = + task.dirty_containers_with_count(ctx.session_id()).collect(); drop(task); if missing_upper { @@ -2985,9 +2970,9 @@ impl TurboTasksBackendInner { } } - let is_dirty = task.is_dirty(self.session_id); - let has_dirty_container = get!(task, AggregatedDirtyContainerCount) - .is_some_and(|count| count.get(self.session_id) > 0); + let is_dirty = get!(task, Dirty).is_some(); + let has_dirty_container = + get!(task, AggregatedDirtyContainerCount).is_some_and(|count| count.count > 0); let should_be_in_upper = is_dirty || has_dirty_container; let aggregation_number = get_aggregation_number(&task); @@ -3012,7 +2997,7 @@ impl TurboTasksBackendInner { for upper_id in uppers { let task = ctx.task(task_id, TaskDataCategory::All); let in_upper = get!(task, AggregatedDirtyContainer { task: task_id }) - .is_some_and(|dirty| dirty.get(self.session_id) > 0); + .is_some_and(|&dirty| dirty > 0); if !in_upper { panic!( "Task {} ({}) is dirty, but is not listed in the upper task {} \ diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index d483ce43c6e45..82679af48347e 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -28,7 +28,9 @@ use crate::{ backend::{ TaskDataCategory, get_mut, get_mut_or_insert_with, operation::{ExecuteContext, Operation, TaskGuard, invalidate::make_task_dirty}, - storage::{count, get, get_many, iter_many, remove, update, update_count}, + storage::{ + count, get, get_many, iter_many, remove, update, update_count, update_count_and_get, + }, }, data::{ ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType, @@ -271,23 +273,23 @@ impl AggregatedDataUpdate { collectibles_update.push((collectible, 1)); } } - if let Some((dirtyness, clean_in_session)) = task.dirtyness_and_session() { - dirty_container_count.update_with_dirtyness_and_session(dirtyness, clean_in_session); + let mut dirty_count = dirty_container_count.count; + let mut current_session_clean_count = + dirty_container_count.current_session_clean(current_session_id); + let (dirty, current_session_clean) = task.dirty(current_session_id); + if dirty { + dirty_count += 1; + } + if current_session_clean { + current_session_clean_count += 1; } let mut result = Self::new().collectibles_update(collectibles_update); - if !dirty_container_count.is_zero() { + if dirty_count > 0 { result = result.dirty_container_update( task.id(), - if dirty_container_count.count > 0 { - 1 - } else { - 0 - }, - if dirty_container_count.count > 0 - && dirty_container_count.current_session_clean(current_session_id) - >= dirty_container_count.count - { + if dirty_count > 0 { 1 } else { 0 }, + if dirty_count > 0 && dirty_count - current_session_clean_count <= 0 { 1 } else { 0 @@ -322,39 +324,105 @@ impl AggregatedDataUpdate { should_track_activeness: bool, queue: &mut AggregationUpdateQueue, ) -> AggregatedDataUpdate { + fn before_after_to_diff_value(before: bool, after: bool) -> i32 { + match (before, after) { + (true, false) => -1, + (false, true) => 1, + _ => 0, + } + } + let Self { dirty_container_update, collectibles_update, } = self; let mut result = Self::default(); - if let &Some((dirty_container_id, count, session_dependent_clean_update)) = + if let &Some((dirty_container_id, count, current_session_clean_update)) = dirty_container_update { if should_track_activeness { // When a dirty container count is increased and the task is considered as active // we need to schedule the dirty tasks in the new dirty container - let current_session_update = count - *session_dependent_clean_update; + let current_session_update = count - *current_session_clean_update; if current_session_update > 0 && task.has_key(&CachedDataItemKey::Activeness {}) { queue.push_find_and_schedule_dirty(dirty_container_id) } } - let mut aggregated_update = DirtyContainerCount::default(); - update!( - task, - AggregatedDirtyContainer { - task: dirty_container_id - }, - |old: Option| { - let mut new = old.unwrap_or_default(); - aggregated_update = - new.update_count(&DirtyContainerCount::from_current_session_clean( - count, - current_session_id, - *session_dependent_clean_update, - )); - (!new.is_zero()).then_some(new) - } + // Update AggregatedDirtyContainer and compute aggregated update + let mut dirty_container_count_update = 0; + let old_dirty_single_container_count; + let new_dirty_single_container_count; + if count != 0 { + new_dirty_single_container_count = update_count_and_get!( + task, + AggregatedDirtyContainer { + task: dirty_container_id + }, + count + ); + old_dirty_single_container_count = new_dirty_single_container_count - count; + dirty_container_count_update = before_after_to_diff_value( + old_dirty_single_container_count > 0, + new_dirty_single_container_count > 0, + ); + } else { + new_dirty_single_container_count = get!( + task, + AggregatedDirtyContainer { + task: dirty_container_id + } + ) + .copied() + .unwrap_or_default(); + old_dirty_single_container_count = new_dirty_single_container_count; + } + + // Update AggregatedSessionDependentCleanContainer + let old_single_container_current_session_clean_count; + let new_single_container_current_session_clean_count; + if *current_session_clean_update != 0 { + new_single_container_current_session_clean_count = update_count_and_get!( + task, + AggregatedSessionDependentCleanContainer { + task: dirty_container_id, + session_id: current_session_id + }, + *current_session_clean_update + ); + old_single_container_current_session_clean_count = + new_single_container_current_session_clean_count + - *current_session_clean_update; + } else { + new_single_container_current_session_clean_count = get!( + task, + AggregatedSessionDependentCleanContainer { + task: dirty_container_id, + session_id: current_session_id + } + ) + .copied() + .unwrap_or_default(); + old_single_container_current_session_clean_count = + new_single_container_current_session_clean_count; + } + + // compute aggregated update + let was_single_container_clean = old_dirty_single_container_count > 0 + && old_dirty_single_container_count + - old_single_container_current_session_clean_count + <= 0; + let is_single_container_clean = new_dirty_single_container_count > 0 + && new_dirty_single_container_count + - new_single_container_current_session_clean_count + <= 0; + let current_session_clean_update = + before_after_to_diff_value(was_single_container_clean, is_single_container_clean); + + let aggregated_update = DirtyContainerCount::from_current_session_clean( + dirty_container_count_update, + current_session_id, + current_session_clean_update, ); if !aggregated_update.is_zero() { @@ -1273,7 +1341,7 @@ impl AggregationUpdateQueue { // this would already be scheduled by the `Activeness` let is_active_until_clean = get!(task, Activeness).is_some_and(|a| a.active_until_clean); if !is_active_until_clean { - let mut dirty_containers = iter_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task).peekable(); + let mut dirty_containers = task.dirty_containers(session_id).peekable(); let is_empty = dirty_containers.peek().is_none(); if !is_empty || dirty { self.extend_find_and_schedule_dirty(dirty_containers); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 443ee182e5df2..63127a58b7939 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -25,7 +25,7 @@ use crate::{ backing_storage::BackingStorage, data::{ CachedDataItem, CachedDataItemKey, CachedDataItemType, CachedDataItemValue, - CachedDataItemValueRef, CachedDataItemValueRefMut, Dirtyness, + CachedDataItemValueRef, CachedDataItemValueRefMut, DirtyContainerCount, Dirtyness, }, }; @@ -430,6 +430,50 @@ pub trait TaskGuard: Debug { )), } } + /// Returns (is_dirty, is_clean_in_current_session) + fn dirty(&self, session_id: SessionId) -> (bool, bool) { + match get!(self, Dirty) { + None => (false, false), + Some(Dirtyness::Dirty) => (true, false), + Some(Dirtyness::SessionDependent) => ( + true, + get!(self, CleanInSession).copied() == Some(session_id), + ), + } + } + fn dirty_containers(&self, session_id: SessionId) -> impl Iterator { + self.dirty_containers_with_count(session_id) + .map(|(task_id, _)| task_id) + } + fn dirty_containers_with_count( + &self, + session_id: SessionId, + ) -> impl Iterator { + iter_many!(self, AggregatedDirtyContainer { task } count => (task, *count)).filter( + move |&(task_id, count)| { + if count > 0 { + let clean_count = get!( + self, + AggregatedSessionDependentCleanContainer { + task: task_id, + session_id + } + ) + .copied() + .unwrap_or_default(); + count > clean_count + } else { + false + } + }, + ) + } + + fn dirty_container_count(&self) -> DirtyContainerCount { + get!(self, AggregatedDirtyContainerCount) + .cloned() + .unwrap_or_default() + } } pub struct TaskGuardImpl<'a, B: BackingStorage> { diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index aeb0e076ad621..5245b6101a064 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -1096,6 +1096,38 @@ macro_rules! update_count { }; } +macro_rules! update_count_and_get { + ($task:ident, $key:ident $input:tt, -$update:expr) => {{ + let update = $update; + let mut new = 0; + $crate::backend::storage::update!($task, $key $input, |old: Option<_>| { + let old = old.unwrap_or(0); + new = old - update; + (new != 0).then_some(new) + }); + new + }}; + ($task:ident, $key:ident $input:tt, $update:expr) => { + match $update { + update => { + let mut new = 0; + $crate::backend::storage::update!($task, $key $input, |old: Option<_>| { + let old = old.unwrap_or(0); + new = old + update; + (new != 0).then_some(new) + }); + new + } + } + }; + ($task:ident, $key:ident, -$update:expr) => { + $crate::backend::storage::update_count_and_get!($task, $key {}, -$update) + }; + ($task:ident, $key:ident, $update:expr) => { + $crate::backend::storage::update_count_and_get!($task, $key {}, $update) + }; +} + macro_rules! remove { ($task:ident, $key:ident $input:tt) => {{ #[allow(unused_imports)] @@ -1122,6 +1154,7 @@ pub(crate) use iter_many; pub(crate) use remove; pub(crate) use update; pub(crate) use update_count; +pub(crate) use update_count_and_get; pub struct SnapshotGuard<'l> { storage: &'l Storage, diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 47db822e94bc3..02b4dbb18d677 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -653,7 +653,12 @@ pub enum CachedDataItem { // Aggregated Data AggregatedDirtyContainer { task: TaskId, - value: DirtyContainerCount, + value: i32, + }, + AggregatedSessionDependentCleanContainer { + task: TaskId, + session_id: SessionId, + value: i32, }, AggregatedCollectible { collectible: CollectibleRef, @@ -734,6 +739,9 @@ impl CachedDataItem { CachedDataItem::Follower { task, .. } => !task.is_transient(), CachedDataItem::Upper { task, .. } => !task.is_transient(), CachedDataItem::AggregatedDirtyContainer { task, .. } => !task.is_transient(), + CachedDataItem::AggregatedSessionDependentCleanContainer { task, .. } => { + !task.is_transient() + } CachedDataItem::AggregatedCollectible { collectible, .. } => { !collectible.cell.task.is_transient() } @@ -808,6 +816,7 @@ impl CachedDataItem { | Self::Child { .. } | Self::Upper { .. } | Self::AggregatedDirtyContainer { .. } + | Self::AggregatedSessionDependentCleanContainer { .. } | Self::AggregatedCollectible { .. } | Self::AggregatedDirtyContainerCount { .. } | Self::Stateful { .. } @@ -852,6 +861,9 @@ impl CachedDataItemKey { CachedDataItemKey::Follower { task, .. } => !task.is_transient(), CachedDataItemKey::Upper { task, .. } => !task.is_transient(), CachedDataItemKey::AggregatedDirtyContainer { task, .. } => !task.is_transient(), + CachedDataItemKey::AggregatedSessionDependentCleanContainer { task, .. } => { + !task.is_transient() + } CachedDataItemKey::AggregatedCollectible { collectible, .. } => { !collectible.cell.task.is_transient() } @@ -894,6 +906,7 @@ impl CachedDataItemType { | Self::Child { .. } | Self::Upper { .. } | Self::AggregatedDirtyContainer { .. } + | Self::AggregatedSessionDependentCleanContainer { .. } | Self::AggregatedCollectible { .. } | Self::AggregatedDirtyContainerCount { .. } | Self::Stateful { .. } @@ -930,6 +943,7 @@ impl CachedDataItemType { | Self::Follower | Self::Upper | Self::AggregatedDirtyContainer + | Self::AggregatedSessionDependentCleanContainer | Self::AggregatedCollectible | Self::AggregatedDirtyContainerCount | Self::Stateful diff --git a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs index d10af67b3e28e..7a23a8d4bfd42 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs @@ -4,19 +4,17 @@ use anyhow::Result; use turbo_tasks::{State, Vc}; -use turbo_tasks_testing::{Registration, register, run_once}; +use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn recompute() { - run_once(®ISTRATION, || async { - let input = ChangingInput { - state: State::new(1), - } - .cell(); + run(®ISTRATION, || async { + let input = get_state().resolve().await?; + input.await?.state.set(0); let output = compute(input); - assert_eq!(*output.await?, 1); + assert_eq!(*output.strongly_consistent().await?, 0); println!("changing input"); input.await?.state.set(10); @@ -44,6 +42,14 @@ async fn recompute() { .unwrap(); } +#[turbo_tasks::function] +fn get_state() -> Vc { + ChangingInput { + state: State::new(0), + } + .cell() +} + #[turbo_tasks::value] struct ChangingInput { state: State, diff --git a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs new file mode 100644 index 0000000000000..b2ad8ec529b13 --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs @@ -0,0 +1,82 @@ +#![feature(arbitrary_self_types)] +#![feature(arbitrary_self_types_pointers)] +#![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this + +use anyhow::Result; +use turbo_tasks::{State, Vc, mark_session_dependent}; +use turbo_tasks_testing::{Registration, register, run}; + +static REGISTRATION: Registration = register!(); + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn recompute() { + run(®ISTRATION, || async { + let input = get_state().resolve().await?; + input.await?.state.set(0); + let output = compute(input); + assert_eq!(*output.strongly_consistent().await?, 0); + + println!("changing input"); + input.await?.state.set(10); + assert_eq!(*output.strongly_consistent().await?, 10); + + println!("changing input"); + input.await?.state.set(5); + assert_eq!(*output.strongly_consistent().await?, 5); + + println!("changing input"); + input.await?.state.set(20); + assert_eq!(*output.strongly_consistent().await?, 20); + + println!("changing input"); + input.await?.state.set(15); + assert_eq!(*output.strongly_consistent().await?, 15); + + println!("changing input"); + input.await?.state.set(1); + assert_eq!(*output.strongly_consistent().await?, 1); + + anyhow::Ok(()) + }) + .await + .unwrap(); +} + +#[turbo_tasks::function] +fn get_state() -> Vc { + ChangingInput { + state: State::new(0), + } + .cell() +} + +#[turbo_tasks::value] +struct ChangingInput { + state: State, +} + +#[turbo_tasks::function] +async fn compute(input: Vc) -> Result> { + println!("compute()"); + let value = *inner_compute(input).await?; + Ok(Vc::cell(value)) +} + +#[turbo_tasks::function] +async fn inner_compute(input: Vc) -> Result> { + println!("inner_compute()"); + let state_value = *input.await?.state.get(); + let mut last = None; + for i in 0..=state_value { + last = Some(compute2(Vc::cell(i))); + } + Ok(last.unwrap()) +} + +#[turbo_tasks::function] +async fn compute2(input: Vc) -> Result> { + mark_session_dependent(); + println!("compute2()"); + let value = *input.await?; + Ok(Vc::cell(value)) +} diff --git a/turbopack/crates/turbo-tasks-testing/src/run.rs b/turbopack/crates/turbo-tasks-testing/src/run.rs index 565c80e31606f..b7e7a3b2ea095 100644 --- a/turbopack/crates/turbo-tasks-testing/src/run.rs +++ b/turbopack/crates/turbo-tasks-testing/src/run.rs @@ -98,38 +98,49 @@ where F: Future> + Send + 'static, T: Debug + PartialEq + Eq + TraceRawVcs + Send + 'static, { - let single_run = env::var("SINGLE_RUN").is_ok(); + let infinite_initial_runs = env::var("INFINITE_INITIAL_RUNS").is_ok(); + let infinite_memory_runs = !infinite_initial_runs && env::var("INFINITE_MEMORY_RUNS").is_ok(); + let single_run = infinite_initial_runs || env::var("SINGLE_RUN").is_ok(); let name = closure_to_name(&fut); - let tt = registration.create_turbo_tasks(&name, true); - println!("Run #1 (without cache)"); - let start = std::time::Instant::now(); - let first = fut(tt.clone()).await?; - println!("Run #1 took {:?}", start.elapsed()); - if !single_run { - for i in 2..10 { - println!("Run #{i} (with memory cache, same TurboTasks instance)"); - let start = std::time::Instant::now(); - let second = fut(tt.clone()).await?; - println!("Run #{i} took {:?}", start.elapsed()); - assert_eq!(first, second); - } - } - let start = std::time::Instant::now(); - tt.stop_and_wait().await; - println!("Stopping TurboTasks took {:?}", start.elapsed()); - if single_run { - return Ok(()); - } - for i in 10..20 { - let tt = registration.create_turbo_tasks(&name, false); - println!("Run #{i} (with filesystem cache if available, new TurboTasks instance)"); + let mut i = 1; + loop { + let tt = registration.create_turbo_tasks(&name, true); + println!("Run #{i} (without cache)"); let start = std::time::Instant::now(); - let third = fut(tt.clone()).await?; + let first = fut(tt.clone()).await?; println!("Run #{i} took {:?}", start.elapsed()); + i += 1; + if !single_run { + let max_run = if infinite_memory_runs { usize::MAX } else { 10 }; + for _ in 0..max_run { + println!("Run #{i} (with memory cache, same TurboTasks instance)"); + let start = std::time::Instant::now(); + let second = fut(tt.clone()).await?; + println!("Run #{i} took {:?}", start.elapsed()); + i += 1; + assert_eq!(first, second); + } + } let start = std::time::Instant::now(); tt.stop_and_wait().await; println!("Stopping TurboTasks took {:?}", start.elapsed()); - assert_eq!(first, third); + if !single_run { + for _ in 10..20 { + let tt = registration.create_turbo_tasks(&name, false); + println!("Run #{i} (with filesystem cache if available, new TurboTasks instance)"); + let start = std::time::Instant::now(); + let third = fut(tt.clone()).await?; + println!("Run #{i} took {:?}", start.elapsed()); + i += 1; + let start = std::time::Instant::now(); + tt.stop_and_wait().await; + println!("Stopping TurboTasks took {:?}", start.elapsed()); + assert_eq!(first, third); + } + } + if !infinite_initial_runs { + break; + } } Ok(()) }