Skip to content

Commit accfde5

Browse files
RomanZavodskikhRoman Zavodskikh
andauthored
Passive health checks implemented (#2888)
Signed-off-by: Roman Zavodskikh <[email protected]> Co-authored-by: Roman Zavodskikh <[email protected]>
1 parent 8c03dfb commit accfde5

16 files changed

+494
-93
lines changed

config/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ type Config struct {
285285
OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"`
286286
OpenPolicyAgentMaxRequestBodySize int64 `yaml:"open-policy-agent-max-request-body-size"`
287287
OpenPolicyAgentMaxMemoryBodyParsing int64 `yaml:"open-policy-agent-max-memory-body-parsing"`
288+
289+
PassiveHealthCheck mapFlags `yaml:"passive-health-check"`
288290
}
289291

290292
const (
@@ -571,6 +573,9 @@ func NewConfig() *Config {
571573
flag.Var(cfg.LuaModules, "lua-modules", "comma separated list of lua filter modules. Use <module>.<symbol> to selectively enable module symbols, for example: package,base._G,base.print,json")
572574
flag.Var(cfg.LuaSources, "lua-sources", `comma separated list of lua input types for the lua() filter. Valid sources "", "file", "inline", "file,inline" and "none". Use "file" to only allow lua file references in lua filter. Default "" is the same as "file","inline". Use "none" to disable lua filters.`)
573575

576+
// Passive Health Checks
577+
flag.Var(&cfg.PassiveHealthCheck, "passive-health-check", "sets the parameters for passive health check feature")
578+
574579
cfg.flags = flag
575580
return cfg
576581
}
@@ -912,6 +917,8 @@ func (c *Config) ToOptions() skipper.Options {
912917
OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout,
913918
OpenPolicyAgentMaxRequestBodySize: c.OpenPolicyAgentMaxRequestBodySize,
914919
OpenPolicyAgentMaxMemoryBodyParsing: c.OpenPolicyAgentMaxMemoryBodyParsing,
920+
921+
PassiveHealthCheck: c.PassiveHealthCheck.values,
915922
}
916923
for _, rcci := range c.CloneRoute {
917924
eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl)

docs/operation/operation.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,34 @@ to get the results paginated or getting all of them at the same time.
893893
curl localhost:9911/routes?offset=200&limit=100
894894
```
895895

896+
## Passive health check (*experimental*)
897+
898+
Skipper has an option to automatically detect and mitigate faulty backend endpoints, this feature is called
899+
Passive Health Check(PHC).
900+
901+
PHC works the following way: the entire uptime is divided in chunks of `period`, per every period Skipper calculates
902+
the total amount of requests and amount of requests failed per every endpoint. While next period is going on,
903+
the Skipper takes a look at previous period and if the amount of requests in the previous period is more than `min-requests`
904+
for the given endpoints then Skipper will send reduced (the more `max-drop-probability`
905+
and failed requests ratio in previous period are, the stronger reduction is)
906+
amount of requests compared to amount sent without PHC.
907+
908+
Having this, we expect less requests to fail because a lot of them would be sent to endpoints that seem to be healthy instead.
909+
910+
To enable this feature, you need to provide `-passive-health-check` option having all forementioned parameters
911+
(`period`, `min-requests`, `max-drop-probability`) defined,
912+
for instance: `-passive-health-check=period=1s,min-requests=10,max-drop-probability=0.9`.
913+
914+
You need to define all parameters on your side, there are no defaults, and skipper will not run if PHC params are passed only partially.
915+
916+
However, Skipper will run without this feature, if no `-passive-health-check` is provided at all.
917+
918+
The parameters of `-passive-health-check` option are:
919+
+ `period=<duration>` - the duration of stats reset period
920+
+ `min-requests=<int>` - the minimum number of requests per `period` per backend endpoint required to activate PHC for this endpoint
921+
+ `max-drop-probabilty=<float more than/equal to 0 and less than/equal to 1>` - the maximum possible probability of unhealthy endpoint being not considered
922+
while choosing the endpoint for the given request
923+
896924
## Memory consumption
897925

898926
While Skipper is generally not memory bound, some features may require

filters/fadein/fadein_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ func TestPostProcessor(t *testing.T) {
236236
`
237237

238238
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
239+
defer endpointRegistry.Close()
239240
rt, _ := createRouting(t, routes, endpointRegistry)
240241

241242
foo := route(rt, "/foo")
@@ -266,6 +267,7 @@ func TestPostProcessor(t *testing.T) {
266267
`
267268

268269
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
270+
defer endpointRegistry.Close()
269271
rt, _ := createRouting(t, routes, endpointRegistry)
270272
r := route(rt, "/")
271273
if r != nil {
@@ -278,13 +280,14 @@ func TestPostProcessor(t *testing.T) {
278280
* -> fadeIn("-1m") -> <"http://10.0.0.1:8080">
279281
`
280282

281-
endpointRegisty := routing.NewEndpointRegistry(routing.RegistryOptions{})
282-
rt, _ := createRouting(t, routes, endpointRegisty)
283+
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
284+
defer endpointRegistry.Close()
285+
rt, _ := createRouting(t, routes, endpointRegistry)
283286
r := route(rt, "/")
284287
if r == nil || len(r.LBEndpoints) == 0 {
285288
t.Fatal("failed to ignore negative duration")
286289
}
287-
if endpointRegisty.GetMetrics(r.LBEndpoints[0].Host).DetectedTime().IsZero() {
290+
if endpointRegistry.GetMetrics(r.LBEndpoints[0].Host).DetectedTime().IsZero() {
288291
t.Fatal("failed to ignore negative duration")
289292
}
290293
})
@@ -295,6 +298,7 @@ func TestPostProcessor(t *testing.T) {
295298
`
296299

297300
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
301+
defer endpointRegistry.Close()
298302
rt, update := createRouting(t, routes, endpointRegistry)
299303
firstDetected := time.Now()
300304

@@ -327,6 +331,7 @@ func TestPostProcessor(t *testing.T) {
327331
`
328332

329333
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
334+
defer endpointRegistry.Close()
330335
rt, update := createRouting(t, initialRoutes, endpointRegistry)
331336
firstDetected := time.Now()
332337

@@ -362,6 +367,7 @@ func TestPostProcessor(t *testing.T) {
362367

363368
const lastSeenTimeout = 2 * time.Second
364369
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{LastSeenTimeout: lastSeenTimeout})
370+
defer endpointRegistry.Close()
365371
rt, update := createRouting(t, initialRoutes, endpointRegistry)
366372
firstDetected := time.Now()
367373

@@ -397,6 +403,7 @@ func TestPostProcessor(t *testing.T) {
397403
`
398404

399405
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
406+
defer endpointRegistry.Close()
400407
routes := fmt.Sprintf(routesFmt, nows(t))
401408
rt, update := createRouting(t, routes, endpointRegistry)
402409
firstDetected := time.Now()

loadbalancer/algorithm_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func TestSelectAlgorithm(t *testing.T) {
3131
t.Run("LB route with default algorithm", func(t *testing.T) {
3232
p := NewAlgorithmProvider()
3333
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
34+
defer endpointRegistry.Close()
3435
r := &routing.Route{
3536
Route: eskip.Route{
3637
BackendType: eskip.LBBackend,
@@ -59,6 +60,7 @@ func TestSelectAlgorithm(t *testing.T) {
5960
t.Run("LB route with explicit round-robin algorithm", func(t *testing.T) {
6061
p := NewAlgorithmProvider()
6162
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
63+
defer endpointRegistry.Close()
6264
r := &routing.Route{
6365
Route: eskip.Route{
6466
BackendType: eskip.LBBackend,
@@ -88,6 +90,7 @@ func TestSelectAlgorithm(t *testing.T) {
8890
t.Run("LB route with explicit consistentHash algorithm", func(t *testing.T) {
8991
p := NewAlgorithmProvider()
9092
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
93+
defer endpointRegistry.Close()
9194
r := &routing.Route{
9295
Route: eskip.Route{
9396
BackendType: eskip.LBBackend,
@@ -117,6 +120,7 @@ func TestSelectAlgorithm(t *testing.T) {
117120
t.Run("LB route with explicit random algorithm", func(t *testing.T) {
118121
p := NewAlgorithmProvider()
119122
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
123+
defer endpointRegistry.Close()
120124
r := &routing.Route{
121125
Route: eskip.Route{
122126
BackendType: eskip.LBBackend,
@@ -146,6 +150,7 @@ func TestSelectAlgorithm(t *testing.T) {
146150
t.Run("LB route with explicit powerOfRandomNChoices algorithm", func(t *testing.T) {
147151
p := NewAlgorithmProvider()
148152
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
153+
defer endpointRegistry.Close()
149154
r := &routing.Route{
150155
Route: eskip.Route{
151156
BackendType: eskip.LBBackend,
@@ -260,6 +265,7 @@ func TestApply(t *testing.T) {
260265
req, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
261266
p := NewAlgorithmProvider()
262267
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
268+
defer endpointRegistry.Close()
263269
r := &routing.Route{
264270
Route: eskip.Route{
265271
BackendType: eskip.LBBackend,
@@ -293,6 +299,7 @@ func TestConsistentHashSearch(t *testing.T) {
293299
apply := func(key string, endpoints []string) string {
294300
p := NewAlgorithmProvider()
295301
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
302+
defer endpointRegistry.Close()
296303
r := &routing.Route{
297304
Route: eskip.Route{
298305
BackendType: eskip.LBBackend,
@@ -349,6 +356,7 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
349356
Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25},
350357
}
351358
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
359+
defer endpointRegistry.Close()
352360
endpointRegistry.Do([]*routing.Route{route})
353361
noLoad := ch.Apply(ctx)
354362
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}})
@@ -429,6 +437,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
429437
Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor},
430438
}
431439
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
440+
defer endpointRegistry.Close()
432441
endpointRegistry.Do([]*routing.Route{route})
433442

434443
for i := 0; i < 100; i++ {

metricsinit_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ func TestInitOrderAndDefault(t *testing.T) {
5353
SwarmRedisURLs: []string{fmt.Sprintf("localhost:%d", redisPort)},
5454
EnableRatelimiters: true,
5555
SwarmRedisConnMetricsInterval: ringMetricsUpdatePeriod,
56+
PassiveHealthCheck: map[string]string{
57+
"period": "1m",
58+
"min-requests": "10",
59+
"max-drop-probability": "0.9",
60+
},
5661
}
5762

5863
tornDown := make(chan struct{})

proxy/fadein_internal_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur
9292
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
9393
}
9494

95-
proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}}
95+
proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}, quit: make(chan struct{})}
9696
return route, proxy, eps
9797
}
9898

@@ -103,6 +103,7 @@ func calculateFadeInDuration(t *testing.T, algorithmName string, endpointAges []
103103
const precalculateRatio = 10
104104

105105
route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDuration)
106+
defer proxy.Close()
106107
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
107108

108109
t.Log("preemulation start", time.Now())
@@ -125,6 +126,7 @@ func testFadeInMonotony(
125126
t.Run(name, func(t *testing.T) {
126127
fadeInDuration := calculateFadeInDuration(t, algorithmName, endpointAges)
127128
route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, fadeInDuration)
129+
defer proxy.Close()
128130

129131
t.Log("test start", time.Now())
130132
var stats []string
@@ -273,6 +275,7 @@ func testFadeInLoadBetweenOldAndNewEps(
273275
}
274276

275277
route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge)
278+
defer proxy.Close()
276279
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
277280
nReqs := map[string]int{}
278281

@@ -330,6 +333,7 @@ func testSelectEndpointEndsWhenAllEndpointsAreFading(
330333
// Initialize every endpoint with zero: every endpoint is new
331334
endpointAges := make([]float64, nEndpoints)
332335
route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge)
336+
defer proxy.Close()
333337
applied := make(chan struct{})
334338

335339
go func() {
@@ -364,6 +368,7 @@ func benchmarkFadeIn(
364368
) {
365369
b.Run(name, func(b *testing.B) {
366370
route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge)
371+
defer proxy.Close()
367372
var wg sync.WaitGroup
368373

369374
// Emulate the load balancer loop, sending requests to it with random hash keys

proxy/healthy_endpoints.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package proxy
2+
3+
import (
4+
"math/rand"
5+
6+
"github.com/zalando/skipper/routing"
7+
)
8+
9+
type healthyEndpoints struct {
10+
rnd *rand.Rand
11+
endpointRegistry *routing.EndpointRegistry
12+
}
13+
14+
func (h *healthyEndpoints) filterHealthyEndpoints(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint {
15+
if h == nil {
16+
return endpoints
17+
}
18+
19+
p := h.rnd.Float64()
20+
21+
filtered := make([]routing.LBEndpoint, 0, len(endpoints))
22+
for _, e := range endpoints {
23+
if p < e.Metrics.HealthCheckDropProbability() {
24+
/* drop */
25+
} else {
26+
filtered = append(filtered, e)
27+
}
28+
}
29+
30+
if len(filtered) == 0 {
31+
return endpoints
32+
}
33+
return filtered
34+
}

0 commit comments

Comments
 (0)