@@ -51,6 +51,9 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
// only used for execution.
lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)

+ // Set the start time for the entire plan.
+ executedPlan.setStartTimeMs(System.currentTimeMillis())

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

@@ -52,7 +52,9 @@ case class Sort(

override private[sql] lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
"blockPhaseFinishTime" -> SQLMetrics.createTimingMetric(
sparkContext, "blocking phase finish time", startTimeMs))

protected override def doExecute(): RDD[InternalRow] = {
val schema = child.schema
@@ -89,6 +91,7 @@ case class Sort(

val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])

longMetric("blockPhaseFinishTime") += System.currentTimeMillis()
dataSize += sorter.getPeakMemoryUsage
spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore

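As a reading aid (not part of this patch), the stand-alone sketch below shows the pattern the Sort change follows: a blocking operator consumes its entire input before emitting anything, so the timestamp taken right after the blocking call returns marks the end of the task's blocking phase. The name recordFinishTimeMs is a stand-in for the longMetric("blockPhaseFinishTime") update and is not Spark API.

object BlockingPhaseSketch {
  // A simplified "sort a partition" task: nothing can be returned until the
  // full input has been buffered and sorted, i.e. the blocking phase is done.
  def sortPartition(input: Iterator[Int], recordFinishTimeMs: Long => Unit): Iterator[Int] = {
    val sorted = input.toArray.sorted
    // Stamp the wall-clock time at which this task finished the blocking phase.
    recordFinishTimeMs(System.currentTimeMillis())
    sorted.iterator
  }

  def main(args: Array[String]): Unit = {
    var finishedAt = 0L
    val out = sortPartition(Iterator(3, 1, 2), t => finishedAt = t)
    println(out.toList)   // List(1, 2, 3)
    println(finishedAt)   // timestamp recorded after sorting completed
  }
}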
@@ -78,6 +78,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
private[sql] def longMetric(name: String): LongSQLMetric =
metrics(name).asInstanceOf[LongSQLMetric]

+ /**
+  * Called on the driver to set the start time for the entire plan; the value is set recursively
+  * on the children. The executors use this as an approximate timestamp for when the query starts.
+  * It should be used only for metrics, as the clocks cannot be synchronized across the cluster.
+  */
+ private[sql] var startTimeMs: Long = 0
+ private[sql] def setStartTimeMs(v: Long): Unit = {
+   startTimeMs = v
+   children.foreach(_.setStartTimeMs(v))
+ }

// TODO: Move to `DistributedPlan`
/** Specifies how data is partitioned across different nodes in the cluster. */
def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!
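For illustration only, the sketch below mirrors the recursive propagation that setStartTimeMs performs over the plan tree; Node is a stand-in for SparkPlan and is not Spark code.

object StartTimePropagationSketch {
  final class Node(val children: Seq[Node] = Nil) {
    var startTimeMs: Long = 0L
    def setStartTimeMs(v: Long): Unit = {
      startTimeMs = v
      // Recursively push the driver-side timestamp down to every child.
      children.foreach(_.setStartTimeMs(v))
    }
  }

  def main(args: Array[String]): Unit = {
    val scan = new Node()
    val sort = new Node(Seq(scan))
    val aggregate = new Node(Seq(sort))
    // The driver stamps the root once, before execution starts.
    aggregate.setStartTimeMs(System.currentTimeMillis())
    // Every operator now shares the same approximate query start time.
    assert(scan.startTimeMs == aggregate.startTimeMs)
    println(scan.startTimeMs)
  }
}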
@@ -50,7 +50,9 @@ case class TungstenAggregate(
"numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
"blockPhaseFinishTime" -> SQLMetrics.createTimingMetric(
sparkContext, "blocking phase finish time", startTimeMs))

override def outputsUnsafeRows: Boolean = true

@@ -84,7 +86,6 @@ case class TungstenAggregate(
val spillSize = longMetric("spillSize")

child.execute().mapPartitions { iter =>

val hasInput = iter.hasNext
if (!hasInput && groupingExpressions.nonEmpty) {
// This is a grouped aggregate and the input iterator is empty,
@@ -108,6 +109,7 @@ case class TungstenAggregate(
numOutputRows,
dataSize,
spillSize)
longMetric("blockPhaseFinishTime") += System.currentTimeMillis()
if (!hasInput && groupingExpressions.isEmpty) {
numOutputRows += 1
Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
@@ -148,6 +148,32 @@ private[sql] object SQLMetrics {
createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L)
}

+ /**
+  * Create a timing metric that reports a duration in milliseconds relative to startTime.
+  *
+  * The expected usage pattern is:
+  *   On the driver:
+  *     metric = createTimingMetric(..., System.currentTimeMillis)
+  *   On each executor:
+  *     <do some work>
+  *     metric += System.currentTimeMillis
+  * The metric then reports the latest value across all the executors. This is a proxy for
+  * wall clock latency, as it measures when the last executor finished this stage.
+  */
+ def createTimingMetric(sc: SparkContext, name: String, startTime: Long): LongSQLMetric = {
+   val stringValue = (values: Seq[Long]) => {
+     val validValues = values.filter(_ >= startTime)
+     if (validValues.isEmpty) {
+       // The clocks on different machines are not perfectly synchronized, so this can happen.
+       "0"
[Review comment from a project member on the "0" fallback above:]
This is a nice feature for performance analysis!

I am wondering if we can detect whether the machine clocks are synced when starting Spark?

+     } else {
+       val max = validValues.max
+       Utils.msDurationToString(max - startTime)
+     }
+   }
+   createLongMetric(sc, name, stringValue, 0)
+ }

/**
* A metric whose value will be ignored. Use this one when we need a metric parameter but don't
* care about the value.
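To make the reporting semantics of createTimingMetric concrete, the stand-alone sketch below reproduces the stringValue logic outside Spark. It is an illustration under stated assumptions, not part of this patch: per-task finish timestamps are supplied directly rather than through SQL metric accumulators, and Utils.msDurationToString is approximated with plain millisecond formatting.

object TimingMetricRenderingSketch {
  def render(startTime: Long, taskFinishTimes: Seq[Long]): String = {
    // Drop timestamps that fall before the driver-side start time; executor
    // clocks can be skewed relative to the driver.
    val validValues = taskFinishTimes.filter(_ >= startTime)
    if (validValues.isEmpty) {
      "0"
    } else {
      // The last executor to finish the blocking phase bounds the wall-clock
      // latency of the stage, so report max(finish time) - start time.
      s"${validValues.max - startTime} ms"
    }
  }

  def main(args: Array[String]): Unit = {
    val startTime = 1000L                      // driver: System.currentTimeMillis at plan start
    val finishTimes = Seq(950L, 1800L, 2400L)  // executors: one clock is skewed behind the driver
    println(render(startTime, finishTimes))    // prints "1400 ms"
  }
}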