Skip to content

Commit d22bdc7

Browse files
duxiao1212facebook-github-bot
authored andcommitted
misc: Refactor ShuffleTest (prestodb#26643)
Summary: 1. Refactored test infrastructure - Removed TestShuffleWriter/TestShuffleReader and migrated tests to use LocalShuffle exclusively 2. Reorganized test code - Consolidated helper functions and restructured test cases (move the PartitionAndSerlized tests above the shuffleEndToEnd) 3. Added serialize() methods - Added serialization methods to complement localShuffleInfo 4. Added test injection points - Added TestValue::adjust calls in LocalShuffleWriter::collect and LocalShuffleReader::next Differential Revision: D87160791
1 parent c07cdb5 commit d22bdc7

File tree

3 files changed

+883
-1056
lines changed

3 files changed

+883
-1056
lines changed

presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,16 @@ inline std::string createShuffleFileName(
212212
}
213213
} // namespace
214214

215+
std::string LocalShuffleWriteInfo::serialize() const {
216+
json obj;
217+
obj["rootPath"] = rootPath;
218+
obj["queryId"] = queryId;
219+
obj["shuffleId"] = shuffleId;
220+
obj["numPartitions"] = numPartitions;
221+
obj["sortedShuffle"] = sortedShuffle;
222+
return obj.dump();
223+
}
224+
215225
LocalShuffleWriteInfo LocalShuffleWriteInfo::deserialize(
216226
const std::string& info) {
217227
const auto jsonReadInfo = json::parse(info);
@@ -224,6 +234,15 @@ LocalShuffleWriteInfo LocalShuffleWriteInfo::deserialize(
224234
return shuffleInfo;
225235
}
226236

237+
std::string LocalShuffleReadInfo::serialize() const {
238+
json obj;
239+
obj["rootPath"] = rootPath;
240+
obj["queryId"] = queryId;
241+
obj["partitionIds"] = partitionIds;
242+
obj["sortedShuffle"] = sortedShuffle;
243+
return obj.dump();
244+
}
245+
227246
LocalShuffleReadInfo LocalShuffleReadInfo::deserialize(
228247
const std::string& info) {
229248
const auto jsonReadInfo = json::parse(info);
@@ -351,6 +370,8 @@ void LocalShuffleWriter::collect(
351370
sortedShuffle_ || key.empty(),
352371
"key '{}' must be empty for non-sorted shuffle",
353372
key);
373+
velox::common::testutil::TestValue::adjust(
374+
"facebook::presto::operators::LocalShuffleWriter::collect", this);
354375

355376
const auto rowSize = this->rowSize(key.size(), data.size());
356377
auto& buffer = inProgressPartitions_[partition];
@@ -517,6 +538,8 @@ LocalShuffleReader::next(uint64_t maxBytes) {
517538
VELOX_CHECK(
518539
initialized_,
519540
"LocalShuffleReader::initialize() must be called before next()");
541+
velox::common::testutil::TestValue::adjust(
542+
"facebook::presto::operators::LocalShuffleReader::next", this);
520543

521544
return folly::makeSemiFuture(
522545
sortedShuffle_ ? nextSorted(maxBytes) : nextUnsorted(maxBytes));
@@ -593,13 +616,4 @@ std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter(
593616
writeInfo.sortedShuffle,
594617
pool);
595618
}
596-
597-
// Testing function to expose extractRowMetadata for tests.
598-
// This will be removed after reader changes.
599-
std::vector<RowMetadata> testingExtractRowMetadata(
600-
const char* buffer,
601-
size_t bufferSize,
602-
bool sortedShuffle) {
603-
return extractRowMetadata(buffer, bufferSize, sortedShuffle);
604-
}
605619
} // namespace facebook::presto::operators

presto-native-execution/presto_cpp/main/operators/LocalShuffle.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,6 @@ inline bool compareKeys(std::string_view key1, std::string_view key2) noexcept {
4848
return key1.size() < key2.size();
4949
}
5050

51-
// Testing function to expose extractRowMetadata for tests.
52-
std::vector<RowMetadata> testingExtractRowMetadata(
53-
const char* buffer,
54-
size_t bufferSize,
55-
bool sortedShuffle);
56-
5751
// LocalShuffleWriteInfo is used for containing shuffle write information.
5852
// This struct is a 1:1 strict API mapping to
5953
// presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkLocalShuffleWriteInfo.java
@@ -67,11 +61,13 @@ struct LocalShuffleWriteInfo {
6761
uint32_t shuffleId;
6862
bool sortedShuffle;
6963

64+
/// Serializes shuffle information to JSON format.
65+
std::string serialize() const;
66+
7067
/// Deserializes shuffle information that is used by LocalPersistentShuffle.
7168
/// Structures are assumed to be encoded in JSON format.
7269
static LocalShuffleWriteInfo deserialize(const std::string& info);
7370
};
74-
7571
// LocalShuffleReadInfo is used for containing shuffle read metadata
7672
// This struct is a 1:1 strict API mapping to
7773
// presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkLocalShuffleReadInfo.java.
@@ -84,8 +80,9 @@ struct LocalShuffleReadInfo {
8480
std::vector<std::string> partitionIds;
8581
bool sortedShuffle;
8682

87-
/// Deserializes shuffle information that is used by LocalPersistentShuffle.
88-
/// Structures are assumed to be encoded in JSON format.
83+
/// Serializes shuffle information to JSON format.
84+
std::string serialize() const;
85+
8986
static LocalShuffleReadInfo deserialize(const std::string& info);
9087
};
9188

0 commit comments

Comments
 (0)