Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ inline std::string createShuffleFileName(
}
} // namespace

std::string LocalShuffleWriteInfo::serialize() const {
json obj;
obj["rootPath"] = rootPath;
obj["queryId"] = queryId;
obj["shuffleId"] = shuffleId;
obj["numPartitions"] = numPartitions;
obj["sortedShuffle"] = sortedShuffle;
return obj.dump();
}

LocalShuffleWriteInfo LocalShuffleWriteInfo::deserialize(
const std::string& info) {
const auto jsonReadInfo = json::parse(info);
Expand All @@ -224,6 +234,15 @@ LocalShuffleWriteInfo LocalShuffleWriteInfo::deserialize(
return shuffleInfo;
}

std::string LocalShuffleReadInfo::serialize() const {
json obj;
obj["rootPath"] = rootPath;
obj["queryId"] = queryId;
obj["partitionIds"] = partitionIds;
obj["sortedShuffle"] = sortedShuffle;
return obj.dump();
}

LocalShuffleReadInfo LocalShuffleReadInfo::deserialize(
const std::string& info) {
const auto jsonReadInfo = json::parse(info);
Expand Down Expand Up @@ -351,6 +370,8 @@ void LocalShuffleWriter::collect(
sortedShuffle_ || key.empty(),
"key '{}' must be empty for non-sorted shuffle",
key);
velox::common::testutil::TestValue::adjust(
"facebook::presto::operators::LocalShuffleWriter::collect", this);

const auto rowSize = this->rowSize(key.size(), data.size());
auto& buffer = inProgressPartitions_[partition];
Expand Down Expand Up @@ -517,6 +538,8 @@ LocalShuffleReader::next(uint64_t maxBytes) {
VELOX_CHECK(
initialized_,
"LocalShuffleReader::initialize() must be called before next()");
velox::common::testutil::TestValue::adjust(
"facebook::presto::operators::LocalShuffleReader::next", this);

return folly::makeSemiFuture(
sortedShuffle_ ? nextSorted(maxBytes) : nextUnsorted(maxBytes));
Expand Down Expand Up @@ -593,13 +616,4 @@ std::shared_ptr<ShuffleWriter> LocalPersistentShuffleFactory::createWriter(
writeInfo.sortedShuffle,
pool);
}

// Testing function to expose extractRowMetadata for tests.
// This will be removed after reader changes.
std::vector<RowMetadata> testingExtractRowMetadata(
const char* buffer,
size_t bufferSize,
bool sortedShuffle) {
return extractRowMetadata(buffer, bufferSize, sortedShuffle);
}
} // namespace facebook::presto::operators
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ inline bool compareKeys(std::string_view key1, std::string_view key2) noexcept {
return key1.size() < key2.size();
}

// Testing function to expose extractRowMetadata for tests.
std::vector<RowMetadata> testingExtractRowMetadata(
const char* buffer,
size_t bufferSize,
bool sortedShuffle);

// LocalShuffleWriteInfo is used for containing shuffle write information.
// This struct is a 1:1 strict API mapping to
// presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkLocalShuffleWriteInfo.java
Expand All @@ -67,11 +61,13 @@ struct LocalShuffleWriteInfo {
uint32_t shuffleId;
bool sortedShuffle;

/// Serializes shuffle information to JSON format.
std::string serialize() const;

/// Deserializes shuffle information that is used by LocalPersistentShuffle.
/// Structures are assumed to be encoded in JSON format.
static LocalShuffleWriteInfo deserialize(const std::string& info);
};

// LocalShuffleReadInfo is used for containing shuffle read metadata
// This struct is a 1:1 strict API mapping to
// presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkLocalShuffleReadInfo.java.
Expand All @@ -84,8 +80,9 @@ struct LocalShuffleReadInfo {
std::vector<std::string> partitionIds;
bool sortedShuffle;

/// Deserializes shuffle information that is used by LocalPersistentShuffle.
/// Structures are assumed to be encoded in JSON format.
/// Serializes shuffle information to JSON format.
std::string serialize() const;

static LocalShuffleReadInfo deserialize(const std::string& info);
};

Expand Down
Loading
Loading