Clojure wrapper for Kafka Streams
Declare kastr in your project.clj:
Use kastr in your clojure code:
(require '[kastr.core :as kastr])
:application-id - Name of Kafka Streams application
:bootstrap-servers - Comma separated list of Kafka brokers
:zookeeper-connect - Comma separated list of Zookeeper servers
:num-stream-threads (Default: 1) - Number of threads the stream instance will run
:key-serde-class (Default: String) - Default key serializer/deserializer
:value-serde-class (Default: String) - Default value serializer/deserializer
:auto-offset-reset (Default: :earliest) - Kafka topic offset the stream instance will read from
Example:
{:kafka-streams {:application-id "threat-detection-1"
:bootstrap-servers "localhost:9092"
:zookeeper-connect "localhost:2181"
:num-stream-threads 8}
:job {:input-topic "input-messages"
:output-topic "output-messages"}}
I would recommend using Component for managing the Kafka Streams runtime.
Example:
(defrecord KafkaStreams
[configuration kafka-streams-topology kafka-streams]
component/Lifecycle
(start [component]
(info "Starting Kafka Streams")
(let [configuration (kafka-streams-configuration configuration)
kafka-streams (init configuration kafka-streams-topology)]
(clean kafka-streams)
(start kafka-streams)
(assoc component :kafka-streams kafka-streams)))
(stop [component]
(try
(finally
(info "Shutting down Kafka Streams")
(if kafka-streams
(stop kafka-streams))))
(assoc component :kafka-streams nil)))
The kafka-streams-topology
is where you carry out your logic.
Basic Example:
Take messages from the input topic and send them to the output topic.
(defn mapv-function
[v]
(+ 1 v))
(defn basic-kafka-streams-topology
[configuration stream-builder]
(let [{:keys [job]} configuration
{:keys [input-topic output-topic]} job
message-stream (stream stream-builder [input-topic])]
(-> (map-v message-stream mapv-function)
(ks/to output-topic))))
I'm currently working on this so for now here is the official Developer Guide.
Run tests
$ lein test
Want to become a Kastr contributor?
Then checkout our code of conduct and contributing guidelines.
Copyright (c) 2017 Conor Hughes - Released under the MIT license.