Skip to content

Commit

Permalink
Add benchmark for planning sorted unions (#14157)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Jan 19, 2025
1 parent 52fb1a0 commit 3a65be0
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ mod data_utils;

use crate::criterion::Criterion;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow_array::{ArrayRef, RecordBatch};
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion_common::ScalarValue;
use datafusion_expr::col;
use itertools::Itertools;
use std::fs::File;
use std::io::{BufRead, BufReader};
Expand Down Expand Up @@ -147,6 +149,77 @@ fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Benche
});
}

/// Registers a table like this:
/// c0,c1,c2...,c99
/// 0,100...9900
/// 0,200...19800
/// 0,300...29700
fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) {
// ("c0", [0, 0, ...])
// ("c1": [100, 200, ...])
// etc
let iter = (0..num_columns).map(|i| i as u64).map(|i| {
let array: ArrayRef = Arc::new(arrow::array::UInt64Array::from_iter_values(
(0..num_rows)
.map(|j| j as u64 * 100 + i)
.collect::<Vec<_>>(),
));
(format!("c{}", i), array)
});
let batch = RecordBatch::try_from_iter(iter).unwrap();
let schema = batch.schema();
let partitions = vec![vec![batch]];

// tell DataFusion that the table is sorted by all columns
let sort_order = (0..num_columns)
.map(|i| col(format!("c{}", i)).sort(true, true))
.collect::<Vec<_>>();

// create the table
let table = MemTable::try_new(schema, partitions)
.unwrap()
.with_sort_order(vec![sort_order]);

ctx.register_table("t", Arc::new(table)).unwrap();
}

/// return a query like
/// ```sql
/// select c1, null as c2, ... null as cn from t ORDER BY c1
/// UNION ALL
/// select null as c1, c2, ... null as cn from t ORDER BY c2
/// ...
/// select null as c1, null as c2, ... cn from t ORDER BY cn
/// ORDER BY c1, c2 ... CN
/// ```
fn union_orderby_query(n: usize) -> String {
let mut query = String::new();
for i in 0..n {
if i != 0 {
query.push_str("\n UNION ALL \n");
}
let select_list = (0..n)
.map(|j| {
if i == j {
format!("c{j}")
} else {
format!("null as c{j}")
}
})
.collect::<Vec<_>>()
.join(", ");
query.push_str(&format!("(SELECT {} FROM t ORDER BY c{})", select_list, i));
}
query.push_str(&format!(
"\nORDER BY {}",
(0..n)
.map(|i| format!("c{}", i))
.collect::<Vec<_>>()
.join(", ")
));
query
}

fn criterion_benchmark(c: &mut Criterion) {
// verify that we can load the clickbench data prior to running the benchmark
if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
Expand Down Expand Up @@ -289,6 +362,17 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});

// -- Sorted Queries --
register_union_order_table(&ctx, 100, 1000);

// this query has many expressions in its sort order so stresses
// order equivalence validation
c.bench_function("physical_sorted_union_orderby", |b| {
// SELECT ... UNION ALL ...
let query = union_orderby_query(20);
b.iter(|| physical_plan(&ctx, &query))
});

// --- TPC-H ---

let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
Expand Down

0 comments on commit 3a65be0

Please sign in to comment.