-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add paged_update_delete plugin for paged deletes and updates
This allows for splitting a large update or delete into multiple queries to prevent locking the related database table for a long period of time. Each update or delete query operates on a range of primary key values. Split a _force_primary_key_order private model dataset method from _primary_key_order and use it in the plugin.
- Loading branch information
1 parent
ebe04fd
commit b4ce555
Showing
6 changed files
with
403 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
# frozen-string-literal: true | ||
|
||
module Sequel | ||
module Plugins | ||
# The paged_update_delete plugin adds +paged_update+ and | ||
# +paged_delete+ dataset methods. These behave similarly to | ||
# the default +update+ and +delete+ dataset methods, except | ||
# that the update or deletion is done in potentially multiple | ||
# queries. For a large table, this prevents the change from | ||
# locking the table for a long period of time. | ||
# | ||
# Because the point of this is to prevent locking tables for | ||
# long periods of time, the separate queries are not contained | ||
# in a transaction, which means if a later query fails, | ||
# earlier queries will still be committed. You could prevent | ||
# this by using a transaction manually, but that defeats the | ||
# purpose of using these methods. | ||
# | ||
# Examples: | ||
# | ||
# Album.where{name <= 'M'}.paged_update(:updated_at=>Sequel::CURRENT_TIMESTAMP) | ||
# # SELECT id FROM albums WHERE (name <= 'M') ORDER BY id LIMIT 1 OFFSET 1001 | ||
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND ("id" < 1002)) | ||
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1001 | ||
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND ("id" < 1002) AND (id >= 1002)) | ||
# # ... | ||
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 10002)) ORDER BY id LIMIT 1 OFFSET 1001 | ||
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND (id >= 10002)) | ||
# | ||
# Album.where{name > 'M'}.paged_delete | ||
# # SELECT id FROM albums WHERE (name > 'M') ORDER BY id LIMIT 1 OFFSET 1001 | ||
# # DELETE FROM albums WHERE ((name > 'M') AND (id < 1002)) | ||
# # SELECT id FROM albums WHERE (name > 'M') ORDER BY id LIMIT 1 OFFSET 1001 | ||
# # DELETE FROM albums WHERE ((name > 'M') AND (id < 2002)) | ||
# # ... | ||
# # SELECT id FROM albums WHERE (name > 'M') ORDER BY id LIMIT 1 OFFSET 10001 | ||
# # DELETE FROM albums WHERE (name > 'M') | ||
# | ||
# To set the number of rows to be updated or deleted per query | ||
# by +paged_update+ or +paged_delete+, you can use the | ||
# +paged_update_delete_size+ dataset method: | ||
# | ||
# Album.where{name <= 'M'}.paged_update_delete_size(3). | ||
# paged_update(:updated_at=>Sequel::CURRENT_TIMESTAMP) | ||
# # SELECT id FROM albums WHERE (name <= 'M') ORDER BY id LIMIT 1 OFFSET 4 | ||
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND ("id" < 5)) | ||
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 5)) ORDER BY id LIMIT 1 OFFSET 4 | ||
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND ("id" < 9) AND (id >= 5)) | ||
# # ... | ||
# # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 12345)) ORDER BY id LIMIT 1 OFFSET 4 | ||
# # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND (id >= 12345)) | ||
# | ||
# You should avoid using +paged_update+ with updates that | ||
# modify the primary key, as such usage is not supported by | ||
# this plugin. | ||
# | ||
# This plugin only supports models with scalar primary keys. | ||
# | ||
# Usage: | ||
# | ||
# # Make all model subclasses support paged update/delete | ||
# # (called before loading subclasses) | ||
# Sequel::Model.plugin :paged_update_delete | ||
# | ||
# # Make the Album class support paged update/delete | ||
# Album.plugin :paged_update_delete | ||
module PagedUpdateDelete | ||
module ClassMethods | ||
Plugins.def_dataset_methods(self, [:paged_delete, :paged_update, :paged_update_delete_size]) | ||
end | ||
|
||
module DatasetMethods | ||
# Delete all rows of the dataset using using multiple queries so that | ||
# no more than 1000 rows are deleted at a time (you can configure the | ||
# number of rows using paged_update_delete_size). | ||
def paged_delete | ||
pk = _paged_update_delete_pk(:paged_delete) | ||
rows_deleted = 0 | ||
offset_ds = _paged_update_delete_offset_ds | ||
while last = offset_ds.get(pk) | ||
rows_deleted += where(pk < last).delete | ||
end | ||
rows_deleted + delete | ||
end | ||
|
||
# Update all rows of the dataset using using multiple queries so that | ||
# no more than 1000 rows are updated at a time (you can configure the | ||
# number of rows using paged_update_delete_size). All arguments are | ||
# passed to Dataset#update. | ||
def paged_update(*args) | ||
pk = _paged_update_delete_pk(:paged_update) | ||
rows_updated = 0 | ||
base_offset_ds = offset_ds = _paged_update_delete_offset_ds | ||
first = nil | ||
|
||
while last = offset_ds.get(pk) | ||
ds = where(pk < last) | ||
ds = ds.where(pk >= first) if first | ||
rows_updated += ds.update(*args) | ||
first = last | ||
offset_ds = base_offset_ds.where(pk >= first) | ||
end | ||
ds = self | ||
ds = ds.where(pk >= first) if first | ||
rows_updated + ds.update(*args) | ||
end | ||
|
||
# Set the number of rows to update or delete per query when using | ||
# paged_update or paged_delete. | ||
def paged_update_delete_size(rows) | ||
raise Error, "paged_update_delete_size rows must be greater than 0" unless rows >= 1 | ||
clone(:paged_updated_delete_rows=>rows) | ||
end | ||
|
||
private | ||
|
||
# Run some basic checks before running paged UPDATE or DELETE queries, | ||
# and return the primary key to operate on as a Sequel::Identifier. | ||
def _paged_update_delete_pk(meth) | ||
raise Error, "cannot use #{meth} if dataset has a limit or offset" if @opts[:limit] || @opts[:offset] | ||
|
||
case pk = model.primary_key | ||
when Symbol | ||
Sequel.identifier(pk) | ||
when Array | ||
raise Error, "cannot use #{meth} on a model with a composite primary key" | ||
else | ||
raise Error, "cannot use #{meth} on a model without a primary key" | ||
end | ||
end | ||
|
||
# The dataset that will be used by paged_update and paged_delete | ||
# to get the upper limit for the next UPDATE or DELETE query. | ||
def _paged_update_delete_offset_ds | ||
offset = @opts[:paged_updated_delete_rows] || 1000 | ||
_force_primary_key_order.offset(offset+1) | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
require_relative "spec_helper" | ||
|
||
describe "paged_update_delete plugin" do | ||
before do | ||
@db = Sequel.mock | ||
@c = Class.new(Sequel::Model(@db[:albums])) | ||
@c.plugin :paged_update_delete | ||
@ds = @c.dataset | ||
@db.sqls | ||
@db.fetch = [[{:id=>1002}], [{:id=>2002}]] | ||
@db.numrows = [1000, 1000, 2] | ||
end | ||
|
||
it "#paged_delete should delete using multiple queries" do | ||
@ds.paged_delete.must_equal 2002 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums WHERE (id < 1002)", | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums WHERE (id < 2002)", | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums" | ||
] | ||
end | ||
|
||
it "#paged_update should update using multiple queries" do | ||
@ds.paged_update(:x=>1).must_equal 2002 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1 WHERE (id < 1002)", | ||
"SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1 WHERE ((id < 2002) AND (id >= 1002))", | ||
"SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1 WHERE (id >= 2002)" | ||
] | ||
end | ||
|
||
it "#paged_delete should handle case where number of rows is less than page size" do | ||
@db.fetch = [] | ||
@db.numrows = [2] | ||
@ds.paged_delete.must_equal 2 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums" | ||
] | ||
end | ||
|
||
it "#paged_update should handle case where number of rows is less than page size" do | ||
@db.fetch = [] | ||
@db.numrows = [2] | ||
@ds.paged_update(:x=>1).must_equal 2 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1" | ||
] | ||
end | ||
|
||
it "#paged_delete should respect existing filters" do | ||
@ds.where{x > 3}.paged_delete.must_equal 2002 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums WHERE ((x > 3) AND (id < 1002))", | ||
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums WHERE ((x > 3) AND (id < 2002))", | ||
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums WHERE (x > 3)" | ||
] | ||
end | ||
|
||
it "#paged_update should respect existing filters" do | ||
@ds.where{x > 3}.paged_update(:x=>1).must_equal 2002 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1 WHERE ((x > 3) AND (id < 1002))", | ||
"SELECT id FROM albums WHERE ((x > 3) AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1 WHERE ((x > 3) AND (id < 2002) AND (id >= 1002))", | ||
"SELECT id FROM albums WHERE ((x > 3) AND (id >= 2002)) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1 WHERE ((x > 3) AND (id >= 2002))" | ||
] | ||
end | ||
|
||
it "#paged_update_delete_size should set the page size for paged_update" do | ||
@db.numrows = [4, 4, 2] | ||
@ds.paged_update_delete_size(3).paged_delete.must_equal 10 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", | ||
"DELETE FROM albums WHERE (id < 1002)", | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", | ||
"DELETE FROM albums WHERE (id < 2002)", | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", | ||
"DELETE FROM albums" | ||
] | ||
end | ||
|
||
it "#paged_update_delete_size should set the page size for paged_delete" do | ||
@db.numrows = [4, 4, 2] | ||
@ds.paged_update_delete_size(3).paged_update(:x=>1).must_equal 10 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", | ||
"UPDATE albums SET x = 1 WHERE (id < 1002)", | ||
"SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 4", | ||
"UPDATE albums SET x = 1 WHERE ((id < 2002) AND (id >= 1002))", | ||
"SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 4", | ||
"UPDATE albums SET x = 1 WHERE (id >= 2002)" | ||
] | ||
end | ||
|
||
it "should raise error for invalid size passed to paged_update_delete_size" do | ||
proc{@ds.paged_update_delete_size(0)}.must_raise Sequel::Error | ||
proc{@ds.paged_update_delete_size(-1)}.must_raise Sequel::Error | ||
end | ||
|
||
it "should raise error for dataset with limit" do | ||
proc{@ds.limit(1).paged_delete}.must_raise Sequel::Error | ||
proc{@ds.limit(1).paged_update(:x=>1)}.must_raise Sequel::Error | ||
end | ||
|
||
it "should raise error for dataset with offset" do | ||
proc{@ds.offset(1).paged_delete}.must_raise Sequel::Error | ||
proc{@ds.offset(1).paged_update(:x=>1)}.must_raise Sequel::Error | ||
end | ||
|
||
it "should raise error for model with composite primary key" do | ||
@c.set_primary_key [:id, :x] | ||
proc{@c.dataset.paged_delete}.must_raise Sequel::Error | ||
proc{@c.dataset.paged_update(:x=>1)}.must_raise Sequel::Error | ||
end | ||
|
||
it "should raise error for model with no primary key" do | ||
@c.no_primary_key | ||
proc{@c.dataset.paged_delete}.must_raise Sequel::Error | ||
proc{@c.dataset.paged_update(:x=>1)}.must_raise Sequel::Error | ||
end | ||
|
||
it "should offer paged_delete class method" do | ||
@c.paged_delete.must_equal 2002 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums WHERE (id < 1002)", | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums WHERE (id < 2002)", | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"DELETE FROM albums" | ||
] | ||
end | ||
|
||
it "should offer paged_update class method" do | ||
@c.paged_update(:x=>1).must_equal 2002 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1 WHERE (id < 1002)", | ||
"SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1 WHERE ((id < 2002) AND (id >= 1002))", | ||
"SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1001", | ||
"UPDATE albums SET x = 1 WHERE (id >= 2002)" | ||
] | ||
end | ||
|
||
it "should offer paged_update_delete_size class method" do | ||
@db.numrows = [4, 4, 2] | ||
@c.paged_update_delete_size(3).paged_delete.must_equal 10 | ||
@db.sqls.must_equal [ | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", | ||
"DELETE FROM albums WHERE (id < 1002)", | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", | ||
"DELETE FROM albums WHERE (id < 2002)", | ||
"SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", | ||
"DELETE FROM albums" | ||
] | ||
end | ||
end |
Oops, something went wrong.