From 343e67780b1550aced368b4eb65b6146de519761 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 30 May 2017 10:50:48 -0700 Subject: [PATCH 1/8] Use node affinity to launch executors on data local nodes --- .../spark/deploy/kubernetes/constants.scala | 1 + .../KubernetesClusterSchedulerBackend.scala | 79 ++++++++++++++++--- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 950c1f6efe4e8..a0749fc42a1d3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -87,6 +87,7 @@ package object constants { private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret" // Miscellaneous + private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity" private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 5627f7c20de3d..0f760799bc830 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -17,16 +17,18 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable +import java.net.InetAddress import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.io.FilenameUtils -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkPodInitContainerBootstrap} @@ -154,10 +156,24 @@ private[spark] class KubernetesClusterSchedulerBackend( } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) { logDebug("Maximum allowed executor limit reached. 
Not scaling up further.") } else { + val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized { + executorPodsByIPs.values.toList + } + val nodeToLocalTaskCount = mutable.Map[String, Int]() ++ + KubernetesClusterSchedulerBackend.this.synchronized { + hostToLocalTaskCount + } + for (pod <- executorPodsWithIPs) { + nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty || + nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty || + nodeToLocalTaskCount.remove( + InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty + } + val preferredNodes = nodeToLocalTaskCount.toMap[String, Int] RUNNING_EXECUTOR_PODS_LOCK.synchronized { for (i <- 0 until math.min( totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) { - runningExecutorPods += allocateNewExecutorPod() + runningExecutorPods += allocateNewExecutorPod(preferredNodes) logInfo( s"Requesting a new executor, total executors is now ${runningExecutorPods.size}") } @@ -241,7 +257,13 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - private def allocateNewExecutorPod(): (String, Pod) = { + /** + * Allocates a new executor pod + * @param nodeToTaskCount A map of K8s cluster nodes to the number of tasks that could benefit + * from data locality if an executor launches on the cluster node. + * @return A tuple of the new executor name and the Pod data structure. + */ + private def allocateNewExecutorPod(nodeToTaskCount: Map[String, Int]): (String, Pod) = { val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString val name = s"${applicationId()}-exec-$executorId" @@ -348,14 +370,49 @@ private[spark] class KubernetesClusterSchedulerBackend( .endSpec() } }.getOrElse(basePodBuilder) - val resolvedExecutorPod = executorInitContainerBootstrap.map { bootstrap => - bootstrap.bootstrapInitContainerAndVolumes( - "executor", - withMaybeShuffleConfigPodBuilder) - }.getOrElse(withMaybeShuffleConfigPodBuilder) + val executorInitContainerPodBuilder = executorInitContainerBootstrap.map { + bootstrap => + bootstrap.bootstrapInitContainerAndVolumes( + "executor", + withMaybeShuffleConfigPodBuilder) + }.getOrElse(withMaybeShuffleConfigPodBuilder) + + val resolvedExecutorPodBuilder = if (nodeToTaskCount.nonEmpty) { + val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2) + // Normalize to node affinity weights in 1 to 100 range. + val nodeToWeight = nodeToTaskCount.map{ + case (node, taskCount) => + (node, math.min(100, math.max(1, math.round(taskCount * 100.0 / taskTotal))))} + val weightToNodes = nodeToWeight.values.map( + weight => (weight, nodeToWeight.keys.filter(nodeToWeight(_) == weight))).toMap + // TODO: Use non-annotation syntax when we switch to K8s version 1.6. 
+ val nodeAffinity = s"""{ + |"nodeAffinity": { + |"preferredDuringSchedulingIgnoredDuringExecution": [ + ${(for ((weight, nodes) <- weightToNodes) yield + s"""{"weight": $weight, + |"preference": { + |"matchExpressions": [{ + |"key": "kubernetes.io/hostname", + |"operator": "In", + |"values": ${nodes.mkString("[\"", "\",\"", "\"]")} + |}] + |} + |}""" + ).mkString(",")} + |] + |} + |}""".stripMargin.replaceAll("\n", " ") + logDebug(s"Adding nodeAffinity as annotation $nodeAffinity") + executorInitContainerPodBuilder.editMetadata() + .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinity) + .endMetadata() + } else { + executorInitContainerPodBuilder + } try { - (executorId, kubernetesClient.pods.create(resolvedExecutorPod.build())) + (executorId, kubernetesClient.pods.create(resolvedExecutorPodBuilder.build())) } catch { case throwable: Throwable => logError("Failed to allocate executor pod.", throwable) From beb378623742fe259f15ff6df504e3a1c2b4059c Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 30 May 2017 15:10:28 -0700 Subject: [PATCH 2/8] Fix comment style --- .../kubernetes/KubernetesClusterSchedulerBackend.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 0f760799bc830..b950a2ea9d45d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -258,11 +258,11 @@ private[spark] class KubernetesClusterSchedulerBackend( } /** - * Allocates a new executor pod - * @param nodeToTaskCount A map of K8s cluster nodes to the number of tasks that could benefit - * from data locality if an executor launches on the cluster node. - * @return A tuple of the new executor name and the Pod data structure. - */ + * Allocates a new executor pod + * @param nodeToTaskCount A map of K8s cluster nodes to the number of tasks that could benefit + * from data locality if an executor launches on the cluster node. + * @return A tuple of the new executor name and the Pod data structure. 
+ */ private def allocateNewExecutorPod(nodeToTaskCount: Map[String, Int]): (String, Pod) = { val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString val name = s"${applicationId()}-exec-$executorId" From c22fce0b4a991c0c93e99da353ad402794aadc2c Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 1 Jun 2017 09:31:43 -0700 Subject: [PATCH 3/8] Use JSON object mapper --- .../KubernetesClusterSchedulerBackend.scala | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index b950a2ea9d45d..b41bcb72dbb72 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -21,14 +21,15 @@ import java.net.InetAddress import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.concurrent.{ExecutionContext, Future} - +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.io.FilenameUtils +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkPodInitContainerBootstrap} @@ -164,6 +165,7 @@ private[spark] class KubernetesClusterSchedulerBackend( hostToLocalTaskCount } for (pod <- executorPodsWithIPs) { + // Remove cluster nodes that are running our executors already. nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty || nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty || nodeToLocalTaskCount.remove( @@ -182,6 +184,8 @@ private[spark] class KubernetesClusterSchedulerBackend( } } + private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule) + private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) @@ -382,30 +386,21 @@ private[spark] class KubernetesClusterSchedulerBackend( // Normalize to node affinity weights in 1 to 100 range. 
val nodeToWeight = nodeToTaskCount.map{ case (node, taskCount) => - (node, math.min(100, math.max(1, math.round(taskCount * 100.0 / taskTotal))))} + (node, math.min(100, math.max(1, math.round(taskCount * 100.0 / taskTotal).toInt)))} val weightToNodes = nodeToWeight.values.map( weight => (weight, nodeToWeight.keys.filter(nodeToWeight(_) == weight))).toMap + val nodeAffinity = NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution = + for ((weight, nodes) <- weightToNodes) yield + WeightedPreference(weight, + Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes)))) + ) + val nodeAffinityJson = + s"""{"nodeAffinity": ${objectMapper.writeValueAsString(nodeAffinity)}}""" + // TODO: Use non-annotation syntax when we switch to K8s version 1.6. - val nodeAffinity = s"""{ - |"nodeAffinity": { - |"preferredDuringSchedulingIgnoredDuringExecution": [ - ${(for ((weight, nodes) <- weightToNodes) yield - s"""{"weight": $weight, - |"preference": { - |"matchExpressions": [{ - |"key": "kubernetes.io/hostname", - |"operator": "In", - |"values": ${nodes.mkString("[\"", "\",\"", "\"]")} - |}] - |} - |}""" - ).mkString(",")} - |] - |} - |}""".stripMargin.replaceAll("\n", " ") - logDebug(s"Adding nodeAffinity as annotation $nodeAffinity") + logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson") executorInitContainerPodBuilder.editMetadata() - .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinity) + .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson) .endMetadata() } else { executorInitContainerPodBuilder @@ -526,3 +521,9 @@ private object KubernetesClusterSchedulerBackend { private val DEFAULT_STATIC_PORT = 10000 private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) } + +case class NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution: + Iterable[WeightedPreference]) +case class WeightedPreference(weight: Int, preference: Preference) +case class Preference(matchExpressions: Array[MatchExpression]) +case class MatchExpression(key: String, operator: String, values: Iterable[String]) From 74755c5048535542fd31d446cbee9c4edaf482ad Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 5 Jun 2017 11:32:25 -0700 Subject: [PATCH 4/8] Address review comments --- .../KubernetesClusterSchedulerBackend.scala | 101 +++++++++++------- 1 file changed, 60 insertions(+), 41 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 5f49ec8eb8cd8..f3b5fd1e1e3ff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -179,31 +179,18 @@ private[spark] class KubernetesClusterSchedulerBackend( .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") private val allocatorRunnable: Runnable = new Runnable { + override def run(): Unit = { if (totalRegisteredExecutors.get() < runningExecutorPods.size) { logDebug("Waiting for pending executors before scaling") } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) { logDebug("Maximum allowed executor limit reached. 
Not scaling up further.") } else { - val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized { - executorPodsByIPs.values.toList - } - val nodeToLocalTaskCount = mutable.Map[String, Int]() ++ - KubernetesClusterSchedulerBackend.this.synchronized { - hostToLocalTaskCount - } - for (pod <- executorPodsWithIPs) { - // Remove cluster nodes that are running our executors already. - nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty || - nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty || - nodeToLocalTaskCount.remove( - InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty - } - val preferredNodes = nodeToLocalTaskCount.toMap[String, Int] + val nodeToTaskCount = getCurrentNodesWithTaskCounts RUNNING_EXECUTOR_PODS_LOCK.synchronized { for (i <- 0 until math.min( totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) { - runningExecutorPods += allocateNewExecutorPod(preferredNodes) + runningExecutorPods += allocateNewExecutorPod(nodeToTaskCount) logInfo( s"Requesting a new executor, total executors is now ${runningExecutorPods.size}") } @@ -299,9 +286,59 @@ private[spark] class KubernetesClusterSchedulerBackend( } } + private def getCurrentNodesWithTaskCounts() : Map[String, Int] = { + val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized { + executorPodsByIPs.values.toList // toList makes a defensive copy. + } + val nodeToLocalTaskCount = mutable.Map[String, Int]() ++ + KubernetesClusterSchedulerBackend.this.synchronized { + hostToLocalTaskCount + } + for (pod <- executorPodsWithIPs) { + // Remove cluster nodes that are running our executors already. + nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty || + nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty || + nodeToLocalTaskCount.remove( + InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty + } + nodeToLocalTaskCount.toMap[String, Int] + } + + def addNodeAffinityAnnotationIfUseful(basePodBuilder: PodBuilder, + nodeToTaskCount: Map[String, Int]): PodBuilder = { + def scaleToRange(value: Int, baseMin: Double, baseMax: Double, + rangeMin: Double, rangeMax: Double): Int = + (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt + + if (nodeToTaskCount.nonEmpty) { + val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2) + // Normalize to node affinity weights in 1 to 100 range. + val nodeToWeight = nodeToTaskCount.map{ + case (node, taskCount) => + (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))} + val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys) + // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node + val nodeAffinity = NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution = + for ((weight, nodes) <- weightToNodes) yield + WeightedPreference(weight, + Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes)))) + ) + val nodeAffinityJson = + s"""{"nodeAffinity": ${objectMapper.writeValueAsString(nodeAffinity)}}""" + // TODO: Use non-annotation syntax when we switch to K8s version 1.6. 
+ logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson") + basePodBuilder.editMetadata() + .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson) + .endMetadata() + } else { + basePodBuilder + } + } + /** * Allocates a new executor pod - * @param nodeToTaskCount A map of K8s cluster nodes to the number of tasks that could benefit + * + * @param nodeToTaskCount A map of K8s cluster nodes to the number of tasks that could benefit * from data locality if an executor launches on the cluster node. * @return A tuple of the new executor name and the Pod data structure. */ @@ -415,6 +452,7 @@ private[spark] class KubernetesClusterSchedulerBackend( .endSpec() } }.getOrElse(basePodBuilder) + val executorInitContainerPodBuilder = executorInitContainerBootstrap.map { bootstrap => bootstrap.bootstrapInitContainerAndVolumes( @@ -422,30 +460,8 @@ private[spark] class KubernetesClusterSchedulerBackend( withMaybeShuffleConfigPodBuilder) }.getOrElse(withMaybeShuffleConfigPodBuilder) - val resolvedExecutorPodBuilder = if (nodeToTaskCount.nonEmpty) { - val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2) - // Normalize to node affinity weights in 1 to 100 range. - val nodeToWeight = nodeToTaskCount.map{ - case (node, taskCount) => - (node, math.min(100, math.max(1, math.round(taskCount * 100.0 / taskTotal).toInt)))} - val weightToNodes = nodeToWeight.values.map( - weight => (weight, nodeToWeight.keys.filter(nodeToWeight(_) == weight))).toMap - val nodeAffinity = NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution = - for ((weight, nodes) <- weightToNodes) yield - WeightedPreference(weight, - Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes)))) - ) - val nodeAffinityJson = - s"""{"nodeAffinity": ${objectMapper.writeValueAsString(nodeAffinity)}}""" - - // TODO: Use non-annotation syntax when we switch to K8s version 1.6. - logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson") - executorInitContainerPodBuilder.editMetadata() - .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson) - .endMetadata() - } else { - executorInitContainerPodBuilder - } + val resolvedExecutorPodBuilder = addNodeAffinityAnnotationIfUseful( + executorInitContainerPodBuilder, nodeToTaskCount) try { (executorId, kubernetesClient.pods.create(resolvedExecutorPodBuilder.build())) @@ -570,6 +586,9 @@ private object KubernetesClusterSchedulerBackend { private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) } +// These case classes model K8s node affinity syntax for +// preferredDuringSchedulingIgnoredDuringExecution. 
+// @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#node-affinity-beta-feature case class NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution: Iterable[WeightedPreference]) case class WeightedPreference(weight: Int, preference: Preference) From f0fe6f68c09421878b3a445afb28beeea2e97b4e Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 8 Jun 2017 13:00:31 -0700 Subject: [PATCH 5/8] Fix a style issue --- .../cluster/kubernetes/KubernetesClusterSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index f3b5fd1e1e3ff..644586ec58c99 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -588,7 +588,7 @@ private object KubernetesClusterSchedulerBackend { // These case classes model K8s node affinity syntax for // preferredDuringSchedulingIgnoredDuringExecution. -// @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#node-affinity-beta-feature +// @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node case class NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution: Iterable[WeightedPreference]) case class WeightedPreference(weight: Int, preference: Preference) From b066b45d7cd631af2e5047b7dfbe5666ad62ea34 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 12 Jun 2017 11:22:20 -0700 Subject: [PATCH 6/8] Clean up and add a TODO --- .../KubernetesClusterSchedulerBackend.scala | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 644586ec58c99..204a57193d2f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -186,11 +186,11 @@ private[spark] class KubernetesClusterSchedulerBackend( } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) { logDebug("Maximum allowed executor limit reached. 
Not scaling up further.") } else { - val nodeToTaskCount = getCurrentNodesWithTaskCounts + val nodeToLocalTaskCount = getNodesWithLocalTaskCounts RUNNING_EXECUTOR_PODS_LOCK.synchronized { for (i <- 0 until math.min( totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) { - runningExecutorPods += allocateNewExecutorPod(nodeToTaskCount) + runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount) logInfo( s"Requesting a new executor, total executors is now ${runningExecutorPods.size}") } @@ -286,7 +286,11 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - private def getCurrentNodesWithTaskCounts() : Map[String, Int] = { + /** + * @return A map of K8s cluster nodes to the number of tasks that could benefit from data + * locality if an executor launches on the cluster node. + */ + private def getNodesWithLocalTaskCounts() : Map[String, Int] = { val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs.values.toList // toList makes a defensive copy. } @@ -296,6 +300,9 @@ private[spark] class KubernetesClusterSchedulerBackend( } for (pod <- executorPodsWithIPs) { // Remove cluster nodes that are running our executors already. + // TODO: This prefers spreading out executors across nodes. In case users want + // consolidating executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut + // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty || nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty || nodeToLocalTaskCount.remove( @@ -304,8 +311,8 @@ private[spark] class KubernetesClusterSchedulerBackend( nodeToLocalTaskCount.toMap[String, Int] } - def addNodeAffinityAnnotationIfUseful(basePodBuilder: PodBuilder, - nodeToTaskCount: Map[String, Int]): PodBuilder = { + private def addNodeAffinityAnnotationIfUseful(basePodBuilder: PodBuilder, + nodeToTaskCount: Map[String, Int]): PodBuilder = { def scaleToRange(value: Int, baseMin: Double, baseMax: Double, rangeMin: Double, rangeMax: Double): Int = (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt @@ -337,12 +344,13 @@ private[spark] class KubernetesClusterSchedulerBackend( /** * Allocates a new executor pod - * - * @param nodeToTaskCount A map of K8s cluster nodes to the number of tasks that could benefit - * from data locality if an executor launches on the cluster node. + * + * @param nodeToLocalTaskCount A map of K8s cluster nodes to the number of tasks that could + * benefit from data locality if an executor launches on the cluster + * node. * @return A tuple of the new executor name and the Pod data structure. 
*/ - private def allocateNewExecutorPod(nodeToTaskCount: Map[String, Int]): (String, Pod) = { + private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = { val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString val name = s"${applicationId()}-exec-$executorId" @@ -461,7 +469,7 @@ private[spark] class KubernetesClusterSchedulerBackend( }.getOrElse(withMaybeShuffleConfigPodBuilder) val resolvedExecutorPodBuilder = addNodeAffinityAnnotationIfUseful( - executorInitContainerPodBuilder, nodeToTaskCount) + executorInitContainerPodBuilder, nodeToLocalTaskCount) try { (executorId, kubernetesClient.pods.create(resolvedExecutorPodBuilder.build())) From c67b8abdbe0774d279d816da4597ab9f8f122228 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 12 Jun 2017 15:38:31 -0700 Subject: [PATCH 7/8] Fix style issue --- .../kubernetes/KubernetesClusterSchedulerBackend.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 66727f8a49cd0..f330113350ffa 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -291,9 +291,9 @@ private[spark] class KubernetesClusterSchedulerBackend( } /** - * @return A map of K8s cluster nodes to the number of tasks that could benefit from data - * locality if an executor launches on the cluster node. - */ + * @return A map of K8s cluster nodes to the number of tasks that could benefit from data + * locality if an executor launches on the cluster node. + */ private def getNodesWithLocalTaskCounts() : Map[String, Int] = { val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs.values.toList // toList makes a defensive copy. 
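The weight normalization introduced in patch 4 and carried through the later patches maps per-node local-task counts linearly into the 1 to 100 range used for preferred node affinity weights. The standalone Scala sketch below reproduces that scaleToRange helper outside the scheduler backend; the node names and task counts are invented for illustration.

object WeightNormalizationSketch {
  // Same linear mapping as the helper nested inside addNodeAffinityAnnotationIfUseful.
  def scaleToRange(value: Int, baseMin: Double, baseMax: Double,
      rangeMin: Double, rangeMax: Double): Int =
    (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt

  def main(args: Array[String]): Unit = {
    // Hypothetical counts: node-b holds three times as many data-local tasks as node-a.
    val nodeToTaskCount = Map("node-a" -> 5, "node-b" -> 15)
    val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2)  // 20
    val nodeToWeight = nodeToTaskCount.map { case (node, taskCount) =>
      (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))
    }
    println(nodeToWeight)   // node-a -> 21, node-b -> 73
    // Nodes that land on the same weight share one preference entry.
    val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys)
    println(weightToNodes)  // 21 -> Set(node-a), 73 -> Set(node-b)
    // Edge case: with a single local task in total, baseMax equals baseMin, the division
    // yields NaN, and NaN.toInt is 0, which falls below the 1-100 range targeted above.
  }
}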
From 7eb6365eb4c9d868732f2600f5fd340399e8a144 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 13 Jun 2017 14:48:32 -0700 Subject: [PATCH 8/8] Address review comments --- .../KubernetesClusterSchedulerBackend.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index f330113350ffa..85ce5f01200b2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -329,13 +329,12 @@ private[spark] class KubernetesClusterSchedulerBackend( (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))} val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys) // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node - val nodeAffinity = NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution = - for ((weight, nodes) <- weightToNodes) yield - WeightedPreference(weight, - Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes)))) - ) - val nodeAffinityJson = - s"""{"nodeAffinity": ${objectMapper.writeValueAsString(nodeAffinity)}}""" + val nodeAffinityJson = objectMapper.writeValueAsString(SchedulerAffinity(NodeAffinity( + preferredDuringSchedulingIgnoredDuringExecution = + for ((weight, nodes) <- weightToNodes) yield + WeightedPreference(weight, + Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes)))) + ))) // TODO: Use non-annotation syntax when we switch to K8s version 1.6. logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson") basePodBuilder.editMetadata() @@ -598,9 +597,12 @@ private object KubernetesClusterSchedulerBackend { private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) } -// These case classes model K8s node affinity syntax for -// preferredDuringSchedulingIgnoredDuringExecution. -// @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node +/** + * These case classes model K8s node affinity syntax for + * preferredDuringSchedulingIgnoredDuringExecution. + * @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node + */ +case class SchedulerAffinity(nodeAffinity: NodeAffinity) case class NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution: Iterable[WeightedPreference]) case class WeightedPreference(weight: Int, preference: Preference)
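After patch 8, the annotation value is produced by serializing the SchedulerAffinity case-class tree with Jackson's Scala module. The standalone sketch below shows that serialization end to end; it assumes jackson-databind and jackson-module-scala are on the classpath, and the node names and weights are invented (they match the normalization example between patches 7 and 8).

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Mirrors the case classes declared at the bottom of KubernetesClusterSchedulerBackend.scala.
case class SchedulerAffinity(nodeAffinity: NodeAffinity)
case class NodeAffinity(
  preferredDuringSchedulingIgnoredDuringExecution: Iterable[WeightedPreference])
case class WeightedPreference(weight: Int, preference: Preference)
case class Preference(matchExpressions: Array[MatchExpression])
case class MatchExpression(key: String, operator: String, values: Iterable[String])

object NodeAffinityJsonSketch {
  def main(args: Array[String]): Unit = {
    val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val affinity = SchedulerAffinity(NodeAffinity(Seq(
      WeightedPreference(73,
        Preference(Array(MatchExpression("kubernetes.io/hostname", "In", Seq("node-b"))))),
      WeightedPreference(21,
        Preference(Array(MatchExpression("kubernetes.io/hostname", "In", Seq("node-a")))))
    )))
    // Prints the JSON that the backend attaches to each executor pod under the
    // scheduler.alpha.kubernetes.io/affinity annotation, e.g.
    // {"nodeAffinity":{"preferredDuringSchedulingIgnoredDuringExecution":[{"weight":73,...
    println(objectMapper.writeValueAsString(affinity))
  }
}

The annotation route is used because these patches target Kubernetes releases before 1.6; as the TODO in the diff notes, newer clusters can express the same preference through the first-class affinity field of the pod spec instead of the alpha annotation.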