From 89c438c074b8ed8713551c7a32e21a6a91367050 Mon Sep 17 00:00:00 2001
From: xinglwang
Date: Fri, 14 Dec 2018 20:19:44 +0800
Subject: [PATCH 1/8] [SPARK-25922][K8S] Spark Driver/Executor
 spark-app-selector label mismatch

## What changes were proposed in this pull request?

In K8S cluster mode, the algorithm that generates the
spark-app-selector/spark.app.id of the Spark driver differs from the one used
for the Spark executors. This patch consolidates the algorithm so that driver
and executors share one universal logic. This will help to monitor resource
consumption from the K8S perspective.

## How was this patch tested?

Manually tested.

---
 .../spark/deploy/k8s/KubernetesUtils.scala    |  8 +++-
 .../submit/KubernetesClientApplication.scala  |  2 +-
 .../KubernetesClusterSchedulerBackend.scala   | 41 ++++++++++++++++++-
 3 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 6fafac3ee13c9..4d526fa09982a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -17,12 +17,11 @@
 package org.apache.spark.deploy.k8s
 
 import java.io.File
+import java.util.UUID
 
 import scala.collection.JavaConverters._
-
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
-
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
@@ -205,4 +204,9 @@ private[spark] object KubernetesUtils extends Logging {
   def formatTime(time: String): String = {
     if (time != null) time else "N/A"
   }
+
+  def generateAppId(): String = {
+    s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
+  }
+
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 3888778bf84ca..5383e983a0b4e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -207,7 +207,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // to be added as a label to group resources belonging to the same application. Label values are
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
     // a unique app ID (captured by spark.app.id) in the format below.
-    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
+    val kubernetesAppId = KubernetesUtils.generateAppId()
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 68f6f2e46e316..07770000bf183 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -18,12 +18,14 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.ExecutorService
 
-import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import io.fabric8.kubernetes.client.KubernetesClient
+
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -60,6 +62,43 @@ private[spark] class KubernetesClusterSchedulerBackend(
     removeExecutor(executorId, reason)
   }
 
+  /**
+   * Get an application ID associated with the job.
+   * This returns the string value of [[appId]] if set, otherwise
+   * the locally-generated ID from the superclass.
+   * @return The application ID
+   */
+
+  var appId: Option[String] = None;
+
+  override def applicationId(): String = {
+
+    appId.map(_.toString).getOrElse {
+      logInfo("Initializing Application ID.")
+      bindApplicationId();
+      appId.get
+    }
+  }
+
+  def bindApplicationId(): Unit = {
+    val appIdString = {
+      val wasSparkSubmittedInClusterMode = conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)
+
+      // cluster mode: get appId from driver env
+      if (wasSparkSubmittedInClusterMode) {
+        val sparkAppId = conf.getOption("spark.app.id")
+        sparkAppId.map(_.toString).getOrElse {
+          logWarning("Application ID is not initialized yet in cluster mode.")
+          super.applicationId
+        }
+      } else {
+        // client mode: generate new appId
+        KubernetesUtils.generateAppId()
+      }
+    }
+    appId = Some(appIdString)
+  }
+
   override def start(): Unit = {
     super.start()
     if (!Utils.isDynamicAllocationEnabled(conf)) {
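Note: the sketch below is an illustration of the mismatch this first patch targets, not part of the patch series. It assumes the inherited `SchedulerBackend.applicationId` default yields a timestamp-based `spark-application-*` string, while the submission client mints a UUID-based ID for the driver pod's `spark-app-selector` label; the printed values are representative only.

```scala
// Illustration only: why the driver and executor labels diverged before this fix.
import java.util.UUID

object LabelMismatchSketch {
  // What the submission client stamps on the driver pod (see KubernetesClientApplication).
  def driverSelector(): String =
    s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"

  // Assumed shape of the scheduler backend's inherited default, which ended
  // up labelling the executor pods.
  def executorSelector(): String =
    "spark-application-" + System.currentTimeMillis

  def main(args: Array[String]): Unit = {
    println(s"driver:   ${driverSelector()}")   // e.g. spark-0f96f341c71c4c34...
    println(s"executor: ${executorSelector()}") // e.g. spark-application-1544789984000
    // Different values mean K8S cannot group driver and executors by one label.
  }
}
```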
From ed2c4caedcafb323dfef5ba01456f60f286fccd5 Mon Sep 17 00:00:00 2001
From: suxingfate
Date: Sat, 15 Dec 2018 10:52:18 +0800
Subject: [PATCH 2/8] Simplify the logic to use spark.app.id if set, otherwise
 use the superclass applicationId

---
 .../spark/deploy/k8s/KubernetesUtils.scala    |  5 ---
 .../submit/KubernetesClientApplication.scala  |  2 +-
 .../KubernetesClusterSchedulerBackend.scala   | 33 ++++---------
 3 files changed, 7 insertions(+), 33 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 4d526fa09982a..037542e9559a5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -204,9 +204,4 @@ private[spark] object KubernetesUtils extends Logging {
   def formatTime(time: String): String = {
     if (time != null) time else "N/A"
   }
-
-  def generateAppId(): String = {
-    s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
-  }
-
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 5383e983a0b4e..3888778bf84ca 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -207,7 +207,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // to be added as a label to group resources belonging to the same application. Label values are
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
     // a unique app ID (captured by spark.app.id) in the format below.
-    val kubernetesAppId = KubernetesUtils.generateAppId()
+    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 07770000bf183..11df1ce2df49c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -64,39 +64,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   /**
    * Get an application ID associated with the job.
-   * This returns the string value of [[appId]] if set, otherwise
+   * This returns the string value of spark.app.id if set, otherwise
    * the locally-generated ID from the superclass.
    * @return The application ID
    */
-
-  var appId: Option[String] = None;
-
   override def applicationId(): String = {
-
-    appId.map(_.toString).getOrElse {
-      logInfo("Initializing Application ID.")
-      bindApplicationId();
-      appId.get
-    }
-  }
-
-  def bindApplicationId(): Unit = {
-    val appIdString = {
-      val wasSparkSubmittedInClusterMode = conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)
-
-      // cluster mode: get appId from driver env
-      if (wasSparkSubmittedInClusterMode) {
-        val sparkAppId = conf.getOption("spark.app.id")
-        sparkAppId.map(_.toString).getOrElse {
-          logWarning("Application ID is not initialized yet in cluster mode.")
-          super.applicationId
-        }
-      } else {
-        // client mode: generate new appId
-        KubernetesUtils.generateAppId()
+    val appId = {
+      val sparkAppId = conf.getOption("spark.app.id")
+      sparkAppId.map(_.toString).getOrElse {
+        super.applicationId
       }
     }
-    appId = Some(appIdString)
+    appId
   }
 
   override def start(): Unit = {
From 5ed8e1a20789e0ead10bab95a5d3bc02f43c7150 Mon Sep 17 00:00:00 2001
From: suxingfate
Date: Sat, 15 Dec 2018 11:01:34 +0800
Subject: [PATCH 3/8] remove unnecessary KubernetesUtils import in
 KubernetesClusterSchedulerBackend.scala

---
 .../cluster/k8s/KubernetesClusterSchedulerBackend.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 11df1ce2df49c..055ead68d406e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -25,7 +25,6 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
From db3fcebd25b803e7b4358201997ecc3f995642b1 Mon Sep 17 00:00:00 2001
From: suxingfate
Date: Sat, 15 Dec 2018 11:06:23 +0800
Subject: [PATCH 4/8] remove unnecessary UUID import in KubernetesUtils.scala

---
 .../scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 037542e9559a5..6fafac3ee13c9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -17,11 +17,12 @@
 package org.apache.spark.deploy.k8s
 
 import java.io.File
-import java.util.UUID
 
 import scala.collection.JavaConverters._
+
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
+
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
From af7342746cb5374057c68f974fe99896c4dec672 Mon Sep 17 00:00:00 2001
From: suxingfate
Date: Sat, 15 Dec 2018 11:10:46 +0800
Subject: [PATCH 5/8] refactor applicationId to be a neat one-liner

---
 .../cluster/k8s/KubernetesClusterSchedulerBackend.scala | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 055ead68d406e..9c55163cbece1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -68,13 +68,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
    * @return The application ID
    */
   override def applicationId(): String = {
-    val appId = {
-      val sparkAppId = conf.getOption("spark.app.id")
-      sparkAppId.map(_.toString).getOrElse {
-        super.applicationId
-      }
-    }
-    appId
+    conf.getOption("spark.app.id").map(_.toString).getOrElse {super.applicationId}
   }
 
   override def start(): Unit = {
From 778acb4781b25bffaa568055329d7103660b9572 Mon Sep 17 00:00:00 2001
From: suxingfate
Date: Sat, 15 Dec 2018 11:34:22 +0800
Subject: [PATCH 6/8] add unit test for applicationId when spark.app.id is set

---
 .../k8s/KubernetesClusterSchedulerBackendSuite.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 75232f7b98b04..89a285c80a5c2 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -36,7 +36,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
 
   private val requestExecutorsService = new DeterministicScheduler()
   private val sparkConf = new SparkConf(false)
-    .set("spark.executor.instances", "3")
+    .set("spark.executor.instances", "3").set("spark.app.id", TEST_SPARK_APP_ID)
 
   @Mock
   private var sc: SparkContext = _
@@ -100,9 +100,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       podAllocator,
       lifecycleEventHandler,
       watchEvents,
-      pollEvents) {
-    override def applicationId(): String = TEST_SPARK_APP_ID
-  }
+      pollEvents)
   }
 
   test("Start all components") {
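Patch 6 above swaps an anonymous-subclass override in the test suite for plain configuration: the suite now drives applicationId through spark.app.id. The sketch below illustrates that pattern outside the suite; it is not part of the patch series, the constant name is hypothetical, and it assumes spark-core is on the classpath for `SparkConf`.

```scala
// Illustration only: configure the ID under test instead of overriding it.
import org.apache.spark.SparkConf

object ConfDrivenIdSketch {
  val TEST_SPARK_APP_ID = "spark-app-id-12345" // hypothetical test constant

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.executor.instances", "3")
      .set("spark.app.id", TEST_SPARK_APP_ID)
    // A backend built from this conf reads the ID from spark.app.id, so the
    // test no longer needs a subclass that overrides applicationId().
    assert(conf.getOption("spark.app.id").contains(TEST_SPARK_APP_ID))
  }
}
```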
From bf4bc0e0cae67cfe06829fb80eb17a8b7e57c70c Mon Sep 17 00:00:00 2001
From: suxingfate
Date: Sat, 15 Dec 2018 15:29:54 +0800
Subject: [PATCH 7/8] update style for getOrElse

---
 .../cluster/k8s/KubernetesClusterSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 9c55163cbece1..604ad5fa88010 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -68,7 +68,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
    * @return The application ID
    */
   override def applicationId(): String = {
-    conf.getOption("spark.app.id").map(_.toString).getOrElse {super.applicationId}
+    conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
   }
 
   override def start(): Unit = {
From 45ded646aa13f832b69e90621a8df994cea193ce Mon Sep 17 00:00:00 2001
From: suxingfate
Date: Sat, 15 Dec 2018 16:49:43 +0800
Subject: [PATCH 8/8] run ./dev/scalafmt to reformat

---
 .../KubernetesClusterSchedulerBackend.scala   | 25 +++++++++++--------
 ...bernetesClusterSchedulerBackendSuite.scala | 12 +++++----
 2 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 604ad5fa88010..03f5da2bb0bce 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -40,10 +40,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+    extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
 
-  private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
-    requestExecutorsService)
+  private implicit val requestExecutorContext =
+    ExecutionContext.fromExecutorService(requestExecutorsService)
 
   protected override val minRegisteredRatio =
     if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
@@ -62,11 +62,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   /**
-   * Get an application ID associated with the job.
-   * This returns the string value of spark.app.id if set, otherwise
-   * the locally-generated ID from the superclass.
-   * @return The application ID
-   */
+    * Get an application ID associated with the job.
+    * This returns the string value of spark.app.id if set, otherwise
+    * the locally-generated ID from the superclass.
+    *
+    * @return The application ID
+    */
   override def applicationId(): String = {
     conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
   }
@@ -99,7 +100,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
     if (shouldDeleteExecutors) {
       Utils.tryLogNonFatalError {
-        kubernetesClient.pods()
+        kubernetesClient
+          .pods()
           .withLabel(SPARK_APP_ID_LABEL, applicationId())
           .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
           .delete()
@@ -131,7 +133,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
-    kubernetesClient.pods()
+    kubernetesClient
+      .pods()
       .withLabel(SPARK_APP_ID_LABEL, applicationId())
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
@@ -144,7 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-    extends DriverEndpoint(rpcEnv, sparkProperties) {
+      extends DriverEndpoint(rpcEnv, sparkProperties) {
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       // Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 89a285c80a5c2..6e182bed459f8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -36,7 +36,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
 
   private val requestExecutorsService = new DeterministicScheduler()
   private val sparkConf = new SparkConf(false)
-    .set("spark.executor.instances", "3").set("spark.app.id", TEST_SPARK_APP_ID)
+    .set("spark.executor.instances", "3")
+    .set("spark.app.id", TEST_SPARK_APP_ID)
 
   @Mock
   private var sc: SparkContext = _
@@ -87,8 +88,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     when(sc.env).thenReturn(env)
     when(env.rpcEnv).thenReturn(rpcEnv)
     driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
-    when(rpcEnv.setupEndpoint(
-      mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
+    when(
+      rpcEnv.setupEndpoint(
+        mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME),
+        driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
@@ -125,8 +128,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
 
   test("Remove executor") {
     schedulerBackendUnderTest.start()
-    schedulerBackendUnderTest.doRemoveExecutor(
-      "1", ExecutorKilled)
+    schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
     verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
   }
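Taken together, the series converges on a one-line override: prefer spark.app.id when the submission client has set it, and fall back to the superclass ID otherwise. Below is a minimal, self-contained sketch of that resolution order; it is an illustration rather than Spark code, with a `Map` standing in for `SparkConf` and a timestamp string standing in for `super.applicationId`.

```scala
// Illustration only: mirrors the final applicationId() resolution order.
object AppIdResolutionSketch {
  // Prefer the externally supplied spark.app.id; otherwise use the fallback.
  def applicationId(conf: Map[String, String], fallback: => String): String =
    conf.get("spark.app.id").getOrElse(fallback)

  def main(args: Array[String]): Unit = {
    val fallback = "spark-application-" + System.currentTimeMillis

    // Cluster mode: the submission client set spark.app.id, so the driver and
    // all executor pods carry the same spark-app-selector value.
    println(applicationId(Map("spark.app.id" -> "spark-app123"), fallback))

    // No spark.app.id set: behave like the superclass and generate one locally.
    println(applicationId(Map.empty, fallback))
  }
}
```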