Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2431,7 +2431,11 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
AggregationUpdateJob::data_update(
&mut task,
AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update),
AggregatedDataUpdate::new().dirty_container_update(
task_id,
aggregated_update.count,
aggregated_update.current_session_clean(ctx.session_id()),
),
)
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
hash::Hash,
mem::take,
num::NonZeroU32,
ops::ControlFlow,
ops::{ControlFlow, Deref},
thread::yield_now,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -216,19 +216,41 @@ impl AggregationUpdateJob {
}
}

#[derive(Default, Serialize, Deserialize, Clone, Copy, Debug)]
pub struct SessionDependent<T> {
#[serde(skip, default)]
pub value: T,
}

impl<T> SessionDependent<T> {
pub fn new(value: T) -> Self {
Self { value }
}
}

impl<T> Deref for SessionDependent<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.value
}
}

/// Aggregated data update.
#[derive(Default, Serialize, Deserialize, Clone, Debug)]
pub struct AggregatedDataUpdate {
/// One of the inner tasks has changed its dirty state or aggregated dirty state.
dirty_container_update: Option<(TaskId, DirtyContainerCount)>,
/// (task id, dirty update, current session clean update)
// TODO Serialize the current session clean update as 0
dirty_container_update: Option<(TaskId, i32, SessionDependent<i32>)>,
/// One of the inner tasks has changed its collectibles count or aggregated collectibles count.
collectibles_update: Vec<(CollectibleRef, i32)>,
}

impl AggregatedDataUpdate {
/// Derives an `AggregatedDataUpdate` from a task. This is used when a task is connected to an
/// upper task.
fn from_task(task: &mut impl TaskGuard) -> Self {
fn from_task(task: &mut impl TaskGuard, current_session_id: SessionId) -> Self {
let aggregation = get_aggregation_number(task);
let mut dirty_container_count = Default::default();
let mut collectibles_update: Vec<_> =
Expand All @@ -255,15 +277,20 @@ impl AggregatedDataUpdate {

let mut result = Self::new().collectibles_update(collectibles_update);
if !dirty_container_count.is_zero() {
let DirtyContainerCount {
count,
count_in_session,
} = dirty_container_count;
result = result.dirty_container_update(
task.id(),
DirtyContainerCount {
count: if count > 0 { 1 } else { 0 },
count_in_session: count_in_session.map(|(s, c)| (s, if c > 0 { 1 } else { 0 })),
if dirty_container_count.count > 0 {
1
} else {
0
},
if dirty_container_count.count > 0
&& dirty_container_count.current_session_clean(current_session_id)
>= dirty_container_count.count
{
1
} else {
0
},
);
}
Expand All @@ -276,8 +303,9 @@ impl AggregatedDataUpdate {
dirty_container_update,
collectibles_update,
} = &mut self;
if let Some((_, value)) = dirty_container_update.as_mut() {
*value = value.negate()
if let Some((_, value, current_session_clean_update)) = dirty_container_update.as_mut() {
*value = -*value;
current_session_clean_update.value = -current_session_clean_update.value;
}
for (_, value) in collectibles_update.iter_mut() {
*value = -*value;
Expand All @@ -290,7 +318,7 @@ impl AggregatedDataUpdate {
fn apply(
&self,
task: &mut impl TaskGuard,
session_id: SessionId,
current_session_id: SessionId,
should_track_activeness: bool,
queue: &mut AggregationUpdateQueue,
) -> AggregatedDataUpdate {
Expand All @@ -299,25 +327,32 @@ impl AggregatedDataUpdate {
collectibles_update,
} = self;
let mut result = Self::default();
if let Some((dirty_container_id, count)) = dirty_container_update {
if let &Some((dirty_container_id, count, session_dependent_clean_update)) =
dirty_container_update
{
if should_track_activeness {
// When a dirty container count is increased and the task is considered as active
// we need to schedule the dirty tasks in the new dirty container
let current_session_update = count.get(session_id);
let current_session_update = count - *session_dependent_clean_update;
if current_session_update > 0 && task.has_key(&CachedDataItemKey::Activeness {}) {
queue.push_find_and_schedule_dirty(*dirty_container_id)
queue.push_find_and_schedule_dirty(dirty_container_id)
}
}

let mut aggregated_update = DirtyContainerCount::default();
update!(
task,
AggregatedDirtyContainer {
task: *dirty_container_id
task: dirty_container_id
},
|old: Option<DirtyContainerCount>| {
let mut new = old.unwrap_or_default();
aggregated_update = new.update_count(count);
aggregated_update =
new.update_count(&DirtyContainerCount::from_current_session_clean(
count,
current_session_id,
*session_dependent_clean_update,
));
(!new.is_zero()).then_some(new)
}
);
Expand All @@ -337,12 +372,18 @@ impl AggregatedDataUpdate {
new.undo_update_with_dirtyness_and_session(dirtyness, clean_in_session);
}
if !aggregated_update.is_zero() {
result.dirty_container_update = Some((task_id, aggregated_update));
result.dirty_container_update = Some((
task_id,
aggregated_update.count,
SessionDependent::new(
aggregated_update.current_session_clean(current_session_id),
),
));
}
(!new.is_zero()).then_some(new)
});
if let Some((_, count)) = result.dirty_container_update.as_ref()
&& count.get(session_id) < 0
if let Some((_, count, current_session_clean)) = result.dirty_container_update
&& count - *current_session_clean < 0
{
// When the current task is no longer dirty, we need to fire the
// aggregate root events and do some cleanup
Expand Down Expand Up @@ -421,8 +462,17 @@ impl AggregatedDataUpdate {
}

/// Adds a dirty container update to the update.
pub fn dirty_container_update(mut self, task_id: TaskId, count: DirtyContainerCount) -> Self {
self.dirty_container_update = Some((task_id, count));
pub fn dirty_container_update(
mut self,
task_id: TaskId,
count: i32,
current_session_clean_update: i32,
) -> Self {
self.dirty_container_update = Some((
task_id,
count,
SessionDependent::new(current_session_clean_update),
));
self
}

Expand Down Expand Up @@ -1041,7 +1091,7 @@ impl AggregationUpdateQueue {
}
// When this is a new inner node, update aggregated data and
// followers
let data = AggregatedDataUpdate::from_task(&mut task);
let data = AggregatedDataUpdate::from_task(&mut task, ctx.session_id());
let followers = get_followers(&task);
let diff = data.apply(
&mut upper,
Expand Down Expand Up @@ -1138,7 +1188,8 @@ impl AggregationUpdateQueue {

// Since this is no longer an inner node, update the aggregated data and
// followers
let data = AggregatedDataUpdate::from_task(&mut task).invert();
let data =
AggregatedDataUpdate::from_task(&mut task, ctx.session_id()).invert();
let followers = get_followers(&task);
let diff = data.apply(
&mut upper,
Expand Down Expand Up @@ -1311,7 +1362,8 @@ impl AggregationUpdateQueue {
follower_in_upper
});
if !removed_uppers.is_empty() {
let data = AggregatedDataUpdate::from_task(&mut follower).invert();
let data =
AggregatedDataUpdate::from_task(&mut follower, ctx.session_id()).invert();
let followers = get_followers(&follower);
drop(follower);

Expand Down Expand Up @@ -1473,7 +1525,8 @@ impl AggregationUpdateQueue {
Some(old - 1)
});
if remove_upper {
let data = AggregatedDataUpdate::from_task(&mut follower).invert();
let data =
AggregatedDataUpdate::from_task(&mut follower, ctx.session_id()).invert();
let followers = get_followers(&follower);
drop(follower);

Expand Down Expand Up @@ -1711,7 +1764,7 @@ impl AggregationUpdateQueue {
self.push_optimize_task(new_follower_id);
}

let data = AggregatedDataUpdate::from_task(&mut follower);
let data = AggregatedDataUpdate::from_task(&mut follower, ctx.session_id());
let children = get_followers(&follower);
drop(follower);

Expand Down Expand Up @@ -1882,7 +1935,7 @@ impl AggregationUpdateQueue {
}

// It's a new upper
let data = AggregatedDataUpdate::from_task(&mut inner);
let data = AggregatedDataUpdate::from_task(&mut inner, ctx.session_id());
let children = get_followers(&inner);
let follower_aggregation_number = get_aggregation_number(&inner);
drop(inner);
Expand Down Expand Up @@ -2094,7 +2147,7 @@ impl AggregationUpdateQueue {
self.push_optimize_task(new_follower_id);
}
// It's a new upper
let data = AggregatedDataUpdate::from_task(&mut inner);
let data = AggregatedDataUpdate::from_task(&mut inner, ctx.session_id());
let followers = get_followers(&inner);
drop(inner);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,11 @@ pub fn make_task_dirty_internal(
if !aggregated_update.is_zero() {
queue.extend(AggregationUpdateJob::data_update(
&mut task,
AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update),
AggregatedDataUpdate::new().dirty_container_update(
task_id,
aggregated_update.count,
aggregated_update.current_session_clean(ctx.session_id()),
),
));
}
!ctx.should_track_activeness() || task.has_key(&CachedDataItemKey::Activeness {})
Expand Down
20 changes: 20 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,26 @@ pub struct DirtyContainerCount {
}

impl DirtyContainerCount {
pub fn from_current_session_clean(
count: i32,
current_session_id: SessionId,
current_session_clean: i32,
) -> DirtyContainerCount {
DirtyContainerCount {
count,
count_in_session: Some((current_session_id, count - current_session_clean)),
}
}

pub fn current_session_clean(&self, current_session_id: SessionId) -> i32 {
if let Some((s, c)) = self.count_in_session
&& s == current_session_id
{
return self.count - c;
}
0
}

/// Get the count for a specific session. It's only expected to be asked for the current
/// session, since old session counts might be dropped.
pub fn get(&self, session: SessionId) -> i32 {
Expand Down
Loading