[SPARK-22148][SPARK-15815][Scheduler] Acquire new executors to avoid hang because of blacklisting #22288
Conversation
Test build #95485 has finished for PR 22288 at commit

@squito @tgravescs Can you review this PR? Thanks.
    hostToExecutors.valuesIterator.foreach(executors => executors.foreach({
      executor =>
        logDebug("Killing executor because of task unschedulability: " + executor)
        blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
Seriously? You killed all executors? What if other taskSets' tasks are running on them?
BTW, if you want to refresh executors, you also have to enable spark.blacklist.killBlacklistedExecutors.
- To refresh executors, you need to enable spark.blacklist.killBlacklistedExecutors.
- I was thinking about it; killing all the executors is a little too harsh. Killing only a single executor would help mitigate this, although it would also fail the tasks running on that executor.
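For readers following along, here is a minimal sketch (not part of this PR) of the configuration the discussion assumes: blacklisting must be on, killing blacklisted executors must be enabled explicitly, and dynamic allocation is what lets the cluster manager hand back replacements for killed executors.

```scala
import org.apache.spark.SparkConf

// Sketch only: the settings assumed by the discussion above.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")                  // enable task/executor blacklisting
  .set("spark.blacklist.killBlacklistedExecutors", "true") // allow the tracker to kill executors
  .set("spark.dynamicAllocation.enabled", "true")          // replacements come via dynamic allocation
  .set("spark.shuffle.service.enabled", "true")            // required when dynamic allocation is on
```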
        }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS)
      }
    } else {
      // TODO: try acquiring new executors for static allocation before aborting.
How? By waiting for other tasks to finish and release resources?
Test build #95729 has finished for PR 22288 at commit
squito left a comment
Thanks for looking at this @dhruve, sorry for the delay. Just some first thoughts from me, I need to go read my thoughts on the related jiras a bit.
I think you can add tests for this in BlacklistIntegrationSuite, but you'd need to extend it to allow for executors to get added and removed.
    // unable to schedule any task from the taskSet.
    // Note: We keep a track of schedulability on a per taskSet basis rather than on a
    // per task basis.
    val executor = hostToExecutors.valuesIterator.next().iterator.next()
hostToExecutors.head._2.head
just thinking "aloud" -- I guess taking an arbitrary executor here is OK, as we know there is some task that can't run on any executor. But I wonder if we could have some priority here -- e.g. I'd much rather kill an executor which has been blacklisted for an entire stage or the whole app, rather than one that was blacklisted for just some task. Need to look into whether there is an efficient way to keep that priority list, though.
That's a nice suggestion.
There was a case where you could have only a few executors running, let's say just 3 of them, all blacklisted but with some tasks running on them. To handle this, I had started modifying the change to take down the executor with the fewest tasks running on it. I'll check some more on this.
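A minimal sketch of the selection order being discussed here, using a hypothetical model rather than the scheduler's real data structures: prefer executors blacklisted for the whole application or stage, and break ties by the fewest running tasks.

```scala
// Hypothetical model (not the PR's code): choose which blacklisted executor to kill.
case class ExecInfo(id: String, blacklistedForApp: Boolean,
    blacklistedForStage: Boolean, runningTasks: Int)

def chooseExecutorToKill(execs: Seq[ExecInfo]): Option[String] =
  // Booleans sort false-first, so negating puts app/stage-blacklisted executors first;
  // ties are broken by the number of running tasks.
  execs.sortBy(e => (!e.blacklistedForApp, !e.blacklistedForStage, e.runningTasks))
    .headOption.map(_.id)

// The app-blacklisted executor wins even though it has more running tasks.
chooseExecutorToKill(Seq(
  ExecInfo("exec-1", blacklistedForApp = false, blacklistedForStage = true, runningTasks = 0),
  ExecInfo("exec-2", blacklistedForApp = true, blacklistedForStage = false, runningTasks = 2)
)) // => Some("exec-2")
```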
I'm wondering whether it is worth killing an executor that has tasks running on it? After all, a task blacklisted on all currently allocated executors is not guaranteed to run on a newly allocated executor.
    // Note: We keep a track of schedulability on a per taskSet basis rather than on a
    // per task basis.
    val executor = hostToExecutors.valuesIterator.next().iterator.next()
    logDebug("Killing executor because of task unschedulability: " + executor)
I think this should probably be logInfo (unless there is something else similar at INFO level elsewhere)
noted.
    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))

    if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
      unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
I'd include a logInfo here that Spark can't schedule anything because of blacklisting, but it's going to try to kill blacklisted executors and acquire new ones. Also mention how long it will wait before giving up and the associated conf.
noted.
    } else {
      // If a task was scheduled, we clear the expiry time for the taskSet. The abort timer
      // checks this entry to decide if we want to abort the taskSet.
      if (unschedulableTaskSetToExpiryTime.contains(taskSet)) {
you can move this up to the else so it's an else if. Or you could also just call remove without checking contains; that avoids probing twice.
just calling the remove sounds like a good idea.
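A tiny illustration of the suggestion, with a String key standing in here for the TaskSetManager: mutable.HashMap.remove is a no-op when the key is absent, so the contains probe can be dropped.

```scala
import scala.collection.mutable

val unschedulableTaskSetToExpiryTime = mutable.HashMap[String, Long]("taskSet1" -> 100L)

// Before: two hash lookups.
if (unschedulableTaskSetToExpiryTime.contains("taskSet1")) {
  unschedulableTaskSetToExpiryTime.remove("taskSet1")
}

// After: one lookup, same effect (remove simply returns None when the key is missing).
unschedulableTaskSetToExpiryTime.remove("taskSet1")
```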
   * will hang.
   * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung.
   * If dynamic allocation is enabled we try to acquire new executor/s by killing the existing one.
   * In case of static allocation we abort the taskSet immediately to fail the job.
why do you want something different with static allocation? If you kill an executor, static allocation will also request a replacement.
Yes. The change of removing a single executor takes care of static allocation as well. I will update the comments.
Ok I looked at the jiras, and it looks like this also covers SPARK-15815, right? You could add that to the summary too. You mention some future improvements:
I mentioned this on an inline comment too, but now that I'm thinking about it, it seems like this will be fine with static allocation as well. It just seems like the problem is worst with DA, as you can end up with one executor left for the straggler task, and then that executor gets blacklisted. But with static allocation, maybe you only requested a small number of executors on a large cluster, and by chance you get them all on a host with bad disks, so then everything starts failing. You could still just kill those executors and request new ones to keep things going. Anything I'm missing?
what's the concern here -- that if you're on a small cluster, there is very little chance of getting a good replacement, so you should go back to failing fast? I guess that would be nice, but much less important in my opinion.
I don't understand this part -- do you mean for locality preferences?
dhruve left a comment
Yes. It covers SPARK-15815 as well - but you provided a fix for the hang by aborting immediately.
1 - This satisfies the condition with Static Allocation as well. I will remove the comment from the code.
2 - Failing fast is the intent here. It's a nice-to-have, so I kept it as a TODO in case we really want it.
3 - If it takes more time to acquire a new executor after killing a blacklisted one and the abort timer is up, we end up aborting the TaskSet. This was to see if we want to account for the elapsed time without including the time it took to obtain a new executor. Or we could just set the abort timer's expiration interval to a higher default value, which should cover most of the cases.
As I mentioned at #22288 (comment), I'm quite worried about this killing behaviour. I think we should kill an executor only if it is idle. Looking through the discussion above, here are my thoughts:
Maybe, we can add
Set
yeah I'm not sure you can do much better. What if it takes forever to get a new executor? There's no guarantee you will get anything else. I don't see much value in adding another timer for that case, but happy to hear about an alternative.
yes, you have a good point. So the two extremes we need to consider are:
In (2), we should think about other jobs running concurrently. (You could have concurrent jobs with (1), but if there is another job you probably have other executors up, so it's less likely.) It would be bad for us to kill things in scenario (2), where one bad taskset leads to us killing executors for other jobs. But if we wait indefinitely for an idle executor to kill, then that taskset may wait indefinitely, which is also bad.
maybe this would help, lemme think about it ... I'd rather avoid adding this to the Listener api just for this as it should be an entirely internal detail, but maybe that is all we can do. I guess this would let you bump up the request, as long as you're lower than the max executors, so it would solve the case when there is only one executor. But in case 2, you'd probably end up requesting a whole bunch more executors very briefly, until there are enough failures on one specific task. Or maybe we can ensure that even if there are a huge number of unschedulable tasks, we only ever request one extra executor?
sorry I don't think I understand this part. Is this the same as the current PR, but just killing only if idle?
cc @jiangxb1987 @attilapiros also for thoughts
(I'm on an outside trip these days, so I have to use my mobile phone to type this. Sorry for the format.)
Yes, similar. This avoids a TaskSet waiting to be scheduled indefinitely. So, in case 2, if we do not find an idle executor before the timeout, the TaskSet would abort rather than hang.
I'm not sure I have understood this part totally. But I realized that, by now, our DA strategy is basically based on tasks' status, e.g. pending, speculative. However, whether an executor is blacklisted depends on a successful TaskSet's status (IIRC). So this fact may introduce a level mismatch when we want to involve DA in TaskSchedulerImpl. (hope I understood your main thought)
Test build #96443 has finished for PR 22288 at commit

Test build #96432 has finished for PR 22288 at commit

retest this please

the failures seem to be unrelated. I wasn't able to reproduce them.

Test build #96767 has finished for PR 22288 at commit
  // blacklisting.
  private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT =
    ConfigBuilder("spark.scheduler.unschedulableTaskSetTimeout")
      .doc("The timeout in seconds to wait before aborting a TaskSet to acquire a new executor " +
reword to be timeout in seconds to wait to try to acquire a new executor and schedule a task before aborting....
add blacklist to the name of the config, since it really only applies to blacklisted executors
also need to document in the .md file
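To make the three suggestions above concrete, here is a sketch of how the renamed entry might look inside Spark's internal config package. The exact key and the 120s default shown here are assumptions for illustration, not quotes from the merged change.

```scala
package org.apache.spark.internal.config

import java.util.concurrent.TimeUnit

// Sketch only: the renamed entry as the review comments above suggest.
private[spark] object BlacklistTimeoutSketch {
  val UNSCHEDULABLE_TASKSET_TIMEOUT =
    ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout")
      .doc("The timeout in seconds to wait to acquire a new executor and schedule a task " +
        "before aborting a TaskSet which is unschedulable because of being completely " +
        "blacklisted.")
      .timeConf(TimeUnit.SECONDS)
      .checkValue(v => v >= 0, "The timeout value should be non-negative")
      .createWithDefault(120L)
}
```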
  protected val executorIdToHost = new HashMap[String, String]

  private val abortTimer = new Timer(true)
remove unneeded newline
  private val abortTimer = new Timer(true)

  private val clock = new SystemClock
remove newline
      // If the taskSet is unschedulable we try to find an existing idle blacklisted
      // executor. If we cannot find one, we abort immediately. Else we kill the idle
      // executor and kick off an abortTimer which after waiting will abort the taskSet if
which if it doesn't schedule a task within the timeout will abort the taskset
      // executor. If we cannot find one, we abort immediately. Else we kill the idle
      // executor and kick off an abortTimer which after waiting will abort the taskSet if
      // we were unable to schedule any task from the taskSet.
      // Note 1: We keep a track of schedulability on a per taskSet basis rather than on a
we keep track
        }
      case _ => // Abort Immediately
        logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
          s" executors could be found. Aborting $taskSet." )
can be found to kill
| s" executors could be found. Aborting $taskSet." ) | ||
| taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) | ||
| } | ||
| case _ => // Do nothing. |
perhaps expand to say do nothing if no tasks completely blacklisted. It looks like the indentation is off here too but it might just be because of the diff and comments
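Pulling the pieces of the quoted diff together, here is a self-contained toy model of the flow being reviewed, with simplified names and types rather than the PR's actual TaskSchedulerImpl code: when nothing was launched and some task is completely blacklisted, kill an idle blacklisted executor once and arm an abort timer; with no idle executor, abort immediately; when anything is launched, clear the pending expiry entries.

```scala
import java.util.{Timer, TimerTask}
import scala.collection.mutable

// Toy model of the reviewed flow; names and types are simplified stand-ins.
object UnschedulableFlowSketch {
  private val unschedulableTaskSetToExpiryTime = mutable.HashMap[String, Long]()
  private val abortTimer = new Timer(true)
  private val timeoutMs = 120 * 1000L

  def onResourceOffers(
      taskSet: String,
      launchedAnyTask: Boolean,
      completelyBlacklistedTask: Option[Int],
      idleBlacklistedExecutor: Option[String],
      killExecutor: String => Unit,
      abortTaskSet: Int => Unit): Unit = {
    if (!launchedAnyTask) {
      completelyBlacklistedTask.foreach { taskIndex =>
        idleBlacklistedExecutor match {
          case Some(exec) =>
            // Only kill an executor and arm the timer once per unschedulable task set.
            if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
              killExecutor(exec)
              unschedulableTaskSetToExpiryTime(taskSet) = System.currentTimeMillis()
              abortTimer.schedule(new TimerTask {
                override def run(): Unit = {
                  // Abort only if the task set is still marked unschedulable.
                  if (unschedulableTaskSetToExpiryTime.contains(taskSet)) abortTaskSet(taskIndex)
                }
              }, timeoutMs)
            }
          case None =>
            // No idle blacklisted executor to kill: abort immediately.
            abortTaskSet(taskIndex)
        }
      }
    } else {
      // Progress was made somewhere, so clear every pending expiry entry.
      unschedulableTaskSetToExpiryTime.clear()
    }
  }
}
```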
    )
    // Wait for the failed task to propagate.
    Thread.sleep(500)
    // taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
remove commented out code
  }

  test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") {
remove extra line
Test build #97165 has finished for PR 22288 at commit

It applies to both DA and SA. I have updated the description.
squito left a comment
I had another thought as I was looking through this again -- do we have any test cases for when you've got some task locality, and you blacklist the executors with preferred locality, but other executors are available and you just haven't crossed the locality delay time yet? I think everything will be OK, but it would be nice to have a test case for it.
      case Some(taskIndex) => // Returns the taskIndex which was unschedulable

        // If the taskSet is unschedulable we try to find an existing idle blacklisted
        // executor. If we cannot find one, we abort immediately. Else we kill the idle
I don't think this is true -- if there is no idle executor here, you abort the taskset immediately and never start a timer, via this case lower down: case _ => // Abort Immediately.
I think to do what you described, you would instead need to do something different in that case, like start the same abortTimer, and also set a flag needToKillIdleExecutor and then on every call to resourceOffer, check that flag and potentially find an executor to kill. (However I haven't totally thought through that, not sure if it would really work. again, I'm not saying this has to be addressed now, just thinking this through)
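For illustration only, here is a hypothetical sketch of that alternative; the name needToKillIdleExecutor and the surrounding structure are made up here, and this is not what the PR does. The idea is to keep the abort-timer bookkeeping, remember that a kill is pending, and retry on later offers.

```scala
import scala.collection.mutable

// Hypothetical alternative sketched in the comment above -- not part of this PR.
class DeferredKillSketch(killExecutor: String => Unit) {
  private var needToKillIdleExecutor = false
  private val unschedulableTaskSetToExpiryTime = mutable.HashMap[String, Long]()

  def onCompletelyBlacklisted(taskSet: String, idleExecutor: Option[String]): Unit = {
    // Arm the abort-timer bookkeeping either way.
    unschedulableTaskSetToExpiryTime.getOrElseUpdate(taskSet, System.currentTimeMillis())
    idleExecutor match {
      case Some(exec) => killExecutor(exec)
      case None => needToKillIdleExecutor = true // defer instead of aborting right away
    }
  }

  // Checked on every resourceOffer: if a kill is pending and an executor has become
  // idle in the meantime, kill it and clear the flag.
  def maybeKillOnOffer(idleExecutor: Option[String]): Unit = {
    if (needToKillIdleExecutor) {
      idleExecutor.foreach { exec =>
        killExecutor(exec)
        needToKillIdleExecutor = false
      }
    }
  }
}
```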
Test build #97879 has finished for PR 22288 at commit

@squito for the locality wait, it would be the same as the condition where it is not completely blacklisted. I have added a test for this. If we want to ensure the sequence of the timeout expiring and the task being scheduled, we will have to add some more delay. Let me know if we want to do that, or if the test seems to suffice.

Test build #97943 has finished for PR 22288 at commit
squito left a comment
mostly minor stuff, but I did have one concern about jobs waiting indefinitely. (Perhaps I don't understand things properly yet)
| s" executors can be found to kill. Aborting $taskSet." ) | ||
| taskSet.abortSinceCompletelyBlacklisted(taskIndex) | ||
| } | ||
| case _ => // Do nothing if no tasks completely blacklisted. |
you can remove this case if instead above you do
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
I have seen this style earlier in the code base. Is this a norm (just curious)? I read a few scenarios where this would be better. However, personally, every time I read a foreach, it's instinctive to think of the entity it's being invoked on as an iterable rather than an Option, so it feels a bit odd.
doesn't matter a ton, I think it's just a scala-ism that takes a while to get used to. My rough guideline is: use pattern matching if you're doing something distinct in both the Some and None cases, or if you can make use of more complex patterns to avoid more nesting (e.g. case Some(x) if x.isFoo() =>). If you're only doing something in the Some branch, then generally prefer map, foreach, filter, etc.
My reason for wanting it here is that when I look at this code, I needed to scroll back to figure out what you were even matching on here and make sure you weren't ignoring something important. When I see the match up above, I assume something is going to happen in both branches. OTOH if there was a foreach, when I see the foreach I know right away you're ignoring None.
again this is really minor, I don't actually care that much, just explaining my thinking.
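A tiny, generic illustration of that guideline; the Option value here is made up, not from the PR:

```scala
val maybeIndex: Option[Int] = Some(3)

// Only the Some branch does any work: foreach reads as "if present, do this".
maybeIndex.foreach { idx => println(s"handling unschedulable task $idx") }

// Both branches do distinct work: pattern matching is clearer.
maybeIndex match {
  case Some(idx) => println(s"handling unschedulable task $idx")
  case None => println("nothing to do")
}
```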
          abortTimer.schedule(
            createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
        }
      case _ => // Abort Immediately
really minor, I think it's a bit more clear if you say case None here (otherwise it takes me just a second to figure out what other patterns will fall under this catch-all)
Makes sense. Will update it.
      // We want to defer killing any taskSets as long as we have a non blacklisted executor
      // which can be used to schedule a task from any active taskSets. This ensures that the
      // job can make progress and if we encounter a flawed taskSet it will eventually either
      // fail or abort due to being completely blacklisted.
I think you should say here that you may have a job wait indefinitely if it has effectively blacklisted the entire cluster, but other jobs keep coming in and keeping resources occupied so the cluster stays busy. So it's not really accurate to say that it will be aborted eventually; we are actually not guaranteeing that (if I understood things correctly).
Since it's folded now, lemme reference the prior discussion on this: #22288 (comment)
Want to make sure I understand this part, and why you aren't only clearing the timer for the taskset you just scheduled a task for. If you have multiple tasksets running simultaneously, one is making progress but the other is totally blacklisted, I guess you do not want to kill anything, because that would mess with the taskset that is working correctly? Instead you'll just let the taskset which is totally blacklisted eventually fail from the timeout? I guess that makes sense, because if one taskset is progressing, the failing taskset is probably flawed, not the executors.
If that's right, it would be good to include something along those lines in the comment (personally I don't find a comment about how it's related to the timer that useful; that's obvious from the code).
dhruve:
That is correct. It also covers another scenario that @tgravescs originally pointed out. Let's say you have multiple taskSets running which are completely blacklisted. If you were able to get an executor, you would just clear the timer for that specific taskSet. Now, due to resource constraints, if you weren't able to obtain another executor within the timeout for the other taskSet, you would abort the other taskSet when you could actually wait for it to be scheduled on the newly obtained executor.
So clearing the timer for all the taskSets ensures that we aren't currently in a completely blacklisted state and should try to run to completion. However, if the taskSet itself is flawed, we would eventually fail. This could result in wasted effort, but we don't have a way to determine that yet, so this should be okay.
Your understanding is correct. I will update the comment.
@tgravescs since we've been back and forth on the discussion of the cases here, just want to make sure you're aware of the possibility for waiting indefinitely here.
Thanks for pointing this out, but if I'm reading the discussion properly, I don't think you will actually wait indefinitely. Eventually you will either abort immediately or you should fail due to the max number of task failures. Let me know if I'm missing something from the scenario.
Let's say you have taskset1 that is blacklisted on all nodes (say we have 3). 3 cases can happen at this point:
- taskset 2 hasn't started, so it tries to kill an executor and starts the timer.
- taskset 2 has started; if it's running on all nodes then we abort immediately because there are no executors to kill
- taskset 2 has started but it's not running on all blacklisted nodes, then we will kill an executor
At this point let's say we didn't abort, so we killed an executor. Taskset 1 will get a chance to run on the new executor and either work or have a task failure. If it has a task failure and it gets blacklisted, we go back into the case above. But the # of task failures gets one closer.
So it seems like eventually you would either abort immediately if there aren't any executors to kill, or you would eventually fail with the max number of task attempts.
Here's the scenario I'm worried about:
- taskset1 and taskset2 are both running currently. taskset1 has enough failures to get blacklisted everywhere.
- there is an idle executor, even though taskset2 is running (eg. the executor that is available doesn't meet the locality preferences of taskset2). So abort timer is started.
- the idle executor is killed, and you get a new one.
- just by luck, taskset2 gets a hold of the new idle executor (eg. the executor is on a node blacklisted by taskset1, or taskset2 just has a higher priority). abort timer is cleared
- taskset2 finishes, but meanwhile taskset3 has been launched, and it uses the idle executor; etc. for taskSetN. So tasks keep launching and the abort timer keeps getting cleared, but nothing ever gets scheduled from taskset1.
admittedly this would not be the normal scenario -- you'd need more tasksets to keep coming, and you need tight enough resource constraints that taskset1 never gets hold of anything, even the new executor.
ok, yeah it seems like it would have to be very timing-dependent for taskset1 to never get a chance at that executor; really that would just be a normal indefinite-postponement problem in the scheduler regardless of blacklisting. I don't think it's a problem with FIFO, as the first taskset should always be first. With the Fair scheduler perhaps it could be, but that probably depends on a much more specific scenario.
I guess I'm ok with this if you are.
so it's a bit worse than regular starvation from having competing tasksets, as in this case you might actually have resources available on your cluster, but you never ask for them, because the executor allocation manager thinks you have enough based on the number of pending tasks.
In any case, I agree this is a stretch, and overall it's an improvement, so I'm OK with it.
| WorkerOffer("executor0", "host0", 1) | ||
| )).flatten.size === 0) | ||
| // Wait for the abort timer to kick in. Without sleep the test exits before the timer is | ||
| // triggered. |
comment is out of date, there is no sleep anymore. But it is still worth explaining that even though there is no configured delay, we still have to wait for the abort timer to run in a separate thread.
Comment still out of date
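One way the updated comment could be backed by the test itself, sketched here under the assumption that the suite can use ScalaTest's Eventually and that tsm is the TaskSetManager created earlier in the test: even with a zero configured timeout, the abort runs on the Timer's own thread, so the assertion has to poll rather than check immediately.

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Sketch only: poll until the abort timer thread has marked the manager as aborted.
// `tsm` is assumed to be the TaskSetManager under test.
eventually(timeout(10.seconds)) {
  assert(tsm.isZombie)
}
```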
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 0)

    val tsm2 = stageToMockTaskSetManager(1)
    val failedTask2 = secondTaskAttempts.find(_.executorId == "executor0").get
minor, you've only got one task attempt here, you could just do
val failedTask2 = secondTaskAttempts.head

    )).flatten

    // Fail the running task
    val failedTask = taskAttempts.find(_.executorId == "executor0").get
same here
Test build #98107 has finished for PR 22288 at commit

@dhruve is this ready to review again?
@tgravescs I have fixed a nit and it's good to be reviewed. @squito I have updated the comment, let me know if it's okay. Thanks for the reviews.
You mentioned in the description that you did some manual testing -- since this has been through some changes since the initial versions, can you do that again? Please be sure to run some manual tests with (a) flawed jobs on a small cluster, so it really should abort, and (b) OK jobs but with a failed straggler when only one executor is still active, which should kill the executor and get a new one. If you've already done that on a relatively recent revision, that's fine too.
Test build #98325 has finished for PR 22288 at commit
@squito I have tested it again with both scenarios and I was able to verify the expected behavior. For the cases that are not covered in the PR, I will mention them in the JIRA.
tgravescs left a comment
+1
squito left a comment
lgtm other than a minor comment on a test
    // make an offer but we won't schedule anything yet as scheduler locality is still PROCESS_LOCAL
    assert(taskScheduler.resourceOffers(IndexedSeq(
      WorkerOffer("executor1", "host0", 1)
    )).flatten.isEmpty)
this is dependent on the system clock not advancing past the locality timeout. I've seen pauses on jenkins over 5 seconds in flaky tests -- either put in a manual clock or just increase the locality timeout in this test to avoid flakiness here
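As a sketch of the second option, assuming the suite's existing setupScheduler helper (its exact signature here is an assumption): pin the locality wait far above any plausible CI pause so this offer can never jump locality levels mid-test.

```scala
// Sketch only: build the scheduler for this test with a locality wait well above
// any realistic Jenkins pause. `setupScheduler` is assumed to be the suite's helper
// that accepts extra conf key/value pairs.
val taskScheduler = setupScheduler("spark.locality.wait" -> "30s")
```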
Test build #98481 has finished for PR 22288 at commit

+1
…hang because of blacklisting
## What changes were proposed in this pull request?
Every time a task is unschedulable because of the condition where no. of task failures < no. of executors available, we currently abort the taskSet - failing the job. This change tries to acquire new executors so that we can complete the job successfully. We try to acquire a new executor only when we can kill an existing idle executor. We fallback to the older implementation where we abort the job if we cannot find an idle executor.
## How was this patch tested?
I performed some manual tests to check and validate the behavior.
```scala
val rdd = sc.parallelize(Seq(1 to 10), 3)
import org.apache.spark.TaskContext
val mapped = rdd.mapPartitionsWithIndex ( (index, iterator) => { if (index == 2) { Thread.sleep(30 * 1000); val attemptNum = TaskContext.get.attemptNumber; if (attemptNum < 3) throw new Exception("Fail for blacklisting")}; iterator.toList.map (x => x + " -> " + index).iterator } )
mapped.collect
```
Closes #22288 from dhruve/bug/SPARK-22148.
Lead-authored-by: Dhruve Ashar <[email protected]>
Co-authored-by: Dhruve Ashar <[email protected]>
Co-authored-by: Tom Graves <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
(cherry picked from commit fdd3bac)
Signed-off-by: Thomas Graves <[email protected]>
merged to master and 2.4 branch, thanks @dhruve
Thanks for the reviews and feedback @tgravescs , @squito ! |