Skip to content

Commit ce501f8

Browse files
skipper: add server connection keepalive limits (#3246)
Clients may connect to a subset of Skipper fleet which leads to uneven request distribution and increased cpu usage. Autoscaling of Skipper fleet is not effective because clients stay connected to old instances while new instances are underutilized. This change adds ConnManager that tracks creation of new connections and closes connections when their age or number of requests served reaches configured limits. Signed-off-by: Alexander Yastrebov <[email protected]>
1 parent 85937e6 commit ce501f8

File tree

5 files changed

+255
-3
lines changed

5 files changed

+255
-3
lines changed

config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ type Config struct {
243243
ReadHeaderTimeoutServer time.Duration `yaml:"read-header-timeout-server"`
244244
WriteTimeoutServer time.Duration `yaml:"write-timeout-server"`
245245
IdleTimeoutServer time.Duration `yaml:"idle-timeout-server"`
246+
KeepaliveServer time.Duration `yaml:"keepalive-server"`
247+
KeepaliveRequestsServer int `yaml:"keepalive-requests-server"`
246248
MaxHeaderBytes int `yaml:"max-header-bytes"`
247249
EnableConnMetricsServer bool `yaml:"enable-connection-metrics"`
248250
TimeoutBackend time.Duration `yaml:"timeout-backend"`
@@ -544,6 +546,8 @@ func NewConfig() *Config {
544546
flag.DurationVar(&cfg.ReadHeaderTimeoutServer, "read-header-timeout-server", 60*time.Second, "set ReadHeaderTimeout for http server connections")
545547
flag.DurationVar(&cfg.WriteTimeoutServer, "write-timeout-server", 60*time.Second, "set WriteTimeout for http server connections")
546548
flag.DurationVar(&cfg.IdleTimeoutServer, "idle-timeout-server", 60*time.Second, "set IdleTimeout for http server connections")
549+
flag.DurationVar(&cfg.KeepaliveServer, "keepalive-server", 0*time.Second, "sets maximum age for http server connections. The connection is closed after it existed for this duration. Default is 0 for unlimited.")
550+
flag.IntVar(&cfg.KeepaliveRequestsServer, "keepalive-requests-server", 0, "sets maximum number of requests for http server connections. The connection is closed after serving this number of requests. Default is 0 for unlimited.")
547551
flag.IntVar(&cfg.MaxHeaderBytes, "max-header-bytes", http.DefaultMaxHeaderBytes, "set MaxHeaderBytes for http server connections")
548552
flag.BoolVar(&cfg.EnableConnMetricsServer, "enable-connection-metrics", false, "enables connection metrics for http server connections")
549553
flag.DurationVar(&cfg.TimeoutBackend, "timeout-backend", 60*time.Second, "sets the TCP client connection timeout for backend connections")
@@ -879,6 +883,8 @@ func (c *Config) ToOptions() skipper.Options {
879883
ReadHeaderTimeoutServer: c.ReadHeaderTimeoutServer,
880884
WriteTimeoutServer: c.WriteTimeoutServer,
881885
IdleTimeoutServer: c.IdleTimeoutServer,
886+
KeepaliveServer: c.KeepaliveServer,
887+
KeepaliveRequestsServer: c.KeepaliveRequestsServer,
882888
MaxHeaderBytes: c.MaxHeaderBytes,
883889
EnableConnMetricsServer: c.EnableConnMetricsServer,
884890
TimeoutBackend: c.TimeoutBackend,

docs/operation/operation.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,16 @@ combinations of idle timeouts can lead to a few unexpected HTTP 502.
124124
-idle-timeout-server duration
125125
maximum idle connections per backend host (default 1m0s)
126126

127+
This configures maximum number of requests served by server connections:
128+
129+
-keepalive-requests-server int
130+
sets maximum number of requests for http server connections. The connection is closed after serving this number of requests. Default is 0 for unlimited.
131+
132+
This configures maximum age for server connections:
133+
134+
-keepalive-server duration
135+
sets maximum age for http server connections. The connection is closed after it existed for this duration. Default is 0 for unlimited.
136+
127137
This will set MaxHeaderBytes in
128138
[http.Server](https://golang.org/pkg/net/http/#Server) to limit the
129139
size of the http header from your clients.

net/connmanager.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package net
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"net/http"
8+
"time"
9+
10+
"github.com/zalando/skipper/metrics"
11+
)
12+
13+
// ConnManager tracks creation of HTTP server connections and
14+
// closes connections when their age or number of requests served reaches configured limits.
15+
// Use [ConnManager.Configure] method to setup ConnManager for an [http.Server].
16+
type ConnManager struct {
17+
// Metrics is an optional metrics registry to count connection events.
18+
Metrics metrics.Metrics
19+
20+
// Keepalive is the duration after which server connection is closed.
21+
Keepalive time.Duration
22+
23+
// KeepaliveRequests is the number of requests after which server connection is closed.
24+
KeepaliveRequests int
25+
26+
handler http.Handler
27+
}
28+
29+
type connState struct {
30+
expiresAt time.Time
31+
requests int
32+
}
33+
34+
type contextKey struct{}
35+
36+
var connection contextKey
37+
38+
func (cm *ConnManager) Configure(server *http.Server) {
39+
cm.handler = server.Handler
40+
server.Handler = http.HandlerFunc(cm.serveHTTP)
41+
42+
if cc := server.ConnContext; cc != nil {
43+
server.ConnContext = func(ctx context.Context, c net.Conn) context.Context {
44+
ctx = cc(ctx, c)
45+
return cm.connContext(ctx, c)
46+
}
47+
} else {
48+
server.ConnContext = cm.connContext
49+
}
50+
51+
if cs := server.ConnState; cs != nil {
52+
server.ConnState = func(c net.Conn, state http.ConnState) {
53+
cs(c, state)
54+
cm.connState(c, state)
55+
}
56+
} else {
57+
server.ConnState = cm.connState
58+
}
59+
}
60+
61+
func (cm *ConnManager) serveHTTP(w http.ResponseWriter, r *http.Request) {
62+
state, _ := r.Context().Value(connection).(*connState)
63+
state.requests++
64+
65+
if cm.KeepaliveRequests > 0 && state.requests >= cm.KeepaliveRequests {
66+
w.Header().Set("Connection", "close")
67+
68+
cm.count("lb-conn-closed.keepalive-requests")
69+
}
70+
71+
if cm.Keepalive > 0 && time.Now().After(state.expiresAt) {
72+
w.Header().Set("Connection", "close")
73+
74+
cm.count("lb-conn-closed.keepalive")
75+
}
76+
77+
cm.handler.ServeHTTP(w, r)
78+
}
79+
80+
func (cm *ConnManager) connContext(ctx context.Context, _ net.Conn) context.Context {
81+
state := &connState{
82+
expiresAt: time.Now().Add(cm.Keepalive),
83+
}
84+
return context.WithValue(ctx, connection, state)
85+
}
86+
87+
func (cm *ConnManager) connState(_ net.Conn, state http.ConnState) {
88+
cm.count(fmt.Sprintf("lb-conn-%s", state))
89+
}
90+
91+
func (cm *ConnManager) count(name string) {
92+
if cm.Metrics != nil {
93+
cm.Metrics.IncCounter(name)
94+
}
95+
}

net/connmanager_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package net_test
2+
3+
import (
4+
"net/http"
5+
"net/http/httptest"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
"github.com/zalando/skipper/metrics/metricstest"
12+
snet "github.com/zalando/skipper/net"
13+
)
14+
15+
func TestConnManager(t *testing.T) {
16+
const (
17+
keepaliveRequests = 3
18+
keepalive = 100 * time.Millisecond
19+
20+
testRequests = keepaliveRequests * 5
21+
)
22+
t.Run("does not close connection without limits", func(t *testing.T) {
23+
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
24+
w.WriteHeader(http.StatusOK)
25+
}))
26+
m := &metricstest.MockMetrics{}
27+
cm := &snet.ConnManager{
28+
Metrics: m,
29+
}
30+
cm.Configure(ts.Config)
31+
32+
ts.Start()
33+
defer ts.Close()
34+
35+
for i := 0; i < testRequests; i++ {
36+
resp, err := ts.Client().Get(ts.URL)
37+
require.NoError(t, err)
38+
assert.Equal(t, http.StatusOK, resp.StatusCode)
39+
assert.False(t, resp.Close)
40+
}
41+
42+
time.Sleep(100 * time.Millisecond) // wait for connection state update
43+
44+
m.WithCounters(func(counters map[string]int64) {
45+
assert.Equal(t, int64(1), counters["lb-conn-new"])
46+
assert.Equal(t, int64(testRequests), counters["lb-conn-active"])
47+
assert.Equal(t, int64(testRequests), counters["lb-conn-idle"])
48+
assert.Equal(t, int64(0), counters["lb-conn-closed"])
49+
})
50+
})
51+
t.Run("closes connection after keepalive requests", func(t *testing.T) {
52+
const keepaliveRequests = 3
53+
54+
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
55+
w.WriteHeader(http.StatusOK)
56+
}))
57+
m := &metricstest.MockMetrics{}
58+
cm := &snet.ConnManager{
59+
Metrics: m,
60+
KeepaliveRequests: keepaliveRequests,
61+
}
62+
cm.Configure(ts.Config)
63+
64+
ts.Start()
65+
defer ts.Close()
66+
67+
for i := 1; i < testRequests; i++ {
68+
resp, err := ts.Client().Get(ts.URL)
69+
require.NoError(t, err)
70+
assert.Equal(t, http.StatusOK, resp.StatusCode)
71+
72+
if i%keepaliveRequests == 0 {
73+
assert.True(t, resp.Close)
74+
} else {
75+
assert.False(t, resp.Close)
76+
}
77+
}
78+
79+
time.Sleep(100 * time.Millisecond) // wait for connection state update
80+
81+
m.WithCounters(func(counters map[string]int64) {
82+
rounds := int64(testRequests / keepaliveRequests)
83+
84+
assert.Equal(t, rounds, counters["lb-conn-new"])
85+
assert.Equal(t, rounds-1, counters["lb-conn-closed"])
86+
assert.Equal(t, rounds-1, counters["lb-conn-closed.keepalive-requests"])
87+
})
88+
})
89+
90+
t.Run("closes connection after keepalive timeout", func(t *testing.T) {
91+
const keepalive = 100 * time.Millisecond
92+
93+
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
94+
w.WriteHeader(http.StatusOK)
95+
}))
96+
m := &metricstest.MockMetrics{}
97+
cm := &snet.ConnManager{
98+
Metrics: m,
99+
Keepalive: keepalive,
100+
}
101+
cm.Configure(ts.Config)
102+
103+
ts.Start()
104+
defer ts.Close()
105+
106+
for i := 0; i < testRequests; i++ {
107+
resp, err := ts.Client().Get(ts.URL)
108+
require.NoError(t, err)
109+
assert.Equal(t, http.StatusOK, resp.StatusCode)
110+
assert.False(t, resp.Close)
111+
}
112+
113+
time.Sleep(2 * keepalive)
114+
115+
resp, err := ts.Client().Get(ts.URL)
116+
require.NoError(t, err)
117+
assert.Equal(t, http.StatusOK, resp.StatusCode)
118+
assert.True(t, resp.Close)
119+
120+
time.Sleep(100 * time.Millisecond) // wait for connection state update
121+
122+
m.WithCounters(func(counters map[string]int64) {
123+
assert.Equal(t, int64(1), counters["lb-conn-new"])
124+
assert.Equal(t, int64(1), counters["lb-conn-closed"])
125+
assert.Equal(t, int64(1), counters["lb-conn-closed.keepalive"])
126+
})
127+
})
128+
}

skipper.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,14 @@ type Options struct {
360360
// Defines IdleTimeout for server http connections.
361361
IdleTimeoutServer time.Duration
362362

363+
// KeepaliveServer configures maximum age for server http connections.
364+
// The connection is closed after it existed for this duration.
365+
KeepaliveServer time.Duration
366+
367+
// KeepaliveRequestsServer configures maximum number of requests for server http connections.
368+
// The connection is closed after serving this number of requests.
369+
KeepaliveRequestsServer int
370+
363371
// Defines MaxHeaderBytes for server http connections.
364372
MaxHeaderBytes int
365373

@@ -1334,12 +1342,17 @@ func listenAndServeQuit(
13341342
ErrorLog: newServerErrorLog(),
13351343
}
13361344

1345+
cm := &skpnet.ConnManager{
1346+
Keepalive: o.KeepaliveServer,
1347+
KeepaliveRequests: o.KeepaliveRequestsServer,
1348+
}
1349+
13371350
if o.EnableConnMetricsServer {
1338-
srv.ConnState = func(conn net.Conn, state http.ConnState) {
1339-
mtr.IncCounter(fmt.Sprintf("lb-conn-%s", state))
1340-
}
1351+
cm.Metrics = mtr
13411352
}
13421353

1354+
cm.Configure(srv)
1355+
13431356
log.Infof("Listen on %v", address)
13441357

13451358
l, err := listen(o, address, mtr)

0 commit comments

Comments
 (0)