 	"time"

 	"github.com/Shopify/sarama"
+	"github.com/hashicorp/go-multierror"
 	"github.com/lovoo/goka/multierr"
 	"github.com/lovoo/goka/storage"
 )
@@ -17,6 +18,8 @@ const (

 	// internal offset we use to detect if the offset has never been stored locally
 	offsetNotStored int64 = -3
+
+	consumerDrainTimeout = time.Second
 )

 // Backoff is used for adding backoff capabilities to the restarting
@@ -191,7 +194,6 @@ func (p *PartitionTable) Close() error {
 func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, error) {
 	var (
 		err   error
-		errs  = new(multierr.Errors)
 		st    storage.Storage
 		start = time.Now()
 		done  = make(chan struct{})
@@ -217,9 +219,7 @@ func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, erro
 	}
 	err = st.Open()
 	if err != nil {
-		errs.Collect(st.Close())
-		errs.Collect(fmt.Errorf("error opening storage: %v", err))
-		return nil, errs.NilOrError()
+		return nil, multierror.Append(st.Close(), fmt.Errorf("error opening storage: %v", err)).ErrorOrNil()
 	}

 	// close the db if context was cancelled before the builder returned
@@ -228,10 +228,9 @@ func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, erro
 		err = st.Close()
 		// only collect context error if Close() errored out
 		if err != nil {
-			errs.Collect(err)
-			errs.Collect(ctx.Err())
+			return nil, multierror.Append(err, ctx.Err()).ErrorOrNil()
 		}
-		return nil, errs.NilOrError()
+		return nil, nil
 	default:
 	}

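The hunks above swap goka's multierr.Errors collector for github.com/hashicorp/go-multierror. A minimal standalone sketch of the Append/ErrorOrNil idiom used here; combineCloseAndOpen is a hypothetical helper, not part of goka:

package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// combineCloseAndOpen mirrors the new createStorage error path: report the
// Close() error (if any) together with the open error as one value.
func combineCloseAndOpen(closeErr, openErr error) error {
	// Append tolerates nil operands, and ErrorOrNil returns nil
	// when nothing was actually collected.
	return multierror.Append(closeErr, openErr).ErrorOrNil()
}

func main() {
	fmt.Println(combineCloseAndOpen(nil, nil)) // <nil>
	fmt.Println(combineCloseAndOpen(errors.New("close failed"), errors.New("error opening storage")))
}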
@@ -280,37 +279,25 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 		storedOffset int64
 		partConsumer sarama.PartitionConsumer
 		err          error
-		errs         = new(multierr.Errors)
 	)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()

-	// deferred error handling
-	defer func() {
-		errs.Collect(rerr)
-
-		rerr = errs.NilOrError()
-		return
-	}()
-
 	p.state.SetState(State(PartitionConnecting))

 	// fetch local offset
 	storedOffset, err = p.st.GetOffset(offsetNotStored)
 	if err != nil {
-		errs.Collect(fmt.Errorf("error reading local offset: %v", err))
-		return
+		return fmt.Errorf("error reading local offset: %v", err)
 	}

 	loadOffset, hwm, err := p.findOffsetToLoad(storedOffset)
 	if err != nil {
-		errs.Collect(err)
-		return
+		return err
 	}

 	if storedOffset > 0 && hwm == 0 {
-		errs.Collect(fmt.Errorf("kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic %s, partition %d, hwm %d, local offset %d", p.topic, p.partition, hwm, storedOffset))
-		return
+		return fmt.Errorf("kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic %s, partition %d, hwm %d, local offset %d", p.topic, p.partition, hwm, storedOffset)
 	}

 	if storedOffset >= hwm {
@@ -334,8 +321,7 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 	// AND we're here for catchup, so let's stop here
 	// and do not attempt to load anything
 	if stopAfterCatchup && loadOffset >= hwm {
-		errs.Collect(p.markRecovered(ctx))
-		return
+		return p.markRecovered(ctx)
 	}

 	if stopAfterCatchup {
@@ -348,17 +334,13 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr

 	partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
 	if err != nil {
-		errs.Collect(fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err))
-		return
+		return fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err)
 	}

-	// consume errors asynchronously
-	go p.handleConsumerErrors(ctx, errs, partConsumer)
-
 	// close the consumer
 	defer func() {
 		partConsumer.AsyncClose()
-		p.drainConsumer(partConsumer, errs)
+		rerr = multierror.Append(rerr, p.drainConsumer(partConsumer)).ErrorOrNil()
 	}()

 	if stopAfterCatchup {
@@ -371,15 +353,13 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 	loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)

 	if loadErr != nil {
-		errs.Collect(loadErr)
-		return
+		return loadErr
 	}

 	if stopAfterCatchup {
-		errs.Collect(p.markRecovered(ctx))
-
-		now := time.Now()
-		p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = now })
+		err := p.markRecovered(ctx)
+		p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = time.Now() })
+		return err
 	}
 	return
 }
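load() now folds the error from the deferred consumer drain into its named return value rerr instead of a shared collector. A minimal sketch of that deferred-cleanup pattern, assuming only go-multierror; doWork and drain are hypothetical stand-ins for load and the consumer drain:

package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// doWork mirrors load(): the deferred cleanup error is merged into the
// named return value, so a failing drain is reported even when the body
// itself already failed (or succeeded).
func doWork(drain func() error) (rerr error) {
	defer func() {
		rerr = multierror.Append(rerr, drain()).ErrorOrNil()
	}()

	// body of the function; may or may not fail on its own
	return errors.New("load failed")
}

func main() {
	fmt.Println(doWork(func() error { return errors.New("drain timed out") }))
}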
@@ -424,43 +404,27 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
 	}
 }

-func (p *PartitionTable) handleConsumerErrors(ctx context.Context, errs *multierr.Errors, cons sarama.PartitionConsumer) {
-	for {
-		select {
-		case consError, ok := <-cons.Errors():
-			if !ok {
-				return
-			}
-			err := fmt.Errorf("Consumer error: %v", consError)
-			p.log.Printf("%v", err)
-			errs.Collect(err)
-			// if there's an error, close the consumer
-			cons.AsyncClose()
-		case <-ctx.Done():
-			return
-		}
-	}
-}
+func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {

-func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *multierr.Errors) {
-
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout)
 	defer cancel()

-	errg, ctx := multierr.NewErrGroup(ctx)
+	errg, _ := multierr.NewErrGroup(context.Background())

 	// drain errors channel
 	errg.Go(func() error {
+		var errs *multierror.Error
+
 		for {
 			select {
-			case <-ctx.Done():
+			case <-timeoutCtx.Done():
 				p.log.Printf("draining errors channel timed out")
-				return nil
+				return errs
 			case err, ok := <-cons.Errors():
 				if !ok {
-					return nil
+					return errs
 				}
-				errs.Collect(err)
+				errs = multierror.Append(errs, err)
 			}
 		}
 	})
@@ -469,7 +433,7 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *mult
 	errg.Go(func() error {
 		for {
 			select {
-			case <-ctx.Done():
+			case <-timeoutCtx.Done():
 				p.log.Printf("draining messages channel timed out")
 				return nil
 			case _, ok := <-cons.Messages():
@@ -480,30 +444,31 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *mult
 		}
 	})

-	errg.Wait()
+	return errg.Wait().ErrorOrNil()
 }

-func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) (rerr error) {
-	errs := new(multierr.Errors)
-
-	// deferred error handling
-	defer func() {
-		errs.Collect(rerr)
-
-		rerr = errs.NilOrError()
-		return
-	}()
+func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) error {

 	stallTicker := time.NewTicker(p.stallPeriod)
 	defer stallTicker.Stop()

 	lastMessage := time.Now()

+	messages := cons.Messages()
+	errors := cons.Errors()
+
 	for {
 		select {
-		case msg, ok := <-cons.Messages():
+		case err, ok := <-errors:
 			if !ok {
-				return
+				return nil
+			}
+			if err != nil {
+				return err
+			}
+		case msg, ok := <-messages:
+			if !ok {
+				return nil
 			}

 			// This case is for the Tester to achieve synchronity.
@@ -521,8 +486,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition

 			lastMessage = time.Now()
 			if err := p.storeEvent(string(msg.Key), msg.Value, msg.Offset, msg.Headers); err != nil {
-				errs.Collect(fmt.Errorf("load: error updating storage: %v", err))
-				return
+				return fmt.Errorf("load: error updating storage: %v", err)
 			}

 			if stopAfterCatchup {
@@ -532,7 +496,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
 			p.enqueueStatsUpdate(ctx, func() { p.trackIncomingMessageStats(msg) })

 			if stopAfterCatchup && msg.Offset >= partitionHwm-1 {
-				return
+				return nil
 			}

 		case now := <-stallTicker.C:
@@ -543,7 +507,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
 			}

 		case <-ctx.Done():
-			return
+			return nil
 		}
 	}
 }
@@ -711,7 +675,7 @@ func (p *PartitionTable) IteratorWithRange(start []byte, limit []byte) (storage.

 func (p *PartitionTable) readyToRead() error {
 	pstate := p.CurrentState()
-	if pstate != PartitionRunning {
+	if pstate < PartitionConnecting {
 		return fmt.Errorf("Partition is not running (but %v) so it's not safe to read values", pstate)
 	}
 	return nil