From 200ce24f1ae2949ba18ff6facb46171ae78f5eee Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 4 Apr 2017 16:02:56 -0700 Subject: [PATCH 01/13] Dispatch tasks to right executors that have tasks' input HDFS data on local disks --- .../spark/scheduler/TaskSetManager.scala | 2 +- .../kubernetes/KubernetesClusterManager.scala | 52 ++++++++++++++-- .../KubernetesClusterSchedulerBackend.scala | 60 +++++++++++++++++-- 3 files changed, 105 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b766e4148e496..30df8862c3589 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -221,7 +221,7 @@ private[spark] class TaskSetManager( * Return the pending tasks list for a given host, or an empty list if * there is no map entry for that host */ - private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = { + protected def getPendingTasksForHost(host: String): ArrayBuffer[Int] = { pendingTasksForHost.getOrElse(host, ArrayBuffer()) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index 36f7149a832c3..16fcf1c029547 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -16,27 +16,71 @@ */ package org.apache.spark.scheduler.cluster.kubernetes +import java.util.concurrent.atomic.AtomicReference + import org.apache.spark.SparkContext -import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} +import org.apache.spark.scheduler._ + +import scala.collection.mutable.ArrayBuffer private[spark] class KubernetesClusterManager extends ExternalClusterManager { + private val EMPTY_TASKS = new ArrayBuffer[Int]() + private var clusterSchedulerBackend : AtomicReference[KubernetesClusterSchedulerBackend] = + new AtomicReference() + override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { - val scheduler = new TaskSchedulerImpl(sc) + val scheduler = new TaskSchedulerImpl(sc) { + + override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { + new TaskSetManager(sched = this, taskSet, maxTaskFailures) { + + // Returns preferred tasks for an executor that may have local data there, + // using the physical cluster node name that it is running on. 
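
Why the pod IP alone misses: Spark seeds pendingTasksForHost from each input split's preferred locations, and for HDFS those are datanode host names and IPs, i.e. cluster-node identities. A minimal sketch of where those strings come from, using the Hadoop filesystem API (the path is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(new Configuration())
    val status = fs.getFileStatus(new Path("/data/part-00000"))  // hypothetical file
    val hosts = fs.getFileBlockLocations(status, 0, status.getLen)
      .flatMap(_.getHosts)  // datanode host names: cluster nodes, never pod IPs

Executors, however, register under their pod IPs, so the override below translates a pod IP into the cluster node name and host IP before retrying the lookup.
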
+ override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = { + var pendingTasks = super.getPendingTasksForHost(executorIP) + if (pendingTasks.nonEmpty) { + return pendingTasks + } + val backend = clusterSchedulerBackend.get + if (backend == null) { + return EMPTY_TASKS + } + val pod = backend.getClusterNodeForExecutorIP(executorIP) + if (pod.isEmpty) { + return EMPTY_TASKS + } + val clusterNodeName = pod.get.getSpec.getNodeName + val clusterNodeIP = pod.get.getStatus.getHostIP + pendingTasks = super.getPendingTasksForHost(pod.get.getSpec.getNodeName) + if (pendingTasks.isEmpty) { + pendingTasks = super.getPendingTasksForHost(pod.get.getStatus.getHostIP) + } + if (pendingTasks.nonEmpty) { + logInfo(s"Got preferred task list $pendingTasks for executor host $executorIP " + + s"using cluster node $clusterNodeName at $clusterNodeIP") + } + pendingTasks + } + } + } + } sc.taskScheduler = scheduler scheduler } override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler) : SchedulerBackend = { - new KubernetesClusterSchedulerBackend(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc) + val schedulerBackend = new KubernetesClusterSchedulerBackend( + sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc) + this.clusterSchedulerBackend.set(schedulerBackend) + schedulerBackend } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) } - } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 234829a541c30..19eaefe5b1076 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -16,12 +16,16 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import java.io.Closeable +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder} +import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} - import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder import org.apache.spark.deploy.kubernetes.config._ @@ -40,6 +44,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private val EXECUTOR_MODIFICATION_LOCK = new Object private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod] + private val executorWatchResources = new scala.collection.mutable.HashMap[String, Closeable] + private val executorPodIPs = new scala.collection.mutable.HashMap[String, Pod] private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) @@ -135,6 +141,8 @@ private[spark] class KubernetesClusterSchedulerBackend( // indication as to why. 
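
The watches this patch registers per executor (in doRequestTotalExecutors below) each hold a websocket open; fabric8's watch() hands back a Watch, which is a Closeable, so stop() has to close every one. A sketch of the pattern, assuming client is a DefaultKubernetesClient and the pod name is made up:

    import java.io.Closeable
    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
    import io.fabric8.kubernetes.client.Watcher.Action

    val watch: Closeable = client.pods()
      .withName("spark-executor-1")  // hypothetical pod name
      .watch(new Watcher[Pod] {
        override def eventReceived(action: Action, pod: Pod): Unit = ()
        override def onClose(cause: KubernetesClientException): Unit = ()
      })
    // later, during shutdown:
    watch.close()
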
try { runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) + executorWatchResources.values.foreach(_.close) + executorPodIPs.clear() } catch { case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) } @@ -144,6 +152,7 @@ private[spark] class KubernetesClusterSchedulerBackend( case e: Throwable => logError("Uncaught exception while shutting down driver service.", e) } try { + logInfo("Closing kubernetes client") kubernetesClient.close() } catch { case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e) @@ -171,7 +180,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val executorCpuQuantity = new QuantityBuilder(false) .withAmount(executorCores) .build() - val requiredEnv = Seq( + var requiredEnv = Seq( (ENV_EXECUTOR_PORT, executorPort.toString), (ENV_DRIVER_URL, driverUrl), (ENV_EXECUTOR_CORES, executorCores), @@ -236,7 +245,16 @@ private[spark] class KubernetesClusterSchedulerBackend( + s" additional executors, expecting total $requestedTotal and currently" + s" expected ${totalExpectedExecutors.get}") for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { - runningExecutorPods += allocateNewExecutorPod() + val (executorId, pod) = allocateNewExecutorPod() + logInfo(s"Allocated executor $executorId") + runningExecutorPods += ((executorId, pod)) + val podReadyFuture = SettableFuture.create[Pod] + val podWatcher = new ExecutorPodReadyWatcher(podReadyFuture) + val watchConnectionManager = kubernetesClient + .pods() + .withName(pod.getMetadata.getName) + .watch(podWatcher) + executorWatchResources += ((executorId, watchConnectionManager)) } } totalExpectedExecutors.set(requestedTotal) @@ -249,12 +267,46 @@ private[spark] class KubernetesClusterSchedulerBackend( for (executor <- executorIds) { runningExecutorPods.remove(executor) match { case Some(pod) => kubernetesClient.pods().delete(pod) + executorWatchResources.remove(executor).foreach(_.close) + executorPodIPs.remove(pod.getStatus.getPodIP) case None => logWarning(s"Unable to remove pod for unknown executor $executor") } } } true } + + def getClusterNodeForExecutorIP(podIP: String): Option[Pod] = { + EXECUTOR_MODIFICATION_LOCK.synchronized { + executorPodIPs.get(podIP) + } + } + + private class ExecutorPodReadyWatcher(resolvedExecutorPod: SettableFuture[Pod]) + extends Watcher[Pod] { + + override def eventReceived(action: Action, pod: Pod): Unit = { + if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running" + && !resolvedExecutorPod.isDone) { + pod.getStatus + .getContainerStatuses + .asScala + .find(status => status.getReady) + .foreach { _ => resolvedExecutorPod.set(pod) } + val podName = pod.getMetadata.getName + val podIP = pod.getStatus.getPodIP + val clusterNodeName = pod.getSpec.getNodeName + logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") + EXECUTOR_MODIFICATION_LOCK.synchronized { + executorPodIPs += ((podIP, pod)) + } + } + } + + override def onClose(cause: KubernetesClientException): Unit = { + logDebug("Executor pod readiness watch closed.", cause) + } + } } private object KubernetesClusterSchedulerBackend { From 7499e3b89a70057c96c752e1abf7b89e257d15d9 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 4 Apr 2017 17:06:08 -0700 Subject: [PATCH 02/13] Fix style issues --- .../cluster/kubernetes/KubernetesClusterManager.scala | 3 ++- .../cluster/kubernetes/KubernetesClusterSchedulerBackend.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index 16fcf1c029547..4d592f758bc47 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -19,7 +19,8 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.util.concurrent.atomic.AtomicReference import org.apache.spark.SparkContext -import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, + TaskScheduler, TaskSchedulerImpl, TaskSet, TaskSetManager} import scala.collection.mutable.ArrayBuffer diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 19eaefe5b1076..7b369e3754135 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -180,7 +180,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val executorCpuQuantity = new QuantityBuilder(false) .withAmount(executorCores) .build() - var requiredEnv = Seq( + val requiredEnv = Seq( (ENV_EXECUTOR_PORT, executorPort.toString), (ENV_DRIVER_URL, driverUrl), (ENV_EXECUTOR_CORES, executorCores), From 66e79d6014933d8a40d12b606aeff0f33aaba4eb Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 4 Apr 2017 18:31:01 -0700 Subject: [PATCH 03/13] Clean up unnecessary fields --- .../kubernetes/KubernetesClusterManager.scala | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index 4d592f758bc47..665e407ddd9f4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -16,20 +16,14 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.util.concurrent.atomic.AtomicReference +import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkContext import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl, TaskSet, TaskSetManager} -import scala.collection.mutable.ArrayBuffer - private[spark] class KubernetesClusterManager extends ExternalClusterManager { - private val EMPTY_TASKS = new ArrayBuffer[Int]() - private var clusterSchedulerBackend : AtomicReference[KubernetesClusterSchedulerBackend] = - new AtomicReference() - override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") override def createTaskScheduler(sc: SparkContext, masterURL: String): 
TaskScheduler = { @@ -45,13 +39,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager { if (pendingTasks.nonEmpty) { return pendingTasks } - val backend = clusterSchedulerBackend.get - if (backend == null) { - return EMPTY_TASKS - } + val backend = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].backend.asInstanceOf[ + KubernetesClusterSchedulerBackend] val pod = backend.getClusterNodeForExecutorIP(executorIP) if (pod.isEmpty) { - return EMPTY_TASKS + return pendingTasks // Empty } val clusterNodeName = pod.get.getSpec.getNodeName val clusterNodeIP = pod.get.getStatus.getHostIP @@ -74,10 +66,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager { override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler) : SchedulerBackend = { - val schedulerBackend = new KubernetesClusterSchedulerBackend( - sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc) - this.clusterSchedulerBackend.set(schedulerBackend) - schedulerBackend + new KubernetesClusterSchedulerBackend(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc) } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { From 46f1140d475a967415a2a63d8a3a4a9d8f68ae13 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 4 Apr 2017 19:02:30 -0700 Subject: [PATCH 04/13] Clean up a misleading method name --- .../kubernetes/KubernetesClusterManager.scala | 2 +- .../KubernetesClusterSchedulerBackend.scala | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index 665e407ddd9f4..c214f04edac72 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -41,7 +41,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager { } val backend = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].backend.asInstanceOf[ KubernetesClusterSchedulerBackend] - val pod = backend.getClusterNodeForExecutorIP(executorIP) + val pod = backend.getExecutorPodByIP(executorIP) if (pod.isEmpty) { return pendingTasks // Empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 7b369e3754135..b168a1735f83f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder @@ -43,9 +44,9 @@ 
private[spark] class KubernetesClusterSchedulerBackend( import KubernetesClusterSchedulerBackend._ private val EXECUTOR_MODIFICATION_LOCK = new Object - private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod] - private val executorWatchResources = new scala.collection.mutable.HashMap[String, Closeable] - private val executorPodIPs = new scala.collection.mutable.HashMap[String, Pod] + private val runningExecutorPods = new HashMap[String, Pod] // Indexed by IDs. + private val executorWatchResources = new HashMap[String, Closeable] // Indexed by IDs. + private val executorPodsByIPs = new HashMap[String, Pod] // Indexed by IP addrs. private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) @@ -142,7 +143,7 @@ private[spark] class KubernetesClusterSchedulerBackend( try { runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) executorWatchResources.values.foreach(_.close) - executorPodIPs.clear() + executorPodsByIPs.clear() } catch { case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) } @@ -268,7 +269,7 @@ private[spark] class KubernetesClusterSchedulerBackend( runningExecutorPods.remove(executor) match { case Some(pod) => kubernetesClient.pods().delete(pod) executorWatchResources.remove(executor).foreach(_.close) - executorPodIPs.remove(pod.getStatus.getPodIP) + executorPodsByIPs.remove(pod.getStatus.getPodIP) case None => logWarning(s"Unable to remove pod for unknown executor $executor") } } @@ -276,9 +277,9 @@ private[spark] class KubernetesClusterSchedulerBackend( true } - def getClusterNodeForExecutorIP(podIP: String): Option[Pod] = { + def getExecutorPodByIP(podIP: String): Option[Pod] = { EXECUTOR_MODIFICATION_LOCK.synchronized { - executorPodIPs.get(podIP) + executorPodsByIPs.get(podIP) } } @@ -298,7 +299,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val clusterNodeName = pod.getSpec.getNodeName logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") EXECUTOR_MODIFICATION_LOCK.synchronized { - executorPodIPs += ((podIP, pod)) + executorPodsByIPs += ((podIP, pod)) } } } From 23d287f9a263814f91dd8567633b0a04f9f86ecb Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Wed, 5 Apr 2017 16:09:18 -0700 Subject: [PATCH 05/13] Address review comments --- .../kubernetes/KubernetesClusterManager.scala | 17 ++++--- .../KubernetesClusterSchedulerBackend.scala | 46 ++++++++----------- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index c214f04edac72..1351c8e93bd11 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -32,8 +32,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager { override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { new TaskSetManager(sched = this, taskSet, maxTaskFailures) { - // Returns preferred tasks for an executor that may have local data there, - // using the physical cluster node name that it is running on. 
+ /** + * Overrides the lookup to use not only the executor pod IP, but also the cluster node + * name and host IP address that the pod is running on. The base class may have populated + * the lookup target map with HDFS datanode locations if this task set reads HDFS data. + * Those datanode locations are based on cluster node names or host IP addresses. Using + * only executor pod IPs may not match them. + */ override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = { var pendingTasks = super.getPendingTasksForHost(executorIP) if (pendingTasks.nonEmpty) { @@ -47,12 +52,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager { } val clusterNodeName = pod.get.getSpec.getNodeName val clusterNodeIP = pod.get.getStatus.getHostIP - pendingTasks = super.getPendingTasksForHost(pod.get.getSpec.getNodeName) + pendingTasks = super.getPendingTasksForHost(clusterNodeName) if (pendingTasks.isEmpty) { - pendingTasks = super.getPendingTasksForHost(pod.get.getStatus.getHostIP) + pendingTasks = super.getPendingTasksForHost(clusterNodeIP) } - if (pendingTasks.nonEmpty) { - logInfo(s"Got preferred task list $pendingTasks for executor host $executorIP " + + if (pendingTasks.nonEmpty && log.isDebugEnabled) { + logDebug(s"Got preferred task list $pendingTasks for executor host $executorIP " + s"using cluster node $clusterNodeName at $clusterNodeIP") } pendingTasks diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index b168a1735f83f..0e150ac5ccaf1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -21,11 +21,11 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder} -import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.JavaConverters._ -import scala.collection.mutable.HashMap +import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder @@ -36,6 +36,7 @@ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{ThreadUtils, Utils} + private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, val sc: SparkContext) @@ -44,9 +45,8 @@ private[spark] class KubernetesClusterSchedulerBackend( import KubernetesClusterSchedulerBackend._ private val EXECUTOR_MODIFICATION_LOCK = new Object - private val runningExecutorPods = new HashMap[String, Pod] // Indexed by IDs. - private val executorWatchResources = new HashMap[String, Closeable] // Indexed by IDs. - private val executorPodsByIPs = new HashMap[String, Pod] // Indexed by IP addrs. 
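
Here the three bookkeeping maps collapse to two: a single application-wide watch (registered in start() just below) replaces the per-executor watches, so the Closeable-per-executor map is no longer needed. The registration reduces to roughly this sketch, where client stands in for the kubernetesClient field and the watcher is the one defined in this patch:

    val appWatch = client.pods()
      .withLabel(SPARK_APP_ID_LABEL, applicationId())  // one selector covers all executors
      .watch(new ExecutorPodsReadyWatcher())
    // held in an AtomicReference[Closeable] and closed once in stop()
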
+ private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs. + private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs. private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) @@ -99,6 +99,7 @@ private[spark] class KubernetesClusterSchedulerBackend( super.minRegisteredRatio } + private val executorWatchResource = new AtomicReference[Closeable] protected var totalExpectedExecutors = new AtomicInteger(0) private val driverUrl = RpcEndpointAddress( @@ -131,6 +132,8 @@ private[spark] class KubernetesClusterSchedulerBackend( override def start(): Unit = { super.start() + executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId()) + .watch(new ExecutorPodsReadyWatcher())) if (!Utils.isDynamicAllocationEnabled(sc.conf)) { doRequestTotalExecutors(initialExecutors) } @@ -141,9 +144,14 @@ private[spark] class KubernetesClusterSchedulerBackend( // When using Utils.tryLogNonFatalError some of the code fails but without any logs or // indication as to why. try { - runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) - executorWatchResources.values.foreach(_.close) - executorPodsByIPs.clear() + EXECUTOR_MODIFICATION_LOCK.synchronized { + runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) + executorPodsByIPs.clear() + } + val resource = executorWatchResource.getAndSet(null) + if (resource != null) { + resource.close() + } } catch { case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) } @@ -249,13 +257,6 @@ private[spark] class KubernetesClusterSchedulerBackend( val (executorId, pod) = allocateNewExecutorPod() logInfo(s"Allocated executor $executorId") runningExecutorPods += ((executorId, pod)) - val podReadyFuture = SettableFuture.create[Pod] - val podWatcher = new ExecutorPodReadyWatcher(podReadyFuture) - val watchConnectionManager = kubernetesClient - .pods() - .withName(pod.getMetadata.getName) - .watch(podWatcher) - executorWatchResources += ((executorId, watchConnectionManager)) } } totalExpectedExecutors.set(requestedTotal) @@ -267,8 +268,8 @@ private[spark] class KubernetesClusterSchedulerBackend( EXECUTOR_MODIFICATION_LOCK.synchronized { for (executor <- executorIds) { runningExecutorPods.remove(executor) match { - case Some(pod) => kubernetesClient.pods().delete(pod) - executorWatchResources.remove(executor).foreach(_.close) + case Some(pod) => + kubernetesClient.pods().delete(pod) executorPodsByIPs.remove(pod.getStatus.getPodIP) case None => logWarning(s"Unable to remove pod for unknown executor $executor") } @@ -283,17 +284,10 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - private class ExecutorPodReadyWatcher(resolvedExecutorPod: SettableFuture[Pod]) - extends Watcher[Pod] { + private class ExecutorPodsReadyWatcher extends Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = { - if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running" - && !resolvedExecutorPod.isDone) { - pod.getStatus - .getContainerStatuses - .asScala - .find(status => status.getReady) - .foreach { _ => resolvedExecutorPod.set(pod) } + if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running") { val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP val clusterNodeName = pod.getSpec.getNodeName From f56f3f96f01f59b992ae9b89be41d0e458122c70 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: 
Wed, 5 Apr 2017 17:52:25 -0700 Subject: [PATCH 06/13] Fix import ordering --- .../kubernetes/KubernetesClusterSchedulerBackend.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index c99c6fd2b2306..1e7570a787cd6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -19,14 +19,15 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, QuantityBuilder} import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder import org.apache.spark.deploy.kubernetes.config._ @@ -36,7 +37,6 @@ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{ThreadUtils, Utils} - private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, val sc: SparkContext) From 177e1ebca922c4755143bba98b89eb95316e64b8 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 6 Apr 2017 17:48:02 -0700 Subject: [PATCH 07/13] Delete executor pods in watcher --- .../KubernetesClusterSchedulerBackend.scala | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 1e7570a787cd6..3194d9c2fd817 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -44,8 +44,10 @@ private[spark] class KubernetesClusterSchedulerBackend( import KubernetesClusterSchedulerBackend._ - private val EXECUTOR_MODIFICATION_LOCK = new Object + private val RUNNING_EXECUTOR_PODS_LOCK = new Object private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs. + + private val EXECUTOR_PODS_BY_IPS_LOCK = new Object private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs. 
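
Splitting EXECUTOR_MODIFICATION_LOCK in two keeps the watch-callback thread, which only writes the IP-keyed map, from waiting behind RPC threads that hold the running-pods lock across Kubernetes API calls. Condensed from the two write paths in this patch:

    // RPC thread (doRequestTotalExecutors): may block on the API server.
    RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      runningExecutorPods += allocateNewExecutorPod()
    }
    // Watch thread (eventReceived): quick in-memory update only.
    EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
      executorPodsByIPs += ((podIP, pod))
    }
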
private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) @@ -133,7 +135,7 @@ private[spark] class KubernetesClusterSchedulerBackend( override def start(): Unit = { super.start() executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId()) - .watch(new ExecutorPodsReadyWatcher())) + .watch(new ExecutorPodsWatcher())) if (!Utils.isDynamicAllocationEnabled(sc.conf)) { doRequestTotalExecutors(initialExecutors) } @@ -144,8 +146,10 @@ private[spark] class KubernetesClusterSchedulerBackend( // When using Utils.tryLogNonFatalError some of the code fails but without any logs or // indication as to why. try { - EXECUTOR_MODIFICATION_LOCK.synchronized { + RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) + } + EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs.clear() } val resource = executorWatchResource.getAndSet(null) @@ -256,15 +260,13 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { - EXECUTOR_MODIFICATION_LOCK.synchronized { + RUNNING_EXECUTOR_PODS_LOCK.synchronized { if (requestedTotal > totalExpectedExecutors.get) { logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" + s" additional executors, expecting total $requestedTotal and currently" + s" expected ${totalExpectedExecutors.get}") for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { - val (executorId, pod) = allocateNewExecutorPod() - logInfo(s"Allocated executor $executorId") - runningExecutorPods += ((executorId, pod)) + runningExecutorPods += allocateNewExecutorPod() } } totalExpectedExecutors.set(requestedTotal) @@ -273,12 +275,10 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { - EXECUTOR_MODIFICATION_LOCK.synchronized { + RUNNING_EXECUTOR_PODS_LOCK.synchronized { for (executor <- executorIds) { runningExecutorPods.remove(executor) match { - case Some(pod) => - kubernetesClient.pods().delete(pod) - executorPodsByIPs.remove(pod.getStatus.getPodIP) + case Some(pod) => kubernetesClient.pods().delete(pod) case None => logWarning(s"Unable to remove pod for unknown executor $executor") } } @@ -287,27 +287,35 @@ private[spark] class KubernetesClusterSchedulerBackend( } def getExecutorPodByIP(podIP: String): Option[Pod] = { - EXECUTOR_MODIFICATION_LOCK.synchronized { + EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs.get(podIP) } } - private class ExecutorPodsReadyWatcher extends Watcher[Pod] { + private class ExecutorPodsWatcher extends Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = { - if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running") { + if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running" + && pod.getMetadata.getDeletionTimestamp == null) { val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP val clusterNodeName = pod.getSpec.getNodeName - logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") - EXECUTOR_MODIFICATION_LOCK.synchronized { + logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.") + EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs += ((podIP, pod)) } + } else if (action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) { + val podName = pod.getMetadata.getName + val podIP = pod.getStatus.getPodIP + 
logDebug(s"Executor pod $podName at IP $podIP was deleted.") + EXECUTOR_PODS_BY_IPS_LOCK.synchronized { + executorPodsByIPs -= podIP + } } } override def onClose(cause: KubernetesClientException): Unit = { - logDebug("Executor pod readiness watch closed.", cause) + logDebug("Executor pod watch closed.", cause) } } } From a94522a575bd986de9a73ff5260d85fe6b525e65 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 6 Apr 2017 22:47:26 -0700 Subject: [PATCH 08/13] Fix the driver hang by unblocking the main thread --- .../kubernetes/KubernetesClientBuilder.scala | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 554ed17ff25c4..7760b33f357d2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -17,11 +17,13 @@ package org.apache.spark.deploy.kubernetes import java.io.File +import java.util.concurrent.{ThreadFactory, ThreadPoolExecutor} import com.google.common.base.Charsets import com.google.common.io.Files +import io.fabric8.kubernetes.client.utils.HttpClientUtils import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} - +import okhttp3.Dispatcher import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ @@ -78,6 +80,25 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St } serviceAccountConfigBuilder } - new DefaultKubernetesClient(configBuilder.build) + val threadPoolExecutor = new Dispatcher().executorService().asInstanceOf[ThreadPoolExecutor] + // Set threads to be daemons in order to allow the driver main thread + // to shut down upon errors. Otherwise the driver will hang indefinitely. + threadPoolExecutor.setThreadFactory(new ThreadFactory { + override def newThread(r: Runnable): Thread = { + val thread = new Thread(r, "spark-on-k8s") + thread.setDaemon(true) + thread + } + }) + // Disable the ping thread that is not daemon, in order to allow + // the driver main thread to shut down upon errors. Otherwise, the driver + // will hang indefinitely. 
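
The underlying rule: a JVM exits only when every non-daemon thread has finished, so OkHttp's default dispatcher threads and the websocket ping thread can keep a failed driver process alive. A self-contained illustration (the thread name is arbitrary):

    val lingering = new Thread(new Runnable {
      override def run(): Unit = Thread.sleep(Long.MaxValue)
    }, "keeps-jvm-alive")
    lingering.setDaemon(true)  // drop this line and main() returning no longer ends the JVM
    lingering.start()
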
+ val config = configBuilder + .withWebsocketPingInterval(0) + .build() + val httpClient = HttpClientUtils.createHttpClient(config).newBuilder() + .dispatcher(new Dispatcher(threadPoolExecutor)) + .build() + new DefaultKubernetesClient(httpClient, config) } } From 4a7738e779ce0291f6d88958ff553af9f338e25b Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 6 Apr 2017 23:06:36 -0700 Subject: [PATCH 09/13] Fix import order --- .../spark/deploy/kubernetes/KubernetesClientBuilder.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 7760b33f357d2..cbc5cdd3e2d56 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -21,9 +21,10 @@ import java.util.concurrent.{ThreadFactory, ThreadPoolExecutor} import com.google.common.base.Charsets import com.google.common.io.Files -import io.fabric8.kubernetes.client.utils.HttpClientUtils import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.utils.HttpClientUtils import okhttp3.Dispatcher + import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ From fef7ebc955ab6ef31749be1eb295ad6da10107cd Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 21 Apr 2017 17:35:50 -0700 Subject: [PATCH 10/13] Clear runningExecutorPods --- .../cluster/kubernetes/KubernetesClusterSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index f5b1b97712221..7f32a6900d587 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -151,6 +151,7 @@ private[spark] class KubernetesClusterSchedulerBackend( try { RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) + runningExecutorPods.clear() } EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs.clear() @@ -299,7 +300,6 @@ private[spark] class KubernetesClusterSchedulerBackend( override def eventReceived(action: Action, pod: Pod): Unit = { if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running" && pod.getMetadata.getDeletionTimestamp == null) { - val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP val clusterNodeName = pod.getSpec.getNodeName logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.") From b3855d651a7f639a25546fd0ca526b0bf5dbdbf7 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 26 Apr 2017 00:35:50 -0400 Subject: [PATCH 11/13] Fix incorrect merge --- .../kubernetes/KubernetesClusterSchedulerBackend.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 0198f6e78c438..def9d390d0192 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -158,11 +158,6 @@ private[spark] class KubernetesClusterSchedulerBackend( } catch { case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) } - try { - kubernetesClient.services().withName(kubernetesDriverServiceName).delete() - } catch { - case e: Throwable => logError("Uncaught exception while shutting down driver service.", e) - } try { logInfo("Closing kubernetes client") kubernetesClient.close() From 7085995a7e6f378b6e153f4b4b899167984c41da Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Wed, 26 Apr 2017 14:58:00 -0700 Subject: [PATCH 12/13] Address review comments --- .../kubernetes/KubernetesClientBuilder.scala | 16 ++--- .../kubernetes/KubernetesClusterManager.scala | 45 +------------ .../KubernetesClusterSchedulerBackend.scala | 11 ++-- .../KubernetesTaskSchedulerImpl.scala | 27 ++++++++ .../kubernetes/KubernetesTaskSetManager.scala | 63 +++++++++++++++++++ 5 files changed, 104 insertions(+), 58 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala index afbe98d94077d..d73f38924571f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala @@ -24,10 +24,10 @@ import com.google.common.io.Files import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} import io.fabric8.kubernetes.client.utils.HttpClientUtils import okhttp3.Dispatcher - import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.util.ThreadUtils private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: String) { private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) @@ -81,16 +81,6 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St } serviceAccountConfigBuilder } - val threadPoolExecutor = new Dispatcher().executorService().asInstanceOf[ThreadPoolExecutor] - // Set threads to be daemons in order to allow the driver main thread - // to shut down upon errors. Otherwise the driver will hang indefinitely. 
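
The hand-rolled ThreadFactory removed here gives way to Spark's own utility, which builds an equivalent daemon-threaded pool; the whole construction shrinks to roughly this sketch:

    import okhttp3.Dispatcher
    import org.apache.spark.util.ThreadUtils

    // newDaemonCachedThreadPool(prefix) returns a cached ThreadPoolExecutor
    // whose threads are daemons, so they cannot pin the driver JVM open.
    val dispatcher = new Dispatcher(ThreadUtils.newDaemonCachedThreadPool("spark-on-k8s"))
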
- threadPoolExecutor.setThreadFactory(new ThreadFactory { - override def newThread(r: Runnable): Thread = { - val thread = new Thread(r, "spark-on-k8s") - thread.setDaemon(true) - thread - } - }) // Disable the ping thread that is not daemon, in order to allow // the driver main thread to shut down upon errors. Otherwise, the driver // will hang indefinitely. @@ -98,7 +88,9 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St .withWebsocketPingInterval(0) .build() val httpClient = HttpClientUtils.createHttpClient(config).newBuilder() - .dispatcher(new Dispatcher(threadPoolExecutor)) + // Use a Dispatcher with a custom executor service that creates daemon threads. The default + // executor service used by Dispatcher creates non-daemon threads. + .dispatcher(new Dispatcher(ThreadUtils.newDaemonCachedThreadPool("spark-on-k8s"))) .build() new DefaultKubernetesClient(httpClient, config) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index 1351c8e93bd11..b745e73575839 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -16,55 +16,16 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.SparkContext -import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, - TaskScheduler, TaskSchedulerImpl, TaskSet, TaskSetManager} +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, + TaskSchedulerImpl} private[spark] class KubernetesClusterManager extends ExternalClusterManager { override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { - val scheduler = new TaskSchedulerImpl(sc) { - - override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(sched = this, taskSet, maxTaskFailures) { - - /** - * Overrides the lookup to use not only the executor pod IP, but also the cluster node - * name and host IP address that the pod is running on. The base class may have populated - * the lookup target map with HDFS datanode locations if this task set reads HDFS data. - * Those datanode locations are based on cluster node names or host IP addresses. Using - * only executor pod IPs may not match them. 
- */ - override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = { - var pendingTasks = super.getPendingTasksForHost(executorIP) - if (pendingTasks.nonEmpty) { - return pendingTasks - } - val backend = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].backend.asInstanceOf[ - KubernetesClusterSchedulerBackend] - val pod = backend.getExecutorPodByIP(executorIP) - if (pod.isEmpty) { - return pendingTasks // Empty - } - val clusterNodeName = pod.get.getSpec.getNodeName - val clusterNodeIP = pod.get.getStatus.getHostIP - pendingTasks = super.getPendingTasksForHost(clusterNodeName) - if (pendingTasks.isEmpty) { - pendingTasks = super.getPendingTasksForHost(clusterNodeIP) - } - if (pendingTasks.nonEmpty && log.isDebugEnabled) { - logDebug(s"Got preferred task list $pendingTasks for executor host $executorIP " + - s"using cluster node $clusterNodeName at $clusterNodeIP") - } - pendingTasks - } - } - } - } + val scheduler = new KubernetesTaskSchedulerImpl(sc) sc.taskScheduler = scheduler scheduler } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 7f32a6900d587..9be34097126de 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -306,12 +306,15 @@ private[spark] class KubernetesClusterSchedulerBackend( EXECUTOR_PODS_BY_IPS_LOCK.synchronized { executorPodsByIPs += ((podIP, pod)) } - } else if (action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) { + } else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) || + action == Action.DELETED || action == Action.ERROR) { val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP - logDebug(s"Executor pod $podName at IP $podIP was deleted.") - EXECUTOR_PODS_BY_IPS_LOCK.synchronized { - executorPodsByIPs -= podIP + logDebug(s"Executor pod $podName at IP $podIP was at $action.") + if (podIP != null) { + EXECUTOR_PODS_BY_IPS_LOCK.synchronized { + executorPodsByIPs -= podIP + } } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala new file mode 100644 index 0000000000000..a5e126480b83d --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager} + +private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) { + + override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { + new KubernetesTaskSetManager(this, taskSet, maxTaskFailures) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala new file mode 100644 index 0000000000000..5cea95be382f0 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager} + +private[spark] class KubernetesTaskSetManager( + sched: TaskSchedulerImpl, + taskSet: TaskSet, + maxTaskFailures: Int) extends TaskSetManager(sched, taskSet, maxTaskFailures) { + + /** + * Overrides the lookup to use not only the executor pod IP, but also the cluster node + * name and host IP address that the pod is running on. The base class may have populated + * the lookup target map with HDFS datanode locations if this task set reads HDFS data. + * Those datanode locations are based on cluster node names or host IP addresses. Using + * only executor pod IPs may not match them. 
+ */ + override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = { + val pendingTasksExecutorIP = super.getPendingTasksForHost(executorIP) + if (pendingTasksExecutorIP.nonEmpty) { + pendingTasksExecutorIP + } else { + val backend = sched.backend.asInstanceOf[KubernetesClusterSchedulerBackend] + val pod = backend.getExecutorPodByIP(executorIP) + if (pod.nonEmpty) { + val clusterNodeName = pod.get.getSpec.getNodeName + val pendingTasksClusterNodeName = super.getPendingTasksForHost(clusterNodeName) + if (pendingTasksClusterNodeName.nonEmpty) { + logDebug(s"Got preferred task list $pendingTasksClusterNodeName for executor host " + + s"$executorIP using cluster node name $clusterNodeName") + pendingTasksClusterNodeName + } else { + val clusterNodeIP = pod.get.getStatus.getHostIP + val pendingTasksClusterNodeIP = super.getPendingTasksForHost(clusterNodeIP) + if (pendingTasksClusterNodeIP.nonEmpty) { + logDebug(s"Got preferred task list $pendingTasksClusterNodeIP for executor host " + + s"$executorIP using cluster node IP $clusterNodeIP") + } + pendingTasksClusterNodeIP + } + } else { + pendingTasksExecutorIP // Empty + } + } + } +} From dc0755a2b8f6e41ac58a948d5c6b2d87613392f8 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Wed, 26 Apr 2017 15:04:45 -0700 Subject: [PATCH 13/13] Clean up imports --- .../scheduler/cluster/kubernetes/KubernetesClientBuilder.scala | 2 +- .../cluster/kubernetes/KubernetesClusterManager.scala | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala index d73f38924571f..31c6eda77d058 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala @@ -17,13 +17,13 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.File -import java.util.concurrent.{ThreadFactory, ThreadPoolExecutor} import com.google.common.base.Charsets import com.google.common.io.Files import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} import io.fabric8.kubernetes.client.utils.HttpClientUtils import okhttp3.Dispatcher + import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index b745e73575839..70098f1f46ac0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -17,8 +17,7 @@ package org.apache.spark.scheduler.cluster.kubernetes import org.apache.spark.SparkContext -import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, - TaskSchedulerImpl} +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} private[spark] class 
KubernetesClusterManager extends ExternalClusterManager {
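
Taken together, the series leaves locality resolution working roughly as follows at runtime (a sketch composed from the final classes; backend is the KubernetesClusterSchedulerBackend instance, and the IP and node values are made up):

    // 1. ExecutorPodsWatcher records the pod once it reaches Running:
    //      executorPodsByIPs: "10.0.3.7" -> pod  (nodeName "node-1", hostIP "192.168.1.5")
    // 2. KubernetesTaskSetManager.getPendingTasksForHost("10.0.3.7") misses on the
    //    pod IP, resolves the pod, and retries with the identities HDFS locality used:
    val pod = backend.getExecutorPodByIP("10.0.3.7")
    val fallbackKeys = Seq.empty[String] ++
      pod.map(_.getSpec.getNodeName) ++   // "node-1"
      pod.map(_.getStatus.getHostIP)      // "192.168.1.5"
    // The first key that yields a non-empty pending-task list wins.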