docs: Spark Benchmarks on Graviton R-series #701

Merged 3 commits on Nov 7, 2024
```dockerfile
# Use the official Spark base image with Java 17 and Python 3
FROM apache/spark:3.5.3-scala2.12-java17-python3-ubuntu

# Arguments for version control
ARG HADOOP_VERSION=3.4.1
ARG AWS_SDK_VERSION=2.29.0
ARG SPARK_UID=185

# Set environment variables
ENV SPARK_HOME=/opt/spark

# Set up as root to install dependencies and tools
USER root

# Install necessary build tools and specific sbt version 0.13.18
RUN apt-get update && \
    apt-get install -y \
        gcc \
        make \
        flex \
        bison \
        git \
        openjdk-17-jdk \
        wget \
        curl && \
    # Install sbt 0.13.18
    wget https://github.com/sbt/sbt/releases/download/v0.13.18/sbt-0.13.18.tgz && \
    tar -xzf sbt-0.13.18.tgz -C /usr/local && \
    ln -s /usr/local/sbt/bin/sbt /usr/local/bin/sbt && \
    # Cleanup
    rm sbt-0.13.18.tgz && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Clone and compile TPC-DS toolkit
WORKDIR /opt
RUN git clone https://github.com/databricks/tpcds-kit.git && \
    cd tpcds-kit/tools && \
    make OS=LINUX && \
    chmod +x dsdgen dsqgen

# Clone the SQL perf library and related files
# Change the branch from delta to tpcds-v2.13 for latest
RUN git clone -b delta https://github.com/aws-samples/emr-on-eks-benchmark.git /tmp/emr-on-eks-benchmark

# Build the Databricks SQL perf library
RUN cd /tmp/emr-on-eks-benchmark/spark-sql-perf && sbt +package

# Use the compiled Databricks SQL perf library to build benchmark utility
RUN cd /tmp/emr-on-eks-benchmark/ && \
    mkdir -p /tmp/emr-on-eks-benchmark/benchmark/libs && \
    cp /tmp/emr-on-eks-benchmark/spark-sql-perf/target/scala-2.12/*.jar /tmp/emr-on-eks-benchmark/benchmark/libs && \
    cd /tmp/emr-on-eks-benchmark/benchmark && sbt assembly

# Remove any old Hadoop libraries
RUN rm -f ${SPARK_HOME}/jars/hadoop-client-* && \
    rm -f ${SPARK_HOME}/jars/hadoop-yarn-server-web-proxy-*.jar

# Add Hadoop AWS connector and AWS SDK for S3A support, along with hadoop-common dependencies
# TODO: hadoop-common, hadoop-yarn-server-web-proxy might not be required. Remove these and test it.
RUN cd ${SPARK_HOME}/jars && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/${HADOOP_VERSION}/hadoop-client-api-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/${HADOOP_VERSION}/hadoop-client-runtime-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/${HADOOP_VERSION}/hadoop-common-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-yarn-server-web-proxy/${HADOOP_VERSION}/hadoop-yarn-server-web-proxy-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/${AWS_SDK_VERSION}/bundle-${AWS_SDK_VERSION}.jar

# Create directory for TPC-DS data and set permissions
RUN mkdir -p /opt/tpcds-data && \
    chown -R ${SPARK_UID}:${SPARK_UID} /opt/tpcds-data

# Copy the built JARs to Spark's jars directory
RUN mkdir -p ${SPARK_HOME}/examples/jars/ && \
    cp /tmp/emr-on-eks-benchmark/benchmark/target/scala-2.12/*.jar ${SPARK_HOME}/examples/jars/ && \
    chown -R ${SPARK_UID}:${SPARK_UID} ${SPARK_HOME}/examples

# Set working directory
WORKDIR ${SPARK_HOME}

# Switch to non-root user
USER ${SPARK_UID}
```
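The image built from this Dockerfile is published as `public.ecr.aws/data-on-eks/spark3.5.3-scala2.12-java17-python3-ubuntu-tpcds:v2` and referenced in the SparkApplication below, so rebuilding it is optional. If you do want to rebuild it for Graviton, a minimal sketch of an arm64 build-and-push flow is shown here; the `<ACCOUNT_ID>`, `<AWS_REGION>`, repository name, and tag are placeholder assumptions, not values from this PR.

```bash
# Placeholder region/repository -- replace with your own. Assumes the Dockerfile above
# is in the current directory and the ECR repository already exists.
REGION=<AWS_REGION>
REPO=<ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/spark-tpcds-benchmark

# Log in to ECR (${REPO%%/*} strips the path, leaving the registry host)
aws ecr get-login-password --region ${REGION} | \
  docker login --username AWS --password-stdin ${REPO%%/*}

# Build for Graviton (arm64) and push in one step
docker buildx build --platform linux/arm64 -t ${REPO}:v2 --push .
```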
```hcl
spark_benchmark_ebs = {
  name        = "spark_benchmark_ebs"
  description = "Managed node group for Spark Benchmarks with EBS using x86 or ARM"

  # Filter only the secondary-CIDR private subnets (starting with "100.") as the subnet IDs where the nodes/node groups will be provisioned
  subnet_ids = [element(compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) :
    substr(cidr_block, 0, 4) == "100." ? subnet_id : null]), 0)
  ]

  # Change ami_type to AL2023_x86_64_STANDARD for x86 instances
  ami_type = "AL2023_ARM_64_STANDARD" # arm64

  # The node group is created with zero instances when you deploy the blueprint.
  # Change min_size and desired_size to 6 to run the benchmarks.
  # desired_size might not be applied through Terraform once the node group exists, so adjust it in the AWS Console if needed.
  min_size     = 0 # Change min and desired to 6 for running benchmarks
  max_size     = 8
  desired_size = 0 # Change min and desired to 6 for running benchmarks

  # This EBS storage is used for shuffle on instances without NVMe SSDs, e.g., r8g instances
  block_device_mappings = {
    xvda = {
      device_name = "/dev/xvda"
      ebs = {
        volume_size           = 300
        volume_type           = "gp3"
        iops                  = 3000
        encrypted             = true
        delete_on_termination = true
      }
    }
  }

  # Change the instance type as desired; make sure it matches ami_type
  instance_types = ["r8g.12xlarge"] # Change the instance type to benchmark various instance types

  labels = {
    NodeGroupType = "spark_benchmark_ebs"
  }

  tags = {
    Name          = "spark_benchmark_ebs"
    NodeGroupType = "spark_benchmark_ebs"
  }
}
```
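As the comments above note, `min_size` and `desired_size` must be raised to 6 for a benchmark run, and `desired_size` may not propagate through Terraform once the node group exists. A hedged sketch of doing the same scaling with the AWS CLI is shown below; `<EKS_CLUSTER_NAME>` and `<SPARK_BENCHMARK_EBS_NODEGROUP_NAME>` are placeholders, since the exact managed node group name gets a generated suffix.

```bash
# Find the exact node group name first:
aws eks list-nodegroups --cluster-name <EKS_CLUSTER_NAME>

# Scale the benchmark node group to 6 nodes for a run
aws eks update-nodegroup-config \
  --cluster-name <EKS_CLUSTER_NAME> \
  --nodegroup-name <SPARK_BENCHMARK_EBS_NODEGROUP_NAME> \
  --scaling-config minSize=6,maxSize=8,desiredSize=6

# After the benchmark completes, scale back down to zero
aws eks update-nodegroup-config \
  --cluster-name <EKS_CLUSTER_NAME> \
  --nodegroup-name <SPARK_BENCHMARK_EBS_NODEGROUP_NAME> \
  --scaling-config minSize=0,maxSize=8,desiredSize=0
```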
```yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: tpcds-benchmark-1tb-ebs # Change the name for each test run (instance type, storage, etc.)
  namespace: spark-team-a
spec:
  # Temporarily commented out until the YuniKorn issue is resolved; falls back to the default Kubernetes scheduler
  # batchScheduler: yunikorn
  # batchSchedulerOptions:
  #   queue: root.default
  type: Scala
  mode: cluster
  image: public.ecr.aws/data-on-eks/spark3.5.3-scala2.12-java17-python3-ubuntu-tpcds:v2
  imagePullPolicy: IfNotPresent
  sparkVersion: "3.5.3"
  mainClass: com.amazonaws.eks.tpcds.BenchmarkSQL
  mainApplicationFile: local:///opt/spark/examples/jars/eks-spark-benchmark-assembly-1.0.jar
  arguments:
    # TPC-DS data location
    - "s3a://<S3_BUCKET>/TPCDS-TEST-1TB"
    # Results location
    - "s3a://<S3_BUCKET>/TPCDS-TEST-1T-RESULT"
    # Path to the TPC-DS kit in the Docker image
    - "/opt/tpcds-kit/tools"
    # Data format
    - "parquet"
    # Scale factor in GB (1000 = 1 TB)
    - "1000"
    # Number of iterations
    - "1"
    # Optimize queries with Hive tables
    - "false"
    # Filter queries; runs all queries if empty, e.g. "q98-v2.4,q99-v2.4,ss_max-v2.4,q95-v2.4"
    - ""
    # Logging set to WARN
    - "true"
  sparkConf:
    # Expose Spark metrics for Prometheus
    "spark.ui.prometheus.enabled": "true"
    "spark.executor.processTreeMetrics.enabled": "true"
    "spark.metrics.conf.*.sink.prometheusServlet.class": "org.apache.spark.metrics.sink.PrometheusServlet"
    "spark.metrics.conf.driver.sink.prometheusServlet.path": "/metrics/driver/prometheus/"
    "spark.metrics.conf.executor.sink.prometheusServlet.path": "/metrics/executors/prometheus/"

    # Spark event logs
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "s3a://<S3_BUCKET>/spark-event-logs"
    "spark.eventLog.rolling.enabled": "true"
    "spark.eventLog.rolling.maxFileSize": "64m"

    "spark.network.timeout": "2000s"
    "spark.executor.heartbeatInterval": "300s"
    # AQE
    "spark.sql.adaptive.enabled": "true"
    "spark.sql.adaptive.localShuffleReader.enabled": "true"
    "spark.sql.adaptive.coalescePartitions.enabled": "true"
    "spark.sql.adaptive.skewJoin.enabled": "true"
    "spark.kubernetes.executor.podNamePrefix": "benchmark-exec-ebs"
    # S3 optimizations
    # "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider" # Uses AWS SDK V1, which is in maintenance mode
    "spark.hadoop.fs.s3a.aws.credentials.provider.mapping": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider=software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider"
    "spark.hadoop.fs.s3a.aws.credentials.provider": "software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider" # AWS SDK V2: https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/aws_sdk_upgrade.html
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
    "spark.hadoop.fs.s3a.fast.upload": "true"
    "spark.hadoop.fs.s3a.path.style.access": "true"
    "spark.hadoop.fs.s3a.fast.upload.buffer": "disk"
    "spark.hadoop.fs.s3a.buffer.dir": "/tmp/s3a"
    "spark.hadoop.fs.s3a.multipart.size": "128M" # Good for large files
    "spark.hadoop.fs.s3a.multipart.threshold": "256M"
    "spark.hadoop.fs.s3a.threads.max": "50"
    "spark.hadoop.fs.s3a.connection.maximum": "200"

    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2"
    "spark.executor.defaultJavaOptions": "-verbose:gc -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70"
    # "spark.hadoop.fs.s3a.readahead.range": "256K"

    # -----------------------------------------------------
    # This block is critical when you see errors like:
    #   Exception in thread "main" io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred
    #   Caused by: java.net.SocketTimeoutException: timeout
    # spark.kubernetes.local.dirs.tmpfs: "true" # More details: https://spark.apache.org/docs/latest/running-on-kubernetes.html#using-ram-for-local-storage
    "spark.kubernetes.submission.connectionTimeout": "120000" # milliseconds
    "spark.kubernetes.submission.requestTimeout": "120000"
    "spark.kubernetes.driver.connectionTimeout": "120000"
    "spark.kubernetes.driver.requestTimeout": "120000"
    # "spark.kubernetes.allocation.batch.size": "20" # Default is 5; adjust according to your cluster size
    # -----------------------------------------------------
    # Alternative S3 tuning values. These keys are already set above; keep only one set active,
    # since duplicate keys make the YAML mapping invalid.
    # "spark.hadoop.fs.s3a.multipart.size": "67108864" # 64 MB part size for S3 uploads
    # "spark.hadoop.fs.s3a.threads.max": "40" # Limit S3 threads for optimized throughput
    # "spark.hadoop.fs.s3a.connection.maximum": "100" # Set max connections for S3
    # "spark.hadoop.fs.s3a.multipart.threshold": "134217728" # 128 MB threshold to start multipart upload

    # Data writing and shuffle tuning
    "spark.shuffle.file.buffer": "1m" # Larger shuffle buffer for better disk I/O
    "spark.reducer.maxSizeInFlight": "48m" # Larger reducer buffer for in-flight data

    # Optional: automatically clear failed multipart uploads
    "spark.hadoop.fs.s3a.multipart.purge": "true"
  driver:
    cores: 5
    memory: "20g"
    memoryOverhead: "6g"
    serviceAccount: spark-team-a
    securityContext:
      runAsUser: 185
    env:
      - name: JAVA_HOME
        value: "/opt/java/openjdk"
    nodeSelector:
      NodeGroupType: spark_benchmark_ebs
  executor:
    cores: 5
    memory: "20g"
    memoryOverhead: "6g"
    # 6 executors per node; 6 nodes in the EKS managed node group
    instances: 36
    serviceAccount: spark-team-a
    securityContext:
      runAsUser: 185
    env:
      - name: JAVA_HOME
        value: "/opt/java/openjdk"
    nodeSelector:
      NodeGroupType: spark_benchmark_ebs
  restartPolicy:
    type: Never
```
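To run the benchmark, substitute `<S3_BUCKET>` in the manifest and apply it. A brief sketch follows, assuming the manifest above is saved as `tpcds-benchmark-1tb-ebs.yaml` (the file name is an assumption) and the Spark operator is installed in the cluster; the driver pod name uses the operator's default `<app-name>-driver` convention.

```bash
# Submit the benchmark job
kubectl apply -f tpcds-benchmark-1tb-ebs.yaml

# Watch the SparkApplication state reported by the Spark operator
kubectl get sparkapplication tpcds-benchmark-1tb-ebs -n spark-team-a -w

# Follow the driver log for per-query progress
kubectl logs -f tpcds-benchmark-1tb-ebs-driver -n spark-team-a
```

When the run finishes, the per-query results land under `s3a://<S3_BUCKET>/TPCDS-TEST-1T-RESULT`, as configured in the arguments above.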