This application publishes messages to Kafka and registers their schema in the Schema Registry.
This application follows the 12-Factor App methodology.
Note: This is just an alternative to Kafka Connect and should be used only for bespoke scenarios that cannot be accomplished with a Connect cluster.
Stream tweets from Twitter for specific hashtags and publish them to Kafka.
You need to have the following before you can start using this application:
- JDK 1.8+
- Zookeeper
- Kafka
- Schema Registry
- Kafka Commons Maven dependency
- Twitter integration tokens
Note: Once you have the Twitter integration tokens, add them to the `kafka-avro-producer/src/main/resources/twitter4j.properties` file in the repo.
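That file typically holds the four OAuth credentials under twitter4j's standard property names (the values below are placeholders to be replaced with your own tokens):

```properties
# Obtained from the Twitter developer portal
oauth.consumerKey=YOUR_CONSUMER_KEY
oauth.consumerSecret=YOUR_CONSUMER_SECRET
oauth.accessToken=YOUR_ACCESS_TOKEN
oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
```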
Clone the repo and `cd` into its root directory.
Run the following command to build the application:

```shell
mvn clean install
```
Run the application using:

```shell
java -jar ./target/*.jar
```
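At its core the application wires a twitter4j stream listener to a Kafka producer. The following is a rough sketch of that flow, assuming twitter4j and kafka-clients (with Confluent's `KafkaAvroSerializer`) are on the classpath; the actual class names in this repo may differ:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public class TweetStreamer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaAvroSerializer registers the value schema with the Schema Registry on first send
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        KafkaProducer<String, Object> producer = new KafkaProducer<>(props);

        // Credentials are picked up from twitter4j.properties on the classpath
        TwitterStream stream = new TwitterStreamFactory().getInstance();
        stream.addListener(new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                // For brevity this sends the raw tweet text; the real app builds
                // an Avro record matching the registered Tweet schema
                producer.send(new ProducerRecord<>("Tweets",
                        String.valueOf(status.getId()), status.getText()));
            }
        });
        stream.filter(new FilterQuery().track("sport", "politics", "health"));
    }
}
```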
The application's configuration can be externalized by using a dedicated config server or a properties file. The following customization options are available:
```yaml
# Kafka settings
kafka:
  topic: Tweets
  bootstrap:
    servers: localhost:9092

# Schema settings
schema:
  registry:
    url: http://localhost:8081
  version: tweet:v1

# Twitter settings
twitter:
  hashtags: sport,politics,health

# Server settings
server:
  port: 10001

# Application information
info:
  app:
    name: Kafka Producer Application
    description: Read messages from Twitter and push them to Kafka (alternative to Kafka Connect)
    version: 1.0.0

# Actuator settings
management:
  endpoints:
    web:
      exposure:
        include: info,health,metrics,beans
```
Uses Spring's Actuator project, which can be used to track the application's health, its metrics, and the beans it has registered.
Makes use of SLF4J for logging. All logs are written to the console by default, but this can be configured otherwise. The application is preferably run in a container, where Fluentd can ship the logs to a Fluentd agent to be searched in Splunk or Elasticsearch.
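If you need to change the console-only default, and assuming Spring Boot's standard Logback backend behind SLF4J, a `logback-spring.xml` on the classpath can redirect output; the snippet below is a standard Logback console appender shown purely as an illustration:

```xml
<configuration>
  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="CONSOLE" />
  </root>
</configuration>
```

Swapping the `ConsoleAppender` for a file or Fluentd appender is what would route logs off the console.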
K8s ready - as mentioned above, the application's health can be tracked via the Actuator endpoints.

You can run Confluent Platform's console Avro consumer with the following:
```shell
./bin/kafka-avro-console-consumer --topic Tweets \
  --zookeeper localhost:2181 \
  --from-beginning
```
Note: Make sure you have Confluent's binaries installed.
The schema will have been registered in the Schema Registry. You can verify this by issuing the following cURL command:
```shell
curl -X GET \
  http://localhost:8081/subjects/TweetRegistry/versions/latest \
  -H 'Cache-Control: no-cache'
```
which would result in output like this:

```json
{
  "subject": "TweetRegistry",
  "version": 2,
  "id": 12,
  "schema": "{\"type\":\"record\",\"name\":\"TweetRegistry\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"tweetedOn\",\"type\":\"string\",\"default\":\"null\"}]}"
}
```
Check out the Kafka-Avro-Consumer, which can be used to consume these tweets with a BACKWARD schema compatibility level.