Skip to content

Commit 3eb0145

Browse files
authored
balancer/weightedroundrobin: Add emissions of metrics through metrics registry (#7439)
1 parent bc03420 commit 3eb0145

File tree

3 files changed

+114
-20
lines changed

3 files changed

+114
-20
lines changed

balancer/weightedroundrobin/balancer.go

Lines changed: 93 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ import (
3232
"google.golang.org/grpc/balancer"
3333
"google.golang.org/grpc/balancer/base"
3434
"google.golang.org/grpc/balancer/weightedroundrobin/internal"
35+
"google.golang.org/grpc/balancer/weightedtarget"
3536
"google.golang.org/grpc/connectivity"
37+
estats "google.golang.org/grpc/experimental/stats"
3638
"google.golang.org/grpc/internal/grpclog"
3739
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
3840
"google.golang.org/grpc/orca"
@@ -45,6 +47,43 @@ import (
4547
// Name is the name of the weighted round robin balancer.
4648
const Name = "weighted_round_robin"
4749

50+
var (
51+
rrFallbackMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
52+
Name: "grpc.lb.wrr.rr_fallback",
53+
Description: "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.",
54+
Unit: "update",
55+
Labels: []string{"grpc.target"},
56+
OptionalLabels: []string{"grpc.lb.locality"},
57+
Default: false,
58+
})
59+
60+
endpointWeightNotYetUsableMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
61+
Name: "grpc.lb.wrr.endpoint_weight_not_yet_usable",
62+
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).",
63+
Unit: "endpoint",
64+
Labels: []string{"grpc.target"},
65+
OptionalLabels: []string{"grpc.lb.locality"},
66+
Default: false,
67+
})
68+
69+
endpointWeightStaleMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
70+
Name: "grpc.lb.wrr.endpoint_weight_stale",
71+
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.",
72+
Unit: "endpoint",
73+
Labels: []string{"grpc.target"},
74+
OptionalLabels: []string{"grpc.lb.locality"},
75+
Default: false,
76+
})
77+
endpointWeightsMetric = estats.RegisterFloat64Histo(estats.MetricDescriptor{
78+
Name: "grpc.lb.wrr.endpoint_weights",
79+
Description: "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.",
80+
Unit: "endpoint",
81+
Labels: []string{"grpc.target"},
82+
OptionalLabels: []string{"grpc.lb.locality"},
83+
Default: false,
84+
})
85+
)
86+
4887
func init() {
4988
balancer.Register(bb{})
5089
}
@@ -58,7 +97,10 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
5897
csEvltr: &balancer.ConnectivityStateEvaluator{},
5998
scMap: make(map[balancer.SubConn]*weightedSubConn),
6099
connectivityState: connectivity.Connecting,
100+
target: bOpts.Target.String(),
101+
metricsRecorder: bOpts.MetricsRecorder,
61102
}
103+
62104
b.logger = prefixLogger(b)
63105
b.logger.Infof("Created")
64106
return b
@@ -101,8 +143,11 @@ func (bb) Name() string {
101143

102144
// wrrBalancer implements the weighted round robin LB policy.
103145
type wrrBalancer struct {
104-
cc balancer.ClientConn
105-
logger *grpclog.PrefixLogger
146+
// The following fields are immutable.
147+
cc balancer.ClientConn
148+
logger *grpclog.PrefixLogger
149+
target string
150+
metricsRecorder estats.MetricsRecorder
106151

107152
// The following fields are only accessed on calls into the LB policy, and
108153
// do not need a mutex.
@@ -114,6 +159,7 @@ type wrrBalancer struct {
114159
resolverErr error // the last error reported by the resolver; cleared on successful resolution
115160
connErr error // the last connection error; cleared upon leaving TransientFailure
116161
stopPicker func()
162+
locality string
117163
}
118164

119165
func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
@@ -125,6 +171,7 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
125171
}
126172

127173
b.cfg = cfg
174+
b.locality = weightedtarget.LocalityFromResolverState(ccs.ResolverState)
128175
b.updateAddresses(ccs.ResolverState.Addresses)
129176

130177
if len(ccs.ResolverState.Addresses) == 0 {
@@ -171,6 +218,10 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
171218
// Initially, we set load reports to off, because they are not
172219
// running upon initial weightedSubConn creation.
173220
cfg: &lbConfig{EnableOOBLoadReport: false},
221+
222+
metricsRecorder: b.metricsRecorder,
223+
target: b.target,
224+
locality: b.locality,
174225
}
175226
b.subConns.Set(addr, wsc)
176227
b.scMap[sc] = wsc
@@ -318,9 +369,12 @@ func (b *wrrBalancer) regeneratePicker() {
318369
}
319370

320371
p := &picker{
321-
v: rand.Uint32(), // start the scheduler at a random point
322-
cfg: b.cfg,
323-
subConns: b.readySubConns(),
372+
v: rand.Uint32(), // start the scheduler at a random point
373+
cfg: b.cfg,
374+
subConns: b.readySubConns(),
375+
metricsRecorder: b.metricsRecorder,
376+
locality: b.locality,
377+
target: b.target,
324378
}
325379
var ctx context.Context
326380
ctx, b.stopPicker = context.WithCancel(context.Background())
@@ -339,16 +393,20 @@ type picker struct {
339393
v uint32 // incrementing value used by the scheduler; accessed atomically
340394
cfg *lbConfig // active config when picker created
341395
subConns []*weightedSubConn // all READY subconns
396+
397+
// The following fields are immutable.
398+
target string
399+
locality string
400+
metricsRecorder estats.MetricsRecorder
342401
}
343402

344-
// scWeights returns a slice containing the weights from p.subConns in the same
345-
// order as p.subConns.
346-
func (p *picker) scWeights() []float64 {
403+
func (p *picker) scWeights(recordMetrics bool) []float64 {
347404
ws := make([]float64, len(p.subConns))
348405
now := internal.TimeNow()
349406
for i, wsc := range p.subConns {
350-
ws[i] = wsc.weight(now, time.Duration(p.cfg.WeightExpirationPeriod), time.Duration(p.cfg.BlackoutPeriod))
407+
ws[i] = wsc.weight(now, time.Duration(p.cfg.WeightExpirationPeriod), time.Duration(p.cfg.BlackoutPeriod), recordMetrics)
351408
}
409+
352410
return ws
353411
}
354412

@@ -357,7 +415,7 @@ func (p *picker) inc() uint32 {
357415
}
358416

359417
func (p *picker) regenerateScheduler() {
360-
s := newScheduler(p.scWeights(), p.inc)
418+
s := p.newScheduler()
361419
atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s))
362420
}
363421

@@ -367,6 +425,7 @@ func (p *picker) start(ctx context.Context) {
367425
// No need to regenerate weights with only one backend.
368426
return
369427
}
428+
370429
go func() {
371430
ticker := time.NewTicker(time.Duration(p.cfg.WeightUpdatePeriod))
372431
defer ticker.Stop()
@@ -404,8 +463,12 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
404463
// When needed, it also tracks connectivity state, listens for metrics updates
405464
// by implementing the orca.OOBListener interface and manages that listener.
406465
type weightedSubConn struct {
466+
// The following fields are immutable.
407467
balancer.SubConn
408-
logger *grpclog.PrefixLogger
468+
logger *grpclog.PrefixLogger
469+
target string
470+
metricsRecorder estats.MetricsRecorder
471+
locality string
409472

410473
// The following fields are only accessed on calls into the LB policy, and
411474
// do not need a mutex.
@@ -527,21 +590,37 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
527590

528591
// weight returns the current effective weight of the subconn, taking into
529592
// account the parameters. Returns 0 for blacked out or expired data, which
530-
// will cause the backend weight to be treated as the mean of the weights of
531-
// the other backends.
532-
func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration) float64 {
593+
// will cause the backend weight to be treated as the mean of the weights of the
594+
// other backends. If forScheduler is set to true, this function will emit
595+
// metrics through the mtrics registry.
596+
func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration, recordMetrics bool) (weight float64) {
533597
w.mu.Lock()
534598
defer w.mu.Unlock()
599+
600+
if recordMetrics {
601+
defer func() {
602+
endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality)
603+
}()
604+
}
605+
535606
// If the most recent update was longer ago than the expiration period,
536607
// reset nonEmptySince so that we apply the blackout period again if we
537608
// start getting data again in the future, and return 0.
538609
if now.Sub(w.lastUpdated) >= weightExpirationPeriod {
610+
if recordMetrics {
611+
endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
612+
}
539613
w.nonEmptySince = time.Time{}
540614
return 0
541615
}
616+
542617
// If we don't have at least blackoutPeriod worth of data, return 0.
543618
if blackoutPeriod != 0 && (w.nonEmptySince == (time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod) {
619+
if recordMetrics {
620+
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
621+
}
544622
return 0
545623
}
624+
546625
return w.weightVal
547626
}

balancer/weightedroundrobin/scheduler.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ type scheduler interface {
3131
// len(scWeights)-1 are zero or there is only a single subconn, otherwise it
3232
// will return an Earliest Deadline First (EDF) scheduler implementation that
3333
// selects the subchannels according to their weights.
34-
func newScheduler(scWeights []float64, inc func() uint32) scheduler {
34+
func (p *picker) newScheduler() scheduler {
35+
scWeights := p.scWeights(true)
3536
n := len(scWeights)
3637
if n == 0 {
3738
return nil
3839
}
3940
if n == 1 {
40-
return &rrScheduler{numSCs: 1, inc: inc}
41+
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
42+
return &rrScheduler{numSCs: 1, inc: p.inc}
4143
}
4244
sum := float64(0)
4345
numZero := 0
@@ -51,8 +53,10 @@ func newScheduler(scWeights []float64, inc func() uint32) scheduler {
5153
numZero++
5254
}
5355
}
56+
5457
if numZero >= n-1 {
55-
return &rrScheduler{numSCs: uint32(n), inc: inc}
58+
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
59+
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
5660
}
5761
unscaledMean := sum / float64(n-numZero)
5862
scalingFactor := maxWeight / max
@@ -74,11 +78,11 @@ func newScheduler(scWeights []float64, inc func() uint32) scheduler {
7478
}
7579

7680
if allEqual {
77-
return &rrScheduler{numSCs: uint32(n), inc: inc}
81+
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
7882
}
7983

8084
logger.Infof("using edf scheduler with weights: %v", weights)
81-
return &edfScheduler{weights: weights, inc: inc}
85+
return &edfScheduler{weights: weights, inc: p.inc}
8286
}
8387

8488
const maxWeight = math.MaxUint16

balancer/weightedtarget/weightedtarget.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,17 @@ type weightedTargetBalancer struct {
8484
targets map[string]Target
8585
}
8686

87+
type localityKeyType string
88+
89+
const localityKey = localityKeyType("locality")
90+
91+
// LocalityFromResolverState returns the locality from the resolver.State
92+
// provided, or an empty string if not present.
93+
func LocalityFromResolverState(state resolver.State) string {
94+
locality, _ := state.Attributes.Value(localityKey).(string)
95+
return locality
96+
}
97+
8798
// UpdateClientConnState takes the new targets in balancer group,
8899
// creates/deletes sub-balancers and sends them update. addresses are split into
89100
// groups based on hierarchy path.
@@ -142,7 +153,7 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
142153
ResolverState: resolver.State{
143154
Addresses: addressesSplit[name],
144155
ServiceConfig: s.ResolverState.ServiceConfig,
145-
Attributes: s.ResolverState.Attributes,
156+
Attributes: s.ResolverState.Attributes.WithValue(localityKey, name),
146157
},
147158
BalancerConfig: newT.ChildPolicy.Config,
148159
})

0 commit comments

Comments
 (0)