Skip to content

Commit

Permalink
Add :min_wait_timeout option to sharded_threaded connection pool and …
Browse files Browse the repository at this point in the history
…default to 0.1, fixes stalling observed on JRuby

The sharded_threaded connection pool is implemented via per-shard
mutexes, condition variables, and arrays to store the connections.

If there is not an available connection and the pool is at
maximum, Sequel will wait until a connection is available. However,
if between when checking for the available connection and when
waiting, a connection is checked in, it's possible for the wait
to timeout even though a connection is then available.

Mitigate the loss of such a race by setting a minimum wait time
for the condition variable, by default, 0.1.  There is no
problem if there is still no connection available, since Sequel
was already setup to loop in that case.

This commit basically gives up some potential throughput for
better behavior in the worst case.

Simplify acquire while here.

The true solution to this problem is to use a queue that
supports a timeout when popping, which Sequel already implements
in the sharded_timed_queue connection pool, but that requires at
least Ruby 3.2 (Sequel will use it by default on Ruby 3.4+).
  • Loading branch information
jeremyevans committed Jun 13, 2024
1 parent 73fed97 commit 72f08cd
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
=== master

* Add :min_wait_timeout option to sharded_threaded connection pool and default to 0.1, fixes stalling observed on JRuby (jeremyevans)

* Support MERGE RETURNING on PostgreSQL 17+ (jeremyevans)

* Remove use of logger library in bin/sequel (jeremyevans)
Expand Down
24 changes: 12 additions & 12 deletions lib/sequel/connection_pool/sharded_threaded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class Sequel::ShardedThreadedConnectionPool < Sequel::ThreadedConnectionPool
# Sequel uses Hash.new(:default). You can use a hash with a default proc
# that raises an error if you want to catch all cases where a nonexistent
# server is used.
# :min_wait_timeout :: This sets a minimum timeout when waiting on a condition variable
# (default: 0.1).
def initialize(db, opts = OPTS)
super
@available_connections = {}
Expand All @@ -25,6 +27,7 @@ def initialize(db, opts = OPTS)
remove_instance_variable(:@allocated)
@allocated = {}
@waiters = {}
@min_wait_timeout = opts[:min_wait_timeout] || 0.1

add_servers([:default])
add_servers(opts[:servers].keys) if opts[:servers]
Expand Down Expand Up @@ -197,27 +200,16 @@ def acquire(thread, server)
timeout = @timeout
timer = Sequel.start_timer

sync do
@waiters[server].wait(@mutex, timeout)
if conn = next_available(server)
return(allocated(server)[thread] = conn)
end
end

until conn = assign_connection(thread, server)
elapsed = Sequel.elapsed_seconds_since(timer)
# :nocov:
raise_pool_timeout(elapsed, server) if elapsed > timeout

# It's difficult to get to this point, it can only happen if there is a race condition
# where a connection cannot be acquired even after the thread is signalled by the condition variable
sync do
@waiters[server].wait(@mutex, timeout - elapsed)
@waiters[server].wait(@mutex, condition_variable_wait_timeout(timeout - elapsed))
if conn = next_available(server)
return(allocated(server)[thread] = conn)
end
end
# :nocov:
end

conn
Expand Down Expand Up @@ -276,6 +268,14 @@ def checkin_connection(server, conn)
conn
end

# The amount of time to wait on the condition variable. Uses the given
# timeout or the :min_wait_timeout option, whichever is less. This mitigates
# the damage if a connection is not available immediately after the condition
# variable returns from #wait.
def condition_variable_wait_timeout(timeout)
timeout < @min_wait_timeout ? timeout : @min_wait_timeout
end

# Clear the array of available connections for the server, returning an array
# of previous available connections that should be disconnected (or nil if none should be).
# Mark any allocated connections to be removed when they are checked back in. The calling
Expand Down
9 changes: 9 additions & 0 deletions spec/core/connection_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,15 @@ def @pool.acquire(_) raise Sequel::DatabaseDisconnectError; end

include concurrent_connection_pool_specs
include threaded_connection_pool_specs

it "should handle a :min_wait_timeout longer than a connection checkout" do
pool = get_pool(:min_wait_timeout=>0.01, :max_connections=>1)
conns = []
4.times.map do
Thread.new{pool.hold{|c| conns << c; sleep 0.02}}
end.map(&:join)
conns.must_equal [1, 1, 1, 1]
end
end

{timed_queue_connection_pool=>"Timed Queue", sharded_timed_queue_connection_pool=>"Sharded Timed Queue"}.each do |pc, desc|
Expand Down

0 comments on commit 72f08cd

Please sign in to comment.