Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.bootstrap.Bootstrap;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.JsonModule;
import com.facebook.drift.codec.guice.ThriftCodecModule;
import com.facebook.presto.block.BlockJsonSerde;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockEncoding;
Expand All @@ -24,6 +25,7 @@
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.HandleJsonModule;
import com.facebook.presto.metadata.HandleResolver;
Expand Down Expand Up @@ -92,6 +94,8 @@ private JsonCodec<DeltaTableHandle> getJsonCodec()
Module module = binder -> {
binder.install(new JsonModule());
binder.install(new HandleJsonModule());
binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON);
binder.install(new ThriftCodecModule());
configBinder(binder).bindConfig(FeaturesConfig.class);
FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager();
binder.bind(TypeManager.class).toInstance(functionAndTypeManager);
Expand Down
18 changes: 18 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,24 @@ shared across all of the partitioned consumers. Increasing this value may
improve network throughput for data transferred between stages if the
network has high latency or if there are many nodes in the cluster.

``use-connector-provided-serialization-codecs``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

Enables the use of custom connector-provided serialization codecs for handles.
This feature allows connectors to use their own serialization format for
handle objects (such as table handles, column handles, and splits) instead
of standard JSON serialization.

When enabled, connectors that provide a ``ConnectorCodecProvider`` with
appropriate codecs will have their handles serialized using custom binary
formats, which are then Base64-encoded for transport. Connectors without
codec support automatically fall back to standard JSON serialization.
Internal Presto handles (prefixed with ``$``) always use JSON serialization
regardless of this setting.

.. _task-properties:

Task Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import com.facebook.airlift.bootstrap.Bootstrap;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.JsonModule;
import com.facebook.drift.codec.guice.ThriftCodecModule;
import com.facebook.presto.block.BlockJsonSerde;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockEncoding;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.StorageFormat;
Expand Down Expand Up @@ -153,8 +155,10 @@ private JsonCodec<HiveSplit> getJsonCodec()
{
Module module = binder -> {
binder.install(new JsonModule());
binder.install(new ThriftCodecModule());
binder.install(new HandleJsonModule());
configBinder(binder).bindConfig(FeaturesConfig.class);
binder.bind(ConnectorManager.class).toProvider(() -> null);
FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager();
binder.bind(TypeManager.class).toInstance(functionAndTypeManager);
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,16 @@ private Connector createConnector(ConnectorId connectorId, ConnectorFactory fact
}
}

public Optional<ConnectorCodecProvider> getConnectorCodecProvider(ConnectorId connectorId)
{
requireNonNull(connectorId, "connectorId is null");
MaterializedConnector materializedConnector = connectors.get(connectorId);
if (materializedConnector == null) {
return Optional.empty();
}
return materializedConnector.getConnectorCodecProvider();
}

private static class MaterializedConnector
{
private final ConnectorId connectorId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,47 @@
*/
package com.facebook.presto.index;

import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.metadata.AbstractTypedJacksonModule;
import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.spi.ConnectorCodec;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.connector.ConnectorCodecProvider;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import jakarta.inject.Inject;
import jakarta.inject.Provider;

import java.util.Optional;
import java.util.function.Function;

public class IndexHandleJacksonModule
extends AbstractTypedJacksonModule<ConnectorIndexHandle>
{
@Inject
public IndexHandleJacksonModule(HandleResolver handleResolver)
public IndexHandleJacksonModule(
HandleResolver handleResolver,
Provider<ConnectorManager> connectorManagerProvider,
FeaturesConfig featuresConfig)
{
super(ConnectorIndexHandle.class,
handleResolver::getId,
handleResolver::getIndexHandleClass,
featuresConfig.isUseConnectorProvidedSerializationCodecs(),
connectorId -> connectorManagerProvider.get()
.getConnectorCodecProvider(connectorId)
.flatMap(ConnectorCodecProvider::getConnectorIndexHandleCodec));
}

public IndexHandleJacksonModule(
HandleResolver handleResolver,
FeaturesConfig featuresConfig,
Function<ConnectorId, Optional<ConnectorCodec<ConnectorIndexHandle>>> codecExtractor)
{
super(ConnectorIndexHandle.class,
handleResolver::getId,
handleResolver::getIndexHandleClass);
handleResolver::getIndexHandleClass,
featuresConfig.isUseConnectorProvidedSerializationCodecs(),
codecExtractor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorCodec;
import com.facebook.presto.spi.ConnectorId;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
Expand All @@ -38,6 +40,7 @@
import com.google.common.cache.CacheBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

Expand All @@ -49,18 +52,38 @@ public abstract class AbstractTypedJacksonModule<T>
extends SimpleModule
{
private static final String TYPE_PROPERTY = "@type";
private static final String DATA_PROPERTY = "customSerializedValue";

protected AbstractTypedJacksonModule(
Class<T> baseClass,
Function<T, String> nameResolver,
Function<String, Class<? extends T>> classResolver)
Function<String, Class<? extends T>> classResolver,
boolean binarySerializationEnabled,
Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor)
{
super(baseClass.getSimpleName() + "Module", Version.unknownVersion());

TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);
requireNonNull(baseClass, "baseClass is null");
requireNonNull(nameResolver, "nameResolver is null");
requireNonNull(classResolver, "classResolver is null");
requireNonNull(codecExtractor, "codecExtractor is null");

addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
if (binarySerializationEnabled) {
// Use codec serialization
addSerializer(baseClass, new CodecSerializer<>(
TYPE_PROPERTY,
DATA_PROPERTY,
codecExtractor,
nameResolver,
new InternalTypeResolver<>(nameResolver, classResolver)));
addDeserializer(baseClass, new CodecDeserializer<>(TYPE_PROPERTY, DATA_PROPERTY, codecExtractor, classResolver));
}
else {
// Use legacy typed serialization
TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);
addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
}
}

private static class InternalTypeDeserializer<T>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorCodec;
import com.facebook.presto.spi.ConnectorId;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

class CodecDeserializer<T>
extends JsonDeserializer<T>
{
private final Function<String, Class<? extends T>> classResolver;
private final Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor;
private final String typePropertyName;
private final String dataPropertyName;

public CodecDeserializer(
String typePropertyName,
String dataPropertyName,
Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor,
Function<String, Class<? extends T>> classResolver)
{
this.classResolver = requireNonNull(classResolver, "classResolver is null");
this.codecExtractor = requireNonNull(codecExtractor, "codecExtractor is null");
this.typePropertyName = requireNonNull(typePropertyName, "typePropertyName is null");
this.dataPropertyName = requireNonNull(dataPropertyName, "dataPropertyName is null");
}

@Override
public T deserialize(JsonParser parser, DeserializationContext context)
throws IOException
{
if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
return null;
}

if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
throw new IOException("Expected START_OBJECT, got " + parser.getCurrentToken());
}

// Parse the JSON tree
TreeNode tree = parser.readValueAsTree();

if (tree instanceof ObjectNode) {
ObjectNode node = (ObjectNode) tree;

// Get the @type field
if (!node.has(typePropertyName)) {
throw new IOException("Missing " + typePropertyName + " field");
}
String connectorIdString = node.get(typePropertyName).asText();
// Check if @data field is present (binary serialization)
if (node.has(dataPropertyName)) {
// Binary data is present, we need a codec to deserialize it
// Special handling for internal handles like "$remote"
if (!connectorIdString.startsWith("$")) {
ConnectorId connectorId = new ConnectorId(connectorIdString);
Optional<ConnectorCodec<T>> codec = codecExtractor.apply(connectorId);
if (codec.isPresent()) {
String base64Data = node.get(dataPropertyName).asText();
byte[] data = Base64.getDecoder().decode(base64Data);
return codec.get().deserialize(data);
}
}
// @data field present but no codec available or internal handle
throw new IOException("Type " + connectorIdString + " has binary data (customSerializedValue field) but no codec available to deserialize it");
}

// No @data field - use standard JSON deserialization
Class<? extends T> handleClass = classResolver.apply(connectorIdString);

// Remove the @type field and deserialize the remaining content
node.remove(typePropertyName);
return context.readTreeAsValue(node, handleClass);
}

throw new IOException("Unable to deserialize");
}

@Override
public T deserializeWithType(JsonParser p, DeserializationContext ctxt,
TypeDeserializer typeDeserializer)
throws IOException
{
// We handle the type ourselves
return deserialize(p, ctxt);
}

@Override
public T deserializeWithType(JsonParser p, DeserializationContext ctxt,
TypeDeserializer typeDeserializer, T intoValue)
throws IOException
{
// We handle the type ourselves
return deserialize(p, ctxt);
}
}
Loading
Loading