This repository has been archived by the owner on Jan 13, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
initial stab at new connector for kinesis streams #58 #102
Merged
adrianfilip
merged 2 commits into
zio-archive:master
from
rbraley:kinesis_data_streams_issue_58
Nov 17, 2022
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
27 changes: 27 additions & 0 deletions
27
...connector/src/main/scala/zio/connect/kinesisdatastreams/KinesisDataStreamsConnector.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package zio.connect.kinesisdatastreams | ||
|
||
import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{KinesisDataStreamsException, ProducerRecord} | ||
import zio.prelude.Newtype | ||
import zio.stream.ZSink | ||
import zio.{Chunk, Trace} | ||
|
||
trait KinesisDataStreamsConnector[T] { | ||
|
||
def sinkChunked(implicit | ||
trace: Trace | ||
): ZSink[Any, KinesisDataStreamsException, Chunk[ProducerRecord[T]], Nothing, Unit] | ||
} | ||
|
||
object KinesisDataStreamsConnector { | ||
|
||
object StreamName extends Newtype[String] | ||
type StreamName = StreamName.Type | ||
|
||
object PartitionKey extends Newtype[String] | ||
type PartitionKey = PartitionKey.Type | ||
|
||
final case class ProducerRecord[T](partitionKey: PartitionKey, data: T) | ||
|
||
case class KinesisDataStreamsException(reason: Throwable) | ||
|
||
} |
24 changes: 24 additions & 0 deletions
24
...ector/src/main/scala/zio/connect/kinesisdatastreams/LiveKinesisDataStreamsConnector.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package zio.connect.kinesisdatastreams | ||
import izumi.reflect.Tag | ||
import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{KinesisDataStreamsException, ProducerRecord} | ||
import zio.stream.ZSink | ||
import zio.{Chunk, Trace, ZIO, ZLayer} | ||
import nl.vroste.zio.kinesis.client.Producer | ||
|
||
final case class LiveKinesisDataStreamsConnector[T](producer: Producer[T]) extends KinesisDataStreamsConnector[T] { | ||
override def sinkChunked(implicit | ||
trace: Trace | ||
): ZSink[Any, KinesisDataStreamsException, Chunk[KinesisDataStreamsConnector.ProducerRecord[T]], Nothing, Unit] = | ||
producer.sinkChunked | ||
.mapError(e => KinesisDataStreamsException.apply(e)) | ||
.contramap[Chunk[ProducerRecord[T]]](chunk => | ||
chunk.map(record => nl.vroste.zio.kinesis.client.ProducerRecord[T](record.partitionKey.toString, record.data)) | ||
) | ||
} | ||
|
||
object LiveKinesisDataStreamsConnector { | ||
|
||
def layer[T](implicit tag: Tag[T]): ZLayer[Producer[T], Nothing, LiveKinesisDataStreamsConnector[T]] = | ||
ZLayer.fromZIO(ZIO.service[Producer[T]].map(prod => LiveKinesisDataStreamsConnector[T](prod))) | ||
|
||
} |
54 changes: 54 additions & 0 deletions
54
...ector/src/main/scala/zio/connect/kinesisdatastreams/TestKinesisDataStreamsConnector.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package zio.connect.kinesisdatastreams | ||
|
||
import izumi.reflect.Tag | ||
import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{ | ||
KinesisDataStreamsException, | ||
PartitionKey, | ||
ProducerRecord | ||
} | ||
import zio.connect.kinesisdatastreams.TestKinesisDataStreamsConnector.TestKinesisDataStream | ||
import zio.stm.{STM, TRef, ZSTM} | ||
import zio.stream.ZSink | ||
import zio.{Chunk, Trace, ZIO, ZLayer} | ||
|
||
private[kinesisdatastreams] final case class TestKinesisDataStreamsConnector[T]( | ||
kinesisDataStream: TestKinesisDataStream[T] | ||
) extends KinesisDataStreamsConnector[T] { | ||
|
||
override def sinkChunked(implicit | ||
trace: Trace | ||
): ZSink[Any, KinesisDataStreamsException, Chunk[ProducerRecord[T]], Nothing, Unit] = | ||
ZSink.foreach(records => kinesisDataStream.write(records)) | ||
|
||
} | ||
|
||
object TestKinesisDataStreamsConnector { | ||
|
||
def layer[T](implicit tag: Tag[T]): ZLayer[Any, Nothing, TestKinesisDataStreamsConnector[T]] = | ||
ZLayer.fromZIO(STM.atomically { | ||
for { | ||
a <- TRef.make(Map.empty[PartitionKey, Chunk[ProducerRecord[T]]]) | ||
} yield TestKinesisDataStreamsConnector(TestKinesisDataStream[T](a)) | ||
}) | ||
|
||
private[kinesisdatastreams] final case class TestKinesisDataStream[T]( | ||
repo: TRef[Map[PartitionKey, Chunk[ProducerRecord[T]]]] | ||
) { | ||
def write(records: Chunk[ProducerRecord[T]])(implicit | ||
trace: Trace | ||
): ZIO[Any, Nothing, Unit] = | ||
ZSTM.atomically { | ||
for { | ||
_ <- ZSTM.foreach(records) { record => | ||
repo.update(r => | ||
r.updated( | ||
record.partitionKey, | ||
r.getOrElse(record.partitionKey, Chunk.empty[ProducerRecord[T]]).appended(record) | ||
) | ||
) | ||
} | ||
} yield () | ||
} | ||
} | ||
|
||
} |
15 changes: 15 additions & 0 deletions
15
...inesis-data-streams-connector/src/main/scala/zio/connect/kinesisdatastreams/package.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package zio.connect | ||
|
||
import izumi.reflect.Tag | ||
import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{KinesisDataStreamsException, ProducerRecord} | ||
import zio.stream.ZSink | ||
import zio.{Chunk, Trace} | ||
|
||
package object kinesisdatastreams { | ||
def sinkChunked[T](implicit | ||
trace: Trace, | ||
tag: Tag[T] | ||
): ZSink[KinesisDataStreamsConnector[T], KinesisDataStreamsException, Chunk[ProducerRecord[T]], Nothing, Unit] = | ||
ZSink.serviceWithSink(_.sinkChunked) | ||
|
||
} |
29 changes: 29 additions & 0 deletions
29
...ector/src/test/scala/zio/connect/kinesisdatastreams/KinesisDataStreamsConnectorSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package zio.connect.kinesisdatastreams | ||
|
||
import zio.connect.kinesisdatastreams.KinesisDataStreamsConnector.{ | ||
KinesisDataStreamsException, | ||
PartitionKey, | ||
ProducerRecord | ||
} | ||
import zio.stream.ZStream | ||
import zio.test.Assertion._ | ||
import zio.test._ | ||
import zio.Chunk | ||
|
||
trait KinesisDataStreamsConnectorSpec extends ZIOSpecDefault { | ||
|
||
val kinesisDataStreamsConnectorSpec = produceRecordSuite | ||
|
||
private lazy val produceRecordSuite: Spec[KinesisDataStreamsConnector[String], KinesisDataStreamsException] = | ||
suite("produceRecords")( | ||
test("kinesis stream is populated by partition key ") { | ||
// val streamName = StreamName(UUID.randomUUID().toString) | ||
val record1 = ProducerRecord(PartitionKey("1"), "Data1") | ||
val record2 = ProducerRecord(PartitionKey("1"), "Data2") | ||
val record3 = ProducerRecord(PartitionKey("2"), "Data2") | ||
for { | ||
result <- ZStream.succeed(Chunk(record1, record2, record3)).run(sinkChunked[String]) | ||
} yield assert(result)(equalTo(result)) | ||
} | ||
) | ||
} |
57 changes: 57 additions & 0 deletions
57
...r/src/test/scala/zio/connect/kinesisdatastreams/LiveKinesisDataStreamsConnectorSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package zio.connect.kinesisdatastreams | ||
|
||
import nl.vroste.zio.kinesis.client.Producer | ||
import nl.vroste.zio.kinesis.client.serde.Serde | ||
import org.testcontainers.containers.localstack.LocalStackContainer | ||
import org.testcontainers.containers.localstack.LocalStackContainer.Service | ||
import org.testcontainers.utility.DockerImageName | ||
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} | ||
import software.amazon.awssdk.regions.Region | ||
import zio.aws.core.config.AwsConfig | ||
import zio.aws.core.httpclient.HttpClient | ||
import zio.aws.netty.NettyHttpClient | ||
import zio.aws.kinesis.Kinesis | ||
import zio.{Scope, ZIO, ZLayer} | ||
|
||
object LiveKinesisDataStreamsConnectorSpec extends KinesisDataStreamsConnectorSpec { | ||
override def spec = | ||
suite("LiveKinesisDataStreamsConnectorSpec")(kinesisDataStreamsConnectorSpec) | ||
.provideSomeShared[Scope]( | ||
localStackContainer, | ||
awsConfig, | ||
producer, | ||
zio.connect.kinesisdatastreams.LiveKinesisDataStreamsConnector.layer[String] | ||
) | ||
|
||
lazy val httpClient: ZLayer[Any, Throwable, HttpClient] = NettyHttpClient.default | ||
lazy val awsConfig: ZLayer[Any, Throwable, AwsConfig] = httpClient >>> AwsConfig.default | ||
|
||
lazy val localStackContainer: ZLayer[Scope, Throwable, LocalStackContainer] = | ||
ZLayer.fromZIO( | ||
ZIO.acquireRelease(ZIO.attempt { | ||
val localstackImage = DockerImageName.parse("localstack/localstack:0.11.3") | ||
val localstack = new LocalStackContainer(localstackImage) | ||
.withServices(Service.KINESIS) | ||
localstack.start() | ||
localstack | ||
})(ls => ZIO.attempt(ls.stop()).orDie) | ||
) | ||
|
||
lazy val producer: ZLayer[AwsConfig with LocalStackContainer with Scope, Throwable, Producer[String]] = | ||
ZLayer | ||
.fromZIO(for { | ||
localstack <- ZIO.service[LocalStackContainer] | ||
p <- Producer | ||
.make("TestStream", Serde.asciiString) | ||
.provideSome[Scope]( | ||
Kinesis.customized( | ||
_.credentialsProvider( | ||
StaticCredentialsProvider | ||
.create(AwsBasicCredentials.create(localstack.getAccessKey, localstack.getSecretKey)) | ||
).region(Region.of(localstack.getRegion)) | ||
.endpointOverride(localstack.getEndpointOverride(Service.KINESIS)) | ||
), | ||
awsConfig | ||
) | ||
} yield p) | ||
} |
10 changes: 10 additions & 0 deletions
10
...r/src/test/scala/zio/connect/kinesisdatastreams/TestKinesisDataStreamsConnectorSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package zio.connect.kinesisdatastreams | ||
|
||
object TestKinesisDataStreamsConnectorSpec extends KinesisDataStreamsConnectorSpec { | ||
|
||
override def spec = | ||
suite("TestKinesisDataStreamsConnectorSpec")(kinesisDataStreamsConnectorSpec).provide( | ||
TestKinesisDataStreamsConnector.layer[String] | ||
) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
import sbt._ | ||
|
||
object KinesisDataStreamsDependencies { | ||
|
||
//required for localstack testcontainer | ||
lazy val `aws-java-sdk-core` = "com.amazonaws" % "aws-java-sdk-core" % "1.12.319" % "test" | ||
|
||
lazy val localstack = "org.testcontainers" % "localstack" % "1.17.5" % "test" | ||
|
||
lazy val `zio-aws-kinesis` = "nl.vroste" %% "zio-kinesis" % "0.30.0" | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👋 zio-kinesis author here.
I think the
sinkChunked
on theProducer
could be changed to a non-chunked version, since ZStreams are chunked under the hood anyway. svroonland/zio-kinesis#791I would recommend to change that here too, to not expose the Chunks in the interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@svroonland I will merge this PR and apply your suggestions in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@svroonland That PR change is not released yet. We will keep an eye on the releases and update it once 0.30.1 is out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, I will try to make a release this week and will let you know here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adrianfilip I just released 0.30.1, should be on Maven soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@svroonland Fantastic. Will upgrade shortly.