Skip to content

Commit 2285de7

Browse files
sun-rui, srowen
authored and committed
[SPARK-16522][MESOS] Spark application throws exception on exit.
This is a backport of #14175 to branch 2.0. Author: Sun Rui <[email protected]>. Closes #14575 from sun-rui/SPARK-16522-branch-2.0.
1 parent 475ee38 commit 2285de7

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -552,7 +552,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
552552
taskId: String,
553553
reason: String): Unit = {
554554
stateLock.synchronized {
555-
removeExecutor(taskId, SlaveLost(reason))
555+
// Do not call removeExecutor() after this scheduler backend was stopped because
556+
// removeExecutor() internally will send a message to the driver endpoint but
557+
// the driver endpoint is not available now, otherwise an exception will be thrown.
558+
if (!stopCalled) {
559+
removeExecutor(taskId, SlaveLost(reason))
560+
}
556561
slaves(slaveId).taskIDs.remove(taskId)
557562
}
558563
}

core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ import java.util.Collections
2121

2222
import scala.collection.JavaConverters._
2323
import scala.collection.mutable.ArrayBuffer
24+
import scala.reflect.ClassTag
2425

2526
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
2627
import org.apache.mesos.Protos._
@@ -34,6 +35,7 @@ import org.scalatest.BeforeAndAfter
3435
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
3536
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
3637
import org.apache.spark.rpc.RpcEndpointRef
38+
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
3739
import org.apache.spark.scheduler.TaskSchedulerImpl
3840

3941
class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
4749
private var backend: MesosCoarseGrainedSchedulerBackend = _
4850
private var externalShuffleClient: MesosExternalShuffleClient = _
4951
private var driverEndpoint: RpcEndpointRef = _
52+
@volatile private var stopCalled = false
5053

5154
test("mesos supports killing and limiting executors") {
5255
setBackend()
@@ -252,6 +255,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
252255
backend.start()
253256
}
254257

258+
test("Do not call removeExecutor() after backend is stopped") {
259+
setBackend()
260+
261+
// launches a task on a valid offer
262+
val offers = List((backend.executorMemory(sc), 1))
263+
offerResources(offers)
264+
verifyTaskLaunched("o1")
265+
266+
// launches a thread simulating status update
267+
val statusUpdateThread = new Thread {
268+
override def run(): Unit = {
269+
while (!stopCalled) {
270+
Thread.sleep(100)
271+
}
272+
273+
val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
274+
backend.statusUpdate(driver, status)
275+
}
276+
}.start
277+
278+
backend.stop()
279+
// Any method of the backend involving sending messages to the driver endpoint should not
280+
// be called after the backend is stopped.
281+
verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
282+
}
283+
255284
private def verifyDeclinedOffer(driver: SchedulerDriver,
256285
offerId: OfferID,
257286
filter: Boolean = false): Unit = {
@@ -350,6 +379,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
350379
mesosDriver = newDriver
351380
}
352381

382+
override def stopExecutors(): Unit = {
383+
stopCalled = true
384+
}
385+
353386
markRegistered()
354387
}
355388
backend.start()

0 commit comments

Comments
 (0)