diff --git a/CHANGELOG b/CHANGELOG index f41cfbf36..5c31c2b5c 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,7 @@ +=== master + +* Add temporarily_release_connection Database extension for multithreaded transactional testing (jeremyevans) + === 5.80.0 (2024-05-01) * Support Dataset#skip_locked on MariaDB 10.6+ (simi) (#2150) diff --git a/lib/sequel/extensions/temporarily_release_connection.rb b/lib/sequel/extensions/temporarily_release_connection.rb new file mode 100644 index 000000000..715de2b62 --- /dev/null +++ b/lib/sequel/extensions/temporarily_release_connection.rb @@ -0,0 +1,178 @@ +# frozen-string-literal: true +# +# The temporarily_release_connection extension adds support for temporarily +# releasing a checked out connection back to the connection pool. It is +# designed for use in multithreaded transactional integration tests, allowing +# a connection to start a transaction in one thread, but be temporarily +# released back to the connection pool, so it can be operated on safely +# by multiple threads inside a block. For example, the main thread could be +# running tests that send web requests, and a separate thread running a web +# server that is responding to those requests, and the same connection and +# transaction would be used for both. +# +# To load the extension into the database: +# +# DB.extension :temporarily_release_connection +# +# After the extension is loaded, call the +temporarily_release_connection+ +# method with the connection object to temporarily release the connection +# back to the pool. Example: +# +# DB.transaction(rollback: :always, auto_savepoint: true) do |conn| +# DB.temporarily_release_connection(conn) do +# # Other threads can operate on connection safely inside the transaction +# yield +# end +# end +# +# For sharded connection pools, the second argument to +temporarily_release_connection+ +# is respected, and specifies the server on which to temporarily release the connection. +# +# The temporarily_release_connection extension is only supported with the +# threaded and timed_queue connection pools that ship with Sequel (and the sharded +# versions of each). To make sure that same connection object can be reacquired, it +# is only supported if the maximum connection pool size is 1, so set the Database +# :max_connections option to 1 if you plan to use this extension. +# +# If the +temporarily_release_connection+ method cannot reacquire the same connection +# it released to the pool, it will raise a Sequel::UnableToReacquireConnectionError +# exception. This should only happen if the connection has been disconnected +# while it was temporarily released. If this error is raised, Database#transaction +# will not rollback the transaction, since the connection object is likely no longer +# valid, and on poorly written database drivers, that could cause the process to crash. +# +# Related modules: Sequel::TemporarilyReleaseConnection, +# Sequel::UnableToReacquireConnectionError + +# +module Sequel + # Error class raised if the connection pool does not provide the same connection + # object when checking a temporarily released connection out. + class UnableToReacquireConnectionError < Error + end + + module TemporarilyReleaseConnection + module DatabaseMethods + # Temporarily release the connection back to the connection pool for the + # duration of the block. + def temporarily_release_connection(conn, server=:default, &block) + pool.temporarily_release_connection(conn, server, &block) + end + + private + + # Do nothing if UnableToReacquireConnectionError is raised, as it is + # likely the connection is not in a usable state. + def rollback_transaction(conn, opts) + return if UnableToReacquireConnectionError === $! + super + end + end + + module PoolMethods + # Temporarily release a currently checked out connection, then yield to the block. Reacquire the same + # connection upon the exit of the block. + def temporarily_release_connection(conn, server) + t = Sequel.current + raise Error, "connection not currently checked out" unless conn.equal?(trc_owned_connection(t, server)) + + begin + trc_release(t, conn, server) + yield + ensure + c = trc_acquire(t, server) + unless conn.equal?(c) + raise UnableToReacquireConnectionError, "reacquired connection not the same as initial connection" + end + end + end + end + + module TimedQueue + private + + def trc_owned_connection(t, server) + owned_connection(t) + end + + def trc_release(t, conn, server) + release(t) + end + + def trc_acquire(t, server) + acquire(t) + end + end + + module ShardedTimedQueue + # Normalize the server name for sharded connection pools + def temporarily_release_connection(conn, server) + server = pick_server(server) + super + end + + private + + def trc_owned_connection(t, server) + owned_connection(t, server) + end + + def trc_release(t, conn, server) + release(t, conn, server) + end + + def trc_acquire(t, server) + acquire(t, server) + end + end + + module ThreadedBase + private + + def trc_release(t, conn, server) + sync{super} + end + end + + module Threaded + include TimedQueue + include ThreadedBase + end + + module ShardedThreaded + include ShardedTimedQueue + include ThreadedBase + end + end + + trc = TemporarilyReleaseConnection + trc_map = { + :threaded => trc::Threaded, + :sharded_threaded => trc::ShardedThreaded, + :timed_queue => trc::TimedQueue, + :sharded_timed_queue => trc::ShardedTimedQueue, + }.freeze + + Database.register_extension(:temporarily_release_connection) do |db| + unless pool_mod = trc_map[db.pool.pool_type] + raise(Error, "temporarily_release_connection extension not supported for connection pool type #{db.pool.pool_type}") + end + + case db.pool.pool_type + when :threaded, :sharded_threaded + if db.opts[:connection_handling] == :disconnect + raise Error, "temporarily_release_connection extension not supported with connection_handling: :disconnect option" + end + end + + unless db.pool.max_size == 1 + raise Error, "temporarily_release_connection extension not supported unless :max_connections option is 1" + end + + db.extend(trc::DatabaseMethods) + db.pool.extend(trc::PoolMethods) + db.pool.extend(pool_mod) + end + + private_constant :TemporarilyReleaseConnection +end diff --git a/spec/adapters/sqlite_spec.rb b/spec/adapters/sqlite_spec.rb index becf70764..808d4e22c 100644 --- a/spec/adapters/sqlite_spec.rb +++ b/spec/adapters/sqlite_spec.rb @@ -1058,3 +1058,29 @@ def setup_db(opts) db[:names].where(name: /^J/).select_order_map(:name).must_equal %w[Jane John] end if RUBY_VERSION >= '3.3' end if DB.adapter_scheme == :sqlite + +# Force a separate Database object for these tests, so temporarily_release_connection +# extension is always tested if testing the sqlite adapter. +describe 'temporarily_release_connection plugin' do + it "should temporarily release a connection" do + db = Sequel.sqlite + db.extension :temporarily_release_connection + + db.create_table(:i){Integer :i} + + db.transaction(:rollback=>:always) do |c| + db.temporarily_release_connection(c) do + 4.times.map do |i| + Thread.new do + db.synchronize do |conn| + _(conn).must_be_same_as c + end + db[:i].insert(i) + end + end.map(&:join) + end + db[:i].count.must_equal 4 + end + db[:i].count.must_equal 0 + end +end if DB.adapter_scheme == :sqlite diff --git a/spec/extensions/temporarily_release_connection_spec.rb b/spec/extensions/temporarily_release_connection_spec.rb new file mode 100644 index 000000000..9e8916325 --- /dev/null +++ b/spec/extensions/temporarily_release_connection_spec.rb @@ -0,0 +1,110 @@ +require_relative "spec_helper" + +pool_types = [ :threaded, :sharded_threaded] +pool_types += [ :timed_queue, :sharded_timed_queue] if RUBY_VERSION >= '3.2' + +pool_types.each do |pool_type| + describe "temporarily_release_connection extension with pool class #{pool_type}" do + before do + opts = {:max_connections=>1, :pool_class=>pool_type} + if pool_type.to_s.start_with?('sharded') + opts[:servers] = {:foo=>{}, :bar=>{}} + end + @db = Sequel.mock(opts).extension(:temporarily_release_connection) + end + + it "should temporarily release connection during block so it can be acquired by other threads" do + conns = [] + @db.transaction(:rollback=>:always) do |c| + @db.temporarily_release_connection(c) do + 4.times.map do |i| + Thread.new do + @db.synchronize do |conn| + conns << conn + end + end + end.map(&:join) + end + end + + c = @db.synchronize{|conn| conn} + conns.size.must_equal 4 + conns.each do |conn| + conn.must_be_same_as c + end + + @db.sqls.must_equal ['BEGIN', 'ROLLBACK'] + end + + it "should temporarily release connection for specific shard during block so it can be acquired by other threads" do + conns = [] + @db.transaction(:rollback=>:always, :server=>:foo) do |c| + @db.temporarily_release_connection(c, :foo) do + @db.transaction(:rollback=>:always, :server=>:bar) do |c2| + @db.temporarily_release_connection(c2, :bar) do + 4.times.map do |i| + Thread.new do + @db.synchronize(:foo) do |conn| + @db.synchronize(:bar) do |conn2| + conns << [conn, conn2] + end + end + end + end.map(&:join) + end + end + end + end + + c = @db.synchronize(:foo){|conn| conn} + c2 = @db.synchronize(:bar){|conn| conn} + conns.size.must_equal 4 + conns.each do |conn, conn2| + conn.must_be_same_as c + conn2.must_be_same_as c2 + end + + @db.sqls.must_equal ["BEGIN -- foo", "BEGIN -- bar", "ROLLBACK -- bar", "ROLLBACK -- foo"] + end if pool_type.to_s.start_with?('sharded') + + it "should raise UnableToReacquireConnectionError if unable to reacquire the same connection it released" do + proc do + @db.transaction(rollback: :always) do |conn| + @db.temporarily_release_connection(conn) do + @db.disconnect + end + end + end.must_raise Sequel::UnableToReacquireConnectionError + @db.sqls.must_equal ['BEGIN'] + end + + it "should raise if provided a connection that is not checked out" do + proc do + @db.temporarily_release_connection(@db.synchronize{|conn| conn}) + end.must_raise Sequel::Error + end + + it "should raise if pool max_size is not 1" do + db = Sequel.mock(:pool_type=>pool_type) + proc do + db.extension(:temporarily_release_connection) + end.must_raise Sequel::Error + end + end +end + +describe "temporarily_release_connection extension" do + it "should raise if pool uses connection_handling: :disconnect option" do + db = Sequel.mock(:connection_handling=>:disconnect) + proc do + db.extension(:temporarily_release_connection) + end.must_raise Sequel::Error + end + + it "should raise if pool uses unsupported pool type" do + db = Sequel.mock(:pool_class=>:single) + proc do + db.extension(:temporarily_release_connection) + end.must_raise Sequel::Error + end +end diff --git a/www/pages/plugins.html.erb b/www/pages/plugins.html.erb index 00238b592..8788507ca 100644 --- a/www/pages/plugins.html.erb +++ b/www/pages/plugins.html.erb @@ -771,6 +771,10 @@ Normalizes SQL before logging, helpful for analytics and sensitive data.