Skip to content

Commit 53ed170

Browse files
committed
Refactoring
1 parent d342394 commit 53ed170

File tree

2 files changed

+16
-35
lines changed

2 files changed

+16
-35
lines changed

core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
7474
/** Iterate through the data in a given order. For this class this is not really destructive. */
7575
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
7676
: Iterator[((Int, K), V)] = {
77-
new Sorter(new KVArraySortDataFormat[(Int, K),
78-
AnyRef]).sort(data, 0, curSize, getComparator(keyComparator))
77+
val comparator = getComparator(keyComparator)
78+
new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
7979
iterator
8080
}
8181

core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -74,44 +74,25 @@ private[spark] object WritablePartitionedPairCollection {
7474
}
7575
}
7676

77-
/* Takes an optional parameter (keyComparator), use if provided
77+
/**
78+
* Takes an optional parameter (keyComparator), use if provided
7879
* and returns a comparator for the partitions
7980
*/
80-
def getComparator[K](keyComparator: Option[Comparator[K]]) : Comparator[(Int, K)] = {
81-
val comparator : Comparator[(Int, K)] =
82-
if (keyComparator.isEmpty) {
83-
partitionComparator
84-
} else {
85-
new Comparator[(Int, K)] {
86-
// We know we have a non-empty comparator here
87-
val ourKeyComp = keyComparator.get
88-
override def compare(a: (Int, K), b: (Int, K)): Int = {
89-
val partitionDiff = a._1 - b._1
90-
if (partitionDiff != 0) {
91-
partitionDiff
92-
} else {
93-
ourKeyComp.compare(a._2, b._2)
94-
}
81+
def getComparator[K](keyComparator: Option[Comparator[K]]): Comparator[(Int, K)] = {
82+
if (!keyComparator.isDefined) return partitionComparator
83+
else {
84+
val theKeyComp = keyComparator.get
85+
new Comparator[(Int, K)] {
86+
// We know we have a non-empty comparator here
87+
override def compare(a: (Int, K), b: (Int, K)): Int = {
88+
val partitionDiff = a._1 - b._1
89+
if (partitionDiff != 0) {
90+
partitionDiff
91+
} else {
92+
theKeyComp.compare(a._2, b._2)
9593
}
9694
}
9795
}
98-
comparator
99-
}
100-
101-
102-
/**
103-
* A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
104-
*/
105-
def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
106-
new Comparator[(Int, K)] {
107-
override def compare(a: (Int, K), b: (Int, K)): Int = {
108-
val partitionDiff = a._1 - b._1
109-
if (partitionDiff != 0) {
110-
partitionDiff
111-
} else {
112-
keyComparator.compare(a._2, b._2)
113-
}
114-
}
11596
}
11697
}
11798
}

0 commit comments

Comments
 (0)