Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ SystemConfig::SystemConfig() {
STR_PROP(kSpillerFileCreateConfig, ""),
STR_PROP(kSpillerDirectoryCreateConfig, ""),
NONE_PROP(kSpillerSpillPath),
STR_PROP(kBroadcasterDirectoryCreateConfig, ""),
NUM_PROP(kShutdownOnsetSec, 10),
NUM_PROP(kSystemMemoryGb, 57),
BOOL_PROP(kSystemMemPushbackEnabled, false),
Expand Down Expand Up @@ -569,6 +570,11 @@ std::string SystemConfig::spillerDirectoryCreateConfig() const {
return optionalProperty<std::string>(kSpillerDirectoryCreateConfig).value();
}

std::string SystemConfig::broadcasterDirectoryCreateConfig() const {
return optionalProperty<std::string>(kBroadcasterDirectoryCreateConfig)
.value();
}

folly::Optional<std::string> SystemConfig::spillerSpillPath() const {
return optionalProperty(kSpillerSpillPath);
}
Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ class SystemConfig : public ConfigBase {

static constexpr std::string_view kSpillerSpillPath{
"experimental.spiller-spill-path"};

/// Config used to create broadcast directories. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr std::string_view kBroadcasterDirectoryCreateConfig{
"broadcaster.directory-create-config"};

static constexpr std::string_view kShutdownOnsetSec{"shutdown-onset-sec"};

/// Memory allocation limit enforced via internal memory allocator.
Expand Down Expand Up @@ -975,6 +982,8 @@ class SystemConfig : public ConfigBase {

std::string spillerDirectoryCreateConfig() const;

std::string broadcasterDirectoryCreateConfig() const;

folly::Optional<std::string> spillerSpillPath() const;

int32_t shutdownOnsetSec() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/operators/BroadcastFile.h"
#include "velox/common/file/FileSystems.h"
#include "velox/exec/OperatorUtils.h"
Expand Down Expand Up @@ -54,7 +55,18 @@ class BroadcastWriteOperator : public Operator {
const auto& basePath = planNode->basePath();
VELOX_CHECK(!basePath.empty(), "Base path for broadcast files is empty!");
auto fileSystem = velox::filesystems::getFileSystem(basePath, nullptr);
fileSystem->mkdir(basePath);

const SystemConfig* systemConfig = SystemConfig::instance();
const std::string directoryCreateConfig =
systemConfig->broadcasterDirectoryCreateConfig();
velox::filesystems::DirectoryOptions dirOptions;
if (!directoryCreateConfig.empty()) {
dirOptions.values.insert(
{velox::filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
directoryCreateConfig});
}
fileSystem->mkdir(basePath, dirOptions);

fileBroadcastWriter_ = std::make_unique<BroadcastFileWriter>(
fmt::format("{}/file_broadcast_{}", basePath, makeUuid()),
planNode->maxBroadcastBytes(),
Expand Down
Loading