Skip to content

Commit 9f86028

Browse files
Add SQL Support for MERGE INTO In Presto prestodb#20578 (engine)
Changes to include the code necessary for: - feat(connector): Add support for custom connector-provided serialization codecs (prestodb#26257) - Add support to provide thrift codec for connector specific fields (prestodb#25242)
1 parent fed0a39 commit 9f86028

File tree

14 files changed

+278
-37
lines changed

14 files changed

+278
-37
lines changed

presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
1919
import com.facebook.presto.spi.ConnectorId;
2020
import com.facebook.presto.spi.ConnectorInsertTableHandle;
21+
import com.facebook.presto.spi.ConnectorMergeTableHandle;
2122
import com.facebook.presto.spi.ConnectorOutputTableHandle;
2223
import com.facebook.presto.spi.ConnectorSplit;
2324
import com.facebook.presto.spi.ConnectorTableHandle;
@@ -85,6 +86,12 @@ public Optional<ConnectorCodec<ConnectorDeleteTableHandle>> getDeleteTableHandle
8586
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorDeleteTableHandleCodec);
8687
}
8788

89+
public Optional<ConnectorCodec<ConnectorMergeTableHandle>> getMergeTableHandleCodec(String connectorId)
90+
{
91+
requireNonNull(connectorId, "connectorId is null");
92+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorMergeTableHandleCodec);
93+
}
94+
8895
public Optional<ConnectorCodec<ConnectorTableLayoutHandle>> getTableLayoutHandleCodec(String connectorId)
8996
{
9097
requireNonNull(connectorId, "connectorId is null");

presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ public String toString()
230230
}
231231
}
232232

233+
@ThriftStruct
233234
public static class MergeHandle
234235
extends ExecutionWriterTarget
235236
{
@@ -239,6 +240,7 @@ public static class MergeHandle
239240
// private final ConnectorMergeTableHandle connectorMergeTableHandle;
240241

241242
@JsonCreator
243+
@ThriftConstructor
242244
public MergeHandle(
243245
@JsonProperty("handle") com.facebook.presto.spi.MergeHandle handle)
244246
// @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@@ -250,6 +252,7 @@ public MergeHandle(
250252
}
251253

252254
@JsonProperty
255+
@ThriftField(1)
253256
public com.facebook.presto.spi.MergeHandle getHandle()
254257
{
255258
return handle;

presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTargetUnion.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public class ExecutionWriterTargetUnion
2929
private ExecutionWriterTarget.DeleteHandle deleteHandle;
3030
private ExecutionWriterTarget.RefreshMaterializedViewHandle refreshMaterializedViewHandle;
3131
private ExecutionWriterTarget.UpdateHandle updateHandle;
32+
private ExecutionWriterTarget.MergeHandle mergeHandle;
3233

3334
@ThriftConstructor
3435
public ExecutionWriterTargetUnion()
@@ -101,6 +102,19 @@ public ExecutionWriterTarget.UpdateHandle getUpdateHandle()
101102
return updateHandle;
102103
}
103104

105+
@ThriftConstructor
106+
public ExecutionWriterTargetUnion(ExecutionWriterTarget.MergeHandle mergeHandle)
107+
{
108+
this.id = 6;
109+
this.mergeHandle = mergeHandle;
110+
}
111+
112+
@ThriftField(6)
113+
public ExecutionWriterTarget.MergeHandle getMergeHandle()
114+
{
115+
return mergeHandle;
116+
}
117+
104118
@ThriftUnionId
105119
public short getId()
106120
{
@@ -125,6 +139,9 @@ else if (executionWriterTargetUnion.getRefreshMaterializedViewHandle() != null)
125139
else if (executionWriterTargetUnion.getUpdateHandle() != null) {
126140
return executionWriterTargetUnion.getUpdateHandle();
127141
}
142+
else if (executionWriterTargetUnion.getMergeHandle() != null) {
143+
return executionWriterTargetUnion.getMergeHandle();
144+
}
128145
else {
129146
throw new IllegalArgumentException("Unrecognized execution writer target: " + executionWriterTargetUnion);
130147
}
@@ -149,6 +166,9 @@ else if (executionWriterTarget instanceof ExecutionWriterTarget.RefreshMateriali
149166
else if (executionWriterTarget instanceof ExecutionWriterTarget.UpdateHandle) {
150167
return new ExecutionWriterTargetUnion((ExecutionWriterTarget.UpdateHandle) executionWriterTarget);
151168
}
169+
else if (executionWriterTarget instanceof ExecutionWriterTarget.MergeHandle) {
170+
return new ExecutionWriterTargetUnion((ExecutionWriterTarget.MergeHandle) executionWriterTarget);
171+
}
152172
else {
153173
throw new IllegalArgumentException("Unsupported execution writer target: " + executionWriterTarget);
154174
}

presto-main-base/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public void configure(Binder binder)
3333
jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
3434
jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
3535
jsonBinder(binder).addModuleBinding().to(DeleteTableHandleJacksonModule.class);
36-
jsonBinder(binder).addModuleBinding().to(MergeHandleJacksonModule.class);
36+
jsonBinder(binder).addModuleBinding().to(MergeTableHandleJacksonModule.class);
3737
jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
3838
jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
3939
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);

presto-main-base/src/main/java/com/facebook/presto/metadata/HandleResolver.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public String getId(FunctionHandle functionHandle)
137137

138138
public String getId(ConnectorMergeTableHandle mergeHandle)
139139
{
140-
return getId(mergeHandle, MaterializedHandleResolver::getMergeHandleClass);
140+
return getId(mergeHandle, MaterializedHandleResolver::getMergeTableHandleClass);
141141
}
142142

143143
public Class<? extends ConnectorTableHandle> getTableHandleClass(String id)
@@ -180,9 +180,9 @@ public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass(Str
180180
return resolverFor(id).getDeleteTableHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
181181
}
182182

183-
public Class<? extends ConnectorMergeTableHandle> getMergeHandleClass(String id)
183+
public Class<? extends ConnectorMergeTableHandle> getMergeTableHandleClass(String id)
184184
{
185-
return resolverFor(id).getMergeHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
185+
return resolverFor(id).getMergeTableHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
186186
}
187187

188188
public Class<? extends ConnectorPartitioningHandle> getPartitioningHandleClass(String id)
@@ -321,7 +321,7 @@ public Optional<Class<? extends ConnectorDeleteTableHandle>> getDeleteTableHandl
321321
return deleteTableHandle;
322322
}
323323

324-
public Optional<Class<? extends ConnectorMergeTableHandle>> getMergeHandleClass()
324+
public Optional<Class<? extends ConnectorMergeTableHandle>> getMergeTableHandleClass()
325325
{
326326
return mergeTableHandle;
327327
}

presto-main-base/src/main/java/com/facebook/presto/metadata/MergeHandleJacksonModule.java

Lines changed: 0 additions & 30 deletions
This file was deleted.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.metadata;
15+
16+
import com.facebook.presto.connector.ConnectorManager;
17+
import com.facebook.presto.spi.ConnectorCodec;
18+
import com.facebook.presto.spi.ConnectorId;
19+
import com.facebook.presto.spi.ConnectorMergeTableHandle;
20+
import com.facebook.presto.spi.connector.ConnectorCodecProvider;
21+
import com.facebook.presto.sql.analyzer.FeaturesConfig;
22+
import jakarta.inject.Provider;
23+
24+
import javax.inject.Inject;
25+
26+
import java.util.Optional;
27+
import java.util.function.Function;
28+
29+
public class MergeTableHandleJacksonModule
30+
extends AbstractTypedJacksonModule<ConnectorMergeTableHandle>
31+
{
32+
@Inject
33+
public MergeTableHandleJacksonModule(
34+
HandleResolver handleResolver,
35+
Provider<ConnectorManager> connectorManagerProvider,
36+
FeaturesConfig featuresConfig)
37+
{
38+
super(ConnectorMergeTableHandle.class,
39+
handleResolver::getId,
40+
handleResolver::getMergeTableHandleClass,
41+
featuresConfig.isUseConnectorProvidedSerializationCodecs(),
42+
connectorId -> connectorManagerProvider.get()
43+
.getConnectorCodecProvider(connectorId)
44+
.flatMap(ConnectorCodecProvider::getConnectorMergeTableHandleCodec));
45+
}
46+
47+
public MergeTableHandleJacksonModule(
48+
HandleResolver handleResolver,
49+
FeaturesConfig featuresConfig,
50+
Function<ConnectorId, Optional<ConnectorCodec<ConnectorMergeTableHandle>>> codecExtractor)
51+
{
52+
super(ConnectorMergeTableHandle.class,
53+
handleResolver::getId,
54+
handleResolver::getMergeTableHandleClass,
55+
featuresConfig.isUseConnectorProvidedSerializationCodecs(),
56+
codecExtractor);
57+
}
58+
}

presto-main-base/src/main/java/com/facebook/presto/server/thrift/HandleThriftModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.presto.metadata.HandleResolver;
1717
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
1818
import com.facebook.presto.spi.ConnectorInsertTableHandle;
19+
import com.facebook.presto.spi.ConnectorMergeTableHandle;
1920
import com.facebook.presto.spi.ConnectorOutputTableHandle;
2021
import com.facebook.presto.spi.ConnectorSplit;
2122
import com.facebook.presto.spi.ConnectorTableHandle;
@@ -39,6 +40,7 @@ public void configure(Binder binder)
3940
thriftCodecBinder(binder).bindCustomThriftCodec(OutputTableHandleThriftCodec.class);
4041
thriftCodecBinder(binder).bindCustomThriftCodec(InsertTableHandleThriftCodec.class);
4142
thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
43+
thriftCodecBinder(binder).bindCustomThriftCodec(MergeTableHandleThriftCodec.class);
4244
thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
4345
thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
4446

@@ -47,6 +49,7 @@ public void configure(Binder binder)
4749
jsonCodecBinder(binder).bindJsonCodec(ConnectorOutputTableHandle.class);
4850
jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class);
4951
jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class);
52+
jsonCodecBinder(binder).bindJsonCodec(ConnectorMergeTableHandle.class);
5053
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class);
5154
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class);
5255

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.server.thrift;
15+
16+
import com.facebook.airlift.json.JsonCodec;
17+
import com.facebook.drift.codec.CodecThriftType;
18+
import com.facebook.drift.codec.metadata.ThriftType;
19+
import com.facebook.drift.protocol.TProtocolReader;
20+
import com.facebook.drift.protocol.TProtocolWriter;
21+
import com.facebook.presto.connector.ConnectorCodecManager;
22+
import com.facebook.presto.metadata.HandleResolver;
23+
import com.facebook.presto.spi.ConnectorMergeTableHandle;
24+
25+
import javax.inject.Inject;
26+
27+
import java.nio.ByteBuffer;
28+
29+
import static java.util.Objects.requireNonNull;
30+
31+
public class MergeTableHandleThriftCodec
32+
extends AbstractTypedThriftCodec<ConnectorMergeTableHandle>
33+
{
34+
private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorMergeTableHandle.class);
35+
private final ConnectorCodecManager connectorCodecManager;
36+
37+
@Inject
38+
public MergeTableHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec<ConnectorMergeTableHandle> jsonCodec)
39+
{
40+
super(ConnectorMergeTableHandle.class,
41+
requireNonNull(jsonCodec, "jsonCodec is null"),
42+
requireNonNull(handleResolver, "handleResolver is null")::getId,
43+
handleResolver::getMergeTableHandleClass);
44+
this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
45+
}
46+
47+
@CodecThriftType
48+
public static ThriftType getThriftType()
49+
{
50+
return THRIFT_TYPE;
51+
}
52+
53+
@Override
54+
public ThriftType getType()
55+
{
56+
return THRIFT_TYPE;
57+
}
58+
59+
@Override
60+
public ConnectorMergeTableHandle readConcreteValue(String connectorId, TProtocolReader reader)
61+
throws Exception
62+
{
63+
ByteBuffer byteBuffer = reader.readBinary();
64+
assert (byteBuffer.position() == 0);
65+
byte[] bytes = byteBuffer.array();
66+
return connectorCodecManager.getMergeTableHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
67+
}
68+
69+
@Override
70+
public void writeConcreteValue(String connectorId, ConnectorMergeTableHandle value, TProtocolWriter writer)
71+
throws Exception
72+
{
73+
requireNonNull(value, "value is null");
74+
writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getMergeTableHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value))));
75+
}
76+
77+
@Override
78+
public boolean isThriftCodecAvailable(String connectorId)
79+
{
80+
return connectorCodecManager.getMergeTableHandleCodec(connectorId).isPresent();
81+
}
82+
}

presto-main-base/src/main/java/com/facebook/presto/sql/planner/MergePartitioningHandle.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@ public MergePartitioningHandle(
6060
checkArgument(insertPartitioning.isPresent() || updatePartitioning.isPresent(), "insert or update partitioning must be present");
6161
}
6262

63-
@JsonProperty("insertPartitioning")
63+
@JsonProperty
6464
public Optional<PartitioningScheme> getInsertPartitioning()
6565
{
6666
return insertPartitioning;
6767
}
6868

69-
@JsonProperty("updatePartitioning")
69+
@JsonProperty
7070
public Optional<PartitioningScheme> getUpdatePartitioning()
7171
{
7272
return updatePartitioning;

0 commit comments

Comments
 (0)