Skip to content

Commit cd2141b

Browse files
authored
Fix race condition (#426)
1 parent a14bfaf commit cd2141b

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

partition_table.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package goka
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"time"
78

89
"github.com/Shopify/sarama"
@@ -137,27 +138,34 @@ func (p *PartitionTable) loadRestarting(ctx context.Context, stopAfterCatchup bo
137138
var (
138139
resetTimer *time.Timer
139140
retries int
141+
retryMux sync.Mutex
140142
)
141143

142144
for {
143145
err := p.load(ctx, stopAfterCatchup)
144146
if err != nil {
145147
p.log.Printf("Error while starting up: %v", err)
146148

149+
retryMux.Lock()
147150
retries++
151+
retryMux.Unlock()
148152
if resetTimer != nil {
149153
resetTimer.Stop()
150154
}
151155
resetTimer = time.AfterFunc(p.backoffResetTimeout, func() {
152156
p.backoff.Reset()
157+
retryMux.Lock()
153158
retries = 0
159+
retryMux.Unlock()
154160
})
155161
} else {
156162
return nil
157163
}
158164

159165
retryDuration := p.backoff.Duration()
166+
retryMux.Lock()
160167
p.log.Printf("Will retry in %.0f seconds (retried %d times so far)", retryDuration.Seconds(), retries)
168+
retryMux.Unlock()
161169
select {
162170
case <-ctx.Done():
163171
return nil

simple_backoff.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package goka
22

3-
import "time"
3+
import (
4+
"sync"
5+
"time"
6+
)
47

58
// NewSimpleBackoff returns a simple backoff waiting the
69
// specified duration longer each iteration until reset.
@@ -12,16 +15,22 @@ func NewSimpleBackoff(step time.Duration, max time.Duration) Backoff {
1215
}
1316

1417
type simpleBackoff struct {
18+
sync.Mutex
19+
1520
current time.Duration
1621
step time.Duration
1722
max time.Duration
1823
}
1924

2025
func (b *simpleBackoff) Reset() {
26+
b.Lock()
27+
defer b.Unlock()
2128
b.current = time.Duration(0)
2229
}
2330

2431
func (b *simpleBackoff) Duration() time.Duration {
32+
b.Lock()
33+
defer b.Unlock()
2534
value := b.current
2635

2736
if (b.current + b.step) <= b.max {

0 commit comments

Comments
 (0)