
Conversation

@acarpente-denodo
Contributor

@acarpente-denodo acarpente-denodo commented Oct 10, 2025

Description

Engine support for the SQL MERGE command. This command inserts or updates rows in a target table based on specified conditions.

Syntax:

MERGE INTO target_table [ [ AS ] target_alias ]
USING { source_table | query } [ [ AS ] source_alias ]
ON search_condition
WHEN MATCHED THEN
    UPDATE SET ( column = expression [, ...] )
WHEN NOT MATCHED THEN
    INSERT [ column_list ]
    VALUES (expression, ...)

Example: use MERGE INTO to update the sales information for existing products and insert the sales information for products that are new to the market.

MERGE INTO product_sales AS s
    USING monthly_sales AS ms
    ON s.product_id = ms.product_id
WHEN MATCHED THEN
    UPDATE SET
        sales = sales + ms.sales
      , last_sale = ms.sale_date
      , current_price = ms.price
WHEN NOT MATCHED THEN
    INSERT (product_id, sales, last_sale, current_price)
    VALUES (ms.product_id, ms.sales, ms.sale_date, ms.price)

The Presto engine commit introduces an enum called RowChangeParadigm, which describes how a connector modifies rows. The Iceberg connector will use the DELETE_ROW_AND_INSERT_ROW paradigm, as it represents an updated row as a combination of a deleted row followed by an inserted row. The CHANGE_ONLY_UPDATED_COLUMNS paradigm is meant for connectors that support updating individual columns of rows.
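The paradigm described above can be sketched as a plain enum. This is a hedged illustration, not the actual Presto source; only the names RowChangeParadigm, DELETE_ROW_AND_INSERT_ROW, and CHANGE_ONLY_UPDATED_COLUMNS come from this PR description.

```java
// Hedged sketch of the RowChangeParadigm enum described above.
// The constant names come from the PR description; everything else is illustrative.
enum RowChangeParadigm
{
    // An updated row is represented as a deleted row followed by an
    // inserted row. This is the paradigm the Iceberg connector uses.
    DELETE_ROW_AND_INSERT_ROW,

    // For connectors that can update individual columns of a row in place.
    CHANGE_ONLY_UPDATED_COLUMNS
}
```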

Note: these changes were made after reviewing Trino PR trinodb/trino#7126, so this commit is heavily inspired by Trino's implementation.

Motivation and Context

The MERGE INTO statement is commonly used to integrate data from two tables with different contents but similar structures.
For example, the source table could be part of a production transactional system, while the target table might be located in a data warehouse for analytics.
MERGE operations are performed regularly to update the analytics warehouse with the latest production data.
You can also use MERGE with tables that have different structures, as long as you can define a condition to match the rows between them.

Test Plan

Automated tests developed in TestSqlParser, TestSqlParserErrorHandling, TestStatementBuilder, AbstractAnalyzerTest, TestAnalyzer, and TestClassLoaderSafeWrappers classes.

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

== RELEASE NOTES ==

General Changes
* Add support for the MERGE command in the Presto engine.

@sourcery-ai sourcery-ai bot left a comment

Sorry @acarpente-denodo, your pull request is larger than the review limit of 150000 diff characters

@acarpente-denodo acarpente-denodo changed the title Draft: feat: Add SQL Support for MERGE INTO in Presto (engine) feat: Add SQL Support for MERGE INTO in Presto (engine) Oct 10, 2025
@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch from b8f9b3e to b27ac47 Compare October 13, 2025 12:21
@tdcmeehan tdcmeehan self-assigned this Oct 13, 2025
@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch 2 times, most recently from 71aebc7 to 6f60239 Compare October 14, 2025 08:00
@steveburnett
Contributor

Please include documentation for SQL MERGE INTO as a new file in
https://github.com/prestodb/presto/tree/master/presto-docs/src/main/sphinx/sql
and add the new file to the index page
https://github.com/prestodb/presto/blob/master/presto-docs/src/main/sphinx/sql.rst

@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch from 6f60239 to 7a54a60 Compare October 14, 2025 14:00
@acarpente-denodo
Contributor Author

Hi @steveburnett. I appreciate your feedback. I added a new commit that includes the documentation for the MERGE INTO command.

@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch 2 times, most recently from 8a860c1 to 61ea3bd Compare October 16, 2025 08:14
return (Table) relation;
}
checkArgument(relation instanceof AliasedRelation, "relation is neither a Table nor an AliasedRelation");
return (Table) ((AliasedRelation) relation).getRelation();
Contributor

Is it safe to presume that this will always be a table?

Contributor Author

Yes, the relation passed as a method parameter should be a Table or an AliasedRelation whose relation is a Table.
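The invariant described in this reply can be illustrated with a small, self-contained sketch. The Relation, Table, and AliasedRelation types below are simplified stand-ins for Presto's AST classes, and asMergeTarget is a hypothetical name for the method under discussion.

```java
// Stand-in types illustrating the invariant described above: the relation
// passed in is either a Table, or an AliasedRelation wrapping a Table.
interface Relation {}

final class Table implements Relation {}

final class AliasedRelation implements Relation
{
    final Relation relation;

    AliasedRelation(Relation relation)
    {
        this.relation = relation;
    }
}

static Table asMergeTarget(Relation relation)
{
    if (relation instanceof Table) {
        return (Table) relation;
    }
    if (!(relation instanceof AliasedRelation)) {
        throw new IllegalArgumentException("relation is neither a Table nor an AliasedRelation");
    }
    // Safe because of the invariant: an AliasedRelation here always wraps a Table.
    return (Table) ((AliasedRelation) relation).relation;
}
```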

joinSubPlan = subqueryPlanner.handleSubqueries(joinSubPlan, setExpression, mergeStmt, sqlPlannerContext);
expression = joinSubPlan.rewrite(setExpression);
expression = coerceIfNecessary(analysis, setExpression, expression);
expression = checkNotNullColumns(targetColumnHandle, expression, fieldNumber, mergeAnalysis);
Contributor

I don't think this should be done at the plan level, as it will be very expensive. Instead, we should send to the MergeWriterOperator what columns are non-null, and the operator can do a more efficient bulk check. Let me know if I misunderstood anything.

Contributor Author

The goal of this line is to prevent the MERGE INTO command from inserting or updating a NULL value in a non-null column. If the user attempts to execute a command that would do that, the query execution should stop as soon as possible.

The main drawback of your approach is that you have to propagate column nullability information down to MergeWriterOperator, which is not optimal. The propagation requires going from the TableScanOperator -> AssignUniqueIdOperator -> FilterAndProjectOperator -> LookupJoinOperator -> FilterAndProjectOperator -> MarkDistinctOperator -> FilterAndProjectOperator -> MergeProcessorOperator -> MergeWriterOperator.

The benefit of performing this verification during planning is that the Presto engine can cancel the query execution as soon as possible. In your approach, Presto ends up doing all the work before realizing in the last step that it was unnecessary because of an unsatisfied column constraint.
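The intent can be illustrated with a hedged, stand-alone sketch. This is not the actual checkNotNullColumns code (which operates on Presto expression trees); it only shows the shape of the guard: fail fast when a NULL is headed for a NOT NULL column.

```java
// Illustrative stand-in for the plan-time guard discussed above: reject a
// NULL value destined for a NOT NULL column as early as possible, instead
// of discovering the violation at the merge writer after all upstream work.
static Object requireNonNullColumn(Object value, String columnName)
{
    if (value == null) {
        throw new IllegalArgumentException(
                "NULL value not allowed for NOT NULL column: " + columnName);
    }
    return value;
}
```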

@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch 6 times, most recently from 8204885 to 017d3e2 Compare November 4, 2025 09:55
@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch 3 times, most recently from 260cc7a to 6777bf3 Compare November 7, 2025 11:16
@steveburnett steveburnett left a comment

Thanks for the doc! One comment to ask if you intended the formatting to turn out like it did.

@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch 2 times, most recently from 9ae1d35 to f96475c Compare November 12, 2025 15:55
steveburnett
steveburnett previously approved these changes Nov 12, 2025
@steveburnett steveburnett left a comment

LGTM! (docs)

Pull updated branch, new local doc build, looks good. Thanks!

@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch 2 times, most recently from 308279e to 3f6cda5 Compare November 13, 2025 15:50
@tdcmeehan tdcmeehan left a comment

Overall, this LGTM. Thanks! Let me know when it's ready for a final round of review.

@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch from 3f6cda5 to d62184e Compare November 17, 2025 15:37
@acarpente-denodo acarpente-denodo marked this pull request as ready for review November 18, 2025 17:06
@sourcery-ai sourcery-ai bot left a comment

Sorry @acarpente-denodo, your pull request is larger than the review limit of 150000 diff characters

@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch from d62184e to 84b4519 Compare November 19, 2025 10:53
@acarpente-denodo acarpente-denodo force-pushed the feature/20578_SQL_Support_for_MERGE_INTO_(engine) branch from 84b4519 to 2140fe8 Compare November 19, 2025 14:10
@steveburnett steveburnett left a comment

LGTM! (docs)

Pull updated branch, new local doc build. Looks good. Thanks!

@tdcmeehan tdcmeehan left a comment

This is very well designed. Thank you. I only have nits.

// Add the target table row id field used to process the MERGE command.
ColumnHandle targetTableRowIdColumnHandle = metadata.getMergeTargetTableRowIdColumnHandle(session, tableHandle.get());
Type targetTableRowIdType = metadata.getColumnMetadata(session, tableHandle.get(), targetTableRowIdColumnHandle).getType();
Field targetTableRowIdField = Field.newUnqualified(Optional.empty(), "$target_table_row_id", targetTableRowIdType);
Contributor

Suggested change:
- Field targetTableRowIdField = Field.newUnqualified(Optional.empty(), "$target_table_row_id", targetTableRowIdType);
+ Field targetTableRowIdField = Field.newUnqualified(table.getLocation(), "$target_table_row_id", targetTableRowIdType);

return blocks;
}

static boolean[] copyIsNullAndAppendNull(@Nullable boolean[] isNull, int offsetBase, int positionCount)
Contributor

Please add a unit test
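For context, here is a hedged sketch of the semantics the helper's signature suggests (copy positionCount isNull flags starting at offsetBase, then append one slot marked null), which such a unit test could assert against. The actual Presto implementation may differ.

```java
// Presumed semantics of copyIsNullAndAppendNull, inferred from its signature:
// copy positionCount isNull flags starting at offsetBase and append one extra
// position marked null. This is a sketch, not the actual Presto code.
static boolean[] copyIsNullAndAppendNull(boolean[] isNull, int offsetBase, int positionCount)
{
    boolean[] result = new boolean[positionCount + 1];
    if (isNull != null) {
        System.arraycopy(isNull, offsetBase, result, 0, positionCount);
    }
    // A null isNull array means "no nulls", so the copied region stays false.
    // The appended position is always null.
    result[positionCount] = true;
    return result;
}
```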

return new DictionaryBlock(idsOffset, getPositionCount(), loadedDictionary, ids, false, randomDictionaryId());
}

public Block createProjection(Block newDictionary)
Contributor

Please add a unit test in TestDictionaryBlock

* DictionaryBlock, but the underlying block must be a RowBlock. The returned field blocks will be the same
* length as the specified block, which means they are not null suppressed.
*/
public static List<Block> getRowFieldsFromBlock(Block block)
Contributor

Please add a unit test in TestRowBlock


Query 20250204_010445_00022_ymwi5 failed: Iceberg table updates require at least format version 2 and update mode must be merge-on-read

Iceberg tables do not support running multiple ``MERGE`` statements on the same table in parallel. If two or more ``MERGE`` operations are executed concurrently on the same Iceberg table:
Contributor

Nit: would be good to link to the merge doc from here.

Any connector can be used as a source table for a ``MERGE`` statement.
Only connectors which support the ``MERGE`` statement can be the target of a merge operation.
See the :doc:`connector documentation </connector/>` for more information.
The ``MERGE`` statement is currently supported only by the iceberg connector.
Contributor

Suggested change:
- The ``MERGE`` statement is currently supported only by the iceberg connector.
+ The ``MERGE`` statement is currently supported only by the Iceberg connector.

throws Exception
{
ByteBuffer byteBuffer = reader.readBinary();
assert (byteBuffer.position() == 0);
Contributor

We don't use Java asserts because those can be disabled. We use checkArgument or checkState from Guava instead.
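A minimal sketch of the suggested fix. A local stand-in for Guava's Preconditions.checkState is included so the snippet is self-contained; in Presto the Guava method itself would be used.

```java
// Local stand-in for com.google.common.base.Preconditions.checkState;
// in Presto the real Guava method would be imported instead.
static void checkState(boolean expression, String message)
{
    if (!expression) {
        throw new IllegalStateException(message);
    }
}

// Instead of:  assert (byteBuffer.position() == 0);
// which is silently skipped when JVM assertions are disabled (the default),
// fail explicitly:
static void verifyBufferPosition(java.nio.ByteBuffer byteBuffer)
{
    checkState(byteBuffer.position() == 0, "byteBuffer position must be 0");
}
```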

Comment on lines +745 to +748
//
// Merge
//

Contributor

Suggested change (remove these lines):
- //
- // Merge
- //
ANALYZE_FINISH,
EXPLAIN_ANALYZE,
UPDATE,
MERGE
Contributor

Suggested change:
- MERGE
+ MERGE,
