From a5e579d26a25a998cad878355194d093337ed04c Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 23 Aug 2017 17:08:17 -0700 Subject: [PATCH 1/6] Start unit tests for the scheduler backend. --- .../kubernetes/KubernetesClusterManager.scala | 11 +- .../KubernetesClusterSchedulerBackend.scala | 31 ++-- ...bernetesClusterSchedulerBackendSuite.scala | 136 ++++++++++++++++++ 3 files changed, 157 insertions(+), 21 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index df2d94ec85216..a4f9dd2f2f9b9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { @@ -132,12 +132,15 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit executorInitContainerBootstrap, executorInitContainerSecretVolumePlugin, kubernetesShuffleManager) + val allocatorExecutor = ThreadUtils + .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") new KubernetesClusterSchedulerBackend( - sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], - sc, + scheduler.asInstanceOf[TaskSchedulerImpl], + sc.env.rpcEnv, executorPodFactory, kubernetesShuffleManager, - kubernetesClient) + kubernetesClient, + allocatorExecutor) } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 1593a9a842e98..f537f69c69ff8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable import java.net.InetAddress import java.util.Collections -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} import io.fabric8.kubernetes.api.model._ @@ -29,7 +29,7 @@ import scala.collection.{concurrent, mutable} import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} -import org.apache.spark.{SparkContext, SparkEnv, 
SparkException} +import org.apache.spark.{SparkEnv, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv} @@ -40,11 +40,12 @@ import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, - val sc: SparkContext, + rpcEnv: RpcEnv, executorPodFactory: ExecutorPodFactory, shuffleManager: Option[KubernetesExternalShuffleManager], - kubernetesClient: KubernetesClient) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { + kubernetesClient: KubernetesClient, + allocatorExecutor: ScheduledExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { import KubernetesClusterSchedulerBackend._ @@ -68,7 +69,6 @@ private[spark] class KubernetesClusterSchedulerBackend( .get(KUBERNETES_DRIVER_POD_NAME) .getOrElse( throw new SparkException("Must specify the driver pod name")) - private val executorPodNamePrefix = conf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) @@ -93,9 +93,9 @@ private[spark] class KubernetesClusterSchedulerBackend( protected var totalExpectedExecutors = new AtomicInteger(0) private val driverUrl = RpcEndpointAddress( - sc.getConf.get("spark.driver.host"), - sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), - CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + conf.get("spark.driver.host"), + conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString private val initialExecutors = getInitialTargetExecutorNumber() @@ -109,9 +109,6 @@ private[spark] class KubernetesClusterSchedulerBackend( s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " + s"is ${podAllocationSize}, should be a positive integer") - private val allocator = ThreadUtils - .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") - private val allocatorRunnable: Runnable = new Runnable { // Number of times we are allowed check for the loss reason for an executor before we give up @@ -214,18 +211,18 @@ private[spark] class KubernetesClusterSchedulerBackend( .withLabel(SPARK_APP_ID_LABEL, applicationId()) .watch(new ExecutorPodsWatcher())) - allocator.scheduleWithFixedDelay( - allocatorRunnable, 0, podAllocationInterval, TimeUnit.SECONDS) + allocatorExecutor.scheduleWithFixedDelay( + allocatorRunnable, 0, podAllocationInterval, TimeUnit.SECONDS) shuffleManager.foreach(_.start(applicationId())) - if (!Utils.isDynamicAllocationEnabled(sc.conf)) { + if (!Utils.isDynamicAllocationEnabled(conf)) { doRequestTotalExecutors(initialExecutors) } } override def stop(): Unit = { // stop allocation of new resources and caches. 
- allocator.shutdown() + allocatorExecutor.shutdown() shuffleManager.foreach(_.stop()) // send stop message to executors so they shut down cleanly @@ -298,7 +295,7 @@ private[spark] class KubernetesClusterSchedulerBackend( executorId, applicationId(), driverUrl, - sc.conf.getExecutorEnv, + conf.getExecutorEnv, driverPod, nodeToLocalTaskCount) try { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..127cc16c219c5 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import java.util.concurrent.ScheduledExecutorService + +import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList} +import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher} +import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource, Resource} +import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.Mockito.{verify, when} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.rpc.RpcEnv +import org.apache.spark.scheduler.TaskSchedulerImpl + +private[spark] class KubernetesClusterSchedulerBackendSuite + extends SparkFunSuite with BeforeAndAfter { + + private val APP_ID = "test-spark-app" + private val DRIVER_POD_NAME = "spark-driver-pod" + private val NAMESPACE = "test-namespace" + private val SPARK_DRIVER_HOST = "localhost" + private val SPARK_DRIVER_PORT = 7077 + + private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] + private type LABELLED_PODS = FilterWatchListDeletable[ + Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]] + private type IN_NAMESPACE_PODS = NonNamespaceOperation[ + Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] + + @Mock + private var sparkContext: SparkContext = _ + + @Mock + private var taskSchedulerImpl: TaskSchedulerImpl = _ + + @Mock + private var allocatorExecutor: ScheduledExecutorService = _ + + @Mock + private var executorPodFactory: ExecutorPodFactory = _ + + @Mock + private var shuffleManager: KubernetesExternalShuffleManager = _ + + @Mock + private var kubernetesClient: KubernetesClient = _ + + 
@Mock + private var podOperations: PODS = _ + + @Mock + private var podsWithLabelOperations: LABELLED_PODS = _ + + @Mock + private var podsInNamespace: IN_NAMESPACE_PODS = _ + + @Mock + private var podsWithDriverName: PodResource[Pod, DoneablePod] = _ + + @Mock + private var rpcEnv: RpcEnv = _ + + @Mock + private var executorPodsWatch: Watch = _ + + private var sparkConf: SparkConf = _ + private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _ + + private val driverPod = new PodBuilder() + .withNewMetadata() + .withName(DRIVER_POD_NAME) + .addToLabels(SPARK_APP_ID_LABEL, APP_ID) + .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE) + .endMetadata() + .build() + + before { + MockitoAnnotations.initMocks(this) + sparkConf = new SparkConf() + .set("spark.app.id", APP_ID) + .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME) + .set(KUBERNETES_NAMESPACE, NAMESPACE) + .set("spark.driver.host", SPARK_DRIVER_HOST) + .set("spark.driver.port", SPARK_DRIVER_PORT.toString) + executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]]) + when(sparkContext.conf).thenReturn(sparkConf) + when(taskSchedulerImpl.sc).thenReturn(sparkContext) + when(kubernetesClient.pods()).thenReturn(podOperations) + when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations) + when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture())) + .thenReturn(executorPodsWatch) + when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace) + when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName) + when(podsWithDriverName.get()).thenReturn(driverPod) + } + + test("Basic lifecycle expectations when starting and stopping the scheduler.") { + val scheduler = newSchedulerBackend(true) + scheduler.start() + verify(shuffleManager).start(APP_ID) + assert(executorPodsWatcherArgument.getValue != null) + scheduler.stop() + verify(shuffleManager).stop() + verify(executorPodsWatch).close() + } + + private def newSchedulerBackend(externalShuffle: Boolean): KubernetesClusterSchedulerBackend = { + new KubernetesClusterSchedulerBackend( + taskSchedulerImpl, + rpcEnv, + executorPodFactory, + if (externalShuffle) Some(shuffleManager) else None, + kubernetesClient, + allocatorExecutor) + } + +} From 5caa5197f61ba782937222d233089b799fde83dc Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 24 Aug 2017 16:24:38 -0700 Subject: [PATCH 2/6] More tests for the scheduler backend. 
--- .../kubernetes/KubernetesClusterManager.scala | 5 +- .../KubernetesClusterSchedulerBackend.scala | 27 +-- ...bernetesClusterSchedulerBackendSuite.scala | 181 +++++++++++++++++- 3 files changed, 191 insertions(+), 22 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index a4f9dd2f2f9b9..165dd04ad7e4f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -134,13 +134,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit kubernetesShuffleManager) val allocatorExecutor = ThreadUtils .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") + val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( + "kubernetes-request-executors") new KubernetesClusterSchedulerBackend( scheduler.asInstanceOf[TaskSchedulerImpl], sc.env.rpcEnv, executorPodFactory, kubernetesShuffleManager, kubernetesClient, - allocatorExecutor) + allocatorExecutor, + requestExecutorsService) } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index f537f69c69ff8..c08690cd8db6c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable import java.net.InetAddress import java.util.Collections -import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} import io.fabric8.kubernetes.api.model._ @@ -36,7 +36,7 @@ import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, Rpc import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.Utils private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, @@ -44,11 +44,13 @@ private[spark] class KubernetesClusterSchedulerBackend( executorPodFactory: ExecutorPodFactory, shuffleManager: Option[KubernetesExternalShuffleManager], kubernetesClient: KubernetesClient, - allocatorExecutor: ScheduledExecutorService) + allocatorExecutor: ScheduledExecutorService, + requestExecutorsService: ExecutorService) extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { import KubernetesClusterSchedulerBackend._ + private val 
EXECUTOR_ID_COUNTER = new AtomicLong(0L) private val RUNNING_EXECUTOR_PODS_LOCK = new Object // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK. private val runningExecutorsToPods = new mutable.HashMap[String, Pod] @@ -71,7 +73,7 @@ private[spark] class KubernetesClusterSchedulerBackend( throw new SparkException("Must specify the driver pod name")) private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) + requestExecutorsService) private val driverPod = try { kubernetesClient.pods().inNamespace(kubernetesNamespace). @@ -212,7 +214,7 @@ private[spark] class KubernetesClusterSchedulerBackend( .watch(new ExecutorPodsWatcher())) allocatorExecutor.scheduleWithFixedDelay( - allocatorRunnable, 0, podAllocationInterval, TimeUnit.SECONDS) + allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS) shuffleManager.foreach(_.start(applicationId())) if (!Utils.isDynamicAllocationEnabled(conf)) { @@ -315,11 +317,13 @@ private[spark] class KubernetesClusterSchedulerBackend( override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { RUNNING_EXECUTOR_PODS_LOCK.synchronized { for (executor <- executorIds) { - runningExecutorsToPods.remove(executor) match { - case Some(pod) => - kubernetesClient.pods().delete(pod) - runningPodsToExecutors.remove(pod.getMetadata.getName) - case None => logWarning(s"Unable to remove pod for unknown executor $executor") + val maybeRemovedExecutor = runningExecutorsToPods.remove(executor) + maybeRemovedExecutor.foreach { executorPod => + kubernetesClient.pods().delete(executorPod) + runningPodsToExecutors.remove(executorPod.getMetadata.getName) + } + if (maybeRemovedExecutor.isEmpty) { + logWarning(s"Unable to remove pod for unknown executor $executor") } } } @@ -445,7 +449,7 @@ private[spark] class KubernetesClusterSchedulerBackend( new PartialFunction[Any, Unit]() { override def isDefinedAt(msg: Any): Boolean = { msg match { - case RetrieveSparkAppConfig(executorId) => + case RetrieveSparkAppConfig(_) => shuffleManager.isDefined case _ => false } @@ -475,7 +479,6 @@ private[spark] class KubernetesClusterSchedulerBackend( private object KubernetesClusterSchedulerBackend { private val DEFAULT_STATIC_PORT = 10000 - private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) private val VMEM_EXCEEDED_EXIT_CODE = -103 private val PMEM_EXCEEDED_EXIT_CODE = -104 private val UNKNOWN_EXIT_CODE = -111 diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala index 127cc16c219c5..3533e61471d97 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala @@ -16,20 +16,25 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit} import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList} import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher} -import 
io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource, Resource} -import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} -import org.mockito.Mockito.{verify, when} +import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource} +import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.Matchers.{any, eq => mockitoEq} +import org.mockito.Mockito.{doNothing, never, verify, when} import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar._ +import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.rpc.RpcEnv -import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointAddress, RpcEndpointRef, RpcEnv} +import org.apache.spark.scheduler.{LiveListenerBus, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend private[spark] class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter { @@ -39,22 +44,31 @@ private[spark] class KubernetesClusterSchedulerBackendSuite private val NAMESPACE = "test-namespace" private val SPARK_DRIVER_HOST = "localhost" private val SPARK_DRIVER_PORT = 7077 + private val POD_ALLOCATION_INTERVAL = 60L + private val DRIVER_URL = RpcEndpointAddress( + SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] private type LABELLED_PODS = FilterWatchListDeletable[ - Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]] + Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]] private type IN_NAMESPACE_PODS = NonNamespaceOperation[ - Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] + Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] @Mock private var sparkContext: SparkContext = _ + @Mock + private var listenerBus: LiveListenerBus = _ + @Mock private var taskSchedulerImpl: TaskSchedulerImpl = _ @Mock private var allocatorExecutor: ScheduledExecutorService = _ + @Mock + private var requestExecutorsService: ExecutorService = _ + @Mock private var executorPodFactory: ExecutorPodFactory = _ @@ -79,11 +93,17 @@ private[spark] class KubernetesClusterSchedulerBackendSuite @Mock private var rpcEnv: RpcEnv = _ + @Mock + private var driverEndpointRef: RpcEndpointRef = _ + @Mock private var executorPodsWatch: Watch = _ private var sparkConf: SparkConf = _ private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _ + private var allocatorRunnable: ArgumentCaptor[Runnable] = _ + private var requestExecutorRunnable: ArgumentCaptor[Runnable] = _ + private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _ private val driverPod = new PodBuilder() .withNewMetadata() @@ -101,8 +121,13 @@ private[spark] class KubernetesClusterSchedulerBackendSuite .set(KUBERNETES_NAMESPACE, NAMESPACE) .set("spark.driver.host", SPARK_DRIVER_HOST) .set("spark.driver.port", SPARK_DRIVER_PORT.toString) + .set(KUBERNETES_ALLOCATION_BATCH_DELAY, POD_ALLOCATION_INTERVAL) executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]]) + allocatorRunnable = 
ArgumentCaptor.forClass(classOf[Runnable]) + requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable]) + driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint]) when(sparkContext.conf).thenReturn(sparkConf) + when(sparkContext.listenerBus).thenReturn(listenerBus) when(taskSchedulerImpl.sc).thenReturn(sparkContext) when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations) @@ -111,6 +136,17 @@ private[spark] class KubernetesClusterSchedulerBackendSuite when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace) when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName) when(podsWithDriverName.get()).thenReturn(driverPod) + when(allocatorExecutor.scheduleWithFixedDelay( + allocatorRunnable.capture(), + mockitoEq(0L), + mockitoEq(POD_ALLOCATION_INTERVAL), + mockitoEq(TimeUnit.SECONDS))).thenReturn(null) + // Creating Futures in Scala backed by a Java executor service resolves to running + // ExecutorService#execute (as opposed to submit) + doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture()) + when(rpcEnv.setupEndpoint( + mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture())) + .thenReturn(driverEndpointRef) } test("Basic lifecycle expectations when starting and stopping the scheduler.") { @@ -118,11 +154,137 @@ private[spark] class KubernetesClusterSchedulerBackendSuite scheduler.start() verify(shuffleManager).start(APP_ID) assert(executorPodsWatcherArgument.getValue != null) + assert(allocatorRunnable.getValue != null) scheduler.stop() verify(shuffleManager).stop() verify(executorPodsWatch).close() } + test("Static allocation should request executors upon first allocator run.") { + sparkConf + .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2) + .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2) + val scheduler = newSchedulerBackend(true) + scheduler.start() + requestExecutorRunnable.getValue.run() + val firstExecutorPod = new PodBuilder() + .withNewMetadata() + .withName("pod1") + .endMetadata() + .build() + val secondExecutorPod = new PodBuilder() + .withNewMetadata() + .withName("pod2") + .endMetadata() + .build() + when(podOperations.create(firstExecutorPod)) + .thenReturn(firstExecutorPod) + when(podOperations.create(secondExecutorPod)) + .thenReturn(secondExecutorPod) + when(executorPodFactory.createExecutorPod( + "1", + APP_ID, + DRIVER_URL, + sparkConf.getExecutorEnv, + driverPod, + Map.empty)).thenReturn(firstExecutorPod) + when(executorPodFactory.createExecutorPod( + "2", + APP_ID, + DRIVER_URL, + sparkConf.getExecutorEnv, + driverPod, + Map.empty)).thenReturn(secondExecutorPod) + allocatorRunnable.getValue.run() + verify(podOperations).create(firstExecutorPod) + verify(podOperations).create(secondExecutorPod) + } + + test("Killing executors deletes the executor pods") { + sparkConf + .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2) + .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2) + val scheduler = newSchedulerBackend(true) + scheduler.start() + requestExecutorRunnable.getValue.run() + val firstExecutorPod = new PodBuilder() + .withNewMetadata() + .withName("pod1") + .endMetadata() + .build() + val secondExecutorPod = new PodBuilder() + .withNewMetadata() + .withName("pod2") + .endMetadata() + .build() + when(podOperations.create(any(classOf[Pod]))) + .thenAnswer(AdditionalAnswers.returnsFirstArg()) + when(executorPodFactory.createExecutorPod( + "1", + APP_ID, + DRIVER_URL, + 
sparkConf.getExecutorEnv, + driverPod, + Map.empty)).thenReturn(firstExecutorPod) + when(executorPodFactory.createExecutorPod( + "2", + APP_ID, + DRIVER_URL, + sparkConf.getExecutorEnv, + driverPod, + Map.empty)).thenReturn(secondExecutorPod) + allocatorRunnable.getValue.run() + scheduler.doKillExecutors(Seq("1", "2")) + requestExecutorRunnable.getAllValues.asScala.last.run() + verify(podOperations).delete(firstExecutorPod) + verify(podOperations).delete(secondExecutorPod) + } + + test("Executors should be requested in batches.") { + sparkConf + .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1) + .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2) + val scheduler = newSchedulerBackend(true) + scheduler.start() + requestExecutorRunnable.getValue.run() + val firstExecutorPod = new PodBuilder() + .withNewMetadata() + .withName("pod1") + .endMetadata() + .build() + val secondExecutorPod = new PodBuilder() + .withNewMetadata() + .withName("pod2") + .endMetadata() + .build() + when(podOperations.create(any(classOf[Pod]))) + .thenAnswer(AdditionalAnswers.returnsFirstArg()) + when(executorPodFactory.createExecutorPod( + "1", + APP_ID, + DRIVER_URL, + sparkConf.getExecutorEnv, + driverPod, + Map.empty)).thenReturn(firstExecutorPod) + when(executorPodFactory.createExecutorPod( + "2", + APP_ID, + DRIVER_URL, + sparkConf.getExecutorEnv, + driverPod, + Map.empty)).thenReturn(secondExecutorPod) + allocatorRunnable.getValue.run() + verify(podOperations).create(firstExecutorPod) + verify(podOperations, never()).create(secondExecutorPod) + val registerFirstExecutorMessage = RegisterExecutor( + "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String]) + when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty) + driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext]) + .apply(registerFirstExecutorMessage) + allocatorRunnable.getValue.run() + verify(podOperations).create(secondExecutorPod) + } + private def newSchedulerBackend(externalShuffle: Boolean): KubernetesClusterSchedulerBackend = { new KubernetesClusterSchedulerBackend( taskSchedulerImpl, @@ -130,7 +292,8 @@ private[spark] class KubernetesClusterSchedulerBackendSuite executorPodFactory, if (externalShuffle) Some(shuffleManager) else None, kubernetesClient, - allocatorExecutor) + allocatorExecutor, + requestExecutorsService) } } From b729789ecb6803f47d7ae36b5df46590c30382b8 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 28 Aug 2017 15:58:26 -0700 Subject: [PATCH 3/6] Unit tests and possible preemptive corrections to failover logic. 
--- .../KubernetesClusterSchedulerBackend.scala | 115 ++++---- ...bernetesClusterSchedulerBackendSuite.scala | 264 ++++++++++++------ 2 files changed, 238 insertions(+), 141 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index c08690cd8db6c..dea2709a65fbc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -60,10 +60,10 @@ private[spark] class KubernetesClusterSchedulerBackend( private val EXECUTOR_PODS_BY_IPS_LOCK = new Object // Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK private val executorPodsByIPs = new mutable.HashMap[String, Pod] - private val failedPods: concurrent.Map[String, ExecutorExited] = new - ConcurrentHashMap[String, ExecutorExited]().asScala - private val executorsToRemove = Collections.newSetFromMap[String]( - new ConcurrentHashMap[String, java.lang.Boolean]()).asScala + private val podsWithKnownExitReasons: concurrent.Map[String, ExecutorExited] = + new ConcurrentHashMap[String, ExecutorExited]().asScala + private val disconnectedPodsByExecutorIdPendingRemoval = + new ConcurrentHashMap[String, Pod]().asScala private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) @@ -111,18 +111,14 @@ private[spark] class KubernetesClusterSchedulerBackend( s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " + s"is ${podAllocationSize}, should be a positive integer") - private val allocatorRunnable: Runnable = new Runnable { + private val allocatorRunnable = new Runnable { - // Number of times we are allowed check for the loss reason for an executor before we give up - // and assume the executor failed for good, and attribute it to a framework fault. - private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10 - private val executorsToRecover = new mutable.HashSet[String] // Maintains a map of executor id to count of checks performed to learn the loss reason // for an executor. 
- private val executorReasonChecks = new mutable.HashMap[String, Int] + private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int] override def run(): Unit = { - removeFailedExecutors() + handleDisconnectedExecutors() RUNNING_EXECUTOR_PODS_LOCK.synchronized { if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) { logDebug("Waiting for pending executors before scaling") @@ -131,7 +127,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } else { val nodeToLocalTaskCount = getNodesWithLocalTaskCounts for (i <- 0 until math.min( - totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) { + totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) { val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount) runningExecutorsToPods.put(executorId, pod) runningPodsToExecutors.put(pod.getMetadata.getName, executorId) @@ -142,43 +138,47 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - def removeFailedExecutors(): Unit = { - val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized { - runningExecutorsToPods.toMap - } - executorsToRemove.foreach { case (executorId) => - localRunningExecutorsToPods.get(executorId).map { pod: Pod => - failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited => - logDebug(s"Removing executor $executorId with loss reason " + executorExited.message) - removeExecutor(executorId, executorExited) - if (!executorExited.exitCausedByApp) { - executorsToRecover.add(executorId) - } - }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId)) - }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId)) - - executorsToRecover.foreach(executorId => { - executorsToRemove -= executorId - executorReasonChecks -= executorId - RUNNING_EXECUTOR_PODS_LOCK.synchronized { - runningExecutorsToPods.remove(executorId).map { pod: Pod => - kubernetesClient.pods().delete(pod) - runningPodsToExecutors.remove(pod.getMetadata.getName) - }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId")) + def handleDisconnectedExecutors(): Unit = { + // For each disconnected executor, synchronize with the loss reasons that may have been found + // by the executor pod watcher. If the loss reason was discovered by the watcher, + // inform the parent class with removeExecutor. + val disconnectedPodsByExecutorIdPendingRemovalCopy = + Map.empty ++ disconnectedPodsByExecutorIdPendingRemoval + disconnectedPodsByExecutorIdPendingRemovalCopy.foreach { case (executorId, executorPod) => + val knownExitReason = podsWithKnownExitReasons.remove(executorPod.getMetadata.getName) + knownExitReason.fold { + removeExecutorOrIncrementLossReasonCheckCount(executorId) + } { executorExited => + logDebug(s"Removing executor $executorId with loss reason " + executorExited.message) + removeExecutor(executorId, executorExited) + // We keep around executors that have exit conditions caused by the application. This + // allows them to be debugged later on. Otherwise, mark them as to be deleted from the + // the API server. 
+ if (!executorExited.exitCausedByApp) { + deleteExecutorFromApiAndDataStructures(executorId) } - }) - executorsToRecover.clear() + } } } def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = { - val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0) - if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) { - removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons")) - executorsToRecover.add(executorId) - executorReasonChecks -= executorId + val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0) + if (reasonCheckCount >= MAX_EXECUTOR_LOST_REASON_CHECKS) { + removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons.")) + deleteExecutorFromApiAndDataStructures(executorId) } else { - executorReasonChecks.put(executorId, reasonCheckCount + 1) + executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1) + } + } + + def deleteExecutorFromApiAndDataStructures(executorId: String): Unit = { + disconnectedPodsByExecutorIdPendingRemoval -= executorId + executorReasonCheckAttemptCounts -= executorId + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + runningExecutorsToPods.remove(executorId).map { pod => + kubernetesClient.pods().delete(pod) + runningPodsToExecutors.remove(pod.getMetadata.getName) + }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId")) } } } @@ -320,6 +320,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val maybeRemovedExecutor = runningExecutorsToPods.remove(executor) maybeRemovedExecutor.foreach { executorPod => kubernetesClient.pods().delete(executorPod) + disconnectedPodsByExecutorIdPendingRemoval(executor) = executorPod runningPodsToExecutors.remove(executorPod.getMetadata.getName) } if (maybeRemovedExecutor.isEmpty) { @@ -412,17 +413,24 @@ private[spark] class KubernetesClusterSchedulerBackend( // Here we can't be sure that that exit was caused by the application but this seems // to be the right default since we know the pod was not explicitly deleted by // the user. - "Pod exited with following container exit status code " + containerExitStatus + s"Pod ${pod.getMetadata.getName}'s executor container exited with exit status" + + s" code $containerExitStatus." } ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason) } - failedPods.put(pod.getMetadata.getName, exitReason) + podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason) } def handleDeletedPod(pod: Pod): Unit = { - val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false, - "Pod " + pod.getMetadata.getName + " deleted or lost.") - failedPods.put(pod.getMetadata.getName, exitReason) + val alreadyReleased = isPodAlreadyReleased(pod) + val exitMessage = if (alreadyReleased) { + s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request." + } else { + s"Pod ${pod.getMetadata.getName} deleted or lost." 
+ } + val exitReason = ExecutorExited( + getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage) + podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason) } } @@ -434,12 +442,15 @@ private[spark] class KubernetesClusterSchedulerBackend( rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends DriverEndpoint(rpcEnv, sparkProperties) { - private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337) override def onDisconnected(rpcAddress: RpcAddress): Unit = { addressToExecutorId.get(rpcAddress).foreach { executorId => if (disableExecutor(executorId)) { - executorsToRemove.add(executorId) + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + runningExecutorsToPods.get(executorId).foreach { pod => + disconnectedPodsByExecutorIdPendingRemoval(executorId) = pod + } + } } } } @@ -478,10 +489,12 @@ private[spark] class KubernetesClusterSchedulerBackend( } private object KubernetesClusterSchedulerBackend { - private val DEFAULT_STATIC_PORT = 10000 private val VMEM_EXCEEDED_EXIT_CODE = -103 private val PMEM_EXCEEDED_EXIT_CODE = -104 private val UNKNOWN_EXIT_CODE = -111 + // Number of times we are allowed check for the loss reason for an executor before we give up + // and assume the executor failed for good, and attribute it to a framework fault. + val MAX_EXECUTOR_LOST_REASON_CHECKS = 10 def memLimitExceededLogMessage(diagnostics: String): String = { s"Pod/Container killed for exceeding memory limits. $diagnostics" + diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala index 3533e61471d97..b30d1c2543bea 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala @@ -20,20 +20,22 @@ import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList} import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource} import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.Matchers.{any, eq => mockitoEq} -import org.mockito.Mockito.{doNothing, never, verify, when} +import org.mockito.Mockito.{doNothing, never, times, verify, when} import org.scalatest.BeforeAndAfter import org.scalatest.mock.MockitoSugar._ import scala.collection.JavaConverters._ +import scala.concurrent.Future import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointAddress, RpcEndpointRef, RpcEnv} -import org.apache.spark.scheduler.{LiveListenerBus, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor +import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpoint, RpcEndpointAddress, RpcEndpointRef, RpcEnv, RpcTimeout} +import org.apache.spark.scheduler.{ExecutorExited, 
LiveListenerBus, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend private[spark] class KubernetesClusterSchedulerBackendSuite @@ -47,6 +49,28 @@ private[spark] class KubernetesClusterSchedulerBackendSuite private val POD_ALLOCATION_INTERVAL = 60L private val DRIVER_URL = RpcEndpointAddress( SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + private val FIRST_EXECUTOR_POD = new PodBuilder() + .withNewMetadata() + .withName("pod1") + .endMetadata() + .withNewSpec() + .withNodeName("node1") + .endSpec() + .withNewStatus() + .withHostIP("192.168.99.100") + .endStatus() + .build() + private val SECOND_EXECUTOR_POD = new PodBuilder() + .withNewMetadata() + .withName("pod2") + .endMetadata() + .withNewSpec() + .withNodeName("node2") + .endSpec() + .withNewStatus() + .withHostIP("192.168.99.101") + .endStatus() + .build() private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] private type LABELLED_PODS = FilterWatchListDeletable[ @@ -147,6 +171,9 @@ private[spark] class KubernetesClusterSchedulerBackendSuite when(rpcEnv.setupEndpoint( mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture())) .thenReturn(driverEndpointRef) + when(driverEndpointRef.ask[Boolean] + (any(classOf[Any])) + (any())).thenReturn(mock[Future[Boolean]]) } test("Basic lifecycle expectations when starting and stopping the scheduler.") { @@ -167,37 +194,12 @@ private[spark] class KubernetesClusterSchedulerBackendSuite val scheduler = newSchedulerBackend(true) scheduler.start() requestExecutorRunnable.getValue.run() - val firstExecutorPod = new PodBuilder() - .withNewMetadata() - .withName("pod1") - .endMetadata() - .build() - val secondExecutorPod = new PodBuilder() - .withNewMetadata() - .withName("pod2") - .endMetadata() - .build() - when(podOperations.create(firstExecutorPod)) - .thenReturn(firstExecutorPod) - when(podOperations.create(secondExecutorPod)) - .thenReturn(secondExecutorPod) - when(executorPodFactory.createExecutorPod( - "1", - APP_ID, - DRIVER_URL, - sparkConf.getExecutorEnv, - driverPod, - Map.empty)).thenReturn(firstExecutorPod) - when(executorPodFactory.createExecutorPod( - "2", - APP_ID, - DRIVER_URL, - sparkConf.getExecutorEnv, - driverPod, - Map.empty)).thenReturn(secondExecutorPod) + expectPodCreationWithId(1, FIRST_EXECUTOR_POD) + expectPodCreationWithId(2, SECOND_EXECUTOR_POD) + when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) allocatorRunnable.getValue.run() - verify(podOperations).create(firstExecutorPod) - verify(podOperations).create(secondExecutorPod) + verify(podOperations).create(FIRST_EXECUTOR_POD) + verify(podOperations).create(SECOND_EXECUTOR_POD) } test("Killing executors deletes the executor pods") { @@ -207,82 +209,140 @@ private[spark] class KubernetesClusterSchedulerBackendSuite val scheduler = newSchedulerBackend(true) scheduler.start() requestExecutorRunnable.getValue.run() - val firstExecutorPod = new PodBuilder() - .withNewMetadata() - .withName("pod1") - .endMetadata() - .build() - val secondExecutorPod = new PodBuilder() - .withNewMetadata() - .withName("pod2") - .endMetadata() - .build() + expectPodCreationWithId(1, FIRST_EXECUTOR_POD) + expectPodCreationWithId(2, SECOND_EXECUTOR_POD) when(podOperations.create(any(classOf[Pod]))) 
.thenAnswer(AdditionalAnswers.returnsFirstArg()) - when(executorPodFactory.createExecutorPod( - "1", - APP_ID, - DRIVER_URL, - sparkConf.getExecutorEnv, - driverPod, - Map.empty)).thenReturn(firstExecutorPod) - when(executorPodFactory.createExecutorPod( - "2", - APP_ID, - DRIVER_URL, - sparkConf.getExecutorEnv, - driverPod, - Map.empty)).thenReturn(secondExecutorPod) allocatorRunnable.getValue.run() - scheduler.doKillExecutors(Seq("1", "2")) + scheduler.doKillExecutors(Seq("2")) requestExecutorRunnable.getAllValues.asScala.last.run() - verify(podOperations).delete(firstExecutorPod) - verify(podOperations).delete(secondExecutorPod) + verify(podOperations).delete(SECOND_EXECUTOR_POD) + verify(podOperations, never()).delete(FIRST_EXECUTOR_POD) } test("Executors should be requested in batches.") { sparkConf - .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1) - .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2) + .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1) + .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2) val scheduler = newSchedulerBackend(true) scheduler.start() requestExecutorRunnable.getValue.run() - val firstExecutorPod = new PodBuilder() - .withNewMetadata() - .withName("pod1") - .endMetadata() - .build() - val secondExecutorPod = new PodBuilder() - .withNewMetadata() - .withName("pod2") - .endMetadata() - .build() when(podOperations.create(any(classOf[Pod]))) .thenAnswer(AdditionalAnswers.returnsFirstArg()) - when(executorPodFactory.createExecutorPod( - "1", - APP_ID, - DRIVER_URL, - sparkConf.getExecutorEnv, - driverPod, - Map.empty)).thenReturn(firstExecutorPod) - when(executorPodFactory.createExecutorPod( - "2", - APP_ID, - DRIVER_URL, - sparkConf.getExecutorEnv, - driverPod, - Map.empty)).thenReturn(secondExecutorPod) + expectPodCreationWithId(1, FIRST_EXECUTOR_POD) + expectPodCreationWithId(2, SECOND_EXECUTOR_POD) allocatorRunnable.getValue.run() - verify(podOperations).create(firstExecutorPod) - verify(podOperations, never()).create(secondExecutorPod) + verify(podOperations).create(FIRST_EXECUTOR_POD) + verify(podOperations, never()).create(SECOND_EXECUTOR_POD) val registerFirstExecutorMessage = RegisterExecutor( "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String]) when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty) driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext]) .apply(registerFirstExecutorMessage) allocatorRunnable.getValue.run() - verify(podOperations).create(secondExecutorPod) + verify(podOperations).create(SECOND_EXECUTOR_POD) + } + + test("Deleting executors and then running an allocator pass after finding the loss reason" + + " should only delete the pod once.") { + sparkConf + .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1) + .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1) + val scheduler = newSchedulerBackend(true) + scheduler.start() + requestExecutorRunnable.getValue.run() + when(podOperations.create(any(classOf[Pod]))) + .thenAnswer(AdditionalAnswers.returnsFirstArg()) + expectPodCreationWithId(1, FIRST_EXECUTOR_POD) + allocatorRunnable.getValue.run() + val executorEndpointRef = mock[RpcEndpointRef] + when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000)) + val registerFirstExecutorMessage = RegisterExecutor( + "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String]) + when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty) + driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext]) + .apply(registerFirstExecutorMessage) + 
scheduler.doRequestTotalExecutors(0) + requestExecutorRunnable.getAllValues.asScala.last.run() + scheduler.doKillExecutors(Seq("1")) + requestExecutorRunnable.getAllValues.asScala.last.run() + verify(podOperations, times(1)).delete(FIRST_EXECUTOR_POD) + driverEndpoint.getValue.onDisconnected(executorEndpointRef.address) + + val exitedPod = exitPod(FIRST_EXECUTOR_POD, 0) + executorPodsWatcherArgument.getValue.eventReceived(Action.DELETED, exitedPod) + allocatorRunnable.getValue.run() + verify(podOperations, times(1)).delete(FIRST_EXECUTOR_POD) + verify(driverEndpointRef, times(1)).ask[Boolean]( + RemoveExecutor("1", ExecutorExited( + 0, + exitCausedByApp = false, + s"Container in pod ${exitedPod.getMetadata.getName} exited from" + + s" explicit termination request."))) + } + + test("Executors that disconnect from application errors are noted as exits caused by app.") { + sparkConf + .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1) + .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1) + val scheduler = newSchedulerBackend(true) + scheduler.start() + expectPodCreationWithId(1, FIRST_EXECUTOR_POD) + when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) + requestExecutorRunnable.getValue.run() + allocatorRunnable.getValue.run() + val executorEndpointRef = mock[RpcEndpointRef] + when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000)) + val registerFirstExecutorMessage = RegisterExecutor( + "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String]) + when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty) + driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext]) + .apply(registerFirstExecutorMessage) + driverEndpoint.getValue.onDisconnected(executorEndpointRef.address) + executorPodsWatcherArgument.getValue.eventReceived( + Action.ERROR, exitPod(FIRST_EXECUTOR_POD, 1)) + + expectPodCreationWithId(2, SECOND_EXECUTOR_POD) + scheduler.doRequestTotalExecutors(1) + requestExecutorRunnable.getValue.run() + allocatorRunnable.getAllValues.asScala.last.run() + verify(driverEndpointRef).ask[Boolean]( + RemoveExecutor("1", ExecutorExited( + 1, + exitCausedByApp = true, + s"Pod ${FIRST_EXECUTOR_POD.getMetadata.getName}'s executor container exited with" + + " exit status code 1."))) + verify(podOperations, never()).delete(FIRST_EXECUTOR_POD) + } + + test("Executors should only try to get the loss reason a number of times before giving up and" + + " removing the executor.") { + sparkConf + .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1) + .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1) + val scheduler = newSchedulerBackend(true) + scheduler.start() + expectPodCreationWithId(1, FIRST_EXECUTOR_POD) + when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) + requestExecutorRunnable.getValue.run() + allocatorRunnable.getValue.run() + val executorEndpointRef = mock[RpcEndpointRef] + when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000)) + val registerFirstExecutorMessage = RegisterExecutor( + "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String]) + when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty) + driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext]) + .apply(registerFirstExecutorMessage) + driverEndpoint.getValue.onDisconnected(executorEndpointRef.address) + 1 to KubernetesClusterSchedulerBackend.MAX_EXECUTOR_LOST_REASON_CHECKS foreach { _ => + allocatorRunnable.getValue.run() + 
verify(podOperations, never()).delete(FIRST_EXECUTOR_POD) + } + expectPodCreationWithId(2, SECOND_EXECUTOR_POD) + allocatorRunnable.getValue.run() + verify(podOperations).delete(FIRST_EXECUTOR_POD) + verify(driverEndpointRef).ask[Boolean]( + RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons."))) } private def newSchedulerBackend(externalShuffle: Boolean): KubernetesClusterSchedulerBackend = { @@ -296,4 +356,28 @@ private[spark] class KubernetesClusterSchedulerBackendSuite requestExecutorsService) } + private def exitPod(basePod: Pod, exitCode: Int): Pod = { + new PodBuilder(FIRST_EXECUTOR_POD) + .editStatus() + .addNewContainerStatus() + .withNewState() + .withNewTerminated() + .withExitCode(exitCode) + .endTerminated() + .endState() + .endContainerStatus() + .endStatus() + .build() + } + + private def expectPodCreationWithId(executorId: Int, expectedPod: Pod): Unit = { + when(executorPodFactory.createExecutorPod( + executorId.toString, + APP_ID, + DRIVER_URL, + sparkConf.getExecutorEnv, + driverPod, + Map.empty)).thenReturn(expectedPod) + } + } From e0936c55d394a18e9fb6facb068996d31b89afd9 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 29 Aug 2017 17:49:36 -0700 Subject: [PATCH 4/6] Address PR comments. --- .../KubernetesClusterSchedulerBackend.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index dea2709a65fbc..9d576e05f3e58 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -155,7 +155,7 @@ private[spark] class KubernetesClusterSchedulerBackend( // allows them to be debugged later on. Otherwise, mark them as to be deleted from the // the API server. if (!executorExited.exitCausedByApp) { - deleteExecutorFromApiAndDataStructures(executorId) + deleteExecutorFromClusterAndDataStructures(executorId) } } } @@ -165,13 +165,13 @@ private[spark] class KubernetesClusterSchedulerBackend( val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0) if (reasonCheckCount >= MAX_EXECUTOR_LOST_REASON_CHECKS) { removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons.")) - deleteExecutorFromApiAndDataStructures(executorId) + deleteExecutorFromClusterAndDataStructures(executorId) } else { executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1) } } - def deleteExecutorFromApiAndDataStructures(executorId: String): Unit = { + def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = { disconnectedPodsByExecutorIdPendingRemoval -= executorId executorReasonCheckAttemptCounts -= executorId RUNNING_EXECUTOR_PODS_LOCK.synchronized { @@ -398,10 +398,9 @@ private[spark] class KubernetesClusterSchedulerBackend( } def handleErroredPod(pod: Pod): Unit = { - val alreadyReleased = isPodAlreadyReleased(pod) val containerExitStatus = getExecutorExitStatus(pod) // container was probably actively killed by the driver. 
- val exitReason = if (alreadyReleased) { + val exitReason = if (isPodAlreadyReleased(pod)) { ExecutorExited(containerExitStatus, exitCausedByApp = false, s"Container in pod " + pod.getMetadata.getName + " exited from explicit termination request.") @@ -422,8 +421,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } def handleDeletedPod(pod: Pod): Unit = { - val alreadyReleased = isPodAlreadyReleased(pod) - val exitMessage = if (alreadyReleased) { + val exitMessage = if (isPodAlreadyReleased(pod)) { s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request." } else { s"Pod ${pod.getMetadata.getName} deleted or lost." From 2dcaa52a7d7a8df88a6bbf313dc1a9e0d30dd0f1 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 30 Aug 2017 15:06:37 -0700 Subject: [PATCH 5/6] Resolve merge conflicts. Move MiB change to ExecutorPodFactory. --- .../cluster/kubernetes/KubernetesClusterSchedulerBackend.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 9d576e05f3e58..54612c80cdf28 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -71,7 +71,6 @@ private[spark] class KubernetesClusterSchedulerBackend( .get(KUBERNETES_DRIVER_POD_NAME) .getOrElse( throw new SparkException("Must specify the driver pod name")) - private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( requestExecutorsService) From e972ccf5f03781c856cc73d0c03a88e224956f5d Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 7 Sep 2017 18:48:05 -0700 Subject: [PATCH 6/6] Revert accidental thread pool name change --- .../scheduler/cluster/kubernetes/KubernetesClusterManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index 165dd04ad7e4f..764e351e70286 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -135,7 +135,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit val allocatorExecutor = ThreadUtils .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( - "kubernetes-request-executors") + "kubernetes-executor-requests") new KubernetesClusterSchedulerBackend( scheduler.asInstanceOf[TaskSchedulerImpl], sc.env.rpcEnv,