From b7604aec6c21568fdca845dc6b1dca6132affdaa Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 22 May 2023 17:39:38 +0200 Subject: [PATCH] Initial tracing implementation (vercel/turbo#4966) ### Description * adds a raw_trace subscriber that will emit the raw trace events into `.turbopack/trace.log` (resp. `.next/trace.log` for next.js) * adds a CLI script which converts the raw trace into a chrome trace json file -> https://ui.perfetto.dev/ * adds a `TURBOPACK_TRACING` (resp. `NEXT_TURBOPACK_TRACING`) env var to enable tracing * adds some presets (e.g. `turbopack` or `next`) to enable tracing for certain things. * adds tracing for invalidations There are three different visualization modes: #### `--single` Shows all cpu time as it would look if a single cpu executed the workload. (10 concurrent tasks that take 1s are shown as 10 tasks that take 1s with total time of 10s) Pro: * Concurrency is visualized by bar filling (dark filled bars -> too little concurrency) * It injects pseudo bars with "cpus idle" for low concurrency (with `--idle`) Con: * Total time won't be represented correctly, since a single CPU would take longer Use Case: Gives a good overview of slow tasks in a build. #### `--merged` Shows all cpu time scaled by the concurrency. (10 concurrent tasks that take 1s are shown as 10 tasks that take 0.1s with total time of 1s) Pro: * Total time is represented correctly * Tasks with low concurrency appear bigger * Concurrency is visualized by bar filling (dark filled bars -> too little concurrency) * It injects pseudo bars with "cpus idle" for low concurrency (with `--idle`) Con: * Individual task times won't be represented correctly. Use Case: Gives a good overview of why a build is slow overall. #### `--threads` Shows cpu time distributed on infinite virtual cpus/threads. 
(10 concurrent tasks that take 1s are shown as 10 concurrent tasks that take 1s with total time of 1s) Pro: * Concurrency is shown via multiple CPU * Most realistic visualization Con: * Hard to read --- crates/turbo-tasks-fs/Cargo.toml | 1 + crates/turbo-tasks-fs/src/lib.rs | 2 + crates/turbo-tasks-macros/src/func.rs | 14 +- .../src/value_impl_macro.rs | 2 +- .../src/value_trait_macro.rs | 2 +- crates/turbo-tasks-memory/Cargo.toml | 1 + .../turbo-tasks-memory/src/memory_backend.rs | 11 +- crates/turbo-tasks/Cargo.toml | 1 + crates/turbo-tasks/src/duration_span.rs | 44 ++ .../turbo-tasks/src/graph/graph_traversal.rs | 10 +- crates/turbo-tasks/src/graph/visit.rs | 8 + crates/turbo-tasks/src/graph/with_future.rs | 22 +- crates/turbo-tasks/src/lib.rs | 4 +- crates/turbo-tasks/src/manager.rs | 96 ++- crates/turbo-tasks/src/util.rs | 30 + crates/turbopack-cli-utils/Cargo.toml | 7 + crates/turbopack-cli-utils/src/exit.rs | 29 + crates/turbopack-cli-utils/src/lib.rs | 6 + crates/turbopack-cli-utils/src/raw_trace.rs | 164 +++++ .../turbopack-cli-utils/src/trace_writer.rs | 103 +++ crates/turbopack-cli-utils/src/tracing.rs | 97 +++ .../src/tracing_presets.rs | 44 ++ crates/turbopack-cli/Cargo.toml | 4 +- crates/turbopack-cli/src/arguments.rs | 14 +- crates/turbopack-cli/src/main.rs | 65 +- crates/turbopack-convert-trace/Cargo.toml | 24 + crates/turbopack-convert-trace/src/main.rs | 589 ++++++++++++++++++ crates/turbopack-core/Cargo.toml | 2 +- crates/turbopack-core/src/chunk/mod.rs | 46 +- crates/turbopack-dev-server/src/lib.rs | 6 +- .../turbopack-dev-server/src/update/server.rs | 2 + .../turbopack-dev-server/src/update/stream.rs | 70 ++- crates/turbopack-dev/Cargo.toml | 2 +- .../src/ecmascript/content_entry.rs | 14 +- crates/turbopack-ecmascript/src/parse.rs | 2 +- crates/turbopack-ecmascript/src/utils.rs | 25 - crates/turbopack-node/Cargo.toml | 1 + crates/turbopack-node/src/evaluate.rs | 5 +- .../turbopack-node/src/render/render_proxy.rs | 24 +- 
.../src/render/render_static.rs | 25 +- crates/turbopack-node/src/source_map/mod.rs | 2 + 41 files changed, 1468 insertions(+), 152 deletions(-) create mode 100644 crates/turbo-tasks/src/duration_span.rs create mode 100644 crates/turbopack-cli-utils/src/exit.rs create mode 100644 crates/turbopack-cli-utils/src/raw_trace.rs create mode 100644 crates/turbopack-cli-utils/src/trace_writer.rs create mode 100644 crates/turbopack-cli-utils/src/tracing.rs create mode 100644 crates/turbopack-cli-utils/src/tracing_presets.rs create mode 100644 crates/turbopack-convert-trace/Cargo.toml create mode 100644 crates/turbopack-convert-trace/src/main.rs diff --git a/crates/turbo-tasks-fs/Cargo.toml b/crates/turbo-tasks-fs/Cargo.toml index 9eee0e03c7773..d23d3b30448bd 100644 --- a/crates/turbo-tasks-fs/Cargo.toml +++ b/crates/turbo-tasks-fs/Cargo.toml @@ -38,6 +38,7 @@ serde = { workspace = true, features = ["rc"] } serde_json = { workspace = true } serde_path_to_error = "0.1.9" tokio = { workspace = true } +tracing = { workspace = true } turbo-tasks = { workspace = true } turbo-tasks-hash = { workspace = true } diff --git a/crates/turbo-tasks-fs/src/lib.rs b/crates/turbo-tasks-fs/src/lib.rs index 8340b8b42cca5..03439bf2cf2d8 100644 --- a/crates/turbo-tasks-fs/src/lib.rs +++ b/crates/turbo-tasks-fs/src/lib.rs @@ -51,6 +51,7 @@ use tokio::{ fs, io::{AsyncBufReadExt, AsyncReadExt, BufReader}, }; +use tracing::{instrument, Level}; use turbo_tasks::{ mark_stateful, primitives::{BoolVc, StringReadRef, StringVc}, @@ -356,6 +357,7 @@ impl DiskFileSystem { } event = rx.try_recv(); } + #[instrument(parent = None, level = Level::INFO, name = "DiskFileSystem file change", skip_all, fields(name = display(path.display())))] fn invalidate( report_invalidation_reason: &Option<(String, PathBuf)>, path: &Path, diff --git a/crates/turbo-tasks-macros/src/func.rs b/crates/turbo-tasks-macros/src/func.rs index 9de2b0349d313..8d59a16390623 100644 --- a/crates/turbo-tasks-macros/src/func.rs +++ 
b/crates/turbo-tasks-macros/src/func.rs @@ -165,8 +165,10 @@ pub fn gen_native_function_code( }, (true, false) => quote! { #original_call_code.map(|v| v.into()) }, (false, true) => quote! { - #original_call_code; - Ok(turbo_tasks::NothingVc::new().into()) + { + #original_call_code; + Ok(turbo_tasks::NothingVc::new().into()) + } }, (false, false) => quote! { Ok(#original_call_code.into()) }, }; @@ -184,10 +186,12 @@ pub fn gen_native_function_code( #(#input_convert)* Ok(Box::new(move || { #(#input_clone)* - Box::pin(async move { + Box::pin(turbo_tasks::macro_helpers::tracing::Instrument::instrument(async move { #(#input_final)* - #original_call_code - }) + let turbo_tasks_result = #original_call_code; + turbo_tasks::macro_helpers::notify_scheduled_tasks(); + turbo_tasks_result + }, turbo_tasks::macro_helpers::tracing::trace_span!(#name_code))) })) })) }); diff --git a/crates/turbo-tasks-macros/src/value_impl_macro.rs b/crates/turbo-tasks-macros/src/value_impl_macro.rs index 85381a1c980a9..bdb589a90c824 100644 --- a/crates/turbo-tasks-macros/src/value_impl_macro.rs +++ b/crates/turbo-tasks-macros/src/value_impl_macro.rs @@ -102,7 +102,7 @@ pub fn value_impl(_args: TokenStream, input: TokenStream) -> TokenStream { let (native_function_code, input_raw_vc_arguments) = gen_native_function_code( // use const string - quote! { format!(concat!("{}::", stringify!(#ident)), std::any::type_name::<#vc_ident>()) }, + quote! { concat!(stringify!(#vc_ident), "::", stringify!(#ident)) }, quote! 
{ #vc_ident::#inline_ident }, &function_ident, &function_id_ident, diff --git a/crates/turbo-tasks-macros/src/value_trait_macro.rs b/crates/turbo-tasks-macros/src/value_trait_macro.rs index 0bd58fde6a1c6..53459a7b9a9e1 100644 --- a/crates/turbo-tasks-macros/src/value_trait_macro.rs +++ b/crates/turbo-tasks-macros/src/value_trait_macro.rs @@ -109,7 +109,7 @@ pub fn value_trait(args: TokenStream, input: TokenStream) -> TokenStream { inline_sig.ident = inline_ident.clone(); let (native_function_code, input_raw_vc_arguments) = gen_native_function_code( - quote! { format!(concat!("{}::", stringify!(#method_ident)), std::any::type_name::<#ref_ident>()) }, + quote! { concat!(stringify!(#ref_ident), "::", stringify!(#method_ident)) }, quote! { #ref_ident::#inline_ident }, &function_ident, &function_id_ident, diff --git a/crates/turbo-tasks-memory/Cargo.toml b/crates/turbo-tasks-memory/Cargo.toml index 39db6e78d7720..9f5078060a112 100644 --- a/crates/turbo-tasks-memory/Cargo.toml +++ b/crates/turbo-tasks-memory/Cargo.toml @@ -21,6 +21,7 @@ parking_lot = { workspace = true } priority-queue = "1.3.0" rustc-hash = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } turbo-tasks = { workspace = true } turbo-tasks-hash = { workspace = true } turbo-tasks-malloc = { workspace = true, default-features = false } diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index d819b32a75458..e0be9f90ed217 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -19,6 +19,7 @@ use dashmap::{mapref::entry::Entry, DashMap}; use nohash_hasher::BuildNoHashHasher; use rustc_hash::FxHasher; use tokio::task::futures::TaskLocalFuture; +use tracing::{trace_span, Instrument}; use turbo_tasks::{ backend::{ Backend, BackendJobId, CellContent, PersistentTaskType, TaskExecutionSpec, @@ -767,6 +768,7 @@ impl Job { ) { match self { Job::RemoveFromScopes(tasks, 
scopes) => { + let _guard = trace_span!("Job::RemoveFromScopes").entered(); for task in tasks { backend.with_task(task, |task| { task.remove_from_scopes(scopes.iter().copied(), backend, turbo_tasks) @@ -775,6 +777,7 @@ impl Job { backend.scope_add_remove_priority.finish_high(); } Job::RemoveFromScope(tasks, scope) => { + let _guard = trace_span!("Job::RemoveFromScope").entered(); for task in tasks { backend.with_task(task, |task| { task.remove_from_scope(scope, backend, turbo_tasks) @@ -783,6 +786,7 @@ impl Job { backend.scope_add_remove_priority.finish_high(); } Job::ScheduleWhenDirtyFromScope(tasks) => { + let _guard = trace_span!("Job::ScheduleWhenDirtyFromScope").entered(); for task in tasks.into_iter() { backend.with_task(task, |task| { task.schedule_when_dirty_from_scope(backend, turbo_tasks); @@ -799,16 +803,20 @@ impl Job { .run_low(async { run_add_to_scope_queue(queue, scope, merging_scopes, backend, turbo_tasks); }) + .instrument(trace_span!("Job::AddToScopeQueue")) .await; } Job::RemoveFromScopeQueue(queue, id) => { + let _guard = trace_span!("Job::AddToScopeQueue").entered(); run_remove_from_scope_queue(queue, id, backend, turbo_tasks); backend.scope_add_remove_priority.finish_high(); } Job::UnloadRootScope(id) => { + let span = trace_span!("Job::UnloadRootScope"); if let Some(future) = turbo_tasks.wait_foreground_done_excluding_own() { - future.await; + future.instrument(span.clone()).await; } + let _guard = span.entered(); backend.with_scope(id, |scope| { scope.assert_unused(); }); @@ -817,6 +825,7 @@ impl Job { } } Job::GarbageCollection => { + let _guard = trace_span!("Job::GarbageCollection").entered(); backend.run_gc(true, turbo_tasks); } } diff --git a/crates/turbo-tasks/Cargo.toml b/crates/turbo-tasks/Cargo.toml index eee2b30d872cd..c52f3a330a4c8 100644 --- a/crates/turbo-tasks/Cargo.toml +++ b/crates/turbo-tasks/Cargo.toml @@ -36,6 +36,7 @@ serde_regex = "1.1.0" stable_deref_trait = "1.2.0" thiserror = { workspace = true } tokio = { workspace = 
true, features = ["full"] } +tracing = { workspace = true } turbo-tasks-hash = { workspace = true } turbo-tasks-macros = { workspace = true } diff --git a/crates/turbo-tasks/src/duration_span.rs b/crates/turbo-tasks/src/duration_span.rs new file mode 100644 index 0000000000000..3e181ebbdb74f --- /dev/null +++ b/crates/turbo-tasks/src/duration_span.rs @@ -0,0 +1,44 @@ +use std::time::Instant; + +/// Guard that emits a tracing event when dropped with the duration of the +/// lifetime of the guard. +pub struct DurationSpanGuard { + start: Instant, + f: Option, +} + +impl DurationSpanGuard { + pub fn new(f: F) -> Self { + Self { + start: Instant::now(), + f: Some(f), + } + } +} + +impl Drop for DurationSpanGuard { + fn drop(&mut self) { + if let Some(f) = self.f.take() { + f(self.start.elapsed().as_micros() as u64); + } + } +} + +/// Creates an event-based span that traces a certain duration (lifetime of the +/// guard). It's not a real span, which means it can be used across threads. It +/// will trace a duration and not the time the cpu is doing actual work. This +/// way it can be used to trace non-cpu-time or time that is spent in other +/// processes. +#[macro_export] +macro_rules! 
duration_span { + ($name:literal) => { + turbo_tasks::duration_span::DurationSpanGuard::new(|duration| { + turbo_tasks::macro_helpers::tracing::info!(name = $name, duration = duration); + }) + }; + ($name:literal, $($arg:tt)+) => { + turbo_tasks::duration_span::DurationSpanGuard::new(|duration| { + turbo_tasks::macro_helpers::tracing::info!(name = $name, $($arg)+, duration = duration); + }) + }; +} diff --git a/crates/turbo-tasks/src/graph/graph_traversal.rs b/crates/turbo-tasks/src/graph/graph_traversal.rs index 5a77b867a416c..6cf3046683ebc 100644 --- a/crates/turbo-tasks/src/graph/graph_traversal.rs +++ b/crates/turbo-tasks/src/graph/graph_traversal.rs @@ -45,8 +45,9 @@ where for edge in root_edges { match visit.visit(edge) { VisitControlFlow::Continue(node) => { + let span = visit.span(&node); if let Some((parent_handle, node_ref)) = self.insert(None, GraphNode(node)) { - futures.push(With::new(visit.edges(node_ref), parent_handle)); + futures.push(With::new(visit.edges(node_ref), span, parent_handle)); } } VisitControlFlow::Skip(node) => { @@ -146,16 +147,19 @@ where GraphTraversalState::Running(mut running) => 'outer: loop { let futures_pin = unsafe { Pin::new_unchecked(&mut running.futures) }; match futures_pin.poll_next(cx) { - std::task::Poll::Ready(Some((parent_handle, Ok(edges)))) => { + std::task::Poll::Ready(Some((parent_handle, span, Ok(edges)))) => { + let _guard = span.enter(); for edge in edges { match running.visit.visit(edge) { VisitControlFlow::Continue(node) => { + let span = running.visit.span(&node); if let Some((node_handle, node_ref)) = running .store .insert(Some(parent_handle.clone()), GraphNode(node)) { running.futures.push(With::new( running.visit.edges(node_ref), + span, node_handle, )); } @@ -176,7 +180,7 @@ where } } } - std::task::Poll::Ready(Some((_, Err(err)))) => { + std::task::Poll::Ready(Some((_, _, Err(err)))) => { break ( GraphTraversalState::Completed, std::task::Poll::Ready(GraphTraversalResult::Completed(Err(err))), diff 
--git a/crates/turbo-tasks/src/graph/visit.rs b/crates/turbo-tasks/src/graph/visit.rs index 2acf7dcdf64d0..d0d0003a709be 100644 --- a/crates/turbo-tasks/src/graph/visit.rs +++ b/crates/turbo-tasks/src/graph/visit.rs @@ -1,6 +1,7 @@ use std::future::Future; use anyhow::Result; +use tracing::Span; use super::VisitControlFlow; @@ -21,6 +22,13 @@ pub trait Visit { /// Returns a future that resolves to the outgoing edges of the given /// `node`. fn edges(&mut self, node: &Node) -> Self::EdgesFuture; + + /// Returns a [Span] for the given `node`, under which all edges are + /// processed. + fn span(&mut self, node: &Node) -> Span { + let _ = node; + Span::current() + } } // The different `Impl*` here are necessary in order to avoid the `Conflicting diff --git a/crates/turbo-tasks/src/graph/with_future.rs b/crates/turbo-tasks/src/graph/with_future.rs index bca179b8b08d7..76e5a9f69a6f4 100644 --- a/crates/turbo-tasks/src/graph/with_future.rs +++ b/crates/turbo-tasks/src/graph/with_future.rs @@ -1,6 +1,7 @@ -use std::future::Future; +use std::{future::Future, mem::replace}; use pin_project_lite::pin_project; +use tracing::Span; pin_project! { pub struct With @@ -9,6 +10,7 @@ pin_project! 
{ { #[pin] future: T, + span: Span, handle: Option, } } @@ -17,9 +19,10 @@ impl With where T: Future, { - pub fn new(future: T, handle: H) -> Self { + pub fn new(future: T, span: Span, handle: H) -> Self { Self { future, + span, handle: Some(handle), } } @@ -29,18 +32,23 @@ impl Future for With where T: Future, { - type Output = (H, T::Output); + type Output = (H, Span, T::Output); fn poll( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { let this = self.project(); + let guard = this.span.enter(); match this.future.poll(cx) { - std::task::Poll::Ready(result) => std::task::Poll::Ready(( - this.handle.take().expect("polled after completion"), - result, - )), + std::task::Poll::Ready(result) => { + drop(guard); + std::task::Poll::Ready(( + this.handle.take().expect("polled after completion"), + replace(this.span, Span::none()), + result, + )) + } std::task::Poll::Pending => std::task::Poll::Pending, } } diff --git a/crates/turbo-tasks/src/lib.rs b/crates/turbo-tasks/src/lib.rs index e0bde80a23b03..aad4243bc7c0d 100644 --- a/crates/turbo-tasks/src/lib.rs +++ b/crates/turbo-tasks/src/lib.rs @@ -38,6 +38,7 @@ mod collectibles; mod completion; pub mod debug; mod display; +pub mod duration_span; pub mod event; pub mod graph; mod id; @@ -103,8 +104,9 @@ pub use value_type::{ #[doc(hidden)] pub mod macro_helpers { pub use once_cell::sync::{Lazy, OnceCell}; + pub use tracing; - pub use super::manager::{find_cell_by_type, spawn_detached}; + pub use super::manager::{find_cell_by_type, notify_scheduled_tasks, spawn_detached}; } pub mod test_helpers { diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index 7a2ae18cd5d3d..32efb4b6521d4 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -21,6 +21,7 @@ use futures::FutureExt; use nohash_hasher::BuildNoHashHasher; use serde::{de::Visitor, Deserialize, Serialize}; use tokio::{runtime::Handle, select, task_local}; +use 
tracing::{instrument, trace_span, Instrument, Level}; use crate::{ backend::{Backend, CellContent, PersistentTaskType, TransientTaskType}, @@ -496,10 +497,12 @@ impl TurboTasks { anyhow::Ok(()) }; - let future = TURBO_TASKS.scope( - self.pin(), - CURRENT_TASK_ID.scope(task_id, self.backend.execution_scope(task_id, future)), - ); + let future = TURBO_TASKS + .scope( + self.pin(), + CURRENT_TASK_ID.scope(task_id, self.backend.execution_scope(task_id, future)), + ) + .in_current_span(); #[cfg(feature = "tokio_tracing")] tokio::task::Builder::new() @@ -578,7 +581,9 @@ impl TurboTasks { { return; } - listener.await; + listener + .instrument(trace_span!("wait_foreground_done")) + .await; } pub fn get_in_progress_count(&self) -> usize { @@ -738,27 +743,31 @@ impl TurboTasks { let this = self.pin(); self.currently_scheduled_background_jobs .fetch_add(1, Ordering::AcqRel); - tokio::spawn(TURBO_TASKS.scope(this.clone(), async move { - while this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 { - let listener = this - .event - .listen_with_note(|| "background job waiting for execution".to_string()); - if this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 { - listener.await; - } - } - let this2 = this.clone(); - if !this.stopped.load(Ordering::Acquire) { - func(this).await; - } - if this2 - .currently_scheduled_background_jobs - .fetch_sub(1, Ordering::AcqRel) - == 1 - { - this2.event_background.notify(usize::MAX); - } - })); + tokio::spawn( + TURBO_TASKS + .scope(this.clone(), async move { + while this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 { + let listener = this.event.listen_with_note(|| { + "background job waiting for execution".to_string() + }); + if this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 { + listener.await; + } + } + let this2 = this.clone(); + if !this.stopped.load(Ordering::Acquire) { + func(this).await; + } + if this2 + .currently_scheduled_background_jobs + .fetch_sub(1, Ordering::AcqRel) + == 1 + { + 
this2.event_background.notify(usize::MAX); + } + }) + .in_current_span(), + ); } #[track_caller] @@ -771,12 +780,16 @@ impl TurboTasks { ) { let this = self.pin(); this.begin_foreground_job(); - tokio::spawn(TURBO_TASKS.scope(this.clone(), async move { - if !this.stopped.load(Ordering::Acquire) { - func(this.clone()).await; - } - this.finish_foreground_job(); - })); + tokio::spawn( + TURBO_TASKS + .scope(this.clone(), async move { + if !this.stopped.load(Ordering::Acquire) { + func(this.clone()).await; + } + this.finish_foreground_job(); + }) + .in_current_span(), + ); } fn finish_current_task_state(&self) -> bool { @@ -787,6 +800,7 @@ impl TurboTasks { } = &mut *cell.borrow_mut(); let tasks = take(tasks_to_notify); if !tasks.is_empty() { + let _guard = trace_span!("finish_current_task_state").entered(); self.backend.invalidate_tasks(tasks, self); } *stateful @@ -857,16 +871,18 @@ impl TurboTasksCallApi for TurboTasks { } impl TurboTasksApi for TurboTasks { + #[instrument(level = Level::INFO, skip_all, name = "invalidate")] fn invalidate(&self, task: TaskId) { self.backend.invalidate_task(task, self); } + #[instrument(level = Level::INFO, skip_all, name = "invalidate", fields(name = display(&reason)))] fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc) { { let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap(); reason_set.insert(reason); } - self.invalidate(task); + self.backend.invalidate_task(task, self); } fn notify_scheduled_tasks(&self) { @@ -1070,6 +1086,7 @@ impl TurboTasksBackendApi for TurboTasks { tasks_to_notify.extend(tasks.iter()); }); if result.is_err() { + let _guard = trace_span!("schedule_notify_tasks", count = tasks.len()).entered(); self.backend.invalidate_tasks(tasks.to_vec(), self); } } @@ -1084,6 +1101,7 @@ impl TurboTasksBackendApi for TurboTasks { tasks_to_notify.extend(tasks.iter()); }); if result.is_err() { + let _guard = trace_span!("schedule_notify_tasks_set", count = tasks.len()).entered(); self.backend 
.invalidate_tasks(tasks.iter().copied().collect(), self); }; @@ -1328,7 +1346,7 @@ pub fn with_turbo_tasks_for_testing( /// Beware: this method is not safe to use in production code. It is only /// intended for use in tests and for debugging purposes. pub fn spawn_detached(f: impl Future> + Send + 'static) { - tokio::spawn(turbo_tasks().detached(Box::pin(f))); + tokio::spawn(turbo_tasks().detached(Box::pin(f.in_current_span()))); } pub fn current_task_for_testing() -> TaskId { @@ -1363,12 +1381,19 @@ pub fn mark_stateful() { }) } +/// Notifies scheduled tasks for execution. +pub fn notify_scheduled_tasks() { + with_turbo_tasks(|tt| tt.notify_scheduled_tasks()) +} + pub fn emit(collectible: T) { with_turbo_tasks(|tt| tt.emit_collectible(T::get_trait_type_id(), collectible.into())) } pub async fn spawn_blocking(func: impl FnOnce() -> T + Send + 'static) -> T { + let span = trace_span!("blocking operation").or_current(); let (r, d) = tokio::task::spawn_blocking(|| { + let _guard = span.entered(); let start = Instant::now(); let r = func(); (r, start.elapsed()) @@ -1381,10 +1406,13 @@ pub async fn spawn_blocking(func: impl FnOnce() -> T + Send + pub fn spawn_thread(func: impl FnOnce() + Send + 'static) { let handle = Handle::current(); + let span = trace_span!("thread").or_current(); thread::spawn(move || { + let span = span.entered(); let guard = handle.enter(); func(); drop(guard); + drop(span); }); } diff --git a/crates/turbo-tasks/src/util.rs b/crates/turbo-tasks/src/util.rs index a799035920e3d..7df77c74a0ff3 100644 --- a/crates/turbo-tasks/src/util.rs +++ b/crates/turbo-tasks/src/util.rs @@ -2,13 +2,17 @@ use std::{ any::Provider, error::Error as StdError, fmt::{Debug, Display}, + future::Future, hash::{Hash, Hasher}, ops::Deref, + pin::Pin, sync::Arc, + task::{Context, Poll}, time::Duration, }; use anyhow::{anyhow, Error}; +use pin_project_lite::pin_project; use serde::{Deserialize, Deserializer, Serialize, Serializer}; pub use super::{id_factory::IdFactory, 
no_move_vec::NoMoveVec, once_map::*}; @@ -220,3 +224,29 @@ impl Debug for StaticOrArc { (**self).fmt(f) } } + +pin_project! { + /// A future that wraps another future and applies a function on every poll call. + pub struct WrapFuture { + wrapper: W, + #[pin] + future: F, + } +} + +impl Fn(Pin<&mut F>, &mut Context<'a>) -> Poll> WrapFuture { + pub fn new(wrapper: W, future: F) -> Self { + Self { wrapper, future } + } +} + +impl Fn(Pin<&mut F>, &mut Context<'a>) -> Poll> Future + for WrapFuture +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + (this.wrapper)(this.future, cx) + } +} diff --git a/crates/turbopack-cli-utils/Cargo.toml b/crates/turbopack-cli-utils/Cargo.toml index 69a860d79a152..9be5481f524a5 100644 --- a/crates/turbopack-cli-utils/Cargo.toml +++ b/crates/turbopack-cli-utils/Cargo.toml @@ -14,9 +14,16 @@ bench = false [dependencies] anyhow = { workspace = true } clap = { workspace = true, features = ["derive"] } +crossbeam-channel = { workspace = true } crossterm = "0.26.0" +ctrlc = "3.2.5" +once_cell = { workspace = true } owo-colors = { workspace = true } +postcard = { workspace = true, features = ["alloc", "use-std"] } serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } turbo-tasks = { workspace = true } turbo-tasks-fs = { workspace = true } turbopack-core = { workspace = true } diff --git a/crates/turbopack-cli-utils/src/exit.rs b/crates/turbopack-cli-utils/src/exit.rs new file mode 100644 index 0000000000000..ba261617e19d6 --- /dev/null +++ b/crates/turbopack-cli-utils/src/exit.rs @@ -0,0 +1,29 @@ +use std::sync::{Arc, Mutex}; + +use anyhow::{Context, Result}; + +/// A guard for the exit handler. When dropped, the exit guard will be dropped. +/// It might also be dropped on Ctrl-C. 
+pub struct ExitGuard(Arc>>); + +impl Drop for ExitGuard { + fn drop(&mut self) { + drop(self.0.lock().unwrap().take()) + } +} + +impl ExitGuard { + /// Drop a guard when Ctrl-C is pressed or the [ExitGuard] is dropped. + pub fn new(guard: T) -> Result { + let guard = Arc::new(Mutex::new(Some(guard))); + { + let guard = guard.clone(); + ctrlc::set_handler(move || { + drop(guard.lock().unwrap().take()); + std::process::exit(0); + }) + .context("Unable to set ctrl-c handler")?; + } + Ok(ExitGuard(guard)) + } +} diff --git a/crates/turbopack-cli-utils/src/lib.rs b/crates/turbopack-cli-utils/src/lib.rs index bc2d7f1ce0a51..aeb93ad001cb6 100644 --- a/crates/turbopack-cli-utils/src/lib.rs +++ b/crates/turbopack-cli-utils/src/lib.rs @@ -1,10 +1,16 @@ #![feature(async_closure)] #![feature(min_specialization)] #![feature(round_char_boundary)] +#![feature(thread_id_value)] +pub mod exit; pub mod issue; +pub mod raw_trace; pub mod runtime_entry; pub mod source_context; +pub mod trace_writer; +pub mod tracing; +pub mod tracing_presets; pub fn register() { turbo_tasks::register(); diff --git a/crates/turbopack-cli-utils/src/raw_trace.rs b/crates/turbopack-cli-utils/src/raw_trace.rs new file mode 100644 index 0000000000000..d2a8116e2ff3d --- /dev/null +++ b/crates/turbopack-cli-utils/src/raw_trace.rs @@ -0,0 +1,164 @@ +use std::{borrow::Cow, fmt::Write, marker::PhantomData, thread, time::Instant}; + +use tracing::{ + field::{display, Visit}, + span, Subscriber, +}; +use tracing_subscriber::{registry::LookupSpan, Layer}; + +use crate::{ + trace_writer::TraceWriter, + tracing::{TraceRow, TraceValue}, +}; + +/// A tracing layer that writes raw trace data to a writer. The data format is +/// defined by [TraceRow]. 
+pub struct RawTraceLayer LookupSpan<'a>> { + trace_writer: TraceWriter, + start: Instant, + _phantom: PhantomData, +} + +impl LookupSpan<'a>> RawTraceLayer { + pub fn new(trace_writer: TraceWriter) -> Self { + Self { + trace_writer, + start: Instant::now(), + _phantom: PhantomData, + } + } + + fn write(&self, data: TraceRow<'_>) { + // Always use allocated buffer to allow sending it to another thread. + let buf = postcard::to_allocvec(&data).unwrap(); + self.trace_writer.write(buf); + } +} + +impl LookupSpan<'a>> Layer for RawTraceLayer { + fn on_new_span( + &self, + attrs: &span::Attributes<'_>, + id: &span::Id, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + let ts = self.start.elapsed().as_micros() as u64; + let mut values = ValuesVisitor::new(); + attrs.values().record(&mut values); + self.write(TraceRow::Start { + ts, + id: id.into_u64(), + parent: if attrs.is_contextual() { + ctx.current_span().id().map(|p| p.into_u64()) + } else { + attrs.parent().map(|p| p.into_u64()) + }, + name: attrs.metadata().name(), + target: attrs.metadata().target(), + values: values.values, + }); + } + + fn on_close(&self, id: span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) { + let ts = self.start.elapsed().as_micros() as u64; + self.write(TraceRow::End { + ts, + id: id.into_u64(), + }); + } + + fn on_enter(&self, id: &span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) { + let ts = self.start.elapsed().as_micros() as u64; + let thread_id = thread::current().id().as_u64().into(); + self.write(TraceRow::Enter { + ts, + id: id.into_u64(), + thread_id, + }); + } + + fn on_exit(&self, id: &span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) { + let ts = self.start.elapsed().as_micros() as u64; + self.write(TraceRow::Exit { + ts, + id: id.into_u64(), + }); + } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { + let ts = self.start.elapsed().as_micros() as u64; + let mut values = ValuesVisitor::new(); 
+ event.record(&mut values); + self.write(TraceRow::Event { + ts, + parent: if event.is_contextual() { + ctx.current_span().id().map(|p| p.into_u64()) + } else { + event.parent().map(|p| p.into_u64()) + }, + values: values.values, + }); + } +} + +struct ValuesVisitor { + values: Vec<(Cow<'static, str>, TraceValue<'static>)>, +} + +impl ValuesVisitor { + fn new() -> Self { + Self { values: Vec::new() } + } +} + +impl Visit for ValuesVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + let mut str = String::new(); + let _ = write!(str, "{:?}", value); + self.values + .push((field.name().into(), TraceValue::String(str.into()))); + } + + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + self.values + .push((field.name().into(), TraceValue::Float(value))); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.values + .push((field.name().into(), TraceValue::Int(value))); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.values + .push((field.name().into(), TraceValue::UInt(value))); + } + + fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { + self.record_debug(field, &value) + } + + fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { + self.record_debug(field, &value) + } + + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.values + .push((field.name().into(), TraceValue::Bool(value))); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.values.push(( + field.name().into(), + TraceValue::String(value.to_string().into()), + )); + } + + fn record_error( + &mut self, + field: &tracing::field::Field, + value: &(dyn std::error::Error + 'static), + ) { + self.record_debug(field, &display(value)) + } +} diff --git a/crates/turbopack-cli-utils/src/trace_writer.rs b/crates/turbopack-cli-utils/src/trace_writer.rs new file mode 100644 index 
0000000000000..f1d9bca6802a0 --- /dev/null +++ b/crates/turbopack-cli-utils/src/trace_writer.rs @@ -0,0 +1,103 @@ +use std::{debug_assert, io::Write, thread::JoinHandle}; + +use crossbeam_channel::{unbounded, Sender, TryRecvError}; + +#[derive(Clone, Debug)] +pub struct TraceWriter { + channel: Sender>, +} + +impl TraceWriter { + /// This is a non-blocking writer that writes a file in a background thread. + /// This is inspired by tracing-appender non_blocking, but has some + /// differences: + /// * It allows writing an owned Vec instead of a reference, so avoiding + /// additional allocation. + /// * It uses an unbounded channel to avoid slowing down the application at + /// all (memory) cost. + /// * It issues less writes by buffering the data into chunks of ~1MB, when + /// possible. + pub fn new(mut writer: W) -> (Self, TraceWriterGuard) { + let (tx, rx) = unbounded::>(); + + let handle: std::thread::JoinHandle<()> = std::thread::spawn(move || { + let mut buf = Vec::with_capacity(1024 * 1024); + 'outer: loop { + if !buf.is_empty() { + let _ = writer.write_all(&buf); + let _ = writer.flush(); + buf.clear(); + } + let Ok(data) = rx.recv() else { + break 'outer; + }; + if data.is_empty() { + break 'outer; + } + if data.len() > buf.capacity() { + let _ = writer.write_all(&data); + } else { + buf.extend_from_slice(&data); + } + loop { + match rx.try_recv() { + Ok(data) => { + if data.is_empty() { + break 'outer; + } + if buf.len() + data.len() > buf.capacity() { + let _ = writer.write_all(&buf); + buf.clear(); + if data.len() > buf.capacity() { + let _ = writer.write_all(&data); + } else { + buf.extend_from_slice(&data); + } + } else { + buf.extend_from_slice(&data); + } + } + Err(TryRecvError::Disconnected) => { + break 'outer; + } + Err(TryRecvError::Empty) => { + break; + } + } + } + } + if !buf.is_empty() { + let _ = writer.write_all(&buf); + } + let _ = writer.flush(); + drop(writer); + }); + + ( + Self { + channel: tx.clone(), + }, + TraceWriterGuard { + 
channel: Some(tx), + handle: Some(handle), + }, + ) + } + + pub fn write(&self, data: Vec) { + debug_assert!(!data.is_empty()); + let _ = self.channel.send(data); + } +} + +pub struct TraceWriterGuard { + channel: Option>>, + handle: Option>, +} + +impl Drop for TraceWriterGuard { + fn drop(&mut self) { + let _ = self.channel.take().unwrap().send(Vec::new()); + let _ = self.handle.take().unwrap().join(); + } +} diff --git a/crates/turbopack-cli-utils/src/tracing.rs b/crates/turbopack-cli-utils/src/tracing.rs new file mode 100644 index 0000000000000..ae4f647f2a6bb --- /dev/null +++ b/crates/turbopack-cli-utils/src/tracing.rs @@ -0,0 +1,97 @@ +use std::{ + borrow::Cow, + fmt::{Display, Formatter}, +}; + +use serde::{Deserialize, Serialize}; + +/// A raw trace line. +#[derive(Debug, Serialize, Deserialize)] +pub enum TraceRow<'a> { + /// A new span has been started, but not entered yet. + Start { + /// Timestamp + ts: u64, + /// Unique id for this span. + id: u64, + /// Id of the parent span, if any. + parent: Option, + /// The name of the span. + name: &'a str, + /// The target of the span. + target: &'a str, + /// A list of key-value pairs for all attributes of the span. + #[serde(borrow)] + values: Vec<(Cow<'a, str>, TraceValue<'a>)>, + }, + /// A span has ended. The id might be reused in future. + End { + /// Timestamp + ts: u64, + /// Unique id for this span. Must be created by a `Start` event before. + id: u64, + }, + /// A span has been entered. This means it is spending CPU time now. + Enter { + /// Timestamp + ts: u64, + /// Unique id for this span. Must be created by a `Start` event before. + id: u64, + /// The thread id of the thread that entered the span. + thread_id: u64, + }, + /// A span has been exited. This means it is not spending CPU time anymore. + Exit { + /// Timestamp + ts: u64, + /// Unique id for this span. Must be entered by a `Enter` event before. + id: u64, + }, + /// A event has happened for some span. 
+ Event { + /// Timestamp + ts: u64, + /// Id of the parent span, if any. + parent: Option, + /// A list of key-value pairs for all attributes of the event. + #[serde(borrow)] + values: Vec<(Cow<'a, str>, TraceValue<'a>)>, + }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum TraceValue<'a> { + String(#[serde(borrow)] Cow<'a, str>), + Bool(bool), + UInt(u64), + Int(i64), + Float(f64), +} + +impl Display for TraceValue<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + TraceValue::String(s) => write!(f, "{}", s), + TraceValue::Bool(b) => write!(f, "{}", b), + TraceValue::UInt(u) => write!(f, "{}", u), + TraceValue::Int(i) => write!(f, "{}", i), + TraceValue::Float(fl) => write!(f, "{}", fl), + } + } +} + +impl<'a> TraceValue<'a> { + pub fn as_u64(&self) -> Option { + match self { + TraceValue::UInt(u) => Some(*u), + _ => None, + } + } + + pub fn as_str(&self) -> Option<&str> { + match self { + TraceValue::String(s) => Some(s), + _ => None, + } + } +} diff --git a/crates/turbopack-cli-utils/src/tracing_presets.rs b/crates/turbopack-cli-utils/src/tracing_presets.rs new file mode 100644 index 0000000000000..c9df6c3d0d739 --- /dev/null +++ b/crates/turbopack-cli-utils/src/tracing_presets.rs @@ -0,0 +1,44 @@ +use once_cell::sync::Lazy; + +pub static TRACING_OVERVIEW_TARGETS: Lazy> = Lazy::new(|| { + vec![ + "turbo_tasks_fs=info", + "turbopack_dev_server=info", + "turbopack_node=info", + ] +}); +pub static TRACING_TURBOPACK_TARGETS: Lazy> = Lazy::new(|| { + [ + &TRACING_OVERVIEW_TARGETS[..], + &[ + "turbo_tasks=info", + "turbopack=trace", + "turbopack_core=trace", + "turbopack_ecmascript=trace", + "turbopack_css=trace", + "turbopack_dev=trace", + "turbopack_image=trace", + "turbopack_dev_server=trace", + "turbopack_json=trace", + "turbopack_mdx=trace", + "turbopack_node=trace", + "turbopack_static=trace", + "turbopack_cli_utils=trace", + "turbopack_cli=trace", + "turbopack_ecmascript=trace", + ], + ] + .concat() +}); +pub static 
TRACING_TURBO_TASKS_TARGETS: Lazy> = Lazy::new(|| { + [ + &TRACING_TURBOPACK_TARGETS[..], + &[ + "turbo_tasks=trace", + "turbo_tasks_viz=trace", + "turbo_tasks_memory=trace", + "turbo_tasks_fs=trace", + ], + ] + .concat() +}); diff --git a/crates/turbopack-cli/Cargo.toml b/crates/turbopack-cli/Cargo.toml index 3270ebe025974..20ebf6ab5ab8a 100644 --- a/crates/turbopack-cli/Cargo.toml +++ b/crates/turbopack-cli/Cargo.toml @@ -43,11 +43,13 @@ criterion = { workspace = true, features = ["async_tokio"] } dunce = { workspace = true } futures = { workspace = true } mime = { workspace = true } +once_cell = { workspace = true } owo-colors = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } -tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } turbo-tasks = { workspace = true } turbo-tasks-env = { workspace = true } turbo-tasks-fetch = { workspace = true, default-features = false } diff --git a/crates/turbopack-cli/src/arguments.rs b/crates/turbopack-cli/src/arguments.rs index 273a8e440a9dd..c9a4e7ab73d50 100644 --- a/crates/turbopack-cli/src/arguments.rs +++ b/crates/turbopack-cli/src/arguments.rs @@ -1,4 +1,7 @@ -use std::{net::IpAddr, path::PathBuf}; +use std::{ + net::IpAddr, + path::{Path, PathBuf}, +}; use clap::{Args, Parser}; use turbopack_cli_utils::issue::IssueSeverityCliOption; @@ -9,6 +12,15 @@ pub enum Arguments { Dev(DevArguments), } +impl Arguments { + /// The directory of the application. see [CommonArguments]::dir + pub fn dir(&self) -> Option<&Path> { + match self { + Arguments::Dev(args) => args.common.dir.as_deref(), + } + } +} + #[derive(Debug, Args)] pub struct CommonArguments { /// The directory of the application. 
diff --git a/crates/turbopack-cli/src/main.rs b/crates/turbopack-cli/src/main.rs index a51e4bdf5907c..cd070ee35445f 100644 --- a/crates/turbopack-cli/src/main.rs +++ b/crates/turbopack-cli/src/main.rs @@ -1,10 +1,20 @@ #![feature(future_join)] #![feature(min_specialization)] -use anyhow::Result; +use std::path::Path; + +use anyhow::{Context, Result}; use clap::Parser; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry}; use turbopack_cli::{arguments::Arguments, register}; +use turbopack_cli_utils::{ + exit::ExitGuard, + raw_trace::RawTraceLayer, + trace_writer::TraceWriter, + tracing_presets::{ + TRACING_OVERVIEW_TARGETS, TRACING_TURBOPACK_TARGETS, TRACING_TURBO_TASKS_TARGETS, + }, +}; #[global_allocator] static ALLOC: turbo_tasks_malloc::TurboMalloc = turbo_tasks_malloc::TurboMalloc; @@ -12,17 +22,49 @@ static ALLOC: turbo_tasks_malloc::TurboMalloc = turbo_tasks_malloc::TurboMalloc; fn main() { use turbo_tasks_malloc::TurboMalloc; - let subscriber = Registry::default(); + let args = Arguments::parse(); + + let trace = std::env::var("TURBOPACK_TRACING").ok(); + + let _guard = if let Some(mut trace) = trace { + // Trace presets + match trace.as_str() { + "overview" => { + trace = TRACING_OVERVIEW_TARGETS.join(","); + } + "turbopack" => { + trace = TRACING_TURBOPACK_TARGETS.join(","); + } + "turbo-tasks" => { + trace = TRACING_TURBO_TASKS_TARGETS.join(","); + } + _ => {} + } - #[cfg(target_os = "macos")] - let subscriber = subscriber.with(tracing_signpost::SignpostLayer::new()); + let subscriber = Registry::default(); - let stdout_log = tracing_subscriber::fmt::layer().pretty(); - let subscriber = subscriber.with(stdout_log); + let subscriber = subscriber.with(EnvFilter::builder().parse(trace).unwrap()); - let subscriber = subscriber.with(EnvFilter::from_default_env()); + let internal_dir = args + .dir() + .unwrap_or_else(|| Path::new(".")) + .join(".turbopack"); + std::fs::create_dir_all(&internal_dir) + .context("Unable to 
create .turbopack directory") + .unwrap(); + let trace_file = internal_dir.join("trace.log"); + let trace_writer = std::fs::File::create(trace_file).unwrap(); + let (trace_writer, guard) = TraceWriter::new(trace_writer); + let subscriber = subscriber.with(RawTraceLayer::new(trace_writer)); - subscriber.init(); + let guard = ExitGuard::new(guard).unwrap(); + + subscriber.init(); + + Some(guard) + } else { + None + }; tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -31,13 +73,12 @@ fn main() { }) .build() .unwrap() - .block_on(main_inner()) - .unwrap() + .block_on(main_inner(args)) + .unwrap(); } -async fn main_inner() -> Result<()> { +async fn main_inner(args: Arguments) -> Result<()> { register(); - let args = Arguments::parse(); match args { Arguments::Dev(args) => turbopack_cli::dev::start_server(&args).await, diff --git a/crates/turbopack-convert-trace/Cargo.toml b/crates/turbopack-convert-trace/Cargo.toml new file mode 100644 index 0000000000000..3eb70f8aee87f --- /dev/null +++ b/crates/turbopack-convert-trace/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "turbopack-convert-trace" +version = "0.1.0" +description = "TBD" +license = "MPL-2.0" +edition = "2021" +autobenches = false + +[[bin]] +name = "turbopack-convert-trace" +path = "src/main.rs" +bench = false + +[dependencies] +anyhow = { workspace = true, features = ["backtrace"] } +clap = { workspace = true, features = ["derive", "env"] } +futures = { workspace = true } +indexmap = { workspace = true } +intervaltree = "0.2.7" +owo-colors = { workspace = true } +postcard = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +turbopack-cli-utils = { workspace = true } diff --git a/crates/turbopack-convert-trace/src/main.rs b/crates/turbopack-convert-trace/src/main.rs new file mode 100644 index 0000000000000..8fc1dedae5da0 --- /dev/null +++ b/crates/turbopack-convert-trace/src/main.rs @@ -0,0 +1,589 @@ +//! 
Parses a raw trace file and outputs a new-line separated JSON file +//! compatible with the chrome tracing format. +//! +//! https://ui.perfetto.dev/ can be used to visualize the output file. +//! +//! ## Usage: +//! +//! ```sh +//! turbopack-convert-trace [/path/to/trace.log] +//! ``` +//! +//! ## Options: +//! +//! - `--single`: Shows all cpu time as it would look like when a single cpu +//! would execute the workload. +//! - `--merged`: Shows all cpu time scaled by the concurrency. +//! - `--threads`: Shows cpu time distributed on infinite virtual cpus/threads. +//! - `--idle`: Adds extra info spans when cpus are idle. +//! +//! Default is `--merged`. + +use std::{ + borrow::Cow, + cmp::{max, min, Reverse}, + collections::{hash_map::Entry, HashMap, HashSet}, + eprintln, + ops::Range, +}; + +use indexmap::IndexMap; +use intervaltree::{Element, IntervalTree}; +use turbopack_cli_utils::tracing::{TraceRow, TraceValue}; + +macro_rules! pjson { + ($($tt:tt)*) => { + println!(","); + print!($($tt)*); + } +} + +fn main() { + // Read first argument from argv + let mut args: HashSet = std::env::args().skip(1).collect(); + let single = args.remove("--single"); + let mut merged = args.remove("--merged"); + let threads = args.remove("--threads"); + let idle = args.remove("--idle"); + if !single && !merged && !threads { + merged = true; + } + let arg = args + .iter() + .next() + .map_or(".turbopack/trace.log", String::as_str); + + eprint!("Reading content from {}...", arg); + + // Read file to string + let file = std::fs::read(arg).unwrap(); + eprintln!(" done ({} MiB)", file.len() / 1024 / 1024); + + eprint!("Parsing trace from content..."); + + let mut trace_rows = Vec::new(); + let mut current = &file[..]; + while !current.is_empty() { + match postcard::take_from_bytes(current) { + Ok((row, remaining)) => { + trace_rows.push(row); + current = remaining; + } + Err(err) => { + eprintln!( + "Error parsing trace data at {} bytes: {err}", + file.len() - current.len() + ); + 
break; + } + } + } + eprintln!(" done ({} items)", trace_rows.len()); + + eprint!("Analysing trace into span tree..."); + + let mut spans = Vec::new(); + spans.push(Span { + parent: 0, + name: "".into(), + target: "".into(), + start: 0, + end: 0, + self_start: None, + items: Vec::new(), + values: IndexMap::new(), + }); + + let mut active_ids = HashMap::new(); + + fn ensure_span(active_ids: &mut HashMap, spans: &mut Vec, id: u64) -> usize { + match active_ids.entry(id) { + Entry::Occupied(entry) => *entry.get(), + Entry::Vacant(entry) => { + let internal_id = spans.len(); + entry.insert(internal_id); + let span = Span { + parent: 0, + name: "".into(), + target: "".into(), + start: 0, + end: 0, + self_start: None, + items: Vec::new(), + values: IndexMap::new(), + }; + spans.push(span); + internal_id + } + } + } + + let mut all_self_times = Vec::new(); + let mut name_counts: HashMap, usize> = HashMap::new(); + + for data in trace_rows { + match data { + TraceRow::Start { + ts, + id, + parent, + name, + target, + values, + } => { + let internal_id = ensure_span(&mut active_ids, &mut spans, id); + spans[internal_id].name = name.into(); + spans[internal_id].target = target.into(); + spans[internal_id].start = ts; + spans[internal_id].end = ts; + spans[internal_id].values = values.into_iter().collect(); + let internal_parent = + parent.map_or(0, |id| ensure_span(&mut active_ids, &mut spans, id)); + spans[internal_id].parent = internal_parent; + let parent = &mut spans[internal_parent]; + parent.items.push(SpanItem::Child(internal_id)); + *name_counts.entry(Cow::Borrowed(name)).or_default() += 1; + } + TraceRow::End { ts, id } => { + // id might be reused + if let Some(internal_id) = active_ids.remove(&id) { + let span = &mut spans[internal_id]; + span.end = ts; + } + } + TraceRow::Enter { + ts, + id, + thread_id: _, + } => { + let internal_id = ensure_span(&mut active_ids, &mut spans, id); + let span = &mut spans[internal_id]; + span.self_start = Some(SelfTimeStarted { ts 
}); + } + TraceRow::Exit { ts, id } => { + let internal_id = ensure_span(&mut active_ids, &mut spans, id); + let span = &mut spans[internal_id]; + if let Some(SelfTimeStarted { ts: ts_start }) = span.self_start { + let (start, end) = if ts_start > ts { + (ts, ts_start) + } else { + (ts_start, ts) + }; + if end > start { + span.items.push(SpanItem::SelfTime { + start, + duration: end - start, + }); + all_self_times.push(Element { + range: start..end, + value: internal_id, + }); + } + } + } + TraceRow::Event { ts, parent, values } => { + let mut values = values.into_iter().collect::>(); + let duration = values.get("duration").and_then(|v| v.as_u64()).unwrap_or(0); + let name: Cow<'_, str> = values + .remove("name") + .and_then(|v| v.as_str().map(|s| s.to_string().into())) + .unwrap_or("".into()); + let internal_parent = + parent.map_or(0, |id| ensure_span(&mut active_ids, &mut spans, id)); + if duration > 0 { + *name_counts.entry(name.clone()).or_default() += 1; + let internal_id = spans.len(); + let start = ts - duration; + spans.push(Span { + parent: internal_parent, + name, + target: "".into(), + start, + end: ts, + self_start: None, + items: vec![SpanItem::SelfTime { start, duration }], + values, + }); + all_self_times.push(Element { + range: start..ts, + value: internal_id, + }); + let parent = &mut spans[internal_parent]; + parent.items.push(SpanItem::Child(internal_id)); + } + } + } + } + + eprintln!(" done ({} spans)", spans.len()); + + let mut name_counts: Vec<(Cow<'_, str>, usize)> = name_counts.into_iter().collect(); + name_counts.sort_by_key(|(_, count)| Reverse(*count)); + + eprintln!("Top 10 span names:"); + for (name, count) in name_counts.into_iter().take(10) { + eprintln!("{}x {}", count, name); + } + + println!("["); + print!(r#"{{"ph":"M","pid":1,"name":"thread_name","tid":0,"args":{{"name":"Single CPU"}}}}"#); + pjson!(r#"{{"ph":"M","pid":2,"name":"thread_name","tid":0,"args":{{"name":"Scaling CPU"}}}}"#); + + let busy_len = all_self_times.len(); 
+ let busy: IntervalTree = all_self_times.into_iter().collect::>(); + + if threads { + eprint!("Distributing time into virtual threads..."); + let mut virtual_threads = Vec::new(); + + let find_thread = |virtual_threads: &mut Vec, + stack: &[usize], + start: u64| { + let idle_threads = virtual_threads + .iter() + .enumerate() + .filter(|(_, thread)| thread.ts <= start) + .collect::>(); + for (index, id) in stack.iter().enumerate() { + for &(i, thread) in &idle_threads { + if thread.stack.len() > index && thread.stack[index] == *id { + return i; + } + } + } + if let Some((index, _)) = idle_threads.into_iter().max_by_key(|(_, thread)| thread.ts) { + return index; + } + virtual_threads.push(VirtualThread { + stack: Vec::new(), + ts: 0, + }); + let index = virtual_threads.len() - 1; + pjson!( + r#"{{"ph":"M","pid":3,"name":"thread_name","tid":{index},"args":{{"name":"Virtual Thread"}}}}"# + ); + index + }; + + let get_stack = |mut id: usize| { + let mut stack = Vec::new(); + while id != 0 { + let span = &spans[id]; + stack.push(id); + id = span.parent; + } + stack.reverse(); + stack + }; + + for ( + i, + &Element { + range: Range { start, end }, + value: id, + }, + ) in busy.iter_sorted().enumerate() + { + if i % 1000 == 0 { + eprint!("\rDistributing time into virtual threads... 
{i} / {busy_len}",); + } + let stack = get_stack(id); + let thread = find_thread(&mut virtual_threads, &stack, start); + + let virtual_thread = &mut virtual_threads[thread]; + let ts = virtual_thread.ts; + let thread_stack = &mut virtual_thread.stack; + + let long_idle = virtual_thread.ts + 10000 < start; + + // Leave old spans on that thread + while !thread_stack.is_empty() + && (long_idle || thread_stack.last() != stack.get(thread_stack.len() - 1)) + { + let id = thread_stack.pop().unwrap(); + let span = &spans[id]; + pjson!( + r#"{{"ph":"E","pid":3,"ts":{ts},"name":{},"cat":{},"tid":{thread},"_id":{id},"_stack":"{:?}"}}"#, + serde_json::to_string(&span.name).unwrap(), + serde_json::to_string(&span.target).unwrap(), + stack.get(thread_stack.len()) + ); + } + + // Advance thread time to start + if virtual_thread.ts + 100 < start { + if !thread_stack.is_empty() { + pjson!( + r#"{{"ph":"B","pid":3,"ts":{ts},"name":"idle","cat":"idle","tid":{thread}}}"#, + ); + pjson!( + r#"{{"ph":"E","pid":3,"ts":{start},"name":"idle","cat":"idle","tid":{thread}}}"#, + ); + } + virtual_thread.ts = start; + } + + // Enter new spans on that thread + for id in stack[thread_stack.len()..].iter() { + thread_stack.push(*id); + let span = &spans[*id]; + pjson!( + r#"{{"ph":"B","pid":3,"ts":{start},"name":{},"cat":{},"tid":{thread},"_id":{id}}}"#, + serde_json::to_string(&span.name).unwrap(), + serde_json::to_string(&span.target).unwrap(), + ); + } + + virtual_thread.ts = end; + } + + // Leave all threads + for (i, VirtualThread { ts, mut stack }) in virtual_threads.into_iter().enumerate() { + while let Some(id) = stack.pop() { + let span = &spans[id]; + pjson!( + r#"{{"ph":"E","pid":3,"ts":{ts},"name":{},"cat":{},"tid":{i}}}"#, + serde_json::to_string(&span.name).unwrap(), + serde_json::to_string(&span.target).unwrap(), + ); + } + } + eprintln!(" done"); + } + + if single || merged { + eprint!("Emitting span tree..."); + + let get_concurrency = |range: Range| { + let mut sum = 0; + for 
interval in busy.query(range.clone()) { + let start = max(interval.range.start, range.start); + let end = min(interval.range.end, range.end); + sum += end - start; + } + 100 * sum / (range.end - range.start) + }; + + let target_concurrency = 200; + let warn_concurrency = 400; + + enum Task { + Enter { + id: usize, + root: bool, + }, + Exit { + name_json: String, + target_json: String, + start: u64, + start_scaled: u64, + }, + SelfTime { + duration: u64, + concurrency: u64, + }, + } + let mut ts = 0; + let mut tts = 0; + let mut merged_ts = 0; + let mut merged_tts = 0; + let mut stack = spans + .iter() + .enumerate() + .skip(1) + .rev() + .filter_map(|(id, span)| { + if span.parent == 0 { + Some(Task::Enter { id, root: true }) + } else { + None + } + }) + .collect::>(); + while let Some(task) = stack.pop() { + match task { + Task::Enter { id, root } => { + let span = &mut spans[id]; + if root { + if ts < span.start { + ts = span.start; + } + if tts < span.start { + tts = span.start; + } + if merged_ts < span.start { + merged_ts = span.start; + } + if merged_tts < span.start { + merged_tts = span.start; + } + } + let name_json = if let Some(name_value) = span.values.get("name") { + serde_json::to_string(&format!("{} {name_value}", span.name)).unwrap() + } else { + serde_json::to_string(&span.name).unwrap() + }; + let target_json = serde_json::to_string(&span.target).unwrap(); + let args_json = serde_json::to_string(&span.values).unwrap(); + if single { + pjson!( + r#"{{"ph":"B","pid":1,"ts":{ts},"tts":{tts},"name":{name_json},"cat":{target_json},"tid":0,"args":{args_json}}}"#, + ); + } + if merged { + pjson!( + r#"{{"ph":"B","pid":2,"ts":{merged_ts},"tts":{merged_tts},"name":{name_json},"cat":{target_json},"tid":0,"args":{args_json}}}"#, + ); + } + stack.push(Task::Exit { + name_json, + target_json, + start: ts, + start_scaled: tts, + }); + for item in span.items.iter().rev() { + match item { + SpanItem::SelfTime { + start, duration, .. 
+ } => { + let range = *start..(start + duration); + let new_concurrency = get_concurrency(range); + let new_duration = *duration; + if let Some(Task::SelfTime { + duration, + concurrency, + }) = stack.last_mut() + { + *concurrency = ((*concurrency) * (*duration) + + new_concurrency * new_duration) + / (*duration + new_duration); + *duration += new_duration; + } else { + stack.push(Task::SelfTime { + duration: new_duration, + concurrency: max(100, new_concurrency), + }); + } + } + SpanItem::Child(id) => { + stack.push(Task::Enter { + id: *id, + root: false, + }); + } + } + } + } + Task::Exit { + name_json, + target_json, + start, + start_scaled, + } => { + if ts > start && tts > start_scaled { + let concurrency = (ts - start) * target_concurrency / (tts - start_scaled); + if single { + pjson!( + r#"{{"ph":"E","pid":1,"ts":{ts},"tts":{tts},"name":{name_json},"cat":{target_json},"tid":0,"args":{{"concurrency":{}}}}}"#, + concurrency as f64 / 100.0, + ); + } + if merged { + pjson!( + r#"{{"ph":"E","pid":2,"ts":{merged_ts},"tts":{merged_tts},"name":{name_json},"cat":{target_json},"tid":0,"args":{{"concurrency":{}}}}}"#, + concurrency as f64 / 100.0, + ); + } + } else { + if single { + pjson!( + r#"{{"ph":"E","pid":1,"ts":{ts},"tts":{tts},"name":{name_json},"cat":{target_json},"tid":0}}"#, + ); + } + if merged { + pjson!( + r#"{{"ph":"E","pid":2,"ts":{merged_ts},"tts":{merged_tts},"name":{name_json},"cat":{target_json},"tid":0}}"#, + ); + } + } + } + Task::SelfTime { + duration, + concurrency, + } => { + let scaled_duration = + (duration * target_concurrency + concurrency - 1) / concurrency; + let merged_duration = (duration * 100 + concurrency - 1) / concurrency; + let merged_scaled_duration = + (merged_duration * target_concurrency + concurrency - 1) / concurrency; + let target_duration = duration * concurrency / warn_concurrency; + let merged_target_duration = merged_duration * concurrency / warn_concurrency; + if idle && concurrency <= warn_concurrency { + let 
target = ts + target_duration; + let merged_target = merged_ts + merged_target_duration; + if single { + pjson!( + r#"{{"ph":"B","pid":1,"ts":{target},"tts":{tts},"name":"idle cpus","cat":"low concurrency","tid":0,"args":{{"concurrency":{}}}}}"#, + concurrency as f64 / 100.0, + ); + } + if merged { + pjson!( + r#"{{"ph":"B","pid":2,"ts":{merged_target},"tts":{merged_tts},"name":"idle cpus","cat":"low concurrency","tid":0,"args":{{"concurrency":{}}}}}"#, + concurrency as f64 / 100.0, + ); + } + } + ts += duration; + tts += scaled_duration; + merged_ts += merged_duration; + merged_tts += merged_scaled_duration; + if idle && concurrency <= warn_concurrency { + if single { + pjson!( + r#"{{"ph":"E","pid":1,"ts":{ts},"tts":{tts},"name":"idle cpus","cat":"low concurrency","tid":0}}"#, + ); + } + if merged { + pjson!( + r#"{{"ph":"E","pid":2,"ts":{merged_ts},"tts":{merged_tts},"name":"idle cpus","cat":"low concurrency","tid":0}}"#, + ); + } + } + } + } + } + eprintln!(" done"); + } + println!(); + println!("]"); +} + +#[derive(Debug)] +struct SelfTimeStarted { + ts: u64, +} + +#[derive(Debug, Default)] +struct Span<'a> { + parent: usize, + name: Cow<'a, str>, + target: Cow<'a, str>, + start: u64, + end: u64, + self_start: Option, + items: Vec, + values: IndexMap, TraceValue<'a>>, +} + +#[derive(Debug)] +enum SpanItem { + SelfTime { start: u64, duration: u64 }, + Child(usize), +} + +#[derive(Debug)] +struct VirtualThread { + ts: u64, + stack: Vec, +} diff --git a/crates/turbopack-core/Cargo.toml b/crates/turbopack-core/Cargo.toml index 9743679d6c0e0..1aa251170465d 100644 --- a/crates/turbopack-core/Cargo.toml +++ b/crates/turbopack-core/Cargo.toml @@ -25,7 +25,7 @@ serde_json = { workspace = true, features = ["preserve_order"] } serde_qs = { workspace = true } sourcemap = "6.0.2" swc_core = { workspace = true, features = ["ecma_preset_env", "common"] } - +tracing = { workspace = true } turbo-tasks = { workspace = true } turbo-tasks-env = { workspace = true } turbo-tasks-fs 
= { workspace = true } diff --git a/crates/turbopack-core/src/chunk/mod.rs b/crates/turbopack-core/src/chunk/mod.rs index 723cf0a7edcc7..b23f30d79c8fb 100644 --- a/crates/turbopack-core/src/chunk/mod.rs +++ b/crates/turbopack-core/src/chunk/mod.rs @@ -15,10 +15,11 @@ use std::{ use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; +use tracing::{info_span, Span}; use turbo_tasks::{ debug::ValueDebugFormat, graph::{GraphTraversal, GraphTraversalResult, ReverseTopological, Visit, VisitControlFlow}, - primitives::StringVc, + primitives::{StringReadRef, StringVc}, trace::TraceRawVcs, TryJoinIterExt, Value, ValueToString, ValueToStringVc, }; @@ -281,7 +282,7 @@ where #[derive(Eq, PartialEq, Clone, Hash)] enum ChunkContentGraphNode { // Chunk items that are placed into the current chunk - ChunkItem(I), + ChunkItem { item: I, ident: StringReadRef }, // Asset that is already available and doesn't need to be included AvailableAsset(AssetVc), // Chunks that are loaded in parallel to the current chunk @@ -354,7 +355,10 @@ where if let Some(chunk_item) = I::from_asset(context.chunking_context, asset).await? 
{ graph_nodes.push(( Some((asset, chunking_type)), - ChunkContentGraphNode::ChunkItem(chunk_item), + ChunkContentGraphNode::ChunkItem { + item: chunk_item, + ident: asset.ident().to_string().await?, + }, )); } else { return Err(anyhow!( @@ -392,7 +396,10 @@ where { graph_nodes.push(( Some((asset, chunking_type)), - ChunkContentGraphNode::ChunkItem(chunk_item), + ChunkContentGraphNode::ChunkItem { + item: chunk_item, + ident: asset.ident().to_string().await?, + }, )); continue; } @@ -424,7 +431,10 @@ where { graph_nodes.push(( Some((asset, chunking_type)), - ChunkContentGraphNode::ChunkItem(manifest_loader_item), + ChunkContentGraphNode::ChunkItem { + item: manifest_loader_item, + ident: asset.ident().to_string().await?, + }, )); } else { return Ok(vec![( @@ -476,7 +486,7 @@ where return VisitControlFlow::Skip(node); } - if let ChunkContentGraphNode::ChunkItem(_) = &node { + if let ChunkContentGraphNode::ChunkItem { .. } = &node { self.chunk_items_count += 1; // Make sure the chunk doesn't become too large. @@ -492,7 +502,10 @@ where } fn edges(&mut self, node: &ChunkContentGraphNode) -> Self::EdgesFuture { - let chunk_item = if let ChunkContentGraphNode::ChunkItem(chunk_item) = node { + let chunk_item = if let ChunkContentGraphNode::ChunkItem { + item: chunk_item, .. + } = node + { Some(chunk_item.clone()) } else { None @@ -516,6 +529,14 @@ where .flatten()) } } + + fn span(&mut self, node: &ChunkContentGraphNode) -> Span { + if let ChunkContentGraphNode::ChunkItem { ident, .. 
} = node { + info_span!("module", name = display(ident)) + } else { + Span::current() + } + } } async fn chunk_content_internal_parallel( @@ -540,9 +561,10 @@ where .map(|entry| async move { Ok(( Some((entry, ChunkingType::Placed)), - ChunkContentGraphNode::ChunkItem( - I::from_asset(chunking_context, entry).await?.unwrap(), - ), + ChunkContentGraphNode::ChunkItem { + item: I::from_asset(chunking_context, entry).await?.unwrap(), + ident: entry.ident().to_string().await?, + }, )) }) .try_join() @@ -576,8 +598,8 @@ where for graph_node in graph_nodes { match graph_node { ChunkContentGraphNode::AvailableAsset(_asset) => {} - ChunkContentGraphNode::ChunkItem(chunk_item) => { - chunk_items.push(chunk_item); + ChunkContentGraphNode::ChunkItem { item, .. } => { + chunk_items.push(item); } ChunkContentGraphNode::Chunk(chunk) => { chunks.push(chunk); diff --git a/crates/turbopack-dev-server/src/lib.rs b/crates/turbopack-dev-server/src/lib.rs index 8ff412b06c0f1..4247828a902f0 100644 --- a/crates/turbopack-dev-server/src/lib.rs +++ b/crates/turbopack-dev-server/src/lib.rs @@ -25,7 +25,7 @@ use hyper::{ Request, Response, Server, }; use socket2::{Domain, Protocol, Socket, Type}; -use tracing::{event, span, Instrument, Level}; +use tracing::{event, info_span, Instrument, Level, Span}; use turbo_tasks::{ run_once_with_reason, trace::TraceRawVcs, util::FormatDuration, CollectiblesSource, RawVc, TransientInstance, TransientValue, TurboTasksApi, @@ -146,13 +146,13 @@ impl DevServerBuilder { let get_issue_reporter = get_issue_reporter.clone(); async move { let handler = move |request: Request| { - event!(Level::DEBUG, "request {:?}", request.uri()); - let request_span = span!(Level::DEBUG, "request", uri = ?request.uri()); + let request_span = info_span!(parent: None, "request", name = ?request.uri()); let start = Instant::now(); let tt = tt.clone(); let get_issue_reporter = get_issue_reporter.clone(); let source_provider = source_provider.clone(); let future = async move { + 
event!(parent: Span::current(), Level::DEBUG, "request start"); let reason = ServerRequest { method: request.method().clone(), uri: request.uri().clone(), diff --git a/crates/turbopack-dev-server/src/update/server.rs b/crates/turbopack-dev-server/src/update/server.rs index 0bce2239988e8..9d0af2725c740 100644 --- a/crates/turbopack-dev-server/src/update/server.rs +++ b/crates/turbopack-dev-server/src/update/server.rs @@ -10,6 +10,7 @@ use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream}; use pin_project_lite::pin_project; use tokio::select; use tokio_stream::StreamMap; +use tracing::{instrument, Level}; use turbo_tasks::{TransientInstance, TurboTasksApi}; use turbo_tasks_fs::json::parse_json_with_source_context; use turbopack_core::{error::PrettyPrintError, issue::IssueReporterVc, version::Update}; @@ -49,6 +50,7 @@ impl UpdateServer

{ })); } + #[instrument(level = Level::TRACE, skip_all, name = "UpdateServer::run_internal")] async fn run_internal(self, ws: HyperWebsocket) -> Result<()> { let mut client: UpdateClient = ws.await?.into(); diff --git a/crates/turbopack-dev-server/src/update/stream.rs b/crates/turbopack-dev-server/src/update/stream.rs index 9e62ea35f6e6d..79eaf8c3002c8 100644 --- a/crates/turbopack-dev-server/src/update/stream.rs +++ b/crates/turbopack-dev-server/src/update/stream.rs @@ -4,6 +4,7 @@ use anyhow::Result; use futures::{prelude::*, Stream}; use tokio::sync::mpsc::Sender; use tokio_stream::wrappers::ReceiverStream; +use tracing::Instrument; use turbo_tasks::{ primitives::StringVc, CollectiblesSource, IntoTraitRef, State, TraitRef, TransientInstance, }; @@ -211,6 +212,7 @@ pub(super) struct UpdateStream( ); impl UpdateStream { + #[tracing::instrument(skip(get_content), name = "UpdateStream::new")] pub async fn new( resource: String, get_content: TransientInstance, @@ -239,45 +241,49 @@ impl UpdateStream { let mut last_had_issues = false; let stream = ReceiverStream::new(rx).filter_map(move |item| { - let (has_issues, issues_changed) = - if let Some(UpdateStreamItem::Found { issues, .. }) = item.as_deref().ok() { - let has_issues = !issues.is_empty(); - let issues_changed = has_issues != last_had_issues; - last_had_issues = has_issues; - (has_issues, issues_changed) - } else { - (false, false) - }; - - async move { - match item.as_deref() { - Ok(UpdateStreamItem::Found { update, .. }) => { - match &**update { - Update::Partial(PartialUpdate { to, .. }) - | Update::Total(TotalUpdate { to }) => { - version_state - .set(to.clone()) - .await - .expect("failed to update version"); - - Some(item) - } - // Do not propagate empty updates. - Update::None => { - if has_issues || issues_changed { + { + let (has_issues, issues_changed) = + if let Some(UpdateStreamItem::Found { issues, .. 
}) = item.as_deref().ok() { + let has_issues = !issues.is_empty(); + let issues_changed = has_issues != last_had_issues; + last_had_issues = has_issues; + (has_issues, issues_changed) + } else { + (false, false) + }; + + async move { + match item.as_deref() { + Ok(UpdateStreamItem::Found { update, .. }) => { + match &**update { + Update::Partial(PartialUpdate { to, .. }) + | Update::Total(TotalUpdate { to }) => { + version_state + .set(to.clone()) + .await + .expect("failed to update version"); + Some(item) - } else { - None + } + // Do not propagate empty updates. + Update::None => { + if has_issues || issues_changed { + Some(item) + } else { + None + } } } } - } - _ => { - // Propagate other updates - Some(item) + _ => { + // Propagate other updates + Some(item) + } } } + .in_current_span() } + .in_current_span() }); Ok(UpdateStream(Box::pin(stream))) diff --git a/crates/turbopack-dev/Cargo.toml b/crates/turbopack-dev/Cargo.toml index a55310dd71bda..3da1398a90b1a 100644 --- a/crates/turbopack-dev/Cargo.toml +++ b/crates/turbopack-dev/Cargo.toml @@ -20,7 +20,7 @@ indoc = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_qs = { workspace = true } - +tracing = { workspace = true } turbo-tasks = { workspace = true } turbo-tasks-fs = { workspace = true } turbo-tasks-hash = { workspace = true } diff --git a/crates/turbopack-dev/src/ecmascript/content_entry.rs b/crates/turbopack-dev/src/ecmascript/content_entry.rs index f285b3211edee..265f7cbd9c4f6 100644 --- a/crates/turbopack-dev/src/ecmascript/content_entry.rs +++ b/crates/turbopack-dev/src/ecmascript/content_entry.rs @@ -2,6 +2,7 @@ use std::io::Write as _; use anyhow::Result; use indexmap::IndexMap; +use tracing::{info_span, Instrument}; use turbo_tasks::{ primitives::{StringVc, U64Vc}, TryJoinIterExt, Value, ValueToString, @@ -64,10 +65,17 @@ impl EcmascriptDevChunkContentEntriesVc { .chunk_items .iter() .map(|chunk_item| async move { - Ok(( - chunk_item.id().await?, - 
EcmascriptDevChunkContentEntry::new(*chunk_item, availability_info).await?, + async move { + Ok(( + chunk_item.id().await?, + EcmascriptDevChunkContentEntry::new(*chunk_item, availability_info).await?, + )) + } + .instrument(info_span!( + "chunk item", + name = display(chunk_item.asset_ident().to_string().await?) )) + .await }) .try_join() .await? diff --git a/crates/turbopack-ecmascript/src/parse.rs b/crates/turbopack-ecmascript/src/parse.rs index 10f28764f6ca4..d73ff8c19f4d1 100644 --- a/crates/turbopack-ecmascript/src/parse.rs +++ b/crates/turbopack-ecmascript/src/parse.rs @@ -21,6 +21,7 @@ use swc_core::{ }; use turbo_tasks::{ primitives::{StringVc, U64Vc}, + util::WrapFuture, Value, ValueToString, }; use turbo_tasks_fs::{FileContent, FileSystemPath, FileSystemPathVc}; @@ -38,7 +39,6 @@ use super::EcmascriptModuleAssetType; use crate::{ analyzer::graph::EvalContext, transform::{EcmascriptInputTransformsVc, TransformContext}, - utils::WrapFuture, EcmascriptInputTransform, }; diff --git a/crates/turbopack-ecmascript/src/utils.rs b/crates/turbopack-ecmascript/src/utils.rs index dd549a10151ee..3d1bdc5c0bebc 100644 --- a/crates/turbopack-ecmascript/src/utils.rs +++ b/crates/turbopack-ecmascript/src/utils.rs @@ -131,28 +131,3 @@ format_iter!(std::fmt::Octal); format_iter!(std::fmt::Pointer); format_iter!(std::fmt::UpperExp); format_iter!(std::fmt::UpperHex); - -pin_project! 
{ - pub struct WrapFuture { - wrapper: W, - #[pin] - future: F, - } -} - -impl Fn(Pin<&mut F>, &mut Context<'a>) -> Poll> WrapFuture { - pub fn new(wrapper: W, future: F) -> Self { - Self { wrapper, future } - } -} - -impl Fn(Pin<&mut F>, &mut Context<'a>) -> Poll> Future - for WrapFuture -{ - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - (this.wrapper)(this.future, cx) - } -} diff --git a/crates/turbopack-node/Cargo.toml b/crates/turbopack-node/Cargo.toml index f12dfb4e9325f..d8d72a4c55a50 100644 --- a/crates/turbopack-node/Cargo.toml +++ b/crates/turbopack-node/Cargo.toml @@ -30,6 +30,7 @@ serde = { workspace = true } serde_json = { workspace = true } serde_qs = { workspace = true } tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } turbo-tasks = { workspace = true } turbo-tasks-bytes = { workspace = true } turbo-tasks-env = { workspace = true } diff --git a/crates/turbopack-node/src/evaluate.rs b/crates/turbopack-node/src/evaluate.rs index e4d16997af4ff..0c2453c705759 100644 --- a/crates/turbopack-node/src/evaluate.rs +++ b/crates/turbopack-node/src/evaluate.rs @@ -12,7 +12,7 @@ use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use turbo_tasks::{ - mark_finished, + duration_span, mark_finished, primitives::{JsonValueVc, StringVc}, util::SharedError, CompletionVc, RawVc, TryJoinIterExt, Value, ValueToString, @@ -415,6 +415,8 @@ async fn pull_operation( let mut file_dependencies = Vec::new(); let mut dir_dependencies = Vec::new(); + let guard = duration_span!("Node.js evaluation"); + let output = loop { match operation.recv().await? { EvalJavaScriptIncomingMessage::Error(error) => { @@ -470,6 +472,7 @@ async fn pull_operation( } } }; + drop(guard); // Read dependencies to make them a dependencies of this task. This task will // execute again when they change. 
diff --git a/crates/turbopack-node/src/render/render_proxy.rs b/crates/turbopack-node/src/render/render_proxy.rs index 25a1a5105628c..96496e43a1c59 100644 --- a/crates/turbopack-node/src/render/render_proxy.rs +++ b/crates/turbopack-node/src/render/render_proxy.rs @@ -5,11 +5,14 @@ use futures::{ pin_mut, SinkExt, StreamExt, TryStreamExt, }; use parking_lot::Mutex; -use turbo_tasks::{mark_finished, primitives::StringVc, util::SharedError, RawVc}; +use turbo_tasks::{ + duration_span, mark_finished, primitives::StringVc, util::SharedError, RawVc, ValueToString, +}; use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::FileSystemPathVc; use turbopack_core::{ + asset::Asset, chunk::{ChunkingContextVc, EvaluatableAssetsVc}, error::PrettyPrintError, }; @@ -258,9 +261,13 @@ async fn render_stream_internal( } operation.send(RenderProxyOutgoingMessage::BodyEnd).await?; + let entry = module.ident().to_string().await?; + let guard = duration_span!("Node.js api execution", entry = display(entry)); + match operation.recv().await? { RenderProxyIncomingMessage::Headers { data } => yield RenderItem::Headers(data), RenderProxyIncomingMessage::Error(error) => { + drop(guard); // If we don't get headers, then something is very wrong. Instead, we send down a // 500 proxy error as if it were the proper result. let trace = trace_stack( @@ -281,7 +288,11 @@ async fn render_stream_internal( yield RenderItem::BodyChunk(body.into()); return; } - v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + v => { + drop(guard); + Err(anyhow!("unexpected message during rendering: {:#?}", v))?; + return; + }, }; loop { @@ -291,16 +302,23 @@ async fn render_stream_internal( } RenderProxyIncomingMessage::BodyEnd => break, RenderProxyIncomingMessage::Error(error) => { + drop(guard); // We have already started to send a result, so we can't change the // headers/body to a proxy error. 
operation.disallow_reuse(); let trace = trace_stack(error, intermediate_asset, intermediate_output_path, project_dir).await?; Err(anyhow!("error during streaming render: {}", trace))?; + return; } - v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + v => { + drop(guard); + Err(anyhow!("unexpected message during rendering: {:#?}", v))?; + return; + }, } } + drop(guard); }; let mut sender = (sender.get)(); diff --git a/crates/turbopack-node/src/render/render_static.rs b/crates/turbopack-node/src/render/render_static.rs index 55e3c695a9f81..c535707603500 100644 --- a/crates/turbopack-node/src/render/render_static.rs +++ b/crates/turbopack-node/src/render/render_static.rs @@ -5,7 +5,9 @@ use futures::{ pin_mut, SinkExt, StreamExt, TryStreamExt, }; use parking_lot::Mutex; -use turbo_tasks::{mark_finished, primitives::StringVc, util::SharedError, RawVc}; +use turbo_tasks::{ + duration_span, mark_finished, primitives::StringVc, util::SharedError, RawVc, ValueToString, +}; use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::{File, FileContent, FileSystemPathVc}; @@ -296,9 +298,13 @@ async fn render_stream_internal( .await .context("sending headers to node.js process")?; + let entry = module.ident().to_string().await?; + let guard = duration_span!("Node.js rendering", entry = display(entry)); + match operation.recv().await? 
{ RenderStaticIncomingMessage::Headers { data } => yield RenderItem::Headers(data), RenderStaticIncomingMessage::Rewrite { path } => { + drop(guard); yield RenderItem::Response(StaticResultVc::rewrite(RewriteBuilder::new(path).build())); return; } @@ -307,6 +313,7 @@ async fn render_stream_internal( headers, body, } => { + drop(guard); yield RenderItem::Response(StaticResultVc::content( FileContent::Content(File::from(body)).into(), status_code, @@ -315,6 +322,7 @@ async fn render_stream_internal( return; } RenderStaticIncomingMessage::Error(error) => { + drop(guard); // If we don't get headers, then something is very wrong. Instead, we send down a // 500 proxy error as if it were the proper result. let trace = trace_stack( @@ -333,7 +341,11 @@ async fn render_stream_internal( ); return; } - v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + v => { + drop(guard); + Err(anyhow!("unexpected message during rendering: {:#?}", v))?; + return; + }, }; // If we get here, then the first message was a Headers. 
Now we need to stream out the body @@ -350,11 +362,18 @@ async fn render_stream_internal( operation.disallow_reuse(); let trace = trace_stack(error, intermediate_asset, intermediate_output_path, project_dir).await?; + drop(guard); Err(anyhow!("error during streaming render: {}", trace))?; + return; } - v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + v => { + drop(guard); + Err(anyhow!("unexpected message during rendering: {:#?}", v))?; + return; + }, } } + drop(guard); }; let mut sender = (sender.get)(); diff --git a/crates/turbopack-node/src/source_map/mod.rs b/crates/turbopack-node/src/source_map/mod.rs index f7d64bc69a815..a1541d2716606 100644 --- a/crates/turbopack-node/src/source_map/mod.rs +++ b/crates/turbopack-node/src/source_map/mod.rs @@ -10,6 +10,7 @@ pub use content_source::{NextSourceMapTraceContentSource, NextSourceMapTraceCont use once_cell::sync::Lazy; use regex::Regex; pub use trace::{SourceMapTrace, SourceMapTraceVc, StackFrame, TraceResult, TraceResultVc}; +use tracing::{instrument, Level}; use turbo_tasks_fs::{ source_context::get_source_context, to_sys_path, FileLinesContent, FileLinesContentReadRef, FileSystemPathReadRef, FileSystemPathVc, @@ -311,6 +312,7 @@ impl StructuredError { } } +#[instrument(level = Level::TRACE, skip_all)] pub async fn trace_stack( error: StructuredError, root_asset: AssetVc,