diff --git a/CHANGELOG b/CHANGELOG index edda30d2f..35207a77a 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,7 @@ === master +* Add :min_wait_timeout option to sharded_threaded connection pool and default to 0.1, fixes stalling observed on JRuby (jeremyevans) + * Support MERGE RETURNING on PostgreSQL 17+ (jeremyevans) * Remove use of logger library in bin/sequel (jeremyevans) diff --git a/lib/sequel/connection_pool/sharded_threaded.rb b/lib/sequel/connection_pool/sharded_threaded.rb index ff8265e00..bf67d35a9 100644 --- a/lib/sequel/connection_pool/sharded_threaded.rb +++ b/lib/sequel/connection_pool/sharded_threaded.rb @@ -15,6 +15,8 @@ class Sequel::ShardedThreadedConnectionPool < Sequel::ThreadedConnectionPool # Sequel uses Hash.new(:default). You can use a hash with a default proc # that raises an error if you want to catch all cases where a nonexistent # server is used. + # :min_wait_timeout :: This sets a minimum timeout when waiting on a condition variable + # (default: 0.1). def initialize(db, opts = OPTS) super @available_connections = {} @@ -25,6 +27,7 @@ def initialize(db, opts = OPTS) remove_instance_variable(:@allocated) @allocated = {} @waiters = {} + @min_wait_timeout = opts[:min_wait_timeout] || 0.1 add_servers([:default]) add_servers(opts[:servers].keys) if opts[:servers] @@ -197,27 +200,16 @@ def acquire(thread, server) timeout = @timeout timer = Sequel.start_timer - sync do - @waiters[server].wait(@mutex, timeout) - if conn = next_available(server) - return(allocated(server)[thread] = conn) - end - end - until conn = assign_connection(thread, server) elapsed = Sequel.elapsed_seconds_since(timer) - # :nocov: raise_pool_timeout(elapsed, server) if elapsed > timeout - # It's difficult to get to this point, it can only happen if there is a race condition - # where a connection cannot be acquired even after the thread is signalled by the condition variable sync do - @waiters[server].wait(@mutex, timeout - elapsed) + @waiters[server].wait(@mutex, condition_variable_wait_timeout(timeout - elapsed)) if conn = next_available(server) return(allocated(server)[thread] = conn) end end - # :nocov: end conn @@ -276,6 +268,14 @@ def checkin_connection(server, conn) conn end + # The amount of time to wait on the condition variable. Uses the given + # timeout or the :min_wait_timeout option, whichever is less. This mitigates + # the damage if a connection is not available immediately after the condition + # variable returns from #wait. + def condition_variable_wait_timeout(timeout) + timeout < @min_wait_timeout ? timeout : @min_wait_timeout + end + # Clear the array of available connections for the server, returning an array # of previous available connections that should be disconnected (or nil if none should be). # Mark any allocated connections to be removed when they are checked back in. The calling diff --git a/spec/core/connection_pool_spec.rb b/spec/core/connection_pool_spec.rb index 0ace412d6..8003a2cf6 100644 --- a/spec/core/connection_pool_spec.rb +++ b/spec/core/connection_pool_spec.rb @@ -677,6 +677,15 @@ def @pool.acquire(_) raise Sequel::DatabaseDisconnectError; end include concurrent_connection_pool_specs include threaded_connection_pool_specs + + it "should handle a :min_wait_timeout longer than a connection checkout" do + pool = get_pool(:min_wait_timeout=>0.01, :max_connections=>1) + conns = [] + 4.times.map do + Thread.new{pool.hold{|c| conns << c; sleep 0.02}} + end.map(&:join) + conns.must_equal [1, 1, 1, 1] + end end {timed_queue_connection_pool=>"Timed Queue", sharded_timed_queue_connection_pool=>"Sharded Timed Queue"}.each do |pc, desc|