From da3802a3852c1c723ba68bbb71886bf5951ad7ec Mon Sep 17 00:00:00 2001 From: graysonzeng Date: Wed, 17 Jan 2024 17:36:28 +0800 Subject: [PATCH] Fix race condition in LedgerHandle in drainPendingAddsAndAdjustLength & sendAddSuccessCallbacks --- .../bookkeeper/client/LedgerHandle.java | 82 +++++++++++-------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 9486b2e632c..5bbd396fec5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -163,7 +163,7 @@ private enum HandleState { public static final long INVALID_LEDGER_ID = -0xABCDABCDL; final Object metadataLock = new Object(); - boolean changingEnsemble = false; + volatile boolean changingEnsemble = false; final AtomicInteger numEnsembleChanges = new AtomicInteger(0); Queue pendingAddOps; ExplicitLacFlushPolicy explicitLacFlushPolicy; @@ -172,6 +172,7 @@ private enum HandleState { final Counter lacUpdateHitsCounter; final Counter lacUpdateMissesCounter; private final OpStatsLogger clientChannelWriteWaitStats; + private final AtomicBoolean sendAddSuccessCallbacksInProgress = new AtomicBoolean(false); LedgerHandle(ClientContext clientCtx, long ledgerId, Versioned versionedMetadata, @@ -557,7 +558,7 @@ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc pendingAdds = drainPendingAddsAndAdjustLength(); // taking the length must occur after draining, as draining changes the length - lastEntry = lastAddPushed = LedgerHandle.this.lastAddConfirmed; + lastEntry = lastAddPushed = pendingAddsSequenceHead = LedgerHandle.this.lastAddConfirmed; finalLength = LedgerHandle.this.length; handleState = HandleState.CLOSED; } @@ -1791,13 +1792,17 @@ void errorOutPendingAdds(int rc) { } synchronized List drainPendingAddsAndAdjustLength() { - PendingAddOp pendingAddOp; - List opsDrained = new ArrayList(pendingAddOps.size()); - while ((pendingAddOp = pendingAddOps.poll()) != null) { - addToLength(-pendingAddOp.entryLength); - opsDrained.add(pendingAddOp); + // synchronize on pendingAddOps to ensure that sendAddSuccessCallbacks isn't concurrently + // modifying pendingAddOps + synchronized (pendingAddOps) { + PendingAddOp pendingAddOp; + List opsDrained = new ArrayList(pendingAddOps.size()); + while ((pendingAddOp = pendingAddOps.poll()) != null) { + addToLength(-pendingAddOp.entryLength); + opsDrained.add(pendingAddOp); + } + return opsDrained; } - return opsDrained; } void errorOutPendingAdds(int rc, List ops) { @@ -1806,38 +1811,43 @@ void errorOutPendingAdds(int rc, List ops) { } } + void sendAddSuccessCallbacks() { - // Start from the head of the queue and proceed while there are - // entries that have had all their responses come back - PendingAddOp pendingAddOp; - - while ((pendingAddOp = pendingAddOps.peek()) != null - && !changingEnsemble) { - if (!pendingAddOp.completed) { - if (LOG.isDebugEnabled()) { - LOG.debug("pending add not completed: {}", pendingAddOp); - } - return; - } - // Check if it is the next entry in the sequence. - if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) { - if (LOG.isDebugEnabled()) { - LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId, - pendingAddsSequenceHead + 1); - } - return; - } + if (!sendAddSuccessCallbacksInProgress.compareAndSet(false, true)) { + // another thread is already sending the callbacks + return; + } + try { + // Start from the head of the queue and proceed while there are + // entries that have had all their responses come back - pendingAddOps.remove(); - explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed); - pendingAddsSequenceHead = pendingAddOp.entryId; - if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) { - this.lastAddConfirmed = pendingAddsSequenceHead; - } + // synchronize on pendingAddOps to ensure that drainPendingAddsAndAdjustLength isn't concurrently + // modifying pendingAddOps + synchronized (pendingAddOps) { + PendingAddOp pendingAddOp; - pendingAddOp.submitCallback(BKException.Code.OK); - } + while ((pendingAddOp = pendingAddOps.peek()) != null + && !changingEnsemble) { + if (!pendingAddOp.completed) { + if (LOG.isDebugEnabled()) { + LOG.debug("pending add not completed: {}", pendingAddOp); + } + return; + } + pendingAddOps.remove(); + explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed); + pendingAddsSequenceHead = pendingAddOp.entryId; + if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) { + this.lastAddConfirmed = pendingAddsSequenceHead; + } + + pendingAddOp.submitCallback(BKException.Code.OK); + } + } + } finally { + sendAddSuccessCallbacksInProgress.set(false); + } } @VisibleForTesting