Skip to content

Commit

Permalink
Add a property to disable prefetch read
Browse files Browse the repository at this point in the history
  • Loading branch information
顾鹏 authored and 顾鹏 committed Aug 8, 2024
1 parent df03c63 commit df9a4cf
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public enum BlockInStreamSource {

// NOTE(review): presumably marks this stream as closed; its uses are outside this view.
private boolean mClosed = false;
// Set to true by read() when no current chunk is available after attempting to fetch one.
private boolean mEOF = false;
// Selects how read() fetches data: when true it calls readChunk(); when false it calls
// readLengthChunk(len) so only the requested length is asked for. Defaults to true.
private boolean mEnablePrefetchRead = true;

/**
* Creates a {@link BlockInStream}.
Expand Down Expand Up @@ -120,6 +121,7 @@ public static BlockInStream create(FileSystemContext context, BlockInfo info,
alluxioConf.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_PREFERRED);
boolean sourceSupportsDomainSocket = NettyUtils.isDomainSocketSupported(dataSource);
boolean sourceIsLocal = dataSourceType == BlockInStreamSource.NODE_LOCAL;
boolean enablePrefetchRead = alluxioConf.getBoolean(PropertyKey.USER_PREFETCH_READ_ENABLED);

// Short circuit is enabled when
// 1. data source is local node
Expand All @@ -141,9 +143,9 @@ public static BlockInStream create(FileSystemContext context, BlockInfo info,
// gRPC
LOG.debug("Creating gRPC input stream for block {} @ {} from client {} reading through {} ("
+ "data locates in the local worker {}, shortCircuitEnabled {}, "
+ "shortCircuitPreferred {}, sourceSupportDomainSocket {})",
+ "shortCircuitPreferred {}, sourceSupportDomainSocket {}, enablePrefetchRead {})",
blockId, dataSource, NetworkAddressUtils.getClientHostName(alluxioConf), dataSource,
sourceIsLocal, shortCircuit, shortCircuitPreferred, sourceSupportsDomainSocket);
sourceIsLocal, shortCircuit, shortCircuitPreferred, sourceSupportsDomainSocket, enablePrefetchRead);
return createGrpcBlockInStream(context, dataSource, dataSourceType, blockId,
blockSize, options);
}
Expand Down Expand Up @@ -208,6 +210,8 @@ private static BlockInStream createGrpcBlockInStream(FileSystemContext context,
AlluxioConfiguration conf = context.getClusterConf();
long chunkSize = conf.getBytes(
PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES);
boolean enablePrefetchRead = conf.getBoolean(
PropertyKey.USER_PREFETCH_READ_ENABLED);
// Construct the partial read request
ReadRequest.Builder builder = ReadRequest.newBuilder()
.setBlockId(blockId)
Expand All @@ -224,7 +228,7 @@ private static BlockInStream createGrpcBlockInStream(FileSystemContext context,
} else {
factory = new GrpcDataReader.Factory(context, address, builder);
}
return new BlockInStream(factory, address, blockSource, blockId, blockSize);
return new BlockInStream(factory, address, blockSource, blockId, blockSize, enablePrefetchRead);
}

/**
Expand All @@ -246,11 +250,13 @@ public static BlockInStream createRemoteBlockInStream(FileSystemContext context,
AlluxioConfiguration conf = context.getClusterConf();
long chunkSize = conf.getBytes(
PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES);
boolean enablePrefetchRead = conf.getBoolean(
PropertyKey.USER_PREFETCH_READ_ENABLED);
ReadRequest readRequest = ReadRequest.newBuilder().setBlockId(blockId)
.setOpenUfsBlockOptions(ufsOptions).setChunkSize(chunkSize).buildPartial();
DataReader.Factory factory = new GrpcDataReader.Factory(context, address,
readRequest.toBuilder());
return new BlockInStream(factory, address, blockSource, blockId, blockSize);
return new BlockInStream(factory, address, blockSource, blockId, blockSize, enablePrefetchRead);
}

/**
Expand All @@ -272,6 +278,23 @@ protected BlockInStream(DataReader.Factory dataReaderFactory,
mLength = length;
}

/**
 * Builds a {@link BlockInStream} and records whether reads should prefetch.
 *
 * <p>Delegates to the five-argument constructor, then stores the prefetch flag.
 *
 * @param dataReaderFactory the data reader factory
 * @param address the address of the gRPC data server
 * @param blockSource the source location of the block
 * @param id the ID (either block ID or UFS file ID)
 * @param length the length
 * @param enablePrefetchRead whether to enable prefetch
 */
@VisibleForTesting
protected BlockInStream(
    DataReader.Factory dataReaderFactory,
    WorkerNetAddress address,
    BlockInStreamSource blockSource,
    long id,
    long length,
    boolean enablePrefetchRead) {
  this(dataReaderFactory, address, blockSource, id, length);
  this.mEnablePrefetchRead = enablePrefetchRead;
}

@Override
public long getPos() {
return mPos;
Expand Down Expand Up @@ -318,7 +341,11 @@ public int read(ByteBuffer byteBuffer, int off, int len) throws IOException {
if (mPos == mLength) {
return -1;
}
readChunk();
if (mEnablePrefetchRead) {
readChunk();
} else {
readLengthChunk(len);
}
if (mCurrentChunk == null) {
mEOF = true;
}
Expand Down Expand Up @@ -498,6 +525,21 @@ private void readChunk() throws IOException {
}
}

/**
 * Makes sure {@code mCurrentChunk} is populated for a read of up to {@code length} bytes
 * starting at the current position.
 *
 * <p>A data reader is created when none exists, and is replaced when the current chunk has
 * been fully consumed. Each reader is asked for at most {@code length} bytes, and never for
 * more than the bytes remaining before {@code mLength}.
 *
 * @param length the number of bytes the caller wants to read
 * @throws IOException if creating or closing the data reader, or reading a chunk, fails
 */
private void readLengthChunk(int length) throws IOException {
  // Clamp the request so it never extends past the end of the block.
  long requestLength = Math.min(length, mLength - mPos);
  if (mDataReader == null) {
    mDataReader = mDataReaderFactory.create(mPos, requestLength);
  } else if (mCurrentChunk != null && mCurrentChunk.readableBytes() == 0) {
    // The current chunk is exhausted and the existing reader was opened for an earlier
    // request, so close it and open a new one at the current position.
    // NOTE(review): relies on closeDataReader() clearing mCurrentChunk so the read below
    // runs - confirm against its implementation.
    closeDataReader();
    mDataReader = mDataReaderFactory.create(mPos, requestLength);
  }
  if (mCurrentChunk == null) {
    mCurrentChunk = mDataReader.readChunk();
  }
}

/**
* Close the current data reader.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,34 @@ public void closeReaderAfterReadingAllData() throws Exception {
assertTrue(reader.isClosed());
}

/**
 * Reads a two-chunk block with prefetch disabled and checks the data reader's lifecycle:
 * it stays open after the first read, is closed once the second read replaces it, and
 * remains closed at end of stream and after the stream itself is closed.
 */
@Test
public void disablePrefetchReadTest() throws Exception {
  int chunkSize = 512;
  // The block holds exactly two chunks; derive the length instead of repeating a literal.
  int blockLength = 2 * chunkSize;
  TestDataReader.Factory factory = new TestDataReader.Factory(
      chunkSize, BufferUtils.getIncreasingByteArray(blockLength));
  BlockInStream stream = new BlockInStream(factory, new WorkerNetAddress(),
      BlockInStream.BlockInStreamSource.REMOTE, -1, blockLength, false);
  byte[] res = new byte[chunkSize];

  // First read: a reader is created and stays open.
  int read = stream.read(res);
  TestDataReader reader = factory.getDataReader();
  assertEquals(chunkSize, read);
  // Expected value first, actual second, so a failure message reads correctly.
  assertEquals(res.length, stream.mCurrentChunk.getLength());
  assertNotNull(reader);
  assertFalse(reader.isClosed());

  // Second read: the first chunk is exhausted, so the first reader is closed.
  read = stream.read(res, 0, chunkSize);
  assertEquals(chunkSize, read);
  assertTrue(reader.isClosed());

  // Third read: the stream is at the end of the block.
  read = stream.read(res, 0, chunkSize);
  assertEquals(-1, read);
  assertTrue(reader.isClosed());

  stream.close();
  assertTrue(reader.isClosed());
}

@Test
public void createShortCircuit() throws Exception {
WorkerNetAddress dataSource = new WorkerNetAddress();
Expand Down
10 changes: 9 additions & 1 deletion core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -7090,7 +7090,14 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();

/**
 * Client-scoped boolean property that switches prefetching on block reads; defaults to
 * {@code true}.
 */
public static final PropertyKey USER_PREFETCH_READ_ENABLED =
    booleanBuilder(Name.USER_PREFETCH_READ_ENABLED)
        .setDefaultValue(true)
        .setDescription("Whether to enable data prefetch when reading a block. "
            + "When disabled, each read requests only the length asked for, which "
            + "prevents the worker from sending the entire block of data to the client "
            + "on remote reads.")
        .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
        .setScope(Scope.CLIENT)
        .build();
//
// FUSE integration related properties
//
Expand Down Expand Up @@ -9288,6 +9295,7 @@ public static final class Name {
"alluxio.user.short.circuit.preferred";
public static final String USER_WORKER_LIST_REFRESH_INTERVAL =
"alluxio.user.worker.list.refresh.interval";
public static final String USER_PREFETCH_READ_ENABLED = "alluxio.user.prefetch.read.enabled";

//
// FUSE integration related properties
Expand Down

0 comments on commit df9a4cf

Please sign in to comment.