@@ -36,10 +36,11 @@ use turbo_tasks::{
3636 event:: { Event , EventListener } ,
3737 message_queue:: TimingEvent ,
3838 registry:: get_value_type,
39+ scope:: scope_and_block,
3940 task_statistics:: TaskStatisticsApi ,
4041 trace:: TraceRawVcs ,
4142 turbo_tasks,
42- util:: { IdFactoryWithReuse , good_chunk_size} ,
43+ util:: { IdFactoryWithReuse , good_chunk_size, into_chunks } ,
4344} ;
4445
4546pub use self :: { operation:: AnyOperation , storage:: TaskDataCategory } ;
@@ -48,10 +49,11 @@ use crate::backend::operation::TaskDirtyCause;
4849use crate :: {
4950 backend:: {
5051 operation:: {
51- AggregationUpdateJob , AggregationUpdateQueue , CleanupOldEdgesOperation ,
52- ComputeDirtyAndCleanUpdate , ConnectChildOperation , ExecuteContext , ExecuteContextImpl ,
53- Operation , OutdatedEdge , TaskGuard , connect_children, get_aggregation_number,
54- get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
52+ AggregationUpdateJob , AggregationUpdateQueue , ChildExecuteContext ,
53+ CleanupOldEdgesOperation , ComputeDirtyAndCleanUpdate , ConnectChildOperation ,
54+ ExecuteContext , ExecuteContextImpl , Operation , OutdatedEdge , TaskGuard ,
55+ connect_children, get_aggregation_number, get_uppers, is_root_node,
56+ make_task_dirty_internal, prepare_new_children,
5557 } ,
5658 storage:: {
5759 InnerStorageSnapshot , Storage , count, get, get_many, get_mut, get_mut_or_insert_with,
@@ -2157,8 +2159,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21572159 ) {
21582160 debug_assert ! ( !output_dependent_tasks. is_empty( ) ) ;
21592161
2160- let mut queue = AggregationUpdateQueue :: new ( ) ;
2161- for dependent_task_id in output_dependent_tasks {
2162+ fn process_output_dependents (
2163+ ctx : & mut impl ExecuteContext < ' _ > ,
2164+ task_id : TaskId ,
2165+ dependent_task_id : TaskId ,
2166+ queue : & mut AggregationUpdateQueue ,
2167+ ) {
21622168 #[ cfg( feature = "trace_task_output_dependencies" ) ]
21632169 let span = tracing:: trace_span!(
21642170 "invalidate output dependency" ,
@@ -2171,37 +2177,64 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21712177 // once tasks are never invalidated
21722178 #[ cfg( feature = "trace_task_output_dependencies" ) ]
21732179 span. record ( "result" , "once task" ) ;
2174- continue ;
2180+ return ;
21752181 }
21762182 let dependent = ctx. task ( dependent_task_id, TaskDataCategory :: All ) ;
21772183 if dependent. has_key ( & CachedDataItemKey :: OutdatedOutputDependency { target : task_id } ) {
21782184 // output dependency is outdated, so it hasn't read the output yet
21792185 // and doesn't need to be invalidated
21802186 #[ cfg( feature = "trace_task_output_dependencies" ) ]
21812187 span. record ( "result" , "outdated dependency" ) ;
2182- continue ;
2188+ return ;
21832189 }
21842190 if !dependent. has_key ( & CachedDataItemKey :: OutputDependency { target : task_id } ) {
21852191 // output dependency has been removed, so the task doesn't depend on the
21862192 // output anymore and doesn't need to be invalidated
21872193 #[ cfg( feature = "trace_task_output_dependencies" ) ]
21882194 span. record ( "result" , "no backward dependency" ) ;
2189- continue ;
2195+ return ;
21902196 }
21912197 make_task_dirty_internal (
21922198 dependent,
21932199 dependent_task_id,
21942200 true ,
21952201 #[ cfg( feature = "trace_task_dirty" ) ]
21962202 TaskDirtyCause :: OutputChange { task_id } ,
2197- & mut queue,
2203+ queue,
21982204 ctx,
21992205 ) ;
22002206 #[ cfg( feature = "trace_task_output_dependencies" ) ]
22012207 span. record ( "result" , "marked dirty" ) ;
22022208 }
22032209
2204- queue. execute ( ctx) ;
2210+ if output_dependent_tasks. len ( ) > 128 {
2211+ let chunk_size = good_chunk_size ( output_dependent_tasks. len ( ) ) ;
2212+ let chunks = into_chunks ( output_dependent_tasks. to_vec ( ) , chunk_size) ;
2213+ let _ = scope_and_block ( chunks. len ( ) , |scope| {
2214+ for chunk in chunks {
2215+ let child_ctx = ctx. child_context ( ) ;
2216+ scope. spawn ( move || {
2217+ let mut ctx = child_ctx. create ( ) ;
2218+ let mut queue = AggregationUpdateQueue :: new ( ) ;
2219+ for dependent_task_id in chunk {
2220+ process_output_dependents (
2221+ & mut ctx,
2222+ task_id,
2223+ dependent_task_id,
2224+ & mut queue,
2225+ )
2226+ }
2227+ queue. execute ( & mut ctx) ;
2228+ } ) ;
2229+ }
2230+ } ) ;
2231+ } else {
2232+ let mut queue = AggregationUpdateQueue :: new ( ) ;
2233+ for dependent_task_id in output_dependent_tasks {
2234+ process_output_dependents ( ctx, task_id, dependent_task_id, & mut queue) ;
2235+ }
2236+ queue. execute ( ctx) ;
2237+ }
22052238 }
22062239
22072240 fn task_execution_completed_unfinished_children_dirty (
0 commit comments