Skip to content

Commit c9ba62c

Browse files
Add SQL Support for MERGE INTO In Presto prestodb#20578 (engine)
feat(connector): Add support for custom connector-provided serialization codecs (prestodb#26257)
1 parent 1a9a341 commit c9ba62c

File tree

3 files changed

+109
-2
lines changed

3 files changed

+109
-2
lines changed

presto-main-base/src/main/java/com/facebook/presto/metadata/MergeHandleJacksonModule.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,46 @@
1313
*/
1414
package com.facebook.presto.metadata;
1515

16+
import com.facebook.presto.connector.ConnectorManager;
17+
import com.facebook.presto.spi.ConnectorCodec;
18+
import com.facebook.presto.spi.ConnectorId;
1619
import com.facebook.presto.spi.ConnectorMergeTableHandle;
20+
import com.facebook.presto.spi.connector.ConnectorCodecProvider;
21+
import com.facebook.presto.sql.analyzer.FeaturesConfig;
22+
import jakarta.inject.Provider;
1723

1824
import javax.inject.Inject;
1925

26+
import java.util.Optional;
27+
import java.util.function.Function;
28+
2029
public class MergeHandleJacksonModule
2130
extends AbstractTypedJacksonModule<ConnectorMergeTableHandle>
2231
{
2332
@Inject
24-
public MergeHandleJacksonModule(HandleResolver handleResolver)
33+
public MergeHandleJacksonModule(
34+
HandleResolver handleResolver,
35+
Provider<ConnectorManager> connectorManagerProvider,
36+
FeaturesConfig featuresConfig)
37+
{
38+
super(ConnectorMergeTableHandle.class,
39+
handleResolver::getId,
40+
handleResolver::getMergeHandleClass,
41+
featuresConfig.isUseConnectorProvidedSerializationCodecs(),
42+
connectorId -> connectorManagerProvider.get()
43+
.getConnectorCodecProvider(connectorId)
44+
.flatMap(ConnectorCodecProvider::getConnectorMergeTableHandleCodec));
45+
}
46+
47+
public MergeHandleJacksonModule(
48+
HandleResolver handleResolver,
49+
FeaturesConfig featuresConfig,
50+
Function<ConnectorId, Optional<ConnectorCodec<ConnectorMergeTableHandle>>> codecExtractor)
2551
{
2652
super(ConnectorMergeTableHandle.class,
2753
handleResolver::getId,
28-
handleResolver::getMergeHandleClass);
54+
handleResolver::getMergeHandleClass,
55+
featuresConfig.isUseConnectorProvidedSerializationCodecs(),
56+
codecExtractor);
2957
}
3058
}

presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskConnectorCodec.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import com.facebook.presto.spi.ConnectorId;
8686
import com.facebook.presto.spi.ConnectorIndexHandle;
8787
import com.facebook.presto.spi.ConnectorInsertTableHandle;
88+
import com.facebook.presto.spi.ConnectorMergeTableHandle;
8889
import com.facebook.presto.spi.ConnectorOutputTableHandle;
8990
import com.facebook.presto.spi.ConnectorSplit;
9091
import com.facebook.presto.spi.ConnectorTableHandle;
@@ -984,6 +985,25 @@ public ConnectorDeleteTableHandle deserialize(byte[] data)
984985
}
985986
});
986987
}
988+
989+
public Optional<ConnectorCodec<ConnectorMergeTableHandle>> getConnectorMergeTableHandleCodec()
990+
{
991+
return Optional.of(new ConnectorCodec<>()
992+
{
993+
@Override
994+
public byte[] serialize(ConnectorMergeTableHandle handle)
995+
{
996+
TestConnectorMergeTableHandle mergeTableHandle = (TestConnectorMergeTableHandle) handle;
997+
return mergeTableHandle.getTableName().getBytes(UTF_8);
998+
}
999+
1000+
@Override
1001+
public ConnectorMergeTableHandle deserialize(byte[] data)
1002+
{
1003+
return new TestConnectorMergeTableHandle(new String(data, UTF_8));
1004+
}
1005+
});
1006+
}
9871007
}
9881008

9891009
/**
@@ -1287,6 +1307,53 @@ public int hashCode()
12871307
}
12881308
}
12891309

1310+
/**
1311+
* Test merge table handle with binary serialization support
1312+
*/
1313+
public static class TestConnectorMergeTableHandle
1314+
implements ConnectorMergeTableHandle
1315+
{
1316+
private final String tableName;
1317+
1318+
@JsonCreator
1319+
public TestConnectorMergeTableHandle(
1320+
@JsonProperty("tableName") String tableName)
1321+
{
1322+
this.tableName = tableName;
1323+
}
1324+
1325+
@JsonProperty
1326+
public String getTableName()
1327+
{
1328+
return tableName;
1329+
}
1330+
1331+
@Override
1332+
public ConnectorTableHandle getTableHandle()
1333+
{
1334+
throw new UnsupportedOperationException("Merge table handles not supported");
1335+
}
1336+
1337+
@Override
1338+
public boolean equals(Object o)
1339+
{
1340+
if (this == o) {
1341+
return true;
1342+
}
1343+
if (o == null || getClass() != o.getClass()) {
1344+
return false;
1345+
}
1346+
TestConnectorMergeTableHandle that = (TestConnectorMergeTableHandle) o;
1347+
return tableName.equals(that.tableName);
1348+
}
1349+
1350+
@Override
1351+
public int hashCode()
1352+
{
1353+
return Objects.hash(tableName);
1354+
}
1355+
}
1356+
12901357
public static class TestConnectorWithoutCodecSplit
12911358
implements ConnectorSplit
12921359
{
@@ -1396,5 +1463,11 @@ public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass()
13961463
{
13971464
throw new UnsupportedOperationException("Delete table handles not supported");
13981465
}
1466+
1467+
@Override
1468+
public Class<? extends ConnectorMergeTableHandle> getMergeTableHandleClass()
1469+
{
1470+
throw new UnsupportedOperationException("Merge table handles not supported");
1471+
}
13991472
}
14001473
}

presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
1919
import com.facebook.presto.spi.ConnectorIndexHandle;
2020
import com.facebook.presto.spi.ConnectorInsertTableHandle;
21+
import com.facebook.presto.spi.ConnectorMergeTableHandle;
2122
import com.facebook.presto.spi.ConnectorOutputTableHandle;
2223
import com.facebook.presto.spi.ConnectorSplit;
2324
import com.facebook.presto.spi.ConnectorTableHandle;
@@ -52,6 +53,11 @@ default Optional<ConnectorCodec<ConnectorDeleteTableHandle>> getConnectorDeleteT
5253
return Optional.empty();
5354
}
5455

56+
default Optional<ConnectorCodec<ConnectorMergeTableHandle>> getConnectorMergeTableHandleCodec()
57+
{
58+
return Optional.empty();
59+
}
60+
5561
default Optional<ConnectorCodec<ConnectorTableLayoutHandle>> getConnectorTableLayoutHandleCodec()
5662
{
5763
return Optional.empty();

0 commit comments

Comments
 (0)