Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"

#include <boost/lexical_cast.hpp>
#include <memory>
Expand Down Expand Up @@ -74,16 +73,6 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) {
if (auto icebergInsertHandle =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.

std::dynamic_pointer_cast<const iceberg::IcebergInsertTableHandle>(
connectorInsertTableHandle)) {
return std::make_unique<iceberg::IcebergDataSink>(
inputType,
icebergInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
}
auto hiveInsertHandle =
std::dynamic_pointer_cast<const HiveInsertTableHandle>(
connectorInsertTableHandle);
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class HiveConnectorFactory : public ConnectorFactory {
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
[[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<HiveConnector>(id, config, ioExecutor);
}
};
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

velox_add_library(
velox_hive_iceberg_splitreader
IcebergConnector.cpp
IcebergDataSink.cpp
IcebergPartitionName.cpp
IcebergSplit.cpp
Expand Down
69 changes: 69 additions & 0 deletions velox/connectors/hive/iceberg/IcebergConnector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/iceberg/IcebergConnector.h"

#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"

namespace facebook::velox::connector::hive::iceberg {

namespace {

// Registers Iceberg partition transform functions with prefix.
// NOTE: These functions are registered for internal transform usage only.
// Upstream engines such as Prestissimo and Gluten should register the same
// functions with different prefixes to avoid conflicts.
void registerIcebergInternalFunctions(const std::string& prefix) {
static std::once_flag registerFlag;

std::call_once(registerFlag, [prefix]() {
functions::iceberg::registerFunctions(prefix);
});
}

} // namespace

IcebergConnector::IcebergConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* ioExecutor)
: HiveConnector(id, config, ioExecutor),
functionPrefix_(config->get<std::string>(
std::string(kIcebergFunctionPrefixConfig),
std::string(kDefaultIcebergFunctionPrefix))) {
registerIcebergInternalFunctions(functionPrefix_);
}

std::unique_ptr<DataSink> IcebergConnector::createDataSink(
RowTypePtr inputType,
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) {
auto icebergInsertHandle =
checked_pointer_cast<const IcebergInsertTableHandle>(
connectorInsertTableHandle);

return std::make_unique<IcebergDataSink>(
inputType,
icebergInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_,
functionPrefix_);
}

} // namespace facebook::velox::connector::hive::iceberg
90 changes: 90 additions & 0 deletions velox/connectors/hive/iceberg/IcebergConnector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/connectors/hive/HiveConnector.h"

namespace facebook::velox::connector::hive::iceberg {

/// TODO Add IcebergConfig class and Move these configuration properties to
/// IcebergConfig.h
static constexpr std::string_view kIcebergFunctionPrefixConfig{
"presto.iceberg-namespace"};
static constexpr std::string_view kDefaultIcebergFunctionPrefix{
"$internal$.iceberg."};

/// Provides Iceberg table format support.
/// - Creates HiveDataSource instances that use IcebergSplitReader for reading
/// Iceberg tables with support for delete files and schema evolution.
/// - Creates IcebergDataSink instances for writing data with Iceberg-specific
/// partition transforms and commit metadata.
class IcebergConnector final : public HiveConnector {
public:
IcebergConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* ioExecutor);

/// Creates IcebergDataSink for writing to Iceberg tables.
///
/// @param inputType The schema of the input data to write.
/// @param connectorInsertTableHandle Must be an IcebergInsertTableHandle
/// containing Iceberg-specific write configuration.
/// @param connectorQueryCtx Query context for the write operation.
/// @param commitStrategy Strategy for committing the write operation. Only
/// CommitStrategy::kNoCommit is supported for Iceberg tables. Files
/// are written directly with their final names and commit metadata is
/// returned for the coordinator to update the Iceberg metadata tables.
/// @return IcebergDataSink instance configured for the write operation.
std::unique_ptr<DataSink> createDataSink(
RowTypePtr inputType,
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) override;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Iceberg have support for all commit strategies? If not, let's clarify what is supported.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, only kNoCommit is supported.


private:
const std::string functionPrefix_;
};

class IcebergConnectorFactory final : public ConnectorFactory {
public:
static constexpr const char* kIcebergConnectorName = "iceberg";

IcebergConnectorFactory() : ConnectorFactory(kIcebergConnectorName) {}

/// Creates a new IcebergConnector instance.
///
/// @param id Unique identifier for this connector instance (typically the
/// catalog name).
/// @param config Connector configuration properties
/// @param ioExecutor Optional executor for asynchronous I/O operations such
/// as split preloading and file prefetching. When provided, enables
/// background file operations off the main driver thread. If nullptr, I/O
/// operations run synchronously.
/// @param cpuExecutor ConnectorFactory interface to support other connector
/// types that may need CPU-bound async work. Currently unused by
/// IcebergConnector.
/// @return Shared pointer to the newly created IcebergConnector instance
std::shared_ptr<Connector> newConnector(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you document this API? Specifically, what are ioExecutor and cpuExecutor are for and why they are optional?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Sure. cpuExecutor is not used by Hive and Iceberg connector.

const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* ioExecutor = nullptr,
[[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<IcebergConnector>(id, config, ioExecutor);
}
};

} // namespace facebook::velox::connector::hive::iceberg
41 changes: 15 additions & 26 deletions velox/connectors/hive/iceberg/IcebergDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,6 @@

namespace facebook::velox::connector::hive::iceberg {

static constexpr std::string_view kDefaultIcebergFunctionPrefix{
"$internal$.iceberg."};

void registerIcebergInternalFunctions(const std::string_view& prefix) {
static std::once_flag registerFlag;

std::call_once(registerFlag, [prefix]() {
functions::iceberg::registerFunctions(std::string(prefix));
});
}

IcebergInsertTableHandle::IcebergInsertTableHandle(
std::vector<HiveColumnHandlePtr> inputColumns,
LocationHandlePtr locationHandle,
Expand Down Expand Up @@ -148,7 +137,8 @@ IcebergDataSink::IcebergDataSink(
IcebergInsertTableHandlePtr insertTableHandle,
const ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy,
const std::shared_ptr<const HiveConfig>& hiveConfig)
const std::shared_ptr<const HiveConfig>& hiveConfig,
const std::string& functionPrefix)
: IcebergDataSink(
std::move(inputType),
insertTableHandle,
Expand All @@ -158,7 +148,8 @@ IcebergDataSink::IcebergDataSink(
createPartitionChannels(
insertTableHandle->inputColumns(),
insertTableHandle->partitionSpec()),
createPartitionRowType(insertTableHandle->partitionSpec())) {}
createPartitionRowType(insertTableHandle->partitionSpec()),
functionPrefix) {}

IcebergDataSink::IcebergDataSink(
RowTypePtr inputType,
Expand All @@ -167,7 +158,8 @@ IcebergDataSink::IcebergDataSink(
CommitStrategy commitStrategy,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const std::vector<column_index_t>& partitionChannels,
RowTypePtr partitionRowType)
RowTypePtr partitionRowType,
const std::string& functionPrefix)
: HiveDataSink(
inputType,
insertTableHandle,
Expand Down Expand Up @@ -195,22 +187,19 @@ IcebergDataSink::IcebergDataSink(
: nullptr),
partitionSpec_(insertTableHandle->partitionSpec()),
transformEvaluator_(
!partitionChannels.empty()
? std::make_unique<TransformEvaluator>(
TransformExprBuilder::toExpressions(
partitionSpec_,
partitionChannels_,
inputType_,
std::string(kDefaultIcebergFunctionPrefix)),
connectorQueryCtx_)
: nullptr),
!partitionChannels.empty() ? std::make_unique<TransformEvaluator>(
TransformExprBuilder::toExpressions(
partitionSpec_,
partitionChannels_,
inputType_,
functionPrefix),
connectorQueryCtx_)
: nullptr),
icebergPartitionName_(
partitionSpec_ != nullptr
? std::make_unique<IcebergPartitionName>(partitionSpec_)
: nullptr),
partitionRowType_(std::move(partitionRowType)) {
registerIcebergInternalFunctions(std::string(kDefaultIcebergFunctionPrefix));
}
partitionRowType_(std::move(partitionRowType)) {}

std::vector<std::string> IcebergDataSink::commitMessage() const {
std::vector<std::string> commitTasks;
Expand Down
12 changes: 4 additions & 8 deletions velox/connectors/hive/iceberg/IcebergDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@

namespace facebook::velox::connector::hive::iceberg {

/// Registers Iceberg partition transform functions with prefix.
/// NOTE: These functions are registered for internal transform usage only.
/// Upstream engines such as Prestissimo and Gluten should register the same
/// functions with different prefixes to avoid conflicts.
void registerIcebergInternalFunctions(const std::string_view& prefix);

/// Represents a request for Iceberg write.
class IcebergInsertTableHandle final : public HiveInsertTableHandle {
public:
Expand Down Expand Up @@ -93,7 +87,8 @@ class IcebergDataSink : public HiveDataSink {
IcebergInsertTableHandlePtr insertTableHandle,
const ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy,
const std::shared_ptr<const HiveConfig>& hiveConfig);
const std::shared_ptr<const HiveConfig>& hiveConfig,
const std::string& functionPrefix);

/// Generates Iceberg-specific commit messages for all writers containing
/// metadata about written files. Creates a JSON object for each writer
Expand Down Expand Up @@ -125,7 +120,8 @@ class IcebergDataSink : public HiveDataSink {
CommitStrategy commitStrategy,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const std::vector<column_index_t>& partitionChannels,
RowTypePtr partitionRowType);
RowTypePtr partitionRowType,
const std::string& functionPrefix);

void computePartitionAndBucketIds(const RowVectorPtr& input) override;

Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/iceberg/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ if(NOT VELOX_DISABLE_GOOGLETEST)

add_executable(
velox_hive_iceberg_insert_test
IcebergConnectorTest.cpp
IcebergInsertTest.cpp
IcebergTestBase.cpp
Main.cpp
Expand Down
70 changes: 70 additions & 0 deletions velox/connectors/hive/iceberg/tests/IcebergConnectorTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/iceberg/IcebergConnector.h"
#include <gtest/gtest.h>
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h"

namespace facebook::velox::connector::hive::iceberg {

namespace {

class IcebergConnectorTest : public test::IcebergTestBase {
protected:
static void resetIcebergConnector(
const std::shared_ptr<const config::ConfigBase>& config) {
unregisterConnector(test::kIcebergConnectorId);

IcebergConnectorFactory factory;
auto icebergConnector =
factory.newConnector(test::kIcebergConnectorId, config);
registerConnector(icebergConnector);
}
};

TEST_F(IcebergConnectorTest, connectorConfiguration) {
auto customConfig = std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>{
{hive::HiveConfig::kEnableFileHandleCache, "true"},
{hive::HiveConfig::kNumCacheFileHandles, "1000"}});

resetIcebergConnector(customConfig);

// Verify connector was registered successfully with custom config.
auto icebergConnector = getConnector(test::kIcebergConnectorId);
ASSERT_NE(icebergConnector, nullptr);

auto config = icebergConnector->connectorConfig();
ASSERT_NE(config, nullptr);

hive::HiveConfig hiveConfig(config);
ASSERT_TRUE(hiveConfig.isFileHandleCacheEnabled());
ASSERT_EQ(hiveConfig.numCacheFileHandles(), 1000);
}

TEST_F(IcebergConnectorTest, connectorProperties) {
auto icebergConnector = getConnector(test::kIcebergConnectorId);
ASSERT_NE(icebergConnector, nullptr);

ASSERT_TRUE(icebergConnector->canAddDynamicFilter());
ASSERT_TRUE(icebergConnector->supportsSplitPreload());
ASSERT_NE(icebergConnector->ioExecutor(), nullptr);
}

} // namespace

} // namespace facebook::velox::connector::hive::iceberg
Loading
Loading