diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2f6f857..b22a6b0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,6 +18,11 @@ jobs: with: go-version: '1.22' + - name: Set up Docker Compose + uses: ndeloof/install-compose-action@v0.0.1 + with: + version: 'latest' + - name: Start containers run: docker-compose -f docker-compose-github.yml up -d diff --git a/db/etcd.go b/db/etcd.go index 0c79c81..c996fad 100644 --- a/db/etcd.go +++ b/db/etcd.go @@ -54,6 +54,18 @@ func GetModified(bucketName string, path string) (int64, error) { return strconv.ParseInt(string(resp.Kvs[0].Value), 10, 64) } +func GetKeys(prefix string) ([]string, error) { + resp, err := etcdClient.Get(etcdClient.Ctx(), prefix, clientv3.WithPrefix()) + if err != nil { + return nil, err + } + keys := make([]string, 0) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + } + return keys, nil +} + func SetModified(bucketName string, path string, modified int64) error { _, err := etcdClient.Put(etcdClient.Ctx(), GetModifiedName(bucketName, path), strconv.FormatInt(modified, 10)) return err diff --git a/proxy/handlers/files.go b/proxy/handlers/files.go index 9a45abf..89b0ed4 100644 --- a/proxy/handlers/files.go +++ b/proxy/handlers/files.go @@ -1,14 +1,23 @@ package handlers import ( - "github.com/sio2project/ft-to-s3/v1/utils" - "net/http" + "strings" + + "github.com/sio2project/ft-to-s3/v1/utils" ) func Files(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) { if r.Method == http.MethodPut { Put(w, r, logger, bucketName) + } else if r.Method == http.MethodGet { + if strings.HasPrefix(r.URL.Path, "/files/") { + GetFiles(w, r, logger, bucketName) + } else if strings.HasPrefix(r.URL.Path, "/list/") { + GetList(w, r, logger, bucketName) + } else { + w.WriteHeader(http.StatusBadRequest) + } } else { w.WriteHeader(http.StatusMethodNotAllowed) } diff --git a/proxy/handlers/get_files.go 
b/proxy/handlers/get_files.go new file mode 100644 index 0000000..cb0d15a --- /dev/null +++ b/proxy/handlers/get_files.go @@ -0,0 +1,33 @@ +package handlers + +import ( + "github.com/sio2project/ft-to-s3/v1/storage" + "github.com/sio2project/ft-to-s3/v1/utils" + "io" + "net/http" + "strconv" +) + +func GetFiles(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) { + path := r.URL.Path[len("/files/"):] + result := storage.Get(bucketName, logger, path) + if result.Err != nil { + logger.Error("Error", result.Err) + w.WriteHeader(http.StatusInternalServerError) + return + } + if !result.Found { + w.WriteHeader(http.StatusNotFound) + return + } + if result.Gziped { + w.Header().Set("Content-Encoding", "gzip") + } + w.Header().Set("Logical-Size", strconv.FormatInt(result.LogicalSize, 10)) + w.Header().Set("Last-Modified", toRFC2822(result.LastModified)) + w.WriteHeader(http.StatusOK) + _, err := io.Copy(w, result.File) + if err != nil { + logger.Error("Error", err) + } +} diff --git a/proxy/handlers/get_list.go b/proxy/handlers/get_list.go new file mode 100644 index 0000000..1eab44a --- /dev/null +++ b/proxy/handlers/get_list.go @@ -0,0 +1,36 @@ +package handlers + +import ( + "github.com/sio2project/ft-to-s3/v1/storage" + "github.com/sio2project/ft-to-s3/v1/utils" + "net/http" +) + +func GetList(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) { + path := r.URL.Path[len("/list/"):] + lastModifiedRFC := r.URL.Query().Get("last_modified") + if lastModifiedRFC == "" { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("\"?last-modified=\" is required")) + return + } + lastModified, err := FromRFC2822(lastModifiedRFC) + if err != nil { + logger.Error("Error", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + files, err := storage.GetList(bucketName, logger, path, lastModified) + if err != nil { + logger.Error("Error", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + 
w.Header().Set("Content-Type", "application/text") + w.WriteHeader(http.StatusOK) + for _, file := range files { + w.Write([]byte(file + "\n")) + } +} diff --git a/proxy/handlers/put.go b/proxy/handlers/put.go index 5961ca0..2e26cb1 100644 --- a/proxy/handlers/put.go +++ b/proxy/handlers/put.go @@ -51,7 +51,7 @@ func Put(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, buc compressed, digest, logicalSize) if err != nil { - logger.Error("Error", err) + logger.Error("Error while storing the file:", err) w.WriteHeader(http.StatusInternalServerError) return } diff --git a/storage/main.go b/storage/main.go index 3fd0a63..32515a6 100644 --- a/storage/main.go +++ b/storage/main.go @@ -3,15 +3,16 @@ package storage import ( "bytes" "context" - "io" - "github.com/minio/minio-go/v7" "github.com/sio2project/ft-to-s3/v1/db" "github.com/sio2project/ft-to-s3/v1/utils" + "io" ) func Store(bucketName string, logger *utils.LoggerObject, path string, reader io.Reader, version int64, size int64, compressed bool, sha256Digest string, logicalSize int64) (int64, error) { + logger.Debug("storage.Store called on", bucketName+":"+path) + session := db.GetSession() defer session.Close() fileMutex := db.GetMutex(session, bucketName+":"+path) @@ -20,75 +21,89 @@ func Store(bucketName string, logger *utils.LoggerObject, path string, reader io dbModified, err := db.GetModified(bucketName, path) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while getting modified time", err) } if version <= dbModified { return dbModified, nil } + logger.Debug("Version is greater than dbModified") oldFile, err := db.GetHashForPath(bucketName, path) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while getting hash for path", err) } options := minio.PutObjectOptions{} + var data bytes.Buffer + var headersCalculated bool if sha256Digest == "" || logicalSize == -1 { - var data []byte + logger.Debug("Calculating sha256Digest and logicalSize") + 
headersCalculated = true + var tempData []byte + teeReader := io.TeeReader(reader, &data) if compressed { - data, err = utils.ReadGzip(reader) + logger.Debug("File is compressed, reading gzip") + tempData, err = utils.ReadGzip(teeReader) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while reading gzip", err) } } else { - data, err = io.ReadAll(reader) + logger.Debug("File is not compressed, reading data") + tempData, err = io.ReadAll(teeReader) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while reading data", err) } } - sha256Digest = utils.Sha256Checksum(data) - logicalSize = int64(len(data)) - reader = bytes.NewReader(data) + sha256Digest = utils.Sha256Checksum(tempData) + logicalSize = int64(len(tempData)) + logger.Debug("Calculated sha256Digest and logicalSize") } if compressed { + logger.Debug("Setting ContentEncoding to gzip") options.ContentEncoding = "gzip" + options.ContentType = "application/gzip" } refCount, err := db.GetRefCount(bucketName, sha256Digest) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while getting refCount", err) } if refCount == 0 { - logger.Debug("Storing with options ", options) + logger.Debug("Storing with options", options) minioClient := GetClient() - _, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, reader, size, options) + if headersCalculated { + _, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, &data, size, options) + } else { + _, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, reader, size, options) + } if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while putting object", err) } } logger.Info("Putting refFile") err = db.SetHashForPath(bucketName, path, sha256Digest) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while setting hash for path", err) } logger.Info("Putting refCount") err = db.SetRefCount(bucketName, 
sha256Digest, refCount+1) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while setting refCount", err) } err = db.SetModified(bucketName, path, version) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while setting modified", err) } err = deleteByHash(bucketName, logger, oldFile, false) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while deleting old file", err) } return version, nil @@ -98,3 +113,71 @@ func deleteByHash(bucketName string, logger *utils.LoggerObject, path string, lo logger.Debug("DeleteByHash called on ", path) return nil } + +func Get(bucketName string, logger *utils.LoggerObject, path string) *GetResult { + logger.Debug("storage.Get called on", bucketName+":"+path) + + fileHash, err := db.GetHashForPath(bucketName, path) + if err != nil { + return &GetResult{Err: err} + } + if fileHash == "" { + return &GetResult{Found: false} + } + + lastModified, err := db.GetModified(bucketName, path) + if err != nil { + return &GetResult{Err: err} + } + + minioClient := GetClient() + info, err := minioClient.StatObject(context.Background(), bucketName, fileHash, minio.StatObjectOptions{}) + if err != nil { + minioErr := minio.ToErrorResponse(err) + if minioErr.Code == "NoSuchKey" { + return &GetResult{Found: false} + } + return &GetResult{Err: err} + } + gziped := info.ContentType == "application/gzip" + + reader, err := minioClient.GetObject(context.Background(), bucketName, fileHash, minio.GetObjectOptions{}) + if err != nil { + minioErr := minio.ToErrorResponse(err) + if minioErr.Code == "NoSuchKey" { + return &GetResult{Found: false} + } + return &GetResult{Err: err} + } + + return &GetResult{ + Found: true, + File: reader, + Gziped: gziped, + LastModified: lastModified, + LogicalSize: info.Size, + } +} + +func GetList(bucketName string, logger *utils.LoggerObject, path string, last_modified int64) ([]string, error) { + logger.Debug("storage.GetList called on", bucketName+":"+path) + + 
// Error pairs a context message with the underlying error that caused it.
type Error struct {
	Message string // what the failing layer was doing
	Err     error  // underlying cause; may be nil
}

// Error implements the error interface. It renders "Message: cause", or
// just Message when there is no underlying cause.
func (e *Error) Error() string {
	if e.Err == nil {
		// Avoid a nil dereference when the wrapper carries no cause.
		return e.Message
	}
	return e.Message + ": " + e.Err.Error()
}

// Unwrap returns the underlying cause so that errors.Is and errors.As can
// look through the wrapper.
func (e *Error) Unwrap() error {
	return e.Err
}

// ErrorWrapper builds an *Error that attaches msg to err. It always returns
// a non-nil pointer, so callers should invoke it only once they know an
// error occurred.
func ErrorWrapper(msg string, err error) *Error {
	return &Error{Message: msg, Err: err}
}