From 86a00c2da0184959d3e0e18400dbbc6f31e4bbc3 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Fri, 3 Feb 2017 18:14:33 -0800 Subject: [PATCH 1/9] [SPARK-13450] Introduce UnsafeRowExternalArray. Make SortMergeJoin and WindowExec use it --- .../ExternalAppendOnlyUnsafeRowArray.scala | 179 +++++++++++ .../execution/joins/SortMergeJoinExec.scala | 118 ++++--- .../sql/execution/window/RowBuffer.scala | 86 +---- .../sql/execution/window/WindowExec.scala | 58 +--- .../org/apache/spark/sql/JoinSuite.scala | 136 +++++++- ...xternalAppendOnlyUnsafeRowArraySuite.scala | 300 ++++++++++++++++++ .../execution/SQLWindowFunctionSuite.scala | 33 ++ 7 files changed, 741 insertions(+), 169 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala new file mode 100644 index 0000000000000..0aa055ef324f4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} + +/** + * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient + * space for it to grow. + * + * Setting spill threshold faces following trade-off: + * + * - If the spill threshold is too high, the in-memory array may occupy more memory than is + * available, resulting in OOM. + * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes. + * This may lead to a performance regression compared to the normal case of using an + * [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging { + private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow] + + private var spillableArray: UnsafeExternalSorter = null + private var numElements = 0 + + // A counter to keep track of total additions made to this array since its creation. 
+ // This helps to invalidate iterators when there are changes done to the backing array. + private var modCount: Long = 0 + + private var numFieldPerRow = 0 + + def length: Int = numElements + + def isEmpty: Boolean = numElements == 0 + + /** + * Clears up resources (eg. memory) held by the backing storage + */ + def clear(): Unit = { + if (spillableArray != null) { + // The last `spillableArray` of this task will be cleaned up via task completion listener + // inside `UnsafeExternalSorter` + spillableArray.cleanupResources() + spillableArray = null + } else { + inMemoryBuffer.clear() + } + numFieldPerRow = 0 + numElements = 0 + modCount += 1 + } + + def add(entry: InternalRow): Unit = { + val unsafeRow = entry.asInstanceOf[UnsafeRow] + + if (numElements < numRowsSpillThreshold) { + inMemoryBuffer += unsafeRow.copy() + } else { + if (spillableArray == null) { + logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " + + s"${classOf[org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray].getName}") + + // We will not sort the rows, so prefixComparator and recordComparator are null + spillableArray = UnsafeExternalSorter.create( + TaskContext.get().taskMemoryManager(), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get(), + null, + null, + if (numRowsSpillThreshold > 2) numRowsSpillThreshold / 2 else 1, + SparkEnv.get.memoryManager.pageSizeBytes, + numRowsSpillThreshold, + false) + + inMemoryBuffer.foreach(existingUnsafeRow => + spillableArray.insertRecord( + existingUnsafeRow.getBaseObject, + existingUnsafeRow.getBaseOffset, + existingUnsafeRow.getSizeInBytes, + 0, + false) + ) + inMemoryBuffer.clear() + numFieldPerRow = unsafeRow.numFields() + } + + spillableArray.insertRecord( + unsafeRow.getBaseObject, + unsafeRow.getBaseOffset, + unsafeRow.getSizeInBytes, + 0, + false) + } + + numElements += 1 + modCount += 1 + } + + /** + * Creates an [[Iterator]] with the current rows in the array. If there are subsequent + * [[add()]] or [[clear()]] calls made on this array after creation of the iterator, + * then the iterator is invalidated thus saving clients from getting inconsistent data. 
+ */ + def generateIterator(): Iterator[UnsafeRow] = { + if (spillableArray == null) { + new InMemoryBufferIterator(inMemoryBuffer.iterator) + } else { + new SpillableArrayIterator(spillableArray.getIterator, numFieldPerRow) + } + } + + private[this] + abstract class ExternalAppendOnlyUnsafeRowArrayIterator extends Iterator[UnsafeRow] { + private val expectedModCount = modCount + + protected def checkForModification(): Unit = { + if (expectedModCount != modCount) { + throw new ConcurrentModificationException( + s"The backing ${classOf[ExternalAppendOnlyUnsafeRowArray].getName} has been modified " + + s"since the creation of this Iterator") + } + } + } + + private[this] class InMemoryBufferIterator(iterator: Iterator[UnsafeRow]) + extends ExternalAppendOnlyUnsafeRowArrayIterator { + + override def hasNext(): Boolean = { + checkForModification() + iterator.hasNext + } + + override def next(): UnsafeRow = { + checkForModification() + iterator.next() + } + } + + private[this] class SpillableArrayIterator(iterator: UnsafeSorterIterator, numFieldPerRow: Int) + extends ExternalAppendOnlyUnsafeRowArrayIterator { + + private val currentRow = new UnsafeRow(numFieldPerRow) + + override def hasNext(): Boolean = { + checkForModification() + iterator.hasNext + } + + override def next(): UnsafeRow = { + checkForModification() + iterator.loadNext() + currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength) + currentRow + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index ca9c0ed8cec32..d6a8604531fb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, RowIterator, SparkPlan} +import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, +ExternalAppendOnlyUnsafeRowArray, RowIterator, SparkPlan} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.util.collection.BitSet @@ -97,6 +98,11 @@ case class SortMergeJoinExec( protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") + val spillThreshold = + sqlContext.conf.getConfString( + "spark.sql.sortMergeJoinExec.buffer.spill.threshold", + Int.MaxValue.toString + ).toInt left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => val boundCondition: (InternalRow) => Boolean = { @@ -115,39 +121,39 @@ case class SortMergeJoinExec( case _: InnerLike => new RowIterator { private[this] var currentLeftRow: InternalRow = _ - private[this] var currentRightMatches: ArrayBuffer[InternalRow] = _ - private[this] var currentMatchIdx: Int = -1 + private[this] var currentRightMatches: ExternalAppendOnlyUnsafeRowArray = _ + private[this] var rightMatchesIterator: Iterator[UnsafeRow] = null private[this] val smjScanner = new SortMergeJoinScanner( createLeftKeyGenerator(), createRightKeyGenerator(), keyOrdering, RowIterator.fromScala(leftIter), - RowIterator.fromScala(rightIter) + RowIterator.fromScala(rightIter), + spillThreshold ) 
private[this] val joinRow = new JoinedRow if (smjScanner.findNextInnerJoinRows()) { currentRightMatches = smjScanner.getBufferedMatches currentLeftRow = smjScanner.getStreamedRow - currentMatchIdx = 0 + rightMatchesIterator = currentRightMatches.generateIterator() } override def advanceNext(): Boolean = { - while (currentMatchIdx >= 0) { - if (currentMatchIdx == currentRightMatches.length) { + while (rightMatchesIterator != null) { + if (!rightMatchesIterator.hasNext) { if (smjScanner.findNextInnerJoinRows()) { currentRightMatches = smjScanner.getBufferedMatches currentLeftRow = smjScanner.getStreamedRow - currentMatchIdx = 0 + rightMatchesIterator = currentRightMatches.generateIterator() } else { currentRightMatches = null currentLeftRow = null - currentMatchIdx = -1 + rightMatchesIterator = null return false } } - joinRow(currentLeftRow, currentRightMatches(currentMatchIdx)) - currentMatchIdx += 1 + joinRow(currentLeftRow, rightMatchesIterator.next()) if (boundCondition(joinRow)) { numOutputRows += 1 return true @@ -165,7 +171,8 @@ case class SortMergeJoinExec( bufferedKeyGenerator = createRightKeyGenerator(), keyOrdering, streamedIter = RowIterator.fromScala(leftIter), - bufferedIter = RowIterator.fromScala(rightIter) + bufferedIter = RowIterator.fromScala(rightIter), + spillThreshold ) val rightNullRow = new GenericInternalRow(right.output.length) new LeftOuterIterator( @@ -177,7 +184,8 @@ case class SortMergeJoinExec( bufferedKeyGenerator = createLeftKeyGenerator(), keyOrdering, streamedIter = RowIterator.fromScala(rightIter), - bufferedIter = RowIterator.fromScala(leftIter) + bufferedIter = RowIterator.fromScala(leftIter), + spillThreshold ) val leftNullRow = new GenericInternalRow(left.output.length) new RightOuterIterator( @@ -209,7 +217,8 @@ case class SortMergeJoinExec( createRightKeyGenerator(), keyOrdering, RowIterator.fromScala(leftIter), - RowIterator.fromScala(rightIter) + RowIterator.fromScala(rightIter), + spillThreshold ) private[this] val joinRow = new JoinedRow @@ -217,14 +226,15 @@ case class SortMergeJoinExec( while (smjScanner.findNextInnerJoinRows()) { val currentRightMatches = smjScanner.getBufferedMatches currentLeftRow = smjScanner.getStreamedRow - var i = 0 - while (i < currentRightMatches.length) { - joinRow(currentLeftRow, currentRightMatches(i)) - if (boundCondition(joinRow)) { - numOutputRows += 1 - return true + if (currentRightMatches != null && currentRightMatches.length > 0) { + val rightMatchesIterator = currentRightMatches.generateIterator() + while (rightMatchesIterator.hasNext) { + joinRow(currentLeftRow, rightMatchesIterator.next()) + if (boundCondition(joinRow)) { + numOutputRows += 1 + return true + } } - i += 1 } } false @@ -241,7 +251,8 @@ case class SortMergeJoinExec( createRightKeyGenerator(), keyOrdering, RowIterator.fromScala(leftIter), - RowIterator.fromScala(rightIter) + RowIterator.fromScala(rightIter), + spillThreshold ) private[this] val joinRow = new JoinedRow @@ -249,17 +260,16 @@ case class SortMergeJoinExec( while (smjScanner.findNextOuterJoinRows()) { currentLeftRow = smjScanner.getStreamedRow val currentRightMatches = smjScanner.getBufferedMatches - if (currentRightMatches == null) { + if (currentRightMatches == null || currentRightMatches.length == 0) { return true } - var i = 0 var found = false - while (!found && i < currentRightMatches.length) { - joinRow(currentLeftRow, currentRightMatches(i)) + val rightMatchesIterator = currentRightMatches.generateIterator() + while (!found && rightMatchesIterator.hasNext) { + 
joinRow(currentLeftRow, rightMatchesIterator.next()) if (boundCondition(joinRow)) { found = true } - i += 1 } if (!found) { numOutputRows += 1 @@ -281,7 +291,8 @@ case class SortMergeJoinExec( createRightKeyGenerator(), keyOrdering, RowIterator.fromScala(leftIter), - RowIterator.fromScala(rightIter) + RowIterator.fromScala(rightIter), + spillThreshold ) private[this] val joinRow = new JoinedRow @@ -290,14 +301,13 @@ case class SortMergeJoinExec( currentLeftRow = smjScanner.getStreamedRow val currentRightMatches = smjScanner.getBufferedMatches var found = false - if (currentRightMatches != null) { - var i = 0 - while (!found && i < currentRightMatches.length) { - joinRow(currentLeftRow, currentRightMatches(i)) + if (currentRightMatches != null && currentRightMatches.length > 0) { + val rightMatchesIterator = currentRightMatches.generateIterator() + while (!found && rightMatchesIterator.hasNext) { + joinRow(currentLeftRow, rightMatchesIterator.next()) if (boundCondition(joinRow)) { found = true } - i += 1 } } result.setBoolean(0, found) @@ -376,8 +386,15 @@ case class SortMergeJoinExec( // A list to hold all matched rows from right side. val matches = ctx.freshName("matches") - val clsName = classOf[java.util.ArrayList[InternalRow]].getName - ctx.addMutableState(clsName, matches, s"$matches = new $clsName();") + val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName + + val spillThreshold = + sqlContext.conf.getConfString( + "spark.sql.sortMergeJoinExec.buffer.spill.threshold", + Int.MaxValue.toString + ).toInt + + ctx.addMutableState(clsName, matches, s"$matches = new $clsName($spillThreshold);") // Copy the left keys as class members so they could be used in next function call. val matchedKeyVars = copyKeys(ctx, leftKeyVars) @@ -517,8 +534,7 @@ case class SortMergeJoinExec( val rightRow = ctx.freshName("rightRow") val rightVars = createRightVar(ctx, rightRow) - val size = ctx.freshName("size") - val i = ctx.freshName("i") + val iterator = ctx.freshName("iterator") val numOutput = metricTerm(ctx, "numOutputRows") val (beforeLoop, condCheck) = if (condition.isDefined) { // Split the code of creating variables based on whether it's used by condition or not. @@ -551,10 +567,10 @@ case class SortMergeJoinExec( s""" |while (findNextInnerJoinRows($leftInput, $rightInput)) { - | int $size = $matches.size(); | ${beforeLoop.trim} - | for (int $i = 0; $i < $size; $i ++) { - | InternalRow $rightRow = (InternalRow) $matches.get($i); + | scala.collection.Iterator $iterator = $matches.generateIterator(); + | while ($iterator.hasNext()) { + | InternalRow $rightRow = (InternalRow) $iterator.next(); | ${condCheck.trim} | $numOutput.add(1); | ${consume(ctx, leftVars ++ rightVars)} @@ -589,7 +605,8 @@ private[joins] class SortMergeJoinScanner( bufferedKeyGenerator: Projection, keyOrdering: Ordering[InternalRow], streamedIter: RowIterator, - bufferedIter: RowIterator) { + bufferedIter: RowIterator, + bufferThreshold: Int) { private[this] var streamedRow: InternalRow = _ private[this] var streamedRowKey: InternalRow = _ private[this] var bufferedRow: InternalRow = _ @@ -600,7 +617,7 @@ private[joins] class SortMergeJoinScanner( */ private[this] var matchJoinKey: InternalRow = _ /** Buffered rows from the buffered side of the join. This is empty if there are no matches. 
*/ - private[this] val bufferedMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow] + private[this] val bufferedMatches = new ExternalAppendOnlyUnsafeRowArray(bufferThreshold) // Initialization (note: do _not_ want to advance streamed here). advancedBufferedToRowWithNullFreeJoinKey() @@ -609,7 +626,7 @@ private[joins] class SortMergeJoinScanner( def getStreamedRow: InternalRow = streamedRow - def getBufferedMatches: ArrayBuffer[InternalRow] = bufferedMatches + def getBufferedMatches: ExternalAppendOnlyUnsafeRowArray = bufferedMatches /** * Advances both input iterators, stopping when we have found rows with matching join keys. @@ -755,7 +772,7 @@ private[joins] class SortMergeJoinScanner( matchJoinKey = streamedRowKey.copy() bufferedMatches.clear() do { - bufferedMatches += bufferedRow.copy() // need to copy mutable rows before buffering them + bufferedMatches.add(bufferedRow) advancedBufferedToRowWithNullFreeJoinKey() } while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0) } @@ -819,7 +836,7 @@ private abstract class OneSideOuterIterator( protected[this] val joinedRow: JoinedRow = new JoinedRow() // Index of the buffered rows, reset to 0 whenever we advance to a new streamed row - private[this] var bufferIndex: Int = 0 + private[this] var rightMatchesIterator: Iterator[UnsafeRow] = null // This iterator is initialized lazily so there should be no matches initially assert(smjScanner.getBufferedMatches.length == 0) @@ -833,7 +850,7 @@ private abstract class OneSideOuterIterator( * @return whether there are more rows in the stream to consume. */ private def advanceStream(): Boolean = { - bufferIndex = 0 + rightMatchesIterator = null if (smjScanner.findNextOuterJoinRows()) { setStreamSideOutput(smjScanner.getStreamedRow) if (smjScanner.getBufferedMatches.isEmpty) { @@ -858,10 +875,13 @@ private abstract class OneSideOuterIterator( */ private def advanceBufferUntilBoundConditionSatisfied(): Boolean = { var foundMatch: Boolean = false - while (!foundMatch && bufferIndex < smjScanner.getBufferedMatches.length) { - setBufferedSideOutput(smjScanner.getBufferedMatches(bufferIndex)) + if (rightMatchesIterator == null) { + rightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() + } + + while (!foundMatch && rightMatchesIterator.hasNext) { + setBufferedSideOutput(rightMatchesIterator.next()) foundMatch = boundCondition(joinedRow) - bufferIndex += 1 } foundMatch } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala index ee36c84251519..0bd13e7b83c14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala @@ -17,99 +17,33 @@ package org.apache.spark.sql.execution.window -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} - +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray /** - * The interface of row buffer for a partition. In absence of a buffer pool (with locking), the + * Represents row buffer for a partition. In absence of a buffer pool (with locking), the * row buffer is used to materialize a partition of rows since we need to repeatedly scan these * rows in window function processing. 
*/ -private[window] abstract class RowBuffer { - - /** Number of rows. */ - def size: Int - - /** Return next row in the buffer, null if no more left. */ - def next(): InternalRow - - /** Skip the next `n` rows. */ - def skip(n: Int): Unit - - /** Return a new RowBuffer that has the same rows. */ - def copy(): RowBuffer -} - -/** - * A row buffer based on ArrayBuffer (the number of rows is limited). - */ -private[window] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends RowBuffer { - - private[this] var cursor: Int = -1 - - /** Number of rows. */ - override def size: Int = buffer.length - - /** Return next row in the buffer, null if no more left. */ - override def next(): InternalRow = { - cursor += 1 - if (cursor < buffer.length) { - buffer(cursor) - } else { - null - } - } - - /** Skip the next `n` rows. */ - override def skip(n: Int): Unit = { - cursor += n - } - - /** Return a new RowBuffer that has the same rows. */ - override def copy(): RowBuffer = { - new ArrayRowBuffer(buffer) - } -} - -/** - * An external buffer of rows based on UnsafeExternalSorter. - */ -private[window] class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int) - extends RowBuffer { - - private[this] val iter: UnsafeSorterIterator = sorter.getIterator - - private[this] val currentRow = new UnsafeRow(numFields) +private[window] class RowBuffer(appendOnlyExternalArray: ExternalAppendOnlyUnsafeRowArray) { + val iterator: Iterator[UnsafeRow] = appendOnlyExternalArray.generateIterator() /** Number of rows. */ - override def size: Int = iter.getNumRecords() + def size: Int = appendOnlyExternalArray.length /** Return next row in the buffer, null if no more left. */ - override def next(): InternalRow = { - if (iter.hasNext) { - iter.loadNext() - currentRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength) - currentRow - } else { - null - } - } + def next(): InternalRow = if (iterator.hasNext) iterator.next() else null /** Skip the next `n` rows. */ - override def skip(n: Int): Unit = { + def skip(n: Int): Unit = { var i = 0 - while (i < n && iter.hasNext) { - iter.loadNext() + while (i < n && iterator.hasNext) { + iterator.next() i += 1 } } /** Return a new RowBuffer that has the same rows. 
*/ - override def copy(): RowBuffer = { - new ExternalRowBuffer(sorter, numFields) - } + def copy(): RowBuffer = new RowBuffer(appendOnlyExternalArray) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index 80b87d5ffa797..d6ee74af4ce09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -20,15 +20,13 @@ package org.apache.spark.sql.execution.window import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan, UnaryExecNode} import org.apache.spark.sql.types.IntegerType -import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter /** * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted) @@ -285,6 +283,9 @@ case class WindowExec( val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1) val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray + var spillThreshold = + sqlContext.conf.getConfString("spark.sql.windowExec.buffer.spill.threshold", "4096").toInt + // Start processing. child.execute().mapPartitions { stream => new Iterator[InternalRow] { @@ -310,10 +311,15 @@ case class WindowExec( fetchNextRow() // Manage the current partition. - val rows = ArrayBuffer.empty[UnsafeRow] val inputFields = child.output.length - var sorter: UnsafeExternalSorter = null var rowBuffer: RowBuffer = null + if (sqlContext == null) { + + } + + val buffer: ExternalAppendOnlyUnsafeRowArray = + new ExternalAppendOnlyUnsafeRowArray(spillThreshold) + val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType)) val frames = factories.map(_(windowFunctionResult)) val numFrames = frames.length @@ -323,48 +329,14 @@ case class WindowExec( val currentGroup = nextGroup.copy() // clear last partition - if (sorter != null) { - // the last sorter of this task will be cleaned up via task completion listener - sorter.cleanupResources() - sorter = null - } else { - rows.clear() - } + buffer.clear() while (nextRowAvailable && nextGroup == currentGroup) { - if (sorter == null) { - rows += nextRow.copy() - - if (rows.length >= 4096) { - // We will not sort the rows, so prefixComparator and recordComparator are null. 
- sorter = UnsafeExternalSorter.create( - TaskContext.get().taskMemoryManager(), - SparkEnv.get.blockManager, - SparkEnv.get.serializerManager, - TaskContext.get(), - null, - null, - 1024, - SparkEnv.get.memoryManager.pageSizeBytes, - SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", - UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD), - false) - rows.foreach { r => - sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false) - } - rows.clear() - } - } else { - sorter.insertRecord(nextRow.getBaseObject, nextRow.getBaseOffset, - nextRow.getSizeInBytes, 0, false) - } + buffer.add(nextRow) fetchNextRow() } - if (sorter != null) { - rowBuffer = new ExternalRowBuffer(sorter, inputFields) - } else { - rowBuffer = new ArrayRowBuffer(rows) - } + + rowBuffer = new RowBuffer(buffer) // Setup the frames. var i = 0 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 2e006735d123e..1a66aa85f5a02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import scala.collection.mutable.ListBuffer import scala.language.existentials import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -24,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext - +import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled} class JoinSuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -604,4 +605,137 @@ class JoinSuite extends QueryTest with SharedSQLContext { cartesianQueries.foreach(checkCartesianDetection) } + + test("test SortMergeJoin (without spill)") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1", + "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> Int.MaxValue.toString) { + + assertNotSpilled(sparkContext, "inner join") { + checkAnswer( + sql("SELECT * FROM testData JOIN testData2 ON key = a where key = 2"), + Row(2, "2", 2, 1) :: Row(2, "2", 2, 2) :: Nil + ) + } + + val expected = new ListBuffer[Row]() + expected.append( + Row(1, "1", 1, 1), Row(1, "1", 1, 2), + Row(2, "2", 2, 1), Row(2, "2", 2, 2), + Row(3, "3", 3, 1), Row(3, "3", 3, 2) + ) + for (i <- 4 to 100) { + expected.append(Row(i, i.toString, null, null)) + } + + assertNotSpilled(sparkContext, "left outer join") { + checkAnswer( + sql( + """ + |SELECT + | big.key, big.value, small.a, small.b + |FROM + | testData big + |LEFT OUTER JOIN + | testData2 small + |ON + | big.key = small.a + """.stripMargin), + expected + ) + } + + assertNotSpilled(sparkContext, "right outer join") { + checkAnswer( + sql( + """ + |SELECT + | big.key, big.value, small.a, small.b + |FROM + | testData2 small + |RIGHT OUTER JOIN + | testData big + |ON + | big.key = small.a + """.stripMargin), + expected + ) + } + } + } + + test("test SortMergeJoin (with spill)") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1", + "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "0") { + + assertSpilled(sparkContext, "inner join") { + checkAnswer( + sql("SELECT * FROM testData JOIN testData2 ON key = a where key = 2"), + Row(2, "2", 2, 1) :: Row(2, "2", 2, 2) :: Nil + ) + } + + val expected = new ListBuffer[Row]() + expected.append( + Row(1, "1", 1, 1), Row(1, "1", 1, 2), + Row(2, "2", 
2, 1), Row(2, "2", 2, 2), + Row(3, "3", 3, 1), Row(3, "3", 3, 2) + ) + for (i <- 4 to 100) { + expected.append(Row(i, i.toString, null, null)) + } + + assertSpilled(sparkContext, "left outer join") { + checkAnswer( + sql( + """ + |SELECT + | big.key, big.value, small.a, small.b + |FROM + | testData big + |LEFT OUTER JOIN + | testData2 small + |ON + | big.key = small.a + """.stripMargin), + expected + ) + } + + assertSpilled(sparkContext, "right outer join") { + checkAnswer( + sql( + """ + |SELECT + | big.key, big.value, small.a, small.b + |FROM + | testData2 small + |RIGHT OUTER JOIN + | testData big + |ON + | big.key = small.a + """.stripMargin), + expected + ) + } + + // FULL OUTER JOIN still does not use [[ExternalAppendOnlyUnsafeRowArray]] + // so should not cause any spill + assertNotSpilled(sparkContext, "full outer join") { + checkAnswer( + sql( + """ + |SELECT + | big.key, big.value, small.a, small.b + |FROM + | testData2 small + |FULL OUTER JOIN + | testData big + |ON + | big.key = small.a + """.stripMargin), + expected + ) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala new file mode 100644 index 0000000000000..b44a917d660e4 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark._ +import org.apache.spark.memory.MemoryTestingUtils +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext { + private val random = new java.util.Random() + + private def createSparkConf(): SparkConf = { + val conf = new SparkConf(false) + // Make the Java serializer write a reset instruction (TC_RESET) after each object to test + // for a bug we had with bytes written past the last object in a batch (SPARK-2792) + conf.set("spark.serializer.objectStreamReset", "1") + conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + conf + } + + private def withExternalArray(spillThreshold: Int) + (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = { + sc = new SparkContext("local", "test", createSparkConf()) + val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) + TaskContext.setTaskContext(taskContext) + val array = new ExternalAppendOnlyUnsafeRowArray(spillThreshold) + + try f(array) finally { + array.clear() + sc.stop() + } + } + + private def insertRow(array: ExternalAppendOnlyUnsafeRowArray): Long = { + val valueInserted = random.nextLong() + + val row = new UnsafeRow(1) + row.pointTo(new Array[Byte](64), 16) + row.setLong(0, valueInserted) + array.add(row) + valueInserted + } + + private def checkIfValueExits(iterator: Iterator[UnsafeRow], expectedValue: Long): Unit = { + assert(iterator.hasNext) + val actualRow = iterator.next() + assert(actualRow.getLong(0) == expectedValue) + assert(actualRow.getSizeInBytes == 16) + } + + test("insert rows less than the spillThreshold") { + val spillThreshold = 100 + withExternalArray(spillThreshold) { array => + assert(array.isEmpty) + + val expectedValues = new ArrayBuffer[Long] + expectedValues.append(insertRow(array)) + assert(!array.isEmpty) + assert(array.length == 1) + + val iterator1 = array.generateIterator() + checkIfValueExits(iterator1, expectedValues.head) + assert(array.length == 1) + assert(!iterator1.hasNext) + + // Add more rows (but not too many to trigger switch to [[UnsafeExternalSorter]]) + while (expectedValues.length < spillThreshold) { + expectedValues.append(insertRow(array)) + } + assert(array.length == spillThreshold) + + // Verify that NO spill has happened + assert(TaskContext.get().taskMetrics().memoryBytesSpilled == 0) + + val iterator2 = array.generateIterator() + for (expectedValue <- expectedValues) { + checkIfValueExits(iterator2, expectedValue) + } + + intercept[ConcurrentModificationException](iterator1.hasNext) + assert(!iterator2.hasNext) + } + } + + test("insert rows more than the spillThreshold to force spill") { + val spillThreshold = 100 + withExternalArray(spillThreshold) { array => + assert(array.isEmpty) + + val numValuesInserted = 20 * spillThreshold + val expectedValues = new ArrayBuffer[Long] + + expectedValues.append(insertRow(array)) + assert(!array.isEmpty) + assert(array.length == 1) + + val iterator1 = array.generateIterator() + assert(iterator1.hasNext) + checkIfValueExits(iterator1, expectedValues.head) + assert(!iterator1.hasNext) + + while (expectedValues.length < numValuesInserted) { + expectedValues.append(insertRow(array)) + } + assert(array.length == numValuesInserted) + + // Verify that spill has happened + assert(TaskContext.get().taskMetrics().memoryBytesSpilled > 0) + + val iterator2 = 
array.generateIterator() + for (value <- expectedValues) { + checkIfValueExits(iterator2, value) + } + + assert(!iterator2.hasNext) + intercept[ConcurrentModificationException](iterator1.hasNext) + intercept[ConcurrentModificationException](iterator1.next()) + } + } + + test("iterator on an empty array should be empty") { + withExternalArray(spillThreshold = 10) { array => + val iterator = array.generateIterator() + assert(array.isEmpty) + assert(array.length == 0) + assert(!iterator.hasNext) + } + } + + test("test iterator invalidation (without spill)") { + withExternalArray(spillThreshold = 10) { array => + // insert 2 rows, iterate until the first row + insertRow(array) + insertRow(array) + + var iterator = array.generateIterator() + assert(iterator.hasNext) + iterator.next() + + // Adding more row(s) should invalidate any old iterators + insertRow(array) + intercept[ConcurrentModificationException](iterator.hasNext) + intercept[ConcurrentModificationException](iterator.next()) + + // Clearing the array should also invalidate any old iterators + iterator = array.generateIterator() + assert(iterator.hasNext) + iterator.next() + + array.clear() + intercept[ConcurrentModificationException](iterator.hasNext) + intercept[ConcurrentModificationException](iterator.next()) + } + } + + test("test iterator invalidation (with spill)") { + val spillThreshold = 10 + withExternalArray(spillThreshold) { array => + for (_ <- 0 until (spillThreshold * 2)) { + insertRow(array) + } + // Verify that spill has happened + assert(TaskContext.get().taskMetrics().memoryBytesSpilled > 0) + + var iterator = array.generateIterator() + assert(iterator.hasNext) + iterator.next() + + // Adding more row(s) should invalidate any old iterators + insertRow(array) + intercept[ConcurrentModificationException](iterator.hasNext) + intercept[ConcurrentModificationException](iterator.next()) + + // Clearing the array should also invalidate any old iterators + iterator = array.generateIterator() + assert(iterator.hasNext) + iterator.next() + + array.clear() + intercept[ConcurrentModificationException](iterator.hasNext) + intercept[ConcurrentModificationException](iterator.next()) + } + } + + test("clear on an empty the array") { + withExternalArray(spillThreshold = 2) { array => + val iterator = array.generateIterator() + assert(!iterator.hasNext) + + // multiple clear'ing should not have an side-effect + array.clear() + array.clear() + array.clear() + assert(array.isEmpty) + assert(array.length == 0) + + // Clearing an empty array should also invalidate any old iterators + intercept[ConcurrentModificationException](iterator.hasNext) + intercept[ConcurrentModificationException](iterator.next()) + } + } + + test("clear array (without spill)") { + val spillThreshold = 10 + withExternalArray(spillThreshold) { array => + for (_ <- 0 until (spillThreshold / 2)) { + insertRow(array) + } + // Verify that NO spill has happened + assert(TaskContext.get().taskMetrics().memoryBytesSpilled == 0) + + // Clear the array + array.clear() + assert(array.isEmpty) + assert(array.length == 0) + + // Re-populate few rows so that there is no spill + val expectedValues = new ArrayBuffer[Long] + for (_ <- 0 until (spillThreshold / 3)) { + expectedValues.append(insertRow(array)) + } + // Verify the data. Verify that there was no spill + assert(TaskContext.get().taskMetrics().memoryBytesSpilled == 0) + var iterator = array.generateIterator() + for (value <- expectedValues) { + checkIfValueExits(iterator, value) + } + + // Populate more rows .. 
enough to not trigger a spill + for (_ <- 0 until (spillThreshold / 3)) { + expectedValues.append(insertRow(array)) + } + // Verify the data. Verify that there was no spill + assert(TaskContext.get().taskMetrics().memoryBytesSpilled == 0) + iterator = array.generateIterator() + for (value <- expectedValues) { + checkIfValueExits(iterator, value) + } + } + } + + test("clear array (with spill)") { + val spillThreshold = 10 + withExternalArray(spillThreshold) { array => + for (_ <- 0 until (spillThreshold * 2)) { + insertRow(array) + } + // Verify that spill has happened + val bytesSpilled = TaskContext.get().taskMetrics().memoryBytesSpilled + assert(bytesSpilled > 0) + + // Clear the array + array.clear() + assert(array.isEmpty) + assert(array.length == 0) + + // Re-populate the array ... but not upto the point that there is spill + val expectedValues = new ArrayBuffer[Long] + for (_ <- 0 until (spillThreshold / 2)) { + expectedValues.append(insertRow(array)) + } + // Verify the data. Verify that there was no "extra" spill + assert(TaskContext.get().taskMetrics().memoryBytesSpilled == bytesSpilled) + var iterator = array.generateIterator() + for (value <- expectedValues) { + checkIfValueExits(iterator, value) + } + + // Populate more rows to trigger spill + for (_ <- 0 until (spillThreshold * 2)) { + expectedValues.append(insertRow(array)) + } + // Verify the data. Verify that there was "extra" spill + assert(TaskContext.get().taskMetrics().memoryBytesSpilled > bytesSpilled) + iterator = array.generateIterator() + for (value <- expectedValues) { + checkIfValueExits(iterator, value) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index afd47897ed4b2..52e4f047225de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.TestUtils.assertSpilled case class WindowData(month: Int, area: String, product: Int) @@ -412,4 +413,36 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext { """.stripMargin), Row(1, 3, null) :: Row(2, null, 4) :: Nil) } + + test("test with low buffer spill threshold") { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + + val expected = + Row(1, 1, 1) :: + Row(0, 2, 3) :: + Row(1, 3, 6) :: + Row(0, 4, 10) :: + Row(1, 5, 15) :: + Row(0, 6, 21) :: + Row(1, 7, 28) :: + Row(0, 8, 36) :: + Row(1, 9, 45) :: + Row(0, 10, 55) :: Nil + + val actual = sql( + """ + |SELECT y, x, sum(x) OVER w1 AS running_sum + |FROM nums + |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDiNG AND CURRENT RoW) + """.stripMargin) + + withSQLConf("spark.sql.windowExec.buffer.spill.threshold" -> "1") { + assertSpilled(sparkContext, "test with low buffer spill threshold") { + checkAnswer(actual, expected) + } + } + + spark.catalog.dropTempView("nums") + } } From 22a5eca02868206268dcb583893a0f6ac98fb63e Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Mon, 13 Feb 2017 06:56:40 -0800 Subject: [PATCH 2/9] round #1 review comments --- .../apache/spark/sql/internal/SQLConf.scala | 17 ++ .../ExternalAppendOnlyUnsafeRowArray.scala | 107 +++++--- 
.../execution/joins/SortMergeJoinExec.scala | 21 +- .../sql/execution/window/RowBuffer.scala | 49 ---- .../sql/execution/window/WindowExec.scala | 30 +-- .../window/WindowFunctionFrame.scala | 93 ++++--- ...nalAppendOnlyUnsafeRowArrayBenchmark.scala | 150 +++++++++++ ...xternalAppendOnlyUnsafeRowArraySuite.scala | 240 +++++++++++------- 8 files changed, 465 insertions(+), 242 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8f65672d5a839..a46a98b50b960 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -715,6 +715,18 @@ object SQLConf { .stringConf .createWithDefault(TimeZone.getDefault().getID()) + val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD = + buildConf("spark.sql.windowExec.buffer.spill.threshold") + .doc("Threshold for number of rows buffered in window operator") + .intConf + .createWithDefault(4096) + + val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD = + buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold") + .doc("Threshold for number of rows buffered in sort merge join operator") + .intConf + .createWithDefault(Int.MaxValue) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -945,6 +957,11 @@ class SQLConf extends Serializable with Logging { def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD) + def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD) + + def sortMergeJoinExecBufferSpillThreshold: Int = + getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD) + def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index 0aa055ef324f4..9ad16cf5bd21c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -23,13 +23,13 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} /** - * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient - * space for it to grow. + * An append-only array for [[UnsafeRow]]s that spills content to disk when there a predefined + * threshold of rows is reached. * * Setting spill threshold faces following trade-off: * @@ -40,20 +40,23 @@ import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, Unsaf * [[ArrayBuffer]] or [[Array]]. 
*/ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging { - private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow] + private val initialSizeOfInMemoryBuffer = + Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold) - private var spillableArray: UnsafeExternalSorter = null - private var numElements = 0 + private val inMemoryBuffer = new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer) - // A counter to keep track of total additions made to this array since its creation. + private var spillableArray: UnsafeExternalSorter = _ + private var numRows = 0 + + // A counter to keep track of total modifications done to this array since its creation. // This helps to invalidate iterators when there are changes done to the backing array. - private var modCount: Long = 0 + private var modificationsCount: Long = 0 - private var numFieldPerRow = 0 + private var numFieldsPerRow = 0 - def length: Int = numElements + def length: Int = numRows - def isEmpty: Boolean = numElements == 0 + def isEmpty: Boolean = numRows == 0 /** * Clears up resources (eg. memory) held by the backing storage @@ -67,20 +70,18 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) } else { inMemoryBuffer.clear() } - numFieldPerRow = 0 - numElements = 0 - modCount += 1 + numFieldsPerRow = 0 + numRows = 0 + modificationsCount += 1 } - def add(entry: InternalRow): Unit = { - val unsafeRow = entry.asInstanceOf[UnsafeRow] - - if (numElements < numRowsSpillThreshold) { + def add(unsafeRow: UnsafeRow): Unit = { + if (numRows < numRowsSpillThreshold) { inMemoryBuffer += unsafeRow.copy() } else { if (spillableArray == null) { logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " + - s"${classOf[org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray].getName}") + s"${classOf[UnsafeExternalSorter].getName}") // We will not sort the rows, so prefixComparator and recordComparator are null spillableArray = UnsafeExternalSorter.create( @@ -104,7 +105,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) false) ) inMemoryBuffer.clear() - numFieldPerRow = unsafeRow.numFields() + numFieldsPerRow = unsafeRow.numFields() } spillableArray.insertRecord( @@ -115,29 +116,39 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) false) } - numElements += 1 - modCount += 1 + numRows += 1 + modificationsCount += 1 } /** - * Creates an [[Iterator]] with the current rows in the array. If there are subsequent - * [[add()]] or [[clear()]] calls made on this array after creation of the iterator, - * then the iterator is invalidated thus saving clients from getting inconsistent data. + * Creates an [[Iterator]] for the current rows in the array starting from a user provided index + * + * If there are subsequent [[add()]] or [[clear()]] calls made on this array after creation of + * the iterator, then the iterator is invalidated thus saving clients from thinking that they + * have read all the data while there were new rows added to this array. */ - def generateIterator(): Iterator[UnsafeRow] = { + def generateIterator(startIndex: Int): Iterator[UnsafeRow] = { + if (startIndex < 0 || (numRows > 0 && startIndex > numRows)) { + throw new ArrayIndexOutOfBoundsException( + "Invalid `startIndex` provided for generating iterator over the array. 
" + + s"Total elements: $numRows, requested `startIndex`: $startIndex") + } + if (spillableArray == null) { - new InMemoryBufferIterator(inMemoryBuffer.iterator) + new InMemoryBufferIterator(startIndex) } else { - new SpillableArrayIterator(spillableArray.getIterator, numFieldPerRow) + new SpillableArrayIterator(spillableArray.getIterator, numFieldsPerRow, startIndex) } } + def generateIterator(): Iterator[UnsafeRow] = generateIterator(startIndex = 0) + private[this] abstract class ExternalAppendOnlyUnsafeRowArrayIterator extends Iterator[UnsafeRow] { - private val expectedModCount = modCount + private val expectedModificationsCount = modificationsCount protected def checkForModification(): Unit = { - if (expectedModCount != modCount) { + if (expectedModificationsCount != modificationsCount) { throw new ConcurrentModificationException( s"The backing ${classOf[ExternalAppendOnlyUnsafeRowArray].getName} has been modified " + s"since the creation of this Iterator") @@ -145,25 +156,49 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) } } - private[this] class InMemoryBufferIterator(iterator: Iterator[UnsafeRow]) + private[this] class InMemoryBufferIterator(startIndex: Int) extends ExternalAppendOnlyUnsafeRowArrayIterator { + private var currentIndex = startIndex + override def hasNext(): Boolean = { checkForModification() - iterator.hasNext + currentIndex < numRows } override def next(): UnsafeRow = { checkForModification() - iterator.next() + val result = inMemoryBuffer(currentIndex) + currentIndex += 1 + result } } - private[this] class SpillableArrayIterator(iterator: UnsafeSorterIterator, numFieldPerRow: Int) + private[this] class SpillableArrayIterator( + iterator: UnsafeSorterIterator, + numFieldPerRow: Int, + startIndex: Int) extends ExternalAppendOnlyUnsafeRowArrayIterator { private val currentRow = new UnsafeRow(numFieldPerRow) + def init(): Unit = { + var i = 0 + while (i < startIndex) { + if (iterator.hasNext) { + iterator.loadNext() + } else { + throw new ArrayIndexOutOfBoundsException( + "Invalid `startIndex` provided for generating iterator over the array. 
" + + s"Total elements: $numRows, requested `startIndex`: $startIndex") + } + i += 1 + } + } + + // Traverse upto the given [[startIndex]] + init() + override def hasNext(): Boolean = { checkForModification() iterator.hasNext @@ -177,3 +212,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) } } } + +private[sql] object ExternalAppendOnlyUnsafeRowArray { + private val DefaultInitialSizeOfInMemoryBuffer = 128 +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index d6a8604531fb0..bcdc4dcdf7d99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -96,14 +96,13 @@ case class SortMergeJoinExec( private def createRightKeyGenerator(): Projection = UnsafeProjection.create(rightKeys, right.output) + private def getSpillThreshold: Int = { + sqlContext.conf.sortMergeJoinExecBufferSpillThreshold + } + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") - val spillThreshold = - sqlContext.conf.getConfString( - "spark.sql.sortMergeJoinExec.buffer.spill.threshold", - Int.MaxValue.toString - ).toInt - + val spillThreshold = getSpillThreshold left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => val boundCondition: (InternalRow) => Boolean = { condition.map { cond => @@ -388,11 +387,7 @@ case class SortMergeJoinExec( val matches = ctx.freshName("matches") val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName - val spillThreshold = - sqlContext.conf.getConfString( - "spark.sql.sortMergeJoinExec.buffer.spill.threshold", - Int.MaxValue.toString - ).toInt + val spillThreshold = getSpillThreshold ctx.addMutableState(clsName, matches, s"$matches = new $clsName($spillThreshold);") // Copy the left keys as class members so they could be used in next function call. @@ -445,7 +440,7 @@ case class SortMergeJoinExec( | } | $leftRow = null; | } else { - | $matches.add($rightRow.copy()); + | $matches.add((UnsafeRow) $rightRow); | $rightRow = null;; | } | } while ($leftRow != null); @@ -772,7 +767,7 @@ private[joins] class SortMergeJoinScanner( matchJoinKey = streamedRowKey.copy() bufferedMatches.clear() do { - bufferedMatches.add(bufferedRow) + bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow]) advancedBufferedToRowWithNullFreeJoinKey() } while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala deleted file mode 100644 index 0bd13e7b83c14..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.window - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray - -/** - * Represents row buffer for a partition. In absence of a buffer pool (with locking), the - * row buffer is used to materialize a partition of rows since we need to repeatedly scan these - * rows in window function processing. - */ -private[window] class RowBuffer(appendOnlyExternalArray: ExternalAppendOnlyUnsafeRowArray) { - val iterator: Iterator[UnsafeRow] = appendOnlyExternalArray.generateIterator() - - /** Number of rows. */ - def size: Int = appendOnlyExternalArray.length - - /** Return next row in the buffer, null if no more left. */ - def next(): InternalRow = if (iterator.hasNext) iterator.next() else null - - /** Skip the next `n` rows. */ - def skip(n: Int): Unit = { - var i = 0 - while (i < n && iterator.hasNext) { - iterator.next() - i += 1 - } - } - - /** Return a new RowBuffer that has the same rows. */ - def copy(): RowBuffer = new RowBuffer(appendOnlyExternalArray) -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index d6ee74af4ce09..950a6794a74a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -282,9 +282,7 @@ case class WindowExec( // Unwrap the expressions and factories from the map. val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1) val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray - - var spillThreshold = - sqlContext.conf.getConfString("spark.sql.windowExec.buffer.spill.threshold", "4096").toInt + val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold // Start processing. child.execute().mapPartitions { stream => @@ -312,13 +310,10 @@ case class WindowExec( // Manage the current partition. val inputFields = child.output.length - var rowBuffer: RowBuffer = null - if (sqlContext == null) { - - } val buffer: ExternalAppendOnlyUnsafeRowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold) + var bufferIterator: Iterator[UnsafeRow] = _ val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType)) val frames = factories.map(_(windowFunctionResult)) @@ -336,37 +331,36 @@ case class WindowExec( fetchNextRow() } - rowBuffer = new RowBuffer(buffer) - // Setup the frames. 
var i = 0 while (i < numFrames) { - frames(i).prepare(rowBuffer.copy()) + frames(i).prepare(buffer) i += 1 } // Setup iteration rowIndex = 0 - rowsSize = rowBuffer.size + bufferIterator = buffer.generateIterator() } // Iteration var rowIndex = 0 - var rowsSize = 0L - override final def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable + override final def hasNext: Boolean = + (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable val join = new JoinedRow override final def next(): InternalRow = { // Load the next partition if we need to. - if (rowIndex >= rowsSize && nextRowAvailable) { + if ((bufferIterator == null || !bufferIterator.hasNext) && nextRowAvailable) { fetchNextPartition() } - if (rowIndex < rowsSize) { + if (bufferIterator.hasNext) { + val current = bufferIterator.next() + // Get the results for the window frames. var i = 0 - val current = rowBuffer.next() while (i < numFrames) { frames(i).write(rowIndex, current) i += 1 @@ -378,7 +372,9 @@ case class WindowExec( // Return the projection. result(join) - } else throw new NoSuchElementException + } else { + throw new NoSuchElementException + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala index 70efc0f78ddb0..412d337a62308 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala @@ -22,6 +22,7 @@ import java.util import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray /** @@ -35,7 +36,7 @@ private[window] abstract class WindowFunctionFrame { * * @param rows to calculate the frame results for. */ - def prepare(rows: RowBuffer): Unit + def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit /** * Write the current results to the target row. @@ -65,7 +66,12 @@ private[window] final class OffsetWindowFunctionFrame( extends WindowFunctionFrame { /** Rows of the partition currently being processed. */ - private[this] var input: RowBuffer = null + private[this] var input: ExternalAppendOnlyUnsafeRowArray = null + + /** + * An iterator over the [[input]] + */ + private[this] var inputIterator: Iterator[UnsafeRow] = _ /** Index of the input row currently used for output. */ private[this] var inputIndex = 0 @@ -103,20 +109,21 @@ private[window] final class OffsetWindowFunctionFrame( newMutableProjection(boundExpressions, Nil).target(target) } - override def prepare(rows: RowBuffer): Unit = { + override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { input = rows + inputIterator = input.generateIterator() // drain the first few rows if offset is larger than zero inputIndex = 0 while (inputIndex < offset) { - input.next() + if (inputIterator.hasNext) inputIterator.next() inputIndex += 1 } inputIndex = offset } override def write(index: Int, current: InternalRow): Unit = { - if (inputIndex >= 0 && inputIndex < input.size) { - val r = input.next() + if (inputIndex >= 0 && inputIndex < input.length) { + val r = if (inputIterator.hasNext) inputIterator.next() else null projection(r) } else { // Use default values since the offset row does not exist. 
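
The frame rewrites above replace RowBuffer with an ExternalAppendOnlyUnsafeRowArray and repeatedly use the pattern `if (inputIterator.hasNext) inputIterator.next() else null` to step through a partition. A minimal sketch of that consumption pattern follows; it is illustrative only and not part of the patch, and the `WindowBufferConsumptionSketch` object, `consume` method and its row-processing body are hypothetical.

// Illustrative sketch only, not part of the patch: sequential consumption of an
// ExternalAppendOnlyUnsafeRowArray, treating iterator exhaustion as a null row.
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray

object WindowBufferConsumptionSketch {
  private def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow =
    if (iterator.hasNext) iterator.next() else null

  def consume(buffer: ExternalAppendOnlyUnsafeRowArray): Unit = {
    val iterator = buffer.generateIterator()
    var row = getNextOrNull(iterator)
    while (row != null) {
      // Process `row` here; rows come back in insertion order whether or not the buffer
      // spilled to disk, so a frame never needs to know which backing store is in use.
      row = getNextOrNull(iterator)
    }
  }
}
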
@@ -143,7 +150,12 @@ private[window] final class SlidingWindowFunctionFrame( extends WindowFunctionFrame { /** Rows of the partition currently being processed. */ - private[this] var input: RowBuffer = null + private[this] var input: ExternalAppendOnlyUnsafeRowArray = null + + /** + * An iterator over the [[input]] + */ + private[this] var inputIterator: Iterator[UnsafeRow] = _ /** The next row from `input`. */ private[this] var nextRow: InternalRow = null @@ -164,9 +176,12 @@ private[window] final class SlidingWindowFunctionFrame( private[this] var inputLowIndex = 0 /** Prepare the frame for calculating a new partition. Reset all variables. */ - override def prepare(rows: RowBuffer): Unit = { + override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { input = rows - nextRow = rows.next() + inputIterator = input.generateIterator() + if (inputIterator.hasNext) { + nextRow = inputIterator.next() + } inputHighIndex = 0 inputLowIndex = 0 buffer.clear() @@ -180,7 +195,7 @@ private[window] final class SlidingWindowFunctionFrame( // the output row upper bound. while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) { buffer.add(nextRow.copy()) - nextRow = input.next() + nextRow = if (inputIterator.hasNext) inputIterator.next() else null inputHighIndex += 1 bufferUpdated = true } @@ -195,7 +210,7 @@ private[window] final class SlidingWindowFunctionFrame( // Only recalculate and update when the buffer changes. if (bufferUpdated) { - processor.initialize(input.size) + processor.initialize(input.length) val iter = buffer.iterator() while (iter.hasNext) { processor.update(iter.next()) @@ -222,13 +237,12 @@ private[window] final class UnboundedWindowFunctionFrame( extends WindowFunctionFrame { /** Prepare the frame for calculating a new partition. Process all rows eagerly. */ - override def prepare(rows: RowBuffer): Unit = { - val size = rows.size - processor.initialize(size) - var i = 0 - while (i < size) { - processor.update(rows.next()) - i += 1 + override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { + processor.initialize(rows.length) + + val iterator = rows.generateIterator() + while (iterator.hasNext) { + processor.update(iterator.next()) } } @@ -261,7 +275,12 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame( extends WindowFunctionFrame { /** Rows of the partition currently being processed. */ - private[this] var input: RowBuffer = null + private[this] var input: ExternalAppendOnlyUnsafeRowArray = null + + /** + * An iterator over the [[input]] + */ + private[this] var inputIterator: Iterator[UnsafeRow] = _ /** The next row from `input`. */ private[this] var nextRow: InternalRow = null @@ -273,11 +292,15 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame( private[this] var inputIndex = 0 /** Prepare the frame for calculating a new partition. */ - override def prepare(rows: RowBuffer): Unit = { + override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { input = rows - nextRow = rows.next() inputIndex = 0 - processor.initialize(input.size) + inputIterator = input.generateIterator() + if (inputIterator.hasNext) { + nextRow = inputIterator.next() + } + + processor.initialize(input.length) } /** Write the frame columns for the current row to the given target row. */ @@ -288,7 +311,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame( // the output row upper bound. 
while (nextRow != null && ubound.compare(nextRow, inputIndex, current, index) <= 0) { processor.update(nextRow) - nextRow = input.next() + nextRow = if (inputIterator.hasNext) inputIterator.next() else null inputIndex += 1 bufferUpdated = true } @@ -323,7 +346,7 @@ private[window] final class UnboundedFollowingWindowFunctionFrame( extends WindowFunctionFrame { /** Rows of the partition currently being processed. */ - private[this] var input: RowBuffer = null + private[this] var input: ExternalAppendOnlyUnsafeRowArray = null /** * Index of the first input row with a value equal to or greater than the lower bound of the @@ -332,7 +355,7 @@ private[window] final class UnboundedFollowingWindowFunctionFrame( private[this] var inputIndex = 0 /** Prepare the frame for calculating a new partition. */ - override def prepare(rows: RowBuffer): Unit = { + override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { input = rows inputIndex = 0 } @@ -341,25 +364,27 @@ private[window] final class UnboundedFollowingWindowFunctionFrame( override def write(index: Int, current: InternalRow): Unit = { var bufferUpdated = index == 0 - // Duplicate the input to have a new iterator - val tmp = input.copy() - - // Drop all rows from the buffer for which the input row value is smaller than + // Ignore all the rows from the buffer for which the input row value is smaller than // the output row lower bound. - tmp.skip(inputIndex) - var nextRow = tmp.next() + val iterator = input.generateIterator(startIndex = inputIndex) + + def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = { + if (iterator.hasNext) iterator.next() else null + } + + var nextRow = getNextOrNull(iterator) while (nextRow != null && lbound.compare(nextRow, inputIndex, current, index) < 0) { - nextRow = tmp.next() inputIndex += 1 bufferUpdated = true + nextRow = getNextOrNull(iterator) } // Only recalculate and update when the buffer changes. if (bufferUpdated) { - processor.initialize(input.size) + processor.initialize(input.length) while (nextRow != null) { processor.update(nextRow) - nextRow = tmp.next() + nextRow = getNextOrNull(iterator) } processor.evaluate(target) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala new file mode 100644 index 0000000000000..26f54615d6d0e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.memory.MemoryTestingUtils +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.util.Benchmark + +object ExternalAppendOnlyUnsafeRowArrayBenchmark { + + def test(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = { + val random = new java.util.Random() + val rows = (1 to numRows).map(_ => { + val row = new UnsafeRow(1) + row.pointTo(new Array[Byte](64), 16) + row.setLong(0, random.nextLong()) + row + }) + + val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows) + + benchmark.addCase("ArrayBuffer version") { _: Int => + var sum = 0L + for (_ <- 0L until iterations) { + val array = new ArrayBuffer[UnsafeRow]() + rows.foreach(array.append(_)) + + var i = 0 + val n = array.length + while (i < n) { + if (i < n) { + sum = sum + array(i).getLong(0) + i += 1 + } + } + array.clear() + } + } + + benchmark.addCase("ExternalAppendOnlyUnsafeRowArray version") { _: Int => + var sum = 0L + for (_ <- 0L until iterations) { + val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold) + rows.foreach(array.add) + + val iterator = array.generateIterator() + while (iterator.hasNext) { + sum = sum + iterator.next().getLong(0) + } + + array.clear() + } + } + + val conf = new SparkConf(false) + // Make the Java serializer write a reset instruction (TC_RESET) after each object to test + // for a bug we had with bytes written past the last object in a batch (SPARK-2792) + conf.set("spark.serializer.objectStreamReset", "1") + conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + + val sc = new SparkContext("local", "test", conf) + val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) + TaskContext.setTaskContext(taskContext) + benchmark.run() + sc.stop() + } + + def main(args: Array[String]): Unit = { + + // ----------------------------------------------------------------------------------------- // + // WITHOUT SPILL + // ----------------------------------------------------------------------------------------- // + + var spillThreshold = 100 * 1000 + + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + + Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + ArrayBuffer version 1969 / 2004 33.3 30.0 1.0X + ExternalAppendOnlyUnsafeRowArray version 2184 / 2233 30.0 33.3 0.9X + */ + test(spillThreshold, 1000, 1 << 16) + + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + + Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + ArrayBuffer version 2442 / 2465 50.3 19.9 1.0X + ExternalAppendOnlyUnsafeRowArray version 5015 / 5039 24.5 40.8 0.5X + */ + test(spillThreshold, 30 * 1000, 1 << 12) + + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + + Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + ArrayBuffer version 4492 / 4575 22.8 43.9 1.0X + ExternalAppendOnlyUnsafeRowArray version 6654 / 6656 15.4 65.0 0.7X + */ + test(spillThreshold, 100 * 1000, 1 << 10) + + // ----------------------------------------------------------------------------------------- // + // WITH 
SPILL + // ----------------------------------------------------------------------------------------- // + + spillThreshold = 100 + + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + + Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + ArrayBuffer version 4 / 5 61.2 16.3 1.0X + ExternalAppendOnlyUnsafeRowArray version 2331 / 2356 0.1 9106.1 0.0X + */ + test(spillThreshold, 1000, 1 << 8) + + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + + Array with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + ArrayBuffer version 3 / 4 57.6 17.3 1.0X + ExternalAppendOnlyUnsafeRowArray version 2229 / 2310 0.1 13928.8 0.0X + */ + test(spillThreshold, 10 * 1000, 1 << 4) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala index b44a917d660e4..4e38349f31820 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala @@ -60,41 +60,72 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar valueInserted } - private def checkIfValueExits(iterator: Iterator[UnsafeRow], expectedValue: Long): Unit = { + private def checkIfValueExists(iterator: Iterator[UnsafeRow], expectedValue: Long): Unit = { assert(iterator.hasNext) val actualRow = iterator.next() assert(actualRow.getLong(0) == expectedValue) assert(actualRow.getSizeInBytes == 16) } + private def validateData( + array: ExternalAppendOnlyUnsafeRowArray, + expectedValues: ArrayBuffer[Long]): Iterator[UnsafeRow] = { + val iterator = array.generateIterator() + for (value <- expectedValues) { + checkIfValueExists(iterator, value) + } + + assert(!iterator.hasNext) + iterator + } + + private def populateRows( + array: ExternalAppendOnlyUnsafeRowArray, + numRowsToBePopulated: Int): ArrayBuffer[Long] = { + val populatedValues = new ArrayBuffer[Long] + populateRows(array, numRowsToBePopulated, populatedValues) + } + + private def populateRows( + array: ExternalAppendOnlyUnsafeRowArray, + numRowsToBePopulated: Int, + populatedValues: ArrayBuffer[Long]): ArrayBuffer[Long] = { + for (_ <- 0 until numRowsToBePopulated) { + populatedValues.append(insertRow(array)) + } + populatedValues + } + + private def getNumBytesSpilled: Long = { + TaskContext.get().taskMetrics().memoryBytesSpilled + } + + private def assertNoSpill(): Unit = { + assert(getNumBytesSpilled == 0) + } + + private def assertSpill(): Unit = { + assert(getNumBytesSpilled > 0) + } + test("insert rows less than the spillThreshold") { val spillThreshold = 100 withExternalArray(spillThreshold) { array => assert(array.isEmpty) - val expectedValues = new ArrayBuffer[Long] - expectedValues.append(insertRow(array)) + val expectedValues = populateRows(array, 1) assert(!array.isEmpty) assert(array.length == 1) - val iterator1 = array.generateIterator() - checkIfValueExits(iterator1, expectedValues.head) - assert(array.length == 1) - assert(!iterator1.hasNext) + val iterator1 = validateData(array, expectedValues) // Add more rows (but not too many to trigger switch to [[UnsafeExternalSorter]]) - while (expectedValues.length < spillThreshold) { - 
expectedValues.append(insertRow(array)) - } - assert(array.length == spillThreshold) - // Verify that NO spill has happened - assert(TaskContext.get().taskMetrics().memoryBytesSpilled == 0) + populateRows(array, spillThreshold - 1, expectedValues) + assert(array.length == spillThreshold) + assertNoSpill() - val iterator2 = array.generateIterator() - for (expectedValue <- expectedValues) { - checkIfValueExits(iterator2, expectedValue) - } + val iterator2 = validateData(array, expectedValues) intercept[ConcurrentModificationException](iterator1.hasNext) assert(!iterator2.hasNext) @@ -104,34 +135,22 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar test("insert rows more than the spillThreshold to force spill") { val spillThreshold = 100 withExternalArray(spillThreshold) { array => - assert(array.isEmpty) - val numValuesInserted = 20 * spillThreshold - val expectedValues = new ArrayBuffer[Long] - expectedValues.append(insertRow(array)) - assert(!array.isEmpty) + assert(array.isEmpty) + val expectedValues = populateRows(array, 1) assert(array.length == 1) - val iterator1 = array.generateIterator() - assert(iterator1.hasNext) - checkIfValueExits(iterator1, expectedValues.head) - assert(!iterator1.hasNext) + val iterator1 = validateData(array, expectedValues) - while (expectedValues.length < numValuesInserted) { - expectedValues.append(insertRow(array)) - } + // Populate more rows to trigger spill. Verify that spill has happened + populateRows(array, numValuesInserted - 1, expectedValues) assert(array.length == numValuesInserted) + assertSpill() - // Verify that spill has happened - assert(TaskContext.get().taskMetrics().memoryBytesSpilled > 0) - - val iterator2 = array.generateIterator() - for (value <- expectedValues) { - checkIfValueExits(iterator2, value) - } - + val iterator2 = validateData(array, expectedValues) assert(!iterator2.hasNext) + intercept[ConcurrentModificationException](iterator1.hasNext) intercept[ConcurrentModificationException](iterator1.next()) } @@ -146,18 +165,79 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar } } + test("generate iterator with negative start index") { + withExternalArray(spillThreshold = 2) { array => + val exception = + intercept[ArrayIndexOutOfBoundsException](array.generateIterator(startIndex = -10)) + + assert(exception.getMessage.contains( + "Invalid `startIndex` provided for generating iterator over the array") + ) + } + } + + test("generate iterator with start index exceeding array's size (without spill)") { + val spillThreshold = 2 + withExternalArray(spillThreshold) { array => + populateRows(array, spillThreshold / 2) + + val exception = + intercept[ArrayIndexOutOfBoundsException]( + array.generateIterator(startIndex = spillThreshold * 10)) + assert(exception.getMessage.contains( + "Invalid `startIndex` provided for generating iterator over the array")) + } + } + + test("generate iterator with start index exceeding array's size (with spill)") { + val spillThreshold = 2 + withExternalArray(spillThreshold) { array => + populateRows(array, spillThreshold * 2) + + val exception = + intercept[ArrayIndexOutOfBoundsException]( + array.generateIterator(startIndex = spillThreshold * 10)) + + assert(exception.getMessage.contains( + "Invalid `startIndex` provided for generating iterator over the array")) + } + } + + test("generate iterator with custom start index (without spill)") { + val spillThreshold = 10 + withExternalArray(spillThreshold) { array => + val expectedValues = 
populateRows(array, spillThreshold) + val startIndex = spillThreshold / 2 + val iterator = array.generateIterator(startIndex = startIndex) + for (i <- startIndex until expectedValues.length) { + checkIfValueExists(iterator, expectedValues(i)) + } + } + } + + test("generate iterator with custom start index (with spill)") { + val spillThreshold = 10 + withExternalArray(spillThreshold) { array => + val expectedValues = populateRows(array, spillThreshold * 10) + val startIndex = spillThreshold * 2 + val iterator = array.generateIterator(startIndex = startIndex) + for (i <- startIndex until expectedValues.length) { + checkIfValueExists(iterator, expectedValues(i)) + } + } + } + test("test iterator invalidation (without spill)") { withExternalArray(spillThreshold = 10) { array => // insert 2 rows, iterate until the first row - insertRow(array) - insertRow(array) + populateRows(array, 2) var iterator = array.generateIterator() assert(iterator.hasNext) iterator.next() // Adding more row(s) should invalidate any old iterators - insertRow(array) + populateRows(array, 1) intercept[ConcurrentModificationException](iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) @@ -175,18 +255,16 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar test("test iterator invalidation (with spill)") { val spillThreshold = 10 withExternalArray(spillThreshold) { array => - for (_ <- 0 until (spillThreshold * 2)) { - insertRow(array) - } - // Verify that spill has happened - assert(TaskContext.get().taskMetrics().memoryBytesSpilled > 0) + // Populate enough rows so that spill has happens + populateRows(array, spillThreshold * 2) + assertSpill() var iterator = array.generateIterator() assert(iterator.hasNext) iterator.next() // Adding more row(s) should invalidate any old iterators - insertRow(array) + populateRows(array, 1) intercept[ConcurrentModificationException](iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) @@ -222,79 +300,51 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar test("clear array (without spill)") { val spillThreshold = 10 withExternalArray(spillThreshold) { array => - for (_ <- 0 until (spillThreshold / 2)) { - insertRow(array) - } - // Verify that NO spill has happened - assert(TaskContext.get().taskMetrics().memoryBytesSpilled == 0) + // Populate rows ... but not enough to trigger spill + populateRows(array, spillThreshold / 2) + assertNoSpill() // Clear the array array.clear() assert(array.isEmpty) - assert(array.length == 0) // Re-populate few rows so that there is no spill - val expectedValues = new ArrayBuffer[Long] - for (_ <- 0 until (spillThreshold / 3)) { - expectedValues.append(insertRow(array)) - } // Verify the data. Verify that there was no spill - assert(TaskContext.get().taskMetrics().memoryBytesSpilled == 0) - var iterator = array.generateIterator() - for (value <- expectedValues) { - checkIfValueExits(iterator, value) - } + val expectedValues = populateRows(array, spillThreshold / 3) + validateData(array, expectedValues) + assertNoSpill() - // Populate more rows .. enough to not trigger a spill - for (_ <- 0 until (spillThreshold / 3)) { - expectedValues.append(insertRow(array)) - } + // Populate more rows .. enough to not trigger a spill. // Verify the data. 
Verify that there was no spill - assert(TaskContext.get().taskMetrics().memoryBytesSpilled == 0) - iterator = array.generateIterator() - for (value <- expectedValues) { - checkIfValueExits(iterator, value) - } + populateRows(array, spillThreshold / 3, expectedValues) + validateData(array, expectedValues) + assertNoSpill() } } test("clear array (with spill)") { val spillThreshold = 10 withExternalArray(spillThreshold) { array => - for (_ <- 0 until (spillThreshold * 2)) { - insertRow(array) - } - // Verify that spill has happened - val bytesSpilled = TaskContext.get().taskMetrics().memoryBytesSpilled + // Populate enough rows to trigger spill + populateRows(array, spillThreshold * 2) + val bytesSpilled = getNumBytesSpilled assert(bytesSpilled > 0) // Clear the array array.clear() assert(array.isEmpty) - assert(array.length == 0) - // Re-populate the array ... but not upto the point that there is spill - val expectedValues = new ArrayBuffer[Long] - for (_ <- 0 until (spillThreshold / 2)) { - expectedValues.append(insertRow(array)) - } - // Verify the data. Verify that there was no "extra" spill - assert(TaskContext.get().taskMetrics().memoryBytesSpilled == bytesSpilled) - var iterator = array.generateIterator() - for (value <- expectedValues) { - checkIfValueExits(iterator, value) - } + // Re-populate the array ... but NOT upto the point that there is spill. + // Verify data. Verify that there was NO "extra" spill + val expectedValues = populateRows(array, spillThreshold / 2) + validateData(array, expectedValues) + assert(getNumBytesSpilled == bytesSpilled) // Populate more rows to trigger spill - for (_ <- 0 until (spillThreshold * 2)) { - expectedValues.append(insertRow(array)) - } // Verify the data. Verify that there was "extra" spill - assert(TaskContext.get().taskMetrics().memoryBytesSpilled > bytesSpilled) - iterator = array.generateIterator() - for (value <- expectedValues) { - checkIfValueExits(iterator, value) - } + populateRows(array, spillThreshold * 2, expectedValues) + validateData(array, expectedValues) + assert(getNumBytesSpilled > bytesSpilled) } } } From a82c4538ebd0fda3ff6a446ae10c3e0ea029e669 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 14 Feb 2017 22:37:42 -0800 Subject: [PATCH 3/9] fixed benchmark --- .../ExternalAppendOnlyUnsafeRowArray.scala | 2 +- ...nalAppendOnlyUnsafeRowArrayBenchmark.scala | 59 +++++++++++-------- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index 9ad16cf5bd21c..f75cc23a5858f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -214,5 +214,5 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) } private[sql] object ExternalAppendOnlyUnsafeRowArray { - private val DefaultInitialSizeOfInMemoryBuffer = 128 + val DefaultInitialSizeOfInMemoryBuffer = 128 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala index 26f54615d6d0e..59b21eb769c91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala @@ -37,35 +37,42 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows) - benchmark.addCase("ArrayBuffer version") { _: Int => + // Internally, `ExternalAppendOnlyUnsafeRowArray` will create an + // in-memory buffer of size `numSpillThreshold`. This will mimic that + val initialSize = + Math.min( + ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer, + numSpillThreshold) + + benchmark.addCase("ArrayBuffer") { _: Int => var sum = 0L for (_ <- 0L until iterations) { - val array = new ArrayBuffer[UnsafeRow]() - rows.foreach(array.append(_)) + val array = new ArrayBuffer[UnsafeRow](initialSize) + + // Internally, `ExternalAppendOnlyUnsafeRowArray` will create a + // copy of the row. This will mimic that + rows.foreach(x => array += x.copy()) var i = 0 val n = array.length while (i < n) { - if (i < n) { - sum = sum + array(i).getLong(0) - i += 1 - } + sum = sum + array(i).getLong(0) + i += 1 } array.clear() } } - benchmark.addCase("ExternalAppendOnlyUnsafeRowArray version") { _: Int => + benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int => var sum = 0L for (_ <- 0L until iterations) { val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold) - rows.foreach(array.add) + rows.foreach(x => array.add(x)) val iterator = array.generateIterator() while (iterator.hasNext) { sum = sum + iterator.next().getLong(0) } - array.clear() } } @@ -85,9 +92,9 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { def main(args: Array[String]): Unit = { - // ----------------------------------------------------------------------------------------- // + // ========================================================================================= // // WITHOUT SPILL - // ----------------------------------------------------------------------------------------- // + // ========================================================================================= // var spillThreshold = 100 * 1000 @@ -96,34 +103,34 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - ArrayBuffer version 1969 / 2004 33.3 30.0 1.0X - ExternalAppendOnlyUnsafeRowArray version 2184 / 2233 30.0 33.3 0.9X + ArrayBuffer 7821 / 7941 33.5 29.8 1.0X + ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X */ - test(spillThreshold, 1000, 1 << 16) + test(spillThreshold, 1000, 1 << 18) /* Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - ArrayBuffer version 2442 / 2465 50.3 19.9 1.0X - ExternalAppendOnlyUnsafeRowArray version 5015 / 5039 24.5 40.8 0.5X + ArrayBuffer 19200 / 19206 25.6 39.1 1.0X + ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X */ - test(spillThreshold, 30 * 1000, 1 << 12) + test(spillThreshold, 30 * 1000, 1 << 14) /* Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - ArrayBuffer version 4492 / 4575 22.8 43.9 1.0X - ExternalAppendOnlyUnsafeRowArray version 6654 / 6656 15.4 65.0 0.7X + ArrayBuffer 5949 / 6028 17.2 
58.1 1.0X + ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X */ test(spillThreshold, 100 * 1000, 1 << 10) - // ----------------------------------------------------------------------------------------- // + // ========================================================================================= // // WITH SPILL - // ----------------------------------------------------------------------------------------- // + // ========================================================================================= // spillThreshold = 100 @@ -132,8 +139,8 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - ArrayBuffer version 4 / 5 61.2 16.3 1.0X - ExternalAppendOnlyUnsafeRowArray version 2331 / 2356 0.1 9106.1 0.0X + ArrayBuffer 8 / 9 31.8 31.4 1.0X + ExternalAppendOnlyUnsafeRowArray 2218 / 2247 0.1 8663.2 0.0X */ test(spillThreshold, 1000, 1 << 8) @@ -142,8 +149,8 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { Array with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - ArrayBuffer version 3 / 4 57.6 17.3 1.0X - ExternalAppendOnlyUnsafeRowArray version 2229 / 2310 0.1 13928.8 0.0X + ArrayBuffer 5 / 6 30.5 32.8 1.0X + ExternalAppendOnlyUnsafeRowArray 2498 / 2585 0.1 15615.3 0.0X */ test(spillThreshold, 10 * 1000, 1 << 4) } From 6f1b5472678d3e7a5c001c990c65b8f7cc94c7b7 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Wed, 15 Feb 2017 14:32:07 -0800 Subject: [PATCH 4/9] Support CartesianProductExec and added micro benchmark comparing `ExternalAppendOnlyUnsafeRowArray` against using raw `UnsafeExternalSorter` --- .../apache/spark/sql/internal/SQLConf.scala | 10 ++ .../ExternalAppendOnlyUnsafeRowArray.scala | 10 +- .../joins/CartesianProductExec.scala | 51 +++------ ...nalAppendOnlyUnsafeRowArrayBenchmark.scala | 106 +++++++++++++++--- 4 files changed, 122 insertions(+), 55 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a46a98b50b960..220a8d3adfe4d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines the configuration options for Spark SQL. 
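The hunk below adds a third spill threshold alongside the window and sort-merge-join ones, so all three buffered operators are tuned through `SQLConf`. Purely as an illustration (not part of the patch), the keys could be adjusted per session; `spark` is assumed to be an existing `SparkSession` and the values are arbitrary:

```scala
// Illustrative tuning only; defaults come from SQLConf as defined in this patch series.
spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", "4096")
spark.conf.set("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "1000000")
spark.conf.set("spark.sql.cartesianProductExec.buffer.spill.threshold", "100000")
```

Note that a later commit in this series marks all three configs as internal.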
@@ -727,6 +728,12 @@ object SQLConf { .intConf .createWithDefault(Int.MaxValue) + val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD = + buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold") + .doc("Threshold for number of rows buffered in cartesian product operator") + .intConf + .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -962,6 +969,9 @@ class SQLConf extends Serializable with Logging { def sortMergeJoinExecBufferSpillThreshold: Int = getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD) + def cartesianProductExecBufferSpillThreshold: Int = + getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD) + def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index f75cc23a5858f..69a7bcceaac9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -43,7 +43,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) private val initialSizeOfInMemoryBuffer = Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold) - private val inMemoryBuffer = new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer) + private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) { + new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer) + } else { + null + } private var spillableArray: UnsafeExternalSorter = _ private var numRows = 0 @@ -67,7 +71,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) // inside `UnsafeExternalSorter` spillableArray.cleanupResources() spillableArray = null - } else { + } else if (inMemoryBuffer != null) { inMemoryBuffer.clear() } numFieldsPerRow = 0 @@ -91,7 +95,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) TaskContext.get(), null, null, - if (numRowsSpillThreshold > 2) numRowsSpillThreshold / 2 else 1, + 1024, SparkEnv.get.memoryManager.pageSizeBytes, numRowsSpillThreshold, false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 8341fe2ffd078..3b0b80139c896 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -19,65 +19,40 @@ package org.apache.spark.sql.execution.joins import org.apache.spark._ import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD} -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner -import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.{BinaryExecNode, ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.util.CompletionIterator -import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter /** * An optimized CartesianRDD for UnsafeRow, which will cache the rows from second child RDD, * will be much faster than building the right partition for every row in left RDD, it also * materialize the right RDD (in case of the right RDD is nondeterministic). */ -class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int) +class UnsafeCartesianRDD( + left : RDD[UnsafeRow], + right : RDD[UnsafeRow], + numFieldsOfRight: Int, + spillThreshold: Int) extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) { override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = { // We will not sort the rows, so prefixComparator and recordComparator are null. - val sorter = UnsafeExternalSorter.create( - context.taskMemoryManager(), - SparkEnv.get.blockManager, - SparkEnv.get.serializerManager, - context, - null, - null, - 1024, - SparkEnv.get.memoryManager.pageSizeBytes, - SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", - UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD), - false) + val sorter = new ExternalAppendOnlyUnsafeRowArray(spillThreshold) val partition = split.asInstanceOf[CartesianPartition] - for (y <- rdd2.iterator(partition.s2, context)) { - sorter.insertRecord(y.getBaseObject, y.getBaseOffset, y.getSizeInBytes, 0, false) - } + rdd2.iterator(partition.s2, context).foreach(sorter.add) - // Create an iterator from sorter and wrapper it as Iterator[UnsafeRow] - def createIter(): Iterator[UnsafeRow] = { - val iter = sorter.getIterator - val unsafeRow = new UnsafeRow(numFieldsOfRight) - new Iterator[UnsafeRow] { - override def hasNext: Boolean = { - iter.hasNext - } - override def next(): UnsafeRow = { - iter.loadNext() - unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength) - unsafeRow - } - } - } + // Create an iterator from sorter + def createIter(): Iterator[UnsafeRow] = sorter.generateIterator() val resultIter = for (x <- rdd1.iterator(partition.s1, context); y <- createIter()) yield (x, y) CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]]( - resultIter, sorter.cleanupResources()) + resultIter, sorter.clear()) } } @@ -97,7 +72,9 @@ case class CartesianProductExec( val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]] val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]] - val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size) + val spillThreshold = sqlContext.conf.cartesianProductExecBufferSpillThreshold + + val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size, spillThreshold) pair.mapPartitionsWithIndexInternal { (index, iter) => val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema) val filtered = if (condition.isDefined) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala index 59b21eb769c91..00c5f2550cbb1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala @@ -23,10 +23,11 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext} import 
org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.util.Benchmark +import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter object ExternalAppendOnlyUnsafeRowArrayBenchmark { - def test(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = { + def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = { val random = new java.util.Random() val rows = (1 to numRows).map(_ => { val row = new UnsafeRow(1) @@ -90,13 +91,89 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { sc.stop() } + def testAgainstRawUnsafeExternalSorter( + numSpillThreshold: Int, + numRows: Int, + iterations: Int): Unit = { + + val random = new java.util.Random() + val rows = (1 to numRows).map(_ => { + val row = new UnsafeRow(1) + row.pointTo(new Array[Byte](64), 16) + row.setLong(0, random.nextLong()) + row + }) + + val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows) + + benchmark.addCase("UnsafeExternalSorter") { _: Int => + var sum = 0L + for (_ <- 0L until iterations) { + val array = UnsafeExternalSorter.create( + TaskContext.get().taskMemoryManager(), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get(), + null, + null, + 1024, + SparkEnv.get.memoryManager.pageSizeBytes, + numSpillThreshold, + false) + + rows.foreach(x => + array.insertRecord( + x.getBaseObject, + x.getBaseOffset, + x.getSizeInBytes, + 0, + false)) + + val unsafeRow = new UnsafeRow(1) + val iter = array.getIterator + while (iter.hasNext) { + iter.loadNext() + unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength) + sum = sum + unsafeRow.getLong(0) + } + array.cleanupResources() + } + } + + benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int => + var sum = 0L + for (_ <- 0L until iterations) { + val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold) + rows.foreach(x => array.add(x)) + + val iterator = array.generateIterator() + while (iterator.hasNext) { + sum = sum + iterator.next().getLong(0) + } + array.clear() + } + } + + val conf = new SparkConf(false) + // Make the Java serializer write a reset instruction (TC_RESET) after each object to test + // for a bug we had with bytes written past the last object in a batch (SPARK-2792) + conf.set("spark.serializer.objectStreamReset", "1") + conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + + val sc = new SparkContext("local", "test", conf) + val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) + TaskContext.setTaskContext(taskContext) + benchmark.run() + sc.stop() + } + def main(args: Array[String]): Unit = { // ========================================================================================= // // WITHOUT SPILL // ========================================================================================= // - var spillThreshold = 100 * 1000 + val spillThreshold = 100 * 1000 /* Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz @@ -106,7 +183,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { ArrayBuffer 7821 / 7941 33.5 29.8 1.0X ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X */ - test(spillThreshold, 1000, 1 << 18) + testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18) /* Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz @@ -116,7 +193,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { ArrayBuffer 19200 / 19206 25.6 39.1 1.0X ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X */ - 
test(spillThreshold, 30 * 1000, 1 << 14) + testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14) /* Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz @@ -126,32 +203,31 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { ArrayBuffer 5949 / 6028 17.2 58.1 1.0X ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X */ - test(spillThreshold, 100 * 1000, 1 << 10) + testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10) // ========================================================================================= // // WITH SPILL // ========================================================================================= // - spillThreshold = 100 - /* Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - ArrayBuffer 8 / 9 31.8 31.4 1.0X - ExternalAppendOnlyUnsafeRowArray 2218 / 2247 0.1 8663.2 0.0X + UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X + ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X */ - test(spillThreshold, 1000, 1 << 8) + testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18) /* Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - Array with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - ArrayBuffer 5 / 6 30.5 32.8 1.0X - ExternalAppendOnlyUnsafeRowArray 2498 / 2585 0.1 15615.3 0.0X + UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X + ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X */ - test(spillThreshold, 10 * 1000, 1 << 4) + testAgainstRawUnsafeExternalSorter( + UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt, 10 * 1000, 1 << 4) } } From d9e7d43e0f4aeea8f4f7a41d02f18bbfdc1f796f Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Sun, 19 Feb 2017 14:28:24 -0800 Subject: [PATCH 5/9] attempt to fix unit test --- ...xternalAppendOnlyUnsafeRowArraySuite.scala | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala index 4e38349f31820..8f0fcf20512ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala @@ -28,25 +28,20 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext { private val random = new java.util.Random() - private def createSparkConf(): SparkConf = { - val conf = new SparkConf(false) - // Make the Java serializer write a reset instruction (TC_RESET) after each object to test - // for a bug we had with bytes written past the last object in a batch (SPARK-2792) - conf.set("spark.serializer.objectStreamReset", "1") - conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") - conf + override def beforeAll(): Unit = { + super.beforeAll() + sc = new SparkContext("local", "test", new SparkConf()) + val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) + TaskContext.setTaskContext(taskContext) } + override 
def afterAll(): Unit = TaskContext.unset() + private def withExternalArray(spillThreshold: Int) (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = { - sc = new SparkContext("local", "test", createSparkConf()) - val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) - TaskContext.setTaskContext(taskContext) val array = new ExternalAppendOnlyUnsafeRowArray(spillThreshold) - try f(array) finally { array.clear() - sc.stop() } } From fc95f329e404d81f9321d259a6e3e1fa4f847de0 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Mon, 20 Feb 2017 16:23:46 -0800 Subject: [PATCH 6/9] attempt #2 --- .../ExternalAppendOnlyUnsafeRowArray.scala | 57 +++++++++++++------ ...xternalAppendOnlyUnsafeRowArraySuite.scala | 15 ++++- 2 files changed, 53 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index 69a7bcceaac9f..c9a3f62ab9a4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -23,8 +23,11 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.internal.Logging +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.serializer.SerializerManager import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer +import org.apache.spark.storage.BlockManager import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} /** @@ -39,7 +42,26 @@ import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, Unsaf * This may lead to a performance regression compared to the normal case of using an * [[ArrayBuffer]] or [[Array]]. 
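The constructor change in the hunk just below makes the array's dependencies explicit (task memory manager, block manager, serializer manager, task context, initial size, page size) while keeping a convenience constructor that reads them from `TaskContext` and `SparkEnv`. A hedged sketch of why this helps testing, mirroring the suite change later in this commit, with an arbitrary threshold and assuming `SparkEnv` has already been initialized by a local `SparkContext`:

```scala
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray

// A fake task context is enough to exercise the array without running a real job.
val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
TaskContext.setTaskContext(taskContext)

val array = new ExternalAppendOnlyUnsafeRowArray(
  taskContext.taskMemoryManager(),
  SparkEnv.get.blockManager,
  SparkEnv.get.serializerManager,
  taskContext,
  1024,                                     // initial size of the underlying sorter
  SparkEnv.get.memoryManager.pageSizeBytes,
  100)                                      // arbitrary numRowsSpillThreshold for the sketch
```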
*/ -private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging { +private[sql] class ExternalAppendOnlyUnsafeRowArray( + taskMemoryManager: TaskMemoryManager, + blockManager: BlockManager, + serializerManager: SerializerManager, + taskContext: TaskContext, + initialSize: Int, + pageSizeBytes: Long, + numRowsSpillThreshold: Int) extends Logging { + + def this(numRowsSpillThreshold: Int) { + this( + TaskContext.get().taskMemoryManager(), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get(), + 1024, + SparkEnv.get.memoryManager.pageSizeBytes, + numRowsSpillThreshold) + } + private val initialSizeOfInMemoryBuffer = Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold) @@ -89,26 +111,29 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) // We will not sort the rows, so prefixComparator and recordComparator are null spillableArray = UnsafeExternalSorter.create( - TaskContext.get().taskMemoryManager(), - SparkEnv.get.blockManager, - SparkEnv.get.serializerManager, - TaskContext.get(), + taskMemoryManager, + blockManager, + serializerManager, + taskContext, null, null, - 1024, - SparkEnv.get.memoryManager.pageSizeBytes, + initialSize, + pageSizeBytes, numRowsSpillThreshold, false) - inMemoryBuffer.foreach(existingUnsafeRow => - spillableArray.insertRecord( - existingUnsafeRow.getBaseObject, - existingUnsafeRow.getBaseOffset, - existingUnsafeRow.getSizeInBytes, - 0, - false) - ) - inMemoryBuffer.clear() + // populate with existing in-memory buffered rows + if (inMemoryBuffer != null) { + inMemoryBuffer.foreach(existingUnsafeRow => + spillableArray.insertRecord( + existingUnsafeRow.getBaseObject, + existingUnsafeRow.getBaseOffset, + existingUnsafeRow.getSizeInBytes, + 0, + false) + ) + inMemoryBuffer.clear() + } numFieldsPerRow = unsafeRow.numFields() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala index 8f0fcf20512ff..28688b05522b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala @@ -27,11 +27,13 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext { private val random = new java.util.Random() + private var taskContext: TaskContext = _ override def beforeAll(): Unit = { super.beforeAll() - sc = new SparkContext("local", "test", new SparkConf()) - val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) + sc = new SparkContext("local", "test", new SparkConf(false)) + + taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) TaskContext.setTaskContext(taskContext) } @@ -39,7 +41,14 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar private def withExternalArray(spillThreshold: Int) (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = { - val array = new ExternalAppendOnlyUnsafeRowArray(spillThreshold) + val array = new ExternalAppendOnlyUnsafeRowArray( + taskContext.taskMemoryManager(), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + taskContext, + 1024, + SparkEnv.get.memoryManager.pageSizeBytes, + spillThreshold) try f(array) finally { array.clear() } From 6e7c219c4858d5c3c4ec74926a2fa62c6d289235 
Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Wed, 1 Mar 2017 18:06:30 +0000 Subject: [PATCH 7/9] attempt to fix unit test --- .../ExternalAppendOnlyUnsafeRowArraySuite.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala index 28688b05522b5..79febccf3658f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala @@ -29,18 +29,15 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar private val random = new java.util.Random() private var taskContext: TaskContext = _ - override def beforeAll(): Unit = { - super.beforeAll() + override def afterAll(): Unit = TaskContext.unset() + + private def withExternalArray(spillThreshold: Int) + (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = { sc = new SparkContext("local", "test", new SparkConf(false)) taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) TaskContext.setTaskContext(taskContext) - } - override def afterAll(): Unit = TaskContext.unset() - - private def withExternalArray(spillThreshold: Int) - (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = { val array = new ExternalAppendOnlyUnsafeRowArray( taskContext.taskMemoryManager(), SparkEnv.get.blockManager, From e714a08e18c82ae5ccaa4a94075b05e3b6d8ed33 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 2 Mar 2017 00:58:33 +0000 Subject: [PATCH 8/9] fix failing test --- .../ExternalAppendOnlyUnsafeRowArray.scala | 18 +++++++----------- .../execution/joins/CartesianProductExec.scala | 11 +++++------ ...ExternalAppendOnlyUnsafeRowArraySuite.scala | 14 +++++++------- 3 files changed, 19 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index c9a3f62ab9a4d..458ac4ba3637c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -176,7 +176,9 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( abstract class ExternalAppendOnlyUnsafeRowArrayIterator extends Iterator[UnsafeRow] { private val expectedModificationsCount = modificationsCount - protected def checkForModification(): Unit = { + protected def isModified(): Boolean = expectedModificationsCount != modificationsCount + + protected def throwExceptionIfModified(): Unit = { if (expectedModificationsCount != modificationsCount) { throw new ConcurrentModificationException( s"The backing ${classOf[ExternalAppendOnlyUnsafeRowArray].getName} has been modified " + @@ -190,13 +192,10 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( private var currentIndex = startIndex - override def hasNext(): Boolean = { - checkForModification() - currentIndex < numRows - } + override def hasNext(): Boolean = !isModified() && currentIndex < numRows override def next(): UnsafeRow = { - checkForModification() + throwExceptionIfModified() val result = inMemoryBuffer(currentIndex) currentIndex += 1 result @@ -228,13 +227,10 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( // Traverse upto 
the given [[startIndex]] init() - override def hasNext(): Boolean = { - checkForModification() - iterator.hasNext - } + override def hasNext(): Boolean = !isModified() && iterator.hasNext override def next(): UnsafeRow = { - checkForModification() + throwExceptionIfModified() iterator.loadNext() currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength) currentRow diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 3b0b80139c896..f380986951317 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -39,20 +39,19 @@ class UnsafeCartesianRDD( extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) { override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = { - // We will not sort the rows, so prefixComparator and recordComparator are null. - val sorter = new ExternalAppendOnlyUnsafeRowArray(spillThreshold) + val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold) val partition = split.asInstanceOf[CartesianPartition] - rdd2.iterator(partition.s2, context).foreach(sorter.add) + rdd2.iterator(partition.s2, context).foreach(rowArray.add) - // Create an iterator from sorter - def createIter(): Iterator[UnsafeRow] = sorter.generateIterator() + // Create an iterator from rowArray + def createIter(): Iterator[UnsafeRow] = rowArray.generateIterator() val resultIter = for (x <- rdd1.iterator(partition.s1, context); y <- createIter()) yield (x, y) CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]]( - resultIter, sorter.clear()) + resultIter, rowArray.clear()) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala index 79febccf3658f..53c41639942b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala @@ -128,7 +128,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar val iterator2 = validateData(array, expectedValues) - intercept[ConcurrentModificationException](iterator1.hasNext) + assert(!iterator1.hasNext) assert(!iterator2.hasNext) } } @@ -152,7 +152,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar val iterator2 = validateData(array, expectedValues) assert(!iterator2.hasNext) - intercept[ConcurrentModificationException](iterator1.hasNext) + assert(!iterator1.hasNext) intercept[ConcurrentModificationException](iterator1.next()) } } @@ -239,7 +239,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar // Adding more row(s) should invalidate any old iterators populateRows(array, 1) - intercept[ConcurrentModificationException](iterator.hasNext) + assert(!iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) // Clearing the array should also invalidate any old iterators @@ -248,7 +248,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar iterator.next() array.clear() - 
intercept[ConcurrentModificationException](iterator.hasNext) + assert(!iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) } } @@ -266,7 +266,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar // Adding more row(s) should invalidate any old iterators populateRows(array, 1) - intercept[ConcurrentModificationException](iterator.hasNext) + assert(!iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) // Clearing the array should also invalidate any old iterators @@ -275,7 +275,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar iterator.next() array.clear() - intercept[ConcurrentModificationException](iterator.hasNext) + assert(!iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) } } @@ -293,7 +293,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar assert(array.length == 0) // Clearing an empty array should also invalidate any old iterators - intercept[ConcurrentModificationException](iterator.hasNext) + assert(!iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) } } From 23acc3ff82bc885b295d069bd865129427d3c59e Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Wed, 8 Mar 2017 22:23:55 -0800 Subject: [PATCH 9/9] review comments #2 --- .../apache/spark/sql/internal/SQLConf.scala | 3 ++ .../window/WindowFunctionFrame.scala | 30 ++++++++++--------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 220a8d3adfe4d..a85f87aece45b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -718,18 +718,21 @@ object SQLConf { val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD = buildConf("spark.sql.windowExec.buffer.spill.threshold") + .internal() .doc("Threshold for number of rows buffered in window operator") .intConf .createWithDefault(4096) val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD = buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold") + .internal() .doc("Threshold for number of rows buffered in sort merge join operator") .intConf .createWithDefault(Int.MaxValue) val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD = buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold") + .internal() .doc("Threshold for number of rows buffered in cartesian product operator") .intConf .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala index 412d337a62308..af2b4fb92062b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala @@ -44,6 +44,12 @@ private[window] abstract class WindowFunctionFrame { def write(index: Int, current: InternalRow): Unit } +object WindowFunctionFrame { + def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = { + if (iterator.hasNext) iterator.next() else null + } +} + /** * The offset window frame calculates frames containing LEAD/LAG statements. 
* @@ -123,7 +129,7 @@ private[window] final class OffsetWindowFunctionFrame( override def write(index: Int, current: InternalRow): Unit = { if (inputIndex >= 0 && inputIndex < input.length) { - val r = if (inputIterator.hasNext) inputIterator.next() else null + val r = WindowFunctionFrame.getNextOrNull(inputIterator) projection(r) } else { // Use default values since the offset row does not exist. @@ -179,9 +185,7 @@ private[window] final class SlidingWindowFunctionFrame( override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { input = rows inputIterator = input.generateIterator() - if (inputIterator.hasNext) { - nextRow = inputIterator.next() - } + nextRow = WindowFunctionFrame.getNextOrNull(inputIterator) inputHighIndex = 0 inputLowIndex = 0 buffer.clear() @@ -195,7 +199,7 @@ private[window] final class SlidingWindowFunctionFrame( // the output row upper bound. while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) { buffer.add(nextRow.copy()) - nextRow = if (inputIterator.hasNext) inputIterator.next() else null + nextRow = WindowFunctionFrame.getNextOrNull(inputIterator) inputHighIndex += 1 bufferUpdated = true } @@ -311,7 +315,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame( // the output row upper bound. while (nextRow != null && ubound.compare(nextRow, inputIndex, current, index) <= 0) { processor.update(nextRow) - nextRow = if (inputIterator.hasNext) inputIterator.next() else null + nextRow = WindowFunctionFrame.getNextOrNull(inputIterator) inputIndex += 1 bufferUpdated = true } @@ -368,23 +372,21 @@ private[window] final class UnboundedFollowingWindowFunctionFrame( // the output row lower bound. val iterator = input.generateIterator(startIndex = inputIndex) - def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = { - if (iterator.hasNext) iterator.next() else null - } - - var nextRow = getNextOrNull(iterator) + var nextRow = WindowFunctionFrame.getNextOrNull(iterator) while (nextRow != null && lbound.compare(nextRow, inputIndex, current, index) < 0) { inputIndex += 1 bufferUpdated = true - nextRow = getNextOrNull(iterator) + nextRow = WindowFunctionFrame.getNextOrNull(iterator) } // Only recalculate and update when the buffer changes. if (bufferUpdated) { processor.initialize(input.length) - while (nextRow != null) { + if (nextRow != null) { processor.update(nextRow) - nextRow = getNextOrNull(iterator) + } + while (iterator.hasNext) { + processor.update(iterator.next()) } processor.evaluate(target) }
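
Note on the iterator contract changed in [PATCH 8/9]: once the backing ExternalAppendOnlyUnsafeRowArray is modified, a previously created iterator now reports hasNext() == false instead of throwing, while next() still throws ConcurrentModificationException. The sketch below is a minimal, self-contained Scala model of that modificationsCount bookkeeping, not the Spark class itself; it stores plain Ints rather than UnsafeRows, and the names AppendOnlyInts / InvalidationContractDemo are illustrative only.

import java.util.ConcurrentModificationException
import scala.collection.mutable.ArrayBuffer

// Minimal model of the invalidation bookkeeping: every add()/clear() bumps a counter,
// and each iterator remembers the counter value it was created with.
class AppendOnlyInts {
  private val buffer = ArrayBuffer.empty[Int]
  private var modificationsCount = 0L

  def add(v: Int): Unit = { buffer += v; modificationsCount += 1 }
  def clear(): Unit = { buffer.clear(); modificationsCount += 1 }

  def generateIterator(): Iterator[Int] = new Iterator[Int] {
    private val expectedModificationsCount = modificationsCount
    private var index = 0

    private def isModified: Boolean = expectedModificationsCount != modificationsCount

    // A stale iterator quietly reports "no more rows" ...
    override def hasNext: Boolean = !isModified && index < buffer.length

    // ... while next() still fails loudly, so stale data is never returned.
    override def next(): Int = {
      if (isModified) {
        throw new ConcurrentModificationException("backing array was modified after iterator creation")
      }
      val v = buffer(index)
      index += 1
      v
    }
  }
}

object InvalidationContractDemo {
  def main(args: Array[String]): Unit = {
    val array = new AppendOnlyInts
    array.add(1)
    array.add(2)
    val iterator = array.generateIterator()
    array.add(3)                // invalidates `iterator`
    assert(!iterator.hasNext)   // matches the updated suite: no exception from hasNext
    try {
      iterator.next()
      assert(false, "expected ConcurrentModificationException")
    } catch {
      case _: ConcurrentModificationException => // expected, matches next() in the patch
    }
  }
}

This asymmetry keeps hasNext() safe to call from consumers that simply stop reading a stale iterator (the behaviour the updated ExternalAppendOnlyUnsafeRowArraySuite assertions check), while next() still signals the programming error.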
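
Note on the [PATCH 9/9] window-frame refactor: the repeated `if (iterator.hasNext) iterator.next() else null` pattern is hoisted into WindowFunctionFrame.getNextOrNull, and UnboundedFollowingWindowFunctionFrame.write now processes the boundary row once and then drains the rest of the iterator directly. Below is a small sketch of that shape with String standing in for UnsafeRow; only the getNextOrNull helper mirrors the patch, the surrounding object and values are illustrative.

object GetNextOrNullSketch {
  // Same shape as the helper added to object WindowFunctionFrame, with String in place of UnsafeRow.
  def getNextOrNull(iterator: Iterator[String]): String =
    if (iterator.hasNext) iterator.next() else null

  def main(args: Array[String]): Unit = {
    val rows = Iterator("r0", "r1", "r2", "r3")

    // Skip rows that fall below the frame's lower bound (here: the first two rows).
    var nextRow = getNextOrNull(rows)
    var skipped = 0
    while (nextRow != null && skipped < 2) {
      skipped += 1
      nextRow = getNextOrNull(rows)
    }

    // Process the boundary row once, then drain the remaining rows straight off the
    // iterator -- the loop structure write() has after the patch.
    val processed = scala.collection.mutable.ArrayBuffer.empty[String]
    if (nextRow != null) {
      processed += nextRow
    }
    while (rows.hasNext) {
      processed += rows.next()
    }

    assert(processed.toList == List("r2", "r3"))
  }
}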