Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{CONFIG, EXECUTOR_ENV_REGEX}
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* Structure containing metadata for Kubernetes logic to build Spark pods.
Expand Down Expand Up @@ -83,12 +84,27 @@ private[spark] class KubernetesDriverConf(
val mainAppResource: MainAppResource,
val mainClass: String,
val appArgs: Array[String],
val proxyUser: Option[String])
extends KubernetesConf(sparkConf) {
val proxyUser: Option[String],
clock: Clock = new SystemClock())
extends KubernetesConf(sparkConf) with Logging {

def driverNodeSelector: Map[String, String] =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX)

lazy val driverServiceName: String = {
val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
val randomServiceId = KubernetesUtils.uniqueID(clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
s"$shorterServiceName as the driver service's name.")
shorterServiceName
}
}

override val resourceNamePrefix: String = {
val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None
custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}

import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH, KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{config, Logging}
Expand All @@ -39,17 +39,7 @@ private[spark] class DriverServiceFeatureStep(
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
"managed via a Kubernetes service.")

private val preferredServiceName = s"${kubernetesConf.resourceNamePrefix}$DRIVER_SVC_POSTFIX"
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
s"$shorterServiceName as the driver service's name.")
shorterServiceName
}
private val resolvedServiceName = kubernetesConf.driverServiceName
private val ipFamilyPolicy =
kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY)
private val ipFamilies =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.fabric8.kubernetes.api.model.Pod
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
import org.apache.spark.util.{Clock, SystemClock}

/**
* Builder methods for KubernetesConf that allow easy control over what to return for a few
Expand Down Expand Up @@ -52,7 +53,8 @@ object KubernetesTestConf {
secretEnvNamesToKeyRefs: Map[String, String] = Map.empty,
secretNamesToMountPaths: Map[String, String] = Map.empty,
volumes: Seq[KubernetesVolumeSpec] = Seq.empty,
proxyUser: Option[String] = None): KubernetesDriverConf = {
proxyUser: Option[String] = None,
clock: Clock = new SystemClock()): KubernetesDriverConf = {
val conf = sparkConf.clone()

resourceNamePrefix.foreach { prefix =>
Expand All @@ -67,7 +69,7 @@ object KubernetesTestConf {
setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs)
setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes)

new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, proxyUser)
new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, proxyUser, clock)
}
// scalastyle:on argcount

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,17 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
}

test("Long prefixes should switch to using a generated unique name.") {
val clock = new ManualClock()
val sparkConf = new SparkConf(false)
.set(KUBERNETES_NAMESPACE, "my-namespace")
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
labels = DRIVER_LABELS)
val clock = new ManualClock()

// Ensure that multiple services created at the same time generate unique names.
val services = (1 to 10).map { _ =>
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
labels = DRIVER_LABELS,
clock = clock)
val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock)
val serviceName = configurationStep
.getAdditionalKubernetesResources()
Expand All @@ -130,11 +131,11 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
val hostAddress = configurationStep
.getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key)

(serviceName -> hostAddress)
}.toMap
Tuple3(kconf, serviceName, hostAddress)
}

assert(services.size === 10)
services.foreach { case (name, address) =>
services.foreach { case (kconf, name, address) =>
assert(!name.startsWith(kconf.resourceNamePrefix))
assert(!address.startsWith(kconf.resourceNamePrefix))
assert(InternetDomainName.isValid(address))
Expand Down