@@ -80,11 +80,8 @@ type PartitionProcessor struct {
	consumer sarama.Consumer
	tmgr     TopicManager

-	stats           *PartitionProcStats
-	requestStats    chan bool
-	responseStats   chan *PartitionProcStats
-	updateStats     chan func()
-	cancelStatsLoop context.CancelFunc
+	mStats sync.RWMutex
+	stats  *PartitionProcStats

	commit   commitCallback
	producer Producer
@@ -137,38 +134,32 @@ func newPartitionProcessor(partition int32,

	log := logger.Prefix(fmt.Sprintf("PartitionProcessor (%d)", partition))

-	statsLoopCtx, cancel := context.WithCancel(context.Background())
-
	for _, v := range graph.visitors {
		visitCallbacks[v.(*visitor).name] = v.(*visitor).cb
	}

	partProc := &PartitionProcessor{
-		log:             log,
-		opts:            opts,
-		partition:       partition,
-		state:           NewSignal(PPStateIdle, PPStateRecovering, PPStateRunning, PPStateStopping, PPStateStopped).SetState(PPStateIdle),
-		callbacks:       callbacks,
-		lookups:         lookupTables,
-		consumer:        consumer,
-		producer:        producer,
-		tmgr:            tmgr,
-		joins:           make(map[string]*PartitionTable),
-		input:           make(chan *message, opts.partitionChannelSize),
-		inputTopics:     topicList,
-		visitInput:      make(chan *visit, defaultPPVisitChannelSize),
-		visitCallbacks:  visitCallbacks,
-		graph:           graph,
-		stats:           newPartitionProcStats(topicList, outputList),
-		requestStats:    make(chan bool),
-		responseStats:   make(chan *PartitionProcStats, 1),
-		updateStats:     make(chan func(), 10),
-		cancelStatsLoop: cancel,
-		commit:          commit,
-		runMode:         runMode,
-	}
-
-	go partProc.runStatsLoop(statsLoopCtx)
+		log:            log,
+		opts:           opts,
+		partition:      partition,
+		state:          NewSignal(PPStateIdle, PPStateRecovering, PPStateRunning, PPStateStopping, PPStateStopped).SetState(PPStateIdle),
+		callbacks:      callbacks,
+		lookups:        lookupTables,
+		consumer:       consumer,
+		producer:       producer,
+		tmgr:           tmgr,
+		joins:          make(map[string]*PartitionTable),
+		input:          make(chan *message, opts.partitionChannelSize),
+		inputTopics:    topicList,
+		visitInput:     make(chan *visit, defaultPPVisitChannelSize),
+		visitCallbacks: visitCallbacks,
+		graph:          graph,
+
+		stats: newPartitionProcStats(topicList, outputList),
+
+		commit:  commit,
+		runMode: runMode,
+	}

	if graph.GroupTable() != nil {
		partProc.table = newPartitionTable(graph.GroupTable().Topic(),
@@ -217,7 +208,6 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
	defer pp.state.SetState(PPStateRunning)

	if pp.table != nil {
-		go pp.table.RunStatsLoop(runnerCtx)
		setupErrg.Go(func() error {
			pp.log.Debugf("catching up table")
			defer pp.log.Debugf("catching up table done")
@@ -238,7 +228,6 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
		)
		pp.joins[join.Topic()] = table

-		go table.RunStatsLoop(runnerCtx)
		setupErrg.Go(func() error {
			return table.SetupAndRecover(setupCtx, false)
		})
@@ -316,9 +305,6 @@ func (pp *PartitionProcessor) Stop() error {
		pp.cancelRunnerGroup()
	}

-	// stop the stats updating/serving loop
-	pp.cancelStatsLoop()
-
	// wait for the runner to be done
	runningErrs := multierror.Append(pp.runnerGroup.Wait().ErrorOrNil())

@@ -413,6 +399,9 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
		}
	}()

+	updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
+	defer updateHwmStatsTicker.Stop()
+
	for {
		select {
		case ev, isOpen := <-pp.input:
@@ -425,7 +414,15 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
				return newErrProcessing(pp.partition, err)
			}

-			pp.enqueueStatsUpdate(ctx, func() { pp.updateStatsWithMessage(ev) })
+			pp.updateStats(func(stats *PartitionProcStats) {
+				ip := stats.Input[ev.topic]
+				ip.Bytes += len(ev.value)
+				ip.LastOffset = ev.offset
+				if !ev.timestamp.IsZero() {
+					ip.Delay = time.Since(ev.timestamp)
+				}
+				ip.Count++
+			})
		case <-ctx.Done():
			pp.log.Debugf("exiting, context is cancelled")
			return
@@ -440,75 +437,42 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
		case <-asyncErrs:
			pp.log.Debugf("Errors occurred asynchronously. Will exit partition processor")
			return
-		}
-	}
-}
-
-func (pp *PartitionProcessor) enqueueStatsUpdate(ctx context.Context, updater func()) {
-	select {
-	case pp.updateStats <- updater:
-	case <-ctx.Done():
-	default:
-		// going to default indicates the updateStats channel is not read, so the stats
-		// loop is not actually running.
-		// We must not block here, so we'll skip the update
-	}
-}
-
-func (pp *PartitionProcessor) runStatsLoop(ctx context.Context) {
-	updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
-	defer updateHwmStatsTicker.Stop()
-	for {
-		select {
-		case <-pp.requestStats:
-			stats := pp.collectStats(ctx)
-			select {
-			case pp.responseStats <- stats:
-			case <-ctx.Done():
-				pp.log.Debugf("exiting, context is cancelled")
-				return
-			}
-		case update := <-pp.updateStats:
-			update()
		case <-updateHwmStatsTicker.C:
-			pp.updateHwmStats()
-		case <-ctx.Done():
-			return
+			pp.updateStats(pp.updateHwmStats)
		}
	}
}

-// updateStatsWithMessage updates the stats with a received message
-func (pp *PartitionProcessor) updateStatsWithMessage(ev *message) {
-	ip := pp.stats.Input[ev.topic]
-	ip.Bytes += len(ev.value)
-	ip.LastOffset = ev.offset
-	if !ev.timestamp.IsZero() {
-		ip.Delay = time.Since(ev.timestamp)
-	}
-	ip.Count++
+func (pp *PartitionProcessor) updateStats(updater func(stats *PartitionProcStats)) {
+	pp.mStats.Lock()
+	defer pp.mStats.Unlock()
+	updater(pp.stats)
}

// updateHwmStats updates the offset lag for all input topics based on the
// highwatermarks obtained by the consumer.
-func (pp *PartitionProcessor) updateHwmStats() {
+func (pp *PartitionProcessor) updateHwmStats(stats *PartitionProcStats) {
	hwms := pp.consumer.HighWaterMarks()
-	for input, inputStats := range pp.stats.Input {
+	for input, inputStats := range stats.Input {
		hwm := hwms[input][pp.partition]
		if hwm != 0 && inputStats.LastOffset != 0 {
			inputStats.OffsetLag = hwm - inputStats.LastOffset
		}
	}
}

-func (pp *PartitionProcessor) collectStats(ctx context.Context) *PartitionProcStats {
-	var (
-		stats = pp.stats.clone()
-		m     sync.Mutex
-	)
+func (pp *PartitionProcessor) fetchStats(ctx context.Context) *PartitionProcStats {
+	pp.mStats.RLock()
+	stats := pp.stats.clone()
+	pp.mStats.RUnlock()
+
+	// mutex for the local stats-clone so the
+	// error group below doesn't get a concurrent-map-access error
+	var m sync.Mutex

	errg, ctx := multierr.NewErrGroup(ctx)

+	// fetch join table stats
	for topic, join := range pp.joins {
		topic, join := topic, join
		errg.Go(func() error {
@@ -523,6 +487,7 @@ func (pp *PartitionProcessor) collectStats(ctx context.Context) *PartitionProcStats {
		})
	}

+	// if we have processor state, get those stats
	if pp.table != nil {
		errg.Go(func() error {
			stats.TableStats = pp.table.fetchStats(ctx)
@@ -541,31 +506,9 @@ func (pp *PartitionProcessor) collectStats(ctx context.Context) *PartitionProcStats {
	return stats
}

-func (pp *PartitionProcessor) fetchStats(ctx context.Context) *PartitionProcStats {
-	select {
-	case <-ctx.Done():
-		return nil
-	case <-time.After(fetchStatsTimeout):
-		pp.log.Printf("requesting stats timed out")
-		return nil
-	case pp.requestStats <- true:
-	}
-
-	// retrieve from response-channel
-	select {
-	case <-ctx.Done():
-		return nil
-	case <-time.After(fetchStatsTimeout):
-		pp.log.Printf("Fetching stats timed out")
-		return nil
-	case stats := <-pp.responseStats:
-		return stats
-	}
-}
-
func (pp *PartitionProcessor) enqueueTrackOutputStats(ctx context.Context, topic string, size int) {
-	pp.enqueueStatsUpdate(ctx, func() {
-		pp.stats.trackOutput(topic, size)
+	pp.updateStats(func(stats *PartitionProcStats) {
+		stats.trackOutput(topic, size)
	})
}
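For reference, the change above replaces the channel-based stats loop (requestStats/responseStats/updateStats plus a dedicated goroutine) with a single sync.RWMutex guarding the stats struct: writers apply small closures under the write lock, readers take a clone under the read lock, so writers never block on a stats goroutine and readers always see a consistent snapshot. The standalone sketch below shows the same pattern in isolation; the names (statsHolder, procStats, inputStats) are illustrative placeholders, not goka's actual types.

// Minimal sketch of the mutex-guarded stats pattern (illustrative names, not goka's API).
package main

import (
	"fmt"
	"sync"
)

type inputStats struct {
	Count      int
	Bytes      int
	LastOffset int64
}

type procStats struct {
	Input map[string]*inputStats
}

// clone returns a deep copy so readers are decoupled from later writes.
func (s *procStats) clone() *procStats {
	c := &procStats{Input: make(map[string]*inputStats, len(s.Input))}
	for topic, in := range s.Input {
		cp := *in
		c.Input[topic] = &cp
	}
	return c
}

type statsHolder struct {
	m     sync.RWMutex
	stats *procStats
}

// update applies a mutation under the write lock, mirroring pp.updateStats above.
func (h *statsHolder) update(updater func(*procStats)) {
	h.m.Lock()
	defer h.m.Unlock()
	updater(h.stats)
}

// fetch returns a consistent snapshot, mirroring the clone taken in pp.fetchStats.
func (h *statsHolder) fetch() *procStats {
	h.m.RLock()
	defer h.m.RUnlock()
	return h.stats.clone()
}

func main() {
	h := &statsHolder{stats: &procStats{Input: map[string]*inputStats{
		"input-topic": {},
	}}}

	// writer side: track a consumed message
	h.update(func(s *procStats) {
		in := s.Input["input-topic"]
		in.Count++
		in.Bytes += 42
		in.LastOffset = 17
	})

	// reader side: the snapshot is unaffected by further writes
	snap := h.fetch()
	fmt.Printf("count=%d bytes=%d offset=%d\n",
		snap.Input["input-topic"].Count,
		snap.Input["input-topic"].Bytes,
		snap.Input["input-topic"].LastOffset)
}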