Skip to content

Commit

Permalink
HA/Heartbeat: Use last message's timestamp
Browse files Browse the repository at this point in the history
Since the retryable HA function may be executed a few times before
succeeding, the inserted heartbeat value will be directly outdated. The
heartbeat logic was slightly altered to always use the latest heartbeat
time value.
  • Loading branch information
oxzi committed Oct 25, 2024
1 parent dd0ca8f commit c2d8bd6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
8 changes: 5 additions & 3 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (h *HA) controller() {

// Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time.
realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime())
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time")
Expand Down Expand Up @@ -268,10 +268,12 @@ func (h *HA) controller() {
// realize a HA cycle triggered by a heartbeat event.
//
// shouldLogRoutineEvents indicates if recurrent events should be logged.
//
// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one
// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens.
func (h *HA) realize(
ctx context.Context,
s *icingaredisv1.IcingaStatus,
t *types.UnixMilli,
envId types.Binary,
shouldLogRoutineEvents bool,
) error {
Expand Down Expand Up @@ -354,7 +356,7 @@ func (h *HA) realize(
EnvironmentMeta: v1.EnvironmentMeta{
EnvironmentId: envId,
},
Heartbeat: *t,
Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessageTime())),
Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
EndpointId: s.EndpointId,
Icinga2Version: s.Version,
Expand Down
16 changes: 16 additions & 0 deletions pkg/icingaredis/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Heartbeat struct {
active bool
events chan *HeartbeatMessage
lastReceivedMs atomic.Int64
lastMessageMs atomic.Int64
cancelCtx context.CancelFunc
client *redis.Client
done chan struct{}
Expand Down Expand Up @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 {
return h.lastReceivedMs.Load()
}

// LastMessageTime returns the last message's time in ms.
func (h *Heartbeat) LastMessageTime() int64 {
return h.lastMessageMs.Load()
}

// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
Expand Down Expand Up @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

h.lastReceivedMs.Store(m.received.UnixMilli())

statsT, err := m.stats.Time()
if err != nil {
h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err))
h.lastMessageMs.Store(0)
} else {
h.lastMessageMs.Store(statsT.Time().UnixMilli())
}

h.sendEvent(m)
case <-time.After(Timeout):
if h.active {
Expand All @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

h.lastReceivedMs.Store(0)
h.lastMessageMs.Store(0)
case <-ctx.Done():
return ctx.Err()
}
Expand Down

0 comments on commit c2d8bd6

Please sign in to comment.