Conversation

@KaiXinXiaoLei

When killing an executor, the driver sends RequestExecutors to the AM, but in ExecutorAllocationManager the value of numExecutorsTarget is not changed.

@SparkQA

SparkQA commented Sep 9, 2015

Test build #42203 has finished for PR 8668 at commit a03023d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

@KaiXinXiaoLei if you delete this then sc.killExecutor will no longer lower the target, so new executors will always replace the old ones that we just killed. This is not the behavior that we want.

@vanzin
Contributor

vanzin commented Sep 9, 2015

Which, btw, is why the tests fail.

@KaiXinXiaoLei
Author

@andrewor14 Now I have a problem. For example, there are three executors. After some minutes, these executors are removed because of no recent heartbeats. Before new executors are registered, the spark-dynamic-executor-allocation thread sends killExecutor to the driver because the executors have been idle for 60 seconds. The driver then sends RequestExecutors to the AM and changes the total number of executors requested by the driver. So that number differs from the one in ExecutorAllocationManager.

@vanzin
Contributor

vanzin commented Sep 10, 2015

@KaiXinXiaoLei I'm not sure I follow you. It sounds like you may be describing a race condition somewhere, but it's not clear.

Both the heartbeat receiver and the allocation manager will kill executors using the same API (killExecutors) but with different parameters. That method is synchronized so I don't see how there could be a race.
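For reference, the shared entry point looks roughly like this (a simplified sketch of the Spark 1.5-era CoarseGrainedSchedulerBackend method, not the exact source):

  // Simplified sketch; lives in CoarseGrainedSchedulerBackend. Both the
  // heartbeat receiver (replace = true) and the allocation manager's idle
  // timeout (replace = false) are serialized through this one lock.
  final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean =
    synchronized {
      val toKill = executorIds.filter(executorDataMap.contains)
      executorsPendingToRemove ++= toKill
      if (!replace) {
        // A non-replacement kill also lowers the requested total.
        doRequestTotalExecutors(
          numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
      }
      doKillExecutors(toKill)
    }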

The only thing I can potentially come up with is that if the heartbeat receiver kills executors (and then asks for new ones to replace them), the idle timeout for the old executors will be lost (the new executors will start with a new idle timer). That means those "x" executors will be alive for more time than maybe would be optimal, but I don't necessarily see that as a problem.

@KaiXinXiaoLei
Author

@vanzin If the heartbeat receiver kills executors (and new ones are not yet registered to replace them), an idle timeout on the remaining old executors changes the total number of executors requested by the driver, so the new ones will never be asked to replace the killed ones.

@vanzin
Contributor

vanzin commented Sep 10, 2015

@KaiXinXiaoLei I still don't understand what's the issue.

  • Executors are hung / dead / in gc hell and don't send heartbeats
  • Heartbeat receiver kills them. All traces of the dead executor (including any idle timeouts) are lost. Code asks for new executors to replace them.
  • New executors come up, register, have fresh idle timeouts.

So, aside from the new executors living for a little longer than the old ones would, what is the issue? Where is the count wrong?

@KaiXinXiaoLei
Author

@vanzin For example, executorsPendingToRemove = Set(1), and executor 2 hits its idle timeout before a new executor is asked to replace executor 1. The driver then kills executor 2 and sends RequestExecutors to the AM. But now executorsPendingToRemove = Set(1, 2), so the AM doesn't allocate an executor to replace executor 1.
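To make the arithmetic concrete (an illustrative walk-through; the numbers are made up, the names come from the code quoted further down):

  // start:  executors 1, 2, 3 registered; requested total = 3
  // step 1: heartbeat receiver kills executor 1 expecting a replacement
  //         -> executorsPendingToRemove = Set("1"); the total stays 3, so the
  //            AM still owes one replacement container
  // step 2: executor 2 hits its idle timeout before the replacement arrives;
  //         the allocation manager kills it with no replacement
  //         -> executorsPendingToRemove = Set("1", "2")
  //         -> new total = numExistingExecutors + numPendingExecutors
  //                        - executorsPendingToRemove.size
  //                      = 3 + 0 - 2 = 1
  // The correct total is 2 (drop executor 2, keep the replacement for 1),
  // so the replacement slot for executor 1 is silently lost.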

@KaiXinXiaoLei
Author

@andrewor14 When the number of executors requested becomes lower, the driver sends a message to the AM to change it, in ExecutorAllocationManager:

[screenshot omitted]

@SparkQA

SparkQA commented Sep 12, 2015

Test build #42367 has finished for PR 8668 at commit 773a11e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 12, 2015

Test build #42371 has finished for PR 8668 at commit 558cd04.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@KaiXinXiaoLei
Author

With dynamic executor allocation, the number of executors needed by the driver should be calculated from the number of tasks. For example, while tasks are running, if the number of tasks drops, the number of executors will change accordingly. So there is no need to change the number of executors when killing executors.
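(For context, the demand side of dynamic allocation is roughly the following calculation; a paraphrase of ExecutorAllocationManager.maxNumExecutorsNeeded, not the exact source:)

  // Roughly how ExecutorAllocationManager derives executor demand from the
  // task load: enough executors to cover every running or pending task,
  // rounded up.
  private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }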

And the tests, I think, should be changed. If you believe my reasoning is sound, I will update the tests. Thanks.

@vanzin
Contributor

vanzin commented Sep 16, 2015

Ok, I think I see what the problem is. But your fix is not correct.

The problem is here:

  doRequestTotalExecutors(
    numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)

By subtracting executorsPendingToRemove.size when that list contains an executor that is pending replacement, that replacement will be lost. The fix is to keep track of how many replacement executors the code is waiting for, and account for that in the above equation, not to remove that code altogether.

"So there is no need to change the number of executors when killing executors."

That's not true, on YARN at least. See SPARK-6325. So you can't make your current change unless you also change how the YARN backend accounts for the running executors.
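A minimal sketch of the accounting fix described above, assuming a hypothetical numPendingReplacements counter (incremented on a kill-with-replace, decremented when the replacement registers):

  // Hypothetical corrected accounting; numPendingReplacements is not a real
  // field in this code, just the counter the suggestion implies.
  doRequestTotalExecutors(
    numExistingExecutors + numPendingExecutors + numPendingReplacements -
      executorsPendingToRemove.size)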

@vanzin
Contributor

vanzin commented Sep 16, 2015

To clarify: the YARN backend also needs to know whether the executor being killed needs to be replaced or not. Right now, when the executor is not to be replaced, that's communicated to the YARN backend using two RPCs: one to kill the executor, one to update the number of requested executors.

So for your current patch to work on YARN, you'd have to propagate that information (whether the executor needs to be replaced) in the KillExecutors message sent to the YARN backend, and make the AM update its bookkeeping accordingly.
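One hypothetical shape for that message (the real CoarseGrainedClusterMessages.KillExecutors carries only the executor ids at this point; the replace field is the addition being suggested):

  // Hypothetical RPC shape: carry the replace flag so the AM only lowers its
  // requested-executor count for kills that do not expect a replacement.
  case class KillExecutors(executorIds: Seq[String], replace: Boolean)
    extends CoarseGrainedClusterMessage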

@KaiXinXiaoLei changed the title from "[SPARK-10515] When killing executor, there is no need to send RequestExecutors to AM" to "[SPARK-10515] When killing executor, the pending replacement executors should not be lost" on Sep 17, 2015
@SparkQA

SparkQA commented Sep 17, 2015

Test build #42591 has finished for PR 8668 at commit cf56d21.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@KaiXinXiaoLei
Author

@vanzin I'll try this approach, thanks.

@SparkQA

SparkQA commented Sep 17, 2015

Test build #42601 has finished for PR 8668 at commit 2722425.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@KaiXinXiaoLei
Author

@vanzin I don't think this is the best way to resolve the problem. In spark-dynamic-executor-allocation, the total number of executors should stay consistent with the AM's, so I think the change belongs in spark-dynamic-executor-allocation, e.g.:

   private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
  +  val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
     // Do not request more executors if it would put our target over the upper bound
  -  if (numExecutorsTarget >= maxNumExecutors) {
  +  if (numExistingExecutors >= maxNumExecutors) {
       logDebug(s"Not adding executors because our current target total " +
         s"is already $numExecutorsTarget (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
  -  val oldNumExecutorsTarget = numExecutorsTarget
     ...
  -  val delta = numExecutorsTarget - oldNumExecutorsTarget
  +  val delta = numExecutorsTarget - numExistingExecutors

Contributor

minor: if (executorsToReplace.remove(executorId)) { ... }
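(The suggestion works because scala.collection.mutable.Set#remove returns whether the element was present, so the membership check and the removal collapse into one call:)

  // mutable.Set#remove returns true iff the element was in the set, so no
  // separate contains check is needed.
  if (executorsToReplace.remove(executorId)) {
    // the executor was awaiting a replacement; adjust bookkeeping here
  }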

Author

Yes, this code is not needed, thanks.

@vanzin
Contributor

vanzin commented Sep 21, 2015

There's one possible race in this current code; let's say executor X fails a heartbeat and the driver kills it expecting a replacement. Slightly later, the allocation manager triggers an idle timeout for X, and tries to kill it again, with no replacement. The code should remove that executor from the list of executors expecting a replacement, and inform the backend that its accounting should be updated.
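A minimal sketch of that handling, reusing the executorsToReplace set from this patch (method shape simplified, not the exact patch):

  // Inside CoarseGrainedSchedulerBackend.killExecutors (simplified). A
  // non-replacement kill cancels any outstanding replacement for the same
  // executor and pushes the lowered total to the backend.
  if (!replace) {
    executorIds.foreach(executorsToReplace.remove)
    doRequestTotalExecutors(
      numExistingExecutors + numPendingExecutors + executorsToReplace.size -
        executorsPendingToRemove.size)
  }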

Also, let me go through your last comment and try to understand it.

@vanzin
Contributor

vanzin commented Sep 21, 2015

@KaiXinXiaoLei I think I understood what the change you propose in your last comment would do. I think it would work, except for a very minor issue where it might cause unnecessary messages to be sent to the allocator when the number of executors hasn't changed.

e.g.

  • at t=1, there are x executors running
  • at t=2, addExecutors is called and x + 1 executors are requested
  • at t=3, no executor has been added yet; addExecutors is called with x + 1 again, and instead of skipping the redundant request, a message is sent to the allocator anyway.

That is fine since these messages are not sent that often (at worst, once a second), and the allocator should handle this situation fine.

It would be nice to confirm that theory with tests, though; are the current unit tests enough to make sure that change would do the right thing?

@KaiXinXiaoLei
Author

@vanzin Yes, addExecutors will often send messages to the AM, but the total number of executors in spark-dynamic-executor-allocation will stay the same as the AM's. I think the total number of executors should be computed in spark-dynamic-executor-allocation and that value sent to the AM, instead of being calculated in CoarseGrainedSchedulerBackend.

@SparkQA

SparkQA commented Sep 22, 2015

Test build #42839 has finished for PR 8668 at commit 7e0c199.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

nit: "have not been replaced"

@vanzin
Contributor

vanzin commented Sep 22, 2015

The change LGTM; I'd be more comfortable if there was a unit test for this code, but I tried to craft one and CoarseGrainedSchedulerBackend is not very unit-testable at all. Perhaps that's something to look at, given how hard it is to grok this code.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #42882 has finished for PR 8668 at commit 71a59a3.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #42883 has finished for PR 8668 at commit 0041fde.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #42881 has finished for PR 8668 at commit d738641.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@KaiXinXiaoLei
Author

@vanzin I added a unit test for this problem. Thanks.

Contributor

Hmm... this doesn't look right. It should probably be using a new version of killNExecutors that calls killAndReplaceExecutor and syncs things.

@andrewor14 might be a better person to comment on this, since he wrote the original tests. I'm not sure about how much we can trust the counts to update atomically when killNExecutors and friends are called, but the other tests seem to be passing reliably...
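A hypothetical version of such a helper (killAndReplaceExecutor is a real private[spark] method on SparkContext; getExecutorIds and syncExecutors are assumed from the surrounding test suite):

  // Hypothetical test helper along the lines suggested above: kill n
  // executors while expecting replacements, after syncing the suite's view.
  private def killNExecutorsAndReplace(sc: SparkContext, n: Int): Unit = {
    syncExecutors(sc)
    sc.getExecutorIds().take(n).foreach(sc.killAndReplaceExecutor)
  }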

Contributor

yes, this might be flaky.

@vanzin
Contributor

vanzin commented Sep 25, 2015

FYI: #8914 makes some changes to these tests to avoid the races I alluded to in my last comment.

Contributor

please update this to reflect the changes made in #8914

@andrewor14
Contributor

@KaiXinXiaoLei The problem makes sense now and I think this fix is correct, though I suggested a way to make it simpler. Once you address the comments would you mind also updating the JIRA? Right now the description is empty and there are no details on the bug: https://issues.apache.org/jira/browse/SPARK-10515

@KaiXinXiaoLei
Author

@andrewor14 I changed the code according to your suggestion; see #8945.

@andrewor14
Contributor

@KaiXinXiaoLei in the future please just have one PR open for the same issue. You can do this by pushing to the same branch instead of creating a whole new one (i.e. deleteRequest). It's confusing to reviewers to have many very similar patches open. Please close this PR if your other patch supersedes this.

@srowen
Member

srowen commented Oct 1, 2015

@KaiXinXiaoLei do you mind closing this PR?

@KaiXinXiaoLei
Author

see #8945

asfgit pushed a commit that referenced this pull request Oct 15, 2015
[SPARK-10515] When killing executor, the pending replacement executors should not be lost

If the heartbeat receiver kills executors (and new ones are not yet registered to replace them), an idle timeout on the remaining old executors changes the total number of executors requested by the driver, so the new ones will never be asked to replace the killed ones.
For example, executorsPendingToRemove = Set(1), and executor 2 hits its idle timeout before a new executor is asked to replace executor 1. The driver then kills executor 2 and sends RequestExecutors to the AM. But executorsPendingToRemove = Set(1, 2), so the AM doesn't allocate an executor to replace executor 1.

see: #8668

Author: KaiXinXiaoLei <[email protected]>
Author: huleilei <[email protected]>

Closes #8945 from KaiXinXiaoLei/pendingexecutor.
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Oct 16, 2015
kiszk pushed a commit to kiszk/spark-gpu that referenced this pull request Dec 26, 2015