Skip to content

[FLINK-37814][table-planner] Add FlinkRightJoinToLeftJoinRule #26627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 9, 2025

Conversation

SteveStevenpoor
Copy link
Contributor

This PR is part of multi-way join implementation. See FLIP-516

What is the purpose of the change

StreamingMultiJoinOperator introduced in this PR supports only LEFT JOINs. Since in multi-way join left and right joins are practically the same we need a rule to change right join to the left ones before transforming join chain to single multijoin node.

Brief change log

  • Added FlinkRightJoinToLeftJoinRule

Verifying this change

  • Added simple test for rel plan verification after applying the rule

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@SteveStevenpoor
Copy link
Contributor Author

@gustavodemorais Hey! Please, take a look at this. Also I'm waiting for the ticket with join to multi join rule since i'm also almost done with it.

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 3, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

super(config);
}

@Deprecated // to be removed before 2.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we are copying code that has deprecated methods that should have been removed for version 2.

Do we need to add these deprecated methods?

At the least we should remove the comment as 2.0 is now out - so anything that was not removed will not be until another version change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the remark! I removed these methods because in this case they are not really needed.

@@ -0,0 +1,79 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please write the test in .java? We're trying to move away from scala and implement new things in java


@BeforeEach
def setup(): Unit = {
util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use streaming for the setup?


@Test
def testJoinChainIsConverted(): Unit = {
val sqlQuery = "SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice test

val sqlQuery = "SELECT b, d FROM T1 RIGHT JOIN T2 ON a = c"
util.verifyRelPlan(sqlQuery)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add one test for multiple chained right joins? T1 RIGHT JOIN T2 RIGHT JOIN T3

<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(b=[$1], d=[$3])
+- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are indexes here for the same key (d=[$1] and d=[$3]) different?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to actually reorder the projected fields since the user expects the query to be returned as they wrote it? What we actually want is to make sure the indexes for the projected fields are correct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we call transformTo on RelOptRuleCall calcite verifies that output record before transformation is the same as after applying. Without projection it will look like this:

Type mismatch:
rowtype of original rel: RecordType(INTEGER a, BIGINT b, INTEGER c, BIGINT d, BIGINT e) NOT NULL
rowtype of new rel: RecordType(INTEGER c, BIGINT d, BIGINT e, INTEGER a, BIGINT b) NOT NULL
Difference:
c: INTEGER -> BIGINT
d: BIGINT -> INTEGER

Since I swap left and right input output record field order is changed. So I had to add this projection to return field order back to normal.

RelNode project = reorderProjectedFields(left, right, relBuilder);

Say we have T1 with field a, T2 with field b and T3 with field c
(T1 join T2) right join T3 without multijoin will return abc
but with multijoin it will be converted to T3 left join (T1 join T2) ===> multijoin(T3, T1, T2) which will return cab.
So the projection is necessary. And I can handle it in JoinToMultiJoinRule.

I think I could create a child class from calcite's Join, something like ConvertedJoin, to override getRowType. But I decided to avoid it. Let me know what do you think!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense! Thanks for the reply :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know when you address the other points. If you have a draft PR for the other ticket, send it my way and I'll try to work this weekend. I have the PR for the exec node open and might have to slightly change the joinAttributeMap coming from the logical node #26647 (comment)

@gustavodemorais
Copy link
Contributor

@gustavodemorais Hey! Please, take a look at this. Also I'm waiting for the ticket with join to multi join rule since i'm also almost done with it.

Here's the other ticket https://issues.apache.org/jira/browse/FLINK-37889

@SteveStevenpoor
Copy link
Contributor Author

Hey, @gustavodemorais! I added some tests, made it java instead of scala and changed batch program to stream program.

Copy link
Contributor

@gustavodemorais gustavodemorais left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the change and the contribution, @SteveStevenpoor! Two last nits and apart from that it looks good to me 👍

@@ -0,0 +1,139 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that this is .java, please move it to the java test path instead of scala. Smth like flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkRightJoinToLeftJoinRuleTest.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank's, I've missed it! Now it's in java tests path.

@gustavodemorais
Copy link
Contributor

The PR looks good to me. Let's get a committer to review it and we should be good to go

@dawidwys dawidwys merged commit 58d3cb4 into apache:master Jun 9, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants