Skip to content

Commit

Permalink
Add sharded_timed_queue connection pool
Browse files Browse the repository at this point in the history
This adds a sharded version of the timed_queue connection pool, and makes the
connection_expiration, connection_validator, and server_block extension
work with it.

While working on this, I found a bug in the sharded_threaded connection
pool's remove_server method, so this fixes that.

To get better testing of this connection pool, this adds support for the
SEQUEL_DEFAULT_CONNECTION_POOL environment variable, which sets the
default :pool_class option if one is not specified.  Various changes
are made to the specs such that you can set this environment variable
when running the specs and have the specs still pass.

I plan to make the timed_queue and sharded_timed_queue connection
pools the default connection pools on Ruby 3.2 in the future.  It's
possible that will be done in Sequel 6, but it could be earlier
depending on feedback.
  • Loading branch information
jeremyevans committed Jun 12, 2023
1 parent 8b87432 commit b85d631
Show file tree
Hide file tree
Showing 13 changed files with 816 additions and 300 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
=== master

* Fix ShardedThreadedConnectionPool#remove_server to disconnect all connections if removing multiple servers (jeremyevans)

* Support SEQUEL_DEFAULT_CONNECTION_POOL environment variable for choosing connection pool when :pool_class Database option is not set (jeremyevans)

* Add sharded_timed_queue connection pool (jeremyevans)

* Make connection_{validator,expiration} and async_thread_pool extensions work with timed_queue connection pool (jeremyevans)

* Make connection_{validator,expiration} extensions raise error when used with single threaded pools (HoneyryderChuck, jeremyevans) (#2049)
Expand Down
8 changes: 7 additions & 1 deletion lib/sequel/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Sequel::ConnectionPool
:sharded_threaded => :ShardedThreadedConnectionPool,
:sharded_single => :ShardedSingleConnectionPool,
:timed_queue => :TimedQueueConnectionPool,
:sharded_timed_queue => :ShardedTimedQueueConnectionPool,
}
POOL_CLASS_MAP.to_a.each{|k, v| POOL_CLASS_MAP[k.to_s] = v}
POOL_CLASS_MAP.freeze
Expand All @@ -42,7 +43,8 @@ module ClassMethods
# Return a pool subclass instance based on the given options. If a <tt>:pool_class</tt>
# option is provided is provided, use that pool class, otherwise
# use a new instance of an appropriate pool subclass based on the
# <tt>:single_threaded</tt> and <tt>:servers</tt> options.
# +SEQUEL_DEFAULT_CONNECTION_POOL+ environment variable if set, or
# the <tt>:single_threaded</tt> and <tt>:servers</tt> options, otherwise.
def get_pool(db, opts = OPTS)
connection_pool_class(opts).new(db, opts)
end
Expand All @@ -62,9 +64,13 @@ def connection_pool_class(opts)
end

pc
elsif pc = ENV['SEQUEL_DEFAULT_CONNECTION_POOL']
connection_pool_class(:pool_class=>ENV['SEQUEL_DEFAULT_CONNECTION_POOL'])
else
pc = if opts[:single_threaded]
opts[:servers] ? :sharded_single : :single
#elsif RUBY_VERSION >= '3.2' # SEQUEL6 or maybe earlier
# opts[:servers] ? :sharded_timed_queue : :timed_queue
else
opts[:servers] ? :sharded_threaded : :threaded
end
Expand Down
21 changes: 11 additions & 10 deletions lib/sequel/connection_pool/sharded_threaded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

require_relative 'threaded'

# The slowest and most advanced connection, dealing with both multi-threaded
# The slowest and most advanced connection pool, dealing with both multi-threaded
# access and configurations with multiple shards/servers.
#
# In addition, this pool subclass also handles scheduling in-use connections
Expand Down Expand Up @@ -112,7 +112,7 @@ def freeze
# available, creates a new connection. Passes the connection to the supplied
# block:
#
# pool.hold {|conn| conn.execute('DROP TABLE posts')}
# pool.hold(:server1) {|conn| conn.execute('DROP TABLE posts')}
#
# Pool#hold is re-entrant, meaning it can be called recursively in
# the same thread without blocking.
Expand Down Expand Up @@ -145,12 +145,13 @@ def hold(server=:default)
# except that after it is used, future requests for the server will use the
# :default server instead.
def remove_servers(servers)
conns = nil
conns = []
raise(Sequel::Error, "cannot remove default server") if servers.include?(:default)

sync do
raise(Sequel::Error, "cannot remove default server") if servers.include?(:default)
servers.each do |server|
if @servers.include?(server)
conns = disconnect_server_connections(server)
conns.concat(disconnect_server_connections(server))
@waiters.delete(server)
@available_connections.delete(server)
@allocated.delete(server)
Expand All @@ -159,9 +160,9 @@ def remove_servers(servers)
end
end

if conns
disconnect_connections(conns)
end
nil
ensure
disconnect_connections(conns)
end

# Return an array of symbols for servers in the connection pool.
Expand All @@ -186,7 +187,7 @@ def _size(server)
# is available. The calling code should NOT already have the mutex when
# calling this.
#
# This should return a connection is one is available within the timeout,
# This should return a connection if one is available within the timeout,
# or nil if a connection could not be acquired within the timeout.
def acquire(thread, server)
if conn = assign_connection(thread, server)
Expand Down Expand Up @@ -325,7 +326,7 @@ def pick_server(server)
# Create the maximum number of connections immediately. The calling code should
# NOT have the mutex before calling this.
def preconnect(concurrent = false)
conn_servers = @servers.keys.map!{|s| Array.new(max_size - _size(s), s)}.flatten!
conn_servers = sync{@servers.keys}.map!{|s| Array.new(max_size - _size(s), s)}.flatten!

if concurrent
conn_servers.map!{|s| Thread.new{[s, make_new(s)]}}.map!(&:value)
Expand Down
Loading

0 comments on commit b85d631

Please sign in to comment.