Skip to content

Commit 452f8f1

Browse files
mccheah authored and ash211 committed
Use a separate class to track components that need to be cleaned up (apache-spark-on-k8s#122)
* Refactor the cleaning up of Kubernetes components. Create a KubernetesComponentsCleaner which can register arbitrary pods, services, secrets, and ingresses. When an exception is thrown or the JVM shuts down, the cleaner automatically purges any of its registered components from Kubernetes. The components can be unregistered when the driver successfully begins running, so that the application persists beyond the lifetime of the spark-submit process. * Fix spacing * Address comments * Fix compiler error * Pull KubernetesComponentCleaner into instance variable * Remove a parameter * Remove redundant registerOrUpdateSecret for SSL * Remove Ingresses from component cleaner * Clear resources generically as opposed to specifying each type * Remove incorrect test assertion * Rename variable
1 parent 913a60e commit 452f8f1

File tree

2 files changed

+152
-125
lines changed

2 files changed

+152
-125
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 100 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.kubernetes.constants._
3838
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
3939
import org.apache.spark.deploy.rest.kubernetes._
4040
import org.apache.spark.internal.Logging
41-
import org.apache.spark.util.Utils
41+
import org.apache.spark.util.{ShutdownHookManager, Utils}
4242

4343
private[spark] class Client(
4444
sparkConf: SparkConf,
@@ -79,6 +79,8 @@ private[spark] class Client(
7979
private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
8080
private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)
8181

82+
private val kubernetesResourceCleaner = new KubernetesResourceCleaner
83+
8284
def run(): Unit = {
8385
logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
8486
val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
@@ -111,97 +113,73 @@ private[spark] class Client(
111113

112114
val k8ClientConfig = k8ConfBuilder.build
113115
Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
116+
ShutdownHookManager.addShutdownHook(() =>
117+
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
114118
val submitServerSecret = kubernetesClient.secrets().createNew()
115119
.withNewMetadata()
116120
.withName(secretName)
117121
.endMetadata()
118122
.withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava)
119123
.withType("Opaque")
120124
.done()
125+
kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret)
121126
try {
122-
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient,
127+
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(
128+
kubernetesClient,
123129
driverSubmitSslOptions,
124130
isKeyStoreLocalFile)
125-
try {
126-
// start outer watch for status logging of driver pod
127-
val driverPodCompletedLatch = new CountDownLatch(1)
128-
// only enable interval logging if in waitForAppCompletion mode
129-
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
130-
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
131-
loggingInterval)
132-
Utils.tryWithResource(kubernetesClient
133-
.pods()
134-
.withName(kubernetesAppId)
135-
.watch(loggingWatch)) { _ =>
136-
val (driverPod, driverService) = launchDriverKubernetesComponents(
137-
kubernetesClient,
138-
parsedCustomLabels,
139-
submitServerSecret,
140-
driverSubmitSslOptions,
141-
sslSecrets,
142-
sslVolumes,
143-
sslVolumeMounts,
144-
sslEnvs,
145-
isKeyStoreLocalFile)
146-
val ownerReferenceConfiguredDriverService = try {
147-
configureOwnerReferences(
148-
kubernetesClient,
149-
submitServerSecret,
150-
sslSecrets,
151-
driverPod,
152-
driverService)
153-
} catch {
154-
case e: Throwable =>
155-
cleanupPodAndService(kubernetesClient, driverPod, driverService)
156-
throw new SparkException("Failed to set owner references to the driver pod.", e)
157-
}
158-
try {
159-
submitApplicationToDriverServer(kubernetesClient, driverSubmitSslOptions,
160-
ownerReferenceConfiguredDriverService, submitterLocalFiles, submitterLocalJars)
161-
// wait if configured to do so
162-
if (waitForAppCompletion) {
163-
logInfo(s"Waiting for application $kubernetesAppId to finish...")
164-
driverPodCompletedLatch.await()
165-
logInfo(s"Application $kubernetesAppId finished.")
166-
} else {
167-
logInfo(s"Application $kubernetesAppId successfully launched.")
168-
}
169-
} catch {
170-
case e: Throwable =>
171-
cleanupPodAndService(kubernetesClient, driverPod,
172-
ownerReferenceConfiguredDriverService)
173-
throw new SparkException("Failed to submit the application to the driver pod.", e)
174-
}
175-
}
176-
} finally {
177-
Utils.tryLogNonFatalError {
178-
// Secrets may have been mutated so delete by name to avoid problems with not having
179-
// the latest version.
180-
sslSecrets.foreach { secret =>
181-
kubernetesClient.secrets().withName(secret.getMetadata.getName).delete()
182-
}
131+
// start outer watch for status logging of driver pod
132+
val driverPodCompletedLatch = new CountDownLatch(1)
133+
// only enable interval logging if in waitForAppCompletion mode
134+
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
135+
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
136+
loggingInterval)
137+
Utils.tryWithResource(kubernetesClient
138+
.pods()
139+
.withName(kubernetesAppId)
140+
.watch(loggingWatch)) { _ =>
141+
val (driverPod, driverService) = launchDriverKubernetesComponents(
142+
kubernetesClient,
143+
parsedCustomLabels,
144+
submitServerSecret,
145+
driverSubmitSslOptions,
146+
sslSecrets,
147+
sslVolumes,
148+
sslVolumeMounts,
149+
sslEnvs,
150+
isKeyStoreLocalFile)
151+
configureOwnerReferences(
152+
kubernetesClient,
153+
submitServerSecret,
154+
sslSecrets,
155+
driverPod,
156+
driverService)
157+
submitApplicationToDriverServer(
158+
kubernetesClient,
159+
driverSubmitSslOptions,
160+
driverService,
161+
submitterLocalFiles,
162+
submitterLocalJars)
163+
// Now that the application has started, persist the components that were created beyond
164+
// the shutdown hook. We still want to purge the one-time secrets, so do not unregister
165+
// those.
166+
kubernetesResourceCleaner.unregisterResource(driverPod)
167+
kubernetesResourceCleaner.unregisterResource(driverService)
168+
// wait if configured to do so
169+
if (waitForAppCompletion) {
170+
logInfo(s"Waiting for application $kubernetesAppId to finish...")
171+
driverPodCompletedLatch.await()
172+
logInfo(s"Application $kubernetesAppId finished.")
173+
} else {
174+
logInfo(s"Application $kubernetesAppId successfully launched.")
183175
}
184176
}
185177
} finally {
186-
Utils.tryLogNonFatalError {
187-
kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).delete()
188-
}
178+
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)
189179
}
190180
}
191181
}
192182

193-
private def cleanupPodAndService(
194-
kubernetesClient: KubernetesClient,
195-
driverPod: Pod,
196-
driverService: Service): Unit = {
197-
Utils.tryLogNonFatalError {
198-
kubernetesClient.services().delete(driverService)
199-
}
200-
Utils.tryLogNonFatalError {
201-
kubernetesClient.pods().delete(driverPod)
202-
}
203-
}
204-
205183
private def submitApplicationToDriverServer(
206184
kubernetesClient: KubernetesClient,
207185
driverSubmitSslOptions: SSLOptions,
@@ -237,11 +215,13 @@ private[spark] class Client(
237215
.withPort(uiPort)
238216
.withNewTargetPort(uiPort)
239217
.build()
240-
kubernetesClient.services().withName(kubernetesAppId).edit().editSpec()
241-
.withType(uiServiceType)
242-
.withPorts(uiServicePort)
243-
.endSpec()
218+
val resolvedService = kubernetesClient.services().withName(kubernetesAppId).edit()
219+
.editSpec()
220+
.withType(uiServiceType)
221+
.withPorts(uiServicePort)
222+
.endSpec()
244223
.done()
224+
kubernetesResourceCleaner.registerOrUpdateResource(resolvedService)
245225
logInfo("Finished submitting application to Kubernetes.")
246226
}
247227

@@ -282,37 +262,19 @@ private[spark] class Client(
282262
kubernetesClient,
283263
driverKubernetesSelectors,
284264
submitServerSecret)
285-
val driverPod = try {
286-
createDriverPod(
287-
kubernetesClient,
288-
driverKubernetesSelectors,
289-
submitServerSecret,
290-
driverSubmitSslOptions,
291-
sslVolumes,
292-
sslVolumeMounts,
293-
sslEnvs)
294-
} catch {
295-
case e: Throwable =>
296-
Utils.tryLogNonFatalError {
297-
kubernetesClient.services().delete(driverService)
298-
}
299-
throw new SparkException("Failed to create the driver pod.", e)
300-
}
301-
try {
302-
waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
303-
serviceReadyFuture, podReadyFuture)
304-
(driverPod, driverService)
305-
} catch {
306-
case e: Throwable =>
307-
Utils.tryLogNonFatalError {
308-
kubernetesClient.services().delete(driverService)
309-
}
310-
Utils.tryLogNonFatalError {
311-
kubernetesClient.pods().delete(driverPod)
312-
}
313-
throw new SparkException("Timed out while waiting for a Kubernetes component to be" +
314-
" ready.", e)
315-
}
265+
kubernetesResourceCleaner.registerOrUpdateResource(driverService)
266+
val driverPod = createDriverPod(
267+
kubernetesClient,
268+
driverKubernetesSelectors,
269+
submitServerSecret,
270+
driverSubmitSslOptions,
271+
sslVolumes,
272+
sslVolumeMounts,
273+
sslEnvs)
274+
kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
275+
waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
276+
serviceReadyFuture, podReadyFuture)
277+
(driverPod, driverService)
316278
}
317279
}
318280
}
@@ -338,22 +300,32 @@ private[spark] class Client(
338300
.withController(true)
339301
.build()
340302
sslSecrets.foreach(secret => {
341-
kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
303+
val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
342304
.editMetadata()
343305
.addToOwnerReferences(driverPodOwnerRef)
344306
.endMetadata()
345307
.done()
308+
kubernetesResourceCleaner.registerOrUpdateResource(updatedSecret)
346309
})
347-
kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).edit()
348-
.editMetadata()
349-
.addToOwnerReferences(driverPodOwnerRef)
350-
.endMetadata()
351-
.done()
352-
kubernetesClient.services().withName(driverService.getMetadata.getName).edit()
353-
.editMetadata()
354-
.addToOwnerReferences(driverPodOwnerRef)
355-
.endMetadata()
356-
.done()
310+
val updatedSubmitServerSecret = kubernetesClient
311+
.secrets()
312+
.withName(submitServerSecret.getMetadata.getName)
313+
.edit()
314+
.editMetadata()
315+
.addToOwnerReferences(driverPodOwnerRef)
316+
.endMetadata()
317+
.done()
318+
kubernetesResourceCleaner.registerOrUpdateResource(updatedSubmitServerSecret)
319+
val updatedService = kubernetesClient
320+
.services()
321+
.withName(driverService.getMetadata.getName)
322+
.edit()
323+
.editMetadata()
324+
.addToOwnerReferences(driverPodOwnerRef)
325+
.endMetadata()
326+
.done()
327+
kubernetesResourceCleaner.registerOrUpdateResource(updatedService)
328+
updatedService
357329
}
358330

359331
private def waitForReadyKubernetesComponents(
@@ -417,7 +389,7 @@ private[spark] class Client(
417389
driverSubmitSslOptions: SSLOptions,
418390
sslVolumes: Array[Volume],
419391
sslVolumeMounts: Array[VolumeMount],
420-
sslEnvs: Array[EnvVar]) = {
392+
sslEnvs: Array[EnvVar]): Pod = {
421393
val containerPorts = buildContainerPorts()
422394
val probePingHttpGet = new HTTPGetActionBuilder()
423395
.withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
@@ -537,9 +509,11 @@ private[spark] class Client(
537509
(securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
538510
}
539511

540-
private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions,
541-
isKeyStoreLocalFile: Boolean):
542-
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
512+
private def configureSsl(
513+
kubernetesClient: KubernetesClient,
514+
driverSubmitSslOptions: SSLOptions,
515+
isKeyStoreLocalFile: Boolean):
516+
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
543517
if (driverSubmitSslOptions.enabled) {
544518
val sslSecretsMap = mutable.HashMap[String, String]()
545519
val sslEnvs = mutable.Buffer[EnvVar]()
@@ -606,6 +580,7 @@ private[spark] class Client(
606580
.withData(sslSecretsMap.asJava)
607581
.withType("Opaque")
608582
.done()
583+
kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets)
609584
secrets += sslSecrets
610585
(sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
611586
} else {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes
18+
19+
import io.fabric8.kubernetes.api.model.HasMetadata
20+
import io.fabric8.kubernetes.client.KubernetesClient
21+
import scala.collection.mutable
22+
23+
import org.apache.spark.internal.Logging
24+
import org.apache.spark.util.Utils
25+
26+
/**
 * Tracks Kubernetes resources created during application submission so they
 * can be purged in bulk if submission fails or the JVM shuts down early.
 *
 * All operations are synchronized because
 * [[deleteAllRegisteredResourcesFromKubernetes]] may be invoked concurrently
 * from a shutdown hook.
 */
private[spark] class KubernetesResourceCleaner extends Logging {

  // Registered resources, keyed by (name, kind) so that re-registering the
  // same logical resource simply replaces the previously stored snapshot.
  private val registeredResources =
    mutable.HashMap.empty[(String, String), HasMetadata]

  // Identity of a resource for registration purposes.
  // NOTE(review): the key omits the namespace — assumes all registered
  // resources live in a single namespace; confirm against callers.
  private def keyOf(resource: HasMetadata): (String, String) =
    (resource.getMetadata.getName, resource.getKind)

  /** Track (or refresh) a resource so that it is deleted on cleanup. */
  def registerOrUpdateResource(resource: HasMetadata): Unit = synchronized {
    registeredResources(keyOf(resource)) = resource
  }

  /** Stop tracking a resource so that it survives cleanup. */
  def unregisterResource(resource: HasMetadata): Unit = synchronized {
    registeredResources -= keyOf(resource)
  }

  /**
   * Best-effort deletion of every still-registered resource from the cluster.
   * Individual delete failures are logged and skipped (via
   * Utils.tryLogNonFatalError) so one bad resource does not block the rest;
   * the registry is cleared afterwards regardless.
   */
  def deleteAllRegisteredResourcesFromKubernetes(kubernetesClient: KubernetesClient): Unit = {
    synchronized {
      logInfo(s"Deleting ${registeredResources.size} registered Kubernetes resources:")
      registeredResources.valuesIterator.foreach { resource =>
        Utils.tryLogNonFatalError {
          kubernetesClient.resource(resource).delete()
        }
      }
      registeredResources.clear()
    }
  }
}

0 commit comments

Comments
 (0)