Skip to content

Commit

Permalink
Merge commit '952c98ecc4800df620e97b06463ed7daff227fe6' into chunchun/update-df-apr-week-2
Browse files Browse the repository at this point in the history
  • Loading branch information
appletreeisyellow committed Apr 24, 2024
2 parents 4f283e1 + 952c98e commit 23c0c13
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 236 deletions.
2 changes: 1 addition & 1 deletion .github/actions/setup-rust-runtime/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ runs:
using: "composite"
steps:
- name: Run sccache-cache
uses: mozilla-actions/[email protected].3
uses: mozilla-actions/[email protected].4
- name: Configure runtime env
shell: bash
# do not produce debug symbols to keep memory usage down
Expand Down
79 changes: 28 additions & 51 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,14 +913,19 @@ impl LogicalPlan {
param_values: impl Into<ParamValues>,
) -> Result<LogicalPlan> {
let param_values = param_values.into();
match self {
LogicalPlan::Prepare(prepare_lp) => {
param_values.verify(&prepare_lp.data_types)?;
let input_plan = prepare_lp.input;
input_plan.replace_params_with_values(&param_values)
let plan_with_values = self.replace_params_with_values(&param_values)?;

// unwrap Prepare
Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values {
param_values.verify(&prepare_lp.data_types)?;
// try and take ownership of the input if is not shared, clone otherwise
match Arc::try_unwrap(prepare_lp.input) {
Ok(input) => input,
Err(arc_input) => arc_input.as_ref().clone(),
}
_ => self.replace_params_with_values(&param_values),
}
} else {
plan_with_values
})
}

/// Returns the maximum number of rows that this plan can output, if known.
Expand Down Expand Up @@ -1046,27 +1051,26 @@ impl LogicalPlan {
/// ...) replaced with corresponding values provided in
/// `params_values`
///
/// See [`Self::with_param_values`] for examples and usage
/// See [`Self::with_param_values`] for examples and usage with an owned
/// `ParamValues`
pub fn replace_params_with_values(
&self,
self,
param_values: &ParamValues,
) -> Result<LogicalPlan> {
let new_exprs = self
.expressions()
.into_iter()
.map(|e| {
let e = e.infer_placeholder_types(self.schema())?;
Self::replace_placeholders_with_values(e, param_values)
self.transform_up_with_subqueries(&|plan| {
let schema = plan.schema().clone();
plan.map_expressions(|e| {
e.infer_placeholder_types(&schema)?.transform_up(&|e| {
if let Expr::Placeholder(Placeholder { id, .. }) = e {
let value = param_values.get_placeholders_with_values(&id)?;
Ok(Transformed::yes(Expr::Literal(value)))
} else {
Ok(Transformed::no(e))
}
})
})
.collect::<Result<Vec<_>>>()?;

let new_inputs_with_values = self
.inputs()
.into_iter()
.map(|inp| inp.replace_params_with_values(param_values))
.collect::<Result<Vec<_>>>()?;

self.with_new_exprs(new_exprs, new_inputs_with_values)
})
.map(|res| res.data)
}

/// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
Expand Down Expand Up @@ -1099,33 +1103,6 @@ impl LogicalPlan {
.map(|_| param_types)
}

/// Return an Expr with all placeholders replaced with their
/// corresponding values provided in the params_values
///
/// Walks the expression tree bottom-up: `Placeholder` nodes are replaced by
/// `Literal`s looked up by id in `param_values`, and scalar subqueries are
/// recursed into so placeholders nested inside them are substituted too.
fn replace_placeholders_with_values(
expr: Expr,
param_values: &ParamValues,
) -> Result<Expr> {
expr.transform(&|expr| {
match &expr {
// A placeholder (e.g. `$1`): look up its concrete value by id.
// Fails if `param_values` has no value for this id.
Expr::Placeholder(Placeholder { id, .. }) => {
let value = param_values.get_placeholders_with_values(id)?;
// Replace the placeholder with the value
Ok(Transformed::yes(Expr::Literal(value)))
}
// A scalar subquery embeds a whole LogicalPlan; recurse into it
// so placeholders inside the subquery are also replaced.
Expr::ScalarSubquery(qry) => {
let subquery =
Arc::new(qry.subquery.replace_params_with_values(param_values)?);
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns: qry.outer_ref_columns.clone(),
})))
}
// All other expressions are left unchanged
_ => Ok(Transformed::no(expr)),
}
})
.data()
}

// ------------
// Various implementations for printing out LogicalPlans
// ------------
Expand Down
166 changes: 44 additions & 122 deletions datafusion/optimizer/src/analyzer/function_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
use super::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DFSchema, Result};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::expr_rewriter::{rewrite_preserving_name, FunctionRewrite};

use crate::utils::NamePreserver;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::utils::merge_schema;
use datafusion_expr::{Expr, LogicalPlan, Subquery};
use datafusion_expr::LogicalPlan;
use std::sync::Arc;

/// Analyzer rule that invokes [`FunctionRewrite`]s on expressions
Expand All @@ -38,132 +39,53 @@ impl ApplyFunctionRewrites {
pub fn new(function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>) -> Self {
Self { function_rewrites }
}
}

impl AnalyzerRule for ApplyFunctionRewrites {
fn name(&self) -> &str {
"apply_function_rewrites"
}

fn analyze(&self, plan: LogicalPlan, options: &ConfigOptions) -> Result<LogicalPlan> {
analyze_internal(&plan, &self.function_rewrites, options)
}
}

fn analyze_internal(
plan: &LogicalPlan,
function_rewrites: &[Arc<dyn FunctionRewrite + Send + Sync>],
options: &ConfigOptions,
) -> Result<LogicalPlan> {
// optimize child plans first
let new_inputs = plan
.inputs()
.iter()
.map(|p| analyze_internal(p, function_rewrites, options))
.collect::<Result<Vec<_>>>()?;
/// Rewrite a single plan, and all its expressions using the provided rewriters
fn rewrite_plan(
&self,
plan: LogicalPlan,
options: &ConfigOptions,
) -> Result<Transformed<LogicalPlan>> {
// get schema representing all available input fields. This is used for data type
// resolution only, so order does not matter here
let mut schema = merge_schema(plan.inputs());

if let LogicalPlan::TableScan(ts) = &plan {
let source_schema = DFSchema::try_from_qualified_schema(
ts.table_name.clone(),
&ts.source.schema(),
)?;
schema.merge(&source_schema);
}

// get schema representing all available input fields. This is used for data type
// resolution only, so order does not matter here
let mut schema = merge_schema(new_inputs.iter().collect());
let name_preserver = NamePreserver::new(&plan);

if let LogicalPlan::TableScan(ts) = plan {
let source_schema = DFSchema::try_from_qualified_schema(
ts.table_name.clone(),
&ts.source.schema(),
)?;
schema.merge(&source_schema);
}
plan.map_expressions(|expr| {
let original_name = name_preserver.save(&expr)?;

let mut expr_rewrite = OperatorToFunctionRewriter {
function_rewrites,
options,
schema: &schema,
};
// recursively transform the expression, applying the rewrites at each step
let result = expr.transform_up(&|expr| {
let mut result = Transformed::no(expr);
for rewriter in self.function_rewrites.iter() {
result = result.transform_data(|expr| {
rewriter.rewrite(expr, &schema, options)
})?;
}
Ok(result)
})?;

let new_expr = plan
.expressions()
.into_iter()
.map(|expr| {
// ensure names don't change:
// https://github.com/apache/arrow-datafusion/issues/3555
rewrite_preserving_name(expr, &mut expr_rewrite)
result.map_data(|expr| original_name.restore(expr))
})
.collect::<Result<Vec<_>>>()?;

plan.with_new_exprs(new_expr, new_inputs)
}

/// Apply the given [`FunctionRewrite`]s to the plan nested inside `subquery`,
/// returning the subquery with its inner plan rewritten.
///
/// The inner plan is re-analyzed via `analyze_internal` and the resulting
/// plan is re-wrapped in a fresh `Arc`.
fn rewrite_subquery(
mut subquery: Subquery,
function_rewrites: &[Arc<dyn FunctionRewrite + Send + Sync>],
options: &ConfigOptions,
) -> Result<Subquery> {
subquery.subquery = Arc::new(analyze_internal(
&subquery.subquery,
function_rewrites,
options,
)?);
Ok(subquery)
}

/// Expression rewriter that applies a set of [`FunctionRewrite`]s to each
/// expression node, borrowing the rewrites, config, and input schema from the
/// enclosing analysis pass.
struct OperatorToFunctionRewriter<'a> {
// rewrites to try on every expression node, in order
function_rewrites: &'a [Arc<dyn FunctionRewrite + Send + Sync>],
// session configuration consulted by the individual rewrites
options: &'a ConfigOptions,
// merged schema of the plan's inputs, used for data type resolution
schema: &'a DFSchema,
}
}

impl<'a> TreeNodeRewriter for OperatorToFunctionRewriter<'a> {
type Node = Expr;

fn f_up(&mut self, mut expr: Expr) -> Result<Transformed<Expr>> {
// apply transforms one by one
let mut transformed = false;
for rewriter in self.function_rewrites.iter() {
let result = rewriter.rewrite(expr, self.schema, self.options)?;
if result.transformed {
transformed = true;
}
expr = result.data
}

// recurse into subqueries if needed
let expr = match expr {
Expr::ScalarSubquery(subquery) => Expr::ScalarSubquery(rewrite_subquery(
subquery,
self.function_rewrites,
self.options,
)?),

Expr::Exists(Exists { subquery, negated }) => Expr::Exists(Exists {
subquery: rewrite_subquery(
subquery,
self.function_rewrites,
self.options,
)?,
negated,
}),

Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
}) => Expr::InSubquery(InSubquery {
expr,
subquery: rewrite_subquery(
subquery,
self.function_rewrites,
self.options,
)?,
negated,
}),

expr => expr,
};
impl AnalyzerRule for ApplyFunctionRewrites {
fn name(&self) -> &str {
"apply_function_rewrites"
}

Ok(if transformed {
Transformed::yes(expr)
} else {
Transformed::no(expr)
})
fn analyze(&self, plan: LogicalPlan, options: &ConfigOptions) -> Result<LogicalPlan> {
plan.transform_up_with_subqueries(&|plan| self.rewrite_plan(plan, options))
.map(|res| res.data)
}
}
Loading

0 comments on commit 23c0c13

Please sign in to comment.