From 83e3327c531d9105899e4fd54675d355e29f19da Mon Sep 17 00:00:00 2001 From: Martin Hrabovcin Date: Tue, 30 Jan 2018 11:52:00 +0100 Subject: [PATCH] TestRecurringReAuthHang - fix race conditions in test --- zk/conn.go | 82 ++++++++++++++++++++++++++++++++++++------------- zk/conn_test.go | 50 +++++++++++++++++------------- 2 files changed, 89 insertions(+), 43 deletions(-) diff --git a/zk/conn.go b/zk/conn.go index f79a51b3..6b59e0b5 100644 --- a/zk/conn.go +++ b/zk/conn.go @@ -101,8 +101,12 @@ type Conn struct { reconnectLatch chan struct{} setWatchLimit int setWatchCallback func([]*setWatchesRequest) - // Debug (for recurring re-auth hang) - debugCloseRecvLoop bool + + // Debug (for recurring re-auth hang) test + // These variables shouldn't be used or modified as part of normal + // operation. + // See `TestRecurringReAuthHang` + debugCloseRecvLoop int32 debugReauthDone chan struct{} logger Logger @@ -192,20 +196,21 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti ec := make(chan Event, eventChanSize) conn := &Conn{ - dialer: net.DialTimeout, - hostProvider: &DNSHostProvider{}, - conn: nil, - state: StateDisconnected, - eventChan: ec, - shouldQuit: make(chan struct{}), - connectTimeout: 1 * time.Second, - sendChan: make(chan *request, sendChanSize), - requests: make(map[int32]*request), - watchers: make(map[watchPathType][]chan Event), - passwd: emptyPassword, - logger: DefaultLogger, - logInfo: true, // default is true for backwards compatability - buf: make([]byte, bufferSize), + dialer: net.DialTimeout, + hostProvider: &DNSHostProvider{}, + conn: nil, + state: StateDisconnected, + eventChan: ec, + shouldQuit: make(chan struct{}), + connectTimeout: 1 * time.Second, + sendChan: make(chan *request, sendChanSize), + requests: make(map[int32]*request), + watchers: make(map[watchPathType][]chan Event), + passwd: emptyPassword, + logger: DefaultLogger, + logInfo: true, // default is true for backwards compatability + buf: make([]byte, bufferSize), + debugReauthDone: make(chan struct{}), } // Set provided options. @@ -300,7 +305,7 @@ func WithMaxBufferSize(maxBufferSize int) connOption { } // WithMaxConnBufferSize sets maximum buffer size used to send and encode -// packets to Zookeeper server. The standard Zookeepeer client for java defaults +// packets to Zookeeper server. The standard Zookeeper client for java defaults // to a limit of 1mb. This option should be used for non-standard server setup // where znode is bigger than default 1mb. func WithMaxConnBufferSize(maxBufferSize int) connOption { @@ -328,6 +333,21 @@ func (c *Conn) SessionID() int64 { return atomic.LoadInt64(&c.sessionID) } +func (c *Conn) shouldDebugCloseRecvLoop() bool { + return (atomic.LoadInt32(&c.debugCloseRecvLoop) == 1) +} + +func (c *Conn) setDebugCloseRecvLoop(v bool) { + var store int32 + if v { + store = 1 + } else { + store = 0 + } + + atomic.StoreInt32(&c.debugCloseRecvLoop, store) +} + // SetLogger sets the logger to be used for printing errors. // Logger is an interface provided by this package. func (c *Conn) SetLogger(l Logger) { @@ -415,7 +435,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) { for _, cred := range c.creds { if shouldCancel() { - c.logger.Printf("Cancel rer-submitting credentials") + c.logger.Printf("Cancel re-submitting credentials") return } resChan, err := c.sendRequest( @@ -437,7 +457,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) { select { case res = <-resChan: case <-c.closeChan: - c.logger.Printf("Recv closed, cancel re-submitting credentials") + c.logger.Printf("Recv closed, cancel re-submitting credentials, cannot read") return case <-c.shouldQuit: c.logger.Printf("Should quit, cancel re-submitting credentials") @@ -503,8 +523,22 @@ func (c *Conn) loop() { wg.Add(1) go func() { <-reauthChan - if c.debugCloseRecvLoop { - close(c.debugReauthDone) + // This condition exists for signaling purposes, that the test + // `TestRecurringReAuthHang` was successful. The previous call + // `<-reauthChan` did not block. That means the + // `resendZkAuth` didn't block even on read loop error. + // See `TestRecurringReAuthHang` + if c.shouldDebugCloseRecvLoop() { + // It is possible that during the test the ZK conn will try + // to reconnect multiple times before cleanly closing the + // test. This select here is to prevent closing + // `c.debugReauthDone` channel twice during the test and + // panic. + select { + case <-c.debugReauthDone: + default: + close(c.debugReauthDone) + } } err := c.sendLoop() if err != nil || c.logInfo { @@ -517,7 +551,11 @@ func (c *Conn) loop() { wg.Add(1) go func() { var err error - if c.debugCloseRecvLoop { + // For purposes of testing recurring resendZkAuth we'll + // simulate error on read loop, which should force whole + // IO loop to close. + // See `TestRecurringReAuthHang` + if c.shouldDebugCloseRecvLoop() { err = errors.New("DEBUG: close recv loop") } else { err = c.recvLoop(c.conn) diff --git a/zk/conn_test.go b/zk/conn_test.go index 94206d95..ad03f3d2 100644 --- a/zk/conn_test.go +++ b/zk/conn_test.go @@ -1,39 +1,35 @@ package zk import ( + "context" "io/ioutil" "testing" "time" ) func TestRecurringReAuthHang(t *testing.T) { - t.Skip("Race condition in test") - - sessionTimeout := 2 * time.Second - - finish := make(chan struct{}) - defer close(finish) - go func() { - select { - case <-finish: - return - case <-time.After(5 * sessionTimeout): - panic("expected not hang") - } - }() - - zkC, err := StartTestCluster(2, ioutil.Discard, ioutil.Discard) + zkC, err := StartTestCluster(3, ioutil.Discard, ioutil.Discard) if err != nil { - panic(err) + t.Fatal(err) } defer zkC.Stop() conn, evtC, err := zkC.ConnectAll() if err != nil { - panic(err) + t.Fatal(err) } + + ctx, cancel := context.WithDeadline( + context.Background(), time.Now().Add(5*time.Second)) + defer cancel() for conn.State() != StateHasSession { time.Sleep(50 * time.Millisecond) + + select { + case <-ctx.Done(): + t.Fatal("Failed to connect to ZK") + default: + } } go func() { @@ -42,16 +38,28 @@ func TestRecurringReAuthHang(t *testing.T) { }() // Add auth. - conn.AddAuth("digest", []byte("test:test")) + conn.credsMu.Lock() + conn.creds = append(conn.creds, authCreds{"digest", []byte("test:test")}) + conn.credsMu.Unlock() currentServer := conn.Server() - conn.debugCloseRecvLoop = true - conn.debugReauthDone = make(chan struct{}) + conn.setDebugCloseRecvLoop(true) zkC.StopServer(currentServer) + // wait connect to new zookeeper. + ctx, cancel = context.WithDeadline( + context.Background(), time.Now().Add(5*time.Second)) + defer cancel() for conn.Server() == currentServer && conn.State() != StateHasSession { time.Sleep(100 * time.Millisecond) + + select { + case <-ctx.Done(): + t.Fatal("Failed to reconnect ZK next server") + default: + } } <-conn.debugReauthDone + conn.Close() }