tomreitz (Collaborator) commented Feb 20, 2025

This DRAFT PR is not yet ready for merge; however, I want to explain what the work represents.

earthmover currently uses Dask (as opposed to Pandas or other DataFrame libraries) for essentially one reason: the ability to handle larger-than-memory data by spilling to disk. However, earthmover's use of Dask is single-threaded: if you run earthmover on a large machine with (say) 32 cores, only one core will be used to do transformation.

Some transformation workloads you might want to run with earthmover are memory-bound: such workloads would benefit little, if at all, from parallelization. However, some transformation workloads are compute-bound: such workloads could benefit from parallelization, possibly a lot.

This branch uses Dask's LocalCluster to parallelize workloads across a configurable number of concurrent workers (assigned to different cores on the machine). Initial tests on my laptop, with 16 CPU cores and 8GB memory, seem to show that this can make some workloads run faster.

In example_projects/01_simple/ we have a big_earthmover.yaml which does a simple transformation on a 1B-row, 3.2GB TSV file and produces a 1B-line 27.9GB JSONL file.

  • Under the most-recent released version of earthmover, this runs in 71 minutes.
  • Under this branch, with 4 cores of 1.2 GB memory each, it ran in 49 minutes (a 30% improvement).

To get this to work, several things had to be refactored in earthmover, especially because Dask parallelizes by serializing/pickling data and objects when it ships them to workers on another core. Some of the specific changes that were needed:

  • use Python partials instead of lambda expressions (which are not serializable)
  • move Jinja template compilation into each partition (the method called by .map_partitions()) to deal with the fact that compiled Jinja templates are not serializable
  • read input files with a smaller-than-default blocksize, since they get wider (and larger) when turned into JSONL in a destination
  • write output files with df.to_csv(self.file, single_file=True, ...) to avoid each thread blocking on I/O
  • remove extra **kwargs from several methods, which prevented object serialization
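For instance, the lambda-to-partial change looks roughly like this. (A pure-Python sketch: a dict of column lists stands in for a pandas partition, and map_column and the column names are illustrative, not earthmover's actual code. Dask pickles tasks before shipping them to workers, and a module-level function bound with functools.partial survives pickling where a lambda caused serialization errors in this PR.)

```python
from functools import partial

def map_column(partition, column, mapping):
    # Per-partition worker function: a plain, named function (picklable),
    # in the shape .map_partitions() expects after binding static args.
    return {**partition, column: [mapping.get(v, v) for v in partition[column]]}

# Bind the static arguments up front, leaving only the partition argument:
task = partial(map_column, column="grade_level", mapping={"K": "KG"})

result = task({"grade_level": ["K", "1", "2"]})
```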

See the further comments below for performance analysis of earthmover distributed, and for details about how earthmover works with Dask.

Further work to wrap up here includes:

  • fixing the Package class and deps-building process (currently commented out), which still throws Dask serialization errors
  • documenting and determining reasonable defaults for the earthmover.yml config.dask and dask_cluster_kwargs settings (possibly dynamic, based on host machine specs)
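For discussion purposes, that configuration might look something like the sketch below. (Hypothetical: only config.dask and dask_cluster_kwargs are named in this PR; the nesting and defaults are illustrative, and the kwargs shown are real dask.distributed.LocalCluster parameters.)

```yaml
# earthmover.yml (hypothetical sketch; the real schema is still to be determined)
config:
  dask:
    # passed through to dask.distributed.LocalCluster
    dask_cluster_kwargs:
      n_workers: 4
      threads_per_worker: 1
      memory_limit: "1.2GB"
```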

tomreitz (Collaborator) commented Mar 12, 2025

Since writing the PR description above, I've been testing this branch with various other workloads, which has helped shed light on issues with other operations. The big_earthmover.yml project, while a good test of a certain kind of scaling, is an incredibly simple transformation: it doesn't change the grain of the data at all, it simply

  • maps values of one column
  • renames one column
  • adds one column

When I started testing with other transformations, I found and had to fix issues with join and group_by, among others. In this comment, I will share some performance data for various workloads I've tested.

First, an update on big_earthmover.yml (the 1B-row, 3.2GB TSV input file that produces a 1B-line 27.9GB JSONL output file). In addition to the previous two data points:

  • Under the most-recent released version of earthmover, this runs in 71 minutes.
  • Under this branch, with 4 cores of 1.2 GB memory each, it ran in 49 minutes (a 30% improvement).

I also ran with two other configurations:

  • Under this branch, with 2 cores with 2.4 GB memory each, it ran in 42 minutes (41% improvement over current main).
  • Under this branch, with 6 cores with 800 MB memory each, it ran in 51 minutes (28% improvement over current main).

Now, turning to other workloads: I used the TPC-H datasets and queries I wrote about here. Below I briefly explain each query/workload, and then give the performance figures. At the end, I'll discuss some takeaways and next steps. Finally, in a separate comment, I'll give some detail about how various operations work under the hood and why some workloads benefit from Dask distributed while others do not.

Background

TPC-H is a standard data schema representing a business's operations: parts, suppliers, orders, customers, etc. It has been used for decades to test relational database systems. For testing earthmover, I used tpch-kit to generate various (doubling) sizes (from about 10MB to over 20GB) of the TPC-H dataset. I then created earthmover transformation YAML for the first 5 of the canonical TPC-H queries.

TPC-H query 1 (group_by)

This query computes various statistics about ordered items by status. It consists of

  • select from a single table
  • filter rows by date
  • group by statuses and compute sum and average of several columns
  • sort result by statuses

(The query ultimately produces very few rows, less than a dozen.)

earthmover performance results (numbers are runtime in seconds):

| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
| --- | --- | --- | --- | --- |
| 10M | 18 | 21 | 33 | 32 |
| 20M | 34 | 31 | 36 | 47 |
| 50M | 57 | 66 | 69 | 92 |
| 100M | 130 | 126 | 131 | 151 |
| 200M | 582 | 290 | 216 | 296 |
| 500M | 3254 | 765 | 916 | - |


TPC-H query 2 (join)

This query computes account balances for suppliers. It consists of

  • select from 5 tables (joined together)
  • filter rows by supplier region and part type and size
  • a subquery
  • sort result by balance and other fields

(The earthmover.yml implementation pushes the filters up front, to minimize the number of rows that must be joined.)

earthmover performance results (numbers are runtime in seconds):

| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
| --- | --- | --- | --- | --- |
| 10M | 6 | 14 | 18 | 24 |
| 20M | 7 | 15 | 19 | 22 |
| 50M | 7 | 19 | 25 | 32 |
| 100M | 9 | 25 | 28 | 42 |
| 200M | 12 | 39 | 49 | 65 |
| 500M | 25 | 84 | 77 | 138 |
| 1G | 167 | 174 | 137 | 267 |
| 2G | 391 | 324 | 328 | 379 |
| 5G | 1075 | 1295 | 1103 | - |


TPC-H query 3 (join and group_by)

This query computes revenue per order. It consists of

  • select from 3 tables (joined together)
  • filter rows by market segment and for a specific date
  • group by several order-related fields
  • sort result by revenue and order date

earthmover performance results (numbers are runtime in seconds):

| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
| --- | --- | --- | --- | --- |
| 10M | 6 | 15 | 14 | 19 |
| 20M | 7 | 19 | 16 | 20 |
| 50M | 10 | 25 | 23 | 21 |
| 100M | 16 | 39 | 30 | 33 |
| 200M | 37 | 76 | 57 | 57 |
| 500M | 225 | 258 | 156 | 154 |
| 1G | - | 568 | 496 | 507 |
| 2G | - | 2446 | 1177 | 1266 |


TPC-H query 4 (join and group_by)

This query computes the number of orders by priority over a certain date range. It consists of

  • select from one table
  • filter rows by a specific date range
  • a subquery (for existence) over another table
  • group by order priority
  • sort result by order priority

earthmover performance results (numbers are runtime in seconds):

| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
| --- | --- | --- | --- | --- |
| 10M | 5 | 10 | 10 | 12 |
| 20M | 7 | 10 | 10 | 14 |
| 50M | 11 | 14 | 13 | 18 |
| 100M | 20 | 20 | 16 | 20 |
| 200M | 59 | 38 | 31 | 34 |
| 500M | 149 | 102 | 63 | 77 |
| 1G | 291 | 219 | 131 | 147 |
| 2G | 485 | 583 | 329 | 417 |
| 5G | 2076 | 1253 | 1013 | 915 |


TPC-H query 5 (join and group_by)

This query computes revenue by country over a certain date range. It consists of

  • select from 6 tables (joined together)
  • filter rows by a specific date range
  • group by country name, sum a revenue expression
  • sort result by revenue

earthmover performance results (numbers are runtime in seconds):

| Input size | main em | distributed em (2 core x 2.4 GB) | distributed em (4 core x 1.2 GB) | distributed em (6 core x 800 MB) |
| --- | --- | --- | --- | --- |
| 10M | 10 | 19 | 21 | 22 |
| 20M | 15 | 30 | 29 | 29 |
| 50M | 30 | 42 | 40 | 36 |
| 100M | 59 | 99 | 60 | 52 |
| 200M | 327 | 229 | 151 | 133 |
| 500M | 1502 | 653 | 456 | 325 |
| 1G | 4397 | 1086 | 734 | 942 |



More about dask distributed performance in the next comment, but general takeaways include

  • dask distributed certainly adds some overhead
  • group_by performance is the worst of any operation, and the more create_columns there are, the worse it is
  • benefits of earthmover distributed are only realized on large data (hundreds of MB to several GB or more); it improves wall-clock processing time for some workloads, and also enables processing larger datasets (on which single-threaded earthmover can fail)
  • the "sweet spot" depends on the workload, but often using 4 workers/cores seems like a good balance of parallelism vs. coordination
  • scaling too wide (too many workers/cores) can make memory the bottleneck
  • scaling tends to be approximately linear

My next testing step will be to test distributed earthmover's performance with the Ed-Fi bundles and some large assessment data files.

tomreitz (Collaborator) commented Mar 12, 2025

In this comment, I'll explain some things I've learned about how Dask and Dask distributed work, how earthmover's various operations use them, and how this affects performance. Once this work is more mature and thoroughly tested, perhaps portions of this comment can be included in the earthmover docs on a new "distributed" page.

A core innovation of Dask is representing a dataframe as a collection of partitions, each of which is a Pandas dataframe. This, together with writing intermediate Pandas partitions to disk as needed and clever re-implementations of some Pandas dataframe methods, enables Dask to handle dataframes larger than memory. And this is why earthmover initially used Dask.
Once data is partitioned, it becomes possible to process it in parallel - this is the functionality that Dask distributed provides. It consists of

  1. a scheduler
  2. a cluster of Dask workers - which can be CPU cores on a single machine (how earthmover works) or even separate machines on a network

The scheduler (1) carves up data transformations on an entire dataframe into smaller transformations on each partition, and it sends these smaller pieces of work to the workers (2).
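On a single machine, standing up this scheduler/worker arrangement amounts to choosing how to split the cores and memory budget. The helper below is an illustrative sketch, not earthmover code, though n_workers, threads_per_worker, and memory_limit are real LocalCluster keyword arguments:

```python
def cluster_kwargs(total_memory_gb, n_workers):
    # Split a fixed memory budget evenly across workers, mirroring the
    # configurations tested above (e.g. 4 workers x 1.2 GB from ~4.8 GB).
    per_worker_gb = total_memory_gb / n_workers
    return {
        "n_workers": n_workers,
        "threads_per_worker": 1,  # one process per core; avoids GIL contention
        "memory_limit": f"{per_worker_gb:.1f}GB",
    }

kwargs = cluster_kwargs(total_memory_gb=4.8, n_workers=4)
# With dask[distributed] installed, these kwargs go straight to the cluster:
#   from dask.distributed import LocalCluster, Client
#   client = Client(LocalCluster(**kwargs))
```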

Let us consider some of the operations earthmover supports, and how they work with Dask's partitioning.

union

Union is quite straightforward; the partitions of two (or more) dataframes are stacked to form a single larger dataframe.
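In partition terms, union needs no data movement at all. A pure-Python sketch, with lists of rows standing in for partitions (with Dask this is essentially dd.concat):

```python
# Partitions of two dataframes being unioned:
frame_a = [["row1", "row2"], ["row3"]]
frame_b = [["row4"], ["row5", "row6"]]

# The union is just the partition lists stacked end to end; no partition's
# contents change, so no shuffling or re-sorting is required.
unioned = frame_a + frame_b
npartitions = len(unioned)
```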

column-operations

Column operations such as add_columns, modify_columns, keep_columns, drop_columns, map_values, etc. can be applied to each partition separately, and so are highly parallelizable.
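The per-partition shape of this work can be sketched as follows (dicts of column lists stand in for pandas partitions, and the column names are invented; in Dask the scheduler farms one such call per partition out to the workers via .map_partitions()):

```python
def add_and_rename(partition):
    # Each partition is transformed independently of every other partition,
    # which is what makes column operations embarrassingly parallel.
    out = dict(partition)
    out["school_year"] = [2025] * len(out["student_id"])  # add a column
    out["id"] = out.pop("student_id")                     # rename a column
    return out

partitions = [
    {"student_id": [1, 2]},
    {"student_id": [3]},
]
results = [add_and_rename(p) for p in partitions]  # parallelizable per partition
```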

join

Join can be fast and parallelizable if both frames are sorted on the join key(s) - each worker can take one or a few partitions from each dataframe and join those (in memory). If one or both dataframes are not sorted, then each partition from one dataframe may need to be joined against every partition of the other dataframe (the dotted lines in the picture above) - obviously much slower.
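The aligned (sorted) case can be sketched like this, with dicts keyed by the join key standing in for partitions that cover matching key ranges:

```python
# Both sides sorted and partitioned on the same key ranges ([1-2], [3]):
left = [{1: "a", 2: "b"}, {3: "c"}]
right = [{1: "x", 2: "y"}, {3: "z"}]

# Because partition i of each side covers the same key range, each worker
# only joins partition i of left with partition i of right, in memory.
# In the unsorted case this zip would become a full cross of partitions.
joined = [
    {k: (lp[k], rp[k]) for k in lp if k in rp}
    for lp, rp in zip(left, right)
]
```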

group_by

Group-by is among the slowest operations in earthmover and Dask, particularly when the grouping drastically reduces the number of rows (i.e., when there are few groups): the operation must almost always be followed by a repartition (see below), since otherwise many partitions will be empty, and leaving them in place can cause downstream NaN values and other issues. (Dask unfortunately has no efficient way to drop empty partitions, nor any general way to make downstream computations conditional on properties of the data itself.)


Add to this the fact that each of the create_columns must be separately run against the Dask GroupBy structure and then repartitioned, and the performance implications of group_by become apparent.
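The aggregation itself can be sketched as a tree reduction over partitions (pure Python; tuples stand in for rows). The final combine step shows why the result fits in far fewer partitions than the input, motivating the repartition described above:

```python
# Input partitions: (status, value) rows spread across three partitions.
partitions = [
    [("F", 1), ("O", 2)],
    [("F", 3)],
    [("O", 4), ("F", 5)],
]

def partial_sums(rows):
    # Per-partition partial aggregation; these run in parallel on workers.
    acc = {}
    for status, value in rows:
        acc[status] = acc.get(status, 0) + value
    return acc

# Combine the partial results. With only two groups, the entire output fits
# in a single partition, leaving the other partitions effectively empty.
totals = {}
for part in map(partial_sums, partitions):
    for status, subtotal in part.items():
        totals[status] = totals.get(status, 0) + subtotal
```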
