Skip to content

Commit 4a05a40

Browse files
committed
Modify according to comments
1 parent 1f7dcc8 commit 4a05a40

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.spark.shuffle.hash
1919

20+
import org.apache.spark.{InterruptibleIterator, TaskContext}
2021
import org.apache.spark.serializer.Serializer
2122
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
22-
import org.apache.spark.TaskContext
2323

2424
class HashShuffleReader[K, C](
2525
handle: BaseShuffleHandle[K, _, C],
@@ -40,12 +40,12 @@ class HashShuffleReader[K, C](
4040

4141
if (dep.aggregator.isDefined) {
4242
if (dep.mapSideCombine) {
43-
dep.aggregator.get.combineCombinersByKey(iter, context)
43+
new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
4444
} else {
45-
dep.aggregator.get.combineValuesByKey(iter, context)
45+
new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
4646
}
4747
} else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
48-
throw new IllegalStateException("Aggregator is empty for reduce-side combine")
48+
throw new IllegalStateException("Aggregator is empty for map-side combine")
4949
} else {
5050
iter
5151
}

core/src/test/scala/org/apache/spark/ShuffleSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
6060
NonJavaSerializableClass,
6161
NonJavaSerializableClass,
6262
(Int, NonJavaSerializableClass)](b, new HashPartitioner(NUM_BLOCKS))
63-
.setSerializer(new KryoSerializer(conf))
63+
c.setSerializer(new KryoSerializer(conf))
6464
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
6565

6666
assert(c.count === 10)
@@ -85,7 +85,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
8585
NonJavaSerializableClass,
8686
NonJavaSerializableClass,
8787
(Int, NonJavaSerializableClass)](b, new HashPartitioner(3))
88-
.setSerializer(new KryoSerializer(conf))
88+
c.setSerializer(new KryoSerializer(conf))
8989
assert(c.count === 10)
9090
}
9191

0 commit comments

Comments
 (0)