Skip to content
This repository has been archived by the owner on Jul 21, 2021. It is now read-only.

TestRecurringReAuthHang - fix race conditions in test #186

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 60 additions & 22 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ type Conn struct {
reconnectLatch chan struct{}
setWatchLimit int
setWatchCallback func([]*setWatchesRequest)
// Debug (for recurring re-auth hang)
debugCloseRecvLoop bool

// Debug hooks for the recurring re-auth hang test.
// These variables must not be used or modified as part of normal
// operation.
// See `TestRecurringReAuthHang`
debugCloseRecvLoop int32
debugReauthDone chan struct{}

logger Logger
Expand Down Expand Up @@ -192,20 +196,21 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti

ec := make(chan Event, eventChanSize)
conn := &Conn{
dialer: net.DialTimeout,
hostProvider: &DNSHostProvider{},
conn: nil,
state: StateDisconnected,
eventChan: ec,
shouldQuit: make(chan struct{}),
connectTimeout: 1 * time.Second,
sendChan: make(chan *request, sendChanSize),
requests: make(map[int32]*request),
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
dialer: net.DialTimeout,
hostProvider: &DNSHostProvider{},
conn: nil,
state: StateDisconnected,
eventChan: ec,
shouldQuit: make(chan struct{}),
connectTimeout: 1 * time.Second,
sendChan: make(chan *request, sendChanSize),
requests: make(map[int32]*request),
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
debugReauthDone: make(chan struct{}),
}

// Set provided options.
Expand Down Expand Up @@ -300,7 +305,7 @@ func WithMaxBufferSize(maxBufferSize int) connOption {
}

// WithMaxConnBufferSize sets maximum buffer size used to send and encode
// packets to Zookeeper server. The standard Zookeepeer client for java defaults
// packets to Zookeeper server. The standard Zookeeper client for java defaults
// to a limit of 1mb. This option should be used for non-standard server setup
// where znode is bigger than default 1mb.
func WithMaxConnBufferSize(maxBufferSize int) connOption {
Expand Down Expand Up @@ -328,6 +333,21 @@ func (c *Conn) SessionID() int64 {
return atomic.LoadInt64(&c.sessionID)
}

// shouldDebugCloseRecvLoop reports whether the debugCloseRecvLoop flag
// currently holds 1. The flag is read atomically.
func (c *Conn) shouldDebugCloseRecvLoop() bool {
	flag := atomic.LoadInt32(&c.debugCloseRecvLoop)
	return flag == 1
}

// setDebugCloseRecvLoop atomically stores 1 in debugCloseRecvLoop when v is
// true and 0 when v is false.
func (c *Conn) setDebugCloseRecvLoop(v bool) {
	if v {
		atomic.StoreInt32(&c.debugCloseRecvLoop, 1)
		return
	}
	atomic.StoreInt32(&c.debugCloseRecvLoop, 0)
}

// SetLogger sets the logger to be used for printing errors.
// Logger is an interface provided by this package.
func (c *Conn) SetLogger(l Logger) {
Expand Down Expand Up @@ -415,7 +435,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {

for _, cred := range c.creds {
if shouldCancel() {
c.logger.Printf("Cancel rer-submitting credentials")
c.logger.Printf("Cancel re-submitting credentials")
return
}
resChan, err := c.sendRequest(
Expand All @@ -437,7 +457,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
select {
case res = <-resChan:
case <-c.closeChan:
c.logger.Printf("Recv closed, cancel re-submitting credentials")
c.logger.Printf("Recv closed, cancel re-submitting credentials, cannot read")
return
case <-c.shouldQuit:
c.logger.Printf("Should quit, cancel re-submitting credentials")
Expand Down Expand Up @@ -503,8 +523,22 @@ func (c *Conn) loop() {
wg.Add(1)
go func() {
<-reauthChan
if c.debugCloseRecvLoop {
close(c.debugReauthDone)
// This condition exists only to signal that the test
// `TestRecurringReAuthHang` succeeded: reaching this point means the
// preceding `<-reauthChan` receive did not block, i.e. `resendZkAuth`
// did not block even though the read loop returned an error.
// See `TestRecurringReAuthHang`
if c.shouldDebugCloseRecvLoop() {
// During the test the ZK conn may try to reconnect multiple times
// before the test closes it cleanly. This select prevents closing
// the `c.debugReauthDone` channel twice, which would panic.
select {
case <-c.debugReauthDone:
default:
close(c.debugReauthDone)
}
}
err := c.sendLoop()
if err != nil || c.logInfo {
Expand All @@ -517,7 +551,11 @@ func (c *Conn) loop() {
wg.Add(1)
go func() {
var err error
if c.debugCloseRecvLoop {
// To test recurring resendZkAuth we simulate an error in the read
// loop, which should force the whole IO loop to close.
// See `TestRecurringReAuthHang`
if c.shouldDebugCloseRecvLoop() {
err = errors.New("DEBUG: close recv loop")
} else {
err = c.recvLoop(c.conn)
Expand Down
50 changes: 29 additions & 21 deletions zk/conn_test.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,35 @@
package zk

import (
"context"
"io/ioutil"
"testing"
"time"
)

func TestRecurringReAuthHang(t *testing.T) {
t.Skip("Race condition in test")

sessionTimeout := 2 * time.Second

finish := make(chan struct{})
defer close(finish)
go func() {
select {
case <-finish:
return
case <-time.After(5 * sessionTimeout):
panic("expected not hang")
}
}()

zkC, err := StartTestCluster(2, ioutil.Discard, ioutil.Discard)
zkC, err := StartTestCluster(3, ioutil.Discard, ioutil.Discard)
if err != nil {
panic(err)
t.Fatal(err)
}
defer zkC.Stop()

conn, evtC, err := zkC.ConnectAll()
if err != nil {
panic(err)
t.Fatal(err)
}

ctx, cancel := context.WithDeadline(
context.Background(), time.Now().Add(5*time.Second))
defer cancel()
for conn.State() != StateHasSession {
time.Sleep(50 * time.Millisecond)

select {
case <-ctx.Done():
t.Fatal("Failed to connect to ZK")
default:
}
}

go func() {
Expand All @@ -42,16 +38,28 @@ func TestRecurringReAuthHang(t *testing.T) {
}()

// Add auth.
conn.AddAuth("digest", []byte("test:test"))
conn.credsMu.Lock()
conn.creds = append(conn.creds, authCreds{"digest", []byte("test:test")})
conn.credsMu.Unlock()

currentServer := conn.Server()
conn.debugCloseRecvLoop = true
conn.debugReauthDone = make(chan struct{})
conn.setDebugCloseRecvLoop(true)
zkC.StopServer(currentServer)

// Wait for the connection to move to a new ZooKeeper server.
ctx, cancel = context.WithDeadline(
context.Background(), time.Now().Add(5*time.Second))
defer cancel()
for conn.Server() == currentServer && conn.State() != StateHasSession {
time.Sleep(100 * time.Millisecond)

select {
case <-ctx.Done():
t.Fatal("Failed to reconnect ZK next server")
default:
}
}

<-conn.debugReauthDone
conn.Close()
}