From c386e56beec70577fced050be67d4c8af9205326 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Mon, 30 Nov 2015 14:46:55 -0800 Subject: [PATCH] [SPARK-12113] [SQL] Add some timing metrics for blocking pipelines. This patch adds timing metrics to portions of the execution. Row at a time pipelining means these metrics can only be added at the end of blocking phases. The metrics add in this patch should be interpreted as a event timeline where 0 means the beginning of the execution. The metric is computed as when the last task of the blocking operator computes. This makes sense if it is interpreted as a timeline and gives a measure of wall clock latency of that phase. For example, in a plan with Scan -> Agg -> Exchange -> Agg -> Project. There are two blocking phases: the end of the first agg (when it's done the agg and not yet returned the results) and similar for the second agg. This patch adds a timing to each that is the time since the beginning. For example it might contain Agg1: 10 seconds Agg2: 11 seconds This captures a timeline so it means that Scan + Agg1 took 10 seconds. Agg1 returning results + exchange + agg2 took 1 second. We can post process this timeline to get the time spent entirely in one pipeline. This adds the metrics to Agg and Sort but we should add more in subsequent patches. The patch also does not account of clock skew between the different machines. If this is a problem in practice, we can adjust for that as well if ntp is not used. --- .../spark/sql/execution/QueryExecution.scala | 3 +++ .../org/apache/spark/sql/execution/Sort.scala | 5 +++- .../spark/sql/execution/SparkPlan.scala | 11 ++++++++ .../aggregate/TungstenAggregate.scala | 6 +++-- .../sql/execution/metric/SQLMetrics.scala | 26 +++++++++++++++++++ 5 files changed, 48 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 107570f9dbcc8..2da76e231cf67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -51,6 +51,9 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { // only used for execution. lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan) + // Set the start time for the entire plan. + executedPlan.setStartTimeMs(System.currentTimeMillis()) + /** Internal version of the RDD. Avoids copies and has no schema */ lazy val toRdd: RDD[InternalRow] = executedPlan.execute() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala index 24207cb46fd29..a61e7a392f448 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala @@ -52,7 +52,9 @@ case class Sort( override private[sql] lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "blockPhaseFinishTime" -> SQLMetrics.createTimingMetric( + sparkContext, "blocking phase finish time", startTimeMs)) protected override def doExecute(): RDD[InternalRow] = { val schema = child.schema @@ -89,6 +91,7 @@ case class Sort( val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) + longMetric("blockPhaseFinishTime") += System.currentTimeMillis() dataSize += sorter.getPeakMemoryUsage spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index a78177751c9dc..1404054c2ea4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -78,6 +78,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ private[sql] def longMetric(name: String): LongSQLMetric = metrics(name).asInstanceOf[LongSQLMetric] + /** + * Called in the driver to set the start time for the entire plan. Recursively set for the + * children. The executors use this to have an approximate timestamp for when the query starts. + * This should be used only for metrics as the clock cannot be sychronized across the cluster. + */ + private[sql] var startTimeMs: Long = 0 + private[sql] def setStartTimeMs(v: Long): Unit = { + startTimeMs = v + children.foreach(_.setStartTimeMs(v)) + } + // TODO: Move to `DistributedPlan` /** Specifies how data is partitioned across different nodes in the cluster. */ def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH! diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 920de615e1d86..1833d489b7fff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -50,7 +50,9 @@ case class TungstenAggregate( "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "blockPhaseFinishTime" -> SQLMetrics.createTimingMetric( + sparkContext, "blocking phase finish time", startTimeMs)) override def outputsUnsafeRows: Boolean = true @@ -84,7 +86,6 @@ case class TungstenAggregate( val spillSize = longMetric("spillSize") child.execute().mapPartitions { iter => - val hasInput = iter.hasNext if (!hasInput && groupingExpressions.nonEmpty) { // This is a grouped aggregate and the input iterator is empty, @@ -108,6 +109,7 @@ case class TungstenAggregate( numOutputRows, dataSize, spillSize) + longMetric("blockPhaseFinishTime") += System.currentTimeMillis() if (!hasInput && groupingExpressions.isEmpty) { numOutputRows += 1 Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 1c253e3942e95..d16074fe74fd2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -148,6 +148,32 @@ private[sql] object SQLMetrics { createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L) } + /** + * Create a timing metric that reports duration in millis relative to startTime. + * + * The expected usage pattern is: + * On the driver: + * metric = createTimingMetric(..., System.currentTimeMillis) + * On each executor + * < Do some work > + * metric += System.currentTimeMillis + * The metric will then output the latest value across all the executors. This is a proxy for + * wall clock latency as it measures when the last executor finished this stage. + */ + def createTimingMetric(sc: SparkContext, name: String, startTime: Long): LongSQLMetric = { + val stringValue = (values: Seq[Long]) => { + val validValues = values.filter(_ >= startTime) + if (validValues.isEmpty) { + // The clocks between the different machines are not perfectly synced so this can happen. + "0" + } else { + val max = validValues.max + Utils.msDurationToString(max - startTime) + } + } + createLongMetric(sc, name, stringValue, 0) + } + /** * A metric that its value will be ignored. Use this one when we need a metric parameter but don't * care about the value.