diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index f5d80bbcf3557..f84f5f64dea33 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -17,27 +17,28 @@
 package org.apache.spark.shuffle.sort;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.Iterator;
-
 import com.google.common.io.Closeables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.serializer.Serializer;
 import org.apache.spark.serializer.SerializerInstance;
-import org.apache.spark.storage.*;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.DiskBlockObjectWriter;
+import org.apache.spark.storage.TempShuffleBlockId;
 import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 
 /**
  * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
@@ -73,6 +74,7 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
   private final Partitioner partitioner;
   private final ShuffleWriteMetrics writeMetrics;
   private final Serializer serializer;
+  private final boolean dropKeys;
 
   /** Array of file writers, one for each partition */
   private DiskBlockObjectWriter[] partitionWriters;
@@ -82,7 +84,8 @@ public BypassMergeSortShuffleWriter(
       BlockManager blockManager,
       Partitioner partitioner,
       ShuffleWriteMetrics writeMetrics,
-      Serializer serializer) {
+      Serializer serializer,
+      boolean dropKeys) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
@@ -91,10 +94,11 @@ public BypassMergeSortShuffleWriter(
     this.partitioner = partitioner;
     this.writeMetrics = writeMetrics;
     this.serializer = serializer;
+    this.dropKeys = dropKeys;
   }
 
   @Override
-  public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
+  public void insertAll(Iterator<Product2<K, V>> records, boolean dropKeys) throws IOException {
     assert (partitionWriters == null);
     if (!records.hasNext()) {
       return;
     }
@@ -118,7 +122,12 @@ public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
     while (records.hasNext()) {
       final Product2<K, V> record = records.next();
       final K key = record._1();
-      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
+
+      if (dropKeys) {
+        partitionWriters[partitioner.getPartition(key)].write(record._2());
+      } else {
+        partitionWriters[partitioner.getPartition(key)].write(key, record._2());
+      }
     }
 
     for (DiskBlockObjectWriter writer : partitionWriters) {
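Note (reviewer sketch, not part of the patch): a minimal test-style illustration of the widened constructor and insertAll signature above. It assumes the fixtures of BypassMergeSortShuffleWriterSuite further down (a mocked blockManager) and that the code lives in package org.apache.spark.shuffle.sort so the package-private writer is visible; the values are placeholders.

    import org.apache.spark.{HashPartitioner, SparkConf}
    import org.apache.spark.executor.ShuffleWriteMetrics
    import org.apache.spark.serializer.JavaSerializer

    val conf = new SparkConf(loadDefaults = false)
    val writer = new BypassMergeSortShuffleWriter[Int, Int](
      conf,
      blockManager,               // mocked BlockManager from the test fixture (assumed)
      new HashPartitioner(7),
      new ShuffleWriteMetrics(),
      new JavaSerializer(conf),
      true)                       // dropKeys: per-partition files hold bare values
    // The patch threads the flag both through the constructor and through insertAll;
    // callers are expected to pass the same value in both places.
    writer.insertAll(Iterator((1, 10), (2, 20)), true)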
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
index 656ea0401a144..6ea17a3a9b486 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
@@ -17,15 +17,14 @@
 package org.apache.spark.shuffle.sort;
 
-import java.io.File;
-import java.io.IOException;
-
+import org.apache.spark.TaskContext;
+import org.apache.spark.annotation.Private;
+import org.apache.spark.storage.BlockId;
 import scala.Product2;
 import scala.collection.Iterator;
 
-import org.apache.spark.annotation.Private;
-import org.apache.spark.TaskContext;
-import org.apache.spark.storage.BlockId;
+import java.io.File;
+import java.io.IOException;
 
 /**
  * Interface for objects that {@link SortShuffleWriter} uses to write its output files.
@@ -33,7 +32,7 @@
 @Private
 public interface SortShuffleFileWriter<K, V> {
 
-  void insertAll(Iterator<Product2<K, V>> records) throws IOException;
+  void insertAll(Iterator<Product2<K, V>> records, boolean dropKeys) throws IOException;
 
   /**
    * Write all the data added into this shuffle sorter into a file in the disk store. This is
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 9aafc9eb1cde7..e1b17d054d332 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -73,7 +73,8 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     val serializer: Option[Serializer] = None,
     val keyOrdering: Option[Ordering[K]] = None,
     val aggregator: Option[Aggregator[K, V, C]] = None,
-    val mapSideCombine: Boolean = false)
+    val mapSideCombine: Boolean = false,
+    val dropKeys: Boolean = false)
   extends Dependency[Product2[K, V]] {
 
   override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
@@ -88,7 +89,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleId: Int = _rdd.context.newShuffleId()
 
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
-    shuffleId, _rdd.partitions.size, this)
+    shuffleId, _rdd.partitions.size, this).setDropKeys(dropKeys)
 
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a56e542242d5f..a8d90a3fce17a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -398,9 +398,11 @@ abstract class RDD[T: ClassTag](
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
-        new HashPartitioner(numPartitions)),
-        numPartitions).values
+        new ShuffledRDD[Int, T, T](
+          mapPartitionsWithIndex(distributePartition),
+          new HashPartitioner(numPartitions)
+        ).setDropKeys(true).mapPartitions(_.asInstanceOf[Iterator[T]]),
+        numPartitions)
     } else {
       new CoalescedRDD(this, numPartitions)
     }
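Note (reviewer sketch, not part of the patch): the coalesce change above is the motivating call site. Previously each element was shuffled as a (partitionIndex, element) pair and the Int index was discarded on the read side via .values; with dropKeys the index is used only to route the record and is never written to the shuffle files, which is why the result can be cast straight back to Iterator[T]. A user-level call that exercises this path, with arbitrary example sizes:

    // Assumes an existing SparkContext `sc`.
    val rdd = sc.parallelize(1 to 1000, 10)
    // shuffle = true goes through ShuffledRDD, which now sets dropKeys (see below)
    val repartitioned = rdd.coalesce(2, shuffle = true)
    repartitioned.count()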
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index a013c3f66a3a8..7bb0507a79680 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -52,6 +52,8 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
 
   private var mapSideCombine: Boolean = false
 
+  private var dropKeys: Boolean = false
+
   /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
   def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C] = {
     this.serializer = Option(serializer)
@@ -76,8 +78,15 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
     this
   }
 
+  /** Set the dropKeys flag for this RDD's shuffle. */
+  def setDropKeys(dropKeys: Boolean): ShuffledRDD[K, V, C] = {
+    this.dropKeys = dropKeys
+    this
+  }
+
   override def getDependencies: Seq[Dependency[_]] = {
-    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
+    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator,
+      mapSideCombine, dropKeys))
   }
 
   override val partitioner = Some(part)
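Note (reviewer sketch, not part of the patch): the new setter used directly, assuming an existing SparkContext sc and placeholder pairs. The flag is only safe for callers that never look at keys on the read side: once it is set, the shuffle files contain bare values, so reading the RDD back as (key, value) pairs is no longer type-safe, which is why coalesce above immediately casts the partition iterators.

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.ShuffledRDD

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
    val shuffled = new ShuffledRDD[Int, String, String](pairs, new HashPartitioner(4))
      .setDropKeys(true)   // keys route records but are not serialized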
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 7c3e2b5a3703b..badab8936e2d4 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -61,12 +61,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
       // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
       // NextIterator. The NextIterator makes sure that close() is called on the
      // underlying InputStream when all records have been read.
-      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
+      if (!handle.dropKeys) {
+        serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
+      } else {
+        serializerInstance.deserializeStream(wrappedStream).asIterator
+      }
     }
 
     // Update the context task metrics for each record read.
     val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
-    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
+    val metricIter = CompletionIterator[Any, Iterator[Any]](
       recordIter.map(record => {
         readMetrics.incRecordsRead(1)
         record
@@ -74,7 +78,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
       context.taskMetrics().updateShuffleReadMetrics())
 
     // An interruptible iterator must be used here in order to support task cancellation
-    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
+    val interruptibleIter = new InterruptibleIterator[Any](context, metricIter)
 
     val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
@@ -99,7 +103,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
         // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
         // the ExternalSorter won't spill to disk.
         val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
-        sorter.insertAll(aggregatedIter)
+        sorter.insertAll(aggregatedIter, handle.dropKeys)
         context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
         context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
         context.internalMetricsToAccumulators(
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleHandle.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleHandle.scala
index e04c97fe61894..c9ac80ccabd6a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleHandle.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleHandle.scala
@@ -25,4 +25,15 @@ import org.apache.spark.annotation.DeveloperApi
  * @param shuffleId ID of the shuffle
  */
 @DeveloperApi
-abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}
+abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {
+  private var _dropKeys: Boolean = false
+
+  def setDropKeys(dropKeys: Boolean): ShuffleHandle = {
+    this._dropKeys = dropKeys
+    this
+  }
+
+  def dropKeys: Boolean = {
+    _dropKeys
+  }
+}
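Note (reviewer sketch, not part of the patch): ShuffleHandle gains a mutable flag with a fluent setter, which is what lets Dependency.scala chain it onto registerShuffle(...) above. A standalone sketch of the same pattern with an invented class name; returning this.type instead of the base type would preserve the concrete handle subclass after chaining, a possible refinement rather than what the patch does.

    // Illustration only: a serializable handle with a chainable flag.
    abstract class DemoHandle(val id: Int) extends Serializable {
      private var _dropKeys: Boolean = false
      def setDropKeys(dropKeys: Boolean): this.type = {
        _dropKeys = dropKeys
        this   // returning the handle enables registerShuffle(...).setDropKeys(...)
      }
      def dropKeys: Boolean = _dropKeys
    }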
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 5865e7640c1cf..91333006e296c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -52,8 +52,8 @@ private[spark] class SortShuffleWriter[K, V, C](
   override def write(records: Iterator[Product2[K, V]]): Unit = {
     sorter = if (dep.mapSideCombine) {
       require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
-      new ExternalSorter[K, V, C](
-        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
+      new ExternalSorter[K, V, C](dep.aggregator, Some(dep.partitioner),
+        dep.keyOrdering, dep.serializer, dep.shuffleHandle.dropKeys)
     } else if (SortShuffleWriter.shouldBypassMergeSort(
         SparkEnv.get.conf, dep.partitioner.numPartitions, aggregator = None, keyOrdering = None)) {
       // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
@@ -62,15 +62,15 @@ private[spark] class SortShuffleWriter[K, V, C](
       // together the spilled files, which would happen with the normal code path. The downside is
       // having multiple files open at a time and thus more memory allocated to buffers.
       new BypassMergeSortShuffleWriter[K, V](SparkEnv.get.conf, blockManager, dep.partitioner,
-        writeMetrics, Serializer.getSerializer(dep.serializer))
+        writeMetrics, Serializer.getSerializer(dep.serializer), dep.shuffleHandle.dropKeys)
     } else {
       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
       // care whether the keys get sorted in each partition; that will be done on the reduce side
       // if the operation being run is sortByKey.
-      new ExternalSorter[K, V, V](
-        aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
+      new ExternalSorter[K, V, V](aggregator = None, Some(dep.partitioner),
+        ordering = None, dep.serializer, dep.shuffleHandle.dropKeys)
     }
-    sorter.insertAll(records)
+    sorter.insertAll(records, handle.dropKeys)
 
     // Don't bother including the time to open the merged output file in the shuffle write time,
     // because it just opens a single file, so is typically too fast to measure accurately
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 80d426fadc65e..b62baf7990a24 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -185,6 +185,18 @@ private[spark] class DiskBlockObjectWriter(
     recordWritten()
   }
 
+  /**
+   * Writes a single object (a bare value, as opposed to write(key, value)).
+   */
+  def write(obj: Any): Unit = {
+    if (!initialized) {
+      open()
+    }
+
+    objOut.writeObject(obj)
+    recordWritten()
+  }
+
   override def write(b: Int): Unit = throw new UnsupportedOperationException()
 
   override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 749be34d8e8fd..d092e8dedfaf9 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -91,7 +91,8 @@ private[spark] class ExternalSorter[K, V, C](
     aggregator: Option[Aggregator[K, V, C]] = None,
     partitioner: Option[Partitioner] = None,
     ordering: Option[Ordering[K]] = None,
-    serializer: Option[Serializer] = None)
+    serializer: Option[Serializer] = None,
+    dropKeys: Boolean = false)
   extends Logging
   with Spillable[WritablePartitionedPairCollection[K, C]]
   with SortShuffleFileWriter[K, V] {
@@ -192,7 +193,7 @@ private[spark] class ExternalSorter[K, V, C](
    */
   private[spark] def numSpills: Int = spills.size
 
-  override def insertAll(records: Iterator[Product2[K, V]]): Unit = {
+  override def insertAll(records: Iterator[Product2[K, V]], dropKeys: Boolean): Unit = {
     // TODO: stop combining if we find that the reduction factor isn't high
     val shouldCombine = aggregator.isDefined
 
@@ -670,7 +671,7 @@ private[spark] class ExternalSorter[K, V, C](
     if (spills.isEmpty) {
       // Case where we only have in-memory data
       val collection = if (aggregator.isDefined) map else buffer
-      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
+      val it = collection.destructiveSortedWritablePartitionedIterator(comparator, dropKeys)
       while (it.hasNext) {
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics.shuffleWriteMetrics.get)
@@ -689,7 +690,11 @@ private[spark] class ExternalSorter[K, V, C](
         val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
           context.taskMetrics.shuffleWriteMetrics.get)
         for (elem <- elements) {
-          writer.write(elem._1, elem._2)
+          if (dropKeys) {
+            writer.write(elem._2)
+          } else {
+            writer.write(elem._1, elem._2)
+          }
         }
         writer.commitAndClose()
         val segment = writer.fileSegment()
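Note (reviewer sketch, not part of the patch): after this change the flag reaches ExternalSorter twice, once as a constructor argument (read by the write path that calls destructiveSortedWritablePartitionedIterator) and once as the insertAll parameter, which shadows the constructor argument inside that method, so callers should pass the same value in both places. A sketch of the resulting call pattern, assuming the code sits alongside ExternalSorterSuite (package org.apache.spark.util.collection) and runs inside a task; the data is arbitrary:

    import org.apache.spark.HashPartitioner

    val sorter = new ExternalSorter[Int, String, String](
      aggregator = None,
      partitioner = Some(new HashPartitioner(3)),
      ordering = None,
      serializer = None,
      dropKeys = true)
    sorter.insertAll(Iterator((1, "a"), (2, "b")), dropKeys = true)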
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
index 87a786b02d651..9efe3cbd2d530 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
@@ -128,7 +128,8 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
 
   override def estimateSize: Long = metaBuffer.capacity * 4L + kvBuffer.capacity
 
-  override def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
+  override def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]],
+      dropKeys: Boolean = false)
     : WritablePartitionedIterator = {
     sort(keyComparator)
     new WritablePartitionedIterator {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
index 38848e9018c6c..39f536c055494 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
@@ -45,14 +45,19 @@ private[spark] trait WritablePartitionedPairCollection[K, V] {
    * returned in order of their partition ID and then the given comparator.
   * This may destroy the underlying collection.
   */
-  def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
+  def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]],
+      dropKeys: Boolean = false)
    : WritablePartitionedIterator = {
    val it = partitionedDestructiveSortedIterator(keyComparator)
    new WritablePartitionedIterator {
      private[this] var cur = if (it.hasNext) it.next() else null
 
      def writeNext(writer: DiskBlockObjectWriter): Unit = {
-        writer.write(cur._1._2, cur._2)
+        if (dropKeys) {
+          writer.write(cur._2)
+        } else {
+          writer.write(cur._1._2, cur._2)
+        }
        cur = if (it.hasNext) it.next() else null
      }
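Note (reviewer sketch, not part of the patch): the write-side changes above are what make the reader branch in BlockStoreShuffleReader necessary. With dropKeys every record is a single serialized object rather than a key followed by a value, so the stream has to be decoded with asIterator instead of asKeyValueIterator. A standalone sketch of that asymmetry using JavaSerializer; the string is a placeholder:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.JavaSerializer

    val ser = new JavaSerializer(new SparkConf()).newInstance()
    val bytes = new ByteArrayOutputStream()
    val out = ser.serializeStream(bytes)
    out.writeObject("value-only record")   // what write(obj) emits per record
    out.close()
    val in = ser.deserializeStream(new ByteArrayInputStream(bytes.toByteArray))
    val values = in.asIterator             // values only; asKeyValueIterator expects pairs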
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 341f56df2dafc..1a181206cd71b 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -111,9 +111,10 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       blockManager,
       new HashPartitioner(7),
       shuffleWriteMetrics,
-      serializer
+      serializer,
+      false
     )
-    writer.insertAll(Iterator.empty)
+    writer.insertAll(Iterator.empty, false)
     val partitionLengths = writer.writePartitionedFile(shuffleBlockId, taskContext, outputFile)
     assert(partitionLengths.sum === 0)
     assert(outputFile.exists())
@@ -133,9 +134,10 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       blockManager,
       new HashPartitioner(7),
       shuffleWriteMetrics,
-      serializer
+      serializer,
+      false
     )
-    writer.insertAll(records)
+    writer.insertAll(records, false)
     assert(temporaryFilesCreated.nonEmpty)
     val partitionLengths = writer.writePartitionedFile(shuffleBlockId, taskContext, outputFile)
     assert(partitionLengths.sum === outputFile.length())
@@ -152,7 +154,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       blockManager,
       new HashPartitioner(7),
       shuffleWriteMetrics,
-      serializer
+      serializer,
+      false
     )
     intercept[SparkException] {
       writer.insertAll((0 until 100000).iterator.map(i => {
@@ -160,7 +163,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
           throw new SparkException("Intentional failure")
         }
         (i, i)
-      }))
+      }), false)
     }
     assert(temporaryFilesCreated.nonEmpty)
     writer.stop()
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index bdb0f4d507a7e..930205f870bbe 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -105,28 +105,28 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     // Both aggregator and ordering
     val sorter = new ExternalSorter[Int, Int, Int](
       Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
-    sorter.insertAll(elements.iterator)
+    sorter.insertAll(elements.iterator, false)
     assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
     sorter.stop()
 
     // Only aggregator
     val sorter2 = new ExternalSorter[Int, Int, Int](
       Some(agg), Some(new HashPartitioner(7)), None, None)
-    sorter2.insertAll(elements.iterator)
+    sorter2.insertAll(elements.iterator, false)
     assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
     sorter2.stop()
 
     // Only ordering
     val sorter3 = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(7)), Some(ord), None)
-    sorter3.insertAll(elements.iterator)
+    sorter3.insertAll(elements.iterator, false)
     assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
     sorter3.stop()
 
     // Neither aggregator nor ordering
     val sorter4 = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(7)), None, None)
-    sorter4.insertAll(elements.iterator)
+    sorter4.insertAll(elements.iterator, false)
     assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
     sorter4.stop()
   }
@@ -150,7 +150,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
     val sorter = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(7)), Some(ord), None)
-    sorter.insertAll(elements)
+    sorter.insertAll(elements, false)
     assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled
     val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
     assert(iter.next() === (0, Nil))
@@ -332,14 +332,14 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
     val sorter = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
-    sorter.insertAll((0 until 120000).iterator.map(i => (i, i)))
+    sorter.insertAll((0 until 120000).iterator.map(i => (i, i)), false)
     assert(diskBlockManager.getAllFiles().length > 0)
     sorter.stop()
     assert(diskBlockManager.getAllBlocks().length === 0)
 
     val sorter2 = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
-    sorter2.insertAll((0 until 120000).iterator.map(i => (i, i)))
+    sorter2.insertAll((0 until 120000).iterator.map(i => (i, i)), false)
     assert(diskBlockManager.getAllFiles().length > 0)
     assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet)
     sorter2.stop()
@@ -363,7 +363,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
           throw new SparkException("Intentional failure")
         }
         (i, i)
-      }))
+      }), false)
     }
     assert(diskBlockManager.getAllFiles().length > 0)
     sorter.stop()
@@ -421,7 +421,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext("local", "test", conf)
 
     val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
-    sorter.insertAll((0 until 100000).iterator.map(i => (i / 4, i)))
+    sorter.insertAll((0 until 100000).iterator.map(i => (i / 4, i)), false)
     val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
     val expected = (0 until 3).map(p => {
       (p, (0 until 100000).map(i => (i / 4, i)).filter(_._1 % 3 == p).toSet)
@@ -444,7 +444,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
     val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
     val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
-    sorter.insertAll((0 until 100).iterator.map(i => (i / 2, i)))
+    sorter.insertAll((0 until 100).iterator.map(i => (i / 2, i)), false)
     val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
     val expected = (0 until 3).map(p => {
       (p, (0 until 50).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
@@ -467,7 +467,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
     val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
     val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
-    sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i)))
+    sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i)), false)
     val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
     val expected = (0 until 3).map(p => {
       (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
@@ -494,8 +494,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
 
     // avoid combine before spill
-    sorter.insertAll((0 until 50000).iterator.map(i => (i , 2 * i)))
-    sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i + 1)))
+    sorter.insertAll((0 until 50000).iterator.map(i => (i , 2 * i)), false)
+    sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i + 1)), false)
     val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
     val expected = (0 until 3).map(p => {
       (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
@@ -519,7 +519,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val ord = implicitly[Ordering[Int]]
     val sorter = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
-    sorter.insertAll((0 until 100).iterator.map(i => (i, i)))
+    sorter.insertAll((0 until 100).iterator.map(i => (i, i)), false)
     val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
     val expected = (0 until 3).map(p => {
       (p, (0 until 100).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
@@ -543,7 +543,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val ord = implicitly[Ordering[Int]]
     val sorter = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
-    sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
+    sorter.insertAll((0 until 100000).iterator.map(i => (i, i)), false)
     val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
     val expected = (0 until 3).map(p => {
       (p, (0 until 100000).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
@@ -590,7 +590,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
     val toInsert = (1 to 100000).iterator.map(_.toString).map(s => (s, s)) ++
       collisionPairs.iterator ++ collisionPairs.iterator.map(_.swap)
-    sorter.insertAll(toInsert)
+    sorter.insertAll(toInsert, false)
 
     // A map of collision pairs in both directions
     val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap
@@ -619,7 +619,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
     // problems if the map fails to group together the objects with the same code (SPARK-2043).
     val toInsert = for (i <- 1 to 10; j <- 1 to 10000) yield (FixedHashObject(j, j % 2), 1)
-    sorter.insertAll(toInsert.iterator)
+    sorter.insertAll(toInsert.iterator, false)
 
     val it = sorter.iterator
     var count = 0
@@ -646,7 +646,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
     val sorter = new ExternalSorter[Int, Int, ArrayBuffer[Int]](Some(agg), None, None, None)
     sorter.insertAll(
-      (1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
+      (1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)), false)
 
     val it = sorter.iterator
     while (it.hasNext) {
@@ -675,7 +675,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
       (null.asInstanceOf[String], "1"),
       ("1", null.asInstanceOf[String]),
       (null.asInstanceOf[String], null.asInstanceOf[String])
-    ))
+    ), false)
 
     val it = sorter.iterator
     while (it.hasNext) {
@@ -712,7 +712,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val sorter1 = new ExternalSorter[String, String, String](
       None, None, Some(wrongOrdering), None)
     val thrown = intercept[IllegalArgumentException] {
-      sorter1.insertAll(testData.iterator.map(i => (i, i)))
+      sorter1.insertAll(testData.iterator.map(i => (i, i)), false)
       sorter1.iterator
     }
 
@@ -732,7 +732,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
     val sorter2 = new ExternalSorter[String, String, ArrayBuffer[String]](
       Some(agg), None, None, None)
-    sorter2.insertAll(testData.iterator.map(i => (i, i)))
+    sorter2.insertAll(testData.iterator.map(i => (i, i)), false)
 
     // To validate the hash ordering of key
     var minKey = Int.MinValue
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 27f26245a5ef0..85ad044c50f81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -49,7 +49,7 @@ case class Sort(
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
-      sorter.insertAll(iterator.map(r => (r.copy(), null)))
+      sorter.insertAll(iterator.map(r => (r.copy(), null)), false)
       val baseIterator = sorter.iterator.map(_._1)
       val context = TaskContext.get()
       context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index f7d48bc53ebbc..10e0b51697f5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -118,7 +118,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
 
     // Ensure we spilled something and have to merge them later
     assert(sorter.numSpills === 0)
-    sorter.insertAll(data)
+    sorter.insertAll(data, false)
     assert(sorter.numSpills > 0)
 
     // Merging spilled files should not throw assertion error
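Note (reviewer sketch, not part of the patch): a possible end-to-end regression test for the new path, written in the style of the existing suites and assuming LocalSparkContext's sc field. Shuffle-based coalesce should return exactly the original elements even though the partition-index keys are never written to the shuffle files.

    test("coalesce with shuffle drops keys but preserves the data") {
      sc = new SparkContext("local", "test")
      val data = 1 to 1000
      val coalesced = sc.parallelize(data, 10).coalesce(3, shuffle = true)
      assert(coalesced.partitions.length === 3)
      assert(coalesced.collect().sorted.toSeq === data)
    }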