Feature/dask distributed #152
Since writing the PR description above, I've been testing this branch with various other workloads, which has helped to shed light on issues with other operations. When I started testing with other transformations, I found and had to fix several issues. First, an update: I also ran with two other configurations.
Now, turning to other workloads: I used the TPC-H datasets and queries I wrote about here. Below I briefly explain each query/workload, and then give the performance figures. At the end, I'll discuss some takeaways and next steps. Finally, in a separate comment, I'll give some detail about how various operations work under the hood and why some workloads benefit from dask distributed while others do not.

Background

TPC-H is a standard data schema representing a business's operations: parts, suppliers, orders, customers, etc. It has been used for decades to test relational database systems. For testing earthmover, I used tpch-kit to generate various (doubling) sizes (from about 10MB to over 20GB) of the TPC-H dataset. I then created earthmover transformation YAML for the first 5 of the canonical TPC-H queries.

TPC-H query 1 (group_by)

earthmover performance results (numbers are runtime in seconds):
| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
|---|---|---|---|---|
| 10M | 18 | 21 | 33 | 32 |
| 20M | 34 | 31 | 36 | 47 |
| 50M | 57 | 66 | 69 | 92 |
| 100M | 130 | 126 | 131 | 151 |
| 200M | 582 | 290 | 216 | 296 |
| 500M | 3254 | 765 | 916 | - |
TPC-H query 2 (join)
This query computes account balances for suppliers. It consists of
- select from 5 tables (joined together)
- filter rows by supplier region and part type and size
- a subquery
- sort result by balance and other fields
(The earthmover.yml implementation pushes the filters up front, to minimize the number of rows that must be joined.)
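The payoff from pushing filters up front can be sketched with a toy pandas example (the table contents below are made-up stand-ins for the TPC-H supplier and part tables, not earthmover's actual implementation):

```python
import pandas as pd

# Made-up miniature stand-ins for two TPC-H-style tables.
supplier = pd.DataFrame({
    "s_suppkey": [1, 2, 3, 4],
    "s_region": ["EUROPE", "ASIA", "EUROPE", "AMERICA"],
})
part = pd.DataFrame({
    "ps_suppkey": [1, 1, 2, 3, 4],
    "p_type": ["BRASS", "STEEL", "BRASS", "BRASS", "TIN"],
})

# Naive order: join everything first, then filter the joined rows.
joined = supplier.merge(part, left_on="s_suppkey", right_on="ps_suppkey")
naive = joined[(joined["s_region"] == "EUROPE") & (joined["p_type"] == "BRASS")]

# Filter-pushdown order: shrink both sides first, so the join sees far fewer rows.
pushed = supplier[supplier["s_region"] == "EUROPE"].merge(
    part[part["p_type"] == "BRASS"],
    left_on="s_suppkey", right_on="ps_suppkey",
)

# Both orders produce the same result; only the amount of joined data differs.
assert naive.reset_index(drop=True).equals(pushed.reset_index(drop=True))
```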
earthmover performance results (numbers are runtime in seconds):
| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
|---|---|---|---|---|
| 10M | 6 | 14 | 18 | 24 |
| 20M | 7 | 15 | 19 | 22 |
| 50M | 7 | 19 | 25 | 32 |
| 100M | 9 | 25 | 28 | 42 |
| 200M | 12 | 39 | 49 | 65 |
| 500M | 25 | 84 | 77 | 138 |
| 1G | 167 | 174 | 137 | 267 |
| 2G | 391 | 324 | 328 | 379 |
| 5G | 1075 | 1295 | 1103 | - |
TPC-H query 3 (join and group_by)
This query computes revenue per order. It consists of
- select from 3 tables (joined together)
- filter rows by market segment and for a specific date
- group by several order-related fields
- sort result by revenue and order date
earthmover performance results (numbers are runtime in seconds):
| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
|---|---|---|---|---|
| 10M | 6 | 15 | 14 | 19 |
| 20M | 7 | 19 | 16 | 20 |
| 50M | 10 | 25 | 23 | 21 |
| 100M | 16 | 39 | 30 | 33 |
| 200M | 37 | 76 | 57 | 57 |
| 500M | 225 | 258 | 156 | 154 |
| 1G | - | 568 | 496 | 507 |
| 2G | - | 2446 | 1177 | 1266 |
TPC-H query 4 (join and group_by)
This query computes the number of orders by priority over a certain date range. It consists of
- select from one table
- filter rows by a specific date range
- a subquery (for existence) over another table
- group by order priority
- sort result by order priority
earthmover performance results (numbers are runtime in seconds):
| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
|---|---|---|---|---|
| 10M | 5 | 10 | 10 | 12 |
| 20M | 7 | 10 | 10 | 14 |
| 50M | 11 | 14 | 13 | 18 |
| 100M | 20 | 20 | 16 | 20 |
| 200M | 59 | 38 | 31 | 34 |
| 500M | 149 | 102 | 63 | 77 |
| 1G | 291 | 219 | 131 | 147 |
| 2G | 485 | 583 | 329 | 417 |
| 5G | 2076 | 1253 | 1013 | 915 |
TPC-H query 5 (join and group_by)
This query computes revenue by country over a certain date range. It consists of
- select from 6 tables (joined together)
- filter rows by a specific date range
- group by country name, sum a revenue expression
- sort result by revenue
earthmover performance results (numbers are runtime in seconds):
| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
|---|---|---|---|---|
| 10M | 10 | 19 | 21 | 22 |
| 20M | 15 | 30 | 29 | 29 |
| 50M | 30 | 42 | 40 | 36 |
| 100M | 59 | 99 | 60 | 52 |
| 200M | 327 | 229 | 151 | 133 |
| 500M | 1502 | 653 | 456 | 325 |
| 1G | 4397 | 1086 | 734 | 942 |
More about dask distributed performance in the next comment, but general takeaways include
- dask distributed certainly adds some overhead
- `group_by` performance is the worst of any operation, and the more `create_columns` there are, the worse it is
- benefits of using earthmover distributed are only realized on large data (hundreds of MB or several GB or larger); distributed earthmover improves some workloads' wall-clock processing time, and also enables processing larger datasets (on which single-threaded earthmover can fail)
- the "sweet spot" depends on the workload, but often using 4 workers/cores seems like a good balance of parallelism vs. coordination
- scaling too wide (too many workers/cores) can make memory the bottleneck
- scaling tends to be approximately linear
My next testing step will be to test distributed earthmover's performance with the Ed-Fi bundles and some large assessment data files.
In this comment, I'll explain some things I've learned about how Dask and Dask distributed work, how earthmover's various operations use them, and how this affects performance. Once this work is more mature and thoroughly tested, perhaps portions of this comment can be included in the earthmover docs on a new "distributed" page.

A core innovation of Dask is the representation of a dataframe as a collection of partitions, each being a Pandas dataframe. This, together with writing intermediate Pandas partitions to disk as needed and clever re-implementation of some Pandas dataframe methods, enables Dask to handle dataframes larger than memory. And this is why earthmover initially used Dask.
The scheduler carves up data transformations on an entire dataframe into smaller transformations on each partition, and it sends these smaller pieces of work to the workers. Let us consider some of the
…ndex on destination final JSON series











This DRAFT PR is not yet ready for merge; however, I want to explain what the work represents.
`earthmover` currently uses Dask (as opposed to Pandas, or other DataFrame libraries) for essentially one reason only: the ability to handle larger-than-memory data by spilling to disk. However, `earthmover`'s use of Dask is single-threaded... if you run `earthmover` on a large machine with (say) 32 cores, only one core will be used to do transformation.

Some transformation workloads you might want to run with `earthmover` are memory-bound: such workloads wouldn't benefit much, or possibly at all, from parallelizing Dask. However, some transformation workloads are compute-bound: such workloads could benefit from parallelization - possibly a lot.

This branch uses Dask's LocalCluster to parallelize workloads across a configurable number of concurrent workers (assigned to different cores on the machine). Initial tests on my laptop, with 16 CPU cores and 8GB memory, seem to show that this can make some workloads run faster.
In `example_projects/01_simple/` we have a `big_earthmover.yaml` which does a simple transformation on a 1B-row, 3.2GB TSV file and produces a 1B-line, 27.9GB JSONL file. With `earthmover`, this runs in 71 minutes.

To get this to work, several things had to be refactored in `earthmover`, especially because Dask parallelizes by serializing/pickling data and objects when it ships them to workers on another core. Some of the specific changes that were needed:

- rendering templates per partition (via `.map_partitions()`) to deal with the fact that compiled Jinja templates are not serializable
- reading sources with a smaller `blocksize`, since they get wider (and larger) when turned into JSONL in a destination
- changing destination writes (`df.to_csv(self.file, single_file=True, ...)`) to avoid each thread blocking on I/O
- removing `*kwargs` from several methods, which prevent object serialization

See the further comments below for performance analysis of earthmover distributed, and for details about how earthmover works with Dask.
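The serialization constraint behind the first change can be illustrated with plain `pickle`. Here a lambda stands in for a compiled Jinja template (an assumption for illustration, not earthmover's actual code); the fix is the same idea: ship a plain string across the wire and compile/render inside the per-partition function.

```python
import pickle

# A compiled-template-like object (a lambda here, standing in for a
# compiled Jinja template) cannot be pickled, so Dask cannot ship it
# to a worker running in another process.
render = lambda row: "id={}".format(row)
try:
    pickle.dumps(render)
    shippable = True
except (pickle.PicklingError, AttributeError, TypeError):
    shippable = False

# The workaround: ship the template *source* (a plain, picklable string)
# and build the rendered output inside the function that runs on each
# partition, so nothing unpicklable crosses the wire.
template_source = "id={row}"

def render_partition(rows, template_source=template_source):
    # rendering happens on the worker, from the plain-string source
    return [template_source.format(row=r) for r in rows]
```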
Further work to wrap up here includes:

- fixing the `Package` class and `deps` building process (currently commented out), which still throws Dask serialization errors
- defaults for `config.dask` and `dask_cluster_kwargs` (possibly dynamic, based on host machine specs)