Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

initial stab at new connector for kinesis streams #58 #102

Merged
merged 2 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,33 @@ lazy val fileConnector = project
)
.settings(testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework"))

lazy val kinesisDataStreamsConnector = project
.in(file("connectors/kinesis-data-streams-connector"))
.settings(stdSettings("zio-connect-kinesis-data-streams"))
.settings(
libraryDependencies ++= Seq(
KinesisDataStreamsDependencies.`aws-java-sdk-core`,
KinesisDataStreamsDependencies.localstack,
KinesisDataStreamsDependencies.`zio-aws-kinesis`,
zio,
`zio-streams`,
`zio-test`,
`zio-test-sbt`
)
)
.settings(
libraryDependencies ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) if n <= 12 => Seq(`scala-compact-collection`)
case _ => Seq.empty
}
}
)
.settings(
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework"),
Test / fork := true
)

lazy val s3Connector = project
.in(file("connectors/s3-connector"))
.settings(stdSettings("zio-connect-s3"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package zio.connect.kinesisdatastreams

import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{KinesisDataStreamsException, ProducerRecord}
import zio.prelude.Newtype
import zio.stream.ZSink
import zio.{Chunk, Trace}

trait KinesisDataStreamsConnector[T] {

def sinkChunked(implicit

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👋 zio-kinesis author here.

I think the sinkChunked on the Producer could be changed to a non-chunked version, since ZStreams are chunked under the hood anyway. svroonland/zio-kinesis#791

I would recommend to change that here too, to not expose the Chunks in the interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@svroonland I will merge this PR and apply your suggestions in a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@svroonland That PR change is not released yet. We will keep an eye on the releases and update it once 0.30.1 is out.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I will try to make a release this week and will let you know here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adrianfilip I just released 0.30.1, should be on Maven soon.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@svroonland Fantastic. Will upgrade shortly.

trace: Trace
): ZSink[Any, KinesisDataStreamsException, Chunk[ProducerRecord[T]], Nothing, Unit]
}

object KinesisDataStreamsConnector {

object StreamName extends Newtype[String]
type StreamName = StreamName.Type

object PartitionKey extends Newtype[String]
type PartitionKey = PartitionKey.Type

final case class ProducerRecord[T](partitionKey: PartitionKey, data: T)

case class KinesisDataStreamsException(reason: Throwable)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package zio.connect.kinesisdatastreams
import izumi.reflect.Tag
import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{KinesisDataStreamsException, ProducerRecord}
import zio.stream.ZSink
import zio.{Chunk, Trace, ZIO, ZLayer}
import nl.vroste.zio.kinesis.client.Producer

final case class LiveKinesisDataStreamsConnector[T](producer: Producer[T]) extends KinesisDataStreamsConnector[T] {
override def sinkChunked(implicit
trace: Trace
): ZSink[Any, KinesisDataStreamsException, Chunk[KinesisDataStreamsConnector.ProducerRecord[T]], Nothing, Unit] =
producer.sinkChunked
.mapError(e => KinesisDataStreamsException.apply(e))
.contramap[Chunk[ProducerRecord[T]]](chunk =>
chunk.map(record => nl.vroste.zio.kinesis.client.ProducerRecord[T](record.partitionKey.toString, record.data))
)
}

object LiveKinesisDataStreamsConnector {

def layer[T](implicit tag: Tag[T]): ZLayer[Producer[T], Nothing, LiveKinesisDataStreamsConnector[T]] =
ZLayer.fromZIO(ZIO.service[Producer[T]].map(prod => LiveKinesisDataStreamsConnector[T](prod)))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package zio.connect.kinesisdatastreams

import izumi.reflect.Tag
import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{
KinesisDataStreamsException,
PartitionKey,
ProducerRecord
}
import zio.connect.kinesisdatastreams.TestKinesisDataStreamsConnector.TestKinesisDataStream
import zio.stm.{STM, TRef, ZSTM}
import zio.stream.ZSink
import zio.{Chunk, Trace, ZIO, ZLayer}

private[kinesisdatastreams] final case class TestKinesisDataStreamsConnector[T](
kinesisDataStream: TestKinesisDataStream[T]
) extends KinesisDataStreamsConnector[T] {

override def sinkChunked(implicit
trace: Trace
): ZSink[Any, KinesisDataStreamsException, Chunk[ProducerRecord[T]], Nothing, Unit] =
ZSink.foreach(records => kinesisDataStream.write(records))

}

object TestKinesisDataStreamsConnector {

def layer[T](implicit tag: Tag[T]): ZLayer[Any, Nothing, TestKinesisDataStreamsConnector[T]] =
ZLayer.fromZIO(STM.atomically {
for {
a <- TRef.make(Map.empty[PartitionKey, Chunk[ProducerRecord[T]]])
} yield TestKinesisDataStreamsConnector(TestKinesisDataStream[T](a))
})

private[kinesisdatastreams] final case class TestKinesisDataStream[T](
repo: TRef[Map[PartitionKey, Chunk[ProducerRecord[T]]]]
) {
def write(records: Chunk[ProducerRecord[T]])(implicit
trace: Trace
): ZIO[Any, Nothing, Unit] =
ZSTM.atomically {
for {
_ <- ZSTM.foreach(records) { record =>
repo.update(r =>
r.updated(
record.partitionKey,
r.getOrElse(record.partitionKey, Chunk.empty[ProducerRecord[T]]).appended(record)
)
)
}
} yield ()
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package zio.connect

import izumi.reflect.Tag
import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{KinesisDataStreamsException, ProducerRecord}
import zio.stream.ZSink
import zio.{Chunk, Trace}

package object kinesisdatastreams {
def sinkChunked[T](implicit
trace: Trace,
tag: Tag[T]
): ZSink[KinesisDataStreamsConnector[T], KinesisDataStreamsException, Chunk[ProducerRecord[T]], Nothing, Unit] =
ZSink.serviceWithSink(_.sinkChunked)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package zio.connect.kinesisdatastreams

import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{
KinesisDataStreamsException,
PartitionKey,
ProducerRecord
}
import zio.stream.ZStream
import zio.test.Assertion._
import zio.test._
import zio.Chunk

trait KinesisDataStreamsConnectorSpec extends ZIOSpecDefault {

val kinesisDataStreamsConnectorSpec = produceRecordSuite

private lazy val produceRecordSuite: Spec[KinesisDataStreamsConnector[String], KinesisDataStreamsException] =
suite("produceRecords")(
test("kinesis stream is populated by partition key ") {
// val streamName = StreamName(UUID.randomUUID().toString)
val record1 = ProducerRecord(PartitionKey("1"), "Data1")
val record2 = ProducerRecord(PartitionKey("1"), "Data2")
val record3 = ProducerRecord(PartitionKey("2"), "Data2")
for {
result <- ZStream.succeed(Chunk(record1, record2, record3)).run(sinkChunked[String])
} yield assert(result)(equalTo(result))
}
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package zio.connect.kinesisdatastreams

import nl.vroste.zio.kinesis.client.Producer
import nl.vroste.zio.kinesis.client.serde.Serde
import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.containers.localstack.LocalStackContainer.Service
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import zio.aws.core.config.AwsConfig
import zio.aws.core.httpclient.HttpClient
import zio.aws.netty.NettyHttpClient
import zio.aws.kinesis.Kinesis
import zio.{Scope, ZIO, ZLayer}

object LiveKinesisDataStreamsConnectorSpec extends KinesisDataStreamsConnectorSpec {
override def spec =
suite("LiveKinesisDataStreamsConnectorSpec")(kinesisDataStreamsConnectorSpec)
.provideSomeShared[Scope](
localStackContainer,
awsConfig,
producer,
zio.connect.kinesisdatastreams.LiveKinesisDataStreamsConnector.layer[String]
)

lazy val httpClient: ZLayer[Any, Throwable, HttpClient] = NettyHttpClient.default
lazy val awsConfig: ZLayer[Any, Throwable, AwsConfig] = httpClient >>> AwsConfig.default

lazy val localStackContainer: ZLayer[Scope, Throwable, LocalStackContainer] =
ZLayer.fromZIO(
ZIO.acquireRelease(ZIO.attempt {
val localstackImage = DockerImageName.parse("localstack/localstack:0.11.3")
val localstack = new LocalStackContainer(localstackImage)
.withServices(Service.KINESIS)
localstack.start()
localstack
})(ls => ZIO.attempt(ls.stop()).orDie)
)

lazy val producer: ZLayer[AwsConfig with LocalStackContainer with Scope, Throwable, Producer[String]] =
ZLayer
.fromZIO(for {
localstack <- ZIO.service[LocalStackContainer]
p <- Producer
.make("TestStream", Serde.asciiString)
.provideSome[Scope](
Kinesis.customized(
_.credentialsProvider(
StaticCredentialsProvider
.create(AwsBasicCredentials.create(localstack.getAccessKey, localstack.getSecretKey))
).region(Region.of(localstack.getRegion))
.endpointOverride(localstack.getEndpointOverride(Service.KINESIS))
),
awsConfig
)
} yield p)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package zio.connect.kinesisdatastreams

object TestKinesisDataStreamsConnectorSpec extends KinesisDataStreamsConnectorSpec {

override def spec =
suite("TestKinesisDataStreamsConnectorSpec")(kinesisDataStreamsConnectorSpec).provide(
TestKinesisDataStreamsConnector.layer[String]
)

}
11 changes: 11 additions & 0 deletions project/KinesisDataStreamsDependencies.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import sbt._

object KinesisDataStreamsDependencies {

//required for localstack testcontainer
lazy val `aws-java-sdk-core` = "com.amazonaws" % "aws-java-sdk-core" % "1.12.319" % "test"

lazy val localstack = "org.testcontainers" % "localstack" % "1.17.5" % "test"

lazy val `zio-aws-kinesis` = "nl.vroste" %% "zio-kinesis" % "0.30.0"
}