83 changes: 72 additions & 11 deletions docs/programming-guide.md
@@ -937,7 +937,7 @@ for details.
<td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td>
</tr>
<tr>
<td> <b>mapPartitions</b>(<i>func</i>) </td>
<td> <b>mapPartitions</b>(<i>func</i>) <a name="MapPartLink"></a> </td>
<td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type
Iterator&lt;T&gt; => Iterator&lt;U&gt; when running on an RDD of type T. </td>
</tr>
@@ -964,7 +964,7 @@ for details.
<td> Return a new dataset that contains the distinct elements of the source dataset.</td>
</tr>
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
<td> <b>groupByKey</b>([<i>numTasks</i>]) <a name="GroupByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
@@ -975,25 +975,25 @@ for details.
</td>
</tr>
<tr>
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) <a name="ReduceByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) <a name="AggregateByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) </td>
<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) <a name="SortByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
</tr>
<tr>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="JoinLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
</td>
</tr>
<tr>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="CogroupLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples. This operation is also called <code>groupWith</code>. </td>
</tr>
<tr>
@@ -1006,17 +1006,17 @@ for details.
process's stdin and lines output to its stdout are returned as an RDD of strings. </td>
</tr>
<tr>
<td> <b>coalesce</b>(<i>numPartitions</i>) </td>
<td> <b>coalesce</b>(<i>numPartitions</i>) <a name="CoalesceLink"></a> </td>
<td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
after filtering down a large dataset. </td>
</tr>
<tr>
<td> <b>repartition</b>(<i>numPartitions</i>) </td>
<td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
This always shuffles all data over the network. </td>
This always shuffles all data over the network. <a name="RepartitionLink"></a></td>
</tr>
<tr>
<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) </td>
<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td>
<td> Repartition the RDD according to the given partitioner and, within each resulting partition,
sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
each partition because it can push the sorting down into the shuffle machinery. </td>
@@ -1080,7 +1080,7 @@ for details.
<code>SparkContext.objectFile()</code>. </td>
</tr>
<tr>
<td> <b>countByKey</b>() </td>
<td> <b>countByKey</b>() <a name="CountByLink"></a> </td>
<td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td>
</tr>
<tr>
@@ -1090,6 +1090,67 @@ for details.
</tr>
</table>

### Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
mechanism for re-distributing data so that it's grouped differently across partitions. This typically
involves copying data across executors and machines, making the shuffle a complex and
costly operation.

#### Background

To understand what happens during the shuffle we can consider the example of the
[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
values for a single key are combined into a tuple - the key and the result of executing a reduce
function against all values associated with that key. The challenge is that not all values for a
single key necessarily reside on the same partition, or even the same machine, but they must be
co-located to compute the result.
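
For concreteness, here is a minimal word-count-style sketch of `reduceByKey`. It assumes an
already-created `SparkContext` named `sc`; the data and variable names are purely illustrative.

```scala
// A word-count sketch, assuming an existing SparkContext `sc`.
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
// e.g. Array((a,3), (b,2), (c,1)) -- the ordering of the results is not guaranteed
counts.collect()
```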

In Spark, data is generally not distributed across partitions to be in the necessary place for a
specific operation. During computations, a single task will operate on a single partition - thus, to
organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
all-to-all operation. It must read from all partitions to find all the values for all keys, and then
bring together values across partitions to compute the final result for each key - this is called the
**shuffle**.

> **Contributor:** Is this true for reduceByKey, though? It's certainly true for groupByKey, but reduceByKey and combineByKey will perform map-side combining, so it's not strictly true that all values for a key must be co-located before computing the new reduced value.
>
> **Contributor:** I looked at the discussion up-thread and noticed that this originally referred to groupByKey() but was changed because we're trying to discourage users from using that operation. The previous sentence is clear, because it says "organize all the data for a single reduceByKey reduce task", which is true because the "data" here can refer to partially-combined results rather than input records. This current sentence was what tripped me up, since only groupByKey requires that "all values" for any key be co-located. But maybe I'm just overthinking this, since I suppose that "values" could also refer to partially-combined values. I'm fine with this as long as it's not confusing to anyone else.
>
> **Member:** That's a reasonable point. I think it slightly outweighs the earlier concern about even mentioning groupByKey. @ilganeli would you mind making this final change? To, say, "a single groupByKey task".
>
> **Member:** My suggestion is not great. Leave the reduceByKey example. I think the final sentence can be "... and then bring together values across partitions to compute the final result for each key - ..." I can just tap this in since I am pretty certain everyone is happy at this point.

Although the set of elements in each partition of newly shuffled data will be deterministic, and so
is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably
ordered data following shuffle then it's possible to use one of the following (see the sketch after this list):

* `mapPartitions` to sort each partition using, for example, `.sorted`
* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
* `sortBy` to make a globally ordered RDD
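
A minimal sketch of these three options, assuming a hypothetical pair RDD `pairs` of `(Int, String)`
built from an existing `SparkContext` named `sc`:

```scala
import org.apache.spark.HashPartitioner

// Hypothetical input: a small pair RDD spread across two partitions.
val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")), 2)

// 1. Sort the elements of each partition after a shuffle (here, groupByKey).
val sortedWithinPartitions =
  pairs.groupByKey().mapPartitions(iter => iter.toSeq.sortBy(_._1).iterator)

// 2. Repartition and sort within each partition in a single shuffle.
val repartitionedAndSorted =
  pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// 3. Produce a globally ordered RDD.
val globallySorted = pairs.sortBy(_._1)
```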

Operations which can cause a shuffle include **repartition** operations like
[`repartition`](#RepartitionLink) and [`coalesce`](#CoalesceLink), **'ByKey** operations
(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).
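
To see whether a chain of operations introduced a shuffle, one option is to inspect the RDD's
lineage. A small sketch, again assuming an existing `SparkContext` named `sc` and hypothetical data:

```scala
// Inspecting lineage to spot a shuffle stage.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val reduced = pairs.reduceByKey(_ + _)
// The debug string typically shows a ShuffledRDD in the lineage,
// indicating that reduceByKey introduced a shuffle.
println(reduced.toDebugString)
```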

#### Performance Impact

The **shuffle** is an expensive operation since it involves disk I/O, data serialization, and
network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
MapReduce and does not directly relate to Spark's `map` and `reduce` operations.

Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
are sorted based on the target partition and written to a single file. On the reduce side, tasks
read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory since they employ
in-memory data structures to organize records before or after transferring them. Specifically,
`reduceByKey` and `aggregateByKey` create these structures on the map side, and `'ByKey` operations
generate these on the reduce side. When data does not fit in memory, Spark will spill these tables
to disk, incurring the additional overhead of disk I/O and increased garbage collection.

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need
to be re-computed if the lineage is re-computed. The temporary storage directory is specified by the
`spark.local.dir` configuration parameter when configuring the Spark context.

> **Contributor:** @ilganeli @srowen @JoshRosen Can you explain why this was added? Shuffle files are cleared automatically when the driver garbage collects the shuffle object, which triggers messages to all the executors to delete all files related to the shuffle. This was added in Spark 1.0. Has there been any change in this behavior since then that justifies this statement?
>
> If this statement is true, then this is a major issue for long-running jobs like Spark Streaming jobs. If this statement is not true, then this should not have been added and should be fixed promptly. Users have been personally telling me that they are not upgrading to Spark 1.3 because they think this behavior is a regression.
>
> **Contributor:** +1. I had a couple of people ask me this during Spark Summit. I was investigating this myself today.
>
> **Member:** @tdas This bit is a response to https://issues.apache.org/jira/browse/SPARK-5836 You would likely know better, and if GC of shuffle-related objects should trigger cleanup of shuffle files, then phew, at least there is some mechanism for that. I know people occasionally ask about problems with too many shuffle files lying around, but that doesn't mean the mechanism doesn't work. I think the best short-term change is just to update this statement if you're pretty confident this mechanism works.
>
> **Contributor:** That clarifies things for me. I know that there has been some concern about the shuffle files filling up disk, but as of now that can happen for one or more of the following reasons:
>
> 1. GC does not kick in for a long time (very high driver memory). The solution may often be to periodically call GC.
> 2. Nothing goes out of scope and so nothing is GCed.
> 3. There are some issues reported with shuffle files not being cleaned up in Mesos.
>
> The 3rd one is a bug and we will fix it. The first two should be clarified in the docs. That is better than this current very scary description.
>
> **Member:** @tdas #6901 WDYT?

Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).
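
As a rough illustration of that kind of tuning, the sketch below sets the temporary storage
directory and one shuffle-related property through `SparkConf`. The application name and property
values are placeholders; the authoritative list of shuffle properties is the configuration guide
linked above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration: adjust where shuffle (and other scratch) files go
// and whether map output files are compressed. Values here are placeholders.
val conf = new SparkConf()
  .setAppName("ShuffleTuningExample")
  .set("spark.local.dir", "/mnt/spark-scratch")   // temporary storage directory
  .set("spark.shuffle.compress", "true")          // compress map output files
val sc = new SparkContext(conf)
```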

## RDD Persistence

One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory
Expand Down