Skip to content

Commit b8f9b3e

Browse files
acarpente-denodoDavid Stryker
andcommitted
Add SQL Support for MERGE INTO In Presto #20578 (engine-tests)
Automated tests. Cherry-pick of trinodb/trino@cee96c3 Co-authored-by: David Stryker <[email protected]>
1 parent 85b50f6 commit b8f9b3e

File tree

7 files changed

+446
-0
lines changed

7 files changed

+446
-0
lines changed

presto-main-base/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.facebook.presto.spi.ConnectorTableMetadata;
2727
import com.facebook.presto.spi.Constraint;
2828
import com.facebook.presto.spi.MaterializedViewDefinition;
29+
import com.facebook.presto.spi.MergeHandle;
2930
import com.facebook.presto.spi.NewTableLayout;
3031
import com.facebook.presto.spi.SystemTable;
3132
import com.facebook.presto.spi.TableHandle;
@@ -35,6 +36,7 @@
3536
import com.facebook.presto.spi.connector.ConnectorCapabilities;
3637
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
3738
import com.facebook.presto.spi.connector.ConnectorTableVersion;
39+
import com.facebook.presto.spi.connector.RowChangeParadigm;
3840
import com.facebook.presto.spi.connector.TableFunctionApplicationResult;
3941
import com.facebook.presto.spi.constraints.TableConstraint;
4042
import com.facebook.presto.spi.function.SqlFunction;
@@ -447,6 +449,49 @@ public void finishUpdate(Session session, TableHandle tableHandle, Collection<Sl
447449
throw new UnsupportedOperationException();
448450
}
449451

452+
/**
453+
* Return the row update paradigm supported by the connector on the table or throw
454+
* an exception if row change is not supported.
455+
*/
456+
public RowChangeParadigm getRowChangeParadigm(Session session, TableHandle tableHandle)
457+
{
458+
throw new UnsupportedOperationException();
459+
}
460+
461+
/**
462+
* Get the column handle that will generate row IDs for the merge operation.
463+
* These IDs will be passed to the {@code storeMergedRows()} method of the
464+
* {@link com.facebook.presto.spi.ConnectorMergeSink} that created them.
465+
*/
466+
public ColumnHandle getMergeRowIdColumnHandle(Session session, TableHandle tableHandle)
467+
{
468+
throw new UnsupportedOperationException();
469+
}
470+
471+
/**
472+
* Get the physical layout for updated or deleted rows of a MERGE operation.
473+
*/
474+
public Optional<PartitioningHandle> getMergeUpdateLayout(Session session, TableHandle tableHandle)
475+
{
476+
throw new UnsupportedOperationException();
477+
}
478+
479+
/**
480+
* Begin merge query
481+
*/
482+
public MergeHandle beginMerge(Session session, TableHandle tableHandle)
483+
{
484+
throw new UnsupportedOperationException();
485+
}
486+
487+
/**
488+
* Finish merge query
489+
*/
490+
public void finishMerge(Session session, MergeHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
491+
{
492+
throw new UnsupportedOperationException();
493+
}
494+
450495
@Override
451496
public Optional<ConnectorId> getCatalogHandle(Session session, String catalogName)
452497
{

presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.facebook.presto.spi.ColumnMetadata;
4747
import com.facebook.presto.spi.ConnectorId;
4848
import com.facebook.presto.spi.ConnectorTableMetadata;
49+
import com.facebook.presto.spi.MaterializedViewDefinition;
4950
import com.facebook.presto.spi.SchemaTableName;
5051
import com.facebook.presto.spi.WarningCollector;
5152
import com.facebook.presto.spi.analyzer.ViewDefinition;
@@ -74,6 +75,8 @@
7475
import org.intellij.lang.annotations.Language;
7576
import org.testng.annotations.BeforeClass;
7677

78+
import java.util.ArrayList;
79+
import java.util.Collections;
7780
import java.util.List;
7881
import java.util.Optional;
7982
import java.util.function.Consumer;
@@ -313,6 +316,32 @@ public void setup()
313316
ColumnMetadata.builder().setName("z").setType(BIGINT).build())),
314317
false));
315318

319+
// materialized view referencing table in same schema
320+
List<SchemaTableName> baseTables = new ArrayList<>(Collections.singletonList(table2));
321+
MaterializedViewDefinition.TableColumn baseTableColumns = new MaterializedViewDefinition.TableColumn(table2, "a", true);
322+
323+
SchemaTableName materializedTable = new SchemaTableName("s1", "mv1");
324+
MaterializedViewDefinition.TableColumn materializedViewTableColumn = new MaterializedViewDefinition.TableColumn(materializedTable, "a", true);
325+
326+
List<MaterializedViewDefinition.ColumnMapping> columnMappings = Collections.singletonList(
327+
new MaterializedViewDefinition.ColumnMapping(materializedViewTableColumn, Collections.singletonList(baseTableColumns)));
328+
329+
MaterializedViewDefinition materializedViewData1 = new MaterializedViewDefinition(
330+
"select a from t2",
331+
"s1",
332+
"mv1",
333+
baseTables,
334+
Optional.of("user"),
335+
columnMappings,
336+
new ArrayList<>(),
337+
Optional.of(new ArrayList<>(Collections.singletonList("a"))));
338+
339+
ConnectorTableMetadata materializedViewMetadata1 = new ConnectorTableMetadata(
340+
materializedTable, ImmutableList.of(ColumnMetadata.builder().setName("a").setType(BIGINT).build()));
341+
342+
inSetupTransaction(session ->
343+
metadata.createMaterializedView(session, TPCH_CATALOG, materializedViewMetadata1, materializedViewData1, false));
344+
316345
// valid view referencing table in same schema
317346
String viewData1 = JsonCodec.jsonCodec(ViewDefinition.class).toJson(
318347
new ViewDefinition(

presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2339,4 +2339,17 @@ public void testTableFunctionRequiredColumns()
23392339
// table s1.t5 has two columns. The second column is hidden. Table function can require a hidden column.
23402340
analyze("SELECT * FROM TABLE(system.required_columns_function(input => TABLE(s1.t5)))");
23412341
}
2342+
2343+
@Test
2344+
public void testInvalidMerge()
2345+
{
2346+
assertFails(MISSING_TABLE, "Table tpch.s1.foo does not exist",
2347+
"MERGE INTO foo USING bar ON foo.id = bar.id WHEN MATCHED THEN UPDATE SET id = bar.id + 1");
2348+
2349+
assertFails(NOT_SUPPORTED, "line 1:1: Merging into views is not supported",
2350+
"MERGE INTO v1 USING t1 ON v1.a = t1.a WHEN MATCHED THEN UPDATE SET id = bar.id + 1");
2351+
2352+
assertFails(NOT_SUPPORTED, "line 1:1: Merging into materialized views is not supported",
2353+
"MERGE INTO mv1 USING t1 ON mv1.a = t1.a WHEN MATCHED THEN UPDATE SET id = bar.id + 1");
2354+
}
23422355
}
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.sql.planner;
15+
16+
import com.facebook.presto.common.Page;
17+
import com.facebook.presto.common.block.Block;
18+
import com.facebook.presto.common.block.BlockBuilder;
19+
import com.facebook.presto.common.block.ByteArrayBlock;
20+
import com.facebook.presto.common.block.IntArrayBlock;
21+
import com.facebook.presto.common.block.LongArrayBlock;
22+
import com.facebook.presto.common.block.PageBuilderStatus;
23+
import com.facebook.presto.common.block.RowBlock;
24+
import com.facebook.presto.common.type.RowType;
25+
import com.facebook.presto.common.type.Type;
26+
import com.facebook.presto.operator.DeleteAndInsertMergeProcessor;
27+
import com.google.common.collect.ImmutableList;
28+
import io.airlift.slice.Slices;
29+
import org.testng.annotations.Test;
30+
31+
import java.nio.charset.Charset;
32+
import java.util.List;
33+
import java.util.Optional;
34+
35+
import static com.facebook.presto.common.type.BigintType.BIGINT;
36+
import static com.facebook.presto.common.type.IntegerType.INTEGER;
37+
import static com.facebook.presto.common.type.TinyintType.TINYINT;
38+
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
39+
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
40+
import static com.facebook.presto.operator.MergeRowChangeProcessor.DEFAULT_CASE_OPERATION_NUMBER;
41+
import static com.facebook.presto.spi.ConnectorMergeSink.DELETE_OPERATION_NUMBER;
42+
import static com.facebook.presto.spi.ConnectorMergeSink.INSERT_OPERATION_NUMBER;
43+
import static com.facebook.presto.spi.ConnectorMergeSink.UPDATE_OPERATION_NUMBER;
44+
import static org.assertj.core.api.Assertions.assertThat;
45+
46+
public class TestDeleteAndInsertMergeProcessor
47+
{
48+
@Test
49+
public void testSimpleDeletedRowMerge()
50+
{
51+
// target: ('Dave', 11, 'Devon'), ('Dave', 11, 'Darbyshire')
52+
// source: ('Dave', 11, 'Darbyshire')
53+
// merge:
54+
// MERGE INTO target t USING source s
55+
// ON t.customer = s.customer" +
56+
// WHEN MATCHED AND t.address <> 'Darbyshire' AND s.purchases * 2 > 20" +
57+
// THEN DELETE
58+
// expected: ('Dave', 11, 'Darbyshire')
59+
DeleteAndInsertMergeProcessor processor = makeMergeProcessor();
60+
Page inputPage = makePageFromBlocks(
61+
2,
62+
Optional.empty(),
63+
new Block[] {
64+
makeLongArrayBlock(1, 1), // TransactionId
65+
makeLongArrayBlock(1, 0), // rowId
66+
makeIntArrayBlock(536870912, 536870912)}, // bucket
67+
new Block[] {
68+
makeVarcharArrayBlock("", "Dave"), // customer
69+
makeIntArrayBlock(0, 11), // purchases
70+
makeVarcharArrayBlock("", "Devon"), // address
71+
makeByteArrayBlock(1, 1), // "present" boolean
72+
makeByteArrayBlock(DEFAULT_CASE_OPERATION_NUMBER, DELETE_OPERATION_NUMBER),
73+
makeIntArrayBlock(-1, 0)});
74+
75+
Page outputPage = processor.transformPage(inputPage);
76+
assertThat(outputPage.getPositionCount()).isEqualTo(1);
77+
78+
// The single operation is a delete
79+
assertThat(TINYINT.getLong(outputPage.getBlock(3), 0)).isEqualTo(DELETE_OPERATION_NUMBER);
80+
81+
// Show that the row to be deleted is rowId 0, e.g. ('Dave', 11, 'Devon')
82+
Block rowIdRow = outputPage.getBlock(4).getBlock(0);
83+
assertThat(INTEGER.getLong(rowIdRow, 1)).isEqualTo(0);
84+
}
85+
86+
@Test
87+
public void testUpdateAndDeletedMerge()
88+
{
89+
// target: ('Aaron', 11, 'Arches'), ('Bill', 7, 'Buena'), ('Dave', 11, 'Darbyshire'), ('Dave', 11, 'Devon'), ('Ed', 7, 'Etherville')
90+
// source: ('Aaron', 6, 'Arches'), ('Carol', 9, 'Centreville'), ('Dave', 11, 'Darbyshire'), ('Ed', 7, 'Etherville')
91+
// merge:
92+
// MERGE INTO target t USING source s
93+
// ON t.customer = s.customer" +
94+
// WHEN MATCHED AND t.address <> 'Darbyshire' AND s.purchases * 2 > 20
95+
// THEN DELETE" +
96+
// WHEN MATCHED" +
97+
// THEN UPDATE SET purchases = s.purchases + t.purchases, address = concat(t.address, '/', s.address)" +
98+
// WHEN NOT MATCHED" +
99+
// THEN INSERT (customer, purchases, address) VALUES(s.customer, s.purchases, s.address)
100+
// expected: ('Aaron', 17, 'Arches/Arches'), ('Bill', 7, 'Buena'), ('Carol', 9, 'Centreville'), ('Dave', 22, 'Darbyshire/Darbyshire'), ('Ed', 14, 'Etherville/Etherville'), ('Fred', 30, 'Franklin')
101+
DeleteAndInsertMergeProcessor processor = makeMergeProcessor();
102+
boolean[] rowIdNulls = new boolean[] {false, true, false, false, false};
103+
Page inputPage = makePageFromBlocks(
104+
5,
105+
Optional.of(rowIdNulls),
106+
new Block[] {
107+
makeLongArrayBlockWithNulls(rowIdNulls, 5, 2, 1, 2, 2), // TransactionId
108+
makeLongArrayBlockWithNulls(rowIdNulls, 5, 0, 3, 1, 2), // rowId
109+
makeIntArrayBlockWithNulls(rowIdNulls, 5, 536870912, 536870912, 536870912, 536870912)}, // bucket
110+
new Block[] {
111+
// customer
112+
makeVarcharArrayBlock("Aaron", "Carol", "Dave", "Dave", "Ed"),
113+
// purchases
114+
makeIntArrayBlock(17, 9, 11, 22, 14),
115+
// address
116+
makeVarcharArrayBlock("Arches/Arches", "Centreville", "Devon", "Darbyshire/Darbyshire", "Etherville/Etherville"),
117+
// "present" boolean
118+
makeByteArrayBlock(1, 0, 1, 1, 1),
119+
// operation number: update, insert, delete, update
120+
makeByteArrayBlock(UPDATE_OPERATION_NUMBER, INSERT_OPERATION_NUMBER, DELETE_OPERATION_NUMBER, UPDATE_OPERATION_NUMBER, UPDATE_OPERATION_NUMBER),
121+
makeIntArrayBlock(0, 1, 2, 0, 0)});
122+
123+
Page outputPage = processor.transformPage(inputPage);
124+
assertThat(outputPage.getPositionCount()).isEqualTo(8);
125+
RowBlock rowIdBlock = (RowBlock) outputPage.getBlock(4);
126+
assertThat(rowIdBlock.getPositionCount()).isEqualTo(8);
127+
// Show that the first row has address "Arches"
128+
assertThat(getString(outputPage.getBlock(2), 1)).isEqualTo("Arches/Arches");
129+
}
130+
131+
@Test
132+
public void testAnotherMergeCase()
133+
{
134+
/*
135+
inputPage: Page[positions=5
136+
0:Row[0:Long[2, 1, 2, 2], 1:Long[0, 3, 1, 2], 2:Int[536870912, 536870912, 536870912, 536870912]],
137+
1:Row[0:VarWidth["Aaron", "Carol", "Dave", "Dave", "Ed"], 1:Int[17, 9, 11, 22, 14], 2:VarWidth["Arches/Arches", "Centreville", "Devon", "Darbyshire/Darbyshir...", "Etherville/Ethervill..."], 3:Int[1, 2, 0, 1, 1], 4:Int[3, 1, 2, 3, 3]]]
138+
Page[positions=8 0:Dict[VarWidth["Aaron", "Dave", "Dave", "Ed", "Aaron", "Carol", "Dave", "Ed"]], 1:Dict[Int[17, 11, 22, 14, 17, 9, 22, 14]], 2:Dict[VarWidth["Arches/Arches", "Devon", "Darbyshire/Darbyshir...", "Etherville/Ethervill...", "Arches/Arches", "Centreville", "Darbyshire/Darbyshir...", "Etherville/Ethervill..."]], 3:Int[2, 2, 2, 2, 1, 1, 1, 1], 4:Row[0:Dict[Long[2, 1, 2, 2, 2, 2, 2, 2]], 1:Dict[Long[0, 3, 1, 2, 0, 0, 0, 0]], 2:Dict[Int[536870912, 536870912, 536870912, 536870912, 536870912, 536870912, 536870912, 536870912]]]]
139+
Expected row count to be <5>, but was <7>; rows=[[Bill, 7, Buena], [Dave, 11, Devon], [Aaron, 11, Arches], [Aaron, 17, Arches/Arches], [Carol, 9, Centreville], [Dave, 22, Darbyshire/Darbyshire], [Ed, 14, Etherville/Etherville]]
140+
*/
141+
DeleteAndInsertMergeProcessor processor = makeMergeProcessor();
142+
boolean[] rowIdNulls = new boolean[] {false, true, false, false, false};
143+
Page inputPage = makePageFromBlocks(
144+
5,
145+
Optional.of(rowIdNulls),
146+
new Block[] {
147+
makeLongArrayBlockWithNulls(rowIdNulls, 5, 2, 1, 2, 2), // TransactionId
148+
makeLongArrayBlockWithNulls(rowIdNulls, 5, 0, 3, 1, 2), // rowId
149+
makeIntArrayBlockWithNulls(rowIdNulls, 5, 536870912, 536870912, 536870912, 536870912)}, // bucket
150+
new Block[] {
151+
// customer
152+
makeVarcharArrayBlock("Aaron", "Carol", "Dave", "Dave", "Ed"),
153+
// purchases
154+
makeIntArrayBlock(17, 9, 11, 22, 14),
155+
// address
156+
makeVarcharArrayBlock("Arches/Arches", "Centreville", "Devon", "Darbyshire/Darbyshire", "Etherville/Etherville"),
157+
// "present" boolean
158+
makeByteArrayBlock(1, 0, 1, 1, 0),
159+
// operation number: update, insert, delete, update, update
160+
makeByteArrayBlock(3, 1, 2, 3, 3),
161+
makeIntArrayBlock(0, -1, 1, 0, 0)});
162+
163+
Page outputPage = processor.transformPage(inputPage);
164+
assertThat(outputPage.getPositionCount()).isEqualTo(8);
165+
RowBlock rowIdBlock = (RowBlock) outputPage.getBlock(4);
166+
assertThat(rowIdBlock.getPositionCount()).isEqualTo(8);
167+
// Show that the first row has address "Arches/Arches"
168+
assertThat(getString(outputPage.getBlock(2), 1)).isEqualTo("Arches/Arches");
169+
}
170+
171+
private Page makePageFromBlocks(int positionCount, Optional<boolean[]> rowIdNulls, Block[] rowIdBlocks, Block[] mergeCaseBlocks)
172+
{
173+
Block[] pageBlocks = new Block[] {
174+
RowBlock.fromFieldBlocks(positionCount, rowIdNulls, rowIdBlocks),
175+
RowBlock.fromFieldBlocks(positionCount, Optional.empty(), mergeCaseBlocks)
176+
};
177+
return new Page(pageBlocks);
178+
}
179+
180+
private DeleteAndInsertMergeProcessor makeMergeProcessor()
181+
{
182+
// CREATE TABLE (customer VARCHAR, purchases INTEGER, address VARCHAR)
183+
List<Type> types = ImmutableList.of(VARCHAR, INTEGER, VARCHAR);
184+
185+
RowType rowIdType = RowType.anonymous(ImmutableList.of(BIGINT, BIGINT, INTEGER));
186+
return new DeleteAndInsertMergeProcessor(types, rowIdType, 0, 1, ImmutableList.of(), ImmutableList.of(0, 1, 2));
187+
}
188+
189+
private String getString(Block block, int position)
190+
{
191+
return VARBINARY.getSlice(block, position).toString(Charset.defaultCharset());
192+
}
193+
194+
private LongArrayBlock makeLongArrayBlock(long... elements)
195+
{
196+
return new LongArrayBlock(elements.length, Optional.empty(), elements);
197+
}
198+
199+
private LongArrayBlock makeLongArrayBlockWithNulls(boolean[] nulls, int positionCount, long... elements)
200+
{
201+
assertThat(countNonNull(nulls) + elements.length).isEqualTo(positionCount);
202+
return new LongArrayBlock(elements.length, Optional.of(nulls), elements);
203+
}
204+
205+
private IntArrayBlock makeIntArrayBlock(int... elements)
206+
{
207+
return new IntArrayBlock(elements.length, Optional.empty(), elements);
208+
}
209+
210+
private IntArrayBlock makeIntArrayBlockWithNulls(boolean[] nulls, int positionCount, int... elements)
211+
{
212+
assertThat(countNonNull(nulls) + elements.length).isEqualTo(positionCount);
213+
return new IntArrayBlock(elements.length, Optional.of(nulls), elements);
214+
}
215+
216+
private int countNonNull(boolean[] nulls)
217+
{
218+
int count = 0;
219+
for (int position = 0; position < nulls.length; position++) {
220+
if (nulls[position]) {
221+
count++;
222+
}
223+
}
224+
return count;
225+
}
226+
227+
private ByteArrayBlock makeByteArrayBlock(int... elements)
228+
{
229+
byte[] bytes = new byte[elements.length];
230+
for (int index = 0; index < elements.length; index++) {
231+
bytes[index] = (byte) elements[index];
232+
}
233+
return new ByteArrayBlock(elements.length, Optional.empty(), bytes);
234+
}
235+
236+
private Block makeVarcharArrayBlock(String... elements)
237+
{
238+
BlockBuilder builder = VARCHAR.createBlockBuilder(new PageBuilderStatus().createBlockBuilderStatus(), elements.length);
239+
for (String element : elements) {
240+
VARCHAR.writeSlice(builder, Slices.utf8Slice(element));
241+
}
242+
return builder.build();
243+
}
244+
}

0 commit comments

Comments
 (0)