Skip to content

Commit 6bd37d5

Browse files
RomanZavodskikhRoman Zavodskikh
andauthored
Decouple fadeIn from loadbalancer (#2634)
Signed-off-by: Roman Zavodskikh <[email protected]> Co-authored-by: Roman Zavodskikh <[email protected]>
1 parent d3f0322 commit 6bd37d5

File tree

9 files changed

+531
-609
lines changed

9 files changed

+531
-609
lines changed

loadbalancer/algorithm.go

Lines changed: 30 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@ package loadbalancer
33
import (
44
"errors"
55
"fmt"
6-
"math"
76
"math/rand"
87
"sort"
98
"sync"
109
"sync/atomic"
11-
"time"
1210

1311
"github.com/cespare/xxhash/v2"
1412
log "github.com/sirupsen/logrus"
@@ -53,106 +51,14 @@ var (
5351
defaultAlgorithm = newRoundRobin
5452
)
5553

56-
func fadeInState(now time.Time, duration time.Duration, detected time.Time) (time.Duration, bool) {
57-
rel := now.Sub(detected)
58-
return rel, rel > 0 && rel < duration
59-
}
60-
61-
func fadeIn(now time.Time, duration time.Duration, exponent float64, detected time.Time) float64 {
62-
rel, fadingIn := fadeInState(now, duration, detected)
63-
if !fadingIn {
64-
return 1
65-
}
66-
67-
return math.Pow(float64(rel)/float64(duration), exponent)
68-
}
69-
70-
func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, now time.Time) routing.LBEndpoint {
71-
var sum float64
72-
rt := ctx.Route
73-
ep := ctx.LBEndpoints
74-
for _, epi := range ep {
75-
wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Metrics.DetectedTime())
76-
sum += wi
77-
}
78-
79-
choice := ep[len(ep)-1]
80-
r := rnd.Float64() * sum
81-
var upto float64
82-
for i, epi := range ep {
83-
upto += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Metrics.DetectedTime())
84-
if upto > r {
85-
choice = ep[i]
86-
break
87-
}
88-
}
89-
90-
return choice
91-
}
92-
93-
func shiftToRemaining(rnd *rand.Rand, ctx *routing.LBContext, wi []int, now time.Time) routing.LBEndpoint {
94-
notFadingIndexes := wi
95-
ep := ctx.LBEndpoints
96-
97-
// if all endpoints are fading, the simplest approach is to use the oldest,
98-
// this departs from the desired curve, but guarantees monotonic fade-in. From
99-
// the perspective of the oldest endpoint, this is temporarily the same as if
100-
// there was no fade-in.
101-
if len(notFadingIndexes) == 0 {
102-
return shiftWeighted(rnd, ctx, now)
103-
}
104-
105-
// otherwise equally distribute between the old endpoints
106-
return ep[notFadingIndexes[rnd.Intn(len(notFadingIndexes))]]
107-
}
108-
109-
func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, choice int, algo routing.LBAlgorithm) routing.LBEndpoint {
110-
ep := ctx.LBEndpoints
111-
now := time.Now()
112-
f := fadeIn(
113-
now,
114-
ctx.Route.LBFadeInDuration,
115-
ctx.Route.LBFadeInExponent,
116-
ctx.LBEndpoints[choice].Metrics.DetectedTime(),
117-
)
118-
119-
if rnd.Float64() < f {
120-
return ep[choice]
121-
}
122-
notFadingIndexes := make([]int, 0, len(ep))
123-
for i := 0; i < len(ep); i++ {
124-
if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, ep[i].Metrics.DetectedTime()); !fadingIn {
125-
notFadingIndexes = append(notFadingIndexes, i)
126-
}
127-
}
128-
129-
switch a := algo.(type) {
130-
case *roundRobin:
131-
return shiftToRemaining(a.rnd, ctx, notFadingIndexes, now)
132-
case *random:
133-
return shiftToRemaining(a.rnd, ctx, notFadingIndexes, now)
134-
case *consistentHash:
135-
// If all endpoints are fading, normal consistent hash result
136-
if len(notFadingIndexes) == 0 {
137-
return ep[choice]
138-
}
139-
// otherwise calculate consistent hash again using endpoints which are not fading
140-
return ep[a.chooseConsistentHashEndpoint(ctx, skipFadingEndpoints(notFadingIndexes))]
141-
default:
142-
return ep[choice]
143-
}
144-
}
145-
14654
type roundRobin struct {
14755
index int64
148-
rnd *rand.Rand
14956
}
15057

15158
func newRoundRobin(endpoints []string) routing.LBAlgorithm {
152-
rnd := rand.New(newLockedSource()) // #nosec
59+
rnd := rand.New(NewLockedSource()) // #nosec
15360
return &roundRobin{
15461
index: int64(rnd.Intn(len(endpoints))),
155-
rnd: rnd,
15662
}
15763
}
15864

@@ -162,13 +68,8 @@ func (r *roundRobin) Apply(ctx *routing.LBContext) routing.LBEndpoint {
16268
return ctx.LBEndpoints[0]
16369
}
16470

165-
index := int(atomic.AddInt64(&r.index, 1) % int64(len(ctx.LBEndpoints)))
166-
167-
if ctx.Route.LBFadeInDuration <= 0 {
168-
return ctx.LBEndpoints[index]
169-
}
170-
171-
return withFadeIn(r.rnd, ctx, index, r)
71+
choice := int(atomic.AddInt64(&r.index, 1) % int64(len(ctx.LBEndpoints)))
72+
return ctx.LBEndpoints[choice]
17273
}
17374

17475
type random struct {
@@ -178,7 +79,7 @@ type random struct {
17879
func newRandom(endpoints []string) routing.LBAlgorithm {
17980
// #nosec
18081
return &random{
181-
rnd: rand.New(newLockedSource()),
82+
rnd: rand.New(NewLockedSource()),
18283
}
18384
}
18485

@@ -188,12 +89,8 @@ func (r *random) Apply(ctx *routing.LBContext) routing.LBEndpoint {
18889
return ctx.LBEndpoints[0]
18990
}
19091

191-
i := r.rnd.Intn(len(ctx.LBEndpoints))
192-
if ctx.Route.LBFadeInDuration <= 0 {
193-
return ctx.LBEndpoints[i]
194-
}
195-
196-
return withFadeIn(r.rnd, ctx, i, r)
92+
choice := r.rnd.Intn(len(ctx.LBEndpoints))
93+
return ctx.LBEndpoints[choice]
19794
}
19895

19996
type (
@@ -203,7 +100,6 @@ type (
203100
}
204101
consistentHash struct {
205102
hashRing []endpointHash // list of endpoints sorted by hash value
206-
rnd *rand.Rand
207103
}
208104
)
209105

@@ -214,10 +110,8 @@ func (ch *consistentHash) Swap(i, j int) {
214110
}
215111

216112
func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routing.LBAlgorithm {
217-
rnd := rand.New(newLockedSource()) // #nosec
218113
ch := &consistentHash{
219114
hashRing: make([]endpointHash, hashesPerEndpoint*len(endpoints)),
220-
rnd: rnd,
221115
}
222116
for i, ep := range endpoints {
223117
endpointStartIndex := hashesPerEndpoint * i
@@ -237,22 +131,32 @@ func hash(s string) uint64 {
237131
return xxhash.Sum64String(s)
238132
}
239133

134+
func skipEndpoint(c *routing.LBContext, index int) bool {
135+
host := c.Route.LBEndpoints[index].Host
136+
for i := range c.LBEndpoints {
137+
if c.LBEndpoints[i].Host == host {
138+
return false
139+
}
140+
}
141+
return true
142+
}
143+
240144
// Returns index in hash ring with the closest hash to key's hash
241-
func (ch *consistentHash) searchRing(key string, skipEndpoint func(int) bool) int {
145+
func (ch *consistentHash) searchRing(key string, ctx *routing.LBContext) int {
242146
h := hash(key)
243147
i := sort.Search(ch.Len(), func(i int) bool { return ch.hashRing[i].hash >= h })
244148
if i == ch.Len() { // rollover
245149
i = 0
246150
}
247-
for skipEndpoint(ch.hashRing[i].index) {
151+
for skipEndpoint(ctx, ch.hashRing[i].index) {
248152
i = (i + 1) % ch.Len()
249153
}
250154
return i
251155
}
252156

253157
// Returns index of endpoint with closest hash to key's hash
254-
func (ch *consistentHash) search(key string, skipEndpoint func(int) bool) int {
255-
ringIndex := ch.searchRing(key, skipEndpoint)
158+
func (ch *consistentHash) search(key string, ctx *routing.LBContext) int {
159+
ringIndex := ch.searchRing(key, ctx)
256160
return ch.hashRing[ringIndex].index
257161
}
258162

@@ -266,15 +170,15 @@ func computeLoadAverage(ctx *routing.LBContext) float64 {
266170
}
267171

268172
// Returns index of endpoint with closest hash to key's hash, which is also below the target load
269-
// skipEndpoint function is used to skip endpoints we don't want, such as fading endpoints
270-
func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, ctx *routing.LBContext, skipEndpoint func(int) bool) int {
271-
ringIndex := ch.searchRing(key, skipEndpoint)
173+
// skipEndpoint function is used to skip endpoints we don't want, for example, fading endpoints
174+
func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, ctx *routing.LBContext) int {
175+
ringIndex := ch.searchRing(key, ctx)
272176
averageLoad := computeLoadAverage(ctx)
273177
targetLoad := averageLoad * balanceFactor
274178
// Loop round ring, starting at endpoint with closest hash. Stop when we find one whose load is less than targetLoad.
275179
for i := 0; i < ch.Len(); i++ {
276180
endpointIndex := ch.hashRing[ringIndex].index
277-
if skipEndpoint(endpointIndex) {
181+
if skipEndpoint(ctx, endpointIndex) {
278182
continue
279183
}
280184
load := ctx.LBEndpoints[endpointIndex].Metrics.InflightRequests()
@@ -295,46 +199,26 @@ func (ch *consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint {
295199
return ctx.LBEndpoints[0]
296200
}
297201

298-
choice := ch.chooseConsistentHashEndpoint(ctx, noSkippedEndpoints)
299-
300-
if ctx.Route.LBFadeInDuration <= 0 {
301-
return ctx.LBEndpoints[choice]
302-
}
303-
304-
return withFadeIn(ch.rnd, ctx, choice, ch)
202+
choice := ch.chooseConsistentHashEndpoint(ctx)
203+
return ctx.LBEndpoints[choice]
305204
}
306205

307-
func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, skipEndpoint func(int) bool) int {
206+
func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext) int {
308207
key, ok := ctx.Params[ConsistentHashKey].(string)
309208
if !ok {
310209
key = snet.RemoteHost(ctx.Request).String()
311210
}
312211
balanceFactor, ok := ctx.Params[ConsistentHashBalanceFactor].(float64)
313212
var choice int
314213
if !ok {
315-
choice = ch.search(key, skipEndpoint)
214+
choice = ch.search(key, ctx)
316215
} else {
317-
choice = ch.boundedLoadSearch(key, balanceFactor, ctx, skipEndpoint)
216+
choice = ch.boundedLoadSearch(key, balanceFactor, ctx)
318217
}
319218

320219
return choice
321220
}
322221

323-
func skipFadingEndpoints(notFadingEndpoints []int) func(int) bool {
324-
return func(i int) bool {
325-
for _, notFadingEndpoint := range notFadingEndpoints {
326-
if i == notFadingEndpoint {
327-
return false
328-
}
329-
}
330-
return true
331-
}
332-
}
333-
334-
func noSkippedEndpoints(_ int) bool {
335-
return false
336-
}
337-
338222
type powerOfRandomNChoices struct {
339223
mu sync.Mutex
340224
rnd *rand.Rand
@@ -343,7 +227,7 @@ type powerOfRandomNChoices struct {
343227

344228
// newPowerOfRandomNChoices selects N random backends and picks the one with less outstanding requests.
345229
func newPowerOfRandomNChoices([]string) routing.LBAlgorithm {
346-
rnd := rand.New(newLockedSource()) // #nosec
230+
rnd := rand.New(NewLockedSource()) // #nosec
347231
return &powerOfRandomNChoices{
348232
rnd: rnd,
349233
numberOfChoices: powerOfRandomNChoicesDefaultN,

loadbalancer/algorithm_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,21 @@ func TestApply(t *testing.T) {
291291

292292
func TestConsistentHashSearch(t *testing.T) {
293293
apply := func(key string, endpoints []string) string {
294+
p := NewAlgorithmProvider()
295+
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
296+
r := &routing.Route{
297+
Route: eskip.Route{
298+
BackendType: eskip.LBBackend,
299+
LBAlgorithm: ConsistentHash.String(),
300+
LBEndpoints: endpoints,
301+
},
302+
}
303+
p.Do([]*routing.Route{r})
304+
endpointRegistry.Do([]*routing.Route{r})
305+
294306
ch := newConsistentHash(endpoints).(*consistentHash)
295-
return endpoints[ch.search(key, noSkippedEndpoints)]
307+
ctx := &routing.LBContext{Route: r, LBEndpoints: r.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}, Registry: endpointRegistry}
308+
return endpoints[ch.search(key, ctx)]
296309
}
297310

298311
endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}

0 commit comments

Comments
 (0)