Skip to content

Commit e81b569

Browse files
authored
Add and use connectivity package for states (grpc#1430)
* Add and use connectivity package * Mark cc state APIs as experimental
1 parent 73041be commit e81b569

File tree

4 files changed

+122
-79
lines changed

4 files changed

+122
-79
lines changed

clientconn.go

Lines changed: 41 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ package grpc
2020

2121
import (
2222
"errors"
23-
"fmt"
2423
"net"
2524
"strings"
2625
"sync"
2726
"time"
2827

2928
"golang.org/x/net/context"
3029
"golang.org/x/net/trace"
30+
"google.golang.org/grpc/connectivity"
3131
"google.golang.org/grpc/credentials"
3232
"google.golang.org/grpc/grpclog"
3333
"google.golang.org/grpc/keepalive"
@@ -445,39 +445,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
445445
return cc, nil
446446
}
447447

448-
// ConnectivityState indicates the state of a client connection.
449-
type ConnectivityState int
450-
451-
const (
452-
// Idle indicates the ClientConn is idle.
453-
Idle ConnectivityState = iota
454-
// Connecting indicates the ClienConn is connecting.
455-
Connecting
456-
// Ready indicates the ClientConn is ready for work.
457-
Ready
458-
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
459-
TransientFailure
460-
// Shutdown indicates the ClientConn has started shutting down.
461-
Shutdown
462-
)
463-
464-
func (s ConnectivityState) String() string {
465-
switch s {
466-
case Idle:
467-
return "IDLE"
468-
case Connecting:
469-
return "CONNECTING"
470-
case Ready:
471-
return "READY"
472-
case TransientFailure:
473-
return "TRANSIENT_FAILURE"
474-
case Shutdown:
475-
return "SHUTDOWN"
476-
default:
477-
panic(fmt.Sprintf("unknown connectivity state: %d", s))
478-
}
479-
}
480-
481448
// connectivityStateEvaluator gets updated by addrConns when their
482449
// states transition, based on which it evaluates the state of
483450
// ClientConn.
@@ -492,55 +459,55 @@ type connectivityStateEvaluator struct {
492459

493460
// recordTransition records state change happening in every addrConn and based on
494461
// that it evaluates what state the ClientConn is in.
495-
// It can only transition between Ready, Connecting and TransientFailure. Other states,
496-
// Idle and Shutdown are transitioned into by ClientConn; in the begining of the connection
462+
// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
463+
// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
497464
// before any addrConn is created ClientConn is in idle state. In the end when ClientConn
498-
// closes it is in Shutdown state.
465+
// closes it is in connectivity.Shutdown state.
499466
// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
500-
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState ConnectivityState) {
467+
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
501468
cse.mu.Lock()
502469
defer cse.mu.Unlock()
503470

504471
// Update counters.
505-
for idx, state := range []ConnectivityState{oldState, newState} {
472+
for idx, state := range []connectivity.State{oldState, newState} {
506473
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
507474
switch state {
508-
case Ready:
475+
case connectivity.Ready:
509476
cse.numReady += updateVal
510-
case Connecting:
477+
case connectivity.Connecting:
511478
cse.numConnecting += updateVal
512-
case TransientFailure:
479+
case connectivity.TransientFailure:
513480
cse.numTransientFailure += updateVal
514481
}
515482
}
516483

517484
// Evaluate.
518485
if cse.numReady > 0 {
519-
cse.csMgr.updateState(Ready)
486+
cse.csMgr.updateState(connectivity.Ready)
520487
return
521488
}
522489
if cse.numConnecting > 0 {
523-
cse.csMgr.updateState(Connecting)
490+
cse.csMgr.updateState(connectivity.Connecting)
524491
return
525492
}
526-
cse.csMgr.updateState(TransientFailure)
493+
cse.csMgr.updateState(connectivity.TransientFailure)
527494
}
528495

529-
// connectivityStateManager keeps the ConnectivityState of ClientConn.
496+
// connectivityStateManager keeps the connectivity.State of ClientConn.
530497
// This struct will eventually be exported so the balancers can access it.
531498
type connectivityStateManager struct {
532499
mu sync.Mutex
533-
state ConnectivityState
500+
state connectivity.State
534501
notifyChan chan struct{}
535502
}
536503

537-
// updateState updates the ConnectivityState of ClientConn.
504+
// updateState updates the connectivity.State of ClientConn.
538505
// If there's a change it notifies goroutines waiting on state change to
539506
// happen.
540-
func (csm *connectivityStateManager) updateState(state ConnectivityState) {
507+
func (csm *connectivityStateManager) updateState(state connectivity.State) {
541508
csm.mu.Lock()
542509
defer csm.mu.Unlock()
543-
if csm.state == Shutdown {
510+
if csm.state == connectivity.Shutdown {
544511
return
545512
}
546513
if csm.state == state {
@@ -554,7 +521,7 @@ func (csm *connectivityStateManager) updateState(state ConnectivityState) {
554521
}
555522
}
556523

557-
func (csm *connectivityStateManager) getState() ConnectivityState {
524+
func (csm *connectivityStateManager) getState() connectivity.State {
558525
csm.mu.Lock()
559526
defer csm.mu.Unlock()
560527
return csm.state
@@ -587,9 +554,10 @@ type ClientConn struct {
587554
mkp keepalive.ClientParameters
588555
}
589556

590-
// WaitForStateChange waits until the ConnectivityState of ClientConn changes from sourceState or
557+
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
591558
// ctx expires. A true value is returned in former case and false in latter.
592-
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) bool {
559+
// This is an EXPERIMENTAL API.
560+
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
593561
ch := cc.csMgr.getNotifyChan()
594562
if cc.csMgr.getState() != sourceState {
595563
return true
@@ -602,8 +570,9 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState Connec
602570
}
603571
}
604572

605-
// GetState returns the ConnectivityState of ClientConn.
606-
func (cc *ClientConn) GetState() ConnectivityState {
573+
// GetState returns the connectivity.State of ClientConn.
574+
// This is an EXPERIMENTAL API.
575+
func (cc *ClientConn) GetState() connectivity.State {
607576
return cc.csMgr.getState()
608577
}
609578

@@ -855,7 +824,7 @@ func (cc *ClientConn) Close() error {
855824
}
856825
conns := cc.conns
857826
cc.conns = nil
858-
cc.csMgr.updateState(Shutdown)
827+
cc.csMgr.updateState(connectivity.Shutdown)
859828
cc.mu.Unlock()
860829
if cc.dopts.balancer != nil {
861830
cc.dopts.balancer.Close()
@@ -879,7 +848,7 @@ type addrConn struct {
879848
csEvltr *connectivityStateEvaluator
880849

881850
mu sync.Mutex
882-
state ConnectivityState
851+
state connectivity.State
883852
down func(error) // the handler called when a connection is down.
884853
// ready is closed and becomes nil when a new transport is up or failed
885854
// due to timeout.
@@ -926,7 +895,7 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
926895
// - otherwise, it will be closed.
927896
func (ac *addrConn) resetTransport(drain bool) error {
928897
ac.mu.Lock()
929-
if ac.state == Shutdown {
898+
if ac.state == connectivity.Shutdown {
930899
ac.mu.Unlock()
931900
return errConnClosing
932901
}
@@ -936,7 +905,7 @@ func (ac *addrConn) resetTransport(drain bool) error {
936905
ac.down = nil
937906
}
938907
oldState := ac.state
939-
ac.state = Connecting
908+
ac.state = connectivity.Connecting
940909
ac.csEvltr.recordTransition(oldState, ac.state)
941910
t := ac.transport
942911
ac.transport = nil
@@ -949,7 +918,7 @@ func (ac *addrConn) resetTransport(drain bool) error {
949918
ac.cc.mu.RUnlock()
950919
for retries := 0; ; retries++ {
951920
ac.mu.Lock()
952-
if ac.state == Shutdown {
921+
if ac.state == connectivity.Shutdown {
953922
// ac.tearDown(...) has been invoked.
954923
ac.mu.Unlock()
955924
return errConnClosing
@@ -977,14 +946,14 @@ func (ac *addrConn) resetTransport(drain bool) error {
977946
}
978947
grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
979948
ac.mu.Lock()
980-
if ac.state == Shutdown {
949+
if ac.state == connectivity.Shutdown {
981950
// ac.tearDown(...) has been invoked.
982951
ac.mu.Unlock()
983952
return errConnClosing
984953
}
985954
ac.errorf("transient failure: %v", err)
986955
oldState = ac.state
987-
ac.state = TransientFailure
956+
ac.state = connectivity.TransientFailure
988957
ac.csEvltr.recordTransition(oldState, ac.state)
989958
if ac.ready != nil {
990959
close(ac.ready)
@@ -1003,14 +972,14 @@ func (ac *addrConn) resetTransport(drain bool) error {
1003972
}
1004973
ac.mu.Lock()
1005974
ac.printf("ready")
1006-
if ac.state == Shutdown {
975+
if ac.state == connectivity.Shutdown {
1007976
// ac.tearDown(...) has been invoked.
1008977
ac.mu.Unlock()
1009978
newTransport.Close()
1010979
return errConnClosing
1011980
}
1012981
oldState = ac.state
1013-
ac.state = Ready
982+
ac.state = connectivity.Ready
1014983
ac.csEvltr.recordTransition(oldState, ac.state)
1015984
ac.transport = newTransport
1016985
if ac.ready != nil {
@@ -1081,13 +1050,13 @@ func (ac *addrConn) transportMonitor() {
10811050
default:
10821051
}
10831052
ac.mu.Lock()
1084-
if ac.state == Shutdown {
1053+
if ac.state == connectivity.Shutdown {
10851054
// ac has been shutdown.
10861055
ac.mu.Unlock()
10871056
return
10881057
}
10891058
oldState := ac.state
1090-
ac.state = TransientFailure
1059+
ac.state = connectivity.TransientFailure
10911060
ac.csEvltr.recordTransition(oldState, ac.state)
10921061
ac.mu.Unlock()
10931062
if err := ac.resetTransport(false); err != nil {
@@ -1107,12 +1076,12 @@ func (ac *addrConn) transportMonitor() {
11071076
}
11081077

11091078
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
1110-
// iv) transport is in TransientFailure and there is a balancer/failfast is true.
1079+
// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
11111080
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
11121081
for {
11131082
ac.mu.Lock()
11141083
switch {
1115-
case ac.state == Shutdown:
1084+
case ac.state == connectivity.Shutdown:
11161085
if failfast || !hasBalancer {
11171086
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
11181087
err := ac.tearDownErr
@@ -1121,11 +1090,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
11211090
}
11221091
ac.mu.Unlock()
11231092
return nil, errConnClosing
1124-
case ac.state == Ready:
1093+
case ac.state == connectivity.Ready:
11251094
ct := ac.transport
11261095
ac.mu.Unlock()
11271096
return ct, nil
1128-
case ac.state == TransientFailure:
1097+
case ac.state == connectivity.TransientFailure:
11291098
if failfast || hasBalancer {
11301099
ac.mu.Unlock()
11311100
return nil, errConnUnavailable
@@ -1167,11 +1136,11 @@ func (ac *addrConn) tearDown(err error) {
11671136
// address removal and GoAway.
11681137
ac.transport.GracefulClose()
11691138
}
1170-
if ac.state == Shutdown {
1139+
if ac.state == connectivity.Shutdown {
11711140
return
11721141
}
11731142
oldState := ac.state
1174-
ac.state = Shutdown
1143+
ac.state = connectivity.Shutdown
11751144
ac.tearDownErr = err
11761145
ac.csEvltr.recordTransition(oldState, ac.state)
11771146
if ac.events != nil {

clientconn_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,17 @@ import (
2626

2727
"golang.org/x/net/context"
2828

29+
"google.golang.org/grpc/connectivity"
2930
"google.golang.org/grpc/credentials"
3031
"google.golang.org/grpc/keepalive"
3132
"google.golang.org/grpc/naming"
3233
"google.golang.org/grpc/testdata"
3334
)
3435

35-
func assertState(wantState ConnectivityState, cc *ClientConn) (ConnectivityState, bool) {
36+
func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.State, bool) {
3637
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
3738
defer cancel()
38-
var state ConnectivityState
39+
var state connectivity.State
3940
for state = cc.GetState(); state != wantState && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
4041
}
4142
return state, state == wantState
@@ -54,7 +55,7 @@ func TestConnectivityStates(t *testing.T) {
5455
t.Fatalf("Dial(\"foo.bar.com\", WithBalancer(_)) = _, %v, want _ <nil>", err)
5556
}
5657
defer cc.Close()
57-
wantState := Ready
58+
wantState := connectivity.Ready
5859
if state, ok := assertState(wantState, cc); !ok {
5960
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
6061
}
@@ -66,7 +67,7 @@ func TestConnectivityStates(t *testing.T) {
6667
},
6768
}
6869
resolver.w.inject(update)
69-
wantState = TransientFailure
70+
wantState = connectivity.TransientFailure
7071
if state, ok := assertState(wantState, cc); !ok {
7172
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
7273
}
@@ -75,7 +76,7 @@ func TestConnectivityStates(t *testing.T) {
7576
Addr: "localhost:" + servers[1].port,
7677
}
7778
resolver.w.inject(update)
78-
wantState = Ready
79+
wantState = connectivity.Ready
7980
if state, ok := assertState(wantState, cc); !ok {
8081
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
8182
}

0 commit comments

Comments
 (0)