Skip to content

Commit 34e526e

Browse files
committed
Enable Tungsten shuffle for non-agg shuffles w/ key orderings
1 parent e9471d3 commit 34e526e

File tree

2 files changed

+10
-12
lines changed

2 files changed

+10
-12
lines changed

core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ private[spark] object UnsafeShuffleManager extends Logging {
5656
} else if (dependency.aggregator.isDefined) {
5757
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined")
5858
false
59-
} else if (dependency.keyOrdering.isDefined) {
60-
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
61-
false
6259
} else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
6360
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
6461
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")

core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,15 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
7676
mapSideCombine = false
7777
)))
7878

79+
// Shuffles with key orderings are supported as long as no aggregator is specified
80+
assert(canUseUnsafeShuffle(shuffleDep(
81+
partitioner = new HashPartitioner(2),
82+
serializer = kryo,
83+
keyOrdering = Some(mock(classOf[Ordering[Any]])),
84+
aggregator = None,
85+
mapSideCombine = false
86+
)))
87+
7988
}
8089

8190
test("unsupported shuffle dependencies") {
@@ -100,22 +109,14 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
100109
mapSideCombine = false
101110
)))
102111

103-
// We do not support shuffles that perform any kind of aggregation or sorting of keys
104-
assert(!canUseUnsafeShuffle(shuffleDep(
105-
partitioner = new HashPartitioner(2),
106-
serializer = kryo,
107-
keyOrdering = Some(mock(classOf[Ordering[Any]])),
108-
aggregator = None,
109-
mapSideCombine = false
110-
)))
112+
// We do not support shuffles that perform aggregation
111113
assert(!canUseUnsafeShuffle(shuffleDep(
112114
partitioner = new HashPartitioner(2),
113115
serializer = kryo,
114116
keyOrdering = None,
115117
aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
116118
mapSideCombine = false
117119
)))
118-
// We do not support shuffles that perform any kind of aggregation or sorting of keys
119120
assert(!canUseUnsafeShuffle(shuffleDep(
120121
partitioner = new HashPartitioner(2),
121122
serializer = kryo,

0 commit comments

Comments
 (0)