Skip to content

Commit

Permalink
Implements route backend timeout
Browse files Browse the repository at this point in the history
* Adds `backendTimeout` filter to configure route backend timeout
* Proxy sets up request context with configured timeout and responds
with 504 status on timeout (note: if response streaming has already
started it will be terminated, client will receive backend status and
truncated response body).

See zalando#1041

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov committed Oct 23, 2020
1 parent 8f32650 commit be61996
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 19 deletions.
17 changes: 17 additions & 0 deletions docs/reference/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,23 @@ Example:
* -> randomContent(42) -> <shunt>;
```

## backendTimeout

Configure backend timeout. Skipper responds with `504 Gateway Timeout` status if obtaining a connection,
sending the request, and reading the backend response headers and body takes longer than the configured timeout.
However, if response streaming has already started it will be terminated, i.e. client will receive backend response
status and truncated response body.

Parameters:

* timeout [(duration string)](https://godoc.org/time#ParseDuration)

Example:

```
* -> backendTimeout("10ms") -> "https://www.example.org";
```

## latency

Enable adding artificial latency
Expand Down
2 changes: 2 additions & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const (
InlineContentIfStatusName = "inlineContentIfStatus"
HeaderToQueryName = "headerToQuery"
QueryToHeaderName = "queryToHeader"
BackendTimeoutName = "backendTimeout"
)

// Returns a Registry object initialized with the default set of filter
Expand Down Expand Up @@ -120,6 +121,7 @@ func MakeRegistry() filters.Registry {
NewDecompress(),
NewHeaderToQuery(),
NewQueryToHeader(),
NewBackendTimeout(),
NewSetDynamicBackendHostFromHeader(),
NewSetDynamicBackendSchemeFromHeader(),
NewSetDynamicBackendUrlFromHeader(),
Expand Down
47 changes: 47 additions & 0 deletions filters/builtin/timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package builtin

import (
"time"

"github.com/zalando/skipper/filters"
)

type timeout struct {
timeout time.Duration
}

func NewBackendTimeout() filters.Spec {
return &timeout{}
}

func (*timeout) Name() string { return BackendTimeoutName }

func (*timeout) CreateFilter(args []interface{}) (filters.Filter, error) {
if len(args) != 1 {
return nil, filters.ErrInvalidFilterParameters
}

var tf timeout
switch v := args[0].(type) {
case string:
d, err := time.ParseDuration(v)
if err != nil {
return nil, err
}
tf.timeout = d
case time.Duration:
tf.timeout = v
default:
return nil, filters.ErrInvalidFilterParameters
}
return &tf, nil
}

func (t *timeout) Request(ctx filters.FilterContext) {
sb := ctx.StateBag()
if _, ok := sb[filters.BackendTimeout]; !ok { // once
sb[filters.BackendTimeout] = t.timeout
}
}

func (t *timeout) Response(filters.FilterContext) {}
37 changes: 37 additions & 0 deletions filters/builtin/timeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package builtin

import (
"net/http"
"testing"
"time"

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/filtertest"
)

func TestBackendTimeout(t *testing.T) {
bt := NewBackendTimeout()
if bt.Name() != BackendTimeoutName {
t.Error("wrong name")
}

f, err := bt.CreateFilter([]interface{}{"2s"})
if err != nil {
t.Error("wrong id")
}

c := &filtertest.Context{FRequest: &http.Request{}, FStateBag: make(map[string]interface{})}
f.Request(c)

if c.FStateBag[filters.BackendTimeout] != 2*time.Second {
t.Error("wrong timeout")
}

// second filter
f, err = bt.CreateFilter([]interface{}{"5s"})
f.Request(c)

if c.FStateBag[filters.BackendTimeout] != 2*time.Second {
t.Error("no change expected")
}
}
3 changes: 3 additions & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const (

// BackendIsProxyKey is the key used in the state bag to notify proxy that the backend is also a proxy.
BackendIsProxyKey = "backend:isproxy"

// BackendTimeout is the key used in the state bag to configure backend timeout in proxy
BackendTimeout = "backend:timeout"
)

// Context object providing state and information that is unique to a request.
Expand Down
111 changes: 111 additions & 0 deletions proxy/backendtimeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package proxy

import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"
)

func TestBackendTimeoutBelow(t *testing.T) {
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Millisecond)
}))
defer backend.Close()

doc := fmt.Sprintf(`* -> backendTimeout("1ms") -> "%s"`, backend.URL)
tp, err := newTestProxy(doc, FlagsNone)
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := http.Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusGatewayTimeout {
t.Errorf("expected 504, got: %v", rsp)
}
}

func TestBackendTimeoutAbove(t *testing.T) {
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Millisecond)
}))
defer backend.Close()

doc := fmt.Sprintf(`* -> backendTimeout("10ms") -> "%s"`, backend.URL)
tp, err := newTestProxy(doc, FlagsNone)
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := http.Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusOK {
t.Errorf("expected 200, got: %v", rsp)
}
}

func TestBackendTimeoutInTheMiddleOfResponse(t *testing.T) {
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte("Wish You"))

f := w.(http.Flusher)
f.Flush()

time.Sleep(20 * time.Millisecond)

w.Write([]byte(" Were Here"))
}))
defer backend.Close()

doc := fmt.Sprintf(`* -> backendTimeout("10ms") -> "%s"`, backend.URL)
tp, err := newTestProxy(doc, FlagsNone)
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := http.Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusOK {
t.Errorf("expected 200, got: %v", rsp)
}

body, err := ioutil.ReadAll(rsp.Body)
if err != nil {
t.Error(err)
}

content := string(body)
if content != "Wish You" {
t.Errorf("expected partial content, got %s", content)
}

const msg = "error while copying the response stream: context deadline exceeded"
if err = tp.log.WaitFor(msg, 10*time.Millisecond); err != nil {
t.Errorf("expected '%s' in logs", msg)
}
}
2 changes: 2 additions & 0 deletions proxy/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proxy

import (
"bytes"
stdlibcontext "context"
"errors"
"io"
"io/ioutil"
Expand Down Expand Up @@ -45,6 +46,7 @@ type context struct {
parentSpan opentracing.Span
proxy *Proxy
routeLookup *routing.RouteLookup
cancelBackendContext stdlibcontext.CancelFunc
}

type filterMetrics struct {
Expand Down
43 changes: 24 additions & 19 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,13 @@ func setRequestURLForLoadBalancedBackend(u *url.URL, rt *routing.Route, lbctx *r

// creates an outgoing http request to be forwarded to the route endpoint
// based on the augmented incoming request
func mapRequest(r *http.Request, rt *routing.Route, host string, removeHopHeaders bool, stateBag map[string]interface{}) (*http.Request, error) {
func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHeaders bool) (*http.Request, error) {
r := ctx.request
rt := ctx.route
host := ctx.outgoingHost
stateBag := ctx.StateBag()
u := r.URL

switch rt.BackendType {
case eskip.DynamicBackend:
setRequestURLFromRequest(u, r)
Expand All @@ -498,13 +503,11 @@ func mapRequest(r *http.Request, rt *routing.Route, host string, removeHopHeader
body = nil
}

rr, err := http.NewRequest(r.Method, u.String(), body)
rr, err := http.NewRequestWithContext(requestContext, r.Method, u.String(), body)
if err != nil {
return nil, err
}

rr = rr.WithContext(r.Context())

rr.ContentLength = r.ContentLength
if removeHopHeaders {
rr.Header = cloneHeaderExcluding(r.Header, hopHeaders)
Expand Down Expand Up @@ -876,9 +879,9 @@ func (p *Proxy) makeUpgradeRequest(ctx *context, req *http.Request) error {
return nil
}

func (p *Proxy) makeBackendRequest(ctx *context) (*http.Response, *proxyError) {
func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Context) (*http.Response, *proxyError) {
var err error
req, err := mapRequest(ctx.request, ctx.route, ctx.outgoingHost, p.flags.HopHeadersRemoval(), ctx.StateBag())
req, err := mapRequest(ctx, requestContext, p.flags.HopHeadersRemoval())
if err != nil {
p.log.Errorf("could not map backend request, caused by: %v", err)
return nil, &proxyError{err: err}
Expand Down Expand Up @@ -1138,7 +1141,7 @@ func (p *Proxy) do(ctx *context) error {
ctx.setResponse(loopCTX.response, p.flags.PreserveOriginal())
ctx.proxySpan = loopCTX.proxySpan
} else if p.flags.Debug() {
debugReq, err := mapRequest(ctx.request, ctx.route, ctx.outgoingHost, p.flags.HopHeadersRemoval(), ctx.StateBag())
debugReq, err := mapRequest(ctx, ctx.request.Context(), p.flags.HopHeadersRemoval())
if err != nil {
return &proxyError{err: err}
}
Expand All @@ -1153,8 +1156,13 @@ func (p *Proxy) do(ctx *context) error {
return errCircuitBreakerOpen
}

backendContext := ctx.request.Context()
if timeout, ok := ctx.StateBag()[filters.BackendTimeout]; ok {
backendContext, ctx.cancelBackendContext = stdlibcontext.WithTimeout(backendContext, timeout.(time.Duration))
}

backendStart := time.Now()
rsp, perr := p.makeBackendRequest(ctx)
rsp, perr := p.makeBackendRequest(ctx, backendContext)
if perr != nil {
if done != nil {
done(false)
Expand All @@ -1172,7 +1180,7 @@ func (p *Proxy) do(ctx *context) error {

perr = nil
var perr2 *proxyError
rsp, perr2 = p.makeBackendRequest(ctx)
rsp, perr2 = p.makeBackendRequest(ctx, backendContext)
if perr2 != nil {
p.log.Errorf("Failed to do retry backend request: %v", perr2)
if perr2.code >= http.StatusInternalServerError {
Expand Down Expand Up @@ -1245,6 +1253,7 @@ func (p *Proxy) serveResponse(ctx *context) {
} else {
p.metrics.MeasureResponse(ctx.response.StatusCode, ctx.request.Method, ctx.route.Id, start)
}
p.metrics.MeasureServe(ctx.route.Id, ctx.metricsHost(), ctx.request.Method, ctx.response.StatusCode, ctx.startServe)
}

func (p *Proxy) errorResponse(ctx *context, err error) {
Expand Down Expand Up @@ -1277,6 +1286,7 @@ func (p *Proxy) errorResponse(ctx *context, err error) {
}

if span := ot.SpanFromContext(ctx.Request().Context()); span != nil {
p.tracing.setTag(span, ErrorTag, true)
p.tracing.setTag(span, HTTPStatusCodeTag, uint16(code))
}

Expand Down Expand Up @@ -1483,19 +1493,14 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = p.do(ctx)

if err != nil {
p.tracing.setTag(span, ErrorTag, true)
p.errorResponse(ctx, err)
return
} else {
p.serveResponse(ctx)
}

p.serveResponse(ctx)
p.metrics.MeasureServe(
ctx.route.Id,
ctx.metricsHost(),
r.Method,
ctx.response.StatusCode,
ctx.startServe,
)
if ctx.cancelBackendContext != nil {
ctx.cancelBackendContext()
}
}

// Close causes the proxy to stop closing idle
Expand Down

0 comments on commit be61996

Please sign in to comment.