Merge pull request kafka4beam#17 from klarna/add-consumers-to-supervision-tree

refactor consumer
id committed Feb 17, 2016
2 parents 697bbe8 + 17ae127 commit ecaa7bc
Showing 33 changed files with 2,347 additions and 973 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -7,3 +7,4 @@ logs/
ebin/
*.beam
scripts/brod
/brod.plt
22 changes: 7 additions & 15 deletions .travis.yml
@@ -2,24 +2,15 @@ language: erlang

sudo: required

env:
BROD_CLI_PRODUCE_TEST_TOPIC: brod-cli-produce-test
BROD_CLIENT_SUITE_TOPIC: brod-client-SUITE-topic
BROD_PRODUCER_SUITE_TOPIC: brod_producer_SUITE

before_install:
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 36A1D7869245C8950F966E92D8576A8BA88D21E9
- sudo sh -c "echo deb https://get.docker.io/ubuntu docker main > /etc/apt/sources.list.d/docker.list"
- sudo apt-key adv --keyserver hkp://p80.pool.sks-keyservers.net:80 --recv-keys 58118E89F3A912897C070ADBF76221572C52609D
- sudo sh -c "echo deb https://apt.dockerproject.org/repo ubuntu-precise main > /etc/apt/sources.list.d/docker.list"
- sudo apt-get update
- sudo apt-get install lxc-docker
- sudo apt-get install docker-engine
- sudo docker info
- sudo docker pull spotify/kafka
- sudo docker run --name kafka -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST="localhost" --env ADVERTISED_PORT=9092 spotify/kafka
- sudo docker ps -a
- n=0; while [ "$(sudo docker exec kafka bash -c '/opt/kafka*/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do if [ $n -gt 4 ]; then echo timeout; exit 1; fi; n=$(( n + 1 )); sleep 1; done
- sudo docker exec kafka bash -c "/opt/kafka*/bin/kafka-topics.sh --zookeeper localhost --create --partitions 1 --replication-factor 1 --topic $BROD_CLI_PRODUCE_TEST_TOPIC"
- sudo docker exec kafka bash -c "/opt/kafka*/bin/kafka-topics.sh --zookeeper localhost --create --partitions 1 --replication-factor 1 --topic $BROD_CLIENT_SUITE_TOPIC"
- sudo docker exec kafka bash -c "/opt/kafka*/bin/kafka-topics.sh --zookeeper localhost --create --partitions 1 --replication-factor 1 --topic $BROD_PRODUCER_SUITE_TOPIC"
- curl -L https://github.com/docker/compose/releases/download/1.6.0/docker-compose-`uname -s`-`uname -m` > docker-compose
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin/

notifications:
email: false
@@ -30,6 +21,7 @@ otp_release:
- R16B03-1

script:
- scripts/setup-1-node-test-env.sh
- make deps
- make test; TEST_RESULT=$?; if [ ! $TEST_RESULT -eq 0 ]; then cat logs/ct_run.test@*/*.brod.logs/run.*/suite.log; echo $TEST_RESULT; exit 1; fi
- make xref
4 changes: 2 additions & 2 deletions Makefile
@@ -35,10 +35,10 @@ clean:
plt: $(DIALYZER_PLT)

$(DIALYZER_PLT):
dialyzer --build_plt --apps $(PLT_APPS) ebin --output_plt $(DIALYZER_PLT)
dialyzer --build_plt --apps $(PLT_APPS) ebin deps/supervisor3/ebin --output_plt $(DIALYZER_PLT)

dialyze: $(DIALYZER_PLT)
dialyzer -r ebin $(DIALYZER_OPTS)
dialyzer -r ebin deps/supervisor3/ebin $(DIALYZER_OPTS)

xref: compile
$(REBAR) xref
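The added deps/supervisor3/ebin entries suggest the new supervision tree builds on supervisor3, presumably Klarna's extended supervisor fork, fetched alongside the other rebar deps. A quick way to rerun the checks after fetching, using only targets that already exist in this Makefile and the CI script:

    make deps
    make plt
    make dialyze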
23 changes: 23 additions & 0 deletions docker/Dockerfile
@@ -0,0 +1,23 @@
FROM java:openjdk-8-jre

ENV SCALA_VERSION 2.11
ENV KAFKA_VERSION 0.9.0.0

RUN apt-get update && \
apt-get install -y zookeeper wget supervisor dnsutils && \
rm -rf /var/lib/apt/lists/* && \
apt-get clean && \
wget -q http://apache.mirrors.spacedump.net/kafka/"$KAFKA_VERSION"/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && \
tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && \
rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && \
ln -s /opt/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION" /opt/kafka && \
mkdir -p /etc/kafka

COPY docker-entrypoint.sh /docker-entrypoint.sh
COPY server.properties /etc/kafka/server.properties

ENTRYPOINT ["/docker-entrypoint.sh"]

## run kafka by default, 'run zookeeper' to start zookeeper instead
CMD ["run", "kafka"]

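For illustration, the image can also be built and run without compose. This is a sketch: the docker_kafka tag and the --link wiring are assumptions (compose only produces an image named docker_kafka when run from a directory named docker):

    docker build -t docker_kafka .
    docker run -d --name zookeeper -p 2181:2181 docker_kafka run zookeeper
    docker run -d --name kafka --link zookeeper -p 9092:9092 docker_kafka

The last command relies on the CMD above, so the entrypoint receives "run kafka" by default.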
16 changes: 16 additions & 0 deletions docker/README.md
@@ -0,0 +1,16 @@
# docker-compose kafka cluster

## quick ref:

### build/rebuild docker images:

docker-compose -f docker-compose-basic.yml build

### start one-node cluster:

docker-compose -f docker-compose-kafka-1.yml up -d

### start two-node cluster:

docker-compose -f docker-compose-kafka-2.yml up -d

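A possible sanity check once a cluster is up; a sketch assuming the kafka_1 container name from the compose files below and the /opt/kafka symlink created in the Dockerfile:

    docker exec kafka_1 /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list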
13 changes: 13 additions & 0 deletions docker/docker-compose-basic.yml
@@ -0,0 +1,13 @@
version: '2'

services:
  kafka:
    build:
      context: .
  zookeeper:
    image: docker_kafka
    ports:
      - "2181:2181"
    container_name: zookeeper
    command: run zookeeper

13 changes: 13 additions & 0 deletions docker/docker-compose-kafka-1.yml
@@ -0,0 +1,13 @@
version: '2'

services:
  zookeeper:
    extends:
      file: docker-compose-basic.yml
      service: zookeeper
  kafka_1:
    image: docker_kafka
    container_name: kafka_1
    ports:
      - "9092:9092"

21 changes: 21 additions & 0 deletions docker/docker-compose-kafka-2.yml
@@ -0,0 +1,21 @@
version: '2'

services:
  zookeeper:
    extends:
      file: docker-compose-basic.yml
      service: zookeeper
  kafka_1:
    image: docker_kafka
    container_name: kafka_1
    ports:
      - "9092:9092"
  kafka_2:
    image: docker_kafka
    container_name: kafka_2
    ports:
      - "9093:9093"
    environment:
      BROKER_ID: 1
      KAFKA_PORT: 9093

40 changes: 40 additions & 0 deletions docker/docker-entrypoint.sh
@@ -0,0 +1,40 @@
#!/bin/bash -xe

## run something other than zookeeper and kafka
if [ "$1" != "run" ]; then
  exec "$@"
fi

## run zookeeper
if [ "$2" = "zookeeper" ]; then
  /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
  exit $?
fi

if [ "$2" != "kafka" ]; then
  echo "unknown target to run: $2"
  exit 1
fi

## run kafka

prop_file="/etc/kafka/server.properties"

if [ ! -z "$BROKER_ID" ]; then
  echo "broker id: $BROKER_ID"
  sed -r -i "s/^(broker.id)=(.*)/\1=$BROKER_ID/g" $prop_file
fi

if [ ! -z "$KAFKA_PORT" ]; then
  echo "port: $KAFKA_PORT"
  sed -r -i "s/^(port)=(.*)/\1=$KAFKA_PORT/g" $prop_file
  sed -r -i "s/^(listeners)=(.*)/\1=PLAINTEXT:\/\/:$KAFKA_PORT/g" $prop_file
fi

ipwithnetmask="$(ip -f inet addr show dev eth0 | awk '/inet / { print $2 }')"
ipaddress="${ipwithnetmask%/*}"

sed -r -i "s/^(advertised.host.name)=(.*)/\1=$ipaddress/g" $prop_file

/opt/kafka/bin/kafka-server-start.sh $prop_file

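To illustrate the substitutions above: starting the image with the kafka_2 environment from docker-compose-kafka-2.yml (BROKER_ID=1, KAFKA_PORT=9093) would leave /etc/kafka/server.properties with roughly the following, while advertised.host.name becomes whatever address eth0 reports:

    broker.id=1
    port=9093
    listeners=PLAINTEXT://:9093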
124 changes: 124 additions & 0 deletions docker/server.properties
@@ -0,0 +1,124 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

listeners=PLAINTEXT://:9092

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=localhost

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

auto.create.topics.enable=false

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeeper:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
50 changes: 30 additions & 20 deletions include/brod.hrl
@@ -1,5 +1,5 @@
%%%
%%% Copyright (c) 2014, 2015, Klarna AB
%%% Copyright (c) 2014-2016, Klarna AB
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
@@ -17,25 +17,35 @@
-ifndef(__BROD_HRL).
-define(__BROD_HRL, true).

-record(message, { offset :: integer()
, crc :: integer()
, magic_byte :: integer()
, attributes :: integer()
, key :: binary()
, value :: binary()
}).

%% delivered to subscriber by brod_consumer
-record(message_set, { topic :: binary()
, partition :: integer()
, high_wm_offset :: integer()
, messages :: [#message{}]
}).

-type callback_fun() :: fun((#message_set{}) -> any()) |
fun((Offset :: integer(), Key :: binary(), Value :: binary()) -> any()).

-type client_id() :: atom().
-type kafka_topic() :: binary().
-type kafka_partition() :: non_neg_integer().
-type kafka_offset() :: integer().
-type kafka_error_code() :: atom() | integer().

-record(kafka_message,
{ offset :: kafka_offset()
, crc :: integer()
, magic_byte :: integer()
, attributes :: integer()
, key :: binary()
, value :: binary()
}).

-record(kafka_message_set,
{ topic :: topic()
, partition :: partition()
, high_wm_offset :: integer() %% max offset of the partition
, messages :: [#kafka_message{}]
}).

-record(kafka_fetch_error,
{ topic :: topic()
, partition :: partition()
, error_code :: error_code()
, error_desc :: string()
}).

-type brod_client_id() :: atom().

-define(BROD_DEFAULT_CLIENT_ID, brod_default_client).

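To show how the new records fit together, here is a minimal subscriber sketch in Erlang. It assumes the refactored brod_consumer delivers #kafka_message_set{} records (and #kafka_fetch_error{} on fetch failures) as plain Erlang messages to the subscribing process; the exact message envelope at this commit may differ:

    -module(subscriber_sketch).
    -export([loop/0]).
    -include("brod.hrl").

    %% Print each message in a delivered set, then keep waiting.
    loop() ->
      receive
        #kafka_message_set{topic = T, partition = P, messages = Msgs} ->
          [io:format("~s/~p offset ~p: ~s~n", [T, P, O, V])
           || #kafka_message{offset = O, value = V} <- Msgs],
          loop();
        #kafka_fetch_error{topic = T, partition = P, error_code = EC} ->
          io:format("fetch error on ~s/~p: ~p~n", [T, P, EC]),
          loop()
      end.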
2 changes: 1 addition & 1 deletion scripts/cli-produce-test.sh
@@ -4,5 +4,5 @@ THIS_DIR="$(dirname $(readlink -f $0))"

BROD=$THIS_DIR/brod

$BROD produce localhost $BROD_CLI_PRODUCE_TEST_TOPIC 0 $(date +%s):$(date +%y-%m-%d-%H-%M-%S)
$BROD produce localhost brod-cli-produce-test 0 $(date +%s):$(date +%y-%m-%d-%H-%M-%S)

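For reference, the last argument expands to a timestamp pair such as 1455724800:16-02-17-14-00-00; whether the brod escript treats it as a key:value pair or as one opaque message is determined by the CLI, which is not shown in this diff.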