Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
*/
package org.apache.spark.deploy.kubernetes

import java.io.{File, FileInputStream}
import java.security.{KeyStore, SecureRandom}
import java.io.File
import java.security.SecureRandom
import java.util
import java.util.concurrent.{CountDownLatch, TimeUnit}
import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}

import com.google.common.base.Charsets
import com.google.common.io.Files
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.codec.binary.Base64
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
Expand All @@ -56,8 +53,6 @@ private[spark] class Client(
private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId"
private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId"
private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
Expand Down Expand Up @@ -95,7 +90,6 @@ private[spark] class Client(
throw new SparkException(s"Main app resource file $mainAppResource is not a file or" +
s" is a directory.")
}
val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
val parsedCustomLabels = parseCustomLabels(customLabels)
var k8ConfBuilder = new K8SConfigBuilder()
.withApiVersion("v1")
Expand All @@ -115,6 +109,8 @@ private[spark] class Client(
Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
ShutdownHookManager.addShutdownHook(() =>
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
val sslConfigurationProvider = new SslConfigurationProvider(
sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner)
val submitServerSecret = kubernetesClient.secrets().createNew()
.withNewMetadata()
.withName(secretName)
Expand All @@ -124,10 +120,7 @@ private[spark] class Client(
.done()
kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret)
try {
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(
kubernetesClient,
driverSubmitSslOptions,
isKeyStoreLocalFile)
val sslConfiguration = sslConfigurationProvider.getSslConfiguration()
// start outer watch for status logging of driver pod
val driverPodCompletedLatch = new CountDownLatch(1)
// only enable interval logging if in waitForAppCompletion mode
Expand All @@ -142,21 +135,16 @@ private[spark] class Client(
kubernetesClient,
parsedCustomLabels,
submitServerSecret,
driverSubmitSslOptions,
sslSecrets,
sslVolumes,
sslVolumeMounts,
sslEnvs,
isKeyStoreLocalFile)
sslConfiguration)
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
sslSecrets,
sslConfiguration.sslSecrets,
driverPod,
driverService)
submitApplicationToDriverServer(
kubernetesClient,
driverSubmitSslOptions,
sslConfiguration,
driverService,
submitterLocalFiles,
submitterLocalJars)
Expand All @@ -182,7 +170,7 @@ private[spark] class Client(

private def submitApplicationToDriverServer(
kubernetesClient: KubernetesClient,
driverSubmitSslOptions: SSLOptions,
sslConfiguration: SslConfiguration,
driverService: Service,
submitterLocalFiles: Iterable[String],
submitterLocalJars: Iterable[String]): Unit = {
Expand All @@ -198,7 +186,7 @@ private[spark] class Client(
sparkConf.setIfMissing("spark.blockmanager.port",
DEFAULT_BLOCKMANAGER_PORT.toString)
val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, driverService,
driverSubmitSslOptions)
sslConfiguration)
// Sanity check to see if the driver submitter is even reachable.
driverSubmitter.ping()
logInfo(s"Submitting local resources to driver pod for application " +
Expand Down Expand Up @@ -229,20 +217,15 @@ private[spark] class Client(
kubernetesClient: KubernetesClient,
parsedCustomLabels: Map[String, String],
submitServerSecret: Secret,
driverSubmitSslOptions: SSLOptions,
sslSecrets: Array[Secret],
sslVolumes: Array[Volume],
sslVolumeMounts: Array[VolumeMount],
sslEnvs: Array[EnvVar],
isKeyStoreLocalFile: Boolean): (Pod, Service) = {
val endpointsReadyFuture = SettableFuture.create[Endpoints]
val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
val serviceReadyFuture = SettableFuture.create[Service]
sslConfiguration: SslConfiguration): (Pod, Service) = {
val driverKubernetesSelectors = (Map(
SPARK_DRIVER_LABEL -> kubernetesAppId,
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_APP_NAME_LABEL -> appName)
++ parsedCustomLabels).asJava
val endpointsReadyFuture = SettableFuture.create[Endpoints]
val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
val serviceReadyFuture = SettableFuture.create[Service]
val serviceReadyWatcher = new DriverServiceReadyWatcher(serviceReadyFuture)
val podReadyFuture = SettableFuture.create[Pod]
val podWatcher = new DriverPodReadyWatcher(podReadyFuture)
Expand All @@ -267,10 +250,7 @@ private[spark] class Client(
kubernetesClient,
driverKubernetesSelectors,
submitServerSecret,
driverSubmitSslOptions,
sslVolumes,
sslVolumeMounts,
sslEnvs)
sslConfiguration)
kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
serviceReadyFuture, podReadyFuture)
Expand Down Expand Up @@ -386,13 +366,10 @@ private[spark] class Client(
kubernetesClient: KubernetesClient,
driverKubernetesSelectors: util.Map[String, String],
submitServerSecret: Secret,
driverSubmitSslOptions: SSLOptions,
sslVolumes: Array[Volume],
sslVolumeMounts: Array[VolumeMount],
sslEnvs: Array[EnvVar]): Pod = {
sslConfiguration: SslConfiguration): Pod = {
val containerPorts = buildContainerPorts()
val probePingHttpGet = new HTTPGetActionBuilder()
.withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
.withScheme(if (sslConfiguration.sslOptions.enabled) "HTTPS" else "HTTP")
.withPath("/v1/submissions/ping")
.withNewPort(SUBMISSION_SERVER_PORT_NAME)
.build()
Expand All @@ -409,7 +386,7 @@ private[spark] class Client(
.withSecretName(submitServerSecret.getMetadata.getName)
.endSecret()
.endVolume()
.addToVolumes(sslVolumes: _*)
.addToVolumes(sslConfiguration.sslPodVolumes: _*)
.withServiceAccount(serviceAccount)
.addNewContainer()
.withName(DRIVER_CONTAINER_NAME)
Expand All @@ -420,7 +397,7 @@ private[spark] class Client(
.withMountPath(secretDirectory)
.withReadOnly(true)
.endVolumeMount()
.addToVolumeMounts(sslVolumeMounts: _*)
.addToVolumeMounts(sslConfiguration.sslPodVolumeMounts: _*)
.addNewEnv()
.withName(ENV_SUBMISSION_SECRET_LOCATION)
.withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
Expand All @@ -429,7 +406,7 @@ private[spark] class Client(
.withName(ENV_SUBMISSION_SERVER_PORT)
.withValue(SUBMISSION_SERVER_PORT.toString)
.endEnv()
.addToEnv(sslEnvs: _*)
.addToEnv(sslConfiguration.sslPodEnvVars: _*)
.withPorts(containerPorts.asJava)
.withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
.endContainer()
Expand Down Expand Up @@ -486,108 +463,6 @@ private[spark] class Client(
}
}

private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = {
val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE)
val resolvedSparkConf = sparkConf.clone()
val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => {
(KubernetesFileUtils.isUriLocalFile(keyStore),
Option.apply(Utils.resolveURI(keyStore).getPath))
}).getOrElse((false, Option.empty[String]))
resolvedKeyStore.foreach {
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _)
}
sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore =>
if (KubernetesFileUtils.isUriLocalFile(trustStore)) {
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE,
Utils.resolveURI(trustStore).getPath)
} else {
throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" +
" for submit server must have no scheme, or scheme file://")
}
}
val securityManager = new SecurityManager(resolvedSparkConf)
(securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
}

private def configureSsl(
kubernetesClient: KubernetesClient,
driverSubmitSslOptions: SSLOptions,
isKeyStoreLocalFile: Boolean):
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
if (driverSubmitSslOptions.enabled) {
val sslSecretsMap = mutable.HashMap[String, String]()
val sslEnvs = mutable.Buffer[EnvVar]()
val secrets = mutable.Buffer[Secret]()
driverSubmitSslOptions.keyStore.foreach(store => {
val resolvedKeyStoreFile = if (isKeyStoreLocalFile) {
if (!store.isFile) {
throw new SparkException(s"KeyStore specified at $store is not a file or" +
s" does not exist.")
}
val keyStoreBytes = Files.toByteArray(store)
val keyStoreBase64 = Base64.encodeBase64String(keyStoreBytes)
sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64)
s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME"
} else {
store.getAbsolutePath
}
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_FILE)
.withValue(resolvedKeyStoreFile)
.build()
})
driverSubmitSslOptions.keyStorePassword.foreach(password => {
val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64)
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
.withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
.build()
})
driverSubmitSslOptions.keyPassword.foreach(password => {
val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64)
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
.withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
.build()
})
driverSubmitSslOptions.keyStoreType.foreach(storeType => {
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_TYPE)
.withValue(storeType)
.build()
})
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_USE_SSL)
.withValue("true")
.build()
val sslVolume = new VolumeBuilder()
.withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
.withNewSecret()
.withSecretName(sslSecretsName)
.endSecret()
.build()
val sslVolumeMount = new VolumeMountBuilder()
.withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
.withReadOnly(true)
.withMountPath(sslSecretsDirectory)
.build()
val sslSecrets = kubernetesClient.secrets().createNew()
.withNewMetadata()
.withName(sslSecretsName)
.endMetadata()
.withData(sslSecretsMap.asJava)
.withType("Opaque")
.done()
kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets)
secrets += sslSecrets
(sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
} else {
(Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]())
}
}

private def buildSubmitFailedErrorMessage(
kubernetesClient: KubernetesClient,
e: Throwable): String = {
Expand Down Expand Up @@ -688,8 +563,8 @@ private[spark] class Client(
private def buildDriverSubmissionClient(
kubernetesClient: KubernetesClient,
service: Service,
driverSubmitSslOptions: SSLOptions): KubernetesSparkRestApi = {
val urlScheme = if (driverSubmitSslOptions.enabled) {
sslConfiguration: SslConfiguration): KubernetesSparkRestApi = {
val urlScheme = if (sslConfiguration.sslOptions.enabled) {
"https"
} else {
logWarning("Submitting application details, application secret, and local" +
Expand All @@ -714,45 +589,18 @@ private[spark] class Client(
s"$urlScheme://${address.getAddress}:$servicePort"
}).toSet
require(nodeUrls.nonEmpty, "No nodes found to contact the driver!")
val (trustManager, sslContext): (X509TrustManager, SSLContext) =
if (driverSubmitSslOptions.enabled) {
buildSslConnectionConfiguration(driverSubmitSslOptions)
} else {
(null, SSLContext.getDefault)
}
HttpClientUtil.createClient[KubernetesSparkRestApi](
uris = nodeUrls,
maxRetriesPerServer = 3,
sslSocketFactory = sslContext.getSocketFactory,
trustContext = trustManager,
sslSocketFactory = sslConfiguration
.driverSubmitClientSslContext
.getSocketFactory,
trustContext = sslConfiguration
.driverSubmitClientTrustManager
.orNull,
connectTimeoutMillis = 5000)
}

private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions) = {
driverSubmitSslOptions.trustStore.map(trustStoreFile => {
val trustManagerFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm)
val trustStore = KeyStore.getInstance(
driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
if (!trustStoreFile.isFile) {
throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
s" does not exist or is not a file.")
}
Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
driverSubmitSslOptions.trustStorePassword match {
case Some(password) =>
trustStore.load(trustStoreStream, password.toCharArray)
case None => trustStore.load(trustStoreStream, null)
}
}
trustManagerFactory.init(trustStore)
val trustManagers = trustManagerFactory.getTrustManagers
val sslContext = SSLContext.getInstance("TLSv1.2")
sslContext.init(null, trustManagers, SECURE_RANDOM)
(trustManagers(0).asInstanceOf[X509TrustManager], sslContext)
}).getOrElse((null, SSLContext.getDefault))
}

private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = {
maybeLabels.map(labels => {
labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
Expand Down
Loading