Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include timeSaved metric when skipping cache check #4952

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f9f3e16
refactor: Change ItemStatus signature to include TimeSaved metric
May 19, 2023
57c05e0
Improve log message when cache restoration is skipped
mehulkar May 15, 2023
01a4a6d
Return timeSaved metric even when cache restoration is skipped
mehulkar May 15, 2023
c39306c
Fix how Exists() works to make clear that it's not composite, but firs…
May 19, 2023
b1dd3bc
Exit the loop early on a cache hit, since we're propagating a cache hit…
May 20, 2023
73674e8
Check for a cache hit after handling errors
May 20, 2023
92bb2a4
extract duration extractor function instead of just a const
May 20, 2023
405bbbf
Make method private
May 20, 2023
5a45008
comments
May 20, 2023
b08e291
Update cli/internal/runcache/runcache.go
mehulkar May 23, 2023
a798459
Use const for cache source noop
May 20, 2023
320a368
Use CacheMiss helper for noop implementation
May 23, 2023
46fc3f4
Merge branch 'main' into mehulkar/turbo-1000-runs-include-timesaved-f…
mehulkar May 25, 2023
2f1b9fd
Store and retrieve timeSaved value from daemon (#5101)
mehulkar May 27, 2023
d94af44
Remove debug statements
mehulkar May 27, 2023
8ea8c8a
Fix linter
mehulkar May 27, 2023
ea267a6
Merge branch 'main' into mehulkar/turbo-1000-runs-include-timesaved-f…
mehulkar May 30, 2023
55105bc
revert log message
mehulkar May 30, 2023
32296b8
Remove Exists() check
mehulkar May 30, 2023
5bc4f12
set cache source to fs when skipping restoration
mehulkar May 30, 2023
f5e8183
Revert all the cacheStatus signature things
mehulkar May 31, 2023
e96c3bb
Revert the parts of runcache that aren't related to daemon get/set
mehulkar May 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cli/internal/daemonclient/daemonclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@ func New(client *connector.Client) *DaemonClient {
}

// GetChangedOutputs implements runcache.OutputWatcher.GetChangedOutputs
func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) {
func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, int, error) {
resp, err := d.client.GetChangedOutputs(ctx, &turbodprotocol.GetChangedOutputsRequest{
Hash: hash,
OutputGlobs: repoRelativeOutputGlobs,
})
if err != nil {
return nil, err
return nil, 0, err
}

return resp.ChangedOutputGlobs, nil
return resp.ChangedOutputGlobs, int(resp.TimeSaved), nil
}

// NotifyOutputsWritten implements runcache.OutputWatcher.NotifyOutputsWritten
func (d *DaemonClient) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs) error {
func (d *DaemonClient) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs, timeSaved int) error {
_, err := d.client.NotifyOutputsWritten(ctx, &turbodprotocol.NotifyOutputsWrittenRequest{
Hash: hash,
OutputGlobs: repoRelativeOutputGlobs.Inclusions,
OutputExclusionGlobs: repoRelativeOutputGlobs.Exclusions,
TimeSaved: uint64(timeSaved),
})
return err
}
Expand Down
10 changes: 5 additions & 5 deletions cli/internal/runcache/output_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
// OutputWatcher instances are responsible for tracking changes to task outputs
type OutputWatcher interface {
// GetChangedOutputs returns which of the given globs have changed since the specified hash was last run
GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error)
GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, int, error)
// NotifyOutputsWritten tells the watcher that the given globs have been cached with the specified hash
NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs) error
NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs, timeSaved int) error
}

// NoOpOutputWatcher implements OutputWatcher, but always considers every glob to have changed
Expand All @@ -21,12 +21,12 @@ var _ OutputWatcher = (*NoOpOutputWatcher)(nil)

// GetChangedOutputs implements OutputWatcher.GetChangedOutputs.
// Since this is a no-op watcher, no tracking is done.
func (NoOpOutputWatcher) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) {
return repoRelativeOutputGlobs, nil
func (NoOpOutputWatcher) GetChangedOutputs(_ context.Context, _ string, repoRelativeOutputGlobs []string) ([]string, int, error) {
return repoRelativeOutputGlobs, 0, nil
}

// NotifyOutputsWritten implements OutputWatcher.NotifyOutputsWritten.
// Since this is a no-op watcher, consider all globs to have changed
func (NoOpOutputWatcher) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs) error {
func (NoOpOutputWatcher) NotifyOutputsWritten(_ context.Context, _ string, _ fs.TaskOutputs, _ int) error {
return nil
}
8 changes: 5 additions & 3 deletions cli/internal/runcache/runcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe
return cache.ItemStatus{Local: false, Remote: false}, 0, nil
}

changedOutputGlobs, err := tc.rc.outputWatcher.GetChangedOutputs(ctx, tc.hash, tc.repoRelativeGlobs.Inclusions)
changedOutputGlobs, timeSavedFromDaemon, err := tc.rc.outputWatcher.GetChangedOutputs(ctx, tc.hash, tc.repoRelativeGlobs.Inclusions)

if err != nil {
progressLogger.Warn(fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err))
prefixedUI.Warn(ui.Dim(fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err)))
Expand Down Expand Up @@ -149,13 +150,14 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe
return cache.ItemStatus{Local: false, Remote: false}, 0, nil
}

if err := tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs); err != nil {
if err := tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs, timeSavedFromDaemon); err != nil {
// Don't fail the whole operation just because we failed to watch the outputs
prefixedUI.Warn(ui.Dim(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err)))
}
} else {
// If no outputs have changed, that means we have a local cache hit.
cacheStatus.Local = true
timeSaved = timeSavedFromDaemon
prefixedUI.Warn(fmt.Sprintf("Skipping cache check for %v, outputs have not changed since previous run.", tc.pt.TaskID))
}

Expand Down Expand Up @@ -279,7 +281,7 @@ func (tc *TaskCache) SaveOutputs(ctx context.Context, logger hclog.Logger, termi
if err = tc.rc.cache.Put(tc.rc.repoRoot, tc.hash, duration, relativePaths); err != nil {
return err
}
err = tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs)
err = tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs, duration)
if err != nil {
// Don't fail the cache write because we also failed to record it, we will just do
// extra I/O in the future restoring files that haven't changed from cache
Expand Down
2 changes: 2 additions & 0 deletions cli/internal/turbodprotocol/turbod.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message NotifyOutputsWrittenRequest {
repeated string output_globs = 1;
string hash = 2;
repeated string output_exclusion_globs = 3;
uint64 time_saved = 4;
}

message NotifyOutputsWrittenResponse {}
Expand All @@ -45,6 +46,7 @@ message GetChangedOutputsRequest {

message GetChangedOutputsResponse {
repeated string changed_output_globs = 1;
uint64 time_saved = 2;
}

message DaemonStatus {
Expand Down
2 changes: 2 additions & 0 deletions crates/turborepo-lib/src/daemon/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,14 @@ impl DaemonClient<DaemonConnector> {
hash: String,
output_globs: Vec<String>,
output_exclusion_globs: Vec<String>,
time_saved: u64,
) -> Result<(), DaemonError> {
self.client
.notify_outputs_written(proto::NotifyOutputsWrittenRequest {
hash,
output_globs,
output_exclusion_globs,
time_saved,
})
.await?;

Expand Down
23 changes: 17 additions & 6 deletions crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
//! globs, and to query for changes for those globs.

use std::{
collections::HashSet,
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, Mutex as StdMutux,
},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -58,6 +58,8 @@ pub struct DaemonServer<T: Watcher> {
shutdown_rx: Option<Receiver<()>>,

running: Arc<AtomicBool>,

times_saved: Arc<std::sync::Mutex<HashMap<String, u64>>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -98,6 +100,7 @@ impl DaemonServer<notify::RecommendedWatcher> {
shutdown_rx: Some(recv_shutdown),

running: Arc::new(AtomicBool::new(true)),
times_saved: Arc::new(StdMutux::new(HashMap::new())),
})
}
}
Expand Down Expand Up @@ -255,6 +258,10 @@ impl<T: Watcher + Send + 'static> proto::turbod_server::Turbod for DaemonServer<
) -> Result<tonic::Response<proto::NotifyOutputsWrittenResponse>, tonic::Status> {
let inner = request.into_inner();

{
let mut times_saved = self.times_saved.lock().expect("times saved lock poisoned");
times_saved.insert(inner.hash.clone(), inner.time_saved);
}
match self
.watcher
.watch_globs(
Expand All @@ -277,17 +284,21 @@ impl<T: Watcher + Send + 'static> proto::turbod_server::Turbod for DaemonServer<
request: tonic::Request<proto::GetChangedOutputsRequest>,
) -> Result<tonic::Response<proto::GetChangedOutputsResponse>, tonic::Status> {
let inner = request.into_inner();
let hash = Arc::new(inner.hash);
let changed = self
.watcher
.changed_globs(
&Arc::new(inner.hash),
HashSet::from_iter(inner.output_globs),
)
.changed_globs(&hash, HashSet::from_iter(inner.output_globs))
.await;

let time_saved = {
let times_saved = self.times_saved.lock().expect("times saved lock poisoned");
times_saved.get(hash.as_str()).copied().unwrap_or_default()
};

match changed {
Ok(changed) => Ok(tonic::Response::new(proto::GetChangedOutputsResponse {
changed_output_globs: changed.into_iter().collect(),
time_saved: time_saved,
})),
Err(e) => {
error!("flush directory operation failed: {:?}", e);
Expand Down