[FLINK-37814][table-planner] Add FlinkRightJoinToLeftJoinRule #26627
@gustavodemorais Hey! Please take a look at this. I'm also waiting for the ticket for the join-to-multi-join rule, since I'm almost done with that as well.
… need of multi-way join operator
09738f8 to d74d786
    super(config);
}

@Deprecated // to be removed before 2.0
I assume we are copying code whose deprecated methods should have been removed for version 2.0.
Do we need to add these deprecated methods?
At the very least, we should remove the comment: 2.0 is already out, so anything that was not removed will stay until another version change.
Thank you for the remark! I removed these methods because in this case they are not really needed.
@@ -0,0 +1,79 @@
/*
Can you please write the test in Java (.java)? We're trying to move away from Scala and implement new things in Java.
@BeforeEach
def setup(): Unit = {
  util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
Could we use streaming for the setup?
@Test
def testJoinChainIsConverted(): Unit = {
  val sqlQuery = "SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f"
Nice test
    val sqlQuery = "SELECT b, d FROM T1 RIGHT JOIN T2 ON a = c"
    util.verifyRelPlan(sqlQuery)
  }
}
Can you please add one test for multiple chained right joins? T1 RIGHT JOIN T2 RIGHT JOIN T3
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(b=[$1], d=[$3])
+- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2])
Why are indexes here for the same key (d=[$1] and d=[$3]) different?
I don't think we actually want to reorder the projected fields, since the user expects the query result to come back in the order they wrote it. What we actually want is to make sure the indexes for the projected fields are correct.
When we call transformTo on RelOptRuleCall, Calcite verifies that the output row type after the transformation is the same as before it. Without the projection, the check fails like this:
Type mismatch:
rowtype of original rel: RecordType(INTEGER a, BIGINT b, INTEGER c, BIGINT d, BIGINT e) NOT NULL
rowtype of new rel: RecordType(INTEGER c, BIGINT d, BIGINT e, INTEGER a, BIGINT b) NOT NULL
Difference:
c: INTEGER -> BIGINT
d: BIGINT -> INTEGER
Since I swap the left and right inputs, the field order of the output record changes. So I had to add this projection to restore the original field order.
RelNode project = reorderProjectedFields(left, right, relBuilder);
Say we have T1 with field a, T2 with field b, and T3 with field c.
(T1 join T2) right join T3
returns abc without multijoin,
but with multijoin it is converted to T3 left join (T1 join T2) ===> multijoin(T3, T1, T2),
which returns cab.
So the projection is necessary, and I can handle it in JoinToMultiJoinRule.
I think I could create a child class of Calcite's Join, something like ConvertedJoin, to override getRowType. But I decided to avoid it. Let me know what you think!
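To make the index mapping concrete, here is a minimal sketch of how such a restoring projection could be computed. It is not the code from this PR: the class `SwapProjectionSketch` and its methods are hypothetical and work on plain field names rather than Calcite row types. It only assumes that the swapped join outputs the original right fields first, followed by the original left fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink or Calcite.
final class SwapProjectionSketch {

    /**
     * For each field of the original row type (left fields, then right fields),
     * returns its index in the swapped row type (right fields, then left fields).
     */
    static int[] restoreOrder(int leftFieldCount, int rightFieldCount) {
        int[] projection = new int[leftFieldCount + rightFieldCount];
        // Original left fields now sit after all right fields.
        for (int i = 0; i < leftFieldCount; i++) {
            projection[i] = rightFieldCount + i;
        }
        // Original right fields now sit at the front.
        for (int i = 0; i < rightFieldCount; i++) {
            projection[leftFieldCount + i] = i;
        }
        return projection;
    }

    /** Applies the projection to the field list of the swapped join. */
    static List<String> project(List<String> swappedFields, int[] projection) {
        List<String> result = new ArrayList<>();
        for (int index : projection) {
            result.add(swappedFields.get(index));
        }
        return result;
    }

    public static void main(String[] args) {
        // T1(a, b) RIGHT JOIN T2(c, d, e) becomes T2 LEFT JOIN T1 with fields (c, d, e, a, b).
        List<String> swapped = Arrays.asList("c", "d", "e", "a", "b");
        int[] projection = restoreOrder(2, 3);
        System.out.println(Arrays.toString(projection)); // prints [3, 4, 0, 1, 2]
        System.out.println(project(swapped, projection)); // prints [a, b, c, d, e]
    }
}
```

The computed indexes match the plan above: `a=[$3], b=[$4], c=[$0], d=[$1], e=[$2]`. The outer `d=[$3]` then refers to the restored order (a, b, c, d, e), while the inner `d=[$1]` refers to the swapped join output.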
That makes sense! Thanks for the reply :)
Let me know when you address the other points. If you have a draft PR for the other ticket, send it my way and I'll try to work this weekend. I have the PR for the exec node open and might have to slightly change the joinAttributeMap coming from the logical node #26647 (comment)
Here's the other ticket: https://issues.apache.org/jira/browse/FLINK-37889
Hey, @gustavodemorais! I added some tests, rewrote the test in Java instead of Scala, and changed the batch program to a stream program.
Thanks for the change and the contribution, @SteveStevenpoor! Two last nits and apart from that it looks good to me 👍
...ain/java/org/apache/flink/table/planner/plan/rules/logical/FlinkRightJoinToLeftJoinRule.java
@@ -0,0 +1,139 @@
/*
Now that this is a .java file, please move it to the Java test path instead of the Scala one. Something like flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkRightJoinToLeftJoinRuleTest.java
Thanks, I missed that! It's now in the Java test path.
The PR looks good to me. Let's get a committer to review it and we should be good to go.
This PR is part of the multi-way join implementation. See FLIP-516.
What is the purpose of the change
The StreamingMultiJoinOperator introduced in this PR supports only LEFT JOINs. Since left and right joins are practically the same in a multi-way join, we need a rule that converts right joins into left joins before the join chain is transformed into a single multi-join node.
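The equivalence the rule relies on can be illustrated with a small in-memory sketch. It is not the planner rule itself: the class `RightToLeftJoinSketch` and all of its methods are hypothetical, rows are plain lists, and the join is a naive nested loop on a single equality key. It shows that `T1 RIGHT JOIN T2` gives the same rows as `T2 LEFT JOIN T1` once the fields are put back in their original order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: plain in-memory lists, not Flink or Calcite classes.
final class RightToLeftJoinSketch {

    static List<Object> row(Object... values) {
        return new ArrayList<>(Arrays.asList(values));
    }

    static List<Object> concat(List<Object> first, List<Object> second) {
        List<Object> result = new ArrayList<>(first);
        result.addAll(second);
        return result;
    }

    /** Nested-loop LEFT JOIN on left[leftKey] = right[rightKey]; unmatched left rows get nulls. */
    static List<List<Object>> leftJoin(
            List<List<Object>> left, List<List<Object>> right,
            int leftKey, int rightKey, int rightWidth) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> l : left) {
            boolean matched = false;
            for (List<Object> r : right) {
                Object key = l.get(leftKey);
                if (key != null && key.equals(r.get(rightKey))) {
                    out.add(concat(l, r));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(concat(l, Collections.<Object>nCopies(rightWidth, null)));
            }
        }
        return out;
    }

    /** "left RIGHT JOIN right" evaluated as "right LEFT JOIN left" plus a field reorder. */
    static List<List<Object>> rightJoinViaLeftJoin(
            List<List<Object>> left, List<List<Object>> right,
            int leftKey, int rightKey, int leftWidth) {
        List<List<Object>> reordered = new ArrayList<>();
        for (List<Object> swapped : leftJoin(right, left, rightKey, leftKey, leftWidth)) {
            int rightWidth = swapped.size() - leftWidth;
            // Swapped rows are (right fields, left fields); restore (left fields, right fields).
            reordered.add(concat(
                    new ArrayList<>(swapped.subList(rightWidth, swapped.size())),
                    new ArrayList<>(swapped.subList(0, rightWidth))));
        }
        return reordered;
    }

    public static void main(String[] args) {
        List<List<Object>> t1 = Arrays.asList(row(1, 10), row(2, 20));   // T1(a, b)
        List<List<Object>> t2 = Arrays.asList(row(2, 200), row(3, 300)); // T2(c, d)
        // T1 RIGHT JOIN T2 ON a = c
        System.out.println(rightJoinViaLeftJoin(t1, t2, 0, 0, 2));
        // prints [[2, 20, 2, 200], [null, null, 3, 300]]
    }
}
```

The unmatched T2 row (3, 300) keeps its values and gets nulls on the T1 side, which is what a RIGHT JOIN would produce directly.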
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation