From e0f2af0d1f03a100e7ebee290cafda8faa5ee406 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 6 Mar 2024 18:37:44 +0100 Subject: [PATCH 1/2] feature: httpclient supporting retry net.Client.Retry() feature: retry() filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- filters/builtin/builtin.go | 2 + filters/filters.go | 1 + filters/retry/retry.go | 18 ++ filters/retry/retry_test.go | 95 ++++++++ io/copy_stream.go | 49 +++++ io/copy_stream_test.go | 50 +++++ io/failing_write_buffer_test.go | 22 ++ net/httpclient.go | 47 +++- net/httpclient_test.go | 371 ++++++++++++++++++++++++++++++++ proxy/proxy.go | 40 ++++ 10 files changed, 687 insertions(+), 8 deletions(-) create mode 100644 filters/retry/retry.go create mode 100644 filters/retry/retry_test.go create mode 100644 io/copy_stream.go create mode 100644 io/copy_stream_test.go create mode 100644 io/failing_write_buffer_test.go diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go index 78e046eb21..7dbc584476 100644 --- a/filters/builtin/builtin.go +++ b/filters/builtin/builtin.go @@ -16,6 +16,7 @@ import ( "github.com/zalando/skipper/filters/fadein" "github.com/zalando/skipper/filters/flowid" logfilter "github.com/zalando/skipper/filters/log" + "github.com/zalando/skipper/filters/retry" "github.com/zalando/skipper/filters/rfc" "github.com/zalando/skipper/filters/scheduler" "github.com/zalando/skipper/filters/sed" @@ -231,6 +232,7 @@ func Filters() []filters.Spec { fadein.NewEndpointCreated(), consistenthash.NewConsistentHashKey(), consistenthash.NewConsistentHashBalanceFactor(), + retry.NewRetry(), tls.New(), } } diff --git a/filters/filters.go b/filters/filters.go index 1c554a95db..c423aa93ca 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -343,6 +343,7 @@ const ( FifoWithBodyName = "fifoWithBody" LifoName = "lifo" LifoGroupName = "lifoGroup" + RetryName = "retry" RfcPathName = "rfcPath" RfcHostName = "rfcHost" BearerInjectorName = "bearerinjector" diff --git a/filters/retry/retry.go b/filters/retry/retry.go new file mode 100644 index 0000000000..3890f87df4 --- /dev/null +++ b/filters/retry/retry.go @@ -0,0 +1,18 @@ +package retry + +import ( + "github.com/zalando/skipper/filters" +) + +type retry struct{} + +// NewRetry creates a filter specification for the retry() filter +func NewRetry() filters.Spec { return retry{} } + +func (retry) Name() string { return filters.RetryName } +func (retry) CreateFilter([]interface{}) (filters.Filter, error) { return retry{}, nil } +func (retry) Response(filters.FilterContext) {} + +func (retry) Request(ctx filters.FilterContext) { + ctx.StateBag()[filters.RetryName] = struct{}{} +} diff --git a/filters/retry/retry_test.go b/filters/retry/retry_test.go new file mode 100644 index 0000000000..81f97c5bbb --- /dev/null +++ b/filters/retry/retry_test.go @@ -0,0 +1,95 @@ +package retry + +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/AlexanderYastrebov/noleak" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/proxy/proxytest" +) + +func TestRetry(t *testing.T) { + for _, tt := range []struct { + name string + method string + body string + }{ + { + name: "test GET", + method: "GET", + }, + { + name: "test POST", + method: "POST", + body: "hello POST", + }, + { + name: "test PATCH", + method: "PATCH", + body: "hello PATCH", + }, + { + name: "test PUT", + method: "PUT", + body: "hello PUT", + }} { + t.Run(tt.name, func(t *testing.T) { + i := 0 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if i == 0 { + i++ + w.WriteHeader(http.StatusBadGateway) + return + } + + got, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("got no data") + } + s := string(got) + if tt.body != s { + t.Fatalf("Failed to get the right data want: %q, got: %q", tt.body, s) + } + + w.WriteHeader(http.StatusOK) + })) + defer backend.Close() + + noleak.Check(t) + + fr := make(filters.Registry) + retry := NewRetry() + fr.Register(retry) + r := &eskip.Route{ + Filters: []*eskip.Filter{ + {Name: retry.Name()}, + }, + Backend: backend.URL, + } + + proxy := proxytest.New(fr, r) + defer proxy.Close() + + buf := bytes.NewBufferString(tt.body) + req, err := http.NewRequest(tt.method, proxy.URL, buf) + if err != nil { + t.Fatal(err) + } + + rsp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Failed to execute retry: %v", err) + } + + if rsp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %s", rsp.Status) + } + rsp.Body.Close() + }) + } +} diff --git a/io/copy_stream.go b/io/copy_stream.go new file mode 100644 index 0000000000..40ce810c4c --- /dev/null +++ b/io/copy_stream.go @@ -0,0 +1,49 @@ +package io + +import ( + "io" +) + +type ReadWriterLen interface { + io.ReadWriter + Len() int +} + +type CopyBodyStream struct { + left int + buf ReadWriterLen + input io.ReadCloser +} + +func NewCopyBodyStream(left int, buf ReadWriterLen, rc io.ReadCloser) *CopyBodyStream { + return &CopyBodyStream{ + left: left, + buf: buf, + input: rc, + } +} + +func (cb *CopyBodyStream) Len() int { + return cb.buf.Len() +} + +func (cb *CopyBodyStream) Read(p []byte) (n int, err error) { + n, err = cb.input.Read(p) + if cb.left > 0 && n > 0 { + m := min(n, cb.left) + written, err := cb.buf.Write(p[:m]) + if err != nil { + return 0, err + } + cb.left -= written + } + return n, err +} + +func (cb *CopyBodyStream) Close() error { + return cb.input.Close() +} + +func (cb *CopyBodyStream) GetBody() io.ReadCloser { + return io.NopCloser(cb.buf) +} diff --git a/io/copy_stream_test.go b/io/copy_stream_test.go new file mode 100644 index 0000000000..ac2773e526 --- /dev/null +++ b/io/copy_stream_test.go @@ -0,0 +1,50 @@ +package io + +import ( + "bytes" + "io" + "testing" +) + +func TestCopyBodyStream(t *testing.T) { + s := "content" + bbuf := io.NopCloser(bytes.NewBufferString(s)) + cbs := NewCopyBodyStream(len(s), &bytes.Buffer{}, bbuf) + + buf := make([]byte, len(s)) + _, err := cbs.Read(buf) + if err != nil { + t.Fatal(err) + } + + if cbs.Len() != len(buf) { + t.Fatalf("Failed to have the same buf buffer size want: %d, got: %d", cbs.Len(), len(buf)) + } + + got, err := io.ReadAll(cbs.GetBody()) + if err != nil { + t.Fatalf("Failed to read: %v", err) + } + if gotStr := string(got); s != gotStr { + t.Fatalf("Failed to get the right content: %s != %s", s, gotStr) + } + + if err = cbs.Close(); err != nil { + t.Fatal(err) + } +} + +func TestCopyBodyStreamFailedRead(t *testing.T) { + s := "content" + bbuf := io.NopCloser(bytes.NewBufferString(s)) + + failingBuf := &failingWriter{buf: &bytes.Buffer{}} + + cbs := NewCopyBodyStream(len(s), failingBuf, bbuf) + + buf := make([]byte, len(s)) + _, err := cbs.Read(buf) + if err == nil { + t.Fatal("Want to have failing buffer write inside Read()") + } +} diff --git a/io/failing_write_buffer_test.go b/io/failing_write_buffer_test.go new file mode 100644 index 0000000000..81ec3367c6 --- /dev/null +++ b/io/failing_write_buffer_test.go @@ -0,0 +1,22 @@ +package io + +import ( + "bytes" + "fmt" +) + +type failingWriter struct { + buf *bytes.Buffer +} + +func (*failingWriter) Write([]byte) (int, error) { + return 0, fmt.Errorf("failed to write") +} + +func (fw *failingWriter) Read(p []byte) (int, error) { + return fw.buf.Read(p) +} + +func (fw *failingWriter) Len() int { + return fw.buf.Len() +} diff --git a/net/httpclient.go b/net/httpclient.go index 0de67d962f..b545a3fda5 100644 --- a/net/httpclient.go +++ b/net/httpclient.go @@ -1,7 +1,9 @@ package net import ( + "bytes" "crypto/tls" + "errors" "fmt" "io" "net/http" @@ -14,6 +16,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" + skpio "github.com/zalando/skipper/io" "github.com/zalando/skipper/logging" "github.com/zalando/skipper/secrets" ) @@ -23,15 +26,18 @@ const ( defaultRefreshInterval = 5 * time.Minute ) +var errRequestNotFound = errors.New("request not found") + // Client adds additional features like Bearer token injection, and // opentracing to the wrapped http.Client with the same interface as // http.Client from the stdlib. type Client struct { - once sync.Once - client http.Client - tr *Transport - log logging.Logger - sr secrets.SecretsReader + once sync.Once + client http.Client + tr *Transport + log logging.Logger + sr secrets.SecretsReader + retryBuffers *sync.Map } // NewClient creates a wrapped http.Client and uses Transport to @@ -67,9 +73,10 @@ func NewClient(o Options) *Client { Transport: tr, CheckRedirect: o.CheckRedirect, }, - tr: tr, - log: o.Log, - sr: sr, + tr: tr, + log: o.Log, + sr: sr, + retryBuffers: &sync.Map{}, } return c @@ -125,9 +132,33 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { req.Header.Set("Authorization", "Bearer "+string(b)) } } + if req.Body != nil && req.Body != http.NoBody && req.ContentLength > 0 { + retryBuffer := skpio.NewCopyBodyStream(int(req.ContentLength), &bytes.Buffer{}, req.Body) + c.retryBuffers.Store(req, retryBuffer) + req.Body = retryBuffer + } return c.client.Do(req) } +func (c *Client) Retry(req *http.Request) (*http.Response, error) { + if req.Body == nil || req.Body == http.NoBody { + return c.Do(req) + } + + buf, ok := c.retryBuffers.LoadAndDelete(req) + if !ok { + return nil, fmt.Errorf("no retry possible, %w: %s %s", errRequestNotFound, req.Method, req.URL) + } + + retryBuffer, ok := buf.(*skpio.CopyBodyStream) + if !ok { + return nil, fmt.Errorf("no retry possible, no retry buffer for request: %s %s", req.Method, req.URL) + } + req.Body = retryBuffer.GetBody() + + return c.Do(req) +} + // CloseIdleConnections delegates the call to the underlying // http.Client. func (c *Client) CloseIdleConnections() { diff --git a/net/httpclient_test.go b/net/httpclient_test.go index 980bff2da8..79f030279d 100644 --- a/net/httpclient_test.go +++ b/net/httpclient_test.go @@ -1,13 +1,17 @@ package net import ( + "bytes" + "errors" "fmt" + "io" "net/http" "net/http/httptest" "net/url" "os" "path/filepath" "testing" + "testing/iotest" "time" "github.com/AlexanderYastrebov/noleak" @@ -426,3 +430,370 @@ func TestClientClosesIdleConnections(t *testing.T) { } rsp.Body.Close() } + +func TestClientRetry(t *testing.T) { + for _, tt := range []struct { + name string + method string + body string + }{ + { + name: "test GET", + method: "GET", + }, + { + name: "test POST", + method: "POST", + body: "hello POST", + }, + { + name: "test PATCH", + method: "PATCH", + body: "hello PATCH", + }, + { + name: "test PUT", + method: "PUT", + body: "hello PUT", + }} { + t.Run(tt.name, func(t *testing.T) { + i := 0 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if i == 0 { + i++ + w.WriteHeader(http.StatusBadGateway) + return + } + + got, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("got no data") + } + s := string(got) + if tt.body != s { + t.Fatalf("Failed to get the right data want: %q, got: %q", tt.body, s) + } + + w.WriteHeader(http.StatusOK) + })) + defer backend.Close() + + noleak.Check(t) + + cli := NewClient(Options{}) + defer cli.Close() + + buf := bytes.NewBufferString(tt.body) + req, err := http.NewRequest(tt.method, backend.URL, buf) + if err != nil { + t.Fatal(err) + } + rsp, err := cli.Do(req) + if err != nil { + t.Fatal(err) + } + if rsp.StatusCode != http.StatusBadGateway { + t.Fatalf("unexpected status code: %v", rsp.StatusCode) + } + + rsp, err = cli.Retry(req) + if err != nil { + t.Fatalf("Failed to execute retry: %v", err) + } + + if rsp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %v", rsp.StatusCode) + } + rsp.Body.Close() + }) + } +} + +func TestClientRetryConcurrentRequests(t *testing.T) { + for _, tt := range []struct { + name string + method string + body string + }{ + { + name: "test GET", + method: "GET", + }, + { + name: "test POST", + method: "POST", + body: "hello POST", + }, + { + name: "test PATCH", + method: "PATCH", + body: "hello PATCH", + }, + { + name: "test PUT", + method: "PUT", + body: "hello PUT", + }} { + t.Run(tt.name, func(t *testing.T) { + i := 0 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/ignore" { + w.WriteHeader(http.StatusOK) + return + } + + if i == 0 { + i++ + io.ReadAll(r.Body) + w.WriteHeader(http.StatusBadGateway) + return + } + + got, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("got no data") + } + s := string(got) + if tt.body != s { + t.Fatalf("Failed to get the right data want: %q, got: %q", tt.body, s) + } + + w.WriteHeader(http.StatusOK) + })) + defer backend.Close() + + noleak.Check(t) + + cli := NewClient(Options{}) + defer cli.Close() + + quit := make(chan struct{}) + go func() { + for { + select { + case <-quit: + return + default: + } + cli.Get(backend.URL + "/ignore") + } + }() + + buf := bytes.NewBufferString(tt.body) + req, err := http.NewRequest(tt.method, backend.URL, buf) + if err != nil { + t.Fatal(err) + } + rsp, err := cli.Do(req) + if err != nil { + t.Fatal(err) + } + if rsp.StatusCode != http.StatusBadGateway { + t.Fatalf("unexpected status code: %v", rsp.StatusCode) + } + + rsp, err = cli.Retry(req) + if err != nil { + t.Fatalf("Failed to execute retry: %v", err) + } + if rsp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %v", rsp.StatusCode) + } + rsp.Body.Close() + + close(quit) + }) + } +} + +func TestClientRetryFailConcurrentRequests(t *testing.T) { + for _, tt := range []struct { + name string + method string + body string + }{ + { + name: "test GET", + method: "GET", + }, + { + name: "test POST", + method: "POST", + body: "hello POST", + }, + { + name: "test PATCH", + method: "PATCH", + body: "hello PATCH", + }, + { + name: "test PUT", + method: "PUT", + body: "hello PUT", + }} { + t.Run(tt.name, func(t *testing.T) { + i := 0 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/ignore" { + w.WriteHeader(http.StatusOK) + return + } + + if i < 3 { + i++ + io.ReadAll(r.Body) + w.WriteHeader(http.StatusBadGateway) + return + } + + got, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("got no data") + } + s := string(got) + if tt.body != s { + t.Fatalf("Failed to get the right data want: %q, got: %q", tt.body, s) + } + + w.WriteHeader(http.StatusOK) + })) + defer backend.Close() + + noleak.Check(t) + + cli := NewClient(Options{}) + defer cli.Close() + + quit := make(chan struct{}) + go func() { + for { + select { + case <-quit: + return + default: + } + cli.Get(backend.URL + "/ignore") + } + }() + + buf := bytes.NewBufferString(tt.body) + req, err := http.NewRequest(tt.method, backend.URL, buf) + if err != nil { + t.Fatal(err) + } + rsp, err := cli.Do(req) + if err != nil { + t.Fatal(err) + } + if rsp.StatusCode != http.StatusBadGateway { + t.Fatalf("unexpected status code: %s", rsp.Status) + } + + for i := 0; i < 2; i++ { + rsp, err = cli.Retry(req) + if err != nil { + t.Fatalf("Failed to execute retry: %v", err) + } + if rsp.StatusCode != http.StatusBadGateway { + t.Fatalf("unexpected status code: %s", rsp.Status) + } + } + + rsp, err = cli.Retry(req) + if err != nil { + t.Fatalf("Failed to execute retry: %v", err) + } + if rsp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %s", rsp.Status) + } + rsp.Body.Close() + + close(quit) + }) + } +} + +func TestClientRetryBodyHalfReader(t *testing.T) { + for _, tt := range []struct { + name string + method string + body string + }{ + { + name: "test GET", + method: "GET", + }, + { + name: "test POST", + method: "POST", + body: "hello POST", + }, + { + name: "test PATCH", + method: "PATCH", + body: "hello PATCH", + }, + { + name: "test PUT", + method: "PUT", + body: "hello PUT", + }} { + t.Run(tt.name, func(t *testing.T) { + i := 0 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if i == 0 { + i++ + w.WriteHeader(http.StatusBadGateway) + return + } + + got, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("got no data") + } + + s := string(got) + if len(s) != 0 { + t.Fatalf("Failed to get the right data got: %q", s) + } + + w.WriteHeader(http.StatusOK) + })) + defer backend.Close() + + noleak.Check(t) + + cli := NewClient(Options{}) + defer cli.Close() + + b := bytes.NewBufferString(tt.body) + buf := iotest.HalfReader(b) + + req, err := http.NewRequest(tt.method, backend.URL, buf) + if err != nil { + t.Fatal(err) + } + rsp, err := cli.Do(req) + if err != nil { + t.Fatal(err) + } + if rsp.StatusCode != http.StatusBadGateway { + t.Fatalf("unexpected status code: %s", rsp.Status) + } + + rsp, err = cli.Retry(req) + if err != nil { + if !errors.Is(err, errRequestNotFound) { + t.Fatalf("Failed to execute retry: %v", err) + } else { + return + } + } + + if rsp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %s", rsp.Status) + } + rsp.Body.Close() + }) + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go index 5553a2fdd5..1a87498538 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1272,8 +1272,16 @@ func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) { return errCircuitBreakerOpen } + var retryBuffer *skpio.CopyBodyStream + retryConfig, ok := ctx.StateBag()[filters.RetryName] + if ok { + retryBuffer = skpio.NewCopyBodyStream(int(ctx.Request().ContentLength), &bytes.Buffer{}, ctx.Request().Body) + ctx.Request().Body = retryBuffer + } + backendContext := ctx.request.Context() if timeout, ok := ctx.StateBag()[filters.BackendTimeout]; ok { + defer ctx.cancelBackendContext() backendContext, ctx.cancelBackendContext = stdlibcontext.WithTimeout(backendContext, timeout.(time.Duration)) } @@ -1312,6 +1320,23 @@ func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) { p.applyFiltersOnError(ctx, processedFilters) return perr2 } + + } else if retryConfig != nil { + perr = nil + var perr2 *proxyError + + ctx.request.Body = retryBuffer.GetBody() + rsp, perr2 = p.makeBackendRequest(ctx, backendContext) + if perr2 != nil { + ctx.Logger().Errorf("Failed to retry backend request by filter: %v", perr2) + if perr2.code >= http.StatusInternalServerError { + p.metrics.MeasureBackend5xx(backendStart) + } + p.makeErrorResponse(ctx, perr2) + p.applyFiltersOnError(ctx, processedFilters) + return perr2 + } + } else { p.makeErrorResponse(ctx, perr) p.applyFiltersOnError(ctx, processedFilters) @@ -1321,6 +1346,21 @@ func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) { if rsp.StatusCode >= http.StatusInternalServerError { p.metrics.MeasureBackend5xx(backendStart) + + if retryConfig != nil { + perr = nil + ctx.request.Body = retryBuffer.GetBody() + rsp, perr = p.makeBackendRequest(ctx, backendContext) + if perr != nil { + ctx.Logger().Errorf("Failed to retry backend request by filter: %v", perr) + if perr.code >= http.StatusInternalServerError { + p.metrics.MeasureBackend5xx(backendStart) + } + p.makeErrorResponse(ctx, perr) + p.applyFiltersOnError(ctx, processedFilters) + return perr + } + } } if done != nil { From 0a27cce822ea31fbabee375b8130bbc072157247 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Fri, 16 Aug 2024 14:53:39 +0200 Subject: [PATCH 2/2] add yaml config for retry() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- filters/retry/retry.go | 78 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 7 deletions(-) diff --git a/filters/retry/retry.go b/filters/retry/retry.go index 3890f87df4..89f04b95a4 100644 --- a/filters/retry/retry.go +++ b/filters/retry/retry.go @@ -1,18 +1,82 @@ package retry import ( + "fmt" + "net/http" + "github.com/zalando/skipper/filters" + "gopkg.in/yaml.v2" +) + +const ( + single = "single" ) -type retry struct{} +type ( + retrySpec struct{} + RetryFilter struct { + Type string `json:"type,omitempty"` + StatusCodes []int `json:"status-codes,omitempty"` + MaxTimes int `json:"max-times,omitempty"` + + Check func(*http.Response) bool + } +) // NewRetry creates a filter specification for the retry() filter -func NewRetry() filters.Spec { return retry{} } +func NewRetry() filters.Spec { return &retrySpec{} } + +func (*retrySpec) Name() string { return filters.RetryName } + +func (s *retrySpec) CreateFilter(args []interface{}) (filters.Filter, error) { + rf := &RetryFilter{} + + if config, ok := args[0].(string); !ok { + return nil, fmt.Errorf("filter %q requires single string argument", s.Name()) + } else if err := yaml.Unmarshal([]byte(config), rf); err != nil { + return nil, fmt.Errorf("failed to parse configuration: %w", err) + } + + switch rf.Type { + case single: + i := 0 + rf.Check = func(rsp *http.Response) bool { + i++ + if i > rf.MaxTimes { + return false + } + return shouldRetry(rsp.StatusCode, rf.StatusCodes) + } + } + + return rf, nil +} + +// copy from proxy.shouldLog +func shouldRetry(statusCode int, prefixes []int) bool { + if len(prefixes) == 0 { + return false + } + + match := false + for _, prefix := range prefixes { + switch { + case prefix < 10: + match = (statusCode >= prefix*100 && statusCode < (prefix+1)*100) + case prefix < 100: + match = (statusCode >= prefix*10 && statusCode < (prefix+1)*10) + default: + match = statusCode == prefix + } + if match { + break + } + } + return match +} -func (retry) Name() string { return filters.RetryName } -func (retry) CreateFilter([]interface{}) (filters.Filter, error) { return retry{}, nil } -func (retry) Response(filters.FilterContext) {} +func (rf *RetryFilter) Response(filters.FilterContext) {} -func (retry) Request(ctx filters.FilterContext) { - ctx.StateBag()[filters.RetryName] = struct{}{} +func (rf *RetryFilter) Request(ctx filters.FilterContext) { + ctx.StateBag()[filters.RetryName] = rf.Check }