Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions crates/napi/src/next_api/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,14 @@ pub fn project_new(
}
let mut compress = Compression::None;
if let Some(mut trace) = trace {
let internal_dir = PathBuf::from(&options.root_path)
.join(&options.project_path)
.join(&options.dist_dir);
let trace_file = internal_dir.join("trace-turbopack");

println!("Turbopack tracing enabled with targets: {trace}");
println!(" Note that this might have a small performance impact.");
println!(" Trace output will be written to {}", trace_file.display());

trace = trace
.split(",")
Expand Down Expand Up @@ -441,27 +447,20 @@ pub fn project_new(

let subscriber = subscriber.with(FilterLayer::try_new(&trace).unwrap());

let internal_dir = PathBuf::from(&options.root_path)
.join(&options.project_path)
.join(&options.dist_dir);
std::fs::create_dir_all(&internal_dir)
.context("Unable to create .next directory")
.unwrap();
let trace_file;
let (trace_writer, trace_writer_guard) = match compress {
Compression::None => {
trace_file = internal_dir.join("trace-turbopack");
let trace_writer = std::fs::File::create(trace_file.clone()).unwrap();
TraceWriter::new(trace_writer)
}
Compression::GzipFast => {
trace_file = internal_dir.join("trace-turbopack");
let trace_writer = std::fs::File::create(trace_file.clone()).unwrap();
let trace_writer = GzEncoder::new(trace_writer, flate2::Compression::fast());
TraceWriter::new(trace_writer)
}
Compression::GzipBest => {
trace_file = internal_dir.join("trace-turbopack");
let trace_writer = std::fs::File::create(trace_file.clone()).unwrap();
let trace_writer = GzEncoder::new(trace_writer, flate2::Compression::best());
TraceWriter::new(trace_writer)
Expand Down
2 changes: 2 additions & 0 deletions turbopack/crates/turbo-persistence/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {

new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

let sync_span = tracing::info_span!("sync new files").entered();
let mut new_meta_files = self
.parallel_scheduler
.parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
Expand All @@ -511,6 +512,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
}
anyhow::Ok(())
})?;
drop(sync_span);

let new_meta_info = new_meta_files
.iter()
Expand Down
107 changes: 60 additions & 47 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use parking_lot::{Condvar, Mutex};
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
use smallvec::{SmallVec, smallvec};
use tokio::time::{Duration, Instant};
use tracing::{Span, info_span, trace_span};
use tracing::{Span, trace_span};
use turbo_tasks::{
CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
ReadOutputOptions, ReadTracking, SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId,
Expand Down Expand Up @@ -1002,26 +1002,38 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

fn snapshot(&self) -> Option<(Instant, bool)> {
fn snapshot_and_persist(
&self,
parent_span: Option<tracing::Id>,
reason: &str,
) -> Option<(Instant, bool)> {
let snapshot_span =
tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
.entered();
let start = Instant::now();
debug_assert!(self.should_persist());
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = true;
let active_operations = self
.in_progress_operations
.fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
if active_operations != 0 {
self.operations_suspended
.wait_while(&mut snapshot_request, |_| {
self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
});

let suspended_operations;
{
let _span = tracing::info_span!("blocking").entered();
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = true;
let active_operations = self
.in_progress_operations
.fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
if active_operations != 0 {
self.operations_suspended
.wait_while(&mut snapshot_request, |_| {
self.in_progress_operations.load(Ordering::Relaxed)
!= SNAPSHOT_REQUESTED_BIT
});
}
suspended_operations = snapshot_request
.suspended_operations
.iter()
.map(|op| op.arc().clone())
.collect::<Vec<_>>();
}
let suspended_operations = snapshot_request
.suspended_operations
.iter()
.map(|op| op.arc().clone())
.collect::<Vec<_>>();
drop(snapshot_request);
self.storage.start_snapshot();
let mut persisted_task_cache_log = self
.persisted_task_cache_log
Expand Down Expand Up @@ -1105,11 +1117,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
)
};

let snapshot = {
let _span = tracing::trace_span!("take snapshot");
self.storage
.take_snapshot(&preprocess, &process, &process_snapshot)
};
let snapshot = self
.storage
.take_snapshot(&preprocess, &process, &process_snapshot);

#[cfg(feature = "print_cache_item_size")]
#[derive(Default)]
Expand Down Expand Up @@ -1195,10 +1205,14 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

let mut new_items = false;
drop(snapshot_span);

if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
new_items = true;
if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
return Some((snapshot_time, false));
}

let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
{
if let Err(err) = self.backing_storage.save_snapshot(
self.session_id,
suspended_operations,
Expand Down Expand Up @@ -1240,18 +1254,16 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

if new_items {
let elapsed = start.elapsed();
// avoid spamming the event queue with information about fast operations
if elapsed > Duration::from_secs(10) {
turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
"Finished writing to filesystem cache".to_string(),
elapsed,
)));
}
let elapsed = start.elapsed();
// avoid spamming the event queue with information about fast operations
if elapsed > Duration::from_secs(10) {
turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
"Finished writing to filesystem cache".to_string(),
elapsed,
)));
}

Some((snapshot_time, new_items))
Some((snapshot_time, true))
}

fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
Expand All @@ -1273,7 +1285,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

if self.should_persist() {
// Schedule the snapshot job
let _span = Span::none().entered();
let _span = trace_span!("persisting background job").entered();
let _span = tracing::info_span!("thread").entered();
Comment on lines +1288 to +1289
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double span?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thread is a special span where self time is hidden. So that's intentional.

turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
}
}
Expand All @@ -1291,8 +1304,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
self.verify_aggregation_graph(turbo_tasks, false);
}
if self.should_persist() {
let _span = tracing::info_span!("persist on stop").entered();
self.snapshot();
self.snapshot_and_persist(Span::current().into(), "stop");
}
self.task_cache.drop_contents();
drop_contents(&self.transient_tasks);
Expand Down Expand Up @@ -2477,6 +2489,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
let mut idle_start_listener = self.idle_start_event.listen();
let mut idle_end_listener = self.idle_end_event.listen();
let mut fresh_idle = true;
loop {
const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
Expand All @@ -2494,9 +2509,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
if self.stopping.load(Ordering::Acquire) {
return;
}
let mut idle_start_listener = self.idle_start_event.listen();
let mut idle_end_listener = self.idle_end_event.listen();
let mut idle_time = if turbo_tasks.is_idle() {
let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
Instant::now() + idle_timeout
} else {
far_future()
Expand All @@ -2507,6 +2520,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
return;
},
_ = &mut idle_start_listener => {
fresh_idle = true;
idle_time = Instant::now() + idle_timeout;
idle_start_listener = self.idle_start_event.listen()
},
Expand All @@ -2527,12 +2541,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

let _span = info_span!("persist", reason = reason).entered();
let this = self.clone();
let snapshot = this.snapshot();
let snapshot = this.snapshot_and_persist(None, reason);
if let Some((snapshot_start, new_data)) = snapshot {
last_snapshot = snapshot_start;
if new_data {
if !new_data {
fresh_idle = false;
continue;
}
let last_snapshot = last_snapshot.duration_since(self.start_time);
Expand All @@ -2541,7 +2555,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
Ordering::Relaxed,
);

let _span = Span::none().entered();
turbo_tasks.schedule_backend_background_job(
TurboTasksBackendJob::FollowUpSnapshot,
);
Expand All @@ -2550,7 +2563,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}
TurboTasksBackendJob::Prefetch { data, range } => {
let range = if let Some(range) = range {
let range: Range<usize> = if let Some(range) = range {
range
} else {
if data.len() > 128 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
> + Send
+ Sync,
{
let _span = tracing::trace_span!("save snapshot", session_id = ?session_id, operations = operations.len());
let _span = tracing::info_span!("save snapshot", session_id = ?session_id, operations = operations.len()).entered();
let mut batch = self.inner.database.write_batch()?;

// Start organizing the updates in parallel
Expand Down
2 changes: 1 addition & 1 deletion turbopack/crates/turbopack-trace-server/src/span_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ impl<'a> SpanRef<'a> {
.and_modify(|_, v| v.push(span.index()))
.or_insert_with(|| (format!("{name}={value}"), vec![span.index()]));
}
if !span.is_complete() && !span.time_data().ignore_self_time {
if !span.is_complete() && span.span.name != "thread" {
let name = "incomplete_span";
index
.raw_entry_mut()
Expand Down
2 changes: 2 additions & 0 deletions turbopack/crates/turbopack-trace-utils/src/tracing_presets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pub static TRACING_OVERVIEW_TARGETS: Lazy<Vec<&str>> = Lazy::new(|| {
"turbo_tasks=info",
"turbo_tasks_fs=info",
"turbo_tasks_fetch=info",
"turbo_tasks_backend=info",
"turbo_persistence=info",
"turbopack=info",
"turbopack_binding=info",
"turbopack_browser=info",
Expand Down
Loading