forked from kafka4beam/brod
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request kafka4beam#499 from spencerdcarlson/sdc/ex-doc
Use ex_doc to generate docs
- Loading branch information
Showing
11 changed files
with
360 additions
and
88 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,6 +74,7 @@ | |
} | ||
] | ||
} | ||
, {verbose, true} | ||
] | ||
} | ||
]. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
# Authentication | ||
|
||
|
||
|
||
## SASL/PLAIN | ||
|
||
### Erlang | ||
|
||
```erlang | ||
[{brod, | ||
[{clients | ||
, [{kafka_client | ||
, [ { endpoints, [{"localhost", 9092}] } | ||
, { ssl, true} | ||
, { sasl, {plain, "GFRW5BSQHKEH0TSG", "GrL3CNTkLhsvtBr8srGn0VilMpgDb4lPD"}} | ||
] | ||
} | ||
] | ||
} | ||
] | ||
}] | ||
``` | ||
|
||
### Elixir | ||
|
||
```elixir | ||
import Config | ||
|
||
config :brod, | ||
clients: [ | ||
kafka_client: [ | ||
endpoints: [ | ||
localhost: 9092 | ||
], | ||
ssl: true, | ||
sasl: { | ||
:plain, | ||
System.get_env("KAFKA_USERNAME"), | ||
System.get_env("KAFKA_PASSWORD") | ||
} | ||
] | ||
] | ||
``` | ||
|
||
## SSL Certificate Validation | ||
Erlang's default configuration for SSL is [verify_none](https://github.com/erlang/otp/blob/OTP-24.3.4/lib/ssl/src/ssl_internal.hrl#L120-L218) | ||
which means that certificates are accepted but not validated. brod passes SSL options to the [kafka_protocol](https://hex.pm/packages/kafka_protocol) library | ||
where they are used to create the [SSL connection](https://github.com/kafka4beam/kafka_protocol/blob/4.0.3/src/kpro_connection.erl#L305). | ||
|
||
For more info see the Erlang Ecosystem Foundation's [server certificate verification](https://erlef.github.io/security-wg/secure_coding_and_deployment_hardening/ssl.html#server-certificate-verification) recommendations. | ||
|
||
## Erlang | ||
```erlang | ||
[{brod, | ||
[{clients | ||
, [{kafka_client | ||
, [ { endpoints, [{"localhost", 9092}] } | ||
, { ssl, [ { verify, verify_peer } | ||
, { cacertfile, "/etc/ssl/certs/ca-certificates.crt" } | ||
, { depth, 3 } | ||
, { customize_hostname_check, | ||
[{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]} | ||
]} | ||
, { sasl, {plain, "GFRW5BSQHKEH0TSG", "GrL3CNTkLhsvtBr8srGn0VilMpgDb4lPD"}} | ||
] | ||
} | ||
] | ||
} | ||
] | ||
}] | ||
``` | ||
|
||
## Elixir | ||
```elixir | ||
import Config | ||
|
||
config :brod, | ||
clients: [ | ||
kafka_client: [ | ||
endpoints: [ | ||
localhost: 9092 | ||
], | ||
ssl: [ | ||
verify: :verify_peer, | ||
cacertfile: "/etc/ssl/certs/ca-certificates.crt", | ||
depth: 3, | ||
customize_hostname_check: [ | ||
match_fun: :public_key.pkix_verify_hostname_match_fun(:https) | ||
], | ||
], | ||
sasl: { | ||
:plain, | ||
System.get_env("KAFKA_USERNAME"), | ||
System.get_env("KAFKA_PASSWORD") | ||
} | ||
] | ||
] | ||
``` | ||
|
||
The examples above are using `/etc/ssl/certs/ca-certificates.crt` which is the certificate authority that comes | ||
with [alpine](https://hub.docker.com/_/alpine) linux. You will need to provide a path to a valid certificate authority | ||
certificate or use [certifi](https://hex.pm/packages/certifi) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
# Consumer Example | ||
|
||
Ensure `:brod` is added to your deps on `mix.exs` | ||
|
||
```elixir | ||
defp deps do | ||
[ | ||
{:brod, "~> 3.10.0"} | ||
] | ||
end | ||
``` | ||
|
||
## Consumer | ||
|
||
Either the `brod_group_subscriber_v2` or `brod_group_subscriber` behaviours can be used | ||
to consume messages. The key difference is that the v2 subscriber runs a worker for each | ||
partition in a separate Erlang process, allowing parallel message processing. | ||
|
||
Here is an example of callback module that implements the `brod_group_subscriber_v2` behaviour to consume messages. | ||
|
||
```elixir | ||
defmodule BrodSample.GroupSubscriberV2 do | ||
@behaviour :brod_group_subscriber_v2 | ||
|
||
def child_spec(_arg) do | ||
config = %{ | ||
client: :kafka_client, | ||
group_id: "consumer_group_name", | ||
topics: ["streaming.events"], | ||
cb_module: __MODULE__, | ||
consumer_config: [{:begin_offset, :earliest}], | ||
init_data: [], | ||
message_type: :message_set, | ||
group_config: [ | ||
offset_commit_policy: :commit_to_kafka_v2, | ||
offset_commit_interval_seconds: 5, | ||
rejoin_delay_seconds: 60, | ||
reconnect_cool_down_seconds: 60 | ||
] | ||
} | ||
|
||
%{ | ||
id: __MODULE__, | ||
start: {brod_group_subscriber_v2, :start_link, [config]}, | ||
type: :worker, | ||
restart: :temporary, | ||
shutdown: 5000 | ||
} | ||
end | ||
|
||
@impl :brod_group_subscriber_v2 | ||
def init(_group_id, _init_data), do: {:ok, []} | ||
|
||
@impl :brod_group_subscriber_v2 | ||
def handle_message(message, _state) do | ||
IO.inspect(message, label: "message") | ||
{:ok, :commit, []} | ||
end | ||
end | ||
``` | ||
|
||
The example module implements `child_spec/1` so that our consumer can be started by a Supervisor. The restart policy is set to `:temporary` | ||
because, in this case, if a message can not be processed, then there is no point in restarting. This might not always | ||
be the case. | ||
|
||
See `brod_group_subscriber_v2:start_link/1` for details on the configuration options. | ||
|
||
See docs for more details about the required or optional callbacks. |
Oops, something went wrong.