
Commit e3f1a6b

ratnopamc, vara-bonthu, and nabuskey authored
feat: Enhance Spark operator blueprint to add S3 Tables support (#721)
Signed-off-by: Manabu McCloskey <[email protected]>
Co-authored-by: Vara Bonthu <[email protected]>
Co-authored-by: Manabu McCloskey <[email protected]>
1 parent e811724 commit e3f1a6b

File tree

11 files changed (+530 lines, -5 lines)


analytics/terraform/spark-k8s-operator/README.md

Lines changed: 6 additions & 3 deletions
```diff
@@ -43,6 +43,8 @@ Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/
 |------|------|
 | [aws_eks_access_entry.karpenter_nodes](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/eks_access_entry) | resource |
 | [aws_iam_policy.grafana](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
+| [aws_iam_policy.s3tables](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
+| [aws_iam_policy.s3tables_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
 | [aws_iam_policy.spark](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
 | [aws_prometheus_workspace.amp](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/prometheus_workspace) | resource |
 | [aws_s3_object.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
@@ -61,6 +63,7 @@ Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/
 | [aws_ecrpublic_authorization_token.token](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecrpublic_authorization_token) | data source |
 | [aws_eks_cluster_auth.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/eks_cluster_auth) | data source |
 | [aws_iam_policy_document.grafana](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
+| [aws_iam_policy_document.s3tables_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
 | [aws_iam_policy_document.spark_operator](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
 | [aws_iam_session_context.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_session_context) | data source |
 | [aws_partition.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/partition) | data source |
@@ -71,13 +74,13 @@ Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/
 | Name | Description | Type | Default | Required |
 |------|-------------|------|---------|:--------:|
-| <a name="input_eks_cluster_version"></a> [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS Cluster version | `string` | `"1.30"` | no |
+| <a name="input_eks_cluster_version"></a> [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS Cluster version | `string` | `"1.31"` | no |
 | <a name="input_eks_data_plane_subnet_secondary_cidr"></a> [eks\_data\_plane\_subnet\_secondary\_cidr](#input\_eks\_data\_plane\_subnet\_secondary\_cidr) | Secondary CIDR blocks. 32766 IPs per Subnet per Subnet/AZ for EKS Node and Pods | `list(string)` | <pre>[<br> "100.64.0.0/17",<br> "100.64.128.0/17"<br>]</pre> | no |
 | <a name="input_enable_amazon_prometheus"></a> [enable\_amazon\_prometheus](#input\_enable\_amazon\_prometheus) | Enable AWS Managed Prometheus service | `bool` | `true` | no |
 | <a name="input_enable_vpc_endpoints"></a> [enable\_vpc\_endpoints](#input\_enable\_vpc\_endpoints) | Enable VPC Endpoints | `bool` | `false` | no |
-| <a name="input_enable_yunikorn"></a> [enable\_yunikorn](#input\_enable\_yunikorn) | Enable Apache YuniKorn Scheduler | `bool` | `true` | no |
+| <a name="input_enable_yunikorn"></a> [enable\_yunikorn](#input\_enable\_yunikorn) | Enable Apache YuniKorn Scheduler | `bool` | `false` | no |
 | <a name="input_kms_key_admin_roles"></a> [kms\_key\_admin\_roles](#input\_kms\_key\_admin\_roles) | list of role ARNs to add to the KMS policy | `list(string)` | `[]` | no |
-| <a name="input_name"></a> [name](#input\_name) | Name of the VPC and EKS Cluster | `string` | `"spark-operator-doeks"` | no |
+| <a name="input_name"></a> [name](#input\_name) | Name of the VPC and EKS Cluster | `string` | `"spark-eks-s3tables"` | no |
 | <a name="input_private_subnets"></a> [private\_subnets](#input\_private\_subnets) | Private Subnets CIDRs. 254 IPs per Subnet/AZ for Private NAT + NLB + Airflow + EC2 Jumphost etc. | `list(string)` | <pre>[<br> "10.1.1.0/24",<br> "10.1.2.0/24"<br>]</pre> | no |
 | <a name="input_public_subnets"></a> [public\_subnets](#input\_public\_subnets) | Public Subnets CIDRs. 62 IPs per Subnet/AZ | `list(string)` | <pre>[<br> "10.1.0.0/26",<br> "10.1.0.64/26"<br>]</pre> | no |
 | <a name="input_region"></a> [region](#input\_region) | Region | `string` | `"us-west-2"` | no |
```
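The changed defaults above mean a fresh deployment targets EKS 1.31, ships with YuniKorn disabled, and names the stack `spark-eks-s3tables`. If you prefer different values, the inputs can be overridden at apply time rather than edited in place; an illustrative sketch (flag values are examples only, not part of this commit):

```sh
# Illustrative only: override blueprint defaults at apply time instead of editing variables.tf
terraform apply \
  -var="eks_cluster_version=1.31" \
  -var="enable_yunikorn=true" \
  -var="name=spark-eks-s3tables" \
  -var="region=us-west-2"
```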

analytics/terraform/spark-k8s-operator/addons.tf

Lines changed: 42 additions & 0 deletions
```diff
@@ -502,6 +502,7 @@ module "eks_blueprints_addons" {
   karpenter_node = {
     iam_role_additional_policies = {
       AmazonSSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
+      S3TableAccess                = aws_iam_policy.s3tables_policy.arn
     }
   }
   karpenter = {
@@ -644,3 +645,44 @@ resource "aws_secretsmanager_secret_version" "grafana" {
   secret_id     = aws_secretsmanager_secret.grafana.id
   secret_string = random_password.grafana.result
 }
+
+#---------------------------------------------------------------
+# S3Table IAM policy for Karpenter nodes
+#---------------------------------------------------------------
+resource "aws_iam_policy" "s3tables_policy" {
+  name_prefix = "${local.name}-s3tables"
+  path        = "/"
+  description = "S3Tables Metadata access for Nodes"
+
+  policy = jsonencode({
+    Version = "2012-10-17"
+    Statement = [
+      {
+        Sid    = "VisualEditor0"
+        Effect = "Allow"
+        Action = [
+          "s3tables:UpdateTableMetadataLocation",
+          "s3tables:GetNamespace",
+          "s3tables:GetTableBucket",
+          "s3tables:GetTableBucketMaintenanceConfiguration",
+          "s3tables:GetTableBucketPolicy"
+        ]
+        Resource = "arn:aws:s3tables:*:${data.aws_caller_identity.current.account_id}:bucket/*"
+      },
+      {
+        Sid    = "VisualEditor1"
+        Effect = "Allow"
+        Action = [
+          "s3tables:GetTableMaintenanceJobStatus",
+          "s3tables:GetTablePolicy",
+          "s3tables:GetTable",
+          "s3tables:GetTableMetadataLocation",
+          "s3tables:UpdateTableMetadataLocation",
+          "s3tables:GetTableData",
+          "s3tables:GetTableMaintenanceConfiguration"
+        ]
+        Resource = "arn:aws:s3tables:*:${data.aws_caller_identity.current.account_id}:bucket/*/table/*"
+      }
+    ]
+  })
+}
```
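After `terraform apply`, one way to confirm the new node policy was created is to list customer-managed policies by name prefix; a sketch, assuming `local.name` resolves to the default cluster name `spark-eks-s3tables`:

```sh
# Assumes the default name "spark-eks-s3tables"; adjust the prefix if you changed var.name
aws iam list-policies --scope Local \
  --query "Policies[?starts_with(PolicyName, 'spark-eks-s3tables-s3tables')].{Name:PolicyName,Arn:Arn}" \
  --output table
```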
Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@

```dockerfile
#--------------------------------------------------------------------------------------------
# Dockerfile for Apache Spark 3.5.3 with S3A support on multi-arch platforms (AMD64 & ARM64)
#--------------------------------------------------------------------------------------------
# Step 1: Create a private or public ECR repo from the AWS Console or CLI
#   e.g., aws ecr-public create-repository --repository-name spark --region us-east-1
#---
# Step 2: Docker login:
#   aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws/<repoAlias>
#---
# Step 3: Build the multi-arch image and push it to ECR:
#   docker buildx build --platform linux/amd64,linux/arm64 -t public.ecr.aws/<repoAlias>/spark:3.5.3-scala2.12-java17-python3-ubuntu --push .
#--------------------------------------------------------------------------------------------

# Use the official Spark 3.5.3 base image with Scala 2.12, Java 17, and Python 3
FROM apache/spark:3.5.3-scala2.12-java17-python3-ubuntu

# Arguments for version control
ARG HADOOP_VERSION=3.4.1
ARG PREV_HADOOP_VERSION=3.3.4
ARG AWS_SDK_VERSION=2.29.45
ARG ICEBERG_VERSION=1.6.1
ARG S3_TABLES_VERSION=0.1.3
ARG SPARK_UID=185

# Set environment variables
ENV SPARK_HOME=/opt/spark

# Switch to root to install dependencies and tools
USER root

# Remove any old Hadoop libraries to avoid conflicts
RUN rm -f ${SPARK_HOME}/jars/hadoop-client-* && \
    rm -f ${SPARK_HOME}/jars/hadoop-yarn-server-web-proxy-*.jar

# Add the Hadoop AWS connector and related Hadoop dependencies
RUN cd ${SPARK_HOME}/jars && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O hadoop-aws-${PREV_HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/${HADOOP_VERSION}/hadoop-client-api-${HADOOP_VERSION}.jar -O hadoop-client-api-${PREV_HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/${HADOOP_VERSION}/hadoop-client-runtime-${HADOOP_VERSION}.jar -O hadoop-client-runtime-${PREV_HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/${HADOOP_VERSION}/hadoop-common-${HADOOP_VERSION}.jar -O hadoop-common-${PREV_HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-yarn-server-web-proxy/${HADOOP_VERSION}/hadoop-yarn-server-web-proxy-${HADOOP_VERSION}.jar -O hadoop-yarn-server-web-proxy-${PREV_HADOOP_VERSION}.jar

# Add Iceberg, the AWS SDK bundle, and the S3 Tables Catalog for Iceberg runtime
RUN cd ${SPARK_HOME}/jars && \
    wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/${AWS_SDK_VERSION}/bundle-${AWS_SDK_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/${S3_TABLES_VERSION}/s3-tables-catalog-for-iceberg-runtime-${S3_TABLES_VERSION}.jar

# Set the working directory to Spark home
WORKDIR ${SPARK_HOME}

# Switch back to the non-root Spark user for security best practices
USER ${SPARK_UID}
```
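Before pushing the multi-arch image, a local single-arch build can confirm the extra jars landed in `${SPARK_HOME}/jars`; a sketch, where the local tag `spark-s3tables:3.5.3` is just an example:

```sh
# Local sanity check (tag name is an example, not part of this commit)
docker build -t spark-s3tables:3.5.3 .
docker run --rm --entrypoint ls spark-s3tables:3.5.3 /opt/spark/jars | grep -E "iceberg|s3-tables|hadoop-aws|bundle"
```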
Lines changed: 167 additions & 0 deletions
@@ -0,0 +1,167 @@

# S3Table with OSS Spark on EKS Guide

This guide provides step-by-step instructions for setting up and running a Spark job on Amazon EKS using S3 Tables for data storage.

## Prerequisites

- Latest version of the AWS CLI installed (must include S3 Tables API support); see the quick check below
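The S3 Tables commands used later in this guide require a recent AWS CLI; a quick way to confirm your installation exposes the `s3tables` command group:

```sh
aws --version
aws s3tables help
```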
## Step 1: Deploy Spark Cluster on EKS

Follow the steps in [Spark Operator on EKS with YuniKorn Scheduler](https://awslabs.github.io/data-on-eks/docs/blueprints/data-analytics/spark-operator-yunikorn#prerequisites) to deploy the Spark cluster on EKS.

Once your cluster is up and running, proceed with the following steps to execute a sample Spark job using S3 Tables.

## Step 2: Create Test Data for the job

Navigate to the example directory and generate sample data:

```sh
cd analytics/terraform/spark-k8s-operator/examples/s3-tables
./input-data-gen.sh
```

This will create a local file called `employee_data.csv` with 100 records. Modify the script to adjust the number of records as needed.

## Step 3: Upload Test Input data to your S3 Bucket

Replace `<S3_BUCKET>` with the name of the S3 bucket created by your blueprint and run the command below.

```sh
aws s3 cp employee_data.csv s3://<S3_BUCKET>/s3table-example/input/
```

## Step 4: Upload PySpark Script to S3 Bucket

Replace `<S3_BUCKET>` with the name of the S3 bucket created by your blueprint and run the command below to upload the sample Spark job to the S3 bucket.

```sh
aws s3 cp s3table-iceberg-pyspark.py s3://<S3_BUCKET>/s3table-example/scripts/
```
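Optionally, confirm both uploads before moving on; a simple listing against the same `<S3_BUCKET>` placeholder:

```sh
aws s3 ls s3://<S3_BUCKET>/s3table-example/ --recursive
```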
## Step 5: Create S3Table

Replace `<REGION>` and `<S3TABLE_BUCKET_NAME>` with your desired values.

```sh
aws s3tables create-table-bucket \
    --region "<REGION>" \
    --name "<S3TABLE_BUCKET_NAME>"
```

Make note of the table bucket ARN generated by this command.
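The clean-up commands later in this guide reference the ARN as `${S3TABLE_ARN}`, so it can be convenient to capture it in a shell variable at creation time; a sketch, assuming the response exposes the ARN in an `arn` field:

```sh
# Capture the table bucket ARN for reuse in later steps (response field name assumed to be "arn")
S3TABLE_ARN=$(aws s3tables create-table-bucket \
  --region "<REGION>" \
  --name "<S3TABLE_BUCKET_NAME>" \
  --query 'arn' --output text)
echo "${S3TABLE_ARN}"
```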
## Step 6: Update Spark Operator YAML File

- Open the `s3table-spark-operator.yaml` file in your preferred text editor.
- Replace `<S3_BUCKET>` with the S3 bucket created by this blueprint (check the Terraform outputs); this is the bucket where you uploaded the test data and the sample Spark job in the steps above.
- Replace `<S3TABLE_ARN>` with your table bucket ARN from Step 5.

## Step 7: Execute Spark Job

Apply the updated YAML file to your Kubernetes cluster to submit the Spark job.

```sh
cd analytics/terraform/spark-k8s-operator/examples/s3-tables
kubectl apply -f s3table-spark-operator.yaml
```
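You can watch the submitted application and its pods with standard kubectl commands; a sketch using the `spark-team-a` namespace referenced in the next step:

```sh
kubectl get sparkapplications -n spark-team-a
kubectl get pods -n spark-team-a -w
```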
## Step 8: Verify the Spark Driver log for the output

Check the Spark driver logs to verify job progress and output:

```sh
kubectl logs <spark-driver-pod-name> -n spark-team-a
```
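If you don't know the driver pod name, it typically ends in `-driver`; a sketch for locating it and following its logs in the same namespace:

```sh
# List pods to find the driver, then follow its logs
kubectl get pods -n spark-team-a
kubectl logs -f -n spark-team-a $(kubectl get pods -n spark-team-a -o name | grep -- -driver)
```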
## Step 9: Verify the S3Table using the S3Tables API

Use the S3Tables API to confirm the table was created successfully. Replace `<ACCOUNT_ID>` and run the command.

```sh
aws s3tables get-table --table-bucket-arn arn:aws:s3tables:us-west-2:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table --namespace doeks_namespace --name employee_s3_table
```

The output should look like the following:

```json
{
  "name": "employee_s3_table",
  "type": "customer",
  "tableARN": "arn:aws:s3tables:us-west-2:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table/table/55511111-7a03-4513-b921-e372b0030daf",
  "namespace": [
    "doeks_namespace"
  ],
  "versionToken": "aafc39ddd462690d2a0c",
  "metadataLocation": "s3://55511111-7a03-4513-bumiqc8ihp8rnxymuhyz8t1ammu7ausw2b--table-s3/metadata/00004-62cc4be3-59b5-4647-a78d-1cdf69ec5ed8.metadata.json",
  "warehouseLocation": "s3://55511111-7a03-4513-bumiqc8ihp8rnxymuhyz8t1ammu7ausw2b--table-s3",
  "createdAt": "2025-01-07T22:14:48.689581+00:00",
  "createdBy": "<ACCOUNT_ID>",
  "modifiedAt": "2025-01-09T00:06:09.222917+00:00",
  "ownerAccountId": "<ACCOUNT_ID>",
  "format": "ICEBERG"
}
```
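If you created multiple namespaces or tables, you can also enumerate them instead of querying a single table; a sketch using the table bucket ARN captured earlier:

```sh
# Optional: list namespaces and tables in the table bucket
aws s3tables list-namespaces --table-bucket-arn ${S3TABLE_ARN}
aws s3tables list-tables --table-bucket-arn ${S3TABLE_ARN}
```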
Monitor the table maintenance job status:

```sh
aws s3tables get-table-maintenance-job-status --table-bucket-arn arn:aws:s3tables:us-west-2:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table --namespace doeks_namespace --name employee_s3_table
```

This command reports on the Iceberg compaction, snapshot management, and unreferenced file removal processes:

```json
{
  "tableARN": "arn:aws:s3tables:us-west-2:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table/table/55511111-7a03-4513-b921-e372b0030daf",
  "status": {
    "icebergCompaction": {
      "status": "Successful",
      "lastRunTimestamp": "2025-01-08T01:18:08.857000+00:00"
    },
    "icebergSnapshotManagement": {
      "status": "Successful",
      "lastRunTimestamp": "2025-01-08T22:17:08.811000+00:00"
    },
    "icebergUnreferencedFileRemoval": {
      "status": "Successful",
      "lastRunTimestamp": "2025-01-08T22:17:10.377000+00:00"
    }
  }
}
```

## Step 10: Clean up

Delete the table.

```bash
aws s3tables delete-table \
  --namespace doeks_namespace \
  --table-bucket-arn ${S3TABLE_ARN} \
  --name employee_s3_table
```

Delete the namespace.

```bash
aws s3tables delete-namespace \
  --namespace doeks_namespace \
  --table-bucket-arn ${S3TABLE_ARN}
```

Finally, delete the table bucket.

```bash
aws s3tables delete-table-bucket \
  --region "<REGION>" \
  --table-bucket-arn ${S3TABLE_ARN}
```

# Conclusion

You have successfully set up and run a Spark job on Amazon EKS using S3 Tables for data storage. This setup provides a scalable and efficient way to process large datasets with Spark on Kubernetes, with the added benefit of S3 Tables' built-in data management capabilities.

For more advanced usage, refer to the official AWS documentation on S3 Tables and Spark on EKS.
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@

```bash
#!/bin/bash

# Variables
output_file="employee_data.csv"
num_records=100

# Levels array
levels=("Junior" "Mid" "Senior" "Exec")

# Create or overwrite the CSV file with the header
echo "id,name,level,salary" > $output_file

# Generate data
for ((i=1; i<=num_records; i++))
do
  # Generate random name
  name="Employee_$i"

  # Pick a random level
  level=${levels[$RANDOM % ${#levels[@]}]}

  # Generate a random salary between 50,000 and 200,000
  salary=$(echo "scale=2; $RANDOM/32768 * (200000 - 50000) + 50000" | bc)

  # Append the data to the CSV file
  echo "$i,$name,$level,$salary" >> $output_file
done

# Print a success message
echo "Generated $num_records employee records in $output_file"
```
