Skip to content

Commit

Permalink
Add ephemeral storage configs for spark on kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
haolinzhang committed Sep 7, 2023
1 parent 622bbf2 commit 884a774
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,34 @@ private[spark] object Config extends Logging {
.version("2.3.0")
.fallbackConf(CONTAINER_IMAGE)

val EXECUTOR_REQUEST_EPHEMERAL_STORAGE_GB =
ConfigBuilder("spark.kubernetes.executor.ephemeralStorageGB.request")
.doc("Ephemeral storage to use for the executors.")
.intConf
.checkValue(_ >= 0, "Ephemeral storage should be a positive integer")
.createWithDefault(0)

val EXECUTOR_LIMIT_EPHEMERAL_STORAGE_GB =
ConfigBuilder("spark.kubernetes.executor.ephemeralStorageGB.limit")
.doc("Ephemeral storage to use for the executors.")
.intConf
.checkValue(_ >= 0, "Ephemeral storage should be a positive integer")
.createWithDefault(0)

val DRIVER_REQUEST_EPHEMERAL_STORAGE_GB =
ConfigBuilder("spark.kubernetes.driver.ephemeralStorageGB.request")
.doc("Ephemeral storage to use for the driver.")
.intConf
.checkValue(_ >= 0, "Ephemeral storage should be a positive integer")
.createWithDefault(0)

val DRIVER_LIMIT_EPHEMERAL_STORAGE_GB =
ConfigBuilder("spark.kubernetes.driver.ephemeralStorageGB.limit")
.doc("Ephemeral storage to use for the driver.")
.intConf
.checkValue(_ >= 0, "Ephemeral storage should be a positive integer")
.createWithDefault(0)

val CONTAINER_IMAGE_PULL_POLICY =
ConfigBuilder("spark.kubernetes.container.image.pullPolicy")
.doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
// Memory settings
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)

// Ephemeral storage settings
private val driverRequestEphemeralStorageGiB = conf.get(DRIVER_REQUEST_EPHEMERAL_STORAGE_GB)
private val driverLimitEphemeralStorageGiB = conf.get(DRIVER_LIMIT_EPHEMERAL_STORAGE_GB)
// The default memory overhead factor to use, derived from the deprecated
// `spark.kubernetes.memoryOverheadFactor` config or the default overhead values.
// If the user has not set it, then use a different default for non-JVM apps. This value is
Expand Down Expand Up @@ -90,7 +93,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
("cpu", new Quantity(limitCores))
}

val driverRequestEphemeralQuantity = new Quantity(s"${driverRequestEphemeralStorageGiB}Gi")
val driverLimitEphemeralQuantity = new Quantity(s"${driverLimitEphemeralStorageGiB}Gi")
val driverResourceQuantities =
KubernetesUtils.buildResourcesQuantities(SPARK_DRIVER_PREFIX, conf.sparkConf)

Expand Down Expand Up @@ -138,7 +142,24 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.addToLimits(driverResourceQuantities.asJava)
.endResources()
.build()

val driverContainerWithEphemeralRequest = if (driverRequestEphemeralStorageGiB.equals(0)) {
driverContainer
} else {
new ContainerBuilder(driverContainer)
.editResources()
.addToRequests("ephemeral-storage", driverRequestEphemeralQuantity)
.endResources()
.build()
}
val driverContainerWithEphemeralLimit = if (driverLimitEphemeralStorageGiB.equals(0)) {
driverContainerWithEphemeralRequest
} else {
new ContainerBuilder(driverContainerWithEphemeralRequest)
.editResources()
.addToLimits("ephemeral-storage", driverLimitEphemeralQuantity)
.endResources()
.build()
}
val driverPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(driverPodName)
Expand All @@ -156,7 +177,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
conf.schedulerName
.foreach(driverPod.getSpec.setSchedulerName)

SparkPod(driverPod, driverContainer)
SparkPod(driverPod, driverContainerWithEphemeralLimit)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ private[spark] class BasicExecutorFeatureStep(
execResources.cores.get.toString
}
private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)

private val executorRequestEphemeralStorageGiB = kubernetesConf.sparkConf
.get(EXECUTOR_REQUEST_EPHEMERAL_STORAGE_GB)
private val executorLimitEphemeralStorageGiB = kubernetesConf.sparkConf
.get(EXECUTOR_LIMIT_EPHEMERAL_STORAGE_GB)
private def buildExecutorResourcesQuantities(
customResources: Set[ExecutorResourceRequest]): Map[String, Quantity] = {
customResources.map { request =>
Expand Down Expand Up @@ -118,6 +121,8 @@ private[spark] class BasicExecutorFeatureStep(

val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
val executorCpuQuantity = new Quantity(executorCoresRequest)
val executorRequestEphemeralQuantity = new Quantity(s"${executorRequestEphemeralStorageGiB}Gi")
val executorLimitEphemeralQuantity = new Quantity(s"${executorLimitEphemeralStorageGiB}Gi")
val executorResourceQuantities =
buildExecutorResourcesQuantities(execResources.customResources.values.toSet)

Expand Down Expand Up @@ -204,10 +209,28 @@ private[spark] class BasicExecutorFeatureStep(
.addAllToPorts(requiredPorts.asJava)
.addToArgs("executor")
.build()
val executorContainerWithConfVolume = if (disableConfigMap) {
val executorContainerWithEphemeralRequest = if (executorRequestEphemeralStorageGiB.equals(0)) {
executorContainer
} else {
new ContainerBuilder(executorContainer)
.editResources()
.addToRequests("ephemeral-storage", executorRequestEphemeralQuantity)
.endResources()
.build()
}
val executorContainerWithEphemeralLimit = if (executorLimitEphemeralStorageGiB.equals(0)) {
executorContainerWithEphemeralRequest
} else {
new ContainerBuilder(executorContainerWithEphemeralRequest)
.editResources()
.addToLimits("ephemeral-storage", executorLimitEphemeralQuantity)
.endResources()
.build()
}
val executorContainerWithConfVolume = if (disableConfigMap) {
executorContainerWithEphemeralLimit
} else {
new ContainerBuilder(executorContainerWithEphemeralLimit)
.addNewVolumeMount()
.withName(SPARK_CONF_VOLUME_EXEC)
.withMountPath(SPARK_CONF_DIR_INTERNAL)
Expand Down

0 comments on commit 884a774

Please sign in to comment.