Skip to content

Commit 23f63c9

Browse files
committed
Split AggregatedDirtyContainer
1 parent 47f1762 commit 23f63c9

File tree

5 files changed

+168
-42
lines changed

5 files changed

+168
-42
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -582,14 +582,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
582582
.set_active_until_clean();
583583
if ctx.should_track_activeness() {
584584
// A newly added Activeness need to make sure to schedule the tasks
585-
task_ids_to_schedule = get_many!(
586-
task,
587-
AggregatedDirtyContainer {
588-
task
589-
} count if count.get(self.session_id) > 0 => {
590-
task
591-
}
592-
);
585+
task_ids_to_schedule = task.dirty_containers(self.session_id).collect();
593586
task_ids_to_schedule.push(task_id);
594587
}
595588
get!(task, Activeness).unwrap()
@@ -650,16 +643,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
650643
"{task_id} {task_description}{count} (aggr={aggregation_number}, \
651644
{in_progress}, {activeness}{is_dirty})",
652645
);
653-
let children: Vec<_> = iter_many!(
654-
task,
655-
AggregatedDirtyContainer {
656-
task
657-
} count => {
658-
(task, count.get(ctx.session_id()))
659-
}
660-
)
661-
.filter(|(_, count)| *count > 0)
662-
.collect();
646+
let children: Vec<_> =
647+
task.dirty_containers_with_count(ctx.session_id()).collect();
663648
drop(task);
664649

665650
if missing_upper {
@@ -2927,9 +2912,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
29272912
}
29282913
}
29292914

2930-
let is_dirty = task.is_dirty(self.session_id);
2931-
let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
2932-
.is_some_and(|count| count.get(self.session_id) > 0);
2915+
let is_dirty = get!(task, Dirty).is_some();
2916+
let has_dirty_container =
2917+
get!(task, AggregatedDirtyContainerCount).is_some_and(|count| count.count > 0);
29332918
let should_be_in_upper = is_dirty || has_dirty_container;
29342919

29352920
let aggregation_number = get_aggregation_number(&task);
@@ -2954,7 +2939,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
29542939
for upper_id in uppers {
29552940
let task = ctx.task(task_id, TaskDataCategory::All);
29562941
let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
2957-
.is_some_and(|dirty| dirty.get(self.session_id) > 0);
2942+
.is_some_and(|&dirty| dirty > 0);
29582943
if !in_upper {
29592944
panic!(
29602945
"Task {} ({}) is dirty, but is not listed in the upper task {} \

turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs

Lines changed: 87 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ use crate::{
2828
backend::{
2929
TaskDataCategory, get_mut, get_mut_or_insert_with,
3030
operation::{ExecuteContext, Operation, TaskGuard, invalidate::make_task_dirty},
31-
storage::{count, get, get_many, iter_many, remove, update, update_count},
31+
storage::{
32+
count, get, get_many, iter_many, remove, update, update_count, update_count_and_get,
33+
},
3234
},
3335
data::{
3436
ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
@@ -327,34 +329,100 @@ impl AggregatedDataUpdate {
327329
collectibles_update,
328330
} = self;
329331
let mut result = Self::default();
330-
if let &Some((dirty_container_id, count, session_dependent_clean_update)) =
332+
if let &Some((dirty_container_id, count, current_session_clean_update)) =
331333
dirty_container_update
332334
{
333335
if should_track_activeness {
334336
// When a dirty container count is increased and the task is considered as active
335337
// we need to schedule the dirty tasks in the new dirty container
336-
let current_session_update = count - *session_dependent_clean_update;
338+
let current_session_update = count - *current_session_clean_update;
337339
if current_session_update > 0 && task.has_key(&CachedDataItemKey::Activeness {}) {
338340
queue.push_find_and_schedule_dirty(dirty_container_id)
339341
}
340342
}
341343

342-
let mut aggregated_update = DirtyContainerCount::default();
343-
update!(
344-
task,
345-
AggregatedDirtyContainer {
346-
task: dirty_container_id
347-
},
348-
|old: Option<DirtyContainerCount>| {
349-
let mut new = old.unwrap_or_default();
350-
aggregated_update =
351-
new.update_count(&DirtyContainerCount::from_current_session_clean(
352-
count,
353-
current_session_id,
354-
*session_dependent_clean_update,
355-
));
356-
(!new.is_zero()).then_some(new)
344+
let mut aggregated_count_update = 0;
345+
let mut aggregated_current_session_clean_update = 0;
346+
let old_aggregated_dirty_container_count;
347+
let new_aggregated_dirty_container_count;
348+
let old_current_session_clean_count;
349+
let new_current_session_clean_count;
350+
351+
if count != 0 {
352+
new_aggregated_dirty_container_count = update_count_and_get!(
353+
task,
354+
AggregatedDirtyContainer {
355+
task: dirty_container_id
356+
},
357+
count
358+
);
359+
old_aggregated_dirty_container_count = new_aggregated_dirty_container_count - count;
360+
match (
361+
old_aggregated_dirty_container_count > 0,
362+
new_aggregated_dirty_container_count > 0,
363+
) {
364+
(true, false) => {
365+
aggregated_count_update = -1;
366+
}
367+
(false, true) => {
368+
aggregated_count_update = 1;
369+
}
370+
_ => {}
371+
}
372+
} else {
373+
new_aggregated_dirty_container_count = get!(
374+
task,
375+
AggregatedDirtyContainer {
376+
task: dirty_container_id
377+
}
378+
)
379+
.copied()
380+
.unwrap_or_default();
381+
old_aggregated_dirty_container_count = new_aggregated_dirty_container_count;
382+
}
383+
384+
if *current_session_clean_update != 0 {
385+
new_current_session_clean_count = update_count_and_get!(
386+
task,
387+
AggregatedSessionDependentCleanContainer {
388+
task: dirty_container_id,
389+
session_id: current_session_id
390+
},
391+
*current_session_clean_update
392+
);
393+
old_current_session_clean_count =
394+
new_current_session_clean_count - *current_session_clean_update;
395+
} else {
396+
new_current_session_clean_count = get!(
397+
task,
398+
AggregatedSessionDependentCleanContainer {
399+
task: dirty_container_id,
400+
session_id: current_session_id
401+
}
402+
)
403+
.copied()
404+
.unwrap_or_default();
405+
old_current_session_clean_count = new_current_session_clean_count;
406+
}
407+
408+
let was_clean = old_aggregated_dirty_container_count > 0
409+
&& old_aggregated_dirty_container_count - old_current_session_clean_count <= 0;
410+
let is_clean = new_aggregated_dirty_container_count > 0
411+
&& new_aggregated_dirty_container_count - new_current_session_clean_count <= 0;
412+
match (was_clean, is_clean) {
413+
(true, false) => {
414+
aggregated_current_session_clean_update = 1;
415+
}
416+
(false, true) => {
417+
aggregated_current_session_clean_update = -1;
357418
}
419+
_ => {}
420+
}
421+
422+
let aggregated_update = DirtyContainerCount::from_current_session_clean(
423+
aggregated_count_update,
424+
current_session_id,
425+
aggregated_current_session_clean_update,
358426
);
359427

360428
if !aggregated_update.is_zero() {
@@ -1273,7 +1341,7 @@ impl AggregationUpdateQueue {
12731341
// this would already be scheduled by the `Activeness`
12741342
let is_active_until_clean = get!(task, Activeness).is_some_and(|a| a.active_until_clean);
12751343
if !is_active_until_clean {
1276-
let mut dirty_containers = iter_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task).peekable();
1344+
let mut dirty_containers = task.dirty_containers(session_id).peekable();
12771345
let is_empty = dirty_containers.peek().is_none();
12781346
if !is_empty || dirty {
12791347
self.extend_find_and_schedule_dirty(dirty_containers);

turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,33 @@ pub trait TaskGuard: Debug {
430430
)),
431431
}
432432
}
433+
fn dirty_containers(&self, session_id: SessionId) -> impl Iterator<Item = TaskId> {
434+
self.dirty_containers_with_count(session_id)
435+
.map(|(task_id, _)| task_id)
436+
}
437+
fn dirty_containers_with_count(
438+
&self,
439+
session_id: SessionId,
440+
) -> impl Iterator<Item = (TaskId, i32)> {
441+
iter_many!(self, AggregatedDirtyContainer { task } count => (task, *count)).filter(
442+
move |&(task_id, count)| {
443+
if count > 0 {
444+
let clean_count = get!(
445+
self,
446+
AggregatedSessionDependentCleanContainer {
447+
task: task_id,
448+
session_id
449+
}
450+
)
451+
.copied()
452+
.unwrap_or_default();
453+
count > clean_count
454+
} else {
455+
false
456+
}
457+
},
458+
)
459+
}
433460
}
434461

435462
pub struct TaskGuardImpl<'a, B: BackingStorage> {

turbopack/crates/turbo-tasks-backend/src/backend/storage.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,37 @@ macro_rules! update_count {
10961096
};
10971097
}
10981098

1099+
macro_rules! update_count_and_get {
1100+
($task:ident, $key:ident $input:tt, -$update:expr) => {{
1101+
let update = $update;
1102+
let mut new = 0;
1103+
$crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
1104+
let old = old.unwrap_or(0);
1105+
new = old - update;
1106+
(new != 0).then_some(new)
1107+
});
1108+
new
1109+
}};
1110+
($task:ident, $key:ident $input:tt, $update:expr) => {
1111+
match $update {
1112+
update => {
1113+
let mut new = 0;
1114+
$crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
1115+
let old = old.unwrap_or(0);
1116+
new = old + update;
1117+
(new != 0).then_some(new)
1118+
});
1119+
new
1120+
}
1121+
}
1122+
};
1123+
($task:ident, $key:ident, -$update:expr) => {
1124+
$crate::backend::storage::update_count_and_get!($task, $key {}, -$update)
1125+
}; ($task:ident, $key:ident, $update:expr) => {
1126+
$crate::backend::storage::update_count_and_get!($task, $key {}, $update)
1127+
};
1128+
}
1129+
10991130
macro_rules! remove {
11001131
($task:ident, $key:ident $input:tt) => {{
11011132
#[allow(unused_imports)]
@@ -1122,6 +1153,7 @@ pub(crate) use iter_many;
11221153
pub(crate) use remove;
11231154
pub(crate) use update;
11241155
pub(crate) use update_count;
1156+
pub(crate) use update_count_and_get;
11251157

11261158
pub struct SnapshotGuard<'l> {
11271159
storage: &'l Storage,

turbopack/crates/turbo-tasks-backend/src/data.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,12 @@ pub enum CachedDataItem {
653653
// Aggregated Data
654654
AggregatedDirtyContainer {
655655
task: TaskId,
656-
value: DirtyContainerCount,
656+
value: i32,
657+
},
658+
AggregatedSessionDependentCleanContainer {
659+
task: TaskId,
660+
session_id: SessionId,
661+
value: i32,
657662
},
658663
AggregatedCollectible {
659664
collectible: CollectibleRef,
@@ -734,6 +739,9 @@ impl CachedDataItem {
734739
CachedDataItem::Follower { task, .. } => !task.is_transient(),
735740
CachedDataItem::Upper { task, .. } => !task.is_transient(),
736741
CachedDataItem::AggregatedDirtyContainer { task, .. } => !task.is_transient(),
742+
CachedDataItem::AggregatedSessionDependentCleanContainer { task, .. } => {
743+
!task.is_transient()
744+
}
737745
CachedDataItem::AggregatedCollectible { collectible, .. } => {
738746
!collectible.cell.task.is_transient()
739747
}
@@ -808,6 +816,7 @@ impl CachedDataItem {
808816
| Self::Child { .. }
809817
| Self::Upper { .. }
810818
| Self::AggregatedDirtyContainer { .. }
819+
| Self::AggregatedSessionDependentCleanContainer { .. }
811820
| Self::AggregatedCollectible { .. }
812821
| Self::AggregatedDirtyContainerCount { .. }
813822
| Self::Stateful { .. }
@@ -852,6 +861,9 @@ impl CachedDataItemKey {
852861
CachedDataItemKey::Follower { task, .. } => !task.is_transient(),
853862
CachedDataItemKey::Upper { task, .. } => !task.is_transient(),
854863
CachedDataItemKey::AggregatedDirtyContainer { task, .. } => !task.is_transient(),
864+
CachedDataItemKey::AggregatedSessionDependentCleanContainer { task, .. } => {
865+
!task.is_transient()
866+
}
855867
CachedDataItemKey::AggregatedCollectible { collectible, .. } => {
856868
!collectible.cell.task.is_transient()
857869
}
@@ -894,6 +906,7 @@ impl CachedDataItemType {
894906
| Self::Child { .. }
895907
| Self::Upper { .. }
896908
| Self::AggregatedDirtyContainer { .. }
909+
| Self::AggregatedSessionDependentCleanContainer { .. }
897910
| Self::AggregatedCollectible { .. }
898911
| Self::AggregatedDirtyContainerCount { .. }
899912
| Self::Stateful { .. }
@@ -930,6 +943,7 @@ impl CachedDataItemType {
930943
| Self::Follower
931944
| Self::Upper
932945
| Self::AggregatedDirtyContainer
946+
| Self::AggregatedSessionDependentCleanContainer
933947
| Self::AggregatedCollectible
934948
| Self::AggregatedDirtyContainerCount
935949
| Self::Stateful

0 commit comments

Comments
 (0)