Skip to content

Commit 482b361

Browse files
committed
Make the benchmarks run, move message packing into the Sender
1 parent ce9808b commit 482b361

10 files changed

+50
-14
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/_build
2+
/bench/snapshots
23
/cover
34
/deps
45
erl_crash.dump
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

bench/send_vs_encode_bench.exs

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
defmodule SendVsEncodeBench do
2+
use Benchfella
3+
4+
defmodule Receiver do
5+
def loop do
6+
receive do
7+
_ ->
8+
loop()
9+
end
10+
end
11+
end
12+
13+
setup_all do
14+
pid = spawn(Receiver, :loop, [])
15+
{:ok, pid}
16+
end
17+
18+
bench "sending message", [message: gen_message()] do
19+
send(bench_context, message)
20+
end
21+
22+
bench "encoding message", [message: gen_message()] do
23+
:erlang.term_to_binary(message)
24+
end
25+
26+
defp gen_message() do
27+
Map.new(1..1_000_000, fn item -> {item, :erlang.unique_integer()} end)
28+
end
29+
end

lib/manifold.ex

+8-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ defmodule Manifold do
55
alias Manifold.Sender
66
alias Manifold.Utils
77

8+
@type pack_mode :: :binary | :etf | nil
9+
810
@max_partitioners 32
911
@partitioners min(Application.get_env(:manifold, :partitioners, 1), @max_partitioners)
1012
@workers_per_partitioner Application.get_env(:manifold, :workers_per_partitioner, System.schedulers_online)
@@ -39,6 +41,8 @@ defmodule Manifold do
3941
@spec valid_send_options?(Keyword.t()) :: boolean()
4042
def valid_send_options?(options) when is_list(options) do
4143
valid_options = [
44+
{:pack_mode, :binary},
45+
{:pack_mode, :etf},
4246
{:send_mode, :offload},
4347
]
4448

@@ -57,13 +61,13 @@ defmodule Manifold do
5761
def send([pid], message, options), do: __MODULE__.send(pid, message, options)
5862

5963
def send(pids, message, options) when is_list(pids) do
60-
message = Utils.pack_message(options[:pack_mode], message)
61-
6264
case options[:send_mode] do
6365
:offload ->
64-
Sender.send(current_sender(), current_partitioner(), pids, message)
66+
Sender.send(current_sender(), current_partitioner(), pids, message, options[:pack_mode])
6567

6668
nil ->
69+
message = Utils.pack_message(options[:pack_mode], message)
70+
6771
partitioner_name = current_partitioner()
6872

6973
grouped_by =
@@ -85,7 +89,7 @@ defmodule Manifold do
8589
:offload ->
8690
# To maintain linearizability guaranteed by send/2, we have to send
8791
# it to the sender process, even for a single receiving pid.
88-
Sender.send(current_sender(), current_partitioner(), [pid], message)
92+
Sender.send(current_sender(), current_partitioner(), [pid], message, :etf)
8993

9094
nil ->
9195
Partitioner.send({current_partitioner(), node(pid)}, [pid], message)

lib/manifold/partitioner.ex

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ defmodule Manifold.Partitioner do
2020
GenServer.start_link(__MODULE__, partitions, opts)
2121
end
2222

23-
@spec send(pid, [pid], term) :: :ok
24-
def send(pid, pids, message) do
25-
@gen_module.cast(pid, {:send, pids, message})
23+
@spec send(partitioner :: GenServer.server(), pids :: [pid()], message :: term()) :: :ok
24+
def send(partitioner, pids, message) do
25+
@gen_module.cast(partitioner, {:send, pids, message})
2626
end
2727

2828
## Server Callbacks

lib/manifold/sender.ex

+6-4
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ defmodule Manifold.Sender do
1818
GenServer.start_link(__MODULE__, :ok, opts)
1919
end
2020

21-
@spec send(pid, atom, [pid], term) :: :ok
22-
def send(pid, partitioner, pids, message) do
23-
@gen_module.cast(pid, {:send, partitioner, pids, message})
21+
@spec send(sender :: GenServer.server(), partitioner :: GenServer.server(), pids :: [pid()], message :: term(), pack_mode :: Manifold.pack_mode()) :: :ok
22+
def send(sender, partitioner, pids, message, pack_mode) do
23+
@gen_module.cast(sender, {:send, partitioner, pids, message, pack_mode})
2424
end
2525

2626
## Server Callbacks
@@ -32,7 +32,9 @@ defmodule Manifold.Sender do
3232
{:ok, nil}
3333
end
3434

35-
def handle_cast({:send, partitioner, pids, message}, nil) do
35+
def handle_cast({:send, partitioner, pids, message, pack_mode}, nil) do
36+
message = Utils.pack_message(pack_mode, message)
37+
3638
grouped_by =
3739
Utils.group_by(pids, fn
3840
nil -> nil

lib/manifold/utils.ex

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,19 @@ defmodule Manifold.Utils do
4545
value
4646
end
4747
def hash(key), do: hash("#{key}")
48-
48+
4949
@doc """
5050
Gets the next delay at which we should attempt to hibernate a worker or partitioner process.
5151
"""
52-
@spec next_hibernate_delay() :: integer
52+
@spec next_hibernate_delay() :: integer
5353
def next_hibernate_delay() do
5454
hibernate_delay = Application.get_env(:manifold, :hibernate_delay, 60_000)
5555
hibernate_jitter = Application.get_env(:manifold, :hibernate_jitter, 30_000)
5656

5757
hibernate_delay + :rand.uniform(hibernate_jitter)
5858
end
5959

60-
@spec pack_message(mode :: atom(), message :: term()) :: term()
60+
@spec pack_message(mode :: Manifold.pack_mode(), message :: term()) :: term()
6161
def pack_message(:binary, message), do: {:manifold_binary, :erlang.term_to_binary(message)}
6262
def pack_message(_mode, message), do: message
6363

0 commit comments

Comments
 (0)