Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 55 additions & 12 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ use turbo_tasks::{
event::{Event, EventListener},
message_queue::TimingEvent,
registry::get_value_type,
scope::scope_and_block,
task_statistics::TaskStatisticsApi,
trace::TraceRawVcs,
turbo_tasks,
util::{IdFactoryWithReuse, good_chunk_size},
util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
};

pub use self::{operation::AnyOperation, storage::TaskDataCategory};
Expand All @@ -48,10 +49,11 @@ use crate::backend::operation::TaskDirtyCause;
use crate::{
backend::{
operation::{
AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation,
ComputeDirtyAndCleanUpdate, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskGuard,
connect_children, get_aggregation_number, get_uppers, is_root_node,
make_task_dirty_internal, prepare_new_children,
},
storage::{
InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
Expand All @@ -70,6 +72,16 @@ use crate::{
},
};

/// Threshold for parallelizing making dependent tasks dirty.
/// If the number of dependent tasks exceeds this threshold,
/// the operation will be parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 128;

/// Threshold for parallelizing prefetching tasks.
/// If the number of tasks to prefetch exceeds this threshold,
/// the operation will be parallelized.
const PREFETCH_TASKS_PARALLIZATION_THRESHOLD: usize = 128;

const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

/// Configurable idle timeout for snapshot persistence.
Expand Down Expand Up @@ -2160,8 +2172,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
) {
debug_assert!(!output_dependent_tasks.is_empty());

let mut queue = AggregationUpdateQueue::new();
for dependent_task_id in output_dependent_tasks {
fn process_output_dependents(
ctx: &mut impl ExecuteContext<'_>,
task_id: TaskId,
dependent_task_id: TaskId,
queue: &mut AggregationUpdateQueue,
) {
#[cfg(feature = "trace_task_output_dependencies")]
let span = tracing::trace_span!(
"invalidate output dependency",
Expand All @@ -2174,7 +2190,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// once tasks are never invalidated
#[cfg(feature = "trace_task_output_dependencies")]
span.record("result", "once task");
continue;
return;
}
let mut make_stale = true;
let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
Expand All @@ -2191,22 +2207,49 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// output anymore and doesn't need to be invalidated
#[cfg(feature = "trace_task_output_dependencies")]
span.record("result", "no backward dependency");
continue;
return;
}
make_task_dirty_internal(
dependent,
dependent_task_id,
make_stale,
#[cfg(feature = "trace_task_dirty")]
TaskDirtyCause::OutputChange { task_id },
&mut queue,
queue,
ctx,
);
#[cfg(feature = "trace_task_output_dependencies")]
span.record("result", "marked dirty");
}

queue.execute(ctx);
if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
let chunk_size = good_chunk_size(output_dependent_tasks.len());
let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
let _ = scope_and_block(chunks.len(), |scope| {
for chunk in chunks {
let child_ctx = ctx.child_context();
scope.spawn(move || {
let mut ctx = child_ctx.create();
let mut queue = AggregationUpdateQueue::new();
for dependent_task_id in chunk {
process_output_dependents(
&mut ctx,
task_id,
dependent_task_id,
&mut queue,
)
}
queue.execute(&mut ctx);
});
}
});
} else {
let mut queue = AggregationUpdateQueue::new();
for dependent_task_id in output_dependent_tasks {
process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
}
queue.execute(ctx);
}
}

fn task_execution_completed_unfinished_children_dirty(
Expand Down Expand Up @@ -2572,7 +2615,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let range: Range<usize> = if let Some(range) = range {
range
} else {
if data.len() > 128 {
if data.len() > PREFETCH_TASKS_PARALLIZATION_THRESHOLD {
let chunk_size = good_chunk_size(data.len());
let chunks = data.len().div_ceil(chunk_size);
for i in 0..chunks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ pub fn connect_children(
// This avoids long pauses of more than 30µs * 10k = 300ms.
// We don't want to parallelize too eagerly as spawning tasks and the temporary allocations have
// a cost as well.
const MIN_CHILDREN_FOR_PARALLEL: usize = 10000;
const CONNECT_CHILDREN_PARALLIZATION_THRESHOLD: usize = 10000;

let len = new_follower_ids.len();
if len >= MIN_CHILDREN_FOR_PARALLEL {
if len >= CONNECT_CHILDREN_PARALLIZATION_THRESHOLD {
let new_follower_ids = new_follower_ids.into_vec();
let chunk_size = good_chunk_size(len);
let _ = scope_and_block(len.div_ceil(chunk_size), |scope| {
Expand Down
22 changes: 22 additions & 0 deletions turbopack/crates/turbo-tasks/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,16 @@ pub struct IntoChunks<T> {
chunk_size: usize,
}

impl<T> IntoChunks<T> {
pub fn len(&self) -> usize {
(self.data.len() - self.index).div_ceil(self.chunk_size)
}

pub fn is_empty(&self) -> bool {
self.index >= self.data.len()
}
}

impl<T> Iterator for IntoChunks<T> {
type Item = Chunk<T>;

Expand Down Expand Up @@ -414,6 +424,18 @@ impl<T> Drop for Chunk<T> {
mod tests {
use super::*;

#[test]
fn test_into_chunks_len() {
assert_eq!(into_chunks::<i32>(vec![], 2).len(), 0);
assert_eq!(into_chunks(vec![1], 2).len(), 1);
assert_eq!(into_chunks(vec![1, 2], 2).len(), 1);
assert_eq!(into_chunks(vec![1, 2, 3], 2).len(), 2);
assert_eq!(into_chunks(vec![1, 2, 3, 4], 2).len(), 2);
assert_eq!(into_chunks(vec![1, 2, 3, 4, 5], 2).len(), 3);
assert_eq!(into_chunks(vec![1, 2, 3, 4, 5, 6], 2).len(), 3);
assert_eq!(into_chunks(vec![1, 2, 3, 4, 5, 6, 7], 2).len(), 4);
}

#[test]
fn test_chunk_iterator() {
let data = [(); 10]
Expand Down
Loading