diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b766e4148e496..30df8862c3589 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -221,7 +221,7 @@ private[spark] class TaskSetManager(
    * Return the pending tasks list for a given host, or an empty list if
    * there is no map entry for that host
    */
-  private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
+  protected def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
     pendingTasksForHost.getOrElse(host, ArrayBuffer())
   }
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala
index 6725992aae978..31c6eda77d058 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala
@@ -21,10 +21,13 @@ import java.io.File
 import com.google.common.base.Charsets
 import com.google.common.io.Files
 import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient}
+import io.fabric8.kubernetes.client.utils.HttpClientUtils
+import okhttp3.Dispatcher
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.util.ThreadUtils
 
 private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: String) {
   private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)
@@ -78,6 +81,17 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St
       }
       serviceAccountConfigBuilder
     }
-    new DefaultKubernetesClient(configBuilder.build)
+    // Disable the websocket ping thread, which is not a daemon thread, so that
+    // the driver main thread can shut down upon errors. Otherwise, the driver
+    // will hang indefinitely.
+    val config = configBuilder
+      .withWebsocketPingInterval(0)
+      .build()
+    val httpClient = HttpClientUtils.createHttpClient(config).newBuilder()
+      // Use a Dispatcher with a custom executor service that creates daemon threads. The default
+      // executor service used by Dispatcher creates non-daemon threads.
+      .dispatcher(new Dispatcher(ThreadUtils.newDaemonCachedThreadPool("spark-on-k8s")))
+      .build()
+    new DefaultKubernetesClient(httpClient, config)
   }
 }
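The fix above works because any live non-daemon thread keeps a JVM from exiting; OkHttp's default Dispatcher executor creates non-daemon threads, so the client has to be handed a daemon-backed pool explicitly. A minimal, self-contained sketch of the same pattern, assuming only okhttp3 and the JDK (DaemonDispatcherSketch and its pool builder are hypothetical stand-ins for Spark's ThreadUtils.newDaemonCachedThreadPool):

```scala
import java.util.concurrent.{SynchronousQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import okhttp3.{Dispatcher, OkHttpClient}

object DaemonDispatcherSketch {

  // Hypothetical stand-in for ThreadUtils.newDaemonCachedThreadPool: a cached
  // pool whose threads are marked daemon, so they never block JVM exit.
  private def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
    val factory = new ThreadFactory {
      private val count = new AtomicInteger(0)
      override def newThread(r: Runnable): Thread = {
        val thread = new Thread(r, s"$prefix-${count.incrementAndGet()}")
        thread.setDaemon(true) // the crucial bit
        thread
      }
    }
    new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
      new SynchronousQueue[Runnable], factory)
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the change above: swap in a Dispatcher backed by daemon threads.
    val client = new OkHttpClient.Builder()
      .dispatcher(new Dispatcher(newDaemonCachedThreadPool("daemon-http")))
      .build()
    println(s"Dispatcher executor: ${client.dispatcher().executorService()}")
    // main() can now return and the JVM exits even while pooled threads sit idle.
  }
}
```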
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala
index 36f7149a832c3..70098f1f46ac0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala
@@ -24,7 +24,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager {
   override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
 
   override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-    val scheduler = new TaskSchedulerImpl(sc)
+    val scheduler = new KubernetesTaskSchedulerImpl(sc)
     sc.taskScheduler = scheduler
     scheduler
   }
@@ -37,6 +37,5 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager {
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
   }
-
 }
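For context on how canCreate is reached at all: Spark locates ExternalClusterManager implementations through java.util.ServiceLoader when it parses the master URL. A hedged sketch of that resolution (the object name and master URL are made up; it sits in a Spark package only because ExternalClusterManager is private[spark]):

```scala
package org.apache.spark.scheduler

import java.util.ServiceLoader

import scala.collection.JavaConverters._

object ClusterManagerLookupSketch {

  // The jar must also ship a ServiceLoader registration file named
  //   META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
  // whose single line is the implementation's fully qualified class name:
  //   org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterManager
  def main(args: Array[String]): Unit = {
    val masterURL = "k8s://https://kubernetes.default.svc" // illustrative URL
    // Roughly what SparkContext does when resolving the master URL:
    val manager = ServiceLoader.load(classOf[ExternalClusterManager]).asScala
      .find(_.canCreate(masterURL))
    println(manager.map(_.getClass.getName).getOrElse("no external cluster manager"))
  }
}
```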
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 15457db7e1459..a2294a6766980 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -16,13 +16,18 @@
  */
 package org.apache.spark.scheduler.cluster.kubernetes
 
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import java.io.Closeable
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
-import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder,
-  EnvVarSourceBuilder, Pod, QuantityBuilder}
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
 
+import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder,
+  EnvVarSourceBuilder, Pod, QuantityBuilder}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
@@ -38,8 +43,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   import KubernetesClusterSchedulerBackend._
 
-  private val EXECUTOR_MODIFICATION_LOCK = new Object
-  private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.
+
+  private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
+  private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs.
 
   private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
   private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
@@ -87,6 +95,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       super.minRegisteredRatio
     }
 
+  private val executorWatchResource = new AtomicReference[Closeable]
   protected var totalExpectedExecutors = new AtomicInteger(0)
 
   private val driverUrl = RpcEndpointAddress(
@@ -119,6 +128,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   override def start(): Unit = {
     super.start()
+    executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId())
+      .watch(new ExecutorPodsWatcher()))
     if (!Utils.isDynamicAllocationEnabled(sc.conf)) {
       doRequestTotalExecutors(initialExecutors)
     }
@@ -133,11 +144,22 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // When using Utils.tryLogNonFatalError some of the code fails but without any logs or
     // indication as to why.
     try {
-      runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
+        runningExecutorPods.clear()
+      }
+      EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
+        executorPodsByIPs.clear()
+      }
+      val resource = executorWatchResource.getAndSet(null)
+      if (resource != null) {
+        resource.close()
+      }
     } catch {
       case e: Throwable => logError("Uncaught exception while shutting down controllers.", e)
     }
     try {
+      logInfo("Closing kubernetes client")
       kubernetesClient.close()
     } catch {
       case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e)
@@ -231,7 +253,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
-    EXECUTOR_MODIFICATION_LOCK.synchronized {
+    RUNNING_EXECUTOR_PODS_LOCK.synchronized {
       if (requestedTotal > totalExpectedExecutors.get) {
         logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" +
           s" additional executors, expecting total $requestedTotal and currently" +
@@ -246,7 +268,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
-    EXECUTOR_MODIFICATION_LOCK.synchronized {
+    RUNNING_EXECUTOR_PODS_LOCK.synchronized {
       for (executor <- executorIds) {
         runningExecutorPods.remove(executor) match {
           case Some(pod) => kubernetesClient.pods().delete(pod)
@@ -256,6 +278,41 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
     true
   }
+
+  def getExecutorPodByIP(podIP: String): Option[Pod] = {
+    EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
+      executorPodsByIPs.get(podIP)
+    }
+  }
+
+  private class ExecutorPodsWatcher extends Watcher[Pod] {
+
+    override def eventReceived(action: Action, pod: Pod): Unit = {
+      if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
+          && pod.getMetadata.getDeletionTimestamp == null) {
+        val podIP = pod.getStatus.getPodIP
+        val clusterNodeName = pod.getSpec.getNodeName
+        logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
+        EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
+          executorPodsByIPs += ((podIP, pod))
+        }
+      } else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) ||
+          action == Action.DELETED || action == Action.ERROR) {
+        val podName = pod.getMetadata.getName
+        val podIP = pod.getStatus.getPodIP
+        logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+        if (podIP != null) {
+          EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
+            executorPodsByIPs -= podIP
+          }
+        }
+      }
+    }
+
+    override def onClose(cause: KubernetesClientException): Unit = {
+      logDebug("Executor pod watch closed.", cause)
+    }
+  }
 }
 
 private object KubernetesClusterSchedulerBackend {
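To see what the new ExecutorPodsWatcher reacts to, here is a test-style sketch built with fabric8's PodBuilder (pod name, node name, and addresses are made up; the local map stands in for executorPodsByIPs):

```scala
import scala.collection.mutable

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

object ExecutorPodsWatcherSketch {
  def main(args: Array[String]): Unit = {
    // Shape of a MODIFIED event once the executor pod reaches the Running phase.
    val runningPod: Pod = new PodBuilder()
      .withNewMetadata().withName("spark-executor-1").endMetadata()
      .withNewSpec().withNodeName("kube-node-a").endSpec()
      .withNewStatus()
        .withPhase("Running")
        .withPodIP("10.4.0.7")
        .withHostIP("192.168.0.12")
        .endStatus()
      .build()

    val executorPodsByIPs = mutable.HashMap.empty[String, Pod]

    // MODIFIED + Running + no deletion timestamp: index the pod by its IP.
    executorPodsByIPs += (runningPod.getStatus.getPodIP -> runningPod)

    // DELETED, ERROR, or MODIFIED carrying a deletion timestamp: drop the
    // entry, so stale IPs are never handed to the task scheduler.
    executorPodsByIPs -= runningPod.getStatus.getPodIP
  }
}
```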
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala
new file mode 100644
index 0000000000000..a5e126480b83d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.kubernetes
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}
+
+private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+
+  override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
+    new KubernetesTaskSetManager(this, taskSet, maxTaskFailures)
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala
new file mode 100644
index 0000000000000..5cea95be382f0
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.kubernetes
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}
+
+private[spark] class KubernetesTaskSetManager(
+    sched: TaskSchedulerImpl,
+    taskSet: TaskSet,
+    maxTaskFailures: Int) extends TaskSetManager(sched, taskSet, maxTaskFailures) {
+
+  /**
+   * Overrides the lookup to use not only the executor pod IP, but also the cluster node
+   * name and host IP address that the pod is running on. The base class may have populated
+   * the lookup target map with HDFS datanode locations if this task set reads HDFS data.
+   * Those datanode locations are keyed by cluster node names or host IP addresses, so a
+   * lookup by executor pod IP alone may fail to match them.
+   */
+  override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = {
+    val pendingTasksExecutorIP = super.getPendingTasksForHost(executorIP)
+    if (pendingTasksExecutorIP.nonEmpty) {
+      pendingTasksExecutorIP
+    } else {
+      val backend = sched.backend.asInstanceOf[KubernetesClusterSchedulerBackend]
+      val pod = backend.getExecutorPodByIP(executorIP)
+      if (pod.nonEmpty) {
+        val clusterNodeName = pod.get.getSpec.getNodeName
+        val pendingTasksClusterNodeName = super.getPendingTasksForHost(clusterNodeName)
+        if (pendingTasksClusterNodeName.nonEmpty) {
+          logDebug(s"Got preferred task list $pendingTasksClusterNodeName for executor host " +
+            s"$executorIP using cluster node name $clusterNodeName")
+          pendingTasksClusterNodeName
+        } else {
+          val clusterNodeIP = pod.get.getStatus.getHostIP
+          val pendingTasksClusterNodeIP = super.getPendingTasksForHost(clusterNodeIP)
+          if (pendingTasksClusterNodeIP.nonEmpty) {
+            logDebug(s"Got preferred task list $pendingTasksClusterNodeIP for executor host " +
+              s"$executorIP using cluster node IP $clusterNodeIP")
+          }
+          pendingTasksClusterNodeIP
+        }
+      } else {
+        pendingTasksExecutorIP // Empty
+      }
+    }
+  }
+}
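To make the three-level fallback concrete, a small self-contained sketch with made-up addresses (a plain Map stands in for the base class's pending-task index; the real method returns the first non-empty list in the same order):

```scala
object LocalityFallbackSketch {
  def main(args: Array[String]): Unit = {
    // HDFS-derived preferred locations are keyed by cluster node name or host
    // IP, never by pod IP.
    val pendingTasksForHost = Map(
      "kube-node-a" -> Seq(0, 1), // datanode host name
      "192.168.0.12" -> Seq(2))   // datanode host IP

    val executorIP = "10.4.0.7"   // how the executor registered itself
    val nodeName = "kube-node-a"  // from pod.getSpec.getNodeName
    val hostIP = "192.168.0.12"   // from pod.getStatus.getHostIP

    // Same order as KubernetesTaskSetManager.getPendingTasksForHost:
    val tasks = pendingTasksForHost.get(executorIP)  // 1. pod IP: misses
      .orElse(pendingTasksForHost.get(nodeName))     // 2. node name: hits
      .orElse(pendingTasksForHost.get(hostIP))       // 3. host IP: last chance
      .getOrElse(Seq.empty)
    println(s"node-local tasks for $executorIP: $tasks") // prints List(0, 1)
  }
}
```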