From a687bddb1b611e945e1f7651b069af2e40d2de28 Mon Sep 17 00:00:00 2001 From: Jeremy Evans Date: Mon, 17 Jun 2024 12:53:50 -0700 Subject: [PATCH] Revert "Add :min_wait_timeout option to sharded_threaded connection pool and default to 0.1, fixes stalling observed on JRuby" This reverts commit 72f08cd4cb1186b60d937df283317b29107c4506. This implements a better fix, by rechecking for an available connection inside the sync block before waiting on the condition variable. Because the recheck is inside the sync block, it should avoid the race condition (barring a bug in the condition variable signal/wait code). Add an acquire_available private method to both connection pools to handle this logic. --- CHANGELOG | 4 +- .../connection_pool/sharded_threaded.rb | 44 ++++++++++++------- lib/sequel/connection_pool/threaded.rb | 32 +++++++++----- spec/core/connection_pool_spec.rb | 9 ---- 4 files changed, 52 insertions(+), 37 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 499527912..c92def288 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,8 +1,8 @@ === master -* Emulate dropping a unique column or a column that is part of an index on SQLite 3.35.0+ (jeremyevans) (#2176) +* Fix race condition in threaded/sharded_threaded connection pools that could cause stalls (jeremyevans) -* Add :min_wait_timeout option to sharded_threaded connection pool and default to 0.1, fixes stalling observed on JRuby (jeremyevans) +* Emulate dropping a unique column or a column that is part of an index on SQLite 3.35.0+ (jeremyevans) (#2176) * Support MERGE RETURNING on PostgreSQL 17+ (jeremyevans) diff --git a/lib/sequel/connection_pool/sharded_threaded.rb b/lib/sequel/connection_pool/sharded_threaded.rb index bf67d35a9..0afa2a52c 100644 --- a/lib/sequel/connection_pool/sharded_threaded.rb +++ b/lib/sequel/connection_pool/sharded_threaded.rb @@ -15,8 +15,6 @@ class Sequel::ShardedThreadedConnectionPool < Sequel::ThreadedConnectionPool # Sequel uses Hash.new(:default). You can use a hash with a default proc # that raises an error if you want to catch all cases where a nonexistent # server is used. - # :min_wait_timeout :: This sets a minimum timeout when waiting on a condition variable - # (default: 0.1). def initialize(db, opts = OPTS) super @available_connections = {} @@ -27,7 +25,6 @@ def initialize(db, opts = OPTS) remove_instance_variable(:@allocated) @allocated = {} @waiters = {} - @min_wait_timeout = opts[:min_wait_timeout] || 0.1 add_servers([:default]) add_servers(opts[:servers].keys) if opts[:servers] @@ -200,21 +197,44 @@ def acquire(thread, server) timeout = @timeout timer = Sequel.start_timer + if conn = acquire_available(thread, server, timeout) + return conn + end + until conn = assign_connection(thread, server) elapsed = Sequel.elapsed_seconds_since(timer) + # :nocov: raise_pool_timeout(elapsed, server) if elapsed > timeout - sync do - @waiters[server].wait(@mutex, condition_variable_wait_timeout(timeout - elapsed)) - if conn = next_available(server) - return(allocated(server)[thread] = conn) - end + # It's difficult to get to this point, it can only happen if there is a race condition + # where a connection cannot be acquired even after the thread is signalled by the condition variable + if conn = acquire_available(thread, server, timeout - elapsed) + return conn end + # :nocov: end conn end + # Acquire a connection if one is already available, or waiting until it becomes available. + def acquire_available(thread, server, timeout) + sync do + # Check if connection was checked in between when assign_connection failed and now. + if conn = next_available(server) + return(allocated(server)[thread] = conn) + end + + @waiters[server].wait(@mutex, timeout) + + # Connection still not available, could be because a connection was disconnected, + # may have to retry assign_connection to see if a new connection can be made. + if conn = next_available(server) + return(allocated(server)[thread] = conn) + end + end + end + # Assign a connection to the thread, or return nil if one cannot be assigned. # The caller should NOT have the mutex before calling this. def assign_connection(thread, server) @@ -268,14 +288,6 @@ def checkin_connection(server, conn) conn end - # The amount of time to wait on the condition variable. Uses the given - # timeout or the :min_wait_timeout option, whichever is less. This mitigates - # the damage if a connection is not available immediately after the condition - # variable returns from #wait. - def condition_variable_wait_timeout(timeout) - timeout < @min_wait_timeout ? timeout : @min_wait_timeout - end - # Clear the array of available connections for the server, returning an array # of previous available connections that should be disconnected (or nil if none should be). # Mark any allocated connections to be removed when they are checked back in. The calling diff --git a/lib/sequel/connection_pool/threaded.rb b/lib/sequel/connection_pool/threaded.rb index 8961075ee..0c61319bc 100644 --- a/lib/sequel/connection_pool/threaded.rb +++ b/lib/sequel/connection_pool/threaded.rb @@ -143,11 +143,8 @@ def acquire(thread) timeout = @timeout timer = Sequel.start_timer - sync do - @waiter.wait(@mutex, timeout) - if conn = next_available - return(@allocated[thread] = conn) - end + if conn = acquire_available(thread, timeout) + return conn end until conn = assign_connection(thread) @@ -157,11 +154,8 @@ def acquire(thread) # It's difficult to get to this point, it can only happen if there is a race condition # where a connection cannot be acquired even after the thread is signalled by the condition variable - sync do - @waiter.wait(@mutex, timeout - elapsed) - if conn = next_available - return(@allocated[thread] = conn) - end + if conn = acquire_available(thread, timeout - elapsed) + return conn end # :nocov: end @@ -169,6 +163,24 @@ def acquire(thread) conn end + # Acquire a connection if one is already available, or waiting until it becomes available. + def acquire_available(thread, timeout) + sync do + # Check if connection was checked in between when assign_connection failed and now. + if conn = next_available + return(@allocated[thread] = conn) + end + + @waiter.wait(@mutex, timeout) + + # Connection still not available, could be because a connection was disconnected, + # may have to retry assign_connection to see if a new connection can be made. + if conn = next_available + return(@allocated[thread] = conn) + end + end + end + # Assign a connection to the thread, or return nil if one cannot be assigned. # The caller should NOT have the mutex before calling this. def assign_connection(thread) diff --git a/spec/core/connection_pool_spec.rb b/spec/core/connection_pool_spec.rb index 8003a2cf6..0ace412d6 100644 --- a/spec/core/connection_pool_spec.rb +++ b/spec/core/connection_pool_spec.rb @@ -677,15 +677,6 @@ def @pool.acquire(_) raise Sequel::DatabaseDisconnectError; end include concurrent_connection_pool_specs include threaded_connection_pool_specs - - it "should handle a :min_wait_timeout longer than a connection checkout" do - pool = get_pool(:min_wait_timeout=>0.01, :max_connections=>1) - conns = [] - 4.times.map do - Thread.new{pool.hold{|c| conns << c; sleep 0.02}} - end.map(&:join) - conns.must_equal [1, 1, 1, 1] - end end {timed_queue_connection_pool=>"Timed Queue", sharded_timed_queue_connection_pool=>"Sharded Timed Queue"}.each do |pc, desc|