Skip to content

Commit e68143a

Browse files
authored
fix(optimizer): Fix local exchange for merge join (#26590)
## Description Current code will try to add a round robin local exchange below the merge join node, which will break the sorted property of the input. In this PR, we fixed it. ## Motivation and Context Bug fix ## Impact Bug fix ## Test Plan Unit test ## Contributor checklist - [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. - [ ] Documented new properties (with its default value), SQL syntax, functions, or other functionality. - [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines). - [ ] Adequate tests were added if applicable. - [ ] CI passed. - [ ] If adding new dependencies, verified they have an [OpenSSF Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or higher (or obtained explicit TSC approval for lower scores). ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == NO RELEASE NOTE == ```
1 parent 7782e51 commit e68143a

File tree

3 files changed

+17
-17
lines changed

3 files changed

+17
-17
lines changed

presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.presto.spi.plan.JoinType;
1818
import com.facebook.presto.sql.analyzer.FeaturesConfig;
1919
import com.facebook.presto.sql.planner.assertions.PlanMatchPattern;
20+
import com.facebook.presto.sql.planner.plan.ExchangeNode;
2021
import com.facebook.presto.testing.QueryRunner;
2122
import com.facebook.presto.tests.AbstractTestQueryFramework;
2223
import com.google.common.collect.ImmutableList;
@@ -37,10 +38,13 @@
3738
import static com.facebook.presto.spi.plan.JoinType.RIGHT;
3839
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
3940
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause;
41+
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange;
4042
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join;
4143
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.mergeJoin;
4244
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.sort;
4345
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;
46+
import static com.facebook.presto.sql.tree.SortItem.NullOrdering.FIRST;
47+
import static com.facebook.presto.sql.tree.SortItem.Ordering.ASCENDING;
4448
import static io.airlift.tpch.TpchTable.CUSTOMER;
4549
import static io.airlift.tpch.TpchTable.LINE_ITEM;
4650
import static io.airlift.tpch.TpchTable.NATION;
@@ -179,7 +183,8 @@ public void testDifferentBucketedByKey()
179183
anyTree(
180184
mergeJoin(INNER, ImmutableList.of(equiJoinClause("custkey_l", "custkey_r")),
181185
Optional.empty(),
182-
anyTree(sort(anyTree(tableScan("test_join_customer2", ImmutableMap.of("custkey_l", "custkey"))))),
186+
exchange(ExchangeNode.Scope.LOCAL, ExchangeNode.Type.GATHER, ImmutableList.of(sort("custkey_l", ASCENDING, FIRST)),
187+
sort(anyTree(tableScan("test_join_customer2", ImmutableMap.of("custkey_l", "custkey"))))),
183188
tableScan("test_join_order2", ImmutableMap.of("custkey_r", "custkey")))));
184189
}
185190
finally {
@@ -211,7 +216,8 @@ public void testDifferentSortByKey()
211216
anyTree(
212217
mergeJoin(INNER, ImmutableList.of(equiJoinClause("custkey_l", "custkey_r")),
213218
Optional.empty(),
214-
anyTree(sort(tableScan("test_join_customer3", ImmutableMap.of("custkey_l", "custkey")))),
219+
exchange(ExchangeNode.Scope.LOCAL, ExchangeNode.Type.GATHER, ImmutableList.of(sort("custkey_l", ASCENDING, FIRST)),
220+
sort(tableScan("test_join_customer3", ImmutableMap.of("custkey_l", "custkey")))),
215221
tableScan("test_join_order3", ImmutableMap.of("custkey_r", "custkey")))));
216222
}
217223
finally {

presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import static com.facebook.presto.SystemSessionProperties.isQuickDistinctLimitEnabled;
8080
import static com.facebook.presto.SystemSessionProperties.isSegmentedAggregationEnabled;
8181
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
82-
import static com.facebook.presto.SystemSessionProperties.preferSortMergeJoin;
8382
import static com.facebook.presto.common.type.BigintType.BIGINT;
8483
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
8584
import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference;
@@ -893,12 +892,11 @@ public PlanWithProperties visitSpatialJoin(SpatialJoinNode node, StreamPreferred
893892
@Override
894893
public PlanWithProperties visitMergeJoin(MergeJoinNode node, StreamPreferredProperties parentPreferences)
895894
{
896-
if (preferSortMergeJoin(session)) {
897-
PlanWithProperties probe = planAndEnforce(node.getLeft(), singleStream(), singleStream());
898-
PlanWithProperties build = planAndEnforce(node.getRight(), singleStream(), singleStream());
899-
return rebaseAndDeriveProperties(node, ImmutableList.of(probe, build));
900-
}
901-
return super.visitMergeJoin(node, parentPreferences);
895+
// The optimizer rule MergeJoinForSortedInputOptimizer and SortMergeJoinOptimizer which add the merge join node is responsible to ensure the input of the merge join is sorted.
896+
// Here we use `any().withOrderSensitivity()` meaning respect the input distribution of the input and keep the input order.
897+
PlanWithProperties probe = planAndEnforce(node.getLeft(), any().withOrderSensitivity(), any().withOrderSensitivity());
898+
PlanWithProperties build = planAndEnforce(node.getRight(), any().withOrderSensitivity(), any().withOrderSensitivity());
899+
return rebaseAndDeriveProperties(node, ImmutableList.of(probe, build));
902900
}
903901

904902
@Override

presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -558,8 +558,7 @@ public void testSortMergeJoin()
558558
preferSortMergeJoin,
559559
anyTree(
560560
mergeJoin(INNER, ImmutableList.of(equiJoinClause("ORDERS_OK", "LINEITEM_PK")), Optional.empty(),
561-
exchange(LOCAL, GATHER, ImmutableList.of(),
562-
tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey"))),
561+
tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey")),
563562
sort(
564563
ImmutableList.of(sort("LINEITEM_PK", ASCENDING, FIRST)),
565564
exchange(LOCAL, GATHER, ImmutableList.of(),
@@ -574,18 +573,15 @@ public void testSortMergeJoin()
574573
ImmutableList.of(sort("ORDERS_CK", ASCENDING, FIRST)),
575574
exchange(LOCAL, GATHER, ImmutableList.of(),
576575
tableScan("orders", ImmutableMap.of("ORDERS_CK", "custkey")))),
577-
exchange(LOCAL, GATHER, ImmutableList.of(),
578-
tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey"))))));
576+
tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey")))));
579577

580578
// Both sides are sorted.
581579
assertPlan("SELECT o.orderkey FROM orders o INNER JOIN lineitem l ON o.orderkey = l.orderkey",
582580
preferSortMergeJoin,
583581
anyTree(
584582
mergeJoin(INNER, ImmutableList.of(equiJoinClause("ORDERS_OK", "LINEITEM_OK")), Optional.empty(),
585-
exchange(LOCAL, GATHER, ImmutableList.of(),
586-
tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey"))),
587-
exchange(LOCAL, GATHER, ImmutableList.of(),
588-
tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey"))))));
583+
tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey")),
584+
tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey")))));
589585
}
590586

591587
@Test

0 commit comments

Comments
 (0)