Skip to content

Commit fb74da2

Browse files
fix: ensure cache is done writing before exit (#6603)
### Description This PR contains intertwined changes: - Waiting on all cache writes to finish before we exit the run. We leverage the signal handler here so we can make sure we wait regardless of how/why the run exits. - Printing errors from artifact upload failures. This differs from Go where we [throw generated errors away](https://github.com/vercel/turbo/blob/main/cli/internal/cache/async_cache.go#L79C12-L79C12) (I'm not sure this is desired since we do [format these errors for human consumption](https://github.com/vercel/turbo/blob/main/cli/internal/client/cache.go#L70)) - Correctly deserialize Vercel API errors. As per the [docs](https://vercel.com/docs/rest-api/errors) there's a containing object with an `error` field that has the information. - Pass team information when saving artifacts. This matches all of the other artifact api calls. Each commit is reviewable on it's own. ### Testing Instructions The `turbo` cli is a great candidate for these. Make sure you're logged in and linked to Vercel team. For testing that we wait until upload finishes: ``` # Ensure there's no error printed at the end of the run and the "Finishing writing to cache" message appears while we upload the large debug binary [0 olszewski@chriss-mbp] /Users/olszewski/code/vercel/turborepo $ turbo_dev build --filter=cli --force ... ...Finishing writing to cache... # Should be a FULL TURBO due to the previous command [0 olszewski@chriss-mbp] /Users/olszewski/code/vercel/turborepo $ turbo_dev build --filter=cli --remote-only --remote-cache-timeout=120 ``` Closes TURBO-1770 --------- Co-authored-by: Chris Olszewski <Chris Olszewski>
1 parent 0709591 commit fb74da2

File tree

10 files changed

+79
-25
lines changed

10 files changed

+79
-25
lines changed

crates/turborepo-api-client/src/lib.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use lazy_static::lazy_static;
99
use regex::Regex;
1010
pub use reqwest::Response;
1111
use reqwest::{Method, RequestBuilder, StatusCode};
12+
use serde::Deserialize;
1213
use turborepo_ci::{is_ci, Vendor};
1314
use turborepo_vercel_api::{
1415
APIError, CachingStatus, CachingStatusResponse, PreflightResponse, SpacesResponse, Team,
@@ -42,13 +43,16 @@ pub trait Client {
4243
) -> Result<CachingStatusResponse>;
4344
async fn get_spaces(&self, token: &str, team_id: Option<&str>) -> Result<SpacesResponse>;
4445
async fn verify_sso_token(&self, token: &str, token_name: &str) -> Result<VerifiedSsoUser>;
46+
#[allow(clippy::too_many_arguments)]
4547
async fn put_artifact(
4648
&self,
4749
hash: &str,
4850
artifact_body: &[u8],
4951
duration: u64,
5052
tag: Option<&str>,
5153
token: &str,
54+
team_id: Option<&str>,
55+
team_slug: Option<&str>,
5256
) -> Result<()>;
5357
async fn handle_403(response: Response) -> Error;
5458
async fn fetch_artifact(
@@ -223,6 +227,8 @@ impl Client for APIClient {
223227
duration: u64,
224228
tag: Option<&str>,
225229
token: &str,
230+
team_id: Option<&str>,
231+
team_slug: Option<&str>,
226232
) -> Result<()> {
227233
let mut request_url = self.make_url(&format!("/v8/artifacts/{}", hash));
228234
let mut allow_auth = true;
@@ -253,6 +259,8 @@ impl Client for APIClient {
253259
request_builder = request_builder.header("Authorization", format!("Bearer {}", token));
254260
}
255261

262+
request_builder = Self::add_team_params(request_builder, team_id, team_slug);
263+
256264
request_builder = Self::add_ci_header(request_builder);
257265

258266
if let Some(tag) = tag {
@@ -270,7 +278,11 @@ impl Client for APIClient {
270278
}
271279

272280
async fn handle_403(response: Response) -> Error {
273-
let api_error: APIError = match response.json().await {
281+
#[derive(Deserialize)]
282+
struct WrappedAPIError {
283+
error: APIError,
284+
}
285+
let WrappedAPIError { error: api_error } = match response.json().await {
274286
Ok(api_error) => api_error,
275287
Err(e) => return Error::ReqwestError(e),
276288
};

crates/turborepo-auth/src/auth/login.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ mod tests {
217217
_duration: u64,
218218
_tag: Option<&str>,
219219
_token: &str,
220+
_team_id: Option<&str>,
221+
_team_slug: Option<&str>,
220222
) -> turborepo_api_client::Result<()> {
221223
unimplemented!("put_artifact")
222224
}

crates/turborepo-auth/src/auth/sso.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ mod tests {
228228
_duration: u64,
229229
_tag: Option<&str>,
230230
_token: &str,
231+
_team_id: Option<&str>,
232+
_team_slug: Option<&str>,
231233
) -> turborepo_api_client::Result<()> {
232234
unimplemented!("put_artifact")
233235
}

crates/turborepo-cache/src/async_cache.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1-
use std::sync::Arc;
1+
use std::sync::{atomic::AtomicU8, Arc};
22

33
use futures::{stream::FuturesUnordered, StreamExt};
44
use tokio::{
55
sync::{mpsc, Semaphore},
66
task::JoinHandle,
77
};
8+
use tracing::warn;
89
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf};
910
use turborepo_analytics::AnalyticsSender;
1011
use turborepo_api_client::{APIAuth, APIClient};
1112

1213
use crate::{multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts};
1314

15+
const WARNING_CUTOFF: u8 = 4;
16+
1417
pub struct AsyncCache {
1518
real_cache: Arc<CacheMultiplexer>,
1619
writer_sender: mpsc::Sender<WorkerRequest>,
@@ -24,7 +27,6 @@ enum WorkerRequest {
2427
duration: u64,
2528
files: Vec<AnchoredSystemPathBuf>,
2629
},
27-
#[cfg(test)]
2830
Flush(tokio::sync::oneshot::Sender<()>),
2931
}
3032

@@ -52,6 +54,7 @@ impl AsyncCache {
5254
let semaphore = Arc::new(Semaphore::new(max_workers));
5355
let mut workers = FuturesUnordered::new();
5456
let real_cache = worker_real_cache;
57+
let warnings = Arc::new(AtomicU8::new(0));
5558

5659
while let Some(request) = write_consumer.recv().await {
5760
match request {
@@ -63,13 +66,24 @@ impl AsyncCache {
6366
} => {
6467
let permit = semaphore.clone().acquire_owned().await.unwrap();
6568
let real_cache = real_cache.clone();
69+
let warnings = warnings.clone();
6670
workers.push(tokio::spawn(async move {
67-
let _ = real_cache.put(&anchor, &key, &files, duration).await;
71+
if let Err(err) = real_cache.put(&anchor, &key, &files, duration).await
72+
{
73+
let num_warnings =
74+
warnings.load(std::sync::atomic::Ordering::Acquire);
75+
if num_warnings <= WARNING_CUTOFF {
76+
warnings.store(
77+
num_warnings + 1,
78+
std::sync::atomic::Ordering::Release,
79+
);
80+
warn!("{err}");
81+
}
82+
}
6883
// Release permit once we're done with the write
6984
drop(permit);
7085
}))
7186
}
72-
#[cfg(test)]
7387
WorkerRequest::Flush(callback) => {
7488
// Wait on all workers to finish writing
7589
while let Some(worker) = workers.next().await {
@@ -131,7 +145,6 @@ impl AsyncCache {
131145

132146
// Used for testing to ensure that the workers resolve
133147
// before checking the cache.
134-
#[cfg(test)]
135148
pub async fn wait(&self) {
136149
let (tx, rx) = tokio::sync::oneshot::channel();
137150
self.writer_sender

crates/turborepo-cache/src/http.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{backtrace::Backtrace, io::Write};
22

3+
use tracing::debug;
34
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf};
45
use turborepo_analytics::AnalyticsSender;
56
use turborepo_api_client::{
@@ -78,6 +79,8 @@ impl HTTPCache {
7879
duration,
7980
tag.as_deref(),
8081
&self.api_auth.token,
82+
self.api_auth.team_id.as_deref(),
83+
self.api_auth.team_slug.as_deref(),
8184
)
8285
.await?;
8386

@@ -144,6 +147,7 @@ impl HTTPCache {
144147
hash: hash.to_string(),
145148
duration,
146149
};
150+
debug!("logging fetch: {analytics_event:?}");
147151
let _ = analytics_recorder.send(analytics_event);
148152
}
149153
}

crates/turborepo-cache/src/multiplexer.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,18 @@ impl CacheMultiplexer {
9090
_ => None,
9191
};
9292

93-
if let Some(Err(CacheError::ApiClientError(
94-
box turborepo_api_client::Error::CacheDisabled { .. },
95-
..,
96-
))) = http_result
97-
{
98-
warn!("failed to put to http cache: cache disabled");
99-
self.should_use_http_cache.store(false, Ordering::Relaxed);
93+
match http_result {
94+
Some(Err(CacheError::ApiClientError(
95+
box turborepo_api_client::Error::CacheDisabled { .. },
96+
..,
97+
))) => {
98+
warn!("failed to put to http cache: cache disabled");
99+
self.should_use_http_cache.store(false, Ordering::Relaxed);
100+
Ok(())
101+
}
102+
Some(Err(e)) => Err(e),
103+
None | Some(Ok(())) => Ok(()),
100104
}
101-
102-
Ok(())
103105
}
104106

105107
pub async fn fetch(

crates/turborepo-lib/src/commands/run.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,11 @@ use crate::{commands::CommandBase, run, run::Run, signal::SignalHandler};
44

55
pub async fn run(base: CommandBase) -> Result<i32, run::Error> {
66
let handler = SignalHandler::new(tokio::signal::ctrl_c());
7-
let run_subscriber = handler
8-
.subscribe()
9-
.expect("handler shouldn't close immediately after opening");
107

118
let mut run = Run::new(&base);
129
debug!("using the experimental rust codepath");
1310
debug!("configured run struct: {:?}", run);
14-
let run_fut = run.run(run_subscriber);
11+
let run_fut = run.run(&handler);
1512
let handler_fut = handler.done();
1613
tokio::select! {
1714
biased;

crates/turborepo-lib/src/run/cache.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ impl RunCache {
104104
ui: self.ui,
105105
}
106106
}
107+
108+
pub async fn wait_for_cache(&self) {
109+
self.cache.wait().await
110+
}
107111
}
108112

109113
pub struct TaskCache {

crates/turborepo-lib/src/run/mod.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::{
4444
process::ProcessManager,
4545
run::{global_hash::get_global_hash_inputs, summary::RunTracker},
4646
shim::TurboState,
47-
signal::SignalSubscriber,
47+
signal::{SignalHandler, SignalSubscriber},
4848
task_graph::Visitor,
4949
task_hash::{get_external_deps_hash, PackageInputsHashes, TaskHashTrackerState},
5050
};
@@ -118,8 +118,8 @@ impl<'a> Run<'a> {
118118
}
119119
}
120120

121-
#[tracing::instrument(skip(self, signal_subscriber))]
122-
pub async fn run(&mut self, signal_subscriber: SignalSubscriber) -> Result<i32, Error> {
121+
#[tracing::instrument(skip(self, signal_handler))]
122+
pub async fn run(&mut self, signal_handler: &SignalHandler) -> Result<i32, Error> {
123123
tracing::trace!(
124124
platform = %TurboState::platform_name(),
125125
start_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).expect("system time after epoch").as_micros(),
@@ -129,15 +129,23 @@ impl<'a> Run<'a> {
129129
TurboState::platform_name(),
130130
);
131131
let start_at = Local::now();
132-
self.connect_process_manager(signal_subscriber);
132+
if let Some(subscriber) = signal_handler.subscribe() {
133+
self.connect_process_manager(subscriber);
134+
}
133135

134136
let api_auth = self.base.api_auth()?;
135137
let api_client = self.base.api_client()?;
136138
let (analytics_sender, analytics_handle) =
137139
Self::initialize_analytics(api_auth.clone(), api_client.clone()).unzip();
138140

139141
let result = self
140-
.run_with_analytics(start_at, api_auth, api_client, analytics_sender)
142+
.run_with_analytics(
143+
start_at,
144+
api_auth,
145+
api_client,
146+
analytics_sender,
147+
signal_handler,
148+
)
141149
.await;
142150

143151
if let Some(analytics_handle) = analytics_handle {
@@ -155,6 +163,7 @@ impl<'a> Run<'a> {
155163
api_auth: Option<APIAuth>,
156164
api_client: APIClient,
157165
analytics_sender: Option<AnalyticsSender>,
166+
signal_handler: &SignalHandler,
158167
) -> Result<i32, Error> {
159168
let package_json_path = self.base.repo_root.join_component("package.json");
160169
let root_package_json = PackageJson::load(&package_json_path)?;
@@ -318,6 +327,15 @@ impl<'a> Run<'a> {
318327
self.base.ui,
319328
opts.run_opts.dry_run.is_some(),
320329
));
330+
if let Some(subscriber) = signal_handler.subscribe() {
331+
let runcache = runcache.clone();
332+
tokio::spawn(async move {
333+
let _guard = subscriber.listen().await;
334+
let spinner = turborepo_ui::start_spinner("...Finishing writing to cache...");
335+
runcache.wait_for_cache().await;
336+
spinner.finish_and_clear();
337+
});
338+
}
321339

322340
let mut global_env_mode = opts.run_opts.env_mode;
323341
if matches!(global_env_mode, EnvMode::Infer)

turborepo-tests/integration/tests/run-caching/remote-caching-enable.t

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ The fixture does not have a `remoteCache` config at all, output should be null
1515
null
1616

1717
Test that remote caching is enabled by default
18-
$ ${TURBO} run build --team=vercel --token=hi --output-logs=none | grep "Remote caching"
18+
$ ${TURBO} run build --team=vercel --token=hi --output-logs=none 2>/dev/null | grep "Remote caching"
1919
\xe2\x80\xa2 Remote caching enabled (esc)
2020

2121
Set `remoteCache = {}` into turbo.json

0 commit comments

Comments
 (0)