[SPARK-5750][SPARK-3441][SPARK-5836][CORE] Added documentation explaining shuffle #5074
Conversation
…he shuffle operation and included errata from a number of other JIRAs
Test build #28732 has started for PR 5074 at commit
Test build #28732 has finished for PR 5074 at commit
Test PASSed.
docs/programming-guide.md
I think a better word is "undefined", rather than "random".
Test build #28799 has started for PR 5074 at commit
Test build #28799 has finished for PR 5074 at commit
Test PASSed.
docs/programming-guide.md
I think a shuffle is not just about collecting data by key. For example a repartitioning can cause a shuffle. Personally I'd say that some operations need to redistribute data so that it is grouped differently into partitions, which typically means rearranging and copying data across executors or machines.
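For instance, a minimal sketch of the repartitioning case (assuming an existing SparkContext sc; the numbers are arbitrary):

```scala
val rdd = sc.parallelize(1 to 100, 4)

// No keys are involved, but repartition still redistributes records across
// partitions, which requires a shuffle.
val repartitioned = rdd.repartition(8)

// The lineage shows the ShuffledRDD stage introduced by the repartitioning.
println(repartitioned.toDebugString)
```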
Thanks for the review, Sean - I've incorporated your recommendations and added a short discussion on sort-based shuffle.
Test build #28882 has started for PR 5074 at commit
docs/programming-guide.md
I would replace uses of "operation" that don't refer to actions or transformations to avoid confusion.
The doc here should be broken up across multiple lines. The programming guide is kind of inconsistent on this, but I think the move is towards keeping lines under 100 characters.
docs/programming-guide.md
A few factual edits:
- Spark always writes all shuffle data to disk on the map side.
- A hash table is only used on the map side for reduceByKey and aggregateByKey, and only on the reduce side for the ByKey operations.
- sortByKey can no longer OOM.
Also, I would avoid mentioning hash-based shuffle at all because sort-based shuffle is now pretty much what we expect everybody to use.
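To illustrate the map-side distinction, a rough sketch (assuming an existing SparkContext sc; the data is made up):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

// reduceByKey builds a map-side hash table to pre-aggregate values per key,
// so each map task writes at most one record per key into the shuffle.
val summed = pairs.reduceByKey(_ + _)

// groupByKey does no map-side combining; every record crosses the shuffle
// and the grouping structure is built only on the reduce side.
val grouped = pairs.groupByKey()
```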
Fixed a few more small items.
Done.
Test build #29178 has started for PR 5074 at commit
Test build #29170 has finished for PR 5074 at commit
Test PASSed.
Test build #29178 has finished for PR 5074 at commit
Test PASSed.
docs/programming-guide.md
To be concrete and finish this off, I'd propose keeping only the following text. @sryza @ilganeli
Although the set of elements in each partition of newly shuffled data will be deterministic, the ordering of these elements is not. If one desires predictably ordered data following shuffle operations, sortBy can be used to perform a global sort.
Sean, I'd suggest this text:
Although the set of elements in each partition of newly shuffled data will be deterministic, the
ordering of these elements is not. If one desires predictably ordered data following shuffle
operations, sortBy can be used to perform a global sort. A similar operation,
repartitionAndSortWithinPartitions coupled with mapPartitions,
may be used to enact a Hadoop-style shuffle.
The reason is that this would address https://issues.apache.org/jira/browse/SPARK-3441 .
This doesn't sort the partitions though, right? And what needs a mapPartitions then?
I went and re-read the JIRA in question. I think Sandy was simply pointing out that the above could be used as a replacement for groupBy and that repartitionAndSortWithinPartitions functions as a Hadoop-style shuffle. I agree that it's not needed here.
Maybe you both are already on the same page about this already, but I think the relevant question being addressed might be "how do you control (or make deterministic) the ordering of elements within a key". Currently the only way to do this is with a repartitionAndSortWithinPartitions followed by a mapPartitions that groups the sorted elements that make up the partition. Talking about this could be TMI though.
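A rough sketch of that pattern (the partitioner and data here are hypothetical, not from the PR): key each record by a composite (primary, secondary) pair, partition on the primary key only, and let the sort order the composite keys within each partition.

```scala
import org.apache.spark.Partitioner

// Hypothetical partitioner: route on the primary key only, so all records for
// a primary key land in one partition while the composite-key ordering
// controls their sort order within it.
class PrimaryKeyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (primary, _) => math.abs(primary.hashCode % partitions)
  }
}

// Records keyed by (word, number); Unit values since the key carries the data.
val records = sc.parallelize(Seq((("a", 2), ()), (("a", 1), ()), (("b", 3), ())))
val sorted = records.repartitionAndSortWithinPartitions(new PrimaryKeyPartitioner(2))

// One linear pass per partition: records sharing a primary key are adjacent
// after the sort, so they can be grouped without a hash table.
val grouped = sorted.mapPartitions { iter =>
  val buffered = iter.buffered
  new Iterator[(String, Seq[Int])] {
    override def hasNext: Boolean = buffered.hasNext
    override def next(): (String, Seq[Int]) = {
      val primary = buffered.head._1._1
      val run = scala.collection.mutable.ArrayBuffer[Int]()
      while (buffered.hasNext && buffered.head._1._1 == primary) {
        run += buffered.next()._1._2
      }
      (primary, run.toSeq)
    }
  }
}
```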
But that does not give an RDD where partition n sorts entirely before partition n+1, right? I get that the question is more about determinism, but mentioning it in the same breath as sortBy feels confusing.
The line that sets it up is talking about ordering within a partition. sortBy is mentioned afterward and global sort is specifically called out. If you read the paragraph front to end does it seem unclear?
Fair point, yeah. This could be a dumb question, but do we know that the partitions will always appear in the same order? I suppose it depends on the partitioner, but, even for HashPartitioner -- do we know that, say, everything hashing to 0 occurs in partition 0 every time? If the order of partitions is deterministic for any reasonable case, then I get it at last, yeah.
If that understanding is correct, then, here's another pass at the paragraph:
Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements within each partition is not. If one desires predictably ordered data following shuffle operations, then it's possible to use:
- mapPartitions to sort each partition using, for example, .sorted
- repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
- sortBy to make a globally ordered RDD
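A sketch of those three options (assuming an existing SparkContext sc; the data is illustrative):

```scala
import org.apache.spark.HashPartitioner

val rdd = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")), 2)

// Option 1: sort each partition's own elements.
val perPartitionSorted = rdd.mapPartitions(iter => iter.toSeq.sorted.iterator)

// Option 2: repartition and sort within partitions in a single shuffle pass.
val shuffledSorted = rdd.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// Option 3: one globally ordered RDD.
val globallySorted = rdd.sortBy(_._1)
```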
Sean - I like this final wording, I've added this in the latest. Thanks.
@srowen that's exactly how HashPartitioner works. As long as the partition function isn't using System.currentTimeMillis or something the ordering of partitions is deterministic.
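For reference, a simplified sketch of that mapping (not Spark's exact source, but the same idea of a non-negative mod over hashCode):

```scala
def partitionFor(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod // keep the index non-negative
}

partitionFor(0, 4)       // Int keys hash to themselves, so key 0 -> partition 0
partitionFor("spark", 4) // the same String lands in the same partition every run
```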
Test build #29238 has started for PR 5074 at commit
Test build #29238 has finished for PR 5074 at commit
Test PASSed.
Removed extraneous reference to repartitionAndSort
Test build #29253 has started for PR 5074 at commit
Test build #29253 has finished for PR 5074 at commit
Test PASSed.
Looks good. I'll pause a little longer for final comments from @sryza.
Reworded discussion on sorting within partitions.
Test build #29309 has started for PR 5074 at commit
Is this true for reduceByKey, though? It's certainly true for groupByKey, but reduceByKey and combineByKey will perform map-side combining so it's not strictly true that all values for a key must be co-located before computing the new reduced value.
I looked at the discussion up-thread and noticed that this originally referred to groupByKey() but was changed because we're trying to discourage users from using that operation. The previous sentence is clear, because it says "organize all the data for a single reduceByKey reduce task", which is true because the "data" here can refer to partially-combined results rather than input records. This current sentence was what tripped me up, since only groupByKey requires that "all values" for any key be co-located. But maybe I'm just overthinking this, since I suppose that "values" could also refer to partially-combined values.
I'm fine with this as long as it's not confusing to anyone else.
That's a reasonable point. I think it slightly outweighs the earlier concern about even mentioning groupByKey. @ilganeli, would you mind making this final change to, say, "a single groupByKey task"?
My suggestion is not great. Leave the reduceByKey example. I think the final sentence can be "... and then bring together values across partitions to compute the final result for each key - ..." I can just tap this in since I am pretty certain everyone is happy at this point.
Test build #29309 has finished for PR 5074 at commit
Test PASSed.
Since this is looking good to me and @ilganeli has been patient, I'd like to just merge this and make that last edit manually today.
…ning shuffle

I've updated the Spark Programming Guide to add a section on the shuffle operation providing some background on what it does. I've also addressed some of its performance impacts.

I've included documentation to address the following issues:
https://issues.apache.org/jira/browse/SPARK-5836
https://issues.apache.org/jira/browse/SPARK-3441
https://issues.apache.org/jira/browse/SPARK-5750

https://issues.apache.org/jira/browse/SPARK-4227 is related but can be addressed in a separate PR since it involves updates to the Spark Configuration Guide.

Author: Ilya Ganelin <[email protected]>
Author: Ilya Ganelin <[email protected]>

Closes #5074 from ilganeli/SPARK-5750 and squashes the following commits:

6178e24 [Ilya Ganelin] Update programming-guide.md
7a0b96f [Ilya Ganelin] Update programming-guide.md
2c5df08 [Ilya Ganelin] Merge branch 'SPARK-5750' of github.com:ilganeli/spark into SPARK-5750
dffbd2d [Ilya Ganelin] [SPARK-5750] Slight wording update
1ff4eb4 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750
85f9c6e [Ilya Ganelin] Update programming-guide.md
349d1fa [Ilya Ganelin] Added cross link for configuration page
eeb5a7a [Ilya Ganelin] [SPARK-5750] Added some minor fixes
dd5cc9d [Ilya Ganelin] [SPARK-5750] Fixed some factual inaccuracies with regards to shuffle internals.
a8adb57 [Ilya Ganelin] [SPARK-5750] Incorporated feedback from Sean Owen
9954bbe [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750
159dd1c [Ilya Ganelin] [SPARK-5750] Style fixes from rxin.
75ef67b [Ilya Ganelin] [SPARK-5750][SPARK-3441][SPARK-5836] Added documentation explaining the shuffle operation and included errata from a number of other JIRAs

(cherry picked from commit 4bdfb7b)
Signed-off-by: Sean Owen <[email protected]>
@ilganeli @srowen @JoshRosen Can you explain why this was added? Shuffle files are cleared automatically when the driver garbage-collects the shuffle object, which triggers messages to all the executors to delete all files related to the shuffle. This was added in Spark 1.0. Has there been any change in this behavior since then that justifies this statement?
If this statement is true, then this is a major, major issue for long-running jobs like Spark Streaming jobs. If it is not true, then this should not have been added and should be fixed promptly. Users have been personally telling me that they are not upgrading to Spark 1.3 because they think this behavior is a regression.
+1. I had a couple of people ask me this during Spark Summit. I was investigating this myself today.
@tdas This bit is a response to https://issues.apache.org/jira/browse/SPARK-5836 You would likely know better, and if GC of shuffle-related objects should trigger cleanup of shuffle files, then phew, at least there is some mechanism for that. I know people occasionally ask about problems with too many shuffle files lying around, but that doesn't mean the mechanism doesn't work. I think the best short-term change is just to update this statement if you're pretty confident this mechanism works.
That clarifies things for me. I know that there has been some concern about the shuffle files filling up disk, but as of now that can happen because of one or more of the following reasons:
- GC does not kick in for a long time (very high driver memory). The solution may often be to periodically call GC (see the sketch below).
- Nothing goes out of scope and so nothing is GCed.
- There are some issues reported with shuffle files not being cleaned up in Mesos.
The 3rd one is a bug and we will fix it. The first two should be clarified in the docs. That is better than this current very scary description.
On Wed, Jun 17, 2015 at 11:41 PM, Sean Owen [email protected] wrote, in docs/programming-guide.md, #5074 (comment):

+organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from
+MapReduce and does not directly relate to Spark's map and reduce operations.
+
+Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
+are sorted based on the target partition and written to a single file. On the reduce side, tasks
+read the relevant sorted blocks.
+
+Certain shuffle operations can consume significant amounts of heap memory since they employ
+in-memory data structures to organize records before or after transferring them. Specifically,
+reduceByKey and aggregateByKey create these structures on the map side and 'ByKey operations
+generate these on the reduce side. When data does not fit in memory Spark will spill these tables
+to disk, incurring the additional overhead of disk I/O and increased garbage collection.
+
+Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
+are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
+long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need
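A hedged sketch of the first workaround above (the interval is arbitrary): in a long-running driver with a large heap, periodically requesting a GC lets Spark's cleaner release shuffle files whose RDDs have gone out of scope.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Periodically request a GC so that unreferenced shuffle objects can be
// collected on the driver, which triggers cleanup of their files.
val gcScheduler = Executors.newSingleThreadScheduledExecutor()
gcScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = System.gc()
}, 30, 30, TimeUnit.MINUTES)
```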
I've updated the Spark Programming Guide to add a section on the shuffle operation providing some background on what it does. I've also addressed some of its performance impacts.
I've included documentation to address the following issues:
https://issues.apache.org/jira/browse/SPARK-5836
https://issues.apache.org/jira/browse/SPARK-3441
https://issues.apache.org/jira/browse/SPARK-5750
https://issues.apache.org/jira/browse/SPARK-4227 is related but can be addressed in a separate PR since it involves updates to the Spark Configuration Guide.