Skip to content

Commit 0dc094e

Browse files
acarpente-denodoDavid Stryker
andcommitted
Add SQL Support for MERGE INTO In Presto #20578 (engine)
Working in progress Cherry-pick of trinodb/trino@cee96c3 Co-authored-by: David Stryker <[email protected]>
1 parent de1d051 commit 0dc094e

File tree

85 files changed

+4110
-109
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+4110
-109
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.facebook.presto.spi.ColumnHandle;
2222
import com.facebook.presto.spi.ColumnMetadata;
2323
import com.facebook.presto.spi.ConnectorId;
24+
import com.facebook.presto.spi.NewTableLayout;
2425
import com.facebook.presto.spi.TableHandle;
2526
import com.facebook.presto.spi.analyzer.AccessControlInfo;
2627
import com.facebook.presto.spi.analyzer.AccessControlInfoForTable;
@@ -32,12 +33,14 @@
3233
import com.facebook.presto.spi.function.FunctionKind;
3334
import com.facebook.presto.spi.function.table.Argument;
3435
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
36+
import com.facebook.presto.spi.plan.PartitioningHandle;
3537
import com.facebook.presto.spi.security.AccessControl;
3638
import com.facebook.presto.spi.security.AccessControlContext;
3739
import com.facebook.presto.spi.security.AllowAllAccessControl;
3840
import com.facebook.presto.spi.security.Identity;
3941
import com.facebook.presto.sql.tree.ExistsPredicate;
4042
import com.facebook.presto.sql.tree.Expression;
43+
import com.facebook.presto.sql.tree.FieldReference;
4144
import com.facebook.presto.sql.tree.FunctionCall;
4245
import com.facebook.presto.sql.tree.GroupingOperation;
4346
import com.facebook.presto.sql.tree.Identifier;
@@ -187,6 +190,7 @@ public class Analysis
187190
private Optional<TableHandle> analyzeTarget = Optional.empty();
188191

189192
private Optional<List<ColumnMetadata>> updatedColumns = Optional.empty();
193+
private Optional<MergeAnalysis> mergeAnalysis = Optional.empty();
190194

191195
// for describe input and describe output
192196
private final boolean isDescribe;
@@ -221,6 +225,9 @@ public class Analysis
221225
private final Set<NodeRef<Relation>> aliasedRelations = new LinkedHashSet<>();
222226
private final Set<NodeRef<TableFunctionInvocation>> polymorphicTableFunctions = new LinkedHashSet<>();
223227

228+
// Row id field used for MERGE INTO command.
229+
private final Map<NodeRef<Table>, FieldReference> rowIdField = new LinkedHashMap<>();
230+
224231
public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters, boolean isDescribe)
225232
{
226233
this.root = root;
@@ -433,6 +440,16 @@ public Expression getJoinCriteria(Join join)
433440
return joins.get(NodeRef.of(join));
434441
}
435442

443+
public void setRowIdField(Table table, FieldReference field)
444+
{
445+
rowIdField.put(NodeRef.of(table), field);
446+
}
447+
448+
public FieldReference getRowIdField(Table table)
449+
{
450+
return rowIdField.get(NodeRef.of(table));
451+
}
452+
436453
public void recordSubqueries(Node node, ExpressionAnalysis expressionAnalysis)
437454
{
438455
NodeRef<Node> key = NodeRef.of(node);
@@ -726,6 +743,16 @@ public Optional<List<ColumnMetadata>> getUpdatedColumns()
726743
return updatedColumns;
727744
}
728745

746+
public Optional<MergeAnalysis> getMergeAnalysis()
747+
{
748+
return mergeAnalysis;
749+
}
750+
751+
public void setMergeAnalysis(MergeAnalysis mergeAnalysis)
752+
{
753+
this.mergeAnalysis = Optional.of(mergeAnalysis);
754+
}
755+
729756
public void setRefreshMaterializedViewAnalysis(RefreshMaterializedViewAnalysis refreshMaterializedViewAnalysis)
730757
{
731758
this.refreshMaterializedViewAnalysis = Optional.of(refreshMaterializedViewAnalysis);
@@ -1694,4 +1721,108 @@ public ConnectorTransactionHandle getTransactionHandle()
16941721
return transactionHandle;
16951722
}
16961723
}
1724+
1725+
public static class MergeAnalysis
1726+
{
1727+
private final Table targetTable;
1728+
private final List<ColumnMetadata> targetColumnsMetadata;
1729+
private final List<ColumnHandle> targetColumnHandles;
1730+
private final List<ColumnHandle> targetRedistributionColumnHandles;
1731+
private final List<List<ColumnHandle>> mergeCaseColumnHandles;
1732+
private final Set<ColumnHandle> nonNullableColumnHandles;
1733+
private final Map<ColumnHandle, Integer> columnHandleFieldNumbers;
1734+
private final List<Integer> insertPartitioningArgumentIndexes;
1735+
private final Optional<NewTableLayout> insertLayout;
1736+
private final Optional<PartitioningHandle> updateLayout;
1737+
private final Scope targetTableScope;
1738+
private final Scope joinScope;
1739+
1740+
public MergeAnalysis(
1741+
Table targetTable,
1742+
List<ColumnMetadata> targetColumnsMetadata,
1743+
List<ColumnHandle> targetColumnHandles,
1744+
List<ColumnHandle> targetRedistributionColumnHandles,
1745+
List<List<ColumnHandle>> mergeCaseColumnHandles,
1746+
Set<ColumnHandle> nonNullableTargetColumnHandles,
1747+
Map<ColumnHandle, Integer> targetColumnHandleFieldNumbers,
1748+
List<Integer> insertPartitioningArgumentIndexes,
1749+
Optional<NewTableLayout> insertLayout,
1750+
Optional<PartitioningHandle> updateLayout,
1751+
Scope targetTableScope,
1752+
Scope joinScope)
1753+
{
1754+
this.targetTable = requireNonNull(targetTable, "targetTable is null");
1755+
this.targetColumnsMetadata = requireNonNull(targetColumnsMetadata, "targetColumnsMetadata is null");
1756+
this.targetColumnHandles = requireNonNull(targetColumnHandles, "targetColumnHandles is null");
1757+
this.targetRedistributionColumnHandles = requireNonNull(targetRedistributionColumnHandles, "targetRedistributionColumnHandles is null");
1758+
this.mergeCaseColumnHandles = requireNonNull(mergeCaseColumnHandles, "mergeCaseColumnHandles is null");
1759+
this.nonNullableColumnHandles = requireNonNull(nonNullableTargetColumnHandles, "nonNullableTargetColumnHandles is null");
1760+
this.columnHandleFieldNumbers = requireNonNull(targetColumnHandleFieldNumbers, "targetColumnHandleFieldNumbers is null");
1761+
this.insertLayout = requireNonNull(insertLayout, "insertLayout is null");
1762+
this.updateLayout = requireNonNull(updateLayout, "updateLayout is null");
1763+
this.insertPartitioningArgumentIndexes = (requireNonNull(insertPartitioningArgumentIndexes, "insertPartitioningArgumentIndexes is null"));
1764+
this.targetTableScope = requireNonNull(targetTableScope, "targetTableScope is null");
1765+
this.joinScope = requireNonNull(joinScope, "joinScope is null");
1766+
}
1767+
1768+
public Table getTargetTable()
1769+
{
1770+
return targetTable;
1771+
}
1772+
1773+
public List<ColumnMetadata> getTargetColumnsMetadata()
1774+
{
1775+
return targetColumnsMetadata;
1776+
}
1777+
1778+
public List<ColumnHandle> getTargetColumnHandles()
1779+
{
1780+
return targetColumnHandles;
1781+
}
1782+
1783+
public List<ColumnHandle> getTargetRedistributionColumnHandles()
1784+
{
1785+
return targetRedistributionColumnHandles;
1786+
}
1787+
1788+
public List<List<ColumnHandle>> getMergeCaseColumnHandles()
1789+
{
1790+
return mergeCaseColumnHandles;
1791+
}
1792+
1793+
public Set<ColumnHandle> getNonNullableColumnHandles()
1794+
{
1795+
return nonNullableColumnHandles;
1796+
}
1797+
1798+
public Map<ColumnHandle, Integer> getColumnHandleFieldNumbers()
1799+
{
1800+
return columnHandleFieldNumbers;
1801+
}
1802+
1803+
public List<Integer> getInsertPartitioningArgumentIndexes()
1804+
{
1805+
return insertPartitioningArgumentIndexes;
1806+
}
1807+
1808+
public Optional<NewTableLayout> getInsertLayout()
1809+
{
1810+
return insertLayout;
1811+
}
1812+
1813+
public Optional<PartitioningHandle> getUpdateLayout()
1814+
{
1815+
return updateLayout;
1816+
}
1817+
1818+
public Scope getJoinScope()
1819+
{
1820+
return joinScope;
1821+
}
1822+
1823+
public Scope getTargetTableScope()
1824+
{
1825+
return targetTableScope;
1826+
}
1827+
}
16971828
}

presto-blackhole/src/main/java/com/facebook/presto/plugin/blackhole/BlackHoleNodePartitioningProvider.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
2727

2828
import java.util.List;
29+
import java.util.Optional;
2930
import java.util.function.ToIntFunction;
3031

3132
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -43,10 +44,10 @@ public BlackHoleNodePartitioningProvider(NodeManager nodeManager)
4344
}
4445

4546
@Override
46-
public ConnectorBucketNodeMap getBucketNodeMap(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Node> sortedNodes)
47+
public Optional<ConnectorBucketNodeMap> getBucketNodeMap(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Node> sortedNodes)
4748
{
4849
// create one bucket per node
49-
return createBucketNodeMap(nodeManager.getRequiredWorkerNodes().size());
50+
return Optional.of(createBucketNodeMap(nodeManager.getRequiredWorkerNodes().size()));
5051
}
5152

5253
@Override

presto-common/src/main/java/com/facebook/presto/common/Page.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,17 @@ public Page getLoadedPage(int... channels)
357357
return wrapBlocksWithoutCopy(positionCount, blocks);
358358
}
359359

360+
public Page getColumns(int... columns)
361+
{
362+
requireNonNull(columns, "columns is null");
363+
364+
Block[] blocks = new Block[columns.length];
365+
for (int i = 0; i < columns.length; i++) {
366+
blocks[i] = this.blocks[columns[i]];
367+
}
368+
return wrapBlocksWithoutCopy(positionCount, blocks);
369+
}
370+
360371
@Override
361372
public String toString()
362373
{

presto-common/src/main/java/com/facebook/presto/common/block/Block.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,4 +400,20 @@ default long toLong(int position)
400400
{
401401
throw new UnsupportedOperationException(getClass().getName());
402402
}
403+
404+
/**
405+
* Returns the underlying value block underlying this block.
406+
*/
407+
default Block getUnderlyingValueBlock()
408+
{
409+
return this;
410+
}
411+
412+
/**
413+
* Returns the position in the underlying value block corresponding to the specified position in this block.
414+
*/
415+
default int getUnderlyingValuePosition(int position)
416+
{
417+
return position;
418+
}
403419
}

presto-common/src/main/java/com/facebook/presto/common/block/BlockUtil.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,4 +307,27 @@ static Block[] ensureBlocksAreLoaded(Block[] blocks)
307307
// No newly loaded blocks
308308
return blocks;
309309
}
310+
311+
static boolean[] copyIsNullAndAppendNull(@Nullable boolean[] isNull, int offsetBase, int positionCount)
312+
{
313+
int desiredLength = offsetBase + positionCount + 1;
314+
boolean[] newIsNull = new boolean[desiredLength];
315+
if (isNull != null) {
316+
checkArrayRange(isNull, offsetBase, positionCount);
317+
System.arraycopy(isNull, 0, newIsNull, 0, desiredLength - 1);
318+
}
319+
// mark the last element to append null
320+
newIsNull[desiredLength - 1] = true;
321+
return newIsNull;
322+
}
323+
324+
static int[] copyOffsetsAndAppendNull(int[] offsets, int offsetBase, int positionCount)
325+
{
326+
int desiredLength = offsetBase + positionCount + 2;
327+
checkArrayRange(offsets, offsetBase, positionCount + 1);
328+
int[] newOffsets = Arrays.copyOf(offsets, desiredLength);
329+
// Null element does not move the offset forward
330+
newOffsets[desiredLength - 1] = newOffsets[desiredLength - 2];
331+
return newOffsets;
332+
}
310333
}

presto-common/src/main/java/com/facebook/presto/common/block/DictionaryBlock.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,45 @@ public Block getLoadedBlock()
508508
return new DictionaryBlock(idsOffset, getPositionCount(), loadedDictionary, ids, false, randomDictionaryId());
509509
}
510510

511+
@Override
512+
public Block getUnderlyingValueBlock()
513+
{
514+
return dictionary.getUnderlyingValueBlock();
515+
}
516+
517+
@Override
518+
public int getUnderlyingValuePosition(int position)
519+
{
520+
return dictionary.getUnderlyingValuePosition(getId(position));
521+
}
522+
523+
public Block createProjection(Block newDictionary)
524+
{
525+
if (newDictionary.getPositionCount() != dictionary.getPositionCount()) {
526+
throw new IllegalArgumentException("newDictionary must have the same position count");
527+
}
528+
529+
// if the new dictionary is lazy be careful to not materialize it
530+
if (newDictionary instanceof LazyBlock) {
531+
return new LazyBlock(positionCount, (block) -> {
532+
Block newDictionaryBlock = newDictionary.getBlock(0);
533+
Block newBlock = createProjection(newDictionaryBlock);
534+
block.setBlock(newBlock);
535+
});
536+
}
537+
if (newDictionary instanceof RunLengthEncodedBlock) {
538+
RunLengthEncodedBlock rle = (RunLengthEncodedBlock) newDictionary;
539+
return new RunLengthEncodedBlock(rle.getValue(), positionCount);
540+
}
541+
542+
// unwrap dictionary in dictionary
543+
int[] newIds = new int[positionCount];
544+
for (int position = 0; position < positionCount; position++) {
545+
newIds[position] = newDictionary.getUnderlyingValuePosition(getIdUnchecked(position));
546+
}
547+
return new DictionaryBlock(0, positionCount, newDictionary.getUnderlyingValueBlock(), newIds, false, randomDictionaryId());
548+
}
549+
511550
public Block getDictionary()
512551
{
513552
return dictionary;
@@ -533,6 +572,11 @@ public int getId(int position)
533572
return ids[position + idsOffset];
534573
}
535574

575+
private int getIdUnchecked(int position)
576+
{
577+
return ids[position + idsOffset];
578+
}
579+
536580
public DictionaryId getDictionarySourceId()
537581
{
538582
return dictionarySourceId;

presto-common/src/main/java/com/facebook/presto/common/block/LazyBlock.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,18 @@ public boolean isNullUnchecked(int internalPosition)
389389
return block.isNull(internalPosition);
390390
}
391391

392+
@Override
393+
public Block getUnderlyingValueBlock()
394+
{
395+
return block.getUnderlyingValueBlock();
396+
}
397+
398+
@Override
399+
public int getUnderlyingValuePosition(int position)
400+
{
401+
return block.getUnderlyingValuePosition(position);
402+
}
403+
392404
@Override
393405
public Block appendNull()
394406
{

0 commit comments

Comments
 (0)