 package org.apache.spark.scheduler.cluster.kubernetes
 
 import java.io.Closeable
+import java.net.InetAddress
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
@@ -177,16 +180,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
 
   private val allocatorRunnable: Runnable = new Runnable {
+
     override def run(): Unit = {
       if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
         logDebug("Waiting for pending executors before scaling")
       } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
         logDebug("Maximum allowed executor limit reached. Not scaling up further.")
       } else {
+        val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
         RUNNING_EXECUTOR_PODS_LOCK.synchronized {
           for (i <- 0 until math.min(
             totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
-            runningExecutorPods += allocateNewExecutorPod()
+            runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount)
             logInfo(
               s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
           }
@@ -195,6 +200,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
+  private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
   private def getShuffleClient(): KubernetesExternalShuffleClient = {
     new KubernetesExternalShuffleClient(
       SparkTransportConf.fromSparkConf(conf, "shuffle"),
@@ -283,7 +290,70 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
-  private def allocateNewExecutorPod(): (String, Pod) = {
+  /**
+   * @return A map of K8s cluster nodes to the number of tasks that could benefit from data
+   *         locality if an executor launches on the cluster node.
+   */
+  private def getNodesWithLocalTaskCounts(): Map[String, Int] = {
+    val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
+      executorPodsByIPs.values.toList // toList makes a defensive copy.
+    }
+    val nodeToLocalTaskCount = mutable.Map[String, Int]() ++
+      KubernetesClusterSchedulerBackend.this.synchronized {
+        hostToLocalTaskCount
+      }
+    for (pod <- executorPodsWithIPs) {
+      // Remove cluster nodes that are running our executors already.
+      // TODO: This prefers spreading out executors across nodes. In case users want
+      // consolidating executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
+      // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html
+      nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty ||
+        nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty ||
+        nodeToLocalTaskCount.remove(
+          InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty
+    }
+    nodeToLocalTaskCount.toMap[String, Int]
+  }
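+  // Illustrative sketch (hypothetical node names): if hostToLocalTaskCount is
+  // Map("node-a" -> 3, "node-b" -> 2) and an executor pod is already scheduled on
+  // "node-a" (matched by node name, host IP, or canonical host name), the method
+  // returns Map("node-b" -> 2), so only nodes not yet running one of our executors
+  // feed into the node affinity annotation below.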
+
+  private def addNodeAffinityAnnotationIfUseful(basePodBuilder: PodBuilder,
+      nodeToTaskCount: Map[String, Int]): PodBuilder = {
+    def scaleToRange(value: Int, baseMin: Double, baseMax: Double,
+        rangeMin: Double, rangeMax: Double): Int =
+      (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt
+
+    if (nodeToTaskCount.nonEmpty) {
+      val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2)
+      // Normalize to node affinity weights in 1 to 100 range.
+      val nodeToWeight = nodeToTaskCount.map {
+        case (node, taskCount) =>
+          (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))}
+      val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys)
+      // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
+      val nodeAffinityJson = objectMapper.writeValueAsString(SchedulerAffinity(NodeAffinity(
+        preferredDuringSchedulingIgnoredDuringExecution =
+          for ((weight, nodes) <- weightToNodes) yield
+            WeightedPreference(weight,
+              Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes))))
+      )))
+      // TODO: Use non-annotation syntax when we switch to K8s version 1.6.
+      logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson")
+      basePodBuilder.editMetadata()
+        .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson)
+        .endMetadata()
+    } else {
+      basePodBuilder
+    }
+  }
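+  // Worked example (hypothetical counts): with nodeToTaskCount = Map("node-a" -> 3,
+  // "node-b" -> 1), taskTotal is 4, so scaleToRange maps node-a to weight
+  // ((100 - 1) * (3 - 1) / (4 - 1) + 1).toInt = 67 and node-b to weight 1. The pod is
+  // then annotated under ANNOTATION_EXECUTOR_NODE_AFFINITY with the serialized
+  // preferredDuringSchedulingIgnoredDuringExecution preferences for those weights.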
+
+  /**
+   * Allocates a new executor pod
+   *
+   * @param nodeToLocalTaskCount A map of K8s cluster nodes to the number of tasks that could
+   *                             benefit from data locality if an executor launches on the cluster
+   *                             node.
+   * @return A tuple of the new executor name and the Pod data structure.
+   */
+  private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = {
     val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
     val name = s"$executorPodNamePrefix-exec-$executorId"
 
@@ -393,14 +463,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .endSpec()
       }
     }.getOrElse(basePodBuilder)
-    val resolvedExecutorPod = executorInitContainerBootstrap.map { bootstrap =>
-      bootstrap.bootstrapInitContainerAndVolumes(
-        "executor",
-        withMaybeShuffleConfigPodBuilder)
-    }.getOrElse(withMaybeShuffleConfigPodBuilder)
+
+    val executorInitContainerPodBuilder = executorInitContainerBootstrap.map {
+      bootstrap =>
+        bootstrap.bootstrapInitContainerAndVolumes(
+          "executor",
+          withMaybeShuffleConfigPodBuilder)
+    }.getOrElse(withMaybeShuffleConfigPodBuilder)
+
+    val resolvedExecutorPodBuilder = addNodeAffinityAnnotationIfUseful(
+      executorInitContainerPodBuilder, nodeToLocalTaskCount)
 
     try {
-      (executorId, kubernetesClient.pods.create(resolvedExecutorPod.build()))
+      (executorId, kubernetesClient.pods.create(resolvedExecutorPodBuilder.build()))
     } catch {
       case throwable: Throwable =>
         logError("Failed to allocate executor pod.", throwable)
@@ -521,3 +596,15 @@ private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
 }
+
+/**
+ * These case classes model K8s node affinity syntax for
+ * preferredDuringSchedulingIgnoredDuringExecution.
+ * @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
+ */
+case class SchedulerAffinity(nodeAffinity: NodeAffinity)
+case class NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution:
+    Iterable[WeightedPreference])
+case class WeightedPreference(weight: Int, preference: Preference)
+case class Preference(matchExpressions: Array[MatchExpression])
+case class MatchExpression(key: String, operator: String, values: Iterable[String])
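+// For illustration (hypothetical single node "node-a" with weight 67), serializing a
+// SchedulerAffinity through the ObjectMapper above yields JSON roughly of the form:
+//   {"nodeAffinity":{"preferredDuringSchedulingIgnoredDuringExecution":
+//     [{"weight":67,"preference":{"matchExpressions":
+//       [{"key":"kubernetes.io/hostname","operator":"In","values":["node-a"]}]}}]}}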