Skip to content

Commit 7dbd1eb

Browse files
committed
Add global extraConfig parameter to KafkaClients
1 parent cc61bf1 commit 7dbd1eb

File tree

10 files changed

+50
-77
lines changed

10 files changed

+50
-77
lines changed

core/src/main/java/com/softwaremill/kmq/KafkaClients.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,18 @@
1212

1313
public class KafkaClients {
1414
private final String bootstrapServers;
15+
private final Map<String, Object> extraGlobalConfig;
1516

1617
public KafkaClients(String bootstrapServers) {
18+
this(bootstrapServers, Collections.emptyMap());
19+
}
20+
21+
/**
22+
* @param extraGlobalConfig Extra Kafka client configuration (e.g. SSL settings) applied to every producer and consumer created by this instance
23+
*/
24+
public KafkaClients(String bootstrapServers, Map<String, Object> extraGlobalConfig) {
1725
this.bootstrapServers = bootstrapServers;
26+
this.extraGlobalConfig = extraGlobalConfig;
1827
}
1928

2029
public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer,
@@ -37,6 +46,9 @@ public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>>
3746
for (Map.Entry<String, Object> extraCfgEntry : extraConfig.entrySet()) {
3847
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
3948
}
49+
for (Map.Entry<String, Object> extraCfgEntry : extraGlobalConfig.entrySet()) {
50+
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
51+
}
4052

4153
return new KafkaProducer<>(props);
4254
}
@@ -60,10 +72,12 @@ public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
6072
if (groupId != null) {
6173
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
6274
}
63-
// extraConfig : configure the kafka parameters (ex: ssl, ...)
6475
for (Map.Entry<String, Object> extraCfgEntry : extraConfig.entrySet()) {
6576
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
6677
}
78+
for (Map.Entry<String, Object> extraCfgEntry : extraGlobalConfig.entrySet()) {
79+
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
80+
}
6781

6882
return new KafkaConsumer<>(props);
6983
}

core/src/main/java/com/softwaremill/kmq/KmqClient.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,24 +43,15 @@ public KmqClient(KmqConfig config, KafkaClients clients,
4343
Class<? extends Deserializer<K>> keyDeserializer,
4444
Class<? extends Deserializer<V>> valueDeserializer,
4545
long msgPollTimeout) {
46-
this(config, clients, keyDeserializer, valueDeserializer, msgPollTimeout, Collections.emptyMap());
47-
}
48-
49-
public KmqClient(KmqConfig config, KafkaClients clients,
50-
Class<? extends Deserializer<K>> keyDeserializer,
51-
Class<? extends Deserializer<V>> valueDeserializer,
52-
long msgPollTimeout, Map<String, Object> extraConfig) {
5346

5447
this.config = config;
5548
this.msgPollTimeout = msgPollTimeout;
5649

50+
this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer);
5751
// Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition.
58-
// Adding the PARTITIONER_CLASS_CONFIG in extraConfig map, if extraConfig is not empty
59-
this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer, extraConfig);
60-
extraConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class);
6152
this.markerProducer = clients.createProducer(
6253
MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
63-
extraConfig);
54+
Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class));
6455

6556
LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
6657
msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));

core/src/main/java/com/softwaremill/kmq/RedeliveryTracker.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@
1414
*/
1515
public class RedeliveryTracker {
1616
public static Closeable start(KafkaClients clients, KmqConfig config) {
17-
return start(clients, config, Collections.emptyMap());
18-
}
19-
20-
public static Closeable start(KafkaClients clients, KmqConfig config, Map<String, Object> extraConfig) {
21-
return RedeliveryActors.start(clients, config, extraConfig);
17+
return RedeliveryActors.start(clients, config);
2218
}
2319
}

core/src/main/scala/com.softwaremill.kmq/redelivery/CommitMarkerOffsetsActor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
1010
import scala.collection.JavaConverters._
1111
import scala.concurrent.duration._
1212

13-
class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients, extraConfig: java.util.Map[String, Object]) extends Actor with StrictLogging {
13+
class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients) extends Actor with StrictLogging {
1414

15-
private val consumer = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], extraConfig)
15+
private val consumer = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
1616

1717
private var toCommit: Map[Partition, Offset] = Map()
1818

core/src/main/scala/com.softwaremill.kmq/redelivery/ConsumeMarkersActor.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
1212

1313
import scala.collection.JavaConverters._
1414

15-
class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig: java.util.Map[String, Object]) extends Actor with StrictLogging {
15+
class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Actor with StrictLogging {
1616

1717
private val OneSecond = 1000L
1818

@@ -28,8 +28,8 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig:
2828
override def preStart(): Unit = {
2929
markerConsumer = clients.createConsumer(config.getRedeliveryConsumerGroupId,
3030
classOf[MarkerKey.MarkerKeyDeserializer],
31-
classOf[MarkerValue.MarkerValueDeserializer], extraConfig)
32-
producer = clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer], extraConfig)
31+
classOf[MarkerValue.MarkerValueDeserializer])
32+
producer = clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer])
3333

3434
setupMarkerConsumer()
3535
setupOffsetCommitting()
@@ -61,7 +61,7 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig:
6161

6262
private def partitionAssigned(p: Partition, endOffset: Offset): Unit = {
6363
val redeliverActorProps = Props(
64-
new RedeliverActor(p, new RetryingRedeliverer(new DefaultRedeliverer(p, producer, config, clients, extraConfig))))
64+
new RedeliverActor(p, new RetryingRedeliverer(new DefaultRedeliverer(p, producer, config, clients))))
6565
.withDispatcher("kmq.redeliver-dispatcher")
6666
val redeliverActor = context.actorOf(
6767
redeliverActorProps,
@@ -74,7 +74,7 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig:
7474

7575
private def setupOffsetCommitting(): Unit = {
7676
commitMarkerOffsetsActor = context.actorOf(
77-
Props(new CommitMarkerOffsetsActor(config.getMarkerTopic, clients, extraConfig)),
77+
Props(new CommitMarkerOffsetsActor(config.getMarkerTopic, clients)),
7878
"commit-marker-offsets")
7979

8080
commitMarkerOffsetsActor ! DoCommit

core/src/main/scala/com.softwaremill.kmq/redelivery/Redeliverer.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@ trait Redeliverer {
2121

2222
class DefaultRedeliverer(
2323
partition: Partition, producer: KafkaProducer[Array[Byte], Array[Byte]],
24-
config: KmqConfig, clients: KafkaClients, extraConfig: java.util.Map[String, Object])
25-
extends Redeliverer with StrictLogging {
24+
config: KmqConfig, clients: KafkaClients) extends Redeliverer with StrictLogging {
2625

2726
private val SendTimeoutSeconds = 60L
2827

2928
private val tp = new TopicPartition(config.getMsgTopic, partition)
3029

3130
private val reader = {
32-
val c = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], extraConfig)
31+
val c = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
3332
c.assign(Collections.singleton(tp))
3433
new SingleOffsetReader(tp, c)
3534
}

core/src/main/scala/com.softwaremill.kmq/redelivery/RedeliveryActors.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,9 @@ import scala.collection.JavaConverters._
1313

1414
object RedeliveryActors extends StrictLogging {
1515
def start(clients: KafkaClients, config: KmqConfig): Closeable = {
16-
start(clients, config)
17-
}
18-
19-
def start(clients: KafkaClients, config: KmqConfig, extraConfig: java.util.Map[String, Object] = Collections.emptyMap()): Closeable = {
2016
val system = ActorSystem("kmq-redelivery")
2117

22-
val consumeMakersActor = system.actorOf(Props(new ConsumeMarkersActor(clients, config, extraConfig)), "consume-markers-actor")
18+
val consumeMakersActor = system.actorOf(Props(new ConsumeMarkersActor(clients, config)), "consume-markers-actor")
2319
consumeMakersActor ! DoConsume
2420

2521
logger.info("Started redelivery actors")

example-java/src/main/java/com/softwaremill/kmq/example/standalone/StandaloneConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,21 @@ class StandaloneConfig {
99
static final KmqConfig KMQ_CONFIG = new KmqConfig("queue", "markers", "kmq_client",
1010
"kmq_redelivery", Duration.ofSeconds(90).toMillis(), 1000);
1111

12+
/* EXAMPLE using extraConfig: SSL encryption & SSL authentication
13+
14+
Map extraConfig = new HashMap();
15+
//configure the following three settings for SSL Encryption
16+
extraConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
17+
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/directory/kafka.client.truststore.jks");
18+
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
19+
20+
// configure the following three settings for SSL Authentication
21+
extraConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/directory/kafka.client.keystore.jks");
22+
extraConfig.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
23+
extraConfig.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
24+
25+
static final KafkaClients KAFKA_CLIENTS = new KafkaClients("localhost:9092", extraConfig);
26+
*/
27+
1228
static final KafkaClients KAFKA_CLIENTS = new KafkaClients("localhost:9092");
1329
}

example-java/src/main/java/com/softwaremill/kmq/example/standalone/StandaloneProcessor.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,47 +3,26 @@
33
import com.softwaremill.kmq.KmqClient;
44
import com.softwaremill.kmq.example.UncaughtExceptionHandling;
55
import org.apache.kafka.clients.consumer.ConsumerRecord;
6-
import org.apache.kafka.clients.CommonClientConfigs;
76
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
8-
import org.apache.kafka.common.config.SslConfigs;
9-
107
import org.slf4j.Logger;
118
import org.slf4j.LoggerFactory;
129

13-
import java.io.IOException;
1410
import java.nio.ByteBuffer;
15-
import java.time.Clock;
1611
import java.util.Map;
17-
import java.util.HashMap;
1812
import java.util.Random;
1913
import java.util.concurrent.ConcurrentHashMap;
2014
import java.util.concurrent.ExecutorService;
2115
import java.util.concurrent.Executors;
2216
import java.util.concurrent.atomic.AtomicInteger;
2317

24-
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.*;
18+
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.KAFKA_CLIENTS;
19+
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.KMQ_CONFIG;
2520

2621
class StandaloneProcessor {
2722
private final static Logger LOG = LoggerFactory.getLogger(StandaloneProcessor.class);
2823

29-
public static void main(String[] args) throws InterruptedException, IOException {
24+
public static void main(String[] args) {
3025
UncaughtExceptionHandling.setup();
31-
32-
/* EXAMPLE with extraConfig : SSL Encryption & SSL Authentication
33-
Map extraConfig = new HashMap();
34-
//configure the following three settings for SSL Encryption
35-
extraConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
36-
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/directory/kafka.client.truststore.jks");
37-
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
38-
39-
// configure the following three settings for SSL Authentication
40-
extraConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/directory/kafka.client.keystore.jks");
41-
extraConfig.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
42-
extraConfig.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
43-
44-
KmqClient<ByteBuffer, ByteBuffer> kmqClient = new KmqClient<>(KMQ_CONFIG, KAFKA_CLIENTS,
45-
ByteBufferDeserializer.class, ByteBufferDeserializer.class, 100, extraConfig);
46-
*/
4726

4827
KmqClient<ByteBuffer, ByteBuffer> kmqClient = new KmqClient<>(KMQ_CONFIG, KAFKA_CLIENTS,
4928
ByteBufferDeserializer.class, ByteBufferDeserializer.class, 100);

example-java/src/main/java/com/softwaremill/kmq/example/standalone/StandaloneRedeliveryTracker.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,20 @@
22

33
import com.softwaremill.kmq.RedeliveryTracker;
44
import com.softwaremill.kmq.example.UncaughtExceptionHandling;
5-
import org.apache.kafka.clients.CommonClientConfigs;
6-
import org.apache.kafka.common.config.SslConfigs;
75
import org.slf4j.Logger;
86
import org.slf4j.LoggerFactory;
97

108
import java.io.Closeable;
119
import java.io.IOException;
12-
import java.util.HashMap;
13-
import java.util.Map;
1410

15-
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.*;
11+
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.KAFKA_CLIENTS;
12+
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.KMQ_CONFIG;
1613

1714
class StandaloneRedeliveryTracker {
1815
private final static Logger LOG = LoggerFactory.getLogger(StandaloneRedeliveryTracker.class);
1916

20-
public static void main(String[] args) throws InterruptedException, IOException {
17+
public static void main(String[] args) throws IOException {
2118
UncaughtExceptionHandling.setup();
22-
23-
/* EXAMPLE with extraConfig : SSL Encryption & SSL Authentication
24-
Map extraConfig = new HashMap();
25-
//configure the following three settings for SSL Encryption
26-
extraConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
27-
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/directory/kafka.client.truststore.jks");
28-
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
29-
30-
// configure the following three settings for SSL Authentication
31-
extraConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/directory/kafka.client.keystore.jks");
32-
extraConfig.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
33-
extraConfig.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
34-
35-
Closeable redelivery = RedeliveryTracker.start(KAFKA_CLIENTS, KMQ_CONFIG, scala.Option.apply(extraConfig));
36-
*/
3719

3820
Closeable redelivery = RedeliveryTracker.start(KAFKA_CLIENTS, KMQ_CONFIG);
3921
LOG.info("Redelivery tracker started");

0 commit comments

Comments
 (0)