Skip to content

Commit 2ff798e

Browse files
authored
feat: Multiple Kafka Consumer Factories are now handled (both AVRO & JSON) (#48)
1 parent 1ef11ad commit 2ff798e

File tree

1 file changed

+9
-9
lines changed
  • tzatziki-spring-kafka/src/main/java/com/decathlon/tzatziki/steps

1 file changed

+9
-9
lines changed

tzatziki-spring-kafka/src/main/java/com/decathlon/tzatziki/steps/KafkaSteps.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,10 @@ public static String bootstrapServers() {
105105
List<ConsumerFactory<String, Object>> avroJacksonConsumerFactories = new ArrayList<>();
106106

107107
@Autowired(required = false)
108-
ConsumerFactory<String, GenericRecord> avroConsumerFactory;
108+
List<ConsumerFactory<String, GenericRecord>> avroConsumerFactories = new ArrayList<>();
109109

110110
@Autowired(required = false)
111-
ConsumerFactory<String, String> jsonConsumerFactory;
111+
List<ConsumerFactory<String, String>> jsonConsumerFactories = new ArrayList<>();
112112

113113
@Autowired(required = false)
114114
private KafkaListenerEndpointRegistry registry;
@@ -189,7 +189,7 @@ public void a_message_is_consumed_from_a_topic(Guard guard, String name, boolean
189189
guard.in(objects, () -> {
190190
KafkaInterceptor.awaitForSuccessfullOnly = successfully;
191191
if (!checkedTopics.contains(topic)) {
192-
try (Admin admin = Admin.create(avroConsumerFactory.getConfigurationProperties())) {
192+
try (Admin admin = Admin.create(avroConsumerFactories.get(0).getConfigurationProperties())) {
193193
awaitUntil(() -> {
194194
List<String> groupIds = admin.listConsumerGroups().all().get().stream().map(ConsumerGroupListing::groupId).toList();
195195
Map<String, KafkaFuture<ConsumerGroupDescription>> groupDescriptions = admin.describeConsumerGroups(groupIds).describedGroups();
@@ -223,7 +223,7 @@ public void a_message_is_consumed_from_a_topic(Guard guard, String name, boolean
223223
@When(THAT + GUARD + "the " + VARIABLE + " group id has fully consumed the " + VARIABLE + " topic$")
224224
public void topic_has_been_consumed_on_every_partition(Guard guard, String groupId, String topic) {
225225
guard.in(objects, () -> awaitUntilAsserted(() -> getAllConsumers(topic).forEach(consumer -> unchecked(() -> {
226-
try (Admin admin = Admin.create(avroConsumerFactory.getConfigurationProperties())) {
226+
try (Admin admin = Admin.create(avroConsumerFactories.get(0).getConfigurationProperties())) {
227227
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = admin
228228
.listConsumerGroupOffsets(groupId)
229229
.partitionsToOffsetAndMetadata().get();
@@ -241,7 +241,7 @@ public void topic_has_been_consumed_on_every_partition(Guard guard, String group
241241

242242
@Given(THAT + "the current offset of " + VARIABLE + " on the topic " + VARIABLE + " is (\\d+)$")
243243
public void that_the_current_offset_the_groupid_on_topic_is(String groupId, String topic, long offset) throws ExecutionException, InterruptedException {
244-
try (Admin admin = Admin.create(avroConsumerFactory.getConfigurationProperties())) {
244+
try (Admin admin = Admin.create(avroConsumerFactories.get(0).getConfigurationProperties())) {
245245
admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
246246
TopicPartition topicPartition = new TopicPartition(topic, 0);
247247
Collection<MemberDescription> members = admin.describeConsumerGroups(List.of(groupId)).describedGroups().get(groupId).get().members();
@@ -432,17 +432,17 @@ public List<Consumer<String, Object>> getAvroJacksonConsumers(String topic) {
432432
}
433433

434434
public Consumer<String, GenericRecord> getAvroConsumer(String topic) {
435-
if (avroConsumerFactory == null) {
435+
if (avroConsumerFactories.isEmpty()) {
436436
return null;
437437
}
438-
return avroConsumers.computeIfAbsent(topic, t -> avroConsumerFactory.createConsumer(UUID.randomUUID() + "_avro_" + t, ""));
438+
return avroConsumers.computeIfAbsent(topic, t -> avroConsumerFactories.get(0).createConsumer(UUID.randomUUID() + "_avro_" + t, ""));
439439
}
440440

441441
public Consumer<String, String> getJsonConsumer(String topic) {
442-
if (jsonConsumerFactory == null) {
442+
if (jsonConsumerFactories.isEmpty()) {
443443
return null;
444444
}
445-
return jsonConsumers.computeIfAbsent(topic, t -> this.jsonConsumerFactory.createConsumer(UUID.randomUUID() + "_json_" + t, ""));
445+
return jsonConsumers.computeIfAbsent(topic, t -> this.jsonConsumerFactories.get(0).createConsumer(UUID.randomUUID() + "_json_" + t, ""));
446446
}
447447

448448
@NotNull

0 commit comments

Comments
 (0)