Skip to content

Commit

Permalink
fix_topic_unavailable_error
Browse files Browse the repository at this point in the history
  • Loading branch information
graysonzeng committed Jan 5, 2024
1 parent f88f437 commit 4a4d1b2
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ private enum HandleState {

ScheduledFuture<?> timeoutFuture = null;

private AtomicBoolean sendingCallbacks = new AtomicBoolean(false);

@VisibleForTesting
final Map<Integer, BookieId> delayedWriteFailedBookies =
new HashMap<Integer, BookieId>();
Expand Down Expand Up @@ -1811,12 +1813,17 @@ void sendAddSuccessCallbacks() {
// entries that have had all their responses come back
PendingAddOp pendingAddOp;

if (!sendingCallbacks.compareAndSet(false, true)) {
return;
}

while ((pendingAddOp = pendingAddOps.peek()) != null
&& !changingEnsemble) {
if (!pendingAddOp.completed) {
if (LOG.isDebugEnabled()) {
LOG.debug("pending add not completed: {}", pendingAddOp);
}
sendingCallbacks.set(false);
return;
}
// Check if it is the next entry in the sequence.
Expand All @@ -1825,6 +1832,7 @@ void sendAddSuccessCallbacks() {
LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId,
pendingAddsSequenceHead + 1);
}
sendingCallbacks.set(false);
return;
}

Expand All @@ -1838,6 +1846,7 @@ void sendAddSuccessCallbacks() {
pendingAddOp.submitCallback(BKException.Code.OK);
}

sendingCallbacks.set(false);
}

@VisibleForTesting
Expand Down

0 comments on commit 4a4d1b2

Please sign in to comment.