Skip to content

Commit

Permalink
More specific errors than Throwable for Consumer
Browse files Browse the repository at this point in the history
Implements #241

Build still fails because test and bench projects have not been migrated yet, first want to gather some feedback on this change.
  • Loading branch information
svroonland committed Nov 3, 2024
1 parent d59e8f6 commit 1a26f9d
Show file tree
Hide file tree
Showing 18 changed files with 283 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._
import zio.kafka.consumer.Consumer.ConsumerError
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.serde.Serde
Expand Down Expand Up @@ -49,7 +50,7 @@ object Main extends ZIOAppDefault {
.withGroupId("test")
}

private val runConsumerStream: ZIO[Consumer, Throwable, Unit] =
private val runConsumerStream: ZIO[Consumer, ConsumerError, Unit] =
for {
_ <- ZIO.logInfo("Consuming messages...")
consumed <- Consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._
import zio.kafka.admin._
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, ConsumerError, OffsetRetrieval }
import zio.kafka.consumer._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.producer._
Expand Down Expand Up @@ -192,7 +192,7 @@ object KafkaTestUtils {
* Utility function to make a Consumer. It requires a ConsumerSettings layer.
*/
@deprecated("Use [[KafkaTestUtils.minimalConsumer]] instead", "2.3.1")
def simpleConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, Throwable, Consumer] =
def simpleConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, ConsumerError, Consumer] =
ZLayer.makeSome[ConsumerSettings, Consumer](
ZLayer.succeed(diagnostics) >>> Consumer.live
)
Expand All @@ -203,7 +203,7 @@ object KafkaTestUtils {
* "minimal" because, unlike the other functions returning a `ZLayer[..., ..., Consumer]` of this file, you need to
* provide the `ConsumerSettings` layer yourself.
*/
def minimalConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, Throwable, Consumer] =
def minimalConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, ConsumerError, Consumer] =
ZLayer.makeSome[ConsumerSettings, Consumer](
ZLayer.succeed(diagnostics) >>> Consumer.live
)
Expand All @@ -223,7 +223,7 @@ object KafkaTestUtils {
maxRebalanceDuration: Duration = 3.minutes,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
properties: Map[String, String] = Map.empty
): ZLayer[Kafka, Throwable, Consumer] =
): ZLayer[Kafka, ConsumerError, Consumer] =
(ZLayer(
consumerSettings(
clientId = clientId,
Expand Down Expand Up @@ -253,7 +253,7 @@ object KafkaTestUtils {
rebalanceSafeCommits: Boolean = false,
properties: Map[String, String] = Map.empty,
rebalanceListener: RebalanceListener = RebalanceListener.noop
): ZLayer[Kafka, Throwable, Consumer] =
): ZLayer[Kafka, ConsumerError, Consumer] =
(ZLayer(
transactionalConsumerSettings(
groupId = groupId,
Expand All @@ -274,7 +274,7 @@ object KafkaTestUtils {
*/
def consumeWithStrings(clientId: String, groupId: Option[String] = None, subscription: Subscription)(
r: ConsumerRecord[String, String] => URIO[Any, Unit]
): RIO[Kafka, Unit] =
): ZIO[Kafka, ConsumerError, Unit] =
consumerSettings(clientId, groupId, None).flatMap { settings =>
Consumer.consumeWith[Any, Any, String, String](
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import zio.kafka.consumer.Consumer.{ CommitError, DeserializationError }
import zio.kafka.serde.Deserializer
import zio.{ RIO, Task }
import zio.{ IO, ZIO }

final case class CommittableRecord[K, V](
record: ConsumerRecord[K, V],
private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => IO[CommitError, Unit],
private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
) {
def deserializeWith[R, K1, V1](
keyDeserializer: Deserializer[R, K1],
valueDeserializer: Deserializer[R, V1]
)(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): RIO[R, CommittableRecord[K1, V1]] =
)(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): ZIO[R, DeserializationError, CommittableRecord[K1, V1]] =
for {
key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key())
value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value())
Expand Down Expand Up @@ -52,7 +53,7 @@ final case class CommittableRecord[K, V](
object CommittableRecord {
def apply[K, V](
record: ConsumerRecord[K, V],
commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
commitHandle: Map[TopicPartition, OffsetAndMetadata] => IO[CommitError, Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
): CommittableRecord[K, V] =
new CommittableRecord(
Expand Down
100 changes: 75 additions & 25 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import org.apache.kafka.clients.consumer.{
OffsetAndTimestamp
}
import org.apache.kafka.common._
import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import zio._
import zio.kafka.consumer.Consumer.{ CommitError, ConsumerError, PartitionStreamError }
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.diagnostics.Diagnostics.ConcurrentDiagnostics
Expand All @@ -17,7 +19,6 @@ import zio.kafka.utils.SslHelper
import zio.stream._

import scala.jdk.CollectionConverters._
import scala.util.control.NoStackTrace

trait Consumer {

Expand Down Expand Up @@ -68,7 +69,7 @@ trait Consumer {
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]
): Stream[ConsumerError, Chunk[(TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])]]

/**
* Create a stream with messages on the subscribed topic-partitions by topic-partition
Expand All @@ -91,7 +92,7 @@ trait Consumer {
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]
): Stream[ConsumerError, (TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])]

/**
* Create a stream with all messages on the subscribed topic-partitions
Expand All @@ -116,7 +117,7 @@ trait Consumer {
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int = 4
): ZStream[R, Throwable, CommittableRecord[K, V]]
): ZStream[R, ConsumerError, CommittableRecord[K, V]]

/**
* Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit
Expand All @@ -131,10 +132,10 @@ trait Consumer {
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
commitRetryPolicy: Schedule[Any, CommitError, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
)(
f: ConsumerRecord[K, V] => URIO[R1, Unit]
): ZIO[R & R1, Throwable, Unit]
): ZIO[R & R1, ConsumerError, Unit]

/**
* Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest
Expand All @@ -161,12 +162,56 @@ trait Consumer {
}

object Consumer {
case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace
sealed trait ConsumerError {
def message: String
}

final case class AuthorizationError(cause: AuthorizationException) extends ConsumerError {
override def message: String = cause.getMessage
}

final case class AuthenticationError(cause: AuthenticationException) extends ConsumerError {
override def message: String = cause.getMessage
}

final case class SslValidationError(message: String) extends ConsumerError
final case class UnknownConsumerException(cause: Throwable) extends ConsumerError {
override def message: String = cause.getMessage
}
sealed trait SubscriptionError extends ConsumerError
final case class InvalidSubscriptionUnion(subscriptions: Chunk[Subscription]) extends SubscriptionError {
override def message: String = s"Unable to calculate union of subscriptions: ${subscriptions.mkString(",")}"
}
final case class InvalidTopic(topic: String, validationError: String) extends SubscriptionError {
override def message: String = s"Invalid topic '${topic}': ${validationError}"
}
final case class GetManualOffsetsError(cause: Throwable) extends ConsumerError {
override def message: String = s"Unable to retrieve manual offsets: ${cause.getMessage}"
}
sealed trait PartitionStreamError extends ConsumerError
final case class DeserializationError(message: String, cause: Option[Throwable] = None) extends PartitionStreamError
final case class PartitionStreamPullTimeout(topicPartition: TopicPartition, maxPollInterval: Duration)
extends PartitionStreamError {
override def message: String =
s"No records were polled for more than $maxPollInterval for topic partition $topicPartition. " +
"Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
"needs more time."
}

sealed trait CommitError extends ConsumerError
object CommitError {
final case object CommitTimeout extends CommitError {
override def message: String = "Commit timed out"
}
final case class UnknownCommitException(cause: Throwable) extends CommitError {
override def message: String = cause.getMessage
}
}

val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] =
ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _)

def live: RLayer[ConsumerSettings & Diagnostics, Consumer] =
def live: ZLayer[ConsumerSettings & Diagnostics, ConsumerError, Consumer] =
ZLayer.scoped {
for {
settings <- ZIO.service[ConsumerSettings]
Expand All @@ -185,13 +230,15 @@ object Consumer {
def make(
settings: ConsumerSettings,
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
): ZIO[Scope, SslValidationError, Consumer] =
for {
wrappedDiagnostics <- ConcurrentDiagnostics.make(diagnostics)
_ <- ZIO.addFinalizer(wrappedDiagnostics.emit(Finalization.ConsumerFinalized))
_ <- SslHelper.validateEndpoint(settings.driverSettings)
consumerAccess <- ConsumerAccess.make(settings)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, wrappedDiagnostics)
_ <- SslHelper
.validateEndpoint(settings.driverSettings)
.mapError(e => SslValidationError(e.getMessage))
consumerAccess <- ConsumerAccess.make(settings)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, wrappedDiagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

/**
Expand All @@ -203,7 +250,7 @@ object Consumer {
javaConsumer: JConsumer[Array[Byte], Array[Byte]],
settings: ConsumerSettings,
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
): ZIO[Scope, ConsumerError, Consumer] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
consumerAccess <- ConsumerAccess.make(javaConsumer)
Expand Down Expand Up @@ -258,7 +305,9 @@ object Consumer {
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] =
): ZStream[Consumer, ConsumerError, Chunk[
(TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])
]] =
ZStream.serviceWithStream[Consumer](_.partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer))

/**
Expand All @@ -270,8 +319,8 @@ object Consumer {
valueDeserializer: Deserializer[R, V]
): ZStream[
Consumer,
Throwable,
(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])
ConsumerError,
(TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])
] =
ZStream.serviceWithStream[Consumer](_.partitionedStream(subscription, keyDeserializer, valueDeserializer))

Expand All @@ -283,7 +332,7 @@ object Consumer {
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int = 4
): ZStream[R & Consumer, Throwable, CommittableRecord[K, V]] =
): ZStream[R & Consumer, ConsumerError, CommittableRecord[K, V]] =
ZStream.serviceWithStream[Consumer](
_.plainStream(subscription, keyDeserializer, valueDeserializer, bufferSize)
)
Expand Down Expand Up @@ -354,7 +403,7 @@ object Consumer {
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
)(f: ConsumerRecord[K, V] => URIO[R1, Unit]): RIO[R & R1, Unit] =
)(f: ConsumerRecord[K, V] => URIO[R1, Unit]): ZIO[R & R1, ConsumerError, Unit] =
ZIO.scoped[R & R1] {
Consumer
.make(settings)
Expand Down Expand Up @@ -435,6 +484,7 @@ private[consumer] final class ConsumerLive private[consumer] (
override def assignment: Task[Set[TopicPartition]] =
consumer.withConsumer(_.assignment().asScala.toSet)

// TODO should we change all of these too..?
override def beginningOffsets(
partitions: Set[TopicPartition],
timeout: Duration = Duration.Infinity
Expand Down Expand Up @@ -469,7 +519,7 @@ private[consumer] final class ConsumerLive private[consumer] (
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = {
): Stream[ConsumerError, Chunk[(TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])]] = {
val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray)

ZStream.unwrapScoped {
Expand All @@ -481,9 +531,9 @@ private[consumer] final class ConsumerLive private[consumer] (
.map {
_.collect {
case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) =>
val stream: ZStream[R, Throwable, CommittableRecord[K, V]] =
val stream: ZStream[R, PartitionStreamError, CommittableRecord[K, V]] =
if (onlyByteArraySerdes)
partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
partitionStream.asInstanceOf[ZStream[R, PartitionStreamError, CommittableRecord[K, V]]]
else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer)))

tp -> stream
Expand All @@ -496,15 +546,15 @@ private[consumer] final class ConsumerLive private[consumer] (
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] =
): ZStream[Any, ConsumerError, (TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])] =
partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks

override def plainStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int
): ZStream[R, Throwable, CommittableRecord[K, V]] =
): ZStream[R, ConsumerError, CommittableRecord[K, V]] =
partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar(
n = Int.MaxValue,
bufferSize = bufferSize
Expand All @@ -518,10 +568,10 @@ private[consumer] final class ConsumerLive private[consumer] (
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
commitRetryPolicy: Schedule[Any, CommitError, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
)(
f: ConsumerRecord[K, V] => URIO[R1, Unit]
): ZIO[R & R1, Throwable, Unit] =
): ZIO[R & R1, ConsumerError, Unit] =
for {
r <- ZIO.environment[R & R1]
_ <- partitionedStream(subscription, keyDeserializer, valueDeserializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zio.kafka.consumer

import org.apache.kafka.clients.consumer.ConsumerConfig
import zio._
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.Consumer.{ ConsumerError, OffsetRetrieval }
import zio.kafka.consumer.fetch.{ FetchStrategy, QueueSizeBasedFetchStrategy }
import zio.kafka.security.KafkaCredentialStore
import zio.metrics.MetricLabel
Expand Down Expand Up @@ -31,7 +31,7 @@ final case class ConsumerSettings(
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
metricLabels: Set[MetricLabel] = Set.empty,
runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
authErrorRetrySchedule: Schedule[Any, ConsumerError, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
) {
// Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType:
require(
Expand Down Expand Up @@ -316,7 +316,7 @@ final case class ConsumerSettings(
*
* The default is {{{Schedule.recurs(5) && Schedule.spaced(500.millis)}}} which is, to retry 5 times, spaced by 500ms.
*/
def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, Throwable, Any]): ConsumerSettings =
def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, ConsumerError, Any]): ConsumerSettings =
copy(authErrorRetrySchedule = authErrorRetrySchedule)

}
Expand Down

This file was deleted.

Loading

0 comments on commit 1a26f9d

Please sign in to comment.