feat: TVF Part 7/X Final analyzer/planner/optimizer changes for tvf #26445
base: master
Sorry @mohsaka, your pull request is larger than the review limit of 150000 diff characters
056ebac to
4f63c71
Compare
aditi-pandit
left a comment
Thanks @mohsaka.
It might get easier to review the code if we split as follows :
i) Changes in TestingTableFunctions.java
ii) Add the TableFunctionNode changes and the basic structural wiring (in Printer, GraphVizPrinter, PlanBuilder, PlanMatcher)
iii) Similar changes for TableFunctionProcessorNode: the PlanNode definition and the basic structural wiring as above.
iv) There are a bunch of changes, such as those in Field.java and StatementAnalyzer, that are one-off changes and can be made independently of this PR.
v) Basic Planner rule for ImplementTableFunctionSource
vi) PruneTableFunction* rules + TestPrune*
vii) Remove* rules + TestRemove*
Move this change to a separate PR.
I don't understand the reason for the changes in this file. Why did you have to make this class public?
Coerce function needed for RelationPlanner
QueryPlanner.PlanAndMappings copartitionCoercions = partitionQueryPlanner.coerce(sourcePlanBuilder, partitioningColumns, analysis, idAllocator, variableAllocator, metadata);
sourcePlanBuilder = copartitionCoercions.getSubPlan();
partitionBy = partitioningColumns.stream()
.map(copartitionCoercions::get)
.collect(toImmutableList());
| Scope inputScope = analysis.getScope(tableArgumentsByName.get(name).getRelation()); | ||
| columns.stream() | ||
| .filter(column -> column < 0 || column >= inputScope.getRelationType().getAllFieldCount()) // hidden columns can be required as well as visible columns | ||
| .filter(column -> column < 0 || column >= inputScope.getRelationType().getVisibleFieldCount()) |
This can be added as a single change without the others.
4f63c71 to
95fd75c
Compare
aditi-pandit
left a comment
This code looks good. Will do one more read.
| // for processing or for pass-through. null value in the marker column indicates that the value at the same | ||
| // position in the source column should not be processed or passed-through. | ||
| // the mapping is only present if there are two or more sources. | ||
| private final Optional<Map<VariableReferenceExpression, VariableReferenceExpression>> markerVariables; |
Can you add a comment with an example for this?
@aditi-pandit Thanks. I've added an example in the comment.
| return orderBy; | ||
| } | ||
|
|
||
| static void addPassthroughColumns(ImmutableList.Builder<VariableReferenceExpression> outputVariables, |
Can you add a comment about the usage of this function and an explanation of each parameter?
@aditi-pandit Thanks. I've added a comment with the explanation.
@jaystarshot: PTAL. This code has the main planner changes for table functions.
a952a34 to
33da439
Compare
| * - source T2(a2, b2) | ||
| * </pre> | ||
| */ | ||
| public class ImplementTableFunctionSource |
The naming of this rule is non-intuitive. Can it be improved? For example: TransformTableFunctionToProcessorNodeRule
@jaystarshot Thanks. I've renamed it to TransformTableFunctionToTableFunctionProcessor. What do you think of this name?
jaystarshot
left a comment
I think this PR is missing end-to-end integration (execution) tests. Without those, it's hard to say whether addExchanges etc. is correct. Can you please add them?
33da439 to
f5e5c48
Compare
07de1c5 to
1207da6
Compare
@jaystarshot Thanks for the review. Please let me know if this works well for your review, or if you have any other thoughts. Thanks!
Thanks @xin-zhang2, will review this as the source of truth. I might need help reviewing some execution changes later since I will be OOO mid next week.
@tdcmeehan Hi Tim, would you mind helping review this PR while Jay is OOO, if possible? Thanks in advance!
presto-main-base/src/main/java/com/facebook/presto/operator/RegularTableFunctionPartition.java
| PlanOptimizer predicatePushDown = new StatsRecordingPlanOptimizer(optimizerStats, new PredicatePushDown(metadata, sqlParser, expressionOptimizerManager, featuresConfig.isNativeExecutionEnabled())); | ||
| PlanOptimizer prefilterForLimitingAggregation = new StatsRecordingPlanOptimizer(optimizerStats, new PrefilterForLimitingAggregation(metadata, statsCalculator)); | ||
|
|
||
| builder.add( |
Can you please add comments on the rule/optimizer placement, i.e., explaining why each of these is placed where it is?
| import static com.google.common.base.Preconditions.checkState; | ||
| import static com.google.common.collect.Iterators.getOnlyElement; | ||
|
|
||
| public class RewriteExcludeColumnsFunctionToProjection |
Please add a comment explaining how the rule works, as the other rules have.
Thanks. I've added a comment to explain the plan change.
| @Override | ||
| public Result apply(TableFunctionProcessorNode node, Captures captures, Context context) | ||
| { | ||
| if (!(node.getHandle().getFunctionHandle() instanceof ExcludeColumnsFunctionHandle)) { |
I don't think it's good practice to add optimizer rules for each individual table function. This sets a precedent where every built-in TVF could add its own rewrite rule to PlanOptimizers. Can this come via connectorOptimizer?
@jaystarshot Agree... I want to add an SPI/connectorOptimizer kind of framework for table functions.
We ported the entire Trino implementation to Presto, so this came along with it. But I would be fine with removing this code.
WDYT?
@jaystarshot @aditi-pandit
This may not be introducing a new precedent, since PlanOptimizers already contains several function-specific rules. For example, CombineApproxPercentileFunctions applies only to approx_percentile, and MinMaxByToWindowFunction applies only to min_by and max_by.
The built-in TVFs are registered in GlobalSystemConnector, and we don't have a connectorOptimizer for it yet.
Still, I agree that it would be better to place such rules in the optimizer of the connector that registers the function.
...ain/java/com/facebook/presto/sql/planner/iterative/rule/RewriteTableFunctionToTableScan.java
...ain/java/com/facebook/presto/sql/planner/iterative/rule/RewriteTableFunctionToTableScan.java
...m/facebook/presto/sql/planner/iterative/rule/TransformTableFunctionProcessorToTableScan.java
| import static com.google.common.collect.Maps.filterKeys; | ||
|
|
||
| /** | ||
| * This rule prunes unreferenced outputs of TableFunctionProcessorNode. |
Please also add some comments showing the plan changes
Thanks. I've added a comment to explain the plan change.
| import static com.google.common.collect.ImmutableList.toImmutableList; | ||
|
|
||
| /** | ||
| * TableFunctionProcessorNode has two kinds of outputs: |
Please add a comment on the rule changes
Thanks. I've added a comment to explain the plan change.
|
|
||
| node.getHashSymbol().ifPresent(requiredInputs::add); | ||
|
|
||
| Optional<Map<VariableReferenceExpression, VariableReferenceExpression>> updatedMarkerSymbols = node.getMarkerVariables() |
nit: add a comment before the marker-related updates explaining them
Thanks. I've added a detailed comment in TableFunctionProcessorNode to explain the markers, as @aditi-pandit suggested in #26445 (comment).
Here, the markers are updated by removing the unreferenced columns, and the logic simply checks whether each marker is present in the requiredInputs. Since this is fairly straightforward, adding a separate comment for it might not be necessary.
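For readers without the diff at hand, the marker update can be sketched roughly as below. This is a minimal, self-contained illustration and not the code in the PR: `MarkerPruningSketch` and `pruneMarkers` are hypothetical names, plain strings stand in for `VariableReferenceExpression`, and it assumes that a marker entry is kept exactly when its source column (the map key) is among the required inputs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical sketch of pruning the source-column-to-marker mapping.
 * Strings stand in for VariableReferenceExpression; names are illustrative only.
 */
final class MarkerPruningSketch
{
    private MarkerPruningSketch() {}

    /**
     * Keeps only the entries whose source column is still required.
     * An absent mapping (fewer than two sources) stays absent.
     */
    static Optional<Map<String, String>> pruneMarkers(
            Optional<Map<String, String>> markers,
            Set<String> requiredInputs)
    {
        return markers.map(mapping -> {
            Map<String, String> kept = new LinkedHashMap<>();
            mapping.forEach((sourceColumn, marker) -> {
                // Assumption: pruning is decided by the source column (the key)
                if (requiredInputs.contains(sourceColumn)) {
                    kept.put(sourceColumn, marker);
                }
            });
            return kept;
        });
    }
}
```

With a mapping of `a1`, `b1`, and `a2` to their markers and required inputs `a1` and `a2`, the sketch drops only the `b1` entry.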
| BOOLEAN, | ||
| ImmutableList.of( | ||
| new CallExpression(IS_DISTINCT_FROM.name(), | ||
| functionResolution.comparisonFunction(IS_DISTINCT_FROM, INTEGER, INTEGER), |
Why is this hardcoded to INTEGER? Shouldn't it use the left and right column types?
Thanks for pointing it out. It should be the left and right column type for IS_DISTINCT_FROM.
I've updated it to leftColumn.getType() and rightColumn.getType().
| * Example transformation for two sources, both with set semantics | ||
| * and KEEP WHEN EMPTY property: | ||
| * <pre> | ||
| * - TableFunction foo |
Can you please add a comment explaining why we need the marker approach? I think it is pretty non-intuitive.
Thanks. Please see #26445 (comment) for the usage of the marker.
jaystarshot
left a comment
Took a pass at all rules except TransformTableFunctionProcessorNode. Next I will review that and the other optimizer changes.
…nPlanner and ExcludeColumns optimizer rule. Co-authored-by: kasiafi <[email protected]> Co-authored-by: Xin Zhang <[email protected]>
Co-authored-by: kasiafi <[email protected]> Co-authored-by: mohsaka <[email protected]>
Co-authored-by: kasiafi <[email protected]> Co-authored-by: mohsaka <[email protected]>
Changes adapted from trino/PR#16584 Author: kasiafi Co-authored-by: kasiafi <[email protected]> Co-authored-by: Xin Zhang <[email protected]>
Changes adapted from trino/PR#16716 Author: kasiafi Co-authored-by: kasiafi <[email protected]>
Changes adapted from trino/PR#25493 Author: kasiafi Co-authored-by: kasiafi <[email protected]>
3a90c38 to
0657f45
Compare
| * - source T2(a2, b2) | ||
| * </pre> | ||
| */ | ||
| public class TransformTableFunctionToTableFunctionProcessor |
Apologies if I have misread, but from this optimizer it looks like it's handling the case where co-partitioned tables have different numbers of rows for the same partition key by always matching overflow rows with row 1 of the shorter table.
SELECT *
FROM TABLE(
analytics.correlate_sales(
orders => TABLE(orders) t1(product_id, amount)
PARTITION BY product_id,
inventory => TABLE(inventory) t2(product_id, stock)
PARTITION BY product_id
COPARTITION (t1, t2)
)
);
Given Data:
orders (3 rows for product_id=101):
product_id | amount
-----------+--------
101 | 500
101 | 300
101 | 200
inventory (5 rows for product_id=101):
product_id | stock
-----------+-------
101 | 50
101 | 40
101 | 30
101 | 20
101 | 10
After Window Functions (internal):
orders with row numbers:
product_id | amount | row_number | partition_size
-----------+--------+------------+----------------
101 | 500 | 1 | 3
101 | 300 | 2 | 3
101 | 200 | 3 | 3
inventory with row numbers:
product_id | stock | row_number | partition_size
-----------+-------+------------+----------------
101 | 50 | 1 | 5
101 | 40 | 2 | 5
101 | 30 | 3 | 5
101 | 20 | 4 | 5
101 | 10 | 5 | 5
Current code
combined_row | orders_rn | amount | inventory_rn | stock | Explanation
-------------+-----------+--------+--------------+-------+---------------------------
1 | 1 | 500 | 1 | 50 | ✓ Normal match (1=1)
2 | 2 | 300 | 2 | 40 | ✓ Normal match (2=2)
3 | 3 | 200 | 3 | 30 | ✓ Normal match (3=3)
4 | 1 | 500 | 4 | 20 | ← Overflow: repeats orders row 1
5 | 1 | 500 | 5 | 10 | ← Overflow: repeats orders row 1
Some high level questions
- Why always use row 1 for overflow?
- Is this behavior documented or SQL standard?
- Have alternative approaches been considered? For example:
  - Cycling through rows (modulo-based matching)
  - Enforcing distinct partitions
I'm asking because this behavior is a core design decision that affects how users reason about table function semantics. It would be helpful to understand the rationale, especially since it's not immediately obvious from the code or comments why row 1 was chosen specifically, and I don't think users are aware of this behavior.
If it's easier, and if the behavior is not SQL standard anyway, then maybe we can just enforce distinct partitions for now and throw an error otherwise.
From a UX perspective, I'm not even sure users understand the row-number alignment behavior. When they write PARTITION BY product_id, they likely think 'each product_id is one group' (like GROUP BY), not 'product_id can have multiple rows that get matched by position.' If a user specifies PARTITION BY and COPARTITION, I would expect they want/assume distinct partitions (each partition key appears at most once).
@jaystarshot Thanks a lot for the detailed review, really appreciate it.
This optimizer rule is ported from Trino and is quite complex, so let me try to explain it. @aditi-pandit @mohsaka Please correct me if I'm mistaken anywhere.
This rule transforms a TableFunctionNode into a TableFunctionProcessorNode. For functions with no table arguments or only a single table argument, the transformation is straightforward and requires little preprocessing.
However, for functions with multiple table arguments, the rule transforms co-partitioned tables into an outer join of their partitions, and transforms non-co-partitioned tables into a cross join of their partitions.
The join combines rows from each table’s partition by matching their row numbers. When partitions differ in size, it uses a special condition that repeats the first row of the smaller partition to align with the remaining rows of the larger partition—creating filler rows.
Two marker columns are added to indicate which original row of which table each joined row comes from. Rows with null markers represent filler rows.
In your example above, the join result looks like this:
combined_row | orders_rn | amount | orders_mk | inventory_rn | stock | inventory_mk | Explanation
-------------+-----------+--------+-----------+--------------+-------+--------------+-------------------------------
1 | 1 | 500 | 1 | 1 | 50 | 1 | ✓ Normal match (1 = 1)
2 | 2 | 300 | 2 | 2 | 40 | 2 | ✓ Normal match (2 = 2)
3 | 3 | 200 | 3 | 3 | 30 | 3 | ✓ Normal match (3 = 3)
4 | 1 | 500 | null | 4 | 20 | 4 | ← Filler: repeats orders row 1
5 | 1 | 500 | null | 5 | 10 | 5 | ← Filler: repeats orders row 1
inventory_mk shows that all five rows are real data from the inventory table. orders_mk, on the other hand, shows that the last two rows are filler rows for the orders table. These filler rows will be ignored when processing the orders data later (in RegularTableFunctionPartition, the marker channels are used to identify the end of data for each input).
The purpose of this join is NOT to produce the table function’s output. In fact, it has nothing to do with the output of the table function. It exists solely to align the partitions of the input tables, so that each partition can be processed independently later in the table function operators.
The SQL standard requires creating a virtual processor for each partition, and the union of the results from all processors becomes the output of the table function. Therefore, the actual output of the table function depends entirely on how each processor handles the data from the input tables. Each function should implement the TableFunctionDataProcessor.process interface, which takes a list of pages (one from each table) and produces the result page. This processing logic is function-specific.
Regarding your questions,
1. We use the first row of the smaller partition as the filler, but it doesn't matter which row is used, since filler rows are ignored later.
2. It's not part of the SQL standard, but rather a technique to satisfy the standard's requirements. It doesn't affect the actual output, and should be completely transparent to users.
3. As mentioned in 1, the choice of filler rows doesn't matter, so alternatives such as cycling through rows would not change the result.
Please let me know if this clarifies things or if you have any further questions. Thanks!
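The alignment described in this reply can be modeled in a few lines. The sketch below is only an illustration under stated assumptions, not the planner rule itself: `PartitionAlignmentSketch`, `AlignedRow`, and `align` are hypothetical names, partitions are reduced to their sizes, and only the case of two non-empty partitions with the same partition key is covered.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of aligning two co-partitioned inputs by row number.
 * It is not the Presto implementation; all names are made up for illustration.
 */
final class PartitionAlignmentSketch
{
    private PartitionAlignmentSketch() {}

    /** One row of the aligned partition. Row numbers are 1-based. */
    static final class AlignedRow
    {
        final int leftRowNumber;
        final Integer leftMarker;   // null means the left side is a filler row
        final int rightRowNumber;
        final Integer rightMarker;  // null means the right side is a filler row

        AlignedRow(int leftRowNumber, Integer leftMarker, int rightRowNumber, Integer rightMarker)
        {
            this.leftRowNumber = leftRowNumber;
            this.leftMarker = leftMarker;
            this.rightRowNumber = rightRowNumber;
            this.rightMarker = rightMarker;
        }
    }

    /**
     * Pairs rows by row number. Once the smaller partition is exhausted,
     * its row 1 is repeated as filler and its marker is null.
     */
    static List<AlignedRow> align(int leftSize, int rightSize)
    {
        if (leftSize < 1 || rightSize < 1) {
            throw new IllegalArgumentException("this sketch only covers non-empty partitions");
        }
        List<AlignedRow> rows = new ArrayList<>();
        int combinedSize = Math.max(leftSize, rightSize);
        for (int i = 1; i <= combinedSize; i++) {
            boolean leftReal = i <= leftSize;
            boolean rightReal = i <= rightSize;
            rows.add(new AlignedRow(
                    leftReal ? i : 1,
                    leftReal ? Integer.valueOf(i) : null,
                    rightReal ? i : 1,
                    rightReal ? Integer.valueOf(i) : null));
        }
        return rows;
    }
}
```

Calling `align(3, 5)` mirrors the orders/inventory example: rows 4 and 5 carry orders row 1 with a null orders marker, while every inventory marker is non-null.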
@xin-zhang2 That is correct. I would like to point out the location of the comment here.
// For each source, all source's output symbols are mapped to the source's row number symbol.
// The row number symbol will be later converted to a marker of "real" input rows vs "filler" input rows of the source.
// The "filler" input rows are the rows appended while joining partitions of different lengths,
// to fill the smaller partition up to the bigger partition's size. They are a side effect of the algorithm,
// and should not be processed by the table function.
Map<VariableReferenceExpression, VariableReferenceExpression> rowNumberSymbols = finalResultSource.rowNumberSymbolsMapping();
This explains the reason for filling them with row 1, as does the next comment.
// Remap the symbol mapping: replace the row number symbol with the corresponding marker symbol.
// In the new map, every source symbol is associated with the corresponding marker symbol.
// Null value of the marker indicates that the source value should be ignored by the table function.
ImmutableMap<VariableReferenceExpression, VariableReferenceExpression> markerSymbols = rowNumberSymbols.entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> marked.variableToMarker().get(entry.getValue())));
The marker symbol is null when the row numbers do not match. It is used in the operator to identify the end of the processable data.
RegularTableFunctionPartition.java
// for each input channel, the end position of actual data in that channel (exclusive) relative to partition. The remaining rows are "filler" rows, and should not be passed to table function or passed-through
private final int[] endOfData;
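As a rough illustration of how a marker channel could translate into such an end position, consider the sketch below. It is an assumption-laden stand-in, not the code in RegularTableFunctionPartition: `EndOfDataSketch` and its `endOfData` method are hypothetical, the marker channel is modeled as an `Integer[]`, and it assumes that real rows always precede filler rows within a partition.

```java
/**
 * Hypothetical sketch: derive the exclusive end position of real data
 * from a marker channel, where a null marker denotes a filler row.
 */
final class EndOfDataSketch
{
    private EndOfDataSketch() {}

    /**
     * Returns the number of leading non-null markers.
     * Assumption: within a partition, real rows come before filler rows.
     */
    static int endOfData(Integer[] markerChannel)
    {
        int end = 0;
        while (end < markerChannel.length && markerChannel[end] != null) {
            end++;
        }
        return end;
    }
}
```

For the orders markers `{1, 2, 3, null, null}` from the example above, the sketch yields an end position of 3, so only the first three rows would be handed to the table function.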
0657f45 to
a72f91a
Compare
Description
This PR contains all final changes to SPI/analysis/planning/optimization. This PR does not include LocalExecutionPlanner changes or the exclude columns optimization which will come in future PRs.
Motivation and Context
Impact
Test Plan
Added new test cases.
Rule test cases:
TestImplementTableFunctionSource
TestPruneTableFunctionProcessorColumns
TestPruneTableFunctionProcessorSourceColumns
TestRemoveRedundantTableFunction
Planner test case:
planner/TestTableFunctionInvocation
Re-ran previous test cases to check for regressions:
test/TestTableFunctionInvocation
TestTableFunctionRegistry
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.