
Commit 1922798

Yikun authored and dongjoon-hyun committed
[SPARK-37331][K8S] Add the ability to create resources before driverPod creating
### What changes were proposed in this pull request?
This patch adds a new method, `getAdditionalPreKubernetesResources`, to `KubernetesFeatureConfigStep`. It returns any additional Kubernetes resources that should be added to support the feature, and these resources are set up before the driver pod is created.

After this patch:
- `getAdditionalPreKubernetesResources`: developers should return resources here when they need them created before the pod is created.
- `getAdditionalKubernetesResources`: developers should return resources here when it is acceptable for them to be created after the pod. Spark also refreshes the owner reference of these resources after they are created, so any resource whose owner reference should point to the driver pod must be added here as well, even if it is already returned by `getAdditionalPreKubernetesResources`.

### Why are the changes needed?
We need to set up Kubernetes resources or extension resources before the driver pod is created, then create the pod; once the pod exists, the owner reference of those resources is set to the pod. These pre-resources are usually required for pod creation and scheduling: they must be ready before the pod is created and should be deleted when the user deletes the pod, so their lifecycle matches the pod's. A typical case is a customized batch scheduler, where the user needs to create additional pre-resources (such as a PodGroup in Volcano) so the Spark job can be batch scheduled.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #34599 from Yikun/SPARK-37331.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 897f056 commit 1922798
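As a rough illustration of how a downstream feature step might use the new hook, here is a minimal sketch (not part of this commit). The step name and the placeholder ConfigMap are hypothetical stand-ins for a scheduler-specific resource such as a Volcano PodGroup:

    import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata}

    import org.apache.spark.deploy.k8s.SparkPod
    import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

    // Hypothetical example step: returns a placeholder resource that must exist
    // before the driver pod is created.
    class PreResourceExampleFeatureStep extends KubernetesFeatureConfigStep {

      // Stand-in for a scheduler resource such as a Volcano PodGroup.
      private val preResource: HasMetadata = new ConfigMapBuilder()
        .withNewMetadata()
          .withName("example-pre-resource")
        .endMetadata()
        .build()

      override def configurePod(pod: SparkPod): SparkPod = pod

      // Created before the driver pod.
      override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq(preResource)

      // Per the guidance above, also returned here so the owner reference is refreshed
      // to point at the created driver pod.
      override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq(preResource)
    }

A step like this would be wired in through the driver pod feature steps configuration (Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS), as the new ClientSuite test below does.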

File tree

8 files changed, +158 -5 lines changed


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala

Lines changed: 1 addition & 0 deletions
@@ -20,5 +20,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata
 
 private[spark] case class KubernetesDriverSpec(
     pod: SparkPod,
+    driverPreKubernetesResources: Seq[HasMetadata],
     driverKubernetesResources: Seq[HasMetadata],
     systemProperties: Map[String, String])

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala

Lines changed: 9 additions & 1 deletion
@@ -70,7 +70,15 @@ trait KubernetesFeatureConfigStep
 
   /**
    * Return any additional Kubernetes resources that should be added to support this feature. Only
-   * applicable when creating the driver in cluster mode.
+   * applicable when creating the driver in cluster mode. Resources would be setup/refresh before
+   * Pod creation.
+   */
+  def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq.empty
+
+  /**
+   * Return any additional Kubernetes resources that should be added to support this feature. Only
+   * applicable when creating the driver in cluster mode. Resources would be setup/refresh after
+   * Pod creation.
    */
   def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
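Both methods default to Seq.empty, so existing KubernetesFeatureConfigStep implementations keep compiling and behaving as before; only steps that need resources created ahead of the driver pod have to override getAdditionalPreKubernetesResources().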

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala

Lines changed: 26 additions & 0 deletions
@@ -133,15 +133,41 @@ private[spark] class Client(
       .build()
     val driverPodName = resolvedDriverPod.getMetadata.getName
 
+    // setup resources before pod creation
+    val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources
+    try {
+      kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
+    } catch {
+      case NonFatal(e) =>
+        logError("Please check \"kubectl auth can-i create [resource]\" first." +
+          " It should be yes. And please also check your feature step implementation.")
+        kubernetesClient.resourceList(preKubernetesResources: _*).delete()
+        throw e
+    }
+
     var watch: Watch = null
     var createdDriverPod: Pod = null
     try {
       createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
     } catch {
       case NonFatal(e) =>
+        kubernetesClient.resourceList(preKubernetesResources: _*).delete()
         logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
         throw e
     }
+
+    // Refresh all pre-resources' owner references
+    try {
+      addOwnerReference(createdDriverPod, preKubernetesResources)
+      kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
+    } catch {
+      case NonFatal(e) =>
+        kubernetesClient.pods().delete(createdDriverPod)
+        kubernetesClient.resourceList(preKubernetesResources: _*).delete()
+        throw e
+    }
+
+    // setup resources after pod creation, and refresh all resources' owner references
     try {
       val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
       addOwnerReference(createdDriverPod, otherKubernetesResources)
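In short, the submission flow is now: create or replace the pre-resources, create the driver pod (deleting the pre-resources if that fails), refresh the pre-resources so their owner references point at the new pod, and only then create the remaining resources and the config map with the pod as owner. A failure at any stage cleans up what was already created and rethrows.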

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala

Lines changed: 3 additions & 0 deletions
@@ -57,15 +57,18 @@ private[spark] class KubernetesDriverBuilder {
 
     val spec = KubernetesDriverSpec(
       initialPod,
+      driverPreKubernetesResources = Seq.empty,
       driverKubernetesResources = Seq.empty,
       conf.sparkConf.getAll.toMap)
 
     features.foldLeft(spec) { case (spec, feature) =>
       val configuredPod = feature.configurePod(spec.pod)
       val addedSystemProperties = feature.getAdditionalPodSystemProperties()
+      val addedPreResources = feature.getAdditionalPreKubernetesResources()
       val addedResources = feature.getAdditionalKubernetesResources()
       KubernetesDriverSpec(
         configuredPod,
+        spec.driverPreKubernetesResources ++ addedPreResources,
         spec.driverKubernetesResources ++ addedResources,
         spec.systemProperties ++ addedSystemProperties)
     }
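The builder folds every configured feature step into the spec, so the pre-resources contributed by all steps are concatenated in step order, mirroring how driverKubernetesResources and system properties are accumulated.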

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -37,7 +37,7 @@ abstract class PodBuilderSuite extends SparkFunSuite {
 
   protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod
 
-  private val baseConf = new SparkConf(false)
+  protected val baseConf = new SparkConf(false)
     .set(Config.CONTAINER_IMAGE, "spark-executor:latest")
 
   test("use empty initial pod if template is not specified") {
@@ -80,7 +80,7 @@ abstract class PodBuilderSuite extends SparkFunSuite {
     assert(exception.getMessage.contains("Could not load pod from template file."))
   }
 
-  private def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
+  protected def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
     val kubernetesClient = mock(classOf[KubernetesClient])
     val pods =
       mock(classOf[MixedOperation[Pod, PodList, PodResource[Pod]]])

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -175,7 +175,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
     }
     val pod = step.configurePod(SparkPod.initialPod())
     val props = step.getAdditionalPodSystemProperties()
-    KubernetesDriverSpec(pod, Nil, props)
+    KubernetesDriverSpec(pod, Nil, Nil, props)
   }
 
 }

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala

Lines changed: 71 additions & 1 deletion
@@ -23,6 +23,7 @@ import java.nio.file.Files
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apiextensions.v1.{CustomResourceDefinition, CustomResourceDefinitionBuilder}
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
@@ -31,7 +32,7 @@ import org.scalatest.BeforeAndAfter
 import org.scalatestplus.mockito.MockitoSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.{Config, _}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.util.Utils
@@ -62,8 +63,17 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private val ADDITIONAL_RESOURCES = Seq(
     new SecretBuilder().withNewMetadata().withName("secret").endMetadata().build())
 
+  private val PRE_RESOURCES = Seq(
+    new CustomResourceDefinitionBuilder().withNewMetadata().withName("preCRD").endMetadata().build()
+  )
   private val BUILT_KUBERNETES_SPEC = KubernetesDriverSpec(
     SparkPod(BUILT_DRIVER_POD, BUILT_DRIVER_CONTAINER),
+    Nil,
+    ADDITIONAL_RESOURCES,
+    RESOLVED_JAVA_OPTIONS)
+  private val BUILT_KUBERNETES_SPEC_WITH_PRERES = KubernetesDriverSpec(
+    SparkPod(BUILT_DRIVER_POD, BUILT_DRIVER_CONTAINER),
+    PRE_RESOURCES,
     ADDITIONAL_RESOURCES,
     RESOLVED_JAVA_OPTIONS)
 
@@ -118,6 +128,20 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       .build()
   }
 
+  private val PRE_ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES = PRE_RESOURCES.map { crd =>
+    new CustomResourceDefinitionBuilder(crd)
+      .editMetadata()
+        .addNewOwnerReference()
+          .withName(POD_NAME)
+          .withApiVersion(DRIVER_POD_API_VERSION)
+          .withKind(DRIVER_POD_KIND)
+          .withController(true)
+          .withUid(DRIVER_POD_UID)
+        .endOwnerReference()
+      .endMetadata()
+      .build()
+  }
+
   @Mock
   private var kubernetesClient: KubernetesClient = _
 
@@ -192,6 +216,52 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value"))
   }
 
+  test("SPARK-37331: The client should create Kubernetes resources with pre resources") {
+    val sparkConf = new SparkConf(false)
+      .set(Config.CONTAINER_IMAGE, "spark-executor:latest")
+      .set(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS.key,
+        "org.apache.spark.deploy.k8s.TestStepTwo," +
+        "org.apache.spark.deploy.k8s.TestStep")
+    val preResKconf: KubernetesDriverConf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)
+    )
+
+    when(driverBuilder.buildFromFeatures(preResKconf, kubernetesClient))
+      .thenReturn(BUILT_KUBERNETES_SPEC_WITH_PRERES)
+    val submissionClient = new Client(
+      preResKconf,
+      driverBuilder,
+      kubernetesClient,
+      loggingPodStatusWatcher)
+    submissionClient.run()
+    val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
+
+    // 2 for pre-resource creation/update, 1 for resource creation, 1 for config map
+    assert(otherCreatedResources.size === 4)
+    val preRes = otherCreatedResources.toArray
+      .filter(_.isInstanceOf[CustomResourceDefinition]).toSeq
+
+    // Make sure pre-resource creation/owner reference as expected
+    assert(preRes.size === 2)
+    assert(preRes.last === PRE_ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES.head)
+
+    // Make sure original resource and config map process are not affected
+    val secrets = otherCreatedResources.toArray.filter(_.isInstanceOf[Secret]).toSeq
+    assert(secrets === ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES)
+    val configMaps = otherCreatedResources.toArray
+      .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap])
+    assert(secrets.nonEmpty)
+    assert(configMaps.nonEmpty)
+    val configMap = configMaps.head
+    assert(configMap.getMetadata.getName ===
+      KubernetesClientUtils.configMapNameDriver)
+    assert(configMap.getImmutable())
+    assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME))
+    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value"))
+    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value"))
+  }
+
   test("All files from SPARK_CONF_DIR, " +
     "except templates, spark config, binary files and are within size limit, " +
     "should be populated to pod's configMap.") {

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala

Lines changed: 45 additions & 0 deletions
@@ -16,10 +16,13 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
 import org.apache.spark.internal.config.ConfigEntry
 
 class KubernetesDriverBuilderSuite extends PodBuilderSuite {
@@ -36,4 +39,46 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite {
     val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
     new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod
   }
+
+  private val ADDITION_PRE_RESOURCES = Seq(
+    new CustomResourceDefinitionBuilder().withNewMetadata().withName("preCRD").endMetadata().build()
+  )
+
+  test("SPARK-37331: check driver pre kubernetes resource, empty by default") {
+    val sparkConf = new SparkConf(false)
+      .set(Config.CONTAINER_IMAGE, "spark-driver:latest")
+    val client = mockKubernetesClient()
+    val conf = KubernetesTestConf.createDriverConf(sparkConf)
+    val spec = new KubernetesDriverBuilder().buildFromFeatures(conf, client)
+    assert(spec.driverPreKubernetesResources.size === 0)
+  }
+
+  test("SPARK-37331: check driver pre kubernetes resource as expected") {
+    val sparkConf = new SparkConf(false)
+      .set(Config.CONTAINER_IMAGE, "spark-driver:latest")
+      .set(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS.key,
+        "org.apache.spark.deploy.k8s.submit.TestStep")
+    val client = mockKubernetesClient()
+    val conf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf
+    )
+    val spec = new KubernetesDriverBuilder().buildFromFeatures(conf, client)
+    assert(spec.driverPreKubernetesResources.size === 1)
+    assert(spec.driverPreKubernetesResources === ADDITION_PRE_RESOURCES)
+  }
+}
+
+class TestStep extends KubernetesFeatureConfigStep {
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    pod
+  }
+
+  override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq(
+    new CustomResourceDefinitionBuilder()
+      .withNewMetadata()
+        .withName("preCRD")
+      .endMetadata()
+      .build()
+  )
 }
