@@ -90,6 +90,7 @@ package object constants {
  private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"

  // Miscellaneous
  private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"
  private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
  private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
  private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
@@ -17,9 +17,12 @@
package org.apache.spark.scheduler.cluster.kubernetes

import java.io.Closeable
import java.net.InetAddress
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
@@ -177,16 +180,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
    .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")

  private val allocatorRunnable: Runnable = new Runnable {

    override def run(): Unit = {
      if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
        logDebug("Waiting for pending executors before scaling")
      } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
        logDebug("Maximum allowed executor limit reached. Not scaling up further.")
      } else {
        val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
        RUNNING_EXECUTOR_PODS_LOCK.synchronized {
          for (i <- 0 until math.min(
            totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
            runningExecutorPods += allocateNewExecutorPod()
            runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount)
            logInfo(
              s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
          }
@@ -195,6 +200,8 @@
    }
  }

  private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

  private def getShuffleClient(): KubernetesExternalShuffleClient = {
    new KubernetesExternalShuffleClient(
      SparkTransportConf.fromSparkConf(conf, "shuffle"),
@@ -283,7 +290,70 @@
}
}

  private def allocateNewExecutorPod(): (String, Pod) = {
  /**
   * @return A map of K8s cluster nodes to the number of tasks that could benefit from data
   *         locality if an executor launches on the cluster node.
   */
  private def getNodesWithLocalTaskCounts() : Map[String, Int] = {
    val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
      executorPodsByIPs.values.toList // toList makes a defensive copy.
    }
    val nodeToLocalTaskCount = mutable.Map[String, Int]() ++
      KubernetesClusterSchedulerBackend.this.synchronized {
        hostToLocalTaskCount
      }
    for (pod <- executorPodsWithIPs) {
      // Remove cluster nodes that are running our executors already.
Reviewer comment:

Is preferring not-previously-used executors always a good idea? It reminds me of the spark.deploy.spreadOut flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html

If you were doing a highly iterative workload that was network bound, it may be better to allow (or even prefer!) scheduling multiple executor pods on the same k8s node, so that traffic stays on the loopback interface rather than actually crossing between servers.

Possibly we should consider adding a flag for this in the future. The typical job I would run would prefer the spread-out mode you have here, so I'm not in a rush to support the consolidated mode.

Reply (Member Author):

That is a great question. Good to learn about the spark.deploy.spreadOut flag. I agree we should consider a flag for this in the future. I'll add a TODO here.

One interesting related fact is that the loop at lines 192-197 may have the effect of consolidated mode within the next batch of pods, because we are not changing nodes within this loop. But we don't know how much consolidation the scheduler will actually do.

          for (i <- 0 until math.min(
            totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
            runningExecutorPods += allocateNewExecutorPod(nodeToTaskCount)
            logInfo(
              s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
          }

      // TODO: This prefers spreading out executors across nodes. In case users want to
      // consolidate executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
      // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html
      nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty ||
        nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty ||
        nodeToLocalTaskCount.remove(
          InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty
    }
    nodeToLocalTaskCount.toMap[String, Int]
  }
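
For illustration, here is a minimal standalone sketch of the filtering above (not part of the change; the object name, node names, IPs, and task counts are assumptions for the example). It mirrors the removal logic, minus the reverse-DNS lookup on the host IP.

import scala.collection.mutable
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

object LocalTaskCountFilterSketch {
  def main(args: Array[String]): Unit = {
    // Assumed locality hints: candidate hosts and the number of tasks that prefer each one.
    val hostToLocalTaskCount = Map("node-a" -> 3, "node-b" -> 2, "10.0.0.3" -> 1)

    // Assumed executor pod already scheduled on node-a, built with the fabric8 model API.
    val executorPod: Pod = new PodBuilder()
      .withNewSpec().withNodeName("node-a").endSpec()
      .withNewStatus().withHostIP("10.0.0.1").endStatus()
      .build()

    // Same approach as getNodesWithLocalTaskCounts: start from the locality hints and
    // drop any node that already runs one of our executors (node name first, then host IP).
    val nodeToLocalTaskCount = mutable.Map[String, Int]() ++ hostToLocalTaskCount
    nodeToLocalTaskCount.remove(executorPod.getSpec.getNodeName).nonEmpty ||
      nodeToLocalTaskCount.remove(executorPod.getStatus.getHostIP).nonEmpty

    // node-a is dropped; node-b and 10.0.0.3 remain as affinity candidates.
    println(nodeToLocalTaskCount.toMap)
  }
}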

  private def addNodeAffinityAnnotationIfUseful(basePodBuilder: PodBuilder,
      nodeToTaskCount: Map[String, Int]): PodBuilder = {
    def scaleToRange(value: Int, baseMin: Double, baseMax: Double,
        rangeMin: Double, rangeMax: Double): Int =
      (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt

    if (nodeToTaskCount.nonEmpty) {
      val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2)
      // Normalize to node affinity weights in 1 to 100 range.
      val nodeToWeight = nodeToTaskCount.map {
        case (node, taskCount) =>
          (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))}
      val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys)
      // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
      val nodeAffinityJson = objectMapper.writeValueAsString(SchedulerAffinity(NodeAffinity(
        preferredDuringSchedulingIgnoredDuringExecution =
          for ((weight, nodes) <- weightToNodes) yield
            WeightedPreference(weight,
              Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes))))
      )))
      // TODO: Use non-annotation syntax when we switch to K8s version 1.6.
      logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson")
      basePodBuilder.editMetadata()
        .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson)
        .endMetadata()
    } else {
      basePodBuilder
    }
  }
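
To make the weight scaling and the annotation payload concrete, the following self-contained sketch (not part of the diff; the object name and the node-a/node-b task counts are assumptions for the example) reproduces the computation for 5 local tasks on node-a and 1 on node-b, and prints the JSON that would be attached under the scheduler.alpha.kubernetes.io/affinity annotation. The case classes are re-declared locally so the sketch compiles on its own; they mirror the ones defined at the bottom of this file.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object NodeAffinityAnnotationSketch {
  // Local copies of the node affinity case classes defined at the bottom of this file.
  case class SchedulerAffinity(nodeAffinity: NodeAffinity)
  case class NodeAffinity(
      preferredDuringSchedulingIgnoredDuringExecution: Iterable[WeightedPreference])
  case class WeightedPreference(weight: Int, preference: Preference)
  case class Preference(matchExpressions: Array[MatchExpression])
  case class MatchExpression(key: String, operator: String, values: Iterable[String])

  // Same linear rescaling as scaleToRange above: map task counts into the 1..100 weight range.
  def scaleToRange(value: Int, baseMin: Double, baseMax: Double,
      rangeMin: Double, rangeMax: Double): Int =
    (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt

  def main(args: Array[String]): Unit = {
    val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // Assumed input: 5 tasks prefer node-a, 1 task prefers node-b (6 tasks total).
    val nodeToTaskCount = Map("node-a" -> 5, "node-b" -> 1)
    val taskTotal = nodeToTaskCount.values.sum
    val nodeToWeight = nodeToTaskCount.map { case (node, taskCount) =>
      (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))
    }
    val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys)
    val affinity = SchedulerAffinity(NodeAffinity(
      for ((weight, nodes) <- weightToNodes) yield
        WeightedPreference(weight,
          Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes))))))
    // Prints, modulo ordering, something like:
    // {"nodeAffinity":{"preferredDuringSchedulingIgnoredDuringExecution":[
    //   {"weight":80,"preference":{"matchExpressions":[
    //     {"key":"kubernetes.io/hostname","operator":"In","values":["node-a"]}]}},
    //   {"weight":1,"preference":{"matchExpressions":[
    //     {"key":"kubernetes.io/hostname","operator":"In","values":["node-b"]}]}}]}}
    println(objectMapper.writeValueAsString(affinity))
  }
}

With weights of 80 for node-a and 1 for node-b, the scheduler prefers node-a but may still place the pod elsewhere, since preferredDuringSchedulingIgnoredDuringExecution is a soft preference; the TODO above contrasts this annotation-based form with the field-based syntax available from K8s 1.6.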

  /**
   * Allocates a new executor pod
   *
   * @param nodeToLocalTaskCount A map of K8s cluster nodes to the number of tasks that could
   *                             benefit from data locality if an executor launches on the cluster
   *                             node.
   * @return A tuple of the new executor name and the Pod data structure.
   */
  private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = {
    val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
    val name = s"$executorPodNamePrefix-exec-$executorId"

@@ -393,14 +463,19 @@
.endSpec()
}
}.getOrElse(basePodBuilder)
    val resolvedExecutorPod = executorInitContainerBootstrap.map { bootstrap =>
      bootstrap.bootstrapInitContainerAndVolumes(
        "executor",
        withMaybeShuffleConfigPodBuilder)
    }.getOrElse(withMaybeShuffleConfigPodBuilder)

    val executorInitContainerPodBuilder = executorInitContainerBootstrap.map {
      bootstrap =>
        bootstrap.bootstrapInitContainerAndVolumes(
          "executor",
          withMaybeShuffleConfigPodBuilder)
    }.getOrElse(withMaybeShuffleConfigPodBuilder)

    val resolvedExecutorPodBuilder = addNodeAffinityAnnotationIfUseful(
      executorInitContainerPodBuilder, nodeToLocalTaskCount)

    try {
      (executorId, kubernetesClient.pods.create(resolvedExecutorPod.build()))
      (executorId, kubernetesClient.pods.create(resolvedExecutorPodBuilder.build()))
    } catch {
      case throwable: Throwable =>
        logError("Failed to allocate executor pod.", throwable)
@@ -521,3 +596,15 @@ private object KubernetesClusterSchedulerBackend {
  private val DEFAULT_STATIC_PORT = 10000
  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
}

/**
 * These case classes model K8s node affinity syntax for
 * preferredDuringSchedulingIgnoredDuringExecution.
 * @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
 */
case class SchedulerAffinity(nodeAffinity: NodeAffinity)
case class NodeAffinity(
    preferredDuringSchedulingIgnoredDuringExecution: Iterable[WeightedPreference])
case class WeightedPreference(weight: Int, preference: Preference)
case class Preference(matchExpressions: Array[MatchExpression])
case class MatchExpression(key: String, operator: String, values: Iterable[String])
Reviewer comment:

Is there a K8s spec that these are following? If so, please add a comment above these case classes with a link to the K8s docs on what they mean.

Reply (Member Author):

Fixed.