From cecc8a3e9f28785718cc1c9af1bf84c48ce844b0 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Sun, 22 Dec 2024 10:37:06 -0500 Subject: [PATCH 01/11] support left outer and left mark join for hash join rule Not ideal, wants to unite inner, left-outer, and left-mark into one rule Signed-off-by: Yuchen Liang --- optd-datafusion-bridge/src/from_optd.rs | 2 + optd-datafusion-repr/src/lib.rs | 6 +- optd-datafusion-repr/src/rules/joins.rs | 239 +++++++++++++++++++++++- 3 files changed, 240 insertions(+), 7 deletions(-) diff --git a/optd-datafusion-bridge/src/from_optd.rs b/optd-datafusion-bridge/src/from_optd.rs index 263976bd..fba074d8 100644 --- a/optd-datafusion-bridge/src/from_optd.rs +++ b/optd-datafusion-bridge/src/from_optd.rs @@ -529,6 +529,8 @@ impl OptdPlanContext<'_> { let right_exec = self.conv_from_optd_plan_node(node.right(), meta).await?; let join_type = match node.join_type() { JoinType::Inner => datafusion::logical_expr::JoinType::Inner, + JoinType::LeftOuter => datafusion::logical_expr::JoinType::Left, + JoinType::LeftMark => datafusion::logical_expr::JoinType::LeftMark, _ => unimplemented!(), }; let left_exprs = node.left_keys().to_vec(); diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index d7d17eec..a85bc19c 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -105,7 +105,9 @@ impl DatafusionOptimizer { rule_wrappers.push(Arc::new(rules::FilterInnerJoinTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterSortTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterAggTransposeRule::new())); - rule_wrappers.push(Arc::new(rules::HashJoinRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinInnerRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinLeftOuterRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinLeftMarkRule::new())); rule_wrappers.push(Arc::new(rules::JoinCommuteRule::new())); rule_wrappers.push(Arc::new(rules::JoinAssocRule::new())); rule_wrappers.push(Arc::new(rules::ProjectionPullUpJoin::new())); @@ -177,7 +179,7 @@ impl DatafusionOptimizer { for rule in rules { rule_wrappers.push(rule); } - rule_wrappers.push(Arc::new(rules::HashJoinRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinInnerRule::new())); rule_wrappers.insert(0, Arc::new(rules::JoinCommuteRule::new())); rule_wrappers.insert(1, Arc::new(rules::JoinAssocRule::new())); rule_wrappers.insert(2, Arc::new(rules::ProjectionPullUpJoin::new())); diff --git a/optd-datafusion-repr/src/rules/joins.rs b/optd-datafusion-repr/src/rules/joins.rs index ebc804ec..a041677c 100644 --- a/optd-datafusion-repr/src/rules/joins.rs +++ b/optd-datafusion-repr/src/rules/joins.rs @@ -141,12 +141,12 @@ fn apply_join_assoc( } define_impl_rule!( - HashJoinRule, - apply_hash_join, + HashJoinInnerRule, + apply_hash_join_inner, (Join(JoinType::Inner), left, right) ); -fn apply_hash_join( +fn apply_hash_join_inner( optimizer: &impl Optimizer, binding: ArcDfPlanNode, ) -> Vec> { @@ -154,6 +154,7 @@ fn apply_hash_join( let cond = join.cond(); let left = join.left(); let right = join.right(); + let join_type = join.join_type(); match cond.typ { DfPredType::BinOp(BinOpType::Eq) => { let left_schema = optimizer.get_schema_of(left.clone()); @@ -186,7 +187,7 @@ fn apply_hash_join( right, ListPred::new(vec![left_expr.into_pred_node()]), ListPred::new(vec![right_expr.into_pred_node()]), - JoinType::Inner, + *join_type, ); return vec![node.into_plan_node().into()]; } @@ -244,7 +245,235 @@ fn apply_hash_join( right, ListPred::new(left_exprs), ListPred::new(right_exprs), - JoinType::Inner, + *join_type, + ); + return vec![node.into_plan_node().into()]; + } + _ => {} + } + vec![] +} + +define_impl_rule!( + HashJoinLeftOuterRule, + apply_hash_join_left_outer, + (Join(JoinType::LeftOuter), left, right) +); + +fn apply_hash_join_left_outer( + optimizer: &impl Optimizer, + binding: ArcDfPlanNode, +) -> Vec> { + let join = LogicalJoin::from_plan_node(binding).unwrap(); + let cond = join.cond(); + let left = join.left(); + let right = join.right(); + let join_type = join.join_type(); + match cond.typ { + DfPredType::BinOp(BinOpType::Eq) => { + let left_schema = optimizer.get_schema_of(left.clone()); + let op = BinOpPred::from_pred_node(cond.clone()).unwrap(); + let left_expr = op.left_child(); + let right_expr = op.right_child(); + let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { + return vec![]; + }; + let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { + return vec![]; + }; + let can_convert = if left_expr.index() < left_schema.len() + && right_expr.index() >= left_schema.len() + { + true + } else if right_expr.index() < left_schema.len() + && left_expr.index() >= left_schema.len() + { + (left_expr, right_expr) = (right_expr, left_expr); + true + } else { + false + }; + + if can_convert { + let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); + let node = PhysicalHashJoin::new_unchecked( + left, + right, + ListPred::new(vec![left_expr.into_pred_node()]), + ListPred::new(vec![right_expr.into_pred_node()]), + *join_type, + ); + return vec![node.into_plan_node().into()]; + } + } + DfPredType::LogOp(LogOpType::And) => { + // currently only support consecutive equal queries + let mut is_consecutive_eq = true; + for child in cond.children.clone() { + if let DfPredType::BinOp(BinOpType::Eq) = child.typ { + continue; + } else { + is_consecutive_eq = false; + break; + } + } + if !is_consecutive_eq { + return vec![]; + } + + let left_schema = optimizer.get_schema_of(left.clone()); + let mut left_exprs = vec![]; + let mut right_exprs = vec![]; + for child in &cond.children { + let bin_op = BinOpPred::from_pred_node(child.clone()).unwrap(); + let left_expr = bin_op.left_child(); + let right_expr = bin_op.right_child(); + let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { + return vec![]; + }; + let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { + return vec![]; + }; + let can_convert = if left_expr.index() < left_schema.len() + && right_expr.index() >= left_schema.len() + { + true + } else if right_expr.index() < left_schema.len() + && left_expr.index() >= left_schema.len() + { + (left_expr, right_expr) = (right_expr, left_expr); + true + } else { + false + }; + if !can_convert { + return vec![]; + } + let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); + right_exprs.push(right_expr.into_pred_node()); + left_exprs.push(left_expr.into_pred_node()); + } + + let node = PhysicalHashJoin::new_unchecked( + left, + right, + ListPred::new(left_exprs), + ListPred::new(right_exprs), + *join_type, + ); + return vec![node.into_plan_node().into()]; + } + _ => {} + } + vec![] +} + +define_impl_rule!( + HashJoinLeftMarkRule, + apply_hash_join_left_mark, + (Join(JoinType::LeftMark), left, right) +); + +fn apply_hash_join_left_mark( + optimizer: &impl Optimizer, + binding: ArcDfPlanNode, +) -> Vec> { + let join = LogicalJoin::from_plan_node(binding).unwrap(); + let cond = join.cond(); + let left = join.left(); + let right = join.right(); + let join_type = join.join_type(); + match cond.typ { + DfPredType::BinOp(BinOpType::Eq) => { + let left_schema = optimizer.get_schema_of(left.clone()); + let op = BinOpPred::from_pred_node(cond.clone()).unwrap(); + let left_expr = op.left_child(); + let right_expr = op.right_child(); + let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { + return vec![]; + }; + let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { + return vec![]; + }; + let can_convert = if left_expr.index() < left_schema.len() + && right_expr.index() >= left_schema.len() + { + true + } else if right_expr.index() < left_schema.len() + && left_expr.index() >= left_schema.len() + { + (left_expr, right_expr) = (right_expr, left_expr); + true + } else { + false + }; + + if can_convert { + let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); + let node = PhysicalHashJoin::new_unchecked( + left, + right, + ListPred::new(vec![left_expr.into_pred_node()]), + ListPred::new(vec![right_expr.into_pred_node()]), + *join_type, + ); + return vec![node.into_plan_node().into()]; + } + } + DfPredType::LogOp(LogOpType::And) => { + // currently only support consecutive equal queries + let mut is_consecutive_eq = true; + for child in cond.children.clone() { + if let DfPredType::BinOp(BinOpType::Eq) = child.typ { + continue; + } else { + is_consecutive_eq = false; + break; + } + } + if !is_consecutive_eq { + return vec![]; + } + + let left_schema = optimizer.get_schema_of(left.clone()); + let mut left_exprs = vec![]; + let mut right_exprs = vec![]; + for child in &cond.children { + let bin_op = BinOpPred::from_pred_node(child.clone()).unwrap(); + let left_expr = bin_op.left_child(); + let right_expr = bin_op.right_child(); + let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { + return vec![]; + }; + let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { + return vec![]; + }; + let can_convert = if left_expr.index() < left_schema.len() + && right_expr.index() >= left_schema.len() + { + true + } else if right_expr.index() < left_schema.len() + && left_expr.index() >= left_schema.len() + { + (left_expr, right_expr) = (right_expr, left_expr); + true + } else { + false + }; + if !can_convert { + return vec![]; + } + let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); + right_exprs.push(right_expr.into_pred_node()); + left_exprs.push(left_expr.into_pred_node()); + } + + let node = PhysicalHashJoin::new_unchecked( + left, + right, + ListPred::new(left_exprs), + ListPred::new(right_exprs), + *join_type, ); return vec![node.into_plan_node().into()]; } From fa667278bd27b7a0b1a72c590b2a14bfb0fd5184 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Sun, 22 Dec 2024 10:38:52 -0500 Subject: [PATCH 02/11] apply planner test Signed-off-by: Yuchen Liang --- .../subqueries/subquery_unnesting.planner.sql | 207 ++++++++---------- .../tests/tpch/q16.planner.sql | 6 +- .../tests/tpch/q17.planner.sql | 88 ++++---- optd-sqlplannertest/tests/tpch/q2.planner.sql | 7 +- .../tests/tpch/q20.planner.sql | 137 ++++++------ .../tests/tpch/q22.planner.sql | 7 +- optd-sqlplannertest/tests/tpch/q4.planner.sql | 7 +- 7 files changed, 194 insertions(+), 265 deletions(-) diff --git a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql index 5b3be1b7..e2d3ad69 100644 --- a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql +++ b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql @@ -62,36 +62,29 @@ LogicalProjection { exprs: [ #0, #1 ] } ├── LogicalAgg { exprs: [], groups: [ #0 ] } │ └── LogicalScan { table: t1 } └── LogicalScan { table: t2 } -PhysicalProjection { exprs: [ #0, #1 ], cost: {compute=4033080,io=4000}, stat: {row_cnt=10} } -└── PhysicalProjection { exprs: [ #0, #1, #2, #4 ], cost: {compute=4033050,io=4000}, stat: {row_cnt=10} } - └── PhysicalFilter - ├── cond:Gt - │ ├── #4 - │ └── 100(i64) - ├── cost: {compute=4033000,io=4000} - ├── stat: {row_cnt=10} - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=4030000,io=4000}, stat: {row_cnt=1000} } - ├── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalNestedLoopJoin - ├── join_type: LeftOuter - ├── cond:And - │ └── Eq - │ ├── #0 - │ └── #1 - ├── cost: {compute=4018000,io=3000} - ├── stat: {row_cnt=10000} - ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalAgg - ├── aggrs:Agg(Sum) - │ └── [ Cast { cast_to: Int64, child: #2 } ] - ├── groups: [ #0 ] - ├── cost: {compute=14000,io=2000} - ├── stat: {row_cnt=1000} - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=6000,io=2000}, stat: {row_cnt=1000} } - ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } +PhysicalProjection { exprs: [ #0, #1 ], cost: {compute=24100,io=4000}, stat: {row_cnt=10} } +└── PhysicalProjection { exprs: [ #3, #4, #0, #2 ], cost: {compute=24070,io=4000}, stat: {row_cnt=10} } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=24020,io=4000}, stat: {row_cnt=10} } + ├── PhysicalFilter + │ ├── cond:Gt + │ │ ├── #2 + │ │ └── 100(i64) + │ ├── cost: {compute=23000,io=3000} + │ ├── stat: {row_cnt=10} + │ └── PhysicalHashJoin { join_type: LeftOuter, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=20000,io=3000}, stat: {row_cnt=1000} } + │ ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } + │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalAgg + │ ├── aggrs:Agg(Sum) + │ │ └── [ Cast { cast_to: Int64, child: #2 } ] + │ ├── groups: [ #0 ] + │ ├── cost: {compute=14000,io=2000} + │ ├── stat: {row_cnt=1000} + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=6000,io=2000}, stat: {row_cnt=1000} } + │ ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } + │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } */ -- Test whether the optimizer can unnest correlated subqueries with (scalar op group agg) @@ -168,53 +161,39 @@ LogicalProjection { exprs: [ #0, #1 ] } ├── LogicalAgg { exprs: [], groups: [ #0 ] } │ └── LogicalScan { table: t1 } └── LogicalScan { table: t2 } -PhysicalProjection { exprs: [ #0, #1 ], cost: {compute=44228080,io=5000}, stat: {row_cnt=10} } -└── PhysicalProjection { exprs: [ #0, #1, #2, #4 ], cost: {compute=44228050,io=5000}, stat: {row_cnt=10} } - └── PhysicalFilter - ├── cond:Gt - │ ├── #4 - │ └── 100(i64) - ├── cost: {compute=44228000,io=5000} - ├── stat: {row_cnt=10} - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=44225000,io=5000}, stat: {row_cnt=1000} } - ├── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalNestedLoopJoin - ├── join_type: LeftOuter - ├── cond:And - │ └── Eq - │ ├── #0 - │ └── #1 - ├── cost: {compute=44123000,io=4000} - ├── stat: {row_cnt=100000} - ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalAgg - ├── aggrs:Agg(Sum) - │ └── [ #2 ] - ├── groups: [ #0 ] - ├── cost: {compute=4119000,io=3000} - ├── stat: {row_cnt=10000} - └── PhysicalProjection { exprs: [ #0, #2, #3 ], cost: {compute=4059000,io=3000}, stat: {row_cnt=10000} } - └── PhysicalNestedLoopJoin - ├── join_type: LeftOuter - ├── cond:And - │ └── Eq - │ ├── #0 - │ └── #1 - ├── cost: {compute=4019000,io=3000} - ├── stat: {row_cnt=10000} - ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalAgg - ├── aggrs:Agg(Sum) - │ └── [ Cast { cast_to: Int64, child: #2 } ] - ├── groups: [ #0, #1 ] - ├── cost: {compute=15000,io=2000} - ├── stat: {row_cnt=1000} - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=6000,io=2000}, stat: {row_cnt=1000} } - ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } +PhysicalProjection { exprs: [ #0, #1 ], cost: {compute=41100,io=5000}, stat: {row_cnt=10} } +└── PhysicalProjection { exprs: [ #3, #4, #0, #2 ], cost: {compute=41070,io=5000}, stat: {row_cnt=10} } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=41020,io=5000}, stat: {row_cnt=10} } + ├── PhysicalFilter + │ ├── cond:Gt + │ │ ├── #2 + │ │ └── 100(i64) + │ ├── cost: {compute=40000,io=4000} + │ ├── stat: {row_cnt=10} + │ └── PhysicalHashJoin { join_type: LeftOuter, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=37000,io=4000}, stat: {row_cnt=1000} } + │ ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } + │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalAgg + │ ├── aggrs:Agg(Sum) + │ │ └── [ #2 ] + │ ├── groups: [ #0 ] + │ ├── cost: {compute=31000,io=3000} + │ ├── stat: {row_cnt=1000} + │ └── PhysicalProjection { exprs: [ #0, #2, #3 ], cost: {compute=25000,io=3000}, stat: {row_cnt=1000} } + │ └── PhysicalHashJoin { join_type: LeftOuter, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=21000,io=3000}, stat: {row_cnt=1000} } + │ ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } + │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalAgg + │ ├── aggrs:Agg(Sum) + │ │ └── [ Cast { cast_to: Int64, child: #2 } ] + │ ├── groups: [ #0, #1 ] + │ ├── cost: {compute=15000,io=2000} + │ ├── stat: {row_cnt=1000} + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=6000,io=2000}, stat: {row_cnt=1000} } + │ ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } + │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } */ -- Test whether the optimizer can unnest correlated subqueries with scalar agg in select list @@ -264,18 +243,11 @@ LogicalProjection { exprs: [ #0, #2 ] } ├── LogicalAgg { exprs: [], groups: [ #0 ] } │ └── LogicalScan { table: t1 } └── LogicalScan { table: t2 } -PhysicalProjection { exprs: [ #0, #3 ], cost: {compute=4038000,io=4000}, stat: {row_cnt=1000} } -└── PhysicalProjection { exprs: [ #0, #1, #2, #4 ], cost: {compute=4035000,io=4000}, stat: {row_cnt=1000} } - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=4030000,io=4000}, stat: {row_cnt=1000} } - ├── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalNestedLoopJoin - ├── join_type: LeftOuter - ├── cond:And - │ └── Eq - │ ├── #0 - │ └── #1 - ├── cost: {compute=4018000,io=3000} - ├── stat: {row_cnt=10000} +PhysicalProjection { exprs: [ #0, #3 ], cost: {compute=29000,io=4000}, stat: {row_cnt=1000} } +└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=26000,io=4000}, stat: {row_cnt=1000} } + ├── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + └── PhysicalProjection { exprs: [ #0, #2 ], cost: {compute=23000,io=3000}, stat: {row_cnt=1000} } + └── PhysicalHashJoin { join_type: LeftOuter, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=20000,io=3000}, stat: {row_cnt=1000} } ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } └── PhysicalAgg @@ -359,37 +331,30 @@ LogicalProjection { exprs: [ #0, #1 ] } └── LogicalJoin { join_type: Inner, cond: true } ├── LogicalScan { table: t2 } └── LogicalScan { table: t3 } -PhysicalProjection { exprs: [ #0, #1 ], cost: {compute=4036080,io=5000}, stat: {row_cnt=10} } -└── PhysicalProjection { exprs: [ #0, #1, #2, #4 ], cost: {compute=4036050,io=5000}, stat: {row_cnt=10} } - └── PhysicalFilter - ├── cond:Gt - │ ├── #4 - │ └── 100(i64) - ├── cost: {compute=4036000,io=5000} - ├── stat: {row_cnt=10} - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=4033000,io=5000}, stat: {row_cnt=1000} } - ├── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalNestedLoopJoin - ├── join_type: LeftOuter - ├── cond:And - │ └── Eq - │ ├── #0 - │ └── #1 - ├── cost: {compute=4021000,io=4000} - ├── stat: {row_cnt=10000} - ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalAgg - ├── aggrs:Agg(Sum) - │ └── [ Cast { cast_to: Int64, child: #2 } ] - ├── groups: [ #0 ] - ├── cost: {compute=17000,io=3000} - ├── stat: {row_cnt=1000} - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=9000,io=3000}, stat: {row_cnt=1000} } - ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } - │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #1 ], right_keys: [ #0 ], cost: {compute=3000,io=2000}, stat: {row_cnt=1000} } - ├── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } - └── PhysicalScan { table: t3, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } +PhysicalProjection { exprs: [ #0, #1 ], cost: {compute=27100,io=5000}, stat: {row_cnt=10} } +└── PhysicalProjection { exprs: [ #3, #4, #0, #2 ], cost: {compute=27070,io=5000}, stat: {row_cnt=10} } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=27020,io=5000}, stat: {row_cnt=10} } + ├── PhysicalFilter + │ ├── cond:Gt + │ │ ├── #2 + │ │ └── 100(i64) + │ ├── cost: {compute=26000,io=4000} + │ ├── stat: {row_cnt=10} + │ └── PhysicalHashJoin { join_type: LeftOuter, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=23000,io=4000}, stat: {row_cnt=1000} } + │ ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } + │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalAgg + │ ├── aggrs:Agg(Sum) + │ │ └── [ Cast { cast_to: Int64, child: #2 } ] + │ ├── groups: [ #0 ] + │ ├── cost: {compute=17000,io=3000} + │ ├── stat: {row_cnt=1000} + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=9000,io=3000}, stat: {row_cnt=1000} } + │ ├── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=3000,io=1000}, stat: {row_cnt=1000} } + │ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #1 ], right_keys: [ #0 ], cost: {compute=3000,io=2000}, stat: {row_cnt=1000} } + │ ├── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalScan { table: t3, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } */ diff --git a/optd-sqlplannertest/tests/tpch/q16.planner.sql b/optd-sqlplannertest/tests/tpch/q16.planner.sql index ca5fdb66..9524f9ab 100644 --- a/optd-sqlplannertest/tests/tpch/q16.planner.sql +++ b/optd-sqlplannertest/tests/tpch/q16.planner.sql @@ -91,11 +91,7 @@ PhysicalSort │ ├── InList { expr: Cast { cast_to: Int64, child: #10 }, list: [ 49(i64), 14(i64), 23(i64), 45(i64), 19(i64), 3(i64), 36(i64), 9(i64) ], negated: false } │ └── Not │ └── [ #14 ] - └── PhysicalNestedLoopJoin - ├── join_type: LeftMark - ├── cond:Eq - │ ├── #1 - │ └── #14 + └── PhysicalHashJoin { join_type: LeftMark, left_keys: [ #1 ], right_keys: [ #0 ] } ├── PhysicalNestedLoopJoin { join_type: Inner, cond: true } │ ├── PhysicalScan { table: partsupp } │ └── PhysicalScan { table: part } diff --git a/optd-sqlplannertest/tests/tpch/q17.planner.sql b/optd-sqlplannertest/tests/tpch/q17.planner.sql index 1410e2e0..5b50a0a3 100644 --- a/optd-sqlplannertest/tests/tpch/q17.planner.sql +++ b/optd-sqlplannertest/tests/tpch/q17.planner.sql @@ -78,52 +78,48 @@ PhysicalProjection └── PhysicalNestedLoopJoin ├── join_type: Inner ├── cond:And - │ ├── Eq - │ │ ├── #16 - │ │ └── #1 - │ └── Lt - │ ├── Cast { cast_to: Decimal128(30, 15), child: #4 } - │ └── #26 - ├── PhysicalScan { table: lineitem } - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } - ├── PhysicalFilter - │ ├── cond:And - │ │ ├── Eq - │ │ │ ├── #3 - │ │ │ └── "Brand#13" - │ │ └── Eq - │ │ ├── #6 - │ │ └── "JUMBO PKG" - │ └── PhysicalScan { table: part } - └── PhysicalProjection - ├── exprs: - │ ┌── #0 - │ └── Cast - │ ├── cast_to: Decimal128(30, 15) - │ ├── child:Mul - │ │ ├── 0.2(float) - │ │ └── Cast { cast_to: Float64, child: #1 } + │ ├── Lt + │ │ ├── Cast { cast_to: Decimal128(30, 15), child: #4 } + │ │ └── #26 + │ └── Eq + │ ├── #16 + │ └── #25 + ├── PhysicalProjection { exprs: [ #9, #10, #11, #12, #13, #14, #15, #16, #17, #18, #19, #20, #21, #22, #23, #24, #0, #1, #2, #3, #4, #5, #6, #7, #8 ] } + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #1 ] } + │ ├── PhysicalFilter + │ │ ├── cond:And + │ │ │ ├── Eq + │ │ │ │ ├── #3 + │ │ │ │ └── "Brand#13" + │ │ │ └── Eq + │ │ │ ├── #6 + │ │ │ └── "JUMBO PKG" + │ │ └── PhysicalScan { table: part } + │ └── PhysicalScan { table: lineitem } + └── PhysicalProjection + ├── exprs: + │ ┌── #0 + │ └── Cast + │ ├── cast_to: Decimal128(30, 15) + │ ├── child:Mul + │ │ ├── 0.2(float) + │ │ └── Cast { cast_to: Float64, child: #1 } - └── PhysicalProjection { exprs: [ #0, #2 ] } - └── PhysicalNestedLoopJoin - ├── join_type: LeftOuter - ├── cond:And - │ └── Eq - │ ├── #0 - │ └── #1 - ├── PhysicalAgg { aggrs: [], groups: [ #16 ] } - │ └── PhysicalNestedLoopJoin { join_type: Inner, cond: true } - │ ├── PhysicalScan { table: lineitem } - │ └── PhysicalScan { table: part } - └── PhysicalAgg - ├── aggrs:Agg(Avg) - │ └── [ #5 ] - ├── groups: [ #0 ] - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #1 ] } - ├── PhysicalAgg { aggrs: [], groups: [ #16 ] } - │ └── PhysicalNestedLoopJoin { join_type: Inner, cond: true } - │ ├── PhysicalScan { table: lineitem } - │ └── PhysicalScan { table: part } - └── PhysicalScan { table: lineitem } + └── PhysicalProjection { exprs: [ #0, #2 ] } + └── PhysicalHashJoin { join_type: LeftOuter, left_keys: [ #0 ], right_keys: [ #0 ] } + ├── PhysicalAgg { aggrs: [], groups: [ #16 ] } + │ └── PhysicalNestedLoopJoin { join_type: Inner, cond: true } + │ ├── PhysicalScan { table: lineitem } + │ └── PhysicalScan { table: part } + └── PhysicalAgg + ├── aggrs:Agg(Avg) + │ └── [ #5 ] + ├── groups: [ #0 ] + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #1 ] } + ├── PhysicalAgg { aggrs: [], groups: [ #16 ] } + │ └── PhysicalNestedLoopJoin { join_type: Inner, cond: true } + │ ├── PhysicalScan { table: lineitem } + │ └── PhysicalScan { table: part } + └── PhysicalScan { table: lineitem } */ diff --git a/optd-sqlplannertest/tests/tpch/q2.planner.sql b/optd-sqlplannertest/tests/tpch/q2.planner.sql index 013ff9a4..3b99799e 100644 --- a/optd-sqlplannertest/tests/tpch/q2.planner.sql +++ b/optd-sqlplannertest/tests/tpch/q2.planner.sql @@ -262,12 +262,7 @@ PhysicalLimit { skip: 0(i64), fetch: 100(i64) } │ │ └── "AFRICA" │ └── PhysicalScan { table: region } └── PhysicalProjection { exprs: [ #0, #2 ] } - └── PhysicalNestedLoopJoin - ├── join_type: LeftOuter - ├── cond:And - │ └── Eq - │ ├── #0 - │ └── #1 + └── PhysicalHashJoin { join_type: LeftOuter, left_keys: [ #0 ], right_keys: [ #0 ] } ├── PhysicalAgg { aggrs: [], groups: [ #0 ] } │ └── PhysicalNestedLoopJoin { join_type: Inner, cond: true } │ ├── PhysicalNestedLoopJoin { join_type: Inner, cond: true } diff --git a/optd-sqlplannertest/tests/tpch/q20.planner.sql b/optd-sqlplannertest/tests/tpch/q20.planner.sql index 181ad0b3..a9d8b779 100644 --- a/optd-sqlplannertest/tests/tpch/q20.planner.sql +++ b/optd-sqlplannertest/tests/tpch/q20.planner.sql @@ -106,84 +106,71 @@ PhysicalSort │ └── Eq │ ├── #8 │ └── "IRAQ" - └── PhysicalNestedLoopJoin - ├── join_type: LeftMark - ├── cond:Eq - │ ├── #0 - │ └── #11 + └── PhysicalHashJoin { join_type: LeftMark, left_keys: [ #0 ], right_keys: [ #0 ] } ├── PhysicalNestedLoopJoin { join_type: Inner, cond: true } │ ├── PhysicalScan { table: supplier } │ └── PhysicalScan { table: nation } └── PhysicalProjection { exprs: [ #1 ] } - └── PhysicalProjection { exprs: [ #3, #4, #5, #6, #7, #8, #0, #1, #2 ] } - └── PhysicalFilter - ├── cond:And - │ ├── #8 - │ └── Gt - │ ├── Cast { cast_to: Float64, child: #5 } - │ └── #2 - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0, #1 ], right_keys: [ #0, #1 ] } - ├── PhysicalProjection - │ ├── exprs: - │ │ ┌── #0 - │ │ ├── #1 - │ │ └── Mul - │ │ ├── 0.5(float) - │ │ └── Cast { cast_to: Float64, child: #2 } - │ └── PhysicalProjection { exprs: [ #0, #1, #4 ] } - │ └── PhysicalNestedLoopJoin - │ ├── join_type: LeftOuter - │ ├── cond:And - │ │ ├── Eq - │ │ │ ├── #0 - │ │ │ └── #2 - │ │ └── Eq - │ │ ├── #1 - │ │ └── #3 - │ ├── PhysicalAgg { aggrs: [], groups: [ #0, #1 ] } - │ │ └── PhysicalNestedLoopJoin - │ │ ├── join_type: LeftMark - │ │ ├── cond:Eq - │ │ │ ├── #0 - │ │ │ └── #5 - │ │ ├── PhysicalScan { table: partsupp } - │ │ └── PhysicalProjection { exprs: [ #0 ] } - │ │ └── PhysicalFilter { cond: Like { expr: #1, pattern: "indian%", negated: false, case_insensitive: false } } - │ │ └── PhysicalScan { table: part } - │ └── PhysicalAgg - │ ├── aggrs:Agg(Sum) - │ │ └── [ #6 ] - │ ├── groups: [ #0, #1 ] - │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0, #1 ], right_keys: [ #1, #2 ] } - │ ├── PhysicalAgg { aggrs: [], groups: [ #0, #1 ] } - │ │ └── PhysicalNestedLoopJoin - │ │ ├── join_type: LeftMark - │ │ ├── cond:Eq - │ │ │ ├── #0 - │ │ │ └── #5 - │ │ ├── PhysicalScan { table: partsupp } - │ │ └── PhysicalProjection { exprs: [ #0 ] } - │ │ └── PhysicalFilter { cond: Like { expr: #1, pattern: "indian%", negated: false, case_insensitive: false } } - │ │ └── PhysicalScan { table: part } - │ └── PhysicalFilter - │ ├── cond:And - │ │ ├── Geq - │ │ │ ├── #10 - │ │ │ └── Cast { cast_to: Date32, child: "1996-01-01" } - │ │ └── Lt - │ │ ├── #10 - │ │ └── Add - │ │ ├── Cast { cast_to: Date32, child: "1996-01-01" } - │ │ └── INTERVAL_MONTH_DAY_NANO (12, 0, 0) - │ └── PhysicalScan { table: lineitem } - └── PhysicalNestedLoopJoin - ├── join_type: LeftMark - ├── cond:Eq - │ ├── #0 - │ └── #5 - ├── PhysicalScan { table: partsupp } - └── PhysicalProjection { exprs: [ #0 ] } - └── PhysicalFilter { cond: Like { expr: #1, pattern: "indian%", negated: false, case_insensitive: false } } - └── PhysicalScan { table: part } + └── PhysicalNestedLoopJoin + ├── join_type: Inner + ├── cond:And + │ ├── Gt + │ │ ├── Cast { cast_to: Float64, child: #2 } + │ │ └── #8 + │ ├── Eq + │ │ ├── #0 + │ │ └── #6 + │ └── Eq + │ ├── #1 + │ └── #7 + ├── PhysicalFilter { cond: #5 } + │ └── PhysicalHashJoin { join_type: LeftMark, left_keys: [ #0 ], right_keys: [ #0 ] } + │ ├── PhysicalScan { table: partsupp } + │ └── PhysicalProjection { exprs: [ #0 ] } + │ └── PhysicalFilter { cond: Like { expr: #1, pattern: "indian%", negated: false, case_insensitive: false } } + │ └── PhysicalScan { table: part } + └── PhysicalProjection + ├── exprs: + │ ┌── #0 + │ ├── #1 + │ └── Mul + │ ├── 0.5(float) + │ └── Cast { cast_to: Float64, child: #2 } + └── PhysicalProjection { exprs: [ #0, #1, #4 ] } + └── PhysicalHashJoin { join_type: LeftOuter, left_keys: [ #0, #1 ], right_keys: [ #0, #1 ] } + ├── PhysicalAgg { aggrs: [], groups: [ #0, #1 ] } + │ └── PhysicalHashJoin { join_type: LeftMark, left_keys: [ #0 ], right_keys: [ #0 ] } + │ ├── PhysicalScan { table: partsupp } + │ └── PhysicalProjection { exprs: [ #0 ] } + │ └── PhysicalFilter { cond: Like { expr: #1, pattern: "indian%", negated: false, case_insensitive: false } } + │ └── PhysicalScan { table: part } + └── PhysicalAgg + ├── aggrs:Agg(Sum) + │ └── [ #6 ] + ├── groups: [ #0, #1 ] + └── PhysicalFilter + ├── cond:And + │ ├── Eq + │ │ ├── #3 + │ │ └── #0 + │ ├── Eq + │ │ ├── #4 + │ │ └── #1 + │ ├── Geq + │ │ ├── #12 + │ │ └── Cast { cast_to: Date32, child: "1996-01-01" } + │ └── Lt + │ ├── #12 + │ └── Add + │ ├── Cast { cast_to: Date32, child: "1996-01-01" } + │ └── INTERVAL_MONTH_DAY_NANO (12, 0, 0) + └── PhysicalNestedLoopJoin { join_type: Inner, cond: true } + ├── PhysicalAgg { aggrs: [], groups: [ #0, #1 ] } + │ └── PhysicalHashJoin { join_type: LeftMark, left_keys: [ #0 ], right_keys: [ #0 ] } + │ ├── PhysicalScan { table: partsupp } + │ └── PhysicalProjection { exprs: [ #0 ] } + │ └── PhysicalFilter { cond: Like { expr: #1, pattern: "indian%", negated: false, case_insensitive: false } } + │ └── PhysicalScan { table: part } + └── PhysicalScan { table: lineitem } */ diff --git a/optd-sqlplannertest/tests/tpch/q22.planner.sql b/optd-sqlplannertest/tests/tpch/q22.planner.sql index 01a22608..d80ea330 100644 --- a/optd-sqlplannertest/tests/tpch/q22.planner.sql +++ b/optd-sqlplannertest/tests/tpch/q22.planner.sql @@ -121,12 +121,7 @@ PhysicalSort │ │ └── #8 │ └── Not │ └── [ #9 ] - └── PhysicalNestedLoopJoin - ├── join_type: LeftMark - ├── cond:And - │ └── Eq - │ ├── #0 - │ └── #9 + └── PhysicalHashJoin { join_type: LeftMark, left_keys: [ #0 ], right_keys: [ #0 ] } ├── PhysicalNestedLoopJoin { join_type: Inner, cond: true } │ ├── PhysicalScan { table: customer } │ └── PhysicalAgg diff --git a/optd-sqlplannertest/tests/tpch/q4.planner.sql b/optd-sqlplannertest/tests/tpch/q4.planner.sql index 99bc78dc..27655428 100644 --- a/optd-sqlplannertest/tests/tpch/q4.planner.sql +++ b/optd-sqlplannertest/tests/tpch/q4.planner.sql @@ -71,12 +71,7 @@ PhysicalSort │ │ ├── Cast { cast_to: Date32, child: "1993-07-01" } │ │ └── INTERVAL_MONTH_DAY_NANO (3, 0, 0) │ └── #9 - └── PhysicalNestedLoopJoin - ├── join_type: LeftMark - ├── cond:And - │ └── Eq - │ ├── #0 - │ └── #9 + └── PhysicalHashJoin { join_type: LeftMark, left_keys: [ #0 ], right_keys: [ #0 ] } ├── PhysicalScan { table: orders } └── PhysicalProjection { exprs: [ #16, #0, #1, #2, #3, #4, #5, #6, #7, #8, #9, #10, #11, #12, #13, #14, #15 ] } └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } From 973e80c5b39fcb0fed6e8ccd09af3923d7747261 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Sun, 22 Dec 2024 11:03:29 -0500 Subject: [PATCH 03/11] use define_impl_rule_discriminant Signed-off-by: Yuchen Liang --- optd-datafusion-repr/src/lib.rs | 6 +- optd-datafusion-repr/src/rules/joins.rs | 239 +---------------------- optd-datafusion-repr/src/rules/macros.rs | 13 +- 3 files changed, 18 insertions(+), 240 deletions(-) diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index a85bc19c..d7d17eec 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -105,9 +105,7 @@ impl DatafusionOptimizer { rule_wrappers.push(Arc::new(rules::FilterInnerJoinTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterSortTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterAggTransposeRule::new())); - rule_wrappers.push(Arc::new(rules::HashJoinInnerRule::new())); - rule_wrappers.push(Arc::new(rules::HashJoinLeftOuterRule::new())); - rule_wrappers.push(Arc::new(rules::HashJoinLeftMarkRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinRule::new())); rule_wrappers.push(Arc::new(rules::JoinCommuteRule::new())); rule_wrappers.push(Arc::new(rules::JoinAssocRule::new())); rule_wrappers.push(Arc::new(rules::ProjectionPullUpJoin::new())); @@ -179,7 +177,7 @@ impl DatafusionOptimizer { for rule in rules { rule_wrappers.push(rule); } - rule_wrappers.push(Arc::new(rules::HashJoinInnerRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinRule::new())); rule_wrappers.insert(0, Arc::new(rules::JoinCommuteRule::new())); rule_wrappers.insert(1, Arc::new(rules::JoinAssocRule::new())); rule_wrappers.insert(2, Arc::new(rules::ProjectionPullUpJoin::new())); diff --git a/optd-datafusion-repr/src/rules/joins.rs b/optd-datafusion-repr/src/rules/joins.rs index a041677c..432c06d3 100644 --- a/optd-datafusion-repr/src/rules/joins.rs +++ b/optd-datafusion-repr/src/rules/joins.rs @@ -9,7 +9,7 @@ use optd_core::nodes::PlanNodeOrGroup; use optd_core::optimizer::Optimizer; use optd_core::rules::{Rule, RuleMatcher}; -use super::macros::{define_impl_rule, define_rule}; +use super::macros::{define_impl_rule_discriminant, define_rule}; use crate::plan_nodes::{ ArcDfPlanNode, BinOpPred, BinOpType, ColumnRefPred, ConstantPred, ConstantType, DfNodeType, DfPredType, DfReprPlanNode, DfReprPredNode, JoinType, ListPred, LogOpType, @@ -140,241 +140,14 @@ fn apply_join_assoc( vec![node.into_plan_node().into()] } -define_impl_rule!( - HashJoinInnerRule, - apply_hash_join_inner, +// Note: this matches all join types despite using `JoinType::Inner` below. +define_impl_rule_discriminant!( + HashJoinRule, + apply_hash_join, (Join(JoinType::Inner), left, right) ); -fn apply_hash_join_inner( - optimizer: &impl Optimizer, - binding: ArcDfPlanNode, -) -> Vec> { - let join = LogicalJoin::from_plan_node(binding).unwrap(); - let cond = join.cond(); - let left = join.left(); - let right = join.right(); - let join_type = join.join_type(); - match cond.typ { - DfPredType::BinOp(BinOpType::Eq) => { - let left_schema = optimizer.get_schema_of(left.clone()); - let op = BinOpPred::from_pred_node(cond.clone()).unwrap(); - let left_expr = op.left_child(); - let right_expr = op.right_child(); - let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { - return vec![]; - }; - let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { - return vec![]; - }; - let can_convert = if left_expr.index() < left_schema.len() - && right_expr.index() >= left_schema.len() - { - true - } else if right_expr.index() < left_schema.len() - && left_expr.index() >= left_schema.len() - { - (left_expr, right_expr) = (right_expr, left_expr); - true - } else { - false - }; - - if can_convert { - let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); - let node = PhysicalHashJoin::new_unchecked( - left, - right, - ListPred::new(vec![left_expr.into_pred_node()]), - ListPred::new(vec![right_expr.into_pred_node()]), - *join_type, - ); - return vec![node.into_plan_node().into()]; - } - } - DfPredType::LogOp(LogOpType::And) => { - // currently only support consecutive equal queries - let mut is_consecutive_eq = true; - for child in cond.children.clone() { - if let DfPredType::BinOp(BinOpType::Eq) = child.typ { - continue; - } else { - is_consecutive_eq = false; - break; - } - } - if !is_consecutive_eq { - return vec![]; - } - - let left_schema = optimizer.get_schema_of(left.clone()); - let mut left_exprs = vec![]; - let mut right_exprs = vec![]; - for child in &cond.children { - let bin_op = BinOpPred::from_pred_node(child.clone()).unwrap(); - let left_expr = bin_op.left_child(); - let right_expr = bin_op.right_child(); - let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { - return vec![]; - }; - let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { - return vec![]; - }; - let can_convert = if left_expr.index() < left_schema.len() - && right_expr.index() >= left_schema.len() - { - true - } else if right_expr.index() < left_schema.len() - && left_expr.index() >= left_schema.len() - { - (left_expr, right_expr) = (right_expr, left_expr); - true - } else { - false - }; - if !can_convert { - return vec![]; - } - let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); - right_exprs.push(right_expr.into_pred_node()); - left_exprs.push(left_expr.into_pred_node()); - } - - let node = PhysicalHashJoin::new_unchecked( - left, - right, - ListPred::new(left_exprs), - ListPred::new(right_exprs), - *join_type, - ); - return vec![node.into_plan_node().into()]; - } - _ => {} - } - vec![] -} - -define_impl_rule!( - HashJoinLeftOuterRule, - apply_hash_join_left_outer, - (Join(JoinType::LeftOuter), left, right) -); - -fn apply_hash_join_left_outer( - optimizer: &impl Optimizer, - binding: ArcDfPlanNode, -) -> Vec> { - let join = LogicalJoin::from_plan_node(binding).unwrap(); - let cond = join.cond(); - let left = join.left(); - let right = join.right(); - let join_type = join.join_type(); - match cond.typ { - DfPredType::BinOp(BinOpType::Eq) => { - let left_schema = optimizer.get_schema_of(left.clone()); - let op = BinOpPred::from_pred_node(cond.clone()).unwrap(); - let left_expr = op.left_child(); - let right_expr = op.right_child(); - let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { - return vec![]; - }; - let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { - return vec![]; - }; - let can_convert = if left_expr.index() < left_schema.len() - && right_expr.index() >= left_schema.len() - { - true - } else if right_expr.index() < left_schema.len() - && left_expr.index() >= left_schema.len() - { - (left_expr, right_expr) = (right_expr, left_expr); - true - } else { - false - }; - - if can_convert { - let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); - let node = PhysicalHashJoin::new_unchecked( - left, - right, - ListPred::new(vec![left_expr.into_pred_node()]), - ListPred::new(vec![right_expr.into_pred_node()]), - *join_type, - ); - return vec![node.into_plan_node().into()]; - } - } - DfPredType::LogOp(LogOpType::And) => { - // currently only support consecutive equal queries - let mut is_consecutive_eq = true; - for child in cond.children.clone() { - if let DfPredType::BinOp(BinOpType::Eq) = child.typ { - continue; - } else { - is_consecutive_eq = false; - break; - } - } - if !is_consecutive_eq { - return vec![]; - } - - let left_schema = optimizer.get_schema_of(left.clone()); - let mut left_exprs = vec![]; - let mut right_exprs = vec![]; - for child in &cond.children { - let bin_op = BinOpPred::from_pred_node(child.clone()).unwrap(); - let left_expr = bin_op.left_child(); - let right_expr = bin_op.right_child(); - let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { - return vec![]; - }; - let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { - return vec![]; - }; - let can_convert = if left_expr.index() < left_schema.len() - && right_expr.index() >= left_schema.len() - { - true - } else if right_expr.index() < left_schema.len() - && left_expr.index() >= left_schema.len() - { - (left_expr, right_expr) = (right_expr, left_expr); - true - } else { - false - }; - if !can_convert { - return vec![]; - } - let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); - right_exprs.push(right_expr.into_pred_node()); - left_exprs.push(left_expr.into_pred_node()); - } - - let node = PhysicalHashJoin::new_unchecked( - left, - right, - ListPred::new(left_exprs), - ListPred::new(right_exprs), - *join_type, - ); - return vec![node.into_plan_node().into()]; - } - _ => {} - } - vec![] -} - -define_impl_rule!( - HashJoinLeftMarkRule, - apply_hash_join_left_mark, - (Join(JoinType::LeftMark), left, right) -); - -fn apply_hash_join_left_mark( +fn apply_hash_join( optimizer: &impl Optimizer, binding: ArcDfPlanNode, ) -> Vec> { diff --git a/optd-datafusion-repr/src/rules/macros.rs b/optd-datafusion-repr/src/rules/macros.rs index 420e2963..412f83c9 100644 --- a/optd-datafusion-repr/src/rules/macros.rs +++ b/optd-datafusion-repr/src/rules/macros.rs @@ -79,12 +79,19 @@ macro_rules! define_rule_discriminant { }; } -macro_rules! define_impl_rule { +// macro_rules! define_impl_rule { +// ($name:ident, $apply:ident, $($matcher:tt)+) => { +// crate::rules::macros::define_rule_inner! { true, false, $name, $apply, $($matcher)+ } +// }; +// } + +macro_rules! define_impl_rule_discriminant { ($name:ident, $apply:ident, $($matcher:tt)+) => { - crate::rules::macros::define_rule_inner! { true, false, $name, $apply, $($matcher)+ } + crate::rules::macros::define_rule_inner! { true, true, $name, $apply, $($matcher)+ } }; } pub(crate) use { - define_impl_rule, define_matcher, define_rule, define_rule_discriminant, define_rule_inner, + define_impl_rule_discriminant, define_matcher, define_rule, define_rule_discriminant, + define_rule_inner, }; From 8138ae3352352cae4305339148ba3f41d796affd Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Sat, 4 Jan 2025 23:56:54 -0500 Subject: [PATCH 04/11] Revert "use define_impl_rule_discriminant" This reverts commit 973e80c5b39fcb0fed6e8ccd09af3923d7747261. --- optd-datafusion-repr/src/lib.rs | 6 +- optd-datafusion-repr/src/rules/joins.rs | 239 ++++++++++++++++++++++- optd-datafusion-repr/src/rules/macros.rs | 13 +- 3 files changed, 240 insertions(+), 18 deletions(-) diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index d7d17eec..a85bc19c 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -105,7 +105,9 @@ impl DatafusionOptimizer { rule_wrappers.push(Arc::new(rules::FilterInnerJoinTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterSortTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterAggTransposeRule::new())); - rule_wrappers.push(Arc::new(rules::HashJoinRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinInnerRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinLeftOuterRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinLeftMarkRule::new())); rule_wrappers.push(Arc::new(rules::JoinCommuteRule::new())); rule_wrappers.push(Arc::new(rules::JoinAssocRule::new())); rule_wrappers.push(Arc::new(rules::ProjectionPullUpJoin::new())); @@ -177,7 +179,7 @@ impl DatafusionOptimizer { for rule in rules { rule_wrappers.push(rule); } - rule_wrappers.push(Arc::new(rules::HashJoinRule::new())); + rule_wrappers.push(Arc::new(rules::HashJoinInnerRule::new())); rule_wrappers.insert(0, Arc::new(rules::JoinCommuteRule::new())); rule_wrappers.insert(1, Arc::new(rules::JoinAssocRule::new())); rule_wrappers.insert(2, Arc::new(rules::ProjectionPullUpJoin::new())); diff --git a/optd-datafusion-repr/src/rules/joins.rs b/optd-datafusion-repr/src/rules/joins.rs index 432c06d3..a041677c 100644 --- a/optd-datafusion-repr/src/rules/joins.rs +++ b/optd-datafusion-repr/src/rules/joins.rs @@ -9,7 +9,7 @@ use optd_core::nodes::PlanNodeOrGroup; use optd_core::optimizer::Optimizer; use optd_core::rules::{Rule, RuleMatcher}; -use super::macros::{define_impl_rule_discriminant, define_rule}; +use super::macros::{define_impl_rule, define_rule}; use crate::plan_nodes::{ ArcDfPlanNode, BinOpPred, BinOpType, ColumnRefPred, ConstantPred, ConstantType, DfNodeType, DfPredType, DfReprPlanNode, DfReprPredNode, JoinType, ListPred, LogOpType, @@ -140,14 +140,241 @@ fn apply_join_assoc( vec![node.into_plan_node().into()] } -// Note: this matches all join types despite using `JoinType::Inner` below. -define_impl_rule_discriminant!( - HashJoinRule, - apply_hash_join, +define_impl_rule!( + HashJoinInnerRule, + apply_hash_join_inner, (Join(JoinType::Inner), left, right) ); -fn apply_hash_join( +fn apply_hash_join_inner( + optimizer: &impl Optimizer, + binding: ArcDfPlanNode, +) -> Vec> { + let join = LogicalJoin::from_plan_node(binding).unwrap(); + let cond = join.cond(); + let left = join.left(); + let right = join.right(); + let join_type = join.join_type(); + match cond.typ { + DfPredType::BinOp(BinOpType::Eq) => { + let left_schema = optimizer.get_schema_of(left.clone()); + let op = BinOpPred::from_pred_node(cond.clone()).unwrap(); + let left_expr = op.left_child(); + let right_expr = op.right_child(); + let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { + return vec![]; + }; + let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { + return vec![]; + }; + let can_convert = if left_expr.index() < left_schema.len() + && right_expr.index() >= left_schema.len() + { + true + } else if right_expr.index() < left_schema.len() + && left_expr.index() >= left_schema.len() + { + (left_expr, right_expr) = (right_expr, left_expr); + true + } else { + false + }; + + if can_convert { + let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); + let node = PhysicalHashJoin::new_unchecked( + left, + right, + ListPred::new(vec![left_expr.into_pred_node()]), + ListPred::new(vec![right_expr.into_pred_node()]), + *join_type, + ); + return vec![node.into_plan_node().into()]; + } + } + DfPredType::LogOp(LogOpType::And) => { + // currently only support consecutive equal queries + let mut is_consecutive_eq = true; + for child in cond.children.clone() { + if let DfPredType::BinOp(BinOpType::Eq) = child.typ { + continue; + } else { + is_consecutive_eq = false; + break; + } + } + if !is_consecutive_eq { + return vec![]; + } + + let left_schema = optimizer.get_schema_of(left.clone()); + let mut left_exprs = vec![]; + let mut right_exprs = vec![]; + for child in &cond.children { + let bin_op = BinOpPred::from_pred_node(child.clone()).unwrap(); + let left_expr = bin_op.left_child(); + let right_expr = bin_op.right_child(); + let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { + return vec![]; + }; + let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { + return vec![]; + }; + let can_convert = if left_expr.index() < left_schema.len() + && right_expr.index() >= left_schema.len() + { + true + } else if right_expr.index() < left_schema.len() + && left_expr.index() >= left_schema.len() + { + (left_expr, right_expr) = (right_expr, left_expr); + true + } else { + false + }; + if !can_convert { + return vec![]; + } + let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); + right_exprs.push(right_expr.into_pred_node()); + left_exprs.push(left_expr.into_pred_node()); + } + + let node = PhysicalHashJoin::new_unchecked( + left, + right, + ListPred::new(left_exprs), + ListPred::new(right_exprs), + *join_type, + ); + return vec![node.into_plan_node().into()]; + } + _ => {} + } + vec![] +} + +define_impl_rule!( + HashJoinLeftOuterRule, + apply_hash_join_left_outer, + (Join(JoinType::LeftOuter), left, right) +); + +fn apply_hash_join_left_outer( + optimizer: &impl Optimizer, + binding: ArcDfPlanNode, +) -> Vec> { + let join = LogicalJoin::from_plan_node(binding).unwrap(); + let cond = join.cond(); + let left = join.left(); + let right = join.right(); + let join_type = join.join_type(); + match cond.typ { + DfPredType::BinOp(BinOpType::Eq) => { + let left_schema = optimizer.get_schema_of(left.clone()); + let op = BinOpPred::from_pred_node(cond.clone()).unwrap(); + let left_expr = op.left_child(); + let right_expr = op.right_child(); + let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { + return vec![]; + }; + let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { + return vec![]; + }; + let can_convert = if left_expr.index() < left_schema.len() + && right_expr.index() >= left_schema.len() + { + true + } else if right_expr.index() < left_schema.len() + && left_expr.index() >= left_schema.len() + { + (left_expr, right_expr) = (right_expr, left_expr); + true + } else { + false + }; + + if can_convert { + let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); + let node = PhysicalHashJoin::new_unchecked( + left, + right, + ListPred::new(vec![left_expr.into_pred_node()]), + ListPred::new(vec![right_expr.into_pred_node()]), + *join_type, + ); + return vec![node.into_plan_node().into()]; + } + } + DfPredType::LogOp(LogOpType::And) => { + // currently only support consecutive equal queries + let mut is_consecutive_eq = true; + for child in cond.children.clone() { + if let DfPredType::BinOp(BinOpType::Eq) = child.typ { + continue; + } else { + is_consecutive_eq = false; + break; + } + } + if !is_consecutive_eq { + return vec![]; + } + + let left_schema = optimizer.get_schema_of(left.clone()); + let mut left_exprs = vec![]; + let mut right_exprs = vec![]; + for child in &cond.children { + let bin_op = BinOpPred::from_pred_node(child.clone()).unwrap(); + let left_expr = bin_op.left_child(); + let right_expr = bin_op.right_child(); + let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { + return vec![]; + }; + let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { + return vec![]; + }; + let can_convert = if left_expr.index() < left_schema.len() + && right_expr.index() >= left_schema.len() + { + true + } else if right_expr.index() < left_schema.len() + && left_expr.index() >= left_schema.len() + { + (left_expr, right_expr) = (right_expr, left_expr); + true + } else { + false + }; + if !can_convert { + return vec![]; + } + let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); + right_exprs.push(right_expr.into_pred_node()); + left_exprs.push(left_expr.into_pred_node()); + } + + let node = PhysicalHashJoin::new_unchecked( + left, + right, + ListPred::new(left_exprs), + ListPred::new(right_exprs), + *join_type, + ); + return vec![node.into_plan_node().into()]; + } + _ => {} + } + vec![] +} + +define_impl_rule!( + HashJoinLeftMarkRule, + apply_hash_join_left_mark, + (Join(JoinType::LeftMark), left, right) +); + +fn apply_hash_join_left_mark( optimizer: &impl Optimizer, binding: ArcDfPlanNode, ) -> Vec> { diff --git a/optd-datafusion-repr/src/rules/macros.rs b/optd-datafusion-repr/src/rules/macros.rs index 412f83c9..420e2963 100644 --- a/optd-datafusion-repr/src/rules/macros.rs +++ b/optd-datafusion-repr/src/rules/macros.rs @@ -79,19 +79,12 @@ macro_rules! define_rule_discriminant { }; } -// macro_rules! define_impl_rule { -// ($name:ident, $apply:ident, $($matcher:tt)+) => { -// crate::rules::macros::define_rule_inner! { true, false, $name, $apply, $($matcher)+ } -// }; -// } - -macro_rules! define_impl_rule_discriminant { +macro_rules! define_impl_rule { ($name:ident, $apply:ident, $($matcher:tt)+) => { - crate::rules::macros::define_rule_inner! { true, true, $name, $apply, $($matcher)+ } + crate::rules::macros::define_rule_inner! { true, false, $name, $apply, $($matcher)+ } }; } pub(crate) use { - define_impl_rule_discriminant, define_matcher, define_rule, define_rule_discriminant, - define_rule_inner, + define_impl_rule, define_matcher, define_rule, define_rule_discriminant, define_rule_inner, }; From ecea664b57b15d251c4110d819fa3794c99fdc6f Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Sun, 5 Jan 2025 00:00:20 -0500 Subject: [PATCH 05/11] reuse apply_hash_join logic Signed-off-by: Yuchen Liang --- optd-datafusion-repr/src/rules/joins.rs | 224 +----------------------- 1 file changed, 4 insertions(+), 220 deletions(-) diff --git a/optd-datafusion-repr/src/rules/joins.rs b/optd-datafusion-repr/src/rules/joins.rs index a041677c..d948a85e 100644 --- a/optd-datafusion-repr/src/rules/joins.rs +++ b/optd-datafusion-repr/src/rules/joins.rs @@ -142,239 +142,23 @@ fn apply_join_assoc( define_impl_rule!( HashJoinInnerRule, - apply_hash_join_inner, + apply_hash_join, (Join(JoinType::Inner), left, right) ); -fn apply_hash_join_inner( - optimizer: &impl Optimizer, - binding: ArcDfPlanNode, -) -> Vec> { - let join = LogicalJoin::from_plan_node(binding).unwrap(); - let cond = join.cond(); - let left = join.left(); - let right = join.right(); - let join_type = join.join_type(); - match cond.typ { - DfPredType::BinOp(BinOpType::Eq) => { - let left_schema = optimizer.get_schema_of(left.clone()); - let op = BinOpPred::from_pred_node(cond.clone()).unwrap(); - let left_expr = op.left_child(); - let right_expr = op.right_child(); - let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { - return vec![]; - }; - let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { - return vec![]; - }; - let can_convert = if left_expr.index() < left_schema.len() - && right_expr.index() >= left_schema.len() - { - true - } else if right_expr.index() < left_schema.len() - && left_expr.index() >= left_schema.len() - { - (left_expr, right_expr) = (right_expr, left_expr); - true - } else { - false - }; - - if can_convert { - let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); - let node = PhysicalHashJoin::new_unchecked( - left, - right, - ListPred::new(vec![left_expr.into_pred_node()]), - ListPred::new(vec![right_expr.into_pred_node()]), - *join_type, - ); - return vec![node.into_plan_node().into()]; - } - } - DfPredType::LogOp(LogOpType::And) => { - // currently only support consecutive equal queries - let mut is_consecutive_eq = true; - for child in cond.children.clone() { - if let DfPredType::BinOp(BinOpType::Eq) = child.typ { - continue; - } else { - is_consecutive_eq = false; - break; - } - } - if !is_consecutive_eq { - return vec![]; - } - - let left_schema = optimizer.get_schema_of(left.clone()); - let mut left_exprs = vec![]; - let mut right_exprs = vec![]; - for child in &cond.children { - let bin_op = BinOpPred::from_pred_node(child.clone()).unwrap(); - let left_expr = bin_op.left_child(); - let right_expr = bin_op.right_child(); - let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { - return vec![]; - }; - let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { - return vec![]; - }; - let can_convert = if left_expr.index() < left_schema.len() - && right_expr.index() >= left_schema.len() - { - true - } else if right_expr.index() < left_schema.len() - && left_expr.index() >= left_schema.len() - { - (left_expr, right_expr) = (right_expr, left_expr); - true - } else { - false - }; - if !can_convert { - return vec![]; - } - let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); - right_exprs.push(right_expr.into_pred_node()); - left_exprs.push(left_expr.into_pred_node()); - } - - let node = PhysicalHashJoin::new_unchecked( - left, - right, - ListPred::new(left_exprs), - ListPred::new(right_exprs), - *join_type, - ); - return vec![node.into_plan_node().into()]; - } - _ => {} - } - vec![] -} - define_impl_rule!( HashJoinLeftOuterRule, - apply_hash_join_left_outer, + apply_hash_join, (Join(JoinType::LeftOuter), left, right) ); -fn apply_hash_join_left_outer( - optimizer: &impl Optimizer, - binding: ArcDfPlanNode, -) -> Vec> { - let join = LogicalJoin::from_plan_node(binding).unwrap(); - let cond = join.cond(); - let left = join.left(); - let right = join.right(); - let join_type = join.join_type(); - match cond.typ { - DfPredType::BinOp(BinOpType::Eq) => { - let left_schema = optimizer.get_schema_of(left.clone()); - let op = BinOpPred::from_pred_node(cond.clone()).unwrap(); - let left_expr = op.left_child(); - let right_expr = op.right_child(); - let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { - return vec![]; - }; - let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { - return vec![]; - }; - let can_convert = if left_expr.index() < left_schema.len() - && right_expr.index() >= left_schema.len() - { - true - } else if right_expr.index() < left_schema.len() - && left_expr.index() >= left_schema.len() - { - (left_expr, right_expr) = (right_expr, left_expr); - true - } else { - false - }; - - if can_convert { - let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); - let node = PhysicalHashJoin::new_unchecked( - left, - right, - ListPred::new(vec![left_expr.into_pred_node()]), - ListPred::new(vec![right_expr.into_pred_node()]), - *join_type, - ); - return vec![node.into_plan_node().into()]; - } - } - DfPredType::LogOp(LogOpType::And) => { - // currently only support consecutive equal queries - let mut is_consecutive_eq = true; - for child in cond.children.clone() { - if let DfPredType::BinOp(BinOpType::Eq) = child.typ { - continue; - } else { - is_consecutive_eq = false; - break; - } - } - if !is_consecutive_eq { - return vec![]; - } - - let left_schema = optimizer.get_schema_of(left.clone()); - let mut left_exprs = vec![]; - let mut right_exprs = vec![]; - for child in &cond.children { - let bin_op = BinOpPred::from_pred_node(child.clone()).unwrap(); - let left_expr = bin_op.left_child(); - let right_expr = bin_op.right_child(); - let Some(mut left_expr) = ColumnRefPred::from_pred_node(left_expr) else { - return vec![]; - }; - let Some(mut right_expr) = ColumnRefPred::from_pred_node(right_expr) else { - return vec![]; - }; - let can_convert = if left_expr.index() < left_schema.len() - && right_expr.index() >= left_schema.len() - { - true - } else if right_expr.index() < left_schema.len() - && left_expr.index() >= left_schema.len() - { - (left_expr, right_expr) = (right_expr, left_expr); - true - } else { - false - }; - if !can_convert { - return vec![]; - } - let right_expr = ColumnRefPred::new(right_expr.index() - left_schema.len()); - right_exprs.push(right_expr.into_pred_node()); - left_exprs.push(left_expr.into_pred_node()); - } - - let node = PhysicalHashJoin::new_unchecked( - left, - right, - ListPred::new(left_exprs), - ListPred::new(right_exprs), - *join_type, - ); - return vec![node.into_plan_node().into()]; - } - _ => {} - } - vec![] -} - define_impl_rule!( HashJoinLeftMarkRule, - apply_hash_join_left_mark, + apply_hash_join, (Join(JoinType::LeftMark), left, right) ); -fn apply_hash_join_left_mark( +fn apply_hash_join( optimizer: &impl Optimizer, binding: ArcDfPlanNode, ) -> Vec> { From c9f59b0cbd11d5eca0045e3c4b5f29d699b04e84 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Sun, 5 Jan 2025 00:31:11 -0500 Subject: [PATCH 06/11] refactor: use match statement in `simplify_log_expr` and remove `unreachable!` Signed-off-by: Yuchen Liang --- optd-datafusion-repr/src/rules/filter.rs | 42 ++++++++++-------------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/optd-datafusion-repr/src/rules/filter.rs b/optd-datafusion-repr/src/rules/filter.rs index 77c85419..894f0802 100644 --- a/optd-datafusion-repr/src/rules/filter.rs +++ b/optd-datafusion-repr/src/rules/filter.rs @@ -38,41 +38,35 @@ pub(crate) fn simplify_log_expr(log_expr: ArcDfPredNode, changed: &mut bool) -> if let DfPredType::Constant(ConstantType::Bool) = new_child.typ { let data = ConstantPred::from_pred_node(new_child).unwrap().value(); *changed = true; - // TrueExpr - if data.as_bool() { - if op == LogOpType::And { - // skip True in And - continue; - } - if op == LogOpType::Or { + + match (data.as_bool(), op) { + (true, LogOpType::Or) => { // replace whole exprList with True return ConstantPred::bool(true).into_pred_node(); } - unreachable!("no other type in logOp"); - } - // FalseExpr - if op == LogOpType::And { - // replace whole exprList with False - return ConstantPred::bool(false).into_pred_node(); - } - if op == LogOpType::Or { - // skip False in Or - continue; + (false, LogOpType::And) => { + // replace whole exprList with False + return ConstantPred::bool(false).into_pred_node(); + } + _ => { + // skip True in `And`, and False in `Or` + continue; + } } - unreachable!("no other type in logOp"); } else if !new_children_set.contains(&new_child) { new_children_set.insert(new_child.clone()); new_children.push(new_child); } } if new_children.is_empty() { - if op == LogOpType::And { - return ConstantPred::bool(true).into_pred_node(); - } - if op == LogOpType::Or { - return ConstantPred::bool(false).into_pred_node(); + match op { + LogOpType::And => { + return ConstantPred::bool(true).into_pred_node(); + } + LogOpType::Or => { + return ConstantPred::bool(false).into_pred_node(); + } } - unreachable!("no other type in logOp"); } if new_children.len() == 1 { *changed = true; From dbc99a058b25b85e865b2fc2946d8ebd0072d74e Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Mon, 6 Jan 2025 12:37:03 -0500 Subject: [PATCH 07/11] Add JoinSplitFilter rules Signed-off-by: Yuchen Liang --- .../src/rules/filter_pushdown.rs | 129 +++++++++++++++++- 1 file changed, 128 insertions(+), 1 deletion(-) diff --git a/optd-datafusion-repr/src/rules/filter_pushdown.rs b/optd-datafusion-repr/src/rules/filter_pushdown.rs index dd46734b..1abd7a6d 100644 --- a/optd-datafusion-repr/src/rules/filter_pushdown.rs +++ b/optd-datafusion-repr/src/rules/filter_pushdown.rs @@ -24,7 +24,7 @@ use optd_core::optimizer::Optimizer; use optd_core::rules::{Rule, RuleMatcher}; use super::filter::simplify_log_expr; -use super::macros::define_rule; +use super::macros::{define_rule, define_rule_discriminant}; use crate::plan_nodes::{ ArcDfPlanNode, ArcDfPredNode, ColumnRefPred, DfNodeType, DfPredType, DfReprPlanNode, DfReprPredNode, JoinType, ListPred, LogOpPred, LogOpType, LogicalAgg, LogicalFilter, @@ -160,6 +160,87 @@ fn apply_filter_merge( vec![new_filter.into_plan_node().into()] } +// Rule to split predicates in a join condition into those that can be pushed down as filters. +define_rule!( + InnerJoinSplitFilterRule, + apply_join_split_filter, + (Join(JoinType::Inner), child_a, child_b) +); + +define_rule!( + LeftOuterJoinSplitFilterRule, + apply_join_split_filter, + (Join(JoinType::LeftOuter), child_a, child_b) +); + +fn apply_join_split_filter( + optimizer: &impl Optimizer, + binding: ArcDfPlanNode, +) -> Vec> { + println!("Applying JoinSplitFilterRule"); + let join = LogicalJoin::from_plan_node(binding).unwrap(); + let left_child = join.left(); + let right_child = join.right(); + let join_cond = join.cond(); + let join_typ = join.join_type(); + + let left_schema_size = optimizer.get_schema_of(left_child.clone()).len(); + let right_schema_size = optimizer.get_schema_of(right_child.clone()).len(); + + // Conditions that only involve the left relation. + let mut left_conds = vec![]; + // Conditions that only involve the right relation. + let mut right_conds = vec![]; + // Conditions that involve both relations. + let mut keep_conds = vec![]; + + let categorization_fn = |expr: ArcDfPredNode, children: &[ArcDfPredNode]| { + let location = determine_join_cond_dep(children, left_schema_size, right_schema_size); + match location { + JoinCondDependency::Left => left_conds.push(expr), + JoinCondDependency::Right => right_conds.push( + expr.rewrite_column_refs(|idx| { + Some(LogicalJoin::map_through_join( + idx, + left_schema_size, + right_schema_size, + )) + }) + .unwrap(), + ), + JoinCondDependency::Both => keep_conds.push(expr), + JoinCondDependency::None => { + unreachable!("join condition should always involve at least one relation"); + } + } + }; + categorize_conds(categorization_fn, join_cond); + + let new_left = if !left_conds.is_empty() { + let new_filter_node = + LogicalFilter::new_unchecked(left_child, and_expr_list_to_expr(left_conds)); + PlanNodeOrGroup::PlanNode(new_filter_node.into_plan_node()) + } else { + left_child + }; + + let new_right = if !right_conds.is_empty() { + let new_filter_node = + LogicalFilter::new_unchecked(right_child, and_expr_list_to_expr(right_conds)); + PlanNodeOrGroup::PlanNode(new_filter_node.into_plan_node()) + } else { + right_child + }; + + let new_join = LogicalJoin::new_unchecked( + new_left, + new_right, + and_expr_list_to_expr(keep_conds), + *join_typ, + ); + + vec![new_join.into_plan_node().into()] +} define_rule!( FilterInnerJoinTransposeRule, apply_filter_inner_join_transpose, @@ -442,6 +523,52 @@ mod tests { assert_eq!(col_4.value().as_i32(), 1); } + #[test] + fn join_split_filter() { + let mut test_optimizer = new_test_optimizer(Arc::new(LeftOuterJoinSplitFilterRule::new())); + + let scan1 = LogicalScan::new("customer".into()); + + let scan2 = LogicalScan::new("orders".into()); + + let join_cond = LogOpPred::new( + LogOpType::And, + vec![ + BinOpPred::new( + // This one should be pushed to the left child + ColumnRefPred::new(0).into_pred_node(), + ConstantPred::int32(5).into_pred_node(), + BinOpType::Eq, + ) + .into_pred_node(), + BinOpPred::new( + // This one should be pushed to the right child + ColumnRefPred::new(11).into_pred_node(), + ConstantPred::int32(6).into_pred_node(), + BinOpType::Eq, + ) + .into_pred_node(), + BinOpPred::new( + // This one stay in join condition + ColumnRefPred::new(2).into_pred_node(), + ColumnRefPred::new(8).into_pred_node(), + BinOpType::Eq, + ) + .into_pred_node(), + ], + ); + + let join = LogicalJoin::new( + scan1.into_plan_node(), + scan2.into_plan_node(), + join_cond.into_pred_node(), + super::JoinType::LeftOuter, + ); + + let plan = test_optimizer.optimize(join.into_plan_node()).unwrap(); + println!("{}", plan.explain_to_string(None)); + } + #[test] fn push_past_join_conjunction() { // Test pushing a complex filter past a join, where one clause can From 0e31d4b31b7ace1d620423cd91c5abe6d70e6870 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Mon, 6 Jan 2025 12:44:26 -0500 Subject: [PATCH 08/11] Add left_outer_join_split_filter_rule We eliminate another nested loop join in TPC-H Q13 Signed-off-by: Yuchen Liang --- optd-datafusion-repr/src/lib.rs | 2 ++ .../src/rules/filter_pushdown.rs | 3 +- .../tests/tpch/q13.planner.sql | 11 ++---- .../tests/utils/memo_dump.planner.sql | 34 +++++++++---------- 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index 1667b942..d335ca6b 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -105,6 +105,8 @@ impl DatafusionOptimizer { rule_wrappers.push(Arc::new(rules::FilterInnerJoinTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterSortTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterAggTransposeRule::new())); + // rule_wrappers.push(Arc::new(rules::InnerJoinSplitFilterRule::new())); + rule_wrappers.push(Arc::new(rules::LeftOuterJoinSplitFilterRule::new())); rule_wrappers.push(Arc::new(rules::HashJoinInnerRule::new())); rule_wrappers.push(Arc::new(rules::HashJoinLeftOuterRule::new())); rule_wrappers.push(Arc::new(rules::HashJoinLeftMarkRule::new())); diff --git a/optd-datafusion-repr/src/rules/filter_pushdown.rs b/optd-datafusion-repr/src/rules/filter_pushdown.rs index 1abd7a6d..835552cd 100644 --- a/optd-datafusion-repr/src/rules/filter_pushdown.rs +++ b/optd-datafusion-repr/src/rules/filter_pushdown.rs @@ -24,7 +24,7 @@ use optd_core::optimizer::Optimizer; use optd_core::rules::{Rule, RuleMatcher}; use super::filter::simplify_log_expr; -use super::macros::{define_rule, define_rule_discriminant}; +use super::macros::define_rule; use crate::plan_nodes::{ ArcDfPlanNode, ArcDfPredNode, ColumnRefPred, DfNodeType, DfPredType, DfReprPlanNode, DfReprPredNode, JoinType, ListPred, LogOpPred, LogOpType, LogicalAgg, LogicalFilter, @@ -177,7 +177,6 @@ fn apply_join_split_filter( optimizer: &impl Optimizer, binding: ArcDfPlanNode, ) -> Vec> { - println!("Applying JoinSplitFilterRule"); let join = LogicalJoin::from_plan_node(binding).unwrap(); let left_child = join.left(); let right_child = join.right(); diff --git a/optd-sqlplannertest/tests/tpch/q13.planner.sql b/optd-sqlplannertest/tests/tpch/q13.planner.sql index 5451f497..e1765060 100644 --- a/optd-sqlplannertest/tests/tpch/q13.planner.sql +++ b/optd-sqlplannertest/tests/tpch/q13.planner.sql @@ -61,14 +61,9 @@ PhysicalSort ├── aggrs:Agg(Count) │ └── [ #8 ] ├── groups: [ #0 ] - └── PhysicalNestedLoopJoin - ├── join_type: LeftOuter - ├── cond:And - │ ├── Eq - │ │ ├── #0 - │ │ └── #9 - │ └── Like { expr: #16, pattern: "%special%requests%", negated: true, case_insensitive: false } + └── PhysicalHashJoin { join_type: LeftOuter, left_keys: [ #0 ], right_keys: [ #1 ] } ├── PhysicalScan { table: customer } - └── PhysicalScan { table: orders } + └── PhysicalFilter { cond: Like { expr: #8, pattern: "%special%requests%", negated: true, case_insensitive: false } } + └── PhysicalScan { table: orders } */ diff --git a/optd-sqlplannertest/tests/utils/memo_dump.planner.sql b/optd-sqlplannertest/tests/utils/memo_dump.planner.sql index 2d17f680..8434c0c4 100644 --- a/optd-sqlplannertest/tests/utils/memo_dump.planner.sql +++ b/optd-sqlplannertest/tests/utils/memo_dump.planner.sql @@ -38,13 +38,13 @@ group_id=!6 winner=21 weighted_cost=1003000 cost={compute=1001000,io=2000} stat= P4=(Constant(Bool) true) P32=(List (ColumnRef 2(u64)) (ColumnRef 3(u64)) (ColumnRef 0(u64)) (ColumnRef 1(u64))) P37=(List (ColumnRef 0(u64)) (ColumnRef 1(u64)) (ColumnRef 2(u64)) (ColumnRef 3(u64))) - step=1/1 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=5 rule_id=19 + step=1/1 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=5 rule_id=22 step=1/5 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=21 rule_id=2 step=1/8 decide_winner group_id=!6 proposed_winner_expr=21 children_winner_exprs=[23,23] total_weighted_cost=1003000 - step=2/9 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=42 rule_id=13 - step=2/10 apply_rule group_id=!6 applied_expr_id=42 produced_expr_id=49 rule_id=17 - step=2/11 apply_rule group_id=!6 applied_expr_id=49 produced_expr_id=42 rule_id=17 - step=2/12 apply_rule group_id=!6 applied_expr_id=49 produced_expr_id=49 rule_id=17 + step=2/9 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=42 rule_id=16 + step=2/10 apply_rule group_id=!6 applied_expr_id=42 produced_expr_id=49 rule_id=20 + step=2/11 apply_rule group_id=!6 applied_expr_id=49 produced_expr_id=42 rule_id=20 + step=2/12 apply_rule group_id=!6 applied_expr_id=49 produced_expr_id=49 rule_id=20 group_id=!12 winner=17 weighted_cost=11908.75477931522 cost={compute=9908.75477931522,io=2000} stat={row_cnt=1000} | (PhysicalSort !31 P10) schema=[t1v1:Int32, t1v2:Int32, t1v1:Int32, t1v2:Int32] column_ref=[t1.0, t1.1, t1.0, t1.1] @@ -79,26 +79,26 @@ group_id=!31 winner=28 weighted_cost=5000 cost={compute=3000,io=2000} stat={row_ step=1/4 apply_rule group_id=!9 applied_expr_id=8 produced_expr_id=19 rule_id=3 step=1/9 decide_winner group_id=!9 proposed_winner_expr=19 children_winner_exprs=[21] total_weighted_cost=1033000 step=1/10 apply_rule group_id=!9 applied_expr_id=15 produced_expr_id=25 rule_id=2 - step=1/11 apply_rule group_id=!9 applied_expr_id=15 produced_expr_id=28 rule_id=12 + step=1/11 apply_rule group_id=!9 applied_expr_id=15 produced_expr_id=28 rule_id=13 step=1/12 decide_winner group_id=!9 proposed_winner_expr=28 children_winner_exprs=[23,23] total_weighted_cost=5000 step=2/2 decide_winner group_id=!9 proposed_winner_expr=28 children_winner_exprs=[23,23] total_weighted_cost=5000 - step=2/3 apply_rule group_id=!9 applied_expr_id=15 produced_expr_id=33 rule_id=13 - step=2/4 apply_rule group_id=!31 applied_expr_id=30 produced_expr_id=36 rule_id=13 - step=2/5 apply_rule group_id=!31 applied_expr_id=36 produced_expr_id=38 rule_id=17 - step=2/6 apply_rule group_id=!31 applied_expr_id=38 produced_expr_id=36 rule_id=17 - step=2/7 apply_rule group_id=!31 applied_expr_id=38 produced_expr_id=38 rule_id=17 - step=2/8 apply_rule group_id=!31 applied_expr_id=36 produced_expr_id=45 rule_id=21 + step=2/3 apply_rule group_id=!9 applied_expr_id=15 produced_expr_id=33 rule_id=16 + step=2/4 apply_rule group_id=!31 applied_expr_id=30 produced_expr_id=36 rule_id=16 + step=2/5 apply_rule group_id=!31 applied_expr_id=36 produced_expr_id=38 rule_id=20 + step=2/6 apply_rule group_id=!31 applied_expr_id=38 produced_expr_id=36 rule_id=20 + step=2/7 apply_rule group_id=!31 applied_expr_id=38 produced_expr_id=38 rule_id=20 + step=2/8 apply_rule group_id=!31 applied_expr_id=36 produced_expr_id=45 rule_id=24 step=2/13 apply_rule group_id=!31 applied_expr_id=45 produced_expr_id=36 rule_id=8 step=2/14 apply_rule group_id=!31 applied_expr_id=45 produced_expr_id=38 rule_id=8 step=2/15 apply_rule group_id=!31 applied_expr_id=45 produced_expr_id=30 rule_id=9 step=2/16 apply_rule group_id=!9 applied_expr_id=33 produced_expr_id=58 rule_id=1 step=2/17 apply_rule group_id=!31 applied_expr_id=30 produced_expr_id=60 rule_id=2 - step=2/18 apply_rule group_id=!31 applied_expr_id=30 produced_expr_id=28 rule_id=12 + step=2/18 apply_rule group_id=!31 applied_expr_id=30 produced_expr_id=28 rule_id=13 step=2/19 decide_winner group_id=!31 proposed_winner_expr=28 children_winner_exprs=[23,23] total_weighted_cost=5000 - step=2/20 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=38 rule_id=17 - step=2/21 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=33 rule_id=17 - step=2/22 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=45 rule_id=21 - step=2/23 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=8 rule_id=21 + step=2/20 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=38 rule_id=20 + step=2/21 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=33 rule_id=20 + step=2/22 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=45 rule_id=24 + step=2/23 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=8 rule_id=24 step=2/24 apply_rule group_id=!31 applied_expr_id=36 produced_expr_id=58 rule_id=1 step=2/25 apply_rule group_id=!31 applied_expr_id=38 produced_expr_id=71 rule_id=1 step=2/26 apply_rule group_id=!31 applied_expr_id=45 produced_expr_id=73 rule_id=3 From 5cd56d71ce0394789599a358b85d9d644db0fd17 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Mon, 6 Jan 2025 12:58:58 -0500 Subject: [PATCH 09/11] add JoinInnerSplitFilterRule Signed-off-by: Yuchen Liang --- optd-datafusion-repr/src/lib.rs | 4 +- .../src/rules/filter_pushdown.rs | 16 +- .../tests/utils/memo_dump.planner.sql | 141 +++++++++--------- 3 files changed, 83 insertions(+), 78 deletions(-) diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index d335ca6b..afda4dc8 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -105,8 +105,8 @@ impl DatafusionOptimizer { rule_wrappers.push(Arc::new(rules::FilterInnerJoinTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterSortTransposeRule::new())); rule_wrappers.push(Arc::new(rules::FilterAggTransposeRule::new())); - // rule_wrappers.push(Arc::new(rules::InnerJoinSplitFilterRule::new())); - rule_wrappers.push(Arc::new(rules::LeftOuterJoinSplitFilterRule::new())); + rule_wrappers.push(Arc::new(rules::JoinInnerSplitFilterRule::new())); + rule_wrappers.push(Arc::new(rules::JoinLeftOuterSplitFilterRule::new())); rule_wrappers.push(Arc::new(rules::HashJoinInnerRule::new())); rule_wrappers.push(Arc::new(rules::HashJoinLeftOuterRule::new())); rule_wrappers.push(Arc::new(rules::HashJoinLeftMarkRule::new())); diff --git a/optd-datafusion-repr/src/rules/filter_pushdown.rs b/optd-datafusion-repr/src/rules/filter_pushdown.rs index 835552cd..fc4f5fa4 100644 --- a/optd-datafusion-repr/src/rules/filter_pushdown.rs +++ b/optd-datafusion-repr/src/rules/filter_pushdown.rs @@ -17,6 +17,7 @@ use core::panic; use std::collections::HashSet; +use std::ops::Not; use std::vec; use optd_core::nodes::PlanNodeOrGroup; @@ -162,13 +163,13 @@ fn apply_filter_merge( // Rule to split predicates in a join condition into those that can be pushed down as filters. define_rule!( - InnerJoinSplitFilterRule, + JoinInnerSplitFilterRule, apply_join_split_filter, (Join(JoinType::Inner), child_a, child_b) ); define_rule!( - LeftOuterJoinSplitFilterRule, + JoinLeftOuterSplitFilterRule, apply_join_split_filter, (Join(JoinType::LeftOuter), child_a, child_b) ); @@ -177,7 +178,7 @@ fn apply_join_split_filter( optimizer: &impl Optimizer, binding: ArcDfPlanNode, ) -> Vec> { - let join = LogicalJoin::from_plan_node(binding).unwrap(); + let join = LogicalJoin::from_plan_node(binding.clone()).unwrap(); let left_child = join.left(); let right_child = join.right(); let join_cond = join.cond(); @@ -207,9 +208,10 @@ fn apply_join_split_filter( }) .unwrap(), ), - JoinCondDependency::Both => keep_conds.push(expr), - JoinCondDependency::None => { - unreachable!("join condition should always involve at least one relation"); + JoinCondDependency::Both | JoinCondDependency::None => { + // JoinCondDependency::None could happy if there are no column refs in the predicate. + // e.g. true for CrossJoin. + keep_conds.push(expr); } } }; @@ -524,7 +526,7 @@ mod tests { #[test] fn join_split_filter() { - let mut test_optimizer = new_test_optimizer(Arc::new(LeftOuterJoinSplitFilterRule::new())); + let mut test_optimizer = new_test_optimizer(Arc::new(JoinLeftOuterSplitFilterRule::new())); let scan1 = LogicalScan::new("customer".into()); diff --git a/optd-sqlplannertest/tests/utils/memo_dump.planner.sql b/optd-sqlplannertest/tests/utils/memo_dump.planner.sql index 8434c0c4..fdb05e68 100644 --- a/optd-sqlplannertest/tests/utils/memo_dump.planner.sql +++ b/optd-sqlplannertest/tests/utils/memo_dump.planner.sql @@ -19,89 +19,92 @@ PhysicalSort └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } ├── PhysicalScan { table: t1 } └── PhysicalScan { table: t1 } -group_id=!2 winner=23 weighted_cost=1000 cost={compute=0,io=1000} stat={row_cnt=1000} | (PhysicalScan P0) +group_id=!2 winner=25 weighted_cost=1000 cost={compute=0,io=1000} stat={row_cnt=1000} | (PhysicalScan P0) schema=[t1v1:Int32, t1v2:Int32] column_ref=[t1.0, t1.1] expr_id=1 | (Scan P0) - expr_id=23 | (PhysicalScan P0) + expr_id=25 | (PhysicalScan P0) P0=(Constant(Utf8String) "t1") - step=1/6 apply_rule group_id=!2 applied_expr_id=1 produced_expr_id=23 rule_id=0 - step=1/7 decide_winner group_id=!2 proposed_winner_expr=23 children_winner_exprs=[] total_weighted_cost=1000 - step=2/1 decide_winner group_id=!2 proposed_winner_expr=23 children_winner_exprs=[] total_weighted_cost=1000 -group_id=!6 winner=21 weighted_cost=1003000 cost={compute=1001000,io=2000} stat={row_cnt=10000} | (PhysicalNestedLoopJoin(Inner) !2 !2 P4) + step=1/8 apply_rule group_id=!2 applied_expr_id=1 produced_expr_id=25 rule_id=0 + step=1/9 decide_winner group_id=!2 proposed_winner_expr=25 children_winner_exprs=[] total_weighted_cost=1000 + step=2/1 decide_winner group_id=!2 proposed_winner_expr=25 children_winner_exprs=[] total_weighted_cost=1000 +group_id=!6 winner=23 weighted_cost=1003000 cost={compute=1001000,io=2000} stat={row_cnt=10000} | (PhysicalNestedLoopJoin(Inner) !2 !2 P4) schema=[t1v1:Int32, t1v2:Int32, t1v1:Int32, t1v2:Int32] column_ref=[t1.0, t1.1, t1.0, t1.1] expr_id=5 | (Join(Inner) !2 !2 P4) - expr_id=21 | (PhysicalNestedLoopJoin(Inner) !2 !2 P4) - expr_id=42 | (Projection !6 P32) - expr_id=49 | (Projection !6 P37) + expr_id=23 | (PhysicalNestedLoopJoin(Inner) !2 !2 P4) + expr_id=45 | (Projection !6 P34) + expr_id=52 | (Projection !6 P40) P4=(Constant(Bool) true) - P32=(List (ColumnRef 2(u64)) (ColumnRef 3(u64)) (ColumnRef 0(u64)) (ColumnRef 1(u64))) - P37=(List (ColumnRef 0(u64)) (ColumnRef 1(u64)) (ColumnRef 2(u64)) (ColumnRef 3(u64))) - step=1/1 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=5 rule_id=22 - step=1/5 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=21 rule_id=2 - step=1/8 decide_winner group_id=!6 proposed_winner_expr=21 children_winner_exprs=[23,23] total_weighted_cost=1003000 - step=2/9 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=42 rule_id=16 - step=2/10 apply_rule group_id=!6 applied_expr_id=42 produced_expr_id=49 rule_id=20 - step=2/11 apply_rule group_id=!6 applied_expr_id=49 produced_expr_id=42 rule_id=20 - step=2/12 apply_rule group_id=!6 applied_expr_id=49 produced_expr_id=49 rule_id=20 -group_id=!12 winner=17 weighted_cost=11908.75477931522 cost={compute=9908.75477931522,io=2000} stat={row_cnt=1000} | (PhysicalSort !31 P10) + P34=(List (ColumnRef 2(u64)) (ColumnRef 3(u64)) (ColumnRef 0(u64)) (ColumnRef 1(u64))) + P40=(List (ColumnRef 0(u64)) (ColumnRef 1(u64)) (ColumnRef 2(u64)) (ColumnRef 3(u64))) + step=1/1 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=5 rule_id=12 + step=1/2 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=5 rule_id=23 + step=1/7 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=23 rule_id=2 + step=1/10 decide_winner group_id=!6 proposed_winner_expr=23 children_winner_exprs=[25,25] total_weighted_cost=1003000 + step=2/10 apply_rule group_id=!6 applied_expr_id=5 produced_expr_id=45 rule_id=17 + step=2/11 apply_rule group_id=!6 applied_expr_id=45 produced_expr_id=52 rule_id=21 + step=2/12 apply_rule group_id=!6 applied_expr_id=52 produced_expr_id=45 rule_id=21 + step=2/13 apply_rule group_id=!6 applied_expr_id=52 produced_expr_id=52 rule_id=21 +group_id=!12 winner=19 weighted_cost=11908.75477931522 cost={compute=9908.75477931522,io=2000} stat={row_cnt=1000} | (PhysicalSort !33 P10) schema=[t1v1:Int32, t1v2:Int32, t1v1:Int32, t1v2:Int32] column_ref=[t1.0, t1.1, t1.0, t1.1] - expr_id=11 | (Sort !31 P10) - expr_id=17 | (PhysicalSort !31 P10) + expr_id=11 | (Sort !33 P10) + expr_id=19 | (PhysicalSort !33 P10) P10=(List (SortOrder(Asc) (ColumnRef 0(u64)))) - step=1/3 apply_rule group_id=!12 applied_expr_id=11 produced_expr_id=17 rule_id=4 - step=1/13 decide_winner group_id=!12 proposed_winner_expr=17 children_winner_exprs=[28] total_weighted_cost=11908.75477931522 - step=2/28 decide_winner group_id=!12 proposed_winner_expr=17 children_winner_exprs=[28] total_weighted_cost=11908.75477931522 -group_id=!31 winner=28 weighted_cost=5000 cost={compute=3000,io=2000} stat={row_cnt=1000} | (PhysicalHashJoin(Inner) !2 !2 P26 P26) + step=1/5 apply_rule group_id=!12 applied_expr_id=11 produced_expr_id=19 rule_id=4 + step=1/15 decide_winner group_id=!12 proposed_winner_expr=19 children_winner_exprs=[30] total_weighted_cost=11908.75477931522 + step=2/29 decide_winner group_id=!12 proposed_winner_expr=19 children_winner_exprs=[30] total_weighted_cost=11908.75477931522 +group_id=!33 winner=30 weighted_cost=5000 cost={compute=3000,io=2000} stat={row_cnt=1000} | (PhysicalHashJoin(Inner) !2 !2 P28 P28) schema=[t1v1:Int32, t1v2:Int32, t1v1:Int32, t1v2:Int32] column_ref=[t1.0, t1.1, t1.0, t1.1] expr_id=8 | (Filter !6 P7) - expr_id=15 | (Join(Inner) !2 !2 P7) - expr_id=19 | (PhysicalFilter !6 P7) - expr_id=25 | (PhysicalNestedLoopJoin(Inner) !2 !2 P7) - expr_id=28 | (PhysicalHashJoin(Inner) !2 !2 P26 P26) - expr_id=30 | (Join(Inner) !2 !2 P29) - expr_id=33 | (Projection !31 P32) - expr_id=38 | (Projection !31 P37) - expr_id=45 | (Filter !6 P29) - expr_id=58 | (PhysicalProjection !31 P32) - expr_id=60 | (PhysicalNestedLoopJoin(Inner) !2 !2 P29) - expr_id=71 | (PhysicalProjection !31 P37) - expr_id=73 | (PhysicalFilter !6 P29) + expr_id=16 | (Join(Inner) !2 !2 P7) + expr_id=21 | (PhysicalFilter !6 P7) + expr_id=27 | (PhysicalNestedLoopJoin(Inner) !2 !2 P7) + expr_id=30 | (PhysicalHashJoin(Inner) !2 !2 P28 P28) + expr_id=32 | (Join(Inner) !2 !2 P31) + expr_id=35 | (Projection !33 P34) + expr_id=41 | (Projection !33 P40) + expr_id=48 | (Filter !6 P31) + expr_id=61 | (PhysicalProjection !33 P34) + expr_id=63 | (PhysicalNestedLoopJoin(Inner) !2 !2 P31) + expr_id=74 | (PhysicalProjection !33 P40) + expr_id=76 | (PhysicalFilter !6 P31) P7=(BinOp(Eq) (ColumnRef 0(u64)) (ColumnRef 2(u64))) - P26=(List (ColumnRef 0(u64))) - P29=(BinOp(Eq) (ColumnRef 2(u64)) (ColumnRef 0(u64))) - P32=(List (ColumnRef 2(u64)) (ColumnRef 3(u64)) (ColumnRef 0(u64)) (ColumnRef 1(u64))) - P37=(List (ColumnRef 0(u64)) (ColumnRef 1(u64)) (ColumnRef 2(u64)) (ColumnRef 3(u64))) - step=1/2 apply_rule group_id=!9 applied_expr_id=8 produced_expr_id=15 rule_id=9 - step=1/4 apply_rule group_id=!9 applied_expr_id=8 produced_expr_id=19 rule_id=3 - step=1/9 decide_winner group_id=!9 proposed_winner_expr=19 children_winner_exprs=[21] total_weighted_cost=1033000 - step=1/10 apply_rule group_id=!9 applied_expr_id=15 produced_expr_id=25 rule_id=2 - step=1/11 apply_rule group_id=!9 applied_expr_id=15 produced_expr_id=28 rule_id=13 - step=1/12 decide_winner group_id=!9 proposed_winner_expr=28 children_winner_exprs=[23,23] total_weighted_cost=5000 - step=2/2 decide_winner group_id=!9 proposed_winner_expr=28 children_winner_exprs=[23,23] total_weighted_cost=5000 - step=2/3 apply_rule group_id=!9 applied_expr_id=15 produced_expr_id=33 rule_id=16 - step=2/4 apply_rule group_id=!31 applied_expr_id=30 produced_expr_id=36 rule_id=16 - step=2/5 apply_rule group_id=!31 applied_expr_id=36 produced_expr_id=38 rule_id=20 - step=2/6 apply_rule group_id=!31 applied_expr_id=38 produced_expr_id=36 rule_id=20 - step=2/7 apply_rule group_id=!31 applied_expr_id=38 produced_expr_id=38 rule_id=20 - step=2/8 apply_rule group_id=!31 applied_expr_id=36 produced_expr_id=45 rule_id=24 - step=2/13 apply_rule group_id=!31 applied_expr_id=45 produced_expr_id=36 rule_id=8 - step=2/14 apply_rule group_id=!31 applied_expr_id=45 produced_expr_id=38 rule_id=8 - step=2/15 apply_rule group_id=!31 applied_expr_id=45 produced_expr_id=30 rule_id=9 - step=2/16 apply_rule group_id=!9 applied_expr_id=33 produced_expr_id=58 rule_id=1 - step=2/17 apply_rule group_id=!31 applied_expr_id=30 produced_expr_id=60 rule_id=2 - step=2/18 apply_rule group_id=!31 applied_expr_id=30 produced_expr_id=28 rule_id=13 - step=2/19 decide_winner group_id=!31 proposed_winner_expr=28 children_winner_exprs=[23,23] total_weighted_cost=5000 - step=2/20 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=38 rule_id=20 - step=2/21 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=33 rule_id=20 - step=2/22 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=45 rule_id=24 - step=2/23 apply_rule group_id=!31 applied_expr_id=33 produced_expr_id=8 rule_id=24 - step=2/24 apply_rule group_id=!31 applied_expr_id=36 produced_expr_id=58 rule_id=1 - step=2/25 apply_rule group_id=!31 applied_expr_id=38 produced_expr_id=71 rule_id=1 - step=2/26 apply_rule group_id=!31 applied_expr_id=45 produced_expr_id=73 rule_id=3 - step=2/27 decide_winner group_id=!9 proposed_winner_expr=58 children_winner_exprs=[28] total_weighted_cost=10000 + P28=(List (ColumnRef 0(u64))) + P31=(BinOp(Eq) (ColumnRef 2(u64)) (ColumnRef 0(u64))) + P34=(List (ColumnRef 2(u64)) (ColumnRef 3(u64)) (ColumnRef 0(u64)) (ColumnRef 1(u64))) + P40=(List (ColumnRef 0(u64)) (ColumnRef 1(u64)) (ColumnRef 2(u64)) (ColumnRef 3(u64))) + step=1/3 apply_rule group_id=!9 applied_expr_id=8 produced_expr_id=16 rule_id=9 + step=1/4 apply_rule group_id=!9 applied_expr_id=16 produced_expr_id=16 rule_id=12 + step=1/6 apply_rule group_id=!9 applied_expr_id=8 produced_expr_id=21 rule_id=3 + step=1/11 decide_winner group_id=!9 proposed_winner_expr=21 children_winner_exprs=[23] total_weighted_cost=1033000 + step=1/12 apply_rule group_id=!9 applied_expr_id=16 produced_expr_id=27 rule_id=2 + step=1/13 apply_rule group_id=!9 applied_expr_id=16 produced_expr_id=30 rule_id=14 + step=1/14 decide_winner group_id=!9 proposed_winner_expr=30 children_winner_exprs=[25,25] total_weighted_cost=5000 + step=2/2 decide_winner group_id=!9 proposed_winner_expr=30 children_winner_exprs=[25,25] total_weighted_cost=5000 + step=2/3 apply_rule group_id=!9 applied_expr_id=16 produced_expr_id=35 rule_id=17 + step=2/4 apply_rule group_id=!33 applied_expr_id=32 produced_expr_id=32 rule_id=12 + step=2/5 apply_rule group_id=!33 applied_expr_id=32 produced_expr_id=39 rule_id=17 + step=2/6 apply_rule group_id=!33 applied_expr_id=39 produced_expr_id=41 rule_id=21 + step=2/7 apply_rule group_id=!33 applied_expr_id=41 produced_expr_id=39 rule_id=21 + step=2/8 apply_rule group_id=!33 applied_expr_id=41 produced_expr_id=41 rule_id=21 + step=2/9 apply_rule group_id=!33 applied_expr_id=39 produced_expr_id=48 rule_id=25 + step=2/14 apply_rule group_id=!33 applied_expr_id=48 produced_expr_id=39 rule_id=8 + step=2/15 apply_rule group_id=!33 applied_expr_id=48 produced_expr_id=41 rule_id=8 + step=2/16 apply_rule group_id=!33 applied_expr_id=48 produced_expr_id=32 rule_id=9 + step=2/17 apply_rule group_id=!9 applied_expr_id=35 produced_expr_id=61 rule_id=1 + step=2/18 apply_rule group_id=!33 applied_expr_id=32 produced_expr_id=63 rule_id=2 + step=2/19 apply_rule group_id=!33 applied_expr_id=32 produced_expr_id=30 rule_id=14 + step=2/20 decide_winner group_id=!33 proposed_winner_expr=30 children_winner_exprs=[25,25] total_weighted_cost=5000 + step=2/21 apply_rule group_id=!33 applied_expr_id=35 produced_expr_id=41 rule_id=21 + step=2/22 apply_rule group_id=!33 applied_expr_id=35 produced_expr_id=35 rule_id=21 + step=2/23 apply_rule group_id=!33 applied_expr_id=35 produced_expr_id=48 rule_id=25 + step=2/24 apply_rule group_id=!33 applied_expr_id=35 produced_expr_id=8 rule_id=25 + step=2/25 apply_rule group_id=!33 applied_expr_id=39 produced_expr_id=61 rule_id=1 + step=2/26 apply_rule group_id=!33 applied_expr_id=41 produced_expr_id=74 rule_id=1 + step=2/27 apply_rule group_id=!33 applied_expr_id=48 produced_expr_id=76 rule_id=3 + step=2/28 decide_winner group_id=!9 proposed_winner_expr=61 children_winner_exprs=[30] total_weighted_cost=10000 */ From dede91b7f81a9e702902a2ed9ca0d3ac68e4ad39 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Mon, 6 Jan 2025 13:25:05 -0500 Subject: [PATCH 10/11] unit test for the join-split-filter rule Signed-off-by: Yuchen Liang --- .../src/rules/filter_pushdown.rs | 50 +++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/optd-datafusion-repr/src/rules/filter_pushdown.rs b/optd-datafusion-repr/src/rules/filter_pushdown.rs index fc4f5fa4..b7fd41f0 100644 --- a/optd-datafusion-repr/src/rules/filter_pushdown.rs +++ b/optd-datafusion-repr/src/rules/filter_pushdown.rs @@ -17,7 +17,6 @@ use core::panic; use std::collections::HashSet; -use std::ops::Not; use std::vec; use optd_core::nodes::PlanNodeOrGroup; @@ -550,12 +549,14 @@ mod tests { ) .into_pred_node(), BinOpPred::new( - // This one stay in join condition + // This one stays in the join condition. ColumnRefPred::new(2).into_pred_node(), ColumnRefPred::new(8).into_pred_node(), BinOpType::Eq, ) .into_pred_node(), + // This one stays in the join condition. + ConstantPred::bool(true).into_pred_node(), ], ); @@ -567,7 +568,50 @@ mod tests { ); let plan = test_optimizer.optimize(join.into_plan_node()).unwrap(); - println!("{}", plan.explain_to_string(None)); + let join = LogicalJoin::from_plan_node(plan.clone()).unwrap(); + + assert_eq!(join.join_type(), &JoinType::LeftOuter); + + { + // Examine join conditions. + let join_conds = LogOpPred::from_pred_node(join.cond()).unwrap(); + assert!(matches!(join_conds.op_type(), LogOpType::And)); + assert_eq!(join_conds.children().len(), 2); + let bin_op_with_both_ref = + BinOpPred::from_pred_node(join_conds.children()[0].clone()).unwrap(); + assert!(matches!(bin_op_with_both_ref.op_type(), BinOpType::Eq)); + let col_2 = ColumnRefPred::from_pred_node(bin_op_with_both_ref.left_child()).unwrap(); + let col_8 = ColumnRefPred::from_pred_node(bin_op_with_both_ref.right_child()).unwrap(); + assert_eq!(col_2.index(), 2); + assert_eq!(col_8.index(), 8); + let constant_true = + ConstantPred::from_pred_node(join_conds.children()[1].clone()).unwrap(); + assert_eq!(constant_true.value().as_bool(), true); + } + + { + // Examine left child filter + condition + let filter_left = + LogicalFilter::from_plan_node(join.left().unwrap_plan_node()).unwrap(); + let bin_op = BinOpPred::from_pred_node(filter_left.cond()).unwrap(); + assert!(matches!(bin_op.op_type(), BinOpType::Eq)); + let col = ColumnRefPred::from_pred_node(bin_op.left_child()).unwrap(); + let constant = ConstantPred::from_pred_node(bin_op.right_child()).unwrap(); + assert_eq!(col.index(), 0); + assert_eq!(constant.value().as_i32(), 5); + } + + { + // Examine right child filter + condition + let filter_right = + LogicalFilter::from_plan_node(join.right().unwrap_plan_node()).unwrap(); + let bin_op = BinOpPred::from_pred_node(filter_right.cond()).unwrap(); + assert!(matches!(bin_op.op_type(), BinOpType::Eq)); + let col = ColumnRefPred::from_pred_node(bin_op.left_child()).unwrap(); + let constant = ConstantPred::from_pred_node(bin_op.right_child()).unwrap(); + assert_eq!(col.index(), 3); + assert_eq!(constant.value().as_i32(), 6); + } } #[test] From 8f08b4b3523f95414a83328adec8eb9e006174bc Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Mon, 6 Jan 2025 13:41:55 -0500 Subject: [PATCH 11/11] fix clippy Signed-off-by: Yuchen Liang --- optd-datafusion-repr/src/rules/filter_pushdown.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/optd-datafusion-repr/src/rules/filter_pushdown.rs b/optd-datafusion-repr/src/rules/filter_pushdown.rs index b7fd41f0..b24d083a 100644 --- a/optd-datafusion-repr/src/rules/filter_pushdown.rs +++ b/optd-datafusion-repr/src/rules/filter_pushdown.rs @@ -450,6 +450,8 @@ fn apply_filter_agg_transpose( mod tests { use std::sync::Arc; + use optd_core::nodes::Value; + use super::*; use crate::plan_nodes::{BinOpPred, BinOpType, ConstantPred, LogicalScan}; use crate::testing::new_test_optimizer; @@ -586,7 +588,7 @@ mod tests { assert_eq!(col_8.index(), 8); let constant_true = ConstantPred::from_pred_node(join_conds.children()[1].clone()).unwrap(); - assert_eq!(constant_true.value().as_bool(), true); + assert_eq!(constant_true.value(), Value::Bool(true)); } {