GenBatcher is a simple and lightweight batching utility for Elixir.
This package can be installed by adding :gen_batcher to your list of dependencies in mix.exs:
def deps do
  [
    {:gen_batcher, "~> 1.0.0"}
  ]
end
For additional documentation, see HexDocs.
GenBatcher processes are easy to start and have a number of powerful configuration options.
A GenBatcher can be started with a simple child spec:
opts = [
  name: :my_gen_batcher,
  flush_trigger: {:size, 3},
  batch_timeout: 30_000,
  handle_flush: fn items, _ ->
    items
    |> Enum.join(",")
    |> IO.puts()
  end
]

children = [
  {GenBatcher, opts}
]

Supervisor.start_link(children, strategy: :one_for_one)
Once started, items can be inserted into the GenBatcher:
GenBatcher.insert(:my_gen_batcher, "foo")
GenBatcher.insert(:my_gen_batcher, "bar")
And, once a flush condition has been met, a flush operation will be triggered:
GenBatcher.insert(:my_gen_batcher, "baz")
# Flush operation outputs "foo,bar,baz"
GenBatcher.insert(:my_gen_batcher, "foo")
# After 30 seconds pass...
# Flush operation outputs "foo"
A GenBatcher can also be started with any module that implements the GenBatcher behaviour:
defmodule MyGenBatcher do
  use GenBatcher

  def start_link(opts \\ []) do
    GenBatcher.start_link(__MODULE__, opts)
  end

  @impl GenBatcher
  def handle_flush(items, _) do
    items
    |> Enum.join(",")
    |> IO.puts()
  end

  @impl GenBatcher
  def handle_insert(item, acc) do
    size = acc + byte_size(item)
    if size >= 9, do: :flush, else: {:cont, size}
  end

  @impl GenBatcher
  def initial_acc, do: 0
end
Again, items can be inserted into the GenBatcher once it starts:
GenBatcher.insert(MyGenBatcher, "foo")
GenBatcher.insert(MyGenBatcher, "bar")
And, again, a flush operation will be triggered once a flush condition is met:
GenBatcher.insert(MyGenBatcher, "baz")
# Flush operation outputs "foo,bar,baz"
By default, flush operations are asynchronous to both the caller and the GenBatcher partition. In practice, this is achieved using the Task.Supervisor module. However, this can be configured with the :blocking_flush? option for GenBatcher.start_link/2. If set to true, the partition process performs the flush operation itself instead of delegating it to a Task. This can be useful for applying backpressure and ensuring the system isn't flooded with concurrent flush operations.
A GenBatcher with an extremely cheap flush operation might see higher throughput when using blocking flushes, but this is generally not the case.
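As a sketch, blocking flushes can be enabled by adding the :blocking_flush? option described above to the options from the earlier child spec example. The :my_blocking_gen_batcher name is hypothetical, chosen only to avoid clashing with :my_gen_batcher. The children list would then be started with Supervisor.start_link/2 exactly as before.

```elixir
# Same options as the earlier child spec example, plus :blocking_flush?.
# With blocking_flush?: true, each partition runs handle_flush itself
# instead of delegating the flush to a Task.
opts = [
  # Hypothetical name, chosen to avoid clashing with :my_gen_batcher.
  name: :my_blocking_gen_batcher,
  flush_trigger: {:size, 3},
  batch_timeout: 30_000,
  blocking_flush?: true,
  handle_flush: fn items, _ ->
    items
    |> Enum.join(",")
    |> IO.puts()
  end
]

# Start these children with Supervisor.start_link/2, as in the earlier example.
children = [
  {GenBatcher, opts}
]
```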
A size condition and/or timeout condition is sufficient for most use cases. However, GenBatcher also supports defining custom item-based flush triggers through the GenBatcher.handle_insert/2 and GenBatcher.initial_acc/0 callbacks. For example, MyGenBatcher above uses these callbacks to trigger a flush based on byte size.
In cases where an item-based flush trigger is temporarily delayed (i.e. during GenBatcher.insert_all/3), the GenBatcher.handle_insert/2 callback will not be called again until after a flush operation is triggered. This guarantees that the accumulator term is in a valid state whenever the callback is invoked.
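The module below is a hypothetical, plain-Elixir model of this rule, not GenBatcher's actual implementation. It reuses the byte-size logic from MyGenBatcher.handle_insert/2 and assumes that items arriving after a :flush result simply join the pending batch without being passed to the callback.

```elixir
defmodule InsertAllSketch do
  # Hypothetical model of the rule described above; NOT GenBatcher's internals.
  # Same byte-size logic as MyGenBatcher.handle_insert/2.
  def handle_insert(item, acc) do
    size = acc + byte_size(item)
    if size >= 9, do: :flush, else: {:cont, size}
  end

  # Adds every item to the pending batch. Once handle_insert/2 returns :flush,
  # the flush is deferred until the whole list is in, and the callback is not
  # invoked again for the remaining items.
  def insert_all(items, initial_acc) do
    {batch, state} =
      Enum.reduce(items, {[], {:cont, initial_acc}}, fn
        item, {batch, :flush} ->
          {[item | batch], :flush}

        item, {batch, {:cont, acc}} ->
          {[item | batch], handle_insert(item, acc)}
      end)

    {Enum.reverse(batch), state}
  end
end

InsertAllSketch.insert_all(["foo", "bar", "baz", "qux"], 0)
# => {["foo", "bar", "baz", "qux"], :flush}
```

Here "baz" pushes the size to 9 and triggers the flush, so "qux" is added to the same batch without another handle_insert/2 call.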
GenBatcher leverages Elixir's PartitionSupervisor in order to support partitioning. All of a GenBatcher's partitions collect items and flush independently.
By default, GenBatcher uses a round-robin partitioner when inserting items. However, the partitioner can be overridden with the :partition_key option for GenBatcher.insert/3 and GenBatcher.insert_all/3, allowing for custom partitioning strategies.
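Because GenBatcher builds on PartitionSupervisor, the key-based routing behind :partition_key can be illustrated with PartitionSupervisor alone. This plain-Elixir sketch (Elixir 1.14 or later) assumes GenBatcher forwards :partition_key as the PartitionSupervisor routing key; the :demo_partitions name and the Agent-per-partition setup are illustrative only.

```elixir
# Plain-Elixir sketch (Elixir 1.14+), not GenBatcher code: four partitions,
# each an Agent holding the list of items routed to it.
{:ok, _sup} =
  PartitionSupervisor.start_link(
    child_spec: {Agent, fn -> [] end},
    name: :demo_partitions,
    partitions: 4
  )

# PartitionSupervisor derives a partition from the key, so the same key
# always reaches the same Agent.
partition = fn key -> {:via, PartitionSupervisor, {:demo_partitions, key}} end

insert = fn key, item -> Agent.update(partition.(key), &[item | &1]) end

insert.("user-1", "foo")
insert.("user-1", "bar")
insert.("user-2", "baz")

# Every item inserted with "user-1" is in this partition, in insertion order
# ("baz" may also appear if "user-2" happens to map to the same partition).
Agent.get(partition.("user-1"), &Enum.reverse/1)
```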
All of a GenBatcher's partitions use the same flush conditions. This can occasionally lead to bursts of flush operations being triggered at around the same time. The GenBatcher.initial_acc/0 callback can be leveraged to "jitter" item-based flush triggers in order to "desync" flush operations and mitigate this issue. For example, the GenBatcher below enforces an absolute maximum size of 1,000 items but randomly assigns each partition a maximum size between 901 and 1,000 items:
defmodule MyJitteredGenBatcher do
  use GenBatcher

  def start_link(opts \\ []) do
    opts = Keyword.put(opts, :partitions, 5)
    GenBatcher.start_link(__MODULE__, opts)
  end

  @impl GenBatcher
  def handle_flush(items, _) do
    items
    |> Enum.join(",")
    |> IO.puts()
  end

  @impl GenBatcher
  def handle_insert(_, 1), do: :flush
  def handle_insert(_, acc), do: {:cont, acc - 1}

  @impl GenBatcher
  def initial_acc, do: 900 + :rand.uniform(100)
end
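To sanity-check the 901-to-1,000 range, the callbacks above can be exercised without starting any processes. The JitterCheck module is hypothetical: it copies MyJitteredGenBatcher's callbacks and counts how many inserts a fresh accumulator allows before handle_insert/2 returns :flush.

```elixir
defmodule JitterCheck do
  # Copies of MyJitteredGenBatcher's callbacks, so this runs without GenBatcher.
  def handle_insert(_, 1), do: :flush
  def handle_insert(_, acc), do: {:cont, acc - 1}

  # :rand.uniform(100) returns an integer in 1..100, so this is in 901..1000.
  def initial_acc, do: 900 + :rand.uniform(100)

  # Counts the inserts, starting from `acc`, up to and including the one
  # whose handle_insert/2 call returns :flush.
  def inserts_until_flush(acc, count \\ 1) do
    case handle_insert(:item, acc) do
      :flush -> count
      {:cont, next_acc} -> inserts_until_flush(next_acc, count + 1)
    end
  end
end

# Sample many fresh accumulators and look at the resulting batch sizes.
sizes = for _ <- 1..10_000, do: JitterCheck.inserts_until_flush(JitterCheck.initial_acc())

{Enum.min(sizes), Enum.max(sizes)}
```

An accumulator that starts at N flushes on exactly the Nth insert, so the batch size equals the value returned by initial_acc/0.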
As long as a GenBatcher is shut down gracefully, it's guaranteed to flush all inserted items. The last flush operation for each partition is always performed by the partition itself, regardless of the :blocking_flush? option provided to GenBatcher.start_link/2.
By default, non-blocking flushes are given unlimited time to complete during shutdown to ensure that data is not lost. However, if this behavior is undesirable and data loss is acceptable, the time allowed can be limited with the :shutdown option for GenBatcher.start_link/2.
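A minimal sketch of limiting that shutdown time, reusing the earlier child spec options. It assumes :shutdown accepts a timeout in milliseconds, like a supervisor child's :shutdown value; the 10_000 value and the :my_bounded_gen_batcher name are hypothetical, so the accepted values should be confirmed in the GenBatcher.start_link/2 documentation on HexDocs.

```elixir
opts = [
  # Hypothetical name for this example.
  name: :my_bounded_gen_batcher,
  flush_trigger: {:size, 3},
  batch_timeout: 30_000,
  # Assumed to be a timeout in milliseconds: pending non-blocking flushes get
  # at most this long during shutdown, after which unflushed data may be lost.
  shutdown: 10_000,
  handle_flush: fn items, _ ->
    items
    |> Enum.join(",")
    |> IO.puts()
  end
]

# Start these children with Supervisor.start_link/2, as in the earlier example.
children = [
  {GenBatcher, opts}
]
```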
GenBatcher was created as a conceptual fork of the now-archived ExBuffer package and is intended to supersede it.