diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 5562d98983f7..23b4700ea8f6 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1594,10 +1594,14 @@ name = "datafusion-physical-optimizer" version = "44.0.0" dependencies = [ "arrow", + "arrow-schema", "datafusion-common", "datafusion-execution", + "datafusion-expr", "datafusion-expr-common", + "datafusion-functions-aggregate", "datafusion-physical-expr", + "datafusion-physical-expr-common", "datafusion-physical-plan", "futures", "itertools 0.14.0", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 97b88a0b0c3d..e341816b2b8a 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -141,6 +141,7 @@ async-trait = { workspace = true } criterion = { version = "0.5", features = ["async_tokio"] } ctor = { workspace = true } datafusion-functions-window-common = { workspace = true } +datafusion-physical-optimizer = { workspace = true } doc-comment = { workspace = true } env_logger = { workspace = true } paste = "^1.0" diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index c44200a492eb..fbadceba0948 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1416,9 +1416,6 @@ pub(crate) mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; - use crate::physical_optimizer::test_utils::{ - check_integrity, coalesce_partitions_exec, repartition_exec, - }; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::expressions::col; use crate::physical_plan::filter::FilterExec; @@ -1427,6 +1424,9 @@ pub(crate) mod tests { use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics}; use datafusion_physical_optimizer::output_requirements::OutputRequirements; + use datafusion_physical_optimizer::test_utils::{ + check_integrity, coalesce_partitions_exec, repartition_exec, + }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::ScalarValue; diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index dd8e9d900b7d..167f9d6d45e7 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -658,17 +658,17 @@ fn get_sort_exprs( mod tests { use super::*; use crate::physical_optimizer::enforce_distribution::EnforceDistribution; - use crate::physical_optimizer::test_utils::{ - aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec, - coalesce_partitions_exec, filter_exec, global_limit_exec, hash_join_exec, - limit_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_sorted, - repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec, - sort_preserving_merge_exec, spr_repartition_exec, union_exec, - RequirementsTestExec, - }; + use crate::physical_optimizer::test_utils::{parquet_exec, parquet_exec_sorted}; use crate::physical_plan::{displayable, get_plan_string, Partitioning}; use crate::prelude::{SessionConfig, SessionContext}; use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered}; + use 
datafusion_physical_optimizer::test_utils::{ + aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec, + coalesce_partitions_exec, filter_exec, global_limit_exec, hash_join_exec, + limit_exec, local_limit_exec, memory_exec, repartition_exec, sort_exec, + sort_expr, sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, + spr_repartition_exec, union_exec, RequirementsTestExec, + }; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index e2c497936afa..63fe115e602c 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -27,7 +27,6 @@ pub mod enforce_sorting; pub mod optimizer; pub mod projection_pushdown; pub mod replace_with_order_preserving_variants; -pub mod sanity_checker; #[cfg(test)] pub mod test_utils; diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 9f5afc7abc2e..1bbe9d483cbd 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -286,7 +286,6 @@ mod tests { use super::*; use crate::execution::TaskContext; - use crate::physical_optimizer::test_utils::check_integrity; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::joins::{HashJoinExec, PartitionMode}; @@ -296,6 +295,7 @@ mod tests { }; use crate::prelude::{SessionConfig, SessionContext}; use crate::test::TestStreamPartition; + use datafusion_physical_optimizer::test_utils::check_integrity; use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs deleted file mode 100644 index 8e8787aec96b..000000000000 --- a/datafusion/core/src/physical_optimizer/sanity_checker.rs +++ /dev/null @@ -1,684 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! The [SanityCheckPlan] rule ensures that a given plan can -//! accommodate its infinite sources, if there are any. It will reject -//! non-runnable query plans that use pipeline-breaking operators on -//! infinite input(s). In addition, it will check if all order and -//! distribution requirements of a plan are satisfied by its children. 
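For reference, a minimal usage sketch of the rule described above, mirroring the `assert_sanity_check` helper used by the tests in this PR; `plan` is a placeholder for any already-constructed `Arc<dyn ExecutionPlan>`:

    use datafusion_common::config::ConfigOptions;
    use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
    use datafusion_physical_optimizer::PhysicalOptimizerRule;

    // SanityCheckPlan is an ordinary PhysicalOptimizerRule: it returns the plan
    // unchanged when every child satisfies its parent's ordering/distribution
    // requirements and no pipeline-breaking operator runs on an unbounded input;
    // otherwise it returns a plan error naming the offending operator or the
    // unsatisfied requirement.
    let checker = SanityCheckPlan::new();
    let options = ConfigOptions::default();
    let checked = checker.optimize(plan, &options)?;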
- -use std::sync::Arc; - -use crate::error::Result; -use crate::physical_plan::ExecutionPlan; - -use datafusion_common::config::{ConfigOptions, OptimizerOptions}; -use datafusion_common::plan_err; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; -use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; -use datafusion_physical_plan::joins::SymmetricHashJoinExec; -use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; - -use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list; -use datafusion_physical_optimizer::PhysicalOptimizerRule; -use itertools::izip; - -/// The SanityCheckPlan rule rejects the following query plans: -/// 1. Invalid plans containing nodes whose order and/or distribution requirements -/// are not satisfied by their children. -/// 2. Plans that use pipeline-breaking operators on infinite input(s), -/// it is impossible to execute such queries (they will never generate output nor finish) -#[derive(Default, Debug)] -pub struct SanityCheckPlan {} - -impl SanityCheckPlan { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} - -impl PhysicalOptimizerRule for SanityCheckPlan { - fn optimize( - &self, - plan: Arc, - config: &ConfigOptions, - ) -> Result> { - plan.transform_up(|p| check_plan_sanity(p, &config.optimizer)) - .data() - } - - fn name(&self) -> &str { - "SanityCheckPlan" - } - - fn schema_check(&self) -> bool { - true - } -} - -/// This function propagates finiteness information and rejects any plan with -/// pipeline-breaking operators acting on infinite inputs. -pub fn check_finiteness_requirements( - input: Arc, - optimizer_options: &OptimizerOptions, -) -> Result>> { - if let Some(exec) = input.as_any().downcast_ref::() { - if !(optimizer_options.allow_symmetric_joins_without_pruning - || (exec.check_if_order_information_available()? && is_prunable(exec))) - { - return plan_err!("Join operation cannot operate on a non-prunable stream without enabling \ - the 'allow_symmetric_joins_without_pruning' configuration flag"); - } - } - - if matches!( - input.boundedness(), - Boundedness::Unbounded { - requires_infinite_memory: true - } - ) || (input.boundedness().is_unbounded() - && input.pipeline_behavior() == EmissionType::Final) - { - plan_err!( - "Cannot execute pipeline breaking queries, operator: {:?}", - input - ) - } else { - Ok(Transformed::no(input)) - } -} - -/// This function returns whether a given symmetric hash join is amenable to -/// data pruning. For this to be possible, it needs to have a filter where -/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support -/// interval calculations. -/// -/// [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr -/// [`Operator`]: datafusion_expr::Operator -fn is_prunable(join: &SymmetricHashJoinExec) -> bool { - join.filter().is_some_and(|filter| { - check_support(filter.expression(), &join.schema()) - && filter - .schema() - .fields() - .iter() - .all(|f| is_datatype_supported(f.data_type())) - }) -} - -/// Ensures that the plan is pipeline friendly and the order and -/// distribution requirements from its children are satisfied. 
-pub fn check_plan_sanity( - plan: Arc, - optimizer_options: &OptimizerOptions, -) -> Result>> { - check_finiteness_requirements(Arc::clone(&plan), optimizer_options)?; - - for ((idx, child), sort_req, dist_req) in izip!( - plan.children().into_iter().enumerate(), - plan.required_input_ordering(), - plan.required_input_distribution(), - ) { - let child_eq_props = child.equivalence_properties(); - if let Some(sort_req) = sort_req { - if !child_eq_props.ordering_satisfy_requirement(&sort_req) { - let plan_str = get_plan_string(&plan); - return plan_err!( - "Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}", - plan_str, - format_physical_sort_requirement_list(&sort_req), - idx, - child_eq_props.oeq_class() - ); - } - } - - if !child - .output_partitioning() - .satisfy(&dist_req, child_eq_props) - { - let plan_str = get_plan_string(&plan); - return plan_err!( - "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}", - plan_str, - dist_req, - idx, - child.output_partitioning() - ); - } - } - - Ok(Transformed::no(plan)) -} - -#[cfg(test)] -mod tests { - use super::*; - - use crate::physical_optimizer::test_utils::{ - bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, - repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, - BinaryTestCase, QueryCase, SourceType, UnaryTestCase, - }; - - use arrow::compute::SortOptions; - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_common::Result; - use datafusion_expr::JoinType; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr::Partitioning; - use datafusion_physical_plan::displayable; - use datafusion_physical_plan::repartition::RepartitionExec; - - fn create_test_schema() -> SchemaRef { - Arc::new(Schema::new(vec![Field::new("c9", DataType::Int32, true)])) - } - - fn create_test_schema2() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ])) - } - - /// Check if sanity checker should accept or reject plans. - fn assert_sanity_check(plan: &Arc, is_sane: bool) { - let sanity_checker = SanityCheckPlan::new(); - let opts = ConfigOptions::default(); - assert_eq!( - sanity_checker.optimize(plan.clone(), &opts).is_ok(), - is_sane - ); - } - - /// Check if the plan we created is as expected by comparing the plan - /// formatted as a string. - fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) { - let plan_str = displayable(plan).indent(true).to_string(); - let actual_lines: Vec<&str> = plan_str.trim().lines().collect(); - assert_eq!(actual_lines, expected_lines); - } - - #[tokio::test] - async fn test_hash_left_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: false, - }; - - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - // Left join for bounded build side and unbounded probe side can generate - // both incremental matched rows and final non-matched rows. 
- expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "operator: HashJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_right_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: true, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "operator: HashJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_inner_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: false, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "Join Error".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_full_outer_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: true, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - // Full join for bounded build side and unbounded probe side can generate - // both incremental matched rows and final non-matched rows. 
- expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "operator: HashJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_aggregate() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: true, - }; - let case = QueryCase { - sql: "SELECT c1, MIN(c4) FROM test GROUP BY c1".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "operator: AggregateExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_window_agg_hash_partition() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: true, - }; - let case = QueryCase { - sql: "SELECT - c9, - SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 - FROM test - LIMIT 5".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "operator: SortExec".to_string() - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_window_agg_single_partition() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: true, - }; - let case = QueryCase { - sql: "SELECT - c9, - SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 - FROM test".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "operator: SortExec".to_string() - }; - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_cross_join() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: true, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Unbounded), - expect_fail: true, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: true, - }; - let test4 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 CROSS JOIN right as t2".to_string(), - cases: vec![ - Arc::new(test1), - Arc::new(test2), - Arc::new(test3), - Arc::new(test4), - ], - error_operator: "operator: CrossJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_analyzer() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: false, - }; - let case = QueryCase { - sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "Analyze Error".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - /// Tests that plan is valid when the sort requirements are satisfied. 
- async fn test_bounded_window_agg_sort_requirement() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let sort_exprs = vec![sort_expr_options( - "c9", - &source.schema(), - SortOptions { - descending: false, - nulls_first: false, - }, - )]; - let sort = sort_exec(sort_exprs.clone(), source); - let bw = bounded_window_exec("c9", sort_exprs, sort); - assert_plan(bw.as_ref(), vec![ - "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]" - ]); - assert_sanity_check(&bw, true); - Ok(()) - } - - #[tokio::test] - /// Tests that plan is invalid when the sort requirements are not satisfied. - async fn test_bounded_window_agg_no_sort_requirement() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let sort_exprs = vec![sort_expr_options( - "c9", - &source.schema(), - SortOptions { - descending: false, - nulls_first: false, - }, - )]; - let bw = bounded_window_exec("c9", sort_exprs, source); - assert_plan(bw.as_ref(), vec![ - "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " MemoryExec: partitions=1, partition_sizes=[0]" - ]); - // Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive error during sanity check. - assert_sanity_check(&bw, false); - Ok(()) - } - - #[tokio::test] - /// A valid when a single partition requirement - /// is satisfied. - async fn test_global_limit_single_partition() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let limit = global_limit_exec(source); - - assert_plan( - limit.as_ref(), - vec![ - "GlobalLimitExec: skip=0, fetch=100", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - assert_sanity_check(&limit, true); - Ok(()) - } - - #[tokio::test] - /// An invalid plan when a single partition requirement - /// is not satisfied. - async fn test_global_limit_multi_partition() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let limit = global_limit_exec(repartition_exec(source)); - - assert_plan( - limit.as_ref(), - vec![ - "GlobalLimitExec: skip=0, fetch=100", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - // Distribution requirement of the `GlobalLimitExec` is not satisfied. We expect to receive error during sanity check. - assert_sanity_check(&limit, false); - Ok(()) - } - - #[tokio::test] - /// A plan with no requirements should satisfy. - async fn test_local_limit() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let limit = local_limit_exec(source); - - assert_plan( - limit.as_ref(), - vec![ - "LocalLimitExec: fetch=100", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - assert_sanity_check(&limit, true); - Ok(()) - } - - #[tokio::test] - /// Valid plan with multiple children satisfy both order and distribution. 
- async fn test_sort_merge_join_satisfied() -> Result<()> { - let schema1 = create_test_schema(); - let schema2 = create_test_schema2(); - let source1 = memory_exec(&schema1); - let source2 = memory_exec(&schema2); - let sort_opts = SortOptions::default(); - let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; - let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; - let left = sort_exec(sort_exprs1, source1); - let right = sort_exec(sort_exprs2, source2); - let left_jcol = col("c9", &left.schema()).unwrap(); - let right_jcol = col("a", &right.schema()).unwrap(); - let left = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(vec![left_jcol.clone()], 10), - )?); - - let right = Arc::new(RepartitionExec::try_new( - right, - Partitioning::Hash(vec![right_jcol.clone()], 10), - )?); - - let join_on = vec![(left_jcol as _, right_jcol as _)]; - let join_ty = JoinType::Inner; - let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - assert_sanity_check(&smj, true); - Ok(()) - } - - #[tokio::test] - /// Invalid case when the order is not satisfied by the 2nd - /// child. - async fn test_sort_merge_join_order_missing() -> Result<()> { - let schema1 = create_test_schema(); - let schema2 = create_test_schema2(); - let source1 = memory_exec(&schema1); - let right = memory_exec(&schema2); - let sort_exprs1 = vec![sort_expr_options( - "c9", - &source1.schema(), - SortOptions::default(), - )]; - let left = sort_exec(sort_exprs1, source1); - // Missing sort of the right child here.. - let left_jcol = col("c9", &left.schema()).unwrap(); - let right_jcol = col("a", &right.schema()).unwrap(); - let left = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(vec![left_jcol.clone()], 10), - )?); - - let right = Arc::new(RepartitionExec::try_new( - right, - Partitioning::Hash(vec![right_jcol.clone()], 10), - )?); - - let join_on = vec![(left_jcol as _, right_jcol as _)]; - let join_ty = JoinType::Inner; - let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - // Order requirement for the `SortMergeJoin` is not satisfied for right child. We expect to receive error during sanity check. - assert_sanity_check(&smj, false); - Ok(()) - } - - #[tokio::test] - /// Invalid case when the distribution is not satisfied by the 2nd - /// child. 
- async fn test_sort_merge_join_dist_missing() -> Result<()> { - let schema1 = create_test_schema(); - let schema2 = create_test_schema2(); - let source1 = memory_exec(&schema1); - let source2 = memory_exec(&schema2); - let sort_opts = SortOptions::default(); - let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; - let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; - let left = sort_exec(sort_exprs1, source1); - let right = sort_exec(sort_exprs2, source2); - let right = Arc::new(RepartitionExec::try_new( - right, - Partitioning::RoundRobinBatch(10), - )?); - let left_jcol = col("c9", &left.schema()).unwrap(); - let right_jcol = col("a", &right.schema()).unwrap(); - let left = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(vec![left_jcol.clone()], 10), - )?); - - // Missing hash partitioning on right child. - - let join_on = vec![(left_jcol as _, right_jcol as _)]; - let join_ty = JoinType::Inner; - let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - // Distribution requirement for the `SortMergeJoin` is not satisfied for right child (has round-robin partitioning). We expect to receive error during sanity check. - assert_sanity_check(&smj, false); - Ok(()) - } -} diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 9156301393c0..aba24309b2a0 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -19,268 +19,15 @@ #![allow(missing_docs)] -use std::any::Any; -use std::fmt::Formatter; use std::sync::Arc; use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; -use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; -use crate::error::Result; -use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; -use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; -use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use crate::physical_plan::filter::FilterExec; -use crate::physical_plan::joins::utils::{JoinFilter, JoinOn}; -use crate::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; -use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use crate::physical_plan::memory::MemoryExec; -use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::sorts::sort::SortExec; -use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; -use crate::physical_plan::union::UnionExec; -use crate::physical_plan::windows::create_window_expr; -use crate::physical_plan::{ExecutionPlan, InputOrderMode, Partitioning}; -use crate::prelude::{CsvReadOptions, SessionContext}; +use crate::physical_plan::ExecutionPlan; -use arrow_schema::{Schema, SchemaRef, SortOptions}; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::JoinType; +use 
arrow_schema::SchemaRef; use datafusion_execution::object_store::ObjectStoreUrl; -use datafusion_expr::{WindowFrame, WindowFunctionDefinition}; -use datafusion_functions_aggregate::count::count_udaf; -use datafusion_physical_expr::expressions::col; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use datafusion_physical_plan::tree_node::PlanContext; -use datafusion_physical_plan::{ - displayable, DisplayAs, DisplayFormatType, PlanProperties, -}; - -use async_trait::async_trait; -use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; - -async fn register_current_csv( - ctx: &SessionContext, - table_name: &str, - infinite: bool, -) -> Result<()> { - let testdata = crate::test_util::arrow_test_data(); - let schema = crate::test_util::aggr_test_schema(); - let path = format!("{testdata}/csv/aggregate_test_100.csv"); - - match infinite { - true => { - let source = FileStreamProvider::new_file(schema, path.into()); - let config = StreamConfig::new(Arc::new(source)); - ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?; - } - false => { - ctx.register_csv(table_name, &path, CsvReadOptions::new().schema(&schema)) - .await?; - } - } - - Ok(()) -} - -#[derive(Eq, PartialEq, Debug)] -pub enum SourceType { - Unbounded, - Bounded, -} - -#[async_trait] -pub trait SqlTestCase { - async fn register_table(&self, ctx: &SessionContext) -> Result<()>; - fn expect_fail(&self) -> bool; -} - -/// [UnaryTestCase] is designed for single input [ExecutionPlan]s. -pub struct UnaryTestCase { - pub(crate) source_type: SourceType, - pub(crate) expect_fail: bool, -} - -#[async_trait] -impl SqlTestCase for UnaryTestCase { - async fn register_table(&self, ctx: &SessionContext) -> Result<()> { - let table_is_infinite = self.source_type == SourceType::Unbounded; - register_current_csv(ctx, "test", table_is_infinite).await?; - Ok(()) - } - - fn expect_fail(&self) -> bool { - self.expect_fail - } -} -/// [BinaryTestCase] is designed for binary input [ExecutionPlan]s. 
-pub struct BinaryTestCase { - pub(crate) source_types: (SourceType, SourceType), - pub(crate) expect_fail: bool, -} - -#[async_trait] -impl SqlTestCase for BinaryTestCase { - async fn register_table(&self, ctx: &SessionContext) -> Result<()> { - let left_table_is_infinite = self.source_types.0 == SourceType::Unbounded; - let right_table_is_infinite = self.source_types.1 == SourceType::Unbounded; - register_current_csv(ctx, "left", left_table_is_infinite).await?; - register_current_csv(ctx, "right", right_table_is_infinite).await?; - Ok(()) - } - - fn expect_fail(&self) -> bool { - self.expect_fail - } -} - -pub struct QueryCase { - pub(crate) sql: String, - pub(crate) cases: Vec>, - pub(crate) error_operator: String, -} - -impl QueryCase { - /// Run the test cases - pub(crate) async fn run(&self) -> Result<()> { - for case in &self.cases { - let ctx = SessionContext::new(); - case.register_table(&ctx).await?; - let error = if case.expect_fail() { - Some(&self.error_operator) - } else { - None - }; - self.run_case(ctx, error).await?; - } - Ok(()) - } - async fn run_case(&self, ctx: SessionContext, error: Option<&String>) -> Result<()> { - let dataframe = ctx.sql(self.sql.as_str()).await?; - let plan = dataframe.create_physical_plan().await; - if let Some(error) = error { - let plan_error = plan.unwrap_err(); - assert!( - plan_error.to_string().contains(error.as_str()), - "plan_error: {:?} doesn't contain message: {:?}", - plan_error, - error.as_str() - ); - } else { - assert!(plan.is_ok()) - } - Ok(()) - } -} - -pub fn sort_merge_join_exec( - left: Arc, - right: Arc, - join_on: &JoinOn, - join_type: &JoinType, -) -> Arc { - Arc::new( - SortMergeJoinExec::try_new( - left, - right, - join_on.clone(), - None, - *join_type, - vec![SortOptions::default(); join_on.len()], - false, - ) - .unwrap(), - ) -} - -/// make PhysicalSortExpr with default options -pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { - sort_expr_options(name, schema, SortOptions::default()) -} - -/// PhysicalSortExpr with specified options -pub fn sort_expr_options( - name: &str, - schema: &Schema, - options: SortOptions, -) -> PhysicalSortExpr { - PhysicalSortExpr { - expr: col(name, schema).unwrap(), - options, - } -} - -pub fn coalesce_partitions_exec(input: Arc) -> Arc { - Arc::new(CoalescePartitionsExec::new(input)) -} - -pub(crate) fn memory_exec(schema: &SchemaRef) -> Arc { - Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap()) -} - -pub fn hash_join_exec( - left: Arc, - right: Arc, - on: JoinOn, - filter: Option, - join_type: &JoinType, -) -> Result> { - Ok(Arc::new(HashJoinExec::try_new( - left, - right, - on, - filter, - join_type, - None, - PartitionMode::Partitioned, - true, - )?)) -} - -pub fn bounded_window_exec( - col_name: &str, - sort_exprs: impl IntoIterator, - input: Arc, -) -> Arc { - let sort_exprs: LexOrdering = sort_exprs.into_iter().collect(); - let schema = input.schema(); - - Arc::new( - crate::physical_plan::windows::BoundedWindowAggExec::try_new( - vec![create_window_expr( - &WindowFunctionDefinition::AggregateUDF(count_udaf()), - "count".to_owned(), - &[col(col_name, &schema).unwrap()], - &[], - sort_exprs.as_ref(), - Arc::new(WindowFrame::new(Some(false))), - schema.as_ref(), - false, - ) - .unwrap()], - input.clone(), - vec![], - InputOrderMode::Sorted, - ) - .unwrap(), - ) -} - -pub fn filter_exec( - predicate: Arc, - input: Arc, -) -> Arc { - Arc::new(FilterExec::try_new(predicate, input).unwrap()) -} - -pub fn sort_preserving_merge_exec( - sort_exprs: impl 
IntoIterator, - input: Arc, -) -> Arc { - let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(SortPreservingMergeExec::new(sort_exprs, input)) -} +use datafusion_physical_expr::PhysicalSortExpr; /// Create a non sorted parquet exec pub fn parquet_exec(schema: &SchemaRef) -> Arc { @@ -305,174 +52,3 @@ pub fn parquet_exec_sorted( ) .build_arc() } - -pub fn union_exec(input: Vec>) -> Arc { - Arc::new(UnionExec::new(input)) -} - -pub fn limit_exec(input: Arc) -> Arc { - global_limit_exec(local_limit_exec(input)) -} - -pub fn local_limit_exec(input: Arc) -> Arc { - Arc::new(LocalLimitExec::new(input, 100)) -} - -pub fn global_limit_exec(input: Arc) -> Arc { - Arc::new(GlobalLimitExec::new(input, 0, Some(100))) -} - -pub fn repartition_exec(input: Arc) -> Arc { - Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap()) -} - -pub fn spr_repartition_exec(input: Arc) -> Arc { - Arc::new( - RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)) - .unwrap() - .with_preserve_order(), - ) -} - -pub fn aggregate_exec(input: Arc) -> Arc { - let schema = input.schema(); - Arc::new( - AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![], - vec![], - input, - schema, - ) - .unwrap(), - ) -} - -pub fn coalesce_batches_exec(input: Arc) -> Arc { - Arc::new(CoalesceBatchesExec::new(input, 128)) -} - -pub fn sort_exec( - sort_exprs: impl IntoIterator, - input: Arc, -) -> Arc { - let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(SortExec::new(sort_exprs, input)) -} - -/// A test [`ExecutionPlan`] whose requirements can be configured. -#[derive(Debug)] -pub struct RequirementsTestExec { - required_input_ordering: LexOrdering, - maintains_input_order: bool, - input: Arc, -} - -impl RequirementsTestExec { - pub fn new(input: Arc) -> Self { - Self { - required_input_ordering: LexOrdering::default(), - maintains_input_order: true, - input, - } - } - - /// sets the required input ordering - pub fn with_required_input_ordering( - mut self, - required_input_ordering: LexOrdering, - ) -> Self { - self.required_input_ordering = required_input_ordering; - self - } - - /// set the maintains_input_order flag - pub fn with_maintains_input_order(mut self, maintains_input_order: bool) -> Self { - self.maintains_input_order = maintains_input_order; - self - } - - /// returns this ExecutionPlan as an Arc - pub fn into_arc(self) -> Arc { - Arc::new(self) - } -} - -impl DisplayAs for RequirementsTestExec { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "RequiredInputOrderingExec") - } -} - -impl ExecutionPlan for RequirementsTestExec { - fn name(&self) -> &str { - "RequiredInputOrderingExec" - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - self.input.properties() - } - - fn required_input_ordering(&self) -> Vec> { - let requirement = LexRequirement::from(self.required_input_ordering.clone()); - vec![Some(requirement)] - } - - fn maintains_input_order(&self) -> Vec { - vec![self.maintains_input_order] - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.input] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - assert_eq!(children.len(), 1); - Ok(RequirementsTestExec::new(children[0].clone()) - .with_required_input_ordering(self.required_input_ordering.clone()) - .with_maintains_input_order(self.maintains_input_order) - .into_arc()) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { 
- unimplemented!("Test exec does not support execution") - } -} - -/// A [`PlanContext`] object is susceptible to being left in an inconsistent state after -/// untested mutable operations. It is crucial that there be no discrepancies between a plan -/// associated with the root node and the plan generated after traversing all nodes -/// within the [`PlanContext`] tree. In addition to verifying the plans resulting from optimizer -/// rules, it is essential to ensure that the overall tree structure corresponds with the plans -/// contained within the node contexts. -/// TODO: Once [`ExecutionPlan`] implements [`PartialEq`], string comparisons should be -/// replaced with direct plan equality checks. -pub fn check_integrity(context: PlanContext) -> Result> { - context - .transform_up(|node| { - let children_plans = node.plan.children(); - assert_eq!(node.children.len(), children_plans.len()); - for (child_plan, child_node) in - children_plans.iter().zip(node.children.iter()) - { - assert_eq!( - displayable(child_plan.as_ref()).one_line().to_string(), - displayable(child_node.plan.as_ref()).one_line().to_string() - ); - } - Ok(Transformed::no(node)) - }) - .data() -} diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index efe377891128..1fac68e2505c 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -17,4 +17,5 @@ mod combine_partial_final_agg; mod limited_distinct_aggregation; -mod test_util; +mod sanity_checker; +pub(crate) mod test_util; diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs new file mode 100644 index 000000000000..538f0e443ddb --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -0,0 +1,536 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for [`SanityCheckPlan`] physical optimizer rule +//! +//! Note these tests are not in the same module as the optimizer pass because +//! they rely on `ParquetExec` which is in the core crate. 
+ +use crate::physical_optimizer::test_util::{ + BinaryTestCase, QueryCase, SourceType, UnaryTestCase, +}; +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::Result; +use datafusion_expr::JoinType; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_optimizer::test_utils::{ + bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, + repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, +}; +use datafusion_physical_optimizer::{sanity_checker::*, PhysicalOptimizerRule}; +use datafusion_physical_plan::displayable; +use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::ExecutionPlan; +use std::sync::Arc; + +fn create_test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("c9", DataType::Int32, true)])) +} + +fn create_test_schema2() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])) +} + +/// Check if sanity checker should accept or reject plans. +fn assert_sanity_check(plan: &Arc, is_sane: bool) { + let sanity_checker = SanityCheckPlan::new(); + let opts = ConfigOptions::default(); + assert_eq!( + sanity_checker.optimize(plan.clone(), &opts).is_ok(), + is_sane + ); +} + +/// Check if the plan we created is as expected by comparing the plan +/// formatted as a string. +fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) { + let plan_str = displayable(plan).indent(true).to_string(); + let actual_lines: Vec<&str> = plan_str.trim().lines().collect(); + assert_eq!(actual_lines, expected_lines); +} + +#[tokio::test] +async fn test_hash_left_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: false, + }; + + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + // Left join for bounded build side and unbounded probe side can generate + // both incremental matched rows and final non-matched rows. 
+ expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "operator: HashJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_hash_right_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: true, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "operator: HashJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_hash_inner_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: false, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "Join Error".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_hash_full_outer_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: true, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + // Full join for bounded build side and unbounded probe side can generate + // both incremental matched rows and final non-matched rows. 
+ expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "operator: HashJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_aggregate() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: true, + }; + let case = QueryCase { + sql: "SELECT c1, MIN(c4) FROM test GROUP BY c1".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "operator: AggregateExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_window_agg_hash_partition() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: true, + }; + let case = QueryCase { + sql: "SELECT + c9, + SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 + FROM test + LIMIT 5".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "operator: SortExec".to_string() + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_window_agg_single_partition() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: true, + }; + let case = QueryCase { + sql: "SELECT + c9, + SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 + FROM test".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "operator: SortExec".to_string() + }; + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_hash_cross_join() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: true, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Unbounded), + expect_fail: true, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: true, + }; + let test4 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 CROSS JOIN right as t2".to_string(), + cases: vec![ + Arc::new(test1), + Arc::new(test2), + Arc::new(test3), + Arc::new(test4), + ], + error_operator: "operator: CrossJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_analyzer() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: false, + }; + let case = QueryCase { + sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "Analyze Error".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +/// Tests that plan is valid when the sort requirements are satisfied. 
+async fn test_bounded_window_agg_sort_requirement() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr_options( + "c9", + &source.schema(), + SortOptions { + descending: false, + nulls_first: false, + }, + )]; + let sort = sort_exec(sort_exprs.clone(), source); + let bw = bounded_window_exec("c9", sort_exprs, sort); + assert_plan(bw.as_ref(), vec![ + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]" + ]); + assert_sanity_check(&bw, true); + Ok(()) +} + +#[tokio::test] +/// Tests that plan is invalid when the sort requirements are not satisfied. +async fn test_bounded_window_agg_no_sort_requirement() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr_options( + "c9", + &source.schema(), + SortOptions { + descending: false, + nulls_first: false, + }, + )]; + let bw = bounded_window_exec("c9", sort_exprs, source); + assert_plan(bw.as_ref(), vec![ + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " MemoryExec: partitions=1, partition_sizes=[0]" + ]); + // Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive error during sanity check. + assert_sanity_check(&bw, false); + Ok(()) +} + +#[tokio::test] +/// A valid when a single partition requirement +/// is satisfied. +async fn test_global_limit_single_partition() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = global_limit_exec(source); + + assert_plan( + limit.as_ref(), + vec![ + "GlobalLimitExec: skip=0, fetch=100", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&limit, true); + Ok(()) +} + +#[tokio::test] +/// An invalid plan when a single partition requirement +/// is not satisfied. +async fn test_global_limit_multi_partition() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = global_limit_exec(repartition_exec(source)); + + assert_plan( + limit.as_ref(), + vec![ + "GlobalLimitExec: skip=0, fetch=100", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Distribution requirement of the `GlobalLimitExec` is not satisfied. We expect to receive error during sanity check. + assert_sanity_check(&limit, false); + Ok(()) +} + +#[tokio::test] +/// A plan with no requirements should satisfy. +async fn test_local_limit() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = local_limit_exec(source); + + assert_plan( + limit.as_ref(), + vec![ + "LocalLimitExec: fetch=100", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&limit, true); + Ok(()) +} + +#[tokio::test] +/// Valid plan with multiple children satisfy both order and distribution. 
+async fn test_sort_merge_join_satisfied() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let source2 = memory_exec(&schema2); + let sort_opts = SortOptions::default(); + let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; + let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; + let left = sort_exec(sort_exprs1, source1); + let right = sort_exec(sort_exprs2, source2); + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(vec![right_jcol.clone()], 10), + )?); + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&smj, true); + Ok(()) +} + +#[tokio::test] +/// Invalid case when the order is not satisfied by the 2nd +/// child. +async fn test_sort_merge_join_order_missing() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let right = memory_exec(&schema2); + let sort_exprs1 = vec![sort_expr_options( + "c9", + &source1.schema(), + SortOptions::default(), + )]; + let left = sort_exec(sort_exprs1, source1); + // Missing sort of the right child here.. + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(vec![right_jcol.clone()], 10), + )?); + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Order requirement for the `SortMergeJoin` is not satisfied for right child. We expect to receive error during sanity check. + assert_sanity_check(&smj, false); + Ok(()) +} + +#[tokio::test] +/// Invalid case when the distribution is not satisfied by the 2nd +/// child. 
+async fn test_sort_merge_join_dist_missing() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let source2 = memory_exec(&schema2); + let sort_opts = SortOptions::default(); + let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; + let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; + let left = sort_exec(sort_exprs1, source1); + let right = sort_exec(sort_exprs2, source2); + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::RoundRobinBatch(10), + )?); + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + // Missing hash partitioning on right child. + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Distribution requirement for the `SortMergeJoin` is not satisfied for right child (has round-robin partitioning). We expect to receive error during sanity check. + assert_sanity_check(&smj, false); + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/test_util.rs b/datafusion/core/tests/physical_optimizer/test_util.rs index 12cd08fb3db3..ea4b80a7899c 100644 --- a/datafusion/core/tests/physical_optimizer/test_util.rs +++ b/datafusion/core/tests/physical_optimizer/test_util.rs @@ -19,6 +19,11 @@ use std::sync::Arc; +use async_trait::async_trait; +use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; +use datafusion::error::Result; +use datafusion::prelude::{CsvReadOptions, SessionContext}; + use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion::datasource::{ listing::PartitionedFile, @@ -55,3 +60,117 @@ pub(crate) fn trim_plan_display(plan: &str) -> Vec<&str> { .filter(|s| !s.is_empty()) .collect() } + +async fn register_current_csv( + ctx: &SessionContext, + table_name: &str, + infinite: bool, +) -> Result<()> { + let testdata = datafusion::test_util::arrow_test_data(); + let schema = datafusion::test_util::aggr_test_schema(); + let path = format!("{testdata}/csv/aggregate_test_100.csv"); + + match infinite { + true => { + let source = FileStreamProvider::new_file(schema, path.into()); + let config = StreamConfig::new(Arc::new(source)); + ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?; + } + false => { + ctx.register_csv(table_name, &path, CsvReadOptions::new().schema(&schema)) + .await?; + } + } + + Ok(()) +} + +#[derive(Eq, PartialEq, Debug)] +pub enum SourceType { + Unbounded, + Bounded, +} + +#[async_trait] +pub trait SqlTestCase { + async fn register_table(&self, ctx: &SessionContext) -> Result<()>; + fn expect_fail(&self) -> bool; +} + +/// [UnaryTestCase] is designed for single input [ExecutionPlan]s. 
+pub struct UnaryTestCase {
+    pub source_type: SourceType,
+    pub expect_fail: bool,
+}
+
+#[async_trait]
+impl SqlTestCase for UnaryTestCase {
+    async fn register_table(&self, ctx: &SessionContext) -> Result<()> {
+        let table_is_infinite = self.source_type == SourceType::Unbounded;
+        register_current_csv(ctx, "test", table_is_infinite).await?;
+        Ok(())
+    }
+
+    fn expect_fail(&self) -> bool {
+        self.expect_fail
+    }
+}
+/// [BinaryTestCase] is designed for binary input [ExecutionPlan]s.
+pub struct BinaryTestCase {
+    pub source_types: (SourceType, SourceType),
+    pub expect_fail: bool,
+}
+
+#[async_trait]
+impl SqlTestCase for BinaryTestCase {
+    async fn register_table(&self, ctx: &SessionContext) -> Result<()> {
+        let left_table_is_infinite = self.source_types.0 == SourceType::Unbounded;
+        let right_table_is_infinite = self.source_types.1 == SourceType::Unbounded;
+        register_current_csv(ctx, "left", left_table_is_infinite).await?;
+        register_current_csv(ctx, "right", right_table_is_infinite).await?;
+        Ok(())
+    }
+
+    fn expect_fail(&self) -> bool {
+        self.expect_fail
+    }
+}
+
+pub struct QueryCase {
+    pub sql: String,
+    pub cases: Vec<Arc<dyn SqlTestCase>>,
+    pub error_operator: String,
+}
+
+impl QueryCase {
+    /// Run the test cases
+    pub async fn run(&self) -> Result<()> {
+        for case in &self.cases {
+            let ctx = SessionContext::new();
+            case.register_table(&ctx).await?;
+            let error = if case.expect_fail() {
+                Some(&self.error_operator)
+            } else {
+                None
+            };
+            self.run_case(ctx, error).await?;
+        }
+        Ok(())
+    }
+    async fn run_case(&self, ctx: SessionContext, error: Option<&String>) -> Result<()> {
+        let dataframe = ctx.sql(self.sql.as_str()).await?;
+        let plan = dataframe.create_physical_plan().await;
+        if let Some(error) = error {
+            let plan_error = plan.unwrap_err();
+            assert!(
+                plan_error.to_string().contains(error.as_str()),
+                "plan_error: {:?} doesn't contain message: {:?}",
+                plan_error,
+                error.as_str()
+            );
+        } else {
+            assert!(plan.is_ok())
+        }
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml
index 4e8b36546dae..338d37671f96 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -36,10 +36,14 @@ recursive_protection = ["dep:recursive"]
 
 [dependencies]
 arrow = { workspace = true }
+arrow-schema = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
 datafusion-expr-common = { workspace = true, default-features = true }
+datafusion-functions-aggregate = { workspace = true }
 datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index ee1249febba8..ccb18f679171 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -27,6 +27,8 @@ pub mod limited_distinct_aggregation;
 mod optimizer;
 pub mod output_requirements;
 pub mod pruning;
+pub mod sanity_checker;
+pub mod test_utils;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
 
diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs
new file mode 100644
index 000000000000..1cf89ed8d8a4
--- /dev/null
+++ b/datafusion/physical-optimizer/src/sanity_checker.rs
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The [SanityCheckPlan] rule ensures that a given plan can
+//! accommodate its infinite sources, if there are any. It will reject
+//! non-runnable query plans that use pipeline-breaking operators on
+//! infinite input(s). In addition, it will check if all order and
+//! distribution requirements of a plan are satisfied by its children.
+
+use std::sync::Arc;
+
+use datafusion_common::Result;
+use datafusion_physical_plan::ExecutionPlan;
+
+use datafusion_common::config::{ConfigOptions, OptimizerOptions};
+use datafusion_common::plan_err;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion_physical_plan::joins::SymmetricHashJoinExec;
+use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
+
+use crate::PhysicalOptimizerRule;
+use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
+use itertools::izip;
+
+/// The SanityCheckPlan rule rejects the following query plans:
+/// 1. Invalid plans containing nodes whose order and/or distribution requirements
+///    are not satisfied by their children.
+/// 2. Plans that use pipeline-breaking operators on infinite input(s);
+///    it is impossible to execute such queries (they will never generate output nor finish).
+#[derive(Default, Debug)]
+pub struct SanityCheckPlan {}
+
+impl SanityCheckPlan {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for SanityCheckPlan {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_up(|p| check_plan_sanity(p, &config.optimizer))
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "SanityCheckPlan"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// This function propagates finiteness information and rejects any plan with
+/// pipeline-breaking operators acting on infinite inputs.
+pub fn check_finiteness_requirements(
+    input: Arc<dyn ExecutionPlan>,
+    optimizer_options: &OptimizerOptions,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    if let Some(exec) = input.as_any().downcast_ref::<SymmetricHashJoinExec>() {
+        if !(optimizer_options.allow_symmetric_joins_without_pruning
+            || (exec.check_if_order_information_available()?
+                && is_prunable(exec)))
+        {
+            return plan_err!("Join operation cannot operate on a non-prunable stream without enabling \
+                              the 'allow_symmetric_joins_without_pruning' configuration flag");
+        }
+    }
+
+    if matches!(
+        input.boundedness(),
+        Boundedness::Unbounded {
+            requires_infinite_memory: true
+        }
+    ) || (input.boundedness().is_unbounded()
+        && input.pipeline_behavior() == EmissionType::Final)
+    {
+        plan_err!(
+            "Cannot execute pipeline breaking queries, operator: {:?}",
+            input
+        )
+    } else {
+        Ok(Transformed::no(input))
+    }
+}
+
+/// This function returns whether a given symmetric hash join is amenable to
+/// data pruning. For this to be possible, it needs to have a filter where
+/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
+/// interval calculations.
+///
+/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
+/// [`Operator`]: datafusion_expr::Operator
+fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
+    join.filter().is_some_and(|filter| {
+        check_support(filter.expression(), &join.schema())
+            && filter
+                .schema()
+                .fields()
+                .iter()
+                .all(|f| is_datatype_supported(f.data_type()))
+    })
+}
+
+/// Ensures that the plan is pipeline friendly and the order and
+/// distribution requirements from its children are satisfied.
+pub fn check_plan_sanity(
+    plan: Arc<dyn ExecutionPlan>,
+    optimizer_options: &OptimizerOptions,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    check_finiteness_requirements(Arc::clone(&plan), optimizer_options)?;
+
+    for ((idx, child), sort_req, dist_req) in izip!(
+        plan.children().into_iter().enumerate(),
+        plan.required_input_ordering(),
+        plan.required_input_distribution(),
+    ) {
+        let child_eq_props = child.equivalence_properties();
+        if let Some(sort_req) = sort_req {
+            if !child_eq_props.ordering_satisfy_requirement(&sort_req) {
+                let plan_str = get_plan_string(&plan);
+                return plan_err!(
+                    "Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}",
+                    plan_str,
+                    format_physical_sort_requirement_list(&sort_req),
+                    idx,
+                    child_eq_props.oeq_class()
+                );
+            }
+        }
+
+        if !child
+            .output_partitioning()
+            .satisfy(&dist_req, child_eq_props)
+        {
+            let plan_str = get_plan_string(&plan);
+            return plan_err!(
+                "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
+                plan_str,
+                dist_req,
+                idx,
+                child.output_partitioning()
+            );
+        }
+    }
+
+    Ok(Transformed::no(plan))
+}
diff --git a/datafusion/physical-optimizer/src/test_utils.rs b/datafusion/physical-optimizer/src/test_utils.rs
new file mode 100644
index 000000000000..dc68f1dc9764
--- /dev/null
+++ b/datafusion/physical-optimizer/src/test_utils.rs
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Collection of testing utility functions that are leveraged by the query optimizer rules
+
+use std::sync::Arc;
+
+use std::any::Any;
+use std::fmt::Formatter;
+
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{JoinType, Result};
+use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
+use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::utils::{JoinFilter, JoinOn};
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion_physical_plan::memory::MemoryExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::union::UnionExec;
+use datafusion_physical_plan::windows::{create_window_expr, BoundedWindowAggExec};
+use datafusion_physical_plan::{InputOrderMode, Partitioning};
+
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
+use datafusion_physical_plan::ExecutionPlan;
+
+use datafusion_physical_plan::tree_node::PlanContext;
+use datafusion_physical_plan::{
+    displayable, DisplayAs, DisplayFormatType, PlanProperties,
+};
+
+pub fn sort_merge_join_exec(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    join_on: &JoinOn,
+    join_type: &JoinType,
+) -> Arc<dyn ExecutionPlan> {
+    Arc::new(
+        SortMergeJoinExec::try_new(
+            left,
+            right,
+            join_on.clone(),
+            None,
+            *join_type,
+            vec![SortOptions::default(); join_on.len()],
+            false,
+        )
+        .unwrap(),
+    )
+}
+
+/// make PhysicalSortExpr with default options
+pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+    sort_expr_options(name, schema, SortOptions::default())
+}
+
+/// PhysicalSortExpr with specified options
+pub fn sort_expr_options(
+    name: &str,
+    schema: &Schema,
+    options: SortOptions,
+) -> PhysicalSortExpr {
+    PhysicalSortExpr {
+        expr: col(name, schema).unwrap(),
+        options,
+    }
+}
+
+pub fn coalesce_partitions_exec(
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    Arc::new(CoalescePartitionsExec::new(input))
+}
+
+pub fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
+    Arc::new(MemoryExec::try_new(&[vec![]], Arc::clone(schema), None).unwrap())
+}
+
+pub fn hash_join_exec(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    filter: Option<JoinFilter>,
+    join_type: &JoinType,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    Ok(Arc::new(HashJoinExec::try_new(
+        left,
+        right,
+        on,
+        filter,
+        join_type,
+        None,
+        PartitionMode::Partitioned,
+        true,
+    )?))
+}
+
+pub fn bounded_window_exec(
+    col_name: &str,
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    let sort_exprs: LexOrdering = sort_exprs.into_iter().collect();
+    let schema = input.schema();
+
+    Arc::new(
+        BoundedWindowAggExec::try_new(
+            vec![create_window_expr(
+                &WindowFunctionDefinition::AggregateUDF(count_udaf()),
+                "count".to_owned(),
+                &[col(col_name, &schema).unwrap()],
+                &[],
+                sort_exprs.as_ref(),
+                Arc::new(WindowFrame::new(Some(false))),
+                schema.as_ref(),
+                false,
+            )
+            .unwrap()],
+            Arc::clone(&input),
+            vec![],
+            InputOrderMode::Sorted,
+        )
+        .unwrap(),
+    )
+}
+
+pub fn filter_exec(
+    predicate: Arc<dyn PhysicalExpr>,
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    Arc::new(FilterExec::try_new(predicate, input).unwrap())
+}
+
+pub fn sort_preserving_merge_exec(
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    let sort_exprs = sort_exprs.into_iter().collect();
+    Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
+}
+
+pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(UnionExec::new(input))
+}
+
+pub fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    global_limit_exec(local_limit_exec(input))
+}
+
+pub fn local_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(LocalLimitExec::new(input, 100))
+}
+
+pub fn global_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(GlobalLimitExec::new(input, 0, Some(100)))
+}
+
+pub fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
+}
+
+pub fn spr_repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(
+        RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
+            .unwrap()
+            .with_preserve_order(),
+    )
+}
+
+pub fn aggregate_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    let schema = input.schema();
+    Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::default(),
+            vec![],
+            vec![],
+            input,
+            schema,
+        )
+        .unwrap(),
+    )
+}
+
+pub fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(CoalesceBatchesExec::new(input, 128))
+}
+
+pub fn sort_exec(
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    let sort_exprs = sort_exprs.into_iter().collect();
+    Arc::new(SortExec::new(sort_exprs, input))
+}
+
+/// A test [`ExecutionPlan`] whose requirements can be configured.
+#[derive(Debug)]
+pub struct RequirementsTestExec {
+    required_input_ordering: LexOrdering,
+    maintains_input_order: bool,
+    input: Arc<dyn ExecutionPlan>,
+}
+
+impl RequirementsTestExec {
+    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
+        Self {
+            required_input_ordering: LexOrdering::default(),
+            maintains_input_order: true,
+            input,
+        }
+    }
+
+    /// sets the required input ordering
+    pub fn with_required_input_ordering(
+        mut self,
+        required_input_ordering: LexOrdering,
+    ) -> Self {
+        self.required_input_ordering = required_input_ordering;
+        self
+    }
+
+    /// set the maintains_input_order flag
+    pub fn with_maintains_input_order(mut self, maintains_input_order: bool) -> Self {
+        self.maintains_input_order = maintains_input_order;
+        self
+    }
+
+    /// returns this ExecutionPlan as an `Arc<dyn ExecutionPlan>`
+    pub fn into_arc(self) -> Arc<dyn ExecutionPlan> {
+        Arc::new(self)
+    }
+}
+
+impl DisplayAs for RequirementsTestExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "RequiredInputOrderingExec")
+    }
+}
+
+impl ExecutionPlan for RequirementsTestExec {
+    fn name(&self) -> &str {
+        "RequiredInputOrderingExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.input.properties()
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
+        let requirement = LexRequirement::from(self.required_input_ordering.clone());
+        vec![Some(requirement)]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![self.maintains_input_order]
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        assert_eq!(children.len(), 1);
+        Ok(RequirementsTestExec::new(Arc::clone(&children[0]))
+            .with_required_input_ordering(self.required_input_ordering.clone())
+            .with_maintains_input_order(self.maintains_input_order)
+            .into_arc())
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        unimplemented!("Test exec does not support execution")
+    }
+}
+
+/// A [`PlanContext`] object is susceptible to being left in an inconsistent state after
+/// untested mutable operations. It is crucial that there be no discrepancies between a plan
+/// associated with the root node and the plan generated after traversing all nodes
+/// within the [`PlanContext`] tree. In addition to verifying the plans resulting from optimizer
+/// rules, it is essential to ensure that the overall tree structure corresponds with the plans
+/// contained within the node contexts.
+/// TODO: Once [`ExecutionPlan`] implements [`PartialEq`], string comparisons should be
+/// replaced with direct plan equality checks.
+pub fn check_integrity<T: Clone>(context: PlanContext<T>) -> Result<PlanContext<T>> {
+    context
+        .transform_up(|node| {
+            let children_plans = node.plan.children();
+            assert_eq!(node.children.len(), children_plans.len());
+            for (child_plan, child_node) in
+                children_plans.iter().zip(node.children.iter())
+            {
+                assert_eq!(
+                    displayable(child_plan.as_ref()).one_line().to_string(),
+                    displayable(child_node.plan.as_ref()).one_line().to_string()
+                );
+            }
+            Ok(Transformed::no(node))
+        })
+        .data()
+}
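
For reviewers who want to try the relocated rule out of tree, below is a minimal sketch (not part of the patch) of how `SanityCheckPlan` and the new `datafusion_physical_optimizer::test_utils` helpers could be exercised directly from the `datafusion-physical-optimizer` crate, without going through `datafusion-core`. The schema and column name are illustrative assumptions; only the APIs added or moved by this diff are used.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion_physical_optimizer::test_utils::{memory_exec, sort_exec, sort_expr};
use datafusion_physical_optimizer::PhysicalOptimizerRule;

fn main() -> datafusion_common::Result<()> {
    // Illustrative single-column schema; any schema works for the empty MemoryExec.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // SortExec over an empty MemoryExec: a plan whose ordering and distribution
    // requirements are trivially satisfied, so the sanity check should pass.
    let source = memory_exec(&schema);
    let plan = sort_exec(vec![sort_expr("a", &schema)], source);

    // The rule either returns the plan unchanged or fails with a plan_err!
    // naming the unsatisfied ordering/distribution requirement.
    let checked = SanityCheckPlan::new().optimize(plan, &ConfigOptions::default())?;
    println!(
        "{}",
        datafusion_physical_plan::displayable(checked.as_ref()).indent(true)
    );
    Ok(())
}
```

Note that `test_utils` is now a public, unconditionally compiled module of the crate (see the `lib.rs` hunk above), which is what allows `datafusion-core`'s tests, and a sketch like this one, to import the helpers.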