[Bug] Message ordering isn't retained in Key_Shared AUTO_SPLIT mode in a rolling restart type of test scenario #23307
Comments
@equanz Do you have a chance to check whether the test results are valid?
It's possible that this is a Pulsar client bug. I'm using … The third experiment used messageListeners to receive messages, with the consumers initially paused. The problem doesn't reproduce in this case. I added some jitter to simulate processing. https://github.com/lhotari/pulsar/blob/0e608c93c5b0d6a9cf76751a4a82726b5e1654b4/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java#L1922-L2109 It looks like using …
The test fails consistently when using … This looks like a client-side bug rather than a broker-side issue with Key_Shared.
I was testing this in branch-3.3. It's possible that it reproduces the PIP-282 issue without …
I also tested with the master branch (which includes PIP-282); the problem reproduces with .messageListener unless …
I've created #23309 as a proposal to address the issue.
Further investigation may be needed, but I think this issue is related to redelivery. I applied the following patches:
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index b0717f1326..3ae52d71f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -2348,7 +2348,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
// Adding a new consumer.
@Cleanup
- Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+ ConsumerImpl<Integer> c1 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c1")
.subscriptionName(subscriptionName)
@@ -2358,7 +2358,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
.subscribe();
@Cleanup
- Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+ ConsumerImpl<Integer> c2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c2")
.subscriptionName(subscriptionName)
@@ -2367,7 +2367,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
.subscribe();
@Cleanup
- Consumer<Integer> c3 = pulsarClient.newConsumer(Schema.INT32)
+ ConsumerImpl<Integer> c3 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c3")
.subscriptionName(subscriptionName)
@@ -2417,8 +2417,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
c1.resume();
c3.resume();
Thread.sleep(pauseTime);
+ log.info("c1 incomingMessages: {}", c1.incomingMessages.toList().stream().map(Message::getMessageId).toList());
+ log.info("c3 incomingMessages: {}", c3.incomingMessages.toList().stream().map(Message::getMessageId).toList());
// reconnect c2
- c2 = pulsarClient.newConsumer(Schema.INT32)
+ c2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c2")
.subscriptionName(subscriptionName)
@@ -2428,7 +2430,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
// close and reconnect c1
c1.close();
Thread.sleep(pauseTime);
- c1 = pulsarClient.newConsumer(Schema.INT32)
+ c1 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c1")
.subscriptionName(subscriptionName)
@@ -2438,7 +2440,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
// close and reconnect c3
c3.close();
Thread.sleep(pauseTime);
- c3 = pulsarClient.newConsumer(Schema.INT32)
+ c3 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c3")
.subscriptionName(subscriptionName)
@@ -2450,6 +2452,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
+ dispatcher.getNumberOfMessagesInReplay());
logTopicStats(topic);
+ log.info("c2 incomingMessages: {}", c2.incomingMessages.toList().stream().map(Message::getMessageId).toList());
// produce more messages
for (int i = 1000; i < 2000; i++) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 9748a42f0c..a5d5cd7772 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -87,7 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final ExecutorService externalPinnedExecutor;
protected final ExecutorService internalPinnedExecutor;
protected UnAckedMessageTracker unAckedMessageTracker;
- final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
+ public final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
    protected final int maxReceiverQueueSize;
Then I ran the test, and it failed as follows.
When c2 is reconnected, the dispatcher sends replay messages to c2. After c3 is disconnected, … (I didn't address redelivered messages in PIP-282 because I think their ordering can't be fully guaranteed. For example, the dispatcher can't control client-side redelivery operations such as unack …) I'm asking some questions below to see whether this issue is resolved in PIP-379.
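The redelivery hazard described above can be illustrated with a toy model. This is a hypothetical sketch, not Pulsar's dispatcher and not a reconstruction of the failing test output: the class RedeliveryOrderingDemo and the record Msg are invented for illustration. It assumes that messages a consumer received but never acknowledged go into a replay queue when that consumer disconnects and are then dispatched to the key's current owner. If the new owner has already received a newer message for the same key, it sees the older one afterwards.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (hypothetical, not Pulsar code): unacked messages of a disconnected
// consumer are replayed to the consumer that currently owns the key.
class RedeliveryOrderingDemo {
    record Msg(String key, int seq) {}

    static List<Msg> simulate() {
        Deque<Msg> c3Unacked = new ArrayDeque<>();
        Deque<Msg> replayQueue = new ArrayDeque<>();
        List<Msg> c2Received = new ArrayList<>();

        // c3 owns key "k" and still holds message 5 unacknowledged.
        c3Unacked.add(new Msg("k", 5));
        // Ownership of "k" moves to c2, which receives the newer message 6
        // because nothing holds it back.
        c2Received.add(new Msg("k", 6));
        // c3 disconnects: its unacked messages become replay messages.
        replayQueue.addAll(c3Unacked);
        c3Unacked.clear();
        // The dispatcher replays them to the current owner, c2.
        while (!replayQueue.isEmpty()) {
            c2Received.add(replayQueue.poll());
        }
        return c2Received;
    }

    public static void main(String[] args) {
        // Prints [Msg[key=k, seq=6], Msg[key=k, seq=5]]: 5 arrives after 6.
        System.out.println(simulate());
    }
}
```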
Thanks for the careful investigation, @equanz. Very helpful!
There's a related past issue, #12885, with PR #12890 in this area.
I'll try to take this case into account when working on the PIP-379 design and implementation.
This issue is fixed by the PIP-379 implementation, #23352.
Search before asking
Read release policy
Version
Warning
It's possible that the test setup contains bugs. More work is needed to validate that the test case is valid.
Note
It's possible that multiple bugs are involved. The comments describe some observations that might be client-side bugs.
This applies to both the master branch (which includes the PIP-282 changes) and branch-3.3.
Minimal reproduce step
The test case has invocationCount=50, so it runs 50 times. It usually fails in about 25% of the runs.
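As a rough worked estimate of why 50 invocations are enough to reproduce the problem, assume (as a simplification) that each run fails independently with probability 0.25. The chance that a full 50-run invocation passes is then negligible, and about 12 to 13 failures are expected per invocation:

$$P(\text{no failure in 50 runs}) = (1 - 0.25)^{50} = 0.75^{50} \approx 5.7 \times 10^{-7}, \qquad E[\text{failures}] = 50 \times 0.25 = 12.5$$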
This is the test case:
org.apache.pulsar.client.api.KeySharedSubscriptionTest#testOrderingAfterReconnects
The test code depends on some new test utility methods.
Reproducer based on the master branch (with PIP-282 changes)
Reproducer based on branch-3.3 (without PIP-282)
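The failures show up as messages for the same key arriving out of order. A minimal sketch of a per-key ordering check follows; it is hypothetical and not the actual code of testOrderingAfterReconnects (the class KeyOrderingChecker and its methods are invented names). It assumes each message carries a key and an increasing integer value, tolerates duplicates (which redelivery can legitimately produce), and records a violation whenever a value is lower than the last one seen for that key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: records (key, value) pairs in receive order and reports
// every value that is lower than the last value seen for the same key.
class KeyOrderingChecker {
    private final Map<String, Integer> lastValueByKey = new HashMap<>();
    private final List<String> violations = new ArrayList<>();

    void onReceived(String key, int value) {
        Integer previous = lastValueByKey.get(key);
        // Equal values are treated as duplicates (e.g. redelivery), not violations.
        if (previous != null && value < previous) {
            violations.add("key=" + key + " received " + value + " after " + previous);
        }
        lastValueByKey.put(key, value);
    }

    List<String> violations() {
        return violations;
    }

    public static void main(String[] args) {
        KeyOrderingChecker checker = new KeyOrderingChecker();
        checker.onReceived("k1", 1);
        checker.onReceived("k2", 1);
        checker.onReceived("k1", 3);
        checker.onReceived("k1", 2); // out of order for k1
        // Prints [key=k1 received 2 after 3]
        System.out.println(checker.violations());
    }
}
```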
What did you expect to see?
Key_Shared subscription in AUTO_SPLIT mode should retain ordering of messages by message key.
This should be achieved by holding back delivery of messages whose keys are still being handled by a consumer that, at the time of sending, is no longer the current "owner" of the hash range to which the key belongs.
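The hold-back rule above can be sketched as follows. This is a simplified, hypothetical model of the expected behavior, not Pulsar's dispatcher or selector code. The class HoldBackDispatcher and its methods are invented. It assumes 65536 hash slots, uses String.hashCode() as a stand-in for the real key hash, sets ranges explicitly instead of letting AUTO_SPLIT compute them, and counts unacked messages per slot rather than per message position. When a slot's range moves to a new consumer, messages for that slot are held back until the previous consumer has acknowledged everything it still holds for the slot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the expected Key_Shared hold-back behavior (not Pulsar code).
class HoldBackDispatcher {
    static final int RANGE_SIZE = 65536; // assumed number of hash slots

    // Inclusive upper bound of each hash range -> owning consumer.
    private final TreeMap<Integer, String> rangeOwners = new TreeMap<>();
    // Slot -> consumer that still has unacked messages for it, and how many.
    private final Map<Integer, String> pendingOwner = new HashMap<>();
    private final Map<Integer, Integer> pendingCount = new HashMap<>();

    void assignRange(int endInclusive, String consumer) {
        rangeOwners.put(endInclusive, consumer);
    }

    static int slot(String key) {
        return Math.floorMod(key.hashCode(), RANGE_SIZE); // stand-in for the real hash
    }

    /** Returns the consumer to deliver to, or null if the message must be held back. */
    String tryDispatch(String key) {
        int slot = slot(key);
        Map.Entry<Integer, String> range = rangeOwners.ceilingEntry(slot);
        if (range == null) {
            return null; // no consumer covers this slot
        }
        String owner = range.getValue();
        String previous = pendingOwner.get(slot);
        if (previous != null && !previous.equals(owner)) {
            return null; // the former owner still has unacked messages for this slot
        }
        pendingOwner.put(slot, owner);
        pendingCount.merge(slot, 1, Integer::sum);
        return owner;
    }

    /** Simplified ack: decrements the unacked count for the key's slot. */
    void ack(String key) {
        int slot = slot(key);
        Integer n = pendingCount.get(slot);
        if (n == null) {
            return;
        }
        if (n == 1) {
            pendingCount.remove(slot);
            pendingOwner.remove(slot);
        } else {
            pendingCount.put(slot, n - 1);
        }
    }

    public static void main(String[] args) {
        HoldBackDispatcher d = new HoldBackDispatcher();
        d.assignRange(RANGE_SIZE - 1, "c1");    // c1 owns every slot
        System.out.println(d.tryDispatch("k")); // c1
        d.assignRange(32767, "c2");             // slots 0..32767 (including "k") move to c2
        System.out.println(d.tryDispatch("k")); // null: held back, c1 has unacked "k"
        d.ack("k");                             // c1 acknowledges its message
        System.out.println(d.tryDispatch("k")); // c2
    }
}
```

The design choice here is to block a whole slot rather than individual keys, which is coarser than necessary but keeps the invariant easy to see: a slot is never served by two consumers at once while the older one has outstanding messages.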
What did you see instead?
Anything else?
The initial goal of the test scenario was to simulate a rolling restart of consumers. However, this isn't strictly followed in the test case; the test case was modified until test failures started appearing.
Are you willing to submit a PR?