diff --git a/gpu_bdb/bdb_tools/q22_utils.py b/gpu_bdb/bdb_tools/q22_utils.py
index e31dd6e4..722aeb1c 100644
--- a/gpu_bdb/bdb_tools/q22_utils.py
+++ b/gpu_bdb/bdb_tools/q22_utils.py
@@ -14,6 +14,12 @@
 # limitations under the License.
 #
 
+import numpy as np
+import pandas as pd
+
+import cudf
+import dask_cudf
+
 from bdb_tools.readers import build_reader
 from bdb_tools.utils import convert_datestring_to_days
 
@@ -45,7 +51,18 @@ def read_tables(config, c=None):
     dd_columns = ["d_date_sk", "d_date"]
     date_dim = table_reader.read("date_dim", relevant_cols=dd_columns)
 
-    date_dim = date_dim.map_partitions(convert_datestring_to_days)
+
+    meta_d = {
+        "d_date_sk": np.ones(1, dtype=np.int64),
+        "d_date": np.ones(1, dtype=np.int64)
+    }
+
+    if isinstance(date_dim, dask_cudf.DataFrame):
+        meta_df = cudf.DataFrame(meta_d)
+    else:
+        meta_df = pd.DataFrame(meta_d)
+
+    date_dim = date_dim.map_partitions(convert_datestring_to_days, meta=meta_df)
 
     if c:
         c.create_table('inventory', inventory, persist=False)
diff --git a/gpu_bdb/bdb_tools/utils.py b/gpu_bdb/bdb_tools/utils.py
index 5afe1855..4d4f0fbf 100755
--- a/gpu_bdb/bdb_tools/utils.py
+++ b/gpu_bdb/bdb_tools/utils.py
@@ -34,7 +34,8 @@
 
 import numpy as np
 
-import cudf
+import cudf
+import dask_cudf
 import pandas as pd
 import dask.dataframe as dd
 from dask.utils import parse_bytes
@@ -931,27 +932,37 @@ def left_semi_join(df_1, df_2, left_on, right_on):
     """
         Pefrorm left semi join b/w tables
     """
-    left_merge = lambda df_1, df_2: df_1.merge(
-        df_2, left_on=left_on, right_on=right_on, how="leftsemi"
-    )
+    if isinstance(df_1, dask_cudf.DataFrame):
+        left_merge = lambda df_1, df_2: df_1.merge(
+            df_2, left_on=left_on, right_on=right_on, how="leftsemi"
+        )
 
-    ## asserting that number of partitions of the right frame is always 1
-    assert df_2.npartitions == 1
+        ## asserting that number of partitions of the right frame is always 1
+        assert df_2.npartitions == 1
 
-    return df_1.map_partitions(left_merge, df_2.to_delayed()[0], meta=df_1._meta)
+        return df_1.map_partitions(left_merge, df_2.to_delayed()[0], meta=df_1._meta)
+
+    else:
+        return df_1[df_1[left_on].isin(df_2[right_on].compute())].copy()
 
 
 def convert_datestring_to_days(df):
-
-    import cudf
+    if isinstance(df, cudf.DataFrame):
+        df["d_date"] = (
+            cudf.to_datetime(df["d_date"], format="%Y-%m-%d")
+            .astype("datetime64[s]")
+            .astype("int64")
+            / 86400
+        )
+        df["d_date"] = df["d_date"].astype("int64")
+    else:
+        df["d_date"] = (
+            pd.to_datetime(df["d_date"], format="%Y-%m-%d")
+            .view("int64")
+            / 8.64e+13
+        )
+        df["d_date"] = df["d_date"].astype("int64")
 
-    df["d_date"] = (
-        cudf.to_datetime(df["d_date"], format="%Y-%m-%d")
-        .astype("datetime64[s]")
-        .astype("int64")
-        / 86400
-    )
-    df["d_date"] = df["d_date"].astype("int64")
     return df
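
[Reviewer sketch, not part of the patch.] The new CPU branch of left_semi_join replaces cudf's how="leftsemi" merge with a plain isin filter: df_1 is filtered on its own key column (left_on) against the keys computed from df_2 (right_on). A minimal pandas check of that equivalence, with made-up frames and values:

import pandas as pd

# Made-up stand-ins for the left and right tables.
df_1 = pd.DataFrame({"ss_item_sk": [1, 2, 2, 3], "qty": [10, 20, 30, 40]})
df_2 = pd.DataFrame({"i_item_sk": [2, 3, 3]})

# Left semi join via isin: keep df_1 rows whose key appears in df_2.
semi = df_1[df_1["ss_item_sk"].isin(df_2["i_item_sk"])].copy()
print(semi)
# Rows with keys 2, 2, 3 survive; the duplicated 3 on the right does not
# multiply rows, and no df_2 columns are appended -- the two defining
# properties of how="leftsemi".
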
diff --git a/gpu_bdb/queries/q11/gpu_bdb_query_11.py b/gpu_bdb/queries/q11/gpu_bdb_query_11.py
index cffb60fc..a837296f 100755
--- a/gpu_bdb/queries/q11/gpu_bdb_query_11.py
+++ b/gpu_bdb/queries/q11/gpu_bdb_query_11.py
@@ -15,17 +15,19 @@
 #
 
 import cudf
+import dask_cudf
 
 from bdb_tools.utils import (
     benchmark,
     gpubdb_argparser,
     run_query,
-    convert_datestring_to_days,
+    convert_datestring_to_days
 )
 from bdb_tools.q11_utils import read_tables
 
 import numpy as np
+import pandas as pd
 
 q11_start_date = "2003-01-02"
 q11_end_date = "2003-02-02"
@@ -37,8 +39,18 @@ def main(client, config):
         config=config,
         compute_result=config["get_read_time"],
     )
-
-    date_df = date_df.map_partitions(convert_datestring_to_days)
+
+    meta_d = {
+        "d_date_sk": np.ones(1, dtype=np.int64),
+        "d_date": np.ones(1, dtype=np.int64)
+    }
+
+    if isinstance(date_df, dask_cudf.DataFrame):
+        meta_df = cudf.DataFrame(meta_d)
+    else:
+        meta_df = pd.DataFrame(meta_d)
+
+    date_df = date_df.map_partitions(convert_datestring_to_days, meta=meta_df)
 
     # Filter limit in days
     min_date = np.datetime64(q11_start_date, "D").astype(int)
diff --git a/gpu_bdb/queries/q12/gpu_bdb_query_12.py b/gpu_bdb/queries/q12/gpu_bdb_query_12.py
index 9be484d8..c87607db 100755
--- a/gpu_bdb/queries/q12/gpu_bdb_query_12.py
+++ b/gpu_bdb/queries/q12/gpu_bdb_query_12.py
@@ -29,6 +29,7 @@
 from distributed import wait
 
 import numpy as np
+import pandas as pd
 from dask import delayed
 
 ### Current Implementation Assumption
@@ -80,9 +81,15 @@ def filter_wcs_table(web_clickstreams_fn, filtered_item_df):
         "wcs_item_sk",
         "wcs_sales_sk",
     ]
-    web_clickstreams_df = cudf.read_parquet(
-        web_clickstreams_fn, columns=web_clickstreams_cols
-    )
+
+    if isinstance(filtered_item_df, cudf.DataFrame):
+        web_clickstreams_df = cudf.read_parquet(
+            web_clickstreams_fn, columns=web_clickstreams_cols
+        )
+    else:
+        web_clickstreams_df = pd.read_parquet(
+            web_clickstreams_fn, columns=web_clickstreams_cols
+        )
 
     filter_wcs_df = web_clickstreams_df[
         web_clickstreams_df["wcs_user_sk"].notnull()
@@ -155,7 +162,12 @@ def main(client, config):
         "wcs_user_sk": np.ones(1, dtype=np.int64),
         "wcs_click_date_sk": np.ones(1, dtype=np.int64),
     }
-    meta_df = cudf.DataFrame(meta_d)
+
+    if isinstance(filtered_item_df, dask_cudf.DataFrame):
+        meta_df = cudf.DataFrame(meta_d)
+    else:
+        meta_df = pd.DataFrame(meta_d)
+
     web_clickstream_flist = glob.glob(os.path.join(config["data_dir"], "web_clickstreams/*.parquet"))
     task_ls = [
         delayed(filter_wcs_table)(fn, filtered_item_df.to_delayed()[0])
@@ -172,7 +184,11 @@ def main(client, config):
         "ss_customer_sk": np.ones(1, dtype=store_sales_df["ss_customer_sk"].dtype),
         "ss_sold_date_sk": np.ones(1, dtype=np.int64),
     }
-    meta_df = cudf.DataFrame(meta_d)
+
+    if isinstance(filtered_item_df, dask_cudf.DataFrame):
+        meta_df = cudf.DataFrame(meta_d)
+    else:
+        meta_df = pd.DataFrame(meta_d)
 
     filtered_ss_df = store_sales_df.map_partitions(
         filter_ss_table, filtered_item_df.to_delayed()[0], meta=meta_df
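
[Reviewer sketch, not part of the patch.] All of the meta_d blocks above serve the same purpose: map_partitions cannot cheaply infer the output schema of convert_datestring_to_days, so an explicit one-row prototype frame with the expected columns and dtypes is passed as meta, built with cudf or pandas to match the input collection. A CPU-only sketch of the pattern (to_days_since_epoch is a hypothetical stand-in for convert_datestring_to_days):

import numpy as np
import pandas as pd
import dask.dataframe as dd

# Toy date_dim with the real column names but made-up rows.
pdf = pd.DataFrame({"d_date_sk": [1, 2], "d_date": ["2003-01-02", "2003-02-02"]})
ddf = dd.from_pandas(pdf, npartitions=1)

# Prototype frame declaring the post-UDF schema: both columns int64.
meta_df = pd.DataFrame({
    "d_date_sk": np.ones(1, dtype=np.int64),
    "d_date": np.ones(1, dtype=np.int64),
})

def to_days_since_epoch(df):
    out = df.copy()
    # datetime64[ns] -> int64 nanoseconds -> whole days since 1970-01-01
    out["d_date"] = (
        pd.to_datetime(out["d_date"], format="%Y-%m-%d").astype("int64")
        // (86_400 * 10**9)
    )
    return out

print(ddf.map_partitions(to_days_since_epoch, meta=meta_df).compute())
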
diff --git a/gpu_bdb/queries/q15/gpu_bdb_query_15.py b/gpu_bdb/queries/q15/gpu_bdb_query_15.py
index e01ccd06..f00f3721 100755
--- a/gpu_bdb/queries/q15/gpu_bdb_query_15.py
+++ b/gpu_bdb/queries/q15/gpu_bdb_query_15.py
@@ -14,6 +14,12 @@
 # limitations under the License.
 #
 
+import numpy as np
+import pandas as pd
+
+import cudf
+import dask_cudf
+
 from bdb_tools.utils import (
     benchmark,
     gpubdb_argparser,
@@ -42,7 +48,17 @@ def main(client, config):
     store_sales_df = store_sales_df.query(f"ss_store_sk == {q15_store_sk}")
 
     ### Query 1. Date Time Filteration Logic
-    date_dim_cov_df = date_dim_df.map_partitions(convert_datestring_to_days)
+    meta_d = {
+        "d_date": np.ones(1, dtype=np.int64),
+        "d_date_sk": np.ones(1, dtype=np.int64)
+    }
+
+    if isinstance(date_dim_df, dask_cudf.DataFrame):
+        meta_df = cudf.DataFrame(meta_d)
+    else:
+        meta_df = pd.DataFrame(meta_d)
+
+    date_dim_cov_df = date_dim_df.map_partitions(convert_datestring_to_days, meta=meta_df)
 
     q15_start_dt = datetime.datetime.strptime(q15_startDate, "%Y-%m-%d")
     q15_start_dt = (q15_start_dt - datetime.datetime(1970, 1, 1)) / datetime.timedelta(
diff --git a/gpu_bdb/queries/q16/gpu_bdb_query_16.py b/gpu_bdb/queries/q16/gpu_bdb_query_16.py
index 5e427849..5313bf70 100755
--- a/gpu_bdb/queries/q16/gpu_bdb_query_16.py
+++ b/gpu_bdb/queries/q16/gpu_bdb_query_16.py
@@ -15,6 +15,7 @@
 #
 
 import cudf
+import dask_cudf
 
 from bdb_tools.utils import (
     benchmark,
@@ -28,6 +29,7 @@
 from dask.distributed import wait
 
 import numpy as np
+import pandas as pd
 
 
 ### conf
@@ -104,7 +106,18 @@ def main(client, config):
     # AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('${hiveconf:q16_date}', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
 
     ##todo: remove below
-    date_dim_cov_df = date_dim_df.map_partitions(convert_datestring_to_days)
+    meta_d = {
+        "d_date": np.ones(1, dtype=np.int64),
+        "d_date_sk": np.ones(1, dtype=np.int64)
+    }
+
+    if isinstance(date_dim_df, dask_cudf.DataFrame):
+        meta_df = cudf.DataFrame(meta_d)
+    else:
+        meta_df = pd.DataFrame(meta_d)
+
+    date_dim_cov_df = date_dim_df.map_partitions(convert_datestring_to_days, meta=meta_df)
+
     q16_timestamp = np.datetime64(q16_date, "D").astype(int)
     filtered_date_df = date_dim_cov_df.query(
         f"d_date >={q16_timestamp- 30} and d_date <= {q16_timestamp+30}",
diff --git a/gpu_bdb/queries/q17/gpu_bdb_query_17.py b/gpu_bdb/queries/q17/gpu_bdb_query_17.py
index 053d4143..7242ec41 100755
--- a/gpu_bdb/queries/q17/gpu_bdb_query_17.py
+++ b/gpu_bdb/queries/q17/gpu_bdb_query_17.py
@@ -55,8 +55,8 @@ def main(client, config):
     ss_date_join = left_semi_join(
         store_sales_df,
         filtered_date_df,
-        left_on=["ss_sold_date_sk"],
-        right_on=["d_date_sk"],
+        left_on="ss_sold_date_sk",
+        right_on="d_date_sk",
     )
     ss_date_join = ss_date_join[store_sales_cols]
 
@@ -65,7 +65,7 @@ def main(client, config):
         item_df["i_category"].isin(q17_i_category_IN)
     ].reset_index(drop=True)
     ss_date_item_join = left_semi_join(
-        ss_date_join, filtered_item_df, left_on=["ss_item_sk"], right_on=["i_item_sk"]
+        ss_date_join, filtered_item_df, left_on="ss_item_sk", right_on="i_item_sk"
     )
 
     # LEFT SEMI JOIN store s ON ss.ss_store_sk = s.s_store_sk AND s.s_gmt_offset = ${hiveconf:q17_gmt_offset}
@@ -76,8 +76,8 @@ def main(client, config):
     ss_date_item_store_join = left_semi_join(
         ss_date_item_join,
         filtered_store_df,
-        left_on=["ss_store_sk"],
-        right_on=["s_store_sk"],
+        left_on="ss_store_sk",
+        right_on="s_store_sk",
     )
 
     # (SELECT c.c_customer_sk FROM customer c LEFT SEMI JOIN customer_address ca
@@ -91,8 +91,8 @@ def main(client, config):
     sub_c = left_semi_join(
         customer_df,
         filtered_customer_address,
-        left_on=["c_current_addr_sk"],
-        right_on=["ca_address_sk"],
+        left_on="c_current_addr_sk",
+        right_on="ca_address_sk",
    )
 
     # sub_c ON ss.ss_customer_sk = sub_c.c_customer_sk
 
     ss_date_item_store_customer_join = left_semi_join(
         ss_date_item_store_join,
         sub_c,
-        left_on=["ss_customer_sk"],
-        right_on=["c_customer_sk"],
+        left_on="ss_customer_sk",
+        right_on="c_customer_sk",
     )
 
     # JOIN promotion p ON ss.ss_promo_sk = p.p_promo_sk
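
[Reviewer sketch, not part of the patch.] The q17 call sites switch from list-valued keys (left_on=["ss_sold_date_sk"]) to scalar ones, which is what the new pandas fallback in left_semi_join needs: with a list, df_1[left_on] selects a one-column DataFrame, and indexing by the resulting DataFrame mask NaN-fills cells instead of dropping rows. A small pandas illustration with made-up values:

import pandas as pd

df = pd.DataFrame({"ss_store_sk": [10, 20, 30], "x": [1, 2, 3]})

# Scalar key: boolean Series, drops the non-matching row as intended.
print(df[df["ss_store_sk"].isin([10, 30])])

# List key: one-column boolean DataFrame; df[mask] masks cells to NaN
# rather than filtering rows, so the semi join result would be wrong.
print(df[df[["ss_store_sk"]].isin([10, 30])])
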