This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit f7ca555

Merge remote-tracking branch 'apache-spark-on-k8s/k8s-support-alternate-incremental' into nodeport-upload
2 parents: 0b0a7ed + c3428f7

14 files changed, +121 −32 lines

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 0 additions & 2 deletions
@@ -464,8 +464,6 @@ object SparkSubmit extends CommandLineUtils {
         sysProp = "spark.kubernetes.namespace"),
       OptionAssigner(args.kubernetesUploadJars, KUBERNETES, CLUSTER,
         sysProp = "spark.kubernetes.driver.uploads.jars"),
-      OptionAssigner(args.kubernetesUploadDriverExtraClasspath, KUBERNETES, CLUSTER,
-        sysProp = "spark.kubernetes.driver.uploads.driverExtraClasspath"),
 
       // Other options
       OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 0 additions & 7 deletions
@@ -74,7 +74,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   // Kubernetes only
   var kubernetesNamespace: String = null
   var kubernetesUploadJars: String = null
-  var kubernetesUploadDriverExtraClasspath: String = null
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -197,9 +196,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     kubernetesUploadJars = Option(kubernetesUploadJars)
       .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.jars"))
       .orNull
-    kubernetesUploadDriverExtraClasspath = Option(kubernetesUploadDriverExtraClasspath)
-      .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.driverExtraClasspath"))
-      .orNull
 
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && !isR && primaryResource != null) {
@@ -444,9 +440,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case KUBERNETES_UPLOAD_JARS =>
        kubernetesUploadJars = value
 
-      case KUBERNETES_UPLOAD_DRIVER_EXTRA_CLASSPATH =>
-        kubernetesUploadDriverExtraClasspath = value
-
       case HELP =>
        printUsageAndExit(0)
 

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java

Lines changed: 1 addition & 4 deletions
@@ -80,8 +80,6 @@ class SparkSubmitOptionParser {
   protected final String KUBERNETES_MASTER = "--kubernetes-master";
   protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace";
   protected final String KUBERNETES_UPLOAD_JARS = "--upload-jars";
-  protected final String KUBERNETES_UPLOAD_DRIVER_EXTRA_CLASSPATH =
-    "--upload-driver-extra-classpath";
 
   /**
    * This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -124,8 +122,7 @@ class SparkSubmitOptionParser {
     { TOTAL_EXECUTOR_CORES },
     { KUBERNETES_MASTER },
     { KUBERNETES_NAMESPACE },
-    { KUBERNETES_UPLOAD_JARS },
-    { KUBERNETES_UPLOAD_DRIVER_EXTRA_CLASSPATH }
+    { KUBERNETES_UPLOAD_JARS }
   };
 
   /**

pom.xml

Lines changed: 2 additions & 1 deletion
@@ -223,6 +223,7 @@
     <PermGen>64m</PermGen>
     <MaxPermGen>512m</MaxPermGen>
     <CodeCacheSize>512m</CodeCacheSize>
+    <extraScalaTestArgs></extraScalaTestArgs>
   </properties>
   <repositories>
     <repository>
@@ -2096,7 +2097,7 @@
           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
           <junitxml>.</junitxml>
           <filereports>SparkTestSuite.txt</filereports>
-          <argLine>-ea -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
+          <argLine>-ea -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize} ${extraScalaTestArgs}</argLine>
           <stderr/>
           <environmentVariables>
             <!--

resource-managers/kubernetes/README.md

Lines changed: 14 additions & 0 deletions
@@ -51,6 +51,20 @@ Afterwards, the integration tests can be executed with Maven or your IDE. Note t
 `pre-integration-test` phase must be run every time the Spark main code changes. When running tests from the
 command line, the `pre-integration-test` phase should automatically be invoked if the `integration-test` phase is run.
 
+# Preserve the Minikube VM
+
+The integration tests make use of [Minikube](https://github.com/kubernetes/minikube), which fires up a virtual machine
+and setup a single-node kubernetes cluster within it. By default the vm is destroyed after the tests are finished.
+If you want to preserve the vm, e.g. to reduce the running time of tests during development, you can pass the property
+`spark.docker.test.persistMinikube` to the test process:
+
+```sh
+build/mvn integration-test \
+  -Pkubernetes -Pkubernetes-integration-tests \
+  -pl resource-managers/kubernetes/integration-tests -am \
+  -DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true
+```
+
 # Usage Guide
 
 See the [usage guide](../../docs/running-on-kubernetes.md) for more information.
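A short usage note on the section added above: when the VM is preserved this way, it is not cleaned up by the test harness, so it can be removed by hand later (for example with `minikube delete`) once it is no longer needed.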

resource-managers/kubernetes/core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@
   <name>Spark Project Kubernetes</name>
   <properties>
     <sbt.project.name>kubernetes</sbt.project.name>
-    <kubernetes.client.version>1.4.17</kubernetes.client.version>
+    <kubernetes.client.version>1.4.34</kubernetes.client.version>
   </properties>
 
   <dependencies>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 21 additions & 5 deletions
@@ -51,7 +51,7 @@ private[spark] class Client(
   private val appName = sparkConf.getOption("spark.app.name")
     .orElse(sparkConf.getOption("spark.app.id"))
     .getOrElse("spark")
-  private val kubernetesAppId = s"$appName-$launchTime"
+  private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
   private val secretName = s"spark-submission-server-secret-$kubernetesAppId"
   private val driverLauncherSelectorValue = s"driver-launcher-$launchTime"
   private val driverDockerImage = sparkConf.get(
@@ -97,7 +97,7 @@ private[spark] class Client(
 
     val k8ClientConfig = k8ConfBuilder.build
     Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig))(kubernetesClient => {
-      val secret = kubernetesClient.secrets().createNew()
+      val applicationSubmitSecret = kubernetesClient.secrets().createNew()
        .withNewMetadata()
        .withName(secretName)
        .endMetadata()
@@ -114,7 +114,7 @@ private[spark] class Client(
       val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId"
       val submitPending = new AtomicBoolean(false)
       val podWatcher = new DriverPodWatcher(submitCompletedFuture, submitPending,
-        kubernetesClient, driverKubernetesSelectors)
+        kubernetesClient, applicationSubmitSecret, driverKubernetesSelectors)
       Utils.tryWithResource(kubernetesClient
         .pods()
         .withLabels(driverKubernetesSelectors)
@@ -128,7 +128,9 @@ private[spark] class Client(
           .withRestartPolicy("OnFailure")
           .addNewVolume()
             .withName(s"spark-submission-secret-volume")
-            .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+            .withNewSecret()
+              .withSecretName(applicationSubmitSecret.getMetadata.getName)
+              .endSecret()
           .endVolume
           .withServiceAccount(serviceAccount)
           .addNewContainer()
@@ -173,7 +175,7 @@ private[spark] class Client(
           }
         }
       } finally {
-        kubernetesClient.secrets().delete(secret)
+        kubernetesClient.secrets().delete(applicationSubmitSecret)
       }
     })
   }
@@ -182,6 +184,7 @@ private[spark] class Client(
       submitCompletedFuture: SettableFuture[Boolean],
       submitPending: AtomicBoolean,
       kubernetesClient: KubernetesClient,
+      applicationSubmitSecret: Secret,
       driverKubernetesSelectors: java.util.Map[String, String]) extends Watcher[Pod] {
     override def eventReceived(action: Action, pod: Pod): Unit = {
       if ((action == Action.ADDED || action == Action.MODIFIED)
@@ -194,6 +197,17 @@ private[spark] class Client(
             .find(status =>
               status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match {
             case Some(_) =>
+              val ownerRefs = Seq(new OwnerReferenceBuilder()
+                .withName(pod.getMetadata.getName)
+                .withUid(pod.getMetadata.getUid)
+                .withApiVersion(pod.getApiVersion)
+                .withKind(pod.getKind)
+                .withController(true)
+                .build())
+
+              applicationSubmitSecret.getMetadata.setOwnerReferences(ownerRefs.asJava)
+              kubernetesClient.secrets().createOrReplace(applicationSubmitSecret)
+
               val driverLauncherServicePort = new ServicePortBuilder()
                 .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME)
                 .withPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT)
@@ -203,6 +217,7 @@ private[spark] class Client(
                 .withNewMetadata()
                   .withName(kubernetesAppId)
                   .withLabels(driverKubernetesSelectors)
+                  .withOwnerReferences(ownerRefs.asJava)
                   .endMetadata()
                 .withNewSpec()
                   .withType("NodePort")
@@ -213,6 +228,7 @@ private[spark] class Client(
               try {
                 sparkConf.set("spark.kubernetes.driver.service.name",
                   service.getMetadata.getName)
+                sparkConf.set("spark.kubernetes.driver.pod.name", kubernetesAppId)
                 sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
                 sparkConf.setIfMissing("spark.blockmanager.port",
                   DEFAULT_BLOCKMANAGER_PORT.toString)
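One detail worth calling out in the Client change above: the generated `kubernetesAppId` is now lower-cased and has dots replaced with dashes, because it is used directly as the name of the driver service and other resources, and Kubernetes service names must be lowercase DNS-label names without dots. A minimal sketch of the normalization (the sample app name and timestamp below are made up, not taken from the patch):

```scala
// Sketch only: normalize an app name + launch time into a Kubernetes-safe identifier,
// mirroring the expression used in Client.scala.
val appName = "My.Spark.App"
val launchTime = 1488433039163L
val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
// kubernetesAppId == "my-spark-app-1488433039163"
```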

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala

Lines changed: 4 additions & 5 deletions
@@ -135,11 +135,6 @@ private[spark] class KubernetesSparkRestServer(
         } else {
           val tempDir = Utils.createTempDir()
           val appResourcePath = resolvedAppResource(appResource, tempDir)
-          val driverClasspathDirectory = new File(tempDir, "driver-extra-classpath")
-          if (!driverClasspathDirectory.mkdir) {
-            throw new IllegalStateException("Failed to create driver extra classpath" +
-              s" dir at ${driverClasspathDirectory.getAbsolutePath}")
-          }
           val jarsDirectory = new File(tempDir, "jars")
           if (!jarsDirectory.mkdir) {
             throw new IllegalStateException("Failed to create jars dir at" +
@@ -173,6 +168,10 @@ private[spark] class KubernetesSparkRestServer(
           val driverMemory = resolvedSparkProperties.getOrElse("spark.driver.memory", "1g")
           command += s"-Xms$driverMemory"
           command += s"-Xmx$driverMemory"
+          val extraJavaOpts = resolvedSparkProperties.get("spark.driver.extraJavaOptions")
+            .map(Utils.splitCommandString)
+            .getOrElse(Seq.empty)
+          command ++= extraJavaOpts
           command += mainClass
           command ++= appArgs
           val pb = new ProcessBuilder(command: _*).inheritIO()
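The second hunk above forwards `spark.driver.extraJavaOptions` to the driver JVM that the submission server launches, splitting the flat option string into individual command arguments. A small sketch of that splitting step; the object name and sample flags are invented, and because `Utils` is `private[spark]` the sketch sits in an `org.apache.spark` subpackage:

```scala
package org.apache.spark.deploy.rest.kubernetes

import org.apache.spark.util.Utils

object ExtraJavaOptionsSplitExample {
  def main(args: Array[String]): Unit = {
    // Tokenize the option string so each JVM flag becomes its own ProcessBuilder argument.
    val extraJavaOpts = Utils.splitCommandString("-XX:+UseG1GC -Dspark.custom.flag=true")
    println(extraJavaOpts) // e.g. Seq(-XX:+UseG1GC, -Dspark.custom.flag=true)
  }
}
```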

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 23 additions & 1 deletion
@@ -60,6 +60,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .getOrElse(
       throw new SparkException("Must specify the service name the driver is running with"))
 
+  private val kubernetesDriverPodName = conf
+    .getOption("spark.kubernetes.driver.pod.name")
+    .getOrElse(
+      throw new SparkException("Must specify the driver pod name"))
+
   private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
   private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
 
@@ -82,6 +87,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val kubernetesClient = KubernetesClientBuilder
     .buildFromWithinPod(kubernetesMaster, kubernetesNamespace)
 
+  val driverPod = try {
+    kubernetesClient.pods().inNamespace(kubernetesNamespace).
+      withName(kubernetesDriverPodName).get()
+  } catch {
+    case throwable: Throwable =>
+      logError(s"Executor cannot find driver pod.", throwable)
+      throw new SparkException(s"Executor cannot find driver pod", throwable)
+  }
+
   override val minRegisteredRatio =
     if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
       0.8
@@ -199,7 +213,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
       .withNewMetadata()
         .withName(name)
         .withLabels(selectors)
-        .endMetadata()
+        .withOwnerReferences()
+        .addNewOwnerReference()
+          .withController(true)
+          .withApiVersion(driverPod.getApiVersion)
+          .withKind(driverPod.getKind)
+          .withName(driverPod.getMetadata.getName)
+          .withUid(driverPod.getMetadata.getUid)
+        .endOwnerReference()
+        .endMetadata()
       .withNewSpec()
       .addNewContainer()
         .withName(s"exec-${applicationId()}-container")

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile

Lines changed: 3 additions & 1 deletion
@@ -1,9 +1,11 @@
-FROM anapsix/alpine-java:8
+FROM openjdk:8-alpine
 
 # If this docker file is being used in the context of building your images from a Spark distribution, the docker build
 # command should be invoked from the top level directory of the Spark distribution. E.g.:
 # docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
 
+RUN apk upgrade --update
+RUN apk add --update bash
 RUN mkdir -p /opt/spark
 RUN touch /opt/spark/RELEASE
 
