-
Notifications
You must be signed in to change notification settings - Fork 1.4k
refactor: Add Iceberg connector #15581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| /* | ||
| * Copyright (c) Facebook, Inc. and its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #include "velox/connectors/hive/iceberg/IcebergConnector.h" | ||
|
|
||
| #include <mutex> | ||
| #include <string> | ||
| #include <unordered_set> | ||
|
|
||
| #include "velox/connectors/hive/HiveConnector.h" | ||
| #include "velox/connectors/hive/iceberg/IcebergDataSink.h" | ||
|
|
||
| namespace facebook::velox::connector::hive::iceberg { | ||
|
|
||
| namespace { | ||
|
|
||
| // Registers the Iceberg partition transform functions under 'prefix', at most | ||
| // once per distinct prefix for the lifetime of the process. | ||
| // | ||
| // The prefix is read from each connector's configuration, so two connectors | ||
| // may ask for different prefixes. Guarding registration with a single | ||
| // process-wide std::once_flag would honor only the first prefix and silently | ||
| // skip every later one; tracking the prefixes already registered avoids that. | ||
| // | ||
| // NOTE: These functions are registered for internal transform usage only. | ||
| // Upstream engines such as Prestissimo and Gluten should register the same | ||
| // functions with different prefixes to avoid conflicts. | ||
| // | ||
| // @param prefix Function name prefix to register the transform functions with. | ||
| void registerIcebergInternalFunctions(const std::string& prefix) { | ||
| static std::mutex registerMutex; | ||
| static std::unordered_set<std::string> registeredPrefixes; | ||
|
|
||
| // Hold the lock across the registration call so that concurrent connector | ||
| // construction cannot register the same prefix twice. | ||
| std::lock_guard<std::mutex> guard(registerMutex); | ||
| if (registeredPrefixes.count(prefix) > 0) { | ||
| return; | ||
| } | ||
|
|
||
| functions::iceberg::registerFunctions(prefix); | ||
| // Remember the prefix only after registration returned, so a registration | ||
| // that throws is retried on the next call (as std::call_once would). | ||
| registeredPrefixes.insert(prefix); | ||
| } | ||
|
|
||
| } // namespace | ||
|
|
||
| IcebergConnector::IcebergConnector( | ||
| const std::string& id, | ||
| std::shared_ptr<const config::ConfigBase> config, | ||
| folly::Executor* ioExecutor) | ||
| : HiveConnector(id, config, ioExecutor), | ||
| functionPrefix_(config->get<std::string>( | ||
| std::string(kIcebergFunctionPrefixConfig), | ||
| std::string(kDefaultIcebergFunctionPrefix))) { | ||
| registerIcebergInternalFunctions(functionPrefix_); | ||
| } | ||
|
|
||
| // Creates the IcebergDataSink for one write, handing it this connector's Hive | ||
| // config and function prefix alongside the caller-supplied arguments. | ||
| std::unique_ptr<DataSink> IcebergConnector::createDataSink( | ||
| RowTypePtr inputType, | ||
| ConnectorInsertTableHandlePtr connectorInsertTableHandle, | ||
| ConnectorQueryCtx* connectorQueryCtx, | ||
| CommitStrategy commitStrategy) { | ||
| // Narrow the generic insert handle to the Iceberg handle type the sink takes. | ||
| const auto insertHandle = | ||
| checked_pointer_cast<const IcebergInsertTableHandle>( | ||
| connectorInsertTableHandle); | ||
|
|
||
| return std::make_unique<IcebergDataSink>( | ||
| inputType, | ||
| insertHandle, | ||
| connectorQueryCtx, | ||
| commitStrategy, | ||
| hiveConfig_, | ||
| functionPrefix_); | ||
| } | ||
|
|
||
| } // namespace facebook::velox::connector::hive::iceberg |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| /* | ||
| * Copyright (c) Facebook, Inc. and its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| #pragma once | ||
|
|
||
| #include "velox/connectors/hive/HiveConnector.h" | ||
|
|
||
| namespace facebook::velox::connector::hive::iceberg { | ||
|
|
||
| /// TODO: Add an IcebergConfig class and move these configuration properties to | ||
| /// IcebergConfig.h. | ||
| /// | ||
| /// Connector config key that IcebergConnector reads at construction to obtain | ||
| /// the prefix under which the Iceberg functions are registered. | ||
| static constexpr std::string_view kIcebergFunctionPrefixConfig{ | ||
| "presto.iceberg-namespace"}; | ||
| /// Prefix used when kIcebergFunctionPrefixConfig is not set in the config. | ||
| static constexpr std::string_view kDefaultIcebergFunctionPrefix{ | ||
| "$internal$.iceberg."}; | ||
|
|
||
| /// Provides Iceberg table format support on top of HiveConnector. | ||
| /// - Creates HiveDataSource instances that use IcebergSplitReader for reading | ||
| /// Iceberg tables with support for delete files and schema evolution. | ||
| /// - Creates IcebergDataSink instances for writing data with Iceberg-specific | ||
| /// partition transforms and commit metadata. | ||
| class IcebergConnector final : public HiveConnector { | ||
| public: | ||
| /// Constructs the connector and registers the Iceberg functions under the | ||
| /// configured prefix. | ||
| /// | ||
| /// @param id Connector identifier, forwarded to HiveConnector. | ||
| /// @param config Connector configuration, forwarded to HiveConnector. The | ||
| /// function prefix is read from kIcebergFunctionPrefixConfig and falls | ||
| /// back to kDefaultIcebergFunctionPrefix when the key is absent. | ||
| /// @param ioExecutor Executor forwarded to HiveConnector. | ||
| IcebergConnector( | ||
| const std::string& id, | ||
| std::shared_ptr<const config::ConfigBase> config, | ||
| folly::Executor* ioExecutor); | ||
|
|
||
| /// Creates IcebergDataSink for writing to Iceberg tables. | ||
| /// | ||
| /// @param inputType The schema of the input data to write. | ||
| /// @param connectorInsertTableHandle Must be an IcebergInsertTableHandle | ||
| /// containing Iceberg-specific write configuration. | ||
| /// @param connectorQueryCtx Query context for the write operation. | ||
| /// @param commitStrategy Strategy for committing the write operation. Only | ||
| /// CommitStrategy::kNoCommit is supported for Iceberg tables. Files | ||
| /// are written directly with their final names and commit metadata is | ||
| /// returned for the coordinator to update the Iceberg metadata tables. | ||
| /// NOTE(review): this method forwards the strategy to IcebergDataSink | ||
| /// without checking it; presumably the sink enforces the restriction - | ||
| /// confirm. | ||
| /// @return IcebergDataSink instance configured for the write operation. | ||
| std::unique_ptr<DataSink> createDataSink( | ||
| RowTypePtr inputType, | ||
| ConnectorInsertTableHandlePtr connectorInsertTableHandle, | ||
| ConnectorQueryCtx* connectorQueryCtx, | ||
| CommitStrategy commitStrategy) override; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does Iceberg have support for all commit strategies? If not, let's clarify what is supported.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, only kNoCommit is supported. |
||
|
|
||
| private: | ||
| /// Function name prefix resolved from the connector config at construction. | ||
| /// Passed to every IcebergDataSink created by this connector. | ||
| const std::string functionPrefix_; | ||
| }; | ||
|
|
||
| /// Factory that creates IcebergConnector instances. | ||
| class IcebergConnectorFactory final : public ConnectorFactory { | ||
| public: | ||
| /// Connector name passed to the ConnectorFactory base class. | ||
| static constexpr const char* kIcebergConnectorName = "iceberg"; | ||
|
|
||
| IcebergConnectorFactory() : ConnectorFactory(kIcebergConnectorName) {} | ||
|
|
||
| /// Creates a new IcebergConnector instance. | ||
| /// | ||
| /// @param id Unique identifier for this connector instance (typically the | ||
| /// catalog name). | ||
| /// @param config Connector configuration properties. | ||
| /// @param ioExecutor Optional executor for asynchronous I/O operations such | ||
| /// as split preloading and file prefetching. When provided, enables | ||
| /// background file operations off the main driver thread. If nullptr, I/O | ||
| /// operations run synchronously. | ||
| /// @param cpuExecutor Optional executor that is part of the ConnectorFactory | ||
| /// interface for connector types that may need CPU-bound async work. It is | ||
| /// not used here: it is not passed on to IcebergConnector. | ||
| /// @return Shared pointer to the newly created IcebergConnector instance. | ||
| std::shared_ptr<Connector> newConnector( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would you document this API? Specifically, what are ioExecutor and cpuExecutor are for and why they are optional?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. Sure. |
||
| const std::string& id, | ||
| std::shared_ptr<const config::ConfigBase> config, | ||
| folly::Executor* ioExecutor = nullptr, | ||
| [[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override { | ||
| return std::make_shared<IcebergConnector>(id, config, ioExecutor); | ||
| } | ||
| }; | ||
|
|
||
| } // namespace facebook::velox::connector::hive::iceberg | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * Copyright (c) Facebook, Inc. and its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #include "velox/connectors/hive/iceberg/IcebergConnector.h" | ||
| #include <gtest/gtest.h> | ||
| #include "velox/connectors/hive/HiveConfig.h" | ||
| #include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h" | ||
|
|
||
| namespace facebook::velox::connector::hive::iceberg { | ||
|
|
||
| namespace { | ||
|
|
||
| // Fixture for IcebergConnector tests, built on the shared Iceberg test base. | ||
| class IcebergConnectorTest : public test::IcebergTestBase { | ||
| protected: | ||
| // Swaps the connector registered under test::kIcebergConnectorId for a new | ||
| // one that IcebergConnectorFactory builds from 'config'. | ||
| static void resetIcebergConnector( | ||
| const std::shared_ptr<const config::ConfigBase>& config) { | ||
| // Drop the current registration first so the id is free to reuse. | ||
| unregisterConnector(test::kIcebergConnectorId); | ||
|
|
||
| registerConnector( | ||
| IcebergConnectorFactory().newConnector(test::kIcebergConnectorId, config)); | ||
| } | ||
| }; | ||
|
|
||
| // A connector re-created with custom properties must expose those properties | ||
| // through its connector config. | ||
| TEST_F(IcebergConnectorTest, connectorConfiguration) { | ||
| // Re-register the connector with the file handle cache enabled and sized. | ||
| resetIcebergConnector(std::make_shared<config::ConfigBase>( | ||
| std::unordered_map<std::string, std::string>{ | ||
| {hive::HiveConfig::kEnableFileHandleCache, "true"}, | ||
| {hive::HiveConfig::kNumCacheFileHandles, "1000"}})); | ||
|
|
||
| // The connector must be reachable under the test id after the reset. | ||
| auto connector = getConnector(test::kIcebergConnectorId); | ||
| ASSERT_NE(connector, nullptr); | ||
|
|
||
| auto connectorConfig = connector->connectorConfig(); | ||
| ASSERT_NE(connectorConfig, nullptr); | ||
|
|
||
| // Read the values back through HiveConfig and compare with what was set. | ||
| hive::HiveConfig parsedConfig(connectorConfig); | ||
| ASSERT_TRUE(parsedConfig.isFileHandleCacheEnabled()); | ||
| ASSERT_EQ(parsedConfig.numCacheFileHandles(), 1000); | ||
| } | ||
|
|
||
| // Checks the capabilities reported by the connector registered under the test | ||
| // id. | ||
| TEST_F(IcebergConnectorTest, connectorProperties) { | ||
| auto connector = getConnector(test::kIcebergConnectorId); | ||
| ASSERT_NE(connector, nullptr); | ||
|
|
||
| // Dynamic filters and split preload must be supported, and an I/O executor | ||
| // must be attached. | ||
| ASSERT_TRUE(connector->canAddDynamicFilter()); | ||
| ASSERT_TRUE(connector->supportsSplitPreload()); | ||
| ASSERT_NE(connector->ioExecutor(), nullptr); | ||
| } | ||
|
|
||
| } // namespace | ||
|
|
||
| } // namespace facebook::velox::connector::hive::iceberg |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice.