[Bug] Non durable subscription backlog is wrong after topic unload #23239
Comments
Setting:
Fixes the problem. So the problem may lie either in rewinding the NonDurableCursor or in the backlog calculation (when the cursor is set to the next, not-yet-existing message, which is allowed). |
With this code, shouldn't the last message be re-delivered to the client after the subscription is created? So now I see two issues:
Keep in mind that the incorrect backlog may have implications:
|
I've added a PR with the failing test. The summary is at the bottom:
// It's OK to have a backlog, to re-consume a batch message
assertEquals(sub.get().getMsgBacklog(), 1);
// However the client should be able to receive the message
Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
consumer.acknowledge(msg);
// And the backlog should become empty
Awaitility.await().until(
        () -> admin.topics().getStats(topic).getSubscriptions().get(subName)
                .getMsgBacklog() == 0);
|
@michalcukierman ConsumerImpl#messageReceived:
if (this.topicName.isPersistent() && isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) {
    // We need to discard entries that were prior to startMessageId
    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
                consumerName, startMessageId);
    }
    uncompressedPayload.release();
    return;
}
But the MsgBacklog is always incorrect. For a NonDurable subscription we don't save the cursor info, so I still don't know how to solve this problem. |
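To make the interaction easier to follow, here is a minimal, self-contained sketch of how a client-side discard like the one quoted above could leave a backlog of 1. It is a simplified model, not Pulsar code: `Position`, `shouldDiscard`, and `backlogAfterDelivery` are hypothetical names, and the assumption that a discarded message is never acknowledged is my reading of this thread.

```java
import java.util.List;

public class DiscardSketch {
    // Hypothetical stand-in for a (ledgerId, entryId) message position.
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Same shape as the quoted check: the message sits in the same entry as the
    // start position and counts as "prior" to it. With an exclusive start
    // position, the start entry itself is treated as prior.
    static boolean shouldDiscard(Position msg, Position start, boolean startInclusive) {
        boolean sameEntry = msg.ledgerId == start.ledgerId && msg.entryId == start.entryId;
        boolean prior = startInclusive ? msg.entryId < start.entryId : msg.entryId <= start.entryId;
        return sameEntry && prior;
    }

    // Assumption: every delivered message counts towards the broker-side backlog
    // until it is acknowledged, and a discarded message is never acknowledged.
    static int backlogAfterDelivery(List<Position> delivered, Position start, boolean startInclusive) {
        int backlog = delivered.size();
        for (Position p : delivered) {
            if (!shouldDiscard(p, start, startInclusive)) {
                backlog--; // the application receives and acknowledges it
            }
        }
        return backlog;
    }

    public static void main(String[] args) {
        Position last = new Position(5, 9);
        // The cursor is rewound to the last entry, which is then re-delivered.
        System.out.println(backlogAfterDelivery(List.of(last), last, false)); // prints 1
        System.out.println(backlogAfterDelivery(List.of(last), last, true));  // prints 0
    }
}
```

In this model the broker counts the re-delivered entry, the client drops it silently, and nothing ever acknowledges it, which matches the stuck `msgBacklog` of 1 reported in this issue.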
I think without understanding why the rewind/skip/batch logic was introduced, it's hard to introduce a solution. |
@summeriiii I was not able to provide a solution yesterday (I had false-positive tests on my local env, working with simple cases, failing with others). I've noticed that there were two changes in 2020:
That's why I tried to modify ServerCnx so that it does not produce a BatchMessageIdImpl when no batch index is explicitly given:
final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
        subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
        subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
        : null;
but unfortunately I got failing tests with:
Another alternative would be to fix the backlog after the client has ignored the message, but I think the issue requires attention from someone who knows the codebase better than I do. |
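For illustration only, the idea described above (build a batch-aware ID only when a batch index was actually supplied) might look roughly like the sketch below. `PlainId`, `BatchId`, and `toStartId` are hypothetical stand-ins, not the Pulsar classes, and treating a missing or negative batch index as "not given" is an assumption. As noted above, the real attempt led to failing tests, so this shows the shape of the idea rather than a fix.

```java
public class StartIdSketch {
    // Hypothetical stand-in for a non-batched message ID.
    static class PlainId {
        final long ledgerId;
        final long entryId;
        final int partition;

        PlainId(long ledgerId, long entryId, int partition) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partition = partition;
        }
    }

    // Hypothetical stand-in for a message ID that also carries a batch index.
    static class BatchId extends PlainId {
        final int batchIndex;

        BatchId(long ledgerId, long entryId, int partition, int batchIndex) {
            super(ledgerId, entryId, partition);
            this.batchIndex = batchIndex;
        }
    }

    // Assumption: a null or negative batch index means "no batch index given".
    static PlainId toStartId(long ledgerId, long entryId, int partition, Integer batchIndex) {
        if (batchIndex == null || batchIndex < 0) {
            return new PlainId(ledgerId, entryId, partition);
        }
        return new BatchId(ledgerId, entryId, partition, batchIndex);
    }

    public static void main(String[] args) {
        System.out.println(toStartId(5, 9, 0, null) instanceof BatchId); // prints false
        System.out.println(toStartId(5, 9, 0, 2) instanceof BatchId);    // prints true
    }
}
```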
Search before asking
Read release policy
Version
3.3.1
3.0.5
Minimal reproduce step
What did you expect to see?
The cursor should be set to the end of the topic, backlog should be 0.
What did you see instead?
Backlog is not empty:
Before unloading
"msgBacklog" : 0,
"msgBacklogNoDelayed" : 0,
After unloading
"msgBacklog" : 1,
"msgBacklogNoDelayed" : 1,
Anything else?
Are you willing to submit a PR?