From 75ef67bc6bbde8da6d528f9135bfabd3738f3903 Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Tue, 17 Mar 2015 12:40:11 -0700
Subject: [PATCH 01/10] [SPARK-5750][SPARK-3441][SPARK-5836] Added
 documentation explaining the shuffle operation and included errata from a
 number of other JIRAs

---
 docs/programming-guide.md | 39 ++++++++++++++++++++++++++++++---------
 1 file changed, 30 insertions(+), 9 deletions(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 7b0701828878..196617414173 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -870,7 +870,7 @@ for details.
   Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
-  mapPartitions(func)
+  mapPartitions(func) <a name="MapPartLink"></a>
   Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
@@ -897,7 +897,7 @@ for details.
   Return a new dataset that contains the distinct elements of the source dataset.
-  groupByKey([numTasks])
+  groupByKey([numTasks]) <a name="GroupByLink"></a>
   When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
  Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better
@@ -908,15 +908,15 @@ for details.
-  reduceByKey(func, [numTasks])
+  reduceByKey(func, [numTasks]) <a name="ReduceByLink"></a>
   When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
-  aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
+  aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) <a name="AggregateByLink"></a>
   When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
-  sortByKey([ascending], [numTasks])
+  sortByKey([ascending], [numTasks]) <a name="SortByLink"></a>
   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
@@ -939,17 +939,17 @@ for details.
   process's stdin and lines output to its stdout are returned as an RDD of strings.
-  coalesce(numPartitions)
+  coalesce(numPartitions) <a name="CoalesceLink"></a>
   Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
   repartition(numPartitions)
   Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
-  This always shuffles all data over the network.
+  This always shuffles all data over the network. <a name="RepartitionLink"></a>
-  repartitionAndSortWithinPartitions(partitioner)
+  repartitionAndSortWithinPartitions(partitioner) <a name="Repartition2Link"></a>
   Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
@@ -1013,7 +1013,7 @@ for details.
   SparkContext.objectFile().
-  countByKey()
+  countByKey() <a name="CountByLink"></a>
   Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
@@ -1022,6 +1022,27 @@ for details.
+### Shuffle operations
+
+Certain operations within Spark trigger an operation known as the shuffle. The shuffle is Spark's mechanism for re-organizing data to co-locate data associated with particular keys.
+
+#### Background
+
+To understand what happens during the shuffle we can consider the example of the [`groupByKey`](#GroupByLink) operation. The `groupByKey` operation generates a new RDD where all values for a single key are combined into a 2-tuple - the key and an Iterable object containing all the associated values. If we think of the map and reduce steps for `groupByKey()` then we can see that to generate the list of all values associated with a key, all of the values must reside on the same reducer, since the output of the reduce step is the complete array.
+
+In Spark, by default, data is distributed randomly across partitions. During computations, a single task will operate on a single partition - thus, to organize all the data for a single `groupByKey` reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then organize those such that all values for any key lie within the same partition - this is called the **shuffle**.
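To ground the paragraph above, here is a minimal, hypothetical sketch of `groupByKey` triggering a shuffle; the application name, dataset, and partition count are illustrative only and are not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GroupByKeyShuffleSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("groupByKey-shuffle-sketch").setMaster("local[2]"))

    // A small (K, V) dataset spread over 4 partitions; the values for key "a"
    // will generally land in different partitions.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)), 4)

    // groupByKey must co-locate every value for each key, so evaluating it
    // performs the all-to-all read across partitions described above.
    val grouped = pairs.groupByKey()
    grouped.collect().foreach { case (k, vs) => println(s"$k -> ${vs.mkString(",")}") }

    sc.stop()
  }
}
```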
+
+Although the set of elements in each partition of newly shuffled data will be deterministic, the ordering of these elements is not. If one desires predictably ordered data following shuffle operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition. A similar operation, [`repartitionAndSortWithinPartitions`](#Repartition2Link) coupled with `mapPartitions`, may be used to enact a `Hadoop` style shuffle.
+
+Operations, which cause a shuffle include [`groupByKey`](#GroupByLink), [`sortByKey`](#SortByLink), [`reduceByKey`](#ReduceByLink), [`aggregateByKey`](#AggregateByLink), [`repartition`](#RepartitionLink), [`repartitionAndSortWithinPartitions`](#Repartition2Link), [`coalesce`](#CoalesceLink), and [`countByKey`](#CountByLink).
+
+#### Performance Impact
+**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and network I/O. Shuffle operations can have a serious impact on performance. To organize data for the shuffle, Spark will also generate lookup tables in memory, which, for large operations, can consume significant amounts of heap memory. When out of memory, for all shuffle operations with the exception of `sortByKey`, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection. Since `sortByKey` does not spill these intermediate tables to disk, the shuffle operation may cause OOM errors.
+
+Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are not cleaned up from Spark's temporary storage until Spark is stopped, which means that long-running Spark jobs may consume available disk space. The temporary storage directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark context.
+
+Shuffle behavior can be fine-tuned by adjusting a variety of configuration parameters. See the 'Shuffle Behavior' section within the Spark Configuration Guide.
+
 ## RDD Persistence

 One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory

From 159dd1c70e9d1e4d66aebdd094752b6e8053e0a6 Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Wed, 18 Mar 2015 06:26:42 -0700
Subject: [PATCH 02/10] [SPARK-5750] Style fixes from rxin.

---
 docs/programming-guide.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 196617414173..c792d47db9eb 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1024,20 +1024,20 @@ for details.
 ### Shuffle operations
-Certain operations within Spark trigger an operation known as the shuffle. The shuffle is Spark's mechanism for re-organizing data to co-locate data associated with particular keys.
+Certain operations within Spark trigger an operation known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so data with the same key becomes co-located after a shuffle.
 #### Background
 To understand what happens during the shuffle we can consider the example of the [`groupByKey`](#GroupByLink) operation. The `groupByKey` operation generates a new RDD where all values for a single key are combined into a 2-tuple - the key and an Iterable object containing all the associated values. If we think of the map and reduce steps for `groupByKey()` then we can see that to generate the list of all values associated with a key, all of the values must reside on the same reducer, since the output of the reduce step is the complete array.
-In Spark, by default, data is distributed randomly across partitions. During computations, a single task will operate on a single partition - thus, to organize all the data for a single `groupByKey` reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then organize those such that all values for any key lie within the same partition - this is called the **shuffle**.
+In Spark, by default, the way data is distributed across partitions is undefined. During computations, a single task will operate on a single partition - thus, to organize all the data for a single `groupByKey` reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then organize those such that all values for any key lie within the same partition - this is called the **shuffle**.
 Although the set of elements in each partition of newly shuffled data will be deterministic, the ordering of these elements is not. If one desires predictably ordered data following shuffle operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition. A similar operation, [`repartitionAndSortWithinPartitions`](#Repartition2Link) coupled with `mapPartitions`, may be used to enact a `Hadoop` style shuffle.
 Operations, which cause a shuffle include [`groupByKey`](#GroupByLink), [`sortByKey`](#SortByLink), [`reduceByKey`](#ReduceByLink), [`aggregateByKey`](#AggregateByLink), [`repartition`](#RepartitionLink), [`repartitionAndSortWithinPartitions`](#Repartition2Link), [`coalesce`](#CoalesceLink), and [`countByKey`](#CountByLink).
 #### Performance Impact
-**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and network I/O. Shuffle operations can have a serious impact on performance. To organize data for the shuffle, Spark will also generate lookup tables in memory, which, for large operations, can consume significant amounts of heap memory. When out of memory, for all shuffle operations with the exception of `sortByKey`, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection. Since `sortByKey` does not spill these intermediate tables to disk, the shuffle operation may cause OOM errors.
+**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, internally, Spark creates a hash table, which, for large operations, can consume significant amounts of heap memory. When data does not fit in memory, for all shuffle operations with the exception of `sortByKey`, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection. Since `sortByKey` does not spill these intermediate tables to disk, the shuffle operation may cause OOM errors.
 Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are not cleaned up from Spark's temporary storage until Spark is stopped, which means that long-running Spark jobs may consume available disk space. The temporary storage directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark context.
From a8adb57208266836e41a92f44f5c6edf6ee655ed Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Thu, 19 Mar 2015 12:18:35 -0700
Subject: [PATCH 03/10] [SPARK-5750] Incorporated feedback from Sean Owen

---
 docs/programming-guide.md | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index b4a32d192bf0..b984e8477802 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -983,13 +983,13 @@ for details.
   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
-  join(otherDataset, [numTasks])
+  join(otherDataset, [numTasks]) <a name="JoinLink"></a>
   When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
-  cogroup(otherDataset, [numTasks])
+  cogroup(otherDataset, [numTasks]) <a name="CogroupLink"></a>
   When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
@@ -1088,22 +1088,24 @@ for details.
 ### Shuffle operations
-Certain operations within Spark trigger an operation known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so data with the same key becomes co-located after a shuffle.
+Certain operations within Spark trigger an operation known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so that it is grouped differently across partitions. This typically involves re-arranging and copying data across executors and machines, making shuffle a complex and costly operation.
 #### Background
-To understand what happens during the shuffle we can consider the example of the [`groupByKey`](#GroupByLink) operation. The `groupByKey` operation generates a new RDD where all values for a single key are combined into a 2-tuple - the key and an Iterable object containing all the associated values. If we think of the map and reduce steps for `groupByKey()` then we can see that to generate the list of all values associated with a key, all of the values must reside on the same reducer, since the output of the reduce step is the complete array.
+To understand what happens during the shuffle we can consider the example of the [`groupByKey`](#GroupByLink) operation. The `groupByKey` operation generates a new RDD where all values for a single key are combined into a tuple - the key and an `Iterable` object containing all the associated values. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to present a single array per key.
-In Spark, by default, the way data is distributed across partitions is undefined. During computations, a single task will operate on a single partition - thus, to organize all the data for a single `groupByKey` reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then organize those such that all values for any key lie within the same partition - this is called the **shuffle**.
+In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single `groupByKey` reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then organize those such that all values for any key lie within the same partition - this is called the **shuffle**.

-Although the set of elements in each partition of newly shuffled data will be deterministic, the ordering of these elements is not. If one desires predictably ordered data following shuffle operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition. A similar operation, [`repartitionAndSortWithinPartitions`](#Repartition2Link) coupled with `mapPartitions`, may be used to enact a `Hadoop` style shuffle.
+Although the set of elements in each partition of newly shuffled data will be deterministic, the ordering of these elements is not. If one desires predictably ordered data following shuffle operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition or `sortBy` can be used to perform a global sort. A similar operation, [`repartitionAndSortWithinPartitions`](#Repartition2Link) coupled with `mapPartitions`, may be used to enact a Hadoop style shuffle.

-Operations, which cause a shuffle include [`groupByKey`](#GroupByLink), [`sortByKey`](#SortByLink), [`reduceByKey`](#ReduceByLink), [`aggregateByKey`](#AggregateByLink), [`repartition`](#RepartitionLink), [`repartitionAndSortWithinPartitions`](#Repartition2Link), [`coalesce`](#CoalesceLink), and [`countByKey`](#CountByLink).
+Operations which can cause a shuffle include **repartion** operations like [`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'byKey** operations (except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and **join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).

 #### Performance Impact
 **Shuffle** is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, internally, Spark creates a hash table, which, for large operations, can consume significant amounts of heap memory. When data does not fit in memory, for all shuffle operations with the exception of `sortByKey`, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection. Since `sortByKey` does not spill these intermediate tables to disk, the shuffle operation may cause OOM errors.

+As of Spark 1.1, Spark provides an alternative implementation for the hash-based shuffle described above. This sort-based shuffle avoids generating a file for every combination of map and reduce tasks. Instead, results from individual map jobs are kept in memory until they can't fit. Then, they are organized by the target reduce task (the one that needs those results) and then spilled to a single file, which is subsequently aggregated on the reduce side. Sort-based shuffle can significantly improve reduce-stage performance at the expense of moderately increased map-stage run-time.
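As a hypothetical illustration of choosing between the two implementations discussed above, the `spark.shuffle.manager` configuration key selects one or the other in this era of Spark; the app name and master are placeholders, and exact values should be verified against your Spark version:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: explicitly selecting the sort-based shuffle described
// above. From Spark 1.2 onward "sort" is the default; "hash" selects the
// older hash-based implementation.
val conf = new SparkConf()
  .setAppName("shuffle-manager-sketch")
  .setMaster("local[2]")
  .set("spark.shuffle.manager", "sort")
val sc = new SparkContext(conf)
```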
-Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are not cleaned up from Spark's temporary storage until Spark is stopped, which means that long-running Spark jobs may consume available disk space. The temporary storage directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark context.
+Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are not cleaned up from Spark's temporary storage until Spark is stopped, which means that long-running Spark jobs may consume available disk space. This is done so shuffle doesn't need to be re-computed if the lineage is recomputed. The temporary storage directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark context.

 Shuffle behavior can be fine-tuned by adjusting a variety of configuration parameters. See the 'Shuffle Behavior' section within the Spark Configuration Guide.

From dffbd2d766121f281b09b690f6bfdee720a630cb Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Fri, 20 Mar 2015 09:47:37 -0700
Subject: [PATCH 04/10] [SPARK-5750] Fixed some factual inaccuracies with
 regards to shuffle internals.

---
 docs/programming-guide.md | 63 +++++++++++++++++++++++++++++----------
 1 file changed, 48 insertions(+), 15 deletions(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index b984e8477802..a1101fa233c0 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1088,26 +1088,59 @@ for details.
 ### Shuffle operations
-Certain operations within Spark trigger an operation known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so that it is grouped differently across partitions. This typically involves re-arranging and copying data across executors and machines, making shuffle a complex and costly operation.
+Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
+mechanism for re-distributing data so that it is grouped differently across partitions. This typically
+involves re-arranging and copying data across executors and machines, making shuffle a complex and
+costly operation.

 #### Background

+To understand what happens during the shuffle we can consider the example of the
+[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
+values for a single key are combined into a tuple - the key and the result of executing a reduce
+function against all values associated with that key. The challenge is that not all values for a
+single key necessarily reside on the same partition, or even the same machine, but they must be
+co-located to present a single array per key.
+
+In Spark, data is generally not distributed across partitions to be in the necessary place for a
+specific operation. During computations, a single task will operate on a single partition - thus, to
+organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
+all-to-all operation. It must read from all partitions to find all the values for all keys, and then
+organize those such that all values for any key lie within the same partition - this is called the
+**shuffle**.
+
+Although the set of elements in each partition of newly shuffled data will be deterministic, the
+ordering of these elements is not. If one desires predictably ordered data following shuffle
+operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition or `sortBy` can be
+used to perform a global sort. A similar operation,
+[`repartitionAndSortWithinPartitions`](#Repartition2Link) coupled with `mapPartitions`,
+may be used to enact a Hadoop style shuffle.
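A small, self-contained sketch of the pattern just described — repartitioning with an in-shuffle sort, then streaming over each key-ordered partition with `mapPartitions`; the partitioner choice and data are assumptions for illustration:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SortedShuffleSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("sorted-shuffle-sketch").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "z")))

    // Repartition into 2 partitions and sort by key within each partition in
    // one pass; the sort is pushed down into the shuffle machinery itself.
    val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

    // Each partition can now be consumed as a key-ordered stream, much like
    // the input to a Hadoop-style reducer.
    sorted.mapPartitions(iter => iter.map { case (k, v) => s"$k:$v" })
      .collect()
      .foreach(println)
    sc.stop()
  }
}
```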
- -Operations which can cause a shuffle include **repartion** operations like [`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'byKey** operations (except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink) , and **join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink). +To understand what happens during the shuffle we can consider the example of the +[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all +values for a single key are combined into a tuple - the key and the result of executing a reduce +function against all values associated with that key. The challenge is that not all values for a +single key necessarily reside on the same partition, or even the same machine, but they must be +co-located to present a single array per key. + +In Spark, data is generally not distributed across partitions to be in the necessary place for a +specific operation. During computations, a single task will operate on a single partition - thus, to +organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an +all-to-all operation. It must read from all partitions to find all the values for all keys, and then +organize those such that all values for any key lie within the same partition - this is called the +**shuffle**. + +Although the set of elements in each partition of newly shuffled data will be deterministic, the +ordering of these elements is not. If one desires predictably ordered data following shuffle +operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition or `sortBy` can be +used to perform a global sort. A similar operation, +[`repartitionAndSortWithinPartitions`](#Repartition2Link`) coupled with `mapPartitions`, +may be used to enact a Hadoop style shuffle. + +Operations which can cause a shuffle include **repartition** operations like +[`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'byKey** operations +(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and +**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink). #### Performance Impact -**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, internally, Spark creates a hash table, which, for large operations, can consume significant amounts of heap memory. When data does not fit in memory, for all shuffle operations with the exception of `sortByKey`, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection. Since `sortByKey` does not spill these intermediate tables to disk, the shuffle operation may cause OOM errors. - -As of Spark 1.1, Spark provides an alternative implementation for the hash-based shuffle described above. This sort-based shuffle avoids generating a file for every combination of map and reduce tasks. Instead, results from individual map jobs are kept in memory until they can't fit. Then, they are organized by the target reduce task (the one that needs those results) and then spilled to a single file, which are subsequently aggregated on the reduce side. Sort-based shuffle can significantly improve reduce-stage performance at the expense of moderately increased map-stage run-time. - -Shuffle also generates a large number of intermediate files on disk. 
-Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are not cleaned up from Spark's temporary storage until Spark is stopped, which means that long-running Spark jobs may consume available disk space. This is done so shuffle doesn't need to be re-computed if the lineage is recomputed. The temporary storage directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark context.
-
-Shuffle behavior can be fine-tuned by adjusting a variety of configuration parameters. See the 'Shuffle Behavior' section within the Spark Configuration Guide.
+**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
+network I/O. To organize data for the shuffle, Spark generates two sets of tasks - map tasks to
+organize the data, and a set of reduce tasks to aggregate it. Internally, results from individual
+map jobs are kept in memory until they can't fit. Then, these are sorted based on the target reduce
+task and written to a single file. On the reduce side, tasks read the relevant sorted blocks.
+
+Certain shuffle operations can consume significant amounts of heap memory since they generate hash
+tables in memory. Specifically, `reduceByKey` and `aggregateByKey` on the map-side and `'byKey`
+operations on the reduce-side. When data does not fit in memory Spark will spill these tables to
+disk, incurring the additional overhead of disk I/O and increased garbage collection.
+
+Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
+are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
+long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need
+to be re-computed if the lineage is re-computed. The temporary storage directory is specified by the
+`spark.local.dir` configuration parameter when configuring the Spark context.
+
+Shuffle behavior can be fine-tuned by adjusting a variety of configuration parameters. See the
+'Shuffle Behavior' section within the Spark Configuration Guide.

 ## RDD Persistence

From eeb5a7a14f8c7968ae715e19fab9e11ab1ec4da4 Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Tue, 24 Mar 2015 11:00:28 -0700
Subject: [PATCH 05/10] [SPARK-5750] Added some minor fixes

---
 docs/programming-guide.md | 68 +++++++++++++++++++++------------------
 1 file changed, 36 insertions(+), 32 deletions(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index a1101fa233c0..3fbf18adad19 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1088,58 +1088,62 @@ for details.
 ### Shuffle operations
-Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
-mechanism for re-distributing data so that it is grouped differently across partitions. This typically
-involves re-arranging and copying data across executors and machines, making shuffle a complex and
-costly operation.
+Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
+mechanism for re-distributing data so that it is grouped differently across partitions. This typically
+involves copying data across executors and machines, making the shuffle a complex and
+costly operation.

 #### Background
-To understand what happens during the shuffle we can consider the example of the
-[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
-values for a single key are combined into a tuple - the key and the result of executing a reduce
-function against all values associated with that key. The challenge is that not all values for a
-single key necessarily reside on the same partition, or even the same machine, but they must be
-co-located to present a single array per key.
+To understand what happens during the shuffle we can consider the example of the
+[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
+values for a single key are combined into a tuple - the key and the result of executing a reduce
+function against all values associated with that key. The challenge is that not all values for a
+single key necessarily reside on the same partition, or even the same machine, but they must be
+co-located to compute the result.

 In Spark, data is generally not distributed across partitions to be in the necessary place for a
 specific operation. During computations, a single task will operate on a single partition - thus, to
 organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
 all-to-all operation. It must read from all partitions to find all the values for all keys, and then
 organize those such that all values for any key lie within the same partition - this is called the
 **shuffle**.

 Although the set of elements in each partition of newly shuffled data will be deterministic, the
 ordering of these elements is not. If one desires predictably ordered data following shuffle
 operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition or `sortBy` can be
 used to perform a global sort. A similar operation,
 [`repartitionAndSortWithinPartitions`](#Repartition2Link) coupled with `mapPartitions`,
 may be used to enact a Hadoop style shuffle.

 Operations which can cause a shuffle include **repartition** operations like
 [`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'byKey** operations
 (except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
 **join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).

 #### Performance Impact
-**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
-network I/O. To organize data for the shuffle, Spark generates two sets of tasks - map tasks to
-organize the data, and a set of reduce tasks to aggregate it. Internally, results from individual
-map jobs are kept in memory until they can't fit. Then, these are sorted based on the target reduce
-task and written to a single file. On the reduce side, tasks read the relevant sorted blocks.
+**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
+network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
+organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
+MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
+
+Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
+are sorted based on the target partition and written to a single file. On the reduce side, tasks
+read the relevant sorted blocks.
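As an illustrative aside (assuming an existing `SparkContext` named `sc`), the map-task/reduce-task split above is why `reduceByKey` typically shuffles far less data than `groupByKey`: it can combine values within each map-side partition before anything crosses the network:

```scala
// Hypothetical word-count fragment; `sc` is an assumed SparkContext.
val words = sc.parallelize(Seq("a", "b", "a", "c", "a", "b"))

// reduceByKey combines (word, 1) records inside each map task first, so only
// one partial sum per key per partition is written to the shuffle files.
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

// Same result, but every individual (word, 1) record crosses the shuffle:
val countsViaGroup = words.map(w => (w, 1)).groupByKey().mapValues(_.sum)
```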
-Certain shuffle operations can consume significant amounts of heap memory since they generate hash
-tables in memory. Specifically, `reduceByKey` and `aggregateByKey` on the map-side and `'byKey`
-operations on the reduce-side. When data does not fit in memory Spark will spill these tables to
-disk, incurring the additional overhead of disk I/O and increased garbage collection.
+Certain shuffle operations can consume significant amounts of heap memory since they employ
+in-memory data structures to organize records before or after transferring them. Specifically,
+`reduceByKey` and `aggregateByKey` create these structures on the map-side and `'byKey` operations
+generate these on the reduce-side. When data does not fit in memory Spark will spill these tables
+to disk, incurring the additional overhead of disk I/O and increased garbage collection.

 Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
 are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
 long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need
 to be re-computed if the lineage is re-computed. The temporary storage directory is specified by the
 `spark.local.dir` configuration parameter when configuring the Spark context.

-Shuffle behavior can be fine-tuned by adjusting a variety of configuration parameters. See the
+Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
 'Shuffle Behavior' section within the Spark Configuration Guide.

 ## RDD Persistence

From 349d1fa9d26bee9ddc2b02fc5d52db0d0a6dbd97 Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Wed, 25 Mar 2015 12:53:59 -0700
Subject: [PATCH 06/10] Added cross link for configuration page

---
 docs/programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 3fbf18adad19..49348becce36 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1144,7 +1144,7 @@ to be re-computed if the lineage is re-computed. The temporary storage directory
 `spark.local.dir` configuration parameter when configuring the Spark context.

 Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
-'Shuffle Behavior' section within the Spark Configuration Guide.
+'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).

 ## RDD Persistence
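For reference, a hypothetical sketch of adjusting two such parameters: `spark.local.dir` is named in the text above, and `spark.shuffle.compress` is one 'Shuffle Behavior' setting; the directory path is a placeholder, and names should be verified against the configuration guide:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-tuning-sketch")
  // Directory for shuffle intermediate files and spills:
  .set("spark.local.dir", "/mnt/fast-disk/spark-tmp")
  // Compress map output files, trading CPU for disk and network I/O:
  .set("spark.shuffle.compress", "true")
val sc = new SparkContext(conf)
```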
From 85f9c6e2e33c73299b85b61366da6155d226704c Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Wed, 25 Mar 2015 14:02:54 -0700
Subject: [PATCH 07/10] Update programming-guide.md

Fixed a few more small items.

---
 docs/programming-guide.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 49348becce36..86bdc35e6722 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1117,12 +1117,12 @@ used to perform a global sort. A similar operation,
 may be used to enact a Hadoop style shuffle.

 Operations which can cause a shuffle include **repartition** operations like
-[`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'byKey** operations
+[`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'ByKey** operations
 (except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
 **join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).

 #### Performance Impact
-**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
+The **Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
 network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
 organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
 MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
@@ -1133,8 +1133,8 @@ read the relevant sorted blocks.

 Certain shuffle operations can consume significant amounts of heap memory since they employ
 in-memory data structures to organize records before or after transferring them. Specifically,
-`reduceByKey` and `aggregateByKey` create these structures on the map-side and `'byKey` operations
-generate these on the reduce-side. When data does not fit in memory Spark will spill these tables
+`reduceByKey` and `aggregateByKey` create these structures on the map side and `'ByKey` operations
+generate these on the reduce side. When data does not fit in memory Spark will spill these tables
 to disk, incurring the additional overhead of disk I/O and increased garbage collection.

From dffbd2d766121f281b09b690f6bfdee720a630cb Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Thu, 26 Mar 2015 09:12:45 -0700
Subject: [PATCH 08/10] [SPARK-5750] Slight wording update

---
 docs/programming-guide.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 25eaa440b526..9c8e25aa84f2 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1115,8 +1115,7 @@ organize those such that all values for any key lie within the same partition -
 Although the set of elements in each partition of newly shuffled data will be deterministic, the
 ordering of these elements is not. If one desires predictably ordered data following shuffle
-operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition or `sortBy` can be
-used to perform a global sort. A similar operation,
+operations, `sortBy` can be used to perform a global sort. A similar operation,
 [`repartitionAndSortWithinPartitions`](#Repartition2Link) coupled with `mapPartitions`,
 may be used to enact a Hadoop style shuffle.
From 7a0b96f24fcf90d853bb85e7a95aa8342bc051f9 Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Thu, 26 Mar 2015 12:53:52 -0700
Subject: [PATCH 09/10] Update programming-guide.md

Removed extraneous reference to repartitionAndSort

---
 docs/programming-guide.md | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 7d26f53c4261..3eb88db32d06 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1115,9 +1115,7 @@ organize those such that all values for any key lie within the same partition -
 Although the set of elements in each partition of newly shuffled data will be deterministic, the
 ordering of these elements is not. If one desires predictably ordered data following shuffle
-operations, `sortBy` can be used to perform a global sort. A similar operation,
-[`repartitionAndSortWithinPartitions`](#Repartition2Link) coupled with `mapPartitions`,
-may be used to enact a Hadoop style shuffle.
+operations, `sortBy` can be used to perform a global sort.

From 6178e2446924392fd470466caa0e4490b89b4266 Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Fri, 27 Mar 2015 08:54:06 -0700
Subject: [PATCH 10/10] Update programming-guide.md

Reworded discussion on sorting within partitions.

---
 docs/programming-guide.md | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 3eb88db32d06..e31cd7fe2055 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1113,9 +1113,13 @@ all-to-all operation. It must read from all partitions to find all the values fo
 organize those such that all values for any key lie within the same partition - this is called the
 **shuffle**.

-Although the set of elements in each partition of newly shuffled data will be deterministic, the
-ordering of these elements is not. If one desires predictably ordered data following shuffle
-operations, `sortBy` can be used to perform a global sort.
+Although the set of elements in each partition of newly shuffled data will be deterministic, and so
+is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably
+ordered data following the shuffle then it's possible to use:
+
+* `mapPartitions` to sort each partition using, for example, `.sorted`
+* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
+* `sortBy` to make a globally ordered RDD

 Operations which can cause a shuffle include **repartition** operations like
 [`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'ByKey** operations
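A minimal sketch of the three options in the bullet list above, assuming an existing `SparkContext` named `sc`; the dataset and partition count are illustrative:

```scala
import org.apache.spark.HashPartitioner

// An assumed key-value RDD:
val pairs = sc.parallelize(Seq((2, "b"), (1, "a"), (3, "c")), 2)

// 1. Sort each partition in place (materializes one partition at a time):
val perPartitionSorted =
  pairs.mapPartitions(iter => iter.toArray.sorted.iterator)

// 2. Repartition and sort within partitions in a single shuffle:
val repartitionedAndSorted =
  pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// 3. Make a globally ordered RDD:
val globallySorted = pairs.sortBy(_._1)
```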