feat: Mountpoint S3 README fix #617

Closed · wants to merge 11 commits
@@ -0,0 +1,98 @@
#!/bin/bash

# Spark Variables
# NOTE: The Hadoop and AWS SDK JAR versions must be compatible with your Spark version.
HADOOP_VERSION="3.3.1" # Replace with your desired Hadoop version
AWS_SDK_VERSION="1.12.647" # Replace with your desired AWS SDK version

# S3 Variables
S3_BUCKET_NAME="<S3_BUCKET_NAME>" # Replace with your S3 bucket name
# The folder name in the S3 bucket where the JAR files will be stored
FOLDER_NAME="jars"

# Python SparkApplication
PYTHON_SCRIPT_NAME="pyspark-taxi-trip.py"

# JAR file URLs
HADOOP_URL="https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar"
AWS_SDK_URL="https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar"

# Create folder in S3 bucket
aws s3api put-object --bucket "${S3_BUCKET_NAME}" --key "${FOLDER_NAME}/"
status=$?
if [ $status -ne 0 ]; then
    echo "Failed to create folder in S3 bucket. Exit status: ${status}"
    exit 1
else
    echo "Folder ${FOLDER_NAME} is present in S3 bucket ${S3_BUCKET_NAME}"
fi

# Download JAR files if they do not exist locally
if [ ! -f "hadoop-aws-${HADOOP_VERSION}.jar" ]; then
wget -O "hadoop-aws-${HADOOP_VERSION}.jar" "${HADOOP_URL}"
if [ $? -ne 0 ]; then
echo "Failed to download hadoop-aws-${HADOOP_VERSION}.jar. Exit status: $?"
exit 1
else
echo "Downloaded hadoop-aws-${HADOOP_VERSION}.jar successfully"
fi
else
echo "hadoop-aws-${HADOOP_VERSION}.jar already exists locally, skipping download"
fi

if [ ! -f "aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar" ]; then
wget -O "aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar" "${AWS_SDK_URL}"
if [ $? -ne 0 ]; then
echo "Failed to download aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar. Exit status: $?"
exit 1
else
echo "Downloaded aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar successfully"
fi
else
echo "aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar already exists locally, skipping download"
fi

# Upload JAR files to S3 bucket
aws s3 cp "hadoop-aws-${HADOOP_VERSION}.jar" "s3://${S3_BUCKET_NAME}/${FOLDER_NAME}/"
status=$?
if [ $status -ne 0 ]; then
    echo "Failed to upload hadoop-aws-${HADOOP_VERSION}.jar to S3. Exit status: ${status}"
    exit 1
else
    echo "Uploaded hadoop-aws-${HADOOP_VERSION}.jar to S3 successfully"
fi

aws s3 cp "aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar" "s3://${S3_BUCKET_NAME}/${FOLDER_NAME}/"
status=$?
if [ $status -ne 0 ]; then
    echo "Failed to upload aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar to S3. Exit status: ${status}"
    exit 1
else
    echo "Uploaded aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar to S3 successfully"
fi

# Upload the Python script to S3 bucket
aws s3 cp "${PYTHON_SCRIPT_NAME}" "s3://${S3_BUCKET_NAME}/${FOLDER_NAME}/"
status=$?
if [ $status -ne 0 ]; then
    echo "Failed to upload ${PYTHON_SCRIPT_NAME} to S3. Exit status: ${status}"
    exit 1
else
    echo "Uploaded ${PYTHON_SCRIPT_NAME} to S3 successfully"
fi

# Clean up downloaded files
rm "hadoop-aws-${HADOOP_VERSION}.jar" "aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar"
status=$?
if [ $status -ne 0 ]; then
    echo "Failed to remove local JAR files. Exit status: ${status}"
    exit 1
else
    echo "Removed local JAR files successfully"
fi

# List contents of the newly created folder in S3 bucket
aws s3 ls "s3://${S3_BUCKET_NAME}/${FOLDER_NAME}/"
status=$?
if [ $status -ne 0 ]; then
    echo "Failed to list contents of s3://${S3_BUCKET_NAME}/${FOLDER_NAME}/. Exit status: ${status}"
    exit 1
else
    echo "Contents of s3://${S3_BUCKET_NAME}/${FOLDER_NAME}/ listed successfully"
fi

echo "Script completed successfully."
@@ -0,0 +1,122 @@
apiVersion: v1
kind: ConfigMap
metadata:
  name: s3-mount-script
  namespace: spark-team-a
data:
  monitor_s3_mount.sh: |
    #!/bin/bash

    set -e # Exit immediately if a command exits with a non-zero status

    # ENVIRONMENT VARIABLES
    LOG_FILE="/var/log/s3-mount.log"
    S3_BUCKET_NAME="<S3_BUCKET_NAME>" # Replace with your S3 Bucket Name before applying to EKS cluster
    MOUNT_POINT="/mnt/s3"
    CACHE_DIR="/tmp"
    MOUNT_S3_BIN="/usr/bin/mount-s3"
    MOUNT_S3_URL="https://s3.amazonaws.com/mountpoint-s3-release/latest/x86_64/mount-s3.rpm" # x86_64 build; pick the artifact that matches your node architecture

    # Function to install mount-s3
    install_mount_s3() {
      echo "$(date): Installing mount-s3" | tee -a "$LOG_FILE"
      yum update -y | tee -a "$LOG_FILE"
      yum install -y wget util-linux | tee -a "$LOG_FILE"
      wget "$MOUNT_S3_URL" -O /tmp/mount-s3.rpm | tee -a "$LOG_FILE"
      yum install -y /tmp/mount-s3.rpm | tee -a "$LOG_FILE"
    }

    # Function to mount S3 bucket
    mount_s3_bucket() {
      echo "$(date): Mounting S3 bucket: $S3_BUCKET_NAME to $MOUNT_POINT" | tee -a "$LOG_FILE"
      $MOUNT_S3_BIN --metadata-ttl indefinite --allow-other --cache "$CACHE_DIR" "$S3_BUCKET_NAME" "$MOUNT_POINT" | tee -a "$LOG_FILE"
      # $? here would be tee's exit status, so check mount-s3 itself via PIPESTATUS
      if [ "${PIPESTATUS[0]}" -ne 0 ]; then
        echo "$(date): Failed to mount S3 bucket: $S3_BUCKET_NAME" | tee -a "$LOG_FILE"
        exit 1
      fi
    }

    # Ensure the mount point directory exists
    ensure_mount_point() {
      if [ ! -d "$MOUNT_POINT" ]; then
        echo "$(date): Creating mount point directory: $MOUNT_POINT" | tee -a "$LOG_FILE"
        mkdir -p "$MOUNT_POINT"
      fi
    }

    # Install mount-s3
    install_mount_s3

    # Continuous monitoring and remounting loop
    while true; do
      echo "$(date): Checking if S3 bucket is mounted" | tee -a "$LOG_FILE"
      ensure_mount_point
      if mount | grep "$MOUNT_POINT" > /dev/null; then
        echo "$(date): S3 bucket is already mounted" | tee -a "$LOG_FILE"
        # A dead FUSE mount still appears in the mount table, so probe it with ls
        if ! ls "$MOUNT_POINT" > /dev/null 2>&1; then
          echo "$(date): Transport endpoint is not connected, remounting S3 bucket" | tee -a "$LOG_FILE"
          fusermount -u "$MOUNT_POINT" || echo "$(date): Failed to unmount S3 bucket" | tee -a "$LOG_FILE"
          rm -rf "$MOUNT_POINT" || echo "$(date): Failed to remove mount point directory" | tee -a "$LOG_FILE"
          ensure_mount_point
          mount_s3_bucket
        fi
      else
        echo "$(date): S3 bucket is not mounted, mounting now" | tee -a "$LOG_FILE"
        mount_s3_bucket
      fi
      sleep 60 # Check every 60 seconds
    done

---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: s3-mount-daemonset
  namespace: spark-team-a
spec:
  selector:
    matchLabels:
      name: s3-mount-daemonset
  template:
    metadata:
      labels:
        name: s3-mount-daemonset
    spec:
      hostPID: true
      hostIPC: true
      hostNetwork: true
      volumes:
        - name: script
          configMap:
            name: s3-mount-script
        - name: host-root
          hostPath:
            path: /
            type: Directory
      restartPolicy: Always
      containers:
        - name: s3-mount
          image: amazonlinux:2
          volumeMounts:
            - name: script
              mountPath: /config
            - name: host-root
              mountPath: /host
              mountPropagation: Bidirectional
          securityContext:
            privileged: true
          command:
            - /bin/bash
            - -c
            - |
              set -e
              echo "Starting s3-mount"
              yum install -y util-linux
              echo "Copying script to /usr/bin"
              cp /config/monitor_s3_mount.sh /host/usr/bin/monitor_s3_mount.sh
              chmod +x /host/usr/bin/monitor_s3_mount.sh
              echo "Verifying the copied script"
              ls -lha /host/usr/bin/monitor_s3_mount.sh
              echo "Running the script in Host space"
              nsenter --target 1 --mount --uts --ipc --net --pid /usr/bin/monitor_s3_mount.sh
              echo "Done"
@@ -0,0 +1,114 @@
# Pre-requisites before running this job:
# Replace <S3_BUCKET_NAME> with the S3 bucket created by this blueprint (check the Terraform outputs)
# Verify that the Spark version matches the image and the Hadoop/AWS SDK JAR versions below

---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "taxi-trip"
  namespace: spark-team-a
  labels:
    app: "taxi-trip"
    applicationId: "taxi-trip-nvme"
    queue: root.test
spec:
  type: Python
  sparkVersion: "3.4.0" # Mountpoint-S3 Configuration
  pythonVersion: "3"
  mode: cluster
  image: "apache/spark-py:v3.4.0" # Mountpoint-S3 Configuration
  imagePullPolicy: IfNotPresent
  mainApplicationFile: "local:///mnt/s3/jars/pyspark-taxi-trip.py" # MainFile is the path to a bundled JAR, Python, or R file of the application
  deps:
    jars:
      - "local:///mnt/s3/jars/aws-java-sdk-bundle-1.12.647.jar" # Mountpoint-S3 Configuration
      - "local:///mnt/s3/jars/hadoop-aws-3.3.1.jar" # Mountpoint-S3 Configuration
  volumes:
    - name: spark-volume
      hostPath:
        path: /mnt/s3/jars # Mountpoint-S3 Configuration
        type: Directory
  arguments:
    - "s3a://<S3_BUCKET_NAME>/taxi-trip/input/"
    - "s3a://<S3_BUCKET_NAME>/taxi-trip/output/"
  hadoopConf:
    "fs.s3a.aws.credentials.provider": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
    "mapreduce.fileoutputcommitter.algorithm.version": "2"
  sparkConf:
    "spark.app.name": "taxi-trip"
    "spark.driver.extraClassPath": "/mnt/s3/jars/*" # Mountpoint-S3 Configuration (absolute path to the host-mounted JARs)
    "spark.executor.extraClassPath": "/mnt/s3/jars/*" # Mountpoint-S3 Configuration (absolute path to the host-mounted JARs)
    "spark.kubernetes.driver.pod.name": "taxi-trip"
    "spark.kubernetes.executor.podNamePrefix": "taxi-trip"
    "spark.speculation": "false"
    "spark.network.timeout": "2400"
    "spark.hadoop.fs.s3a.connection.timeout": "1200000"
    "spark.hadoop.fs.s3a.path.style.access": "true"
    "spark.hadoop.fs.s3a.connection.maximum": "200"
    "spark.hadoop.fs.s3a.fast.upload": "true"
    "spark.hadoop.fs.s3a.readahead.range": "256K"
    "spark.hadoop.fs.s3a.input.fadvise": "random"
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"

    # Spark Event logs
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "s3a://<S3_BUCKET_NAME>/spark-event-logs"
    "spark.eventLog.rolling.enabled": "true"
    "spark.eventLog.rolling.maxFileSize": "64m"

    # Expose Spark metrics for Prometheus
    "spark.ui.prometheus.enabled": "true"
    "spark.executor.processTreeMetrics.enabled": "true"
    "spark.kubernetes.driver.annotation.prometheus.io/scrape": "true"
    "spark.kubernetes.driver.annotation.prometheus.io/path": "/metrics/executors/prometheus/"
    "spark.kubernetes.driver.annotation.prometheus.io/port": "4040"
    "spark.kubernetes.driver.service.annotation.prometheus.io/scrape": "true"
    "spark.kubernetes.driver.service.annotation.prometheus.io/path": "/metrics/driver/prometheus/"
    "spark.kubernetes.driver.service.annotation.prometheus.io/port": "4040"
    "spark.metrics.conf.*.sink.prometheusServlet.class": "org.apache.spark.metrics.sink.PrometheusServlet"
    "spark.metrics.conf.*.sink.prometheusServlet.path": "/metrics/driver/prometheus/"
    "spark.metrics.conf.master.sink.prometheusServlet.path": "/metrics/master/prometheus/"
    "spark.metrics.conf.applications.sink.prometheusServlet.path": "/metrics/applications/prometheus/"

  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    podSecurityContext:
      fsGroup: 2000 # Group ID
      runAsUser: 1000 # User ID
    cores: 1
    coreLimit: "1200m"
    volumeMounts:
      - mountPath: /mnt/s3/jars # Mountpoint-S3 Configuration
        name: spark-volume
    memory: "4g"
    memoryOverhead: "4g"
    serviceAccount: spark-team-a
    labels:
      version: 3.4.0
    nodeSelector:
      multiArch: Spark

  executor:
    podSecurityContext:
      fsGroup: 2000 # Group ID
      runAsUser: 1000 # User ID
    cores: 1
    volumeMounts:
      - mountPath: /mnt/s3/jars # Mountpoint-S3 Configuration
        name: spark-volume
    coreLimit: "3400m"
    instances: 4
    memory: "4g"
    memoryOverhead: "4g"
    serviceAccount: spark-team-a
    labels:
      version: 3.4.0
    nodeSelector:
      multiArch: Spark
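
With the JARs staged and every node mounting the bucket at /mnt/s3, the job can be submitted and followed roughly like this; the manifest filename taxi-trip.yaml and the bucket name are assumptions:

# Substitute the bucket placeholder, submit, and watch the driver
sed -i 's|<S3_BUCKET_NAME>|my-spark-bucket|g' taxi-trip.yaml
kubectl apply -f taxi-trip.yaml
kubectl get sparkapplication taxi-trip -n spark-team-a
kubectl logs -f taxi-trip -n spark-team-a # driver pod name set by spark.kubernetes.driver.pod.name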