Commit 4bdfb7b

Ilya Ganelin authored and srowen committed
[SPARK-5750][SPARK-3441][SPARK-5836][CORE] Added documentation explaining shuffle

I've updated the Spark Programming Guide to add a section on the shuffle operation, providing some background on what it does. I've also addressed some of its performance impacts.

I've included documentation to address the following issues:
https://issues.apache.org/jira/browse/SPARK-5836
https://issues.apache.org/jira/browse/SPARK-3441
https://issues.apache.org/jira/browse/SPARK-5750

https://issues.apache.org/jira/browse/SPARK-4227 is related but can be addressed in a separate PR since it involves updates to the Spark Configuration Guide.

Author: Ilya Ganelin <[email protected]>
Author: Ilya Ganelin <[email protected]>

Closes apache#5074 from ilganeli/SPARK-5750 and squashes the following commits:

6178e24 [Ilya Ganelin] Update programming-guide.md
7a0b96f [Ilya Ganelin] Update programming-guide.md
2c5df08 [Ilya Ganelin] Merge branch 'SPARK-5750' of github.com:ilganeli/spark into SPARK-5750
dffbd2d [Ilya Ganelin] [SPARK-5750] Slight wording update
1ff4eb4 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750
85f9c6e [Ilya Ganelin] Update programming-guide.md
349d1fa [Ilya Ganelin] Added cross link for configuration page
eeb5a7a [Ilya Ganelin] [SPARK-5750] Added some minor fixes
dd5cc9d [Ilya Ganelin] [SPARK-5750] Fixed some factual inaccuracies with regards to shuffle internals.
a8adb57 [Ilya Ganelin] [SPARK-5750] Incorporated feedback from Sean Owen
9954bbe [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750
159dd1c [Ilya Ganelin] [SPARK-5750] Style fixes from rxin.
75ef67b [Ilya Ganelin] [SPARK-5750][SPARK-3441][SPARK-5836] Added documentation explaining the shuffle operation and included errata from a number of other JIRAs
1 parent de67330 commit 4bdfb7b

File tree

1 file changed (+72, -11 lines)


docs/programming-guide.md

Lines changed: 72 additions & 11 deletions
@@ -937,7 +937,7 @@ for details.
 <td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td>
 </tr>
 <tr>
-<td> <b>mapPartitions</b>(<i>func</i>) </td>
+<td> <b>mapPartitions</b>(<i>func</i>) <a name="MapPartLink"></a> </td>
 <td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type
 Iterator&lt;T&gt; => Iterator&lt;U&gt; when running on an RDD of type T. </td>
 </tr>
@@ -964,7 +964,7 @@ for details.
 <td> Return a new dataset that contains the distinct elements of the source dataset.</td>
 </tr>
 <tr>
-<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
+<td> <b>groupByKey</b>([<i>numTasks</i>]) <a name="GroupByLink"></a> </td>
 <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
 <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
 average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
@@ -975,25 +975,25 @@ for details.
 </td>
 </tr>
 <tr>
-<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
+<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) <a name="ReduceByLink"></a> </td>
 <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
 </tr>
 <tr>
-<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>
+<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) <a name="AggregateByLink"></a> </td>
 <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
 </tr>
 <tr>
-<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) </td>
+<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) <a name="SortByLink"></a> </td>
 <td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
 </tr>
 <tr>
-<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
+<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="JoinLink"></a> </td>
 <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
 Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
 </td>
 </tr>
 <tr>
-<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
+<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="CogroupLink"></a> </td>
 <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples. This operation is also called <code>groupWith</code>. </td>
 </tr>
 <tr>
@@ -1006,17 +1006,17 @@ for details.
 process's stdin and lines output to its stdout are returned as an RDD of strings. </td>
 </tr>
 <tr>
-<td> <b>coalesce</b>(<i>numPartitions</i>) </td>
+<td> <b>coalesce</b>(<i>numPartitions</i>) <a name="CoalesceLink"></a> </td>
 <td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
 after filtering down a large dataset. </td>
 </tr>
 <tr>
 <td> <b>repartition</b>(<i>numPartitions</i>) </td>
 <td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
-This always shuffles all data over the network. </td>
+This always shuffles all data over the network. <a name="RepartitionLink"></a></td>
 </tr>
 <tr>
-<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) </td>
+<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td>
 <td> Repartition the RDD according to the given partitioner and, within each resulting partition,
 sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
 each partition because it can push the sorting down into the shuffle machinery. </td>
@@ -1080,7 +1080,7 @@ for details.
 <code>SparkContext.objectFile()</code>. </td>
 </tr>
 <tr>
-<td> <b>countByKey</b>() </td>
+<td> <b>countByKey</b>() <a name="CountByLink"></a> </td>
 <td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td>
 </tr>
 <tr>
@@ -1090,6 +1090,67 @@ for details.
 </tr>
 </table>

+### Shuffle operations
+
+Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
+mechanism for re-distributing data so that it's grouped differently across partitions. This typically
+involves copying data across executors and machines, making the shuffle a complex and
+costly operation.
+
+#### Background
+
+To understand what happens during the shuffle, we can consider the example of the
+[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
+values for a single key are combined into a tuple - the key and the result of executing a reduce
+function against all values associated with that key. The challenge is that not all values for a
+single key necessarily reside on the same partition, or even the same machine, but they must be
+co-located to compute the result.
+
+In Spark, data is generally not distributed across partitions to be in the necessary place for a
+specific operation. During computations, a single task will operate on a single partition - thus, to
+organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
+all-to-all operation. It must read from all partitions to find all the values for all keys,
+and then bring together values across partitions to compute the final result for each key -
+this is called the **shuffle**.
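
For illustration, here is a minimal word-count sketch in Scala showing where this shuffle occurs; it assumes an existing `SparkContext` named `sc` and a hypothetical input path:

```scala
// Word count: the reduceByKey step triggers a shuffle, because the (word, 1)
// pairs for any given word may initially live on different partitions.
val lines = sc.textFile("hdfs://...")  // hypothetical input path
val counts = lines
  .flatMap(line => line.split(" "))    // one record per word
  .map(word => (word, 1))              // (K, V) pairs
  .reduceByKey(_ + _)                  // shuffle: co-locate and sum the values per key
```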
+
+Although the set of elements in each partition of newly shuffled data will be deterministic, and so
+is the ordering of partitions themselves, the ordering of these elements is not. If one desires
+predictably ordered data following a shuffle, it's possible to use (each option is sketched below):
+
+* `mapPartitions` to sort each partition using, for example, `.sorted`
+* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
+* `sortBy` to make a globally ordered RDD
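
A rough Scala sketch of those three options, assuming `pairs` is a hypothetical `RDD[(String, Int)]`:

```scala
import org.apache.spark.HashPartitioner

// Sort records within each existing partition, without repartitioning:
val sortedWithin = pairs.mapPartitions(iter => iter.toArray.sortBy(_._1).iterator)

// Repartition and sort each resulting partition in a single shuffle pass:
val repartitionedSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(8))

// Produce a globally ordered RDD:
val globallySorted = pairs.sortBy(_._1)
```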
+
+Operations which can cause a shuffle include **repartition** operations like
+[`repartition`](#RepartitionLink) and [`coalesce`](#CoalesceLink), **'ByKey** operations
+(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
+**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).
+
+#### Performance Impact
+The **shuffle** is an expensive operation since it involves disk I/O, data serialization, and
+network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
+organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
+MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
+
+Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
+are sorted based on the target partition and written to a single file. On the reduce side, tasks
+read the relevant sorted blocks.
+
+Certain shuffle operations can consume significant amounts of heap memory since they employ
+in-memory data structures to organize records before or after transferring them. Specifically,
+`reduceByKey` and `aggregateByKey` create these structures on the map side, and `'ByKey` operations
+generate these on the reduce side. When data does not fit in memory Spark will spill these tables
+to disk, incurring the additional overhead of disk I/O and increased garbage collection.
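
To make that map-side structure concrete, here is a small sketch, assuming a hypothetical `events: RDD[(String, Double)]`; the per-key (sum, count) accumulators are the kind of in-memory structure built before the shuffle:

```scala
// Compute a per-key average. seqOp runs map-side, folding values into per-key
// accumulators before any data moves; combOp merges partial results after the shuffle.
val sumAndCount = events.aggregateByKey((0.0, 0L))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // seqOp: fold one value into the accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2) // combOp: merge two accumulators
)
val averages = sumAndCount.mapValues { case (sum, n) => sum / n }
```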
+
+Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
+are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
+long-running Spark jobs may consume available disk space. The files are retained so that shuffle
+output doesn't need to be re-created if the lineage is re-computed. The temporary storage directory
+is specified by the `spark.local.dir` configuration parameter when configuring the Spark context.
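
For example, a minimal sketch of setting that directory when constructing the context; the path and application name are illustrative, and cluster managers may override this setting:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Point shuffle and spill files at a specific scratch directory.
val conf = new SparkConf()
  .setAppName("ShuffleExample")                 // hypothetical app name
  .set("spark.local.dir", "/mnt/spark-scratch") // hypothetical local path
val sc = new SparkContext(conf)
```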
+
+Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
+'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).
+
 ## RDD Persistence

 One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory
