diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index e25e189aa6d74..e256535fbbc9d 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -140,12 +140,12 @@ Spark supports using SSL to encrypt the traffic in this bootstrapping process. I
whenever possible.
See the [security page](security.html) and [configuration](configuration.html) sections for more information on
-configuring SSL; use the prefix `spark.ssl.kubernetes.driverlaunch` in configuring the SSL-related fields in the context
+configuring SSL; use the prefix `spark.ssl.kubernetes.submit` in configuring the SSL-related fields in the context
of submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver
-pod in starting the application, set `spark.ssl.kubernetes.driverlaunch.trustStore`.
+pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`.
One note about the keyStore is that it can be specified as either a file on the client machine or a file in the
-container image's disk. Thus `spark.ssl.kubernetes.driverlaunch.keyStore` can be a URI with a scheme of either `file:`
+container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:`
or `container:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto
the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme
`container:`, the file is assumed to already be on the container's disk at the appropriate path.
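For illustration, a minimal sketch of the renamed properties on a `SparkConf`; the paths and values here are hypothetical, not taken from this patch:

```scala
import org.apache.spark.SparkConf

// Hypothetical paths; this only illustrates the spark.ssl.kubernetes.submit.* prefix.
val conf = new SparkConf()
  .set("spark.ssl.kubernetes.submit.enabled", "true")
  // file: scheme, keyStore lives on the client machine and is mounted
  // into the driver pod as a Kubernetes secret volume.
  .set("spark.ssl.kubernetes.submit.keyStore", "file:///opt/spark/ssl/submit-keystore.jks")
  // container: scheme, keyStore is already on the container image's disk:
  // .set("spark.ssl.kubernetes.submit.keyStore", "container:///var/ssl/submit-keystore.jks")
  .set("spark.ssl.kubernetes.submit.trustStore", "file:///opt/spark/ssl/truststore.jks")
```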
@@ -235,7 +235,15 @@ from the other deployment modes. See the [configuration page](configuration.html
  <td>(none)</td>
  <td>
    Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value pairs,
-    where each label is in the format key=value.
+    where each label is in the format key=value. Note that Spark also adds its own labels to the driver pod
+    for bookkeeping purposes.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driverSubmitTimeout</code></td>
+  <td>60s</td>
+  <td>
+    Time to wait for the driver pod to start running before aborting its execution.
  </td>
</tr>
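A matching sketch for the two settings documented in this table, with example values only (Spark adds its own bookkeeping labels on top of any custom ones):

```scala
import org.apache.spark.SparkConf

// Example values only; label keys/values are illustrative.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.labels", "team=data-eng,env=staging")
  .set("spark.kubernetes.driverSubmitTimeout", "120s")
```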
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 07a45c7577bcd..fed9334dbbab4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -18,13 +18,13 @@ package org.apache.spark.deploy.kubernetes
import java.io.{File, FileInputStream}
import java.security.{KeyStore, SecureRandom}
-import java.util.concurrent.{Executors, TimeoutException, TimeUnit}
+import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
import com.google.common.base.Charsets
import com.google.common.io.Files
-import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder}
+import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
@@ -34,11 +34,13 @@ import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
-import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions}
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class Client(
sparkConf: SparkConf,
@@ -47,25 +49,21 @@ private[spark] class Client(
appArgs: Array[String]) extends Logging {
import Client._
- private val namespace = sparkConf.get("spark.kubernetes.namespace", "default")
+ private val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
private val master = resolveK8sMaster(sparkConf.get("spark.master"))
private val launchTime = System.currentTimeMillis
private val appName = sparkConf.getOption("spark.app.name")
- .orElse(sparkConf.getOption("spark.app.id"))
.getOrElse("spark")
private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
- private val secretName = s"spark-submission-server-secret-$kubernetesAppId"
- private val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId"
- private val sslSecretsDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId-ssl"
- private val sslSecretsName = s"spark-submission-server-ssl-$kubernetesAppId"
- private val driverLauncherSelectorValue = s"driver-launcher-$launchTime"
- private val driverDockerImage = sparkConf.get(
- "spark.kubernetes.driver.docker.image", s"spark-driver:$sparkVersion")
- private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars")
+ private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId"
+ private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId"
+ private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
+ private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
+ private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
+ private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS)
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
- private val driverLaunchTimeoutSecs = sparkConf.getTimeAsSeconds(
- "spark.kubernetes.driverLaunchTimeout", s"${DEFAULT_LAUNCH_TIMEOUT_SECONDS}s")
+ private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
private val secretBase64String = {
val secretBytes = new Array[Byte](128)
@@ -73,32 +71,27 @@ private[spark] class Client(
Base64.encodeBase64String(secretBytes)
}
- private val serviceAccount = sparkConf.get("spark.kubernetes.submit.serviceAccountName",
- "default")
-
- private val customLabels = sparkConf.get("spark.kubernetes.driver.labels", "")
+ private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+ private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)
private implicit val retryableExecutionContext = ExecutionContext
.fromExecutorService(
- Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("kubernetes-client-retryable-futures-%d")
- .setDaemon(true)
- .build()))
+ ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-client-retryable-futures"))
def run(): Unit = {
- val (driverLaunchSslOptions, isKeyStoreLocalFile) = parseDriverLaunchSslOptions()
+ val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
val parsedCustomLabels = parseCustomLabels(customLabels)
var k8ConfBuilder = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
.withNamespace(namespace)
- sparkConf.getOption("spark.kubernetes.submit.caCertFile").foreach {
+ sparkConf.get(KUBERNETES_CA_CERT_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f)
}
- sparkConf.getOption("spark.kubernetes.submit.clientKeyFile").foreach {
+ sparkConf.get(KUBERNETES_CLIENT_KEY_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f)
}
- sparkConf.getOption("spark.kubernetes.submit.clientCertFile").foreach {
+ sparkConf.get(KUBERNETES_CLIENT_CERT_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f)
}
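The executor-service change in this hunk swaps a hand-rolled Guava `ThreadFactoryBuilder` for Spark's `ThreadUtils` helper, which constructs the same daemon single-thread executor. A minimal sketch of the replacement pattern (note that `ThreadUtils` is `private[spark]`, so this only compiles inside Spark's own packages; the package name below is hypothetical):

```scala
package org.apache.spark.deploy.kubernetes.sketch

import scala.concurrent.ExecutionContext

import org.apache.spark.util.ThreadUtils

object RetryContext {
  // Same effect as Executors.newSingleThreadExecutor with a daemon
  // ThreadFactory named "kubernetes-client-retryable-futures-%d".
  implicit val retryableExecutionContext: ExecutionContext =
    ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-client-retryable-futures"))
}
```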
@@ -108,15 +101,16 @@ private[spark] class Client(
.withNewMetadata()
.withName(secretName)
.endMetadata()
- .withData(Map((SUBMISSION_SERVER_SECRET_NAME, secretBase64String)).asJava)
+ .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava)
.withType("Opaque")
.done()
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient,
- driverLaunchSslOptions,
+ driverSubmitSslOptions,
isKeyStoreLocalFile)
try {
val driverKubernetesSelectors = (Map(
- DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue,
+ SPARK_DRIVER_LABEL -> kubernetesAppId,
+ SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_APP_NAME_LABEL -> appName)
++ parsedCustomLabels).asJava
val containerPorts = buildContainerPorts()
@@ -126,7 +120,7 @@ private[spark] class Client(
submitCompletedFuture,
submitPending,
kubernetesClient,
- driverLaunchSslOptions,
+ driverSubmitSslOptions,
Array(submitServerSecret) ++ sslSecrets,
driverKubernetesSelectors)
Utils.tryWithResource(kubernetesClient
@@ -141,7 +135,7 @@ private[spark] class Client(
.withNewSpec()
.withRestartPolicy("OnFailure")
.addNewVolume()
- .withName(s"spark-submission-secret-volume")
+ .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
.withNewSecret()
.withSecretName(submitServerSecret.getMetadata.getName)
.endSecret()
@@ -149,22 +143,22 @@ private[spark] class Client(
.addToVolumes(sslVolumes: _*)
.withServiceAccount(serviceAccount)
.addNewContainer()
- .withName(DRIVER_LAUNCHER_CONTAINER_NAME)
+ .withName(DRIVER_CONTAINER_NAME)
.withImage(driverDockerImage)
.withImagePullPolicy("IfNotPresent")
.addNewVolumeMount()
- .withName("spark-submission-secret-volume")
+ .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
.withMountPath(secretDirectory)
.withReadOnly(true)
.endVolumeMount()
.addToVolumeMounts(sslVolumeMounts: _*)
.addNewEnv()
- .withName("SPARK_SUBMISSION_SECRET_LOCATION")
- .withValue(s"$secretDirectory/$SUBMISSION_SERVER_SECRET_NAME")
+ .withName(ENV_SUBMISSION_SECRET_LOCATION)
+ .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
.endEnv()
.addNewEnv()
- .withName("SPARK_DRIVER_LAUNCHER_SERVER_PORT")
- .withValue(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT.toString)
+ .withName(ENV_SUBMISSION_SERVER_PORT)
+ .withValue(SUBMISSION_SERVER_PORT.toString)
.endEnv()
.addToEnv(sslEnvs: _*)
.withPorts(containerPorts.asJava)
@@ -173,7 +167,7 @@ private[spark] class Client(
.done()
var submitSucceeded = false
try {
- submitCompletedFuture.get(driverLaunchTimeoutSecs, TimeUnit.SECONDS)
+ submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
submitSucceeded = true
} catch {
case e: TimeoutException =>
@@ -199,8 +193,8 @@ private[spark] class Client(
}
}
- private def parseDriverLaunchSslOptions(): (SSLOptions, Boolean) = {
- val maybeKeyStore = sparkConf.getOption("spark.ssl.kubernetes.driverlaunch.keyStore")
+ private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = {
+ val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE)
val resolvedSparkConf = sparkConf.clone()
val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => {
val keyStoreURI = Utils.resolveURI(keyStore)
@@ -214,30 +208,29 @@ private[spark] class Client(
(isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath))
}).getOrElse((true, Option.empty[String]))
resolvedKeyStore.foreach {
- resolvedSparkConf.set("spark.ssl.kubernetes.driverlaunch.keyStore", _)
+ resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _)
}
- sparkConf.getOption("spark.ssl.kubernetes.driverlaunch.trustStore").foreach { trustStore =>
+ sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore =>
val trustStoreURI = Utils.resolveURI(trustStore)
trustStoreURI.getScheme match {
case "file" | null =>
- resolvedSparkConf.set("spark.ssl.kubernetes.driverlaunch.trustStore",
- trustStoreURI.getPath)
+ resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, trustStoreURI.getPath)
case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" +
" for submit server must have no scheme, or scheme file://")
}
}
val securityManager = new SecurityManager(resolvedSparkConf)
- (securityManager.getSSLOptions("kubernetes.driverlaunch"), isLocalKeyStore)
+ (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
}
- private def configureSsl(kubernetesClient: KubernetesClient, driverLaunchSslOptions: SSLOptions,
+ private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions,
isKeyStoreLocalFile: Boolean):
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
- if (driverLaunchSslOptions.enabled) {
+ if (driverSubmitSslOptions.enabled) {
val sslSecretsMap = mutable.HashMap[String, String]()
val sslEnvs = mutable.Buffer[EnvVar]()
val secrets = mutable.Buffer[Secret]()
- driverLaunchSslOptions.keyStore.foreach(store => {
+ driverSubmitSslOptions.keyStore.foreach(store => {
val resolvedKeyStoreFile = if (isKeyStoreLocalFile) {
if (!store.isFile) {
throw new SparkException(s"KeyStore specified at $store is not a file or" +
@@ -245,40 +238,40 @@ private[spark] class Client(
}
val keyStoreBytes = Files.toByteArray(store)
val keyStoreBase64 = Base64.encodeBase64String(keyStoreBytes)
- sslSecretsMap += (SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64)
- s"$sslSecretsDirectory/$SSL_KEYSTORE_SECRET_NAME"
+ sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64)
+ s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME"
} else {
store.getAbsolutePath
}
sslEnvs += new EnvVarBuilder()
- .withName("SPARK_SUBMISSION_KEYSTORE_FILE")
+ .withName(ENV_SUBMISSION_KEYSTORE_FILE)
.withValue(resolvedKeyStoreFile)
.build()
})
- driverLaunchSslOptions.keyStorePassword.foreach(password => {
+ driverSubmitSslOptions.keyStorePassword.foreach(password => {
val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
- sslSecretsMap += (SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64)
+ sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64)
sslEnvs += new EnvVarBuilder()
- .withName("SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE")
- .withValue(s"$sslSecretsDirectory/$SSL_KEYSTORE_PASSWORD_SECRET_NAME")
+ .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
+ .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
.build()
})
- driverLaunchSslOptions.keyPassword.foreach(password => {
+ driverSubmitSslOptions.keyPassword.foreach(password => {
val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
- sslSecretsMap += (SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64)
+ sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64)
sslEnvs += new EnvVarBuilder()
- .withName("SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE")
- .withValue(s"$sslSecretsDirectory/$SSL_KEY_PASSWORD_SECRET_NAME")
+ .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
+ .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
.build()
})
- driverLaunchSslOptions.keyStoreType.foreach(storeType => {
+ driverSubmitSslOptions.keyStoreType.foreach(storeType => {
sslEnvs += new EnvVarBuilder()
- .withName("SPARK_SUBMISSION_KEYSTORE_TYPE")
+ .withName(ENV_SUBMISSION_KEYSTORE_TYPE)
.withValue(storeType)
.build()
})
sslEnvs += new EnvVarBuilder()
- .withName("SPARK_SUBMISSION_USE_SSL")
+ .withName(ENV_SUBMISSION_USE_SSL)
.withValue("true")
.build()
val sslSecrets = kubernetesClient.secrets().createNew()
@@ -290,13 +283,13 @@ private[spark] class Client(
.done()
secrets += sslSecrets
val sslVolume = new VolumeBuilder()
- .withName("spark-submission-server-ssl-secrets")
+ .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
.withNewSecret()
.withSecretName(sslSecrets.getMetadata.getName)
.endSecret()
.build()
val sslVolumeMount = new VolumeMountBuilder()
- .withName("spark-submission-server-ssl-secrets")
+ .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
.withReadOnly(true)
.withMountPath(sslSecretsDirectory)
.build()
@@ -310,7 +303,7 @@ private[spark] class Client(
submitCompletedFuture: SettableFuture[Boolean],
submitPending: AtomicBoolean,
kubernetesClient: KubernetesClient,
- driverLaunchSslOptions: SSLOptions,
+ driverSubmitSslOptions: SSLOptions,
applicationSecrets: Array[Secret],
driverKubernetesSelectors: java.util.Map[String, String]) extends Watcher[Pod] {
override def eventReceived(action: Action, pod: Pod): Unit = {
@@ -322,7 +315,7 @@ private[spark] class Client(
.getContainerStatuses
.asScala
.find(status =>
- status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match {
+ status.getName == DRIVER_CONTAINER_NAME && status.getReady) match {
case Some(_) =>
val ownerRefs = Seq(new OwnerReferenceBuilder()
.withName(pod.getMetadata.getName)
@@ -337,10 +330,10 @@ private[spark] class Client(
kubernetesClient.secrets().createOrReplace(secret)
})
- val driverLauncherServicePort = new ServicePortBuilder()
- .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME)
- .withPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT)
- .withNewTargetPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT)
+ val driverSubmissionServicePort = new ServicePortBuilder()
+ .withName(SUBMISSION_SERVER_PORT_NAME)
+ .withPort(SUBMISSION_SERVER_PORT)
+ .withNewTargetPort(SUBMISSION_SERVER_PORT)
.build()
val service = kubernetesClient.services().createNew()
.withNewMetadata()
@@ -351,20 +344,25 @@ private[spark] class Client(
.withNewSpec()
.withType("NodePort")
.withSelector(driverKubernetesSelectors)
- .withPorts(driverLauncherServicePort)
+ .withPorts(driverSubmissionServicePort)
.endSpec()
.done()
try {
- sparkConf.set("spark.kubernetes.driver.service.name",
- service.getMetadata.getName)
- sparkConf.set("spark.kubernetes.driver.pod.name", kubernetesAppId)
+ sparkConf.getOption("spark.app.id").foreach { id =>
+ logWarning(s"Warning: Provided app id in spark.app.id as $id will be" +
+ s" overridden as $kubernetesAppId")
+ }
+ sparkConf.set(KUBERNETES_DRIVER_POD_NAME, kubernetesAppId)
+ sparkConf.set(KUBERNETES_DRIVER_SERVICE_NAME, service.getMetadata.getName)
+ sparkConf.set("spark.app.id", kubernetesAppId)
+ sparkConf.setIfMissing("spark.app.name", appName)
sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
sparkConf.setIfMissing("spark.blockmanager.port",
DEFAULT_BLOCKMANAGER_PORT.toString)
- val driverLauncher = buildDriverLauncherClient(kubernetesClient, service,
- driverLaunchSslOptions)
+ val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, service,
+ driverSubmitSslOptions)
val ping = Retry.retry(5, 5.seconds) {
- driverLauncher.ping()
+ driverSubmitter.ping()
}
ping onFailure {
case t: Throwable =>
@@ -375,7 +373,7 @@ private[spark] class Client(
Future {
sparkConf.set("spark.driver.host", pod.getStatus.getPodIP)
val submitRequest = buildSubmissionRequest()
- driverLauncher.create(submitRequest)
+ driverSubmitter.submitApplication(submitRequest)
}
}
submitComplete onFailure {
@@ -436,17 +434,17 @@ private[spark] class Client(
kubernetesClient.pods().withName(kubernetesAppId).get()
} catch {
case throwable: Throwable =>
- logError(s"Timed out while waiting $driverLaunchTimeoutSecs seconds for the" +
+ logError(s"Timed out while waiting $driverSubmitTimeoutSecs seconds for the" +
" driver pod to start, but an error occurred while fetching the driver" +
" pod's details.", throwable)
- throw new SparkException(s"Timed out while waiting $driverLaunchTimeoutSecs" +
+ throw new SparkException(s"Timed out while waiting $driverSubmitTimeoutSecs" +
" seconds for the driver pod to start. Unfortunately, in attempting to fetch" +
" the latest state of the pod, another error was thrown. Check the logs for" +
" the error that was thrown in looking up the driver pod.", e)
}
val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" +
- s" $driverLaunchTimeoutSecs seconds."
+ s" $driverSubmitTimeoutSecs seconds."
val podStatusPhase = if (driverPod.getStatus.getPhase != null) {
s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}"
} else {
@@ -460,7 +458,7 @@ private[spark] class Client(
val failedDriverContainerStatusString = driverPod.getStatus
.getContainerStatuses
.asScala
- .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME)
+ .find(_.getName == DRIVER_CONTAINER_NAME)
.map(status => {
val lastState = status.getState
if (lastState.getRunning != null) {
@@ -481,17 +479,21 @@ private[spark] class Client(
"Driver container last state: Unknown"
}
}).getOrElse("The driver container wasn't found in the pod; expected to find" +
- s" container with name $DRIVER_LAUNCHER_CONTAINER_NAME")
+ s" container with name $DRIVER_CONTAINER_NAME")
s"$topLevelMessage\n" +
s"$podStatusPhase\n" +
s"$podStatusMessage\n\n$failedDriverContainerStatusString"
}
private def buildContainerPorts(): Seq[ContainerPort] = {
- Seq(sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
- sparkConf.getInt("spark.blockManager.port", DEFAULT_BLOCKMANAGER_PORT),
- DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT,
- uiPort).map(new ContainerPortBuilder().withContainerPort(_).build())
+ Seq((DRIVER_PORT_NAME, sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)),
+ (BLOCK_MANAGER_PORT_NAME,
+ sparkConf.getInt("spark.blockManager.port", DEFAULT_BLOCKMANAGER_PORT)),
+ (SUBMISSION_SERVER_PORT_NAME, SUBMISSION_SERVER_PORT),
+ (UI_PORT_NAME, uiPort)).map(port => new ContainerPortBuilder()
+ .withName(port._1)
+ .withContainerPort(port._2)
+ .build())
}
private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = {
@@ -526,22 +528,22 @@ private[spark] class Client(
.map(CompressionUtils.createTarGzip(_))
}
- private def buildDriverLauncherClient(
+ private def buildDriverSubmissionClient(
kubernetesClient: KubernetesClient,
service: Service,
- driverLaunchSslOptions: SSLOptions): KubernetesSparkRestApi = {
+ driverSubmitSslOptions: SSLOptions): KubernetesSparkRestApi = {
val servicePort = service
.getSpec
.getPorts
.asScala
- .filter(_.getName == DRIVER_LAUNCHER_SERVICE_PORT_NAME)
+ .filter(_.getName == SUBMISSION_SERVER_PORT_NAME)
.head
.getNodePort
// NodePort is exposed on every node, so just pick one of them.
// TODO be resilient to node failures and try all of them
val node = kubernetesClient.nodes.list.getItems.asScala.head
val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress
- val urlScheme = if (driverLaunchSslOptions.enabled) {
+ val urlScheme = if (driverSubmitSslOptions.enabled) {
"https"
} else {
logWarning("Submitting application details, application secret, and local" +
@@ -550,8 +552,8 @@ private[spark] class Client(
"http"
}
val (trustManager, sslContext): (X509TrustManager, SSLContext) =
- if (driverLaunchSslOptions.enabled) {
- buildSslConnectionConfiguration(driverLaunchSslOptions)
+ if (driverSubmitSslOptions.enabled) {
+ buildSslConnectionConfiguration(driverSubmitSslOptions)
} else {
(null, SSLContext.getDefault)
}
@@ -562,18 +564,18 @@ private[spark] class Client(
trustContext = trustManager)
}
- private def buildSslConnectionConfiguration(driverLaunchSslOptions: SSLOptions) = {
- driverLaunchSslOptions.trustStore.map(trustStoreFile => {
+ private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions) = {
+ driverSubmitSslOptions.trustStore.map(trustStoreFile => {
val trustManagerFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm)
val trustStore = KeyStore.getInstance(
- driverLaunchSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
+ driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
if (!trustStoreFile.isFile) {
throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
s" does not exist or is not a file.")
}
Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
- driverLaunchSslOptions.trustStorePassword match {
+ driverSubmitSslOptions.trustStorePassword match {
case Some(password) =>
trustStore.load(trustStoreStream, password.toCharArray)
case None => trustStore.load(trustStoreStream, null)
@@ -587,44 +589,29 @@ private[spark] class Client(
}).getOrElse((null, SSLContext.getDefault))
}
- private def parseCustomLabels(labels: String): Map[String, String] = {
- labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
- label.split("=", 2).toSeq match {
- case Seq(k, v) =>
- require(k != DRIVER_LAUNCHER_SELECTOR_LABEL, "Label with key" +
- s" $DRIVER_LAUNCHER_SELECTOR_LABEL cannot be used in" +
- " spark.kubernetes.driver.labels, as it is reserved for Spark's" +
- " internal configuration.")
- (k, v)
- case _ =>
- throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" +
- " must be a comma-separated list of key-value pairs, with format =." +
- s" Got label: $label. All labels: $labels")
- }
- }).toMap
+ private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = {
+ maybeLabels.map(labels => {
+ labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
+ label.split("=", 2).toSeq match {
+ case Seq(k, v) =>
+ require(k != SPARK_APP_ID_LABEL, "Label with key" +
+ s" $SPARK_APP_ID_LABEL cannot be used in" +
+ " spark.kubernetes.driver.labels, as it is reserved for Spark's" +
+ " internal configuration.")
+ (k, v)
+ case _ =>
+ throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" +
+ " must be a comma-separated list of key-value pairs, with format =." +
+ s" Got label: $label. All labels: $labels")
+ }
+ }).toMap
+ }).getOrElse(Map.empty[String, String])
}
}
private[spark] object Client extends Logging {
- private val SUBMISSION_SERVER_SECRET_NAME = "spark-submission-server-secret"
- private val SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore"
- private val SSL_KEYSTORE_PASSWORD_SECRET_NAME = "spark-submission-server-keystore-password"
- private val SSL_KEY_PASSWORD_SECRET_NAME = "spark-submission-server-key-password"
- private val DRIVER_LAUNCHER_SELECTOR_LABEL = "driver-launcher-selector"
- private val DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT = 7077
- private val DEFAULT_DRIVER_PORT = 7078
- private val DEFAULT_BLOCKMANAGER_PORT = 7079
- private val DEFAULT_UI_PORT = 4040
- private val UI_PORT_NAME = "spark-ui-port"
- private val DRIVER_LAUNCHER_SERVICE_PORT_NAME = "driver-launcher-port"
- private val DRIVER_PORT_NAME = "driver-port"
- private val BLOCKMANAGER_PORT_NAME = "block-manager-port"
- private val DRIVER_LAUNCHER_CONTAINER_NAME = "spark-kubernetes-driver-launcher"
- private val SECURE_RANDOM = new SecureRandom()
- private val SPARK_SUBMISSION_SECRET_BASE_DIR = "/var/run/secrets/spark-submission"
- private val DEFAULT_LAUNCH_TIMEOUT_SECONDS = 60
- private val SPARK_APP_NAME_LABEL = "spark-app-name"
+ private[spark] val SECURE_RANDOM = new SecureRandom()
def main(args: Array[String]): Unit = {
require(args.length >= 2, s"Too few arguments. Usage: ${getClass.getName} " +
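For reference, the label-parsing behavior introduced in `parseCustomLabels` above, condensed into a standalone sketch (simplified: the reserved-key check is reduced to the hard-coded string "spark-app-id", mirroring `SPARK_APP_ID_LABEL`, and `SparkException` is replaced with a plain `IllegalArgumentException`):

```scala
// Simplified stand-in for Client.parseCustomLabels.
def parseLabels(maybeLabels: Option[String]): Map[String, String] =
  maybeLabels.map { labels =>
    labels.split(",").map(_.trim).filterNot(_.isEmpty).map { label =>
      label.split("=", 2).toSeq match {
        case Seq(k, v) =>
          require(k != "spark-app-id", "Label key spark-app-id is reserved by Spark.")
          (k, v)
        case _ =>
          throw new IllegalArgumentException(s"Malformed label: $label")
      }
    }.toMap
  }.getOrElse(Map.empty)

// parseLabels(Some("label1=label1value, label2=label2value"))
//   == Map("label1" -> "label1value", "label2" -> "label2value")
// parseLabels(None) == Map.empty
```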
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
new file mode 100644
index 0000000000000..9b145370f87d6
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.config.ConfigBuilder
+
+package object config {
+
+ private[spark] val KUBERNETES_NAMESPACE =
+ ConfigBuilder("spark.kubernetes.namespace")
+ .doc("""
+ | The namespace that will be used for running the driver and
+ | executor pods. When using spark-submit in cluster mode,
+ | this can also be passed to spark-submit via the
+ | --kubernetes-namespace command line argument.
+ """.stripMargin)
+ .stringConf
+ .createWithDefault("default")
+
+ private[spark] val DRIVER_DOCKER_IMAGE =
+ ConfigBuilder("spark.kubernetes.driver.docker.image")
+ .doc("""
+ | Docker image to use for the driver. Specify this using the
+ | standard Docker tag format.
+ """.stripMargin)
+ .stringConf
+ .createWithDefault(s"spark-driver:$sparkVersion")
+
+ private[spark] val EXECUTOR_DOCKER_IMAGE =
+ ConfigBuilder("spark.kubernetes.executor.docker.image")
+ .doc("""
+ | Docker image to use for the executors. Specify this using
+ | the standard Docker tag format.
+ """.stripMargin)
+ .stringConf
+ .createWithDefault(s"spark-executor:$sparkVersion")
+
+ private[spark] val KUBERNETES_CA_CERT_FILE =
+ ConfigBuilder("spark.kubernetes.submit.caCertFile")
+ .doc("""
+ | CA cert file for connecting to Kubernetes over SSL. This
+ | file should be located on the submitting machine's disk.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_CLIENT_KEY_FILE =
+ ConfigBuilder("spark.kubernetes.submit.clientKeyFile")
+ .doc("""
+ | Client key file for authenticating against the Kubernetes
+ | API server. This file should be located on the submitting
+ | machine's disk.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_CLIENT_CERT_FILE =
+ ConfigBuilder("spark.kubernetes.submit.clientCertFile")
+ .doc("""
+ | Client cert file for authenticating against the
+ | Kubernetes API server. This file should be located on
+ | the submitting machine's disk.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ ConfigBuilder("spark.kubernetes.submit.serviceAccountName")
+ .doc("""
+ | Service account that is used when running the driver pod.
+ | The driver pod uses this service account when requesting
+ | executor pods from the API server.
+ """.stripMargin)
+ .stringConf
+ .createWithDefault("default")
+
+ private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS =
+ ConfigBuilder("spark.kubernetes.driver.uploads.jars")
+ .doc("""
+ | Comma-separated list of jars to send to the driver and
+ | all executors when submitting the application in cluster
+ | mode.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ // Note that while we set a default for this when we start up the
+ // scheduler, the specific default value is dynamically determined
+ // based on the executor memory.
+ private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
+ ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
+ .doc("""
+ | The amount of off-heap memory (in megabytes) to be
+ | allocated per executor. This is memory that accounts for
+ | things like VM overheads, interned strings, other native
+ | overheads, etc. This tends to grow with the executor size
+ | (typically 6-10%).
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_LABELS =
+ ConfigBuilder("spark.kubernetes.driver.labels")
+ .doc("""
+ | Custom labels that will be added to the driver pod.
+ | This should be a comma-separated list of label key-value
+ | pairs, where each label is in the format key=value. Note
+ | that Spark also adds its own labels to the driver pod
+ | for bookkeeping purposes.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
+ ConfigBuilder("spark.kubernetes.driverSubmitTimeout")
+ .doc("""
+ | Time to wait for the driver pod to start running
+ | before aborting its execution.
+ """.stripMargin)
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefault(60L)
+
+ private[spark] val KUBERNETES_DRIVER_SUBMIT_KEYSTORE =
+ ConfigBuilder("spark.ssl.kubernetes.submit.keyStore")
+ .doc("""
+ | KeyStore file for the driver submission server listening
+ | over SSL. It can be pre-mounted on the driver container
+ | or uploaded from the submitting client.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE =
+ ConfigBuilder("spark.ssl.kubernetes.submit.trustStore")
+ .doc("""
+ | TrustStore containing certificates for communicating
+ | with the driver submission server over SSL.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_SERVICE_NAME =
+ ConfigBuilder("spark.kubernetes.driver.service.name")
+ .doc("""
+ | Kubernetes service that exposes the driver pod
+ | for external access.
+ """.stripMargin)
+ .internal()
+ .stringConf
+ .createOptional
+
+ private[spark] val KUBERNETES_DRIVER_POD_NAME =
+ ConfigBuilder("spark.kubernetes.driver.pod.name")
+ .doc("""
+ | Name of the driver pod.
+ """.stripMargin)
+ .internal()
+ .stringConf
+ .createOptional
+}
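These typed `ConfigEntry` definitions replace raw string lookups: the default value and optionality live on the entry itself, so call sites read them with `sparkConf.get(...)`. That overload of `get` is `private[spark]`, so this brief usage sketch assumes Spark-internal code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

val conf = new SparkConf()
// The default comes from the entry itself, not from the call site.
val namespace: String = conf.get(KUBERNETES_NAMESPACE)                      // "default"
// createOptional entries come back as Option, replacing getOption.
val uploadedJars: Option[String] = conf.get(KUBERNETES_DRIVER_UPLOAD_JARS)  // None
// timeConf entries are parsed into the declared unit (seconds here).
val timeoutSecs: Long = conf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)          // 60L
```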
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
new file mode 100644
index 0000000000000..027cc3c022b4e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+package object constants {
+ // Labels
+ private[spark] val SPARK_DRIVER_LABEL = "spark-driver"
+ private[spark] val SPARK_APP_ID_LABEL = "spark-app-id"
+ private[spark] val SPARK_APP_NAME_LABEL = "spark-app-name"
+ private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
+
+ // Secrets
+ private[spark] val DRIVER_CONTAINER_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission"
+ private[spark] val SUBMISSION_APP_SECRET_NAME = "spark-submission-server-secret"
+ private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret"
+ private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume"
+ private[spark] val SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME =
+ "spark-submission-server-key-password"
+ private[spark] val SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME =
+ "spark-submission-server-keystore-password"
+ private[spark] val SUBMISSION_SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore"
+ private[spark] val SUBMISSION_SSL_SECRETS_PREFIX = "spark-submission-server-ssl"
+ private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"
+
+ // Default and fixed ports
+ private[spark] val SUBMISSION_SERVER_PORT = 7077
+ private[spark] val DEFAULT_DRIVER_PORT = 7078
+ private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079
+ private[spark] val DEFAULT_UI_PORT = 4040
+ private[spark] val UI_PORT_NAME = "spark-ui-port"
+ private[spark] val SUBMISSION_SERVER_PORT_NAME = "submit-server"
+ private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager"
+ private[spark] val DRIVER_PORT_NAME = "driver"
+ private[spark] val EXECUTOR_PORT_NAME = "executor"
+
+ // Environment Variables
+ private[spark] val ENV_SUBMISSION_SECRET_LOCATION = "SPARK_SUBMISSION_SECRET_LOCATION"
+ private[spark] val ENV_SUBMISSION_SERVER_PORT = "SPARK_SUBMISSION_SERVER_PORT"
+ private[spark] val ENV_SUBMISSION_KEYSTORE_FILE = "SPARK_SUBMISSION_KEYSTORE_FILE"
+ private[spark] val ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE =
+ "SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"
+ private[spark] val ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE =
+ "SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"
+ private[spark] val ENV_SUBMISSION_KEYSTORE_TYPE = "SPARK_SUBMISSION_KEYSTORE_TYPE"
+ private[spark] val ENV_SUBMISSION_USE_SSL = "SPARK_SUBMISSION_USE_SSL"
+ private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
+ private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
+ private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
+ private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
+ private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
+ private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+
+ // Miscellaneous
+ private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
+ private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
index 813d070e0f876..8beba23bc8e11 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
@@ -20,23 +20,22 @@ import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
import org.apache.spark.SPARK_VERSION
-// TODO: jars should probably be compressed. Shipping tarballs would be optimal.
case class KubernetesCreateSubmissionRequest(
- val appResource: AppResource,
- val mainClass: String,
- val appArgs: Array[String],
- val sparkProperties: Map[String, String],
- val secret: String,
- val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
+ appResource: AppResource,
+ mainClass: String,
+ appArgs: Array[String],
+ sparkProperties: Map[String, String],
+ secret: String,
+ uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
message = "create"
clientSparkVersion = SPARK_VERSION
}
case class TarGzippedData(
- val dataBase64: String,
- val blockSize: Int = 10240,
- val recordSize: Int = 512,
- val encoding: String
+ dataBase64: String,
+ blockSize: Int = 10240,
+ recordSize: Int = 512,
+ encoding: String
)
@JsonTypeInfo(
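Aside from the removed TODO, the only change to these message classes is dropping the redundant `val` modifiers: parameters of a case class's primary constructor are already public stable vals. A one-line illustration:

```scala
case class WithVal(val name: String) // `val` is redundant
case class WithoutVal(name: String)  // generates the identical public accessor

// Both expose the same stable getter:
// WithVal("a").name == WithoutVal("a").name
```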
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala
index 3cbcb16293b1d..18eb9b7a12ca6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala
@@ -28,12 +28,11 @@ trait KubernetesSparkRestApi {
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/create")
- def create(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
+ def submitApplication(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
@GET
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/ping")
def ping(): PingResponse
-
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index dae4b2714b4e4..550ddd113fa42 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -21,17 +21,18 @@ import java.util.concurrent.Executors
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import com.google.common.util.concurrent.ThreadFactoryBuilder
-import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, EnvVar, EnvVarBuilder, Pod, QuantityBuilder}
+import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder}
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder}
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -44,24 +45,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
private val kubernetesMaster = Client.resolveK8sMaster(sc.master)
-
- private val executorDockerImage = conf
- .get("spark.kubernetes.executor.docker.image", s"spark-executor:${sc.version}")
-
- private val kubernetesNamespace = conf.get("spark.kubernetes.namespace", "default")
-
+ private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
+ private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
-
private val blockmanagerPort = conf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
private val kubernetesDriverServiceName = conf
- .getOption("spark.kubernetes.driver.service.name")
+ .get(KUBERNETES_DRIVER_SERVICE_NAME)
.getOrElse(
throw new SparkException("Must specify the service name the driver is running with"))
private val kubernetesDriverPodName = conf
- .getOption("spark.kubernetes.driver.pod.name")
+ .get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(
throw new SparkException("Must specify the driver pod name"))
@@ -69,7 +65,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
private val memoryOverheadBytes = conf
- .getOption("spark.kubernetes.executor.memoryOverhead")
+ .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
.map(overhead => Utils.byteStringAsBytes(overhead))
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt,
MEMORY_OVERHEAD_MIN))
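The default computed here is the larger of 10% of executor memory and a 384 floor (the same factor and minimum convention the YARN backend uses), so small executors get the floor while larger ones scale linearly. A quick worked sketch, expressed in MiB purely for readability (an assumption for illustration; the constants mirror MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN):

```scala
// Sketch of the default-overhead rule in MiB (illustrative units).
def defaultOverheadMiB(executorMemoryMiB: Long): Long =
  math.max((0.10 * executorMemoryMiB).toLong, 384L)

// defaultOverheadMiB(1024) == 384  (10% is 102, so the floor wins)
// defaultOverheadMiB(8192) == 819  (10% of 8 GiB exceeds the floor)
```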
@@ -78,16 +74,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")
private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("kubernetes-executor-requests-%d")
- .build))
+ ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
private val kubernetesClient = KubernetesClientBuilder
.buildFromWithinPod(kubernetesMaster, kubernetesNamespace)
- val driverPod = try {
+ private val driverPod = try {
kubernetesClient.pods().inNamespace(kubernetesNamespace).
withName(kubernetesDriverPodName).get()
} catch {
@@ -127,6 +119,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
}
+ override def applicationId(): String = conf.get("spark.app.id", super.applicationId())
+
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
}
@@ -163,9 +157,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
private def allocateNewExecutorPod(): (String, Pod) = {
val executorKubernetesId = UUID.randomUUID().toString.replaceAll("-", "")
val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
- val name = s"$kubernetesDriverServiceName-exec-$executorKubernetesId"
- val selectors = Map(SPARK_EXECUTOR_SELECTOR -> executorId,
- SPARK_APP_SELECTOR -> applicationId()).asJava
+ val name = s"${applicationId()}-exec-$executorKubernetesId"
+ val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
+ SPARK_APP_ID_LABEL -> applicationId()).asJava
val executorMemoryQuantity = new QuantityBuilder(false)
.withAmount(executorMemoryBytes.toString)
.build()
@@ -175,69 +169,61 @@ private[spark] class KubernetesClusterSchedulerBackend(
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCores)
.build()
- val requiredEnv = new ArrayBuffer[EnvVar]
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_EXECUTOR_PORT")
- .withValue(executorPort.toString)
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_DRIVER_URL")
- .withValue(driverUrl)
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_EXECUTOR_CORES")
- .withValue(executorCores)
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_EXECUTOR_MEMORY")
- .withValue(executorMemory)
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_APPLICATION_ID")
- .withValue(applicationId())
- .build()
- requiredEnv += new EnvVarBuilder()
- .withName("SPARK_EXECUTOR_ID")
- .withValue(executorId)
- .build()
- val requiredPorts = new ArrayBuffer[ContainerPort]
- requiredPorts += new ContainerPortBuilder()
- .withName(EXECUTOR_PORT_NAME)
- .withContainerPort(executorPort)
- .build()
- requiredPorts += new ContainerPortBuilder()
- .withName(BLOCK_MANAGER_PORT_NAME)
- .withContainerPort(blockmanagerPort)
- .build()
- (executorKubernetesId, kubernetesClient.pods().createNew()
- .withNewMetadata()
- .withName(name)
- .withLabels(selectors)
- .withOwnerReferences()
- .addNewOwnerReference()
- .withController(true)
- .withApiVersion(driverPod.getApiVersion)
- .withKind(driverPod.getKind)
- .withName(driverPod.getMetadata.getName)
- .withUid(driverPod.getMetadata.getUid)
- .endOwnerReference()
- .endMetadata()
- .withNewSpec()
- .addNewContainer()
- .withName(s"exec-${applicationId()}-container")
- .withImage(executorDockerImage)
- .withImagePullPolicy("IfNotPresent")
- .withNewResources()
- .addToRequests("memory", executorMemoryQuantity)
- .addToLimits("memory", executorMemoryLimitQuantity)
- .addToRequests("cpu", executorCpuQuantity)
- .addToLimits("cpu", executorCpuQuantity)
- .endResources()
- .withEnv(requiredEnv.asJava)
- .withPorts(requiredPorts.asJava)
- .endContainer()
- .endSpec()
- .done())
+ val requiredEnv = Seq(
+ (ENV_EXECUTOR_PORT, executorPort.toString),
+ (ENV_DRIVER_URL, driverUrl),
+ (ENV_EXECUTOR_CORES, executorCores),
+ (ENV_EXECUTOR_MEMORY, executorMemory),
+ (ENV_APPLICATION_ID, applicationId()),
+ (ENV_EXECUTOR_ID, executorId)
+ ).map(env => new EnvVarBuilder()
+ .withName(env._1)
+ .withValue(env._2)
+ .build())
+ val requiredPorts = Seq(
+ (EXECUTOR_PORT_NAME, executorPort),
+ (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
+ .map(port => {
+ new ContainerPortBuilder()
+ .withName(port._1)
+ .withContainerPort(port._2)
+ .build()
+ })
+ try {
+ (executorKubernetesId, kubernetesClient.pods().createNew()
+ .withNewMetadata()
+ .withName(name)
+ .withLabels(selectors)
+ .withOwnerReferences()
+ .addNewOwnerReference()
+ .withController(true)
+ .withApiVersion(driverPod.getApiVersion)
+ .withKind(driverPod.getKind)
+ .withName(driverPod.getMetadata.getName)
+ .withUid(driverPod.getMetadata.getUid)
+ .endOwnerReference()
+ .endMetadata()
+ .withNewSpec()
+ .addNewContainer()
+ .withName(s"executor")
+ .withImage(executorDockerImage)
+ .withImagePullPolicy("IfNotPresent")
+ .withNewResources()
+ .addToRequests("memory", executorMemoryQuantity)
+ .addToLimits("memory", executorMemoryLimitQuantity)
+ .addToRequests("cpu", executorCpuQuantity)
+ .addToLimits("cpu", executorCpuQuantity)
+ .endResources()
+ .withEnv(requiredEnv.asJava)
+ .withPorts(requiredPorts.asJava)
+ .endContainer()
+ .endSpec()
+ .done())
+ } catch {
+ case throwable: Throwable =>
+ logError("Failed to allocate executor pod.", throwable)
+ throw throwable
+ }
}
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
@@ -269,13 +255,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
private object KubernetesClusterSchedulerBackend {
- private val SPARK_EXECUTOR_SELECTOR = "spark-exec"
- private val SPARK_APP_SELECTOR = "spark-app"
private val DEFAULT_STATIC_PORT = 10000
- private val DEFAULT_BLOCKMANAGER_PORT = 7079
- private val DEFAULT_DRIVER_PORT = 7078
- private val BLOCK_MANAGER_PORT_NAME = "blockmanager"
- private val EXECUTOR_PORT_NAME = "executor"
private val MEMORY_OVERHEAD_FACTOR = 0.10
private val MEMORY_OVERHEAD_MIN = 384L
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
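The requiredEnv/requiredPorts rewrite above trades mutable ArrayBuffers for declarative Seqs of pairs mapped through the fabric8 builders. A condensed sketch of the idiom (the names and values are illustrative):

```scala
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

// Describe the data as pairs, then map each pair into a builder call.
val env: Seq[EnvVar] = Seq(
  ("SPARK_EXECUTOR_ID", "1"),
  ("SPARK_EXECUTOR_CORES", "2")
).map { case (name, value) =>
  new EnvVarBuilder().withName(name).withValue(value).build()
}
```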
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
index 070008fce7410..92fdfb8ac5f41 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
@@ -27,6 +27,6 @@ CMD SSL_ARGS="" && \
if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \
exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \
--hostname $HOSTNAME \
- --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT \
+ --port $SPARK_SUBMISSION_SERVER_PORT \
--secret-file $SPARK_SUBMISSION_SECRET_LOCATION \
${SSL_ARGS}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 13edea02dce9a..16de71118dec4 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -172,7 +172,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.set("spark.executor.memory", "500m")
.set("spark.executor.cores", "1")
.set("spark.executors.instances", "1")
- .set("spark.app.id", "spark-pi")
+ .set("spark.app.name", "spark-pi")
.set("spark.ui.enabled", "true")
.set("spark.testing", "false")
val mainAppResource = s"file://$EXAMPLES_JAR"
@@ -298,11 +298,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.getLabels
// We can't match all of the selectors directly since one of the selectors is based on the
// launch time.
- assert(driverPodLabels.size == 4, "Unexpected number of pod labels.")
- assert(driverPodLabels.containsKey("driver-launcher-selector"), "Expected driver launcher" +
- " selector label to be present.")
+ assert(driverPodLabels.size == 5, "Unexpected number of pod labels.")
assert(driverPodLabels.get("spark-app-name") == "spark-pi", "Unexpected value for" +
" spark-app-name label.")
+ assert(driverPodLabels.get("spark-app-id").startsWith("spark-pi"), "Unexpected value for" +
+ " spark-app-id label (should be prefixed with the app name).")
assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
}
@@ -323,12 +323,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
"--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
"--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
"--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
- "--conf", "spark.ssl.kubernetes.driverlaunch.enabled=true",
- "--conf", "spark.ssl.kubernetes.driverlaunch.keyStore=" +
+ "--conf", "spark.ssl.kubernetes.submit.enabled=true",
+ "--conf", "spark.ssl.kubernetes.submit.keyStore=" +
s"file://${keyStoreFile.getAbsolutePath}",
- "--conf", "spark.ssl.kubernetes.driverlaunch.keyStorePassword=changeit",
- "--conf", "spark.ssl.kubernetes.driverlaunch.keyPassword=changeit",
- "--conf", "spark.ssl.kubernetes.driverlaunch.trustStore=" +
+ "--conf", "spark.ssl.kubernetes.submit.keyStorePassword=changeit",
+ "--conf", "spark.ssl.kubernetes.submit.keyPassword=changeit",
+ "--conf", "spark.ssl.kubernetes.submit.trustStore=" +
s"file://${trustStoreFile.getAbsolutePath}",
"--conf", s"spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit",
EXAMPLES_JAR)