Skip to content

Commit

Permalink
Fix race condition in LedgerHandle in drainPendingAddsAndAdjustLength…
Browse files Browse the repository at this point in the history
… & sendAddSuccessCallbacks
  • Loading branch information
graysonzeng committed Jan 17, 2024
1 parent 54168b5 commit da3802a
Showing 1 changed file with 46 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private enum HandleState {
public static final long INVALID_LEDGER_ID = -0xABCDABCDL;

final Object metadataLock = new Object();
boolean changingEnsemble = false;
volatile boolean changingEnsemble = false;
final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
Queue<PendingAddOp> pendingAddOps;
ExplicitLacFlushPolicy explicitLacFlushPolicy;
Expand All @@ -172,6 +172,7 @@ private enum HandleState {
final Counter lacUpdateHitsCounter;
final Counter lacUpdateMissesCounter;
private final OpStatsLogger clientChannelWriteWaitStats;
private final AtomicBoolean sendAddSuccessCallbacksInProgress = new AtomicBoolean(false);

LedgerHandle(ClientContext clientCtx,
long ledgerId, Versioned<LedgerMetadata> versionedMetadata,
Expand Down Expand Up @@ -557,7 +558,7 @@ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc
pendingAdds = drainPendingAddsAndAdjustLength();

// taking the length must occur after draining, as draining changes the length
lastEntry = lastAddPushed = LedgerHandle.this.lastAddConfirmed;
lastEntry = lastAddPushed = pendingAddsSequenceHead = LedgerHandle.this.lastAddConfirmed;
finalLength = LedgerHandle.this.length;
handleState = HandleState.CLOSED;
}
Expand Down Expand Up @@ -1791,13 +1792,17 @@ void errorOutPendingAdds(int rc) {
}

synchronized List<PendingAddOp> drainPendingAddsAndAdjustLength() {
PendingAddOp pendingAddOp;
List<PendingAddOp> opsDrained = new ArrayList<PendingAddOp>(pendingAddOps.size());
while ((pendingAddOp = pendingAddOps.poll()) != null) {
addToLength(-pendingAddOp.entryLength);
opsDrained.add(pendingAddOp);
// synchronize on pendingAddOps to ensure that sendAddSuccessCallbacks isn't concurrently
// modifying pendingAddOps
synchronized (pendingAddOps) {
PendingAddOp pendingAddOp;
List<PendingAddOp> opsDrained = new ArrayList<PendingAddOp>(pendingAddOps.size());
while ((pendingAddOp = pendingAddOps.poll()) != null) {
addToLength(-pendingAddOp.entryLength);
opsDrained.add(pendingAddOp);
}
return opsDrained;
}
return opsDrained;
}

void errorOutPendingAdds(int rc, List<PendingAddOp> ops) {
Expand All @@ -1806,38 +1811,43 @@ void errorOutPendingAdds(int rc, List<PendingAddOp> ops) {
}
}


void sendAddSuccessCallbacks() {
// Start from the head of the queue and proceed while there are
// entries that have had all their responses come back
PendingAddOp pendingAddOp;

while ((pendingAddOp = pendingAddOps.peek()) != null
&& !changingEnsemble) {
if (!pendingAddOp.completed) {
if (LOG.isDebugEnabled()) {
LOG.debug("pending add not completed: {}", pendingAddOp);
}
return;
}
// Check if it is the next entry in the sequence.
if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId,
pendingAddsSequenceHead + 1);
}
return;
}
if (!sendAddSuccessCallbacksInProgress.compareAndSet(false, true)) {
// another thread is already sending the callbacks
return;
}
try {
// Start from the head of the queue and proceed while there are
// entries that have had all their responses come back

pendingAddOps.remove();
explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
pendingAddsSequenceHead = pendingAddOp.entryId;
if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
this.lastAddConfirmed = pendingAddsSequenceHead;
}
// synchronize on pendingAddOps to ensure that drainPendingAddsAndAdjustLength isn't concurrently
// modifying pendingAddOps
synchronized (pendingAddOps) {
PendingAddOp pendingAddOp;

pendingAddOp.submitCallback(BKException.Code.OK);
}
while ((pendingAddOp = pendingAddOps.peek()) != null
&& !changingEnsemble) {
if (!pendingAddOp.completed) {
if (LOG.isDebugEnabled()) {
LOG.debug("pending add not completed: {}", pendingAddOp);
}
return;
}

pendingAddOps.remove();
explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
pendingAddsSequenceHead = pendingAddOp.entryId;
if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
this.lastAddConfirmed = pendingAddsSequenceHead;
}

pendingAddOp.submitCallback(BKException.Code.OK);
}
}
} finally {
sendAddSuccessCallbacksInProgress.set(false);
}
}

@VisibleForTesting
Expand Down

0 comments on commit da3802a

Please sign in to comment.