Skip to content

[FLINK-37814][table-planner] Add FlinkRightJoinToLeftJoinRule #26627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 9, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.rules.TransformationRule;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
import org.immutables.value.Value;

import java.util.ArrayList;
import java.util.List;

/**
* Rule to convert RIGHT JOIN to LEFT JOIN since Multi-Way Join operator works only with LEFT joins.
*/
@Value.Enclosing
public class FlinkRightJoinToLeftJoinRule extends RelRule<FlinkRightJoinToLeftJoinRule.Config>
implements TransformationRule {

public static final FlinkRightJoinToLeftJoinRule INSTANCE =
FlinkRightJoinToLeftJoinRule.Config.DEFAULT.toRule();

/** Creates a FlinkRightJoinToLeftJoinRule. */
public FlinkRightJoinToLeftJoinRule(FlinkRightJoinToLeftJoinRule.Config config) {
super(config);
}

@Deprecated // to be removed before 2.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we are copying code that has deprecated methods that should have been removed for version 2.

Do we need to add these deprecated methods?

At the least we should remove the comment as 2.0 is now out - so anything that was not removed will not be until another version change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the remark! I removed these methods because in this case they are not really needed.

public FlinkRightJoinToLeftJoinRule(Class<? extends Join> clazz) {
this(FlinkRightJoinToLeftJoinRule.Config.DEFAULT.withOperandFor(clazz));
}

@Deprecated // to be removed before 2.0
public FlinkRightJoinToLeftJoinRule(
Class<? extends Join> joinClass, RelBuilderFactory relBuilderFactory) {
this(
FlinkRightJoinToLeftJoinRule.Config.DEFAULT
.withRelBuilderFactory(relBuilderFactory)
.as(FlinkRightJoinToLeftJoinRule.Config.class)
.withOperandFor(joinClass));
}

@Override
public boolean matches(RelOptRuleCall call) {
Join origJoin = call.rel(0);
return origJoin.getJoinType() == JoinRelType.RIGHT;
}

@Override
public void onMatch(RelOptRuleCall call) {
Join origJoin = call.rel(0);
RelNode left = call.rel(1);
RelNode right = call.rel(2);

RexNode newCondition = shiftCondition(origJoin, left, right);

Join leftJoin =
origJoin.copy(
origJoin.getTraitSet(), newCondition, right, left, JoinRelType.LEFT, false);

RelBuilder relBuilder = call.builder();
relBuilder.push(leftJoin);

RelNode project = reorderProjectedFields(left, right, relBuilder);

call.transformTo(project);
}

private RelNode reorderProjectedFields(RelNode left, RelNode right, RelBuilder relBuilder) {
int nFieldsOnLeft = left.getRowType().getFieldList().size();
int nFieldsOnRight = right.getRowType().getFieldList().size();
List<RexNode> reorderedFields = new ArrayList<>();

for (int i = 0; i < nFieldsOnLeft; i++) {
reorderedFields.add(relBuilder.field(i + nFieldsOnRight));
}

for (int i = 0; i < nFieldsOnRight; i++) {
reorderedFields.add(relBuilder.field(i));
}

return relBuilder.project(reorderedFields).build();
}

private RexNode shiftCondition(Join joinRel, RelNode left, RelNode right) {
RexNode condition = joinRel.getCondition();
int nFieldsOnLeft = left.getRowType().getFieldList().size();
int nFieldsOnRight = right.getRowType().getFieldList().size();
int[] adjustments = new int[nFieldsOnLeft + nFieldsOnRight];
for (int i = 0; i < nFieldsOnLeft + nFieldsOnRight; i++) {
adjustments[i] = i < nFieldsOnLeft ? nFieldsOnRight : -nFieldsOnLeft;
}

return condition.accept(
new RelOptUtil.RexInputConverter(
joinRel.getCluster().getRexBuilder(),
joinRel.getRowType().getFieldList(),
adjustments));
}

/** Rule configuration. */
@Value.Immutable(singleton = false)
public interface Config extends RelRule.Config {
FlinkRightJoinToLeftJoinRule.Config DEFAULT =
ImmutableFlinkRightJoinToLeftJoinRule.Config.builder()
.build()
.as(FlinkRightJoinToLeftJoinRule.Config.class)
.withOperandFor(LogicalJoin.class);

@Override
default FlinkRightJoinToLeftJoinRule toRule() {
return new FlinkRightJoinToLeftJoinRule(this);
}

/** Defines an operand tree for the given classes. */
default FlinkRightJoinToLeftJoinRule.Config withOperandFor(
Class<? extends Join> joinClass) {
return withOperandSupplier(
b0 ->
b0.operand(joinClass)
.inputs(
b1 -> b1.operand(RelNode.class).anyInputs(),
b2 -> b2.operand(RelNode.class).anyInputs()))
.as(FlinkRightJoinToLeftJoinRule.Config.class);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
<TestCase name="testRightJoinIsConverted">
<Resource name="sql">
<![CDATA[SELECT * FROM T1 RIGHT JOIN T2 ON a = c]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
+- LogicalJoin(condition=[=($0, $2)], joinType=[right])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
+- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2])
+- LogicalJoin(condition=[=($3, $0)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
</TestCase>

<TestCase name="testJoinChainIsConverted">
<Resource name="sql">
<![CDATA[SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6])
+- LogicalJoin(condition=[=($0, $5)], joinType=[right])
:- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
: +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
: :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+- LogicalTableScan(table=[[default_catalog, default_database, T3]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6])
+- LogicalProject(a=[$2], b=[$3], c=[$4], d=[$5], e=[$6], f=[$0], g=[$1])
+- LogicalJoin(condition=[=($2, $0)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, T3]])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
</TestCase>

<TestCase name="testLeftJoinRemainsUnchanged">
<Resource name="sql">
<![CDATA[SELECT * FROM T1 LEFT JOIN T2 ON a = c]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
+- LogicalJoin(condition=[=($0, $2)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
+- LogicalJoin(condition=[=($0, $2)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
</TestCase>

<TestCase name="testRightJoinWithExpressionCondition">
<Resource name="sql">
<![CDATA[SELECT * FROM T1 RIGHT JOIN T2 ON a + 1 = c - 1]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
+- LogicalProject(a=[$0], b=[$1], c=[$3], d=[$4], e=[$5])
+- LogicalJoin(condition=[=($2, $6)], joinType=[right])
:- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalProject(c=[$0], d=[$1], e=[$2], $f3=[-($0, 1)])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
+- LogicalProject(a=[$0], b=[$1], c=[$3], d=[$4], e=[$5])
+- LogicalProject(a=[$4], b=[$5], $f2=[$6], c=[$0], d=[$1], e=[$2], $f3=[$3])
+- LogicalJoin(condition=[=($6, $3)], joinType=[left])
:- LogicalProject(c=[$0], d=[$1], e=[$2], $f3=[-($0, 1)])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
</TestCase>

<TestCase name="testRightJoinWithProjection">
<Resource name="sql">
<![CDATA[SELECT b, d FROM T1 RIGHT JOIN T2 ON a = c]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(b=[$1], d=[$3])
+- LogicalJoin(condition=[=($0, $2)], joinType=[right])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(b=[$1], d=[$3])
+- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are indexes here for the same key (d=[$1] and d=[$3]) different?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to actually reorder the projected fields since the user expects the query to be returned as they wrote it? What we actually want is to make sure the indexes for the projected fields are correct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we call transformTo on RelOptRuleCall calcite verifies that output record before transformation is the same as after applying. Without projection it will look like this:

Type mismatch:
rowtype of original rel: RecordType(INTEGER a, BIGINT b, INTEGER c, BIGINT d, BIGINT e) NOT NULL
rowtype of new rel: RecordType(INTEGER c, BIGINT d, BIGINT e, INTEGER a, BIGINT b) NOT NULL
Difference:
c: INTEGER -> BIGINT
d: BIGINT -> INTEGER

Since I swap left and right input output record field order is changed. So I had to add this projection to return field order back to normal.

RelNode project = reorderProjectedFields(left, right, relBuilder);

Say we have T1 with field a, T2 with field b and T3 with field c
(T1 join T2) right join T3 without multijoin will return abc
but with multijoin it will be converted to T3 left join (T1 join T2) ===> multijoin(T3, T1, T2) which will return cab.
So the projection is necessary. And I can handle it in JoinToMultiJoinRule.

I think I could create a child class from calcite's Join, something like ConvertedJoin, to override getRowType. But I decided to avoid it. Let me know what do you think!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense! Thanks for the reply :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know when you address the other points. If you have a draft PR for the other ticket, send it my way and I'll try to work this weekend. I have the PR for the exec node open and might have to slightly change the joinAttributeMap coming from the logical node #26647 (comment)

+- LogicalJoin(condition=[=($3, $0)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
</TestCase>
</Root>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please write the test in .java? We're trying to move away from scala and implement new things in java

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.rules.logical

import org.apache.flink.table.api._
import org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
import org.apache.flink.table.planner.utils.{TableConfigUtils, TableTestBase}

import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.tools.RuleSets
import org.junit.jupiter.api.{BeforeEach, Test}

/** Tests for [[FlinkRightJoinToLeftJoinRule]]. */
class FlinkRightJoinToLeftJoinRuleTest extends TableTestBase {
private val util = batchTestUtil()

@BeforeEach
def setup(): Unit = {
util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use streaming for the setup?

val calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv.getConfig)
calciteConfig.getBatchProgram.get.addLast(
"right-to-left-rules",
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(RuleSets.ofList(FlinkRightJoinToLeftJoinRule.INSTANCE))
.build()
)

util.addTableSource[(Int, Long)]("T1", 'a, 'b)
util.addTableSource[(Int, Long, Long)]("T2", 'c, 'd, 'e)
util.addTableSource[(Int, Long)]("T3", 'f, 'g)
}

@Test
def testRightJoinIsConverted(): Unit = {
val sqlQuery = "SELECT * FROM T1 RIGHT JOIN T2 ON a = c"
util.verifyRelPlan(sqlQuery)
}

@Test
def testJoinChainIsConverted(): Unit = {
val sqlQuery = "SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice test

util.verifyRelPlan(sqlQuery)
}

@Test
def testLeftJoinRemainsUnchanged(): Unit = {
val sqlQuery = "SELECT * FROM T1 LEFT JOIN T2 ON a = c"
util.verifyRelPlan(sqlQuery)
}

@Test
def testRightJoinWithExpressionCondition(): Unit = {
val sqlQuery = "SELECT * FROM T1 RIGHT JOIN T2 ON a + 1 = c - 1"
util.verifyRelPlan(sqlQuery)
}

@Test
def testRightJoinWithProjection(): Unit = {
val sqlQuery = "SELECT b, d FROM T1 RIGHT JOIN T2 ON a = c"
util.verifyRelPlan(sqlQuery)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add one test for multiple chained right joins? T1 RIGHT JOIN T2 RIGHT JOIN T3