diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 107570f9dbcc8..2da76e231cf67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -51,6 +51,9 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
   // only used for execution.
   lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)
 
+  // Set the start time for the entire plan.
+  executedPlan.setStartTimeMs(System.currentTimeMillis())
+
   /** Internal version of the RDD. Avoids copies and has no schema */
   lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 24207cb46fd29..a61e7a392f448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -52,7 +52,9 @@ case class Sort(
 
   override private[sql] lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
+    "blockPhaseFinishTime" -> SQLMetrics.createTimingMetric(
+      sparkContext, "blocking phase finish time", startTimeMs))
 
   protected override def doExecute(): RDD[InternalRow] = {
     val schema = child.schema
@@ -89,6 +91,7 @@ case class Sort(
 
       val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 
+      longMetric("blockPhaseFinishTime") += System.currentTimeMillis()
       dataSize += sorter.getPeakMemoryUsage
       spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index a78177751c9dc..1404054c2ea4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -78,6 +78,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   private[sql] def longMetric(name: String): LongSQLMetric =
     metrics(name).asInstanceOf[LongSQLMetric]
 
+  /**
+   * Called in the driver to set the start time for the entire plan. Recursively set for the
+   * children. The executors use this to have an approximate timestamp for when the query starts.
+   * This should be used only for metrics as the clock cannot be synchronized across the cluster.
+   */
+  private[sql] var startTimeMs: Long = 0
+  private[sql] def setStartTimeMs(v: Long): Unit = {
+    startTimeMs = v
+    children.foreach(_.setStartTimeMs(v))
+  }
+
   // TODO: Move to `DistributedPlan`
   /** Specifies how data is partitioned across different nodes in the cluster. */
   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!
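Note (not part of the patch): the setStartTimeMs call added in QueryExecution stamps the root of the physical plan on the driver, and the method in SparkPlan recursively pushes the same timestamp to every child, so all operators share one driver-side start time that executors can measure against. A minimal sketch of that propagation, using a hypothetical PlanNode stand-in for SparkPlan:

// Illustrative sketch only; PlanNode and StartTimeDemo are made-up names, the
// recursion mirrors setStartTimeMs in the patch above.
case class PlanNode(children: Seq[PlanNode] = Nil) {
  var startTimeMs: Long = 0
  def setStartTimeMs(v: Long): Unit = {
    startTimeMs = v
    children.foreach(_.setStartTimeMs(v))
  }
}

object StartTimeDemo extends App {
  // A small tree: root -> (leaf, inner -> leaf)
  val root = PlanNode(Seq(PlanNode(), PlanNode(Seq(PlanNode()))))
  root.setStartTimeMs(System.currentTimeMillis())
  // Every node now carries the same driver-side timestamp.
  assert(root.children.forall(_.startTimeMs == root.startTimeMs))
}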
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 920de615e1d86..1833d489b7fff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -50,7 +50,9 @@ case class TungstenAggregate(
     "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
+    "blockPhaseFinishTime" -> SQLMetrics.createTimingMetric(
+      sparkContext, "blocking phase finish time", startTimeMs))
 
   override def outputsUnsafeRows: Boolean = true
 
@@ -84,7 +86,6 @@ case class TungstenAggregate(
     val spillSize = longMetric("spillSize")
 
     child.execute().mapPartitions { iter =>
-
       val hasInput = iter.hasNext
       if (!hasInput && groupingExpressions.nonEmpty) {
         // This is a grouped aggregate and the input iterator is empty,
@@ -108,6 +109,7 @@ case class TungstenAggregate(
             numOutputRows,
             dataSize,
             spillSize)
+        longMetric("blockPhaseFinishTime") += System.currentTimeMillis()
         if (!hasInput && groupingExpressions.isEmpty) {
           numOutputRows += 1
           Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1c253e3942e95..d16074fe74fd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -148,6 +148,32 @@ private[sql] object SQLMetrics {
     createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L)
   }
 
+  /**
+   * Create a timing metric that reports duration in millis relative to startTime.
+   *
+   * The expected usage pattern is:
+   *   On the driver:
+   *     metric = createTimingMetric(..., System.currentTimeMillis)
+   *   On each executor:
+   *     < Do some work >
+   *     metric += System.currentTimeMillis
+   * The metric will then output the latest value across all the executors. This is a proxy for
+   * wall clock latency as it measures when the last executor finished this stage.
+   */
+  def createTimingMetric(sc: SparkContext, name: String, startTime: Long): LongSQLMetric = {
+    val stringValue = (values: Seq[Long]) => {
+      val validValues = values.filter(_ >= startTime)
+      if (validValues.isEmpty) {
+        // The clocks between the different machines are not perfectly synced so this can happen.
+        "0"
+      } else {
+        val max = validValues.max
+        Utils.msDurationToString(max - startTime)
+      }
+    }
+    createLongMetric(sc, name, stringValue, 0)
+  }
+
   /**
    * A metric that its value will be ignored. Use this one when we need a metric parameter but don't
   * care about the value.
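Note (not part of the patch): the stringValue function added in SQLMetrics drops any reported value that is earlier than the driver's start time (possible when executor clocks are behind) and otherwise renders the latest executor finish time relative to the start. A minimal standalone sketch of that behavior; render and TimingMetricDemo are illustrative names, and the real code formats the duration with Utils.msDurationToString:

// Illustrative sketch only: mimics the rendering logic of createTimingMetric.
object TimingMetricDemo extends App {
  def render(startTime: Long, values: Seq[Long]): String = {
    // Drop values that predate the start time; clocks across machines may be skewed.
    val validValues = values.filter(_ >= startTime)
    if (validValues.isEmpty) "0"
    else s"${validValues.max - startTime} ms"
  }

  val start = 1000L
  println(render(start, Seq(900L)))          // only skewed values -> "0"
  println(render(start, Seq(1500L, 2300L)))  // last executor finished 1300 ms after start
}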