Conversation

@xiaoxmeng (Contributor)

Differential Revision: D85830101

@xiaoxmeng requested review from a team as code owners on October 30, 2025 07:05
@prestodb-ci added the from:Meta (PR from Meta) label on Oct 30, 2025

sourcery-ai bot commented Oct 30, 2025

Reviewer's Guide

This PR refactors the shuffle reading API to fetch data under a maximum bytes threshold rather than by batch count, updating the core interface, reader implementations, exchange source logic, and associated tests to support byte-based reading.

Sequence diagram for byte-based shuffle data fetching

```mermaid
sequenceDiagram
    participant "ShuffleExchangeSource"
    participant "LocalShuffleReader"
    participant "FileSystem"
    participant "ReadBatch"

    "ShuffleExchangeSource"->>"LocalShuffleReader": next(maxBytes)
    loop While totalBytes < maxBytes and files remain
        "LocalShuffleReader"->>"FileSystem": openFileForRead(filename)
        "LocalShuffleReader"->>"FileSystem": pread(0, fileSize, buffer)
        "LocalShuffleReader"->>"ReadBatch": Parse buffer into rows
        "LocalShuffleReader"->>"ShuffleExchangeSource": Add ReadBatch to result
    end
    "LocalShuffleReader"-->>"ShuffleExchangeSource": vector<ReadBatch>
```

Class diagram for updated ShuffleReader and LocalShuffleReader interfaces

```mermaid
classDiagram
    class ShuffleReader {
        <<interface>>
        +next(uint64_t maxBytes) : SemiFuture<vector<unique_ptr<ReadBatch>>>
        +noMoreData(bool success)
    }
    class LocalShuffleReader {
        +next(uint64_t maxBytes) : SemiFuture<vector<unique_ptr<ReadBatch>>>
        +noMoreData(bool success)
    }
    ShuffleReader <|-- LocalShuffleReader
```
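
For orientation, here is a minimal C++ sketch of the interface as the class diagram describes it (folly::SemiFuture is assumed from the return type shown; the actual ShuffleInterface.h may differ in details):

```cpp
#include <folly/futures/Future.h>

#include <cstdint>
#include <memory>
#include <vector>

struct ReadBatch; // defined elsewhere in the shuffle operator code

class ShuffleReader {
 public:
  virtual ~ShuffleReader() = default;

  // Fetches batches whose cumulative size stays under 'maxBytes'. At least
  // one batch is returned while data remains, even if that batch alone
  // exceeds the budget.
  virtual folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
      uint64_t maxBytes) = 0;

  virtual void noMoreData(bool success) = 0;
};
```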

File-Level Changes

Update ShuffleReader API to use a byte limit (ShuffleInterface.h, LocalShuffle.h)
  • Change the next() signature to accept uint64_t maxBytes instead of size_t numBatches
  • Add documentation comments for the maxBytes parameter
  • Update the override declarations in LocalShuffle.h

Refactor LocalShuffleReader::next to enforce the maxBytes limit (LocalShuffle.cpp); a sketch of the loop follows.
  • Replace the fixed-count loop with a while loop that checks totalBytes against maxBytes
  • Track and accumulate file sizes in totalBytes, allowing at least one file to be read
  • Break when adding another file would exceed maxBytes
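
The loop above, reduced to a self-contained sketch (ReadBatch and the in-memory "files" are stand-ins; the real code reads files via the filesystem, as quoted in Comment 1 below):

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Stand-in for the real batch type.
struct ReadBatch {
  std::string data;
};

// Greedy whole-file batching: take files in order until the next one would
// push the total past maxBytes, but always take at least one.
std::vector<std::unique_ptr<ReadBatch>> nextUnderBudget(
    const std::vector<std::string>& fileContents,
    size_t& fileIndex,
    uint64_t maxBytes) {
  std::vector<std::unique_ptr<ReadBatch>> batches;
  uint64_t totalBytes{0};
  while (fileIndex < fileContents.size()) {
    const uint64_t fileSize = fileContents[fileIndex].size();
    if (!batches.empty() && totalBytes + fileSize > maxBytes) {
      break;
    }
    auto batch = std::make_unique<ReadBatch>();
    batch->data = fileContents[fileIndex];
    batches.push_back(std::move(batch));
    totalBytes += fileSize;
    ++fileIndex;
  }
  return batches;
}
```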

Propagate maxBytes through ShuffleExchangeSource (ShuffleExchangeSource.cpp); a toy sketch of the capture pattern follows.
  • Modify the request() signature to accept maxBytes
  • Capture maxBytes in the nextBatch lambda
  • Invoke shuffleReader_->next(maxBytes) instead of next(1)
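
The propagation itself is just a by-value lambda capture; a toy sketch of the pattern (the Fake* types are stand-ins, not the real classes):

```cpp
#include <cstdint>
#include <iostream>

struct FakeShuffleReader {
  void next(uint64_t maxBytes) {
    std::cout << "fetch up to " << maxBytes << " bytes\n";
  }
};

struct FakeExchangeSource {
  FakeShuffleReader reader;

  void request(uint64_t maxBytes) {
    // Capture the byte budget by value so the deferred fetch uses it,
    // replacing the old fixed next(1) call.
    auto nextBatch = [this, maxBytes]() { reader.next(maxBytes); };
    nextBatch();
  }
};

int main() {
  FakeExchangeSource source;
  source.request(1 << 20); // request up to 1 MiB
}
```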

Adapt and extend tests for byte-based fetching (ShuffleTest.cpp); a fixture sketch follows.
  • Enforce maxBytes > 0 in TestShuffleReader::next and accumulate batches by data size
  • Remove duplicate registerLocalFileSystem calls, centralizing the registration in SetUp
  • Add a persistentShuffleBatch test with multiple maxBytes scenarios and expected-output validations
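
Centralizing the registration in the test fixture follows the usual gtest pattern; a sketch (the fixture body is an assumption; registerLocalFileSystem is the call named above):

```cpp
#include <gtest/gtest.h>

#include "velox/common/file/FileSystems.h"

class ShuffleTest : public testing::Test {
 protected:
  void SetUp() override {
    // Register the local filesystem once for every test, instead of
    // repeating the call inside individual test bodies.
    facebook::velox::filesystems::registerLocalFileSystem();
  }
};
```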

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help


@sourcery-ai bot left a comment


Hey there - I've reviewed your changes - here's some feedback:

  • In TestShuffleReader::next you currently push one batch unconditionally and then loop until maxBytes; consider merging these into a single loop that accumulates bytes to simplify the logic (see the sketch after this list).
  • The new persistentShuffleBatch test still has a commented-out ASSERT_EQ(numBatches, numRows); either remove that dead code or update the assertion to reflect the intended behavior.
  • ShuffleExchangeSource::request ignores the maxWait parameter—either implement a wait/backoff for maxWait or remove the unused argument to avoid confusion.
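
For the first bullet, the merged loop could look like this sketch (it assumes the test reader queues pending payloads in a deque; the real TestShuffleReader internals may differ):

```cpp
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Single accumulation loop: always emit at least one batch, then keep
// emitting while the byte budget allows.
std::vector<std::string> next(std::deque<std::string>& pending, uint64_t maxBytes) {
  std::vector<std::string> result;
  uint64_t totalBytes{0};
  while (!pending.empty()) {
    const uint64_t batchBytes = pending.front().size();
    if (!result.empty() && totalBytes + batchBytes > maxBytes) {
      break;
    }
    result.push_back(std::move(pending.front()));
    pending.pop_front();
    totalBytes += batchBytes;
  }
  return result;
}
```
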
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In TestShuffleReader::next you currently push one batch unconditionally then loop until maxBytes; consider merging these into a single loop that accumulates bytes to simplify the logic.
- The new persistentShuffleBatch test still has a commented-out ASSERT_EQ(numBatches, numRows); either remove that dead code or update the assertion to reflect the intended behavior.
- ShuffleExchangeSource::request ignores the maxWait parameter—either implement a wait/backoff for maxWait or remove the unused argument to avoid confusion.

## Individual Comments

### Comment 1
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:175` </location>
<code_context>
-  batches.reserve(numBatches);
+  uint64_t totalBytes{0};
+  // Read files until we reach maxBytes limit or run out of files.
+  while (readPartitionFileIndex_ < readPartitionFiles_.size()) {
+    const auto filename = readPartitionFiles_[readPartitionFileIndex_];
+    auto file = fileSystem_->openFileForRead(filename);
</code_context>

<issue_to_address>
**suggestion:** Consider handling the case where a single file exceeds maxBytes.

If a partition file exceeds maxBytes and batches is empty, the code will load the entire file, potentially causing high memory usage. Consider splitting large files or enforcing a stricter limit.

Suggested implementation:

```cpp
  while (readPartitionFileIndex_ < readPartitionFiles_.size()) {
    const auto filename = readPartitionFiles_[readPartitionFileIndex_];
    auto file = fileSystem_->openFileForRead(filename);
    const auto fileSize = file->size();

    // If the first file alone exceeds maxBytes and no batches have been
    // read yet, read only maxBytes of it to bound memory usage.
    if (fileSize > maxBytes && batches.empty()) {
      auto batch = std::make_unique<ReadBatch>();
      batch->readFromFile(file, /*offset=*/0, /*length=*/maxBytes);
      batches.push_back(std::move(batch));
      totalBytes += maxBytes;
      // Record the offset so the next call resumes this file. Since
      // fileSize > maxBytes in this branch, the file is never fully
      // consumed here, so do not advance the file index.
      readPartitionFileOffsets_[readPartitionFileIndex_] = maxBytes;
      break;
    }

    // Stop if adding this file would exceed maxBytes (unless we haven't read
    // any files yet)
    if (!batches.empty() && totalBytes + fileSize > maxBytes) {
      break;
    }

```

1. You will need to add and manage a `readPartitionFileOffsets_` vector or map to track the current read offset for each file, so that subsequent calls to `next()` can continue reading from where the previous call left off.
2. Implement the `ReadBatch::readFromFile(file, offset, length)` method if it does not already exist, or adapt the batch reading logic to support partial reads.
3. Ensure that the logic for advancing `readPartitionFileIndex_` and handling offsets is consistent throughout the function.
</issue_to_address>
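
For note 1 above, the offset bookkeeping can be isolated into a small planning step; a self-contained sketch (ReadState, planReads, and the chunk tuple are hypothetical, not from the PR):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <tuple>
#include <vector>

// Tracks which file and intra-file offset the next call resumes from.
struct ReadState {
  size_t fileIndex{0};
  uint64_t offset{0};
};

// Returns (fileIndex, offset, length) chunks for one next() call, reading
// at most maxBytes in total and resuming partially read files.
std::vector<std::tuple<size_t, uint64_t, uint64_t>> planReads(
    const std::vector<uint64_t>& fileSizes,
    ReadState& state,
    uint64_t maxBytes) {
  std::vector<std::tuple<size_t, uint64_t, uint64_t>> chunks;
  uint64_t totalBytes{0};
  while (state.fileIndex < fileSizes.size() && totalBytes < maxBytes) {
    const uint64_t remaining = fileSizes[state.fileIndex] - state.offset;
    const uint64_t toRead = std::min(remaining, maxBytes - totalBytes);
    chunks.emplace_back(state.fileIndex, state.offset, toRead);
    totalBytes += toRead;
    state.offset += toRead;
    if (state.offset == fileSizes[state.fileIndex]) {
      ++state.fileIndex; // file fully consumed
      state.offset = 0;
    }
  }
  return chunks;
}
```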

### Comment 2
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1213-1210` </location>
<code_context>
+TEST_F(ShuffleTest, persistentShuffleBatch) {
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding tests for error conditions and edge cases in persistentShuffleBatch.

Please add tests for maxBytes = 0, negative, and very large values, as well as cases with zero rows or empty partitions, to improve coverage of edge conditions.

Suggested implementation:

```cpp
TEST_F(ShuffleTest, persistentShuffleBatch) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();

  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;

  const size_t numRows{20};
  std::vector<std::string> values(numRows);
  std::vector<std::string_view> views(numRows);
  const size_t rowSize{64};

  // Normal case (already present)
  // ... existing test logic ...

}

TEST_F(ShuffleTest, persistentShuffleBatch_maxBytesZero) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();

  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;
  const size_t numRows{10};
  std::vector<std::string> values(numRows, "test");
  std::vector<std::string_view> views(numRows, "test");
  const size_t rowSize{8};
  const int64_t maxBytes = 0;

  // Call persistentShuffleBatch with maxBytes = 0 and verify behavior
  ASSERT_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes),
      std::invalid_argument);
}

TEST_F(ShuffleTest, persistentShuffleBatch_maxBytesNegative) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();

  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;
  const size_t numRows{10};
  std::vector<std::string> values(numRows, "test");
  std::vector<std::string_view> views(numRows, "test");
  const size_t rowSize{8};
  const int64_t maxBytes = -100;

  // Call persistentShuffleBatch with maxBytes < 0 and verify behavior
  ASSERT_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes),
      std::invalid_argument);
}

TEST_F(ShuffleTest, persistentShuffleBatch_maxBytesVeryLarge) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();

  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;
  const size_t numRows{10};
  std::vector<std::string> values(numRows, "test");
  std::vector<std::string_view> views(numRows, "test");
  const size_t rowSize{8};
  const int64_t maxBytes = std::numeric_limits<int64_t>::max();

  // Call persistentShuffleBatch with very large maxBytes and verify no error
  ASSERT_NO_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes));
}

TEST_F(ShuffleTest, persistentShuffleBatch_zeroRows) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();

  const uint32_t numPartitions = 1;
  const uint32_t partition = 0;
  const size_t numRows{0};
  std::vector<std::string> values;
  std::vector<std::string_view> views;
  const size_t rowSize{8};
  const int64_t maxBytes = 1024;

  // Call persistentShuffleBatch with zero rows and verify no error
  ASSERT_NO_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes));
}

TEST_F(ShuffleTest, persistentShuffleBatch_emptyPartitions) {
  auto rootDirectory = velox::exec::test::TempDirectoryPath::create();
  auto rootPath = rootDirectory->getPath();

  const uint32_t numPartitions = 0;
  const uint32_t partition = 0;
  const size_t numRows{10};
  std::vector<std::string> values(numRows, "test");
  std::vector<std::string_view> views(numRows, "test");
  const size_t rowSize{8};
  const int64_t maxBytes = 1024;

  // Call persistentShuffleBatch with zero partitions and verify error or no-op
  ASSERT_THROW(
      persistentShuffleBatch(rootPath, partition, views, rowSize, maxBytes),
      std::invalid_argument);
}

```

- You may need to adjust the exception type (`std::invalid_argument`) to match the actual error thrown by `persistentShuffleBatch` in your implementation.
- If `persistentShuffleBatch` does not currently throw for these error cases, you should update its implementation to do so.
- If your test framework uses a different macro for exception checking, replace `ASSERT_THROW` and `ASSERT_NO_THROW` accordingly.
- If the function signature for `persistentShuffleBatch` is different, adjust the arguments as needed.
</issue_to_address>

### Comment 3
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1289-1292` </location>
<code_context>
+
+    reader->noMoreData(true);
+
+    // Verify we read all rows.
+    ASSERT_EQ(totalRows, numRows);
+    // ASSERT_EQ(numBatches, numRows);
+    //  Verify number of output batches.
+    ASSERT_EQ(numOutputCalls, testData.expectedOutputCalls);
+  }
</code_context>

<issue_to_address>
**suggestion (testing):** Test assertions could be more comprehensive.

Consider adding assertions to verify that batch sizes do not exceed maxBytes and that the row contents match the expected data. This will better validate the batching logic.

Suggested implementation:

```cpp
    reader->noMoreData(true);

    // Verify we read all rows.
    ASSERT_EQ(totalRows, numRows);
    // ASSERT_EQ(numBatches, numRows);
    //  Verify number of output batches.
    ASSERT_EQ(numOutputCalls, testData.expectedOutputCalls);

    // Additional assertions for batch size and row contents.
    for (const auto& batch : batches) {
      // Assert batch size does not exceed maxBytes.
      ASSERT_LE(batch->data->size(), maxBytes);

      // Assert row contents match expected data.
      for (size_t i = 0; i < batch->rows.size(); ++i) {
        ASSERT_EQ(batch->rows[i], testData.expectedRows[batch->rowIndices[i]]);
      }
    }
  }

```

- Ensure that `testData.expectedRows` and `batch->rowIndices` are available and correctly populated in your test setup. If `rowIndices` is not present, you may need to adjust the row content assertion to match your data structure.
- If batches is not in scope at this point, you may need to collect all batches in a vector during the batching loop for these assertions.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

…stodb#26476)

Summary: Pull Request resolved: prestodb#26476

Differential Revision: D85830101
@xiaoxmeng changed the title from "[sv-cosco]perf: Support fetch multiple blocks based on max bytes" to "perf: Support fetch multiple blocks based on max bytes" on Oct 30, 2025

@tanjialiang left a comment


Thanks LGTM

@xiaoxmeng merged commit edaed38 into prestodb:master on Oct 30, 2025
82 of 85 checks passed