Skip to content

Commit eb233ff

Browse files
authored
feat: wait for poll step (#127)
1 parent 6dbf177 commit eb233ff

File tree

3 files changed

+25
-1
lines changed

3 files changed

+25
-1
lines changed

tzatziki-spring-kafka/src/main/java/com/decathlon/tzatziki/kafka/KafkaInterceptor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.decathlon.tzatziki.kafka;
22

3+
import com.decathlon.tzatziki.steps.KafkaSteps;
34
import com.decathlon.tzatziki.utils.Fields;
45
import lombok.AllArgsConstructor;
56
import lombok.extern.slf4j.Slf4j;
@@ -96,6 +97,9 @@ private Object createConsumerProxy(Consumer<?, ?> consumer) {
9697
return switch (method.getName()) {
9798
case "poll" -> {
9899
ConsumerRecords<String, ?> consumerRecords = (ConsumerRecords<String, ?>) method.invoke(consumer, args);
100+
consumer.subscription().stream()
101+
.filter(KafkaSteps.semaphoreByTopic::containsKey)
102+
.forEach(topic -> KafkaSteps.semaphoreByTopic.remove(topic).release());
99103
if (consumerRecords.count() == 0) {
100104
yield consumerRecords;
101105
}

tzatziki-spring-kafka/src/main/java/com/decathlon/tzatziki/steps/KafkaSteps.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.jetbrains.annotations.NotNull;
3030
import org.jetbrains.annotations.Nullable;
3131
import org.junit.Assert;
32+
import org.junit.jupiter.api.Assertions;
3233
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
3435
import org.springframework.kafka.core.ConsumerFactory;
@@ -39,6 +40,7 @@
3940
import java.time.Duration;
4041
import java.util.*;
4142
import java.util.concurrent.ExecutionException;
43+
import java.util.concurrent.Semaphore;
4244
import java.util.stream.Collectors;
4345
import java.util.stream.Stream;
4446
import java.util.stream.StreamSupport;
@@ -70,6 +72,7 @@ public class KafkaSteps {
7072

7173
private static boolean isStarted;
7274

75+
public static final Map<String, Semaphore> semaphoreByTopic = new LinkedHashMap<>();
7376
public static synchronized void start() {
7477
start(null);
7578
}
@@ -239,6 +242,19 @@ public void topic_has_been_consumed_on_every_partition(Guard guard, String group
239242
}))));
240243
}
241244

245+
@When(THAT + GUARD + "the " + VARIABLE + " topic was just polled$")
246+
public void topic_was_just_polled(Guard guard, String topic) {
247+
guard.in(objects, () -> {
248+
Semaphore semaphore = new Semaphore(0);
249+
semaphoreByTopic.put(topic, semaphore);
250+
try {
251+
semaphore.acquire();
252+
} catch (InterruptedException e) {
253+
Assertions.fail(e);
254+
}
255+
});
256+
}
257+
242258
@Given(THAT + "the current offset of " + VARIABLE + " on the topic " + VARIABLE + " is (\\d+)$")
243259
public void that_the_current_offset_the_groupid_on_topic_is(String groupId, String topic, long offset) throws ExecutionException, InterruptedException {
244260
try (Admin admin = Admin.create(avroConsumerFactories.get(0).getConfigurationProperties())) {

tzatziki-spring-kafka/src/test/resources/com/decathlon/tzatziki/steps/kafka.feature

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,4 +406,8 @@ Feature: to interact with a spring boot service having a connection to a kafka q
406406

407407
Then the json-users-input topic contains 1 json message
408408

409-
But within 500ms the json-users-input topic contains 2 json messages
409+
But within 500ms the json-users-input topic contains 2 json messages
410+
411+
@ignore
412+
Scenario: we wait for a poll to occur on a specific topic
413+
When the json-users-input topic was just polled

0 commit comments

Comments
 (0)