Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,14 +592,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.set_active_until_clean();
if ctx.should_track_activeness() {
// A newly added Activeness need to make sure to schedule the tasks
task_ids_to_schedule = get_many!(
task,
AggregatedDirtyContainer {
task
} count if count.get(self.session_id) > 0 => {
task
}
);
task_ids_to_schedule = task.dirty_containers(self.session_id).collect();
task_ids_to_schedule.push(task_id);
}
get!(task, Activeness).unwrap()
Expand Down Expand Up @@ -660,16 +653,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
"{task_id} {task_description}{count} (aggr={aggregation_number}, \
{in_progress}, {activeness}{is_dirty})",
);
let children: Vec<_> = iter_many!(
task,
AggregatedDirtyContainer {
task
} count => {
(task, count.get(ctx.session_id()))
}
)
.filter(|(_, count)| *count > 0)
.collect();
let children: Vec<_> =
task.dirty_containers_with_count(ctx.session_id()).collect();
drop(task);

if missing_upper {
Expand Down Expand Up @@ -2985,9 +2970,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

let is_dirty = task.is_dirty(self.session_id);
let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
.is_some_and(|count| count.get(self.session_id) > 0);
let is_dirty = get!(task, Dirty).is_some();
let has_dirty_container =
get!(task, AggregatedDirtyContainerCount).is_some_and(|count| count.count > 0);
let should_be_in_upper = is_dirty || has_dirty_container;

let aggregation_number = get_aggregation_number(&task);
Expand All @@ -3012,7 +2997,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
for upper_id in uppers {
let task = ctx.task(task_id, TaskDataCategory::All);
let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
.is_some_and(|dirty| dirty.get(self.session_id) > 0);
.is_some_and(|&dirty| dirty > 0);
if !in_upper {
panic!(
"Task {} ({}) is dirty, but is not listed in the upper task {} \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use crate::{
backend::{
TaskDataCategory, get_mut, get_mut_or_insert_with,
operation::{ExecuteContext, Operation, TaskGuard, invalidate::make_task_dirty},
storage::{count, get, get_many, iter_many, remove, update, update_count},
storage::{
count, get, get_many, iter_many, remove, update, update_count, update_count_and_get,
},
},
data::{
ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
Expand Down Expand Up @@ -271,23 +273,23 @@ impl AggregatedDataUpdate {
collectibles_update.push((collectible, 1));
}
}
if let Some((dirtyness, clean_in_session)) = task.dirtyness_and_session() {
dirty_container_count.update_with_dirtyness_and_session(dirtyness, clean_in_session);
let mut dirty_count = dirty_container_count.count;
let mut current_session_clean_count =
dirty_container_count.current_session_clean(current_session_id);
let (dirty, current_session_clean) = task.dirty(current_session_id);
if dirty {
dirty_count += 1;
}
if current_session_clean {
current_session_clean_count += 1;
}

let mut result = Self::new().collectibles_update(collectibles_update);
if !dirty_container_count.is_zero() {
if dirty_count > 0 {
result = result.dirty_container_update(
task.id(),
if dirty_container_count.count > 0 {
1
} else {
0
},
if dirty_container_count.count > 0
&& dirty_container_count.current_session_clean(current_session_id)
>= dirty_container_count.count
{
if dirty_count > 0 { 1 } else { 0 },
if dirty_count > 0 && dirty_count - current_session_clean_count <= 0 {
1
} else {
0
Expand Down Expand Up @@ -322,39 +324,105 @@ impl AggregatedDataUpdate {
should_track_activeness: bool,
queue: &mut AggregationUpdateQueue,
) -> AggregatedDataUpdate {
fn before_after_to_diff_value(before: bool, after: bool) -> i32 {
match (before, after) {
(true, false) => -1,
(false, true) => 1,
_ => 0,
}
}

let Self {
dirty_container_update,
collectibles_update,
} = self;
let mut result = Self::default();
if let &Some((dirty_container_id, count, session_dependent_clean_update)) =
if let &Some((dirty_container_id, count, current_session_clean_update)) =
dirty_container_update
{
if should_track_activeness {
// When a dirty container count is increased and the task is considered as active
// we need to schedule the dirty tasks in the new dirty container
let current_session_update = count - *session_dependent_clean_update;
let current_session_update = count - *current_session_clean_update;
if current_session_update > 0 && task.has_key(&CachedDataItemKey::Activeness {}) {
queue.push_find_and_schedule_dirty(dirty_container_id)
}
}

let mut aggregated_update = DirtyContainerCount::default();
update!(
task,
AggregatedDirtyContainer {
task: dirty_container_id
},
|old: Option<DirtyContainerCount>| {
let mut new = old.unwrap_or_default();
aggregated_update =
new.update_count(&DirtyContainerCount::from_current_session_clean(
count,
current_session_id,
*session_dependent_clean_update,
));
(!new.is_zero()).then_some(new)
}
// Update AggregatedDirtyContainer and compute aggregated update
let mut dirty_container_count_update = 0;
let old_dirty_single_container_count;
let new_dirty_single_container_count;
if count != 0 {
new_dirty_single_container_count = update_count_and_get!(
task,
AggregatedDirtyContainer {
task: dirty_container_id
},
count
);
old_dirty_single_container_count = new_dirty_single_container_count - count;
dirty_container_count_update = before_after_to_diff_value(
old_dirty_single_container_count > 0,
new_dirty_single_container_count > 0,
);
} else {
new_dirty_single_container_count = get!(
task,
AggregatedDirtyContainer {
task: dirty_container_id
}
)
.copied()
.unwrap_or_default();
old_dirty_single_container_count = new_dirty_single_container_count;
}

// Update AggregatedSessionDependentCleanContainer
let old_single_container_current_session_clean_count;
let new_single_container_current_session_clean_count;
if *current_session_clean_update != 0 {
new_single_container_current_session_clean_count = update_count_and_get!(
task,
AggregatedSessionDependentCleanContainer {
task: dirty_container_id,
session_id: current_session_id
},
*current_session_clean_update
);
old_single_container_current_session_clean_count =
new_single_container_current_session_clean_count
- *current_session_clean_update;
} else {
new_single_container_current_session_clean_count = get!(
task,
AggregatedSessionDependentCleanContainer {
task: dirty_container_id,
session_id: current_session_id
}
)
.copied()
.unwrap_or_default();
old_single_container_current_session_clean_count =
new_single_container_current_session_clean_count;
}

// compute aggregated update
let was_single_container_clean = old_dirty_single_container_count > 0
&& old_dirty_single_container_count
- old_single_container_current_session_clean_count
<= 0;
let is_single_container_clean = new_dirty_single_container_count > 0
&& new_dirty_single_container_count
- new_single_container_current_session_clean_count
<= 0;
let current_session_clean_update =
before_after_to_diff_value(was_single_container_clean, is_single_container_clean);

let aggregated_update = DirtyContainerCount::from_current_session_clean(
dirty_container_count_update,
current_session_id,
current_session_clean_update,
);

if !aggregated_update.is_zero() {
Expand Down Expand Up @@ -1273,7 +1341,7 @@ impl AggregationUpdateQueue {
// this would already be scheduled by the `Activeness`
let is_active_until_clean = get!(task, Activeness).is_some_and(|a| a.active_until_clean);
if !is_active_until_clean {
let mut dirty_containers = iter_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task).peekable();
let mut dirty_containers = task.dirty_containers(session_id).peekable();
let is_empty = dirty_containers.peek().is_none();
if !is_empty || dirty {
self.extend_find_and_schedule_dirty(dirty_containers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
backing_storage::BackingStorage,
data::{
CachedDataItem, CachedDataItemKey, CachedDataItemType, CachedDataItemValue,
CachedDataItemValueRef, CachedDataItemValueRefMut, Dirtyness,
CachedDataItemValueRef, CachedDataItemValueRefMut, DirtyContainerCount, Dirtyness,
},
};

Expand Down Expand Up @@ -430,6 +430,50 @@ pub trait TaskGuard: Debug {
)),
}
}
/// Returns (is_dirty, is_clean_in_current_session)
fn dirty(&self, session_id: SessionId) -> (bool, bool) {
match get!(self, Dirty) {
None => (false, false),
Some(Dirtyness::Dirty) => (true, false),
Some(Dirtyness::SessionDependent) => (
true,
get!(self, CleanInSession).copied() == Some(session_id),
),
}
}
fn dirty_containers(&self, session_id: SessionId) -> impl Iterator<Item = TaskId> {
self.dirty_containers_with_count(session_id)
.map(|(task_id, _)| task_id)
}
fn dirty_containers_with_count(
&self,
session_id: SessionId,
) -> impl Iterator<Item = (TaskId, i32)> {
iter_many!(self, AggregatedDirtyContainer { task } count => (task, *count)).filter(
move |&(task_id, count)| {
if count > 0 {
let clean_count = get!(
self,
AggregatedSessionDependentCleanContainer {
task: task_id,
session_id
}
)
.copied()
.unwrap_or_default();
count > clean_count
} else {
false
}
},
)
}

fn dirty_container_count(&self) -> DirtyContainerCount {
get!(self, AggregatedDirtyContainerCount)
.cloned()
.unwrap_or_default()
}
}

pub struct TaskGuardImpl<'a, B: BackingStorage> {
Expand Down
33 changes: 33 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,38 @@ macro_rules! update_count {
};
}

macro_rules! update_count_and_get {
($task:ident, $key:ident $input:tt, -$update:expr) => {{
let update = $update;
let mut new = 0;
$crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
let old = old.unwrap_or(0);
new = old - update;
(new != 0).then_some(new)
});
new
}};
($task:ident, $key:ident $input:tt, $update:expr) => {
match $update {
update => {
let mut new = 0;
$crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
let old = old.unwrap_or(0);
new = old + update;
(new != 0).then_some(new)
});
new
}
}
};
($task:ident, $key:ident, -$update:expr) => {
$crate::backend::storage::update_count_and_get!($task, $key {}, -$update)
};
($task:ident, $key:ident, $update:expr) => {
$crate::backend::storage::update_count_and_get!($task, $key {}, $update)
};
}

macro_rules! remove {
($task:ident, $key:ident $input:tt) => {{
#[allow(unused_imports)]
Expand All @@ -1122,6 +1154,7 @@ pub(crate) use iter_many;
pub(crate) use remove;
pub(crate) use update;
pub(crate) use update_count;
pub(crate) use update_count_and_get;

pub struct SnapshotGuard<'l> {
storage: &'l Storage,
Expand Down
Loading
Loading