206 changes: 109 additions & 97 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Large diffs are not rendered by default.

@@ -8,13 +8,13 @@ use crate::{
         operation::{
             ExecuteContext, Operation, TaskGuard,
             aggregation_update::{
-                AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
+                AggregationUpdateJob, AggregationUpdateQueue, ComputeDirtyAndCleanUpdate,
             },
         },
-        storage::{get, get_mut},
+        storage::{get, get_mut, remove},
     },
     data::{
-        CachedDataItem, CachedDataItemKey, CachedDataItemValue, DirtyState, InProgressState,
+        CachedDataItem, CachedDataItemKey, CachedDataItemValue, Dirtyness, InProgressState,
         InProgressStateInner,
     },
 };
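For context: this file drops the old `DirtyState { clean_in_session: Option<SessionId> }` value in favour of a `Dirtyness` item plus a separate `CleanInSession` item (hence the added `remove` import). The enum definition itself is not part of the hunks shown here, so the following is only a sketch of what the new data model presumably looks like, inferred from the variants used below:

```rust
// Sketch (assumption, not taken from this diff): the dirty flag becomes a small enum,
// and the session in which a session-dependent task was last marked clean moves into
// its own `CleanInSession` data item instead of being embedded in the dirty state.
enum Dirtyness {
    /// Dirty in every session; the task needs to be recomputed.
    Dirty,
    /// Dirty in general, but a separate `CleanInSession` item may mark the task as
    /// already clean for one specific session.
    SessionDependent,
}
```

Splitting the session id out keeps the common `Dirty` case small and lets the clean mark be dropped independently via `remove!`, as the next hunks do.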
@@ -232,15 +232,11 @@ pub fn make_task_dirty_internal(
         *stale = true;
     }
     let old = task.insert(CachedDataItem::Dirty {
-        value: DirtyState {
-            clean_in_session: None,
-        },
+        value: Dirtyness::Dirty,
     });
-    let mut dirty_container = match old {
+    let (old_self_dirty, old_current_session_self_clean) = match old {
         Some(CachedDataItemValue::Dirty {
-            value: DirtyState {
-                clean_in_session: None,
-            },
+            value: Dirtyness::Dirty,
         }) => {
             #[cfg(feature = "trace_task_dirty")]
             let _span = tracing::trace_span!(
@@ -254,26 +250,50 @@
             return;
         }
         Some(CachedDataItemValue::Dirty {
-            value: DirtyState {
-                clean_in_session: Some(session_id),
-            },
+            value: Dirtyness::SessionDependent,
         }) => {
-            // Got dirty in that one session only
-            let mut dirty_container = get!(task, AggregatedDirtyContainerCount)
-                .cloned()
-                .unwrap_or_default();
-            dirty_container.update_session_dependent(session_id, 1);
-            dirty_container
+            // It was a session-dependent dirty before, so we need to remove that clean count
+            let old = remove!(task, CleanInSession);
+            if let Some(session_id) = old
+                && session_id == ctx.session_id()
+            {
+                // There was a clean count for a session. If it was the current session, we need to
+                // propagate that change.
+                (true, true)
+            } else {
+                #[cfg(feature = "trace_task_dirty")]
+                let _span = tracing::trace_span!(
+                    "session-dependent task already dirty",
+                    name = ctx.get_task_description(task_id),
+                    cause = %TaskDirtyCauseInContext::new(&cause, ctx)
+                )
+                .entered();
+                // already dirty
+                return;
+            }
         }
         None => {
-            // Get dirty for all sessions
-            get!(task, AggregatedDirtyContainerCount)
-                .cloned()
-                .unwrap_or_default()
+            // It was clean before, so we need to increase the dirty count
+            (false, false)
         }
         _ => unreachable!(),
     };
 
+    let new_self_dirty = true;
+    let new_current_session_self_clean = false;
+
+    let dirty_container_count = get!(task, AggregatedDirtyContainerCount)
+        .copied()
+        .unwrap_or_default();
+    let current_session_clean_container_count = get!(
+        task,
+        AggregatedSessionDependentCleanContainerCount {
+            session_id: ctx.session_id(),
+        }
+    )
+    .copied()
+    .unwrap_or_default();
+
     #[cfg(feature = "trace_task_dirty")]
     let _span = tracing::trace_span!(
         "make task dirty",
@@ -283,18 +303,27 @@
     )
     .entered();
 
-    let should_schedule = {
-        let aggregated_update = dirty_container.update_with_dirty_state(&DirtyState {
-            clean_in_session: None,
-        });
-        if !aggregated_update.is_zero() {
-            queue.extend(AggregationUpdateJob::data_update(
-                &mut task,
-                AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update),
-            ));
-        }
-        !ctx.should_track_activeness() || task.has_key(&CachedDataItemKey::Activeness {})
-    };
+    let result = ComputeDirtyAndCleanUpdate {
+        old_dirty_container_count: dirty_container_count,
+        new_dirty_container_count: dirty_container_count,
+        old_current_session_clean_container_count: current_session_clean_container_count,
+        new_current_session_clean_container_count: current_session_clean_container_count,
+        old_self_dirty,
+        new_self_dirty,
+        old_current_session_self_clean,
+        new_current_session_self_clean,
+    }
+    .compute();
+
+    if let Some(aggregated_update) = result.aggregated_update(task_id) {
+        queue.extend(AggregationUpdateJob::data_update(
+            &mut task,
+            aggregated_update,
+        ));
+    }
+
+    let should_schedule =
+        !ctx.should_track_activeness() || task.has_key(&CachedDataItemKey::Activeness {});
 
     if should_schedule {
         let description = || ctx.get_task_desc_fn(task_id);
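`ComputeDirtyAndCleanUpdate` is defined in `aggregation_update` and its body is not shown in this section, so its exact semantics are an assumption. Judging from the call site above, it compares the task's effective dirty/clean state before and after the change and only yields an aggregated update for the uppers when that state actually flips. A rough sketch of the idea (field names copied from the call site; the `compute` logic is guessed):

```rust
// Sketch only; the real implementation lives in aggregation_update.rs and may differ.
struct ComputeDirtyAndCleanUpdateSketch {
    old_dirty_container_count: i32,
    new_dirty_container_count: i32,
    old_current_session_clean_container_count: i32,
    new_current_session_clean_container_count: i32,
    old_self_dirty: bool,
    new_self_dirty: bool,
    old_current_session_self_clean: bool,
    new_current_session_self_clean: bool,
}

/// Deltas that would be propagated to upper (aggregating) tasks.
struct DirtyAndCleanDelta {
    dirty: i32,
    current_session_clean: i32,
}

impl ComputeDirtyAndCleanUpdateSketch {
    fn compute(&self) -> Option<DirtyAndCleanDelta> {
        // A task counts as dirty for its uppers if it is dirty itself or still
        // contains dirty descendants.
        let was_dirty = self.old_self_dirty || self.old_dirty_container_count > 0;
        let is_dirty = self.new_self_dirty || self.new_dirty_container_count > 0;
        // It additionally counts as clean for the current session when its own dirt
        // and all dirty descendants are covered by session-dependent clean marks.
        let was_clean = (!self.old_self_dirty || self.old_current_session_self_clean)
            && self.old_current_session_clean_container_count >= self.old_dirty_container_count;
        let is_clean = (!self.new_self_dirty || self.new_current_session_self_clean)
            && self.new_current_session_clean_container_count >= self.new_dirty_container_count;
        let delta = DirtyAndCleanDelta {
            dirty: is_dirty as i32 - was_dirty as i32,
            current_session_clean: (is_dirty && is_clean) as i32 - (was_dirty && was_clean) as i32,
        };
        // Only propagate if something observable changed.
        (delta.dirty != 0 || delta.current_session_clean != 0).then_some(delta)
    }
}
```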
77 changes: 73 additions & 4 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
@@ -20,12 +20,12 @@ use crate::{
     backend::{
         OperationGuard, TaskDataCategory, TransientTask, TurboTasksBackend, TurboTasksBackendInner,
         TurboTasksBackendJob,
-        storage::{SpecificTaskDataCategory, StorageWriteGuard, iter_many},
+        storage::{SpecificTaskDataCategory, StorageWriteGuard, get, iter_many},
     },
     backing_storage::BackingStorage,
     data::{
         CachedDataItem, CachedDataItemKey, CachedDataItemType, CachedDataItemValue,
-        CachedDataItemValueRef, CachedDataItemValueRefMut,
+        CachedDataItemValueRef, CachedDataItemValueRefMut, Dirtyness,
     },
 };
 
@@ -415,6 +415,75 @@ pub trait TaskGuard: Debug {
     fn invalidate_serialization(&mut self);
     fn prefetch(&mut self) -> Option<FxIndexMap<TaskId, bool>>;
     fn is_immutable(&self) -> bool;
+    fn is_dirty(&self, session_id: SessionId) -> bool {
+        get!(self, Dirty).is_some_and(|dirtyness| match dirtyness {
+            Dirtyness::Dirty => true,
+            Dirtyness::SessionDependent => get!(self, CleanInSession).copied() != Some(session_id),
+        })
+    }
+    fn dirtyness_and_session(&self) -> Option<(Dirtyness, Option<SessionId>)> {
+        match get!(self, Dirty)? {
+            Dirtyness::Dirty => Some((Dirtyness::Dirty, None)),
+            Dirtyness::SessionDependent => Some((
+                Dirtyness::SessionDependent,
+                get!(self, CleanInSession).copied(),
+            )),
+        }
+    }
+    /// Returns (is_dirty, is_clean_in_current_session)
+    fn dirty(&self, session_id: SessionId) -> (bool, bool) {
+        match get!(self, Dirty) {
+            None => (false, false),
+            Some(Dirtyness::Dirty) => (true, false),
+            Some(Dirtyness::SessionDependent) => (
+                true,
+                get!(self, CleanInSession).copied() == Some(session_id),
+            ),
+        }
+    }
+    fn dirty_containers(&self, session_id: SessionId) -> impl Iterator<Item = TaskId> {
+        self.dirty_containers_with_count(session_id)
+            .map(|(task_id, _)| task_id)
+    }
+    fn dirty_containers_with_count(
+        &self,
+        session_id: SessionId,
+    ) -> impl Iterator<Item = (TaskId, i32)> {
+        iter_many!(self, AggregatedDirtyContainer { task } count => (task, *count)).filter(
+            move |&(task_id, count)| {
+                if count > 0 {
+                    let clean_count = get!(
+                        self,
+                        AggregatedSessionDependentCleanContainer {
+                            task: task_id,
+                            session_id
+                        }
+                    )
+                    .copied()
+                    .unwrap_or_default();
+                    count > clean_count
+                } else {
+                    false
+                }
+            },
+        )
+    }
+
+    fn has_dirty_containers(&self, session_id: SessionId) -> bool {
+        let dirty_count = get!(self, AggregatedDirtyContainerCount)
+            .copied()
+            .unwrap_or_default();
+        if dirty_count <= 0 {
+            return false;
+        }
+        let clean_count = get!(
+            self,
+            AggregatedSessionDependentCleanContainerCount { session_id }
+        )
+        .copied()
+        .unwrap_or_default();
+        dirty_count > clean_count
+    }
 }
 
 pub struct TaskGuardImpl<'a, B: BackingStorage> {
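The new default methods give callers one place to answer "does this task, or its aggregated subtree, still need work in this session?". A hypothetical call site that only composes the methods shown above (the helper itself is illustrative, not part of the PR):

```rust
// Hypothetical helper; `TaskGuard` is the trait above, `SessionId` the existing
// session id type of the crate.
fn needs_recompute<T: TaskGuard>(task: &T, session_id: SessionId) -> bool {
    // Dirty itself (unconditionally, or session-dependent and not yet cleaned in this
    // session), or aggregating over descendants that are still dirty for this session.
    task.is_dirty(session_id) || task.has_dirty_containers(session_id)
}
```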
@@ -740,8 +809,8 @@ impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue);
 pub use self::invalidate::TaskDirtyCause;
 pub use self::{
     aggregation_update::{
-        AggregatedDataUpdate, AggregationUpdateJob, get_aggregation_number, get_uppers,
-        is_aggregating_node, is_root_node,
+        AggregatedDataUpdate, AggregationUpdateJob, ComputeDirtyAndCleanUpdate,
+        get_aggregation_number, get_uppers, is_aggregating_node, is_root_node,
     },
     cleanup_old_edges::OutdatedEdge,
     connect_children::connect_children,
33 changes: 33 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
@@ -1096,6 +1096,38 @@ macro_rules! update_count {
     };
 }
 
+macro_rules! update_count_and_get {
+    ($task:ident, $key:ident $input:tt, -$update:expr) => {{
+        let update = $update;
+        let mut new = 0;
+        $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
+            let old = old.unwrap_or(0);
+            new = old - update;
+            (new != 0).then_some(new)
+        });
+        new
+    }};
+    ($task:ident, $key:ident $input:tt, $update:expr) => {
+        match $update {
+            update => {
+                let mut new = 0;
+                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
+                    let old = old.unwrap_or(0);
+                    new = old + update;
+                    (new != 0).then_some(new)
+                });
+                new
+            }
+        }
+    };
+    ($task:ident, $key:ident, -$update:expr) => {
+        $crate::backend::storage::update_count_and_get!($task, $key {}, -$update)
+    };
+    ($task:ident, $key:ident, $update:expr) => {
+        $crate::backend::storage::update_count_and_get!($task, $key {}, $update)
+    };
+}
+
 macro_rules! remove {
     ($task:ident, $key:ident $input:tt) => {{
         #[allow(unused_imports)]
@@ -1122,6 +1154,7 @@ pub(crate) use iter_many;
 pub(crate) use remove;
 pub(crate) use update;
 pub(crate) use update_count;
+pub(crate) use update_count_and_get;
 
 pub struct SnapshotGuard<'l> {
     storage: &'l Storage,
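`update_count_and_get!` mirrors the existing `update_count!` macro but also returns the value after the update, so a caller can act on the new count without issuing a second lookup. A hypothetical call site (the key names come from elsewhere in this PR; whether the real operations use the macro exactly like this is an assumption):

```rust
// Hypothetical usage: a session-dependent child of `task` was just marked clean for
// `session_id`, so bump the aggregated clean counter and check on the spot whether
// the whole aggregated subtree is now clean for this session.
let clean_count = update_count_and_get!(
    task,
    AggregatedSessionDependentCleanContainerCount { session_id },
    1
);
let dirty_count = get!(task, AggregatedDirtyContainerCount)
    .copied()
    .unwrap_or_default();
if clean_count >= dirty_count {
    // Nothing left to recompute for this session (compare has_dirty_containers above).
}
```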