Skip to content

Commit 4e1bc06

Browse files
authored
Merge pull request #191 from rjeczalik/renewer-fix
tokenrenewer: improve fix for a panic #187
2 parents 077dc6d + 200f141 commit 4e1bc06

File tree

6 files changed

+75
-29
lines changed

6 files changed

+75
-29
lines changed

.travis.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
language: go
22
sudo: false
33
go:
4-
- 1.4.3
5-
- 1.6.3
6-
- 1.7.1
4+
- 1.7.4
5+
- tip
76
install:
87
- go get -d -v -t ./...
98
script:

client.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -564,51 +564,55 @@ func (c *Client) OnTokenRenew(handler func(token string)) {
564564
// callOnConnectHandlers runs the registered connect handlers.
565565
func (c *Client) callOnConnectHandlers() {
566566
c.m.RLock()
567+
defer c.m.RUnlock()
568+
567569
for _, handler := range c.onConnectHandlers {
568570
func() {
569-
defer recover()
571+
defer nopRecover()
570572
handler()
571573
}()
572574
}
573-
c.m.RUnlock()
574575
}
575576

576577
// callOnDisconnectHandlers runs the registered disconnect handlers.
577578
func (c *Client) callOnDisconnectHandlers() {
578579
c.m.RLock()
580+
defer c.m.RUnlock()
581+
579582
for _, handler := range c.onDisconnectHandlers {
580583
func() {
581-
defer recover()
584+
defer nopRecover()
582585
handler()
583586
}()
584587
}
585-
c.m.RUnlock()
586588
}
587589

588590
// callOnTokenExpireHandlers calls registered functions when an error
589591
// from remote kite is received that token used is expired.
590592
func (c *Client) callOnTokenExpireHandlers() {
591593
c.m.RLock()
594+
defer c.m.RUnlock()
595+
592596
for _, handler := range c.onTokenExpireHandlers {
593597
func() {
594-
defer recover()
598+
defer nopRecover()
595599
handler()
596600
}()
597601
}
598-
c.m.RUnlock()
599602
}
600603

601604
// callOnTokenRenewHandlers calls all registered functions when
602605
// we successfully obtain new token from kontrol.
603606
func (c *Client) callOnTokenRenewHandlers(token string) {
604607
c.m.RLock()
608+
defer c.m.RUnlock()
609+
605610
for _, handler := range c.onTokenRenewHandlers {
606611
func() {
607-
defer recover()
612+
defer nopRecover()
608613
handler(token)
609614
}()
610615
}
611-
c.m.RUnlock()
612616
}
613617

614618
func (c *Client) wrapMethodArgs(args []interface{}, responseCallback dnode.Function) []interface{} {

handlers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"net/http"
66
"net/url"
7+
"os"
78
"os/exec"
89
"runtime"
910

@@ -66,7 +67,7 @@ func handlePrompt(r *Request) (interface{}, error) {
6667
// handleGetPass reads a line of input from a terminal without local echo.
6768
func handleGetPass(r *Request) (interface{}, error) {
6869
fmt.Print(r.Args.One().MustString())
69-
data, err := terminal.ReadPassword(0) // stdin
70+
data, err := terminal.ReadPassword(int(os.Stdin.Fd())) // stdin
7071
fmt.Println()
7172
if err != nil {
7273
return nil, err

kite.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,10 @@ func (k *Kite) callOnConnectHandlers(c *Client) {
324324
defer k.handlersMu.RUnlock()
325325

326326
for _, handler := range k.onConnectHandlers {
327-
handler(c)
327+
func() {
328+
defer nopRecover()
329+
handler(c)
330+
}()
328331
}
329332
}
330333

@@ -333,7 +336,10 @@ func (k *Kite) callOnFirstRequestHandlers(c *Client) {
333336
defer k.handlersMu.RUnlock()
334337

335338
for _, handler := range k.onFirstRequestHandlers {
336-
handler(c)
339+
func() {
340+
defer nopRecover()
341+
handler(c)
342+
}()
337343
}
338344
}
339345

@@ -342,7 +348,10 @@ func (k *Kite) callOnDisconnectHandlers(c *Client) {
342348
defer k.handlersMu.RUnlock()
343349

344350
for _, handler := range k.onDisconnectHandlers {
345-
handler(c)
351+
func() {
352+
defer nopRecover()
353+
handler(c)
354+
}()
346355
}
347356
}
348357

@@ -351,7 +360,10 @@ func (k *Kite) callOnRegisterHandlers(r *protocol.RegisterResult) {
351360
defer k.handlersMu.RUnlock()
352361

353362
for _, handler := range k.onRegisterHandlers {
354-
handler(r)
363+
func() {
364+
defer nopRecover()
365+
handler(r)
366+
}()
355367
}
356368
}
357369

@@ -521,3 +533,5 @@ func (fn closerFunc) Close() error {
521533

522534
return nil
523535
}
536+
537+
func nopRecover() { recover() }

kite_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/koding/kite/config"
1515
"github.com/koding/kite/dnode"
16+
"github.com/koding/kite/protocol"
1617
"github.com/koding/kite/sockjsclient"
1718
_ "github.com/koding/kite/testutil"
1819

@@ -23,6 +24,14 @@ func init() {
2324
rand.Seed(time.Now().Unix() + int64(os.Getpid()))
2425
}
2526

27+
func panicHandler(*Client) {
28+
panic("this panic should be ignored")
29+
}
30+
31+
func panicRegisterHandler(*protocol.RegisterResult) {
32+
panic("this panic should be ignored")
33+
}
34+
2635
func TestMultiple(t *testing.T) {
2736
testDuration := time.Second * 10
2837

@@ -51,6 +60,11 @@ func TestMultiple(t *testing.T) {
5160
m.Config.Transport = transport
5261
m.Config.Port = port + i
5362

63+
m.OnConnect(panicHandler)
64+
m.OnRegister(panicRegisterHandler)
65+
m.OnDisconnect(panicHandler)
66+
m.OnFirstRequest(panicHandler)
67+
5468
m.HandleFunc("square", Square)
5569
go m.Run()
5670
<-m.ServerReadyNotify()

tokenrenewer.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package kite
22

33
import (
44
"fmt"
5-
"sync/atomic"
5+
"strings"
6+
"sync"
67
"time"
78

89
jwt "github.com/dgrijalva/jwt-go"
@@ -22,15 +23,16 @@ type TokenRenewer struct {
2223
validUntil time.Time
2324
signalRenewToken chan struct{}
2425
disconnect chan struct{}
25-
active uint32
26+
once sync.Once // for c.installHandlers
27+
renewLoopWG sync.WaitGroup
2628
}
2729

2830
func NewTokenRenewer(r *Client, k *Kite) (*TokenRenewer, error) {
2931
t := &TokenRenewer{
3032
client: r,
3133
localKite: k,
32-
signalRenewToken: make(chan struct{}, 1),
33-
disconnect: make(chan struct{}, 1),
34+
signalRenewToken: make(chan struct{}),
35+
disconnect: make(chan struct{}),
3436
}
3537
return t, t.parse(r.Auth.Key)
3638
}
@@ -60,31 +62,35 @@ func (t *TokenRenewer) parse(tokenString string) error {
6062

6163
// RenewWhenExpires renews the token before it expires.
6264
func (t *TokenRenewer) RenewWhenExpires() {
63-
if atomic.CompareAndSwapUint32(&t.active, 0, 1) {
64-
t.client.OnConnect(t.startRenewLoop)
65-
t.client.OnTokenExpire(t.sendRenewTokenSignal)
66-
t.client.OnDisconnect(t.sendDisconnectSignal)
67-
}
65+
t.once.Do(t.installHandlers)
66+
}
67+
68+
func (t *TokenRenewer) installHandlers() {
69+
t.client.OnConnect(t.startRenewLoop)
70+
t.client.OnTokenExpire(t.sendRenewTokenSignal)
71+
t.client.OnDisconnect(t.sendDisconnectSignal)
6872
}
6973

7074
func (t *TokenRenewer) renewLoop() {
75+
t.renewLoopWG.Add(1)
76+
defer t.renewLoopWG.Done()
77+
7178
// renews token before it expires (sends the first signal to the goroutine below)
7279
go time.AfterFunc(t.renewDuration(), t.sendRenewTokenSignal)
7380

7481
// renew token on signal util remote kite disconnects.
7582
for {
7683
select {
7784
case <-t.signalRenewToken:
78-
switch err := t.renewToken(); err {
79-
case nil:
85+
switch err := t.renewToken(); {
86+
case err == nil:
8087
go time.AfterFunc(t.renewDuration(), t.sendRenewTokenSignal)
81-
case ErrNoKitesAvailable:
88+
case err == ErrNoKitesAvailable || strings.Contains(err.Error(), "no kites found"):
8289
// If kite went down we're not going to renew the token,
8390
// as we need to dial either way.
8491
//
8592
// This case handles a situation, when kite missed
8693
// disconnect signal (observed to happen with XHR transport).
87-
return
8894
default:
8995
t.localKite.Log.Error("token renewer: %s Cannot renew token for Kite: %s I will retry in %d seconds...",
9096
err, t.client.ID, retryInterval/time.Second)
@@ -107,6 +113,14 @@ func (t *TokenRenewer) renewDuration() time.Duration {
107113
}
108114

109115
func (t *TokenRenewer) startRenewLoop() {
116+
// In case when t.client missed a disconnect signal (e.g. due to timeout observed
117+
// by the remote end), previous renewLoop will be still running.
118+
t.sendDisconnectSignal()
119+
120+
// if we don't wait to observe previous renewLoop goroutine handle the disconnect
121+
// signal, we'd have a race resulting in new renewLoop goroutine handling it.
122+
t.renewLoopWG.Wait()
123+
110124
go t.renewLoop()
111125
}
112126

0 commit comments

Comments
 (0)