-
Notifications
You must be signed in to change notification settings - Fork 5.5k
feat: Add SQL Support for MERGE INTO in Presto (engine) #26278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
feat: Add SQL Support for MERGE INTO in Presto (engine) #26278
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry @acarpente-denodo, your pull request is larger than the review limit of 150000 diff characters
b8f9b3e to
b27ac47
Compare
71aebc7 to
6f60239
Compare
|
Please include documentation for |
6f60239 to
7a54a60
Compare
|
Hi @steveburnett. I appreciate your feedback. I added a new commit that includes the documentation for the MERGE INTO command. |
8a860c1 to
61ea3bd
Compare
...o-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorNodePartitioningProvider.java
Outdated
Show resolved
Hide resolved
presto-common/src/main/java/com/facebook/presto/common/block/Block.java
Outdated
Show resolved
Hide resolved
presto-common/src/main/java/com/facebook/presto/common/Page.java
Outdated
Show resolved
Hide resolved
presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java
Show resolved
Hide resolved
| return (Table) relation; | ||
| } | ||
| checkArgument(relation instanceof AliasedRelation, "relation is neither a Table nor an AliasedRelation"); | ||
| return (Table) ((AliasedRelation) relation).getRelation(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to presume that this will always be a table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the relation passed as a method parameter should be a Table or an AliasedRelation whose relation is a Table.
presto-main-base/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java
Outdated
Show resolved
Hide resolved
presto-main-base/src/main/java/com/facebook/presto/sql/planner/MergePartitioningHandle.java
Outdated
Show resolved
Hide resolved
presto-main-base/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java
Outdated
Show resolved
Hide resolved
| joinSubPlan = subqueryPlanner.handleSubqueries(joinSubPlan, setExpression, mergeStmt, sqlPlannerContext); | ||
| expression = joinSubPlan.rewrite(setExpression); | ||
| expression = coerceIfNecessary(analysis, setExpression, expression); | ||
| expression = checkNotNullColumns(targetColumnHandle, expression, fieldNumber, mergeAnalysis); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this should be done at the plan level, as it will be very expensive. Instead, we should send to the MergeWriterOperator what columns are non-null, and the operator can do a more efficient bulk check. Let me know if I misunderstood anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goal of this line is to prevent the MERGE INTO command from inserting or updating a NULL value in a non-null column. If the user attempts to execute a command that would do that, then stop the query execution as soon as possible.
The main drawback of your approach is that you have to propagate column nullability information down to MergeWriterOperator, which is not optimal. The propagation requires going from the TableScanOperator -> AssignUniqueIdOperator -> FilterAndProjectOperator -> LookupJoinOperator -> FilterAndProjectOperator -> MarkDistinctOperator -> FilterAndProjectOperator -> MergeProcessorOperator -> MergeWriterOperator.
The benefit of performing this verification during planning is that the Presto engine can cancel the query execution as soon as possible. In your approach, Presto ends up doing all the work before realizing in the last step that it was unnecessary because of an unsatisfied column constraint.
8204885 to
017d3e2
Compare
260cc7a to
6777bf3
Compare
steveburnett
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the doc! One comment to ask if you intended the formatting to turn out like it did.
9ae1d35 to
f96475c
Compare
steveburnett
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! (docs)
Pull updated branch, new local doc build, looks good. Thanks!
308279e to
3f6cda5
Compare
tdcmeehan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, this LGTM. Thanks! Let me know when it's ready for a final round of review.
presto-spi/src/main/java/com/facebook/presto/spi/connector/RowChangeParadigm.java
Outdated
Show resolved
Hide resolved
3f6cda5 to
d62184e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry @acarpente-denodo, your pull request is larger than the review limit of 150000 diff characters
Cherry-pick of trinodb/trino@cee96c3 Co-authored-by: David Stryker <[email protected]>
Automated tests. Cherry-pick of trinodb/trino@cee96c3 Co-authored-by: David Stryker <[email protected]>
d62184e to
84b4519
Compare
Added MERGE INTO statement documentation.
84b4519 to
2140fe8
Compare
steveburnett
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! (docs)
Pull updated branch, new local doc build. Looks good. Thanks!
tdcmeehan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very well designed. Thank you. I only have nits.
| // Add the target table row id field used to process the MERGE command. | ||
| ColumnHandle targetTableRowIdColumnHandle = metadata.getMergeTargetTableRowIdColumnHandle(session, tableHandle.get()); | ||
| Type targetTableRowIdType = metadata.getColumnMetadata(session, tableHandle.get(), targetTableRowIdColumnHandle).getType(); | ||
| Field targetTableRowIdField = Field.newUnqualified(Optional.empty(), "$target_table_row_id", targetTableRowIdType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Field targetTableRowIdField = Field.newUnqualified(Optional.empty(), "$target_table_row_id", targetTableRowIdType); | |
| Field targetTableRowIdField = Field.newUnqualified(table.getLocation(), "$target_table_row_id", targetTableRowIdType); |
| return blocks; | ||
| } | ||
|
|
||
| static boolean[] copyIsNullAndAppendNull(@Nullable boolean[] isNull, int offsetBase, int positionCount) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a unit test
| return new DictionaryBlock(idsOffset, getPositionCount(), loadedDictionary, ids, false, randomDictionaryId()); | ||
| } | ||
|
|
||
| public Block createProjection(Block newDictionary) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a unit test in TestDictionaryBlock
| * DictionaryBlock, but the underlying block must be a RowBlock. The returned field blocks will be the same | ||
| * length as the specified block, which means they are not null suppressed. | ||
| */ | ||
| public static List<Block> getRowFieldsFromBlock(Block block) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a unit test in TestRowBlock
|
|
||
| Query 20250204_010445_00022_ymwi5 failed: Iceberg table updates require at least format version 2 and update mode must be merge-on-read | ||
|
|
||
| Iceberg tables do not support running multiple ``MERGE`` statements on the same table in parallel. If two or more ``MERGE`` operations are executed concurrently on the same Iceberg table: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: would be good to link to the merge doc from here.
| Any connector can be used as a source table for a ``MERGE`` statement. | ||
| Only connectors which support the ``MERGE`` statement can be the target of a merge operation. | ||
| See the :doc:`connector documentation </connector/>` for more information. | ||
| The ``MERGE`` statement is currently supported only by the iceberg connector. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| The ``MERGE`` statement is currently supported only by the iceberg connector. | |
| The ``MERGE`` statement is currently supported only by the Iceberg connector. |
| throws Exception | ||
| { | ||
| ByteBuffer byteBuffer = reader.readBinary(); | ||
| assert (byteBuffer.position() == 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't use Java asserts because those can be disabled. We use checkArgument or checkState from Guava instead.
| // | ||
| // Merge | ||
| // | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // | |
| // Merge | |
| // |
| ANALYZE_FINISH, | ||
| EXPLAIN_ANALYZE, | ||
| UPDATE, | ||
| MERGE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| MERGE | |
| MERGE, |
Description
Engine support for SQL MERGE command. This command inserts or updates rows in a table based on specified conditions.
Syntax:
Example: MERGE INTO usage to update the sales information for existing products and insert the sales information for the new products in the market.
The Presto engine commit introduces an enum called RowChangeParadigm, which describes how a connector modifies rows. The iceberg connector will utilize the
DELETE_ROW_AND_INSERT_ROWparadigm, as it represents an updated row as a combination of a deleted row followed by an inserted row. TheCHANGE_ONLY_UPDATED_COLUMNSparadigm is meant for connectors that support updating individual columns of rows.Note: Changes were made after reviewing the following Trino PR: trinodb/trino#7126
So, this commit is deeply inspired by Trino's implementation.
Motivation and Context
The
MERGE INTOstatement is commonly used to integrate data from two tables with different contents but similar structures.For example, the source table could be part of a production transactional system, while the target table might be located in a data warehouse for analytics.
Regularly, MERGE operations are performed to update the analytics warehouse with the latest production data.
You can also use MERGE with tables that have different structures, as long as you can define a condition to match the rows between them.
Test Plan
Automated tests developed in TestSqlParser, TestSqlParserErrorHandling, TestStatementBuilder, AbstractAnalyzerTest, TestAnalyzer, and TestClassLoaderSafeWrappers classes.
Contributor checklist
Release Notes