Skip to content

Conversation

@mohsaka
Copy link
Contributor

@mohsaka mohsaka commented Oct 27, 2025

Description

This PR contains all final changes to SPI/analysis/planning/optimization. This PR does not include LocalExecutionPlanner changes or the exclude columns optimization which will come in future PRs.

Motivation and Context

Impact

Test Plan

Added new test cases.

Rule test cases:
TestImplementTableFunctionSource
TestPruneTableFunctionProcessorColumns
TestPruneTableFunctionProcessorSourceColumns
TestRemoveRedundantTableFunction

Planner test case:
planner/TestTableFunctionInvocation

Re-ran previous test cases to check for regressions:
test/TestTableFunctionInvocation
TestTableFunctionRegistry

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.
  • If adding new dependencies, verified they have an OpenSSF Scorecard score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== NO RELEASE NOTE ==

@prestodb-ci prestodb-ci added the from:IBM PR from IBM label Oct 27, 2025
@mohsaka mohsaka changed the title Final analyzer/planner/optimizer changes for tvf feat: Final analyzer/planner/optimizer changes for tvf Oct 27, 2025
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @mohsaka, your pull request is larger than the review limit of 150000 diff characters

@mohsaka mohsaka force-pushed the tvf_analyzer_final branch from 056ebac to 4f63c71 Compare October 27, 2025 23:06
@mohsaka mohsaka changed the title feat: Final analyzer/planner/optimizer changes for tvf feat: TVF Part 7/X Final analyzer/planner/optimizer changes for tvf Oct 27, 2025
@mohsaka mohsaka requested a review from aditi-pandit October 27, 2025 23:27
@mohsaka mohsaka marked this pull request as ready for review October 27, 2025 23:27
@prestodb-ci prestodb-ci requested review from a team and nmahadevuni and removed request for a team October 27, 2025 23:27
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @mohsaka, your pull request is larger than the review limit of 150000 diff characters

Copy link
Contributor

@aditi-pandit aditi-pandit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mohsaka.

It might get easier to review the code if we split as follows :
i) Changes in TestingTableFunctions.java
ii) Add the TableFunctionNode changes and the basic structural wiring (in Printer, GraphVizPrinter, PlanBuilder, PlanMatcher)
iii) Similar changes ffor TableFunctionProcessorNode..The PlanNode definition and the basic structural wiring like above.
iv) There are bunch of changes like those in Field.java, StatementAnalyzer changes which can be one off changes and can be made independent of this PR.
v) Basic Planner rule for ImplementTableFunctionSource
vi) PruneTableFunction* rules + TestPrune*
vii) Remove* rules + TestRemove*

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this change to a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't understand the reason for the changes in this file. Why did you have to make this class public ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coerce function needed for RelationPlanner

                    QueryPlanner.PlanAndMappings copartitionCoercions = partitionQueryPlanner.coerce(sourcePlanBuilder, partitioningColumns, analysis, idAllocator, variableAllocator, metadata);
                    sourcePlanBuilder = copartitionCoercions.getSubPlan();
                    partitionBy = partitioningColumns.stream()
                            .map(copartitionCoercions::get)
                            .collect(toImmutableList());

Scope inputScope = analysis.getScope(tableArgumentsByName.get(name).getRelation());
columns.stream()
.filter(column -> column < 0 || column >= inputScope.getRelationType().getAllFieldCount()) // hidden columns can be required as well as visible columns
.filter(column -> column < 0 || column >= inputScope.getRelationType().getVisibleFieldCount())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be added as a single change without the others.

Copy link
Contributor

@aditi-pandit aditi-pandit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code looks good. Will do one more read.

// for processing or for pass-through. null value in the marker column indicates that the value at the same
// position in the source column should not be processed or passed-through.
// the mapping is only present if there are two or more sources.
private final Optional<Map<VariableReferenceExpression, VariableReferenceExpression>> markerVariables;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment with an example for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aditi-pandit Thanks. I've added an example in the comment.

return orderBy;
}

static void addPassthroughColumns(ImmutableList.Builder<VariableReferenceExpression> outputVariables,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment about the usage of this function and explanation for each parameter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aditi-pandit Thanks. I've added a comment for the explannation.

@aditi-pandit
Copy link
Contributor

@jaystarshot : PTAL. This code has the main planner changes for Table function.

@xin-zhang2 xin-zhang2 force-pushed the tvf_analyzer_final branch 4 times, most recently from a952a34 to 33da439 Compare November 11, 2025 10:57
* - source T2(a2, b2)
* </pre>
*/
public class ImplementTableFunctionSource
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the naming of this rule is non intuitive, can it be improved? eg
TransformTableFunctionToProcessorNodeRule

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaystarshot Thanks. I've renamed it to TransformTableFunctionToTableFunctionProcessor. What do you think of this name?

Copy link
Member

@jaystarshot jaystarshot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this PR is missing end to end integration (execution) tests, without those its hard to say if the addExchanges etc is correct. Can you please add them

@xin-zhang2 xin-zhang2 force-pushed the tvf_analyzer_final branch 2 times, most recently from 07de1c5 to 1207da6 Compare November 13, 2025 16:32
@xin-zhang2
Copy link
Contributor

@jaystarshot Thanks for the review.
An end to end integration tests requires the changes in LocalExecutionPlanner and TestingTableFunctions which defines all the table functions used for testing. These change are included in commit 92fb07e that were orginally intended for a subsequent PR.
I've now updated this PR to include that commit, along with some optimization changes from #26506 and #26188. This could provide a comprehensive view of the end-to-end code changes and test coverage.

Please let me know if this works well for your review, or if you have any other thoughts. Thanks!

cc @aditi-pandit

@jaystarshot
Copy link
Member

Thanks @xin-zhang2 ,will review this as the source of truth. I might need help reviewing some execution changes later since i would ooo mid next week

@xin-zhang2
Copy link
Contributor

@tdcmeehan Hi Tim, would you mind helping review this PR while Jay is ooo, if possible? Thanks in advance!

PlanOptimizer predicatePushDown = new StatsRecordingPlanOptimizer(optimizerStats, new PredicatePushDown(metadata, sqlParser, expressionOptimizerManager, featuresConfig.isNativeExecutionEnabled()));
PlanOptimizer prefilterForLimitingAggregation = new StatsRecordingPlanOptimizer(optimizerStats, new PrefilterForLimitingAggregation(metadata, statsCalculator));

builder.add(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add comments on the rule/optimizer placement i.e explaining why each of these is placed where it is?

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterators.getOnlyElement;

public class RewriteExcludeColumnsFunctionToProjection
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment explaining the rule working like other rules

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I've added a comment to explain the plan change.

@Override
public Result apply(TableFunctionProcessorNode node, Captures captures, Context context)
{
if (!(node.getHandle().getFunctionHandle() instanceof ExcludeColumnsFunctionHandle)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's good practice to add optimizer rules for each individual table function. This sets a precedent where every built-in TVF could add its own rewrite rule to PlanOptimizers. Can this come via connectorOptimizer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaystarshot Agree... I want to add SPI/connectorOptimizer kind of framework for Table Functions.

We ported the entire Trino implementation to Presto, so this came along with it. But I would be fine to remove this code.

wdyt ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaystarshot @aditi-pandit
This may not be introducing a new precedent, since PlanOptimizers already contains several function-specific rules. For example, CombineApproxPercentileFunctions applies only to approx_percentile, and MinMaxByToWindowFunction applies only to min_by and max_by.

The Built-in TVFs are registered in GlobalSystemConnector, and we don't have a connectorOptimizer for it yet.
Still, I agree that it would be better to place such rules in the optimizer of the connector that registers the function.

import static com.google.common.collect.Maps.filterKeys;

/**
* This rule prunes unreferenced outputs of TableFunctionProcessorNode.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add some comments showing the plan changes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I've added a comment to explain the plan change.

import static com.google.common.collect.ImmutableList.toImmutableList;

/**
* TableFunctionProcessorNode has two kinds of outputs:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment on the rule changes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I've added a comment to explain the plan change.


node.getHashSymbol().ifPresent(requiredInputs::add);

Optional<Map<VariableReferenceExpression, VariableReferenceExpression>> updatedMarkerSymbols = node.getMarkerVariables()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a comment before the marker related updates explaining them

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I've added a detailed comment in TableFunctionProcessorNode to explain the markers, as @aditi-pandit suggested in #26445 (comment).
Here, the markers are updated by removing the unreferenced columns, and the logic simply checks whether each marker is present in the requiredInputs. Since this is farirly straightforward, adding a separate comment for it might not be necessary.

BOOLEAN,
ImmutableList.of(
new CallExpression(IS_DISTINCT_FROM.name(),
functionResolution.comparisonFunction(IS_DISTINCT_FROM, INTEGER, INTEGER),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this harcoded to INTEGER? It should be between left and righT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing it out. It should be the left and right column type for IS_DISTINCT_FROM.
I've updated it to leftColumn.getType() and rightColumn.getType().

* Example transformation for two sources, both with set semantics
* and KEEP WHEN EMPTY property:
* <pre>
* - TableFunction foo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add some comment explaining why we need the marker approach? I think that is pretty non intuitive

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Please see #26445 (comment) for the usage of the marker.

Copy link
Member

@jaystarshot jaystarshot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a pass at all rules except TransformTableFunctionProcessorNode. Next will review that and other optimizer changes

mohsaka and others added 8 commits November 17, 2025 16:29
…nPlanner and ExcludeColumns optimizer rule.

Co-authored-by: kasiafi <[email protected]>
Co-authored-by: Xin Zhang <[email protected]>
Co-authored-by: kasiafi <[email protected]>
Co-authored-by: mohsaka <[email protected]>
Changes adapted from trino/PR#16584
Author: kasiafi

Co-authored-by: kasiafi <[email protected]>
Co-authored-by: Xin Zhang <[email protected]>
Changes adapted from trino/PR#16716
Author: kasiafi

Co-authored-by: kasiafi <[email protected]>
Changes adapted from trino/PR#25493
Author: kasiafi

Co-authored-by: kasiafi <[email protected]>
* - source T2(a2, b2)
* </pre>
*/
public class TransformTableFunctionToTableFunctionProcessor
Copy link
Member

@jaystarshot jaystarshot Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies if I have misread, but from this optimizer it looks like it's handling the case where co-partitioned tables have different numbers of rows for the same partition key by always matching overflow rows with row 1 of the shorter table.

SELECT *
FROM TABLE(
    analytics.correlate_sales(
        orders => TABLE(orders) t1(product_id, amount)
                  PARTITION BY product_id,
        
        inventory => TABLE(inventory) t2(product_id, stock)
                     PARTITION BY product_id
        
        COPARTITION (t1, t2)
    )
);

Given Data:

orders (3 rows for product_id=101):

product_id | amount
-----------+--------
    101    |   500
    101    |   300
    101    |   200

inventory (5 rows for product_id=101):

product_id | stock
-----------+-------
    101    |   50
    101    |   40
    101    |   30
    101    |   20
    101    |   10

After Window Functions (internal):

orders with row numbers:

product_id | amount | row_number | partition_size
-----------+--------+------------+----------------
    101    |   500  |     1      |       3
    101    |   300  |     2      |       3
    101    |   200  |     3      |       3

inventory with row numbers:

product_id | stock | row_number | partition_size
-----------+-------+------------+----------------
    101    |   50  |     1      |       5
    101    |   40  |     2      |       5
    101    |   30  |     3      |       5
    101    |   20  |     4      |       5
    101    |   10  |     5      |       5

Current code

combined_row | orders_rn | amount | inventory_rn | stock | Explanation
-------------+-----------+--------+--------------+-------+---------------------------
      1      |     1     |   500  |      1       |   50  | ✓ Normal match (1=1)
      2      |     2     |   300  |      2       |   40  | ✓ Normal match (2=2)
      3      |     3     |   200  |      3       |   30  | ✓ Normal match (3=3)
      4      |     1     |   500  |      4       |   20  | ← Overflow: repeats orders row 1
      5      |     1     |   500  |      5       |   10  | ← Overflow: repeats orders row 1

Some high level questions

  1. Why always use row 1 for overflow?

  2. Is this behavior documented or SQL standard?

  3. Have alternative approaches been considered? For example:

    • Cycling through rows (modulo-based matching)
    • Enforcing distinct partitions**

I'm asking because this behavior is a core design decision that affects how users reason about table function semantics, and it would be helpful to understand the rationale especially since it's not immediately obvious from the code or comments why row 1 was chosen specifically and I don't think the user has any idea of this behavior.

If its easier and if the behavior is not sql standard anyway then maybe we can just enforce distinct partitions for now / throw an error

Copy link
Member

@jaystarshot jaystarshot Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a UX perspective, I'm not even sure users understand the row-number alignment behavior. When they write PARTITION BY product_id, they likely think 'each product_id is one group' (like GROUP BY), not 'product_id can have multiple rows that get matched by position.' If a user specifies PARTITION BY and COPARTITION, I would expect they want/assume distinct partitions (each partition key appears at most once).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaystarshot Thanks a lot for the detailed review, really appreciate it.
This optimizer rule is ported from Trino and is quite complex, so let me try to explain it. @aditi-pandit @mohsaka Please correct me if I'm mistaken anywhere.

This rule transforms a TableFunctionNode into a TableFunctionProcessorNode. For functions with no table arguments or only a single table argument, the transformation is straightforward and requires little preprocessing.

However, for functions with multiple table arguments, the rule transforms co-partitioned tables into an outer join of their partitions, and transforms non-co-partitioned tables into a cross join of their partitions.
The join combines rows from each table’s partition by matching their row numbers. When partitions differ in size, it uses a special condition that repeats the first row of the smaller partition to align with the remaining rows of the larger partition—creating filler rows.
Two marker columns are added to indicate which original row of which table each joined row comes from. Rows with null markers represent filler rows.

In your example above, the join result is like

combined_row | orders_rn | amount | orders_mk | inventory_rn | stock | inventory_mk | Explanation
-------------+-----------+--------+-----------+--------------+-------+--------------+-------------------------------
      1      |     1     |   500  |     1     |      1       |   50  |       1      | ✓ Normal match (1 = 1)
      2      |     2     |   300  |     2     |      2       |   40  |       2      | ✓ Normal match (2 = 2)
      3      |     3     |   200  |     3     |      3       |   30  |       3      | ✓ Normal match (3 = 3)
      4      |     1     |   500  |    null   |      4       |   20  |       4      | ← Filler: repeats orders row 1
      5      |     1     |   500  |    null   |      5       |   10  |       5      | ← Filler: repeats orders row 1

inventory_mk shows that all five rows are real data from the inventory table. orders_mk, on the other hand, shows that the last two rows are filler rows for the orders table. These filler rows will be ignored when processing the orders data later (In RegularTableFunctionPartition, the marker channels are used to identifiy the end of data for each input).

The purpose of this join is NOT to produce the table function’s output. In fact, it has nothing to do with the output of the table function. It exists solely to align the partitions of the input tables, so that each partition can be processed independently later in the table function operators.

The SQL standard requires creating a virutal processor for each partition, and the union of the results from all processors becomes the output of the table function. Therefore, the actual output of the table function depends entirely on how each processor handles the data from input tabels. Each function should implement the TableFunctionDataProcessor.process interface, which takes a list of pages (one from each table) and produces the result page. This processing logic is function-specific.

Regarding your questions,

  1. We use the first row of the smaller partition as the filler, but it doesn't matter which row is used, since filler rows are ignored later.
  2. It's not part of the SQL standared, but rahter a technique to satisfy the standard requirements. It doesn't affect the actual output, and should be completely transparent to users.
  3. As mentioned in 1, the filler rows doesn't matter.

Please let know if this clarifies things or if you have any further questions. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xin-zhang2 That is correct. I would like to point out the location of the comment here.

        // For each source, all source's output symbols are mapped to the source's row number symbol.
        // The row number symbol will be later converted to a marker of "real" input rows vs "filler" input rows of the source.
        // The "filler" input rows are the rows appended while joining partitions of different lengths,
        // to fill the smaller partition up to the bigger partition's size. They are a side effect of the algorithm,
        // and should not be processed by the table function.
        Map<VariableReferenceExpression, VariableReferenceExpression> rowNumberSymbols = finalResultSource.rowNumberSymbolsMapping();

This explains the reason for filling them with row 1. and also this next comment.

        // Remap the symbol mapping: replace the row number symbol with the corresponding marker symbol.
        // In the new map, every source symbol is associated with the corresponding marker symbol.
        // Null value of the marker indicates that the source value should be ignored by the table function.
        ImmutableMap<VariableReferenceExpression, VariableReferenceExpression> markerSymbols = rowNumberSymbols.entrySet().stream()
                .collect(toImmutableMap(Map.Entry::getKey, entry -> marked.variableToMarker().get(entry.getValue())));

This markerSymbol represents null if the the row numbers do not match. It is used in the operator to represent the end of the processable data.

RegularTableFunctionPartition.java

    // for each input channel, the end position of actual data in that channel (exclusive) relative to partition. The remaining rows are "filler" rows, and should not be passed to table function or passed-through
    private final int[] endOfData;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

from:IBM PR from IBM

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants