-
Notifications
You must be signed in to change notification settings - Fork 903
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] Fix race conditions of LedgerHandle in client #4171
base: master
Are you sure you want to change the base?
Conversation
It looks like all calls happen under the same object monitor lock (synchronized for PendingAddOp), therefore it seems that concurrent calls are already prevented. @graysonzeng Could you please check if this is the case? |
At first time I was thinking the same doubt as you, it didn't seem like it should be happening. But in fact, these two threads can call two different |
Thanks for explaining that @graysonzeng. Makes sense. Just wondering if the logic really works correctly without making the sendAddSuccessCallbacks method synchronized. For example, on line 1831, the call to |
@graysonzeng Another thread safety problem is the bookkeeper/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Lines 1895 to 1908 in 13e7efa
should we make |
When I first repaired it, I tried to use synchronized on sendAddSuccessCallbacks, and a deadlock occurred. "BookKeeperClientWorker-OrderedExecutor-8-0":
at org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1811)
- waiting to lock <0x00000000bfae1120> (a org.apache.bookkeeper.client.LedgerHandle)
at org.apache.bookkeeper.client.PendingAddOp.unsetSuccessAndSendWriteRequest(PendingAddOp.java:204)
- locked <0x00000000c3946f68> (a org.apache.bookkeeper.client.PendingAddOp)
at org.apache.bookkeeper.client.LedgerHandle.unsetSuccessAndSendWriteRequest(LedgerHandle.java:2006)
at org.apache.bookkeeper.client.LedgerHandle.lambda$ensembleChangeLoop$10(LedgerHandle.java:1997)
at org.apache.bookkeeper.client.LedgerHandle$$Lambda$1586/0x00007f8e4c95ac58.accept(Unknown Source)
at java.util.concurrent.CompletableFuture.uniWhenComplete(java.base@17.0.8/CompletableFuture.java:863)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base@17.0.8/CompletableFuture.java:841)
at java.util.concurrent.CompletableFuture$Completion.run(java.base@17.0.8/CompletableFuture.java:482)
at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137)
at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)
"pulsar-io-11-12":
at org.apache.bookkeeper.client.PendingAddOp.submitCallback(PendingAddOp.java:394)
- waiting to lock <0x00000000c3946f68> (a org.apache.bookkeeper.client.PendingAddOp)
at org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1835)
- locked <0x00000000bfae1120> (a org.apache.bookkeeper.client.LedgerHandle)
at org.apache.bookkeeper.client.PendingAddOp.sendAddSuccessCallbacks(PendingAddOp.java:390)
at org.apache.bookkeeper.client.PendingAddOp.writeComplete(PendingAddOp.java:307)
- locked <0x00000000c3945640> (a org.apache.bookkeeper.client.PendingAddOp)
at org.apache.bookkeeper.proto.BookieClientImpl.completeAdd(BookieClientImpl.java:284)
at org.apache.bookkeeper.proto.BookieClientImpl.access$000(BookieClientImpl.java:78)
at org.apache.bookkeeper.proto.BookieClientImpl$ChannelReadyForAddEntryCallback.operationComplete(BookieClientImpl.java:396)
at org.apache.bookkeeper.proto.BookieClientImpl$ChannelReadyForAddEntryCallback.operationComplete(BookieClientImpl.java:356)
at org.apache.bookkeeper.proto.PerChannelBookieClient$ConnectionFutureListener.operationComplete(PerChannelBookieClient.java:2548)
at org.apache.bookkeeper.proto.PerChannelBookieClient$ConnectionFutureListener.operationComplete(PerChannelBookieClient.java:2453)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
at io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:110)
at io.netty.channel.DefaultChannelPromise.setFailure(DefaultChannelPromise.java:89)
at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:228)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:189)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:175)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@17.0.8/Thread.java:833) Therefore I want to use sendingCallbacks instead of synchronized
Thanks for reminding. I replaced pendingAddOps.remove() with pendingAddOps.remove(pendingAddOp) to ensure that the same element is processed.
Great suggestion, this will ensure the visibility of changingEmsemble. I've added it |
Makes sense. The only concern is about correctness. If the method is already processing while it gets called another time, is it necessary to execute the loop one extra time to ensure that something doesn't get left unprocessed? |
Btw. The thread execution model for LedgerHandle isn't consistent. In some cases, the thread is switched to use the ordered executor and sometimes it's not. For example, here the thread is switched: bookkeeper/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Lines 1876 to 1877 in 13e7efa
|
This observation isn't about this PR directly. I have so many doubts of the correctness of sendAddSuccessCallbacks when it's not executed by the ordered executor. Talking about the current code for bookkeeper/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Lines 1809 to 1841 in 13e7efa
If the order gets mixed, it seems that no progress could be made? bookkeeper/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Lines 1822 to 1829 in 13e7efa
|
The logic in |
The |
@graysonzeng I added another comment #4171 (comment) where I explain the problem. I think that this line should be changed to use the ordered executor: bookkeeper/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java Line 204 in 234b817
Changing Line 204 to
And then sendAddSuccessCallbacks could be made a synchronized method. That would prevent the dead lock seen in #4171 (comment) . Makes sense? |
It's possible that it doesn't solve the problem. Most likely it solves the dead lock when the synchronized method gets called without holding any other locks. |
@graysonzeng I have a question about the problem description:
Why does it exactly cause a problem when the method is called multiple times at the same time? |
I agree with your comment. In fact, this comment exactly answers the reason for the problem. when the method is called multiple times at the same time,missed or out of order will happen in pendingAddOp. In a ledgerHanle instance, we can see the peek pendingAddOp |
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
Outdated
Show resolved
Hide resolved
@graysonzeng After spending more time with this issue, here are my suggestions to fix the issue:
|
The
+1 for @eolivelli 's suggestion. You can add a “safeSendAddSuccessCallbacks” that calls sendAddSuccessCallbacks in the OrderedExecutor
After we make
IMO, the @graysonzeng We'd better add a unit test to verify and protect the logic. |
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
Outdated
Show resolved
Hide resolved
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
Outdated
Show resolved
Hide resolved
@hangc0276 |
@hangc0276 True, but that would be a significant change that could at least cause performance regressions. The Bookkeeper client code doesn't seem to use the OrderedExecutor in most cases. The current code base is using synchronized in this area. After thinking about it, it might not be necessary to make the method itself
The issue in drainPendingAddsAndAdjustLength isn't an ordering problem. The method simply doesn't update pendingAddsSequenceHead so that sendAddSuccessCallbacks could work after drainPendingAddsAndAdjustLength has been called. The instance will get stuck when pendingAddsSequenceHead gets out of sync in drainPendingAddsAndAdjustLength. My current assumption is that it's the root cause of this issue together with the changingEnsemble thread visibility issue. |
I created an alternative fix #4175 to bring clarity of what I'd be proposing. Once we have a reproducer for the issue in BK as a unit test, it will be easier to validate. |
@lhotari @hangc0276 @eolivelli Thank you very much for your suggestions, I'll be happy to continue improving it.
After that, we don't need to use OrderedExecutor. Is it right? @hangc0276
I'm a little confused about this . Can you tell it more about it? @lhotari |
@graysonzeng I don't think that using OrderedExecutor is justified since callback responses have never been executed by the OrderedExecutor. All sendAddSuccessCallbacks & drainPendingAddsAndAdjustLength calls should be serialized for correctness and that's what
@graysonzeng bookkeeper/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Lines 1822 to 1829 in 13e7efa
My assumption is that this doesn't hold under failure conditions. Why would there even need to be such a rule? When this rule kicks in, the LedgerHandle will stop calling callbacks. That doesn't make any sense to me. Here's a sign that the entry id isn't always a continuous sequence: bookkeeper/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Line 1372 in 13e7efa
When the LAC jumps forward and I wonder if I'm making correct observations about the code. @eolivelli could you please explain how the above condition is handled? |
Thanks for your reply @lhotari, I have some test for #4175 , Unfortunately it will also cause deadlock. "pulsar-io-3-4":
at org.apache.bookkeeper.client.PendingAddOp.submitCallback(PendingAddOp.java:394)
- waiting to lock <0x000000077f2becf0> (a org.apache.bookkeeper.client.PendingAddOp)
at org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1838)
- locked <0x000000077e6c0d58> (a java.util.concurrent.ConcurrentLinkedQueue)
at org.apache.bookkeeper.client.PendingAddOp.sendAddSuccessCallbacks(PendingAddOp.java:390)
at org.apache.bookkeeper.client.PendingAddOp.writeComplete(PendingAddOp.java:307)
- locked <0x000000077f2bae98> (a org.apache.bookkeeper.client.PendingAddOp)
at org.apache.bookkeeper.proto.BookieClientImpl.completeAdd(BookieClientImpl.java:284)
at org.apache.bookkeeper.proto.BookieClientImpl.access$000(BookieClientImpl.java:78)
at org.apache.bookkeeper.proto.BookieClientImpl$ChannelReadyForAddEntryCallback.operationComplete(BookieClientImpl.java:396)
at org.apache.bookkeeper.proto.BookieClientImpl$ChannelReadyForAddEntryCallback.operationComplete(BookieClientImpl.java:356)
at org.apache.bookkeeper.proto.PerChannelBookieClient$ConnectionFutureListener.operationComplete(PerChannelBookieClient.java:2548)
at org.apache.bookkeeper.proto.PerChannelBookieClient$ConnectionFutureListener.operationComplete(PerChannelBookieClient.java:2453)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
at io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:110)
at io.netty.channel.DefaultChannelPromise.setFailure(DefaultChannelPromise.java:89)
at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:228)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:47)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:189)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:175)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)
"BookKeeperClientWorker-OrderedExecutor-7-0":
at org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1819)
- waiting to lock <0x000000077e6c0d58> (a java.util.concurrent.ConcurrentLinkedQueue)
at org.apache.bookkeeper.client.PendingAddOp.sendAddSuccessCallbacks(PendingAddOp.java:390)
at org.apache.bookkeeper.client.PendingAddOp.writeComplete(PendingAddOp.java:307)
- locked <0x000000077f2becf0> (a org.apache.bookkeeper.client.PendingAddOp)
at org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.writeComplete(PerChannelBookieClient.java:2183)
at org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleResponse(PerChannelBookieClient.java:2240)
at org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleV2Response(PerChannelBookieClient.java:2219)
at org.apache.bookkeeper.proto.PerChannelBookieClient$ReadV2ResponseCallback.run(PerChannelBookieClient.java:1397)
at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137)
at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:113)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)
Found 1 deadlock. From the stack we can see that this happens within sendAddSuccessCallbacks
thread A -> pendingAddOp lock A (locked) -> LedgerHandle lock L (locked) -> pendingAddOp lock B (waiting) Here is the entire stack file
Suppose we are not willing to use OrderedExecutor because of performance regressions, are there any other better suggestions? i'm looking forward to it @lhotari |
Thanks for testing this @graysonzeng . It seems that one possibility to prevent the deadlock would be to use the solution that you had proposed initially, the AtomicBoolean solution. The |
… & sendAddSuccessCallbacks
7cdab79
to
da3802a
Compare
Thanks for your help @lhotari ,I have updated the PR and test it again. |
@graysonzeng Does the current solution in this PR pass your tests? |
Client's BookieWriteLedgerTest and DeferredSyncTest are failing. It is always possible that those tests are invalid. The logic adding for deferred sync doesn't make sense to me. bookkeeper/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Lines 1822 to 1829 in 13e7efa
More comments about this in #4171 (comment) . I hope @eolivelli would have the chance to chime in again. |
it is pass. @lhotari
I'll fix it if needed |
@graysonzeng is this related to #4194 / #4097 ? |
@graysonzeng Thanks for your contribution, CI is not passed yet, any updates? |
@shoothzj Sorry for late reply, I think the current fix may still be controversial. Maybe it would be better to improve the PR after #4194 is approved? What do you think? |
Master Issue:
apache/pulsar#21860
Motivation
When bookies do a rolling restart, pulsar topic fence state may be triggered due to race conditions. And it can not recover.
After check the heap dump of the broker, we can see the pendingWriteOps is 161, this is the reason why the topic can not recover from the fenced state.
The topic will only change to unfenced when pendingWriteOps is reduced to 0. See unfenced condition
After a deep investigation, we found the cause of the error。The root cause due to
sendAddSuccessCallbacks
may be multiple called at the same time. One is thatunsetSuccessAndSendWriteRequest
is called by the BookKeeperClientWorker-OrderedExecutor thread, and the other iswriteComplete
in pulsar-io thread. We should prevent sendAddSuccessCallbacks from being called again before it completes.Changes
Add a boolean value
sendingCallbacks
to indicate whethersendAddSuccessCallbacks
is being called.If we find that method
sendAddSuccessCallbacks
is being called when we try to call it, return directly.Not recommended changes
If we add
synchronized
to thesendAddSuccessCallbacks
, it will impact the performance and may lead to deadlock.