Skip to content

Commit

Permalink
Revert "Add :min_wait_timeout option to sharded_threaded connection p…
Browse files Browse the repository at this point in the history
…ool and default to 0.1, fixes stalling observed on JRuby"

This reverts commit 72f08cd.

This implements a better fix, by rechecking for an available
connection inside the sync block before waiting on the condition
variable.  Because the recheck is inside the sync block, it should
avoid the race condition (barring a bug in the condition variable
signal/wait code).

Add an acquire_available private method to both connection pools
to handle this logic.
  • Loading branch information
jeremyevans committed Jun 17, 2024
1 parent 7b748b5 commit a687bdd
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 37 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
=== master

* Emulate dropping a unique column or a column that is part of an index on SQLite 3.35.0+ (jeremyevans) (#2176)
* Fix race condition in threaded/sharded_threaded connection pools that could cause stalls (jeremyevans)

* Add :min_wait_timeout option to sharded_threaded connection pool and default to 0.1, fixes stalling observed on JRuby (jeremyevans)
* Emulate dropping a unique column or a column that is part of an index on SQLite 3.35.0+ (jeremyevans) (#2176)

* Support MERGE RETURNING on PostgreSQL 17+ (jeremyevans)

Expand Down
44 changes: 28 additions & 16 deletions lib/sequel/connection_pool/sharded_threaded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ class Sequel::ShardedThreadedConnectionPool < Sequel::ThreadedConnectionPool
# Sequel uses Hash.new(:default). You can use a hash with a default proc
# that raises an error if you want to catch all cases where a nonexistent
# server is used.
# :min_wait_timeout :: This sets a minimum timeout when waiting on a condition variable
# (default: 0.1).
def initialize(db, opts = OPTS)
super
@available_connections = {}
Expand All @@ -27,7 +25,6 @@ def initialize(db, opts = OPTS)
remove_instance_variable(:@allocated)
@allocated = {}
@waiters = {}
@min_wait_timeout = opts[:min_wait_timeout] || 0.1

add_servers([:default])
add_servers(opts[:servers].keys) if opts[:servers]
Expand Down Expand Up @@ -200,21 +197,44 @@ def acquire(thread, server)
timeout = @timeout
timer = Sequel.start_timer

if conn = acquire_available(thread, server, timeout)
return conn
end

until conn = assign_connection(thread, server)
elapsed = Sequel.elapsed_seconds_since(timer)
# :nocov:
raise_pool_timeout(elapsed, server) if elapsed > timeout

sync do
@waiters[server].wait(@mutex, condition_variable_wait_timeout(timeout - elapsed))
if conn = next_available(server)
return(allocated(server)[thread] = conn)
end
# It's difficult to get to this point, it can only happen if there is a race condition
# where a connection cannot be acquired even after the thread is signalled by the condition variable
if conn = acquire_available(thread, server, timeout - elapsed)
return conn
end
# :nocov:
end

conn
end

# Acquire a connection if one is already available, or waiting until it becomes available.
def acquire_available(thread, server, timeout)
sync do
# Check if connection was checked in between when assign_connection failed and now.
if conn = next_available(server)
return(allocated(server)[thread] = conn)
end

@waiters[server].wait(@mutex, timeout)

# Connection still not available, could be because a connection was disconnected,
# may have to retry assign_connection to see if a new connection can be made.
if conn = next_available(server)
return(allocated(server)[thread] = conn)
end
end
end

# Assign a connection to the thread, or return nil if one cannot be assigned.
# The caller should NOT have the mutex before calling this.
def assign_connection(thread, server)
Expand Down Expand Up @@ -268,14 +288,6 @@ def checkin_connection(server, conn)
conn
end

# The amount of time to wait on the condition variable. Uses the given
# timeout or the :min_wait_timeout option, whichever is less. This mitigates
# the damage if a connection is not available immediately after the condition
# variable returns from #wait.
def condition_variable_wait_timeout(timeout)
timeout < @min_wait_timeout ? timeout : @min_wait_timeout
end

# Clear the array of available connections for the server, returning an array
# of previous available connections that should be disconnected (or nil if none should be).
# Mark any allocated connections to be removed when they are checked back in. The calling
Expand Down
32 changes: 22 additions & 10 deletions lib/sequel/connection_pool/threaded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,8 @@ def acquire(thread)
timeout = @timeout
timer = Sequel.start_timer

sync do
@waiter.wait(@mutex, timeout)
if conn = next_available
return(@allocated[thread] = conn)
end
if conn = acquire_available(thread, timeout)
return conn
end

until conn = assign_connection(thread)
Expand All @@ -157,18 +154,33 @@ def acquire(thread)

# It's difficult to get to this point, it can only happen if there is a race condition
# where a connection cannot be acquired even after the thread is signalled by the condition variable
sync do
@waiter.wait(@mutex, timeout - elapsed)
if conn = next_available
return(@allocated[thread] = conn)
end
if conn = acquire_available(thread, timeout - elapsed)
return conn
end
# :nocov:
end

conn
end

# Acquire a connection if one is already available, or waiting until it becomes available.
def acquire_available(thread, timeout)
sync do
# Check if connection was checked in between when assign_connection failed and now.
if conn = next_available
return(@allocated[thread] = conn)
end

@waiter.wait(@mutex, timeout)

# Connection still not available, could be because a connection was disconnected,
# may have to retry assign_connection to see if a new connection can be made.
if conn = next_available
return(@allocated[thread] = conn)
end
end
end

# Assign a connection to the thread, or return nil if one cannot be assigned.
# The caller should NOT have the mutex before calling this.
def assign_connection(thread)
Expand Down
9 changes: 0 additions & 9 deletions spec/core/connection_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -677,15 +677,6 @@ def @pool.acquire(_) raise Sequel::DatabaseDisconnectError; end

include concurrent_connection_pool_specs
include threaded_connection_pool_specs

it "should handle a :min_wait_timeout longer than a connection checkout" do
pool = get_pool(:min_wait_timeout=>0.01, :max_connections=>1)
conns = []
4.times.map do
Thread.new{pool.hold{|c| conns << c; sleep 0.02}}
end.map(&:join)
conns.must_equal [1, 1, 1, 1]
end
end

{timed_queue_connection_pool=>"Timed Queue", sharded_timed_queue_connection_pool=>"Sharded Timed Queue"}.each do |pc, desc|
Expand Down

0 comments on commit a687bdd

Please sign in to comment.