
Commit

chore: prettify md/yaml files
Resolved and found through these commands:
- `prettier -w .`
- `markdownlint . --disable MD013 MD026 MD025 MD024`
kianmeng committed Sep 11, 2022
1 parent 4899735 commit d412d8d
Showing 11 changed files with 285 additions and 262 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/build.yml
@@ -2,13 +2,13 @@ name: brod
on:
push:
branches:
- '*'
- "*"
pull_request:
branches:
- master
env:
OTP_VERSION: '24.1'
REBAR_VERSION: '3.17.0'
OTP_VERSION: "24.1"
REBAR_VERSION: "3.17.0"

jobs:
lint:
@@ -44,8 +44,8 @@ jobs:
strategy:
fail-fast: false
matrix:
otp: [ '24.1', '23.3.4.7', '22.3.4.21' ]
kafka: [ '2.4', '1.1', '0.11' ]
otp: ["24.1", "23.3.4.7", "22.3.4.21"]
kafka: ["2.4", "1.1", "0.11"]
steps:
- name: Checkout
uses: actions/checkout@v2
389 changes: 196 additions & 193 deletions changelog.md → CHANGELOG.md

Large diffs are not rendered by default.

71 changes: 39 additions & 32 deletions README.md
@@ -1,37 +1,38 @@
![brod](https://github.com/kafka4beam/brod/workflows/brod/badge.svg?branch=master)
# NOTICE

This product includes software developed by
[Klarna Bank AB (publ)](https://www.klarna.com)

# Brod - Apache Kafka Client for Erlang/Elixir

![brod](https://github.com/kafka4beam/brod/workflows/brod/badge.svg?branch=master)

Brod is an Erlang implementation of the Apache Kafka protocol, providing support for both producers and consumers.

Why "brod"? [http://en.wikipedia.org/wiki/Max_Brod](http://en.wikipedia.org/wiki/Max_Brod)

# Features

* Supports Apache Kafka v0.8+
* Robust producer implementation supporting in-flight requests and asynchronous acknowledgements
* Both consumer and producer handle leader re-election and other cluster disturbances internally
* Opens max 1 tcp connection to a broker per `brod_client`, one can create more clients if needed
* Producer: will start to batch automatically when number of unacknowledged (in flight) requests exceeds configurable maximum
* Producer: will try to re-send buffered messages on common errors like "Not a leader for partition", errors are resolved automatically by refreshing metadata
* Simple consumer: The poller, has a configurable "prefetch count" - it will continue sending fetch requests as long as total number of unprocessed messages (not message-sets) is less than "prefetch count"
* Group subscriber: Support for consumer groups with options to have Kafka as offset storage or a custom one
* Topic subscriber: Subscribe on messages from all or selected topic partitions without using consumer groups
* Pick latest supported version when sending requests to kafka.
* Direct APIs for message send/fetch and cluster inspection/management without having to start clients/producers/consumers.
* A escriptized command-line tool for message send/fetch and cluster inspection/management.
* Configurable compression library. `snappy` compression is supported by default.
- Supports Apache Kafka v0.8+
- Robust producer implementation supporting in-flight requests and asynchronous acknowledgements
- Both consumer and producer handle leader re-election and other cluster disturbances internally
- Opens max 1 tcp connection to a broker per `brod_client`, one can create more clients if needed
- Producer: will start to batch automatically when number of unacknowledged (in flight) requests exceeds configurable maximum
- Producer: will try to re-send buffered messages on common errors like "Not a leader for partition", errors are resolved automatically by refreshing metadata
- Simple consumer: the poller has a configurable "prefetch count" - it will continue sending fetch requests as long as the total number of unprocessed messages (not message-sets) is less than the "prefetch count"
- Group subscriber: Support for consumer groups with options to have Kafka as offset storage or a custom one
- Topic subscriber: Subscribe on messages from all or selected topic partitions without using consumer groups
- Pick latest supported version when sending requests to kafka.
- Direct APIs for message send/fetch and cluster inspection/management without having to start clients/producers/consumers.
- An escriptized command-line tool for message send/fetch and cluster inspection/management.
- Configurable compression library. `snappy` compression is supported by default.
For more compression options, see [kafka_protocol/README](https://github.com/kafka4beam/kafka_protocol/blob/master/README.md#compression-support)
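
The produce path behind the feature list above can be sketched in a few lines of Erlang. This is a minimal sketch, not part of this change; the endpoint, client name `my_client`, and topic `<<"my-topic">>` are assumptions for illustration:

```erlang
%% Minimal sketch of the basic produce flow, assuming a broker at
%% localhost:9092 and an already-created topic <<"my-topic">>.
{ok, _} = application:ensure_all_started(brod),
ok = brod:start_client([{"localhost", 9092}], my_client),
ok = brod:start_producer(my_client, <<"my-topic">>, _ProducerConfig = []),
ok = brod:produce_sync(my_client, <<"my-topic">>, _Partition = 0,
                       <<"key">>, <<"value">>).
```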

# Building and testing

NOTE: Min Erlang/OTP version 22

```
```sh
make compile
make test-env t # requires docker-compose in place
```
@@ -44,7 +45,7 @@ sending such request to older version brokers will cause connection failure.

e.g. in sys.config:

```
```erlang
[{brod,
[ { clients
, [ { brod_client_1 %% registered name
@@ -101,7 +102,7 @@ Produced to partition 0 at base-offset 406

Brod supervision (and process link) tree.

![](https://cloud.githubusercontent.com/assets/164324/19621338/0b53ccbe-9890-11e6-9142-432a3a87bcc7.jpg)
![brod supervision architecture](https://cloud.githubusercontent.com/assets/164324/19621338/0b53ccbe-9890-11e6-9142-432a3a87bcc7.jpg)

# Clients

@@ -254,6 +255,7 @@ the caller should expect a message of below pattern for each produce call.
, result = brod_produce_req_acked
}
```

Add `-include_lib("brod/include/brod.hrl").` to use the record.
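
As a hedged sketch (the `Client`, `Topic`, and `Partition` variables, the payload, and the 5-second timeout are placeholders), a plain process could await that acknowledgement like so:

```erlang
%% Fire an async produce request, then wait for the acknowledgement
%% record described above. The timeout value is arbitrary.
{ok, CallRef} = brod:produce(Client, Topic, Partition, <<"key">>, <<"value">>),
receive
  #brod_produce_reply{ call_ref = CallRef
                     , result   = brod_produce_req_acked
                     } ->
    ok
after 5000 ->
  {error, produce_ack_timeout}
end.
```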

In case the `brod:produce` caller is a process like `gen_server` which
@@ -298,7 +300,8 @@ Below diagrams illustrate 3 examples of how subscriber processes may work
with `brod_consumer`.

## Partition subscriber
![](https://cloud.githubusercontent.com/assets/164324/19621677/5e469350-9897-11e6-8c8e-8a6a4f723f73.jpg)

![partition subscriber architecture](https://cloud.githubusercontent.com/assets/164324/19621677/5e469350-9897-11e6-8c8e-8a6a4f723f73.jpg)

This gives the best flexibility as the per-partition subscribers work
directly with per-partition pollers.
@@ -308,7 +311,8 @@ not individual messages, (however the subscribers are allowed to
ack individual offsets).
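
A rough sketch of that per-partition flow, assuming the client and topic from earlier examples (consumer options here are defaults, and the single-receive shape is for illustration only):

```erlang
%% Hedged sketch of a partition subscriber: subscribe the calling
%% process to one partition and ack a whole message-set at once.
ok = brod:start_consumer(my_client, <<"my-topic">>, _ConsumerConfig = []),
{ok, ConsumerPid} = brod:subscribe(my_client, self(), <<"my-topic">>, 0, []),
receive
  {ConsumerPid, #kafka_message_set{messages = Messages}} ->
    #kafka_message{offset = LastOffset} = lists:last(Messages),
    ok = brod:consume_ack(ConsumerPid, LastOffset)
end.
```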

## Topic subscriber (`brod_topic_subscriber`)
![](https://cloud.githubusercontent.com/assets/164324/19621951/41e1d75e-989e-11e6-9bc2-49fe814d3020.jpg)

![topic subscribe flow](https://cloud.githubusercontent.com/assets/164324/19621951/41e1d75e-989e-11e6-9bc2-49fe814d3020.jpg)

A topic subscriber provides the easiest way to receive and process messages from
ALL partitions of a given topic. See
@@ -320,7 +324,8 @@ in a module, or simply provide an anonymous callback function to have the
individual messages processed.
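
A callback-module sketch for the behaviour form (module name, topic handling, and the empty committed-offsets list are assumptions; consult the linked docs for the exact callback signatures):

```erlang
-module(my_topic_subscriber).
-behaviour(brod_topic_subscriber).
-include_lib("brod/include/brod.hrl").
-export([init/2, handle_message/3]).

%% No committed offsets to resume from in this sketch.
init(_Topic, _CbInitArg) ->
  {ok, _CommittedOffsets = [], _CbState = #{}}.

%% Ack each message so the consumer keeps advancing.
handle_message(_Partition, #kafka_message{key = K, value = V}, State) ->
  io:format("~s: ~s~n", [K, V]),
  {ok, ack, State}.
```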

## Group subscriber (`brod_group_subscriber`)
![](https://cloud.githubusercontent.com/assets/164324/19621956/59d76a9a-989e-11e6-9633-a0bc677e06f3.jpg)

![group subscriber flow](https://cloud.githubusercontent.com/assets/164324/19621956/59d76a9a-989e-11e6-9633-a0bc677e06f3.jpg)

Similar to topic subscriber, the `brod_group_subscriber` behaviour callbacks are
to be implemented to process individual messages. See
@@ -398,12 +403,12 @@ auth(Host :: string(), Sock :: gen_tcp:socket() | ssl:sslsocket(),
```

If authentication is successful, the callback function should return the atom `ok`; otherwise, an error tuple with a reason description.
For example, you can use `brod_gssapi` plugin (https://github.com/kafka4beam/brod_gssapi) for SASL GSSAPI authentication.
For example, you can use [`brod_gssapi` plugin](https://github.com/kafka4beam/brod_gssapi) for SASL GSSAPI authentication.
To use it, add it as a dependency to the top-level project that uses brod.
Then add `{sasl, {callback, brod_gssapi, {gssapi, Keytab, Principal}}}` to client config.
Keytab should be the keytab file path, and Principal should be a byte-list or binary string.
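
For instance, a client config carrying that tuple might look like the sketch below; the broker endpoint, keytab path, and principal are placeholders:

```erlang
%% Hedged sketch: starting a client with SASL GSSAPI auth delegated
%% to the brod_gssapi plugin. All concrete values are placeholders.
ClientConfig =
  [ {sasl, {callback, brod_gssapi,
            {gssapi, "/etc/security/kafka.keytab",
             <<"kafka-client@EXAMPLE.COM">>}}}
  ],
ok = brod:start_client([{"broker1.example.com", 9092}], my_client,
                       ClientConfig).
```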

See also: https://github.com/klarna/brod/wiki/SASL-gssapi-(kerberos)-authentication
See also: <https://github.com/klarna/brod/wiki/SASL-gssapi-(kerberos)-authentication>

# Other API to play with/inspect kafka

@@ -449,20 +454,21 @@ or (for fetch command) when the partition leader migrates to another broker in t
## brod-cli examples (with `alias brod=_build/brod_cli/rel/brod/bin/brod`):

### Fetch and print metadata
```

```sh
brod meta -b localhost
```

### Produce a Message

```
```sh
brod send -b localhost -t test-topic -p 0 -k "key" -v "value"

```

### Fetch a Message

```
```sh
brod fetch -b localhost -t test-topic -p 0 --fmt 'io:format("offset=~p, ts=~p, key=~s, value=~s\n", [Offset, Ts, Key, Value])'
```

@@ -477,19 +483,20 @@ Bound variables to be used in `--fmt` expression:
### Stream Messages to Kafka

Send `README.md` to kafka one line per kafka message
```

```sh
brod pipe -b localhost:9092 -t test-topic -p 0 -s @./README.md
```

### Resolve Offset

```
```sh
brod offset -b localhost:9092 -t test-topic -p 0
```

### List or Describe Groups

```
```sh
# List all groups
brod groups -b localhost:9092

@@ -499,7 +506,7 @@ brod groups -b localhost:9092 --ids group-1,group-2

### Display Committed Offsets

```
```sh
# all topics
brod commits -b localhost:9092 --id the-group-id --describe

@@ -511,7 +518,7 @@ brod commits -b localhost:9092 --id the-group-id --describe --topic topic-name

NOTE: This feature is designed for force overwriting commits, not for regular use of offset commit.

```
```sh
# Commit 'latest' offsets of all partitions with 2 days retention
brod commits -b localhost:9092 --id the-group-id --topic topic-name --offsets latest --retention 2d

@@ -527,5 +534,5 @@ brod commits -b localhost:9092 -i the-group-id -t topic-name -o "0:10000" --prot

## TODOs

* Support scram-sasl in brod-cli
* Transactional produce APIs
- Support scram-sasl in brod-cli
- Transactional produce APIs
24 changes: 18 additions & 6 deletions contrib/examples/elixir/README.md
@@ -3,18 +3,21 @@
This is an example of how to use `brod` with Elixir.

## Kafka

For this, we assume you have Kafka up and running at `localhost:9092`

You can use this docker-compose: `https://github.com/obsidiandynamics/kafdrop/blob/master/docker-compose/kafka-kafdrop/docker-compose.yaml` to have Kafdrop running and be able to create topics through a UI on `localhost:9000`

To follow this you have to create a topic called `streaming.events` with more than 1 partition.

## Dependency

The first thing you'll need is to add brod to your dependencies.
To find the latest version published on hex, run: `mix hex.search brod`

As of writing this, the output was:
```

```sh
➜ brod_sample git:(master) ✗ mix hex.search brod
Package Description Version URL
brod Apache Kafka Erlang client library 3.10.0 https://hex.pm/packages/brod
@@ -63,7 +66,7 @@ Now with the `kafka_client` in place we can look at how to publish and consumer 

## Publisher

To send a message with brod we can use the `produce_sync` function, you can take a better look at the docs and see this and other possibilities at: https://hexdocs.pm/brod/brod.html#produce_sync-5
To send a message with brod we can use the `produce_sync` function, you can take a better look at the docs and see this and other possibilities at: <https://hexdocs.pm/brod/brod.html#produce_sync-5>

Now, let's make a module that allows us to publish to Kafka

@@ -100,9 +103,9 @@ config :brod,

```


Now let's run and give it a try
```

```sh
➜ brod_sample git:(master) ✗ iex -S mix
Erlang/OTP 22 [erts-10.7.1] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [hipe]

@@ -154,8 +157,10 @@ defmodule BrodSample.Publisher do
end
end
```
Let's take it for a spin
```
```elixir
iex(2)> recompile
Compiling 1 file (.ex)
:ok
@@ -172,7 +177,7 @@ Now we need to get those messages and do something with
First, we need to define a group of subscribers to our topic. brod provides an implementation called `group_subscriber_v2`, which creates a worker for each partition of our topic. This not only allows better throughput, but also means that if one partition runs into problems, only that worker is affected.
Let's take a look at the docs of the `group_subscriber_v2` at https://hexdocs.pm/brod/brod_group_subscriber_v2.html
Let's take a look at the docs of the `group_subscriber_v2` at <https://hexdocs.pm/brod/brod_group_subscriber_v2.html>
The first thing we can see is that it has some required functions and some optional.
Required callback functions: `init/2`, `handle_message/2`.
@@ -229,26 +234,33 @@ This can be done the following way.
{:ok, pid} = :brod.start_link_group_subscriber_v2(config)
```
There's a lot of information in here, so let's take a look at the most important pieces.
### client
Expects the identifier of your kafka client, remember that we configure `:kafka_client` on the `dev.exs`, here we are just referencing that client that we already configured.
### group_id
This is the name of the consumer_group that will be used
### cb_module
The module where you defined the `init/2` and `handle_message/2`
### group_config
The configurations to use for the group coordinator
### consumer_config
Configuration for the partition consumer. Here we only set `begin_offset` to `:earliest`, which means our consumer starts from the earliest message available on the topic; you can also use `:latest` to start with the latest available message (meaning your consumer group will only get messages produced after it comes online)
After all of that, we call `{:ok, pid} = :brod.start_link_group_subscriber_v2(config)` and that's it: brod will start a worker for each partition our topic has and begin consuming messages.
You should now see on your console all the messages you've sent earlier
## Warning
If you are not running your application in cluster mode you may run into some issues, as the `group_subscriber` processes on multiple nodes may force each other to re-join the group; if you wish to simulate this you can start
2 changes: 1 addition & 1 deletion contrib/examples/elixir/docker-compose.yaml
@@ -24,4 +24,4 @@ services:
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
