@@ -36,10 +36,11 @@ use turbo_tasks::{
3636 event:: { Event , EventListener } ,
3737 message_queue:: TimingEvent ,
3838 registry:: get_value_type,
39+ scope:: scope_and_block,
3940 task_statistics:: TaskStatisticsApi ,
4041 trace:: TraceRawVcs ,
4142 turbo_tasks,
42- util:: { IdFactoryWithReuse , good_chunk_size} ,
43+ util:: { IdFactoryWithReuse , good_chunk_size, into_chunks } ,
4344} ;
4445
4546pub use self :: { operation:: AnyOperation , storage:: TaskDataCategory } ;
@@ -48,10 +49,11 @@ use crate::backend::operation::TaskDirtyCause;
4849use crate :: {
4950 backend:: {
5051 operation:: {
51- AggregationUpdateJob , AggregationUpdateQueue , CleanupOldEdgesOperation ,
52- ComputeDirtyAndCleanUpdate , ConnectChildOperation , ExecuteContext , ExecuteContextImpl ,
53- Operation , OutdatedEdge , TaskGuard , connect_children, get_aggregation_number,
54- get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
52+ AggregationUpdateJob , AggregationUpdateQueue , ChildExecuteContext ,
53+ CleanupOldEdgesOperation , ComputeDirtyAndCleanUpdate , ConnectChildOperation ,
54+ ExecuteContext , ExecuteContextImpl , Operation , OutdatedEdge , TaskGuard ,
55+ connect_children, get_aggregation_number, get_uppers, is_root_node,
56+ make_task_dirty_internal, prepare_new_children,
5557 } ,
5658 storage:: {
5759 InnerStorageSnapshot , Storage , count, get, get_many, get_mut, get_mut_or_insert_with,
@@ -2160,8 +2162,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21602162 ) {
21612163 debug_assert ! ( !output_dependent_tasks. is_empty( ) ) ;
21622164
2163- let mut queue = AggregationUpdateQueue :: new ( ) ;
2164- for dependent_task_id in output_dependent_tasks {
2165+ fn process_output_dependents (
2166+ ctx : & mut impl ExecuteContext < ' _ > ,
2167+ task_id : TaskId ,
2168+ dependent_task_id : TaskId ,
2169+ queue : & mut AggregationUpdateQueue ,
2170+ ) {
21652171 #[ cfg( feature = "trace_task_output_dependencies" ) ]
21662172 let span = tracing:: trace_span!(
21672173 "invalidate output dependency" ,
@@ -2174,37 +2180,64 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21742180 // once tasks are never invalidated
21752181 #[ cfg( feature = "trace_task_output_dependencies" ) ]
21762182 span. record ( "result" , "once task" ) ;
2177- continue ;
2183+ return ;
21782184 }
21792185 let dependent = ctx. task ( dependent_task_id, TaskDataCategory :: All ) ;
21802186 if dependent. has_key ( & CachedDataItemKey :: OutdatedOutputDependency { target : task_id } ) {
21812187 // output dependency is outdated, so it hasn't read the output yet
21822188 // and doesn't need to be invalidated
21832189 #[ cfg( feature = "trace_task_output_dependencies" ) ]
21842190 span. record ( "result" , "outdated dependency" ) ;
2185- continue ;
2191+ return ;
21862192 }
21872193 if !dependent. has_key ( & CachedDataItemKey :: OutputDependency { target : task_id } ) {
21882194 // output dependency has been removed, so the task doesn't depend on the
21892195 // output anymore and doesn't need to be invalidated
21902196 #[ cfg( feature = "trace_task_output_dependencies" ) ]
21912197 span. record ( "result" , "no backward dependency" ) ;
2192- continue ;
2198+ return ;
21932199 }
21942200 make_task_dirty_internal (
21952201 dependent,
21962202 dependent_task_id,
21972203 true ,
21982204 #[ cfg( feature = "trace_task_dirty" ) ]
21992205 TaskDirtyCause :: OutputChange { task_id } ,
2200- & mut queue,
2206+ queue,
22012207 ctx,
22022208 ) ;
22032209 #[ cfg( feature = "trace_task_output_dependencies" ) ]
22042210 span. record ( "result" , "marked dirty" ) ;
22052211 }
22062212
2207- queue. execute ( ctx) ;
2213+ if output_dependent_tasks. len ( ) > 128 {
2214+ let chunk_size = good_chunk_size ( output_dependent_tasks. len ( ) ) ;
2215+ let chunks = into_chunks ( output_dependent_tasks. to_vec ( ) , chunk_size) ;
2216+ let _ = scope_and_block ( chunks. len ( ) , |scope| {
2217+ for chunk in chunks {
2218+ let child_ctx = ctx. child_context ( ) ;
2219+ scope. spawn ( move || {
2220+ let mut ctx = child_ctx. create ( ) ;
2221+ let mut queue = AggregationUpdateQueue :: new ( ) ;
2222+ for dependent_task_id in chunk {
2223+ process_output_dependents (
2224+ & mut ctx,
2225+ task_id,
2226+ dependent_task_id,
2227+ & mut queue,
2228+ )
2229+ }
2230+ queue. execute ( & mut ctx) ;
2231+ } ) ;
2232+ }
2233+ } ) ;
2234+ } else {
2235+ let mut queue = AggregationUpdateQueue :: new ( ) ;
2236+ for dependent_task_id in output_dependent_tasks {
2237+ process_output_dependents ( ctx, task_id, dependent_task_id, & mut queue) ;
2238+ }
2239+ queue. execute ( ctx) ;
2240+ }
22082241 }
22092242
22102243 fn task_execution_completed_unfinished_children_dirty (
0 commit comments