From 509bf737dac12689624a3e25ab4f73b09a851f52 Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Fri, 11 Nov 2016 17:10:25 -0700
Subject: [PATCH 1/3] Add support for dynamic executors

---
 .../KubernetesClusterScheduler.scala        | 35 +++++---
 .../KubernetesClusterSchedulerBackend.scala | 86 ++++++++++++++-----
 2 files changed, 87 insertions(+), 34 deletions(-)

diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala
index c169f88e77d02..a677d8725f9f5 100644
--- a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala
+++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala
@@ -80,14 +80,32 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf)
 
     // This is the URL of the client jar.
     val clientJarUri = args.userJar
 
-    conf.setExecutorEnv("spark.executor.jar", clientJarUri)
-    conf.setExecutorEnv("spark.kubernetes.namespace", nameSpace)
-    conf.setExecutorEnv("spark.kubernetes.driver.image", sparkDriverImage)
     // This is the kubernetes master we're launching on.
     val kubernetesHost = "k8s://" + client.getMasterUrl().getHost()
     logInfo("Using as kubernetes-master: " + kubernetesHost.toString())
 
+    val submitArgs = scala.collection.mutable.ArrayBuffer.empty[String]
+    submitArgs ++= Vector(
+      clientJarUri,
+      s"--class=${args.userClass}",
+      s"--master=$kubernetesHost",
+      s"--executor-memory=${driverDescription.mem}",
+      s"--conf=spark.executor.jar=$clientJarUri",
+      s"--conf=spark.executor.instances=$instances",
+      s"--conf=spark.kubernetes.namespace=$nameSpace",
+      s"--conf=spark.kubernetes.driver.image=$sparkDriverImage")
+
+    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+      submitArgs ++= Vector(
+        "--conf=spark.dynamicAllocation.enabled=true",
+        "--conf=spark.shuffle.service.enabled=true")
+    }
+
+    // These have to come at the end of the arg list.
+    submitArgs ++= Vector("/opt/spark/kubernetes/client.jar",
+      args.userArgs.mkString(" "))
+
     val labelMap = Map("type" -> "spark-driver")
     val pod = new PodBuilder()
       .withNewMetadata()
@@ -102,16 +120,7 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf)
         .withImage(sparkDriverImage)
         .withImagePullPolicy("Always")
         .withCommand(s"/opt/driver.sh")
-        .withArgs(s"$clientJarUri",
-          s"--class=${args.userClass}",
-          s"--master=$kubernetesHost",
-          s"--executor-memory=${driverDescription.mem}",
-          s"--conf=spark.executor.jar=$clientJarUri",
-          s"--conf=spark.executor.instances=$instances",
-          s"--conf=spark.kubernetes.namespace=$nameSpace",
-          s"--conf=spark.kubernetes.driver.image=$sparkDriverImage",
-          "/opt/spark/kubernetes/client.jar",
-          args.userArgs.mkString(" "))
+        .withArgs(submitArgs :_*)
         .endContainer()
       .endSpec()
     .build()
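For context, the change above builds the argument vector handed to /opt/driver.sh, appending the dynamic-allocation flags only when they are enabled and keeping the client jar and user arguments last. A sketch of what that vector looks like for a job with dynamic allocation on; the jar URI, class, master host, memory, namespace, and image values are hypothetical placeholders, not values from the patch:

    // Sketch only: illustrative values for the submitArgs assembled above.
    val exampleSubmitArgs = Vector(
      "http://example.com/jobs/app.jar",              // clientJarUri (hypothetical)
      "--class=com.example.MyApp",                    // args.userClass (hypothetical)
      "--master=k8s://10.0.0.1",                      // kubernetesHost (hypothetical)
      "--executor-memory=1024",                       // driverDescription.mem (hypothetical)
      "--conf=spark.executor.jar=http://example.com/jobs/app.jar",
      "--conf=spark.executor.instances=2",
      "--conf=spark.kubernetes.namespace=default",
      "--conf=spark.kubernetes.driver.image=spark-driver:latest",
      "--conf=spark.dynamicAllocation.enabled=true",
      "--conf=spark.shuffle.service.enabled=true",
      "/opt/spark/kubernetes/client.jar",             // client jar baked into the image
      "arg1 arg2")                                    // args.userArgs, space-joined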
diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index b5e312824f777..3c0c3e2d6d45d 100644
--- a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
 
 import scala.collection.mutable
 import scala.util.Random
+import scala.concurrent.Future
 
 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -40,41 +41,73 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   val DEFAULT_NUMBER_EXECUTORS = 2
   val sparkExecutorName = s"spark-executor-${Random.alphanumeric take 5 mkString("")}".toLowerCase()
-  var executorPods = mutable.ArrayBuffer[String]()
+
+  var executorPods = mutable.ArrayBuffer.empty[String]
+  var executorID = 0
 
   val sparkDriverImage = sc.getConf.get("spark.kubernetes.driver.image")
   val clientJarUri = sc.getConf.get("spark.executor.jar")
   val ns = sc.getConf.get("spark.kubernetes.namespace")
+  val dynamicExecutors = Utils.isDynamicAllocationEnabled(sc.getConf)
+
+  // executor back-ends take their configuration this way
+  if (dynamicExecutors) {
+    sc.getConf.setExecutorEnv("spark.dynamicAllocation.enabled", "true")
+    sc.getConf.setExecutorEnv("spark.shuffle.service.enabled", "true")
+  }
 
   override def start() {
     super.start()
-    var i = 0
-    for(i <- 1 to getInitialTargetExecutorNumber(sc.conf)){
-      executorPods += createExecutorPod(i)
-    }
-    None
+    createExecutorPods(getInitialTargetExecutorNumber(sc.getConf))
   }
 
   override def stop(): Unit = {
-    for (i <- 0 to executorPods.length) {
-      client.pods().inNamespace(ns).withName(executorPods(i)).delete()
-    }
+    deleteExecutorPods(executorPods)
     super.stop()
   }
 
   // Dynamic allocation interfaces
-  override def doRequestTotalExecutors(requestedTotal: Int): scala.concurrent.Future[Boolean] = {
-    return super.doRequestTotalExecutors(requestedTotal)
+  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+    logInfo(s"Received doRequestTotalExecutors: $requestedTotal")
+    val delta = requestedTotal - executorPods.length
+    if (delta > 0) {
+      logInfo(s"Adding $delta new executor Pods")
+      createExecutorPods(delta)
+    } else if (delta < 0) {
+      logInfo(s"Deleting ${-delta} new executor Pods")
+      // TODO: What is an informed way to kill executors with the least (or zero) load?
+      val killList = executorPods.slice(executorPods.length + delta, executorPods.length)
+      executorPods = executorPods.take(executorPods.length + delta)
+      deleteExecutorPods(killList)
+    }
+    // TODO: are there meaningful failure modes here?
+    Future.successful(true)
+  }
+
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    // I don't see doKillExecutors being called in the kube context, so I'm leaving it
+    // stubbed out with an error message
+    logError(s"""UNIMPLEMENTED: doKillExecutors: ${executorIds.mkString(",")}""")
+    Future.successful(false)
+  }
+
+  private def createExecutorPods(n: Int) {
+    for (i <- 1 to n) {
+      executorID += 1
+      executorPods += createExecutorPod(executorID)
+    }
   }
 
-  override def doKillExecutors(executorIds: Seq[String]): scala.concurrent.Future[Boolean] = {
-    return super.doKillExecutors(executorIds)
+  private def deleteExecutorPods(podNames: Seq[String]) {
+    for (pod <- podNames) {
+      client.pods().inNamespace(ns).withName(pod).delete()
+    }
   }
 
   def getInitialTargetExecutorNumber(conf: SparkConf,
                                      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
-    if (Utils.isDynamicAllocationEnabled(conf)) {
+    if (dynamicExecutors) {
       val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
       val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
@@ -99,6 +132,21 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // create a single k8s executor pod.
     val labelMap = Map("type" -> "spark-executor")
     val podName = s"$sparkExecutorName-$executorNum"
+
+    val submitArgs = mutable.ArrayBuffer.empty[String]
+
+    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+      submitArgs ++= Vector(
+        "dynamic-executors")
+    }
+
+    submitArgs ++= Vector("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+      "--driver-url", s"$driverURL",
+      "--executor-id", s"$executorNum",
+      "--hostname", "localhost",
+      "--app-id", "1", // TODO: change app-id per application and pass from driver.
+      "--cores", "1")
+
     var pod = new PodBuilder()
       .withNewMetadata()
       .withLabels(labelMap.asJava)
@@ -110,17 +158,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
       .addNewContainer().withName("spark-executor").withImage(sparkDriverImage)
      .withImagePullPolicy("IfNotPresent")
      .withCommand("/opt/executor.sh")
-      .withArgs("org.apache.spark.executor.CoarseGrainedExecutorBackend",
-        "--driver-url", s"$driverURL",
-        "--executor-id", s"$executorNum",
-        "--hostname", "localhost",
-        "--cores", "1",
-        "--app-id", "1") //TODO: change app-id per application and pass from driver.
+      .withArgs(submitArgs :_*)
      .endContainer()
      .endSpec().build()
 
     client.pods().inNamespace(ns).withName(podName).create(pod)
-    return podName
+
+    podName
   }
 
   protected def driverURL: String = {
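The scheduler backend added above sizes the executor pool by comparing the total requested by Spark's allocation machinery against the pods it is currently tracking, creating or deleting the difference. A stand-alone sketch of that resize rule, with illustrative names (the real code mutates executorPods in place and also advances executorID):

    // Sketch of the resize rule in doRequestTotalExecutors above (illustrative names).
    def resize(current: Vector[String], requestedTotal: Int,
               create: Int => String, delete: String => Unit): Vector[String] = {
      val delta = requestedTotal - current.length
      if (delta > 0) {
        current ++ (1 to delta).map(create)                 // scale up: add delta pods
      } else if (delta < 0) {
        val (keep, kill) = current.splitAt(current.length + delta)
        kill.foreach(delete)                                 // scale down: drop the most recently added pods
        keep
      } else current
    }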
From 7ec754680020269b0af7605ddf804e33a8edad55 Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Sat, 12 Nov 2016 10:39:36 -0700
Subject: [PATCH 2/3] fill in some sane logic for doKillExecutors

---
 .../KubernetesClusterSchedulerBackend.scala | 35 +++++++++++--------
 1 file changed, 20 insertions(+), 15 deletions(-)

diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 3c0c3e2d6d45d..6440086c8d70e 100644
--- a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -42,7 +42,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   val DEFAULT_NUMBER_EXECUTORS = 2
   val sparkExecutorName = s"spark-executor-${Random.alphanumeric take 5 mkString("")}".toLowerCase()
 
-  var executorPods = mutable.ArrayBuffer.empty[String]
+  // key is executor id, value is pod name
+  var executorToPod = mutable.Map.empty[String, String]
   var executorID = 0
 
   val sparkDriverImage = sc.getConf.get("spark.kubernetes.driver.image")
@@ -56,51 +57,55 @@
     sc.getConf.setExecutorEnv("spark.shuffle.service.enabled", "true")
   }
 
-  override def start() {
+  override def start(): Unit = {
     super.start()
     createExecutorPods(getInitialTargetExecutorNumber(sc.getConf))
   }
 
   override def stop(): Unit = {
-    deleteExecutorPods(executorPods)
+    killExecutorPods(executorToPod.toVector)
     super.stop()
   }
 
   // Dynamic allocation interfaces
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
     logInfo(s"Received doRequestTotalExecutors: $requestedTotal")
-    val delta = requestedTotal - executorPods.length
+    val n = executorToPod.size
+    val delta = requestedTotal - n
     if (delta > 0) {
       logInfo(s"Adding $delta new executor Pods")
       createExecutorPods(delta)
     } else if (delta < 0) {
       logInfo(s"Deleting ${-delta} new executor Pods")
       // TODO: What is an informed way to kill executors with the least (or zero) load?
-      val killList = executorPods.slice(executorPods.length + delta, executorPods.length)
-      executorPods = executorPods.take(executorPods.length + delta)
-      deleteExecutorPods(killList)
+      val kill = executorToPod.toVector.slice(n + delta, n)
+      killExecutorPods(kill)
     }
     // TODO: are there meaningful failure modes here?
     Future.successful(true)
   }
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
-    // I don't see doKillExecutors being called in the kube context, so I'm leaving it
-    // stubbed out with an error message
-    logError(s"""UNIMPLEMENTED: doKillExecutors: ${executorIds.mkString(",")}""")
-    Future.successful(false)
+    logInfo(s"doKillExecutors")
+    killExecutorPods(executorIds.map { id => (id, executorToPod(id)) })
+    // TODO: send shutdown message? take off active list? put onto waiting que and kill
+    // pods after graceful shutdown?
+    Future.successful(true)
   }
 
   private def createExecutorPods(n: Int) {
     for (i <- 1 to n) {
       executorID += 1
-      executorPods += createExecutorPod(executorID)
+      executorToPod += ((executorID.toString, createExecutorPod(executorID)))
     }
   }
 
-  private def deleteExecutorPods(podNames: Seq[String]) {
-    for (pod <- podNames) {
-      client.pods().inNamespace(ns).withName(pod).delete()
+  private def killExecutorPods(idPodPairs: Seq[(String, String)]) {
+    for (kv <- idPodPairs) {
+      executorToPod -= kv._1
+      client.pods().inNamespace(ns).withName(kv._2).delete()
+      // TODO: send message to take off the active list?
+      // send shutdown message to executor back-end?
     }
   }
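The commit above keys pods by executor id, so a kill request that arrives by id can be resolved to a concrete pod name. Two details worth noting: executorToPod(id) throws NoSuchElementException for an unknown id, so this assumes the driver only asks to kill executors the backend registered, and mutable.Map has no defined iteration order, so the slice over executorToPod.toVector still removes effectively arbitrary executors, which is what the remaining TODO is about. A minimal, REPL-style sketch of the bookkeeping, with illustrative names:

    // Sketch of the id -> pod bookkeeping introduced above (illustrative names).
    val executorToPod = scala.collection.mutable.Map.empty[String, String]

    def register(executorId: Int, podName: String): Unit =
      executorToPod += ((executorId.toString, podName))

    // Mirrors doKillExecutors: resolve each requested executor id to its pod.
    def podsFor(executorIds: Seq[String]): Seq[(String, String)] =
      executorIds.map { id => (id, executorToPod(id)) }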
From 8243127100909ccfa29dc9ca493f68f04946c609 Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Mon, 14 Nov 2016 15:15:19 -0700
Subject: [PATCH 3/3] doRequestTotalExecutors signals graceful executor shutdown, and favors idle executors

---
 .../KubernetesClusterSchedulerBackend.scala | 63 ++++++++++++++-----
 1 file changed, 49 insertions(+), 14 deletions(-)
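In outline, the diff below splits the bookkeeping into two maps, executorToPod for active executors and shutdownToPod for executors that have been asked to stop, and makes scale-down prefer executors the TaskSchedulerImpl reports as idle. A RemoveExecutor message is sent to the driver endpoint so the executor back-end can exit before its pod is deleted, and pods whose executors have already deregistered are reaped on the next call. A condensed sketch of the selection rule, with illustrative names and simplified types:

    // Idle executors first, then arbitrary busy ones, up to the number requested.
    def selectForShutdown(active: Vector[String], isBusy: String => Boolean,
                          toRemove: Int): Vector[String] = {
      val (busy, idle) = active.partition(isBusy)
      (idle ++ busy).take(toRemove)
    }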
diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 6440086c8d70e..8d4f67a173071 100644
--- a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -22,7 +22,9 @@ import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.api.model.extensions.JobBuilder
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 import org.apache.spark.internal.config._
+import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -42,8 +44,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
   val DEFAULT_NUMBER_EXECUTORS = 2
   val sparkExecutorName = s"spark-executor-${Random.alphanumeric take 5 mkString("")}".toLowerCase()
 
+  // TODO: do these need mutex guarding?
   // key is executor id, value is pod name
-  var executorToPod = mutable.Map.empty[String, String]
+  var executorToPod = mutable.Map.empty[String, String]  // active executors
+  var shutdownToPod = mutable.Map.empty[String, String]  // pending shutdown
   var executorID = 0
 
   val sparkDriverImage = sc.getConf.get("spark.kubernetes.driver.image")
@@ -63,7 +67,9 @@
   }
 
   override def stop(): Unit = {
+    // Kill all executor pods indiscriminately
     killExecutorPods(executorToPod.toVector)
+    killExecutorPods(shutdownToPod.toVector)
     super.stop()
   }
 
@@ -73,23 +79,28 @@
     val n = executorToPod.size
     val delta = requestedTotal - n
     if (delta > 0) {
-      logInfo(s"Adding $delta new executor Pods")
+      logInfo(s"Adding $delta new executors")
       createExecutorPods(delta)
     } else if (delta < 0) {
-      logInfo(s"Deleting ${-delta} new executor Pods")
-      // TODO: What is an informed way to kill executors with the least (or zero) load?
-      val kill = executorToPod.toVector.slice(n + delta, n)
-      killExecutorPods(kill)
+      val d = -delta
+      val idle = executorToPod.toVector.filter { case (id, _) => !scheduler.isExecutorBusy(id) }
+      if (idle.length > 0) {
+        logInfo(s"Shutting down ${idle.length} idle executors")
+        shutdownExecutors(idle.take(d))
+      }
+      val r = math.max(0, d - idle.length)
+      if (r > 0) {
+        logInfo(s"Shutting down $r non-idle executors")
+        shutdownExecutors(executorToPod.toVector.take(r))
+      }
     }
     // TODO: are there meaningful failure modes here?
     Future.successful(true)
   }
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
-    logInfo(s"doKillExecutors")
+    logInfo(s"Received doKillExecutors")
     killExecutorPods(executorIds.map { id => (id, executorToPod(id)) })
-    // TODO: send shutdown message? take off active list? put onto waiting que and kill
-    // pods after graceful shutdown?
     Future.successful(true)
   }
 
@@ -100,12 +111,36 @@
     }
   }
 
+  def shutdownExecutors(idPodPairs: Seq[(String, String)]) {
+    val active = getExecutorIds.toSet
+
+    // Check for any finished shutting down and kill the pods
+    val shutdown = shutdownToPod.toVector.filter { case (e, _) => !active.contains(e) }
+    killExecutorPods(shutdown)
+
+    // Now request shutdown for the new ones.
+    // Move them from executor list to list pending shutdown
+    for ((id, pod) <- idPodPairs) {
+      try {
+        // TODO: 'ask' returns a future - can it be used to check eventual success?
+        Option(driverEndpoint).foreach(_.ask[Boolean](RemoveExecutor(id, ExecutorKilled)))
+        executorToPod -= id
+        shutdownToPod += ((id, pod))
+      } catch {
+        case e: Exception => logError(s"Error shutting down executor $id", e)
+      }
+    }
+  }
+
   private def killExecutorPods(idPodPairs: Seq[(String, String)]) {
-    for (kv <- idPodPairs) {
-      executorToPod -= kv._1
-      client.pods().inNamespace(ns).withName(kv._2).delete()
-      // TODO: send message to take off the active list?
-      // send shutdown message to executor back-end?
+    for ((id, pod) <- idPodPairs) {
+      try {
+        client.pods().inNamespace(ns).withName(pod).delete()
+        executorToPod -= id
+        shutdownToPod -= id
+      } catch {
+        case e: Exception => logError(s"Error killing executor pod $pod", e)
+      }
    }
   }
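As a usage note, the whole path in this series is driven by the standard dynamic-allocation settings together with the Kubernetes-specific properties these classes read. An illustrative configuration, where the namespace, image, and executor counts are example values rather than defaults taken from the patches:

    // Illustrative SparkConf; example values only.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.initialExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "10")
      .set("spark.kubernetes.namespace", "default")
      .set("spark.kubernetes.driver.image", "spark-driver:latest")

With these set, getInitialTargetExecutorNumber (patch 1) derives the starting pod count from the min and initial executor settings, and the doRequestTotalExecutors and doKillExecutors overrides above handle subsequent scale-up and scale-down requests.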