[SPARK-7308] prevent concurrent attempts for one stage #5964
Conversation
…rtial fix, still have some concurrent attempts
ignored b/c it spawns 10 executors, takes about 2 mins on my laptop, and makes everything pretty sluggish -- I didn't want to swamp jenkins. I tried a variety of permutations and this consistently demonstrated the problem for me, but maybe we can pare this down some. (Or maybe we need another home for tests like this?)
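For reference, this is roughly the ScalaTest pattern being described (a minimal sketch; the suite and test names here are hypothetical, not the actual test in this PR):

import org.scalatest.FunSuite

class ConcurrentStageAttemptsSuite extends FunSuite {
  // ignore() keeps the suite compiling but skips the body in regular builds;
  // flip it back to test() to reproduce the concurrent-attempt behavior locally.
  ignore("stress test: concurrent attempts for one stage") {
    // spin up a local-cluster SparkContext with several executors,
    // run a multi-stage shuffle job, and simulate losing an executor mid-stage
  }
}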
Merged build triggered.
Merged build started.
Really, stage 3 should have 0 failures as well; I still need to solve that.
Test build #32073 has started for PR 5964 at commit
This addresses issue (1): if we get a fetch failure, but we've already failed the attempt of the stage that caused the fetch failure, then do not resubmit the stage again. (Lots of other small changes to add stageAttemptId to the task.)
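As a rough illustration of "add stageAttemptId to the task" (a simplified sketch, not the actual diff; everything here other than the stageAttemptId field is an assumption):

package org.apache.spark.scheduler

// Each task records which attempt of its stage launched it, so that when a
// late FetchFailed arrives the DAGScheduler can tell whether it came from the
// attempt it is currently tracking or from an older, already-failed attempt.
private[spark] abstract class Task[T](
    val stageId: Int,
    val stageAttemptId: Int,  // new: the stage attempt that launched this task
    val partitionId: Int)
  extends Serializable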
This block is a little awkward. Being a little more explicit would be good here.
Something like this:

// it is possible the failure has already been handled by the scheduler
val failureRequiresHandling = runningStages.contains(failedStage)
if (failureRequiresHandling) {
  val stageHasFailed = failedStage.attemptId - 1 > task.stageAttemptId
  if (stageHasFailed) {
    ...
  }
}
I'm confused about how this works. Doesn't the stage still get added to failed stages on line 1149, so it will still be resubmitted?
hmm, good point. I think it works in my existing test case because submitStage already checks whether the stage is running before submitting it. So now this makes the stage simultaneously running and failed :/ Most likely this would cause problems if my test case had an even longer pipeline of stages in one job: at some point a later attempt of this stage would succeed, so the stage would no longer be running and only be failed, and then it would get resubmitted for no reason. This is just off the top of my head, though ... I'll need to look more carefully and try some more cases to see what is going on here.
(btw, thanks for looking at it in this state, I do still plan on splitting this apart some, just keep getting sidetracked ...)
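To make the scenario above concrete, here is a toy model of the bookkeeping involved (a sketch with hypothetical stand-ins, not the actual DAGScheduler code):

import scala.collection.mutable

object StageStateSketch extends App {
  // stand-ins for the scheduler's bookkeeping sets
  val waitingStages = mutable.HashSet.empty[Int]
  val runningStages = mutable.HashSet.empty[Int]
  val failedStages = mutable.HashSet.empty[Int]

  // submitStage-style guard: a stage that is already waiting, running, or failed is skipped
  def wouldSubmit(stage: Int): Boolean =
    !waitingStages(stage) && !runningStages(stage) && !failedStages(stage)

  val stage = 2
  runningStages += stage  // an older attempt of stage 2 is still running
  failedStages += stage   // the fetch failure also marked the stage as failed

  println(wouldSubmit(stage))  // false: the failed entry is masked while the stage is running

  // once the running attempt finishes and the failed-stage resubmission pulls the
  // stage back out of failedStages, nothing masks it any more, so it gets
  // resubmitted even though no tasks actually need to be re-run
  runningStages -= stage
  failedStages -= stage
  println(wouldSubmit(stage))  // true: spurious resubmission
}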
Test build #32073 has finished for PR 5964 at commit
Merged build finished. Test FAILed.
Test FAILed.
Merged build triggered.
Merged build started.
Test build #32076 has started for PR 5964 at commit
Test build #32076 has finished for PR 5964 at commit
Merged build finished. Test PASSed.
Test PASSed.
…e actual data is in the middle of it
…ts for the same stage
Build triggered.
Build started.
Conflicts:
  core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
Merged build finished. Test PASSed.
Test PASSed.
Conflicts:
  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Merged build triggered.
Merged build started.
Test build #33278 has started for PR 5964 at commit
Test build #33278 has finished for PR 5964 at commit
Merged build finished. Test FAILed.
Test FAILed.
Merged build triggered.
Merged build started.
Test build #33291 has started for PR 5964 at commit
Test build #33291 has finished for PR 5964 at commit
Merged build finished. Test PASSed.
Test PASSed.
Would love to get some feedback from the scheduler maintainers: @mateiz @markhamstra @kayousterhout @pwendell
this (and everything related to partitionComputeCount) is for issues (3) & (4)
I haven't yet looked at this closely, but is it possible to split this change into different pull requests, one for each issue?
@kayousterhout I debated doing that, but I kept them together b/c my test case produces all four, so I wouldn't have passing tests unless I addressed them all. (Though I could actually relax the criteria in the test so 3 & 4 were unnecessary.) To put it another way: I'm happy to restructure this for merging, but I feel that reviewers should consider all of the issues together to properly grasp what is going wrong in the existing implementation. Again, I'd like to stress running the reproduction and seeing what is wrong (ideally adding a loop and running it many times); that is far more important than the diff for reviewing this, IMO. Though maybe this also leads to another question for reviewers -- how do you feel about that test? It's unlike our other unit tests, in that it doesn't try to very narrowly recreate one issue. Instead it simulates a workload, with some randomization. The downside of that test is that it's slow and you can't easily tease apart the different issues into separate tests. But the upside is that you actually get better coverage -- e.g., I wouldn't have discovered issue (2) without this.
all that said, I do see the value in separating them out as well, so I'll do that in addition. But I'd still like reviewers to consider this holistically :)
Mostly I care about separating (1), since it seems like its handling is totally separate from the other issues; can you put the fix for that in its own pull request (or move the other changes into their own pull request)? For that issue, it seems possible to write a narrow unit test for the specific problem (I wrote two such tests here that you should be able to mostly re-use, if you like: kayousterhout@2b7d232; the first test passes but the second one fails with the current code). I find that preferable to the end-to-end test that you wrote, since it makes it easier to debug the issue when the test fails, and the test also runs much more quickly. I'm also a little confused about how this fixes (1), as I commented on in the code.
https://issues.apache.org/jira/browse/SPARK-7308
Reproduction of multiple concurrent stage attempts, and a fix. There is a more complete discussion in the doc on JIRA. This addresses four different issues, all of which happen when there are leftover tasks from one attempt for a stage that are still running when another attempt for the stage begins.
Note that problems (3) & (4) are only partially solved here. One attempt of a task may finish after all tasks from the other stage attempt have finished, in which case we may have already started the next stage. In that case the next stage will fail, but the retry behavior should take care of it. In fact, the same thing will happen if we do nothing for problems (3) & (4), so I am actually leaning towards ignoring problems (3) & (4). I am still submitting this with some code to partially handle those problems, since I already wrote it, just to see what reviewers think.
I'll highlight which problem is being solved in various parts of the change.
I'd recommend reviewers check out a branch that has just the failure reproduction, without the fix: https://github.com/squito/spark/tree/SPARK-7308_failure_reproduction. Run that test -- even just watch the logs with

tail -f core/target/unit-tests.log | grep DAGScheduler

and you will see some really weird behavior: Stage 2 has multiple concurrent attempts, which appear to stomp all over each other; Stage 3 gets submitted before Stage 2 ever finishes, and then it rapidly fires off a bunch of attempts which all quickly die (I've seen > 50 attempts); and lots of executors continue to get lost, though the test case only simulates one executor getting lost. And though the test is contrived, we've seen this exact same behavior from customers with large clusters and real workloads.

Another thing to figure out is what to do with the unit test -- it takes a while to run, and it's randomized. The randomization was intentional while developing, since it helped discover other corner cases, but perhaps we could bring back the unit / integration test split: https://issues.apache.org/jira/browse/SPARK-4746