Skip to content

Commit 7f74716

Browse files
committed
Add IcebergConnector
1 parent 7576f4e commit 7f74716

File tree

9 files changed

+265
-33
lines changed

9 files changed

+265
-33
lines changed

velox/connectors/hive/HiveConnector.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include "velox/connectors/hive/HiveDataSink.h"
2121
#include "velox/connectors/hive/HiveDataSource.h"
2222
#include "velox/connectors/hive/HivePartitionFunction.h"
23-
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
2423

2524
#include <boost/lexical_cast.hpp>
2625
#include <memory>
@@ -74,16 +73,6 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
7473
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
7574
ConnectorQueryCtx* connectorQueryCtx,
7675
CommitStrategy commitStrategy) {
77-
if (auto icebergInsertHandle =
78-
std::dynamic_pointer_cast<const iceberg::IcebergInsertTableHandle>(
79-
connectorInsertTableHandle)) {
80-
return std::make_unique<iceberg::IcebergDataSink>(
81-
inputType,
82-
icebergInsertHandle,
83-
connectorQueryCtx,
84-
commitStrategy,
85-
hiveConfig_);
86-
}
8776
auto hiveInsertHandle =
8877
std::dynamic_pointer_cast<const HiveInsertTableHandle>(
8978
connectorInsertTableHandle);

velox/connectors/hive/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
velox_add_library(
1616
velox_hive_iceberg_splitreader
17+
IcebergConnector.cpp
1718
IcebergDataSink.cpp
1819
IcebergPartitionName.cpp
1920
IcebergSplit.cpp
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/hive/iceberg/IcebergConnector.h"
18+
19+
#include "velox/connectors/hive/HiveDataSource.h"
20+
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
namespace {
25+
26+
// Registers Iceberg partition transform functions with prefix.
27+
// NOTE: These functions are registered for internal transform usage only.
28+
// Upstream engines such as Prestissimo and Gluten should register the same
29+
// functions with different prefixes to avoid conflicts.
30+
void registerIcebergInternalFunctions(const std::string_view& prefix) {
31+
static std::once_flag registerFlag;
32+
33+
std::call_once(registerFlag, [prefix]() {
34+
functions::iceberg::registerFunctions(std::string(prefix));
35+
});
36+
}
37+
38+
} // namespace
39+
40+
IcebergConnector::IcebergConnector(
41+
const std::string& id,
42+
std::shared_ptr<const config::ConfigBase> config,
43+
folly::Executor* ioExecutor)
44+
: Connector(id, std::move(config)),
45+
hiveConfig_(std::make_shared<HiveConfig>(connectorConfig())),
46+
fileHandleFactory_(
47+
hiveConfig_->isFileHandleCacheEnabled()
48+
? std::make_unique<SimpleLRUCache<FileHandleKey, FileHandle>>(
49+
hiveConfig_->numCacheFileHandles())
50+
: nullptr,
51+
std::make_unique<FileHandleGenerator>(hiveConfig_->config())),
52+
ioExecutor_(ioExecutor) {
53+
registerIcebergInternalFunctions(kDefaultIcebergFunctionPrefix);
54+
if (hiveConfig_->isFileHandleCacheEnabled()) {
55+
LOG(INFO) << "Iceberg connector " << connectorId()
56+
<< " created with maximum of "
57+
<< hiveConfig_->numCacheFileHandles()
58+
<< " cached file handles with expiration of "
59+
<< hiveConfig_->fileHandleExpirationDurationMs() << "ms.";
60+
} else {
61+
LOG(INFO) << "Iceberg connector " << connectorId()
62+
<< " created with file handle cache disabled";
63+
}
64+
}
65+
66+
std::unique_ptr<DataSource> IcebergConnector::createDataSource(
67+
const RowTypePtr& outputType,
68+
const ConnectorTableHandlePtr& tableHandle,
69+
const std::unordered_map<std::string, ColumnHandlePtr>& columnHandles,
70+
ConnectorQueryCtx* connectorQueryCtx) {
71+
return std::make_unique<HiveDataSource>(
72+
outputType,
73+
tableHandle,
74+
columnHandles,
75+
&fileHandleFactory_,
76+
ioExecutor_,
77+
connectorQueryCtx,
78+
hiveConfig_);
79+
}
80+
81+
std::unique_ptr<DataSink> IcebergConnector::createDataSink(
82+
RowTypePtr inputType,
83+
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
84+
ConnectorQueryCtx* connectorQueryCtx,
85+
CommitStrategy commitStrategy) {
86+
auto icebergInsertHandle =
87+
std::dynamic_pointer_cast<const IcebergInsertTableHandle>(
88+
connectorInsertTableHandle);
89+
VELOX_USER_CHECK_NOT_NULL(
90+
icebergInsertHandle,
91+
"Iceberg connector expecting IcebergInsertTableHandle!");
92+
93+
return std::make_unique<IcebergDataSink>(
94+
inputType,
95+
icebergInsertHandle,
96+
connectorQueryCtx,
97+
commitStrategy,
98+
hiveConfig_);
99+
}
100+
101+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "velox/connectors/Connector.h"
19+
#include "velox/connectors/hive/FileHandle.h"
20+
#include "velox/connectors/hive/HiveConfig.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
/// Provides Iceberg table format support.
25+
/// - Creates HiveDataSource instances that use IcebergSplitReader for reading
26+
/// Iceberg tables with support for delete files and schema evolution.
27+
/// - Creates IcebergDataSink instances for writing data with Iceberg-specific
28+
/// partition transforms and commit metadata.
29+
class IcebergConnector final : public Connector {
30+
public:
31+
IcebergConnector(
32+
const std::string& id,
33+
std::shared_ptr<const config::ConfigBase> config,
34+
folly::Executor* ioExecutor);
35+
36+
bool canAddDynamicFilter() const override {
37+
return true;
38+
}
39+
40+
/// Creates a data source for reading Iceberg tables. Returns a HiveDataSource
41+
/// that will use IcebergSplitReader when processing splits marked with
42+
/// table_format="hive-iceberg" in customSplitInfo.
43+
std::unique_ptr<DataSource> createDataSource(
44+
const RowTypePtr& outputType,
45+
const ConnectorTableHandlePtr& tableHandle,
46+
const ColumnHandleMap& columnHandles,
47+
ConnectorQueryCtx* connectorQueryCtx) override;
48+
49+
bool supportsSplitPreload() const override {
50+
return true;
51+
}
52+
53+
/// Creates a data sink for writing to Iceberg tables. Always returns an
54+
/// IcebergDataSink that handles Iceberg-specific partition transforms and
55+
/// generates commit metadata in the format expected by Iceberg catalogs.
56+
///
57+
/// @param inputType The schema of the input data to write
58+
/// @param connectorInsertTableHandle Must be an IcebergInsertTableHandle
59+
/// containing Iceberg-specific write configuration
60+
/// @param connectorQueryCtx Query context for the write operation
61+
/// @param commitStrategy Strategy for committing the write operation
62+
/// @return IcebergDataSink instance configured for the write operation
63+
/// @throws VeloxUserError if connectorInsertTableHandle is not an
64+
/// IcebergInsertTableHandle
65+
std::unique_ptr<DataSink> createDataSink(
66+
RowTypePtr inputType,
67+
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
68+
ConnectorQueryCtx* connectorQueryCtx,
69+
CommitStrategy commitStrategy) override;
70+
71+
folly::Executor* ioExecutor() const override {
72+
return ioExecutor_;
73+
}
74+
75+
FileHandleCacheStats fileHandleCacheStats() {
76+
return fileHandleFactory_.cacheStats();
77+
}
78+
79+
// NOTE: Clearing the file handle cache might affect performance, so this
80+
// is intended for operational purposes only.
81+
FileHandleCacheStats clearFileHandleCache() {
82+
return fileHandleFactory_.clearCache();
83+
}
84+
85+
private:
86+
const std::shared_ptr<HiveConfig> hiveConfig_;
87+
FileHandleFactory fileHandleFactory_;
88+
folly::Executor* ioExecutor_;
89+
};
90+
91+
/// Factory for creating IcebergConnector instances.
92+
class IcebergConnectorFactory final : public ConnectorFactory {
93+
public:
94+
static constexpr const char* kIcebergConnectorName = "iceberg";
95+
96+
IcebergConnectorFactory() : ConnectorFactory(kIcebergConnectorName) {}
97+
98+
std::shared_ptr<Connector> newConnector(
99+
const std::string& id,
100+
std::shared_ptr<const config::ConfigBase> config,
101+
folly::Executor* ioExecutor = nullptr,
102+
folly::Executor* cpuExecutor = nullptr) override {
103+
return std::make_shared<IcebergConnector>(id, config, ioExecutor);
104+
}
105+
};
106+
107+
} // namespace facebook::velox::connector::hive::iceberg

velox/connectors/hive/iceberg/IcebergDataSink.cpp

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,6 @@
2222

2323
namespace facebook::velox::connector::hive::iceberg {
2424

25-
static constexpr std::string_view kDefaultIcebergFunctionPrefix{
26-
"$internal$.iceberg."};
27-
28-
void registerIcebergInternalFunctions(const std::string_view& prefix) {
29-
static std::once_flag registerFlag;
30-
31-
std::call_once(registerFlag, [prefix]() {
32-
functions::iceberg::registerFunctions(std::string(prefix));
33-
});
34-
}
35-
3625
IcebergInsertTableHandle::IcebergInsertTableHandle(
3726
std::vector<HiveColumnHandlePtr> inputColumns,
3827
LocationHandlePtr locationHandle,
@@ -208,9 +197,7 @@ IcebergDataSink::IcebergDataSink(
208197
partitionSpec_ != nullptr
209198
? std::make_unique<IcebergPartitionName>(partitionSpec_)
210199
: nullptr),
211-
partitionRowType_(std::move(partitionRowType)) {
212-
registerIcebergInternalFunctions(std::string(kDefaultIcebergFunctionPrefix));
213-
}
200+
partitionRowType_(std::move(partitionRowType)) {}
214201

215202
std::vector<std::string> IcebergDataSink::commitMessage() const {
216203
std::vector<std::string> commitTasks;

velox/connectors/hive/iceberg/IcebergDataSink.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,8 @@
2424

2525
namespace facebook::velox::connector::hive::iceberg {
2626

27-
/// Registers Iceberg partition transform functions with prefix.
28-
/// NOTE: These functions are registered for internal transform usage only.
29-
/// Upstream engines such as Prestissimo and Gluten should register the same
30-
/// functions with different prefixes to avoid conflicts.
31-
void registerIcebergInternalFunctions(const std::string_view& prefix);
27+
static constexpr std::string_view kDefaultIcebergFunctionPrefix{
28+
"$internal$.iceberg."};
3229

3330
/// Represents a request for Iceberg write.
3431
class IcebergInsertTableHandle final : public HiveInsertTableHandle {

velox/connectors/hive/iceberg/tests/IcebergInsertTest.cpp

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17+
#include "velox/connectors/hive/iceberg/IcebergConnector.h"
1718
#include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h"
1819
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
1920
#include "velox/exec/tests/utils/PlanBuilder.h"
@@ -36,9 +37,24 @@ class IcebergInsertTest : public test::IcebergTestBase {
3637

3738
auto splits = createSplitsForDirectory(dataPath);
3839
ASSERT_EQ(splits.size(), commitTasks.size());
39-
auto plan = exec::test::PlanBuilder().tableScan(rowType).planNode();
40+
auto plan = exec::test::PlanBuilder()
41+
.startTableScan()
42+
.connectorId(test::kIcebergConnectorId)
43+
.outputType(rowType)
44+
.endTableScan()
45+
.planNode();
4046
exec::test::AssertQueryBuilder(plan).splits(splits).assertResults(vectors);
4147
}
48+
49+
static void resetIcebergConnector(
50+
const std::shared_ptr<const config::ConfigBase>& config) {
51+
unregisterConnector(test::kIcebergConnectorId);
52+
53+
IcebergConnectorFactory factory;
54+
auto icebergConnector =
55+
factory.newConnector(test::kIcebergConnectorId, config);
56+
registerConnector(icebergConnector);
57+
}
4258
};
4359

4460
TEST_F(IcebergInsertTest, basic) {
@@ -72,5 +88,26 @@ TEST_F(IcebergInsertTest, bigDecimal) {
7288
}
7389
#endif
7490

91+
TEST_F(IcebergInsertTest, connectorConfiguration) {
92+
auto customConfig = std::make_shared<config::ConfigBase>(
93+
std::unordered_map<std::string, std::string>{
94+
{"file-handle-cache-enabled", "true"},
95+
{"cache-max-file-handles", "1000"}});
96+
97+
resetIcebergConnector(customConfig);
98+
99+
auto rowType = ROW({"c1", "c2"}, {INTEGER(), VARCHAR()});
100+
test(rowType);
101+
}
102+
103+
TEST_F(IcebergInsertTest, connectorProperties) {
104+
auto icebergConnector = getConnector(test::kIcebergConnectorId);
105+
ASSERT_NE(icebergConnector, nullptr);
106+
107+
ASSERT_TRUE(icebergConnector->canAddDynamicFilter());
108+
ASSERT_TRUE(icebergConnector->supportsSplitPreload());
109+
ASSERT_NE(icebergConnector->ioExecutor(), nullptr);
110+
}
111+
75112
} // namespace
76113
} // namespace facebook::velox::connector::hive::iceberg

velox/connectors/hive/iceberg/tests/IcebergTestBase.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h"
1818

1919
#include <filesystem>
20-
20+
#include "velox/connectors/hive/iceberg/IcebergConnector.h"
2121
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
2222
#include "velox/connectors/hive/iceberg/PartitionSpec.h"
2323
#include "velox/expression/Expr.h"
@@ -36,6 +36,15 @@ void IcebergTestBase::SetUp() {
3636
functions::iceberg::registerFunctions(
3737
std::string(kDefaultTestIcebergFunctionNamePrefix));
3838

39+
// Register IcebergConnector.
40+
IcebergConnectorFactory icebergFactory;
41+
auto icebergConnector = icebergFactory.newConnector(
42+
kIcebergConnectorId,
43+
std::make_shared<config::ConfigBase>(
44+
std::unordered_map<std::string, std::string>()),
45+
ioExecutor_.get());
46+
registerConnector(icebergConnector);
47+
3948
connectorSessionProperties_ = std::make_shared<config::ConfigBase>(
4049
std::unordered_map<std::string, std::string>(), true);
4150

@@ -57,6 +66,8 @@ void IcebergTestBase::TearDown() {
5766
opPool_.reset();
5867
root_.reset();
5968
queryCtx_.reset();
69+
// Unregister the IcebergConnector registered in SetUp().
70+
unregisterConnector(kIcebergConnectorId);
6071
HiveConnectorTestBase::TearDown();
6172
}
6273

@@ -232,7 +243,7 @@ IcebergTestBase::createSplitsForDirectory(const std::string& directory) {
232243
->openFileForRead(filePath);
233244
splits.push_back(
234245
std::make_shared<HiveIcebergSplit>(
235-
exec::test::kHiveConnectorId,
246+
kIcebergConnectorId,
236247
filePath,
237248
fileFormat_,
238249
0,

velox/connectors/hive/iceberg/tests/IcebergTestBase.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ namespace facebook::velox::connector::hive::iceberg::test {
3232
constexpr std::string_view kDefaultTestIcebergFunctionNamePrefix{
3333
"$internal$.test_iceberg."};
3434

35+
static const std::string kIcebergConnectorId = "test-iceberg";
36+
3537
struct PartitionField {
3638
// 0-based column index.
3739
int32_t id;

0 commit comments

Comments
 (0)