Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public ClientSession getClientSession()
disableCompression,
ImmutableMap.of(),
ImmutableMap.of(),
false,
false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ public class ClientOptions
@Option(name = "--disable-redirects", title = "disable redirects", description = "Disable client following redirects from server")
public boolean disableRedirects;

@Option(name = "--serialized", title = "enabled serialized results", description = "Enable serialized response encoding from server")
public boolean serialized;

public enum OutputFormat
{
ALIGNED,
Expand Down Expand Up @@ -197,7 +200,8 @@ public ClientSession toClientSession()
disableCompression,
emptyMap(),
emptyMap(),
validateNextUriSource);
validateNextUriSource,
serialized);
}

public static URI parseServer(String server)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ protected ClientSession createMockClientSession()
true,
ImmutableMap.of(),
ImmutableMap.of(),
false,
false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,12 @@ public void testValidateNextUriSource()
assertTrue(console.clientOptions.validateNextUriSource);
assertTrue(console.clientOptions.toClientSession().validateNextUriSource());
}

@Test
public void testSerializedFormat()
{
Console console = singleCommand(Console.class).parse("--serialized");
assertTrue(console.clientOptions.serialized);
assertTrue(console.clientOptions.toClientSession().isBinaryResults());
}
}
12 changes: 12 additions & 0 deletions presto-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
<artifactId>presto-common</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
Expand Down Expand Up @@ -117,6 +122,13 @@
</dependency>

<!-- for testing -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>concurrent</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.client;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.function.SqlFunctionProperties;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.common.type.TypeSignatureParameter;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.BasicSliceInput;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPage;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class BinaryDataDeserializer
{
private final PagesSerde pagesSerde;
private final TypeManager typeManager;
private final SqlFunctionProperties sqlFunctionProperties;

public BinaryDataDeserializer(
BlockEncodingSerde blockEncodingSerde,
TypeManager typeManager,
ClientSession session)
{
requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
requireNonNull(typeManager, "typeManager is null");
requireNonNull(session, "session is null");

this.pagesSerde = new PagesSerde(blockEncodingSerde, Optional.empty(), Optional.empty(), Optional.empty(), false);
this.typeManager = typeManager;
this.sqlFunctionProperties = createSqlFunctionPropertiesFromSession(session);
}

public Iterable<List<Object>> deserialize(List<Column> columns, Iterable<String> binaryData)
{
requireNonNull(columns, "columns is null");
requireNonNull(binaryData, "binaryData is null");

List<Type> columnTypes = extractTypesFromColumns(columns);
ImmutableList.Builder<List<Object>> allRows = ImmutableList.builder();

for (String encodedPage : binaryData) {
byte[] pageBytes = Base64.getDecoder().decode(encodedPage);
Slice slice = Slices.wrappedBuffer(pageBytes);

BasicSliceInput sliceInput = slice.getInput();
SerializedPage serializedPage = readSerializedPage(sliceInput);

Page page = pagesSerde.deserialize(serializedPage);

allRows.addAll(convertPageToRows(page, columnTypes));
}

return allRows.build();
}

private List<List<Object>> convertPageToRows(Page page, List<Type> columnTypes)
{
checkArgument(
page.getChannelCount() == columnTypes.size(),
"Expected %s columns in serialized page, found %s",
columnTypes.size(),
page.getChannelCount());

ImmutableList.Builder<List<Object>> rows = ImmutableList.builder();

for (int position = 0; position < page.getPositionCount(); position++) {
List<Object> row = new ArrayList<>(page.getChannelCount());

for (int channel = 0; channel < page.getChannelCount(); channel++) {
Type type = columnTypes.get(channel);
Block block = page.getBlock(channel);

Object value = type.getObjectValue(sqlFunctionProperties, block, position);
row.add(value);
}

rows.add(Collections.unmodifiableList(row));
}

return rows.build();
}

private List<Type> extractTypesFromColumns(List<Column> columns)
{
ImmutableList.Builder<Type> types = ImmutableList.builder();

for (Column column : columns) {
ClientTypeSignature clientTypeSignature = column.getTypeSignature();
TypeSignature typeSignature = convertClientTypeSignatureToTypeSignature(clientTypeSignature);
Type type = typeManager.getType(typeSignature);

if (type == null) {
throw new IllegalArgumentException("Unknown type: " + typeSignature);
}
Comment on lines +111 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Consider logging or reporting unknown type errors with column context.

Including the column name or index in the exception message will help identify which column caused the error.

Suggested change
private List<Type> extractTypesFromColumns(List<Column> columns)
{
ImmutableList.Builder<Type> types = ImmutableList.builder();
for (Column column : columns) {
ClientTypeSignature clientTypeSignature = column.getTypeSignature();
TypeSignature typeSignature = convertClientTypeSignatureToTypeSignature(clientTypeSignature);
Type type = typeManager.getType(typeSignature);
if (type == null) {
throw new IllegalArgumentException("Unknown type: " + typeSignature);
}
private List<Type> extractTypesFromColumns(List<Column> columns)
{
ImmutableList.Builder<Type> types = ImmutableList.builder();
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
ClientTypeSignature clientTypeSignature = column.getTypeSignature();
TypeSignature typeSignature = convertClientTypeSignatureToTypeSignature(clientTypeSignature);
Type type = typeManager.getType(typeSignature);
if (type == null) {
String columnName = column.getName();
throw new IllegalArgumentException(
String.format("Unknown type: %s for column '%s' (index %d)", typeSignature, columnName, i)
);
}


types.add(type);
}

return types.build();
}

private TypeSignature convertClientTypeSignatureToTypeSignature(ClientTypeSignature clientTypeSignature)
{
List<TypeSignatureParameter> parameters = new ArrayList<>();

for (ClientTypeSignatureParameter argument : clientTypeSignature.getArguments()) {
parameters.add(convertClientTypeSignatureParameterToTypeSignatureParameter(argument));
}

return new TypeSignature(clientTypeSignature.getRawType(), parameters);
}

private TypeSignatureParameter convertClientTypeSignatureParameterToTypeSignatureParameter(
ClientTypeSignatureParameter parameter)
{
switch (parameter.getKind()) {
case TYPE:
return TypeSignatureParameter.of(
convertClientTypeSignatureToTypeSignature(parameter.getTypeSignature()));
case LONG:
return TypeSignatureParameter.of(parameter.getLongLiteral());
case NAMED_TYPE:
return TypeSignatureParameter.of(parameter.getNamedTypeSignature());
default:
throw new UnsupportedOperationException("Unknown parameter kind: " + parameter.getKind());
}
}

private SqlFunctionProperties createSqlFunctionPropertiesFromSession(ClientSession session)
{
return SqlFunctionProperties.builder()
.setTimeZoneKey(session.getTimeZone())
.setSessionLocale(session.getLocale())
.setSessionUser(session.getUser())
.setSessionStartTime(System.currentTimeMillis())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class ClientSession
private final boolean compressionDisabled;
private final Map<String, String> sessionFunctions;
private final boolean validateNextUriSource;
private final boolean binaryResults;

public static Builder builder(ClientSession clientSession)
{
Expand Down Expand Up @@ -89,7 +90,8 @@ public ClientSession(
boolean compressionDisabled,
Map<String, String> sessionFunctions,
Map<String, String> customHeaders,
boolean validateNextUriSource)
boolean validateNextUriSource,
boolean binaryResults)
{
this.server = requireNonNull(server, "server is null");
this.user = user;
Expand All @@ -112,6 +114,7 @@ public ClientSession(
this.compressionDisabled = compressionDisabled;
this.sessionFunctions = ImmutableMap.copyOf(requireNonNull(sessionFunctions, "sessionFunctions is null"));
this.validateNextUriSource = validateNextUriSource;
this.binaryResults = binaryResults;

for (String clientTag : clientTags) {
checkArgument(!clientTag.contains(","), "client tag cannot contain ','");
Expand Down Expand Up @@ -263,6 +266,11 @@ public boolean validateNextUriSource()
return validateNextUriSource;
}

public boolean isBinaryResults()
{
return binaryResults;
}

@Override
public String toString()
{
Expand Down Expand Up @@ -305,6 +313,7 @@ public static final class Builder
private boolean compressionDisabled;
private Map<String, String> sessionFunctions;
private boolean validateNextUriSource;
private boolean binaryResults;

private Builder(ClientSession clientSession)
{
Expand All @@ -330,6 +339,7 @@ private Builder(ClientSession clientSession)
compressionDisabled = clientSession.isCompressionDisabled();
sessionFunctions = clientSession.getSessionFunctions();
validateNextUriSource = clientSession.validateNextUriSource();
binaryResults = clientSession.isBinaryResults();
}

public Builder withCatalog(String catalog)
Expand Down Expand Up @@ -404,6 +414,12 @@ public Builder withValidateNextUriSource(final boolean validateNextUriSource)
return this;
}

public Builder withBinaryResults(boolean binaryResults)
{
this.binaryResults = binaryResults;
return this;
}

public ClientSession build()
{
return new ClientSession(
Expand All @@ -427,7 +443,8 @@ public ClientSession build()
compressionDisabled,
sessionFunctions,
customHeaders,
validateNextUriSource);
validateNextUriSource,
binaryResults);
}
}
}
Loading
Loading