diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 3bda0294add31..ac1dd82fb5e28 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -34,10 +34,11 @@ use turbo_tasks::{ event::{Event, EventListener}, message_queue::TimingEvent, registry::get_value_type, + scope::scope_and_block, task_statistics::TaskStatisticsApi, trace::TraceRawVcs, turbo_tasks, - util::IdFactoryWithReuse, + util::{IdFactoryWithReuse, good_chunk_size, into_chunks}, }; pub use self::{operation::AnyOperation, storage::TaskDataCategory}; @@ -46,10 +47,11 @@ use crate::backend::operation::TaskDirtyCause; use crate::{ backend::{ operation::{ - AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation, - ComputeDirtyAndCleanUpdate, ConnectChildOperation, ExecuteContext, ExecuteContextImpl, - Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number, - get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children, + AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext, + CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation, + ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskGuard, + connect_children, get_aggregation_number, get_uppers, is_root_node, + make_task_dirty_internal, prepare_new_children, }, storage::{ InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with, @@ -68,6 +70,11 @@ use crate::{ }, }; +/// Threshold for parallelizing making dependent tasks dirty. +/// If the number of dependent tasks exceeds this threshold, +/// the operation will be parallelized. +const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000; + const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1); /// Configurable idle timeout for snapshot persistence. 
@@ -2167,8 +2174,12 @@ impl TurboTasksBackendInner { ); } - let mut queue = AggregationUpdateQueue::new(); - for dependent_task_id in output_dependent_tasks { + fn process_output_dependents( + ctx: &mut impl ExecuteContext<'_>, + task_id: TaskId, + dependent_task_id: TaskId, + queue: &mut AggregationUpdateQueue, + ) { #[cfg(feature = "trace_task_output_dependencies")] let span = tracing::trace_span!( "invalidate output dependency", @@ -2181,7 +2192,7 @@ impl TurboTasksBackendInner { // once tasks are never invalidated #[cfg(feature = "trace_task_output_dependencies")] span.record("result", "once task"); - continue; + return; } let mut make_stale = true; let dependent = ctx.task(dependent_task_id, TaskDataCategory::All); @@ -2198,7 +2209,7 @@ impl TurboTasksBackendInner { // output anymore and doesn't need to be invalidated #[cfg(feature = "trace_task_output_dependencies")] span.record("result", "no backward dependency"); - continue; + return; } make_task_dirty_internal( dependent, @@ -2206,14 +2217,41 @@ impl TurboTasksBackendInner { make_stale, #[cfg(feature = "trace_task_dirty")] TaskDirtyCause::OutputChange { task_id }, - &mut queue, + queue, ctx, ); #[cfg(feature = "trace_task_output_dependencies")] span.record("result", "marked dirty"); } - queue.execute(ctx); + if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD { + let chunk_size = good_chunk_size(output_dependent_tasks.len()); + let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size); + let _ = scope_and_block(chunks.len(), |scope| { + for chunk in chunks { + let child_ctx = ctx.child_context(); + scope.spawn(move || { + let mut ctx = child_ctx.create(); + let mut queue = AggregationUpdateQueue::new(); + for dependent_task_id in chunk { + process_output_dependents( + &mut ctx, + task_id, + dependent_task_id, + &mut queue, + ) + } + queue.execute(&mut ctx); + }); + } + }); + } else { + let mut queue = AggregationUpdateQueue::new(); + for dependent_task_id in 
output_dependent_tasks { + process_output_dependents(ctx, task_id, dependent_task_id, &mut queue); + } + queue.execute(ctx); + } } fn task_execution_completed_unfinished_children_dirty( diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs index 36ab4b042ecfa..3009a06f29bd2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs @@ -125,10 +125,10 @@ pub fn connect_children( // This avoids long pauses of more than 30µs * 10k = 300ms. // We don't want to parallelize too eagerly as spawning tasks and the temporary allocations have // a cost as well. - const MIN_CHILDREN_FOR_PARALLEL: usize = 10000; + const CONNECT_CHILDREN_PARALLIZATION_THRESHOLD: usize = 10000; let len = new_follower_ids.len(); - if len >= MIN_CHILDREN_FOR_PARALLEL { + if len >= CONNECT_CHILDREN_PARALLIZATION_THRESHOLD { let new_follower_ids = new_follower_ids.into_vec(); let chunk_size = good_chunk_size(len); let _ = scope_and_block(len.div_ceil(chunk_size), |scope| { diff --git a/turbopack/crates/turbo-tasks/src/util.rs b/turbopack/crates/turbo-tasks/src/util.rs index 260a8797f8e7e..7cf5645bb4dc5 100644 --- a/turbopack/crates/turbo-tasks/src/util.rs +++ b/turbopack/crates/turbo-tasks/src/util.rs @@ -330,6 +330,16 @@ pub struct IntoChunks<T> { chunk_size: usize, } +impl<T> IntoChunks<T> { + pub fn len(&self) -> usize { + (self.data.len() - self.index).div_ceil(self.chunk_size) + } + + pub fn is_empty(&self) -> bool { + self.index >= self.data.len() + } +} + impl<T> Iterator for IntoChunks<T> { type Item = Chunk<T>; @@ -414,6 +424,18 @@ impl<T> Drop for Chunk<T> { mod tests { use super::*; + #[test] + fn test_into_chunks_len() { + assert_eq!(into_chunks::<i32>(vec![], 2).len(), 0); + assert_eq!(into_chunks(vec![1], 2).len(), 1); + assert_eq!(into_chunks(vec![1, 2], 2).len(), 1); +
assert_eq!(into_chunks(vec![1, 2, 3], 2).len(), 2); + assert_eq!(into_chunks(vec![1, 2, 3, 4], 2).len(), 2); + assert_eq!(into_chunks(vec![1, 2, 3, 4, 5], 2).len(), 3); + assert_eq!(into_chunks(vec![1, 2, 3, 4, 5, 6], 2).len(), 3); + assert_eq!(into_chunks(vec![1, 2, 3, 4, 5, 6, 7], 2).len(), 4); + } + #[test] fn test_chunk_iterator() { let data = [(); 10]