@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
@@ -715,6 +716,27 @@ object SQLConf {
.stringConf
.createWithDefault(TimeZone.getDefault().getID())

val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.windowExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows buffered in window operator")
.intConf
.createWithDefault(4096)

val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows buffered in sort merge join operator")
.intConf
.createWithDefault(Int.MaxValue)

val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows buffered in cartesian product operator")
.intConf
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -945,6 +967,14 @@ class SQLConf extends Serializable with Logging {

def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)

def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)

def sortMergeJoinExecBufferSpillThreshold: Int =
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)

def cartesianProductExecBufferSpillThreshold: Int =
getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)

def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)

/** ********************** SQLConf functionality methods ************ */
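For context (not part of the diff), here is a minimal sketch of how these internal thresholds could be tuned at runtime. The SparkSession named `spark` is an assumption for illustration; the keys and defaults are the ones defined above.

```scala
// Hypothetical tuning example; assumes an existing SparkSession named `spark`.
// Spill the window operator's buffer after 1000 rows instead of the default 4096.
spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", "1000")

// Cap the sort-merge-join buffer (its default of Int.MaxValue effectively disables spilling).
spark.conf.set("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "1000000")

// The same keys can also be set through SQL.
spark.sql("SET spark.sql.cartesianProductExec.buffer.spill.threshold=100000")
```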
@@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import java.util.ConcurrentModificationException

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
import org.apache.spark.storage.BlockManager
import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}

/**
* An append-only array for [[UnsafeRow]]s that spills its contents to disk when a predefined
* threshold of rows is reached.
*
* Setting the spill threshold involves the following trade-off:
*
* - If the spill threshold is too high, the in-memory array may occupy more memory than is
* available, resulting in OOM.
* - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
* This may lead to a performance regression compared to the normal case of using an
* [[ArrayBuffer]] or [[Array]].
*/
private[sql] class ExternalAppendOnlyUnsafeRowArray(
taskMemoryManager: TaskMemoryManager,
blockManager: BlockManager,
serializerManager: SerializerManager,
taskContext: TaskContext,
initialSize: Int,
pageSizeBytes: Long,
numRowsSpillThreshold: Int) extends Logging {

def this(numRowsSpillThreshold: Int) {
this(
TaskContext.get().taskMemoryManager(),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
TaskContext.get(),
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
numRowsSpillThreshold)
}

private val initialSizeOfInMemoryBuffer =
Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)

private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
} else {
null
}

private var spillableArray: UnsafeExternalSorter = _
private var numRows = 0

// A counter to keep track of total modifications done to this array since its creation.
// This helps to invalidate iterators when there are changes done to the backing array.
private var modificationsCount: Long = 0
Contributor:

Just for my understanding: this is to detect modifications in the same thread, right? When does this happen?

Contributor Author:

> this is to detect modifications in the same thread, right?

Yes.

> When does this happen?

It can happen when a client generates an iterator but has not iterated over the whole array, and later new entries are added to the array OR the array is cleared. Any attempt to use the old iterator at that point could give an inconsistent view of the array, and this counter is what invalidates the iterator.

In my opinion, none of the existing places where this array is used hit this scenario. Having said that, I added the counter to make the data structure robust: new usages are protected against this kind of access, and so are any bugs introduced in the existing usages, which would otherwise silently appear to work.
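To make the scenario above concrete, here is a minimal sketch (not part of the patch) of how the invalidation surfaces. It assumes the code runs inside a Spark task, e.g. within `mapPartitions`, so that the convenience constructor can obtain the `TaskContext` and `SparkEnv`.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Build UnsafeRows for a single LongType column.
val toUnsafe = UnsafeProjection.create(StructType(Seq(StructField("id", LongType))))

val array = new ExternalAppendOnlyUnsafeRowArray(4096)
array.add(toUnsafe(InternalRow(1L)))      // add() copies the row internally

val iterator = array.generateIterator()
array.add(toUnsafe(InternalRow(2L)))      // bumps modificationsCount after iterator creation

iterator.hasNext                          // false, because isModified() now returns true
iterator.next()                           // throws ConcurrentModificationException
```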


private var numFieldsPerRow = 0

def length: Int = numRows

def isEmpty: Boolean = numRows == 0

/**
* Clears up resources (e.g. memory) held by the backing storage.
*/
def clear(): Unit = {
if (spillableArray != null) {
// The last `spillableArray` of this task will be cleaned up via task completion listener
// inside `UnsafeExternalSorter`
spillableArray.cleanupResources()
spillableArray = null
} else if (inMemoryBuffer != null) {
inMemoryBuffer.clear()
}
numFieldsPerRow = 0
numRows = 0
modificationsCount += 1
}

def add(unsafeRow: UnsafeRow): Unit = {
if (numRows < numRowsSpillThreshold) {
inMemoryBuffer += unsafeRow.copy()
} else {
if (spillableArray == null) {
logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
s"${classOf[UnsafeExternalSorter].getName}")

// We will not sort the rows, so prefixComparator and recordComparator are null
spillableArray = UnsafeExternalSorter.create(
taskMemoryManager,
blockManager,
serializerManager,
taskContext,
null,
null,
initialSize,
pageSizeBytes,
numRowsSpillThreshold,
false)

// populate with existing in-memory buffered rows
if (inMemoryBuffer != null) {
inMemoryBuffer.foreach(existingUnsafeRow =>
spillableArray.insertRecord(
existingUnsafeRow.getBaseObject,
existingUnsafeRow.getBaseOffset,
existingUnsafeRow.getSizeInBytes,
0,
false)
)
inMemoryBuffer.clear()
}
numFieldsPerRow = unsafeRow.numFields()
}

spillableArray.insertRecord(
unsafeRow.getBaseObject,
unsafeRow.getBaseOffset,
unsafeRow.getSizeInBytes,
0,
false)
}

numRows += 1
modificationsCount += 1
}

/**
* Creates an [[Iterator]] over the current rows in the array, starting from a user-provided index.
*
* If [[add()]] or [[clear()]] is called on this array after the iterator has been created, the
* iterator is invalidated. This prevents clients from assuming they have read all the data when
* new rows were added to (or removed from) the array in the meantime.
*/
def generateIterator(startIndex: Int): Iterator[UnsafeRow] = {
if (startIndex < 0 || (numRows > 0 && startIndex > numRows)) {
throw new ArrayIndexOutOfBoundsException(
"Invalid `startIndex` provided for generating iterator over the array. " +
s"Total elements: $numRows, requested `startIndex`: $startIndex")
}

if (spillableArray == null) {
new InMemoryBufferIterator(startIndex)
} else {
new SpillableArrayIterator(spillableArray.getIterator, numFieldsPerRow, startIndex)
}
}

def generateIterator(): Iterator[UnsafeRow] = generateIterator(startIndex = 0)

private[this]
abstract class ExternalAppendOnlyUnsafeRowArrayIterator extends Iterator[UnsafeRow] {
private val expectedModificationsCount = modificationsCount

protected def isModified(): Boolean = expectedModificationsCount != modificationsCount

protected def throwExceptionIfModified(): Unit = {
if (expectedModificationsCount != modificationsCount) {
throw new ConcurrentModificationException(
s"The backing ${classOf[ExternalAppendOnlyUnsafeRowArray].getName} has been modified " +
s"since the creation of this Iterator")
}
}
}

private[this] class InMemoryBufferIterator(startIndex: Int)
extends ExternalAppendOnlyUnsafeRowArrayIterator {

private var currentIndex = startIndex

override def hasNext(): Boolean = !isModified() && currentIndex < numRows

override def next(): UnsafeRow = {
throwExceptionIfModified()
val result = inMemoryBuffer(currentIndex)
currentIndex += 1
result
}
}

private[this] class SpillableArrayIterator(
iterator: UnsafeSorterIterator,
numFieldPerRow: Int,
startIndex: Int)
extends ExternalAppendOnlyUnsafeRowArrayIterator {

private val currentRow = new UnsafeRow(numFieldPerRow)

def init(): Unit = {
var i = 0
while (i < startIndex) {
if (iterator.hasNext) {
iterator.loadNext()
} else {
throw new ArrayIndexOutOfBoundsException(
"Invalid `startIndex` provided for generating iterator over the array. " +
s"Total elements: $numRows, requested `startIndex`: $startIndex")
}
i += 1
}
}

// Traverse up to the given [[startIndex]]
init()

override def hasNext(): Boolean = !isModified() && iterator.hasNext

override def next(): UnsafeRow = {
throwExceptionIfModified()
iterator.loadNext()
currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength)
currentRow
}
}
}

private[sql] object ExternalAppendOnlyUnsafeRowArray {
val DefaultInitialSizeOfInMemoryBuffer = 128
}
@@ -19,65 +19,39 @@ package org.apache.spark.sql.execution.joins

import org.apache.spark._
import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.{BinaryExecNode, ExternalAppendOnlyUnsafeRowArray, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

/**
* An optimized CartesianRDD for UnsafeRow, which caches the rows from the second child RDD.
* This is much faster than rebuilding the right partition for every row in the left RDD, and it
* also materializes the right RDD (in case the right RDD is nondeterministic).
*/
class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int)
class UnsafeCartesianRDD(
left : RDD[UnsafeRow],
right : RDD[UnsafeRow],
numFieldsOfRight: Int,
spillThreshold: Int)
extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {

override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
// We will not sort the rows, so prefixComparator and recordComparator are null.
val sorter = UnsafeExternalSorter.create(
context.taskMemoryManager(),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
context,
null,
null,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
false)
val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)

val partition = split.asInstanceOf[CartesianPartition]
for (y <- rdd2.iterator(partition.s2, context)) {
sorter.insertRecord(y.getBaseObject, y.getBaseOffset, y.getSizeInBytes, 0, false)
}
rdd2.iterator(partition.s2, context).foreach(rowArray.add)

// Create an iterator from sorter and wrapper it as Iterator[UnsafeRow]
def createIter(): Iterator[UnsafeRow] = {
val iter = sorter.getIterator
val unsafeRow = new UnsafeRow(numFieldsOfRight)
new Iterator[UnsafeRow] {
override def hasNext: Boolean = {
iter.hasNext
}
override def next(): UnsafeRow = {
iter.loadNext()
unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength)
unsafeRow
}
}
}
// Create an iterator from rowArray
def createIter(): Iterator[UnsafeRow] = rowArray.generateIterator()

val resultIter =
for (x <- rdd1.iterator(partition.s1, context);
y <- createIter()) yield (x, y)
CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
resultIter, sorter.cleanupResources())
resultIter, rowArray.clear())
}
}

@@ -97,7 +71,9 @@ case class CartesianProductExec(
val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]

val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size)
val spillThreshold = sqlContext.conf.cartesianProductExecBufferSpillThreshold

val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size, spillThreshold)
pair.mapPartitionsWithIndexInternal { (index, iter) =>
val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
val filtered = if (condition.isDefined) {
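As a usage sketch for the change above (names and sizes are illustrative, and an existing SparkSession `spark` is assumed), the new threshold takes effect whenever a plan executes through `CartesianProductExec`, e.g. a cross join whose inputs are too large to broadcast.

```scala
// Hypothetical illustration: spill the buffered right-side rows of a cartesian
// product after 10000 rows instead of the UnsafeExternalSorter default.
spark.conf.set("spark.sql.cartesianProductExec.buffer.spill.threshold", "10000")

// Ranges large enough that neither side is broadcast, so the planner picks a cartesian product.
val left = spark.range(0L, 3000000L).toDF("a")
val right = spark.range(0L, 3000000L).toDF("b")

// The physical plan should show CartesianProduct; its right side is buffered in an
// ExternalAppendOnlyUnsafeRowArray that spills once the threshold is crossed.
left.crossJoin(right).explain()
```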