diff --git a/docs/reference/filters.md b/docs/reference/filters.md index cb8b084ac5..70710dda0f 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -466,6 +466,23 @@ Example: * -> randomContent(42) -> ; ``` +## backendTimeout + +Configure backend timeout. Skipper responds with `504 Gateway Timeout` status if obtaining a connection, +sending the request, and reading the backend response headers and body takes longer than the configured timeout. +However, if response streaming has already started it will be terminated, i.e. client will receive backend response +status and truncated response body. + +Parameters: + +* timeout [(duration string)](https://godoc.org/time#ParseDuration) + +Example: + +``` +* -> backendTimeout("10ms") -> "https://www.example.org"; +``` + ## latency Enable adding artificial latency diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go index be29bc438a..a2083b9ada 100644 --- a/filters/builtin/builtin.go +++ b/filters/builtin/builtin.go @@ -72,6 +72,7 @@ const ( InlineContentIfStatusName = "inlineContentIfStatus" HeaderToQueryName = "headerToQuery" QueryToHeaderName = "queryToHeader" + BackendTimeoutName = "backendTimeout" ) // Returns a Registry object initialized with the default set of filter @@ -120,6 +121,7 @@ func MakeRegistry() filters.Registry { NewDecompress(), NewHeaderToQuery(), NewQueryToHeader(), + NewBackendTimeout(), NewSetDynamicBackendHostFromHeader(), NewSetDynamicBackendSchemeFromHeader(), NewSetDynamicBackendUrlFromHeader(), diff --git a/filters/builtin/timeout.go b/filters/builtin/timeout.go new file mode 100644 index 0000000000..63e56b0ec6 --- /dev/null +++ b/filters/builtin/timeout.go @@ -0,0 +1,47 @@ +package builtin + +import ( + "time" + + "github.com/zalando/skipper/filters" +) + +type timeout struct { + timeout time.Duration +} + +func NewBackendTimeout() filters.Spec { + return &timeout{} +} + +func (*timeout) Name() string { return BackendTimeoutName } + +func (*timeout) CreateFilter(args []interface{}) (filters.Filter, error) { + if len(args) != 1 { + return nil, filters.ErrInvalidFilterParameters + } + + var tf timeout + switch v := args[0].(type) { + case string: + d, err := time.ParseDuration(v) + if err != nil { + return nil, err + } + tf.timeout = d + case time.Duration: + tf.timeout = v + default: + return nil, filters.ErrInvalidFilterParameters + } + return &tf, nil +} + +func (t *timeout) Request(ctx filters.FilterContext) { + sb := ctx.StateBag() + if _, ok := sb[filters.BackendTimeout]; !ok { // once + sb[filters.BackendTimeout] = t.timeout + } +} + +func (t *timeout) Response(filters.FilterContext) {} diff --git a/filters/builtin/timeout_test.go b/filters/builtin/timeout_test.go new file mode 100644 index 0000000000..e827bbbf64 --- /dev/null +++ b/filters/builtin/timeout_test.go @@ -0,0 +1,37 @@ +package builtin + +import ( + "net/http" + "testing" + "time" + + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/filters/filtertest" +) + +func TestBackendTimeout(t *testing.T) { + bt := NewBackendTimeout() + if bt.Name() != BackendTimeoutName { + t.Error("wrong name") + } + + f, err := bt.CreateFilter([]interface{}{"2s"}) + if err != nil { + t.Error("wrong id") + } + + c := &filtertest.Context{FRequest: &http.Request{}, FStateBag: make(map[string]interface{})} + f.Request(c) + + if c.FStateBag[filters.BackendTimeout] != 2*time.Second { + t.Error("wrong timeout") + } + + // second filter + f, err = bt.CreateFilter([]interface{}{"5s"}) + f.Request(c) + + if c.FStateBag[filters.BackendTimeout] != 2*time.Second { + t.Error("no change expected") + } +} diff --git a/filters/filters.go b/filters/filters.go index f45ba1247b..087702745e 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -20,6 +20,9 @@ const ( // BackendIsProxyKey is the key used in the state bag to notify proxy that the backend is also a proxy. BackendIsProxyKey = "backend:isproxy" + + // BackendTimeout is the key used in the state bag to configure backend timeout in proxy + BackendTimeout = "backend:timeout" ) // Context object providing state and information that is unique to a request. diff --git a/proxy/backendtimeout_test.go b/proxy/backendtimeout_test.go new file mode 100644 index 0000000000..59f5af46e2 --- /dev/null +++ b/proxy/backendtimeout_test.go @@ -0,0 +1,111 @@ +package proxy + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestBackendTimeoutBelow(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(2 * time.Millisecond) + })) + defer backend.Close() + + doc := fmt.Sprintf(`* -> backendTimeout("1ms") -> "%s"`, backend.URL) + tp, err := newTestProxy(doc, FlagsNone) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + rsp, err := http.Get(ps.URL) + if err != nil { + t.Fatal(err) + } + + if rsp.StatusCode != http.StatusGatewayTimeout { + t.Errorf("expected 504, got: %v", rsp) + } +} + +func TestBackendTimeoutAbove(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(2 * time.Millisecond) + })) + defer backend.Close() + + doc := fmt.Sprintf(`* -> backendTimeout("10ms") -> "%s"`, backend.URL) + tp, err := newTestProxy(doc, FlagsNone) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + rsp, err := http.Get(ps.URL) + if err != nil { + t.Fatal(err) + } + + if rsp.StatusCode != http.StatusOK { + t.Errorf("expected 200, got: %v", rsp) + } +} + +func TestBackendTimeoutInTheMiddleOfResponse(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Write([]byte("Wish You")) + + f := w.(http.Flusher) + f.Flush() + + time.Sleep(20 * time.Millisecond) + + w.Write([]byte(" Were Here")) + })) + defer backend.Close() + + doc := fmt.Sprintf(`* -> backendTimeout("10ms") -> "%s"`, backend.URL) + tp, err := newTestProxy(doc, FlagsNone) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + rsp, err := http.Get(ps.URL) + if err != nil { + t.Fatal(err) + } + + if rsp.StatusCode != http.StatusOK { + t.Errorf("expected 200, got: %v", rsp) + } + + body, err := ioutil.ReadAll(rsp.Body) + if err != nil { + t.Error(err) + } + + content := string(body) + if content != "Wish You" { + t.Errorf("expected partial content, got %s", content) + } + + const msg = "error while copying the response stream: context deadline exceeded" + if err = tp.log.WaitFor(msg, 10*time.Millisecond); err != nil { + t.Errorf("expected '%s' in logs", msg) + } +} diff --git a/proxy/context.go b/proxy/context.go index 75eb9ec1c7..4cfe81b439 100644 --- a/proxy/context.go +++ b/proxy/context.go @@ -2,6 +2,7 @@ package proxy import ( "bytes" + stdlibcontext "context" "errors" "io" "io/ioutil" @@ -45,6 +46,7 @@ type context struct { parentSpan opentracing.Span proxy *Proxy routeLookup *routing.RouteLookup + cancelBackendContext stdlibcontext.CancelFunc } type filterMetrics struct { diff --git a/proxy/proxy.go b/proxy/proxy.go index 38cd94ac42..de5ca7bfd5 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -480,8 +480,13 @@ func setRequestURLForLoadBalancedBackend(u *url.URL, rt *routing.Route, lbctx *r // creates an outgoing http request to be forwarded to the route endpoint // based on the augmented incoming request -func mapRequest(r *http.Request, rt *routing.Route, host string, removeHopHeaders bool, stateBag map[string]interface{}) (*http.Request, error) { +func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHeaders bool) (*http.Request, error) { + r := ctx.request + rt := ctx.route + host := ctx.outgoingHost + stateBag := ctx.StateBag() u := r.URL + switch rt.BackendType { case eskip.DynamicBackend: setRequestURLFromRequest(u, r) @@ -498,13 +503,11 @@ func mapRequest(r *http.Request, rt *routing.Route, host string, removeHopHeader body = nil } - rr, err := http.NewRequest(r.Method, u.String(), body) + rr, err := http.NewRequestWithContext(requestContext, r.Method, u.String(), body) if err != nil { return nil, err } - rr = rr.WithContext(r.Context()) - rr.ContentLength = r.ContentLength if removeHopHeaders { rr.Header = cloneHeaderExcluding(r.Header, hopHeaders) @@ -876,9 +879,9 @@ func (p *Proxy) makeUpgradeRequest(ctx *context, req *http.Request) error { return nil } -func (p *Proxy) makeBackendRequest(ctx *context) (*http.Response, *proxyError) { +func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Context) (*http.Response, *proxyError) { var err error - req, err := mapRequest(ctx.request, ctx.route, ctx.outgoingHost, p.flags.HopHeadersRemoval(), ctx.StateBag()) + req, err := mapRequest(ctx, requestContext, p.flags.HopHeadersRemoval()) if err != nil { p.log.Errorf("could not map backend request, caused by: %v", err) return nil, &proxyError{err: err} @@ -1138,7 +1141,7 @@ func (p *Proxy) do(ctx *context) error { ctx.setResponse(loopCTX.response, p.flags.PreserveOriginal()) ctx.proxySpan = loopCTX.proxySpan } else if p.flags.Debug() { - debugReq, err := mapRequest(ctx.request, ctx.route, ctx.outgoingHost, p.flags.HopHeadersRemoval(), ctx.StateBag()) + debugReq, err := mapRequest(ctx, ctx.request.Context(), p.flags.HopHeadersRemoval()) if err != nil { return &proxyError{err: err} } @@ -1153,8 +1156,13 @@ func (p *Proxy) do(ctx *context) error { return errCircuitBreakerOpen } + backendContext := ctx.request.Context() + if timeout, ok := ctx.StateBag()[filters.BackendTimeout]; ok { + backendContext, ctx.cancelBackendContext = stdlibcontext.WithTimeout(backendContext, timeout.(time.Duration)) + } + backendStart := time.Now() - rsp, perr := p.makeBackendRequest(ctx) + rsp, perr := p.makeBackendRequest(ctx, backendContext) if perr != nil { if done != nil { done(false) @@ -1172,7 +1180,7 @@ func (p *Proxy) do(ctx *context) error { perr = nil var perr2 *proxyError - rsp, perr2 = p.makeBackendRequest(ctx) + rsp, perr2 = p.makeBackendRequest(ctx, backendContext) if perr2 != nil { p.log.Errorf("Failed to do retry backend request: %v", perr2) if perr2.code >= http.StatusInternalServerError { @@ -1245,6 +1253,7 @@ func (p *Proxy) serveResponse(ctx *context) { } else { p.metrics.MeasureResponse(ctx.response.StatusCode, ctx.request.Method, ctx.route.Id, start) } + p.metrics.MeasureServe(ctx.route.Id, ctx.metricsHost(), ctx.request.Method, ctx.response.StatusCode, ctx.startServe) } func (p *Proxy) errorResponse(ctx *context, err error) { @@ -1277,6 +1286,7 @@ func (p *Proxy) errorResponse(ctx *context, err error) { } if span := ot.SpanFromContext(ctx.Request().Context()); span != nil { + p.tracing.setTag(span, ErrorTag, true) p.tracing.setTag(span, HTTPStatusCodeTag, uint16(code)) } @@ -1483,19 +1493,14 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { err = p.do(ctx) if err != nil { - p.tracing.setTag(span, ErrorTag, true) p.errorResponse(ctx, err) - return + } else { + p.serveResponse(ctx) } - p.serveResponse(ctx) - p.metrics.MeasureServe( - ctx.route.Id, - ctx.metricsHost(), - r.Method, - ctx.response.StatusCode, - ctx.startServe, - ) + if ctx.cancelBackendContext != nil { + ctx.cancelBackendContext() + } } // Close causes the proxy to stop closing idle