Merge commit '8fcb3e4b8a8fe6c4c7f83922dd860c9fa78c0639' into smith/update-df-june-week-1
jeffreyssmith2nd committed Jun 14, 2024
2 parents 55cdcad + 8fcb3e4 commit e495f1f
Showing 44 changed files with 2,979 additions and 1,977 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -111,7 +111,7 @@ rand = "0.8"
regex = "1.8"
rstest = "0.21.0"
serde_json = "1"
sqlparser = { version = "0.45.0", features = ["visitor"] }
sqlparser = { version = "0.47", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
10 changes: 8 additions & 2 deletions benchmarks/README.md
@@ -67,6 +67,13 @@ Create / download a specific dataset (TPCH)

Data is placed in the `data` subdirectory.

## Select join algorithm
The benchmarks run with `prefer_hash_join == true` by default, which forces the planner to use the hash join algorithm.
To run the TPCH benchmarks with a join algorithm other than hash join:
```shell
PREFER_HASH_JOIN=false ./bench.sh run tpch
```
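The same preference can also be set programmatically through DataFusion's session configuration. Below is a minimal Rust sketch, assuming the `datafusion.optimizer.prefer_hash_join` configuration key and the `SessionConfig`/`SessionContext` API of a recent DataFusion release:

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Equivalent of PREFER_HASH_JOIN=false: allow the planner to
    // choose sort-merge join for equijoins instead of hash join.
    let config =
        SessionConfig::new().set_bool("datafusion.optimizer.prefer_hash_join", false);
    let ctx = SessionContext::new_with_config(config);

    // Queries planned in this context no longer prefer hash join.
    let df = ctx.sql("SELECT 1 AS a").await?;
    df.show().await?;
    Ok(())
}
```

When the option is false, the physical planner may build a sort-merge join rather than a hash table for equijoin conditions.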

## Comparing performance of main and a branch

```shell
@@ -177,7 +184,6 @@ The benchmark program also supports CSV and Parquet input file formats and a uti
```bash
cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parquet --format parquet
```

Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.

### Comparing results between runs
@@ -261,7 +267,7 @@ SUBCOMMANDS:

# Benchmarks

The output of `dfbench` help includes a descripion of each benchmark, which is reproducedd here for convenience
The output of `dfbench` help includes a description of each benchmark, which is reproduced here for convenience

## ClickBench

56 changes: 18 additions & 38 deletions benchmarks/bench.sh
@@ -36,6 +36,7 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..}
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
#CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # for faster iterations
PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true}

usage() {
echo "
@@ -52,8 +53,8 @@ Examples:
# Create the datasets for all benchmarks in $DATA_DIR
./bench.sh data
# Run the 'tpch' benchmark on the datafusion checkout in /source/arrow-datafusion
DATAFUSION_DIR=/source/arrow-datafusion ./bench.sh run tpch
# Run the 'tpch' benchmark on the datafusion checkout in /source/datafusion
DATAFUSION_DIR=/source/datafusion ./bench.sh run tpch
**********
* Commands
@@ -67,10 +68,8 @@ compare: Compares results from benchmark runs
**********
all(default): Data/Run/Compare for all benchmarks
tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, hash join
tpch_smj: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, sort merge join
tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory
tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, hash join
tpch_smj10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, sort merge join
tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
parquet: Benchmark of parquet reader's filtering speed
sort: Benchmark of sorting speed
@@ -81,10 +80,11 @@ clickbench_extended: ClickBench "inspired" queries against a single parquet (
**********
* Supported Configuration (Environment Variables)
**********
DATA_DIR directory to store datasets
CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
DATA_DIR directory to store datasets
CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
PREFER_HASH_JOIN  Prefer hash join algorithm (default true)
"
exit 1
}
@@ -131,6 +131,7 @@ main() {
echo "BENCHMARK: ${BENCHMARK}"
echo "DATA_DIR: ${DATA_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
echo "***************************"
case "$BENCHMARK" in
all)
@@ -185,6 +186,7 @@ main() {
echo "DATA_DIR: ${DATA_DIR}"
echo "RESULTS_DIR: ${RESULTS_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN": ${PREFER_HASH_JOIN}
echo "***************************"

# navigate to the appropriate directory
@@ -215,12 +217,6 @@ main() {
tpch_mem10)
run_tpch_mem "10"
;;
tpch_smj)
run_tpch_smj "1"
;;
tpch_smj10)
run_tpch_smj "10"
;;
parquet)
run_parquet
;;
@@ -306,7 +302,7 @@ data_tpch() {
else
echo " creating parquet files using benchmark binary ..."
pushd "${SCRIPT_DIR}" > /dev/null
$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --output "${TPCH_DIR}" --format parquet
popd > /dev/null
fi
}
@@ -323,22 +319,7 @@ run_tpch() {
RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch benchmark with sort merge join
run_tpch_smj() {
SCALE_FACTOR=$1
if [ -z "$SCALE_FACTOR" ] ; then
echo "Internal error: Scale factor not specified"
exit 1
fi
TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"

RESULTS_FILE="${RESULTS_DIR}/tpch_smj_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch SMJ benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join false --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch in memory
@@ -354,23 +335,23 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} -m --format parquet -o ${RESULTS_FILE}
}

# Runs the parquet filter benchmark
run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}


@@ -424,26 +405,25 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench benchmark with the partitioned parquet files
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
}


compare_benchmarks() {
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
BRANCH1="$1"
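The bench.sh changes above thread the `PREFER_HASH_JOIN` environment variable through to the benchmark binaries as a `--prefer_hash_join` flag. The real option struct lives in the benchmark crate and is not shown in this diff; purely as an illustration, a clap-style definition of such a flag (the struct and binary names here are hypothetical) might look like:

```rust
use clap::Parser;

/// Hypothetical stand-in for the benchmark binary's option struct;
/// the real dfbench/tpch options differ.
#[derive(Parser, Debug)]
struct RunOpt {
    /// Prefer hash join over sort-merge join (mirrors PREFER_HASH_JOIN in bench.sh).
    #[arg(long = "prefer_hash_join", action = clap::ArgAction::Set, default_value_t = true)]
    prefer_hash_join: bool,
}

fn main() {
    // e.g. invoked as: mybench --prefer_hash_join false
    let opt = RunOpt::parse();
    println!("prefer_hash_join = {}", opt.prefer_hash_join);
}
```

A flag parsed this way would then be applied to the session configuration before planning, as in the README sketch earlier.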
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock


16 changes: 6 additions & 10 deletions datafusion-cli/src/main.rs
@@ -177,18 +177,14 @@ async fn main_inner() -> Result<()> {
let rt_config =
// set memory pool size
if let Some(memory_limit) = args.memory_limit {
// unwrap is safe here because is_valid_memory_pool_size already checked the value
let memory_limit = extract_memory_pool_size(&memory_limit).unwrap();
// set memory pool type
if let Some(mem_pool_type) = args.mem_pool_type {
match mem_pool_type {
PoolType::Greedy => rt_config
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))),
PoolType::Fair => rt_config
.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
}
} else {
rt_config
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
match args.mem_pool_type {
Some(PoolType::Fair) => rt_config
.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
_ => rt_config
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
}
} else {
rt_config
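The refactor above collapses the nested `if let`/`match` on `args.mem_pool_type` into a single `match` that falls back to a Greedy pool. A condensed, self-contained sketch of the same selection logic, using DataFusion's public runtime and memory-pool APIs (the helper function name is illustrative):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

/// Illustrative helper: choose a memory pool the same way main_inner now does.
fn runtime_with_pool(memory_limit: usize, fair: bool) -> Result<RuntimeEnv> {
    let rt_config = RuntimeConfig::new();
    let rt_config = if fair {
        // FairSpillPool divides the memory budget fairly among spillable consumers.
        rt_config.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit)))
    } else {
        // GreedyMemoryPool hands out memory first-come, first-served (the default).
        rt_config.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
    };
    RuntimeEnv::new(rt_config)
}
```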
2 changes: 2 additions & 0 deletions datafusion-cli/tests/cli_integration.rs
@@ -28,6 +28,8 @@ fn init() {
let _ = env_logger::try_init();
}

// Disabled due to https://github.com/apache/datafusion/issues/10793
#[cfg(not(target_family = "windows"))]
#[rstest]
#[case::exec_from_commands(
["--command", "select 1", "--format", "json", "-q"],
3 changes: 3 additions & 0 deletions datafusion-examples/Cargo.toml
@@ -80,3 +80,6 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.11"
url = { workspace = true }
uuid = "1.7"

[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
nix = { version = "0.28.0", features = ["fs"] }
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -56,6 +56,7 @@ cargo run --example csv_sql
- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
- [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function