diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index b30c980e95a9a..524726c2ccf92 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -19,16 +19,16 @@ package org.apache.spark.deploy.rest
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import scala.io.Source
-
import com.fasterxml.jackson.core.JsonProcessingException
-import org.eclipse.jetty.server.{HttpConnectionFactory, Server, ServerConnector}
+import org.eclipse.jetty.http.HttpVersion
+import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory}
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
import org.json4s._
import org.json4s.jackson.JsonMethods._
+import scala.io.Source
-import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SSLOptions}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -50,7 +50,8 @@ import org.apache.spark.util.Utils
private[spark] abstract class RestSubmissionServer(
val host: String,
val requestedPort: Int,
- val masterConf: SparkConf) extends Logging {
+ val masterConf: SparkConf,
+ val sslOptions: SSLOptions = SSLOptions()) extends Logging {
protected val submitRequestServlet: SubmitRequestServlet
protected val killRequestServlet: KillRequestServlet
protected val statusRequestServlet: StatusRequestServlet
@@ -79,19 +80,32 @@ private[spark] abstract class RestSubmissionServer(
* Return a 2-tuple of the started server and the bound port.
*/
private def doStart(startPort: Int): (Server, Int) = {
+ // TODO consider using JettyUtils#startServer to do this instead
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
val server = new Server(threadPool)
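+    // If SSL options are configured, chain an SslConnectionFactory in front of the
+    // HttpConnectionFactory so this connector terminates TLS before speaking HTTP/1.1;
+    // otherwise fall back to a single plain-HTTP connection factory.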
+ val resolvedConnectionFactories = sslOptions
+ .createJettySslContextFactory()
+ .map(sslFactory => {
+ val sslConnectionFactory = new SslConnectionFactory(
+ sslFactory, HttpVersion.HTTP_1_1.asString())
+ val rawHttpConfiguration = new HttpConfiguration()
+ rawHttpConfiguration.setSecureScheme("https")
+ rawHttpConfiguration.setSecurePort(startPort)
+ val rawHttpConnectionFactory = new HttpConnectionFactory(rawHttpConfiguration)
+ Array(sslConnectionFactory, rawHttpConnectionFactory)
+ }).getOrElse(Array(new HttpConnectionFactory()))
+
val connector = new ServerConnector(
- server,
- null,
- // Call this full constructor to set this, which forces daemon threads:
- new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true),
- null,
- -1,
- -1,
- new HttpConnectionFactory())
+ server,
+ null,
+ // Call this full constructor to set this, which forces daemon threads:
+ new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true),
+ null,
+ -1,
+ -1,
+ resolvedConnectionFactories: _*)
connector.setHost(host)
connector.setPort(startPort)
server.addConnector(connector)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 5a73b1ad1ea29..283ee078e783b 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -132,6 +132,24 @@ To specify a main application resource that is in the Docker image, and if it ha
--conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
container:///home/applications/examples/example.jar
+### Setting Up SSL for Submitting the Driver
+
+When submitting to Kubernetes, a pod is created for the driver, and that pod runs an HTTP server. This HTTP server
+receives the driver's configuration, including uploaded driver jars, from the client before starting the application.
+Spark supports using SSL to encrypt the traffic in this bootstrapping process. It is recommended to configure this
+whenever possible.
+
+See the [security page](security.html) and [configuration page](configuration.html) for more information on
+configuring SSL; use the prefix `spark.ssl.kubernetes.submit` for the SSL-related fields that apply when
+submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver
+pod while starting the application, set `spark.ssl.kubernetes.submit.trustStore`.
+
+Note that the keyStore can be specified as either a file on the client machine or a file already present in the
+container image. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:`
+or `container:`. A scheme of `file:` means the keyStore is located on the client machine; it is then mounted onto
+the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme
+`container:`, the file is assumed to already be on the container's disk at the appropriate path.
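+
+For example, a submission that secures this bootstrapping step might look like the following.
+The keyStore path, password, and application details here are illustrative only:
+
+    bin/spark-submit \
+      --deploy-mode cluster \
+      --class com.example.MyApp \
+      --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
+      --conf spark.ssl.kubernetes.submit.enabled=true \
+      --conf spark.ssl.kubernetes.submit.keyStore=file:///path/to/keystore.jks \
+      --conf spark.ssl.kubernetes.submit.keyStorePassword=changeit \
+      --conf spark.ssl.kubernetes.submit.trustStore=/path/to/truststore.jks \
+      container:///home/applications/examples/example.jar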
+
### Spark Properties
Below are some other common properties that are specific to Kubernetes. Most of the other configurations are the same
@@ -220,6 +238,13 @@ from the other deployment modes. See the [configuration page](configuration.html
where each label is in the format key=value.
+  <tr>
+    <td><code>spark.kubernetes.driverSubmitTimeout</code></td>
+    <td><code>60s</code></td>
+    <td>
+      Time to wait for the driver pod to be initially ready before aborting the job.
+    </td>
+  </tr>
## Current Limitations
diff --git a/pom.xml b/pom.xml
index 810a2f42d2516..a27daf08a90bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,6 +137,7 @@
    <parquet.version>1.8.1</parquet.version>
    <hive.parquet.version>1.6.0</hive.parquet.version>
    <feign.version>8.18.0</feign.version>
+    <bouncycastle.version>1.52</bouncycastle.version>
    <jetty.version>9.2.16.v20160414</jetty.version>
    <javaxservlet.version>3.1.0</javaxservlet.version>
    <chill.version>0.8.0</chill.version>
@@ -337,7 +338,11 @@
        <artifactId>okhttp</artifactId>
        <version>3.4.1</version>
-
+      <dependency>
+        <groupId>org.bouncycastle</groupId>
+        <artifactId>bcpkix-jdk15on</artifactId>
+        <version>${bouncycastle.version}</version>
+      </dependency>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 77b7c793dc37e..c761f783b21fb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -16,25 +16,27 @@
*/
package org.apache.spark.deploy.kubernetes
-import java.io.File
-import java.security.SecureRandom
+import java.io.{File, FileInputStream}
+import java.security.{KeyStore, SecureRandom}
import java.util.concurrent.{Executors, TimeoutException, TimeUnit}
-import javax.net.ssl.X509TrustManager
+import java.util.concurrent.atomic.AtomicBoolean
+import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
+import com.google.common.base.Charsets
import com.google.common.io.Files
import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder}
import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
-import io.fabric8.kubernetes.client.internal.SSLUtils
import org.apache.commons.codec.binary.Base64
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
-import scala.util.Success
-import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions}
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
@@ -47,21 +49,23 @@ private[spark] class Client(
appArgs: Array[String]) extends Logging {
import Client._
- private val namespace = sparkConf.get("spark.kubernetes.namespace", "default")
+ private val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
private val master = resolveK8sMaster(sparkConf.get("spark.master"))
private val launchTime = System.currentTimeMillis
private val appName = sparkConf.getOption("spark.app.name")
.orElse(sparkConf.getOption("spark.app.id"))
.getOrElse("spark")
- private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
- private val secretName = s"spark-submission-server-secret-$kubernetesAppId"
- private val driverLauncherSelectorValue = s"driver-launcher-$launchTime"
- private val driverDockerImage = sparkConf.get(
- "spark.kubernetes.driver.docker.image", s"spark-driver:$SPARK_VERSION")
- private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars")
- private val driverLaunchTimeoutSecs = sparkConf.getTimeAsSeconds(
- "spark.kubernetes.driverLaunchTimeout", s"${DEFAULT_LAUNCH_TIMEOUT_SECONDS}s")
+ private val kubernetesAppId = sparkConf
+ .get("spark.app.id", s"$appName-$launchTime").toLowerCase.replaceAll("\\.", "-")
+ private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId"
+ private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId"
+ private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
+ private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
+ private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
+ private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS)
+ private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
+ private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
private val secretBase64String = {
val secretBytes = new Array[Byte](128)
@@ -69,10 +73,8 @@ private[spark] class Client(
Base64.encodeBase64String(secretBytes)
}
- private val serviceAccount = sparkConf.get("spark.kubernetes.submit.serviceAccountName",
- "default")
-
- private val customLabels = sparkConf.get("spark.kubernetes.driver.labels", "")
+ private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+ private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)
private implicit val retryableExecutionContext = ExecutionContext
.fromExecutorService(
@@ -82,282 +84,417 @@ private[spark] class Client(
.build()))
def run(): Unit = {
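+    // Overall flow: parse the submission SSL options, create the application secret (plus
+    // SSL secrets when enabled), launch the driver pod, wait for its submission server to
+    // become ready, then POST the application configuration and uploaded jars to it.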
+ val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
val parsedCustomLabels = parseCustomLabels(customLabels)
var k8ConfBuilder = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
.withNamespace(namespace)
- sparkConf.getOption("spark.kubernetes.submit.caCertFile").foreach {
+ sparkConf.get(KUBERNETES_CA_CERT_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f)
}
- sparkConf.getOption("spark.kubernetes.submit.clientKeyFile").foreach {
+ sparkConf.get(KUBERNETES_CLIENT_KEY_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f)
}
- sparkConf.getOption("spark.kubernetes.submit.clientCertFile").foreach {
+ sparkConf.get(KUBERNETES_CLIENT_CERT_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f)
}
val k8ClientConfig = k8ConfBuilder.build
- Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig))(kubernetesClient => {
- val secret = kubernetesClient.secrets().createNew()
+ Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
+ val submitServerSecret = kubernetesClient.secrets().createNew()
.withNewMetadata()
- .withName(secretName)
- .endMetadata()
- .withData(Map((SUBMISSION_SERVER_SECRET_NAME, secretBase64String)).asJava)
+ .withName(secretName)
+ .endMetadata()
+ .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava)
.withType("Opaque")
.done()
+ val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient,
+ driverSubmitSslOptions,
+ isKeyStoreLocalFile)
try {
- val resolvedSelectors = (Map(
- DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue,
+ val driverKubernetesSelectors = (Map(
+ SPARK_DRIVER_LABEL -> kubernetesAppId,
+ SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_APP_NAME_LABEL -> appName)
++ parsedCustomLabels).asJava
- val (servicePorts, containerPorts) = configurePorts()
- val service = kubernetesClient.services().createNew()
- .withNewMetadata()
- .withName(kubernetesAppId)
- .withLabels(Map(SPARK_APP_NAME_LABEL -> appName).asJava)
- .endMetadata()
- .withNewSpec()
- .withSelector(resolvedSelectors)
- .withPorts(servicePorts.asJava)
- .endSpec()
- .done()
- sparkConf.set("spark.kubernetes.driver.service.name", service.getMetadata.getName)
- sparkConf.set("spark.kubernetes.driver.pod.name", kubernetesAppId)
-
- sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
- sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
- val submitRequest = buildSubmissionRequest()
+ val containerPorts = buildContainerPorts()
val submitCompletedFuture = SettableFuture.create[Boolean]
- val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId"
-
- val podWatcher = new Watcher[Pod] {
- override def eventReceived(action: Action, t: Pod): Unit = {
- if (action == Action.ADDED) {
- val ownerRefs = new ArrayBuffer[OwnerReference]
- ownerRefs += new OwnerReferenceBuilder()
- .withApiVersion(t.getApiVersion)
- .withController(true)
- .withKind(t.getKind)
- .withName(t.getMetadata.getName)
- .withUid(t.getMetadata.getUid)
- .build()
-
- secret.getMetadata().setOwnerReferences(ownerRefs.asJava)
- kubernetesClient.secrets().createOrReplace(secret)
-
- service.getMetadata().setOwnerReferences(ownerRefs.asJava)
- kubernetesClient.services().createOrReplace(service)
- }
-
- if ((action == Action.ADDED || action == Action.MODIFIED)
- && t.getStatus.getPhase == "Running"
- && !submitCompletedFuture.isDone) {
- t.getStatus
- .getContainerStatuses
- .asScala
- .find(status =>
- status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match {
- case Some(_) =>
- try {
- val driverLauncher = getDriverLauncherService(
- k8ClientConfig, master)
- val ping = Retry.retry(5, 5.seconds) {
- driverLauncher.ping()
- }
- ping onFailure {
- case t: Throwable =>
- if (!submitCompletedFuture.isDone) {
- submitCompletedFuture.setException(t)
- }
- }
- val submitComplete = ping andThen {
- case Success(_) =>
- driverLauncher.create(submitRequest)
- submitCompletedFuture.set(true)
- }
- submitComplete onFailure {
- case t: Throwable =>
- if (!submitCompletedFuture.isDone) {
- submitCompletedFuture.setException(t)
- }
- }
- } catch {
- case e: Throwable =>
- if (!submitCompletedFuture.isDone) {
- submitCompletedFuture.setException(e)
- throw e
- }
- }
- case None =>
- }
- }
- }
-
- override def onClose(e: KubernetesClientException): Unit = {
- if (!submitCompletedFuture.isDone) {
- submitCompletedFuture.setException(e)
- }
- }
- }
-
- def createDriverPod(unused: Watch): Unit = {
+ val submitPending = new AtomicBoolean(false)
+ val podWatcher = new DriverPodWatcher(
+ submitCompletedFuture,
+ submitPending,
+ kubernetesClient,
+ driverSubmitSslOptions,
+ Array(submitServerSecret) ++ sslSecrets,
+ driverKubernetesSelectors)
+ Utils.tryWithResource(kubernetesClient
+ .pods()
+ .withLabels(driverKubernetesSelectors)
+ .watch(podWatcher)) { _ =>
kubernetesClient.pods().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
- .withLabels(resolvedSelectors)
+ .withLabels(driverKubernetesSelectors)
.endMetadata()
.withNewSpec()
.withRestartPolicy("OnFailure")
.addNewVolume()
- .withName(s"spark-submission-secret-volume")
- .withNewSecret()
- .withSecretName(secret.getMetadata.getName)
+ .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
+ .withNewSecret()
+ .withSecretName(submitServerSecret.getMetadata.getName)
.endSecret()
.endVolume
+ .addToVolumes(sslVolumes: _*)
.withServiceAccount(serviceAccount)
.addNewContainer()
- .withName(DRIVER_LAUNCHER_CONTAINER_NAME)
+ .withName(DRIVER_CONTAINER_NAME)
.withImage(driverDockerImage)
.withImagePullPolicy("IfNotPresent")
.addNewVolumeMount()
- .withName("spark-submission-secret-volume")
+ .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
.withMountPath(secretDirectory)
.withReadOnly(true)
.endVolumeMount()
+ .addToVolumeMounts(sslVolumeMounts: _*)
.addNewEnv()
- .withName("SPARK_SUBMISSION_SECRET_LOCATION")
- .withValue(s"$secretDirectory/$SUBMISSION_SERVER_SECRET_NAME")
+ .withName(ENV_SUBMISSION_SECRET_LOCATION)
+ .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
.endEnv()
.addNewEnv()
- .withName("SPARK_DRIVER_LAUNCHER_SERVER_PORT")
- .withValue(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT.toString)
+ .withName(ENV_SUBMISSION_SERVER_PORT)
+ .withValue(SUBMISSION_SERVER_PORT.toString)
.endEnv()
+ .addToEnv(sslEnvs: _*)
.withPorts(containerPorts.asJava)
.endContainer()
.endSpec()
.done()
var submitSucceeded = false
try {
- submitCompletedFuture.get(driverLaunchTimeoutSecs, TimeUnit.SECONDS)
+ submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
submitSucceeded = true
} catch {
case e: TimeoutException =>
- val driverPod = try {
- kubernetesClient.pods().withName(kubernetesAppId).get()
- } catch {
- case throwable: Throwable =>
- logError(s"Timed out while waiting $driverLaunchTimeoutSecs seconds for the" +
- " driver pod to start, but an error occurred while fetching the driver" +
- " pod's details.", throwable)
- throw new SparkException(s"Timed out while waiting $driverLaunchTimeoutSecs" +
- " seconds for the driver pod to start. Unfortunately, in attempting to fetch" +
- " the latest state of the pod, another error was thrown. Check the logs for" +
- " the error that was thrown in looking up the driver pod.", e)
- }
- val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
- s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" +
- s" $driverLaunchTimeoutSecs seconds."
- val podStatusPhase = if (driverPod.getStatus.getPhase != null) {
- s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}"
- } else {
- "The pod had no final phase."
- }
- val podStatusMessage = if (driverPod.getStatus.getMessage != null) {
- s"Latest message from the pod is: ${driverPod.getStatus.getMessage}"
- } else {
- "The pod had no final message."
- }
- val failedDriverContainerStatusString = driverPod.getStatus
- .getContainerStatuses
- .asScala
- .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME)
- .map(status => {
- val lastState = status.getState
- if (lastState.getRunning != null) {
- "Driver container last state: Running\n" +
- s"Driver container started at: ${lastState.getRunning.getStartedAt}"
- } else if (lastState.getWaiting != null) {
- "Driver container last state: Waiting\n" +
- s"Driver container wait reason: ${lastState.getWaiting.getReason}\n" +
- s"Driver container message: ${lastState.getWaiting.getMessage}\n"
- } else if (lastState.getTerminated != null) {
- "Driver container last state: Terminated\n" +
- s"Driver container started at: ${lastState.getTerminated.getStartedAt}\n" +
- s"Driver container finished at: ${lastState.getTerminated.getFinishedAt}\n" +
- s"Driver container exit reason: ${lastState.getTerminated.getReason}\n" +
- s"Driver container exit code: ${lastState.getTerminated.getExitCode}\n" +
- s"Driver container message: ${lastState.getTerminated.getMessage}"
- } else {
- "Driver container last state: Unknown"
- }
- }).getOrElse("The driver container wasn't found in the pod; expected to find" +
- s" container with name $DRIVER_LAUNCHER_CONTAINER_NAME")
- val finalErrorMessage = s"$topLevelMessage\n" +
- s"$podStatusPhase\n" +
- s"$podStatusMessage\n\n$failedDriverContainerStatusString"
+ val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e)
logError(finalErrorMessage, e)
throw new SparkException(finalErrorMessage, e)
- } finally {
- if (!submitSucceeded) {
- try {
- kubernetesClient.pods.withName(kubernetesAppId).delete
- } catch {
- case throwable: Throwable =>
- logError("Failed to delete driver pod after it failed to run.", throwable)
- }
+ } finally {
+ if (!submitSucceeded) {
+ Utils.tryLogNonFatalError {
+ kubernetesClient.pods.withName(kubernetesAppId).delete()
}
}
}
-
- Utils.tryWithResource(kubernetesClient
- .pods()
- .withLabels(resolvedSelectors)
- .watch(podWatcher)) { createDriverPod }
+ }
} finally {
- kubernetesClient.secrets().delete(secret)
+ Utils.tryLogNonFatalError {
+ kubernetesClient.secrets().delete(submitServerSecret)
+ }
+ Utils.tryLogNonFatalError {
+ kubernetesClient.secrets().delete(sslSecrets: _*)
+ }
}
- })
+ }
}
- private def configurePorts(): (Seq[ServicePort], Seq[ContainerPort]) = {
- val servicePorts = new ArrayBuffer[ServicePort]
- val containerPorts = new ArrayBuffer[ContainerPort]
+ private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = {
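+    // Resolve the keyStore URI scheme: "file" (or no scheme) means the keyStore lives on
+    // the submitting machine and must be shipped to the driver pod as a secret, while
+    // "container" means it is already present on the driver image's disk. For example
+    // (an illustrative value): container:///opt/spark/keystore.jks resolves to the path
+    // /opt/spark/keystore.jks with isKeyStoreLocalFile = false.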
+ val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE)
+ val resolvedSparkConf = sparkConf.clone()
+ val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => {
+ val keyStoreURI = Utils.resolveURI(keyStore)
+ val isProvidedKeyStoreLocal = keyStoreURI.getScheme match {
+ case "file" | null => true
+ case "container" => false
+ case _ => throw new SparkException(s"Invalid KeyStore URI $keyStore; keyStore URI" +
+ " for submit server must have scheme file:// or container:// (no scheme defaults" +
+ " to file://)")
+ }
+ (isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath))
+ }).getOrElse((true, Option.empty[String]))
+ resolvedKeyStore.foreach {
+ resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _)
+ }
+ sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore =>
+ val trustStoreURI = Utils.resolveURI(trustStore)
+ trustStoreURI.getScheme match {
+ case "file" | null =>
+ resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, trustStoreURI.getPath)
+ case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" +
+ " for submit server must have no scheme, or scheme file://")
+ }
+ }
+ val securityManager = new SecurityManager(resolvedSparkConf)
+ (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
+ }
- def addPortToServiceAndContainer(portName: String, portValue: Int): Unit = {
- servicePorts += new ServicePortBuilder()
- .withName(portName)
- .withPort(portValue)
- .withNewTargetPort(portValue)
+ private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions,
+ isKeyStoreLocalFile: Boolean):
+ (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
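+    // Returns the env vars, volumes, volume mounts, and secrets that the driver pod needs
+    // to serve the submission server over SSL. A keyStore local to the submitting machine
+    // is base64-encoded into a Kubernetes secret and mounted under the SSL secrets
+    // directory, while a container-local keyStore is referenced at its existing path.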
+ if (driverSubmitSslOptions.enabled) {
+ val sslSecretsMap = mutable.HashMap[String, String]()
+ val sslEnvs = mutable.Buffer[EnvVar]()
+ val secrets = mutable.Buffer[Secret]()
+ driverSubmitSslOptions.keyStore.foreach(store => {
+ val resolvedKeyStoreFile = if (isKeyStoreLocalFile) {
+ if (!store.isFile) {
+ throw new SparkException(s"KeyStore specified at $store is not a file or" +
+ s" does not exist.")
+ }
+ val keyStoreBytes = Files.toByteArray(store)
+ val keyStoreBase64 = Base64.encodeBase64String(keyStoreBytes)
+ sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64)
+ s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME"
+ } else {
+ store.getAbsolutePath
+ }
+ sslEnvs += new EnvVarBuilder()
+ .withName(ENV_SUBMISSION_KEYSTORE_FILE)
+ .withValue(resolvedKeyStoreFile)
+ .build()
+ })
+ driverSubmitSslOptions.keyStorePassword.foreach(password => {
+ val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
+ sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64)
+ sslEnvs += new EnvVarBuilder()
+ .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
+ .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
+ .build()
+ })
+ driverSubmitSslOptions.keyPassword.foreach(password => {
+ val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
+ sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64)
+ sslEnvs += new EnvVarBuilder()
+ .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
+ .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
+ .build()
+ })
+ driverSubmitSslOptions.keyStoreType.foreach(storeType => {
+ sslEnvs += new EnvVarBuilder()
+ .withName(ENV_SUBMISSION_KEYSTORE_TYPE)
+ .withValue(storeType)
+ .build()
+ })
+ sslEnvs += new EnvVarBuilder()
+ .withName(ENV_SUBMISSION_USE_SSL)
+ .withValue("true")
+ .build()
+ val sslSecrets = kubernetesClient.secrets().createNew()
+ .withNewMetadata()
+ .withName(sslSecretsName)
+ .endMetadata()
+ .withData(sslSecretsMap.asJava)
+ .withType("Opaque")
+ .done()
+ secrets += sslSecrets
+ val sslVolume = new VolumeBuilder()
+ .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
+ .withNewSecret()
+ .withSecretName(sslSecrets.getMetadata.getName)
+ .endSecret()
.build()
- containerPorts += new ContainerPortBuilder()
- .withContainerPort(portValue)
+ val sslVolumeMount = new VolumeMountBuilder()
+ .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
+ .withReadOnly(true)
+ .withMountPath(sslSecretsDirectory)
.build()
+ (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
+ } else {
+ (Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]())
+ }
+ }
+
+ private class DriverPodWatcher(
+ submitCompletedFuture: SettableFuture[Boolean],
+ submitPending: AtomicBoolean,
+ kubernetesClient: KubernetesClient,
+ driverSubmitSslOptions: SSLOptions,
+ applicationSecrets: Array[Secret],
+ driverKubernetesSelectors: java.util.Map[String, String]) extends Watcher[Pod] {
+ override def eventReceived(action: Action, pod: Pod): Unit = {
+ if ((action == Action.ADDED || action == Action.MODIFIED)
+ && pod.getStatus.getPhase == "Running"
+ && !submitCompletedFuture.isDone) {
+ if (!submitPending.getAndSet(true)) {
+ pod.getStatus
+ .getContainerStatuses
+ .asScala
+ .find(status =>
+ status.getName == DRIVER_CONTAINER_NAME && status.getReady) match {
+ case Some(_) =>
+ val ownerRefs = Seq(new OwnerReferenceBuilder()
+ .withName(pod.getMetadata.getName)
+ .withUid(pod.getMetadata.getUid)
+ .withApiVersion(pod.getApiVersion)
+ .withKind(pod.getKind)
+ .withController(true)
+ .build())
+
+ applicationSecrets.foreach(secret => {
+ secret.getMetadata.setOwnerReferences(ownerRefs.asJava)
+ kubernetesClient.secrets().createOrReplace(secret)
+ })
+
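+              // Expose the submission server through a NodePort service so that the
+              // submitting client, which may be outside the cluster, can reach the pod.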
+ val driverSubmissionServicePort = new ServicePortBuilder()
+ .withName(SUBMISSION_SERVER_PORT_NAME)
+ .withPort(SUBMISSION_SERVER_PORT)
+ .withNewTargetPort(SUBMISSION_SERVER_PORT)
+ .build()
+ val service = kubernetesClient.services().createNew()
+ .withNewMetadata()
+ .withName(kubernetesAppId)
+ .withLabels(driverKubernetesSelectors)
+ .withOwnerReferences(ownerRefs.asJava)
+ .endMetadata()
+ .withNewSpec()
+ .withType("NodePort")
+ .withSelector(driverKubernetesSelectors)
+ .withPorts(driverSubmissionServicePort)
+ .endSpec()
+ .done()
+ try {
+ sparkConf.set(KUBERNETES_DRIVER_POD_NAME, kubernetesAppId)
+ sparkConf.set(KUBERNETES_DRIVER_SERVICE_NAME, service.getMetadata.getName)
+ sparkConf.setIfMissing("spark.app.id", kubernetesAppId)
+ sparkConf.setIfMissing("spark.app.name", appName)
+ sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
+ sparkConf.setIfMissing("spark.blockmanager.port",
+ DEFAULT_BLOCKMANAGER_PORT.toString)
+ val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, service,
+ driverSubmitSslOptions)
+ val ping = Retry.retry(5, 5.seconds) {
+ driverSubmitter.ping()
+ }
+ ping onFailure {
+ case t: Throwable =>
+ submitCompletedFuture.setException(t)
+ kubernetesClient.services().delete(service)
+ }
+ val submitComplete = ping.flatMap { _ =>
+ Future {
+ sparkConf.set("spark.driver.host", pod.getStatus.getPodIP)
+ val submitRequest = buildSubmissionRequest()
+ driverSubmitter.submitApplication(submitRequest)
+ }
+ }
+ submitComplete onFailure {
+ case t: Throwable =>
+ submitCompletedFuture.setException(t)
+ kubernetesClient.services().delete(service)
+ }
+ val adjustServicePort = submitComplete.flatMap { _ =>
+ Future {
+ // After submitting, adjust the service to only expose the Spark UI
+ val uiServicePort = new ServicePortBuilder()
+ .withName(UI_PORT_NAME)
+ .withPort(uiPort)
+ .withNewTargetPort(uiPort)
+ .build()
+ kubernetesClient.services().withName(kubernetesAppId).edit()
+ .editSpec()
+ .withType("ClusterIP")
+ .withPorts(uiServicePort)
+ .endSpec()
+ .done
+ }
+ }
+ adjustServicePort onSuccess {
+ case _ =>
+ submitCompletedFuture.set(true)
+ }
+ adjustServicePort onFailure {
+ case throwable: Throwable =>
+ submitCompletedFuture.setException(throwable)
+ kubernetesClient.services().delete(service)
+ }
+ } catch {
+ case e: Throwable =>
+ submitCompletedFuture.setException(e)
+ Utils.tryLogNonFatalError({
+ kubernetesClient.services().delete(service)
+ })
+ throw e
+ }
+ case None =>
+ }
+ }
+ }
+ }
+
+ override def onClose(e: KubernetesClientException): Unit = {
+ if (!submitCompletedFuture.isDone) {
+ submitCompletedFuture.setException(e)
+ }
}
+ }
- addPortToServiceAndContainer(
- DRIVER_LAUNCHER_SERVICE_PORT_NAME,
- DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT)
- addPortToServiceAndContainer(
- DRIVER_PORT_NAME,
- sparkConf
- .getOption("spark.driver.port")
- .map(_.toInt)
- .getOrElse(DEFAULT_DRIVER_PORT))
- addPortToServiceAndContainer(
- BLOCKMANAGER_PORT_NAME,
- sparkConf
- .getOption("spark.blockmanager.port")
- .map(_.toInt)
- .getOrElse(DEFAULT_BLOCKMANAGER_PORT))
+ private def buildSubmitFailedErrorMessage(
+ kubernetesClient: DefaultKubernetesClient,
+ e: TimeoutException): String = {
+ val driverPod = try {
+ kubernetesClient.pods().withName(kubernetesAppId).get()
+ } catch {
+ case throwable: Throwable =>
+ logError(s"Timed out while waiting $driverSubmitTimeoutSecs seconds for the" +
+ " driver pod to start, but an error occurred while fetching the driver" +
+ " pod's details.", throwable)
+ throw new SparkException(s"Timed out while waiting $driverSubmitTimeoutSecs" +
+ " seconds for the driver pod to start. Unfortunately, in attempting to fetch" +
+ " the latest state of the pod, another error was thrown. Check the logs for" +
+ " the error that was thrown in looking up the driver pod.", e)
+ }
+ val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
+ s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" +
+ s" $driverSubmitTimeoutSecs seconds."
+ val podStatusPhase = if (driverPod.getStatus.getPhase != null) {
+ s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}"
+ } else {
+ "The pod had no final phase."
+ }
+ val podStatusMessage = if (driverPod.getStatus.getMessage != null) {
+ s"Latest message from the pod is: ${driverPod.getStatus.getMessage}"
+ } else {
+ "The pod had no final message."
+ }
+ val failedDriverContainerStatusString = driverPod.getStatus
+ .getContainerStatuses
+ .asScala
+ .find(_.getName == DRIVER_CONTAINER_NAME)
+ .map(status => {
+ val lastState = status.getState
+ if (lastState.getRunning != null) {
+ "Driver container last state: Running\n" +
+ s"Driver container started at: ${lastState.getRunning.getStartedAt}"
+ } else if (lastState.getWaiting != null) {
+ "Driver container last state: Waiting\n" +
+ s"Driver container wait reason: ${lastState.getWaiting.getReason}\n" +
+ s"Driver container message: ${lastState.getWaiting.getMessage}\n"
+ } else if (lastState.getTerminated != null) {
+ "Driver container last state: Terminated\n" +
+ s"Driver container started at: ${lastState.getTerminated.getStartedAt}\n" +
+ s"Driver container finished at: ${lastState.getTerminated.getFinishedAt}\n" +
+ s"Driver container exit reason: ${lastState.getTerminated.getReason}\n" +
+ s"Driver container exit code: ${lastState.getTerminated.getExitCode}\n" +
+ s"Driver container message: ${lastState.getTerminated.getMessage}"
+ } else {
+ "Driver container last state: Unknown"
+ }
+ }).getOrElse("The driver container wasn't found in the pod; expected to find" +
+ s" container with name $DRIVER_CONTAINER_NAME")
+ s"$topLevelMessage\n" +
+ s"$podStatusPhase\n" +
+ s"$podStatusMessage\n\n$failedDriverContainerStatusString"
+ }
- addPortToServiceAndContainer(
- UI_PORT_NAME,
- sparkConf
- .getOption("spark.ui.port")
- .map(_.toInt)
- .getOrElse(DEFAULT_UI_PORT))
- (servicePorts, containerPorts)
+ private def buildContainerPorts(): Seq[ContainerPort] = {
+ Seq((DRIVER_PORT_NAME, sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)),
+      (BLOCK_MANAGER_PORT_NAME,
+        sparkConf.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)),
+ (SUBMISSION_SERVER_PORT_NAME, SUBMISSION_SERVER_PORT),
+ (UI_PORT_NAME, uiPort)).map(port => new ContainerPortBuilder()
+ .withName(port._1)
+ .withContainerPort(port._2)
+ .build())
}
private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = {
@@ -392,61 +529,90 @@ private[spark] class Client(
.map(CompressionUtils.createTarGzip(_))
}
- private def getDriverLauncherService(
- k8ClientConfig: Config,
- kubernetesMaster: String): KubernetesSparkRestApi = {
- val url = s"${
- Array[String](
- kubernetesMaster,
- "api", "v1", "proxy",
- "namespaces", namespace,
- "services", kubernetesAppId).mkString("/")}" +
- s":$DRIVER_LAUNCHER_SERVICE_PORT_NAME/"
-
- val sslContext = SSLUtils.sslContext(k8ClientConfig)
- val trustManager = SSLUtils.trustManagers(
- k8ClientConfig)(0).asInstanceOf[X509TrustManager]
+ private def buildDriverSubmissionClient(
+ kubernetesClient: KubernetesClient,
+ service: Service,
+ driverSubmitSslOptions: SSLOptions): KubernetesSparkRestApi = {
+ val servicePort = service
+ .getSpec
+ .getPorts
+ .asScala
+ .filter(_.getName == SUBMISSION_SERVER_PORT_NAME)
+ .head
+ .getNodePort
+ // NodePort is exposed on every node, so just pick one of them.
+ // TODO be resilient to node failures and try all of them
+ val node = kubernetesClient.nodes.list.getItems.asScala.head
+ val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress
+ val urlScheme = if (driverSubmitSslOptions.enabled) {
+ "https"
+ } else {
+ logWarning("Submitting application details, application secret, and local" +
+ " jars to the cluster over an insecure connection. You should configure SSL" +
+ " to secure this step.")
+ "http"
+ }
+ val (trustManager, sslContext): (X509TrustManager, SSLContext) =
+ if (driverSubmitSslOptions.enabled) {
+ buildSslConnectionConfiguration(driverSubmitSslOptions)
+ } else {
+ (null, SSLContext.getDefault)
+ }
+ val url = s"$urlScheme://$nodeAddress:$servicePort"
HttpClientUtil.createClient[KubernetesSparkRestApi](
- uri = url,
+ url,
sslSocketFactory = sslContext.getSocketFactory,
trustContext = trustManager)
}
- private def parseCustomLabels(labels: String): Map[String, String] = {
- labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
- label.split("=", 2).toSeq match {
- case Seq(k, v) =>
- require(k != DRIVER_LAUNCHER_SELECTOR_LABEL, "Label with key" +
- s" $DRIVER_LAUNCHER_SELECTOR_LABEL cannot be used in" +
- " spark.kubernetes.driver.labels, as it is reserved for Spark's" +
- " internal configuration.")
- (k, v)
- case _ =>
- throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" +
- " must be a comma-separated list of key-value pairs, with format =." +
- s" Got label: $label. All labels: $labels")
+ private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions) = {
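+    // Build an SSLContext that trusts the certificates in the configured trustStore,
+    // falling back to the JVM's default SSLContext (and no explicit trust manager)
+    // when no trustStore is set.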
+ driverSubmitSslOptions.trustStore.map(trustStoreFile => {
+ val trustManagerFactory = TrustManagerFactory.getInstance(
+ TrustManagerFactory.getDefaultAlgorithm)
+ val trustStore = KeyStore.getInstance(
+ driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
+ if (!trustStoreFile.isFile) {
+ throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
+ s" does not exist or is not a file.")
}
- }).toMap
+ Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
+ driverSubmitSslOptions.trustStorePassword match {
+ case Some(password) =>
+ trustStore.load(trustStoreStream, password.toCharArray)
+ case None => trustStore.load(trustStoreStream, null)
+ }
+ }
+ trustManagerFactory.init(trustStore)
+ val trustManagers = trustManagerFactory.getTrustManagers
+ val sslContext = SSLContext.getInstance("TLSv1.2")
+ sslContext.init(null, trustManagers, SECURE_RANDOM)
+ (trustManagers(0).asInstanceOf[X509TrustManager], sslContext)
+ }).getOrElse((null, SSLContext.getDefault))
+ }
+
+ private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = {
+ maybeLabels.map(labels => {
+ labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
+ label.split("=", 2).toSeq match {
+ case Seq(k, v) =>
+ require(k != SPARK_APP_ID_LABEL, "Label with key" +
+ s" $SPARK_APP_ID_LABEL cannot be used in" +
+ " spark.kubernetes.driver.labels, as it is reserved for Spark's" +
+ " internal configuration.")
+ (k, v)
+ case _ =>
+ throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" +
+ " must be a comma-separated list of key-value pairs, with format =." +
+ s" Got label: $label. All labels: $labels")
+ }
+ }).toMap
+ }).getOrElse(Map.empty[String, String])
}
}
private[spark] object Client extends Logging {
- private val SUBMISSION_SERVER_SECRET_NAME = "spark-submission-server-secret"
- private val DRIVER_LAUNCHER_SELECTOR_LABEL = "driver-launcher-selector"
- private val DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT = 7077
- private val DEFAULT_DRIVER_PORT = 7078
- private val DEFAULT_BLOCKMANAGER_PORT = 7079
- private val DEFAULT_UI_PORT = 4040
- private val UI_PORT_NAME = "spark-ui-port"
- private val DRIVER_LAUNCHER_SERVICE_PORT_NAME = "driver-launcher-port"
- private val DRIVER_PORT_NAME = "driver-port"
- private val BLOCKMANAGER_PORT_NAME = "block-manager-port"
- private val DRIVER_LAUNCHER_CONTAINER_NAME = "spark-kubernetes-driver-launcher"
- private val SECURE_RANDOM = new SecureRandom()
- private val SPARK_SUBMISSION_SECRET_BASE_DIR = "/var/run/secrets/spark-submission"
- private val DEFAULT_LAUNCH_TIMEOUT_SECONDS = 60
- private val SPARK_APP_NAME_LABEL = "spark-app-name"
+ private[spark] val SECURE_RANDOM = new SecureRandom()
def main(args: Array[String]): Unit = {
require(args.length >= 2, s"Too few arguments. Usage: ${getClass.getName} " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
new file mode 100644
index 0000000000000..43e262cb98c52
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.config.ConfigBuilder
+
+package object config {
+
+ private[spark] val KUBERNETES_NAMESPACE =
+ ConfigBuilder("spark.kubernetes.namespace")
+ .doc("""
+ | The namespace that will be used for running the driver and
+ | executor pods. When using spark-submit in cluster mode,
+ | this can also be passed to spark-submit via the
+ | --kubernetes-namespace command line argument.
+ """.stripMargin)
+ .stringConf
+ .createWithDefault("default")
+
+ private[spark] val DRIVER_DOCKER_IMAGE =
+ ConfigBuilder("spark.kubernetes.driver.docker.image")
+ .doc("""
+ | Docker image to use for the driver. Specify this using the
+ | standard Docker tag format.
+ """.stripMargin)
+ .stringConf
+ .createWithDefault(s"spark-driver:$sparkVersion")
+
+ private[spark] val EXECUTOR_DOCKER_IMAGE =
+ ConfigBuilder("spark.kubernetes.executor.docker.image")
+ .doc("""
+ | Docker image to use for the executors. Specify this using
+ | the standard Docker tag format.
+ """.stripMargin)
+ .stringConf
+ .createWithDefault(s"spark-executor:$sparkVersion")
+
+ private[spark] val KUBERNETES_CA_CERT_FILE =
+ ConfigBuilder("spark.kubernetes.submit.caCertFile")
+ .doc("""
+ | CA cert file for connecting to Kubernetes over SSL. This
+ | file should be located on the submitting machine's disk.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_CLIENT_KEY_FILE =
+ ConfigBuilder("spark.kubernetes.submit.clientKeyFile")
+ .doc("""
+ | Client key file for authenticating against the Kubernetes
+ | API server. This file should be located on the submitting
+ | machine's disk.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_CLIENT_CERT_FILE =
+ ConfigBuilder("spark.kubernetes.submit.clientCertFile")
+ .doc("""
+ | Client cert file for authenticating against the
+ | Kubernetes API server. This file should be located on
+ | the submitting machine's disk.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ ConfigBuilder("spark.kubernetes.submit.serviceAccountName")
+ .doc("""
+ | Service account that is used when running the driver pod.
+ | The driver pod uses this service account when requesting
+ | executor pods from the API server.
+ """.stripMargin)
+ .stringConf
+ .createWithDefault("default")
+
+ private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS =
+ ConfigBuilder("spark.kubernetes.driver.uploads.jars")
+ .doc("""
+        | Comma-separated list of jars to send to the driver and
+ | all executors when submitting the application in cluster
+ | mode.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+  // Note that this config has no static default; in practice, the effective default
+  // is dynamically computed from the executor memory by the scheduler backend.
+ private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
+ ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
+ .doc("""
+ | The amount of off-heap memory (in megabytes) to be
+ | allocated per executor. This is memory that accounts for
+ | things like VM overheads, interned strings, other native
+ | overheads, etc. This tends to grow with the executor size
+ | (typically 6-10%).
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_LABELS =
+ ConfigBuilder("spark.kubernetes.driver.labels")
+ .doc("""
+ | Custom labels that will be added to the driver pod.
+ | This should be a comma-separated list of label key-value
+ | pairs, where each label is in the format key=value.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
+ ConfigBuilder("spark.kubernetes.driverSubmitTimeout")
+ .doc("""
+ | Time to wait for the driver pod to be initially ready
+ | before aborting the job.
+ """.stripMargin)
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefault(60L)
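+  // Read via sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT); timeConf resolves time
+  // strings to whole seconds, so an illustrative setting of "2m" yields 120L.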
+
+ private[spark] val KUBERNETES_DRIVER_SUBMIT_KEYSTORE =
+ ConfigBuilder("spark.ssl.kubernetes.submit.keyStore")
+ .doc("""
+        | KeyStore file for the driver submission server to use when
+        | listening over SSL. It can be pre-mounted on the driver container
+        | or uploaded from the submitting client.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE =
+ ConfigBuilder("spark.ssl.kubernetes.submit.trustStore")
+ .doc("""
+        | TrustStore containing certificates for communicating with
+        | the driver submission server over SSL.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_SERVICE_NAME =
+ ConfigBuilder("spark.kubernetes.driver.service.name")
+ .doc("""
+ | Kubernetes service that is in front of the driver
+ | pod.
+ """.stripMargin)
+ .internal()
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_POD_NAME =
+ ConfigBuilder("spark.kubernetes.driver.pod.name")
+ .doc("""
+ | Name of the driver pod.
+ """.stripMargin)
+ .internal()
+ .stringConf
+ .createOptional
+
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
new file mode 100644
index 0000000000000..027cc3c022b4e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+package object constants {
+ // Labels
+ private[spark] val SPARK_DRIVER_LABEL = "spark-driver"
+ private[spark] val SPARK_APP_ID_LABEL = "spark-app-id"
+ private[spark] val SPARK_APP_NAME_LABEL = "spark-app-name"
+ private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
+
+ // Secrets
+ private[spark] val DRIVER_CONTAINER_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission"
+ private[spark] val SUBMISSION_APP_SECRET_NAME = "spark-submission-server-secret"
+ private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret"
+ private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume"
+ private[spark] val SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME =
+ "spark-submission-server-key-password"
+ private[spark] val SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME =
+ "spark-submission-server-keystore-password"
+ private[spark] val SUBMISSION_SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore"
+ private[spark] val SUBMISSION_SSL_SECRETS_PREFIX = "spark-submission-server-ssl"
+ private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"
+
+ // Default and fixed ports
+ private[spark] val SUBMISSION_SERVER_PORT = 7077
+ private[spark] val DEFAULT_DRIVER_PORT = 7078
+ private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079
+ private[spark] val DEFAULT_UI_PORT = 4040
+ private[spark] val UI_PORT_NAME = "spark-ui-port"
+ private[spark] val SUBMISSION_SERVER_PORT_NAME = "submit-server"
+ private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager"
+ private[spark] val DRIVER_PORT_NAME = "driver"
+ private[spark] val EXECUTOR_PORT_NAME = "executor"
+
+ // Environment Variables
+ private[spark] val ENV_SUBMISSION_SECRET_LOCATION = "SPARK_SUBMISSION_SECRET_LOCATION"
+ private[spark] val ENV_SUBMISSION_SERVER_PORT = "SPARK_SUBMISSION_SERVER_PORT"
+ private[spark] val ENV_SUBMISSION_KEYSTORE_FILE = "SPARK_SUBMISSION_KEYSTORE_FILE"
+ private[spark] val ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE =
+ "SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"
+ private[spark] val ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE =
+ "SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"
+ private[spark] val ENV_SUBMISSION_KEYSTORE_TYPE = "SPARK_SUBMISSION_KEYSTORE_TYPE"
+ private[spark] val ENV_SUBMISSION_USE_SSL = "SPARK_SUBMISSION_USE_SSL"
+ private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
+ private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
+ private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
+ private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
+ private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
+ private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+
+ // Miscellaneous
+ private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
+ private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
index 813d070e0f876..8beba23bc8e11 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
@@ -20,23 +20,22 @@ import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
import org.apache.spark.SPARK_VERSION
-// TODO: jars should probably be compressed. Shipping tarballs would be optimal.
case class KubernetesCreateSubmissionRequest(
- val appResource: AppResource,
- val mainClass: String,
- val appArgs: Array[String],
- val sparkProperties: Map[String, String],
- val secret: String,
- val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
+ appResource: AppResource,
+ mainClass: String,
+ appArgs: Array[String],
+ sparkProperties: Map[String, String],
+ secret: String,
+ uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
message = "create"
clientSparkVersion = SPARK_VERSION
}
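+// Jars are shipped as a base64-encoded, tar-gzipped archive; blockSize and recordSize
+// default to the standard tar block (10240 bytes) and record (512 bytes) sizes.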
case class TarGzippedData(
- val dataBase64: String,
- val blockSize: Int = 10240,
- val recordSize: Int = 512,
- val encoding: String
+ dataBase64: String,
+ blockSize: Int = 10240,
+ recordSize: Int = 512,
+ encoding: String
)
@JsonTypeInfo(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala
index 3cbcb16293b1d..18eb9b7a12ca6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala
@@ -28,12 +28,11 @@ trait KubernetesSparkRestApi {
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/create")
- def create(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
+ def submitApplication(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
@GET
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/ping")
def ping(): PingResponse
-
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
index 38fa4d1d3f0b2..451dc96dd65ed 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
@@ -21,20 +21,26 @@ import java.net.URI
import java.util.concurrent.CountDownLatch
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.commons.codec.binary.Base64
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf}
+import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
private case class KubernetesSparkRestServerArguments(
- val host: Option[String] = None,
- val port: Option[Int] = None,
- val secretFile: Option[String] = None) {
+ host: Option[String] = None,
+ port: Option[Int] = None,
+ useSsl: Boolean = false,
+ secretFile: Option[String] = None,
+ keyStoreFile: Option[String] = None,
+ keyStorePasswordFile: Option[String] = None,
+ keyStoreType: Option[String] = None,
+ keyPasswordFile: Option[String] = None) {
def validate(): KubernetesSparkRestServerArguments = {
require(host.isDefined, "Hostname not set via --hostname.")
require(port.isDefined, "Port not set via --port")
@@ -58,6 +64,21 @@ private object KubernetesSparkRestServerArguments {
case "--secret-file" :: value :: tail =>
args = tail
resolvedArguments.copy(secretFile = Some(value))
+ case "--use-ssl" :: value :: tail =>
+ args = tail
+ resolvedArguments.copy(useSsl = value.toBoolean)
+ case "--keystore-file" :: value :: tail =>
+ args = tail
+ resolvedArguments.copy(keyStoreFile = Some(value))
+ case "--keystore-password-file" :: value :: tail =>
+ args = tail
+ resolvedArguments.copy(keyStorePasswordFile = Some(value))
+ case "--keystore-type" :: value :: tail =>
+ args = tail
+ resolvedArguments.copy(keyStoreType = Some(value))
+ case "--keystore-key-password-file" :: value :: tail =>
+ args = tail
+ resolvedArguments.copy(keyPasswordFile = Some(value))
// TODO polish usage message
case Nil => resolvedArguments
case unknown => throw new IllegalStateException(s"Unknown argument(s) found: $unknown")
@@ -78,8 +99,9 @@ private[spark] class KubernetesSparkRestServer(
port: Int,
conf: SparkConf,
expectedApplicationSecret: Array[Byte],
- shutdownLock: CountDownLatch)
- extends RestSubmissionServer(host, port, conf) {
+ shutdownLock: CountDownLatch,
+ sslOptions: SSLOptions = new SSLOptions)
+ extends RestSubmissionServer(host, port, conf, sslOptions) {
private val SERVLET_LOCK = new Object
private val javaExecutable = s"${System.getenv("JAVA_HOME")}/bin/java"
@@ -196,7 +218,7 @@ private[spark] class KubernetesSparkRestServer(
response.success = true
response.submissionId = null
response.message = "success"
- response.serverSparkVersion = SPARK_VERSION
+ response.serverSparkVersion = sparkVersion
response
}
case unexpected =>
@@ -249,6 +271,7 @@ private[spark] class KubernetesSparkRestServer(
private[spark] object KubernetesSparkRestServer {
private val barrier = new CountDownLatch(1)
+
def main(args: Array[String]): Unit = {
val parsedArguments = KubernetesSparkRestServerArguments.fromArgsArray(args)
val secretFile = new File(parsedArguments.secretFile.get)
@@ -256,6 +279,24 @@ private[spark] object KubernetesSparkRestServer {
throw new IllegalArgumentException(s"Secret file specified by --secret-file" +
" is not a file, or does not exist.")
}
+ val sslOptions = if (parsedArguments.useSsl) {
+ val keyStorePassword = parsedArguments
+ .keyStorePasswordFile
+ .map(new File(_))
+ .map(Files.toString(_, Charsets.UTF_8))
+ val keyPassword = parsedArguments
+ .keyPasswordFile
+ .map(new File(_))
+ .map(Files.toString(_, Charsets.UTF_8))
+ new SSLOptions(
+ enabled = true,
+ keyStore = parsedArguments.keyStoreFile.map(new File(_)),
+ keyStoreType = parsedArguments.keyStoreType,
+ keyStorePassword = keyStorePassword,
+ keyPassword = keyPassword)
+ } else {
+ new SSLOptions
+ }
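+    // Illustrative container invocation (paths and values are examples only):
+    //   .../bin/java ... KubernetesSparkRestServer --hostname 0.0.0.0 --port 7077 \
+    //     --secret-file /var/run/secrets/spark-submission/spark-submission-server-secret \
+    //     --use-ssl true --keystore-file /var/run/secrets/spark-submission-ssl/keyStore \
+    //     --keystore-password-file /var/run/secrets/spark-submission-ssl/keyStorePassword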
val secretBytes = Files.toByteArray(secretFile)
val sparkConf = new SparkConf(true)
val server = new KubernetesSparkRestServer(
@@ -263,7 +304,8 @@ private[spark] object KubernetesSparkRestServer {
parsedArguments.port.get,
sparkConf,
secretBytes,
- barrier)
+ barrier,
+ sslOptions)
server.start()
ShutdownHookManager.addShutdownHook(() => {
try {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index f512c50a9a934..1a11f376fddb2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -21,13 +21,14 @@ import java.util.concurrent.Executors
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import com.google.common.util.concurrent.ThreadFactoryBuilder
-import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, EnvVar, EnvVarBuilder, Pod, QuantityBuilder}
+import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder}
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder}
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -44,24 +45,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
private val kubernetesMaster = Client.resolveK8sMaster(sc.master)
-
- private val executorDockerImage = conf
- .get("spark.kubernetes.executor.docker.image", s"spark-executor:${sc.version}")
-
- private val kubernetesNamespace = conf.get("spark.kubernetes.namespace", "default")
-
+ private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
+ private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
-
private val blockmanagerPort = conf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
private val kubernetesDriverServiceName = conf
- .getOption("spark.kubernetes.driver.service.name")
+ .get(KUBERNETES_DRIVER_SERVICE_NAME)
.getOrElse(
throw new SparkException("Must specify the service name the driver is running with"))
private val kubernetesDriverPodName = conf
- .getOption("spark.kubernetes.driver.pod.name")
+ .get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(
throw new SparkException("Must specify the driver pod name"))
@@ -69,7 +65,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
private val memoryOverheadBytes = conf
- .getOption("spark.kubernetes.executor.memoryOverhead")
+ .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
.map(overhead => Utils.byteStringAsBytes(overhead))
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt,
MEMORY_OVERHEAD_MIN))
@@ -87,7 +83,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val kubernetesClient = KubernetesClientBuilder
.buildFromWithinPod(kubernetesMaster, kubernetesNamespace)
- val driverPod = try {
+ private val driverPod = try {
kubernetesClient.pods().inNamespace(kubernetesNamespace).
withName(kubernetesDriverPodName).get()
} catch {
@@ -106,13 +102,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
protected var totalExpectedExecutors = new AtomicInteger(0)
private val driverUrl = RpcEndpointAddress(
- System.getenv(s"${convertToEnvMode(kubernetesDriverServiceName)}_SERVICE_HOST"),
+ sc.getConf.get("spark.driver.host"),
sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
- private def convertToEnvMode(value: String): String =
- value.toUpperCase.map { c => if (c == '-') '_' else c }
-
private val initialExecutors = getInitialTargetExecutorNumber(1)
private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = {
@@ -130,6 +123,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
}
+ override def applicationId(): String = conf
+ .getOption("spark.app.id")
+ .getOrElse(super.applicationId())
+
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
}
@@ -166,9 +163,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
private def allocateNewExecutorPod(): (String, Pod) = {
val executorKubernetesId = UUID.randomUUID().toString.replaceAll("-", "")
val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
- val name = s"$kubernetesDriverServiceName-exec-$executorKubernetesId"
- val selectors = Map(SPARK_EXECUTOR_SELECTOR -> executorId,
- SPARK_APP_SELECTOR -> applicationId()).asJava
+ val name = s"${applicationId()}-exec-$executorKubernetesId"
+ val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
+ SPARK_APP_ID_LABEL -> applicationId()).asJava
val executorMemoryQuantity = new QuantityBuilder(false)
.withAmount(executorMemoryBytes.toString)
.build()
@@ -178,69 +175,61 @@ private[spark] class KubernetesClusterSchedulerBackend(
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCores)
.build()
- val requiredEnv = new ArrayBuffer[EnvVar]
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_EXECUTOR_PORT")
- .withValue(executorPort.toString)
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_DRIVER_URL")
- .withValue(driverUrl)
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_EXECUTOR_CORES")
- .withValue(executorCores)
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_EXECUTOR_MEMORY")
- .withValue(executorMemory)
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_APPLICATION_ID")
- .withValue(applicationId())
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_EXECUTOR_ID")
- .withValue(executorId)
- .build()
- val requiredPorts = new ArrayBuffer[ContainerPort]
- requiredPorts += new ContainerPortBuilder()
- .withName(EXECUTOR_PORT_NAME)
- .withContainerPort(executorPort)
- .build()
- requiredPorts += new ContainerPortBuilder()
- .withName(BLOCK_MANAGER_PORT_NAME)
- .withContainerPort(blockmanagerPort)
- .build()
- (executorKubernetesId, kubernetesClient.pods().createNew()
- .withNewMetadata()
- .withName(name)
- .withLabels(selectors)
- .withOwnerReferences()
- .addNewOwnerReference()
- .withController(true)
- .withApiVersion(driverPod.getApiVersion)
- .withKind(driverPod.getKind)
- .withName(driverPod.getMetadata.getName)
- .withUid(driverPod.getMetadata.getUid)
- .endOwnerReference()
- .endMetadata()
- .withNewSpec()
- .addNewContainer()
- .withName(s"exec-${applicationId()}-container")
- .withImage(executorDockerImage)
- .withImagePullPolicy("IfNotPresent")
- .withNewResources()
- .addToRequests("memory", executorMemoryQuantity)
- .addToLimits("memory", executorMemoryLimitQuantity)
- .addToRequests("cpu", executorCpuQuantity)
- .addToLimits("cpu", executorCpuQuantity)
- .endResources()
- .withEnv(requiredEnv.asJava)
- .withPorts(requiredPorts.asJava)
- .endContainer()
- .endSpec()
- .done())
+ val requiredEnv = Seq(
+ (ENV_EXECUTOR_PORT, executorPort.toString),
+ (ENV_DRIVER_URL, driverUrl),
+ (ENV_EXECUTOR_CORES, executorCores),
+ (ENV_EXECUTOR_MEMORY, executorMemory),
+ (ENV_APPLICATION_ID, applicationId()),
+ (ENV_EXECUTOR_ID, executorId)
+ ).map(env => new EnvVarBuilder()
+ .withName(env._1)
+ .withValue(env._2)
+ .build())
+ val requiredPorts = Seq(
+ (EXECUTOR_PORT_NAME, executorPort),
+ (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
+ .map(port => {
+ new ContainerPortBuilder()
+ .withName(port._1)
+ .withContainerPort(port._2)
+ .build()
+ })
+ try {
+ (executorKubernetesId, kubernetesClient.pods().createNew()
+ .withNewMetadata()
+ .withName(name)
+ .withLabels(selectors)
+ .withOwnerReferences()
+ .addNewOwnerReference()
+ .withController(true)
+ .withApiVersion(driverPod.getApiVersion)
+ .withKind(driverPod.getKind)
+ .withName(driverPod.getMetadata.getName)
+ .withUid(driverPod.getMetadata.getUid)
+ .endOwnerReference()
+ .endMetadata()
+ .withNewSpec()
+ .addNewContainer()
+ .withName(s"executor")
+ .withImage(executorDockerImage)
+ .withImagePullPolicy("IfNotPresent")
+ .withNewResources()
+ .addToRequests("memory", executorMemoryQuantity)
+ .addToLimits("memory", executorMemoryLimitQuantity)
+ .addToRequests("cpu", executorCpuQuantity)
+ .addToLimits("cpu", executorCpuQuantity)
+ .endResources()
+ .withEnv(requiredEnv.asJava)
+ .withPorts(requiredPorts.asJava)
+ .endContainer()
+ .endSpec()
+ .done())
+ } catch {
+ case throwable: Throwable =>
+ logError("Failed to allocate executor pod.", throwable)
+ throw throwable
+ }
}
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
@@ -272,13 +261,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
private object KubernetesClusterSchedulerBackend {
- private val SPARK_EXECUTOR_SELECTOR = "spark-exec"
- private val SPARK_APP_SELECTOR = "spark-app"
private val DEFAULT_STATIC_PORT = 10000
- private val DEFAULT_BLOCKMANAGER_PORT = 7079
- private val DEFAULT_DRIVER_PORT = 7078
- private val BLOCK_MANAGER_PORT_NAME = "blockmanager"
- private val EXECUTOR_PORT_NAME = "executor"
private val MEMORY_OVERHEAD_FACTOR = 0.10
private val MEMORY_OVERHEAD_MIN = 384L
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
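
For reference, the memoryOverheadBytes default above is plain arithmetic over these constants; a worked example with an illustrative executor size:

    // max(MEMORY_OVERHEAD_FACTOR * executorMemoryBytes, MEMORY_OVERHEAD_MIN)
    val executorMemoryBytes = 1024L * 1024L * 1024L // e.g. --executor-memory 1g
    val overheadBytes = math.max(
      (0.10 * executorMemoryBytes).toInt, // ~102m
      384L * 1024L * 1024L)               // 384m floor
    // => 402653184 bytes (384m); the 10% factor only dominates above ~3.75g.
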
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
index 4d345158f356a..92fdfb8ac5f41 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
@@ -19,5 +19,14 @@ ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark
-# This class will also require setting a secret via the SPARK_APP_SECRET environment variable
-CMD exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer --hostname $HOSTNAME --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT --secret-file $SPARK_SUBMISSION_SECRET_LOCATION
+CMD SSL_ARGS="" && \
+ if ! [ -z ${SPARK_SUBMISSION_USE_SSL+x} ]; then SSL_ARGS="$SSL_ARGS --use-ssl $SPARK_SUBMISSION_USE_SSL"; fi && \
+ if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-file $SPARK_SUBMISSION_KEYSTORE_FILE"; fi && \
+ if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_TYPE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-type $SPARK_SUBMISSION_KEYSTORE_TYPE"; fi && \
+ if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-password-file $SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"; fi && \
+ if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \
+ exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \
+ --hostname $HOSTNAME \
+ --port $SPARK_SUBMISSION_SERVER_PORT \
+ --secret-file $SPARK_SUBMISSION_SECRET_LOCATION \
+ ${SSL_ARGS}
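
The optional SPARK_SUBMISSION_* variables consumed by the CMD above are expected to be injected into the driver container by the submission client. A hedged sketch of that plumbing with fabric8's builders (the mount paths are assumptions, not this patch's actual wiring):

    import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

    val sslEnv: Seq[EnvVar] = Seq(
      ("SPARK_SUBMISSION_USE_SSL", "true"),
      ("SPARK_SUBMISSION_KEYSTORE_FILE", "/opt/spark/ssl/keyStore.jks"),
      ("SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE", "/opt/spark/ssl/keystore-password.txt"),
      ("SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE", "/opt/spark/ssl/key-password.txt"))
      .map { case (name, value) =>
        new EnvVarBuilder().withName(name).withValue(value).build()
      }
    // sslEnv is then attached to the driver container spec via withEnv(...).
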
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 569527de8e300..f6a322f18cd75 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -106,6 +106,10 @@
+      <dependency>
+        <groupId>org.bouncycastle</groupId>
+        <artifactId>bcpkix-jdk15on</artifactId>
+      </dependency>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index c4bb389f5ada2..16de71118dec4 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes.integrationtest
+import java.io.File
import java.nio.file.Paths
import java.util.UUID
import java.util.concurrent.TimeUnit
@@ -36,7 +37,7 @@ import org.apache.spark.deploy.kubernetes.Client
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
-import org.apache.spark.internal.Logging
+import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils
import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
import org.apache.spark.util.Utils
@@ -68,6 +69,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
private val NAMESPACE = UUID.randomUUID().toString.replaceAll("-", "")
private var minikubeKubernetesClient: KubernetesClient = _
private var clientConfig: Config = _
+ private var keyStoreFile: File = _
+ private var trustStoreFile: File = _
override def beforeAll(): Unit = {
Minikube.startMinikube()
@@ -79,6 +82,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.done()
minikubeKubernetesClient = Minikube.getKubernetesClient.inNamespace(NAMESPACE)
clientConfig = minikubeKubernetesClient.getConfiguration
+ val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
+ Minikube.getMinikubeIp,
+ "changeit",
+ "changeit",
+ "changeit")
+ keyStoreFile = keyStore
+ trustStoreFile = trustStore
}
before {
@@ -162,7 +172,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.set("spark.executor.memory", "500m")
.set("spark.executor.cores", "1")
.set("spark.executors.instances", "1")
- .set("spark.app.id", "spark-pi")
+ .set("spark.app.name", "spark-pi")
.set("spark.ui.enabled", "true")
.set("spark.testing", "false")
val mainAppResource = s"file://$EXAMPLES_JAR"
@@ -288,12 +298,40 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.getLabels
// We can't match all of the selectors directly since one of the selectors is based on the
// launch time.
- assert(driverPodLabels.size == 4, "Unexpected number of pod labels.")
- assert(driverPodLabels.containsKey("driver-launcher-selector"), "Expected driver launcher" +
- " selector label to be present.")
+ assert(driverPodLabels.size == 5, "Unexpected number of pod labels.")
assert(driverPodLabels.get("spark-app-name") == "spark-pi", "Unexpected value for" +
" spark-app-name label.")
+ assert(driverPodLabels.get("spark-app-id").startsWith("spark-pi"), "Unexpected value for" +
+ " spark-app-id label (should be prefixed with the app name).")
assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
}
+
+ test("Enable SSL on the driver submit server") {
+ val args = Array(
+ "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
+ "--deploy-mode", "cluster",
+ "--kubernetes-namespace", NAMESPACE,
+ "--name", "spark-pi",
+ "--executor-memory", "512m",
+ "--executor-cores", "1",
+ "--num-executors", "1",
+ "--upload-jars", HELPER_JAR,
+ "--class", MAIN_CLASS,
+ "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}",
+ "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}",
+ "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
+ "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
+ "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
+ "--conf", "spark.ssl.kubernetes.submit.enabled=true",
+ "--conf", "spark.ssl.kubernetes.submit.keyStore=" +
+ s"file://${keyStoreFile.getAbsolutePath}",
+ "--conf", "spark.ssl.kubernetes.submit.keyStorePassword=changeit",
+ "--conf", "spark.ssl.kubernetes.submit.keyPassword=changeit",
+ "--conf", "spark.ssl.kubernetes.submit.trustStore=" +
+ s"file://${trustStoreFile.getAbsolutePath}",
+ "--conf", s"spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit",
+ EXAMPLES_JAR)
+ SparkSubmit.main(args)
+ }
}
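
The spark.ssl.kubernetes.submit.* keys exercised here follow Spark's standard namespaced SSL configuration scheme. An illustrative sketch of how the client side can resolve them (SSLOptions is private[spark], so this has to live under the org.apache.spark package; the values are placeholders):

    package org.apache.spark

    object SubmitSslConfExample {
      def resolve(): SSLOptions = {
        val conf = new SparkConf()
          .set("spark.ssl.kubernetes.submit.enabled", "true")
          .set("spark.ssl.kubernetes.submit.trustStore", "/tmp/trustStore.jks")
          .set("spark.ssl.kubernetes.submit.trustStorePassword", "changeit")
        // SSLOptions.parse collects every key under the given namespace.
        SSLOptions.parse(conf, "spark.ssl.kubernetes.submit")
      }
    }
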
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala
new file mode 100644
index 0000000000000..bde7b43226660
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest.sslutil
+
+import java.io.{File, FileOutputStream}
+import java.math.BigInteger
+import java.nio.file.Files
+import java.security.{KeyPairGenerator, KeyStore, SecureRandom}
+import java.util.{Calendar, Random}
+import javax.security.auth.x500.X500Principal
+
+import org.bouncycastle.asn1.x509.{Extension, GeneralName, GeneralNames}
+import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3CertificateBuilder}
+import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder
+
+import org.apache.spark.util.Utils
+
+private[spark] object SSLUtils {
+
+ def generateKeyStoreTrustStorePair(
+ ipAddress: String,
+ keyStorePassword: String,
+ keyPassword: String,
+ trustStorePassword: String): (File, File) = {
+ val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
+ keyPairGenerator.initialize(512)
+ val keyPair = keyPairGenerator.generateKeyPair()
+ val selfPrincipal = new X500Principal(s"cn=$ipAddress")
+ val currentDate = Calendar.getInstance
+ val validForOneHundredYears = Calendar.getInstance
+ validForOneHundredYears.add(Calendar.YEAR, 100)
+ val certificateBuilder = new JcaX509v3CertificateBuilder(
+ selfPrincipal,
+ new BigInteger(4096, new Random()),
+ currentDate.getTime,
+ validForOneHundredYears.getTime,
+ selfPrincipal,
+ keyPair.getPublic)
+ certificateBuilder.addExtension(Extension.subjectAlternativeName, false,
+ new GeneralNames(new GeneralName(GeneralName.iPAddress, ipAddress)))
+ val signer = new JcaContentSignerBuilder("SHA1WithRSA")
+ .setSecureRandom(new SecureRandom())
+ .build(keyPair.getPrivate)
+ val bcCertificate = certificateBuilder.build(signer)
+ val jcaCertificate = new JcaX509CertificateConverter().getCertificate(bcCertificate)
+ val keyStore = KeyStore.getInstance("JKS")
+ keyStore.load(null, null)
+ keyStore.setKeyEntry("key", keyPair.getPrivate,
+ keyPassword.toCharArray, Array(jcaCertificate))
+ val tempDir = Files.createTempDirectory("temp-ssl-stores").toFile()
+ tempDir.deleteOnExit()
+ val keyStoreFile = new File(tempDir, "keyStore.jks")
+ Utils.tryWithResource(new FileOutputStream(keyStoreFile)) {
+ keyStore.store(_, keyStorePassword.toCharArray)
+ }
+ val trustStore = KeyStore.getInstance("JKS")
+ trustStore.load(null, null)
+ trustStore.setCertificateEntry("key", jcaCertificate)
+ val trustStoreFile = new File(tempDir, "trustStore.jks")
+ Utils.tryWithResource(new FileOutputStream(trustStoreFile)) {
+ trustStore.store(_, trustStorePassword.toCharArray)
+ }
+ (keyStoreFile, trustStoreFile)
+ }
+
+}
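
A quick round-trip check of the generated stores (illustrative only; the IP literal stands in for Minikube.getMinikubeIp):

    import java.io.FileInputStream
    import java.security.KeyStore
    import java.security.cert.X509Certificate

    val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
      "192.168.99.100", "changeit", "changeit", "changeit")
    val trustStore = KeyStore.getInstance("JKS")
    val in = new FileInputStream(trustStoreFile)
    try trustStore.load(in, "changeit".toCharArray) finally in.close()
    val cert = trustStore.getCertificate("key").asInstanceOf[X509Certificate]
    assert(cert.getSubjectX500Principal.getName.contains("192.168.99.100"))
    // The subjectAlternativeName extension carries the same IP, which is what
    // TLS hostname verification actually checks against.
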