This nice logo was made by @l3r8yJ
Project architect: @h1alexbel
EO Kafka: producers and consumers for working with the Apache Kafka message broker.
Read Kafka Producers and Consumers for Elegant Microservices, the blog post about EO-Kafka, and EO-Kafka with Spring, about how to connect EO-Kafka with Spring.
Motivation. We are not happy with Spring Kafka, because it is very procedural and not object-oriented. eo-kafka suggests doing almost exactly the same, but through objects.
Principles. These are the design principles behind eo-kafka.
How to use. All you need is this (get the latest version here):
Maven:
<dependency>
<groupId>io.github.eo-cqrs</groupId>
<artifactId>eo-kafka</artifactId>
</dependency>
To use it with Spring Boot:
<dependency>
<groupId>io.github.eo-cqrs</groupId>
<artifactId>eo-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
With Gradle:
dependencies {
implementation 'io.github.eo-cqrs:eo-kafka:<version>'
}
To create Kafka Message with Topic, Key and Value:
final Message<String, String> msg = new Tkv<>("test.topic", "test-k", "test-v");
To create a Kafka Message with a Partition:
final Message<String, String> msg =
new WithPartition<>(
0,
new Tkv<>(
"test.topic",
"test-k",
"test-v"
)
);
To create a Kafka Message with a Timestamp:
final Message<String, String> msg =
new Timestamped<>(
tmstmp,
new WithPartition<>(
partition,
new Tkv<>(
topic,
key,
value
)
)
);
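The message types above compose as decorators: each wrapper takes another message and adds one attribute to it. The following is a minimal, self-contained sketch of that idea, not eo-kafka's actual implementation. The `Msg` interface and the `Plain`, `OnPartition`, and `At` classes are hypothetical stand-ins for `Message`, `Tkv`, `WithPartition`, and `Timestamped`.

```java
import java.util.Optional;

// Hypothetical stand-in for a message abstraction; NOT the eo-kafka API.
interface Msg {
    String topic();
    String key();
    String value();
    Optional<Integer> partition();
    Optional<Long> timestamp();
}

// Base message carrying only topic, key and value (plays the role of Tkv).
final class Plain implements Msg {
    private final String topic;
    private final String key;
    private final String value;

    Plain(final String topic, final String key, final String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    public String topic() { return this.topic; }
    public String key() { return this.key; }
    public String value() { return this.value; }
    public Optional<Integer> partition() { return Optional.empty(); }
    public Optional<Long> timestamp() { return Optional.empty(); }
}

// Decorator that adds a partition and delegates everything else.
final class OnPartition implements Msg {
    private final int part;
    private final Msg origin;

    OnPartition(final int part, final Msg origin) {
        this.part = part;
        this.origin = origin;
    }

    public String topic() { return this.origin.topic(); }
    public String key() { return this.origin.key(); }
    public String value() { return this.origin.value(); }
    public Optional<Integer> partition() { return Optional.of(this.part); }
    public Optional<Long> timestamp() { return this.origin.timestamp(); }
}

// Decorator that adds a timestamp and delegates everything else.
final class At implements Msg {
    private final long millis;
    private final Msg origin;

    At(final long millis, final Msg origin) {
        this.millis = millis;
        this.origin = origin;
    }

    public String topic() { return this.origin.topic(); }
    public String key() { return this.origin.key(); }
    public String value() { return this.origin.value(); }
    public Optional<Integer> partition() { return this.origin.partition(); }
    public Optional<Long> timestamp() { return Optional.of(this.millis); }
}

final class DecoratorDemo {
    // Same nesting order as the Timestamped example: timestamp, partition, base.
    static Msg sample() {
        return new At(
            1_700_000_000_000L,
            new OnPartition(0, new Plain("test.topic", "test-k", "test-v"))
        );
    }

    public static void main(final String[] args) {
        final Msg msg = DecoratorDemo.sample();
        System.out.println(
            msg.topic() + " partition=" + msg.partition() + " timestamp=" + msg.timestamp()
        );
    }
}
```

Each wrapper stays small and immutable, and new attributes can be added without changing the base message class.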
To create a Kafka Producer, you can wrap the original KafkaProducer:
final KafkaProducer<String, String> origin = ...;
final Producer<String, String> producer = new KfProducer<>(origin);
Or construct it with KfFlexible:
final Producer<String, String> producer =
new KfProducer<>(
new KfFlexible<>(
new KfProducerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
)
)
)
);
Or create it with XML file:
final Producer<String, String> producer =
new KfProducer<>(
new KfXmlFlexible<String, String>(
"producer.xml" // file with producer config
)
);
Your XML file should be in the resources directory and look like this:
<producer>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializer>org.apache.kafka.common.serialization.StringSerializer</keySerializer>
<valueSerializer>org.apache.kafka.common.serialization.StringSerializer</valueSerializer>
</producer>
Since version 0.4.6, you can create a Producer with a JSON file:
final Producer<String, String> producer =
new KfProducer<>(
new KfJsonFlexible<String, String>(
"producer.json" // file with producer config
)
);
Your JSON, located in resources directory, should look like this:
{
"bootstrapServers": "localhost:9092",
"keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
"valueSerializer": "org.apache.kafka.common.serialization.StringSerializer"
}
Since version 0.5.6, you can create a Producer with a YAML file:
final Producer<String, String> producer =
new KfProducer<>(
new KfYamlProducerSettings<>(
"producer.yaml"
)
);
Your YAML, located in resources directory, should look like this:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
To send a message:
try (final Producer<String, String> producer = ...) {
    producer.send(
        new WithPartition<>(
            0,
            new Tkv<>(
                "xyz.topic",
                "key",
                "message"
            )
        )
    );
} catch (Exception e) {
    throw new IllegalStateException(e);
}
Also, you can create a KfCallback, a Kafka Producer with async Callback support:
final Producer<String, String> producer =
new KfCallback<>(
new KfFlexible<>(
new KfProducerParams(
new KfParams(
// producer params
)
)
),
new Callback() {
@Override
public void onCompletion(final RecordMetadata meta, final Exception ex) {
// logic
}
}
);
To create a Kafka Consumer, you can wrap the original KafkaConsumer:
final KafkaConsumer<String, String> origin = ...;
final Consumer<String, String> consumer = new KfConsumer<>(origin);
Using KfFlexible:
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfFlexible<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new GroupId("1"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
)
)
)
);
And XML file approach:
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfXmlFlexible<String, String>("consumer.xml")
);
Again, the XML file should be in the resources directory and look like this:
<consumer>
<bootstrapServers>localhost:9092</bootstrapServers>
<groupId>1</groupId>
<keyDeserializer>org.apache.kafka.common.serialization.StringDeserializer</keyDeserializer>
<valueDeserializer>org.apache.kafka.common.serialization.StringDeserializer</valueDeserializer>
</consumer>
Since version 0.4.6, you can create a Consumer with a JSON file:
final Consumer<String, String> consumer =
    new KfConsumer<>(
        new KfJsonFlexible<String, String>(
            "consumer.json" // file with consumer config
        )
    );
Your JSON, located in resources directory, should look like this:
{
"bootstrapServers": "localhost:9092",
"groupId": "1",
"keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
Since version 0.5.6, you can create a Consumer with a YAML file:
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfYamlConsumerSettings<>(
"consumer.yaml"
)
);
Your YAML, located in resources directory, should look like this:
bootstrap-servers: localhost:9092
group-id: "1"
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Consuming messages:
try (
    final Consumer<String, String> consumer =
        new KfConsumer<>(
            new KfFlexible<>(
                new KfConsumerParams(
                    new KfParams(
                        new BootstrapServers(this.servers),
                        new GroupId("1"),
                        new AutoOffsetReset("earliest"),
                        new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
                        new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
                    )
                )
            )
        )
) {
    // you need to be subscribed to a topic to iterate over its data:
    // consumer.subscribe(new ListOf<>("orders-saga-init"));
    // or call #records(topic, duration), which subscribes to the topic you provide
    final ConsumerRecords<String, String> records =
        consumer.records("orders-saga-init", Duration.ofSeconds(5L));
}
Also, you can subscribe with a ConsumerRebalanceListener:
consumer.subscribe(new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
    }
    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
    }
}, "<your topic>");
Finally, you can unsubscribe:
consumer.unsubscribe();
To mock eo-kafka, you can use the existing Fake Objects from the io.github.eocqrs.kafka.fake package. They look like the normal ones, but instead of talking to a real Kafka broker, they manipulate an in-memory XML document.
final FkBroker broker = new InXml(
new Synchronized(
new InFile(
"consumer-test", "<broker/>"
)
)
);
It will create an in-memory XML document with the following structure:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
<topics/>
<subs/>
</broker>
You can create a topic inside the broker:
broker.with(new TopicDirs("fake.topic").value());
Under the hood, the XML will be modified to:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
<topics>
<topic>
<name>fake.topic</name>
<datasets/>
</topic>
</topics>
<subs/>
</broker>
To create a fake producer and a fake consumer on top of the broker:
final Producer<String, String> producer =
new FkProducer<>(
UUID.randomUUID(),
broker
);
final Consumer<Object, String> consumer =
new FkConsumer(
UUID.randomUUID(),
broker
);
A complete example with a fake producer and a fake consumer:
final String topic = "test";
final Consumer<Object, String> consumer =
new FkConsumer(UUID.randomUUID(),
this.broker
.with(new TopicDirs(topic).value())
);
final Producer<String, String> producer =
new FkProducer<>(UUID.randomUUID(), this.broker);
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
"test1",
"test-data-1"
)
)
);
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
"test2",
"test-data-2"
)
)
);
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
"test3",
"test-data-3"
)
)
);
final ConsumerRecords<Object, String> records =
consumer.records(topic, Duration.ofSeconds(1L));
final List<String> datasets = new ListOf<>();
records.forEach(rec -> datasets.add(rec.value()));
MatcherAssert.assertThat(
"First datasets in right format",
datasets,
Matchers.contains("test-data-1", "test-data-2", "test-data-3")
);
Like production producers and consumers, fake ones should also be closed when you are done with them:
fake.close();
Under the hood, the XML document will look like this:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
<topics>
<topic>
<name>test</name>
<datasets>
<dataset>
<partition>0</partition>
<key>test1</key>
<value>test-data-1</value>
<seen>true</seen>
</dataset>
<dataset>
<partition>0</partition>
<key>test2</key>
<value>test-data-2</value>
<seen>true</seen>
</dataset>
<dataset>
<partition>0</partition>
<key>test3</key>
<value>test-data-3</value>
<seen>true</seen>
</dataset>
</datasets>
</topic>
</topics>
<subs>
<sub>
<topic>test</topic>
<consumer>aa4a2008-764b-4e19-9368-8250df4bea38</consumer>
</sub>
</subs>
</broker>
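Because the fake broker's state is plain XML, a test can inspect it with standard XPath. The sketch below uses only the JDK's `javax.xml` APIs on a shortened copy of the document above. How to obtain that XML from an `FkBroker` instance is outside this sketch, and the `BrokerXml` class with its `values` and `sample` methods is a hypothetical helper, not part of eo-kafka.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper for reading a broker-shaped XML document.
final class BrokerXml {

    // Returns the <value> of every dataset stored under the given topic.
    // Assumption: the topic name contains no quote characters.
    static List<String> values(final String xml, final String topic) {
        final XPath xpath = XPathFactory.newInstance().newXPath();
        final String query =
            "/broker/topics/topic[name='" + topic + "']/datasets/dataset/value/text()";
        try {
            final NodeList nodes = (NodeList) xpath.evaluate(
                query,
                new InputSource(new StringReader(xml)),
                XPathConstants.NODESET
            );
            final List<String> result = new ArrayList<>(nodes.getLength());
            for (int idx = 0; idx < nodes.getLength(); idx++) {
                result.add(nodes.item(idx).getNodeValue());
            }
            return result;
        } catch (final XPathExpressionException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // A shortened copy of the broker document shown above.
    static String sample() {
        return "<broker><topics><topic><name>test</name><datasets>"
            + "<dataset><partition>0</partition><key>test1</key>"
            + "<value>test-data-1</value><seen>true</seen></dataset>"
            + "<dataset><partition>0</partition><key>test2</key>"
            + "<value>test-data-2</value><seen>true</seen></dataset>"
            + "</datasets></topic></topics><subs/></broker>";
    }

    public static void main(final String[] args) {
        System.out.println(BrokerXml.values(BrokerXml.sample(), "test"));
    }
}
```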
As of version 0.3.5, eo-kafka supports only String values in FkConsumer.
Kafka Property | eo-kafka API | XML/JSON tag | YAML |
---|---|---|---|
bootstrap.servers | BootstrapServers | bootstrapServers | bootstrap-servers |
key.serializer | KeySerializer | keySerializer | key-serializer |
value.serializer | ValueSerializer | valueSerializer | value-serializer |
key.deserializer | KeyDeserializer | keyDeserializer | key-deserializer |
value.deserializer | ValueDeserializer | valueDeserializer | value-deserializer |
group.id | GroupId | groupId | group-id |
auto.offset.reset | AutoOffsetReset | autoOffsetReset | auto-offset-reset |
client.id | ClientId | clientId | client-id |
client.rack | ClientRack | clientRack | client-rack |
acks | Acks | acks | acks |
security.protocol | SecurityProtocol | securityProtocol | security-protocol |
sasl.jaas.config | SaslJaasConfig | saslJaasConfig | sasl-jaas-config |
sasl.mechanism | SaslMechanism | saslMechanism | sasl-mechanism |
batch.size | BatchSize | batchSize | batch-size |
buffer.memory | BufferMemory | bufferMemory | buffer-memory |
linger.ms | LingerMs | lingerMs | linger-ms |
retries | Retries | retries | retries |
retry.backoff.ms | RetryBackoffMs | retryBackoffMs | retry-backoff-ms |
compression.type | CompressionType | compressionType | compression-type |
partition.assignment.strategy | PartitionAssignmentStrategy | partitionAssignmentStrategy | partition-assignment-strategy |
max.poll.records | MaxPollRecords | maxPollRecords | max-poll-records |
max.poll.interval.ms | MaxPollIntervalMs | maxPollIntervalMs | max-poll-intervalMs |
heartbeat.interval.ms | HeartbeatIntervalMs | heartbeatIntervalMs | heartbeat-interval-ms |
enable.auto.commit | EnableAutoCommit | enableAutoCommit | enable-auto-commit |
session.timeout.ms | SessionTimeoutMs | sessionTimeoutMs | session-timeout-ms |
max.partition.fetch.bytes | MaxPartitionFetchBytes | maxPartitionFetchBytes | max-partition-fetch-bytes |
fetch.max.wait.ms | FetchMaxWaitMs | fetchMaxWaitMs | fetch-max-wait-ms |
fetch.min.bytes | FetchMinBytes | fetchMinBytes | fetch-min-bytes |
fetch.max.bytes | FetchMaxBytes | fetchMaxBytes | fetch-max-bytes |
send.buffer.bytes | SendBufferBytes | sendBufferBytes | send-buffer-bytes |
receive.buffer.bytes | ReceiveBufferBytes | receiveBufferBytes | receive-buffer-bytes |
max.block.ms | MaxBlockMs | maxBlockMs | max-block-ms |
max.request.size | MaxRqSize | maxRequestSize | max-request-size |
group.instance.id | GroupInstanceId | groupInstanceId | group-instance-id |
max.in.flight.requests.per.connection | MaxInFlightRq | maxInFlightRequestsPerConnection | max-in-flight-requests-per-connection |
delivery.timeout.ms | DeliveryTimeoutMs | deliveryTimeoutMs | delivery-timeout-ms |
enable.idempotence | EnableIdempotence | enableIdempotence | enable-idempotence |
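Most rows in the table appear to follow a mechanical pattern: the XML/JSON tag is the Kafka property written in camelCase, and the YAML key is the same property with dots replaced by hyphens. Here is a small sketch of that conversion, using a hypothetical `PropertyNames` helper that is not part of eo-kafka. The table remains the authoritative source for the exact names.

```java
// Hypothetical helper illustrating the naming pattern; not part of eo-kafka.
final class PropertyNames {

    // "bootstrap.servers" -> "bootstrapServers"
    static String camel(final String property) {
        final StringBuilder out = new StringBuilder();
        boolean upper = false;
        for (final char chr : property.toCharArray()) {
            if (chr == '.') {
                // Drop the dot and capitalize the character that follows it.
                upper = true;
            } else if (upper) {
                out.append(Character.toUpperCase(chr));
                upper = false;
            } else {
                out.append(chr);
            }
        }
        return out.toString();
    }

    // "bootstrap.servers" -> "bootstrap-servers"
    static String kebab(final String property) {
        return property.replace('.', '-');
    }

    public static void main(final String[] args) {
        final String property = "auto.offset.reset";
        System.out.println(PropertyNames.camel(property));
        System.out.println(PropertyNames.kebab(property));
    }
}
```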
Fork the repository, make changes, and send us a pull request. We will review your changes and apply them to the master branch shortly, provided they don't violate our quality standards. To avoid frustration, before sending us your pull request, please run the full Maven build:
mvn clean install
You will need Maven 3.8.7+ and Java 17+.
If you want to contribute to the next release version of eo-kafka, please check the project board.
Our rultor image for CI/CD.