Skip to content

Commit 2330863

Browse files
authored
Add redelivery backoff settings
This adds the ability to configure the redelivery backoff policy for messages that are negatively acknowledged or that are redelivered due to acknowledgement timeout.
1 parent 2bcbb83 commit 2330863

File tree

10 files changed

+204
-0
lines changed

10 files changed

+204
-0
lines changed

pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ private void configureConsumerBuilder(ConsumerBuilder<T> consumerBuilder) {
173173
consumerBuilder.negativeAckRedeliveryDelay(this.consumerSpec.getNegativeAckRedeliveryDelay().toMillis(),
174174
TimeUnit.MILLISECONDS);
175175
}
176+
if (this.consumerSpec.getAckTimeoutRedeliveryBackoff() != null) {
177+
consumerBuilder.ackTimeoutRedeliveryBackoff(this.consumerSpec.getAckTimeoutRedeliveryBackoff());
178+
}
179+
if (this.consumerSpec.getNegativeAckRedeliveryBackoff() != null) {
180+
consumerBuilder.negativeAckRedeliveryBackoff(this.consumerSpec.getNegativeAckRedeliveryBackoff());
181+
}
176182
if (this.consumerSpec.getDeadLetterPolicy() != null) {
177183
consumerBuilder.deadLetterPolicy(this.consumerSpec.getDeadLetterPolicy());
178184
}

pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.pulsar.client.api.PulsarClient;
4242
import org.apache.pulsar.client.api.PulsarClientException;
4343
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
44+
import org.apache.pulsar.client.api.RedeliveryBackoff;
4445
import org.apache.pulsar.client.api.RegexSubscriptionMode;
4546
import org.apache.pulsar.client.api.Schema;
4647
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -91,6 +92,8 @@ void consumerProperties() throws Exception {
9192
.maxRedeliverCount(1)
9293
.build();
9394

95+
TestRedeliveryBackoff testRedeliveryBackoff = new TestRedeliveryBackoff();
96+
9497
ConsumerConfigurationData<String> expectedConsumerConf = new ConsumerConfigurationData<>();
9598
expectedConsumerConf.setTopicNames(new HashSet<>(Arrays.asList("my-topic", "my-rlt")));
9699
expectedConsumerConf.setSubscriptionName("my-sub");
@@ -116,6 +119,8 @@ void consumerProperties() throws Exception {
116119
expectedConsumerConf.setTickDurationMillis(TimeUnit.SECONDS.toMillis(4));
117120
expectedConsumerConf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(5));
118121
expectedConsumerConf.setNegativeAckRedeliveryDelayMicros(TimeUnit.SECONDS.toMicros(6));
122+
expectedConsumerConf.setNegativeAckRedeliveryBackoff(testRedeliveryBackoff);
123+
expectedConsumerConf.setAckTimeoutRedeliveryBackoff(testRedeliveryBackoff);
119124
expectedConsumerConf.setDeadLetterPolicy(deadLetterPolicy);
120125
expectedConsumerConf.setRetryEnable(true);
121126
expectedConsumerConf.setReceiverQueueSize(7);
@@ -152,6 +157,8 @@ void consumerProperties() throws Exception {
152157
.ackTimeoutTickTime(Duration.ofSeconds(4))
153158
.acknowledgementsGroupTime(Duration.ofSeconds(5))
154159
.negativeAckRedeliveryDelay(Duration.ofSeconds(6))
160+
.negativeAckRedeliveryBackoff(testRedeliveryBackoff)
161+
.ackTimeoutRedeliveryBackoff(testRedeliveryBackoff)
155162
.deadLetterPolicy(deadLetterPolicy)
156163
.retryLetterTopicEnable(true)
157164
.receiverQueueSize(7)
@@ -429,4 +436,13 @@ void closeConsumerExceptionIsIgnored() throws Exception {
429436
verify(consumer).closeAsync();
430437
}
431438

439+
static class TestRedeliveryBackoff implements RedeliveryBackoff {
440+
441+
@Override
442+
public long next(int redeliveryCount) {
443+
return redeliveryCount * 2L;
444+
}
445+
446+
}
447+
432448
}

pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.pulsar.client.api.CryptoKeyReader;
3232
import org.apache.pulsar.client.api.DeadLetterPolicy;
3333
import org.apache.pulsar.client.api.KeySharedPolicy;
34+
import org.apache.pulsar.client.api.RedeliveryBackoff;
3435
import org.apache.pulsar.client.api.RegexSubscriptionMode;
3536
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3637
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -86,6 +87,10 @@ public class ImmutableReactiveMessageConsumerSpec implements ReactiveMessageCons
8687

8788
private final Duration negativeAckRedeliveryDelay;
8889

90+
private final RedeliveryBackoff negativeAckRedeliveryBackoff;
91+
92+
private final RedeliveryBackoff ackTimeoutRedeliveryBackoff;
93+
8994
private final DeadLetterPolicy deadLetterPolicy;
9095

9196
private final Boolean retryLetterTopicEnable;
@@ -160,6 +165,8 @@ public ImmutableReactiveMessageConsumerSpec(ReactiveMessageConsumerSpec consumer
160165
this.acknowledgeAsynchronously = consumerSpec.getAcknowledgeAsynchronously();
161166
this.acknowledgeScheduler = consumerSpec.getAcknowledgeScheduler();
162167
this.negativeAckRedeliveryDelay = consumerSpec.getNegativeAckRedeliveryDelay();
168+
this.negativeAckRedeliveryBackoff = consumerSpec.getNegativeAckRedeliveryBackoff();
169+
this.ackTimeoutRedeliveryBackoff = consumerSpec.getAckTimeoutRedeliveryBackoff();
163170

164171
this.deadLetterPolicy = consumerSpec.getDeadLetterPolicy();
165172

@@ -192,6 +199,7 @@ public ImmutableReactiveMessageConsumerSpec(List<String> topicNames, Pattern top
192199
Map<String, String> properties, Integer priorityLevel, Boolean readCompacted, Boolean batchIndexAckEnabled,
193200
Duration ackTimeout, Duration ackTimeoutTickTime, Duration acknowledgementsGroupTime,
194201
Boolean acknowledgeAsynchronously, Scheduler acknowledgeScheduler, Duration negativeAckRedeliveryDelay,
202+
RedeliveryBackoff negativeAckRedeliveryBackoff, RedeliveryBackoff ackTimeoutRedeliveryBackoff,
195203
DeadLetterPolicy deadLetterPolicy, Boolean retryLetterTopicEnable, Integer receiverQueueSize,
196204
Integer maxTotalReceiverQueueSizeAcrossPartitions, Boolean autoUpdatePartitions,
197205
Duration autoUpdatePartitionsInterval, CryptoKeyReader cryptoKeyReader,
@@ -219,6 +227,8 @@ public ImmutableReactiveMessageConsumerSpec(List<String> topicNames, Pattern top
219227
this.acknowledgeAsynchronously = acknowledgeAsynchronously;
220228
this.acknowledgeScheduler = acknowledgeScheduler;
221229
this.negativeAckRedeliveryDelay = negativeAckRedeliveryDelay;
230+
this.negativeAckRedeliveryBackoff = negativeAckRedeliveryBackoff;
231+
this.ackTimeoutRedeliveryBackoff = ackTimeoutRedeliveryBackoff;
222232
this.deadLetterPolicy = deadLetterPolicy;
223233
this.retryLetterTopicEnable = retryLetterTopicEnable;
224234
this.receiverQueueSize = receiverQueueSize;
@@ -342,6 +352,16 @@ public Duration getNegativeAckRedeliveryDelay() {
342352
return this.negativeAckRedeliveryDelay;
343353
}
344354

355+
@Override
356+
public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
357+
return this.negativeAckRedeliveryBackoff;
358+
}
359+
360+
@Override
361+
public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() {
362+
return this.ackTimeoutRedeliveryBackoff;
363+
}
364+
345365
@Override
346366
public DeadLetterPolicy getDeadLetterPolicy() {
347367
return this.deadLetterPolicy;

pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.pulsar.client.api.CryptoKeyReader;
3131
import org.apache.pulsar.client.api.DeadLetterPolicy;
3232
import org.apache.pulsar.client.api.KeySharedPolicy;
33+
import org.apache.pulsar.client.api.RedeliveryBackoff;
3334
import org.apache.pulsar.client.api.RegexSubscriptionMode;
3435
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3536
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -85,6 +86,10 @@ public class MutableReactiveMessageConsumerSpec implements ReactiveMessageConsum
8586

8687
private Duration negativeAckRedeliveryDelay;
8788

89+
private RedeliveryBackoff negativeAckRedeliveryBackoff;
90+
91+
private RedeliveryBackoff ackTimeoutRedeliveryBackoff;
92+
8893
private DeadLetterPolicy deadLetterPolicy;
8994

9095
private Boolean retryLetterTopicEnable;
@@ -165,6 +170,8 @@ public MutableReactiveMessageConsumerSpec(ReactiveMessageConsumerSpec consumerSp
165170
this.acknowledgeAsynchronously = consumerSpec.getAcknowledgeAsynchronously();
166171
this.acknowledgeScheduler = consumerSpec.getAcknowledgeScheduler();
167172
this.negativeAckRedeliveryDelay = consumerSpec.getNegativeAckRedeliveryDelay();
173+
this.negativeAckRedeliveryBackoff = consumerSpec.getNegativeAckRedeliveryBackoff();
174+
this.ackTimeoutRedeliveryBackoff = consumerSpec.getAckTimeoutRedeliveryBackoff();
168175

169176
this.deadLetterPolicy = consumerSpec.getDeadLetterPolicy();
170177

@@ -482,6 +489,35 @@ public void setNegativeAckRedeliveryDelay(Duration negativeAckRedeliveryDelay) {
482489
this.negativeAckRedeliveryDelay = negativeAckRedeliveryDelay;
483490
}
484491

492+
@Override
493+
public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() {
494+
return this.ackTimeoutRedeliveryBackoff;
495+
}
496+
497+
/**
498+
* Sets the redelivery backoff policy for messages that are redelivered due to
499+
* acknowledgement timeout.
500+
* @param ackTimeoutRedeliveryBackoff the backoff policy to use for messages that
501+
* exceed their ack timeout
502+
*/
503+
public void setAckTimeoutRedeliveryBackoff(RedeliveryBackoff ackTimeoutRedeliveryBackoff) {
504+
this.ackTimeoutRedeliveryBackoff = ackTimeoutRedeliveryBackoff;
505+
}
506+
507+
@Override
508+
public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
509+
return this.negativeAckRedeliveryBackoff;
510+
}
511+
512+
/**
513+
* Sets the redelivery backoff policy for messages that are negatively acknowledged.
514+
* @param negativeAckRedeliveryBackoff the backoff policy to use for negatively
515+
* acknowledged messages
516+
*/
517+
public void setNegativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff) {
518+
this.negativeAckRedeliveryBackoff = negativeAckRedeliveryBackoff;
519+
}
520+
485521
@Override
486522
public DeadLetterPolicy getDeadLetterPolicy() {
487523
return this.deadLetterPolicy;
@@ -705,6 +741,12 @@ public void applySpec(ReactiveMessageConsumerSpec consumerSpec) {
705741
if (consumerSpec.getNegativeAckRedeliveryDelay() != null) {
706742
setNegativeAckRedeliveryDelay(consumerSpec.getNegativeAckRedeliveryDelay());
707743
}
744+
if (consumerSpec.getNegativeAckRedeliveryBackoff() != null) {
745+
setNegativeAckRedeliveryBackoff(consumerSpec.getNegativeAckRedeliveryBackoff());
746+
}
747+
if (consumerSpec.getAckTimeoutRedeliveryBackoff() != null) {
748+
setAckTimeoutRedeliveryBackoff(consumerSpec.getAckTimeoutRedeliveryBackoff());
749+
}
708750
if (consumerSpec.getDeadLetterPolicy() != null) {
709751
setDeadLetterPolicy(consumerSpec.getDeadLetterPolicy());
710752
}

pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.pulsar.client.api.DeadLetterPolicy;
3333
import org.apache.pulsar.client.api.KeySharedPolicy;
3434
import org.apache.pulsar.client.api.Message;
35+
import org.apache.pulsar.client.api.RedeliveryBackoff;
3536
import org.apache.pulsar.client.api.RegexSubscriptionMode;
3637
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3738
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -527,6 +528,31 @@ default ReactiveMessageConsumerBuilder<T> negativeAckRedeliveryDelay(Duration ne
527528
return this;
528529
}
529530

531+
/**
532+
* Sets the redelivery backoff policy for messages that are negatively acknowledged.
533+
* @param negativeAckRedeliveryBackoff the backoff policy to use for negatively
534+
* acknowledged messages
535+
* @return the consumer builder instance
536+
*/
537+
default ReactiveMessageConsumerBuilder<T> negativeAckRedeliveryBackoff(
538+
RedeliveryBackoff negativeAckRedeliveryBackoff) {
539+
getMutableSpec().setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
540+
return this;
541+
}
542+
543+
/**
544+
* Sets the redelivery backoff policy for messages that are redelivered due to
545+
* acknowledgement timeout.
546+
* @param ackTimeoutRedeliveryBackoff the backoff policy to use for messages that
547+
* exceed their ack timeout
548+
* @return the consumer builder instance
549+
*/
550+
default ReactiveMessageConsumerBuilder<T> ackTimeoutRedeliveryBackoff(
551+
RedeliveryBackoff ackTimeoutRedeliveryBackoff) {
552+
getMutableSpec().setAckTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff);
553+
return this;
554+
}
555+
530556
/**
531557
* Sets a dead letter policy for the consumer.
532558
*

pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.pulsar.client.api.CryptoKeyReader;
3030
import org.apache.pulsar.client.api.DeadLetterPolicy;
3131
import org.apache.pulsar.client.api.KeySharedPolicy;
32+
import org.apache.pulsar.client.api.RedeliveryBackoff;
3233
import org.apache.pulsar.client.api.RegexSubscriptionMode;
3334
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3435
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -198,6 +199,22 @@ public interface ReactiveMessageConsumerSpec {
198199
*/
199200
Duration getNegativeAckRedeliveryDelay();
200201

202+
/**
203+
* Get the negative ack redelivery backoff policy for messages that are negatively
204+
* acknowledged.
205+
* @return redeliveryBackoff
206+
* @see ConsumerBuilder#negativeAckRedeliveryBackoff
207+
*/
208+
RedeliveryBackoff getNegativeAckRedeliveryBackoff();
209+
210+
/**
211+
* Get the redelivery backoff policy for messages that are redelivered due to
212+
* acknowledgement timeout.
213+
* @return redeliveryBackoff
214+
* @see ConsumerBuilder#ackTimeoutRedeliveryBackoff
215+
*/
216+
RedeliveryBackoff getAckTimeoutRedeliveryBackoff();
217+
201218
/**
202219
* Gets the dead letter policy for the consumer.
203220
* @return the dead letter policy

pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilderTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.pulsar.client.api.DeadLetterPolicy;
3131
import org.apache.pulsar.client.api.EncryptionKeyInfo;
3232
import org.apache.pulsar.client.api.KeySharedPolicy;
33+
import org.apache.pulsar.client.api.RedeliveryBackoff;
3334
import org.apache.pulsar.client.api.RegexSubscriptionMode;
3435
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3536
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -55,6 +56,13 @@ class ReactiveMessageConsumerBuilderTests {
5556

5657
private static final Scheduler scheduler = Schedulers.newSingle("my-sched");
5758

59+
private static final RedeliveryBackoff redeliverBackoff = new RedeliveryBackoff() {
60+
@Override
61+
public long next(int redeliveryCount) {
62+
return redeliveryCount * 2L;
63+
}
64+
};
65+
5866
@Test
5967
void emptyBuilder() {
6068
MutableReactiveMessageConsumerSpec spec = new TestReactiveMessageConsumerBuilder().getMutableSpec();
@@ -154,6 +162,8 @@ private void assertConsumerSpecWithAllValues(ReactiveMessageConsumerSpec spec) {
154162
assertThat(spec.getAcknowledgeAsynchronously()).isTrue();
155163
assertThat(spec.getAcknowledgeScheduler()).isSameAs(scheduler);
156164
assertThat(spec.getNegativeAckRedeliveryDelay()).hasSeconds(6);
165+
assertThat(spec.getAckTimeoutRedeliveryBackoff()).isEqualTo(redeliverBackoff);
166+
assertThat(spec.getNegativeAckRedeliveryBackoff()).isEqualTo(redeliverBackoff);
157167
assertThat(spec.getDeadLetterPolicy()).isSameAs(deadLetterPolicy);
158168
assertThat(spec.getRetryLetterTopicEnable()).isTrue();
159169
assertThat(spec.getReceiverQueueSize()).isEqualTo(7);
@@ -194,6 +204,8 @@ private ReactiveMessageConsumerBuilder<String> createConsumerBuilder() {
194204
.acknowledgeAsynchronously(true)
195205
.acknowledgeScheduler(scheduler)
196206
.negativeAckRedeliveryDelay(Duration.ofSeconds(6))
207+
.negativeAckRedeliveryBackoff(redeliverBackoff)
208+
.ackTimeoutRedeliveryBackoff(redeliverBackoff)
197209
.deadLetterPolicy(deadLetterPolicy)
198210
.retryLetterTopicEnable(true)
199211
.receiverQueueSize(7)

pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.pulsar.client.api.CryptoKeyReader;
3232
import org.apache.pulsar.client.api.DeadLetterPolicy;
3333
import org.apache.pulsar.client.api.KeySharedPolicy;
34+
import org.apache.pulsar.client.api.RedeliveryBackoff;
3435
import org.apache.pulsar.client.api.RegexSubscriptionMode;
3536
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3637
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -64,6 +65,8 @@ abstract class ImmutableReactiveMessageConsumerSpecMixin {
6465
@JsonProperty("acknowledgeAsynchronously") Boolean acknowledgeAsynchronously,
6566
@JsonProperty("acknowledgeScheduler") Scheduler acknowledgeScheduler,
6667
@JsonProperty("negativeAckRedeliveryDelay") Duration negativeAckRedeliveryDelay,
68+
@JsonProperty("negativeAckRedeliveryBackoff") RedeliveryBackoff negativeAckRedeliveryBackoff,
69+
@JsonProperty("ackTimeoutRedeliveryBackoff") RedeliveryBackoff ackTimeoutRedeliveryBackoff,
6770
@JsonProperty("deadLetterPolicy") DeadLetterPolicy deadLetterPolicy,
6871
@JsonProperty("retryLetterTopicEnable") Boolean retryLetterTopicEnable,
6972
@JsonProperty("receiverQueueSize") Integer receiverQueueSize,

pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.pulsar.client.api.KeySharedPolicy;
3838
import org.apache.pulsar.client.api.MessageRouter;
3939
import org.apache.pulsar.client.api.Range;
40+
import org.apache.pulsar.client.api.RedeliveryBackoff;
4041
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
4142
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageReaderSpec;
4243
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
@@ -63,6 +64,8 @@ public PulsarReactiveClientModule() {
6364
addDeserializer(Scheduler.class, new SchedulerDeserializer());
6465
addSerializer(Scheduler.class, new SchedulerSerializer());
6566
addDeserializer(DeadLetterPolicy.class, new DeadLetterPolicyDeserializer());
67+
addDeserializer(RedeliveryBackoff.class, new ClassDeserializer<>());
68+
addSerializer(RedeliveryBackoff.class, new ClassSerializer<>());
6669
addDeserializer(CryptoKeyReader.class, new ClassDeserializer<>());
6770
addSerializer(CryptoKeyReader.class, new ClassSerializer<>());
6871
addDeserializer(Range.class, new RangeDeserializer());

0 commit comments

Comments
 (0)