### Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
mechanism for re-distributing data so that it's grouped differently across partitions. This typically
involves re-arranging and copying data across executors and machines, making the shuffle a complex
and costly operation.

#### Background

To understand what happens during the shuffle we can consider the example of the
[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
values for a single key are combined into a tuple - the key and the result of executing a reduce
function against all values associated with that key. The challenge is that not all values for a
single key necessarily reside on the same partition, or even the same machine, but they must be
co-located to compute the result for that key.

In Spark, data is generally not distributed across partitions to be in the necessary place for a
specific operation. During computations, a single task will operate on a single partition - thus, to
organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
all-to-all operation. It must read from all partitions to find all the values for all keys, and then
organize those such that all values for any key lie within the same partition - this is called the
**shuffle**.

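To make this concrete, here is a minimal sketch, assuming the `SparkContext` `sc` introduced earlier
in the guide; the data and the reduce function are illustrative, not part of the guide:

```scala
// Enables the pair RDD operations such as reduceByKey in this version of Spark.
import org.apache.spark.SparkContext._

// A pair RDD whose keys are spread across several partitions.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)), numSlices = 4)

// reduceByKey must bring every value for a given key onto the same partition,
// so computing this RDD triggers a shuffle.
val sums = pairs.reduceByKey(_ + _)

sums.collect()  // Array(("a", 4), ("b", 6)) - the ordering of the keys is not guaranteed
```
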
Although the set of elements in each partition of newly shuffled data will be deterministic, the
ordering of these elements is not. If one desires predictably ordered data following shuffle
operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition, or `sortBy` can be
used to perform a global sort. A similar operation,
[`repartitionAndSortWithinPartitions`](#Repartition2Link), coupled with `mapPartitions`,
may be used to enact a Hadoop-style shuffle.

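For example, here is a sketch of that last approach, again assuming `sc`; the keys, values, and
partition count are illustrative:

```scala
import org.apache.spark.HashPartitioner

val records = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "z")))

// Repartition and sort by key in a single shuffle: every output partition
// holds its records in key order.
val sortedWithinPartitions =
  records.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// mapPartitions can then rely on each partition's iterator being key-ordered,
// e.g. to emit the smallest key present in each partition.
val firstKeyPerPartition = sortedWithinPartitions.mapPartitions(iter => iter.take(1).map(_._1))
firstKeyPerPartition.collect()
```
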
Operations which can cause a shuffle include **repartition** operations like
[`repartition`](#RepartitionLink) and [`coalesce`](#CoalesceLink), **'ByKey** operations
(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).

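One way to check whether a particular operation introduces a shuffle is to inspect the RDD's lineage
with `toDebugString`; the following is an illustrative sketch, again assuming `sc`:

```scala
val words = sc.parallelize(Seq("spark", "shuffle", "spark"))
val wordPairs = words.map(word => (word, 1))

// map is a narrow transformation: the debug string shows no shuffle boundary.
println(wordPairs.toDebugString)

// reduceByKey is a 'ByKey operation: a ShuffledRDD appears in the lineage,
// marking the shuffle between the map-side and reduce-side stages.
println(wordPairs.reduceByKey(_ + _).toDebugString)
```
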
#### Performance Impact

**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
network I/O. To organize data for the shuffle, Spark generates two sets of tasks - map tasks to
organize the data, and a set of reduce tasks to aggregate it. Internally, results from individual
map tasks are kept in memory until they can't fit. Then, these are sorted based on the target reduce
task and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory since they generate hash
tables in memory. Specifically, `reduceByKey` and `aggregateByKey` create these tables on the
map side, and `'ByKey` operations generate them on the reduce side. When data does not fit in
memory, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and
increased garbage collection.

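As one illustrative (not prescriptive) way to keep each task's table smaller, the 'ByKey operations
accept an explicit number of output partitions; continuing with the `pairs` RDD from the sketch
above, the count of 200 is an arbitrary example and a suitable value is workload-dependent:

```scala
// Shuffling into more, smaller reduce tasks keeps each task's hash table
// smaller, which can reduce spilling to disk; 200 partitions is only an example value.
val spreadSums = pairs.reduceByKey((a, b) => a + b, 200)
```
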
Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need
to be re-computed if the lineage is re-computed. The temporary storage directory is specified by the
`spark.local.dir` configuration parameter when configuring the Spark context.

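For example, a minimal sketch of pointing `spark.local.dir` at a volume with more free space when
constructing the context (the path shown is hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ShuffleHeavyJob")
  // Hypothetical local directory with enough space for the shuffle's intermediate files.
  .set("spark.local.dir", "/mnt/large-disk/spark-tmp")

val sc = new SparkContext(conf)
```
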
Shuffle behavior can be fine-tuned by adjusting a variety of configuration parameters. See the
'Shuffle Behavior' section within the Spark Configuration Guide.

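As an illustration, parameters from that section are set the same way as any other Spark
configuration; the property names and values below are assumptions for the sake of example (they can
vary across Spark versions), not tuning recommendations:

```scala
import org.apache.spark.SparkConf

val tunedConf = new SparkConf()
  .setAppName("TunedShuffleJob")
  // Whether to compress map output files (documented under Shuffle Behavior).
  .set("spark.shuffle.compress", "true")
  // Whether to compress data spilled to disk during shuffles (example setting only).
  .set("spark.shuffle.spill.compress", "true")
```
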