diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ecb941c5fa9e6..733d80e9d46cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -844,24 +844,47 @@ object SQLConf {
     .stringConf
     .createWithDefaultFunction(() => TimeZone.getDefault.getID)
 
+  val WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the window operator")
+      .intConf
+      .createWithDefault(4096)
+
   val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.windowExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in window operator")
+      .doc("Threshold for number of rows to be spilled by window operator")
       .intConf
-      .createWithDefault(4096)
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+
+  val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
+        "join operator")
+      .intConf
+      .createWithDefault(Int.MaxValue)
 
   val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in sort merge join operator")
+      .doc("Threshold for number of rows to be spilled by sort merge join operator")
       .intConf
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+
+  val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the cartesian " +
+        "product operator")
+      .intConf
+      .createWithDefault(4096)
 
   val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in cartesian product operator")
+      .doc("Threshold for number of rows to be spilled by cartesian product operator")
       .intConf
       .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
 
@@ -1137,11 +1160,19 @@ class SQLConf extends Serializable with Logging {
 
   def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)
 
+  def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
 
+  def sortMergeJoinExecBufferInMemoryThreshold: Int =
+    getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def sortMergeJoinExecBufferSpillThreshold: Int =
     getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)
 
+  def cartesianProductExecBufferInMemoryThreshold: Int =
+    getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def cartesianProductExecBufferSpillThreshold: Int =
     getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)
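
The split above gives each operator two independent knobs: the in-memory threshold bounds
the rows a buffer keeps in a plain ArrayBuffer (4096 by default for the window and
cartesian product operators), while the spill threshold bounds the rows the backing
UnsafeExternalSorter may hold before it spills to disk. A minimal tuning sketch with
illustrative values, assuming a SparkSession named spark; the configs are internal but
can still be set on a session:

    // Shrink the copying in-memory buffer for memory-constrained jobs, and let the
    // sorter-backed buffer spill to disk sooner than its (large) default threshold.
    spark.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", "1024")
    spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", "32768")
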
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
index c4d383421f976..ac282ea2e94f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
@@ -31,16 +31,16 @@ import org.apache.spark.storage.BlockManager
 import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
 
 /**
- * An append-only array for [[UnsafeRow]]s that spills content to disk when there a predefined
- * threshold of rows is reached.
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached, after which it switches to a mode that
+ * can flush to disk once [[numRowsSpillThreshold]] is reached (or earlier, under excessive
+ * memory consumption). Setting these thresholds involves the following trade-offs:
  *
- * Setting spill threshold faces following trade-off:
- *
- * - If the spill threshold is too high, the in-memory array may occupy more memory than is
- *   available, resulting in OOM.
- * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
- *   This may lead to a performance regression compared to the normal case of using an
- *   [[ArrayBuffer]] or [[Array]].
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory
+ *   than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
+ *   excessive disk writes. This may lead to a performance regression compared to the normal case
+ *   of using an [[ArrayBuffer]] or [[Array]].
  */
 private[sql] class ExternalAppendOnlyUnsafeRowArray(
     taskMemoryManager: TaskMemoryManager,
@@ -49,9 +49,10 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
     taskContext: TaskContext,
     initialSize: Int,
     pageSizeBytes: Long,
+    numRowsInMemoryBufferThreshold: Int,
     numRowsSpillThreshold: Int) extends Logging {
 
-  def this(numRowsSpillThreshold: Int) {
+  def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) {
     this(
       TaskContext.get().taskMemoryManager(),
       SparkEnv.get.blockManager,
@@ -59,11 +60,12 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
       TaskContext.get(),
       1024,
       SparkEnv.get.memoryManager.pageSizeBytes,
+      numRowsInMemoryBufferThreshold,
       numRowsSpillThreshold)
   }
 
   private val initialSizeOfInMemoryBuffer =
-    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)
+    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold)
 
   private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
     new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
@@ -102,11 +104,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
   }
 
   def add(unsafeRow: UnsafeRow): Unit = {
-    if (numRows < numRowsSpillThreshold) {
+    if (numRows < numRowsInMemoryBufferThreshold) {
       inMemoryBuffer += unsafeRow.copy()
     } else {
       if (spillableArray == null) {
-        logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
+        logInfo(s"Reached in-memory threshold of $numRowsInMemoryBufferThreshold rows, switching to " +
           s"${classOf[UnsafeExternalSorter].getName}")
 
         // We will not sort the rows, so prefixComparator and recordComparator are null
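
The class now buffers in two phases: rows up to numRowsInMemoryBufferThreshold are copied
into an ArrayBuffer, and everything past that goes to an UnsafeExternalSorter that spills
once it holds numRowsSpillThreshold rows (or earlier under memory pressure). A usage
sketch with hypothetical thresholds; note the class is private[sql] (so this assumes code
under the org.apache.spark.sql package), and the convenience constructor reads
TaskContext.get() and SparkEnv.get, so it only runs inside a Spark task:

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray

    def bufferPartition(rows: Iterator[UnsafeRow]): ExternalAppendOnlyUnsafeRowArray = {
      // Rows 1..4096 are copied into the in-memory ArrayBuffer; later rows are
      // inserted into the sorter, which spills after it accumulates 16384 rows.
      val buffer = new ExternalAppendOnlyUnsafeRowArray(4096, 16384)
      rows.foreach(buffer.add)
      buffer
    }
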
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index f380986951317..4d261dd422bc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -35,11 +35,12 @@ class UnsafeCartesianRDD(
     left : RDD[UnsafeRow],
     right : RDD[UnsafeRow],
     numFieldsOfRight: Int,
+    inMemoryBufferThreshold: Int,
     spillThreshold: Int)
   extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
 
   override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
-    val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+    val rowArray = new ExternalAppendOnlyUnsafeRowArray(inMemoryBufferThreshold, spillThreshold)
 
     val partition = split.asInstanceOf[CartesianPartition]
     rdd2.iterator(partition.s2, context).foreach(rowArray.add)
@@ -71,9 +72,12 @@ case class CartesianProductExec(
     val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
     val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]
 
-    val spillThreshold = sqlContext.conf.cartesianProductExecBufferSpillThreshold
-
-    val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size, spillThreshold)
+    val pair = new UnsafeCartesianRDD(
+      leftResults,
+      rightResults,
+      right.output.size,
+      sqlContext.conf.cartesianProductExecBufferInMemoryThreshold,
+      sqlContext.conf.cartesianProductExecBufferSpillThreshold)
     pair.mapPartitionsWithIndexInternal { (index, iter) =>
       val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
      val filtered = if (condition.isDefined) {
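
Since UnsafeCartesianRDD materializes the whole right-hand partition into this buffer on
every task, the new knob mainly decides how much of that partition stays out of the
sorter. A hedged test-style sketch of forcing the spill path, where df1 and df2 are
hypothetical DataFrames and withSQLConf comes from SQLTestUtils (the pattern mirrors the
join tests further below):

    withSQLConf(
        "spark.sql.cartesianProductExec.buffer.in.memory.threshold" -> "0",
        "spark.sql.cartesianProductExec.buffer.spill.threshold" -> "1") {
      // With both thresholds minimal, buffering the right side spills almost
      // immediately, exercising the disk-backed code path.
      df1.crossJoin(df2).collect()
    }
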
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index f41fa14213df5..91d214e1978e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -130,9 +130,14 @@ case class SortMergeJoinExec(
     sqlContext.conf.sortMergeJoinExecBufferSpillThreshold
   }
 
+  private def getInMemoryThreshold: Int = {
+    sqlContext.conf.sortMergeJoinExecBufferInMemoryThreshold
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val spillThreshold = getSpillThreshold
+    val inMemoryThreshold = getInMemoryThreshold
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       val boundCondition: (InternalRow) => Boolean = {
         condition.map { cond =>
@@ -158,6 +163,7 @@ case class SortMergeJoinExec(
             keyOrdering,
             RowIterator.fromScala(leftIter),
             RowIterator.fromScala(rightIter),
+            inMemoryThreshold,
             spillThreshold
           )
           private[this] val joinRow = new JoinedRow
@@ -201,6 +207,7 @@ case class SortMergeJoinExec(
             keyOrdering,
             streamedIter = RowIterator.fromScala(leftIter),
             bufferedIter = RowIterator.fromScala(rightIter),
+            inMemoryThreshold,
             spillThreshold
           )
           val rightNullRow = new GenericInternalRow(right.output.length)
@@ -214,6 +221,7 @@ case class SortMergeJoinExec(
             keyOrdering,
             streamedIter = RowIterator.fromScala(rightIter),
             bufferedIter = RowIterator.fromScala(leftIter),
+            inMemoryThreshold,
             spillThreshold
           )
           val leftNullRow = new GenericInternalRow(left.output.length)
@@ -247,6 +255,7 @@ case class SortMergeJoinExec(
             keyOrdering,
             RowIterator.fromScala(leftIter),
             RowIterator.fromScala(rightIter),
+            inMemoryThreshold,
             spillThreshold
           )
           private[this] val joinRow = new JoinedRow
@@ -281,6 +290,7 @@ case class SortMergeJoinExec(
             keyOrdering,
             RowIterator.fromScala(leftIter),
             RowIterator.fromScala(rightIter),
+            inMemoryThreshold,
             spillThreshold
           )
           private[this] val joinRow = new JoinedRow
@@ -322,6 +332,7 @@ case class SortMergeJoinExec(
             keyOrdering,
             RowIterator.fromScala(leftIter),
             RowIterator.fromScala(rightIter),
+            inMemoryThreshold,
             spillThreshold
           )
           private[this] val joinRow = new JoinedRow
@@ -420,8 +431,10 @@ case class SortMergeJoinExec(
     val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
 
     val spillThreshold = getSpillThreshold
+    val inMemoryThreshold = getInMemoryThreshold
 
-    ctx.addMutableState(clsName, matches, s"$matches = new $clsName($spillThreshold);")
+    ctx.addMutableState(clsName, matches,
+      s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")
 
     // Copy the left keys as class members so they could be used in next function call.
     val matchedKeyVars = copyKeys(ctx, leftKeyVars)
@@ -626,6 +639,9 @@ case class SortMergeJoinExec(
 * @param streamedIter an input whose rows will be streamed.
 * @param bufferedIter an input whose rows will be buffered to construct sequences of rows that
 *                     have the same join key.
+ * @param inMemoryThreshold Threshold for number of rows guaranteed to be held in memory by
+ *                          the internal buffer.
+ * @param spillThreshold Threshold for number of rows to be spilled by the internal buffer.
 */
 private[joins] class SortMergeJoinScanner(
     streamedKeyGenerator: Projection,
@@ -633,7 +649,8 @@ private[joins] class SortMergeJoinScanner(
     keyOrdering: Ordering[InternalRow],
     streamedIter: RowIterator,
     bufferedIter: RowIterator,
-    bufferThreshold: Int) {
+    inMemoryThreshold: Int,
+    spillThreshold: Int) {
   private[this] var streamedRow: InternalRow = _
   private[this] var streamedRowKey: InternalRow = _
   private[this] var bufferedRow: InternalRow = _
@@ -644,7 +661,8 @@ private[joins] class SortMergeJoinScanner(
    */
   private[this] var matchJoinKey: InternalRow = _
   /** Buffered rows from the buffered side of the join. This is empty if there are no matches. */
-  private[this] val bufferedMatches = new ExternalAppendOnlyUnsafeRowArray(bufferThreshold)
+  private[this] val bufferedMatches =
+    new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
 
   // Initialization (note: do _not_ want to advance streamed here).
   advancedBufferedToRowWithNullFreeJoinKey()
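
As the JoinSuite change below exercises, an in-memory threshold of 0 makes
bufferedMatches bypass its ArrayBuffer entirely, and a spill threshold of 1 then forces
a spill as soon as a second row is buffered for a join key. A sketch of that test
pattern (assumes SQLTestUtils' withSQLConf and sql helpers, plus hypothetical tables t1
and t2):

    withSQLConf(
        "spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0",
        "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") {
      // Any sort-merge join that buffers more than one matching row per key will
      // now spill, which assertSpilled(...) can then verify.
      sql("SELECT * FROM t1 JOIN t2 ON t1.k = t2.k").collect()
    }
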
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index f8bb667e30064..800a2ea3f3996 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -292,6 +292,7 @@ case class WindowExec(
     // Unwrap the expressions and factories from the map.
     val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
     val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
+    val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
     val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold
 
     // Start processing.
@@ -322,7 +323,8 @@ case class WindowExec(
       val inputFields = child.output.length
 
       val buffer: ExternalAppendOnlyUnsafeRowArray =
-        new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+        new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+
       var bufferIterator: Iterator[UnsafeRow] = _
 
       val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 895ca196a7a51..0008d503a2cbe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -665,7 +665,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
   test("test SortMergeJoin (with spill)") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
-      "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "0") {
+      "spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0",
+      "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") {
 
       assertSpilled(sparkContext, "inner join") {
         checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 031ac38c17d7b..efe28afab08e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -67,7 +67,10 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iterations) {
-        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+        val array = new ExternalAppendOnlyUnsafeRowArray(
+          ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer,
+          numSpillThreshold)
+
         rows.foreach(x => array.add(x))
 
         val iterator = array.generateIterator()
@@ -143,7 +146,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iterations) {
-        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold, numSpillThreshold)
 
         rows.foreach(x => array.add(x))
 
         val iterator = array.generateIterator()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
index 53c41639942b4..ecc7264d79442 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
@@ -31,7 +31,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
 
   override def afterAll(): Unit = TaskContext.unset()
 
-  private def withExternalArray(spillThreshold: Int)
+  private def withExternalArray(inMemoryThreshold: Int, spillThreshold: Int)
                                (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = {
     sc = new SparkContext("local", "test", new SparkConf(false))
 
@@ -45,6 +45,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
       taskContext,
       1024,
       SparkEnv.get.memoryManager.pageSizeBytes,
+      inMemoryThreshold,
       spillThreshold)
 
     try f(array) finally {
       array.clear()
@@ -109,9 +110,9 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
     assert(getNumBytesSpilled > 0)
   }
 
-  test("insert rows less than the spillThreshold") {
-    val spillThreshold = 100
-    withExternalArray(spillThreshold) { array =>
+  test("insert rows less than the inMemoryThreshold") {
+    val (inMemoryThreshold, spillThreshold) = (100, 50)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       assert(array.isEmpty)
 
       val expectedValues = populateRows(array, 1)
@@ -122,8 +123,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
 
       // Add more rows (but not too many to trigger switch to [[UnsafeExternalSorter]])
       // Verify that NO spill has happened
-      populateRows(array, spillThreshold - 1, expectedValues)
-      assert(array.length == spillThreshold)
+      populateRows(array, inMemoryThreshold - 1, expectedValues)
+      assert(array.length == inMemoryThreshold)
       assertNoSpill()
 
       val iterator2 = validateData(array, expectedValues)
@@ -133,20 +134,42 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
     }
   }
 
-  test("insert rows more than the spillThreshold to force spill") {
-    val spillThreshold = 100
-    withExternalArray(spillThreshold) { array =>
-      val numValuesInserted = 20 * spillThreshold
-
+  test("insert rows more than the inMemoryThreshold but less than spillThreshold") {
+    val (inMemoryThreshold, spillThreshold) = (10, 50)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       assert(array.isEmpty)
-      val expectedValues = populateRows(array, 1)
-      assert(array.length == 1)
+      val expectedValues = populateRows(array, inMemoryThreshold - 1)
+      assert(array.length == (inMemoryThreshold - 1))
+      val iterator1 = validateData(array, expectedValues)
+      assertNoSpill()
+
+      // Add more rows to trigger switch to [[UnsafeExternalSorter]] but not too many to cause a
+      // spill to happen. Verify that NO spill has happened
+      populateRows(array, spillThreshold - expectedValues.length - 1, expectedValues)
+      assert(array.length == spillThreshold - 1)
+      assertNoSpill()
+
+      val iterator2 = validateData(array, expectedValues)
+      assert(!iterator2.hasNext)
+      assert(!iterator1.hasNext)
+      intercept[ConcurrentModificationException](iterator1.next())
+    }
+  }
+
+  test("insert rows enough to force spill") {
+    val (inMemoryThreshold, spillThreshold) = (20, 10)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
+      assert(array.isEmpty)
+      val expectedValues = populateRows(array, inMemoryThreshold - 1)
+      assert(array.length == (inMemoryThreshold - 1))
       val iterator1 = validateData(array, expectedValues)
+      assertNoSpill()
 
-      // Populate more rows to trigger spill. Verify that spill has happened
-      populateRows(array, numValuesInserted - 1, expectedValues)
-      assert(array.length == numValuesInserted)
+      // Add more rows to trigger switch to [[UnsafeExternalSorter]] and cause a spill to happen.
+      // Verify that spill has happened
+      populateRows(array, 2, expectedValues)
+      assert(array.length == inMemoryThreshold + 1)
       assertSpill()
 
       val iterator2 = validateData(array, expectedValues)
@@ -158,7 +181,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("iterator on an empty array should be empty") {
-    withExternalArray(spillThreshold = 10) { array =>
+    withExternalArray(inMemoryThreshold = 4, spillThreshold = 10) { array =>
       val iterator = array.generateIterator()
       assert(array.isEmpty)
       assert(array.length == 0)
@@ -167,7 +190,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with negative start index") {
-    withExternalArray(spillThreshold = 2) { array =>
+    withExternalArray(inMemoryThreshold = 100, spillThreshold = 56) { array =>
       val exception =
         intercept[ArrayIndexOutOfBoundsException](array.generateIterator(startIndex = -10))
 
@@ -178,8 +201,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with start index exceeding array's size (without spill)") {
-    val spillThreshold = 2
-    withExternalArray(spillThreshold) { array =>
+    val (inMemoryThreshold, spillThreshold) = (20, 100)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       populateRows(array, spillThreshold / 2)
 
       val exception =
@@ -191,8 +214,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with start index exceeding array's size (with spill)") {
-    val spillThreshold = 2
-    withExternalArray(spillThreshold) { array =>
+    val (inMemoryThreshold, spillThreshold) = (20, 100)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       populateRows(array, spillThreshold * 2)
 
       val exception =
@@ -205,10 +228,10 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with custom start index (without spill)") {
-    val spillThreshold = 10
-    withExternalArray(spillThreshold) { array =>
-      val expectedValues = populateRows(array, spillThreshold)
-      val startIndex = spillThreshold / 2
+    val (inMemoryThreshold, spillThreshold) = (20, 100)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
+      val expectedValues = populateRows(array, inMemoryThreshold)
+      val startIndex = inMemoryThreshold / 2
       val iterator = array.generateIterator(startIndex = startIndex)
       for (i <- startIndex until expectedValues.length) {
         checkIfValueExists(iterator, expectedValues(i))
@@ -217,8 +240,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with custom start index (with spill)") {
-    val spillThreshold = 10
-    withExternalArray(spillThreshold) { array =>
+    val (inMemoryThreshold, spillThreshold) = (20, 100)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
      val expectedValues = populateRows(array, spillThreshold * 10)
       val startIndex = spillThreshold * 2
       val iterator = array.generateIterator(startIndex = startIndex)
@@ -229,7 +252,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("test iterator invalidation (without spill)") {
-    withExternalArray(spillThreshold = 10) { array =>
+    withExternalArray(inMemoryThreshold = 10, spillThreshold = 100) { array =>
       // insert 2 rows, iterate until the first row
       populateRows(array, 2)
 
@@ -254,9 +277,9 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
(with spill)") { - val spillThreshold = 10 - withExternalArray(spillThreshold) { array => - // Populate enough rows so that spill has happens + val (inMemoryThreshold, spillThreshold) = (2, 10) + withExternalArray(inMemoryThreshold, spillThreshold) { array => + // Populate enough rows so that spill happens populateRows(array, spillThreshold * 2) assertSpill() @@ -281,7 +304,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar } test("clear on an empty the array") { - withExternalArray(spillThreshold = 2) { array => + withExternalArray(inMemoryThreshold = 2, spillThreshold = 3) { array => val iterator = array.generateIterator() assert(!iterator.hasNext) @@ -299,10 +322,10 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar } test("clear array (without spill)") { - val spillThreshold = 10 - withExternalArray(spillThreshold) { array => + val (inMemoryThreshold, spillThreshold) = (10, 100) + withExternalArray(inMemoryThreshold, spillThreshold) { array => // Populate rows ... but not enough to trigger spill - populateRows(array, spillThreshold / 2) + populateRows(array, inMemoryThreshold / 2) assertNoSpill() // Clear the array @@ -311,21 +334,21 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar // Re-populate few rows so that there is no spill // Verify the data. Verify that there was no spill - val expectedValues = populateRows(array, spillThreshold / 3) + val expectedValues = populateRows(array, inMemoryThreshold / 2) validateData(array, expectedValues) assertNoSpill() // Populate more rows .. enough to not trigger a spill. // Verify the data. Verify that there was no spill - populateRows(array, spillThreshold / 3, expectedValues) + populateRows(array, inMemoryThreshold / 2, expectedValues) validateData(array, expectedValues) assertNoSpill() } } test("clear array (with spill)") { - val spillThreshold = 10 - withExternalArray(spillThreshold) { array => + val (inMemoryThreshold, spillThreshold) = (10, 20) + withExternalArray(inMemoryThreshold, spillThreshold) { array => // Populate enough rows to trigger spill populateRows(array, spillThreshold * 2) val bytesSpilled = getNumBytesSpilled diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index a9f3fb355c775..a57514c256b90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -477,7 +477,8 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext { |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDiNG AND CURRENT RoW) """.stripMargin) - withSQLConf("spark.sql.windowExec.buffer.spill.threshold" -> "1") { + withSQLConf("spark.sql.windowExec.buffer.in.memory.threshold" -> "1", + "spark.sql.windowExec.buffer.spill.threshold" -> "2") { assertSpilled(sparkContext, "test with low buffer spill threshold") { checkAnswer(actual, expected) }