Skip to content

Commit 7946693

Browse files
duxiao1212facebook-github-bot
authored andcommitted
misc: Refactor ShuffleTest (prestodb#26643)
Summary: Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class Differential Revision: D87160791
1 parent 5439413 commit 7946693

File tree

3 files changed

+875
-1034
lines changed

3 files changed

+875
-1034
lines changed

presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,16 @@ inline std::string createShuffleFileName(
218218
}
219219
} // namespace
220220

221+
std::string LocalShuffleWriteInfo::serialize() const {
222+
json obj;
223+
obj["rootPath"] = rootPath;
224+
obj["queryId"] = queryId;
225+
obj["shuffleId"] = shuffleId;
226+
obj["numPartitions"] = numPartitions;
227+
obj["sortedShuffle"] = sortedShuffle;
228+
return obj.dump();
229+
}
230+
221231
LocalShuffleWriteInfo LocalShuffleWriteInfo::deserialize(
222232
const std::string& info) {
223233
const auto jsonReadInfo = json::parse(info);
@@ -230,6 +240,15 @@ LocalShuffleWriteInfo LocalShuffleWriteInfo::deserialize(
230240
return shuffleInfo;
231241
}
232242

243+
std::string LocalShuffleReadInfo::serialize() const {
244+
json obj;
245+
obj["rootPath"] = rootPath;
246+
obj["queryId"] = queryId;
247+
obj["partitionIds"] = partitionIds;
248+
obj["sortedShuffle"] = sortedShuffle;
249+
return obj.dump();
250+
}
251+
233252
LocalShuffleReadInfo LocalShuffleReadInfo::deserialize(
234253
const std::string& info) {
235254
const auto jsonReadInfo = json::parse(info);
@@ -357,6 +376,8 @@ void LocalShuffleWriter::collect(
357376
sortedShuffle_ || key.empty(),
358377
"key '{}' must be empty for non-sorted shuffle",
359378
key);
379+
velox::common::testutil::TestValue::adjust(
380+
"facebook::presto::operators::LocalShuffleWriter::collect", this);
360381

361382
const auto rowSize = this->rowSize(key.size(), data.size());
362383
auto& buffer = inProgressPartitions_[partition];
@@ -521,6 +542,8 @@ LocalShuffleReader::next(uint64_t maxBytes) {
521542
VELOX_CHECK(
522543
initialized_,
523544
"LocalShuffleReader::initialize() must be called before next()");
545+
velox::common::testutil::TestValue::adjust(
546+
"facebook::presto::operators::LocalShuffleReader::next", this);
524547

525548
return folly::makeSemiFuture(
526549
sortedShuffle_ ? nextSorted(maxBytes) : nextUnsorted(maxBytes));
@@ -597,13 +620,4 @@ std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter(
597620
writeInfo.sortedShuffle,
598621
pool);
599622
}
600-
601-
// Testing function to expose extractRowMetadata for tests.
602-
// This will be removed after reader changes.
603-
std::vector<RowMetadata> testingExtractRowMetadata(
604-
const char* buffer,
605-
size_t bufferSize,
606-
bool sortedShuffle) {
607-
return extractRowMetadata(buffer, bufferSize, sortedShuffle);
608-
}
609623
} // namespace facebook::presto::operators

presto-native-execution/presto_cpp/main/operators/LocalShuffle.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,6 @@ inline bool compareKeys(std::string_view key1, std::string_view key2) noexcept {
5353
return key1.size() < key2.size();
5454
}
5555

56-
// Testing function to expose extractRowMetadata for tests.
57-
std::vector<RowMetadata> testingExtractRowMetadata(
58-
const char* buffer,
59-
size_t bufferSize,
60-
bool sortedShuffle);
61-
6256
// LocalShuffleWriteInfo is used for containing shuffle write information.
6357
// This struct is a 1:1 strict API mapping to
6458
// presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkLocalShuffleWriteInfo.java
@@ -72,11 +66,13 @@ struct LocalShuffleWriteInfo {
7266
uint32_t shuffleId;
7367
bool sortedShuffle;
7468

69+
/// Serializes shuffle information to JSON format.
70+
std::string serialize() const;
71+
7572
/// Deserializes shuffle information that is used by LocalPersistentShuffle.
7673
/// Structures are assumed to be encoded in JSON format.
7774
static LocalShuffleWriteInfo deserialize(const std::string& info);
7875
};
79-
8076
// LocalShuffleReadInfo is used for containing shuffle read metadata
8177
// This struct is a 1:1 strict API mapping to
8278
// presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkLocalShuffleReadInfo.java.
@@ -89,8 +85,9 @@ struct LocalShuffleReadInfo {
8985
std::vector<std::string> partitionIds;
9086
bool sortedShuffle;
9187

92-
/// Deserializes shuffle information that is used by LocalPersistentShuffle.
93-
/// Structures are assumed to be encoded in JSON format.
88+
/// Serializes shuffle information to JSON format.
89+
std::string serialize() const;
90+
9491
static LocalShuffleReadInfo deserialize(const std::string& info);
9592
};
9693

0 commit comments

Comments
 (0)