Skip to content

Commit

Permalink
Merge pull request #619 from ShapovalovaIrina/add-zstd-compression-su…
Browse files Browse the repository at this point in the history
…pport

feat: update Produce and Fetch APIs to support zstd compression
  • Loading branch information
zmstone authored Feb 27, 2025
2 parents e1a36e0 + 92e6916 commit 8af2314
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 19 deletions.
4 changes: 3 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{deps, [{kafka_protocol, "4.1.10"}]}.
{deps, [{kafka_protocol, "4.2.1"}]}.
{project_plugins, [{rebar3_lint, "~> 3.2.5"}]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warnings_as_errors, warn_unused_vars,warn_shadow_vars,warn_obsolete_guard,debug_info]}.
Expand All @@ -14,6 +14,8 @@
, {proper, "1.4.0"}
, {snappyer, "1.2.9"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {branch, "1.0.10"}}}
, {lz4b, "0.0.11"}
, {ezstd, "1.1.0"}
]},
{erl_opts, [warnings_as_errors, {d, build_brod_cli}]}
]}
Expand Down
2 changes: 1 addition & 1 deletion src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@
{ok, partition()}).
-type partitioner() :: partition_fun() | random | hash.
-type produce_ack_cb() :: fun((partition(), offset()) -> _).
-type compression() :: no_compression | gzip | snappy.
-type compression() :: no_compression | gzip | snappy | lz4 | zstd.
-type call_ref() :: #brod_call_ref{}. %% A record with caller, callee, and ref.
-type produce_result() :: brod_produce_req_buffered
| brod_produce_req_acked.
Expand Down
4 changes: 2 additions & 2 deletions src/brod_kafka_apis.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ lookup_vsn_range(Conn, API) ->
%% Do not change range without verification.
supported_versions(API) ->
case API of
produce -> {0, 5};
fetch -> {0, 7};
produce -> {0, 7};
fetch -> {0, 10};
list_offsets -> {0, 2};
metadata -> {0, 2};
offset_commit -> {2, 2};
Expand Down
2 changes: 1 addition & 1 deletion src/brod_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@
%% </li>
%% <li>`compression' (optional, default = `no_compression`):
%%
%% `gzip' or `snappy' to enable compression</li>
%% `gzip', `snappy', 'lz4' or `zstd` to enable compression</li>
%% <li>`max_linger_ms' (optional, default = 0):
%%
%% Messages are allowed to 'linger' in buffer for this amount of
Expand Down
56 changes: 46 additions & 10 deletions test/brod_compression_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
%% Test cases
-export([ t_produce_gzip/1
, t_produce_snappy/1
%, t_produce_lz4/1
, t_produce_lz4/1
, t_produce_zstd/1
, t_produce_compressed_batch_consume_from_middle_gzip/1
, t_produce_compressed_batch_consume_from_middle_snappy/1
%, t_produce_compressed_batch_consume_from_middle_lz4/1
, t_produce_compressed_batch_consume_from_middle_lz4/1
, t_produce_compressed_batch_consume_from_middle_zstd/1
]).

-include_lib("common_test/include/ct.hrl").
Expand Down Expand Up @@ -77,10 +79,29 @@ end_per_testcase(Case, Config) ->
Config.

all() -> [F || {F, _A} <- module_info(exports),
case atom_to_list(F) of
"t_" ++ _ -> true;
_ -> false
end].
is_test_case(F) andalso kafka_supports_compression_in_test(F)].

is_test_case(F) ->
case atom_to_list(F) of
"t_" ++ _ -> true;
_ -> false
end.

kafka_supports_compression_in_test(F) ->
CompressionMinVsns = #{
"gzip" => {0, 0},
"snappy" => {0, 8},
"lz4" => {0, 10},
"zstd" => {2, 1}
},
KafkaVsn = kafka_version(),
lists:all(
fun({Compression, MinVsn}) ->
IsTestContainCompression = string:str(atom_to_list(F), Compression) > 0,
not IsTestContainCompression orelse KafkaVsn >= MinVsn
end,
maps:to_list(CompressionMinVsns)
).

%%%_* Test functions ===========================================================

Expand All @@ -90,17 +111,23 @@ t_produce_gzip(Config) ->
t_produce_snappy(Config) ->
run(fun produce/1, snappy, Config).

%t_produce_lz4(Config) ->
% run(fun produce/1, lz4, Config).
t_produce_lz4(Config) ->
run(fun produce/1, lz4, Config).

t_produce_zstd(Config) ->
run(fun produce/1, zstd, Config).

t_produce_compressed_batch_consume_from_middle_gzip(Config) ->
run(fun produce_compressed_batch_consume_from_middle/1, gzip, Config).

t_produce_compressed_batch_consume_from_middle_snappy(Config) ->
run(fun produce_compressed_batch_consume_from_middle/1, snappy, Config).

%t_produce_compressed_batch_consume_from_middle_lz4(Config) ->
% run(fun produce_compressed_batch_consume_from_middle/1, lz4, Config).
t_produce_compressed_batch_consume_from_middle_lz4(Config) ->
run(fun produce_compressed_batch_consume_from_middle/1, lz4, Config).

t_produce_compressed_batch_consume_from_middle_zstd(Config) ->
run(fun produce_compressed_batch_consume_from_middle/1, zstd, Config).

%%%_* Help functions ===========================================================

Expand Down Expand Up @@ -203,6 +230,15 @@ start_client(Hosts, ClientId) ->
client_config() ->
kafka_test_helper:client_config().

kafka_version() ->
case os:getenv("KAFKA_VERSION") of
false ->
?LATEST_KAFKA_VERSION;
Vsn ->
[Major, Minor | _] = string:tokens(Vsn, "."),
{list_to_integer(Major), list_to_integer(Minor)}
end.

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
8 changes: 4 additions & 4 deletions test/brod_kafka_apis_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ only_one_version_test() ->
?assertEqual(0, brod_kafka_apis:pick_version(conn, list_groups)).

pick_brod_max_version_test() ->
%% brod supports max = 5, kafka supports max = 100
%% brod supports max = 7, kafka supports max = 100
?WITH_MECK(#{produce => {0, 100}},
?assertEqual(5, brod_kafka_apis:pick_version(self(), produce))).
?assertEqual(7, brod_kafka_apis:pick_version(self(), produce))).

pick_kafka_max_version_test() ->
%% brod supports max = 2, kafka supports max = 1
Expand All @@ -59,8 +59,8 @@ pick_min_brod_version_2_test() ->
?assertEqual(0, brod_kafka_apis:pick_version(self(), produce))).

no_version_range_intersection_test() ->
%% brod supports 0 - 2, kafka supports 6 - 7
?WITH_MECK(#{produce => {6, 7}},
%% brod supports 0 - 7, kafka supports 8 - 9
?WITH_MECK(#{produce => {8, 9}},
?assertError({unsupported_vsn_range, _, _, _},
brod_kafka_apis:pick_version(self(), produce))).

Expand Down

0 comments on commit 8af2314

Please sign in to comment.