Skip to content

Commit

Permalink
Improve Kafka message handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jaguililla committed Sep 12, 2024
1 parent 57920fe commit 5afaa07
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,6 @@ class ApplicationConfiguration {
@Value(value = "${deleteMessage}")
private String deleteMessage;

// @Bean
// public KafkaAdmin kafkaAdmin() {
// return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress));
// }

// @Bean
// public NewTopic appointmentsTopic() {
// return new NewTopic("appointments", 1, (short) 1);
// }

// @Bean
// public ProducerFactory<String, String> producerFactory() {
// return new DefaultKafkaProducerFactory<>(Map.of(
// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
// ));
// }

// @Bean
// public ConsumerFactory<String, String> consumerFactory() {
// return new DefaultKafkaConsumerFactory<>(Map.of(
// ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
// ConsumerConfig.GROUP_ID_CONFIG, "group",
// ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
// ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
// ));
// }

@Bean
public KafkaTemplate<String, String> kafkaTemplate(
final ProducerFactory<String, String> producerFactory,
final ConsumerFactory<String, String> consumerFactory
) {
final var kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setConsumerFactory(consumerFactory);
return kafkaTemplate;
}

@Bean
AppointmentsNotifier appointmentsNotifier(final KafkaTemplate<String, String> kafkaTemplate) {
final var type = KafkaTemplateAppointmentsNotifier.class.getSimpleName();
Expand All @@ -78,6 +39,7 @@ AppointmentsService appointmentsService(
final UsersRepository usersRepository,
final AppointmentsNotifier appointmentsNotifier
) {
return new AppointmentsService(appointmentsRepository, usersRepository, appointmentsNotifier);
return
new AppointmentsService(appointmentsRepository, usersRepository, appointmentsNotifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.jaguililla.appointments.domain.AppointmentsNotifier;
import com.github.jaguililla.appointments.domain.Event;
import com.github.jaguililla.appointments.domain.model.Appointment;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
Expand Down Expand Up @@ -33,16 +34,24 @@ public KafkaTemplateAppointmentsNotifier(
public void notify(final Event event, final Appointment appointment) {
final var message = event == Event.CREATED ? createMessage : deleteMessage;

kafkaTemplate
.send(notifierTopic, message.formatted(appointment.start()))
.whenComplete((result, e) -> {
if (e == null) {
final var metadata = result.getRecordMetadata();
LOGGER.info("Message: '{}' offset: {}", message, metadata.offset());
}
else {
LOGGER.info("Message: '{}' FAILED due to: {}", message, e.getMessage());
}
});
try {
kafkaTemplate
.send(notifierTopic, message.formatted(appointment.start()))
.whenComplete((result, e) -> {
if (e == null) {
final var metadata = result.getRecordMetadata();
LOGGER.info("Message: '{}' offset: {}", message, metadata.offset());
}
else {
LOGGER.info("Message: '{}' FAILED due to: {}", message, e.getMessage());
}
})
.get();
}
catch (InterruptedException | ExecutionException e) {
var id = appointment.id();
var errorMessage = "Error sending notification for appointment: %s".formatted(id);
throw new IllegalStateException(errorMessage, e);
}
}
}
4 changes: 3 additions & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ spring:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # The consumer is only used by the integration tests; 'auto-offset-reset' is REQUIRED so they can read previously published messages
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: group
group-id: tests
auto-offset-reset: earliest
27 changes: 12 additions & 15 deletions src/test/java/com/github/jaguililla/appointments/ApplicationIT.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.jaguililla.appointments;

import static java.util.Objects.requireNonNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
Expand All @@ -9,14 +8,14 @@
import com.github.jaguililla.appointments.http.controllers.messages.AppointmentResponse;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
Expand All @@ -36,8 +35,6 @@ class ApplicationIT {

private final TestTemplate client;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ConsumerFactory<String, String> consumerFactory;

ApplicationIT(@LocalServerPort final int portTest) {
Expand Down Expand Up @@ -88,29 +85,29 @@ void existing_appointments_can_be_fetched() {

@Test
void appointments_can_be_created_read_and_deleted() {
try (var consumer = consumerFactory.createConsumer()) {
consumer.subscribe(List.of("appointments"));
for (var r : consumer.poll(Duration.ZERO)) {

}
}

client.post("/appointments", new AppointmentRequest()
.id(UUID.randomUUID())
.startTimestamp(LocalDateTime.now())
.endTimestamp(LocalDateTime.now())
);
var response = client.getResponseBody(AppointmentResponse.class);
assertEquals(200, client.getResponseStatus().value());
var creationMessage = requireNonNull(kafkaTemplate.receive("appointments", 0, 0));
assertTrue(creationMessage.value().startsWith("Appointment created at"));
assertTrue(getLastMessage().startsWith("Appointment created at"));
client.get("/appointments/" + response.getId());
assertEquals(200, client.getResponseStatus().value());
client.delete("/appointments/" + response.getId());
assertEquals(200, client.getResponseStatus().value());
var deletionMessage = requireNonNull(kafkaTemplate.receive("appointments", 0, 1));
assertTrue(deletionMessage.value().startsWith("Appointment deleted at"));
assertTrue(getLastMessage().startsWith("Appointment deleted at"));
client.delete("/appointments/" + response.getId());
assertEquals(404, client.getResponseStatus().value());
}

private String getLastMessage() {
try (var consumer = consumerFactory.createConsumer()) {
consumer.assign(List.of(new TopicPartition("appointments", 0)));
var record = consumer.poll(Duration.ofMillis(250)).iterator().next().value();
consumer.commitSync();
return record;
}
}
}

0 comments on commit 5afaa07

Please sign in to comment.