Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't spam log messages on backend connection interrupt #515

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions pkg/config/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.uber.org/zap"
"net"
"strings"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -73,6 +74,9 @@ func (r *Redis) NewClient(logger *logging.Logger) (*icingaredis.Client, error) {

// dialWithLogging returns a Redis Dialer with logging capabilities.
func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc {
silenceErrorsUntil := new(int64)
silenceSuccessUntil := new(int64)

// dial behaves like net.Dialer#DialContext,
// but re-tries on common errors that are considered retryable.
return func(ctx context.Context, network, addr string) (conn net.Conn, err error) {
Expand All @@ -88,12 +92,34 @@ func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc
Timeout: 5 * time.Minute,
OnError: func(_ time.Duration, _ uint64, err, lastErr error) {
if lastErr == nil || err.Error() != lastErr.Error() {
logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err))
atomic.StoreInt64(silenceSuccessUntil, 0)

now := time.Now()
logw := logger.Debugw

if seu := atomic.LoadInt64(silenceErrorsUntil); seu < now.UnixMilli() {
if atomic.CompareAndSwapInt64(silenceErrorsUntil, seu, now.Add(time.Minute).UnixMilli()) {
logw = logger.Warnw
}
}

logw("Can't connect to Redis. Retrying", zap.Error(err))
}
},
OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
if attempt > 0 {
logger.Infow("Reconnected to Redis",
atomic.StoreInt64(silenceErrorsUntil, 0)

now := time.Now()
logw := logger.Debugw

if ssu := atomic.LoadInt64(silenceSuccessUntil); ssu < now.UnixMilli() {
if atomic.CompareAndSwapInt64(silenceSuccessUntil, ssu, now.Add(time.Minute).UnixMilli()) {
logw = logger.Infow
}
}

logw("Reconnected to Redis",
zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1))
}
},
Expand Down
36 changes: 31 additions & 5 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.uber.org/zap"
"sync/atomic"
"time"
)

Expand All @@ -23,11 +24,14 @@ var timeout = time.Minute * 5
// RetryConnector wraps driver.Connector with retry logic.
type RetryConnector struct {
driver.Connector
driver Driver

driver Driver
silenceErrorsUntil int64
silenceSuccessUntil int64
}

// Connect implements part of the driver.Connector interface.
func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
func (c *RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
var conn driver.Conn
err := errors.Wrap(retry.WithBackoff(
ctx,
Expand All @@ -43,14 +47,36 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
telemetry.UpdateCurrentDbConnErr(err)

if lastErr == nil || err.Error() != lastErr.Error() {
c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err))
atomic.StoreInt64(&c.silenceSuccessUntil, 0)

now := time.Now()
logw := c.driver.Logger.Debugw

if seu := atomic.LoadInt64(&c.silenceErrorsUntil); seu < now.UnixMilli() {
if atomic.CompareAndSwapInt64(&c.silenceErrorsUntil, seu, now.Add(time.Minute).UnixMilli()) {
logw = c.driver.Logger.Warnw
}
}

logw("Can't connect to database. Retrying", zap.Error(err))
}
},
OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
telemetry.UpdateCurrentDbConnErr(nil)

if attempt > 0 {
c.driver.Logger.Infow("Reconnected to database",
atomic.StoreInt64(&c.silenceErrorsUntil, 0)

now := time.Now()
logw := c.driver.Logger.Debugw

if ssu := atomic.LoadInt64(&c.silenceSuccessUntil); ssu < now.UnixMilli() {
if atomic.CompareAndSwapInt64(&c.silenceSuccessUntil, ssu, now.Add(time.Minute).UnixMilli()) {
logw = c.driver.Logger.Infow
}
}

logw("Reconnected to database",
zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1))
}
},
Expand All @@ -60,7 +86,7 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
}

// Driver implements part of the driver.Connector interface.
func (c RetryConnector) Driver() driver.Driver {
func (c *RetryConnector) Driver() driver.Driver {
return c.driver
}

Expand Down