refactor: Add Iceberg connector #15581
@mbasmanova This is a follow up PR from discussion #15440 (comment). Code changes in Prestissimo: prestodb/presto#26661

// Register IcebergConnector.
IcebergConnectorFactory icebergFactory;
auto icebergConnector = icebergFactory.newConnector(
Can we add a way to specify the prefix to use for transform functions? This way, the application can control the contents of the function registry better.
Thanks. This makes sense.
But newConnector is an interface method of ConnectorFactory, so we cannot add a parameter to it for specifying the prefix: the parameter would mean nothing to other connectors. Prestissimo also calls this method for all connector types with the same parameters, so we cannot add a new method either, since that would require special handling by the caller.
Instead, I hard-coded the prefix in IcebergConnectorFactory::newConnector and pass it to IcebergConnector, which then passes it to IcebergDataSink. This prefix is for internal use only.
I think it is desirable to avoid hard-coding this prefix. Can we define connector-specific configuration property and use that instead?
Yes, I looked at this too. But it seems this approach requires a new entry in the catalog property file.
Is this a problem?
It seems to me that this prefix is primarily for internal use, so the application may not need to specify it explicitly in the catalog property file. And we would still need some logic to handle the case where the property is missing.
The problem is that currently a single application (Prestissimo) imposes restrictions on the Connector which may be used in other applications. If these other applications impose their own restrictions that conflict with restrictions imposed by Prestissimo there will be no way to satisfy these. Hence, it would be best to avoid such restrictions. Makes sense?
Thank you. Yes, understood now.
Updated the code to retrieve this prefix from the config and to fall back to the default prefix if it is not present.
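For readers following the thread, the "read from config, fall back to a default" behavior described above can be sketched roughly as follows. This is a stand-alone illustration, not the code in the PR: the property key `iceberg.function-prefix`, the default value, the `ConfigMap` alias, and the `functionPrefix` helper are all hypothetical names chosen for the example, and a plain map stands in for the connector config object.

```cpp
#include <string>
#include <string_view>
#include <unordered_map>

// Hypothetical property key and default value (assumptions for this sketch;
// the actual names and values are defined in the PR, not here).
constexpr std::string_view kFunctionPrefixConfig{"iceberg.function-prefix"};
constexpr std::string_view kDefaultFunctionPrefix{"iceberg_internal."};

// Stand-in for a connector config: a flat string-to-string map.
using ConfigMap = std::unordered_map<std::string, std::string>;

// Returns the configured prefix, or the default when the property is absent.
std::string functionPrefix(const ConfigMap& config) {
  auto it = config.find(std::string(kFunctionPrefixConfig));
  if (it == config.end()) {
    return std::string(kDefaultFunctionPrefix);
  }
  return it->second;
}
```

With this shape, an application that does not care about the prefix passes no property and gets the default, while one that needs to control its function registry sets the property explicitly.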
}
#endif

TEST_F(IcebergInsertTest, connectorConfiguration) {
Why does this test belong in the 'insert' tests? How do we verify that the configuration properties take effect and are not just ignored?
Thanks.
Moved them to a new test file.
test(rowType);
}

TEST_F(IcebergInsertTest, connectorProperties) {
Does this test belong here?
Thanks, moved.
exec::test::AssertQueryBuilder(plan).splits(splits).assertResults(vectors);
}

static void resetIcebergConnector(
Should this go into xxxTestBase?
Moved to new test file.
/// Upstream engines such as Prestissimo and Gluten should register the same
/// functions with different prefixes to avoid conflicts.
void registerIcebergInternalFunctions(const std::string_view& prefix);
static constexpr std::string_view kDefaultIcebergFunctionPrefix{
I think we want to remove this constant altogether and let the application specify the prefix when registering the connector.
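To illustrate why a caller-supplied prefix avoids conflicts, here is a minimal stand-alone sketch of prefix-based registration. It does not use Velox's function registry; the `Registry` alias, the `registerInternalFunctions` helper, and the two toy functions are hypothetical and exist only for this example.

```cpp
#include <functional>
#include <map>
#include <string>
#include <string_view>

// Hypothetical registry: fully qualified function name -> callable.
using Registry = std::map<std::string, std::function<int(int)>>;

// Registers the same illustrative functions under a caller-chosen prefix,
// so two applications sharing one registry do not overwrite each other.
void registerInternalFunctions(Registry& registry, std::string_view prefix) {
  const std::string p(prefix);
  registry[p + "double_it"] = [](int x) { return 2 * x; };
  registry[p + "negate"] = [](int x) { return -x; };
}
```

Calling the helper once with `"presto."` and once with `"gluten."` leaves four distinct entries in the registry, which is the property the doc comment above asks upstream engines to preserve.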
IcebergConnectorFactory() : ConnectorFactory(kIcebergConnectorName) {}

std::shared_ptr<Connector> newConnector(
Would you document this API? Specifically, what are ioExecutor and cpuExecutor for, and why are they optional?
Thanks. Sure. cpuExecutor is not used by the Hive or Iceberg connectors.
auto icebergInsertHandle =
std::dynamic_pointer_cast<const IcebergInsertTableHandle>(
connectorInsertTableHandle);
VELOX_USER_CHECK_NOT_NULL(
Use checked_pointer_cast from velox/common/Casts.h
Thanks.
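The suggestion above replaces a `dynamic_pointer_cast` followed by a separate null check with a single checked cast. The helper in velox/common/Casts.h is not reproduced here; the following is a stand-alone analog showing the general idea, with a hypothetical name (`checkedPointerCastSketch`), a hypothetical handle hierarchy, and `std::invalid_argument` standing in for whatever error type the real helper raises.

```cpp
#include <memory>
#include <stdexcept>
#include <string>
#include <typeinfo>

// Stand-alone analog of a checked cast (an assumption, not Velox's code):
// throws on type mismatch or null input instead of returning nullptr.
template <typename To, typename From>
std::shared_ptr<To> checkedPointerCastSketch(
    const std::shared_ptr<From>& from) {
  auto to = std::dynamic_pointer_cast<To>(from);
  if (to == nullptr) {
    throw std::invalid_argument(
        std::string("Unexpected handle type; expected ") + typeid(To).name());
  }
  return to;
}

// Hypothetical handle hierarchy used only for illustration.
struct InsertTableHandle {
  virtual ~InsertTableHandle() = default;
};
struct IcebergHandle : InsertTableHandle {};
struct OtherHandle : InsertTableHandle {};
```

The call site then shrinks to one expression, and the failure message is produced in one place rather than at every cast.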
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) {
if (auto icebergInsertHandle =
Nice.
b7f5029 to 80633b7
namespace facebook::velox::connector::hive::iceberg {

static constexpr std::string_view kIcebergFunctionPrefix{
We probably want something similar to velox/connectors/hive/HiveConfig.h
Yes, agree. I actually added IcebergConfig.h and then removed it since I only need one property in this PR.
Can I open another PR to add IcebergConfig.h? It requires some changes on other Iceberg classes too.
Sounds good. Let's then add a TODO here.
The names kIcebergFunctionPrefix and kDefaultIcebergFunctionPrefix suggest that these are both prefixes... however one is the name of the config property whose value is the prefix and the other is the prefix. Perhaps, rename kIcebergFunctionPrefix to kIcebergFunctionPrefixConfig to avoid confusion.
Thanks.
RowTypePtr inputType,
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) override;
Does Iceberg have support for all commit strategies? If not, let's clarify what is supported.
Thanks, only kNoCommit is supported.
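One way to make the "only kNoCommit is supported" constraint explicit is to reject other strategies up front. The sketch below is an assumption about how such a guard could look, not the PR's code: the enum is a local mirror declared for the example, and `checkSupported` and `toString` are hypothetical helpers.

```cpp
#include <stdexcept>
#include <string>

// Local mirror of a commit-strategy enum, declared here for illustration only.
enum class CommitStrategy { kNoCommit, kTaskCommit };

std::string toString(CommitStrategy strategy) {
  switch (strategy) {
    case CommitStrategy::kNoCommit:
      return "kNoCommit";
    case CommitStrategy::kTaskCommit:
      return "kTaskCommit";
  }
  return "unknown";
}

// Throws unless the strategy is the one the thread says is supported.
void checkSupported(CommitStrategy strategy) {
  if (strategy != CommitStrategy::kNoCommit) {
    throw std::invalid_argument(
        "Unsupported commit strategy for Iceberg writes: " +
        toString(strategy));
  }
}
```

A guard like this, paired with a sentence in the API documentation, answers the reviewer's question at both the call site and in the header.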
/// @param connectorQueryCtx Query context for the write operation.
/// @param commitStrategy Strategy for committing the write operation.
/// @return IcebergDataSink instance configured for the write operation.
/// @throws VeloxUserError if connectorInsertTableHandle is not an
This comment is redundant. Let's remove.
Yes.
const std::string functionPrefix_;
};

/// Factory for creating IcebergConnector instances.
Redundant comment.
thanks.
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
linter will complain about this parameter being unused. Add [[maybe_unused]].
Thanks. Added this to HiveConnector as well.
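For reference, `[[maybe_unused]]` (C++17) can be applied directly to a parameter in an overriding method, including one with a default argument. The sketch below uses hypothetical stand-in types (`Executor`, `Connector`, `ConnectorFactory`, `SketchFactory`) rather than the folly and Velox classes from the diff, and omits the config parameter for brevity.

```cpp
#include <memory>
#include <string>
#include <utility>

struct Executor {};  // Hypothetical stand-in for an executor type.

struct Connector {
  Connector(std::string connectorId, Executor* io)
      : id(std::move(connectorId)), ioExecutor(io) {}
  std::string id;
  Executor* ioExecutor;
};

struct ConnectorFactory {
  virtual ~ConnectorFactory() = default;
  virtual std::shared_ptr<Connector> newConnector(
      const std::string& id,
      Executor* ioExecutor = nullptr,
      Executor* cpuExecutor = nullptr) = 0;
};

struct SketchFactory : ConnectorFactory {
  // cpuExecutor is part of the interface but unused by this connector, so it
  // is marked [[maybe_unused]] to keep unused-parameter warnings quiet.
  std::shared_ptr<Connector> newConnector(
      const std::string& id,
      Executor* ioExecutor = nullptr,
      [[maybe_unused]] Executor* cpuExecutor = nullptr) override {
    return std::make_shared<Connector>(id, ioExecutor);
  }
};
```

Keeping the parameter named, rather than deleting the name, preserves the signature's readability while still silencing the warning.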
opPool_.reset();
root_.reset();
queryCtx_.reset();
// Unregister IcebergConnector
redundant comment
thanks.
mbasmanova left a comment
Thanks.
80633b7 to e967787
@kagamiori has imported this pull request. If you are a Meta employee, you can view this in D87666371.
Refactors the Iceberg connector implementation to separate it from Hive connector. Sets the foundation for independent evolution of both connectors.