Skip to content

Commit 12a17a1

Browse files
authored
router, backend: check target health before redirection (#412)
1 parent f22f82b commit 12a17a1

File tree

7 files changed

+235
-73
lines changed

7 files changed

+235
-73
lines changed

pkg/manager/router/backend_observer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ import (
2121

2222
type BackendStatus int
2323

24-
func (bs *BackendStatus) ToScore() int {
25-
return statusScores[*bs]
24+
func (bs BackendStatus) ToScore() int {
25+
return statusScores[bs]
2626
}
2727

28-
func (bs *BackendStatus) String() string {
29-
status, ok := statusNames[*bs]
28+
func (bs BackendStatus) String() string {
29+
status, ok := statusNames[bs]
3030
if !ok {
3131
return "unknown"
3232
}

pkg/manager/router/router.go

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package router
55

66
import (
7+
"sync"
78
"time"
89

910
glist "github.com/bahlo/generic-list-go"
@@ -63,14 +64,23 @@ type RedirectableConn interface {
6364
SetValue(key, val any)
6465
Value(key any) any
6566
// Redirect returns false if the current conn is not redirectable.
66-
Redirect(addr string) bool
67+
Redirect(backend BackendInst) bool
6768
NotifyBackendStatus(status BackendStatus)
6869
ConnectionID() uint64
6970
}
7071

72+
// BackendInst defines a backend that a connection is redirecting to.
73+
type BackendInst interface {
74+
Addr() string
75+
Healthy() bool
76+
}
77+
7178
// backendWrapper contains the connections on the backend.
7279
type backendWrapper struct {
73-
*backendHealth
80+
mu struct {
81+
sync.RWMutex
82+
backendHealth
83+
}
7484
addr string
7585
// connScore is used for calculating backend scores and check if the backend can be removed from the list.
7686
// connScore = connList.Len() + incoming connections - outgoing connections.
@@ -80,9 +90,50 @@ type backendWrapper struct {
8090
connList *glist.List[*connWrapper]
8191
}
8292

93+
func (b *backendWrapper) setHealth(health backendHealth) {
94+
b.mu.Lock()
95+
b.mu.backendHealth = health
96+
b.mu.Unlock()
97+
}
98+
8399
// score calculates the score of the backend. Larger score indicates higher load.
84100
func (b *backendWrapper) score() int {
85-
return b.status.ToScore() + b.connScore
101+
b.mu.RLock()
102+
score := b.mu.status.ToScore() + b.connScore
103+
b.mu.RUnlock()
104+
return score
105+
}
106+
107+
func (b *backendWrapper) Addr() string {
108+
return b.addr
109+
}
110+
111+
func (b *backendWrapper) Status() BackendStatus {
112+
b.mu.RLock()
113+
status := b.mu.status
114+
b.mu.RUnlock()
115+
return status
116+
}
117+
118+
func (b *backendWrapper) Healthy() bool {
119+
b.mu.RLock()
120+
healthy := b.mu.status == StatusHealthy
121+
b.mu.RUnlock()
122+
return healthy
123+
}
124+
125+
func (b *backendWrapper) ServerVersion() string {
126+
b.mu.RLock()
127+
version := b.mu.serverVersion
128+
b.mu.RUnlock()
129+
return version
130+
}
131+
132+
func (b *backendWrapper) String() string {
133+
b.mu.RLock()
134+
str := b.mu.String()
135+
b.mu.RUnlock()
136+
return str
86137
}
87138

88139
// connWrapper wraps RedirectableConn.

pkg/manager/router/router_score.go

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (router *ScoreBasedRouter) routeOnce(excluded []string) (string, error) {
8484
for be := router.backends.Back(); be != nil; be = be.Prev() {
8585
backend := be.Value
8686
// These backends may be recycled, so we should not connect to them again.
87-
switch backend.status {
87+
switch backend.Status() {
8888
case StatusCannotConnect, StatusSchemaOutdated:
8989
continue
9090
}
@@ -97,7 +97,7 @@ func (router *ScoreBasedRouter) routeOnce(excluded []string) (string, error) {
9797
}
9898
if !found {
9999
backend.connScore++
100-
router.adjustBackendList(be)
100+
router.adjustBackendList(be, false)
101101
return backend.addr, nil
102102
}
103103
}
@@ -123,29 +123,29 @@ func (router *ScoreBasedRouter) onCreateConn(addr string, conn RedirectableConn,
123123
conn.SetEventReceiver(router)
124124
} else {
125125
backend.connScore--
126-
router.adjustBackendList(be)
126+
router.adjustBackendList(be, true)
127127
}
128128
}
129129

130130
func (router *ScoreBasedRouter) removeConn(be *glist.Element[*backendWrapper], ce *glist.Element[*connWrapper]) {
131131
backend := be.Value
132132
backend.connList.Remove(ce)
133133
setBackendConnMetrics(backend.addr, backend.connList.Len())
134-
router.adjustBackendList(be)
134+
router.adjustBackendList(be, true)
135135
}
136136

137137
func (router *ScoreBasedRouter) addConn(be *glist.Element[*backendWrapper], conn *connWrapper) {
138138
backend := be.Value
139139
ce := backend.connList.PushBack(conn)
140140
setBackendConnMetrics(backend.addr, backend.connList.Len())
141141
router.setConnWrapper(conn, ce)
142-
conn.NotifyBackendStatus(backend.status)
143-
router.adjustBackendList(be)
142+
conn.NotifyBackendStatus(backend.Status())
143+
router.adjustBackendList(be, false)
144144
}
145145

146146
// adjustBackendList moves `be` after the score of `be` changes to keep the list ordered.
147-
func (router *ScoreBasedRouter) adjustBackendList(be *glist.Element[*backendWrapper]) {
148-
if router.removeBackendIfEmpty(be) {
147+
func (router *ScoreBasedRouter) adjustBackendList(be *glist.Element[*backendWrapper], removeEmpty bool) {
148+
if removeEmpty && router.removeBackendIfEmpty(be) {
149149
return
150150
}
151151

@@ -193,7 +193,7 @@ func (router *ScoreBasedRouter) RedirectConnections() error {
193193
if connWrapper.phase != phaseRedirectNotify {
194194
connWrapper.phase = phaseRedirectNotify
195195
// we dont care the results
196-
_ = connWrapper.Redirect(backend.addr)
196+
_ = connWrapper.Redirect(backend)
197197
}
198198
}
199199
}
@@ -225,14 +225,15 @@ func (router *ScoreBasedRouter) ensureBackend(addr string, forward bool) *glist.
225225
if be == nil {
226226
// The backend should always exist if it will be needed. Add a warning and add it back.
227227
router.logger.Warn("backend is not found in the router", zap.String("backend_addr", addr), zap.Stack("stack"))
228-
be = router.backends.PushFront(&backendWrapper{
229-
backendHealth: &backendHealth{
230-
status: StatusCannotConnect,
231-
},
228+
backend := &backendWrapper{
232229
addr: addr,
233230
connList: glist.New[*connWrapper](),
231+
}
232+
backend.setHealth(backendHealth{
233+
status: StatusCannotConnect,
234234
})
235-
router.adjustBackendList(be)
235+
be = router.backends.PushFront(backend)
236+
router.adjustBackendList(be, false)
236237
}
237238
return be
238239
}
@@ -261,9 +262,9 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec
261262
connWrapper.phase = phaseRedirectEnd
262263
} else {
263264
fromBe.Value.connScore++
264-
router.adjustBackendList(fromBe)
265+
router.adjustBackendList(fromBe, false)
265266
toBe.Value.connScore--
266-
router.adjustBackendList(toBe)
267+
router.adjustBackendList(toBe, true)
267268
connWrapper.phase = phaseRedirectFail
268269
}
269270
addMigrateMetrics(from, to, succeed, connWrapper.lastRedirect)
@@ -291,18 +292,19 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]*backendHea
291292
if be == nil && health.status != StatusCannotConnect {
292293
router.logger.Info("update backend", zap.String("backend_addr", addr),
293294
zap.String("prev", "none"), zap.String("cur", health.String()))
294-
be = router.backends.PushBack(&backendWrapper{
295-
backendHealth: health,
296-
addr: addr,
297-
connList: glist.New[*connWrapper](),
298-
})
299-
router.adjustBackendList(be)
295+
backend := &backendWrapper{
296+
addr: addr,
297+
connList: glist.New[*connWrapper](),
298+
}
299+
backend.setHealth(*health)
300+
be = router.backends.PushBack(backend)
301+
router.adjustBackendList(be, false)
300302
} else if be != nil {
301303
backend := be.Value
302304
router.logger.Info("update backend", zap.String("backend_addr", addr),
303-
zap.String("prev", backend.String()), zap.String("cur", health.String()))
304-
backend.backendHealth = health
305-
router.adjustBackendList(be)
305+
zap.String("prev", backend.mu.String()), zap.String("cur", health.String()))
306+
backend.setHealth(*health)
307+
router.adjustBackendList(be, true)
306308
for ele := backend.connList.Front(); ele != nil; ele = ele.Next() {
307309
conn := ele.Value
308310
conn.NotifyBackendStatus(health.status)
@@ -371,20 +373,20 @@ func (router *ScoreBasedRouter) rebalance(maxNum int) {
371373
zap.String("from", busiestBackend.addr), zap.String("to", idlestBackend.addr),
372374
zap.Int("from_score", busiestBackend.score()), zap.Int("to_score", idlestBackend.score()))
373375
busiestBackend.connScore--
374-
router.adjustBackendList(busiestEle)
376+
router.adjustBackendList(busiestEle, true)
375377
idlestBackend.connScore++
376-
router.adjustBackendList(idlestEle)
378+
router.adjustBackendList(idlestEle, false)
377379
conn.phase = phaseRedirectNotify
378380
conn.lastRedirect = curTime
379-
conn.Redirect(idlestBackend.addr)
381+
conn.Redirect(idlestBackend)
380382
}
381383
}
382384

383385
func (router *ScoreBasedRouter) removeBackendIfEmpty(be *glist.Element[*backendWrapper]) bool {
384386
backend := be.Value
385387
// If connList.Len() == 0, there won't be any outgoing connections.
386388
// And if also connScore == 0, there won't be any incoming connections.
387-
if backend.status == StatusCannotConnect && backend.connList.Len() == 0 && backend.connScore <= 0 {
389+
if backend.Status() == StatusCannotConnect && backend.connList.Len() == 0 && backend.connScore <= 0 {
388390
router.backends.Remove(be)
389391
return true
390392
}
@@ -406,9 +408,12 @@ func (router *ScoreBasedRouter) ConnCount() int {
406408
func (router *ScoreBasedRouter) updateServerVersion() {
407409
for be := router.backends.Front(); be != nil; be = be.Next() {
408410
backend := be.Value
409-
if backend.backendHealth.status != StatusCannotConnect && len(backend.serverVersion) > 0 {
410-
router.serverVersion = backend.serverVersion
411-
return
411+
if backend.Status() != StatusCannotConnect {
412+
serverVersion := backend.ServerVersion()
413+
if len(serverVersion) > 0 {
414+
router.serverVersion = serverVersion
415+
return
416+
}
412417
}
413418
}
414419
}

0 commit comments

Comments
 (0)