Skip to content

Commit

Permalink
Added Kip62 polling config parameters (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
leandrob13 authored and Avasil committed Feb 1, 2018
1 parent 9fde038 commit 2617026
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
7 changes: 5 additions & 2 deletions kafka-1.0.x/src/main/resources/monix/kafka/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@ kafka {
group.id = ""
heartbeat.interval.ms = 3000
max.partition.fetch.bytes = 1048576
session.timeout.ms = 30000
auto.offset.reset = "latest"
enable.auto.commit = true
exclude.internal.topics = true
max.poll.records = 2147483647
receive.buffer.bytes = 65536
check.crcs = true
fetch.max.wait.ms = 500
# Default values for polling
# See https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
session.timeout.ms = 10000
max.poll.records = 500
max.poll.interval.ms = 300000

# Monix specific settings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ final case class KafkaConsumerConfig(
enableAutoCommit: Boolean,
excludeInternalTopics: Boolean,
maxPollRecords: Int,
maxPollInterval: FiniteDuration,
receiveBufferInBytes: Int,
requestTimeout: FiniteDuration,
saslKerberosServiceName: Option[String],
Expand Down Expand Up @@ -255,6 +256,7 @@ final case class KafkaConsumerConfig(
"enable.auto.commit" -> enableAutoCommit.toString,
"exclude.internal.topics" -> excludeInternalTopics.toString,
"max.poll.records" -> maxPollRecords.toString,
"max.poll.interval.ms" -> maxPollInterval.toMillis.toString,
"receive.buffer.bytes" -> receiveBufferInBytes.toString,
"request.timeout.ms" -> requestTimeout.toMillis.toString,
"sasl.kerberos.service.name" -> saslKerberosServiceName.orNull,
Expand Down Expand Up @@ -390,6 +392,7 @@ object KafkaConsumerConfig {
enableAutoCommit = config.getBoolean("enable.auto.commit"),
excludeInternalTopics = config.getBoolean("exclude.internal.topics"),
maxPollRecords = config.getInt("max.poll.records"),
maxPollInterval = config.getInt("max.poll.interval.ms").millis,
receiveBufferInBytes = config.getInt("receive.buffer.bytes"),
requestTimeout = config.getInt("request.timeout.ms").millis,
saslKerberosServiceName = getOptString("sasl.kerberos.service.name"),
Expand Down

0 comments on commit 2617026

Please sign in to comment.