diff --git a/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp index 05b79a00799b..16a8a972d785 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp +++ b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp @@ -212,6 +212,16 @@ inline std::string createShuffleFileName( } } // namespace +std::string LocalShuffleWriteInfo::serialize() const { + json obj; + obj["rootPath"] = rootPath; + obj["queryId"] = queryId; + obj["shuffleId"] = shuffleId; + obj["numPartitions"] = numPartitions; + obj["sortedShuffle"] = sortedShuffle; + return obj.dump(); +} + LocalShuffleWriteInfo LocalShuffleWriteInfo::deserialize( const std::string& info) { const auto jsonReadInfo = json::parse(info); @@ -224,6 +234,15 @@ LocalShuffleWriteInfo LocalShuffleWriteInfo::deserialize( return shuffleInfo; } +std::string LocalShuffleReadInfo::serialize() const { + json obj; + obj["rootPath"] = rootPath; + obj["queryId"] = queryId; + obj["partitionIds"] = partitionIds; + obj["sortedShuffle"] = sortedShuffle; + return obj.dump(); +} + LocalShuffleReadInfo LocalShuffleReadInfo::deserialize( const std::string& info) { const auto jsonReadInfo = json::parse(info); @@ -351,6 +370,8 @@ void LocalShuffleWriter::collect( sortedShuffle_ || key.empty(), "key '{}' must be empty for non-sorted shuffle", key); + velox::common::testutil::TestValue::adjust( + "facebook::presto::operators::LocalShuffleWriter::collect", this); const auto rowSize = this->rowSize(key.size(), data.size()); auto& buffer = inProgressPartitions_[partition]; @@ -517,6 +538,8 @@ LocalShuffleReader::next(uint64_t maxBytes) { VELOX_CHECK( initialized_, "LocalShuffleReader::initialize() must be called before next()"); + velox::common::testutil::TestValue::adjust( + "facebook::presto::operators::LocalShuffleReader::next", this); return folly::makeSemiFuture( sortedShuffle_ ? nextSorted(maxBytes) : nextUnsorted(maxBytes)); @@ -593,13 +616,4 @@ std::shared_ptr LocalPersistentShuffleFactory::createWriter( writeInfo.sortedShuffle, pool); } - -// Testing function to expose extractRowMetadata for tests. -// This will be removed after reader changes. -std::vector testingExtractRowMetadata( - const char* buffer, - size_t bufferSize, - bool sortedShuffle) { - return extractRowMetadata(buffer, bufferSize, sortedShuffle); -} } // namespace facebook::presto::operators diff --git a/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h index 30bc1bd14a65..8d362c8d8c77 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h +++ b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h @@ -48,12 +48,6 @@ inline bool compareKeys(std::string_view key1, std::string_view key2) noexcept { return key1.size() < key2.size(); } -// Testing function to expose extractRowMetadata for tests. -std::vector testingExtractRowMetadata( - const char* buffer, - size_t bufferSize, - bool sortedShuffle); - // LocalShuffleWriteInfo is used for containing shuffle write information. // This struct is a 1:1 strict API mapping to // presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkLocalShuffleWriteInfo.java @@ -67,11 +61,13 @@ struct LocalShuffleWriteInfo { uint32_t shuffleId; bool sortedShuffle; + /// Serializes shuffle information to JSON format. + std::string serialize() const; + /// Deserializes shuffle information that is used by LocalPersistentShuffle. /// Structures are assumed to be encoded in JSON format. static LocalShuffleWriteInfo deserialize(const std::string& info); }; - // LocalShuffleReadInfo is used for containing shuffle read metadata // This struct is a 1:1 strict API mapping to // presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkLocalShuffleReadInfo.java. @@ -84,8 +80,9 @@ struct LocalShuffleReadInfo { std::vector partitionIds; bool sortedShuffle; - /// Deserializes shuffle information that is used by LocalPersistentShuffle. - /// Structures are assumed to be encoded in JSON format. + /// Serializes shuffle information to JSON format. + std::string serialize() const; + static LocalShuffleReadInfo deserialize(const std::string& info); }; diff --git a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp index 02fa24e6deb4..9ea336bd08c1 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp @@ -14,6 +14,10 @@ #include #include "folly/init/Init.h" +#include +#include +#include + #include "presto_cpp/external/json/nlohmann/json.hpp" #include "presto_cpp/main/operators/LocalShuffle.h" #include "presto_cpp/main/operators/PartitionAndSerialize.h" @@ -21,6 +25,7 @@ #include "presto_cpp/main/operators/ShuffleRead.h" #include "presto_cpp/main/operators/ShuffleWrite.h" #include "presto_cpp/main/operators/tests/PlanBuilder.h" + #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/testutil/TestValue.h" #include "velox/exec/Exchange.h" @@ -39,10 +44,51 @@ using namespace facebook::presto::operators; using namespace ::testing; namespace facebook::presto::operators::test { - namespace { -static const uint64_t kFakeBackgroundCpuTimeNanos = 123000000; +std::string makeTaskId( + const std::string& prefix, + int num, + const std::string& shuffleInfo = "") { + auto url = fmt::format("batch://{}-{}", prefix, num); + if (shuffleInfo.empty()) { + return url; + } + return url + "?shuffleInfo=" + shuffleInfo; +} + +vector_size_t countNulls(const VectorPtr& vector) { + vector_size_t numNulls = 0; + for (auto i = 0; i < vector->size(); ++i) { + if (vector->isNullAt(i)) { + ++numNulls; + } + } + return numNulls; +} + +void cleanupDirectory(const std::string& rootPath) { + auto fileSystem = velox::filesystems::getFileSystem(rootPath, nullptr); + auto files = fileSystem->list(rootPath); + for (auto& file : files) { + fileSystem->remove(file); + } +} + +RowTypePtr createSerdeLayoutType( + const RowTypePtr& originalType, + const std::vector& layout) { + std::vector names; + std::vector types; + for (const auto& name : layout) { + auto idx = originalType->getChildIdx(name); + names.push_back(name); + types.push_back(originalType->childAt(idx)); + } + return ROW(std::move(names), std::move(types)); +} + +using json = nlohmann::json; struct TestShuffleInfo { uint32_t numPartitions; @@ -60,222 +106,14 @@ struct TestShuffleInfo { std::vector getSortOrder(const std::vector& keys) { std::vector indices(keys.size()); - std::iota(indices.begin(), indices.end(), 0); - std::sort(indices.begin(), indices.end(), [&keys](int a, int b) { - return compareKeys(keys[a], keys[b]); - }); + boost::algorithm::iota(indices, 0); + boost::range::sort( + indices, [&keys](int a, int b) { return compareKeys(keys[a], keys[b]); }); return indices; } -class TestShuffleWriter : public ShuffleWriter { - public: - TestShuffleWriter( - memory::MemoryPool* pool, - uint32_t numPartitions, - uint32_t maxBytesPerPartition, - uint32_t maxKeyBytes = 1024) // 1KB - : pool_(pool), - numPartitions_(numPartitions), - maxBytesPerPartition_(maxBytesPerPartition), - maxKeyBytes_(maxKeyBytes), - inProgressSizes_(numPartitions, 0), - readyPartitions_( - std::make_shared< - std::vector>>>()), - serializedSortKeys_( - std::make_shared>>()) { - inProgressPartitions_.resize(numPartitions_); - readyPartitions_->resize(numPartitions_); - serializedSortKeys_->resize(numPartitions_); - } - - void initialize(velox::memory::MemoryPool* pool) { - if (pool_ == nullptr) { - pool_ = pool; - } - } - - void collect(int32_t partition, std::string_view key, std::string_view data) - override { - TestValue::adjust( - "facebook::presto::operators::test::TestShuffleWriter::collect", this); - - auto& readBatch = inProgressPartitions_[partition]; - TRowSize rowSize = data.size(); - auto size = sizeof(TRowSize) + rowSize; - - // Check if there is enough space in the buffer. - if (readBatch && - inProgressSizes_[partition] + size >= maxBytesPerPartition_) { - readBatch->data->setSize(inProgressSizes_[partition]); - (*readyPartitions_)[partition].emplace_back(std::move(readBatch)); - inProgressPartitions_[partition].reset(); - } - - // Allocate buffer if needed. - if (readBatch == nullptr) { - auto buffer = AlignedBuffer::allocate(maxBytesPerPartition_, pool_); - VELOX_CHECK_NOT_NULL(buffer); - readBatch = std::make_unique( - std::vector{}, std::move(buffer)); - inProgressPartitions_[partition] = std::move(readBatch); - inProgressSizes_[partition] = 0; - } - - // Copy data. - auto offset = inProgressSizes_[partition]; - auto* rawBuffer = readBatch->data->asMutable() + offset; - - *(TRowSize*)(rawBuffer) = folly::Endian::big(rowSize); - ::memcpy(rawBuffer + sizeof(TRowSize), data.data(), rowSize); - readBatch->rows.push_back( - std::string_view(rawBuffer + sizeof(TRowSize), rowSize)); - inProgressSizes_[partition] += size; - - if (!key.empty()) { - serializedSortKeys_->at(partition).emplace_back(key); - } - } - - void noMoreData(bool success) override { - VELOX_CHECK(success, "Unexpected error"); - // Flush in-progress buffers. - for (auto i = 0; i < numPartitions_; ++i) { - if (inProgressSizes_[i] > 0) { - auto& readBatch = inProgressPartitions_[i]; - readBatch->data->setSize(inProgressSizes_[i]); - (*readyPartitions_)[i].emplace_back(std::move(readBatch)); - inProgressPartitions_[i].reset(); - } - } - } - - folly::F14FastMap stats() const override { - return { - {"test-shuffle.write", 1002}, - {exec::Operator::kBackgroundCpuTimeNanos, kFakeBackgroundCpuTimeNanos}}; - } - - std::shared_ptr>>>& - readyPartitions() { - return readyPartitions_; - } - - std::shared_ptr>>& serializedSortKeys() { - return serializedSortKeys_; - } - - static void reset() { - getInstance().reset(); - } - - /// Maintains a single shuffle write interface for testing purpose. - static std::shared_ptr& getInstance() { - static std::shared_ptr instance_; - return instance_; - } - - static std::shared_ptr createWriter( - const std::string& serializedShuffleInfo, - velox::memory::MemoryPool* pool) { - std::shared_ptr& instance = getInstance(); - if (instance) { - return instance; - } - TestShuffleInfo writeInfo = - TestShuffleInfo::deserializeShuffleInfo(serializedShuffleInfo); - // We need only one instance for the shuffle since it's in-memory. - instance = std::make_shared( - pool, writeInfo.numPartitions, writeInfo.maxBytesPerPartition); - return instance; - } - - private: - memory::MemoryPool* pool_{nullptr}; - const uint32_t numPartitions_; - const uint32_t maxBytesPerPartition_; - const uint32_t maxKeyBytes_; - - /// Indexed by partition number. Each element represents currently being - /// accumulated buffer by shuffler for a certain partition. Internal layout: - /// | row-size | ..row-payload.. | row-size | ..row-payload.. | .. - std::vector> inProgressPartitions_; - - /// Tracks the total size of each in-progress partition in - /// inProgressPartitions_ - std::vector inProgressSizes_; - std::shared_ptr>>> - readyPartitions_; - std::shared_ptr>> serializedSortKeys_; -}; - -class TestShuffleReader : public ShuffleReader { - public: - TestShuffleReader( - const int32_t partition, - const std::shared_ptr< - std::vector>>>& - readyPartitions) - : partition_(partition), readyPartitions_(readyPartitions) {} - - folly::SemiFuture>> next( - uint64_t maxBytes) override { - VELOX_CHECK_GT(maxBytes, 0, "maxBytes must be greater than 0"); - TestValue::adjust( - "facebook::presto::operators::test::TestShuffleReader::next", this); - std::vector> result; - auto& partitionBatches = (*readyPartitions_)[partition_]; - - if (!partitionBatches.empty()) { - result.push_back(std::move(partitionBatches.back())); - partitionBatches.pop_back(); - } - - for (size_t totalBytes = 0; - totalBytes < maxBytes && !partitionBatches.empty();) { - result.push_back(std::move(partitionBatches.back())); - totalBytes += result.back()->data->size(); - partitionBatches.pop_back(); - } - - return folly::makeSemiFuture(std::move(result)); - } - - void noMoreData(bool success) override { - VELOX_CHECK(success, "Unexpected error"); - } - - folly::F14FastMap stats() const override { - return { - {"test-shuffle.read", 1032}, - {exec::Operator::kBackgroundCpuTimeNanos, kFakeBackgroundCpuTimeNanos}}; - } - - private: - const int32_t partition_; - const std::shared_ptr>>>& - readyPartitions_; -}; - -class TestShuffleFactory : public ShuffleInterfaceFactory { - public: - static constexpr std::string_view kShuffleName = "test-shuffle"; - - std::shared_ptr createReader( - const std::string& /* serializedShuffleInfo */, - const int partition, - velox::memory::MemoryPool* pool) override { - return std::make_shared( - partition, TestShuffleWriter::getInstance()->readyPartitions()); - } - - std::shared_ptr createWriter( - const std::string& serializedShuffleInfo, - velox::memory::MemoryPool* pool) override { - return TestShuffleWriter::createWriter(serializedShuffleInfo, pool); - } -}; - +// Register an ExchangeSource factory that creates ShuffleExchangeSource +// instances. void registerExchangeSource(const std::string& shuffleName) { exec::ExchangeSource::factories().clear(); exec::ExchangeSource::registerFactory( @@ -284,99 +122,84 @@ void registerExchangeSource(const std::string& shuffleName) { int destination, const std::shared_ptr& queue, memory::MemoryPool* pool) -> std::shared_ptr { - if (strncmp(taskId.c_str(), "batch://", 8) == 0) { - auto uri = folly::Uri(taskId); - for (auto& pair : uri.getQueryParams()) { - if (pair.first == "shuffleInfo") { - return std::make_shared( - taskId, - destination, - queue, - ShuffleInterfaceFactory::factory(shuffleName) - ->createReader(pair.second, destination, pool), - pool); - } - } - VELOX_USER_FAIL( - "No shuffle read info provided in taskId. taskId: {}", taskId); + if (!taskId.starts_with("batch://")) { + return nullptr; } - return nullptr; + auto uri = folly::Uri(taskId); + auto queryParams = uri.getQueryParams(); + auto it = boost::range::find_if(queryParams, [](const auto& pair) { + return pair.first == "shuffleInfo"; + }); + EXPECT_NE(it, queryParams.end()) + << "No shuffle read info provided in taskId: " << taskId; + + // Create ShuffleExchangeSource with the shuffle reader + return std::make_shared( + taskId, + destination, + queue, + ShuffleInterfaceFactory::factory(shuffleName) + ->createReader(it->second, destination, pool), + pool); }); } -} // namespace -class ShuffleTest : public exec::test::OperatorTestBase { - public: - std::string testShuffleInfo( - uint32_t numPartitions, - uint32_t maxBytesPerPartition) { - static constexpr std::string_view kTemplate = - "{{\n" - " \"numPartitions\": {},\n" - " \"maxBytesPerPartition\": {}\n" - "}}"; - return fmt::format(kTemplate, numPartitions, maxBytesPerPartition); - } +std::string localShuffleWriteInfo( + const std::string& rootPath, + uint32_t numPartitions, + bool sortedShuffle = false) { + return LocalShuffleWriteInfo{ + .rootPath = rootPath, + .queryId = "query_id", + .numPartitions = numPartitions, + .shuffleId = 0, + .sortedShuffle = sortedShuffle} + .serialize(); +} - std::string localShuffleWriteInfo( - const std::string& rootPath, - uint32_t numPartitions) { - static constexpr std::string_view kTemplate = - "{{\n" - " \"rootPath\": \"{}\",\n" - " \"queryId\": \"query_id\",\n" - " \"shuffleId\": 0,\n" - " \"numPartitions\": {}\n" - "}}"; - return fmt::format(kTemplate, rootPath, numPartitions); - } +std::string localShuffleReadInfo( + const std::string& rootPath, + uint32_t partition, + bool sortedShuffle = false) { + return LocalShuffleReadInfo{ + .rootPath = rootPath, + .queryId = "query_id", + .partitionIds = {fmt::format("shuffle_0_0_{}", partition)}, + .sortedShuffle = sortedShuffle} + .serialize(); +} - std::string localShuffleReadInfo( - const std::string& rootPath, - uint32_t numPartitions, - uint32_t partition) { - static constexpr std::string_view kTemplate = - "{{\n" - " \"rootPath\": \"{}\",\n" - " \"queryId\": \"query_id\",\n" - " \"partitionIds\": [ \"shuffle_0_0_{}\" ]\n" - "}}"; - return fmt::format(kTemplate, rootPath, partition, numPartitions); - } +} // namespace +class ShuffleTest : public exec::test::OperatorTestBase { protected: void SetUp() override { exec::test::OperatorTestBase::SetUp(); - velox::filesystems::registerLocalFileSystem(); - ShuffleInterfaceFactory::registerFactory( - std::string(TestShuffleFactory::kShuffleName), - std::make_unique()); - ShuffleInterfaceFactory::registerFactory( - std::string(LocalPersistentShuffleFactory::kShuffleName), - std::make_unique()); + filesystems::registerLocalFileSystem(); + + // Register shuffle operators once exec::Operator::registerOperator( std::make_unique()); exec::Operator::registerOperator( std::make_unique()); + exec::Operator::registerOperator(std::make_unique()); + + shuffleName_ = LocalPersistentShuffleFactory::kShuffleName; + exec::ExchangeSource::factories().clear(); + ShuffleInterfaceFactory::registerFactory( + std::string(shuffleName_), + std::make_unique()); + registerExchangeSource(std::string(shuffleName_)); + // Create a temporary directory for shuffle files + tempDir_ = exec::test::TempDirectoryPath::create(); } void TearDown() override { - TestShuffleWriter::reset(); exec::test::waitForAllTasksToBeDeleted(); + exec::ExchangeSource::factories().clear(); exec::test::OperatorTestBase::TearDown(); } - static std::string makeTaskId( - const std::string& prefix, - int num, - const std::string& shuffleInfo = "") { - auto url = fmt::format("batch://{}-{}", prefix, num); - if (shuffleInfo.empty()) { - return url; - } - return url + "?shuffleInfo=" + shuffleInfo; - } - std::shared_ptr makeTask( const std::string& taskId, core::PlanNodePtr planNode, @@ -393,33 +216,6 @@ class ShuffleTest : public exec::test::OperatorTestBase { exec::Task::ExecutionMode::kParallel); } - RowVectorPtr deserialize( - const RowVectorPtr& serializedResult, - const RowTypePtr& rowType) { - auto serializedData = - serializedResult->childAt(2)->as>(); - auto* rawValues = serializedData->rawValues(); - - std::vector rows; - rows.reserve(serializedData->size()); - for (auto i = 0; i < serializedData->size(); ++i) { - const auto& serializedRow = rawValues[i]; - rows.push_back( - std::string_view(serializedRow.data(), serializedRow.size())); - } - - return std::dynamic_pointer_cast( - row::CompactRow::deserialize(rows, rowType, pool())); - } - - RowVectorPtr copyResultVector(const RowVectorPtr& result) { - auto vector = std::static_pointer_cast( - BaseVector::create(result->type(), result->size(), pool())); - vector->copy(result.get(), 0, 0, result->size()); - VELOX_CHECK_EQ(vector->size(), result->size()); - return vector; - } - void testPartitionAndSerialize( const RowVectorPtr& data, const VectorPtr& expectedReplicate = nullptr) { @@ -440,7 +236,7 @@ class ShuffleTest : public exec::test::OperatorTestBase { } // Verify 'data'. - auto deserialized = deserialize(results, asRowType(data->type())); + auto deserialized = deserializeResult(results, asRowType(data->type())); velox::test::assertEqualVectors(data, deserialized); // Verify 'replicate' flags. @@ -462,7 +258,7 @@ class ShuffleTest : public exec::test::OperatorTestBase { // Verify that serialized data can be deserialized successfully into the // original data. auto deserialized = - deserialize(serializedResult, asRowType(expected->type())); + deserializeResult(serializedResult, asRowType(expected->type())); if (deserialized != nullptr) { result->append(deserialized.get()); } @@ -599,14 +395,84 @@ class ShuffleTest : public exec::test::OperatorTestBase { ASSERT_EQ(1, exchangeStats.count(fmt::format("{}.read", shuffleName))); vector_size_t numResults = 0; + std::vector partitionResults; for (const auto& result : results) { - outputVectors.push_back(copyResultVector(result)); + auto copied = copyResultVector(result); + outputVectors.push_back(copied); + partitionResults.push_back(copied); numResults += result->size(); numNulls[partition] += countNulls(result->childAt(0)); } if (numResults == 0) { emptyPartitions.insert(partition); } + + // Verify ordering for sorted shuffle + if (sortOrders.has_value() && !partitionResults.empty() && + fields.has_value()) { + // Concatenate all result vectors for this partition + auto totalSize = 0; + for (const auto& vec : partitionResults) { + totalSize += vec->size(); + } + auto result = BaseVector::create( + partitionResults[0]->type(), totalSize, pool()); + auto offset = 0; + for (const auto& vec : partitionResults) { + result->copy(vec.get(), offset, 0, vec->size()); + offset += vec->size(); + } + + // Verify that rows are sorted according to sortOrders + const auto& orders = sortOrders.value(); + const auto& sortFields = fields.value(); + + for (vector_size_t row = 1; row < result->size(); ++row) { + for (size_t i = 0; i < orders.size(); ++i) { + const auto& sortOrder = orders[i]; + const auto& field = sortFields[i]; + + // Find the column index for this field + auto fieldName = field->name(); + auto channel = dataType->getChildIdx(fieldName); + + auto column = result->childAt(channel); + auto prevIsNull = column->isNullAt(row - 1); + auto currIsNull = column->isNullAt(row); + + // Handle nulls + if (prevIsNull && currIsNull) { + continue; // Both null, equal, check next key + } + if (prevIsNull) { + // Previous is null, current is not + ASSERT_TRUE(sortOrder.isNullsFirst()) + << "Partition " << partition << ": Null at row " << (row - 1) + << " should come first for field " << fieldName; + break; + } + if (currIsNull) { + // Current is null, previous is not + ASSERT_FALSE(sortOrder.isNullsFirst()) + << "Partition " << partition << ": Null at row " << row + << " should come last for field " << fieldName; + break; + } + + auto cmp = column->compare(column.get(), row - 1, row); + if (cmp == 0) { + continue; + } + + bool inOrder = sortOrder.isAscending() ? (cmp < 0) : (cmp > 0); + ASSERT_TRUE(inOrder) + << "Partition " << partition << ": Row " << row + << " is out of order on sort field " << fieldName + << " (ascending=" << sortOrder.isAscending() << ")"; + break; + } + } + } } if (replicateNullsAndAny) { @@ -623,127 +489,11 @@ class ShuffleTest : public exec::test::OperatorTestBase { ASSERT_TRUE(numNulls.contains(i)); ASSERT_EQ(expectedNullCount, numNulls[i]); } - // TODO: Add assertContainResults for the remaining elements } else { velox::exec::test::assertEqualResults( expectedOutputVectors, outputVectors); } - - auto shuffleWriter = TestShuffleWriter::getInstance(); - if (shuffleWriter) { - const auto serializedSortKeys = shuffleWriter->serializedSortKeys(); - if (sortOrders && fields) { - for (auto i = 0; i < numPartitions; ++i) { - const auto resultSortingOrder = - getSortOrder((*serializedSortKeys)[i]); - EXPECT_EQ(expectedOrdering.value()[i], resultSortingOrder); - } - } else { - for (auto i = 0; i < numPartitions; ++i) { - EXPECT_TRUE((*serializedSortKeys)[i].empty()); - } - } - } else { - // Sorted shuffle is not supported with local shuffle. - EXPECT_FALSE(sortOrders && fields); - } - } - - static vector_size_t countNulls(const VectorPtr& vector) { - vector_size_t numNulls = 0; - for (auto i = 0; i < vector->size(); ++i) { - if (vector->isNullAt(i)) { - ++numNulls; - } - } - return numNulls; - } - - void fuzzerTest(bool replicateNullsAndAny, size_t numPartitions) { - // For unit testing, these numbers are set to relatively small values. - // For stress testing, the following parameters and the fuzzer vector, - // string and container sizes can be bumped up. - size_t numMapDrivers = 1; - size_t numInputVectors = 5; - size_t numIterations = 5; - - // Set up the fuzzer parameters. - VectorFuzzer::Options opts; - opts.vectorSize = 1000; - opts.nullRatio = 0.1; - opts.dictionaryHasNulls = false; - opts.stringVariableLength = true; - - // UnsafeRows use microseconds to store timestamp. - opts.timestampPrecision = - VectorFuzzer::Options::TimestampPrecision::kMicroSeconds; - opts.stringLength = 100; - opts.containerLength = 10; - - // For the time being, we are not including any MAP or more than three level - // nested data structures given the limitations of the fuzzer and - // assertEqualResults: - // Limitations of assertEqualResults: - // https://github.com/facebookincubator/velox/issues/2859 - auto rowType = ROW({ - {"c0", INTEGER()}, - {"c1", TINYINT()}, - {"c2", INTEGER()}, - {"c3", BIGINT()}, - {"c4", INTEGER()}, - {"c5", TIMESTAMP()}, - {"c6", REAL()}, - {"c7", TINYINT()}, - {"c8", DOUBLE()}, - {"c9", VARCHAR()}, - {"c10", ROW({VARCHAR(), INTEGER(), TIMESTAMP()})}, - {"c11", INTEGER()}, - {"c12", REAL()}, - {"c13", ARRAY(INTEGER())}, - {"c14", ARRAY(TINYINT())}, - {"c15", ROW({INTEGER(), VARCHAR(), ARRAY(INTEGER())})}, - {"c16", MAP(TINYINT(), REAL())}, - }); - - auto rootDirectory = velox::exec::test::TempDirectoryPath::create(); - auto rootPath = rootDirectory->getPath(); - const std::string shuffleWriteInfo = - localShuffleWriteInfo(rootPath, numPartitions); - - for (int it = 0; it < numIterations; it++) { - auto seed = folly::Random::rand32(); - - SCOPED_TRACE( - fmt::format( - "Iteration {}, numPartitions {}, replicateNullsAndAny {}, seed {}", - it, - numPartitions, - replicateNullsAndAny, - seed)); - - VectorFuzzer fuzzer(opts, pool_.get(), seed); - std::vector inputVectors; - // Create input vectors. - for (size_t i = 0; i < numInputVectors; ++i) { - auto input = fuzzer.fuzzInputRow(rowType); - inputVectors.push_back(input); - } - velox::exec::ExchangeSource::factories().clear(); - registerExchangeSource( - std::string(LocalPersistentShuffleFactory::kShuffleName)); - runShuffleTest( - std::string(LocalPersistentShuffleFactory::kShuffleName), - shuffleWriteInfo, - [&](auto partition) { - return localShuffleReadInfo(rootPath, numPartitions, partition); - }, - replicateNullsAndAny, - numPartitions, - numMapDrivers, - inputVectors); - cleanupDirectory(rootPath); - } } void partitionAndSerializeWithThresholds( @@ -815,24 +565,13 @@ class ShuffleTest : public exec::test::OperatorTestBase { testPartitionAndSerialize(plan, expected, params, expectedOutputCount); } - void cleanupDirectory(const std::string& rootPath) { - auto fileSystem = velox::filesystems::getFileSystem(rootPath, nullptr); - auto files = fileSystem->list(rootPath); - for (auto& file : files) { - fileSystem->remove(file); - } - } - void runPartitionAndSerializeSerdeTest( const RowVectorPtr& data, size_t numPartitions, const std::optional>& serdeLayout = std::nullopt) { - TestShuffleWriter::reset(); - - auto shuffleInfo = testShuffleInfo(numPartitions, 1 << 20); - TestShuffleWriter::createWriter(shuffleInfo, pool()); - + auto shuffleInfo = + localShuffleWriteInfo(tempDir_->getPath(), numPartitions); auto plan = exec::test::PlanBuilder() .values({data}, true) .addNode(addPartitionAndSerializeNode( @@ -841,9 +580,7 @@ class ShuffleTest : public exec::test::OperatorTestBase { serdeLayout.value_or(std::vector{}))) .localPartition(std::vector{}) .addNode(addShuffleWriteNode( - numPartitions, - std::string(TestShuffleFactory::kShuffleName), - shuffleInfo)) + numPartitions, std::string(shuffleName_), shuffleInfo)) .planNode(); exec::CursorParameters params; @@ -853,55 +590,284 @@ class ShuffleTest : public exec::test::OperatorTestBase { auto [taskCursor, results] = exec::test::readCursor(params); ASSERT_EQ(results.size(), 0); - auto shuffleWriter = TestShuffleWriter::getInstance(); - ASSERT_NE(shuffleWriter, nullptr); - - auto readyPartitions = shuffleWriter->readyPartitions(); - ASSERT_NE(readyPartitions, nullptr); - - size_t totalRows = 0; - for (size_t partitionIdx = 0; partitionIdx < numPartitions; - ++partitionIdx) { - for (const auto& batch : (*readyPartitions)[partitionIdx]) { - totalRows += batch->rows.size(); + // Verify by reading shuffle files + int totalRows = 0; + for (size_t partition = 0; partition < numPartitions; ++partition) { + auto reader = std::make_shared( + tempDir_->getPath(), + "query_id", + std::vector{fmt::format("shuffle_0_0_{}", partition)}, + false, + pool()); + reader->initialize(); + + while (true) { + auto batches = reader->next(1 << 20).get(); + if (batches.empty()) { + break; + } + for (auto& batch : batches) { + totalRows += batch->rows.size(); + } } } ASSERT_EQ(totalRows, data->size()); + } - auto expectedType = serdeLayout.has_value() - ? createSerdeLayoutType(asRowType(data->type()), serdeLayout.value()) - : asRowType(data->type()); - - std::vector deserializedData; - for (size_t partitionIdx = 0; partitionIdx < numPartitions; - ++partitionIdx) { - for (const auto& batch : (*readyPartitions)[partitionIdx]) { - auto deserialized = std::dynamic_pointer_cast( - row::CompactRow::deserialize(batch->rows, expectedType, pool())); - if (deserialized != nullptr && deserialized->size() > 0) { - deserializedData.push_back(deserialized); - } + void fuzzerTest(bool replicateNullsAndAny, size_t numPartitions) { + // For unit testing, these numbers are set to relatively small values. + // For stress testing, the following parameters and the fuzzer vector, + // string and container sizes can be bumped up. + size_t numMapDrivers = 1; + size_t numInputVectors = 5; + size_t numIterations = 5; + + // Set up the fuzzer parameters. + VectorFuzzer::Options opts; + opts.vectorSize = 1000; + opts.nullRatio = 0.1; + opts.dictionaryHasNulls = false; + opts.stringVariableLength = true; + + // UnsafeRows use microseconds to store timestamp. + opts.timestampPrecision = + VectorFuzzer::Options::TimestampPrecision::kMicroSeconds; + opts.stringLength = 100; + opts.containerLength = 10; + + // For the time being, we are not including any MAP or more than three level + // nested data structures given the limitations of the fuzzer and + // assertEqualResults: + // Limitations of assertEqualResults: + // https://github.com/facebookincubator/velox/issues/2859 + auto rowType = ROW({ + {"c0", INTEGER()}, + {"c1", TINYINT()}, + {"c2", INTEGER()}, + {"c3", BIGINT()}, + {"c4", INTEGER()}, + {"c5", TIMESTAMP()}, + {"c6", REAL()}, + {"c7", TINYINT()}, + {"c8", DOUBLE()}, + {"c9", VARCHAR()}, + {"c10", ROW({VARCHAR(), INTEGER(), TIMESTAMP()})}, + {"c11", INTEGER()}, + {"c12", REAL()}, + {"c13", ARRAY(INTEGER())}, + {"c14", ARRAY(TINYINT())}, + {"c15", ROW({INTEGER(), VARCHAR(), ARRAY(INTEGER())})}, + {"c16", MAP(TINYINT(), REAL())}, + }); + + auto rootDirectory = velox::exec::test::TempDirectoryPath::create(); + auto rootPath = rootDirectory->getPath(); + const std::string shuffleWriteInfo = + localShuffleWriteInfo(rootPath, numPartitions); + + for (int it = 0; it < numIterations; it++) { + auto seed = folly::Random::rand32(); + + SCOPED_TRACE( + fmt::format( + "Iteration {}, numPartitions {}, replicateNullsAndAny {}, seed {}", + it, + numPartitions, + replicateNullsAndAny, + seed)); + + VectorFuzzer fuzzer(opts, pool_.get(), seed); + std::vector inputVectors; + // Create input vectors. + for (size_t i = 0; i < numInputVectors; ++i) { + auto input = fuzzer.fuzzInputRow(rowType); + inputVectors.push_back(input); } + velox::exec::ExchangeSource::factories().clear(); + registerExchangeSource( + std::string(LocalPersistentShuffleFactory::kShuffleName)); + runShuffleTest( + std::string(LocalPersistentShuffleFactory::kShuffleName), + shuffleWriteInfo, + [&](auto partition) { + return localShuffleReadInfo(rootPath, partition); + }, + replicateNullsAndAny, + numPartitions, + numMapDrivers, + inputVectors); + cleanupDirectory(rootPath); } + } + + std::string_view shuffleName_; + std::shared_ptr tempDir_; + enum class DataType { + BASIC, + BASIC_WITH_NULLS, + SORTED_BIGINT, + SORTED_VARCHAR + }; - auto expected = serdeLayout.has_value() - ? reorderColumns(data, serdeLayout.value()) - : data; - velox::exec::test::assertEqualResults({expected}, deserializedData); + struct DataGeneratorConfig { + size_t numRows = 6; + size_t numBatches = 1; + double nullRatio = 0.0; + int32_t seed = 0; + }; + + struct GeneratedTestData { + std::vector inputData; + std::optional> ordering; + std::optional> fields; + std::optional>> expectedSortingOrder; + std::optional queryConfig; + }; + + GeneratedTestData generateTestData( + const DataGeneratorConfig& genConfig, + DataType type) { + GeneratedTestData result; + + VectorFuzzer::Options opts; + opts.vectorSize = genConfig.numRows; + opts.nullRatio = genConfig.nullRatio; + opts.timestampPrecision = + VectorFuzzer::Options::TimestampPrecision::kMicroSeconds; + VectorFuzzer fuzzer(opts, pool_.get(), genConfig.seed); + + switch (type) { + case DataType::BASIC: { + for (size_t i = 0; i < genConfig.numBatches; ++i) { + result.inputData.push_back(makeRowVector({ + makeFlatVector( + genConfig.numRows, + [i, genConfig](auto row) { + return static_cast( + (i * genConfig.numRows) + row + 1); + }), + makeFlatVector( + genConfig.numRows, + [i, genConfig](auto row) { + return static_cast( + (i * genConfig.numRows + row + 1) * 10); + }), + })); + } + break; + } + + case DataType::BASIC_WITH_NULLS: { + for (size_t i = 0; i < genConfig.numBatches; ++i) { + std::vector> col0Values; + std::vector col1Values; + col0Values.reserve(genConfig.numRows + 1); + col1Values.reserve(genConfig.numRows + 1); + + for (size_t row = 0; row < genConfig.numRows; ++row) { + col0Values.emplace_back( + static_cast((i * genConfig.numRows) + row + 1)); + col1Values.push_back( + static_cast((i * genConfig.numRows + row + 1) * 10)); + } + col0Values.emplace_back(std::nullopt); + col1Values.push_back( + static_cast((genConfig.numRows + 1) * 10)); + + result.inputData.push_back(makeRowVector({ + makeNullableFlatVector(col0Values), + makeFlatVector(col1Values), + })); + } + break; + } + + case DataType::SORTED_BIGINT: { + const size_t rowsPerBatch = genConfig.numRows / genConfig.numBatches; + for (size_t i = 0; i < genConfig.numBatches; ++i) { + std::vector partitions; + std::vector values; + partitions.reserve(rowsPerBatch); + values.reserve(rowsPerBatch); + + for (size_t row = 0; row < rowsPerBatch; ++row) { + partitions.push_back(static_cast(row % 2)); + values.push_back( + static_cast((i * rowsPerBatch) + row + 1) * 10); + } + + result.inputData.push_back(makeRowVector({ + makeFlatVector(partitions), + makeFlatVector(values), + })); + } + result.ordering = {velox::core::SortOrder(velox::core::kAscNullsFirst)}; + result.fields = std::vector{ + std::make_shared( + velox::BIGINT(), fmt::format("c{}", 1))}; + break; + } + + case DataType::SORTED_VARCHAR: { + const size_t rowsPerBatch = genConfig.numRows / genConfig.numBatches; + for (size_t i = 0; i < genConfig.numBatches; ++i) { + std::vector partitions; + std::vector keys; + partitions.reserve(rowsPerBatch); + keys.reserve(rowsPerBatch); + + for (size_t row = 0; row < rowsPerBatch; ++row) { + partitions.push_back(static_cast(row % 2)); + keys.push_back(fmt::format("key{:04d}", rowsPerBatch - row)); + } + + result.inputData.push_back(makeRowVector({ + makeFlatVector(partitions), + makeFlatVector(keys), + })); + } + result.ordering = {velox::core::SortOrder(velox::core::kAscNullsFirst)}; + result.fields = std::vector{ + std::make_shared( + velox::VARCHAR(), fmt::format("c{}", 1))}; + auto properties = std::unordered_map{ + {core::QueryConfig::kPreferredOutputBatchBytes, + std::to_string(1'000'000'000)}, + {core::QueryConfig::kPreferredOutputBatchRows, std::to_string(3)}}; + result.queryConfig = core::QueryConfig(properties); + break; + } + } + + return result; } private: - RowTypePtr createSerdeLayoutType( - const RowTypePtr& originalType, - const std::vector& layout) { - std::vector names; - std::vector types; - for (const auto& name : layout) { - auto idx = originalType->getChildIdx(name); - names.push_back(name); - types.push_back(originalType->childAt(idx)); + RowVectorPtr deserializeResult( + const RowVectorPtr& serializedResult, + const RowTypePtr& rowType) { + auto serializedData = + serializedResult->childAt(2)->as>(); + auto* rawValues = serializedData->rawValues(); + + std::vector rows; + rows.reserve(serializedData->size()); + for (auto i = 0; i < serializedData->size(); ++i) { + const auto& serializedRow = rawValues[i]; + rows.push_back( + std::string_view(serializedRow.data(), serializedRow.size())); } - return ROW(std::move(names), std::move(types)); + + return std::dynamic_pointer_cast( + row::CompactRow::deserialize(rows, rowType, pool())); + } + + RowVectorPtr copyResultVector(const RowVectorPtr& result) { + auto vector = std::static_pointer_cast( + BaseVector::create(result->type(), result->size(), pool())); + vector->copy(result.get(), 0, 0, result->size()); + VELOX_CHECK_EQ(vector->size(), result->size()); + return vector; } RowVectorPtr reorderColumns( @@ -917,26 +883,23 @@ class ShuffleTest : public exec::test::OperatorTestBase { }; TEST_F(ShuffleTest, operators) { + const std::string shuffleRootPath = tempDir_->getPath(); auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), makeFlatVector({10, 20, 30, 40}), }); - auto info = testShuffleInfo(4, 1 << 20 /* 1MB */); - auto plan = exec::test::PlanBuilder() - .values({data}, true) - .addNode(addPartitionAndSerializeNode(4, false)) - .localPartition(std::vector{}) - .addNode(addShuffleWriteNode( - 4, std::string(TestShuffleFactory::kShuffleName), info)) - .planNode(); - - exec::CursorParameters params; - params.planNode = plan; - params.maxDrivers = 2; + auto info = localShuffleWriteInfo(shuffleRootPath, 4); + auto plan = + exec::test::PlanBuilder() + .values({data}, true) + .addNode(addPartitionAndSerializeNode(4, false)) + .localPartition(std::vector{}) + .addNode(addShuffleWriteNode(4, std::string(shuffleName_), info)) + .planNode(); - auto [taskCursor, serializedResults] = exec::test::readCursor(params); - ASSERT_EQ(serializedResults.size(), 0); - TestShuffleWriter::reset(); + auto results = exec::test::AssertQueryBuilder(plan).copyResults(pool()); + // Shuffle write returns empty results + EXPECT_EQ(results->size(), 0); } DEBUG_ONLY_TEST_F(ShuffleTest, shuffleWriterExceptions) { @@ -944,12 +907,12 @@ DEBUG_ONLY_TEST_F(ShuffleTest, shuffleWriterExceptions) { makeFlatVector({1, 2, 3, 4}), makeFlatVector({10, 20, 30, 40}), }); - auto info = testShuffleInfo(4, 1 << 20 /* 1MB */); + auto info = localShuffleWriteInfo(tempDir_->getPath(), 4); SCOPED_TESTVALUE_SET( - "facebook::presto::operators::test::TestShuffleWriter::collect", - std::function( - [&](TestShuffleWriter* /*writer*/) { + "facebook::presto::operators::LocalShuffleWriter::collect", + std::function( + [&](LocalShuffleWriter* /*writer*/) { // Trigger a std::bad_function_call exception. std::function nullFunction = nullptr; VELOX_CHECK(nullFunction()); @@ -960,8 +923,7 @@ DEBUG_ONLY_TEST_F(ShuffleTest, shuffleWriterExceptions) { exec::test::PlanBuilder() .values({data}) .addNode(addPartitionAndSerializeNode(4, false)) - .addNode(addShuffleWriteNode( - 4, std::string(TestShuffleFactory::kShuffleName), info)) + .addNode(addShuffleWriteNode(4, std::string(shuffleName_), info)) .planNode(); VELOX_ASSERT_THROW( @@ -974,226 +936,44 @@ DEBUG_ONLY_TEST_F(ShuffleTest, shuffleReaderExceptions) { makeFlatVector({10, 20, 30, 40}), }); - auto info = testShuffleInfo(4, 1 << 20 /* 1MB */); - TestShuffleWriter::createWriter(info, pool()); + auto writeInfo = localShuffleWriteInfo(tempDir_->getPath(), 4); exec::CursorParameters params; params.planNode = exec::test::PlanBuilder() .values({data}) .addNode(addPartitionAndSerializeNode(2, false)) - .addNode(addShuffleWriteNode( - 2, std::string(TestShuffleFactory::kShuffleName), info)) + .addNode(addShuffleWriteNode(2, std::string(shuffleName_), writeInfo)) .planNode(); ASSERT_NO_THROW(exec::test::readCursor(params)); - std::function injectFailure = - [&](TestShuffleReader* /*reader*/) { + std::function injectFailure = + [&](LocalShuffleReader* /*reader*/) { // Trigger a std::bad_function_call exception. std::function nullFunction = nullptr; VELOX_CHECK(nullFunction()); }; exec::Operator::registerOperator(std::make_unique()); - registerExchangeSource(std::string(TestShuffleFactory::kShuffleName)); + velox::exec::ExchangeSource::factories().clear(); + registerExchangeSource(std::string(shuffleName_)); + params.planNode = exec::test::PlanBuilder() .addNode(addShuffleReadNode(asRowType(data->type()))) .planNode(); { SCOPED_TESTVALUE_SET( - "facebook::presto::operators::test::TestShuffleReader::next", - injectFailure); + "facebook::presto::operators::LocalShuffleReader::next", injectFailure); + auto readInfo = localShuffleReadInfo(tempDir_->getPath(), 2, 0); VELOX_ASSERT_THROW( - runShuffleReadTask(params, info), "ShuffleReader::next failed"); + runShuffleReadTask(params, readInfo), "ShuffleReader::next failed"); } } -TEST_F(ShuffleTest, endToEnd) { - size_t numPartitions = 5; - size_t numMapDrivers = 2; - - auto data = makeRowVector({ - makeFlatVector({1, 2, 3, 4, 5, 6}), - makeFlatVector({10, 20, 30, 40, 50, 60}), - }); - - // Make sure all previously registered exchange factory are gone. - velox::exec::ExchangeSource::factories().clear(); - auto shuffleInfo = testShuffleInfo(numPartitions, 1 << 20); - TestShuffleWriter::createWriter(shuffleInfo, pool()); - registerExchangeSource(std::string(TestShuffleFactory::kShuffleName)); - runShuffleTest( - std::string(TestShuffleFactory::kShuffleName), - shuffleInfo, - [&](auto /*partition*/) { return shuffleInfo; }, - false, - numPartitions, - numMapDrivers, - {data}, - kFakeBackgroundCpuTimeNanos); -} - -TEST_F(ShuffleTest, endToEndWithSortedShuffle) { - size_t numPartitions = 2; - size_t numMapDrivers = 1; - - auto batch1 = makeRowVector({ - makeFlatVector({0, 0, 1, 1, 1, 1}), // partition key - makeFlatVector({30, 10, 20, 50, 40, 60}), // sorting column - }); - - auto batch2 = makeRowVector({ - makeFlatVector({0, 0, 1}), // partition key - makeFlatVector({70, 80, 90}), // sorting column - }); - - auto expectedSortingOrder = { - std::vector{1, 0, 2, 3}, // partition key 0 - std::vector{0, 2, 1, 3, 4}, // partition key 1 - }; - - auto ordering = {velox::core::SortOrder(velox::core::kAscNullsFirst)}; - std::vector> fields; - fields.push_back( - std::make_shared( - velox::BIGINT(), fmt::format("c{}", 1))); - - // Make sure all previously registered exchange factory are gone. - velox::exec::ExchangeSource::factories().clear(); - std::string shuffleInfo = testShuffleInfo(numPartitions, 1 << 20); - TestShuffleWriter::createWriter(shuffleInfo, pool()); - registerExchangeSource(std::string(TestShuffleFactory::kShuffleName)); - runShuffleTest( - std::string(TestShuffleFactory::kShuffleName), - shuffleInfo, - [&](auto /*partition*/) { return shuffleInfo; }, - false, - numPartitions, - numMapDrivers, - {batch1, batch2}, - kFakeBackgroundCpuTimeNanos, - ordering, - fields, - expectedSortingOrder); -} - -TEST_F(ShuffleTest, endToEndWithSortedShuffleRowLimit) { - size_t numPartitions = 3; - size_t numMapDrivers = 1; - - auto data = makeRowVector({ - makeFlatVector({0, 0, 1, 1, 1, 1, 2, 2, 2}), // partition key - makeFlatVector( - {"key3", - "key1", - "key22", - "key55", - "key44", - "key66", - "key111", - "key222", - "key333"}) // sorting column - }); - - auto expectedSortingOrder = { - std::vector{1, 0}, // partition key 0 - std::vector{0, 2, 1, 3}, // partition key 1 - std::vector{0, 1, 2} // partition key 2 - }; - - auto ordering = {velox::core::SortOrder(velox::core::kAscNullsFirst)}; - std::vector> fields; - fields.push_back( - std::make_shared( - velox::VARCHAR(), fmt::format("c{}", 1))); - - // Make sure all previously registered exchange factory are gone. - velox::exec::ExchangeSource::factories().clear(); - std::string shuffleInfo = testShuffleInfo(numPartitions, 1 << 20); - TestShuffleWriter::createWriter(shuffleInfo, pool()); - registerExchangeSource(std::string(TestShuffleFactory::kShuffleName)); - - auto properties = std::unordered_map{ - {core::QueryConfig::kPreferredOutputBatchBytes, - std::to_string(1'000'000'000)}, - {core::QueryConfig::kPreferredOutputBatchRows, std::to_string(3)}}; - - auto queryConfig = core::QueryConfig(properties); - - runShuffleTest( - std::string(TestShuffleFactory::kShuffleName), - shuffleInfo, - [&](auto /*partition*/) { return shuffleInfo; }, - false, - numPartitions, - numMapDrivers, - {data}, - kFakeBackgroundCpuTimeNanos, - ordering, - fields, - expectedSortingOrder, - std::move(queryConfig)); -} - -TEST_F(ShuffleTest, endToEndWithReplicateNullAndAny) { - size_t numPartitions = 9; - size_t numMapDrivers = 2; - - auto data = makeRowVector({ - makeNullableFlatVector({1, 2, 3, 4, 5, 6, std::nullopt}), - makeFlatVector({10, 20, 30, 40, 50, 60, 70}), - }); - - // Make sure all previously registered exchange factory are gone. - velox::exec::ExchangeSource::factories().clear(); - const std::string shuffleInfo = testShuffleInfo(numPartitions, 1 << 20); - TestShuffleWriter::createWriter(shuffleInfo, pool()); - registerExchangeSource(std::string(TestShuffleFactory::kShuffleName)); - runShuffleTest( - std::string(TestShuffleFactory::kShuffleName), - shuffleInfo, - [&](auto /*partition*/) { return shuffleInfo; }, - true, - numPartitions, - numMapDrivers, - {data}, - kFakeBackgroundCpuTimeNanos); -} - -TEST_F(ShuffleTest, replicateNullsAndAny) { - // No nulls. Expect to replicate first row. - auto data = makeRowVector({ - makeFlatVector({1, 2, 3, 4}), - makeFlatVector({10, 20, 30, 40}), - }); - - testPartitionAndSerialize( - data, makeFlatVector({true, false, false, false})); - - // Nulls. Expect to replicate rows with nulls. - data = makeRowVector({ - makeNullableFlatVector({1, 2, std::nullopt, std::nullopt, 5}), - makeFlatVector({10, 20, 30, 40, 50}), - }); - - testPartitionAndSerialize( - data, makeFlatVector({false, false, true, true, false})); - - // Null in the first row. - data = makeRowVector({ - makeNullableFlatVector( - {std::nullopt, 2, std::nullopt, std::nullopt, 5}), - makeFlatVector({10, 20, 30, 40, 50}), - }); - - testPartitionAndSerialize( - data, makeFlatVector({true, false, true, true, false})); -} - -TEST_F(ShuffleTest, persistentShuffleDeser) { +TEST_F(ShuffleTest, shuffleDeserialization) { std::string serializedWriteInfo = "{\n" " \"rootPath\": \"abc\",\n" @@ -1255,153 +1035,34 @@ TEST_F(ShuffleTest, persistentShuffleDeser) { nlohmann::detail::type_error); } -TEST_F(ShuffleTest, persistentShuffle) { - uint32_t numPartitions = 1; - uint32_t numMapDrivers = 1; - - auto rootDirectory = velox::exec::test::TempDirectoryPath::create(); - auto rootPath = rootDirectory->getPath(); - +TEST_F(ShuffleTest, replicateNullsAndAny) { + // No nulls. Expect to replicate first row. auto data = makeRowVector({ - makeFlatVector({1, 2, 3, 4, 5, 6}), - makeFlatVector({10, 20, 30, 40, 50, 60}), + makeFlatVector({1, 2, 3, 4}), + makeFlatVector({10, 20, 30, 40}), }); - // Make sure all previously registered exchange factory are gone. - velox::exec::ExchangeSource::factories().clear(); - const std::string shuffleWriteInfo = - localShuffleWriteInfo(rootPath, numPartitions); - registerExchangeSource( - std::string(LocalPersistentShuffleFactory::kShuffleName)); - runShuffleTest( - std::string(LocalPersistentShuffleFactory::kShuffleName), - shuffleWriteInfo, - [&](auto partition) { - return localShuffleReadInfo(rootPath, numPartitions, partition); - }, - false, - numPartitions, - numMapDrivers, - {data}); - cleanupDirectory(rootPath); -} - -TEST_F(ShuffleTest, persistentShuffleBatch) { - const uint32_t numPartitions = 1; - const uint32_t partition = 0; - - const size_t numRows{20}; - std::vector values{numRows}; - std::vector views{numRows}; - const size_t rowSize{64}; - for (auto i = 0; i < numRows; ++i) { - values[i] = std::string(rowSize, 'a' + i % 26); - views[i] = values[i]; - } - - struct { - uint64_t maxBytes; - int expectedOutputCalls; - bool sortedShuffle; - - std::string debugString() const { - return fmt::format( - "maxBytes: {}, expectedOutputCalls: {}, sortedShuffle: {}", - maxBytes, - expectedOutputCalls, - sortedShuffle); - } - } testSettings[] = { - {.maxBytes = 1, .expectedOutputCalls = numRows, .sortedShuffle = false}, - {.maxBytes = 100, .expectedOutputCalls = numRows, .sortedShuffle = false}, - {.maxBytes = 1 << 25, .expectedOutputCalls = 1, .sortedShuffle = false}, - {.maxBytes = 1, .expectedOutputCalls = numRows, .sortedShuffle = true}, - {.maxBytes = 100, .expectedOutputCalls = numRows, .sortedShuffle = true}, - {.maxBytes = 1 << 25, .expectedOutputCalls = 1, .sortedShuffle = true}}; - - for (const auto& testData : testSettings) { - SCOPED_TRACE(testData.debugString()); - - auto tempRootDir = velox::exec::test::TempDirectoryPath::create(); - auto testRootPath = tempRootDir->getPath(); - - LocalShuffleWriteInfo writeInfo = LocalShuffleWriteInfo::deserialize( - localShuffleWriteInfo(testRootPath, numPartitions)); - - auto writer = std::make_shared( - writeInfo.rootPath, - writeInfo.queryId, - writeInfo.shuffleId, - writeInfo.numPartitions, - /*maxBytesPerPartition=*/1, - testData.sortedShuffle, - pool()); - - for (auto i = 0; i < numRows; ++i) { - writer->collect( - partition, - testData.sortedShuffle ? std::string_view(values[i].data(), 8) - : std::string_view{}, - views[i]); - } - writer->noMoreData(true); - - // Create reader - LocalShuffleReadInfo readInfo = LocalShuffleReadInfo::deserialize( - localShuffleReadInfo(testRootPath, numPartitions, partition)); - - auto reader = std::make_shared( - readInfo.rootPath, - readInfo.queryId, - readInfo.partitionIds, - testData.sortedShuffle, - pool()); - reader->initialize(); - - int numOutputCalls{0}; - int numBatches{0}; - int totalRows{0}; - // Read all batches - while (true) { - auto batches = reader->next(testData.maxBytes) - .via(folly::getGlobalCPUExecutor()) - .get(); - if (batches.empty()) { - break; - } - - ++numOutputCalls; - numBatches += batches.size(); - for (const auto& batch : batches) { - totalRows += batch->rows.size(); - } - } - - reader->noMoreData(true); + testPartitionAndSerialize( + data, makeFlatVector({true, false, false, false})); - // Verify we read all rows. - ASSERT_EQ(totalRows, numRows); - // ASSERT_EQ(numBatches, numRows); - // Verify number of output batches. - ASSERT_EQ(numOutputCalls, testData.expectedOutputCalls); - cleanupDirectory(testRootPath); - } -} + // Nulls. Expect to replicate rows with nulls. + data = makeRowVector({ + makeNullableFlatVector({1, 2, std::nullopt, std::nullopt, 5}), + makeFlatVector({10, 20, 30, 40, 50}), + }); -TEST_F(ShuffleTest, persistentShuffleFuzz) { - fuzzerTest(false, 1); - fuzzerTest(false, 3); - fuzzerTest(false, 7); -} + testPartitionAndSerialize( + data, makeFlatVector({false, false, true, true, false})); -TEST_F(ShuffleTest, persistentShuffleFuzzWithReplicateNullsAndAny) { - fuzzerTest(true, 1); - fuzzerTest(true, 3); - fuzzerTest(true, 7); -} + // Null in the first row. + data = makeRowVector({ + makeNullableFlatVector( + {std::nullopt, 2, std::nullopt, std::nullopt, 5}), + makeFlatVector({10, 20, 30, 40, 50}), + }); -TEST_F(ShuffleTest, partitionAndSerializeOutputByteLimit) { - partitionAndSerializeWithThresholds(10'000, 1, 10, 10); + testPartitionAndSerialize( + data, makeFlatVector({true, false, true, true, false})); } TEST_F(ShuffleTest, partitionAndSerializeOutputRowLimit) { @@ -1450,65 +1111,8 @@ TEST_F(ShuffleTest, partitionAndSerializeWithLargeInput) { testPartitionAndSerialize(plan, data); } -// Test the write path of sorted shuffle by directly reading files from disk. -// This test and parseShuffleRows only for testing the correctness of the -// LocalShuffleWriter::collect() -TEST_F(ShuffleTest, localShuffleWriterSortedOutput) { - const uint32_t numPartitions = 2; - auto rootDirectory = velox::exec::test::TempDirectoryPath::create(); - const auto& rootPath = rootDirectory->getPath(); - const std::vector keys = { - "key3", "key1", "key5", "key2", "key4", "key6"}; - const std::vector values = { - "data3", "data1", "data5", "data2", "data4", "data6"}; - const std::vector>> expected = - {{{"key3", "data3"}, {"key4", "data4"}, {"key5", "data5"}}, - {{"key1", "data1"}, {"key2", "data2"}, {"key6", "data6"}}}; - - LocalShuffleWriteInfo writeInfo = LocalShuffleWriteInfo::deserialize( - localShuffleWriteInfo(rootPath, numPartitions)); - auto writer = std::make_shared( - writeInfo.rootPath, - writeInfo.queryId, - writeInfo.shuffleId, - writeInfo.numPartitions, - /*maxBytesPerPartition=*/1024, - /*sortedShuffle=*/true, - pool()); - for (size_t i = 0; i < keys.size(); ++i) { - writer->collect(i % numPartitions, keys[i], values[i]); - } - writer->noMoreData(true); - - auto fs = velox::filesystems::getFileSystem(rootPath, nullptr); - std::map> partitionFiles; - for (const auto& file : fs->list(rootPath)) { - const auto pos = file.find("_shuffle_0_0_") + 13; - partitionFiles[std::stoi(file.substr(pos, 1))].push_back(file); - } - - for (int partition = 0; partition < numPartitions; ++partition) { - std::vector> actual; - for (const auto& filename : partitionFiles[partition]) { - auto file = fs->openFileForRead(filename); - auto buffer = AlignedBuffer::allocate(file->size(), pool(), 0); - file->pread(0, file->size(), buffer->asMutable()); - auto parsedRows = - testingExtractRowMetadata(buffer->as(), file->size(), true); - - for (const auto& row : parsedRows) { - const char* data = buffer->as(); - const char* keyPtr = data + row.rowStart + (sizeof(uint32_t) * 2); - const char* dataPtr = keyPtr + row.keySize; - actual.emplace_back( - std::string(keyPtr, row.keySize), - std::string(dataPtr, row.dataSize)); - } - } - EXPECT_EQ(actual, expected[partition]); - } - - cleanupDirectory(rootPath); +TEST_F(ShuffleTest, partitionAndSerializeOutputByteLimit) { + partitionAndSerializeWithThresholds(10'000, 1, 10, 10); } TEST_F(ShuffleTest, partitionAndSerializeWithDifferentColumnOrder) { @@ -1560,56 +1164,23 @@ TEST_F(ShuffleTest, partitionAndSerializeOperatorWhenSinglePartition) { testPartitionAndSerialize(plan, data); } -TEST_F(ShuffleTest, shuffleWriterToString) { +TEST_F(ShuffleTest, partitionAndSerializeEndToEnd) { auto data = makeRowVector({ - makeFlatVector(1'000, [](auto row) { return row; }), - makeFlatVector(1'000, [](auto row) { return row * 10; }), + makeFlatVector({1, 2, 3, 4, 5, 6}), + makeFlatVector({10, 20, 30, 40, 50, 60}), }); + runPartitionAndSerializeSerdeTest(data, 4); - auto plan = exec::test::PlanBuilder() - .values({data}, true) - .addNode(addPartitionAndSerializeNode(4, false)) - .localPartition(std::vector{}) - .addNode(addShuffleWriteNode( - 4, - std::string(TestShuffleFactory::kShuffleName), - testShuffleInfo(10, 10))) - .planNode(); - - ASSERT_EQ(plan->toString(false, false), "-- ShuffleWrite[3]\n"); - ASSERT_EQ( - plan->toString(true, false), - "-- ShuffleWrite[3][4, test-shuffle]" - " -> partition:INTEGER, key:VARBINARY, data:VARBINARY\n"); -} + // Clean up shuffle files between test scenarios to avoid file name collisions + cleanupDirectory(tempDir_->getPath()); -TEST_F(ShuffleTest, partitionAndSerializeToString) { - auto data = makeRowVector({ - makeFlatVector(1'000, [](auto row) { return row; }), - makeFlatVector(1'000, [](auto row) { return row * 10; }), + data = makeRowVector({ + makeFlatVector({1, 2, 3, 4}), + makeFlatVector({10, 20, 30, 40}), + makeFlatVector({"a", "b", "c", "d"}), }); - auto plan = exec::test::PlanBuilder() - .values({data}, true) - .addNode(addPartitionAndSerializeNode(4, false)) - .planNode(); - - ASSERT_EQ(plan->toString(false, false), "-- PartitionAndSerialize[1]\n"); - ASSERT_EQ( - plan->toString(true, false), - "-- PartitionAndSerialize[1][(c0) 4 HASH(c0) ROW]" - " -> partition:INTEGER, key:VARBINARY, data:VARBINARY\n"); - - plan = exec::test::PlanBuilder() - .values({data}, true) - .addNode(addPartitionAndSerializeNode(4, true)) - .planNode(); - - ASSERT_EQ(plan->toString(false, false), "-- PartitionAndSerialize[1]\n"); - ASSERT_EQ( - plan->toString(true, false), - "-- PartitionAndSerialize[1][(c0) 4 HASH(c0) ROW]" - " -> partition:INTEGER, key:VARBINARY, data:VARBINARY, replicate:BOOLEAN\n"); + runPartitionAndSerializeSerdeTest(data, 2, {{"c2", "c0"}}); } class DummyShuffleInterfaceFactory : public ShuffleInterfaceFactory { @@ -1645,7 +1216,7 @@ TEST_F(ShuffleTest, shuffleReadRuntimeStats) { exec::Operator::registerOperator(std::make_unique()); exec::Operator::registerOperator(std::make_unique()); velox::exec::ExchangeSource::factories().clear(); - registerExchangeSource(std::string(TestShuffleFactory::kShuffleName)); + registerExchangeSource(std::string(shuffleName_)); const auto dataType = ROW({ {"c0", INTEGER()}, @@ -1675,8 +1246,8 @@ TEST_F(ShuffleTest, shuffleReadRuntimeStats) { inputVectors.push_back(fuzzer.fuzzInputRow(dataType)); } - auto shuffleInfo = testShuffleInfo(numPartitions, 1 << 20); - TestShuffleWriter::createWriter(shuffleInfo, pool()); + auto shuffleInfo = + localShuffleWriteInfo(tempDir_->getPath(), numPartitions); auto writerPlan = exec::test::PlanBuilder() @@ -1684,9 +1255,7 @@ TEST_F(ShuffleTest, shuffleReadRuntimeStats) { .addNode(addPartitionAndSerializeNode(numPartitions, false)) .localPartition(std::vector{}) .addNode(addShuffleWriteNode( - numPartitions, - std::string(TestShuffleFactory::kShuffleName), - shuffleInfo)) + numPartitions, std::string(shuffleName_), shuffleInfo)) .planNode(); auto writerTaskId = makeTaskId("leaf", 0); @@ -1706,7 +1275,8 @@ TEST_F(ShuffleTest, shuffleReadRuntimeStats) { params.destination = 0; params.maxDrivers = 1; - auto [taskCursor, results] = runShuffleReadTask(params, shuffleInfo); + auto [taskCursor, results] = runShuffleReadTask( + params, localShuffleReadInfo(tempDir_->getPath(), 0)); ASSERT_TRUE( exec::test::waitForTaskCompletion(taskCursor->task().get(), 5'000'000)); @@ -1737,71 +1307,262 @@ TEST_F(ShuffleTest, shuffleReadRuntimeStats) { } } -TEST_F(ShuffleTest, partitionAndSerializeEndToEnd) { - auto data = makeRowVector({ - makeFlatVector({1, 2, 3, 4, 5, 6}), - makeFlatVector({10, 20, 30, 40, 50, 60}), - }); - runPartitionAndSerializeSerdeTest(data, 4); +TEST_F(ShuffleTest, shuffleEndToEnd) { + struct TestConfig { + std::string testName; + DataType dataType; + size_t numPartitions; + size_t numMapDrivers; + bool replicateNullsAndAny; + bool sortedShuffle; + bool withRowLimit; + bool useCustomTempDir; + DataGeneratorConfig dataGenConfig; - data = makeRowVector({ - makeFlatVector({1, 2, 3, 4}), - makeFlatVector({10, 20, 30, 40}), - makeFlatVector({"a", "b", "c", "d"}), - }); + std::string debugString() const { + return fmt::format( + "test:{}, partitions:{}, drivers:{}, replicate:{}, sorted:{}, rowLimit:{}, customDir:{}, numRows:{}, numBatches:{}", + testName, + numPartitions, + numMapDrivers, + replicateNullsAndAny, + sortedShuffle, + withRowLimit, + useCustomTempDir, + dataGenConfig.numRows, + dataGenConfig.numBatches); + } + }; - runPartitionAndSerializeSerdeTest(data, 2, {{"c2", "c0"}}); + const TestConfig testSettings[] = { + {.testName = "endToEnd", + .dataType = DataType::BASIC, + .numPartitions = 5, + .numMapDrivers = 2, + .replicateNullsAndAny = false, + .sortedShuffle = false, + .withRowLimit = false, + .useCustomTempDir = false, + .dataGenConfig = {.numRows = 1000, .numBatches = 2, .seed = 1}}, + {.testName = "endToEndSmallData", + .dataType = DataType::BASIC, + .numPartitions = 3, + .numMapDrivers = 1, + .replicateNullsAndAny = false, + .sortedShuffle = false, + .withRowLimit = false, + .useCustomTempDir = false, + .dataGenConfig = {.numRows = 10, .numBatches = 1, .seed = 1}}, + {.testName = "endToEndLargeData", + .dataType = DataType::BASIC, + .numPartitions = 8, + .numMapDrivers = 4, + .replicateNullsAndAny = false, + .sortedShuffle = false, + .withRowLimit = false, + .useCustomTempDir = false, + .dataGenConfig = {.numRows = 10000, .numBatches = 5, .seed = 1}}, + {.testName = "endToEndWithSortedShuffle", + .dataType = DataType::SORTED_BIGINT, + .numPartitions = 2, + .numMapDrivers = 1, + .replicateNullsAndAny = false, + .sortedShuffle = true, + .withRowLimit = false, + .useCustomTempDir = false, + .dataGenConfig = {.numRows = 500, .numBatches = 2, .seed = 1}}, + {.testName = "endToEndWithSortedShuffleSmallData", + .dataType = DataType::SORTED_BIGINT, + .numPartitions = 2, + .numMapDrivers = 1, + .replicateNullsAndAny = false, + .sortedShuffle = true, + .withRowLimit = false, + .useCustomTempDir = false, + .dataGenConfig = {.numRows = 20, .numBatches = 1, .seed = 1}}, + {.testName = "endToEndWithSortedShuffleRowLimit", + .dataType = DataType::SORTED_VARCHAR, + .numPartitions = 2, + .numMapDrivers = 1, + .replicateNullsAndAny = false, + .sortedShuffle = true, + .withRowLimit = true, + .useCustomTempDir = false, + .dataGenConfig = {.numRows = 100, .numBatches = 1, .seed = 1}}, + {.testName = "endToEndWithReplicateNullAndAny", + .dataType = DataType::BASIC_WITH_NULLS, + .numPartitions = 9, + .numMapDrivers = 2, + .replicateNullsAndAny = true, + .sortedShuffle = false, + .withRowLimit = false, + .useCustomTempDir = false, + .dataGenConfig = {.numRows = 200, .numBatches = 3, .seed = 1}}, + {.testName = "localShuffle", + .dataType = DataType::BASIC, + .numPartitions = 1, + .numMapDrivers = 1, + .replicateNullsAndAny = false, + .sortedShuffle = false, + .withRowLimit = false, + .useCustomTempDir = true, + .dataGenConfig = {.numRows = 50, .numBatches = 1, .seed = 1}}, + }; + + for (const auto& config : testSettings) { + SCOPED_TRACE(config.debugString()); + + std::shared_ptr customTempDir; + std::string rootPath; + if (config.useCustomTempDir) { + customTempDir = exec::test::TempDirectoryPath::create(); + rootPath = customTempDir->getPath(); + } else { + rootPath = tempDir_->getPath(); + cleanupDirectory(rootPath); + } + + auto testData = generateTestData(config.dataGenConfig, config.dataType); + auto& inputData = testData.inputData; + auto& ordering = testData.ordering; + auto& fields = testData.fields; + auto& expectedSortingOrder = testData.expectedSortingOrder; + auto& queryConfig = testData.queryConfig; + + velox::exec::ExchangeSource::factories().clear(); + const std::string shuffleName = config.useCustomTempDir + ? std::string(LocalPersistentShuffleFactory::kShuffleName) + : std::string(shuffleName_); + const std::string shuffleInfo = localShuffleWriteInfo( + rootPath, config.numPartitions, config.sortedShuffle); + registerExchangeSource(shuffleName); + + runShuffleTest( + shuffleName, + shuffleInfo, + [&](auto partition) { + return localShuffleReadInfo( + rootPath, partition, config.sortedShuffle); + }, + config.replicateNullsAndAny, + config.numPartitions, + config.numMapDrivers, + inputData, + 0, + ordering, + fields, + expectedSortingOrder, + queryConfig.has_value() ? std::move(queryConfig.value()) + : core::QueryConfig({})); + cleanupDirectory(rootPath); + } } -TEST_F(ShuffleTest, persistentShuffleSortedEndToEnd) { +TEST_F(ShuffleTest, shuffleWriterReader) { const uint32_t numPartitions = 1; const uint32_t partition = 0; struct TestConfig { + bool sortedShuffle; size_t maxBytesPerPartition; size_t numRows; uint64_t readMaxBytes; size_t minDataSize; size_t maxDataSize; + std::optional expectedOutputCalls; std::string debugString() const { return fmt::format( - "maxBytesPerPartition:{}, rows:{}, readMax:{}, dataSize:{}-{}", + "sorted:{}, maxBytesPerPartition:{}, rows:{}, readMax:{}, dataSize:{}-{}, expectedCalls:{}", + sortedShuffle, maxBytesPerPartition, numRows, readMaxBytes, minDataSize, - maxDataSize); + maxDataSize, + expectedOutputCalls.has_value() ? std::to_string(*expectedOutputCalls) + : "None"); } } testSettings[] = { - {.maxBytesPerPartition = 1024, + // Sorted shuffle tests: verify data comes back in sorted order + {.sortedShuffle = true, + .maxBytesPerPartition = 1024, .numRows = 1, .readMaxBytes = 1024, .minDataSize = 10, .maxDataSize = 50}, - {.maxBytesPerPartition = 1024, + {.sortedShuffle = true, + .maxBytesPerPartition = 1024, .numRows = 10, .readMaxBytes = 1024 * 1024, .minDataSize = 50, .maxDataSize = 200}, - {.maxBytesPerPartition = 500, + {.sortedShuffle = true, + .maxBytesPerPartition = 500, .numRows = 20, .readMaxBytes = 1024 * 1024, .minDataSize = 50, .maxDataSize = 150}, - {.maxBytesPerPartition = 1024, + {.sortedShuffle = true, + .maxBytesPerPartition = 1024, .numRows = 50, .readMaxBytes = 8192, .minDataSize = 100, .maxDataSize = 400}, - {.maxBytesPerPartition = 2048, + {.sortedShuffle = true, + .maxBytesPerPartition = 2048, .numRows = 100, .readMaxBytes = 1024 * 1024, .minDataSize = 200, .maxDataSize = 1000}, + // Sorted shuffle batching tests: verify correct number of output batches + {.sortedShuffle = true, + .maxBytesPerPartition = 1, + .numRows = 20, + .readMaxBytes = 1, + .minDataSize = 64, + .maxDataSize = 64, + .expectedOutputCalls = 20}, + {.sortedShuffle = true, + .maxBytesPerPartition = 1, + .numRows = 20, + .readMaxBytes = 100, + .minDataSize = 64, + .maxDataSize = 64, + .expectedOutputCalls = 20}, + {.sortedShuffle = true, + .maxBytesPerPartition = 1, + .numRows = 20, + .readMaxBytes = 1 << 25, + .minDataSize = 64, + .maxDataSize = 64, + .expectedOutputCalls = 1}, + // Unsorted shuffle batching tests: verify correct number of output + // batches + {.sortedShuffle = false, + .maxBytesPerPartition = 1, + .numRows = 20, + .readMaxBytes = 1, + .minDataSize = 64, + .maxDataSize = 64, + .expectedOutputCalls = 20}, + {.sortedShuffle = false, + .maxBytesPerPartition = 1, + .numRows = 20, + .readMaxBytes = 100, + .minDataSize = 64, + .maxDataSize = 64, + .expectedOutputCalls = 20}, + {.sortedShuffle = false, + .maxBytesPerPartition = 1, + .numRows = 20, + .readMaxBytes = 1 << 25, + .minDataSize = 64, + .maxDataSize = 64, + .expectedOutputCalls = 1}, }; - for (const auto& testData : testSettings) { - SCOPED_TRACE(testData.debugString()); + for (const auto& config : testSettings) { + SCOPED_TRACE(config.debugString()); auto tempRootDir = velox::exec::test::TempDirectoryPath::create(); const auto testRootPath = tempRootDir->getPath(); @@ -1814,100 +1575,181 @@ TEST_F(ShuffleTest, persistentShuffleSortedEndToEnd) { writeInfo.queryId, writeInfo.shuffleId, writeInfo.numPartitions, - testData.maxBytesPerPartition, - /*sortedShuffle=*/true, + config.maxBytesPerPartition, + config.sortedShuffle, pool()); - folly::Random::DefaultGenerator rng; - rng.seed(1); - std::vector randomKeys; - randomKeys.reserve(testData.numRows); + std::vector keys; std::vector dataValues; - dataValues.reserve(testData.numRows); - - for (size_t i = 0; i < testData.numRows; ++i) { - randomKeys.push_back(static_cast(folly::Random::rand32(rng))); - - const size_t sizeRange = testData.maxDataSize - testData.minDataSize; - const size_t dataSize = testData.minDataSize + - (sizeRange > 0 ? folly::Random::rand32(rng) % sizeRange : 0); - - // Create data with index marker at the end for verification - std::string data(dataSize, static_cast('a' + (i % 26))); - data.append(fmt::format("_idx{:04d}", i)); - dataValues.push_back(std::move(data)); + keys.reserve(config.numRows); + dataValues.reserve(config.numRows); + + if (config.sortedShuffle) { + // For sorted shuffle, generate random keys and include index markers for + // verification + folly::Random::DefaultGenerator rng; + rng.seed(1); + for (size_t i = 0; i < config.numRows; ++i) { + keys.push_back(static_cast(folly::Random::rand32(rng))); + const size_t sizeRange = config.maxDataSize - config.minDataSize; + const size_t dataSize = config.minDataSize + + (sizeRange > 0 ? folly::Random::rand32(rng) % sizeRange : 0); + std::string data(dataSize, static_cast('a' + (i % 26))); + data.append(fmt::format("_idx{:04d}", i)); + dataValues.push_back(std::move(data)); + } + } else { + // For unsorted shuffle, use sequential keys + for (size_t i = 0; i < config.numRows; ++i) { + keys.push_back(i); + dataValues.push_back(std::string(config.minDataSize, 'a' + i % 26)); + } } - for (size_t i = 0; i < randomKeys.size(); ++i) { - int32_t keyBigEndian = folly::Endian::big(randomKeys[i]); - std::string_view keyBytes( - reinterpret_cast(&keyBigEndian), kUint32Size); - writer->collect(partition, keyBytes, dataValues[i]); + + for (size_t i = 0; i < keys.size(); ++i) { + if (config.sortedShuffle) { + int32_t keyBigEndian = folly::Endian::big(keys[i]); + std::string_view keyBytes( + reinterpret_cast(&keyBigEndian), kUint32Size); + writer->collect(partition, keyBytes, dataValues[i]); + } else { + writer->collect(partition, std::string_view{}, dataValues[i]); + } } writer->noMoreData(true); LocalShuffleReadInfo readInfo = LocalShuffleReadInfo::deserialize( - localShuffleReadInfo(testRootPath, numPartitions, partition)); + localShuffleReadInfo(testRootPath, partition, config.sortedShuffle)); auto reader = std::make_shared( readInfo.rootPath, readInfo.queryId, readInfo.partitionIds, - /*sortedShuffle=*/true, + config.sortedShuffle, pool()); reader->initialize(); - size_t count = 0; + size_t totalRows = 0; + int numOutputCalls = 0; std::vector readDataValues; while (true) { - auto batches = reader->next(testData.readMaxBytes) + auto batches = reader->next(config.readMaxBytes) .via(folly::getGlobalCPUExecutor()) .get(); if (batches.empty()) { break; } + ++numOutputCalls; for (const auto& batch : batches) { for (const auto& row : batch->rows) { readDataValues.emplace_back(row.data(), row.size()); - ++count; + ++totalRows; } } } - EXPECT_EQ(randomKeys.size(), count); + EXPECT_EQ(config.numRows, totalRows); - // Get the sorted order of original keys using getSortOrder - std::vector keys; - keys.reserve(randomKeys.size()); - for (const auto& key : randomKeys) { - int32_t keyBigEndian = folly::Endian::big(key); - keys.emplace_back( - reinterpret_cast(&keyBigEndian), sizeof(int32_t)); + if (config.sortedShuffle) { + // Verify data came back in sorted order + std::vector sortKeys; + sortKeys.reserve(keys.size()); + for (const auto& key : keys) { + int32_t keyBigEndian = folly::Endian::big(key); + sortKeys.emplace_back( + reinterpret_cast(&keyBigEndian), sizeof(int32_t)); + } + auto sortedOrder = getSortOrder(sortKeys); + + for (size_t i = 0; i < readDataValues.size(); ++i) { + const std::string& dataValue = readDataValues[i]; + size_t idxPos = dataValue.find("_idx"); + ASSERT_NE(idxPos, std::string::npos) + << "Data value at position " << i << " missing '_idx' marker: '" + << dataValue << "'"; + size_t originalIdx = std::stoul(dataValue.substr(idxPos + 4)); + EXPECT_EQ(originalIdx, sortedOrder[i]) + << "Data at position " << i << " should correspond to key at index " + << sortedOrder[i] << " but corresponds to index " << originalIdx; + } } - auto sortedOrder = getSortOrder(keys); - - // Verify data appears in sorted key order - for (size_t i = 0; i < readDataValues.size(); ++i) { - // Extract original index from data value (format: [chars]_idx0000) - const std::string& dataValue = readDataValues[i]; - size_t idxPos = dataValue.find("_idx"); - ASSERT_NE(idxPos, std::string::npos) - << "Data value at position " << i << " missing '_idx' marker: '" - << dataValue << "'"; - - size_t originalIdx = std::stoul(dataValue.substr(idxPos + 4)); - - // The data at position i should correspond to the key at sortedOrder[i] - EXPECT_EQ(originalIdx, sortedOrder[i]) - << "Data at position " << i << " should correspond to key at index " - << sortedOrder[i] << " but corresponds to index " << originalIdx; + + if (config.expectedOutputCalls.has_value()) { + EXPECT_EQ(numOutputCalls, config.expectedOutputCalls.value()) + << "Expected " << config.expectedOutputCalls.value() + << " output calls but got " << numOutputCalls + << " (readMaxBytes=" << config.readMaxBytes << ")"; } + reader->noMoreData(true); cleanupDirectory(testRootPath); } } +TEST_F(ShuffleTest, shuffleFuzzTest) { + fuzzerTest(false, 1); + fuzzerTest(false, 3); + fuzzerTest(false, 7); + + // Test With ReplicateNullsAndAny + fuzzerTest(true, 1); + fuzzerTest(true, 3); + fuzzerTest(true, 7); +} + +TEST_F(ShuffleTest, shuffleWriterToString) { + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + makeFlatVector(1'000, [](auto row) { return row * 10; }), + }); + + auto plan = exec::test::PlanBuilder() + .values({data}, true) + .addNode(addPartitionAndSerializeNode(4, false)) + .localPartition(std::vector{}) + .addNode(addShuffleWriteNode( + 4, + std::string(shuffleName_), + localShuffleWriteInfo(tempDir_->getPath(), 10))) + .planNode(); + + ASSERT_EQ(plan->toString(false, false), "-- ShuffleWrite[3]\n"); + ASSERT_EQ( + plan->toString(true, false), + "-- ShuffleWrite[3][4, local]" + " -> partition:INTEGER, key:VARBINARY, data:VARBINARY\n"); +} + +TEST_F(ShuffleTest, partitionAndSerializeToString) { + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + makeFlatVector(1'000, [](auto row) { return row * 10; }), + }); + + auto plan = exec::test::PlanBuilder() + .values({data}, true) + .addNode(addPartitionAndSerializeNode(4, false)) + .planNode(); + + ASSERT_EQ(plan->toString(false, false), "-- PartitionAndSerialize[1]\n"); + ASSERT_EQ( + plan->toString(true, false), + "-- PartitionAndSerialize[1][(c0) 4 HASH(c0) ROW]" + " -> partition:INTEGER, key:VARBINARY, data:VARBINARY\n"); + + plan = exec::test::PlanBuilder() + .values({data}, true) + .addNode(addPartitionAndSerializeNode(4, true)) + .planNode(); + + ASSERT_EQ(plan->toString(false, false), "-- PartitionAndSerialize[1]\n"); + ASSERT_EQ( + plan->toString(true, false), + "-- PartitionAndSerialize[1][(c0) 4 HASH(c0) ROW]" + " -> partition:INTEGER, key:VARBINARY, data:VARBINARY, replicate:BOOLEAN\n"); +} } // namespace facebook::presto::operators::test int main(int argc, char** argv) {