Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: http retry #2333

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/zalando/skipper/filters/fadein"
"github.com/zalando/skipper/filters/flowid"
logfilter "github.com/zalando/skipper/filters/log"
"github.com/zalando/skipper/filters/retry"
"github.com/zalando/skipper/filters/rfc"
"github.com/zalando/skipper/filters/scheduler"
"github.com/zalando/skipper/filters/sed"
Expand Down Expand Up @@ -231,6 +232,7 @@ func Filters() []filters.Spec {
fadein.NewEndpointCreated(),
consistenthash.NewConsistentHashKey(),
consistenthash.NewConsistentHashBalanceFactor(),
retry.NewRetry(),
tls.New(),
}
}
Expand Down
1 change: 1 addition & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ const (
FifoWithBodyName = "fifoWithBody"
LifoName = "lifo"
LifoGroupName = "lifoGroup"
RetryName = "retry"
RfcPathName = "rfcPath"
RfcHostName = "rfcHost"
BearerInjectorName = "bearerinjector"
Expand Down
82 changes: 82 additions & 0 deletions filters/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package retry

import (
"fmt"
"net/http"

"github.com/zalando/skipper/filters"
"gopkg.in/yaml.v2"
)

const (
single = "single"
)

type (
retrySpec struct{}
RetryFilter struct {
Type string `json:"type,omitempty"`
StatusCodes []int `json:"status-codes,omitempty"`
MaxTimes int `json:"max-times,omitempty"`

Check func(*http.Response) bool
}
)

// NewRetry creates a filter specification for the retry() filter
func NewRetry() filters.Spec { return &retrySpec{} }

func (*retrySpec) Name() string { return filters.RetryName }

func (s *retrySpec) CreateFilter(args []interface{}) (filters.Filter, error) {
rf := &RetryFilter{}

if config, ok := args[0].(string); !ok {
return nil, fmt.Errorf("filter %q requires single string argument", s.Name())
} else if err := yaml.Unmarshal([]byte(config), rf); err != nil {
return nil, fmt.Errorf("failed to parse configuration: %w", err)
}

switch rf.Type {
case single:
i := 0
rf.Check = func(rsp *http.Response) bool {
i++
if i > rf.MaxTimes {
return false
}
return shouldRetry(rsp.StatusCode, rf.StatusCodes)
}
}

return rf, nil
}

// copy from proxy.shouldLog
func shouldRetry(statusCode int, prefixes []int) bool {
if len(prefixes) == 0 {
return false
}

match := false
for _, prefix := range prefixes {
switch {
case prefix < 10:
match = (statusCode >= prefix*100 && statusCode < (prefix+1)*100)
case prefix < 100:
match = (statusCode >= prefix*10 && statusCode < (prefix+1)*10)
default:
match = statusCode == prefix
}
if match {
break
}
}
return match
}

func (rf *RetryFilter) Response(filters.FilterContext) {}

func (rf *RetryFilter) Request(ctx filters.FilterContext) {
ctx.StateBag()[filters.RetryName] = rf.Check
}
95 changes: 95 additions & 0 deletions filters/retry/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package retry

import (
"bytes"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/AlexanderYastrebov/noleak"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/proxy/proxytest"
)

func TestRetry(t *testing.T) {
for _, tt := range []struct {
name string
method string
body string
}{
{
name: "test GET",
method: "GET",
},
{
name: "test POST",
method: "POST",
body: "hello POST",
},
{
name: "test PATCH",
method: "PATCH",
body: "hello PATCH",
},
{
name: "test PUT",
method: "PUT",
body: "hello PUT",
}} {
t.Run(tt.name, func(t *testing.T) {
i := 0
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if i == 0 {
i++
w.WriteHeader(http.StatusBadGateway)
return
}

got, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("got no data")
}
s := string(got)
if tt.body != s {
t.Fatalf("Failed to get the right data want: %q, got: %q", tt.body, s)
}

w.WriteHeader(http.StatusOK)
}))
defer backend.Close()

noleak.Check(t)

fr := make(filters.Registry)
retry := NewRetry()
fr.Register(retry)
r := &eskip.Route{
Filters: []*eskip.Filter{
{Name: retry.Name()},
},
Backend: backend.URL,
}

proxy := proxytest.New(fr, r)
defer proxy.Close()

buf := bytes.NewBufferString(tt.body)
req, err := http.NewRequest(tt.method, proxy.URL, buf)
if err != nil {
t.Fatal(err)
}

rsp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to execute retry: %v", err)
}

if rsp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %s", rsp.Status)
}
rsp.Body.Close()
})
}
}
49 changes: 49 additions & 0 deletions io/copy_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io

import (
"io"
)

type ReadWriterLen interface {
io.ReadWriter
Len() int
}

type CopyBodyStream struct {
left int
buf ReadWriterLen
input io.ReadCloser
}

func NewCopyBodyStream(left int, buf ReadWriterLen, rc io.ReadCloser) *CopyBodyStream {
return &CopyBodyStream{
left: left,
buf: buf,
input: rc,
}
}

func (cb *CopyBodyStream) Len() int {
return cb.buf.Len()
}

func (cb *CopyBodyStream) Read(p []byte) (n int, err error) {
n, err = cb.input.Read(p)
if cb.left > 0 && n > 0 {
m := min(n, cb.left)
written, err := cb.buf.Write(p[:m])
if err != nil {
return 0, err
}
cb.left -= written
}
return n, err
}

func (cb *CopyBodyStream) Close() error {
return cb.input.Close()
}

func (cb *CopyBodyStream) GetBody() io.ReadCloser {
return io.NopCloser(cb.buf)
}
50 changes: 50 additions & 0 deletions io/copy_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io

import (
"bytes"
"io"
"testing"
)

func TestCopyBodyStream(t *testing.T) {
s := "content"
bbuf := io.NopCloser(bytes.NewBufferString(s))
cbs := NewCopyBodyStream(len(s), &bytes.Buffer{}, bbuf)

buf := make([]byte, len(s))
_, err := cbs.Read(buf)
if err != nil {
t.Fatal(err)
}

if cbs.Len() != len(buf) {
t.Fatalf("Failed to have the same buf buffer size want: %d, got: %d", cbs.Len(), len(buf))
}

got, err := io.ReadAll(cbs.GetBody())
if err != nil {
t.Fatalf("Failed to read: %v", err)
}
if gotStr := string(got); s != gotStr {
t.Fatalf("Failed to get the right content: %s != %s", s, gotStr)
}

if err = cbs.Close(); err != nil {
t.Fatal(err)
}
}

func TestCopyBodyStreamFailedRead(t *testing.T) {
s := "content"
bbuf := io.NopCloser(bytes.NewBufferString(s))

failingBuf := &failingWriter{buf: &bytes.Buffer{}}

cbs := NewCopyBodyStream(len(s), failingBuf, bbuf)

buf := make([]byte, len(s))
_, err := cbs.Read(buf)
if err == nil {
t.Fatal("Want to have failing buffer write inside Read()")
}
}
22 changes: 22 additions & 0 deletions io/failing_write_buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io

import (
"bytes"
"fmt"
)

type failingWriter struct {
buf *bytes.Buffer
}

func (*failingWriter) Write([]byte) (int, error) {
return 0, fmt.Errorf("failed to write")
}

func (fw *failingWriter) Read(p []byte) (int, error) {
return fw.buf.Read(p)
}

func (fw *failingWriter) Len() int {
return fw.buf.Len()
}
Loading
Loading