Skip to content

Commit fc8f98e

Browse files
committed
Refactor to remove code duplication and do the .get earlier
1 parent af4aea3 commit fc8f98e

File tree

3 files changed

+31
-32
lines changed

3 files changed

+31
-32
lines changed

core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,11 @@ private[spark] class PartitionedAppendOnlyMap[K, V]
3030

3131
def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
3232
: Iterator[((Int, K), V)] = {
33-
val comparator : Comparator[(Int, K)] =
34-
if (keyComparator.isEmpty) {
35-
partitionComparator
36-
} else {
37-
new Comparator[(Int, K)] {
38-
override def compare(a: (Int, K), b: (Int, K)): Int = {
39-
val partitionDiff = a._1 - b._1
40-
if (partitionDiff != 0) {
41-
partitionDiff
42-
} else {
43-
keyComparator.get.compare(a._2, b._2)
44-
}
45-
}
46-
}
47-
}
48-
destructiveSortedIterator(comparator)
33+
destructiveSortedIterator(getComparator(keyComparator))
4934
}
5035

5136
def insert(partition: Int, key: K, value: V): Unit = {
5237
update((partition, key), value)
5338
}
5439
}
40+

core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,8 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
7474
/** Iterate through the data in a given order. For this class this is not really destructive. */
7575
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
7676
: Iterator[((Int, K), V)] = {
77-
val comparator : Comparator[(Int, K)] =
78-
if (keyComparator.isEmpty) {
79-
partitionComparator
80-
} else {
81-
new Comparator[(Int, K)] {
82-
override def compare(a: (Int, K), b: (Int, K)): Int = {
83-
val partitionDiff = a._1 - b._1
84-
if (partitionDiff != 0) {
85-
partitionDiff
86-
} else {
87-
keyComparator.get.compare(a._2, b._2)
88-
}
89-
}
90-
}
91-
}
92-
new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
77+
new Sorter(new KVArraySortDataFormat[(Int, K),
78+
AnyRef]).sort(data, 0, curSize, getComparator(keyComparator))
9379
iterator
9480
}
9581

@@ -112,3 +98,4 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
11298
private object PartitionedPairBuffer {
11399
val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
114100
}
101+

core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,31 @@ private[spark] object WritablePartitionedPairCollection {
7474
}
7575
}
7676

77+
/* Takes an optional key comparator and returns a comparator for
78+
 * (partition, key) pairs: entries are ordered by partition ID, and, if a
 * key comparator is provided, keys are ordered within each partition.
79+
 */
80+
def getComparator[K](keyComparator: Option[Comparator[K]]) : Comparator[(Int, K)] = {
81+
val comparator : Comparator[(Int, K)] =
82+
if (keyComparator.isEmpty) {
83+
partitionComparator
84+
} else {
85+
new Comparator[(Int, K)] {
86+
// keyComparator is known to be defined on this branch, so extract it once up front
87+
val ourKeyComp = keyComparator.get
88+
override def compare(a: (Int, K), b: (Int, K)): Int = {
89+
val partitionDiff = a._1 - b._1
90+
if (partitionDiff != 0) {
91+
partitionDiff
92+
} else {
93+
ourKeyComp.compare(a._2, b._2)
94+
}
95+
}
96+
}
97+
}
98+
comparator
99+
}
100+
101+
77102
/**
78103
* A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
79104
*/
@@ -102,3 +127,4 @@ private[spark] trait WritablePartitionedIterator {
102127

103128
def nextPartition(): Int
104129
}
130+

0 commit comments

Comments
 (0)