Added an example notebook on dask-sql. #171

Closed
wants to merge 7 commits into from
1 change: 1 addition & 0 deletions index.rst
@@ -22,6 +22,7 @@ You can run these examples in a live session here: |Binder|
delayed
futures
machine-learning
sql
xarray

.. toctree::
323 changes: 323 additions & 0 deletions sql.ipynb
@@ -0,0 +1,323 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Analyzing dask data with SQL"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask's abilities to analyze data are huge and due to its large similarities with the pandas dataframe API, it is very easy for pandas experts to migrate.\n",
"However, for many applications amd users, SQL still plays a large role for querying and retrieving data.\n",
"For very good reasons: it is easy to learn, it is a common language many (data) systems understand and it contains all the important elements to query the data.\n",
"\n",
"With [dask-sql](https://nils-braun.github.io/dask-sql/), which leverages [Apache Calcite](https://calcite.apache.org/), it is possible to query the data with SQL and still use the full power of a dask cluster."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`dask-sql` can be installed via conda (or mamba) or pip, like in the following cell:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"! mamba install -y dask-sql"
]
},
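{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you prefer pip, the equivalent command looks like the following (commented out here so that only one installer runs):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# alternative installation via pip (sketch; uncomment to use instead of mamba)\n",
"# ! pip install dask-sql"
]
},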
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"If you want to analyze data with dask-sql in python, you need to do three steps:\n",
"\n",
"1. Create a context\n",
"2. Load and register your data \n",
"3. Start querying!\n"
]
},
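{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick preview, the three steps together look roughly like the following sketch (`dask_df` stands for any dask dataframe; each step is explained in detail below):\n",
"\n",
"```python\n",
"from dask_sql import Context\n",
"\n",
"c = Context()                                     # 1. create a context\n",
"c.create_table(\"timeseries\", dask_df)             # 2. register a dataframe under a name\n",
"c.sql(\"SELECT AVG(x) FROM timeseries\").compute()  # 3. query it with SQL\n",
"```"
]
},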
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Create a Context"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In SQL, all tables and functions are specified via names. Therefore we need to have some place to store all the registered tables (and functions), so that dask-sql knows which data it refers to.\n",
"This is the task of the Context.\n",
"You typically create a single context once at the beginning of your python script/notebook and use it through the rest of the application."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask_sql import Context\n",
"c = Context()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set up a dask cluster"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now would be the best time to connect to your dask cluster if you have one. \n",
"dask-sql leverages dask for performing the computations on the data.\n",
"\n",
"Check out one of [the many ways](https://docs.dask.org/en/latest/setup.html) to create and connect to your dask cluster.\n",
"\n",
"For this example we will create a cluster running locally.\n",
"This is optional as dask can also create one implicetly, but we can get more diagnostics and insights.\n",
"You can click the link shown after the client intialization to show the dask dashboard."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"\n",
"client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Load and register the data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"So far, no data was involved. Let's change that! There are many ways how you can get the data into your cluster and tell dask-sql, where to find it. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Register a dask or pandas dataframe"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you have already a dask dataframe (which is dask's abstraction of a pandas dataframe with nearly the same API), you can directly associate it with a name:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.datasets import timeseries\n",
"df = timeseries()\n",
"type(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c.create_table(\"timeseries\", df.persist())"
Suggested change
"c.create_table(\"timeseries\", df.persist())"
"c.create_table(\"timeseries\", persist=True)"

This can be done using a kwarg of create_table.

]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<div class=\"alert alert-info\">\n",
"\n",
"Please note that we have persisted the data before passing it to dask-sql.\n",
"This will tell dask that we want to prefetch the data into memory.\n",
"Doing so will speed up the queries a lot, so you probably always want to do this.\n",
Given some of the discussion on dask-contrib/dask-sql#218 and the fact that persisting is no longer dask-sql's default behavior (dask-contrib/dask-sql#245), it might be worth discussing here the trade-offs of persisting to memory before (speed up vs. potential OOM errors). cc @VibhuJawa in case you have any thoughts here

"\n",
"</div>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It is also possible to register a pandas dataframe directly."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"df = pd.DataFrame({\"column\": [1, 2, 3]})\n",
"c.create_table(\"pandas\", df)"
Suggested change
"c.create_table(\"pandas\", df)"
"c.create_table(\"pandas\", df, persist=True)"

By the next release of dask-sql, persisting will no longer be the default behavior, and we would need to include this to make sure that happens. Given the discussion above, we may opt to not persist here.

]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read in data an from external location and register it"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In most of the cases however, your data will live on some external storage device, such as a local disk, S3 or hdfs.\n",
"You can leverage dask's large set of understood input formats and sources to load the data.\n",
Is there any publicly available database we could register to illustrate this concept?


Hi @charlesbluca

If you are looking for S3 datasets, can we make use of the New York taxi dataset used in the Coiled docs?

import dask.dataframe as dd

df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
        "store_and_fwd_flag": "string",
        "PULocationID": "UInt16",
        "DOLocationID": "UInt16",
    },
    storage_options={"anon": True},
    blocksize="16 MiB",
).persist()

c.create_table("trip_data", persist=True)
 

And try to add some groupby/aggregation examples? What do you think?

# something like
c.sql("SELECT passenger_count, AVG(tip_amount) FROM trip_data GROUP BY passenger_count")


Yeah that seems like a good idea! Do you know if it's possible to read this in directly with a query, with something like:

    c.sql(
        f"""
        CREATE TABLE
            trip_data
        WITH (
            location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv',
            format = 'csv',
            ...
        )
    """
    )

Passing the kwargs into the WITH (...)?


Haven't tried this yet, but can we try something like this? Expecting that it should parse and pass the kwargs on to the input plugins (inspired by this example here).

What do you think?

CREATE TABLE trip_data
 WITH (
    location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv',
    format = 'csv',
    parse_dates = ARRAY ['pep_pickup_datetime', 'tpep_dropoff_datetime'],
    type = MAP ['payment_type', 'UInt8',
               'VendorID', 'UInt8',
               'passenger_count', 'UInt8',
               'RatecodeID', 'UInt8',
               'store_and_fwd_flag', 'string',
               'PULocationID', 'UInt16',
                'DOLocationID', 'UInt16']
	storage_options= MAP ['anon','true'],
	blocksize='16 MiB',
)



I have tried this SQL after fixing bugs and it seems to be working for me; let me know if this query works for you as well.

Refactored query:

CREATE TABLE trip_data
 WITH (
    location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv',
    format = 'csv',
    parse_dates = ARRAY ['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    dtype = MAP ['payment_type', 'UInt8',
               'VendorID', 'UInt8',
               'passenger_count', 'UInt8',
               'RatecodeID', 'UInt8',
               'store_and_fwd_flag', 'string',
               'PULocationID', 'UInt16',
                'DOLocationID', 'UInt16'],
	storage_options= MAP ['anon','true'],
	blocksize='16 MiB'
)



@charlesbluca Oct 14, 2021

Yeah that works! The table is loaded in, though it looks like groupby operations may be a little complex for a simple demo:

In [7]: c.sql("select passenger_count, sum(tip_amount) from trip_data group by passenger_count")
Out[7]: 
Dask DataFrame Structure:
              passenger_count SUM("trip_data"."tip_amount")
npartitions=1                                              
                         Int8                       float64
                          ...                           ...
Dask Name: getitem, 11857 tasks

Maybe just showing the futures is sufficient to show that it works. Thanks for the help @rajagurunath 😄

"Find our more information in the [documentation](https://dask-sql.readthedocs.io/en/latest/pages/data_input.html) of dask-sql."
]
},
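{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, a CSV file on S3 could be registered directly from its location.\n",
"The following cell is only a sketch (the path, `format` and `storage_options` are placeholders to adapt to your own data), so it is left commented out:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# sketch: register a table straight from an external location (not run in this notebook)\n",
"# c.create_table(\n",
"#     \"trip_data\",\n",
"#     \"s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv\",\n",
"#     format=\"csv\",\n",
"#     storage_options={\"anon\": True},\n",
"# )"
]
},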
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We have now registered tables in our context with the given names.\n",
"Let's do some data analysis!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Query the data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Whenever you call the `.sql()` method of the context, dask-sql will hand your query to Apache Calcite to turn it into a relational algebra and will then create a dask computation graph, which will execute your computation. \n",
"\n",
"Let's see how this works in action:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c.sql(\"\"\"\n",
" SELECT AVG(x) FROM timeseries\n",
"\"\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The result is again a dask dataframe, which only represents the computation (without having executed it) so far.\n",
"Lets trigger it now!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c.sql(\"\"\"\n",
" SELECT AVG(x) FROM timeseries\n",
"\"\"\").compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Congratulations! You have just queried your data with SQL.\n",
"If you check out the dask dashboard, you see that this has triggered some computations in dask.\n",
"\n",
"Of course, it is also possible to calculate more complex queries:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c.sql(\"\"\"\n",
" SELECT\n",
" lhs.name,\n",
" lhs.id,\n",
" lhs.x\n",
" FROM\n",
" timeseries AS lhs\n",
" JOIN\n",
" (\n",
" SELECT\n",
" name AS max_name,\n",
" MAX(x) AS max_x\n",
" FROM timeseries\n",
" GROUP BY name\n",
" ) AS rhs\n",
" ON\n",
" lhs.name = rhs.max_name AND\n",
" lhs.x = rhs.max_x\n",
"\"\"\").compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can find more information on dask-sql in the [documentation](https://dask-sql.readthedocs.io/)."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}