diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 764057183..92fd0bb76 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -221,7 +221,7 @@ func (h *HA) controller() { // Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time. realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime()) - err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) + err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time") @@ -268,10 +268,12 @@ func (h *HA) controller() { // realize a HA cycle triggered by a heartbeat event. // // shouldLogRoutineEvents indicates if recurrent events should be logged. +// +// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one +// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens. func (h *HA) realize( ctx context.Context, s *icingaredisv1.IcingaStatus, - t *types.UnixMilli, envId types.Binary, shouldLogRoutineEvents bool, ) error { @@ -354,7 +356,7 @@ func (h *HA) realize( EnvironmentMeta: v1.EnvironmentMeta{ EnvironmentId: envId, }, - Heartbeat: *t, + Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessageTime())), Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true}, EndpointId: s.EndpointId, Icinga2Version: s.Version, diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index bb1e83796..7473659fb 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -25,6 +25,7 @@ type Heartbeat struct { active bool events chan *HeartbeatMessage lastReceivedMs atomic.Int64 + lastMessageMs atomic.Int64 cancelCtx context.CancelFunc client *redis.Client done chan struct{} @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 { return h.lastReceivedMs.Load() } +// LastMessageTime returns the last message's time in ms. +func (h *Heartbeat) LastMessageTime() int64 { + return h.lastMessageMs.Load() +} + // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. // Implements the io.Closer interface. func (h *Heartbeat) Close() error { @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) { } h.lastReceivedMs.Store(m.received.UnixMilli()) + + statsT, err := m.stats.Time() + if err != nil { + h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err)) + h.lastMessageMs.Store(0) + } else { + h.lastMessageMs.Store(statsT.Time().UnixMilli()) + } + h.sendEvent(m) case <-time.After(Timeout): if h.active { @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) { } h.lastReceivedMs.Store(0) + h.lastMessageMs.Store(0) case <-ctx.Done(): return ctx.Err() }