-
Notifications
You must be signed in to change notification settings - Fork 13.7k
[FLINK-37814][table-planner] Add FlinkRightJoinToLeftJoinRule #26627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
d74d786
7c8dc2c
a93d238
7182aa3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.planner.plan.rules.logical; | ||
|
||
import org.apache.calcite.plan.RelOptRuleCall; | ||
import org.apache.calcite.plan.RelOptUtil; | ||
import org.apache.calcite.plan.RelRule; | ||
import org.apache.calcite.rel.RelNode; | ||
import org.apache.calcite.rel.core.Join; | ||
import org.apache.calcite.rel.core.JoinRelType; | ||
import org.apache.calcite.rel.logical.LogicalJoin; | ||
import org.apache.calcite.rel.rules.TransformationRule; | ||
import org.apache.calcite.rex.RexNode; | ||
import org.apache.calcite.tools.RelBuilder; | ||
import org.apache.calcite.tools.RelBuilderFactory; | ||
import org.immutables.value.Value; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
/** | ||
* Rule to convert RIGHT JOIN to LEFT JOIN since Multi-Way Join operator works only with LEFT joins. | ||
*/ | ||
@Value.Enclosing | ||
public class FlinkRightJoinToLeftJoinRule extends RelRule<FlinkRightJoinToLeftJoinRule.Config> | ||
implements TransformationRule { | ||
|
||
public static final FlinkRightJoinToLeftJoinRule INSTANCE = | ||
FlinkRightJoinToLeftJoinRule.Config.DEFAULT.toRule(); | ||
|
||
/** Creates a FlinkRightJoinToLeftJoinRule. */ | ||
public FlinkRightJoinToLeftJoinRule(FlinkRightJoinToLeftJoinRule.Config config) { | ||
super(config); | ||
} | ||
|
||
@Deprecated // to be removed before 2.0 | ||
public FlinkRightJoinToLeftJoinRule(Class<? extends Join> clazz) { | ||
this(FlinkRightJoinToLeftJoinRule.Config.DEFAULT.withOperandFor(clazz)); | ||
} | ||
|
||
@Deprecated // to be removed before 2.0 | ||
public FlinkRightJoinToLeftJoinRule( | ||
Class<? extends Join> joinClass, RelBuilderFactory relBuilderFactory) { | ||
this( | ||
FlinkRightJoinToLeftJoinRule.Config.DEFAULT | ||
.withRelBuilderFactory(relBuilderFactory) | ||
.as(FlinkRightJoinToLeftJoinRule.Config.class) | ||
.withOperandFor(joinClass)); | ||
} | ||
|
||
@Override | ||
public boolean matches(RelOptRuleCall call) { | ||
Join origJoin = call.rel(0); | ||
return origJoin.getJoinType() == JoinRelType.RIGHT; | ||
} | ||
|
||
@Override | ||
public void onMatch(RelOptRuleCall call) { | ||
Join origJoin = call.rel(0); | ||
RelNode left = call.rel(1); | ||
dawidwys marked this conversation as resolved.
Show resolved
Hide resolved
|
||
RelNode right = call.rel(2); | ||
|
||
RexNode newCondition = shiftCondition(origJoin, left, right); | ||
|
||
Join leftJoin = | ||
origJoin.copy( | ||
origJoin.getTraitSet(), newCondition, right, left, JoinRelType.LEFT, false); | ||
|
||
RelBuilder relBuilder = call.builder(); | ||
relBuilder.push(leftJoin); | ||
|
||
RelNode project = reorderProjectedFields(left, right, relBuilder); | ||
|
||
call.transformTo(project); | ||
} | ||
|
||
private RelNode reorderProjectedFields(RelNode left, RelNode right, RelBuilder relBuilder) { | ||
int nFieldsOnLeft = left.getRowType().getFieldList().size(); | ||
int nFieldsOnRight = right.getRowType().getFieldList().size(); | ||
List<RexNode> reorderedFields = new ArrayList<>(); | ||
|
||
for (int i = 0; i < nFieldsOnLeft; i++) { | ||
reorderedFields.add(relBuilder.field(i + nFieldsOnRight)); | ||
} | ||
|
||
for (int i = 0; i < nFieldsOnRight; i++) { | ||
reorderedFields.add(relBuilder.field(i)); | ||
} | ||
|
||
return relBuilder.project(reorderedFields).build(); | ||
} | ||
|
||
private RexNode shiftCondition(Join joinRel, RelNode left, RelNode right) { | ||
RexNode condition = joinRel.getCondition(); | ||
int nFieldsOnLeft = left.getRowType().getFieldList().size(); | ||
int nFieldsOnRight = right.getRowType().getFieldList().size(); | ||
int[] adjustments = new int[nFieldsOnLeft + nFieldsOnRight]; | ||
for (int i = 0; i < nFieldsOnLeft + nFieldsOnRight; i++) { | ||
adjustments[i] = i < nFieldsOnLeft ? nFieldsOnRight : -nFieldsOnLeft; | ||
} | ||
|
||
return condition.accept( | ||
new RelOptUtil.RexInputConverter( | ||
joinRel.getCluster().getRexBuilder(), | ||
joinRel.getRowType().getFieldList(), | ||
adjustments)); | ||
} | ||
|
||
/** Rule configuration. */ | ||
@Value.Immutable(singleton = false) | ||
public interface Config extends RelRule.Config { | ||
FlinkRightJoinToLeftJoinRule.Config DEFAULT = | ||
ImmutableFlinkRightJoinToLeftJoinRule.Config.builder() | ||
.build() | ||
.as(FlinkRightJoinToLeftJoinRule.Config.class) | ||
.withOperandFor(LogicalJoin.class); | ||
|
||
@Override | ||
default FlinkRightJoinToLeftJoinRule toRule() { | ||
return new FlinkRightJoinToLeftJoinRule(this); | ||
} | ||
|
||
/** Defines an operand tree for the given classes. */ | ||
default FlinkRightJoinToLeftJoinRule.Config withOperandFor( | ||
Class<? extends Join> joinClass) { | ||
return withOperandSupplier( | ||
b0 -> | ||
b0.operand(joinClass) | ||
.inputs( | ||
b1 -> b1.operand(RelNode.class).anyInputs(), | ||
b2 -> b2.operand(RelNode.class).anyInputs())) | ||
.as(FlinkRightJoinToLeftJoinRule.Config.class); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
<?xml version="1.0" ?> | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one or more | ||
contributor license agreements. See the NOTICE file distributed with | ||
this work for additional information regarding copyright ownership. | ||
The ASF licenses this file to you under the Apache License, Version 2.0 | ||
(the "License"); you may not use this file except in compliance with | ||
the License. You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--> | ||
<Root> | ||
<TestCase name="testRightJoinIsConverted"> | ||
<Resource name="sql"> | ||
<![CDATA[SELECT * FROM T1 RIGHT JOIN T2 ON a = c]]> | ||
</Resource> | ||
<Resource name="ast"> | ||
<![CDATA[ | ||
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) | ||
+- LogicalJoin(condition=[=($0, $2)], joinType=[right]) | ||
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
]]> | ||
</Resource> | ||
<Resource name="optimized rel plan"> | ||
<![CDATA[ | ||
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) | ||
+- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2]) | ||
+- LogicalJoin(condition=[=($3, $0)], joinType=[left]) | ||
:- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
]]> | ||
</Resource> | ||
</TestCase> | ||
|
||
<TestCase name="testJoinChainIsConverted"> | ||
<Resource name="sql"> | ||
<![CDATA[SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f]]> | ||
</Resource> | ||
<Resource name="ast"> | ||
<![CDATA[ | ||
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6]) | ||
+- LogicalJoin(condition=[=($0, $5)], joinType=[right]) | ||
:- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) | ||
: +- LogicalJoin(condition=[=($0, $2)], joinType=[inner]) | ||
: :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
: +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T3]]) | ||
]]> | ||
</Resource> | ||
<Resource name="optimized rel plan"> | ||
<![CDATA[ | ||
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6]) | ||
+- LogicalProject(a=[$2], b=[$3], c=[$4], d=[$5], e=[$6], f=[$0], g=[$1]) | ||
+- LogicalJoin(condition=[=($2, $0)], joinType=[left]) | ||
:- LogicalTableScan(table=[[default_catalog, default_database, T3]]) | ||
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) | ||
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner]) | ||
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
]]> | ||
</Resource> | ||
</TestCase> | ||
|
||
<TestCase name="testLeftJoinRemainsUnchanged"> | ||
<Resource name="sql"> | ||
<![CDATA[SELECT * FROM T1 LEFT JOIN T2 ON a = c]]> | ||
</Resource> | ||
<Resource name="ast"> | ||
<![CDATA[ | ||
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) | ||
+- LogicalJoin(condition=[=($0, $2)], joinType=[left]) | ||
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
]]> | ||
</Resource> | ||
<Resource name="optimized rel plan"> | ||
<![CDATA[ | ||
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) | ||
+- LogicalJoin(condition=[=($0, $2)], joinType=[left]) | ||
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
]]> | ||
</Resource> | ||
</TestCase> | ||
|
||
<TestCase name="testRightJoinWithExpressionCondition"> | ||
<Resource name="sql"> | ||
<![CDATA[SELECT * FROM T1 RIGHT JOIN T2 ON a + 1 = c - 1]]> | ||
</Resource> | ||
<Resource name="ast"> | ||
<![CDATA[ | ||
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) | ||
+- LogicalProject(a=[$0], b=[$1], c=[$3], d=[$4], e=[$5]) | ||
+- LogicalJoin(condition=[=($2, $6)], joinType=[right]) | ||
:- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)]) | ||
: +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
+- LogicalProject(c=[$0], d=[$1], e=[$2], $f3=[-($0, 1)]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
]]> | ||
</Resource> | ||
<Resource name="optimized rel plan"> | ||
<![CDATA[ | ||
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) | ||
+- LogicalProject(a=[$0], b=[$1], c=[$3], d=[$4], e=[$5]) | ||
+- LogicalProject(a=[$4], b=[$5], $f2=[$6], c=[$0], d=[$1], e=[$2], $f3=[$3]) | ||
+- LogicalJoin(condition=[=($6, $3)], joinType=[left]) | ||
:- LogicalProject(c=[$0], d=[$1], e=[$2], $f3=[-($0, 1)]) | ||
: +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
+- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
]]> | ||
</Resource> | ||
</TestCase> | ||
|
||
<TestCase name="testRightJoinWithProjection"> | ||
<Resource name="sql"> | ||
<![CDATA[SELECT b, d FROM T1 RIGHT JOIN T2 ON a = c]]> | ||
</Resource> | ||
<Resource name="ast"> | ||
<![CDATA[ | ||
LogicalProject(b=[$1], d=[$3]) | ||
+- LogicalJoin(condition=[=($0, $2)], joinType=[right]) | ||
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
]]> | ||
</Resource> | ||
<Resource name="optimized rel plan"> | ||
<![CDATA[ | ||
LogicalProject(b=[$1], d=[$3]) | ||
+- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are indexes here for the same key (d=[$1] and d=[$3]) different? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we want to actually reorder the projected fields since the user expects the query to be returned as they wrote it? What we actually want is to make sure the indexes for the projected fields are correct There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we call transformTo on RelOptRuleCall calcite verifies that output record before transformation is the same as after applying. Without projection it will look like this:
Since I swap left and right input output record field order is changed. So I had to add this projection to return field order back to normal.
Say we have T1 with field a, T2 with field b and T3 with field c I think I could create a child class from calcite's Join, something like ConvertedJoin, to override getRowType. But I decided to avoid it. Let me know what do you think! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That makes sense! Thanks for the reply :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me know when you address the other points. If you have a draft PR for the other ticket, send it my way and I'll try to work this weekend. I have the PR for the exec node open and might have to slightly change the joinAttributeMap coming from the logical node #26647 (comment) |
||
+- LogicalJoin(condition=[=($3, $0)], joinType=[left]) | ||
:- LogicalTableScan(table=[[default_catalog, default_database, T2]]) | ||
+- LogicalTableScan(table=[[default_catalog, default_database, T1]]) | ||
]]> | ||
</Resource> | ||
</TestCase> | ||
</Root> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please write the test in .java? We're trying to move away from scala and implement new things in java |
||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.flink.table.planner.plan.rules.logical | ||
|
||
import org.apache.flink.table.api._ | ||
import org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE} | ||
import org.apache.flink.table.planner.utils.{TableConfigUtils, TableTestBase} | ||
|
||
import org.apache.calcite.plan.hep.HepMatchOrder | ||
import org.apache.calcite.tools.RuleSets | ||
import org.junit.jupiter.api.{BeforeEach, Test} | ||
|
||
/** Tests for [[FlinkRightJoinToLeftJoinRule]]. */ | ||
class FlinkRightJoinToLeftJoinRuleTest extends TableTestBase { | ||
private val util = batchTestUtil() | ||
|
||
@BeforeEach | ||
def setup(): Unit = { | ||
util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we use streaming for the setup? |
||
val calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv.getConfig) | ||
calciteConfig.getBatchProgram.get.addLast( | ||
"right-to-left-rules", | ||
FlinkHepRuleSetProgramBuilder.newBuilder | ||
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) | ||
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP) | ||
.add(RuleSets.ofList(FlinkRightJoinToLeftJoinRule.INSTANCE)) | ||
.build() | ||
) | ||
|
||
util.addTableSource[(Int, Long)]("T1", 'a, 'b) | ||
util.addTableSource[(Int, Long, Long)]("T2", 'c, 'd, 'e) | ||
util.addTableSource[(Int, Long)]("T3", 'f, 'g) | ||
} | ||
|
||
@Test | ||
def testRightJoinIsConverted(): Unit = { | ||
val sqlQuery = "SELECT * FROM T1 RIGHT JOIN T2 ON a = c" | ||
util.verifyRelPlan(sqlQuery) | ||
} | ||
|
||
@Test | ||
def testJoinChainIsConverted(): Unit = { | ||
val sqlQuery = "SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice test |
||
util.verifyRelPlan(sqlQuery) | ||
} | ||
|
||
@Test | ||
def testLeftJoinRemainsUnchanged(): Unit = { | ||
val sqlQuery = "SELECT * FROM T1 LEFT JOIN T2 ON a = c" | ||
util.verifyRelPlan(sqlQuery) | ||
} | ||
|
||
@Test | ||
def testRightJoinWithExpressionCondition(): Unit = { | ||
val sqlQuery = "SELECT * FROM T1 RIGHT JOIN T2 ON a + 1 = c - 1" | ||
util.verifyRelPlan(sqlQuery) | ||
} | ||
|
||
@Test | ||
def testRightJoinWithProjection(): Unit = { | ||
val sqlQuery = "SELECT b, d FROM T1 RIGHT JOIN T2 ON a = c" | ||
util.verifyRelPlan(sqlQuery) | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please add one test for multiple chained right joins? T1 RIGHT JOIN T2 RIGHT JOIN T3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume we are copying code that has deprecated methods that should have been removed for version 2.
Do we need to add these deprecated methods?
At the least we should remove the comment as 2.0 is now out - so anything that was not removed will not be until another version change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the remark! I removed these methods because in this case they are not really needed.