From 0d1411c0f14c7679931124838f01e8f44618c15f Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Wed, 2 Nov 2016 11:36:28 +0000 Subject: [PATCH 1/6] [SPARK-18224] [CORE] Optimise PartitionedPairBuffer implementation This change is very similar to my pull request for improving PartitionedPairAppendOnlyMap: https://github.com/apache/spark/pull/15735 Summarising (more detail above), we avoid the slow iterator wrapping in favour of helping the inliner. We observed that this, when combined with the above change, leads to a 3% performance increase on the HiBench large PageRank benchmark with both IBM's SDK for Java and OpenJDK 8 --- .../util/collection/PartitionedPairBuffer.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala index f5844d5353be..33d99d28e98c 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala @@ -74,7 +74,20 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) /** Iterate through the data in a given order. For this class this is not really destructive. 
*/ override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]) : Iterator[((Int, K), V)] = { - val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator) + val comparator : Comparator[(Int, K)] = + if (keyComparator.isEmpty) { + partitionComparator + } else + new Comparator[(Int, K)] { + override def compare(a: (Int, K), b: (Int, K)): Int = { + val partitionDiff = a._1 - b._1 + if (partitionDiff != 0) { + partitionDiff + } else { + keyComparator.get.compare(a._2, b._2) + } + } + } new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator) iterator } From a3d85b62b8baeaaba2b2102029ddb560346557de Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Wed, 2 Nov 2016 13:29:21 +0000 Subject: [PATCH 2/6] Scalastyle for PartitionedPairBuffer --- .../apache/spark/util/collection/PartitionedPairBuffer.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala index 33d99d28e98c..d758a50965d1 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala @@ -74,10 +74,10 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) /** Iterate through the data in a given order. For this class this is not really destructive. 
*/ override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]) : Iterator[((Int, K), V)] = { - val comparator : Comparator[(Int, K)] = + val comparator : Comparator[(Int, K)] = if (keyComparator.isEmpty) { partitionComparator - } else + } else { new Comparator[(Int, K)] { override def compare(a: (Int, K), b: (Int, K)): Int = { val partitionDiff = a._1 - b._1 @@ -87,6 +87,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) keyComparator.get.compare(a._2, b._2) } } + } } new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator) iterator From af4aea3be6a6c80b81d7855fad1c4c0739cc46a4 Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Wed, 2 Nov 2016 13:42:53 +0000 Subject: [PATCH 3/6] Improve PartitionedAppendOnlyMap Inline benefit with this approach as we avoid the bad iterator wrapping --- .../collection/PartitionedAppendOnlyMap.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala index d0d25b43d047..5d555e645355 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala @@ -30,7 +30,21 @@ private[spark] class PartitionedAppendOnlyMap[K, V] def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]) : Iterator[((Int, K), V)] = { - val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator) + val comparator : Comparator[(Int, K)] = + if (keyComparator.isEmpty) { + partitionComparator + } else { + new Comparator[(Int, K)] { + override def compare(a: (Int, K), b: (Int, K)): Int = { + val partitionDiff = a._1 - b._1 + if (partitionDiff != 0) { + partitionDiff + } else { + keyComparator.get.compare(a._2, 
b._2) + } + } + } + } destructiveSortedIterator(comparator) } From fc8f98edec16f27d1fd9c58661e930f3b5aa7d6d Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Fri, 25 Nov 2016 13:41:46 +0000 Subject: [PATCH 4/6] Refactor to remove code duplication and do the .get earlier --- .../collection/PartitionedAppendOnlyMap.scala | 18 ++----------- .../collection/PartitionedPairBuffer.scala | 19 +++----------- .../WritablePartitionedPairCollection.scala | 26 +++++++++++++++++++ 3 files changed, 31 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala index 5d555e645355..77ad9881f40f 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala @@ -30,25 +30,11 @@ private[spark] class PartitionedAppendOnlyMap[K, V] def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]) : Iterator[((Int, K), V)] = { - val comparator : Comparator[(Int, K)] = - if (keyComparator.isEmpty) { - partitionComparator - } else { - new Comparator[(Int, K)] { - override def compare(a: (Int, K), b: (Int, K)): Int = { - val partitionDiff = a._1 - b._1 - if (partitionDiff != 0) { - partitionDiff - } else { - keyComparator.get.compare(a._2, b._2) - } - } - } - } - destructiveSortedIterator(comparator) + destructiveSortedIterator(getComparator(keyComparator)) } def insert(partition: Int, key: K, value: V): Unit = { update((partition, key), value) } } + diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala index d758a50965d1..b8af24d9af45 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala +++ 
b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala @@ -74,22 +74,8 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) /** Iterate through the data in a given order. For this class this is not really destructive. */ override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]) : Iterator[((Int, K), V)] = { - val comparator : Comparator[(Int, K)] = - if (keyComparator.isEmpty) { - partitionComparator - } else { - new Comparator[(Int, K)] { - override def compare(a: (Int, K), b: (Int, K)): Int = { - val partitionDiff = a._1 - b._1 - if (partitionDiff != 0) { - partitionDiff - } else { - keyComparator.get.compare(a._2, b._2) - } - } - } - } - new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator) + new Sorter(new KVArraySortDataFormat[(Int, K), + AnyRef]).sort(data, 0, curSize, getComparator(keyComparator)) iterator } @@ -112,3 +98,4 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) private object PartitionedPairBuffer { val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1 } + diff --git a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala index 5232c2bd8d6f..d4aaa9f29fb8 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala @@ -74,6 +74,31 @@ private[spark] object WritablePartitionedPairCollection { } } + /* Takes an optional parameter (keyComparator), use if provided + * and returns a comparator for the partitions + */ + def getComparator[K](keyComparator: Option[Comparator[K]]) : Comparator[(Int, K)] = { + val comparator : Comparator[(Int, K)] = + if (keyComparator.isEmpty) { + partitionComparator + } else { + new Comparator[(Int, K)] 
{ + // We know we have a non-empty comparator here + val ourKeyComp = keyComparator.get + override def compare(a: (Int, K), b: (Int, K)): Int = { + val partitionDiff = a._1 - b._1 + if (partitionDiff != 0) { + partitionDiff + } else { + ourKeyComp.compare(a._2, b._2) + } + } + } + } + comparator + } + + /** * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering. */ @@ -102,3 +127,4 @@ private[spark] trait WritablePartitionedIterator { def nextPartition(): Int } + From d3423945c08a8863a63c16005d848af272c025c3 Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Fri, 25 Nov 2016 13:43:26 +0000 Subject: [PATCH 5/6] And remove extra newlines --- .../apache/spark/util/collection/PartitionedAppendOnlyMap.scala | 1 - .../org/apache/spark/util/collection/PartitionedPairBuffer.scala | 1 - .../util/collection/WritablePartitionedPairCollection.scala | 1 - 3 files changed, 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala index 77ad9881f40f..8f83262b63ff 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala @@ -37,4 +37,3 @@ private[spark] class PartitionedAppendOnlyMap[K, V] update((partition, key), value) } } - diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala index b8af24d9af45..e89e4421f6b4 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala @@ -98,4 +98,3 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) private object PartitionedPairBuffer { val MAXIMUM_CAPACITY = Int.MaxValue 
/ 2 // 2 ^ 30 - 1 } - diff --git a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala index d4aaa9f29fb8..f117399b890b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala @@ -127,4 +127,3 @@ private[spark] trait WritablePartitionedIterator { def nextPartition(): Int } - From 53ed1708112fbf66b04fe89502e534ca3270d15c Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Fri, 25 Nov 2016 18:26:33 +0000 Subject: [PATCH 6/6] Refactoring --- .../collection/PartitionedPairBuffer.scala | 4 +- .../WritablePartitionedPairCollection.scala | 47 ++++++------------- 2 files changed, 16 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala index e89e4421f6b4..e43bc78b3f08 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala @@ -74,8 +74,8 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) /** Iterate through the data in a given order. For this class this is not really destructive. 
*/ override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]) : Iterator[((Int, K), V)] = { - new Sorter(new KVArraySortDataFormat[(Int, K), - AnyRef]).sort(data, 0, curSize, getComparator(keyComparator)) + val comparator = getComparator(keyComparator) + new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator) iterator } diff --git a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala index f117399b890b..cb50d446011d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala @@ -74,44 +74,25 @@ private[spark] object WritablePartitionedPairCollection { } } - /* Takes an optional parameter (keyComparator), use if provided + /** + * Takes an optional parameter (keyComparator), use if provided * and returns a comparator for the partitions */ - def getComparator[K](keyComparator: Option[Comparator[K]]) : Comparator[(Int, K)] = { - val comparator : Comparator[(Int, K)] = - if (keyComparator.isEmpty) { - partitionComparator - } else { - new Comparator[(Int, K)] { - // We know we have a non-empty comparator here - val ourKeyComp = keyComparator.get - override def compare(a: (Int, K), b: (Int, K)): Int = { - val partitionDiff = a._1 - b._1 - if (partitionDiff != 0) { - partitionDiff - } else { - ourKeyComp.compare(a._2, b._2) - } + def getComparator[K](keyComparator: Option[Comparator[K]]): Comparator[(Int, K)] = { + if (!keyComparator.isDefined) return partitionComparator + else { + val theKeyComp = keyComparator.get + new Comparator[(Int, K)] { + // We know we have a non-empty comparator here + override def compare(a: (Int, K), b: (Int, K)): Int = { + val partitionDiff = a._1 - b._1 + if (partitionDiff != 0) { + partitionDiff + } 
else { + theKeyComp.compare(a._2, b._2) } } } - comparator - } - - - /** - * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering. - */ - def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = { - new Comparator[(Int, K)] { - override def compare(a: (Int, K), b: (Int, K)): Int = { - val partitionDiff = a._1 - b._1 - if (partitionDiff != 0) { - partitionDiff - } else { - keyComparator.compare(a._2, b._2) - } - } } } }