Skip to content

Commit

Permalink
Forward unhandled messages to optional subscriber handle_info callback
Browse files Browse the repository at this point in the history
The motivation for adding handle_info callbacks is to allow subscriber
worker processes which are spawned by brod to participate in message
passing, supporting a variety of use cases utilizing async acking and
committing.

An example use case:
* Start a group subscriber using `brod_group_subscriber_v2`
* In a partition worker spawn a new process for every message under a
  supervisor specific to the worker's topic-partition
* When the supervisor has <= N processes, ack last seen offset to fetch
  new messages. When the supervisor has > N processes, messages are not
  acked to apply backpressure
* When all processes up to offset O1 have completed, commit offset O1

Allowing arbitrary message passing in the topic and group subscriber
workers supports not only that use case but many others.
  • Loading branch information
urmastalimaa committed Jun 6, 2024
1 parent c34fa87 commit f35d281
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 4 deletions.
11 changes: 10 additions & 1 deletion src/brod_group_subscriber_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-include("brod_int.hrl").

%% brod_topic_subscriber callbacks
-export([init/2, handle_message/3, terminate/2]).
-export([init/2, handle_message/3, handle_info/2, terminate/2]).

-type start_options() ::
#{ group_id := brod:group_id()
Expand Down Expand Up @@ -91,6 +91,15 @@ handle_message(_Partition, Msg, State) ->
{ok, NewState}
end.

handle_info(Info, #state{cb_module = CbModule , cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
%% Only the {noreply, State} return value is supported.
case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of
{noreply, NewCbState} ->
{noreply, State#state{cb_state = NewCbState}}
end.

terminate(Reason, #state{cb_module = CbModule, cb_state = State}) ->
brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok).

Expand Down
17 changes: 14 additions & 3 deletions src/brod_topic_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@
%% This callback is called before stopping the subscriber
-callback terminate(_Reason, cb_state()) -> _.

-optional_callbacks([terminate/2]).
%% This callback is called when the subscriber receives a message unrelated to
%% the subscription.
%% The callback must return `{noreply, NewCallbackState}'.
-callback handle_info(_Msg, cb_state()) -> {noreply, cb_state()}.

-optional_callbacks([terminate/2, handle_info/2]).

%%%_* Types and macros =========================================================

Expand Down Expand Up @@ -357,8 +362,14 @@ handle_info({'DOWN', _Mref, process, Pid, Reason},
%% not a consumer pid
{noreply, State}
end;
handle_info(_Info, State) ->
{noreply, State}.
handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
%% Only the {noreply, State} return value is supported.
case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of
{noreply, NewCbState} ->
{noreply, State#state{cb_state = NewCbState}}
end.

%% @private
handle_call(Call, _From, State) ->
Expand Down
55 changes: 55 additions & 0 deletions test/brod_topic_subscriber_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
-export([ init/2
, terminate/2
, handle_message/3
, handle_info/2
]).

%% Test cases
Expand All @@ -40,6 +41,7 @@
, t_callback_crash/1
, t_begin_offset/1
, t_cb_fun/1
, t_consumer_ack_via_message_passing/1
]).

-include("brod_test_setup.hrl").
Expand Down Expand Up @@ -107,6 +109,21 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck
false -> {ok, ack, State}
end.

handle_info({ack_offset, Partition, Offset} = Msg, #state{ counter = Counter
, worker_id = Ref
} = State0) ->
%% Participate in state continuity checks
?tp(topic_subscriber_seen_info,
#{ partition => Partition
, offset => Offset
, msg => Msg
, state => Counter
, worker_id => Ref
}),
State = State0#state{counter = Counter + 1},
ok = brod_topic_subscriber:ack(self(), Partition, Offset),
{noreply, State}.

terminate(Reason, #state{worker_id = Ref, counter = Counter}) ->
?tp(topic_subscriber_terminate,
#{ worker_id => Ref
Expand Down Expand Up @@ -184,6 +201,44 @@ t_async_acks(Config) when is_list(Config) ->
check_init_terminate(Trace)
end).

t_consumer_ack_via_message_passing(Config) when is_list(Config) ->
%% Process messages one by one with no prefetch
ConsumerConfig = [ {prefetch_count, 0}
, {prefetch_bytes, 0}
, {sleep_timeout, 0}
, {max_bytes, 0}
],
Partition = 0,
SendFun =
fun(I) ->
produce({?topic, Partition}, <<I>>)
end,
?check_trace(
%% Run stage:
begin
O0 = SendFun(0),
%% Send two messages
Offset0 = SendFun(1),
_Offset1 = SendFun(2),
InitArgs = {_IsAsyncAck = true,
_ConsumerOffsets = [{0, O0}]},
{ok, SubscriberPid} =
brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig,
?MODULE, InitArgs),
{ok, _} = wait_message(<<1>>),
%% ack_offset allows consumer to proceed to message 2
SubscriberPid ! {ack_offset, 0, Offset0},
{ok, _} = wait_message(<<2>>),
ok = brod_topic_subscriber:stop(SubscriberPid),
_Expected = [<<1>>, <<2>>]
end,
%% Check stage:
fun(Expected, Trace) ->
check_received_messages(Expected, Trace),
check_state_continuity(Trace),
check_init_terminate(Trace)
end).

t_begin_offset(Config) when is_list(Config) ->
ConsumerConfig = [ {prefetch_count, 100}
, {prefetch_bytes, 0} %% as discard
Expand Down

0 comments on commit f35d281

Please sign in to comment.