Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] Expose asyncBatchReadUnconfirmed to ReadHandle. #4487

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
Expand Down Expand Up @@ -1040,6 +1041,27 @@ public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, lo
return readEntriesInternalAsync(firstEntry, lastEntry, false);
}

@Override
public CompletableFuture<LedgerEntries> batchReadUnconfirmedAsync(long firstEntry, int maxCount, int maxSize) {
CompletableFuture<LedgerEntries> f = new CompletableFuture<>();
asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, new ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
if (rc != Code.OK) {
f.completeExceptionally(BKException.create(rc));
} else {
List<org.apache.bookkeeper.client.api.LedgerEntry> entries = new ArrayList<>(maxCount);
while (seq.hasMoreElements()) {
LedgerEntry entry = seq.nextElement();
entries.add(LedgerEntryImpl.create(entry.ledgerId, entry.entryId, entry.length, entry.data));
}
f.complete(LedgerEntriesImpl.create(entries));
}
}
}, null);
return f;
}

void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb,
Object ctx, boolean isRecoveryRead) {
if (!clientCtx.isClientClosed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package org.apache.bookkeeper.client.api;

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
Expand Down Expand Up @@ -133,6 +134,52 @@ default LedgerEntries readUnconfirmed(long firstEntry, long lastEntry)
BKException.HANDLER);
}

/**
* Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
* <br>This is the same of
* {@link org.apache.bookkeeper.client.LedgerHandle#asyncReadEntries(
* long, long, AsyncCallback.ReadCallback, Object) }
* but it lets the client read without checking the local value of LastAddConfirmed, so that it is possible to
* read entries for which the writer has not received the acknowledge yet. <br>
* For entries which are within the range 0..LastAddConfirmed BookKeeper guarantees that the writer has successfully
* received the acknowledge.<br>
* For entries outside that range it is possible that the writer never received the acknowledge
* and so there is the risk that the reader is seeing entries before the writer and this could result in
* a consistency issue in some cases.<br>
* With this method you can even read entries before the LastAddConfirmed and entries after it with one call,
* the expected consistency will be as described above for each subrange of ids.
*
* @param firstEntry
* id of first entry of sequence
* @param maxCount
* id of last entry of sequence
* @param maxSize
* the total entries size.
*
* @see org.apache.bookkeeper.client.LedgerHandle#asyncReadEntries(long, long, AsyncCallback.ReadCallback, Object)
* @see org.apache.bookkeeper.client.LedgerHandle#asyncReadLastConfirmed(
* AsyncCallback.ReadLastConfirmedCallback, Object)
* @see org.apache.bookkeeper.client.LedgerHandle#readUnconfirmedEntries(long, long)
*/
CompletableFuture<LedgerEntries> batchReadUnconfirmedAsync(long firstEntry, int maxCount, int maxSize);

/**
* Read a sequence of entries synchronously.
*
* @param firstEntry
* id of first entry of sequence
* @param maxCount
* id of last entry of sequence
* @param maxSize
* the total entries size.
*
* @see #batchReadUnconfirmedAsync(long, int, int)
*/
default LedgerEntries batchReadUnconfirmed(long firstEntry, int maxCount, int maxSize)
throws BKException, InterruptedException {
return FutureUtils.result(batchReadUnconfirmedAsync(firstEntry, maxCount, maxSize), BKException.HANDLER);
}

/**
* Obtains asynchronously the last confirmed write from a quorum of bookies. This
* call obtains the last add confirmed each bookie has received for this ledger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,11 @@ public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, lo
return readHandle.readUnconfirmedAsync(firstEntry, lastEntry);
}

@Override
public CompletableFuture<LedgerEntries> batchReadUnconfirmedAsync(long firstEntry, int maxCount, int maxSize) {
return readHandle.batchReadUnconfirmedAsync(firstEntry, maxCount, maxSize);
}

@Override
public CompletableFuture<Long> readLastAddConfirmedAsync() {
return readHandle.readLastAddConfirmedAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.commons.lang3.mutable.MutableInt;


/**
Expand Down Expand Up @@ -82,11 +83,56 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr

}

@Override
public CompletableFuture<LedgerEntries> batchReadAsync(long firstEntry, int maxCount, long _maxSize) {
final int maxFrameSize = 5 * 1024 * 1024;
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
if (bk.isStopped()) {
promise.completeExceptionally(new BKException.BKClientClosedException());
return promise;
}

long maxSize = _maxSize > 0 ? _maxSize : maxFrameSize;
long lastEntry = Math.min(firstEntry + maxCount - 1, getLastAddConfirmed());
MutableInt size = new MutableInt(0);
bk.orderedExecutor.chooseThread().execute(() -> {
if (bk.getProgrammedFailStatus()) {
promise.completeExceptionally(BKException.create(bk.failReturnCode));
return;
} else if (bk.isStopped()) {
promise.completeExceptionally(new BKException.BKClientClosedException());
return;
}

if (log.isDebugEnabled()) {
log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
}
List<LedgerEntry> seq = new ArrayList<>();
long entryId = firstEntry;
while (entryId <= lastEntry && entryId < entries.size()) {
if (size.addAndGet(entries.get((int) entryId).getLength()) > maxSize) {
break;
}
seq.add(entries.get((int) entryId++).duplicate());
}
if (log.isDebugEnabled()) {
log.debug("Entries read: {}", seq);
}
promise.complete(LedgerEntriesImpl.create(seq));
});
return promise;
}

@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return readAsync(firstEntry, lastEntry);
}

@Override
public CompletableFuture<LedgerEntries> batchReadUnconfirmedAsync(long firstEntry, int maxCount, int maxSize) {
return batchReadAsync(firstEntry, maxCount, maxSize);
}

@Override
public CompletableFuture<Long> readLastAddConfirmedAsync() {
return CompletableFuture.completedFuture(getLastAddConfirmed());
Expand Down
Loading