UnsafeExternalSorter.java
@@ -75,6 +75,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private MemoryBlock currentPage = null;
private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
private long totalSpillBytes = 0L;
private volatile SpillableIterator readingIterator = null;

public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -215,7 +216,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
// we might not be able to get memory for the pointer array.

taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);

totalSpillBytes += spillSize;
return spillSize;
}

@@ -246,6 +247,13 @@ public long getPeakMemoryUsedBytes() {
return peakMemoryUsedBytes;
}

/**
* Return the total number of bytes that have been spilled to disk so far.
*/
public long getSpillSize() {
return totalSpillBytes;
}

@VisibleForTesting
public int getNumberOfAllocatedPages() {
return allocatedPages.size();
@@ -499,6 +507,8 @@ public long spill() throws IOException {
released += inMemSorter.getMemoryUsage();
inMemSorter.free();
inMemSorter = null;
taskContext.taskMetrics().incMemoryBytesSpilled(released);
totalSpillBytes += released;
return released;
}
}
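The change above is a plain accumulator: both spill paths (the MemoryConsumer-triggered spill(size, trigger) and the SpillableIterator spill() shown in the last hunk) add the released bytes to totalSpillBytes, which getSpillSize() then exposes. A minimal, self-contained Scala sketch of that bookkeeping pattern, using stand-in names rather than the real UnsafeExternalSorter:

```scala
// Simplified stand-in for illustration only; the real sorter is far larger.
class SpillCountingSorter {
  private var totalSpillBytes = 0L // cumulative bytes written to disk so far
  private var inMemoryBytes = 0L   // bytes currently buffered in memory

  def insert(recordSize: Long): Unit = {
    inMemoryBytes += recordSize
  }

  // Spill everything currently buffered and record how much was released.
  def spill(): Long = {
    val released = inMemoryBytes
    inMemoryBytes = 0L
    totalSpillBytes += released // every spill path bumps the same counter
    released
  }

  // Total number of bytes that have been spilled to disk so far.
  def getSpillSize: Long = totalSpillBytes
}

object SpillCountingSorterExample extends App {
  val sorter = new SpillCountingSorter
  sorter.insert(64); sorter.insert(128)
  sorter.spill()
  sorter.insert(32)
  sorter.spill()
  println(sorter.getSpillSize) // prints 224
}
```

The point of keeping one counter inside the sorter is that callers (such as the KV sorter wrapper below) can read a single per-operator total instead of reconstructing it from task-level metrics.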
UnsafeKVExternalSorter.java
@@ -176,6 +176,13 @@ public KVSorterIterator sortedIterator() throws IOException {
}
}

/**
* Return the total number of bytes that have been spilled to disk so far.
*/
public long getSpillSize() {
return sorter.getSpillSize();
}

/**
* Return the peak memory used so far, in bytes.
*/
19 changes: 11 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -53,8 +53,8 @@ case class Sort(
private val enableRadixSort = sqlContext.conf.enableRadixSort

override private[sql] lazy val metrics = Map(
"sortTime" -> SQLMetrics.createLongMetric(sparkContext, "sort time"),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
"peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))

def createSorter(): UnsafeExternalRowSorter = {
@@ -86,8 +86,9 @@ case class Sort(
}

protected override def doExecute(): RDD[InternalRow] = {
val dataSize = longMetric("dataSize")
val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
val sortTime = longMetric("sortTime")

child.execute().mapPartitionsInternal { iter =>
val sorter = createSorter()
@@ -96,10 +97,12 @@
// Remember the spill data size of this task before executing this operator so that we can
// figure out how many bytes we spilled for this operator.
val spillSizeBefore = metrics.memoryBytesSpilled
val beforeSort = System.nanoTime()

val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])

dataSize += sorter.getPeakMemoryUsage
sortTime += (System.nanoTime() - beforeSort) / 1000000
peakMemory += sorter.getPeakMemoryUsage
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)

@@ -145,19 +148,19 @@ case class Sort(
ctx.copyResult = false

val outputRow = ctx.freshName("outputRow")
val dataSize = metricTerm(ctx, "dataSize")
val peakMemory = metricTerm(ctx, "peakMemory")
val spillSize = metricTerm(ctx, "spillSize")
val spillSizeBefore = ctx.freshName("spillSizeBefore")
val startTime = ctx.freshName("startTime")
val sortTime = metricTerm(ctx, "sortTime")
s"""
| if ($needToSort) {
| $addToSorter();
| long $spillSizeBefore = $metrics.memoryBytesSpilled();
| long $startTime = System.nanoTime();
| $addToSorter();
| $sortedIterator = $sorterVariable.sort();
| $sortTime.add(System.nanoTime() - $startTime);
| $dataSize.add($sorterVariable.getPeakMemoryUsage());
| $sortTime.add((System.nanoTime() - $startTime) / 1000000);
| $peakMemory.add($sorterVariable.getPeakMemoryUsage());
| $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
| $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
| $needToSort = false;
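Both the interpreted path (doExecute) and the generated code above follow the same measure-around-the-sort shape: snapshot memoryBytesSpilled, time the sort with System.nanoTime(), then record elapsed milliseconds, peak memory, and the spill delta. A rough, self-contained Scala sketch of that shape, with a trivial Metric stand-in rather than Spark's LongSQLMetric:

```scala
object SortMetricsPattern {
  // Cheap stand-in for a SQL metric accumulator.
  final class Metric {
    private var value = 0L
    def add(v: Long): Unit = value += v
    def get: Long = value
  }

  // Hypothetical sorter surface; just what the measurement pattern needs.
  trait Sorter[T] {
    def sort(input: Iterator[T]): Iterator[T]
    def peakMemoryUsage: Long
    def spilledBytesSoFar: Long // analogous to taskMetrics().memoryBytesSpilled
  }

  def sortWithMetrics[T](
      sorter: Sorter[T],
      input: Iterator[T],
      sortTime: Metric,   // milliseconds
      peakMemory: Metric, // bytes
      spillSize: Metric   // bytes
    ): Iterator[T] = {
    val spillBefore = sorter.spilledBytesSoFar // snapshot before this operator runs
    val start = System.nanoTime()
    val sorted = sorter.sort(input)
    sortTime.add((System.nanoTime() - start) / 1000000)   // ns -> ms, as in the diff
    peakMemory.add(sorter.peakMemoryUsage)
    spillSize.add(sorter.spilledBytesSoFar - spillBefore) // only this operator's spills
    sorted
  }
}
```

Taking the spill value as a before/after delta is what lets several operators in the same task each report only their own contribution.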
UnsafeRowSerializer.scala
@@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams

import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.unsafe.Platform

/**
@@ -39,12 +40,17 @@ import org.apache.spark.unsafe.Platform
*
* @param numFields the number of fields in the row being serialized.
*/
private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new UnsafeRowSerializerInstance(numFields)
private[sql] class UnsafeRowSerializer(
numFields: Int,
dataSize: LongSQLMetric = null) extends Serializer with Serializable {
override def newInstance(): SerializerInstance =
new UnsafeRowSerializerInstance(numFields, dataSize)
override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true
}

private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance {
private class UnsafeRowSerializerInstance(
numFields: Int,
dataSize: LongSQLMetric) extends SerializerInstance {
/**
* Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
* length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
@@ -54,9 +60,14 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance {
private[this] val dOut: DataOutputStream =
new DataOutputStream(new BufferedOutputStream(out))

// LongSQLMetricParam.add() is faster than LongSQLMetric.+=
val localDataSize = if (dataSize != null) dataSize.localValue else null

override def writeValue[T: ClassTag](value: T): SerializationStream = {
val row = value.asInstanceOf[UnsafeRow]

if (localDataSize != null) {
localDataSize.add(row.getSizeInBytes)
}
dOut.writeInt(row.getSizeInBytes)
row.writeToStream(dOut, writeBuffer)
this
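The serializer change threads an optional metric into the write path and, per the inline comment, caches dataSize.localValue once so the per-row loop updates a cheap local accumulator instead of going through LongSQLMetric.+=. A simplified, self-contained Scala sketch of that idea (stand-in types, not the real UnsafeRowSerializer):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object CountingSerializerSketch {
  // Stand-in for a cheap mutable metric value that is updated on the hot path.
  final class LocalMetricValue {
    private var v = 0L
    def add(n: Long): Unit = v += n
    def value: Long = v
  }

  // Writes length-prefixed records and, when a metric was supplied, counts the row bytes.
  final class CountingRecordWriter(out: DataOutputStream, dataSize: LocalMetricValue) {
    def writeRecord(bytes: Array[Byte]): Unit = {
      if (dataSize != null) dataSize.add(bytes.length.toLong) // accumulate shuffle data size
      out.writeInt(bytes.length)                              // 4-byte length prefix
      out.write(bytes)
    }
  }

  def main(args: Array[String]): Unit = {
    val metric = new LocalMetricValue
    val writer =
      new CountingRecordWriter(new DataOutputStream(new ByteArrayOutputStream()), metric)
    writer.writeRecord(Array.fill[Byte](16)(0))
    writer.writeRecord(Array.fill[Byte](8)(1))
    println(metric.value) // 24 bytes of row data (length prefixes not counted)
  }
}
```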
TungstenAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
import org.apache.spark.unsafe.KVIterator

@@ -52,8 +52,9 @@ case class TungstenAggregate(

override private[sql] lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
"peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
"aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))

override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

@@ -83,7 +84,7 @@ case class TungstenAggregate(

protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
val numOutputRows = longMetric("numOutputRows")
val dataSize = longMetric("dataSize")
val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")

child.execute().mapPartitions { iter =>
@@ -107,7 +108,7 @@ case class TungstenAggregate(
iter,
testFallbackStartsAt,
numOutputRows,
dataSize,
peakMemory,
spillSize)
if (!hasInput && groupingExpressions.isEmpty) {
numOutputRows += 1
@@ -212,10 +213,14 @@ case class TungstenAggregate(
""".stripMargin)

val numOutput = metricTerm(ctx, "numOutputRows")
val aggTime = metricTerm(ctx, "aggTime")
val beforeAgg = ctx.freshName("beforeAgg")
s"""
| while (!$initAgg) {
| $initAgg = true;
| long $beforeAgg = System.nanoTime();
| $doAgg();
| $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
|
| // output the result
| ${genResult.trim}
@@ -303,15 +308,17 @@ case class TungstenAggregate(
*/
def finishAggregate(
hashMap: UnsafeFixedWidthAggregationMap,
sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = {
sorter: UnsafeKVExternalSorter,
peakMemory: LongSQLMetricValue,
spillSize: LongSQLMetricValue): KVIterator[UnsafeRow, UnsafeRow] = {

// update peak execution memory
val mapMemory = hashMap.getPeakMemoryUsedBytes
val sorterMemory = Option(sorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
val peakMemory = Math.max(mapMemory, sorterMemory)
val maxMemory = Math.max(mapMemory, sorterMemory)
val metrics = TaskContext.get().taskMetrics()
metrics.incPeakExecutionMemory(peakMemory)
// TODO: update data size and spill size
peakMemory.add(maxMemory)
metrics.incPeakExecutionMemory(maxMemory)

if (sorter == null) {
// not spilled
@@ -365,6 +372,7 @@

true
} else {
spillSize.add(sorter.getSpillSize)
false
}
}
@@ -476,6 +484,8 @@ case class TungstenAggregate(
ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "")

val doAgg = ctx.freshName("doAggregateWithKeys")
val peakMemory = metricTerm(ctx, "peakMemory")
val spillSize = metricTerm(ctx, "spillSize")
ctx.addNewFunction(doAgg,
s"""
${if (isVectorizedHashMapEnabled) vectorizedHashMapGenerator.generate() else ""}
@@ -486,7 +496,7 @@
${if (isVectorizedHashMapEnabled) {
s"$iterTermForVectorizedHashMap = $vectorizedHashMapTerm.rowIterator();"} else ""}

$iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm);
$iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize);
}
""")

@@ -528,10 +538,14 @@ case class TungstenAggregate(
} else None
}

val aggTime = metricTerm(ctx, "aggTime")
val beforeAgg = ctx.freshName("beforeAgg")
s"""
if (!$initAgg) {
$initAgg = true;
long $beforeAgg = System.nanoTime();
$doAgg();
$aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
}

// output the result
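In both generated paths above (the no-grouping-key case and the hash-map case) the one-off doAgg() call is bracketed by two System.nanoTime() reads, with the difference converted to milliseconds and added to the aggTime metric; finishAggregate additionally receives the peakMemory and spillSize metric values so they can be filled in once the map or sorter is done. A small, self-contained Scala sketch of the run-once-and-time wrapper that the generated Java expresses, under illustrative names:

```scala
object AggTimingSketch {
  // Stand-in for a SQL timing metric, in milliseconds.
  final class Metric {
    private var v = 0L
    def add(n: Long): Unit = v += n
    def get: Long = v
  }

  // Mirrors the generated "if (!initAgg) { ... }" block: run the aggregation body
  // exactly once and charge the elapsed milliseconds to the aggTime metric.
  final class OnceWithTiming(aggTime: Metric, doAgg: () => Unit) {
    private var initAgg = false // same role as the generated class-level flag
    def ensureDone(): Unit = {
      if (!initAgg) {
        initAgg = true
        val beforeAgg = System.nanoTime()
        doAgg()
        aggTime.add((System.nanoTime() - beforeAgg) / 1000000)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val aggTime = new Metric
    val once = new OnceWithTiming(aggTime, () => Thread.sleep(5)) // pretend this is doAggregateWithKeys()
    once.ensureDone()
    once.ensureDone() // second call is a no-op, so the time is charged only once
    println(s"aggregate time ~ ${aggTime.get} ms")
  }
}
```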
TungstenAggregationIterator.scala
@@ -87,7 +87,7 @@ class TungstenAggregationIterator(
inputIter: Iterator[InternalRow],
testFallbackStartsAt: Option[(Int, Int)],
numOutputRows: LongSQLMetric,
dataSize: LongSQLMetric,
peakMemory: LongSQLMetric,
spillSize: LongSQLMetric)
extends AggregationIterator(
groupingExpressions,
@@ -415,11 +415,11 @@ class TungstenAggregationIterator(
if (!hasNext) {
val mapMemory = hashMap.getPeakMemoryUsedBytes
val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
val peakMemory = Math.max(mapMemory, sorterMemory)
val maxMemory = Math.max(mapMemory, sorterMemory)
val metrics = TaskContext.get().taskMetrics()
dataSize += peakMemory
peakMemory += maxMemory
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(peakMemory)
metrics.incPeakExecutionMemory(maxMemory)
}
numOutputRows += 1
res
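TungstenAggregationIterator writes its operator metrics once, when the iterator is exhausted: peak memory is the maximum of the hash map's and the external sorter's peaks (not their sum), and spill size is again a delta against the value captured before the operator started. A compact, self-contained Scala sketch of that end-of-iteration bookkeeping (stand-in types only):

```scala
object FinalAggMetricsSketch {
  // Stand-in for a SQL metric accumulator.
  final class Metric {
    private var v = 0L
    def +=(n: Long): Unit = v += n
    def get: Long = v
  }

  // Update operator-level metrics exactly once, after the last row has been produced.
  def onIteratorExhausted(
      mapPeakBytes: Long,
      sorterPeakBytes: Option[Long], // None when the hash map never fell back to sorting
      spilledBytesNow: Long,
      spilledBytesBefore: Long,
      peakMemory: Metric,
      spillSize: Metric): Unit = {
    val maxMemory = math.max(mapPeakBytes, sorterPeakBytes.getOrElse(0L))
    peakMemory += maxMemory                           // peak, not a sum of both structures
    spillSize += spilledBytesNow - spilledBytesBefore // delta attributed to this operator
  }

  def main(args: Array[String]): Unit = {
    val peak = new Metric
    val spill = new Metric
    onIteratorExhausted(64L << 20, Some(128L << 20), 200L << 20, 50L << 20, peak, spill)
    println((peak.get, spill.get)) // (134217728,157286400)
  }
}
```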
BroadcastExchange.scala
@@ -23,8 +23,10 @@ import scala.concurrent.duration._
import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.ThreadUtils

/**
@@ -35,6 +37,12 @@ case class BroadcastExchange(
mode: BroadcastMode,
child: SparkPlan) extends Exchange {

override private[sql] lazy val metrics = Map(
"dataSize" -> SQLMetrics.createLongMetric(sparkContext, "data size (bytes)"),
"collectTime" -> SQLMetrics.createLongMetric(sparkContext, "time to collect (ms)"),
"buildTime" -> SQLMetrics.createLongMetric(sparkContext, "time to build (ms)"),
"broadcastTime" -> SQLMetrics.createLongMetric(sparkContext, "time to broadcast (ms)"))

override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)

override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -61,11 +69,21 @@ case class BroadcastExchange(
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
val beforeCollect = System.nanoTime()
// Note that we use .executeCollect() because we don't want to convert data to Scala types
val input: Array[InternalRow] = child.executeCollect()
val beforeBuild = System.nanoTime()
longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
longMetric("dataSize") += input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum

// Construct and broadcast the relation.
sparkContext.broadcast(mode.transform(input))
val relation = mode.transform(input)
val beforeBroadcast = System.nanoTime()
longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000

val broadcasted = sparkContext.broadcast(relation)
longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
broadcasted
}
}(BroadcastExchange.executionContext)
}
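BroadcastExchange now splits the asynchronous broadcast job into three timed phases (collect, build, broadcast) plus a dataSize metric summed from the collected UnsafeRows. The pattern is just successive nanoTime checkpoints, with each difference charged to its own metric. A self-contained Scala sketch of the same shape, using hypothetical phase functions rather than Spark's API:

```scala
object BroadcastPhaseTimingSketch {
  // Stand-in metric: milliseconds for the timings, bytes for dataSize.
  final class Metric {
    private var v = 0L
    def +=(n: Long): Unit = v += n
    def get: Long = v
  }

  final case class PhaseMetrics(
      dataSize: Metric = new Metric,
      collectTime: Metric = new Metric,
      buildTime: Metric = new Metric,
      broadcastTime: Metric = new Metric)

  // Collect -> build -> broadcast, charging each phase's duration to its own metric.
  def runBroadcast[T, R](
      collect: () => Array[T],
      sizeOf: T => Long,
      build: Array[T] => R,
      broadcast: R => R,
      m: PhaseMetrics): R = {
    val beforeCollect = System.nanoTime()
    val rows = collect()
    val beforeBuild = System.nanoTime()
    m.collectTime += (beforeBuild - beforeCollect) / 1000000
    m.dataSize += rows.map(sizeOf).sum

    val relation = build(rows)
    val beforeBroadcast = System.nanoTime()
    m.buildTime += (beforeBroadcast - beforeBuild) / 1000000

    val broadcasted = broadcast(relation)
    m.broadcastTime += (System.nanoTime() - beforeBroadcast) / 1000000
    broadcasted
  }

  def main(args: Array[String]): Unit = {
    val m = PhaseMetrics()
    runBroadcast[Array[Byte], Seq[Array[Byte]]](
      collect = () => Array(Array.fill[Byte](32)(0)),
      sizeOf = _.length.toLong,
      build = _.toSeq,
      broadcast = identity,
      m = m)
    println(s"dataSize=${m.dataSize.get} collect=${m.collectTime.get} ms")
  }
}
```

Measuring the phases separately makes it easy to see whether a slow broadcast join is dominated by executing the child plan, building the relation, or shipping it to executors.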
ShuffleExchange.scala
@@ -25,10 +25,11 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.MutablePair

/**
@@ -39,6 +40,9 @@ case class ShuffleExchange(
child: SparkPlan,
@transient coordinator: Option[ExchangeCoordinator]) extends Exchange {

override private[sql] lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))

override def nodeName: String = {
val extraInfo = coordinator match {
case Some(exchangeCoordinator) if exchangeCoordinator.isEstimated =>
@@ -54,7 +58,8 @@

override def outputPartitioning: Partitioning = newPartitioning

private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
private val serializer: Serializer =
new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))

override protected def doPrepare(): Unit = {
// If an ExchangeCoordinator is needed, we register this Exchange operator
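ShuffleExchange itself only declares the dataSize metric and hands its LongSQLMetric to the UnsafeRowSerializer shown earlier, so the bytes are counted where the rows are actually written. A tiny, self-contained Scala sketch of that wiring, with illustrative stand-in classes:

```scala
object ShuffleDataSizeWiringSketch {
  // Stand-in for a SQL metric accumulator.
  final class Metric {
    private var v = 0L
    def add(n: Long): Unit = v += n
    def get: Long = v
  }

  // A serializer that optionally reports how many row bytes it writes (null means "no metric").
  final class RowSerializer(numFields: Int, dataSize: Metric = null) {
    def write(rowBytes: Array[Byte]): Unit = {
      if (dataSize != null) dataSize.add(rowBytes.length.toLong) // counted at write time
    }
  }

  // The exchange operator owns the metric and passes it to the serializer once, at construction.
  final class Exchange(numOutputFields: Int) {
    val dataSize = new Metric
    private val serializer = new RowSerializer(numOutputFields, dataSize)
    def writeAll(rows: Seq[Array[Byte]]): Unit = rows.foreach(serializer.write)
  }

  def main(args: Array[String]): Unit = {
    val exchange = new Exchange(numOutputFields = 3)
    exchange.writeAll(Seq(Array.fill[Byte](10)(0), Array.fill[Byte](6)(0)))
    println(exchange.dataSize.get) // 16
  }
}
```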