Skip to content

Commit

Permalink
Unify ByteBufAllocator for the DirectIO component (#3985)
Browse files Browse the repository at this point in the history
### Motivation
Some classes in the DirectIO component use `PooledByteBufAllocator` to create ByteBuf instead of using the BookKeeper-initiated allocator. When we configured the BookKeeper allocator OOM policy to shut down, the OOM policy can't apply to those classes which use `PooledByteBufAllocator` to create ByteBuf.

### Modifications
- Unify ByteBufAllocator for the DirectIO component.

(cherry picked from commit bf06642)
  • Loading branch information
hangc0276 authored and zymap committed Jun 19, 2023
1 parent cec8551 commit 306eaef
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -60,12 +60,14 @@ class Buffer {
final int bufferSize;
ByteBuf buffer;
ByteBuffer byteBuffer;
ByteBufAllocator allocator;
long pointer = 0;

Buffer(NativeIO nativeIO, int bufferSize) throws IOException {
Buffer(NativeIO nativeIO, ByteBufAllocator allocator, int bufferSize) throws IOException {
checkArgument(isAligned(bufferSize),
"Buffer size not aligned %d", bufferSize);

this.allocator = allocator;
this.buffer = allocateAligned(ALIGNMENT, bufferSize);
this.nativeIO = nativeIO;
this.bufferSize = bufferSize;
Expand All @@ -74,7 +76,7 @@ class Buffer {
}

private ByteBuf allocateAligned(int alignment, int bufferSize) {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize + alignment);
ByteBuf buf = allocator.directBuffer(bufferSize + alignment);
long addr = buf.memoryAddress();
if ((addr & (alignment - 1)) == 0) {
// The address is already aligned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.bookie.storage.directentrylogger;

import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.bookkeeper.common.util.nativeio.NativeIO;
Expand All @@ -30,10 +31,10 @@
public class BufferPool implements AutoCloseable {
private final ArrayBlockingQueue<Buffer> pool;

BufferPool(NativeIO nativeIO, int bufferSize, int maxPoolSize) throws IOException {
BufferPool(NativeIO nativeIO, ByteBufAllocator allocator, int bufferSize, int maxPoolSize) throws IOException {
pool = new ArrayBlockingQueue<>(maxPoolSize);
for (int i = 0; i < maxPoolSize; i++) {
pool.add(new Buffer(nativeIO, bufferSize));
pool.add(new Buffer(nativeIO, allocator, bufferSize));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void markCompacted() throws IOException {
public void scan(EntryLogScanner scanner) throws IOException {
try (LogReader reader = new DirectReader(dstLogId, compactedFile.toString(), allocator, nativeIO,
readBufferSize, maxSaneEntrySize, readBlockStats)) {
LogReaderScan.scan(reader, scanner);
LogReaderScan.scan(allocator, reader, scanner);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public DirectEntryLogger(File ledgerDir,
this.allocator = allocator;

int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);
this.writeBuffers = new BufferPool(nativeIO, allocator, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);

// The total read buffer memory needs to get split across all the read threads, since the caches
// are thread-specific and we want to ensure we don't pass the total memory limit.
Expand Down Expand Up @@ -385,7 +385,7 @@ public boolean removeEntryLog(long entryLogId) {
public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
checkArgument(entryLogId < Integer.MAX_VALUE, "Entry log id must be an int [%d]", entryLogId);
try (LogReader reader = newDirectReader((int) entryLogId)) {
LogReaderScan.scan(reader, scanner);
LogReaderScan.scan(allocator, reader, scanner);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DirectReader implements LogReader {
.kv("errno", ne.getErrno()).toString());
}
refreshMaxOffset();
nativeBuffer = new Buffer(nativeIO, bufferSize);
nativeBuffer = new Buffer(nativeIO, allocator, bufferSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
package org.apache.bookkeeper.bookie.storage.directentrylogger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import org.apache.bookkeeper.bookie.storage.EntryLogScanner;

class LogReaderScan {
static void scan(LogReader reader, EntryLogScanner scanner) throws IOException {
static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner scanner) throws IOException {
int offset = Header.LOGFILE_LEGACY_HEADER_SIZE;

ByteBuf entry = PooledByteBufAllocator.DEFAULT.directBuffer(16 * 1024 * 1024);
ByteBuf entry = allocator.directBuffer(16 * 1024 * 1024);

try {
while (offset < reader.maxOffset()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

// CHECKSTYLE.OFF: IllegalImport
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
Expand Down Expand Up @@ -70,13 +71,13 @@ public void testMaxAlignment() throws Exception {

@Test(expected = IllegalArgumentException.class)
public void testCreateUnaligned() throws Exception {
new Buffer(new NativeIOImpl(), 1234);
new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1234);
}

@Test
public void testWriteInt() throws Exception {
int bufferSize = 1 << 20;
Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, bufferSize);
assertTrue(b.hasSpace(bufferSize));
assertEquals(0, b.position());
b.writeInt(0xdeadbeef);
Expand Down Expand Up @@ -111,7 +112,7 @@ public void testWriteBuffer() throws Exception {
ByteBuf bb = Unpooled.buffer(1021);
fillByteBuf(bb, 0xdeadbeef);
int bufferSize = 1 << 20;
Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, bufferSize);
assertEquals(0, b.position());
b.writeByteBuf(bb);
assertEquals(1021, b.position());
Expand All @@ -138,7 +139,7 @@ public void testWriteBuffer() throws Exception {
public void testPartialRead() throws Exception {
ByteBuf bb = Unpooled.buffer(5000);

Buffer b = new Buffer(new NativeIOImpl(), 4096);
Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 4096);
for (int i = 0; i < 4096 / Integer.BYTES; i++) {
b.writeInt(0xdeadbeef);
}
Expand All @@ -149,7 +150,7 @@ public void testPartialRead() throws Exception {

@Test(expected = IOException.class)
public void testReadIntAtBoundary() throws Exception {
Buffer b = new Buffer(new NativeIOImpl(), 4096);
Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 4096);

for (int i = 0; i < 4096 / Integer.BYTES; i++) {
b.writeInt(0xdeadbeef);
Expand All @@ -163,7 +164,7 @@ public void testReadIntAtBoundary() throws Exception {

@Test(expected = IOException.class)
public void testReadLongAtBoundary() throws Exception {
Buffer b = new Buffer(new NativeIOImpl(), 4096);
Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 4096);

for (int i = 0; i < 4096 / Integer.BYTES; i++) {
b.writeInt(0xdeadbeef);
Expand All @@ -177,7 +178,7 @@ public void testReadLongAtBoundary() throws Exception {

@Test
public void testPadToAlignment() throws Exception {
Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 23);

for (int i = 0; i < 1025; i++) {
b.writeInt(0xdededede);
Expand All @@ -194,7 +195,7 @@ public void testPadToAlignment() throws Exception {

@Test
public void testFree() throws Exception {
Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 23);
b.free(); // success if process doesn't explode
b.free();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void testReadBufferAcrossBoundary() throws Exception {
File ledgerDir = tmpDirs.createNew("readBuffer", "logs");

writeFileWithPattern(ledgerDir, 1234, 0xbeefcafe, 1, 1 << 20);
BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT * 4, 8);
BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT * 4, 8);

try (LogReader reader = new DirectReader(1234, logFilename(ledgerDir, 1234),
ByteBufAllocator.DEFAULT,
Expand Down Expand Up @@ -268,7 +268,7 @@ public void testReadEntries() throws Exception {

int entrySize = Buffer.ALIGNMENT / 4 + 100;
Map<Integer, Integer> offset2Pattern = new HashMap<>();
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
1 << 20, MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
Expand Down Expand Up @@ -315,7 +315,7 @@ public int fallocate(int fd, int mode, long offset, long len)
return 0;
}
};
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
1 << 20, MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE);
Expand Down Expand Up @@ -353,7 +353,7 @@ public void testReadFromFileBeingWrittenReadInPreallocated() throws Exception {

int entrySize = Buffer.ALIGNMENT / 2 + 8;

try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
1 << 20, MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE);
Expand Down Expand Up @@ -403,7 +403,8 @@ public int fallocate(int fd, int mode, long offset, long len)
return 0; // don't preallocate
}
};
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT * 10, 8);
try (BufferPool buffers = new BufferPool(new NativeIOImpl(),
ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT * 10, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234), 1 << 20,
MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
Expand Down Expand Up @@ -452,7 +453,7 @@ public void testLargeEntry() throws Exception {
int entrySize = Buffer.ALIGNMENT * 4;

int offset1, offset2;
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT * 8, 8);
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT * 8, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234), 1 << 20,
MoreExecutors.newDirectExecutorService(), buffers, new NativeIOImpl(),
Slogger.CONSOLE)) {
Expand Down Expand Up @@ -496,7 +497,7 @@ public void testLargeEntry() throws Exception {

private static void writeFileWithPattern(File directory, int logId,
int pattern, int blockIncrement, int fileSize) throws Exception {
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(logId, logFilename(directory, logId),
fileSize, MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
Expand Down
Loading

0 comments on commit 306eaef

Please sign in to comment.