Skip to content

Commit

Permalink
handler, s3store, filestore: Serve GET requests from storage provider (
Browse files Browse the repository at this point in the history
…#1228)

* feat(handler, s3store): implement ContentServerDataStore for direct content serving, closes #1064

- Add ServerDataStore interface
- Implement ContentServerDataStore in S3Store with streaming support
- Add Range header support for partial content requests
- Update StoreComposer to support ContentServer capability
- Add tests for new ContentServerDataStore functionality
- Update Go version to 1.22.1

* Add documentation

* Set Content-Type and Content-Disposition in handler

* Handle range request, conditional requests and errors better

* Move implementation and test into own files

* Use `store.GetUpload` to simulate more realistic usage

* Return proper error for incomplete uploads

* Rename variable

* Remove debug logging

* Log outgoing status code

* Implement content server for filestore

* Undo changes in `go.mod`

---------

Co-authored-by: Derrick Hammer <[email protected]>
  • Loading branch information
Acconut and pcfreak30 authored Dec 20, 2024
1 parent 9e5ba70 commit e20b174
Show file tree
Hide file tree
Showing 8 changed files with 513 additions and 1 deletion.
12 changes: 12 additions & 0 deletions pkg/filestore/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"

Expand Down Expand Up @@ -50,6 +51,7 @@ func (store FileStore) UseIn(composer *handler.StoreComposer) {
composer.UseTerminater(store)
composer.UseConcater(store)
composer.UseLengthDeferrer(store)
composer.UseContentServer(store)
}

func (store FileStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
Expand Down Expand Up @@ -156,6 +158,10 @@ func (store FileStore) AsConcatableUpload(upload handler.Upload) handler.Concata
return upload.(*fileUpload)
}

func (store FileStore) AsServableUpload(upload handler.Upload) handler.ServableUpload {
return upload.(*fileUpload)
}

// defaultBinPath returns the path to the file storing the binary data, if it is
// not customized using the pre-create hook.
func (store FileStore) defaultBinPath(id string) string {
Expand Down Expand Up @@ -268,6 +274,12 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
return nil
}

func (upload *fileUpload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
http.ServeFile(w, r, upload.binPath)

return nil
}

// createFile creates the file with the content. If the corresponding directory does not exist,
// it is created. If the file already exists, its content is removed.
func createFile(path string, content []byte) error {
Expand Down
17 changes: 17 additions & 0 deletions pkg/filestore/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package filestore
import (
"context"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -68,6 +70,21 @@ func TestFilestore(t *testing.T) {
a.Equal("hello world", string(content))
reader.(io.Closer).Close()

// Serve content
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/", nil)
r.Header.Set("Range", "bytes=0-4")

err = store.AsServableUpload(upload).ServeContent(context.Background(), w, r)
a.Nil(err)

a.Equal(http.StatusPartialContent, w.Code)
a.Equal("5", w.Header().Get("Content-Length"))
a.Equal("text/plain; charset=utf-8", w.Header().Get("Content-Type"))
a.Equal("bytes 0-4/11", w.Header().Get("Content-Range"))
a.NotEqual("", w.Header().Get("Last-Modified"))
a.Equal("hello", w.Body.String())

// Terminate upload
a.NoError(store.AsTerminatableUpload(upload).Terminate(ctx))

Expand Down
7 changes: 7 additions & 0 deletions pkg/handler/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type StoreComposer struct {
Concater ConcaterDataStore
UsesLengthDeferrer bool
LengthDeferrer LengthDeferrerDataStore
ContentServer ContentServerDataStore
UsesContentServer bool
}

// NewStoreComposer creates a new and empty store composer.
Expand Down Expand Up @@ -85,3 +87,8 @@ func (store *StoreComposer) UseLengthDeferrer(ext LengthDeferrerDataStore) {
store.UsesLengthDeferrer = ext != nil
store.LengthDeferrer = ext
}

func (store *StoreComposer) UseContentServer(ext ContentServerDataStore) {
store.UsesContentServer = ext != nil
store.ContentServer = ext
}
19 changes: 19 additions & 0 deletions pkg/handler/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"context"
"io"
"net/http"
)

type MetaData map[string]string
Expand Down Expand Up @@ -191,3 +192,21 @@ type Lock interface {
// Unlock releases an existing lock for the given upload.
Unlock() error
}

type ServableUpload interface {
// ServeContent serves the uploaded data as specified by the GET request.
// It allows data stores to delegate the handling of range requests and conditional
// requests to their underlying providers.
// The tusd handler will set the Content-Type and Content-Disposition headers
// before calling ServeContent, but the implementation can override them.
// After calling ServeContent, the handler will not take any further action
// other than handling a potential error.
ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error
}

// ContentServerDataStore is the interface for DataStores that can serve content directly.
// When the handler serves a GET request, it will pass the request to ServeContent
// and delegate its handling to the DataStore, instead of using GetReader to obtain the content.
type ContentServerDataStore interface {
AsServableUpload(upload Upload) ServableUpload
}
48 changes: 48 additions & 0 deletions pkg/handler/unrouted_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,7 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
return
}

// Fall back to the existing GetReader implementation if ContentServerDataStore is not implemented
contentType, contentDisposition := filterContentType(info)
resp := HTTPResponse{
StatusCode: http.StatusOK,
Expand All @@ -1058,13 +1059,43 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
Body: "", // Body is intentionally left empty, and we copy it manually in later.
}

// If the data store implements ContentServerDataStore, use delegate the handling
// of GET requests to the data store.
// Otherwise, we will use the existing GetReader implementation.
if handler.composer.UsesContentServer {
servableUpload := handler.composer.ContentServer.AsServableUpload(upload)

// Pass file type and name to the implementation, but it may override them.
w.Header().Set("Content-Type", resp.Header["Content-Type"])
w.Header().Set("Content-Disposition", resp.Header["Content-Disposition"])

// Use loggingResponseWriter to get the ResponseOutgoing log entry that
// normally handler.sendResp would produce.
loggingW := &loggingResponseWriter{ResponseWriter: w, logger: c.log}

err = servableUpload.ServeContent(c, loggingW, r)
if err != nil {
handler.sendError(c, err)
}
return
}

// If no data has been uploaded yet, respond with an empty "204 No Content" status.
if info.Offset == 0 {
resp.StatusCode = http.StatusNoContent
handler.sendResp(c, resp)
return
}

if handler.composer.UsesContentServer {
servableUpload := handler.composer.ContentServer.AsServableUpload(upload)
err = servableUpload.ServeContent(c, w, r)
if err != nil {
handler.sendError(c, err)
}
return
}

src, err := upload.GetReader(c)
if err != nil {
handler.sendError(c, err)
Expand Down Expand Up @@ -1679,3 +1710,20 @@ func validateUploadId(newId string) error {

return nil
}

// loggingResponseWriter is a wrapper around http.ResponseWriter that logs the
// final status code similar to UnroutedHandler.sendResp.
type loggingResponseWriter struct {
http.ResponseWriter
logger *slog.Logger
}

func (w *loggingResponseWriter) WriteHeader(statusCode int) {
if statusCode >= 200 {
w.logger.Info("ResponseOutgoing", "status", statusCode)
}
w.ResponseWriter.WriteHeader(statusCode)
}

// Unwrap provides access to the underlying http.ResponseWriter.
func (w *loggingResponseWriter) Unwrap() http.ResponseWriter { return w.ResponseWriter }
6 changes: 5 additions & 1 deletion pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ import (
// considered valid into a header value according to RFC2616.
var nonPrintableRegexp = regexp.MustCompile(`[^\x09\x20-\x7E]`)

// errIncompleteUpload is used when a client attempts to download an incomplete upload
var errIncompleteUpload = handler.NewError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest)

// See the handler.DataStore interface for documentation about the different
// methods.
type S3Store struct {
Expand Down Expand Up @@ -262,6 +265,7 @@ func (store S3Store) UseIn(composer *handler.StoreComposer) {
composer.UseTerminater(store)
composer.UseConcater(store)
composer.UseLengthDeferrer(store)
composer.UseContentServer(store)
}

func (store S3Store) RegisterMetrics(registry prometheus.Registerer) {
Expand Down Expand Up @@ -758,7 +762,7 @@ func (upload s3Upload) GetReader(ctx context.Context) (io.ReadCloser, error) {
})
if err == nil {
// The multipart upload still exists, which means we cannot download it yet
return nil, handler.NewError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest)
return nil, errIncompleteUpload
}

// The AWS Go SDK v2 has a bug where types.NoSuchUpload is not returned,
Expand Down
141 changes: 141 additions & 0 deletions pkg/s3store/serve_content.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package s3store

import (
"context"
"errors"
"io"
"net/http"
"strconv"

"github.com/tus/tusd/v2/pkg/handler"

"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

func (store S3Store) AsServableUpload(upload handler.Upload) handler.ServableUpload {
return upload.(*s3Upload)
}

func (upload *s3Upload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
input := &s3.GetObjectInput{
Bucket: aws.String(upload.store.Bucket),
Key: upload.store.keyWithPrefix(upload.objectId),
}

// Forward the Range, If-Match, If-None-Match, If-Unmodified-Since, If-Modified-Since headers if present
if val := r.Header.Get("Range"); val != "" {
input.Range = aws.String(val)
}
if val := r.Header.Get("If-Match"); val != "" {
input.IfMatch = aws.String(val)
}
if val := r.Header.Get("If-None-Match"); val != "" {
input.IfNoneMatch = aws.String(val)
}
if val := r.Header.Get("If-Modified-Since"); val != "" {
t, err := http.ParseTime(val)
if err == nil {
input.IfModifiedSince = aws.Time(t)
}
}
if val := r.Header.Get("If-Unmodified-Since"); val != "" {
t, err := http.ParseTime(val)
if err == nil {
input.IfUnmodifiedSince = aws.Time(t)
}
}

// Let S3 handle the request
result, err := upload.store.Service.GetObject(ctx, input)
if err != nil {
// Delete the headers set by tusd's handler. We don't need them for errors.
w.Header().Del("Content-Type")
w.Header().Del("Content-Disposition")

var respErr *awshttp.ResponseError
if errors.As(err, &respErr) {
if respErr.HTTPStatusCode() == http.StatusNotFound || respErr.HTTPStatusCode() == http.StatusForbidden {
// If the object cannot be found, it means that the upload is not yet complete and we cannot serve it.
// At this stage it is not possible that the upload itself does not exist, because the handler
// alredy checked this case. Therefore, we can safely assume that the upload is still in progress.
return errIncompleteUpload
}

if respErr.HTTPStatusCode() == http.StatusNotModified {
// Content-Location, Date, ETag, Vary, Cache-Control and Expires should be set
// for 304 Not Modified responses. See https://httpwg.org/specs/rfc9110.html#status.304
for _, header := range []string{"Content-Location", "Date", "ETag", "Vary", "Cache-Control", "Expires"} {
if val := respErr.Response.Header.Get(header); val != "" {
w.Header().Set(header, val)
}
}

w.WriteHeader(http.StatusNotModified)
return nil
}

if respErr.HTTPStatusCode() == http.StatusRequestedRangeNotSatisfiable {
// Content-Range should be set for 416 Request Range Not Satisfiable responses.
// See https://httpwg.org/specs/rfc9110.html#status.304
// Note: AWS S3 does not seem to include this header in its response.
if val := respErr.Response.Header.Get("Content-Range"); val != "" {
w.Header().Set("Content-Range", val)
}

w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return nil
}
}
return err
}
defer result.Body.Close()

// Add Accept-Ranges,Content-*, Cache-Control, ETag, Expires, Last-Modified headers if present in S3 response
if result.AcceptRanges != nil {
w.Header().Set("Accept-Ranges", *result.AcceptRanges)
}
if result.ContentDisposition != nil {
w.Header().Set("Content-Disposition", *result.ContentDisposition)
}
if result.ContentEncoding != nil {
w.Header().Set("Content-Encoding", *result.ContentEncoding)
}
if result.ContentLanguage != nil {
w.Header().Set("Content-Language", *result.ContentLanguage)
}
if result.ContentLength != nil {
w.Header().Set("Content-Length", strconv.FormatInt(*result.ContentLength, 10))
}
if result.ContentRange != nil {
w.Header().Set("Content-Range", *result.ContentRange)
}
if result.ContentType != nil {
w.Header().Set("Content-Type", *result.ContentType)
}
if result.CacheControl != nil {
w.Header().Set("Cache-Control", *result.CacheControl)
}
if result.ETag != nil {
w.Header().Set("ETag", *result.ETag)
}
if result.ExpiresString != nil {
w.Header().Set("Expires", *result.ExpiresString)
}
if result.LastModified != nil {
w.Header().Set("Last-Modified", result.LastModified.Format(http.TimeFormat))
}

statusCode := http.StatusOK
if result.ContentRange != nil {
// Use 206 Partial Content for range requests
statusCode = http.StatusPartialContent
} else if result.ContentLength != nil && *result.ContentLength == 0 {
statusCode = http.StatusNoContent
}
w.WriteHeader(statusCode)

_, err = io.Copy(w, result.Body)
return err
}
Loading

0 comments on commit e20b174

Please sign in to comment.