Skip to content

Commit

Permalink
Implements route backend timeout
Browse files Browse the repository at this point in the history
* Adds `backendTimeout` filter to configure route backend timeout
* Proxy sets up request context with configured timeout and responds
with 504 status on timeout (note: if response streaming has already
started it will be terminated, client will receive backend status and
truncated response body)

See zalando#1041

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov committed Dec 8, 2020
1 parent c268dfb commit e7a40ea
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 21 deletions.
17 changes: 17 additions & 0 deletions docs/reference/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,23 @@ Example:
* -> repeatContent("I will not waste chalk. ", 1000) -> <shunt>;
```

## backendTimeout

Configure backend timeout. Skipper responds with `504 Gateway Timeout` status if obtaining a connection,
sending the request, and reading the backend response headers and body takes longer than the configured timeout.
However, if response streaming has already started it will be terminated, i.e. client will receive backend response
status and truncated response body.

Parameters:

* timeout [(duration string)](https://godoc.org/time#ParseDuration)

Example:

```
* -> backendTimeout("10ms") -> "https://www.example.org";
```

## latency

Enable adding artificial latency
Expand Down
2 changes: 2 additions & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
InlineContentIfStatusName = "inlineContentIfStatus"
HeaderToQueryName = "headerToQuery"
QueryToHeaderName = "queryToHeader"
BackendTimeoutName = "backendTimeout"
)

// Returns a Registry object initialized with the default set of filter
Expand Down Expand Up @@ -119,6 +120,7 @@ func MakeRegistry() filters.Registry {
NewDecompress(),
NewHeaderToQuery(),
NewQueryToHeader(),
NewBackendTimeout(),
NewSetDynamicBackendHostFromHeader(),
NewSetDynamicBackendSchemeFromHeader(),
NewSetDynamicBackendUrlFromHeader(),
Expand Down
45 changes: 45 additions & 0 deletions filters/builtin/timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package builtin

import (
"time"

"github.com/zalando/skipper/filters"
)

type timeout struct {
timeout time.Duration
}

func NewBackendTimeout() filters.Spec {
return &timeout{}
}

func (*timeout) Name() string { return BackendTimeoutName }

func (*timeout) CreateFilter(args []interface{}) (filters.Filter, error) {
if len(args) != 1 {
return nil, filters.ErrInvalidFilterParameters
}

var tf timeout
switch v := args[0].(type) {
case string:
d, err := time.ParseDuration(v)
if err != nil {
return nil, err
}
tf.timeout = d
case time.Duration:
tf.timeout = v
default:
return nil, filters.ErrInvalidFilterParameters
}
return &tf, nil
}

func (t *timeout) Request(ctx filters.FilterContext) {
// allows overwrite
ctx.StateBag()[filters.BackendTimeout] = t.timeout
}

func (t *timeout) Response(filters.FilterContext) {}
37 changes: 37 additions & 0 deletions filters/builtin/timeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package builtin

import (
"net/http"
"testing"
"time"

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/filtertest"
)

func TestBackendTimeout(t *testing.T) {
bt := NewBackendTimeout()
if bt.Name() != BackendTimeoutName {
t.Error("wrong name")
}

f, err := bt.CreateFilter([]interface{}{"2s"})
if err != nil {
t.Error("wrong id")
}

c := &filtertest.Context{FRequest: &http.Request{}, FStateBag: make(map[string]interface{})}
f.Request(c)

if c.FStateBag[filters.BackendTimeout] != 2*time.Second {
t.Error("wrong timeout")
}

// second filter overwrites
f, _ = bt.CreateFilter([]interface{}{"5s"})
f.Request(c)

if c.FStateBag[filters.BackendTimeout] != 5*time.Second {
t.Error("overwrite expected")
}
}
3 changes: 3 additions & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const (

// BackendIsProxyKey is the key used in the state bag to notify proxy that the backend is also a proxy.
BackendIsProxyKey = "backend:isproxy"

// BackendTimeout is the key used in the state bag to configure backend timeout in proxy
BackendTimeout = "backend:timeout"
)

// Context object providing state and information that is unique to a request.
Expand Down
218 changes: 218 additions & 0 deletions proxy/backendtimeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package proxy

import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"
)

func TestSlowService(t *testing.T) {
wait := make(chan struct{})

service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-wait
}))
defer func() {
close(wait)
service.Close()
}()

doc := fmt.Sprintf(`* -> backendTimeout("1ms") -> "%s"`, service.URL)
tp, err := newTestProxy(doc, FlagsNone)
if err != nil {
t.Fatal(err)
}
defer tp.close()
if testing.Verbose() {
tp.log.Unmute()
}

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := http.Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusGatewayTimeout {
t.Errorf("expected 504, got: %v", rsp)
}
}

func TestFastService(t *testing.T) {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(1 * time.Millisecond)
}))
defer service.Close()

doc := fmt.Sprintf(`* -> backendTimeout("10ms") -> "%s"`, service.URL)
tp, err := newTestProxy(doc, FlagsNone)
if err != nil {
t.Fatal(err)
}
defer tp.close()
if testing.Verbose() {
tp.log.Unmute()
}

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := http.Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusOK {
t.Errorf("expected 200, got: %v", rsp)
}
}

func TestBackendTimeoutInTheMiddleOfServiceResponse(t *testing.T) {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte("Wish You"))

f := w.(http.Flusher)
f.Flush()

time.Sleep(20 * time.Millisecond)

w.Write([]byte(" Were Here"))
}))
defer service.Close()

doc := fmt.Sprintf(`* -> backendTimeout("10ms") -> "%s"`, service.URL)
tp, err := newTestProxy(doc, FlagsNone)
if err != nil {
t.Fatal(err)
}
defer tp.close()
if testing.Verbose() {
tp.log.Unmute()
}

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := http.Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusOK {
t.Errorf("expected 200, got: %v", rsp)
}

body, err := ioutil.ReadAll(rsp.Body)
if err != nil {
t.Error(err)
}

content := string(body)
if content != "Wish You" {
t.Errorf("expected partial content, got %s", content)
}

const msg = "error while copying the response stream: context deadline exceeded"
if err = tp.log.WaitFor(msg, 100*time.Millisecond); err != nil {
t.Errorf("expected '%s' in logs", msg)
}
}

type unstableRoundTripper struct {
inner http.RoundTripper
timeout time.Duration
attempt int
}

// Simulates dial timeout on every odd request
func (r *unstableRoundTripper) RoundTrip(req *http.Request) (rsp *http.Response, err error) {
if r.attempt%2 == 0 {
time.Sleep(r.timeout)
rsp, err = nil, &proxyError{
code: -1, // omit 0 handling in proxy.Error()
dialingFailed: true, // indicate error happened before http
}
} else {
rsp, err = r.inner.RoundTrip(req)
}
r.attempt = r.attempt + 1
return
}

func newUnstable(timeout time.Duration) func(r http.RoundTripper) http.RoundTripper {
return func(r http.RoundTripper) http.RoundTripper {
return &unstableRoundTripper{inner: r, timeout: timeout}
}
}

// Retryable request, dial timeout on first attempt, load balanced backend
// dial timeout (10ms) + service latency (10ms) > backendTimeout("15ms") => Gateway Timeout
func TestRetryAndSlowService(t *testing.T) {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(10 * time.Millisecond)
}))
defer service.Close()

doc := fmt.Sprintf(`* -> backendTimeout("15ms") -> <"%s", "%s">`, service.URL, service.URL)
tp, err := newTestProxyWithParams(doc, Params{
CustomHttpRoundTripperWrap: newUnstable(10 * time.Millisecond),
})
if err != nil {
t.Fatal(err)
}
defer tp.close()
if testing.Verbose() {
tp.log.Unmute()
}

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := http.Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusGatewayTimeout {
t.Errorf("expected 504, got: %v", rsp)
}
}

// Retryable request, dial timeout on first attempt, load balanced backend
// dial timeout (10ms) + service latency (10ms) < backendTimeout("25ms") => OK
func TestRetryAndFastService(t *testing.T) {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(10 * time.Millisecond)
}))
defer service.Close()

doc := fmt.Sprintf(`* -> backendTimeout("25ms") -> <"%s", "%s">`, service.URL, service.URL)
tp, err := newTestProxyWithParams(doc, Params{
CustomHttpRoundTripperWrap: newUnstable(10 * time.Millisecond),
})
if err != nil {
t.Fatal(err)
}
defer tp.close()
if testing.Verbose() {
tp.log.Unmute()
}

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := http.Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusOK {
t.Errorf("expected 200, got: %v", rsp)
}
}
2 changes: 2 additions & 0 deletions proxy/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proxy

import (
"bytes"
stdlibcontext "context"
"errors"
"io"
"io/ioutil"
Expand Down Expand Up @@ -45,6 +46,7 @@ type context struct {
parentSpan opentracing.Span
proxy *Proxy
routeLookup *routing.RouteLookup
cancelBackendContext stdlibcontext.CancelFunc
}

type filterMetrics struct {
Expand Down
Loading

0 comments on commit e7a40ea

Please sign in to comment.