Closed
@@ -553,7 +553,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
taskId: String,
reason: String): Unit = {
stateLock.synchronized {
- removeExecutor(taskId, SlaveLost(reason))
// Do not call removeExecutor() after this scheduler backend was stopped because

The comment needs to be on the super class's removeExecutor method. All clients need to be aware of when they're allowed to call it.

Contributor Author

Not only removeExecutor() but also other methods, such as reviveOffers() and killTask(), should not be called after the backend is stopped. If you prefer adding a comment in the parent class, it would be more complete to add one to every method that can hit this case. However, I don't think that is necessary: an exception is thrown in such cases, which tells the caller that the call is not valid, and that is how this issue was found.

Contributor Author

@mgummelt, what's your opinion?

I'm fine with this going in as-is just to get the problem solved, but I do still think that classes should try to ensure their public methods are callable without regard to state, so I would rather we had fixed this in the parent. Let's try to maintain that going forward.

Contributor Author

What about filing a separate JIRA issue for better handling of state after stop() is called on CoarseGrainedSchedulerBackend?

// removeExecutor() internally will send a message to the driver endpoint but
// the driver endpoint is not available now, otherwise an exception will be thrown.
if (!stopCalled) {

I don't think we should be adding the guard here. It's the parent class that's incorrectly making a request to the driverEndpoint despite the driverEndpoint being shut down. So it's the parent class that should add the guard.

Contributor Author

If we add the guard in the parent class, CoarseGrainedSchedulerBackend, what is the appropriate behavior? Should it silently ignore all message requests after stop() is called and log a warning, or throw an exception? In the latter case, the call to removeExecutor() has to be wrapped in a try.
Since the call to removeExecutor() is made in MesosCoarseGrainedSchedulerBackend, I think the current fix is simpler and reasonable.

OK, you've convinced me. But please add a clarifying comment to super.removeExecutor() specifying that it should not be called after super.stop() is called.

Contributor Author

Comment added.

removeExecutor(taskId, SlaveLost(reason))
}
slaves(slaveId).taskIDs.remove(taskId)
}
}
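
The two guard policies raised in the thread above (silently ignore and log a warning, or throw an exception) can be sketched in isolation. This is a minimal sketch, not Spark's code: `FakeEndpoint`, `ParentBackend`, `AfterStopPolicy` and the message strings are hypothetical stand-ins for the driver endpoint and CoarseGrainedSchedulerBackend.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the driver endpoint: it rejects messages once stopped.
final class FakeEndpoint {
  private var stopped = false
  val received = ArrayBuffer.empty[String]

  def stop(): Unit = synchronized { stopped = true }

  def send(msg: String): Unit = synchronized {
    if (stopped) throw new IllegalStateException(s"endpoint stopped, cannot send: $msg")
    received += msg
  }
}

// The two behaviors discussed for a guard in the parent class (names are assumptions).
sealed trait AfterStopPolicy
case object IgnoreAndWarn extends AfterStopPolicy
case object ThrowOnUse extends AfterStopPolicy

// Hypothetical parent backend that guards its own public method.
class ParentBackend(endpoint: FakeEndpoint, policy: AfterStopPolicy) {
  private var stopCalled = false
  val warnings = ArrayBuffer.empty[String]

  def stop(): Unit = synchronized {
    stopCalled = true
    endpoint.stop()
  }

  def removeExecutor(executorId: String, reason: String): Unit = synchronized {
    if (!stopCalled) {
      endpoint.send(s"RemoveExecutor($executorId, $reason)")
    } else {
      policy match {
        case IgnoreAndWarn =>
          // Record a warning instead of touching the stopped endpoint.
          warnings += s"Ignoring removeExecutor($executorId) after stop()"
        case ThrowOnUse =>
          throw new IllegalStateException(s"removeExecutor($executorId) called after stop()")
      }
    }
  }
}
```

With `ThrowOnUse`, a caller such as statusUpdate has to wrap the call in a try, which is the cost mentioned in the discussion. With `IgnoreAndWarn`, callers need no changes.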
@@ -21,6 +21,7 @@ import java.util.Collections

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
@@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.mesos.Utils._

@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private var backend: MesosCoarseGrainedSchedulerBackend = _
private var externalShuffleClient: MesosExternalShuffleClient = _
private var driverEndpoint: RpcEndpointRef = _
@volatile private var stopCalled = false

test("mesos supports killing and limiting executors") {
setBackend()
@@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!dockerInfo.getForcePullImage)
}

test("Do not call removeExecutor() after backend is stopped") {
setBackend()

// launches a task on a valid offer
val offers = List((backend.executorMemory(sc), 1))
offerResources(offers)
verifyTaskLaunched(driver, "o1")

// launches a thread simulating status update
val statusUpdateThread = new Thread {
override def run(): Unit = {
while (!stopCalled) {
Thread.sleep(100)
}

val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
backend.statusUpdate(driver, status)
}
}.start

backend.stop()
// Any method of the backend involving sending messages to the driver endpoint should not
// be called after the backend is stopped.
verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])

This isn't really what we care to test. What we care to test is that no exception is thrown in statusUpdate. Can you check that instead?

Contributor Author (@sun-rui, Aug 8, 2016)

I tried to do this, but found that driverEndpoint is mocked, so no exception will be thrown in any case. Since the exception is thrown from within RemoveExecutor, I think this check serves the same purpose.


Ah, good point.

}
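
The reasoning in the thread above (a mocked endpoint never throws, so the test asserts that no RemoveExecutor message reaches it) can be illustrated without Mockito. This is a minimal sketch under stated assumptions: `RemoveExecutorMsg`, `RecordingEndpoint`, `GuardedBackend` and `LateStatusUpdateCheck` are hypothetical names, and a hand-written recording fake stands in for the mocked RpcEndpointRef.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch}

// Hypothetical message type standing in for RemoveExecutor.
final case class RemoveExecutorMsg(executorId: String)

// Hypothetical endpoint fake: it never throws, it only records what it is asked.
final class RecordingEndpoint {
  val messages = new CopyOnWriteArrayList[AnyRef]()
  def ask(msg: AnyRef): Unit = { messages.add(msg); () }
}

// Hypothetical backend with the same kind of guard as the fix in this PR.
final class GuardedBackend(endpoint: RecordingEndpoint) {
  private val stateLock = new Object
  private var stopCalled = false
  val stopped = new CountDownLatch(1)

  def stop(): Unit = {
    stateLock.synchronized { stopCalled = true }
    stopped.countDown()
  }

  def statusUpdate(taskId: String): Unit = stateLock.synchronized {
    // Only talk to the endpoint while the backend is still running.
    if (!stopCalled) endpoint.ask(RemoveExecutorMsg(taskId))
  }
}

object LateStatusUpdateCheck {
  // Returns how many messages reached the endpoint after a late status update.
  def run(): Int = {
    val endpoint = new RecordingEndpoint
    val backend = new GuardedBackend(endpoint)
    val updater = new Thread(new Runnable {
      override def run(): Unit = {
        backend.stopped.await()   // wait until stop() has been called
        backend.statusUpdate("0") // late status update
      }
    })
    updater.start()
    backend.stop()
    updater.join() // make sure the late update has happened before counting
    endpoint.messages.size()
  }
}
```

The sketch joins the updater thread before counting, so the check runs only after the late status update has been delivered.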

private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
mesosDriver = newDriver
}

override def stopExecutors(): Unit = {
Contributor Author

The stopCalled flag in the backend can't be accessed here because it is private. By overriding stopExecutors(), we can set a flag in the test when stop() is called.
Another option is to make the backend's stopCalled flag public for testing purposes.


Yeah, I messed up in my comment here. It's fine as is.

stopCalled = true
}

markRegistered()
}
backend.start()
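
The stopExecutors() override discussed above, which lets the test observe stop() without reading the backend's private flag, follows a general pattern that can be sketched on its own. The names `SketchBackend` and `StopSignalSketch` are hypothetical, and the assumption that stop() invokes an overridable stopExecutors() hook is taken from the discussion rather than from the parent class's source.

```scala
// Hypothetical backend: its own stop flag is private, so a test cannot read it.
class SketchBackend {
  private var stopCalled = false

  // Hook invoked from stop(); subclasses may override it.
  def stopExecutors(): Unit = ()

  def stop(): Unit = synchronized {
    stopCalled = true
    stopExecutors()
  }
}

object StopSignalSketch {
  // Test-side flag, set from the overridden hook and readable from other threads.
  @volatile private var stopObserved = false

  def observedAfterStop(): Boolean = {
    stopObserved = false
    val backend = new SketchBackend {
      override def stopExecutors(): Unit = { stopObserved = true }
    }
    backend.stop()
    stopObserved
  }
}
```

This keeps the production flag private, at the cost of coupling the test to the fact that stop() calls the hook.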