perf: Support fetch multiple blocks based on max bytes #26476
Reviewer's Guide

This PR refactors the shuffle reading API to fetch data under a maximum bytes threshold rather than by batch count, updating the core interface, reader implementations, exchange source logic, and associated tests to support byte-based reading.

Sequence diagram for byte-based shuffle data fetching:

```mermaid
sequenceDiagram
    participant "ShuffleExchangeSource"
    participant "LocalShuffleReader"
    participant "FileSystem"
    participant "ReadBatch"
    "ShuffleExchangeSource"->>"LocalShuffleReader": next(maxBytes)
    loop While totalBytes < maxBytes and files remain
        "LocalShuffleReader"->>"FileSystem": openFileForRead(filename)
        "LocalShuffleReader"->>"FileSystem": pread(0, fileSize, buffer)
        "LocalShuffleReader"->>"ReadBatch": Parse buffer into rows
        "LocalShuffleReader"->>"ShuffleExchangeSource": Add ReadBatch to result
    end
    "LocalShuffleReader"-->>"ShuffleExchangeSource": vector<ReadBatch>
```
Class diagram for updated ShuffleReader and LocalShuffleReader interfaces:

```mermaid
classDiagram
    class ShuffleReader {
        <<interface>>
        +next(uint64_t maxBytes) : SemiFuture<vector<unique_ptr<ReadBatch>>>
        +noMoreData(bool success)
    }
    class LocalShuffleReader {
        +next(uint64_t maxBytes) : SemiFuture<vector<unique_ptr<ReadBatch>>>
        +noMoreData(bool success)
    }
    ShuffleReader <|-- LocalShuffleReader
```
Force-pushed from af1594b to 13ab356.
Hey there - I've reviewed your changes - here's some feedback:
- In TestShuffleReader::next you currently push one batch unconditionally then loop until maxBytes; consider merging these into a single loop that accumulates bytes to simplify the logic.
- The new persistentShuffleBatch test still has a commented-out ASSERT_EQ(numBatches, numRows); either remove that dead code or update the assertion to reflect the intended behavior.
- ShuffleExchangeSource::request ignores the maxWait parameter—either implement a wait/backoff for maxWait or remove the unused argument to avoid confusion.
## Individual Comments
### Comment 1
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:175` </location>
<code_context>
- batches.reserve(numBatches);
+ uint64_t totalBytes{0};
+ // Read files until we reach maxBytes limit or run out of files.
+ while (readPartitionFileIndex_ < readPartitionFiles_.size()) {
+ const auto filename = readPartitionFiles_[readPartitionFileIndex_];
+ auto file = fileSystem_->openFileForRead(filename);
</code_context>
<issue_to_address>
**suggestion:** Consider handling the case where a single file exceeds maxBytes.
If a partition file exceeds maxBytes and batches is empty, the code will load the entire file, potentially causing high memory usage. Consider splitting large files or enforcing a stricter limit.
Suggested implementation:
```cpp
while (readPartitionFileIndex_ < readPartitionFiles_.size()) {
  const auto filename = readPartitionFiles_[readPartitionFileIndex_];
  auto file = fileSystem_->openFileForRead(filename);
  const auto fileSize = file->size();
  // If the file is larger than maxBytes and no batches have been collected
  // yet, read only the first maxBytes of the file.
  if (fileSize > maxBytes && batches.empty()) {
    auto batch = std::make_unique<ReadBatch>();
    batch->readFromFile(file, /*offset=*/0, /*length=*/maxBytes);
    batches.push_back(std::move(batch));
    totalBytes += maxBytes;
    // The file was only partially read: record the offset so the next call
    // resumes where this one stopped, and keep the file index unchanged.
    readPartitionFileOffsets_[readPartitionFileIndex_] = maxBytes;
    break;
  }
  // Stop if adding this file would exceed maxBytes (unless we haven't read
  // any files yet).
  if (!batches.empty() && totalBytes + fileSize > maxBytes) {
    break;
  }
```
1. You will need to add and manage a `readPartitionFileOffsets_` vector or map to track the current read offset for each file, so that subsequent calls to `next()` can continue reading from where the previous call left off.
2. Implement the `ReadBatch::readFromFile(file, offset, length)` method if it does not already exist, or adapt the batch reading logic to support partial reads.
3. Ensure that the logic for advancing `readPartitionFileIndex_` and handling offsets is consistent throughout the function.
</issue_to_address>
### Comment 2
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1213-1210` </location>
<code_context>
+TEST_F(ShuffleTest, persistentShuffleBatch) {
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for error conditions and edge cases in persistentShuffleBatch.
Please add tests for maxBytes = 0, negative, and very large values, as well as cases with zero rows or empty partitions, to improve coverage of edge conditions.
Suggested implementation:
```cpp
TEST_F(ShuffleTest, persistentShuffleBatch) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();
  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;
  const size_t numRows{20};
  std::vector<std::string> values{numRows};
  std::vector<std::string_view> views{numRows};
  const size_t rowSize{64};
  // Normal case (already present)
  // ... existing test logic ...
}

TEST_F(ShuffleTest, persistentShuffleBatch_maxBytesZero) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();
  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;
  const size_t numRows{10};
  std::vector<std::string> values(numRows, "test");
  std::vector<std::string_view> views(numRows, "test");
  const size_t rowSize{8};
  const int64_t maxBytes = 0;
  // Call persistentShuffleBatch with maxBytes = 0 and verify behavior.
  ASSERT_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes),
      std::invalid_argument);
}

TEST_F(ShuffleTest, persistentShuffleBatch_maxBytesNegative) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();
  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;
  const size_t numRows{10};
  std::vector<std::string> values(numRows, "test");
  std::vector<std::string_view> views(numRows, "test");
  const size_t rowSize{8};
  const int64_t maxBytes = -100;
  // Call persistentShuffleBatch with maxBytes < 0 and verify behavior.
  ASSERT_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes),
      std::invalid_argument);
}

TEST_F(ShuffleTest, persistentShuffleBatch_maxBytesVeryLarge) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();
  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;
  const size_t numRows{10};
  std::vector<std::string> values(numRows, "test");
  std::vector<std::string_view> views(numRows, "test");
  const size_t rowSize{8};
  const int64_t maxBytes = std::numeric_limits<int64_t>::max();
  // Call persistentShuffleBatch with very large maxBytes and verify no error.
  ASSERT_NO_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes));
}

TEST_F(ShuffleTest, persistentShuffleBatch_zeroRows) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();
  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;
  const size_t numRows{0};
  std::vector<std::string> values;
  std::vector<std::string_view> views;
  const size_t rowSize{8};
  const int64_t maxBytes = 1024;
  // Call persistentShuffleBatch with zero rows and verify no error.
  ASSERT_NO_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes));
}

TEST_F(ShuffleTest, persistentShuffleBatch_emptyPartitions) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();
  const uint32_t numPartitions = 0;
  const uint32_t partition = 0;
  const size_t numRows{10};
  std::vector<std::string> values(numRows, "test");
  std::vector<std::string_view> views(numRows, "test");
  const size_t rowSize{8};
  const int64_t maxBytes = 1024;
  // Call persistentShuffleBatch with zero partitions and verify error or no-op.
  ASSERT_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes),
      std::invalid_argument);
}
```
- You may need to adjust the exception type (`std::invalid_argument`) to match the actual error thrown by `persistentShuffleBatch` in your implementation.
- If `persistentShuffleBatch` does not currently throw for these error cases, you should update its implementation to do so.
- If your test framework uses a different macro for exception checking, replace `ASSERT_THROW` and `ASSERT_NO_THROW` accordingly.
- If the function signature for `persistentShuffleBatch` is different, adjust the arguments as needed.
</issue_to_address>
### Comment 3
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1289-1292` </location>
<code_context>
+
+ reader->noMoreData(true);
+
+ // Verify we read all rows.
+ ASSERT_EQ(totalRows, numRows);
+ // ASSERT_EQ(numBatches, numRows);
+ // Verify number of output batches.
+ ASSERT_EQ(numOutputCalls, testData.expectedOutputCalls);
+ }
</code_context>
<issue_to_address>
**suggestion (testing):** Test assertions could be more comprehensive.
Consider adding assertions to verify that batch sizes do not exceed maxBytes and that the row contents match the expected data. This will better validate the batching logic.
Suggested implementation:
```cpp
  reader->noMoreData(true);

  // Verify we read all rows.
  ASSERT_EQ(totalRows, numRows);
  // Verify number of output batches.
  ASSERT_EQ(numOutputCalls, testData.expectedOutputCalls);

  // Additional assertions for batch size and row contents.
  for (const auto& batch : batches) {
    // Assert batch size does not exceed maxBytes.
    ASSERT_LE(batch->data->size(), maxBytes);
    // Assert row contents match expected data.
    for (size_t i = 0; i < batch->rows.size(); ++i) {
      ASSERT_EQ(batch->rows[i], testData.expectedRows[batch->rowIndices[i]]);
    }
  }
}
```
- Ensure that `testData.expectedRows` and `batch->rowIndices` are available and correctly populated in your test setup. If `rowIndices` is not present, you may need to adjust the row content assertion to match your data structure.
- If batches is not in scope at this point, you may need to collect all batches in a vector during the batching loop for these assertions.
</issue_to_address>
…stodb#26476)
Summary: Pull Request resolved: prestodb#26476
Differential Revision: D85830101
Force-pushed from 13ab356 to 35fa5c8.
tanjialiang left a comment:
Thanks, LGTM.
Differential Revision: D85830101