diff --git a/CHANGELOG.md b/CHANGELOG.md index e28a5215..69ece89b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +- 4.3.1 + - Fixed `brod_client:stop_consumer` so that it doesn't crash the client process if an unknown consumer is given as argument. + - Previously, `brod_group_subscriber_v2` could leave `brod_consumer` processes lingering even after its shutdown. Now, those processes are terminated. + - 4.3.0 - Split brod-cli out to a separate project [kafka4beam/brod-cli](https://github.com/kafka4beam/brod-cli) diff --git a/src/brod_client.erl b/src/brod_client.erl index d52f1b38..b708578c 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -388,8 +388,8 @@ handle_call({stop_producer, Topic}, _From, State) -> ok = brod_producers_sup:stop_producer(State#state.producers_sup, Topic), {reply, ok, State}; handle_call({stop_consumer, Topic}, _From, State) -> - ok = brod_consumers_sup:stop_consumer(State#state.consumers_sup, Topic), - {reply, ok, State}; + Reply = brod_consumers_sup:stop_consumer(State#state.consumers_sup, Topic), + {reply, Reply, State}; handle_call({get_leader_connection, Topic, Partition}, _From, State) -> {Result, NewState} = do_get_leader_connection(State, Topic, Partition), {reply, Result, NewState}; diff --git a/src/brod_group_subscriber_v2.erl b/src/brod_group_subscriber_v2.erl index c1e43fdd..47ece4b0 100644 --- a/src/brod_group_subscriber_v2.erl +++ b/src/brod_group_subscriber_v2.erl @@ -395,12 +395,15 @@ handle_info(_Info, State) -> %%-------------------------------------------------------------------- -spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), State :: term()) -> any(). -terminate(_Reason, #state{workers = Workers, +terminate(_Reason, #state{config = Config, + workers = Workers, coordinator = Coordinator, group_id = GroupId }) -> ok = terminate_all_workers(Workers), - ok = flush_offset_commits(GroupId, Coordinator). + ok = flush_offset_commits(GroupId, Coordinator), + ok = stop_consumers(Config), + ok. %%%=================================================================== %%% Internal functions @@ -529,6 +532,16 @@ do_ack(Topic, Partition, Offset, #state{ workers = Workers {error, unknown_topic_or_partition} end. +stop_consumers(Config) -> + #{ client := Client + , topics := Topics + } = Config, + lists:foreach( + fun(Topic) -> + _ = brod_client:stop_consumer(Client, Topic) + end, + Topics). + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/test/brod_client_SUITE.erl b/test/brod_client_SUITE.erl index eff92257..6217a687 100644 --- a/test/brod_client_SUITE.erl +++ b/test/brod_client_SUITE.erl @@ -44,6 +44,7 @@ , t_sasl_callback/1 , t_magic_version/1 , t_get_partitions_count_safe/1 + , t_double_stop_consumer/1 ]). -include_lib("common_test/include/ct.hrl"). @@ -51,9 +52,9 @@ -include("brod_int.hrl"). -define(HOST, "localhost"). --define(HOSTS, [{?HOST, 9092}]). --define(HOSTS_SSL, [{?HOST, 9093}]). --define(HOSTS_SASL_SSL, [{?HOST, 9094}]). +-define(HOSTS, [{?HOST, 9192}]). +-define(HOSTS_SSL, [{?HOST, 9193}]). +-define(HOSTS_SASL_SSL, [{?HOST, 9194}]). -define(TOPIC, <<"brod-client-SUITE-topic">>). -define(WAIT(PATTERN, RESULT, TIMEOUT), @@ -379,6 +380,21 @@ t_magic_version(Config) when is_list(Config) -> ?assert(is_integer(Ts)) end. +t_double_stop_consumer({init, Config}) -> Config; +t_double_stop_consumer({'end', Config}) -> + brod:stop_client(?FUNCTION_NAME), + Config; +t_double_stop_consumer(Config) when is_list(Config) -> + Client = ?FUNCTION_NAME, + ClientConfig = [{get_metadata_timeout_seconds, 10}], + ok = start_client(?HOSTS, Client, ClientConfig), + ok = brod:start_consumer(Client, ?TOPIC, []), + ?assertMatch({ok, _}, brod_client:get_consumer(Client, ?TOPIC, 0)), + ?assertMatch(ok, brod_client:stop_consumer(Client, ?TOPIC)), + ?assertMatch({error, {consumer_not_found, _}}, brod_client:get_consumer(Client, ?TOPIC, 0)), + ?assertMatch({error, not_found}, brod_client:stop_consumer(Client, ?TOPIC)), + ok. + %%%_* Help functions =========================================================== %% mocked callback diff --git a/test/brod_group_subscriber_SUITE.erl b/test/brod_group_subscriber_SUITE.erl index 7ad69d14..a08cf569 100644 --- a/test/brod_group_subscriber_SUITE.erl +++ b/test/brod_group_subscriber_SUITE.erl @@ -52,6 +52,7 @@ , t_assign_partitions_handles_updating_state/1 , t_get_workers/1 , v2_coordinator_crash/1 + , v2_consumer_cleanup/1 , v2_subscriber_shutdown/1 , v2_subscriber_assignments_revoked/1 ]). @@ -97,6 +98,7 @@ groups() -> , t_assign_partitions_handles_updating_state , t_get_workers , v2_coordinator_crash + , v2_consumer_cleanup , v2_subscriber_shutdown , v2_subscriber_assignments_revoked ]} @@ -391,6 +393,31 @@ v2_coordinator_crash(Config) when is_list(Config) -> ok end). +%% Checks that we don't leave `brod_consumer' processes lingering after we stop a group +%% subscriber v2. +v2_consumer_cleanup(Config) when is_list(Config) -> + InitArgs = #{}, + Topic = ?topic, + Partition = 0, + Client = ?CLIENT_ID, + ?check_trace( + #{timetrap => 5_000}, + begin + {ok, SubscriberPid} = start_subscriber(?group_id, Config, [Topic], InitArgs), + %% Send a message to the topic and wait until it's received to make sure + %% the subscriber is stable: + produce({Topic, Partition}, <<0>>), + {ok, _} = ?wait_message(Topic, Partition, <<0>>, _), + ?assertMatch({ok, _}, brod_client:get_consumer(Client, Topic, Partition)), + ok = stop_subscriber(Config, SubscriberPid), + ?assertMatch({error, {consumer_not_found, _}}, + brod_client:get_consumer(Client, Topic, Partition)), + ok + end, + [] + ), + ok. + v2_subscriber_shutdown(Config) when is_list(Config) -> %% Test graceful shutdown of the group subscriber: InitArgs = #{async_ack => true},