Skip to content

Commit

Permalink
Move http cache writing to cacheitem.
Browse files: browse the repository at this point in the history
  • Loading branch information
Nathan Hammond authored and Nathan Hammond committed May 12, 2023
1 parent 087ada2 commit a220a80
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 75 deletions.
89 changes: 18 additions & 71 deletions cli/internal/cache/cache_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,16 @@
package cache

import (
"archive/tar"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
log "log"
"net/http"
"os"
"path/filepath"
"strconv"
"time"

"github.com/DataDog/zstd"

"github.com/vercel/turbo/cli/internal/analytics"
"github.com/vercel/turbo/cli/internal/cacheitem"
"github.com/vercel/turbo/cli/internal/tarpatch"
"github.com/vercel/turbo/cli/internal/turbopath"
)

Expand Down Expand Up @@ -51,19 +43,15 @@ func (l limiter) release() {
<-l
}

// mtime is the time we attach for the modification time of all files.
var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)

// nobody is the usual uid / gid of the 'nobody' user.
const nobody = 65534

func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
func (cache *httpCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
// if cache.writable {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()

r, w := io.Pipe()
go cache.write(w, hash, files)

cacheErrorChan := make(chan error, 1)
go cache.write(w, anchor, files, cacheErrorChan)

// Read the entire artifact tar into memory so we can easily compute the signature.
// Note: retryablehttp.NewRequest reads the files into memory anyways so there's no
Expand All @@ -79,69 +67,28 @@ func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duratio
return fmt.Errorf("failed to store files in HTTP cache: %w", err)
}
}

cacheCreateError := <-cacheErrorChan
if cacheCreateError != nil {
return cacheCreateError
}

return cache.client.PutArtifact(hash, artifactBody, duration, tag)
}

// write writes a series of files into the given Writer.
//
// Each file is stored as a tar entry, and the tar stream is passed through a
// zstd writer layered on top of w. If storing a file fails, the error is
// logged and the loop moves on to the next file; nothing is reported back to
// the caller, since this function has no return value.
//
// NOTE(review): the hash parameter is not referenced anywhere in this body.
func (cache *httpCache) write(w io.WriteCloser, hash string, files []turbopath.AnchoredSystemPath) {
// Deferred calls run in reverse order, so the tar writer is closed first,
// then the zstd writer, and finally w (which is closed by two defers).
defer w.Close()
defer func() { _ = w.Close() }()
zw := zstd.NewWriter(w)
defer func() { _ = zw.Close() }()
tw := tar.NewWriter(zw)
defer func() { _ = tw.Close() }()
for _, file := range files {
// log.Printf("caching file %v", file)
if err := cache.storeFile(tw, file); err != nil {
// The failure is only logged; the remaining files are still written.
log.Printf("[ERROR] Error uploading artifact %s to HTTP cache due to: %s", file, err)
// TODO(jaredpalmer): How can we cancel the request at this point?
}
}
}
func (cache *httpCache) write(w io.WriteCloser, anchor turbopath.AbsoluteSystemPath, files []turbopath.AnchoredSystemPath, cacheErrorChan chan error) {
cacheItem := cacheitem.CreateWriter(w)

func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.AnchoredSystemPath) error {
absoluteFilePath := repoRelativePath.RestoreAnchor(cache.repoRoot)
info, err := absoluteFilePath.Lstat()
if err != nil {
return err
}
target := ""
if info.Mode()&os.ModeSymlink != 0 {
target, err = absoluteFilePath.Readlink()
for _, file := range files {
err := cacheItem.AddFile(anchor, file)
if err != nil {
return err
_ = cacheItem.Close()
cacheErrorChan <- err
}
}
hdr, err := tarpatch.FileInfoHeader(repoRelativePath.ToUnixPath(), info, filepath.ToSlash(target))
if err != nil {
return err
}
// Ensure posix path for filename written in header.
hdr.Name = repoRelativePath.ToUnixPath().ToString()
// Zero out all timestamps.
hdr.ModTime = mtime
hdr.AccessTime = mtime
hdr.ChangeTime = mtime
// Strip user/group ids.
hdr.Uid = nobody
hdr.Gid = nobody
hdr.Uname = "nobody"
hdr.Gname = "nobody"
if err := tw.WriteHeader(hdr); err != nil {
return err
} else if info.IsDir() || target != "" {
return nil // nothing to write
}
f, err := absoluteFilePath.Open()
if err != nil {
return err
}
defer func() { _ = f.Close() }()
_, err = io.Copy(tw, f)
if errors.Is(err, tar.ErrWriteTooLong) {
log.Printf("Error writing %v to tar file, info: %v, mode: %v, is regular: %v", repoRelativePath, info, info.Mode(), info.Mode().IsRegular())
}
return err

cacheErrorChan <- cacheItem.Close()
}

func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
Expand Down
10 changes: 8 additions & 2 deletions cli/internal/cacheitem/cacheitem.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type CacheItem struct {
tw *tar.Writer
zw io.WriteCloser
fileBuffer *bufio.Writer
handle io.Reader
handle interface{}
compressed bool
}

Expand Down Expand Up @@ -72,7 +72,13 @@ func (ci *CacheItem) Close() error {
// GetSha returns the SHA-512 hash for the CacheItem.
func (ci *CacheItem) GetSha() ([]byte, error) {
sha := sha512.New()
if _, err := io.Copy(sha, ci.handle); err != nil {

reader, isReader := ci.handle.(io.Reader)
if !isReader {
panic("can't read from this cache item")
}

if _, err := io.Copy(sha, reader); err != nil {
return nil, err
}

Expand Down
11 changes: 11 additions & 0 deletions cli/internal/cacheitem/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ func Create(path turbopath.AbsoluteSystemPath) (*CacheItem, error) {
return cacheItem, nil
}

// CreateWriter returns a new CacheItem whose handle is the supplied writer.
// The item is flagged as compressed and initialised via init before it is
// handed back to the caller.
func CreateWriter(writer io.WriteCloser) *CacheItem {
	item := new(CacheItem)
	item.handle = writer
	item.compressed = true

	item.init()
	return item
}

// init prepares the CacheItem for writing.
// Wires all the writers end-to-end:
// tar.Writer -> zstd.Writer -> fileBuffer -> file
Expand Down
9 changes: 7 additions & 2 deletions cli/internal/cacheitem/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ func (ci *CacheItem) Restore(anchor turbopath.AbsoluteSystemPath) ([]turbopath.A
var tr *tar.Reader
var closeError error

reader, isReader := ci.handle.(io.Reader)
if !isReader {
panic("can't read from this cache item")
}

// We're reading a tar, possibly wrapped in zstd.
if ci.compressed {
zr := zstd.NewReader(ci.handle)
zr := zstd.NewReader(reader)

// The `Close` function for compression effectively just returns the singular
// error field on the decompressor instance. This is extremely unlikely to be
Expand All @@ -52,7 +57,7 @@ func (ci *CacheItem) Restore(anchor turbopath.AbsoluteSystemPath) ([]turbopath.A
defer func() { closeError = zr.Close() }()
tr = tar.NewReader(zr)
} else {
tr = tar.NewReader(ci.handle)
tr = tar.NewReader(reader)
}

// On first attempt to restore it's possible that a link target doesn't exist.
Expand Down

0 comments on commit a220a80

Please sign in to comment.