Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix race conditions of LedgerHandle in client #4171

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private enum HandleState {
public static final long INVALID_LEDGER_ID = -0xABCDABCDL;

final Object metadataLock = new Object();
boolean changingEnsemble = false;
volatile boolean changingEnsemble = false;
final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
Queue<PendingAddOp> pendingAddOps;
ExplicitLacFlushPolicy explicitLacFlushPolicy;
Expand All @@ -172,6 +172,7 @@ private enum HandleState {
final Counter lacUpdateHitsCounter;
final Counter lacUpdateMissesCounter;
private final OpStatsLogger clientChannelWriteWaitStats;
private final AtomicBoolean sendAddSuccessCallbacksInProgress = new AtomicBoolean(false);

LedgerHandle(ClientContext clientCtx,
long ledgerId, Versioned<LedgerMetadata> versionedMetadata,
Expand Down Expand Up @@ -557,7 +558,7 @@ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc
pendingAdds = drainPendingAddsAndAdjustLength();

// taking the length must occur after draining, as draining changes the length
lastEntry = lastAddPushed = LedgerHandle.this.lastAddConfirmed;
lastEntry = lastAddPushed = pendingAddsSequenceHead = LedgerHandle.this.lastAddConfirmed;
finalLength = LedgerHandle.this.length;
handleState = HandleState.CLOSED;
}
Expand Down Expand Up @@ -1791,13 +1792,17 @@ void errorOutPendingAdds(int rc) {
}

synchronized List<PendingAddOp> drainPendingAddsAndAdjustLength() {
PendingAddOp pendingAddOp;
List<PendingAddOp> opsDrained = new ArrayList<PendingAddOp>(pendingAddOps.size());
while ((pendingAddOp = pendingAddOps.poll()) != null) {
addToLength(-pendingAddOp.entryLength);
opsDrained.add(pendingAddOp);
// synchronize on pendingAddOps to ensure that sendAddSuccessCallbacks isn't concurrently
// modifying pendingAddOps
synchronized (pendingAddOps) {
PendingAddOp pendingAddOp;
List<PendingAddOp> opsDrained = new ArrayList<PendingAddOp>(pendingAddOps.size());
while ((pendingAddOp = pendingAddOps.poll()) != null) {
addToLength(-pendingAddOp.entryLength);
opsDrained.add(pendingAddOp);
}
return opsDrained;
}
return opsDrained;
}

void errorOutPendingAdds(int rc, List<PendingAddOp> ops) {
Expand All @@ -1806,38 +1811,43 @@ void errorOutPendingAdds(int rc, List<PendingAddOp> ops) {
}
}


void sendAddSuccessCallbacks() {
// Start from the head of the queue and proceed while there are
// entries that have had all their responses come back
PendingAddOp pendingAddOp;

while ((pendingAddOp = pendingAddOps.peek()) != null
&& !changingEnsemble) {
if (!pendingAddOp.completed) {
if (LOG.isDebugEnabled()) {
LOG.debug("pending add not completed: {}", pendingAddOp);
}
return;
}
// Check if it is the next entry in the sequence.
if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId,
pendingAddsSequenceHead + 1);
}
return;
}
if (!sendAddSuccessCallbacksInProgress.compareAndSet(false, true)) {
// another thread is already sending the callbacks
return;
}
try {
// Start from the head of the queue and proceed while there are
// entries that have had all their responses come back

pendingAddOps.remove();
explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
pendingAddsSequenceHead = pendingAddOp.entryId;
if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
this.lastAddConfirmed = pendingAddsSequenceHead;
}
// synchronize on pendingAddOps to ensure that drainPendingAddsAndAdjustLength isn't concurrently
// modifying pendingAddOps
synchronized (pendingAddOps) {
PendingAddOp pendingAddOp;

pendingAddOp.submitCallback(BKException.Code.OK);
}
while ((pendingAddOp = pendingAddOps.peek()) != null
&& !changingEnsemble) {
if (!pendingAddOp.completed) {
if (LOG.isDebugEnabled()) {
LOG.debug("pending add not completed: {}", pendingAddOp);
}
return;
}

pendingAddOps.remove();
explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
pendingAddsSequenceHead = pendingAddOp.entryId;
if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
this.lastAddConfirmed = pendingAddsSequenceHead;
}

pendingAddOp.submitCallback(BKException.Code.OK);
}
}
} finally {
sendAddSuccessCallbacksInProgress.set(false);
}
}

@VisibleForTesting
Expand Down
Loading