Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 9822155

Browse files
committed
Add a unit test for the submission client.
1 parent 5e58c80 commit 9822155

File tree

8 files changed

+248
-95
lines changed

8 files changed

+248
-95
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -621,17 +621,12 @@ object SparkSubmit {
621621
if (isKubernetesCluster) {
622622
childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
623623
if (args.isPython) {
624-
childArgs += "--py-file"
625-
childArgs += args.primaryResource
626-
childArgs += "--main-class"
627-
childArgs += "org.apache.spark.deploy.PythonRunner"
628-
childArgs += "--other-py-files"
629-
childArgs += args.pyFiles
624+
childArgs ++= Array("--primary-py-file", args.primaryResource)
625+
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
626+
childArgs ++= Array("--other-py-files", args.pyFiles)
630627
} else {
631-
childArgs += "--primary-java-resource"
632-
childArgs += args.primaryResource
633-
childArgs += "--main-class"
634-
childArgs += args.mainClass
628+
childArgs ++= Array("--primary-java-resource", args.primaryResource)
629+
childArgs ++= Array("--main-class", args.mainClass)
635630
}
636631
args.childArgs.foreach { arg =>
637632
childArgs += "--arg"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ private[spark] object ClientArguments {
4343
var mainClass: Option[String] = None
4444
val driverArgs = mutable.Buffer.empty[String]
4545
args.sliding(2).toList.collect {
46-
case Array("--py-file", mainPyFile: String) =>
46+
case Array("--primary-py-file", mainPyFile: String) =>
4747
mainAppResource = Some(PythonMainAppResource(mainPyFile))
4848
case Array("--primary-java-resource", primaryJavaResource: String) =>
4949
mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
@@ -79,17 +79,25 @@ private[spark] class Client(
7979
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
8080

8181
def run(): Unit = {
82-
var currentDriverSpec = new KubernetesDriverSpec(
83-
driverPod = new PodBuilder().build(),
82+
// Set new metadata and a new spec so that submission steps can use PodBuilder#editMetadata()
83+
// and/or PodBuilder#editSpec() safely.
84+
val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
85+
var currentDriverSpec = KubernetesDriverSpec(
86+
driverPod = basePod,
8487
driverContainer = new ContainerBuilder().build(),
8588
driverSparkConf = submissionSparkConf.clone(),
8689
otherKubernetesResources = Seq.empty[HasMetadata])
8790
for (nextStep <- submissionSteps) {
8891
currentDriverSpec = nextStep.prepareSubmission(currentDriverSpec)
8992
}
90-
val resolvedDriverJavaOpts = currentDriverSpec.driverSparkConf.getAll.map {
91-
case (confKey, confValue) => s"-D$confKey=$confValue"
92-
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
93+
val resolvedDriverJavaOpts = currentDriverSpec
94+
.driverSparkConf
95+
// We don't need this anymore since we just set the JVM options on the environment
96+
.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
97+
.getAll
98+
.map {
99+
case (confKey, confValue) => s"-D$confKey=$confValue"
100+
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
93101
val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
94102
.addNewEnv()
95103
.withName(ENV_DRIVER_JAVA_OPTS)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ExecutorInitContainerConfiguration.scala

Lines changed: 0 additions & 47 deletions
This file was deleted.

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseSubmissionStep.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private[spark] class BaseSubmissionStep(
101101
.endResources()
102102
.build()
103103
val baseDriverPod = new PodBuilder(driverSpec.driverPod)
104-
.withNewMetadata()
104+
.editOrNewMetadata()
105105
.withName(kubernetesDriverPodName)
106106
.addToLabels(driverLabels.asJava)
107107
.addToAnnotations(getAllDriverAnnotations(submissionSparkConf).asJava)
@@ -114,8 +114,6 @@ private[spark] class BaseSubmissionStep(
114114
.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
115115
.set("spark.app.id", kubernetesAppId)
116116
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, kubernetesResourceNamePrefix)
117-
// We don't need this anymore since we just set the JVM options on the environment
118-
.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
119117
driverSpec.copy(
120118
driverPod = baseDriverPod,
121119
driverSparkConf = resolvedSparkConf,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
433433
.withPorts(requiredPorts.asJava)
434434
.build()
435435

436-
437436
val executorPod = new PodBuilder()
438437
.withNewMetadata()
439438
.withName(name)
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit
18+
19+
import com.google.common.collect.Iterables
20+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder}
21+
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
22+
import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable, PodResource, Resource}
23+
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
24+
import org.mockito.Mockito.{doReturn, verify, when}
25+
import org.mockito.invocation.InvocationOnMock
26+
import org.mockito.stubbing.Answer
27+
import org.scalatest.BeforeAndAfter
28+
import org.scalatest.mock.MockitoSugar._
29+
import scala.collection.JavaConverters._
30+
31+
import org.apache.spark.{SparkConf, SparkFunSuite}
32+
import org.apache.spark.deploy.kubernetes.constants._
33+
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{KubernetesDriverSpec, KubernetesSubmissionStep}
34+
35+
/**
 * Unit tests for the Kubernetes submission Client.
 *
 * The fabric8 Kubernetes client is fully mocked. Two fake submission steps are chained to
 * verify that Client#run():
 *   - threads the driver spec through every configured submission step,
 *   - creates the resulting driver pod,
 *   - creates the secondary Kubernetes resources with an owner reference back to the
 *     driver pod,
 *   - propagates Spark conf entries as -D JVM options on the driver container, and
 *   - blocks on the logging watcher when waiting for app completion is requested.
 */
class ClientSuite extends SparkFunSuite with BeforeAndAfter {

  // Identifying fields that the mocked API server stamps onto the created driver pod;
  // the owner-reference test checks these values round-trip onto secondary resources.
  private val DRIVER_POD_UID = "pod-id"
  private val DRIVER_POD_API_VERSION = "v1"
  private val DRIVER_POD_KIND = "pod"

  private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
      HasMetadata, Boolean]
  private type Pods = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]

  @Mock
  private var kubernetesClient: KubernetesClient = _

  @Mock
  private var podOperations: Pods = _

  @Mock
  private var namedPods: PodResource[Pod, DoneablePod] = _

  @Mock
  private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _

  @Mock
  private var resourceList: ResourceList = _

  private val firstSubmissionStep = new FirstTestSubmissionStep
  private val secondSubmissionStep = new SecondTestSubmissionStep
  private val submissionSteps = Seq(firstSubmissionStep, secondSubmissionStep)
  private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _
  private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _

  before {
    MockitoAnnotations.initMocks(this)
    when(kubernetesClient.pods()).thenReturn(podOperations)
    when(podOperations.withName(firstSubmissionStep.podName)).thenReturn(namedPods)

    createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod])
    createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
    // Echo the submitted pod back with the UID/apiVersion/kind a real API server would
    // assign, so owner references derived from the created pod can be asserted on.
    when(podOperations.create(createdPodArgumentCaptor.capture())).thenAnswer(new Answer[Pod] {
      override def answer(invocation: InvocationOnMock): Pod = {
        new PodBuilder(invocation.getArgumentAt(0, classOf[Pod]))
          .editMetadata()
            .withUid(DRIVER_POD_UID)
            .endMetadata()
          .withApiVersion(DRIVER_POD_API_VERSION)
          .withKind(DRIVER_POD_KIND)
          .build()
      }
    })
    when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
    doReturn(resourceList)
      .when(kubernetesClient)
      .resourceList(createdResourcesArgumentCaptor.capture())
  }

  test("The client should configure the pod with the submission steps.") {
    val submissionClient = new Client(
      submissionSteps,
      new SparkConf(false),
      kubernetesClient,
      false,
      "spark",
      loggingPodStatusWatcher)
    submissionClient.run()
    val createdPod = createdPodArgumentCaptor.getValue
    // The first step names/labels the pod; the second annotates it and sets the container.
    assert(createdPod.getMetadata.getName === firstSubmissionStep.podName)
    assert(createdPod.getMetadata.getLabels.asScala ===
      Map(firstSubmissionStep.labelKey -> firstSubmissionStep.labelValue))
    assert(createdPod.getMetadata.getAnnotations.asScala ===
      Map(secondSubmissionStep.annotationKey -> secondSubmissionStep.annotationValue))
    assert(createdPod.getSpec.getContainers.size() === 1)
    assert(createdPod.getSpec.getContainers.get(0).getName ===
      secondSubmissionStep.containerName)
  }

  test("The client should create the secondary Kubernetes resources.") {
    val submissionClient = new Client(
      submissionSteps,
      new SparkConf(false),
      kubernetesClient,
      false,
      "spark",
      loggingPodStatusWatcher)
    submissionClient.run()
    val createdPod = createdPodArgumentCaptor.getValue
    val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
    assert(otherCreatedResources.size === 1)
    val createdResource = Iterables.getOnlyElement(otherCreatedResources).asInstanceOf[Secret]
    assert(createdResource.getMetadata.getName === firstSubmissionStep.secretName)
    assert(createdResource.getData.asScala ===
      Map(firstSubmissionStep.secretKey -> firstSubmissionStep.secretData))
    // The secondary resource must be owned by the driver pod so Kubernetes garbage-collects
    // it when the pod is deleted.
    val ownerReference = Iterables.getOnlyElement(createdResource.getMetadata.getOwnerReferences)
    assert(ownerReference.getName === createdPod.getMetadata.getName)
    assert(ownerReference.getKind === DRIVER_POD_KIND)
    assert(ownerReference.getUid === DRIVER_POD_UID)
    assert(ownerReference.getApiVersion === DRIVER_POD_API_VERSION)
  }

  test("The client should attach the driver container with the appropriate JVM options.") {
    // NOTE(review): the "+|-" in this option string looks garbled (likely meant to be
    // -XX:+HeapDumpOnOutOfMemoryError) — the test only checks it round-trips unchanged,
    // so it passes either way; confirm against upstream before changing.
    val sparkConf = new SparkConf(false)
      .set("spark.logConf", "true")
      .set(
        org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS,
        "-XX:+|-HeapDumpOnOutOfMemoryError")
    val submissionClient = new Client(
      submissionSteps,
      sparkConf,
      kubernetesClient,
      false,
      "spark",
      loggingPodStatusWatcher)
    submissionClient.run()
    val createdPod = createdPodArgumentCaptor.getValue
    val driverContainer = Iterables.getOnlyElement(createdPod.getSpec.getContainers)
    assert(driverContainer.getName === secondSubmissionStep.containerName)
    val driverJvmOptsEnv = Iterables.getOnlyElement(driverContainer.getEnv)
    assert(driverJvmOptsEnv.getName === ENV_DRIVER_JAVA_OPTS)
    val driverJvmOpts = driverJvmOptsEnv.getValue.split(" ").toSet
    assert(driverJvmOpts.contains("-Dspark.logConf=true"))
    assert(driverJvmOpts.contains(
      s"-D${secondSubmissionStep.sparkConfKey}=${secondSubmissionStep.sparkConfValue}"))
    assert(driverJvmOpts.contains(
      "-XX:+|-HeapDumpOnOutOfMemoryError"))
  }

  test("Waiting for app completion should stall on the watcher") {
    val submissionClient = new Client(
      submissionSteps,
      new SparkConf(false),
      kubernetesClient,
      true,
      "spark",
      loggingPodStatusWatcher)
    submissionClient.run()
    verify(loggingPodStatusWatcher).awaitCompletion()
  }

  /**
   * Fake first step: names and labels the driver pod and adds a Secret as a secondary
   * Kubernetes resource.
   */
  private class FirstTestSubmissionStep extends KubernetesSubmissionStep {

    val podName = "test-pod"
    val secretName = "test-secret"
    val labelKey = "first-submit"
    val labelValue = "true"
    val secretKey = "secretKey"
    val secretData = "secretData"

    override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
      val modifiedPod = new PodBuilder(driverSpec.driverPod)
        .editMetadata()
          .withName(podName)
          .addToLabels(labelKey, labelValue)
          .endMetadata()
        .build()
      val additionalResource = new SecretBuilder()
        .withNewMetadata()
          .withName(secretName)
          .endMetadata()
        .addToData(secretKey, secretData)
        .build()
      driverSpec.copy(
        driverPod = modifiedPod,
        otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(additionalResource))
    }
  }

  /**
   * Fake second step: annotates the pod, adds a custom Spark conf entry, and names the
   * driver container — exercising that later steps see earlier steps' output.
   */
  private class SecondTestSubmissionStep extends KubernetesSubmissionStep {

    val annotationKey = "second-submit"
    val annotationValue = "submitted"
    val sparkConfKey = "spark.custom-conf"
    val sparkConfValue = "custom-conf-value"
    val containerName = "driverContainer"

    override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
      val modifiedPod = new PodBuilder(driverSpec.driverPod)
        .editMetadata()
          .addToAnnotations(annotationKey, annotationValue)
          .endMetadata()
        .build()
      val resolvedSparkConf = driverSpec.driverSparkConf.clone().set(sparkConfKey, sparkConfValue)
      val modifiedContainer = new ContainerBuilder(driverSpec.driverContainer)
        .withName(containerName)
        .build()
      driverSpec.copy(
        driverPod = modifiedPod,
        driverSparkConf = resolvedSparkConf,
        driverContainer = modifiedContainer)
    }
  }
}
226+
227+

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala

Lines changed: 0 additions & 27 deletions
This file was deleted.

0 commit comments

Comments (0)