Skip to content

Commit

Permalink
test: update tests to reflect updated invariant checking paradigm
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Jan 14, 2025
1 parent 94482d1 commit ad15c85
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 22 deletions.
217 changes: 196 additions & 21 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2082,6 +2082,8 @@ mod tests {
use super::*;
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::MemTable;
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
use crate::physical_plan::{
expressions, DisplayAs, DisplayFormatType, PlanProperties,
SendableRecordBatchStream,
Expand All @@ -2098,7 +2100,11 @@ mod tests {
use datafusion_execution::TaskContext;
use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
use datafusion_functions_aggregate::expr_fn::sum;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::expressions::{
col as physical_expr_col, lit as physical_expr_lit,
};
use datafusion_physical_expr::{Distribution, EquivalenceProperties, LexRequirement};
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

fn make_session_state() -> SessionState {
Expand Down Expand Up @@ -2848,9 +2854,29 @@ digraph {
assert_contains!(generated_graph, expected_tooltip);
}

/// Builds the [`PlanProperties`] used by the test extension nodes:
/// equivalence properties over an empty schema, `RoundRobinBatch(1)`
/// partitioning, `EmissionType::Final`, and `Boundedness::Bounded`.
fn default_plan_props() -> PlanProperties {
    let empty_schema = Arc::new(Schema::empty());
    let eq_properties = EquivalenceProperties::new(empty_schema);
    let partitioning = Partitioning::RoundRobinBatch(1);

    PlanProperties::new(
        eq_properties,
        partitioning,
        EmissionType::Final,
        Boundedness::Bounded,
    )
}

/// Extension Node which passes invariant checks
#[derive(Debug)]
struct OkExtensionNode(Vec<Arc<dyn ExecutionPlan>>);
struct OkExtensionNode {
children: Vec<Arc<dyn ExecutionPlan>>,
properties: PlanProperties,
}
impl OkExtensionNode {
    /// Creates a node holding `children`, with plan properties taken from
    /// `default_plan_props()`.
    fn new(children: Vec<Arc<dyn ExecutionPlan>>) -> Self {
        let properties = default_plan_props();
        Self {
            children,
            properties,
        }
    }
}
impl ExecutionPlan for OkExtensionNode {
fn name(&self) -> &str {
"always ok"
Expand All @@ -2859,19 +2885,19 @@ digraph {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self(children)))
Ok(Arc::new(Self::new(children)))
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
self
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.0.iter().collect::<Vec<_>>()
self.children.iter().collect::<Vec<_>>()
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
&self.properties
}
fn execute(
&self,
Expand All @@ -2889,7 +2915,16 @@ digraph {

/// Extension Node which fails invariant checks
#[derive(Debug)]
struct InvariantFailsExtensionNode;
struct InvariantFailsExtensionNode {
properties: PlanProperties,
}
impl InvariantFailsExtensionNode {
    /// Creates a node whose plan properties come from `default_plan_props()`.
    fn new() -> Self {
        let properties = default_plan_props();
        Self { properties }
    }
}
impl ExecutionPlan for InvariantFailsExtensionNode {
fn name(&self) -> &str {
"InvariantFailsExtensionNode"
Expand All @@ -2907,13 +2942,13 @@ digraph {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
self
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
unimplemented!()
vec![]
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
&self.properties
}
fn execute(
&self,
Expand Down Expand Up @@ -2946,15 +2981,18 @@ digraph {
fn schema_check(&self) -> bool {
true
}
fn executable_check(&self, _previous_plan_is_valid: bool) -> bool {
true
}
}

#[test]
fn test_invariant_checker() -> Result<()> {
fn test_invariant_checker_with_execution_plan_extensions() -> Result<()> {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(OptimizerRuleWithSchemaCheck);

// ok plan
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode::new(vec![]));
let child = Arc::clone(&ok_node);
let ok_plan = Arc::clone(&ok_node).with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?,
Expand All @@ -2963,38 +3001,175 @@ digraph {

// Test: check should pass with same schema
let equal_schema = ok_plan.schema();
InvariantChecker.check(&ok_plan, &rule, equal_schema)?;
InvariantChecker::new(&Default::default(), &rule).check(
&ok_plan,
equal_schema,
true,
)?;

// Test: should fail with schema changed
let different_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
let expected_err = InvariantChecker
.check(&ok_plan, &rule, different_schema)
let expected_err = InvariantChecker::new(&Default::default(), &rule)
.check(&ok_plan, different_schema, true)
.unwrap_err();
assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema"));

// Test: should fail when extension node fails its own invariant check
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let expected_err = InvariantChecker
.check(&failing_node, &rule, ok_plan.schema())
let failing_node: Arc<dyn ExecutionPlan> =
Arc::new(InvariantFailsExtensionNode::new());
let expected_err = InvariantChecker::new(&Default::default(), &rule)
.check(&failing_node, ok_plan.schema(), true)
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined invariant check"));

// Test: should fail when descendant extension node fails
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let failing_node: Arc<dyn ExecutionPlan> =
Arc::new(InvariantFailsExtensionNode::new());
let invalid_plan = ok_node.with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
Arc::clone(&child),
])?;
let expected_err = InvariantChecker
.check(&invalid_plan, &rule, ok_plan.schema())
let expected_err = InvariantChecker::new(&Default::default(), &rule)
.check(&invalid_plan, ok_plan.schema(), true)
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined invariant check"));

// Test: confirm error message contains both the user-defined extension name and the optimizer rule name
assert!(expected_err
.to_string()
.contains("Invariant for ExecutionPlan node 'InvariantFailsExtensionNode' failed for PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck'"));

Ok(())
}

/// Wraps `plan` in an [`OutputRequirementExec`] that requires a default sort
/// ordering on a column `a`, with an unspecified distribution.
///
/// The column expression is resolved against a locally built schema with a
/// single non-nullable `UInt32` field `a`. The wrapper is meant to leave the
/// returned plan in a state that is NOT yet executable.
///
/// Returns an error if the column expression for `a` cannot be created.
fn wrap_in_nonexecutable(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let fields = vec![Field::new("a", DataType::UInt32, false)];
    let schema = Arc::new(Schema::new(fields));
    let sort_on_a = PhysicalSortExpr::new_default(physical_expr_col("a", &schema)?);

    // Mutate the tree in such a way that it is NOT yet executable.
    let ordering_requirement =
        LexRequirement::from_lex_ordering(vec![sort_on_a].into());
    let wrapped = OutputRequirementExec::new(
        plan,
        Some(ordering_requirement),
        Distribution::UnspecifiedDistribution,
    );
    Ok(Arc::new(wrapped))
}

/// Optimizer-rule extension whose plan mutation produces a non-executable plan.
///
/// This is a "failing" extension: it wraps the plan via `wrap_in_nonexecutable`
/// but does not implement [`PhysicalOptimizerRule::executable_check`] as false.
#[derive(Debug)]
struct ExtensionRuleDoesBadMutation;
impl PhysicalOptimizerRule for ExtensionRuleDoesBadMutation {
    fn name(&self) -> &str {
        "ExtensionRuleDoesBadMutation"
    }
    fn schema_check(&self) -> bool {
        true
    }
    /// Ignores the config and returns the input wrapped by `wrap_in_nonexecutable`.
    fn optimize(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _options: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mutated = wrap_in_nonexecutable(input)?;
        Ok(mutated)
    }
}

/// Optimizer-rule extension whose plan mutation produces a non-executable plan.
///
/// Unlike a "failing" extension, this rule properly implements
/// [`PhysicalOptimizerRule::executable_check`] => false, so that follow-up
/// optimizer runs may be performed afterwards.
#[derive(Debug)]
struct ExtensionRuleNeedsMoreRuns;
impl PhysicalOptimizerRule for ExtensionRuleNeedsMoreRuns {
    fn name(&self) -> &str {
        "ExtensionRuleNeedsMoreRuns"
    }
    fn schema_check(&self) -> bool {
        true
    }
    /// Always reports `false`, regardless of whether the previous plan was valid.
    fn executable_check(&self, _prior_plan_was_valid: bool) -> bool {
        false
    }
    /// Ignores the config and returns the input wrapped by `wrap_in_nonexecutable`.
    fn optimize(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _options: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mutated = wrap_in_nonexecutable(input)?;
        Ok(mutated)
    }
}

/// Runs `optimize_physical_plan` over the same starting plan with three
/// different optimizer-rule lists: a single valid rule (expected to succeed),
/// a rule that performs a bad mutation (expected to error), and a rule that
/// declares its output non-executable followed by further rules (expected to
/// succeed).
#[test]
fn test_invariant_checker_with_optimization_extension() -> Result<()> {
    let planner = DefaultPhysicalPlanner {
        extension_planners: vec![],
    };

    // Starting point: an ok plan producing a single null UInt32 value.
    let nullable_a = Field::new("a", DataType::UInt32, true);
    let schema = Arc::new(Schema::new(vec![nullable_a]));
    let null_row = vec![physical_expr_lit(ScalarValue::UInt32(None))];
    let ok_plan = Arc::new(MemoryExec::try_new_as_values(schema, vec![null_row])?);

    // Test: check should pass with valid OptimizerRule mutation
    let valid_rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> =
        vec![Arc::new(OptimizerRuleWithSchemaCheck)];
    let valid_session = SessionStateBuilder::new()
        .with_physical_optimizer_rules(valid_rules)
        .build();
    assert_eq!(
        valid_session.physical_optimizers().len(),
        1,
        "should have the 1 valid optimizer rule"
    );
    planner.optimize_physical_plan(ok_plan.clone(), &valid_session, |_, _| {})?;

    // Test: should fail with invalid OptimizerRule mutation that leaves plan not executable
    let bad_rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
        // should produce executable plan
        Arc::new(SanityCheckPlan::new()),
        // will fail executable check
        Arc::new(ExtensionRuleDoesBadMutation),
    ];
    let bad_session = SessionStateBuilder::new()
        .with_physical_optimizer_rules(bad_rules)
        .build();
    assert_eq!(
        bad_session.physical_optimizers().len(),
        2,
        "should have 2 optimizer rules"
    );
    let failure = planner
        .optimize_physical_plan(ok_plan.clone(), &bad_session, |_, _| {})
        .unwrap_err();
    let failure_message = failure.to_string();
    assert!(failure_message
        .contains("SanityCheckPlan failed for PhysicalOptimizer rule 'ExtensionRuleDoesBadMutation'"));

    // Test: should pass once the proper additional optimizer rules are applied after the Extension rule
    let recovering_rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
        // should produce executable plan
        Arc::new(SanityCheckPlan::new()),
        // Extension states that the returned plan is not executable
        Arc::new(ExtensionRuleNeedsMoreRuns),
        // should mutate plan
        Arc::new(EnforceSorting::new()),
        // should produce executable plan
        Arc::new(SanityCheckPlan::new()),
    ];
    let recovering_session = SessionStateBuilder::new()
        .with_physical_optimizer_rules(recovering_rules)
        .build();
    assert_eq!(
        recovering_session.physical_optimizers().len(),
        4,
        "should have 4 optimizer rules"
    );
    planner.optimize_physical_plan(ok_plan, &recovering_session, |_, _| {})?;

    Ok(())
}
}
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// A default set of invariants is provided in the default implementation.
/// Extension nodes can provide their own invariants.
fn check_node_invariants(&self) -> Result<()> {
// TODO
Ok(())
}

Expand Down

0 comments on commit ad15c85

Please sign in to comment.