Skip to content

Commit 5974d10

Browse files
Yash Datta
authored and committed
SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
1 parent 14fa87b commit 5974d10

File tree

1 file changed

+10
-5
lines changed
  • core/src/main/scala/org/apache/spark/rdd

1 file changed

+10
-5
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1146,15 +1146,20 @@ abstract class RDD[T: ClassTag](
11461146
if (num == 0) {
11471147
Array.empty
11481148
} else {
1149-
mapPartitions { items =>
1149+
val mapRDDs = mapPartitions { items =>
11501150
// Priority keeps the largest elements, so let's reverse the ordering.
11511151
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
11521152
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
11531153
Iterator.single(queue)
1154-
}.reduce { (queue1, queue2) =>
1155-
queue1 ++= queue2
1156-
queue1
1157-
}.toArray.sorted(ord)
1154+
}
1155+
if (mapRDDs.partitions.size == 0) {
1156+
Array.empty
1157+
} else {
1158+
mapRDDs.reduce { (queue1, queue2) =>
1159+
queue1 ++= queue2
1160+
queue1
1161+
}.toArray.sorted(ord)
1162+
}
11581163
}
11591164
}
11601165

0 commit comments

Comments
 (0)