diff --git a/go/pools/smartconnpool/benchmarking/legacy/resource_pool.go b/go/pools/smartconnpool/benchmarking/legacy/resource_pool.go
index df8c44e1530..33547349860 100644
--- a/go/pools/smartconnpool/benchmarking/legacy/resource_pool.go
+++ b/go/pools/smartconnpool/benchmarking/legacy/resource_pool.go
@@ -20,7 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"math/rand"
+	"math/rand/v2"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -576,7 +576,7 @@ func (rp *ResourcePool) extendedMaxLifetime() time.Duration {
 	if maxLifetime == 0 {
 		return 0
 	}
-	return time.Duration(maxLifetime + rand.Int63n(maxLifetime))
+	return time.Duration(maxLifetime + rand.Int64N(maxLifetime))
 }
 
 // MaxLifetimeClosed returns the count of resources closed due to refresh timeout.
diff --git a/go/pools/smartconnpool/benchmarking/load_test.go b/go/pools/smartconnpool/benchmarking/load_test.go
index 537daf2c357..e97537bca0c 100644
--- a/go/pools/smartconnpool/benchmarking/load_test.go
+++ b/go/pools/smartconnpool/benchmarking/load_test.go
@@ -21,7 +21,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
-	"math/rand"
+	"math/rand/v2"
 	"os"
 	"sort"
 	"sync"
diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go
index ecc3f827c7b..52346adb1a4 100644
--- a/go/pools/smartconnpool/pool.go
+++ b/go/pools/smartconnpool/pool.go
@@ -18,12 +18,12 @@ package smartconnpool
 
 import (
 	"context"
+	"math/rand/v2"
 	"slices"
 	"sync"
 	"sync/atomic"
 	"time"
 
-	"vitess.io/vitess/go/hack"
 	"vitess.io/vitess/go/vt/log"
 	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
 	"vitess.io/vitess/go/vt/servenv"
@@ -92,6 +92,7 @@ type RefreshCheck func() (bool, error)
 
 type Config[C Connection] struct {
 	Capacity        int64
+	MaxIdleCount    int64
 	IdleTimeout     time.Duration
 	MaxLifetime     time.Duration
 	RefreshInterval time.Duration
@@ -123,6 +124,8 @@ type ConnPool[C Connection] struct {
 	active atomic.Int64
 	// capacity is the maximum number of connections that this pool can open
 	capacity atomic.Int64
+	// idleCount is the current limit on the number of idle connections in the pool
+	idleCount atomic.Int64
 
 	// workers is a waitgroup for all the currently running worker goroutines
 	workers sync.WaitGroup
@@ -138,6 +141,8 @@ type ConnPool[C Connection] struct {
 	// maxCapacity is the maximum value to which capacity can be set; when the pool
 	// is re-opened, it defaults to this capacity
 	maxCapacity int64
+	// maxIdleCount is the maximum number of idle connections allowed in the pool
+	maxIdleCount int64
 	// maxLifetime is the maximum time a connection can be open
 	maxLifetime atomic.Int64
 	// idleTimeout is the maximum time a connection can remain idle
@@ -156,8 +161,8 @@ type ConnPool[C Connection] struct {
 // The pool must be ConnPool.Open before it can start giving out connections
 func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
 	pool := &ConnPool[C]{}
-	pool.freshSettingsStack.Store(-1)
 	pool.config.maxCapacity = config.Capacity
+	pool.config.maxIdleCount = config.MaxIdleCount
 	pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
 	pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds())
 	pool.config.refreshInterval.Store(config.RefreshInterval.Nanoseconds())
@@ -192,11 +197,18 @@ func (pool *ConnPool[C]) runWorker(close <-chan struct{}, interval time.Duration
 func (pool *ConnPool[C]) open() {
 	pool.close = make(chan struct{})
 	pool.capacity.Store(pool.config.maxCapacity)
+	pool.setIdleCount()
 
 	// The expire worker takes care of removing from the waiter list any clients whose
 	// context has been cancelled.
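+	// It also guards against waiter starvation: if waiters are still queued while
+	// idle connections sit in the stacks, it hands connections to them directly.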
-	pool.runWorker(pool.close, 1*time.Second, func(_ time.Time) bool {
-		pool.wait.expire(false)
+	pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool {
+		maybeStarving := pool.wait.expire(false)
+
+		// Do not allow waiters to starve: if there are waiters in the queue
+		// and connections in the stacks, we could be starving them.
+		// Try popping a connection and handing it over directly.
+		for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
+		}
 		return true
 	})
 
@@ -315,6 +327,16 @@ func (pool *ConnPool[C]) MaxCapacity() int64 {
 	return pool.config.maxCapacity
 }
 
+// setIdleCount derives the effective idle limit from the configured
+// MaxIdleCount and the pool's current capacity.
+func (pool *ConnPool[C]) setIdleCount() {
+	capacity := pool.Capacity()
+	maxIdleCount := pool.config.maxIdleCount
+	if maxIdleCount == 0 || maxIdleCount > capacity {
+		pool.idleCount.Store(capacity)
+	} else {
+		pool.idleCount.Store(maxIdleCount)
+	}
+}
+
 // InUse returns the number of connections that the pool has lent out to clients and that
 // haven't been returned yet.
 func (pool *ConnPool[C]) InUse() int64 {
@@ -340,6 +362,10 @@ func (pool *ConnPool[C]) SetIdleTimeout(duration time.Duration) {
 	pool.config.idleTimeout.Store(duration.Nanoseconds())
 }
 
+// IdleCount returns the current limit on idle connections in the pool.
+func (pool *ConnPool[D]) IdleCount() int64 {
+	return pool.idleCount.Load()
+}
+
 func (pool *ConnPool[D]) RefreshInterval() time.Duration {
 	return time.Duration(pool.config.refreshInterval.Load())
 }
@@ -395,14 +421,52 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
 		}
 	}
 
-	if !pool.wait.tryReturnConn(conn) {
-		connSetting := conn.Conn.Setting()
-		if connSetting == nil {
-			pool.clean.Push(conn)
-		} else {
-			stack := connSetting.bucket & stackMask
-			pool.settings[stack].Push(conn)
-			pool.freshSettingsStack.Store(int64(stack))
+	pool.tryReturnConn(conn)
+}
+
+// tryReturnConn hands the connection to a waiter if one is queued; otherwise it
+// pushes the connection back onto the matching stack, or closes it outright if
+// the idle limit has been reached. It returns true if a waiter took the connection.
+func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
+	if pool.wait.tryReturnConn(conn) {
+		return true
+	}
+	if pool.closeOnIdleLimitReached(conn) {
+		return false
+	}
+	connSetting := conn.Conn.Setting()
+	if connSetting == nil {
+		pool.clean.Push(conn)
+	} else {
+		stack := connSetting.bucket & stackMask
+		pool.settings[stack].Push(conn)
+		pool.freshSettingsStack.Store(int64(stack))
+	}
+	return false
+}
+
+// tryReturnAnyConn pops a pooled connection, preferring a clean one, and tries
+// to return it; it returns true if a waiter took the connection.
+func (pool *ConnPool[C]) tryReturnAnyConn() bool {
+	if conn, ok := pool.clean.Pop(); ok {
+		return pool.tryReturnConn(conn)
+	}
+	for u := 0; u <= stackMask; u++ {
+		if conn, ok := pool.settings[u].Pop(); ok {
+			return pool.tryReturnConn(conn)
+		}
+	}
+	return false
+}
+
+// closeOnIdleLimitReached closes a connection if the number of idle connections (active - inuse) in the pool
+// exceeds the idleCount limit. It returns true if the connection is closed, false otherwise.
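+// The compare-and-swap loop below ensures that only the goroutine that wins the
+// swap decrements the active count and closes the connection, so concurrent
+// returns cannot close more connections than the limit requires.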
+func (pool *ConnPool[C]) closeOnIdleLimitReached(conn *Pooled[C]) bool {
+	for {
+		open := pool.active.Load()
+		idle := open - pool.borrowed.Load()
+		if idle <= pool.idleCount.Load() {
+			return false
+		}
+		if pool.active.CompareAndSwap(open, open-1) {
+			pool.Metrics.idleClosed.Add(1)
+			conn.Close()
+			return true
 		}
 	}
 }
@@ -412,8 +476,7 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
 	if maxLifetime == 0 {
 		return 0
 	}
-	extended := hack.FastRand() % uint32(maxLifetime)
-	return time.Duration(maxLifetime) + time.Duration(extended)
+	return time.Duration(maxLifetime) + time.Duration(rand.Int64N(maxLifetime))
 }
 
 func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Time) error {
@@ -443,14 +506,9 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
 }
 
 func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
-	fresh := pool.freshSettingsStack.Load()
-	if fresh < 0 {
-		return nil
-	}
-
 	var start uint32
 	if setting == nil {
-		start = uint32(fresh)
+		start = uint32(pool.freshSettingsStack.Load())
 	} else {
 		start = setting.bucket
 	}
@@ -630,6 +688,9 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
 	if oldcap == newcap {
 		return nil
 	}
+	// re-derive the idle limit from the new capacity when we're done; deferring this
+	// ensures it runs only after any excess connections have been returned to the pool.
+	defer pool.setIdleCount()
 
 	const delay = 10 * time.Millisecond
 
@@ -733,6 +794,9 @@ func (pool *ConnPool[C]) RegisterStats(stats *servenv.Exporter, name string) {
 		// the smartconnpool doesn't have a maximum capacity
 		return pool.Capacity()
 	})
+	stats.NewGaugeFunc(name+"IdleAllowed", "Tablet server conn pool idle allowed limit", func() int64 {
+		return pool.IdleCount()
+	})
 	stats.NewCounterFunc(name+"WaitCount", "Tablet server conn pool wait count", func() int64 {
 		return pool.Metrics.WaitCount()
 	})
diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go
index 701327005ad..3fbc6a853db 100644
--- a/go/pools/smartconnpool/pool_test.go
+++ b/go/pools/smartconnpool/pool_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -746,6 +747,51 @@ func TestExtendedLifetimeTimeout(t *testing.T) {
 	}
 }
 
+// TestMaxIdleCount verifies that the pool closes idle connections once the
+// number of idle connections exceeds the configured MaxIdleCount limit.
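+// With Capacity 5 and MaxIdleCount 2, returning all five connections should
+// close three of them; with MaxIdleCount 0 the idle limit defaults to the full
+// capacity, so nothing is closed.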
+func TestMaxIdleCount(t *testing.T) {
+	testMaxIdleCount := func(t *testing.T, setting *Setting, maxIdleCount int64, expClosedConn int) {
+		var state TestState
+
+		ctx := context.Background()
+		p := NewPool(&Config[*TestConn]{
+			Capacity:     5,
+			MaxIdleCount: maxIdleCount,
+			LogWait:      state.LogWait,
+		}).Open(newConnector(&state), nil)
+
+		defer p.Close()
+
+		var conns []*Pooled[*TestConn]
+		for i := 0; i < 5; i++ {
+			r, err := p.Get(ctx, setting)
+			require.NoError(t, err)
+			assert.EqualValues(t, i+1, state.open.Load())
+			assert.EqualValues(t, 0, p.Metrics.IdleClosed())
+
+			conns = append(conns, r)
+		}
+
+		for _, conn := range conns {
+			p.put(conn)
+		}
+
+		closedConn := 0
+		for _, conn := range conns {
+			if conn.Conn.IsClosed() {
+				closedConn++
+			}
+		}
+		assert.EqualValues(t, expClosedConn, closedConn)
+		assert.EqualValues(t, expClosedConn, p.Metrics.IdleClosed())
+	}
+
+	t.Run("WithoutSettings", func(t *testing.T) { testMaxIdleCount(t, nil, 2, 3) })
+	t.Run("WithSettings", func(t *testing.T) { testMaxIdleCount(t, sFoo, 2, 3) })
+	t.Run("WithoutSettings-MaxIdleCount-Zero", func(t *testing.T) { testMaxIdleCount(t, nil, 0, 0) })
+	t.Run("WithSettings-MaxIdleCount-Zero", func(t *testing.T) { testMaxIdleCount(t, sFoo, 0, 0) })
+}
+
 func TestCreateFail(t *testing.T) {
 	var state TestState
 	state.chaos.failConnect = true
@@ -1080,3 +1126,70 @@ func TestApplySettingsFailure(t *testing.T) {
 		p.put(r)
 	}
 }
+
+func TestGetSpike(t *testing.T) {
+	var state TestState
+
+	ctx := context.Background()
+	p := NewPool(&Config[*TestConn]{
+		Capacity:    5,
+		IdleTimeout: time.Second,
+		LogWait:     state.LogWait,
+	}).Open(newConnector(&state), nil)
+
+	var resources [10]*Pooled[*TestConn]
+	var r *Pooled[*TestConn]
+	var err error
+
+	// Ensure we have a pool with 5 available resources
+	for i := 0; i < 5; i++ {
+		r, err = p.Get(ctx, nil)
+
+		require.NoError(t, err)
+		resources[i] = r
+		assert.EqualValues(t, 5-i-1, p.Available())
+		assert.Zero(t, p.Metrics.WaitCount())
+		assert.Zero(t, len(state.waits))
+		assert.Zero(t, p.Metrics.WaitTime())
+		assert.EqualValues(t, i+1, state.lastID.Load())
+		assert.EqualValues(t, i+1, state.open.Load())
+	}
+
+	for i := 0; i < 5; i++ {
+		p.put(resources[i])
+	}
+
+	assert.EqualValues(t, 5, p.Available())
+	assert.EqualValues(t, 5, p.Active())
+	assert.EqualValues(t, 0, p.InUse())
+
+	for i := 0; i < 2000; i++ {
+		wg := sync.WaitGroup{}
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+
+		errs := make(chan error, 80)
+
+		for j := 0; j < 80; j++ {
+			wg.Add(1)
+
+			go func() {
+				defer wg.Done()
+				// use goroutine-local variables: sharing r and err with the
+				// outer scope across 80 goroutines would be a data race
+				conn, err := p.Get(ctx, nil)
+				if err != nil {
+					errs <- err
+					return
+				}
+				p.put(conn)
+			}()
+		}
+		wg.Wait()
+		// cancel immediately rather than deferring, so the 2000 iterations
+		// don't accumulate pending cancel funcs until the test returns
+		cancel()
+
+		if len(errs) > 0 {
+			t.Errorf("Error getting connection: %v", <-errs)
+		}
+
+		close(errs)
+	}
+}
diff --git a/go/pools/smartconnpool/waitlist.go b/go/pools/smartconnpool/waitlist.go
index f16215f4b14..ef1eb1fe997 100644
--- a/go/pools/smartconnpool/waitlist.go
+++ b/go/pools/smartconnpool/waitlist.go
@@ -76,7 +76,7 @@ func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting) (*Pool
 
 // expire removes and wakes any expired waiter in the waitlist.
 // if force is true, it'll wake and remove all the waiters.
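+// It returns the number of waiters that were left in the list without ever
+// having been woken (age == 0); the pool's worker uses this count as a hint
+// that those waiters may be starving and tries to hand them a connection.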
-func (wl *waitlist[C]) expire(force bool) { +func (wl *waitlist[C]) expire(force bool) (maybeStarving int) { if wl.list.Len() == 0 { return } @@ -91,6 +91,9 @@ func (wl *waitlist[C]) expire(force bool) { expired = append(expired, e) continue } + if e.Value.age == 0 { + maybeStarving++ + } } // remove the expired waiters from the waitlist after traversing it for _, e := range expired { @@ -102,6 +105,7 @@ func (wl *waitlist[C]) expire(force bool) { for _, e := range expired { e.Value.sema.notify(false) } + return } // tryReturnConn tries handing over a connection to one of the waiters in the pool.
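Usage sketch for the new MaxIdleCount knob (not part of the diff; the connection
type MyConn and the connector connect are illustrative placeholders, and the
connection is handed back with the pool's Recycle helper rather than the
internal put used by the tests):

	// Allow up to 100 open connections, but keep at most 10 of them while idle;
	// any connection returned beyond that is closed and counted in IdleClosed().
	pool := smartconnpool.NewPool(&smartconnpool.Config[*MyConn]{
		Capacity:     100,
		MaxIdleCount: 10,
		IdleTimeout:  time.Minute,
	}).Open(connect, nil)
	defer pool.Close()

	conn, err := pool.Get(ctx, nil) // a nil Setting requests a plain connection
	if err != nil {
		return err
	}
	defer conn.Recycle() // returns the connection, subject to the idle limit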