Provides a Kafka consumption pool component to work with Stuart Sierra's component.
The consumer component manually commits offsets after each batch of messages
has been processed, instead of relying on `auto.commit.enable`. This library takes
special care to make sure that messages have actually been processed before committing,
a guarantee that the default `auto.commit.enable` behavior does not provide as strongly.
To avoid a thread committing another thread's offsets, each thread receives its own Kafka consumer.
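The commit-after-processing idea can be illustrated without Kafka at all. The following is a minimal sketch using only `clojure.core`, not this library's implementation: `consume-batch!`, `committed`, and `seen` are hypothetical names, and the atom merely stands in for a consumer's committed offset.

```clojure
;; Hypothetical illustration: an atom stands in for the committed offset.
(defn consume-batch!
  "Processes every record in batch, then advances committed-offset to the
  offset after the last record. If process! throws, the offset is left
  untouched, so the same batch would be read again after a restart."
  [committed-offset process! batch]
  (when (seq batch)
    (doseq [record batch]
      (process! record))
    (reset! committed-offset (inc (:offset (last batch))))))

(def committed (atom 0))
(def seen (atom []))

;; Successful batch: both records are processed, then the offset moves to 2.
(consume-batch! committed
                (fn [record] (swap! seen conj (:value record)))
                [{:offset 0 :value "a"} {:offset 1 :value "b"}])

;; Failing batch: the processor throws, so nothing is committed.
(try
  (consume-batch! committed
                  (fn [record] (throw (ex-info "processing failed" {:record record})))
                  [{:offset 2 :value "c"}])
  (catch clojure.lang.ExceptionInfo _ nil))

@committed ;; => 2
```

Because the commit happens only after the whole batch succeeds, a failure leads to redelivery rather than message loss, which is the trade-off the paragraph above describes.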
Add to your dependencies in your `project.clj`:
Use `KafkaWriter` and `KafkaReader` in a system:
(ns myapp
  (:require [kafka-component.core :as kafka-component]
            [com.stuartsierra.component :as component]))

(def config
  {:writer-config {:native-producer-overrides {"bootstrap.servers" "localhost:9092"}}
   :reader-config {:shutdown-timeout 4 ; default
                   :concurrency-level 2
                   :topics ["topic_one" "or_more"]
                   :native-consumer-overrides {"bootstrap.servers" "localhost:9092"
                                               "group.id" "myapp"
                                               "auto.offset.reset" "largest"}}})

(defn create-system [config]
  (component/system-using
    (component/system-map
      :logger println
      :exception-handler (fn [e] (.printStackTrace e))
      ;; :process is called with each record read from the subscribed topics
      :record-processor {:process (fn [{:keys [key value]}] (println "Received message:" value))}
      :reader (kafka-component/map->KafkaReader (config :reader-config))
      :writer (kafka-component/map->KafkaWriter (config :writer-config)))
    ;; the reader depends on the logger, exception handler and record processor
    {:reader [:logger :exception-handler :record-processor]}))

(let [{:keys [writer]} (component/start (create-system config))]
  (kafka-component/write writer "topic_one" "message-key" "message-body"))
It is also possible to write asynchronously with `kafka-component/write-async`.
Because reader is listening to "topic_one", it will deliver a message to the record-processor:
Received message: message-body
You can use specially designed mock implementations for tests. The mocks emulate Kafka in memory, without the large startup overhead.
(ns myapp.tests
  (:require [kafka-component.mock :as kafka-mock]
            [clojure.test :refer :all]))

(deftest basic-produce-consume
  (kafka-mock/with-test-producer-consumer producer consumer
    ;; tell mock producer to send a message on a kafka queue
    (kafka-mock/send producer "topic" "key" "value")
    ;; tell mock consumer to subscribe to topic
    (is (= [{:value "value" :key "key" :partition 0 :topic "topic" :offset 0}]
           (kafka-mock/accumulate-messages consumer "topic" {:timeout 1000})))))

(deftest transforming-messages
  (kafka-mock/with-test-producer-consumer producer consumer
    (dotimes [n 100]
      (kafka-mock/send producer "topic" "key" (str n)))
    ;; transform and filter messages
    (is (= [1 3]
           (kafka-mock/txfm-messages consumer "topic"
                                     (comp (map :value)
                                           (map #(Integer/parseInt %))
                                           (filter odd?)
                                           (take 2))
                                     {:timeout 1000})))))
It is also possible to send asynchronously with `kafka-mock/send-async`.
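The transducer passed to `txfm-messages` in the `transforming-messages` test is plain `clojure.core`, so it can be tried on its own. The sketch below applies the same transducer to a vector of hand-built message maps; `messages`, `odd-values-xf`, and `result` are hypothetical names, and the maps only imitate the shape shown in the `basic-produce-consume` test.

```clojure
;; Hypothetical stand-ins for 100 consumed messages with values "0" .. "99".
(def messages
  (mapv (fn [n] {:topic "topic" :key "key" :value (str n) :partition 0 :offset n})
        (range 100)))

;; Same transducer as in the test: extract the value, parse it,
;; keep odd numbers, and stop after the first two.
(def odd-values-xf
  (comp (map :value)
        (map #(Integer/parseInt %))
        (filter odd?)
        (take 2)))

(def result (into [] odd-values-xf messages))

result ;; => [1 3]
```

Because `take` ends the transduction early, only the first four messages are examined here.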
The producer and consumer created by `with-test-producer-consumer` run outside of
a system. To use the mocks inside a system, modify the system config:
(def test-config
  (-> myapp/config
      (assoc-in [:writer-config :native-producer-type] :mock)
      (assoc-in [:reader-config :native-consumer-type] :mock)
      (assoc-in [:reader-config :native-consumer-overrides "auto.offset.reset"] "earliest")))
Usually, you will want to do both: run your system with the mocks, and use a standalone mock producer or consumer alongside it.
- If your system has a `reader`, create messages outside of the system with a producer, then test that the system reads and processes the messages correctly.
- If your system has a `writer`, poke your system to produce a message, then test that an external consumer can read the message.
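To see what the `assoc-in` threading in `test-config` does to the configuration map, here is a small self-contained sketch. It uses a trimmed local copy of the config rather than `myapp/config`, and it only inspects the resulting map; how the library interprets the `:mock` values is not shown.

```clojure
;; Trimmed local copy of the example config (an assumption for illustration).
(def config
  {:writer-config {:native-producer-overrides {"bootstrap.servers" "localhost:9092"}}
   :reader-config {:topics ["topic_one"]
                   :native-consumer-overrides {"group.id" "myapp"
                                               "auto.offset.reset" "largest"}}})

;; assoc-in adds keys that are missing and replaces values that exist;
;; string keys such as "auto.offset.reset" work as path segments too.
(def test-config
  (-> config
      (assoc-in [:writer-config :native-producer-type] :mock)
      (assoc-in [:reader-config :native-consumer-type] :mock)
      (assoc-in [:reader-config :native-consumer-overrides "auto.offset.reset"] "earliest")))

(get-in test-config [:writer-config :native-producer-type])
;; => :mock
(get-in test-config [:reader-config :native-consumer-overrides "auto.offset.reset"])
;; => "earliest"
(get-in test-config [:reader-config :native-consumer-overrides "group.id"])
;; => "myapp"
```

The original `config` is left unchanged, so the production and test configurations can live side by side.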
See the documentation for more.