Changes from all commits
@@ -844,24 +844,47 @@ object SQLConf {
.stringConf
.createWithDefaultFunction(() => TimeZone.getDefault.getID)

val WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
buildConf("spark.sql.windowExec.buffer.in.memory.threshold")
.internal()
.doc("Threshold for number of rows guaranteed to be held in memory by the window operator")
.intConf
.createWithDefault(4096)

val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.windowExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows buffered in window operator")
.doc("Threshold for number of rows to be spilled by window operator")
.intConf
.createWithDefault(4096)
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)

val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
Contributor:
can we just have one config for both window and SMJ? ideally we can say this config is for ExternalAppendOnlyUnsafeRowArray

Contributor Author (@tejasapatil, Aug 10, 2017):

I am fine with that. We can even go a step further and just have two configs: in-mem threshold and spill threshold at the ExternalAppendOnlyUnsafeRowArray level for all its clients (currently SMJ, cartesian product, Window). That way we have consistency across all clients and both knobs. One downside is backward compatibility: spill threshold was already defined per operator and people might be using it in prod.

Let me know what you think about that.

Contributor:

ok let's keep them separated for each operator.
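For context on what the rejected alternative discussed above would have looked like, here is a hedged sketch in the file's own buildConf idiom: two array-level knobs shared by every ExternalAppendOnlyUnsafeRowArray client. The config names and doc strings are hypothetical, not anything that was merged:

```scala
// Hypothetical alternative (not merged): one pair of thresholds for SMJ,
// cartesian product and Window alike, defined once at the array level.
val EXTERNAL_ROW_ARRAY_IN_MEMORY_THRESHOLD =
  buildConf("spark.sql.externalRowArray.buffer.in.memory.threshold")
    .internal()
    .doc("Threshold for number of rows guaranteed to be held in memory by " +
      "ExternalAppendOnlyUnsafeRowArray, regardless of which operator uses it")
    .intConf
    .createWithDefault(4096)

val EXTERNAL_ROW_ARRAY_SPILL_THRESHOLD =
  buildConf("spark.sql.externalRowArray.buffer.spill.threshold")
    .internal()
    .doc("Threshold for number of rows to be spilled by ExternalAppendOnlyUnsafeRowArray")
    .intConf
    .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
```

The thread settles on per-operator configs precisely because the spill thresholds were already exposed per operator and may already be set in production.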

buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
.internal()
.doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
"join operator")
.intConf
.createWithDefault(Int.MaxValue)
Contributor:
is this a reasonable default value? won't it lead to OOM according to the document?

Contributor:

It is the current value. I suppose you want to be able to tune it if you have to. Not all of us are running Spark at FB scale :)...

Contributor Author:

Before introducing ExternalAppendOnlyUnsafeRowArray, SMJ used to hold in-memory data in Scala's ArrayBuffer. It's backed by an array, which can be at most Int.MaxValue in size... so this default keeps things as they were before.

Contributor:

got it


val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows buffered in sort merge join operator")
.doc("Threshold for number of rows to be spilled by sort merge join operator")
.intConf
.createWithDefault(Int.MaxValue)
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)

val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
.internal()
.doc("Threshold for number of rows guaranteed to be held in memory by the cartesian " +
"product operator")
.intConf
.createWithDefault(4096)

val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows buffered in cartesian product operator")
.doc("Threshold for number of rows to be spilled by cartesian product operator")
.intConf
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)

@@ -1137,11 +1160,19 @@ class SQLConf extends Serializable with Logging {

def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)

def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)

def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)

def sortMergeJoinExecBufferInMemoryThreshold: Int =
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)

def sortMergeJoinExecBufferSpillThreshold: Int =
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)

def cartesianProductExecBufferInMemoryThreshold: Int =
getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD)

def cartesianProductExecBufferSpillThreshold: Int =
getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)
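All six knobs above are ordinary SQL confs (internal, so they have no documented public surface), which means they can still be overridden per session when tuning. A minimal sketch; the values are illustrative, not recommendations:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Window operator: keep up to 8k rows per partition in the in-memory buffer,
// then hand rows to UnsafeExternalSorter, which spills once ~1M rows
// accumulate (or earlier under memory pressure).
spark.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", "8192")
spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", (1 << 20).toString)

// Sort merge join keeps its historical behavior by default (in-memory
// threshold = Int.MaxValue); tighten it for memory-constrained jobs.
spark.conf.set("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", "65536")
spark.conf.set("spark.sql.sortMergeJoinExec.buffer.spill.threshold", (1 << 20).toString)

spark.conf.set("spark.sql.cartesianProductExec.buffer.in.memory.threshold", "4096")
spark.conf.set("spark.sql.cartesianProductExec.buffer.spill.threshold", (1 << 20).toString)
```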

@@ -31,16 +31,16 @@ import org.apache.spark.storage.BlockManager
import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}

/**
- * An append-only array for [[UnsafeRow]]s that spills content to disk when there a predefined
- * threshold of rows is reached.
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached, after which it switches to a mode that
+ * flushes to disk once [[numRowsSpillThreshold]] is met (or earlier if there is excessive
+ * memory consumption). Setting these thresholds involves the following trade-offs:
 *
- * Setting spill threshold faces following trade-off:
- *
- * - If the spill threshold is too high, the in-memory array may occupy more memory than is
- *   available, resulting in OOM.
- * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
- *   This may lead to a performance regression compared to the normal case of using an
- *   [[ArrayBuffer]] or [[Array]].
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more
+ *   memory than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
+ *   excessive disk writes. This may lead to a performance regression compared to the normal
+ *   case of using an [[ArrayBuffer]] or [[Array]].
 */
private[sql] class ExternalAppendOnlyUnsafeRowArray(
taskMemoryManager: TaskMemoryManager,
@@ -49,21 +49,23 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
taskContext: TaskContext,
initialSize: Int,
pageSizeBytes: Long,
numRowsInMemoryBufferThreshold: Int,
numRowsSpillThreshold: Int) extends Logging {

- def this(numRowsSpillThreshold: Int) {
+ def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) {
this(
TaskContext.get().taskMemoryManager(),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
TaskContext.get(),
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
numRowsInMemoryBufferThreshold,
numRowsSpillThreshold)
}

private val initialSizeOfInMemoryBuffer =
- Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)
+ Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold)

private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
@@ -102,11 +104,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
}

def add(unsafeRow: UnsafeRow): Unit = {
- if (numRows < numRowsSpillThreshold) {
+ if (numRows < numRowsInMemoryBufferThreshold) {
inMemoryBuffer += unsafeRow.copy()
} else {
if (spillableArray == null) {
logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
logInfo(s"Reached spill threshold of $numRowsInMemoryBufferThreshold rows, switching to " +
s"${classOf[UnsafeExternalSorter].getName}")

// We will not sort the rows, so prefixComparator and recordComparator are null
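The hunk is truncated here. To make the two-threshold hand-off concrete, below is a small self-contained model of the add() path; SimpleRowBuffer and everything inside it are hypothetical stand-ins, since the real class needs a TaskMemoryManager and a live TaskContext:

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of ExternalAppendOnlyUnsafeRowArray's policy: rows below the
// in-memory threshold live in an ArrayBuffer; beyond it they go to an
// overflow buffer standing in for UnsafeExternalSorter, which "spills"
// every spillThreshold rows.
class SimpleRowBuffer[T](inMemoryThreshold: Int, spillThreshold: Int) {
  private val inMemory = new ArrayBuffer[T](math.min(128, inMemoryThreshold))
  private val overflow = new ArrayBuffer[T]() // stand-in for the spillable sorter
  private var spills = 0

  def add(row: T): Unit = {
    if (inMemory.length < inMemoryThreshold) {
      inMemory += row // cheap path (the real class copies the UnsafeRow here)
    } else {
      overflow += row
      if (overflow.length % spillThreshold == 0) spills += 1 // pretend to spill
    }
  }

  def numSpills: Int = spills
  def iterator: Iterator[T] = inMemory.iterator ++ overflow.iterator
}

object SimpleRowBufferDemo extends App {
  // 10 rows, keep 2 in memory, "spill" every 3 overflow rows -> 2 spills.
  val buf = new SimpleRowBuffer[Int](inMemoryThreshold = 2, spillThreshold = 3)
  (1 to 10).foreach(buf.add)
  assert(buf.numSpills == 2 && buf.iterator.size == 10)
}
```

The real class additionally lets the task memory manager force a spill before the row-count threshold is reached; the sketch ignores that.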
@@ -35,11 +35,12 @@ class UnsafeCartesianRDD(
left : RDD[UnsafeRow],
right : RDD[UnsafeRow],
numFieldsOfRight: Int,
inMemoryBufferThreshold: Int,
spillThreshold: Int)
extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {

override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
- val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+ val rowArray = new ExternalAppendOnlyUnsafeRowArray(inMemoryBufferThreshold, spillThreshold)

val partition = split.asInstanceOf[CartesianPartition]
rdd2.iterator(partition.s2, context).foreach(rowArray.add)
@@ -71,9 +72,12 @@ case class CartesianProductExec(
val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]

- val spillThreshold = sqlContext.conf.cartesianProductExecBufferSpillThreshold
-
- val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size, spillThreshold)
+ val pair = new UnsafeCartesianRDD(
+   leftResults,
+   rightResults,
+   right.output.size,
+   sqlContext.conf.cartesianProductExecBufferInMemoryThreshold,
+   sqlContext.conf.cartesianProductExecBufferSpillThreshold)
pair.mapPartitionsWithIndexInternal { (index, iter) =>
val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
val filtered = if (condition.isDefined) {
@@ -130,9 +130,14 @@ case class SortMergeJoinExec(
sqlContext.conf.sortMergeJoinExecBufferSpillThreshold
}

private def getInMemoryThreshold: Int = {
sqlContext.conf.sortMergeJoinExecBufferInMemoryThreshold
}

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
val boundCondition: (InternalRow) => Boolean = {
condition.map { cond =>
@@ -158,6 +163,7 @@ case class SortMergeJoinExec(
keyOrdering,
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
)
private[this] val joinRow = new JoinedRow
@@ -201,6 +207,7 @@ case class SortMergeJoinExec(
keyOrdering,
streamedIter = RowIterator.fromScala(leftIter),
bufferedIter = RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
)
val rightNullRow = new GenericInternalRow(right.output.length)
@@ -214,6 +221,7 @@ case class SortMergeJoinExec(
keyOrdering,
streamedIter = RowIterator.fromScala(rightIter),
bufferedIter = RowIterator.fromScala(leftIter),
inMemoryThreshold,
spillThreshold
)
val leftNullRow = new GenericInternalRow(left.output.length)
@@ -247,6 +255,7 @@ case class SortMergeJoinExec(
keyOrdering,
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
)
private[this] val joinRow = new JoinedRow
@@ -281,6 +290,7 @@ case class SortMergeJoinExec(
keyOrdering,
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
)
private[this] val joinRow = new JoinedRow
@@ -322,6 +332,7 @@ case class SortMergeJoinExec(
keyOrdering,
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
)
private[this] val joinRow = new JoinedRow
@@ -420,8 +431,10 @@ case class SortMergeJoinExec(
val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName

val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold

- ctx.addMutableState(clsName, matches, s"$matches = new $clsName($spillThreshold);")
+ ctx.addMutableState(clsName, matches,
+   s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")
// Copy the left keys as class members so they could be used in next function call.
val matchedKeyVars = copyKeys(ctx, leftKeyVars)

@@ -626,14 +639,18 @@ case class SortMergeJoinExec(
* @param streamedIter an input whose rows will be streamed.
* @param bufferedIter an input whose rows will be buffered to construct sequences of rows that
* have the same join key.
* @param inMemoryThreshold Threshold for number of rows guaranteed to be held in memory by
*   the internal buffer.
* @param spillThreshold Threshold for number of rows to be spilled by the internal buffer.
*/
private[joins] class SortMergeJoinScanner(
streamedKeyGenerator: Projection,
bufferedKeyGenerator: Projection,
keyOrdering: Ordering[InternalRow],
streamedIter: RowIterator,
bufferedIter: RowIterator,
- bufferThreshold: Int) {
+ inMemoryThreshold: Int,
+ spillThreshold: Int) {
private[this] var streamedRow: InternalRow = _
private[this] var streamedRowKey: InternalRow = _
private[this] var bufferedRow: InternalRow = _
@@ -644,7 +661,8 @@ private[joins] class SortMergeJoinScanner(
*/
private[this] var matchJoinKey: InternalRow = _
/** Buffered rows from the buffered side of the join. This is empty if there are no matches. */
- private[this] val bufferedMatches = new ExternalAppendOnlyUnsafeRowArray(bufferThreshold)
+ private[this] val bufferedMatches =
+   new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)

// Initialization (note: do _not_ want to advance streamed here).
advancedBufferedToRowWithNullFreeJoinKey()
@@ -292,6 +292,7 @@ case class WindowExec(
// Unwrap the expressions and factories from the map.
val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold

// Start processing.
@@ -322,7 +323,8 @@ case class WindowExec(
val inputFields = child.output.length

val buffer: ExternalAppendOnlyUnsafeRowArray =
- new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+ new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)

var bufferIterator: Iterator[UnsafeRow] = _

val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType))
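For orientation (not part of the patch): the buffer above holds the rows of the current window partition, so queries with large partitions are what exercise the spillable path. A hypothetical end-to-end example; the session, data, and column names are made up:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Partitions larger than 1024 rows leave the ArrayBuffer and go through the
// spillable path; the threshold here is deliberately low for illustration.
spark.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", "1024")

val events = (1 to 100000).map(i => (i % 7, i)).toDF("userId", "eventTime")
val w = Window.partitionBy($"userId").orderBy($"eventTime")
events.withColumn("rowNum", row_number().over(w)).count()
```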
3 changes: 2 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -665,7 +665,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {

test("test SortMergeJoin (with spill)") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
"spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "0") {
"spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0",
"spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") {

assertSpilled(sparkContext, "inner join") {
checkAnswer(
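The suite's pattern extends naturally to the other buffer clients. A hedged sketch of an analogous window-spill test, reusing the withSQLConf/assertSpilled helpers and the shared testData2 fixture assumed from the surrounding suite (this test is not part of the PR):

```scala
test("test WindowExec (with spill)") {
  withSQLConf("spark.sql.windowExec.buffer.in.memory.threshold" -> "0",
      "spark.sql.windowExec.buffer.spill.threshold" -> "1") {

    // Zero in-memory threshold routes every partition row through the
    // spillable sorter; spill threshold 1 makes it spill almost immediately.
    assertSpilled(sparkContext, "window exec") {
      sql("SELECT a, sum(b) OVER (PARTITION BY a ORDER BY b) FROM testData2").collect()
    }
  }
}
```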
@@ -67,7 +67,10 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
var sum = 0L
for (_ <- 0L until iterations) {
- val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+ val array = new ExternalAppendOnlyUnsafeRowArray(
+   ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer,
+   numSpillThreshold)

rows.foreach(x => array.add(x))

val iterator = array.generateIterator()
@@ -143,7 +146,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
var sum = 0L
for (_ <- 0L until iterations) {
- val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+ val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold, numSpillThreshold)
rows.foreach(x => array.add(x))

val iterator = array.generateIterator()