[SPARK-8029][core] shuffle output per attempt #6648
Conversation
…rtial fix, still have some concurrent attempts
…e actual data is in the middle of it
…ts for the same stage
Conflicts:
	core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
	core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
Conflicts: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
…les, tests do not)
Conflicts: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
super nit: comment should follow javadoc formatting:
/**
* Comment.
*/
Looks sane, but this isn't really my area of expertise. Just a reminder that you should either enable DAGSchedulerFailureRecoverySuite or remove it from the patch. Also, left a question about backwards compatibility.
Test build #43401 has finished for PR 6648 at commit
Test build #43470 has finished for PR 6648 at commit
Jenkins, retest this please
Conflicts: project/MimaExcludes.scala
Test build #43475 has finished for PR 6648 at commit
@vanzin @JoshRosen made the external shuffle service backwards compatible and got rid of DAGSchedulerFailureRecoverySuite.
I looked at the diffs since my last review; looks good.
I will get @JoshRosen to take a look at this.
Hey Imran, given the number of changes required for this approach, I wonder whether an atomic rename design wouldn't be simpler (in particular, the "first attempt wins" approach in the doc). The doc seems to be worried that a file output might be corrupted, but in that case, why not send a message to the node asking it to delete its old output files, and then send a new map task? It can just be the delete-block message that the block manager already supports. This seems much nicer because it doesn't require any changes to the data structures in the rest of Spark.
BTW, with that design, I also wouldn't even implement the delete message in the first patch, unless we've actually seen block corruptions happen; but it sounds like we haven't seen such things and we probably wouldn't have a great way to detect them now anyway (i.e. the reduce task would mark a fetch successful and just crash).
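For readers unfamiliar with the alternative being suggested, here is a minimal Scala sketch of the "first attempt wins" idea. This is illustrative only, not code from Spark or from this patch; the helper name and the marker-file scheme are assumptions. Each attempt writes to its own temporary file, and the first attempt to claim a commit marker renames its output into place; later attempts discard their temporary output.

import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardCopyOption}

// Hypothetical helper, not Spark's API: commits a task attempt's shuffle
// output so that only the first committing attempt's file survives.
object FirstAttemptWins {
  /** Returns true if this attempt's output became the canonical one. */
  def commit(tmp: Path, dest: Path): Boolean = {
    val marker = dest.resolveSibling(dest.getFileName.toString + ".committed")
    try {
      // Files.createFile is an atomic check-and-create, so exactly one
      // attempt can claim the marker for a given destination.
      Files.createFile(marker)
      // Only the winning attempt moves its temp file into place.
      Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE)
      true
    } catch {
      case _: FileAlreadyExistsException =>
        // Some other attempt already committed; discard this attempt's output.
        Files.deleteIfExists(tmp)
        false
    }
  }
}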
Test build #45389 has finished for PR 6648 at commit
Jenkins, retest this please
Test build #45437 has finished for PR 6648 at commit
Jenkins, retest this please
Test build #45484 has started for PR 6648 at commit
Jenkins, retest this please
Test build #45528 has finished for PR 6648 at commit
Jenkins, retest this please
Test build #45533 has finished for PR 6648 at commit
https://issues.apache.org/jira/browse/SPARK-8029
This implements one of the approaches in the design doc on the jira: now each ShuffleMapTask attempt writes to a different location. ShuffleBlockId is extended to include the stage attempt id, so the fetch side knows which files to read from. MapStatus also includes the stage attempt, so now there is one MapStatus per (executor, attempt) as opposed to one per executor. This won't really matter when there is just one attempt per stage. In a pathological case, you'd end up with one MapStatus per partition, which would be much worse, but that is very unlikely.
This touches a lot of files, but almost all of the changes are just plumbing a stageAttemptId through a lot of different places.
cc @JoshRosen
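To make the description above concrete, here is a rough Scala sketch of what carrying the stage attempt in the shuffle block identity could look like. The field name and block-name encoding are assumptions for illustration; this is not the actual patch, and the real ShuffleBlockId extends Spark's BlockId rather than standing alone.

// Illustrative only: a standalone case class, not Spark's actual BlockId subclass.
case class ShuffleBlockId(
    shuffleId: Int,
    stageAttemptId: Int, // new field: distinguishes output from different stage attempts
    mapId: Int,
    reduceId: Int) {
  // The attempt id becomes part of the on-disk block name, so different
  // attempts write to different files and the fetch side can ask for the
  // output of a specific attempt.
  def name: String = s"shuffle_${shuffleId}_${stageAttemptId}_${mapId}_${reduceId}"
}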