From a851cb7357bc1e1fa19141ad1ad3beabfe95f982 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 15 Feb 2017 14:49:38 -0800 Subject: [PATCH 01/11] Refactor the cleaning up of Kubernetes components. Create a KubernetesComponentsCleaner which can register arbitrary pods, services, secrets, and ingresses. When an exception is thrown or the JVM shuts down, the cleaner automatically purges any of its registered components from Kubernetes. The components can be unregistered when the driver successfully begins running, so that the application persists beyond the lifetime of the spark-submit process. --- .../spark/deploy/kubernetes/Client.scala | 234 ++++++++---------- .../KubernetesComponentCleaner.scala | 93 +++++++ 2 files changed, 202 insertions(+), 125 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index aa273a024f6f9..3bbe9869c3738 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -107,6 +107,7 @@ private[spark] class Client( val k8ClientConfig = k8ConfBuilder.build Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient => + val kubernetesComponentCleaner = new KubernetesComponentCleaner(kubernetesClient) val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() .withName(secretName) @@ -114,92 +115,72 @@ private[spark] class Client( .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() + kubernetesComponentCleaner.registerOrUpdateSecret(submitServerSecret) try { - val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient, + val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl( + kubernetesClient, + kubernetesComponentCleaner, driverSubmitSslOptions, isKeyStoreLocalFile) - try { - // start outer watch for status logging of driver pod - val driverPodCompletedLatch = new CountDownLatch(1) - // only enable interval logging if in waitForAppCompletion mode - val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 - val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, - loggingInterval) - Utils.tryWithResource(kubernetesClient - .pods() - .withName(kubernetesAppId) - .watch(loggingWatch)) { _ => - val (driverPod, driverService) = launchDriverKubernetesComponents( - kubernetesClient, - parsedCustomLabels, - submitServerSecret, - driverSubmitSslOptions, - sslSecrets, - sslVolumes, - sslVolumeMounts, - sslEnvs, - isKeyStoreLocalFile) - val ownerReferenceConfiguredDriverService = try { - configureOwnerReferences( - kubernetesClient, - submitServerSecret, - sslSecrets, - driverPod, - driverService) - } catch { - case e: Throwable => - cleanupPodAndService(kubernetesClient, driverPod, driverService) - throw new SparkException("Failed to set owner references to the driver pod.", e) - } - try { - submitApplicationToDriverServer(kubernetesClient, driverSubmitSslOptions, - ownerReferenceConfiguredDriverService, submitterLocalFiles, submitterLocalJars) - // wait if configured to do so - if (waitForAppCompletion) { - logInfo(s"Waiting for application $kubernetesAppId to finish...") - driverPodCompletedLatch.await() - logInfo(s"Application $kubernetesAppId finished.") - } else { - logInfo(s"Application $kubernetesAppId successfully launched.") - } - } catch { - case e: Throwable => - cleanupPodAndService(kubernetesClient, driverPod, - ownerReferenceConfiguredDriverService) - throw new SparkException("Failed to submit the application to the driver pod.", e) - } - } - } finally { - Utils.tryLogNonFatalError { - // Secrets may have been mutated so delete by name to avoid problems with not having - // the latest version. - sslSecrets.foreach { secret => - kubernetesClient.secrets().withName(secret.getMetadata.getName).delete() - } + sslSecrets.foreach(kubernetesComponentCleaner.registerOrUpdateSecret) + // start outer watch for status logging of driver pod + val driverPodCompletedLatch = new CountDownLatch(1) + // only enable interval logging if in waitForAppCompletion mode + val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 + val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, + loggingInterval) + Utils.tryWithResource(kubernetesClient + .pods() + .withName(kubernetesAppId) + .watch(loggingWatch)) { _ => + val (driverPod, driverService, driverIngress) = launchDriverKubernetesComponents( + kubernetesClient, + kubernetesComponentCleaner, + parsedCustomLabels, + submitServerSecret, + driverSubmitSslOptions, + sslSecrets, + sslVolumes, + sslVolumeMounts, + sslEnvs, + isKeyStoreLocalFile) + configureOwnerReferences( + kubernetesClient, + kubernetesComponentCleaner, + submitServerSecret, + sslSecrets, + driverPod, + driverService) + submitApplicationToDriverServer( + kubernetesClient, + kubernetesComponentCleaner, + driverSubmitSslOptions, + driverService, + submitterLocalFiles, + submitterLocalJars) + // Now that the application has started, persist the components that were created beyond + // the shutdown hook. We still want to purge the one-time secrets, so do not unregister + // those. + kubernetesComponentCleaner.unregisterPod(driverPod) + kubernetesComponentCleaner.unregisterService(driverService) + // wait if configured to do so + if (waitForAppCompletion) { + logInfo(s"Waiting for application $kubernetesAppId to finish...") + driverPodCompletedLatch.await() + logInfo(s"Application $kubernetesAppId finished.") + } else { + logInfo(s"Application $kubernetesAppId successfully launched.") } } } finally { - Utils.tryLogNonFatalError { - kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).delete() - } + kubernetesComponentCleaner.purgeAllRegisteredComponentsFromKubernetes() } } } - private def cleanupPodAndService( - kubernetesClient: KubernetesClient, - driverPod: Pod, - driverService: Service): Unit = { - Utils.tryLogNonFatalError { - kubernetesClient.services().delete(driverService) - } - Utils.tryLogNonFatalError { - kubernetesClient.pods().delete(driverPod) - } - } - private def submitApplicationToDriverServer( kubernetesClient: KubernetesClient, + kubernetesComponentCleaner: KubernetesComponentCleaner, driverSubmitSslOptions: SSLOptions, driverService: Service, submitterLocalFiles: Iterable[String], @@ -231,16 +212,22 @@ private[spark] class Client( .withPort(uiPort) .withNewTargetPort(uiPort) .build() - kubernetesClient.services().withName(kubernetesAppId).edit().editSpec() - .withType("ClusterIP") - .withPorts(uiServicePort) - .endSpec() - .done() + val clusterIPService = kubernetesClient + .services() + .withName(kubernetesAppId) + .edit() + .editSpec() + .withType("ClusterIP") + .withPorts(uiServicePort) + .endSpec() + .done() + kubernetesComponentCleaner.registerOrUpdateService(clusterIPService) logInfo("Finished submitting application to Kubernetes.") } private def launchDriverKubernetesComponents( kubernetesClient: KubernetesClient, + kubernetesComponentsCleaner: KubernetesComponentCleaner, parsedCustomLabels: Map[String, String], submitServerSecret: Secret, driverSubmitSslOptions: SSLOptions, @@ -276,37 +263,19 @@ private[spark] class Client( kubernetesClient, driverKubernetesSelectors, submitServerSecret) - val driverPod = try { - createDriverPod( - kubernetesClient, - driverKubernetesSelectors, - submitServerSecret, - driverSubmitSslOptions, - sslVolumes, - sslVolumeMounts, - sslEnvs) - } catch { - case e: Throwable => - Utils.tryLogNonFatalError { - kubernetesClient.services().delete(driverService) - } - throw new SparkException("Failed to create the driver pod.", e) - } - try { - waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, - serviceReadyFuture, podReadyFuture) - (driverPod, driverService) - } catch { - case e: Throwable => - Utils.tryLogNonFatalError { - kubernetesClient.services().delete(driverService) - } - Utils.tryLogNonFatalError { - kubernetesClient.pods().delete(driverPod) - } - throw new SparkException("Timed out while waiting for a Kubernetes component to be" + - " ready.", e) - } + kubernetesComponentsCleaner.registerOrUpdateService(driverService) + val driverPod = createDriverPod( + kubernetesClient, + driverKubernetesSelectors, + submitServerSecret, + driverSubmitSslOptions, + sslVolumes, + sslVolumeMounts, + sslEnvs) + kubernetesComponentsCleaner.registerOrUpdatePod(driverPod) + waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, + serviceReadyFuture, podReadyFuture) + (driverPod, driverService) } } } @@ -320,6 +289,7 @@ private[spark] class Client( */ private def configureOwnerReferences( kubernetesClient: KubernetesClient, + kubernetesComponentCleaner: KubernetesComponentCleaner, submitServerSecret: Secret, sslSecrets: Array[Secret], driverPod: Pod, @@ -332,22 +302,32 @@ private[spark] class Client( .withController(true) .build() sslSecrets.foreach(secret => { - kubernetesClient.secrets().withName(secret.getMetadata.getName).edit() + val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit() .editMetadata() .addToOwnerReferences(driverPodOwnerRef) .endMetadata() .done() + kubernetesComponentCleaner.registerOrUpdateSecret(updatedSecret) }) - kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).edit() - .editMetadata() - .addToOwnerReferences(driverPodOwnerRef) - .endMetadata() - .done() - kubernetesClient.services().withName(driverService.getMetadata.getName).edit() - .editMetadata() - .addToOwnerReferences(driverPodOwnerRef) - .endMetadata() - .done() + val updatedSubmitServerSecret = kubernetesClient + .secrets() + .withName(submitServerSecret.getMetadata.getName) + .edit() + .editMetadata() + .addToOwnerReferences(driverPodOwnerRef) + .endMetadata() + .done() + kubernetesComponentCleaner.registerOrUpdateSecret(updatedSubmitServerSecret) + val updatedService = kubernetesClient + .services() + .withName(driverService.getMetadata.getName) + .edit() + .editMetadata() + .addToOwnerReferences(driverPodOwnerRef) + .endMetadata() + .done() + kubernetesComponentCleaner.registerOrUpdateService(updatedService) + updatedService } private def waitForReadyKubernetesComponents( @@ -411,7 +391,7 @@ private[spark] class Client( driverSubmitSslOptions: SSLOptions, sslVolumes: Array[Volume], sslVolumeMounts: Array[VolumeMount], - sslEnvs: Array[EnvVar]) = { + sslEnvs: Array[EnvVar]): Pod = { val containerPorts = buildContainerPorts() val probePingHttpGet = new HTTPGetActionBuilder() .withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP") @@ -531,9 +511,12 @@ private[spark] class Client( (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore) } - private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions, - isKeyStoreLocalFile: Boolean): - (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { + private def configureSsl( + kubernetesClient: KubernetesClient, + kubernetesComponentCleaner: KubernetesComponentCleaner, + driverSubmitSslOptions: SSLOptions, + isKeyStoreLocalFile: Boolean): + (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { if (driverSubmitSslOptions.enabled) { val sslSecretsMap = mutable.HashMap[String, String]() val sslEnvs = mutable.Buffer[EnvVar]() @@ -600,6 +583,7 @@ private[spark] class Client( .withData(sslSecretsMap.asJava) .withType("Opaque") .done() + kubernetesComponentCleaner.registerOrUpdateSecret(sslSecrets) secrets += sslSecrets (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray) } else { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala new file mode 100644 index 0000000000000..bcdeb34f68eb2 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import io.fabric8.kubernetes.api.model.{Pod, Secret, Service} +import io.fabric8.kubernetes.api.model.extensions.Ingress +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.mutable + +import org.apache.spark.util.{ShutdownHookManager, Utils} + +private[spark] class KubernetesComponentCleaner(kubernetesClient: KubernetesClient) { + private val LOCK = new Object + private val registeredPods = mutable.HashMap.empty[String, Pod] + private val registeredServices = mutable.HashMap.empty[String, Service] + private val registeredSecrets = mutable.HashMap.empty[String, Secret] + private val registeredIngresses = mutable.HashMap.empty[String, Ingress] + + ShutdownHookManager.addShutdownHook(() => purgeAllRegisteredComponentsFromKubernetes()) + + def registerOrUpdatePod(pod: Pod): Unit = LOCK.synchronized { + registeredPods.put(pod.getMetadata.getName, pod) + } + + def unregisterPod(pod: Pod): Unit = LOCK.synchronized { + registeredPods.remove(pod.getMetadata.getName) + } + + def registerOrUpdateService(service: Service): Unit = LOCK.synchronized { + registeredServices.put(service.getMetadata.getName, service) + } + + def unregisterService(service: Service): Unit = LOCK.synchronized { + registeredServices.remove(service.getMetadata.getName) + } + + def registerOrUpdateSecret(secret: Secret): Unit = LOCK.synchronized { + registeredSecrets.put(secret.getMetadata.getName, secret) + } + + def unregisterSecret(secret: Secret): Unit = LOCK.synchronized { + registeredSecrets.remove(secret.getMetadata.getName) + } + + def registerOrUpdateIngress(ingress: Ingress): Unit = LOCK.synchronized { + registeredIngresses.put(ingress.getMetadata.getName, ingress) + } + + def unregisterIngress(ingress: Ingress): Unit = LOCK.synchronized { + registeredIngresses.remove(ingress.getMetadata.getName) + } + + def purgeAllRegisteredComponentsFromKubernetes(): Unit = LOCK.synchronized { + registeredPods.values.foreach { pod => + Utils.tryLogNonFatalError { + kubernetesClient.pods().delete(pod) + } + } + registeredPods.clear() + registeredServices.values.foreach { service => + Utils.tryLogNonFatalError { + kubernetesClient.services().delete(service) + } + } + registeredServices.clear() + registeredSecrets.values.foreach { secret => + Utils.tryLogNonFatalError { + kubernetesClient.secrets().delete(secret) + } + } + registeredSecrets.clear() + registeredIngresses.values.foreach { ingress => + Utils.tryLogNonFatalError { + kubernetesClient.extensions().ingresses().delete(ingress) + } + } + registeredIngresses.clear() + } +} From c4f596e138a303e860500122c68e12e76899db28 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 15 Feb 2017 15:23:01 -0800 Subject: [PATCH 02/11] Fix spacing --- .../scala/org/apache/spark/deploy/kubernetes/Client.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 3bbe9869c3738..f5b1f78bb5599 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -227,7 +227,7 @@ private[spark] class Client( private def launchDriverKubernetesComponents( kubernetesClient: KubernetesClient, - kubernetesComponentsCleaner: KubernetesComponentCleaner, + kubernetesComponentCleaner: KubernetesComponentCleaner, parsedCustomLabels: Map[String, String], submitServerSecret: Secret, driverSubmitSslOptions: SSLOptions, @@ -263,7 +263,7 @@ private[spark] class Client( kubernetesClient, driverKubernetesSelectors, submitServerSecret) - kubernetesComponentsCleaner.registerOrUpdateService(driverService) + kubernetesComponentCleaner.registerOrUpdateService(driverService) val driverPod = createDriverPod( kubernetesClient, driverKubernetesSelectors, @@ -272,7 +272,7 @@ private[spark] class Client( sslVolumes, sslVolumeMounts, sslEnvs) - kubernetesComponentsCleaner.registerOrUpdatePod(driverPod) + kubernetesComponentCleaner.registerOrUpdatePod(driverPod) waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, serviceReadyFuture, podReadyFuture) (driverPod, driverService) From d40fcda6e41fbc2224d905530f7ccf423231308e Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Feb 2017 14:43:17 -0800 Subject: [PATCH 03/11] Address comments --- .../spark/deploy/kubernetes/Client.scala | 2 +- .../KubernetesComponentCleaner.scala | 71 +++++++++++-------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index f5b1f78bb5599..15891ce38fab8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -173,7 +173,7 @@ private[spark] class Client( } } } finally { - kubernetesComponentCleaner.purgeAllRegisteredComponentsFromKubernetes() + kubernetesComponentCleaner.deleteAllRegisteredComponentsFromKubernetes() } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala index bcdeb34f68eb2..8771139a314df 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala @@ -21,73 +21,88 @@ import io.fabric8.kubernetes.api.model.extensions.Ingress import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.mutable +import org.apache.spark.internal.Logging import org.apache.spark.util.{ShutdownHookManager, Utils} -private[spark] class KubernetesComponentCleaner(kubernetesClient: KubernetesClient) { - private val LOCK = new Object +private[spark] class KubernetesComponentCleaner(kubernetesClient: KubernetesClient) + extends Logging { private val registeredPods = mutable.HashMap.empty[String, Pod] private val registeredServices = mutable.HashMap.empty[String, Service] private val registeredSecrets = mutable.HashMap.empty[String, Secret] private val registeredIngresses = mutable.HashMap.empty[String, Ingress] - ShutdownHookManager.addShutdownHook(() => purgeAllRegisteredComponentsFromKubernetes()) + ShutdownHookManager.addShutdownHook(() => deleteAllRegisteredComponentsFromKubernetes()) - def registerOrUpdatePod(pod: Pod): Unit = LOCK.synchronized { + def registerOrUpdatePod(pod: Pod): Unit = registeredPods.synchronized { registeredPods.put(pod.getMetadata.getName, pod) } - def unregisterPod(pod: Pod): Unit = LOCK.synchronized { + def unregisterPod(pod: Pod): Unit = registeredPods.synchronized { registeredPods.remove(pod.getMetadata.getName) } - def registerOrUpdateService(service: Service): Unit = LOCK.synchronized { + def registerOrUpdateService(service: Service): Unit = registeredServices.synchronized { registeredServices.put(service.getMetadata.getName, service) } - def unregisterService(service: Service): Unit = LOCK.synchronized { + def unregisterService(service: Service): Unit = registeredServices.synchronized { registeredServices.remove(service.getMetadata.getName) } - def registerOrUpdateSecret(secret: Secret): Unit = LOCK.synchronized { + def registerOrUpdateSecret(secret: Secret): Unit = registeredSecrets.synchronized { registeredSecrets.put(secret.getMetadata.getName, secret) } - def unregisterSecret(secret: Secret): Unit = LOCK.synchronized { + def unregisterSecret(secret: Secret): Unit = registeredSecrets.synchronized { registeredSecrets.remove(secret.getMetadata.getName) } - def registerOrUpdateIngress(ingress: Ingress): Unit = LOCK.synchronized { + def registerOrUpdateIngress(ingress: Ingress): Unit = registeredIngresses.synchronized { registeredIngresses.put(ingress.getMetadata.getName, ingress) } - def unregisterIngress(ingress: Ingress): Unit = LOCK.synchronized { + def unregisterIngress(ingress: Ingress): Unit = registeredIngresses.synchronized { registeredIngresses.remove(ingress.getMetadata.getName) } - def purgeAllRegisteredComponentsFromKubernetes(): Unit = LOCK.synchronized { - registeredPods.values.foreach { pod => - Utils.tryLogNonFatalError { - kubernetesClient.pods().delete(pod) + def deleteAllRegisteredComponentsFromKubernetes(): Unit = { + logInfo(s"Deleting registered Kubernetes components:" + + s" ${registeredPods.size} pod(s), ${registeredServices.size} service(s)," + + s" ${registeredSecrets.size} secret(s), and ${registeredIngresses} ingress(es).") + registeredPods.synchronized { + registeredPods.values.foreach { pod => + Utils.tryLogNonFatalError { + kubernetesClient.pods().delete(pod) + } } + registeredPods.clear() } - registeredPods.clear() - registeredServices.values.foreach { service => - Utils.tryLogNonFatalError { - kubernetesClient.services().delete(service) + + registeredServices.synchronized { + registeredServices.values.foreach { service => + Utils.tryLogNonFatalError { + kubernetesClient.services().delete(service) + } } + registeredServices.clear() } - registeredServices.clear() - registeredSecrets.values.foreach { secret => - Utils.tryLogNonFatalError { - kubernetesClient.secrets().delete(secret) + + registeredSecrets.synchronized { + registeredSecrets.values.foreach { secret => + Utils.tryLogNonFatalError { + kubernetesClient.secrets().delete(secret) + } } + registeredSecrets.clear() } - registeredSecrets.clear() - registeredIngresses.values.foreach { ingress => - Utils.tryLogNonFatalError { - kubernetesClient.extensions().ingresses().delete(ingress) + + registeredIngresses.synchronized { + registeredIngresses.values.foreach { ingress => + Utils.tryLogNonFatalError { + kubernetesClient.extensions().ingresses().delete(ingress) + } } + registeredIngresses.clear() } - registeredIngresses.clear() } } From 693d1ecd65f7966fda2b437c74bffbfd5a2871cd Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 22 Feb 2017 15:15:25 -0800 Subject: [PATCH 04/11] Fix compiler error --- .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 15891ce38fab8..93e32f1ee269e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -133,7 +133,7 @@ private[spark] class Client( .pods() .withName(kubernetesAppId) .watch(loggingWatch)) { _ => - val (driverPod, driverService, driverIngress) = launchDriverKubernetesComponents( + val (driverPod, driverService) = launchDriverKubernetesComponents( kubernetesClient, kubernetesComponentCleaner, parsedCustomLabels, From e0b3b38239aa5b26e7a76bf03fe23881286ba6c2 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 22 Feb 2017 15:49:38 -0800 Subject: [PATCH 05/11] Pull KubernetesComponentCleaner into instance variable --- .../apache/spark/deploy/kubernetes/Client.scala | 15 ++++++--------- .../kubernetes/KubernetesComponentCleaner.scala | 14 ++------------ 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 30a556a53a0d7..340428b305b72 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -38,7 +38,7 @@ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource} import org.apache.spark.deploy.rest.kubernetes._ import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils +import org.apache.spark.util.{ShutdownHookManager, Utils} private[spark] class Client( sparkConf: SparkConf, @@ -75,6 +75,8 @@ private[spark] class Client( private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS) + private val kubernetesComponentCleaner = new KubernetesComponentCleaner + def run(): Unit = { logInfo(s"Starting application $kubernetesAppId in Kubernetes...") val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles) @@ -107,7 +109,8 @@ private[spark] class Client( val k8ClientConfig = k8ConfBuilder.build Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient => - val kubernetesComponentCleaner = new KubernetesComponentCleaner(kubernetesClient) + ShutdownHookManager.addShutdownHook(() => + kubernetesComponentCleaner.deleteAllRegisteredComponentsFromKubernetes(kubernetesClient)) val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() .withName(secretName) @@ -135,7 +138,6 @@ private[spark] class Client( .watch(loggingWatch)) { _ => val (driverPod, driverService) = launchDriverKubernetesComponents( kubernetesClient, - kubernetesComponentCleaner, parsedCustomLabels, submitServerSecret, driverSubmitSslOptions, @@ -146,14 +148,12 @@ private[spark] class Client( isKeyStoreLocalFile) configureOwnerReferences( kubernetesClient, - kubernetesComponentCleaner, submitServerSecret, sslSecrets, driverPod, driverService) submitApplicationToDriverServer( kubernetesClient, - kubernetesComponentCleaner, driverSubmitSslOptions, driverService, submitterLocalFiles, @@ -173,14 +173,13 @@ private[spark] class Client( } } } finally { - kubernetesComponentCleaner.deleteAllRegisteredComponentsFromKubernetes() + kubernetesComponentCleaner.deleteAllRegisteredComponentsFromKubernetes(kubernetesClient) } } } private def submitApplicationToDriverServer( kubernetesClient: KubernetesClient, - kubernetesComponentCleaner: KubernetesComponentCleaner, driverSubmitSslOptions: SSLOptions, driverService: Service, submitterLocalFiles: Iterable[String], @@ -226,7 +225,6 @@ private[spark] class Client( private def launchDriverKubernetesComponents( kubernetesClient: KubernetesClient, - kubernetesComponentCleaner: KubernetesComponentCleaner, parsedCustomLabels: Map[String, String], submitServerSecret: Secret, driverSubmitSslOptions: SSLOptions, @@ -288,7 +286,6 @@ private[spark] class Client( */ private def configureOwnerReferences( kubernetesClient: KubernetesClient, - kubernetesComponentCleaner: KubernetesComponentCleaner, submitServerSecret: Secret, sslSecrets: Array[Secret], driverPod: Pod, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala index 8771139a314df..438005d660fe2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala @@ -24,15 +24,13 @@ import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.util.{ShutdownHookManager, Utils} -private[spark] class KubernetesComponentCleaner(kubernetesClient: KubernetesClient) +private[spark] class KubernetesComponentCleaner extends Logging { private val registeredPods = mutable.HashMap.empty[String, Pod] private val registeredServices = mutable.HashMap.empty[String, Service] private val registeredSecrets = mutable.HashMap.empty[String, Secret] private val registeredIngresses = mutable.HashMap.empty[String, Ingress] - ShutdownHookManager.addShutdownHook(() => deleteAllRegisteredComponentsFromKubernetes()) - def registerOrUpdatePod(pod: Pod): Unit = registeredPods.synchronized { registeredPods.put(pod.getMetadata.getName, pod) } @@ -57,15 +55,7 @@ private[spark] class KubernetesComponentCleaner(kubernetesClient: KubernetesClie registeredSecrets.remove(secret.getMetadata.getName) } - def registerOrUpdateIngress(ingress: Ingress): Unit = registeredIngresses.synchronized { - registeredIngresses.put(ingress.getMetadata.getName, ingress) - } - - def unregisterIngress(ingress: Ingress): Unit = registeredIngresses.synchronized { - registeredIngresses.remove(ingress.getMetadata.getName) - } - - def deleteAllRegisteredComponentsFromKubernetes(): Unit = { + def deleteAllRegisteredComponentsFromKubernetes(kubernetesClient: KubernetesClient): Unit = { logInfo(s"Deleting registered Kubernetes components:" + s" ${registeredPods.size} pod(s), ${registeredServices.size} service(s)," + s" ${registeredSecrets.size} secret(s), and ${registeredIngresses} ingress(es).") From 567a284c7a8783f2172dfa996747fea596cf9941 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 22 Feb 2017 15:59:47 -0800 Subject: [PATCH 06/11] Remove a parameter --- .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 340428b305b72..51b4dd1b74af9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -122,7 +122,6 @@ private[spark] class Client( try { val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl( kubernetesClient, - kubernetesComponentCleaner, driverSubmitSslOptions, isKeyStoreLocalFile) sslSecrets.foreach(kubernetesComponentCleaner.registerOrUpdateSecret) @@ -509,7 +508,6 @@ private[spark] class Client( private def configureSsl( kubernetesClient: KubernetesClient, - kubernetesComponentCleaner: KubernetesComponentCleaner, driverSubmitSslOptions: SSLOptions, isKeyStoreLocalFile: Boolean): (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { From 50bce5d63d447fae3e56de835f9c52df174b7941 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 23 Feb 2017 10:43:57 -0800 Subject: [PATCH 07/11] Remove redundant registerOrUpdateSecret for SSL --- .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 51b4dd1b74af9..c047b68577e2c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -124,7 +124,6 @@ private[spark] class Client( kubernetesClient, driverSubmitSslOptions, isKeyStoreLocalFile) - sslSecrets.foreach(kubernetesComponentCleaner.registerOrUpdateSecret) // start outer watch for status logging of driver pod val driverPodCompletedLatch = new CountDownLatch(1) // only enable interval logging if in waitForAppCompletion mode From 43b02e089d4f5b3f32d4fef9e44be0d04992a8b5 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 23 Feb 2017 10:46:18 -0800 Subject: [PATCH 08/11] Remove Ingresses from component cleaner --- .../kubernetes/KubernetesComponentCleaner.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala index 438005d660fe2..bd8cdf7e93b08 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala @@ -29,7 +29,6 @@ private[spark] class KubernetesComponentCleaner private val registeredPods = mutable.HashMap.empty[String, Pod] private val registeredServices = mutable.HashMap.empty[String, Service] private val registeredSecrets = mutable.HashMap.empty[String, Secret] - private val registeredIngresses = mutable.HashMap.empty[String, Ingress] def registerOrUpdatePod(pod: Pod): Unit = registeredPods.synchronized { registeredPods.put(pod.getMetadata.getName, pod) @@ -57,8 +56,8 @@ private[spark] class KubernetesComponentCleaner def deleteAllRegisteredComponentsFromKubernetes(kubernetesClient: KubernetesClient): Unit = { logInfo(s"Deleting registered Kubernetes components:" + - s" ${registeredPods.size} pod(s), ${registeredServices.size} service(s)," + - s" ${registeredSecrets.size} secret(s), and ${registeredIngresses} ingress(es).") + s" ${registeredPods.size} pod(s), ${registeredServices.size} service(s), and" + + s" ${registeredSecrets.size} secret(s).") registeredPods.synchronized { registeredPods.values.foreach { pod => Utils.tryLogNonFatalError { @@ -85,14 +84,5 @@ private[spark] class KubernetesComponentCleaner } registeredSecrets.clear() } - - registeredIngresses.synchronized { - registeredIngresses.values.foreach { ingress => - Utils.tryLogNonFatalError { - kubernetesClient.extensions().ingresses().delete(ingress) - } - } - registeredIngresses.clear() - } } } From 258d8f829c6e66f68ee329d00a2f0cc10992b71f Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 23 Feb 2017 12:43:51 -0800 Subject: [PATCH 09/11] Clear resources generically as opposed to specifying each type --- .../spark/deploy/kubernetes/Client.scala | 26 +++--- .../KubernetesComponentCleaner.scala | 88 ------------------- .../KubernetesResourceCleaner.scala | 52 +++++++++++ .../integrationtest/KubernetesSuite.scala | 1 + 4 files changed, 66 insertions(+), 101 deletions(-) delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index c047b68577e2c..340ba40cc53d9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -75,7 +75,7 @@ private[spark] class Client( private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS) - private val kubernetesComponentCleaner = new KubernetesComponentCleaner + private val kubernetesComponentCleaner = new KubernetesResourceCleaner def run(): Unit = { logInfo(s"Starting application $kubernetesAppId in Kubernetes...") @@ -110,7 +110,7 @@ private[spark] class Client( val k8ClientConfig = k8ConfBuilder.build Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient => ShutdownHookManager.addShutdownHook(() => - kubernetesComponentCleaner.deleteAllRegisteredComponentsFromKubernetes(kubernetesClient)) + kubernetesComponentCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() .withName(secretName) @@ -118,7 +118,7 @@ private[spark] class Client( .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() - kubernetesComponentCleaner.registerOrUpdateSecret(submitServerSecret) + kubernetesComponentCleaner.registerOrUpdateResource(submitServerSecret) try { val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl( kubernetesClient, @@ -159,8 +159,8 @@ private[spark] class Client( // Now that the application has started, persist the components that were created beyond // the shutdown hook. We still want to purge the one-time secrets, so do not unregister // those. - kubernetesComponentCleaner.unregisterPod(driverPod) - kubernetesComponentCleaner.unregisterService(driverService) + kubernetesComponentCleaner.unregisterResource(driverPod) + kubernetesComponentCleaner.unregisterResource(driverService) // wait if configured to do so if (waitForAppCompletion) { logInfo(s"Waiting for application $kubernetesAppId to finish...") @@ -171,7 +171,7 @@ private[spark] class Client( } } } finally { - kubernetesComponentCleaner.deleteAllRegisteredComponentsFromKubernetes(kubernetesClient) + kubernetesComponentCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) } } } @@ -217,7 +217,7 @@ private[spark] class Client( .withPorts(uiServicePort) .endSpec() .done() - kubernetesComponentCleaner.registerOrUpdateService(resolvedService) + kubernetesComponentCleaner.registerOrUpdateResource(resolvedService) logInfo("Finished submitting application to Kubernetes.") } @@ -258,7 +258,7 @@ private[spark] class Client( kubernetesClient, driverKubernetesSelectors, submitServerSecret) - kubernetesComponentCleaner.registerOrUpdateService(driverService) + kubernetesComponentCleaner.registerOrUpdateResource(driverService) val driverPod = createDriverPod( kubernetesClient, driverKubernetesSelectors, @@ -267,7 +267,7 @@ private[spark] class Client( sslVolumes, sslVolumeMounts, sslEnvs) - kubernetesComponentCleaner.registerOrUpdatePod(driverPod) + kubernetesComponentCleaner.registerOrUpdateResource(driverPod) waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, serviceReadyFuture, podReadyFuture) (driverPod, driverService) @@ -301,7 +301,7 @@ private[spark] class Client( .addToOwnerReferences(driverPodOwnerRef) .endMetadata() .done() - kubernetesComponentCleaner.registerOrUpdateSecret(updatedSecret) + kubernetesComponentCleaner.registerOrUpdateResource(updatedSecret) }) val updatedSubmitServerSecret = kubernetesClient .secrets() @@ -311,7 +311,7 @@ private[spark] class Client( .addToOwnerReferences(driverPodOwnerRef) .endMetadata() .done() - kubernetesComponentCleaner.registerOrUpdateSecret(updatedSubmitServerSecret) + kubernetesComponentCleaner.registerOrUpdateResource(updatedSubmitServerSecret) val updatedService = kubernetesClient .services() .withName(driverService.getMetadata.getName) @@ -320,7 +320,7 @@ private[spark] class Client( .addToOwnerReferences(driverPodOwnerRef) .endMetadata() .done() - kubernetesComponentCleaner.registerOrUpdateService(updatedService) + kubernetesComponentCleaner.registerOrUpdateResource(updatedService) updatedService } @@ -576,7 +576,7 @@ private[spark] class Client( .withData(sslSecretsMap.asJava) .withType("Opaque") .done() - kubernetesComponentCleaner.registerOrUpdateSecret(sslSecrets) + kubernetesComponentCleaner.registerOrUpdateResource(sslSecrets) secrets += sslSecrets (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray) } else { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala deleted file mode 100644 index bd8cdf7e93b08..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesComponentCleaner.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.kubernetes - -import io.fabric8.kubernetes.api.model.{Pod, Secret, Service} -import io.fabric8.kubernetes.api.model.extensions.Ingress -import io.fabric8.kubernetes.client.KubernetesClient -import scala.collection.mutable - -import org.apache.spark.internal.Logging -import org.apache.spark.util.{ShutdownHookManager, Utils} - -private[spark] class KubernetesComponentCleaner - extends Logging { - private val registeredPods = mutable.HashMap.empty[String, Pod] - private val registeredServices = mutable.HashMap.empty[String, Service] - private val registeredSecrets = mutable.HashMap.empty[String, Secret] - - def registerOrUpdatePod(pod: Pod): Unit = registeredPods.synchronized { - registeredPods.put(pod.getMetadata.getName, pod) - } - - def unregisterPod(pod: Pod): Unit = registeredPods.synchronized { - registeredPods.remove(pod.getMetadata.getName) - } - - def registerOrUpdateService(service: Service): Unit = registeredServices.synchronized { - registeredServices.put(service.getMetadata.getName, service) - } - - def unregisterService(service: Service): Unit = registeredServices.synchronized { - registeredServices.remove(service.getMetadata.getName) - } - - def registerOrUpdateSecret(secret: Secret): Unit = registeredSecrets.synchronized { - registeredSecrets.put(secret.getMetadata.getName, secret) - } - - def unregisterSecret(secret: Secret): Unit = registeredSecrets.synchronized { - registeredSecrets.remove(secret.getMetadata.getName) - } - - def deleteAllRegisteredComponentsFromKubernetes(kubernetesClient: KubernetesClient): Unit = { - logInfo(s"Deleting registered Kubernetes components:" + - s" ${registeredPods.size} pod(s), ${registeredServices.size} service(s), and" + - s" ${registeredSecrets.size} secret(s).") - registeredPods.synchronized { - registeredPods.values.foreach { pod => - Utils.tryLogNonFatalError { - kubernetesClient.pods().delete(pod) - } - } - registeredPods.clear() - } - - registeredServices.synchronized { - registeredServices.values.foreach { service => - Utils.tryLogNonFatalError { - kubernetesClient.services().delete(service) - } - } - registeredServices.clear() - } - - registeredSecrets.synchronized { - registeredSecrets.values.foreach { secret => - Utils.tryLogNonFatalError { - kubernetesClient.secrets().delete(secret) - } - } - registeredSecrets.clear() - } - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala new file mode 100644 index 0000000000000..fb76b04604479 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.mutable + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[spark] class KubernetesResourceCleaner + extends Logging { + + private val resources = mutable.HashMap.empty[(String, String), HasMetadata] + + // Synchronized because deleteAllRegisteredResourcesFromKubernetes may be called from a + // shutdown hook + def registerOrUpdateResource(resource: HasMetadata): Unit = synchronized { + resources.put((resource.getMetadata.getName, resource.getKind), resource) + } + + def unregisterResource(resource: HasMetadata): Unit = synchronized { + resources.remove((resource.getMetadata.getName, resource.getKind)) + } + + def deleteAllRegisteredResourcesFromKubernetes(kubernetesClient: KubernetesClient): Unit = { + synchronized { + logInfo(s"Deleting ${resources.size} registered Kubernetes resources:") + resources.values.foreach { resource => + Utils.tryLogNonFatalError { + kubernetesClient.resource(resource).delete() + } + } + resources.clear() + } + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index fe171db15b3d1..deef87c72ce52 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -148,6 +148,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { assert(result.size == 1) result } + assert(minikubeKubernetesClient.secrets().list().getItems.asScala.nonEmpty) } test("Run a simple example") { From f1c978a9a1bf8b8986b5a26f893f8548b5cfb2a4 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 23 Feb 2017 12:45:57 -0800 Subject: [PATCH 10/11] Remove incorrect test assertion --- .../deploy/kubernetes/integrationtest/KubernetesSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index deef87c72ce52..fe171db15b3d1 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -148,7 +148,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { assert(result.size == 1) result } - assert(minikubeKubernetesClient.secrets().list().getItems.asScala.nonEmpty) } test("Run a simple example") { From 8902f02c7ec5ac964e05ba2e501c9452960bbe9c Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 23 Feb 2017 13:51:50 -0800 Subject: [PATCH 11/11] Rename variable --- .../spark/deploy/kubernetes/Client.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 340ba40cc53d9..6678ba1a09a5b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -75,7 +75,7 @@ private[spark] class Client( private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS) - private val kubernetesComponentCleaner = new KubernetesResourceCleaner + private val kubernetesResourceCleaner = new KubernetesResourceCleaner def run(): Unit = { logInfo(s"Starting application $kubernetesAppId in Kubernetes...") @@ -110,7 +110,7 @@ private[spark] class Client( val k8ClientConfig = k8ConfBuilder.build Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient => ShutdownHookManager.addShutdownHook(() => - kubernetesComponentCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) + kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() .withName(secretName) @@ -118,7 +118,7 @@ private[spark] class Client( .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() - kubernetesComponentCleaner.registerOrUpdateResource(submitServerSecret) + kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) try { val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl( kubernetesClient, @@ -159,8 +159,8 @@ private[spark] class Client( // Now that the application has started, persist the components that were created beyond // the shutdown hook. We still want to purge the one-time secrets, so do not unregister // those. - kubernetesComponentCleaner.unregisterResource(driverPod) - kubernetesComponentCleaner.unregisterResource(driverService) + kubernetesResourceCleaner.unregisterResource(driverPod) + kubernetesResourceCleaner.unregisterResource(driverService) // wait if configured to do so if (waitForAppCompletion) { logInfo(s"Waiting for application $kubernetesAppId to finish...") @@ -171,7 +171,7 @@ private[spark] class Client( } } } finally { - kubernetesComponentCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) + kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) } } } @@ -217,7 +217,7 @@ private[spark] class Client( .withPorts(uiServicePort) .endSpec() .done() - kubernetesComponentCleaner.registerOrUpdateResource(resolvedService) + kubernetesResourceCleaner.registerOrUpdateResource(resolvedService) logInfo("Finished submitting application to Kubernetes.") } @@ -258,7 +258,7 @@ private[spark] class Client( kubernetesClient, driverKubernetesSelectors, submitServerSecret) - kubernetesComponentCleaner.registerOrUpdateResource(driverService) + kubernetesResourceCleaner.registerOrUpdateResource(driverService) val driverPod = createDriverPod( kubernetesClient, driverKubernetesSelectors, @@ -267,7 +267,7 @@ private[spark] class Client( sslVolumes, sslVolumeMounts, sslEnvs) - kubernetesComponentCleaner.registerOrUpdateResource(driverPod) + kubernetesResourceCleaner.registerOrUpdateResource(driverPod) waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, serviceReadyFuture, podReadyFuture) (driverPod, driverService) @@ -301,7 +301,7 @@ private[spark] class Client( .addToOwnerReferences(driverPodOwnerRef) .endMetadata() .done() - kubernetesComponentCleaner.registerOrUpdateResource(updatedSecret) + kubernetesResourceCleaner.registerOrUpdateResource(updatedSecret) }) val updatedSubmitServerSecret = kubernetesClient .secrets() @@ -311,7 +311,7 @@ private[spark] class Client( .addToOwnerReferences(driverPodOwnerRef) .endMetadata() .done() - kubernetesComponentCleaner.registerOrUpdateResource(updatedSubmitServerSecret) + kubernetesResourceCleaner.registerOrUpdateResource(updatedSubmitServerSecret) val updatedService = kubernetesClient .services() .withName(driverService.getMetadata.getName) @@ -320,7 +320,7 @@ private[spark] class Client( .addToOwnerReferences(driverPodOwnerRef) .endMetadata() .done() - kubernetesComponentCleaner.registerOrUpdateResource(updatedService) + kubernetesResourceCleaner.registerOrUpdateResource(updatedService) updatedService } @@ -576,7 +576,7 @@ private[spark] class Client( .withData(sslSecretsMap.asJava) .withType("Opaque") .done() - kubernetesComponentCleaner.registerOrUpdateResource(sslSecrets) + kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets) secrets += sslSecrets (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray) } else {