Closed
29 commits
19618aa
[SPARK-23146] Support client mode.
mccheah Jul 11, 2018
4bab48b
Allow driver pod name to be optional.
mccheah Jul 11, 2018
94ed1cc
Add driver pod name check for cluster mode
mccheah Jul 11, 2018
560993e
Fix build
mccheah Jul 11, 2018
e961fd3
Remove unused imports
mccheah Jul 11, 2018
a00561f
Fix build
mccheah Jul 11, 2018
a2609b0
More compilation fixes.
mccheah Jul 11, 2018
97f1284
Fix build again
mccheah Jul 11, 2018
9a17830
Small change to force build to re-run
mccheah Jul 11, 2018
2205220
Merge remote-tracking branch 'apache/master' into k8s-client-mode
mccheah Jul 11, 2018
64cc39b
Fix build
mccheah Jul 11, 2018
0f6ad73
Fix integration test.
mccheah Jul 11, 2018
88a9d7f
Add namespaces for client mode test
mccheah Jul 11, 2018
5785ce7
Add container name for client mode test.
mccheah Jul 13, 2018
75db063
Fix dockerfile bug
mccheah Jul 13, 2018
846f093
Add container image to client mode test
mccheah Jul 13, 2018
69bf2a4
Some cleanup and rewording
mccheah Jul 13, 2018
ee5c267
Documentation
mccheah Jul 14, 2018
bc38be7
Address comments.
mccheah Jul 16, 2018
bd102b3
Merge remote-tracking branch 'apache/master' into k8s-client-mode
mccheah Jul 16, 2018
37537c6
Reword docs
mccheah Jul 16, 2018
0db7a79
Fix typo
mccheah Jul 16, 2018
d286de1
Merge remote-tracking branch 'apache/master' into k8s-client-mode
mccheah Jul 18, 2018
086747e
Clarify some docs
mccheah Jul 20, 2018
7d0bc2a
Merge remote-tracking branch 'apache/master' into k8s-client-mode
mccheah Jul 20, 2018
d90f753
Fix typo
mccheah Jul 20, 2018
001a525
Fix wording
mccheah Jul 20, 2018
72c96e0
More feedback
mccheah Jul 20, 2018
ded1ff6
Helper method for master url parsing
mccheah Jul 25, 2018
138 changes: 110 additions & 28 deletions docs/running-on-kubernetes.md
@@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, `--master k8s://http://127.0.0.
spark-submit. Finally, notice that in the above example we specify a jar with a specific URI with a scheme of `local://`.
This URI is the location of the example jar that is already in the Docker image.

## Client Mode

Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When your application
runs in client mode, the driver can run inside a pod or on a physical host. When running an application in client mode,
it is recommended to account for the following factors:

### Client Mode Networking

Spark executors must be able to connect to the Spark driver over a hostname and a port that are routable from the Spark
executors. The specific network configuration required for Spark to work in client mode will vary per
setup. If you run your driver inside a Kubernetes pod, you can use a
[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your
driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that
the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver
pod a sufficiently unique label and to use that label in the label selector of the headless service. Specify the driver's
hostname via `spark.driver.host` and the driver's port via `spark.driver.port`.
Member

@felixcheung felixcheung Jul 24, 2018

@mccheah as for your comment #21748 (comment) so this manual setup is ok, right?

there is some level of complexity here - perhaps a quick follow up with some sample template/kubectl commands would be helpful

Contributor Author

Yeah manual setup is fine for now. Think additional docs around how to do all this can be a separate PR.
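
A minimal sketch of the driver-side settings from the networking section, assuming the driver pod sits behind a headless service. The service name `spark-driver-svc`, the namespace `spark-jobs`, the port `7078`, and the `cluster.local` cluster domain are illustrative assumptions; only `spark.driver.host` and `spark.driver.port` come from the docs in this change.

```scala
object ClientModeNetworking {
  // Builds the DNS name Kubernetes normally assigns to a service.
  // Assumption: the cluster uses the default "cluster.local" domain.
  def serviceHostname(
      serviceName: String,
      namespace: String,
      clusterDomain: String = "cluster.local"): String =
    s"$serviceName.$namespace.svc.$clusterDomain"

  // Returns the two Spark properties that tell executors how to reach the driver.
  def driverAddressConf(host: String, port: Int): Map[String, String] = {
    require(port > 0 && port < 65536, s"Invalid driver port: $port")
    Map(
      "spark.driver.host" -> host,
      "spark.driver.port" -> port.toString)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical names, for illustration only.
    val host = serviceHostname("spark-driver-svc", "spark-jobs")
    driverAddressConf(host, 7078).foreach { case (k, v) => println(s"--conf $k=$v") }
  }
}
```

The printed `--conf` pairs could be passed to `spark-submit` or set on the application's configuration before the context starts. The headless service itself still has to be created separately, with a selector that matches only the driver pod.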


### Client Mode Executor Pod Garbage Collection

If you run your Spark driver in a pod, it is highly recommended to set `spark.kubernetes.driver.pod.name` to the name of that pod.
When this property is set, the Spark scheduler will deploy the executor pods with an
[OwnerReference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/), which in turn will
ensure that once the driver pod is deleted from the cluster, all of the application's executor pods will also be deleted.
The driver will look for a pod with the given name in the namespace specified by `spark.kubernetes.namespace`, and
an OwnerReference pointing to that pod will be added to each executor pod's OwnerReferences list. Be careful to avoid
setting the OwnerReference to a pod that is not actually that driver pod, or else the executors may be terminated
prematurely when the wrong pod is deleted.

If your application is not running inside a pod, or if `spark.kubernetes.driver.pod.name` is not set when your application is
actually running in a pod, keep in mind that the executor pods may not be properly deleted from the cluster when the
application exits. The Spark scheduler attempts to delete these pods, but if the network request to the API server fails
for any reason, these pods will remain in the cluster. The executor processes should exit when they cannot reach the
driver, so the executor pods should not consume compute resources (CPU and memory) in the cluster after your application
Member

@felixcheung felixcheung Jul 24, 2018

> executor processes should exit when they cannot reach the driver

what's the time out value? is it configurable?

Contributor Author

Unclear, it triggers in the onDisconnected event so I think there's a persistent socket connection that's dropped that causes the exit. So, it should more or less be instantaneous.

exits.
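
A hedged sketch of how a driver that is known to run inside a pod might choose the value for the driver pod name property. It assumes that `HOSTNAME` equals the pod name, which is the Kubernetes default unless the pod spec overrides the hostname, and that `POD_NAME` is a hypothetical variable the user injects themselves (for example through the downward API). Neither variable is something this change reads.

```scala
object DriverPodName {
  // Resolves the driver pod name from an environment map.
  // Assumptions: POD_NAME is user-injected (hypothetical name); HOSTNAME
  // defaults to the pod name unless the pod spec overrides the hostname.
  def resolve(env: Map[String, String]): Option[String] =
    env.get("POD_NAME").orElse(env.get("HOSTNAME")).map(_.trim).filter(_.nonEmpty)

  // Produces the property to pass to Spark, or an empty map when no name is known.
  def podNameConf(env: Map[String, String]): Map[String, String] =
    resolve(env)
      .map(name => Map("spark.kubernetes.driver.pod.name" -> name))
      .getOrElse(Map.empty[String, String])

  def main(args: Array[String]): Unit =
    println(podNameConf(sys.env))
}
```

Because `HOSTNAME` is also set on many machines outside a cluster, a helper like this is only safe when the application is certain to be running in a pod. Otherwise it could point the OwnerReference at the wrong pod, which is the failure the text above warns about.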

### Authentication Parameters

Use the exact prefix `spark.kubernetes.authenticate` for Kubernetes authentication parameters in client mode.
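
As an illustration of what "exact prefix" means, the sketch below joins prefixes and suffixes into full property names. The constant values are copied from the `Config` object in this change; the `clientModeSetting` helper and the plain `Map` standing in for the Spark configuration are hypothetical and are not Spark APIs.

```scala
object AuthConfKeys {
  // Prefixes and suffixes copied from the Config object in this change.
  val ClientModePrefix = "spark.kubernetes.authenticate"
  val DriverMountedPrefix = "spark.kubernetes.authenticate.driver.mounted"
  val OAuthTokenFileSuffix = "oauthTokenFile"
  val ClientKeyFileSuffix = "clientKeyFile"

  // Joins a prefix and a suffix into a full property name.
  def key(prefix: String, suffix: String): String = s"$prefix.$suffix"

  // Hypothetical helper: reads one client mode authentication setting
  // from a plain map that stands in for the Spark configuration.
  def clientModeSetting(conf: Map[String, String], suffix: String): Option[String] =
    conf.get(key(ClientModePrefix, suffix))

  def main(args: Array[String]): Unit = {
    val conf = Map(
      "spark.kubernetes.authenticate.oauthTokenFile" -> "/path/to/token",
      "spark.kubernetes.authenticate.driver.mounted.clientKeyFile" -> "/path/to/key")
    println(clientModeSetting(conf, OAuthTokenFileSuffix)) // found under the client mode prefix
    println(clientModeSetting(conf, ClientKeyFileSuffix))  // not found: set under a different prefix
  }
}
```

In this model, a value set under the `driver.mounted` prefix is not visible through the client mode key, which is why the prefix has to match exactly.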

## Dependency Management

If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to
@@ -258,10 +297,6 @@ RBAC authorization and how to configure Kubernetes service accounts for pods, pl
[Using RBAC Authorization](https://kubernetes.io/docs/admin/authorization/rbac/) and
[Configure Service Accounts for Pods](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/).

## Client Mode

Client mode is not currently supported.

## Future Work

There are several Spark on Kubernetes features that are currently being incubated in a fork -
@@ -354,7 +389,7 @@ specific to Spark on Kubernetes.
<td>
Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file
must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide
a scheme).
a scheme). In client mode, use <code>spark.kubernetes.authenticate.caCertFile</code> instead.
</td>
</tr>
<tr>
@@ -363,7 +398,7 @@ specific to Spark on Kubernetes.
<td>
Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file
must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide
a scheme).
a scheme). In client mode, use <code>spark.kubernetes.authenticate.clientKeyFile</code> instead.
</td>
</tr>
<tr>
@@ -372,7 +407,7 @@ specific to Spark on Kubernetes.
<td>
Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This
file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not
provide a scheme).
provide a scheme). In client mode, use <code>spark.kubernetes.authenticate.clientCertFile</code> instead.
</td>
</tr>
<tr>
@@ -381,7 +416,7 @@ specific to Spark on Kubernetes.
<td>
OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note
that unlike the other authentication options, this is expected to be the exact string value of the token to use for
the authentication.
the authentication. In client mode, use <code>spark.kubernetes.authenticate.oauthToken</code> instead.
</td>
</tr>
<tr>
@@ -390,7 +425,7 @@ specific to Spark on Kubernetes.
<td>
Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server when starting the driver.
This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not
provide a scheme).
provide a scheme). In client mode, use <code>spark.kubernetes.authenticate.oauthTokenFile</code> instead.
</td>
</tr>
<tr>
@@ -399,18 +434,18 @@ specific to Spark on Kubernetes.
<td>
Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting
executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod.
Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use
<code>spark.kubernetes.authenticate.caCertFile</code> instead.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.clientKeyFile</code></td>
<td>(none)</td>
<td>
Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting
executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod.
Specify this as a path as opposed to a URI (i.e. do not provide a scheme). If this is specified, it is highly
recommended to set up TLS for the driver submission server, as this value is sensitive information that would be
passed to the driver pod in plaintext otherwise.
Member

Why remove this part?

> recommended to set up TLS for the driver submission server, as this value is sensitive information that would be
> passed to the driver pod in plaintext otherwise.

Contributor Author

This comment is very much out of date actually, it's referring to an old version of this code where we didn't use K8s secrets to push this data, but we instead used a custom HTTP server mounted on the driver. It's from before this was merged into apache/master.

executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod as
a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
In client mode, use <code>spark.kubernetes.authenticate.clientKeyFile</code> instead.
</td>
</tr>
<tr>
@@ -419,7 +454,8 @@ specific to Spark on Kubernetes.
<td>
Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when
requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the
driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
driver pod as a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
In client mode, use <code>spark.kubernetes.authenticate.clientCertFile</code> instead.
</td>
</tr>
<tr>
@@ -428,9 +464,8 @@ specific to Spark on Kubernetes.
<td>
OAuth token to use when authenticating against the Kubernetes API server from the driver pod when
requesting executors. Note that unlike the other authentication options, this must be the exact string value of
the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is
highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would
be passed to the driver pod in plaintext otherwise.
the token to use for the authentication. This token value is uploaded to the driver pod as a Kubernetes secret.
In client mode, use <code>spark.kubernetes.authenticate.oauthToken</code> instead.
</td>
</tr>
<tr>
@@ -439,9 +474,8 @@ specific to Spark on Kubernetes.
<td>
Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server from the driver pod when
requesting executors. Note that unlike the other authentication options, this file must contain the exact string value of
the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is
highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would
be passed to the driver pod in plaintext otherwise.
the token to use for the authentication. This token value is uploaded to the driver pod as a Kubernetes secret. In client mode, use
<code>spark.kubernetes.authenticate.oauthTokenFile</code> instead.
</td>
</tr>
<tr>
@@ -450,7 +484,8 @@ specific to Spark on Kubernetes.
<td>
Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting
executors. This path must be accessible from the driver pod.
Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use
<code>spark.kubernetes.authenticate.caCertFile</code> instead.
</td>
</tr>
<tr>
@@ -459,7 +494,8 @@ specific to Spark on Kubernetes.
<td>
Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting
executors. This path must be accessible from the driver pod.
Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use
<code>spark.kubernetes.authenticate.clientKeyFile</code> instead.
</td>
</tr>
<tr>
@@ -468,7 +504,8 @@ specific to Spark on Kubernetes.
<td>
Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when
requesting executors. This path must be accessible from the driver pod.
Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use
<code>spark.kubernetes.authenticate.clientCertFile</code> instead.
</td>
</tr>
<tr>
@@ -477,7 +514,8 @@ specific to Spark on Kubernetes.
<td>
Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server from the driver pod when
requesting executors. This path must be accessible from the driver pod.
Note that unlike the other authentication options, this file must contain the exact string value of the token to use for the authentication.
Note that unlike the other authentication options, this file must contain the exact string value of the token to use
for the authentication. In client mode, use <code>spark.kubernetes.authenticate.oauthTokenFile</code> instead.
</td>
</tr>
<tr>
@@ -486,7 +524,48 @@ specific to Spark on Kubernetes.
<td>
Service account that is used when running the driver pod. The driver pod uses this service account when requesting
executor pods from the API server. Note that this cannot be specified alongside a CA cert file, client key file,
client cert file, and/or OAuth token.
client cert file, and/or OAuth token. In client mode, use <code>spark.kubernetes.authenticate.serviceAccountName</code> instead.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.caCertFile</code></td>
<td>(none)</td>
<td>
In client mode, path to the CA cert file for connecting to the Kubernetes API server over TLS when
requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.clientKeyFile</code></td>
<td>(none)</td>
<td>
In client mode, path to the client key file for authenticating against the Kubernetes API server
when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.clientCertFile</code></td>
<td>(none)</td>
<td>
In client mode, path to the client cert file for authenticating against the Kubernetes API server
when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.oauthToken</code></td>
<td>(none)</td>
<td>
In client mode, the OAuth token to use when authenticating against the Kubernetes API server when
requesting executors. Note that unlike the other authentication options, this must be the exact string value of
the token to use for the authentication.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.oauthTokenFile</code></td>
<td>(none)</td>
<td>
In client mode, path to the file containing the OAuth token to use when authenticating against the Kubernetes API
server when requesting executors.
</td>
</tr>
<tr>
@@ -529,8 +608,11 @@ specific to Spark on Kubernetes.
<td><code>spark.kubernetes.driver.pod.name</code></td>
<td>(none)</td>
<td>
Name of the driver pod. If not set, the driver pod name is set to "spark.app.name" suffixed by the current timestamp
to avoid name conflicts.
Name of the driver pod. In cluster mode, if this is not set, the driver pod name is set to "spark.app.name"
suffixed by the current timestamp to avoid name conflicts. In client mode, if your application is running
inside a pod, it is highly recommended to set this to the name of the pod your driver is running in. Setting this
value in client mode allows the driver to become the owner of its executor pods, which in turn allows the executor
pods to be garbage collected by the cluster.
</td>
</tr>
<tr>
@@ -65,6 +65,7 @@ private[spark] object Config extends Logging {
"spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
@@ -90,7 +91,7 @@ private[spark] object Config extends Logging {
ConfigBuilder("spark.kubernetes.submitInDriver")
.internal()
.booleanConf
.createOptional
.createWithDefault(false)

val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
@@ -43,7 +43,7 @@ private[spark] case class KubernetesDriverSpecificConf(
*/
private[spark] case class KubernetesExecutorSpecificConf(
executorId: String,
driverPod: Pod)
driverPod: Option[Pod])
extends KubernetesRoleSpecificConf

/**
@@ -186,7 +186,7 @@ private[spark] object KubernetesConf {
sparkConf: SparkConf,
executorId: String,
appId: String,
driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = {
driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = {
val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
require(
@@ -58,4 +58,6 @@ private[spark] object KubernetesUtils {
case _ => uri
}
}

def parseMasterUrl(url: String): String = url.substring("k8s://".length)
}
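
A short usage sketch of the helper above, written against plain Scala so that it runs without Spark on the classpath. `parseMasterUrl` repeats the one-liner from this change; `parseMasterUrlSafe` is a hypothetical stricter variant that is not part of the PR, and the URL is a placeholder.

```scala
object MasterUrlExample {
  // Same one-liner as the helper added in this change: drop the "k8s://" prefix.
  def parseMasterUrl(url: String): String = url.substring("k8s://".length)

  // Hypothetical stricter variant: returns None when the prefix is missing.
  def parseMasterUrlSafe(url: String): Option[String] =
    if (url.startsWith("k8s://")) Some(url.stripPrefix("k8s://")) else None

  def main(args: Array[String]): Unit = {
    println(parseMasterUrl("k8s://https://example.invalid:6443")) // https://example.invalid:6443
    println(parseMasterUrlSafe("yarn"))                           // None
  }
}
```

The unchecked version relies on the master URL having been validated earlier, as the comment in `KubernetesClientApplication` notes. Given a string shorter than the prefix, `substring` would throw instead of returning a value.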
@@ -152,19 +152,20 @@ private[spark] class BasicExecutorFeatureStep(
.build()
}.getOrElse(executorContainer)
val driverPod = kubernetesConf.roleSpecificConf.driverPod
val ownerReference = driverPod.map(pod =>
new OwnerReferenceBuilder()
.withController(true)
.withApiVersion(pod.getApiVersion)
.withKind(pod.getKind)
.withName(pod.getMetadata.getName)
.withUid(pod.getMetadata.getUid)
.build())
val executorPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(name)
.withLabels(kubernetesConf.roleLabels.asJava)
.withAnnotations(kubernetesConf.roleAnnotations.asJava)
.withOwnerReferences()
.addNewOwnerReference()
.withController(true)
.withApiVersion(driverPod.getApiVersion)
.withKind(driverPod.getKind)
.withName(driverPod.getMetadata.getName)
.withUid(driverPod.getMetadata.getUid)
.endOwnerReference()
.addToOwnerReferences(ownerReference.toSeq: _*)
.endMetadata()
.editOrNewSpec()
.withHostname(hostname)
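
The optional owner reference in the hunk above relies on expanding an `Option` into varargs. Here is a self-contained sketch of that pattern, using simplified stand-in types (`PodInfo`, `OwnerRef`) and a stand-in `addToOwnerReferences` function rather than the real fabric8 builder classes. All of these names are hypothetical.

```scala
object OwnerReferenceSketch {
  // Simplified, hypothetical stand-ins for the Kubernetes model classes.
  final case class PodInfo(name: String, uid: String)
  final case class OwnerRef(name: String, uid: String, controller: Boolean)

  // Stand-in for a varargs builder method: collects whatever it is given.
  def addToOwnerReferences(refs: OwnerRef*): Seq[OwnerRef] = refs.toList

  // Zero references when there is no driver pod, exactly one otherwise.
  def ownerReferencesFor(driverPod: Option[PodInfo]): Seq[OwnerRef] = {
    val ownerReference = driverPod.map(pod => OwnerRef(pod.name, pod.uid, controller = true))
    addToOwnerReferences(ownerReference.toSeq: _*)
  }

  def main(args: Array[String]): Unit = {
    println(ownerReferencesFor(None))
    println(ownerReferencesFor(Some(PodInfo("driver", "uid-1"))))
  }
}
```

Mapping first and expanding with `toSeq: _*` keeps the builder chain unconditional, so the client mode case without a driver pod simply adds nothing.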
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
@@ -228,7 +228,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val namespace = kubernetesConf.namespace()
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
val master = sparkConf.get("spark.master").substring("k8s://".length)
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
@@ -46,13 +46,18 @@ private[spark] class ExecutorPodsAllocator(

private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000)

private val namespace = conf.get(KUBERNETES_NAMESPACE)

private val kubernetesDriverPodName = conf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(throw new SparkException("Must specify the driver pod name"))

private val driverPod = kubernetesClient.pods()
.withName(kubernetesDriverPodName)
.get()
private val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
.withName(name)
.get())
.getOrElse(throw new SparkException(
s"No pod was found named $name in the cluster in the " +
s"namespace $namespace (this was supposed to be the driver pod).")))

// Executor IDs that have been requested from Kubernetes but have not been detected in any
// snapshot yet. Mapped to the timestamp when they were created.
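
The driver pod lookup in this hunk combines two `Option` steps: the pod name may be unset, and the client may return `null` for a name that does not exist. Below is a minimal model of that logic. It assumes a `lookup` function standing in for the Kubernetes client call, a simplified `PodInfo` type, and `IllegalStateException` in place of `SparkException`; all three are hypothetical substitutions.

```scala
object DriverPodLookup {
  // Simplified, hypothetical stand-in for the Kubernetes Pod model.
  final case class PodInfo(name: String)

  // `lookup` mimics a client call that returns null when nothing matches.
  def findDriverPod(
      podName: Option[String],
      namespace: String,
      lookup: String => PodInfo): Option[PodInfo] =
    podName.map { name =>
      // Wrap the nullable result so that a missing pod fails fast with a clear message.
      Option(lookup(name)).getOrElse(
        throw new IllegalStateException(
          s"No pod was found named $name in the namespace $namespace."))
    }
}
```

With no pod name configured the result is `None` and no lookup happens. With a name that resolves, the result is `Some(pod)`. A configured name that does not resolve raises an error rather than silently running without an owner.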