Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort HA Realization Logic After Timeout #800

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,10 @@ func run() int {

cancelHactx()
case <-hactx.Done():
// Nothing to do here, surrounding loop will terminate now.
if ctx.Err() != nil {
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
}
// Otherwise, there is nothing to do here, surrounding loop will terminate now.
case <-ha.Done():
if err := ha.Err(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error"))
Expand All @@ -338,8 +341,6 @@ func run() int {
cancelHactx()

return ExitFailure
case <-ctx.Done():
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
case s := <-sig:
logger.Infow("Exiting due to signal", zap.String("signal", s.String()))
cancelHactx()
Expand Down
76 changes: 50 additions & 26 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (h *HA) controller() {
}
tt := t.Time()
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * peerTimeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
Expand Down Expand Up @@ -221,7 +221,7 @@ func (h *HA) controller() {

// Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time.
realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime())
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time")
Expand Down Expand Up @@ -267,11 +267,16 @@ func (h *HA) controller() {

// realize a HA cycle triggered by a heartbeat event.
//
// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly
// enforced to abort the realization logic the moment the context expires.
//
// shouldLogRoutineEvents indicates if recurrent events should be logged.
//
// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one
// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens.
func (h *HA) realize(
ctx context.Context,
s *icingaredisv1.IcingaStatus,
t *types.UnixMilli,
envId types.Binary,
shouldLogRoutineEvents bool,
) error {
Expand Down Expand Up @@ -303,6 +308,7 @@ func (h *HA) realize(
if errBegin != nil {
return errors.Wrap(errBegin, "can't start transaction")
}
defer func() { _ = tx.Rollback() }()

query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
"WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock
Expand Down Expand Up @@ -353,7 +359,7 @@ func (h *HA) realize(
EnvironmentMeta: v1.EnvironmentMeta{
EnvironmentId: envId,
},
Heartbeat: *t,
Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessageTime())),
Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
EndpointId: s.EndpointId,
Icinga2Version: s.Version,
Expand All @@ -373,15 +379,51 @@ func (h *HA) realize(

if takeover != "" {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)
if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil {
return database.CantPerformQuery(err, stmt)
}

if err != nil {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment)
if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}
}

if err := tx.Commit(); err != nil {
return errors.Wrap(err, "can't commit transaction")
// In general, cancellation does not work for COMMIT and ROLLBACK. Some database drivers may support a
// context-based abort, but only if the DBMS allows it. This was also discussed in the initial issue about
// context support to Go's sql package: https://github.com/golang/go/issues/15123#issuecomment-245882486
//
// This paragraph is implementation knowledge, not covered by the API specification. Go's sql.Tx.Commit() -
// which is not being overridden by sqlx.Tx - performs a preflight check on the context before handing over
// to the driver's Commit() method. Drivers may behave differently. For example, the used
// github.com/go-sql-driver/mysql package calls its internal exec() method with a COMMIT query, writing and
// reading packets without honoring the context.
//
// In a nutshell, one cannot expect a Tx.Commit() call to be covered by the transaction context. For this
// reason, the following Commit() call has been moved to its own goroutine, which communicates back via a
// channel selected along with the context. If the context ends before Commit(), this retryable function
// returns with a non-retryable error.
//
// However, while the COMMIT continues in the background, it may still succeed. In this case, the state of
// the database does not match the state of Icinga DB, specifically the database says that this instance is
// active while this instance thinks otherwise. Fortunately, this mismatch is not critical because when this
// function is re-entered, the initial SELECT query would be empty for this Icinga DB node and imply the
// presence of another active instance for the other node. Effectively, this could result in a single HA
// cycle with no active node. Afterwards, either this instance takes over due to the false impression that
// no other node is active, or the other instances does so as the inserted heartbeat has already expired.
// Not great, not terrible.
commitErrCh := make(chan error, 1)
go func() { commitErrCh <- tx.Commit() }()

select {
case err := <-commitErrCh:
if err != nil {
return errors.Wrap(err, "can't commit transaction")
}
case <-ctx.Done():
return ctx.Err()
}

return nil
Expand Down Expand Up @@ -423,12 +465,6 @@ func (h *HA) realize(
}

if takeover != "" {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
if err := h.insertEnvironment(); err != nil {
return errors.Wrap(err, "can't insert environment")
}

h.signalTakeover(takeover)
} else if otherResponsible {
if state := h.state.Load(); !state.otherResponsible {
Expand All @@ -451,18 +487,6 @@ func (h *HA) realizeLostHeartbeat() {
}
}

// insertEnvironment inserts the environment from the specified state into the database if it does not already exist.
func (h *HA) insertEnvironment() error {
// Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does.
stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment)

if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}

return nil
}

func (h *HA) removeInstance(ctx context.Context) {
h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId)))
// Intentionally not using h.ctx here as it's already cancelled.
Expand Down
16 changes: 16 additions & 0 deletions pkg/icingaredis/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Heartbeat struct {
active bool
events chan *HeartbeatMessage
lastReceivedMs atomic.Int64
lastMessageMs atomic.Int64
cancelCtx context.CancelFunc
client *redis.Client
done chan struct{}
Expand Down Expand Up @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 {
return h.lastReceivedMs.Load()
}

// LastMessageTime returns the last message's time in ms.
func (h *Heartbeat) LastMessageTime() int64 {
return h.lastMessageMs.Load()
}

// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
Expand Down Expand Up @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

h.lastReceivedMs.Store(m.received.UnixMilli())

statsT, err := m.stats.Time()
if err != nil {
h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err))
h.lastMessageMs.Store(0)
} else {
h.lastMessageMs.Store(statsT.Time().UnixMilli())
}

h.sendEvent(m)
case <-time.After(Timeout):
if h.active {
Expand All @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

h.lastReceivedMs.Store(0)
h.lastMessageMs.Store(0)
case <-ctx.Done():
return ctx.Err()
}
Expand Down
Loading