This Java project provides a command-line utility for interacting with Apache Kafka. It allows users to produce messages, consume messages, describe Kafka topics, and view consumer group details. The utility is packaged as a fat JAR, including all necessary dependencies to run Kafka clients.
This project uses Maven for dependency management and building. To build the project, navigate to the project root directory and run the following command:
mvn clean compile assembly:single
This command compiles the project and creates a fat JAR in the target directory, which contains all the necessary dependencies.
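For reference, the `jar-with-dependencies` artifact is typically produced by the `maven-assembly-plugin`; a minimal sketch of that plugin configuration is shown below (the `mainClass` value is a placeholder, not taken from this project's pom.xml):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <!-- produces the *-jar-with-dependencies.jar fat JAR -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <!-- placeholder: replace with this project's actual main class -->
        <mainClass>com.example.TestKafkaClient</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
```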
The utility accepts various parameters based on the operation you want to perform. The general usage pattern is:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation <operation> [other options]
- `--operation`: The operation to perform. Accepts `produce`, `consume`, `describe-topic`, `describe-group`, `describe-cluster`, `list-topics`, and `list-consumers`.
- `--bootstrap-server`: The Kafka bootstrap server(s) to connect to. Format: `host1:port,host2:port`.
- `--topic`: The name of the Kafka topic to interact with.
- `--num-messages`: The number of messages to produce or consume. Default is `1`.
- `--group`: (Only for `consume` and `describe-group`) The consumer group ID.
- `--config-file`: Path to a config file containing the Kafka client's security and other configs.
- `--format`: Format of the message to produce/consume. Supported values: `string` (default), `avro`, `protobuf`, `jsonsr`. Except for `string`, all formats require `schema.registry.url` and any other Schema Registry security properties to be set in the config file passed through `--config-file`. Note this only defines the value's format; the key format is hardcoded to `string`.
- `--schema`: Schema of the record's value.
- `--schema-id`: Schema ID of a schema registered in Schema Registry. Use this instead of `--schema` when the schema already exists in Schema Registry.
- `--send-keys`: Use this with `--operation produce` to send keys. Default is `false`.
To produce messages to a topic:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation produce --bootstrap-server localhost:9092 --topic myTopic --num-messages 5
To produce messages with keys to a topic:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation produce --bootstrap-server localhost:9092 --topic myTopic --send-keys true --num-messages 5
- Create a client.properties file with the following Schema Registry configs:
cat client.properties:
# Set the following when producing/consuming in Schema Registry aware format
schema.registry.url=https://psrc-xxx.us-central1.gcp.confluent.cloud
# Set below configs if Schema Registry is secured
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=xxxx:xxxxxxxxxxxx
To produce Avro messages to a topic using `--schema`:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation produce --bootstrap-server localhost:9092 --topic myTopic --num-messages 1 --config-file client.properties --format avro --schema '{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
>{"f1": "value1"}
To produce Avro messages to a topic using `--schema-id`:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation produce --bootstrap-server localhost:9092 --topic myTopic --num-messages 1 --config-file client.properties --format avro --schema-id 101
>{"f1": "value1"}
- Reuse the client.properties file with the Schema Registry configs shown above.
To produce Protobuf messages to a topic using `--schema`:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation produce --bootstrap-server localhost:9092 --topic myTopic --num-messages 1 --config-file client.properties --format protobuf --schema 'syntax = "proto3"; message SampleRecord {int32 my_field1 = 1 ; string my_field2 = 2;}'
>{"my_field1": 123, "my_field2": "hello world"}
To produce Protobuf messages to a topic using `--schema-id`:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation produce --bootstrap-server localhost:9092 --topic myTopic --num-messages 1 --config-file client.properties --format protobuf --schema-id 101
>{"my_field1": 123, "my_field2": "hello world"}
- Reuse the client.properties file with the Schema Registry configs shown above.
To produce JSON Schema (`jsonsr`) messages to a topic using `--schema`:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation produce --bootstrap-server localhost:9092 --topic myTopic --num-messages 1 --config-file client.properties --format jsonsr --schema '{"type":"object","properties":{"f1":{"type":"string"}}}'
>{"f1": "value1"}
To produce JSON Schema (`jsonsr`) messages to a topic using `--schema-id`:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation produce --bootstrap-server localhost:9092 --topic myTopic --num-messages 1 --config-file client.properties --format jsonsr --schema-id 101
>{"f1": "value1"}
To consume messages from a topic:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation consume --bootstrap-server localhost:9092 --topic myTopic --group myGroup --num-messages 10
- Reuse the client.properties file with the Schema Registry configs shown above.
To consume messages from a topic in a Schema Registry aware format, use `--format <avro|protobuf|jsonsr>`:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation consume --bootstrap-server localhost:9092 --topic myTopic --group myGroup --num-messages 10 --config-file client.properties --format avro
To describe a Kafka topic:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation describe-topic --bootstrap-server localhost:9092 --topic myTopic
To list Kafka topics:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation list-topics --bootstrap-server localhost:9092
To list Kafka consumer groups:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation list-consumers --bootstrap-server localhost:9092
To describe a Kafka consumer group:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation describe-group --bootstrap-server localhost:9092 --group myGroup
To describe a Kafka cluster:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation describe-cluster --bootstrap-server localhost:9092
To describe a topic in Confluent Cloud:
- Create a client.properties file with the following configs:
cat client.properties:
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxxx" password="xxxxxx";
security.protocol=SASL_SSL
# Set the following when producing/consuming in Schema Registry aware format
schema.registry.url=https://psrc-xxx.us-central1.gcp.confluent.cloud
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=xxxx:xxxxxxxxxxxx
Run the command, passing `--config-file client.properties`:
java -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation describe-topic --bootstrap-server pkc-xxx.xx.gcp.confluent.cloud:9092 --topic myTopic --config-file client.properties
To enable DEBUG logging for Kafka clients:
- Set rootLogger to DEBUG:
log4j.rootLogger=DEBUG, stderr
- For specific class/package TRACE logging, add (Example):
log4j.logger.org.apache.kafka.clients.producer.internals.ProducerBatch=TRACE
- Run the JAR with `-Dlog4j.configuration=file:<path-to-log4j.properties>`:
java -Dlog4j.configuration=file:src/main/resources/log4j.properties -jar target/testKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar --operation <operation>
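Putting the steps above together, a minimal `log4j.properties` might look like the following sketch. The `stderr` appender definition here is an assumption; adjust it to match the appender already defined in `src/main/resources/log4j.properties`:

```properties
# Root logger at DEBUG, writing to the "stderr" console appender
log4j.rootLogger=DEBUG, stderr

# Assumed console appender definition (adjust to your existing config)
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.Target=System.err
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n

# Optional: TRACE logging for a specific class/package
log4j.logger.org.apache.kafka.clients.producer.internals.ProducerBatch=TRACE
```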