
Conversation

@venkata91 (Contributor) commented Apr 22, 2020

What changes were proposed in this pull request?

In this change, when dynamic allocation is enabled, instead of aborting immediately when a task set becomes unschedulable due to blacklisting, we pass a SparkListenerUnschedulableTaskSetAdded event, which is handled by ExecutorAllocationManager to request the additional executors needed to schedule the unschedulable blacklisted tasks. Once the event is sent, we start the abortTimer, similar to [SPARK-22148][SPARK-15815], to abort in the case where no new executors are launched, either because max executors has been reached or because the cluster manager is out of capacity.
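
To make the flow concrete, here is a minimal, self-contained sketch of the mechanism described above. The handler class and callback names are illustrative assumptions rather than Spark's actual internals; the event payloads mirror the case classes this patch adds.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Events mirroring the listener events this patch adds (payload shapes assumed)
case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int)

// Hypothetical handler: on an unschedulable task set, ask the allocation
// manager for more executors and arm an abort timer; cancel the timer if the
// task set becomes schedulable again before it fires.
class UnschedulableTaskSetHandler(
    abortTimeoutSec: Long,
    requestExecutors: () => Unit,
    abortStage: Int => Unit) {
  private val timer = Executors.newSingleThreadScheduledExecutor()
  private var pendingAborts = Map.empty[(Int, Int), ScheduledFuture[_]]

  def onAdded(e: UnschedulableTaskSetAdded): Unit = synchronized {
    requestExecutors() // the ExecutorAllocationManager side of the event
    val abort = timer.schedule(new Runnable {
      // fires only if no new executor made the task set schedulable in time
      def run(): Unit = abortStage(e.stageId)
    }, abortTimeoutSec, TimeUnit.SECONDS)
    pendingAborts += ((e.stageId, e.stageAttemptId) -> abort)
  }

  def onRemoved(e: UnschedulableTaskSetRemoved): Unit = synchronized {
    pendingAborts.get((e.stageId, e.stageAttemptId)).foreach(_.cancel(false))
    pendingAborts -= ((e.stageId, e.stageAttemptId))
  }
}
```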

Why are the changes needed?

This is an improvement. When dynamic allocation is enabled, this requests more executors to schedule the unschedulable tasks instead of aborting the stage without even retrying up to spark.task.maxFailures times (in some cases without retrying at all). This is a potential issue with respect to Spark's fault tolerance.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests in both ExecutorAllocationManagerSuite and TaskSchedulerImplSuite.

@venkata91 (Contributor, Author)

Can you please review, @squito @tgravescs @mridulm?

@tgravescs (Contributor)

I took a quick look and I don't follow. You send a message that says everything is blacklisted, but all you do is call onSchedulerBacklogged; how does that add another executor if they are all blacklisted? That updates when it needs to add more, but it doesn't change the number it calculated. So, for instance, say I needed 3 executors and all of them got blacklisted: updating the time to get a new one won't change the fact that it thinks it needs 3.
If we change it to ask for one more, then my question is how you do the proper accounting for that. This is why, in the JIRA and the other PR, we said the allocation manager needs to be closely tied to the blacklist manager.

@venkata91 (Contributor, Author)

> I took a quick look and I don't follow. You send a message that says everything is blacklisted, but all you do is call onSchedulerBacklogged; how does that add another executor if they are all blacklisted? That updates when it needs to add more, but it doesn't change the number it calculated. So, for instance, say I needed 3 executors and all of them got blacklisted: updating the time to get a new one won't change the fact that it thinks it needs 3.
> If we change it to ask for one more, then my question is how you do the proper accounting for that. This is why, in the JIRA and the other PR, we said the allocation manager needs to be closely tied to the blacklist manager.

Ok. Got it, makes sense now. Let me think more about it.

@venkata91 (Contributor, Author)

@tgravescs I went through the discussions on #22288. Is there any other PR you're referring to with discussions around dynamic allocation and BlacklistManager? Also, can you please explain a bit more about the proper accounting? Let's say we request one executor at a time (possibly over multiple rounds) while ensuring we stay within the bound: won't dynamic allocation eventually bring the number of executors back to a consistent state, with idle executors getting removed periodically?

@tgravescs (Contributor)

It might have been in discussions not on a PR, or buried in comments; I don't have time to go looking. There are many different conditions to consider. The main one we were focusing on is the case where you already have as many executors as you need for the tasks you have left, which means the allocation manager is not going to ask for more. The problem is that some or all of those executors can get blacklisted. The only way for the dynamic allocation manager to know it needs to ask for more is for it to know that nodes are blacklisted, and then to ask for additional executors, internally incrementing its count of executors needed and asking YARN or another resource manager for more. At that point the number of executors the allocation manager wants differs from what its normal calculation would produce. Simplified: #executors = (#tasks * #cpus per task) / (#cores per executor). So you have to change the number of executors and keep taking that into account, because the allocation manager is always recalculating whether it needs more or fewer executors. You also have to notify it when executors become unblacklisted, or when the blacklisted ones hit the idle timeout, etc. The allocation manager has to know a lot more details about the blacklisting and take those into account when calculating the number of executors it needs.
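
As a worked instance of the simplified formula above (the function and parameter names are illustrative, not Spark's): with blacklisting ignored, the computed target never grows, even when every executor currently held is blacklisted for the remaining tasks.

```scala
// The simplified target from the comment above, which ignores blacklisting.
def targetExecutors(pendingTasks: Int, cpusPerTask: Int, coresPerExecutor: Int): Int =
  math.ceil(pendingTasks.toDouble * cpusPerTask / coresPerExecutor).toInt

// Example: 3 pending tasks, 1 cpu per task, 1 core per executor => target 3.
// If the 3 executors already held are all blacklisted for these tasks, the
// target stays 3, so no new executor is ever requested; that is the gap.
val target = targetExecutors(pendingTasks = 3, cpusPerTask = 1, coresPerExecutor = 1)
assert(target == 3)
```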

@venkata91 force-pushed the SPARK-31418 branch 3 times, most recently from 7e12d91 to bf0cf52 on June 20, 2020 at 17:38
@venkata91 (Contributor, Author)

@tgravescs After thinking about the problem and discussing with @mridulm, I have now handled it by keeping track of unschedulable task sets so that more executors can be requested when dynamic allocation is enabled. Once some task becomes schedulable, we clear this set, since either an executor got free or we just acquired a new executor and found a way to make progress. Let me know what you think about this change. Thanks for taking a look previously and giving the overall context.
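
A minimal sketch of the bookkeeping just described, assuming a (stageId, stageAttemptId) key and a simple bump of the executor target while anything is unschedulable; the actual patch's accounting may differ in detail.

```scala
import scala.collection.mutable

// Illustrative tracker: remember which task sets are currently unschedulable
// and clear them once any of their tasks can be scheduled again.
class UnschedulableTaskSetTracker {
  private val unschedulable = mutable.Set.empty[(Int, Int)] // (stageId, attempt)

  def onUnschedulable(stageId: Int, attempt: Int): Unit =
    unschedulable += ((stageId, attempt))

  def onSchedulable(stageId: Int, attempt: Int): Unit =
    unschedulable -= ((stageId, attempt))

  // With dynamic allocation on, request at least one executor beyond the
  // normal target while anything is unschedulable, so progress is possible.
  def adjustedTarget(normalTarget: Int): Int =
    if (unschedulable.nonEmpty) normalTarget + 1 else normalTarget
}
```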

@venkata91 (Contributor, Author)

Can someone help me understand why this Generate documents check is failing? I'm not sure I understand the issue here. Any pointers would be appreciated.

@tgravescs (Contributor)

Sorry, I haven't had a chance to look at your rework. I re-kicked the checks, as it might have been a transient issue.

@venkata91 (Contributor, Author)

No worries, @tgravescs, and thanks for taking a look. For some reason these checks keep failing, but the failures don't look related to my changes. Some cache issue, probably?

@tgravescs (Contributor)

Yes, it's possible; I saw some issues on a few other PRs, although with different parts. It may be early next week before I can review.

@venkata91 (Contributor, Author)

> Yes, it's possible; I saw some issues on a few other PRs, although with different parts. It may be early next week before I can review.

Thanks, that should be fine.

@tgravescs (Contributor) left a comment

At a high level this seems like an OK approach to request at least some executors: even if it won't fit all of them, it lets you make progress. It's unfortunate to add yet more tracking of the same thing in multiple places. I wish the allocation manager would move into the scheduler, but that is a much bigger change.

@tgravescs (Contributor)

ok to test

@SparkQA commented Jul 10, 2020

Test build #125613 has finished for PR 28287 at commit 0784dc3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
  • case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int)
  • case class SparkListenerUnschedulableTaskSetAdded(
  • case class SparkListenerUnschedulableTaskSetRemoved(

@venkata91 (Contributor, Author)

> Test build #125613 has finished for PR 28287 at commit 0784dc3.
> • This patch fails Spark unit tests.

The failed tests seem unrelated; they look like transient issues. @tgravescs, should we rerun it?

@tgravescs (Contributor)

test this please

@SparkQA commented Jul 11, 2020

Test build #125637 has finished for PR 28287 at commit 0784dc3.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
  • case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int)
  • case class SparkListenerUnschedulableTaskSetAdded(
  • case class SparkListenerUnschedulableTaskSetRemoved(

@SparkQA commented Jul 12, 2020

Test build #125723 has finished for PR 28287 at commit b4c27fb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
  • case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int)
  • case class SparkListenerUnschedulableTaskSetAdded(
  • case class SparkListenerUnschedulableTaskSetRemoved(

@venkata91 (Contributor, Author)

@tgravescs Can you please take another look now that the tests are passing? Thanks.

@tgravescs (Contributor) left a comment

A couple of minor things; otherwise looks good.

@venkata91 (Contributor, Author)

> A couple of minor things; otherwise looks good.

Awesome, thanks. I addressed the review comments; hopefully the tests pass on the first run this time.

venkata91 added 11 commits July 21, 2020 15:14
…location is enabled and a task becomes unschedulable due to spark's blacklisting feature.

In this change, when dynamic allocation is enabled, instead of aborting an unschedulable blacklisted task immediately, we use the SparkListener to pass an UnschedulableBlacklistTaskSubmitted event, which is handled by ExecutorAllocationManager to request more executors to schedule the unschedulable blacklisted task. Once the event is sent, we start the abortTimer, similar to [SPARK-22148][SPARK-15815].

Currently this has been manually tested in our clusters. I am also trying to figure out how to add unit tests.

@venkata91 (Contributor, Author)

@tgravescs It seems like a couple of tests are failing. Can you please kick this off again?

@SparkQA commented Jul 22, 2020

Test build #126282 has finished for PR 28287 at commit 2f019d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 22, 2020

Test build #126283 has finished for PR 28287 at commit d9f473d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

@venkata91 I assume your updates keep getting rebased? If possible, it would be great if you could just do up-merges, because rebasing makes it much harder to see the diffs between requested changes.

@venkata91 (Contributor, Author)

> @venkata91 I assume your updates keep getting rebased? If possible, it would be great if you could just do up-merges, because rebasing makes it much harder to see the diffs between requested changes.

Sure, makes sense. Will keep that in mind for the future.

@tgravescs (Contributor) left a comment

Pending Jenkins; looks good.

@SparkQA commented Jul 22, 2020

Test build #126352 has finished for PR 28287 at commit d6f1e73.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

merged to master, thanks @venkata91

@asfgit closed this in e7fb67c on Jul 23, 2020
@venkata91 (Contributor, Author)

> merged to master, thanks @venkata91

Thanks @tgravescs for patiently doing multiple rounds of reviews. :)

otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…location is enabled and a task becomes unschedulable due to spark's blacklisting feature

Ref: LIHADOOP:52649

In this change, when dynamic allocation is enabled, instead of aborting immediately when a task set becomes unschedulable due to blacklisting, we pass a `SparkListenerUnschedulableTaskSetAdded` event, which is handled by `ExecutorAllocationManager` to request the additional executors needed to schedule the unschedulable blacklisted tasks. Once the event is sent, we start the abortTimer, similar to [SPARK-22148][SPARK-15815], to abort in the case where no new executors are launched, either because max executors has been reached or because the cluster manager is out of capacity.

This is an improvement. When dynamic allocation is enabled, this requests more executors to schedule the unschedulable tasks instead of aborting the stage without even retrying up to spark.task.maxFailures times (in some cases without retrying at all). This is a potential issue with respect to Spark's fault tolerance.

Added unit tests in both ExecutorAllocationManagerSuite and TaskSchedulerImplSuite.

Closes apache#28287 from venkata91/SPARK-31418.

Authored-by: Venkata krishnan Sowrirajan <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>

RB=2048861
BUG=LIHADOOP-52649
G=spark-reviewers
R=ekrogen,mshen
A=ekrogen