From a4f7404ac28a913b4b5cc390e3f321ae6627b941 Mon Sep 17 00:00:00 2001 From: msiddalingaiah Date: Sun, 4 May 2014 23:23:46 -0400 Subject: [PATCH 1/9] Address SPARK-1717 --- ec2/spark_ec2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index db393748a33b..c21ef9c586aa 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -612,7 +612,7 @@ def ssh_command(opts): return ['ssh'] + ssh_args(opts) -# Run a command on a host through ssh, retrying up to two times +# Run a command on a host through ssh, retrying up to five times # and then throwing an exception if ssh continues to fail. def ssh(host, opts, command): tries = 0 @@ -621,7 +621,7 @@ def ssh(host, opts, command): return subprocess.check_call( ssh_command(opts) + ['-t', '-t', '%s@%s' % (opts.user, host), stringify_command(command)]) except subprocess.CalledProcessError as e: - if (tries > 2): + if (tries > 5): # If this was an ssh failure, provide the user with hints. if e.returncode == 255: raise UsageError("Failed to SSH to remote host {0}.\nPlease check that you have provided the correct --identity-file and --key-pair parameters and try again.".format(host)) @@ -648,7 +648,7 @@ def ssh_write(host, opts, command, input): status = proc.wait() if status == 0: break - elif (tries > 2): + elif (tries > 5): raise RuntimeError("ssh_write failed with error %s" % proc.returncode) else: print >> stderr, "Error {0} while executing remote command, retrying after 30 seconds".format(status) From a76b3a15bba7d23fb4d3cdd5cc57f0c8bef8efca Mon Sep 17 00:00:00 2001 From: msiddalingaiah Date: Sun, 1 Jun 2014 14:16:32 -0400 Subject: [PATCH 2/9] Add sortPartitions(...) --- .../spark/rdd/OrderedRDDFunctions.scala | 13 +++---- .../main/scala/org/apache/spark/rdd/RDD.scala | 5 +++ .../apache/spark/rdd/SortedParitionsRDD.scala | 35 +++++++++++++++++++ .../org/apache/spark/rdd/SortingSuite.scala | 12 ++++++- 4 files changed, 56 insertions(+), 9 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index 6a3f69844428..8147f7bc382f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -58,13 +58,10 @@ class OrderedRDDFunctions[K : Ordering : ClassTag, def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = { val part = new RangePartitioner(numPartitions, self, ascending) val shuffled = new ShuffledRDD[K, V, P](self, part) - shuffled.mapPartitions(iter => { - val buf = iter.toArray - if (ascending) { - buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator - } else { - buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator - } - }, preservesPartitioning = true) + if (ascending) { + shuffled.sortPartitions((x, y) => ordering.lt(x._1, y._1)) + } else { + shuffled.sortPartitions((x, y) => ordering.gt(x._1, y._1)) + } } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index aa03e9276fb3..cf8306d2671f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -281,6 +281,11 @@ abstract class RDD[T: ClassTag]( */ def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f)) + /** + * Return a new RDD containing sorted partitions in 
this RDD.
+   */
+  def sortPartitions(lt: (T, T) => Boolean): RDD[T] = new SortedPartitionsRDD(this, sc.clean(lt))
+
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
new file mode 100644
index 000000000000..4cde67aa0a58
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, TaskContext}
+
+private[spark] class SortedPartitionsRDD[T: ClassTag](
+    prev: RDD[T],
+    lt: (T, T) => Boolean)
+  extends RDD[T](prev) {
+
+  override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+  override val partitioner = prev.partitioner // Since sort cannot change a partition's keys
+
+  override def compute(split: Partition, context: TaskContext) =
+    firstParent[T].iterator(split, context).toArray.sortWith(lt).iterator
+}
diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
index d0619559bb45..8cca7d7c4962 100644
--- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
@@ -120,5 +120,15 @@ class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers
     partitions(1).last should be > partitions(2).head
     partitions(2).last should be > partitions(3).head
   }
-}
+
+  test("sortPartitions") {
+    val rand = new scala.util.Random()
+    val array = Array.fill(10) { (rand.nextInt()) }
+    val pairs = sc.parallelize(array, 2)
+    val sorted = pairs.sortPartitions((x, y) => x < y)
+    assert(sorted.partitions.size === 2)
+    val partitions = sorted.collectPartitions()
+    assert(partitions(0) === partitions(0).sortWith((x, y) => x < y))
+    assert(partitions(1) === partitions(1).sortWith((x, y) => x < y))
+  }
+}
From 63825ba3ff21a72a1308e97c9b5fe3368a86c68e Mon Sep 17 00:00:00 2001
From: msiddalingaiah
Date: Tue, 10 Jun 2014 22:33:44 -0400
Subject: [PATCH 3/9] Basic merge sort

---
 .../apache/spark/rdd/SortedParitionsRDD.scala | 90 ++++++++++++++++++-
 1 file changed, 87 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
index 4cde67aa0a58..97db470677e2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
@@ -18,8 +18,10 @@ package org.apache.spark.rdd
 import scala.reflect.ClassTag
+import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.storage.{BlockId, BlockManager} private[spark] class SortedPartitionsRDD[T: ClassTag]( prev: RDD[T], @@ -28,8 +30,90 @@ private[spark] class SortedPartitionsRDD[T: ClassTag]( override def getPartitions: Array[Partition] = firstParent[T].partitions - override val partitioner = prev.partitioner // Since sort cannot change a partition's keys + override val partitioner = prev.partitioner // Since sorting partitions cannot change a partition's keys - override def compute(split: Partition, context: TaskContext) = - firstParent[T].iterator(split, context).toArray.sortWith(lt).iterator + override def compute(split: Partition, context: TaskContext) = { + new SortedIterator(firstParent[T].iterator(split, context), lt) + } +} + +private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) extends Iterator[T] { + private val sorted = doSort() + + def hasNext : Boolean = { + sorted.hasNext + } + + def next : T = { + sorted.next + } + + private def doSort() : Iterator[T] = { + val chunkList = new ArrayBuffer[Iterator[T]]() + chunkList += nextChunk + while (iter.hasNext) { + chunkList += nextChunk + } + merge(chunkList) + } + + private def merge(list : ArrayBuffer[Iterator[T]]) : Iterator[T] = { + if (list.size == 1) { + return list(0) + } + if (list.size == 2) { + return doMerge(list(0), list(1)) + } + val mid = list.size >> 1 + // sort right first since it's in memory + val right = merge(list.slice(mid, list.size)) + val left = merge(list.slice(0, mid)) + doMerge(left, right) + } + + private def doMerge(it1 : Iterator[T], it2 : Iterator[T]) : Iterator[T] = { + // TODO: write to disk block + var array = new ArrayBuffer[T](100) + if (!it1.hasNext) { + array ++= it2 + return array.iterator + } + if (!it2.hasNext) { + array ++= it1 + return array.iterator + } + var t1 = it1.next + var t2 = it2.next + while (true) { + if (lt(t1, t2)) { + array += t1 + if (it1.hasNext) { + t1 = it1.next + } else { + array += t2 + array ++= it2 + return array.iterator + } + } else { + array += t2 + if (it2.hasNext) { + t2 = it2.next + } else { + array += t1 + array ++= it1 + return array.iterator + } + } + } + array.iterator + } + + private def nextChunk() : Iterator[T] = { + var array = new ArrayBuffer[T](10000) + // TODO: use SizeEstimator + while (array.size < 100 && iter.hasNext) { + array += iter.next + } + return array.sortWith(lt).iterator + } } From 1d7709e5f013fc76bca92a97e7108efbc38071ec Mon Sep 17 00:00:00 2001 From: msiddalingaiah Date: Wed, 11 Jun 2014 20:30:08 -0400 Subject: [PATCH 4/9] [SPARK-983][WIP] Add DiskBuffer/DiskBufferIterator --- .../apache/spark/rdd/SortedParitionsRDD.scala | 77 ++++++++++++++++++- 1 file changed, 73 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala index 97db470677e2..01a58150c57d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala @@ -19,8 +19,11 @@ package org.apache.spark.rdd import scala.reflect.ClassTag import scala.collection.mutable.ArrayBuffer +import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException} import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, BlockManager} private[spark] class 
SortedPartitionsRDD[T: ClassTag]( @@ -37,7 +40,7 @@ private[spark] class SortedPartitionsRDD[T: ClassTag]( } } -private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) extends Iterator[T] { +private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) extends Iterator[T] { private val sorted = doSort() def hasNext : Boolean = { @@ -72,8 +75,8 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) } private def doMerge(it1 : Iterator[T], it2 : Iterator[T]) : Iterator[T] = { - // TODO: write to disk block - var array = new ArrayBuffer[T](100) + var array = new DiskBuffer[T]() +// var array = new ArrayBuffer[T]() if (!it1.hasNext) { array ++= it2 return array.iterator @@ -111,9 +114,75 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) private def nextChunk() : Iterator[T] = { var array = new ArrayBuffer[T](10000) // TODO: use SizeEstimator - while (array.size < 100 && iter.hasNext) { + while (array.size < 1000 && iter.hasNext) { array += iter.next } return array.sortWith(lt).iterator } } + +private class DiskBuffer[T] { + private val serializer = SparkEnv.get.serializer + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val sparkConf = SparkEnv.get.conf + private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 + + val (blockId, file) = diskBlockManager.createTempBlock() + var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize) + + def +=(elem: T): DiskBuffer.this.type = { + writer.write(elem) + this + } + + def ++=(xs: TraversableOnce[T]): DiskBuffer.this.type = { + xs.foreach(writer.write(_)) + this + } + + def iterator : Iterator[T] = { + writer.close + val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 + new DiskBufferIterator(file, blockId, serializer, fileBufferSize) + } +} + +private class DiskBufferIterator[T](file: File, blockId: BlockId, serializer: Serializer, fileBufferSize : Int) extends Iterator[T] { + private val fileStream = new FileInputStream(file) + private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize) + private var compressedStream = SparkEnv.get.blockManager.wrapForCompression(blockId, bufferedStream) + private var deserializeStream = serializer.newInstance.deserializeStream(compressedStream) + private var nextItem : T = null.asInstanceOf[T] + + def hasNext : Boolean = { + if (nextItem == null) { + nextItem = doNext() + } + nextItem != null + } + + def next() : T = { + val item = if (nextItem == null) doNext else nextItem + if (item == null) { + throw new NoSuchElementException + } + nextItem = null.asInstanceOf[T] + item + } + + private def doNext() : T = { + try { + deserializeStream.readObject().asInstanceOf[T] + } catch { + case e: EOFException => + cleanup + null.asInstanceOf[T] + } + } + + private def cleanup() = { + deserializeStream.close() + file.delete() + } +} From 5c2b55ab314b9c961a41910076d46820eef5e2c1 Mon Sep 17 00:00:00 2001 From: msiddalingaiah Date: Wed, 11 Jun 2014 21:39:11 -0400 Subject: [PATCH 5/9] [SPARK-983][WIP] refactor code/spill sublists --- .../apache/spark/rdd/SortedParitionsRDD.scala | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala index 01a58150c57d..272c7efe83a5 100644 --- 
a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala @@ -52,14 +52,34 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) } private def doSort() : Iterator[T] = { - val chunkList = new ArrayBuffer[Iterator[T]]() - chunkList += nextChunk + val subLists = new ArrayBuffer[Iterator[T]]() + + // keep the first sub list in memory + subLists += nextSubList + while (iter.hasNext) { - chunkList += nextChunk + // spill remaining sub lists to disk + var diskBuffer = new DiskBuffer[T]() + diskBuffer ++= nextSubList + subLists += diskBuffer.iterator + } + + merge(subLists) + } + + private def nextSubList() : Iterator[T] = { + var subList = new ArrayBuffer[T](10000) + while (fitsInMemory(subList) && iter.hasNext) { + subList += iter.next } - merge(chunkList) + return subList.sortWith(lt).iterator } + private def fitsInMemory(list : ArrayBuffer[T]) : Boolean = { + // TODO: use SizeEstimator + list.size < 100 + } + private def merge(list : ArrayBuffer[Iterator[T]]) : Iterator[T] = { if (list.size == 1) { return list(0) @@ -68,14 +88,14 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) return doMerge(list(0), list(1)) } val mid = list.size >> 1 - // sort right first since it's in memory - val right = merge(list.slice(mid, list.size)) val left = merge(list.slice(0, mid)) + val right = merge(list.slice(mid, list.size)) doMerge(left, right) } private def doMerge(it1 : Iterator[T], it2 : Iterator[T]) : Iterator[T] = { var array = new DiskBuffer[T]() +// for testing... // var array = new ArrayBuffer[T]() if (!it1.hasNext) { array ++= it2 @@ -110,15 +130,6 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) } array.iterator } - - private def nextChunk() : Iterator[T] = { - var array = new ArrayBuffer[T](10000) - // TODO: use SizeEstimator - while (array.size < 1000 && iter.hasNext) { - array += iter.next - } - return array.sortWith(lt).iterator - } } private class DiskBuffer[T] { From 3dd4be624a082214113fbf3f7b2ab655cd14bf92 Mon Sep 17 00:00:00 2001 From: msiddalingaiah Date: Fri, 13 Jun 2014 23:57:33 -0400 Subject: [PATCH 6/9] [SPARK-983][WIP] fetch upstream/refactor code --- .../apache/spark/rdd/SortedParitionsRDD.scala | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala index 272c7efe83a5..0e29f2e505af 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala @@ -77,7 +77,7 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) private def fitsInMemory(list : ArrayBuffer[T]) : Boolean = { // TODO: use SizeEstimator - list.size < 100 + list.size < 500 } private def merge(list : ArrayBuffer[Iterator[T]]) : Iterator[T] = { @@ -164,31 +164,39 @@ private class DiskBufferIterator[T](file: File, blockId: BlockId, serializer: Se private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize) private var compressedStream = SparkEnv.get.blockManager.wrapForCompression(blockId, bufferedStream) private var deserializeStream = serializer.newInstance.deserializeStream(compressedStream) - private var nextItem : T = null.asInstanceOf[T] + private var nextItem = None : Option[T] def hasNext : Boolean = { - if (nextItem == null) { - nextItem 
= doNext() + nextItem match { + case Some(item) => true + case None => nextItem = doNext() + } + nextItem match { + case Some(item) => true + case None => false } - nextItem != null } def next() : T = { - val item = if (nextItem == null) doNext else nextItem - if (item == null) { - throw new NoSuchElementException + nextItem match { + case Some(item) => + nextItem = None + item + case None => + doNext match { + case Some(item) => item + case None => throw new NoSuchElementException + } } - nextItem = null.asInstanceOf[T] - item } - private def doNext() : T = { + private def doNext() : Option[T] = { try { - deserializeStream.readObject().asInstanceOf[T] + Some(deserializeStream.readObject().asInstanceOf[T]) } catch { case e: EOFException => cleanup - null.asInstanceOf[T] + None } } From 5ffdf96780e5a48c2542c3bb10c4459c9fbaea00 Mon Sep 17 00:00:00 2001 From: msiddalingaiah Date: Sat, 14 Jun 2014 14:47:21 -0400 Subject: [PATCH 7/9] [SPARK-983][WIP] add SizeEstimator --- .../apache/spark/rdd/SortedParitionsRDD.scala | 74 ++++++++++++++++--- 1 file changed, 65 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala index 0e29f2e505af..1ef33e48ef8d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala @@ -20,11 +20,11 @@ package org.apache.spark.rdd import scala.reflect.ClassTag import scala.collection.mutable.ArrayBuffer import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException} - import org.apache.spark.{Partition, TaskContext} import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, BlockManager} +import org.apache.spark.util.SizeEstimator private[spark] class SortedPartitionsRDD[T: ClassTag]( prev: RDD[T], @@ -40,7 +40,15 @@ private[spark] class SortedPartitionsRDD[T: ClassTag]( } } -private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) extends Iterator[T] { +private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) extends Iterator[T] with Logging { + private val sparkConf = SparkEnv.get.conf + // Collective memory threshold shared across all running tasks + private val maxMemoryThreshold = { + val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3) + val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8) + (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + } + private val sorted = doSort() def hasNext : Boolean = { @@ -63,21 +71,23 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) diskBuffer ++= nextSubList subLists += diskBuffer.iterator } + println("Merge sorting one in-memory list with %d external list(s)".format(subLists.size - 1)) + logInfo("Merge sorting one in-memory list with %d external list(s)".format(subLists.size - 1)) merge(subLists) } private def nextSubList() : Iterator[T] = { - var subList = new ArrayBuffer[T](10000) + var subList = new SizeTrackingArrayBuffer[T](10) while (fitsInMemory(subList) && iter.hasNext) { subList += iter.next } return subList.sortWith(lt).iterator } - private def fitsInMemory(list : ArrayBuffer[T]) : Boolean = { - // TODO: use SizeEstimator - list.size < 500 + private def fitsInMemory(list : SizeTrackingArrayBuffer[T]) : Boolean = { + // TODO: use maxMemoryThreshold + 
list.estimateSize < 10000
   }
 
   private def merge(list : ArrayBuffer[Iterator[T]]) : Iterator[T] = {
     if (list.size == 1) {
       return list(0)
@@ -132,6 +142,50 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean)
   }
 }
 
+private class SizeTrackingArrayBuffer[T](initialSize : Int) {
+  private var array = new ArrayBuffer[T](initialSize)
+  private var averageSize : Double = 0.0
+  private var nextSampleNum : Int = 1
+
+  def +=(elem: T): this.type = {
+    array += elem
+    updateAverage
+    this
+  }
+
+  def ++=(xs: TraversableOnce[T]): this.type = {
+    array ++= xs
+    updateAverage
+    this
+  }
+
+  def size : Int = {
+    array.size
+  }
+
+  def sortWith(lt: (T, T) => Boolean): this.type = {
+    array = array.sortWith(lt)
+    this
+  }
+
+  def iterator : Iterator[T] = {
+    array.iterator
+  }
+
+  def updateAverage = {
+    if (array.size >= nextSampleNum) {
+      averageSize = SizeEstimator.estimate(array)
+      averageSize /= array.size
+      nextSampleNum <<= 1
+      assert(nextSampleNum < 0x40000000)
+    }
+  }
+
+  def estimateSize(): Long = {
+    (array.size * averageSize).toLong
+  }
+}
+
 private class DiskBuffer[T] {
   private val serializer = SparkEnv.get.serializer
   private val blockManager = SparkEnv.get.blockManager
@@ -141,14 +195,16 @@ private class DiskBuffer[T] {
 
   val (blockId, file) = diskBlockManager.createTempBlock()
   var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
+  var numObjects : Int = 0
 
-  def +=(elem: T): DiskBuffer.this.type = {
+  def +=(elem: T): this.type = {
+    numObjects += 1
     writer.write(elem)
     this
   }
 
-  def ++=(xs: TraversableOnce[T]): DiskBuffer.this.type = {
-    xs.foreach(writer.write(_))
+  def ++=(xs: TraversableOnce[T]): this.type = {
+    xs.foreach { x => numObjects += 1; writer.write(x) }
     this
   }
 
From ad1a1a8b1a2e28d3bd900a0075c7a0e2ff741a20 Mon Sep 17 00:00:00 2001
From: msiddalingaiah
Date: Sat, 14 Jun 2014 21:51:58 -0400
Subject: [PATCH 8/9] [SPARK-983] complete disk spill logic

---
 .../apache/spark/rdd/SortedParitionsRDD.scala | 88 +++++++++++++++++--
 1 file changed, 79 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
index 1ef33e48ef8d..6de91c6aab40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
@@ -26,6 +26,21 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.SizeEstimator
 
+/**
+ * An RDD that sorts each of its partitions independently.
+ *
+ * If partitions are too large to fit in memory, they are externally sorted.
+ *
+ * Two parameters control the memory threshold for external sort:
+ *
+ * `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing
+ * sub-lists as a fraction of the executor's total memory. Since each concurrently running
+ * task maintains one in-memory sub-list, the actual threshold for each task is this quantity
+ * divided by the number of running tasks.
+ *
+ * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
+ * this threshold, in case sub-list size estimation is not sufficiently accurate.
+ */
 private[spark] class SortedPartitionsRDD[T: ClassTag](
     prev: RDD[T],
     lt: (T, T) => Boolean)
@@ -40,6 +55,9 @@ private[spark] class SortedPartitionsRDD[T: ClassTag](
   }
 }
 
+/**
+ * An iterator that sorts a supplied iterator, either in-memory or externally.
+ */ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) extends Iterator[T] with Logging { private val sparkConf = SparkEnv.get.conf // Collective memory threshold shared across all running tasks @@ -49,6 +67,9 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong } + // Number of list elements before tracking memory usage + private val trackMemoryThreshold = 1000 + private val sorted = doSort() def hasNext : Boolean = { @@ -59,37 +80,72 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) sorted.next } + /** + * Sort the incoming iterator. + * Any input that cannot fit in memory is split into sorted sub-lists and spilled to disk. + * Any spilled sub-lists are merge sorted and written back to disk. + */ private def doSort() : Iterator[T] = { val subLists = new ArrayBuffer[Iterator[T]]() - // keep the first sub list in memory + // keep the first sub-list in memory subLists += nextSubList while (iter.hasNext) { - // spill remaining sub lists to disk + // spill remaining sub-lists to disk var diskBuffer = new DiskBuffer[T]() diskBuffer ++= nextSubList subLists += diskBuffer.iterator } - println("Merge sorting one in-memory list with %d external list(s)".format(subLists.size - 1)) logInfo("Merge sorting one in-memory list with %d external list(s)".format(subLists.size - 1)) merge(subLists) } + /** + * Gets a sorted sub-list that can fit in memory. + */ private def nextSubList() : Iterator[T] = { - var subList = new SizeTrackingArrayBuffer[T](10) + var subList = new SizeTrackingArrayBuffer[T](1000) while (fitsInMemory(subList) && iter.hasNext) { subList += iter.next } return subList.sortWith(lt).iterator } - + + /** + * Determines if a given list can fit in memory. + * This algorithm is similar to that found in ExternalAppendOnlyMap. + */ private def fitsInMemory(list : SizeTrackingArrayBuffer[T]) : Boolean = { - // TODO: use maxMemoryThreshold - list.estimateSize < 10000 + if (list.size > trackMemoryThreshold && list.atNextSampleSize) { + val listSize = list.estimateSize() + val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap + + // Atomically check whether there is sufficient memory in the global pool for + // this map to grow and, if possible, allocate the required amount + shuffleMemoryMap.synchronized { + val threadId = Thread.currentThread().getId + val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId) + val availableMemory = maxMemoryThreshold - + (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L)) + + // Assume list growth factor is 2x + if (availableMemory > listSize * 2) { + shuffleMemoryMap(threadId) = listSize * 2 + } else { + shuffleMemoryMap(threadId) = 0 + return false + } + } + } + return true } + /** + * Merge-sort a list of iterators, which might be in memory or disk. + * Returns a sorted iterator. + */ private def merge(list : ArrayBuffer[Iterator[T]]) : Iterator[T] = { if (list.size == 1) { return list(0) @@ -103,10 +159,11 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) doMerge(left, right) } + /** + * Merge two iterators, returning a sorted iterator. + */ private def doMerge(it1 : Iterator[T], it2 : Iterator[T]) : Iterator[T] = { var array = new DiskBuffer[T]() -// for testing... 
-// var array = new ArrayBuffer[T]()
     if (!it1.hasNext) {
       array ++= it2
       return array.iterator
@@ -142,6 +199,9 @@ private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean)
   }
 }
 
+/**
+ * A buffer similar to ArrayBuffer that can estimate its size in bytes.
+ */
 private class SizeTrackingArrayBuffer[T](initialSize : Int) {
   private var array = new ArrayBuffer[T](initialSize)
   private var averageSize : Double = 0.0
@@ -172,6 +232,10 @@ private class SizeTrackingArrayBuffer[T](initialSize : Int) {
     array.iterator
   }
 
+  def atNextSampleSize : Boolean = {
+    array.size >= nextSampleNum
+  }
+
   def updateAverage = {
     if (array.size >= nextSampleNum) {
       averageSize = SizeEstimator.estimate(array)
@@ -186,6 +250,9 @@ private class SizeTrackingArrayBuffer[T](initialSize : Int) {
   }
 }
 
+/**
+ * A buffer similar to ArrayBuffer, but stored on disk.
+ */
 private class DiskBuffer[T] {
   private val serializer = SparkEnv.get.serializer
   private val blockManager = SparkEnv.get.blockManager
@@ -215,6 +282,9 @@ private class DiskBuffer[T] {
   }
 }
 
+/**
+ * An iterator for DiskBuffer
+ */
 private class DiskBufferIterator[T](file: File, blockId: BlockId, serializer: Serializer, fileBufferSize : Int) extends Iterator[T] {
   private val fileStream = new FileInputStream(file)
   private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
From 71c0dfad75f8116ded6cc7c24cce2bdbb00e9f87 Mon Sep 17 00:00:00 2001
From: msiddalingaiah
Date: Sat, 14 Jun 2014 22:33:57 -0400
Subject: [PATCH 9/9] [SPARK-983] fix Scala style errors

---
 .../org/apache/spark/rdd/SortedParitionsRDD.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
index 6de91c6aab40..9a530e10955a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SortedParitionsRDD.scala
@@ -48,7 +48,8 @@ private[spark] class SortedPartitionsRDD[T: ClassTag](
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
-  override val partitioner = prev.partitioner // Since sorting partitions cannot change a partition's keys
+  // Since sorting partitions cannot change a partition's keys
+  override val partitioner = prev.partitioner
 
   override def compute(split: Partition, context: TaskContext) = {
     new SortedIterator(firstParent[T].iterator(split, context), lt)
   }
 }
 
 /**
  * An iterator that sorts a supplied iterator, either in-memory or externally.
*/ -private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) extends Iterator[T] with Logging { +private[spark] class SortedIterator[T](iter: Iterator[T], lt: (T, T) => Boolean) + extends Iterator[T] with Logging { private val sparkConf = SparkEnv.get.conf // Collective memory threshold shared across all running tasks private val maxMemoryThreshold = { @@ -285,10 +287,12 @@ private class DiskBuffer[T] { /** * An iterator for DiskBuffer */ -private class DiskBufferIterator[T](file: File, blockId: BlockId, serializer: Serializer, fileBufferSize : Int) extends Iterator[T] { +private class DiskBufferIterator[T](file: File, blockId: BlockId, serializer: Serializer, + fileBufferSize : Int) extends Iterator[T] { private val fileStream = new FileInputStream(file) private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize) - private var compressedStream = SparkEnv.get.blockManager.wrapForCompression(blockId, bufferedStream) + private var compressedStream = + SparkEnv.get.blockManager.wrapForCompression(blockId, bufferedStream) private var deserializeStream = serializer.newInstance.deserializeStream(compressedStream) private var nextItem = None : Option[T]