Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -557,3 +557,17 @@ output for each input batch.
If this is true, then the protocol::SpatialJoinNode is converted to a
velox::core::SpatialJoinNode. Otherwise, it is converted to a
velox::core::NestedLoopJoinNode.

``optimizer.optimize_top_n_rank``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

If this is true, then filter and limit queries for ``n`` rows of
``rank()`` and ``dense_rank()`` window function values are executed
with a special TopNRowNumber operator instead of the
WindowFunction operator.

The TopNRowNumber operator is more efficient than window as
it has a streaming behavior and does not need to buffer all input rows.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public final class SystemSessionProperties
public static final String ADAPTIVE_PARTIAL_AGGREGATION = "adaptive_partial_aggregation";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_ROWS_REDUCTION_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold";
public static final String OPTIMIZE_TOP_N_ROW_NUMBER = "optimize_top_n_row_number";
public static final String OPTIMIZE_TOP_N_RANK = "optimize_top_n_rank";
public static final String OPTIMIZE_CASE_EXPRESSION_PREDICATE = "optimize_case_expression_predicate";
public static final String MAX_GROUPING_SETS = "max_grouping_sets";
public static final String LEGACY_UNNEST = "legacy_unnest";
Expand Down Expand Up @@ -982,6 +983,11 @@ public SystemSessionProperties(
"Use top N row number optimization",
featuresConfig.isOptimizeTopNRowNumber(),
false),
booleanProperty(
OPTIMIZE_TOP_N_RANK,
"Use top N rank and dense_rank optimization",
featuresConfig.isOptimizeTopNRank(),
false),
booleanProperty(
OPTIMIZE_CASE_EXPRESSION_PREDICATE,
"Optimize case expression predicates",
Expand Down Expand Up @@ -2567,6 +2573,11 @@ public static boolean isOptimizeTopNRowNumber(Session session)
return session.getSystemProperty(OPTIMIZE_TOP_N_ROW_NUMBER, Boolean.class);
}

public static boolean isOptimizeTopNRank(Session session)
{
return session.getSystemProperty(OPTIMIZE_TOP_N_RANK, Boolean.class);
}

public static boolean isOptimizeCaseExpressionPredicate(Session session)
{
return session.getSystemProperty(OPTIMIZE_CASE_EXPRESSION_PREDICATE, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public class FeaturesConfig
private boolean adaptivePartialAggregationEnabled;
private double adaptivePartialAggregationRowsReductionRatioThreshold = 0.8;
private boolean optimizeTopNRowNumber = true;

private boolean optimizeTopNRank;
private boolean pushLimitThroughOuterJoin = true;
private boolean optimizeConstantGroupingKeys = true;

Expand Down Expand Up @@ -1142,13 +1144,25 @@ public boolean isOptimizeTopNRowNumber()
return optimizeTopNRowNumber;
}

public boolean isOptimizeTopNRank()
{
return optimizeTopNRank;
}

@Config("optimizer.optimize-top-n-row-number")
public FeaturesConfig setOptimizeTopNRowNumber(boolean optimizeTopNRowNumber)
{
this.optimizeTopNRowNumber = optimizeTopNRowNumber;
return this;
}

@Config("optimizer.optimize-top-n-rank")
public FeaturesConfig setOptimizeTopNRank(boolean optimizeTopNRank)
{
this.optimizeTopNRank = optimizeTopNRank;
return this;
}

public boolean isOptimizeCaseExpressionPredicate()
{
return optimizeCaseExpressionPredicate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ public Optional<PlanNode> visitTopNRowNumber(TopNRowNumberNode node, Context con
new DataOrganizationSpecification(
partitionBy,
node.getSpecification().getOrderingScheme().map(scheme -> getCanonicalOrderingScheme(scheme, context.getExpressions()))),
node.getRankingFunction(),
rowNumberVariable,
node.getMaxRowCountPerPartition(),
node.isPartial(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ else if (!maxByAggregations.isEmpty() && minByAggregations.isEmpty()) {
node.getStatsEquivalentPlanNode(),
node.getSource(),
dataOrganizationSpecification,
TopNRowNumberNode.RankingFunction.ROW_NUMBER,
rowNumberVariable,
1,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, PreferredPr
idAllocator.getNextId(),
child.getNode(),
node.getSpecification(),
node.getRankingFunction(),
node.getRowNumberVariable(),
node.getMaxRowCountPerPartition(),
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, HashComputa
node.getId(),
child.getNode(),
node.getSpecification(),
node.getRankingFunction(),
node.getRowNumberVariable(),
node.getMaxRowCountPerPartition(),
node.isPartial(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ public Optional<DecorrelationResult> visitTopN(TopNNode node, Void context)
new DataOrganizationSpecification(
ImmutableList.copyOf(childDecorrelationResult.variablesToPropagate),
Optional.of(orderingScheme)),
TopNRowNumberNode.RankingFunction.ROW_NUMBER,
variableAllocator.newVariable("row_number", BIGINT),
toIntExact(node.getCount()),
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ public PlanNode visitTopNRowNumber(TopNRowNumberNode node, RewriteContext<Set<Va
node.getStatsEquivalentPlanNode(),
source,
node.getSpecification(),
node.getRankingFunction(),
node.getRowNumberVariable(),
node.getMaxRowCountPerPartition(),
node.isPartial(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ public PlanNode visitTopNRowNumber(TopNRowNumberNode node, RewriteContext<Void>
node.getId(),
context.rewrite(node.getSource()),
canonicalizeAndDistinct(node.getSpecification()),
node.getRankingFunction(),
canonicalize(node.getRowNumberVariable()),
node.getMaxRowCountPerPartition(),
node.isPartial(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@
import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator;
import com.facebook.presto.sql.relational.RowExpressionDomainTranslator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

import static com.facebook.presto.SystemSessionProperties.isNativeExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isOptimizeTopNRank;
import static com.facebook.presto.SystemSessionProperties.isOptimizeTopNRowNumber;
import static com.facebook.presto.common.predicate.Marker.Bound.BELOW;
import static com.facebook.presto.common.type.BigintType.BIGINT;
Expand Down Expand Up @@ -134,6 +137,12 @@ public PlanNode visitWindow(WindowNode node, RewriteContext<Void> context)
return replaceChildren(node, ImmutableList.of(rewrittenSource));
}

private boolean canReplaceWithTopNRowNumber(WindowNode node)
{
return (canOptimizeRowNumberFunction(node, metadata.getFunctionAndTypeManager()) && isOptimizeTopNRowNumber(session)) ||
(canOptimizeRankFunction(node, metadata.getFunctionAndTypeManager()) && isOptimizeTopNRank(session) && isNativeExecutionEnabled(session));
}

@Override
public PlanNode visitLimit(LimitNode node, RewriteContext<Void> context)
{
Expand All @@ -152,16 +161,18 @@ public PlanNode visitLimit(LimitNode node, RewriteContext<Void> context)
planChanged = true;
source = rowNumberNode;
}
else if (source instanceof WindowNode && canOptimizeWindowFunction((WindowNode) source, metadata.getFunctionAndTypeManager()) && isOptimizeTopNRowNumber(session)) {
else if (source instanceof WindowNode) {
WindowNode windowNode = (WindowNode) source;
// verify that unordered row_number window functions are replaced by RowNumberNode
verify(windowNode.getOrderingScheme().isPresent());
TopNRowNumberNode topNRowNumberNode = convertToTopNRowNumber(windowNode, limit);
if (windowNode.getPartitionBy().isEmpty()) {
return topNRowNumberNode;
if (canReplaceWithTopNRowNumber(windowNode)) {
TopNRowNumberNode topNRowNumberNode = convertToTopNRowNumber(windowNode, limit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
TopNRowNumberNode topNRowNumberNode = convertToTopNRowNumber(windowNode, limit);
TopNRowNumberNode topNRowNumberNode = convertToTopNRank(windowNode, limit);

Should we call convertToTopNRank here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a good catch... I intended to remove the original convertToTopNRowNumber and change convertToTopNRank function to convertToTopNRowNumber to match the naming everywhere.

Have fixed that.

// Limit can be entirely skipped for row_number without partitioning.
if (windowNode.getPartitionBy().isEmpty() &&
canOptimizeRowNumberFunction(windowNode, metadata.getFunctionAndTypeManager())) {
return topNRowNumberNode;
}
planChanged = true;
source = topNRowNumberNode;
}
planChanged = true;
source = topNRowNumberNode;
}
return replaceChildren(node, ImmutableList.of(source));
}
Expand All @@ -183,15 +194,17 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
return rewriteFilterSource(node, source, rowNumberVariable, upperBound.getAsInt());
}
}
else if (source instanceof WindowNode && canOptimizeWindowFunction((WindowNode) source, metadata.getFunctionAndTypeManager()) && isOptimizeTopNRowNumber(session)) {
else if (source instanceof WindowNode) {
WindowNode windowNode = (WindowNode) source;
VariableReferenceExpression rowNumberVariable = getOnlyElement(windowNode.getCreatedVariable());
OptionalInt upperBound = extractUpperBound(tupleDomain, rowNumberVariable);

if (upperBound.isPresent()) {
source = convertToTopNRowNumber(windowNode, upperBound.getAsInt());
planChanged = true;
return rewriteFilterSource(node, source, rowNumberVariable, upperBound.getAsInt());
if (canReplaceWithTopNRowNumber(windowNode)) {
VariableReferenceExpression rowNumberVariable = getOnlyElement(windowNode.getCreatedVariable());
OptionalInt upperBound = extractUpperBound(tupleDomain, rowNumberVariable);

if (upperBound.isPresent()) {
source = convertToTopNRowNumber(windowNode, upperBound.getAsInt());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It seems that we can call convertToTopNRank here as well. And completely drop the method convertToTopNRowNumber?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.

planChanged = true;
return rewriteFilterSource(node, source, rowNumberVariable, upperBound.getAsInt());
}
}
}
return replaceChildren(node, ImmutableList.of(source));
Expand Down Expand Up @@ -275,11 +288,30 @@ private static RowNumberNode mergeLimit(RowNumberNode node, int newRowCountPerPa

private TopNRowNumberNode convertToTopNRowNumber(WindowNode windowNode, int limit)
{
String windowFunction = Iterables.getOnlyElement(windowNode.getWindowFunctions().values()).getFunctionCall().getFunctionHandle().getName();
String[] parts = windowFunction.split("\\.");
String windowFunctionName = parts[parts.length - 1];
TopNRowNumberNode.RankingFunction rankingFunction;
switch (windowFunctionName) {
case "row_number":
rankingFunction = TopNRowNumberNode.RankingFunction.ROW_NUMBER;
break;
case "rank":
rankingFunction = TopNRowNumberNode.RankingFunction.RANK;
break;
case "dense_rank":
rankingFunction = TopNRowNumberNode.RankingFunction.DENSE_RANK;
break;
default:
throw new IllegalArgumentException("Unsupported window function for TopNRowNumberNode: " + windowFunctionName);
}

return new TopNRowNumberNode(
windowNode.getSourceLocation(),
idAllocator.getNextId(),
windowNode.getSource(),
windowNode.getSpecification(),
rankingFunction,
getOnlyElement(windowNode.getCreatedVariable()),
limit,
false,
Expand All @@ -288,22 +320,37 @@ private TopNRowNumberNode convertToTopNRowNumber(WindowNode windowNode, int limi

private static boolean canReplaceWithRowNumber(WindowNode node, FunctionAndTypeManager functionAndTypeManager)
{
return canOptimizeWindowFunction(node, functionAndTypeManager) && !node.getOrderingScheme().isPresent();
return canOptimizeRowNumberFunction(node, functionAndTypeManager) && !node.getOrderingScheme().isPresent();
}

private static boolean canOptimizeRowNumberFunction(WindowNode node, FunctionAndTypeManager functionAndTypeManager)
{
if (node.getWindowFunctions().size() != 1) {
return false;
}
return isRowNumberMetadata(functionAndTypeManager, functionAndTypeManager.getFunctionMetadata(getOnlyElement(node.getWindowFunctions().values()).getFunctionHandle()));
}

private static boolean canOptimizeWindowFunction(WindowNode node, FunctionAndTypeManager functionAndTypeManager)
private static boolean canOptimizeRankFunction(WindowNode node, FunctionAndTypeManager functionAndTypeManager)
{
if (node.getWindowFunctions().size() != 1) {
return false;
}
VariableReferenceExpression rowNumberVariable = getOnlyElement(node.getWindowFunctions().keySet());
return isRowNumberMetadata(functionAndTypeManager, functionAndTypeManager.getFunctionMetadata(node.getWindowFunctions().get(rowNumberVariable).getFunctionHandle()));
return isRankMetadata(functionAndTypeManager, functionAndTypeManager.getFunctionMetadata(getOnlyElement(node.getWindowFunctions().values()).getFunctionHandle()));
}

private static boolean isRowNumberMetadata(FunctionAndTypeManager functionAndTypeManager, FunctionMetadata functionMetadata)
{
FunctionHandle rowNumberFunction = functionAndTypeManager.lookupFunction("row_number", ImmutableList.of());
return functionMetadata.equals(functionAndTypeManager.getFunctionMetadata(rowNumberFunction));
}

private static boolean isRankMetadata(FunctionAndTypeManager functionAndTypeManager, FunctionMetadata functionMetadata)
{
FunctionHandle rankFunction = functionAndTypeManager.lookupFunction("rank", ImmutableList.of());
FunctionHandle denseRankFunction = functionAndTypeManager.lookupFunction("dense_rank", ImmutableList.of());
return functionMetadata.equals(functionAndTypeManager.getFunctionMetadata(rankFunction)) ||
functionMetadata.equals(functionAndTypeManager.getFunctionMetadata(denseRankFunction));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@
public final class TopNRowNumberNode
extends InternalPlanNode
{
public enum RankingFunction
{
ROW_NUMBER,
RANK,
DENSE_RANK
}

private final PlanNode source;
private final DataOrganizationSpecification specification;
private final RankingFunction rankingFunction;
private final VariableReferenceExpression rowNumberVariable;
private final int maxRowCountPerPartition;
private final boolean partial;
Expand All @@ -48,12 +56,13 @@ public TopNRowNumberNode(
@JsonProperty("id") PlanNodeId id,
@JsonProperty("source") PlanNode source,
@JsonProperty("specification") DataOrganizationSpecification specification,
@JsonProperty("rankingType") RankingFunction rankingFunction,
@JsonProperty("rowNumberVariable") VariableReferenceExpression rowNumberVariable,
@JsonProperty("maxRowCountPerPartition") int maxRowCountPerPartition,
@JsonProperty("partial") boolean partial,
@JsonProperty("hashVariable") Optional<VariableReferenceExpression> hashVariable)
{
this(sourceLocation, id, Optional.empty(), source, specification, rowNumberVariable, maxRowCountPerPartition, partial, hashVariable);
this(sourceLocation, id, Optional.empty(), source, specification, rankingFunction, rowNumberVariable, maxRowCountPerPartition, partial, hashVariable);
}

public TopNRowNumberNode(
Expand All @@ -62,6 +71,7 @@ public TopNRowNumberNode(
Optional<PlanNode> statsEquivalentPlanNode,
PlanNode source,
DataOrganizationSpecification specification,
RankingFunction rankingFunction,
VariableReferenceExpression rowNumberVariable,
int maxRowCountPerPartition,
boolean partial,
Expand All @@ -75,9 +85,11 @@ public TopNRowNumberNode(
requireNonNull(rowNumberVariable, "rowNumberVariable is null");
checkArgument(maxRowCountPerPartition > 0, "maxRowCountPerPartition must be > 0");
requireNonNull(hashVariable, "hashVariable is null");
requireNonNull(rankingFunction, "rankingFunction is null");

this.source = source;
this.specification = specification;
this.rankingFunction = rankingFunction;
this.rowNumberVariable = rowNumberVariable;
this.maxRowCountPerPartition = maxRowCountPerPartition;
this.partial = partial;
Expand Down Expand Up @@ -113,6 +125,12 @@ public DataOrganizationSpecification getSpecification()
return specification;
}

@JsonProperty
public RankingFunction getRankingFunction()
{
return rankingFunction;
}

public List<VariableReferenceExpression> getPartitionBy()
{
return specification.getPartitionBy();
Expand Down Expand Up @@ -156,12 +174,12 @@ public <R, C> R accept(InternalPlanVisitor<R, C> visitor, C context)
@Override
public PlanNode replaceChildren(List<PlanNode> newChildren)
{
return new TopNRowNumberNode(getSourceLocation(), getId(), getStatsEquivalentPlanNode(), Iterables.getOnlyElement(newChildren), specification, rowNumberVariable, maxRowCountPerPartition, partial, hashVariable);
return new TopNRowNumberNode(getSourceLocation(), getId(), getStatsEquivalentPlanNode(), Iterables.getOnlyElement(newChildren), specification, rankingFunction, rowNumberVariable, maxRowCountPerPartition, partial, hashVariable);
}

@Override
public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode)
{
return new TopNRowNumberNode(getSourceLocation(), getId(), statsEquivalentPlanNode, source, specification, rowNumberVariable, maxRowCountPerPartition, partial, hashVariable);
return new TopNRowNumberNode(getSourceLocation(), getId(), statsEquivalentPlanNode, source, specification, rankingFunction, rowNumberVariable, maxRowCountPerPartition, partial, hashVariable);
}
}
Loading
Loading