Skip to content

Commit

Permalink
Abort HA Realization Logic After Timeout
Browse files Browse the repository at this point in the history
A strange HA behavior was reported in #787, resulting in both instances
being active.

The logs contained an entry of the previous active instance exiting the
HA.realize() method successfully after 1m9s. This, however, should not
be possible as the method's context is deadlined to a minute after the
heartbeat was received.

However, as it turns out, executing COMMIT on a database transaction is
not bound to the transaction's context, allowing to survive longer. To
mitigate this, another context watch was introduced. Doing so allows
directly handing over, while the other instance can now take over due to
the expired heartbeat in the database.

As a related change, the HA.insertEnvironment() method was inlined into
the retryable function to use the deadlined context. Otherwise, this
might block afterwards, as it was used within HA.realize(), but without
the passed context.

Since the retryable HA function may be executed a few times before
succeeding, the inserted heartbeat value will be directly outdated. The
heartbeat logic was slightly altered to always use the latest heartbeat
time value.

In addition, the main loop select cases for hactx.Done() and ctx.Done()
were unified, as hactx is a derived ctx. A closed ctx case may be lost
as the hactx case could have been chosen.
  • Loading branch information
oxzi committed Sep 24, 2024
1 parent 5e81041 commit 3e78b92
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 29 deletions.
7 changes: 4 additions & 3 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ func run() int {

cancelHactx()
case <-hactx.Done():
// Nothing to do here, surrounding loop will terminate now.
if ctx.Err() != nil {
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
}
// Otherwise, there is nothing to do here, surrounding loop will terminate now.
case <-ha.Done():
if err := ha.Err(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error"))
Expand All @@ -337,8 +340,6 @@ func run() int {
cancelHactx()

return ExitFailure
case <-ctx.Done():
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
case s := <-sig:
logger.Infow("Exiting due to signal", zap.String("signal", s.String()))
cancelHactx()
Expand Down
76 changes: 50 additions & 26 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (h *HA) controller() {
}
tt := t.Time()
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * peerTimeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
Expand Down Expand Up @@ -218,7 +218,7 @@ func (h *HA) controller() {

// Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time.
realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime())
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time")
Expand Down Expand Up @@ -264,11 +264,16 @@ func (h *HA) controller() {

// realize a HA cycle triggered by a heartbeat event.
//
// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly
// enforced to abort the realization logic the moment the context expires.
//
// shouldLogRoutineEvents indicates if recurrent events should be logged.
//
// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one
// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens.
func (h *HA) realize(
ctx context.Context,
s *icingaredisv1.IcingaStatus,
t *types.UnixMilli,
envId types.Binary,
shouldLogRoutineEvents bool,
) error {
Expand Down Expand Up @@ -300,6 +305,7 @@ func (h *HA) realize(
if errBegin != nil {
return errors.Wrap(errBegin, "can't start transaction")
}
defer func() { _ = tx.Rollback() }()

query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
"WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock
Expand Down Expand Up @@ -350,7 +356,7 @@ func (h *HA) realize(
EnvironmentMeta: v1.EnvironmentMeta{
EnvironmentId: envId,
},
Heartbeat: *t,
Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessage())),
Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
EndpointId: s.EndpointId,
Icinga2Version: s.Version,
Expand All @@ -370,15 +376,51 @@ func (h *HA) realize(

if takeover != "" {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)
if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil {
return database.CantPerformQuery(err, stmt)
}

if err != nil {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment)
if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}
}

if err := tx.Commit(); err != nil {
return errors.Wrap(err, "can't commit transaction")
// In general, cancellation does not work for COMMIT and ROLLBACK. Some database drivers may support a
// context-based abort, but only if the DBMS allows it. This was also discussed in the initial issue about
// context support to Go's sql package: https://github.com/golang/go/issues/15123#issuecomment-245882486
//
// This paragraph is implementation knowledge, not covered by the API specification. Go's sql.Tx.Commit() -
// which is not being overridden by sqlx.Tx - performs a preflight check on the context before handing over
// to the driver's Commit() method. Drivers may behave differently. For example, the used
// github.com/go-sql-driver/mysql package calls its internal exec() method with a COMMIT query, writing and
// reading packets without honoring the context.
//
// In a nutshell, one cannot expect a Tx.Commit() call to be covered by the transaction context. For this
// reason, the following Commit() call has been moved to its own goroutine, which communicates back via a
// channel selected along with the context. If the context ends before Commit(), this retryable function
// returns with a non-retryable error.
//
// However, while the COMMIT continues in the background, it may still succeed. In this case, the state of
// the database does not match the state of Icinga DB, specifically the database says that this instance is
// active while this instance thinks otherwise. Fortunately, this mismatch is not critical because when this
// function is re-entered, the initial SELECT query would be empty for this Icinga DB node and imply the
// presence of another active instance for the other node. Effectively, this could result in a single HA
// cycle with no active node. Afterwards, either this instance takes over due to the false impression that
// no other node is active, or the other instances does so as the inserted heartbeat has already expired.
// Not great, not terrible.
commitErrCh := make(chan error, 1)
go func() { commitErrCh <- tx.Commit() }()

select {
case err := <-commitErrCh:
if err != nil {
return errors.Wrap(err, "can't commit transaction")
}
case <-ctx.Done():
return ctx.Err()
}

return nil
Expand Down Expand Up @@ -420,12 +462,6 @@ func (h *HA) realize(
}

if takeover != "" {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
if err := h.insertEnvironment(); err != nil {
return errors.Wrap(err, "can't insert environment")
}

h.signalTakeover(takeover)
} else if otherResponsible {
if state, _ := h.state.Load(); !state.otherResponsible {
Expand All @@ -445,18 +481,6 @@ func (h *HA) realizeLostHeartbeat() {
}
}

// insertEnvironment inserts the environment from the specified state into the database if it does not already exist.
func (h *HA) insertEnvironment() error {
// Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does.
stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment)

if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}

return nil
}

func (h *HA) removeInstance(ctx context.Context) {
h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId)))
// Intentionally not using h.ctx here as it's already cancelled.
Expand Down
16 changes: 16 additions & 0 deletions pkg/icingaredis/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Heartbeat struct {
active bool
events chan *HeartbeatMessage
lastReceivedMs int64
lastMessageMs int64
cancelCtx context.CancelFunc
client *redis.Client
done chan struct{}
Expand Down Expand Up @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 {
return atomic.LoadInt64(&h.lastReceivedMs)
}

// LastMessage returns the last message's time in ms.
func (h *Heartbeat) LastMessage() int64 {
return atomic.LoadInt64(&h.lastMessageMs)
}

// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
Expand Down Expand Up @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())

statsT, err := m.stats.Time()
if err != nil {
h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err))
atomic.StoreInt64(&h.lastMessageMs, 0)
} else {
atomic.StoreInt64(&h.lastMessageMs, statsT.Time().UnixMilli())
}

h.sendEvent(m)
case <-time.After(Timeout):
if h.active {
Expand All @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

atomic.StoreInt64(&h.lastReceivedMs, 0)
atomic.StoreInt64(&h.lastMessageMs, 0)
case <-ctx.Done():
return ctx.Err()
}
Expand Down

0 comments on commit 3e78b92

Please sign in to comment.