diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
index a841fe532294..30178d17aa54 100644
--- a/benchmarks/src/tpch/convert.rs
+++ b/benchmarks/src/tpch/convert.rs
@@ -88,9 +88,7 @@ impl ConvertOpt {
                 .schema()
                 .iter()
                 .take(schema.fields.len() - 1)
-                .map(|(qualifier, field)| {
-                    Expr::Column(Column::from((qualifier, field.as_ref())))
-                })
+                .map(Expr::from)
                 .collect();
 
             csv = csv.select(selection)?;
diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs
index dec87d9d071e..ae3146516349 100644
--- a/datafusion/common/src/column.rs
+++ b/datafusion/common/src/column.rs
@@ -17,7 +17,7 @@
 
 //! Column
 
-use arrow_schema::Field;
+use arrow_schema::{Field, FieldRef};
 
 use crate::error::_schema_err;
 use crate::utils::{parse_identifiers_normalized, quote_identifier};
@@ -63,6 +63,8 @@ impl Column {
     }
 
     /// Create Column from unqualified name.
+    ///
+    /// Alias for `Column::new_unqualified`
     pub fn from_name(name: impl Into<String>) -> Self {
         Self {
             relation: None,
@@ -346,6 +348,13 @@ impl From<(Option<&TableReference>, &Field)> for Column {
     }
 }
 
+/// Create a column, use qualifier and field name
+impl From<(Option<&TableReference>, &FieldRef)> for Column {
+    fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
+        Self::new(relation.cloned(), field.name())
+    }
+}
+
 impl FromStr for Column {
     type Err = Infallible;
 
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index abf09772e5bb..bd561e89832e 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1332,7 +1332,7 @@ impl DataFrame {
                     col_exists = true;
                     new_column.clone()
                 } else {
-                    col(Column::from((qualifier, field.as_ref())))
+                    col(Column::from((qualifier, field)))
                 }
             })
             .collect();
@@ -1402,9 +1402,9 @@ impl DataFrame {
             .iter()
             .map(|(qualifier, field)| {
                 if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
-                    col(Column::from((qualifier, field.as_ref()))).alias(new_name)
+                    col(Column::from((qualifier, field))).alias(new_name)
                 } else {
-                    col(Column::from((qualifier, field.as_ref())))
+                    col(Column::from((qualifier, field)))
                 }
             })
             .collect::<Vec<_>>();
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index e6785b1dec2a..848f561ffb85 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1261,15 +1261,8 @@ impl DefaultPhysicalPlanner {
 
                 // Remove temporary projected columns
                 if left_projected || right_projected {
-                    let final_join_result = join_schema
-                        .iter()
-                        .map(|(qualifier, field)| {
-                            Expr::Column(datafusion_common::Column::from((
-                                qualifier,
-                                field.as_ref(),
-                            )))
-                        })
-                        .collect::<Vec<_>>();
+                    let final_join_result =
+                        join_schema.iter().map(Expr::from).collect::<Vec<_>>();
                     let projection = LogicalPlan::Projection(Projection::try_new(
                         final_join_result,
                         Arc::new(new_join),
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index fb75a3cc7a43..6f76936806c6 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -32,7 +32,7 @@ use crate::{
     Signature,
 };
 
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, FieldRef};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{
     internal_err, plan_err, Column, DFSchema, Result, ScalarValue, TableReference,
@@ -84,6 +84,29 @@ use sqlparser::ast::NullTreatment;
 /// assert_eq!(binary_expr.op, Operator::Eq);
 /// }
 /// ```
+///
+/// ## Return a list of [`Expr::Column`] from a schema's columns
+/// ```
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion_common::{DFSchema, Column};
+/// # use datafusion_expr::Expr;
+///
+/// let arrow_schema = Schema::new(vec![
+///    Field::new("c1", DataType::Int32, false),
+///    Field::new("c2", DataType::Float64, false),
+/// ]);
+/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
+///
+/// // Form a list of expressions for each item in the schema
+/// let exprs: Vec<_> = df_schema.iter()
+///   .map(Expr::from)
+///   .collect();
+///
+/// assert_eq!(exprs, vec![
+///   Expr::from(Column::from_qualified_name("t1.c1")),
+///   Expr::from(Column::from_qualified_name("t1.c2")),
+/// ]);
+/// ```
 #[derive(Clone, PartialEq, Eq, Hash, Debug)]
 pub enum Expr {
     /// An expression with a specific name.
@@ -190,6 +213,23 @@ impl Default for Expr {
     }
 }
 
+/// Create an [`Expr`] from a [`Column`]
+impl From<Column> for Expr {
+    fn from(value: Column) -> Self {
+        Expr::Column(value)
+    }
+}
+
+/// Create an [`Expr`] from an optional qualifier and a [`FieldRef`]. This is
+/// useful for creating [`Expr`] from a [`DFSchema`].
+///
+/// See example on [`Expr`]
+impl<'a> From<(Option<&'a TableReference>, &'a FieldRef)> for Expr {
+    fn from(value: (Option<&'a TableReference>, &'a FieldRef)) -> Self {
+        Expr::from(Column::from(value))
+    }
+}
+
 #[derive(Clone, PartialEq, Eq, Hash, Debug)]
 pub struct Unnest {
     pub expr: Box<Expr>,
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs
index f5779df812f1..fd6446eba971 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -218,13 +218,7 @@ pub fn coerce_plan_expr_for_schema(
             Ok(LogicalPlan::Projection(projection))
         }
        _ => {
-            let exprs: Vec<Expr> = plan
-                .schema()
-                .iter()
-                .map(|(qualifier, field)| {
-                    Expr::Column(Column::from((qualifier, field.as_ref())))
-                })
-                .collect();
+            let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();
 
             let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
             let add_project = new_exprs.iter().any(|expr| expr.try_into_col().is_err());
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 2810425ae1d8..fa4b0b964295 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1577,7 +1577,7 @@ pub fn unnest_with_options(
                 return Ok(input);
             }
         };
-        qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref())));
+        qualified_columns.push(Column::from((unnest_qualifier, &unnested_field)));
         unnested_fields.insert(index, unnested_field);
     }
 
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8da93c244c07..64fe98c23b08 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -356,12 +356,7 @@ fn get_exprs_except_skipped(
     columns_to_skip: HashSet<Column>,
 ) -> Vec<Expr> {
     if columns_to_skip.is_empty() {
-        schema
-            .iter()
-            .map(|(qualifier, field)| {
-                Expr::Column(Column::from((qualifier, field.as_ref())))
-            })
-            .collect::<Vec<Expr>>()
+        schema.iter().map(Expr::from).collect::<Vec<Expr>>()
     } else {
         schema
             .columns()
@@ -855,7 +850,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
     match expr {
         Expr::Column(col) => {
             let (qualifier, field) = plan.schema().qualified_field_from_column(col)?;
-            Ok(Expr::Column(Column::from((qualifier, field))))
+            Ok(Expr::from(Column::from((qualifier, field))))
         }
         _ => Ok(Expr::Column(Column::from_name(expr.display_name()?))),
     }
 }
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 690b596ed35f..b859dda9d53f 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -506,7 +506,7 @@ fn build_common_expr_project_plan(
 
     for (qualifier, field) in input.schema().iter() {
         if fields_set.insert(qualified_name(qualifier, field.name())) {
-            project_exprs.push(Expr::Column(Column::from((qualifier, field.as_ref()))));
+            project_exprs.push(Expr::from((qualifier, field)));
         }
     }
 
@@ -525,10 +525,7 @@ fn build_recover_project_plan(
     schema: &DFSchema,
     input: LogicalPlan,
 ) -> Result<LogicalPlan> {
-    let col_exprs = schema
-        .iter()
-        .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field.as_ref()))))
-        .collect();
+    let col_exprs = schema.iter().map(Expr::from).collect();
     Ok(LogicalPlan::Projection(Projection::try_new(
         col_exprs,
         Arc::new(input),
diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs
index f464506057ff..4f68e2623f40 100644
--- a/datafusion/optimizer/src/replace_distinct_aggregate.rs
+++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -127,7 +127,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
                     .skip(on_expr.len())
                     .zip(schema.iter())
                     .map(|((new_qualifier, new_field), (old_qualifier, old_field))| {
-                        Ok(col(Column::from((new_qualifier, new_field.as_ref())))
+                        Ok(col(Column::from((new_qualifier, new_field)))
                             .alias_qualified(old_qualifier.cloned(), old_field.name()))
                     })
                     .collect::<Result<Vec<Expr>>>()?;
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index 4bc0719fbaa5..3f2134bf7e9b 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -21,8 +21,8 @@ use sqlparser::ast::{ArrayAgg, Expr as SQLExpr, JsonOperator, TrimWhereField, Va
 use sqlparser::parser::ParserError::ParserError;
 
 use datafusion_common::{
-    internal_datafusion_err, internal_err, not_impl_err, plan_err, Column, DFSchema,
-    Result, ScalarValue,
+    internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
+    ScalarValue,
 };
 use datafusion_expr::expr::AggregateFunctionDefinition;
 use datafusion_expr::expr::InList;
@@ -142,9 +142,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         }
                         _ => false,
                     }) {
-                        Some((qualifier, df_field)) => {
-                            Expr::Column(Column::from((qualifier, df_field.as_ref())))
-                        }
+                        Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
                        None => Expr::Column(col),
                    }
                }
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 759a5e8ce9d3..c81217aa7017 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1307,8 +1307,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         ))
                     } else {
                         datafusion_expr::Expr::Column(Column::from((
-                            qualifier,
-                            field.as_ref(),
+                            qualifier, field,
                         )))
                     }
                 }