Skip to content

Commit c72362a

Browse files
committed
Added bug fix and test for when iterators are empty
1 parent de1fb40 commit c72362a

File tree

2 files changed

+35
-2
lines changed

2 files changed

+35
-2
lines changed

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,9 @@ private[spark] class ExternalSorter[K, V, C](
289289
inMemBuffered.hasNext && inMemBuffered.head._1._1 == p
290290
}
291291
override def next(): Product2[K, C] = {
292+
if (!hasNext) {
293+
throw new NoSuchElementException
294+
}
292295
val elem = inMemBuffered.next()
293296
(elem._1._2, elem._2)
294297
}
@@ -314,7 +317,7 @@ private[spark] class ExternalSorter[K, V, C](
314317
private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
315318
: Iterator[Product2[K, C]] =
316319
{
317-
val bufferedIters = iterators.map(_.buffered)
320+
val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
318321
type Iter = BufferedIterator[Product2[K, C]]
319322
val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
320323
override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
@@ -404,7 +407,7 @@ private[spark] class ExternalSorter[K, V, C](
404407
// from the iterators, without buffering all the ones that are "equal" to a given key.
405408
// We do so with code similar to mergeSort, except our Iterator.next combines together all
406409
// the elements with the given key.
407-
val bufferedIters = iterators.map(_.buffered)
410+
val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
408411
type Iter = BufferedIterator[Product2[K, C]]
409412
val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
410413
override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)

core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,36 @@ import org.apache.spark._
2525
import org.apache.spark.SparkContext._
2626

2727
class ExternalSorterSuite extends FunSuite with LocalSparkContext {
28+
test("empty data stream") {
29+
val conf = new SparkConf(false)
30+
conf.set("spark.shuffle.memoryFraction", "0.001")
31+
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
32+
sc = new SparkContext("local", "test", conf)
33+
34+
val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
35+
val ord = implicitly[Ordering[Int]]
36+
37+
// Both aggregator and ordering
38+
val sorter = new ExternalSorter[Int, Int, Int](
39+
Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
40+
assert(sorter.iterator.toSeq === Seq())
41+
42+
// Only aggregator
43+
val sorter2 = new ExternalSorter[Int, Int, Int](
44+
Some(agg), Some(new HashPartitioner(3)), None, None)
45+
assert(sorter2.iterator.toSeq === Seq())
46+
47+
// Only ordering
48+
val sorter3 = new ExternalSorter[Int, Int, Int](
49+
None, Some(new HashPartitioner(3)), Some(ord), None)
50+
assert(sorter3.iterator.toSeq === Seq())
51+
52+
// Neither aggregator nor ordering
53+
val sorter4 = new ExternalSorter[Int, Int, Int](
54+
None, Some(new HashPartitioner(3)), None, None)
55+
assert(sorter4.iterator.toSeq === Seq())
56+
}
57+
2858
test("spilling in local cluster") {
2959
val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
3060
conf.set("spark.shuffle.memoryFraction", "0.001")

0 commit comments

Comments
 (0)