Skip to content

Commit 4bab48b

Browse files
committed
Allow driver pod name to be optional.
1 parent 19618aa commit 4bab48b

File tree

6 files changed

+31
-24
lines changed

6 files changed

+31
-24
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ private[spark] case class KubernetesDriverSpecificConf(
4343
*/
4444
private[spark] case class KubernetesExecutorSpecificConf(
4545
executorId: String,
46-
driverPod: Pod)
46+
driverPod: Option[Pod])
4747
extends KubernetesRoleSpecificConf
4848

4949
/**
@@ -178,7 +178,7 @@ private[spark] object KubernetesConf {
178178
sparkConf: SparkConf,
179179
executorId: String,
180180
appId: String,
181-
driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = {
181+
driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = {
182182
val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
183183
sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
184184
require(

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.features
1818

1919
import scala.collection.JavaConverters._
2020

21-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
21+
import io.fabric8.kubernetes.api.model._
2222

2323
import org.apache.spark.SparkException
2424
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
@@ -152,19 +152,20 @@ private[spark] class BasicExecutorFeatureStep(
152152
.build()
153153
}.getOrElse(executorContainer)
154154
val driverPod = kubernetesConf.roleSpecificConf.driverPod
155+
val ownerReference = driverPod.map(pod =>
156+
new OwnerReferenceBuilder()
157+
.withController(true)
158+
.withApiVersion(pod.getApiVersion)
159+
.withKind(pod.getKind)
160+
.withName(pod.getMetadata.getName)
161+
.withUid(pod.getMetadata.getUid)
162+
.build())
155163
val executorPod = new PodBuilder(pod.pod)
156164
.editOrNewMetadata()
157165
.withName(name)
158166
.withLabels(kubernetesConf.roleLabels.asJava)
159167
.withAnnotations(kubernetesConf.roleAnnotations.asJava)
160-
.withOwnerReferences()
161-
.addNewOwnerReference()
162-
.withController(true)
163-
.withApiVersion(driverPod.getApiVersion)
164-
.withKind(driverPod.getKind)
165-
.withName(driverPod.getMetadata.getName)
166-
.withUid(driverPod.getMetadata.getUid)
167-
.endOwnerReference()
168+
.addToOwnerReferences(ownerReference.toSeq: _*)
168169
.endMetadata()
169170
.editOrNewSpec()
170171
.withHostname(hostname)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ private[spark] class ExecutorPodsAllocator(
5050

5151
private val kubernetesDriverPodName = conf
5252
.get(KUBERNETES_DRIVER_POD_NAME)
53-
.getOrElse(throw new SparkException("Must specify the driver pod name"))
54-
55-
private val driverPod = Option(kubernetesClient.pods()
56-
.withName(kubernetesDriverPodName)
57-
.get())
58-
.getOrElse(throw new SparkException(
59-
s"No pod was found named $kubernetesDriverPodName in the cluster in the" +
60-
s" namespace $namespace (this was supposed to be the driver pod.)."))
53+
54+
private val driverPod = kubernetesDriverPodName
55+
.map(name => Option(kubernetesClient.pods()
56+
.withName(name)
57+
.get())
58+
.getOrElse(throw new SparkException(
59+
s"No pod was found named $kubernetesDriverPodName in the cluster in the " +
60+
s"namespace $namespace (this was supposed to be the driver pod.).")))
6161

6262
// Executor IDs that have been requested from Kubernetes but have not been detected in any
6363
// snapshot yet. Mapped to the timestamp when they were created.

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
5050
KUBERNETES_AUTH_CLIENT_MODE_PREFIX
5151
}
5252

53+
val apiServerUrl = if (wasSparkSubmitted) {
54+
KUBERNETES_MASTER_INTERNAL_URL
55+
} else {
56+
masterURL.substring("k8s://".length())
57+
}
58+
5359
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
5460
KUBERNETES_MASTER_INTERNAL_URL,
5561
Some(sc.conf.get(KUBERNETES_NAMESPACE)),

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class BasicExecutorFeatureStepSuite
8181
val step = new BasicExecutorFeatureStep(
8282
KubernetesConf(
8383
baseConf,
84-
KubernetesExecutorSpecificConf("1", DRIVER_POD),
84+
KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
8585
RESOURCE_NAME_PREFIX,
8686
APP_ID,
8787
LABELS,
@@ -120,7 +120,7 @@ class BasicExecutorFeatureStepSuite
120120
val step = new BasicExecutorFeatureStep(
121121
KubernetesConf(
122122
conf,
123-
KubernetesExecutorSpecificConf("1", DRIVER_POD),
123+
KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
124124
longPodNamePrefix,
125125
APP_ID,
126126
LABELS,
@@ -140,7 +140,7 @@ class BasicExecutorFeatureStepSuite
140140
val step = new BasicExecutorFeatureStep(
141141
KubernetesConf(
142142
conf,
143-
KubernetesExecutorSpecificConf("1", DRIVER_POD),
143+
KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
144144
RESOURCE_NAME_PREFIX,
145145
APP_ID,
146146
LABELS,

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite
190190
.withImage(image)
191191
.withImagePullPolicy("IfNotPresent")
192192
.withCommand("/opt/spark/bin/run-example")
193-
.addToArgs("--master", s"k8s://${kubernetesTestComponents.clientConfig.getMasterUrl}")
193+
.addToArgs("--master", s"k8s://https://kubernetes.default.svc")
194194
.addToArgs("--deploy-mode", "client")
195195
.addToArgs(
196196
"--conf",
197197
s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}")
198-
.addToArgs("--conf", "spark.kubernetes.driver.pod.name=driverPodName")
198+
.addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName")
199199
.addToArgs("--conf", "spark.executor.memory=500m")
200200
.addToArgs("--conf", "spark.executor.cores=1")
201201
.addToArgs("--conf", "spark.executor.instances=1")

0 commit comments

Comments
 (0)