From 7d2e53afdddd6d9c903f89c2457ca4b256693849 Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Wed, 2 Nov 2016 11:30:51 +0000 Subject: [PATCH] [SPARK-18223] [CORE] Optimise PartitionedAppendOnlyMap implementation This class and the PartitionedPairBuffer class are both core Spark data structures that allow us to spill data to disk. From the comment in ExternalSorter before instantiating said data structures: // Data structures to store in-memory objects before we spill. Depending on whether we have an // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we // store them in an array buffer. All of our data within RDDs has a partition ID, and the ordering operations will order by partition before any other criteria. Such data structures share a partitionKeyComparator from WriteablePartitionedPairCollection. While this change adds more code, it removes the iterator wrapping that had a negative performance impact. In this case we avoid said wrapping to help the inliner. With the wrapping avoided, we've observed a 3% PageRank performance increase on HiBench large for both IBM's SDK for Java and OpenJDK 8, as a result of the inliner being better able to figure out what's going on. This observation is seen when combined with an optimised PartitionedPairBuffer implementation I'll also contribute. 
--- .../collection/PartitionedAppendOnlyMap.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala index d0d25b43d0477..798e7b54aa8c3 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala @@ -30,7 +30,20 @@ private[spark] class PartitionedAppendOnlyMap[K, V] def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]) : Iterator[((Int, K), V)] = { - val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator) + val comparator : Comparator[(Int, K)] = + if (keyComparator.isEmpty) { + partitionComparator + } else + new Comparator[(Int, K)] { + override def compare(a: (Int, K), b: (Int, K)): Int = { + val partitionDiff = a._1 - b._1 + if (partitionDiff != 0) { + partitionDiff + } else { + keyComparator.get.compare(a._2, b._2) + } + } + } destructiveSortedIterator(comparator) }