Skip to content

Commit e967787

Browse files
committed
Add IcebergConnector
1 parent 7576f4e commit e967787

14 files changed

+278
-57
lines changed

velox/connectors/hive/HiveConnector.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include "velox/connectors/hive/HiveDataSink.h"
2121
#include "velox/connectors/hive/HiveDataSource.h"
2222
#include "velox/connectors/hive/HivePartitionFunction.h"
23-
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
2423

2524
#include <boost/lexical_cast.hpp>
2625
#include <memory>
@@ -74,16 +73,6 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
7473
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
7574
ConnectorQueryCtx* connectorQueryCtx,
7675
CommitStrategy commitStrategy) {
77-
if (auto icebergInsertHandle =
78-
std::dynamic_pointer_cast<const iceberg::IcebergInsertTableHandle>(
79-
connectorInsertTableHandle)) {
80-
return std::make_unique<iceberg::IcebergDataSink>(
81-
inputType,
82-
icebergInsertHandle,
83-
connectorQueryCtx,
84-
commitStrategy,
85-
hiveConfig_);
86-
}
8776
auto hiveInsertHandle =
8877
std::dynamic_pointer_cast<const HiveInsertTableHandle>(
8978
connectorInsertTableHandle);

velox/connectors/hive/HiveConnector.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class HiveConnectorFactory : public ConnectorFactory {
8989
const std::string& id,
9090
std::shared_ptr<const config::ConfigBase> config,
9191
folly::Executor* ioExecutor = nullptr,
92-
folly::Executor* cpuExecutor = nullptr) override {
92+
[[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override {
9393
return std::make_shared<HiveConnector>(id, config, ioExecutor);
9494
}
9595
};

velox/connectors/hive/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
velox_add_library(
1616
velox_hive_iceberg_splitreader
17+
IcebergConnector.cpp
1718
IcebergDataSink.cpp
1819
IcebergPartitionName.cpp
1920
IcebergSplit.cpp
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/hive/iceberg/IcebergConnector.h"
18+
19+
#include "velox/connectors/hive/HiveConnector.h"
20+
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
namespace {
25+
26+
// Registers Iceberg partition transform functions with prefix.
27+
// NOTE: These functions are registered for internal transform usage only.
28+
// Upstream engines such as Prestissimo and Gluten should register the same
29+
// functions with different prefixes to avoid conflicts.
30+
void registerIcebergInternalFunctions(const std::string& prefix) {
31+
static std::once_flag registerFlag;
32+
33+
std::call_once(registerFlag, [prefix]() {
34+
functions::iceberg::registerFunctions(prefix);
35+
});
36+
}
37+
38+
} // namespace
39+
40+
IcebergConnector::IcebergConnector(
41+
const std::string& id,
42+
std::shared_ptr<const config::ConfigBase> config,
43+
folly::Executor* ioExecutor)
44+
: HiveConnector(id, config, ioExecutor),
45+
functionPrefix_(config->get<std::string>(
46+
std::string(kIcebergFunctionPrefixConfig),
47+
std::string(kDefaultIcebergFunctionPrefix))) {
48+
registerIcebergInternalFunctions(functionPrefix_);
49+
}
50+
51+
std::unique_ptr<DataSink> IcebergConnector::createDataSink(
52+
RowTypePtr inputType,
53+
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
54+
ConnectorQueryCtx* connectorQueryCtx,
55+
CommitStrategy commitStrategy) {
56+
auto icebergInsertHandle =
57+
checked_pointer_cast<const IcebergInsertTableHandle>(
58+
connectorInsertTableHandle);
59+
60+
return std::make_unique<IcebergDataSink>(
61+
inputType,
62+
icebergInsertHandle,
63+
connectorQueryCtx,
64+
commitStrategy,
65+
hiveConfig_,
66+
functionPrefix_);
67+
}
68+
69+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "velox/connectors/hive/HiveConnector.h"
19+
20+
namespace facebook::velox::connector::hive::iceberg {
21+
22+
/// TODO Add IcebergConfig class and Move these configuration properties to
23+
/// IcebergConfig.h
24+
static constexpr std::string_view kIcebergFunctionPrefixConfig{
25+
"presto.iceberg-namespace"};
26+
static constexpr std::string_view kDefaultIcebergFunctionPrefix{
27+
"$internal$.iceberg."};
28+
29+
/// Provides Iceberg table format support.
30+
/// - Creates HiveDataSource instances that use IcebergSplitReader for reading
31+
/// Iceberg tables with support for delete files and schema evolution.
32+
/// - Creates IcebergDataSink instances for writing data with Iceberg-specific
33+
/// partition transforms and commit metadata.
34+
class IcebergConnector final : public HiveConnector {
35+
public:
36+
IcebergConnector(
37+
const std::string& id,
38+
std::shared_ptr<const config::ConfigBase> config,
39+
folly::Executor* ioExecutor);
40+
41+
/// Creates IcebergDataSink for writing to Iceberg tables.
42+
///
43+
/// @param inputType The schema of the input data to write.
44+
/// @param connectorInsertTableHandle Must be an IcebergInsertTableHandle
45+
/// containing Iceberg-specific write configuration.
46+
/// @param connectorQueryCtx Query context for the write operation.
47+
/// @param commitStrategy Strategy for committing the write operation. Only
48+
/// CommitStrategy::kNoCommit is supported for Iceberg tables. Files
49+
/// are written directly with their final names and commit metadata is
50+
/// returned for the coordinator to update the Iceberg metadata tables.
51+
/// @return IcebergDataSink instance configured for the write operation.
52+
std::unique_ptr<DataSink> createDataSink(
53+
RowTypePtr inputType,
54+
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
55+
ConnectorQueryCtx* connectorQueryCtx,
56+
CommitStrategy commitStrategy) override;
57+
58+
private:
59+
const std::string functionPrefix_;
60+
};
61+
62+
class IcebergConnectorFactory final : public ConnectorFactory {
63+
public:
64+
static constexpr const char* kIcebergConnectorName = "iceberg";
65+
66+
IcebergConnectorFactory() : ConnectorFactory(kIcebergConnectorName) {}
67+
68+
/// Creates a new IcebergConnector instance.
69+
///
70+
/// @param id Unique identifier for this connector instance (typically the
71+
/// catalog name).
72+
/// @param config Connector configuration properties
73+
/// @param ioExecutor Optional executor for asynchronous I/O operations such
74+
/// as split preloading and file prefetching. When provided, enables
75+
/// background file operations off the main driver thread. If nullptr, I/O
76+
/// operations run synchronously.
77+
/// @param cpuExecutor ConnectorFactory interface to support other connector
78+
/// types that may need CPU-bound async work. Currently unused by
79+
/// IcebergConnector.
80+
/// @return Shared pointer to the newly created IcebergConnector instance
81+
std::shared_ptr<Connector> newConnector(
82+
const std::string& id,
83+
std::shared_ptr<const config::ConfigBase> config,
84+
folly::Executor* ioExecutor = nullptr,
85+
[[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override {
86+
return std::make_shared<IcebergConnector>(id, config, ioExecutor);
87+
}
88+
};
89+
90+
} // namespace facebook::velox::connector::hive::iceberg

velox/connectors/hive/iceberg/IcebergDataSink.cpp

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,6 @@
2222

2323
namespace facebook::velox::connector::hive::iceberg {
2424

25-
static constexpr std::string_view kDefaultIcebergFunctionPrefix{
26-
"$internal$.iceberg."};
27-
28-
void registerIcebergInternalFunctions(const std::string_view& prefix) {
29-
static std::once_flag registerFlag;
30-
31-
std::call_once(registerFlag, [prefix]() {
32-
functions::iceberg::registerFunctions(std::string(prefix));
33-
});
34-
}
35-
3625
IcebergInsertTableHandle::IcebergInsertTableHandle(
3726
std::vector<HiveColumnHandlePtr> inputColumns,
3827
LocationHandlePtr locationHandle,
@@ -148,7 +137,8 @@ IcebergDataSink::IcebergDataSink(
148137
IcebergInsertTableHandlePtr insertTableHandle,
149138
const ConnectorQueryCtx* connectorQueryCtx,
150139
CommitStrategy commitStrategy,
151-
const std::shared_ptr<const HiveConfig>& hiveConfig)
140+
const std::shared_ptr<const HiveConfig>& hiveConfig,
141+
const std::string& functionPrefix)
152142
: IcebergDataSink(
153143
std::move(inputType),
154144
insertTableHandle,
@@ -158,7 +148,8 @@ IcebergDataSink::IcebergDataSink(
158148
createPartitionChannels(
159149
insertTableHandle->inputColumns(),
160150
insertTableHandle->partitionSpec()),
161-
createPartitionRowType(insertTableHandle->partitionSpec())) {}
151+
createPartitionRowType(insertTableHandle->partitionSpec()),
152+
functionPrefix) {}
162153

163154
IcebergDataSink::IcebergDataSink(
164155
RowTypePtr inputType,
@@ -167,7 +158,8 @@ IcebergDataSink::IcebergDataSink(
167158
CommitStrategy commitStrategy,
168159
const std::shared_ptr<const HiveConfig>& hiveConfig,
169160
const std::vector<column_index_t>& partitionChannels,
170-
RowTypePtr partitionRowType)
161+
RowTypePtr partitionRowType,
162+
const std::string& functionPrefix)
171163
: HiveDataSink(
172164
inputType,
173165
insertTableHandle,
@@ -195,22 +187,19 @@ IcebergDataSink::IcebergDataSink(
195187
: nullptr),
196188
partitionSpec_(insertTableHandle->partitionSpec()),
197189
transformEvaluator_(
198-
!partitionChannels.empty()
199-
? std::make_unique<TransformEvaluator>(
200-
TransformExprBuilder::toExpressions(
201-
partitionSpec_,
202-
partitionChannels_,
203-
inputType_,
204-
std::string(kDefaultIcebergFunctionPrefix)),
205-
connectorQueryCtx_)
206-
: nullptr),
190+
!partitionChannels.empty() ? std::make_unique<TransformEvaluator>(
191+
TransformExprBuilder::toExpressions(
192+
partitionSpec_,
193+
partitionChannels_,
194+
inputType_,
195+
functionPrefix),
196+
connectorQueryCtx_)
197+
: nullptr),
207198
icebergPartitionName_(
208199
partitionSpec_ != nullptr
209200
? std::make_unique<IcebergPartitionName>(partitionSpec_)
210201
: nullptr),
211-
partitionRowType_(std::move(partitionRowType)) {
212-
registerIcebergInternalFunctions(std::string(kDefaultIcebergFunctionPrefix));
213-
}
202+
partitionRowType_(std::move(partitionRowType)) {}
214203

215204
std::vector<std::string> IcebergDataSink::commitMessage() const {
216205
std::vector<std::string> commitTasks;

velox/connectors/hive/iceberg/IcebergDataSink.h

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,6 @@
2424

2525
namespace facebook::velox::connector::hive::iceberg {
2626

27-
/// Registers Iceberg partition transform functions with prefix.
28-
/// NOTE: These functions are registered for internal transform usage only.
29-
/// Upstream engines such as Prestissimo and Gluten should register the same
30-
/// functions with different prefixes to avoid conflicts.
31-
void registerIcebergInternalFunctions(const std::string_view& prefix);
32-
3327
/// Represents a request for Iceberg write.
3428
class IcebergInsertTableHandle final : public HiveInsertTableHandle {
3529
public:
@@ -93,7 +87,8 @@ class IcebergDataSink : public HiveDataSink {
9387
IcebergInsertTableHandlePtr insertTableHandle,
9488
const ConnectorQueryCtx* connectorQueryCtx,
9589
CommitStrategy commitStrategy,
96-
const std::shared_ptr<const HiveConfig>& hiveConfig);
90+
const std::shared_ptr<const HiveConfig>& hiveConfig,
91+
const std::string& functionPrefix);
9792

9893
/// Generates Iceberg-specific commit messages for all writers containing
9994
/// metadata about written files. Creates a JSON object for each writer
@@ -125,7 +120,8 @@ class IcebergDataSink : public HiveDataSink {
125120
CommitStrategy commitStrategy,
126121
const std::shared_ptr<const HiveConfig>& hiveConfig,
127122
const std::vector<column_index_t>& partitionChannels,
128-
RowTypePtr partitionRowType);
123+
RowTypePtr partitionRowType,
124+
const std::string& functionPrefix);
129125

130126
void computePartitionAndBucketIds(const RowVectorPtr& input) override;
131127

velox/connectors/hive/iceberg/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ if(NOT VELOX_DISABLE_GOOGLETEST)
5959

6060
add_executable(
6161
velox_hive_iceberg_insert_test
62+
IcebergConnectorTest.cpp
6263
IcebergInsertTest.cpp
6364
IcebergTestBase.cpp
6465
Main.cpp
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/hive/iceberg/IcebergConnector.h"
18+
#include <gtest/gtest.h>
19+
#include "velox/connectors/hive/HiveConfig.h"
20+
#include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
namespace {
25+
26+
class IcebergConnectorTest : public test::IcebergTestBase {
27+
protected:
28+
static void resetIcebergConnector(
29+
const std::shared_ptr<const config::ConfigBase>& config) {
30+
unregisterConnector(test::kIcebergConnectorId);
31+
32+
IcebergConnectorFactory factory;
33+
auto icebergConnector =
34+
factory.newConnector(test::kIcebergConnectorId, config);
35+
registerConnector(icebergConnector);
36+
}
37+
};
38+
39+
TEST_F(IcebergConnectorTest, connectorConfiguration) {
40+
auto customConfig = std::make_shared<config::ConfigBase>(
41+
std::unordered_map<std::string, std::string>{
42+
{hive::HiveConfig::kEnableFileHandleCache, "true"},
43+
{hive::HiveConfig::kNumCacheFileHandles, "1000"}});
44+
45+
resetIcebergConnector(customConfig);
46+
47+
// Verify connector was registered successfully with custom config.
48+
auto icebergConnector = getConnector(test::kIcebergConnectorId);
49+
ASSERT_NE(icebergConnector, nullptr);
50+
51+
auto config = icebergConnector->connectorConfig();
52+
ASSERT_NE(config, nullptr);
53+
54+
hive::HiveConfig hiveConfig(config);
55+
ASSERT_TRUE(hiveConfig.isFileHandleCacheEnabled());
56+
ASSERT_EQ(hiveConfig.numCacheFileHandles(), 1000);
57+
}
58+
59+
TEST_F(IcebergConnectorTest, connectorProperties) {
60+
auto icebergConnector = getConnector(test::kIcebergConnectorId);
61+
ASSERT_NE(icebergConnector, nullptr);
62+
63+
ASSERT_TRUE(icebergConnector->canAddDynamicFilter());
64+
ASSERT_TRUE(icebergConnector->supportsSplitPreload());
65+
ASSERT_NE(icebergConnector->ioExecutor(), nullptr);
66+
}
67+
68+
} // namespace
69+
70+
} // namespace facebook::velox::connector::hive::iceberg

0 commit comments

Comments
 (0)