Skip to content

Commit 75ef67b

Browse files
author
Ilya Ganelin
committed
[SPARK-5750][SPARK-3441][SPARK-5836] Added documentation explaining the shuffle operation and included errata from a number of other JIRAs
1 parent dd077ab commit 75ef67b

File tree

1 file changed

+30
-9
lines changed

1 file changed

+30
-9
lines changed

docs/programming-guide.md

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,7 @@ for details.
870870
<td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td>
871871
</tr>
872872
<tr>
873-
<td> <b>mapPartitions</b>(<i>func</i>) </td>
873+
<td> <b>mapPartitions</b>(<i>func</i>) <a name="MapPartLink"></a> </td>
874874
<td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type
875875
Iterator&lt;T&gt; => Iterator&lt;U&gt; when running on an RDD of type T. </td>
876876
</tr>
@@ -897,7 +897,7 @@ for details.
897897
<td> Return a new dataset that contains the distinct elements of the source dataset.</td>
898898
</tr>
899899
<tr>
900-
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
900+
<td> <b>groupByKey</b>([<i>numTasks</i>]) <a name="GroupByLink"></a> </td>
901901
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
902902
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
903903
average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
@@ -908,15 +908,15 @@ for details.
908908
</td>
909909
</tr>
910910
<tr>
911-
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
911+
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) <a name="ReduceByLink"></a> </td>
912912
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
913913
</tr>
914914
<tr>
915-
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>
915+
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) <a name="AggregateByLink"></a> </td>
916916
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
917917
</tr>
918918
<tr>
919-
<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) </td>
919+
<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) <a name="SortByLink"></a> </td>
920920
<td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
921921
</tr>
922922
<tr>
@@ -939,17 +939,17 @@ for details.
939939
process's stdin and lines output to its stdout are returned as an RDD of strings. </td>
940940
</tr>
941941
<tr>
942-
<td> <b>coalesce</b>(<i>numPartitions</i>) </td>
942+
<td> <b>coalesce</b>(<i>numPartitions</i>) <a name="CoalesceLink"></a> </td>
943943
<td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
944944
after filtering down a large dataset. </td>
945945
</tr>
946946
<tr>
947947
<td> <b>repartition</b>(<i>numPartitions</i>) </td>
948948
<td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
949-
This always shuffles all data over the network. </td>
949+
This always shuffles all data over the network. <a name="RepartitionLink"></a></td>
950950
</tr>
951951
<tr>
952-
<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) </td>
952+
<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td>
953953
<td> Repartition the RDD according to the given partitioner and, within each resulting partition,
954954
sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
955955
each partition because it can push the sorting down into the shuffle machinery. </td>
@@ -1013,7 +1013,7 @@ for details.
10131013
<code>SparkContext.objectFile()</code>. </td>
10141014
</tr>
10151015
<tr>
1016-
<td> <b>countByKey</b>() </td>
1016+
<td> <b>countByKey</b>() <a name="CountByLink"></a> </td>
10171017
<td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td>
10181018
</tr>
10191019
<tr>
@@ -1022,6 +1022,27 @@ for details.
10221022
</tr>
10231023
</table>
10241024

1025+
### Shuffle operations
1026+
1027+
Certain operations within Spark trigger an operation known as the shuffle. The shuffle is Spark's mechanism for re-organizing data to co-locate data associated with particular keys.
1028+
1029+
#### Background
1030+
1031+
To understand what happens during the shuffle we can consider the example of the [`groupByKey`](#GroupByLink) operation. The `groupByKey` operation generates a new RDD where all values for a single key are combined into a 2-tuple - the key and an Iterable object containing all the associated values. If we think of the map and reduce steps for `groupByKey()` then we can see that to generate the list of all values associated with a key, all of the values must reside on the same reducer, since the output of the reduce step is the complete array.
1032+
1033+
In Spark, by default, data is distributed randomly across partitions. During computations, a single task will operate on a single partition - thus, to organize all the data for a single `groupByKey` reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then organize those such that all values for any key lie within the same partition - this is called the **shuffle**.
1034+
1035+
Although the set of elements in each partition of newly shuffled data will be deterministic, the ordering of these elements is not. If one desires predictably ordered data following shuffle operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition. A similar operation, [`repartitionAndSortWithinPartitions`](#Repartition2Link`) coupled with `mapPartitions`, may be used to enact a `Hadoop` style shuffle.
1036+
1037+
Operations, which cause a shuffle include [`groupByKey`](#GroupByLink), [`sortByKey`](#SortByLink), [`reduceByKey`](#ReduceByLink), [`aggregateByKey`](#AggregateByLink), [`repartition`](#RepartitionLink), [`repartitionAndSortWithinPartitions`](#Repartition2Link`), [`coalesce`](#CoalesceLink), and [`countByKey`](#CountByLink).
1038+
1039+
#### Performance Impact
1040+
**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and network I/O. Shuffle operations can have a serious impact on performance. To organize data for the shuffle, Spark will also generate lookup tables in memory, which, for large operations, can consume significant amounts of heap memory. When out of memory, for all shuffle operations with the exception of `sortByKey`, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection. Since `sortByKey` does not spill these intermediate tables to disk, the shuffle operation may cause OOM errors.
1041+
1042+
Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are not cleaned up from Spark's temporary storage until Spark is stopped, which means that long-running Spark jobs may consume available disk space. The temporary storage directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark context.
1043+
1044+
Shuffle behavior can be fine-tuned by adjusting a variety of configuration parameters. See the 'Shuffle Behavior' section within the Spark Configuration Guide.
1045+
10251046
## RDD Persistence
10261047

10271048
One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory

0 commit comments

Comments
 (0)