Skip to content

Commit

Permalink
Add pre-terminate hook (#1248)
Browse files Browse the repository at this point in the history
* Add `pre-terminate` hook

* Add test for hooks package

* Add test for handler package

* Document new hook return value
  • Loading branch information
Acconut authored Mar 3, 2025
1 parent b4d5032 commit e33611c
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 6 deletions.
8 changes: 8 additions & 0 deletions docs/_advanced-topics/hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The table below provides an overview of all available hooks.
| post-receive | No | regularly while data is being transmitted. | logging upload progress, stopping running uploads | Yes |
| pre-finish | Yes | after all upload data has been received but before a response is sent. | sending custom data when an upload is finished | No |
| post-finish | No | after all upload data has been received and after a response is sent. | post-processing of upload, logging of upload end | Yes |
| pre-terminate | Yes | before an upload will be terminated. | checking if an upload should be deleted | No |
| post-terminate | No | after an upload has been terminated. | clean up of allocated resources | Yes |

Users should be aware of following things:
Expand Down Expand Up @@ -161,6 +162,13 @@ Below you can find an annotated, JSON-ish encoded example of a hook response:
// to the client.
"RejectUpload": false,

// RejectTermination will cause upload terminations via DELETE requests to be rejected,
// allowing the hook to control whether associated resources are deleted.
// This value is only respected for pre-terminate hooks. For other hooks,
// it is ignored. Use the HTTPResponse field to send details about the rejection
// to the client.
"RejectTermination": false,

// ChangeFileInfo can be set to change selected properties of an upload before
// it has been created.
// Changes are applied on a per-property basis, meaning that specifying just
Expand Down
7 changes: 7 additions & 0 deletions pkg/handler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ type Config struct {
// If the error is non-nil, the error will be forwarded to the client. Furthermore,
// HTTPResponse will be ignored and the error value can contain values for the HTTP response.
PreFinishResponseCallback func(hook HookEvent) (HTTPResponse, error)
// PreUploadTerminateCallback will be invoked on DELETE requests before an upload is terminated,
// giving the application the opportunity to reject the termination. For example, to ensure resources
// used by other services are not deleted.
// If the callback returns no error, optional values from HTTPResponse will be contained in the HTTP response.
// If the error is non-nil, the error will be forwarded to the client. Furthermore,
// HTTPResponse will be ignored and the error value can contain values for the HTTP response.
PreUploadTerminateCallback func(hook HookEvent) (HTTPResponse, error)
// GracefulRequestCompletionTimeout is the timeout for operations to complete after an HTTP
// request has ended (successfully or by error). For example, if an HTTP request is interrupted,
// instead of stopping immediately, the handler and data store will be given some additional
Expand Down
68 changes: 68 additions & 0 deletions pkg/handler/terminate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@ func TestTerminate(t *testing.T) {
composer.UseTerminater(store)
composer.UseLocker(locker)

preTerminateCalled := false
handler, _ := NewHandler(Config{
StoreComposer: composer,
NotifyTerminatedUploads: true,
PreUploadTerminateCallback: func(hook HookEvent) (HTTPResponse, error) {
preTerminateCalled = true
return HTTPResponse{}, nil
},
})

c := make(chan HookEvent, 1)
Expand All @@ -81,6 +86,69 @@ func TestTerminate(t *testing.T) {
req := event.HTTPRequest
a.Equal("DELETE", req.Method)
a.Equal("foo", req.URI)

a.True(preTerminateCalled)
})

SubTest(t, "RejectTermination", func(t *testing.T, store *MockFullDataStore, _ *StoreComposer) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
locker := NewMockFullLocker(ctrl)
lock := NewMockFullLock(ctrl)
upload := NewMockFullUpload(ctrl)

gomock.InOrder(
locker.EXPECT().NewLock("foo").Return(lock, nil),
lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil),
store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil),
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
ID: "foo",
Size: 10,
}, nil),
lock.EXPECT().Unlock().Return(nil),
)

composer := NewStoreComposer()
composer.UseCore(store)
composer.UseTerminater(store)
composer.UseLocker(locker)

a := assert.New(t)

handler, _ := NewHandler(Config{
StoreComposer: composer,
NotifyTerminatedUploads: true,
PreUploadTerminateCallback: func(hook HookEvent) (HTTPResponse, error) {
a.Equal("foo", hook.Upload.ID)
a.Equal(int64(10), hook.Upload.Size)

req := hook.HTTPRequest
a.Equal("DELETE", req.Method)
a.Equal("foo", req.URI)

return HTTPResponse{}, ErrUploadTerminationRejected
},
})

c := make(chan HookEvent, 1)
handler.TerminatedUploads = c

(&httpTest{
Method: "DELETE",
URL: "foo",
ReqHeader: map[string]string{
"Tus-Resumable": "1.0.0",
},
Code: http.StatusBadRequest,
ResBody: "ERR_UPLOAD_TERMINATION_REJECTED: upload termination has been rejected by server\n",
}).Run(handler, t)

select {
case <-c:
a.Fail("Expected termination to be rejected")
default:
// Expected no event
}
})

SubTest(t, "NotProvided", func(t *testing.T, store *MockFullDataStore, _ *StoreComposer) {
Expand Down
20 changes: 16 additions & 4 deletions pkg/handler/unrouted_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
ErrInvalidUploadDeferLength = NewError("ERR_INVALID_UPLOAD_LENGTH_DEFER", "invalid Upload-Defer-Length header", http.StatusBadRequest)
ErrUploadStoppedByServer = NewError("ERR_UPLOAD_STOPPED", "upload has been stopped by server", http.StatusBadRequest)
ErrUploadRejectedByServer = NewError("ERR_UPLOAD_REJECTED", "upload creation has been rejected by server", http.StatusBadRequest)
ErrUploadTerminationRejected = NewError("ERR_UPLOAD_TERMINATION_REJECTED", "upload termination has been rejected by server", http.StatusBadRequest)
ErrUploadInterrupted = NewError("ERR_UPLOAD_INTERRUPTED", "upload has been interrupted by another request for this upload resource", http.StatusBadRequest)
ErrServerShutdown = NewError("ERR_SERVER_SHUTDOWN", "request has been interrupted because the server is shutting down", http.StatusServiceUnavailable)
ErrOriginNotAllowed = NewError("ERR_ORIGIN_NOT_ALLOWED", "request origin is not allowed", http.StatusForbidden)
Expand Down Expand Up @@ -1203,23 +1204,34 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
}

var info FileInfo
if handler.config.NotifyTerminatedUploads {
if handler.config.NotifyTerminatedUploads || handler.config.PreUploadTerminateCallback != nil {
info, err = upload.GetInfo(c)
if err != nil {
handler.sendError(c, err)
return
}
}

resp := HTTPResponse{
StatusCode: http.StatusNoContent,
}

if handler.config.PreUploadTerminateCallback != nil {
resp2, err := handler.config.PreUploadTerminateCallback(newHookEvent(c, info))
if err != nil {
handler.sendError(c, err)
return
}
resp = resp.MergeWith(resp2)
}

err = handler.terminateUpload(c, upload, info)
if err != nil {
handler.sendError(c, err)
return
}

handler.sendResp(c, HTTPResponse{
StatusCode: http.StatusNoContent,
})
handler.sendResp(c, resp)
}

// terminateUpload passes a given upload to the DataStore's Terminater,
Expand Down
37 changes: 36 additions & 1 deletion pkg/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ type HookResponse struct {
// to the client.
RejectUpload bool

// RejectTermination will cause the termination of the upload to be rejected, keeping the upload.
// This value is only respected for pre-terminate hooks. For other hooks,
// it is ignored. Use the HTTPResponse field to send details about the rejection
// to the client.
RejectTermination bool

// ChangeFileInfo can be set to change selected properties of an upload before
// it has been created. See the handler.FileInfoChanges type for more details.
// Changes are applied on a per-property basis, meaning that specifying just
Expand All @@ -91,10 +97,11 @@ const (
HookPostCreate HookType = "post-create"
HookPreCreate HookType = "pre-create"
HookPreFinish HookType = "pre-finish"
HookPreTerminate HookType = "pre-terminate"
)

// AvailableHooks is a slice of all hooks that are implemented by tusd.
var AvailableHooks []HookType = []HookType{HookPreCreate, HookPostCreate, HookPostReceive, HookPostTerminate, HookPostFinish, HookPreFinish}
var AvailableHooks []HookType = []HookType{HookPreCreate, HookPostCreate, HookPostReceive, HookPreTerminate, HookPostTerminate, HookPostFinish, HookPreFinish}

func preCreateCallback(event handler.HookEvent, hookHandler HookHandler) (handler.HTTPResponse, handler.FileInfoChanges, error) {
ok, hookRes, err := invokeHookSync(HookPreCreate, event, hookHandler)
Expand Down Expand Up @@ -128,6 +135,26 @@ func preFinishCallback(event handler.HookEvent, hookHandler HookHandler) (handle
return httpRes, nil
}

func preTerminateCallback(event handler.HookEvent, hookHandler HookHandler) (handler.HTTPResponse, error) {
ok, hookRes, err := invokeHookSync(HookPreTerminate, event, hookHandler)
if !ok || err != nil {
return handler.HTTPResponse{}, err
}

httpRes := hookRes.HTTPResponse

// If the hook response includes the instruction to reject the termination, reuse the error code
// and message from ErrUploadTerminationRejected, but also include custom HTTP response values.
if hookRes.RejectTermination {
err := handler.ErrUploadTerminationRejected
err.HTTPResponse = err.HTTPResponse.MergeWith(httpRes)

return handler.HTTPResponse{}, err
}

return httpRes, nil
}

func postReceiveCallback(event handler.HookEvent, hookHandler HookHandler) {
ok, hookRes, _ := invokeHookSync(HookPostReceive, event, hookHandler)
// invokeHookSync already logs the error, if any occurs. So by checking `ok`, we can ensure
Expand Down Expand Up @@ -166,12 +193,14 @@ func SetupHookMetrics() {
MetricsHookErrorsTotal.WithLabelValues(string(HookPostCreate)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(HookPreCreate)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(HookPreFinish)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(HookPreTerminate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPostFinish)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPostTerminate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPostReceive)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPostCreate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPreCreate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPreFinish)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPreTerminate)).Add(0)
}

func invokeHookAsync(typ HookType, event handler.HookEvent, hookHandler HookHandler) {
Expand Down Expand Up @@ -248,6 +277,12 @@ func NewHandlerWithHooks(config *handler.Config, hookHandler HookHandler, enable
}
}

if slices.Contains(enabledHooks, HookPreTerminate) {
config.PreUploadTerminateCallback = func(event handler.HookEvent) (handler.HTTPResponse, error) {
return preTerminateCallback(event, hookHandler)
}
}

// Create handler
handler, err := handler.NewHandler(*config)
if err != nil {
Expand Down
35 changes: 34 additions & 1 deletion pkg/hooks/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ func TestNewHandlerWithHooks(t *testing.T) {
Type: HookPreFinish,
Event: event,
}).Return(HookResponse{}, error),
hookHandler.EXPECT().InvokeHook(HookRequest{
Type: HookPreTerminate,
Event: event,
}).Return(HookResponse{
HTTPResponse: response,
}, nil),
hookHandler.EXPECT().InvokeHook(HookRequest{
Type: HookPreTerminate,
Event: event,
}).Return(HookResponse{
HTTPResponse: response,
RejectTermination: true,
}, nil),
)

// The hooks are executed asynchronously, so we don't know their execution order.
Expand All @@ -112,7 +125,7 @@ func TestNewHandlerWithHooks(t *testing.T) {
Event: event,
})

uploadHandler, err := NewHandlerWithHooks(&config, hookHandler, []HookType{HookPreCreate, HookPostCreate, HookPostReceive, HookPostTerminate, HookPostFinish, HookPreFinish})
uploadHandler, err := NewHandlerWithHooks(&config, hookHandler, []HookType{HookPreCreate, HookPostCreate, HookPostReceive, HookPostTerminate, HookPostFinish, HookPreFinish, HookPreTerminate})
a.NoError(err)

// Successful pre-create hook
Expand Down Expand Up @@ -148,6 +161,26 @@ func TestNewHandlerWithHooks(t *testing.T) {
a.Equal(error, err)
a.Equal(handler.HTTPResponse{}, resp_got)

// Successful pre-terminate hook
resp_got, err = config.PreUploadTerminateCallback(event)
a.NoError(err)
a.Equal(response, resp_got)

// Pre-terminate hook with rejection
resp_got, err = config.PreUploadTerminateCallback(event)
a.Equal(handler.Error{
ErrorCode: handler.ErrUploadTerminationRejected.ErrorCode,
Message: handler.ErrUploadTerminationRejected.Message,
HTTPResponse: handler.HTTPResponse{
StatusCode: 200,
Body: "foobar",
Header: handler.HTTPHeader{
"X-Hello": "here",
"Content-Type": "text/plain; charset=utf-8",
},
},
}, err)

// Successful post-* hooks
uploadHandler.CreatedUploads <- event
uploadHandler.UploadProgress <- event
Expand Down

0 comments on commit e33611c

Please sign in to comment.