Skip to content

Commit

Permalink
Merge commit '1ec65a4a4a697d382d64ac2382b8486709dcf680' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-apr-week-2
  • Loading branch information
appletreeisyellow committed Apr 24, 2024
2 parents 059ce1a + 1ec65a4 commit d7aaeeb
Show file tree
Hide file tree
Showing 33 changed files with 864 additions and 728 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ object_store = { version = "0.9.1", default-features = false }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
rand = "0.8"
rstest = "0.18.0"
rstest = "0.19.0"
serde_json = "1"
sqlparser = { version = "0.44.0", features = ["visitor"] }
tempfile = "3"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn main() -> Result<()> {

// then run the optimizer with our custom rule
let optimizer = Optimizer::with_rules(vec![Arc::new(MyOptimizerRule {})]);
let optimized_plan = optimizer.optimize(&analyzed_plan, &config, observe)?;
let optimized_plan = optimizer.optimize(analyzed_plan, &config, observe)?;
println!(
"Optimized Logical Plan:\n\n{}\n",
optimized_plan.display_indent()
Expand Down
12 changes: 10 additions & 2 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub trait TreeNode: Sized {
/// Visit the tree node using the given [`TreeNodeVisitor`], performing a
/// depth-first walk of the node and its children.
///
/// See also:
/// * [`Self::rewrite`] to rewrite owned `TreeNode`s
///
/// Consider the following tree structure:
/// ```text
/// ParentNode
Expand Down Expand Up @@ -93,6 +96,9 @@ pub trait TreeNode: Sized {
/// Implements the [visitor pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for
/// recursively transforming [`TreeNode`]s.
///
/// See also:
/// * [`Self::visit`] for inspecting (without modification) `TreeNode`s
///
/// Consider the following tree structure:
/// ```text
/// ParentNode
Expand Down Expand Up @@ -310,13 +316,15 @@ pub trait TreeNode: Sized {
}

/// Apply the closure `F` to the node's children.
///
/// See `mutate_children` for rewriting in place
fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
&self,
f: F,
) -> Result<TreeNodeRecursion>;

/// Apply transform `F` to the node's children. Note that the transform `F`
/// might have a direction (pre-order or post-order).
/// Apply transform `F` to potentially rewrite the node's children. Note
/// that the transform `F` might have a direction (pre-order or post-order).
fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: F,
Expand Down
77 changes: 70 additions & 7 deletions datafusion/core/src/datasource/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,83 @@ pub trait TableProvider: Sync + Send {
/// Specify if DataFusion should provide filter expressions to the
/// TableProvider to apply *during* the scan.
///
/// The return value must have one element for each filter expression passed
/// in. The value of each element indicates if the TableProvider can apply
/// that particular filter during the scan.
///
/// Some TableProviders can evaluate filters more efficiently than the
/// `Filter` operator in DataFusion, for example by using an index.
///
/// By default, returns [`Unsupported`] for all filters, meaning no filters
/// will be provided to [`Self::scan`]. If the TableProvider can implement
/// filter pushdown, it should return either [`Exact`] or [`Inexact`].
/// # Parameters and Return Value
///
/// The return `Vec` must have one element for each element of the `filters`
/// argument. The value of each element indicates if the TableProvider can
/// apply the corresponding filter during the scan. The position in the return
/// value corresponds to the expression in the `filters` parameter.
///
/// If the length of the resulting `Vec` does not match the `filters` input
/// an error will be thrown.
///
/// Each element in the resulting `Vec` is one of the following:
/// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter
/// during scan
/// * [`Unsupported`]: The TableProvider cannot apply the filter during scan
///
/// By default, this function returns [`Unsupported`] for all filters,
/// meaning no filters will be provided to [`Self::scan`].
///
/// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
/// [`Exact`]: TableProviderFilterPushDown::Exact
/// [`Inexact`]: TableProviderFilterPushDown::Inexact
/// # Example
///
/// ```rust
/// # use std::any::Any;
/// # use std::sync::Arc;
/// # use arrow_schema::SchemaRef;
/// # use async_trait::async_trait;
/// # use datafusion::datasource::TableProvider;
/// # use datafusion::error::{Result, DataFusionError};
/// # use datafusion::execution::context::SessionState;
/// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
/// # use datafusion_physical_plan::ExecutionPlan;
/// // Define a struct that implements the TableProvider trait
/// struct TestDataSource {}
///
/// #[async_trait]
/// impl TableProvider for TestDataSource {
/// # fn as_any(&self) -> &dyn Any { todo!() }
/// # fn schema(&self) -> SchemaRef { todo!() }
/// # fn table_type(&self) -> TableType { todo!() }
/// # async fn scan(&self, s: &SessionState, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> {
/// todo!()
/// # }
/// // Override the supports_filters_pushdown to evaluate which expressions
/// // to accept as pushdown predicates.
/// fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
/// // Process each filter
/// let support: Vec<_> = filters.iter().map(|expr| {
/// match expr {
/// // This example only supports a between expr with a single column named "c1".
/// Expr::Between(between_expr) => {
/// between_expr.expr
/// .try_into_col()
/// .map(|column| {
/// if column.name == "c1" {
/// TableProviderFilterPushDown::Exact
/// } else {
/// TableProviderFilterPushDown::Unsupported
/// }
/// })
/// // If there is no column in the expr set the filter to unsupported.
/// .unwrap_or(TableProviderFilterPushDown::Unsupported)
/// }
/// _ => {
/// // For all other cases return Unsupported.
/// TableProviderFilterPushDown::Unsupported
/// }
/// }
/// }).collect();
/// Ok(support)
/// }
/// }
/// ```
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1881,7 +1881,7 @@ impl SessionState {

// optimize the child plan, capturing the output of each optimizer
let optimized_plan = self.optimizer.optimize(
&analyzed_plan,
analyzed_plan,
self,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
Expand Down Expand Up @@ -1911,7 +1911,7 @@ impl SessionState {
let analyzed_plan =
self.analyzer
.execute_and_check(plan, self.options(), |_, _| {})?;
self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
self.optimizer.optimize(analyzed_plan, self, |_, _| {})
}
}

Expand Down
10 changes: 7 additions & 3 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,15 @@
//! A [`LogicalPlan`] is a Directed Acyclic Graph (DAG) of other
//! [`LogicalPlan`]s, each potentially containing embedded [`Expr`]s.
//!
//! [`Expr`]s can be rewritten using the [`TreeNode`] API and simplified using
//! [`ExprSimplifier`]. Examples of working with and executing `Expr`s can be found in the
//! [`expr_api`.rs] example
//! `LogicalPlan`s can be rewritten with [`TreeNode`] API, see the
//! [`tree_node module`] for more details.
//!
//! [`Expr`]s can also be rewritten with [`TreeNode`] API and simplified using
//! [`ExprSimplifier`]. Examples of working with and executing `Expr`s can be
//! found in the [`expr_api`.rs] example
//!
//! [`TreeNode`]: datafusion_common::tree_node::TreeNode
//! [`tree_node module`]: datafusion_expr::logical_plan::tree_node
//! [`ExprSimplifier`]: crate::optimizer::simplify_expressions::ExprSimplifier
//! [`expr_api`.rs]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs
//!
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let optimizer = Optimizer::new();
// analyze and optimize the logical plan
let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
optimizer.optimize(&plan, &config, |_, _| {})
optimizer.optimize(plan, &config, |_, _| {})
}

#[derive(Default)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod dml;
mod extension;
mod plan;
mod statement;
mod tree_node;
pub mod tree_node;

pub use builder::{
build_join_schema, table_scan, union, wrap_projection_for_join_if_necessary,
Expand Down
20 changes: 19 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ pub use datafusion_common::{JoinConstraint, JoinType};
/// an output relation (table) with a (potentially) different
/// schema. A plan represents a dataflow tree where data flows
/// from leaves up to the root to produce the query result.
///
/// # See also:
/// * [`tree_node`]: visiting and rewriting API
///
/// [`tree_node`]: crate::logical_plan::tree_node
#[derive(Clone, PartialEq, Eq, Hash)]
pub enum LogicalPlan {
/// Evaluates an arbitrary list of expressions (essentially a
Expand Down Expand Up @@ -238,7 +243,10 @@ impl LogicalPlan {
}

/// Returns all expressions (non-recursively) evaluated by the current
/// logical plan node. This does not include expressions in any children
/// logical plan node. This does not include expressions in any children.
///
/// Note this method `clone`s all the expressions. When possible, the
/// [`tree_node`] API should be used instead of this API.
///
/// The returned expressions do not necessarily represent or even
/// contributed to the output schema of this node. For example,
Expand All @@ -248,6 +256,8 @@ impl LogicalPlan {
/// The expressions do contain all the columns that are used by this plan,
/// so if there are columns not referenced by these expressions then
/// DataFusion's optimizer attempts to optimize them away.
///
/// [`tree_node`]: crate::logical_plan::tree_node
pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
let mut exprs = vec![];
self.apply_expressions(|e| {
Expand Down Expand Up @@ -773,10 +783,16 @@ impl LogicalPlan {
/// Returns a new `LogicalPlan` based on `self` with inputs and
/// expressions replaced.
///
/// Note this method creates an entirely new node, which requires a large
/// amount of clone'ing. When possible, the [`tree_node`] API should be used
/// instead of this API.
///
/// The exprs correspond to the same order of expressions returned
/// by [`Self::expressions`]. This function is used by optimizers
/// to rewrite plans using the following pattern:
///
/// [`tree_node`]: crate::logical_plan::tree_node
///
/// ```text
/// let new_inputs = optimize_children(..., plan, props);
///
Expand Down Expand Up @@ -1367,6 +1383,7 @@ macro_rules! handle_transform_recursion_up {
}

impl LogicalPlan {
/// Visits a plan similarly to [`Self::visit`], but including embedded subqueries.
pub fn visit_with_subqueries<V: TreeNodeVisitor<Node = Self>>(
&self,
visitor: &mut V,
Expand All @@ -1380,6 +1397,7 @@ impl LogicalPlan {
.visit_parent(|| visitor.f_up(self))
}

/// Rewrites a plan similarly t [`Self::visit`], but including embedded subqueries.
pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand Down
32 changes: 25 additions & 7 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,35 @@
// specific language governing permissions and limitations
// under the License.

//! Tree node implementation for logical plan
//! [`TreeNode`] based visiting and rewriting for [`LogicalPlan`]s
//!
//! Visiting (read only) APIs
//! * [`LogicalPlan::visit`]: recursively visit the node and all of its inputs
//! * [`LogicalPlan::visit_with_subqueries`]: recursively visit the node and all of its inputs, including subqueries
//! * [`LogicalPlan::apply_children`]: recursively visit all inputs of this node
//! * [`LogicalPlan::apply_expressions`]: (non recursively) visit all expressions of this node
//! * [`LogicalPlan::apply_subqueries`]: (non recursively) visit all subqueries of this node
//! * [`LogicalPlan::apply_with_subqueries`]: recursively visit all inputs and embedded subqueries.
//!
//! Rewriting (update) APIs:
//! * [`LogicalPlan::exists`]: search for an expression in a plan
//! * [`LogicalPlan::rewrite`]: recursively rewrite the node and all of its inputs
//! * [`LogicalPlan::map_children`]: recursively rewrite all inputs of this node
//! * [`LogicalPlan::map_expressions`]: (non recursively) visit all expressions of this node
//! * [`LogicalPlan::map_subqueries`]: (non recursively) rewrite all subqueries of this node
//! * [`LogicalPlan::rewrite_with_subqueries`]: recursively rewrite the node and all of its inputs, including subqueries
//!
//! (Re)creation APIs (these require substantial cloning and thus are slow):
//! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions
//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
use crate::{
Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin, DdlStatement, Distinct,
DistinctOn, DmlStatement, Explain, Extension, Filter, Join, Limit, LogicalPlan,
Prepare, Projection, RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias,
Union, Unnest, Window,
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin,
DdlStatement, Distinct, DistinctOn, DmlStatement, Explain, Extension, Filter, Join,
Limit, LogicalPlan, Prepare, Projection, RecursiveQuery, Repartition, Sort, Subquery,
SubqueryAlias, Union, Unnest, Window,
};
use std::sync::Arc;

use crate::dml::CopyTo;
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion,
};
Expand Down
Loading

0 comments on commit d7aaeeb

Please sign in to comment.