
Update kafka-clients and embedded-kafka to 4.0.0 #1504

Merged: 14 commits, Apr 21, 2025

2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -80,9 +80,9 @@ jobs:
fail-fast: false
matrix:
java:
- '11'
- '17'
- '21'
- '24'
steps:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
5 changes: 3 additions & 2 deletions build.sbt
@@ -28,8 +28,8 @@ lazy val binCompatVersionToCompare =
compatVersion
}

lazy val kafkaVersion = "3.9.0"
lazy val embeddedKafkaVersion = "3.9.0" // Should be the same as kafkaVersion, except for the patch part
lazy val kafkaVersion = "4.0.0"
lazy val embeddedKafkaVersion = "4.0.1" // Should be the same as kafkaVersion, except for the patch part

lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.18"
@@ -49,6 +49,7 @@ inThisBuild(
scala3 := _scala3,
// We only support Scala 2.13+ and 3+. See https://github.com/zio/zio-kafka/releases/tag/v2.7.0
crossScalaVersions := List(scala213.value, scala3.value),
ciTargetJavaVersions := List("17", "21", "24"),
ciEnabledBranches := Seq("master", "series/0.x"),
useCoursier := false,
Test / parallelExecution := false,
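For downstream projects tracking this upgrade, the same versions can be pinned directly. A minimal sbt sketch; the zio-kafka version is a placeholder and the embedded-kafka coordinates are an assumption, while the kafka-clients coordinates and the 4.0.0 / 4.0.1 versions come from the diff above:

```scala
// Sketch only: "<zio-kafka-version>" is a placeholder and the embedded-kafka
// coordinates are assumed; kafka-clients matches the kafkaVersion bump above.
libraryDependencies ++= Seq(
  "dev.zio"                 %% "zio-kafka"      % "<zio-kafka-version>",
  "org.apache.kafka"         % "kafka-clients"  % "4.0.0",
  "io.github.embeddedkafka" %% "embedded-kafka" % "4.0.1" % Test
)
```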
@@ -4,6 +4,7 @@ import org.openjdk.jmh.annotations._
import zio.kafka.admin.AdminClient.NewTopic
import zio.kafka.bench.ZioBenchmark.randomThing
import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, Subscription }
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils
@@ -25,8 +26,10 @@ class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka] {
adminClient <- KafkaTestUtils.makeAdminClient
_ <- adminClient.deleteTopic(topic1).ignore
_ <- adminClient.createTopic(NewTopic(topic1, partitionCount, replicationFactor = 1))
producer <- KafkaTestUtils.makeProducer
_ <- KafkaTestUtils.produceMany(producer, topic1, kvs)
// Our tests run too short for linger to have a positive influence: set it to zero.
settings <- KafkaTestUtils.producerSettings.map(_.withLinger(0.millis))
producer <- Producer.make(settings)
_ <- KafkaTestUtils.produceMany(producer, topic1, kvs)
} yield ()
}

@@ -20,7 +20,14 @@ class ZioKafkaProducerBenchmark extends ProducerZioBenchmark[Kafka with Producer
})

override protected def bootstrap: ZLayer[Any, Nothing, Kafka with Producer] =
ZLayer.make[Kafka with Producer](Kafka.embedded, KafkaTestUtils.producer).orDie
ZLayer
.make[Kafka with Producer](
Kafka.embedded,
// Our tests run too short for linger to have a positive influence: set it to zero.
ZLayer.fromZIO(KafkaTestUtils.producerSettings.map(_.withLinger(0.millis))),
Producer.live
)
.orDie

override def initialize: ZIO[Kafka & Producer, Throwable, Any] =
ZIO.scoped {
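Both benchmark changes above follow the same pattern: build the producer from settings with linger set to zero, so that short benchmark runs are not skewed by batching delays. A minimal, self-contained sketch of such a layer; the broker address is a placeholder, while `withLinger` and `Producer.make` are used exactly as in the diffs:

```scala
import zio._
import zio.kafka.producer.{ Producer, ProducerSettings }

// Sketch: in the benchmarks the settings come from KafkaTestUtils.producerSettings;
// here a placeholder bootstrap server is used instead.
val lingerFreeProducer: ZLayer[Any, Throwable, Producer] =
  ZLayer.scoped {
    Producer.make(
      ProducerSettings(List("localhost:9092")).withLinger(0.millis)
    )
  }
```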
@@ -19,7 +19,7 @@ object MyKafka {
customBrokerProperties = Map(
"group.min.session.timeout.ms" -> "500",
"group.initial.rebalance.delay.ms" -> "0",
"authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
"authorizer.class.name" -> "org.apache.kafka.metadata.authorizer.StandardAuthorizer",
"super.users" -> "User:ANONYMOUS"
)
)
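Kafka 4.0 brokers run KRaft only, so the ZooKeeper-era `kafka.security.authorizer.AclAuthorizer` is swapped above for `org.apache.kafka.metadata.authorizer.StandardAuthorizer`. A hedged sketch of an embedded-kafka config carrying the same broker properties; only the two properties come from the diff, the import path and the rest of the `EmbeddedKafkaConfig` shape are assumptions:

```scala
import io.github.embeddedkafka.EmbeddedKafkaConfig

// Assumption: EmbeddedKafkaConfig accepts customBrokerProperties as a named
// parameter with defaults for its remaining settings, as in MyKafka above.
val aclEnabledConfig: EmbeddedKafkaConfig =
  EmbeddedKafkaConfig(
    customBrokerProperties = Map(
      "authorizer.class.name" -> "org.apache.kafka.metadata.authorizer.StandardAuthorizer",
      "super.users"           -> "User:ANONYMOUS"
    )
  )
```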
79 changes: 4 additions & 75 deletions zio-kafka-test/src/test/scala/zio/kafka/admin/AdminSpec.scala
@@ -13,8 +13,7 @@ import zio.kafka.admin.AdminClient.{
ConfigResource,
ConfigResourceType,
ConsumerGroupDescription,
ConsumerGroupState,
KafkaConfig,
GroupState,
ListConsumerGroupOffsetsSpec,
ListConsumerGroupsOptions,
OffsetAndMetadata,
@@ -280,7 +279,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
groupId <- randomGroup
_ <- consumeNoop(topicName, groupId, "consumer1", Some("instance1")).fork
_ <- getStableConsumerGroupDescription(client, groupId)
list <- client.listConsumerGroups(Some(ListConsumerGroupsOptions(Set(ConsumerGroupState.Stable))))
list <- client.listConsumerGroups(Some(ListConsumerGroupsOptions(Set(GroupState.Stable))))
} yield assert(list)(exists(hasField("groupId", _.groupId, equalTo(groupId))))
},
test("list consumer group offsets") {
@@ -518,76 +517,6 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
assert(deleteResult)(equalTo(Map((configResource, ()))))
}
},
test("alter configs") {
for {
topicName <- randomTopic
client <- KafkaTestUtils.makeAdminClient
_ <- client.createTopic(AdminClient.NewTopic(topicName, numPartitions = 1, replicationFactor = 1))

configEntry = new ConfigEntry("retention.ms", "1")
configResource = ConfigResource(ConfigResourceType.Topic, topicName)

kafkaConfig = KafkaConfig(Map(topicName -> configEntry))
_ <- client.alterConfigs(Map(configResource -> kafkaConfig), AlterConfigsOptions())
updatedConfigsWithUpdate <- client.describeConfigs(Seq(ConfigResource(ConfigResourceType.Topic, topicName)))

emptyKafkaConfig = KafkaConfig(Map.empty[String, ConfigEntry])
_ <- client.alterConfigs(Map(configResource -> emptyKafkaConfig), AlterConfigsOptions())
updatedConfigsWithDelete <- client.describeConfigs(Seq(ConfigResource(ConfigResourceType.Topic, topicName)))
} yield {
val updatedRetentionMsConfig =
updatedConfigsWithUpdate.get(configResource).flatMap(_.entries.get("retention.ms"))
val deleteRetentionMsConfig =
updatedConfigsWithDelete.get(configResource).flatMap(_.entries.get("retention.ms"))
assert(updatedRetentionMsConfig.map(_.value()))(isSome(equalTo("1"))) &&
assert(updatedRetentionMsConfig.map(_.source()))(isSome(equalTo(ConfigSource.DYNAMIC_TOPIC_CONFIG))) &&
assert(deleteRetentionMsConfig.map(_.value()))(isSome(equalTo("604800000"))) &&
assert(deleteRetentionMsConfig.map(_.source()))(isSome(equalTo(ConfigSource.DEFAULT_CONFIG)))
}
},
test("alter configs async") {
for {
topicName <- randomTopic
client <- KafkaTestUtils.makeAdminClient
_ <- client.createTopic(AdminClient.NewTopic(topicName, numPartitions = 1, replicationFactor = 1))

configEntry = new ConfigEntry("retention.ms", "1")
configResource = ConfigResource(ConfigResourceType.Topic, topicName)

kafkaConfig = KafkaConfig(Map(topicName -> configEntry))
setResult <-
client
.alterConfigsAsync(Map(configResource -> kafkaConfig), AlterConfigsOptions())
.flatMap { configsAsync =>
ZIO.foreachPar(configsAsync) { case (configResource, unitAsync) =>
unitAsync.map(unit => (configResource, unit))
}
}
updatedConfigsWithUpdate <- client.describeConfigs(Seq(ConfigResource(ConfigResourceType.Topic, topicName)))

emptyKafkaConfig = KafkaConfig(Map.empty[String, ConfigEntry])
deleteResult <-
client
.alterConfigsAsync(Map(configResource -> emptyKafkaConfig), AlterConfigsOptions())
.flatMap { configsAsync =>
ZIO.foreachPar(configsAsync) { case (configResource, unitAsync) =>
unitAsync.map(unit => (configResource, unit))
}
}
updatedConfigsWithDelete <- client.describeConfigs(Seq(ConfigResource(ConfigResourceType.Topic, topicName)))
} yield {
val updatedRetentionMsConfig =
updatedConfigsWithUpdate.get(configResource).flatMap(_.entries.get("retention.ms"))
val deleteRetentionMsConfig =
updatedConfigsWithDelete.get(configResource).flatMap(_.entries.get("retention.ms"))
assert(updatedRetentionMsConfig.map(_.value()))(isSome(equalTo("1"))) &&
assert(updatedRetentionMsConfig.map(_.source()))(isSome(equalTo(ConfigSource.DYNAMIC_TOPIC_CONFIG))) &&
assert(deleteRetentionMsConfig.map(_.value()))(isSome(equalTo("604800000"))) &&
assert(deleteRetentionMsConfig.map(_.source()))(isSome(equalTo(ConfigSource.DEFAULT_CONFIG))) &&
assert(setResult)(equalTo(Map((configResource, ())))) &&
assert(deleteResult)(equalTo(Map((configResource, ()))))
}
},
test("ACLs") {
for {
client <- KafkaTestUtils.makeAdminClient
@@ -650,11 +579,11 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.repeat(
(Schedule.recurs(5) && Schedule.fixed(Duration.fromMillis(500)) && Schedule
.recurUntil[ConsumerGroupDescription](
_.state == AdminClient.ConsumerGroupState.Stable
_.state == AdminClient.GroupState.Stable
)).map(_._3)
)
.flatMap(desc =>
if (desc.state == AdminClient.ConsumerGroupState.Stable) {
if (desc.state == AdminClient.GroupState.Stable) {
ZIO.succeed(desc)
} else {
ZIO.fail(new IllegalStateException(s"Client is not in stable state: $desc"))
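The bulk of the AdminSpec changes above is the rename from `ConsumerGroupState` to `GroupState` in the admin API, plus the removal of the `alterConfigs` tests. A small illustrative sketch of the renamed filter; the `stableGroupIds` helper is hypothetical, but the `listConsumerGroups` call mirrors the spec above:

```scala
import zio.kafka.admin.AdminClient
import zio.kafka.admin.AdminClient.{ GroupState, ListConsumerGroupsOptions }

// Hypothetical helper: list only the consumer groups currently in the Stable state.
def stableGroupIds(client: AdminClient) =
  client
    .listConsumerGroups(Some(ListConsumerGroupsOptions(Set(GroupState.Stable))))
    .map(_.map(_.groupId))
```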
@@ -1,6 +1,6 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.{ MockConsumer, OffsetAndMetadata, OffsetCommitCallback, OffsetResetStrategy }
import org.apache.kafka.clients.consumer.{ MockConsumer, OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio.test._
@@ -199,7 +199,7 @@ object CommitterSpec extends ZIOSpecDefault {
onCommitAsync: Map[TopicPartition, OffsetAndMetadata] => Task[Map[TopicPartition, OffsetAndMetadata]]
): ZIO[Any, Nothing, MockConsumer[Array[Byte], Array[Byte]]] =
ZIO.runtime[Any].map { runtime =>
new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.LATEST) {
new MockConsumer[Array[Byte], Array[Byte]]("latest") {
override def commitAsync(
offsets: JavaMap[TopicPartition, OffsetAndMetadata],
callback: OffsetCommitCallback
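The `OffsetResetStrategy` import disappears above because the mock consumers are now constructed with the `auto.offset.reset` value as a plain string, as in all of the updated specs. A minimal sketch:

```scala
import org.apache.kafka.clients.consumer.MockConsumer

// Sketch: a byte-array MockConsumer with "latest" offset reset, matching the
// constructor calls in CommitterSpec, RebalanceCoordinatorSpec and RunloopSpec.
val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]]("latest")
```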
@@ -45,7 +45,7 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
test("should track assigned, revoked and lost partitions") {
for {
lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None)
consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {}
consumer = new BinaryMockConsumer("latest") {}
tp = new TopicPartition("topic", 0)
tp2 = new TopicPartition("topic", 1)
tp3 = new TopicPartition("topic", 2)
@@ -81,7 +81,7 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
test("should end streams for revoked and lost partitions") {
for {
lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None)
consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {}
consumer = new BinaryMockConsumer("latest") {}
tp = new TopicPartition("topic", 0)
tp2 = new TopicPartition("topic", 1)
tp3 = new TopicPartition("topic", 2)
@@ -154,7 +154,7 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
// - Assert that onRevoked takes its time, waiting for the configured max rebalance duration
for {
lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None)
consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {}
consumer = new BinaryMockConsumer("latest") {}
tp = new TopicPartition("topic", 0)
streamControl <- makeStreamControl(tp)
recordCount = 3
@@ -195,7 +195,7 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
// - Assert that onRevoked completes immediately, meaning that it does not wait for the stream to commit
for {
lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None)
consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {}
consumer = new BinaryMockConsumer("latest") {}
tp = new TopicPartition("topic", 0)
streamControl <- makeStreamControl(tp)
recordCount = 3
@@ -314,7 +314,7 @@ abstract private class MockCommitter extends Committer {
private class CommitTrackingMockConsumer(
runtime: Runtime[Any],
lastCommittedOffsets: Ref[Map[TopicPartition, Long]]
) extends MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.LATEST) {
) extends MockConsumer[Array[Byte], Array[Byte]]("latest") {
override def commitAsync(
offsets: JavaMap[TopicPartition, OffsetAndMetadata],
callback: OffsetCommitCallback
@@ -1,11 +1,6 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.{
ConsumerRebalanceListener,
ConsumerRecord,
MockConsumer,
OffsetResetStrategy
}
import org.apache.kafka.clients.consumer.{ ConsumerRebalanceListener, ConsumerRecord, MockConsumer }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import zio._
@@ -109,7 +104,7 @@ object RunloopSpec extends ZIOSpecDefaultSlf4j {
var rebalanceListener: ConsumerRebalanceListener = null

// Catches the rebalance listener so we can use it
val mockConsumer: BinaryMockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {
val mockConsumer: BinaryMockConsumer = new BinaryMockConsumer("latest") {
override def subscribe(
topics: util.Collection[String],
listener: ConsumerRebalanceListener
@@ -191,7 +186,7 @@ object RunloopSpec extends ZIOSpecDefaultSlf4j {

private def withRunloop(
diagnostics: ConsumerDiagnostics = Diagnostics.NoOp,
mockConsumer: BinaryMockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST)
mockConsumer: BinaryMockConsumer = new BinaryMockConsumer("latest")
)(
f: (BinaryMockConsumer, PartitionsHub, Runloop) => ZIO[Scope, Throwable, TestResult]
): ZIO[Scope, Throwable, TestResult] =
@@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetad
import org.apache.kafka.clients.producer.{ Callback, ProducerRecord, RecordMetadata }
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition, Uuid }
import org.apache.kafka.clients.producer.{ Producer => KafkaProducer }
import org.apache.kafka.common.metrics.KafkaMetric
import zio._

import java.time.Duration
@@ -249,11 +250,6 @@ class NotSupportedProducer[K, V] extends KafkaProducer[K, V] {

override def beginTransaction(): Unit = throw new UnsupportedOperationException()

override def sendOffsetsToTransaction(
offsets: JMap[TopicPartition, OffsetAndMetadata],
consumerGroupId: String
): Unit = throw new UnsupportedOperationException()

override def sendOffsetsToTransaction(
offsets: JMap[TopicPartition, OffsetAndMetadata],
groupMetadata: ConsumerGroupMetadata
@@ -279,4 +275,8 @@ class NotSupportedProducer[K, V] extends KafkaProducer[K, V] {
override def close(): Unit = throw new UnsupportedOperationException()

override def close(timeout: Duration): Unit = throw new UnsupportedOperationException()

override def registerMetricForSubscription(metric: KafkaMetric): Unit = throw new UnsupportedOperationException()

override def unregisterMetricFromSubscription(metric: KafkaMetric): Unit = throw new UnsupportedOperationException()
}
25 changes: 22 additions & 3 deletions zio-kafka-testkit/src/main/resources/README.md
@@ -1,8 +1,27 @@
# Keystore and truststore files generation

The files:
The following files are used in the testkit. They were generated thanks to https://github.com/confluentinc/confluent-platform-security-tools/blob/master/kafka-generate-ssl.sh:

- `keystore/kafka.keystore.jks`
- View with `keytool -list -v -keystore keystore/kafka.keystore.jks`
- Password: `123456`
- Entry 1
- alias: `caroot`
- owner: `O=Internet Widgits Pty Ltd, ST=Some-State, C=AU`
- valid until 2033-01-16
- Entry 2
- alias: `localhost`
- owner: `CN=localhost, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown`
- valid until 2033-01-16
- `truststore/kafka.truststore.jks`
- View with `keytool -list -v -keystore truststore/kafka.truststore.jks`
- Password: `123456`
- Keystore type: `PKCS12`
- Entry 1
- alias: `caroot`
- Owner: `O=Internet Widgits Pty Ltd, ST=Some-State, C=AU`
- valid until 2033-01-16
- `truststore/ca-key`

were generated thanks to https://github.com/confluentinc/confluent-platform-security-tools/blob/master/kafka-generate-ssl.sh
- View with `openssl rsa -in truststore/ca-key -text`
- RSA certificate
- Password: `123456`