Skip to content

Commit

Permalink
zio-archive#92 Kafka Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
tewecske committed Oct 28, 2022
1 parent 1cb4abf commit 46333d1
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 0 deletions.
26 changes: 26 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,32 @@ lazy val s3Connector = project
Test / fork := true
)

lazy val kafkaConnector = project
.in(file("connectors/kafka-connector"))
.settings(stdSettings("zio-connect-kafka"))
.settings(
libraryDependencies ++= Seq(
KafkaDependencies.`zio-kafka`,
KafkaDependencies.`zio-kafka-test-utils`,
zio,
`zio-streams`,
`zio-test`,
`zio-test-sbt`
)
)
.settings(
libraryDependencies ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) if n <= 12 => Seq(`scala-compact-collection`)
case _ => Seq.empty
}
}
)
.settings(
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework"),
Test / fork := true
)

lazy val docs = project
.in(file("zio-connect-docs"))
.settings(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package zio.connect.kafka

import zio.kafka.consumer._
import zio.stream._

trait KafkaConnector {
def read(topic: => String): ZStream[Any, Throwable, CommittableRecord[String, String]]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package zio.connect.kafka

import zio._
import zio.kafka.consumer._
import zio.kafka.serde._
import zio.stream._

case class LiveKafkaConnector(consumer: Consumer) extends KafkaConnector {

def read(topic: => String): ZStream[Any, Throwable, CommittableRecord[String, String]] =
consumer
.subscribeAnd(Subscription.Topics(Set(topic)))
.plainStream(Serde.string, Serde.string)

}


object LiveKafkaConnector {
val layer: ZLayer[Consumer, Nothing, KafkaConnector] =
ZLayer.fromZIO(ZIO.service[Consumer].map(LiveKafkaConnector(_)))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package zio.connect

import zio.kafka.consumer._
import zio.stream._

package object kafka {
def read(topic: => String): ZStream[KafkaConnector, Throwable, CommittableRecord[String, String]] =
ZStream.serviceWithStream(_.read(topic))

val kafkaConnectorLiveLayer = LiveKafkaConnector.layer

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package zio.connect.kafka

/*
import zio.ZLayer
import zio.kafka.KafkaTestUtils._
import zio.kafka.consumer._
import zio.kafka.embedded._
import zio.kafka.producer._
import zio.test.Assertion._
*/
import zio.test._

trait KafkaConnectorSpec extends ZIOSpecDefault{

/*
case class KafkaConnectorTestContext(
tempTopic: String,
tempClient: String,
tempGroup: String
)
val kafkaConnectorSpec: Spec[KafkaConnectorTestContext with KafkaConnector with Kafka with Producer, Throwable] =
readSuite
val tempTopic = "tempTestTopic"
val tempClient = "tempTestClient"
val tempGroup = "tempTestGroup"
val testKey1 = "testKey1"
val testMessage1 = "testMessage1"
private lazy val readSuite =
suite("read")(
test("succeeds") {
val kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i"))
for {
_ <- produceMany(tempTopic, kvs)
records <- read(tempTopic)
.take(5)
.runCollect
kvOut = records.map(r => (r.record.key, r.record.value)).toList
} yield assert(kvOut)(equalTo(kvs))
}.provide(ZLayer.succeed(KafkaConnectorTestContext("a", "b", "c")))
)
*/


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package zio.connect.kafka

import zio._
import zio.kafka.KafkaTestUtils._
import zio.kafka.embedded.Kafka
import zio.test._
import zio.test.Assertion._
import zio.test.TestAspect._

object LiveKafkaConnectorSpec extends KafkaConnectorSpec {

/*
override def spec =
suite("LiveKafkaConnectorSpec")(kafkaConnectorSpec)
.provide(
zio.connect.kafka.kafkaConnectorLiveLayer,
embedded,
KafkaTestUtils.producer,
//ZIO.serviceWithZIO[KafkaConnectorTestContext](ctx => KafkaTestUtils.consumer(ctx.tempClient)
)
*/

val tempTopic = "tempTestTopic"
val tempClient = "tempTestClient"
val tempGroup = "tempTestGroup"
val testKey1 = "testKey1"
val testMessage1 = "testMessage1"

override def spec =
suite("read")(
test("succeeds") {
val kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i"))
for {
_ <- produceMany(tempTopic, kvs)
records <- read(tempTopic)
.take(5)
.runCollect
kvOut = records.map(r => (r.record.key, r.record.value)).toList
} yield assert(kvOut)(equalTo(kvs))
}.provide(
zio.connect.kafka.kafkaConnectorLiveLayer,
consumer("clientId", Some("groupId")),
producer,
Kafka.embedded
)
) @@ withLiveClock @@ timeout(
300.seconds
)
}
9 changes: 9 additions & 0 deletions project/KafkaDependencies.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import sbt._

object KafkaDependencies {

lazy val zioKafkaVersion = "2.0.1"
lazy val `zio-kafka` = "dev.zio" %% "zio-kafka" % zioKafkaVersion
lazy val `zio-kafka-test-utils` = "dev.zio" %% "zio-kafka-test-utils" % zioKafkaVersion % "test"

}

0 comments on commit 46333d1

Please sign in to comment.