Skip to content

Commit 9758b21

Browse files
authored
fx: Tests fixes + allow non-avro apps to assert full topic consumption (#153)
1 parent 5f23dc2 commit 9758b21

File tree

3 files changed

+15
-10
lines changed

3 files changed

+15
-10
lines changed

tzatziki-spring-kafka/src/main/java/com/decathlon/tzatziki/steps/KafkaSteps.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public void a_message_is_consumed_from_a_topic(Guard guard, String name, boolean
192192
guard.in(objects, () -> {
193193
KafkaInterceptor.awaitForSuccessfullOnly = successfully;
194194
if (!checkedTopics.contains(topic)) {
195-
try (Admin admin = Admin.create(avroConsumerFactories.get(0).getConfigurationProperties())) {
195+
try (Admin admin = Admin.create(getAnyConsumerFactory().getConfigurationProperties())) {
196196
awaitUntil(() -> {
197197
List<String> groupIds = admin.listConsumerGroups().all().get().stream().map(ConsumerGroupListing::groupId).toList();
198198
Map<String, KafkaFuture<ConsumerGroupDescription>> groupDescriptions = admin.describeConsumerGroups(groupIds).describedGroups();
@@ -222,6 +222,11 @@ public void a_message_is_consumed_from_a_topic(Guard guard, String name, boolean
222222
});
223223
}
224224

225+
@NotNull
226+
private ConsumerFactory<String, ?> getAnyConsumerFactory() {
227+
return Stream.concat(Stream.concat(jsonConsumerFactories.stream(), avroConsumerFactories.stream()), avroJacksonConsumerFactories.stream()).findFirst().get();
228+
}
229+
225230
@SneakyThrows
226231
@When(THAT + GUARD + "the " + VARIABLE + " group id has fully consumed the " + VARIABLE + " topic$")
227232
public void topic_has_been_consumed_on_every_partition(Guard guard, String groupId, String topic) {

tzatziki-spring-kafka/src/test/java/com/decathlon/tzatziki/kafka/KafkaUsersListener.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,23 @@
11
package com.decathlon.tzatziki.kafka;
22

3-
import java.util.List;
43
import lombok.extern.slf4j.Slf4j;
54
import org.apache.avro.generic.GenericRecord;
6-
import org.springframework.boot.test.mock.mockito.SpyBean;
5+
import org.springframework.beans.factory.annotation.Autowired;
76
import org.springframework.kafka.annotation.KafkaListener;
87
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
98
import org.springframework.kafka.listener.BatchListenerFailedException;
109
import org.springframework.messaging.handler.annotation.Header;
1110
import org.springframework.messaging.handler.annotation.Payload;
1211
import org.springframework.stereotype.Service;
1312

14-
import static org.springframework.kafka.support.KafkaHeaders.OFFSET;
15-
import static org.springframework.kafka.support.KafkaHeaders.RECEIVED_KEY;
16-
import static org.springframework.kafka.support.KafkaHeaders.RECEIVED_PARTITION;
17-
import static org.springframework.kafka.support.KafkaHeaders.RECEIVED_TOPIC;
13+
import java.util.List;
14+
15+
import static org.springframework.kafka.support.KafkaHeaders.*;
1816

1917
@Service
2018
@Slf4j
2119
public class KafkaUsersListener extends AbstractConsumerSeekAware implements Seeker {
22-
@SpyBean
20+
@Autowired
2321
private CountService countService;
2422

2523
@KafkaListener(topics = "users", groupId = "users-group-id", containerFactory = "defaultFactory")

tzatziki-spring-kafka/src/test/java/com/decathlon/tzatziki/steps/TestApplicationSteps.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.mockito.Mockito;
1515
import org.springframework.beans.factory.annotation.Autowired;
1616
import org.springframework.boot.test.context.SpringBootTest;
17+
import org.springframework.boot.test.mock.mockito.SpyBean;
1718
import org.springframework.boot.test.util.TestPropertyValues;
1819
import org.springframework.context.ApplicationContextInitializer;
1920
import org.springframework.context.ConfigurableApplicationContext;
@@ -36,7 +37,8 @@ public void initialize(ConfigurableApplicationContext configurableApplicationCon
3637
TestPropertyValues.of(
3738
"spring.kafka.bootstrap-servers=" + KafkaSteps.bootstrapServers(),
3839
"spring.kafka.consumer.properties.fetch.min.bytes=100000",
39-
"spring.kafka.consumer.properties.fetch.max.wait.ms=1"
40+
"spring.kafka.consumer.properties.fetch.max.wait.ms=1",
41+
"spring.kafka.consumer.auto-offset-reset=earliest"
4042
).applyTo(configurableApplicationContext.getEnvironment());
4143
KafkaSteps.autoSeekTopics("exposed-users", "json-users");
4244
}
@@ -48,7 +50,7 @@ public TestApplicationSteps(ObjectSteps objects) {
4850
this.objects = objects;
4951
}
5052

51-
@Autowired
53+
@SpyBean
5254
CountService spyCountService;
5355

5456
@Autowired

0 commit comments

Comments
 (0)