
Performance issue on TPCH comparing with sparksql #423

Open

wangxiaoying opened this issue Mar 27, 2024 · 12 comments

wangxiaoying commented Mar 27, 2024

Description

I'm trying to run TPC-H Q3 and compare the performance between Wayang and SparkSQL under the following setup:

  • Running both Spark (3.5.1) and Wayang on a local VM with 32 CPU cores and 128GB memory
  • Running a postgres instance that maintains all the TPC-H tables (sf=10) on a remote VM

I try to keep the Spark settings the same for both runs. For Q3, Wayang took around 3 minutes while Spark took only 40 seconds.

To reproduce

To run Wayang, I compile the project locally (using tag 1.0.0) and use the benchmark code under wayang-benchmark directly: ./wayang-1.0.0-SNAPSHOT/bin/wayang-submit org.apache.wayang.apps.tpch.TpcH exp\(123\) spark,postgres file:///path/to/wayang.properties Q3

The wayang.properties file is like the following:

wayang.postgres.jdbc.url = jdbc:postgresql://{POSTGRES_IP}:{POSTGRES_PORT}/{TPCH_DB}
wayang.postgres.jdbc.user = {POSTGRES_USER}
wayang.postgres.jdbc.password = {POSTGRES_PASSWORD}

spark.master = local[32]
spark.driver.memory = 110G
spark.executor.memory = 110G
spark.executor.cores = 32
wayang.giraph.hdfs.tempdir = file:///tmp/result/

spark.rdd.compress = true
spark.log.level = INFO

To run Spark, I use the following code:

import sys
import time
from pyspark.sql import SparkSession
from contexttimer import Timer

SPARK_JARS = "path/to/jar/postgresql-42.3.8.jar"
POSTGRES_URL = "jdbc:postgresql://{POSTGRES_IP}:{POSTGRES_PORT}/{TPCH_DB}"
POSTGRES_USER = "{POSTGRES_USER}"
POSTGRES_PASSWORD = "{POSTGRES_PASSWORD}"

TPCH_Q3 = """SELECT
    l_orderkey,
    sum(l_extendedprice * (1 - l_discount)) AS revenue,
    o_orderdate,
    o_shippriority
FROM
    customer,
    orders,
    lineitem
WHERE
    c_mktsegment = 'BUILDING'
    AND c_custkey = o_custkey
    AND l_orderkey = o_orderkey
    AND o_orderdate < CAST('1995-03-15' AS date)
    AND l_shipdate > CAST('1995-03-15' AS date)
GROUP BY
    l_orderkey,
    o_orderdate,
    o_shippriority
ORDER BY
    revenue DESC,
    o_orderdate"""

def registerPostgres(spark, tables, url):
    for name in tables:
        spark.sql(f"""
            CREATE TEMPORARY VIEW {name}
            USING org.apache.spark.sql.jdbc
            OPTIONS (
              driver "org.postgresql.Driver",
              url "{url}",
              dbtable "public.{name}",
              user '{POSTGRES_USER}',
              password '{POSTGRES_PASSWORD}',
              pushDownAggregate 'true'
            )
            """)
            

def registerViews(spark):
    registerPostgres(spark, ["lineitem", "customer", "orders", "nation", "region", "supplier", "part", "partsupp"], POSTGRES_URL)


def run_query(spark, query):
    with Timer() as timer:
        df = spark.sql(query)
        rows = df.collect()  # materialize the result inside the timer
    print(f"get {len(rows)} rows, {len(df.columns)} cols")  # len(rows) avoids the extra job that df.count() would trigger
    df.explain()  # prints the physical plan directly (explain() returns None, so don't wrap it in an f-string)
    print(f"took {timer.elapsed:.2f}s in total")
    # print(df)
    print()
    sys.stdout.flush()

        

if __name__ == '__main__':

    spark = (
        SparkSession.builder.master("local[32]")
        .appName("test-spark")
        .config("spark.jars", SPARK_JARS)
        .config("spark.executor.memory", "110g")
        .config("spark.driver.memory", "110g")
        .config("spark.log.level", "INFO")
        .config("spark.ui.port", "4040")
        .getOrCreate()
    )

    print(spark.sparkContext.getConf().getAll())
    registerViews(spark)

    run_query(spark, TPCH_Q3)
    time.sleep(2)
    spark.stop()

Some investigation

The queries used to fetch data from postgres are basically the same on both platforms (filter and projection pushdown are enabled).

I printed as much of the Spark execution log as I could to see the difference between the two. One significant overhead I found is that Wayang produces much larger ShuffleMapTasks for the join than Spark does (~46,500,000 bytes vs. ~8,000 bytes), which takes ~2 seconds to serialize per task (64 tasks in total, serialized one by one) and results in roughly 1 minute of overhead. The serialization time on Spark, by contrast, is negligible.

I'm not very familiar with Spark execution internals, so I'm not sure why this is the case. Can anyone give me a pointer? Is there anything I'm missing, such as the way I run the query or something in the configuration? Thank you!
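For anyone who wants to reproduce this symptom in isolation: one common way Spark tasks balloon like this is when a large driver-side object is captured in a task closure instead of being broadcast, so it gets serialized into every single task. Below is a minimal, hypothetical PySpark sketch of the general mechanism (not the Wayang internals):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

big_lookup = set(range(5_000_000))  # lives on the driver; tens of MB once pickled
rdd = sc.parallelize(range(64), 64)

# Captured directly: big_lookup is pickled into each of the 64 tasks,
# so every task carries the whole structure (cf. the ~46 MB ShuffleMapTasks above).
bad = rdd.map(lambda x: x in big_lookup).count()

# Broadcast once: each task only carries a small handle to the broadcast variable.
bc = sc.broadcast(big_lookup)
good = rdd.map(lambda x: x in bc.value).count()

If Wayang's generated Spark job embeds something similarly large in its operator closures, that would explain both the task size and the per-task serialization time.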


wangxiaoying commented Mar 27, 2024

Just in case it might be helpful, here is a log snippet related to the serialization overhead of the Spark execution with Wayang that I mentioned above:

24/03/26 18:18:26 WARN TaskSetManager: Stage 2 contains a task of very large size (45404 KiB). The maximum recommended task size is 1000 KiB.
24/03/26 18:18:26 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 0) (10.155.96.97, executor driver, partition 0, PROCESS_LOCAL, 46494022 bytes) 
24/03/26 18:18:26 INFO TaskSetManager: start serialize task 1...
24/03/26 18:18:26 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 1)
24/03/26 18:18:26 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 1)
24/03/26 18:18:28 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 1)
24/03/26 18:18:28 INFO TaskSetManager: finish serialize task 1...
24/03/26 18:18:28 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 1) (10.155.96.97, executor driver, partition 1, PROCESS_LOCAL, 46490225 bytes) 
24/03/26 18:18:28 INFO TaskSetManager: start serialize task 2...
24/03/26 18:18:28 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 2)
24/03/26 18:18:28 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 2)
24/03/26 18:18:29 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 2)
24/03/26 18:18:29 INFO TaskSetManager: finish serialize task 2...
24/03/26 18:18:29 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 2) (10.155.96.97, executor driver, partition 2, PROCESS_LOCAL, 46488713 bytes) 
24/03/26 18:18:29 INFO TaskSetManager: start serialize task 3...
24/03/26 18:18:29 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 3)
24/03/26 18:18:29 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 3)
24/03/26 18:18:31 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 3)
24/03/26 18:18:31 INFO TaskSetManager: finish serialize task 3...
24/03/26 18:18:31 INFO TaskSetManager: Starting task 3.0 in stage 2.0 (TID 3) (10.155.96.97, executor driver, partition 3, PROCESS_LOCAL, 46488685 bytes) 
24/03/26 18:18:31 INFO TaskSetManager: start serialize task 4...
24/03/26 18:18:31 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 4)
24/03/26 18:18:31 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 4)
24/03/26 18:18:32 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 4)
24/03/26 18:18:32 INFO TaskSetManager: finish serialize task 4...
24/03/26 18:18:32 INFO TaskSetManager: Starting task 4.0 in stage 2.0 (TID 4) (10.155.96.97, executor driver, partition 4, PROCESS_LOCAL, 46486155 bytes) 
24/03/26 18:18:32 INFO TaskSetManager: start serialize task 5...
24/03/26 18:18:32 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 5)
24/03/26 18:18:32 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 5)
24/03/26 18:18:34 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 5)
24/03/26 18:18:34 INFO TaskSetManager: finish serialize task 5...
24/03/26 18:18:34 INFO TaskSetManager: Starting task 5.0 in stage 2.0 (TID 5) (10.155.96.97, executor driver, partition 5, PROCESS_LOCAL, 46490264 bytes) 
24/03/26 18:18:34 INFO TaskSetManager: start serialize task 6...
24/03/26 18:18:34 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 6)
24/03/26 18:18:34 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 6)
24/03/26 18:18:36 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 6)
24/03/26 18:18:36 INFO TaskSetManager: finish serialize task 6...
24/03/26 18:18:36 INFO TaskSetManager: Starting task 6.0 in stage 2.0 (TID 6) (10.155.96.97, executor driver, partition 6, PROCESS_LOCAL, 46488245 bytes) 
24/03/26 18:18:36 INFO TaskSetManager: start serialize task 7...
24/03/26 18:18:36 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 7)
24/03/26 18:18:36 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 7)
24/03/26 18:18:38 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 7)
24/03/26 18:18:38 INFO TaskSetManager: finish serialize task 7...
24/03/26 18:18:38 INFO TaskSetManager: Starting task 7.0 in stage 2.0 (TID 7) (10.155.96.97, executor driver, partition 7, PROCESS_LOCAL, 46487068 bytes) 
24/03/26 18:18:38 INFO TaskSetManager: start serialize task 8...
24/03/26 18:18:38 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 8)
24/03/26 18:18:38 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 8)
24/03/26 18:18:40 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 8)
24/03/26 18:18:40 INFO TaskSetManager: finish serialize task 8...
24/03/26 18:18:40 INFO TaskSetManager: Starting task 8.0 in stage 2.0 (TID 8) (10.155.96.97, executor driver, partition 8, PROCESS_LOCAL, 46489857 bytes) 
24/03/26 18:18:40 INFO TaskSetManager: start serialize task 9...
24/03/26 18:18:40 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 9)
24/03/26 18:18:40 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 9)
24/03/26 18:18:41 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 9)
24/03/26 18:18:41 INFO TaskSetManager: finish serialize task 9...
24/03/26 18:18:41 INFO TaskSetManager: Starting task 9.0 in stage 2.0 (TID 9) (10.155.96.97, executor driver, partition 9, PROCESS_LOCAL, 46489664 bytes) 
24/03/26 18:18:41 INFO TaskSetManager: start serialize task 10...
24/03/26 18:18:41 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 10)
24/03/26 18:18:41 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 10)
24/03/26 18:18:43 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 10)
24/03/26 18:18:43 INFO TaskSetManager: finish serialize task 10...
24/03/26 18:18:43 INFO TaskSetManager: Starting task 10.0 in stage 2.0 (TID 10) (10.155.96.97, executor driver, partition 10, PROCESS_LOCAL, 46488355 bytes) 
24/03/26 18:18:43 INFO TaskSetManager: start serialize task 11...
24/03/26 18:18:43 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 11)
24/03/26 18:18:43 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 11)
24/03/26 18:18:44 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 11)
24/03/26 18:18:44 INFO TaskSetManager: finish serialize task 11...
24/03/26 18:18:44 INFO TaskSetManager: Starting task 11.0 in stage 2.0 (TID 11) (10.155.96.97, executor driver, partition 11, PROCESS_LOCAL, 46488438 bytes) 
24/03/26 18:18:44 INFO TaskSetManager: start serialize task 12...
24/03/26 18:18:44 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 12)
24/03/26 18:18:44 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 12)
24/03/26 18:18:46 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 12)
24/03/26 18:18:46 INFO TaskSetManager: finish serialize task 12...
24/03/26 18:18:46 INFO TaskSetManager: Starting task 12.0 in stage 2.0 (TID 12) (10.155.96.97, executor driver, partition 12, PROCESS_LOCAL, 46488729 bytes) 
24/03/26 18:18:46 INFO TaskSetManager: start serialize task 13...
24/03/26 18:18:46 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 13)
24/03/26 18:18:46 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 13)
24/03/26 18:18:48 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 13)
24/03/26 18:18:48 INFO TaskSetManager: finish serialize task 13...
24/03/26 18:18:48 INFO TaskSetManager: Starting task 13.0 in stage 2.0 (TID 13) (10.155.96.97, executor driver, partition 13, PROCESS_LOCAL, 46486309 bytes) 
24/03/26 18:18:48 INFO TaskSetManager: start serialize task 14...
24/03/26 18:18:48 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 14)
24/03/26 18:18:48 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 14)
24/03/26 18:18:50 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 14)
24/03/26 18:18:50 INFO TaskSetManager: finish serialize task 14...
24/03/26 18:18:50 INFO TaskSetManager: Starting task 14.0 in stage 2.0 (TID 14) (10.155.96.97, executor driver, partition 14, PROCESS_LOCAL, 46489736 bytes) 
24/03/26 18:18:50 INFO TaskSetManager: start serialize task 15...
24/03/26 18:18:50 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 15)
24/03/26 18:18:50 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 15)
24/03/26 18:18:51 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 15)
24/03/26 18:18:51 INFO TaskSetManager: finish serialize task 15...
24/03/26 18:18:51 INFO TaskSetManager: Starting task 15.0 in stage 2.0 (TID 15) (10.155.96.97, executor driver, partition 15, PROCESS_LOCAL, 46489059 bytes) 
24/03/26 18:18:51 INFO TaskSetManager: start serialize task 16...
24/03/26 18:18:51 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 16)
24/03/26 18:18:51 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 16)
24/03/26 18:18:53 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 16)
24/03/26 18:18:53 INFO TaskSetManager: finish serialize task 16...
24/03/26 18:18:53 INFO TaskSetManager: Starting task 16.0 in stage 2.0 (TID 16) (10.155.96.97, executor driver, partition 16, PROCESS_LOCAL, 46488938 bytes) 
24/03/26 18:18:53 INFO TaskSetManager: start serialize task 17...
24/03/26 18:18:53 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 17)
24/03/26 18:18:53 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 17)
24/03/26 18:18:54 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 17)
24/03/26 18:18:54 INFO TaskSetManager: finish serialize task 17...
24/03/26 18:18:54 INFO TaskSetManager: Starting task 17.0 in stage 2.0 (TID 17) (10.155.96.97, executor driver, partition 17, PROCESS_LOCAL, 46485930 bytes) 
24/03/26 18:18:54 INFO TaskSetManager: start serialize task 18...
24/03/26 18:18:54 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 18)
24/03/26 18:18:54 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 18)
24/03/26 18:18:56 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 18)
24/03/26 18:18:56 INFO TaskSetManager: finish serialize task 18...
24/03/26 18:18:56 INFO TaskSetManager: Starting task 18.0 in stage 2.0 (TID 18) (10.155.96.97, executor driver, partition 18, PROCESS_LOCAL, 46486144 bytes) 
24/03/26 18:18:56 INFO TaskSetManager: start serialize task 19...
24/03/26 18:18:56 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 19)
24/03/26 18:18:56 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 19)
24/03/26 18:18:58 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 19)
24/03/26 18:18:58 INFO TaskSetManager: finish serialize task 19...
24/03/26 18:18:58 INFO TaskSetManager: Starting task 19.0 in stage 2.0 (TID 19) (10.155.96.97, executor driver, partition 19, PROCESS_LOCAL, 46491094 bytes) 
24/03/26 18:18:58 INFO TaskSetManager: start serialize task 20...
24/03/26 18:18:58 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 20)
24/03/26 18:18:58 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 20)
24/03/26 18:18:59 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 20)
24/03/26 18:18:59 INFO TaskSetManager: finish serialize task 20...
24/03/26 18:18:59 INFO TaskSetManager: Starting task 20.0 in stage 2.0 (TID 20) (10.155.96.97, executor driver, partition 20, PROCESS_LOCAL, 46492332 bytes) 
24/03/26 18:18:59 INFO TaskSetManager: start serialize task 21...
24/03/26 18:18:59 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 21)
24/03/26 18:18:59 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 21)
24/03/26 18:19:01 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 21)
24/03/26 18:19:01 INFO TaskSetManager: finish serialize task 21...
24/03/26 18:19:01 INFO TaskSetManager: Starting task 21.0 in stage 2.0 (TID 21) (10.155.96.97, executor driver, partition 21, PROCESS_LOCAL, 46485583 bytes) 
24/03/26 18:19:01 INFO TaskSetManager: start serialize task 22...
24/03/26 18:19:01 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 22)
24/03/26 18:19:01 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 22)
24/03/26 18:19:03 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 22)
24/03/26 18:19:03 INFO TaskSetManager: finish serialize task 22...
24/03/26 18:19:03 INFO TaskSetManager: Starting task 22.0 in stage 2.0 (TID 22) (10.155.96.97, executor driver, partition 22, PROCESS_LOCAL, 46488872 bytes) 
24/03/26 18:19:03 INFO TaskSetManager: start serialize task 23...
24/03/26 18:19:03 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 23)
24/03/26 18:19:03 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 23)
24/03/26 18:19:04 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 23)
24/03/26 18:19:04 INFO TaskSetManager: finish serialize task 23...
24/03/26 18:19:04 INFO TaskSetManager: Starting task 23.0 in stage 2.0 (TID 23) (10.155.96.97, executor driver, partition 23, PROCESS_LOCAL, 46489351 bytes) 
24/03/26 18:19:04 INFO TaskSetManager: start serialize task 24...
24/03/26 18:19:04 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 24)
24/03/26 18:19:04 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 24)
24/03/26 18:19:06 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 24)
24/03/26 18:19:06 INFO TaskSetManager: finish serialize task 24...
24/03/26 18:19:06 INFO TaskSetManager: Starting task 24.0 in stage 2.0 (TID 24) (10.155.96.97, executor driver, partition 24, PROCESS_LOCAL, 46486199 bytes) 
24/03/26 18:19:06 INFO TaskSetManager: start serialize task 25...
24/03/26 18:19:06 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 25)
24/03/26 18:19:06 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 25)
24/03/26 18:19:08 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 25)
24/03/26 18:19:08 INFO TaskSetManager: finish serialize task 25...
24/03/26 18:19:08 INFO TaskSetManager: Starting task 25.0 in stage 2.0 (TID 25) (10.155.96.97, executor driver, partition 25, PROCESS_LOCAL, 46485572 bytes) 
24/03/26 18:19:08 INFO TaskSetManager: start serialize task 26...
24/03/26 18:19:08 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 26)
24/03/26 18:19:08 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 26)
24/03/26 18:19:09 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 26)
24/03/26 18:19:09 INFO TaskSetManager: finish serialize task 26...
24/03/26 18:19:09 INFO TaskSetManager: Starting task 26.0 in stage 2.0 (TID 26) (10.155.96.97, executor driver, partition 26, PROCESS_LOCAL, 46486711 bytes) 
24/03/26 18:19:09 INFO TaskSetManager: start serialize task 27...
24/03/26 18:19:09 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 27)
24/03/26 18:19:09 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 27)
24/03/26 18:19:11 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 27)
24/03/26 18:19:11 INFO TaskSetManager: finish serialize task 27...
24/03/26 18:19:11 INFO TaskSetManager: Starting task 27.0 in stage 2.0 (TID 27) (10.155.96.97, executor driver, partition 27, PROCESS_LOCAL, 46491886 bytes) 
24/03/26 18:19:11 INFO TaskSetManager: start serialize task 28...
24/03/26 18:19:11 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 28)
24/03/26 18:19:11 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 28)
24/03/26 18:19:13 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 28)
24/03/26 18:19:13 INFO TaskSetManager: finish serialize task 28...
24/03/26 18:19:13 INFO TaskSetManager: Starting task 28.0 in stage 2.0 (TID 28) (10.155.96.97, executor driver, partition 28, PROCESS_LOCAL, 46490368 bytes) 
24/03/26 18:19:13 INFO TaskSetManager: start serialize task 29...
24/03/26 18:19:13 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 29)
24/03/26 18:19:13 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 29)
24/03/26 18:19:14 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 29)
24/03/26 18:19:14 INFO TaskSetManager: finish serialize task 29...
24/03/26 18:19:14 INFO TaskSetManager: Starting task 29.0 in stage 2.0 (TID 29) (10.155.96.97, executor driver, partition 29, PROCESS_LOCAL, 46489395 bytes) 
24/03/26 18:19:14 INFO TaskSetManager: start serialize task 30...
24/03/26 18:19:14 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 30)
24/03/26 18:19:14 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 30)
24/03/26 18:19:16 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 30)
24/03/26 18:19:16 INFO TaskSetManager: finish serialize task 30...
24/03/26 18:19:16 INFO TaskSetManager: Starting task 30.0 in stage 2.0 (TID 30) (10.155.96.97, executor driver, partition 30, PROCESS_LOCAL, 46488916 bytes) 
24/03/26 18:19:16 INFO TaskSetManager: start serialize task 31...
24/03/26 18:19:16 INFO JavaSerializerInstance: before serialize stream ShuffleMapTask(2, 31)
24/03/26 18:19:16 INFO JavaSerializerInstance: after serialize stream ShuffleMapTask(2, 31)
24/03/26 18:19:17 INFO JavaSerializerInstance: after write object ShuffleMapTask(2, 31)


zkaoudi commented Mar 27, 2024

Thanks for all the info @wangxiaoying. I can take a look at it next week.

In the meantime, can you confirm that the operations executed in postgres and in Spark are the same in the Wayang run as in the SparkSQL run? In Wayang you can actually force where each operator is executed with the .withPlatform() method, so that you can make sure the plans are the same.


wangxiaoying commented Mar 27, 2024

Thank you @zkaoudi for the quick response!

In the meantime, can you confirm that the operations executed in postgres and in Spark are the same in the Wayang run as in the SparkSQL run?

Yes. I checked the log on the postgres side. Here is the log of the fetch queries when using wayang:

2024-03-27 20:33:44.571 UTC [13283] LOG:  execute <unnamed>: SELECT l_orderkey, l_extendedprice, l_discount FROM LINEITEM WHERE l_shipDate > date('1995-03-15')
2024-03-27 20:34:10.255 UTC [13284] LOG:  execute <unnamed>: BEGIN
2024-03-27 20:34:10.256 UTC [13284] LOG:  execute <unnamed>: SET extra_float_digits = 3
2024-03-27 20:34:10.256 UTC [13284] LOG:  execute <unnamed>: SET application_name = 'PostgreSQL JDBC Driver'
2024-03-27 20:34:10.257 UTC [13284] LOG:  execute <unnamed>: COMMIT
2024-03-27 20:34:10.258 UTC [13284] LOG:  execute <unnamed>: SELECT c_custkey FROM CUSTOMER WHERE c_mktsegment LIKE 'BUILDING%'
2024-03-27 20:34:10.774 UTC [13285] LOG:  execute <unnamed>: BEGIN
2024-03-27 20:34:10.775 UTC [13285] LOG:  execute <unnamed>: SET extra_float_digits = 3
2024-03-27 20:34:10.775 UTC [13285] LOG:  execute <unnamed>: SET application_name = 'PostgreSQL JDBC Driver'
2024-03-27 20:34:10.776 UTC [13285] LOG:  execute <unnamed>: COMMIT
2024-03-27 20:34:10.810 UTC [13285] LOG:  execute <unnamed>: SELECT o_orderkey, o_custkey, o_orderdate, o_shippriority FROM ORDERS WHERE o_orderdate < date('1995-03-15')

And this is the log when using sparksql:

2024-03-27 20:37:25.668 UTC [13302] LOG:  execute <unnamed>: SELECT "c_custkey","c_mktsegment" FROM public.customer  WHERE ("c_custkey" IS NOT NULL)
2024-03-27 20:37:25.701 UTC [13300] LOG:  execute <unnamed>: SELECT "o_orderkey","o_custkey","o_orderdate","o_shippriority" FROM public.orders  WHERE ("o_orderdate" IS NOT NULL) AND ("o_orderdate" < '1995-03-15') AND ("o_custkey" IS NOT NULL) AND ("o_orderkey" IS NOT NULL)
2024-03-27 20:37:25.701 UTC [13301] LOG:  execute <unnamed>: SELECT "l_orderkey","l_extendedprice","l_discount" FROM public.lineitem  WHERE ("l_shipdate" IS NOT NULL) AND ("l_shipdate" > '1995-03-15') AND ("l_orderkey" IS NOT NULL)

In general, postgres does similar computation under the two setups. It seems like sparksql generates additional "IS NOT NULL" filters, but they won't really filter out any data since the TPC-H dataset does not contain NULL values. In addition, it didn't push down the LIKE 'BUILDING%' predicate as wayang does, which may cause more data transfer for Spark (although the customer table is not that big compared to lineitem).
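(Side note: if one wanted to force the LIKE predicate down to postgres on the SparkSQL side as well, a common workaround is to register the view over a pushed-down subquery instead of the bare table. A hypothetical sketch, not something I benchmarked here:)

spark.sql(f"""
    CREATE TEMPORARY VIEW customer_building
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      driver "org.postgresql.Driver",
      url "{POSTGRES_URL}",
      dbtable "(SELECT c_custkey FROM public.customer WHERE c_mktsegment LIKE 'BUILDING%') AS c",
      user '{POSTGRES_USER}',
      password '{POSTGRES_PASSWORD}'
    )
""")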

P.S.
Another thing I found when checking the log is that Spark issues the three SQL queries in parallel, while Wayang issues them one by one (as the log timestamps show). I tried to enable parallelism in Wayang by setting wayang.core.optimizer.enumeration.parallel-tasks = true; however, it gives me an exception:

Exception in thread "Thread-0" java.util.ConcurrentModificationException
	at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
	at org.apache.wayang.core.platform.CrossPlatformExecutor.getOrCreateExecutorFor(CrossPlatformExecutor.java:391)
	at org.apache.wayang.core.platform.CrossPlatformExecutor$ParallelExecutionThread.run(CrossPlatformExecutor.java:1104)
	at java.base/java.lang.Thread.run(Thread.java:834)

I think it is due to a race on the executors member inside CrossPlatformExecutor.java.

This could be one of the reasons for the performance difference, but I think the execution difference inside the Spark platform later on is more significant for the whole query.


zkaoudi commented Apr 3, 2024

Hi @wangxiaoying,

Before digging into the details, and just to make sure we are comparing the same installations, I was wondering whether you are using the same Spark version for both runs.


zkaoudi commented Apr 3, 2024

Hi again,

I would suggest two things to check:

  1. The type of join that Spark SQL uses. Wayang's current join operator maps to the corresponding join in RDDs, which, if I'm not mistaken, is implemented as a hash join. Maybe Spark SQL uses a broadcast join, and thus the difference in the data transferred?

  2. I'm not very familiar with views in Spark, but when one registers the temporary views, are they materialized in memory? If so, your timer would measure data accessed via memory. But again, I'm not sure how temp views in Spark work. Maybe you could time the registerViews method to check this out (see the sketch below).
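For point 2, a quick way to check both suggestions against the reproduction script above (a sketch; registerViews and TPCH_Q3 are the names from that script):

import time

t0 = time.time()
registerViews(spark)
print(f"registerViews took {time.time() - t0:.2f}s")  # near-zero would mean the views are lazy, not materialized

spark.sql(TPCH_Q3).explain()  # the printed plan shows whether Spark SQL picked a broadcast join or a sort-merge join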


kbeedkar commented Apr 4, 2024

The performance difference also stems from the current implementation of the Postgres-to-Spark channel conversion:

public class SqlToRddOperator extends UnaryToUnaryOperator<Record, Record> implements SparkExecutionOperator, JsonSerializable {

wangxiaoying commented:

Hi @wangxiaoying,

Before digging into the details, and just to make sure we are comparing the same installations, I was wondering whether you are using the same Spark version for both runs.

Sorry for the late reply, I was out last week. Yes, I can confirm this.


wangxiaoying commented Apr 8, 2024

  1. The type of join that Spark SQL uses. Wayang's current join operator maps to the corresponding join in RDDs, which, if I'm not mistaken, is implemented as a hash join. Maybe Spark SQL uses a broadcast join, and thus the difference in the data transferred?

Yes, I think the executed join algorithms differ between the two approaches. Below is the default physical plan generated by Spark:

+- == Final Plan ==
   *(8) Sort [revenue#138 DESC NULLS LAST, o_orderdate#58 ASC NULLS FIRST], true, 0
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 5
         +- Exchange rangepartitioning(revenue#138 DESC NULLS LAST, o_orderdate#58 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=448]
            +- *(7) HashAggregate(keys=[l_orderkey#0, o_orderdate#58, o_shippriority#61], functions=[sum((l_extendedprice#5 * (1 - l_discount#6)))])
               +- *(7) HashAggregate(keys=[l_orderkey#0, o_orderdate#58, o_shippriority#61], functions=[partial_sum((l_extendedprice#5 * (1 - l_discount#6)))])
                  +- *(7) Project [o_orderdate#58, o_shippriority#61, l_orderkey#0, l_extendedprice#5, l_discount#6]
                     +- *(7) SortMergeJoin [o_orderkey#54], [l_orderkey#0], Inner
                        :- *(5) Sort [o_orderkey#54 ASC NULLS FIRST], false, 0
                        :  +- AQEShuffleRead coalesced
                        :     +- ShuffleQueryStage 4
                        :        +- Exchange hashpartitioning(o_orderkey#54, 200), ENSURE_REQUIREMENTS, [plan_id=341]
                        :           +- *(4) Project [o_orderkey#54, o_orderdate#58, o_shippriority#61]
                        :              +- *(4) BroadcastHashJoin [c_custkey#36], [o_custkey#55], Inner, BuildLeft, false
                        :                 :- BroadcastQueryStage 3
                        :                 :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=225]
                        :                 :     +- AQEShuffleRead local
                        :                 :        +- ShuffleQueryStage 0
                        :                 :           +- Exchange hashpartitioning(c_custkey#36, 200), ENSURE_REQUIREMENTS, [plan_id=132]
                        :                 :              +- *(1) Project [c_custkey#36]
                        :                 :                 +- *(1) Filter (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, c_mktsegment#42, 10, true, false, true) = BUILDING  )
                        :                 :                    +- *(1) Scan JDBCRelation(public.customer) [numPartitions=1] [c_custkey#36,c_mktsegment#42] PushedFilters: [*IsNotNull(c_custkey)], ReadSchema: struct<c_custkey:int,c_mktsegment:string>
                        :                 +- AQEShuffleRead local
                        :                    +- ShuffleQueryStage 1
                        :                       +- Exchange hashpartitioning(o_custkey#55, 200), ENSURE_REQUIREMENTS, [plan_id=139]
                        :                          +- *(2) Scan JDBCRelation(public.orders) [numPartitions=1] [o_orderkey#54,o_custkey#55,o_orderdate#58,o_shippriority#61] PushedFilters: [*IsNotNull(o_orderdate), *LessThan(o_orderdate,1995-03-15), *IsNotNull(o_custkey), *IsNotNull(o_..., ReadSchema: struct<o_orderkey:int,o_custkey:int,o_orderdate:date,o_shippriority:int>
                        +- *(6) Sort [l_orderkey#0 ASC NULLS FIRST], false, 0
                           +- AQEShuffleRead coalesced
                              +- ShuffleQueryStage 2
                                 +- Exchange hashpartitioning(l_orderkey#0, 200), ENSURE_REQUIREMENTS, [plan_id=150]
                                    +- *(3) Scan JDBCRelation(public.lineitem) [numPartitions=1] [l_orderkey#0,l_extendedprice#5,l_discount#6] PushedFilters: [*IsNotNull(l_shipdate), *GreaterThan(l_shipdate,1995-03-15), *IsNotNull(l_orderkey)], ReadSchema: struct<l_orderkey:int,l_extendedprice:decimal(15,2),l_discount:decimal(15,2)>

I tried adding the config .config("spark.sql.join.preferSortMergeJoin", "false") when building the Spark session, so the SortMergeJoin above becomes a HashJoin, but the performance does not change much. A broadcast join is still used, though.

  2. I'm not very familiar with views in Spark, but when one registers the temporary views, are they materialized in memory? If so, your timer would measure data accessed via memory. But again, I'm not sure how temp views in Spark work. Maybe you could time the registerViews method to check this out.

Spark uses lazy evaluation, so the view creation does not take much time (only some metadata is fetched). And as the postgres log above shows, Spark does fetch the three tables (with projection and filter pushdown) at runtime, just like Wayang does.

I still think one key difference is task serialization: Wayang creates much larger Spark tasks (>40 MB), which makes the serialization overhead no longer negligible, but I'm not sure why such big tasks are created.


zkaoudi commented Apr 8, 2024

Thanks @wangxiaoying. I guess the broadcast join reduces the amount of data shuffled for this specific dataset/query. Could you disable the broadcast join in Spark to check whether the difference comes from the join only?
Sth like: spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1). That would at least equalize the amount of data shuffled, to better understand the performance difference.


wangxiaoying commented Apr 8, 2024

Thanks @wangxiaoying. I guess the broadcast join reduces the amount of data shuffled for this specific dataset/query. Could you disable the broadcast join in Spark to check whether the difference comes from the join only? Sth like: spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1). That would at least equalize the amount of data shuffled, to better understand the performance difference.

Hi @zkaoudi, I set the autoBroadcastJoinThreshold config to 1048576 (10485760 by default), since setting this value too small makes both joins SortMergeJoin instead of HashJoin (as this post shows). Here is the resulting plan with two hash joins:

   *(6) Sort [revenue#138 DESC NULLS LAST, o_orderdate#58 ASC NULLS FIRST], true, 0
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 4
         +- Exchange rangepartitioning(revenue#138 DESC NULLS LAST, o_orderdate#58 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=350]
            +- *(5) HashAggregate(keys=[l_orderkey#0, o_orderdate#58, o_shippriority#61], functions=[sum((l_extendedprice#5 * (1 - l_discount#6)))])
               +- *(5) HashAggregate(keys=[l_orderkey#0, o_orderdate#58, o_shippriority#61], functions=[partial_sum((l_extendedprice#5 * (1 - l_discount#6)))])
                  +- *(5) Project [o_orderdate#58, o_shippriority#61, l_orderkey#0, l_extendedprice#5, l_discount#6]
                     +- *(5) ShuffledHashJoin [o_orderkey#54], [l_orderkey#0], Inner, BuildLeft
                        :- AQEShuffleRead coalesced
                        :  +- ShuffleQueryStage 3
                        :     +- Exchange hashpartitioning(o_orderkey#54, 200), ENSURE_REQUIREMENTS, [plan_id=266]
                        :        +- *(4) Project [o_orderkey#54, o_orderdate#58, o_shippriority#61]
                        :           +- *(4) ShuffledHashJoin [c_custkey#36], [o_custkey#55], Inner, BuildLeft
                        :              :- AQEShuffleRead coalesced
                        :              :  +- ShuffleQueryStage 0
                        :              :     +- Exchange hashpartitioning(c_custkey#36, 200), ENSURE_REQUIREMENTS, [plan_id=132]
                        :              :        +- *(1) Project [c_custkey#36]
                        :              :           +- *(1) Filter (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, c_mktsegment#42, 10, true, false, true) = BUILDING  )
                        :              :              +- *(1) Scan JDBCRelation(public.customer) [numPartitions=1] [c_custkey#36,c_mktsegment#42] PushedFilters: [*IsNotNull(c_custkey)], ReadSchema: struct<c_custkey:int,c_mktsegment:string>
                        :              +- AQEShuffleRead coalesced
                        :                 +- ShuffleQueryStage 1
                        :                    +- Exchange hashpartitioning(o_custkey#55, 200), ENSURE_REQUIREMENTS, [plan_id=139]
                        :                       +- *(2) Scan JDBCRelation(public.orders) [numPartitions=1] [o_orderkey#54,o_custkey#55,o_orderdate#58,o_shippriority#61] PushedFilters: [*IsNotNull(o_orderdate), *LessThan(o_orderdate,1995-03-15), *IsNotNull(o_custkey), *IsNotNull(o_..., ReadSchema: struct<o_orderkey:int,o_custkey:int,o_orderdate:date,o_shippriority:int>
                        +- AQEShuffleRead coalesced
                           +- ShuffleQueryStage 2
                              +- Exchange hashpartitioning(l_orderkey#0, 200), ENSURE_REQUIREMENTS, [plan_id=150]
                                 +- *(3) Scan JDBCRelation(public.lineitem) [numPartitions=1] [l_orderkey#0,l_extendedprice#5,l_discount#6] PushedFilters: [*IsNotNull(l_shipdate), *GreaterThan(l_shipdate,1995-03-15), *IsNotNull(l_orderkey)], ReadSchema: struct<l_orderkey:int,l_extendedprice:decimal(15,2),l_discount:decimal(15,2)>

The performance does not change much (still ~40s).
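For reference, the join-related settings for this run amount to something like the following builder config (a sketch based on the original script; the two join-related .config lines are the only additions):

spark = (
    SparkSession.builder.master("local[32]")
    .appName("test-spark")
    .config("spark.jars", SPARK_JARS)
    # shrink the broadcast threshold so both joins become ShuffledHashJoin;
    # setting it to -1 would turn them into SortMergeJoin instead
    .config("spark.sql.autoBroadcastJoinThreshold", "1048576")
    .config("spark.sql.join.preferSortMergeJoin", "false")
    .getOrCreate()
)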


zkaoudi commented Apr 11, 2024

To work around the potential performance issue of the Postgres-to-Spark channel conversion, you could add the Java platform and see what you get.
So sth like:
./wayang-1.0.0-SNAPSHOT/bin/wayang-submit org.apache.wayang.apps.tpch.TpcH exp(123) spark,postgres,java file:///path/to/wayang.properties Q3

wangxiaoying commented:

To work around the potential performance issue of the Postgres-to-Spark channel conversion, you could add the Java platform and see what you get. So sth like: ./wayang-1.0.0-SNAPSHOT/bin/wayang-submit org.apache.wayang.apps.tpch.TpcH exp(123) spark,postgres,java file:///path/to/wayang.properties Q3

Thanks @zkaoudi for the suggestion. Adding java improves the overall performance (since the final plan no longer involves Spark). It results in 44 seconds, similar to but still slightly slower than SparkSQL (40 seconds).
