From 87413a29e24c485a8d6c619c4236b67f879399cf Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 17 Jan 2017 18:02:43 -0800 Subject: [PATCH 01/11] Revamp ports and service setup for the driver. - Expose the driver-submission service on NodePort and contact that as opposed to going through the API server proxy - Restrict the ports that are exposed on the service to only the driver submission service when uploading content and then only the Spark UI after the job has started --- .../spark/deploy/kubernetes/Client.scala | 243 +++++++++--------- .../KubernetesClusterSchedulerBackend.scala | 2 +- 2 files changed, 126 insertions(+), 119 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index fe3256b9e12b..5aac20105b45 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -19,17 +19,14 @@ package org.apache.spark.deploy.kubernetes import java.io.File import java.security.SecureRandom import java.util.concurrent.{Executors, TimeoutException, TimeUnit} -import javax.net.ssl.X509TrustManager import com.google.common.io.Files import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder} import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watch, Watcher} import io.fabric8.kubernetes.client.Watcher.Action -import io.fabric8.kubernetes.client.internal.SSLUtils import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer import scala.concurrent.ExecutionContext import scala.concurrent.duration.DurationInt import scala.util.Success @@ -60,6 +57,7 @@ private[spark] class Client( private val driverDockerImage = sparkConf.get( "spark.kubernetes.driver.docker.image", s"spark-driver:$SPARK_VERSION") private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars") + private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val secretBase64String = { val secretBytes = new Array[Byte](128) @@ -109,7 +107,13 @@ private[spark] class Client( DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue, SPARK_APP_NAME_LABEL -> appName) ++ parsedCustomLabels).asJava - val (servicePorts, containerPorts) = configurePorts() + val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava + val containerPorts = configureContainerPorts() + val driverLauncherServicePort = new ServicePortBuilder() + .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME) + .withPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + .withNewTargetPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + .build() val service = kubernetesClient.services().createNew() .withNewMetadata() .withName(kubernetesAppId) @@ -117,30 +121,29 @@ private[spark] class Client( .endMetadata() .withNewSpec() .withSelector(resolvedSelectors) - .withPorts(servicePorts.asJava) + .withType("NodePort") + .withPorts(driverLauncherServicePort) .endSpec() .done() sparkConf.set("spark.kubernetes.driver.service.name", service.getMetadata.getName) 
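// Recording the service name in the SparkConf lets the scheduler backend and the
// cleanup paths look the service up later; exposing it as NodePort makes the
// submission endpoint reachable from outside the cluster without going through
// the API server proxy.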
sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) - val submitRequest = buildSubmissionRequest() val submitCompletedFuture = SettableFuture.create[Boolean] val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId" val podWatcher = new Watcher[Pod] { - override def eventReceived(action: Action, t: Pod): Unit = { + override def eventReceived(action: Action, pod: Pod): Unit = { if ((action == Action.ADDED || action == Action.MODIFIED) - && t.getStatus.getPhase == "Running" + && pod.getStatus.getPhase == "Running" && !submitCompletedFuture.isDone) { - t.getStatus + pod.getStatus .getContainerStatuses .asScala .find(status => status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match { case Some(_) => try { - val driverLauncher = getDriverLauncherService( - k8ClientConfig, master) + val driverLauncher = getDriverLauncherService(kubernetesClient, kubernetesAppId) val ping = Retry.retry(5, 5.seconds) { driverLauncher.ping() } @@ -152,8 +155,25 @@ private[spark] class Client( } val submitComplete = ping andThen { case Success(_) => + sparkConf.set("spark.driver.host", pod.getStatus.getPodIP) + val submitRequest = buildSubmissionRequest() driverLauncher.create(submitRequest) submitCompletedFuture.set(true) + // After submitting, adjust the service to only expose the Spark UI + val uiServicePort = new ServicePortBuilder() + .withName(UI_PORT_NAME) + .withPort(uiPort) + .withNewTargetPort(uiPort) + .build + kubernetesClient + .services + .withName(kubernetesAppId) + .edit() + .editSpec() + .withType("ClusterIP") + .withPorts(uiServicePort) + .endSpec + .done } submitComplete onFailure { case t: Throwable => @@ -222,59 +242,7 @@ private[spark] class Client( submitSucceeded = true } catch { case e: TimeoutException => - val driverPod = try { - kubernetesClient.pods().withName(kubernetesAppId).get() - } catch { - case throwable: Throwable => - logError(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS seconds for the" + - " driver pod to start, but an error occurred while fetching the driver" + - " pod's details.", throwable) - throw new SparkException(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS" + - " seconds for the driver pod to start. Unfortunately, in attempting to fetch" + - " the latest state of the pod, another error was thrown. Check the logs for" + - " the error that was thrown in looking up the driver pod.", e) - } - val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" + - s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" + - s" $LAUNCH_TIMEOUT_SECONDS seconds." - val podStatusPhase = if (driverPod.getStatus.getPhase != null) { - s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}" - } else { - "The pod had no final phase." - } - val podStatusMessage = if (driverPod.getStatus.getMessage != null) { - s"Latest message from the pod is: ${driverPod.getStatus.getMessage}" - } else { - "The pod had no final message." 
- } - val failedDriverContainerStatusString = driverPod.getStatus - .getContainerStatuses - .asScala - .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME) - .map(status => { - val lastState = status.getState - if (lastState.getRunning != null) { - "Driver container last state: Running\n" + - s"Driver container started at: ${lastState.getRunning.getStartedAt}" - } else if (lastState.getWaiting != null) { - "Driver container last state: Waiting\n" + - s"Driver container wait reason: ${lastState.getWaiting.getReason}\n" + - s"Driver container message: ${lastState.getWaiting.getMessage}\n" - } else if (lastState.getTerminated != null) { - "Driver container last state: Terminated\n" + - s"Driver container started at: ${lastState.getTerminated.getStartedAt}\n" + - s"Driver container finished at: ${lastState.getTerminated.getFinishedAt}\n" + - s"Driver container exit reason: ${lastState.getTerminated.getReason}\n" + - s"Driver container exit code: ${lastState.getTerminated.getExitCode}\n" + - s"Driver container message: ${lastState.getTerminated.getMessage}" - } else { - "Driver container last state: Unknown" - } - }).getOrElse("The driver container wasn't found in the pod; expected to find" + - s" container with name $DRIVER_LAUNCHER_CONTAINER_NAME") - val finalErrorMessage = s"$topLevelMessage\n" + - s"$podStatusPhase\n" + - s"$podStatusMessage\n\n$failedDriverContainerStatusString" + val finalErrorMessage: String = getSubmitErrorMessage(kubernetesClient, e) logError(finalErrorMessage, e) throw new SparkException(finalErrorMessage, e) } finally { @@ -285,6 +253,12 @@ private[spark] class Client( case throwable: Throwable => logError("Failed to delete driver pod after it failed to run.", throwable) } + try { + kubernetesClient.services.delete(service) + } catch { + case throwable: Throwable => + logError("Failed to delete driver service after it failed to run.", throwable) + } } } } @@ -299,44 +273,78 @@ private[spark] class Client( }) } - private def configurePorts(): (Seq[ServicePort], Seq[ContainerPort]) = { - val servicePorts = new ArrayBuffer[ServicePort] - val containerPorts = new ArrayBuffer[ContainerPort] - - def addPortToServiceAndContainer(portName: String, portValue: Int): Unit = { - servicePorts += new ServicePortBuilder() - .withName(portName) - .withPort(portValue) - .withNewTargetPort(portValue) - .build() - containerPorts += new ContainerPortBuilder() - .withContainerPort(portValue) - .build() + private def getSubmitErrorMessage( + kubernetesClient: DefaultKubernetesClient, + e: TimeoutException): String = { + val driverPod = try { + kubernetesClient.pods().withName(kubernetesAppId).get() + } catch { + case throwable: Throwable => + logError(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS seconds for the" + + " driver pod to start, but an error occurred while fetching the driver" + + " pod's details.", throwable) + throw new SparkException(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS" + + " seconds for the driver pod to start. Unfortunately, in attempting to fetch" + + " the latest state of the pod, another error was thrown. Check the logs for" + + " the error that was thrown in looking up the driver pod.", e) } + val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" + + s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" + + s" $LAUNCH_TIMEOUT_SECONDS seconds." 
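// Collect the pod phase, the pod status message, and the driver container's last
// state below so that a launch timeout is debuggable from the client logs alone.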
+ val podStatusPhase = if (driverPod.getStatus.getPhase != null) { + s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}" + } else { + "The pod had no final phase." + } + val podStatusMessage = if (driverPod.getStatus.getMessage != null) { + s"Latest message from the pod is: ${driverPod.getStatus.getMessage}" + } else { + "The pod had no final message." + } + val failedDriverContainerStatusString = driverPod.getStatus + .getContainerStatuses + .asScala + .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME) + .map(status => { + val lastState = status.getState + if (lastState.getRunning != null) { + "Driver container last state: Running\n" + + s"Driver container started at: ${lastState.getRunning.getStartedAt}" + } else if (lastState.getWaiting != null) { + "Driver container last state: Waiting\n" + + s"Driver container wait reason: ${lastState.getWaiting.getReason}\n" + + s"Driver container message: ${lastState.getWaiting.getMessage}\n" + } else if (lastState.getTerminated != null) { + "Driver container last state: Terminated\n" + + s"Driver container started at: ${lastState.getTerminated.getStartedAt}\n" + + s"Driver container finished at: ${lastState.getTerminated.getFinishedAt}\n" + + s"Driver container exit reason: ${lastState.getTerminated.getReason}\n" + + s"Driver container exit code: ${lastState.getTerminated.getExitCode}\n" + + s"Driver container message: ${lastState.getTerminated.getMessage}" + } else { + "Driver container last state: Unknown" + } + }).getOrElse("The driver container wasn't found in the pod; expected to find" + + s" container with name $DRIVER_LAUNCHER_CONTAINER_NAME") + s"$topLevelMessage\n" + + s"$podStatusPhase\n" + + s"$podStatusMessage\n\n$failedDriverContainerStatusString" + } - addPortToServiceAndContainer( - DRIVER_LAUNCHER_SERVICE_PORT_NAME, - DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) - addPortToServiceAndContainer( - DRIVER_PORT_NAME, - sparkConf - .getOption("spark.driver.port") - .map(_.toInt) - .getOrElse(DEFAULT_DRIVER_PORT)) - addPortToServiceAndContainer( - BLOCKMANAGER_PORT_NAME, - sparkConf - .getOption("spark.blockmanager.port") - .map(_.toInt) - .getOrElse(DEFAULT_BLOCKMANAGER_PORT)) - - addPortToServiceAndContainer( - UI_PORT_NAME, - sparkConf - .getOption("spark.ui.port") - .map(_.toInt) - .getOrElse(DEFAULT_UI_PORT)) - (servicePorts, containerPorts) + private def configureContainerPorts(): Seq[ContainerPort] = { + Seq( + new ContainerPortBuilder() + .withContainerPort(sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)) + .build(), + new ContainerPortBuilder() + .withContainerPort(sparkConf.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)) + .build(), + new ContainerPortBuilder() + .withContainerPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + .build(), + new ContainerPortBuilder() + .withContainerPort(uiPort) + .build()) } private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = { @@ -372,23 +380,22 @@ private[spark] class Client( } private def getDriverLauncherService( - k8ClientConfig: Config, - kubernetesMaster: String): KubernetesSparkRestApi = { - val url = s"${ - Array[String]( - kubernetesMaster, - "api", "v1", "proxy", - "namespaces", namespace, - "services", kubernetesAppId).mkString("/")}" + - s":$DRIVER_LAUNCHER_SERVICE_PORT_NAME/" - - val sslContext = SSLUtils.sslContext(k8ClientConfig) - val trustManager = SSLUtils.trustManagers( - k8ClientConfig)(0).asInstanceOf[X509TrustManager] - HttpClientUtil.createClient[KubernetesSparkRestApi]( - uri = url, - sslSocketFactory = 
sslContext.getSocketFactory, - trustContext = trustManager) + kubernetesClient: KubernetesClient, + serviceName: String): KubernetesSparkRestApi = { + val service = kubernetesClient.services.withName(serviceName).get + val servicePort = service + .getSpec + .getPorts + .asScala + .filter(_.getName == DRIVER_LAUNCHER_SERVICE_PORT_NAME) + .head + .getNodePort + // NodePort is exposed on every node, so just pick one of them. + // TODO be resilient to node failures and try all of them + val node = kubernetesClient.nodes.list.getItems.asScala.head + val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress + val url = s"http://$nodeAddress:$servicePort" + HttpClientUtil.createClient[KubernetesSparkRestApi](uri = url) } private def parseCustomLabels(labels: String): Map[String, String] = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index b7110ba90184..114250ad4f58 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -92,7 +92,7 @@ private[spark] class KubernetesClusterSchedulerBackend( protected var totalExpectedExecutors = new AtomicInteger(0) private val driverUrl = RpcEndpointAddress( - System.getenv(s"${convertToEnvMode(kubernetesDriverServiceName)}_SERVICE_HOST"), + sc.getConf.get("spark.driver.host"), sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString From 5cbdba0f11a259e612e543a321bd19572b9d7206 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 18 Jan 2017 12:05:54 -0800 Subject: [PATCH 02/11] Move service creation down and more thorough error handling --- .../spark/deploy/kubernetes/Client.scala | 157 +++++++++--------- 1 file changed, 83 insertions(+), 74 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 5aac20105b45..65851f930377 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.kubernetes import java.io.File import java.security.SecureRandom import java.util.concurrent.{Executors, TimeoutException, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean import com.google.common.io.Files import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder} @@ -27,9 +28,8 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, Kub import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.DurationInt -import scala.util.Success import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException} import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, 
TarGzippedData, UploadedAppResource} @@ -109,86 +109,102 @@ private[spark] class Client( ++ parsedCustomLabels).asJava val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava val containerPorts = configureContainerPorts() - val driverLauncherServicePort = new ServicePortBuilder() - .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME) - .withPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) - .withNewTargetPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) - .build() - val service = kubernetesClient.services().createNew() - .withNewMetadata() - .withName(kubernetesAppId) - .withLabels(Map(SPARK_APP_NAME_LABEL -> appName).asJava) - .endMetadata() - .withNewSpec() - .withSelector(resolvedSelectors) - .withType("NodePort") - .withPorts(driverLauncherServicePort) - .endSpec() - .done() - sparkConf.set("spark.kubernetes.driver.service.name", service.getMetadata.getName) - sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) - sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) val submitCompletedFuture = SettableFuture.create[Boolean] val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId" - + val submitPending = new AtomicBoolean(false) val podWatcher = new Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = { if ((action == Action.ADDED || action == Action.MODIFIED) && pod.getStatus.getPhase == "Running" && !submitCompletedFuture.isDone) { - pod.getStatus - .getContainerStatuses - .asScala - .find(status => - status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match { - case Some(_) => - try { - val driverLauncher = getDriverLauncherService(kubernetesClient, kubernetesAppId) - val ping = Retry.retry(5, 5.seconds) { - driverLauncher.ping() - } - ping onFailure { - case t: Throwable => - if (!submitCompletedFuture.isDone) { + if (!submitPending.getAndSet(true)) { + pod.getStatus + .getContainerStatuses + .asScala + .find(status => + status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match { + case Some(_) => + val driverLauncherServicePort = new ServicePortBuilder() + .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME) + .withPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + .withNewTargetPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + .build() + val service = kubernetesClient.services().createNew() + .withNewMetadata() + .withName(kubernetesAppId) + .endMetadata() + .withNewSpec() + .withType("NodePort") + .withSelector(selectors) + .withPorts(driverLauncherServicePort) + .endSpec() + .done() + try { + sparkConf.set("spark.kubernetes.driver.service.name", + service.getMetadata.getName) + sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) + sparkConf.setIfMissing("spark.blockmanager.port", + DEFAULT_BLOCKMANAGER_PORT.toString) + val driverLauncher = getDriverLauncherClient(kubernetesClient, service) + val ping = Retry.retry(5, 5.seconds) { + driverLauncher.ping() + } + ping onFailure { + case t: Throwable => submitCompletedFuture.setException(t) + kubernetesClient.services().delete(service) + } + val submitComplete = ping.flatMap { _ => + Future { + sparkConf.set("spark.driver.host", pod.getStatus.getPodIP) + val submitRequest = buildSubmissionRequest() + driverLauncher.create(submitRequest) } - } - val submitComplete = ping andThen { - case Success(_) => - sparkConf.set("spark.driver.host", pod.getStatus.getPodIP) - val submitRequest = buildSubmissionRequest() - driverLauncher.create(submitRequest) - submitCompletedFuture.set(true) - // 
After submitting, adjust the service to only expose the Spark UI - val uiServicePort = new ServicePortBuilder() - .withName(UI_PORT_NAME) - .withPort(uiPort) - .withNewTargetPort(uiPort) - .build - kubernetesClient - .services - .withName(kubernetesAppId) - .edit() + } + submitComplete onFailure { + case t: Throwable => + submitCompletedFuture.setException(t) + kubernetesClient.services().delete(service) + } + val adjustServicePort = submitComplete.flatMap { _ => + Future { + // After submitting, adjust the service to only expose the Spark UI + val uiServicePort = new ServicePortBuilder() + .withName(UI_PORT_NAME) + .withPort(uiPort) + .withNewTargetPort(uiPort) + .build + kubernetesClient.services().withName(kubernetesAppId).edit() .editSpec() .withType("ClusterIP") .withPorts(uiServicePort) .endSpec .done - } - submitComplete onFailure { - case t: Throwable => - if (!submitCompletedFuture.isDone) { - submitCompletedFuture.setException(t) } - } - } catch { - case e: Throwable => - if (!submitCompletedFuture.isDone) { + } + adjustServicePort onSuccess { + case _ => + submitCompletedFuture.set(true) + } + adjustServicePort onFailure { + case throwable: Throwable => + submitCompletedFuture.setException(throwable) + kubernetesClient.services().delete(service) + } + } catch { + case e: Throwable => submitCompletedFuture.setException(e) + try { + kubernetesClient.services().delete(service) + } catch { + case throwable: Throwable => + logError("Submitting the job failed but failed to" + + " clean up the created service.", throwable) + } throw e - } - } - case None => + } + case None => + } } } } @@ -253,12 +269,6 @@ private[spark] class Client( case throwable: Throwable => logError("Failed to delete driver pod after it failed to run.", throwable) } - try { - kubernetesClient.services.delete(service) - } catch { - case throwable: Throwable => - logError("Failed to delete driver service after it failed to run.", throwable) - } } } } @@ -379,10 +389,9 @@ private[spark] class Client( .map(CompressionUtils.createTarGzip(_)) } - private def getDriverLauncherService( + private def getDriverLauncherClient( kubernetesClient: KubernetesClient, - serviceName: String): KubernetesSparkRestApi = { - val service = kubernetesClient.services.withName(serviceName).get + service: Service): KubernetesSparkRestApi = { val servicePort = service .getSpec .getPorts From 3e7ce16043e8837e9080c3fe42f5a8e8aa3ed305 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 25 Jan 2017 15:00:50 -0800 Subject: [PATCH 03/11] Fix missed merge conflict --- .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index c9bc7e618e73..3f0464175244 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -302,7 +302,7 @@ private[spark] class Client( } val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" + s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" + - s" $LAUNCH_TIMEOUT_SECONDS seconds." + s" $driverLaunchTimeoutSecs seconds." 
val podStatusPhase = if (driverPod.getStatus.getPhase != null) { s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}" } else { From 8e14ebd3039b9e8ca6a90aefddafa9f32290f092 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 25 Jan 2017 15:03:59 -0800 Subject: [PATCH 04/11] Add braces --- .../scala/org/apache/spark/deploy/kubernetes/Client.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 3f0464175244..e5395256bdc6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -175,12 +175,12 @@ private[spark] class Client( .withName(UI_PORT_NAME) .withPort(uiPort) .withNewTargetPort(uiPort) - .build + .build() kubernetesClient.services().withName(kubernetesAppId).edit() .editSpec() .withType("ClusterIP") .withPorts(uiServicePort) - .endSpec + .endSpec() .done } } From 7ff6eae3d9469e07b2668af443f4e4ae2344f99c Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 25 Jan 2017 15:44:00 -0800 Subject: [PATCH 05/11] Fix bad merge --- .../scala/org/apache/spark/deploy/kubernetes/Client.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index e5395256bdc6..fe635e1b1bc3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -109,7 +109,6 @@ private[spark] class Client( DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue, SPARK_APP_NAME_LABEL -> appName) ++ parsedCustomLabels).asJava - val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava val containerPorts = configureContainerPorts() val submitCompletedFuture = SettableFuture.create[Boolean] val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId" @@ -134,10 +133,11 @@ private[spark] class Client( val service = kubernetesClient.services().createNew() .withNewMetadata() .withName(kubernetesAppId) + .withLabels(resolvedSelectors) .endMetadata() .withNewSpec() .withType("NodePort") - .withSelector(selectors) + .withSelector(resolvedSelectors) .withPorts(driverLauncherServicePort) .endSpec() .done() From 282b27fca49be52c7e2b4855cfe74b3a8b5b5b18 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 26 Jan 2017 12:11:47 -0800 Subject: [PATCH 06/11] Address comments and refactor run() more. 
Method nesting was getting confusing so pulled out the inner class and removed the extra method indirection from createDriverPod() --- .../spark/deploy/kubernetes/Client.scala | 262 +++++++++--------- 1 file changed, 132 insertions(+), 130 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index fe635e1b1bc3..f3f108beece4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean import com.google.common.io.Files import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder} import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ @@ -105,132 +105,30 @@ private[spark] class Client( .withType("Opaque") .done() try { - val resolvedSelectors = (Map( + val driverKubernetesSelectors = (Map( DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue, SPARK_APP_NAME_LABEL -> appName) ++ parsedCustomLabels).asJava - val containerPorts = configureContainerPorts() + val containerPorts = buildContainerPorts() val submitCompletedFuture = SettableFuture.create[Boolean] val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId" val submitPending = new AtomicBoolean(false) - val podWatcher = new Watcher[Pod] { - override def eventReceived(action: Action, pod: Pod): Unit = { - if ((action == Action.ADDED || action == Action.MODIFIED) - && pod.getStatus.getPhase == "Running" - && !submitCompletedFuture.isDone) { - if (!submitPending.getAndSet(true)) { - pod.getStatus - .getContainerStatuses - .asScala - .find(status => - status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match { - case Some(_) => - val driverLauncherServicePort = new ServicePortBuilder() - .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME) - .withPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) - .withNewTargetPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) - .build() - val service = kubernetesClient.services().createNew() - .withNewMetadata() - .withName(kubernetesAppId) - .withLabels(resolvedSelectors) - .endMetadata() - .withNewSpec() - .withType("NodePort") - .withSelector(resolvedSelectors) - .withPorts(driverLauncherServicePort) - .endSpec() - .done() - try { - sparkConf.set("spark.kubernetes.driver.service.name", - service.getMetadata.getName) - sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) - sparkConf.setIfMissing("spark.blockmanager.port", - DEFAULT_BLOCKMANAGER_PORT.toString) - val driverLauncher = getDriverLauncherClient(kubernetesClient, service) - val ping = Retry.retry(5, 5.seconds) { - driverLauncher.ping() - } - ping onFailure { - case t: Throwable => - submitCompletedFuture.setException(t) - kubernetesClient.services().delete(service) - } - val submitComplete = ping.flatMap { _ => - Future { - sparkConf.set("spark.driver.host", pod.getStatus.getPodIP) - val submitRequest = 
buildSubmissionRequest() - driverLauncher.create(submitRequest) - } - } - submitComplete onFailure { - case t: Throwable => - submitCompletedFuture.setException(t) - kubernetesClient.services().delete(service) - } - val adjustServicePort = submitComplete.flatMap { _ => - Future { - // After submitting, adjust the service to only expose the Spark UI - val uiServicePort = new ServicePortBuilder() - .withName(UI_PORT_NAME) - .withPort(uiPort) - .withNewTargetPort(uiPort) - .build() - kubernetesClient.services().withName(kubernetesAppId).edit() - .editSpec() - .withType("ClusterIP") - .withPorts(uiServicePort) - .endSpec() - .done - } - } - adjustServicePort onSuccess { - case _ => - submitCompletedFuture.set(true) - } - adjustServicePort onFailure { - case throwable: Throwable => - submitCompletedFuture.setException(throwable) - kubernetesClient.services().delete(service) - } - } catch { - case e: Throwable => - submitCompletedFuture.setException(e) - try { - kubernetesClient.services().delete(service) - } catch { - case throwable: Throwable => - logError("Submitting the job failed but failed to" + - " clean up the created service.", throwable) - } - throw e - } - case None => - } - } - } - } - - override def onClose(e: KubernetesClientException): Unit = { - if (!submitCompletedFuture.isDone) { - submitCompletedFuture.setException(e) - } - } - } - - def createDriverPod(unused: Watch): Unit = { + val podWatcher = new DriverPodWatcher(submitCompletedFuture, submitPending, + kubernetesClient, driverKubernetesSelectors) + Utils.tryWithResource(kubernetesClient + .pods() + .withLabels(driverKubernetesSelectors) + .watch(podWatcher)) { _ => kubernetesClient.pods().createNew() .withNewMetadata() .withName(kubernetesAppId) - .withLabels(resolvedSelectors) + .withLabels(driverKubernetesSelectors) .endMetadata() .withNewSpec() .withRestartPolicy("OnFailure") .addNewVolume() .withName(s"spark-submission-secret-volume") - .withNewSecret() - .withSecretName(secret.getMetadata.getName) - .endSecret() + .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret() .endVolume .withServiceAccount(serviceAccount) .addNewContainer() @@ -260,32 +158,136 @@ private[spark] class Client( submitSucceeded = true } catch { case e: TimeoutException => - val finalErrorMessage: String = getSubmitErrorMessage(kubernetesClient, e) + val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e) logError(finalErrorMessage, e) throw new SparkException(finalErrorMessage, e) - } finally { - if (!submitSucceeded) { - try { - kubernetesClient.pods.withName(kubernetesAppId).delete - } catch { - case throwable: Throwable => - logError("Failed to delete driver pod after it failed to run.", throwable) - } + } finally { + if (!submitSucceeded) { + try { + kubernetesClient.pods.withName(kubernetesAppId).delete + } catch { + case throwable: Throwable => + logError("Failed to delete driver pod after it failed to run.", throwable) } } } - - Utils.tryWithResource(kubernetesClient - .pods() - .withLabels(resolvedSelectors) - .watch(podWatcher)) { createDriverPod } + } } finally { kubernetesClient.secrets().delete(secret) } }) } - private def getSubmitErrorMessage( + private class DriverPodWatcher( + submitCompletedFuture: SettableFuture[Boolean], + submitPending: AtomicBoolean, + kubernetesClient: KubernetesClient, + driverKubernetesSelectors: java.util.Map[String, String]) extends Watcher[Pod] { + override def eventReceived(action: Action, pod: Pod): Unit = { + if ((action == Action.ADDED || action == 
Action.MODIFIED) + && pod.getStatus.getPhase == "Running" + && !submitCompletedFuture.isDone) { + if (!submitPending.getAndSet(true)) { + pod.getStatus + .getContainerStatuses + .asScala + .find(status => + status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match { + case Some(_) => + val driverLauncherServicePort = new ServicePortBuilder() + .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME) + .withPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + .withNewTargetPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + .build() + val service = kubernetesClient.services().createNew() + .withNewMetadata() + .withName(kubernetesAppId) + .withLabels(driverKubernetesSelectors) + .endMetadata() + .withNewSpec() + .withType("NodePort") + .withSelector(driverKubernetesSelectors) + .withPorts(driverLauncherServicePort) + .endSpec() + .done() + try { + sparkConf.set("spark.kubernetes.driver.service.name", + service.getMetadata.getName) + sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) + sparkConf.setIfMissing("spark.blockmanager.port", + DEFAULT_BLOCKMANAGER_PORT.toString) + val driverLauncher = buildDriverLauncherClient(kubernetesClient, service) + val ping = Retry.retry(5, 5.seconds) { + driverLauncher.ping() + } + ping onFailure { + case t: Throwable => + submitCompletedFuture.setException(t) + kubernetesClient.services().delete(service) + } + val submitComplete = ping.flatMap { _ => + Future { + sparkConf.set("spark.driver.host", pod.getStatus.getPodIP) + val submitRequest = buildSubmissionRequest() + driverLauncher.create(submitRequest) + } + } + submitComplete onFailure { + case t: Throwable => + submitCompletedFuture.setException(t) + kubernetesClient.services().delete(service) + } + val adjustServicePort = submitComplete.flatMap { _ => + Future { + // After submitting, adjust the service to only expose the Spark UI + val uiServicePort = new ServicePortBuilder() + .withName(UI_PORT_NAME) + .withPort(uiPort) + .withNewTargetPort(uiPort) + .build() + kubernetesClient.services().withName(kubernetesAppId).edit() + .editSpec() + .withType("ClusterIP") + .withPorts(uiServicePort) + .endSpec() + .done + } + } + adjustServicePort onSuccess { + case _ => + submitCompletedFuture.set(true) + } + adjustServicePort onFailure { + case throwable: Throwable => + submitCompletedFuture.setException(throwable) + kubernetesClient.services().delete(service) + } + } catch { + case e: Throwable => + submitCompletedFuture.setException(e) + try { + kubernetesClient.services().delete(service) + } catch { + case throwable: Throwable => + logError("Submitting the job failed but failed to" + + " clean up the created service.", throwable) + } + throw e + } + case None => + } + } + } + } + + override def onClose(e: KubernetesClientException): Unit = { + if (!submitCompletedFuture.isDone) { + submitCompletedFuture.setException(e) + } + } + } + + private def buildSubmitFailedErrorMessage( kubernetesClient: DefaultKubernetesClient, e: TimeoutException): String = { val driverPod = try { @@ -343,7 +345,7 @@ private[spark] class Client( s"$podStatusMessage\n\n$failedDriverContainerStatusString" } - private def configureContainerPorts(): Seq[ContainerPort] = { + private def buildContainerPorts(): Seq[ContainerPort] = { Seq( new ContainerPortBuilder() .withContainerPort(sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)) @@ -391,7 +393,7 @@ private[spark] class Client( .map(CompressionUtils.createTarGzip(_)) } - private def getDriverLauncherClient( + private def buildDriverLauncherClient( 
kubernetesClient: KubernetesClient, service: Service): KubernetesSparkRestApi = { val servicePort = service From 0b0a7ed46247eb661d5d70c879a7478862f86420 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 27 Jan 2017 14:06:44 -0800 Subject: [PATCH 07/11] Remove unused method --- .../cluster/kubernetes/KubernetesClusterSchedulerBackend.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 114250ad4f58..32a860960d13 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -96,9 +96,6 @@ private[spark] class KubernetesClusterSchedulerBackend( sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString - private def convertToEnvMode(value: String): String = - value.toUpperCase.map { c => if (c == '-') '_' else c } - private val initialExecutors = getInitialTargetExecutorNumber(1) private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { From ee8b0a5170ad3b5b92fb93b70c642a766767d963 Mon Sep 17 00:00:00 2001 From: mccheah Date: Fri, 27 Jan 2017 14:33:06 -0800 Subject: [PATCH 08/11] Support SSL configuration for the driver application submission (#49) * Support SSL when setting up the driver. The user can provide a keyStore to load onto the driver pod and the driver pod will use that keyStore to set up SSL on its server. * Clean up SSL secrets after finishing submission. We don't need to persist these after the pod has them mounted and is running already. * Fix compilation error * Revert image change * Address comments * Programmatically generate certificates for integration tests. 
* Address comments * Resolve merge conflicts --- .../deploy/rest/RestSubmissionServer.scala | 40 ++- docs/running-on-kubernetes.md | 18 ++ pom.xml | 7 +- .../spark/deploy/kubernetes/Client.scala | 249 +++++++++++++++--- .../KubernetesSparkRestServer.scala | 58 +++- .../src/main/docker/driver/Dockerfile | 13 +- .../kubernetes/integration-tests/pom.xml | 4 + .../integrationtest/KubernetesSuite.scala | 43 +++ .../integrationtest/sslutil/SSLUtils.scala | 80 ++++++ 9 files changed, 445 insertions(+), 67 deletions(-) create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala index b30c980e95a9..524726c2ccf9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala @@ -19,16 +19,16 @@ package org.apache.spark.deploy.rest import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} -import scala.io.Source - import com.fasterxml.jackson.core.JsonProcessingException -import org.eclipse.jetty.server.{HttpConnectionFactory, Server, ServerConnector} +import org.eclipse.jetty.http.HttpVersion +import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory} import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler} import org.json4s._ import org.json4s.jackson.JsonMethods._ +import scala.io.Source -import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} +import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SSLOptions} import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -50,7 +50,8 @@ import org.apache.spark.util.Utils private[spark] abstract class RestSubmissionServer( val host: String, val requestedPort: Int, - val masterConf: SparkConf) extends Logging { + val masterConf: SparkConf, + val sslOptions: SSLOptions = SSLOptions()) extends Logging { protected val submitRequestServlet: SubmitRequestServlet protected val killRequestServlet: KillRequestServlet protected val statusRequestServlet: StatusRequestServlet @@ -79,19 +80,32 @@ private[spark] abstract class RestSubmissionServer( * Return a 2-tuple of the started server and the bound port. 
*/ private def doStart(startPort: Int): (Server, Int) = { + // TODO consider using JettyUtils#startServer to do this instead val threadPool = new QueuedThreadPool threadPool.setDaemon(true) val server = new Server(threadPool) + val resolvedConnectionFactories = sslOptions + .createJettySslContextFactory() + .map(sslFactory => { + val sslConnectionFactory = new SslConnectionFactory( + sslFactory, HttpVersion.HTTP_1_1.asString()) + val rawHttpConfiguration = new HttpConfiguration() + rawHttpConfiguration.setSecureScheme("https") + rawHttpConfiguration.setSecurePort(startPort) + val rawHttpConnectionFactory = new HttpConnectionFactory(rawHttpConfiguration) + Array(sslConnectionFactory, rawHttpConnectionFactory) + }).getOrElse(Array(new HttpConnectionFactory())) + val connector = new ServerConnector( - server, - null, - // Call this full constructor to set this, which forces daemon threads: - new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true), - null, - -1, - -1, - new HttpConnectionFactory()) + server, + null, + // Call this full constructor to set this, which forces daemon threads: + new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true), + null, + -1, + -1, + resolvedConnectionFactories: _*) connector.setHost(host) connector.setPort(startPort) server.addConnector(connector) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 5a73b1ad1ea2..e25e189aa6d7 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -132,6 +132,24 @@ To specify a main application resource that is in the Docker image, and if it ha --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \ container:///home/applications/examples/example.jar +### Setting Up SSL For Submitting the Driver + +When submitting to Kubernetes, a pod is started for the driver, and the pod starts an HTTP server. This HTTP server +receives the driver's configuration, including uploaded driver jars, from the client before starting the application. +Spark supports using SSL to encrypt the traffic in this bootstrapping process. It is recommended to configure this +whenever possible. + +See the [security page](security.html) and [configuration](configuration.html) sections for more information on +configuring SSL; use the prefix `spark.ssl.kubernetes.driverlaunch` in configuring the SSL-related fields in the context +of submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver +pod in starting the application, set `spark.ssl.kubernetes.driverlaunch.trustStore`. + +One note about the keyStore is that it can be specified as either a file on the client machine or a file in the +container image's disk. Thus `spark.ssl.kubernetes.driverlaunch.keyStore` can be a URI with a scheme of either `file:` +or `container:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto +the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme +`container:`, the file is assumed to already be on the container's disk at the appropriate path. + ### Spark Properties Below are some other common properties that are specific to Kubernetes. 
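As a concrete illustration of the SSL settings described above -- a minimal sketch only, with hypothetical keystore and truststore paths and placeholder passwords -- the submission client could be configured as:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Enable SSL for the driver-launch submission step.
      .set("spark.ssl.kubernetes.driverlaunch.enabled", "true")
      // A file: scheme (or no scheme) means the keyStore lives on the client machine
      // and is mounted into the driver pod as a secret volume; container: means it is
      // already on the driver container's disk.
      .set("spark.ssl.kubernetes.driverlaunch.keyStore", "file:///opt/spark/ssl/keystore.jks")
      .set("spark.ssl.kubernetes.driverlaunch.keyStorePassword", "<keyStore password>")
      .set("spark.ssl.kubernetes.driverlaunch.keyPassword", "<key password>")
      // The trustStore must be a local file (no scheme, or file:).
      .set("spark.ssl.kubernetes.driverlaunch.trustStore", "/opt/spark/ssl/truststore.jks")
      .set("spark.ssl.kubernetes.driverlaunch.trustStorePassword", "<trustStore password>")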
Most of the other configurations are the same diff --git a/pom.xml b/pom.xml index 810a2f42d251..a27daf08a90b 100644 --- a/pom.xml +++ b/pom.xml @@ -137,6 +137,7 @@ 1.8.1 1.6.0 8.18.0 + 1.52 9.2.16.v20160414 3.1.0 0.8.0 @@ -337,7 +338,11 @@ okhttp 3.4.1 - + + org.bouncycastle + bcpkix-jdk15on + ${bouncycastle.version} + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 924c9baf4994..2dbd7691a2de 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -16,11 +16,13 @@ */ package org.apache.spark.deploy.kubernetes -import java.io.File -import java.security.SecureRandom +import java.io.{File, FileInputStream} +import java.security.{KeyStore, SecureRandom} import java.util.concurrent.{Executors, TimeoutException, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean +import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager} +import com.google.common.base.Charsets import com.google.common.io.Files import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder} import io.fabric8.kubernetes.api.model._ @@ -28,10 +30,11 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, Kub import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.DurationInt -import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException} +import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource} import org.apache.spark.deploy.rest.kubernetes._ import org.apache.spark.internal.Logging @@ -53,9 +56,12 @@ private[spark] class Client( .getOrElse("spark") private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") private val secretName = s"spark-submission-server-secret-$kubernetesAppId" + private val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId" + private val sslSecretsDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId-ssl" + private val sslSecretsName = s"spark-submission-server-ssl-$kubernetesAppId" private val driverLauncherSelectorValue = s"driver-launcher-$launchTime" private val driverDockerImage = sparkConf.get( - "spark.kubernetes.driver.docker.image", s"spark-driver:$SPARK_VERSION") + "spark.kubernetes.driver.docker.image", s"spark-driver:$sparkVersion") private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars") private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverLaunchTimeoutSecs = sparkConf.getTimeAsSeconds( @@ -80,6 +86,7 @@ private[spark] class Client( .build())) def run(): Unit = { + val (driverLaunchSslOptions, isKeyStoreLocalFile) = parseDriverLaunchSslOptions() val parsedCustomLabels = parseCustomLabels(customLabels) var k8ConfBuilder = new ConfigBuilder() .withApiVersion("v1") @@ -97,13 +104,16 @@ private[spark] class Client( val k8ClientConfig = k8ConfBuilder.build 
Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig))(kubernetesClient => { - val applicationSubmitSecret = kubernetesClient.secrets().createNew() + val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() - .withName(secretName) - .endMetadata() + .withName(secretName) + .endMetadata() .withData(Map((SUBMISSION_SERVER_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() + val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient, + driverLaunchSslOptions, + isKeyStoreLocalFile) try { val driverKubernetesSelectors = (Map( DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue, @@ -111,10 +121,14 @@ private[spark] class Client( ++ parsedCustomLabels).asJava val containerPorts = buildContainerPorts() val submitCompletedFuture = SettableFuture.create[Boolean] - val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId" val submitPending = new AtomicBoolean(false) - val podWatcher = new DriverPodWatcher(submitCompletedFuture, submitPending, - kubernetesClient, applicationSubmitSecret, driverKubernetesSelectors) + val podWatcher = new DriverPodWatcher( + submitCompletedFuture, + submitPending, + kubernetesClient, + driverLaunchSslOptions, + Array(submitServerSecret) ++ sslSecrets, + driverKubernetesSelectors) Utils.tryWithResource(kubernetesClient .pods() .withLabels(driverKubernetesSelectors) @@ -129,9 +143,10 @@ private[spark] class Client( .addNewVolume() .withName(s"spark-submission-secret-volume") .withNewSecret() - .withSecretName(applicationSubmitSecret.getMetadata.getName) + .withSecretName(submitServerSecret.getMetadata.getName) .endSecret() .endVolume + .addToVolumes(sslVolumes: _*) .withServiceAccount(serviceAccount) .addNewContainer() .withName(DRIVER_LAUNCHER_CONTAINER_NAME) @@ -142,6 +157,7 @@ private[spark] class Client( .withMountPath(secretDirectory) .withReadOnly(true) .endVolumeMount() + .addToVolumeMounts(sslVolumeMounts: _*) .addNewEnv() .withName("SPARK_SUBMISSION_SECRET_LOCATION") .withValue(s"$secretDirectory/$SUBMISSION_SERVER_SECRET_NAME") @@ -150,6 +166,7 @@ private[spark] class Client( .withName("SPARK_DRIVER_LAUNCHER_SERVER_PORT") .withValue(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT.toString) .endEnv() + .addToEnv(sslEnvs: _*) .withPorts(containerPorts.asJava) .endContainer() .endSpec() @@ -165,26 +182,136 @@ private[spark] class Client( throw new SparkException(finalErrorMessage, e) } finally { if (!submitSucceeded) { - try { + Utils.tryLogNonFatalError({ kubernetesClient.pods.withName(kubernetesAppId).delete - } catch { - case throwable: Throwable => - logError("Failed to delete driver pod after it failed to run.", throwable) - } + }) } } } } finally { - kubernetesClient.secrets().delete(applicationSubmitSecret) + Utils.tryLogNonFatalError({ + kubernetesClient.secrets().delete(submitServerSecret) + }) + Utils.tryLogNonFatalError({ + kubernetesClient.secrets().delete(sslSecrets: _*) + }) } }) } + private def parseDriverLaunchSslOptions(): (SSLOptions, Boolean) = { + val maybeKeyStore = sparkConf.getOption("spark.ssl.kubernetes.driverlaunch.keyStore") + val resolvedSparkConf = sparkConf.clone() + val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => { + val keyStoreURI = Utils.resolveURI(keyStore) + val isProvidedKeyStoreLocal = keyStoreURI.getScheme match { + case "file" | null => true + case "container" => false + case _ => throw new SparkException(s"Invalid KeyStore URI $keyStore; keyStore URI" + + " for submit server must have scheme file:// 
or container:// (no scheme defaults" + + " to file://)") + } + (isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath)) + }).getOrElse((true, Option.empty[String])) + resolvedKeyStore.foreach { + resolvedSparkConf.set("spark.ssl.kubernetes.driverlaunch.keyStore", _) + } + sparkConf.getOption("spark.ssl.kubernetes.driverlaunch.trustStore").foreach { trustStore => + val trustStoreURI = Utils.resolveURI(trustStore) + trustStoreURI.getScheme match { + case "file" | null => + resolvedSparkConf.set("spark.ssl.kubernetes.driverlaunch.trustStore", + trustStoreURI.getPath) + case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" + + " for submit server must have no scheme, or scheme file://") + } + } + val securityManager = new SecurityManager(resolvedSparkConf) + (securityManager.getSSLOptions("kubernetes.driverlaunch"), isLocalKeyStore) + } + + private def configureSsl(kubernetesClient: KubernetesClient, driverLaunchSslOptions: SSLOptions, + isKeyStoreLocalFile: Boolean): + (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { + if (driverLaunchSslOptions.enabled) { + val sslSecretsMap = mutable.HashMap[String, String]() + val sslEnvs = mutable.Buffer[EnvVar]() + val secrets = mutable.Buffer[Secret]() + driverLaunchSslOptions.keyStore.foreach(store => { + val resolvedKeyStoreFile = if (isKeyStoreLocalFile) { + if (!store.isFile) { + throw new SparkException(s"KeyStore specified at $store is not a file or" + + s" does not exist.") + } + val keyStoreBytes = Files.toByteArray(store) + val keyStoreBase64 = Base64.encodeBase64String(keyStoreBytes) + sslSecretsMap += (SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64) + s"$sslSecretsDirectory/$SSL_KEYSTORE_SECRET_NAME" + } else { + store.getAbsolutePath + } + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_KEYSTORE_FILE") + .withValue(resolvedKeyStoreFile) + .build() + }) + driverLaunchSslOptions.keyStorePassword.foreach(password => { + val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8)) + sslSecretsMap += (SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64) + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE") + .withValue(s"$sslSecretsDirectory/$SSL_KEYSTORE_PASSWORD_SECRET_NAME") + .build() + }) + driverLaunchSslOptions.keyPassword.foreach(password => { + val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8)) + sslSecretsMap += (SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64) + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE") + .withValue(s"$sslSecretsDirectory/$SSL_KEY_PASSWORD_SECRET_NAME") + .build() + }) + driverLaunchSslOptions.keyStoreType.foreach(storeType => { + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_KEYSTORE_TYPE") + .withValue(storeType) + .build() + }) + sslEnvs += new EnvVarBuilder() + .withName("SPARK_SUBMISSION_USE_SSL") + .withValue("true") + .build() + val sslSecrets = kubernetesClient.secrets().createNew() + .withNewMetadata() + .withName(sslSecretsName) + .endMetadata() + .withData(sslSecretsMap.asJava) + .withType("Opaque") + .done() + secrets += sslSecrets + val sslVolume = new VolumeBuilder() + .withName("spark-submission-server-ssl-secrets") + .withNewSecret() + .withSecretName(sslSecrets.getMetadata.getName) + .endSecret() + .build() + val sslVolumeMount = new VolumeMountBuilder() + .withName("spark-submission-server-ssl-secrets") + .withReadOnly(true) + .withMountPath(sslSecretsDirectory) + .build() + 
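// Hand back everything the driver pod spec needs for SSL: the env vars consumed by
// the submission server, the secret-backed volume, and its mount point.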
(sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray) + } else { + (Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]()) + } + } + private class DriverPodWatcher( submitCompletedFuture: SettableFuture[Boolean], submitPending: AtomicBoolean, kubernetesClient: KubernetesClient, - applicationSubmitSecret: Secret, + driverLaunchSslOptions: SSLOptions, + applicationSecrets: Array[Secret], driverKubernetesSelectors: java.util.Map[String, String]) extends Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = { if ((action == Action.ADDED || action == Action.MODIFIED) @@ -205,8 +332,10 @@ private[spark] class Client( .withController(true) .build()) - applicationSubmitSecret.getMetadata.setOwnerReferences(ownerRefs.asJava) - kubernetesClient.secrets().createOrReplace(applicationSubmitSecret) + applicationSecrets.foreach(secret => { + secret.getMetadata.setOwnerReferences(ownerRefs.asJava) + kubernetesClient.secrets().createOrReplace(secret) + }) val driverLauncherServicePort = new ServicePortBuilder() .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME) @@ -232,7 +361,8 @@ private[spark] class Client( sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) - val driverLauncher = buildDriverLauncherClient(kubernetesClient, service) + val driverLauncher = buildDriverLauncherClient(kubernetesClient, service, + driverLaunchSslOptions) val ping = Retry.retry(5, 5.seconds) { driverLauncher.ping() } @@ -281,13 +411,9 @@ private[spark] class Client( } catch { case e: Throwable => submitCompletedFuture.setException(e) - try { + Utils.tryLogNonFatalError({ kubernetesClient.services().delete(service) - } catch { - case throwable: Throwable => - logError("Submitting the job failed but failed to" + - " clean up the created service.", throwable) - } + }) throw e } case None => @@ -362,19 +488,10 @@ private[spark] class Client( } private def buildContainerPorts(): Seq[ContainerPort] = { - Seq( - new ContainerPortBuilder() - .withContainerPort(sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)) - .build(), - new ContainerPortBuilder() - .withContainerPort(sparkConf.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)) - .build(), - new ContainerPortBuilder() - .withContainerPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) - .build(), - new ContainerPortBuilder() - .withContainerPort(uiPort) - .build()) + Seq(sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), + sparkConf.getInt("spark.blockManager.port", DEFAULT_BLOCKMANAGER_PORT), + DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT, + uiPort).map(new ContainerPortBuilder().withContainerPort(_).build()) } private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = { @@ -411,7 +528,8 @@ private[spark] class Client( private def buildDriverLauncherClient( kubernetesClient: KubernetesClient, - service: Service): KubernetesSparkRestApi = { + service: Service, + driverLaunchSslOptions: SSLOptions): KubernetesSparkRestApi = { val servicePort = service .getSpec .getPorts @@ -423,8 +541,50 @@ private[spark] class Client( // TODO be resilient to node failures and try all of them val node = kubernetesClient.nodes.list.getItems.asScala.head val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress - val url = s"http://$nodeAddress:$servicePort" - HttpClientUtil.createClient[KubernetesSparkRestApi](uri = url) + val urlScheme = if (driverLaunchSslOptions.enabled) { + "https" + } else { + 
logWarning("Submitting application details, application secret, and local" + + " jars to the cluster over an insecure connection. You should configure SSL" + + " to secure this step.") + "http" + } + val (trustManager, sslContext): (X509TrustManager, SSLContext) = + if (driverLaunchSslOptions.enabled) { + buildSslConnectionConfiguration(driverLaunchSslOptions) + } else { + (null, SSLContext.getDefault) + } + val url = s"$urlScheme://$nodeAddress:$servicePort" + HttpClientUtil.createClient[KubernetesSparkRestApi]( + url, + sslSocketFactory = sslContext.getSocketFactory, + trustContext = trustManager) + } + + private def buildSslConnectionConfiguration(driverLaunchSslOptions: SSLOptions) = { + driverLaunchSslOptions.trustStore.map(trustStoreFile => { + val trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm) + val trustStore = KeyStore.getInstance( + driverLaunchSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)) + if (!trustStoreFile.isFile) { + throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" + + s" does not exist or is not a file.") + } + Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream => + driverLaunchSslOptions.trustStorePassword match { + case Some(password) => + trustStore.load(trustStoreStream, password.toCharArray) + case None => trustStore.load(trustStoreStream, null) + } + } + trustManagerFactory.init(trustStore) + val trustManagers = trustManagerFactory.getTrustManagers + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(null, trustManagers, SECURE_RANDOM) + (trustManagers(0).asInstanceOf[X509TrustManager], sslContext) + }).getOrElse((null, SSLContext.getDefault)) } private def parseCustomLabels(labels: String): Map[String, String] = { @@ -448,6 +608,9 @@ private[spark] class Client( private[spark] object Client extends Logging { private val SUBMISSION_SERVER_SECRET_NAME = "spark-submission-server-secret" + private val SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore" + private val SSL_KEYSTORE_PASSWORD_SECRET_NAME = "spark-submission-server-keystore-password" + private val SSL_KEY_PASSWORD_SECRET_NAME = "spark-submission-server-key-password" private val DRIVER_LAUNCHER_SELECTOR_LABEL = "driver-launcher-selector" private val DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT = 7077 private val DEFAULT_DRIVER_PORT = 7078 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 38fa4d1d3f0b..451dc96dd65e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -21,20 +21,26 @@ import java.net.URI import java.util.concurrent.CountDownLatch import javax.servlet.http.{HttpServletRequest, HttpServletResponse} +import com.google.common.base.Charsets import com.google.common.io.Files import org.apache.commons.codec.binary.Base64 import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf} +import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil import 
org.apache.spark.deploy.rest._ import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} private case class KubernetesSparkRestServerArguments( - val host: Option[String] = None, - val port: Option[Int] = None, - val secretFile: Option[String] = None) { + host: Option[String] = None, + port: Option[Int] = None, + useSsl: Boolean = false, + secretFile: Option[String] = None, + keyStoreFile: Option[String] = None, + keyStorePasswordFile: Option[String] = None, + keyStoreType: Option[String] = None, + keyPasswordFile: Option[String] = None) { def validate(): KubernetesSparkRestServerArguments = { require(host.isDefined, "Hostname not set via --hostname.") require(port.isDefined, "Port not set via --port") @@ -58,6 +64,21 @@ private object KubernetesSparkRestServerArguments { case "--secret-file" :: value :: tail => args = tail resolvedArguments.copy(secretFile = Some(value)) + case "--use-ssl" :: value :: tail => + args = tail + resolvedArguments.copy(useSsl = value.toBoolean) + case "--keystore-file" :: value :: tail => + args = tail + resolvedArguments.copy(keyStoreFile = Some(value)) + case "--keystore-password-file" :: value :: tail => + args = tail + resolvedArguments.copy(keyStorePasswordFile = Some(value)) + case "--keystore-type" :: value :: tail => + args = tail + resolvedArguments.copy(keyStoreType = Some(value)) + case "--keystore-key-password-file" :: value :: tail => + args = tail + resolvedArguments.copy(keyPasswordFile = Some(value)) // TODO polish usage message case Nil => resolvedArguments case unknown => throw new IllegalStateException(s"Unknown argument(s) found: $unknown") @@ -78,8 +99,9 @@ private[spark] class KubernetesSparkRestServer( port: Int, conf: SparkConf, expectedApplicationSecret: Array[Byte], - shutdownLock: CountDownLatch) - extends RestSubmissionServer(host, port, conf) { + shutdownLock: CountDownLatch, + sslOptions: SSLOptions = new SSLOptions) + extends RestSubmissionServer(host, port, conf, sslOptions) { private val SERVLET_LOCK = new Object private val javaExecutable = s"${System.getenv("JAVA_HOME")}/bin/java" @@ -196,7 +218,7 @@ private[spark] class KubernetesSparkRestServer( response.success = true response.submissionId = null response.message = "success" - response.serverSparkVersion = SPARK_VERSION + response.serverSparkVersion = sparkVersion response } case unexpected => @@ -249,6 +271,7 @@ private[spark] class KubernetesSparkRestServer( private[spark] object KubernetesSparkRestServer { private val barrier = new CountDownLatch(1) + def main(args: Array[String]): Unit = { val parsedArguments = KubernetesSparkRestServerArguments.fromArgsArray(args) val secretFile = new File(parsedArguments.secretFile.get) @@ -256,6 +279,24 @@ private[spark] object KubernetesSparkRestServer { throw new IllegalArgumentException(s"Secret file specified by --secret-file" + " is not a file, or does not exist.") } + val sslOptions = if (parsedArguments.useSsl) { + val keyStorePassword = parsedArguments + .keyStorePasswordFile + .map(new File(_)) + .map(Files.toString(_, Charsets.UTF_8)) + val keyPassword = parsedArguments + .keyPasswordFile + .map(new File(_)) + .map(Files.toString(_, Charsets.UTF_8)) + new SSLOptions( + enabled = true, + keyStore = parsedArguments.keyStoreFile.map(new File(_)), + keyStoreType = parsedArguments.keyStoreType, + keyStorePassword = keyStorePassword, + keyPassword = keyPassword) + } else { + new SSLOptions + } val secretBytes = Files.toByteArray(secretFile) val sparkConf = new SparkConf(true) val server = new 
KubernetesSparkRestServer(
@@ -263,7 +304,8 @@ private[spark] object KubernetesSparkRestServer {
       parsedArguments.port.get,
       sparkConf,
       secretBytes,
-      barrier)
+      barrier,
+      sslOptions)
     server.start()
     ShutdownHookManager.addShutdownHook(() => {
       try {
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
index 4d345158f356..070008fce741 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
@@ -19,5 +19,14 @@ ENV SPARK_HOME /opt/spark
 
 WORKDIR /opt/spark
 
-# This class will also require setting a secret via the SPARK_APP_SECRET environment variable
-CMD exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer --hostname $HOSTNAME --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT --secret-file $SPARK_SUBMISSION_SECRET_LOCATION
+CMD SSL_ARGS="" && \
+    if ! [ -z ${SPARK_SUBMISSION_USE_SSL+x} ]; then SSL_ARGS="$SSL_ARGS --use-ssl $SPARK_SUBMISSION_USE_SSL"; fi && \
+    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-file $SPARK_SUBMISSION_KEYSTORE_FILE"; fi && \
+    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_TYPE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-type $SPARK_SUBMISSION_KEYSTORE_TYPE"; fi && \
+    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-password-file $SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"; fi && \
+    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \
+    exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \
+      --hostname $HOSTNAME \
+      --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT \
+      --secret-file $SPARK_SUBMISSION_SECRET_LOCATION \
+      ${SSL_ARGS}
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 569527de8e30..f6a322f18cd7 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -106,6 +106,10 @@
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcpkix-jdk15on</artifactId>
+    </dependency>
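NOTE (editorial sketch, not part of the patch series): the Dockerfile change above only forwards environment variables into flags that KubernetesSparkRestServerArguments already parses; no new configuration surface is introduced. A rough Scala restatement of the shell logic — the helper name and the env map are illustrative, not code from this series:

    // Hypothetical mirror of the Dockerfile's SSL_ARGS assembly: each env var
    // that is set contributes its flag and value; unset vars contribute
    // nothing, matching the `if ! [ -z ${VAR+x} ]` guards above.
    def sslArgsFromEnv(env: Map[String, String]): Seq[String] = {
      val flagsByEnvVar = Seq(
        "SPARK_SUBMISSION_USE_SSL" -> "--use-ssl",
        "SPARK_SUBMISSION_KEYSTORE_FILE" -> "--keystore-file",
        "SPARK_SUBMISSION_KEYSTORE_TYPE" -> "--keystore-type",
        "SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE" -> "--keystore-password-file",
        "SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE" -> "--keystore-key-password-file")
      flagsByEnvVar.flatMap { case (envVar, flag) =>
        env.get(envVar).toSeq.flatMap(value => Seq(flag, value))
      }
    }

For example, sslArgsFromEnv(sys.env) in a container where only SPARK_SUBMISSION_USE_SSL=true is set yields Seq("--use-ssl", "true"), which fromArgsArray then consumes through its --use-ssl case.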
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index c4bb389f5ada..3fed2b18dd5b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.deploy.kubernetes.integrationtest
 
+import java.io.File
 import java.nio.file.Paths
 import java.util.UUID
 import java.util.concurrent.TimeUnit
@@ -36,7 +37,12 @@ import org.apache.spark.deploy.kubernetes.Client
 import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
 import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
 import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
+<<<<<<< HEAD
+import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils
+||||||| merged common ancestors
+=======
 import org.apache.spark.internal.Logging
+>>>>>>> nodeport-upload
 import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
 import org.apache.spark.util.Utils
 
@@ -68,6 +74,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   private val NAMESPACE = UUID.randomUUID().toString.replaceAll("-", "")
   private var minikubeKubernetesClient: KubernetesClient = _
   private var clientConfig: Config = _
+  private var keyStoreFile: File = _
+  private var trustStoreFile: File = _
 
   override def beforeAll(): Unit = {
     Minikube.startMinikube()
@@ -79,6 +87,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       .done()
     minikubeKubernetesClient = Minikube.getKubernetesClient.inNamespace(NAMESPACE)
     clientConfig = minikubeKubernetesClient.getConfiguration
+    val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
+      Minikube.getMinikubeIp,
+      "changeit",
+      "changeit",
+      "changeit")
+    keyStoreFile = keyStore
+    trustStoreFile = trustStore
   }
 
   before {
@@ -296,4 +311,32 @@
     assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
     assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
   }
+
+  test("Enable SSL on the driver submit server") {
+    val args = Array(
+      "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
+      "--deploy-mode", "cluster",
+      "--kubernetes-namespace", NAMESPACE,
+      "--name", "spark-pi",
+      "--executor-memory", "512m",
+      "--executor-cores", "1",
+      "--num-executors", "1",
+      "--upload-jars", HELPER_JAR,
+      "--class", MAIN_CLASS,
+      "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}",
+      "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}",
+      "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
+      "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
+      "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
+      "--conf", "spark.ssl.kubernetes.driverlaunch.enabled=true",
+      "--conf", "spark.ssl.kubernetes.driverlaunch.keyStore=" +
+        s"file://${keyStoreFile.getAbsolutePath}",
+      "--conf", "spark.ssl.kubernetes.driverlaunch.keyStorePassword=changeit",
+      "--conf", "spark.ssl.kubernetes.driverlaunch.keyPassword=changeit",
+      "--conf", "spark.ssl.kubernetes.driverlaunch.trustStore=" +
+        s"file://${trustStoreFile.getAbsolutePath}",
+      "--conf", "spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit",
+      EXAMPLES_JAR)
+    SparkSubmit.main(args)
+  }
 }
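NOTE (editorial sketch, not part of the patch series): the spark.ssl.kubernetes.driverlaunch.* keys exercised by this test follow Spark's standard spark.ssl.<namespace>.* layout, so the submitting side can recover them with the stock namespaced parser. A minimal sketch, assuming SSLOptions.parse(conf, ns) with its usual signature is reachable from code in the org.apache.spark package (as it is for the other deploy modes):

    import org.apache.spark.{SparkConf, SSLOptions}

    val conf = new SparkConf(false)
      .set("spark.ssl.kubernetes.driverlaunch.enabled", "true")
      .set("spark.ssl.kubernetes.driverlaunch.trustStore", "/tmp/trustStore.jks")
      .set("spark.ssl.kubernetes.driverlaunch.trustStorePassword", "changeit")
    // Namespaced parse: reads <ns>.enabled, <ns>.trustStore, and friends, and
    // yields a disabled SSLOptions when the keys are absent.
    val driverLaunchSslOptions = SSLOptions.parse(conf, "spark.ssl.kubernetes.driverlaunch")
    assert(driverLaunchSslOptions.enabled)
    assert(driverLaunchSslOptions.trustStore.exists(_.getName == "trustStore.jks"))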
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala
new file mode 100644
index 000000000000..bde7b4322666
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest.sslutil
+
+import java.io.{File, FileOutputStream}
+import java.math.BigInteger
+import java.nio.file.Files
+import java.security.{KeyPairGenerator, KeyStore, SecureRandom}
+import java.util.{Calendar, Random}
+import javax.security.auth.x500.X500Principal
+
+import org.bouncycastle.asn1.x509.{Extension, GeneralName, GeneralNames}
+import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3CertificateBuilder}
+import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder
+
+import org.apache.spark.util.Utils
+
+private[spark] object SSLUtils {
+
+  def generateKeyStoreTrustStorePair(
+      ipAddress: String,
+      keyStorePassword: String,
+      keyPassword: String,
+      trustStorePassword: String): (File, File) = {
+    val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
+    keyPairGenerator.initialize(512)
+    val keyPair = keyPairGenerator.generateKeyPair()
+    val selfPrincipal = new X500Principal(s"cn=$ipAddress")
+    val currentDate = Calendar.getInstance
+    val validForOneHundredYears = Calendar.getInstance
+    validForOneHundredYears.add(Calendar.YEAR, 100)
+    val certificateBuilder = new JcaX509v3CertificateBuilder(
+      selfPrincipal,
+      new BigInteger(4096, new Random()),
+      currentDate.getTime,
+      validForOneHundredYears.getTime,
+      selfPrincipal,
+      keyPair.getPublic)
+    certificateBuilder.addExtension(Extension.subjectAlternativeName, false,
+      new GeneralNames(new GeneralName(GeneralName.iPAddress, ipAddress)))
+    val signer = new JcaContentSignerBuilder("SHA1WithRSA")
+      .setSecureRandom(new SecureRandom())
+      .build(keyPair.getPrivate)
+    val bcCertificate = certificateBuilder.build(signer)
+    val jcaCertificate = new JcaX509CertificateConverter().getCertificate(bcCertificate)
+    val keyStore = KeyStore.getInstance("JKS")
+    keyStore.load(null, null)
+    keyStore.setKeyEntry("key", keyPair.getPrivate,
+      keyPassword.toCharArray, Array(jcaCertificate))
+    val tempDir = Files.createTempDirectory("temp-ssl-stores").toFile()
+    tempDir.deleteOnExit()
+    val keyStoreFile = new File(tempDir, "keyStore.jks")
+    Utils.tryWithResource(new FileOutputStream(keyStoreFile)) {
+      keyStore.store(_, keyStorePassword.toCharArray)
+    }
+    val trustStore = KeyStore.getInstance("JKS")
+    trustStore.load(null, null)
+    trustStore.setCertificateEntry("key", jcaCertificate)
+    val trustStoreFile = new File(tempDir, "trustStore.jks")
+    Utils.tryWithResource(new FileOutputStream(trustStoreFile)) {
+      trustStore.store(_, trustStorePassword.toCharArray)
+    }
+    (keyStoreFile, trustStoreFile)
+  }
+
+}
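NOTE (editorial sketch, not part of the patch series): a quick way to see what generateKeyStoreTrustStorePair hands back — the key store holds the private key plus the self-signed certificate under alias "key", while the trust store carries only the certificate. The IP below is just an example minikube address:

    import java.io.FileInputStream
    import java.security.KeyStore

    val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
      "192.168.99.100", "changeit", "changeit", "changeit")
    // Reload the generated trust store and confirm the certificate entry exists.
    val trustStore = KeyStore.getInstance("JKS")
    val in = new FileInputStream(trustStoreFile)
    try trustStore.load(in, "changeit".toCharArray) finally in.close()
    assert(trustStore.getCertificate("key") != null)
    assert(!trustStore.isKeyEntry("key"))  // certificate-only entry, no private key

This split is also why the integration test can point spark.ssl.kubernetes.driverlaunch.trustStore at the file directly: the submit server gets the key store, while the submitting client only ever needs the certificate.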
From 2f667ce87e6b792acbf8bc120310f47c44fc70a0 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Fri, 27 Jan 2017 14:36:42 -0800
Subject: [PATCH 09/11] Fix bad merge

---
 .../deploy/kubernetes/integrationtest/KubernetesSuite.scala | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 3fed2b18dd5b..13edea02dce9 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -37,12 +37,7 @@ import org.apache.spark.deploy.kubernetes.Client
 import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
 import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
 import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
-<<<<<<< HEAD
 import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils
-||||||| merged common ancestors
-=======
-import org.apache.spark.internal.Logging
->>>>>>> nodeport-upload
 import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
 import org.apache.spark.util.Utils
 
From 45b063ccb11d634dd51fabc2be6da4ab41cc7289 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 30 Jan 2017 10:58:12 -0800
Subject: [PATCH 10/11] Remove unnecessary braces

---
 .../apache/spark/deploy/kubernetes/Client.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 2dbd7691a2de..a9bbafb0b5b2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -103,7 +103,7 @@ private[spark] class Client(
     }
     val k8ClientConfig = k8ConfBuilder.build
-    Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig))(kubernetesClient => {
+    Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
       val submitServerSecret = kubernetesClient.secrets().createNew()
         .withNewMetadata()
         .withName(secretName)
@@ -182,19 +182,19 @@ private[spark] class Client(
           throw new SparkException(finalErrorMessage, e)
         } finally {
           if (!submitSucceeded) {
-            Utils.tryLogNonFatalError({
-              kubernetesClient.pods.withName(kubernetesAppId).delete
-            })
+            Utils.tryLogNonFatalError {
+              kubernetesClient.pods.withName(kubernetesAppId).delete()
+            }
           }
         }
       }
     } finally {
-      Utils.tryLogNonFatalError({
+      Utils.tryLogNonFatalError {
         kubernetesClient.secrets().delete(submitServerSecret)
-      })
-      Utils.tryLogNonFatalError({
+      }
+      Utils.tryLogNonFatalError {
         kubernetesClient.secrets().delete(sslSecrets: _*)
-      })
+      }
     }
   })
 }

From b32d6310247254a375b78e6d929ad77a8120dff0 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 30 Jan 2017 11:53:28 -0800
Subject: [PATCH 11/11] Fix compiler error

---
 .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index a9bbafb0b5b2..07a45c7577bc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -196,7 +196,7 @@ private[spark] class Client(
         kubernetesClient.secrets().delete(sslSecrets: _*)
       }
     }
-  })
+  }
 }
 
   private def parseDriverLaunchSslOptions(): (SSLOptions, Boolean) = {
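NOTE (editorial sketch, not part of the patch series): both the submission path and the tests lean on the Retry.retry(5, 5.seconds) { driverLauncher.ping() } idiom to wait out pod startup and NodePort propagation before posting the application. The Retry helper itself is not shown in these hunks; a minimal future-based equivalent, assuming the semantics implied by the call sites (run the body up to `times` attempts, sleeping `interval` between failures), might look like:

    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration._

    object RetrySketch {
      // Illustrative stand-in for the patch's Retry.retry; not the real implementation.
      def retry[T](times: Int, interval: Duration)(body: => T)
          (implicit ec: ExecutionContext): Future[T] = {
        Future(body).recoverWith {
          case _ if times > 1 =>
            // Blocking sleep keeps the sketch short; a real helper may schedule instead.
            Thread.sleep(interval.toMillis)
            retry(times - 1, interval)(body)
        }
      }
    }

With an ExecutionContext in scope, RetrySketch.retry(5, 5.seconds) { driverLauncher.ping() } then resolves once the submit server starts answering over the NodePort, mirroring how the Client gates submission on a successful ping.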