Added an example notebook on dask-sql. #171
@@ -0,0 +1,323 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Analyzing dask data with SQL"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask's data analysis capabilities are vast, and since its dataframe API closely resembles pandas, it is easy for pandas experts to migrate.\n",
"However, for many applications and users, SQL still plays a large role in querying and retrieving data.\n",
"For very good reasons: it is easy to learn, it is a common language that many (data) systems understand, and it contains all the important elements for querying data.\n",
"\n",
"With [dask-sql](https://nils-braun.github.io/dask-sql/), which leverages [Apache Calcite](https://calcite.apache.org/), it is possible to query the data with SQL and still use the full power of a dask cluster."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`dask-sql` can be installed via conda (or mamba) or pip, as in the following cell:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"! mamba install -y dask-sql"
]
},
}, | ||||||
{ | ||||||
"cell_type": "markdown", | ||||||
"metadata": {}, | ||||||
"source": [ | ||||||
"\n", | ||||||
"If you want to analyze data with dask-sql in python, you need to do three steps:\n", | ||||||
"\n", | ||||||
"1. Create a context\n", | ||||||
"2. Load and register your data \n", | ||||||
"3. Start querying!\n" | ||||||
] | ||||||
}, | ||||||
{ | ||||||
"cell_type": "markdown", | ||||||
"metadata": {}, | ||||||
"source": [ | ||||||
"## 1. Create a Context" | ||||||
] | ||||||
}, | ||||||
{ | ||||||
"cell_type": "markdown", | ||||||
"metadata": {}, | ||||||
"source": [ | ||||||
"In SQL, all tables and functions are specified via names. Therefore we need to have some place to store all the registered tables (and functions), so that dask-sql knows which data it refers to.\n", | ||||||
"This is the task of the Context.\n", | ||||||
"You typically create a single context once at the beginning of your python script/notebook and use it through the rest of the application." | ||||||
] | ||||||
}, | ||||||
{ | ||||||
"cell_type": "code", | ||||||
"execution_count": null, | ||||||
"metadata": {}, | ||||||
"outputs": [], | ||||||
"source": [ | ||||||
"from dask_sql import Context\n", | ||||||
"c = Context()" | ||||||
] | ||||||
}, | ||||||
{ | ||||||
"cell_type": "markdown", | ||||||
"metadata": {}, | ||||||
"source": [ | ||||||
"## Set up a dask cluster" | ||||||
] | ||||||
}, | ||||||
{ | ||||||
"cell_type": "markdown", | ||||||
"metadata": {}, | ||||||
"source": [ | ||||||
"Now would be the best time to connect to your dask cluster if you have one. \n", | ||||||
"dask-sql leverages dask for performing the computations on the data.\n", | ||||||
"\n", | ||||||
"Check out one of [the many ways](https://docs.dask.org/en/latest/setup.html) to create and connect to your dask cluster.\n", | ||||||
"\n", | ||||||
"For this example we will create a cluster running locally.\n", | ||||||
"This is optional as dask can also create one implicetly, but we can get more diagnostics and insights.\n", | ||||||
"You can click the link shown after the client intialization to show the dask dashboard." | ||||||
] | ||||||
}, | ||||||
{ | ||||||
"cell_type": "code", | ||||||
"execution_count": null, | ||||||
"metadata": {}, | ||||||
"outputs": [], | ||||||
"source": [ | ||||||
"from dask.distributed import Client\n", | ||||||
"\n", | ||||||
"client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')\n", | ||||||
"client" | ||||||
] | ||||||
}, | ||||||
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Load and register the data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"So far, no data was involved. Let's change that! There are many ways to get the data into your cluster and tell dask-sql where to find it."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Register a dask or pandas dataframe"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you already have a dask dataframe (dask's abstraction of a pandas dataframe, with nearly the same API), you can directly associate it with a name:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.datasets import timeseries\n",
"df = timeseries()\n",
"type(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c.create_table(\"timeseries\", df.persist())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<div class=\"alert alert-info\">\n",
"\n",
"Please note that we have persisted the data before passing it to dask-sql.\n",
"This tells dask to prefetch the data into memory, which can speed up the queries considerably.\n",
"The trade-off is memory consumption: persisting a dataset that does not fit into your cluster's memory can lead to out-of-memory errors, so only persist data of a manageable size.\n",
"\n", | ||||||
"</div>" | ||||||
] | ||||||
}, | ||||||
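{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that newer versions of dask-sql no longer persist input data by default; instead, `create_table` accepts a `persist` keyword argument to request it explicitly. The following is a minimal sketch, assuming a dask-sql version that exposes this keyword:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Equivalent registration, asking dask-sql itself to persist the input\n",
"# (the persist keyword is assumed to exist in your dask-sql version)\n",
"c.create_table(\"timeseries\", df, persist=True)"
]
},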
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It is also possible to register a pandas dataframe directly."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"df = pd.DataFrame({\"column\": [1, 2, 3]})\n",
"c.create_table(\"pandas\", df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read in data from an external location and register it"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In most cases, however, your data will live on some external storage device, such as a local disk, S3 or HDFS.\n",
"You can leverage the large set of input formats and sources that dask understands to load the data.\n",
"Find our more information in the [documentation](https://dask-sql.readthedocs.io/en/latest/pages/data_input.html) of dask-sql." | ||||||
] | ||||||
}, | ||||||
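{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, a table can be created directly from files on S3 with a `CREATE TABLE ... WITH (...)` statement.\n",
"The following cells are a sketch: they assume the public NYC taxi trip dataset at `s3://nyc-tlc/trip data/` is reachable from your cluster and that `s3fs` is installed."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create a table from a CSV file on S3 (the dataset path is an assumption)\n",
"c.sql(\"\"\"\n",
"    CREATE TABLE trip_data WITH (\n",
"        location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv',\n",
"        format = 'csv'\n",
"    )\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A quick aggregation on the new table (this triggers the actual read)\n",
"c.sql(\"\"\"\n",
"    SELECT passenger_count, SUM(tip_amount)\n",
"    FROM trip_data\n",
"    GROUP BY passenger_count\n",
"\"\"\").compute()"
]
},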
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We have now registered tables in our context under the given names.\n",
"Let's do some data analysis!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Query the data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Whenever you call the `.sql()` method of the context, dask-sql hands your query to Apache Calcite to turn it into a relational algebra expression, and then creates a dask computation graph that will execute your computation.\n",
"\n",
"Let's see how this works in action:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c.sql(\"\"\"\n",
"    SELECT AVG(x) FROM timeseries\n",
"\"\"\")"
]
},
}, | ||||||
{ | ||||||
"cell_type": "markdown", | ||||||
"metadata": {}, | ||||||
"source": [ | ||||||
"The result is again a dask dataframe, which only represents the computation (without having executed it) so far.\n", | ||||||
"Lets trigger it now!" | ||||||
] | ||||||
}, | ||||||
{ | ||||||
"cell_type": "code", | ||||||
"execution_count": null, | ||||||
"metadata": {}, | ||||||
"outputs": [], | ||||||
"source": [ | ||||||
"c.sql(\"\"\"\n", | ||||||
" SELECT AVG(x) FROM timeseries\n", | ||||||
"\"\"\").compute()" | ||||||
] | ||||||
}, | ||||||
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Congratulations! You have just queried your data with SQL.\n",
"If you check out the dask dashboard, you will see that this has triggered some computations in dask.\n",
"\n",
"Of course, it is also possible to run more complex queries:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c.sql(\"\"\"\n",
"    SELECT\n",
"        lhs.name,\n",
"        lhs.id,\n",
"        lhs.x\n",
"    FROM\n",
"        timeseries AS lhs\n",
"    JOIN\n",
"        (\n",
"            SELECT\n",
"                name AS max_name,\n",
"                MAX(x) AS max_x\n",
"            FROM timeseries\n",
"            GROUP BY name\n",
"        ) AS rhs\n",
"    ON\n",
"        lhs.name = rhs.max_name AND\n",
"        lhs.x = rhs.max_x\n",
"\"\"\").compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can find more information on dask-sql in the [documentation](https://dask-sql.readthedocs.io/)."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}