Commit 2f8bbba

feat: Distributed Procedure Support Part 1/X - core code base changes (#26373)
## Description

This PR is the first of several PRs that add distributed procedure support to Presto. It is split out from the original, complete PR: #22659.

The work in this PR covers the following parts:

1. Refactor the `ProcedureRegistry`/`Procedure` data structures to support creating and registering a `DistributedProcedure`. Make `ProcedureRegistry` available in the `presto-analyzer` module and in connectors, so that distributed procedures can be recognized in a call statement during the prepare and analyze stages.
2. Handle call statements on distributed procedures in the preparer stage. In this stage, we determine the type of the procedure named in the call statement, and define a new query type, `CALL_DISTRIBUTED_PROCEDURE`, for `call distributed procedure` in `BuiltInPreparedQuery`. As a result, a `call distributed procedure` statement is handled by `SqlQueryExecutionFactory`, and is then created and handled as a `SqlQueryExecution`.
3. Analyze and plan the `call distributed procedure` statement based on the subtype of the distributed procedure. For the subtype `TableDataRewriteDistributedProcedure`, the logical plan ultimately generated is:

   ```
   OutputNode <- TableFinishNode <- CallDistributedProcedureNode <- FilterNode <- TableScanNode
   ```

4. Handle optimization, segmentation, grouped tagging, and local planning of the logical plan generated above. The handling logic for `CallDistributedProcedureNode` is similar to that for `TableWriterNode`. In addition, a new optimizer, `RewriteWriterTarget`, is added and placed after all other optimization rules. Once the entire optimization is complete, it updates the `TableHandle` held in `TableFinishNode` and `CallDistributedProcedureNode` based on the underlying `TableScanNode`, to account for possible filter pushdown.

## Motivation and Context

prestodb/rfcs#12

## Impact

N/A

## Test Plan

- Add test cases for each phase touched by the procedure architecture expansion: creating and registering distributed procedures, preparing a call to a distributed procedure, analyzing it, and logically planning and optimizing it.

## Contributor checklist

- [x] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [x] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax, functions, or other functionality.
- [x] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [x] Adequate tests were added if applicable.
- [x] CI passed.
- [ ] If adding new dependencies, verified they have an [OpenSSF Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

## Release Notes

```
== RELEASE NOTES ==

General Changes
* Upgrade the procedure architecture to support distributed execution of procedures
```
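
The `RewriteWriterTarget` step in part 4 of the description can be pictured with a small, self-contained sketch. It is not the optimizer from this PR: the `Node` class, the string-valued handles, and the method names are hypothetical stand-ins for Presto's plan nodes and `TableHandle`. It only illustrates the idea that, after optimization, the handle held by the scan is copied up into the writer-side nodes.

```java
import java.util.Optional;

// Hypothetical, simplified plan nodes for illustration; these are NOT Presto's PlanNode classes.
final class WriterTargetSketch
{
    private WriterTargetSketch() {}

    static class Node
    {
        final String kind;        // e.g. "TableFinish", "CallDistributedProcedure", "Filter", "TableScan"
        final String tableHandle; // null for nodes that hold no handle
        final Node source;        // null for the leaf scan

        Node(String kind, String tableHandle, Node source)
        {
            this.kind = kind;
            this.tableHandle = tableHandle;
            this.source = source;
        }
    }

    // Walk down the single-source chain and return the handle held by the scan, if any.
    static Optional<String> findScanHandle(Node node)
    {
        for (Node current = node; current != null; current = current.source) {
            if (current.kind.equals("TableScan")) {
                return Optional.ofNullable(current.tableHandle);
            }
        }
        return Optional.empty();
    }

    // Rebuild the chain so that the writer-side nodes carry the scan's final handle.
    static Node rewriteWriterTarget(Node node)
    {
        Optional<String> scanHandle = findScanHandle(node);
        return scanHandle.map(handle -> rewrite(node, handle)).orElse(node);
    }

    private static Node rewrite(Node node, String handle)
    {
        if (node == null) {
            return null;
        }
        boolean holdsTarget = node.kind.equals("TableFinish") || node.kind.equals("CallDistributedProcedure");
        return new Node(node.kind, holdsTarget ? handle : node.tableHandle, rewrite(node.source, handle));
    }

    public static void main(String[] args)
    {
        // The scan's handle already reflects a pushed-down filter; the writer nodes still hold the old one.
        Node scan = new Node("TableScan", "orders[filter pushed down]", null);
        Node filter = new Node("Filter", null, scan);
        Node call = new Node("CallDistributedProcedure", "orders", filter);
        Node finish = new Node("TableFinish", "orders", call);

        Node rewritten = rewriteWriterTarget(finish);
        System.out.println(rewritten.tableHandle + " / " + rewritten.source.tableHandle);
    }
}
```

Running the rewrite last matters in this sketch for the same reason the description gives: any earlier rule may still replace the scan's handle, so copying it before optimization finishes could leave the writer nodes pointing at a stale target.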
1 parent 1a10043 commit 2f8bbba

109 files changed, +4090 -813 lines changed

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java

Lines changed: 58 additions & 0 deletions
@@ -34,6 +34,7 @@
 import com.facebook.presto.spi.function.FunctionKind;
 import com.facebook.presto.spi.function.table.Argument;
 import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
+import com.facebook.presto.spi.procedure.DistributedProcedure;
 import com.facebook.presto.spi.security.AccessControl;
 import com.facebook.presto.spi.security.AccessControlContext;
 import com.facebook.presto.spi.security.AllowAllAccessControl;
@@ -176,6 +177,13 @@ public class Analysis
     private final Multiset<ColumnMaskScopeEntry> columnMaskScopes = HashMultiset.create();
     private final Map<NodeRef<Table>, Map<String, Expression>> columnMasks = new LinkedHashMap<>();
 
+    // for call distributed procedure
+    private Optional<DistributedProcedure.DistributedProcedureType> distributedProcedureType = Optional.empty();
+    private Optional<QualifiedObjectName> procedureName = Optional.empty();
+    private Optional<Object[]> procedureArguments = Optional.empty();
+    private Optional<TableHandle> callTarget = Optional.empty();
+    private Optional<QuerySpecification> targetQuery = Optional.empty();
+
     // for create table
     private Optional<QualifiedObjectName> createTableDestination = Optional.empty();
     private Map<String, Expression> createTableProperties = ImmutableMap.of();
@@ -670,6 +678,46 @@ public Optional<QualifiedObjectName> getCreateTableDestination()
         return createTableDestination;
     }
 
+    public Optional<QualifiedObjectName> getProcedureName()
+    {
+        return procedureName;
+    }
+
+    public void setProcedureName(Optional<QualifiedObjectName> procedureName)
+    {
+        this.procedureName = procedureName;
+    }
+
+    public Optional<DistributedProcedure.DistributedProcedureType> getDistributedProcedureType()
+    {
+        return distributedProcedureType;
+    }
+
+    public void setDistributedProcedureType(Optional<DistributedProcedure.DistributedProcedureType> distributedProcedureType)
+    {
+        this.distributedProcedureType = distributedProcedureType;
+    }
+
+    public Optional<Object[]> getProcedureArguments()
+    {
+        return procedureArguments;
+    }
+
+    public void setProcedureArguments(Optional<Object[]> procedureArguments)
+    {
+        this.procedureArguments = procedureArguments;
+    }
+
+    public Optional<TableHandle> getCallTarget()
+    {
+        return callTarget;
+    }
+
+    public void setCallTarget(TableHandle callTarget)
+    {
+        this.callTarget = Optional.of(callTarget);
+    }
+
     public Optional<TableHandle> getAnalyzeTarget()
     {
         return analyzeTarget;
@@ -1044,6 +1092,16 @@ public Optional<QuerySpecification> getCurrentQuerySpecification()
         return currentQuerySpecification;
     }
 
+    public void setTargetQuery(QuerySpecification targetQuery)
+    {
+        this.targetQuery = Optional.of(targetQuery);
+    }
+
+    public Optional<QuerySpecification> getTargetQuery()
+    {
+        return this.targetQuery;
+    }
+
     public Map<FunctionKind, Set<String>> getInvokedFunctions()
     {
         Map<FunctionKind, Set<String>> functionMap = new HashMap<>();

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/BuiltInQueryPreparer.java

Lines changed: 41 additions & 3 deletions
@@ -13,19 +13,25 @@
  */
 package com.facebook.presto.sql.analyzer;
 
+import com.facebook.presto.common.QualifiedObjectName;
 import com.facebook.presto.common.analyzer.PreparedQuery;
 import com.facebook.presto.common.resourceGroups.QueryType;
+import com.facebook.presto.spi.ConnectorId;
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.PrestoWarning;
+import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.WarningCollector;
 import com.facebook.presto.spi.analyzer.AnalyzerOptions;
 import com.facebook.presto.spi.analyzer.QueryPreparer;
+import com.facebook.presto.spi.procedure.ProcedureRegistry;
 import com.facebook.presto.sql.analyzer.utils.StatementUtils;
 import com.facebook.presto.sql.parser.SqlParser;
+import com.facebook.presto.sql.tree.Call;
 import com.facebook.presto.sql.tree.Execute;
 import com.facebook.presto.sql.tree.Explain;
 import com.facebook.presto.sql.tree.ExplainType;
 import com.facebook.presto.sql.tree.Expression;
+import com.facebook.presto.sql.tree.QualifiedName;
 import com.facebook.presto.sql.tree.Statement;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -36,13 +42,15 @@
 import java.util.Optional;
 
 import static com.facebook.presto.common.WarningHandlingLevel.AS_ERROR;
+import static com.facebook.presto.common.resourceGroups.QueryType.CALL_DISTRIBUTED_PROCEDURE;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.StandardErrorCode.WARNING_AS_ERROR;
 import static com.facebook.presto.sql.SqlFormatter.formatSql;
 import static com.facebook.presto.sql.analyzer.ConstantExpressionVerifier.verifyExpressionIsConstant;
 import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_PARAMETER_USAGE;
 import static com.facebook.presto.sql.analyzer.utils.AnalyzerUtil.createParsingOptions;
+import static com.facebook.presto.sql.analyzer.utils.MetadataUtils.createQualifiedObjectName;
 import static com.facebook.presto.sql.analyzer.utils.ParameterExtractor.getParameterCount;
 import static com.facebook.presto.sql.tree.ExplainType.Type.VALIDATE;
 import static java.lang.String.format;
@@ -56,11 +64,15 @@ public class BuiltInQueryPreparer
         implements QueryPreparer
 {
     private final SqlParser sqlParser;
+    private final ProcedureRegistry procedureRegistry;
 
     @Inject
-    public BuiltInQueryPreparer(SqlParser sqlParser)
+    public BuiltInQueryPreparer(
+            SqlParser sqlParser,
+            ProcedureRegistry procedureRegistry)
     {
         this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
+        this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
     }
 
     @Override
@@ -87,6 +99,18 @@ public BuiltInPreparedQuery prepareQuery(AnalyzerOptions analyzerOptions, Statem
             statement = sqlParser.createStatement(query, createParsingOptions(analyzerOptions));
         }
 
+        Optional<QualifiedObjectName> distributedProcedureName = Optional.empty();
+        if (statement instanceof Call) {
+            QualifiedName qualifiedName = ((Call) statement).getName();
+            QualifiedObjectName qualifiedObjectName = createQualifiedObjectName(analyzerOptions.getSessionCatalogName(), analyzerOptions.getSessionSchemaName(),
+                    statement, qualifiedName, (catalogName, objectName) -> objectName);
+            if (procedureRegistry.isDistributedProcedure(
+                    new ConnectorId(qualifiedObjectName.getCatalogName()),
+                    new SchemaTableName(qualifiedObjectName.getSchemaName(), qualifiedObjectName.getObjectName()))) {
+                distributedProcedureName = Optional.of(qualifiedObjectName);
+            }
+        }
+
         if (statement instanceof Explain && ((Explain) statement).isAnalyze()) {
             Statement innerStatement = ((Explain) statement).getStatement();
             Optional<QueryType> innerQueryType = StatementUtils.getQueryType(innerStatement.getClass());
@@ -103,7 +127,7 @@ public BuiltInPreparedQuery prepareQuery(AnalyzerOptions analyzerOptions, Statem
         if (analyzerOptions.isLogFormattedQueryEnabled()) {
             formattedQuery = Optional.of(getFormattedQuery(statement, parameters));
         }
-        return new BuiltInPreparedQuery(wrappedStatement, statement, parameters, formattedQuery, prepareSql);
+        return new BuiltInPreparedQuery(wrappedStatement, statement, parameters, formattedQuery, prepareSql, distributedProcedureName);
     }
 
     private static String getFormattedQuery(Statement statement, List<Expression> parameters)
@@ -131,13 +155,19 @@ public static class BuiltInPreparedQuery
         private final Statement statement;
         private final Statement wrappedStatement;
         private final List<Expression> parameters;
+        private final Optional<QualifiedObjectName> distributedProcedureName;
 
-        public BuiltInPreparedQuery(Statement wrappedStatement, Statement statement, List<Expression> parameters, Optional<String> formattedQuery, Optional<String> prepareSql)
+        public BuiltInPreparedQuery(
+                Statement wrappedStatement,
+                Statement statement, List<Expression> parameters,
+                Optional<String> formattedQuery, Optional<String> prepareSql,
+                Optional<QualifiedObjectName> distributedProcedureName)
         {
             super(formattedQuery, prepareSql);
             this.wrappedStatement = requireNonNull(wrappedStatement, "wrappedStatement is null");
             this.statement = requireNonNull(statement, "statement is null");
             this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null"));
+            this.distributedProcedureName = requireNonNull(distributedProcedureName, "distributedProcedureName is null");
         }
 
         public Statement getStatement()
@@ -157,9 +187,17 @@ public List<Expression> getParameters()
 
         public Optional<QueryType> getQueryType()
         {
+            if (getDistributedProcedureName().isPresent()) {
+                return Optional.of(CALL_DISTRIBUTED_PROCEDURE);
+            }
             return StatementUtils.getQueryType(statement.getClass());
         }
 
+        public Optional<QualifiedObjectName> getDistributedProcedureName()
+        {
+            return this.distributedProcedureName;
+        }
+
         public boolean isTransactionControlStatement()
         {
             return StatementUtils.isTransactionControlStatement(getStatement());
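
The preparer change above amounts to two small decisions: record the procedure name only when the registry reports it as distributed, and let a recorded name override the query type derived from the statement class. The following is a minimal standalone sketch of that flow, not code from the PR. `FakeRegistry`, the string-keyed names, the two-value `QueryType` enum, and the procedure names are all hypothetical stand-ins.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

final class QueryTypeSketch
{
    private QueryTypeSketch() {}

    // Hypothetical subset of query types; the diff itself only names CALL_DISTRIBUTED_PROCEDURE.
    enum QueryType { DATA_DEFINITION, CALL_DISTRIBUTED_PROCEDURE }

    // Hypothetical stand-in for ProcedureRegistry, keyed by "catalog.schema.procedure".
    static final class FakeRegistry
    {
        private final Set<String> distributed = new HashSet<>();

        void registerDistributed(String qualifiedName)
        {
            distributed.add(qualifiedName);
        }

        boolean isDistributedProcedure(String qualifiedName)
        {
            return distributed.contains(qualifiedName);
        }
    }

    // Preparer step: remember the procedure name only when the registry says it is distributed.
    static Optional<String> distributedProcedureName(FakeRegistry registry, String qualifiedName)
    {
        return registry.isDistributedProcedure(qualifiedName) ? Optional.of(qualifiedName) : Optional.empty();
    }

    // Query-type step: a present name overrides whatever the statement class would map to.
    static Optional<QueryType> queryType(Optional<String> distributedProcedureName, Optional<QueryType> statementDefault)
    {
        if (distributedProcedureName.isPresent()) {
            return Optional.of(QueryType.CALL_DISTRIBUTED_PROCEDURE);
        }
        return statementDefault;
    }

    public static void main(String[] args)
    {
        FakeRegistry registry = new FakeRegistry();
        registry.registerDistributed("hive.system.rewrite_files"); // hypothetical procedure name

        Optional<QueryType> fallback = Optional.of(QueryType.DATA_DEFINITION); // assumed default, for illustration only
        System.out.println(queryType(distributedProcedureName(registry, "hive.system.rewrite_files"), fallback));
        System.out.println(queryType(distributedProcedureName(registry, "hive.system.other_call"), fallback));
    }
}
```

Carrying the name as an `Optional` rather than a boolean flag mirrors the diff, where the same value both selects the query type and is exposed through `getDistributedProcedureName()`.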

presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/SemanticErrorCode.java

Lines changed: 1 addition & 0 deletions
@@ -90,6 +90,7 @@ public enum SemanticErrorCode
 
     SAMPLE_PERCENTAGE_OUT_OF_RANGE,
 
+    PROCEDURE_NOT_FOUND,
     INVALID_PROCEDURE_ARGUMENTS,
 
     INVALID_SESSION_PROPERTY,
Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.sql.analyzer.utils;
+
+import com.facebook.presto.common.QualifiedObjectName;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.sql.analyzer.SemanticException;
+import com.facebook.presto.sql.tree.Identifier;
+import com.facebook.presto.sql.tree.Node;
+import com.facebook.presto.sql.tree.QualifiedName;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+import static com.facebook.presto.spi.StandardErrorCode.SYNTAX_ERROR;
+import static com.facebook.presto.sql.analyzer.SemanticErrorCode.CATALOG_NOT_SPECIFIED;
+import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SCHEMA_NOT_SPECIFIED;
+import static java.lang.String.format;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+public class MetadataUtils
+{
+    private MetadataUtils()
+    {}
+
+    public static QualifiedObjectName createQualifiedObjectName(Optional<String> sessionCatalogName, Optional<String> sessionSchemaName, Node node, QualifiedName name,
+            BiFunction<String, String, String> normalizer)
+    {
+        requireNonNull(sessionCatalogName, "sessionCatalogName is null");
+        requireNonNull(sessionSchemaName, "sessionSchemaName is null");
+        requireNonNull(name, "name is null");
+        if (name.getParts().size() > 3) {
+            throw new PrestoException(SYNTAX_ERROR, format("Too many dots in table name: %s", name));
+        }
+
+        List<Identifier> parts = Lists.reverse(name.getOriginalParts());
+        String objectName = parts.get(0).getValue();
+        String schemaName = (parts.size() > 1) ? parts.get(1).getValue() : sessionSchemaName.orElseThrow(() ->
+                new SemanticException(SCHEMA_NOT_SPECIFIED, node, "Schema must be specified when session schema is not set"));
+        String catalogName = (parts.size() > 2) ? parts.get(2).getValue() : sessionCatalogName.orElseThrow(() ->
+                new SemanticException(CATALOG_NOT_SPECIFIED, node, "Catalog must be specified when session catalog is not set"));
+
+        catalogName = catalogName.toLowerCase(ENGLISH);
+        schemaName = normalizer.apply(catalogName, schemaName);
+        objectName = normalizer.apply(catalogName, objectName);
+        return new QualifiedObjectName(catalogName, schemaName, objectName);
+    }
+}
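
To make the resolution order in `createQualifiedObjectName` easier to follow, here is a simplified standalone sketch using plain strings. It is an illustration under assumptions, not the PR's code: the class and method names are hypothetical, the exceptions are ordinary JDK ones rather than `PrestoException`/`SemanticException`, the normalizer is treated as the identity (as in the preparer's `(catalogName, objectName) -> objectName` call), and the empty-name check is an addition of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

final class NameResolutionSketch
{
    private NameResolutionSketch() {}

    // Returns [catalog, schema, object]; missing leading parts fall back to the session defaults.
    static List<String> resolve(Optional<String> sessionCatalog, Optional<String> sessionSchema, List<String> parts)
    {
        if (parts.isEmpty() || parts.size() > 3) {
            throw new IllegalArgumentException("Expected 1 to 3 name parts: " + parts);
        }

        // Reverse so that index 0 is always the object name, 1 the schema, 2 the catalog.
        List<String> reversed = new ArrayList<>(parts);
        Collections.reverse(reversed);

        String objectName = reversed.get(0);
        String schemaName = (reversed.size() > 1) ? reversed.get(1) : sessionSchema.orElseThrow(() ->
                new IllegalStateException("Schema must be specified when session schema is not set"));
        String catalogName = (reversed.size() > 2) ? reversed.get(2) : sessionCatalog.orElseThrow(() ->
                new IllegalStateException("Catalog must be specified when session catalog is not set"));

        // Only the catalog is lower-cased here; schema and object names pass through unchanged.
        return Arrays.asList(catalogName.toLowerCase(Locale.ENGLISH), schemaName, objectName);
    }

    public static void main(String[] args)
    {
        // "rewrite_files" is a hypothetical procedure name used only for this example.
        System.out.println(resolve(Optional.of("Hive"), Optional.of("system"), Arrays.asList("rewrite_files")));
        System.out.println(resolve(Optional.empty(), Optional.empty(), Arrays.asList("Iceberg", "system", "rewrite_files")));
    }
}
```

Reversing the parts first keeps the fallback logic uniform: the object name is always at index 0, so one-, two-, and three-part names share the same code path.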
