diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index 877a3ac300a..6b439b0960d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -29,7 +29,9 @@ import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY; import com.google.common.util.concurrent.RateLimiter; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; import java.util.Enumeration; import java.util.HashSet; import java.util.Iterator; @@ -403,17 +405,24 @@ public void readComplete(int rc, LedgerHandle lh, numEntriesRead.inc(); numBytesRead.registerSuccessfulValue(dataLength); - ByteBufList toSend = lh.getDigestManager() + ReferenceCounted toSend = lh.getDigestManager() .computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), entry.getLength(), - Unpooled.wrappedBuffer(data, 0, data.length)); + Unpooled.wrappedBuffer(data, 0, data.length), + lh.getLedgerKey(), + 0 + ); if (replicationThrottle != null) { - updateAverageEntrySize(toSend.readableBytes()); + if (toSend instanceof ByteBuf) { + updateAverageEntrySize(((ByteBuf) toSend).readableBytes()); + } else if (toSend instanceof ByteBufList) { + updateAverageEntrySize(((ByteBufList) toSend).readableBytes()); + } } for (BookieId newBookie : newBookies) { long startWriteEntryTime = MathUtils.nowInNano(); bkc.getBookieClient().addEntry(newBookie, lh.getId(), - lh.getLedgerKey(), entryId, ByteBufList.clone(toSend), + lh.getLedgerKey(), entryId, toSend, multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD, false, WriteFlag.NONE); writeDataLatency.registerSuccessfulEvent( diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index 394c961cbc9..51f559a86cc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -28,6 +28,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -38,7 +39,6 @@ import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; -import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +56,7 @@ class PendingAddOp implements WriteCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class); ByteBuf payload; - ByteBufList toSend; + ReferenceCounted toSend; AddCallbackWithLatency cb; Object ctx; long entryId; @@ -242,9 +242,10 @@ public synchronized void initiate() { checkNotNull(lh); checkNotNull(lh.macManager); + int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE; this.toSend = lh.macManager.computeDigestAndPackageForSending( entryId, lh.lastAddConfirmed, currentLedgerLength, - payload); + payload, lh.ledgerKey, flags); // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending payload = null; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index ba5dc9948dc..f923b61ad50 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME; import com.google.protobuf.ByteString; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; @@ -39,6 +40,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; +import org.apache.bookkeeper.util.ByteBufList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -358,8 +360,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } else { waitingForAuth.add(msg); } + } else if (msg instanceof ByteBuf || msg instanceof ByteBufList) { + waitingForAuth.add(msg); } else { - LOG.info("dropping write of message {}", msg); + LOG.info("[{}] dropping write of message {}", ctx.channel(), msg); } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 81be386f7ca..938874fac04 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -20,6 +20,7 @@ */ package org.apache.bookkeeper.proto; +import io.netty.util.ReferenceCounted; import java.util.EnumSet; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -139,7 +140,7 @@ void writeLac(BookieId address, long ledgerId, byte[] masterKey, * {@link org.apache.bookkeeper.client.api.WriteFlag} */ void addEntry(BookieId address, long ledgerId, byte[] masterKey, - long entryId, ByteBufList toSend, WriteCallback cb, Object ctx, + long entryId, ReferenceCounted toSend, WriteCallback cb, Object ctx, int options, boolean allowFastFail, EnumSet writeFlags); /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index cd11bc17d7a..c305a51ea42 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -32,6 +32,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.util.EnumSet; @@ -288,7 +289,7 @@ public void addEntry(final BookieId addr, final long ledgerId, final byte[] masterKey, final long entryId, - final ByteBufList toSend, + final ReferenceCounted toSend, final WriteCallback cb, final Object ctx, final int options, @@ -357,7 +358,7 @@ private static class ChannelReadyForAddEntryCallback private final Handle recyclerHandle; private BookieClientImpl bookieClient; - private ByteBufList toSend; + private ReferenceCounted toSend; private long ledgerId; private long entryId; private BookieId addr; @@ -369,7 +370,7 @@ private static class ChannelReadyForAddEntryCallback private EnumSet writeFlags; static ChannelReadyForAddEntryCallback create( - BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId, + BookieClientImpl bookieClient, ReferenceCounted toSend, long ledgerId, long entryId, BookieId addr, Object ctx, WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail, EnumSet writeFlags) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 8cf40d2b8e2..c56235dbe67 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -110,23 +110,7 @@ public Object encode(Object msg, ByteBufAllocator allocator) return msg; } BookieProtocol.Request r = (BookieProtocol.Request) msg; - if (r instanceof BookieProtocol.AddRequest) { - BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r; - ByteBufList data = ar.getData(); - - int totalHeaderSize = 4 // for the request header - + BookieProtocol.MASTER_KEY_LENGTH; // for the master key - - int totalPayloadSize = totalHeaderSize + data.readableBytes(); - ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame size */); - buf.writeInt(totalPayloadSize); // Frame header - buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags())); - buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); - - ar.recycle(); - data.prepend(buf); - return data; - } else if (r instanceof BookieProtocol.ReadRequest) { + if (r instanceof BookieProtocol.ReadRequest) { int totalHeaderSize = 4 // for request type + 8 // for ledgerId + 8; // for entryId @@ -437,7 +421,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (LOG.isTraceEnabled()) { LOG.trace("Encode request {} to channel {}.", msg, ctx.channel()); } - if (msg instanceof BookkeeperProtocol.Request) { + if (msg instanceof ByteBuf || msg instanceof ByteBufList) { + ctx.write(msg, promise); + } else if (msg instanceof BookkeeperProtocol.Request) { ctx.write(reqV3.encode(msg, ctx.alloc()), promise); } else if (msg instanceof BookieProtocol.Request) { ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 86c3ed54693..3a27f08a95d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -27,7 +27,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; -import org.apache.bookkeeper.util.ByteBufList; /** * The packets of the Bookie protocol all have a 4-byte integer indicating the @@ -252,58 +251,6 @@ public String toString() { public void recycle() {} } - /** - * A Request that adds data. - */ - class AddRequest extends Request { - ByteBufList data; - - static AddRequest create(byte protocolVersion, long ledgerId, - long entryId, short flags, byte[] masterKey, - ByteBufList data) { - AddRequest add = RECYCLER.get(); - add.protocolVersion = protocolVersion; - add.opCode = ADDENTRY; - add.ledgerId = ledgerId; - add.entryId = entryId; - add.flags = flags; - add.masterKey = masterKey; - add.data = data.retain(); - return add; - } - - ByteBufList getData() { - // We need to have different ByteBufList instances for each bookie write - return ByteBufList.clone(data); - } - - boolean isRecoveryAdd() { - return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD; - } - - private final Handle recyclerHandle; - private AddRequest(Handle recyclerHandle) { - this.recyclerHandle = recyclerHandle; - } - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected AddRequest newObject(Handle handle) { - return new AddRequest(handle); - } - }; - - @Override - public void recycle() { - ledgerId = -1; - entryId = -1; - masterKey = null; - ReferenceCountUtil.release(data); - data = null; - recyclerHandle.recycle(this); - } - } - /** * This is similar to add request, but it used when processing the request on the bookie side. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 4d08faad202..41add30ca56 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -62,6 +62,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; @@ -771,7 +772,7 @@ void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) { * @param writeFlags * WriteFlags */ - void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb, + void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ReferenceCounted toSend, WriteCallback cb, Object ctx, final int options, boolean allowFastFail, final EnumSet writeFlags) { Object request = null; CompletionKey completionKey = null; @@ -782,9 +783,12 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf return; } completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY); - request = BookieProtocol.AddRequest.create( - BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, - (short) options, masterKey, toSend); + + if (toSend instanceof ByteBuf) { + request = ((ByteBuf) toSend).retainedDuplicate(); + } else { + request = ByteBufList.clone((ByteBufList) toSend); + } } else { final long txnId = getTxnId(); completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY); @@ -799,11 +803,14 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf } ByteString body = null; - if (toSend.hasArray()) { - body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes()); + ByteBufList bufToSend = (ByteBufList) toSend; + + if (bufToSend.hasArray()) { + body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(), + bufToSend.readableBytes()); } else { - for (int i = 0; i < toSend.size(); i++) { - ByteString piece = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(i).nioBuffer()); + for (int i = 0; i < bufToSend.size(); i++) { + ByteString piece = UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer()); // use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs body = (body == null) ? piece : body.concat(piece); } @@ -1143,14 +1150,6 @@ private void writeAndFlush(final Channel channel, StringUtils.requestToString(request)); errorOut(key, BKException.Code.TooManyRequestsException); - - // If the request is a V2 add request, we retained the data's reference when creating the AddRequest - // object. To avoid the object leak, we need to release the reference if we met any errors - // before sending it. - if (request instanceof BookieProtocol.AddRequest) { - BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) request; - ar.recycle(); - } return; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index a97c301311f..9f931f731a2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -24,11 +24,14 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.FastThreadLocal; import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.proto.BookieProtoEncoding; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType; import org.apache.bookkeeper.util.ByteBufList; import org.slf4j.Logger; @@ -97,14 +100,76 @@ public static byte[] generateMasterKey(byte[] password) throws NoSuchAlgorithmEx * @param data * @return */ - public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, - ByteBuf data) { - ByteBuf headersBuffer; + public ReferenceCounted computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, + ByteBuf data, byte[] masterKey, int flags) { if (this.useV2Protocol) { - headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength); + return computeDigestAndPackageForSendingV2(entryId, lastAddConfirmed, length, data, masterKey, flags); + } else { + return computeDigestAndPackageForSendingV3(entryId, lastAddConfirmed, length, data); + } + } + + private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long lastAddConfirmed, long length, + ByteBuf data, byte[] masterKey, int flags) { + boolean isSmallEntry = data.readableBytes() < BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD; + + int headersSize = 4 // Request header + + BookieProtocol.MASTER_KEY_LENGTH // for the master key + + METADATA_LENGTH // + + macCodeLength; + int payloadSize = data.readableBytes(); + int bufferSize = 4 + headersSize + (isSmallEntry ? payloadSize : 0); + + ByteBuf buf = allocator.buffer(bufferSize, bufferSize); + buf.writeInt(headersSize + payloadSize); + buf.writeInt( + BookieProtocol.PacketHeader.toInt( + BookieProtocol.CURRENT_PROTOCOL_VERSION, BookieProtocol.ADDENTRY, (short) flags)); + buf.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH); + + // The checksum is computed on the next part of the buffer only + buf.readerIndex(buf.writerIndex()); + buf.writeLong(ledgerId); + buf.writeLong(entryId); + buf.writeLong(lastAddConfirmed); + buf.writeLong(length); + + // Compute checksum over the headers + int digest = update(0, buf, buf.readerIndex(), buf.readableBytes()); + + // don't unwrap slices + final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf + ? data.unwrap() : data; + ReferenceCountUtil.retain(unwrapped); + ReferenceCountUtil.safeRelease(data); + + if (unwrapped instanceof CompositeByteBuf) { + CompositeByteBuf cbb = (CompositeByteBuf) unwrapped; + for (int i = 0; i < cbb.numComponents(); i++) { + ByteBuf b = cbb.component(i); + digest = update(digest, b, b.readerIndex(), b.readableBytes()); + } + } else { + digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); + } + + populateValueAndReset(digest, buf); + + // Reset the reader index to the beginning + buf.readerIndex(0); + + if (isSmallEntry) { + buf.writeBytes(unwrapped); + unwrapped.release(); + return buf; } else { - headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength); + return ByteBufList.get(buf, unwrapped); } + } + + private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastAddConfirmed, long length, + ByteBuf data) { + ByteBuf headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength); headersBuffer.writeLong(ledgerId); headersBuffer.writeLong(entryId); headersBuffer.writeLong(lastAddConfirmed); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java index d24022e5042..3f8af53c133 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java @@ -27,8 +27,8 @@ import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType; +import org.apache.bookkeeper.proto.MockBookieClient; import org.apache.bookkeeper.proto.checksum.DigestManager; -import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.versioning.Versioned; /** @@ -48,8 +48,8 @@ public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddCo int offset, int len) throws GeneralSecurityException { DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32, UnpooledByteBufAllocator.DEFAULT, true); - return ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length, - Unpooled.wrappedBuffer(data, offset, len))); + return MockBookieClient.copyDataWithSkipHeader(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, + length, Unpooled.wrappedBuffer(data, offset, len), new byte[20], 0)); } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java index 7d5cad6531a..086e9f330c5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java @@ -20,7 +20,7 @@ package org.apache.bookkeeper.client; import io.netty.buffer.ByteBuf; -import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.proto.MockBookieClient; /** * Adapter for tests to get the public access from LedgerHandle for its default @@ -28,8 +28,9 @@ */ public class LedgerHandleAdapter { - public static ByteBufList toSend(LedgerHandle lh, long entryId, ByteBuf data) { - return lh.getDigestManager().computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), - lh.addToLength(data.readableBytes()), data); + public static ByteBuf toSend(LedgerHandle lh, long entryId, ByteBuf data) { + return MockBookieClient.copyData(lh.getDigestManager() + .computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), + lh.addToLength(data.readableBytes()), data, new byte[20], 0)); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java index 63255310c30..e7710124707 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java @@ -35,6 +35,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.ReferenceCounted; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collections; @@ -67,6 +68,7 @@ import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; +import org.apache.bookkeeper.proto.MockBookieClient; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ByteBufList; @@ -504,10 +506,10 @@ protected void setupBookieClientReadEntry() { if (mockEntry != null) { LOG.info("readEntry - found mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress); - ByteBufList entry = macManager.computeDigestAndPackageForSending(entryId, + ReferenceCounted entry = macManager.computeDigestAndPackageForSending(entryId, mockEntry.lastAddConfirmed, mockEntry.payload.length, - Unpooled.wrappedBuffer(mockEntry.payload)); - callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, ByteBufList.coalesce(entry), + Unpooled.wrappedBuffer(mockEntry.payload), new byte[20], 0); + callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, MockBookieClient.copyData(entry), args[4]); entry.release(); } else { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java index 0fa2f0d7762..4efc4465e38 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java @@ -26,6 +26,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; import java.io.IOException; import java.util.Enumeration; import java.util.concurrent.CompletableFuture; @@ -61,7 +62,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.commons.lang3.mutable.MutableInt; @@ -425,9 +425,10 @@ public void testRecoveryOnEntryGap() throws Exception { long entryId = 14; long lac = 8; byte[] data = "recovery-on-entry-gap-gap".getBytes(UTF_8); - ByteBufList toSend = + ReferenceCounted toSend = lh.macManager.computeDigestAndPackageForSending( - entryId, lac, lh.getLength() + 100, Unpooled.wrappedBuffer(data, 0, data.length)); + entryId, lac, lh.getLength() + 100, Unpooled.wrappedBuffer(data, 0, data.length), + new byte[20], 0); final CountDownLatch addLatch = new CountDownLatch(1); final AtomicBoolean addSuccess = new AtomicBoolean(false); LOG.info("Add entry {} with lac = {}", entryId, lac); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java index b538a50c2a0..8e3cfd72e42 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java @@ -33,6 +33,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.ReferenceCounted; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -166,10 +167,15 @@ public void testSpeculativeResponses() throws Exception { final long lac = 1L; ByteBuf data = Unpooled.copiedBuffer("test-speculative-responses", UTF_8); - ByteBufList dataWithDigest = digestManager.computeDigestAndPackageForSending( - entryId, lac, data.readableBytes(), data); - byte[] bytesWithDigest = new byte[dataWithDigest.readableBytes()]; - assertEquals(bytesWithDigest.length, dataWithDigest.getBytes(bytesWithDigest)); + ReferenceCounted refCnt = digestManager.computeDigestAndPackageForSending( + entryId, lac, data.readableBytes(), data, new byte[20], 0); + + byte[] bytesWithDigest = null; + if (refCnt instanceof ByteBufList) { + ByteBufList dataWithDigest = (ByteBufList) refCnt; + bytesWithDigest = new byte[dataWithDigest.readableBytes()]; + assertEquals(bytesWithDigest.length, dataWithDigest.getBytes(bytesWithDigest)); + } final Map callbacks = Collections.synchronizedMap(new HashMap<>()); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java index 82b031c5ae8..a37462dee7d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java @@ -23,8 +23,10 @@ import static org.junit.Assert.assertEquals; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.proto.MockBookieClient; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.ByteBufList; import org.junit.Test; @@ -57,17 +59,20 @@ public void testPendingReadLacOpMissingExplicitLAC() throws Exception { public void initiate() { for (int i = 0; i < lh.getCurrentEnsemble().size(); i++) { final int index = i; - ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSending( + ReferenceCounted toSend = lh.getDigestManager().computeDigestAndPackageForSending( 2, 1, data.length, - Unpooled.wrappedBuffer(data)); + Unpooled.wrappedBuffer(data), + new byte[20], + 0); + bkc.scheduler.schedule(() -> { readLacComplete( 0, lh.getId(), null, - Unpooled.copiedBuffer(buffer.toArray()), + MockBookieClient.copyData(toSend), index); }, 0, TimeUnit.SECONDS); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java index ce8d65fb76b..8721a2c7819 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java @@ -18,6 +18,8 @@ */ package org.apache.bookkeeper.proto; +import org.apache.bookkeeper.client.BookKeeperTestClient; +import org.apache.bookkeeper.test.TestStatsProvider; import org.junit.Before; /** @@ -30,6 +32,8 @@ public class BookieBackpressureForV2Test extends BookieBackpressureTest { public void setUp() throws Exception { super.setUp(); baseClientConf.setUseV2WireProtocol(true); + bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider()); + // the backpressure will bloc the read response, disable it to let it use backpressure mechanism confByIndex(0).setReadWorkerThreadsThrottlingEnabled(false); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java index 2de9ab5e19a..c4344c74d00 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java @@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -148,7 +149,7 @@ public void writeLac(BookieId addr, long ledgerId, byte[] masterKey, @Override public void addEntry(BookieId addr, long ledgerId, byte[] masterKey, - long entryId, ByteBufList toSend, WriteCallback cb, Object ctx, + long entryId, ReferenceCounted toSend, WriteCallback cb, Object ctx, int options, boolean allowFastFail, EnumSet writeFlags) { toSend.retain(); preWriteHook.runHook(addr, ledgerId, entryId) @@ -262,11 +263,29 @@ public boolean isClosed() { public void close() { } - private static ByteBuf copyData(ByteBufList list) { - ByteBuf buf = Unpooled.buffer(list.readableBytes()); - for (int i = 0; i < list.size(); i++) { - buf.writeBytes(list.getBuffer(i).slice()); + public static ByteBuf copyData(ReferenceCounted rc) { + ByteBuf res; + if (rc instanceof ByteBuf) { + res = Unpooled.copiedBuffer((ByteBuf) rc); + } else { + res = ByteBufList.coalesce((ByteBufList) rc); } - return buf; + + return res; + } + + public static ByteBuf copyDataWithSkipHeader(ReferenceCounted rc) { + ByteBuf res; + if (rc instanceof ByteBuf) { + res = Unpooled.copiedBuffer((ByteBuf) rc); + } else { + res = ByteBufList.coalesce((ByteBufList) rc); + } + + // Skip headers + res.skipBytes(28); + rc.release(); + + return res; } -} \ No newline at end of file +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java index c670e87ff0c..cef77c3f99a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java @@ -84,8 +84,8 @@ public ByteBuf generateEntry(long ledgerId, long entryId, long lac) throws Excep DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DataFormats.LedgerMetadataFormat.DigestType.CRC32C, UnpooledByteBufAllocator.DEFAULT, false); - return ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending( - entryId, lac, 0, Unpooled.buffer(10))); + return ByteBufList.coalesce((ByteBufList) digestManager.computeDigestAndPackageForSending( + entryId, lac, 0, Unpooled.buffer(10), new byte[20], 0)); } diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java index 7eb8fa97746..308463608b5 100644 --- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java +++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java @@ -79,22 +79,6 @@ public void prepare() { this.reqEnDeV3 = new RequestEnDecoderV3(null); } - - @Benchmark - public void testAddEntryV2() throws Exception { - ByteBufList list = ByteBufList.get(entry.retainedSlice()); - BookieProtocol.AddRequest req = BookieProtocol.AddRequest.create( - BookieProtocol.CURRENT_PROTOCOL_VERSION, - ledgerId, - entryId, - flags, - masterKey, - list); - Object res = this.reqEnDeV2.encode(req, ByteBufAllocator.DEFAULT); - ReferenceCountUtil.release(res); - ReferenceCountUtil.release(list); - } - @Benchmark public void testAddEntryV3() throws Exception { // Build the request and calculate the total size to be included in the packet. diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java index d9545620163..04fa500d476 100644 --- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java +++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java @@ -83,8 +83,9 @@ public void doSetup() throws Exception { data.writeBytes(randomBytes(entrySize)); digestBuf = ByteBufAllocator.DEFAULT.directBuffer(); - digestBuf.writeBytes(ByteBufList.coalesce( - dm.computeDigestAndPackageForSending(1234, 1234, entrySize, data))); + digestBuf.writeBytes((ByteBuf) + dm.computeDigestAndPackageForSending(1234, 1234, entrySize, data, + new byte[0], 0)); } }