Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit c3428f7

Browse files
foxishash211
authored andcommitted
Added GC to components (#56)
1 parent 422dceb commit c3428f7

File tree

3 files changed

+43
-2
lines changed

3 files changed

+43
-2
lines changed

resource-managers/kubernetes/core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<name>Spark Project Kubernetes</name>
3030
<properties>
3131
<sbt.project.name>kubernetes</sbt.project.name>
32-
<kubernetes.client.version>1.4.17</kubernetes.client.version>
32+
<kubernetes.client.version>1.4.34</kubernetes.client.version>
3333
</properties>
3434

3535
<dependencies>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ private[spark] class Client(
123123
.endSpec()
124124
.done()
125125
sparkConf.set("spark.kubernetes.driver.service.name", service.getMetadata.getName)
126+
sparkConf.set("spark.kubernetes.driver.pod.name", kubernetesAppId)
127+
126128
sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
127129
sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
128130
val submitRequest = buildSubmissionRequest()
@@ -131,6 +133,23 @@ private[spark] class Client(
131133

132134
val podWatcher = new Watcher[Pod] {
133135
override def eventReceived(action: Action, t: Pod): Unit = {
136+
if (action == Action.ADDED) {
137+
val ownerRefs = new ArrayBuffer[OwnerReference]
138+
ownerRefs += new OwnerReferenceBuilder()
139+
.withApiVersion(t.getApiVersion)
140+
.withController(true)
141+
.withKind(t.getKind)
142+
.withName(t.getMetadata.getName)
143+
.withUid(t.getMetadata.getUid)
144+
.build()
145+
146+
secret.getMetadata().setOwnerReferences(ownerRefs.asJava)
147+
kubernetesClient.secrets().createOrReplace(secret)
148+
149+
service.getMetadata().setOwnerReferences(ownerRefs.asJava)
150+
kubernetesClient.services().createOrReplace(service)
151+
}
152+
134153
if ((action == Action.ADDED || action == Action.MODIFIED)
135154
&& t.getStatus.getPhase == "Running"
136155
&& !submitCompletedFuture.isDone) {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
6060
.getOrElse(
6161
throw new SparkException("Must specify the service name the driver is running with"))
6262

63+
private val kubernetesDriverPodName = conf
64+
.getOption("spark.kubernetes.driver.pod.name")
65+
.getOrElse(
66+
throw new SparkException("Must specify the driver pod name"))
67+
6368
private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
6469
private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
6570

@@ -82,6 +87,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
8287
private val kubernetesClient = KubernetesClientBuilder
8388
.buildFromWithinPod(kubernetesMaster, kubernetesNamespace)
8489

90+
val driverPod = try {
91+
kubernetesClient.pods().inNamespace(kubernetesNamespace).
92+
withName(kubernetesDriverPodName).get()
93+
} catch {
94+
case throwable: Throwable =>
95+
logError(s"Executor cannot find driver pod.", throwable)
96+
throw new SparkException(s"Executor cannot find driver pod", throwable)
97+
}
98+
8599
override val minRegisteredRatio =
86100
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
87101
0.8
@@ -202,7 +216,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
202216
.withNewMetadata()
203217
.withName(name)
204218
.withLabels(selectors)
205-
.endMetadata()
219+
.withOwnerReferences()
220+
.addNewOwnerReference()
221+
.withController(true)
222+
.withApiVersion(driverPod.getApiVersion)
223+
.withKind(driverPod.getKind)
224+
.withName(driverPod.getMetadata.getName)
225+
.withUid(driverPod.getMetadata.getUid)
226+
.endOwnerReference()
227+
.endMetadata()
206228
.withNewSpec()
207229
.addNewContainer()
208230
.withName(s"exec-${applicationId()}-container")

0 commit comments

Comments
 (0)