Skip to content

Commit

Permalink
Add paged_datasets to paged_update_delete plugin
Browse files Browse the repository at this point in the history
This extracts the dataset part of paged_update and adds it as a
separate method, as it may be useful for things other than
updates.

While here, do not increment the offset, so that with a page size
of 1, it yields datasets that each contain 1 row, instead of 2 rows.
  • Loading branch information
jeremyevans committed Sep 21, 2023
1 parent b4ce555 commit 802b5de
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 41 deletions.
66 changes: 48 additions & 18 deletions lib/sequel/plugins/paged_update_delete.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ module Plugins
# +paged_delete+ dataset methods. These behave similarly to
# the default +update+ and +delete+ dataset methods, except
# that the update or deletion is done in potentially multiple
# queries. For a large table, this prevents the change from
# queries (by default, affecting 1000 rows per query).
# For a large table, this prevents the change from
# locking the table for a long period of time.
#
# Because the point of this is to prevent locking tables for
Expand Down Expand Up @@ -50,9 +51,22 @@ module Plugins
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 12345)) ORDER BY id LIMIT 1 OFFSET 4
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND (id >= 12345))
#
# You should avoid using +paged_update+ with updates that
# modify the primary key, as such usage is not supported by
# this plugin.
# The plugin also adds a +paged_datasets+ method that will yield
# separate datasets limited in size that in total handle all
# rows in the receiver:
#
# Album.where{name <= 'M'}.paged_datasets{|ds| puts ds.sql}
# # Runs: SELECT id FROM albums WHERE (name <= 'M') ORDER BY id LIMIT 1 OFFSET 1000
# # Prints: SELECT * FROM albums WHERE ((name <= 'M') AND ("id" < 1002))
# # Runs: SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1000
# # Prints: SELECT * FROM albums WHERE ((name <= 'M') AND ("id" < 2002) AND (id >= 1002))
# # ...
# # Runs: SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 10002)) ORDER BY id LIMIT 1 OFFSET 1000
# # Prints: SELECT * FROM albums WHERE ((name <= 'M') AND (id >= 10002))
#
# You should avoid using +paged_update+ or +paged_datasets+
# with updates that modify the primary key, as such usage is
# not supported by this plugin.
#
# This plugin only supports models with scalar primary keys.
#
Expand All @@ -66,10 +80,36 @@ module Plugins
# Album.plugin :paged_update_delete
module PagedUpdateDelete
module ClassMethods
Plugins.def_dataset_methods(self, [:paged_delete, :paged_update, :paged_update_delete_size])
Plugins.def_dataset_methods(self, [:paged_datasets, :paged_delete, :paged_update, :paged_update_delete_size])
end

module DatasetMethods
# Split the receiver into consecutive primary-key ranges and yield one
# dataset per range, so that together the yielded datasets cover every
# row of the receiver. Each range is sized by the offset dataset (1000
# rows by default; configurable via paged_update_delete_size).
#
# Without a block, returns an Enumerator over the same datasets.
# With a block, returns nil.
def paged_datasets
  return enum_for(:paged_datasets) unless defined?(yield)

  # NOTE(review): the helper is handed :paged_update rather than
  # :paged_datasets; presumably the symbol is only used in error text.
  # Confirm this is intended.
  key = _paged_update_delete_pk(:paged_update)
  boundary_ds = _paged_update_delete_offset_ds
  lower = nil
  upper = boundary_ds.get(key)

  while upper
    # Upper bound is applied first, then the lower bound, so the
    # generated filter order stays (pk < upper) AND (pk >= lower).
    page = where(key < upper)
    page = page.where(key >= lower) if lower
    yield page

    # The next page starts where this one ended; look up its upper bound.
    lower = upper
    upper = boundary_ds.where(key >= lower).get(key)
  end

  # Final page: everything from the last boundary on, or the receiver
  # itself when no boundary was ever found.
  yield(lower ? where(key >= lower) : self)
  nil
end

# Delete all rows of the dataset using multiple queries so that
# no more than 1000 rows are deleted at a time (you can configure the
# number of rows using paged_update_delete_size).
Expand All @@ -88,21 +128,11 @@ def paged_delete
# number of rows using paged_update_delete_size). All arguments are
# passed to Dataset#update.
def paged_update(*args)
pk = _paged_update_delete_pk(:paged_update)
rows_updated = 0
base_offset_ds = offset_ds = _paged_update_delete_offset_ds
first = nil

while last = offset_ds.get(pk)
ds = where(pk < last)
ds = ds.where(pk >= first) if first
paged_datasets do |ds|
rows_updated += ds.update(*args)
first = last
offset_ds = base_offset_ds.where(pk >= first)
end
ds = self
ds = ds.where(pk >= first) if first
rows_updated + ds.update(*args)
rows_updated
end

# Set the number of rows to update or delete per query when using
Expand Down Expand Up @@ -133,7 +163,7 @@ def _paged_update_delete_pk(meth)
# to get the upper limit for the next UPDATE or DELETE query.
def _paged_update_delete_offset_ds
offset = @opts[:paged_updated_delete_rows] || 1000
_force_primary_key_order.offset(offset+1)
_force_primary_key_order.offset(offset)
end
end
end
Expand Down
126 changes: 103 additions & 23 deletions spec/extensions/paged_update_delete_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,63 @@
it "#paged_delete should delete using multiple queries" do
@ds.paged_delete.must_equal 2002
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums WHERE (id < 1002)",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums WHERE (id < 2002)",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums"
]
end

it "#paged_update should update using multiple queries" do
@ds.paged_update(:x=>1).must_equal 2002
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1 WHERE (id < 1002)",
"SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1 WHERE ((id < 2002) AND (id >= 1002))",
"SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1 WHERE (id >= 2002)"
]
end

# Block form: each yielded dataset is a primary-key range of the receiver,
# and one boundary lookup query is issued per page.
it "#paged_datasets should yield multiple datasets making up dataset" do
  expected_pages = [
    "SELECT * FROM albums WHERE (id < 1002)",
    "SELECT * FROM albums WHERE ((id < 2002) AND (id >= 1002))",
    "SELECT * FROM albums WHERE (id >= 2002)"
  ]
  expected_queries = [
    "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
    "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000",
    "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000",
  ]

  page_sqls = []
  @ds.paged_datasets do |page|
    page_sqls << page.sql
  end

  page_sqls.must_equal expected_pages
  @db.sqls.must_equal expected_queries
end

# Blockless form: an Enumerator is returned that produces the same pages.
it "#paged_datasets should support returning enum" do
  expected_pages = [
    "SELECT * FROM albums WHERE (id < 1002)",
    "SELECT * FROM albums WHERE ((id < 2002) AND (id >= 1002))",
    "SELECT * FROM albums WHERE (id >= 2002)"
  ]
  expected_queries = [
    "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
    "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000",
    "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000",
  ]

  pages = @ds.paged_datasets
  pages.must_be_kind_of Enumerator
  pages.map { |page| page.sql }.must_equal expected_pages
  @db.sqls.must_equal expected_queries
end

it "#paged_delete should handle case where number of rows is less than page size" do
@db.fetch = []
@db.numrows = [2]
@ds.paged_delete.must_equal 2
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums"
]
end
Expand All @@ -50,38 +80,58 @@
@db.numrows = [2]
@ds.paged_update(:x=>1).must_equal 2
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1"
]
end

# When the boundary lookup finds no row, the only page is the receiver itself.
it "#paged_datasets should handle case where number of rows is less than page size" do
  @db.fetch = []

  page_sqls = @ds.paged_datasets.map { |page| page.sql }
  page_sqls.must_equal ['SELECT * FROM albums']

  executed = @db.sqls
  executed.must_equal ["SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000"]
end

it "#paged_delete should respect existing filters" do
@ds.where{x > 3}.paged_delete.must_equal 2002
@db.sqls.must_equal [
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums WHERE ((x > 3) AND (id < 1002))",
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums WHERE ((x > 3) AND (id < 2002))",
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums WHERE (x > 3)"
]
end

it "#paged_update should respect existing filters" do
@ds.where{x > 3}.paged_update(:x=>1).must_equal 2002
@db.sqls.must_equal [
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1 WHERE ((x > 3) AND (id < 1002))",
"SELECT id FROM albums WHERE ((x > 3) AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE ((x > 3) AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1 WHERE ((x > 3) AND (id < 2002) AND (id >= 1002))",
"SELECT id FROM albums WHERE ((x > 3) AND (id >= 2002)) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE ((x > 3) AND (id >= 2002)) ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1 WHERE ((x > 3) AND (id >= 2002))"
]
end

# Filters already on the receiver must appear in both the yielded pages
# and the boundary lookup queries.
it "#paged_datasets should respect existing filters" do
  filtered = @ds.where{x > 3}
  page_sqls = filtered.paged_datasets.map { |page| page.sql }

  page_sqls.must_equal [
    "SELECT * FROM albums WHERE ((x > 3) AND (id < 1002))",
    "SELECT * FROM albums WHERE ((x > 3) AND (id < 2002) AND (id >= 1002))",
    "SELECT * FROM albums WHERE ((x > 3) AND (id >= 2002))"
  ]

  executed = @db.sqls
  executed.must_equal [
    "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000",
    "SELECT id FROM albums WHERE ((x > 3) AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1000",
    "SELECT id FROM albums WHERE ((x > 3) AND (id >= 2002)) ORDER BY id LIMIT 1 OFFSET 1000",
  ]
end

it "#paged_update_delete_size should set the page size for paged_update" do
@db.numrows = [4, 4, 2]
@ds.paged_update_delete_size(3).paged_delete.must_equal 10
@ds.paged_update_delete_size(4).paged_delete.must_equal 10
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4",
"DELETE FROM albums WHERE (id < 1002)",
Expand All @@ -94,7 +144,7 @@

it "#paged_update_delete_size should set the page size for paged_delete" do
@db.numrows = [4, 4, 2]
@ds.paged_update_delete_size(3).paged_update(:x=>1).must_equal 10
@ds.paged_update_delete_size(4).paged_update(:x=>1).must_equal 10
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4",
"UPDATE albums SET x = 1 WHERE (id < 1002)",
Expand All @@ -105,6 +155,19 @@
]
end

# A custom page size of 4 must show up as OFFSET 4 in the boundary lookups.
it "#paged_update_delete_size should set the page size for paged_datasets" do
  @db.numrows = [4, 4, 2]

  sized = @ds.paged_update_delete_size(4)
  page_sqls = sized.paged_datasets.map { |page| page.sql }
  page_sqls.must_equal [
    "SELECT * FROM albums WHERE (id < 1002)",
    "SELECT * FROM albums WHERE ((id < 2002) AND (id >= 1002))",
    "SELECT * FROM albums WHERE (id >= 2002)"
  ]

  executed = @db.sqls
  executed.must_equal [
    "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4",
    "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 4",
    "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 4",
  ]
end
it "should raise error for invalid size passed to paged_update_delete_size" do
proc{@ds.paged_update_delete_size(0)}.must_raise Sequel::Error
proc{@ds.paged_update_delete_size(-1)}.must_raise Sequel::Error
Expand All @@ -113,52 +176,69 @@
# Each paged method must refuse a dataset that already has a limit.
it "should raise error for dataset with limit" do
  [
    proc{@ds.limit(1).paged_delete},
    proc{@ds.limit(1).paged_update(:x=>1)},
    proc{@ds.limit(1).paged_datasets{}}
  ].each do |attempt|
    attempt.must_raise Sequel::Error
  end
end

# Each paged method must refuse a dataset that already has an offset.
it "should raise error for dataset with offset" do
  [
    proc{@ds.offset(1).paged_delete},
    proc{@ds.offset(1).paged_update(:x=>1)},
    proc{@ds.offset(1).paged_datasets{}}
  ].each do |attempt|
    attempt.must_raise Sequel::Error
  end
end

# Only scalar primary keys are supported, so a composite key must raise.
it "should raise error for model with composite primary key" do
  @c.set_primary_key [:id, :x]
  [
    proc{@c.dataset.paged_delete},
    proc{@c.dataset.paged_update(:x=>1)},
    proc{@c.dataset.paged_datasets{}}
  ].each do |attempt|
    attempt.must_raise Sequel::Error
  end
end

# A model without any primary key must raise as well.
it "should raise error for model with no primary key" do
  @c.no_primary_key
  [
    proc{@c.dataset.paged_delete},
    proc{@c.dataset.paged_update(:x=>1)},
    proc{@c.dataset.paged_datasets{}}
  ].each do |attempt|
    attempt.must_raise Sequel::Error
  end
end

it "should offer paged_delete class method" do
@c.paged_delete.must_equal 2002
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums WHERE (id < 1002)",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums WHERE (id < 2002)",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"DELETE FROM albums"
]
end

it "should offer paged_update class method" do
@c.paged_update(:x=>1).must_equal 2002
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1 WHERE (id < 1002)",
"SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1 WHERE ((id < 2002) AND (id >= 1002))",
"SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1001",
"SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000",
"UPDATE albums SET x = 1 WHERE (id >= 2002)"
]
end

# The model class itself must expose paged_datasets, with the same pages
# and boundary queries as the dataset method.
it "should offer paged_datasets class method" do
  page_sqls = @c.paged_datasets.map { |page| page.sql }
  page_sqls.must_equal [
    "SELECT * FROM albums WHERE (id < 1002)",
    "SELECT * FROM albums WHERE ((id < 2002) AND (id >= 1002))",
    "SELECT * FROM albums WHERE (id >= 2002)"
  ]

  executed = @db.sqls
  executed.must_equal [
    "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000",
    "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000",
    "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000",
  ]
end

it "should offer paged_update_delete_size class method" do
@db.numrows = [4, 4, 2]
@c.paged_update_delete_size(3).paged_delete.must_equal 10
@c.paged_update_delete_size(4).paged_delete.must_equal 10
@db.sqls.must_equal [
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4",
"DELETE FROM albums WHERE (id < 1002)",
Expand Down
31 changes: 31 additions & 0 deletions spec/integration/plugin_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3062,6 +3062,21 @@ def set(k, v, ttl) self[k] = v end
@model.select_order_map([:id, :o]).must_equal expected
end

# For every configured page size, all pages except the last must hold
# exactly that many rows, and the last page must hold the expected remainder.
# NOTE(review): @sizes is set up outside this view; final_counts is assumed
# to line up with it index by index.
it "Model#paged_datasets should work on unfiltered dataset" do
  final_counts = [1, 2, 1, 10, 1, 2, 1, 100, 100]
  @sizes.each_with_index do |rows, idx|
    expected_fc = final_counts[idx]
    @db.transaction(:rollback=>:always) do
      counts = []
      @model.paged_update_delete_size(rows).paged_datasets do |page|
        counts << page.count
      end
      *full_pages, last_page = counts
      last_page.must_equal expected_fc
      full_pages.each { |c| c.must_equal rows }
    end
  end

  # With the default page size, the whole table fits in a single page.
  default_counts = []
  @model.paged_datasets { |page| default_counts << page.count }
  default_counts.must_equal [100]
end

it "Model#paged_delete should work on filtered dataset" do
ds = @model.where{id < 50}
@sizes.each do |rows|
Expand Down Expand Up @@ -3092,4 +3107,20 @@ def set(k, v, ttl) self[k] = v end
ds.select_order_map([:id, :o]).must_equal ds_expected
other.select_order_map([:id, :o]).must_equal other_expected
end

# Same check as the unfiltered case, but on a dataset restricted to id < 50:
# every page except the last must hold exactly the page size, and the last
# page must hold the expected remainder.
# NOTE(review): @sizes is set up outside this view; final_counts is assumed
# to line up with it index by index.
it "Model#paged_datasets should work on filtered dataset" do
  ds = @model.where{id < 50}
  final_counts = [1, 1, 1, 9, 5, 49, 49, 49, 49]
  @sizes.zip(final_counts).each do |rows, expected_fc|
    @db.transaction(:rollback=>:always) do
      counts = []
      # Block parameter is named `page` so it does not shadow the outer `ds`.
      ds.paged_update_delete_size(rows).paged_datasets{|page| counts << page.count}
      counts.pop.must_equal expected_fc
      counts.each{|c| c.must_equal rows}
    end
  end

  # With the default page size, all filtered rows fit in a single page.
  counts = []
  ds.paged_datasets{|page| counts << page.count}
  counts.must_equal [49]
end
end

0 comments on commit 802b5de

Please sign in to comment.