From 4d941c7bb606ebf7d8a76b7b3e3d6fd6a3a00b95 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 2 Feb 2021 20:06:12 -0800 Subject: [PATCH 01/14] Add interface for DS v2 metrics. --- .../sql/connector/read/CustomMetric.java | 43 +++++++++++++++++++ .../sql/connector/read/PartitionReader.java | 8 ++++ .../apache/spark/sql/connector/read/Scan.java | 9 ++++ .../datasources/v2/BatchScanExec.scala | 15 ++++++- .../datasources/v2/ContinuousScanExec.scala | 15 ++++++- .../datasources/v2/DataSourceRDD.scala | 18 ++++++-- .../v2/DataSourceV2ScanExecBase.scala | 9 +++- .../datasources/v2/MicroBatchScanExec.scala | 15 ++++++- .../continuous/ContinuousDataSourceRDD.scala | 15 +++++-- .../ContinuousQueuedDataReader.scala | 3 ++ 10 files changed, 135 insertions(+), 15 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/CustomMetric.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/CustomMetric.java new file mode 100644 index 000000000000..0cc60262c3e6 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/CustomMetric.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Evolving; + +/** + * A custom metric for {@link Scan}. + * + * @since 3.2.0 + */ +@Evolving +public interface CustomMetric { + /** + * Returns the name of custom metric. + */ + String getName(); + + /** + * Returns the description of custom metric. + */ + String getDescription(); + + /** + * Returns the value of custom metric. + */ + Long getValue(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java index 23fbd95800e2..78ac2af5f26c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java @@ -48,4 +48,12 @@ public interface PartitionReader extends Closeable { * Return the current record. This method should return same value until `next` is called. */ T get(); + + /** + * Returns an array of custom metrics. By default it returns empty array. 
+ */ + default CustomMetric[] getCustomMetrics() { + CustomMetric[] NO_METRICS = {}; + return NO_METRICS; + } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java index 4146f217985b..e1b8f3dd7e8f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java @@ -102,4 +102,13 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) { default ContinuousStream toContinuousStream(String checkpointLocation) { throw new UnsupportedOperationException(description() + ": Continuous scan are not supported"); } + + /** + * Returns an array of supported custom metrics with name and description. + * By default it returns empty array. + */ + default CustomMetric[] supportedCustomMetrics() { + CustomMetric[] NO_METRICS = {}; + return NO_METRICS; + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index c199df676ced..c399026d1262 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, Scan} /** * Physical plan node for scanning a batch of data from a data source v2. @@ -44,8 +44,19 @@ case class BatchScanExec( override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() + /** + * The callback function which is called when the output iterator of input RDD is consumed + * completely. 
+ */ + private def onOutputCompletion(reader: PartitionReader[_]) = { + reader.getCustomMetrics.foreach { metric => + longMetric(metric.getName) += metric.getValue + } + } + override lazy val inputRDD: RDD[InternalRow] = { - new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar) + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, + onOutputCompletion) } override def doCanonicalize(): BatchScanExec = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala index dc95d157e40f..d00165bd4d8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.read.{InputPartition, Scan} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, Scan} import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, Offset} import org.apache.spark.sql.execution.streaming.continuous._ @@ -47,6 +47,16 @@ case class ContinuousScanExec( stream.createContinuousReaderFactory() } + /** + * The callback function which is called when the output iterator of input RDD is consumed + * completely. + */ + private def onOutputCompletion(reader: PartitionReader[_]) = { + reader.getCustomMetrics.foreach { metric => + longMetric(metric.getName) += metric.getValue + } + } + override lazy val inputRDD: RDD[InternalRow] = { EpochCoordinatorRef.get( sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), @@ -58,6 +68,7 @@ case class ContinuousScanExec( sqlContext.conf.continuousStreamingExecutorPollIntervalMs, partitions, schema, - readerFactory.asInstanceOf[ContinuousPartitionReaderFactory]) + readerFactory.asInstanceOf[ContinuousPartitionReaderFactory], + onOutputCompletion) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index 63403b957723..562b170bfae3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.CompletionIterator class DataSourceRDDPartition(val index: Int, val inputPartition: InputPartition) extends Partition with Serializable @@ -36,7 +37,8 @@ class DataSourceRDD( sc: SparkContext, @transient private val inputPartitions: Seq[InputPartition], partitionReaderFactory: PartitionReaderFactory, - columnarReads: Boolean) + columnarReads: Boolean, + onCompletion: PartitionReader[_] => Unit = _ => {}) extends RDD[InternalRow](sc, Nil) { override protected def getPartitions: Array[Partition] = { @@ -55,11 +57,21 @@ class DataSourceRDD( val (iter, reader) = if (columnarReads) { val batchReader 
= partitionReaderFactory.createColumnarReader(inputPartition) val iter = new MetricsBatchIterator(new PartitionIterator[ColumnarBatch](batchReader)) - (iter, batchReader) + def completionFunction = { + onCompletion(batchReader) + } + val completionIterator = CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]]( + iter, completionFunction) + (completionIterator, batchReader) } else { val rowReader = partitionReaderFactory.createReader(inputPartition) val iter = new MetricsRowIterator(new PartitionIterator[InternalRow](rowReader)) - (iter, rowReader) + def completionFunction = { + onCompletion(rowReader) + } + val completionIterator = CompletionIterator[InternalRow, Iterator[InternalRow]]( + iter, completionFunction) + (completionIterator, rowReader) } context.addTaskCompletionListener[Unit](_ => reader.close()) // TODO: SPARK-25083 remove the type erasure hack in data source scan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 083c6bc7999b..4862fb36e367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -32,8 +32,13 @@ import org.apache.spark.util.Utils trait DataSourceV2ScanExecBase extends LeafExecNode { - override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + override lazy val metrics = { + val customMetrics = scan.supportedCustomMetrics().map { customMetric => + customMetric.getName -> SQLMetrics.createMetric(sparkContext, customMetric.getDescription) + } + Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) ++ + customMetrics + } def scan: Scan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala index bca28e3cacb6..e09a9a4eebb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, Scan} import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} /** @@ -45,7 +45,18 @@ case class MicroBatchScanExec( override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory() + /** + * The callback function which is called when the output iterator of input RDD is consumed + * completely. 
+ */ + private def onOutputCompletion(reader: PartitionReader[_]) = { + reader.getCustomMetrics.foreach { metric => + longMetric(metric.getName) += metric.getValue + } + } + override lazy val inputRDD: RDD[InternalRow] = { - new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar) + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, + onOutputCompletion) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala index 5ee27c71aa73..b30311c507a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.execution.streaming.continuous import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory import org.apache.spark.sql.types.StructType -import org.apache.spark.util.NextIterator +import org.apache.spark.util.{CompletionIterator, NextIterator} class ContinuousDataSourceRDDPartition( val index: Int, @@ -52,7 +52,8 @@ class ContinuousDataSourceRDD( epochPollIntervalMs: Long, private val inputPartitions: Seq[InputPartition], schema: StructType, - partitionReaderFactory: ContinuousPartitionReaderFactory) + partitionReaderFactory: ContinuousPartitionReaderFactory, + onCompletion: PartitionReader[_] => Unit = _ => {}) extends RDD[InternalRow](sc, Nil) { override protected def getPartitions: Array[Partition] = { @@ -88,7 +89,7 @@ class ContinuousDataSourceRDD( partition.queueReader } - new NextIterator[InternalRow] { + val nextIter = new NextIterator[InternalRow] { override def getNext(): InternalRow = { readerForPartition.next() match { case null => @@ -100,6 +101,12 @@ class ContinuousDataSourceRDD( override def close(): Unit = {} } + + def completionFunction = { + onCompletion(readerForPartition.getPartitionReader()) + } + CompletionIterator[InternalRow, Iterator[InternalRow]]( + nextIter, completionFunction) } override def getPreferredLocations(split: Partition): Seq[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala index dff2fa69e42f..02893f274902 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala @@ -26,6 +26,7 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReader, PartitionOffset} import org.apache.spark.sql.types.StructType import org.apache.spark.util.ThreadUtils @@ -47,6 +48,8 @@ class ContinuousQueuedDataReader( // 
Important sequencing - we must get our starting point before the provider threads start running private var currentOffset: PartitionOffset = reader.getOffset + def getPartitionReader(): PartitionReader[InternalRow] = reader + /** * The record types in the read buffer. */ From 4215ec0930082c34316c21f943a6395ebbeff72f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 27 Mar 2021 11:14:28 -0700 Subject: [PATCH 02/14] update test. --- .../spark/sql/execution/SparkPlanInfo.scala | 3 ++- .../adaptive/AdaptiveSparkPlanExec.scala | 3 ++- .../datasources/v2/BatchScanExec.scala | 4 +-- .../datasources/v2/ContinuousScanExec.scala | 4 +-- .../datasources/v2/MicroBatchScanExec.scala | 4 +-- .../sql/execution/metric/SQLMetricInfo.scala | 3 ++- .../sql/execution/metric/SQLMetrics.scala | 9 ++++--- .../execution/ui/SQLAppStatusListener.scala | 6 ++++- .../sql/execution/ui/SQLAppStatusStore.scala | 3 ++- .../sql/execution/ui/SparkPlanGraph.scala | 6 +++-- .../ui/MetricsAggregationBenchmark.scala | 5 ++-- .../status/api/v1/sql/SqlResourceSuite.scala | 26 ++++++++++--------- 12 files changed, 46 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index db587dd98685..ac7a59b9c348 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -61,7 +61,8 @@ private[execution] object SparkPlanInfo { case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => - new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType) + new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType, + metric.aggregateMethod) } // dump the file scan metadata (e.g file path) to event log diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index fba32a4dd1da..ca24f5060a9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -627,7 +627,8 @@ case class AdaptiveSparkPlanExec( // of the new plan nodes, so that it can track the valid accumulator updates later // and display SQL metrics correctly. val newMetrics = newSubPlans.flatMap { p => - p.flatMap(_.metrics.values.map(m => SQLPlanMetric(m.name.get, m.id, m.metricType))) + p.flatMap(_.metrics.values.map(m => + SQLPlanMetric(m.name.get, m.id, m.metricType, m.aggregateMethod))) } context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveSQLMetricUpdates( executionId.toLong, newMetrics)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index c399026d1262..b7957662c412 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -49,8 +49,8 @@ case class BatchScanExec( * completely. 
*/ private def onOutputCompletion(reader: PartitionReader[_]) = { - reader.getCustomMetrics.foreach { metric => - longMetric(metric.getName) += metric.getValue + reader.currentMetricsValues.foreach { metric => + longMetric(metric.name()) += metric.value() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala index d00165bd4d8f..7e62c4bb3f5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala @@ -52,8 +52,8 @@ case class ContinuousScanExec( * completely. */ private def onOutputCompletion(reader: PartitionReader[_]) = { - reader.getCustomMetrics.foreach { metric => - longMetric(metric.getName) += metric.getValue + reader.currentMetricsValues().foreach { metric => + longMetric(metric.name()) += metric.value() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala index e09a9a4eebb5..2ae21d565a9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala @@ -50,8 +50,8 @@ case class MicroBatchScanExec( * completely. */ private def onOutputCompletion(reader: PartitionReader[_]) = { - reader.getCustomMetrics.foreach { metric => - longMetric(metric.getName) += metric.getValue + reader.currentMetricsValues().foreach { metric => + longMetric(metric.name()) += metric.value() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala index adb81519dbc8..869b654ee82e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala @@ -27,4 +27,5 @@ import org.apache.spark.annotation.DeveloperApi class SQLMetricInfo( val name: String, val accumulatorId: Long, - val metricType: String) + val metricType: String, + val aggregateMethod: (Array[Long], Array[Long]) => String) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 938b359850f6..95722948f987 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -37,7 +37,7 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils} class SQLMetric( val metricType: String, initValue: Long = 0L, - aggregateMethod: (Array[Long], Array[Long]) => String) extends AccumulatorV2[Long, Long] { + val aggregateMethod: (Array[Long], Array[Long]) => String) extends AccumulatorV2[Long, Long] { // This is a workaround for SPARK-11013. // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will // update it at the end of task and the value will be at least 0. 
Then we can filter out the -1 @@ -46,7 +46,7 @@ class SQLMetric( private var _zeroValue = initValue override def copy(): SQLMetric = { - val newAcc = new SQLMetric(metricType, _value) + val newAcc = new SQLMetric(metricType, _value, aggregateMethod = aggregateMethod) newAcc._zeroValue = initValue newAcc } @@ -95,10 +95,13 @@ object SQLMetrics { private val baseForAvgMetric: Int = 10 - private def defaultAggregateMethod(metricType: String): (Array[Long], Array[Long]) => String = { + // For built-in SQLMetrics, we use default aggregation method. + def defaultAggregateMethod( + metricType: String): (Array[Long], Array[Long]) => String = { SQLMetrics.stringValue(metricType, _, _) } + // For DS V2 custom metrics, the aggregation method is custimized. private def customAggregateMethod( aggregator: (Array[Long]) => String): (Array[Long], Array[Long]) => String = { (metrics, _) => aggregator(metrics) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 963aec7ca36c..60c4b59ef459 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -199,6 +199,9 @@ class SQLAppStatusListener( private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = { val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap + val metricAggregationMethods = exec.metrics.map { m => + (m.accumulatorId, m.aggregateMethod) + }.toMap val liveStageMetrics = exec.stages.toSeq .flatMap { stageId => Option(stageMetrics.get(stageId)) } @@ -255,7 +258,8 @@ class SQLAppStatusListener( } val aggregatedMetrics = allMetrics.map { case (id, values) => - id -> SQLMetrics.stringValue(metricTypes(id), values, maxMetricsFromAllStages.getOrElse(id, + val aggMethod = metricAggregationMethods(id) + id -> aggMethod(values, maxMetricsFromAllStages.getOrElse(id, Array.empty[Long])) }.toMap diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index a90f37a80d52..8ad4e1e9ce39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -146,4 +146,5 @@ class SparkPlanGraphNodeWrapper( case class SQLPlanMetric( name: String, accumulatorId: Long, - metricType: String) + metricType: String, + aggregateMethod: (Array[Long], Array[Long]) => String) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 3b011301421f..97addc34a527 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -80,7 +80,8 @@ object SparkPlanGraph { planInfo.nodeName match { case name if name.startsWith("WholeStageCodegen") => val metrics = planInfo.metrics.map { metric => - SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType) + SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType, + metric.aggregateMethod) } val cluster = new SparkPlanGraphCluster( @@ -123,7 +124,8 @@ object SparkPlanGraph { edges += SparkPlanGraphEdge(node.id, parent.id) case name => val 
metrics = planInfo.metrics.map { metric => - SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType) + SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType, + metric.aggregateMethod) } val node = new SparkPlanGraphNode( nodeIdGenerator.getAndIncrement(), planInfo.nodeName, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala index c09ff51ecaff..1e5b4dcd31f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala @@ -30,7 +30,7 @@ import org.apache.spark.internal.config.Status._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.metric.SQLMetricInfo +import org.apache.spark.sql.execution.metric.{SQLMetricInfo, SQLMetrics} import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator, Utils} import org.apache.spark.util.kvstore.InMemoryStore @@ -60,7 +60,8 @@ object MetricsAggregationBenchmark extends BenchmarkBase { val store = new SQLAppStatusStore(kvstore, Some(listener)) val metrics = (0 until numMetrics).map { i => - new SQLMetricInfo(s"metric$i", i.toLong, "average") + new SQLMetricInfo(s"metric$i", i.toLong, "average", + SQLMetrics.defaultAggregateMethod("average")) } val planInfo = new SparkPlanInfo( diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala index dbc33c47fed5..4ee5a19c119a 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.PrivateMethodTester import org.apache.spark.{JobExecutionStatus, SparkFunSuite} +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphEdge, SparkPlanGraphNode, SQLExecutionUIData, SQLPlanMetric} object SqlResourceSuite { @@ -41,18 +42,19 @@ object SqlResourceSuite { val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map(1L -> Some(1L)) + val defaultAggregateMethod = SQLMetrics.defaultAggregateMethod("") val filterNode = new SparkPlanGraphNode(1, FILTER, "", - metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""))) + metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "", defaultAggregateMethod))) val nodes: Seq[SparkPlanGraphNode] = Seq( new SparkPlanGraphCluster(0, WHOLE_STAGE_CODEGEN_1, "", nodes = ArrayBuffer(filterNode), - metrics = Seq(SQLPlanMetric(DURATION, 0, ""))), + metrics = Seq(SQLPlanMetric(DURATION, 0, "", defaultAggregateMethod))), new SparkPlanGraphNode(2, SCAN_TEXT, "", metrics = Seq( - SQLPlanMetric(METADATA_TIME, 2, ""), - SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""), - SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""), - SQLPlanMetric(SIZE_OF_FILES_READ, 5, "")))) + SQLPlanMetric(METADATA_TIME, 2, "", defaultAggregateMethod), + SQLPlanMetric(NUMBER_OF_FILES_READ, 3, "", defaultAggregateMethod), + SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, "", defaultAggregateMethod), + SQLPlanMetric(SIZE_OF_FILES_READ, 5, "", 
defaultAggregateMethod)))) val edges: Seq[SparkPlanGraphEdge] = Seq(SparkPlanGraphEdge(3, 2)) @@ -60,12 +62,12 @@ object SqlResourceSuite { SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1) val metrics: Seq[SQLPlanMetric] = { - Seq(SQLPlanMetric(DURATION, 0, ""), - SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""), - SQLPlanMetric(METADATA_TIME, 2, ""), - SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""), - SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""), - SQLPlanMetric(SIZE_OF_FILES_READ, 5, "")) + Seq(SQLPlanMetric(DURATION, 0, "", defaultAggregateMethod), + SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "", defaultAggregateMethod), + SQLPlanMetric(METADATA_TIME, 2, "", defaultAggregateMethod), + SQLPlanMetric(NUMBER_OF_FILES_READ, 3, "", defaultAggregateMethod), + SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, "", defaultAggregateMethod), + SQLPlanMetric(SIZE_OF_FILES_READ, 5, "", defaultAggregateMethod)) } val sqlExecutionUIData: SQLExecutionUIData = { From d3bf283c1f0e0e7dfb54243c6d8bfc15e30082b6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 27 Mar 2021 15:44:51 -0700 Subject: [PATCH 03/14] Remove old CustomMetric. --- .../sql/connector/read/CustomMetric.java | 43 ------------------- 1 file changed, 43 deletions(-) delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/CustomMetric.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/CustomMetric.java deleted file mode 100644 index 0cc60262c3e6..000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/CustomMetric.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.read; - -import org.apache.spark.annotation.Evolving; - -/** - * A custom metric for {@link Scan}. - * - * @since 3.2.0 - */ -@Evolving -public interface CustomMetric { - /** - * Returns the name of custom metric. - */ - String getName(); - - /** - * Returns the description of custom metric. - */ - String getDescription(); - - /** - * Returns the value of custom metric. - */ - Long getValue(); -} From aedb965a2b7cef2027bf7857fa4b7c10f4e8d571 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 28 Mar 2021 00:40:29 -0700 Subject: [PATCH 04/14] update. 
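Pass the SQLMetric map into DataSourceRDD and ContinuousDataSourceRDD and update the
metrics from the partition iterators as records are consumed, instead of registering a
completion callback on each scan exec node.

For reference, a minimal sketch of the connector-side reader that this per-record path
polls. It follows the CustomTaskMetric usage in DataSourceV2Suite later in this series;
the class name CountingPartitionReader and the package location of CustomTaskMetric are
illustrative assumptions, not part of this patch.

    import org.apache.spark.sql.catalyst.InternalRow
    // Assumed package, matching CustomMetric.java in this series.
    import org.apache.spark.sql.connector.CustomTaskMetric
    import org.apache.spark.sql.connector.read.PartitionReader

    // Hypothetical reader that counts the rows it returns and reports the count
    // as a custom task metric named "custom_metric".
    class CountingPartitionReader(rows: Iterator[InternalRow])
      extends PartitionReader[InternalRow] {
      private var rowCount = 0L
      private var current: InternalRow = _

      override def next(): Boolean = {
        if (rows.hasNext) {
          current = rows.next()
          rowCount += 1
          true
        } else {
          false
        }
      }

      override def get(): InternalRow = current

      override def close(): Unit = {}

      // The current values of this task's metrics; the scan's PartitionIterator polls
      // them while records are consumed and updates the SQLMetric registered under
      // the same name by DataSourceV2ScanExecBase.
      override def currentMetricsValues(): Array[CustomTaskMetric] = {
        Array(new CustomTaskMetric {
          override def name(): String = "custom_metric"
          override def value(): Long = rowCount
        })
      }
    }

The metric name is expected to match one of the names returned by
Scan.supportedCustomMetrics(), since the SQLMetric map is keyed by that name.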
--- .../datasources/v2/BatchScanExec.scala | 15 ++------- .../datasources/v2/ContinuousScanExec.scala | 14 ++------- .../datasources/v2/DataSourceRDD.scala | 31 +++++++++---------- .../datasources/v2/MicroBatchScanExec.scala | 15 ++------- .../continuous/ContinuousDataSourceRDD.scala | 19 ++++++------ 5 files changed, 29 insertions(+), 65 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index b7957662c412..f9e3e46d7428 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, Scan} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} /** * Physical plan node for scanning a batch of data from a data source v2. @@ -44,19 +44,8 @@ case class BatchScanExec( override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() - /** - * The callback function which is called when the output iterator of input RDD is consumed - * completely. - */ - private def onOutputCompletion(reader: PartitionReader[_]) = { - reader.currentMetricsValues.foreach { metric => - longMetric(metric.name()) += metric.value() - } - } - override lazy val inputRDD: RDD[InternalRow] = { - new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, - onOutputCompletion) + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, metrics) } override def doCanonicalize(): BatchScanExec = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala index 7e62c4bb3f5a..b6a30c37c731 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, Scan} +import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, Offset} import org.apache.spark.sql.execution.streaming.continuous._ @@ -47,16 +47,6 @@ case class ContinuousScanExec( stream.createContinuousReaderFactory() } - /** - * The callback function which is called when the output iterator of input RDD is consumed - * completely. 
- */ - private def onOutputCompletion(reader: PartitionReader[_]) = { - reader.currentMetricsValues().foreach { metric => - longMetric(metric.name()) += metric.value() - } - } - override lazy val inputRDD: RDD[InternalRow] = { EpochCoordinatorRef.get( sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), @@ -69,6 +59,6 @@ case class ContinuousScanExec( partitions, schema, readerFactory.asInstanceOf[ContinuousPartitionReaderFactory], - onOutputCompletion) + metrics) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index 0d64a7a4a135..4d7274732777 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -26,8 +26,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.CompletionIterator class DataSourceRDDPartition(val index: Int, val inputPartition: InputPartition) extends Partition with Serializable @@ -39,7 +39,7 @@ class DataSourceRDD( @transient private val inputPartitions: Seq[InputPartition], partitionReaderFactory: PartitionReaderFactory, columnarReads: Boolean, - onCompletion: PartitionReader[_] => Unit = _ => {}) + sqlMetrics: Map[String, SQLMetric]) extends RDD[InternalRow](sc, Nil) { override protected def getPartitions: Array[Partition] = { @@ -57,22 +57,14 @@ class DataSourceRDD( val inputPartition = castPartition(split).inputPartition val (iter, reader) = if (columnarReads) { val batchReader = partitionReaderFactory.createColumnarReader(inputPartition) - val iter = new MetricsBatchIterator(new PartitionIterator[ColumnarBatch](batchReader)) - def completionFunction = { - onCompletion(batchReader) - } - val completionIterator = CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]]( - iter, completionFunction) - (completionIterator, batchReader) + val iter = new MetricsBatchIterator( + new PartitionIterator[ColumnarBatch](batchReader, sqlMetrics)) + (iter, batchReader) } else { val rowReader = partitionReaderFactory.createReader(inputPartition) - val iter = new MetricsRowIterator(new PartitionIterator[InternalRow](rowReader)) - def completionFunction = { - onCompletion(rowReader) - } - val completionIterator = CompletionIterator[InternalRow, Iterator[InternalRow]]( - iter, completionFunction) - (completionIterator, rowReader) + val iter = new MetricsRowIterator( + new PartitionIterator[InternalRow](rowReader, sqlMetrics)) + (iter, rowReader) } context.addTaskCompletionListener[Unit](_ => reader.close()) // TODO: SPARK-25083 remove the type erasure hack in data source scan @@ -84,7 +76,9 @@ class DataSourceRDD( } } -private class PartitionIterator[T](reader: PartitionReader[T]) extends Iterator[T] { +private class PartitionIterator[T]( + reader: PartitionReader[T], + sqlMetrics: Map[String, SQLMetric]) extends Iterator[T] { private[this] var valuePrepared = false override def hasNext: Boolean = { @@ -98,6 +92,9 @@ private class PartitionIterator[T](reader: PartitionReader[T]) extends Iterator[ if (!hasNext) { throw 
QueryExecutionErrors.endOfStreamError() } + reader.currentMetricsValues.foreach { metric => + sqlMetrics(metric.name()) += metric.value() + } valuePrepared = false reader.get() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala index 2ae21d565a9f..1a9e75158b60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, Scan} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} /** @@ -45,18 +45,7 @@ case class MicroBatchScanExec( override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory() - /** - * The callback function which is called when the output iterator of input RDD is consumed - * completely. - */ - private def onOutputCompletion(reader: PartitionReader[_]) = { - reader.currentMetricsValues().foreach { metric => - longMetric(metric.name()) += metric.value() - } - } - override lazy val inputRDD: RDD[InternalRow] = { - new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, - onOutputCompletion) + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, metrics) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala index b30311c507a8..6fe50a09b380 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.execution.streaming.continuous import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} +import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType -import org.apache.spark.util.{CompletionIterator, NextIterator} +import org.apache.spark.util.NextIterator class ContinuousDataSourceRDDPartition( val index: Int, @@ -53,7 +54,7 @@ class ContinuousDataSourceRDD( private val inputPartitions: Seq[InputPartition], schema: StructType, partitionReaderFactory: ContinuousPartitionReaderFactory, - onCompletion: PartitionReader[_] => Unit = _ => {}) + sqlMetrics: Map[String, SQLMetric]) extends RDD[InternalRow](sc, Nil) { override protected def getPartitions: Array[Partition] = { @@ -89,8 +90,12 @@ class ContinuousDataSourceRDD( partition.queueReader } - val nextIter = new NextIterator[InternalRow] { + val partitionReader = readerForPartition.getPartitionReader() + new NextIterator[InternalRow] { override 
def getNext(): InternalRow = { + partitionReader.currentMetricsValues.foreach { metric => + sqlMetrics(metric.name()) += metric.value() + } readerForPartition.next() match { case null => finished = true @@ -101,12 +106,6 @@ class ContinuousDataSourceRDD( override def close(): Unit = {} } - - def completionFunction = { - onCompletion(readerForPartition.getPartitionReader()) - } - CompletionIterator[InternalRow, Iterator[InternalRow]]( - nextIter, completionFunction) } override def getPreferredLocations(split: Partition): Seq[String] = { From 918c90da4c62419f57674cbee707628e0c65f8a6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 28 Mar 2021 00:45:27 -0700 Subject: [PATCH 05/14] Only pass custom sql metrics. --- .../sql/execution/datasources/v2/BatchScanExec.scala | 2 +- .../execution/datasources/v2/ContinuousScanExec.scala | 2 +- .../sql/execution/datasources/v2/DataSourceRDD.scala | 10 +++++----- .../datasources/v2/DataSourceV2ScanExecBase.scala | 7 ++++--- .../execution/datasources/v2/MicroBatchScanExec.scala | 2 +- .../streaming/continuous/ContinuousDataSourceRDD.scala | 4 ++-- 6 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index f9e3e46d7428..1987c9e63a64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -45,7 +45,7 @@ case class BatchScanExec( override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() override lazy val inputRDD: RDD[InternalRow] = { - new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, metrics) + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics) } override def doCanonicalize(): BatchScanExec = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala index b6a30c37c731..fea89c581e18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala @@ -59,6 +59,6 @@ case class ContinuousScanExec( partitions, schema, readerFactory.asInstanceOf[ContinuousPartitionReaderFactory], - metrics) + customMetrics) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index 4d7274732777..937de5dcf283 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -39,7 +39,7 @@ class DataSourceRDD( @transient private val inputPartitions: Seq[InputPartition], partitionReaderFactory: PartitionReaderFactory, columnarReads: Boolean, - sqlMetrics: Map[String, SQLMetric]) + customMetrics: Map[String, SQLMetric]) extends RDD[InternalRow](sc, Nil) { override protected def getPartitions: Array[Partition] = { @@ -58,12 +58,12 @@ class DataSourceRDD( val (iter, reader) = if (columnarReads) { val batchReader = partitionReaderFactory.createColumnarReader(inputPartition) val iter = 
new MetricsBatchIterator( - new PartitionIterator[ColumnarBatch](batchReader, sqlMetrics)) + new PartitionIterator[ColumnarBatch](batchReader, customMetrics)) (iter, batchReader) } else { val rowReader = partitionReaderFactory.createReader(inputPartition) val iter = new MetricsRowIterator( - new PartitionIterator[InternalRow](rowReader, sqlMetrics)) + new PartitionIterator[InternalRow](rowReader, customMetrics)) (iter, rowReader) } context.addTaskCompletionListener[Unit](_ => reader.close()) @@ -78,7 +78,7 @@ class DataSourceRDD( private class PartitionIterator[T]( reader: PartitionReader[T], - sqlMetrics: Map[String, SQLMetric]) extends Iterator[T] { + customMetrics: Map[String, SQLMetric]) extends Iterator[T] { private[this] var valuePrepared = false override def hasNext: Boolean = { @@ -93,7 +93,7 @@ private class PartitionIterator[T]( throw QueryExecutionErrors.endOfStreamError() } reader.currentMetricsValues.foreach { metric => - sqlMetrics(metric.name()) += metric.value() + customMetrics(metric.name()) += metric.value() } valuePrepared = false reader.get() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 143d2019c402..512e7895c581 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -32,10 +32,11 @@ import org.apache.spark.util.Utils trait DataSourceV2ScanExecBase extends LeafExecNode { + val customMetrics = scan.supportedCustomMetrics().map { customMetric => + customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) + }.toMap + override lazy val metrics = { - val customMetrics = scan.supportedCustomMetrics().map { customMetric => - customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) - } Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) ++ customMetrics } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala index 1a9e75158b60..1430a32c8e81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala @@ -46,6 +46,6 @@ case class MicroBatchScanExec( override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory() override lazy val inputRDD: RDD[InternalRow] = { - new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, metrics) + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala index 6fe50a09b380..b9473dfdb02b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala @@ -54,7 +54,7 @@ class ContinuousDataSourceRDD( private val inputPartitions: Seq[InputPartition], schema: 
StructType, partitionReaderFactory: ContinuousPartitionReaderFactory, - sqlMetrics: Map[String, SQLMetric]) + customMetrics: Map[String, SQLMetric]) extends RDD[InternalRow](sc, Nil) { override protected def getPartitions: Array[Partition] = { @@ -94,7 +94,7 @@ class ContinuousDataSourceRDD( new NextIterator[InternalRow] { override def getNext(): InternalRow = { partitionReader.currentMetricsValues.foreach { metric => - sqlMetrics(metric.name()) += metric.value() + customMetrics(metric.name()) += metric.value() } readerForPartition.next() match { case null => From e8576ec04596ea962177a6a18818149e332a9b01 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 30 Mar 2021 11:51:21 -0700 Subject: [PATCH 06/14] Add test. --- .../spark/sql/connector/CustomMetric.java | 4 +- .../datasources/v2/DataSourceRDD.scala | 2 +- .../continuous/ContinuousDataSourceRDD.scala | 2 +- .../execution/ui/SQLAppStatusListener.scala | 8 ++-- .../sql/connector/DataSourceV2Suite.scala | 48 +++++++++++++++++++ .../ui/SQLAppStatusListenerSuite.scala | 39 +++++++++++++++ 6 files changed, 96 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java index bbd35ac94677..8ccb0a67def1 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java @@ -17,6 +17,8 @@ package org.apache.spark.sql.connector; +import java.io.Serializable; + import org.apache.spark.annotation.Evolving; /** @@ -28,7 +30,7 @@ * @since 3.2.0 */ @Evolving -public interface CustomMetric { +public interface CustomMetric extends Serializable { /** * Returns the name of custom metric. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index 937de5dcf283..f8bf0d7f93d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -93,7 +93,7 @@ private class PartitionIterator[T]( throw QueryExecutionErrors.endOfStreamError() } reader.currentMetricsValues.foreach { metric => - customMetrics(metric.name()) += metric.value() + customMetrics(metric.name()).set(metric.value()) } valuePrepared = false reader.get() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala index b9473dfdb02b..4e32cefbe31a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala @@ -94,7 +94,7 @@ class ContinuousDataSourceRDD( new NextIterator[InternalRow] { override def getNext(): InternalRow = { partitionReader.currentMetricsValues.foreach { metric => - customMetrics(metric.name()) += metric.value() + customMetrics(metric.name()).set(metric.value()) } readerForPartition.next() match { case null => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 60c4b59ef459..11d54078eb98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -198,7 +198,7 @@ class SQLAppStatusListener( } private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = { - val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap + val accumIds = exec.metrics.map(_.accumulatorId).toSet val metricAggregationMethods = exec.metrics.map { m => (m.accumulatorId, m.aggregateMethod) }.toMap @@ -214,7 +214,7 @@ class SQLAppStatusListener( val maxMetricsFromAllStages = new mutable.HashMap[Long, Array[Long]]() - taskMetrics.filter(m => metricTypes.contains(m._1)).foreach { case (id, values) => + taskMetrics.filter(m => accumIds.contains(m._1)).foreach { case (id, values) => val prev = allMetrics.getOrElse(id, null) val updated = if (prev != null) { prev ++ values @@ -225,7 +225,7 @@ class SQLAppStatusListener( } // Find the max for each metric id between all stages. 
- val validMaxMetrics = maxMetrics.filter(m => metricTypes.contains(m._1)) + val validMaxMetrics = maxMetrics.filter(m => accumIds.contains(m._1)) validMaxMetrics.foreach { case (id, value, taskId, stageId, attemptId) => val updated = maxMetricsFromAllStages.getOrElse(id, Array(value, stageId, attemptId, taskId)) if (value > updated(0)) { @@ -238,7 +238,7 @@ class SQLAppStatusListener( } exec.driverAccumUpdates.foreach { case (id, value) => - if (metricTypes.contains(id)) { + if (accumIds.contains(id)) { val prev = allMetrics.getOrElse(id, null) val updated = if (prev != null) { // If the driver updates same metrics as tasks and has higher value then remove diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 49a107880055..077eb694fde9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -818,3 +818,51 @@ class ReportStatisticsDataSource extends SimpleWritableDataSource { } } } + + +class SimpleCustomMetric extends CustomMetric { + override def name(): String = "custom_metric" + override def description(): String = "a simple custom metric" + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + s"custom_metric: ${taskMetrics.mkString(", ")}" + } +} + +// The followings are for custom metrics of V2 data source. +object CustomMetricReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + val RangeInputPartition(start, end) = partition + new PartitionReader[InternalRow] { + private var current = start - 1 + + override def next(): Boolean = { + current += 1 + current < end + } + + override def get(): InternalRow = InternalRow(current, -current) + + override def close(): Unit = {} + + override def currentMetricsValues(): Array[CustomTaskMetric] = { + val metric = new CustomTaskMetric { + override def name(): String = "custom_metric" + override def value(): Long = 12345 + } + Array(metric) + } + } + } +} + +class CustomMetricScanBuilder extends SimpleScanBuilder { + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + + override def supportedCustomMetrics(): Array[CustomMetric] = { + Array(new SimpleCustomMetric) + } + + override def createReaderFactory(): PartitionReaderFactory = CustomMetricReaderFactory +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 00f23718a0e9..04c00dd85c52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -37,13 +37,16 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.connector.CustomMetricScanBuilder import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import 
org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.functions.count import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} import org.apache.spark.util.kvstore.InMemoryStore @@ -811,6 +814,42 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils listener.onOtherEvent(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) } + + + test("SPARK-34338: Report metrics from Datasource v2 scan") { + val statusStore = spark.sharedState.statusStore + val oldCount = statusStore.executionsList().size + + val schema = new StructType().add("i", "int").add("j", "int") + val physicalPlan = BatchScanExec(schema.toAttributes, new CustomMetricScanBuilder()) + val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) { + override lazy val sparkPlan = physicalPlan + override lazy val executedPlan = physicalPlan + } + + SQLExecution.withNewExecutionId(dummyQueryExecution) { + physicalPlan.execute().collect() + } + + // Wait until the new execution is started and being tracked. + while (statusStore.executionsCount() < oldCount) { + Thread.sleep(100) + } + + // Wait for listener to finish computing the metrics for the execution. + while (statusStore.executionsList().isEmpty || + statusStore.executionsList().last.metricValues == null) { + Thread.sleep(100) + } + + val execId = statusStore.executionsList().last.executionId + val metrics = statusStore.executionMetrics(execId) + val expectedMetric = physicalPlan.metrics("custom_metric") + val expectedValue = "custom_metric: 12345, 12345" + + assert(metrics.contains(expectedMetric.id)) + assert(metrics(expectedMetric.id) === expectedValue) + } } From d5d867880ebb57c49ac422251ba50bbabf1159d1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 30 Mar 2021 16:02:37 -0700 Subject: [PATCH 07/14] Add note to API doc and use lazy val for custom metrics. --- .../org/apache/spark/sql/connector/read/PartitionReader.java | 3 ++- .../execution/datasources/v2/DataSourceV2ScanExecBase.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java index dfecb77c669b..33725f33b22f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java @@ -51,7 +51,8 @@ public interface PartitionReader extends Closeable { T get(); /** - * Returns an array of custom task metrics. By default it returns empty array. + * Returns an array of custom task metrics. By default it returns empty array. Note that it is + * not recommended to put heavy logic in this method as it may affect reading performance. 
*/ default CustomTaskMetric[] currentMetricsValues() { CustomTaskMetric[] NO_METRICS = {}; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 512e7895c581..1248f89b2bdd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -32,7 +32,7 @@ import org.apache.spark.util.Utils trait DataSourceV2ScanExecBase extends LeafExecNode { - val customMetrics = scan.supportedCustomMetrics().map { customMetric => + lazy val customMetrics = scan.supportedCustomMetrics().map { customMetric => customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) }.toMap From cf05fb74c03fc271b67a65dff75dc6a62d27c295 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 14 Apr 2021 00:44:28 -0700 Subject: [PATCH 08/14] Record and parse metric type for custom metric. --- .../spark/sql/connector/CustomMetric.java | 4 +- .../spark/sql/execution/SparkPlanInfo.scala | 3 +- .../adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../sql/execution/metric/CustomMetrics.scala | 71 +++++++++++++++++++ .../sql/execution/metric/SQLMetricInfo.scala | 3 +- .../sql/execution/metric/SQLMetrics.scala | 36 +++------- .../execution/ui/SQLAppStatusListener.scala | 16 ++++- .../sql/execution/ui/SQLAppStatusStore.scala | 3 +- .../ui/MetricsAggregationBenchmark.scala | 5 +- .../status/api/v1/sql/SqlResourceSuite.scala | 26 ++++--- 10 files changed, 113 insertions(+), 56 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java index 8ccb0a67def1..bbd35ac94677 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java @@ -17,8 +17,6 @@ package org.apache.spark.sql.connector; -import java.io.Serializable; - import org.apache.spark.annotation.Evolving; /** @@ -30,7 +28,7 @@ * @since 3.2.0 */ @Evolving -public interface CustomMetric extends Serializable { +public interface CustomMetric { /** * Returns the name of custom metric. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index ac7a59b9c348..db587dd98685 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -61,8 +61,7 @@ private[execution] object SparkPlanInfo { case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => - new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType, - metric.aggregateMethod) + new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType) } // dump the file scan metadata (e.g file path) to event log diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index ca24f5060a9a..30020d50fdd0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -628,7 +628,7 @@ case class AdaptiveSparkPlanExec( // and display SQL metrics correctly. val newMetrics = newSubPlans.flatMap { p => p.flatMap(_.metrics.values.map(m => - SQLPlanMetric(m.name.get, m.id, m.metricType, m.aggregateMethod))) + SQLPlanMetric(m.name.get, m.id, m.metricType))) } context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveSQLMetricUpdates( executionId.toLong, newMetrics)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala new file mode 100644 index 000000000000..fbfb5f8a2aca --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.metric + +import org.apache.spark.sql.connector.CustomMetric + +object CustomMetrics { + private val V2_CUSTOM = "v2Custom" + + /** + * Given a class name, builds and returns a metric type for a V2 custom metric class + * `CustomMetric`. + */ + def buildV2CustomMetricTypeName(customMetric: CustomMetric): String = { + s"${V2_CUSTOM}_${customMetric.getClass.getCanonicalName}" + } + + /** + * Given a V2 custom metric type name, this method parses it and returns the corresponding + * `CustomMetric` class name. 
+ */ + def parseV2CustomMetricType(metricType: String): String = { + val className = metricType.stripPrefix(s"${V2_CUSTOM}_") + + if (className == metricType) { + throw new IllegalStateException(s"Metric type $metricType is not a V2 custom metric type.") + } else { + className + } + } +} + +class CustomSumMetric extends CustomMetric { + override def name(): String = "CustomSumMetric" + + override def description(): String = "Sum up CustomMetric" + + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + taskMetrics.sum.toString + } +} + +class CustomAvgMetric extends CustomMetric { + override def name(): String = "CustomAvgMetric" + + override def description(): String = "Average CustomMetric" + + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + val average = if (taskMetrics.isEmpty) { + 0L + } else { + taskMetrics.sum / taskMetrics.length + } + average.toString + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala index 869b654ee82e..adb81519dbc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala @@ -27,5 +27,4 @@ import org.apache.spark.annotation.DeveloperApi class SQLMetricInfo( val name: String, val accumulatorId: Long, - val metricType: String, - val aggregateMethod: (Array[Long], Array[Long]) => String) + val metricType: String) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 95722948f987..72254c2d661b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -34,10 +34,7 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils} * the executor side are automatically propagated and shown in the SQL UI through metrics. Updates * on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]]. */ -class SQLMetric( - val metricType: String, - initValue: Long = 0L, - val aggregateMethod: (Array[Long], Array[Long]) => String) extends AccumulatorV2[Long, Long] { +class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] { // This is a workaround for SPARK-11013. // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will // update it at the end of task and the value will be at least 0. Then we can filter out the -1 @@ -46,7 +43,7 @@ class SQLMetric( private var _zeroValue = initValue override def copy(): SQLMetric = { - val newAcc = new SQLMetric(metricType, _value, aggregateMethod = aggregateMethod) + val newAcc = new SQLMetric(metricType, _value) newAcc._zeroValue = initValue newAcc } @@ -91,21 +88,9 @@ object SQLMetrics { private val TIMING_METRIC = "timing" private val NS_TIMING_METRIC = "nsTiming" private val AVERAGE_METRIC = "average" - private val V2_CUSTOM = "v2Custom" private val baseForAvgMetric: Int = 10 - // For built-in SQLMetrics, we use default aggregation method. - def defaultAggregateMethod( - metricType: String): (Array[Long], Array[Long]) => String = { - SQLMetrics.stringValue(metricType, _, _) - } - - // For DS V2 custom metrics, the aggregation method is custimized. 
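
With the aggregate function no longer carried on SQLMetric, the custom metric's class name now travels inside the metric type string and the metric object is re-created from it on the listener side. A rough round-trip sketch of the two helpers as they stand at this point in the series (a later patch in the series changes parseV2CustomMetricType to return an Option):

    import org.apache.spark.sql.execution.metric.{CustomMetrics, CustomSumMetric}

    val metric = new CustomSumMetric
    val metricType = CustomMetrics.buildV2CustomMetricTypeName(metric)
    // "v2Custom_org.apache.spark.sql.execution.metric.CustomSumMetric"
    assert(CustomMetrics.parseV2CustomMetricType(metricType) ==
      metric.getClass.getCanonicalName)
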
- private def customAggregateMethod( - aggregator: (Array[Long]) => String): (Array[Long], Array[Long]) => String = { - (metrics, _) => aggregator(metrics) - } /** * Converts a double value to long value by multiplying a base integer, so we can store it in * `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore @@ -118,7 +103,7 @@ object SQLMetrics { } def createMetric(sc: SparkContext, name: String): SQLMetric = { - val acc = new SQLMetric(SUM_METRIC, aggregateMethod = defaultAggregateMethod(SUM_METRIC)) + val acc = new SQLMetric(SUM_METRIC) acc.register(sc, name = Some(name), countFailedValues = false) acc } @@ -127,8 +112,7 @@ object SQLMetrics { * Create a metric to report data source v2 custom metric. */ def createV2CustomMetric(sc: SparkContext, customMetric: CustomMetric): SQLMetric = { - val acc = new SQLMetric(V2_CUSTOM, - aggregateMethod = customAggregateMethod(customMetric.aggregateTaskMetrics)) + val acc = new SQLMetric(CustomMetrics.buildV2CustomMetricTypeName(customMetric)) acc.register(sc, name = Some(customMetric.name()), countFailedValues = false) acc } @@ -141,8 +125,7 @@ object SQLMetrics { // The final result of this metric in physical operator UI may look like: // data size total (min, med, max): // 100GB (100MB, 1GB, 10GB) - val acc = new SQLMetric(SIZE_METRIC, -1, - aggregateMethod = defaultAggregateMethod(SIZE_METRIC)) + val acc = new SQLMetric(SIZE_METRIC, -1) acc.register(sc, name = Some(name), countFailedValues = false) acc } @@ -151,16 +134,14 @@ object SQLMetrics { // The final result of this metric in physical operator UI may looks like: // duration total (min, med, max): // 5s (800ms, 1s, 2s) - val acc = new SQLMetric(TIMING_METRIC, -1, - aggregateMethod = defaultAggregateMethod(TIMING_METRIC)) + val acc = new SQLMetric(TIMING_METRIC, -1) acc.register(sc, name = Some(name), countFailedValues = false) acc } def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { // Same with createTimingMetric, just normalize the unit of time to millisecond. 
- val acc = new SQLMetric(NS_TIMING_METRIC, -1, - aggregateMethod = defaultAggregateMethod(NS_TIMING_METRIC)) + val acc = new SQLMetric(NS_TIMING_METRIC, -1) acc.register(sc, name = Some(name), countFailedValues = false) acc } @@ -175,8 +156,7 @@ object SQLMetrics { // The final result of this metric in physical operator UI may looks like: // probe avg (min, med, max): // (1.2, 2.2, 6.3) - val acc = new SQLMetric(AVERAGE_METRIC, - aggregateMethod = defaultAggregateMethod(AVERAGE_METRIC)) + val acc = new SQLMetric(AVERAGE_METRIC) acc.register(sc, name = Some(name), countFailedValues = false) acc } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 11d54078eb98..c73b42bc058a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -27,10 +27,12 @@ import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Status._ import org.apache.spark.scheduler._ +import org.apache.spark.sql.connector.CustomMetric import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity} +import org.apache.spark.util.Utils import org.apache.spark.util.collection.OpenHashMap class SQLAppStatusListener( @@ -199,8 +201,20 @@ class SQLAppStatusListener( private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = { val accumIds = exec.metrics.map(_.accumulatorId).toSet + + val metricAggregationMap = new mutable.HashMap[String, CustomMetric]() val metricAggregationMethods = exec.metrics.map { m => - (m.accumulatorId, m.aggregateMethod) + val className = CustomMetrics.parseV2CustomMetricType(m.metricType) + val customMetric = if (metricAggregationMap.contains(className)) { + metricAggregationMap(className) + } else { + // Try to initiate custom metric object + val metric = Utils.loadExtensions(classOf[CustomMetric], Seq(className), conf).head + metricAggregationMap.put(className, metric) + metric + } + (m.accumulatorId, + (metrics: Array[Long], _: Array[Long]) => customMetric.aggregateTaskMetrics(metrics)) }.toMap val liveStageMetrics = exec.stages.toSeq diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 8ad4e1e9ce39..a90f37a80d52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -146,5 +146,4 @@ class SparkPlanGraphNodeWrapper( case class SQLPlanMetric( name: String, accumulatorId: Long, - metricType: String, - aggregateMethod: (Array[Long], Array[Long]) => String) + metricType: String) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala index 1e5b4dcd31f8..c09ff51ecaff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala @@ -30,7 +30,7 @@ import 
org.apache.spark.internal.config.Status._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.metric.{SQLMetricInfo, SQLMetrics} +import org.apache.spark.sql.execution.metric.SQLMetricInfo import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator, Utils} import org.apache.spark.util.kvstore.InMemoryStore @@ -60,8 +60,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase { val store = new SQLAppStatusStore(kvstore, Some(listener)) val metrics = (0 until numMetrics).map { i => - new SQLMetricInfo(s"metric$i", i.toLong, "average", - SQLMetrics.defaultAggregateMethod("average")) + new SQLMetricInfo(s"metric$i", i.toLong, "average") } val planInfo = new SparkPlanInfo( diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala index 4ee5a19c119a..dbc33c47fed5 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala @@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.PrivateMethodTester import org.apache.spark.{JobExecutionStatus, SparkFunSuite} -import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphEdge, SparkPlanGraphNode, SQLExecutionUIData, SQLPlanMetric} object SqlResourceSuite { @@ -42,19 +41,18 @@ object SqlResourceSuite { val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map(1L -> Some(1L)) - val defaultAggregateMethod = SQLMetrics.defaultAggregateMethod("") val filterNode = new SparkPlanGraphNode(1, FILTER, "", - metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "", defaultAggregateMethod))) + metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""))) val nodes: Seq[SparkPlanGraphNode] = Seq( new SparkPlanGraphCluster(0, WHOLE_STAGE_CODEGEN_1, "", nodes = ArrayBuffer(filterNode), - metrics = Seq(SQLPlanMetric(DURATION, 0, "", defaultAggregateMethod))), + metrics = Seq(SQLPlanMetric(DURATION, 0, ""))), new SparkPlanGraphNode(2, SCAN_TEXT, "", metrics = Seq( - SQLPlanMetric(METADATA_TIME, 2, "", defaultAggregateMethod), - SQLPlanMetric(NUMBER_OF_FILES_READ, 3, "", defaultAggregateMethod), - SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, "", defaultAggregateMethod), - SQLPlanMetric(SIZE_OF_FILES_READ, 5, "", defaultAggregateMethod)))) + SQLPlanMetric(METADATA_TIME, 2, ""), + SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""), + SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""), + SQLPlanMetric(SIZE_OF_FILES_READ, 5, "")))) val edges: Seq[SparkPlanGraphEdge] = Seq(SparkPlanGraphEdge(3, 2)) @@ -62,12 +60,12 @@ object SqlResourceSuite { SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1) val metrics: Seq[SQLPlanMetric] = { - Seq(SQLPlanMetric(DURATION, 0, "", defaultAggregateMethod), - SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "", defaultAggregateMethod), - SQLPlanMetric(METADATA_TIME, 2, "", defaultAggregateMethod), - SQLPlanMetric(NUMBER_OF_FILES_READ, 3, "", defaultAggregateMethod), - SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, "", defaultAggregateMethod), - SQLPlanMetric(SIZE_OF_FILES_READ, 5, "", defaultAggregateMethod)) + Seq(SQLPlanMetric(DURATION, 0, ""), + SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""), + SQLPlanMetric(METADATA_TIME, 2, 
""), + SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""), + SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""), + SQLPlanMetric(SIZE_OF_FILES_READ, 5, "")) } val sqlExecutionUIData: SQLExecutionUIData = { From a50bf40194a98916c3fbeb1b864e6eb0fac7797e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 14 Apr 2021 18:29:22 -0700 Subject: [PATCH 09/14] Fix custom metric initialization. --- .../sql/execution/metric/CustomMetrics.scala | 6 +-- .../execution/ui/SQLAppStatusListener.scala | 40 +++++++++++++------ .../sql/execution/ui/SparkPlanGraph.scala | 6 +-- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala index fbfb5f8a2aca..77bcd6335137 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala @@ -34,13 +34,13 @@ object CustomMetrics { * Given a V2 custom metric type name, this method parses it and returns the corresponding * `CustomMetric` class name. */ - def parseV2CustomMetricType(metricType: String): String = { + def parseV2CustomMetricType(metricType: String): Option[String] = { val className = metricType.stripPrefix(s"${V2_CUSTOM}_") if (className == metricType) { - throw new IllegalStateException(s"Metric type $metricType is not a V2 custom metric type.") + None } else { - className + Some(className) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 4762a0541c39..4d5e02024316 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.util.control.NonFatal import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.internal.Logging @@ -203,19 +204,34 @@ class SQLAppStatusListener( private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = { val accumIds = exec.metrics.map(_.accumulatorId).toSet - val metricAggregationMap = new mutable.HashMap[String, CustomMetric]() + val metricAggregationMap = new mutable.HashMap[String, (Array[Long], Array[Long]) => String]() val metricAggregationMethods = exec.metrics.map { m => - val className = CustomMetrics.parseV2CustomMetricType(m.metricType) - val customMetric = if (metricAggregationMap.contains(className)) { - metricAggregationMap(className) - } else { - // Try to initiate custom metric object - val metric = Utils.loadExtensions(classOf[CustomMetric], Seq(className), conf).head - metricAggregationMap.put(className, metric) - metric - } - (m.accumulatorId, - (metrics: Array[Long], _: Array[Long]) => customMetric.aggregateTaskMetrics(metrics)) + val optClassName = CustomMetrics.parseV2CustomMetricType(m.metricType) + val metricAggMethod = optClassName.map { className => + if (metricAggregationMap.contains(className)) { + metricAggregationMap(className) + } else { + // Try to initiate custom metric object + try { + val metric = Utils.loadExtensions(classOf[CustomMetric], Seq(className), conf).head + val method = + (metrics: Array[Long], _: Array[Long]) => metric.aggregateTaskMetrics(metrics) 
+ metricAggregationMap.put(className, method) + method + } catch { + case NonFatal(_) => + // Cannot initiaize custom metric object, we might be in history server that does + // not have the custom metric class. + val defaultMethod = (_: Array[Long], _: Array[Long]) => "N/A" + metricAggregationMap.put(className, defaultMethod) + defaultMethod + } + } + }.getOrElse( + // Built-in SQLMetric + SQLMetrics.stringValue(m.metricType, _, _) + ) + (m.accumulatorId, metricAggMethod) }.toMap val liveStageMetrics = exec.stages.toSeq diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 97addc34a527..3b011301421f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -80,8 +80,7 @@ object SparkPlanGraph { planInfo.nodeName match { case name if name.startsWith("WholeStageCodegen") => val metrics = planInfo.metrics.map { metric => - SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType, - metric.aggregateMethod) + SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType) } val cluster = new SparkPlanGraphCluster( @@ -124,8 +123,7 @@ object SparkPlanGraph { edges += SparkPlanGraphEdge(node.id, parent.id) case name => val metrics = planInfo.metrics.map { metric => - SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType, - metric.aggregateMethod) + SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType) } val node = new SparkPlanGraphNode( nodeIdGenerator.getAndIncrement(), planInfo.nodeName, From 7d500275d7b096710df10df1e100a0ab292b299e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 1 Apr 2021 13:36:11 -0700 Subject: [PATCH 10/14] add simple ds v2. --- .../v2/CustomMetricDataSourceV2.scala | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CustomMetricDataSourceV2.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CustomMetricDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CustomMetricDataSourceV2.scala new file mode 100644 index 000000000000..7d3b7eca5adf --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CustomMetricDataSourceV2.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2 + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.{CustomMetric, CustomTaskMetric} +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider} +import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class RangeInputPartition(start: Int, end: Int) extends InputPartition + +trait TestingV2Source extends TableProvider { + override def inferSchema(options: CaseInsensitiveStringMap): StructType = { + TestingV2Source.schema + } + + override def getTable( + schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String]): Table = { + getTable(new CaseInsensitiveStringMap(properties)) + } + + def getTable(options: CaseInsensitiveStringMap): Table +} + +abstract class SimpleBatchTable extends Table with SupportsRead { + + override def schema(): StructType = TestingV2Source.schema + override def name(): String = this.getClass.toString + override def capabilities(): util.Set[TableCapability] = Set(BATCH_READ).asJava +} + +object TestingV2Source { + val schema = new StructType().add("i", "int").add("j", "int") +} + +object SimpleReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + val RangeInputPartition(start, end) = partition + new PartitionReader[InternalRow] { + private var current = start - 1 + + override def next(): Boolean = { + current += 1 + current < end + } + + override def get(): InternalRow = InternalRow(current, -current) + + override def close(): Unit = {} + } + } +} + +abstract class SimpleScanBuilder extends ScanBuilder + with Batch with Scan { + + override def build(): Scan = this + + override def toBatch: Batch = this + + override def readSchema(): StructType = TestingV2Source.schema + + override def createReaderFactory(): PartitionReaderFactory = SimpleReaderFactory +} + +class SimpleCustomMetric extends CustomMetric { + override def name(): String = "custom_metric" + override def description(): String = "a simple custom metric" + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + s"custom_metric: ${taskMetrics.mkString(", ")}" + } +} + +// The followings are for custom metrics of V2 data source. 
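
The reader factory that follows reports a constant 12345 so the test expectation stays deterministic; a real connector would more likely expose something incremental, such as the number of rows produced. A sketch of that pattern (illustrative only, not part of the patch; it reuses the imports of this file and the class name is made up):

    // Illustrative only: a reader that counts the rows it has produced and
    // exposes the running total through currentMetricsValues().
    class CountingPartitionReader(start: Int, end: Int)
        extends PartitionReader[InternalRow] {
      private var current = start - 1
      private var rowsProduced = 0L

      override def next(): Boolean = {
        current += 1
        if (current < end) { rowsProduced += 1; true } else false
      }

      override def get(): InternalRow = InternalRow(current, -current)

      override def close(): Unit = {}

      override def currentMetricsValues(): Array[CustomTaskMetric] = {
        val rowCount = new CustomTaskMetric {
          override def name(): String = "custom_metric"
          override def value(): Long = rowsProduced
        }
        Array(rowCount)
      }
    }
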
+object CustomMetricReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + val RangeInputPartition(start, end) = partition + new PartitionReader[InternalRow] { + private var current = start - 1 + + override def next(): Boolean = { + current += 1 + current < end + } + + override def get(): InternalRow = InternalRow(current, -current) + + override def close(): Unit = {} + + override def currentMetricsValues(): Array[CustomTaskMetric] = { + val metric = new CustomTaskMetric { + override def name(): String = "custom_metric" + override def value(): Long = 12345 + } + Array(metric) + } + } + } +} + +class CustomMetricScanBuilder extends SimpleScanBuilder { + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + + override def supportedCustomMetrics(): Array[CustomMetric] = { + Array(new SimpleCustomMetric) + } + + override def createReaderFactory(): PartitionReaderFactory = CustomMetricReaderFactory +} + +class CustomMetricDataSourceV2 extends TestingV2Source { + + override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new CustomMetricScanBuilder() + } + } +} From b5be5ba2abfc5022ff8c6b72e598c9361fbb1323 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 15 Apr 2021 00:02:17 -0700 Subject: [PATCH 11/14] Add tests. --- .../sql/execution/metric/CustomMetrics.scala | 8 ++- .../execution/metric/CustomMetricsSuite.scala | 55 +++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala index 77bcd6335137..72fcb83061e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.metric import org.apache.spark.sql.connector.CustomMetric object CustomMetrics { - private val V2_CUSTOM = "v2Custom" + private[spark] val V2_CUSTOM = "v2Custom" /** * Given a class name, builds and returns a metric type for a V2 custom metric class @@ -45,6 +45,9 @@ object CustomMetrics { } } +/** + * Built-in `CustomMetric` that sums up metric values. + */ class CustomSumMetric extends CustomMetric { override def name(): String = "CustomSumMetric" @@ -55,6 +58,9 @@ class CustomSumMetric extends CustomMetric { } } +/** + * Built-in `CustomMetric` that computes average of metric values. + */ class CustomAvgMetric extends CustomMetric { override def name(): String = "CustomAvgMetric" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala new file mode 100644 index 000000000000..a0cffef1e498 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.metric + +import org.apache.spark.SparkFunSuite + +class CustomMetricsSuite extends SparkFunSuite { + + test("Build/parse custom metric metric type") { + Seq(new CustomSumMetric, new CustomAvgMetric).foreach { customMetric => + val metricType = CustomMetrics.buildV2CustomMetricTypeName(customMetric) + + assert(metricType == CustomMetrics.V2_CUSTOM + "_" + customMetric.getClass.getCanonicalName) + assert(CustomMetrics.parseV2CustomMetricType(metricType).isDefined) + assert(CustomMetrics.parseV2CustomMetricType(metricType).get == + customMetric.getClass.getCanonicalName) + } + } + + test("Built-in CustomSumMetric") { + val metric = new CustomSumMetric + + val metricValues1 = Array(0L, 1L, 5L, 5L, 7L, 10L) + assert(metric.aggregateTaskMetrics(metricValues1) == metricValues1.sum.toString) + + val metricValues2 = Array.empty[Long] + assert(metric.aggregateTaskMetrics(metricValues2) == "0") + } + + test("Built-in CustomAvgMetric") { + val metric = new CustomAvgMetric + + val metricValues1 = Array(0L, 1L, 5L, 5L, 7L, 10L) + assert(metric.aggregateTaskMetrics(metricValues1) == + (metricValues1.sum / metricValues1.length).toString) + + val metricValues2 = Array.empty[Long] + assert(metric.aggregateTaskMetrics(metricValues2) == "0") + } +} From 0f38782bfeae0f55bb662780d66153270b5ac40d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 15 Apr 2021 10:04:12 -0700 Subject: [PATCH 12/14] Remove CustomMetricDataSourceV2. --- .../adaptive/AdaptiveSparkPlanExec.scala | 3 +- .../v2/CustomMetricDataSourceV2.scala | 144 ------------------ .../execution/ui/SQLAppStatusListener.scala | 3 +- 3 files changed, 2 insertions(+), 148 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CustomMetricDataSourceV2.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 30020d50fdd0..fba32a4dd1da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -627,8 +627,7 @@ case class AdaptiveSparkPlanExec( // of the new plan nodes, so that it can track the valid accumulator updates later // and display SQL metrics correctly. 
val newMetrics = newSubPlans.flatMap { p => - p.flatMap(_.metrics.values.map(m => - SQLPlanMetric(m.name.get, m.id, m.metricType))) + p.flatMap(_.metrics.values.map(m => SQLPlanMetric(m.name.get, m.id, m.metricType))) } context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveSQLMetricUpdates( executionId.toLong, newMetrics)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CustomMetricDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CustomMetricDataSourceV2.scala deleted file mode 100644 index 7d3b7eca5adf..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CustomMetricDataSourceV2.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources.v2 - -import java.util - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.{CustomMetric, CustomTaskMetric} -import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider} -import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ -import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap - -case class RangeInputPartition(start: Int, end: Int) extends InputPartition - -trait TestingV2Source extends TableProvider { - override def inferSchema(options: CaseInsensitiveStringMap): StructType = { - TestingV2Source.schema - } - - override def getTable( - schema: StructType, - partitioning: Array[Transform], - properties: util.Map[String, String]): Table = { - getTable(new CaseInsensitiveStringMap(properties)) - } - - def getTable(options: CaseInsensitiveStringMap): Table -} - -abstract class SimpleBatchTable extends Table with SupportsRead { - - override def schema(): StructType = TestingV2Source.schema - override def name(): String = this.getClass.toString - override def capabilities(): util.Set[TableCapability] = Set(BATCH_READ).asJava -} - -object TestingV2Source { - val schema = new StructType().add("i", "int").add("j", "int") -} - -object SimpleReaderFactory extends PartitionReaderFactory { - override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - val RangeInputPartition(start, end) = partition - new PartitionReader[InternalRow] { - private var current = start - 1 - - override def next(): Boolean = { - current += 1 - current < end - } - - override def get(): InternalRow = InternalRow(current, -current) - - 
override def close(): Unit = {} - } - } -} - -abstract class SimpleScanBuilder extends ScanBuilder - with Batch with Scan { - - override def build(): Scan = this - - override def toBatch: Batch = this - - override def readSchema(): StructType = TestingV2Source.schema - - override def createReaderFactory(): PartitionReaderFactory = SimpleReaderFactory -} - -class SimpleCustomMetric extends CustomMetric { - override def name(): String = "custom_metric" - override def description(): String = "a simple custom metric" - override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { - s"custom_metric: ${taskMetrics.mkString(", ")}" - } -} - -// The followings are for custom metrics of V2 data source. -object CustomMetricReaderFactory extends PartitionReaderFactory { - override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - val RangeInputPartition(start, end) = partition - new PartitionReader[InternalRow] { - private var current = start - 1 - - override def next(): Boolean = { - current += 1 - current < end - } - - override def get(): InternalRow = InternalRow(current, -current) - - override def close(): Unit = {} - - override def currentMetricsValues(): Array[CustomTaskMetric] = { - val metric = new CustomTaskMetric { - override def name(): String = "custom_metric" - override def value(): Long = 12345 - } - Array(metric) - } - } - } -} - -class CustomMetricScanBuilder extends SimpleScanBuilder { - override def planInputPartitions(): Array[InputPartition] = { - Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) - } - - override def supportedCustomMetrics(): Array[CustomMetric] = { - Array(new SimpleCustomMetric) - } - - override def createReaderFactory(): PartitionReaderFactory = CustomMetricReaderFactory -} - -class CustomMetricDataSourceV2 extends TestingV2Source { - - override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable { - override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - new CustomMetricScanBuilder() - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 4d5e02024316..76c4462e499e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -289,8 +289,7 @@ class SQLAppStatusListener( } val aggregatedMetrics = allMetrics.map { case (id, values) => - val aggMethod = metricAggregationMethods(id) - id -> aggMethod(values, maxMetricsFromAllStages.getOrElse(id, + id -> metricAggregationMethods(id)(values, maxMetricsFromAllStages.getOrElse(id, Array.empty[Long])) }.toMap From 50ed317c2a3fde4bdc1fd74a337d75b2fc85eb1c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 19 Apr 2021 13:48:17 -0700 Subject: [PATCH 13/14] Address comments. 
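
Among the review follow-ups below, CustomAvgMetric switches from integer division to a Double average rendered with a US-locale NumberFormat, which is why the updated suite expects "4.667" for the sample input. A quick check of that formatting behaviour (standalone snippet, not part of the patch):

    import java.text.NumberFormat
    import java.util.Locale

    val values = Array(0L, 1L, 5L, 5L, 7L, 10L)
    val avg = values.sum.toDouble / values.length   // 4.666...
    // NumberFormat keeps at most three fraction digits by default and rounds.
    assert(NumberFormat.getNumberInstance(Locale.US).format(avg) == "4.667")
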
--- .../datasources/v2/DataSourceRDD.scala | 3 ++ .../sql/execution/metric/CustomMetrics.scala | 18 ++++--- .../sql/connector/DataSourceV2Suite.scala | 48 ------------------ .../execution/metric/CustomMetricsSuite.scala | 3 +- .../ui/SQLAppStatusListenerSuite.scala | 50 ++++++++++++++++++- 5 files changed, 63 insertions(+), 59 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index f8bf0d7f93d8..7850dfa39d16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -93,6 +93,9 @@ private class PartitionIterator[T]( throw QueryExecutionErrors.endOfStreamError() } reader.currentMetricsValues.foreach { metric => + assert(customMetrics.contains(metric.name()), + s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " + + s"${metric.name()}") customMetrics(metric.name()).set(metric.value()) } valuePrepared = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala index 72fcb83061e0..3cb20f87ae63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.metric +import java.text.NumberFormat +import java.util.Locale + import org.apache.spark.sql.connector.CustomMetric object CustomMetrics { @@ -35,12 +38,10 @@ object CustomMetrics { * `CustomMetric` class name. */ def parseV2CustomMetricType(metricType: String): Option[String] = { - val className = metricType.stripPrefix(s"${V2_CUSTOM}_") - - if (className == metricType) { - None + if (metricType.startsWith(s"${V2_CUSTOM}_")) { + Some(metricType.drop(V2_CUSTOM.length + 1)) } else { - Some(className) + None } } } @@ -68,10 +69,11 @@ class CustomAvgMetric extends CustomMetric { override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { val average = if (taskMetrics.isEmpty) { - 0L + 0.0 } else { - taskMetrics.sum / taskMetrics.length + taskMetrics.sum.toDouble / taskMetrics.length } - average.toString + val numberFormat = NumberFormat.getNumberInstance(Locale.US) + numberFormat.format(average) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 077eb694fde9..49a107880055 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -818,51 +818,3 @@ class ReportStatisticsDataSource extends SimpleWritableDataSource { } } } - - -class SimpleCustomMetric extends CustomMetric { - override def name(): String = "custom_metric" - override def description(): String = "a simple custom metric" - override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { - s"custom_metric: ${taskMetrics.mkString(", ")}" - } -} - -// The followings are for custom metrics of V2 data source. 
-object CustomMetricReaderFactory extends PartitionReaderFactory { - override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - val RangeInputPartition(start, end) = partition - new PartitionReader[InternalRow] { - private var current = start - 1 - - override def next(): Boolean = { - current += 1 - current < end - } - - override def get(): InternalRow = InternalRow(current, -current) - - override def close(): Unit = {} - - override def currentMetricsValues(): Array[CustomTaskMetric] = { - val metric = new CustomTaskMetric { - override def name(): String = "custom_metric" - override def value(): Long = 12345 - } - Array(metric) - } - } - } -} - -class CustomMetricScanBuilder extends SimpleScanBuilder { - override def planInputPartitions(): Array[InputPartition] = { - Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) - } - - override def supportedCustomMetrics(): Array[CustomMetric] = { - Array(new SimpleCustomMetric) - } - - override def createReaderFactory(): PartitionReaderFactory = CustomMetricReaderFactory -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala index a0cffef1e498..e2fa03ff23c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala @@ -46,8 +46,7 @@ class CustomMetricsSuite extends SparkFunSuite { val metric = new CustomAvgMetric val metricValues1 = Array(0L, 1L, 5L, 5L, 7L, 10L) - assert(metric.aggregateTaskMetrics(metricValues1) == - (metricValues1.sum / metricValues1.length).toString) + assert(metric.aggregateTaskMetrics(metricValues1) == "4.667") val metricValues2 = Array.empty[Long] assert(metric.aggregateTaskMetrics(metricValues2) == "0") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 04c00dd85c52..a58265124d70 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -37,7 +37,8 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.util.quietly -import org.apache.spark.sql.connector.CustomMetricScanBuilder +import org.apache.spark.sql.connector.{CustomMetric, CustomTaskMetric, RangeInputPartition, SimpleScanBuilder} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -924,3 +925,50 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite { } } } + +class SimpleCustomMetric extends CustomMetric { + override def name(): String = "custom_metric" + override def description(): String = "a simple custom metric" + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + s"custom_metric: ${taskMetrics.mkString(", ")}" + } +} + +// The followings are for custom metrics of V2 data source. 
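
These helpers back the SPARK-34338 listener test added earlier in the series: the scan plans two range partitions, each task reports 12345 for "custom_metric", and SimpleCustomMetric joins the per-task values, which is exactly the string the test asserts on. In short (illustrative snippet, assuming the classes defined here):

    // Two partitions -> two tasks, each reporting 12345; the aggregation joins
    // the per-task values into the display string checked by the test.
    val perTaskValues = Array(12345L, 12345L)
    assert(new SimpleCustomMetric().aggregateTaskMetrics(perTaskValues) ==
      "custom_metric: 12345, 12345")
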
+object CustomMetricReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + val RangeInputPartition(start, end) = partition + new PartitionReader[InternalRow] { + private var current = start - 1 + + override def next(): Boolean = { + current += 1 + current < end + } + + override def get(): InternalRow = InternalRow(current, -current) + + override def close(): Unit = {} + + override def currentMetricsValues(): Array[CustomTaskMetric] = { + val metric = new CustomTaskMetric { + override def name(): String = "custom_metric" + override def value(): Long = 12345 + } + Array(metric) + } + } + } +} + +class CustomMetricScanBuilder extends SimpleScanBuilder { + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + + override def supportedCustomMetrics(): Array[CustomMetric] = { + Array(new SimpleCustomMetric) + } + + override def createReaderFactory(): PartitionReaderFactory = CustomMetricReaderFactory +} From 06eb9c79a3fdd807ec08540deb2234939396325a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 19 Apr 2021 23:45:10 -0700 Subject: [PATCH 14/14] Fix typo. --- .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 76c4462e499e..a3238551b2fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -220,7 +220,7 @@ class SQLAppStatusListener( method } catch { case NonFatal(_) => - // Cannot initiaize custom metric object, we might be in history server that does + // Cannot initialize custom metric object, we might be in history server that does // not have the custom metric class. val defaultMethod = (_: Array[Long], _: Array[Long]) => "N/A" metricAggregationMap.put(className, defaultMethod)
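
Taken together, the series leaves SQLAppStatusListener choosing an aggregation function per metric roughly as follows: a v2Custom_* metric type is resolved back to its CustomMetric class and instantiated reflectively, falling back to "N/A" when the class is unavailable (for example in the history server), while built-in metric types keep the existing string rendering. A condensed sketch of that selection (paraphrasing aggregateMetrics; it omits the per-class caching and relies on internal helpers such as SQLMetrics.stringValue and Utils.loadExtensions, so treat it as illustrative rather than public API):

    import scala.util.control.NonFatal

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.connector.CustomMetric
    import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetrics}
    import org.apache.spark.util.Utils

    def aggregatorFor(
        metricType: String,
        conf: SparkConf): (Array[Long], Array[Long]) => String = {
      CustomMetrics.parseV2CustomMetricType(metricType) match {
        case Some(className) =>
          try {
            val metric =
              Utils.loadExtensions(classOf[CustomMetric], Seq(className), conf).head
            (values: Array[Long], _: Array[Long]) => metric.aggregateTaskMetrics(values)
          } catch {
            case NonFatal(_) =>
              // Custom metric class not on the classpath, e.g. in the history server.
              (_: Array[Long], _: Array[Long]) => "N/A"
          }
        case None =>
          // Built-in metric types (sum, size, timing, average, ...).
          SQLMetrics.stringValue(metricType, _, _)
      }
    }
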