#3513: Mark muted alerts #3793

Merged
1 change: 1 addition & 0 deletions cmd/alertmanager/main.go
@@ -438,6 +438,7 @@ func run() int {
inhibitor,
silencer,
intervener,
marker,
notificationLog,
pipelinePeer,
)
45 changes: 26 additions & 19 deletions dispatch/dispatch.go
@@ -78,6 +78,7 @@ type Dispatcher struct {
route *Route
alerts provider.Alerts
stage notify.Stage
marker types.GroupMarker
metrics *DispatcherMetrics
limits Limits

@@ -107,7 +108,7 @@ func NewDispatcher(
ap provider.Alerts,
r *Route,
s notify.Stage,
mk types.AlertMarker,
Contributor Author:

mk was never used; here I am replacing it with types.GroupMarker.
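
For context, the shape of types.GroupMarker can be inferred from its call sites in this diff (SetMuted, Muted, and DeleteByGroupKey). A minimal sketch, assuming the real interface in the types package has no additional methods:

```go
// GroupMarker records which time intervals, if any, are currently muting
// each alert group, keyed by route ID and group key.
type GroupMarker interface {
	// SetMuted marks the group as muted by the named time intervals.
	// An empty or nil slice clears the muted marker.
	SetMuted(routeID, groupKey string, timeIntervalNames []string)

	// Muted returns the names of the muting time intervals and whether
	// the group is currently muted.
	Muted(routeID, groupKey string) ([]string, bool)

	// DeleteByGroupKey removes all markers kept for the group.
	DeleteByGroupKey(routeID, groupKey string)
}
```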

mk types.GroupMarker,
to func(time.Duration) time.Duration,
lim Limits,
l log.Logger,
@@ -121,6 +122,7 @@
alerts: ap,
stage: s,
route: r,
marker: mk,
timeout: to,
logger: log.With(l, "component", "dispatcher"),
metrics: m,
@@ -145,8 +147,8 @@ func (d *Dispatcher) Run() {
}

func (d *Dispatcher) run(it provider.AlertIterator) {
cleanup := time.NewTicker(30 * time.Second)
defer cleanup.Stop()
maintenance := time.NewTicker(30 * time.Second)
Contributor Author:

I renamed this because all the other "cleanup" functions in Alertmanager are called maintenance.

defer maintenance.Stop()

defer it.Close()

@@ -175,28 +177,30 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())

case <-cleanup.C:
Contributor Author:

I moved this logic to a method called doMaintenance. The main purpose was to make it testable.

d.mtx.Lock()

for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
if ag.empty() {
ag.stop()
delete(groups, ag.fingerprint())
d.aggrGroupsNum--
d.metrics.aggrGroups.Dec()
}
}
}

d.mtx.Unlock()

case <-maintenance.C:
d.doMaintenance()
case <-d.ctx.Done():
return
}
}
}

func (d *Dispatcher) doMaintenance() {
d.mtx.Lock()
for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
if ag.empty() {
ag.stop()
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
Contributor Author:

We delete the marker for the aggregation group when the aggregation group itself is deleted.

delete(groups, ag.fingerprint())
d.aggrGroupsNum--
d.metrics.aggrGroups.Dec()
}
}
}
d.mtx.Unlock()
}

// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
Alerts types.AlertSlice
@@ -374,6 +378,7 @@ type aggrGroup struct {
labels model.LabelSet
opts *RouteOpts
logger log.Logger
routeID string
Contributor Author:

We needed to add routeID to aggrGroup because it's used in d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey()).

routeKey string

alerts *store.Alerts
@@ -394,6 +399,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
}
ag := &aggrGroup{
labels: labels,
routeID: r.ID(),
routeKey: r.Key(),
opts: &r.RouteOpts,
timeout: to,
@@ -447,6 +453,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithRouteID(ctx, ag.routeID)
Contributor Author:

We also need to add it to the context so it can be extracted in TimeMuteStage and TimeActiveStage.
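
A minimal sketch of that round trip, using only the WithRouteID and RouteID helpers added in notify/notify.go below (the route ID value is hypothetical):

```go
package main

import (
	"context"
	"fmt"

	"github.com/prometheus/alertmanager/notify"
)

func main() {
	// The aggregation group attaches its route ID before the pipeline runs...
	ctx := notify.WithRouteID(context.Background(), "example-route-id")

	// ...and TimeMuteStage/TimeActiveStage recover it the same way.
	if routeID, ok := notify.RouteID(ctx); ok {
		fmt.Println(routeID) // prints "example-route-id"
	}
}
```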


// Wait the configured interval before calling flush again.
ag.mtx.Lock()
45 changes: 45 additions & 0 deletions dispatch/dispatch_test.go
@@ -691,3 +691,48 @@ type limits struct {
func (l limits) MaxNumberOfAggregationGroups() int {
return l.groups
}

func TestDispatcher_DoMaintenance(t *testing.T) {
Contributor Author:

Here I am testing that the marker is deleted when the aggregation group is garbage collected.

r := prometheus.NewRegistry()
marker := types.NewMarker(r)

alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, log.NewNopLogger(), nil)
if err != nil {
t.Fatal(err)
}

route := &Route{
RouteOpts: RouteOpts{
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: 5 * time.Minute, // Should never hit in this test.
},
}
timeout := func(d time.Duration) time.Duration { return d }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}

ctx := context.Background()
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, log.NewNopLogger(), NewDispatcherMetrics(false, r))
aggrGroups := make(map[*Route]map[model.Fingerprint]*aggrGroup)
aggrGroups[route] = make(map[model.Fingerprint]*aggrGroup)

// Insert an aggregation group with no alerts.
labels := model.LabelSet{"alertname": "1"}
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, log.NewNopLogger())
aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
dispatcher.aggrGroupsPerRoute = aggrGroups
// Must run otherwise doMaintenance blocks on aggrGroup1.stop().
go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true })

// Insert a marker for the aggregation group's group key.
marker.SetMuted(route.ID(), aggrGroup1.GroupKey(), []string{"weekends"})
mutedBy, isMuted := marker.Muted(route.ID(), aggrGroup1.GroupKey())
require.True(t, isMuted)
require.Equal(t, []string{"weekends"}, mutedBy)

// Run the maintenance and the marker should be removed.
dispatcher.doMaintenance()
mutedBy, isMuted = marker.Muted(route.ID(), aggrGroup1.GroupKey())
require.False(t, isMuted)
require.Empty(t, mutedBy)
}
64 changes: 55 additions & 9 deletions notify/notify.go
@@ -119,6 +119,7 @@ const (
keyNow
keyMuteTimeIntervals
keyActiveTimeIntervals
keyRouteID
)

// WithReceiverName populates a context with a receiver name.
@@ -165,6 +166,10 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
return context.WithValue(ctx, keyActiveTimeIntervals, at)
}

func WithRouteID(ctx context.Context, routeID string) context.Context {
return context.WithValue(ctx, keyRouteID, routeID)
}

// RepeatInterval extracts a repeat interval from the context. Iff none exists, the
// second argument is false.
func RepeatInterval(ctx context.Context) (time.Duration, bool) {
@@ -228,6 +233,13 @@ func ActiveTimeIntervalNames(ctx context.Context) ([]string, bool) {
return v, ok
}

// RouteID extracts a RouteID from the context. Iff none exists, the
// // second argument is false.
Comment on lines +236 to +237
Member:

Suggested change:
- // RouteID extracts a RouteID from the context. Iff none exists, the
- // // second argument is false.
+ // RouteID extracts a RouteID from the context. If none exists, the
+ // second argument is false.

The other comments have the same typo of iff.

Contributor Author:

Oh, iff is not a spelling mistake; it refers to "if and only if".

Member:

Today I learned 🤯

func RouteID(ctx context.Context) (string, bool) {
v, ok := ctx.Value(keyRouteID).(string)
return v, ok
}

// A Stage processes alerts under the constraints of the given context.
type Stage interface {
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
@@ -384,15 +396,16 @@ func (pb *PipelineBuilder) New(
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
intervener *timeinterval.Intervener,
marker types.GroupMarker,
notificationLog NotificationLog,
peer Peer,
) RoutingStage {
rs := make(RoutingStage, len(receivers))

ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor, pb.metrics)
tas := NewTimeActiveStage(intervener, pb.metrics)
tms := NewTimeMuteStage(intervener, pb.metrics)
tas := NewTimeActiveStage(intervener, marker, pb.metrics)
tms := NewTimeMuteStage(intervener, marker, pb.metrics)
ss := NewMuteStage(silencer, pb.metrics)

for name := range receivers {
@@ -923,18 +936,29 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ

type timeStage struct {
muter types.TimeMuter
marker types.GroupMarker
metrics *Metrics
}

type TimeMuteStage timeStage

func NewTimeMuteStage(m types.TimeMuter, metrics *Metrics) *TimeMuteStage {
return &TimeMuteStage{m, metrics}
func NewTimeMuteStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeMuteStage {
return &TimeMuteStage{muter, marker, metrics}
}

// Exec implements the stage interface for TimeMuteStage.
// TimeMuteStage is responsible for muting alerts whose route is not in an active time.
func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
routeID, ok := RouteID(ctx)
if !ok {
return ctx, nil, errors.New("route ID missing")
}

gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, errors.New("group key missing")
}

muteTimeIntervalNames, ok := MuteTimeIntervalNames(ctx)
if !ok {
return ctx, alerts, nil
@@ -949,29 +973,42 @@ func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*type
return ctx, alerts, nil
}

muted, _, err := tms.muter.Mutes(muteTimeIntervalNames, now)
muted, mutedBy, err := tms.muter.Mutes(muteTimeIntervalNames, now)
if err != nil {
return ctx, alerts, err
}
// If muted is false then mutedBy is nil and the muted marker is removed.
tms.marker.SetMuted(routeID, gkey, mutedBy)

// If the current time is inside a mute time, all alerts are removed from the pipeline.
if muted {
tms.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonMuteTimeInterval).Add(float64(len(alerts)))
level.Debug(l).Log("msg", "Notifications not sent, route is within mute time", "alerts", len(alerts))
return ctx, nil, nil
}

return ctx, alerts, nil
}

type TimeActiveStage timeStage

func NewTimeActiveStage(m types.TimeMuter, metrics *Metrics) *TimeActiveStage {
return &TimeActiveStage{m, metrics}
func NewTimeActiveStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeActiveStage {
return &TimeActiveStage{muter, marker, metrics}
}

// Exec implements the stage interface for TimeActiveStage.
// TimeActiveStage is responsible for muting alerts whose route is not in an active time.
func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
routeID, ok := RouteID(ctx)
if !ok {
return ctx, nil, errors.New("route ID missing")
}

gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, errors.New("group key missing")
}

activeTimeIntervalNames, ok := ActiveTimeIntervalNames(ctx)
if !ok {
return ctx, alerts, nil
@@ -987,13 +1024,22 @@ func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*ty
return ctx, alerts, errors.New("missing now timestamp")
}

muted, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
active, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
if err != nil {
return ctx, alerts, err
}

var mutedBy []string
if !active {
// If the group is muted, then it must be muted by all active time intervals.
// Otherwise, the group must be in at least one active time interval for it
// to be active.
mutedBy = activeTimeIntervalNames
}
tas.marker.SetMuted(routeID, gkey, mutedBy)

// If the current time is not inside an active time, all alerts are removed from the pipeline
if !muted {
if !active {
tas.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonActiveTimeInterval).Add(float64(len(alerts)))
level.Debug(l).Log("msg", "Notifications not sent, route is not within active time", "alerts", len(alerts))
return ctx, nil, nil