Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ package org.apache.spark.deploy.k8s
import java.util.Locale

import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep.{DRIVER_SVC_POSTFIX, MAX_SERVICE_NAME_LENGTH}
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.Utils
import org.apache.spark.util.{SystemClock, Utils}


/**
* Structure containing metadata for Kubernetes logic to build Spark pods.
Expand Down Expand Up @@ -168,7 +172,7 @@ private[spark] class KubernetesExecutorConf(

}

private[spark] object KubernetesConf {
private[spark] object KubernetesConf extends Logging{
def createDriverConf(
sparkConf: SparkConf,
appId: String,
Expand Down Expand Up @@ -199,4 +203,35 @@ private[spark] object KubernetesConf {
.replaceAll("[^a-z0-9\\-]", "")
.replaceAll("-+", "-")
}

def getStandardPodName(driverPodName: String): String = {
KubernetesResourceUtil.sanitizeName(driverPodName).replaceAll("^-", "")
}

def getStandardSecretName(secretName: String): String = {
KubernetesResourceUtil.sanitizeName(secretName).replaceAll("^-", "")
}

def getStandardServiceName(serviceName: String): String = {
val preferredServiceName = KubernetesResourceUtil.sanitizeName(serviceName).replaceAll("^-", "")

val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH
&& Character.isLetter(preferredServiceName.charAt(0))) {
preferredServiceName
} else {
val randomServiceId = KubernetesUtils.uniqueID(new SystemClock())
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters)" +
s"or the first character of $preferredServiceName is not letter which is not support. " +
s"Falling back to use $shorterServiceName as the driver service's name.")
shorterServiceName
}
resolvedServiceName
}

def getStandardConfigMapName(configMapName: String): String = {
KubernetesResourceUtil.sanitizeName(configMapName).replaceAll("^-", "")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import org.apache.spark.util.Utils
private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
extends KubernetesFeatureConfigStep {

private val driverPodName = conf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(s"${conf.resourceNamePrefix}-driver")
private val driverPodName = KubernetesConf.getStandardPodName(conf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(s"${conf.resourceNamePrefix}-driver"))

private val driverContainerImage = conf
.get(DRIVER_CONTAINER_IMAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ private[spark] class BasicExecutorFeatureStep(
private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)

override def configurePod(pod: SparkPod): SparkPod = {
val name = s"$executorPodNamePrefix-exec-${kubernetesConf.executorId}"
val name = KubernetesConf.getStandardPodName(
s"$executorPodNamePrefix-exec-${kubernetesConf.executorId}")

// hostname must be no longer than 63 characters, so take the last 63 characters of the pod
// name as the hostname. This preserves uniqueness since the end of name contains
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube
clientKeyDataBase64.isDefined ||
clientCertDataBase64.isDefined

private val driverCredentialsSecretName =
s"${kubernetesConf.resourceNamePrefix}-kubernetes-credentials"
private val driverCredentialsSecretName = KubernetesConf.getStandardSecretName(
s"${kubernetesConf.resourceNamePrefix}-kubernetes-credentials")


override def configurePod(pod: SparkPod): SparkPod = {
if (!shouldMountSecret) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}

import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.{Logging, config}
import org.apache.spark.util.{Clock, SystemClock}

private[spark] class DriverServiceFeatureStep(
Expand All @@ -39,16 +37,7 @@ private[spark] class DriverServiceFeatureStep(
"managed via a Kubernetes service.")

private val preferredServiceName = s"${kubernetesConf.resourceNamePrefix}$DRIVER_SVC_POSTFIX"
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
s"$shorterServiceName as the driver service's name.")
shorterServiceName
}
private val resolvedServiceName = KubernetesConf.getStandardServiceName(preferredServiceName)

private val driverPort = kubernetesConf.sparkConf.getInt(
config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
}
}

private def newConfigMapName: String = s"${conf.resourceNamePrefix}-hadoop-config"
private def newConfigMapName: String = KubernetesConf.getStandardConfigMapName(
s"${conf.resourceNamePrefix}-hadoop-config")

private def hasHadoopConf: Boolean = confDir.isDefined || existingConfMap.isDefined

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ import java.io.File
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

import com.google.common.io.Files
import io.fabric8.kubernetes.api.model._
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
Expand Down Expand Up @@ -108,13 +106,16 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri

private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_))

private def dtSecretName: String = s"${kubernetesConf.resourceNamePrefix}-delegation-tokens"
private def dtSecretName: String = KubernetesConf.getStandardSecretName(
s"${kubernetesConf.resourceNamePrefix}-delegation-tokens")

private def ktSecretName: String = s"${kubernetesConf.resourceNamePrefix}-kerberos-keytab"
private def ktSecretName: String = KubernetesConf.getStandardSecretName(
s"${kubernetesConf.resourceNamePrefix}-kerberos-keytab")

private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined

private def newConfigMapName: String = s"${kubernetesConf.resourceNamePrefix}-krb5-file"
private def newConfigMapName: String = KubernetesConf.getStandardConfigMapName(
s"${kubernetesConf.resourceNamePrefix}-krb5-file")

override def configurePod(original: SparkPod): SparkPod = {
original.transform { case pod if hasKerberosConf =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ private[spark] class Client(

def run(): Unit = {
val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
val configMapName = KubernetesConf.getStandardConfigMapName(
s"${conf.resourceNamePrefix}-driver-conf-map")
val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
// The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
// Spark command builder to pickup on the Java Options present in the ConfigMap
Expand Down