diff --git a/crates/turborepo-api-client/src/lib.rs b/crates/turborepo-api-client/src/lib.rs index 54a75f4daaec4..ae3059d1f709e 100644 --- a/crates/turborepo-api-client/src/lib.rs +++ b/crates/turborepo-api-client/src/lib.rs @@ -9,6 +9,7 @@ use lazy_static::lazy_static; use regex::Regex; pub use reqwest::Response; use reqwest::{Method, RequestBuilder, StatusCode}; +use serde::Deserialize; use turborepo_ci::{is_ci, Vendor}; use turborepo_vercel_api::{ APIError, CachingStatus, CachingStatusResponse, PreflightResponse, SpacesResponse, Team, @@ -42,6 +43,7 @@ pub trait Client { ) -> Result; async fn get_spaces(&self, token: &str, team_id: Option<&str>) -> Result; async fn verify_sso_token(&self, token: &str, token_name: &str) -> Result; + #[allow(clippy::too_many_arguments)] async fn put_artifact( &self, hash: &str, @@ -49,6 +51,8 @@ pub trait Client { duration: u64, tag: Option<&str>, token: &str, + team_id: Option<&str>, + team_slug: Option<&str>, ) -> Result<()>; async fn handle_403(response: Response) -> Error; async fn fetch_artifact( @@ -223,6 +227,8 @@ impl Client for APIClient { duration: u64, tag: Option<&str>, token: &str, + team_id: Option<&str>, + team_slug: Option<&str>, ) -> Result<()> { let mut request_url = self.make_url(&format!("/v8/artifacts/{}", hash)); let mut allow_auth = true; @@ -253,6 +259,8 @@ impl Client for APIClient { request_builder = request_builder.header("Authorization", format!("Bearer {}", token)); } + request_builder = Self::add_team_params(request_builder, team_id, team_slug); + request_builder = Self::add_ci_header(request_builder); if let Some(tag) = tag { @@ -270,7 +278,11 @@ impl Client for APIClient { } async fn handle_403(response: Response) -> Error { - let api_error: APIError = match response.json().await { + #[derive(Deserialize)] + struct WrappedAPIError { + error: APIError, + } + let WrappedAPIError { error: api_error } = match response.json().await { Ok(api_error) => api_error, Err(e) => return Error::ReqwestError(e), }; diff --git a/crates/turborepo-auth/src/auth/login.rs b/crates/turborepo-auth/src/auth/login.rs index b81fbe3fcd26d..2ff4204b6b9c0 100644 --- a/crates/turborepo-auth/src/auth/login.rs +++ b/crates/turborepo-auth/src/auth/login.rs @@ -217,6 +217,8 @@ mod tests { _duration: u64, _tag: Option<&str>, _token: &str, + _team_id: Option<&str>, + _team_slug: Option<&str>, ) -> turborepo_api_client::Result<()> { unimplemented!("put_artifact") } diff --git a/crates/turborepo-auth/src/auth/sso.rs b/crates/turborepo-auth/src/auth/sso.rs index 15229bb8709a8..30ab84bd230e5 100644 --- a/crates/turborepo-auth/src/auth/sso.rs +++ b/crates/turborepo-auth/src/auth/sso.rs @@ -228,6 +228,8 @@ mod tests { _duration: u64, _tag: Option<&str>, _token: &str, + _team_id: Option<&str>, + _team_slug: Option<&str>, ) -> turborepo_api_client::Result<()> { unimplemented!("put_artifact") } diff --git a/crates/turborepo-cache/src/async_cache.rs b/crates/turborepo-cache/src/async_cache.rs index 4596d55814443..b32b42d70866c 100644 --- a/crates/turborepo-cache/src/async_cache.rs +++ b/crates/turborepo-cache/src/async_cache.rs @@ -1,16 +1,19 @@ -use std::sync::Arc; +use std::sync::{atomic::AtomicU8, Arc}; use futures::{stream::FuturesUnordered, StreamExt}; use tokio::{ sync::{mpsc, Semaphore}, task::JoinHandle, }; +use tracing::warn; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf}; use turborepo_analytics::AnalyticsSender; use turborepo_api_client::{APIAuth, APIClient}; use crate::{multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts}; +const WARNING_CUTOFF: u8 = 4; + pub struct AsyncCache { real_cache: Arc, writer_sender: mpsc::Sender, @@ -24,7 +27,6 @@ enum WorkerRequest { duration: u64, files: Vec, }, - #[cfg(test)] Flush(tokio::sync::oneshot::Sender<()>), } @@ -52,6 +54,7 @@ impl AsyncCache { let semaphore = Arc::new(Semaphore::new(max_workers)); let mut workers = FuturesUnordered::new(); let real_cache = worker_real_cache; + let warnings = Arc::new(AtomicU8::new(0)); while let Some(request) = write_consumer.recv().await { match request { @@ -63,13 +66,24 @@ impl AsyncCache { } => { let permit = semaphore.clone().acquire_owned().await.unwrap(); let real_cache = real_cache.clone(); + let warnings = warnings.clone(); workers.push(tokio::spawn(async move { - let _ = real_cache.put(&anchor, &key, &files, duration).await; + if let Err(err) = real_cache.put(&anchor, &key, &files, duration).await + { + let num_warnings = + warnings.load(std::sync::atomic::Ordering::Acquire); + if num_warnings <= WARNING_CUTOFF { + warnings.store( + num_warnings + 1, + std::sync::atomic::Ordering::Release, + ); + warn!("{err}"); + } + } // Release permit once we're done with the write drop(permit); })) } - #[cfg(test)] WorkerRequest::Flush(callback) => { // Wait on all workers to finish writing while let Some(worker) = workers.next().await { @@ -131,7 +145,6 @@ impl AsyncCache { // Used for testing to ensure that the workers resolve // before checking the cache. - #[cfg(test)] pub async fn wait(&self) { let (tx, rx) = tokio::sync::oneshot::channel(); self.writer_sender diff --git a/crates/turborepo-cache/src/http.rs b/crates/turborepo-cache/src/http.rs index 4d4bc209fd8ef..365be1275cf0c 100644 --- a/crates/turborepo-cache/src/http.rs +++ b/crates/turborepo-cache/src/http.rs @@ -1,5 +1,6 @@ use std::{backtrace::Backtrace, io::Write}; +use tracing::debug; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf}; use turborepo_analytics::AnalyticsSender; use turborepo_api_client::{ @@ -78,6 +79,8 @@ impl HTTPCache { duration, tag.as_deref(), &self.api_auth.token, + self.api_auth.team_id.as_deref(), + self.api_auth.team_slug.as_deref(), ) .await?; @@ -144,6 +147,7 @@ impl HTTPCache { hash: hash.to_string(), duration, }; + debug!("logging fetch: {analytics_event:?}"); let _ = analytics_recorder.send(analytics_event); } } diff --git a/crates/turborepo-cache/src/multiplexer.rs b/crates/turborepo-cache/src/multiplexer.rs index 63a3e887bea0b..f29c7ca25104f 100644 --- a/crates/turborepo-cache/src/multiplexer.rs +++ b/crates/turborepo-cache/src/multiplexer.rs @@ -90,16 +90,18 @@ impl CacheMultiplexer { _ => None, }; - if let Some(Err(CacheError::ApiClientError( - box turborepo_api_client::Error::CacheDisabled { .. }, - .., - ))) = http_result - { - warn!("failed to put to http cache: cache disabled"); - self.should_use_http_cache.store(false, Ordering::Relaxed); + match http_result { + Some(Err(CacheError::ApiClientError( + box turborepo_api_client::Error::CacheDisabled { .. }, + .., + ))) => { + warn!("failed to put to http cache: cache disabled"); + self.should_use_http_cache.store(false, Ordering::Relaxed); + Ok(()) + } + Some(Err(e)) => Err(e), + None | Some(Ok(())) => Ok(()), } - - Ok(()) } pub async fn fetch( diff --git a/crates/turborepo-lib/src/commands/run.rs b/crates/turborepo-lib/src/commands/run.rs index 5ed3e6bd32404..6d196c9bb326b 100644 --- a/crates/turborepo-lib/src/commands/run.rs +++ b/crates/turborepo-lib/src/commands/run.rs @@ -4,14 +4,11 @@ use crate::{commands::CommandBase, run, run::Run, signal::SignalHandler}; pub async fn run(base: CommandBase) -> Result { let handler = SignalHandler::new(tokio::signal::ctrl_c()); - let run_subscriber = handler - .subscribe() - .expect("handler shouldn't close immediately after opening"); let mut run = Run::new(&base); debug!("using the experimental rust codepath"); debug!("configured run struct: {:?}", run); - let run_fut = run.run(run_subscriber); + let run_fut = run.run(&handler); let handler_fut = handler.done(); tokio::select! { biased; diff --git a/crates/turborepo-lib/src/run/cache.rs b/crates/turborepo-lib/src/run/cache.rs index c6496d0a2cea5..f3951673b2bc1 100644 --- a/crates/turborepo-lib/src/run/cache.rs +++ b/crates/turborepo-lib/src/run/cache.rs @@ -104,6 +104,10 @@ impl RunCache { ui: self.ui, } } + + pub async fn wait_for_cache(&self) { + self.cache.wait().await + } } pub struct TaskCache { diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 5b0c80f4b4e32..a6aeafee4de3e 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -44,7 +44,7 @@ use crate::{ process::ProcessManager, run::{global_hash::get_global_hash_inputs, summary::RunTracker}, shim::TurboState, - signal::SignalSubscriber, + signal::{SignalHandler, SignalSubscriber}, task_graph::Visitor, task_hash::{get_external_deps_hash, PackageInputsHashes, TaskHashTrackerState}, }; @@ -118,8 +118,8 @@ impl<'a> Run<'a> { } } - #[tracing::instrument(skip(self, signal_subscriber))] - pub async fn run(&mut self, signal_subscriber: SignalSubscriber) -> Result { + #[tracing::instrument(skip(self, signal_handler))] + pub async fn run(&mut self, signal_handler: &SignalHandler) -> Result { tracing::trace!( platform = %TurboState::platform_name(), start_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).expect("system time after epoch").as_micros(), @@ -129,7 +129,9 @@ impl<'a> Run<'a> { TurboState::platform_name(), ); let start_at = Local::now(); - self.connect_process_manager(signal_subscriber); + if let Some(subscriber) = signal_handler.subscribe() { + self.connect_process_manager(subscriber); + } let api_auth = self.base.api_auth()?; let api_client = self.base.api_client()?; @@ -137,7 +139,13 @@ impl<'a> Run<'a> { Self::initialize_analytics(api_auth.clone(), api_client.clone()).unzip(); let result = self - .run_with_analytics(start_at, api_auth, api_client, analytics_sender) + .run_with_analytics( + start_at, + api_auth, + api_client, + analytics_sender, + signal_handler, + ) .await; if let Some(analytics_handle) = analytics_handle { @@ -155,6 +163,7 @@ impl<'a> Run<'a> { api_auth: Option, api_client: APIClient, analytics_sender: Option, + signal_handler: &SignalHandler, ) -> Result { let package_json_path = self.base.repo_root.join_component("package.json"); let root_package_json = PackageJson::load(&package_json_path)?; @@ -318,6 +327,15 @@ impl<'a> Run<'a> { self.base.ui, opts.run_opts.dry_run.is_some(), )); + if let Some(subscriber) = signal_handler.subscribe() { + let runcache = runcache.clone(); + tokio::spawn(async move { + let _guard = subscriber.listen().await; + let spinner = turborepo_ui::start_spinner("...Finishing writing to cache..."); + runcache.wait_for_cache().await; + spinner.finish_and_clear(); + }); + } let mut global_env_mode = opts.run_opts.env_mode; if matches!(global_env_mode, EnvMode::Infer) diff --git a/turborepo-tests/integration/tests/run-caching/remote-caching-enable.t b/turborepo-tests/integration/tests/run-caching/remote-caching-enable.t index d970737583cb1..ae48bef6e6ea0 100644 --- a/turborepo-tests/integration/tests/run-caching/remote-caching-enable.t +++ b/turborepo-tests/integration/tests/run-caching/remote-caching-enable.t @@ -15,7 +15,7 @@ The fixture does not have a `remoteCache` config at all, output should be null null Test that remote caching is enabled by default - $ ${TURBO} run build --team=vercel --token=hi --output-logs=none | grep "Remote caching" + $ ${TURBO} run build --team=vercel --token=hi --output-logs=none 2>/dev/null | grep "Remote caching" \xe2\x80\xa2 Remote caching enabled (esc) Set `remoteCache = {}` into turbo.json