diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala
index c169f88e77d02..a677d8725f9f5 100644
--- a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala
+++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala
@@ -80,14 +80,32 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf)
     // This is the URL of the client jar.
     val clientJarUri = args.userJar
-    conf.setExecutorEnv("spark.executor.jar", clientJarUri)
-    conf.setExecutorEnv("spark.kubernetes.namespace", nameSpace)
-    conf.setExecutorEnv("spark.kubernetes.driver.image", sparkDriverImage)
 
     // This is the kubernetes master we're launching on.
     val kubernetesHost = "k8s://" + client.getMasterUrl().getHost()
     logInfo("Using as kubernetes-master: " + kubernetesHost.toString())
 
+    val submitArgs = scala.collection.mutable.ArrayBuffer.empty[String]
+    submitArgs ++= Vector(
+      clientJarUri,
+      s"--class=${args.userClass}",
+      s"--master=$kubernetesHost",
+      s"--executor-memory=${driverDescription.mem}",
+      s"--conf=spark.executor.jar=$clientJarUri",
+      s"--conf=spark.executor.instances=$instances",
+      s"--conf=spark.kubernetes.namespace=$nameSpace",
+      s"--conf=spark.kubernetes.driver.image=$sparkDriverImage")
+
+    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+      submitArgs ++= Vector(
+        "--conf spark.dynamicAllocation.enabled=true",
+        "--conf spark.shuffle.service.enabled=true")
+    }
+
+    // these have to come at end of arg list
+    submitArgs ++= Vector("/opt/spark/kubernetes/client.jar",
+      args.userArgs.mkString(" "))
+
     val labelMap = Map("type" -> "spark-driver")
     val pod = new PodBuilder()
       .withNewMetadata()
@@ -102,16 +120,7 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf)
         .withImage(sparkDriverImage)
         .withImagePullPolicy("Always")
         .withCommand(s"/opt/driver.sh")
-        .withArgs(s"$clientJarUri",
-          s"--class=${args.userClass}",
-          s"--master=$kubernetesHost",
-          s"--executor-memory=${driverDescription.mem}",
-          s"--conf=spark.executor.jar=$clientJarUri",
-          s"--conf=spark.executor.instances=$instances",
-          s"--conf=spark.kubernetes.namespace=$nameSpace",
-          s"--conf=spark.kubernetes.driver.image=$sparkDriverImage",
-          "/opt/spark/kubernetes/client.jar",
-          args.userArgs.mkString(" "))
+        .withArgs(submitArgs :_*)
         .endContainer()
       .endSpec()
       .build()
diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index b5e312824f777..8d4f67a173071 100644
--- a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -22,7 +22,9 @@ import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.api.model.extensions.JobBuilder
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 import org.apache.spark.internal.config._
+import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -30,6 +32,7 @@ import org.apache.spark.util.Utils
 
 import scala.collection.mutable
 import scala.util.Random
+import scala.concurrent.Future
 
 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -40,41 +43,111 @@ private[spark] class KubernetesClusterSchedulerBackend(
   val DEFAULT_NUMBER_EXECUTORS = 2
   val sparkExecutorName = s"spark-executor-${Random.alphanumeric take 5 mkString("")}".toLowerCase()
-  var executorPods = mutable.ArrayBuffer[String]()
+
+  // TODO: do these need mutex guarding?
+  // key is executor id, value is pod name
+  var executorToPod = mutable.Map.empty[String, String] // active executors
+  var shutdownToPod = mutable.Map.empty[String, String] // pending shutdown
+  var executorID = 0
 
   val sparkDriverImage = sc.getConf.get("spark.kubernetes.driver.image")
   val clientJarUri = sc.getConf.get("spark.executor.jar")
   val ns = sc.getConf.get("spark.kubernetes.namespace")
+  val dynamicExecutors = Utils.isDynamicAllocationEnabled(sc.getConf)
+
+  // executor back-ends take their configuration this way
+  if (dynamicExecutors) {
+    sc.getConf.setExecutorEnv("spark.dynamicAllocation.enabled", "true")
+    sc.getConf.setExecutorEnv("spark.shuffle.service.enabled", "true")
+  }
 
-  override def start() {
+  override def start(): Unit = {
     super.start()
-    var i = 0
-    for(i <- 1 to getInitialTargetExecutorNumber(sc.conf)){
-      executorPods += createExecutorPod(i)
-    }
-    None
+    createExecutorPods(getInitialTargetExecutorNumber(sc.getConf))
   }
 
   override def stop(): Unit = {
-    for (i <- 0 to executorPods.length) {
-      client.pods().inNamespace(ns).withName(executorPods(i)).delete()
-    }
+    // Kill all executor pods indiscriminately
+    killExecutorPods(executorToPod.toVector)
+    killExecutorPods(shutdownToPod.toVector)
     super.stop()
   }
 
   // Dynamic allocation interfaces
-  override def doRequestTotalExecutors(requestedTotal: Int): scala.concurrent.Future[Boolean] = {
-    return super.doRequestTotalExecutors(requestedTotal)
+  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+    logInfo(s"Received doRequestTotalExecutors: $requestedTotal")
+    val n = executorToPod.size
+    val delta = requestedTotal - n
+    if (delta > 0) {
+      logInfo(s"Adding $delta new executors")
+      createExecutorPods(delta)
+    } else if (delta < 0) {
+      val d = -delta
+      val idle = executorToPod.toVector.filter { case (id, _) => !scheduler.isExecutorBusy(id) }
+      if (idle.length > 0) {
+        logInfo(s"Shutting down ${idle.length} idle executors")
+        shutdownExecutors(idle.take(d))
+      }
+      val r = math.max(0, d - idle.length)
+      if (r > 0) {
+        logInfo(s"Shutting down $r non-idle executors")
+        shutdownExecutors(executorToPod.toVector.slice(n - r, n))
+      }
+    }
+    // TODO: are there meaningful failure modes here?
+    Future.successful(true)
   }
 
-  override def doKillExecutors(executorIds: Seq[String]): scala.concurrent.Future[Boolean] = {
-    return super.doKillExecutors(executorIds)
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    logInfo(s"Received doKillExecutors")
+    killExecutorPods(executorIds.map { id => (id, executorToPod(id)) })
+    Future.successful(true)
+  }
+
+  private def createExecutorPods(n: Int) {
+    for (i <- 1 to n) {
+      executorID += 1
+      executorToPod += ((executorID.toString, createExecutorPod(executorID)))
+    }
+  }
+
+  def shutdownExecutors(idPodPairs: Seq[(String, String)]) {
+    val active = getExecutorIds.toSet
+
+    // Check for any finished shutting down and kill the pods
+    val shutdown = shutdownToPod.toVector.filter { case (e, _) => !active.contains(e) }
+    killExecutorPods(shutdown)
+
+    // Now request shutdown for the new ones.
+    // Move them from executor list to list pending shutdown
+    for ((id, pod) <- idPodPairs) {
+      try {
+        // TODO: 'ask' returns a future - can it be used to check eventual success?
+        Option(driverEndpoint).foreach(_.ask[Boolean](RemoveExecutor(id, ExecutorKilled)))
+        executorToPod -= id
+        shutdownToPod += ((id, pod))
+      } catch {
+        case e: Exception => logError(s"Error shutting down executor $id", e)
+      }
+    }
+  }
+
+  private def killExecutorPods(idPodPairs: Seq[(String, String)]) {
+    for ((id, pod) <- idPodPairs) {
+      try {
+        client.pods().inNamespace(ns).withName(pod).delete()
+        executorToPod -= id
+        shutdownToPod -= id
+      } catch {
+        case e: Exception => logError(s"Error killing executor pod $pod", e)
+      }
+    }
   }
 
   def getInitialTargetExecutorNumber(conf: SparkConf, numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
-    if (Utils.isDynamicAllocationEnabled(conf)) {
+    if (dynamicExecutors) {
       val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
       val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
@@ -99,6 +172,21 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // create a single k8s executor pod.
     val labelMap = Map("type" -> "spark-executor")
     val podName = s"$sparkExecutorName-$executorNum"
+
+    val submitArgs = mutable.ArrayBuffer.empty[String]
+
+    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+      submitArgs ++= Vector(
+        "dynamic-executors")
+    }
+
+    submitArgs ++= Vector("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+      "--driver-url", s"$driverURL",
+      "--executor-id", s"$executorNum",
+      "--hostname", "localhost",
+      "--app-id", "1", // TODO: change app-id per application and pass from driver.
+      "--cores", "1")
+
     var pod = new PodBuilder()
       .withNewMetadata()
      .withLabels(labelMap.asJava)
@@ -110,17 +198,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
       .addNewContainer().withName("spark-executor").withImage(sparkDriverImage)
       .withImagePullPolicy("IfNotPresent")
       .withCommand("/opt/executor.sh")
-      .withArgs("org.apache.spark.executor.CoarseGrainedExecutorBackend",
-        "--driver-url", s"$driverURL",
-        "--executor-id", s"$executorNum",
-        "--hostname", "localhost",
-        "--cores", "1",
-        "--app-id", "1") //TODO: change app-id per application and pass from driver.
+      .withArgs(submitArgs :_*)
       .endContainer()
       .endSpec().build()
 
     client.pods().inNamespace(ns).withName(podName).create(pod)
-    return podName
+
+    podName
  }
 
   protected def driverURL: String = {
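
Note on the downscaling path (not part of the patch above): doRequestTotalExecutors reclaims idle executors first and only falls back to busy ones when the target is still not met. The sketch below isolates that selection logic so it can be read and tested on its own. It is a simplified illustration under assumptions: isBusy stands in for scheduler.isExecutorBusy, the object and method names are invented for the example, and the busy-executor tail is selected with a filter rather than the slice(n - r, n) form used in the patch.

object DownscaleSketch {
  // Mirrors the intent of the downscaling branch in doRequestTotalExecutors:
  // reclaim idle executors first, then busy ones only if the target is still not met.
  // `executors` holds (executor id, pod name) pairs in creation order;
  // `isBusy` stands in for scheduler.isExecutorBusy in the patch.
  def selectForShutdown(
      executors: Vector[(String, String)],
      requestedTotal: Int,
      isBusy: String => Boolean): Vector[(String, String)] = {
    val toRemove = executors.size - requestedTotal
    if (toRemove <= 0) {
      Vector.empty
    } else {
      val idle = executors.filterNot { case (id, _) => isBusy(id) }
      val fromIdle = idle.take(toRemove)
      val stillNeeded = toRemove - fromIdle.size
      // Top up from the busy executors only when every idle one has already been taken.
      fromIdle ++ executors.filter { case (id, _) => isBusy(id) }.takeRight(stillNeeded)
    }
  }

  def main(args: Array[String]): Unit = {
    val pods = Vector("1" -> "exec-pod-1", "2" -> "exec-pod-2",
      "3" -> "exec-pod-3", "4" -> "exec-pod-4")
    val busy = Set("1", "2")
    // Asking for a total of one executor: both idle pods go first, then one busy pod.
    println(selectForShutdown(pods, requestedTotal = 1, busy.contains))
  }
}

Running the example prints Vector((3,exec-pod-3), (4,exec-pod-4), (2,exec-pod-2)): the two idle executors are chosen before any busy one, matching the ordering the patch aims for.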