Commit d4bc277

victors-oai and ForVic authored and committed
[SPARK-54173][K8S] Add support for Deployment API on K8s
### What changes were proposed in this pull request?

Adds support for the K8s `Deployment` API to allocate pods.

### Why are the changes needed?

Allocating individual pods is not ideal, and we can allocate with higher-level APIs. #33508 helps with this by adding an interface for arbitrary allocators and adds a StatefulSet allocator. However, dynamic allocation only works if you have implemented a PodDisruptionBudget associated with the decommission label. Since Deployment uses ReplicaSet, which supports the `pod-deletion-cost` annotation, we can avoid creating a separate PDB resource and allow dynamic allocation (with shuffle tracking) by adding a low deletion cost to executors we are scaling down. When we scale the Deployment, it will choose to scale down the pods with the low deletion cost.

### Does this PR introduce _any_ user-facing change?

Yes, adds a user-facing config:
```
spark.kubernetes.executor.podDeletionCost
```

### How was this patch tested?

New unit tests, passing existing unit tests, and tested in a cluster with shuffle tracking and dynamic allocation enabled.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52867 from ForVic/dev/victors/deployment_allocator.

Lead-authored-by: Victor Sunderland <[email protected]>
Co-authored-by: victors-oai <[email protected]>
Co-authored-by: Victor Sunderland <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
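For orientation, here is a minimal sketch of the kind of configuration the description above implies. It is illustrative only, not taken from the patch: the deletion-cost value of `-100` is an arbitrary choice, and any value lower than the default of `0` that the allocator writes on executor pods will make Kubernetes prefer removing the executors Spark has marked.

```scala
// Illustrative sketch: enabling the deployment allocator with dynamic allocation.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.allocation.pods.allocator", "deployment")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  // Executors selected for removal are re-annotated with this cost (< 0 = removed first).
  .set("spark.kubernetes.executor.podDeletionCost", "-100")
```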
1 parent 67547ef commit d4bc277

File tree

11 files changed (+605, -34 lines)


docs/running-on-kubernetes.md

Lines changed: 14 additions & 4 deletions
@@ -1474,6 +1474,16 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.podDeletionCost</code></td>
+  <td>(none)</td>
+  <td>
+    Value to apply to the <code>controller.kubernetes.io/pod-deletion-cost</code> annotation
+    when Spark tells a deployment-based allocator to remove executor pods. Set this to steer
+    Kubernetes to remove the same pods that Spark selected when the deployment scales down.
+  </td>
+  <td>4.2.0</td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.executor.scheduler.name</code></td>
   <td>(none)</td>
@@ -1654,10 +1664,10 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.kubernetes.allocation.pods.allocator</code></td>
   <td><code>direct</code></td>
   <td>
-    Allocator to use for pods. Possible values are <code>direct</code> (the default)
-    and <code>statefulset</code>, or a full class name of a class implementing `AbstractPodsAllocator`.
-    Future version may add Job or replicaset. This is a developer API and may change
-    or be removed at anytime.
+    Allocator to use for pods. Possible values are <code>direct</code> (the default),
+    <code>statefulset</code>, <code>deployment</code>, or a full class name of a class
+    implementing `AbstractPodsAllocator`. Future version may add Job or replicaset.
+    This is a developer API and may change or be removed at anytime.
   </td>
   <td>3.3.0</td>
 </tr>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 14 additions & 3 deletions
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_ENABLED}
 
 private[spark] object Config extends Logging {
 
@@ -462,14 +462,25 @@
 
   val KUBERNETES_ALLOCATION_PODS_ALLOCATOR =
     ConfigBuilder("spark.kubernetes.allocation.pods.allocator")
-      .doc("Allocator to use for pods. Possible values are direct (the default) and statefulset " +
-        ", or a full class name of a class implementing AbstractPodsAllocator. " +
+      .doc("Allocator to use for pods. Possible values are direct (the default), statefulset," +
+        " deployment, or a full class name of a class implementing AbstractPodsAllocator. " +
         "Future version may add Job or replicaset. This is a developer API and may change " +
         "or be removed at anytime.")
       .version("3.3.0")
       .stringConf
       .createWithDefault("direct")
 
+  val KUBERNETES_EXECUTOR_POD_DELETION_COST =
+    ConfigBuilder("spark.kubernetes.executor.podDeletionCost")
+      .doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" +
+        " annotation when Spark asks a deployment-based allocator to remove executor pods. This " +
+        "helps Kubernetes pick the same pods Spark selected when the deployment scales down." +
+        s" This should only be enabled when both $KUBERNETES_ALLOCATION_PODS_ALLOCATOR is set to " +
+        s"deployment, and $DYN_ALLOCATION_ENABLED is enabled.")
+      .version("4.2.0")
+      .intConf
+      .createOptional
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 9 additions & 2 deletions
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.deploy.k8s.features
 
+import java.util.Locale
+
 import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model._
@@ -115,12 +117,17 @@
     // hostname must be no longer than `KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH`(63) characters,
     // so take the last 63 characters of the pod name as the hostname.
     // This preserves uniqueness since the end of name contains executorId
-    val hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
+    var hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
       // Remove non-word characters from the start of the hostname
       .replaceAll("^[^\\w]+", "")
       // Replace dangerous characters in the remaining string with a safe alternative.
       .replaceAll("[^\\w-]+", "_")
 
+    // Deployment resource does not support capital characters in the hostname
+    if (kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
+      hostname = hostname.toLowerCase(Locale.ROOT)
+    }
+
     val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
     val executorCpuQuantity = new Quantity(executorCoresRequest)
     val executorResourceQuantities =
@@ -270,7 +277,7 @@
     }
 
     val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
-      case "statefulset" => "Always"
+      case "statefulset" | "deployment" => "Always"
       case _ => "Never"
     }
 
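To make the hostname handling above easier to follow, here is a standalone sketch of the same transformation; the function name and its example inputs are ours, not Spark's:

```scala
import java.util.Locale

// Sketch of the derivation above: keep the last 63 characters, strip leading
// non-word characters, replace other unsafe characters, and lowercase when the
// deployment allocator is in use (Deployment hostnames must be lowercase).
def executorHostname(podName: String, allocator: String): String = {
  val maxLen = 63 // KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH
  val sanitized = podName.substring(math.max(0, podName.length - maxLen))
    .replaceAll("^[^\\w]+", "")
    .replaceAll("[^\\w-]+", "_")
  if (allocator == "deployment") sanitized.toLowerCase(Locale.ROOT) else sanitized
}

// e.g. executorHostname("MyApp-Exec-17", "deployment") == "myapp-exec-17"
```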

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala (new file)

Lines changed: 200 additions & 0 deletions
@@ -0,0 +1,200 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodSpecBuilder, PodTemplateSpec}
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.util.{Clock, Utils}

/**
 * A pods allocator backed by Kubernetes Deployments.
 *
 * The Deployment controller honours the `controller.kubernetes.io/pod-deletion-cost`
 * annotation, so executors selected by Spark for removal can be prioritised when the
 * deployment scales down. This provides predictable downscale behaviour for dynamic
 * allocation that is not possible with StatefulSets, which only remove pods in ordinal order.
 */
class DeploymentPodsAllocator(
    conf: SparkConf,
    secMgr: SecurityManager,
    executorBuilder: KubernetesExecutorBuilder,
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore,
    clock: Clock) extends AbstractPodsAllocator() with Logging {

  private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]

  private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)

  private val namespace = conf.get(KUBERNETES_NAMESPACE)

  private val kubernetesDriverPodName = conf.get(KUBERNETES_DRIVER_POD_NAME)

  val driverPod: Option[Pod] = kubernetesDriverPodName
    .map(name => Option(kubernetesClient.pods()
      .inNamespace(namespace)
      .withName(name)
      .get())
      .getOrElse(throw new SparkException(
        s"No pod was found named $name in the cluster in the " +
        s"namespace $namespace (this was supposed to be the driver pod.).")))

  private var appId: String = _

  private val deploymentsCreated = new mutable.HashSet[Int]()

  private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost"

  override def start(
      applicationId: String,
      schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
    appId = applicationId
    driverPod.foreach { pod =>
      Utils.tryLogNonFatalError {
        kubernetesClient
          .pods()
          .inNamespace(namespace)
          .withName(pod.getMetadata.getName)
          .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
      }
    }
  }

  override def setTotalExpectedExecutors(
      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
    if (appId == null) {
      throw new SparkException("setTotalExpectedExecutors called before start of allocator.")
    }
    resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
      rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
      setTargetExecutorsDeployment(numExecs, appId, rp.id)
    }
  }

  override def isDeleted(executorId: String): Boolean = false

  private def setName(applicationId: String, resourceProfileId: Int): String = {
    s"spark-d-$applicationId-$resourceProfileId"
  }

  private def setTargetExecutorsDeployment(
      expected: Int,
      applicationId: String,
      resourceProfileId: Int): Unit = {
    if (deploymentsCreated.contains(resourceProfileId)) {
      kubernetesClient
        .apps()
        .deployments()
        .inNamespace(namespace)
        .withName(setName(applicationId, resourceProfileId))
        .scale(expected)
    } else {
      val executorConf = KubernetesConf.createExecutorConf(
        conf,
        "EXECID",
        applicationId,
        driverPod,
        resourceProfileId)
      val resolvedExecutorSpec = executorBuilder.buildFromFeatures(
        executorConf,
        secMgr,
        kubernetesClient,
        rpIdToResourceProfile(resourceProfileId))
      val executorPod = resolvedExecutorSpec.pod

      val podSpecBuilder = executorPod.pod.getSpec match {
        case null => new PodSpecBuilder()
        case s => new PodSpecBuilder(s)
      }
      val podWithAttachedContainer: PodSpec = podSpecBuilder
        .addToContainers(executorPod.container)
        .build()

      val meta = executorPod.pod.getMetadata
      val resources = resolvedExecutorSpec.executorKubernetesResources
      val failureMessage =
        "PersistentVolumeClaims are not supported with the deployment allocator. " +
        "Please remove PVC requirements or choose a different pods allocator."
      val dynamicVolumeClaims = resources.filter(_.getKind == "PersistentVolumeClaim")
      if (dynamicVolumeClaims.nonEmpty) {
        throw new SparkException(failureMessage)
      }
      val staticVolumeClaims = Option(podWithAttachedContainer.getVolumes)
        .map(_.asScala.filter(_.getPersistentVolumeClaim != null))
        .getOrElse(Seq.empty)
      if (staticVolumeClaims.nonEmpty) {
        throw new SparkException(failureMessage)
      }

      val currentAnnotations = Option(meta.getAnnotations)
        .map(_.asScala).getOrElse(Map.empty[String, String])
      if (!currentAnnotations.contains(podDeletionCostAnnotation)) {
        val newAnnotations = currentAnnotations.concat(Seq(podDeletionCostAnnotation -> "0"))
        meta.setAnnotations(newAnnotations.asJava)
      }

      val podTemplateSpec = new PodTemplateSpec(meta, podWithAttachedContainer)

      val deployment = new DeploymentBuilder()
        .withNewMetadata()
          .withName(setName(applicationId, resourceProfileId))
          .withNamespace(namespace)
        .endMetadata()
        .withNewSpec()
          .withReplicas(expected)
          .withNewSelector()
            .addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)
            .addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
            .addToMatchLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfileId.toString)
          .endSelector()
          .withTemplate(podTemplateSpec)
        .endSpec()
        .build()

      addOwnerReference(driverPod.get, Seq(deployment))
      kubernetesClient.apps().deployments().inNamespace(namespace).resource(deployment).create()
      deploymentsCreated += resourceProfileId
    }
  }

  override def stop(applicationId: String): Unit = {
    deploymentsCreated.foreach { rpid =>
      Utils.tryLogNonFatalError {
        kubernetesClient
          .apps()
          .deployments()
          .inNamespace(namespace)
          .withName(setName(applicationId, rpid))
          .delete()
      }
    }
  }
}
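Once the Deployment exists, the scale-down path above reduces to a single fabric8 call. A hedged sketch follows; the namespace, app id, and replica count are made up, and `spark-d-<appId>-<rpId>` is the naming scheme from `setName`:

```scala
import io.fabric8.kubernetes.client.KubernetesClientBuilder

// Sketch: shrinking the executor Deployment for resource profile 0 of app "app-123".
// The ReplicaSet controller prefers to delete pods with the lowest
// controller.kubernetes.io/pod-deletion-cost, i.e. the executors Spark just annotated.
val client = new KubernetesClientBuilder().build()
try {
  client.apps().deployments()
    .inNamespace("default")
    .withName("spark-d-app-123-0")
    .scale(2)
} finally {
  client.close()
}
```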

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 12 additions & 2 deletions
@@ -21,7 +21,7 @@ import java.io.File
 import io.fabric8.kubernetes.client.Config
 import io.fabric8.kubernetes.client.KubernetesClient
 
-import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkMasterRegex}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
@@ -160,9 +160,19 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
 
   private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient,
       snapshotsStore: ExecutorPodsSnapshotsStore) = {
-    val executorPodsAllocatorName = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
+    val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
+    if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) &&
+        sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) {
+      throw new SparkException(
+        s"Dynamic allocation with the deployment pods allocator requires " +
+        s"'${KUBERNETES_EXECUTOR_POD_DELETION_COST.key}' to be configured.")
+    }
+
+    val executorPodsAllocatorName = allocator match {
       case "statefulset" =>
         classOf[StatefulSetPodsAllocator].getName
+      case "deployment" =>
+        classOf[DeploymentPodsAllocator].getName
       case "direct" =>
         classOf[ExecutorPodsAllocator].getName
       case fullClass =>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 35 additions & 0 deletions
@@ -73,6 +73,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val namespace = conf.get(KUBERNETES_NAMESPACE)
 
+  // KEP 2255: When a Deployment or Replicaset is scaled down, the pods will be deleted in the
+  // order of the value of this annotation, ascending.
+  private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost"
+
   // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
   private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     removeExecutor(executorId, reason)
@@ -195,6 +199,31 @@
     super.getExecutorIds()
   }
 
+  private def annotateExecutorDeletionCost(execIds: Seq[String]): Unit = {
+    conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).foreach { cost =>
+      logInfo(s"Annotating executor pod(s) ${execIds.mkString(",")} with deletion cost $cost")
+      val annotateTask = new Runnable() {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          kubernetesClient
+            .pods()
+            .inNamespace(namespace)
+            .withLabel(SPARK_APP_ID_LABEL, applicationId())
+            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
+            .resources()
+            .forEach { podResource =>
+              podResource.edit({ p: Pod =>
+                new PodBuilder(p).editOrNewMetadata()
+                  .addToAnnotations(podDeletionCostAnnotation, cost.toString)
+                  .endMetadata()
+                  .build()})
+            }
+        }
+      }
+      executorService.execute(annotateTask)
+    }
+  }
+
   private def labelDecommissioningExecs(execIds: Seq[String]) = {
     // Only kick off the labeling task if we have a label.
     conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
@@ -228,13 +257,19 @@
     // picked the pod to evict so we don't need to update the labels.
     if (!triggeredByExecutor) {
       labelDecommissioningExecs(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
+      if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
+        annotateExecutorDeletionCost(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
+      }
     }
     super.decommissionExecutors(executorsAndDecomInfo, adjustTargetNumExecutors,
       triggeredByExecutor)
   }
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
     // If we've decided to remove some executors we should tell Kubernetes that we don't care.
+    if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
+      annotateExecutorDeletionCost(executorIds)
+    }
     labelDecommissioningExecs(executorIds)
 
     // Tell the executors to exit themselves.
