Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include timeSaved metric when skipping cache check #4952

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f9f3e16
refactor: Change Itemstatus signature to include TimeSaved metric
May 19, 2023
57c05e0
Improve log message when cache restoration is skipped
mehulkar May 15, 2023
01a4a6d
Return timeSaved metric even when cache restoration is skipped
mehulkar May 15, 2023
c39306c
Fix how Exists() works to make clear that its not composite, but firs…
May 19, 2023
b1dd3bc
Exit the loop early on a cache hit, since we're propaging a cache hit…
May 20, 2023
73674e8
Check for a cahce hit after handling errors
May 20, 2023
92bb2a4
extract duration extracter function instead of just a const
May 20, 2023
405bbbf
Make method private
May 20, 2023
5a45008
comments
May 20, 2023
b08e291
Update cli/internal/runcache/runcache.go
mehulkar May 23, 2023
a798459
Use const for cache source noop
May 20, 2023
320a368
Use CacheMiss helper for noop implementation
May 23, 2023
46fc3f4
Merge branch 'main' into mehulkar/turbo-1000-runs-include-timesaved-f…
mehulkar May 25, 2023
2f1b9fd
Store and retrieve timeSaved value from daemon (#5101)
mehulkar May 27, 2023
d94af44
Remove debug statements
mehulkar May 27, 2023
8ea8c8a
Fix linter
mehulkar May 27, 2023
ea267a6
Merge branch 'main' into mehulkar/turbo-1000-runs-include-timesaved-f…
mehulkar May 30, 2023
55105bc
revert log message
mehulkar May 30, 2023
32296b8
Remove Exists() check
mehulkar May 30, 2023
5bc4f12
set cache source to fs when skipping restoration
mehulkar May 30, 2023
f5e8183
Revert all the cacheStatus signature things
mehulkar May 31, 2023
e96c3bb
Revert the parts of runcache that aren't related to daemon get/set
mehulkar May 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/internal/cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, durati
return nil
}

func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, error) {
return c.realCache.Fetch(anchor, key, files)
}

Expand Down
68 changes: 45 additions & 23 deletions cli/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
type Cache interface {
// Fetch returns true if there is a cache it. It is expected to move files
// into their correct position as a side effect
Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error)
Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, error)
Exists(hash string) ItemStatus
// Put caches files for a given hash
Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error
Expand All @@ -32,15 +32,44 @@ type Cache interface {
// ItemStatus holds whether artifacts exists for a given hash on local
// and/or remote caching server
type ItemStatus struct {
Local bool `json:"local"`
Remote bool `json:"remote"`
Hit bool
Source string // only relevant if Hit is true
TimeSaved int // will be 0 if Hit is false
}

// NewCacheMiss returns an ItemStatus with the fields set to indicate a cache miss
func NewCacheMiss() ItemStatus {
return ItemStatus{
Source: CacheSourceNone,
Hit: false,
TimeSaved: 0,
}
}

// newFSTaskCacheStatus returns an ItemStatus with the fields set to indicate a local cache hit
func newFSTaskCacheStatus(hit bool, timeSaved int) ItemStatus {
return ItemStatus{
Source: CacheSourceFS,
Hit: hit,
TimeSaved: timeSaved,
}
}

func newRemoteTaskCacheStatus(hit bool, timeSaved int) ItemStatus {
return ItemStatus{
Source: CacheSourceRemote,
Hit: hit,
TimeSaved: timeSaved,
}
}

const (
// CacheSourceFS is a constant to indicate local cache hit
CacheSourceFS = "LOCAL"
// CacheSourceRemote is a constant to indicate remote cache hit
CacheSourceRemote = "REMOTE"
// CacheSourceNone is an empty string because there is no source for a cache miss
CacheSourceNone = ""
mehulkar marked this conversation as resolved.
Show resolved Hide resolved
// CacheEventHit is a constant to indicate a cache hit
CacheEventHit = "HIT"
// CacheEventMiss is a constant to indicate a cache miss
Expand Down Expand Up @@ -240,24 +269,17 @@ func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) {
}
}

func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, error) {
// Make a shallow copy of the caches, since storeUntil can call removeCache
mplex.mu.RLock()
caches := make([]Cache, len(mplex.caches))
copy(caches, mplex.caches)
mplex.mu.RUnlock()

// We need to return a composite cache status from multiple caches
// Initialize the empty struct so we can assign values to it. This is similar
// to how the Exists() method works.
combinedCacheState := ItemStatus{}

// Retrieve from caches sequentially; if we did them simultaneously we could
// easily write the same file from two goroutines at once.
for i, cache := range caches {
itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files)
ok := itemStatus.Local || itemStatus.Remote

itemStatus, actualFiles, err := cache.Fetch(anchor, key, files)
if err != nil {
cd := &util.CacheDisabledError{}
if errors.As(err, &cd) {
Expand All @@ -271,31 +293,31 @@ func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key st
// the operation. Future work that plumbs UI / Logging into the cache system
// should probably log this at least.
}
if ok {

if itemStatus.Hit {
// Store this into other caches. We can ignore errors here because we know
// we have previously successfully stored in a higher-priority cache, and so the overall
// result is a success at fetching. Storing in lower-priority caches is an optimization.
_ = mplex.storeUntil(anchor, key, duration, actualFiles, i)
_ = mplex.storeUntil(anchor, key, itemStatus.TimeSaved, actualFiles, i)

// If another cache had already set this to true, we don't need to set it again from this cache
combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local
combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote
return combinedCacheState, actualFiles, duration, err
// Return this cache, and exit the for loop, since we don't need to keep looking.
return itemStatus, actualFiles, nil
}
}

return ItemStatus{Local: false, Remote: false}, nil, 0, nil
return NewCacheMiss(), nil, nil
}

// Exists check each cache sequentially and return the first one that has a cache hit
func (mplex *cacheMultiplexer) Exists(target string) ItemStatus {
syncCacheState := ItemStatus{}
for _, cache := range mplex.caches {
itemStatus := cache.Exists(target)
syncCacheState.Local = syncCacheState.Local || itemStatus.Local
syncCacheState.Remote = syncCacheState.Remote || itemStatus.Remote
if itemStatus.Hit {
return itemStatus
}
}

return syncCacheState
return NewCacheMiss()
}

func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) {
Expand Down
28 changes: 19 additions & 9 deletions cli/internal/cache/cache_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newFsCache(opts Opts, recorder analytics.Recorder, repoRoot turbopath.Absol
}

// Fetch returns true if items are cached. It moves them into position as a side effect.
func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, error) {
uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar")
compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst")

Expand All @@ -45,44 +45,54 @@ func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []st
} else {
// It's not in the cache, bail now
f.logFetch(false, hash, 0)
return ItemStatus{Local: false}, nil, 0, nil
return newFSTaskCacheStatus(false, 0), nil, nil
}

cacheItem, openErr := cacheitem.Open(actualCachePath)
if openErr != nil {
return ItemStatus{Local: false}, nil, 0, openErr
return newFSTaskCacheStatus(false, 0), nil, openErr
}

restoredFiles, restoreErr := cacheItem.Restore(anchor)
if restoreErr != nil {
_ = cacheItem.Close()
return ItemStatus{Local: false}, nil, 0, restoreErr
return newFSTaskCacheStatus(false, 0), nil, restoreErr
}

meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json"))
if err != nil {
_ = cacheItem.Close()
return ItemStatus{Local: false}, nil, 0, fmt.Errorf("error reading cache metadata: %w", err)
return newFSTaskCacheStatus(false, 0), nil, fmt.Errorf("error reading cache metadata: %w", err)
}
f.logFetch(true, hash, meta.Duration)

// Wait to see what happens with close.
closeErr := cacheItem.Close()
if closeErr != nil {
return ItemStatus{Local: false}, restoredFiles, 0, closeErr
return newFSTaskCacheStatus(false, 0), restoredFiles, closeErr
}
return ItemStatus{Local: true}, restoredFiles, meta.Duration, nil
return newFSTaskCacheStatus(true, meta.Duration), restoredFiles, nil
}

// Exists returns the ItemStatus and the timeSaved
func (f *fsCache) Exists(hash string) ItemStatus {
uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar")
compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst")

status := newFSTaskCacheStatus(false, 0)
if compressedCachePath.FileExists() || uncompressedCachePath.FileExists() {
return ItemStatus{Local: true}
status.Hit = true
}

return ItemStatus{Local: false}
// Swallow the error
if meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json")); err != nil {
status.TimeSaved = 0
} else {
status.TimeSaved = meta.Duration
}

return status

}

func (f *fsCache) logFetch(hit bool, hash string, duration int) {
Expand Down
4 changes: 2 additions & 2 deletions cli/internal/cache/cache_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ func TestFetch(t *testing.T) {

outputDir := turbopath.AbsoluteSystemPath(t.TempDir())
dstOutputPath := "some-package"
cacheStatus, files, _, err := cache.Fetch(outputDir, "the-hash", []string{})
cacheStatus, files, err := cache.Fetch(outputDir, "the-hash", []string{})
assert.NilError(t, err, "Fetch")
hit := cacheStatus.Local || cacheStatus.Remote
hit := cacheStatus.Hit
if !hit {
t.Error("Fetch got false, want true")
}
Expand Down
49 changes: 29 additions & 20 deletions cli/internal/cache/cache_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,26 +92,26 @@ func (cache *httpCache) write(w io.WriteCloser, anchor turbopath.AbsoluteSystemP
cacheErrorChan <- cacheItem.Close()
}

func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, error) {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
hit, files, duration, err := cache.retrieve(key)
if err != nil {
// TODO: analytics event?
return ItemStatus{Remote: false}, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
return newRemoteTaskCacheStatus(false, duration), files, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
}
cache.logFetch(hit, key, duration)
return ItemStatus{Remote: hit}, files, duration, err
return newRemoteTaskCacheStatus(hit, duration), files, err
}

func (cache *httpCache) Exists(key string) ItemStatus {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
hit, err := cache.exists(key)
hit, timeSaved, err := cache.exists(key)
if err != nil {
return ItemStatus{Remote: false}
return newRemoteTaskCacheStatus(false, 0)
}
return ItemStatus{Remote: hit}
return newRemoteTaskCacheStatus(hit, timeSaved)
}

func (cache *httpCache) logFetch(hit bool, hash string, duration int) {
Expand All @@ -130,20 +130,22 @@ func (cache *httpCache) logFetch(hit bool, hash string, duration int) {
cache.recorder.LogEvent(payload)
}

func (cache *httpCache) exists(hash string) (bool, error) {
func (cache *httpCache) exists(hash string) (bool, int, error) {
resp, err := cache.client.ArtifactExists(hash)
if err != nil {
return false, nil
return false, 0, nil
}

defer func() { err = resp.Body.Close() }()

if resp.StatusCode == http.StatusNotFound {
return false, nil
return false, 0, nil
} else if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode))
return false, 0, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode))
}
return true, err

duration := getDurationFromResponse(resp)
return true, duration, err
}

func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemPath, int, error) {
Expand All @@ -158,15 +160,9 @@ func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemP
b, _ := ioutil.ReadAll(resp.Body)
return false, nil, 0, fmt.Errorf("%s", string(b))
}
// If present, extract the duration from the response.
duration := 0
if resp.Header.Get("x-artifact-duration") != "" {
intVar, err := strconv.Atoi(resp.Header.Get("x-artifact-duration"))
if err != nil {
return false, nil, 0, fmt.Errorf("invalid x-artifact-duration header: %w", err)
}
duration = intVar
}

duration := getDurationFromResponse(resp)

var tarReader io.Reader

defer func() { _ = resp.Body.Close() }()
Expand Down Expand Up @@ -200,6 +196,19 @@ func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemP
return true, files, duration, nil
}

// getDurationFromResponse extracts the duration from the response header
func getDurationFromResponse(resp *http.Response) int {
duration := 0
if resp.Header.Get("x-artifact-duration") != "" {
// If we had an error reading the duration header, just swallow it for now.
if intVar, err := strconv.Atoi(resp.Header.Get("x-artifact-duration")); err == nil {
duration = intVar
}
}

return duration
}

func restoreTar(root turbopath.AbsoluteSystemPath, reader io.Reader) ([]turbopath.AnchoredSystemPath, error) {
cache := cacheitem.FromReader(reader, true)
return cache.Restore(root)
Expand Down
2 changes: 1 addition & 1 deletion cli/internal/cache/cache_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestRemoteCachingDisabled(t *testing.T) {
requestLimiter: make(limiter, 20),
}
cd := &util.CacheDisabledError{}
_, _, _, err := cache.Fetch("unused-target", "some-hash", []string{"unused", "outputs"})
_, _, err := cache.Fetch("unused-target", "some-hash", []string{"unused", "outputs"})
if !errors.As(err, &cd) {
t.Errorf("cache.Fetch err got %v, want a CacheDisabled error", err)
}
Expand Down
7 changes: 4 additions & 3 deletions cli/internal/cache/cache_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ func newNoopCache() *noopCache {
func (c *noopCache) Put(_ turbopath.AbsoluteSystemPath, _ string, _ int, _ []turbopath.AnchoredSystemPath) error {
return nil
}
func (c *noopCache) Fetch(_ turbopath.AbsoluteSystemPath, _ string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
return ItemStatus{Local: false, Remote: false}, nil, 0, nil
func (c *noopCache) Fetch(_ turbopath.AbsoluteSystemPath, _ string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, error) {
return ItemStatus{Source: "Noop", Hit: false, TimeSaved: 0}, nil, nil
}

func (c *noopCache) Exists(_ string) ItemStatus {
return ItemStatus{}
return ItemStatus{Source: CacheSourceNone, Hit: false, TimeSaved: 0}
}

func (c *noopCache) Clean(_ turbopath.AbsoluteSystemPath) {}
Expand Down
Loading