Skip to content

GET route #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ jobs:
with:
go-version: '1.22'

- name: Set up Docker Compose
uses: ndeloof/[email protected]
with:
version: 'latest'

- name: Start containers
run: docker-compose -f docker-compose-github.yml up -d

Expand Down
12 changes: 12 additions & 0 deletions db/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ func GetModified(bucketName string, path string) (int64, error) {
return strconv.ParseInt(string(resp.Kvs[0].Value), 10, 64)
}

func GetKeys(prefix string) ([]string, error) {
resp, err := etcdClient.Get(etcdClient.Ctx(), prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
keys := make([]string, 0)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
}
return keys, nil
}

func SetModified(bucketName string, path string, modified int64) error {
_, err := etcdClient.Put(etcdClient.Ctx(), GetModifiedName(bucketName, path), strconv.FormatInt(modified, 10))
return err
Expand Down
13 changes: 11 additions & 2 deletions proxy/handlers/files.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
package handlers

import (
"github.com/sio2project/ft-to-s3/v1/utils"

"net/http"
"strings"

"github.com/sio2project/ft-to-s3/v1/utils"
)

func Files(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) {
if r.Method == http.MethodPut {
Put(w, r, logger, bucketName)
} else if r.Method == http.MethodGet {
if strings.HasPrefix(r.URL.Path, "/files/") {
GetFiles(w, r, logger, bucketName)
} else if strings.HasPrefix(r.URL.Path, "/list/") {
GetList(w, r, logger, bucketName)
} else {
w.WriteHeader(http.StatusBadRequest)
}
} else {
w.WriteHeader(http.StatusMethodNotAllowed)
}
Expand Down
33 changes: 33 additions & 0 deletions proxy/handlers/get_files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package handlers

import (
"github.com/sio2project/ft-to-s3/v1/storage"
"github.com/sio2project/ft-to-s3/v1/utils"
"io"
"net/http"
"strconv"
)

func GetFiles(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) {
path := r.URL.Path[len("/files/"):]
result := storage.Get(bucketName, logger, path)
if result.Err != nil {
logger.Error("Error", result.Err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if !result.Found {
w.WriteHeader(http.StatusNotFound)
return
}
if result.Gziped {
w.Header().Set("Content-Encoding", "gzip")
}
w.Header().Set("Logical-Size", strconv.FormatInt(result.LogicalSize, 10))
w.Header().Set("Last-Modified", toRFC2822(result.LastModified))
w.WriteHeader(http.StatusOK)
_, err := io.Copy(w, result.File)
if err != nil {
logger.Error("Error", err)
}
}
36 changes: 36 additions & 0 deletions proxy/handlers/get_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package handlers

import (
"github.com/sio2project/ft-to-s3/v1/storage"
"github.com/sio2project/ft-to-s3/v1/utils"
"net/http"
)

func GetList(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) {
path := r.URL.Path[len("/list/"):]
lastModifiedRFC := r.URL.Query().Get("last_modified")
if lastModifiedRFC == "" {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("\"?last-modified=\" is required"))
return
}
lastModified, err := FromRFC2822(lastModifiedRFC)
if err != nil {
logger.Error("Error", err)
w.WriteHeader(http.StatusBadRequest)
return
}

files, err := storage.GetList(bucketName, logger, path, lastModified)
if err != nil {
logger.Error("Error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/text")
w.WriteHeader(http.StatusOK)
for _, file := range files {
w.Write([]byte(file + "\n"))
}
}
2 changes: 1 addition & 1 deletion proxy/handlers/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func Put(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, buc
compressed, digest, logicalSize)

if err != nil {
logger.Error("Error", err)
logger.Error("Error while storing the file:", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
123 changes: 103 additions & 20 deletions storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package storage
import (
"bytes"
"context"
"io"

"github.com/minio/minio-go/v7"
"github.com/sio2project/ft-to-s3/v1/db"
"github.com/sio2project/ft-to-s3/v1/utils"
"io"
)

func Store(bucketName string, logger *utils.LoggerObject, path string, reader io.Reader, version int64, size int64,
compressed bool, sha256Digest string, logicalSize int64) (int64, error) {
logger.Debug("storage.Store called on", bucketName+":"+path)

session := db.GetSession()
defer session.Close()
fileMutex := db.GetMutex(session, bucketName+":"+path)
Expand All @@ -20,75 +21,89 @@ func Store(bucketName string, logger *utils.LoggerObject, path string, reader io

dbModified, err := db.GetModified(bucketName, path)
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while getting modified time", err)
}
if version <= dbModified {
return dbModified, nil
}
logger.Debug("Version is greater than dbModified")

oldFile, err := db.GetHashForPath(bucketName, path)
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while getting hash for path", err)
}

options := minio.PutObjectOptions{}
var data bytes.Buffer
var headersCalculated bool
if sha256Digest == "" || logicalSize == -1 {
var data []byte
logger.Debug("Calculating sha256Digest and logicalSize")
headersCalculated = true
var tempData []byte
teeReader := io.TeeReader(reader, &data)
if compressed {
data, err = utils.ReadGzip(reader)
logger.Debug("File is compressed, reading gzip")
tempData, err = utils.ReadGzip(teeReader)
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while reading gzip", err)
}
} else {
data, err = io.ReadAll(reader)
logger.Debug("File is not compressed, reading data")
tempData, err = io.ReadAll(teeReader)
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while reading data", err)
}
}

sha256Digest = utils.Sha256Checksum(data)
logicalSize = int64(len(data))
reader = bytes.NewReader(data)
sha256Digest = utils.Sha256Checksum(tempData)
logicalSize = int64(len(tempData))
logger.Debug("Calculated sha256Digest and logicalSize")
}
if compressed {
logger.Debug("Setting ContentEncoding to gzip")
options.ContentEncoding = "gzip"
options.ContentType = "application/gzip"
}

refCount, err := db.GetRefCount(bucketName, sha256Digest)
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while getting refCount", err)
}

if refCount == 0 {
logger.Debug("Storing with options ", options)
logger.Debug("Storing with options", options)
minioClient := GetClient()

_, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, reader, size, options)
if headersCalculated {
_, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, &data, size, options)
} else {
_, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, reader, size, options)
}
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while putting object", err)
}
}

logger.Info("Putting refFile")
err = db.SetHashForPath(bucketName, path, sha256Digest)
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while setting hash for path", err)
}

logger.Info("Putting refCount")
err = db.SetRefCount(bucketName, sha256Digest, refCount+1)
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while setting refCount", err)
}

err = db.SetModified(bucketName, path, version)
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while setting modified", err)
}

err = deleteByHash(bucketName, logger, oldFile, false)
if err != nil {
return 0, err
return 0, utils.ErrorWrapper("Error while deleting old file", err)
}

return version, nil
Expand All @@ -98,3 +113,71 @@ func deleteByHash(bucketName string, logger *utils.LoggerObject, path string, lo
logger.Debug("DeleteByHash called on ", path)
return nil
}

func Get(bucketName string, logger *utils.LoggerObject, path string) *GetResult {
logger.Debug("storage.Get called on", bucketName+":"+path)

fileHash, err := db.GetHashForPath(bucketName, path)
if err != nil {
return &GetResult{Err: err}
}
if fileHash == "" {
return &GetResult{Found: false}
}

lastModified, err := db.GetModified(bucketName, path)
if err != nil {
return &GetResult{Err: err}
}

minioClient := GetClient()
info, err := minioClient.StatObject(context.Background(), bucketName, fileHash, minio.StatObjectOptions{})
if err != nil {
minioErr := minio.ToErrorResponse(err)
if minioErr.Code == "NoSuchKey" {
return &GetResult{Found: false}
}
return &GetResult{Err: err}
}
gziped := info.ContentType == "application/gzip"

reader, err := minioClient.GetObject(context.Background(), bucketName, fileHash, minio.GetObjectOptions{})
if err != nil {
minioErr := minio.ToErrorResponse(err)
if minioErr.Code == "NoSuchKey" {
return &GetResult{Found: false}
}
return &GetResult{Err: err}
}

return &GetResult{
Found: true,
File: reader,
Gziped: gziped,
LastModified: lastModified,
LogicalSize: info.Size,
}
}

func GetList(bucketName string, logger *utils.LoggerObject, path string, last_modified int64) ([]string, error) {
logger.Debug("storage.GetList called on", bucketName+":"+path)

prefix := db.GetModifiedName(bucketName, path)
keys, err := db.GetKeys(prefix)
if err != nil {
return nil, err
}

files := make([]string, 0)
for _, key := range keys {
modified, err := db.GetModified(bucketName, key)
if err != nil {
return nil, err
}
if modified > last_modified {
files = append(files, key)
}
}

return files, nil
}
12 changes: 12 additions & 0 deletions storage/structs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package storage

import "io"

type GetResult struct {
Found bool
File io.Reader
Gziped bool
LastModified int64
LogicalSize int64
Err error
}
14 changes: 14 additions & 0 deletions utils/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package utils

type Error struct {
Message string
Err error
}

func (e *Error) Error() string {
return e.Message + ": " + e.Err.Error()
}

func ErrorWrapper(msg string, err error) *Error {
return &Error{msg, err}
}
Loading