diff --git a/Cargo.lock b/Cargo.lock index 8fafcf4..29477dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,7 +2085,7 @@ dependencies = [ [[package]] name = "raysql" -version = "0.2.0" +version = "0.3.0" dependencies = [ "datafusion", "datafusion-proto", diff --git a/Cargo.toml b/Cargo.toml index b77c1c7..6c2f47b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ description = "RaySQL: DataFusion on Ray" homepage = "https://github.com/andygrove/ray-sql" repository = "https://github.com/andygrove/ray-sql" authors = ["Andy Grove "] -version = "0.2.0" +version = "0.3.0" edition = "2021" readme = "README.md" license = "Apache-2.0" @@ -33,4 +33,8 @@ name = "raysql" crate-type = ["cdylib", "rlib"] [package.metadata.maturin] -name = "raysql._raysql_internal" \ No newline at end of file +name = "raysql._raysql_internal" + +[profile.release] +codegen-units = 1 +lto = true \ No newline at end of file diff --git a/README.md b/README.md index 27d2b6e..566b4eb 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ # RaySQL: DataFusion on Ray -This is an experimental research project to evaluate the concept of performing distributed SQL queries from Python, using +This is a personal research project to evaluate performing distributed SQL queries from Python, using [Ray](https://www.ray.io/) and [DataFusion](https://github.com/apache/arrow-datafusion). ## Example -See [examples/tips.py](examples/tips.py). +Run the following example live in your browser using a Google Colab [notebook](https://colab.research.google.com/drive/1tmSX0Lu6UFh58_-DBUVoyYx6BoXHOszP?usp=sharing). ```python import ray @@ -40,11 +40,31 @@ print(result_set) ## Performance -This chart shows the relative performance of RaySQL compared to other open-source distributed SQL frameworks. +This chart shows the performance of RaySQL compared to Apache Spark for +[SQLBench-H](https://sqlbenchmarks.io/sqlbench-h/) on a very small data set (10GB), running on my desktop (Threadripper +with 24 physical cores). 
Both RaySQL and Spark are configured with 24 executors. -Performance is looking pretty respectable! +Note that query 15 is excluded from both results since RaySQL does not support DDL yet. -![SQLBench-H Performance Chart](./docs/sqlbench-h-workstation-10-distributed-perquery.png) +### Overall Time + +RaySQL is ~30% faster overall for this scale factor and environment. + +![SQLBench-H Total](./docs/sqlbench-h-total.png) + +### Per Query Time + +Spark is much faster on some queries, likely due to broadcast exchanges, which RaySQL hasn't implemented yet. + +![SQLBench-H Per Query](./docs/sqlbench-h-per-query.png) + +### Performance Plan + +I'm planning on experimenting with the following changes to improve performance: + +- Make better use of Ray futures to run more tasks in parallel +- Use Ray object store for shuffle data transfer to reduce disk I/O cost +- Keep upgrading to newer versions of DataFusion to pick up the latest optimizations ## Building diff --git a/docs/sqlbench-h-per-query.png b/docs/sqlbench-h-per-query.png new file mode 100644 index 0000000..d2b75d7 Binary files /dev/null and b/docs/sqlbench-h-per-query.png differ diff --git a/docs/sqlbench-h-total.png b/docs/sqlbench-h-total.png new file mode 100644 index 0000000..02bbf35 Binary files /dev/null and b/docs/sqlbench-h-total.png differ diff --git a/docs/sqlbench-h-workstation-10-distributed-perquery.png b/docs/sqlbench-h-workstation-10-distributed-perquery.png deleted file mode 100644 index e2cb062..0000000 Binary files a/docs/sqlbench-h-workstation-10-distributed-perquery.png and /dev/null differ diff --git a/raysql/context.py b/raysql/context.py index ff8c26f..d18034b 100644 --- a/raysql/context.py +++ b/raysql/context.py @@ -39,7 +39,7 @@ def execute_query_stage(self, graph, stage): print("Scheduling query stage #{} with {} input partitions and {} output partitions".format(stage.id(), partition_count, stage.get_output_partition_count())) # serialize the plan - plan_bytes = 
self.ctx.serialize_execution_plan(stage.get_execution_plan()) + plan_bytes = ray.put(self.ctx.serialize_execution_plan(stage.get_execution_plan())) # round-robin allocation across workers futures = [] diff --git a/src/context.rs b/src/context.rs index aed2133..43a5c8b 100644 --- a/src/context.rs +++ b/src/context.rs @@ -29,7 +29,13 @@ pub struct PyContext { impl PyContext { #[new] pub fn new(target_partitions: usize) -> Self { - let config = SessionConfig::default().with_target_partitions(target_partitions); + let config = SessionConfig::default() + .with_target_partitions(target_partitions) + .with_batch_size(16*1024) + .with_repartition_aggregations(true) + .with_repartition_windows(true) + .with_repartition_joins(true) + .with_parquet_pruning(true); Self { ctx: SessionContext::with_config(config), }