@@ -61,23 +61,37 @@ func (c *Client) reconnectUntilClosed(ctx context.Context) error {
61
61
// Note that we currently have no timeout on connection, so this is
62
62
// potentially eternal.
63
63
b := tdsync .SyncBackoff (backoff .WithContext (c .connBackoff (), ctx ))
64
-
65
- return backoff .RetryNotify (func () error {
66
- if err := c .runUntilRestart (ctx ); err != nil {
67
- if c .isPermanentError (err ) {
68
- return backoff .Permanent (err )
64
+ g := tdsync .NewCancellableGroup (ctx )
65
+ g .Go (func (ctx context.Context ) error {
66
+ for {
67
+ select {
68
+ case <- ctx .Done ():
69
+ return ctx .Err ()
70
+ case <- c .ready .Ready ():
71
+ // Reset backoff on successful connection.
72
+ b .Reset ()
69
73
}
70
- return err
71
74
}
75
+ })
76
+ g .Go (func (ctx context.Context ) error {
77
+ return backoff .RetryNotify (func () error {
78
+ if err := c .runUntilRestart (ctx ); err != nil {
79
+ if c .isPermanentError (err ) {
80
+ return backoff .Permanent (err )
81
+ }
82
+ return err
83
+ }
72
84
73
- return nil
74
- }, b , func (err error , timeout time.Duration ) {
75
- c .log .Info ("Restarting connection" , zap .Error (err ), zap .Duration ("backoff" , timeout ))
85
+ return nil
86
+ }, b , func (err error , timeout time.Duration ) {
87
+ c .log .Info ("Restarting connection" , zap .Error (err ), zap .Duration ("backoff" , timeout ))
76
88
77
- c .connMux .Lock ()
78
- c .conn = c .createPrimaryConn (nil )
79
- c .connMux .Unlock ()
89
+ c .connMux .Lock ()
90
+ c .conn = c .createPrimaryConn (nil )
91
+ c .connMux .Unlock ()
92
+ })
80
93
})
94
+ return g .Wait ()
81
95
}
82
96
83
97
func (c * Client ) onReady () {
0 commit comments