@@ -937,7 +937,7 @@ for details.
937937 <td > Similar to map, but each input item can be mapped to 0 or more output items (so <i >func</i > should return a Seq rather than a single item). </td >
938938</tr >
939939<tr >
940- <td > <b >mapPartitions</b >(<i >func</i >) </td >
940+ <td > <b >mapPartitions</b >(<i >func</i >) <a name = " MapPartLink " ></ a > < /td >
941941 <td > Similar to map, but runs separately on each partition (block) of the RDD, so <i >func</i > must be of type
942942 Iterator<T> => Iterator<U> when running on an RDD of type T. </td>
943943</tr >
@@ -964,7 +964,7 @@ for details.
964964 <td > Return a new dataset that contains the distinct elements of the source dataset.</td >
965965</tr >
966966<tr >
967- <td > <b >groupByKey</b >([<i >numTasks</i >]) </td >
967+ <td > <b >groupByKey</b >([<i >numTasks</i >]) <a name = " GroupByLink " ></ a > < /td >
968968 <td > When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable< ; V> ; ) pairs. <br />
969969 <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
970970 average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
@@ -975,25 +975,25 @@ for details.
975975 </td >
976976</tr >
977977<tr >
978- <td > <b >reduceByKey</b >(<i >func</i >, [<i >numTasks</i >]) </td >
978+ <td > <b >reduceByKey</b >(<i >func</i >, [<i >numTasks</i >]) <a name = " ReduceByLink " ></ a > < /td >
979979 <td > When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i >func</i >, which must be of type (V,V) => V. Like in <code >groupByKey</code >, the number of reduce tasks is configurable through an optional second argument. </td >
980980</tr >
981981<tr >
982- <td > <b >aggregateByKey</b >(<i >zeroValue</i >)(<i >seqOp</i >, <i >combOp</i >, [<i >numTasks</i >]) </td >
982+ <td > <b >aggregateByKey</b >(<i >zeroValue</i >)(<i >seqOp</i >, <i >combOp</i >, [<i >numTasks</i >]) <a name = " AggregateByLink " ></ a > < /td >
983983 <td > When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code >groupByKey</code >, the number of reduce tasks is configurable through an optional second argument. </td >
984984</tr >
985985<tr >
986- <td > <b >sortByKey</b >([<i >ascending</i >], [<i >numTasks</i >]) </td >
986+ <td > <b >sortByKey</b >([<i >ascending</i >], [<i >numTasks</i >]) <a name = " SortByLink " ></ a > < /td >
987987 <td > When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code >ascending</code > argument.</td >
988988</tr >
989989<tr >
990- <td > <b >join</b >(<i >otherDataset</i >, [<i >numTasks</i >]) </td >
990+ <td > <b >join</b >(<i >otherDataset</i >, [<i >numTasks</i >]) <a name = " JoinLink " ></ a > < /td >
991991 <td > When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
992992 Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
993993 </td >
994994</tr >
995995<tr >
996- <td > <b >cogroup</b >(<i >otherDataset</i >, [<i >numTasks</i >]) </td >
996+ <td > <b >cogroup</b >(<i >otherDataset</i >, [<i >numTasks</i >]) <a name = " CogroupLink " ></ a > < /td >
997997 <td > When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable< ; V> ; , Iterable< ; W> ; )) tuples. This operation is also called <code >groupWith</code >. </td >
998998</tr >
999999<tr >
@@ -1006,17 +1006,17 @@ for details.
10061006 process's stdin and lines output to its stdout are returned as an RDD of strings. </td>
10071007</tr >
10081008<tr >
1009- <td > <b >coalesce</b >(<i >numPartitions</i >) </td >
1009+ <td > <b >coalesce</b >(<i >numPartitions</i >) <a name = " CoalesceLink " ></ a > < /td >
10101010 <td > Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
10111011 after filtering down a large dataset. </td>
10121012</tr >
10131013<tr >
10141014 <td > <b >repartition</b >(<i >numPartitions</i >) </td >
10151015 <td > Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
1016- This always shuffles all data over the network. </td>
1016+ This always shuffles all data over the network. <a name="RepartitionLink"></a>< /td>
10171017</tr >
10181018<tr >
1019- <td > <b >repartitionAndSortWithinPartitions</b >(<i >partitioner</i >) </td >
1019+ <td > <b >repartitionAndSortWithinPartitions</b >(<i >partitioner</i >) <a name = " Repartition2Link " ></ a >< /td >
10201020 <td > Repartition the RDD according to the given partitioner and, within each resulting partition,
10211021 sort records by their keys. This is more efficient than calling <code >repartition</code > and then sorting within
10221022 each partition because it can push the sorting down into the shuffle machinery. </td >
@@ -1080,7 +1080,7 @@ for details.
10801080 <code>SparkContext.objectFile()</code>. </td>
10811081</tr >
10821082<tr >
1083- <td > <b >countByKey</b >() </td >
1083+ <td > <b >countByKey</b >() <a name = " CountByLink " ></ a > < /td >
10841084 <td > Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td >
10851085</tr >
10861086<tr >
@@ -1090,6 +1090,67 @@ for details.
10901090</tr >
10911091</table >
10921092
1093+ ### Shuffle operations
1094+
1095+ Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
1096+ mechanism for re-distributing data so that is grouped differently across partitions. This typically
1097+ involves copying data across executors and machines, making the shuffle a complex and
1098+ costly operation.
1099+
1100+ #### Background
1101+
1102+ To understand what happens during the shuffle we can consider the example of the
1103+ [ ` reduceByKey ` ] ( #ReduceByLink ) operation. The ` reduceByKey ` operation generates a new RDD where all
1104+ values for a single key are combined into a tuple - the key and the result of executing a reduce
1105+ function against all values associated with that key. The challenge is that not all values for a
1106+ single key necessarily reside on the same partition, or even the same machine, but they must be
1107+ co-located to compute the result.
1108+
1109+ In Spark, data is generally not distributed across partitions to be in the necessary place for a
1110+ specific operation. During computations, a single task will operate on a single partition - thus, to
1111+ organize all the data for a single ` reduceByKey ` reduce task to execute, Spark needs to perform an
1112+ all-to-all operation. It must read from all partitions to find all the values for all keys,
1113+ and then bring together values across partitions to compute the final result for each key -
1114+ this is called the ** shuffle** .
1115+
1116+ Although the set of elements in each partition of newly shuffled data will be deterministic, and so
1117+ is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably
1118+ ordered data following shuffle then it's possible to use:
1119+
1120+ * ` mapPartitions ` to sort each partition using, for example, ` .sorted `
1121+ * ` repartitionAndSortWithinPartitions ` to efficiently sort partitions while simultaneously repartitioning
1122+ * ` sortBy ` to make a globally ordered RDD
1123+
1124+ Operations which can cause a shuffle include ** repartition** operations like
1125+ [ ` repartition ` ] ( #RepartitionLink ) , and [ ` coalesce ` ] ( #CoalesceLink ) , ** 'ByKey** operations
1126+ (except for counting) like [ ` groupByKey ` ] ( #GroupByLink ) and [ ` reduceByKey ` ] ( #ReduceByLink ) , and
1127+ ** join** operations like [ ` cogroup ` ] ( #CogroupLink ) and [ ` join ` ] ( #JoinLink ) .
1128+
1129+ #### Performance Impact
1130+ The ** Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
1131+ network I/O. To organize data for the shuffle, Spark generates sets of tasks - * map* tasks to
1132+ organize the data, and a set of * reduce* tasks to aggregate it. This nomenclature comes from
1133+ MapReduce and does not directly relate to Spark's ` map ` and ` reduce ` operations.
1134+
1135+ Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
1136+ are sorted based on the target partition and written to a single file. On the reduce side, tasks
1137+ read the relevant sorted blocks.
1138+
1139+ Certain shuffle operations can consume significant amounts of heap memory since they employ
1140+ in-memory data structures to organize records before or after transferring them. Specifically,
1141+ ` reduceByKey ` and ` aggregateByKey ` create these structures on the map side and ` 'ByKey ` operations
1142+ generate these on the reduce side. When data does not fit in memory Spark will spill these tables
1143+ to disk, incurring the additional overhead of disk I/O and increased garbage collection.
1144+
1145+ Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
1146+ are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
1147+ long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need
1148+ to be re-computed if the lineage is re-computed. The temporary storage directory is specified by the
1149+ ` spark.local.dir ` configuration parameter when configuring the Spark context.
1150+
1151+ Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
1152+ 'Shuffle Behavior' section within the [ Spark Configuration Guide] ( configuration.html ) .
1153+
10931154## RDD Persistence
10941155
10951156One of the most important capabilities in Spark is * persisting* (or * caching* ) a dataset in memory
0 commit comments