Skip to content

Commit

Permalink
Return timeSaved metric even when cache restoration is skipped
Browse files Browse the repository at this point in the history
  • Loading branch information
mehulkar authored and Mehul Kar committed May 17, 2023
1 parent 6c31141 commit d380677
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cli/internal/cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, file
return c.realCache.Fetch(anchor, key, files)
}

func (c *asyncCache) Exists(key string) ItemStatus {
func (c *asyncCache) Exists(key string) (ItemStatus, int) {
return c.realCache.Exists(key)
}

Expand Down
20 changes: 16 additions & 4 deletions cli/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Cache interface {
// Fetch returns true if there is a cache it. It is expected to move files
// into their correct position as a side effect
Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error)
Exists(hash string) ItemStatus
Exists(hash string) (ItemStatus, int)
// Put caches files for a given hash
Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error
Clean(anchor turbopath.AbsoluteSystemPath)
Expand Down Expand Up @@ -271,6 +271,7 @@ func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key st
// the operation. Future work that plumbs UI / Logging into the cache system
// should probably log this at least.
}

if ok {
// Store this into other caches. We can ignore errors here because we know
// we have previously successfully stored in a higher-priority cache, and so the overall
Expand All @@ -287,15 +288,26 @@ func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key st
return ItemStatus{Local: false, Remote: false}, nil, 0, nil
}

func (mplex *cacheMultiplexer) Exists(target string) ItemStatus {
func (mplex *cacheMultiplexer) Exists(target string) (ItemStatus, int) {
syncCacheState := ItemStatus{}
syncTimeSaved := 0

for _, cache := range mplex.caches {
itemStatus := cache.Exists(target)
itemStatus, timeSaved := cache.Exists(target)

syncCacheState.Local = syncCacheState.Local || itemStatus.Local
syncCacheState.Remote = syncCacheState.Remote || itemStatus.Remote

// If one of the caches saved more time, use that.
// TODO: it would make more sense to return the right value with the right cache,
// and then downstream pick the right cache to restore from, and then use _that_ timeSaved
// number, but that involves reworking the caching logic.
if timeSaved > syncTimeSaved {
syncTimeSaved = timeSaved
}
}

return syncCacheState
return syncCacheState, syncTimeSaved
}

func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) {
Expand Down
19 changes: 16 additions & 3 deletions cli/internal/cache/cache_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type fsCache struct {
cacheDirectory turbopath.AbsoluteSystemPath
recorder analytics.Recorder
label string
}

// newFsCache creates a new filesystem cache
Expand All @@ -27,6 +28,7 @@ func newFsCache(opts Opts, recorder analytics.Recorder, repoRoot turbopath.Absol
return nil, err
}
return &fsCache{
label: CacheSourceFS,
cacheDirectory: cacheDir,
recorder: recorder,
}, nil
Expand Down Expand Up @@ -74,15 +76,26 @@ func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []st
return ItemStatus{Local: true}, restoredFiles, meta.Duration, nil
}

func (f *fsCache) Exists(hash string) ItemStatus {
// Exists returns the ItemStatus and the timeSaved
func (f *fsCache) Exists(hash string) (ItemStatus, int) {
uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar")
compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst")

status := ItemStatus{Local: false}
if compressedCachePath.FileExists() || uncompressedCachePath.FileExists() {
return ItemStatus{Local: true}
status.Local = true
}

return ItemStatus{Local: false}
// Swallow the error
var duration int
if meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json")); err != nil {
return status, 0
} else {
duration = meta.Duration
}

return status, duration

}

func (f *fsCache) logFetch(hit bool, hash string, duration int) {
Expand Down
38 changes: 26 additions & 12 deletions cli/internal/cache/cache_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/vercel/turbo/cli/internal/turbopath"
)

const durationHeaderName = "x-artifact-duration"

type client interface {
PutArtifact(hash string, body []byte, duration int, tag string) error
FetchArtifact(hash string) (*http.Response, error)
Expand All @@ -31,6 +33,7 @@ type httpCache struct {
recorder analytics.Recorder
signerVerifier *ArtifactSignatureAuthentication
repoRoot turbopath.AbsoluteSystemPath
label string
}

type limiter chan struct{}
Expand Down Expand Up @@ -104,14 +107,14 @@ func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []st
return ItemStatus{Remote: hit}, files, duration, err
}

func (cache *httpCache) Exists(key string) ItemStatus {
func (cache *httpCache) Exists(key string) (ItemStatus, int) {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
hit, err := cache.exists(key)
hit, timeSaved, err := cache.exists(key)
if err != nil {
return ItemStatus{Remote: false}
return ItemStatus{Remote: false}, timeSaved
}
return ItemStatus{Remote: hit}
return ItemStatus{Remote: hit}, timeSaved
}

func (cache *httpCache) logFetch(hit bool, hash string, duration int) {
Expand All @@ -130,20 +133,30 @@ func (cache *httpCache) logFetch(hit bool, hash string, duration int) {
cache.recorder.LogEvent(payload)
}

func (cache *httpCache) exists(hash string) (bool, error) {
func (cache *httpCache) exists(hash string) (bool, int, error) {
resp, err := cache.client.ArtifactExists(hash)
if err != nil {
return false, nil
return false, 0, nil
}

defer func() { err = resp.Body.Close() }()

if resp.StatusCode == http.StatusNotFound {
return false, nil
return false, 0, nil
} else if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode))
return false, 0, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode))
}
return true, err

// If present, extract the duration from the response.
duration := 0
if resp.Header.Get(durationHeaderName) != "" {
// If we had an error reading the duration header, just swallow it for now.
if intVar, err := strconv.Atoi(resp.Header.Get(durationHeaderName)); err == nil {
duration = intVar
}
}

return true, duration, err
}

func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemPath, int, error) {
Expand All @@ -160,10 +173,10 @@ func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemP
}
// If present, extract the duration from the response.
duration := 0
if resp.Header.Get("x-artifact-duration") != "" {
intVar, err := strconv.Atoi(resp.Header.Get("x-artifact-duration"))
if resp.Header.Get(durationHeaderName) != "" {
intVar, err := strconv.Atoi(resp.Header.Get(durationHeaderName))
if err != nil {
return false, nil, 0, fmt.Errorf("invalid x-artifact-duration header: %w", err)
return false, nil, 0, fmt.Errorf("invalid %s header: %w", durationHeaderName, err)
}
duration = intVar
}
Expand Down Expand Up @@ -217,6 +230,7 @@ func (cache *httpCache) Shutdown() {}

func newHTTPCache(opts Opts, client client, recorder analytics.Recorder, repoRoot turbopath.AbsoluteSystemPath) *httpCache {
return &httpCache{
label: CacheSourceRemote,
writable: true,
client: client,
requestLimiter: make(limiter, 20),
Expand Down
4 changes: 2 additions & 2 deletions cli/internal/cache/cache_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ func (c *noopCache) Put(_ turbopath.AbsoluteSystemPath, _ string, _ int, _ []tur
func (c *noopCache) Fetch(_ turbopath.AbsoluteSystemPath, _ string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
return ItemStatus{Local: false, Remote: false}, nil, 0, nil
}
func (c *noopCache) Exists(_ string) ItemStatus {
return ItemStatus{}
func (c *noopCache) Exists(_ string) (ItemStatus, int) {
return ItemStatus{}, 0
}

func (c *noopCache) Clean(_ turbopath.AbsoluteSystemPath) {}
Expand Down
1 change: 1 addition & 0 deletions cli/internal/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (c *APIClient) getArtifact(hash string, httpMethod string) (*http.Response,
if allowAuth {
req.Header.Set("Authorization", "Bearer "+c.token)
}

req.Header.Set("User-Agent", c.userAgent())
if err != nil {
return nil, fmt.Errorf("invalid cache URL: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions cli/internal/run/dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func populateCacheState(turboCache cache.Cache, taskSummaries []*runsummary.Task
defer wg.Done()
for index := range queue {
task := taskSummaries[index]
itemStatus := turboCache.Exists(task.Hash)
task.CacheSummary = runsummary.NewTaskCacheSummary(itemStatus, nil)
itemStatus, timeSaved := turboCache.Exists(task.Hash)
task.CacheSummary = runsummary.NewTaskCacheSummary(itemStatus, &timeSaved)
}
}()
}
Expand Down
5 changes: 5 additions & 0 deletions cli/internal/runcache/runcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe
} else {
// If outputs have not changed changed, that means we have a local cache hit.
cacheStatus.Local = true

// Check and assign timeSaved value.
_, saved := tc.rc.cache.Exists(tc.hash)
timeSaved = saved

prefixedUI.Warn(fmt.Sprintf("Configured outputs are already in place for %s, skipping cache check", tc.pt.TaskID))
}

Expand Down

0 comments on commit d380677

Please sign in to comment.