[Bug] Message ordering isn't retained in Key_Shared AUTO_SPLIT mode in a rolling restart type of test scenario #23307

Closed
3 tasks done
lhotari opened this issue Sep 13, 2024 · 9 comments · Fixed by #23352
type/bug The PR fixed a bug or issue reported a bug

Comments

@lhotari
Member

lhotari commented Sep 13, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

Warning

It's possible that the test setup contains bugs. More work is needed to validate that the test case is valid.

Note

It's possible that multiple bugs are involved. The comments describe some observations which might be client side bugs.

This applies to both the master branch (which includes the PIP-282 changes) and branch-3.3.

Minimal reproduce step

The test case has invocationCount=50 so that it is repeated 50 times. It usually fails in about 25% of the runs.

This is the test case:
org.apache.pulsar.client.api.KeySharedSubscriptionTest#testOrderingAfterReconnects
The test code depends on some new test utility methods.

Reproducing on the master branch (with PIP-282 changes):

git clone -b lh-key_shared-testing-2024-09-13-220512 --depth=2 --single-branch https://github.com/lhotari/pulsar
cd pulsar
mvn -Pcore-modules,-main -T 1C clean install -DskipTests -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true -DnarPluginPhase=none
mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -pl pulsar-broker "-Dtest=org.apache.pulsar.client.api.KeySharedSubscriptionTest#testOrderingAfterReconnects"

Reproducing on branch-3.3 (without PIP-282):

git clone -b lh-key_shared-testing-branch-3.3-2024-09-13-220747 --depth=2 --single-branch https://github.com/lhotari/pulsar
cd pulsar
mvn -Pcore-modules,-main -T 1C clean install -DskipTests -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true -DnarPluginPhase=none
mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -pl pulsar-broker "-Dtest=org.apache.pulsar.client.api.KeySharedSubscriptionTest#testOrderingAfterReconnects"

What did you expect to see?

Key_Shared subscription in AUTO_SPLIT mode should retain ordering of messages by message key.
This should be achieved by holding back delivery of messages whose keys are still being handled by a consumer that is no longer the current "owner" of the hash range to which the message key belongs at the time of sending.
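To make the expected hold-back behavior concrete, here is a minimal sketch in plain Java. It is not Pulsar's dispatcher code: the class `KeyHoldBackSketch` and its methods are hypothetical, and it tracks individual keys rather than hash ranges to keep the example short. The assumption is simply that a key may be sent to the current owner only once no unacknowledged message for that key remains with a different consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model for illustration only; not Pulsar's dispatcher.
class KeyHoldBackSketch {
    // Tracks which consumer currently holds unacknowledged messages for a key.
    private static final class InFlight {
        final String consumer;
        int unacked;

        InFlight(String consumer) {
            this.consumer = consumer;
        }
    }

    private final Map<String, InFlight> inFlightByKey = new HashMap<>();

    // Returns true and records the send if the key can go to currentOwner now.
    // Returns false when the message must be held back.
    boolean tryDispatch(String key, String currentOwner) {
        InFlight inFlight = inFlightByKey.get(key);
        if (inFlight != null && !inFlight.consumer.equals(currentOwner)) {
            // A previous owner still has unacknowledged messages for this key.
            return false;
        }
        if (inFlight == null) {
            inFlight = new InFlight(currentOwner);
            inFlightByKey.put(key, inFlight);
        }
        inFlight.unacked++;
        return true;
    }

    // Acknowledges one message for the key; the key is released once nothing is pending.
    void ack(String key) {
        InFlight inFlight = inFlightByKey.get(key);
        if (inFlight != null && --inFlight.unacked == 0) {
            inFlightByKey.remove(key);
        }
    }

    public static void main(String[] args) {
        KeyHoldBackSketch sketch = new KeyHoldBackSketch();
        System.out.println(sketch.tryDispatch("k1", "c1")); // c1 owns the range of k1
        System.out.println(sketch.tryDispatch("k1", "c2")); // range moved to c2, c1 not done
        sketch.ack("k1");                                   // c1 acknowledges its message
        System.out.println(sketch.tryDispatch("k1", "c2")); // c2 may now receive k1
    }
}
```

In this model, ordering by key holds because a key never has unacknowledged messages on two consumers at the same time.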

What did you see instead?

  • message ordering by key isn't preserved in message processing
  • sometimes message processing gets blocked and doesn't proceed
  • there are duplicate messages in cases where it isn't expected

Anything else?

The initial goal of the test scenario was to simulate a rolling restart of consumers. However, the test case doesn't strictly follow this: it was modified until test failures started appearing.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@lhotari lhotari added the type/bug The PR fixed a bug or issue reported a bug label Sep 13, 2024
@lhotari lhotari self-assigned this Sep 13, 2024
@lhotari
Member Author

lhotari commented Sep 13, 2024

@equanz Would you have a chance to check whether the test results are valid?

@lhotari
Member Author

lhotari commented Sep 13, 2024

It's possible that this is a Pulsar client bug. I'm using .receiveAsync in the test case.
I also tested with .receive and threads to consume: https://github.com/lhotari/pulsar/blob/53188f42babd14c8120b7244b83066ec22b8222a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java#L2074-L2108
The results were similar.

The third experiment used message listeners to receive messages, with the consumers initially paused. The problem doesn't reproduce in this case. I added some jitter to simulate processing. https://github.com/lhotari/pulsar/blob/0e608c93c5b0d6a9cf76751a4a82726b5e1654b4/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java#L1922-L2109

It looks like using .receive/.receiveAsync might be messing up ordering.

@lhotari
Member Author

lhotari commented Sep 13, 2024

The test fails consistently when using .messageListener without .startPaused(true). In that case, there are 7 to 10 out-of-order messages in each test run. This is with these changes: lhotari@ca41a88f

This looks like a client-side bug rather than a broker-side issue with Key_Shared.

@lhotari
Member Author

lhotari commented Sep 13, 2024

> The test fails consistently when using .messageListener without .startPaused(true). In that case, there are 7 to 10 out-of-order messages in each test run. This is with these changes: lhotari@ca41a88f
>
> This looks like a client-side bug rather than a broker-side issue with Key_Shared.

I was testing this in branch-3.3. It's possible that it reproduces the PIP-282 issue without .startPaused(true). I'll check.

@lhotari
Member Author

lhotari commented Sep 13, 2024

> I was testing this in branch-3.3. It's possible that it reproduces the PIP-282 issue without .startPaused(true). I'll check.

I also tested with the master branch (which includes PIP-282). The problem reproduces with .messageListener unless .startPaused(true) is used for c2 in the test (commit lhotari@1ff8c7ca).

@lhotari
Member Author

lhotari commented Sep 14, 2024

I've created #23309 as a proposal to address the issue.

@equanz
Contributor

equanz commented Sep 17, 2024

Further investigation may be needed, but I think this is an issue related to redelivery.

I applied the following patch.

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index b0717f1326..3ae52d71f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -2348,7 +2348,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {

         // Adding a new consumer.
         @Cleanup
-        Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+        ConsumerImpl<Integer> c1 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .consumerName("c1")
                 .subscriptionName(subscriptionName)
@@ -2358,7 +2358,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
                 .subscribe();

         @Cleanup
-        Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+        ConsumerImpl<Integer> c2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .consumerName("c2")
                 .subscriptionName(subscriptionName)
@@ -2367,7 +2367,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
                 .subscribe();

         @Cleanup
-        Consumer<Integer> c3 = pulsarClient.newConsumer(Schema.INT32)
+        ConsumerImpl<Integer> c3 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .consumerName("c3")
                 .subscriptionName(subscriptionName)
@@ -2417,8 +2417,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         c1.resume();
         c3.resume();
         Thread.sleep(pauseTime);
+        log.info("c1 incomingMessages: {}", c1.incomingMessages.toList().stream().map(Message::getMessageId).toList());
+        log.info("c3 incomingMessages: {}", c3.incomingMessages.toList().stream().map(Message::getMessageId).toList());
         // reconnect c2
-        c2 = pulsarClient.newConsumer(Schema.INT32)
+        c2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .consumerName("c2")
                 .subscriptionName(subscriptionName)
@@ -2428,7 +2430,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         // close and reconnect c1
         c1.close();
         Thread.sleep(pauseTime);
-        c1 = pulsarClient.newConsumer(Schema.INT32)
+        c1 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .consumerName("c1")
                 .subscriptionName(subscriptionName)
@@ -2438,7 +2440,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         // close and reconnect c3
         c3.close();
         Thread.sleep(pauseTime);
-        c3 = pulsarClient.newConsumer(Schema.INT32)
+        c3 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .consumerName("c3")
                 .subscriptionName(subscriptionName)
@@ -2450,6 +2452,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
                 + dispatcher.getNumberOfMessagesInReplay());

         logTopicStats(topic);
+        log.info("c2 incomingMessages: {}", c2.incomingMessages.toList().stream().map(Message::getMessageId).toList());

         // produce more messages
         for (int i = 1000; i < 2000; i++) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 9748a42f0c..a5d5cd7772 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -87,7 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final ExecutorService externalPinnedExecutor;
     protected final ExecutorService internalPinnedExecutor;
     protected UnAckedMessageTracker unAckedMessageTracker;
-    final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
+    public final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
     protected final int maxReceiverQueueSize;

I then ran the test, and it failed as follows.

2024-09-17T18:15:31,906 - INFO  - [TestNG-method=testOrderingAfterReconnects-1:KeySharedSubscriptionTest] - c1 incomingMessages: [8:5:-1, 8:6:-1, 8:7:-1, 8:11:-1, 8:13:-1, 8:17:-1, 8:500:-1, 8:511:-1, 8:549:-1, 8:559:-1]
2024-09-17T18:15:31,906 - INFO  - [TestNG-method=testOrderingAfterReconnects-1:KeySharedSubscriptionTest] - c3 incomingMessages: [8:0:-1, 8:1:-1, 8:2:-1, 8:3:-1, 8:4:-1, 8:8:-1, 8:9:-1, 8:10:-1, 8:12:-1, 8:14:-1]
...
2024-09-17T18:15:32,186 - INFO  - [TestNG-method=testOrderingAfterReconnects-1:MockedPulsarServiceBaseTest] - [testOrderingAfterReconnects-40ecbc0c-7f8f-4973-9513-c3bcadd0fb5b] stats: {
...
      "consumersAfterMarkDeletePosition" : {
        "consumerName=c2, consumerId=11, address=/127.0.0.1:63188" : "8:500",
        "consumerName=c1, consumerId=12, address=/127.0.0.1:63188" : "8:500",
        "consumerName=c3, consumerId=13, address=/127.0.0.1:63188" : "8:500"
      },
      "lastSentPosition" : "8:500",
      "individuallySentPositions" : "[(8:510..8:511],(8:548..8:549],(8:558..8:559]]",
...
2024-09-17T18:15:32,192 - INFO  - [TestNG-method=testOrderingAfterReconnects-1:KeySharedSubscriptionTest] - c2 incomingMessages: [8:15:-1, 8:16:-1, 8:18:-1, 8:19:-1, 8:20:-1, 8:21:-1, 8:22:-1, 8:23:-1, 8:24:-1, 8:25:-1]
2024-09-17T18:15:33,860 - ERROR - [broker-test-util-executor-2-1:KeySharedSubscriptionTest] - key: 397 value: 2 prev: 8:21/c2 current: 8:2/c2

When c2 is reconnected, the dispatcher sends replay messages to c2. After c3 is disconnected, 8:2 is resent to c2.
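The ERROR line in the log above reports key 397 receiving position 8:2 after it had already received 8:21. A per-key check of that kind can be sketched in plain Java as follows. This is only an illustration: `KeyOrderChecker` is a hypothetical class, not the assertion used in testOrderingAfterReconnects, and it assumes that positions compare by ledger ID first and entry ID second.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Pulsar or its test suite.
class KeyOrderChecker {
    // Highest position seen per key, stored as {ledgerId, entryId}.
    private final Map<String, long[]> lastSeenByKey = new HashMap<>();
    private final List<String> violations = new ArrayList<>();

    // Records a received message; notes a violation when its position is not
    // strictly after the previous position recorded for the same key.
    void onMessage(String key, long ledgerId, long entryId, String consumerName) {
        long[] prev = lastSeenByKey.get(key);
        if (prev != null && compare(ledgerId, entryId, prev[0], prev[1]) <= 0) {
            violations.add("key: " + key + " prev: " + prev[0] + ":" + prev[1]
                    + " current: " + ledgerId + ":" + entryId + "/" + consumerName);
            return; // keep the highest position as the reference point
        }
        lastSeenByKey.put(key, new long[] {ledgerId, entryId});
    }

    // Assumption: order by ledger ID first, then by entry ID.
    private static int compare(long ledgerA, long entryA, long ledgerB, long entryB) {
        int byLedger = Long.compare(ledgerA, ledgerB);
        return byLedger != 0 ? byLedger : Long.compare(entryA, entryB);
    }

    List<String> violations() {
        return violations;
    }

    public static void main(String[] args) {
        KeyOrderChecker checker = new KeyOrderChecker();
        checker.onMessage("397", 8, 21, "c2");
        checker.onMessage("397", 8, 2, "c2"); // redelivered entry arrives late
        checker.violations().forEach(System.out::println);
    }
}
```

Because the check is strict, a duplicate delivery of the same position is also flagged, which matches the duplicate-message symptom listed in the report.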

(I didn't address redelivered messages in PIP-282 because I think we can't fully guarantee their ordering. For example, the dispatcher can't control client-side redelivery operations, such as unacking x:101 before x:100. Cf. #21953 (comment).)

I have asked some questions in the review linked below to see whether this issue is resolved by PIP-379.
#23309 (review)

@lhotari
Member Author

lhotari commented Sep 19, 2024

Thanks for the careful investigation @equanz. Very helpful!

> When c2 is reconnected, the dispatcher sends replay messages to c2. After c3 is disconnected, 8:2 is resent to c2.
>
> (I didn't address redelivered messages in PIP-282 because I think we can't fully guarantee their ordering. For example, the dispatcher can't control client-side redelivery operations, such as unacking x:101 before x:100. Cf. #21953 (comment).)

There's a past related issue #12885 with PR #12890 in this area.

> I have asked some questions in the review linked below to see whether this issue is resolved by PIP-379. #23309 (review)

I'll try to take this case into account when working on the PIP-379 design and implementation.

@lhotari
Member Author

lhotari commented Oct 3, 2024

This issue is fixed by the PIP-379 implementation, #23352.
