This repository was archived by the owner on Jan 9, 2020. It is now read-only.
88 changes: 67 additions & 21 deletions docs/running-on-kubernetes.md
@@ -66,7 +66,7 @@ The Spark master, specified either via passing the `--master` command line argument
master string with `k8s://` will cause the Spark application to launch on the Kubernetes cluster, with the API server
being contacted at `api_server_url`. If no HTTP protocol is specified in the URL, it defaults to `https`. For example,
setting the master to `k8s://example.com:443` is equivalent to setting it to `k8s://https://example.com:443`, but to
-connect without SSL on a different port, the master would be set to `k8s://http://example.com:8443`.
+connect without TLS on a different port, the master would be set to `k8s://http://example.com:8443`.

If you have a Kubernetes cluster setup, one way to discover the apiserver URL is by executing `kubectl cluster-info`.
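The protocol defaulting described above can be sketched as follows (a hypothetical helper for illustration only, not Spark's actual master-URL parsing):

```scala
// Sketch of the documented behavior: a master of "k8s://<api_server_url>"
// defaults to https when no HTTP protocol is given after the k8s:// prefix.
object K8sMasterUrl {
  def resolve(master: String): String = {
    require(master.startsWith("k8s://"), "Kubernetes master URLs must start with k8s://")
    val apiServer = master.stripPrefix("k8s://")
    if (apiServer.startsWith("http://") || apiServer.startsWith("https://")) {
      s"k8s://$apiServer"
    } else {
      // No protocol specified: default to https, per the docs above.
      s"k8s://https://$apiServer"
    }
  }
}
```

For example, `K8sMasterUrl.resolve("k8s://example.com:443")` yields the same effective URL as passing `k8s://https://example.com:443` explicitly.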

@@ -119,20 +119,20 @@ is currently supported.

## Advanced

-### Setting Up SSL For Submitting the Driver
+### Setting Up TLS For Submitting the Driver

When submitting to Kubernetes, a pod is started for the driver, and the pod starts an HTTP server. This HTTP server
receives the driver's configuration, including uploaded driver jars, from the client before starting the application.
-Spark supports using SSL to encrypt the traffic in this bootstrapping process. It is recommended to configure this
+Spark supports using TLS to encrypt the traffic in this bootstrapping process. It is recommended to configure this
whenever possible.

See the [security page](security.html) and [configuration](configuration.html) sections for more information on
-configuring SSL; use the prefix `spark.ssl.kubernetes.submit` in configuring the SSL-related fields in the context
+configuring TLS; use the prefix `spark.ssl.kubernetes.submission` in configuring the TLS-related fields in the context
of submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver
-pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`.
+pod in starting the application, set `spark.ssl.kubernetes.submission.trustStore`.

One note about the keyStore is that it can be specified as either a file on the client machine or a file in the
-container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:`
+container image's disk. Thus `spark.ssl.kubernetes.submission.keyStore` can be a URI with a scheme of either `file:`
or `local:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto
the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme
`local:`, the file is assumed to already be on the container's disk at the appropriate path.
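The `file:`/`local:` dispatch described above can be sketched as a small helper (hypothetical names; the real resolution lives inside Spark's submission client):

```scala
import java.net.URI

// Where a keyStore URI points, per the docs above: "file:" means a file on the
// client machine (mounted into the driver pod as a secret volume), while
// "local:" means the file is already on the container image's disk.
sealed trait KeyStoreSource
case class ClientMachineFile(path: String) extends KeyStoreSource   // mounted as a secret volume
case class ContainerLocalFile(path: String) extends KeyStoreSource  // already on the image's disk

object KeyStores {
  def resolve(uri: String): KeyStoreSource = {
    val parsed = new URI(uri)
    // A bare path with no scheme is treated like "file:", matching the default.
    Option(parsed.getScheme).getOrElse("file") match {
      case "file"  => ClientMachineFile(parsed.getPath)
      case "local" => ContainerLocalFile(parsed.getPath)
      case other   => throw new IllegalArgumentException(s"Unsupported keyStore scheme: $other")
    }
  }
}
```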
@@ -200,42 +200,88 @@ from the other deployment modes. See the [configuration page](configuration.html
</td>
</tr>
<tr>
-<td><code>spark.kubernetes.submit.caCertFile</code></td>
+<td><code>spark.kubernetes.authenticate.submission.caCertFile</code></td>
<td>(none)</td>
<td>
-CA cert file for connecting to Kubernetes over SSL. This file should be located on the submitting machine's disk.
+Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file
+must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide
+a scheme).
</td>
</tr>
<tr>
-<td><code>spark.kubernetes.submit.clientKeyFile</code></td>
+<td><code>spark.kubernetes.authenticate.submission.clientKeyFile</code></td>
<td>(none)</td>
<td>
-Client key file for authenticating against the Kubernetes API server. This file should be located on the submitting
-machine's disk.
+Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file
+must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide
+a scheme).
</td>
</tr>
<tr>
-<td><code>spark.kubernetes.submit.clientCertFile</code></td>
+<td><code>spark.kubernetes.authenticate.submission.clientCertFile</code></td>
<td>(none)</td>
<td>
-Client cert file for authenticating against the Kubernetes API server. This file should be located on the submitting
-machine's disk.
+Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This
+file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not
+provide a scheme).
</td>
</tr>
<tr>
-<td><code>spark.kubernetes.submit.oauthToken</code></td>
+<td><code>spark.kubernetes.authenticate.submission.oauthToken</code></td>
<td>(none)</td>
<td>
-OAuth token to use when authenticating against the against the Kubernetes API server. Note that unlike the other
-authentication options, this should be the exact string value of the token to use for the authentication.
+OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note
+that unlike the other authentication options, this is expected to be the exact string value of the token to use for
+the authentication.
</td>
</tr>
<tr>
-<td><code>spark.kubernetes.submit.serviceAccountName</code></td>
+<td><code>spark.kubernetes.authenticate.driver.caCertFile</code></td>
<td>(none)</td>
<td>
Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting
executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod.
Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.clientKeyFile</code></td>
<td>(none)</td>
<td>
Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting
executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod.
Specify this as a path as opposed to a URI (i.e. do not provide a scheme). If this is specified, it is highly
recommended to set up TLS for the driver submission server, as this value is sensitive information that would be
passed to the driver pod in plaintext otherwise.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.clientCertFile</code></td>
<td>(none)</td>
<td>
Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when
requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the
driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.oauthToken</code></td>
<td>(none)</td>
<td>
OAuth token to use when authenticating against the Kubernetes API server from the driver pod when
requesting executors. Note that unlike the other authentication options, this must be the exact string value of
the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is
highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would
be passed to the driver pod in plaintext otherwise.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
<td><code>default</code></td>
<td>
Service account that is used when running the driver pod. The driver pod uses this service account when requesting
-executor pods from the API server.
+executor pods from the API server. Note that this cannot be specified alongside a CA cert file, client key file,
+client cert file, and/or OAuth token.
</td>
</tr>
<tr>
@@ -281,7 +327,7 @@ from the other deployment modes. See the [configuration page](configuration.html
</td>
</tr>
<tr>
-<td><code>spark.kubernetes.driverSubmitTimeout</code></td>
+<td><code>spark.kubernetes.driverSubmissionTimeout</code></td>
<td>60s</td>
<td>
Time to wait for the driver pod to start running before aborting its execution.
@@ -296,7 +342,7 @@ from the other deployment modes. See the [configuration page](configuration.html
</td>
</tr>
<tr>
-<td><code>spark.kubernetes.submit.waitAppCompletion</code></td>
+<td><code>spark.kubernetes.submission.waitAppCompletion</code></td>
<td><code>true</code></td>
<td>
In cluster mode, whether to wait for the application to finish before exiting the launcher process. When changed to
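The property renames in the table above can be summarized as a simple old-to-new mapping (assembled by hand from this diff; a sketch for illustration, not an official migration utility):

```scala
// Config key renames introduced by this change, taken from the docs diff above.
object Renames {
  val renamedKeys: Map[String, String] = Map(
    "spark.kubernetes.submit.caCertFile" -> "spark.kubernetes.authenticate.submission.caCertFile",
    "spark.kubernetes.submit.clientKeyFile" -> "spark.kubernetes.authenticate.submission.clientKeyFile",
    "spark.kubernetes.submit.clientCertFile" -> "spark.kubernetes.authenticate.submission.clientCertFile",
    "spark.kubernetes.submit.oauthToken" -> "spark.kubernetes.authenticate.submission.oauthToken",
    "spark.kubernetes.submit.serviceAccountName" -> "spark.kubernetes.authenticate.driver.serviceAccountName",
    "spark.kubernetes.driverSubmitTimeout" -> "spark.kubernetes.driverSubmissionTimeout",
    "spark.kubernetes.submit.waitAppCompletion" -> "spark.kubernetes.submission.waitAppCompletion")

  // Rewrite any old keys in a conf map to their new names; other keys pass through.
  def migrate(conf: Map[String, String]): Map[String, String] =
    conf.map { case (k, v) => (renamedKeys.getOrElse(k, k), v) }
}
```

Note that the `spark.kubernetes.authenticate.driver.*` credential properties are new in this change and have no pre-rename equivalents.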
@@ -21,7 +21,6 @@ import java.security.SecureRandom
import java.util.ServiceLoader
import java.util.concurrent.{CountDownLatch, TimeUnit}

-import com.google.common.base.Charsets
import com.google.common.io.Files
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model._
@@ -33,7 +32,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
+import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, KubernetesCredentials, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -53,7 +52,7 @@ private[spark] class Client(
.getOrElse("spark")
private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId"
-private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId"
+private val secretDirectory = s"$DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR/$kubernetesAppId"
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
@@ -119,23 +118,22 @@
customAnnotations,
KUBERNETES_DRIVER_ANNOTATIONS.key,
"annotations")
+val driverPodKubernetesCredentials = new DriverPodKubernetesCredentialsProvider(sparkConf).get()
var k8ConfBuilder = new K8SConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
.withNamespace(namespace)
-sparkConf.get(KUBERNETES_CA_CERT_FILE).foreach {
+sparkConf.get(KUBERNETES_SUBMIT_CA_CERT_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f)
}
-sparkConf.get(KUBERNETES_CLIENT_KEY_FILE).foreach {
+sparkConf.get(KUBERNETES_SUBMIT_CLIENT_KEY_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f)
}
-sparkConf.get(KUBERNETES_CLIENT_CERT_FILE).foreach {
+sparkConf.get(KUBERNETES_SUBMIT_CLIENT_CERT_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f)
}
-sparkConf.get(KUBERNETES_OAUTH_TOKEN).foreach { token =>
+sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { token =>
k8ConfBuilder = k8ConfBuilder.withOauthToken(token)
-// Remove the oauth token from Spark conf so that its doesn't appear in the Spark UI.
-sparkConf.set(KUBERNETES_OAUTH_TOKEN, "<present_but_redacted>")
}

val k8ClientConfig = k8ConfBuilder.build
@@ -174,11 +172,6 @@ private[spark] class Client(
.done()
kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret)
val sslConfiguration = sslConfigurationProvider.getSslConfiguration()
-val driverKubernetesSelectors = (Map(
-SPARK_DRIVER_LABEL -> kubernetesAppId,
-SPARK_APP_ID_LABEL -> kubernetesAppId,
-SPARK_APP_NAME_LABEL -> appName)
-++ parsedCustomLabels)
val (driverPod, driverService) = launchDriverKubernetesComponents(
kubernetesClient,
driverServiceManager,
@@ -198,7 +191,8 @@
sslConfiguration,
driverService,
submitterLocalFiles,
-submitterLocalJars)
+submitterLocalJars,
+driverPodKubernetesCredentials)
// Now that the application has started, persist the components that were created beyond
// the shutdown hook. We still want to purge the one-time secrets, so do not unregister
// those.
@@ -245,7 +239,8 @@
sslConfiguration: SslConfiguration,
driverService: Service,
submitterLocalFiles: Iterable[String],
-submitterLocalJars: Iterable[String]): Unit = {
+submitterLocalJars: Iterable[String],
+driverPodKubernetesCredentials: KubernetesCredentials): Unit = {
sparkConf.getOption("spark.app.id").foreach { id =>
logWarning(s"Warning: Provided app id in spark.app.id as $id will be" +
s" overridden as $kubernetesAppId")
@@ -257,6 +252,12 @@
sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
sparkConf.setIfMissing("spark.blockmanager.port",
DEFAULT_BLOCKMANAGER_PORT.toString)
+sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ =>
+sparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "<present_but_redacted>")
+}
+sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ =>
+sparkConf.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "<present_but_redacted>")
+}
val driverSubmitter = buildDriverSubmissionClient(
kubernetesClient,
driverServiceManager,
@@ -266,7 +267,10 @@
driverSubmitter.ping()
logInfo(s"Submitting local resources to driver pod for application " +
s"$kubernetesAppId ...")
-val submitRequest = buildSubmissionRequest(submitterLocalFiles, submitterLocalJars)
+val submitRequest = buildSubmissionRequest(
+submitterLocalFiles,
+submitterLocalJars,
+driverPodKubernetesCredentials)
driverSubmitter.submitApplication(submitRequest)
logInfo("Successfully submitted local resources and driver configuration to" +
" driver pod.")
@@ -449,7 +453,7 @@
.endSecret()
.endVolume()
.addToVolumes(sslConfiguration.sslPodVolumes: _*)
-.withServiceAccount(serviceAccount)
+.withServiceAccount(serviceAccount.getOrElse("default"))
.addNewContainer()
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverDockerImage)
@@ -625,7 +629,8 @@

private def buildSubmissionRequest(
submitterLocalFiles: Iterable[String],
-submitterLocalJars: Iterable[String]): KubernetesCreateSubmissionRequest = {
+submitterLocalJars: Iterable[String],
+driverPodKubernetesCredentials: KubernetesCredentials): KubernetesCreateSubmissionRequest = {
val mainResourceUri = Utils.resolveURI(mainAppResource)
val resolvedAppResource: AppResource = Option(mainResourceUri.getScheme)
.getOrElse("file") match {
@@ -648,7 +653,8 @@
secret = secretBase64String,
sparkProperties = sparkConf.getAll.toMap,
uploadedJarsBase64Contents = uploadJarsBase64Contents,
-uploadedFilesBase64Contents = uploadFilesBase64Contents)
+uploadedFilesBase64Contents = uploadFilesBase64Contents,
+driverPodKubernetesCredentials = driverPodKubernetesCredentials)
}

private def buildDriverSubmissionClient(
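The token-redaction step added in the diff above (keeping OAuth tokens out of the Spark UI) can be sketched independently of `SparkConf` (a hypothetical helper, not the code in this PR):

```scala
// Sketch: replace any configured sensitive keys with a placeholder before the
// configuration becomes visible in the UI, as the diff above does for the
// submission and driver OAuth tokens.
object Redaction {
  val Placeholder = "<present_but_redacted>"

  def redact(conf: Map[String, String], sensitiveKeys: Set[String]): Map[String, String] =
    conf.map { case (k, v) =>
      if (sensitiveKeys.contains(k)) (k, Placeholder) else (k, v)
    }
}
```

The key stays present (so code can still detect that a token was supplied) while the value itself is no longer recoverable from the displayed configuration.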
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.io.File

import com.google.common.io.{BaseEncoding, Files}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.rest.KubernetesCredentials
import org.apache.spark.internal.config.OptionalConfigEntry

private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) {

def get(): KubernetesCredentials = {
sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).foreach { _ =>
require(sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).isEmpty,
"Cannot specify both a service account and a driver pod OAuth token.")
require(sparkConf.get(KUBERNETES_DRIVER_CA_CERT_FILE).isEmpty,
"Cannot specify both a service account and a driver pod CA cert file.")
require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_KEY_FILE).isEmpty,
"Cannot specify both a service account and a driver pod client key file.")
require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty,
"Cannot specify both a service account and a driver pod client cert file.")
}
val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN)
val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE,
s"Driver CA cert file provided at %s does not exist or is not a file.")
val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
s"Driver client key file provided at %s does not exist or is not a file.")
val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE,
s"Driver client cert file provided at %s does not exist or is not a file.")
val serviceAccountName = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
KubernetesCredentials(
oauthToken = oauthToken,
caCertDataBase64 = caCertDataBase64,
clientKeyDataBase64 = clientKeyDataBase64,
clientCertDataBase64 = clientCertDataBase64)
}

private def safeFileConfToBase64(
conf: OptionalConfigEntry[String],
fileNotFoundFormatString: String): Option[String] = {
sparkConf.get(conf)
.map(new File(_))
.map { file =>
require(file.isFile, String.format(fileNotFoundFormatString, file.getAbsolutePath))
BaseEncoding.base64().encode(Files.toByteArray(file))
}
}
}
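The base64 encoding done by `safeFileConfToBase64` above can be reproduced with only the JDK (a sketch that swaps Guava's `BaseEncoding`/`Files` for `java.util.Base64` and `java.nio.file.Files`; helper names are illustrative):

```scala
import java.nio.file.{Files, Paths}
import java.util.Base64

object FileToBase64 {
  // Mirrors safeFileConfToBase64: None when no path is configured; otherwise
  // require the path to be a regular file and return its base64-encoded bytes.
  def encode(maybePath: Option[String], fileNotFoundFormatString: String): Option[String] =
    maybePath.map { path =>
      val file = Paths.get(path)
      require(Files.isRegularFile(file),
        String.format(fileNotFoundFormatString, file.toAbsolutePath.toString))
      Base64.getEncoder.encodeToString(Files.readAllBytes(file))
    }
}
```

Encoding the credential files rather than shipping raw paths lets them travel inside the JSON submission request to the driver pod.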