Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
taskId: String,
reason: String): Unit = {
stateLock.synchronized {
removeExecutor(taskId, SlaveLost(reason))
// Do not call removeExecutor() once this scheduler backend has been stopped:
// removeExecutor() sends a message to the driver endpoint, which is no longer
// available at that point, so the call would throw an exception.
if (!stopCalled) {
removeExecutor(taskId, SlaveLost(reason))
}
slaves(slaveId).taskIDs.remove(taskId)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.Collections

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
Expand All @@ -34,6 +35,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.TaskSchedulerImpl

class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
Expand All @@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private var backend: MesosCoarseGrainedSchedulerBackend = _
private var externalShuffleClient: MesosExternalShuffleClient = _
private var driverEndpoint: RpcEndpointRef = _
// Set to true by the stopExecutors() override defined further down in this suite and polled
// from a separate thread by the "Do not call removeExecutor() after backend is stopped" test.
@volatile private var stopCalled = false

test("mesos supports killing and limiting executors") {
setBackend()
Expand Down Expand Up @@ -252,6 +255,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}

test("Do not call removeExecutor() after backend is stopped") {
  setBackend()

  // Launch a task on a valid offer so that there is a task to report a status for.
  val offers = List((backend.executorMemory(sc), 1))
  offerResources(offers)
  verifyTaskLaunched("o1")

  // Simulate a status update that is delivered only once stopCalled has been set.
  // Keep the Thread itself (not the Unit returned by start()) so it can be joined below.
  val statusUpdateThread = new Thread("status-update-after-stop") {
    override def run(): Unit = {
      while (!stopCalled) {
        Thread.sleep(100)
      }

      val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
      backend.statusUpdate(driver, status)
    }
  }
  // Daemon, so a stuck polling loop can never keep the test JVM alive.
  statusUpdateThread.setDaemon(true)
  statusUpdateThread.start()

  backend.stop()

  // Wait until the simulated status update has actually been processed. Without this the
  // never() verification below could run before statusUpdate() is invoked and pass vacuously.
  statusUpdateThread.join(10000)
  assert(!statusUpdateThread.isAlive, "status update thread did not finish after backend.stop()")

  // Any method of the backend that sends messages to the driver endpoint must not be
  // called after the backend is stopped.
  verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
}

private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
Expand Down Expand Up @@ -350,6 +379,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
mesosDriver = newDriver
}

// Test hook: only records that executor shutdown was requested, which lets the
// status-update thread polling `stopCalled` proceed.
override def stopExecutors(): Unit = { stopCalled = true }

markRegistered()
}
backend.start()
Expand Down