refactor: Add retention hours to discarded metrics #15875
Conversation
Force-pushed from 87db09e to f269d13
Force-pushed from f269d13 to 62b8cfc
if err != nil {
-	return i.onStreamCreationError(ctx, pushReqStream, err, labels)
+	return i.onStreamCreationError(ctx, pushReqStream, err, labels, retentionHours)
nit: This err handler looks a bit out of place. What about getting the retentionHours after getting the labels?
labels, err := syntax.ParseLabels(pushReqStream.Labels)
if err != nil {
...
}
retentionHours := i.limiter.limits.RetentionHours(i.instanceID, labels)
if record != nil {
err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID)
return i.onStreamCreationError(ctx, pushReqStream, err, labels, retentionHours)
}
good call, fixed it
Not quite my suggestion but it's fine since it's a nit
are you sure? I'm doing it immediately after parsing the labels.
pkg/validation/limits.go
Outdated
@@ -959,6 +970,27 @@ func (o *Overrides) StreamRetention(userID string) []StreamRetention {
	return o.getOverridesForUser(userID).StreamRetention
}

+// RetentionHours returns the retention period for a given user.
+func (o *Overrides) RetentionHours(userID string, ls labels.Labels) string {
+	streamRetentions := o.getOverridesForUser(userID).StreamRetention
nit: call the public StreamRetention() rather than the implementation.
done
pkg/validation/limits.go
Outdated
@@ -959,6 +970,27 @@ func (o *Overrides) StreamRetention(userID string) []StreamRetention {
	return o.getOverridesForUser(userID).StreamRetention
}

+// RetentionHours returns the retention period for a given user.
+func (o *Overrides) RetentionHours(userID string, ls labels.Labels) string {
I think we should use this one instead, which is the one the compactor actually uses:

loki/pkg/compactor/retention/expiration.go, line 134 in 4c88be0:
	func (tr *TenantsRetention) RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration {
pkg/distributor/distributor.go
Outdated
if err != nil {
	d.writeFailuresManager.Log(tenantID, err)
	validationErrors.Add(err)
-	validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(len(stream.Entries)))
+	validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetentionHours).Add(float64(len(stream.Entries)))
I think we may be counting these twice, since we already update the discarded metrics inside parseStreamLabels.
yep, you are right @salvacorts
LGTM! Left some more nits. I'm gonna ask Vlad to review it as well since he spotted the issue first.
pkg/distributor/distributor.go
Outdated
@@ -493,7 +492,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
		}
	}

-	tenantRetentionHours := d.validator.Limits.RetentionHours(tenantID, nil)
+	tenantRetentionHours := util.RetentionHours(d.tenantsRetention.RetentionPeriodFor(tenantID, nil))
nit: instead of calling util.RetentionHours, what about having a tenantsRetention.RetentionHoursFor() string that calls util.RetentionHours on the result of tenantsRetention.RetentionPeriodFor?
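What that helper might look like, as a minimal sketch (the PR's actual method may differ):

// RetentionHoursFor is a sketch of the suggested convenience method: resolve
// the retention period for the tenant/stream and format it as the string used
// by the retention_hours metric label.
func (tr *TenantsRetention) RetentionHoursFor(userID string, lbs labels.Labels) string {
	return util.RetentionHours(tr.RetentionPeriodFor(userID, lbs))
}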
done
pkg/distributor/distributor.go
Outdated
@@ -524,7 +524,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
		continue
	}

-	retentionHours := d.validator.Limits.RetentionHours(tenantID, lbs)
+	retentionHours := util.RetentionHours(d.tenantsRetention.RetentionPeriodFor(tenantID, lbs))
nit for a follow-up PR, since we were resolving the retention for the streams before the changes: I think we could speed up pushes by caching this.
I think optimizing that would do more harm than good (based on the benchmarks). Doing the new retention evaluation on pushes didn't make a difference, but adding the cache likely would, either through the extra memory usage or through the extra complexity of caching more stuff.
@@ -538,7 +541,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
	prevTs := stream.Entries[0].Timestamp

	for _, entry := range stream.Entries {
-		if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil {
+		if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry, retentionHours); err != nil {
nit: maybe validation context should have the validationMetrics?
done, wdyt?
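For reference, a rough sketch of what carrying the counters on the validation context could look like; only validationContext.validationMetrics.lineSize is confirmed by a later diff in this conversation, the rest of the shape is an assumption:

// validationMetrics groups the per-push counters that were previously passed
// around as separate validatedLineCount/validatedLineSize variables.
type validationMetrics struct {
	lineCount int // number of entries that passed validation
	lineSize  int // total byte size of the entries that passed validation
}

type validationContext struct {
	// ...existing per-tenant limits and settings...
	validationMetrics validationMetrics
}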
pkg/ingester/limiter.go
Outdated
@@ -33,6 +33,10 @@ type Limits interface {
	PerStreamRateLimit(userID string) validation.RateLimit
	ShardStreams(userID string) shardstreams.Config
	IngestionPartitionsTenantShardSize(userID string) int
+	RetentionPeriod(userID string) time.Duration
You can add retention.Limits here instead:
type Limits interface {
...
ShardStreams(userID string) shardstreams.Config
IngestionPartitionsTenantShardSize(userID string) int
retention.Limits
}
hah good catch 😄
Looks good. However, there are some comments on the distributor's side.
pkg/distributor/distributor.go
Outdated
@@ -494,6 +492,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
		}
	}

+	tenantRetentionHours := d.tenantsRetention.RetentionHoursFor(tenantID, nil)
I assume that in this case we use the global tenant retention period rather than the retention period for the particular stream. If so, we cannot do that, because the metric that shows the ingested bytes count always uses the retention period from the stream, and we need to follow it exactly.
I'll check, but we're only using tenantRetentionHours when we fail to parse the stream labels. How can ingested_bytes know the stream's retention period if it couldn't parse the stream labels?
If it completely fails to parse the labels, it returns an error:

Line 245 in ebc84ca:
	if err != nil {

But if the labels are parsed, then the retention hours are calculated based on what we got:

Line 282 in ebc84ca:
	retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)

even if the length of the labels breaks the validation later.
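Put differently, a hedged sketch of the flow being described (not the PR's exact code):

lbs, err := syntax.ParseLabels(stream.Labels)
if err != nil {
	// Labels are unusable, so the tenant-wide retention is the only value
	// available for the retention_hours label on the discarded metric.
	validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetentionHours).
		Add(float64(len(stream.Entries)))
	continue
}
// Labels parsed: resolve retention from the stream's labels, even if a later
// validation step (e.g. label length limits) ends up rejecting the stream.
retentionHours := d.tenantsRetention.RetentionHoursFor(tenantID, lbs)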
Hey, I talked with Vlad about this one and pushed a commit addressing it.
-if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
-	d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited)
+if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.lineSize) {
+	d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited)
nit: I would ask @trevorwhitney whether it's necessary to pass retention_hours to the usage tracker for the discarded bytes metric, because it also exposes this label in its ingested bytes metric and might have a similar issue.
huge thanks for addressing all the comments 👍
What this PR does / why we need it:
We have no distinction between the types of data that get discarded. In this PR I'm adding retention_hours to our discarded metrics so that we can better look at what data was discarded. I'm also using a more performant form of retention stringifying when loading the retention string. Benchmark results:
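A rough sketch of what a faster hours-stringifier could look like (the PR's actual formatting may differ):

package util

import (
	"strconv"
	"time"
)

// RetentionHours is a hypothetical sketch: format a retention period as a
// whole-hours string using strconv instead of fmt.Sprintf on the push path.
func RetentionHours(retention time.Duration) string {
	return strconv.FormatInt(int64(retention/time.Hour), 10)
}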
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Checklist
- CONTRIBUTING.md guide (required)
- feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- docs/sources/setup/upgrade/_index.md
- deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR