From 1a26f9d4c01c3ef1076305d163deababb1e066f7 Mon Sep 17 00:00:00 2001
From: Steven Vroonland
Date: Sun, 3 Nov 2024 20:48:35 +0100
Subject: [PATCH] More specific errors than Throwable for Consumer

Implements #241

The build still fails because the test and bench projects have not been migrated yet; I first want to gather some feedback on this change.

---
 .../main/scala/zio/kafka/example/Main.scala | 3 +-
 .../zio/kafka/testkit/KafkaTestUtils.scala | 12 +-
 .../kafka/consumer/CommittableRecord.scala | 9 +-
 .../scala/zio/kafka/consumer/Consumer.scala | 100 +++++++++++----
 .../zio/kafka/consumer/ConsumerSettings.scala | 6 +-
 .../consumer/InvalidSubscriptionUnion.scala | 6 -
 .../scala/zio/kafka/consumer/Offset.scala | 25 ++--
 .../zio/kafka/consumer/OffsetBatch.scala | 13 +-
 .../diagnostics/DiagnosticEvent.scala | 3 +-
 .../consumer/internal/ConsumerAccess.scala | 6 +-
 .../internal/PartitionStreamControl.scala | 36 +++---
 .../zio/kafka/consumer/internal/Runloop.scala | 119 +++++++++++++-----
 .../consumer/internal/RunloopAccess.scala | 19 +--
 .../consumer/internal/RunloopCommand.scala | 3 +-
 .../scala/zio/kafka/serde/Deserializer.scala | 23 ++--
 .../main/scala/zio/kafka/serde/Serde.scala | 28 +++--
 .../main/scala/zio/kafka/serde/Serdes.scala | 17 ++-
 .../scala/zio/kafka/utils/SslHelper.scala | 2 +-
 18 files changed, 283 insertions(+), 147 deletions(-)
 delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/InvalidSubscriptionUnion.scala

diff --git a/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala b/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
index a4baa348c..36b2ca279 100644
--- a/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
+++ b/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
@@ -2,6 +2,7 @@ package zio.kafka.example
 import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
 import zio._
+import zio.kafka.consumer.Consumer.ConsumerError
 import zio.kafka.consumer.diagnostics.Diagnostics
 import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
 import zio.kafka.serde.Serde
@@ -49,7 +50,7 @@ object Main extends ZIOAppDefault {
 .withGroupId("test")
 }
- private val runConsumerStream: ZIO[Consumer, Throwable, Unit] =
+ private val runConsumerStream: ZIO[Consumer, ConsumerError, Unit] =
 for {
 _ <- ZIO.logInfo("Consuming messages...")
 consumed <- Consumer

diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
index 9d6ad7a8a..8697ca049 100644
--- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
+++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
@@ -4,7 +4,7 @@ import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
 import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
 import zio._
 import zio.kafka.admin._
-import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
+import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, ConsumerError, OffsetRetrieval }
 import zio.kafka.consumer._
 import zio.kafka.consumer.diagnostics.Diagnostics
 import zio.kafka.producer._
@@ -192,7 +192,7 @@ object KafkaTestUtils {
 * Utility function to make a Consumer. It requires a ConsumerSettings layer.
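 *
 * With this change the returned layer fails with `ConsumerError` instead of `Throwable`. A minimal usage sketch (the `ConsumerSettings` layer is assumed to be provided by the test):
 * {{{
 * val consumer: ZLayer[ConsumerSettings, ConsumerError, Consumer] =
 *   KafkaTestUtils.simpleConsumer()
 * }}}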
*/ @deprecated("Use [[KafkaTestUtils.minimalConsumer]] instead", "2.3.1") - def simpleConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, Throwable, Consumer] = + def simpleConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, ConsumerError, Consumer] = ZLayer.makeSome[ConsumerSettings, Consumer]( ZLayer.succeed(diagnostics) >>> Consumer.live ) @@ -203,7 +203,7 @@ object KafkaTestUtils { * "minimal" because, unlike the other functions returning a `ZLayer[..., ..., Consumer]` of this file, you need to * provide the `ConsumerSettings` layer yourself. */ - def minimalConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, Throwable, Consumer] = + def minimalConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, ConsumerError, Consumer] = ZLayer.makeSome[ConsumerSettings, Consumer]( ZLayer.succeed(diagnostics) >>> Consumer.live ) @@ -223,7 +223,7 @@ object KafkaTestUtils { maxRebalanceDuration: Duration = 3.minutes, commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, properties: Map[String, String] = Map.empty - ): ZLayer[Kafka, Throwable, Consumer] = + ): ZLayer[Kafka, ConsumerError, Consumer] = (ZLayer( consumerSettings( clientId = clientId, @@ -253,7 +253,7 @@ object KafkaTestUtils { rebalanceSafeCommits: Boolean = false, properties: Map[String, String] = Map.empty, rebalanceListener: RebalanceListener = RebalanceListener.noop - ): ZLayer[Kafka, Throwable, Consumer] = + ): ZLayer[Kafka, ConsumerError, Consumer] = (ZLayer( transactionalConsumerSettings( groupId = groupId, @@ -274,7 +274,7 @@ object KafkaTestUtils { */ def consumeWithStrings(clientId: String, groupId: Option[String] = None, subscription: Subscription)( r: ConsumerRecord[String, String] => URIO[Any, Unit] - ): RIO[Kafka, Unit] = + ): ZIO[Kafka, ConsumerError, Unit] = consumerSettings(clientId, groupId, None).flatMap { settings => Consumer.consumeWith[Any, Any, String, String]( settings, diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala index d138ea6d8..79ce28ea0 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala @@ -2,18 +2,19 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata } import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.Consumer.{ CommitError, DeserializationError } import zio.kafka.serde.Deserializer -import zio.{ RIO, Task } +import zio.{ IO, ZIO } final case class CommittableRecord[K, V]( record: ConsumerRecord[K, V], - private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], + private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => IO[CommitError, Unit], private val consumerGroupMetadata: Option[ConsumerGroupMetadata] ) { def deserializeWith[R, K1, V1]( keyDeserializer: Deserializer[R, K1], valueDeserializer: Deserializer[R, V1] - )(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): RIO[R, CommittableRecord[K1, V1]] = + )(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): ZIO[R, DeserializationError, CommittableRecord[K1, V1]] = for { key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key()) value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value()) @@ -52,7 +53,7 @@ final case class 
CommittableRecord[K, V]( object CommittableRecord { def apply[K, V]( record: ConsumerRecord[K, V], - commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], + commitHandle: Map[TopicPartition, OffsetAndMetadata] => IO[CommitError, Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata] ): CommittableRecord[K, V] = new CommittableRecord( diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 776e403e8..c5229587c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -7,7 +7,9 @@ import org.apache.kafka.clients.consumer.{ OffsetAndTimestamp } import org.apache.kafka.common._ +import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException } import zio._ +import zio.kafka.consumer.Consumer.{ CommitError, ConsumerError, PartitionStreamError } import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.diagnostics.Diagnostics.ConcurrentDiagnostics @@ -17,7 +19,6 @@ import zio.kafka.utils.SslHelper import zio.stream._ import scala.jdk.CollectionConverters._ -import scala.util.control.NoStackTrace trait Consumer { @@ -68,7 +69,7 @@ trait Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] + ): Stream[ConsumerError, Chunk[(TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])]] /** * Create a stream with messages on the subscribed topic-partitions by topic-partition @@ -91,7 +92,7 @@ trait Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] + ): Stream[ConsumerError, (TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])] /** * Create a stream with all messages on the subscribed topic-partitions @@ -116,7 +117,7 @@ trait Consumer { keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], bufferSize: Int = 4 - ): ZStream[R, Throwable, CommittableRecord[K, V]] + ): ZStream[R, ConsumerError, CommittableRecord[K, V]] /** * Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit @@ -131,10 +132,10 @@ trait Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], - commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) + commitRetryPolicy: Schedule[Any, CommitError, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) )( f: ConsumerRecord[K, V] => URIO[R1, Unit] - ): ZIO[R & R1, Throwable, Unit] + ): ZIO[R & R1, ConsumerError, Unit] /** * Look up the offsets for the given partitions by timestamp. 
The returned offset for each partition is the earliest @@ -161,12 +162,56 @@ trait Consumer { } object Consumer { - case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace + sealed trait ConsumerError { + def message: String + } + + final case class AuthorizationError(cause: AuthorizationException) extends ConsumerError { + override def message: String = cause.getMessage + } + + final case class AuthenticationError(cause: AuthenticationException) extends ConsumerError { + override def message: String = cause.getMessage + } + + final case class SslValidationError(message: String) extends ConsumerError + final case class UnknownConsumerException(cause: Throwable) extends ConsumerError { + override def message: String = cause.getMessage + } + sealed trait SubscriptionError extends ConsumerError + final case class InvalidSubscriptionUnion(subscriptions: Chunk[Subscription]) extends SubscriptionError { + override def message: String = s"Unable to calculate union of subscriptions: ${subscriptions.mkString(",")}" + } + final case class InvalidTopic(topic: String, validationError: String) extends SubscriptionError { + override def message: String = s"Invalid topic '${topic}': ${validationError}" + } + final case class GetManualOffsetsError(cause: Throwable) extends ConsumerError { + override def message: String = s"Unable to retrieve manual offsets: ${cause.getMessage}" + } + sealed trait PartitionStreamError extends ConsumerError + final case class DeserializationError(message: String, cause: Option[Throwable] = None) extends PartitionStreamError + final case class PartitionStreamPullTimeout(topicPartition: TopicPartition, maxPollInterval: Duration) + extends PartitionStreamError { + override def message: String = + s"No records were polled for more than $maxPollInterval for topic partition $topicPartition. " + + "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " + + "needs more time." 
+ } + + sealed trait CommitError extends ConsumerError + object CommitError { + final case object CommitTimeout extends CommitError { + override def message: String = "Commit timed out" + } + final case class UnknownCommitException(cause: Throwable) extends CommitError { + override def message: String = cause.getMessage + } + } val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] = ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _) - def live: RLayer[ConsumerSettings & Diagnostics, Consumer] = + def live: ZLayer[ConsumerSettings & Diagnostics, ConsumerError, Consumer] = ZLayer.scoped { for { settings <- ZIO.service[ConsumerSettings] @@ -185,13 +230,15 @@ object Consumer { def make( settings: ConsumerSettings, diagnostics: Diagnostics = Diagnostics.NoOp - ): ZIO[Scope, Throwable, Consumer] = + ): ZIO[Scope, SslValidationError, Consumer] = for { wrappedDiagnostics <- ConcurrentDiagnostics.make(diagnostics) _ <- ZIO.addFinalizer(wrappedDiagnostics.emit(Finalization.ConsumerFinalized)) - _ <- SslHelper.validateEndpoint(settings.driverSettings) - consumerAccess <- ConsumerAccess.make(settings) - runloopAccess <- RunloopAccess.make(settings, consumerAccess, wrappedDiagnostics) + _ <- SslHelper + .validateEndpoint(settings.driverSettings) + .mapError(e => SslValidationError(e.getMessage)) + consumerAccess <- ConsumerAccess.make(settings) + runloopAccess <- RunloopAccess.make(settings, consumerAccess, wrappedDiagnostics) } yield new ConsumerLive(consumerAccess, runloopAccess) /** @@ -203,7 +250,7 @@ object Consumer { javaConsumer: JConsumer[Array[Byte], Array[Byte]], settings: ConsumerSettings, diagnostics: Diagnostics = Diagnostics.NoOp - ): ZIO[Scope, Throwable, Consumer] = + ): ZIO[Scope, ConsumerError, Consumer] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized)) consumerAccess <- ConsumerAccess.make(javaConsumer) @@ -258,7 +305,9 @@ object Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = + ): ZStream[Consumer, ConsumerError, Chunk[ + (TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]]) + ]] = ZStream.serviceWithStream[Consumer](_.partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer)) /** @@ -270,8 +319,8 @@ object Consumer { valueDeserializer: Deserializer[R, V] ): ZStream[ Consumer, - Throwable, - (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]]) + ConsumerError, + (TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]]) ] = ZStream.serviceWithStream[Consumer](_.partitionedStream(subscription, keyDeserializer, valueDeserializer)) @@ -283,7 +332,7 @@ object Consumer { keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], bufferSize: Int = 4 - ): ZStream[R & Consumer, Throwable, CommittableRecord[K, V]] = + ): ZStream[R & Consumer, ConsumerError, CommittableRecord[K, V]] = ZStream.serviceWithStream[Consumer]( _.plainStream(subscription, keyDeserializer, valueDeserializer, bufferSize) ) @@ -354,7 +403,7 @@ object Consumer { keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) - )(f: ConsumerRecord[K, V] => URIO[R1, Unit]): RIO[R & R1, Unit] = + )(f: ConsumerRecord[K, V] => URIO[R1, Unit]): ZIO[R & R1, ConsumerError, Unit] = ZIO.scoped[R & R1] { 
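 // consumeWith creates its own Consumer below; it lives inside this scope and is released once the effect completes or fails.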
Consumer .make(settings) @@ -435,6 +484,7 @@ private[consumer] final class ConsumerLive private[consumer] ( override def assignment: Task[Set[TopicPartition]] = consumer.withConsumer(_.assignment().asScala.toSet) + // TODO should we change all of these too..? override def beginningOffsets( partitions: Set[TopicPartition], timeout: Duration = Duration.Infinity @@ -469,7 +519,7 @@ private[consumer] final class ConsumerLive private[consumer] ( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = { + ): Stream[ConsumerError, Chunk[(TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])]] = { val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray) ZStream.unwrapScoped { @@ -481,9 +531,9 @@ private[consumer] final class ConsumerLive private[consumer] ( .map { _.collect { case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) => - val stream: ZStream[R, Throwable, CommittableRecord[K, V]] = + val stream: ZStream[R, PartitionStreamError, CommittableRecord[K, V]] = if (onlyByteArraySerdes) - partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]] + partitionStream.asInstanceOf[ZStream[R, PartitionStreamError, CommittableRecord[K, V]]] else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer))) tp -> stream @@ -496,7 +546,7 @@ private[consumer] final class ConsumerLive private[consumer] ( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] = + ): ZStream[Any, ConsumerError, (TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])] = partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks override def plainStream[R, K, V]( @@ -504,7 +554,7 @@ private[consumer] final class ConsumerLive private[consumer] ( keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], bufferSize: Int - ): ZStream[R, Throwable, CommittableRecord[K, V]] = + ): ZStream[R, ConsumerError, CommittableRecord[K, V]] = partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar( n = Int.MaxValue, bufferSize = bufferSize @@ -518,10 +568,10 @@ private[consumer] final class ConsumerLive private[consumer] ( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], - commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) + commitRetryPolicy: Schedule[Any, CommitError, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) )( f: ConsumerRecord[K, V] => URIO[R1, Unit] - ): ZIO[R & R1, Throwable, Unit] = + ): ZIO[R & R1, ConsumerError, Unit] = for { r <- ZIO.environment[R & R1] _ <- partitionedStream(subscription, keyDeserializer, valueDeserializer) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 7e5fc01b5..d5a87472c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -2,7 +2,7 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.ConsumerConfig import zio._ -import 
zio.kafka.consumer.Consumer.OffsetRetrieval
+import zio.kafka.consumer.Consumer.{ ConsumerError, OffsetRetrieval }
 import zio.kafka.consumer.fetch.{ FetchStrategy, QueueSizeBasedFetchStrategy }
 import zio.kafka.security.KafkaCredentialStore
 import zio.metrics.MetricLabel
@@ -31,7 +31,7 @@ final case class ConsumerSettings(
 fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
 metricLabels: Set[MetricLabel] = Set.empty,
 runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
- authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
+ authErrorRetrySchedule: Schedule[Any, ConsumerError, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
 ) {
 // Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType:
 require(
@@ -316,7 +316,7 @@ final case class ConsumerSettings(
 *
 * The default is {{{Schedule.recurs(5) && Schedule.spaced(500.millis)}}}, which is to retry 5 times, spaced by 500 ms.
 */
- def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, Throwable, Any]): ConsumerSettings =
+ def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, ConsumerError, Any]): ConsumerSettings =
 copy(authErrorRetrySchedule = authErrorRetrySchedule)
 }

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/InvalidSubscriptionUnion.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/InvalidSubscriptionUnion.scala
deleted file mode 100644
index 40a5d4759..000000000
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/InvalidSubscriptionUnion.scala
+++ /dev/null
@@ -1,6 +0,0 @@
-package zio.kafka.consumer
-
-import zio.Chunk
-
-final case class InvalidSubscriptionUnion(subscriptions: Chunk[Subscription])
- extends RuntimeException(s"Unable to calculate union of subscriptions: ${subscriptions.mkString(",")}")

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
index 07907e8ff..0dc741651 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
@@ -2,14 +2,15 @@ package zio.kafka.consumer
 import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata, RetriableCommitFailedException }
 import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task }
+import zio.kafka.consumer.Consumer.CommitError
+import zio.{ IO, Schedule, ZIO }

 sealed trait Offset {
 def topic: String
 def partition: Int
 def offset: Long
- def commit: Task[Unit]
+ def commit: IO[CommitError, Unit]
 def batch: OffsetBatch
 def consumerGroupMetadata: Option[ConsumerGroupMetadata]
 def withMetadata(metadata: String): Offset
@@ -21,7 +22,7 @@ sealed trait Offset {
 * Attempts to commit and retries according to the given policy when the commit fails with a
 * RetriableCommitFailedException
 */
- final def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
+ final def commitOrRetry[R](policy: Schedule[R, CommitError, Any]): ZIO[R, CommitError, Unit] =
 Offset.commitOrRetry(commit, policy)

 final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
 }

 object Offset {
 private[consumer] def commitOrRetry[R, B](
- commit: Task[Unit],
- policy: Schedule[R, Throwable, B]
- ): RIO[R, Unit] =
+ commit: IO[CommitError, Unit],
+ policy: Schedule[R, CommitError, B]
+ ): ZIO[R, CommitError, Unit] =
 commit.retry(
- Schedule.recurWhile[Throwable]
{
- case _: RetriableCommitFailedException => true
- case Consumer.CommitTimeout => true
- case _ => false
+ Schedule.recurWhile[CommitError] {
+ case Consumer.CommitError.UnknownCommitException(_: RetriableCommitFailedException) => true
+ case Consumer.CommitError.CommitTimeout => true
+ case _ => false
 } && policy
 )
 }

@@ -45,11 +46,11 @@ private final case class OffsetImpl(
 topic: String,
 partition: Int,
 offset: Long,
- commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
+ commitHandle: Map[TopicPartition, OffsetAndMetadata] => IO[CommitError, Unit],
 consumerGroupMetadata: Option[ConsumerGroupMetadata],
 metadata: Option[String] = None
 ) extends Offset {
- def commit: Task[Unit] = commitHandle(Map(topicPartition -> asJavaOffsetAndMetadata))
+ def commit: IO[CommitError, Unit] = commitHandle(Map(topicPartition -> asJavaOffsetAndMetadata))
 def batch: OffsetBatch = OffsetBatchImpl(
 Map(topicPartition -> asJavaOffsetAndMetadata),
 commitHandle,

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
index 6c4b5a977..93805a14d 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
@@ -2,11 +2,12 @@ package zio.kafka.consumer
 import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task, ZIO }
+import zio.kafka.consumer.Consumer.CommitError
+import zio.{ IO, Schedule, ZIO }

 sealed trait OffsetBatch {
 def offsets: Map[TopicPartition, OffsetAndMetadata]
- def commit: Task[Unit]
+ def commit: IO[CommitError, Unit]
 def add(offset: Offset): OffsetBatch
 @deprecated("Use add(Offset) instead", "2.1.4")
 def merge(offset: Offset): OffsetBatch
@@ -17,7 +18,7 @@ sealed trait OffsetBatch {
 * Attempts to commit and retries according to the given policy when the commit fails with a
 * RetriableCommitFailedException
 */
- def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
+ def commitOrRetry[R](policy: Schedule[R, CommitError, Any]): ZIO[R, CommitError, Unit] =
 Offset.commitOrRetry(commit, policy)
 }
@@ -29,10 +30,10 @@ object OffsetBatch {

 private final case class OffsetBatchImpl(
 offsets: Map[TopicPartition, OffsetAndMetadata],
- commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
+ commitHandle: Map[TopicPartition, OffsetAndMetadata] => IO[CommitError, Unit],
 consumerGroupMetadata: Option[ConsumerGroupMetadata]
 ) extends OffsetBatch {
- override def commit: Task[Unit] = commitHandle(offsets)
+ override def commit: IO[CommitError, Unit] = commitHandle(offsets)
 override def add(offset: Offset): OffsetBatch = {
 val maxOffsetAndMetadata = offsets.get(offset.topicPartition) match {
@@ -64,7 +65,7 @@ private final case class OffsetBatchImpl(
 case object EmptyOffsetBatch extends OffsetBatch {
 override val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty
- override val commit: Task[Unit] = ZIO.unit
+ override val commit: IO[CommitError, Unit] = ZIO.unit
 override def add(offset: Offset): OffsetBatch = offset.batch
 override def merge(offset: Offset): OffsetBatch = add(offset)
 override def merge(offsets: OffsetBatch): OffsetBatch = offsets

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
index ea97c6f80..801d54051 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
+++
b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala @@ -2,6 +2,7 @@ package zio.kafka.consumer.diagnostics import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.Consumer.CommitError sealed trait DiagnosticEvent object DiagnosticEvent { @@ -17,7 +18,7 @@ object DiagnosticEvent { object Commit { final case class Started(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit final case class Success(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit - final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: Throwable) extends Commit + final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: CommitError) extends Commit } final case class Rebalance( diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index 21d1c4e2f..5c46034f5 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -48,7 +48,7 @@ private[consumer] final class ConsumerAccess( private[consumer] object ConsumerAccess { type ByteArrayKafkaConsumer = JConsumer[Array[Byte], Array[Byte]] - def make(settings: ConsumerSettings): ZIO[Scope, Throwable, ConsumerAccess] = + def make(settings: ConsumerSettings): ZIO[Scope, Nothing, ConsumerAccess] = for { consumer <- ZIO.acquireRelease { ZIO.attemptBlocking { @@ -57,14 +57,14 @@ private[consumer] object ConsumerAccess { new ByteArrayDeserializer(), new ByteArrayDeserializer() ) - } + }.orDie } { consumer => ZIO.blocking(ZIO.attempt(consumer.close(settings.closeTimeout))).orDie } result <- make(consumer) } yield result - def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Throwable, ConsumerAccess] = + def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Nothing, ConsumerAccess] = for { access <- Semaphore.make(1) } yield new ConsumerAccess(consumer, access) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 85a83eb19..99a36ff75 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -1,6 +1,7 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.Consumer.PartitionStreamPullTimeout import zio.kafka.consumer.Offset import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.PartitionStreamControl.QueueInfo @@ -8,9 +9,6 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.stream.{ Take, ZStream } import zio.{ Chunk, Clock, Duration, LogAnnotation, Promise, Queue, Ref, UIO, ZIO } -import java.util.concurrent.TimeoutException -import scala.util.control.NoStackTrace - abstract class PartitionStream { def tp: TopicPartition def queueSize: UIO[Int] @@ -36,9 +34,9 @@ abstract class PartitionStream { */ final class PartitionStreamControl private ( val tp: TopicPartition, - stream: ZStream[Any, Throwable, ByteArrayCommittableRecord], - dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]], - interruptionPromise: Promise[Throwable, Nothing], + stream: ZStream[Any, PartitionStreamPullTimeout, 
ByteArrayCommittableRecord],
+ dataQueue: Queue[Take[Nothing, ByteArrayCommittableRecord]],
+ interruptionPromise: Promise[PartitionStreamPullTimeout, Nothing],
 val completedPromise: Promise[Nothing, Option[Offset]],
 queueInfoRef: Ref[QueueInfo],
 maxPollInterval: Duration
@@ -85,13 +83,8 @@ final class PartitionStreamControl private (
 queueInfoRef.get.map(_.deadlineExceeded(now))

 /** To be invoked when the stream is no longer processing. */
- private[internal] def halt: UIO[Boolean] = {
- val timeOutMessage = s"No records were polled for more than $maxPollInterval for topic partition $tp. " +
- "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
- "needs more time."
- val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace
- interruptionPromise.fail(consumeTimeout)
- }
+ private[internal] def halt: UIO[Boolean] =
+ interruptionPromise.fail(PartitionStreamPullTimeout(tp, maxPollInterval))

 /** To be invoked when the partition was lost. It clears the queue and ends the stream. */
 private[internal] def lost: UIO[Unit] =
@@ -119,7 +112,8 @@ final class PartitionStreamControl private (
 private[internal] def isRunning: ZIO[Any, Nothing, Boolean] = isCompleted.negate

- private[internal] val tpStream: (TopicPartition, ZStream[Any, Throwable, ByteArrayCommittableRecord]) =
+ private[internal] val tpStream
+ : (TopicPartition, ZStream[Any, PartitionStreamPullTimeout, ByteArrayCommittableRecord]) =
 (tp, stream)
 }
@@ -142,12 +136,14 @@ object PartitionStreamControl {
 for {
 _ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}")
- interruptionPromise <- Promise.make[Throwable, Nothing]
+ interruptionPromise <- Promise.make[PartitionStreamPullTimeout, Nothing]
 completedPromise <- Promise.make[Nothing, Option[Offset]]
- dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
+ dataQueue <- Queue.unbounded[Take[Nothing, ByteArrayCommittableRecord]]
 now <- Clock.nanoTime
 queueInfo <- Ref.make(QueueInfo(now, 0, None, 0))
- requestAndAwaitData =
+ requestAndAwaitData: ZIO[Any, PartitionStreamPullTimeout, Chunk[
+ Take[Nothing, ByteArrayCommittableRecord]
+ ]] =
 for {
 _ <- commandQueue.offer(RunloopCommand.Request(tp))
 _ <- diagnostics.emit(DiagnosticEvent.Request(tp))
@@ -174,7 +170,11 @@ object PartitionStreamControl {
 }.flattenTake.chunksWith { s =>
 s.tap(records => registerPull(queueInfo, records))
 // Due to https://github.com/zio/zio/issues/8515 we cannot use ZStream.interruptWhen.
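 // Instead, the interruption promise is checked once per pulled chunk: once `halt` has failed it,
 // the `await` below re-raises the PartitionStreamPullTimeout and the partition stream fails with it.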
- .mapZIO(chunk => interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk)) + .mapZIO(chunk => + interruptionPromise.await + .whenZIO(interruptionPromise.isDone) + .as(chunk) + ) } } yield new PartitionStreamControl( tp, diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 35cf60fa5..b7dfbbbeb 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -2,9 +2,25 @@ package zio.kafka.consumer.internal import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException, RebalanceInProgressException } +import org.apache.kafka.common.errors.{ + AuthenticationException, + AuthorizationException, + InterruptException, + RebalanceInProgressException, + WakeupException +} import zio._ -import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval } +import zio.kafka.consumer.Consumer.CommitError.{ CommitTimeout, UnknownCommitException } +import zio.kafka.consumer.Consumer.{ + AuthenticationError, + AuthorizationError, + CommitError, + ConsumerError, + GetManualOffsetsError, + InvalidSubscriptionUnion, + OffsetRetrieval, + UnknownConsumerException +} import zio.kafka.consumer._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.{ Finalization, Rebalance } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } @@ -29,7 +45,7 @@ private[consumer] final class Runloop private ( commitQueue: Queue[Commit], commandQueue: Queue[RunloopCommand], lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], - partitionsHub: Hub[Take[Throwable, PartitionAssignment]], + partitionsHub: Hub[Take[ConsumerError, PartitionAssignment]], diagnostics: Diagnostics, maxRebalanceDuration: Duration, currentStateRef: Ref[State], @@ -253,10 +269,10 @@ private[consumer] final class Runloop private ( } /** This is the implementation behind the user facing api `Offset.commit`. */ - private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = + private val commit: Map[TopicPartition, OffsetAndMetadata] => IO[CommitError, Unit] = offsets => for { - p <- Promise.make[Throwable, Unit] + p <- Promise.make[CommitError, Unit] startTime = java.lang.System.nanoTime() _ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p)) _ <- commandQueue.offer(RunloopCommand.CommitAvailable) @@ -285,7 +301,7 @@ private[consumer] final class Runloop private ( val offsetsWithMetaData = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) } - val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) + val cont = (e: Exit[CommitError, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) // We assume the commit is started immediately after returning from this method. 
val startTime = java.lang.System.nanoTime()
 val onSuccess = {
@@ -305,8 +321,12 @@ private[consumer] final class Runloop private (
 _ <- commitQueue.offerAll(commits)
 _ <- commandQueue.offer(RunloopCommand.CommitAvailable)
 } yield ()
- case err: Throwable =>
- cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err))
+ case e @ (_: WakeupException | _: InterruptException) =>
+ // This should not happen unless we made a programming error
+ cont(Exit.die(e))
+ case e: Throwable =>
+ val error = UnknownCommitException(e)
+ cont(Exit.fail(error)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, error))
 }
 val callback = new OffsetCommitCallback {
@@ -415,17 +435,24 @@ private[consumer] final class Runloop private (
 else ZIO.none

 /** @return the topic-partitions for which received records should be ignored */
- private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] =
+ private def doSeekForNewPartitions(
+ c: ByteArrayKafkaConsumer,
+ tps: Set[TopicPartition]
+ ): IO[GetManualOffsetsError, Set[TopicPartition]] =
 settings.offsetRetrieval match {
 case OffsetRetrieval.Auto(_) => ZIO.succeed(Set.empty)
 case OffsetRetrieval.Manual(getOffsets, _) =>
 if (tps.isEmpty) ZIO.succeed(Set.empty)
 else
- getOffsets(tps).flatMap { offsets =>
- ZIO
- .attempt(offsets.foreach { case (tp, offset) => c.seek(tp, offset) })
- .as(offsets.keySet)
- }
+ getOffsets(tps)
+ .mapError(GetManualOffsetsError)
+ .map(userTps => userTps.view.filterKeys(tps.contains).toMap) // Only offsets for TPs we requested
+ .flatMap { offsets =>
+ ZIO
+ .attempt(offsets.foreach { case (tp, offset) => c.seek(tp, offset) })
+ .orDie // Any exception here indicates a zio-kafka programming error, see the seek() docs
+ .as(offsets.keySet)
+ }
 }

 /**
 * Pause partitions for which there is no demand and resume those for which there is demand.
 */
 private def resumeAndPausePartitions(
 c: ByteArrayKafkaConsumer,
 requestedPartitions: Set[TopicPartition]
- ): Task[(Int, Int)] = ZIO.attempt {
+ ): UIO[(Int, Int)] = ZIO.attempt {
 val assignment = c.assignment().asScala.toSet
 val toResume = assignment intersect requestedPartitions
 val toPause = assignment -- requestedPartitions

 if (toResume.nonEmpty) c.resume(toResume.asJava)
 if (toPause.nonEmpty) c.pause(toPause.asJava)

 (toResume.size, toPause.size)
- }
+ }.orDie // resume() and pause() throw IllegalStateExceptions when accessed concurrently or when a partition is not currently assigned, which indicates a programming error on our side

- private def doPoll(c: ByteArrayKafkaConsumer): Task[ConsumerRecords[Array[Byte], Array[Byte]]] =
+ private def doPoll(c: ByteArrayKafkaConsumer): IO[ConsumerError, ConsumerRecords[Array[Byte], Array[Byte]]] =
 ZIO.attempt {
 val recordsOrNull = c.poll(settings.pollTimeout)
 if (recordsOrNull eq null) ConsumerRecords.empty[Array[Byte], Array[Byte]]()
 else recordsOrNull
+ }.catchSome { case _: WakeupException =>
+ ZIO.interrupt
+ }.refineOrDie {
+ // These two exceptions would indicate zio-kafka programming errors
+ case e if !e.isInstanceOf[IllegalStateException] && !e.isInstanceOf[IllegalArgumentException] => e
+ }.mapError {
+ case e: AuthorizationException =>
+ AuthorizationError(e)
+ case e: AuthenticationException =>
+ AuthenticationError(e)
+ case e =>
+ UnknownConsumerException(e)
 }
 // Recover from spurious auth failures:
 .retry(
- Schedule.recurWhileZIO[Any, Throwable] {
- case _: AuthorizationException | _: AuthenticationException =>
+ Schedule.recurWhileZIO[Any,
ConsumerError] { + case _: AuthorizationError | _: AuthenticationError => consumerMetrics.observePollAuthError().as(true) case _ => ZIO.succeed(false) } && settings.authErrorRetrySchedule ) - private def handlePoll(state: State): Task[State] = { + private def handlePoll(state: State): IO[ConsumerError, State] = { for { partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) _ <- ZIO.logDebug( @@ -545,7 +584,10 @@ private[consumer] final class Runloop private ( // NOTE: the type annotation is needed to keep the IntelliJ compiler happy. _ <- committedOffsetsRef - .update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit] + .update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): IO[ + ConsumerError, + Unit + ] _ <- consumerMetrics.observeRebalance( currentAssigned.size, @@ -612,8 +654,8 @@ private[consumer] final class Runloop private ( _ <- shutdown.when(anyExceeded) } yield () - private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): Task[State] = { - def doChangeSubscription(newSubscriptionState: SubscriptionState): Task[State] = + private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): IO[ConsumerError, State] = { + def doChangeSubscription(newSubscriptionState: SubscriptionState): IO[ConsumerError, State] = applyNewSubscriptionState(newSubscriptionState).flatMap { newAssignedStreams => val newState = state.copy( assignedStreams = state.assignedStreams ++ newAssignedStreams, @@ -693,27 +735,45 @@ private[consumer] final class Runloop private ( private def applyNewSubscriptionState( newSubscriptionState: SubscriptionState - ): Task[Chunk[PartitionStreamControl]] = + ): IO[ConsumerError, Chunk[PartitionStreamControl]] = consumer.runloopAccess { c => newSubscriptionState match { case SubscriptionState.NotSubscribed => ZIO .attempt(c.unsubscribe()) + .refineOrDie { case e if !e.isInstanceOf[IllegalStateException] => e } + .mapError(UnknownConsumerException) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Pattern(pattern)) => val rebalanceListener = makeRebalanceListener ZIO .attempt(c.subscribe(pattern.pattern, rebalanceListener)) + .refineOrDie { + // This indicates zio-kafka programming errors, see subscribe() docs + case e if !e.isInstanceOf[IllegalStateException] && !e.isInstanceOf[IllegalArgumentException] => e + } + .mapError(UnknownConsumerException) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Topics(topics)) => val rebalanceListener = makeRebalanceListener ZIO .attempt(c.subscribe(topics.asJava, rebalanceListener)) + .refineOrDie { + // This indicates zio-kafka programming errors, see subscribe() docs + case e if !e.isInstanceOf[IllegalStateException] && !e.isInstanceOf[IllegalArgumentException] => e + } + .mapError(UnknownConsumerException) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Manual(topicPartitions)) => // For manual subscriptions we have to do some manual work before starting the run loop for { - _ <- ZIO.attempt(c.assign(topicPartitions.asJava)) + _ <- ZIO + .attempt(c.assign(topicPartitions.asJava)) + .refineOrDie { + // This indicates zio-kafka programming errors, see assign() docs + case e if !e.isInstanceOf[IllegalStateException] => e + } + .mapError(UnknownConsumerException) _ <- doSeekForNewPartitions(c, topicPartitions) partitionStreams <- ZIO.foreach(Chunk.fromIterable(topicPartitions))(newPartitionStream) _ <- partitionsHub.publish(Take.chunk(partitionStreams.map(_.tpStream))) @@ -733,7 +793,7 @@ 
private[consumer] final class Runloop private ( * - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after * initialization and rebalancing */ - private def run(initialState: State): ZIO[Scope, Throwable, Any] = { + private def run(initialState: State): ZIO[Scope, Nothing, Any] = { import Runloop.StreamOps ZStream @@ -760,6 +820,7 @@ private[consumer] final class Runloop private ( } .tapErrorCause(cause => ZIO.logErrorCause("Error in Runloop", cause)) .onError(cause => partitionsHub.offer(Take.failCause(cause))) + .ignore } private def observeRunloopMetrics(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ZIO[Any, Nothing, Unit] = { @@ -859,7 +920,7 @@ object Runloop { private[internal] final case class Commit( createdAt: NanoTime, offsets: Map[TopicPartition, OffsetAndMetadata], - cont: Promise[Throwable, Unit] + cont: Promise[CommitError, Unit] ) { @inline def isDone: UIO[Boolean] = cont.isDone @inline def isPending: UIO[Boolean] = isDone.negate @@ -871,7 +932,7 @@ object Runloop { maxRebalanceDuration: Duration, diagnostics: Diagnostics, consumer: ConsumerAccess, - partitionsHub: Hub[Take[Throwable, PartitionAssignment]] + partitionsHub: Hub[Take[ConsumerError, PartitionAssignment]] ): URIO[Scope, Runloop] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) @@ -905,7 +966,7 @@ object Runloop { // Run the entire loop on a dedicated thread to avoid executor shifts executor <- RunloopExecutor.newInstance fiber <- ZIO.onExecutor(executor)(runloop.run(initialState)).forkScoped - waitForRunloopStop = fiber.join.orDie + waitForRunloopStop = fiber.join _ <- ZIO.addFinalizer( ZIO.logDebug("Shutting down Runloop") *> diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 6c90d2d1c..efe47ed3e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -2,13 +2,14 @@ package zio.kafka.consumer.internal import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.TopicPartition +import zio._ +import zio.kafka.consumer.Consumer.{ ConsumerError, InvalidSubscriptionUnion, PartitionStreamPullTimeout } import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment -import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription } +import zio.kafka.consumer.{ ConsumerSettings, Subscription } import zio.stream.{ Stream, Take, UStream, ZStream } -import zio._ private[internal] sealed trait RunloopState private[internal] object RunloopState { @@ -26,7 +27,7 @@ private[internal] object RunloopState { */ private[consumer] final class RunloopAccess private ( runloopStateRef: Ref.Synchronized[RunloopState], - partitionHub: Hub[Take[Throwable, PartitionAssignment]], + partitionHub: Hub[Take[ConsumerError, PartitionAssignment]], makeRunloop: UIO[Runloop], diagnostics: Diagnostics ) { @@ -55,7 +56,7 @@ private[consumer] final class RunloopAccess private ( */ def subscribe( subscription: Subscription - ): ZIO[Scope, InvalidSubscriptionUnion, UStream[Take[Throwable, PartitionAssignment]]] = + ): ZIO[Scope, InvalidSubscriptionUnion, UStream[Take[ConsumerError, PartitionAssignment]]] = for { 
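 // Subscribe to the hub before the Runloop learns about the new subscription, so that this
 // subscriber cannot miss a partition assignment published by the Runloop.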
stream <- ZStream.fromHubScoped(partitionHub) // starts the Runloop if not already started @@ -69,22 +70,22 @@ private[consumer] final class RunloopAccess private ( } private[consumer] object RunloopAccess { - type PartitionAssignment = (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord]) + type PartitionAssignment = (TopicPartition, Stream[PartitionStreamPullTimeout, ByteArrayCommittableRecord]) def make( settings: ConsumerSettings, consumerAccess: ConsumerAccess, diagnostics: Diagnostics = Diagnostics.NoOp - ): ZIO[Scope, Throwable, RunloopAccess] = + ): ZIO[Scope, Nothing, RunloopAccess] = for { - maxPollInterval <- maxPollIntervalConfig(settings) + maxPollInterval <- ZIO.succeed(maxPollIntervalConfig(settings)) // See scaladoc of [[ConsumerSettings.withMaxRebalanceDuration]]: maxRebalanceDuration = settings.maxRebalanceDuration.getOrElse(((maxPollInterval.toNanos / 5L) * 3L).nanos) // This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer // When the Consumer is shutdown, the Runloop and the Hub will be shutdown too (before the consumer) consumerScope <- ZIO.scope partitionsHub <- ZIO - .acquireRelease(Hub.unbounded[Take[Throwable, PartitionAssignment]])(_.shutdown) + .acquireRelease(Hub.unbounded[Take[ConsumerError, PartitionAssignment]])(_.shutdown) .provide(ZLayer.succeed(consumerScope)) runloopStateRef <- Ref.Synchronized.make[RunloopState](RunloopState.NotStarted) makeRunloop = Runloop @@ -101,7 +102,7 @@ private[consumer] object RunloopAccess { .provide(ZLayer.succeed(consumerScope)) } yield new RunloopAccess(runloopStateRef, partitionsHub, makeRunloop, diagnostics) - private def maxPollIntervalConfig(settings: ConsumerSettings): Task[Duration] = ZIO.attempt { + private def maxPollIntervalConfig(settings: ConsumerSettings): Duration = { def defaultMaxPollInterval: Int = ConsumerConfig .configDef() .defaultValues() diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala index be43f585c..64f855abc 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala @@ -2,7 +2,8 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition import zio._ -import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription } +import zio.kafka.consumer.Consumer.InvalidSubscriptionUnion +import zio.kafka.consumer.Subscription sealed trait RunloopCommand object RunloopCommand { diff --git a/zio-kafka/src/main/scala/zio/kafka/serde/Deserializer.scala b/zio-kafka/src/main/scala/zio/kafka/serde/Deserializer.scala index 3218664c4..51831afcb 100644 --- a/zio-kafka/src/main/scala/zio/kafka/serde/Deserializer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/serde/Deserializer.scala @@ -2,10 +2,10 @@ package zio.kafka.serde import org.apache.kafka.common.header.Headers import org.apache.kafka.common.serialization.{ Deserializer => KafkaDeserializer } -import zio.{ RIO, Task, ZIO } +import zio.kafka.consumer.Consumer.DeserializationError +import zio.{ Task, ZIO } import scala.jdk.CollectionConverters._ -import scala.util.{ Failure, Success, Try } /** * Deserializer from byte array to a value of some type T @@ -16,7 +16,7 @@ import scala.util.{ Failure, Success, Try } * Value type */ trait Deserializer[-R, +T] { - def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] + def deserialize(topic: 
String, headers: Headers, data: Array[Byte]): ZIO[R, DeserializationError, T]

 /**
 * Returns a new deserializer that executes its deserialization function on the blocking threadpool.
@@ -32,7 +32,9 @@ trait Deserializer[-R, +T] {
 /**
 * Create a deserializer for a type U based on the deserializer for type T and an effectful mapping function
 */
- def mapM[R1 <: R, U](f: T => RIO[R1, U]): Deserializer[R1, U] = Deserializer(deserialize(_, _, _).flatMap(f))
+ def mapM[R1 <: R, U](f: T => ZIO[R1, DeserializationError, U]): Deserializer[R1, U] = Deserializer(
+ deserialize(_, _, _).flatMap(f)
+ )

 /**
 * When this deserializer fails, attempt to deserialize with the alternative
@@ -49,8 +51,8 @@ trait Deserializer[-R, +T] {
 *
 * This is useful for explicitly handling deserialization failures.
 */
- def asTry: Deserializer[R, Try[T]] =
- Deserializer(deserialize(_, _, _).fold(e => Failure(e), v => Success(v)))
+ def either: Deserializer[R, Either[DeserializationError, T]] =
+ Deserializer(deserialize(_, _, _).fold(e => Left(e), v => Right(v)))

 /**
 * Returns a new deserializer that deserializes values as Option values, mapping null data to None values.
@@ -64,7 +66,7 @@ object Deserializer extends Serdes {
 /**
 * Create a deserializer from a function
 */
- def apply[R, T](deser: (String, Headers, Array[Byte]) => RIO[R, T]): Deserializer[R, T] =
+ def apply[R, T](deser: (String, Headers, Array[Byte]) => ZIO[R, DeserializationError, T]): Deserializer[R, T] =
 (topic: String, headers: Headers, data: Array[Byte]) => deser(topic, headers, data)

 /**
@@ -79,8 +81,11 @@
 .attempt(deserializer.configure(props.asJava, isKey))
 .as(
 new Deserializer[Any, T] {
- override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Task[T] =
- ZIO.attempt(deserializer.deserialize(topic, headers, data))
+ override def deserialize(topic: String, headers: Headers, data: Array[Byte])
+ : ZIO[Any, DeserializationError, T] =
+ ZIO
+ .attempt(deserializer.deserialize(topic, headers, data))
+ .mapError(e => DeserializationError(e.getMessage, Some(e)))
 }
 )
 }

diff --git a/zio-kafka/src/main/scala/zio/kafka/serde/Serde.scala b/zio-kafka/src/main/scala/zio/kafka/serde/Serde.scala
index a6bd842ad..6c47b7c95 100644
--- a/zio-kafka/src/main/scala/zio/kafka/serde/Serde.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/serde/Serde.scala
@@ -2,10 +2,10 @@ package zio.kafka.serde
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.serialization.{ Serde => KafkaSerde }
+import zio.kafka.consumer.Consumer.DeserializationError
 import zio.{ RIO, Task, ZIO }

 import scala.jdk.CollectionConverters._
-import scala.util.Try

 /**
 * A serializer and deserializer for values of type T
@@ -38,7 +38,7 @@ trait Serde[-R, T] extends Deserializer[R, T] with Serializer[R, T] {
 /**
 * Convert to a Serde of type U with effectful transformations
 */
- def inmapM[R1 <: R, U](f: T => RIO[R1, U])(g: U => RIO[R1, T]): Serde[R1, U] =
+ def inmapM[R1 <: R, U](f: T => ZIO[R1, DeserializationError, U])(g: U => RIO[R1, T]): Serde[R1, U] =
 Serde(mapM(f))(contramapM(g))
 }

@@ -50,12 +50,16 @@ object Serde extends Serdes {
 /**
 * The deserializer function can return a failure ZIO with a DeserializationError and the serializer function a failure ZIO with a Throwable to indicate (de)serialization failure
 */
 def apply[R, T](
- deser: (String, Headers, Array[Byte]) => RIO[R, T]
+ deser: (String, Headers, Array[Byte]) => ZIO[R, DeserializationError, T]
 )(ser: (String, Headers, T) => RIO[R, Array[Byte]]): Serde[R, T] =
 new Serde[R, T] {
 override final def serialize(topic: String, headers:
Headers, value: T): RIO[R, Array[Byte]] = ser(topic, headers, value) - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] = + override final def deserialize( + topic: String, + headers: Headers, + data: Array[Byte] + ): ZIO[R, DeserializationError, T] = deser(topic, headers, data) } @@ -66,7 +70,11 @@ object Serde extends Serdes { new Serde[R, T] { override final def serialize(topic: String, headers: Headers, value: T): RIO[R, Array[Byte]] = ser.serialize(topic, headers, value) - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] = + override final def deserialize( + topic: String, + headers: Headers, + data: Array[Byte] + ): ZIO[R, DeserializationError, T] = deser.deserialize(topic, headers, data) } @@ -81,14 +89,14 @@ object Serde extends Serdes { private final val serializer = serde.serializer() private final val deserializer = serde.deserializer() - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): Task[T] = - ZIO.attempt(deserializer.deserialize(topic, headers, data)) + override final def deserialize(topic: String, headers: Headers, data: Array[Byte]) + : ZIO[Any, DeserializationError, T] = + ZIO + .attempt(deserializer.deserialize(topic, headers, data)) + .mapError(e => DeserializationError(e.getMessage, Some(e))) override final def serialize(topic: String, headers: Headers, value: T): Task[Array[Byte]] = ZIO.attempt(serializer.serialize(topic, headers, value)) } ) - - implicit def deserializerWithError[R, T](implicit deser: Deserializer[R, T]): Deserializer[R, Try[T]] = - deser.asTry } diff --git a/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala b/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala index f02577f72..cba1f112b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala +++ b/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala @@ -3,6 +3,7 @@ package zio.kafka.serde import org.apache.kafka.common.header.Headers import org.apache.kafka.common.serialization.{ Serde => KafkaSerde, Serdes => KafkaSerdes } import org.apache.kafka.common.utils.Bytes +import zio.kafka.consumer.Consumer.DeserializationError import zio.{ RIO, ZIO } import java.nio.ByteBuffer @@ -32,7 +33,11 @@ private[zio] trait Serdes { override final def serialize(topic: String, headers: Headers, value: Array[Byte]): RIO[Any, Array[Byte]] = ZIO.succeed(value) - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[Any, Array[Byte]] = + override final def deserialize( + topic: String, + headers: Headers, + data: Array[Byte] + ): ZIO[Any, DeserializationError, Array[Byte]] = ZIO.succeed(data) } @@ -41,8 +46,14 @@ private[zio] trait Serdes { private final val serializer = serde.serializer() private final val deserializer = serde.deserializer() - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[Any, T] = - ZIO.attempt(deserializer.deserialize(topic, headers, data)) + override final def deserialize( + topic: String, + headers: Headers, + data: Array[Byte] + ): ZIO[Any, DeserializationError, T] = + ZIO + .attempt(deserializer.deserialize(topic, headers, data)) + .mapError(e => DeserializationError(e.getMessage, Some(e))) override final def serialize(topic: String, headers: Headers, value: T): RIO[Any, Array[Byte]] = ZIO.attempt(serializer.serialize(topic, headers, value)) diff --git a/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala b/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala index 
446ed00a6..130f2959e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala +++ b/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala @@ -105,7 +105,7 @@ object SslHelper { * Mimic behaviour of KafkaAdminClient.createInternal */ private def kafkaException(e: Throwable): KafkaException = - new KafkaException("Failed to create new KafkaAdminClient", e) + new KafkaException("Failed to validate SSL configuration", e) /** * Let's take some time here to discuss the algorithm of this function as it's a bit tricky and uses obscure Java