Skip to content

Commit

Permalink
feat(turborepo): new ui + watch mode (#7962)
Browse files Browse the repository at this point in the history
### Description

Integrates new UI with watch mode. Pulls out the UI handle and sender
outside the `Visitor` and `Run` struct, so it can be owned by the
`WatchClient`. Also adds an `Event` for updating the task names, so on
rediscovery we can keep using the same UI thread but just update the
task names.

### Testing Instructions

Give it a shot!


Closes TURBO-2801

---------

Co-authored-by: Chris Olszewski <[email protected]>
  • Loading branch information
NicholasLYang and chris-olszewski authored May 14, 2024
1 parent a86e262 commit edd2118
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 67 deletions.
13 changes: 12 additions & 1 deletion crates/turborepo-lib/src/commands/run.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::future::Future;

use tracing::error;
use turborepo_telemetry::events::command::CommandEventBuilder;

use crate::{commands::CommandBase, run, run::builder::RunBuilder, signal::SignalHandler};
Expand Down Expand Up @@ -42,12 +43,22 @@ pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result<i3
.with_analytics_sender(analytics_sender)
.build(&handler, telemetry)
.await?;
let result = run.run().await;

let (sender, handle) = run.start_experimental_ui().unzip();

let result = run.run(sender.clone()).await;

if let Some(analytics_handle) = analytics_handle {
analytics_handle.close_with_timeout().await;
}

if let (Some(handle), Some(sender)) = (handle, sender) {
sender.stop();
if let Err(e) = handle.await.expect("render thread panicked") {
error!("error encountered rendering tui: {e}");
}
}

result
};

Expand Down
26 changes: 22 additions & 4 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{collections::HashSet, io::Write, sync::Arc};
pub use cache::{CacheOutput, ConfigCache, Error as CacheError, RunCache, TaskCache};
use chrono::{DateTime, Local};
use rayon::iter::ParallelBridge;
use tokio::task::JoinHandle;
use tracing::debug;
use turbopath::AbsoluteSystemPathBuf;
use turborepo_api_client::{APIAuth, APIClient};
Expand All @@ -25,7 +26,7 @@ use turborepo_env::EnvironmentVariableMap;
use turborepo_repository::package_graph::{PackageGraph, PackageName};
use turborepo_scm::SCM;
use turborepo_telemetry::events::generic::GenericEventBuilder;
use turborepo_ui::{cprint, cprintln, BOLD_GREY, GREY, UI};
use turborepo_ui::{cprint, cprintln, tui, tui::AppSender, BOLD_GREY, GREY, UI};

pub use crate::run::error::Error;
use crate::{
Expand All @@ -45,7 +46,6 @@ use crate::{
pub struct Run {
version: &'static str,
ui: UI,
experimental_ui: bool,
start_at: DateTime<Local>,
processes: ProcessManager,
run_telemetry: GenericEventBuilder,
Expand All @@ -64,6 +64,7 @@ pub struct Run {
task_access: TaskAccess,
daemon: Option<DaemonClient<DaemonConnector>>,
should_print_prelude: bool,
experimental_ui: bool,
}

impl Run {
Expand Down Expand Up @@ -117,7 +118,23 @@ impl Run {
new_run
}

pub async fn run(&mut self) -> Result<i32, Error> {
pub fn has_experimental_ui(&self) -> bool {
self.experimental_ui
}

pub fn start_experimental_ui(&self) -> Option<(AppSender, JoinHandle<Result<(), tui::Error>>)> {
if !self.experimental_ui {
return None;
}

let task_names = self.engine.tasks_with_command(&self.pkg_dep_graph);
let (sender, receiver) = AppSender::new();
let handle = tokio::task::spawn_blocking(move || tui::run_app(task_names, receiver));

Some((sender, handle))
}

pub async fn run(&mut self, experimental_ui_sender: Option<AppSender>) -> Result<i32, Error> {
if self.should_print_prelude {
self.print_run_prelude();
}
Expand Down Expand Up @@ -240,7 +257,7 @@ impl Run {
self.processes.clone(),
&self.repo_root,
global_env,
self.experimental_ui,
experimental_ui_sender,
);

if self.opts.run_opts.dry_run.is_some() {
Expand Down Expand Up @@ -279,6 +296,7 @@ impl Run {
&self.engine,
&self.env_at_execution_start,
self.opts.scope_opts.pkg_inference_root.as_deref(),
self.experimental_ui,
)
.await?;

Expand Down
14 changes: 9 additions & 5 deletions crates/turborepo-lib/src/run/summary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ impl RunTracker {
engine: &'a Engine,
hash_tracker: TaskHashTracker,
env_at_execution_start: &'a EnvironmentVariableMap,
has_experimental_ui: bool,
) -> Result<(), Error> {
let end_time = Local::now();

Expand Down Expand Up @@ -305,7 +306,7 @@ impl RunTracker {
.await?;

run_summary
.finish(end_time, exit_code, pkg_dep_graph, ui)
.finish(end_time, exit_code, pkg_dep_graph, ui, has_experimental_ui)
.await
}

Expand Down Expand Up @@ -380,6 +381,7 @@ impl<'a> RunSummary<'a> {
exit_code: i32,
pkg_dep_graph: &PackageGraph,
ui: UI,
has_experimental_ui: bool,
) -> Result<(), Error> {
if matches!(self.run_type, RunType::DryJson | RunType::DryText) {
return self.close_dry_run(pkg_dep_graph, ui);
Expand All @@ -391,10 +393,12 @@ impl<'a> RunSummary<'a> {
}
}

if let Some(execution) = &self.execution {
let path = self.get_path();
let failed_tasks = self.get_failed_tasks();
execution.print(ui, path, failed_tasks);
if !has_experimental_ui {
if let Some(execution) = &self.execution {
let path = self.get_path();
let failed_tasks = self.get_failed_tasks();
execution.print(ui, path, failed_tasks);
}
}

if let Some(spaces_client_handle) = self.spaces_client_handle.take() {
Expand Down
31 changes: 26 additions & 5 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::{
};
use turborepo_repository::package_graph::PackageName;
use turborepo_telemetry::events::command::CommandEventBuilder;
use turborepo_ui::{tui, tui::AppSender};

use crate::{
cli::{Command, RunArgs},
Expand Down Expand Up @@ -49,6 +50,8 @@ pub struct WatchClient {
base: CommandBase,
telemetry: CommandEventBuilder,
handler: SignalHandler,
ui_sender: Option<AppSender>,
ui_handle: Option<JoinHandle<Result<(), tui::Error>>>,
}

#[derive(Debug, Error, Diagnostic)]
Expand Down Expand Up @@ -88,6 +91,8 @@ pub enum Error {
SignalInterrupt,
#[error("package change error")]
PackageChange(#[from] tonic::Status),
#[error("could not connect to UI thread")]
UISend(String),
}

impl WatchClient {
Expand All @@ -109,6 +114,8 @@ impl WatchClient {
.build(&handler, telemetry.clone())
.await?;

let (sender, handle) = run.start_experimental_ui().unzip();

let connector = DaemonConnector {
can_start_server: true,
can_kill_server: true,
Expand All @@ -122,6 +129,8 @@ impl WatchClient {
handler,
telemetry,
persistent_tasks_handle: None,
ui_sender: sender,
ui_handle: handle,
})
}

Expand All @@ -131,7 +140,9 @@ impl WatchClient {

let mut events = client.package_changes().await?;

self.run.print_run_prelude();
if !self.run.has_experimental_ui() {
self.run.print_run_prelude();
}

let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?;

Expand Down Expand Up @@ -254,7 +265,7 @@ impl WatchClient {
.build(&signal_handler, telemetry)
.await?;

Ok(run.run().await?)
Ok(run.run(self.ui_sender.clone()).await?)
}
ChangedPackages::All => {
let mut args = self.base.args().clone();
Expand Down Expand Up @@ -286,23 +297,33 @@ impl WatchClient {
.build(&self.handler, self.telemetry.clone())
.await?;

if let Some(sender) = &self.ui_sender {
let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph);
sender
.update_tasks(task_names)
.map_err(|err| Error::UISend(err.to_string()))?;
}

if self.run.has_persistent_tasks() {
// Abort old run
if let Some(run) = self.persistent_tasks_handle.take() {
run.abort();
}

let mut persistent_run = self.run.create_run_for_persistent_tasks();
let ui_sender = self.ui_sender.clone();
// If we have persistent tasks, we run them on a separate thread
// since persistent tasks don't finish
self.persistent_tasks_handle =
Some(tokio::spawn(async move { persistent_run.run().await }));
Some(tokio::spawn(
async move { persistent_run.run(ui_sender).await },
));

// But we still run the regular tasks blocking
let mut non_persistent_run = self.run.create_run_without_persistent_tasks();
Ok(non_persistent_run.run().await?)
Ok(non_persistent_run.run(self.ui_sender.clone()).await?)
} else {
Ok(self.run.run().await?)
Ok(self.run.run(self.ui_sender.clone()).await?)
}
}
}
Expand Down
38 changes: 11 additions & 27 deletions crates/turborepo-lib/src/task_graph/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use turborepo_telemetry::events::{
generic::GenericEventBuilder, task::PackageTaskEventBuilder, EventBuilder, TrackedErrors,
};
use turborepo_ui::{
tui::{self, TuiTask},
tui::{self, AppSender, TuiTask},
ColorSelector, OutputClient, OutputSink, OutputWriter, PrefixedUI, UI,
};
use which::which;
Expand Down Expand Up @@ -63,7 +63,7 @@ pub struct Visitor<'a> {
sink: OutputSink<StdWriter>,
task_hasher: TaskHasher<'a>,
ui: UI,
experimental_ui: bool,
experimental_ui_sender: Option<AppSender>,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<'a> Visitor<'a> {
manager: ProcessManager,
repo_root: &'a AbsoluteSystemPath,
global_env: EnvironmentVariableMap,
experimental_ui: bool,
experimental_ui_sender: Option<AppSender>,
) -> Self {
let task_hasher = TaskHasher::new(
package_inputs_hashes,
Expand All @@ -135,7 +135,7 @@ impl<'a> Visitor<'a> {
task_hasher,
ui,
global_env,
experimental_ui,
experimental_ui_sender,
}
}

Expand All @@ -148,16 +148,6 @@ impl<'a> Visitor<'a> {
let concurrency = self.run_opts.concurrency as usize;
let (node_sender, mut node_stream) = mpsc::channel(concurrency);

let (ui, render_thread_handle) = if self.experimental_ui {
let task_names = engine.tasks_with_command(&self.package_graph);

let (handle, receiver) = tui::AppSender::new();
let app = tokio::task::spawn_blocking(move || tui::run_app(task_names, receiver));
(Some(handle), Some(app))
} else {
(None, None)
};

let engine_handle = {
let engine = engine.clone();
tokio::spawn(engine.execute(ExecutionOptions::new(false, concurrency), node_sender))
Expand Down Expand Up @@ -285,7 +275,7 @@ impl<'a> Visitor<'a> {
let vendor_behavior =
Vendor::infer().and_then(|vendor| vendor.behavior.as_ref());

let output_client = if let Some(handle) = &ui {
let output_client = if let Some(handle) = &self.experimental_ui_sender {
TaskOutput::UI(handle.task(info.to_string()))
} else {
TaskOutput::Direct(self.output_client(&info, vendor_behavior))
Expand Down Expand Up @@ -321,16 +311,6 @@ impl<'a> Visitor<'a> {
}
}
drop(factory);
if let Some(handle) = ui {
handle.stop();
if let Err(e) = render_thread_handle
.unwrap()
.await
.expect("render thread panicked")
{
error!("error encountered rendering tui: {e}");
}
}

if !internal_errors.is_empty() {
return Err(Error::InternalErrors(
Expand All @@ -351,6 +331,8 @@ impl<'a> Visitor<'a> {

/// Finishes visiting the tasks, creates the run summary, and either
/// prints, saves, or sends it to spaces.

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(
self,
packages,
Expand All @@ -366,6 +348,7 @@ impl<'a> Visitor<'a> {
engine: &Engine,
env_at_execution_start: &EnvironmentVariableMap,
pkg_inference_root: Option<&AnchoredSystemPath>,
has_experimental_ui: bool,
) -> Result<(), Error> {
let Self {
package_graph,
Expand Down Expand Up @@ -394,6 +377,7 @@ impl<'a> Visitor<'a> {
engine,
task_hasher.task_hash_tracker(),
env_at_execution_start,
has_experimental_ui,
)
.await?)
}
Expand Down Expand Up @@ -495,7 +479,7 @@ impl<'a> Visitor<'a> {
pub fn dry_run(&mut self) {
self.dry = true;
// No need to start a TUI on dry run
self.experimental_ui = false;
self.experimental_ui_sender = None;
}
}

Expand Down Expand Up @@ -665,7 +649,7 @@ impl<'a> ExecContextFactory<'a> {
ExecContext {
engine: self.engine.clone(),
ui: self.visitor.ui,
experimental_ui: self.visitor.experimental_ui,
experimental_ui: self.visitor.experimental_ui_sender.is_some(),
is_github_actions: self.visitor.run_opts.is_github_actions,
pretty_prefix: self
.visitor
Expand Down
Loading

0 comments on commit edd2118

Please sign in to comment.