From 07ee27dad780a0f54464d2c1ab9980b5260d7ef7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 25 Jan 2017 11:13:52 -0800 Subject: [PATCH 1/8] Support SSL when setting up the driver. The user can provide a keyStore to load onto the driver pod and the driver pod will use that keyStore to set up SSL on its server. --- .../deploy/rest/RestSubmissionServer.scala | 38 ++-- docs/running-on-kubernetes.md | 18 ++ .../spark/deploy/kubernetes/Client.scala | 176 ++++++++++++++++-- .../KubernetesSparkRestServer.scala | 63 ++++++- .../src/main/docker/driver/Dockerfile | 15 +- .../src/main/docker/executor/Dockerfile | 2 +- .../integrationtest/KubernetesSuite.scala | 33 ++++ .../integration-tests/ssl/keyStore.jks | Bin 0 -> 2272 bytes .../integration-tests/ssl/trustStore.jks | Bin 0 -> 982 bytes 9 files changed, 309 insertions(+), 36 deletions(-) create mode 100644 resource-managers/kubernetes/integration-tests/ssl/keyStore.jks create mode 100644 resource-managers/kubernetes/integration-tests/ssl/trustStore.jks diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala index b30c980e95a9a..ed24e582a4c06 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala @@ -20,15 +20,15 @@ package org.apache.spark.deploy.rest import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import scala.io.Source - import com.fasterxml.jackson.core.JsonProcessingException -import org.eclipse.jetty.server.{HttpConnectionFactory, Server, ServerConnector} +import org.eclipse.jetty.http.HttpVersion +import org.eclipse.jetty.server.{AbstractConnectionFactory, HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory} import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler} import org.json4s._ import org.json4s.jackson.JsonMethods._ -import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} +import org.apache.spark.{SSLOptions, SparkConf, SPARK_VERSION => sparkVersion} import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -50,7 +50,8 @@ import org.apache.spark.util.Utils private[spark] abstract class RestSubmissionServer( val host: String, val requestedPort: Int, - val masterConf: SparkConf) extends Logging { + val masterConf: SparkConf, + val sslOptions: SSLOptions = SSLOptions()) extends Logging { protected val submitRequestServlet: SubmitRequestServlet protected val killRequestServlet: KillRequestServlet protected val statusRequestServlet: StatusRequestServlet @@ -79,19 +80,32 @@ private[spark] abstract class RestSubmissionServer( * Return a 2-tuple of the started server and the bound port. 
*/
   private def doStart(startPort: Int): (Server, Int) = {
+    // TODO consider using JettyUtils#startServer to do this instead
     val threadPool = new QueuedThreadPool
     threadPool.setDaemon(true)
     val server = new Server(threadPool)
+    val resolvedConnectionFactories = sslOptions
+      .createJettySslContextFactory()
+      .map(sslFactory => {
+        val sslConnectionFactory = new SslConnectionFactory(
+          sslFactory, HttpVersion.HTTP_1_1.asString())
+        val rawHttpConfiguration = new HttpConfiguration()
+        rawHttpConfiguration.setSecureScheme("https")
+        rawHttpConfiguration.setSecurePort(startPort)
+        val rawHttpConnectionFactory = new HttpConnectionFactory(rawHttpConfiguration)
+        Array(sslConnectionFactory, rawHttpConnectionFactory)
+      }).getOrElse(Array(new HttpConnectionFactory()))
     val connector = new ServerConnector(
-      server,
-      null,
-      // Call this full constructor to set this, which forces daemon threads:
-      new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true),
-      null,
-      -1,
-      -1,
-      new HttpConnectionFactory())
+        server,
+        null,
+        // Call this full constructor to set this, which forces daemon threads:
+        new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true),
+        null,
+        -1,
+        -1,
+        resolvedConnectionFactories: _*)
     connector.setHost(host)
     connector.setPort(startPort)
     server.addConnector(connector)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 5a73b1ad1ea29..e25e189aa6d74 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -132,6 +132,24 @@ To specify a main application resource that is in the Docker image, and if it ha
     --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
     container:///home/applications/examples/example.jar
 
+### Setting Up SSL For Submitting the Driver
+
+When submitting to Kubernetes, a pod is started for the driver, and the pod starts an HTTP server. This HTTP server
+receives the driver's configuration, including uploaded driver jars, from the client before starting the application.
+Spark supports using SSL to encrypt the traffic in this bootstrapping process. It is recommended to configure this
+whenever possible.
+
+See the [security page](security.html) and [configuration](configuration.html) sections for more information on
+configuring SSL; use the prefix `spark.ssl.kubernetes.driverlaunch` for the SSL-related configuration fields when
+submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver
+pod while starting the application, set `spark.ssl.kubernetes.driverlaunch.trustStore`.
+
+The keyStore can be specified either as a file on the client machine or as a file on the container image's disk.
+Accordingly, `spark.ssl.kubernetes.driverlaunch.keyStore` can be a URI with a scheme of either `file:` or `container:`.
+A scheme of `file:` means the keyStore is located on the client machine; it is then mounted onto the driver container
+as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme `container:`, the
+file is assumed to already be on the container image's disk at the appropriate path.
+
 ### Spark Properties
 
 Below are some other common properties that are specific to Kubernetes.
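For example, a submission that enables SSL for the driver bootstrap as described in the SSL section above might look
like the following sketch. The master URL, main class, image names, file paths, and passwords are placeholders; the
`spark.ssl.kubernetes.driverlaunch.*` property names are the ones this change introduces (they are also exercised in
this patch's integration test):

    bin/spark-submit \
      --deploy-mode cluster \
      --class com.example.MyMainClass \
      --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
      --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \
      --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
      --conf spark.ssl.kubernetes.driverlaunch.enabled=true \
      --conf spark.ssl.kubernetes.driverlaunch.keyStore=file:///path/to/keyStore.jks \
      --conf spark.ssl.kubernetes.driverlaunch.keyStorePassword=<keystore-password> \
      --conf spark.ssl.kubernetes.driverlaunch.keyPassword=<key-password> \
      --conf spark.ssl.kubernetes.driverlaunch.trustStore=file:///path/to/trustStore.jks \
      --conf spark.ssl.kubernetes.driverlaunch.trustStorePassword=<truststore-password> \
      container:///home/applications/examples/example.jar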
Most of the other configurations are the same diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 65851f930377b..fd2940009d32e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -16,11 +16,13 @@ */ package org.apache.spark.deploy.kubernetes -import java.io.File -import java.security.SecureRandom +import java.io.{File, FileInputStream} +import java.security.{KeyStore, SecureRandom} import java.util.concurrent.{Executors, TimeoutException, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean +import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager} +import com.google.common.base.Charsets import com.google.common.io.Files import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder} import io.fabric8.kubernetes.api.model._ @@ -28,10 +30,11 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, Kub import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.DurationInt -import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException} +import org.apache.spark.{SPARK_VERSION, SecurityManager, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource} import org.apache.spark.deploy.rest.kubernetes._ import org.apache.spark.internal.Logging @@ -53,12 +56,17 @@ private[spark] class Client( .getOrElse("spark") private val kubernetesAppId = s"$appName-$launchTime" private val secretName = s"spark-submission-server-secret-$kubernetesAppId" + private val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId" + private val sslSecretsDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId-ssl" + private val sslSecretsName = s"spark-submission-server-ssl-$kubernetesAppId" private val driverLauncherSelectorValue = s"driver-launcher-$launchTime" private val driverDockerImage = sparkConf.get( "spark.kubernetes.driver.docker.image", s"spark-driver:$SPARK_VERSION") private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars") private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) + private val (driverLaunchSslOptions, isKeyStoreLocalFile) = parseDriverLaunchSslOptions() + private val secretBase64String = { val secretBytes = new Array[Byte](128) SECURE_RANDOM.nextBytes(secretBytes) @@ -95,10 +103,10 @@ private[spark] class Client( val k8ClientConfig = k8ConfBuilder.build Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig))(kubernetesClient => { - val secret = kubernetesClient.secrets().createNew() + val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() - .withName(secretName) - .endMetadata() + .withName(secretName) + .endMetadata() .withData(Map((SUBMISSION_SERVER_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() @@ -107,10 +115,9 @@ private[spark] class Client( DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue, SPARK_APP_NAME_LABEL -> appName) ++ 
parsedCustomLabels).asJava - val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava + val (sslEnvs, sslVolumes, sslVolumeMounts) = configureSsl(kubernetesClient) val containerPorts = configureContainerPorts() val submitCompletedFuture = SettableFuture.create[Boolean] - val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId" val submitPending = new AtomicBoolean(false) val podWatcher = new Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = { @@ -135,7 +142,7 @@ private[spark] class Client( .endMetadata() .withNewSpec() .withType("NodePort") - .withSelector(selectors) + .withSelector(resolvedSelectors) .withPorts(driverLauncherServicePort) .endSpec() .done() @@ -227,9 +234,10 @@ private[spark] class Client( .addNewVolume() .withName(s"spark-submission-secret-volume") .withNewSecret() - .withSecretName(secret.getMetadata.getName) + .withSecretName(submitServerSecret.getMetadata.getName) .endSecret() .endVolume + .addToVolumes(sslVolumes: _*) .withServiceAccount(serviceAccount) .addNewContainer() .withName(DRIVER_LAUNCHER_CONTAINER_NAME) @@ -240,6 +248,7 @@ private[spark] class Client( .withMountPath(secretDirectory) .withReadOnly(true) .endVolumeMount() + .addToVolumeMounts(sslVolumeMounts: _*) .addNewEnv() .withName("SPARK_SUBMISSION_SECRET_LOCATION") .withValue(s"$secretDirectory/$SUBMISSION_SERVER_SECRET_NAME") @@ -248,6 +257,7 @@ private[spark] class Client( .withName("SPARK_DRIVER_LAUNCHER_SERVER_PORT") .withValue(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT.toString) .endEnv() + .addToEnv(sslEnvs: _*) .withPorts(containerPorts.asJava) .endContainer() .endSpec() @@ -278,11 +288,115 @@ private[spark] class Client( .withLabels(resolvedSelectors) .watch(podWatcher)) { createDriverPod } } finally { - kubernetesClient.secrets().delete(secret) + kubernetesClient.secrets().delete(submitServerSecret) } }) } + private def parseDriverLaunchSslOptions(): (SSLOptions, Boolean) = { + val maybeKeyStore = sparkConf.getOption("spark.ssl.kubernetes.driverlaunch.keyStore") + val resolvedSparkConf = sparkConf.clone() + val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => { + val keyStoreURI = Utils.resolveURI(keyStore) + val isProvidedKeyStoreLocal = keyStoreURI.getScheme match { + case "file" | null => true + case "container" => false + case _ => throw new SparkException(s"Invalid KeyStore URI $keyStore; keyStore URI" + + " for submit server must have scheme file:// or container:// (no scheme defaults" + + " to file://)") + } + (isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath)) + }).getOrElse((true, Option.empty[String])) + resolvedKeyStore.foreach { + resolvedSparkConf.set("spark.ssl.kubernetes.driverlaunch.keyStore", _) + } + sparkConf.getOption("spark.ssl.kubernetes.driverlaunch.trustStore").foreach { trustStore => + val trustStoreURI = Utils.resolveURI(trustStore) + trustStoreURI.getScheme match { + case "file" | null => + resolvedSparkConf.set("spark.ssl.kubernetes.driverlaunch.trustStore", + trustStoreURI.getPath) + case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" + + " for submit server must have no scheme, or scheme file://") + } + } + val securityManager = new SecurityManager(resolvedSparkConf) + (securityManager.getSSLOptions("kubernetes.driverlaunch"), isLocalKeyStore) + } + + private def configureSsl(kubernetesClient: KubernetesClient) + : (Array[EnvVar], Array[Volume], Array[VolumeMount]) = { + if (driverLaunchSslOptions.enabled) { + val sslSecretsMap = 
mutable.HashMap[String, String]() + val sslEnvs = mutable.Buffer[EnvVar]() + driverLaunchSslOptions.keyStore.foreach(store => { + val resolvedKeyStoreFile = if (isKeyStoreLocalFile) { + if (!store.isFile) { + throw new SparkException(s"KeyStore specified at $store is not a file or" + + s" does not exist.") + } + val keyStoreBytes = Files.toByteArray(store) + val keyStoreBase64 = Base64.encodeBase64String(keyStoreBytes) + sslSecretsMap += (SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64) + s"$sslSecretsDirectory/$SSL_KEYSTORE_SECRET_NAME" + } else { + store.getAbsolutePath + } + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_KEYSTORE_FILE") + .withValue(resolvedKeyStoreFile) + .build() + }) + driverLaunchSslOptions.keyStorePassword.foreach(password => { + val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8)) + sslSecretsMap += (SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64) + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE") + .withValue(s"$sslSecretsDirectory/$SSL_KEYSTORE_PASSWORD_SECRET_NAME") + .build() + }) + driverLaunchSslOptions.keyPassword.foreach(password => { + val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8)) + sslSecretsMap += (SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64) + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE") + .withValue(s"$sslSecretsDirectory/$SSL_KEY_PASSWORD_SECRET_NAME") + .build() + }) + driverLaunchSslOptions.keyStoreType.foreach(storeType => { + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_KEYSTORE_TYPE") + .withValue(storeType) + .build() + }) + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_USE_SSL") + .withValue("true") + .build() + val sslSecrets = kubernetesClient.secrets().createNew() + .withNewMetadata() + .withName(sslSecretsName) + .endMetadata() + .withData(sslSecretsMap.asJava) + .withType("Opaque") + .done() + val sslVolume = new VolumeBuilder() + .withName("spark-submission-server-ssl-secrets") + .withNewSecret() + .withSecretName(sslSecrets.getMetadata.getName) + .endSecret() + .build() + val sslVolumeMount = new VolumeMountBuilder() + .withName("spark-submission-server-ssl-secrets") + .withReadOnly(true) + .withMountPath(sslSecretsDirectory) + .build() + (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount)) + } else { + (Array[EnvVar](), Array[Volume](), Array[VolumeMount]()) + } + } + private def getSubmitErrorMessage( kubernetesClient: DefaultKubernetesClient, e: TimeoutException): String = { @@ -403,8 +517,41 @@ private[spark] class Client( // TODO be resilient to node failures and try all of them val node = kubernetesClient.nodes.list.getItems.asScala.head val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress - val url = s"http://$nodeAddress:$servicePort" - HttpClientUtil.createClient[KubernetesSparkRestApi](uri = url) + val urlScheme = if (driverLaunchSslOptions.enabled) "https" else "http" + val (trustManager: X509TrustManager, sslContext: SSLContext) = + if (driverLaunchSslOptions.enabled) { + driverLaunchSslOptions.trustStore match { + case Some(trustStoreFile) => + val trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm) + val trustStore = KeyStore.getInstance( + driverLaunchSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)) + if (!trustStoreFile.isFile) { + throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" + + s" does not exist or is not a 
file.") + } + Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream => + driverLaunchSslOptions.trustStorePassword match { + case Some(password) => + trustStore.load(trustStoreStream, password.toCharArray) + case None => trustStore.load(trustStoreStream, null) + } + } + trustManagerFactory.init(trustStore) + val trustManagers = trustManagerFactory.getTrustManagers + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(null, trustManagers, SECURE_RANDOM) + (trustManagers(0).asInstanceOf[X509TrustManager], sslContext) + case None => (null, SSLContext.getDefault) + } + } else { + (null, SSLContext.getDefault) + } + val url = s"$urlScheme://$nodeAddress:$servicePort" + HttpClientUtil.createClient[KubernetesSparkRestApi]( + url, + sslSocketFactory = sslContext.getSocketFactory, + trustContext = trustManager) } private def parseCustomLabels(labels: String): Map[String, String] = { @@ -428,6 +575,9 @@ private[spark] class Client( private[spark] object Client extends Logging { private val SUBMISSION_SERVER_SECRET_NAME = "spark-submission-server-secret" + private val SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore" + private val SSL_KEYSTORE_PASSWORD_SECRET_NAME = "spark-submission-server-keystore-password" + private val SSL_KEY_PASSWORD_SECRET_NAME = "spark-submission-server-key-password" private val DRIVER_LAUNCHER_SELECTOR_LABEL = "driver-launcher-selector" private val DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT = 7077 private val DEFAULT_DRIVER_PORT = 7078 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 08ddbaf5e50dc..77b249c38ff2f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -21,20 +21,26 @@ import java.net.URI import java.util.concurrent.CountDownLatch import javax.servlet.http.{HttpServletRequest, HttpServletResponse} +import com.google.common.base.Charsets import com.google.common.io.Files import org.apache.commons.codec.binary.Base64 import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf} +import org.apache.spark.{SPARK_VERSION, SSLOptions, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.rest._ import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} private case class KubernetesSparkRestServerArguments( - val host: Option[String] = None, - val port: Option[Int] = None, - val secretFile: Option[String] = None) { + host: Option[String] = None, + port: Option[Int] = None, + useSsl: Boolean = false, + secretFile: Option[String] = None, + keyStoreFile: Option[String] = None, + keyStorePasswordFile: Option[String] = None, + keyStoreType: Option[String] = None, + keyPasswordFile: Option[String] = None) { def validate(): KubernetesSparkRestServerArguments = { require(host.isDefined, "Hostname not set via --hostname.") require(port.isDefined, "Port not set via --port") @@ -55,9 +61,24 @@ private object KubernetesSparkRestServerArguments { case "--port" :: value :: tail => args = tail resolvedArguments.copy(port = Some(value.toInt)) + case "--use-ssl" :: value :: tail 
=> + args = tail + resolvedArguments.copy(useSsl = value.toBoolean) case "--secret-file" :: value :: tail => args = tail resolvedArguments.copy(secretFile = Some(value)) + case "--keystore-file" :: value :: tail => + args = tail + resolvedArguments.copy(keyStoreFile = Some(value)) + case "--keystore-password-file" :: value :: tail => + args = tail + resolvedArguments.copy(keyStorePasswordFile = Some(value)) + case "--keystore-type" :: value :: tail => + args = tail + resolvedArguments.copy(keyStoreType = Some(value)) + case "--keystore-key-password-file" :: value :: tail => + args = tail + resolvedArguments.copy(keyPasswordFile = Some(value)) // TODO polish usage message case Nil => resolvedArguments case unknown => throw new IllegalStateException(s"Unknown argument(s) found: $unknown") @@ -78,8 +99,9 @@ private[spark] class KubernetesSparkRestServer( port: Int, conf: SparkConf, expectedApplicationSecret: Array[Byte], - shutdownLock: CountDownLatch) - extends RestSubmissionServer(host, port, conf) { + shutdownLock: CountDownLatch, + sslOptions: SSLOptions = new SSLOptions) + extends RestSubmissionServer(host, port, conf, sslOptions) { private val SERVLET_LOCK = new Object private val javaExecutable = s"${System.getenv("JAVA_HOME")}/bin/java" @@ -257,6 +279,22 @@ private[spark] object KubernetesSparkRestServer { throw new IllegalArgumentException(s"Secret file specified by --secret-file" + " is not a file, or does not exist.") } + val sslOptions = if (parsedArguments.useSsl) { + val keyStorePassword = parsedArguments + .keyStorePasswordFile + .map(fileToUtf8String) + val keyPassword = parsedArguments + .keyPasswordFile + .map(fileToUtf8String) + new SSLOptions( + enabled = true, + keyStore = parsedArguments.keyStoreFile.map(new File(_)), + keyStoreType = parsedArguments.keyStoreType, + keyStorePassword = keyStorePassword, + keyPassword = keyPassword) + } else { + new SSLOptions + } val secretBytes = Files.toByteArray(secretFile) val sparkConf = new SparkConf(true) val server = new KubernetesSparkRestServer( @@ -264,7 +302,8 @@ private[spark] object KubernetesSparkRestServer { parsedArguments.port.get, sparkConf, secretBytes, - barrier) + barrier, + sslOptions) server.start() ShutdownHookManager.addShutdownHook(() => { try { @@ -275,5 +314,15 @@ private[spark] object KubernetesSparkRestServer { }) barrier.await() } + + private def fileToUtf8String(filePath: String) = { + val passwordFile = new File(filePath) + if (!passwordFile.isFile) { + throw new IllegalArgumentException("KeyStore password file does not exist or " + + "is a directory.") + } + val passwordBytes = Files.toByteArray(passwordFile) + new String(passwordBytes, Charsets.UTF_8) + } } diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile index c2b562a39b572..922ae8ba6a4f1 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -1,4 +1,4 @@ -FROM anapsix/alpine-java:8 +FROM anapsix/alpine-java:8_jdk_unlimited # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. 
E.g.: @@ -17,5 +17,14 @@ ENV SPARK_HOME /opt/spark WORKDIR /opt/spark -# This class will also require setting a secret via the SPARK_APP_SECRET environment variable -CMD exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer --hostname $HOSTNAME --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT --secret-file $SPARK_SUBMISSION_SECRET_LOCATION +CMD SSL_ARGS="" && \ + if ! [ -z ${SPARK_SUBMISSION_USE_SSL+x} ]; then SSL_ARGS="$SSL_ARGS --use-ssl $SPARK_SUBMISSION_USE_SSL"; fi && \ + if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-file $SPARK_SUBMISSION_KEYSTORE_FILE"; fi && \ + if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_TYPE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-type $SPARK_SUBMISSION_KEYSTORE_TYPE"; fi && \ + if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-password-file $SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"; fi && \ + if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \ + exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \ + --hostname $HOSTNAME \ + --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT \ + --secret-file $SPARK_SUBMISSION_SECRET_LOCATION \ + ${SSL_ARGS} \ No newline at end of file diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile index 35f0ca3f645b9..4bc6ae7a659c8 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile @@ -1,4 +1,4 @@ -FROM anapsix/alpine-java:8 +FROM anapsix/alpine-java:8_jdk_unlimited # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. 
E.g.: diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 6a92ae1cba49f..16776c2b70ae4 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -60,6 +60,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .getOrElse(throw new IllegalStateException("Expected to find spark-examples jar; was the" + " pre-integration-test phase run?")) + private val KEYSTORE_FILE = Paths.get("ssl", "keyStore.jks") + .toFile + .getAbsolutePath + private val TRUSTSTORE_FILE = Paths.get("ssl", "trustStore.jks") + .toFile + .getAbsolutePath + private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) private val MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + @@ -281,4 +288,30 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1") assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2") } + + test("Enable SSL on the driver submit server") { + val args = Array( + "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", + "--deploy-mode", "cluster", + "--kubernetes-namespace", NAMESPACE, + "--name", "spark-pi", + "--executor-memory", "512m", + "--executor-cores", "1", + "--num-executors", "1", + "--upload-jars", HELPER_JAR, + "--class", MAIN_CLASS, + "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", + "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", + "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", + "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", + "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", + "--conf", "spark.ssl.kubernetes.driverlaunch.enabled=true", + "--conf", s"spark.ssl.kubernetes.driverlaunch.keyStore=file://$KEYSTORE_FILE", + "--conf", "spark.ssl.kubernetes.driverlaunch.keyStorePassword=changeit", + "--conf", "spark.ssl.kubernetes.driverlaunch.keyPassword=changeit", + "--conf", s"spark.ssl.kubernetes.driverlaunch.trustStore=file://$TRUSTSTORE_FILE", + "--conf", s"spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit", + EXAMPLES_JAR) + SparkSubmit.main(args) + } } diff --git a/resource-managers/kubernetes/integration-tests/ssl/keyStore.jks b/resource-managers/kubernetes/integration-tests/ssl/keyStore.jks new file mode 100644 index 0000000000000000000000000000000000000000..7e90367a94f979286e6fa2350628d32f60dde13b GIT binary patch literal 2272 zcmc(g`8N~{7sqEqk->~D8D-1Pj3wC~OAR8%Hg>Z1RLU|_G?JJa$)06o7b6DQk|ix- zJdZsyge=jBm<(l)=uzjq@B5t7AMl=Y?>YB=ez@nJ@4e^VJ@y_O004mY9r(+^0se&j z*kv$Q_C5f>0YVZXdq^lJw-P510#pPE0D&9;2ob_6wYEHQSv^L-5LV`%ju?`5fAHO^ zhOOOw!^|@o-9*_513v&{xU@vsW_TTib(1;J^-wq33co{u&@D-uBtwh|)x)fhQli?e*nV3VUs1;-= zUwY+i&FE|`2~b$8NF9~4U}JY}j$DtUp6R3NhCb8BAH!Gq^x$WUe|%aImG8T zPj~7`bNL35oHUF5Ac`td62@ZYn01`|hAUU3S?3c{1^g@Kj`sJfL%zOnUMYt!HwZT&3*1zZXY@&s&#zzYjp*C-Xd6-f6T?^i-rlW# 
zM8Q7Dd)qXOK9nIy_qHsY9KIm2BlPjZB<%-!5%t{v0;1D2B$z4O+KyotHLKY}3n|&M zO-)I(hPc_H+K$+^J~V-NQ-bnNCR(7%qQVMxn8@tX%f!3*Q^)a9*7;|k({>ut@H6V8 zJ`JM*=6V8N{qf$_Pb9abAhp3Vf!ojJ`W=g%Lal?tRv|GASV;zg4e#gIs6eH)Uj~Te z-9L0-fL?}Pb6Bn8lF@Z^Dp2|=dzMG{eJ|L!1@L~H%dV8$PM!jjunAKz3 zLCv$Z%v$XBsOGBeTw;q*VA%#Q9`lZgS01p5u&ZtFK0bSo$edN@8JWPc5Rp_pdBzglUq38> zB`wVIDx&PR%sPe_!nf-P>W8-^;Of-E4?yZ1@r`-5ct4Qy{dU7Ir)yp+${M^GPvUw#b=&u&~7Oh$LDtA zWBz1Sv%@WH=&quf>bh4mra<&)c5&X#q;=BYo;x&mB1VO^wh|GAQ0uB4(8czBE}5z0 zD=OL6L0;gPW_bLDPEYqVl({GxXnFmk(~Hff^F#PM*(K zYC8ItutCtufm7U}Cy$=rx&N$WP20tK4fkBLHeCx|*h)U0my!L+Bb{nKpYy}nvr*3^ zKFcy(hch6sOxar5XS-%u*JZD(Y&^wuT$^=~Nw;BHXfmJukx|4UdN~E*;HJ4g?lBz} z=^j9}9^T9BwP?7~cBn`6^!WWow0+x@I)?8pJ5gjAF8h(Ta_`q!%vE#yzrbp4fT{x|*q zG(`kT{ac*<-vK59`2a{FkQ+n<0s$#+vh&yQIELif%4tTh!;wa#@0_Rny6vgzmsx-> z@ho_~o#l?!lotnLKqg+`yM>pY-}~S>3VF<@R_gg+ysyFB+8H>W+4w9=2KwuyMSggK3KEC~_A_O1@keqiRt!2J`NW@P7g$P3g19RRB z^-V10p3VtJ9};8~rA2}%|5kN><@Qwt0=$b_O~?P1JsKpfU~3WoZ6WWYO?XwQYErpb z3q$-ZFzAW@#p-jLYB6v^g!`r5h!AtFK5nAo?l3FrG4K%WoT3IgLiX*0&a>Xa2NU=F ziE^v_WKF#ZB|p}~!}8^F49>)@`91c%`IEy4|(s-NUj79I#qLp2e=z= zw5()$O$W$Jd4!q9Ir`)-C7Sj`*~FFRgPpo~I?6`o?G?go6{*#NkFa+(o<6M1!R$^o z@}wFq5e;O_e&B;D!e`fs&LjgYtMIDYQ-J?1GuP0+R-;cHEfsq8$hjg^!%1g(t|U6j vBsfcNQC|MA&oXL~a|+K)Su>T}s6^+#(MpccWyJDIR-YYolMr#|c0TqGgg)u| literal 0 HcmV?d00001 diff --git a/resource-managers/kubernetes/integration-tests/ssl/trustStore.jks b/resource-managers/kubernetes/integration-tests/ssl/trustStore.jks new file mode 100644 index 0000000000000000000000000000000000000000..02e69dd7cf25de0fb68a46bbb39258ed2aa2732e GIT binary patch literal 982 zcmezO_TO6u1_mY|W(3pB*{PL4cI4$wCItr82t88+O9lq!Sq4qa(+rxJ>J~6FF)}f+ zD8DFbGT>$7)N1o+`_9YA$j!=NP;4k*z{kcM%EBhh9-5b(mtUR-6XC!RVaE{R!Vuv` zh=?1Af=p%><}R1jAqNg4 zD+6;ABR_*d6C)Q>6C)$Tg!8lJeJL)wr1<5N%cX)4>Fo}G*ql>N1+O&9dByOd`<3vP zAn)I%_Y+uUFKBnK>->A`G{fYi-AC4h6-l+<+IU|5S9nT@KHt1IvsayKz4vX^bh~01 z(^ktLuA6>-570j3kjc$_PJM^k8NVnt^G@L^_gkL}c2sn1xaRrsX0)JAQR2lN(R;kBchn^gg&Y-Ja>Ya(!rJkA36h(3$&MFC+!=O?t*G zx6S6gXrb{bgXeQsCR*Ll(07y&x_Wuq`uR-Ej0}v6jeya|4~$h=K^6`Jwsw{SE0R+T zWI+OaEMhDod9&|{p1t#Kj?3&SS00f|ize4GFF_7fV3GrdDkDSUq62QXkE_b(D{BUN zb^m-h_ent2y5)L(tK4^AlDW#5zd1E?ef)R*HsQ*en5;83g&wA7xtg^8UVGKBmXYU3 zyq2+Rjq24ICwvmcID7x4HmQFWm}X*qM?2-!LP?EP>X+Dhf4unjw)4e?XYL2Dr7Za% zp!s7VZ~J4#PkEQa*Rxk;$2fl5{b_o_gKQ0@*mAe7u%tQfdfiSp1avK*#~gl=`{;`6 zFM>781GQGJ7g-kC@@?zFHM9Ny-rvqW$>Cj-t&02q;{4TBPro)rwAzNgT2P|Dl_9m> zJ*w>3=SFAw<)ZtSMXW9}4!3;&Y-wYIQ^8E@HyRp}N$(BsvfVF!G~tVz`nNT%?B{L& V?Rs6^S+>UeMSXbq$wI{!1^`k5Y%u@; literal 0 HcmV?d00001 From 21d5bc40d14930834f0842cc36f46b756a64e506 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 25 Jan 2017 16:00:50 -0800 Subject: [PATCH 2/8] Clean up SSL secrets after finishing submission. We don't need to persist these after the pod has them mounted and is running already. 
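A Kubernetes secret volume remains available inside a pod that has already mounted it even after the backing
Secret object is deleted, so the Secret only needs to live until the driver pod is up. A minimal sketch of the
resulting create/use/delete pattern; the names are illustrative, and only fabric8 calls already used in this
change appear:

    import io.fabric8.kubernetes.api.model.Secret
    import io.fabric8.kubernetes.client.KubernetesClient
    import scala.collection.JavaConverters._

    object SecretCleanupSketch {
      def submitWithEphemeralSecret(
          kubernetesClient: KubernetesClient,
          sslSecretsMap: Map[String, String]): Unit = {
        // Create the SSL secret just before launching the driver pod.
        val sslSecret: Secret = kubernetesClient.secrets().createNew()
          .withNewMetadata()
            .withName("spark-submission-server-ssl-example") // illustrative name
            .endMetadata()
          .withData(sslSecretsMap.asJava) // base64-encoded keyStore and password files
          .withType("Opaque")
          .done()
        try {
          // ... create the driver pod that mounts sslSecret and wait for submission ...
        } finally {
          // The running pod keeps its mounted copy; the API object is no longer needed.
          kubernetesClient.secrets().delete(sslSecret)
        }
      }
    }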
--- .../org/apache/spark/deploy/kubernetes/Client.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 2dbf8aa3dca9c..0e20d541ad2f2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -112,12 +112,12 @@ private[spark] class Client( .withData(Map((SUBMISSION_SERVER_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() + val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient) try { val resolvedSelectors = (Map( DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue, SPARK_APP_NAME_LABEL -> appName) ++ parsedCustomLabels).asJava - val (sslEnvs, sslVolumes, sslVolumeMounts) = configureSsl(kubernetesClient) val containerPorts = configureContainerPorts() val submitCompletedFuture = SettableFuture.create[Boolean] val submitPending = new AtomicBoolean(false) @@ -292,6 +292,7 @@ private[spark] class Client( .watch(podWatcher)) { createDriverPod } } finally { kubernetesClient.secrets().delete(submitServerSecret) + kubernetesClient.secrets().delete(sslSecrets: _*) } }) } @@ -328,10 +329,11 @@ private[spark] class Client( } private def configureSsl(kubernetesClient: KubernetesClient) - : (Array[EnvVar], Array[Volume], Array[VolumeMount]) = { + : (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { if (driverLaunchSslOptions.enabled) { val sslSecretsMap = mutable.HashMap[String, String]() val sslEnvs = mutable.Buffer[EnvVar]() + val secrets = mutable.Buffer[Secret]() driverLaunchSslOptions.keyStore.foreach(store => { val resolvedKeyStoreFile = if (isKeyStoreLocalFile) { if (!store.isFile) { @@ -383,6 +385,7 @@ private[spark] class Client( .withData(sslSecretsMap.asJava) .withType("Opaque") .done() + secrets += sslSecrets val sslVolume = new VolumeBuilder() .withName("spark-submission-server-ssl-secrets") .withNewSecret() @@ -394,9 +397,9 @@ private[spark] class Client( .withReadOnly(true) .withMountPath(sslSecretsDirectory) .build() - (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount)) + (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray) } else { - (Array[EnvVar](), Array[Volume](), Array[VolumeMount]()) + (Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]()) } } From c91b308b078a29e61090aedfeac25ed2d37399cc Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 25 Jan 2017 17:26:39 -0800 Subject: [PATCH 3/8] Fix compilation error --- .../spark/deploy/kubernetes/Client.scala | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 0e20d541ad2f2..dd4ad9304e86f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -524,32 +524,30 @@ private[spark] class Client( val node = kubernetesClient.nodes.list.getItems.asScala.head val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress val urlScheme = if 
(driverLaunchSslOptions.enabled) "https" else "http" - val (trustManager: X509TrustManager, sslContext: SSLContext) = + val (trustManager, sslContext): (X509TrustManager, SSLContext) = if (driverLaunchSslOptions.enabled) { - driverLaunchSslOptions.trustStore match { - case Some(trustStoreFile) => - val trustManagerFactory = TrustManagerFactory.getInstance( - TrustManagerFactory.getDefaultAlgorithm) - val trustStore = KeyStore.getInstance( - driverLaunchSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)) - if (!trustStoreFile.isFile) { - throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" + - s" does not exist or is not a file.") - } - Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream => - driverLaunchSslOptions.trustStorePassword match { - case Some(password) => - trustStore.load(trustStoreStream, password.toCharArray) - case None => trustStore.load(trustStoreStream, null) - } + driverLaunchSslOptions.trustStore.map(trustStoreFile => { + val trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm) + val trustStore = KeyStore.getInstance( + driverLaunchSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)) + if (!trustStoreFile.isFile) { + throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" + + s" does not exist or is not a file.") + } + Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream => + driverLaunchSslOptions.trustStorePassword match { + case Some(password) => + trustStore.load(trustStoreStream, password.toCharArray) + case None => trustStore.load(trustStoreStream, null) } - trustManagerFactory.init(trustStore) - val trustManagers = trustManagerFactory.getTrustManagers - val sslContext = SSLContext.getInstance("TLSv1.2") - sslContext.init(null, trustManagers, SECURE_RANDOM) - (trustManagers(0).asInstanceOf[X509TrustManager], sslContext) - case None => (null, SSLContext.getDefault) - } + } + trustManagerFactory.init(trustStore) + val trustManagers = trustManagerFactory.getTrustManagers + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(null, trustManagers, SECURE_RANDOM) + (trustManagers(0).asInstanceOf[X509TrustManager], sslContext) + }).getOrElse((null, SSLContext.getDefault)) } else { (null, SSLContext.getDefault) } From a43367c5b774778b67480d1c8ccbeda42bee1088 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 25 Jan 2017 17:40:15 -0800 Subject: [PATCH 4/8] Revert image change --- .../docker-minimal-bundle/src/main/docker/driver/Dockerfile | 4 ++-- .../docker-minimal-bundle/src/main/docker/executor/Dockerfile | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile index 922ae8ba6a4f1..420a4cdc33991 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -1,4 +1,4 @@ -FROM anapsix/alpine-java:8_jdk_unlimited +FROM anapsix/alpine-java:8 # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. 
E.g.: @@ -27,4 +27,4 @@ CMD SSL_ARGS="" && \ --hostname $HOSTNAME \ --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT \ --secret-file $SPARK_SUBMISSION_SECRET_LOCATION \ - ${SSL_ARGS} \ No newline at end of file + ${SSL_ARGS} diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile index 4bc6ae7a659c8..35f0ca3f645b9 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile @@ -1,4 +1,4 @@ -FROM anapsix/alpine-java:8_jdk_unlimited +FROM anapsix/alpine-java:8 # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. E.g.: From 866d90b9ef052d4aea151fd7788429a414ed9dc5 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 26 Jan 2017 12:56:55 -0800 Subject: [PATCH 5/8] Address comments --- .../deploy/rest/RestSubmissionServer.scala | 6 +++--- .../spark/deploy/kubernetes/Client.scala | 15 +++++++++++---- .../kubernetes/KubernetesSparkRestServer.scala | 18 +++++++++--------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala index ed24e582a4c06..524726c2ccf92 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala @@ -19,16 +19,16 @@ package org.apache.spark.deploy.rest import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} -import scala.io.Source import com.fasterxml.jackson.core.JsonProcessingException import org.eclipse.jetty.http.HttpVersion -import org.eclipse.jetty.server.{AbstractConnectionFactory, HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory} +import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory} import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler} import org.json4s._ import org.json4s.jackson.JsonMethods._ +import scala.io.Source -import org.apache.spark.{SSLOptions, SparkConf, SPARK_VERSION => sparkVersion} +import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SSLOptions} import org.apache.spark.internal.Logging import org.apache.spark.util.Utils diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 8591e46649209..fe1a817a6a792 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -34,7 +34,7 @@ import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.DurationInt -import org.apache.spark.{SPARK_VERSION, SecurityManager, SparkConf, SparkException, SSLOptions} +import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} import 
org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource} import org.apache.spark.deploy.rest.kubernetes._ import org.apache.spark.internal.Logging @@ -61,7 +61,7 @@ private[spark] class Client( private val sslSecretsName = s"spark-submission-server-ssl-$kubernetesAppId" private val driverLauncherSelectorValue = s"driver-launcher-$launchTime" private val driverDockerImage = sparkConf.get( - "spark.kubernetes.driver.docker.image", s"spark-driver:$SPARK_VERSION") + "spark.kubernetes.driver.docker.image", s"spark-driver:$sparkVersion") private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars") private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverLaunchTimeoutSecs = sparkConf.getTimeAsSeconds( @@ -223,7 +223,7 @@ private[spark] class Client( private def configureSsl(kubernetesClient: KubernetesClient) : (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { - if (driverLaunchSslOptions.enabled) { + if (!driverLaunchSslOptions.enabled) { val sslSecretsMap = mutable.HashMap[String, String]() val sslEnvs = mutable.Buffer[EnvVar]() val secrets = mutable.Buffer[Secret]() @@ -525,7 +525,14 @@ private[spark] class Client( // TODO be resilient to node failures and try all of them val node = kubernetesClient.nodes.list.getItems.asScala.head val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress - val urlScheme = if (driverLaunchSslOptions.enabled) "https" else "http" + val urlScheme = if (driverLaunchSslOptions.enabled) { + "https" + } else { + logWarning("Submitting application details and local jars to the cluster" + + " over an insecure connection. Consider configuring SSL to secure" + + " this step.") + "http" + } val (trustManager, sslContext): (X509TrustManager, SSLContext) = if (driverLaunchSslOptions.enabled) { driverLaunchSslOptions.trustStore.map(trustStoreFile => { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 77b249c38ff2f..be028e84a7d40 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -27,7 +27,7 @@ import org.apache.commons.codec.binary.Base64 import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SPARK_VERSION, SSLOptions, SecurityManager, SparkConf} +import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.rest._ import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} @@ -61,12 +61,12 @@ private object KubernetesSparkRestServerArguments { case "--port" :: value :: tail => args = tail resolvedArguments.copy(port = Some(value.toInt)) - case "--use-ssl" :: value :: tail => - args = tail - resolvedArguments.copy(useSsl = value.toBoolean) case "--secret-file" :: value :: tail => args = tail resolvedArguments.copy(secretFile = Some(value)) + case "--use-ssl" :: value :: tail => + args = tail + resolvedArguments.copy(useSsl = value.toBoolean) case "--keystore-file" :: value :: tail => args 
= tail resolvedArguments.copy(keyStoreFile = Some(value)) @@ -219,7 +219,7 @@ private[spark] class KubernetesSparkRestServer( response.success = true response.submissionId = null response.message = "success" - response.serverSparkVersion = SPARK_VERSION + response.serverSparkVersion = sparkVersion response } case unexpected => @@ -282,10 +282,10 @@ private[spark] object KubernetesSparkRestServer { val sslOptions = if (parsedArguments.useSsl) { val keyStorePassword = parsedArguments .keyStorePasswordFile - .map(fileToUtf8String) + .map(fileToUtf8String(_, "KeyStore Password file")) val keyPassword = parsedArguments .keyPasswordFile - .map(fileToUtf8String) + .map(fileToUtf8String(_, "Key Password file")) new SSLOptions( enabled = true, keyStore = parsedArguments.keyStoreFile.map(new File(_)), @@ -315,10 +315,10 @@ private[spark] object KubernetesSparkRestServer { barrier.await() } - private def fileToUtf8String(filePath: String) = { + private def fileToUtf8String(filePath: String, fileType: String) = { val passwordFile = new File(filePath) if (!passwordFile.isFile) { - throw new IllegalArgumentException("KeyStore password file does not exist or " + + throw new SparkException(s"$fileType at $filePath does not exist or " + "is a directory.") } val passwordBytes = Files.toByteArray(passwordFile) From c03fbbbbc3d820c12c8ebf78fe0e48fec202ec21 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 26 Jan 2017 17:10:28 -0800 Subject: [PATCH 6/8] Programmatically generate certificates for integration tests. --- pom.xml | 7 +- .../spark/deploy/kubernetes/Client.scala | 6 +- .../kubernetes/integration-tests/pom.xml | 4 + .../integrationtest/KubernetesSuite.scala | 24 ++++-- .../integrationtest/sslutil/SSLUtils.scala | 80 ++++++++++++++++++ .../integration-tests/ssl/keyStore.jks | Bin 2272 -> 0 bytes .../integration-tests/ssl/trustStore.jks | Bin 982 -> 0 bytes 7 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala delete mode 100644 resource-managers/kubernetes/integration-tests/ssl/keyStore.jks delete mode 100644 resource-managers/kubernetes/integration-tests/ssl/trustStore.jks diff --git a/pom.xml b/pom.xml index 74fb7ce064457..0231dac081cda 100644 --- a/pom.xml +++ b/pom.xml @@ -137,6 +137,7 @@ 1.8.1 1.6.0 8.18.0 + 1.52 9.2.16.v20160414 3.1.0 0.8.0 @@ -336,7 +337,11 @@ okhttp 3.4.1 - + + org.bouncycastle + bcpkix-jdk15on + ${bouncycastle.version} + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index fe1a817a6a792..87cf374b9920d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -221,9 +221,9 @@ private[spark] class Client( (securityManager.getSSLOptions("kubernetes.driverlaunch"), isLocalKeyStore) } - private def configureSsl(kubernetesClient: KubernetesClient) - : (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { - if (!driverLaunchSslOptions.enabled) { + private def configureSsl(kubernetesClient: KubernetesClient): + (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { + if (driverLaunchSslOptions.enabled) { val sslSecretsMap = mutable.HashMap[String, String]() val sslEnvs = 
mutable.Buffer[EnvVar]() val secrets = mutable.Buffer[Secret]() diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index 569527de8e300..f6a322f18cd75 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -106,6 +106,10 @@ + + org.bouncycastle + bcpkix-jdk15on + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 16776c2b70ae4..08377db742237 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.deploy.kubernetes.integrationtest +import java.io.File import java.nio.file.Paths import java.util.UUID import java.util.concurrent.TimeUnit @@ -36,6 +37,7 @@ import org.apache.spark.deploy.kubernetes.Client import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 +import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} import org.apache.spark.util.Utils @@ -60,13 +62,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .getOrElse(throw new IllegalStateException("Expected to find spark-examples jar; was the" + " pre-integration-test phase run?")) - private val KEYSTORE_FILE = Paths.get("ssl", "keyStore.jks") - .toFile - .getAbsolutePath - private val TRUSTSTORE_FILE = Paths.get("ssl", "trustStore.jks") - .toFile - .getAbsolutePath - private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) private val MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + @@ -74,6 +69,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { private val NAMESPACE = UUID.randomUUID().toString.replaceAll("-", "") private var minikubeKubernetesClient: KubernetesClient = _ private var clientConfig: Config = _ + private var keyStoreFile: File = _ + private var trustStoreFile: File = _ override def beforeAll(): Unit = { Minikube.startMinikube() @@ -85,6 +82,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .done() minikubeKubernetesClient = Minikube.getKubernetesClient.inNamespace(NAMESPACE) clientConfig = minikubeKubernetesClient.getConfiguration + val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair( + Minikube.getMinikubeIp, + "changeit", + "changeit", + "changeit") + keyStoreFile = keyStore + trustStoreFile = trustStore } before { @@ -306,10 +310,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", "--conf", "spark.ssl.kubernetes.driverlaunch.enabled=true", - "--conf", 
s"spark.ssl.kubernetes.driverlaunch.keyStore=file://$KEYSTORE_FILE", + "--conf", "spark.ssl.kubernetes.driverlaunch.keyStore=" + + s"file://${keyStoreFile.getAbsolutePath}", "--conf", "spark.ssl.kubernetes.driverlaunch.keyStorePassword=changeit", "--conf", "spark.ssl.kubernetes.driverlaunch.keyPassword=changeit", - "--conf", s"spark.ssl.kubernetes.driverlaunch.trustStore=file://$TRUSTSTORE_FILE", + "--conf", "spark.ssl.kubernetes.driverlaunch.trustStore=" + + s"file://${trustStoreFile.getAbsolutePath}", "--conf", s"spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit", EXAMPLES_JAR) SparkSubmit.main(args) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala new file mode 100644 index 0000000000000..bde7b43226660 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.integrationtest.sslutil + +import java.io.{File, FileOutputStream} +import java.math.BigInteger +import java.nio.file.Files +import java.security.{KeyPairGenerator, KeyStore, SecureRandom} +import java.util.{Calendar, Random} +import javax.security.auth.x500.X500Principal + +import org.bouncycastle.asn1.x509.{Extension, GeneralName, GeneralNames} +import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3CertificateBuilder} +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder + +import org.apache.spark.util.Utils + +private[spark] object SSLUtils { + + def generateKeyStoreTrustStorePair( + ipAddress: String, + keyStorePassword: String, + keyPassword: String, + trustStorePassword: String): (File, File) = { + val keyPairGenerator = KeyPairGenerator.getInstance("RSA") + keyPairGenerator.initialize(512) + val keyPair = keyPairGenerator.generateKeyPair() + val selfPrincipal = new X500Principal(s"cn=$ipAddress") + val currentDate = Calendar.getInstance + val validForOneHundredYears = Calendar.getInstance + validForOneHundredYears.add(Calendar.YEAR, 100) + val certificateBuilder = new JcaX509v3CertificateBuilder( + selfPrincipal, + new BigInteger(4096, new Random()), + currentDate.getTime, + validForOneHundredYears.getTime, + selfPrincipal, + keyPair.getPublic) + certificateBuilder.addExtension(Extension.subjectAlternativeName, false, + new GeneralNames(new GeneralName(GeneralName.iPAddress, ipAddress))) + val signer = new JcaContentSignerBuilder("SHA1WithRSA") + .setSecureRandom(new SecureRandom()) + .build(keyPair.getPrivate) + val bcCertificate = certificateBuilder.build(signer) + val jcaCertificate = new JcaX509CertificateConverter().getCertificate(bcCertificate) + val keyStore = KeyStore.getInstance("JKS") + keyStore.load(null, null) + keyStore.setKeyEntry("key", keyPair.getPrivate, + keyPassword.toCharArray, Array(jcaCertificate)) + val tempDir = Files.createTempDirectory("temp-ssl-stores").toFile() + tempDir.deleteOnExit() + val keyStoreFile = new File(tempDir, "keyStore.jks") + Utils.tryWithResource(new FileOutputStream(keyStoreFile)) { + keyStore.store(_, keyStorePassword.toCharArray) + } + val trustStore = KeyStore.getInstance("JKS") + trustStore.load(null, null) + trustStore.setCertificateEntry("key", jcaCertificate) + val trustStoreFile = new File(tempDir, "trustStore.jks") + Utils.tryWithResource(new FileOutputStream(trustStoreFile)) { + trustStore.store(_, trustStorePassword.toCharArray) + } + (keyStoreFile, trustStoreFile) + } + +} diff --git a/resource-managers/kubernetes/integration-tests/ssl/keyStore.jks b/resource-managers/kubernetes/integration-tests/ssl/keyStore.jks deleted file mode 100644 index 7e90367a94f979286e6fa2350628d32f60dde13b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2272 zcmc(g`8N~{7sqEqk->~D8D-1Pj3wC~OAR8%Hg>Z1RLU|_G?JJa$)06o7b6DQk|ix- zJdZsyge=jBm<(l)=uzjq@B5t7AMl=Y?>YB=ez@nJ@4e^VJ@y_O004mY9r(+^0se&j z*kv$Q_C5f>0YVZXdq^lJw-P510#pPE0D&9;2ob_6wYEHQSv^L-5LV`%ju?`5fAHO^ zhOOOw!^|@o-9*_513v&{xU@vsW_TTib(1;J^-wq33co{u&@D-uBtwh|)x)fhQli?e*nV3VUs1;-= zUwY+i&FE|`2~b$8NF9~4U}JY}j$DtUp6R3NhCb8BAH!Gq^x$WUe|%aImG8T zPj~7`bNL35oHUF5Ac`td62@ZYn01`|hAUU3S?3c{1^g@Kj`sJfL%zOnUMYt!HwZT&3*1zZXY@&s&#zzYjp*C-Xd6-f6T?^i-rlW# zM8Q7Dd)qXOK9nIy_qHsY9KIm2BlPjZB<%-!5%t{v0;1D2B$z4O+KyotHLKY}3n|&M zO-)I(hPc_H+K$+^J~V-NQ-bnNCR(7%qQVMxn8@tX%f!3*Q^)a9*7;|k({>ut@H6V8 zJ`JM*=6V8N{qf$_Pb9abAhp3Vf!ojJ`W=g%Lal?tRv|GASV;zg4e#gIs6eH)Uj~Te 
diff --git a/resource-managers/kubernetes/integration-tests/ssl/keyStore.jks b/resource-managers/kubernetes/integration-tests/ssl/keyStore.jks
deleted file mode 100644
index 7e90367a94f979286e6fa2350628d32f60dde13b..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

[base85 binary delta for the deleted keyStore.jks fixture omitted; the suite now generates its stores at runtime via SSLUtils]

diff --git a/resource-managers/kubernetes/integration-tests/ssl/trustStore.jks b/resource-managers/kubernetes/integration-tests/ssl/trustStore.jks
deleted file mode 100644
index 02e69dd7cf25de0fb68a46bbb39258ed2aa2732e..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

[base85 binary delta for the deleted trustStore.jks fixture omitted; see above]
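The two checked-in stores above are deleted because fresh ones are generated per run. When debugging, a generated store can be sanity-checked with the plain JDK KeyStore API; a sketch, assuming the path and password used by the test:

    import java.io.FileInputStream
    import java.security.KeyStore
    import scala.collection.JavaConverters._

    // Sketch only: list the aliases in a generated JKS store; the helper
    // above writes a single key entry under the alias "key".
    val store = KeyStore.getInstance("JKS")
    val in = new FileInputStream(keyStoreFile)
    try store.load(in, "changeit".toCharArray) finally in.close()
    store.aliases().asScala.foreach(alias => println(s"alias: $alias"))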
From 55176ff8bb9fc9c9f4bbd0e45a82980db006424f Mon Sep 17 00:00:00 2001
From: mcheah
Date: Fri, 27 Jan 2017 12:31:08 -0800
Subject: [PATCH 7/8] Address comments

---
 .../spark/deploy/kubernetes/Client.scala      | 94 +++++++++----------
 .../KubernetesSparkRestServer.scala           | 17 +---
 2 files changed, 52 insertions(+), 59 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 87cf374b9920d..17068ac2c0296 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -67,8 +67,6 @@ private[spark] class Client(
   private val driverLaunchTimeoutSecs = sparkConf.getTimeAsSeconds(
     "spark.kubernetes.driverLaunchTimeout", s"${DEFAULT_LAUNCH_TIMEOUT_SECONDS}s")
 
-  private val (driverLaunchSslOptions, isKeyStoreLocalFile) = parseDriverLaunchSslOptions()
-
   private val secretBase64String = {
     val secretBytes = new Array[Byte](128)
     SECURE_RANDOM.nextBytes(secretBytes)
@@ -88,6 +86,7 @@ private[spark] class Client(
       .build()))
 
   def run(): Unit = {
+    val (driverLaunchSslOptions, isKeyStoreLocalFile) = parseDriverLaunchSslOptions()
     val parsedCustomLabels = parseCustomLabels(customLabels)
     var k8ConfBuilder = new ConfigBuilder()
       .withApiVersion("v1")
@@ -112,7 +111,9 @@ private[spark] class Client(
       .withData(Map((SUBMISSION_SERVER_SECRET_NAME, secretBase64String)).asJava)
       .withType("Opaque")
       .done()
-    val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient)
+    val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient,
+      driverLaunchSslOptions,
+      isKeyStoreLocalFile)
     try {
       val driverKubernetesSelectors = (Map(
         DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue,
@@ -122,7 +123,7 @@ private[spark] class Client(
       val submitCompletedFuture = SettableFuture.create[Boolean]
       val submitPending = new AtomicBoolean(false)
       val podWatcher = new DriverPodWatcher(submitCompletedFuture, submitPending,
-        kubernetesClient, driverKubernetesSelectors)
+        kubernetesClient, driverKubernetesSelectors, driverLaunchSslOptions)
       Utils.tryWithResource(kubernetesClient
         .pods()
         .withLabels(driverKubernetesSelectors)
@@ -221,8 +222,9 @@ private[spark] class Client(
     (securityManager.getSSLOptions("kubernetes.driverlaunch"), isLocalKeyStore)
   }
 
-  private def configureSsl(kubernetesClient: KubernetesClient):
-      (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
+  private def configureSsl(kubernetesClient: KubernetesClient, driverLaunchSslOptions: SSLOptions,
+      isKeyStoreLocalFile: Boolean):
+      (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
     if (driverLaunchSslOptions.enabled) {
       val sslSecretsMap = mutable.HashMap[String, String]()
       val sslEnvs = mutable.Buffer[EnvVar]()
@@ -300,7 +302,8 @@ private[spark] class Client(
       submitCompletedFuture: SettableFuture[Boolean],
       submitPending: AtomicBoolean,
       kubernetesClient: KubernetesClient,
-      driverKubernetesSelectors: java.util.Map[String, String]) extends Watcher[Pod] {
+      driverKubernetesSelectors: java.util.Map[String, String],
+      driverLaunchSslOptions: SSLOptions) extends Watcher[Pod] {
     override def eventReceived(action: Action, pod: Pod): Unit = {
       if ((action == Action.ADDED || action == Action.MODIFIED)
         && pod.getStatus.getPhase == "Running"
@@ -334,7 +337,8 @@ private[spark] class Client(
 
             sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
             sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
-            val driverLauncher = buildDriverLauncherClient(kubernetesClient, service)
+            val driverLauncher = buildDriverLauncherClient(kubernetesClient, service,
+              driverLaunchSslOptions)
             val ping = Retry.retry(5, 5.seconds) {
               driverLauncher.ping()
             }
@@ -464,19 +468,10 @@ private[spark] class Client(
   }
 
   private def buildContainerPorts(): Seq[ContainerPort] = {
-    Seq(
-      new ContainerPortBuilder()
-        .withContainerPort(sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT))
-        .build(),
-      new ContainerPortBuilder()
-        .withContainerPort(sparkConf.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT))
-        .build(),
-      new ContainerPortBuilder()
-        .withContainerPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT)
-        .build(),
-      new ContainerPortBuilder()
-        .withContainerPort(uiPort)
-        .build())
+    Seq(sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+      sparkConf.getInt("spark.blockManager.port", DEFAULT_BLOCKMANAGER_PORT),
+      DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT,
+      uiPort).map(new ContainerPortBuilder().withContainerPort(_).build())
   }
 
   private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = {
@@ -513,7 +508,8 @@
 
   private def buildDriverLauncherClient(
       kubernetesClient: KubernetesClient,
-      service: Service): KubernetesSparkRestApi = {
+      service: Service,
+      driverLaunchSslOptions: SSLOptions): KubernetesSparkRestApi = {
     val servicePort = service
       .getSpec
       .getPorts
@@ -528,35 +524,14 @@ private[spark] class Client(
     val urlScheme = if (driverLaunchSslOptions.enabled) {
       "https"
     } else {
-      logWarning("Submitting application details and local jars to the cluster" +
-        " over an insecure connection. Consider configuring SSL to secure" +
-        " this step.")
+      logWarning("Submitting application details, application secret, and local" +
+        " jars to the cluster over an insecure connection. You should configure SSL" +
+        " to secure this step.")
       "http"
     }
     val (trustManager, sslContext): (X509TrustManager, SSLContext) =
       if (driverLaunchSslOptions.enabled) {
-        driverLaunchSslOptions.trustStore.map(trustStoreFile => {
-          val trustManagerFactory = TrustManagerFactory.getInstance(
-            TrustManagerFactory.getDefaultAlgorithm)
-          val trustStore = KeyStore.getInstance(
-            driverLaunchSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
-          if (!trustStoreFile.isFile) {
-            throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
-              s" does not exist or is not a file.")
-          }
-          Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
-            driverLaunchSslOptions.trustStorePassword match {
-              case Some(password) =>
-                trustStore.load(trustStoreStream, password.toCharArray)
-              case None => trustStore.load(trustStoreStream, null)
-            }
-          }
-          trustManagerFactory.init(trustStore)
-          val trustManagers = trustManagerFactory.getTrustManagers
-          val sslContext = SSLContext.getInstance("TLSv1.2")
-          sslContext.init(null, trustManagers, SECURE_RANDOM)
-          (trustManagers(0).asInstanceOf[X509TrustManager], sslContext)
-        }).getOrElse((null, SSLContext.getDefault))
+        buildSslConnectionConfiguration(driverLaunchSslOptions)
       } else {
         (null, SSLContext.getDefault)
       }
@@ -567,6 +542,31 @@
       trustContext = trustManager)
   }
 
+  private def buildSslConnectionConfiguration(driverLaunchSslOptions: SSLOptions) = {
+    driverLaunchSslOptions.trustStore.map(trustStoreFile => {
+      val trustManagerFactory = TrustManagerFactory.getInstance(
+        TrustManagerFactory.getDefaultAlgorithm)
+      val trustStore = KeyStore.getInstance(
+        driverLaunchSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
+      if (!trustStoreFile.isFile) {
+        throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
+          s" does not exist or is not a file.")
+      }
+      Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
+        driverLaunchSslOptions.trustStorePassword match {
+          case Some(password) =>
+            trustStore.load(trustStoreStream, password.toCharArray)
+          case None => trustStore.load(trustStoreStream, null)
+        }
+      }
+      trustManagerFactory.init(trustStore)
+      val trustManagers = trustManagerFactory.getTrustManagers
+      val sslContext = SSLContext.getInstance("TLSv1.2")
+      sslContext.init(null, trustManagers, SECURE_RANDOM)
+      (trustManagers(0).asInstanceOf[X509TrustManager], sslContext)
+    }).getOrElse((null, SSLContext.getDefault))
+  }
+
   private def parseCustomLabels(labels: String): Map[String, String] = {
     labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
       label.split("=", 2).toSeq match {
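The extracted buildSslConnectionConfiguration follows the standard JSSE recipe: load the trustStore, derive trust managers, seed an SSLContext. To make the consumption side concrete, here is a sketch using only JDK classes (the patch itself wires the context through HttpClientUtil instead; the URL is a placeholder):

    import java.net.URL
    import javax.net.ssl.{HttpsURLConnection, SSLContext}

    // Sketch only: issue a trusted HTTPS request with a context built as above.
    def checkEndpoint(sslContext: SSLContext, url: String): Int = {
      val connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection]
      connection.setSSLSocketFactory(sslContext.getSocketFactory)
      try connection.getResponseCode   // 200 indicates the server came up with SSL
      finally connection.disconnect()
    }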
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
index be028e84a7d40..69601c348ca89 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
@@ -272,6 +272,7 @@ private[spark] class KubernetesSparkRestServer(
 
 private[spark] object KubernetesSparkRestServer {
   private val barrier = new CountDownLatch(1)
+
   def main(args: Array[String]): Unit = {
     val parsedArguments = KubernetesSparkRestServerArguments.fromArgsArray(args)
     val secretFile = new File(parsedArguments.secretFile.get)
@@ -282,10 +283,12 @@ private[spark] object KubernetesSparkRestServer {
     val sslOptions = if (parsedArguments.useSsl) {
       val keyStorePassword = parsedArguments
         .keyStorePasswordFile
-        .map(fileToUtf8String(_, "KeyStore Password file"))
+        .map(new File(_))
+        .map(Files.toString(_, Charsets.UTF_8))
       val keyPassword = parsedArguments
         .keyPasswordFile
-        .map(fileToUtf8String(_, "Key Password file"))
+        .map(new File(_))
+        .map(Files.toString(_, Charsets.UTF_8))
       new SSLOptions(
         enabled = true,
         keyStore = parsedArguments.keyStoreFile.map(new File(_)),
@@ -314,15 +317,5 @@ private[spark] object KubernetesSparkRestServer {
     })
     barrier.await()
   }
-
-  private def fileToUtf8String(filePath: String, fileType: String) = {
-    val passwordFile = new File(filePath)
-    if (!passwordFile.isFile) {
-      throw new SparkException(s"$fileType at $filePath does not exist or " +
-        "is a directory.")
-    }
-    val passwordBytes = Files.toByteArray(passwordFile)
-    new String(passwordBytes, Charsets.UTF_8)
-  }
 }
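Reading the keyStore and key passwords from mounted files, rather than taking them as command-line arguments, keeps them out of the container's process table. Note that the refactor above drops the old helper's explicit isFile check, so a missing file now surfaces as Guava's FileNotFoundException rather than a SparkException. A sketch of the equivalent standalone read, assuming the same Guava imports this file already uses:

    import java.io.File
    import com.google.common.base.Charsets
    import com.google.common.io.Files

    // Sketch only: mirror of the .map(new File(_)).map(Files.toString(_, Charsets.UTF_8))
    // chain used for the password files above.
    def readSecretFile(path: String): String =
      Files.toString(new File(path), Charsets.UTF_8)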
From 3438a8afb751c97e7085971db6b42c6fad61722e Mon Sep 17 00:00:00 2001
From: mcheah
Date: Fri, 27 Jan 2017 14:30:17 -0800
Subject: [PATCH 8/8] Resolve merge conflicts

---
 .../spark/deploy/kubernetes/Client.scala      | 62 +++++++------------
 1 file changed, 24 insertions(+), 38 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index c900572079ab4..2dbd7691a2de3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -122,8 +122,13 @@ private[spark] class Client(
       val containerPorts = buildContainerPorts()
       val submitCompletedFuture = SettableFuture.create[Boolean]
       val submitPending = new AtomicBoolean(false)
-      val podWatcher = new DriverPodWatcher(submitCompletedFuture, submitPending,
-        kubernetesClient, submitServerSecret, driverKubernetesSelectors)
+      val podWatcher = new DriverPodWatcher(
+        submitCompletedFuture,
+        submitPending,
+        kubernetesClient,
+        driverLaunchSslOptions,
+        Array(submitServerSecret) ++ sslSecrets,
+        driverKubernetesSelectors)
       Utils.tryWithResource(kubernetesClient
         .pods()
         .withLabels(driverKubernetesSelectors)
@@ -137,15 +142,9 @@ private[spark] class Client(
             .withRestartPolicy("OnFailure")
             .addNewVolume()
               .withName(s"spark-submission-secret-volume")
-<<<<<<< HEAD
-              .withNewSecret().withSecretName(submitServerSecret.getMetadata.getName).endSecret()
-||||||| merged common ancestors
-              .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
-=======
               .withNewSecret()
-                .withSecretName(applicationSubmitSecret.getMetadata.getName)
+                .withSecretName(submitServerSecret.getMetadata.getName)
                 .endSecret()
->>>>>>> nodeport-upload
             .endVolume
             .addToVolumes(sslVolumes: _*)
             .withServiceAccount(serviceAccount)
@@ -183,24 +182,19 @@ private[spark] class Client(
           throw new SparkException(finalErrorMessage, e)
         } finally {
           if (!submitSucceeded) {
-            try {
+            Utils.tryLogNonFatalError({
               kubernetesClient.pods.withName(kubernetesAppId).delete
-            } catch {
-              case throwable: Throwable =>
-                logError("Failed to delete driver pod after it failed to run.", throwable)
-            }
+            })
           }
         }
       }
     } finally {
-<<<<<<< HEAD
-      kubernetesClient.secrets().delete(submitServerSecret)
-      kubernetesClient.secrets().delete(sslSecrets: _*)
-||||||| merged common ancestors
-      kubernetesClient.secrets().delete(secret)
-=======
-      kubernetesClient.secrets().delete(applicationSubmitSecret)
->>>>>>> nodeport-upload
+      Utils.tryLogNonFatalError({
+        kubernetesClient.secrets().delete(submitServerSecret)
+      })
+      Utils.tryLogNonFatalError({
+        kubernetesClient.secrets().delete(sslSecrets: _*)
+      })
     }
   })
 }
@@ -316,15 +310,9 @@ private[spark] class Client(
       submitCompletedFuture: SettableFuture[Boolean],
       submitPending: AtomicBoolean,
       kubernetesClient: KubernetesClient,
-<<<<<<< HEAD
-      driverKubernetesSelectors: java.util.Map[String, String],
-      driverLaunchSslOptions: SSLOptions) extends Watcher[Pod] {
-||||||| merged common ancestors
+      driverLaunchSslOptions: SSLOptions,
+      applicationSecrets: Array[Secret],
       driverKubernetesSelectors: java.util.Map[String, String]) extends Watcher[Pod] {
-=======
-      applicationSubmitSecret: Secret,
-      driverKubernetesSelectors: java.util.Map[String, String]) extends Watcher[Pod] {
->>>>>>> nodeport-upload
     override def eventReceived(action: Action, pod: Pod): Unit = {
       if ((action == Action.ADDED || action == Action.MODIFIED)
         && pod.getStatus.getPhase == "Running"
@@ -344,8 +332,10 @@ private[spark] class Client(
                 .withController(true)
                 .build())
 
-            applicationSubmitSecret.getMetadata.setOwnerReferences(ownerRefs.asJava)
-            kubernetesClient.secrets().createOrReplace(applicationSubmitSecret)
+            applicationSecrets.foreach(secret => {
+              secret.getMetadata.setOwnerReferences(ownerRefs.asJava)
+              kubernetesClient.secrets().createOrReplace(secret)
+            })
 
             val driverLauncherServicePort = new ServicePortBuilder()
               .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME)
@@ -421,13 +411,9 @@ private[spark] class Client(
             } catch {
               case e: Throwable =>
                 submitCompletedFuture.setException(e)
-                try {
+                Utils.tryLogNonFatalError({
                   kubernetesClient.services().delete(service)
-                } catch {
-                  case throwable: Throwable =>
-                    logError("Submitting the job failed but failed to" +
-                      " clean up the created service.", throwable)
-                }
+                })
                 throw e
             }
           case None =>