Skip to content

Commit 7561dde

Browse files
author
Jan Kolena
committed
Extract Channel-related stuff from ConsumerBase to ConsumerChannelOps
Optionally don't return any delivery result from consumer... Fix streaming consumer closing
1 parent 831ca6a commit 7561dde

11 files changed

+397
-241
lines changed

core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala

+7-108
Original file line numberDiff line numberDiff line change
@@ -4,39 +4,28 @@ import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Timer}
44
import cats.implicits.{catsSyntaxApplicativeError, toFunctorOps}
55
import cats.syntax.flatMap._
66
import com.avast.bytes.Bytes
7-
import com.avast.clients.rabbitmq.DefaultRabbitMQConsumer._
87
import com.avast.clients.rabbitmq.JavaConverters.AmqpPropertiesConversions
98
import com.avast.clients.rabbitmq.api._
109
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
1110
import com.avast.metrics.scalaapi.Monitor
12-
import com.rabbitmq.client.AMQP.BasicProperties
1311
import com.rabbitmq.client.{AMQP, Envelope}
1412
import org.slf4j.event.Level
1513

1614
import scala.concurrent.TimeoutException
1715
import scala.concurrent.duration.{Duration, FiniteDuration}
18-
import scala.jdk.CollectionConverters._
1916
import scala.util._
2017

2118
// it's case-class to have `copy` method for free....
2219
final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A](
2320
consumerName: String,
2421
queueName: String,
25-
channel: ServerChannel,
2622
blocker: Blocker,
27-
republishStrategy: RepublishStrategy[F],
28-
poisonedMessageHandler: PoisonedMessageHandler[F, A],
29-
connectionInfo: RabbitMQConnectionInfo,
3023
consumerLogger: ImplicitContextLogger[F],
3124
consumerRootMonitor: Monitor)(implicit val contextShift: ContextShift[F], implicit val deliveryConverter: DeliveryConverter[A]) {
3225

3326
val F: ConcurrentEffect[F] = ConcurrentEffect[F] // scalastyle:ignore
3427

35-
val resultsMonitor: Monitor = consumerRootMonitor.named("results")
36-
private val resultAckMeter = resultsMonitor.meter("ack")
37-
private val resultRejectMeter = resultsMonitor.meter("reject")
38-
private val resultRetryMeter = resultsMonitor.meter("retry")
39-
private val resultRepublishMeter = resultsMonitor.meter("republish")
28+
private val timeoutsMeter = consumerRootMonitor.meter("timeouts")
4029

4130
def parseDelivery(envelope: Envelope, rawBody: Bytes, properties: AMQP.BasicProperties): F[DeliveryWithMetadata[A]] = {
4231
val metadata = DeliveryMetadata.from(envelope, properties)
@@ -48,122 +37,34 @@ final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A
4837
case Success(Right(a)) =>
4938
val delivery = Delivery(a, metadata.fixedProperties.asScala, metadata.routingKey.value)
5039

51-
consumerLogger.trace(s"[$consumerName] Received delivery: ${delivery.copy(body = rawBody)}").as {
40+
consumerLogger.trace(s"[$consumerName] Received delivery from queue '$queueName': ${delivery.copy(body = rawBody)}").as {
5241
delivery
5342
}
5443

5544
case Success(Left(ce)) =>
5645
val delivery = Delivery.MalformedContent(rawBody, metadata.fixedProperties.asScala, metadata.routingKey.value, ce)
5746

58-
consumerLogger.trace(s"[$consumerName] Received delivery but could not convert it: $delivery").as {
47+
consumerLogger.trace(s"[$consumerName] Received delivery from queue '$queueName' but could not convert it: $delivery").as {
5948
delivery
6049
}
6150

6251
case Failure(ce) =>
6352
val ex = ConversionException("Unexpected failure", ce)
6453
val delivery = Delivery.MalformedContent(rawBody, metadata.fixedProperties.asScala, metadata.routingKey.value, ex)
6554

66-
consumerLogger.trace(s"[$consumerName] Received delivery but could not convert it as the convertor has failed: $delivery").as {
67-
delivery
68-
}
55+
consumerLogger
56+
.trace(
57+
s"[$consumerName] Received delivery from queue '$queueName' but could not convert it as the convertor has failed: $delivery")
58+
.as(delivery)
6959
}
7060
.map(DeliveryWithMetadata(_, metadata))
7161
}
7262

73-
def handleResult(messageId: MessageId,
74-
deliveryTag: DeliveryTag,
75-
properties: BasicProperties,
76-
routingKey: RoutingKey,
77-
rawBody: Bytes,
78-
delivery: Delivery[A])(res: DeliveryResult)(implicit correlationId: CorrelationId): F[Unit] = {
79-
import DeliveryResult._
80-
81-
poisonedMessageHandler.interceptResult(delivery, messageId, rawBody)(res).flatMap {
82-
case Ack => ack(messageId, deliveryTag)
83-
case Reject => reject(messageId, deliveryTag)
84-
case Retry => retry(messageId, deliveryTag)
85-
case Republish(_, newHeaders) =>
86-
republish(messageId, deliveryTag, createPropertiesForRepublish(newHeaders, properties, routingKey), rawBody)
87-
}
88-
}
89-
90-
protected def ack(messageId: MessageId, deliveryTag: DeliveryTag)(implicit correlationId: CorrelationId): F[Unit] = {
91-
consumerLogger.debug(s"[$consumerName] ACK delivery $messageId, $deliveryTag") >>
92-
blocker
93-
.delay {
94-
if (!channel.isOpen) throw new IllegalStateException("Cannot ack delivery on closed channel")
95-
channel.basicAck(deliveryTag.value, false)
96-
resultAckMeter.mark()
97-
}
98-
.attempt
99-
.flatMap {
100-
case Right(()) => F.unit
101-
case Left(e) => consumerLogger.warn(e)(s"[$consumerName] Error while confirming the delivery $messageId")
102-
}
103-
}
104-
105-
protected def reject(messageId: MessageId, deliveryTag: DeliveryTag)(implicit correlationId: CorrelationId): F[Unit] = {
106-
consumerLogger.debug(s"[$consumerName] REJECT delivery $messageId, $deliveryTag") >>
107-
blocker
108-
.delay {
109-
if (!channel.isOpen) throw new IllegalStateException("Cannot reject delivery on closed channel")
110-
channel.basicReject(deliveryTag.value, false)
111-
resultRejectMeter.mark()
112-
}
113-
.attempt
114-
.flatMap {
115-
case Right(()) => F.unit
116-
case Left(e) => consumerLogger.warn(e)(s"[$consumerName] Error while rejecting the delivery $messageId")
117-
}
118-
}
119-
120-
protected def retry(messageId: MessageId, deliveryTag: DeliveryTag)(implicit correlationId: CorrelationId): F[Unit] = {
121-
consumerLogger.debug(s"[$consumerName] REJECT (with requeue) delivery $messageId, $deliveryTag") >>
122-
blocker
123-
.delay {
124-
if (!channel.isOpen) throw new IllegalStateException("Cannot retry delivery on closed channel")
125-
channel.basicReject(deliveryTag.value, true)
126-
resultRetryMeter.mark()
127-
}
128-
.attempt
129-
.flatMap {
130-
case Right(()) => F.unit
131-
case Left(e) => consumerLogger.warn(e)(s"[$consumerName] Error while rejecting (with requeue) the delivery $messageId")
132-
}
133-
}
134-
135-
protected def republish(messageId: MessageId, deliveryTag: DeliveryTag, properties: BasicProperties, rawBody: Bytes)(
136-
implicit correlationId: CorrelationId): F[Unit] = {
137-
republishStrategy
138-
.republish(blocker, channel, consumerName)(queueName, messageId, deliveryTag, properties, rawBody)
139-
.flatTap(_ => F.delay(resultRepublishMeter.mark()))
140-
}
141-
142-
protected def createPropertiesForRepublish(newHeaders: Map[String, AnyRef],
143-
properties: BasicProperties,
144-
routingKey: RoutingKey): BasicProperties = {
145-
// values in newHeaders will overwrite values in original headers
146-
// we must also ensure that UserID will be the same as current username (or nothing): https://www.rabbitmq.com/validated-user-id.html
147-
val originalUserId = Option(properties.getUserId).filter(_.nonEmpty)
148-
val h = originalUserId match {
149-
case Some(uid) => newHeaders + (RepublishOriginalRoutingKeyHeaderName -> routingKey.value) + (RepublishOriginalUserId -> uid)
150-
case None => newHeaders + (RepublishOriginalRoutingKeyHeaderName -> routingKey.value)
151-
}
152-
val headers = Option(properties.getHeaders).map(_.asScala ++ h).getOrElse(h)
153-
val newUserId = originalUserId match {
154-
case Some(_) => connectionInfo.username.orNull
155-
case None => null
156-
}
157-
properties.builder().headers(headers.asJava).userId(newUserId).build()
158-
}
159-
16063
def watchForTimeoutIfConfigured(processTimeout: FiniteDuration,
16164
timeoutAction: DeliveryResult,
16265
timeoutLogLevel: Level)(delivery: Delivery[A], messageId: MessageId, result: F[DeliveryResult])(
16366
customTimeoutAction: F[Unit],
16467
)(implicit correlationId: CorrelationId): F[DeliveryResult] = {
165-
val timeoutsMeter = consumerRootMonitor.meter("timeouts")
166-
16768
if (processTimeout != Duration.Zero) {
16869
Concurrent
16970
.timeout(result, processTimeout)
@@ -190,6 +91,4 @@ final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A
19091
}
19192
} else result
19293
}
193-
194-
def withNewChannel(newChannel: ServerChannel): ConsumerBase[F, A] = copy(channel = newChannel)
19594
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package com.avast.clients.rabbitmq
2+
3+
import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Timer}
4+
import cats.implicits.catsSyntaxApplicativeError
5+
import cats.syntax.flatMap._
6+
import com.avast.bytes.Bytes
7+
import com.avast.clients.rabbitmq.DefaultRabbitMQConsumer._
8+
import com.avast.clients.rabbitmq.api._
9+
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
10+
import com.avast.metrics.scalaapi.Monitor
11+
import com.rabbitmq.client.AMQP.BasicProperties
12+
13+
import scala.jdk.CollectionConverters._
14+
import scala.util._
15+
16+
// it's case-class to have `copy` method for free....
17+
final private[rabbitmq] case class ConsumerChannelOps[F[_]: ConcurrentEffect: Timer: ContextShift, A](
18+
private val consumerName: String,
19+
private val queueName: String,
20+
channel: ServerChannel,
21+
private val blocker: Blocker,
22+
republishStrategy: RepublishStrategy[F],
23+
poisonedMessageHandler: PoisonedMessageHandler[F, A],
24+
connectionInfo: RabbitMQConnectionInfo,
25+
private val consumerLogger: ImplicitContextLogger[F],
26+
private val consumerRootMonitor: Monitor) {
27+
28+
private val F: ConcurrentEffect[F] = ConcurrentEffect[F] // scalastyle:ignore
29+
30+
val resultsMonitor: Monitor = consumerRootMonitor.named("results")
31+
private val resultAckMeter = resultsMonitor.meter("ack")
32+
private val resultRejectMeter = resultsMonitor.meter("reject")
33+
private val resultRetryMeter = resultsMonitor.meter("retry")
34+
private val resultRepublishMeter = resultsMonitor.meter("republish")
35+
36+
def handleResult(messageId: MessageId,
37+
deliveryTag: DeliveryTag,
38+
properties: BasicProperties,
39+
routingKey: RoutingKey,
40+
rawBody: Bytes,
41+
delivery: Delivery[A])(res: DeliveryResult)(implicit correlationId: CorrelationId): F[Unit] = {
42+
import DeliveryResult._
43+
44+
poisonedMessageHandler.interceptResult(delivery, messageId, rawBody)(res).flatMap {
45+
case Ack => ack(messageId, deliveryTag)
46+
case Reject => reject(messageId, deliveryTag)
47+
case Retry => retry(messageId, deliveryTag)
48+
case Republish(_, newHeaders) =>
49+
republish(messageId, deliveryTag, createPropertiesForRepublish(newHeaders, properties, routingKey), rawBody)
50+
}
51+
}
52+
53+
protected def ack(messageId: MessageId, deliveryTag: DeliveryTag)(implicit correlationId: CorrelationId): F[Unit] = {
54+
consumerLogger.debug(s"[$consumerName] ACK delivery $messageId, $deliveryTag") >>
55+
blocker
56+
.delay {
57+
if (!channel.isOpen) throw new IllegalStateException("Cannot ack delivery on closed channel")
58+
channel.basicAck(deliveryTag.value, false)
59+
resultAckMeter.mark()
60+
}
61+
.attempt
62+
.flatMap {
63+
case Right(()) => F.unit
64+
case Left(e) => consumerLogger.warn(e)(s"[$consumerName] Error while confirming the delivery $messageId")
65+
}
66+
}
67+
68+
protected def reject(messageId: MessageId, deliveryTag: DeliveryTag)(implicit correlationId: CorrelationId): F[Unit] = {
69+
consumerLogger.debug(s"[$consumerName] REJECT delivery $messageId, $deliveryTag") >>
70+
blocker
71+
.delay {
72+
if (!channel.isOpen) throw new IllegalStateException("Cannot reject delivery on closed channel")
73+
channel.basicReject(deliveryTag.value, false)
74+
resultRejectMeter.mark()
75+
}
76+
.attempt
77+
.flatMap {
78+
case Right(()) => F.unit
79+
case Left(e) => consumerLogger.warn(e)(s"[$consumerName] Error while rejecting the delivery $messageId")
80+
}
81+
}
82+
83+
protected def retry(messageId: MessageId, deliveryTag: DeliveryTag)(implicit correlationId: CorrelationId): F[Unit] = {
84+
consumerLogger.debug(s"[$consumerName] REJECT (with requeue) delivery $messageId, $deliveryTag") >>
85+
blocker
86+
.delay {
87+
if (!channel.isOpen) throw new IllegalStateException("Cannot retry delivery on closed channel")
88+
channel.basicReject(deliveryTag.value, true)
89+
resultRetryMeter.mark()
90+
}
91+
.attempt
92+
.flatMap {
93+
case Right(()) => F.unit
94+
case Left(e) => consumerLogger.warn(e)(s"[$consumerName] Error while rejecting (with requeue) the delivery $messageId")
95+
}
96+
}
97+
98+
protected def republish(messageId: MessageId, deliveryTag: DeliveryTag, properties: BasicProperties, rawBody: Bytes)(
99+
implicit correlationId: CorrelationId): F[Unit] = {
100+
republishStrategy
101+
.republish(blocker, channel, consumerName)(queueName, messageId, deliveryTag, properties, rawBody)
102+
.flatTap(_ => F.delay(resultRepublishMeter.mark()))
103+
}
104+
105+
protected def createPropertiesForRepublish(newHeaders: Map[String, AnyRef],
106+
properties: BasicProperties,
107+
routingKey: RoutingKey): BasicProperties = {
108+
// values in newHeaders will overwrite values in original headers
109+
// we must also ensure that UserID will be the same as current username (or nothing): https://www.rabbitmq.com/validated-user-id.html
110+
val originalUserId = Option(properties.getUserId).filter(_.nonEmpty)
111+
val h = originalUserId match {
112+
case Some(uid) => newHeaders + (RepublishOriginalRoutingKeyHeaderName -> routingKey.value) + (RepublishOriginalUserId -> uid)
113+
case None => newHeaders + (RepublishOriginalRoutingKeyHeaderName -> routingKey.value)
114+
}
115+
val headers = Option(properties.getHeaders).map(_.asScala ++ h).getOrElse(h)
116+
val newUserId = originalUserId match {
117+
case Some(_) => connectionInfo.username.orNull
118+
case None => null
119+
}
120+
properties.builder().headers(headers.asJava).userId(newUserId).build()
121+
}
122+
}
123+
124+
class ConsumerChannelOpsFactory[F[_]: ConcurrentEffect: Timer: ContextShift, A: DeliveryConverter](
125+
consumerName: String,
126+
queueName: String,
127+
blocker: Blocker,
128+
republishStrategy: RepublishStrategy[F],
129+
poisonedMessageHandler: PoisonedMessageHandler[F, A],
130+
connectionInfo: RabbitMQConnectionInfo,
131+
consumerLogger: ImplicitContextLogger[F],
132+
consumerRootMonitor: Monitor,
133+
newChannel: Resource[F, ServerChannel]) {
134+
135+
val create: Resource[F, ConsumerChannelOps[F, A]] = {
136+
newChannel.map { channel =>
137+
new ConsumerChannelOps[F, A](consumerName,
138+
queueName,
139+
channel,
140+
blocker,
141+
republishStrategy,
142+
poisonedMessageHandler,
143+
connectionInfo,
144+
consumerLogger,
145+
consumerRootMonitor)
146+
}
147+
}
148+
}

core/src/main/scala/com/avast/clients/rabbitmq/ConsumerWithCallbackBase.scala

+7-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ import java.util.concurrent.atomic.AtomicInteger
1414
import scala.util.control.NonFatal
1515

1616
abstract class ConsumerWithCallbackBase[F[_]: ConcurrentEffect: CatsTimer, A: DeliveryConverter](base: ConsumerBase[F, A],
17+
channelOps: ConsumerChannelOps[F, A],
1718
failureAction: DeliveryResult,
1819
consumerListener: ConsumerListener[F])
19-
extends DefaultConsumer(base.channel) {
20+
extends DefaultConsumer(channelOps.channel) {
2021
import base._
22+
import channelOps._
2123

2224
protected val readMeter: Meter = consumerRootMonitor.meter("read")
2325

@@ -34,7 +36,7 @@ abstract class ConsumerWithCallbackBase[F[_]: ConcurrentEffect: CatsTimer, A: De
3436
override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit =
3537
consumerListener.onShutdown(this, channel, consumerName, consumerTag, sig)
3638

37-
protected def handleNewDelivery(d: DeliveryWithMetadata[A]): F[DeliveryResult]
39+
protected def handleNewDelivery(d: DeliveryWithMetadata[A]): F[Option[DeliveryResult]]
3840

3941
override final def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
4042
val action = F.delay(processingCount.incrementAndGet()) >> {
@@ -87,13 +89,15 @@ abstract class ConsumerWithCallbackBase[F[_]: ConcurrentEffect: CatsTimer, A: De
8789

8890
handleNewDelivery(delivery)
8991
.flatMap {
90-
handleResult(messageId, deliveryTag, properties, routingKey, rawBody, delivery.delivery)(_)
92+
case Some(dr) => handleResult(messageId, deliveryTag, properties, routingKey, rawBody, delivery.delivery)(dr)
93+
case None => consumerLogger.trace(s"[$consumerName] Delivery result for $messageId ignored")
9194
}
9295
.flatTap(_ =>
9396
F.delay {
9497
val duration = taskDuration()
9598
consumerLogger.debug(s"[$consumerName] Delivery $messageId handling succeeded in $duration")
9699
processedTimer.update(duration)
100+
processingCount.decrementAndGet()
97101
})
98102
.recoverWith {
99103
case NonFatal(t) =>

0 commit comments

Comments
 (0)