
Conversation

@jsoltren
Owner

What changes were proposed in this pull request?

Implement Automatic Killing of Blacklisted Executors - work in progress

How was this patch tested?

testOnly org.apache.spark.scheduler.BlacklistTrackerSuite org.apache.spark.scheduler.TaskSetManagerSuite


@squito squito left a comment


This approach looks solid. The one tricky part that isn't covered yet is the race between blacklisting a node and then having an executor register on it afterwards.

CoarseGrainedSchedulerBackend can get that via the nodeBlacklist method in TaskScheduler, so hopefully it doesn't require making things too tangled.

@DeveloperApi
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))

/**

typo: Api

Owner Author


Whoops. Fixed.

* :: DeveloperApo ::
* Request that the cluster manager kill all executors on the specified host.
*
* Note: This is an indication to the cluster manager that the application wishes to adjust

why adjust downwards? I would have expected kill and replace. That is what we want for blacklisting, anyway.

Owner Author


Yes, and this means that killExecutorsOnHost has to be updated as well.

@volatile protected var currentExecutorIdCounter = 0

// The set of executors we have on each host.
protected val hostToExecutors = new HashMap[String, HashSet[String]]


CoarseGrainedSchedulerBackend already has a TaskSchedulerImpl, which has this functionality in getExecutorsAliveOnHost. I think you should just call that.


Doesn't executorDataMap contain this information already?

Owner Author


Using executorDataMap requires walking through the list of all the executors and picking out the ones that happen to be on a particular host. This is unfortunate on a large cluster.


What are you calling "a large cluster"?

Let's say you have 5000 executors. How expensive is this walk? Compared to sending messages over the network to kill the matching executors? Taking into account how often you actually have to call this method?
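For reference, the walk in question is just a linear scan of executorDataMap; a rough sketch (assuming each ExecutorData entry records the host it runs on, here called executorHost):

// Sketch of the O(n) walk under discussion: pick out the executor ids whose registered
// host matches the blacklisted host. Even with ~5000 executors this is a few thousand
// string comparisons, which is cheap next to the RPCs needed to actually kill them.
val execsOnHost: Seq[String] = executorDataMap.collect {
  case (execId, data) if data.executorHost == host => execId
}.toSeq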

val host = e._1
hostToExecutors -= host
}
})

if you switch to using TaskSchedulerImpl.getExecutorsAliveOnHost you won't need this at all, but some comments anyhow on this version just for the thought experiment:

  1. this should be in removeExecutor, so it happens in every case where an executor is removed
  2. you can use Scala's PartialFunction syntax to unravel the pair a little more cleanly:
hostToExecutors.foreach { case (host, execs) =>
 ...
}

Owner Author


I'll remember this for next time.

private[scheduler] class BlacklistTracker (
private val listenerBus: LiveListenerBus,
conf: SparkConf,
sc: Option[SparkContext],

Can this take an ExecutorAllocationClient instead, to avoid exposing all of SparkContext? It also doesn't seem like it should be an Option; if the only reason is for tests, you can use a mock in those tests.


+1. I'd rather avoid making changes to SparkContext at all, especially since this particular API doesn't seem particularly useful for an app developer.
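A sketch of the constructor shape being suggested above (illustrative only; the point is to depend on the narrower ExecutorAllocationClient rather than on SparkContext, and to hand tests a mock client instead of using an Option):

private[scheduler] class BlacklistTracker (
    private val listenerBus: LiveListenerBus,
    conf: SparkConf,
    allocationClient: ExecutorAllocationClient) {
  // ...
}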

Owner Author


It is done.

}
}

test("kill all executors on localhost") {

this doesn't really have anything to do with Dynamic Allocation, so this is a strange suite to put this test in. Though it does seem to have a useful framework -- perhaps there is a base class to pull out?

Owner Author


I'll refactor these tests when I start adding more tests for configuration parameters and the like.

*
* @return whether the request is received.
*/
@DeveloperApi

you don't need to add this method anymore

*/
final override def killExecutorsOnHost(host: String): Seq[String] = {
logInfo(s"Requesting to kill any and all executors on host ${host}")
killExecutors(scheduler.getExecutorsAliveOnHost(host).get.toSeq, replace = true, force = true)

it might be possible that scheduler.getExecutorsAliveOnHost returns None -- the executor happens to get removed between when it gets blacklisted and when you end up here.

We might prevent that by the locks we have & the way messages are processed, but it would be better to be safe on this anyhow:

scheduler.getExecutorsAliveOnHost(host).foreach { execs => killExecutors(execs.toSeq, ...) }

I am also now wondering if we should kill all the executors immediately, or if we should push a request into one of the internal queues in the driver. Killing itself is async, so maybe it's fine like this. I'm not really sure yet, something to keep thinking about.

@jsoltren
Owner Author

jsoltren commented Dec 2, 2016

@squito and I chatted some offline about the race he mentioned. To quote:

There is also one race you will need to watch out for:

  1. the driver requests a new executor
  2. the driver then blacklists a node
  3. the cluster manager responds to the earlier request for an executor by giving an executor on the node that is now blacklisted
  4. the driver blacklists the new executor, but never kills it.

We noted that CoarseGrainedSchedulerBackend is itself a singleton class and a driver endpoint for RPC messages. Thus, the easiest way to avoid this race is to perform the killing of blacklisted executors in CoarseGrainedSchedulerBackend.
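Concretely, the shape that falls out of that discussion is visible in the later snippets: the public method only sends a message, and the actual kill happens while the DriverEndpoint processes it, so it cannot interleave with a RegisterExecutor for the same host. A rough sketch:

// Public entry point: just enqueue the request on the driver endpoint.
final override def killExecutorsOnHost(host: String): Unit = {
  driverEndpoint.send(KillExecutorsOnHost(host))
}

// In DriverEndpoint.receive, which handles one message at a time:
case KillExecutorsOnHost(host) =>
  // No executor can register on `host` while this message is being processed, so the
  // set returned here is complete at the moment we kill.
  scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
    killExecutors(execs.toSeq, replace = true, force = true)
  }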


@squito squito left a comment


Don't you still need to add something in CoarseGrainedSchedulerBackend.receiveAndReply / RegisterExecutor, so that you reject executors if they are already blacklisted?

Other than that, just minor comments.

if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
s" task failures in successful task sets")
conf.get(config.BLACKLIST_ENABLED) match {

BlacklistTracker.isBlacklistEnabled

if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE) {
logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
s"executors blacklisted: ${blacklistedExecsOnNode}")
conf.get(config.BLACKLIST_ENABLED) match {

BlacklistTracker.isBlacklistEnabled
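In both spots the suggestion is the same: go through the helper on the BlacklistTracker companion object rather than matching on the raw config value, e.g. (sketch):

// Replaces the explicit match on conf.get(config.BLACKLIST_ENABLED); the helper already
// encapsulates how the flag (including any legacy fallback) is interpreted.
if (BlacklistTracker.isBlacklistEnabled(conf)) {
  // ... blacklist the executor / node and, if configured, request the kill ...
}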

Some(new BlacklistTracker(sc, scheduler))
val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
case b: ExecutorAllocationClient => Some(b.asInstanceOf[ExecutorAllocationClient])
case _ => None

Maybe it's best to just fail fast right here if blacklist.kill is enabled but you don't have an ExecutorAllocationClient?
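A sketch of that fail-fast check (the config entry name BLACKLIST_KILL_ENABLED is assumed here; substitute whatever key gates killing of blacklisted executors in this change):

val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
  case b: ExecutorAllocationClient => Some(b)
  case _ => None
}
// If the user asked for blacklisted executors to be killed but the scheduler backend
// cannot kill executors at all, surface that at startup instead of silently ignoring it.
if (conf.get(config.BLACKLIST_KILL_ENABLED) && executorAllocClient.isEmpty) {
  throw new SparkException("Killing of blacklisted executors is enabled, but the " +
    "scheduler backend does not support killing executors.")
}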

final override def killExecutorsOnHost(host: String): Unit = {
logInfo(s"Requesting to kill any and all executors on host ${host}")
killExecutors(scheduler.getExecutorsAliveOnHost(host).get.toSeq, replace = true, force = true)
driverEndpoint.send(KillExecutorsOnHost(host))

I'd include a comment here on why you delegate this to the driver endpoint rather than doing it immediately here, something along the lines of:

We have to be careful that there isn't a race between killing all executors on the bad host and another executor getting registered on the same host. We avoid that by doing the kill within the DriverEndpoint, which is guaranteed to handle one message at a time since it's a ThreadSafeRpcEndpoint.

* Request that the cluster manager try harder to kill the specified executors,
* and maybe replace them.
* @return whether the request is acknowledged by the cluster manager.
*/

I don't think you need this in the API, do you?

@jsoltren
Owner Author

> Don't you still need to add something in CoarseGrainedSchedulerBackend.receiveAndReply / RegisterExecutor, so that you reject executors if they are already blacklisted?

We had discussed this earlier. I had thought through this. I'm not sure if it makes sense since we're going through an RPC method that should atomically be killing executors on a node. As far as the RegisterExecutor response is concerned, either we'll have done the killing already, or doing so is pending.

I guess BlacklistTracker could try to update CoarseGrainedSchedulerBackend ASAP, separately from the RPC mechanism. But again, I thought that going through RPC removed the need for modifying RegisterExecutor.

@squito

squito commented Dec 14, 2016

> Don't you still need to add something in CoarseGrainedSchedulerBackend.receiveAndReply / RegisterExecutor, so that you reject executors if they are already blacklisted?
>
> We had discussed this earlier. I had thought through this. I'm not sure if it makes sense since we're going through an RPC method that should atomically be killing executors on a node.
>
> I guess BlacklistTracker could try to update CoarseGrainedSchedulerBackend ASAP, separately from the RPC mechanism. But again, I thought that going through RPC removed the need for modifying RegisterExecutor.

I don't think that is enough. There are multiple sources for the race. First, there are multiple threads in the driver touching shared memory. We need to make sure there isn't a race within those threads -- one thread registers a new executor on the bad host while, at the same time, another thread thinks it's killing all executors on that host but doesn't yet know about the new executor. By grabbing the list of executors within DriverEndpoint, we avoid that race.

But another race is that the actual creation of executors happens in a different process, likely on a different host. We don't want to make that distributed process serial. So you could have:

  1. Driver requests more executors from cluster manager
  2. Driver decides host X is bad
  3. Driver kills all executors on host X
  4. Cluster manager gives driver a new executor on host X

We can't force (4) to happen before any of the other steps; that event might come in long after the driver is done killing all executors, but the driver still needs to reject the new executor.
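So the registration path itself has to check the node blacklist; roughly (a sketch of the RegisterExecutor handling in CoarseGrainedSchedulerBackend.receiveAndReply, assuming the nodeBlacklist method on TaskScheduler mentioned at the top of this review):

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (scheduler.nodeBlacklist().contains(hostname)) {
    // The cluster manager gave us an executor on a host that was blacklisted after we
    // requested it (step 4 above): reject it instead of registering and then killing it.
    context.reply(RegisterExecutorFailed(
      s"Executor is blacklisted due to failures on host $hostname"))
  } else {
    // ... existing registration logic ...
  }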
