diff --git a/CHANGELOG b/CHANGELOG index 499527912..c92def288 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,8 +1,8 @@ === master -* Emulate dropping a unique column or a column that is part of an index on SQLite 3.35.0+ (jeremyevans) (#2176) +* Fix race condition in threaded/sharded_threaded connection pools that could cause stalls (jeremyevans) -* Add :min_wait_timeout option to sharded_threaded connection pool and default to 0.1, fixes stalling observed on JRuby (jeremyevans) +* Emulate dropping a unique column or a column that is part of an index on SQLite 3.35.0+ (jeremyevans) (#2176) * Support MERGE RETURNING on PostgreSQL 17+ (jeremyevans) diff --git a/lib/sequel/connection_pool/sharded_threaded.rb b/lib/sequel/connection_pool/sharded_threaded.rb index bf67d35a9..0afa2a52c 100644 --- a/lib/sequel/connection_pool/sharded_threaded.rb +++ b/lib/sequel/connection_pool/sharded_threaded.rb @@ -15,8 +15,6 @@ class Sequel::ShardedThreadedConnectionPool < Sequel::ThreadedConnectionPool # Sequel uses Hash.new(:default). You can use a hash with a default proc # that raises an error if you want to catch all cases where a nonexistent # server is used. - # :min_wait_timeout :: This sets a minimum timeout when waiting on a condition variable - # (default: 0.1). 
def initialize(db, opts = OPTS) super @available_connections = {} @@ -27,7 +25,6 @@ def initialize(db, opts = OPTS) remove_instance_variable(:@allocated) @allocated = {} @waiters = {} - @min_wait_timeout = opts[:min_wait_timeout] || 0.1 add_servers([:default]) add_servers(opts[:servers].keys) if opts[:servers] @@ -200,21 +197,44 @@ def acquire(thread, server) timeout = @timeout timer = Sequel.start_timer + if conn = acquire_available(thread, server, timeout) + return conn + end + until conn = assign_connection(thread, server) elapsed = Sequel.elapsed_seconds_since(timer) + # :nocov: raise_pool_timeout(elapsed, server) if elapsed > timeout - sync do - @waiters[server].wait(@mutex, condition_variable_wait_timeout(timeout - elapsed)) - if conn = next_available(server) - return(allocated(server)[thread] = conn) - end + # It's difficult to get to this point, it can only happen if there is a race condition + # where a connection cannot be acquired even after the thread is signalled by the condition variable + if conn = acquire_available(thread, server, timeout - elapsed) + return conn end + # :nocov: end conn end + # Acquire a connection if one is already available, or wait until it becomes available. + def acquire_available(thread, server, timeout) + sync do + # Check if connection was checked in between when assign_connection failed and now. + if conn = next_available(server) + return(allocated(server)[thread] = conn) + end + + @waiters[server].wait(@mutex, timeout) + + # Connection still not available, could be because a connection was disconnected, + # may have to retry assign_connection to see if a new connection can be made. + if conn = next_available(server) + return(allocated(server)[thread] = conn) + end + end + end + # Assign a connection to the thread, or return nil if one cannot be assigned. # The caller should NOT have the mutex before calling this. 
def assign_connection(thread, server) @@ -268,14 +288,6 @@ def checkin_connection(server, conn) conn end - # The amount of time to wait on the condition variable. Uses the given - # timeout or the :min_wait_timeout option, whichever is less. This mitigates - # the damage if a connection is not available immediately after the condition - # variable returns from #wait. - def condition_variable_wait_timeout(timeout) - timeout < @min_wait_timeout ? timeout : @min_wait_timeout - end - # Clear the array of available connections for the server, returning an array # of previous available connections that should be disconnected (or nil if none should be). # Mark any allocated connections to be removed when they are checked back in. The calling diff --git a/lib/sequel/connection_pool/threaded.rb b/lib/sequel/connection_pool/threaded.rb index 8961075ee..0c61319bc 100644 --- a/lib/sequel/connection_pool/threaded.rb +++ b/lib/sequel/connection_pool/threaded.rb @@ -143,11 +143,8 @@ def acquire(thread) timeout = @timeout timer = Sequel.start_timer - sync do - @waiter.wait(@mutex, timeout) - if conn = next_available - return(@allocated[thread] = conn) - end + if conn = acquire_available(thread, timeout) + return conn end until conn = assign_connection(thread) @@ -157,11 +154,8 @@ def acquire(thread) # It's difficult to get to this point, it can only happen if there is a race condition # where a connection cannot be acquired even after the thread is signalled by the condition variable - sync do - @waiter.wait(@mutex, timeout - elapsed) - if conn = next_available - return(@allocated[thread] = conn) - end + if conn = acquire_available(thread, timeout - elapsed) + return conn end # :nocov: end @@ -169,6 +163,24 @@ def acquire(thread) conn end + # Acquire a connection if one is already available, or wait until it becomes available. + def acquire_available(thread, timeout) + sync do + # Check if connection was checked in between when assign_connection failed and now. 
+ if conn = next_available + return(@allocated[thread] = conn) + end + + @waiter.wait(@mutex, timeout) + + # Connection still not available, could be because a connection was disconnected, + # may have to retry assign_connection to see if a new connection can be made. + if conn = next_available + return(@allocated[thread] = conn) + end + end + end + # Assign a connection to the thread, or return nil if one cannot be assigned. # The caller should NOT have the mutex before calling this. def assign_connection(thread) diff --git a/spec/core/connection_pool_spec.rb b/spec/core/connection_pool_spec.rb index 8003a2cf6..0ace412d6 100644 --- a/spec/core/connection_pool_spec.rb +++ b/spec/core/connection_pool_spec.rb @@ -677,15 +677,6 @@ def @pool.acquire(_) raise Sequel::DatabaseDisconnectError; end include concurrent_connection_pool_specs include threaded_connection_pool_specs - - it "should handle a :min_wait_timeout longer than a connection checkout" do - pool = get_pool(:min_wait_timeout=>0.01, :max_connections=>1) - conns = [] - 4.times.map do - Thread.new{pool.hold{|c| conns << c; sleep 0.02}} - end.map(&:join) - conns.must_equal [1, 1, 1, 1] - end end {timed_queue_connection_pool=>"Timed Queue", sharded_timed_queue_connection_pool=>"Sharded Timed Queue"}.each do |pc, desc|