Client.scala
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
import org.apache.spark.util.{ShutdownHookManager, Utils}

private[spark] class Client(
sparkConf: SparkConf,
@@ -75,6 +75,8 @@ private[spark] class Client(
private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)

private val kubernetesResourceCleaner = new KubernetesResourceCleaner

def run(): Unit = {
logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
@@ -107,97 +109,73 @@

val k8ClientConfig = k8ConfBuilder.build
Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
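// Register a shutdown hook up front so that any Kubernetes resources created during
// submission are deleted if the JVM exits before submission completes.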
ShutdownHookManager.addShutdownHook(() =>
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
val submitServerSecret = kubernetesClient.secrets().createNew()
.withNewMetadata()
.withName(secretName)
.endMetadata()
.withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava)
.withType("Opaque")
.done()
kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret)
try {
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient,
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(
kubernetesClient,
driverSubmitSslOptions,
isKeyStoreLocalFile)
try {
// start outer watch for status logging of driver pod
val driverPodCompletedLatch = new CountDownLatch(1)
// only enable interval logging if in waitForAppCompletion mode
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
loggingInterval)
Utils.tryWithResource(kubernetesClient
.pods()
.withName(kubernetesAppId)
.watch(loggingWatch)) { _ =>
val (driverPod, driverService) = launchDriverKubernetesComponents(
kubernetesClient,
parsedCustomLabels,
submitServerSecret,
driverSubmitSslOptions,
sslSecrets,
sslVolumes,
sslVolumeMounts,
sslEnvs,
isKeyStoreLocalFile)
val ownerReferenceConfiguredDriverService = try {
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
sslSecrets,
driverPod,
driverService)
} catch {
case e: Throwable =>
cleanupPodAndService(kubernetesClient, driverPod, driverService)
throw new SparkException("Failed to set owner references to the driver pod.", e)
}
try {
submitApplicationToDriverServer(kubernetesClient, driverSubmitSslOptions,
ownerReferenceConfiguredDriverService, submitterLocalFiles, submitterLocalJars)
// wait if configured to do so
if (waitForAppCompletion) {
logInfo(s"Waiting for application $kubernetesAppId to finish...")
driverPodCompletedLatch.await()
logInfo(s"Application $kubernetesAppId finished.")
} else {
logInfo(s"Application $kubernetesAppId successfully launched.")
}
} catch {
case e: Throwable =>
cleanupPodAndService(kubernetesClient, driverPod,
ownerReferenceConfiguredDriverService)
throw new SparkException("Failed to submit the application to the driver pod.", e)
}
}
} finally {
Utils.tryLogNonFatalError {
// Secrets may have been mutated so delete by name to avoid problems with not having
// the latest version.
sslSecrets.foreach { secret =>
kubernetesClient.secrets().withName(secret.getMetadata.getName).delete()
}
// start outer watch for status logging of driver pod
val driverPodCompletedLatch = new CountDownLatch(1)
// only enable interval logging if in waitForAppCompletion mode
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
loggingInterval)
Utils.tryWithResource(kubernetesClient
.pods()
.withName(kubernetesAppId)
.watch(loggingWatch)) { _ =>
val (driverPod, driverService) = launchDriverKubernetesComponents(
kubernetesClient,
parsedCustomLabels,
submitServerSecret,
driverSubmitSslOptions,
sslSecrets,
sslVolumes,
sslVolumeMounts,
sslEnvs,
isKeyStoreLocalFile)
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
sslSecrets,
driverPod,
driverService)
submitApplicationToDriverServer(
kubernetesClient,
driverSubmitSslOptions,
driverService,
submitterLocalFiles,
submitterLocalJars)
// Now that the application has started, persist the components that were created beyond
// the shutdown hook. We still want to purge the one-time secrets, so do not unregister
// those.
kubernetesResourceCleaner.unregisterResource(driverPod)
kubernetesResourceCleaner.unregisterResource(driverService)
// wait if configured to do so
if (waitForAppCompletion) {
logInfo(s"Waiting for application $kubernetesAppId to finish...")
driverPodCompletedLatch.await()
logInfo(s"Application $kubernetesAppId finished.")
} else {
logInfo(s"Application $kubernetesAppId successfully launched.")
}
}
} finally {
Utils.tryLogNonFatalError {
kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).delete()
}
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)
}
}
}

private def cleanupPodAndService(
kubernetesClient: KubernetesClient,
driverPod: Pod,
driverService: Service): Unit = {
Utils.tryLogNonFatalError {
kubernetesClient.services().delete(driverService)
}
Utils.tryLogNonFatalError {
kubernetesClient.pods().delete(driverPod)
}
}

private def submitApplicationToDriverServer(
kubernetesClient: KubernetesClient,
driverSubmitSslOptions: SSLOptions,
@@ -233,11 +211,13 @@ private[spark] class Client(
.withPort(uiPort)
.withNewTargetPort(uiPort)
.build()
kubernetesClient.services().withName(kubernetesAppId).edit().editSpec()
.withType(uiServiceType)
.withPorts(uiServicePort)
.endSpec()
val resolvedService = kubernetesClient.services().withName(kubernetesAppId).edit()
.editSpec()
.withType(uiServiceType)
.withPorts(uiServicePort)
.endSpec()
.done()
kubernetesResourceCleaner.registerOrUpdateResource(resolvedService)
logInfo("Finished submitting application to Kubernetes.")
}

@@ -278,37 +258,19 @@ private[spark] class Client(
kubernetesClient,
driverKubernetesSelectors,
submitServerSecret)
val driverPod = try {
createDriverPod(
kubernetesClient,
driverKubernetesSelectors,
submitServerSecret,
driverSubmitSslOptions,
sslVolumes,
sslVolumeMounts,
sslEnvs)
} catch {
case e: Throwable =>
Utils.tryLogNonFatalError {
kubernetesClient.services().delete(driverService)
}
throw new SparkException("Failed to create the driver pod.", e)
}
try {
waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
serviceReadyFuture, podReadyFuture)
(driverPod, driverService)
} catch {
case e: Throwable =>
Utils.tryLogNonFatalError {
kubernetesClient.services().delete(driverService)
}
Utils.tryLogNonFatalError {
kubernetesClient.pods().delete(driverPod)
}
throw new SparkException("Timed out while waiting for a Kubernetes component to be" +
" ready.", e)
}
kubernetesResourceCleaner.registerOrUpdateResource(driverService)
val driverPod = createDriverPod(
kubernetesClient,
driverKubernetesSelectors,
submitServerSecret,
driverSubmitSslOptions,
sslVolumes,
sslVolumeMounts,
sslEnvs)
kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
serviceReadyFuture, podReadyFuture)
(driverPod, driverService)
}
}
}
@@ -334,22 +296,32 @@ private[spark] class Client(
.withController(true)
.build()
sslSecrets.foreach(secret => {
kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
kubernetesResourceCleaner.registerOrUpdateResource(updatedSecret)
})
kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
kubernetesClient.services().withName(driverService.getMetadata.getName).edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
val updatedSubmitServerSecret = kubernetesClient
.secrets()
.withName(submitServerSecret.getMetadata.getName)
.edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
kubernetesResourceCleaner.registerOrUpdateResource(updatedSubmitServerSecret)
val updatedService = kubernetesClient
.services()
.withName(driverService.getMetadata.getName)
.edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
kubernetesResourceCleaner.registerOrUpdateResource(updatedService)
updatedService
}

private def waitForReadyKubernetesComponents(
@@ -413,7 +385,7 @@ private[spark] class Client(
driverSubmitSslOptions: SSLOptions,
sslVolumes: Array[Volume],
sslVolumeMounts: Array[VolumeMount],
sslEnvs: Array[EnvVar]) = {
sslEnvs: Array[EnvVar]): Pod = {
val containerPorts = buildContainerPorts()
val probePingHttpGet = new HTTPGetActionBuilder()
.withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
@@ -533,9 +505,11 @@ private[spark] class Client(
(securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
}

private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions,
isKeyStoreLocalFile: Boolean):
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
private def configureSsl(
kubernetesClient: KubernetesClient,
driverSubmitSslOptions: SSLOptions,
isKeyStoreLocalFile: Boolean):
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
if (driverSubmitSslOptions.enabled) {
val sslSecretsMap = mutable.HashMap[String, String]()
val sslEnvs = mutable.Buffer[EnvVar]()
@@ -602,6 +576,7 @@ private[spark] class Client(
.withData(sslSecretsMap.asJava)
.withType("Opaque")
.done()
kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets)
secrets += sslSecrets
(sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
} else {
KubernetesResourceCleaner.scala (new file)
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.HasMetadata
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[spark] class KubernetesResourceCleaner
extends Logging {

// Keyed by (resource name, kind) so that re-registering an updated version of a resource
// replaces the stale copy.
private val resources = mutable.HashMap.empty[(String, String), HasMetadata]

// Synchronized because deleteAllRegisteredResourcesFromKubernetes may be called from a
// shutdown hook
def registerOrUpdateResource(resource: HasMetadata): Unit = synchronized {
resources.put((resource.getMetadata.getName, resource.getKind), resource)
}

def unregisterResource(resource: HasMetadata): Unit = synchronized {
resources.remove((resource.getMetadata.getName, resource.getKind))
}

// Invoked from the shutdown hook and from the submission client's cleanup path; deletes every
// resource still registered at that point and clears the registry.
def deleteAllRegisteredResourcesFromKubernetes(kubernetesClient: KubernetesClient): Unit = {
synchronized {
logInfo(s"Deleting ${resources.size} registered Kubernetes resources:")
resources.values.foreach { resource =>
Utils.tryLogNonFatalError {
kubernetesClient.resource(resource).delete()
}
}
resources.clear()
}
}
}
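
For reference, a minimal sketch of how the cleaner is meant to be driven, mirroring the submission flow in Client.scala above. The no-arg client construction, the secret name, and the secret data are illustrative assumptions; the fabric8 builder calls, Utils.tryWithResource, and ShutdownHookManager usage are the same APIs the diff already relies on.

package org.apache.spark.deploy.kubernetes

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.util.{ShutdownHookManager, Utils}

object ResourceCleanerSketch {
  def main(args: Array[String]): Unit = {
    val cleaner = new KubernetesResourceCleaner
    // Auto-configured client for the sketch; the real Client builds its config explicitly.
    Utils.tryWithResource(new DefaultKubernetesClient()) { client =>
      // If the JVM dies mid-submission, delete everything registered so far.
      ShutdownHookManager.addShutdownHook(() =>
        cleaner.deleteAllRegisteredResourcesFromKubernetes(client))
      val secret = client.secrets().createNew()
        .withNewMetadata()
          .withName("example-submit-secret")  // illustrative name
          .endMetadata()
        .withData(Map("app-secret" -> "c2VjcmV0").asJava)  // illustrative value
        .withType("Opaque")
        .done()
      cleaner.registerOrUpdateResource(secret)
      try {
        // ... create and register the driver pod and service here ...
        // Once the application is running, unregister long-lived resources so the
        // shutdown hook and the finally block leave them alone:
        //   cleaner.unregisterResource(driverPod)
        //   cleaner.unregisterResource(driverService)
      } finally {
        // Purges whatever is still registered, e.g. the one-time submit secret.
        cleaner.deleteAllRegisteredResourcesFromKubernetes(client)
      }
    }
  }
}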