 package org.apache.spark.deploy.kubernetes.submit

 import java.io.File
-import java.util.Collections
+import java.util.{Collections, UUID}

 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder, QuantityBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._

-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
@@ -43,22 +43,21 @@ import org.apache.spark.util.Utils
  * where different steps of submission should be factored out into separate classes.
  */
 private[spark] class Client(
-    appName: String,
-    kubernetesAppId: String,
-    mainClass: String,
-    sparkConf: SparkConf,
-    appArgs: Array[String],
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String],
-    waitForAppCompletion: Boolean,
-    kubernetesClient: KubernetesClient,
-    initContainerComponentsProvider: DriverInitContainerComponentsProvider,
-    kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
-    loggingPodStatusWatcher: LoggingPodStatusWatcher)
-  extends Logging {
-
+    appName: String,
+    kubernetesResourceNamePrefix: String,
+    kubernetesAppId: String,
+    mainClass: String,
+    sparkConf: SparkConf,
+    appArgs: Array[String],
+    sparkJars: Seq[String],
+    sparkFiles: Seq[String],
+    waitForAppCompletion: Boolean,
+    kubernetesClient: KubernetesClient,
+    initContainerComponentsProvider: DriverInitContainerComponentsProvider,
+    kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
+    loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
   private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(kubernetesAppId)
+    .getOrElse(s"$kubernetesResourceNamePrefix-driver")
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
   private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)

@@ -86,15 +85,16 @@ private[spark] class Client(
     val parsedCustomLabels = ConfigurationUtils.parseKeyValuePairs(
       customLabels, KUBERNETES_DRIVER_LABELS.key, "labels")
     require(!parsedCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
-      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
-    require(!parsedCustomLabels.contains(SPARK_APP_NAME_LABEL), s"Label with key " +
-      s"$SPARK_APP_NAME_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
+      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
+      s"operations.")
+    val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
+      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
+    require(!parsedCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION), s"Annotation with key " +
+      s"$SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for Spark bookkeeping " +
+      s"operations.")
     val allLabels = parsedCustomLabels ++ Map(
       SPARK_APP_ID_LABEL -> kubernetesAppId,
-      SPARK_APP_NAME_LABEL -> appName,
-      SPARK_ROLE_LABEL -> "driver")
-    val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
-      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)

     val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
       new EnvVarBuilder()
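The hunk above stops writing the application name into a pod label (SPARK_APP_NAME_LABEL is gone) and instead reserves SPARK_APP_NAME_ANNOTATION, because annotations are not subject to the strict label-value syntax. A minimal sketch of that constraint, with a hypothetical helper name; the pattern mirrors the validation Kubernetes applies to label values:

// Hypothetical helper illustrating why an arbitrary spark.app.name cannot be a
// label value: Kubernetes caps label values at 63 characters and restricts them
// to alphanumerics, '-', '_' and '.', beginning and ending with an alphanumeric.
object K8sLabelValues {
  private val LabelValuePattern = "(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?".r

  def isValidLabelValue(value: String): Boolean =
    value.length <= 63 && LabelValuePattern.pattern.matcher(value).matches()
}

K8sLabelValues.isValidLabelValue("My Spark App!") returns false (space and '!'), so the app name is carried in an annotation while labels hold only generated, label-safe IDs.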
@@ -140,6 +140,7 @@ private[spark] class Client(
         .withName(kubernetesDriverPodName)
         .addToLabels(allLabels.asJava)
         .addToAnnotations(parsedCustomAnnotations.asJava)
+        .addToAnnotations(SPARK_APP_NAME_ANNOTATION, appName)
         .endMetadata()
       .withNewSpec()
         .withRestartPolicy("Never")
@@ -186,6 +187,7 @@ private[spark] class Client(
     }
     resolvedSparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
     resolvedSparkConf.set("spark.app.id", kubernetesAppId)
+    resolvedSparkConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, kubernetesResourceNamePrefix)
     // We don't need this anymore since we just set the JVM options on the environment
     resolvedSparkConf.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
     val resolvedLocalClasspath = containerLocalizedFilesResolver
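The added line propagates the resource name prefix through the driver's resolved SparkConf so the executor-side scheduler backend can build matching pod names. A rough sketch of how that consumer side might look; the literal config key and the "-exec-$id" naming scheme are assumptions here, not taken from this diff:

import org.apache.spark.SparkConf

object ExecutorPodNaming {
  // Assumed literal key behind KUBERNETES_EXECUTOR_POD_NAME_PREFIX.
  private val PrefixKey = "spark.kubernetes.executor.podNamePrefix"

  // Derives a stable, human-readable executor pod name from the prefix the
  // driver stored in the resolved SparkConf.
  def executorPodName(conf: SparkConf, executorId: Int): String = {
    val prefix = conf.get(PrefixKey, "spark")
    s"$prefix-exec-$executorId" // e.g. pagerank-demo-1496424000000-exec-3
  }
}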
@@ -234,11 +236,11 @@ private[spark] class Client(
           throw e
       }
       if (waitForAppCompletion) {
-        logInfo(s"Waiting for application $kubernetesAppId to finish...")
+        logInfo(s"Waiting for application $appName to finish...")
         loggingPodStatusWatcher.awaitCompletion()
-        logInfo(s"Application $kubernetesAppId finished.")
+        logInfo(s"Application $appName finished.")
       } else {
-        logInfo(s"Deployed Spark application $kubernetesAppId into Kubernetes.")
+        logInfo(s"Deployed Spark application $appName into Kubernetes.")
       }
     }
   }
@@ -279,15 +281,21 @@ private[spark] object Client {
     val sparkFiles = sparkConf.getOption("spark.files")
       .map(_.split(","))
       .getOrElse(Array.empty[String])
-    val appName = sparkConf.getOption("spark.app.name")
-      .getOrElse("spark")
-    val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
+    // The resource name prefix is derived from the application name, making it easy to connect the
+    // names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
+    // application the user submitted. However, we can't use the application name in the label, as
+    // label values are considerably restrictive, e.g. must be no longer than 63 characters in
+    // length. So we generate a separate identifier for the app ID itself, and bookkeeping that
+    // requires finding "all pods for this application" should use the kubernetesAppId.
+    val kubernetesResourceNamePrefix = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
     val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
     val master = resolveK8sMaster(sparkConf.get("spark.master"))
     val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
     val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
       sparkConf,
-      kubernetesAppId,
+      kubernetesResourceNamePrefix,
       namespace,
       sparkJars,
       sparkFiles,
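As the new comment explains, submission now derives two identifiers: a readable resource name prefix for kubectl/dashboard discoverability, and a compact, label-safe app ID. A self-contained sketch of the derivation (the app name and printed values are illustrative):

import java.util.UUID

object IdDerivationSketch {
  def main(args: Array[String]): Unit = {
    val appName = "PageRank.Demo" // stand-in for spark.app.name
    val launchTime = System.currentTimeMillis()
    // Readable prefix used for resource names such as "<prefix>-driver".
    val resourceNamePrefix = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
    // Label-safe ID: "spark-" plus 32 hex characters is always 38 characters,
    // comfortably under the 63-character label-value limit.
    val appId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    println(resourceNamePrefix) // e.g. pagerank-demo-1496424000000
    println(appId)              // e.g. spark-3f9c0d2a4b... (38 chars)
  }
}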
@@ -300,14 +308,16 @@ private[spark] object Client {
       None,
       None)) { kubernetesClient =>
       val kubernetesCredentialsMounterProvider =
-        new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
+        new DriverPodKubernetesCredentialsMounterProviderImpl(
+          sparkConf, kubernetesResourceNamePrefix)
       val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
       val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL))
         .filter(_ => waitForAppCompletion)
       val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
-        kubernetesAppId, loggingInterval)
+        kubernetesResourceNamePrefix, loggingInterval)
       new Client(
         appName,
+        kubernetesResourceNamePrefix,
         kubernetesAppId,
         mainClass,
         sparkConf,