
Commit ae538dc

Document UnsafeShuffleManager.
1 parent ec6d626 commit ae538dc

2 files changed: +185 -0 lines changed


core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala

Lines changed: 57 additions & 0 deletions
@@ -22,6 +22,9 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle._
 import org.apache.spark.shuffle.sort.SortShuffleManager
 
+/**
+ * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the new shuffle.
+ */
 private class UnsafeShuffleHandle[K, V](
     shuffleId: Int,
     override val numMaps: Int,
@@ -30,6 +33,10 @@ private class UnsafeShuffleHandle[K, V](
 }
 
 private[spark] object UnsafeShuffleManager extends Logging {
+  /**
+   * Helper method for determining whether a shuffle should use the optimized unsafe shuffle
+   * path or whether it should fall back to the original sort-based shuffle.
+   */
   def canUseUnsafeShuffle[K, V, C](dependency: ShuffleDependency[K, V, C]): Boolean = {
     val shufId = dependency.shuffleId
     val serializer = Serializer.getSerializer(dependency.serializer)
@@ -43,13 +50,63 @@ private[spark] object UnsafeShuffleManager extends Logging {
     } else if (dependency.keyOrdering.isDefined) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
       false
+    } else if (dependency.partitioner.numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) {
+      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
+        s"${PackedRecordPointer.MAXIMUM_PARTITION_ID} partitions")
+      false
     } else {
       log.debug(s"Can use UnsafeShuffle for shuffle $shufId")
       true
     }
   }
 }
 
+/**
+ * A shuffle implementation that uses directly-managed memory to implement several performance
+ * optimizations for certain types of shuffles. In cases where the new performance optimizations
+ * cannot be applied, this shuffle manager delegates to [[SortShuffleManager]] to handle those
+ * shuffles.
+ *
+ * UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold:
+ *
+ *  - The shuffle dependency specifies no aggregation or output ordering.
+ *  - The shuffle serializer supports relocation of serialized values (this is currently supported
+ *    by KryoSerializer and Spark SQL's custom serializers).
+ *  - The shuffle produces fewer than 16777216 output partitions.
+ *  - No individual record is larger than 128 MB when serialized.
+ *
+ * In addition, extra spill-merging optimizations are automatically applied when the shuffle
+ * compression codec supports concatenation of serialized streams. This is currently supported by
+ * Spark's LZF compression codec.
+ *
+ * At a high level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager.
+ * In sort-based shuffle, incoming records are sorted according to their target partition ids, then
+ * written to a single map output file. Reducers fetch contiguous regions of this file in order to
+ * read their portion of the map output. In cases where the map output data is too large to fit in
+ * memory, sorted subsets of the output are spilled to disk and those on-disk files are merged
+ * to produce the final output file.
+ *
+ * UnsafeShuffleManager optimizes this process in several ways:
+ *
+ *  - Its sort operates on serialized binary data rather than Java objects, which reduces memory
+ *    consumption and GC overheads. This optimization requires the record serializer to have certain
+ *    properties to allow serialized records to be re-ordered without requiring deserialization.
+ *    See SPARK-4550, where this optimization was first proposed and implemented, for more details.
+ *
+ *  - It uses a specialized cache-efficient sorter ([[UnsafeShuffleExternalSorter]]) that sorts
+ *    arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
+ *    record in the sorting array, this fits more of the array into cache.
+ *
+ *  - The spill merging procedure operates on blocks of serialized records that belong to the same
+ *    partition and does not need to deserialize records during the merge.
+ *
+ *  - When the spill compression codec supports concatenation of compressed data, the spill merge
+ *    simply concatenates the serialized and compressed spill partitions to produce the final output
+ *    partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
+ *    and avoids the need to allocate decompression or copying buffers during the merge.
+ *
+ * For more details on UnsafeShuffleManager's design, see SPARK-7081.
+ */
 private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManager {
 
   private[this] val sortShuffleManager: SortShuffleManager = new SortShuffleManager(conf)
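
The scaladoc above describes how the manager falls back to [[SortShuffleManager]] whenever the optimized path cannot be used. A minimal sketch of that delegation, assuming the usual ShuffleManager.registerShuffle signature and an UnsafeShuffleHandle constructor that also takes the dependency (neither is fully shown in the hunks above, so treat the exact body as an assumption rather than this commit's code), could look like:

  // Sketch only: imagined body inside UnsafeShuffleManager; branches on canUseUnsafeShuffle.
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (UnsafeShuffleManager.canUseUnsafeShuffle(dependency)) {
      // Record the decision in the handle type so later calls (e.g. getWriter) can pick
      // the optimized, serialized-sort write path for this shuffle.
      new UnsafeShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Fall back to the existing sort-based shuffle for unsupported dependencies.
      sortShuffleManager.registerShuffle(shuffleId, numMaps, dependency)
    }
  }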
core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala

Lines changed: 128 additions & 0 deletions
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.unsafe
+
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{FunSuite, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
+
+/**
+ * Tests for the fallback logic in UnsafeShuffleManager. Actual tests of shuffling data are
+ * performed in other suites.
+ */
+class UnsafeShuffleManagerSuite extends FunSuite with Matchers {
+
+  import UnsafeShuffleManager.canUseUnsafeShuffle
+
+  private class RuntimeExceptionAnswer extends Answer[Object] {
+    override def answer(invocation: InvocationOnMock): Object = {
+      throw new RuntimeException("Called non-stubbed method, " + invocation.getMethod.getName)
+    }
+  }
+
+  private def shuffleDep(
+      partitioner: Partitioner,
+      serializer: Option[Serializer],
+      keyOrdering: Option[Ordering[Any]],
+      aggregator: Option[Aggregator[Any, Any, Any]],
+      mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = {
+    val dep = mock(classOf[ShuffleDependency[Any, Any, Any]], new RuntimeExceptionAnswer())
+    doReturn(0).when(dep).shuffleId
+    doReturn(partitioner).when(dep).partitioner
+    doReturn(serializer).when(dep).serializer
+    doReturn(keyOrdering).when(dep).keyOrdering
+    doReturn(aggregator).when(dep).aggregator
+    doReturn(mapSideCombine).when(dep).mapSideCombine
+    dep
+  }
+
+  test("supported shuffle dependencies") {
+    val kryo = Some(new KryoSerializer(new SparkConf()))
+
+    assert(canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = None,
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
+    val rangePartitioner = mock(classOf[RangePartitioner[Any, Any]])
+    when(rangePartitioner.numPartitions).thenReturn(2)
+    assert(canUseUnsafeShuffle(shuffleDep(
+      partitioner = rangePartitioner,
+      serializer = kryo,
+      keyOrdering = None,
+      aggregator = None,
+      mapSideCombine = false
+    )))
+  }
+
+  test("unsupported shuffle dependencies") {
+    val kryo = Some(new KryoSerializer(new SparkConf()))
+    val java = Some(new JavaSerializer(new SparkConf()))
+
+    // We only support serializers that support object relocation
+    assert(!canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = java,
+      keyOrdering = None,
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
+    // We do not support shuffles with more than 16 million output partitions
+    assert(!canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1),
+      serializer = kryo,
+      keyOrdering = None,
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
+    // We do not support shuffles that perform any kind of aggregation or sorting of keys
+    assert(!canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = Some(mock(classOf[Ordering[Any]])),
+      aggregator = None,
+      mapSideCombine = false
+    )))
+    assert(!canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = None,
+      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
+      mapSideCombine = false
+    )))
+    // Shuffles with both a key ordering and an aggregator (map-side combine) are also unsupported
+    assert(!canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = Some(mock(classOf[Ordering[Any]])),
+      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
+      mapSideCombine = true
+    )))
+  }
+
+}
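
The partition-count test above hinges on PackedRecordPointer.MAXIMUM_PARTITION_ID, which matches the 8-byte sort entries described in the manager's scaladoc: if 24 of the 64 bits hold the partition id, ids 0 through 16777215 (i.e. fewer than 16777216 partitions) can be represented. The following self-contained sketch only illustrates that packing idea; the names and the simplified bit layout are hypothetical and are not PackedRecordPointer's actual field layout, which this diff does not show.

  // Standalone sketch (hypothetical names): why a 24-bit partition id caps the partition count.
  object PartitionIdPackingSketch {
    private val PartitionIdBits = 24
    private val PointerBits = 64 - PartitionIdBits             // bits left for the record pointer
    val MaximumPartitionId: Int = (1 << PartitionIdBits) - 1   // 16777215

    // Pack a partition id into the upper 24 bits and a record pointer into the lower 40 bits.
    def pack(partitionId: Int, recordPointer: Long): Long = {
      require(partitionId >= 0 && partitionId <= MaximumPartitionId, "partition id needs 24 bits")
      (partitionId.toLong << PointerBits) | (recordPointer & ((1L << PointerBits) - 1))
    }

    // Recover the partition id, so entries can be sorted by partition without touching records.
    def partitionIdOf(packed: Long): Int = (packed >>> PointerBits).toInt

    def main(args: Array[String]): Unit = {
      val entry = pack(partitionId = 123456, recordPointer = 42L)
      println(partitionIdOf(entry))   // prints 123456
      println(MaximumPartitionId)     // prints 16777215
    }
  }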
