Skip to content

Commit

Permalink
Replace paged_operations_size method with :rows_per_page option
Browse files Browse the repository at this point in the history
I did not use this approach when developing the plugin because I
was using a splat for paged_update.  However, as Dataset#update
only takes a hash and not a splat, paged_update should do the same,
making it possible to support an options hash.  Currently, only the
:rows_per_page option is supported.

Minor documentation updates while here.
  • Loading branch information
jeremyevans committed Sep 23, 2023
1 parent 2669aaa commit 2906807
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 68 deletions.
84 changes: 44 additions & 40 deletions lib/sequel/plugins/paged_operations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Plugins
# +paged_delete+ dataset methods. These behave similarly to
# the default +update+ and +delete+ dataset methods, except
# that the update or deletion is done in potentially multiple
# queries (by default, affected 1000 rows per query).
# queries (by default, affecting 1000 rows per query).
# For a large table, this prevents the change from
# locking the table for a long period of time.
#
Expand All @@ -19,7 +19,7 @@ module Plugins
#
# Examples:
#
# Album.where{name <= 'M'}.paged_update(:updated_at=>Sequel::CURRENT_TIMESTAMP)
# Album.where{name <= 'M'}.paged_update(updated_at: Sequel::CURRENT_TIMESTAMP)
# # SELECT id FROM albums WHERE (name <= 'M') ORDER BY id LIMIT 1 OFFSET 1001
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND ("id" < 1002))
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1001
Expand All @@ -37,20 +37,6 @@ module Plugins
# # SELECT id FROM albums WHERE (name > 'M') ORDER BY id LIMIT 1 OFFSET 10001
# # DELETE FROM albums WHERE (name > 'M')
#
# To set the number of rows to be updated or deleted per query
# by +paged_update+ or +paged_delete+, you can use the
# +paged_operations_size+ dataset method:
#
# Album.where{name <= 'M'}.paged_operations_size(3).
# paged_update(:updated_at=>Sequel::CURRENT_TIMESTAMP)
# # SELECT id FROM albums WHERE (name <= 'M') ORDER BY id LIMIT 1 OFFSET 4
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND ("id" < 5))
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 5)) ORDER BY id LIMIT 1 OFFSET 4
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND ("id" < 9) AND (id >= 5))
# # ...
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 12345)) ORDER BY id LIMIT 1 OFFSET 4
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND (id >= 12345))
#
# The plugin also adds a +paged_datasets+ method that will yield
# separate datasets limited in size that in total handle all
# rows in the receiver:
Expand All @@ -64,6 +50,17 @@ module Plugins
# # Runs: SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 10002)) ORDER BY id LIMIT 1 OFFSET 1001
# # Prints: SELECT * FROM albums WHERE ((name <= 'M') AND (id >= 10002))
#
# To set the number of rows per page, pass a :rows_per_page option:
#
# Album.where{name <= 'M'}.paged_update({x: Sequel[:x] + 1}, rows_per_page: 4)
# # SELECT id FROM albums WHERE (name <= 'M') ORDER BY id LIMIT 1 OFFSET 4
# # UPDATE albums SET x = x + 1 WHERE ((name <= 'M') AND ("id" < 5))
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 5)) ORDER BY id LIMIT 1 OFFSET 4
# # UPDATE albums SET x = x + 1 WHERE ((name <= 'M') AND ("id" < 9) AND (id >= 5))
# # ...
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 12345)) ORDER BY id LIMIT 1 OFFSET 4
# # UPDATE albums SET x = x + 1 WHERE ((name <= 'M') AND (id >= 12345))
#
# You should avoid using +paged_update+ or +paged_datasets+
# with updates that modify the primary key, as such usage is
# not supported by this plugin.
Expand All @@ -72,28 +69,32 @@ module Plugins
#
# Usage:
#
# # Make all model subclasses support paged update/delete
# # Make all model subclasses support paged update/delete/datasets
# # (called before loading subclasses)
# Sequel::Model.plugin :paged_operations
#
# # Make the Album class support paged update/delete
# # Make the Album class support paged update/delete/datasets
# Album.plugin :paged_operations
module PagedOperations
module ClassMethods
Plugins.def_dataset_methods(self, [:paged_datasets, :paged_delete, :paged_update, :paged_operations_size])
Plugins.def_dataset_methods(self, [:paged_datasets, :paged_delete, :paged_update])
end

module DatasetMethods
# Yield datasets for subsets of the receiver that are limited
# to no more than 1000 rows (you can configure the number of
# rows using paged_operations_size).
def paged_datasets
#
# Options:
# :rows_per_page :: The maximum number of rows in each yielded dataset
# (unless concurrent modifications are made to the table).
def paged_datasets(opts=OPTS)
unless defined?(yield)
return enum_for(:paged_datasets)
return enum_for(:paged_datasets, opts)
end

pk = _paged_operations_pk(:paged_update)
base_offset_ds = offset_ds = _paged_operations_offset_ds
base_offset_ds = offset_ds = _paged_operations_offset_ds(opts)
first = nil

while last = offset_ds.get(pk)
Expand All @@ -113,10 +114,14 @@ def paged_datasets
# Delete all rows of the dataset using using multiple queries so that
# no more than 1000 rows are deleted at a time (you can configure the
# number of rows using paged_operations_size).
def paged_delete
#
# Options:
# :rows_per_page :: The maximum number of rows affected by each DELETE query
# (unless concurrent modifications are made to the table).
def paged_delete(opts=OPTS)
pk = _paged_operations_pk(:paged_delete)
rows_deleted = 0
offset_ds = _paged_operations_offset_ds
offset_ds = _paged_operations_offset_ds(opts)
while last = offset_ds.get(pk)
rows_deleted += where(pk < last).delete
end
Expand All @@ -127,24 +132,21 @@ def paged_delete
# no more than 1000 rows are updated at a time (you can configure the
# number of rows using paged_operations_size). All arguments are
# passed to Dataset#update.
def paged_update(*args)
#
# Options:
# :rows_per_page :: The maximum number of rows affected by each UPDATE query
# (unless concurrent modifications are made to the table).
def paged_update(values, opts=OPTS)
rows_updated = 0
paged_datasets do |ds|
rows_updated += ds.update(*args)
paged_datasets(opts) do |ds|
rows_updated += ds.update(values)
end
rows_updated
end

# Set the number of rows to update or delete per query when using
# paged_update or paged_delete.
def paged_operations_size(rows)
raise Error, "paged_operations_size rows must be greater than 0" unless rows >= 1
clone(:paged_operations_rows=>rows)
end

private

# Run some basic checks before running paged UPDATE or DELETE queries,
# Run some basic checks common to paged_{datasets,delete,update}
# and return the primary key to operate on as a Sequel::Identifier.
def _paged_operations_pk(meth)
raise Error, "cannot use #{meth} if dataset has a limit or offset" if @opts[:limit] || @opts[:offset]
Expand All @@ -159,11 +161,13 @@ def _paged_operations_pk(meth)
end
end

# The dataset that will be used by paged_update and paged_delete
# to get the upper limit for the next UPDATE or DELETE query.
def _paged_operations_offset_ds
offset = @opts[:paged_operations_rows] || 1000
_force_primary_key_order.offset(offset)
# The dataset that will be used by paged_{datasets,delete,update}
# to get the upper limit for the next query.
def _paged_operations_offset_ds(opts)
if rows_per_page = opts[:rows_per_page]
raise Error, ":rows_per_page option must be at least 1" unless rows_per_page >= 1
end
_force_primary_key_order.offset(rows_per_page || 1000)
end
end
end
Expand Down
33 changes: 11 additions & 22 deletions spec/extensions/paged_operations_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@
]
end

it "#paged_operations_size should set the page size for paged_update" do
it "#paged_delete should support :rows_per_page option" do
@db.numrows = [4, 4, 2]
@ds.paged_operations_size(4).paged_delete.must_equal 10
@ds.paged_delete(:rows_per_page=>4).must_equal 10
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4",
"DELETE FROM albums WHERE (id < 1002)",
Expand All @@ -142,9 +142,9 @@
]
end

it "#paged_operations_size should set the page size for paged_delete" do
it "#paged_update should support :rows_per_page option" do
@db.numrows = [4, 4, 2]
@ds.paged_operations_size(4).paged_update(:x=>1).must_equal 10
@ds.paged_update({:x=>1}, :rows_per_page=>4).must_equal 10
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4",
"UPDATE albums SET x = 1 WHERE (id < 1002)",
Expand All @@ -155,9 +155,9 @@
]
end

it "#paged_operations_size should set the page size for paged_datasets" do
it "#paged_datasets should support :rows_per_page option" do
@db.numrows = [4, 4, 2]
@ds.paged_operations_size(4).paged_datasets.map(&:sql).must_equal [
@ds.paged_datasets(:rows_per_page=>4).map(&:sql).must_equal [
"SELECT * FROM albums WHERE (id < 1002)",
"SELECT * FROM albums WHERE ((id < 2002) AND (id >= 1002))",
"SELECT * FROM albums WHERE (id >= 2002)"
Expand All @@ -168,9 +168,11 @@
"SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 4",
]
end
it "should raise error for invalid size passed to paged_operations_size" do
proc{@ds.paged_operations_size(0)}.must_raise Sequel::Error
proc{@ds.paged_operations_size(-1)}.must_raise Sequel::Error
it "should raise error for invalid :rows_per_page option" do
proc{@ds.paged_datasets(:rows_per_page=>0){}}.must_raise Sequel::Error
proc{@ds.paged_datasets(:rows_per_page=>-1){}}.must_raise Sequel::Error
proc{@ds.paged_delete(:rows_per_page=>0)}.must_raise Sequel::Error
proc{@ds.paged_update({:x=>1}, :rows_per_page=>0)}.must_raise Sequel::Error
end

it "should raise error for dataset with limit" do
Expand Down Expand Up @@ -235,17 +237,4 @@
"SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000",
]
end

it "should offer paged_operations_size class method" do
@db.numrows = [4, 4, 2]
@c.paged_operations_size(4).paged_delete.must_equal 10
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4",
"DELETE FROM albums WHERE (id < 1002)",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4",
"DELETE FROM albums WHERE (id < 2002)",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4",
"DELETE FROM albums"
]
end
end
12 changes: 6 additions & 6 deletions spec/integration/plugin_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3042,7 +3042,7 @@ def set(k, v, ttl) self[k] = v end
it "Model#paged_delete should work on unfiltered dataset" do
@sizes.each do |rows|
@db.transaction(:rollback=>:always) do
@model.paged_operations_size(rows).paged_delete.must_equal 100
@model.paged_delete(:rows_per_page=>rows).must_equal 100
@model.count.must_equal 0
end
end
Expand All @@ -3054,7 +3054,7 @@ def set(k, v, ttl) self[k] = v end
expected = 100.times.map{|i| [i+1, i+200]}
@sizes.each do |rows|
@db.transaction(:rollback=>:always) do
@model.paged_operations_size(rows).paged_update(:o=>Sequel[:o] + 200).must_equal 100
@model.paged_update({:o=>Sequel[:o] + 200}, :rows_per_page=>rows).must_equal 100
@model.select_order_map([:id, :o]).must_equal expected
end
end
Expand All @@ -3067,7 +3067,7 @@ def set(k, v, ttl) self[k] = v end
@sizes.zip(final_counts).each do |rows, expected_fc|
@db.transaction(:rollback=>:always) do
counts = []
@model.paged_operations_size(rows).paged_datasets{|ds| counts << ds.count}
@model.paged_datasets(:rows_per_page=>rows){|ds| counts << ds.count}
counts.pop.must_equal expected_fc
counts.each{|c| c.must_equal rows}
end
Expand All @@ -3081,7 +3081,7 @@ def set(k, v, ttl) self[k] = v end
ds = @model.where{id < 50}
@sizes.each do |rows|
@db.transaction(:rollback=>:always) do
ds.paged_operations_size(rows).paged_delete.must_equal 49
ds.paged_delete(:rows_per_page=>rows).must_equal 49
ds.count.must_equal 0
@model.count.must_equal 51
end
Expand All @@ -3098,7 +3098,7 @@ def set(k, v, ttl) self[k] = v end
other_expected = 51.times.map{|i| [i+50, i+49]}
@sizes.each do |rows|
@db.transaction(:rollback=>:always) do
ds.paged_operations_size(rows).paged_update(:o=>Sequel[:o] + 200).must_equal 49
ds.paged_update({:o=>Sequel[:o] + 200}, :rows_per_page=>rows).must_equal 49
ds.select_order_map([:id, :o]).must_equal ds_expected
other.select_order_map([:id, :o]).must_equal other_expected
end
Expand All @@ -3114,7 +3114,7 @@ def set(k, v, ttl) self[k] = v end
@sizes.zip(final_counts).each do |rows, expected_fc|
@db.transaction(:rollback=>:always) do
counts = []
ds.paged_operations_size(rows).paged_datasets{|ds| counts << ds.count}
ds.paged_datasets(:rows_per_page=>rows){|ds| counts << ds.count}
counts.pop.must_equal expected_fc
counts.each{|c| c.must_equal rows}
end
Expand Down

0 comments on commit 2906807

Please sign in to comment.