diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index 9c46d7494b187..f2f1136e54fe4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -90,6 +90,7 @@ package object constants {
   private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
 
   // Miscellaneous
+  private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"
   private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
   private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 6ab6480d848a2..85ce5f01200b2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -17,9 +17,12 @@
 package org.apache.spark.scheduler.cluster.kubernetes
 
 import java.io.Closeable
+import java.net.InetAddress
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
@@ -177,16 +180,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
 
   private val allocatorRunnable: Runnable = new Runnable {
+
     override def run(): Unit = {
       if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
         logDebug("Waiting for pending executors before scaling")
       } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
         logDebug("Maximum allowed executor limit reached. Not scaling up further.")
       } else {
+        val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
         RUNNING_EXECUTOR_PODS_LOCK.synchronized {
           for (i <- 0 until math.min(
             totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
-            runningExecutorPods += allocateNewExecutorPod()
+            runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount)
             logInfo(
               s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
           }
@@ -195,6 +200,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
+  private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
   private def getShuffleClient(): KubernetesExternalShuffleClient = {
     new KubernetesExternalShuffleClient(
       SparkTransportConf.fromSparkConf(conf, "shuffle"),
@@ -283,7 +290,70 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
-  private def allocateNewExecutorPod(): (String, Pod) = {
+  /**
+   * @return A map of K8s cluster nodes to the number of tasks that could benefit from data
+   *         locality if an executor launches on the cluster node.
+   */
+  private def getNodesWithLocalTaskCounts() : Map[String, Int] = {
+    val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
+      executorPodsByIPs.values.toList // toList makes a defensive copy.
+    }
+    val nodeToLocalTaskCount = mutable.Map[String, Int]() ++
+      KubernetesClusterSchedulerBackend.this.synchronized {
+        hostToLocalTaskCount
+      }
+    for (pod <- executorPodsWithIPs) {
+      // Remove cluster nodes that are running our executors already.
+      // TODO: This prefers spreading out executors across nodes. In case users want
+      // consolidating executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
+      // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html
+      nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty ||
+        nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty ||
+        nodeToLocalTaskCount.remove(
+          InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty
+    }
+    nodeToLocalTaskCount.toMap[String, Int]
+  }
+
+  private def addNodeAffinityAnnotationIfUseful(basePodBuilder: PodBuilder,
+      nodeToTaskCount: Map[String, Int]): PodBuilder = {
+    def scaleToRange(value: Int, baseMin: Double, baseMax: Double,
+        rangeMin: Double, rangeMax: Double): Int =
+      (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt
+
+    if (nodeToTaskCount.nonEmpty) {
+      val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2)
+      // Normalize to node affinity weights in 1 to 100 range.
+      val nodeToWeight = nodeToTaskCount.map{
+        case (node, taskCount) =>
+          (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))}
+      val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys)
+      // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
+      val nodeAffinityJson = objectMapper.writeValueAsString(SchedulerAffinity(NodeAffinity(
+        preferredDuringSchedulingIgnoredDuringExecution =
+          for ((weight, nodes) <- weightToNodes) yield
+            WeightedPreference(weight,
+              Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes))))
+      )))
+      // TODO: Use non-annotation syntax when we switch to K8s version 1.6.
+      logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson")
+      basePodBuilder.editMetadata()
+        .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson)
+        .endMetadata()
+    } else {
+      basePodBuilder
+    }
+  }
+
+  /**
+   * Allocates a new executor pod
+   *
+   * @param nodeToLocalTaskCount  A map of K8s cluster nodes to the number of tasks that could
+   *                              benefit from data locality if an executor launches on the cluster
+   *                              node.
+   * @return A tuple of the new executor name and the Pod data structure.
+   */
+  private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = {
     val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
     val name = s"$executorPodNamePrefix-exec-$executorId"
 
@@ -393,14 +463,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
             .endSpec()
         }
       }.getOrElse(basePodBuilder)
-    val resolvedExecutorPod = executorInitContainerBootstrap.map { bootstrap =>
-      bootstrap.bootstrapInitContainerAndVolumes(
-        "executor",
-        withMaybeShuffleConfigPodBuilder)
-    }.getOrElse(withMaybeShuffleConfigPodBuilder)
+
+    val executorInitContainerPodBuilder = executorInitContainerBootstrap.map {
+      bootstrap =>
+        bootstrap.bootstrapInitContainerAndVolumes(
+          "executor",
+          withMaybeShuffleConfigPodBuilder)
+    }.getOrElse(withMaybeShuffleConfigPodBuilder)
+
+    val resolvedExecutorPodBuilder = addNodeAffinityAnnotationIfUseful(
+      executorInitContainerPodBuilder, nodeToLocalTaskCount)
 
     try {
-      (executorId, kubernetesClient.pods.create(resolvedExecutorPod.build()))
+      (executorId, kubernetesClient.pods.create(resolvedExecutorPodBuilder.build()))
     } catch {
       case throwable: Throwable =>
         logError("Failed to allocate executor pod.", throwable)
@@ -521,3 +596,15 @@ private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
 }
+
+/**
+ * These case classes model K8s node affinity syntax for
+ * preferredDuringSchedulingIgnoredDuringExecution.
+ * @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
+ */
+case class SchedulerAffinity(nodeAffinity: NodeAffinity)
+case class NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution:
+  Iterable[WeightedPreference])
+case class WeightedPreference(weight: Int, preference: Preference)
+case class Preference(matchExpressions: Array[MatchExpression])
+case class MatchExpression(key: String, operator: String, values: Iterable[String])