1717#include < boost/uuid/uuid_io.hpp>
1818#include " presto_cpp/main/operators/BroadcastFile.h"
1919#include " velox/common/file/FileSystems.h"
20+ #include " velox/exec/OperatorUtils.h"
2021
2122using namespace facebook ::velox::exec;
2223using namespace facebook ::velox;
@@ -31,22 +32,7 @@ velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) {
3132 return obj[" id" ].asString ();
3233}
3334
34- // TODO: This is a copy from Exchange.cpp. We should refactor
35- // such that this method is globally accessible from a single location. This is
36- // to prevent diverges of serde options during write and read.
37- std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions (
38- const core::QueryConfig& queryConfig,
39- VectorSerde::Kind kind) {
40- std::unique_ptr<VectorSerde::Options> options =
41- kind == VectorSerde::Kind::kPresto
42- ? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
43- : std::make_unique<VectorSerde::Options>();
44- options->compressionKind =
45- common::stringToCompressionKind (queryConfig.shuffleCompressionKind ());
46- return options;
47- }
48-
49- // BroadcastWriteOperator writes input RowVectors to specified file.
35+ // / BroadcastWriteOperator writes input RowVectors to specified file.
5036class BroadcastWriteOperator : public Operator {
5137 public:
5238 BroadcastWriteOperator (
@@ -73,7 +59,10 @@ class BroadcastWriteOperator : public Operator {
7359 fmt::format (" {}/file_broadcast_{}" , basePath, makeUuid ()),
7460 planNode->maxBroadcastBytes (),
7561 8 << 20 ,
76- getVectorSerdeOptions (ctx->queryConfig (), VectorSerde::Kind::kPresto ),
62+ getVectorSerdeOptions (
63+ common::stringToCompressionKind (
64+ ctx->queryConfig ().shuffleCompressionKind ()),
65+ VectorSerde::Kind::kPresto ),
7766 operatorCtx_->pool ());
7867 }
7968
0 commit comments