[REVIEW] Enable using CPU backend with first set of dask queries #239

Open · wants to merge 2 commits into base: main
19 changes: 18 additions & 1 deletion gpu_bdb/bdb_tools/q22_utils.py
@@ -14,6 +14,12 @@
# limitations under the License.
#

import numpy as np
import pandas as pd

import cudf
import dask_cudf

from bdb_tools.readers import build_reader
from bdb_tools.utils import convert_datestring_to_days

@@ -45,7 +51,18 @@ def read_tables(config, c=None):

dd_columns = ["d_date_sk", "d_date"]
date_dim = table_reader.read("date_dim", relevant_cols=dd_columns)
date_dim = date_dim.map_partitions(convert_datestring_to_days)

meta_d = {
"d_date_sk": np.ones(1, dtype=np.int64),
"d_date": np.ones(1, dtype=np.int64)
}

if isinstance(date_dim, dask_cudf.DataFrame):
meta_df = cudf.DataFrame(meta_d)
else:
meta_df = pd.DataFrame(meta_d)

date_dim = date_dim.map_partitions(convert_datestring_to_days, meta=meta_df)

if c:
c.create_table('inventory', inventory, persist=False)
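The hunk above shows the pattern this PR applies throughout: map_partitions now gets an explicit meta frame, and that meta has to be built with the same backend (cudf or pandas) as the dask collection it describes. A minimal, self-contained sketch of that dispatch, using hypothetical column names rather than this repo's readers:

# Illustrative sketch only; not part of this PR.
import numpy as np
import pandas as pd
import dask.dataframe as dd

try:
    import cudf
    import dask_cudf
except ImportError:  # CPU-only environment
    cudf = None
    dask_cudf = None

def datestring_to_days(df):
    # Runs once per partition; picks the to_datetime that matches the partition type.
    lib = cudf if (cudf is not None and isinstance(df, cudf.DataFrame)) else pd
    df["d_date"] = (
        lib.to_datetime(df["d_date"], format="%Y-%m-%d").astype("int64")
        // 86_400_000_000_000  # nanoseconds per day
    )
    return df

def convert_dates(date_dim):
    meta_d = {
        "d_date_sk": np.ones(1, dtype=np.int64),
        "d_date": np.ones(1, dtype=np.int64),
    }
    # meta must come from the same backend as the collection itself
    if dask_cudf is not None and isinstance(date_dim, dask_cudf.DataFrame):
        meta = cudf.DataFrame(meta_d)
    else:
        meta = pd.DataFrame(meta_d)
    return date_dim.map_partitions(datestring_to_days, meta=meta)

if __name__ == "__main__":
    pdf = pd.DataFrame({"d_date_sk": [1, 2], "d_date": ["2003-01-02", "2003-02-02"]})
    print(convert_dates(dd.from_pandas(pdf, npartitions=1)).compute())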
43 changes: 27 additions & 16 deletions gpu_bdb/bdb_tools/utils.py
@@ -34,7 +34,8 @@

import numpy as np

import cudf
import cudf
import dask_cudf
import pandas as pd
import dask.dataframe as dd
from dask.utils import parse_bytes
@@ -931,27 +932,37 @@ def left_semi_join(df_1, df_2, left_on, right_on):
"""
Perform left semi join between tables
"""
left_merge = lambda df_1, df_2: df_1.merge(
df_2, left_on=left_on, right_on=right_on, how="leftsemi"
)
if isinstance(df_1, dask_cudf.DataFrame):
left_merge = lambda df_1, df_2: df_1.merge(
df_2, left_on=left_on, right_on=right_on, how="leftsemi"
)

## asserting that number of partitions of the right frame is always 1
assert df_2.npartitions == 1
## asserting that number of partitions of the right frame is always 1
assert df_2.npartitions == 1

return df_1.map_partitions(left_merge, df_2.to_delayed()[0], meta=df_1._meta)
return df_1.map_partitions(left_merge, df_2.to_delayed()[0], meta=df_1._meta)

else:
return df_1[df_1[left_on].isin(df_2[right_on].compute())].copy()


def convert_datestring_to_days(df):

import cudf
if isinstance(df, cudf.DataFrame):
df["d_date"] = (
cudf.to_datetime(df["d_date"], format="%Y-%m-%d")
.astype("datetime64[s]")
.astype("int64")
/ 86400
)
df["d_date"] = df["d_date"].astype("int64")
else:
df["d_date"] = (
pd.to_datetime(df["d_date"], format="%Y-%m-%d")
.view("int64")
/ 8.64e+13
)
df["d_date"] = df["d_date"].astype("int64")

df["d_date"] = (
cudf.to_datetime(df["d_date"], format="%Y-%m-%d")
.astype("datetime64[s]")
.astype("int64")
/ 86400
)
df["d_date"] = df["d_date"].astype("int64")
return df


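cudf's merge accepts how="leftsemi", which pandas does not, so the CPU branch above falls back to an isin filter. A small, hypothetical pandas-only illustration of that equivalence (the names are made up; the repo's version operates on dask collections):

# Illustrative sketch only; not part of this PR.
import pandas as pd

def left_semi_join_pandas(left, right, left_on, right_on):
    # Semi join: keep left rows whose key appears in right, without
    # duplicating left rows the way an inner join against repeated
    # right keys would.
    keys = right[right_on].unique()
    return left[left[left_on].isin(keys)].copy()

sales = pd.DataFrame({"ss_item_sk": [1, 2, 2, 3], "ss_quantity": [5, 1, 7, 2]})
items = pd.DataFrame({"i_item_sk": [2, 3, 3]})
print(left_semi_join_pandas(sales, items, "ss_item_sk", "i_item_sk"))
# keeps the three rows with ss_item_sk in {2, 3}, each exactly once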
18 changes: 15 additions & 3 deletions gpu_bdb/queries/q11/gpu_bdb_query_11.py
@@ -15,17 +15,19 @@
#

import cudf
import dask_cudf

from bdb_tools.utils import (
benchmark,
gpubdb_argparser,
run_query,
convert_datestring_to_days,
convert_datestring_to_days
)

from bdb_tools.q11_utils import read_tables

import numpy as np
import pandas as pd

q11_start_date = "2003-01-02"
q11_end_date = "2003-02-02"
@@ -37,8 +39,18 @@ def main(client, config):
config=config,
compute_result=config["get_read_time"],
)

date_df = date_df.map_partitions(convert_datestring_to_days)

meta_d = {
"d_date_sk": np.ones(1, dtype=np.int64),
"d_date": np.ones(1, dtype=np.int64)
}

if isinstance(date_df, dask_cudf.DataFrame):
meta_df = cudf.DataFrame(meta_d)
else:
meta_df = pd.DataFrame(meta_d)

date_df = date_df.map_partitions(convert_datestring_to_days, meta=meta_df)

# Filter limit in days
min_date = np.datetime64(q11_start_date, "D").astype(int)
26 changes: 21 additions & 5 deletions gpu_bdb/queries/q12/gpu_bdb_query_12.py
@@ -29,6 +29,7 @@

from distributed import wait
import numpy as np
import pandas as pd
from dask import delayed

### Current Implementation Assumption
@@ -80,9 +81,15 @@ def filter_wcs_table(web_clickstreams_fn, filtered_item_df):
"wcs_item_sk",
"wcs_sales_sk",
]
web_clickstreams_df = cudf.read_parquet(
web_clickstreams_fn, columns=web_clickstreams_cols
)

if isinstance(filtered_item_df, dask_cudf.DataFrame):
web_clickstreams_df = cudf.read_parquet(
web_clickstreams_fn, columns=web_clickstreams_cols
)
else:
web_clickstreams_df = pd.read_parquet(
web_clickstreams_fn, columns=web_clickstreams_cols
)

filter_wcs_df = web_clickstreams_df[
web_clickstreams_df["wcs_user_sk"].notnull()
@@ -155,7 +162,12 @@ def main(client, config):
"wcs_user_sk": np.ones(1, dtype=np.int64),
"wcs_click_date_sk": np.ones(1, dtype=np.int64),
}
meta_df = cudf.DataFrame(meta_d)

if isinstance(filtered_item_df, dask_cudf.DataFrame):
meta_df = cudf.DataFrame(meta_d)
else:
meta_df = pd.DataFrame(meta_d)

web_clickstream_flist = glob.glob(os.path.join(config["data_dir"], "web_clickstreams/*.parquet"))
task_ls = [
delayed(filter_wcs_table)(fn, filtered_item_df.to_delayed()[0])
@@ -172,7 +184,11 @@ def main(client, config):
"ss_customer_sk": np.ones(1, dtype=store_sales_df["ss_customer_sk"].dtype),
"ss_sold_date_sk": np.ones(1, dtype=np.int64),
}
meta_df = cudf.DataFrame(meta_d)

if isinstance(filtered_item_df, dask_cudf.DataFrame):
meta_df = cudf.DataFrame(meta_d)
else:
meta_df = pd.DataFrame(meta_d)

filtered_ss_df = store_sales_df.map_partitions(
filter_ss_table, filtered_item_df.to_delayed()[0], meta=meta_df
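q12 reads the web_clickstreams parquet files one delayed task per file, so the reader itself also follows the backend of the frames already in the query. A hedged sketch of that per-file dispatch (paths, columns, and helper names here are hypothetical):

# Illustrative sketch only; not part of this PR.
import glob
import pandas as pd
import dask.dataframe as dd
from dask import delayed

try:
    import cudf
    import dask_cudf
except ImportError:  # CPU-only environment
    cudf = None
    dask_cudf = None

def read_one(path, columns, use_gpu):
    # Read a single parquet file with the backend the query is using.
    reader = cudf.read_parquet if use_gpu else pd.read_parquet
    return reader(path, columns=columns)

def read_clickstreams(pattern, columns, like_df):
    # Follow the backend of a frame that is already part of the query.
    use_gpu = dask_cudf is not None and isinstance(like_df, dask_cudf.DataFrame)
    parts = [delayed(read_one)(f, columns, use_gpu) for f in sorted(glob.glob(pattern))]
    # meta must come from the same backend as the delayed partitions
    meta = (cudf if use_gpu else pd).DataFrame({c: [] for c in columns})
    return dd.from_delayed(parts, meta=meta)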
18 changes: 17 additions & 1 deletion gpu_bdb/queries/q15/gpu_bdb_query_15.py
@@ -14,6 +14,12 @@
# limitations under the License.
#

import numpy as np
import pandas as pd

import cudf
import dask_cudf

from bdb_tools.utils import (
benchmark,
gpubdb_argparser,
@@ -42,7 +48,17 @@ def main(client, config):
store_sales_df = store_sales_df.query(f"ss_store_sk == {q15_store_sk}")

### Query 1. Date Time Filtering Logic
date_dim_cov_df = date_dim_df.map_partitions(convert_datestring_to_days)
meta_d = {
"d_date": np.ones(1, dtype=np.int64),
"d_date_sk": np.ones(1, dtype=np.int64)
}

if isinstance(date_dim_df, dask_cudf.DataFrame):
meta_df = cudf.DataFrame(meta_d)
else:
meta_df = pd.DataFrame(meta_d)

date_dim_cov_df = date_dim_df.map_partitions(convert_datestring_to_days, meta=meta_df)

q15_start_dt = datetime.datetime.strptime(q15_startDate, "%Y-%m-%d")
q15_start_dt = (q15_start_dt - datetime.datetime(1970, 1, 1)) / datetime.timedelta(
15 changes: 14 additions & 1 deletion gpu_bdb/queries/q16/gpu_bdb_query_16.py
@@ -15,6 +15,7 @@
#

import cudf
import dask_cudf

from bdb_tools.utils import (
benchmark,
@@ -28,6 +29,7 @@
from dask.distributed import wait

import numpy as np
import pandas as pd


### conf
@@ -104,7 +106,18 @@ def main(client, config):
# AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('${hiveconf:q16_date}', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds

##todo: remove below
date_dim_cov_df = date_dim_df.map_partitions(convert_datestring_to_days)
meta_d = {
"d_date": np.ones(1, dtype=np.int64),
"d_date_sk": np.ones(1, dtype=np.int64)
}

if isinstance(date_dim_df, dask_cudf.DataFrame):
meta_df = cudf.DataFrame(meta_d)
else:
meta_df = pd.DataFrame(meta_d)

date_dim_cov_df = date_dim_df.map_partitions(convert_datestring_to_days, meta=meta_df)

q16_timestamp = np.datetime64(q16_date, "D").astype(int)
filtered_date_df = date_dim_cov_df.query(
f"d_date >={q16_timestamp- 30} and d_date <= {q16_timestamp+30}",
18 changes: 9 additions & 9 deletions gpu_bdb/queries/q17/gpu_bdb_query_17.py
@@ -55,8 +55,8 @@ def main(client, config):
ss_date_join = left_semi_join(
store_sales_df,
filtered_date_df,
left_on=["ss_sold_date_sk"],
right_on=["d_date_sk"],
left_on="ss_sold_date_sk",
right_on="d_date_sk",
)
ss_date_join = ss_date_join[store_sales_cols]

@@ -65,7 +65,7 @@
item_df["i_category"].isin(q17_i_category_IN)
].reset_index(drop=True)
ss_date_item_join = left_semi_join(
ss_date_join, filtered_item_df, left_on=["ss_item_sk"], right_on=["i_item_sk"]
ss_date_join, filtered_item_df, left_on="ss_item_sk", right_on="i_item_sk"
)

# LEFT SEMI JOIN store s ON ss.ss_store_sk = s.s_store_sk AND s.s_gmt_offset = ${hiveconf:q17_gmt_offset}
@@ -76,8 +76,8 @@
ss_date_item_store_join = left_semi_join(
ss_date_item_join,
filtered_store_df,
left_on=["ss_store_sk"],
right_on=["s_store_sk"],
left_on="ss_store_sk",
right_on="s_store_sk",
)

# (SELECT c.c_customer_sk FROM customer c LEFT SEMI JOIN customer_address ca
@@ -91,17 +91,17 @@
sub_c = left_semi_join(
customer_df,
filtered_customer_address,
left_on=["c_current_addr_sk"],
right_on=["ca_address_sk"],
left_on="c_current_addr_sk",
right_on="ca_address_sk",
)

# sub_c ON ss.ss_customer_sk = sub_c.c_customer_sk

ss_date_item_store_customer_join = left_semi_join(
ss_date_item_store_join,
sub_c,
left_on=["ss_customer_sk"],
right_on=["c_customer_sk"],
left_on="ss_customer_sk",
right_on="c_customer_sk",
)

# JOIN promotion p ON ss.ss_promo_sk = p.p_promo_sk