Skip to content

Commit

Permalink
Stop copying LogicalPlan and Exprs in ReplaceDistinctWithAggregate (a…
Browse files Browse the repository at this point in the history
…pache#10460)

* patch: implement rewrite for RDWA

Signed-off-by: cailue <[email protected]>

* refactor: rewrite replace_distinct_aggregate

Signed-off-by: 蔡略 <[email protected]>

* patch: recorrect aggr_expr

Signed-off-by: 蔡略 <[email protected]>

* Update datafusion/optimizer/src/replace_distinct_aggregate.rs

---------

Signed-off-by: cailue <[email protected]>
Signed-off-by: 蔡略 <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
ClSlaid and alamb authored May 13, 2024
1 parent 9cc981b commit a2eca29
Showing 1 changed file with 49 additions and 24 deletions.
73 changes: 49 additions & 24 deletions datafusion/optimizer/src/replace_distinct_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
use crate::optimizer::{ApplyOrder, ApplyOrder::BottomUp};
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::{Column, Result};
use datafusion_common::tree_node::Transformed;
use datafusion_common::{internal_err, Column, Result};
use datafusion_expr::expr_rewriter::normalize_cols;
use datafusion_expr::utils::expand_wildcard;
use datafusion_expr::{
aggregate_function::AggregateFunction as AggregateFunctionFunc, col,
Expand Down Expand Up @@ -66,20 +68,24 @@ impl ReplaceDistinctWithAggregate {
}

impl OptimizerRule for ReplaceDistinctWithAggregate {
fn try_optimize(
fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: &LogicalPlan,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Distinct(Distinct::All(input)) => {
let group_expr = expand_wildcard(input.schema(), input, None)?;
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
let group_expr = expand_wildcard(input.schema(), &input, None)?;
let aggr_plan = LogicalPlan::Aggregate(Aggregate::try_new(
input,
group_expr,
vec![],
)?);
Ok(Some(aggregate))
Ok(Transformed::yes(aggr_plan))
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
select_expr,
Expand All @@ -88,13 +94,15 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
input,
schema,
})) => {
let expr_cnt = on_expr.len();

// Construct the aggregation expression to be used to fetch the selected expressions.
let aggr_expr = select_expr
.iter()
.into_iter()
.map(|e| {
Expr::AggregateFunction(AggregateFunction::new(
AggregateFunctionFunc::FirstValue,
vec![e.clone()],
vec![e],
false,
None,
sort_expr.clone(),
Expand All @@ -103,45 +111,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
})
.collect::<Vec<Expr>>();

let aggr_expr = normalize_cols(aggr_expr, input.as_ref())?;
let group_expr = normalize_cols(on_expr, input.as_ref())?;

// Build the aggregation plan
let plan = LogicalPlanBuilder::from(input.as_ref().clone())
.aggregate(on_expr.clone(), aggr_expr.to_vec())?
.build()?;
let plan = LogicalPlan::Aggregate(Aggregate::try_new(
input, group_expr, aggr_expr,
)?);
// TODO use LogicalPlanBuilder directly rather than recreating the Aggregate
// when https://github.com/apache/datafusion/issues/10485 is available
let lpb = LogicalPlanBuilder::from(plan);

let plan = if let Some(sort_expr) = sort_expr {
let plan = if let Some(mut sort_expr) = sort_expr {
// While sort expressions were used in the `FIRST_VALUE` aggregation itself above,
// this on it's own isn't enough to guarantee the proper output order of the grouping
// (`ON`) expression, so we need to sort those as well.
LogicalPlanBuilder::from(plan)
.sort(sort_expr[..on_expr.len()].to_vec())?
.build()?

// truncate the sort_expr to the length of on_expr
sort_expr.truncate(expr_cnt);

lpb.sort(sort_expr)?.build()?
} else {
plan
lpb.build()?
};

// Whereas the aggregation plan by default outputs both the grouping and the aggregation
// expressions, for `DISTINCT ON` we only need to emit the original selection expressions.

let project_exprs = plan
.schema()
.iter()
.skip(on_expr.len())
.skip(expr_cnt)
.zip(schema.iter())
.map(|((new_qualifier, new_field), (old_qualifier, old_field))| {
Ok(col(Column::from((new_qualifier, new_field)))
.alias_qualified(old_qualifier.cloned(), old_field.name()))
col(Column::from((new_qualifier, new_field)))
.alias_qualified(old_qualifier.cloned(), old_field.name())
})
.collect::<Result<Vec<Expr>>>()?;
.collect::<Vec<Expr>>();

let plan = LogicalPlanBuilder::from(plan)
.project(project_exprs)?
.build()?;

Ok(Some(plan))
Ok(Transformed::yes(plan))
}
_ => Ok(None),
_ => Ok(Transformed::no(plan)),
}
}

fn try_optimize(
&self,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called ReplaceDistinctWithAggregate::rewrite")
}

fn name(&self) -> &str {
"replace_distinct_aggregate"
}
Expand Down

0 comments on commit a2eca29

Please sign in to comment.