Skip to content

Commit f699c09

Browse files
hdikemanfacebook-github-bot
authored andcommitted
feat(client): Add serialized response format option to Presto client (prestodb#26654)
Summary: The capability exists for Presto server to return serialized responses, but is currently not enabled in the client nor piped through to the CLI. As far as I know, there is no OSS client which can read using the serialized interface. It makes it kind of annoying to test out any changes in this area Adding support in the client to request and process serialized responses from the server To support the new client processing, I created a TypeManager and binary deserializer for use in the client NOTE: right now I only handle the types located in presto-common on the serialized path; there are some additional types declared in presto-main-base which the client does not have access to. Those could either be moved or redefined in presto-client, but since serialized APIs are opt-in and fails fast, it seemed acceptable to defer support for those types Differential Revision: D87268210
1 parent ee1a9f7 commit f699c09

File tree

18 files changed

+1786
-7
lines changed

18 files changed

+1786
-7
lines changed

presto-benchmark-driver/src/main/java/com/facebook/presto/benchmark/driver/BenchmarkDriverOptions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public ClientSession getClientSession()
111111
disableCompression,
112112
ImmutableMap.of(),
113113
ImmutableMap.of(),
114+
false,
114115
false);
115116
}
116117

presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ public class ClientOptions
162162
@Option(name = "--disable-redirects", title = "disable redirects", description = "Disable client following redirects from server")
163163
public boolean disableRedirects;
164164

165+
@Option(name = "--serialized", title = "enabled serialized results", description = "Enable serialized response encoding from server")
166+
public boolean serialized;
167+
165168
public enum OutputFormat
166169
{
167170
ALIGNED,
@@ -197,7 +200,8 @@ public ClientSession toClientSession()
197200
disableCompression,
198201
emptyMap(),
199202
emptyMap(),
200-
validateNextUriSource);
203+
validateNextUriSource,
204+
serialized);
201205
}
202206

203207
public static URI parseServer(String server)

presto-cli/src/test/java/com/facebook/presto/cli/AbstractCliTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ protected ClientSession createMockClientSession()
8585
true,
8686
ImmutableMap.of(),
8787
ImmutableMap.of(),
88+
false,
8889
false);
8990
}
9091

presto-cli/src/test/java/com/facebook/presto/cli/TestClientOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,12 @@ public void testValidateNextUriSource()
192192
assertTrue(console.clientOptions.validateNextUriSource);
193193
assertTrue(console.clientOptions.toClientSession().validateNextUriSource());
194194
}
195+
196+
@Test
197+
public void testSerializedFormat()
198+
{
199+
Console console = singleCommand(Console.class).parse("--serialized");
200+
assertTrue(console.clientOptions.serialized);
201+
assertTrue(console.clientOptions.toClientSession().isBinaryResults());
202+
}
195203
}

presto-client/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
<artifactId>presto-common</artifactId>
2929
</dependency>
3030

31+
<dependency>
32+
<groupId>io.airlift</groupId>
33+
<artifactId>slice</artifactId>
34+
</dependency>
35+
3136
<dependency>
3237
<groupId>com.google.errorprone</groupId>
3338
<artifactId>error_prone_annotations</artifactId>
@@ -117,6 +122,13 @@
117122
</dependency>
118123

119124
<!-- for testing -->
125+
<dependency>
126+
<groupId>com.facebook.presto</groupId>
127+
<artifactId>presto-common</artifactId>
128+
<type>test-jar</type>
129+
<scope>test</scope>
130+
</dependency>
131+
120132
<dependency>
121133
<groupId>com.facebook.airlift</groupId>
122134
<artifactId>concurrent</artifactId>
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.client;
15+
16+
import com.facebook.presto.common.Page;
17+
import com.facebook.presto.common.block.Block;
18+
import com.facebook.presto.common.block.BlockEncodingSerde;
19+
import com.facebook.presto.common.function.SqlFunctionProperties;
20+
import com.facebook.presto.common.type.Type;
21+
import com.facebook.presto.common.type.TypeManager;
22+
import com.facebook.presto.common.type.TypeSignature;
23+
import com.facebook.presto.common.type.TypeSignatureParameter;
24+
import com.facebook.presto.spi.page.PagesSerde;
25+
import com.facebook.presto.spi.page.SerializedPage;
26+
import com.google.common.collect.ImmutableList;
27+
import io.airlift.slice.BasicSliceInput;
28+
import io.airlift.slice.Slice;
29+
import io.airlift.slice.Slices;
30+
31+
import java.util.ArrayList;
32+
import java.util.Base64;
33+
import java.util.Collections;
34+
import java.util.List;
35+
import java.util.Optional;
36+
37+
import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPage;
38+
import static com.google.common.base.Preconditions.checkArgument;
39+
import static java.util.Objects.requireNonNull;
40+
41+
public class BinaryDataDeserializer
42+
{
43+
private final PagesSerde pagesSerde;
44+
private final TypeManager typeManager;
45+
private final SqlFunctionProperties sqlFunctionProperties;
46+
47+
public BinaryDataDeserializer(
48+
BlockEncodingSerde blockEncodingSerde,
49+
TypeManager typeManager,
50+
ClientSession session)
51+
{
52+
requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
53+
requireNonNull(typeManager, "typeManager is null");
54+
requireNonNull(session, "session is null");
55+
56+
this.pagesSerde = new PagesSerde(blockEncodingSerde, Optional.empty(), Optional.empty(), Optional.empty(), false);
57+
this.typeManager = typeManager;
58+
this.sqlFunctionProperties = createSqlFunctionPropertiesFromSession(session);
59+
}
60+
61+
public Iterable<List<Object>> deserialize(List<Column> columns, Iterable<String> binaryData)
62+
{
63+
requireNonNull(columns, "columns is null");
64+
requireNonNull(binaryData, "binaryData is null");
65+
66+
List<Type> columnTypes = extractTypesFromColumns(columns);
67+
ImmutableList.Builder<List<Object>> allRows = ImmutableList.builder();
68+
69+
for (String encodedPage : binaryData) {
70+
byte[] pageBytes = Base64.getDecoder().decode(encodedPage);
71+
Slice slice = Slices.wrappedBuffer(pageBytes);
72+
73+
BasicSliceInput sliceInput = slice.getInput();
74+
SerializedPage serializedPage = readSerializedPage(sliceInput);
75+
76+
Page page = pagesSerde.deserialize(serializedPage);
77+
78+
allRows.addAll(convertPageToRows(page, columnTypes));
79+
}
80+
81+
return allRows.build();
82+
}
83+
84+
private List<List<Object>> convertPageToRows(Page page, List<Type> columnTypes)
85+
{
86+
checkArgument(
87+
page.getChannelCount() == columnTypes.size(),
88+
"Expected %s columns in serialized page, found %s",
89+
columnTypes.size(),
90+
page.getChannelCount());
91+
92+
ImmutableList.Builder<List<Object>> rows = ImmutableList.builder();
93+
94+
for (int position = 0; position < page.getPositionCount(); position++) {
95+
List<Object> row = new ArrayList<>(page.getChannelCount());
96+
97+
for (int channel = 0; channel < page.getChannelCount(); channel++) {
98+
Type type = columnTypes.get(channel);
99+
Block block = page.getBlock(channel);
100+
101+
Object value = type.getObjectValue(sqlFunctionProperties, block, position);
102+
row.add(value);
103+
}
104+
105+
rows.add(Collections.unmodifiableList(row));
106+
}
107+
108+
return rows.build();
109+
}
110+
111+
private List<Type> extractTypesFromColumns(List<Column> columns)
112+
{
113+
ImmutableList.Builder<Type> types = ImmutableList.builder();
114+
115+
for (Column column : columns) {
116+
ClientTypeSignature clientTypeSignature = column.getTypeSignature();
117+
TypeSignature typeSignature = convertClientTypeSignatureToTypeSignature(clientTypeSignature);
118+
Type type = typeManager.getType(typeSignature);
119+
120+
if (type == null) {
121+
throw new IllegalArgumentException("Unknown type: " + typeSignature);
122+
}
123+
124+
types.add(type);
125+
}
126+
127+
return types.build();
128+
}
129+
130+
private TypeSignature convertClientTypeSignatureToTypeSignature(ClientTypeSignature clientTypeSignature)
131+
{
132+
List<TypeSignatureParameter> parameters = new ArrayList<>();
133+
134+
for (ClientTypeSignatureParameter argument : clientTypeSignature.getArguments()) {
135+
parameters.add(convertClientTypeSignatureParameterToTypeSignatureParameter(argument));
136+
}
137+
138+
return new TypeSignature(clientTypeSignature.getRawType(), parameters);
139+
}
140+
141+
private TypeSignatureParameter convertClientTypeSignatureParameterToTypeSignatureParameter(
142+
ClientTypeSignatureParameter parameter)
143+
{
144+
switch (parameter.getKind()) {
145+
case TYPE:
146+
return TypeSignatureParameter.of(
147+
convertClientTypeSignatureToTypeSignature(parameter.getTypeSignature()));
148+
case LONG:
149+
return TypeSignatureParameter.of(parameter.getLongLiteral());
150+
case NAMED_TYPE:
151+
return TypeSignatureParameter.of(parameter.getNamedTypeSignature());
152+
default:
153+
throw new UnsupportedOperationException("Unknown parameter kind: " + parameter.getKind());
154+
}
155+
}
156+
157+
private SqlFunctionProperties createSqlFunctionPropertiesFromSession(ClientSession session)
158+
{
159+
return SqlFunctionProperties.builder()
160+
.setTimeZoneKey(session.getTimeZone())
161+
.setSessionLocale(session.getLocale())
162+
.setSessionUser(session.getUser())
163+
.setSessionStartTime(System.currentTimeMillis())
164+
.build();
165+
}
166+
}

presto-client/src/main/java/com/facebook/presto/client/ClientSession.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class ClientSession
5555
private final boolean compressionDisabled;
5656
private final Map<String, String> sessionFunctions;
5757
private final boolean validateNextUriSource;
58+
private final boolean binaryResults;
5859

5960
public static Builder builder(ClientSession clientSession)
6061
{
@@ -89,7 +90,8 @@ public ClientSession(
8990
boolean compressionDisabled,
9091
Map<String, String> sessionFunctions,
9192
Map<String, String> customHeaders,
92-
boolean validateNextUriSource)
93+
boolean validateNextUriSource,
94+
boolean binaryResults)
9395
{
9496
this.server = requireNonNull(server, "server is null");
9597
this.user = user;
@@ -112,6 +114,7 @@ public ClientSession(
112114
this.compressionDisabled = compressionDisabled;
113115
this.sessionFunctions = ImmutableMap.copyOf(requireNonNull(sessionFunctions, "sessionFunctions is null"));
114116
this.validateNextUriSource = validateNextUriSource;
117+
this.binaryResults = binaryResults;
115118

116119
for (String clientTag : clientTags) {
117120
checkArgument(!clientTag.contains(","), "client tag cannot contain ','");
@@ -263,6 +266,11 @@ public boolean validateNextUriSource()
263266
return validateNextUriSource;
264267
}
265268

269+
public boolean isBinaryResults()
270+
{
271+
return binaryResults;
272+
}
273+
266274
@Override
267275
public String toString()
268276
{
@@ -305,6 +313,7 @@ public static final class Builder
305313
private boolean compressionDisabled;
306314
private Map<String, String> sessionFunctions;
307315
private boolean validateNextUriSource;
316+
private boolean binaryResults;
308317

309318
private Builder(ClientSession clientSession)
310319
{
@@ -330,6 +339,7 @@ private Builder(ClientSession clientSession)
330339
compressionDisabled = clientSession.isCompressionDisabled();
331340
sessionFunctions = clientSession.getSessionFunctions();
332341
validateNextUriSource = clientSession.validateNextUriSource();
342+
binaryResults = clientSession.isBinaryResults();
333343
}
334344

335345
public Builder withCatalog(String catalog)
@@ -404,6 +414,12 @@ public Builder withValidateNextUriSource(final boolean validateNextUriSource)
404414
return this;
405415
}
406416

417+
public Builder withBinaryResults(boolean binaryResults)
418+
{
419+
this.binaryResults = binaryResults;
420+
return this;
421+
}
422+
407423
public ClientSession build()
408424
{
409425
return new ClientSession(
@@ -427,7 +443,8 @@ public ClientSession build()
427443
compressionDisabled,
428444
sessionFunctions,
429445
customHeaders,
430-
validateNextUriSource);
446+
validateNextUriSource,
447+
binaryResults);
431448
}
432449
}
433450
}

0 commit comments

Comments
 (0)