Skip to content

Commit 1b95419

Browse files
perf(ingester): refactor lock acquisitions related to not_owned series limit functionality (#15839) (#15885)
Co-authored-by: Owen Diehl <[email protected]>
1 parent eab8559 commit 1b95419

File tree

5 files changed

+22
-20
lines changed

5 files changed

+22
-20
lines changed

pkg/ingester/instance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1183,7 +1183,7 @@ func (i *instance) updateOwnedStreams(isOwnedStream func(*stream) (bool, error))
11831183
}()
11841184

11851185
var err error
1186-
i.streams.WithLock(func() {
1186+
i.streams.WithRLock(func() {
11871187
i.ownedStreamsSvc.resetStreamCounts()
11881188
err = i.streams.ForEach(func(s *stream) (bool, error) {
11891189
ownedStream, err := isOwnedStream(s)

pkg/ingester/limiter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
130130

131131
ownedStreamSvc := &ownedStreamService{
132132
fixedLimit: atomic.NewInt32(testData.fixedLimit),
133-
ownedStreamCount: testData.ownedStreamCount,
133+
ownedStreamCount: atomic.NewInt64(int64(testData.ownedStreamCount)),
134134
}
135135
strategy := &fixedStrategy{localLimit: testData.calculatedLocalLimit}
136136
limiter := NewLimiter(limits, NilMetrics, strategy, &TenantBasedStrategy{limits: limits})

pkg/ingester/metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,8 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
318318
Namespace: constants.Loki,
319319
Name: "ingester_streams_ownership_check_duration_ms",
320320
Help: "Distribution of streams ownership check durations in milliseconds.",
321-
// 100ms to 5s.
322-
Buckets: []float64{100, 250, 350, 500, 750, 1000, 1500, 2000, 5000},
321+
// 1ms -> 16s
322+
Buckets: prometheus.ExponentialBuckets(1, 4, 8),
323323
}),
324324

325325
duplicateLogBytesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{

pkg/ingester/owned_streams.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,26 @@ type ownedStreamService struct {
2121
tenantID string
2222
limiter *Limiter
2323
fixedLimit *atomic.Int32
24-
ownedStreamCount int
24+
ownedStreamCount *atomic.Int64
2525
lock sync.RWMutex
2626
notOwnedStreams map[model.Fingerprint]any
2727
}
2828

2929
func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
3030
svc := &ownedStreamService{
31-
tenantID: tenantID,
32-
limiter: limiter,
33-
fixedLimit: atomic.NewInt32(0),
34-
notOwnedStreams: make(map[model.Fingerprint]any),
31+
tenantID: tenantID,
32+
limiter: limiter,
33+
fixedLimit: atomic.NewInt32(0),
34+
ownedStreamCount: atomic.NewInt64(0),
35+
notOwnedStreams: make(map[model.Fingerprint]any),
3536
}
3637

3738
svc.updateFixedLimit()
3839
return svc
3940
}
4041

4142
func (s *ownedStreamService) getOwnedStreamCount() int {
42-
s.lock.RLock()
43-
defer s.lock.RUnlock()
44-
return s.ownedStreamCount
43+
return int(s.ownedStreamCount.Load())
4544
}
4645

4746
func (s *ownedStreamService) updateFixedLimit() (old, new int32) {
@@ -55,12 +54,15 @@ func (s *ownedStreamService) getFixedLimit() int {
5554
}
5655

5756
func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bool) {
58-
s.lock.Lock()
59-
defer s.lock.Unlock()
57+
// only need to inc the owned count; can use sync atomics.
6058
if owned {
61-
s.ownedStreamCount++
59+
s.ownedStreamCount.Inc()
6260
return
6361
}
62+
63+
// need to update map; lock required
64+
s.lock.Lock()
65+
defer s.lock.Unlock()
6466
notOwnedStreamsMetric.Inc()
6567
s.notOwnedStreams[fp] = nil
6668
}
@@ -74,13 +76,13 @@ func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) {
7476
delete(s.notOwnedStreams, fp)
7577
return
7678
}
77-
s.ownedStreamCount--
79+
s.ownedStreamCount.Dec()
7880
}
7981

8082
func (s *ownedStreamService) resetStreamCounts() {
8183
s.lock.Lock()
8284
defer s.lock.Unlock()
83-
s.ownedStreamCount = 0
85+
s.ownedStreamCount.Store(0)
8486
notOwnedStreamsMetric.Sub(float64(len(s.notOwnedStreams)))
8587
s.notOwnedStreams = make(map[model.Fingerprint]any)
8688
}

pkg/ingester/recalculate_owned_streams_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func Test_recalculateOwnedStreams_newRecalculateOwnedStreamsIngester(t *testing.
3737
func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T) {
3838
tests := map[string]struct {
3939
featureEnabled bool
40-
expectedOwnedStreamCount int
40+
expectedOwnedStreamCount int64
4141
expectedNotOwnedStreamCount int
4242
}{
4343
"expected streams ownership to be recalculated": {
@@ -101,7 +101,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
101101
mockRing.addMapping(createStream(t, tenant, 100), true)
102102
mockRing.addMapping(createStream(t, tenant, 250), true)
103103

104-
require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount)
104+
require.Equal(t, int64(7), tenant.ownedStreamsSvc.ownedStreamCount.Load())
105105
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0)
106106

107107
mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}}
@@ -116,7 +116,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
116116
if testData.featureEnabled {
117117
require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation")
118118
}
119-
require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount)
119+
require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount.Load())
120120
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount)
121121
})
122122
}

0 commit comments

Comments
 (0)