Skip to content

Commit 57e7121

Browse files
acarpente-denodoDavid Stryker
andcommitted
Add SQL Support for MERGE INTO In Presto #20578 (engine)
Cherry-pick of trinodb/trino@cee96c3 Co-authored-by: David Stryker <[email protected]>
1 parent ee1a9f7 commit 57e7121

File tree

82 files changed

+3898
-45
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+3898
-45
lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.facebook.presto.spi.security.Identity;
4242
import com.facebook.presto.sql.tree.ExistsPredicate;
4343
import com.facebook.presto.sql.tree.Expression;
44+
import com.facebook.presto.sql.tree.FieldReference;
4445
import com.facebook.presto.sql.tree.FunctionCall;
4546
import com.facebook.presto.sql.tree.GroupingOperation;
4647
import com.facebook.presto.sql.tree.Identifier;
@@ -197,6 +198,7 @@ public class Analysis
197198
private Optional<TableHandle> analyzeTarget = Optional.empty();
198199

199200
private Optional<List<ColumnMetadata>> updatedColumns = Optional.empty();
201+
private Optional<MergeAnalysis> mergeAnalysis = Optional.empty();
200202

201203
// for describe input and describe output
202204
private final boolean isDescribe;
@@ -233,6 +235,9 @@ public class Analysis
233235
private final Set<NodeRef<Relation>> aliasedRelations = new LinkedHashSet<>();
234236
private final Set<NodeRef<TableFunctionInvocation>> polymorphicTableFunctions = new LinkedHashSet<>();
235237

238+
// Row id field used for MERGE INTO command.
239+
private final Map<NodeRef<Table>, FieldReference> rowIdField = new LinkedHashMap<>();
240+
236241
public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters, boolean isDescribe)
237242
{
238243
this.root = root;
@@ -445,6 +450,16 @@ public Expression getJoinCriteria(Join join)
445450
return joins.get(NodeRef.of(join));
446451
}
447452

453+
public void setRowIdField(Table table, FieldReference field)
454+
{
455+
rowIdField.put(NodeRef.of(table), field);
456+
}
457+
458+
public FieldReference getRowIdField(Table table)
459+
{
460+
return rowIdField.get(NodeRef.of(table));
461+
}
462+
448463
public void recordSubqueries(Node node, ExpressionAnalysis expressionAnalysis)
449464
{
450465
NodeRef<Node> key = NodeRef.of(node);
@@ -778,6 +793,16 @@ public Optional<List<ColumnMetadata>> getUpdatedColumns()
778793
return updatedColumns;
779794
}
780795

796+
public Optional<MergeAnalysis> getMergeAnalysis()
797+
{
798+
return mergeAnalysis;
799+
}
800+
801+
public void setMergeAnalysis(MergeAnalysis mergeAnalysis)
802+
{
803+
this.mergeAnalysis = Optional.of(mergeAnalysis);
804+
}
805+
781806
public void setRefreshMaterializedViewAnalysis(RefreshMaterializedViewAnalysis refreshMaterializedViewAnalysis)
782807
{
783808
this.refreshMaterializedViewAnalysis = Optional.of(refreshMaterializedViewAnalysis);
@@ -1817,4 +1842,76 @@ public ConnectorTransactionHandle getTransactionHandle()
18171842
return transactionHandle;
18181843
}
18191844
}
1845+
1846+
public static class MergeAnalysis
1847+
{
1848+
private final Table targetTable;
1849+
private final List<ColumnMetadata> targetColumnsMetadata;
1850+
private final List<ColumnHandle> targetColumnHandles;
1851+
private final List<List<ColumnHandle>> mergeCaseColumnHandles;
1852+
private final Set<ColumnHandle> nonNullableColumnHandles;
1853+
private final Map<ColumnHandle, Integer> columnHandleFieldNumbers;
1854+
private final Scope targetTableScope;
1855+
private final Scope joinScope;
1856+
1857+
public MergeAnalysis(
1858+
Table targetTable,
1859+
List<ColumnMetadata> targetColumnsMetadata,
1860+
List<ColumnHandle> targetColumnHandles,
1861+
List<List<ColumnHandle>> mergeCaseColumnHandles,
1862+
Set<ColumnHandle> nonNullableTargetColumnHandles,
1863+
Map<ColumnHandle, Integer> targetColumnHandleFieldNumbers,
1864+
Scope targetTableScope,
1865+
Scope joinScope)
1866+
{
1867+
this.targetTable = requireNonNull(targetTable, "targetTable is null");
1868+
this.targetColumnsMetadata = requireNonNull(targetColumnsMetadata, "targetColumnsMetadata is null");
1869+
this.targetColumnHandles = requireNonNull(targetColumnHandles, "targetColumnHandles is null");
1870+
this.mergeCaseColumnHandles = requireNonNull(mergeCaseColumnHandles, "mergeCaseColumnHandles is null");
1871+
this.nonNullableColumnHandles = requireNonNull(nonNullableTargetColumnHandles, "nonNullableTargetColumnHandles is null");
1872+
this.columnHandleFieldNumbers = requireNonNull(targetColumnHandleFieldNumbers, "targetColumnHandleFieldNumbers is null");
1873+
this.targetTableScope = requireNonNull(targetTableScope, "targetTableScope is null");
1874+
this.joinScope = requireNonNull(joinScope, "joinScope is null");
1875+
}
1876+
1877+
public Table getTargetTable()
1878+
{
1879+
return targetTable;
1880+
}
1881+
1882+
public List<ColumnMetadata> getTargetColumnsMetadata()
1883+
{
1884+
return targetColumnsMetadata;
1885+
}
1886+
1887+
public List<ColumnHandle> getTargetColumnHandles()
1888+
{
1889+
return targetColumnHandles;
1890+
}
1891+
1892+
public List<List<ColumnHandle>> getMergeCaseColumnHandles()
1893+
{
1894+
return mergeCaseColumnHandles;
1895+
}
1896+
1897+
public Set<ColumnHandle> getNonNullableColumnHandles()
1898+
{
1899+
return nonNullableColumnHandles;
1900+
}
1901+
1902+
public Map<ColumnHandle, Integer> getColumnHandleFieldNumbers()
1903+
{
1904+
return columnHandleFieldNumbers;
1905+
}
1906+
1907+
public Scope getJoinScope()
1908+
{
1909+
return joinScope;
1910+
}
1911+
1912+
public Scope getTargetTableScope()
1913+
{
1914+
return targetTableScope;
1915+
}
1916+
}
18201917
}

presto-common/src/main/java/com/facebook/presto/common/block/BlockUtil.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,4 +307,27 @@ static Block[] ensureBlocksAreLoaded(Block[] blocks)
307307
// No newly loaded blocks
308308
return blocks;
309309
}
310+
311+
static boolean[] copyIsNullAndAppendNull(@Nullable boolean[] isNull, int offsetBase, int positionCount)
312+
{
313+
int desiredLength = offsetBase + positionCount + 1;
314+
boolean[] newIsNull = new boolean[desiredLength];
315+
if (isNull != null) {
316+
checkArrayRange(isNull, offsetBase, positionCount);
317+
System.arraycopy(isNull, 0, newIsNull, 0, desiredLength - 1);
318+
}
319+
// mark the last element to append null
320+
newIsNull[desiredLength - 1] = true;
321+
return newIsNull;
322+
}
323+
324+
static int[] copyOffsetsAndAppendNull(int[] offsets, int offsetBase, int positionCount)
325+
{
326+
int desiredLength = offsetBase + positionCount + 2;
327+
checkArrayRange(offsets, offsetBase, positionCount + 1);
328+
int[] newOffsets = Arrays.copyOf(offsets, desiredLength);
329+
// Null element does not move the offset forward
330+
newOffsets[desiredLength - 1] = newOffsets[desiredLength - 2];
331+
return newOffsets;
332+
}
310333
}

presto-common/src/main/java/com/facebook/presto/common/block/DictionaryBlock.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,33 @@ public Block getLoadedBlock()
508508
return new DictionaryBlock(idsOffset, getPositionCount(), loadedDictionary, ids, false, randomDictionaryId());
509509
}
510510

511+
public Block createProjection(Block newDictionary)
512+
{
513+
if (newDictionary.getPositionCount() != dictionary.getPositionCount()) {
514+
throw new IllegalArgumentException("newDictionary must have the same position count");
515+
}
516+
517+
// if the new dictionary is lazy be careful to not materialize it
518+
if (newDictionary instanceof LazyBlock) {
519+
return new LazyBlock(positionCount, (block) -> {
520+
Block newDictionaryBlock = newDictionary.getBlock(0);
521+
Block newBlock = createProjection(newDictionaryBlock);
522+
block.setBlock(newBlock);
523+
});
524+
}
525+
if (newDictionary instanceof RunLengthEncodedBlock) {
526+
RunLengthEncodedBlock rle = (RunLengthEncodedBlock) newDictionary;
527+
return new RunLengthEncodedBlock(rle.getValue(), positionCount);
528+
}
529+
530+
// unwrap dictionary in dictionary
531+
int[] newIds = new int[positionCount];
532+
for (int position = 0; position < positionCount; position++) {
533+
newIds[position] = getIdUnchecked(position);
534+
}
535+
return new DictionaryBlock(0, positionCount, newDictionary, newIds, false, randomDictionaryId());
536+
}
537+
511538
public Block getDictionary()
512539
{
513540
return dictionary;
@@ -533,6 +560,11 @@ public int getId(int position)
533560
return ids[position + idsOffset];
534561
}
535562

563+
private int getIdUnchecked(int position)
564+
{
565+
return ids[position + idsOffset];
566+
}
567+
536568
public DictionaryId getDictionarySourceId()
537569
{
538570
return dictionarySourceId;

presto-common/src/main/java/com/facebook/presto/common/block/RowBlock.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
import org.openjdk.jol.info.ClassLayout;
1818

1919
import java.util.Arrays;
20+
import java.util.List;
2021
import java.util.Objects;
2122
import java.util.Optional;
2223
import java.util.function.ObjLongConsumer;
24+
import java.util.stream.Collectors;
2325

2426
import static com.facebook.presto.common.block.BlockUtil.ensureBlocksAreLoaded;
2527
import static io.airlift.slice.SizeOf.sizeOf;
@@ -248,6 +250,39 @@ public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
248250
consumer.accept(this, INSTANCE_SIZE);
249251
}
250252

253+
/**
254+
* Returns the row fields from the specified block. The block maybe a LazyBlock, RunLengthEncodedBlock, or
255+
* DictionaryBlock, but the underlying block must be a RowBlock. The returned field blocks will be the same
256+
* length as the specified block, which means they are not null suppressed.
257+
*/
258+
public static List<Block> getRowFieldsFromBlock(Block block)
259+
{
260+
// if the block is lazy, be careful to not materialize the nested blocks
261+
if (block instanceof LazyBlock) {
262+
LazyBlock lazyBlock = (LazyBlock) block;
263+
block = lazyBlock.getBlock(0);
264+
}
265+
266+
if (block instanceof RunLengthEncodedBlock) {
267+
RunLengthEncodedBlock runLengthEncodedBlock = (RunLengthEncodedBlock) block;
268+
RowBlock rowBlock = (RowBlock) runLengthEncodedBlock.getValue();
269+
return Arrays.stream(rowBlock.fieldBlocks)
270+
.map(fieldBlock -> new RunLengthEncodedBlock(fieldBlock, runLengthEncodedBlock.getPositionCount()))
271+
.collect(Collectors.toList());
272+
}
273+
if (block instanceof DictionaryBlock) {
274+
DictionaryBlock dictionaryBlock = (DictionaryBlock) block;
275+
RowBlock rowBlock = (RowBlock) dictionaryBlock.getDictionary();
276+
return Arrays.stream(rowBlock.fieldBlocks)
277+
.map(dictionaryBlock::createProjection)
278+
.collect(Collectors.toList());
279+
}
280+
if (block instanceof RowBlock) {
281+
return Arrays.asList(((RowBlock) block).fieldBlocks);
282+
}
283+
throw new IllegalArgumentException("Unexpected block type: " + block.getClass().getSimpleName());
284+
}
285+
251286
@Override
252287
public String toString()
253288
{

presto-common/src/main/java/com/facebook/presto/common/type/AbstractType.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ public void writeBoolean(BlockBuilder blockBuilder, boolean value)
9292
throw new UnsupportedOperationException(getClass().getName());
9393
}
9494

95+
@Override
96+
public byte getByte(Block block, int position)
97+
{
98+
throw new UnsupportedOperationException(getClass().getName());
99+
}
100+
95101
@Override
96102
public long getLong(Block block, int position)
97103
{

presto-common/src/main/java/com/facebook/presto/common/type/FunctionType.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ public void writeBoolean(BlockBuilder blockBuilder, boolean value)
144144
throw new UnsupportedOperationException(getClass().getName());
145145
}
146146

147+
@Override
148+
public byte getByte(Block block, int position)
149+
{
150+
throw new UnsupportedOperationException(getClass().getName());
151+
}
152+
147153
@Override
148154
public long getLong(Block block, int position)
149155
{

presto-common/src/main/java/com/facebook/presto/common/type/TinyintType.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ public long getLong(Block block, int position)
133133
return (long) block.getByte(position);
134134
}
135135

136+
@Override
137+
public byte getByte(Block block, int position)
138+
{
139+
return block.getByte(position);
140+
}
141+
136142
@Override
137143
public long getLongUnchecked(UncheckedBlock block, int internalPosition)
138144
{

presto-common/src/main/java/com/facebook/presto/common/type/Type.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ default boolean equalValuesAreIdentical()
9898
*/
9999
boolean getBooleanUnchecked(UncheckedBlock block, int internalPosition);
100100

101+
/**
102+
* Gets the value at the {@code block} {@code position} as a byte.
103+
*/
104+
byte getByte(Block block, int position);
105+
101106
/**
102107
* Gets the value at the {@code block} {@code position} as a long.
103108
*/

presto-common/src/main/java/com/facebook/presto/common/type/TypeWithName.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,12 @@ public boolean getBooleanUnchecked(UncheckedBlock block, int internalPosition)
145145
return type.getBooleanUnchecked(block, internalPosition);
146146
}
147147

148+
@Override
149+
public byte getByte(Block block, int position)
150+
{
151+
return type.getByte(block, position);
152+
}
153+
148154
@Override
149155
public long getLong(Block block, int position)
150156
{

presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
1919
import com.facebook.presto.spi.ConnectorId;
2020
import com.facebook.presto.spi.ConnectorInsertTableHandle;
21+
import com.facebook.presto.spi.ConnectorMergeTableHandle;
2122
import com.facebook.presto.spi.ConnectorOutputTableHandle;
2223
import com.facebook.presto.spi.ConnectorSplit;
2324
import com.facebook.presto.spi.ConnectorTableHandle;
@@ -85,6 +86,12 @@ public Optional<ConnectorCodec<ConnectorDeleteTableHandle>> getDeleteTableHandle
8586
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorDeleteTableHandleCodec);
8687
}
8788

89+
public Optional<ConnectorCodec<ConnectorMergeTableHandle>> getMergeTableHandleCodec(String connectorId)
90+
{
91+
requireNonNull(connectorId, "connectorId is null");
92+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorMergeTableHandleCodec);
93+
}
94+
8895
public Optional<ConnectorCodec<ConnectorTableLayoutHandle>> getTableLayoutHandleCodec(String connectorId)
8996
{
9097
requireNonNull(connectorId, "connectorId is null");

0 commit comments

Comments
 (0)